1use std::collections::BTreeSet;
2use std::collections::hash_map::Entry as HashMapEntry;
3use std::mem::take;
4use std::sync::atomic::Ordering;
5use std::sync::{Arc, Mutex};
6
7use more_asserts::{debug_assert_ge, debug_assert_le};
8use xet_core_structures::MerkleHashMap;
9use xet_core_structures::merklehash::MerkleHash;
10
11use super::UniqueID;
12use super::progress_types::{GroupProgress, ItemProgressUpdater};
13
14pub struct FileXorbDependency {
15 pub file_id: u64,
16 pub xorb_hash: MerkleHash,
17 pub n_bytes: u64,
18 pub is_external: bool,
19}
20
/// Identifier handed out by `register_new_file`; it is the file's position in
/// registration order (the first registered file gets 0).
pub type CompletionTrackerFileId = u64;
25
/// Upload state of a single xorb together with the files waiting on it.
#[derive(Default)]
struct XorbDependency {
    /// Indices (into `CompletionTrackerImpl::files`) of files that still have
    /// uncompleted bytes depending on this xorb.
    file_indices: BTreeSet<usize>,

    /// Upload bytes reported so far through progress updates.
    completed_bytes: u64,

    /// Registered size of the xorb; stays 0 until `register_new_xorb` sets it.
    xorb_size: u64,

    /// Set once the upload completion has been registered.
    is_completed: bool,
}
41
/// Progress of the portion of one file that depends on one xorb.
#[derive(Default, Debug)]
struct XorbPartCompletionStats {
    /// Bytes of this portion already credited to the file.
    completed_bytes: u64,
    /// Total bytes of the file that depend on the xorb.
    n_bytes: u64,
}
47
/// Per-file bookkeeping held by the tracker.
struct FileDependency {
    /// Id of the progress item, copied from the updater at registration
    /// (used in `assert_complete` failure messages).
    tracking_id: UniqueID,
    /// Receives size and completed-byte updates for this file.
    updater: Arc<ItemProgressUpdater>,
    /// Name of the progress item (used in `assert_complete` failure messages).
    name: Arc<str>,
    /// Current known size of the file in bytes.
    total_bytes: u64,
    /// `true` when the size was supplied at registration; later size
    /// increments are then ignored.
    is_final_size_known: bool,
    /// Bytes of the file counted as completed so far.
    completed_bytes: u64,
    /// Portions of the file still waiting on a xorb upload, keyed by xorb hash.
    remaining_xorbs_parts: MerkleHashMap<XorbPartCompletionStats>,
}
58
/// Mutable tracker state; `CompletionTracker` keeps it behind a mutex.
#[derive(Default)]
struct CompletionTrackerImpl {
    /// Registered files, indexed by `CompletionTrackerFileId`.
    files: Vec<FileDependency>,
    /// Known xorbs, keyed by hash.
    xorbs: MerkleHashMap<XorbDependency>,

    /// Sum of the sizes of all registered xorbs.
    total_upload_bytes: u64,
    /// Xorb bytes reported as uploaded so far.
    total_upload_bytes_completed: u64,

