1use alloc::vec;
2use alloc::vec::Vec;
3
4use rns_crypto::Rng;
5
6use super::advertisement::ResourceAdvertisement;
7use super::parts::{build_hashmap, has_collision, prepend_metadata, split_into_parts};
8use super::proof::{compute_expected_proof, compute_resource_hash, validate_proof};
9use super::types::*;
10use crate::buffer::types::Compressor;
11use crate::constants::*;
12use crate::hash::get_random_hash;
13
14pub struct ResourceSender {
19 pub status: ResourceStatus,
21 pub resource_hash: [u8; 32],
23 pub truncated_hash: [u8; 16],
25 pub expected_proof: [u8; 32],
27 pub original_hash: [u8; 32],
29 pub random_hash: Vec<u8>,
31 pub sdu: usize,
33 parts: Vec<Vec<u8>>,
35 pub part_hashes: Vec<[u8; RESOURCE_MAPHASH_LEN]>,
37 hashmap: Vec<u8>,
39 total_parts: usize,
41 pub sent_parts: usize,
43 sent_indices: Vec<bool>,
45 pub flags: AdvFlags,
47 pub transfer_size: usize,
49 pub data_size: usize,
51 pub segment_index: u64,
53 pub total_segments: u64,
55 pub request_id: Option<Vec<u8>>,
57 pub retries_left: usize,
59 pub max_retries: usize,
61 pub max_adv_retries: usize,
63 pub rtt: Option<f64>,
65 pub link_rtt: f64,
67 pub timeout_factor: f64,
69 pub last_activity: f64,
71 pub adv_sent: f64,
73 pub last_part_sent: f64,
75 pub sender_grace_time: f64,
77 receiver_min_consecutive_height: usize,
79}
80
81impl ResourceSender {
82 #[allow(clippy::too_many_arguments)]
100 pub fn new(
101 data: &[u8],
102 metadata: Option<&[u8]>,
103 sdu: usize,
104 encrypt_fn: &dyn Fn(&[u8]) -> Vec<u8>,
105 compressor: &dyn Compressor,
106 rng: &mut dyn Rng,
107 now: f64,
108 auto_compress: bool,
109 is_response: bool,
110 request_id: Option<Vec<u8>>,
111 segment_index: u64,
112 total_segments: u64,
113 original_hash: Option<[u8; 32]>,
114 link_rtt: f64,
115 traffic_timeout_factor: f64,
116 ) -> Result<Self, ResourceError> {
117 let uncompressed_data = match metadata {
119 Some(meta) => prepend_metadata(data, meta),
120 None => data.to_vec(),
121 };
122 let has_metadata = metadata.is_some();
123
124 let data_size = uncompressed_data.len();
125
126 let (working_data, compressed) =
128 if auto_compress && uncompressed_data.len() <= RESOURCE_AUTO_COMPRESS_MAX_SIZE {
129 match compressor.compress(&uncompressed_data) {
130 Some(compressed_data) if compressed_data.len() < uncompressed_data.len() => {
131 (compressed_data, true)
132 }
133 _ => (uncompressed_data.clone(), false),
134 }
135 } else {
136 (uncompressed_data.clone(), false)
137 };
138
139 let random_prefix: [u8; RESOURCE_RANDOM_HASH_SIZE] = {
141 let rh = get_random_hash(rng);
142 let mut buf = [0u8; RESOURCE_RANDOM_HASH_SIZE];
143 buf.copy_from_slice(&rh[..RESOURCE_RANDOM_HASH_SIZE]);
144 buf
145 };
146 let mut data_with_random =
147 Vec::with_capacity(RESOURCE_RANDOM_HASH_SIZE + working_data.len());
148 data_with_random.extend_from_slice(&random_prefix);
149 data_with_random.extend_from_slice(&working_data);
150
151 let encrypted_data = encrypt_fn(&data_with_random);
153 let transfer_size = encrypted_data.len();
154
155 let mut resource_hash;
157 let mut truncated_resource_hash;
158 let mut expected_proof;
159 let mut final_random_hash;
160 let mut parts_data;
161 let mut part_hashes;
162 let mut collision_retries = 0;
163 const MAX_COLLISION_RETRIES: usize = 100;
164
165 loop {
166 final_random_hash = {
167 let rh = get_random_hash(rng);
168 rh[..RESOURCE_RANDOM_HASH_SIZE].to_vec()
169 };
170
171 resource_hash = compute_resource_hash(&uncompressed_data, &final_random_hash);
172 truncated_resource_hash = {
173 let mut t = [0u8; 16];
174 t.copy_from_slice(&resource_hash[..16]);
175 t
176 };
177 expected_proof = compute_expected_proof(&uncompressed_data, &resource_hash);
178
179 let (p, h) = split_into_parts(&encrypted_data, sdu, &final_random_hash);
180 parts_data = p;
181 part_hashes = h;
182
183 if !has_collision(&part_hashes) {
184 break;
185 }
186 collision_retries += 1;
188 if collision_retries >= MAX_COLLISION_RETRIES {
189 return Err(ResourceError::CollisionDetected);
190 }
191 }
192
193 let hashmap = build_hashmap(&part_hashes);
194 let total_parts = parts_data.len();
195
196 let orig_hash = original_hash.unwrap_or(resource_hash);
197
198 let flags = AdvFlags {
199 encrypted: true,
200 compressed,
201 split: total_segments > 1,
202 is_request: request_id.is_some() && !is_response,
203 is_response: request_id.is_some() && is_response,
204 has_metadata,
205 };
206
207 Ok(ResourceSender {
208 status: ResourceStatus::Queued,
209 resource_hash,
210 truncated_hash: truncated_resource_hash,
211 expected_proof,
212 original_hash: orig_hash,
213 random_hash: final_random_hash,
214 sdu,
215 parts: parts_data,
216 part_hashes,
217 hashmap,
218 total_parts,
219 sent_parts: 0,
220 sent_indices: vec![false; total_parts],
221 flags,
222 transfer_size,
223 data_size,
224 segment_index,
225 total_segments,
226 request_id,
227 retries_left: RESOURCE_MAX_RETRIES,
228 max_retries: RESOURCE_MAX_RETRIES,
229 max_adv_retries: RESOURCE_MAX_ADV_RETRIES,
230 rtt: None,
231 link_rtt,
232 timeout_factor: traffic_timeout_factor,
233 last_activity: now,
234 adv_sent: now,
235 last_part_sent: now,
236 sender_grace_time: RESOURCE_SENDER_GRACE_TIME,
237 receiver_min_consecutive_height: 0,
238 })
239 }
240
241 pub fn get_advertisement(&self, segment: usize) -> Vec<u8> {
243 let adv = ResourceAdvertisement {
244 transfer_size: self.transfer_size as u64,
245 data_size: self.data_size as u64,
246 num_parts: self.total_parts as u64,
247 resource_hash: self.resource_hash.to_vec(),
248 random_hash: self.random_hash.clone(),
249 original_hash: self.original_hash.to_vec(),
250 hashmap: self.hashmap.clone(),
251 flags: self.flags,
252 segment_index: self.segment_index,
253 total_segments: self.total_segments,
254 request_id: self.request_id.clone(),
255 };
256 adv.pack(segment)
257 }
258
259 pub fn advertise(&mut self, now: f64) -> Vec<ResourceAction> {
261 self.status = ResourceStatus::Advertised;
262 self.last_activity = now;
263 self.adv_sent = now;
264 self.retries_left = self.max_adv_retries;
265 let adv_data = self.get_advertisement(0);
266 vec![ResourceAction::SendAdvertisement(adv_data)]
267 }
268
269 pub fn handle_request(&mut self, request_data: &[u8], now: f64) -> Vec<ResourceAction> {
274 if self.status == ResourceStatus::Failed {
275 return vec![];
276 }
277
278 if self.rtt.is_none() {
280 self.rtt = Some(now - self.adv_sent);
281 }
282
283 if self.status != ResourceStatus::Transferring {
284 self.status = ResourceStatus::Transferring;
285 }
286
287 self.retries_left = self.max_retries;
288 self.last_activity = now;
289
290 let wants_more_hashmap = request_data.first() == Some(&RESOURCE_HASHMAP_IS_EXHAUSTED);
291 let pad = if wants_more_hashmap {
292 1 + RESOURCE_MAPHASH_LEN
293 } else {
294 1
295 };
296
297 if request_data.len() < pad + 32 {
298 return vec![];
299 }
300
301 let requested_hashes_data = &request_data[pad + 32..];
302 let mut actions = Vec::new();
303
304 let num_requested = requested_hashes_data.len() / RESOURCE_MAPHASH_LEN;
306 let mut map_hashes_requested = Vec::with_capacity(num_requested);
307 for i in 0..num_requested {
308 let start = i * RESOURCE_MAPHASH_LEN;
309 let end = start + RESOURCE_MAPHASH_LEN;
310 if end <= requested_hashes_data.len() {
311 let mut h = [0u8; RESOURCE_MAPHASH_LEN];
312 h.copy_from_slice(&requested_hashes_data[start..end]);
313 map_hashes_requested.push(h);
314 }
315 }
316
317 let search_start = self.receiver_min_consecutive_height;
319 let search_end = core::cmp::min(
320 search_start + RESOURCE_COLLISION_GUARD_SIZE,
321 self.total_parts,
322 );
323
324 for part_idx in search_start..search_end {
325 if map_hashes_requested.contains(&self.part_hashes[part_idx]) {
326 actions.push(ResourceAction::SendPart(self.parts[part_idx].clone()));
327 if !self.sent_indices[part_idx] {
328 self.sent_indices[part_idx] = true;
329 self.sent_parts += 1;
330 }
331 self.last_part_sent = now;
332 }
333 }
334
335 if wants_more_hashmap {
337 if let Some(hmu) = self.build_hmu(request_data, now) {
338 actions.push(ResourceAction::SendHmu(hmu));
339 }
340 }
341
342 if self.sent_parts >= self.total_parts {
344 self.status = ResourceStatus::AwaitingProof;
345 self.retries_left = 3; }
347
348 actions
349 }
350
351 fn build_hmu(&mut self, request_data: &[u8], now: f64) -> Option<Vec<u8>> {
353 if request_data.len() < 1 + RESOURCE_MAPHASH_LEN {
354 return None;
355 }
356
357 let last_map_hash_bytes = &request_data[1..1 + RESOURCE_MAPHASH_LEN];
358 let mut last_map_hash = [0u8; RESOURCE_MAPHASH_LEN];
359 last_map_hash.copy_from_slice(last_map_hash_bytes);
360
361 let search_start = self.receiver_min_consecutive_height;
363 let search_end = core::cmp::min(
364 search_start + RESOURCE_COLLISION_GUARD_SIZE,
365 self.total_parts,
366 );
367
368 let mut part_index = search_start;
369 for idx in search_start..search_end {
370 part_index = idx + 1;
371 if self.part_hashes[idx] == last_map_hash {
372 break;
373 }
374 }
375
376 self.receiver_min_consecutive_height = if part_index > RESOURCE_WINDOW_MAX {
378 part_index - 1 - RESOURCE_WINDOW_MAX
379 } else {
380 0
381 };
382
383 if !part_index.is_multiple_of(RESOURCE_HASHMAP_MAX_LEN) {
385 return None; }
387
388 let segment = part_index / RESOURCE_HASHMAP_MAX_LEN;
389 let hashmap_start = segment * RESOURCE_HASHMAP_MAX_LEN;
390 let hashmap_end =
391 core::cmp::min((segment + 1) * RESOURCE_HASHMAP_MAX_LEN, self.total_parts);
392
393 let mut hashmap_segment = Vec::new();
394 for i in hashmap_start..hashmap_end {
395 hashmap_segment.extend_from_slice(
396 &self.hashmap[i * RESOURCE_MAPHASH_LEN..(i + 1) * RESOURCE_MAPHASH_LEN],
397 );
398 }
399
400 let hmu_payload = crate::msgpack::pack(&crate::msgpack::Value::Array(vec![
402 crate::msgpack::Value::UInt(segment as u64),
403 crate::msgpack::Value::Bin(hashmap_segment),
404 ]));
405
406 let mut hmu = Vec::with_capacity(32 + hmu_payload.len());
407 hmu.extend_from_slice(&self.resource_hash);
408 hmu.extend_from_slice(&hmu_payload);
409
410 self.last_activity = now;
411 Some(hmu)
412 }
413
414 pub fn handle_proof(&mut self, proof_data: &[u8], _now: f64) -> Vec<ResourceAction> {
416 if self.status == ResourceStatus::Failed {
417 return vec![];
418 }
419
420 match validate_proof(proof_data, &self.resource_hash, &self.expected_proof) {
421 Ok(true) => {
422 self.status = ResourceStatus::Complete;
423 vec![ResourceAction::Completed]
424 }
425 Ok(false) => {
426 self.status = ResourceStatus::Failed;
427 vec![ResourceAction::Failed(ResourceError::InvalidProof)]
428 }
429 Err(e) => {
430 self.status = ResourceStatus::Failed;
431 vec![ResourceAction::Failed(e)]
432 }
433 }
434 }
435
436 pub fn handle_reject(&mut self) -> Vec<ResourceAction> {
438 self.status = ResourceStatus::Rejected;
439 vec![ResourceAction::Failed(ResourceError::Rejected)]
440 }
441
442 pub fn cancel(&mut self) -> Vec<ResourceAction> {
444 if self.status < ResourceStatus::Complete {
445 self.status = ResourceStatus::Failed;
446 vec![ResourceAction::SendCancelInitiator(
447 self.resource_hash.to_vec(),
448 )]
449 } else {
450 vec![]
451 }
452 }
453
454 pub fn tick(&mut self, now: f64) -> Vec<ResourceAction> {
456 if self.status >= ResourceStatus::Complete {
457 return vec![];
458 }
459
460 match self.status {
461 ResourceStatus::Advertised => {
462 let timeout = self.adv_sent
463 + self.rtt.unwrap_or(self.link_rtt * self.timeout_factor)
464 + RESOURCE_PROCESSING_GRACE;
465 if now > timeout {
466 if self.retries_left == 0 {
467 self.status = ResourceStatus::Failed;
468 return vec![ResourceAction::Failed(ResourceError::Timeout)];
469 }
470 self.retries_left -= 1;
471 self.last_activity = now;
472 self.adv_sent = now;
473 let adv_data = self.get_advertisement(0);
474 return vec![ResourceAction::SendAdvertisement(adv_data)];
475 }
476 }
477 ResourceStatus::Transferring => {
478 let rtt = self.rtt.unwrap_or(1.0);
479 let max_extra_wait: f64 = (0..self.max_retries)
480 .map(|r| (r as f64 + 1.0) * RESOURCE_PER_RETRY_DELAY)
481 .sum();
482 let max_wait = rtt * self.timeout_factor * self.max_retries as f64
483 + self.sender_grace_time
484 + max_extra_wait;
485 if now > self.last_activity + max_wait {
486 self.status = ResourceStatus::Failed;
487 return vec![ResourceAction::Failed(ResourceError::Timeout)];
488 }
489 }
490 ResourceStatus::AwaitingProof => {
491 let rtt = self.rtt.unwrap_or(1.0);
492 let timeout = self.last_part_sent
493 + rtt * RESOURCE_PROOF_TIMEOUT_FACTOR
494 + self.sender_grace_time;
495 if now > timeout {
496 if self.retries_left == 0 {
497 self.status = ResourceStatus::Failed;
498 return vec![ResourceAction::Failed(ResourceError::Timeout)];
499 }
500 self.retries_left -= 1;
501 self.last_part_sent = now;
502 return vec![];
504 }
505 }
506 _ => {}
507 }
508
509 vec![]
510 }
511
512 pub fn total_parts(&self) -> usize {
514 self.total_parts
515 }
516}
517
#[cfg(test)]
mod tests {
    use super::super::proof::build_proof_data;
    use super::*;
    use crate::buffer::types::NoopCompressor;

