xet-data 1.5.2

Data processing pipeline for chunking, deduplication, and file reconstruction; used in the Hugging Face Xet client tools. Intended to be used through the API in the hf-xet package.
Documentation
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
use std::time::Duration;

use tokio::time::Instant;
use xet_core_structures::ExpWeightedMovingAvg;

/// Half-life (10 s) of the speed EWMA. Presumably the value callers pass to
/// `SpeedTracker::new` by default — confirm at the call sites.
pub(crate) const DEFAULT_SPEED_HALF_LIFE: Duration = Duration::from_millis(10_000);
/// Number of observations a freshly constructed `SpeedTracker` must record
/// before it reports any rate.
pub(crate) const DEFAULT_MIN_OBSERVATIONS_FOR_RATE: u32 = 4_u32;

/// Tracks smoothed byte-rate using an exponentially-weighted moving average.
///
/// On each [`update`](Self::update) call the tracker computes the byte deltas
/// since the last call and feeds `(delta_bytes, elapsed_secs)` into the EWMA
/// via [`update_with_weight`](ExpWeightedMovingAvg::update_with_weight).
/// The resulting `value() = Σ(decayed bytes) / Σ(decayed time)` is a smoothed
/// bytes-per-second rate where older observations decay with the configured
/// half-life.
///
/// Two independent channels are tracked: *bytes* (logical/decompressed) and
/// *transfer bytes* (network/compressed).
///
/// The first observation's elapsed time is clamped to at least the half-life
/// so the rate starts conservatively low and converges upward. Rates are not
/// reported until at least `min_observations_for_rate` observations have been
/// recorded (default [`DEFAULT_MIN_OBSERVATIONS_FOR_RATE`]).
///
/// Wall-clock time is read from [`tokio::time::Instant`], so a paused tokio
/// clock (`tokio::time::pause` / `advance`) drives the measured intervals.
pub(crate) struct SpeedTracker {
    /// Smoothed logical (decompressed) bytes per second.
    bytes_rate: ExpWeightedMovingAvg,
    /// Smoothed transfer (network/compressed) bytes per second.
    transfer_rate: ExpWeightedMovingAvg,
    /// Cumulative logical byte count recorded by the last counted update.
    last_bytes_completed: u64,
    /// Cumulative transfer byte count recorded by the last counted update.
    last_transfer_bytes_completed: u64,
    /// Time of the last counted update (construction time before the first).
    last_report_time: Instant,
    /// Number of updates that saw nonzero elapsed time (saturating).
    observation_count: u32,
    /// Lower bound, in seconds, applied to the first observation's interval.
    min_initial_interval_secs: f64,
    /// Observations required before [`rates`](Self::rates) returns `Some`.
    min_observations_for_rate: u32,
}

impl SpeedTracker {
    /// Creates a tracker whose two EWMA channels decay with `half_life`.
    ///
    /// `half_life` is also used as the minimum interval assumed for the first
    /// observation. Rates are withheld until
    /// [`DEFAULT_MIN_OBSERVATIONS_FOR_RATE`] observations have been recorded,
    /// unless overridden with [`with_min_observations`](Self::with_min_observations).
    #[must_use]
    pub fn new(half_life: Duration) -> Self {
        Self {
            bytes_rate: ExpWeightedMovingAvg::new_time_decay(half_life),
            transfer_rate: ExpWeightedMovingAvg::new_time_decay(half_life),
            last_bytes_completed: 0,
            last_transfer_bytes_completed: 0,
            last_report_time: Instant::now(),
            observation_count: 0,
            min_initial_interval_secs: half_life.as_secs_f64(),
            min_observations_for_rate: DEFAULT_MIN_OBSERVATIONS_FOR_RATE,
        }
    }

    /// Builder-style override of how many observations must be recorded
    /// before [`rates`](Self::rates) reports a value. Consumes and returns the
    /// tracker.
    ///
    /// NOTE(review): with `n == 0`, `rates` returns `Some` before any
    /// observation; what `ExpWeightedMovingAvg::value` yields for an empty
    /// average is not visible here — confirm before relying on it.
    #[must_use]
    pub fn with_min_observations(mut self, n: u32) -> Self {
        self.min_observations_for_rate = n;
        self
    }