    /// Sum of the sizes of all registered files.
    total_bytes: u64,
    /// File bytes counted as completed so far.
    total_bytes_completed: u64,
}
73
/// Tracks how much of each registered file is complete, based on the upload
/// state of the xorbs the file depends on. Methods take `&self` and lock the
/// internal state for the duration of each call.
pub struct CompletionTracker {
    /// All bookkeeping state, guarded by a mutex.
    inner: Mutex<CompletionTrackerImpl>,
    /// Group progress whose transfer byte counters are updated as xorbs are
    /// registered and uploaded.
    group: Arc<GroupProgress>,
}
78
79impl CompletionTrackerImpl {
80 fn register_new_file(
81 &mut self,
82 updater: Arc<ItemProgressUpdater>,
83 n_bytes: Option<u64>,
84 ) -> CompletionTrackerFileId {
85 let (total_bytes, is_final_size_known) = match n_bytes {
86 Some(size) => (size, true),
87 None => (0, false),
88 };
89
90 updater.update_item_size(total_bytes, n_bytes.is_some());
91
92 let file_id = self.files.len() as CompletionTrackerFileId;
93 let tracking_id = updater.item().id;
94 let name = updater.item().name.clone();
95
96 let file_dependency = FileDependency {
97 tracking_id,
98 updater,
99 name,
100 total_bytes,
101 is_final_size_known,
102 completed_bytes: 0,
103 remaining_xorbs_parts: MerkleHashMap::new(),
104 };
105
106 self.files.push(file_dependency);
107 self.total_bytes += total_bytes;
108
109 file_id
110 }
111
112 fn increment_file_size(&mut self, file_id: CompletionTrackerFileId, size_increment: u64) {
113 let file_entry = &mut self.files[file_id as usize];
114
115 if file_entry.is_final_size_known {
116 return;
117 }
118
119 file_entry.total_bytes += size_increment;
120 self.total_bytes += size_increment;
121
122 debug_assert_ge!(file_entry.total_bytes, file_entry.completed_bytes);
123 debug_assert_ge!(self.total_bytes, self.total_bytes_completed);
124
125 file_entry.updater.update_item_size(file_entry.total_bytes, false);
126 }
127
128 fn register_dependencies(&mut self, dependencies: &[FileXorbDependency]) {
129 let mut file_bytes_processed = 0;
130
131 for dep in dependencies {
132 let file_entry = &mut self.files[dep.file_id as usize];
133
134 if dep.is_external {
135 file_entry.completed_bytes += dep.n_bytes;
136 debug_assert_le!(file_entry.completed_bytes, file_entry.total_bytes);
137
138 file_entry.updater.report_bytes_completed(dep.n_bytes);
139 file_bytes_processed += dep.n_bytes;
140 } else {
141 debug_assert_ne!(dep.xorb_hash, MerkleHash::marker());
142
143 let entry = self.xorbs.entry(dep.xorb_hash).or_default();
144
145 if entry.is_completed {
146 file_entry.completed_bytes += dep.n_bytes;
147 debug_assert_le!(file_entry.completed_bytes, file_entry.total_bytes);
148
149 file_entry.updater.report_bytes_completed(dep.n_bytes);
150 file_bytes_processed += dep.n_bytes;
151 } else {
152 entry.file_indices.insert(dep.file_id as usize);
153 file_entry.remaining_xorbs_parts.entry(dep.xorb_hash).or_default().n_bytes += dep.n_bytes;
154 }
155 }
156 }
157
158 self.total_bytes_completed += file_bytes_processed;
159 debug_assert_le!(self.total_bytes_completed, self.total_bytes);
160 }
161
162 fn register_new_xorb(&mut self, group: &Arc<GroupProgress>, xorb_hash: MerkleHash, xorb_size: u64) -> bool {
163 match self.xorbs.entry(xorb_hash) {
164 HashMapEntry::Occupied(mut occupied_entry) => {
165 let entry = occupied_entry.get_mut();
166 if entry.xorb_size == 0 {
167 entry.xorb_size = xorb_size;
168 self.total_upload_bytes += xorb_size;
169 group.total_transfer_bytes.fetch_add(xorb_size, Ordering::Release);
170 true
171 } else {
172 debug_assert_eq!(entry.xorb_size, xorb_size);
173 false
174 }
175 },
176 HashMapEntry::Vacant(vacant_entry) => {
177 vacant_entry.insert(XorbDependency {
178 file_indices: Default::default(),
179 xorb_size,
180 completed_bytes: 0,
181 is_completed: false,
182 });
183
184 self.total_upload_bytes += xorb_size;
185 group.total_transfer_bytes.fetch_add(xorb_size, Ordering::Release);
186 true
187 },
188 }
189 }
190
191 fn register_xorb_upload_completion(&mut self, group: &Arc<GroupProgress>, xorb_hash: MerkleHash) {
192 let (file_indices, byte_completion_increment) = {
193 let entry = self.xorbs.entry(xorb_hash).or_default();
194
195 if entry.is_completed {
196 return;
197 }
198
199 let new_byte_increment = entry.xorb_size - entry.completed_bytes;
200 entry.is_completed = true;
201
202 (take(&mut entry.file_indices), new_byte_increment)
203 };
204
205 let mut file_bytes_processed = 0;
206
207 for file_id in file_indices {
208 let file_entry = &mut self.files[file_id];
209
210 debug_assert!(file_entry.remaining_xorbs_parts.contains_key(&xorb_hash));
211
212 let xorb_part = file_entry.remaining_xorbs_parts.remove(&xorb_hash).unwrap_or_default();
213 debug_assert_le!(xorb_part.completed_bytes, xorb_part.n_bytes);
214
215 let n_bytes_remaining = xorb_part.n_bytes - xorb_part.completed_bytes;
216
217 if n_bytes_remaining > 0 {
218 file_entry.completed_bytes += n_bytes_remaining;
219 file_entry.updater.report_bytes_completed(n_bytes_remaining);
220 file_bytes_processed += n_bytes_remaining;
221 }
222 }
223
224 debug_assert_le!(self.total_upload_bytes_completed + byte_completion_increment, self.total_upload_bytes);
225 self.total_upload_bytes_completed += byte_completion_increment;
226 group
227 .total_transfer_bytes_completed
228 .fetch_add(byte_completion_increment, Ordering::Release);
229
230 self.total_bytes_completed += file_bytes_processed;
231 debug_assert_le!(self.total_bytes_completed, self.total_bytes);
232 }
233
234 fn register_xorb_upload_progress(
235 &mut self,
236 group: &Arc<GroupProgress>,
237 xorb_hash: MerkleHash,
238 new_byte_progress: u64,
239 check_ordering: bool,
240 ) {
241 debug_assert!(self.xorbs.contains_key(&xorb_hash));
242
243 let entry = self.xorbs.entry(xorb_hash).or_default();
244
245 if !check_ordering && entry.is_completed {
246 return;
247 }
248
249 debug_assert!(!entry.is_completed);
250 debug_assert_le!(entry.completed_bytes + new_byte_progress, entry.xorb_size);
251
252 entry.completed_bytes += new_byte_progress;
253
254 let new_completion_ratio = (entry.completed_bytes as f64) / (entry.xorb_size as f64);
255
256 let mut file_bytes_processed = 0;
257
258 for &file_id in entry.file_indices.iter() {
259 let file_entry = &mut self.files[file_id];
260
261 debug_assert!(file_entry.remaining_xorbs_parts.contains_key(&xorb_hash));
262
263 let incremental_update = 'update: {
264 let Some(xorb_part) = file_entry.remaining_xorbs_parts.get_mut(&xorb_hash) else {
265 break 'update 0;
266 };
267 debug_assert_le!(xorb_part.completed_bytes, xorb_part.n_bytes);
268
269 let new_completion_bytes = ((xorb_part.n_bytes as f64) * new_completion_ratio).floor() as u64;
270
271 debug_assert_ge!(new_completion_bytes, xorb_part.completed_bytes);
272
273 let incremental_update = new_completion_bytes.saturating_sub(xorb_part.completed_bytes);
274 xorb_part.completed_bytes += incremental_update;
275
276 debug_assert_le!(xorb_part.completed_bytes, xorb_part.n_bytes);
277
278 incremental_update
279 };
280
281 if incremental_update != 0 {
282 file_entry.completed_bytes += incremental_update;
283 file_entry.updater.report_bytes_completed(incremental_update);
284 file_bytes_processed += incremental_update;
285 }
286 }
287
288 self.total_upload_bytes_completed += new_byte_progress;
289 debug_assert_le!(self.total_upload_bytes_completed, self.total_upload_bytes);
290
291 group
292 .total_transfer_bytes_completed
293 .fetch_add(new_byte_progress, Ordering::Release);
294
295 self.total_bytes_completed += file_bytes_processed;
296 debug_assert_le!(self.total_bytes_completed, self.total_bytes);
297 }
298
299 fn status(&self) -> (u64, u64) {
300 let (mut sum_completed, mut sum_total) = (0, 0);
301 for file in &self.files {
302 sum_completed += file.completed_bytes;
303 sum_total += file.total_bytes;
304 }
305 (sum_completed, sum_total)
306 }
307
308 fn is_complete(&self) -> bool {
309 let (done, total) = self.status();
310
311 #[cfg(debug_assertions)]
312 {
313 if done == total {
314 self.assert_complete();
315 }
316 }
317
318 done == total
319 }
320
321 fn assert_complete(&self) {
322 for (idx, file) in self.files.iter().enumerate() {
323 assert_eq!(
324 file.completed_bytes, file.total_bytes,
325 "File #{} ({}, {}) is not fully completed: {}/{} bytes",
326 idx, file.name, file.tracking_id, file.completed_bytes, file.total_bytes
327 );
328 assert!(
329 file.remaining_xorbs_parts.is_empty(),
330 "File #{} ({}) still has uncompleted xorb parts: {:?}",
331 idx,
332 file.name,
333 file.remaining_xorbs_parts
334 );
335 }
336
337 for (hash, xorb_dep) in self.xorbs.iter() {
338 assert!(xorb_dep.is_completed, "Xorb {hash:?} is not marked completed.");
339 assert!(
340 xorb_dep.file_indices.is_empty(),
341 "Xorb {:?} still has file references: {:?}",
342 hash,
343 xorb_dep.file_indices
344 );
345 }
346 }
347}
348
349impl CompletionTracker {
350 pub fn new(group: Arc<GroupProgress>) -> Self {
351 Self {
352 inner: Mutex::new(CompletionTrackerImpl::default()),
353 group,
354 }
355 }
356
357 pub fn register_new_file(
358 &self,
359 updater: Arc<ItemProgressUpdater>,
360 n_bytes: Option<u64>,
361 ) -> CompletionTrackerFileId {
362 let mut update_lock = self.inner.lock().unwrap();
363 update_lock.register_new_file(updater, n_bytes)
364 }
365
366 pub fn increment_file_size(&self, file_id: CompletionTrackerFileId, size_increment: u64) {
367 let mut update_lock = self.inner.lock().unwrap();
368 update_lock.increment_file_size(file_id, size_increment);
369 }
370
371 pub fn register_new_xorb(&self, xorb_hash: MerkleHash, xorb_size: u64) -> bool {
372 let mut update_lock = self.inner.lock().unwrap();
373 update_lock.register_new_xorb(&self.group, xorb_hash, xorb_size)
374 }
375
376 pub fn register_dependencies(&self, dependencies: &[FileXorbDependency]) {
377 let mut update_lock = self.inner.lock().unwrap();
378 update_lock.register_dependencies(dependencies);
379 }
380
381 pub fn register_xorb_upload_completion(&self, xorb_hash: MerkleHash) {
382 let mut update_lock = self.inner.lock().unwrap();
383 update_lock.register_xorb_upload_completion(&self.group, xorb_hash);
384 }
385
386 pub fn register_xorb_upload_progress(&self, xorb_hash: MerkleHash, new_byte_progress: u64) {
387 self.register_xorb_upload_progress_impl(xorb_hash, new_byte_progress, true);
388 }
389
390 pub fn register_xorb_upload_progress_background(self: Arc<Self>, xorb_hash: MerkleHash, new_byte_progress: u64) {
391 tokio::spawn(async move {
392 self.register_xorb_upload_progress_impl(xorb_hash, new_byte_progress, false);
393 });
394 }
395
396 fn register_xorb_upload_progress_impl(&self, xorb_hash: MerkleHash, new_byte_progress: u64, check_ordering: bool) {
397 let mut update_lock = self.inner.lock().unwrap();
398 update_lock.register_xorb_upload_progress(&self.group, xorb_hash, new_byte_progress, check_ordering);
399 }
400
401 pub fn status(&self) -> (u64, u64) {
402 self.inner.lock().unwrap().status()
403 }
404
405 pub fn is_complete(&self) -> bool {
406 self.inner.lock().unwrap().is_complete()
407 }
408
409 pub fn assert_complete(&self) {
410 self.inner.lock().unwrap().assert_complete();
411 }
412}
413
#[cfg(test)]
mod tests {
    use xet_core_structures::merklehash::MerkleHash;