    /// Pass-through stand-in for the encryption callback.
    fn identity_encrypt(data: &[u8]) -> Vec<u8> {
        data.to_vec()
    }

    /// Builds a sender with the fixed parameters shared by every test; only
    /// the payload, metadata, RNG fill byte and segment info vary.
    fn build_sender(
        data: &[u8],
        metadata: Option<&[u8]>,
        rng_fill: u8,
        segment_index: u64,
        total_segments: u64,
        original_hash: Option<[u8; 32]>,
    ) -> ResourceSender {
        let mut rng = rns_crypto::FixedRng::new(&[rng_fill; 64]);
        ResourceSender::new(
            data,
            metadata,
            RESOURCE_SDU,
            &identity_encrypt,
            &NoopCompressor,
            &mut rng,
            1000.0,
            false,
            false,
            None,
            segment_index,
            total_segments,
            original_hash,
            0.5,
            6.0,
        )
        .unwrap()
    }

    /// Single-segment sender without metadata.
    fn make_sender(data: &[u8]) -> ResourceSender {
        build_sender(data, None, 0x42, 1, 1, None)
    }

    /// Sender that has already been advertised at t = 1000.
    fn advertised_sender(data: &[u8]) -> ResourceSender {
        let mut sender = make_sender(data);
        sender.advertise(1000.0);
        sender
    }

