1use alloc::vec;
2use alloc::vec::Vec;
3
4use rns_crypto::Rng;
5
6use crate::constants::*;
7use crate::hash::get_random_hash;
8use crate::buffer::types::Compressor;
9use super::types::*;
10use super::advertisement::ResourceAdvertisement;
11use super::parts::{split_into_parts, build_hashmap, has_collision, prepend_metadata};
12use super::proof::{compute_resource_hash, compute_expected_proof, validate_proof};
13
14pub struct ResourceSender {
19 pub status: ResourceStatus,
21 pub resource_hash: [u8; 32],
23 pub truncated_hash: [u8; 16],
25 pub expected_proof: [u8; 32],
27 pub original_hash: [u8; 32],
29 pub random_hash: Vec<u8>,
31 pub sdu: usize,
33 parts: Vec<Vec<u8>>,
35 pub part_hashes: Vec<[u8; RESOURCE_MAPHASH_LEN]>,
37 hashmap: Vec<u8>,
39 total_parts: usize,
41 pub sent_parts: usize,
43 sent_indices: Vec<bool>,
45 pub flags: AdvFlags,
47 pub transfer_size: usize,
49 pub data_size: usize,
51 pub segment_index: u64,
53 pub total_segments: u64,
55 pub request_id: Option<Vec<u8>>,
57 pub retries_left: usize,
59 pub max_retries: usize,
61 pub max_adv_retries: usize,
63 pub rtt: Option<f64>,
65 pub link_rtt: f64,
67 pub timeout_factor: f64,
69 pub last_activity: f64,
71 pub adv_sent: f64,
73 pub last_part_sent: f64,
75 pub sender_grace_time: f64,
77 receiver_min_consecutive_height: usize,
79}
80
81impl ResourceSender {
82 pub fn new(
100 data: &[u8],
101 metadata: Option<&[u8]>,
102 sdu: usize,
103 encrypt_fn: &dyn Fn(&[u8]) -> Vec<u8>,
104 compressor: &dyn Compressor,
105 rng: &mut dyn Rng,
106 now: f64,
107 auto_compress: bool,
108 is_response: bool,
109 request_id: Option<Vec<u8>>,
110 segment_index: u64,
111 total_segments: u64,
112 original_hash: Option<[u8; 32]>,
113 link_rtt: f64,
114 traffic_timeout_factor: f64,
115 ) -> Result<Self, ResourceError> {
116 let uncompressed_data = match metadata {
118 Some(meta) => prepend_metadata(data, meta),
119 None => data.to_vec(),
120 };
121 let has_metadata = metadata.is_some();
122
123 let data_size = uncompressed_data.len();
124
125 let (working_data, compressed) = if auto_compress {
127 match compressor.compress(&uncompressed_data) {
128 Some(compressed_data) if compressed_data.len() < uncompressed_data.len() => {
129 (compressed_data, true)
130 }
131 _ => (uncompressed_data.clone(), false),
132 }
133 } else {
134 (uncompressed_data.clone(), false)
135 };
136
137 let random_prefix: [u8; RESOURCE_RANDOM_HASH_SIZE] = {
139 let rh = get_random_hash(rng);
140 let mut buf = [0u8; RESOURCE_RANDOM_HASH_SIZE];
141 buf.copy_from_slice(&rh[..RESOURCE_RANDOM_HASH_SIZE]);
142 buf
143 };
144 let mut data_with_random = Vec::with_capacity(RESOURCE_RANDOM_HASH_SIZE + working_data.len());
145 data_with_random.extend_from_slice(&random_prefix);
146 data_with_random.extend_from_slice(&working_data);
147
148 let encrypted_data = encrypt_fn(&data_with_random);
150 let transfer_size = encrypted_data.len();
151
152 let mut resource_hash;
154 let mut truncated_resource_hash;
155 let mut expected_proof;
156 let mut final_random_hash;
157 let mut parts_data;
158 let mut part_hashes;
159 let mut collision_retries = 0;
160 const MAX_COLLISION_RETRIES: usize = 100;
161
162 loop {
163 final_random_hash = {
164 let rh = get_random_hash(rng);
165 rh[..RESOURCE_RANDOM_HASH_SIZE].to_vec()
166 };
167
168 resource_hash = compute_resource_hash(&uncompressed_data, &final_random_hash);
169 truncated_resource_hash = {
170 let mut t = [0u8; 16];
171 t.copy_from_slice(&resource_hash[..16]);
172 t
173 };
174 expected_proof = compute_expected_proof(&uncompressed_data, &resource_hash);
175
176 let (p, h) = split_into_parts(&encrypted_data, sdu, &final_random_hash);
177 parts_data = p;
178 part_hashes = h;
179
180 if !has_collision(&part_hashes) {
181 break;
182 }
183 collision_retries += 1;
185 if collision_retries >= MAX_COLLISION_RETRIES {
186 return Err(ResourceError::CollisionDetected);
187 }
188 }
189
190 let hashmap = build_hashmap(&part_hashes);
191 let total_parts = parts_data.len();
192
193 let orig_hash = original_hash.unwrap_or(resource_hash);
194
195 let flags = AdvFlags {
196 encrypted: true,
197 compressed,
198 split: total_segments > 1,
199 is_request: request_id.is_some() && !is_response,
200 is_response: request_id.is_some() && is_response,
201 has_metadata,
202 };
203
204 Ok(ResourceSender {
205 status: ResourceStatus::Queued,
206 resource_hash,
207 truncated_hash: truncated_resource_hash,
208 expected_proof,
209 original_hash: orig_hash,
210 random_hash: final_random_hash,
211 sdu,
212 parts: parts_data,
213 part_hashes,
214 hashmap,
215 total_parts,
216 sent_parts: 0,
217 sent_indices: vec![false; total_parts],
218 flags,
219 transfer_size,
220 data_size,
221 segment_index,
222 total_segments,
223 request_id,
224 retries_left: RESOURCE_MAX_RETRIES,
225 max_retries: RESOURCE_MAX_RETRIES,
226 max_adv_retries: RESOURCE_MAX_ADV_RETRIES,
227 rtt: None,
228 link_rtt,
229 timeout_factor: traffic_timeout_factor,
230 last_activity: now,
231 adv_sent: now,
232 last_part_sent: now,
233 sender_grace_time: RESOURCE_SENDER_GRACE_TIME,
234 receiver_min_consecutive_height: 0,
235 })
236 }
237
238 pub fn get_advertisement(&self, segment: usize) -> Vec<u8> {
240 let adv = ResourceAdvertisement {
241 transfer_size: self.transfer_size as u64,
242 data_size: self.data_size as u64,
243 num_parts: self.total_parts as u64,
244 resource_hash: self.resource_hash.to_vec(),
245 random_hash: self.random_hash.clone(),
246 original_hash: self.original_hash.to_vec(),
247 hashmap: self.hashmap.clone(),
248 flags: self.flags,
249 segment_index: self.segment_index,
250 total_segments: self.total_segments,
251 request_id: self.request_id.clone(),
252 };
253 adv.pack(segment)
254 }
255
256 pub fn advertise(&mut self, now: f64) -> Vec<ResourceAction> {
258 self.status = ResourceStatus::Advertised;
259 self.last_activity = now;
260 self.adv_sent = now;
261 self.retries_left = self.max_adv_retries;
262 let adv_data = self.get_advertisement(0);
263 vec![ResourceAction::SendAdvertisement(adv_data)]
264 }
265
266 pub fn handle_request(&mut self, request_data: &[u8], now: f64) -> Vec<ResourceAction> {
271 if self.status == ResourceStatus::Failed {
272 return vec![];
273 }
274
275 if self.rtt.is_none() {
277 self.rtt = Some(now - self.adv_sent);
278 }
279
280 if self.status != ResourceStatus::Transferring {
281 self.status = ResourceStatus::Transferring;
282 }
283
284 self.retries_left = self.max_retries;
285 self.last_activity = now;
286
287 let wants_more_hashmap = request_data.first() == Some(&RESOURCE_HASHMAP_IS_EXHAUSTED);
288 let pad = if wants_more_hashmap {
289 1 + RESOURCE_MAPHASH_LEN
290 } else {
291 1
292 };
293
294 if request_data.len() < pad + 32 {
295 return vec![];
296 }
297
298 let requested_hashes_data = &request_data[pad + 32..];
299 let mut actions = Vec::new();
300
301 let num_requested = requested_hashes_data.len() / RESOURCE_MAPHASH_LEN;
303 let mut map_hashes_requested = Vec::with_capacity(num_requested);
304 for i in 0..num_requested {
305 let start = i * RESOURCE_MAPHASH_LEN;
306 let end = start + RESOURCE_MAPHASH_LEN;
307 if end <= requested_hashes_data.len() {
308 let mut h = [0u8; RESOURCE_MAPHASH_LEN];
309 h.copy_from_slice(&requested_hashes_data[start..end]);
310 map_hashes_requested.push(h);
311 }
312 }
313
314 let search_start = self.receiver_min_consecutive_height;
316 let search_end = core::cmp::min(
317 search_start + RESOURCE_COLLISION_GUARD_SIZE,
318 self.total_parts,
319 );
320
321 for part_idx in search_start..search_end {
322 if map_hashes_requested.contains(&self.part_hashes[part_idx]) {
323 actions.push(ResourceAction::SendPart(self.parts[part_idx].clone()));
324 if !self.sent_indices[part_idx] {
325 self.sent_indices[part_idx] = true;
326 self.sent_parts += 1;
327 }
328 self.last_part_sent = now;
329 }
330 }
331
332 if wants_more_hashmap {
334 if let Some(hmu) = self.build_hmu(request_data, now) {
335 actions.push(ResourceAction::SendHmu(hmu));
336 }
337 }
338
339 if self.sent_parts >= self.total_parts {
341 self.status = ResourceStatus::AwaitingProof;
342 self.retries_left = 3; }
344
345 actions
346 }
347
348 fn build_hmu(&mut self, request_data: &[u8], now: f64) -> Option<Vec<u8>> {
350 if request_data.len() < 1 + RESOURCE_MAPHASH_LEN {
351 return None;
352 }
353
354 let last_map_hash_bytes = &request_data[1..1 + RESOURCE_MAPHASH_LEN];
355 let mut last_map_hash = [0u8; RESOURCE_MAPHASH_LEN];
356 last_map_hash.copy_from_slice(last_map_hash_bytes);
357
358 let search_start = self.receiver_min_consecutive_height;
360 let search_end = core::cmp::min(
361 search_start + RESOURCE_COLLISION_GUARD_SIZE,
362 self.total_parts,
363 );
364
365 let mut part_index = search_start;
366 for idx in search_start..search_end {
367 part_index = idx + 1;
368 if self.part_hashes[idx] == last_map_hash {
369 break;
370 }
371 }
372
373 self.receiver_min_consecutive_height =
375 if part_index > RESOURCE_WINDOW_MAX { part_index - 1 - RESOURCE_WINDOW_MAX } else { 0 };
376
377 if part_index % RESOURCE_HASHMAP_MAX_LEN != 0 {
379 return None; }
381
382 let segment = part_index / RESOURCE_HASHMAP_MAX_LEN;
383 let hashmap_start = segment * RESOURCE_HASHMAP_MAX_LEN;
384 let hashmap_end = core::cmp::min(
385 (segment + 1) * RESOURCE_HASHMAP_MAX_LEN,
386 self.total_parts,
387 );
388
389 let mut hashmap_segment = Vec::new();
390 for i in hashmap_start..hashmap_end {
391 hashmap_segment.extend_from_slice(&self.hashmap[i * RESOURCE_MAPHASH_LEN..(i + 1) * RESOURCE_MAPHASH_LEN]);
392 }
393
394 let hmu_payload = crate::msgpack::pack(&crate::msgpack::Value::Array(vec![
396 crate::msgpack::Value::UInt(segment as u64),
397 crate::msgpack::Value::Bin(hashmap_segment),
398 ]));
399
400 let mut hmu = Vec::with_capacity(32 + hmu_payload.len());
401 hmu.extend_from_slice(&self.resource_hash);
402 hmu.extend_from_slice(&hmu_payload);
403
404 self.last_activity = now;
405 Some(hmu)
406 }
407
408 pub fn handle_proof(&mut self, proof_data: &[u8], _now: f64) -> Vec<ResourceAction> {
410 if self.status == ResourceStatus::Failed {
411 return vec![];
412 }
413
414 match validate_proof(proof_data, &self.resource_hash, &self.expected_proof) {
415 Ok(true) => {
416 self.status = ResourceStatus::Complete;
417 vec![ResourceAction::Completed]
418 }
419 Ok(false) => {
420 self.status = ResourceStatus::Failed;
421 vec![ResourceAction::Failed(ResourceError::InvalidProof)]
422 }
423 Err(e) => {
424 self.status = ResourceStatus::Failed;
425 vec![ResourceAction::Failed(e)]
426 }
427 }
428 }
429
430 pub fn handle_reject(&mut self) -> Vec<ResourceAction> {
432 self.status = ResourceStatus::Rejected;
433 vec![ResourceAction::Failed(ResourceError::Rejected)]
434 }
435
436 pub fn cancel(&mut self) -> Vec<ResourceAction> {
438 if self.status < ResourceStatus::Complete {
439 self.status = ResourceStatus::Failed;
440 vec![ResourceAction::SendCancelInitiator(self.resource_hash.to_vec())]
441 } else {
442 vec![]
443 }
444 }
445
446 pub fn tick(&mut self, now: f64) -> Vec<ResourceAction> {
448 if self.status >= ResourceStatus::Complete {
449 return vec![];
450 }
451
452 match self.status {
453 ResourceStatus::Advertised => {
454 let timeout = self.adv_sent
455 + self.rtt.unwrap_or(self.link_rtt * self.timeout_factor)
456 + RESOURCE_PROCESSING_GRACE;
457 if now > timeout {
458 if self.retries_left == 0 {
459 self.status = ResourceStatus::Failed;
460 return vec![ResourceAction::Failed(ResourceError::Timeout)];
461 }
462 self.retries_left -= 1;
463 self.last_activity = now;
464 self.adv_sent = now;
465 let adv_data = self.get_advertisement(0);
466 return vec![ResourceAction::SendAdvertisement(adv_data)];
467 }
468 }
469 ResourceStatus::Transferring => {
470 let rtt = self.rtt.unwrap_or(1.0);
471 let max_extra_wait: f64 = (0..self.max_retries)
472 .map(|r| (r as f64 + 1.0) * RESOURCE_PER_RETRY_DELAY)
473 .sum();
474 let max_wait = rtt * self.timeout_factor * self.max_retries as f64
475 + self.sender_grace_time
476 + max_extra_wait;
477 if now > self.last_activity + max_wait {
478 self.status = ResourceStatus::Failed;
479 return vec![ResourceAction::Failed(ResourceError::Timeout)];
480 }
481 }
482 ResourceStatus::AwaitingProof => {
483 let rtt = self.rtt.unwrap_or(1.0);
484 let timeout = self.last_part_sent
485 + rtt * RESOURCE_PROOF_TIMEOUT_FACTOR
486 + self.sender_grace_time;
487 if now > timeout {
488 if self.retries_left == 0 {
489 self.status = ResourceStatus::Failed;
490 return vec![ResourceAction::Failed(ResourceError::Timeout)];
491 }
492 self.retries_left -= 1;
493 self.last_part_sent = now;
494 return vec![];
496 }
497 }
498 _ => {}
499 }
500
501 vec![]
502 }
503
504 pub fn total_parts(&self) -> usize {
506 self.total_parts
507 }
508}
509
#[cfg(test)]
mod tests {
    use super::*;
    use crate::buffer::types::NoopCompressor;