    use super::*;

    /// Builds a fresh progress group and a tracker reporting to it.
    fn new_tracker() -> (Arc<GroupProgress>, CompletionTracker) {
        let group = GroupProgress::new();
        let tracker = CompletionTracker::new(group.clone());
        (group, tracker)
    }

    /// Creates a progress item named `name` and registers it as a file.
    fn add_file(
        group: &Arc<GroupProgress>,
        tracker: &CompletionTracker,
        name: &'static str,
        n_bytes: Option<u64>,
    ) -> CompletionTrackerFileId {
        let updater = group.new_item(UniqueID::new(), name);
        tracker.register_new_file(updater, n_bytes)
    }

    /// Shorthand for a dependency literal.
    fn dep(file_id: CompletionTrackerFileId, xorb_hash: MerkleHash, n_bytes: u64, is_external: bool) -> FileXorbDependency {
        FileXorbDependency {
            file_id,
            xorb_hash,
            n_bytes,
            is_external,
        }
    }

    /// Asserts the tracker's `(done, total)` status.
    #[track_caller]
    fn assert_status(tracker: &CompletionTracker, expected_done: u64, expected_total: u64) {
        let (done, total) = tracker.status();
        assert_eq!(done, expected_done);
        assert_eq!(total, expected_total);
    }

