Skip to main content

xet_data/progress_tracking/
upload_tracking.rs

1use std::collections::BTreeSet;
2use std::collections::hash_map::Entry as HashMapEntry;
3use std::mem::take;
4use std::sync::atomic::Ordering;
5use std::sync::{Arc, Mutex};
6
7use more_asserts::{debug_assert_ge, debug_assert_le};
8use xet_core_structures::MerkleHashMap;
9use xet_core_structures::merklehash::MerkleHash;
10
11use super::UniqueID;
12use super::progress_types::{GroupProgress, ItemProgressUpdater};
13
14pub struct FileXorbDependency {
15    pub file_id: u64,
16    pub xorb_hash: MerkleHash,
17    pub n_bytes: u64,
18    pub is_external: bool,
19}
20
/// The type with which to track a file. Reporting is done by `Arc<str>` name, but
/// tracking by this id ensures the bookkeeping is correct across duplicates and
/// speeds up the updates. The id is the file's index in the tracker's `files` list.
pub type CompletionTrackerFileId = u64;
25
/// Keeps track of which files depend on a given xorb, along with the upload
/// progress recorded for that xorb.
#[derive(Default)]
struct XorbDependency {
    /// Indices into the tracker's `files` list of the files that still need this xorb.
    file_indices: BTreeSet<usize>,

    /// Number of bytes of this xorb reported as uploaded so far.
    completed_bytes: u64,

    /// Number of bytes in that xorb; stays 0 until the xorb is registered with its size.
    xorb_size: u64,

    /// True if the xorb has already been uploaded successfully.
    is_completed: bool,
}
41
/// Progress of the part of one file that is stored in one not-yet-completed xorb.
#[derive(Default, Debug)]
struct XorbPartCompletionStats {
    /// Bytes of this part that have already been credited to the file.
    completed_bytes: u64,
    /// Total bytes of the file that depend on the xorb.
    n_bytes: u64,
}
47
/// Represents a file that depends on one or more xorbs.
struct FileDependency {
    /// Id of the progress item backing this file; used in completion diagnostics.
    tracking_id: UniqueID,
    /// Receives size updates and completed-byte reports for this file.
    updater: Arc<ItemProgressUpdater>,
    /// Name of the file, taken from the progress item.
    name: Arc<str>,
    /// Current size of the file in bytes; grows while the final size is unknown.
    total_bytes: u64,
    /// True when the size was given at registration; later size increments are ignored.
    is_final_size_known: bool,
    /// Bytes of the file counted as completed so far.
    completed_bytes: u64,
    /// Per-xorb progress for the xorbs this file still waits on, keyed by xorb hash.
    remaining_xorbs_parts: MerkleHashMap<XorbPartCompletionStats>,
}
58
/// Tracks all files and all xorbs, allowing you to register file
/// dependencies on xorbs and then mark xorbs as completed when they
/// are fully uploaded.
#[derive(Default)]
struct CompletionTrackerImpl {
    /// All registered files; a file's id is its index in this list.
    files: Vec<FileDependency>,
    /// All xorbs seen so far, keyed by xorb hash.
    xorbs: MerkleHashMap<XorbDependency>,

    /// Sum of the sizes of all registered xorbs.
    total_upload_bytes: u64,
    /// Xorb bytes reported as uploaded so far.
    total_upload_bytes_completed: u64,