    /// Pass-through stand-in for link encryption.
    fn identity_encrypt(data: &[u8]) -> Vec<u8> {
        data.to_vec()
    }

    /// Builds a sender with the fixed settings shared by all tests; only the
    /// values that differ between tests are parameters.
    fn build_sender(
        data: &[u8],
        metadata: Option<&[u8]>,
        rng_seed: u8,
        segment_index: u64,
        total_segments: u64,
        original_hash: Option<[u8; 32]>,
    ) -> ResourceSender {
        let mut rng = rns_crypto::FixedRng::new(&[rng_seed; 64]);
        ResourceSender::new(
            data,
            metadata,
            RESOURCE_SDU,
            &identity_encrypt,
            &NoopCompressor,
            &mut rng,
            1000.0,
            false,
            false,
            None,
            segment_index,
            total_segments,
            original_hash,
            0.5,
            6.0,
        )
        .unwrap()
    }

    /// Single-segment sender without metadata.
    fn make_sender(data: &[u8]) -> ResourceSender {
        build_sender(data, None, 0x42, 1, 1, None)
    }

    /// Sender for `data` that has already been advertised at t = 1000.
    fn advertised_sender(data: &[u8]) -> ResourceSender {
        let mut sender = make_sender(data);
        sender.advertise(1000.0);
        sender
    }