    /// Asserts that both the tracker and the group consider everything done.
    #[track_caller]
    fn assert_finished(group: &Arc<GroupProgress>, tracker: &CompletionTracker) {
        assert!(tracker.is_complete());

        tracker.assert_complete();
        group.assert_complete();
    }

    /// One file completed through an external dependency, one through a xorb upload.
    #[test]
    fn test_status_and_is_complete() {
        let (group, tracker) = new_tracker();

        let file_a = add_file(&group, &tracker, "fileA", Some(100));
        let file_b = add_file(&group, &tracker, "fileB", Some(50));

        assert_status(&tracker, 0, 150);
        assert!(!tracker.is_complete());

        let x = MerkleHash::random_from_seed(1);
        tracker.register_dependencies(&[dep(file_a, x, 100, true)]);

        assert_status(&tracker, 100, 150);
        assert!(!tracker.is_complete());

        let y = MerkleHash::random_from_seed(2);
        tracker.register_dependencies(&[dep(file_b, y, 50, false)]);

        assert_status(&tracker, 100, 150);

        tracker.register_new_xorb(y, 50);
        tracker.register_xorb_upload_completion(y);

        assert_status(&tracker, 150, 150);
        assert_finished(&group, &tracker);
    }

    /// Two files referencing the same xorbs, mixing external and uploaded parts.
    #[test]
    fn test_multiple_files_one_shared_xorb() {
        let (group, tracker) = new_tracker();

        let file_a = add_file(&group, &tracker, "fileA", Some(200));
        let file_b = add_file(&group, &tracker, "fileB", Some(300));

        assert_status(&tracker, 0, 500);

        let xhash = MerkleHash::random_from_seed(1);

        tracker.register_new_xorb(xhash, 1000);

        tracker.register_dependencies(&[dep(file_a, xhash, 100, false), dep(file_b, xhash, 200, true)]);

        assert_status(&tracker, 200, 500);
        assert!(!tracker.is_complete());

        tracker.register_xorb_upload_completion(xhash);

        assert_status(&tracker, 300, 500);

        let x2 = MerkleHash::random_from_seed(2);

        tracker.register_new_xorb(x2, 1000);

        tracker.register_dependencies(&[dep(file_a, x2, 100, true)]);

        assert_status(&tracker, 400, 500);

        tracker.register_dependencies(&[dep(file_b, x2, 100, false)]);

        assert_status(&tracker, 400, 500);
        assert!(!tracker.is_complete());

        tracker.register_xorb_upload_completion(x2);
        assert_status(&tracker, 500, 500);
        assert_finished(&group, &tracker);
    }