    /// Request with a non-exhausted hashmap flag, the sender's resource hash
    /// and the given map hashes.
    fn request_for(sender: &ResourceSender, hashes: &[[u8; RESOURCE_MAPHASH_LEN]]) -> Vec<u8> {
        let mut request = vec![RESOURCE_HASHMAP_IS_NOT_EXHAUSTED];
        request.extend_from_slice(&sender.resource_hash);
        for hash in hashes {
            request.extend_from_slice(hash);
        }
        request
    }

    #[test]
    fn test_new_sender_status() {
        let sender = make_sender(b"test data");
        assert_eq!(sender.status, ResourceStatus::Queued);
    }

    #[test]
    fn test_new_sender_parts() {
        let payload = vec![0xAA; 1000];
        let sender = make_sender(&payload);
        assert_eq!(sender.total_parts(), 3);
    }

    #[test]
    fn test_advertise() {
        let mut sender = make_sender(b"test data");
        let actions = sender.advertise(1000.0);
        assert_eq!(sender.status, ResourceStatus::Advertised);
        assert_eq!(actions.len(), 1);
        let ResourceAction::SendAdvertisement(data) = &actions[0] else {
            panic!("Expected SendAdvertisement");
        };
        assert!(!data.is_empty());
    }

    #[test]
    fn test_handle_request_basic() {
        let mut sender = advertised_sender(b"short");
        let request = request_for(&sender, &sender.part_hashes[..1]);

        let actions = sender.handle_request(&request, 1001.0);
        assert!(!actions.is_empty());
        let has_part = actions
            .iter()
            .any(|action| matches!(action, ResourceAction::SendPart(_)));
        assert!(has_part);
    }

