Skip to main content

snapdir_stores/
transfer.rs

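//! Transfer configuration, rate limiting, and bounded-concurrency drivers.
//!
//! This module is the foundation for concurrent object transfers and bandwidth
//! limiting. It provides:
//!
//! - [`TransferConfig`] — how many objects to transfer in parallel, an
//!   optional aggregate byte-rate cap, and an [`AdaptivePolicy`] selecting
//!   fixed or adaptive tuning.
//! - [`RateLimiter`] — a zero-dependency async token bucket built on
//!   [`tokio::time`], shareable across tasks via [`Arc`].
//! - [`BlockingRateLimiter`] — the synchronous sibling of [`RateLimiter`] for
//!   callers running on plain OS threads.
//! - [`run_concurrent`] — a generic bounded-concurrency driver that runs up to
//!   `concurrency` async operations in flight and returns the first error.
//! - [`run_adaptive`] — the adaptive sibling of [`run_concurrent`], whose
//!   effective concurrency is gated by a live [`AdaptiveGate`] limit.
//! - [`classify_error`] — maps a [`StoreError`] to the adaptive controller's
//!   throttle-vs-hard-error signal.
//!
//! Nothing here changes the existing (sequential) push / fetch loops yet; the
//! stores merely carry a [`TransferConfig`] so later gates can wire these
//! primitives into their transfer loops.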
16
17use std::num::NonZeroUsize;
18use std::sync::Arc;
19use std::time::Duration;
20
21use futures::stream::{self, StreamExt, TryStreamExt};
22use snapdir_core::store::StoreError;
23use tokio::sync::Mutex;
24
25use crate::adaptive::{AdaptiveGate, OpResult};
26
/// Largest concurrency [`TransferConfig::default`] will pick, no matter how
/// much parallelism `std::thread::available_parallelism` reports.
const DEFAULT_CONCURRENCY_CAP: usize = 16;
29
/// Config-level switch for adaptive tuning of a transfer's concurrency and
/// byte-rate.
///
/// It is carried by [`TransferConfig`]. Do not confuse it with
/// [`crate::adaptive::AdaptivePolicy`], which is the controller's always-on
/// tuning view.
///
/// - [`Off`](Self::Off), the default, keeps the historical path: fixed
///   `concurrency` and fixed `max_bytes_per_sec`, byte-for-byte unchanged.
/// - [`On`](Self::On) sizes the in-flight window to `ceiling`. A live
///   controller then steers the effective concurrency within `[1, ceiling]`,
///   and the byte-rate, from in-band per-op feedback.
///
/// Adaptive mode changes **only** scheduling and rate. The bytes and objects
/// moved, and hence the resulting snapshot, are identical to the `Off` path.
#[derive(Clone, Copy, Debug, Default, PartialEq)]
pub enum AdaptivePolicy {
    /// Fixed concurrency and fixed rate (the default; historical behavior).
    #[default]
    Off,
    /// Adaptive concurrency and rate, capped by `ceiling`, aiming for
    /// `fraction × discovered-knee`.
    On {
        /// Target fraction of the discovered knee to operate at. It is
        /// expected in `(0, 1]` (clamped there by the adaptive path), and
        /// `0.8` is the usual default.
        fraction: f64,
        /// Absolute concurrency ceiling; the effective limit never exceeds it.
        ceiling: usize,
    },
}
56
57/// Configuration for object transfers: how many to run in parallel, an optional
58/// aggregate byte-rate cap, and whether to tune those adaptively.
59///
60/// `Default` auto-detects the available parallelism (capped at
61/// [`DEFAULT_CONCURRENCY_CAP`]), leaves bandwidth unlimited, and disables
62/// adaptive tuning ([`AdaptivePolicy::Off`]).
63#[derive(Debug, Clone)]
64pub struct TransferConfig {
65    /// Maximum number of object transfers to run concurrently. In the adaptive
66    /// (`On`) path this is the slow-start *seed*; the effective in-flight
67    /// window is sized to the policy ceiling and gated to the live limit.
68    pub concurrency: NonZeroUsize,
69    /// Optional aggregate bandwidth cap, in bytes per second. `None` means
70    /// unlimited. In the adaptive path this is the rate *cap* (`max_rate`); the
71    /// controller may target a lower live rate.
72    pub max_bytes_per_sec: Option<u64>,
73    /// Whether to tune concurrency / rate adaptively. [`AdaptivePolicy::Off`]
74    /// (the default) keeps the historical fixed-concurrency path byte-for-byte.
75    pub adaptive: AdaptivePolicy,
76}
77
78impl TransferConfig {
79    /// Builds a non-adaptive config, clamping `concurrency` to at least 1.
80    ///
81    /// The adaptive policy defaults to [`AdaptivePolicy::Off`], so existing
82    /// callers behave exactly as before. Use
83    /// [`with_adaptive`](Self::with_adaptive) to opt in.
84    #[must_use]
85    pub fn new(concurrency: usize, max_bytes_per_sec: Option<u64>) -> Self {
86        Self {
87            concurrency: NonZeroUsize::new(concurrency.max(1)).unwrap_or(NonZeroUsize::MIN),
88            max_bytes_per_sec,
89            adaptive: AdaptivePolicy::Off,
90        }
91    }
92
93    /// Returns this config with its adaptive policy set to `policy` (builder
94    /// style). `Off` is byte-identical to a plain [`new`](Self::new) config.
95    #[must_use]
96    pub fn with_adaptive(mut self, policy: AdaptivePolicy) -> Self {
97        self.adaptive = policy;
98        self
99    }
100}
101
102impl Default for TransferConfig {
103    fn default() -> Self {
104        let detected = std::thread::available_parallelism()
105            .map_or(1, NonZeroUsize::get)
106            .clamp(1, DEFAULT_CONCURRENCY_CAP);
107        Self {
108            // `detected` is >= 1, so the NonZeroUsize is always Some.
109            concurrency: NonZeroUsize::new(detected).unwrap_or(NonZeroUsize::MIN),
110            max_bytes_per_sec: None,
111            adaptive: AdaptivePolicy::Off,
112        }
113    }
114}
115
116/// Classifies a [`StoreError`] for the adaptive controller's congestion signal.
117///
118/// Returns [`OpResult::Throttle`] for clearly *transient / backpressure*
119/// failures the controller should back off on (HTTP 429 / `SlowDown` / 503 /
120/// `RESOURCE_EXHAUSTED`, request timeouts, connection reset/closed, and the
121/// local-FS backpressure errno class — `WouldBlock`, EMFILE "too many open
122/// files", and a full disk). Everything else — `NotFound`, `Integrity`,
123/// `Parse`, and ordinary I/O / backend errors — is [`OpResult::HardErr`].
124///
125/// This is **conservative**: anything not clearly transient defaults to
126/// `HardErr`, so a real failure never masquerades as throttling. It inspects
127/// `StoreError::Backend`'s message + wrapped source string (the SDKs surface
128/// their status that way) and `StoreError::Io`'s [`std::io::ErrorKind`].
129#[must_use]
130pub fn classify_error(err: &StoreError) -> OpResult {
131    match err {
132        StoreError::Io(io_err) => classify_io_kind(io_err),
133        StoreError::Backend { message, source } => {
134            let mut text = message.to_ascii_lowercase();
135            if let Some(src) = source {
136                text.push(' ');
137                text.push_str(&src.to_string().to_ascii_lowercase());
138            }
139            if text_is_transient(&text) {
140                OpResult::Throttle
141            } else {
142                OpResult::HardErr
143            }
144        }
145        // NotFound / Integrity / Parse are never transient backpressure, and
146        // `StoreError` is `#[non_exhaustive]` so any future variant is — by the
147        // conservative rule — a hard error until proven transient.
148        _ => OpResult::HardErr,
149    }
150}
151
152/// Classifies a local-filesystem [`std::io::Error`] as transient backpressure
153/// vs a hard error. Only the clear backpressure errno classes
154/// (`WouldBlock`, a full filesystem, and EMFILE "too many open files", which
155/// stable Rust still surfaces as `Uncategorized` with that message) are
156/// treated as [`OpResult::Throttle`].
157fn classify_io_kind(err: &std::io::Error) -> OpResult {
158    use std::io::ErrorKind;
159    match err.kind() {
160        ErrorKind::WouldBlock | ErrorKind::StorageFull => OpResult::Throttle,
161        // EMFILE / ENFILE land in the catch-all `Other`/`Uncategorized` kind on
162        // stable Rust; sniff the message for "too many open files".
163        _ => {
164            if err
165                .to_string()
166                .to_ascii_lowercase()
167                .contains("too many open files")
168            {
169                OpResult::Throttle
170            } else {
171                OpResult::HardErr
172            }
173        }
174    }
175}
176
/// Tests already-lowercased SDK error text for transient / backpressure
/// markers.
///
/// Callers pass case-folded text (`classify_error` lowercases it with
/// `to_ascii_lowercase`), so every needle is lowercase.
///
/// Word-like needles (`"slowdown"`, `"timed out"`, the `"throttl"` prefix, …)
/// match anywhere in the text. Bare HTTP status codes (`429`, `503`) only
/// match as standalone numeric tokens. Error text routinely embeds hex
/// checksums and byte counts, so a plain substring search would flag
/// `"object 3f5034ab not found"` as a 503. That would report a hard failure as
/// throttling, breaking the conservative classification contract.
fn text_is_transient(text: &str) -> bool {
    // Phrases that signal backpressure wherever they appear.
    const TRANSIENT_PHRASES: &[&str] = &[
        "slowdown",
        "slow down",
        "too many requests",
        "toomanyrequests",
        "service unavailable",
        "serviceunavailable",
        "resource_exhausted",
        "resource exhausted",
        "throttl",
        "request timeout",
        "requesttimeout",
        "timed out",
        "timeout",
        "connection reset",
        "connection closed",
        "connection refused",
        "broken pipe",
        "too many open files",
    ];
    // HTTP status codes that signal backpressure only when they stand alone.
    const TRANSIENT_STATUS_CODES: &[&str] = &["429", "503"];

    TRANSIENT_PHRASES.iter().any(|phrase| text.contains(phrase))
        || TRANSIENT_STATUS_CODES
            .iter()
            .any(|code| contains_standalone_token(text, code))
}

/// Returns `true` if `token` occurs in `text` with no ASCII letter or digit
/// immediately before or after it.
///
/// For example, `"http 503"` and `"(503)"` match `"503"`, but `"a5034b"` does
/// not.
fn contains_standalone_token(text: &str, token: &str) -> bool {
    let bytes = text.as_bytes();
    text.match_indices(token).any(|(start, _)| {
        let end = start + token.len();
        // Non-ASCII bytes are never ASCII-alphanumeric, so they count as
        // separators. That is fine for status-code detection.
        let clear_before = start == 0 || !bytes[start - 1].is_ascii_alphanumeric();
        let clear_after = end == bytes.len() || !bytes[end].is_ascii_alphanumeric();
        clear_before && clear_after
    })
}
202
203/// Runs `op` over `items` with the in-flight window sized to `gate.ceiling()`
204/// but the *effective* concurrency gated to the gate's live limit: each item
205/// acquires a [`GatePermit`](crate::adaptive::GatePermit) before `op` runs and
206/// holds it until `op` completes. This is the adaptive sibling of
207/// [`run_concurrent`]: a background tick driver resizes the gate live, so the
208/// number of simultaneously-running ops tracks the controller's limit while the
209/// buffer window stays at the ceiling.
210///
211/// Semantics match [`run_concurrent`] otherwise: completion-independent order,
212/// first-error-wins (remaining in-flight work is cancelled).
213///
214/// # Errors
215///
216/// Returns the first [`StoreError`] produced by any operation.
217pub async fn run_adaptive<I, T, F, Fut>(
218    items: I,
219    gate: &AdaptiveGate,
220    op: F,
221) -> Result<Vec<T>, StoreError>
222where
223    I: IntoIterator,
224    F: Fn(I::Item) -> Fut,
225    Fut: std::future::Future<Output = Result<T, StoreError>>,
226{
227    let window = gate.ceiling().max(1);
228    stream::iter(items)
229        .map(|item| {
230            let op = &op;
231            async move {
232                // Effective concurrency = the gate's current limit (<= ceiling),
233                // even though up to `window` futures are buffered.
234                let _permit = gate.acquire().await;
235                op(item).await
236            }
237        })
238        .buffer_unordered(window)
239        .try_collect()
240        .await
241}
242
243/// Shared token-bucket state, guarded by an async mutex.
244///
245/// The refill `rate`/`capacity` live **inside** the bucket (behind the same
246/// mutex as the running `tokens`) so [`RateLimiter::set_rate`] can retune the
247/// limiter live by relocking and updating them. A `rate` of `0.0` means
248/// "unlimited" — [`acquire`](RateLimiter::acquire) returns immediately.
249#[derive(Debug)]
250struct Bucket {
251    /// Refill rate in bytes per second. `0.0` means unlimited (no throttling).
252    rate: f64,
253    /// Maximum burst capacity, in bytes (~1 second's worth of budget).
254    capacity: f64,
255    /// Currently available tokens (bytes).
256    tokens: f64,
257    /// Last time the bucket was refilled.
258    last_refill: tokio::time::Instant,
259}
260
261/// Inner state of a [`RateLimiter`].
262#[derive(Debug)]
263struct Inner {
264    /// The live bucket state (rate/capacity/tokens). A `rate` of `0.0` models
265    /// the unlimited case.
266    bucket: Mutex<Bucket>,
267}
268
269/// An async token-bucket rate limiter that throttles aggregate transfer
270/// throughput.
271///
272/// Construct with [`RateLimiter::new`]. When `max_bytes_per_sec` is `None` (or
273/// `Some(0)`), the limiter is unlimited and [`acquire`](RateLimiter::acquire)
274/// returns immediately. Otherwise tokens refill at `max_bytes_per_sec` per
275/// second, allowing a burst of up to ~1 second's worth of budget.
276///
277/// The limiter is [`Arc`]-shareable and [`Clone`] (cloning shares the same
278/// underlying bucket).
279#[derive(Debug, Clone)]
280pub struct RateLimiter {
281    inner: Arc<Inner>,
282}
283
284impl RateLimiter {
285    /// Builds a limiter. `None` (or `Some(0)`) yields an unlimited, no-op
286    /// limiter whose [`acquire`](RateLimiter::acquire) never waits.
287    #[must_use]
288    pub fn new(max_bytes_per_sec: Option<u64>) -> Self {
289        #[allow(clippy::cast_precision_loss)]
290        let (rate, capacity, tokens) = match max_bytes_per_sec {
291            Some(r) if r > 0 => {
292                let r = r as f64;
293                (r, r, r)
294            }
295            _ => (0.0, 0.0, 0.0),
296        };
297        Self {
298            inner: Arc::new(Inner {
299                bucket: Mutex::new(Bucket {
300                    rate,
301                    capacity,
302                    tokens,
303                    last_refill: tokio::time::Instant::now(),
304                }),
305            }),
306        }
307    }
308
309    /// Retunes the limiter's aggregate byte-rate cap **live**, so an adaptive
310    /// controller can raise or lower throttling between operations.
311    ///
312    /// - `None` (or `Some(0)`) switches the limiter to **unlimited**: the rate
313    ///   and capacity drop to `0` and the bucket is emptied, so the next
314    ///   [`acquire`](RateLimiter::acquire) is a no-op.
315    /// - `Some(r > 0)` installs (or replaces) a bucket refilling at `r`
316    ///   bytes/sec with ~1 second of burst capacity. Switching from unlimited
317    ///   to limited primes the bucket full (`tokens = capacity`) so a freshly
318    ///   throttled limiter still allows one immediate burst.
319    ///
320    /// Calling `set_rate` is the only way the rate changes after [`new`];
321    /// limiters that never call it behave exactly as before.
322    pub async fn set_rate(&self, bytes_per_sec: Option<u64>) {
323        let mut state = self.inner.bucket.lock().await;
324        let was_unlimited = state.rate <= 0.0;
325        #[allow(clippy::cast_precision_loss)]
326        match bytes_per_sec {
327            Some(r) if r > 0 => {
328                let r = r as f64;
329                state.rate = r;
330                state.capacity = r;
331                // Switching unlimited -> limited: prime a full burst. When
332                // already limited, keep the running token count but clamp it to
333                // the new capacity so a rate drop takes effect promptly.
334                if was_unlimited {
335                    state.tokens = r;
336                } else {
337                    state.tokens = state.tokens.min(r);
338                }
339                state.last_refill = tokio::time::Instant::now();
340            }
341            _ => {
342                // Unlimited: empty the bucket; `acquire` short-circuits on rate==0.
343                state.rate = 0.0;
344                state.capacity = 0.0;
345                state.tokens = 0.0;
346            }
347        }
348    }
349
350    /// Blocks until `n` bytes of budget are available, refilling the bucket at
351    /// the configured rate. Unlimited limiters return immediately.
352    ///
353    /// A single request larger than the bucket capacity is still satisfied: the
354    /// bucket is allowed to go negative and the caller waits out the deficit,
355    /// so throttling is correct even for objects bigger than one second's
356    /// worth of budget.
357    pub async fn acquire(&self, n: u64) {
358        if n == 0 {
359            return;
360        }
361        #[allow(clippy::cast_precision_loss)]
362        let need = n as f64;
363
364        loop {
365            let wait = {
366                let mut state = self.inner.bucket.lock().await;
367                if state.rate <= 0.0 {
368                    return; // unlimited fast path (also covers live set_rate(None))
369                }
370                let now = tokio::time::Instant::now();
371                let elapsed = now.duration_since(state.last_refill).as_secs_f64();
372                state.tokens = (state.tokens + elapsed * state.rate).min(state.capacity);
373                state.last_refill = now;
374
375                if state.tokens >= need {
376                    state.tokens -= need;
377                    return;
378                }
379                // Not enough budget: compute how long until the deficit is
380                // covered, then sleep (releasing the lock first).
381                let deficit = need - state.tokens;
382                deficit / state.rate
383            };
384            tokio::time::sleep(Duration::from_secs_f64(wait)).await;
385        }
386    }
387}
388
/// Token-bucket state behind a [`BlockingRateLimiter`], protected by a
/// **synchronous** [`std::sync::Mutex`] rather than tokio's async mutex.
///
/// As with [`Bucket`], `rate` and `capacity` live beside `tokens` under the
/// same lock, so [`BlockingRateLimiter::set_rate`] can retune live.
/// `rate == 0.0` means unlimited.
#[derive(Debug)]
struct BlockingBucket {
    /// Refill rate in bytes per second. `0.0` means unlimited (no throttling).
    rate: f64,
    /// Maximum burst capacity, in bytes (~1 second's worth of budget).
    capacity: f64,
    /// Currently available tokens (bytes). Negative while an oversized
    /// request is being paid off.
    tokens: f64,
    /// Last time the bucket was refilled.
    last_refill: std::time::Instant,
}

impl BlockingBucket {
    /// Credits the budget accrued since `last_refill` at the current `rate`
    /// (never beyond `capacity`) and moves `last_refill` to `now`.
    fn refill(&mut self, now: std::time::Instant) {
        let elapsed = now.duration_since(self.last_refill).as_secs_f64();
        self.tokens = (self.tokens + elapsed * self.rate).min(self.capacity);
        self.last_refill = now;
    }
}

/// Inner state of a [`BlockingRateLimiter`].
#[derive(Debug)]
struct BlockingInner {
    /// The live bucket state (rate/capacity/tokens). A `rate` of `0.0` models
    /// the unlimited case.
    bucket: std::sync::Mutex<BlockingBucket>,
}

/// A **synchronous** token-bucket rate limiter for the store-to-store sync
/// path.
///
/// This is the blocking sibling of [`RateLimiter`]. The
/// [`StreamStore`](crate::stream::StreamStore) methods are synchronous and
/// drive their backends' async SDK calls on an internal runtime via `block_on`.
/// The store-to-store sync orchestrator therefore parallelizes them across a
/// **rayon** thread pool of plain OS threads. It cannot use the async
/// [`RateLimiter`], because awaiting inside a `block_on`-ing rayon worker would
/// nest tokio runtimes. Instead,
/// [`acquire_blocking`](BlockingRateLimiter::acquire_blocking) parks the
/// calling OS thread with [`std::thread::sleep`] rather than `.await`ing.
///
/// When `max_bytes_per_sec` is `None` (or `Some(0)`), the limiter is unlimited
/// and `acquire_blocking` returns immediately. Otherwise tokens refill at
/// `max_bytes_per_sec` per second, allowing a burst of up to ~1 second's worth
/// of budget, using the same token-bucket scheme as [`RateLimiter::acquire`].
///
/// The limiter is [`Arc`]-shareable and [`Clone`] (cloning shares the same
/// underlying bucket), so every rayon worker throttles against one aggregate
/// budget.
#[derive(Debug, Clone)]
pub struct BlockingRateLimiter {
    inner: Arc<BlockingInner>,
}

impl BlockingRateLimiter {
    /// Builds a synchronous limiter.
    ///
    /// `None` (or `Some(0)`) yields an unlimited, no-op limiter whose
    /// [`acquire_blocking`](BlockingRateLimiter::acquire_blocking) never
    /// waits. Otherwise the bucket starts full, allowing an immediate
    /// ~1 second burst.
    #[must_use]
    pub fn new(max_bytes_per_sec: Option<u64>) -> Self {
        // u64 -> f64 only rounds above 2^53 bytes/sec, far beyond any real cap.
        #[allow(clippy::cast_precision_loss)]
        let (rate, capacity, tokens) = match max_bytes_per_sec {
            Some(r) if r > 0 => {
                let r = r as f64;
                (r, r, r)
            }
            _ => (0.0, 0.0, 0.0),
        };
        Self {
            inner: Arc::new(BlockingInner {
                bucket: std::sync::Mutex::new(BlockingBucket {
                    rate,
                    capacity,
                    tokens,
                    last_refill: std::time::Instant::now(),
                }),
            }),
        }
    }

    /// Locks the bucket, recovering the guard if another thread panicked while
    /// holding it. The numeric token state stays usable after such a panic.
    fn lock_bucket(&self) -> std::sync::MutexGuard<'_, BlockingBucket> {
        self.inner
            .bucket
            .lock()
            .unwrap_or_else(std::sync::PoisonError::into_inner)
    }

    /// Retunes the limiter's aggregate byte-rate cap **live**. This is the
    /// synchronous sibling of [`RateLimiter::set_rate`].
    ///
    /// - `None` / `Some(0)` switches to unlimited and empties the bucket.
    /// - `Some(r > 0)` installs a bucket refilling at `r` bytes/sec:
    ///   - switching from unlimited primes a full burst;
    ///   - when already limited, budget accrued at the old rate is credited
    ///     first and then clamped to the new capacity, so frequent retunes
    ///     never discard earned budget.
    pub fn set_rate(&self, bytes_per_sec: Option<u64>) {
        let mut state = self.lock_bucket();
        let now = std::time::Instant::now();
        match bytes_per_sec {
            Some(r) if r > 0 => {
                // u64 -> f64 only rounds above 2^53 bytes/sec.
                #[allow(clippy::cast_precision_loss)]
                let r = r as f64;
                if state.rate <= 0.0 {
                    // Unlimited -> limited: start with a full burst.
                    state.tokens = r;
                } else {
                    // Limited -> limited: settle the budget earned at the old
                    // rate before `last_refill` is reset below, then clamp to
                    // the new capacity so a rate drop takes effect promptly.
                    state.refill(now);
                    state.tokens = state.tokens.min(r);
                }
                state.rate = r;
                state.capacity = r;
                state.last_refill = now;
            }
            _ => {
                // Unlimited: empty the bucket; `acquire_blocking`
                // short-circuits on rate == 0.
                state.rate = 0.0;
                state.capacity = 0.0;
                state.tokens = 0.0;
            }
        }
    }

    /// Blocks the calling OS thread until `n` bytes of budget are available
    /// and consumes them, refilling the bucket at the configured rate.
    ///
    /// Unlimited limiters (and `n == 0`) return immediately.
    ///
    /// A request larger than the bucket capacity can never fit in the bucket.
    /// Once the bucket is full, such a request is charged in one go:
    /// - the balance goes negative;
    /// - this thread sleeps until the deficit has refilled;
    /// - later callers see the negative balance and wait behind it.
    ///
    /// Throttling therefore stays correct, and terminates, even for objects
    /// bigger than one second's worth of budget. The thread is parked with
    /// [`std::thread::sleep`] instead of awaiting.
    pub fn acquire_blocking(&self, n: u64) {
        if n == 0 {
            return;
        }
        // u64 -> f64 only rounds above 2^53 bytes, far beyond any real request.
        #[allow(clippy::cast_precision_loss)]
        let need = n as f64;

        loop {
            // `(wait, settled)`: how long to park, and whether the request has
            // already been charged (return after sleeping, don't retry).
            let (wait, settled) = {
                let mut state = self.lock_bucket();
                if state.rate <= 0.0 {
                    return; // unlimited fast path (also covers live set_rate(None))
                }
                state.refill(std::time::Instant::now());

                if state.tokens >= need {
                    state.tokens -= need;
                    return;
                }
                if need > state.capacity && state.tokens >= state.capacity {
                    // Oversized request on a full bucket: it can never fit,
                    // so charge it now and let the balance go negative.
                    state.tokens -= need;
                    (-state.tokens / state.rate, true)
                } else {
                    // Wait for enough budget to accrue: the whole request, or
                    // a full bucket when the request exceeds capacity.
                    ((need.min(state.capacity) - state.tokens) / state.rate, false)
                }
            }; // lock released here, before sleeping
            std::thread::sleep(Duration::from_secs_f64(wait));
            if settled {
                return;
            }
        }
    }
}
546
547/// Runs `op` over `items` with at most `concurrency` operations in flight,
548/// collecting their results in completion-independent order and returning the
549/// first error encountered (remaining in-flight work is cancelled).
550///
551/// This is the engine later gates use to drive concurrent uploads/downloads.
552///
553/// # Errors
554///
555/// Returns the first [`StoreError`] produced by any operation.
556pub async fn run_concurrent<I, T, F, Fut>(
557    items: I,
558    concurrency: NonZeroUsize,
559    op: F,
560) -> Result<Vec<T>, StoreError>
561where
562    I: IntoIterator,
563    F: Fn(I::Item) -> Fut,
564    Fut: std::future::Future<Output = Result<T, StoreError>>,
565{
566    stream::iter(items)
567        .map(op)
568        .buffer_unordered(concurrency.get())
569        .try_collect()
570        .await
571}
572
573#[cfg(test)]
574mod tests {
575    use super::*;
576    use std::sync::atomic::{AtomicUsize, Ordering};
577
578    /// Builds a current-thread tokio runtime with time enabled, avoiding a
579    /// dependency on the `#[tokio::test]` macro (keeps tokio's feature set
580    /// minimal).
581    fn runtime() -> tokio::runtime::Runtime {
582        tokio::runtime::Builder::new_current_thread()
583            .enable_time()
584            .build()
585            .expect("build tokio runtime")
586    }
587
588    #[test]
589    fn transfer_config_default_caps_concurrency() {
590        let cfg = TransferConfig::default();
591        assert!(cfg.concurrency.get() >= 1, "concurrency must be >= 1");
592        assert!(
593            cfg.concurrency.get() <= DEFAULT_CONCURRENCY_CAP,
594            "default concurrency must be capped at {DEFAULT_CONCURRENCY_CAP}, got {}",
595            cfg.concurrency.get()
596        );
597        assert_eq!(cfg.max_bytes_per_sec, None);
598
599        // The clamping ctor never yields 0.
600        assert_eq!(TransferConfig::new(0, None).concurrency.get(), 1);
601        assert_eq!(TransferConfig::new(7, Some(99)).concurrency.get(), 7);
602        assert_eq!(TransferConfig::new(7, Some(99)).max_bytes_per_sec, Some(99));
603    }
604
605    /// Drives `run_concurrent` over N > concurrency items, recording the peak
606    /// number of simultaneously-running ops, and asserts the bound is exactly
607    /// `min(concurrency, N)` — and strictly 1 (sequential) when concurrency=1.
608    fn max_in_flight_for(concurrency: usize, items: usize) -> usize {
609        let in_flight = Arc::new(AtomicUsize::new(0));
610        let high_water = Arc::new(AtomicUsize::new(0));
611
612        let rt = runtime();
613        let result = rt.block_on(async {
614            let in_flight = Arc::clone(&in_flight);
615            let high_water = Arc::clone(&high_water);
616            run_concurrent(
617                0..items,
618                NonZeroUsize::new(concurrency).unwrap(),
619                move |_item| {
620                    let in_flight = Arc::clone(&in_flight);
621                    let high_water = Arc::clone(&high_water);
622                    async move {
623                        let cur = in_flight.fetch_add(1, Ordering::SeqCst) + 1;
624                        high_water.fetch_max(cur, Ordering::SeqCst);
625                        tokio::time::sleep(Duration::from_millis(20)).await;
626                        in_flight.fetch_sub(1, Ordering::SeqCst);
627                        Ok::<_, StoreError>(())
628                    }
629                },
630            )
631            .await
632        });
633        assert!(result.is_ok());
634        high_water.load(Ordering::SeqCst)
635    }
636
637    #[test]
638    fn transfer_config_run_concurrent_max_in_flight() {
639        // concurrency=4 over 12 items: peak in-flight is exactly 4.
640        assert_eq!(max_in_flight_for(4, 12), 4);
641        // concurrency=1 over 5 items: strictly sequential, peak in-flight is 1.
642        assert_eq!(max_in_flight_for(1, 5), 1);
643        // concurrency greater than item count is bounded by the item count.
644        assert_eq!(max_in_flight_for(8, 3), 3);
645    }
646
647    #[test]
648    fn transfer_config_run_concurrent_propagates_error() {
649        let rt = runtime();
650        let result: Result<Vec<()>, StoreError> = rt.block_on(async {
651            run_concurrent(0..10, NonZeroUsize::new(3).unwrap(), |item| async move {
652                if item == 5 {
653                    Err(StoreError::Backend {
654                        message: "boom".to_owned(),
655                        source: None,
656                    })
657                } else {
658                    tokio::time::sleep(Duration::from_millis(5)).await;
659                    Ok(())
660                }
661            })
662            .await
663        });
664        let err = result.expect_err("must surface the failing op's error");
665        assert!(
666            matches!(err, StoreError::Backend { ref message, .. } if message == "boom"),
667            "unexpected error: {err:?}"
668        );
669    }
670
671    #[test]
672    fn sync_snapshot_blocking_rate_limiter() {
673        use std::time::Instant;
674
675        // Unlimited: acquiring a large amount returns essentially instantly.
676        let unlimited = BlockingRateLimiter::new(None);
677        let start = Instant::now();
678        unlimited.acquire_blocking(1_000_000);
679        assert!(
680            start.elapsed() < Duration::from_millis(200),
681            "unlimited acquire_blocking should not block"
682        );
683        // Some(0) is also unlimited.
684        let zero = BlockingRateLimiter::new(Some(0));
685        let start = Instant::now();
686        zero.acquire_blocking(1_000_000);
687        assert!(
688            start.elapsed() < Duration::from_millis(200),
689            "Some(0) acquire_blocking should not block"
690        );
691
692        // Limited to 1000 bytes/sec. The bucket starts full (1000), so the
693        // first 1000 bytes are free; acquiring another ~1000 bytes (2x the
694        // per-second budget in total) must wait for the deficit to refill —
695        // at least ~1s.
696        let limiter = BlockingRateLimiter::new(Some(1000));
697        let start = Instant::now();
698        limiter.acquire_blocking(1000); // drains the initial burst
699        limiter.acquire_blocking(1000); // must wait ~1s to refill
700        let elapsed = start.elapsed();
701        assert!(
702            elapsed >= Duration::from_millis(900),
703            "throttled acquire_blocking should take ~1s, took {elapsed:?}"
704        );
705    }
706
707    #[test]
708    fn transfer_config_rate_limiter_set_rate_live() {
709        let rt = runtime();
710        rt.block_on(async {
711            // Start unlimited: a huge acquire returns instantly.
712            let limiter = RateLimiter::new(None);
713            let start = tokio::time::Instant::now();
714            limiter.acquire(1_000_000).await;
715            assert!(
716                start.elapsed() < Duration::from_millis(200),
717                "unlimited acquire should not block before set_rate"
718            );
719
720            // Tighten to 1000 B/s live. The bucket is primed full (1000), so the
721            // first 1000 bytes are free; the next 1000 must wait ~1s to refill.
722            limiter.set_rate(Some(1000)).await;
723            let start = tokio::time::Instant::now();
724            limiter.acquire(1000).await; // drains the freshly-primed burst
725            limiter.acquire(1000).await; // must wait ~1s
726            let elapsed = start.elapsed();
727            assert!(
728                elapsed >= Duration::from_millis(900),
729                "after set_rate(Some(1000)) a 2x-budget acquire should take ~1s, took {elapsed:?}"
730            );
731
732            // Raise the cap back to unlimited live: acquires stop waiting again.
733            limiter.set_rate(None).await;
734            let start = tokio::time::Instant::now();
735            limiter.acquire(1_000_000).await;
736            assert!(
737                start.elapsed() < Duration::from_millis(200),
738                "after set_rate(None) acquire should no longer block"
739            );
740        });
741    }
742
743    #[test]
744    fn sync_snapshot_blocking_rate_limiter_set_rate_live() {
745        use std::time::Instant;
746
747        // Start unlimited.
748        let limiter = BlockingRateLimiter::new(None);
749        let start = Instant::now();
750        limiter.acquire_blocking(1_000_000);
751        assert!(
752            start.elapsed() < Duration::from_millis(200),
753            "unlimited acquire_blocking should not block before set_rate"
754        );
755
756        // Tighten live to 1000 B/s.
757        limiter.set_rate(Some(1000));
758        let start = Instant::now();
759        limiter.acquire_blocking(1000); // primed burst
760        limiter.acquire_blocking(1000); // waits ~1s
761        let elapsed = start.elapsed();
762        assert!(
763            elapsed >= Duration::from_millis(900),
764            "after set_rate(Some(1000)) a 2x-budget acquire should take ~1s, took {elapsed:?}"
765        );
766
767        // Back to unlimited live.
768        limiter.set_rate(Some(0));
769        let start = Instant::now();
770        limiter.acquire_blocking(1_000_000);
771        assert!(
772            start.elapsed() < Duration::from_millis(200),
773            "after set_rate(Some(0)) acquire_blocking should no longer block"
774        );
775    }
776
777    #[test]
778    fn classify_error_throttle_vs_hard() {
779        use crate::adaptive::OpResult;
780
781        // Backend errors whose message/source look like backpressure -> Throttle.
782        let transient_msgs = [
783            "S3 PUT object failed: SlowDown",
784            "got HTTP 503 Service Unavailable",
785            "rate limited: 429 Too Many Requests",
786            "RESOURCE_EXHAUSTED quota",
787            "request timeout while uploading",
788            "connection reset by peer",
789            "os error: too many open files",
790        ];
791        for msg in transient_msgs {
792            let err = StoreError::Backend {
793                message: msg.to_owned(),
794                source: None,
795            };
796            assert_eq!(
797                classify_error(&err),
798                OpResult::Throttle,
799                "expected Throttle for {msg:?}"
800            );
801        }
802
803        // Hard errors: NotFound / Integrity / Parse / ordinary backend failures.
804        let not_found = StoreError::ObjectNotFound {
805            checksum: "abc".to_owned(),
806        };
807        assert_eq!(classify_error(&not_found), OpResult::HardErr);
808        let integrity = StoreError::Integrity {
809            address: "x".to_owned(),
810            expected: "a".to_owned(),
811            actual: "b".to_owned(),
812        };
813        assert_eq!(classify_error(&integrity), OpResult::HardErr);
814        let other = StoreError::Backend {
815            message: "permission denied".to_owned(),
816            source: None,
817        };
818        assert_eq!(classify_error(&other), OpResult::HardErr);
819
820        // Local-FS backpressure errnos -> Throttle; a plain NotFound IO -> Hard.
821        let emfile = StoreError::Io(std::io::Error::other("too many open files (os error 24)"));
822        assert_eq!(classify_error(&emfile), OpResult::Throttle);
823        let would_block = StoreError::Io(std::io::Error::from(std::io::ErrorKind::WouldBlock));
824        assert_eq!(classify_error(&would_block), OpResult::Throttle);
825        let io_notfound = StoreError::Io(std::io::Error::from(std::io::ErrorKind::NotFound));
826        assert_eq!(classify_error(&io_notfound), OpResult::HardErr);
827    }
828
829    #[test]
830    fn run_adaptive_respects_gate_limit() {
831        use crate::adaptive::AdaptiveGate;
832        use std::sync::atomic::{AtomicUsize, Ordering};
833
834        let rt = runtime();
835        let gate = AdaptiveGate::new(2, 8);
836        let in_flight = Arc::new(AtomicUsize::new(0));
837        let high = Arc::new(AtomicUsize::new(0));
838        let in_flight2 = Arc::clone(&in_flight);
839        let high2 = Arc::clone(&high);
840
841        let result: Result<Vec<()>, StoreError> = rt.block_on(async move {
842            run_adaptive(0..20, &gate, move |_item| {
843                let in_flight = Arc::clone(&in_flight2);
844                let high = Arc::clone(&high2);
845                async move {
846                    let cur = in_flight.fetch_add(1, Ordering::SeqCst) + 1;
847                    high.fetch_max(cur, Ordering::SeqCst);
848                    tokio::time::sleep(Duration::from_millis(15)).await;
849                    in_flight.fetch_sub(1, Ordering::SeqCst);
850                    Ok(())
851                }
852            })
853            .await
854        });
855        assert!(result.is_ok());
856        // Window is the ceiling (8) but the gate's live limit is 2, so peak
857        // effective concurrency never exceeds 2.
858        assert!(
859            high.load(Ordering::SeqCst) <= 2,
860            "effective concurrency must be gated to the limit, got {}",
861            high.load(Ordering::SeqCst)
862        );
863    }
864
865    #[test]
866    fn transfer_config_rate_limiter() {
867        let rt = runtime();
868        rt.block_on(async {
869            // Unlimited: acquiring a large amount returns essentially instantly.
870            let unlimited = RateLimiter::new(None);
871            let start = tokio::time::Instant::now();
872            unlimited.acquire(1_000_000).await;
873            assert!(
874                start.elapsed() < Duration::from_millis(200),
875                "unlimited acquire should not block"
876            );
877
878            // Limited to 1000 bytes/sec. The bucket starts full (1000), so the
879            // first 1000 bytes are free; acquiring another ~2000 bytes total
880            // must wait for the deficit to refill — at least ~1s.
881            let limiter = RateLimiter::new(Some(1000));
882            let start = tokio::time::Instant::now();
883            limiter.acquire(1000).await; // drains the initial burst
884            limiter.acquire(1000).await; // must wait ~1s to refill
885            let elapsed = start.elapsed();
886            assert!(
887                elapsed >= Duration::from_millis(900),
888                "throttled acquire should take ~1s, took {elapsed:?}"
889            );
890        });
891    }
892}