    /// Feed current cumulative byte counts. Computes deltas from the
    /// previously seen values and the elapsed wall-clock time, then updates
    /// both EWMA channels.
    ///
    /// On the first observation the elapsed time is clamped to at least the
    /// half-life so the rate starts near zero and converges upward, avoiding
    /// wild initial spikes. If elapsed time is zero, this call is a no-op: the
    /// stored snapshot is left untouched, so the bytes reported here are
    /// counted by the next update that sees time pass.
    ///
    /// A cumulative count that goes backwards yields a delta of 0, and the new,
    /// lower value becomes the baseline for subsequent deltas.
    pub fn update(&mut self, bytes_completed: u64, transfer_bytes_completed: u64) {
        let now = Instant::now();
        let elapsed = now.saturating_duration_since(self.last_report_time);

        // No time has passed: there is no interval to attribute bytes to.
        if elapsed.is_zero() {
            return;
        }

        let elapsed_secs = if self.observation_count == 0 {
            elapsed.as_secs_f64().max(self.min_initial_interval_secs)
        } else {
            elapsed.as_secs_f64()
        };

        let bytes_delta = bytes_completed.saturating_sub(self.last_bytes_completed);
        let transfer_delta = transfer_bytes_completed.saturating_sub(self.last_transfer_bytes_completed);

        self.bytes_rate.update_with_weight(bytes_delta as f64, elapsed_secs);
        self.transfer_rate.update_with_weight(transfer_delta as f64, elapsed_secs);

        self.last_bytes_completed = bytes_completed;
        self.last_transfer_bytes_completed = transfer_bytes_completed;
        self.last_report_time = now;
        self.observation_count = self.observation_count.saturating_add(1);
    }

    /// Current smoothed rates in bytes/sec. Returns `(bytes_rate, transfer_rate)`.
    /// Both are `None` until at least `min_observations_for_rate` observations
    /// with nonzero elapsed time have been recorded.
    #[must_use]
    pub fn rates(&self) -> (Option<f64>, Option<f64>) {
        if self.observation_count < self.min_observations_for_rate {
            return (None, None);
        }
        (Some(self.bytes_rate.value()), Some(self.transfer_rate.value()))
    }
}

#[cfg(test)]
mod tests {
    use more_asserts::{assert_ge, assert_le, assert_lt};
    use tokio::time::{Duration, advance, pause};

    use super::*;

    const HALF_LIFE: Duration = Duration::from_secs(10);
    const TICK: Duration = Duration::from_millis(200);

    /// Smoothed logical-bytes rate, or `None` while still warming up.
    fn bytes_rate(tracker: &SpeedTracker) -> Option<f64> {
        tracker.rates().0
    }

    /// Smoothed transfer-bytes rate, or `None` while still warming up.
    fn transfer_rate(tracker: &SpeedTracker) -> Option<f64> {
        tracker.rates().1
    }

    // ── Basic behaviour ────────────────────────────────────────────

    #[tokio::test]
    async fn no_rate_before_any_observation() {
        pause();
        let tracker = SpeedTracker::new(HALF_LIFE);
        let (br, tr) = tracker.rates();
        assert!(br.is_none());
        assert!(tr.is_none());
    }

    #[tokio::test]
    async fn rates_none_until_min_observations() {
        pause();
        let mut tracker = SpeedTracker::new(HALF_LIFE);
        let bytes_per_tick = 2_000u64;
        let mut total = 0u64;

        for _ in 0..DEFAULT_MIN_OBSERVATIONS_FOR_RATE {
            assert!(bytes_rate(&tracker).is_none());
            advance(TICK).await;
            total += bytes_per_tick;
            tracker.update(total, 0);
        }

        assert!(bytes_rate(&tracker).is_some());
    }

    #[tokio::test]
    async fn configurable_min_observations() {
        pause();
        let min_obs = 8u32;
        let mut tracker = SpeedTracker::new(HALF_LIFE).with_min_observations(min_obs);
        let bytes_per_tick = 2_000u64;
        let mut total = 0u64;

        for _ in 0..min_obs - 1 {
            advance(TICK).await;
            total += bytes_per_tick;
            tracker.update(total, 0);
        }
        assert!(bytes_rate(&tracker).is_none());

        advance(TICK).await;
        total += bytes_per_tick;
        tracker.update(total, 0);
        assert!(bytes_rate(&tracker).is_some());
    }

    #[tokio::test]
    async fn constant_rate_converges() {
        pause();
        let mut tracker = SpeedTracker::new(HALF_LIFE);

        let rate = 10_000.0;
        let bytes_per_tick = (rate * TICK.as_secs_f64()) as u64;

        let mut total = 0u64;
        for _ in 0..500 {
            advance(TICK).await;
            total += bytes_per_tick;
            tracker.update(total, 0);
        }

        let measured = bytes_rate(&tracker).unwrap();
        assert!((measured - rate).abs() / rate < 0.01);
    }

    #[tokio::test]
    async fn two_channels_tracked_independently() {
        pause();
        let mut tracker = SpeedTracker::new(HALF_LIFE);

        let bytes_target = 20_000.0;
        let transfer_target = 5_000.0;
        let bytes_per_tick = (bytes_target * TICK.as_secs_f64()) as u64;
        let transfer_per_tick = (transfer_target * TICK.as_secs_f64()) as u64;

        let mut total_bytes = 0u64;
        let mut total_transfer = 0u64;

        for _ in 0..250 {
            advance(TICK).await;
            total_bytes += bytes_per_tick;
            total_transfer += transfer_per_tick;
            tracker.update(total_bytes, total_transfer);
        }

        let br = bytes_rate(&tracker).unwrap();
        let tr = transfer_rate(&tracker).unwrap();
        assert!((br - bytes_target).abs() / bytes_target < 0.05);
        assert!((tr - transfer_target).abs() / transfer_target < 0.05);
    }

