snapdir_stores/transfer.rs
//! Transfer configuration, rate limiting, and bounded-concurrency drivers.
//!
//! This module is the foundation for concurrent object transfers and bandwidth
//! limiting. It provides:
//!
//! - [`TransferConfig`] — how many objects to transfer in parallel, an
//!   optional aggregate byte-rate cap, and an [`AdaptivePolicy`] selecting
//!   whether those are tuned live.
//! - [`RateLimiter`] — a zero-dependency async token bucket built on
//!   [`tokio::time`], shareable across tasks via [`Arc`]; its synchronous
//!   sibling [`BlockingRateLimiter`] serves thread-pool (non-async) callers.
//! - [`run_concurrent`] — a generic bounded-concurrency driver that runs up to
//!   `concurrency` async operations in flight and returns the first error;
//!   [`run_adaptive`] is its variant whose effective concurrency is gated by a
//!   live [`AdaptiveGate`](crate::adaptive::AdaptiveGate).
//! - [`classify_error`] — maps a [`StoreError`] to the adaptive controller's
//!   throttle-vs-hard-error signal.
//!
//! Nothing here changes the existing (sequential) push / fetch loops yet; the
//! stores merely carry a [`TransferConfig`] so later gates can wire these
//! primitives into their transfer loops.
16
17use std::num::NonZeroUsize;
18use std::sync::Arc;
19use std::time::Duration;
20
21use futures::stream::{self, StreamExt, TryStreamExt};
22use snapdir_core::store::StoreError;
23use tokio::sync::Mutex;
24
25use crate::adaptive::{AdaptiveGate, OpResult};
26
/// Ceiling applied to the CPU-parallelism probe in `TransferConfig::default`,
/// so large machines do not open an excessive number of transfers at once.
const DEFAULT_CONCURRENCY_CAP: usize = 16;
29
/// Config-level switch for adaptive tuning of a transfer's concurrency and
/// byte-rate.
///
/// Not to be confused with [`crate::adaptive::AdaptivePolicy`], which is the
/// controller's always-on tuning view; this enum is what a [`TransferConfig`]
/// carries. [`AdaptivePolicy::Off`] (the default) keeps the historical path —
/// fixed `concurrency` plus fixed `max_bytes_per_sec` — byte-for-byte.
/// [`AdaptivePolicy::On`] sizes the in-flight window to `ceiling` and lets a
/// live controller steer the effective concurrency within `[1, ceiling]` and
/// the byte-rate from in-band per-operation feedback. Either way only
/// scheduling and pacing differ: the exact bytes/objects moved and the
/// resulting snapshot are the same as with `Off`.
#[derive(Clone, Copy, Debug, Default, PartialEq)]
pub enum AdaptivePolicy {
    /// Fixed concurrency and fixed rate — the default, historical behavior.
    #[default]
    Off,
    /// Adaptive concurrency and rate, capped at `ceiling`, targeting
    /// `fraction × discovered-knee`.
    On {
        /// Fraction of the discovered knee to operate at, intended to lie in
        /// `(0, 1]` (`0.8` is the usual default). This type does not clamp it
        /// itself — presumably the controller does; confirm at the call site.
        fraction: f64,
        /// Absolute concurrency ceiling; the effective limit never exceeds it.
        ceiling: usize,
    },
}
56
57/// Configuration for object transfers: how many to run in parallel, an optional
58/// aggregate byte-rate cap, and whether to tune those adaptively.
59///
60/// `Default` auto-detects the available parallelism (capped at
61/// [`DEFAULT_CONCURRENCY_CAP`]), leaves bandwidth unlimited, and disables
62/// adaptive tuning ([`AdaptivePolicy::Off`]).
63#[derive(Debug, Clone)]
64pub struct TransferConfig {
65 /// Maximum number of object transfers to run concurrently. In the adaptive
66 /// (`On`) path this is the slow-start *seed*; the effective in-flight
67 /// window is sized to the policy ceiling and gated to the live limit.
68 pub concurrency: NonZeroUsize,
69 /// Optional aggregate bandwidth cap, in bytes per second. `None` means
70 /// unlimited. In the adaptive path this is the rate *cap* (`max_rate`); the
71 /// controller may target a lower live rate.
72 pub max_bytes_per_sec: Option<u64>,
73 /// Whether to tune concurrency / rate adaptively. [`AdaptivePolicy::Off`]
74 /// (the default) keeps the historical fixed-concurrency path byte-for-byte.
75 pub adaptive: AdaptivePolicy,
76}
77
78impl TransferConfig {
79 /// Builds a non-adaptive config, clamping `concurrency` to at least 1.
80 ///
81 /// The adaptive policy defaults to [`AdaptivePolicy::Off`], so existing
82 /// callers behave exactly as before. Use
83 /// [`with_adaptive`](Self::with_adaptive) to opt in.
84 #[must_use]
85 pub fn new(concurrency: usize, max_bytes_per_sec: Option<u64>) -> Self {
86 Self {
87 concurrency: NonZeroUsize::new(concurrency.max(1)).unwrap_or(NonZeroUsize::MIN),
88 max_bytes_per_sec,
89 adaptive: AdaptivePolicy::Off,
90 }
91 }
92
93 /// Returns this config with its adaptive policy set to `policy` (builder
94 /// style). `Off` is byte-identical to a plain [`new`](Self::new) config.
95 #[must_use]
96 pub fn with_adaptive(mut self, policy: AdaptivePolicy) -> Self {
97 self.adaptive = policy;
98 self
99 }
100}
101
102impl Default for TransferConfig {
103 fn default() -> Self {
104 let detected = std::thread::available_parallelism()
105 .map_or(1, NonZeroUsize::get)
106 .clamp(1, DEFAULT_CONCURRENCY_CAP);
107 Self {
108 // `detected` is >= 1, so the NonZeroUsize is always Some.
109 concurrency: NonZeroUsize::new(detected).unwrap_or(NonZeroUsize::MIN),
110 max_bytes_per_sec: None,
111 adaptive: AdaptivePolicy::Off,
112 }
113 }
114}
115
116/// Classifies a [`StoreError`] for the adaptive controller's congestion signal.
117///
118/// Returns [`OpResult::Throttle`] for clearly *transient / backpressure*
119/// failures the controller should back off on (HTTP 429 / `SlowDown` / 503 /
120/// `RESOURCE_EXHAUSTED`, request timeouts, connection reset/closed, and the
121/// local-FS backpressure errno class — `WouldBlock`, EMFILE "too many open
122/// files", and a full disk). Everything else — `NotFound`, `Integrity`,
123/// `Parse`, and ordinary I/O / backend errors — is [`OpResult::HardErr`].
124///
125/// This is **conservative**: anything not clearly transient defaults to
126/// `HardErr`, so a real failure never masquerades as throttling. It inspects
127/// `StoreError::Backend`'s message + wrapped source string (the SDKs surface
128/// their status that way) and `StoreError::Io`'s [`std::io::ErrorKind`].
129#[must_use]
130pub fn classify_error(err: &StoreError) -> OpResult {
131 match err {
132 StoreError::Io(io_err) => classify_io_kind(io_err),
133 StoreError::Backend { message, source } => {
134 let mut text = message.to_ascii_lowercase();
135 if let Some(src) = source {
136 text.push(' ');
137 text.push_str(&src.to_string().to_ascii_lowercase());
138 }
139 if text_is_transient(&text) {
140 OpResult::Throttle
141 } else {
142 OpResult::HardErr
143 }
144 }
145 // NotFound / Integrity / Parse are never transient backpressure, and
146 // `StoreError` is `#[non_exhaustive]` so any future variant is — by the
147 // conservative rule — a hard error until proven transient.
148 _ => OpResult::HardErr,
149 }
150}
151
152/// Classifies a local-filesystem [`std::io::Error`] as transient backpressure
153/// vs a hard error. Only the clear backpressure errno classes
154/// (`WouldBlock`, a full filesystem, and EMFILE "too many open files", which
155/// stable Rust still surfaces as `Uncategorized` with that message) are
156/// treated as [`OpResult::Throttle`].
157fn classify_io_kind(err: &std::io::Error) -> OpResult {
158 use std::io::ErrorKind;
159 match err.kind() {
160 ErrorKind::WouldBlock | ErrorKind::StorageFull => OpResult::Throttle,
161 // EMFILE / ENFILE land in the catch-all `Other`/`Uncategorized` kind on
162 // stable Rust; sniff the message for "too many open files".
163 _ => {
164 if err
165 .to_string()
166 .to_ascii_lowercase()
167 .contains("too many open files")
168 {
169 OpResult::Throttle
170 } else {
171 OpResult::HardErr
172 }
173 }
174 }
175}
176
/// Returns `true` when case-folded SDK error `text` looks like transient
/// backpressure: throttling, overload, timeouts, dropped connections, or file
/// descriptor exhaustion.
///
/// Phrases are matched as plain substrings. Bare HTTP status codes (`429`,
/// `503`) are matched only as *standalone* tokens, with no letter or digit
/// directly before or after them. Numbers that merely contain those digits
/// (hex checksums, byte counts, offsets, ports) are therefore not mistaken
/// for throttling, which keeps the classifier conservative.
///
/// `text` must already be lowercased; `classify_error` does this.
fn text_is_transient(text: &str) -> bool {
    const TRANSIENT_PHRASES: &[&str] = &[
        "slowdown",
        "slow down",
        "too many requests",
        "toomanyrequests",
        "service unavailable",
        "serviceunavailable",
        "resource_exhausted",
        "resource exhausted",
        "throttl",
        "request timeout",
        "requesttimeout",
        "timed out",
        "timeout",
        "connection reset",
        "connection closed",
        "connection refused",
        "broken pipe",
        "too many open files",
    ];
    const TRANSIENT_STATUS_CODES: &[&str] = &["429", "503"];

    TRANSIENT_PHRASES.iter().any(|phrase| text.contains(phrase))
        || TRANSIENT_STATUS_CODES
            .iter()
            .any(|code| contains_standalone(text, code))
}

