// snapdir_stores/transfer.rs

//! Transfer configuration, rate limiting, and bounded-concurrency drivers.
//!
//! This module is the foundation for concurrent object transfers and bandwidth
//! limiting. It provides:
//!
//! - [`TransferConfig`] — how many objects to transfer in parallel, optional
//!   aggregate byte-rate and request-rate caps, the [`AdaptivePolicy`]
//!   switch, and the network retry schedule.
//! - [`RateLimiter`] — an async token bucket built on [`tokio::time`] (no
//!   dependencies beyond tokio), shareable across tasks via [`Arc`]; plus
//!   [`BlockingRateLimiter`], its synchronous sibling for OS-thread callers.
//! - [`run_concurrent`] — a generic bounded-concurrency driver that runs up to
//!   `concurrency` async operations in flight and returns the first error;
//!   plus [`run_adaptive`], its sibling gated by a live adaptive limit.
//! - [`classify_error`] — maps a [`StoreError`] to the adaptive controller's
//!   throttle / hard-error signal.
//!
//! Nothing here changes the existing (sequential) push / fetch loops yet; the
//! stores merely carry a [`TransferConfig`] so later gates can wire these
//! primitives into their transfer loops.
16
17use std::num::NonZeroUsize;
18use std::sync::Arc;
19use std::time::Duration;
20
21use futures::stream::{self, StreamExt, TryStreamExt};
22use snapdir_core::store::StoreError;
23use tokio::sync::Mutex;
24
25use crate::adaptive::{AdaptiveGate, OpResult};
26use crate::retry::RetryPolicy;
27
/// Ceiling on the concurrency that [`TransferConfig::default`] derives from
/// the machine's available parallelism. Hosts reporting more parallelism
/// than this still default to this many in-flight transfers.
const DEFAULT_CONCURRENCY_CAP: usize = 16;
30
/// Config-level switch that selects fixed or adaptive transfer scheduling.
///
/// This is carried by [`TransferConfig`]. It is distinct from
/// [`crate::adaptive::AdaptivePolicy`], which holds the controller's own
/// always-on tuning parameters.
///
/// - [`AdaptivePolicy::Off`] (the default) keeps the historical path: fixed
///   `concurrency` and fixed `max_bytes_per_sec`, unchanged.
/// - [`AdaptivePolicy::On`] sizes the in-flight window to `ceiling`. A live
///   controller then moves the effective concurrency within `[1, ceiling]`
///   and retunes the byte-rate from per-operation feedback.
///
/// In either mode only scheduling and pacing differ. The bytes and objects
/// transferred, and the resulting snapshot, are the same as with `Off`.
#[derive(Clone, Copy, Debug, Default, PartialEq)]
pub enum AdaptivePolicy {
    /// Fixed concurrency and fixed rate — the default, historical behavior.
    #[default]
    Off,
    /// Adaptive concurrency and rate, bounded above by `ceiling`, aiming to
    /// operate at `fraction` of the discovered throughput knee.
    On {
        /// Target fraction of the discovered knee. It is documented as living
        /// in `(0, 1]`, with `0.8` as the usual choice. This enum does not
        /// enforce the range itself; presumably the consumer clamps it
        /// (TODO confirm).
        fraction: f64,
        /// Absolute concurrency ceiling; the effective limit never exceeds it.
        ceiling: usize,
    },
}
57
58/// Configuration for object transfers: how many to run in parallel, an optional
59/// aggregate byte-rate cap, and whether to tune those adaptively.
60///
61/// `Default` auto-detects the available parallelism (capped at
62/// [`DEFAULT_CONCURRENCY_CAP`]), leaves bandwidth unlimited, and disables
63/// adaptive tuning ([`AdaptivePolicy::Off`]).
64#[derive(Debug, Clone)]
65pub struct TransferConfig {
66    /// Maximum number of object transfers to run concurrently. In the adaptive
67    /// (`On`) path this is the slow-start *seed*; the effective in-flight
68    /// window is sized to the policy ceiling and gated to the live limit.
69    pub concurrency: NonZeroUsize,
70    /// Optional aggregate bandwidth cap, in bytes per second. `None` means
71    /// unlimited. In the adaptive path this is the rate *cap* (`max_rate`); the
72    /// controller may target a lower live rate.
73    pub max_bytes_per_sec: Option<u64>,
74    /// Optional aggregate request-rate cap, in requests per second. `None`
75    /// means unlimited. A later gate wires this into the live store call sites
76    /// (`key_exists` / `get_bytes` / `put_bytes`) by pacing each request through
77    /// a [`RateLimiter`] / [`BlockingRateLimiter`] whose "tokens" are requests
78    /// (`acquire(1)` per request). The per-backend defaults live in
79    /// [`crate::limits::for_scheme`].
80    pub max_requests_per_sec: Option<u64>,
81    /// Whether to tune concurrency / rate adaptively. [`AdaptivePolicy::Off`]
82    /// (the default) keeps the historical fixed-concurrency path byte-for-byte.
83    pub adaptive: AdaptivePolicy,
84    /// The transient-failure retry schedule wrapped around each network SDK
85    /// call (`key_exists` / `get_bytes` / `put_bytes`) via
86    /// [`retry_network`](crate::retry::retry_network). Defaults to
87    /// [`RetryPolicy::default`] (5 attempts, 250ms base, 30s cap); this is the
88    /// CONFIG SEAM the CLI sets later. The per-object integrity-mismatch retry
89    /// in `fetch_verified` is independent of this and unchanged.
90    pub retry: RetryPolicy,
91}
92
93impl TransferConfig {
94    /// Builds a non-adaptive config, clamping `concurrency` to at least 1.
95    ///
96    /// The adaptive policy defaults to [`AdaptivePolicy::Off`], so existing
97    /// callers behave exactly as before. Use
98    /// [`with_adaptive`](Self::with_adaptive) to opt in.
99    #[must_use]
100    pub fn new(concurrency: usize, max_bytes_per_sec: Option<u64>) -> Self {
101        Self {
102            concurrency: NonZeroUsize::new(concurrency.max(1)).unwrap_or(NonZeroUsize::MIN),
103            max_bytes_per_sec,
104            max_requests_per_sec: None,
105            adaptive: AdaptivePolicy::Off,
106            retry: RetryPolicy::default(),
107        }
108    }
109
110    /// Returns this config with its adaptive policy set to `policy` (builder
111    /// style). `Off` is byte-identical to a plain [`new`](Self::new) config.
112    #[must_use]
113    pub fn with_adaptive(mut self, policy: AdaptivePolicy) -> Self {
114        self.adaptive = policy;
115        self
116    }
117
118    /// Returns this config with its network-retry schedule set to `policy`
119    /// (builder style). Defaults to [`RetryPolicy::default`]; the CLI uses this
120    /// to install an operator-configured policy.
121    #[must_use]
122    pub fn with_retry(mut self, policy: RetryPolicy) -> Self {
123        self.retry = policy;
124        self
125    }
126}
127
128impl Default for TransferConfig {
129    fn default() -> Self {
130        let detected = std::thread::available_parallelism()
131            .map_or(1, NonZeroUsize::get)
132            .clamp(1, DEFAULT_CONCURRENCY_CAP);
133        Self {
134            // `detected` is >= 1, so the NonZeroUsize is always Some.
135            concurrency: NonZeroUsize::new(detected).unwrap_or(NonZeroUsize::MIN),
136            max_bytes_per_sec: None,
137            max_requests_per_sec: None,
138            adaptive: AdaptivePolicy::Off,
139            retry: RetryPolicy::default(),
140        }
141    }
142}
143
144/// Classifies a [`StoreError`] for the adaptive controller's congestion signal.
145///
146/// Returns [`OpResult::Throttle`] for clearly *transient / backpressure*
147/// failures the controller should back off on (HTTP 429 / `SlowDown` / 503 /
148/// `RESOURCE_EXHAUSTED`, request timeouts, connection reset/closed, and the
149/// local-FS backpressure errno class — `WouldBlock`, EMFILE "too many open
150/// files", and a full disk). Everything else — `NotFound`, `Integrity`,
151/// `Parse`, and ordinary I/O / backend errors — is [`OpResult::HardErr`].
152///
153/// This is **conservative**: anything not clearly transient defaults to
154/// `HardErr`, so a real failure never masquerades as throttling. It inspects
155/// `StoreError::Backend`'s message + wrapped source string (the SDKs surface
156/// their status that way) and `StoreError::Io`'s [`std::io::ErrorKind`].
157#[must_use]
158pub fn classify_error(err: &StoreError) -> OpResult {
159    match err {
160        StoreError::Io(io_err) => classify_io_kind(io_err),
161        StoreError::Backend { message, source } => {
162            let mut text = message.to_ascii_lowercase();
163            if let Some(src) = source {
164                text.push(' ');
165                text.push_str(&src.to_string().to_ascii_lowercase());
166            }
167            if text_is_transient(&text) {
168                OpResult::Throttle
169            } else {
170                OpResult::HardErr
171            }
172        }
173        // NotFound / Integrity / Parse are never transient backpressure, and
174        // `StoreError` is `#[non_exhaustive]` so any future variant is — by the
175        // conservative rule — a hard error until proven transient.
176        _ => OpResult::HardErr,
177    }
178}
179
180/// Classifies a local-filesystem [`std::io::Error`] as transient backpressure
181/// vs a hard error. Only the clear backpressure errno classes
182/// (`WouldBlock`, a full filesystem, and EMFILE "too many open files", which
183/// stable Rust still surfaces as `Uncategorized` with that message) are
184/// treated as [`OpResult::Throttle`].
185fn classify_io_kind(err: &std::io::Error) -> OpResult {
186    use std::io::ErrorKind;
187    match err.kind() {
188        ErrorKind::WouldBlock | ErrorKind::StorageFull => OpResult::Throttle,
189        // EMFILE / ENFILE land in the catch-all `Other`/`Uncategorized` kind on
190        // stable Rust; sniff the message for "too many open files".
191        _ => {
192            if err
193                .to_string()
194                .to_ascii_lowercase()
195                .contains("too many open files")
196            {
197                OpResult::Throttle
198            } else {
199                OpResult::HardErr
200            }
201        }
202    }
203}
204
/// Reports whether already-lowercased SDK error `text` describes a transient
/// or backpressure failure.
///
/// Phrases such as `"slowdown"`, `"too many requests"`, `"timeout"`, or
/// `"connection reset"` match anywhere in `text`.
///
/// The bare HTTP status codes `429` and `503` only match as standalone
/// numbers: the characters on either side must not be ASCII letters or
/// digits. Error messages often quote object hashes or keys, and those would
/// otherwise contain these digit runs by chance (e.g. `a4293bc0`). Without
/// this guard such a message would be misread as throttling, breaking the
/// conservative "real failures are `HardErr`" rule of [`classify_error`].
fn text_is_transient(text: &str) -> bool {
    // Phrases that mark a transient failure wherever they appear.
    const TRANSIENT_PHRASES: &[&str] = &[
        "slowdown",
        "slow down",
        "too many requests",
        "service unavailable",
        "serviceunavailable",
        "resource_exhausted",
        "resource exhausted",
        "throttl",
        "request timeout",
        "requesttimeout",
        "timed out",
        "timeout",
        "connection reset",
        "connection closed",
        "connection refused",
        "broken pipe",
        "too many open files",
    ];
    // HTTP status codes that mark backpressure only as standalone tokens.
    const TRANSIENT_STATUS_CODES: &[&str] = &["429", "503"];

    TRANSIENT_PHRASES.iter().any(|phrase| text.contains(phrase))
        || TRANSIENT_STATUS_CODES
            .iter()
            .any(|code| contains_status_code(text, code))
}