    /// Request with a non-exhausted hashmap asking for the given map hashes.
    fn request_for(sender: &ResourceSender, hashes: &[[u8; RESOURCE_MAPHASH_LEN]]) -> Vec<u8> {
        let mut request = vec![RESOURCE_HASHMAP_IS_NOT_EXHAUSTED];
        request.extend_from_slice(&sender.resource_hash);
        for hash in hashes {
            request.extend_from_slice(hash);
        }
        request
    }

    #[test]
    fn test_new_sender_status() {
        let sender = make_sender(b"test data");
        assert_eq!(sender.status, ResourceStatus::Queued);
    }

    #[test]
    fn test_new_sender_parts() {
        let payload = vec![0xAA; 1000];
        let sender = make_sender(&payload);
        assert_eq!(sender.total_parts(), 3);
    }

    #[test]
    fn test_advertise() {
        let mut sender = make_sender(b"test data");
        let actions = sender.advertise(1000.0);
        assert_eq!(sender.status, ResourceStatus::Advertised);
        assert_eq!(actions.len(), 1);
        if let ResourceAction::SendAdvertisement(data) = &actions[0] {
            assert!(!data.is_empty());
        } else {
            panic!("Expected SendAdvertisement");
        }
    }

    #[test]
    fn test_handle_request_basic() {
        let mut sender = advertised_sender(b"short");
        let request = request_for(&sender, &sender.part_hashes[..1]);

        let actions = sender.handle_request(&request, 1001.0);
        assert!(!actions.is_empty());
        let has_part = actions
            .iter()
            .any(|action| matches!(action, ResourceAction::SendPart(_)));
        assert!(has_part);
    }

