1use alloc::vec;
2use alloc::vec::Vec;
3
4use rns_crypto::Rng;
5
6use super::advertisement::ResourceAdvertisement;
7use super::parts::{build_hashmap, has_collision, prepend_metadata, split_into_parts};
8use super::proof::{compute_expected_proof, compute_resource_hash, validate_proof};
9use super::types::*;
10use crate::buffer::types::Compressor;
11use crate::constants::*;
12use crate::hash::get_random_hash;
13
14pub struct ResourceSender {
19 pub status: ResourceStatus,
21 pub resource_hash: [u8; 32],
23 pub truncated_hash: [u8; 16],
25 pub expected_proof: [u8; 32],
27 pub original_hash: [u8; 32],
29 pub random_hash: Vec<u8>,
31 pub sdu: usize,
33 parts: Vec<Vec<u8>>,
35 pub part_hashes: Vec<[u8; RESOURCE_MAPHASH_LEN]>,
37 hashmap: Vec<u8>,
39 total_parts: usize,
41 pub sent_parts: usize,
43 sent_indices: Vec<bool>,
45 pub flags: AdvFlags,
47 pub transfer_size: usize,
49 pub data_size: usize,
51 pub segment_index: u64,
53 pub total_segments: u64,
55 pub request_id: Option<Vec<u8>>,
57 pub retries_left: usize,
59 pub max_retries: usize,
61 pub max_adv_retries: usize,
63 pub rtt: Option<f64>,
65 pub link_rtt: f64,
67 pub timeout_factor: f64,
69 pub last_activity: f64,
71 pub adv_sent: f64,
73 pub last_part_sent: f64,
75 pub sender_grace_time: f64,
77 receiver_min_consecutive_height: usize,
79}
80
81impl ResourceSender {
82 #[allow(clippy::too_many_arguments)]
100 pub fn new(
101 data: &[u8],
102 metadata: Option<&[u8]>,
103 sdu: usize,
104 encrypt_fn: &dyn Fn(&[u8]) -> Vec<u8>,
105 compressor: &dyn Compressor,
106 rng: &mut dyn Rng,
107 now: f64,
108 auto_compress: bool,
109 is_response: bool,
110 request_id: Option<Vec<u8>>,
111 segment_index: u64,
112 total_segments: u64,
113 original_hash: Option<[u8; 32]>,
114 link_rtt: f64,
115 traffic_timeout_factor: f64,
116 ) -> Result<Self, ResourceError> {
117 let uncompressed_data = match metadata {
119 Some(meta) => prepend_metadata(data, meta),
120 None => data.to_vec(),
121 };
122 let has_metadata = metadata.is_some();
123
124 let data_size = uncompressed_data.len();
125
126 let (working_data, compressed) = if auto_compress {
128 match compressor.compress(&uncompressed_data) {
129 Some(compressed_data) if compressed_data.len() < uncompressed_data.len() => {
130 (compressed_data, true)
131 }
132 _ => (uncompressed_data.clone(), false),
133 }
134 } else {
135 (uncompressed_data.clone(), false)
136 };
137
138 let random_prefix: [u8; RESOURCE_RANDOM_HASH_SIZE] = {
140 let rh = get_random_hash(rng);
141 let mut buf = [0u8; RESOURCE_RANDOM_HASH_SIZE];
142 buf.copy_from_slice(&rh[..RESOURCE_RANDOM_HASH_SIZE]);
143 buf
144 };
145 let mut data_with_random =
146 Vec::with_capacity(RESOURCE_RANDOM_HASH_SIZE + working_data.len());
147 data_with_random.extend_from_slice(&random_prefix);
148 data_with_random.extend_from_slice(&working_data);
149
150 let encrypted_data = encrypt_fn(&data_with_random);
152 let transfer_size = encrypted_data.len();
153
154 let mut resource_hash;
156 let mut truncated_resource_hash;
157 let mut expected_proof;
158 let mut final_random_hash;
159 let mut parts_data;
160 let mut part_hashes;
161 let mut collision_retries = 0;
162 const MAX_COLLISION_RETRIES: usize = 100;
163
164 loop {
165 final_random_hash = {
166 let rh = get_random_hash(rng);
167 rh[..RESOURCE_RANDOM_HASH_SIZE].to_vec()
168 };
169
170 resource_hash = compute_resource_hash(&uncompressed_data, &final_random_hash);
171 truncated_resource_hash = {
172 let mut t = [0u8; 16];
173 t.copy_from_slice(&resource_hash[..16]);
174 t
175 };
176 expected_proof = compute_expected_proof(&uncompressed_data, &resource_hash);
177
178 let (p, h) = split_into_parts(&encrypted_data, sdu, &final_random_hash);
179 parts_data = p;
180 part_hashes = h;
181
182 if !has_collision(&part_hashes) {
183 break;
184 }
185 collision_retries += 1;
187 if collision_retries >= MAX_COLLISION_RETRIES {
188 return Err(ResourceError::CollisionDetected);
189 }
190 }
191
192 let hashmap = build_hashmap(&part_hashes);
193 let total_parts = parts_data.len();
194
195 let orig_hash = original_hash.unwrap_or(resource_hash);
196
197 let flags = AdvFlags {
198 encrypted: true,
199 compressed,
200 split: total_segments > 1,
201 is_request: request_id.is_some() && !is_response,
202 is_response: request_id.is_some() && is_response,
203 has_metadata,
204 };
205
206 Ok(ResourceSender {
207 status: ResourceStatus::Queued,
208 resource_hash,
209 truncated_hash: truncated_resource_hash,
210 expected_proof,
211 original_hash: orig_hash,
212 random_hash: final_random_hash,
213 sdu,
214 parts: parts_data,
215 part_hashes,
216 hashmap,
217 total_parts,
218 sent_parts: 0,
219 sent_indices: vec![false; total_parts],
220 flags,
221 transfer_size,
222 data_size,
223 segment_index,
224 total_segments,
225 request_id,
226 retries_left: RESOURCE_MAX_RETRIES,
227 max_retries: RESOURCE_MAX_RETRIES,
228 max_adv_retries: RESOURCE_MAX_ADV_RETRIES,
229 rtt: None,
230 link_rtt,
231 timeout_factor: traffic_timeout_factor,
232 last_activity: now,
233 adv_sent: now,
234 last_part_sent: now,
235 sender_grace_time: RESOURCE_SENDER_GRACE_TIME,
236 receiver_min_consecutive_height: 0,
237 })
238 }
239
240 pub fn get_advertisement(&self, segment: usize) -> Vec<u8> {
242 let adv = ResourceAdvertisement {
243 transfer_size: self.transfer_size as u64,
244 data_size: self.data_size as u64,
245 num_parts: self.total_parts as u64,
246 resource_hash: self.resource_hash.to_vec(),
247 random_hash: self.random_hash.clone(),
248 original_hash: self.original_hash.to_vec(),
249 hashmap: self.hashmap.clone(),
250 flags: self.flags,
251 segment_index: self.segment_index,
252 total_segments: self.total_segments,
253 request_id: self.request_id.clone(),
254 };
255 adv.pack(segment)
256 }
257
258 pub fn advertise(&mut self, now: f64) -> Vec<ResourceAction> {
260 self.status = ResourceStatus::Advertised;
261 self.last_activity = now;
262 self.adv_sent = now;
263 self.retries_left = self.max_adv_retries;
264 let adv_data = self.get_advertisement(0);
265 vec![ResourceAction::SendAdvertisement(adv_data)]
266 }
267
268 pub fn handle_request(&mut self, request_data: &[u8], now: f64) -> Vec<ResourceAction> {
273 if self.status == ResourceStatus::Failed {
274 return vec![];
275 }
276
277 if self.rtt.is_none() {
279 self.rtt = Some(now - self.adv_sent);
280 }
281
282 if self.status != ResourceStatus::Transferring {
283 self.status = ResourceStatus::Transferring;
284 }
285
286 self.retries_left = self.max_retries;
287 self.last_activity = now;
288
289 let wants_more_hashmap = request_data.first() == Some(&RESOURCE_HASHMAP_IS_EXHAUSTED);
290 let pad = if wants_more_hashmap {
291 1 + RESOURCE_MAPHASH_LEN
292 } else {
293 1
294 };
295
296 if request_data.len() < pad + 32 {
297 return vec![];
298 }
299
300 let requested_hashes_data = &request_data[pad + 32..];
301 let mut actions = Vec::new();
302
303 let num_requested = requested_hashes_data.len() / RESOURCE_MAPHASH_LEN;
305 let mut map_hashes_requested = Vec::with_capacity(num_requested);
306 for i in 0..num_requested {
307 let start = i * RESOURCE_MAPHASH_LEN;
308 let end = start + RESOURCE_MAPHASH_LEN;
309 if end <= requested_hashes_data.len() {
310 let mut h = [0u8; RESOURCE_MAPHASH_LEN];
311 h.copy_from_slice(&requested_hashes_data[start..end]);
312 map_hashes_requested.push(h);
313 }
314 }
315
316 let search_start = self.receiver_min_consecutive_height;
318 let search_end = core::cmp::min(
319 search_start + RESOURCE_COLLISION_GUARD_SIZE,
320 self.total_parts,
321 );
322
323 for part_idx in search_start..search_end {
324 if map_hashes_requested.contains(&self.part_hashes[part_idx]) {
325 actions.push(ResourceAction::SendPart(self.parts[part_idx].clone()));
326 if !self.sent_indices[part_idx] {
327 self.sent_indices[part_idx] = true;
328 self.sent_parts += 1;
329 }
330 self.last_part_sent = now;
331 }
332 }
333
334 if wants_more_hashmap {
336 if let Some(hmu) = self.build_hmu(request_data, now) {
337 actions.push(ResourceAction::SendHmu(hmu));
338 }
339 }
340
341 if self.sent_parts >= self.total_parts {
343 self.status = ResourceStatus::AwaitingProof;
344 self.retries_left = 3; }
346
347 actions
348 }
349
350 fn build_hmu(&mut self, request_data: &[u8], now: f64) -> Option<Vec<u8>> {
352 if request_data.len() < 1 + RESOURCE_MAPHASH_LEN {
353 return None;
354 }
355
356 let last_map_hash_bytes = &request_data[1..1 + RESOURCE_MAPHASH_LEN];
357 let mut last_map_hash = [0u8; RESOURCE_MAPHASH_LEN];
358 last_map_hash.copy_from_slice(last_map_hash_bytes);
359
360 let search_start = self.receiver_min_consecutive_height;
362 let search_end = core::cmp::min(
363 search_start + RESOURCE_COLLISION_GUARD_SIZE,
364 self.total_parts,
365 );
366
367 let mut part_index = search_start;
368 for idx in search_start..search_end {
369 part_index = idx + 1;
370 if self.part_hashes[idx] == last_map_hash {
371 break;
372 }
373 }
374
375 self.receiver_min_consecutive_height = if part_index > RESOURCE_WINDOW_MAX {
377 part_index - 1 - RESOURCE_WINDOW_MAX
378 } else {
379 0
380 };
381
382 if !part_index.is_multiple_of(RESOURCE_HASHMAP_MAX_LEN) {
384 return None; }
386
387 let segment = part_index / RESOURCE_HASHMAP_MAX_LEN;
388 let hashmap_start = segment * RESOURCE_HASHMAP_MAX_LEN;
389 let hashmap_end =
390 core::cmp::min((segment + 1) * RESOURCE_HASHMAP_MAX_LEN, self.total_parts);
391
392 let mut hashmap_segment = Vec::new();
393 for i in hashmap_start..hashmap_end {
394 hashmap_segment.extend_from_slice(
395 &self.hashmap[i * RESOURCE_MAPHASH_LEN..(i + 1) * RESOURCE_MAPHASH_LEN],
396 );
397 }
398
399 let hmu_payload = crate::msgpack::pack(&crate::msgpack::Value::Array(vec![
401 crate::msgpack::Value::UInt(segment as u64),
402 crate::msgpack::Value::Bin(hashmap_segment),
403 ]));
404
405 let mut hmu = Vec::with_capacity(32 + hmu_payload.len());
406 hmu.extend_from_slice(&self.resource_hash);
407 hmu.extend_from_slice(&hmu_payload);
408
409 self.last_activity = now;
410 Some(hmu)
411 }
412
413 pub fn handle_proof(&mut self, proof_data: &[u8], _now: f64) -> Vec<ResourceAction> {
415 if self.status == ResourceStatus::Failed {
416 return vec![];
417 }
418
419 match validate_proof(proof_data, &self.resource_hash, &self.expected_proof) {
420 Ok(true) => {
421 self.status = ResourceStatus::Complete;
422 vec![ResourceAction::Completed]
423 }
424 Ok(false) => {
425 self.status = ResourceStatus::Failed;
426 vec![ResourceAction::Failed(ResourceError::InvalidProof)]
427 }
428 Err(e) => {
429 self.status = ResourceStatus::Failed;
430 vec![ResourceAction::Failed(e)]
431 }
432 }
433 }
434
435 pub fn handle_reject(&mut self) -> Vec<ResourceAction> {
437 self.status = ResourceStatus::Rejected;
438 vec![ResourceAction::Failed(ResourceError::Rejected)]
439 }
440
441 pub fn cancel(&mut self) -> Vec<ResourceAction> {
443 if self.status < ResourceStatus::Complete {
444 self.status = ResourceStatus::Failed;
445 vec![ResourceAction::SendCancelInitiator(
446 self.resource_hash.to_vec(),
447 )]
448 } else {
449 vec![]
450 }
451 }
452
453 pub fn tick(&mut self, now: f64) -> Vec<ResourceAction> {
455 if self.status >= ResourceStatus::Complete {
456 return vec![];
457 }
458
459 match self.status {
460 ResourceStatus::Advertised => {
461 let timeout = self.adv_sent
462 + self.rtt.unwrap_or(self.link_rtt * self.timeout_factor)
463 + RESOURCE_PROCESSING_GRACE;
464 if now > timeout {
465 if self.retries_left == 0 {
466 self.status = ResourceStatus::Failed;
467 return vec![ResourceAction::Failed(ResourceError::Timeout)];
468 }
469 self.retries_left -= 1;
470 self.last_activity = now;
471 self.adv_sent = now;
472 let adv_data = self.get_advertisement(0);
473 return vec![ResourceAction::SendAdvertisement(adv_data)];
474 }
475 }
476 ResourceStatus::Transferring => {
477 let rtt = self.rtt.unwrap_or(1.0);
478 let max_extra_wait: f64 = (0..self.max_retries)
479 .map(|r| (r as f64 + 1.0) * RESOURCE_PER_RETRY_DELAY)
480 .sum();
481 let max_wait = rtt * self.timeout_factor * self.max_retries as f64
482 + self.sender_grace_time
483 + max_extra_wait;
484 if now > self.last_activity + max_wait {
485 self.status = ResourceStatus::Failed;
486 return vec![ResourceAction::Failed(ResourceError::Timeout)];
487 }
488 }
489 ResourceStatus::AwaitingProof => {
490 let rtt = self.rtt.unwrap_or(1.0);
491 let timeout = self.last_part_sent
492 + rtt * RESOURCE_PROOF_TIMEOUT_FACTOR
493 + self.sender_grace_time;
494 if now > timeout {
495 if self.retries_left == 0 {
496 self.status = ResourceStatus::Failed;
497 return vec![ResourceAction::Failed(ResourceError::Timeout)];
498 }
499 self.retries_left -= 1;
500 self.last_part_sent = now;
501 return vec![];
503 }
504 }
505 _ => {}
506 }
507
508 vec![]
509 }
510
511 pub fn total_parts(&self) -> usize {
513 self.total_parts
514 }
515}
516
517#[cfg(test)]
518mod tests {
519 use super::*;
520 use crate::buffer::types::NoopCompressor;
521
522 fn identity_encrypt(data: &[u8]) -> Vec<u8> {
523 data.to_vec()
524 }
525
526 fn make_sender(data: &[u8]) -> ResourceSender {
527 let mut rng = rns_crypto::FixedRng::new(&[0x42; 64]);
528 ResourceSender::new(
529 data,
530 None,
531 RESOURCE_SDU,
532 &identity_encrypt,
533 &NoopCompressor,
534 &mut rng,
535 1000.0,
536 false,
537 false,
538 None,
539 1,
540 1,
541 None,
542 0.5,
543 6.0,
544 )
545 .unwrap()
546 }
547
548 #[test]
549 fn test_new_sender_status() {
550 let sender = make_sender(b"test data");
551 assert_eq!(sender.status, ResourceStatus::Queued);
552 }
553
554 #[test]
555 fn test_new_sender_parts() {
556 let data = vec![0xAA; 1000];
557 let sender = make_sender(&data);
558 assert_eq!(sender.total_parts(), 3);
561 }
562
563 #[test]
564 fn test_advertise() {
565 let mut sender = make_sender(b"test data");
566 let actions = sender.advertise(1000.0);
567 assert_eq!(sender.status, ResourceStatus::Advertised);
568 assert_eq!(actions.len(), 1);
569 match &actions[0] {
570 ResourceAction::SendAdvertisement(data) => {
571 assert!(!data.is_empty());
572 }
573 _ => panic!("Expected SendAdvertisement"),
574 }
575 }
576
577 #[test]
578 fn test_handle_request_basic() {
579 let mut sender = make_sender(b"short");
580 sender.advertise(1000.0);
581
582 let mut request = Vec::new();
584 request.push(RESOURCE_HASHMAP_IS_NOT_EXHAUSTED);
585 request.extend_from_slice(&sender.resource_hash);
586 request.extend_from_slice(&sender.part_hashes[0]);
587
588 let actions = sender.handle_request(&request, 1001.0);
589 assert!(!actions.is_empty());
590 let has_part = actions
592 .iter()
593 .any(|a| matches!(a, ResourceAction::SendPart(_)));
594 assert!(has_part);
595 }
596
597 #[test]
598 fn test_all_parts_sent_awaiting_proof() {
599 let mut sender = make_sender(b"hi");
600 sender.advertise(1000.0);
601
602 let mut request = Vec::new();
604 request.push(RESOURCE_HASHMAP_IS_NOT_EXHAUSTED);
605 request.extend_from_slice(&sender.resource_hash);
606 for h in &sender.part_hashes.clone() {
607 request.extend_from_slice(h);
608 }
609
610 let _actions = sender.handle_request(&request, 1001.0);
611 assert_eq!(sender.status, ResourceStatus::AwaitingProof);
612 assert_eq!(sender.retries_left, 3);
613 }
614
615 #[test]
616 fn test_valid_proof() {
617 let mut sender = make_sender(b"data");
618 sender.advertise(1000.0);
619
620 let proof_data =
621 super::super::proof::build_proof_data(&sender.resource_hash, &sender.expected_proof);
622 let actions = sender.handle_proof(&proof_data, 1002.0);
623 assert_eq!(sender.status, ResourceStatus::Complete);
624 assert!(actions
625 .iter()
626 .any(|a| matches!(a, ResourceAction::Completed)));
627 }
628
629 #[test]
630 fn test_invalid_proof() {
631 let mut sender = make_sender(b"data");
632 sender.advertise(1000.0);
633
634 let wrong_proof = [0xFF; 32];
635 let proof_data = super::super::proof::build_proof_data(&sender.resource_hash, &wrong_proof);
636 let _actions = sender.handle_proof(&proof_data, 1002.0);
637 assert_eq!(sender.status, ResourceStatus::Failed);
638 }
639
640 #[test]
641 fn test_handle_reject() {
642 let mut sender = make_sender(b"data");
643 sender.advertise(1000.0);
644 let _actions = sender.handle_reject();
645 assert_eq!(sender.status, ResourceStatus::Rejected);
646 }
647
648 #[test]
649 fn test_cancel() {
650 let mut sender = make_sender(b"data");
651 sender.advertise(1000.0);
652 let actions = sender.cancel();
653 assert_eq!(sender.status, ResourceStatus::Failed);
654 assert!(actions
655 .iter()
656 .any(|a| matches!(a, ResourceAction::SendCancelInitiator(_))));
657 }
658
659 #[test]
660 fn test_cancel_already_complete() {
661 let mut sender = make_sender(b"data");
662 sender.status = ResourceStatus::Complete;
663 let actions = sender.cancel();
664 assert!(actions.is_empty());
665 }
666
667 #[test]
668 fn test_tick_advertised_timeout() {
669 let mut sender = make_sender(b"data");
670 sender.advertise(1000.0);
671 sender.retries_left = 0;
672
673 let _actions = sender.tick(2000.0);
675 assert_eq!(sender.status, ResourceStatus::Failed);
676 }
677
678 #[test]
679 fn test_tick_advertised_retry() {
680 let mut sender = make_sender(b"data");
681 sender.advertise(1000.0);
682 assert!(sender.retries_left > 0);
683
684 let actions = sender.tick(2000.0);
685 assert!(actions
687 .iter()
688 .any(|a| matches!(a, ResourceAction::SendAdvertisement(_))));
689 }
690
691 #[test]
692 fn test_resource_hash_is_32_bytes() {
693 let sender = make_sender(b"data");
694 assert_eq!(sender.resource_hash.len(), 32);
695 assert_eq!(sender.expected_proof.len(), 32);
696 }
697
698 #[test]
699 fn test_sender_with_metadata() {
700 let mut rng = rns_crypto::FixedRng::new(&[0x55; 64]);
701 let sender = ResourceSender::new(
702 b"data",
703 Some(b"metadata"),
704 RESOURCE_SDU,
705 &identity_encrypt,
706 &NoopCompressor,
707 &mut rng,
708 1000.0,
709 false,
710 false,
711 None,
712 1,
713 1,
714 None,
715 0.5,
716 6.0,
717 )
718 .unwrap();
719 assert!(sender.flags.has_metadata);
720 }
721
722 #[test]
723 fn test_multi_segment_sender() {
724 let orig_hash = [0xBB; 32];
725 let mut rng = rns_crypto::FixedRng::new(&[0x66; 64]);
726 let sender = ResourceSender::new(
727 b"segment 2 data",
728 None,
729 RESOURCE_SDU,
730 &identity_encrypt,
731 &NoopCompressor,
732 &mut rng,
733 1000.0,
734 false,
735 false,
736 None,
737 2,
738 5,
739 Some(orig_hash),
740 0.5,
741 6.0,
742 )
743 .unwrap();
744 assert_eq!(sender.segment_index, 2);
745 assert_eq!(sender.total_segments, 5);
746 assert_eq!(sender.original_hash, orig_hash);
747 assert!(sender.flags.split);
748 }
749}