/// Returns `true` if `token` occurs in `text` with no alphanumeric character
/// immediately before or after that occurrence.
fn contains_standalone(text: &str, token: &str) -> bool {
    text.match_indices(token).any(|(start, found)| {
        let before = text[..start].chars().next_back();
        let after = text[start + found.len()..].chars().next();
        !before.is_some_and(char::is_alphanumeric) && !after.is_some_and(char::is_alphanumeric)
    })
}
202
203/// Runs `op` over `items` with the in-flight window sized to `gate.ceiling()`
204/// but the *effective* concurrency gated to the gate's live limit: each item
205/// acquires a [`GatePermit`](crate::adaptive::GatePermit) before `op` runs and
206/// holds it until `op` completes. This is the adaptive sibling of
207/// [`run_concurrent`]: a background tick driver resizes the gate live, so the
208/// number of simultaneously-running ops tracks the controller's limit while the
209/// buffer window stays at the ceiling.
210///
211/// Semantics match [`run_concurrent`] otherwise: completion-independent order,
212/// first-error-wins (remaining in-flight work is cancelled).
213///
214/// # Errors
215///
216/// Returns the first [`StoreError`] produced by any operation.
217pub async fn run_adaptive<I, T, F, Fut>(
218 items: I,
219 gate: &AdaptiveGate,
220 op: F,
221) -> Result<Vec<T>, StoreError>
222where
223 I: IntoIterator,
224 F: Fn(I::Item) -> Fut,
225 Fut: std::future::Future<Output = Result<T, StoreError>>,
226{
227 let window = gate.ceiling().max(1);
228 stream::iter(items)
229 .map(|item| {
230 let op = &op;
231 async move {
232 // Effective concurrency = the gate's current limit (<= ceiling),
233 // even though up to `window` futures are buffered.
234 let _permit = gate.acquire().await;
235 op(item).await
236 }
237 })
238 .buffer_unordered(window)
239 .try_collect()
240 .await
241}
242
243/// Shared token-bucket state, guarded by an async mutex.
244///
245/// The refill `rate`/`capacity` live **inside** the bucket (behind the same
246/// mutex as the running `tokens`) so [`RateLimiter::set_rate`] can retune the
247/// limiter live by relocking and updating them. A `rate` of `0.0` means
248/// "unlimited" — [`acquire`](RateLimiter::acquire) returns immediately.
249#[derive(Debug)]
250struct Bucket {
251 /// Refill rate in bytes per second. `0.0` means unlimited (no throttling).
252 rate: f64,
253 /// Maximum burst capacity, in bytes (~1 second's worth of budget).
254 capacity: f64,
255 /// Currently available tokens (bytes).
256 tokens: f64,
257 /// Last time the bucket was refilled.
258 last_refill: tokio::time::Instant,
259}
260
261/// Inner state of a [`RateLimiter`].
262#[derive(Debug)]
263struct Inner {
264 /// The live bucket state (rate/capacity/tokens). A `rate` of `0.0` models
265 /// the unlimited case.
266 bucket: Mutex<Bucket>,
267}
268
269/// An async token-bucket rate limiter that throttles aggregate transfer
270/// throughput.
271///
272/// Construct with [`RateLimiter::new`]. When `max_bytes_per_sec` is `None` (or
273/// `Some(0)`), the limiter is unlimited and [`acquire`](RateLimiter::acquire)
274/// returns immediately. Otherwise tokens refill at `max_bytes_per_sec` per
275/// second, allowing a burst of up to ~1 second's worth of budget.
276///
277/// The limiter is [`Arc`]-shareable and [`Clone`] (cloning shares the same
278/// underlying bucket).
279#[derive(Debug, Clone)]
280pub struct RateLimiter {
281 inner: Arc<Inner>,
282}
283
284impl RateLimiter {
285 /// Builds a limiter. `None` (or `Some(0)`) yields an unlimited, no-op
286 /// limiter whose [`acquire`](RateLimiter::acquire) never waits.
287 #[must_use]
288 pub fn new(max_bytes_per_sec: Option<u64>) -> Self {
289 #[allow(clippy::cast_precision_loss)]
290 let (rate, capacity, tokens) = match max_bytes_per_sec {
291 Some(r) if r > 0 => {
292 let r = r as f64;
293 (r, r, r)
294 }
295 _ => (0.0, 0.0, 0.0),
296 };
297 Self {
298 inner: Arc::new(Inner {
299 bucket: Mutex::new(Bucket {
300 rate,
301 capacity,
302 tokens,
303 last_refill: tokio::time::Instant::now(),
304 }),
305 }),
306 }
307 }
308
309 /// Retunes the limiter's aggregate byte-rate cap **live**, so an adaptive
310 /// controller can raise or lower throttling between operations.
311 ///
312 /// - `None` (or `Some(0)`) switches the limiter to **unlimited**: the rate
313 /// and capacity drop to `0` and the bucket is emptied, so the next
314 /// [`acquire`](RateLimiter::acquire) is a no-op.
315 /// - `Some(r > 0)` installs (or replaces) a bucket refilling at `r`
316 /// bytes/sec with ~1 second of burst capacity. Switching from unlimited
317 /// to limited primes the bucket full (`tokens = capacity`) so a freshly
318 /// throttled limiter still allows one immediate burst.
319 ///
320 /// Calling `set_rate` is the only way the rate changes after [`new`];
321 /// limiters that never call it behave exactly as before.
322 pub async fn set_rate(&self, bytes_per_sec: Option<u64>) {
323 let mut state = self.inner.bucket.lock().await;
324 let was_unlimited = state.rate <= 0.0;
325 #[allow(clippy::cast_precision_loss)]
326 match bytes_per_sec {
327 Some(r) if r > 0 => {
328 let r = r as f64;
329 state.rate = r;
330 state.capacity = r;
331 // Switching unlimited -> limited: prime a full burst. When
332 // already limited, keep the running token count but clamp it to
333 // the new capacity so a rate drop takes effect promptly.
334 if was_unlimited {
335 state.tokens = r;
336 } else {
337 state.tokens = state.tokens.min(r);
338 }
339 state.last_refill = tokio::time::Instant::now();
340 }
341 _ => {
342 // Unlimited: empty the bucket; `acquire` short-circuits on rate==0.
343 state.rate = 0.0;
344 state.capacity = 0.0;
345 state.tokens = 0.0;
346 }
347 }
348 }
349
350 /// Blocks until `n` bytes of budget are available, refilling the bucket at
351 /// the configured rate. Unlimited limiters return immediately.
352 ///
353 /// A single request larger than the bucket capacity is still satisfied: the
354 /// bucket is allowed to go negative and the caller waits out the deficit,
355 /// so throttling is correct even for objects bigger than one second's
356 /// worth of budget.
357 pub async fn acquire(&self, n: u64) {
358 if n == 0 {
359 return;
360 }
361 #[allow(clippy::cast_precision_loss)]
362 let need = n as f64;
363
364 loop {
365 let wait = {
366 let mut state = self.inner.bucket.lock().await;
367 if state.rate <= 0.0 {
368 return; // unlimited fast path (also covers live set_rate(None))
369 }
370 let now = tokio::time::Instant::now();
371 let elapsed = now.duration_since(state.last_refill).as_secs_f64();
372 state.tokens = (state.tokens + elapsed * state.rate).min(state.capacity);
373 state.last_refill = now;
374
375 if state.tokens >= need {
376 state.tokens -= need;
377 return;
378 }
379 // Not enough budget: compute how long until the deficit is
380 // covered, then sleep (releasing the lock first).
381 let deficit = need - state.tokens;
382 deficit / state.rate
383 };
384 tokio::time::sleep(Duration::from_secs_f64(wait)).await;
385 }
386 }
387}
388
/// Token-bucket state for a [`BlockingRateLimiter`], kept behind a
/// **synchronous** [`std::sync::Mutex`] rather than tokio's async mutex.
///
/// As with the async limiter's bucket, `rate` and `capacity` are stored here
/// beside `tokens`, so [`BlockingRateLimiter::set_rate`] can retune a live
/// limiter. `rate == 0.0` encodes "unlimited".
#[derive(Debug)]
struct BlockingBucket {
    /// Refill rate in bytes per second; `0.0` disables throttling.
    rate: f64,
    /// Burst ceiling for `tokens`, in bytes (about one second of budget).
    capacity: f64,
    /// Budget currently available, in bytes.
    tokens: f64,
    /// When `tokens` was last topped up.
    last_refill: std::time::Instant,
}
406
407/// Inner state of a [`BlockingRateLimiter`].
408#[derive(Debug)]
409struct BlockingInner {
410 /// The live bucket state (rate/capacity/tokens). A `rate` of `0.0` models
411 /// the unlimited case.
412 bucket: std::sync::Mutex<BlockingBucket>,
413}
414
415/// A **synchronous** token-bucket rate limiter for the store-to-store sync
416/// path.
417///
418/// This is the blocking sibling of [`RateLimiter`]. The
419/// [`StreamStore`](crate::stream::StreamStore) methods are synchronous and
420/// drive their backends' async SDK calls on an internal runtime via `block_on`,
421/// so the store-to-store sync orchestrator parallelizes them across a **rayon**
422/// thread pool of plain OS threads — it cannot use the async [`RateLimiter`]
423/// (awaiting inside a `block_on`-ing rayon worker would nest tokio runtimes).
424/// [`acquire_blocking`](BlockingRateLimiter::acquire_blocking) therefore parks
425/// the calling OS thread with [`std::thread::sleep`] instead of `.await`.
426///
427/// When `max_bytes_per_sec` is `None` (or `Some(0)`), the limiter is unlimited
428/// and [`acquire_blocking`](BlockingRateLimiter::acquire_blocking) returns
429/// immediately. Otherwise tokens refill at `max_bytes_per_sec` per second,
430/// allowing a burst of up to ~1 second's worth of budget. The token math
431/// mirrors [`RateLimiter::acquire`] exactly.
432///
433/// The limiter is [`Arc`]-shareable and [`Clone`] (cloning shares the same
434/// underlying bucket), so every rayon worker throttles against one aggregate
435/// budget.
436#[derive(Debug, Clone)]
437pub struct BlockingRateLimiter {
438 inner: Arc<BlockingInner>,
439}
440
441impl BlockingRateLimiter {
442 /// Builds a synchronous limiter. `None` (or `Some(0)`) yields an unlimited,
443 /// no-op limiter whose
444 /// [`acquire_blocking`](BlockingRateLimiter::acquire_blocking) never waits.
445 #[must_use]
446 pub fn new(max_bytes_per_sec: Option<u64>) -> Self {
447 #[allow(clippy::cast_precision_loss)]
448 let (rate, capacity, tokens) = match max_bytes_per_sec {
449 Some(r) if r > 0 => {
450 let r = r as f64;
451 (r, r, r)
452 }
453 _ => (0.0, 0.0, 0.0),
454 };
455 Self {
456 inner: Arc::new(BlockingInner {
457 bucket: std::sync::Mutex::new(BlockingBucket {
458 rate,
459 capacity,
460 tokens,
461 last_refill: std::time::Instant::now(),
462 }),
463 }),
464 }
465 }
466
467 /// Retunes the limiter's aggregate byte-rate cap **live** (the synchronous
468 /// sibling of [`RateLimiter::set_rate`]). Same semantics: `None`/`Some(0)`
469 /// switches to unlimited and empties the bucket; `Some(r > 0)` installs a
470 /// bucket refilling at `r` bytes/sec (priming a full burst when switching
471 /// from unlimited).
472 pub fn set_rate(&self, bytes_per_sec: Option<u64>) {
473 let mut state = self
474 .inner
475 .bucket
476 .lock()
477 .unwrap_or_else(std::sync::PoisonError::into_inner);
478 let was_unlimited = state.rate <= 0.0;
479 #[allow(clippy::cast_precision_loss)]
480 match bytes_per_sec {
481 Some(r) if r > 0 => {
482 let r = r as f64;
483 state.rate = r;
484 state.capacity = r;
485 if was_unlimited {
486 state.tokens = r;
487 } else {
488 state.tokens = state.tokens.min(r);
489 }
490 state.last_refill = std::time::Instant::now();
491 }
492 _ => {
493 state.rate = 0.0;
494 state.capacity = 0.0;
495 state.tokens = 0.0;
496 }
497 }
498 }
499
500 /// Blocks the calling OS thread until `n` bytes of budget are available,
501 /// refilling the bucket at the configured rate. Unlimited limiters return
502 /// immediately.
503 ///
504 /// A single request larger than the bucket capacity is still satisfied: the
505 /// bucket is allowed to go negative and the caller waits out the deficit,
506 /// so throttling is correct even for objects bigger than one second's worth
507 /// of budget. Mirrors [`RateLimiter::acquire`], but parks the thread with
508 /// [`std::thread::sleep`] instead of awaiting.
509 pub fn acquire_blocking(&self, n: u64) {
510 if n == 0 {
511 return;
512 }
513 #[allow(clippy::cast_precision_loss)]
514 let need = n as f64;
515
516 loop {
517 let wait = {
518 // A poisoned bucket only means a thread panicked mid-acquire;
519 // the token state is still usable, so recover the guard.
520 let mut state = self
521 .inner
522 .bucket
523 .lock()
524 .unwrap_or_else(std::sync::PoisonError::into_inner);
525 if state.rate <= 0.0 {
526 return; // unlimited fast path (also covers live set_rate(None))
527 }
528 let now = std::time::Instant::now();
529 let elapsed = now.duration_since(state.last_refill).as_secs_f64();
530 state.tokens = (state.tokens + elapsed * state.rate).min(state.capacity);
531 state.last_refill = now;
532
533 if state.tokens >= need {
534 state.tokens -= need;
535 return;
536 }
537 // Not enough budget: compute how long until the deficit is
538 // covered, then sleep (releasing the lock first).
539 let deficit = need - state.tokens;
540 deficit / state.rate
541 };
542 std::thread::sleep(Duration::from_secs_f64(wait));
543 }
544 }
545}
546
547/// Runs `op` over `items` with at most `concurrency` operations in flight,
548/// collecting their results in completion-independent order and returning the
549/// first error encountered (remaining in-flight work is cancelled).
550///
551/// This is the engine later gates use to drive concurrent uploads/downloads.
552///
553/// # Errors
554///
555/// Returns the first [`StoreError`] produced by any operation.
556pub async fn run_concurrent<I, T, F, Fut>(
557 items: I,
558 concurrency: NonZeroUsize,
559 op: F,
560) -> Result<Vec<T>, StoreError>
561where
562 I: IntoIterator,
563 F: Fn(I::Item) -> Fut,
564 Fut: std::future::Future<Output = Result<T, StoreError>>,
565{
566 stream::iter(items)
567 .map(op)
568 .buffer_unordered(concurrency.get())
569 .try_collect()
570 .await
571}
572
#[cfg(test)]
mod tests {
    use super::*;
    use std::sync::atomic::{AtomicUsize, Ordering};