    /// Sum of the sizes of all registered files.
    total_bytes: u64,
    /// File bytes counted as completed so far.
    total_bytes_completed: u64,
}
73
/// Shareable front end of the completion tracker: every method locks the inner
/// state and forwards to it.
pub struct CompletionTracker {
    /// The bookkeeping state, guarded by a mutex.
    inner: Mutex<CompletionTrackerImpl>,
    /// Group whose transfer byte counters are updated as xorbs are registered and uploaded.
    group: Arc<GroupProgress>,
}
78
79impl CompletionTrackerImpl {
80    fn register_new_file(
81        &mut self,
82        updater: Arc<ItemProgressUpdater>,
83        n_bytes: Option<u64>,
84    ) -> CompletionTrackerFileId {
85        let (total_bytes, is_final_size_known) = match n_bytes {
86            Some(size) => (size, true),
87            None => (0, false),
88        };
89
90        updater.update_item_size(total_bytes, n_bytes.is_some());
91
92        let file_id = self.files.len() as CompletionTrackerFileId;
93        let tracking_id = updater.item().id;
94        let name = updater.item().name.clone();
95
96        let file_dependency = FileDependency {
97            tracking_id,
98            updater,
99            name,
100            total_bytes,
101            is_final_size_known,
102            completed_bytes: 0,
103            remaining_xorbs_parts: MerkleHashMap::new(),
104        };
105
106        self.files.push(file_dependency);
107        self.total_bytes += total_bytes;
108
109        file_id
110    }
111
112    fn increment_file_size(&mut self, file_id: CompletionTrackerFileId, size_increment: u64) {
113        let file_entry = &mut self.files[file_id as usize];
114
115        if file_entry.is_final_size_known {
116            return;
117        }
118
119        file_entry.total_bytes += size_increment;
120        self.total_bytes += size_increment;
121
122        debug_assert_ge!(file_entry.total_bytes, file_entry.completed_bytes);
123        debug_assert_ge!(self.total_bytes, self.total_bytes_completed);
124
125        file_entry.updater.update_item_size(file_entry.total_bytes, false);
126    }
127
128    fn register_dependencies(&mut self, dependencies: &[FileXorbDependency]) {
129        let mut file_bytes_processed = 0;
130
131        for dep in dependencies {
132            let file_entry = &mut self.files[dep.file_id as usize];
133
134            if dep.is_external {
135                file_entry.completed_bytes += dep.n_bytes;
136                debug_assert_le!(file_entry.completed_bytes, file_entry.total_bytes);
137
138                file_entry.updater.report_bytes_completed(dep.n_bytes);
139                file_bytes_processed += dep.n_bytes;
140            } else {
141                debug_assert_ne!(dep.xorb_hash, MerkleHash::marker());
142
143                let entry = self.xorbs.entry(dep.xorb_hash).or_default();
144
145                if entry.is_completed {
146                    file_entry.completed_bytes += dep.n_bytes;
147                    debug_assert_le!(file_entry.completed_bytes, file_entry.total_bytes);
148
149                    file_entry.updater.report_bytes_completed(dep.n_bytes);
150                    file_bytes_processed += dep.n_bytes;
151                } else {
152                    entry.file_indices.insert(dep.file_id as usize);
153                    file_entry.remaining_xorbs_parts.entry(dep.xorb_hash).or_default().n_bytes += dep.n_bytes;
154                }
155            }
156        }
157
158        self.total_bytes_completed += file_bytes_processed;
159        debug_assert_le!(self.total_bytes_completed, self.total_bytes);
160    }
161
162    fn register_new_xorb(&mut self, group: &Arc<GroupProgress>, xorb_hash: MerkleHash, xorb_size: u64) -> bool {
163        match self.xorbs.entry(xorb_hash) {
164            HashMapEntry::Occupied(mut occupied_entry) => {
165                let entry = occupied_entry.get_mut();
166                if entry.xorb_size == 0 {
167                    entry.xorb_size = xorb_size;
168                    self.total_upload_bytes += xorb_size;
169                    group.total_transfer_bytes.fetch_add(xorb_size, Ordering::Release);
170                    true
171                } else {
172                    debug_assert_eq!(entry.xorb_size, xorb_size);
173                    false
174                }
175            },
176            HashMapEntry::Vacant(vacant_entry) => {
177                vacant_entry.insert(XorbDependency {
178                    file_indices: Default::default(),
179                    xorb_size,
180                    completed_bytes: 0,
181                    is_completed: false,
182                });
183
184                self.total_upload_bytes += xorb_size;
185                group.total_transfer_bytes.fetch_add(xorb_size, Ordering::Release);
186                true
187            },
188        }
189    }
190
191    fn register_xorb_upload_completion(&mut self, group: &Arc<GroupProgress>, xorb_hash: MerkleHash) {
192        let (file_indices, byte_completion_increment) = {
193            let entry = self.xorbs.entry(xorb_hash).or_default();
194
195            if entry.is_completed {
196                return;
197            }
198
199            let new_byte_increment = entry.xorb_size - entry.completed_bytes;
200            entry.is_completed = true;
201
202            (take(&mut entry.file_indices), new_byte_increment)
203        };
204
205        let mut file_bytes_processed = 0;
206
207        for file_id in file_indices {
208            let file_entry = &mut self.files[file_id];
209
210            debug_assert!(file_entry.remaining_xorbs_parts.contains_key(&xorb_hash));
211
212            let xorb_part = file_entry.remaining_xorbs_parts.remove(&xorb_hash).unwrap_or_default();
213            debug_assert_le!(xorb_part.completed_bytes, xorb_part.n_bytes);
214
215            let n_bytes_remaining = xorb_part.n_bytes - xorb_part.completed_bytes;
216
217            if n_bytes_remaining > 0 {
218                file_entry.completed_bytes += n_bytes_remaining;
219                file_entry.updater.report_bytes_completed(n_bytes_remaining);
220                file_bytes_processed += n_bytes_remaining;
221            }
222        }
223
224        debug_assert_le!(self.total_upload_bytes_completed + byte_completion_increment, self.total_upload_bytes);
225        self.total_upload_bytes_completed += byte_completion_increment;
226        group
227            .total_transfer_bytes_completed
228            .fetch_add(byte_completion_increment, Ordering::Release);
229
230        self.total_bytes_completed += file_bytes_processed;
231        debug_assert_le!(self.total_bytes_completed, self.total_bytes);
232    }
233
234    fn register_xorb_upload_progress(
235        &mut self,
236        group: &Arc<GroupProgress>,
237        xorb_hash: MerkleHash,
238        new_byte_progress: u64,
239        check_ordering: bool,
240    ) {
241        debug_assert!(self.xorbs.contains_key(&xorb_hash));
242
243        let entry = self.xorbs.entry(xorb_hash).or_default();
244
245        if !check_ordering && entry.is_completed {
246            return;
247        }
248
249        debug_assert!(!entry.is_completed);
250        debug_assert_le!(entry.completed_bytes + new_byte_progress, entry.xorb_size);
251
252        entry.completed_bytes += new_byte_progress;
253
254        let new_completion_ratio = (entry.completed_bytes as f64) / (entry.xorb_size as f64);
255
256        let mut file_bytes_processed = 0;
257
258        for &file_id in entry.file_indices.iter() {
259            let file_entry = &mut self.files[file_id];
260
261            debug_assert!(file_entry.remaining_xorbs_parts.contains_key(&xorb_hash));
262
263            let incremental_update = 'update: {
264                let Some(xorb_part) = file_entry.remaining_xorbs_parts.get_mut(&xorb_hash) else {
265                    break 'update 0;
266                };
267                debug_assert_le!(xorb_part.completed_bytes, xorb_part.n_bytes);
268
269                let new_completion_bytes = ((xorb_part.n_bytes as f64) * new_completion_ratio).floor() as u64;
270
271                debug_assert_ge!(new_completion_bytes, xorb_part.completed_bytes);
272
273                let incremental_update = new_completion_bytes.saturating_sub(xorb_part.completed_bytes);
274                xorb_part.completed_bytes += incremental_update;
275
276                debug_assert_le!(xorb_part.completed_bytes, xorb_part.n_bytes);
277
278                incremental_update
279            };
280
281            if incremental_update != 0 {
282                file_entry.completed_bytes += incremental_update;
283                file_entry.updater.report_bytes_completed(incremental_update);
284                file_bytes_processed += incremental_update;
285            }
286        }
287
288        self.total_upload_bytes_completed += new_byte_progress;
289        debug_assert_le!(self.total_upload_bytes_completed, self.total_upload_bytes);
290
291        group
292            .total_transfer_bytes_completed
293            .fetch_add(new_byte_progress, Ordering::Release);
294
295        self.total_bytes_completed += file_bytes_processed;
296        debug_assert_le!(self.total_bytes_completed, self.total_bytes);
297    }
298
299    fn status(&self) -> (u64, u64) {
300        let (mut sum_completed, mut sum_total) = (0, 0);
301        for file in &self.files {
302            sum_completed += file.completed_bytes;
303            sum_total += file.total_bytes;
304        }
305        (sum_completed, sum_total)
306    }
307
308    fn is_complete(&self) -> bool {
309        let (done, total) = self.status();
310
311        #[cfg(debug_assertions)]
312        {
313            if done == total {
314                self.assert_complete();
315            }
316        }
317
318        done == total
319    }
320
321    fn assert_complete(&self) {
322        for (idx, file) in self.files.iter().enumerate() {
323            assert_eq!(
324                file.completed_bytes, file.total_bytes,
325                "File #{} ({}, {}) is not fully completed: {}/{} bytes",
326                idx, file.name, file.tracking_id, file.completed_bytes, file.total_bytes
327            );
328            assert!(
329                file.remaining_xorbs_parts.is_empty(),
330                "File #{} ({}) still has uncompleted xorb parts: {:?}",
331                idx,
332                file.name,
333                file.remaining_xorbs_parts
334            );
335        }
336
337        for (hash, xorb_dep) in self.xorbs.iter() {
338            assert!(xorb_dep.is_completed, "Xorb {hash:?} is not marked completed.");
339            assert!(
340                xorb_dep.file_indices.is_empty(),
341                "Xorb {:?} still has file references: {:?}",
342                hash,
343                xorb_dep.file_indices
344            );
345        }
346    }
347}
348
349impl CompletionTracker {
350    pub fn new(group: Arc<GroupProgress>) -> Self {
351        Self {
352            inner: Mutex::new(CompletionTrackerImpl::default()),
353            group,
354        }
355    }
356
357    pub fn register_new_file(
358        &self,
359        updater: Arc<ItemProgressUpdater>,
360        n_bytes: Option<u64>,
361    ) -> CompletionTrackerFileId {
362        let mut update_lock = self.inner.lock().unwrap();
363        update_lock.register_new_file(updater, n_bytes)
364    }
365
366    pub fn increment_file_size(&self, file_id: CompletionTrackerFileId, size_increment: u64) {
367        let mut update_lock = self.inner.lock().unwrap();
368        update_lock.increment_file_size(file_id, size_increment);
369    }
370
371    pub fn register_new_xorb(&self, xorb_hash: MerkleHash, xorb_size: u64) -> bool {
372        let mut update_lock = self.inner.lock().unwrap();
373        update_lock.register_new_xorb(&self.group, xorb_hash, xorb_size)
374    }
375
376    pub fn register_dependencies(&self, dependencies: &[FileXorbDependency]) {
377        let mut update_lock = self.inner.lock().unwrap();
378        update_lock.register_dependencies(dependencies);
379    }
380
381    pub fn register_xorb_upload_completion(&self, xorb_hash: MerkleHash) {
382        let mut update_lock = self.inner.lock().unwrap();
383        update_lock.register_xorb_upload_completion(&self.group, xorb_hash);
384    }
385
386    pub fn register_xorb_upload_progress(&self, xorb_hash: MerkleHash, new_byte_progress: u64) {
387        self.register_xorb_upload_progress_impl(xorb_hash, new_byte_progress, true);
388    }
389
390    pub fn register_xorb_upload_progress_background(self: Arc<Self>, xorb_hash: MerkleHash, new_byte_progress: u64) {
391        tokio::spawn(async move {
392            self.register_xorb_upload_progress_impl(xorb_hash, new_byte_progress, false);
393        });
394    }
395
396    fn register_xorb_upload_progress_impl(&self, xorb_hash: MerkleHash, new_byte_progress: u64, check_ordering: bool) {
397        let mut update_lock = self.inner.lock().unwrap();
398        update_lock.register_xorb_upload_progress(&self.group, xorb_hash, new_byte_progress, check_ordering);
399    }
400
401    pub fn status(&self) -> (u64, u64) {
402        self.inner.lock().unwrap().status()
403    }
404
405    pub fn is_complete(&self) -> bool {
406        self.inner.lock().unwrap().is_complete()
407    }
408
409    pub fn assert_complete(&self) {
410        self.inner.lock().unwrap().assert_complete();
411    }
412}
413
#[cfg(test)]
mod tests {
    use xet_core_structures::merklehash::MerkleHash;