    #[test]
    fn test_all_parts_sent_awaiting_proof() {
        let mut sender = advertised_sender(b"hi");
        let request = request_for(&sender, &sender.part_hashes);

        let _actions = sender.handle_request(&request, 1001.0);
        assert_eq!(sender.status, ResourceStatus::AwaitingProof);
        assert_eq!(sender.retries_left, 3);
    }

    #[test]
    fn test_valid_proof() {
        let mut sender = advertised_sender(b"data");

        let proof_data = super::super::proof::build_proof_data(
            &sender.resource_hash,
            &sender.expected_proof,
        );
        let actions = sender.handle_proof(&proof_data, 1002.0);
        assert_eq!(sender.status, ResourceStatus::Complete);
        assert!(actions
            .iter()
            .any(|action| matches!(action, ResourceAction::Completed)));
    }

    #[test]
    fn test_invalid_proof() {
        let mut sender = advertised_sender(b"data");

        let wrong_proof = [0xFF; 32];
        let proof_data =
            super::super::proof::build_proof_data(&sender.resource_hash, &wrong_proof);
        let _actions = sender.handle_proof(&proof_data, 1002.0);
        assert_eq!(sender.status, ResourceStatus::Failed);
    }

    #[test]
    fn test_handle_reject() {
        let mut sender = advertised_sender(b"data");
        let _actions = sender.handle_reject();
        assert_eq!(sender.status, ResourceStatus::Rejected);
    }