    /// Builds a current-thread tokio runtime with time enabled, avoiding a
    /// dependency on the `#[tokio::test]` macro (keeps tokio's feature set
    /// minimal).
    fn runtime() -> tokio::runtime::Runtime {
        tokio::runtime::Builder::new_current_thread()
            .enable_time()
            .build()
            .expect("build tokio runtime")
    }

    #[test]
    fn transfer_config_default_caps_concurrency() {
        let cfg = TransferConfig::default();
        assert!(cfg.concurrency.get() >= 1, "concurrency must be >= 1");
        assert!(
            cfg.concurrency.get() <= DEFAULT_CONCURRENCY_CAP,
            "default concurrency must be capped at {DEFAULT_CONCURRENCY_CAP}, got {}",
            cfg.concurrency.get()
        );
        assert_eq!(cfg.max_bytes_per_sec, None);
        assert_eq!(cfg.adaptive, AdaptivePolicy::Off);

        // The clamping ctor never yields 0.
        assert_eq!(TransferConfig::new(0, None).concurrency.get(), 1);
        assert_eq!(TransferConfig::new(7, Some(99)).concurrency.get(), 7);
        assert_eq!(TransferConfig::new(7, Some(99)).max_bytes_per_sec, Some(99));
    }

    #[test]
    fn transfer_config_with_adaptive_only_sets_policy() {
        assert_eq!(TransferConfig::new(3, None).adaptive, AdaptivePolicy::Off);
        let on = AdaptivePolicy::On {
            fraction: 0.8,
            ceiling: 8,
        };
        let tuned = TransferConfig::new(3, Some(10)).with_adaptive(on);
        assert_eq!(tuned.adaptive, on);
        assert_eq!(tuned.concurrency.get(), 3);
        assert_eq!(tuned.max_bytes_per_sec, Some(10));
    }