    use super::*;

    /// Builds a fresh progress group together with a tracker attached to it.
    fn setup() -> (Arc<GroupProgress>, CompletionTracker) {
        let group = GroupProgress::new();
        let tracker = CompletionTracker::new(group.clone());
        (group, tracker)
    }

    /// Creates a progress item named `name` and registers it as a file.
    fn add_file(
        group: &Arc<GroupProgress>,
        tracker: &CompletionTracker,
        name: &'static str,
        size: Option<u64>,
    ) -> CompletionTrackerFileId {
        let updater = group.new_item(UniqueID::new(), name);
        tracker.register_new_file(updater, size)
    }

    /// Shorthand for one dependency record.
    fn dep(file_id: CompletionTrackerFileId, xorb_hash: MerkleHash, n_bytes: u64, is_external: bool) -> FileXorbDependency {
        FileXorbDependency {
            file_id,
            xorb_hash,
            n_bytes,
            is_external,
        }
    }

    /// Checks the `(done, total)` pair reported by the tracker.
    #[track_caller]
    fn expect_status(tracker: &CompletionTracker, done: u64, total: u64) {
        let (actual_done, actual_total) = tracker.status();
        assert_eq!(actual_done, done);
        assert_eq!(actual_total, total);
    }

    /// Checks that both the tracker and the group consider everything finished.
    #[track_caller]
    fn expect_finished(tracker: &CompletionTracker, group: &Arc<GroupProgress>) {
        assert!(tracker.is_complete());

        tracker.assert_complete();
        group.assert_complete();
    }

