1use serde::{Deserialize, Serialize};
44use std::collections::VecDeque;
45use std::path::{Path, PathBuf};
46use std::sync::atomic::{AtomicU64, Ordering};
47use std::sync::{Arc, Mutex, PoisonError};
48use std::time::{Duration, Instant};
49use tracing::{debug, warn};
50
/// Process-wide counter used to keep snapshot temp-file names unique when
/// several saves race within the same process (see `save_snapshot`).
static SAVE_COUNTER: AtomicU64 = AtomicU64::new(0);
55
/// Acquires `m`, recovering the guard even if a previous holder panicked.
/// Poisoning is deliberately ignored rather than propagated: callers always
/// get a usable guard.
fn lock<T>(m: &Mutex<T>) -> std::sync::MutexGuard<'_, T> {
    match m.lock() {
        Ok(guard) => guard,
        Err(poisoned) => poisoned.into_inner(),
    }
}
61
/// Classification of a single operation for capacity-control purposes.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum Outcome {
    /// Completed successfully; its latency feeds the p95 latency tracking.
    Success,
    /// Timed out; counted against the timeout ceiling.
    Timeout,
    /// Transport-level failure; counts against the success rate.
    NetworkError,
    /// Application-level failure; excluded from capacity decisions entirely
    /// (see `evaluate`) because it does not indicate congestion.
    ApplicationError,
}
76
/// Per-channel upper bounds on concurrency; the adaptive cap never exceeds
/// these ceilings.
#[derive(Debug, Clone, Copy, Serialize, Deserialize)]
pub struct ChannelMax {
    /// Ceiling for the quote channel.
    pub quote: usize,
    /// Ceiling for the store channel.
    pub store: usize,
    /// Ceiling for the fetch channel.
    pub fetch: usize,
}
86
87impl Default for ChannelMax {
88 fn default() -> Self {
89 Self {
94 quote: 128,
95 store: 64,
96 fetch: 256,
97 }
98 }
99}
100
/// Tuning knobs for the adaptive concurrency controller, shared by all
/// three channels. Values are normalized by `sanitize` before use.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct AdaptiveConfig {
    /// Master switch; when false, caps stay at their initial values.
    pub enabled: bool,
    /// Lower bound for every channel's cap (floored to at least 1).
    pub min_concurrency: usize,
    /// Per-channel concurrency ceilings.
    pub max: ChannelMax,
    /// Sliding-window size in samples; also paces how often increases fire.
    pub window_ops: usize,
    /// Minimum samples before any decision is made; also paces decreases.
    pub min_window_ops: usize,
    /// Success-rate threshold below which the cap is decreased (0..=1).
    pub success_target: f64,
    /// Timeout-rate threshold above which the cap is decreased (0..=1).
    pub timeout_ceiling: f64,
    /// Decrease when window p95 exceeds the baseline times this factor (> 0).
    pub latency_inflation_factor: f64,
    /// Smoothing factor for the latency-baseline EWMA (0..=1).
    pub latency_ewma_alpha: f64,
}
134
135impl AdaptiveConfig {
136 pub fn sanitize(&mut self) {
142 if !self.latency_ewma_alpha.is_finite() {
143 self.latency_ewma_alpha = 0.2;
144 }
145 self.latency_ewma_alpha = self.latency_ewma_alpha.clamp(0.0, 1.0);
146 if !self.success_target.is_finite() {
147 self.success_target = 0.95;
148 }
149 self.success_target = self.success_target.clamp(0.0, 1.0);
150 if !self.timeout_ceiling.is_finite() {
151 self.timeout_ceiling = 0.10;
152 }
153 self.timeout_ceiling = self.timeout_ceiling.clamp(0.0, 1.0);
154 if !self.latency_inflation_factor.is_finite() || self.latency_inflation_factor <= 0.0 {
155 self.latency_inflation_factor = 2.0;
156 }
157 self.min_concurrency = self.min_concurrency.max(1);
158 self.window_ops = self.window_ops.max(1);
159 self.min_window_ops = self.min_window_ops.max(1).min(self.window_ops);
160 self.max.quote = self.max.quote.max(self.min_concurrency);
161 self.max.store = self.max.store.max(self.min_concurrency);
162 self.max.fetch = self.max.fetch.max(self.min_concurrency);
163 }
164}
165
impl Default for AdaptiveConfig {
    /// Defaults mirror the fallback constants used by `sanitize`.
    fn default() -> Self {
        Self {
            enabled: true,
            min_concurrency: 1,
            max: ChannelMax::default(),
            window_ops: 32,
            min_window_ops: 8,
            success_target: 0.95,
            timeout_ceiling: 0.10,
            latency_inflation_factor: 2.0,
            latency_ewma_alpha: 0.2,
        }
    }
}
181
/// Initial (cold-start) concurrency per channel; this is also the shape
/// persisted to disk and restored on warm start.
#[derive(Debug, Clone, Copy, Serialize, Deserialize)]
pub struct ChannelStart {
    /// Starting cap for the quote channel.
    pub quote: usize,
    /// Starting cap for the store channel.
    pub store: usize,
    /// Starting cap for the fetch channel.
    pub fetch: usize,
}
199
200impl Default for ChannelStart {
201 fn default() -> Self {
202 Self {
203 quote: 32,
204 store: 8,
205 fetch: 64,
206 }
207 }
208}
209
/// One recorded operation in the sliding window.
#[derive(Debug, Clone, Copy)]
struct Sample {
    /// How the operation ended.
    outcome: Outcome,
    /// Elapsed wall-clock time of the operation.
    latency: Duration,
}
216
/// Per-channel limiter settings, derived from [`AdaptiveConfig`] with a
/// channel-specific ceiling via `from_adaptive`.
#[derive(Debug, Clone)]
pub struct LimiterConfig {
    /// When false, `Limiter::observe` is a no-op and the cap never moves.
    pub enabled: bool,
    /// Lower bound on the cap (floored to at least 1 by `sanitize`).
    pub min_concurrency: usize,
    /// Upper bound on the cap for this channel.
    pub max_concurrency: usize,
    /// Sliding-window size in samples; also paces increases.
    pub window_ops: usize,
    /// Minimum samples before decisions are made; also paces decreases.
    pub min_window_ops: usize,
    /// Decrease the cap when success rate falls below this (0..=1).
    pub success_target: f64,
    /// Decrease the cap when timeout rate rises above this (0..=1).
    pub timeout_ceiling: f64,
    /// Decrease when window p95 exceeds baseline times this factor (> 0).
    pub latency_inflation_factor: f64,
    /// EWMA smoothing factor for the latency baseline (0..=1).
    pub latency_ewma_alpha: f64,
}
234
235impl LimiterConfig {
236 fn from_adaptive(cfg: &AdaptiveConfig, max_for_channel: usize) -> Self {
237 Self {
238 enabled: cfg.enabled,
239 min_concurrency: cfg.min_concurrency,
240 max_concurrency: max_for_channel.max(cfg.min_concurrency),
241 window_ops: cfg.window_ops,
242 min_window_ops: cfg.min_window_ops,
243 success_target: cfg.success_target,
244 timeout_ceiling: cfg.timeout_ceiling,
245 latency_inflation_factor: cfg.latency_inflation_factor,
246 latency_ewma_alpha: cfg.latency_ewma_alpha,
247 }
248 }
249
250 fn sanitize(&mut self) {
256 if !self.latency_ewma_alpha.is_finite() {
257 self.latency_ewma_alpha = 0.2;
258 }
259 self.latency_ewma_alpha = self.latency_ewma_alpha.clamp(0.0, 1.0);
260 if !self.success_target.is_finite() {
261 self.success_target = 0.95;
262 }
263 self.success_target = self.success_target.clamp(0.0, 1.0);
264 if !self.timeout_ceiling.is_finite() {
265 self.timeout_ceiling = 0.10;
266 }
267 self.timeout_ceiling = self.timeout_ceiling.clamp(0.0, 1.0);
268 if !self.latency_inflation_factor.is_finite() || self.latency_inflation_factor <= 0.0 {
269 self.latency_inflation_factor = 2.0;
270 }
271 self.min_concurrency = self.min_concurrency.max(1);
272 self.window_ops = self.window_ops.max(1);
273 self.min_window_ops = self.min_window_ops.max(1).min(self.window_ops);
274 self.max_concurrency = self.max_concurrency.max(self.min_concurrency);
275 }
276}
277
/// Adaptive concurrency limiter for one channel: multiplicative growth while
/// in slow start, then additive increase / halving decrease (AIMD-style).
/// Cheap to clone — clones share the same underlying state.
#[derive(Debug, Clone)]
pub struct Limiter {
    /// Shared mutable state: current cap, sample window, pacing counters.
    inner: Arc<Mutex<LimiterInner>>,
    /// Immutable, sanitized settings.
    config: Arc<LimiterConfig>,
}
288
/// Mutable limiter state guarded by the mutex in [`Limiter`].
#[derive(Debug)]
struct LimiterInner {
    /// Current concurrency cap.
    current: usize,
    /// Sliding FIFO window of recent samples, capped at `window_ops`.
    window: VecDeque<Sample>,
    /// Samples recorded since the last cap change; gates increases.
    samples_since_increase: usize,
    /// Samples recorded since the last cap change; gates decreases.
    samples_since_decrease: usize,
    /// EWMA of healthy-window p95 latency; `None` until the first increase
    /// decision fires on a healthy window.
    latency_baseline: Option<Duration>,
    /// False during the initial doubling phase; set by the first decrease
    /// (or a warm start), after which growth becomes +1 per step.
    left_slow_start: bool,
}
311
312impl Limiter {
313 #[must_use]
318 pub fn new(start: usize, config: LimiterConfig) -> Self {
319 let mut config = config;
320 config.sanitize();
321 let clamped = start.clamp(config.min_concurrency, config.max_concurrency.max(1));
322 let window_cap = config.window_ops;
323 Self {
324 inner: Arc::new(Mutex::new(LimiterInner {
325 current: clamped,
326 window: VecDeque::with_capacity(window_cap),
327 samples_since_increase: 0,
328 samples_since_decrease: 0,
329 latency_baseline: None,
330 left_slow_start: false,
331 })),
332 config: Arc::new(config),
333 }
334 }
335
336 #[must_use]
340 pub fn current(&self) -> usize {
341 lock(&self.inner).current
342 }
343
344 pub fn observe(&self, outcome: Outcome, latency: Duration) {
347 if !self.config.enabled {
348 return;
349 }
350 let mut g = lock(&self.inner);
351 if g.window.len() == self.config.window_ops {
352 g.window.pop_front();
353 }
354 g.window.push_back(Sample { outcome, latency });
355 g.samples_since_increase = g.samples_since_increase.saturating_add(1);
356 g.samples_since_decrease = g.samples_since_decrease.saturating_add(1);
357 if g.window.len() < self.config.min_window_ops {
358 return;
359 }
360 let decision = evaluate(&g.window, &self.config, g.latency_baseline);
361 apply_decision(&mut g, decision, &self.config);
362 }
363
364 pub fn warm_start(&self, start: usize) {
372 let clamped = start.clamp(
373 self.config.min_concurrency,
374 self.config.max_concurrency.max(1),
375 );
376 let mut g = lock(&self.inner);
377 g.current = clamped;
378 g.left_slow_start = true;
379 }
380
381 #[must_use]
383 pub fn snapshot(&self) -> usize {
384 lock(&self.inner).current
385 }
386}
387
/// Verdict from evaluating the sample window.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum Decision {
    /// Window looks healthy: grow the cap (subject to pacing).
    Increase,
    /// Window shows stress: shrink the cap (subject to pacing).
    Decrease,
    /// Not enough signal either way; leave the cap unchanged.
    Hold,
}
398
399fn evaluate(
400 window: &VecDeque<Sample>,
401 cfg: &LimiterConfig,
402 baseline: Option<Duration>,
403) -> Decision {
404 let mut successes = 0usize;
409 let mut timeouts = 0usize;
410 let mut net_errors = 0usize;
411 let mut latencies: Vec<Duration> = Vec::with_capacity(window.len());
412 for s in window {
413 match s.outcome {
414 Outcome::Success => {
415 successes += 1;
416 latencies.push(s.latency);
417 }
418 Outcome::Timeout => timeouts += 1,
419 Outcome::NetworkError => net_errors += 1,
420 Outcome::ApplicationError => {}
421 }
422 }
423 let capacity_total = successes + timeouts + net_errors;
424 if capacity_total < cfg.min_window_ops {
425 return Decision::Hold;
427 }
428 let total_f = capacity_total as f64;
429 let success_rate = successes as f64 / total_f;
430 let timeout_rate = timeouts as f64 / total_f;
431
432 if success_rate < cfg.success_target || timeout_rate > cfg.timeout_ceiling {
433 return Decision::Decrease;
434 }
435
436 if let Some(p95) = p95_of(&mut latencies) {
437 if let Some(base) = baseline {
438 let limit = base.mul_f64(cfg.latency_inflation_factor);
439 if p95 > limit {
440 return Decision::Decrease;
441 }
442 }
443 Decision::Increase
444 } else {
445 Decision::Hold
446 }
447}
448
/// Applies a window verdict to the limiter state, subject to pacing:
/// increases fire at most once per full window of samples, decreases at
/// most once per minimum window. Any applied decision resets both pacing
/// counters.
fn apply_decision(inner: &mut LimiterInner, decision: Decision, cfg: &LimiterConfig) {
    match decision {
        Decision::Increase => {
            // Pace increases: require a full window of fresh samples since
            // the last cap change.
            if inner.samples_since_increase < cfg.window_ops {
                return;
            }
            // Fold this healthy window's p95 into the latency baseline
            // (seed it directly the first time).
            let p95 = window_p95(&inner.window);
            inner.latency_baseline = Some(match inner.latency_baseline {
                None => p95,
                Some(prev) => ewma(prev, p95, cfg.latency_ewma_alpha),
            });
            // Slow start doubles the cap; after the first decrease, growth
            // becomes additive (+1 per step).
            let next = if inner.left_slow_start {
                inner.current.saturating_add(1)
            } else {
                inner.current.saturating_mul(2)
            };
            let next = next.min(cfg.max_concurrency).max(cfg.min_concurrency);
            if next != inner.current {
                debug!(
                    from = inner.current,
                    to = next,
                    slow_start = !inner.left_slow_start,
                    "adaptive: increase",
                );
            }
            inner.current = next;
            inner.samples_since_increase = 0;
            inner.samples_since_decrease = 0;
        }
        Decision::Decrease => {
            // Pace decreases: require min_window_ops fresh samples since
            // the last cap change.
            if inner.samples_since_decrease < cfg.min_window_ops {
                return;
            }
            // Any decrease permanently ends the slow-start phase.
            inner.left_slow_start = true;
            let next = (inner.current / 2).max(cfg.min_concurrency);
            if next != inner.current {
                debug!(from = inner.current, to = next, "adaptive: decrease");
            }
            inner.current = next;
            inner.samples_since_increase = 0;
            inner.samples_since_decrease = 0;
        }
        Decision::Hold => {}
    }
}
500
/// Returns the p95 latency of `latencies` using the nearest-rank method,
/// sorting the slice in place. `None` when the slice is empty.
fn p95_of(latencies: &mut [Duration]) -> Option<Duration> {
    let n = latencies.len();
    if n == 0 {
        return None;
    }
    latencies.sort_unstable();
    // Nearest-rank: ceil(0.95 * n) is a 1-based rank; convert to a 0-based
    // index and keep it in bounds.
    let rank = ((n as f64) * 0.95).ceil() as usize;
    let idx = rank.saturating_sub(1).min(n - 1);
    Some(latencies[idx])
}
513
514fn window_p95(window: &VecDeque<Sample>) -> Duration {
515 let mut latencies: Vec<Duration> = window
516 .iter()
517 .filter(|s| matches!(s.outcome, Outcome::Success))
518 .map(|s| s.latency)
519 .collect();
520 p95_of(&mut latencies).unwrap_or(Duration::ZERO)
521}
522
/// Exponentially-weighted moving average of two durations, blended in
/// millisecond space: `(1 - alpha) * prev + alpha * sample`. Falls back to
/// `prev` when `alpha` is non-finite or the blend is not representable.
fn ewma(prev: Duration, sample: Duration, alpha: f64) -> Duration {
    if !alpha.is_finite() {
        return prev;
    }
    let alpha = alpha.clamp(0.0, 1.0);
    let prev_ms = prev.as_secs_f64() * 1000.0;
    let sample_ms = sample.as_secs_f64() * 1000.0;
    let blended_ms = (1.0 - alpha) * prev_ms + alpha * sample_ms;
    if blended_ms.is_finite() && blended_ms >= 0.0 {
        Duration::from_secs_f64(blended_ms / 1000.0)
    } else {
        prev
    }
}
537
/// Bundles one [`Limiter`] per logical channel (quote / store / fetch) plus
/// the shared configuration and the cold-start caps used as a floor when
/// warm-starting from a persisted snapshot.
#[derive(Debug, Clone)]
pub struct AdaptiveController {
    /// Limiter for the quote channel.
    pub quote: Limiter,
    /// Limiter for the store channel.
    pub store: Limiter,
    /// Limiter for the fetch channel.
    pub fetch: Limiter,
    /// Sanitized configuration the limiters were built from.
    pub(crate) config: AdaptiveConfig,
    /// Initial caps; `warm_start` never seeds a channel below these.
    cold_start: ChannelStart,
}
558
559impl AdaptiveController {
560 #[must_use]
565 pub fn new(start: ChannelStart, config: AdaptiveConfig) -> Self {
566 let mut config = config;
567 config.sanitize();
568 let quote_cfg = LimiterConfig::from_adaptive(&config, config.max.quote);
569 let store_cfg = LimiterConfig::from_adaptive(&config, config.max.store);
570 let fetch_cfg = LimiterConfig::from_adaptive(&config, config.max.fetch);
571 Self {
572 quote: Limiter::new(start.quote, quote_cfg),
573 store: Limiter::new(start.store, store_cfg),
574 fetch: Limiter::new(start.fetch, fetch_cfg),
575 config,
576 cold_start: start,
577 }
578 }
579
580 #[must_use]
582 pub fn snapshot(&self) -> ChannelStart {
583 ChannelStart {
584 quote: self.quote.snapshot(),
585 store: self.store.snapshot(),
586 fetch: self.fetch.snapshot(),
587 }
588 }
589
590 #[must_use]
596 pub fn config(&self) -> &AdaptiveConfig {
597 &self.config
598 }
599
600 pub fn warm_start(&self, snapshot: ChannelStart) {
614 if !self.config.enabled {
615 return;
616 }
617 self.quote
618 .warm_start(snapshot.quote.max(self.cold_start.quote));
619 self.store
620 .warm_start(snapshot.store.max(self.cold_start.store));
621 self.fetch
622 .warm_start(snapshot.fetch.max(self.cold_start.fetch));
623 }
624}
625
626impl Default for AdaptiveController {
627 fn default() -> Self {
628 Self::new(ChannelStart::default(), AdaptiveConfig::default())
629 }
630}
631
/// Drop guard that times an operation and reports its outcome and latency
/// to a [`Limiter`] when dropped. If `finish` is never called (e.g. a panic
/// unwinds past the guard), nothing is reported.
struct ObserveGuard<'a> {
    /// Limiter to report to on drop.
    limiter: &'a Limiter,
    /// When the guarded operation began.
    started: Instant,
    /// Outcome and measured latency, recorded by `finish`.
    outcome: Option<(Outcome, Duration)>,
}
644
645impl<'a> ObserveGuard<'a> {
646 fn new(limiter: &'a Limiter) -> Self {
647 Self {
648 limiter,
649 started: Instant::now(),
650 outcome: None,
651 }
652 }
653 fn finish(&mut self, outcome: Outcome) {
654 self.outcome = Some((outcome, self.started.elapsed()));
655 }
656}
657
658impl Drop for ObserveGuard<'_> {
659 fn drop(&mut self) {
660 if let Some((outcome, latency)) = self.outcome.take() {
661 self.limiter.observe(outcome, latency);
662 }
663 }
664}
665
666pub async fn observe_op<T, E, F, Fut, C>(limiter: &Limiter, op: F, classify: C) -> Result<T, E>
681where
682 F: FnOnce() -> Fut,
683 Fut: std::future::Future<Output = Result<T, E>>,
684 C: FnOnce(&E) -> Outcome,
685{
686 let mut guard = ObserveGuard::new(limiter);
687 let result = op().await;
688 let outcome = match &result {
689 Ok(_) => Outcome::Success,
690 Err(e) => classify(e),
691 };
692 guard.finish(outcome);
693 drop(guard); result
695}
696
697pub async fn rebucketed_unordered<I, T, E, F, Fut>(
712 limiter: &Limiter,
713 items: I,
714 mut op: F,
715) -> Result<Vec<T>, E>
716where
717 I: IntoIterator,
718 F: FnMut(I::Item) -> Fut,
719 Fut: std::future::Future<Output = Result<T, E>>,
720{
721 use futures::stream::{FuturesUnordered, StreamExt};
722 let mut iter = items.into_iter().peekable();
723 let mut in_flight: FuturesUnordered<Fut> = FuturesUnordered::new();
724 let mut results = Vec::new();
725 let mut pending_err: Option<E> = None;
726 loop {
727 if pending_err.is_none() {
730 let cap = limiter.current().max(1);
731 while in_flight.len() < cap {
732 match iter.next() {
733 Some(item) => in_flight.push(op(item)),
734 None => break,
735 }
736 }
737 }
738 if in_flight.is_empty() {
739 break;
740 }
741 match in_flight.next().await {
742 Some(Ok(v)) => results.push(v),
743 Some(Err(e)) => {
744 if pending_err.is_none() {
745 pending_err = Some(e);
746 }
747 }
748 None => break,
749 }
750 }
751 match pending_err {
752 Some(e) => Err(e),
753 None => Ok(results),
754 }
755}
756
757pub async fn rebucketed_ordered<I, U, E, F, Fut>(
770 limiter: &Limiter,
771 items: I,
772 op: F,
773) -> Result<Vec<U>, E>
774where
775 I: IntoIterator,
776 F: FnMut(I::Item) -> Fut,
777 Fut: std::future::Future<Output = Result<(usize, U), E>>,
778{
779 let mut indexed = rebucketed_unordered(limiter, items, op).await?;
780 indexed.sort_by_key(|(idx, _)| *idx);
781 Ok(indexed.into_iter().map(|(_, v)| v).collect())
782}
783
/// Runs `op` over `items` with adaptive concurrency, optionally preserving
/// input order.
///
/// `ordered == false` delegates to [`rebucketed_unordered`] (results in
/// completion order). `ordered == true` processes items in batches sized by
/// the limiter's current cap: each batch is awaited in full via `buffered`
/// (which yields results in submission order) before the next batch starts,
/// and the cap is re-read between batches. After an error the in-progress
/// batch is still drained, no new batch starts, and the first error is
/// returned (successes collected so far are discarded with the `Err`).
pub async fn rebucketed<I, T, E, F, Fut>(
    limiter: &Limiter,
    items: I,
    ordered: bool,
    mut op: F,
) -> Result<Vec<T>, E>
where
    I: IntoIterator,
    F: FnMut(I::Item) -> Fut,
    Fut: std::future::Future<Output = Result<T, E>>,
{
    if !ordered {
        return rebucketed_unordered(limiter, items, op).await;
    }
    use futures::stream::{self, StreamExt};
    let mut iter = items.into_iter();
    let mut results = Vec::new();
    let mut pending_err: Option<E> = None;
    loop {
        // Stop scheduling new batches once an error has been captured.
        if pending_err.is_some() {
            break;
        }
        // Re-read the cap each batch so the fan-out tracks the limiter.
        let cap = limiter.current().max(1);
        let mut batch = Vec::with_capacity(cap);
        for item in iter.by_ref().take(cap) {
            batch.push(op(item));
        }
        if batch.is_empty() {
            break;
        }
        // `buffered(cap)` runs up to `cap` futures concurrently but yields
        // results in submission order, preserving input order.
        let mut s = stream::iter(batch).buffered(cap);
        while let Some(r) = s.next().await {
            match r {
                Ok(v) => results.push(v),
                Err(e) => {
                    // Keep only the first error.
                    if pending_err.is_none() {
                        pending_err = Some(e);
                    }
                }
            }
        }
    }
    match pending_err {
        Some(e) => Err(e),
        None => Ok(results),
    }
}
836
/// On-disk JSON shape for a persisted snapshot, versioned via `schema`.
#[derive(Debug, Clone, Serialize, Deserialize)]
struct PersistedState {
    /// Schema version; mismatches are ignored on load.
    schema: u32,
    /// The persisted per-channel caps.
    channels: ChannelStart,
}
846
/// Current on-disk schema version for [`PersistedState`].
const PERSIST_SCHEMA: u32 = 1;
/// File name of the snapshot inside the data directory.
const PERSIST_FILENAME: &str = "client_adaptive.json";
849
/// Default snapshot location: `client_adaptive.json` inside the app data
/// directory, or `None` if the data directory cannot be resolved.
#[must_use]
pub fn default_persist_path() -> Option<PathBuf> {
    crate::config::data_dir()
        .ok()
        .map(|d| d.join(PERSIST_FILENAME))
}
859
860#[must_use]
866pub fn load_snapshot(path: &Path) -> Option<ChannelStart> {
867 let bytes = std::fs::read(path).ok()?;
868 let state: PersistedState = match serde_json::from_slice(&bytes) {
869 Ok(s) => s,
870 Err(e) => {
871 warn!(path = %path.display(), error = %e, "adaptive: corrupt snapshot, ignoring");
872 return None;
873 }
874 };
875 if state.schema != PERSIST_SCHEMA {
876 debug!(
877 path = %path.display(),
878 schema = state.schema,
879 expected = PERSIST_SCHEMA,
880 "adaptive: snapshot schema mismatch, ignoring",
881 );
882 return None;
883 }
884 Some(state.channels)
885}
886
/// Persists `channels` to `path` via write-to-temp-then-rename so readers
/// never observe a half-written file. All failures are logged and swallowed:
/// snapshot persistence is best-effort.
pub fn save_snapshot(path: &Path, channels: ChannelStart) {
    let state = PersistedState {
        schema: PERSIST_SCHEMA,
        channels,
    };
    let bytes = match serde_json::to_vec_pretty(&state) {
        Ok(b) => b,
        Err(e) => {
            warn!(error = %e, "adaptive: snapshot serialize failed");
            return;
        }
    };
    // Make sure the target directory exists before writing into it.
    if let Some(parent) = path.parent() {
        if let Err(e) = std::fs::create_dir_all(parent) {
            warn!(path = %parent.display(), error = %e, "adaptive: snapshot mkdir failed");
            return;
        }
    }
    // Unique temp-file suffix (pid + process-wide counter + clock nanos) so
    // concurrent savers never clobber each other's temp files.
    let nanos = std::time::SystemTime::now()
        .duration_since(std::time::UNIX_EPOCH)
        .map(|d| d.subsec_nanos())
        .unwrap_or(0);
    let counter = SAVE_COUNTER.fetch_add(1, Ordering::Relaxed);
    let tmp = path.with_extension(format!(
        "json.tmp.{}.{}.{}",
        std::process::id(),
        counter,
        nanos
    ));
    if let Err(e) = std::fs::write(&tmp, &bytes) {
        warn!(path = %tmp.display(), error = %e, "adaptive: snapshot write failed");
        return;
    }
    // Atomically replace the old snapshot with the fully-written temp file.
    if let Err(e) = std::fs::rename(&tmp, path) {
        warn!(
            from = %tmp.display(),
            to = %path.display(),
            error = %e,
            "adaptive: snapshot rename failed",
        );
        // Don't leave the orphaned temp file behind.
        let _ = std::fs::remove_file(&tmp);
    }
}
940
941pub fn save_snapshot_with_timeout(path: PathBuf, channels: ChannelStart, timeout: Duration) {
951 let handle = std::thread::spawn(move || {
952 save_snapshot(&path, channels);
953 });
954 let started = Instant::now();
958 let poll = Duration::from_millis(5);
959 while started.elapsed() < timeout {
960 if handle.is_finished() {
961 let _ = handle.join();
962 return;
963 }
964 std::thread::sleep(poll);
965 }
966 warn!(
970 timeout_ms = timeout.as_millis() as u64,
971 "adaptive: snapshot save timed out (data dir slow?); detaching writer thread"
972 );
973 drop(handle);
974}
975
976#[cfg(test)]
977#[allow(clippy::unwrap_used)]
978mod tests {
979 use super::*;
980
981 fn cfg_for_tests() -> LimiterConfig {
982 LimiterConfig {
983 enabled: true,
984 min_concurrency: 1,
985 max_concurrency: 64,
986 window_ops: 10,
987 min_window_ops: 5,
988 success_target: 0.9,
989 timeout_ceiling: 0.2,
990 latency_inflation_factor: 2.0,
991 latency_ewma_alpha: 0.5,
992 }
993 }
994
995 fn adaptive_cfg_for_tests() -> AdaptiveConfig {
1000 let l = cfg_for_tests();
1001 AdaptiveConfig {
1002 enabled: l.enabled,
1003 min_concurrency: l.min_concurrency,
1004 max: ChannelMax {
1005 quote: l.max_concurrency,
1006 store: l.max_concurrency,
1007 fetch: l.max_concurrency,
1008 },
1009 window_ops: l.window_ops,
1010 min_window_ops: l.min_window_ops,
1011 success_target: l.success_target,
1012 timeout_ceiling: l.timeout_ceiling,
1013 latency_inflation_factor: l.latency_inflation_factor,
1014 latency_ewma_alpha: l.latency_ewma_alpha,
1015 }
1016 }
1017
1018 #[test]
1019 fn cold_start_clamps_into_bounds() {
1020 let cfg = cfg_for_tests();
1021 let l = Limiter::new(1000, cfg.clone());
1022 assert_eq!(l.current(), cfg.max_concurrency);
1023 let l = Limiter::new(0, cfg.clone());
1024 assert_eq!(l.current(), cfg.min_concurrency);
1025 }
1026
1027 #[test]
1028 fn slow_start_doubles_then_caps() {
1029 let cfg = cfg_for_tests();
1030 let l = Limiter::new(2, cfg.clone());
1031 for _ in 0..cfg.window_ops {
1033 l.observe(Outcome::Success, Duration::from_millis(50));
1034 }
1035 assert_eq!(l.current(), 4);
1036 for _ in 0..cfg.window_ops {
1037 l.observe(Outcome::Success, Duration::from_millis(50));
1038 }
1039 assert_eq!(l.current(), 8);
1040 }
1041
1042 #[test]
1043 fn first_failure_exits_slow_start() {
1044 let cfg = cfg_for_tests();
1045 let l = Limiter::new(4, cfg.clone());
1046 for _ in 0..6 {
1050 l.observe(Outcome::Success, Duration::from_millis(50));
1051 }
1052 for _ in 0..4 {
1053 l.observe(Outcome::Timeout, Duration::from_millis(50));
1054 }
1055 let after_stress = l.current();
1056 assert!(
1057 after_stress < 4,
1058 "stress should reduce concurrency from 4, got {after_stress}",
1059 );
1060 for _ in 0..(cfg.window_ops * 5) {
1068 l.observe(Outcome::Success, Duration::from_millis(50));
1069 }
1070 assert!(
1071 l.current() > after_stress,
1072 "expected recovery above {after_stress}, got {}",
1073 l.current(),
1074 );
1075 }
1076
1077 #[test]
1078 fn floor_holds_at_one() {
1079 let cfg = cfg_for_tests();
1080 let l = Limiter::new(2, cfg);
1081 for _ in 0..30 {
1082 l.observe(Outcome::Timeout, Duration::from_millis(50));
1083 }
1084 assert_eq!(l.current(), 1);
1085 }
1086
1087 #[test]
1088 fn application_errors_do_not_punish() {
1089 let cfg = cfg_for_tests();
1090 let l = Limiter::new(4, cfg.clone());
1091 for _ in 0..cfg.window_ops * 5 {
1098 l.observe(Outcome::ApplicationError, Duration::from_millis(50));
1099 }
1100 assert_eq!(
1101 l.current(),
1102 4,
1103 "ApplicationError must not move the cap; got {}",
1104 l.current()
1105 );
1106 }
1107
1108 #[test]
1109 fn latency_inflation_triggers_decrease() {
1110 let cfg = LimiterConfig {
1111 window_ops: 20,
1112 min_window_ops: 5,
1113 ..cfg_for_tests()
1114 };
1115 let l = Limiter::new(4, cfg.clone());
1116 for _ in 0..cfg.window_ops {
1118 l.observe(Outcome::Success, Duration::from_millis(50));
1119 }
1120 let after_baseline = l.current();
1121 for _ in 0..cfg.window_ops {
1123 l.observe(Outcome::Success, Duration::from_millis(500));
1124 }
1125 assert!(
1127 l.current() < after_baseline,
1128 "expected decrease from {after_baseline}, got {}",
1129 l.current(),
1130 );
1131 }
1132
1133 #[test]
1134 fn warm_start_overrides_current() {
1135 let cfg = cfg_for_tests();
1136 let l = Limiter::new(2, cfg);
1137 l.warm_start(20);
1138 assert_eq!(l.current(), 20);
1139 }
1140
1141 #[test]
1142 fn warm_start_clamps() {
1143 let cfg = cfg_for_tests();
1144 let l = Limiter::new(2, cfg.clone());
1145 l.warm_start(1_000_000);
1146 assert_eq!(l.current(), cfg.max_concurrency);
1147 }
1148
1149 #[test]
1150 fn disabled_controller_holds_steady() {
1151 let cfg = LimiterConfig {
1152 enabled: false,
1153 ..cfg_for_tests()
1154 };
1155 let l = Limiter::new(8, cfg);
1156 for _ in 0..50 {
1157 l.observe(Outcome::Timeout, Duration::from_millis(50));
1158 }
1159 assert_eq!(l.current(), 8);
1160 }
1161
1162 #[test]
1163 fn controller_snapshot_round_trips() {
1164 let c = AdaptiveController::new(
1170 ChannelStart {
1171 quote: 64,
1172 store: 16,
1173 fetch: 64,
1174 },
1175 adaptive_cfg_for_tests(),
1176 );
1177 let snap = c.snapshot();
1178 assert_eq!(snap.quote, 64);
1179 assert_eq!(snap.store, 16);
1180 assert_eq!(snap.fetch, 64);
1181
1182 let c2 = AdaptiveController::default();
1183 c2.warm_start(snap);
1184 assert_eq!(c2.quote.current(), 64);
1185 assert_eq!(c2.store.current(), 16);
1186 assert_eq!(c2.fetch.current(), 64);
1187 }
1188
1189 #[tokio::test]
1190 async fn observe_op_records_success() {
1191 let cfg = cfg_for_tests();
1192 let l = Limiter::new(4, cfg.clone());
1193 for _ in 0..cfg.window_ops {
1194 let _: Result<(), &str> =
1195 observe_op(&l, || async { Ok(()) }, |_e: &&str| Outcome::NetworkError).await;
1196 }
1197 assert_eq!(l.current(), 8);
1199 }
1200
1201 #[test]
1202 fn snapshot_round_trips_through_disk() {
1203 let dir = tempfile::tempdir().unwrap();
1204 let path = dir.path().join("client_adaptive.json");
1205 let snap = ChannelStart {
1206 quote: 24,
1207 store: 6,
1208 fetch: 12,
1209 };
1210 save_snapshot(&path, snap);
1211 let loaded = load_snapshot(&path).unwrap();
1212 assert_eq!(loaded.quote, 24);
1213 assert_eq!(loaded.store, 6);
1214 assert_eq!(loaded.fetch, 12);
1215 }
1216
1217 #[test]
1218 fn load_missing_returns_none() {
1219 let dir = tempfile::tempdir().unwrap();
1220 let path = dir.path().join("does_not_exist.json");
1221 assert!(load_snapshot(&path).is_none());
1222 }
1223
1224 #[test]
1225 fn load_corrupt_returns_none() {
1226 let dir = tempfile::tempdir().unwrap();
1227 let path = dir.path().join("bad.json");
1228 std::fs::write(&path, b"not valid json{{{").unwrap();
1229 assert!(load_snapshot(&path).is_none());
1230 }
1231
1232 #[test]
1233 fn load_wrong_schema_returns_none() {
1234 let dir = tempfile::tempdir().unwrap();
1235 let path = dir.path().join("future.json");
1236 let payload = r#"{"schema":999,"channels":{"quote":1,"store":1,"fetch":1}}"#;
1239 std::fs::write(&path, payload).unwrap();
1240 assert!(load_snapshot(&path).is_none());
1241 }
1242
1243 #[tokio::test]
1244 async fn observe_op_records_classified_error() {
1245 let cfg = cfg_for_tests();
1246 let l = Limiter::new(4, cfg.clone());
1247 for _ in 0..cfg.window_ops {
1248 let _: Result<(), &str> =
1249 observe_op(&l, || async { Err("boom") }, |_e: &&str| Outcome::Timeout).await;
1250 }
1251 assert!(l.current() < 4);
1252 }
1253
1254 #[test]
1264 fn no_regression_cold_start_at_least_static_defaults() {
1265 let s = ChannelStart::default();
1266 assert!(
1267 s.quote >= 32,
1268 "quote cold-start regressed: got {}, prior static was 32",
1269 s.quote,
1270 );
1271 assert!(
1272 s.store >= 8,
1273 "store cold-start regressed: got {}, prior static was 8",
1274 s.store,
1275 );
1276 assert!(
1277 s.fetch >= 64,
1278 "fetch cold-start regressed: got {}, prior static was 64 (unbounded before)",
1279 s.fetch,
1280 );
1281 }
1282
1283 #[test]
1287 fn controller_default_config_is_sane() {
1288 let c = AdaptiveController::default();
1289 let starts = ChannelStart::default();
1290 assert_eq!(c.quote.current(), starts.quote);
1291 assert_eq!(c.store.current(), starts.store);
1292 assert_eq!(c.fetch.current(), starts.fetch);
1293 assert_eq!(lock(&c.quote.inner).window.len(), 0);
1295 assert_eq!(lock(&c.store.inner).window.len(), 0);
1296 assert_eq!(lock(&c.fetch.inner).window.len(), 0);
1297 }
1298
1299 #[test]
1303 fn alternating_success_failure_collapses_to_floor() {
1304 let cfg = cfg_for_tests();
1310 let l = Limiter::new(8, cfg.clone());
1311 let mut min_observed = usize::MAX;
1312 let mut max_observed = 0usize;
1313 let mut floor_visits = 0usize;
1314 for i in 0..1000 {
1315 let outcome = if i % 2 == 0 {
1316 Outcome::Success
1317 } else {
1318 Outcome::Timeout
1319 };
1320 l.observe(outcome, Duration::from_millis(50));
1321 let cur = l.current();
1322 assert!(
1323 cur >= cfg.min_concurrency,
1324 "cap underflowed floor at iter {i}: got {cur}",
1325 );
1326 min_observed = min_observed.min(cur);
1327 max_observed = max_observed.max(cur);
1328 if cur == cfg.min_concurrency {
1329 floor_visits += 1;
1330 }
1331 }
1332 assert_eq!(
1333 min_observed, cfg.min_concurrency,
1334 "cap never reached the floor under 50% timeout rate"
1335 );
1336 assert!(
1337 max_observed >= 8,
1338 "cap never visited the start value: max_observed={max_observed}"
1339 );
1340 assert!(
1344 floor_visits > 500,
1345 "cap spent only {floor_visits}/1000 ticks at floor; expected mostly at floor"
1346 );
1347 assert_eq!(
1348 l.current(),
1349 cfg.min_concurrency,
1350 "controller did not settle at floor after 1000 alternations"
1351 );
1352 }
1353
1354 #[test]
1358 fn pure_success_stream_recovers_to_max() {
1359 let cfg = cfg_for_tests();
1360 let l = Limiter::new(cfg.min_concurrency, cfg.clone());
1361 for _ in 0..10_000 {
1362 l.observe(Outcome::Success, Duration::from_millis(5));
1363 }
1364 assert_eq!(
1365 l.current(),
1366 cfg.max_concurrency,
1367 "expected recovery to max ({}), got {}",
1368 cfg.max_concurrency,
1369 l.current(),
1370 );
1371 }
1372
1373 #[test]
1377 fn stress_then_heal_drives_floor_then_recovery() {
1378 let cfg = cfg_for_tests();
1379 let l = Limiter::new(8, cfg.clone());
1380 for _ in 0..100 {
1381 l.observe(Outcome::Timeout, Duration::from_millis(50));
1382 }
1383 let after_stress = l.current();
1384 assert_eq!(
1385 after_stress, cfg.min_concurrency,
1386 "stress should drive cap to floor, got {after_stress}",
1387 );
1388 for _ in 0..1_000 {
1389 l.observe(Outcome::Success, Duration::from_millis(10));
1390 }
1391 let after_heal = l.current();
1392 assert!(
1393 after_heal >= cfg.min_concurrency.saturating_add(4),
1394 "expected substantial recovery from floor, got {after_heal}",
1395 );
1396 }
1397
1398 #[test]
1402 fn baseline_does_not_grow_unbounded_under_slow_links() {
1403 let cfg = cfg_for_tests();
1404 let l = Limiter::new(2, cfg.clone());
1405 for _ in 0..(cfg.window_ops * 10) {
1406 l.observe(Outcome::Success, Duration::from_millis(500));
1407 }
1408 let baseline = lock(&l.inner).latency_baseline;
1409 let base = baseline.expect("baseline should be set after many healthy windows");
1410 assert!(
1411 base > Duration::ZERO,
1412 "baseline must not stay at ZERO, got {base:?}",
1413 );
1414 let lo = Duration::from_millis(250);
1416 let hi = Duration::from_millis(1000);
1417 assert!(
1418 base >= lo && base <= hi,
1419 "baseline drifted out of [{lo:?}, {hi:?}]: {base:?}",
1420 );
1421 }
1422
1423 #[test]
1428 fn baseline_initialized_only_after_first_healthy_window() {
1429 let cfg = cfg_for_tests();
1430 let l = Limiter::new(8, cfg.clone());
1431 for _ in 0..50 {
1432 l.observe(Outcome::Timeout, Duration::from_millis(50));
1433 }
1434 assert!(
1436 lock(&l.inner).latency_baseline.is_none(),
1437 "baseline must be None before any healthy window",
1438 );
1439 for _ in 0..(cfg.window_ops * 5) {
1441 l.observe(Outcome::Success, Duration::from_millis(20));
1442 }
1443 let baseline = lock(&l.inner).latency_baseline;
1444 assert!(
1445 baseline.is_some(),
1446 "baseline must be Some after healthy windows",
1447 );
1448 let base = baseline.unwrap_or_default();
1449 assert!(
1450 base > Duration::ZERO,
1451 "baseline must reflect real latency, got {base:?}",
1452 );
1453 }
1454
1455 #[test]
1458 fn min_concurrency_floor_holds_under_torrent_of_errors() {
1459 let cfg = cfg_for_tests();
1460 let l = Limiter::new(8, cfg.clone());
1461 for i in 0..50_000 {
1462 l.observe(Outcome::Timeout, Duration::from_millis(50));
1463 if i == 100 || i == 1_000 || i == 49_999 {
1464 let cur = l.current();
1465 assert_eq!(
1466 cur, cfg.min_concurrency,
1467 "floor breached at iter {i}: got {cur}",
1468 );
1469 }
1470 }
1471 }
1472
1473 #[test]
1475 fn max_concurrency_ceiling_holds_under_torrent_of_successes() {
1476 let cfg = cfg_for_tests();
1477 let start = cfg
1478 .max_concurrency
1479 .saturating_sub(1)
1480 .max(cfg.min_concurrency);
1481 let l = Limiter::new(start, cfg.clone());
1482 for i in 0..50_000 {
1483 l.observe(Outcome::Success, Duration::from_millis(5));
1484 if i == 100 || i == 1_000 || i == 49_999 {
1485 let cur = l.current();
1486 assert!(
1487 cur <= cfg.max_concurrency,
1488 "ceiling breached at iter {i}: got {cur} > {}",
1489 cfg.max_concurrency,
1490 );
1491 }
1492 }
1493 assert_eq!(l.current(), cfg.max_concurrency);
1494 }
1495
1496 #[test]
1502 fn saturating_arithmetic_handles_extreme_config() {
1503 let cfg = LimiterConfig {
1504 max_concurrency: usize::MAX / 2,
1505 ..cfg_for_tests()
1506 };
1507 let start = usize::MAX / 4;
1508 let l = Limiter::new(start, cfg.clone());
1509 for _ in 0..(cfg.window_ops * 10) {
1510 l.observe(Outcome::Success, Duration::from_millis(1));
1511 }
1512 assert_eq!(
1517 l.current(),
1518 cfg.max_concurrency,
1519 "saturating math survived but cap did not grow to ceiling"
1520 );
1521 }
1522
/// Old observations must age out FIFO: a window of timeouts drops the cap,
/// then enough successes must evict those timeouts and let the cap recover.
#[test]
fn window_eviction_is_fifo() {
    let cfg = LimiterConfig {
        window_ops: 10,
        min_window_ops: 5,
        success_target: 0.9,
        timeout_ceiling: 0.1,
        ..cfg_for_tests()
    };
    let l = Limiter::new(8, cfg.clone());
    // Phase 1: fill the whole window with timeouts to force a decrease.
    for _ in 0..cfg.window_ops {
        l.observe(Outcome::Timeout, Duration::from_millis(50));
    }
    let after_stress = l.current();
    assert!(
        after_stress < 8,
        "expected cap to drop from 8 after pure-timeout window, got {after_stress}"
    );
    // Phase 2: cycle the window three times over with successes; if
    // eviction were broken, stale timeouts would pin the cap down forever.
    for _ in 0..(cfg.window_ops * 3) {
        l.observe(Outcome::Success, Duration::from_millis(20));
    }
    let after_recovery = l.current();
    assert!(
        after_recovery > after_stress,
        "FIFO eviction broken: cap stayed at {after_stress} after recovery successes (expected > {after_stress}, got {after_recovery})"
    );
}
1566
/// With `enabled: false` the limiter must behave as a fixed semaphore:
/// no mix of outcomes may ever move the cap away from its initial value.
#[test]
fn disabled_controller_returns_initial_value_invariantly() {
    let cfg = LimiterConfig {
        enabled: false,
        ..cfg_for_tests()
    };
    let initial = 8;
    let l = Limiter::new(initial, cfg);
    // Rotate through all four outcome variants so every classification
    // path is exercised against the disabled controller.
    let rotation = [
        Outcome::Success,
        Outcome::Timeout,
        Outcome::NetworkError,
        Outcome::ApplicationError,
    ];
    for i in 0..1_000 {
        l.observe(rotation[i % 4], Duration::from_millis(50));
        assert_eq!(
            l.current(),
            initial,
            "disabled controller moved at iter {i}",
        );
    }
}
1592
1593 #[tokio::test(flavor = "multi_thread", worker_threads = 4)]
1596 async fn concurrent_observations_do_not_corrupt_window() {
1597 let cfg = cfg_for_tests();
1598 let l = Limiter::new(4, cfg.clone());
1599 let mut handles = Vec::with_capacity(100);
1600 for _ in 0..100 {
1601 let l_clone = l.clone();
1602 handles.push(tokio::spawn(async move {
1603 for _ in 0..100 {
1604 l_clone.observe(Outcome::Success, Duration::from_millis(5));
1605 }
1606 }));
1607 }
1608 for h in handles {
1609 h.await.unwrap();
1610 }
1611 let cur = l.current();
1612 assert!(
1613 cur >= cfg.min_concurrency && cur <= cfg.max_concurrency,
1614 "cap out of bounds after concurrent observations: {cur}",
1615 );
1616 }
1617
/// Round-trips a snapshot through disk and verifies `warm_start` lifts a
/// low-initialized controller up to the persisted values.
#[test]
fn persisted_snapshot_warm_starts_above_cold_floor() {
    let dir = tempfile::tempdir().unwrap();
    let path = dir.path().join("client_adaptive.json");
    let saved = ChannelStart {
        quote: 64,
        store: 32,
        fetch: 128,
    };
    save_snapshot(&path, saved);
    let loaded = load_snapshot(&path).unwrap();

    // Controller deliberately starts far below the snapshot values so the
    // warm start has an observable effect on every channel.
    let low = ChannelStart {
        quote: 2,
        store: 2,
        fetch: 2,
    };
    let c = AdaptiveController::new(low, AdaptiveConfig::default());
    c.warm_start(loaded);
    assert_eq!(c.quote.current(), 64);
    assert_eq!(c.store.current(), 32);
    assert_eq!(c.fetch.current(), 128);
}
1649
/// Two threads hammering `save_snapshot` on the same path: the final file
/// must parse and be exactly one writer's snapshot — never a torn blend.
#[test]
fn save_load_round_trip_with_concurrent_writes() {
    use std::thread;
    let dir = tempfile::tempdir().unwrap();
    let path = dir.path().join("client_adaptive.json");
    let path_a = path.clone();
    let path_b = path.clone();
    let snap_a = ChannelStart {
        quote: 10,
        store: 10,
        fetch: 10,
    };
    let snap_b = ChannelStart {
        quote: 99,
        store: 99,
        fetch: 99,
    };
    let h_a = thread::spawn(move || {
        for _ in 0..50 {
            save_snapshot(&path_a, snap_a);
        }
    });
    let h_b = thread::spawn(move || {
        for _ in 0..50 {
            save_snapshot(&path_b, snap_b);
        }
    });
    h_a.join().unwrap();
    h_b.join().unwrap();
    let loaded = load_snapshot(&path).expect("file must be a valid snapshot, not torn");
    // Whichever writer finished last wins; either full snapshot is
    // acceptable, a field-level mix of the two is not.
    let valid = (loaded.quote == snap_a.quote
        && loaded.store == snap_a.store
        && loaded.fetch == snap_a.fetch)
        || (loaded.quote == snap_b.quote
            && loaded.store == snap_b.store
            && loaded.fetch == snap_b.fetch);
    assert!(valid, "loaded snapshot is neither A nor B: {loaded:?}",);
}
1691
/// Saving to a path whose ancestors cannot exist must be a silent no-op:
/// no panic, and no file materializes at the target.
#[test]
fn save_snapshot_to_unwritable_dir_does_not_panic() {
    let snap = ChannelStart {
        quote: 1,
        store: 1,
        fetch: 1,
    };
    let path =
        PathBuf::from("/nonexistent_root_dir_xyz_for_test/sub/dir/client_adaptive.json");
    // Every ancestor directory is missing; this must fail quietly.
    save_snapshot(&path, snap);
    assert!(!path.exists());
}
1710
/// A file containing truncated JSON must load as `None`, never panic.
#[test]
fn load_snapshot_from_truncated_file_returns_none() {
    let dir = tempfile::tempdir().unwrap();
    let target = dir.path().join("truncated.json");
    // A valid prefix of a snapshot document, cut off mid-object.
    let torn_bytes: &[u8] = br#"{"schema":1,"channels":{"quote":"#;
    std::fs::write(&target, torn_bytes).unwrap();
    assert!(load_snapshot(&target).is_none());
}
1720
/// Budget check: 100k observe+current pairs must finish well under 500ms,
/// keeping controller overhead negligible on the request hot path.
#[test]
fn controller_perf_overhead_is_bounded() {
    let cfg = cfg_for_tests();
    let l = Limiter::new(8, cfg);
    let started = Instant::now();
    for _ in 0..100_000 {
        let _ = l.current();
        l.observe(Outcome::Success, Duration::from_micros(1));
    }
    let elapsed = started.elapsed();
    // Generous bound: meant to catch accidental O(window) scans per op,
    // not to be a precise benchmark.
    assert!(
        elapsed < Duration::from_millis(500),
        "100k observe+current pairs took {elapsed:?}, expected <500ms",
    );
}
1741
/// Feeds the controller a maximally hostile config — zero floors/ceilings,
/// NaN and infinite ratios, inverted window bounds — and checks that
/// construction sanitizes every field, then that the limiter still honors
/// its floor under extreme latencies.
#[test]
fn nan_and_out_of_range_config_does_not_panic() {
    let cfg = AdaptiveConfig {
        enabled: true,
        min_concurrency: 0,
        max: ChannelMax {
            quote: 0,
            store: 0,
            fetch: 0,
        },
        // min_window_ops deliberately exceeds window_ops (inverted bounds).
        window_ops: 10,
        min_window_ops: 50,
        success_target: f64::NAN,
        timeout_ceiling: f64::INFINITY,
        latency_inflation_factor: f64::NEG_INFINITY,
        latency_ewma_alpha: f64::NAN,
    };
    let c = AdaptiveController::new(ChannelStart::default(), cfg);
    // Inspect the config as stored post-construction (sanitized).
    let post = &c.config;
    assert_eq!(
        post.min_concurrency, 1,
        "sanitize did not raise min_concurrency from 0"
    );
    assert!(
        post.success_target.is_finite() && (0.0..=1.0).contains(&post.success_target),
        "sanitize did not clamp success_target from NaN: {}",
        post.success_target
    );
    assert!(
        post.timeout_ceiling.is_finite() && (0.0..=1.0).contains(&post.timeout_ceiling),
        "sanitize did not clamp timeout_ceiling from Inf: {}",
        post.timeout_ceiling
    );
    assert!(
        post.latency_inflation_factor.is_finite() && post.latency_inflation_factor > 0.0,
        "sanitize did not fix latency_inflation_factor from -Inf: {}",
        post.latency_inflation_factor
    );
    assert!(
        post.latency_ewma_alpha.is_finite() && (0.0..=1.0).contains(&post.latency_ewma_alpha),
        "sanitize did not fix latency_ewma_alpha from NaN: {}",
        post.latency_ewma_alpha
    );
    assert!(
        post.min_window_ops <= post.window_ops,
        "sanitize did not clamp min_window_ops <= window_ops: min={} window={}",
        post.min_window_ops,
        post.window_ops
    );
    assert!(
        post.max.quote >= post.min_concurrency,
        "max.quote below min_concurrency"
    );
    // Alternating extreme and zero latencies must neither panic nor push
    // the cap below the sanitized floor of 1.
    for _ in 0..200 {
        c.store
            .observe(Outcome::Success, Duration::from_secs(99_999));
        c.store.observe(Outcome::Timeout, Duration::ZERO);
    }
    let cur = c.store.current();
    assert!(cur >= 1, "cap below floor: {cur}");
}
1815
/// A short burst of timeouts (a quarter of the window) must produce a
/// measured decrease, not collapse the cap toward the floor.
#[test]
fn transient_burst_does_not_pile_drive_to_floor() {
    let cfg = LimiterConfig {
        window_ops: 32,
        min_window_ops: 8,
        success_target: 0.95,
        timeout_ceiling: 0.10,
        ..cfg_for_tests()
    };
    let l = Limiter::new(32, cfg);
    // 8 timeouts = exactly min_window_ops: the first point at which the
    // controller is allowed to react at all.
    for _ in 0..8 {
        l.observe(Outcome::Timeout, Duration::from_millis(10));
    }
    let after_burst = l.current();
    // A halving (32 -> 16) is the worst acceptable single reaction.
    assert!(
        after_burst >= 16,
        "transient burst pile-drove cap from 32 to {after_burst}; expected >= 16",
    );
}
1846
/// Every transport-level error variant, run through the production
/// `classify_error`, must register as a capacity signal that shrinks the
/// cap (as opposed to a neutral application error).
#[tokio::test]
async fn transport_errors_classify_as_capacity_signal() {
    use crate::data::client::classify_error;
    use crate::data::error::Error;
    // Aggressive thresholds so 16 pure-error ops are guaranteed to trip
    // a decrease if (and only if) the error counts against capacity.
    let make_cfg = || LimiterConfig {
        window_ops: 16,
        min_window_ops: 5,
        success_target: 0.5,
        timeout_ceiling: 0.5,
        ..cfg_for_tests()
    };
    type ErrFactory = Box<dyn Fn() -> Error>;
    let cases: Vec<(&str, ErrFactory)> = vec![
        ("Network", Box::new(|| Error::Network("net".to_string()))),
        (
            "InsufficientPeers",
            Box::new(|| Error::InsufficientPeers("ip".to_string())),
        ),
        ("Io", Box::new(|| Error::Io(std::io::Error::other("io")))),
        ("Protocol", Box::new(|| Error::Protocol("p".to_string()))),
        ("Storage", Box::new(|| Error::Storage("s".to_string()))),
        (
            "PartialUpload",
            Box::new(|| Error::PartialUpload {
                stored: vec![],
                stored_count: 0,
                failed: vec![],
                failed_count: 0,
                total_chunks: 0,
                reason: "r".to_string(),
            }),
        ),
    ];
    for (name, mk) in &cases {
        // Fresh limiter per variant so signals don't bleed across cases.
        let l = Limiter::new(8, make_cfg());
        for _ in 0..16 {
            let _: std::result::Result<(), Error> =
                observe_op(&l, || async { Err(mk()) }, classify_error).await;
        }
        let cur = l.current();
        assert!(
            cur < 8,
            "{name} not classified as capacity signal: cap stayed at {cur}",
        );
    }
}
1901
/// Each channel enforces its own ceiling: flooding all three with
/// successes must cap quote at 4, store at 8, and let fetch climb from 64
/// to its independent max of 1024.
#[test]
fn per_channel_ceilings_are_independent() {
    let cfg = AdaptiveConfig {
        max: ChannelMax {
            quote: 4,
            store: 8,
            fetch: 1024,
        },
        ..AdaptiveConfig::default()
    };
    let c = AdaptiveController::new(
        ChannelStart {
            // quote and store start AT their ceilings; fetch starts below.
            quote: 4,
            store: 8,
            fetch: 64,
        },
        cfg,
    );
    for _ in 0..1000 {
        c.quote.observe(Outcome::Success, Duration::from_micros(10));
        c.store.observe(Outcome::Success, Duration::from_micros(10));
        c.fetch.observe(Outcome::Success, Duration::from_micros(10));
    }
    assert_eq!(c.quote.current(), 4, "quote should cap at 4");
    assert_eq!(c.store.current(), 8, "store should cap at 8");
    assert_eq!(
        c.fetch.current(),
        1024,
        "fetch did not reach its independent max of 1024; got {}",
        c.fetch.current()
    );
}
1943
/// Cold-start defaults must never regress below the static limits the
/// client shipped with before the adaptive controller existed.
#[test]
fn cold_start_at_least_prior_static_defaults() {
    let ChannelStart { quote, store, fetch } = ChannelStart::default();
    assert!(quote >= 32, "quote cold-start regressed: {}", quote);
    assert!(store >= 8, "store cold-start regressed: {}", store);
    assert!(fetch >= 64, "fetch cold-start regressed: {}", fetch);
}
1959
/// Convergence bound: starting at cap 64 under pure timeout stress, the
/// controller must reach the floor of 1 within 200 observations.
#[test]
fn sustained_stress_reaches_floor_within_bounded_ops() {
    let cfg = LimiterConfig {
        window_ops: 32,
        min_window_ops: 8,
        success_target: 0.95,
        timeout_ceiling: 0.10,
        max_concurrency: 64,
        ..cfg_for_tests()
    };
    let l = Limiter::new(64, cfg);
    let mut ops = 0usize;
    // Stop as soon as the floor is reached (or the op budget runs out).
    while l.current() > 1 && ops < 200 {
        l.observe(Outcome::Timeout, Duration::from_millis(10));
        ops += 1;
    }
    assert_eq!(
        l.current(),
        1,
        "controller did not reach floor within 200 observations under \
         sustained timeout stress; took {ops} ops, ended at cap {}",
        l.current()
    );
}
1996
/// Defaults sanity: a default controller starts at the cold-start values,
/// and each default ceiling leaves strict headroom above its start so the
/// controller has room to grow.
#[test]
fn default_controller_has_growth_headroom() {
    let c = AdaptiveController::default();
    let cs = ChannelStart::default();
    let max = ChannelMax::default();
    // Controller channels begin exactly at the cold-start values.
    assert_eq!(c.quote.current(), cs.quote);
    assert_eq!(c.store.current(), cs.store);
    assert_eq!(c.fetch.current(), cs.fetch);
    assert!(
        max.quote > cs.quote,
        "no growth headroom for quote: max={} start={}",
        max.quote,
        cs.quote
    );
    assert!(
        max.store > cs.store,
        "no growth headroom for store: max={} start={}",
        max.store,
        cs.store
    );
    assert!(
        max.fetch > cs.fetch,
        "no growth headroom for fetch: max={} start={}",
        max.fetch,
        cs.fetch
    );
}
2028
/// `warm_start` must clamp a too-low snapshot UP to the cold-start
/// defaults rather than adopting degraded persisted values.
#[test]
fn warm_start_floors_at_cold_defaults() {
    let c = AdaptiveController::default();
    let cold = ChannelStart::default();
    // Snapshot values deliberately below every cold-start default.
    let bad_snap = ChannelStart {
        quote: 1,
        store: 1,
        fetch: 1,
    };
    c.warm_start(bad_snap);
    assert_eq!(
        c.quote.current(),
        cold.quote,
        "quote warm_start did not floor at cold default"
    );
    assert_eq!(
        c.store.current(),
        cold.store,
        "store warm_start did not floor at cold default"
    );
    assert_eq!(
        c.fetch.current(),
        cold.fetch,
        "fetch warm_start did not floor at cold default"
    );
}
2064
/// Snapshot values that exceed the cold floor must be honored verbatim by
/// `warm_start` on every channel.
#[test]
fn warm_start_honors_values_above_cold_floor() {
    let c = AdaptiveController::default();
    let cold = ChannelStart::default();
    // Multiples of the cold defaults are guaranteed to sit above the floor.
    let snap = ChannelStart {
        quote: cold.quote * 2,
        store: cold.store * 4,
        fetch: cold.fetch * 2,
    };
    c.warm_start(snap);
    assert_eq!(
        (c.quote.current(), c.store.current(), c.fetch.current()),
        (snap.quote, snap.store, snap.fetch)
    );
}
2081
/// Mid-stream growth: a background task raises the cap from 4 to 16 after
/// 16 items complete; the scheduler must widen its in-flight set beyond
/// the original cap rather than latching the cap it saw at start.
#[tokio::test]
async fn rebucketed_picks_up_cap_changes_mid_stream() {
    use std::sync::atomic::{AtomicUsize, Ordering as AtomicOrdering};
    use std::sync::Arc as StdArc;
    let cfg = LimiterConfig {
        min_concurrency: 1,
        max_concurrency: 32,
        ..cfg_for_tests()
    };
    let l = Limiter::new(4, cfg);
    let max_seen = StdArc::new(AtomicUsize::new(0));
    let in_flight = StdArc::new(AtomicUsize::new(0));
    let processed = StdArc::new(AtomicUsize::new(0));
    let l_for_bump = l.clone();
    let processed_for_bump = processed.clone();
    // Polls progress and bumps the cap to 16 once 16 items are done.
    let bump_handle = tokio::spawn(async move {
        loop {
            tokio::time::sleep(Duration::from_millis(2)).await;
            if processed_for_bump.load(AtomicOrdering::Relaxed) >= 16 {
                l_for_bump.warm_start(16);
                return;
            }
        }
    });
    let _: Vec<()> = rebucketed(&l, 0..200usize, false, |_i| {
        let max_seen = max_seen.clone();
        let in_flight = in_flight.clone();
        let processed = processed.clone();
        async move {
            // Track peak concurrency across the whole run.
            let cur = in_flight.fetch_add(1, AtomicOrdering::Relaxed) + 1;
            max_seen.fetch_max(cur, AtomicOrdering::Relaxed);
            tokio::time::sleep(Duration::from_millis(1)).await;
            in_flight.fetch_sub(1, AtomicOrdering::Relaxed);
            processed.fetch_add(1, AtomicOrdering::Relaxed);
            Ok::<(), &'static str>(())
        }
    })
    .await
    .unwrap();
    bump_handle.await.unwrap();
    let peak = max_seen.load(AtomicOrdering::Relaxed);
    // 200 x 1ms items leave ample runway after the bump lands, so peak
    // in-flight must exceed the original cap of 4.
    assert!(
        peak > 4,
        "rebucketed did not pick up the mid-stream cap bump (peak in-flight = {peak})"
    );
}
2139
/// Dropping an un-awaited `observe_op` future must record nothing in the
/// window; the limiter must then grow normally on later successful ops.
#[tokio::test]
async fn observe_op_cancellation_drops_silently() {
    let cfg = LimiterConfig {
        window_ops: 16,
        min_window_ops: 4,
        ..cfg_for_tests()
    };
    let l = Limiter::new(4, cfg);
    let l_clone = l.clone();
    // Build (but never poll) an op that would hang forever, then drop it.
    let fut = observe_op(
        &l_clone,
        || async {
            std::future::pending::<()>().await;
            Ok::<(), &'static str>(())
        },
        |_| Outcome::Timeout,
    );
    drop(fut);
    assert_eq!(l.current(), 4, "cancelled op moved the cap");
    // These ops all return Ok, so the NetworkError classifier should never
    // fire. NOTE(review): assumes observe_op classifies only Err results —
    // confirm against observe_op's implementation.
    for _ in 0..16 {
        let _: Result<(), &'static str> = observe_op(
            &l,
            || async { Ok(()) },
            |_| Outcome::NetworkError,
        )
        .await;
    }
    assert!(
        l.current() > 4,
        "cap did not grow after 16 successes; controller corrupted by cancellation? cap={}",
        l.current(),
    );
}
2192
/// `save_snapshot` blocks until the file is on disk: the path must exist
/// immediately after the call and round-trip the exact values written.
#[test]
fn save_snapshot_is_synchronous_and_durable() {
    let dir = tempfile::tempdir().unwrap();
    let target = dir.path().join("client_adaptive.json");
    save_snapshot(
        &target,
        ChannelStart {
            quote: 100,
            store: 50,
            fetch: 200,
        },
    );
    assert!(
        target.exists(),
        "save_snapshot did not write file synchronously"
    );
    let loaded = load_snapshot(&target).unwrap();
    assert_eq!((loaded.quote, loaded.store, loaded.fetch), (100, 50, 200));
}
2220
/// After a warm start the limiter must grow additively (+1 per window),
/// not re-enter slow-start doubling from the warmed value.
#[tokio::test]
async fn warm_start_disables_slow_start_doubling() {
    let cfg = LimiterConfig {
        window_ops: 8,
        min_window_ops: 4,
        success_target: 0.9,
        ..cfg_for_tests()
    };
    let l = Limiter::new(2, cfg.clone());
    // Jump straight from 2 to 16 via warm start.
    l.warm_start(16);
    assert_eq!(l.current(), 16);
    // Exactly one full window of successes -> exactly one growth step.
    for _ in 0..cfg.window_ops {
        l.observe(Outcome::Success, Duration::from_millis(10));
    }
    // Doubling would land on 32; additive growth lands on 17.
    assert_eq!(
        l.current(),
        17,
        "warm-start triggered slow-start doubling instead of additive +1"
    );
}
2251
/// The warm-start floor is the per-instance cold start, not a global
/// constant: snapshots below it clamp up, snapshots above it are honored.
#[test]
fn controller_warm_start_floors_at_per_instance_cold_start() {
    let custom_cold = ChannelStart {
        quote: 2,
        store: 1,
        fetch: 4,
    };
    let c = AdaptiveController::new(custom_cold, AdaptiveConfig::default());
    // All-1 snapshot: quote and fetch must clamp to their custom floors.
    c.warm_start(ChannelStart {
        quote: 1,
        store: 1,
        fetch: 1,
    });
    assert_eq!(c.quote.current(), 2);
    assert_eq!(c.store.current(), 1);
    assert_eq!(c.fetch.current(), 4);
    // All-10 snapshot sits above every floor and is adopted verbatim.
    c.warm_start(ChannelStart {
        quote: 10,
        store: 10,
        fetch: 10,
    });
    assert_eq!(c.quote.current(), 10);
    assert_eq!(c.store.current(), 10);
    assert_eq!(c.fetch.current(), 10);
}
2283
/// With adaptive mode off, `warm_start` must be a no-op: every channel
/// stays at its cold-start value.
#[test]
fn warm_start_is_noop_when_adaptive_disabled() {
    let cfg = AdaptiveConfig {
        enabled: false,
        ..AdaptiveConfig::default()
    };
    let c = AdaptiveController::new(
        ChannelStart {
            quote: 5,
            store: 5,
            fetch: 5,
        },
        cfg,
    );
    c.warm_start(ChannelStart {
        quote: 100,
        store: 100,
        fetch: 100,
    });
    for cap in [c.quote.current(), c.store.current(), c.fetch.current()] {
        assert_eq!(cap, 5, "warm_start moved cap when disabled");
    }
}
2308
/// The unordered scheduler must be rolling (refill a slot as each op
/// finishes), not fenced into batches: one slow straggler must not drain
/// concurrency for everyone else.
#[tokio::test]
async fn rebucketed_unordered_is_rolling_not_fenced() {
    use std::sync::atomic::{AtomicUsize, Ordering as AtomicOrdering};
    use std::sync::Arc as StdArc;
    let cfg = LimiterConfig {
        min_concurrency: 1,
        max_concurrency: 8,
        // Huge window so the cap of 4 cannot move during the test.
        window_ops: 100,
        min_window_ops: 50,
        ..cfg_for_tests()
    };
    let l = Limiter::new(4, cfg);
    let in_flight = StdArc::new(AtomicUsize::new(0));
    let max_in_flight = StdArc::new(AtomicUsize::new(0));
    let started = StdArc::new(AtomicUsize::new(0));
    let _: Vec<()> = rebucketed_unordered(&l, 0..20usize, |i| {
        let in_flight = in_flight.clone();
        let max_in_flight = max_in_flight.clone();
        let started = started.clone();
        async move {
            let cur = in_flight.fetch_add(1, AtomicOrdering::Relaxed) + 1;
            max_in_flight.fetch_max(cur, AtomicOrdering::Relaxed);
            started.fetch_add(1, AtomicOrdering::Relaxed);
            // Item 0 is a 50ms straggler; a fenced/batched scheduler would
            // stall every subsequent batch behind it.
            if i == 0 {
                tokio::time::sleep(Duration::from_millis(50)).await;
            } else {
                tokio::time::sleep(Duration::from_millis(1)).await;
            }
            in_flight.fetch_sub(1, AtomicOrdering::Relaxed);
            Ok::<(), &'static str>(())
        }
    })
    .await
    .unwrap();
    assert_eq!(started.load(AtomicOrdering::Relaxed), 20);
    let peak = max_in_flight.load(AtomicOrdering::Relaxed);
    // A rolling scheduler keeps the cap of 4 occupied despite the straggler.
    assert!(
        peak >= 4,
        "rolling scheduler did not fill cap; peak in-flight = {peak}"
    );
}
2360
/// Ordered variant: results must come back in input order even though
/// later items are given shorter delays and finish first.
#[tokio::test]
async fn rebucketed_ordered_preserves_input_order() {
    let cfg = LimiterConfig {
        min_concurrency: 1,
        max_concurrency: 4,
        ..cfg_for_tests()
    };
    let l = Limiter::new(4, cfg);
    let items: Vec<usize> = (0..50).collect();
    let result: Vec<usize> = rebucketed_ordered(
        &l,
        items.iter().copied().enumerate(),
        |(idx, v)| async move {
            // Inverted delays: item 0 sleeps longest, item 49 shortest, so
            // completion order is the reverse of submission order.
            let delay = (50 - v) as u64;
            tokio::time::sleep(Duration::from_micros(delay)).await;
            Ok::<_, &'static str>((idx, v * 10))
        },
    )
    .await
    .unwrap();
    assert_eq!(result.len(), 50);
    for (i, v) in result.iter().enumerate() {
        assert_eq!(*v, i * 10, "out of order at index {i}: got {v}");
    }
}
2388
/// Ordered variant must keep each index paired with its own payload, even
/// when completion order is fully inverted relative to submission order.
#[tokio::test]
async fn rebucketed_ordered_pairs_idx_with_payload_correctly() {
    let cfg = LimiterConfig {
        min_concurrency: 1,
        max_concurrency: 8,
        ..cfg_for_tests()
    };
    let l = Limiter::new(8, cfg);
    // Payloads are derived from the index (1000 + i) so a swapped pairing
    // is detectable in the output.
    let items: Vec<(usize, u64)> = (0..40).map(|i| (i, 1000u64 + i as u64)).collect();
    let result: Vec<u64> = rebucketed_ordered(&l, items, |(idx, hash)| async move {
        // Inverted delays force out-of-order completion.
        let delay = (40 - idx) as u64;
        tokio::time::sleep(Duration::from_micros(delay)).await;
        Ok::<_, &'static str>((idx, hash * 7))
    })
    .await
    .unwrap();
    for (i, v) in result.iter().enumerate() {
        let expected = (1000 + i as u64) * 7;
        assert_eq!(
            *v, expected,
            "idx {i} paired with wrong content: {v}, expected {expected}"
        );
    }
}
2422
/// 100 back-to-back saves to one path: last write wins, the file parses,
/// and no `.tmp.` intermediates leak into the directory afterwards.
#[test]
fn save_snapshot_temp_file_is_unique_per_call() {
    let dir = tempfile::tempdir().unwrap();
    let path = dir.path().join("client_adaptive.json");
    for i in 0..100 {
        save_snapshot(
            &path,
            ChannelStart {
                quote: i + 1,
                store: i + 1,
                fetch: i + 1,
            },
        );
    }
    // The final save (i = 99 -> value 100) must be what persists.
    let loaded = load_snapshot(&path).unwrap();
    assert_eq!(loaded.quote, 100);
    assert_eq!(loaded.store, 100);
    assert_eq!(loaded.fetch, 100);
    // NOTE(review): assumes the atomic-rename temp files embed ".tmp." in
    // their names — confirm against save_snapshot's implementation.
    let leftover: Vec<_> = std::fs::read_dir(dir.path())
        .unwrap()
        .filter_map(|e| e.ok())
        .filter(|e| e.file_name().to_string_lossy().contains(".tmp."))
        .collect();
    assert!(
        leftover.is_empty(),
        "temp files leaked: {:?}",
        leftover.iter().map(|e| e.file_name()).collect::<Vec<_>>()
    );
}
2462
2463 #[tokio::test]
2468 async fn rebucketed_empty_input_returns_empty() {
2469 let cfg = cfg_for_tests();
2470 let l = Limiter::new(4, cfg);
2471 let v: Vec<usize> = rebucketed_unordered(&l, std::iter::empty::<usize>(), |_| async {
2472 Ok::<_, &'static str>(42usize)
2473 })
2474 .await
2475 .unwrap();
2476 assert!(v.is_empty());
2477 let v: Vec<usize> = rebucketed_ordered(
2478 &l,
2479 std::iter::empty::<(usize, ())>(),
2480 |(idx, _)| async move { Ok::<_, &'static str>((idx, 42usize)) },
2481 )
2482 .await
2483 .unwrap();
2484 assert!(v.is_empty());
2485 }
2486
2487 #[tokio::test]
2489 async fn rebucketed_exactly_cap_items() {
2490 let cfg = LimiterConfig {
2491 min_concurrency: 1,
2492 max_concurrency: 4,
2493 ..cfg_for_tests()
2494 };
2495 let l = Limiter::new(4, cfg);
2496 let v: Vec<usize> =
2497 rebucketed_unordered(
2498 &l,
2499 0..4usize,
2500 |i| async move { Ok::<_, &'static str>(i * 2) },
2501 )
2502 .await
2503 .unwrap();
2504 assert_eq!(v.len(), 4);
2505 }
2506
/// On failure the unordered scheduler must surface the FIRST error and
/// stop spawning new work reasonably soon after it occurs.
#[tokio::test]
async fn rebucketed_preserves_first_error() {
    use std::sync::atomic::{AtomicUsize, Ordering as AtomicOrdering};
    use std::sync::Arc as StdArc;
    let cfg = LimiterConfig {
        min_concurrency: 1,
        max_concurrency: 4,
        ..cfg_for_tests()
    };
    let l = Limiter::new(4, cfg);
    let started = StdArc::new(AtomicUsize::new(0));
    let started_clone = started.clone();
    let result: Result<Vec<()>, &'static str> = rebucketed_unordered(&l, 0..20usize, |i| {
        let started = started_clone.clone();
        async move {
            started.fetch_add(1, AtomicOrdering::Relaxed);
            if i == 5 {
                // Small sleep so item 10's error cannot race ahead of this
                // one in completion order.
                tokio::time::sleep(Duration::from_micros(100)).await;
                return Err("first error");
            }
            if i == 10 {
                return Err("second error - should be ignored");
            }
            tokio::time::sleep(Duration::from_micros(50)).await;
            Ok(())
        }
    })
    .await;
    match result {
        Err(e) => assert_eq!(e, "first error", "wrong error preserved"),
        Ok(_) => panic!("expected error, got ok"),
    }
    // At least 5 items must have started before the failure; starting all
    // 20 would mean the scheduler ignored the error entirely.
    let total = started.load(AtomicOrdering::Relaxed);
    assert!(
        (5..20).contains(&total),
        "started count out of range: {total}"
    );
}
2554
/// With min == max the cap is pinned: neither a flood of successes nor a
/// flood of timeouts may move it.
#[test]
fn limiter_with_min_equal_max_is_pinned() {
    let cfg = LimiterConfig {
        min_concurrency: 5,
        max_concurrency: 5,
        ..cfg_for_tests()
    };
    let limiter = Limiter::new(5, cfg);
    // Same sequence as two separate loops: 1000 fast successes, then
    // 1000 slow timeouts, asserting the cap after each phase.
    for (outcome, latency) in [
        (Outcome::Success, Duration::from_millis(1)),
        (Outcome::Timeout, Duration::from_millis(50)),
    ] {
        for _ in 0..1000 {
            limiter.observe(outcome, latency);
        }
        assert_eq!(limiter.current(), 5, "cap moved despite min==max");
    }
}
2574
/// alpha = 0 means "ignore the sample": the smoothed value is unchanged.
#[test]
fn ewma_alpha_zero_returns_prev() {
    let prev = Duration::from_millis(100);
    let ignored_sample = Duration::from_millis(500);
    let smoothed = ewma(prev, ignored_sample, 0.0);
    assert_eq!(smoothed, prev, "alpha=0 must return prev unchanged");
}
2584
/// alpha = 1 means "take the sample": the result lands on the sample,
/// within 1ms of slack for float round-tripping.
#[test]
fn ewma_alpha_one_returns_sample() {
    let baseline = Duration::from_millis(100);
    let sample = Duration::from_millis(500);
    let result = ewma(baseline, sample, 1.0);
    assert!(
        result.abs_diff(sample) <= Duration::from_millis(1),
        "alpha=1 should return sample; got {result:?}, expected ~{sample:?}"
    );
}
2599
/// alpha = 0.5 blends evenly: 200ms and 400ms average to ~300ms.
#[test]
fn ewma_alpha_half_returns_midpoint() {
    let result = ewma(Duration::from_millis(200), Duration::from_millis(400), 0.5);
    let expected = Duration::from_millis(300);
    assert!(
        result.abs_diff(expected) <= Duration::from_millis(1),
        "alpha=0.5 midpoint: got {result:?}, expected ~{expected:?}"
    );
}
2613
/// Non-finite alphas (NaN, +inf, -inf) are rejected: ewma falls back to
/// returning `prev` untouched.
#[test]
fn ewma_nan_alpha_returns_prev() {
    let prev = Duration::from_millis(100);
    let sample = Duration::from_millis(500);
    for bad_alpha in [f64::NAN, f64::INFINITY, f64::NEG_INFINITY] {
        assert_eq!(ewma(prev, sample, bad_alpha), prev);
    }
}
2628
/// alpha > 1 is clamped to 1, so the result tracks the sample (500ms)
/// within 1ms of rounding slack.
#[test]
fn ewma_clamps_alpha_above_one() {
    let result = ewma(Duration::from_millis(100), Duration::from_millis(500), 2.5);
    let acceptable = Duration::from_millis(499)..=Duration::from_millis(501);
    assert!(acceptable.contains(&result));
}
2640
/// Application-level errors are not capacity signals: a window made up
/// entirely of them must leave the cap exactly where it started.
#[test]
fn window_full_of_application_errors_does_not_move_cap() {
    let cfg = cfg_for_tests();
    let l = Limiter::new(8, cfg.clone());
    // Five full windows of nothing but app errors.
    let total_ops = cfg.window_ops * 5;
    (0..total_ops)
        .for_each(|_| l.observe(Outcome::ApplicationError, Duration::from_millis(50)));
    assert_eq!(
        l.current(),
        8,
        "cap moved on pure-app-error window; should hold"
    );
}
2657
/// With adaptive mode off, every channel of the controller must ignore
/// even a sustained flood of timeouts.
#[test]
fn disabled_adaptive_controller_truly_inert() {
    let cfg = AdaptiveConfig {
        enabled: false,
        ..AdaptiveConfig::default()
    };
    let c = AdaptiveController::new(ChannelStart::default(), cfg);
    let snapshot = |c: &AdaptiveController| {
        (c.quote.current(), c.store.current(), c.fetch.current())
    };
    let before = snapshot(&c);
    for _ in 0..10000 {
        c.quote.observe(Outcome::Timeout, Duration::from_millis(1));
        c.store.observe(Outcome::Timeout, Duration::from_millis(1));
        c.fetch.observe(Outcome::Timeout, Duration::from_millis(1));
    }
    assert_eq!(snapshot(&c), before);
}
2680
/// Stressing one channel must not leak into the others: store reaches the
/// floor under timeouts while quote and fetch stay untouched.
#[test]
fn channel_state_is_independent() {
    let c = AdaptiveController::default();
    let q0 = c.quote.current();
    let f0 = c.fetch.current();
    let s0 = c.store.current();
    // Only the store channel is stressed.
    for _ in 0..1000 {
        c.store.observe(Outcome::Timeout, Duration::from_millis(1));
    }
    assert_eq!(
        c.store.current(),
        c.config.min_concurrency,
        "store did not reach floor after 1000 timeouts; cap={}",
        c.store.current()
    );
    assert!(c.store.current() < s0, "store cap did not move at all");
    assert_eq!(c.quote.current(), q0, "quote leaked from store stress");
    assert_eq!(c.fetch.current(), f0, "fetch leaked from store stress");
}
2706
/// `sanitize` must repair every pathological field in one pass: NaN
/// target, out-of-range ceiling, non-finite inflation factor, oversized
/// alpha, and inverted window bounds.
#[test]
fn sanitize_corrects_pathological_floats() {
    let mut cfg = AdaptiveConfig {
        success_target: f64::NAN,
        timeout_ceiling: 5.0,
        latency_inflation_factor: f64::NEG_INFINITY,
        latency_ewma_alpha: 2.5,
        window_ops: 4,
        min_window_ops: 10,
        ..AdaptiveConfig::default()
    };
    cfg.sanitize();
    // All ratio-like fields must end up finite and inside the unit range.
    let unit = 0.0..=1.0;
    assert!(cfg.success_target.is_finite());
    assert!(unit.contains(&cfg.success_target));
    assert!(unit.contains(&cfg.timeout_ceiling));
    assert!(unit.contains(&cfg.latency_ewma_alpha));
    assert!(cfg.latency_inflation_factor.is_finite());
    assert!(cfg.latency_inflation_factor > 0.0);
    assert!(
        cfg.min_window_ops <= cfg.window_ops,
        "min_window_ops {} > window_ops {}",
        cfg.min_window_ops,
        cfg.window_ops
    );
}
2737
/// `ChannelMax` must survive a JSON round-trip without losing any field.
#[test]
fn channel_max_serde_round_trips() {
    let original = ChannelMax {
        quote: 7,
        store: 13,
        fetch: 200,
    };
    let encoded = serde_json::to_string(&original).unwrap();
    let decoded: ChannelMax = serde_json::from_str(&encoded).unwrap();
    assert_eq!((decoded.quote, decoded.store, decoded.fetch), (7, 13, 200));
}
2755
/// `ChannelStart` must survive a JSON round-trip without losing any field.
#[test]
fn channel_start_serde_round_trips() {
    let original = ChannelStart {
        quote: 11,
        store: 22,
        fetch: 33,
    };
    let encoded = serde_json::to_string(&original).unwrap();
    let decoded: ChannelStart = serde_json::from_str(&encoded).unwrap();
    assert_eq!((decoded.quote, decoded.store, decoded.fetch), (11, 22, 33));
}
2769
/// Mid-stream shrink: after 50 completions a background task drops the
/// cap from 16 to 2; post-shrink in-flight must settle near the new cap
/// instead of staying at the old one.
#[tokio::test]
async fn rebucketed_honors_cap_shrinkage_mid_stream() {
    use std::sync::atomic::{AtomicUsize, Ordering as AtomicOrdering};
    use std::sync::Arc as StdArc;
    let cfg = LimiterConfig {
        min_concurrency: 1,
        max_concurrency: 16,
        ..cfg_for_tests()
    };
    let l = Limiter::new(16, cfg);
    let in_flight = StdArc::new(AtomicUsize::new(0));
    let max_after_shrink = StdArc::new(AtomicUsize::new(0));
    let processed = StdArc::new(AtomicUsize::new(0));
    let shrunk = StdArc::new(std::sync::atomic::AtomicBool::new(false));
    let l_for_shrink = l.clone();
    let p_for_shrink = processed.clone();
    let shrunk_for_shrink = shrunk.clone();
    // Watches progress and shrinks the cap to 2 once 50 items are done.
    let shrink_handle = tokio::spawn(async move {
        loop {
            tokio::time::sleep(Duration::from_millis(2)).await;
            if p_for_shrink.load(AtomicOrdering::Relaxed) >= 50 {
                l_for_shrink.warm_start(2);
                shrunk_for_shrink.store(true, AtomicOrdering::Relaxed);
                return;
            }
        }
    });
    let _: Vec<()> = rebucketed_unordered(&l, 0..400usize, |_i| {
        let in_flight = in_flight.clone();
        let max_after_shrink = max_after_shrink.clone();
        let processed = processed.clone();
        let shrunk = shrunk.clone();
        async move {
            let cur = in_flight.fetch_add(1, AtomicOrdering::Relaxed) + 1;
            // Only record peak concurrency AFTER the shrink has landed.
            if shrunk.load(AtomicOrdering::Relaxed) {
                max_after_shrink.fetch_max(cur, AtomicOrdering::Relaxed);
            }
            tokio::time::sleep(Duration::from_millis(1)).await;
            in_flight.fetch_sub(1, AtomicOrdering::Relaxed);
            processed.fetch_add(1, AtomicOrdering::Relaxed);
            Ok::<(), &'static str>(())
        }
    })
    .await
    .unwrap();
    shrink_handle.await.unwrap();
    let peak = max_after_shrink.load(AtomicOrdering::Relaxed);
    // Bound is 4, not 2: ops admitted just before the shrink flag flipped
    // may still be in flight when the first post-shrink op starts.
    assert!(
        peak <= 4,
        "rebucketed exceeded shrunk cap of 2: peak post-shrink in-flight = {peak}"
    );
}
2831
/// App errors are neutral padding: mixed with successes they must not
/// depress the success rate, but mixed with timeouts the capacity signal
/// must still win and shrink the cap.
#[test]
fn mixed_window_app_errors_with_capacity_signal() {
    let cfg = LimiterConfig {
        window_ops: 10,
        min_window_ops: 5,
        timeout_ceiling: 0.2,
        success_target: 0.9,
        ..cfg_for_tests()
    };
    // Case 1: 5 app errors + 5 successes — cap must not drop.
    let l = Limiter::new(8, cfg.clone());
    for _ in 0..5 {
        l.observe(Outcome::ApplicationError, Duration::from_millis(50));
    }
    for _ in 0..5 {
        l.observe(Outcome::Success, Duration::from_millis(50));
    }
    assert!(
        l.current() >= 8,
        "AppErrors falsely depressed the success rate; cap dropped from 8 to {}",
        l.current()
    );
    // Case 2: 5 app errors + 5 timeouts — every capacity-relevant op
    // timed out, so the cap must decrease.
    let l2 = Limiter::new(8, cfg);
    for _ in 0..5 {
        l2.observe(Outcome::ApplicationError, Duration::from_millis(50));
    }
    for _ in 0..5 {
        l2.observe(Outcome::Timeout, Duration::from_millis(50));
    }
    assert!(
        l2.current() < 8,
        "all-timeouts (with AppError padding) did not decrease cap; got {}",
        l2.current()
    );
}
2877
/// A writer loops `save_snapshot` while a reader loops `load_snapshot`
/// on the same path: every successful read must be internally consistent
/// (all three fields from the same write), never a torn mix.
#[test]
fn concurrent_save_load_no_torn_reads() {
    use std::sync::atomic::{AtomicBool, Ordering as AtomicOrdering};
    use std::thread;
    let dir = tempfile::tempdir().unwrap();
    let path = dir.path().join("snap.json");
    // Seed the file so the reader never races file creation itself.
    save_snapshot(
        &path,
        ChannelStart {
            quote: 1,
            store: 1,
            fetch: 1,
        },
    );
    let stop = std::sync::Arc::new(AtomicBool::new(false));
    let p_w = path.clone();
    let s_w = stop.clone();
    // Writer: every snapshot has quote == store == fetch == i, so any
    // field mismatch seen by the reader proves a torn read.
    let writer = thread::spawn(move || {
        let mut i = 1usize;
        while !s_w.load(AtomicOrdering::Relaxed) {
            save_snapshot(
                &p_w,
                ChannelStart {
                    quote: i,
                    store: i,
                    fetch: i,
                },
            );
            // .max(1) keeps the counter nonzero across wraparound.
            i = i.wrapping_add(1).max(1);
        }
    });
    let p_r = path.clone();
    let reader = thread::spawn(move || {
        let mut torn = 0usize;
        for _ in 0..2_000 {
            if let Some(snap) = load_snapshot(&p_r) {
                if snap.quote != snap.store || snap.store != snap.fetch {
                    torn += 1;
                }
            }
        }
        torn
    });
    // Reader has a fixed iteration count, so join it first, then stop
    // the writer's open-ended loop.
    let torn = reader.join().unwrap();
    stop.store(true, AtomicOrdering::Relaxed);
    writer.join().unwrap();
    assert_eq!(
        torn, 0,
        "observed {torn} torn reads under concurrent writes"
    );
}
2937
2938 #[test]
2946 fn save_with_timeout_returns_promptly_on_fast_failure() {
2947 let path = std::path::PathBuf::from("/nonexistent_root_xyz_test/snap.json");
2948 let snap = ChannelStart {
2949 quote: 1,
2950 store: 1,
2951 fetch: 1,
2952 };
2953 let started = Instant::now();
2954 save_snapshot_with_timeout(path, snap, Duration::from_secs(5));
2955 let elapsed = started.elapsed();
2956 assert!(
2959 elapsed < Duration::from_secs(1),
2960 "save_snapshot_with_timeout took {elapsed:?} on fast-failing path"
2961 );
2962 }
2963
2964 #[test]
2969 fn save_with_timeout_bounds_wall_time_on_hang() {
2970 let dir = tempfile::tempdir().unwrap();
2982 let path = dir.path().join("snap.json");
2983 let snap = ChannelStart {
2984 quote: 1,
2985 store: 1,
2986 fetch: 1,
2987 };
2988 let started = Instant::now();
2989 save_snapshot_with_timeout(path, snap, Duration::from_micros(1));
2992 let elapsed = started.elapsed();
2993 assert!(
2994 elapsed < Duration::from_millis(200),
2995 "timeout wrapper did not bound wall time: {elapsed:?}"
2996 );
2997 }
2998}