    /// Drives `run_concurrent` over `items` operations with the given
    /// `concurrency` and returns the peak number of simultaneously-running
    /// ops, so callers can assert it equals `min(concurrency, items)`.
    fn max_in_flight_for(concurrency: usize, items: usize) -> usize {
        let in_flight = Arc::new(AtomicUsize::new(0));
        let high_water = Arc::new(AtomicUsize::new(0));

        let rt = runtime();
        let result = rt.block_on(async {
            let in_flight = Arc::clone(&in_flight);
            let high_water = Arc::clone(&high_water);
            run_concurrent(
                0..items,
                NonZeroUsize::new(concurrency).unwrap(),
                move |_item| {
                    let in_flight = Arc::clone(&in_flight);
                    let high_water = Arc::clone(&high_water);
                    async move {
                        let cur = in_flight.fetch_add(1, Ordering::SeqCst) + 1;
                        high_water.fetch_max(cur, Ordering::SeqCst);
                        tokio::time::sleep(Duration::from_millis(20)).await;
                        in_flight.fetch_sub(1, Ordering::SeqCst);
                        Ok::<_, StoreError>(())
                    }
                },
            )
            .await
        });
        assert!(result.is_ok());
        high_water.load(Ordering::SeqCst)
    }

    #[test]
    fn transfer_config_run_concurrent_max_in_flight() {
        // concurrency=4 over 12 items: peak in-flight is exactly 4.
        assert_eq!(max_in_flight_for(4, 12), 4);
        // concurrency=1 over 5 items: strictly sequential, peak in-flight is 1.
        assert_eq!(max_in_flight_for(1, 5), 1);
        // concurrency greater than item count is bounded by the item count.
        assert_eq!(max_in_flight_for(8, 3), 3);
    }