    #[test]
    fn test_status_and_is_complete() {
        let (group, tracker) = setup();

        let file_a = add_file(&group, &tracker, "fileA", Some(100));
        let file_b = add_file(&group, &tracker, "fileB", Some(50));

        expect_status(&tracker, 0, 150);
        assert!(!tracker.is_complete());

        let x = MerkleHash::random_from_seed(1);
        tracker.register_dependencies(&[dep(file_a, x, 100, true)]);

        expect_status(&tracker, 100, 150);
        assert!(!tracker.is_complete());

        let y = MerkleHash::random_from_seed(2);
        tracker.register_dependencies(&[dep(file_b, y, 50, false)]);

        expect_status(&tracker, 100, 150);

        tracker.register_new_xorb(y, 50);
        tracker.register_xorb_upload_completion(y);

        expect_status(&tracker, 150, 150);
        expect_finished(&tracker, &group);
    }

    #[test]
    fn test_multiple_files_one_shared_xorb() {
        let (group, tracker) = setup();

        let file_a = add_file(&group, &tracker, "fileA", Some(200));
        let file_b = add_file(&group, &tracker, "fileB", Some(300));

        expect_status(&tracker, 0, 500);

        let xhash = MerkleHash::random_from_seed(1);

        tracker.register_new_xorb(xhash, 1000);

        tracker.register_dependencies(&[dep(file_a, xhash, 100, false), dep(file_b, xhash, 200, true)]);

        expect_status(&tracker, 200, 500);
        assert!(!tracker.is_complete());

        tracker.register_xorb_upload_completion(xhash);

        expect_status(&tracker, 300, 500);

        let x2 = MerkleHash::random_from_seed(2);

        tracker.register_new_xorb(x2, 1000);

        tracker.register_dependencies(&[dep(file_a, x2, 100, true)]);

        expect_status(&tracker, 400, 500);

        tracker.register_dependencies(&[dep(file_b, x2, 100, false)]);

        expect_status(&tracker, 400, 500);
        assert!(!tracker.is_complete());

        tracker.register_xorb_upload_completion(x2);
        expect_status(&tracker, 500, 500);
        expect_finished(&tracker, &group);
    }