/// Returns `true` if `code` occurs in `text` with no ASCII letter or digit
/// immediately before or after it.
fn contains_status_code(text: &str, code: &str) -> bool {
    let bytes = text.as_bytes();
    // Non-ASCII bytes (parts of multi-byte chars) count as boundaries.
    let is_word_byte = |idx: usize| bytes.get(idx).is_some_and(u8::is_ascii_alphanumeric);
    text.match_indices(code).any(|(start, _)| {
        let boundary_before = start == 0 || !is_word_byte(start - 1);
        let boundary_after = !is_word_byte(start + code.len());
        boundary_before && boundary_after
    })
}
230
231/// Runs `op` over `items` with the in-flight window sized to `gate.ceiling()`
232/// but the *effective* concurrency gated to the gate's live limit: each item
233/// acquires a [`GatePermit`](crate::adaptive::GatePermit) before `op` runs and
234/// holds it until `op` completes. This is the adaptive sibling of
235/// [`run_concurrent`]: a background tick driver resizes the gate live, so the
236/// number of simultaneously-running ops tracks the controller's limit while the
237/// buffer window stays at the ceiling.
238///
239/// Semantics match [`run_concurrent`] otherwise: completion-independent order,
240/// first-error-wins (remaining in-flight work is cancelled).
241///
242/// # Errors
243///
244/// Returns the first [`StoreError`] produced by any operation.
245pub async fn run_adaptive<I, T, F, Fut>(
246    items: I,
247    gate: &AdaptiveGate,
248    op: F,
249) -> Result<Vec<T>, StoreError>
250where
251    I: IntoIterator,
252    F: Fn(I::Item) -> Fut,
253    Fut: std::future::Future<Output = Result<T, StoreError>>,
254{
255    let window = gate.ceiling().max(1);
256    stream::iter(items)
257        .map(|item| {
258            let op = &op;
259            async move {
260                // Effective concurrency = the gate's current limit (<= ceiling),
261                // even though up to `window` futures are buffered.
262                let _permit = gate.acquire().await;
263                op(item).await
264            }
265        })
266        .buffer_unordered(window)
267        .try_collect()
268        .await
269}
270
271/// Shared token-bucket state, guarded by an async mutex.
272///
273/// The refill `rate`/`capacity` live **inside** the bucket (behind the same
274/// mutex as the running `tokens`) so [`RateLimiter::set_rate`] can retune the
275/// limiter live by relocking and updating them. A `rate` of `0.0` means
276/// "unlimited" — [`acquire`](RateLimiter::acquire) returns immediately.
277#[derive(Debug)]
278struct Bucket {
279    /// Refill rate in bytes per second. `0.0` means unlimited (no throttling).
280    rate: f64,
281    /// Maximum burst capacity, in bytes (~1 second's worth of budget).
282    capacity: f64,
283    /// Currently available tokens (bytes).
284    tokens: f64,
285    /// Last time the bucket was refilled.
286    last_refill: tokio::time::Instant,
287}
288
289/// Inner state of a [`RateLimiter`].
290#[derive(Debug)]
291struct Inner {
292    /// The live bucket state (rate/capacity/tokens). A `rate` of `0.0` models
293    /// the unlimited case.
294    bucket: Mutex<Bucket>,
295}
296
297/// An async token-bucket rate limiter that throttles aggregate transfer
298/// throughput.
299///
300/// Construct with [`RateLimiter::new`]. When `max_bytes_per_sec` is `None` (or
301/// `Some(0)`), the limiter is unlimited and [`acquire`](RateLimiter::acquire)
302/// returns immediately. Otherwise tokens refill at `max_bytes_per_sec` per
303/// second, allowing a burst of up to ~1 second's worth of budget.
304///
305/// The limiter is [`Arc`]-shareable and [`Clone`] (cloning shares the same
306/// underlying bucket).
307#[derive(Debug, Clone)]
308pub struct RateLimiter {
309    inner: Arc<Inner>,
310}
311
312impl RateLimiter {
313    /// Builds a limiter. `None` (or `Some(0)`) yields an unlimited, no-op
314    /// limiter whose [`acquire`](RateLimiter::acquire) never waits.
315    #[must_use]
316    pub fn new(max_bytes_per_sec: Option<u64>) -> Self {
317        #[allow(clippy::cast_precision_loss)]
318        let (rate, capacity, tokens) = match max_bytes_per_sec {
319            Some(r) if r > 0 => {
320                let r = r as f64;
321                (r, r, r)
322            }
323            _ => (0.0, 0.0, 0.0),
324        };
325        Self {
326            inner: Arc::new(Inner {
327                bucket: Mutex::new(Bucket {
328                    rate,
329                    capacity,
330                    tokens,
331                    last_refill: tokio::time::Instant::now(),
332                }),
333            }),
334        }
335    }
336
337    /// Retunes the limiter's aggregate byte-rate cap **live**, so an adaptive
338    /// controller can raise or lower throttling between operations.
339    ///
340    /// - `None` (or `Some(0)`) switches the limiter to **unlimited**: the rate
341    ///   and capacity drop to `0` and the bucket is emptied, so the next
342    ///   [`acquire`](RateLimiter::acquire) is a no-op.
343    /// - `Some(r > 0)` installs (or replaces) a bucket refilling at `r`
344    ///   bytes/sec with ~1 second of burst capacity. Switching from unlimited
345    ///   to limited primes the bucket full (`tokens = capacity`) so a freshly
346    ///   throttled limiter still allows one immediate burst.
347    ///
348    /// Calling `set_rate` is the only way the rate changes after [`new`];
349    /// limiters that never call it behave exactly as before.
350    pub async fn set_rate(&self, bytes_per_sec: Option<u64>) {
351        let mut state = self.inner.bucket.lock().await;
352        let was_unlimited = state.rate <= 0.0;
353        #[allow(clippy::cast_precision_loss)]
354        match bytes_per_sec {
355            Some(r) if r > 0 => {
356                let r = r as f64;
357                state.rate = r;
358                state.capacity = r;
359                // Switching unlimited -> limited: prime a full burst. When
360                // already limited, keep the running token count but clamp it to
361                // the new capacity so a rate drop takes effect promptly.
362                if was_unlimited {
363                    state.tokens = r;
364                } else {
365                    state.tokens = state.tokens.min(r);
366                }
367                state.last_refill = tokio::time::Instant::now();
368            }
369            _ => {
370                // Unlimited: empty the bucket; `acquire` short-circuits on rate==0.
371                state.rate = 0.0;
372                state.capacity = 0.0;
373                state.tokens = 0.0;
374            }
375        }
376    }
377
378    /// Blocks until `n` bytes of budget are available, refilling the bucket at
379    /// the configured rate. Unlimited limiters return immediately.
380    ///
381    /// A single request larger than the bucket capacity is still satisfied: the
382    /// bucket is allowed to go negative and the caller waits out the deficit,
383    /// so throttling is correct even for objects bigger than one second's
384    /// worth of budget.
385    pub async fn acquire(&self, n: u64) {
386        if n == 0 {
387            return;
388        }
389        #[allow(clippy::cast_precision_loss)]
390        let need = n as f64;
391
392        loop {
393            let wait = {
394                let mut state = self.inner.bucket.lock().await;
395                if state.rate <= 0.0 {
396                    return; // unlimited fast path (also covers live set_rate(None))
397                }
398                let now = tokio::time::Instant::now();
399                let elapsed = now.duration_since(state.last_refill).as_secs_f64();
400                state.tokens = (state.tokens + elapsed * state.rate).min(state.capacity);
401                state.last_refill = now;
402
403                if state.tokens >= need {
404                    state.tokens -= need;
405                    return;
406                }
407                // Not enough budget: compute how long until the deficit is
408                // covered, then sleep (releasing the lock first).
409                let deficit = need - state.tokens;
410                deficit / state.rate
411            };
412            tokio::time::sleep(Duration::from_secs_f64(wait)).await;
413        }
414    }
415}
416
/// Token-bucket state behind [`BlockingRateLimiter`], protected by a
/// **synchronous** [`std::sync::Mutex`] rather than tokio's async mutex.
///
/// `rate` and `capacity` are stored next to `tokens`, so
/// [`BlockingRateLimiter::set_rate`] can retune them under the same lock.
/// A `rate` of `0.0` means unlimited.
#[derive(Debug)]
struct BlockingBucket {
    /// Refill rate in bytes per second; `0.0` disables throttling.
    rate: f64,
    /// Largest balance the bucket may hold, in bytes (about one second's budget).
    capacity: f64,
    /// Current balance in bytes.
    ///
    /// Goes negative after an oversized request is admitted; that debt must
    /// be refilled before the next admission.
    tokens: f64,
    /// When `tokens` was last credited.
    last_refill: std::time::Instant,
}