    #[test]
    fn transfer_config_run_concurrent_propagates_error() {
        let rt = runtime();
        let result: Result<Vec<()>, StoreError> = rt.block_on(async {
            run_concurrent(0..10, NonZeroUsize::new(3).unwrap(), |item| async move {
                if item == 5 {
                    Err(StoreError::Backend {
                        message: "boom".to_owned(),
                        source: None,
                    })
                } else {
                    tokio::time::sleep(Duration::from_millis(5)).await;
                    Ok(())
                }
            })
            .await
        });
        let err = result.expect_err("must surface the failing op's error");
        assert!(
            matches!(err, StoreError::Backend { ref message, .. } if message == "boom"),
            "unexpected error: {err:?}"
        );
    }

    #[test]
    fn sync_snapshot_blocking_rate_limiter() {
        use std::time::Instant;

        // Unlimited: acquiring a large amount returns essentially instantly.
        let unlimited = BlockingRateLimiter::new(None);
        let start = Instant::now();
        unlimited.acquire_blocking(1_000_000);
        assert!(
            start.elapsed() < Duration::from_millis(200),
            "unlimited acquire_blocking should not block"
        );
        // Some(0) is also unlimited.
        let zero = BlockingRateLimiter::new(Some(0));
        let start = Instant::now();
        zero.acquire_blocking(1_000_000);
        assert!(
            start.elapsed() < Duration::from_millis(200),
            "Some(0) acquire_blocking should not block"
        );

        // Limited to 1000 bytes/sec. The bucket starts full (1000), so the
        // first 1000 bytes are free; acquiring another 1000 bytes (2x the
        // per-second budget in total) must wait for the deficit to refill —
        // at least ~1s.
        let limiter = BlockingRateLimiter::new(Some(1000));
        let start = Instant::now();
        limiter.acquire_blocking(1000); // drains the initial burst
        limiter.acquire_blocking(1000); // must wait ~1s to refill
        let elapsed = start.elapsed();
        assert!(
            elapsed >= Duration::from_millis(900),
            "throttled acquire_blocking should take ~1s, took {elapsed:?}"
        );
    }