    #[test]
    fn test_single_file_multiple_xorbs() {
        let (group, tracker) = setup();

        let f = add_file(&group, &tracker, "bigFile", Some(300));

        let x1 = MerkleHash::random_from_seed(1);
        let x2 = MerkleHash::random_from_seed(2);
        let x3 = MerkleHash::random_from_seed(3);

        tracker.register_new_xorb(x1, 100);
        tracker.register_new_xorb(x3, 100);

        tracker.register_dependencies(&[
            dep(f, x1, 100, false),
            dep(f, x2, 100, true),
            dep(f, x3, 100, false),
        ]);

        expect_status(&tracker, 100, 300);
        assert!(!tracker.is_complete());

        tracker.register_xorb_upload_completion(x1);
        expect_status(&tracker, 200, 300);
        assert!(!tracker.is_complete());

        tracker.register_xorb_upload_completion(x3);
        expect_status(&tracker, 300, 300);
        expect_finished(&tracker, &group);
    }

    #[test]
    fn test_xorb_completed_before_dependencies() {
        let (group, tracker) = setup();

        let file_id = add_file(&group, &tracker, "lateFile", Some(50));

        let x = MerkleHash::random_from_seed(999);
        tracker.register_new_xorb(x, 1000);

        tracker.register_xorb_upload_completion(x);

        tracker.register_dependencies(&[dep(file_id, x, 50, false)]);

        expect_status(&tracker, 50, 50);
        expect_finished(&tracker, &group);
    }