    // ── Warm-up / initial rate ─────────────────────────────────────

    #[tokio::test]
    async fn initial_rate_ramps_up_smoothly() {
        pause();
        let mut tracker = SpeedTracker::new(HALF_LIFE).with_min_observations(1);

        let rate = 10_000.0;
        let bytes_per_tick = (rate * TICK.as_secs_f64()) as u64;
        let mut total = 0u64;
        let mut prev_rate = 0.0;

        for i in 0..250 {
            advance(TICK).await;
            total += bytes_per_tick;
            tracker.update(total, 0);

            let r = bytes_rate(&tracker).unwrap();

            if i == 0 {
                assert_lt!(r, rate * 0.20);
            }

            if i > 0 {
                assert_ge!(r, prev_rate * 0.99);
            }

            prev_rate = r;
        }

        assert!((prev_rate - rate).abs() / rate < 0.05);
    }

    #[tokio::test]
    async fn no_initial_spike() {
        pause();
        let mut tracker = SpeedTracker::new(HALF_LIFE).with_min_observations(1);

        advance(TICK).await;
        tracker.update(50_000, 0);

        let r = bytes_rate(&tracker).unwrap();
        let max_expected = 50_000.0 / HALF_LIFE.as_secs_f64();
        assert_le!(r, max_expected * 1.01);
    }

    // ── Smoothing / stability ──────────────────────────────────────

    #[tokio::test]
    async fn burst_then_silence_smooths_gradually() {
        pause();
        let mut tracker = SpeedTracker::new(HALF_LIFE).with_min_observations(1);

        advance(TICK).await;
        tracker.update(100_000, 0);
        let peak = bytes_rate(&tracker).unwrap();

        let mut prev = peak;
        for _ in 0..10 {
            advance(TICK).await;
            tracker.update(100_000, 0);
            let r = bytes_rate(&tracker).unwrap();
            assert_le!(r, prev);
            prev = r;
        }

        assert_lt!(prev, peak);
        assert!(prev > 0.0);
    }

    #[tokio::test]
    async fn rate_stays_stable_under_uniform_feed() {
        pause();
        let mut tracker = SpeedTracker::new(HALF_LIFE);

        let rate = 50_000.0;
        let bytes_per_tick = (rate * TICK.as_secs_f64()) as u64;
        let mut total = 0u64;

        for _ in 0..500 {
            advance(TICK).await;
            total += bytes_per_tick;
            tracker.update(total, 0);
        }

        for _ in 0..50 {
            advance(TICK).await;
            total += bytes_per_tick;
            tracker.update(total, 0);
            let r = bytes_rate(&tracker).unwrap();
            assert!((r - rate).abs() / rate < 0.01);
        }
    }

    #[tokio::test]
    async fn speed_change_adapts() {
        pause();
        let mut tracker = SpeedTracker::new(HALF_LIFE);

        let slow = 1_000.0;
        let fast = 10_000.0;
        let slow_per_tick = (slow * TICK.as_secs_f64()) as u64;
        let fast_per_tick = (fast * TICK.as_secs_f64()) as u64;
        let mut total = 0u64;

        for _ in 0..300 {
            advance(TICK).await;
            total += slow_per_tick;
            tracker.update(total, 0);
        }
        let before = bytes_rate(&tracker).unwrap();
        assert!((before - slow).abs() / slow < 0.05);

        for _ in 0..250 {
            advance(TICK).await;
            total += fast_per_tick;
            tracker.update(total, 0);
        }
        let after = bytes_rate(&tracker).unwrap();
        assert!((after - fast).abs() / fast < 0.05);
    }

    // ── Decay / half-life ──────────────────────────────────────────

    #[tokio::test]
    async fn stall_decays_rate_toward_zero() {
        pause();
        let mut tracker = SpeedTracker::new(HALF_LIFE);

        let rate = 20_000.0;
        let bytes_per_tick = (rate * TICK.as_secs_f64()) as u64;
        let mut total = 0u64;

        for _ in 0..200 {
            advance(TICK).await;
            total += bytes_per_tick;
            tracker.update(total, 0);
        }
        let active_rate = bytes_rate(&tracker).unwrap();
        assert!(active_rate > rate * 0.5);

        for _ in 0..150 {
            advance(TICK).await;
            tracker.update(total, 0);
        }
        let stalled = bytes_rate(&tracker).unwrap();
        assert_lt!(stalled, active_rate * 0.15);
    }