    #[test]
    fn transfer_config_rate_limiter_set_rate_live() {
        let rt = runtime();
        rt.block_on(async {
            // Start unlimited: a huge acquire returns instantly.
            let limiter = RateLimiter::new(None);
            let start = tokio::time::Instant::now();
            limiter.acquire(1_000_000).await;
            assert!(
                start.elapsed() < Duration::from_millis(200),
                "unlimited acquire should not block before set_rate"
            );

            // Tighten to 1000 B/s live. The bucket is primed full (1000), so the
            // first 1000 bytes are free; the next 1000 must wait ~1s to refill.
            limiter.set_rate(Some(1000)).await;
            let start = tokio::time::Instant::now();
            limiter.acquire(1000).await; // drains the freshly-primed burst
            limiter.acquire(1000).await; // must wait ~1s
            let elapsed = start.elapsed();
            assert!(
                elapsed >= Duration::from_millis(900),
                "after set_rate(Some(1000)) a 2x-budget acquire should take ~1s, took {elapsed:?}"
            );

            // Raise the cap back to unlimited live: acquires stop waiting again.
            limiter.set_rate(None).await;
            let start = tokio::time::Instant::now();
            limiter.acquire(1_000_000).await;
            assert!(
                start.elapsed() < Duration::from_millis(200),
                "after set_rate(None) acquire should no longer block"
            );
        });
    }