    #[test]
    fn test_contradictory_logic_with_completed_xorb() {
        let (group, tracker) = setup();

        let file_id = add_file(&group, &tracker, "someFile", Some(100));
        let x = MerkleHash::random_from_seed(123);

        tracker.register_new_xorb(x, 1000);

        tracker.register_xorb_upload_completion(x);

        tracker.register_dependencies(&[dep(file_id, x, 100, false)]);

        expect_status(&tracker, 100, 100);
        expect_finished(&tracker, &group);
    }

    #[test]
    fn test_increment_file_size_basic() {
        let (group, tracker) = setup();

        let file_id = add_file(&group, &tracker, "growingFile", None);

        expect_status(&tracker, 0, 0);

        // Each step grows the file and checks the running total.
        for (increment, expected_total) in [(100, 100), (150, 250), (50, 300)] {
            tracker.increment_file_size(file_id, increment);
            expect_status(&tracker, 0, expected_total);
        }

        let x = MerkleHash::random_from_seed(1);
        tracker.register_dependencies(&[dep(file_id, x, 300, true)]);

        expect_status(&tracker, 300, 300);
        expect_finished(&tracker, &group);
    }

    #[test]
    fn test_increment_file_size_with_xorb_uploads() {
        let (group, tracker) = setup();

        let file_id = add_file(&group, &tracker, "streamFile", None);

        let x1 = MerkleHash::random_from_seed(10);
        let x2 = MerkleHash::random_from_seed(20);

        tracker.register_new_xorb(x1, 500);
        tracker.register_new_xorb(x2, 500);

        tracker.increment_file_size(file_id, 200);
        tracker.register_dependencies(&[dep(file_id, x1, 200, false)]);

        expect_status(&tracker, 0, 200);

        tracker.register_xorb_upload_completion(x1);
        expect_status(&tracker, 200, 200);

        tracker.increment_file_size(file_id, 300);
        tracker.register_dependencies(&[dep(file_id, x2, 300, false)]);

        expect_status(&tracker, 200, 500);

        tracker.register_xorb_upload_completion(x2);
        expect_status(&tracker, 500, 500);
        expect_finished(&tracker, &group);
    }

    #[test]
    fn test_increment_file_size_mixed_known_unknown() {
        let (group, tracker) = setup();

        let file_a = add_file(&group, &tracker, "fileA", Some(100));
        let file_b = add_file(&group, &tracker, "fileB", None);

        expect_status(&tracker, 0, 100);

        let xa = MerkleHash::random_from_seed(1);
        tracker.register_dependencies(&[dep(file_a, xa, 100, true)]);

        expect_status(&tracker, 100, 100);

        tracker.increment_file_size(file_b, 200);
        expect_status(&tracker, 100, 300);

        let xb = MerkleHash::random_from_seed(2);
        tracker.register_new_xorb(xb, 200);
        tracker.register_dependencies(&[dep(file_b, xb, 200, false)]);

        tracker.register_xorb_upload_completion(xb);

        expect_status(&tracker, 300, 300);
        expect_finished(&tracker, &group);
    }

    #[test]
    fn test_increment_file_size_ignored_when_already_final() {
        let (group, tracker) = setup();

        let file_id = add_file(&group, &tracker, "fixedFile", Some(100));

        tracker.increment_file_size(file_id, 999);
        let total = tracker.status().1;
        assert_eq!(total, 100);

        let x = MerkleHash::random_from_seed(1);
        tracker.register_dependencies(&[dep(file_id, x, 100, true)]);

        expect_finished(&tracker, &group);
    }

    #[test]
    fn test_increment_file_size_with_partial_xorb_progress() {
        let (group, tracker) = setup();

        let file_id = add_file(&group, &tracker, "partialFile", None);

        let x = MerkleHash::random_from_seed(42);
        tracker.register_new_xorb(x, 1000);

        tracker.increment_file_size(file_id, 400);
        tracker.register_dependencies(&[dep(file_id, x, 400, false)]);

        tracker.register_xorb_upload_progress(x, 500);
        let (done, total) = tracker.status();
        assert_eq!(total, 400);
        assert!(done > 0);
        assert!(done < 400);

        tracker.increment_file_size(file_id, 200);
        let total = tracker.status().1;
        assert_eq!(total, 600);

        tracker.register_dependencies(&[dep(file_id, MerkleHash::random_from_seed(99), 200, true)]);

        tracker.register_xorb_upload_completion(x);

        expect_status(&tracker, 600, 600);
        expect_finished(&tracker, &group);
    }
}