    /// One file split over two uploaded xorbs and one external part.
    #[test]
    fn test_single_file_multiple_xorbs() {
        let (group, tracker) = new_tracker();

        let f = add_file(&group, &tracker, "bigFile", Some(300));

        let x1 = MerkleHash::random_from_seed(1);
        let x2 = MerkleHash::random_from_seed(2);
        let x3 = MerkleHash::random_from_seed(3);

        tracker.register_new_xorb(x1, 100);
        tracker.register_new_xorb(x3, 100);

        tracker.register_dependencies(&[
            dep(f, x1, 100, false),
            dep(f, x2, 100, true),
            dep(f, x3, 100, false),
        ]);

        assert_status(&tracker, 100, 300);
        assert!(!tracker.is_complete());

        tracker.register_xorb_upload_completion(x1);
        assert_status(&tracker, 200, 300);
        assert!(!tracker.is_complete());

        tracker.register_xorb_upload_completion(x3);
        assert_status(&tracker, 300, 300);
        assert_finished(&group, &tracker);
    }

    /// A dependency registered after its xorb completed is credited immediately.
    #[test]
    fn test_xorb_completed_before_dependencies() {
        let (group, tracker) = new_tracker();

        let file_id = add_file(&group, &tracker, "lateFile", Some(50));

        let x = MerkleHash::random_from_seed(999);
        tracker.register_new_xorb(x, 1000);

        tracker.register_xorb_upload_completion(x);

        tracker.register_dependencies(&[dep(file_id, x, 50, false)]);

        assert_status(&tracker, 50, 50);
        assert_finished(&group, &tracker);
    }

    /// Same ordering as above with a different file size and xorb seed.
    #[test]
    fn test_contradictory_logic_with_completed_xorb() {
        let (group, tracker) = new_tracker();

        let file_id = add_file(&group, &tracker, "someFile", Some(100));
        let x = MerkleHash::random_from_seed(123);

        tracker.register_new_xorb(x, 1000);

        tracker.register_xorb_upload_completion(x);

        tracker.register_dependencies(&[dep(file_id, x, 100, false)]);

        assert_status(&tracker, 100, 100);
        assert_finished(&group, &tracker);
    }

