snapdir_stores/transfer.rs
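//! Transfer configuration, rate limiting, and bounded-concurrency drivers.
//!
//! This module is the foundation for concurrent object transfers and bandwidth
//! limiting. It provides:
//!
//! - [`TransferConfig`] — how many objects to transfer in parallel, optional
//!   aggregate byte-rate and request-rate caps, the adaptive policy, and the
//!   network retry schedule.
//! - [`RateLimiter`] — a zero-dependency async token bucket built on
//!   [`tokio::time`], shareable across tasks via [`Arc`].
//! - [`BlockingRateLimiter`] — the synchronous sibling of [`RateLimiter`],
//!   which parks the calling OS thread instead of awaiting.
//! - [`run_concurrent`] — a generic bounded-concurrency driver that runs up to
//!   `concurrency` async operations in flight and returns the first error.
//! - [`run_adaptive`] — the adaptive sibling of [`run_concurrent`], whose
//!   effective concurrency is gated by an [`AdaptiveGate`].
//! - [`classify_error`] — maps a [`StoreError`] to the adaptive controller's
//!   throttle / hard-error signal.
//!
//! Nothing here changes the existing (sequential) push / fetch loops yet; the
//! stores merely carry a [`TransferConfig`] so later gates can wire these
//! primitives into their transfer loops.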
16
17use std::num::NonZeroUsize;
18use std::sync::Arc;
19use std::time::Duration;
20
21use futures::stream::{self, StreamExt, TryStreamExt};
22use snapdir_core::store::StoreError;
23use tokio::sync::Mutex;
24
25use crate::adaptive::{AdaptiveGate, OpResult};
26use crate::retry::RetryPolicy;
27
/// Ceiling applied to the auto-detected concurrency used by
/// `TransferConfig::default()`.
const DEFAULT_CONCURRENCY_CAP: usize = 16;
30
/// Config-level switch selecting between fixed and adaptive transfer tuning.
///
/// This is the policy stored on [`TransferConfig`]; it is a different type
/// from [`crate::adaptive::AdaptivePolicy`], which is the controller's own
/// always-on tuning view.
///
/// - [`Off`](Self::Off) (the default) keeps the historical path: a fixed
///   `concurrency` and a fixed `max_bytes_per_sec`, byte-for-byte unchanged.
/// - [`On`](Self::On) selects the adaptive path: the in-flight window is sized
///   to `ceiling`, and a live controller drives the effective concurrency
///   within `[1, ceiling]` and the byte-rate from in-band per-op feedback.
///
/// Adaptive mode **only** affects scheduling and rate. The bytes/objects
/// transferred and the resulting snapshot are identical to the `Off` path.
#[derive(Debug, Clone, Copy, PartialEq, Default)]
pub enum AdaptivePolicy {
    /// Fixed concurrency and fixed rate — the default, historical behavior.
    #[default]
    Off,
    /// Adaptive concurrency and rate, bounded by `ceiling` and aiming for
    /// `fraction × discovered-knee`.
    On {
        /// Fraction of the discovered knee to operate at (clamped to
        /// `(0, 1]`; `0.8` is the usual default).
        fraction: f64,
        /// Hard upper bound on concurrency; the effective limit never
        /// exceeds this value.
        ceiling: usize,
    },
}
57
58/// Configuration for object transfers: how many to run in parallel, an optional
59/// aggregate byte-rate cap, and whether to tune those adaptively.
60///
61/// `Default` auto-detects the available parallelism (capped at
62/// [`DEFAULT_CONCURRENCY_CAP`]), leaves bandwidth unlimited, and disables
63/// adaptive tuning ([`AdaptivePolicy::Off`]).
64#[derive(Debug, Clone)]
65pub struct TransferConfig {
66 /// Maximum number of object transfers to run concurrently. In the adaptive
67 /// (`On`) path this is the slow-start *seed*; the effective in-flight
68 /// window is sized to the policy ceiling and gated to the live limit.
69 pub concurrency: NonZeroUsize,
70 /// Optional aggregate bandwidth cap, in bytes per second. `None` means
71 /// unlimited. In the adaptive path this is the rate *cap* (`max_rate`); the
72 /// controller may target a lower live rate.
73 pub max_bytes_per_sec: Option<u64>,
74 /// Optional aggregate request-rate cap, in requests per second. `None`
75 /// means unlimited. A later gate wires this into the live store call sites
76 /// (`key_exists` / `get_bytes` / `put_bytes`) by pacing each request through
77 /// a [`RateLimiter`] / [`BlockingRateLimiter`] whose "tokens" are requests
78 /// (`acquire(1)` per request). The per-backend defaults live in
79 /// [`crate::limits::for_scheme`].
80 pub max_requests_per_sec: Option<u64>,
81 /// Whether to tune concurrency / rate adaptively. [`AdaptivePolicy::Off`]
82 /// (the default) keeps the historical fixed-concurrency path byte-for-byte.
83 pub adaptive: AdaptivePolicy,
84 /// The transient-failure retry schedule wrapped around each network SDK
85 /// call (`key_exists` / `get_bytes` / `put_bytes`) via
86 /// [`retry_network`](crate::retry::retry_network). Defaults to
87 /// [`RetryPolicy::default`] (5 attempts, 250ms base, 30s cap); this is the
88 /// CONFIG SEAM the CLI sets later. The per-object integrity-mismatch retry
89 /// in `fetch_verified` is independent of this and unchanged.
90 pub retry: RetryPolicy,
91}
92
93impl TransferConfig {
94 /// Builds a non-adaptive config, clamping `concurrency` to at least 1.
95 ///
96 /// The adaptive policy defaults to [`AdaptivePolicy::Off`], so existing
97 /// callers behave exactly as before. Use
98 /// [`with_adaptive`](Self::with_adaptive) to opt in.
99 #[must_use]
100 pub fn new(concurrency: usize, max_bytes_per_sec: Option<u64>) -> Self {
101 Self {
102 concurrency: NonZeroUsize::new(concurrency.max(1)).unwrap_or(NonZeroUsize::MIN),
103 max_bytes_per_sec,
104 max_requests_per_sec: None,
105 adaptive: AdaptivePolicy::Off,
106 retry: RetryPolicy::default(),
107 }
108 }
109
110 /// Returns this config with its adaptive policy set to `policy` (builder
111 /// style). `Off` is byte-identical to a plain [`new`](Self::new) config.
112 #[must_use]
113 pub fn with_adaptive(mut self, policy: AdaptivePolicy) -> Self {
114 self.adaptive = policy;
115 self
116 }
117
118 /// Returns this config with its network-retry schedule set to `policy`
119 /// (builder style). Defaults to [`RetryPolicy::default`]; the CLI uses this
120 /// to install an operator-configured policy.
121 #[must_use]
122 pub fn with_retry(mut self, policy: RetryPolicy) -> Self {
123 self.retry = policy;
124 self
125 }
126}
127
128impl Default for TransferConfig {
129 fn default() -> Self {
130 let detected = std::thread::available_parallelism()
131 .map_or(1, NonZeroUsize::get)
132 .clamp(1, DEFAULT_CONCURRENCY_CAP);
133 Self {
134 // `detected` is >= 1, so the NonZeroUsize is always Some.
135 concurrency: NonZeroUsize::new(detected).unwrap_or(NonZeroUsize::MIN),
136 max_bytes_per_sec: None,
137 max_requests_per_sec: None,
138 adaptive: AdaptivePolicy::Off,
139 retry: RetryPolicy::default(),
140 }
141 }
142}
143
144/// Classifies a [`StoreError`] for the adaptive controller's congestion signal.
145///
146/// Returns [`OpResult::Throttle`] for clearly *transient / backpressure*
147/// failures the controller should back off on (HTTP 429 / `SlowDown` / 503 /
148/// `RESOURCE_EXHAUSTED`, request timeouts, connection reset/closed, and the
149/// local-FS backpressure errno class — `WouldBlock`, EMFILE "too many open
150/// files", and a full disk). Everything else — `NotFound`, `Integrity`,
151/// `Parse`, and ordinary I/O / backend errors — is [`OpResult::HardErr`].
152///
153/// This is **conservative**: anything not clearly transient defaults to
154/// `HardErr`, so a real failure never masquerades as throttling. It inspects
155/// `StoreError::Backend`'s message + wrapped source string (the SDKs surface
156/// their status that way) and `StoreError::Io`'s [`std::io::ErrorKind`].
157#[must_use]
158pub fn classify_error(err: &StoreError) -> OpResult {
159 match err {
160 StoreError::Io(io_err) => classify_io_kind(io_err),
161 StoreError::Backend { message, source } => {
162 let mut text = message.to_ascii_lowercase();
163 if let Some(src) = source {
164 text.push(' ');
165 text.push_str(&src.to_string().to_ascii_lowercase());
166 }
167 if text_is_transient(&text) {
168 OpResult::Throttle
169 } else {
170 OpResult::HardErr
171 }
172 }
173 // NotFound / Integrity / Parse are never transient backpressure, and
174 // `StoreError` is `#[non_exhaustive]` so any future variant is — by the
175 // conservative rule — a hard error until proven transient.
176 _ => OpResult::HardErr,
177 }
178}
179
180/// Classifies a local-filesystem [`std::io::Error`] as transient backpressure
181/// vs a hard error. Only the clear backpressure errno classes
182/// (`WouldBlock`, a full filesystem, and EMFILE "too many open files", which
183/// stable Rust still surfaces as `Uncategorized` with that message) are
184/// treated as [`OpResult::Throttle`].
185fn classify_io_kind(err: &std::io::Error) -> OpResult {
186 use std::io::ErrorKind;
187 match err.kind() {
188 ErrorKind::WouldBlock | ErrorKind::StorageFull => OpResult::Throttle,
189 // EMFILE / ENFILE land in the catch-all `Other`/`Uncategorized` kind on
190 // stable Rust; sniff the message for "too many open files".
191 _ => {
192 if err
193 .to_string()
194 .to_ascii_lowercase()
195 .contains("too many open files")
196 {
197 OpResult::Throttle
198 } else {
199 OpResult::HardErr
200 }
201 }
202 }
203}
204
/// Tests case-folded SDK error text for transient / backpressure markers.
///
/// `text` must already be lowercase. Two kinds of marker are recognized:
///
/// - **Phrases** (`"slowdown"`, `"throttl"`, `"timed out"`, …) match as plain
///   substrings.
/// - **HTTP status codes** (`429`, `503`) match only as standalone tokens,
///   i.e. not adjacent to another ASCII letter or digit. Error text routinely
///   embeds hex checksums, byte counts, and object sizes, and a bare substring
///   test would turn `"object 3f4290ab is corrupt"` or `"wrote 15031 bytes"`
///   into a false throttle signal.
fn text_is_transient(text: &str) -> bool {
    const PHRASES: &[&str] = &[
        "slowdown",
        "slow down",
        "too many requests",
        "service unavailable",
        "serviceunavailable",
        "resource_exhausted",
        "resource exhausted",
        "throttl",
        "request timeout",
        "requesttimeout",
        "timed out",
        "timeout",
        "connection reset",
        "connection closed",
        "connection refused",
        "broken pipe",
        "too many open files",
    ];
    const STATUS_CODES: &[&str] = &["429", "503"];

    PHRASES.iter().any(|phrase| text.contains(phrase))
        || STATUS_CODES
            .iter()
            .any(|code| contains_standalone_code(text, code))
}

/// Returns `true` when `code` occurs in `text` with no ASCII letter or digit
/// immediately before or after it (start/end of text count as boundaries).
fn contains_standalone_code(text: &str, code: &str) -> bool {
    text.match_indices(code).any(|(start, matched)| {
        let end = start + matched.len();
        let glued_before = text[..start]
            .chars()
            .next_back()
            .is_some_and(|c| c.is_ascii_alphanumeric());
        let glued_after = text[end..]
            .chars()
            .next()
            .is_some_and(|c| c.is_ascii_alphanumeric());
        !glued_before && !glued_after
    })
}
230
231/// Runs `op` over `items` with the in-flight window sized to `gate.ceiling()`
232/// but the *effective* concurrency gated to the gate's live limit: each item
233/// acquires a [`GatePermit`](crate::adaptive::GatePermit) before `op` runs and
234/// holds it until `op` completes. This is the adaptive sibling of
235/// [`run_concurrent`]: a background tick driver resizes the gate live, so the
236/// number of simultaneously-running ops tracks the controller's limit while the
237/// buffer window stays at the ceiling.
238///
239/// Semantics match [`run_concurrent`] otherwise: completion-independent order,
240/// first-error-wins (remaining in-flight work is cancelled).
241///
242/// # Errors
243///
244/// Returns the first [`StoreError`] produced by any operation.
245pub async fn run_adaptive<I, T, F, Fut>(
246 items: I,
247 gate: &AdaptiveGate,
248 op: F,
249) -> Result<Vec<T>, StoreError>
250where
251 I: IntoIterator,
252 F: Fn(I::Item) -> Fut,
253 Fut: std::future::Future<Output = Result<T, StoreError>>,
254{
255 let window = gate.ceiling().max(1);
256 stream::iter(items)
257 .map(|item| {
258 let op = &op;
259 async move {
260 // Effective concurrency = the gate's current limit (<= ceiling),
261 // even though up to `window` futures are buffered.
262 let _permit = gate.acquire().await;
263 op(item).await
264 }
265 })
266 .buffer_unordered(window)
267 .try_collect()
268 .await
269}
270
271/// Shared token-bucket state, guarded by an async mutex.
272///
273/// The refill `rate`/`capacity` live **inside** the bucket (behind the same
274/// mutex as the running `tokens`) so [`RateLimiter::set_rate`] can retune the
275/// limiter live by relocking and updating them. A `rate` of `0.0` means
276/// "unlimited" — [`acquire`](RateLimiter::acquire) returns immediately.
277#[derive(Debug)]
278struct Bucket {
279 /// Refill rate in bytes per second. `0.0` means unlimited (no throttling).
280 rate: f64,
281 /// Maximum burst capacity, in bytes (~1 second's worth of budget).
282 capacity: f64,
283 /// Currently available tokens (bytes).
284 tokens: f64,
285 /// Last time the bucket was refilled.
286 last_refill: tokio::time::Instant,
287}
288
289/// Inner state of a [`RateLimiter`].
290#[derive(Debug)]
291struct Inner {
292 /// The live bucket state (rate/capacity/tokens). A `rate` of `0.0` models
293 /// the unlimited case.
294 bucket: Mutex<Bucket>,
295}
296
297/// An async token-bucket rate limiter that throttles aggregate transfer
298/// throughput.
299///
300/// Construct with [`RateLimiter::new`]. When `max_bytes_per_sec` is `None` (or
301/// `Some(0)`), the limiter is unlimited and [`acquire`](RateLimiter::acquire)
302/// returns immediately. Otherwise tokens refill at `max_bytes_per_sec` per
303/// second, allowing a burst of up to ~1 second's worth of budget.
304///
305/// The limiter is [`Arc`]-shareable and [`Clone`] (cloning shares the same
306/// underlying bucket).
307#[derive(Debug, Clone)]
308pub struct RateLimiter {
309 inner: Arc<Inner>,
310}
311
312impl RateLimiter {
313 /// Builds a limiter. `None` (or `Some(0)`) yields an unlimited, no-op
314 /// limiter whose [`acquire`](RateLimiter::acquire) never waits.
315 #[must_use]
316 pub fn new(max_bytes_per_sec: Option<u64>) -> Self {
317 #[allow(clippy::cast_precision_loss)]
318 let (rate, capacity, tokens) = match max_bytes_per_sec {
319 Some(r) if r > 0 => {
320 let r = r as f64;
321 (r, r, r)
322 }
323 _ => (0.0, 0.0, 0.0),
324 };
325 Self {
326 inner: Arc::new(Inner {
327 bucket: Mutex::new(Bucket {
328 rate,
329 capacity,
330 tokens,
331 last_refill: tokio::time::Instant::now(),
332 }),
333 }),
334 }
335 }
336
337 /// Retunes the limiter's aggregate byte-rate cap **live**, so an adaptive
338 /// controller can raise or lower throttling between operations.
339 ///
340 /// - `None` (or `Some(0)`) switches the limiter to **unlimited**: the rate
341 /// and capacity drop to `0` and the bucket is emptied, so the next
342 /// [`acquire`](RateLimiter::acquire) is a no-op.
343 /// - `Some(r > 0)` installs (or replaces) a bucket refilling at `r`
344 /// bytes/sec with ~1 second of burst capacity. Switching from unlimited
345 /// to limited primes the bucket full (`tokens = capacity`) so a freshly
346 /// throttled limiter still allows one immediate burst.
347 ///
348 /// Calling `set_rate` is the only way the rate changes after [`new`];
349 /// limiters that never call it behave exactly as before.
350 pub async fn set_rate(&self, bytes_per_sec: Option<u64>) {
351 let mut state = self.inner.bucket.lock().await;
352 let was_unlimited = state.rate <= 0.0;
353 #[allow(clippy::cast_precision_loss)]
354 match bytes_per_sec {
355 Some(r) if r > 0 => {
356 let r = r as f64;
357 state.rate = r;
358 state.capacity = r;
359 // Switching unlimited -> limited: prime a full burst. When
360 // already limited, keep the running token count but clamp it to
361 // the new capacity so a rate drop takes effect promptly.
362 if was_unlimited {
363 state.tokens = r;
364 } else {
365 state.tokens = state.tokens.min(r);
366 }
367 state.last_refill = tokio::time::Instant::now();
368 }
369 _ => {
370 // Unlimited: empty the bucket; `acquire` short-circuits on rate==0.
371 state.rate = 0.0;
372 state.capacity = 0.0;
373 state.tokens = 0.0;
374 }
375 }
376 }
377
378 /// Blocks until `n` bytes of budget are available, refilling the bucket at
379 /// the configured rate. Unlimited limiters return immediately.
380 ///
381 /// A single request larger than the bucket capacity is still satisfied: the
382 /// bucket is allowed to go negative and the caller waits out the deficit,
383 /// so throttling is correct even for objects bigger than one second's
384 /// worth of budget.
385 pub async fn acquire(&self, n: u64) {
386 if n == 0 {
387 return;
388 }
389 #[allow(clippy::cast_precision_loss)]
390 let need = n as f64;
391
392 loop {
393 let wait = {
394 let mut state = self.inner.bucket.lock().await;
395 if state.rate <= 0.0 {
396 return; // unlimited fast path (also covers live set_rate(None))
397 }
398 let now = tokio::time::Instant::now();
399 let elapsed = now.duration_since(state.last_refill).as_secs_f64();
400 state.tokens = (state.tokens + elapsed * state.rate).min(state.capacity);
401 state.last_refill = now;
402
403 if state.tokens >= need {
404 state.tokens -= need;
405 return;
406 }
407 // Not enough budget: compute how long until the deficit is
408 // covered, then sleep (releasing the lock first).
409 let deficit = need - state.tokens;
410 deficit / state.rate
411 };
412 tokio::time::sleep(Duration::from_secs_f64(wait)).await;
413 }
414 }
415}
416
/// Shared token-bucket state for [`BlockingRateLimiter`], guarded by a
/// **synchronous** [`std::sync::Mutex`] (not tokio's async mutex).
///
/// The refill `rate`/`capacity` live inside the bucket so
/// [`BlockingRateLimiter::set_rate`] can retune live. `rate == 0.0` means
/// unlimited.
#[derive(Debug)]
struct BlockingBucket {
    /// Refill rate in bytes per second. `0.0` means unlimited (no throttling).
    rate: f64,
    /// Maximum burst capacity, in bytes (~1 second's worth of budget).
    capacity: f64,
    /// Currently available tokens (bytes). Negative while an oversized
    /// request's debt is being repaid.
    tokens: f64,
    /// Last time the bucket was refilled.
    last_refill: std::time::Instant,
}

impl BlockingBucket {
    /// Converts a configured cap into a refill rate. `None` and `Some(0)` both
    /// mean "unlimited" and map to `None`.
    // Precision loss only affects rates above 2^53 bytes/sec, where rounding
    // is irrelevant for throttling.
    #[allow(clippy::cast_precision_loss)]
    fn limit_from(max_bytes_per_sec: Option<u64>) -> Option<f64> {
        max_bytes_per_sec.filter(|&r| r > 0).map(|r| r as f64)
    }

    /// Credits the tokens accrued since `last_refill` at the current rate,
    /// clamped to the current capacity, and advances `last_refill` to `now`.
    fn refill(&mut self, now: std::time::Instant) {
        let elapsed = now.duration_since(self.last_refill).as_secs_f64();
        self.tokens = (self.tokens + elapsed * self.rate).min(self.capacity);
        self.last_refill = now;
    }
}

/// Inner state of a [`BlockingRateLimiter`].
#[derive(Debug)]
struct BlockingInner {
    /// The live bucket state (rate/capacity/tokens). A `rate` of `0.0` models
    /// the unlimited case.
    bucket: std::sync::Mutex<BlockingBucket>,
}

/// A **synchronous** token-bucket rate limiter for the store-to-store sync
/// path.
///
/// This is the blocking sibling of [`RateLimiter`]. The
/// [`StreamStore`](crate::stream::StreamStore) methods are synchronous and
/// drive their backends' async SDK calls on an internal runtime via `block_on`,
/// so the store-to-store sync orchestrator parallelizes them across a **rayon**
/// thread pool of plain OS threads — it cannot use the async [`RateLimiter`]
/// (awaiting inside a `block_on`-ing rayon worker would nest tokio runtimes).
/// [`acquire_blocking`](BlockingRateLimiter::acquire_blocking) therefore parks
/// the calling OS thread with [`std::thread::sleep`] instead of `.await`.
///
/// When `max_bytes_per_sec` is `None` (or `Some(0)`), the limiter is unlimited
/// and [`acquire_blocking`](BlockingRateLimiter::acquire_blocking) returns
/// immediately. Otherwise tokens refill at `max_bytes_per_sec` per second,
/// allowing a burst of up to ~1 second's worth of budget.
///
/// The limiter is [`Arc`]-shareable and [`Clone`] (cloning shares the same
/// underlying bucket), so every rayon worker throttles against one aggregate
/// budget.
#[derive(Debug, Clone)]
pub struct BlockingRateLimiter {
    inner: Arc<BlockingInner>,
}

impl BlockingRateLimiter {
    /// Builds a synchronous limiter. `None` (or `Some(0)`) yields an unlimited,
    /// no-op limiter whose
    /// [`acquire_blocking`](BlockingRateLimiter::acquire_blocking) never waits.
    /// A limited bucket starts full.
    #[must_use]
    pub fn new(max_bytes_per_sec: Option<u64>) -> Self {
        // Unlimited is modelled as rate == capacity == tokens == 0.
        let rate = BlockingBucket::limit_from(max_bytes_per_sec).unwrap_or(0.0);
        Self {
            inner: Arc::new(BlockingInner {
                bucket: std::sync::Mutex::new(BlockingBucket {
                    rate,
                    capacity: rate,
                    tokens: rate,
                    last_refill: std::time::Instant::now(),
                }),
            }),
        }
    }

    /// Locks the bucket, recovering from poisoning: a poisoned mutex only
    /// means some thread panicked while holding it, and the token state is
    /// still usable.
    fn lock_bucket(&self) -> std::sync::MutexGuard<'_, BlockingBucket> {
        self.inner
            .bucket
            .lock()
            .unwrap_or_else(std::sync::PoisonError::into_inner)
    }

    /// Retunes the limiter's aggregate byte-rate cap **live**.
    ///
    /// - `None` / `Some(0)` switches to unlimited and empties the bucket.
    /// - `Some(r > 0)` installs a bucket refilling at `r` bytes/sec with ~1
    ///   second of burst capacity. Switching from unlimited primes a full
    ///   burst. When already limited, the budget accrued so far at the *old*
    ///   rate is credited first and then clamped to the new capacity, so
    ///   frequent retuning does not discard accumulated budget.
    pub fn set_rate(&self, bytes_per_sec: Option<u64>) {
        let mut state = self.lock_bucket();
        match BlockingBucket::limit_from(bytes_per_sec) {
            Some(r) => {
                let now = std::time::Instant::now();
                if state.rate <= 0.0 {
                    // Unlimited -> limited: prime a full burst.
                    state.tokens = r;
                    state.last_refill = now;
                } else {
                    // Settle elapsed time at the old rate, then clamp so a
                    // rate drop takes effect promptly.
                    state.refill(now);
                    state.tokens = state.tokens.min(r);
                }
                state.rate = r;
                state.capacity = r;
            }
            None => {
                state.rate = 0.0;
                state.capacity = 0.0;
                state.tokens = 0.0;
            }
        }
    }

    /// Blocks the calling OS thread until `n` bytes of budget are available,
    /// refilling the bucket at the configured rate. Unlimited limiters return
    /// immediately, as does `n == 0`.
    ///
    /// A request larger than the bucket capacity is still satisfied. It is
    /// admitted as soon as the bucket is full and drives the balance
    /// negative; subsequent callers then wait until that debt has been
    /// repaid, so the long-run aggregate rate is preserved even for objects
    /// bigger than one second's worth of budget.
    ///
    /// The thread is parked with [`std::thread::sleep`]; the bucket lock is
    /// released while sleeping.
    pub fn acquire_blocking(&self, n: u64) {
        if n == 0 {
            return;
        }
        #[allow(clippy::cast_precision_loss)]
        let need = n as f64;

        loop {
            let wait = {
                let mut state = self.lock_bucket();
                if state.rate <= 0.0 {
                    return; // unlimited fast path (also covers live set_rate(None))
                }
                state.refill(std::time::Instant::now());

                // Refill clamps at `capacity`, so an oversized request could
                // never observe `tokens >= need`. Admit once the bucket holds
                // as much as it ever can for this request.
                let threshold = need.min(state.capacity);
                if state.tokens >= threshold {
                    state.tokens -= need; // may go negative for oversized requests
                    return;
                }
                // Not enough budget: compute how long until the shortfall is
                // covered, then sleep (releasing the lock first).
                (threshold - state.tokens) / state.rate
            };
            std::thread::sleep(Duration::from_secs_f64(wait));
        }
    }
}
574
575/// Runs `op` over `items` with at most `concurrency` operations in flight,
576/// collecting their results in completion-independent order and returning the
577/// first error encountered (remaining in-flight work is cancelled).
578///
579/// This is the engine later gates use to drive concurrent uploads/downloads.
580///
581/// # Errors
582///
583/// Returns the first [`StoreError`] produced by any operation.
584pub async fn run_concurrent<I, T, F, Fut>(
585 items: I,
586 concurrency: NonZeroUsize,
587 op: F,
588) -> Result<Vec<T>, StoreError>
589where
590 I: IntoIterator,
591 F: Fn(I::Item) -> Fut,
592 Fut: std::future::Future<Output = Result<T, StoreError>>,
593{
594 stream::iter(items)
595 .map(op)
596 .buffer_unordered(concurrency.get())
597 .try_collect()
598 .await
599}
600
#[cfg(test)]
mod tests {
    use super::*;
    use std::sync::atomic::{AtomicUsize, Ordering};

    /// Builds a current-thread tokio runtime with time enabled, avoiding a
    /// dependency on the `#[tokio::test]` macro (keeps tokio's feature set
    /// minimal).
    fn runtime() -> tokio::runtime::Runtime {
        tokio::runtime::Builder::new_current_thread()
            .enable_time()
            .build()
            .expect("build tokio runtime")
    }

    #[test]
    fn transfer_config_default_caps_concurrency() {
        let cfg = TransferConfig::default();
        assert!(cfg.concurrency.get() >= 1, "concurrency must be >= 1");
        assert!(
            cfg.concurrency.get() <= DEFAULT_CONCURRENCY_CAP,
            "default concurrency must be capped at {DEFAULT_CONCURRENCY_CAP}, got {}",
            cfg.concurrency.get()
        );
        assert_eq!(cfg.max_bytes_per_sec, None);
        assert_eq!(cfg.max_requests_per_sec, None);

        // The clamping ctor never yields 0.
        assert_eq!(TransferConfig::new(0, None).concurrency.get(), 1);
        assert_eq!(TransferConfig::new(7, Some(99)).concurrency.get(), 7);
        assert_eq!(TransferConfig::new(7, Some(99)).max_bytes_per_sec, Some(99));
    }

    /// Drives `run_concurrent` over `items` sleeping ops with the given
    /// `concurrency` and returns the peak number of simultaneously-running
    /// ops. Callers assert the bound on the returned high-water mark.
    fn max_in_flight_for(concurrency: usize, items: usize) -> usize {
        let in_flight = Arc::new(AtomicUsize::new(0));
        let high_water = Arc::new(AtomicUsize::new(0));

        let rt = runtime();
        let result = rt.block_on(async {
            let in_flight = Arc::clone(&in_flight);
            let high_water = Arc::clone(&high_water);
            run_concurrent(
                0..items,
                NonZeroUsize::new(concurrency).unwrap(),
                move |_item| {
                    let in_flight = Arc::clone(&in_flight);
                    let high_water = Arc::clone(&high_water);
                    async move {
                        let cur = in_flight.fetch_add(1, Ordering::SeqCst) + 1;
                        high_water.fetch_max(cur, Ordering::SeqCst);
                        tokio::time::sleep(Duration::from_millis(20)).await;
                        in_flight.fetch_sub(1, Ordering::SeqCst);
                        Ok::<_, StoreError>(())
                    }
                },
            )
            .await
        });
        assert!(result.is_ok());
        high_water.load(Ordering::SeqCst)
    }

    #[test]
    fn transfer_config_run_concurrent_max_in_flight() {
        // concurrency=4 over 12 items: peak in-flight is exactly 4.
        assert_eq!(max_in_flight_for(4, 12), 4);
        // concurrency=1 over 5 items: strictly sequential, peak in-flight is 1.
        assert_eq!(max_in_flight_for(1, 5), 1);
        // concurrency greater than item count is bounded by the item count.
        assert_eq!(max_in_flight_for(8, 3), 3);
    }

    #[test]
    fn transfer_config_run_concurrent_propagates_error() {
        let rt = runtime();
        let result: Result<Vec<()>, StoreError> = rt.block_on(async {
            run_concurrent(0..10, NonZeroUsize::new(3).unwrap(), |item| async move {
                if item == 5 {
                    Err(StoreError::Backend {
                        message: "boom".to_owned(),
                        source: None,
                    })
                } else {
                    tokio::time::sleep(Duration::from_millis(5)).await;
                    Ok(())
                }
            })
            .await
        });
        let err = result.expect_err("must surface the failing op's error");
        assert!(
            matches!(err, StoreError::Backend { ref message, .. } if message == "boom"),
            "unexpected error: {err:?}"
        );
    }

    #[test]
    fn sync_snapshot_blocking_rate_limiter() {
        use std::time::Instant;

        // Unlimited: acquiring a large amount returns essentially instantly.
        let unlimited = BlockingRateLimiter::new(None);
        let start = Instant::now();
        unlimited.acquire_blocking(1_000_000);
        assert!(
            start.elapsed() < Duration::from_millis(200),
            "unlimited acquire_blocking should not block"
        );
        // Some(0) is also unlimited.
        let zero = BlockingRateLimiter::new(Some(0));
        let start = Instant::now();
        zero.acquire_blocking(1_000_000);
        assert!(
            start.elapsed() < Duration::from_millis(200),
            "Some(0) acquire_blocking should not block"
        );

        // Limited to 1000 bytes/sec. The bucket starts full (1000), so the
        // first 1000 bytes are free; acquiring another 1000 bytes (2x the
        // per-second budget in total) must wait for the bucket to refill —
        // at least ~1s.
        let limiter = BlockingRateLimiter::new(Some(1000));
        let start = Instant::now();
        limiter.acquire_blocking(1000); // drains the initial burst
        limiter.acquire_blocking(1000); // must wait ~1s to refill
        let elapsed = start.elapsed();
        assert!(
            elapsed >= Duration::from_millis(900),
            "throttled acquire_blocking should take ~1s, took {elapsed:?}"
        );
    }

    #[test]
    fn transfer_config_rate_limiter_set_rate_live() {
        let rt = runtime();
        rt.block_on(async {
            // Start unlimited: a huge acquire returns instantly.
            let limiter = RateLimiter::new(None);
            let start = tokio::time::Instant::now();
            limiter.acquire(1_000_000).await;
            assert!(
                start.elapsed() < Duration::from_millis(200),
                "unlimited acquire should not block before set_rate"
            );

            // Tighten to 1000 B/s live. The bucket is primed full (1000), so the
            // first 1000 bytes are free; the next 1000 must wait ~1s to refill.
            limiter.set_rate(Some(1000)).await;
            let start = tokio::time::Instant::now();
            limiter.acquire(1000).await; // drains the freshly-primed burst
            limiter.acquire(1000).await; // must wait ~1s
            let elapsed = start.elapsed();
            assert!(
                elapsed >= Duration::from_millis(900),
                "after set_rate(Some(1000)) a 2x-budget acquire should take ~1s, took {elapsed:?}"
            );

            // Raise the cap back to unlimited live: acquires stop waiting again.
            limiter.set_rate(None).await;
            let start = tokio::time::Instant::now();
            limiter.acquire(1_000_000).await;
            assert!(
                start.elapsed() < Duration::from_millis(200),
                "after set_rate(None) acquire should no longer block"
            );
        });
    }

    #[test]
    fn sync_snapshot_blocking_rate_limiter_set_rate_live() {
        use std::time::Instant;

        // Start unlimited.
        let limiter = BlockingRateLimiter::new(None);
        let start = Instant::now();
        limiter.acquire_blocking(1_000_000);
        assert!(
            start.elapsed() < Duration::from_millis(200),
            "unlimited acquire_blocking should not block before set_rate"
        );

        // Tighten live to 1000 B/s.
        limiter.set_rate(Some(1000));
        let start = Instant::now();
        limiter.acquire_blocking(1000); // primed burst
        limiter.acquire_blocking(1000); // waits ~1s
        let elapsed = start.elapsed();
        assert!(
            elapsed >= Duration::from_millis(900),
            "after set_rate(Some(1000)) a 2x-budget acquire should take ~1s, took {elapsed:?}"
        );

        // Back to unlimited live.
        limiter.set_rate(Some(0));
        let start = Instant::now();
        limiter.acquire_blocking(1_000_000);
        assert!(
            start.elapsed() < Duration::from_millis(200),
            "after set_rate(Some(0)) acquire_blocking should no longer block"
        );
    }

    #[test]
    fn classify_error_throttle_vs_hard() {
        use crate::adaptive::OpResult;

        // Backend errors whose message/source look like backpressure -> Throttle.
        let transient_msgs = [
            "S3 PUT object failed: SlowDown",
            "got HTTP 503 Service Unavailable",
            "rate limited: 429 Too Many Requests",
            "RESOURCE_EXHAUSTED quota",
            "request timeout while uploading",
            "connection reset by peer",
            "os error: too many open files",
        ];
        for msg in transient_msgs {
            let err = StoreError::Backend {
                message: msg.to_owned(),
                source: None,
            };
            assert_eq!(
                classify_error(&err),
                OpResult::Throttle,
                "expected Throttle for {msg:?}"
            );
        }

        // Hard errors: NotFound / Integrity / Parse / ordinary backend failures.
        let not_found = StoreError::ObjectNotFound {
            checksum: "abc".to_owned(),
        };
        assert_eq!(classify_error(&not_found), OpResult::HardErr);
        let integrity = StoreError::Integrity {
            address: "x".to_owned(),
            expected: "a".to_owned(),
            actual: "b".to_owned(),
        };
        assert_eq!(classify_error(&integrity), OpResult::HardErr);
        let other = StoreError::Backend {
            message: "permission denied".to_owned(),
            source: None,
        };
        assert_eq!(classify_error(&other), OpResult::HardErr);

        // Local-FS backpressure errnos -> Throttle; a plain NotFound IO -> Hard.
        let emfile = StoreError::Io(std::io::Error::other("too many open files (os error 24)"));
        assert_eq!(classify_error(&emfile), OpResult::Throttle);
        let would_block = StoreError::Io(std::io::Error::from(std::io::ErrorKind::WouldBlock));
        assert_eq!(classify_error(&would_block), OpResult::Throttle);
        let io_notfound = StoreError::Io(std::io::Error::from(std::io::ErrorKind::NotFound));
        assert_eq!(classify_error(&io_notfound), OpResult::HardErr);
    }

    #[test]
    fn run_adaptive_respects_gate_limit() {
        use crate::adaptive::AdaptiveGate;

        let rt = runtime();
        let gate = AdaptiveGate::new(2, 8);
        let in_flight = Arc::new(AtomicUsize::new(0));
        let high = Arc::new(AtomicUsize::new(0));
        let in_flight2 = Arc::clone(&in_flight);
        let high2 = Arc::clone(&high);

        let result: Result<Vec<()>, StoreError> = rt.block_on(async move {
            run_adaptive(0..20, &gate, move |_item| {
                let in_flight = Arc::clone(&in_flight2);
                let high = Arc::clone(&high2);
                async move {
                    let cur = in_flight.fetch_add(1, Ordering::SeqCst) + 1;
                    high.fetch_max(cur, Ordering::SeqCst);
                    tokio::time::sleep(Duration::from_millis(15)).await;
                    in_flight.fetch_sub(1, Ordering::SeqCst);
                    Ok(())
                }
            })
            .await
        });
        assert!(result.is_ok());
        // Window is the ceiling (8) but the gate's live limit is 2, so peak
        // effective concurrency never exceeds 2.
        assert!(
            high.load(Ordering::SeqCst) <= 2,
            "effective concurrency must be gated to the limit, got {}",
            high.load(Ordering::SeqCst)
        );
    }

    #[test]
    fn transfer_config_rate_limiter() {
        let rt = runtime();
        rt.block_on(async {
            // Unlimited: acquiring a large amount returns essentially instantly.
            let unlimited = RateLimiter::new(None);
            let start = tokio::time::Instant::now();
            unlimited.acquire(1_000_000).await;
            assert!(
                start.elapsed() < Duration::from_millis(200),
                "unlimited acquire should not block"
            );

            // Limited to 1000 bytes/sec. The bucket starts full (1000), so the
            // first 1000 bytes are free; acquiring another 1000 bytes (2000 in
            // total) must wait for the bucket to refill — at least ~1s.
            let limiter = RateLimiter::new(Some(1000));
            let start = tokio::time::Instant::now();
            limiter.acquire(1000).await; // drains the initial burst
            limiter.acquire(1000).await; // must wait ~1s to refill
            let elapsed = start.elapsed();
            assert!(
                elapsed >= Duration::from_millis(900),
                "throttled acquire should take ~1s, took {elapsed:?}"
            );
        });
    }
}