    #[test]
    fn sync_snapshot_blocking_rate_limiter_set_rate_live() {
        use std::time::Instant;

        // Start unlimited.
        let limiter = BlockingRateLimiter::new(None);
        let start = Instant::now();
        limiter.acquire_blocking(1_000_000);
        assert!(
            start.elapsed() < Duration::from_millis(200),
            "unlimited acquire_blocking should not block before set_rate"
        );

        // Tighten live to 1000 B/s.
        limiter.set_rate(Some(1000));
        let start = Instant::now();
        limiter.acquire_blocking(1000); // primed burst
        limiter.acquire_blocking(1000); // waits ~1s
        let elapsed = start.elapsed();
        assert!(
            elapsed >= Duration::from_millis(900),
            "after set_rate(Some(1000)) a 2x-budget acquire should take ~1s, took {elapsed:?}"
        );

        // Back to unlimited live.
        limiter.set_rate(Some(0));
        let start = Instant::now();
        limiter.acquire_blocking(1_000_000);
        assert!(
            start.elapsed() < Duration::from_millis(200),
            "after set_rate(Some(0)) acquire_blocking should no longer block"
        );
    }

    #[test]
    fn classify_error_throttle_vs_hard() {
        // Backend errors whose message/source look like backpressure -> Throttle.
        let transient_msgs = [
            "S3 PUT object failed: SlowDown",
            "got HTTP 503 Service Unavailable",
            "rate limited: 429 Too Many Requests",
            "RESOURCE_EXHAUSTED quota",
            "request timeout while uploading",
            "connection reset by peer",
            "os error: too many open files",
        ];
        for msg in transient_msgs {
            let err = StoreError::Backend {
                message: msg.to_owned(),
                source: None,
            };
            assert_eq!(
                classify_error(&err),
                OpResult::Throttle,
                "expected Throttle for {msg:?}"
            );
        }

        // Hard errors: NotFound / Integrity / Parse / ordinary backend failures.
        let not_found = StoreError::ObjectNotFound {
            checksum: "abc".to_owned(),
        };
        assert_eq!(classify_error(&not_found), OpResult::HardErr);
        let integrity = StoreError::Integrity {
            address: "x".to_owned(),
            expected: "a".to_owned(),
            actual: "b".to_owned(),
        };
        assert_eq!(classify_error(&integrity), OpResult::HardErr);
        let other = StoreError::Backend {
            message: "permission denied".to_owned(),
            source: None,
        };
        assert_eq!(classify_error(&other), OpResult::HardErr);

        // Local-FS backpressure errnos -> Throttle; a plain NotFound IO -> Hard.
        let emfile = StoreError::Io(std::io::Error::other("too many open files (os error 24)"));
        assert_eq!(classify_error(&emfile), OpResult::Throttle);
        let would_block = StoreError::Io(std::io::Error::from(std::io::ErrorKind::WouldBlock));
        assert_eq!(classify_error(&would_block), OpResult::Throttle);
        let io_notfound = StoreError::Io(std::io::Error::from(std::io::ErrorKind::NotFound));
        assert_eq!(classify_error(&io_notfound), OpResult::HardErr);
    }