    #[test]
    fn test_all_parts_sent_awaiting_proof() {
        let mut sender = advertised_sender(b"hi");
        let request = request_for(&sender, &sender.part_hashes);

        let _actions = sender.handle_request(&request, 1001.0);
        assert_eq!(sender.status, ResourceStatus::AwaitingProof);
        assert_eq!(sender.retries_left, 3);
    }

    #[test]
    fn test_valid_proof() {
        let mut sender = advertised_sender(b"data");

        let proof_data = build_proof_data(&sender.resource_hash, &sender.expected_proof);
        let actions = sender.handle_proof(&proof_data, 1002.0);
        assert_eq!(sender.status, ResourceStatus::Complete);
        assert!(actions
            .iter()
            .any(|action| matches!(action, ResourceAction::Completed)));
    }

    #[test]
    fn test_invalid_proof() {
        let mut sender = advertised_sender(b"data");

        let wrong_proof = [0xFF; 32];
        let proof_data = build_proof_data(&sender.resource_hash, &wrong_proof);
        let _actions = sender.handle_proof(&proof_data, 1002.0);
        assert_eq!(sender.status, ResourceStatus::Failed);
    }

    #[test]
    fn test_handle_reject() {
        let mut sender = advertised_sender(b"data");
        let _actions = sender.handle_reject();
        assert_eq!(sender.status, ResourceStatus::Rejected);
    }