impl BlockingBucket {
    /// Credits the tokens earned since `last_refill` at the current rate,
    /// never exceeding `capacity`, and stamps `last_refill = now`.
    fn refill(&mut self, now: std::time::Instant) {
        let elapsed = now.duration_since(self.last_refill).as_secs_f64();
        self.tokens = (self.tokens + elapsed * self.rate).min(self.capacity);
        self.last_refill = now;
    }
}

/// Shared state of a [`BlockingRateLimiter`].
#[derive(Debug)]
struct BlockingInner {
    /// The live bucket (rate/capacity/tokens); a `rate` of `0.0` means unlimited.
    bucket: std::sync::Mutex<BlockingBucket>,
}

/// A **synchronous** token-bucket rate limiter for the store-to-store sync
/// path.
///
/// This is the blocking sibling of [`RateLimiter`]. The
/// [`StreamStore`](crate::stream::StreamStore) methods are synchronous and run
/// their backends' async SDK calls on an internal runtime via `block_on`. The
/// store-to-store sync orchestrator therefore parallelizes them across a
/// **rayon** pool of plain OS threads.
///
/// Awaiting the async [`RateLimiter`] inside a `block_on`-ing rayon worker
/// would nest tokio runtimes. Instead,
/// [`acquire_blocking`](BlockingRateLimiter::acquire_blocking) parks the
/// calling thread with [`std::thread::sleep`].
///
/// `None` (or `Some(0)`) makes the limiter unlimited, and
/// [`acquire_blocking`](BlockingRateLimiter::acquire_blocking) then returns
/// immediately. Otherwise the bucket refills at `max_bytes_per_sec` bytes per
/// second and holds at most about one second's budget.
///
/// Clones share one bucket through an [`Arc`], so every rayon worker throttles
/// against a single aggregate budget.
#[derive(Debug, Clone)]
pub struct BlockingRateLimiter {
    inner: Arc<BlockingInner>,
}