    /// A file of unknown size grows through successive increments.
    #[test]
    fn test_increment_file_size_basic() {
        let (group, tracker) = new_tracker();

        let file_id = add_file(&group, &tracker, "growingFile", None);

        assert_status(&tracker, 0, 0);

        tracker.increment_file_size(file_id, 100);
        assert_status(&tracker, 0, 100);

        tracker.increment_file_size(file_id, 150);
        assert_status(&tracker, 0, 250);

        tracker.increment_file_size(file_id, 50);
        assert_status(&tracker, 0, 300);

        let x = MerkleHash::random_from_seed(1);
        tracker.register_dependencies(&[dep(file_id, x, 300, true)]);

        assert_status(&tracker, 300, 300);
        assert_finished(&group, &tracker);
    }

    /// Size increments interleaved with xorb uploads for a streamed file.
    #[test]
    fn test_increment_file_size_with_xorb_uploads() {
        let (group, tracker) = new_tracker();

        let file_id = add_file(&group, &tracker, "streamFile", None);

        let x1 = MerkleHash::random_from_seed(10);
        let x2 = MerkleHash::random_from_seed(20);

        tracker.register_new_xorb(x1, 500);
        tracker.register_new_xorb(x2, 500);

        tracker.increment_file_size(file_id, 200);
        tracker.register_dependencies(&[dep(file_id, x1, 200, false)]);

        assert_status(&tracker, 0, 200);

        tracker.register_xorb_upload_completion(x1);
        assert_status(&tracker, 200, 200);

        tracker.increment_file_size(file_id, 300);
        tracker.register_dependencies(&[dep(file_id, x2, 300, false)]);

        assert_status(&tracker, 200, 500);

        tracker.register_xorb_upload_completion(x2);
        assert_status(&tracker, 500, 500);
        assert_finished(&group, &tracker);
    }

    /// One file with a known size next to one whose size is discovered later.
    #[test]
    fn test_increment_file_size_mixed_known_unknown() {
        let (group, tracker) = new_tracker();

        let file_a = add_file(&group, &tracker, "fileA", Some(100));
        let file_b = add_file(&group, &tracker, "fileB", None);

        assert_status(&tracker, 0, 100);

        let xa = MerkleHash::random_from_seed(1);
        tracker.register_dependencies(&[dep(file_a, xa, 100, true)]);

        assert_status(&tracker, 100, 100);

        tracker.increment_file_size(file_b, 200);
        assert_status(&tracker, 100, 300);

        let xb = MerkleHash::random_from_seed(2);
        tracker.register_new_xorb(xb, 200);
        tracker.register_dependencies(&[dep(file_b, xb, 200, false)]);

        tracker.register_xorb_upload_completion(xb);

        assert_status(&tracker, 300, 300);
        assert_finished(&group, &tracker);
    }

    /// Size increments are ignored for a file registered with a known size.
    #[test]
    fn test_increment_file_size_ignored_when_already_final() {
        let (group, tracker) = new_tracker();

        let file_id = add_file(&group, &tracker, "fixedFile", Some(100));

        tracker.increment_file_size(file_id, 999);
        let (_, total) = tracker.status();
        assert_eq!(total, 100);

        let x = MerkleHash::random_from_seed(1);
        tracker.register_dependencies(&[dep(file_id, x, 100, true)]);

        assert_finished(&group, &tracker);
    }

    /// Partial xorb progress credits part of the file before completion.
    #[test]
    fn test_increment_file_size_with_partial_xorb_progress() {
        let (group, tracker) = new_tracker();

        let file_id = add_file(&group, &tracker, "partialFile", None);

        let x = MerkleHash::random_from_seed(42);
        tracker.register_new_xorb(x, 1000);

        tracker.increment_file_size(file_id, 400);
        tracker.register_dependencies(&[dep(file_id, x, 400, false)]);

        tracker.register_xorb_upload_progress(x, 500);
        let (done, total) = tracker.status();
        assert_eq!(total, 400);
        assert!(done > 0);
        assert!(done < 400);

        tracker.increment_file_size(file_id, 200);
        let (_, total) = tracker.status();
        assert_eq!(total, 600);

        tracker.register_dependencies(&[dep(file_id, MerkleHash::random_from_seed(99), 200, true)]);

        tracker.register_xorb_upload_completion(x);

        assert_status(&tracker, 600, 600);
        assert_finished(&group, &tracker);
    }
}