    #[test]
    fn run_adaptive_respects_gate_limit() {
        let rt = runtime();
        let gate = AdaptiveGate::new(2, 8);
        let in_flight = Arc::new(AtomicUsize::new(0));
        let high = Arc::new(AtomicUsize::new(0));
        let in_flight2 = Arc::clone(&in_flight);
        let high2 = Arc::clone(&high);

        let result: Result<Vec<()>, StoreError> = rt.block_on(async move {
            run_adaptive(0..20, &gate, move |_item| {
                let in_flight = Arc::clone(&in_flight2);
                let high = Arc::clone(&high2);
                async move {
                    let cur = in_flight.fetch_add(1, Ordering::SeqCst) + 1;
                    high.fetch_max(cur, Ordering::SeqCst);
                    tokio::time::sleep(Duration::from_millis(15)).await;
                    in_flight.fetch_sub(1, Ordering::SeqCst);
                    Ok(())
                }
            })
            .await
        });
        assert!(result.is_ok());
        // Window is the ceiling (8) but the gate's live limit is 2, so peak
        // effective concurrency never exceeds 2.
        assert!(
            high.load(Ordering::SeqCst) <= 2,
            "effective concurrency must be gated to the limit, got {}",
            high.load(Ordering::SeqCst)
        );
    }

    #[test]
    fn transfer_config_rate_limiter() {
        let rt = runtime();
        rt.block_on(async {
            // Unlimited: acquiring a large amount returns essentially instantly.
            let unlimited = RateLimiter::new(None);
            let start = tokio::time::Instant::now();
            unlimited.acquire(1_000_000).await;
            assert!(
                start.elapsed() < Duration::from_millis(200),
                "unlimited acquire should not block"
            );

            // Limited to 1000 bytes/sec. The bucket starts full (1000), so the
            // first 1000 bytes are free; acquiring another 1000 bytes (2000 in
            // total, 2x the per-second budget) must wait for the deficit to
            // refill — at least ~1s.
            let limiter = RateLimiter::new(Some(1000));
            let start = tokio::time::Instant::now();
            limiter.acquire(1000).await; // drains the initial burst
            limiter.acquire(1000).await; // must wait ~1s to refill
            let elapsed = start.elapsed();
            assert!(
                elapsed >= Duration::from_millis(900),
                "throttled acquire should take ~1s, took {elapsed:?}"
            );
        });
    }
}