    #[tokio::test]
    async fn shorter_half_life_decays_faster() {
        pause();
        let mut fast_tracker = SpeedTracker::new(Duration::from_secs(2));
        let mut slow_tracker = SpeedTracker::new(Duration::from_secs(20));

        let bytes_per_tick = 2_000u64;
        let mut total = 0u64;

        for _ in 0..200 {
            advance(TICK).await;
            total += bytes_per_tick;
            fast_tracker.update(total, 0);
            slow_tracker.update(total, 0);
        }

        for _ in 0..25 {
            advance(TICK).await;
            fast_tracker.update(total, 0);
            slow_tracker.update(total, 0);
        }

        let fast_rate = bytes_rate(&fast_tracker).unwrap();
        let slow_rate = bytes_rate(&slow_tracker).unwrap();
        assert_lt!(fast_rate, slow_rate);
    }

    // ── Smoothness metric ──────────────────────────────────────────

    #[tokio::test]
    async fn jitter_in_arrivals_smoothed_out() {
        pause();
        let mut tracker = SpeedTracker::new(HALF_LIFE);

        let target_rate = 10_000.0;
        let avg_bytes_per_tick = (target_rate * TICK.as_secs_f64()) as u64;
        let mut total = 0u64;

        let mut rates = Vec::new();

        for i in 0..300 {
            advance(TICK).await;
            if i % 2 == 0 {
                total += avg_bytes_per_tick * 2;
            }
            tracker.update(total, 0);

            if i >= 200 {
                rates.push(bytes_rate(&tracker).unwrap());
            }
        }

        let mean: f64 = rates.iter().sum::<f64>() / rates.len() as f64;

        assert!((mean - target_rate).abs() / target_rate < 0.05);

        let variance: f64 = rates.iter().map(|r| (r - mean).powi(2)).sum::<f64>() / rates.len() as f64;
        let cv = variance.sqrt() / mean;
        assert_lt!(cv, 0.05);
    }

    // ── Edge cases ─────────────────────────────────────────────────

    #[tokio::test]
    async fn zero_elapsed_update_is_noop() {
        pause();
        // Require only one observation so that, if the zero-elapsed call were
        // (wrongly) counted, `rates()` would already return `Some`.
        let mut tracker = SpeedTracker::new(HALF_LIFE).with_min_observations(1);

        // The paused clock has not advanced since construction.
        tracker.update(1000, 500);
        assert!(bytes_rate(&tracker).is_none());
        assert!(transfer_rate(&tracker).is_none());

        // The skipped call must not have consumed the bytes: re-reporting the
        // same cumulative totals once time has passed still attributes them to
        // this first interval, which is clamped up to the half-life.
        advance(TICK).await;
        tracker.update(1000, 500);

        let br = bytes_rate(&tracker).unwrap();
        let tr = transfer_rate(&tracker).unwrap();
        let expected_br = 1000.0 / HALF_LIFE.as_secs_f64();
        let expected_tr = 500.0 / HALF_LIFE.as_secs_f64();
        assert_ge!(br, expected_br * 0.99);
        assert_le!(br, expected_br * 1.01);
        assert_ge!(tr, expected_tr * 0.99);
        assert_le!(tr, expected_tr * 1.01);
    }

    #[tokio::test]
    async fn resume_after_long_stall_picks_up_new_rate() {
        pause();
        let mut tracker = SpeedTracker::new(HALF_LIFE);

        let bytes_per_tick = 2_000u64;
        let mut total = 0u64;

        for _ in 0..300 {
            advance(TICK).await;
            total += bytes_per_tick;
            tracker.update(total, 0);
        }

        for _ in 0..500 {
            advance(TICK).await;
            tracker.update(total, 0);
        }

        let stalled = bytes_rate(&tracker).unwrap();
        assert_lt!(stalled, 100.0);

        let slow_per_tick = 1_000u64;
        for _ in 0..250 {
            advance(TICK).await;
            total += slow_per_tick;
            tracker.update(total, 0);
        }

        let r = bytes_rate(&tracker).unwrap();
        let expected = slow_per_tick as f64 / TICK.as_secs_f64();
        assert!((r - expected).abs() / expected < 0.05);
    }

    #[tokio::test]
    async fn large_tick_interval_works() {
        pause();
        let mut tracker = SpeedTracker::new(HALF_LIFE).with_min_observations(1);

        advance(Duration::from_secs(15)).await;
        tracker.update(150_000, 75_000);

        let br = bytes_rate(&tracker).unwrap();
        let tr = transfer_rate(&tracker).unwrap();
        assert_ge!(br, 9_900.0);
        assert_le!(br, 10_100.0);
        assert_ge!(tr, 4_900.0);
        assert_le!(tr, 5_100.0);
    }
}