    #[test]
    fn test_cancel() {
        let mut sender = advertised_sender(b"data");
        let actions = sender.cancel();
        assert_eq!(sender.status, ResourceStatus::Failed);
        assert!(actions
            .iter()
            .any(|action| matches!(action, ResourceAction::SendCancelInitiator(_))));
    }

    #[test]
    fn test_cancel_already_complete() {
        let mut sender = make_sender(b"data");
        sender.status = ResourceStatus::Complete;
        let actions = sender.cancel();
        assert!(actions.is_empty());
    }

    #[test]
    fn test_tick_advertised_timeout() {
        let mut sender = advertised_sender(b"data");
        sender.retries_left = 0;

        let _actions = sender.tick(2000.0);
        assert_eq!(sender.status, ResourceStatus::Failed);
    }

    #[test]
    fn test_tick_advertised_retry() {
        let mut sender = advertised_sender(b"data");
        assert!(sender.retries_left > 0);

        let actions = sender.tick(2000.0);
        assert!(actions
            .iter()
            .any(|action| matches!(action, ResourceAction::SendAdvertisement(_))));
    }

    #[test]
    fn test_resource_hash_is_32_bytes() {
        let sender = make_sender(b"data");
        assert_eq!(sender.resource_hash.len(), 32);
        assert_eq!(sender.expected_proof.len(), 32);
    }

    #[test]
    fn test_sender_with_metadata() {
        let sender = build_sender(b"data", Some(b"metadata"), 0x55, 1, 1, None);
        assert!(sender.flags.has_metadata);
    }

    #[test]
    fn test_multi_segment_sender() {
        let orig_hash = [0xBB; 32];
        let sender = build_sender(b"segment 2 data", None, 0x66, 2, 5, Some(orig_hash));
        assert_eq!(sender.segment_index, 2);
        assert_eq!(sender.total_segments, 5);
        assert_eq!(sender.original_hash, orig_hash);
        assert!(sender.flags.split);
    }
}