impl BlockingRateLimiter {
    /// Builds a synchronous limiter. `None` (or `Some(0)`) yields an unlimited,
    /// no-op limiter whose
    /// [`acquire_blocking`](BlockingRateLimiter::acquire_blocking) never waits.
    /// A limited bucket starts full.
    #[must_use]
    pub fn new(max_bytes_per_sec: Option<u64>) -> Self {
        // f64 loses precision only above 2^53, which is acceptable for rate math.
        #[allow(clippy::cast_precision_loss)]
        let (rate, capacity, tokens) = match max_bytes_per_sec {
            Some(r) if r > 0 => {
                let r = r as f64;
                (r, r, r)
            }
            _ => (0.0, 0.0, 0.0),
        };
        Self {
            inner: Arc::new(BlockingInner {
                bucket: std::sync::Mutex::new(BlockingBucket {
                    rate,
                    capacity,
                    tokens,
                    last_refill: std::time::Instant::now(),
                }),
            }),
        }
    }

    /// Locks the bucket.
    ///
    /// A poisoned lock only means another thread panicked while holding it;
    /// the token state is still usable, so the guard is recovered.
    fn lock_bucket(&self) -> std::sync::MutexGuard<'_, BlockingBucket> {
        self.inner
            .bucket
            .lock()
            .unwrap_or_else(std::sync::PoisonError::into_inner)
    }

    /// Retunes the limiter's aggregate byte-rate cap **live** (the synchronous
    /// sibling of [`RateLimiter::set_rate`]).
    ///
    /// - `None` (or `Some(0)`) switches to unlimited and clears the balance.
    /// - `Some(r > 0)` installs a bucket refilling at `r` bytes/sec.
    ///   - Coming from unlimited, the bucket is primed full.
    ///   - Coming from another limited rate, budget earned at the old rate is
    ///     credited first, then the balance is clamped to the new capacity.
    pub fn set_rate(&self, bytes_per_sec: Option<u64>) {
        let mut state = self.lock_bucket();
        let now = std::time::Instant::now();
        #[allow(clippy::cast_precision_loss)]
        let new_rate = bytes_per_sec.filter(|&r| r > 0).map(|r| r as f64);
        match new_rate {
            Some(r) => {
                if state.rate <= 0.0 {
                    // Unlimited -> limited: prime a full burst.
                    state.tokens = r;
                } else {
                    // Limited -> limited: bank what the old rate earned so a
                    // retune never silently discards accrued budget.
                    state.refill(now);
                    state.tokens = state.tokens.min(r);
                }
                state.rate = r;
                state.capacity = r;
                state.last_refill = now;
            }
            None => {
                state.rate = 0.0;
                state.capacity = 0.0;
                state.tokens = 0.0;
            }
        }
    }

    /// Blocks the calling OS thread until `n` bytes of budget can be taken
    /// from the bucket, refilling it at the configured rate. Unlimited
    /// limiters return immediately.
    ///
    /// A request of up to `capacity` bytes is admitted once the bucket holds
    /// `n` tokens.
    ///
    /// A larger request (more than about one second of budget) is admitted as
    /// soon as the bucket is full. The balance then goes negative, and later
    /// requests wait for that debt to be refilled. The long-run throughput
    /// therefore still honors the cap.
    pub fn acquire_blocking(&self, n: u64) {
        if n == 0 {
            return;
        }
        #[allow(clippy::cast_precision_loss)]
        let need = n as f64;

        loop {
            let wait_secs = {
                let mut state = self.lock_bucket();
                if state.rate <= 0.0 {
                    return; // unlimited fast path (also covers live set_rate(None))
                }
                state.refill(std::time::Instant::now());

                // Refill is capped at `capacity`. Waiting for `tokens >= need`
                // with `need > capacity` could never succeed, so oversized
                // requests only wait for a full bucket.
                let admit_at = need.min(state.capacity);
                if state.tokens >= admit_at {
                    // May drive the balance negative; later callers repay it.
                    state.tokens -= need;
                    return;
                }
                // Lock is released at the end of this block, before sleeping.
                (admit_at - state.tokens) / state.rate
            };
            // A very large debt could exceed `Duration`'s range; saturate
            // instead of panicking.
            std::thread::sleep(Duration::try_from_secs_f64(wait_secs).unwrap_or(Duration::MAX));
        }
    }
}
574
575/// Runs `op` over `items` with at most `concurrency` operations in flight,
576/// collecting their results in completion-independent order and returning the
577/// first error encountered (remaining in-flight work is cancelled).
578///
579/// This is the engine later gates use to drive concurrent uploads/downloads.
580///
581/// # Errors
582///
583/// Returns the first [`StoreError`] produced by any operation.
584pub async fn run_concurrent<I, T, F, Fut>(
585    items: I,
586    concurrency: NonZeroUsize,
587    op: F,
588) -> Result<Vec<T>, StoreError>
589where
590    I: IntoIterator,
591    F: Fn(I::Item) -> Fut,
592    Fut: std::future::Future<Output = Result<T, StoreError>>,
593{
594    stream::iter(items)
595        .map(op)
596        .buffer_unordered(concurrency.get())
597        .try_collect()
598        .await
599}
600
601#[cfg(test)]
602mod tests {
603    use super::*;
604    use std::sync::atomic::{AtomicUsize, Ordering};
605
606    /// Builds a current-thread tokio runtime with time enabled, avoiding a
607    /// dependency on the `#[tokio::test]` macro (keeps tokio's feature set
608    /// minimal).
609    fn runtime() -> tokio::runtime::Runtime {
610        tokio::runtime::Builder::new_current_thread()
611            .enable_time()
612            .build()
613            .expect("build tokio runtime")
614    }
615
616    #[test]
617    fn transfer_config_default_caps_concurrency() {
618        let cfg = TransferConfig::default();
619        assert!(cfg.concurrency.get() >= 1, "concurrency must be >= 1");
620        assert!(
621            cfg.concurrency.get() <= DEFAULT_CONCURRENCY_CAP,
622            "default concurrency must be capped at {DEFAULT_CONCURRENCY_CAP}, got {}",
623            cfg.concurrency.get()
624        );
625        assert_eq!(cfg.max_bytes_per_sec, None);
626        assert_eq!(cfg.max_requests_per_sec, None);
627
628        // The clamping ctor never yields 0.
629        assert_eq!(TransferConfig::new(0, None).concurrency.get(), 1);
630        assert_eq!(TransferConfig::new(7, Some(99)).concurrency.get(), 7);
631        assert_eq!(TransferConfig::new(7, Some(99)).max_bytes_per_sec, Some(99));
632    }
633
634    /// Drives `run_concurrent` over N > concurrency items, recording the peak
635    /// number of simultaneously-running ops, and asserts the bound is exactly
636    /// `min(concurrency, N)` — and strictly 1 (sequential) when concurrency=1.
637    fn max_in_flight_for(concurrency: usize, items: usize) -> usize {
638        let in_flight = Arc::new(AtomicUsize::new(0));
639        let high_water = Arc::new(AtomicUsize::new(0));
640
641        let rt = runtime();
642        let result = rt.block_on(async {
643            let in_flight = Arc::clone(&in_flight);
644            let high_water = Arc::clone(&high_water);
645            run_concurrent(
646                0..items,
647                NonZeroUsize::new(concurrency).unwrap(),
648                move |_item| {
649                    let in_flight = Arc::clone(&in_flight);
650                    let high_water = Arc::clone(&high_water);
651                    async move {
652                        let cur = in_flight.fetch_add(1, Ordering::SeqCst) + 1;
653                        high_water.fetch_max(cur, Ordering::SeqCst);
654                        tokio::time::sleep(Duration::from_millis(20)).await;
655                        in_flight.fetch_sub(1, Ordering::SeqCst);
656                        Ok::<_, StoreError>(())
657                    }
658                },
659            )
660            .await
661        });
662        assert!(result.is_ok());
663        high_water.load(Ordering::SeqCst)
664    }
665
666    #[test]
667    fn transfer_config_run_concurrent_max_in_flight() {
668        // concurrency=4 over 12 items: peak in-flight is exactly 4.
669        assert_eq!(max_in_flight_for(4, 12), 4);
670        // concurrency=1 over 5 items: strictly sequential, peak in-flight is 1.
671        assert_eq!(max_in_flight_for(1, 5), 1);
672        // concurrency greater than item count is bounded by the item count.
673        assert_eq!(max_in_flight_for(8, 3), 3);
674    }
675
676    #[test]
677    fn transfer_config_run_concurrent_propagates_error() {
678        let rt = runtime();
679        let result: Result<Vec<()>, StoreError> = rt.block_on(async {
680            run_concurrent(0..10, NonZeroUsize::new(3).unwrap(), |item| async move {
681                if item == 5 {
682                    Err(StoreError::Backend {
683                        message: "boom".to_owned(),
684                        source: None,
685                    })
686                } else {
687                    tokio::time::sleep(Duration::from_millis(5)).await;
688                    Ok(())
689                }
690            })
691            .await
692        });
693        let err = result.expect_err("must surface the failing op's error");
694        assert!(
695            matches!(err, StoreError::Backend { ref message, .. } if message == "boom"),
696            "unexpected error: {err:?}"
697        );
698    }
699
700    #[test]
701    fn sync_snapshot_blocking_rate_limiter() {
702        use std::time::Instant;
703
704        // Unlimited: acquiring a large amount returns essentially instantly.
705        let unlimited = BlockingRateLimiter::new(None);
706        let start = Instant::now();
707        unlimited.acquire_blocking(1_000_000);
708        assert!(
709            start.elapsed() < Duration::from_millis(200),
710            "unlimited acquire_blocking should not block"
711        );
712        // Some(0) is also unlimited.
713        let zero = BlockingRateLimiter::new(Some(0));
714        let start = Instant::now();
715        zero.acquire_blocking(1_000_000);
716        assert!(
717            start.elapsed() < Duration::from_millis(200),
718            "Some(0) acquire_blocking should not block"
719        );
720
721        // Limited to 1000 bytes/sec. The bucket starts full (1000), so the
722        // first 1000 bytes are free; acquiring another ~1000 bytes (2x the
723        // per-second budget in total) must wait for the deficit to refill —
724        // at least ~1s.
725        let limiter = BlockingRateLimiter::new(Some(1000));
726        let start = Instant::now();
727        limiter.acquire_blocking(1000); // drains the initial burst
728        limiter.acquire_blocking(1000); // must wait ~1s to refill
729        let elapsed = start.elapsed();
730        assert!(
731            elapsed >= Duration::from_millis(900),
732            "throttled acquire_blocking should take ~1s, took {elapsed:?}"
733        );
734    }
735
#[test]
fn transfer_config_rate_limiter_set_rate_live() {
    // `RateLimiter::set_rate` must retune an existing limiter in place:
    // unlimited -> 1000 B/s -> unlimited, all on the same instance.
    const FAST: Duration = Duration::from_millis(200);
    const THROTTLED_MIN: Duration = Duration::from_millis(900);

    runtime().block_on(async {
        let limiter = RateLimiter::new(None);

        // Phase 1 — no cap yet, so even a 1 MB acquire completes immediately.
        let t0 = tokio::time::Instant::now();
        limiter.acquire(1_000_000).await;
        assert!(
            t0.elapsed() < FAST,
            "unlimited acquire should not block before set_rate"
        );

        // Phase 2 — cap at 1000 B/s. The first 1000-byte acquire spends the
        // (freshly primed) burst; the second has to wait for roughly one
        // second of refill.
        limiter.set_rate(Some(1000)).await;
        let t0 = tokio::time::Instant::now();
        for _ in 0..2 {
            limiter.acquire(1000).await;
        }
        let elapsed = t0.elapsed();
        assert!(
            elapsed >= THROTTLED_MIN,
            "after set_rate(Some(1000)) a 2x-budget acquire should take ~1s, took {elapsed:?}"
        );

        // Phase 3 — lift the cap again; large acquires stop waiting.
        limiter.set_rate(None).await;
        let t0 = tokio::time::Instant::now();
        limiter.acquire(1_000_000).await;
        assert!(
            t0.elapsed() < FAST,
            "after set_rate(None) acquire should no longer block"
        );
    });
}
771
#[test]
fn sync_snapshot_blocking_rate_limiter_set_rate_live() {
    use std::time::Instant;

    // `BlockingRateLimiter::set_rate` must retune a live limiter in place:
    // unlimited -> 1000 B/s -> unlimited (expressed as `Some(0)`).
    const FAST: Duration = Duration::from_millis(200);
    const THROTTLED_MIN: Duration = Duration::from_millis(900);

    let limiter = BlockingRateLimiter::new(None);

    // Uncapped: a 1 MB acquire returns straight away.
    let t0 = Instant::now();
    limiter.acquire_blocking(1_000_000);
    assert!(
        t0.elapsed() < FAST,
        "unlimited acquire_blocking should not block before set_rate"
    );

    // Capped at 1000 B/s: the first 1000 bytes use the primed burst, the
    // second 1000 must wait about a second for the bucket to refill.
    limiter.set_rate(Some(1000));
    let t0 = Instant::now();
    for _ in 0..2 {
        limiter.acquire_blocking(1000);
    }
    let elapsed = t0.elapsed();
    assert!(
        elapsed >= THROTTLED_MIN,
        "after set_rate(Some(1000)) a 2x-budget acquire should take ~1s, took {elapsed:?}"
    );

    // `Some(0)` means "no cap", so acquires stop blocking again.
    limiter.set_rate(Some(0));
    let t0 = Instant::now();
    limiter.acquire_blocking(1_000_000);
    assert!(
        t0.elapsed() < FAST,
        "after set_rate(Some(0)) acquire_blocking should no longer block"
    );
}
805
#[test]
fn classify_error_throttle_vs_hard() {
    use crate::adaptive::OpResult;

    // Backend errors whose message reads like backpressure -> Throttle.
    // Every case uses `source: None`, so only the message text is available
    // to `classify_error` here.
    let transient_msgs = [
        "S3 PUT object failed: SlowDown",
        "got HTTP 503 Service Unavailable",
        "rate limited: 429 Too Many Requests",
        "RESOURCE_EXHAUSTED quota",
        "request timeout while uploading",
        "connection reset by peer",
        "os error: too many open files",
    ];
    for msg in transient_msgs {
        let err = StoreError::Backend {
            message: msg.to_owned(),
            source: None,
        };
        assert_eq!(
            classify_error(&err),
            OpResult::Throttle,
            "expected Throttle for {msg:?}"
        );
    }

    // Hard errors: object-not-found, integrity mismatches, and ordinary
    // backend failures (e.g. permission problems) must classify as HardErr
    // rather than Throttle.
    let not_found = StoreError::ObjectNotFound {
        checksum: "abc".to_owned(),
    };
    assert_eq!(
        classify_error(&not_found),
        OpResult::HardErr,
        "expected HardErr for ObjectNotFound"
    );
    let integrity = StoreError::Integrity {
        address: "x".to_owned(),
        expected: "a".to_owned(),
        actual: "b".to_owned(),
    };
    assert_eq!(
        classify_error(&integrity),
        OpResult::HardErr,
        "expected HardErr for Integrity"
    );
    let other = StoreError::Backend {
        message: "permission denied".to_owned(),
        source: None,
    };
    assert_eq!(
        classify_error(&other),
        OpResult::HardErr,
        "expected HardErr for a non-backpressure Backend error"
    );

    // Local-FS backpressure -> Throttle; a plain NotFound IO error -> Hard.
    // The fd-exhaustion case is built with `io::Error::other`, which carries
    // only a message and no raw OS errno, so it is detected without
    // `raw_os_error()`. `WouldBlock` is exercised purely via its `ErrorKind`.
    let emfile = StoreError::Io(std::io::Error::other("too many open files (os error 24)"));
    assert_eq!(
        classify_error(&emfile),
        OpResult::Throttle,
        "expected Throttle for a too-many-open-files IO error"
    );
    let would_block = StoreError::Io(std::io::Error::from(std::io::ErrorKind::WouldBlock));
    assert_eq!(
        classify_error(&would_block),
        OpResult::Throttle,
        "expected Throttle for ErrorKind::WouldBlock"
    );
    let io_notfound = StoreError::Io(std::io::Error::from(std::io::ErrorKind::NotFound));
    assert_eq!(
        classify_error(&io_notfound),
        OpResult::HardErr,
        "expected HardErr for ErrorKind::NotFound"
    );
}
857
#[test]
fn run_adaptive_respects_gate_limit() {
    use crate::adaptive::AdaptiveGate;
    use std::sync::atomic::{AtomicUsize, Ordering};

    // Drive 20 slow ops through `run_adaptive` with a gate whose live limit
    // (2) is below its window ceiling (8), recording the peak number of ops
    // that were in flight at once.
    let rt = runtime();
    let gate = AdaptiveGate::new(2, 8);
    let in_flight = Arc::new(AtomicUsize::new(0));
    let high = Arc::new(AtomicUsize::new(0));
    let in_flight2 = Arc::clone(&in_flight);
    let high2 = Arc::clone(&high);

    let result: Result<Vec<()>, StoreError> = rt.block_on(async move {
        run_adaptive(0..20, &gate, move |_item| {
            let in_flight = Arc::clone(&in_flight2);
            let high = Arc::clone(&high2);
            async move {
                let cur = in_flight.fetch_add(1, Ordering::SeqCst) + 1;
                high.fetch_max(cur, Ordering::SeqCst);
                // Hold the slot long enough for overlapping ops to pile up.
                tokio::time::sleep(Duration::from_millis(15)).await;
                in_flight.fetch_sub(1, Ordering::SeqCst);
                Ok(())
            }
        })
        .await
    });

    // Surface the actual error instead of a bare `is_ok()` failure.
    let outputs = result.unwrap_or_else(|err| panic!("run_adaptive failed: {err:?}"));
    // Every item must have run to completion and released its slot.
    assert_eq!(outputs.len(), 20, "run_adaptive must run every item");
    assert_eq!(
        in_flight.load(Ordering::SeqCst),
        0,
        "all ops must have finished once run_adaptive returns"
    );

    // Window is the ceiling (8) but the gate's live limit is 2, so peak
    // effective concurrency never exceeds 2.
    let peak = high.load(Ordering::SeqCst);
    assert!(
        peak <= 2,
        "effective concurrency must be gated to the limit, got {}",
        peak
    );
}
893
#[test]
fn transfer_config_rate_limiter() {
    // Async `RateLimiter` smoke test: `None` never waits, while a 1000 B/s cap
    // makes a 2x-budget sequence of acquires take about one second.
    let rt = runtime();
    rt.block_on(async {
        // Unlimited: acquiring a large amount returns essentially instantly.
        let unlimited = RateLimiter::new(None);
        let start = tokio::time::Instant::now();
        unlimited.acquire(1_000_000).await;
        assert!(
            start.elapsed() < Duration::from_millis(200),
            "unlimited acquire should not block"
        );

        // Limited to 1000 bytes/sec. The bucket starts full (1000), so the
        // first 1000 bytes are free; acquiring another 1000 bytes (2000 in
        // total, 2x the per-second budget) must wait for the deficit to
        // refill — at least ~1s.
        let limiter = RateLimiter::new(Some(1000));
        let start = tokio::time::Instant::now();
        limiter.acquire(1000).await; // drains the initial burst
        limiter.acquire(1000).await; // must wait ~1s to refill
        let elapsed = start.elapsed();
        assert!(
            elapsed >= Duration::from_millis(900),
            "throttled acquire should take ~1s, took {elapsed:?}"
        );
    });
}
921}