    #[test]
    fn test_cancel() {
        let mut sender = advertised_sender(b"data");
        let actions = sender.cancel();
        assert_eq!(sender.status, ResourceStatus::Failed);
        assert!(actions
            .iter()
            .any(|action| matches!(action, ResourceAction::SendCancelInitiator(_))));
    }

    #[test]
    fn test_cancel_already_complete() {
        let mut sender = make_sender(b"data");
        sender.status = ResourceStatus::Complete;
        let actions = sender.cancel();
        assert!(actions.is_empty());
    }

    #[test]
    fn test_tick_advertised_timeout() {
        let mut sender = advertised_sender(b"data");
        sender.retries_left = 0;

        let _actions = sender.tick(2000.0);
        assert_eq!(sender.status, ResourceStatus::Failed);
    }

    #[test]
    fn test_tick_advertised_retry() {
        let mut sender = advertised_sender(b"data");
        assert!(sender.retries_left > 0);

        let actions = sender.tick(2000.0);
        assert!(actions
            .iter()
            .any(|action| matches!(action, ResourceAction::SendAdvertisement(_))));
    }

    #[test]
    fn test_resource_hash_is_32_bytes() {
        let sender = make_sender(b"data");
        assert_eq!(sender.resource_hash.len(), 32);
        assert_eq!(sender.expected_proof.len(), 32);
    }

    #[test]
    fn test_sender_with_metadata() {
        let sender = build_sender(b"data", Some(b"metadata"), 0x55, 1, 1, None);
        assert!(sender.flags.has_metadata);
    }

    #[test]
    fn test_multi_segment_sender() {
        let orig_hash = [0xBB; 32];
        let sender = build_sender(b"segment 2 data", None, 0x66, 2, 5, Some(orig_hash));
        assert_eq!(sender.segment_index, 2);
        assert_eq!(sender.total_segments, 5);
        assert_eq!(sender.original_hash, orig_hash);
        assert!(sender.flags.split);
    }
}