1use futures::stream::{self, FuturesUnordered, StreamExt};
50use serde::{Deserialize, Serialize};
51use std::collections::VecDeque;
52use std::path::{Path, PathBuf};
53use std::sync::atomic::{AtomicU64, Ordering};
54use std::sync::{Arc, Mutex, PoisonError};
55use std::time::{Duration, Instant};
56use tracing::{debug, warn};
57
/// Process-wide counter mixed (with the pid and sub-second nanos) into the
/// temporary file name used by `save_snapshot`, so overlapping saves within one
/// process get distinct temp paths.
static SAVE_COUNTER: AtomicU64 = AtomicU64::new(0);
62
/// Fetch concurrency used for a cold start (`ChannelStart::default`) and when a
/// schema-1 snapshot is migrated (its fetch value is discarded).
const FETCH_COLD_START_CONCURRENCY: usize = 4;

/// Hill-climb probe step is `best_concurrency / HILL_PROBE_STEP_DIVISOR`.
const HILL_PROBE_STEP_DIVISOR: usize = 4;

/// Lower bound on the hill-climb probe step so small concurrencies still move.
const HILL_MIN_PROBE_STEP: usize = 1;

/// An upward probe is accepted only if its goodput is at least this multiple of
/// the best goodput (i.e. at least a 5% improvement).
const HILL_UP_PROBE_ACCEPT_RATIO: f64 = 1.05;

/// A downward probe is accepted if it keeps at least this fraction of the best
/// goodput (i.e. loses no more than 2%).
const HILL_DOWN_PROBE_ACCEPT_RATIO: f64 = 0.98;

/// Epochs spent cooling down at the best concurrency after a rejected probe or
/// a stress decrease before stable epochs start counting again.
const HILL_REJECT_COOLDOWN_EPOCHS: usize = 2;

/// Consecutive settled (non-probe, non-cooldown) epochs required before the
/// next probe is launched.
const HILL_STABLE_PROBE_EPOCHS: usize = 3;

/// On a stressed epoch the fetch concurrency is divided by this value.
const HILL_STRESS_DECREASE_DIVISOR: usize = 2;

/// An epoch spans at least this many "waves" of `current` operations.
const HILL_EPOCH_FULL_WAVES: usize = 2;
94
/// Locks `m`, tolerating poisoning.
///
/// If a previous holder panicked, the guard is recovered from the
/// `PoisonError` and returned as-is instead of propagating the panic.
fn lock<T>(m: &Mutex<T>) -> std::sync::MutexGuard<'_, T> {
    match m.lock() {
        Ok(guard) => guard,
        Err(poisoned) => PoisonError::into_inner(poisoned),
    }
}
100
/// How a finished operation ended; reported to a [`Limiter`] via
/// `observe`/`observe_with_bytes`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum Outcome {
    /// Completed successfully. Its latency feeds the p95 estimates, and its
    /// bytes feed the hill-climb goodput.
    Success,
    /// Timed out. Counts against both the success rate and `timeout_ceiling`.
    Timeout,
    /// Network-level failure. Counts against the success rate.
    NetworkError,
    /// Application-level failure. Excluded from all capacity/rate and latency
    /// accounting, so it never moves the concurrency limit.
    ApplicationError,
}
115
/// Minimum concurrency for the fetch limiter. `AdaptiveController::new` raises
/// the fetch channel's `min_concurrency` to at least this value even when the
/// configured minimum is lower.
const FETCH_MIN_FLOOR: usize = 4;
126
/// Per-channel upper bounds on concurrency.
///
/// `AdaptiveConfig::sanitize` raises each bound to at least `min_concurrency`.
#[derive(Debug, Clone, Copy, Serialize, Deserialize)]
pub struct ChannelMax {
    /// Ceiling for the quote limiter.
    pub quote: usize,
    /// Ceiling for the store limiter.
    pub store: usize,
    /// Ceiling for the fetch limiter.
    pub fetch: usize,
}
136
137impl Default for ChannelMax {
138 fn default() -> Self {
139 Self {
144 quote: 128,
145 store: 64,
146 fetch: 256,
147 }
148 }
149}
150
/// Tuning shared by all adaptive channels (quote, store, fetch).
///
/// `AdaptiveController::new` calls [`AdaptiveConfig::sanitize`] to repair
/// out-of-range values before building limiters from it.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct AdaptiveConfig {
    /// Master switch. When false, limiters ignore observations and
    /// `AdaptiveController::warm_start` does nothing.
    pub enabled: bool,
    /// Lower bound on every channel's concurrency (at least 1 after sanitizing).
    pub min_concurrency: usize,
    /// Per-channel concurrency ceilings.
    pub max: ChannelMax,
    /// Number of most recent samples kept in each limiter's sliding window.
    pub window_ops: usize,
    /// Minimum number of capacity-relevant samples (successes, timeouts,
    /// network errors) before any decision is made.
    pub min_window_ops: usize,
    /// Success-rate floor in `[0, 1]`; falling below it triggers a decrease.
    pub success_target: f64,
    /// Timeout-rate ceiling in `[0, 1]`; exceeding it triggers a decrease.
    pub timeout_ceiling: f64,
    /// A success p95 above `baseline * latency_inflation_factor` triggers a
    /// decrease on channels with latency decreases enabled. It also bounds the
    /// latency a fetch hill-climb probe may reach and still be accepted.
    pub latency_inflation_factor: f64,
    /// EWMA weight in `[0, 1]` given to each new latency/goodput sample.
    pub latency_ewma_alpha: f64,
}
184
185impl AdaptiveConfig {
186 pub fn sanitize(&mut self) {
192 if !self.latency_ewma_alpha.is_finite() {
193 self.latency_ewma_alpha = 0.2;
194 }
195 self.latency_ewma_alpha = self.latency_ewma_alpha.clamp(0.0, 1.0);
196 if !self.success_target.is_finite() {
197 self.success_target = 0.95;
198 }
199 self.success_target = self.success_target.clamp(0.0, 1.0);
200 if !self.timeout_ceiling.is_finite() {
201 self.timeout_ceiling = 0.10;
202 }
203 self.timeout_ceiling = self.timeout_ceiling.clamp(0.0, 1.0);
204 if !self.latency_inflation_factor.is_finite() || self.latency_inflation_factor <= 0.0 {
205 self.latency_inflation_factor = 4.0;
206 }
207 self.min_concurrency = self.min_concurrency.max(1);
208 self.window_ops = self.window_ops.max(1);
209 self.min_window_ops = self.min_window_ops.max(1).min(self.window_ops);
210 self.max.quote = self.max.quote.max(self.min_concurrency);
211 self.max.store = self.max.store.max(self.min_concurrency);
212 self.max.fetch = self.max.fetch.max(self.min_concurrency);
213 }
214}
215
216impl Default for AdaptiveConfig {
217 fn default() -> Self {
218 Self {
219 enabled: true,
220 min_concurrency: 1,
221 max: ChannelMax::default(),
222 window_ops: 32,
223 min_window_ops: 8,
224 success_target: 0.95,
225 timeout_ceiling: 0.10,
226 latency_inflation_factor: 4.0,
233 latency_ewma_alpha: 0.2,
234 }
235 }
236}
237
/// Per-channel starting concurrency.
///
/// This is also the payload persisted by `save_snapshot`, returned by
/// `load_snapshot`, and produced by `AdaptiveController::snapshot`.
#[derive(Debug, Clone, Copy, Serialize, Deserialize)]
pub struct ChannelStart {
    /// Starting concurrency for quote operations.
    pub quote: usize,
    /// Starting concurrency for store operations.
    pub store: usize,
    /// Starting concurrency for fetch operations.
    pub fetch: usize,
}
254
255impl Default for ChannelStart {
256 fn default() -> Self {
257 Self {
258 quote: 32,
259 store: 8,
260 fetch: FETCH_COLD_START_CONCURRENCY,
261 }
262 }
263}
264
/// One observed operation kept in a limiter's sliding window.
#[derive(Debug, Clone, Copy)]
struct Sample {
    /// How the operation ended.
    outcome: Outcome,
    /// Reported duration of the operation.
    latency: Duration,
}
271
/// Control law used by a [`Limiter`].
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum LimiterAlgorithm {
    /// Window-based increase/decrease (doubling during slow start, +1 after;
    /// halving on decrease). Used for quote and store.
    Aimd,
    /// Goodput hill climb over sample-count epochs with neighbour probes.
    /// Used for fetch.
    ThroughputHillClimb,
}
279
/// Direction of a hill-climb probe relative to the best-known concurrency.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum ProbeDirection {
    /// Try a higher concurrency.
    Up,
    /// Try a lower concurrency.
    Down,
}
286
/// Bookkeeping for the throughput hill-climb algorithm.
///
/// The `epoch_*` fields accumulate the epoch in progress. The `best_*` fields
/// and the probe fields persist across epochs.
#[derive(Debug)]
struct HillClimbState {
    /// Earliest start time of any operation observed in this epoch.
    epoch_started: Option<Instant>,
    /// All operations observed this epoch, including application errors.
    epoch_samples: usize,
    /// Successful operations this epoch.
    epoch_successes: usize,
    /// Timed-out operations this epoch.
    epoch_timeouts: usize,
    /// Network-error operations this epoch.
    epoch_net_errors: usize,
    /// Bytes reported by successful operations this epoch.
    epoch_bytes: u64,
    /// Latencies of successful operations this epoch.
    epoch_latencies: Vec<Duration>,
    /// Goodput at `best_concurrency`. `None` until the first epoch completes,
    /// and again after a stress decrease.
    best_goodput_per_sec: Option<f64>,
    /// Success p95 latency at `best_concurrency`, if known.
    best_latency_p95: Option<Duration>,
    /// Best-known concurrency; this is what `Limiter::snapshot` reports.
    best_concurrency: usize,
    /// Consecutive settled epochs counted towards the next probe.
    stable_epochs: usize,
    /// Remaining epochs to wait before `stable_epochs` counts again.
    cooldown_epochs: usize,
    /// Direction of the next scheduled probe.
    next_probe: ProbeDirection,
    /// Probe being evaluated in the current epoch, if any.
    active_probe: Option<ProbeDirection>,
}
305
306impl HillClimbState {
307 fn new(start: usize, epoch_capacity: usize) -> Self {
308 Self {
309 epoch_started: None,
310 epoch_samples: 0,
311 epoch_successes: 0,
312 epoch_timeouts: 0,
313 epoch_net_errors: 0,
314 epoch_bytes: 0,
315 epoch_latencies: Vec::with_capacity(epoch_capacity),
316 best_goodput_per_sec: None,
317 best_latency_p95: None,
318 best_concurrency: start,
319 stable_epochs: 0,
320 cooldown_epochs: 0,
321 next_probe: ProbeDirection::Up,
322 active_probe: None,
323 }
324 }
325
326 fn reset_epoch(&mut self) {
327 self.epoch_started = None;
328 self.epoch_samples = 0;
329 self.epoch_successes = 0;
330 self.epoch_timeouts = 0;
331 self.epoch_net_errors = 0;
332 self.epoch_bytes = 0;
333 self.epoch_latencies.clear();
334 }
335
336 fn capacity_total(&self) -> usize {
337 self.epoch_successes + self.epoch_timeouts + self.epoch_net_errors
338 }
339}
340
/// Per-limiter tuning, derived from [`AdaptiveConfig`] for one channel.
///
/// `Limiter` sanitizes a copy of this on construction.
#[derive(Debug, Clone)]
pub struct LimiterConfig {
    /// When false, observations are ignored and the limit never changes.
    pub enabled: bool,
    /// Lower bound on the limit (at least 1 after sanitizing).
    pub min_concurrency: usize,
    /// Upper bound on the limit (at least `min_concurrency` after sanitizing).
    pub max_concurrency: usize,
    /// Sliding-window size.
    ///
    /// AIMD increases also wait this many samples after the last change. A
    /// hill-climb epoch lasts at least this many samples.
    pub window_ops: usize,
    /// Minimum capacity samples before deciding.
    ///
    /// AIMD decreases also wait this many samples after the last change.
    pub min_window_ops: usize,
    /// Success-rate floor in `[0, 1]`.
    pub success_target: f64,
    /// Timeout-rate ceiling in `[0, 1]`.
    pub timeout_ceiling: f64,
    /// Allowed p95 latency as a multiple of the baseline (or of the hill
    /// climb's best p95).
    pub latency_inflation_factor: f64,
    /// EWMA weight for the latency baseline and the hill climb's best goodput
    /// and latency.
    pub latency_ewma_alpha: f64,
    /// AIMD: an applied decrease at or above this limit ends slow start.
    /// `warm_start` ends slow start when its target is at or above it.
    /// During slow start increases double; afterwards they add 1.
    pub slow_start_ramp_threshold: usize,
    /// AIMD: whether p95 inflation over the baseline can trigger a decrease.
    pub latency_decrease_enabled: bool,
}
382
383impl LimiterConfig {
384 fn from_adaptive(cfg: &AdaptiveConfig, max_for_channel: usize) -> Self {
385 Self {
386 enabled: cfg.enabled,
387 min_concurrency: cfg.min_concurrency,
388 max_concurrency: max_for_channel.max(cfg.min_concurrency),
389 window_ops: cfg.window_ops,
390 min_window_ops: cfg.min_window_ops,
391 success_target: cfg.success_target,
392 timeout_ceiling: cfg.timeout_ceiling,
393 latency_inflation_factor: cfg.latency_inflation_factor,
394 latency_ewma_alpha: cfg.latency_ewma_alpha,
395 slow_start_ramp_threshold: 0,
398 latency_decrease_enabled: true,
399 }
400 }
401
402 fn sanitize(&mut self) {
408 if !self.latency_ewma_alpha.is_finite() {
409 self.latency_ewma_alpha = 0.2;
410 }
411 self.latency_ewma_alpha = self.latency_ewma_alpha.clamp(0.0, 1.0);
412 if !self.success_target.is_finite() {
413 self.success_target = 0.95;
414 }
415 self.success_target = self.success_target.clamp(0.0, 1.0);
416 if !self.timeout_ceiling.is_finite() {
417 self.timeout_ceiling = 0.10;
418 }
419 self.timeout_ceiling = self.timeout_ceiling.clamp(0.0, 1.0);
420 if !self.latency_inflation_factor.is_finite() || self.latency_inflation_factor <= 0.0 {
421 self.latency_inflation_factor = 4.0;
422 }
423 self.min_concurrency = self.min_concurrency.max(1);
424 self.window_ops = self.window_ops.max(1);
425 self.min_window_ops = self.min_window_ops.max(1).min(self.window_ops);
426 self.max_concurrency = self.max_concurrency.max(self.min_concurrency);
427 }
428}
429
/// Adaptive concurrency limiter for one channel.
///
/// Cloning yields another handle to the same shared state, because all fields
/// are behind `Arc`s or are `Copy`.
#[derive(Debug, Clone)]
pub struct Limiter {
    /// Mutable limiter state, shared between clones.
    inner: Arc<Mutex<LimiterInner>>,
    /// Sanitized configuration (immutable after construction).
    config: Arc<LimiterConfig>,
    /// Which control law `observe*` applies.
    algorithm: LimiterAlgorithm,
}
441
/// Mutable state of a [`Limiter`], guarded by its mutex.
#[derive(Debug)]
struct LimiterInner {
    /// Current concurrency limit.
    current: usize,
    /// Most recent samples, at most `window_ops` of them.
    window: VecDeque<Sample>,
    /// AIMD: samples since the last applied change; gates increases.
    samples_since_increase: usize,
    /// AIMD: samples since the last applied change; gates decreases.
    samples_since_decrease: usize,
    /// AIMD: EWMA of the window's success p95, updated on each applied increase.
    latency_baseline: Option<Duration>,
    /// AIMD: once true, increases are +1 instead of doubling.
    left_slow_start: bool,
    /// Hill-climb state; only driven for `ThroughputHillClimb` limiters.
    hill: HillClimbState,
}
468
469impl Limiter {
470 #[must_use]
475 pub fn new(start: usize, config: LimiterConfig) -> Self {
476 Self::new_with_algorithm(start, config, LimiterAlgorithm::Aimd)
477 }
478
479 fn new_with_algorithm(
480 start: usize,
481 config: LimiterConfig,
482 algorithm: LimiterAlgorithm,
483 ) -> Self {
484 let mut config = config;
485 config.sanitize();
486 let clamped = start.clamp(config.min_concurrency, config.max_concurrency.max(1));
487 let window_cap = config.window_ops;
488 Self {
489 inner: Arc::new(Mutex::new(LimiterInner {
490 current: clamped,
491 window: VecDeque::with_capacity(window_cap),
492 samples_since_increase: 0,
493 samples_since_decrease: 0,
494 latency_baseline: None,
495 left_slow_start: false,
496 hill: HillClimbState::new(clamped, window_cap),
497 })),
498 config: Arc::new(config),
499 algorithm,
500 }
501 }
502
503 #[must_use]
507 pub fn current(&self) -> usize {
508 lock(&self.inner).current
509 }
510
511 pub fn observe(&self, outcome: Outcome, latency: Duration) {
514 self.observe_with_bytes(outcome, latency, 0);
515 }
516
517 pub fn observe_with_bytes(&self, outcome: Outcome, latency: Duration, bytes: u64) {
520 let observed_at = Instant::now();
521 let operation_started = observed_at.checked_sub(latency).unwrap_or(observed_at);
522 self.observe_with_timing(outcome, latency, bytes, operation_started);
523 }
524
525 fn observe_with_timing(
526 &self,
527 outcome: Outcome,
528 latency: Duration,
529 bytes: u64,
530 operation_started: Instant,
531 ) {
532 if !self.config.enabled {
533 return;
534 }
535 let mut g = lock(&self.inner);
536 if g.window.len() == self.config.window_ops {
537 g.window.pop_front();
538 }
539 g.window.push_back(Sample { outcome, latency });
540 if self.algorithm == LimiterAlgorithm::ThroughputHillClimb {
541 observe_hill_climb(
542 &mut g,
543 outcome,
544 latency,
545 bytes,
546 operation_started,
547 &self.config,
548 );
549 return;
550 }
551 g.samples_since_increase = g.samples_since_increase.saturating_add(1);
552 g.samples_since_decrease = g.samples_since_decrease.saturating_add(1);
553 if g.window.len() < self.config.min_window_ops {
554 return;
555 }
556 let decision = evaluate(&g.window, &self.config, g.latency_baseline);
557 apply_decision(&mut g, decision, &self.config);
558 }
559
560 pub fn warm_start(&self, start: usize) {
583 let clamped = start.clamp(
584 self.config.min_concurrency,
585 self.config.max_concurrency.max(1),
586 );
587 let mut g = lock(&self.inner);
588 g.current = clamped;
589 g.left_slow_start = clamped >= self.config.slow_start_ramp_threshold;
590 g.hill = HillClimbState::new(clamped, self.config.window_ops);
591 }
592
593 #[must_use]
595 pub fn snapshot(&self) -> usize {
596 let g = lock(&self.inner);
597 if self.algorithm == LimiterAlgorithm::ThroughputHillClimb {
598 g.hill.best_concurrency
599 } else {
600 g.current
601 }
602 }
603}
604
/// Summary of one completed hill-climb epoch.
#[derive(Debug, Clone, Copy)]
struct HillEpochStats {
    /// Bytes per second if any success reported bytes, otherwise successful
    /// operations per second.
    goodput_per_sec: f64,
    /// p95 latency of the epoch's successful operations, if any.
    latency_p95: Option<Duration>,
}
610
/// Verdict produced by [`evaluate`] for an AIMD limiter.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum Decision {
    /// Window looks healthy; raise the limit (paced by `apply_decision`).
    Increase,
    /// Window shows stress; lower the limit (paced by `apply_decision`).
    Decrease,
    /// Not enough information; leave the limit alone.
    Hold,
}
621
622fn evaluate(
623 window: &VecDeque<Sample>,
624 cfg: &LimiterConfig,
625 baseline: Option<Duration>,
626) -> Decision {
627 let mut successes = 0usize;
632 let mut timeouts = 0usize;
633 let mut net_errors = 0usize;
634 let mut latencies: Vec<Duration> = Vec::with_capacity(window.len());
635 for s in window {
636 match s.outcome {
637 Outcome::Success => {
638 successes += 1;
639 latencies.push(s.latency);
640 }
641 Outcome::Timeout => timeouts += 1,
642 Outcome::NetworkError => net_errors += 1,
643 Outcome::ApplicationError => {}
644 }
645 }
646 let capacity_total = successes + timeouts + net_errors;
647 if capacity_total < cfg.min_window_ops {
648 return Decision::Hold;
650 }
651 let total_f = capacity_total as f64;
652 let success_rate = successes as f64 / total_f;
653 let timeout_rate = timeouts as f64 / total_f;
654
655 if success_rate < cfg.success_target || timeout_rate > cfg.timeout_ceiling {
656 return Decision::Decrease;
657 }
658
659 if let Some(p95) = p95_of(&mut latencies) {
660 if cfg.latency_decrease_enabled {
661 if let Some(base) = baseline {
662 let limit = base.mul_f64(cfg.latency_inflation_factor);
663 if p95 > limit {
664 return Decision::Decrease;
665 }
666 }
667 }
668 Decision::Increase
669 } else {
670 Decision::Hold
671 }
672}
673
/// Applies an AIMD [`Decision`] to `inner`, with pacing.
///
/// - Increase: requires at least `cfg.window_ops` samples since the last
///   applied change. It first folds the window's success p95 into the latency
///   baseline (EWMA), then doubles the limit during slow start or adds 1
///   afterwards.
/// - Decrease: requires at least `cfg.min_window_ops` samples since the last
///   applied change. It halves the limit.
/// - In both cases the result is clamped to `[min_concurrency, max_concurrency]`
///   and both sample counters reset.
fn apply_decision(inner: &mut LimiterInner, decision: Decision, cfg: &LimiterConfig) {
    match decision {
        Decision::Increase => {
            // Let a full window of samples accumulate at the current limit
            // before growing again.
            if inner.samples_since_increase < cfg.window_ops {
                return;
            }
            // Window has at least one success here (evaluate only returns
            // Increase when a p95 exists), so this p95 is meaningful.
            let p95 = window_p95(&inner.window);
            inner.latency_baseline = Some(match inner.latency_baseline {
                None => p95,
                Some(prev) => ewma(prev, p95, cfg.latency_ewma_alpha),
            });
            let next = if inner.left_slow_start {
                inner.current.saturating_add(1)
            } else {
                inner.current.saturating_mul(2)
            };
            let next = next.min(cfg.max_concurrency).max(cfg.min_concurrency);
            if next != inner.current {
                debug!(
                    from = inner.current,
                    to = next,
                    slow_start = !inner.left_slow_start,
                    "adaptive: increase",
                );
            }
            inner.current = next;
            inner.samples_since_increase = 0;
            inner.samples_since_decrease = 0;
        }
        Decision::Decrease => {
            // Shorter pacing than increases so stress is reacted to quickly.
            if inner.samples_since_decrease < cfg.min_window_ops {
                return;
            }
            // Leaving slow start is sticky: once set, later increases are +1.
            if inner.current >= cfg.slow_start_ramp_threshold {
                inner.left_slow_start = true;
            }
            let next = (inner.current / 2).max(cfg.min_concurrency);
            if next != inner.current {
                debug!(from = inner.current, to = next, "adaptive: decrease");
            }
            inner.current = next;
            inner.samples_since_increase = 0;
            inner.samples_since_decrease = 0;
        }
        Decision::Hold => {}
    }
}
733
/// Nearest-rank 95th percentile of `latencies`, or `None` for an empty slice.
///
/// The slice is sorted in place as a side effect.
fn p95_of(latencies: &mut [Duration]) -> Option<Duration> {
    let last = latencies.len().checked_sub(1)?;
    latencies.sort_unstable();
    let rank = ((latencies.len() as f64) * 0.95).ceil() as usize;
    latencies.get(rank.saturating_sub(1).min(last)).copied()
}
746
747fn window_p95(window: &VecDeque<Sample>) -> Duration {
748 let mut latencies: Vec<Duration> = window
749 .iter()
750 .filter(|s| matches!(s.outcome, Outcome::Success))
751 .map(|s| s.latency)
752 .collect();
753 p95_of(&mut latencies).unwrap_or(Duration::ZERO)
754}
755
/// Exponentially weighted moving average of two durations, computed in
/// milliseconds.
///
/// `alpha` is the weight of `sample` and is clamped into `[0, 1]`. A
/// non-finite `alpha`, or a non-finite or negative blend, returns `prev`
/// unchanged.
fn ewma(prev: Duration, sample: Duration, alpha: f64) -> Duration {
    if !alpha.is_finite() {
        return prev;
    }
    let weight = alpha.clamp(0.0, 1.0);
    let old_ms = prev.as_secs_f64() * 1000.0;
    let new_sample_ms = sample.as_secs_f64() * 1000.0;
    let blended_ms = (1.0 - weight) * old_ms + weight * new_sample_ms;
    match blended_ms {
        ms if ms.is_finite() && ms >= 0.0 => Duration::from_secs_f64(ms / 1000.0),
        _ => prev,
    }
}
770
/// Feeds one observation into the fetch hill climb.
///
/// The sample is accumulated into the current epoch. If the epoch so far is
/// stressed (success/timeout thresholds breached once `min_window_ops`
/// capacity samples exist), concurrency is cut immediately. Otherwise, once
/// the epoch reaches its target sample count, its goodput summary drives the
/// probe state machine and a new epoch begins.
fn observe_hill_climb(
    inner: &mut LimiterInner,
    outcome: Outcome,
    latency: Duration,
    bytes: u64,
    operation_started: Instant,
    cfg: &LimiterConfig,
) {
    // The epoch start is the earliest start of any op seen in it, so ops that
    // began before the epoch still have their whole duration in the goodput
    // denominator.
    match inner.hill.epoch_started {
        Some(epoch_started) if epoch_started <= operation_started => {}
        _ => inner.hill.epoch_started = Some(operation_started),
    }
    inner.hill.epoch_samples = inner.hill.epoch_samples.saturating_add(1);
    match outcome {
        Outcome::Success => {
            inner.hill.epoch_successes = inner.hill.epoch_successes.saturating_add(1);
            inner.hill.epoch_bytes = inner.hill.epoch_bytes.saturating_add(bytes);
            inner.hill.epoch_latencies.push(latency);
        }
        Outcome::Timeout => {
            inner.hill.epoch_timeouts = inner.hill.epoch_timeouts.saturating_add(1);
        }
        Outcome::NetworkError => {
            inner.hill.epoch_net_errors = inner.hill.epoch_net_errors.saturating_add(1);
        }
        Outcome::ApplicationError => {}
    }

    // Stress is checked on every sample, not just at epoch end, so overload
    // is reacted to without waiting for a full epoch. apply_hill_stress also
    // resets the epoch.
    if hill_epoch_stressed(&inner.hill, cfg) {
        apply_hill_stress(inner, cfg);
        return;
    }

    if inner.hill.epoch_samples < hill_epoch_target_samples(inner.current, cfg) {
        return;
    }

    // Epochs without usable stats (too few capacity samples or no successes)
    // are discarded without changing concurrency.
    if let Some(stats) = hill_epoch_stats(&inner.hill, cfg) {
        apply_hill_epoch(inner, stats, cfg);
    }
    inner.hill.reset_epoch();
}
813
814fn hill_epoch_target_samples(current: usize, cfg: &LimiterConfig) -> usize {
815 cfg.window_ops
816 .max(current.saturating_mul(HILL_EPOCH_FULL_WAVES))
817 .max(cfg.min_window_ops)
818}
819
820fn hill_epoch_stressed(hill: &HillClimbState, cfg: &LimiterConfig) -> bool {
821 let capacity_total = hill.capacity_total();
822 if capacity_total < cfg.min_window_ops {
823 return false;
824 }
825 let total_f = capacity_total as f64;
826 let success_rate = hill.epoch_successes as f64 / total_f;
827 let timeout_rate = hill.epoch_timeouts as f64 / total_f;
828 success_rate < cfg.success_target || timeout_rate > cfg.timeout_ceiling
829}
830
831fn hill_epoch_stats(hill: &HillClimbState, cfg: &LimiterConfig) -> Option<HillEpochStats> {
832 let capacity_total = hill.capacity_total();
833 if capacity_total < cfg.min_window_ops || hill.epoch_successes == 0 {
834 return None;
835 }
836 let mut latencies = hill.epoch_latencies.clone();
837 let latency_p95 = p95_of(&mut latencies);
838 let max_latency = latencies.iter().copied().max().unwrap_or(Duration::ZERO);
839 let wall_elapsed = hill.epoch_started.map_or(Duration::ZERO, |s| s.elapsed());
840 let elapsed = wall_elapsed.max(max_latency);
841 let elapsed_secs = elapsed.as_secs_f64();
842 if !elapsed_secs.is_finite() || elapsed_secs <= 0.0 {
843 return None;
844 }
845
846 let units = if hill.epoch_bytes > 0 {
849 hill.epoch_bytes as f64
850 } else {
851 hill.epoch_successes as f64
852 };
853 Some(HillEpochStats {
854 goodput_per_sec: units / elapsed_secs,
855 latency_p95,
856 })
857}
858
859fn apply_hill_stress(inner: &mut LimiterInner, cfg: &LimiterConfig) {
860 let next = (inner.current / HILL_STRESS_DECREASE_DIVISOR)
861 .max(cfg.min_concurrency)
862 .min(cfg.max_concurrency);
863 if next != inner.current {
864 debug!(
865 from = inner.current,
866 to = next,
867 "adaptive: fetch hill stress decrease"
868 );
869 }
870 inner.current = next;
871 inner.hill.best_concurrency = next;
872 inner.hill.best_goodput_per_sec = None;
873 inner.hill.best_latency_p95 = None;
874 inner.hill.stable_epochs = 0;
875 inner.hill.cooldown_epochs = HILL_REJECT_COOLDOWN_EPOCHS;
876 inner.hill.active_probe = None;
877 inner.hill.next_probe = ProbeDirection::Up;
878 inner.hill.reset_epoch();
879}
880
/// Advances the hill-climb state machine with one completed epoch.
///
/// - No best yet: record this epoch as the best and start an upward probe.
/// - Up probe active: accept if goodput improved by `HILL_UP_PROBE_ACCEPT_RATIO`
///   and latency is acceptable, then keep probing up; otherwise reject.
/// - Down probe active: accept if goodput stayed above
///   `HILL_DOWN_PROBE_ACCEPT_RATIO` of best and latency is acceptable, then
///   schedule an up probe next; otherwise reject.
/// - Settled (no probe): blend the epoch into the best estimate, burn cooldown
///   epochs, and after `HILL_STABLE_PROBE_EPOCHS` settled epochs launch the
///   next scheduled probe, alternating direction.
fn apply_hill_epoch(inner: &mut LimiterInner, stats: HillEpochStats, cfg: &LimiterConfig) {
    let Some(best_goodput) = inner.hill.best_goodput_per_sec else {
        // First usable epoch (or first after a stress reset): this becomes
        // the reference point.
        inner.hill.best_goodput_per_sec = Some(stats.goodput_per_sec);
        inner.hill.best_latency_p95 = stats.latency_p95;
        inner.hill.best_concurrency = inner.current;
        probe_hill_neighbor(inner, ProbeDirection::Up, cfg);
        return;
    };

    match inner.hill.active_probe {
        Some(ProbeDirection::Up) => {
            // Going up must pay for itself with a clear goodput gain.
            let improved = stats.goodput_per_sec >= best_goodput * HILL_UP_PROBE_ACCEPT_RATIO;
            if improved
                && hill_latency_acceptable(stats.latency_p95, inner.hill.best_latency_p95, cfg)
            {
                accept_hill_probe(inner, stats, cfg);
                probe_hill_neighbor(inner, ProbeDirection::Up, cfg);
            } else {
                reject_hill_probe(inner);
            }
        }
        Some(ProbeDirection::Down) => {
            // Going down only needs to (nearly) hold goodput.
            let retained = stats.goodput_per_sec >= best_goodput * HILL_DOWN_PROBE_ACCEPT_RATIO;
            if retained
                && hill_latency_acceptable(stats.latency_p95, inner.hill.best_latency_p95, cfg)
            {
                accept_hill_probe(inner, stats, cfg);
                inner.hill.next_probe = ProbeDirection::Up;
            } else {
                reject_hill_probe(inner);
            }
        }
        None => {
            // Track drift at the best point even while cooling down.
            refresh_hill_best(inner, stats, cfg);
            if inner.hill.cooldown_epochs > 0 {
                inner.hill.cooldown_epochs -= 1;
                return;
            }
            inner.hill.stable_epochs = inner.hill.stable_epochs.saturating_add(1);
            if inner.hill.stable_epochs >= HILL_STABLE_PROBE_EPOCHS {
                // Flip the schedule before probing so the next settled probe
                // goes the other way.
                let direction = inner.hill.next_probe;
                inner.hill.next_probe = match direction {
                    ProbeDirection::Up => ProbeDirection::Down,
                    ProbeDirection::Down => ProbeDirection::Up,
                };
                probe_hill_neighbor(inner, direction, cfg);
            }
        }
    }
}
931
932fn refresh_hill_best(inner: &mut LimiterInner, stats: HillEpochStats, cfg: &LimiterConfig) {
933 inner.hill.best_goodput_per_sec = Some(match inner.hill.best_goodput_per_sec {
934 Some(prev) => ewma_f64(prev, stats.goodput_per_sec, cfg.latency_ewma_alpha),
935 None => stats.goodput_per_sec,
936 });
937 if let Some(latency_p95) = stats.latency_p95 {
938 inner.hill.best_latency_p95 = Some(match inner.hill.best_latency_p95 {
939 Some(prev) => ewma(prev, latency_p95, cfg.latency_ewma_alpha),
940 None => latency_p95,
941 });
942 }
943}
944
945fn hill_latency_acceptable(
946 candidate: Option<Duration>,
947 best: Option<Duration>,
948 cfg: &LimiterConfig,
949) -> bool {
950 match (candidate, best) {
951 (Some(candidate), Some(best)) => candidate <= best.mul_f64(cfg.latency_inflation_factor),
952 _ => true,
953 }
954}
955
/// Exponentially weighted moving average of two non-negative values.
///
/// `alpha` is the weight of `sample` and is clamped into `[0, 1]`. A
/// non-finite `alpha`, or a non-finite or negative blend, returns `prev`.
fn ewma_f64(prev: f64, sample: f64, alpha: f64) -> f64 {
    if !alpha.is_finite() {
        return prev;
    }
    let weight = alpha.clamp(0.0, 1.0);
    let blended = (1.0 - weight) * prev + weight * sample;
    let valid = blended.is_finite() && blended >= 0.0;
    if valid {
        blended
    } else {
        prev
    }
}
969
970fn accept_hill_probe(inner: &mut LimiterInner, stats: HillEpochStats, cfg: &LimiterConfig) {
971 debug!(
972 concurrency = inner.current,
973 goodput_per_sec = stats.goodput_per_sec,
974 "adaptive: fetch hill accepted probe"
975 );
976 inner.hill.best_concurrency = inner.current;
977 inner.hill.best_goodput_per_sec = Some(stats.goodput_per_sec);
978 inner.hill.best_latency_p95 = stats.latency_p95;
979 inner.hill.active_probe = None;
980 inner.hill.cooldown_epochs = 0;
981 inner.hill.stable_epochs = 0;
982 inner.current = inner
983 .hill
984 .best_concurrency
985 .clamp(cfg.min_concurrency, cfg.max_concurrency);
986}
987
988fn reject_hill_probe(inner: &mut LimiterInner) {
989 let from = inner.current;
990 let to = inner.hill.best_concurrency;
991 let rejected_direction = inner.hill.active_probe;
992 if from != to {
993 debug!(from, to, "adaptive: fetch hill rejected probe");
994 }
995 inner.current = to;
996 inner.hill.active_probe = None;
997 if let Some(direction) = rejected_direction {
998 inner.hill.next_probe = match direction {
999 ProbeDirection::Up => ProbeDirection::Down,
1000 ProbeDirection::Down => ProbeDirection::Up,
1001 };
1002 }
1003 inner.hill.cooldown_epochs = HILL_REJECT_COOLDOWN_EPOCHS;
1004 inner.hill.stable_epochs = 0;
1005}
1006
1007fn probe_hill_neighbor(inner: &mut LimiterInner, direction: ProbeDirection, cfg: &LimiterConfig) {
1008 let best = inner.hill.best_concurrency;
1009 let step = (best / HILL_PROBE_STEP_DIVISOR).max(HILL_MIN_PROBE_STEP);
1010 let candidate = match direction {
1011 ProbeDirection::Up => best.saturating_add(step).min(cfg.max_concurrency),
1012 ProbeDirection::Down => best.saturating_sub(step).max(cfg.min_concurrency),
1013 };
1014 if candidate == best {
1015 inner.current = best;
1016 inner.hill.active_probe = None;
1017 inner.hill.stable_epochs = 0;
1018 return;
1019 }
1020 debug!(
1021 from = best,
1022 to = candidate,
1023 ?direction,
1024 "adaptive: fetch hill probing"
1025 );
1026 inner.current = candidate;
1027 inner.hill.active_probe = Some(direction);
1028 inner.hill.stable_epochs = 0;
1029}
1030
/// One adaptive [`Limiter`] per channel.
///
/// Quote and store use AIMD; fetch uses the throughput hill climb (see
/// `AdaptiveController::new`).
#[derive(Debug, Clone)]
pub struct AdaptiveController {
    /// Limiter for quote operations (AIMD).
    pub quote: Limiter,
    /// Limiter for store operations (AIMD).
    pub store: Limiter,
    /// Limiter for fetch operations (throughput hill climb).
    pub fetch: Limiter,
    /// Sanitized configuration the limiters were built from.
    pub(crate) config: AdaptiveConfig,
    /// Cold-start concurrencies; `warm_start` never starts below these.
    cold_start: ChannelStart,
}
1051
1052impl AdaptiveController {
1053 #[must_use]
1058 pub fn new(start: ChannelStart, config: AdaptiveConfig) -> Self {
1059 let mut config = config;
1060 config.sanitize();
1061 let quote_cfg = LimiterConfig::from_adaptive(&config, config.max.quote);
1062 let store_cfg = LimiterConfig::from_adaptive(&config, config.max.store);
1063 let mut fetch_cfg = LimiterConfig::from_adaptive(&config, config.max.fetch);
1064 fetch_cfg.min_concurrency = fetch_cfg.min_concurrency.max(FETCH_MIN_FLOOR);
1081 fetch_cfg.max_concurrency = fetch_cfg.max_concurrency.max(fetch_cfg.min_concurrency);
1084 fetch_cfg.slow_start_ramp_threshold = usize::MAX;
1109 fetch_cfg.latency_decrease_enabled = false;
1110 Self {
1111 quote: Limiter::new(start.quote, quote_cfg),
1112 store: Limiter::new(start.store, store_cfg),
1113 fetch: Limiter::new_with_algorithm(
1114 start.fetch,
1115 fetch_cfg,
1116 LimiterAlgorithm::ThroughputHillClimb,
1117 ),
1118 config,
1119 cold_start: start,
1120 }
1121 }
1122
1123 #[must_use]
1125 pub fn snapshot(&self) -> ChannelStart {
1126 ChannelStart {
1127 quote: self.quote.snapshot(),
1128 store: self.store.snapshot(),
1129 fetch: self.fetch.snapshot(),
1130 }
1131 }
1132
1133 #[must_use]
1139 pub fn config(&self) -> &AdaptiveConfig {
1140 &self.config
1141 }
1142
1143 pub fn warm_start(&self, snapshot: ChannelStart) {
1157 if !self.config.enabled {
1158 return;
1159 }
1160 self.quote
1161 .warm_start(snapshot.quote.max(self.cold_start.quote));
1162 self.store
1163 .warm_start(snapshot.store.max(self.cold_start.store));
1164 self.fetch
1165 .warm_start(snapshot.fetch.max(self.cold_start.fetch));
1166 }
1167}
1168
1169impl Default for AdaptiveController {
1170 fn default() -> Self {
1171 Self::new(ChannelStart::default(), AdaptiveConfig::default())
1172 }
1173}
1174
/// Times an operation and reports it to a [`Limiter`] when dropped.
///
/// Nothing is reported unless `finish`/`finish_with_bytes` ran first. So if
/// the surrounding future is dropped mid-operation, no sample is recorded.
struct ObserveGuard<'a> {
    /// Limiter that receives the observation.
    limiter: &'a Limiter,
    /// Creation time; used as the operation start and for latency.
    started: Instant,
    /// Outcome, latency and byte count recorded by `finish*`.
    outcome: Option<(Outcome, Duration, u64)>,
}
1187
1188impl<'a> ObserveGuard<'a> {
1189 fn new(limiter: &'a Limiter) -> Self {
1190 Self {
1191 limiter,
1192 started: Instant::now(),
1193 outcome: None,
1194 }
1195 }
1196 fn finish(&mut self, outcome: Outcome) {
1197 self.finish_with_bytes(outcome, 0);
1198 }
1199
1200 fn finish_with_bytes(&mut self, outcome: Outcome, bytes: u64) {
1201 self.outcome = Some((outcome, self.started.elapsed(), bytes));
1202 }
1203}
1204
1205impl Drop for ObserveGuard<'_> {
1206 fn drop(&mut self) {
1207 if let Some((outcome, latency, bytes)) = self.outcome.take() {
1208 self.limiter
1209 .observe_with_timing(outcome, latency, bytes, self.started);
1210 }
1211 }
1212}
1213
1214pub async fn observe_op<T, E, F, Fut, C>(limiter: &Limiter, op: F, classify: C) -> Result<T, E>
1229where
1230 F: FnOnce() -> Fut,
1231 Fut: std::future::Future<Output = Result<T, E>>,
1232 C: FnOnce(&E) -> Outcome,
1233{
1234 let mut guard = ObserveGuard::new(limiter);
1235 let result = op().await;
1236 let outcome = match &result {
1237 Ok(_) => Outcome::Success,
1238 Err(e) => classify(e),
1239 };
1240 guard.finish(outcome);
1241 drop(guard); result
1243}
1244
1245pub async fn observe_op_with_success_bytes<T, E, F, Fut, C, B>(
1249 limiter: &Limiter,
1250 op: F,
1251 classify: C,
1252 success_bytes: B,
1253) -> Result<T, E>
1254where
1255 F: FnOnce() -> Fut,
1256 Fut: std::future::Future<Output = Result<T, E>>,
1257 C: FnOnce(&E) -> Outcome,
1258 B: FnOnce(&T) -> u64,
1259{
1260 let mut guard = ObserveGuard::new(limiter);
1261 let result = op().await;
1262 match &result {
1263 Ok(value) => guard.finish_with_bytes(Outcome::Success, success_bytes(value)),
1264 Err(e) => guard.finish_with_bytes(classify(e), 0),
1265 }
1266 drop(guard);
1267 result
1268}
1269
1270pub async fn rebucketed_unordered<I, T, E, F, Fut>(
1285 limiter: &Limiter,
1286 items: I,
1287 mut op: F,
1288) -> Result<Vec<T>, E>
1289where
1290 I: IntoIterator,
1291 F: FnMut(I::Item) -> Fut,
1292 Fut: std::future::Future<Output = Result<T, E>>,
1293{
1294 let mut iter = items.into_iter().peekable();
1295 let mut in_flight: FuturesUnordered<Fut> = FuturesUnordered::new();
1296 let mut results = Vec::new();
1297 let mut pending_err: Option<E> = None;
1298 loop {
1299 if pending_err.is_none() {
1302 let cap = limiter.current().max(1);
1303 while in_flight.len() < cap {
1304 match iter.next() {
1305 Some(item) => in_flight.push(op(item)),
1306 None => break,
1307 }
1308 }
1309 }
1310 if in_flight.is_empty() {
1311 break;
1312 }
1313 match in_flight.next().await {
1314 Some(Ok(v)) => results.push(v),
1315 Some(Err(e)) => {
1316 if pending_err.is_none() {
1317 pending_err = Some(e);
1318 }
1319 }
1320 None => break,
1321 }
1322 }
1323 match pending_err {
1324 Some(e) => Err(e),
1325 None => Ok(results),
1326 }
1327}
1328
1329pub async fn rebucketed_ordered<I, U, E, F, Fut>(
1342 limiter: &Limiter,
1343 items: I,
1344 op: F,
1345) -> Result<Vec<U>, E>
1346where
1347 I: IntoIterator,
1348 F: FnMut(I::Item) -> Fut,
1349 Fut: std::future::Future<Output = Result<(usize, U), E>>,
1350{
1351 let mut indexed = rebucketed_unordered(limiter, items, op).await?;
1352 indexed.sort_by_key(|(idx, _)| *idx);
1353 Ok(indexed.into_iter().map(|(_, v)| v).collect())
1354}
1355
1356pub async fn rebucketed<I, T, E, F, Fut>(
1362 limiter: &Limiter,
1363 items: I,
1364 ordered: bool,
1365 mut op: F,
1366) -> Result<Vec<T>, E>
1367where
1368 I: IntoIterator,
1369 F: FnMut(I::Item) -> Fut,
1370 Fut: std::future::Future<Output = Result<T, E>>,
1371{
1372 if !ordered {
1373 return rebucketed_unordered(limiter, items, op).await;
1374 }
1375 let mut iter = items.into_iter();
1376 let mut results = Vec::new();
1377 let mut pending_err: Option<E> = None;
1378 loop {
1379 if pending_err.is_some() {
1380 break;
1381 }
1382 let cap = limiter.current().max(1);
1383 let mut batch = Vec::with_capacity(cap);
1384 for item in iter.by_ref().take(cap) {
1385 batch.push(op(item));
1386 }
1387 if batch.is_empty() {
1388 break;
1389 }
1390 let mut s = stream::iter(batch).buffered(cap);
1391 while let Some(r) = s.next().await {
1392 match r {
1393 Ok(v) => results.push(v),
1394 Err(e) => {
1395 if pending_err.is_none() {
1396 pending_err = Some(e);
1397 }
1398 }
1399 }
1400 }
1401 }
1402 match pending_err {
1403 Some(e) => Err(e),
1404 None => Ok(results),
1405 }
1406}
1407
/// On-disk JSON envelope for the adaptive snapshot.
#[derive(Debug, Clone, Serialize, Deserialize)]
struct PersistedState {
    /// Schema version (see `PERSIST_SCHEMA`); `load_snapshot` dispatches on it.
    schema: u32,
    /// Saved per-channel starting concurrencies.
    channels: ChannelStart,
}
1417
/// Snapshot schema version written by `save_snapshot`.
const PERSIST_SCHEMA: u32 = 2;
/// Legacy schema from when fetch was AIMD-controlled. `load_snapshot` keeps its
/// quote/store values but resets fetch to the cold-start value.
const PERSIST_SCHEMA_AIMD_FETCH: u32 = 1;
/// Snapshot file name inside the data directory.
const PERSIST_FILENAME: &str = "client_adaptive.json";
1421
1422#[must_use]
1426pub fn default_persist_path() -> Option<PathBuf> {
1427 crate::config::data_dir()
1428 .ok()
1429 .map(|d| d.join(PERSIST_FILENAME))
1430}
1431
1432#[must_use]
1438pub fn load_snapshot(path: &Path) -> Option<ChannelStart> {
1439 let bytes = std::fs::read(path).ok()?;
1440 let state: PersistedState = match serde_json::from_slice(&bytes) {
1441 Ok(s) => s,
1442 Err(e) => {
1443 warn!(path = %path.display(), error = %e, "adaptive: corrupt snapshot, ignoring");
1444 return None;
1445 }
1446 };
1447 match state.schema {
1448 PERSIST_SCHEMA => Some(state.channels),
1449 PERSIST_SCHEMA_AIMD_FETCH => {
1450 debug!(
1451 path = %path.display(),
1452 "adaptive: migrating schema-1 snapshot, preserving quote/store and resetting fetch",
1453 );
1454 Some(ChannelStart {
1455 fetch: FETCH_COLD_START_CONCURRENCY,
1456 ..state.channels
1457 })
1458 }
1459 schema => {
1460 debug!(
1461 path = %path.display(),
1462 schema,
1463 expected = PERSIST_SCHEMA,
1464 "adaptive: snapshot schema mismatch, ignoring",
1465 );
1466 None
1467 }
1468 }
1469}
1470
1471pub fn save_snapshot(path: &Path, channels: ChannelStart) {
1474 let state = PersistedState {
1475 schema: PERSIST_SCHEMA,
1476 channels,
1477 };
1478 let bytes = match serde_json::to_vec_pretty(&state) {
1479 Ok(b) => b,
1480 Err(e) => {
1481 warn!(error = %e, "adaptive: snapshot serialize failed");
1482 return;
1483 }
1484 };
1485 if let Some(parent) = path.parent() {
1486 if let Err(e) = std::fs::create_dir_all(parent) {
1487 warn!(path = %parent.display(), error = %e, "adaptive: snapshot mkdir failed");
1488 return;
1489 }
1490 }
1491 let nanos = std::time::SystemTime::now()
1498 .duration_since(std::time::UNIX_EPOCH)
1499 .map(|d| d.subsec_nanos())
1500 .unwrap_or(0);
1501 let counter = SAVE_COUNTER.fetch_add(1, Ordering::Relaxed);
1502 let tmp = path.with_extension(format!(
1503 "json.tmp.{}.{}.{}",
1504 std::process::id(),
1505 counter,
1506 nanos
1507 ));
1508 if let Err(e) = std::fs::write(&tmp, &bytes) {
1509 warn!(path = %tmp.display(), error = %e, "adaptive: snapshot write failed");
1510 return;
1511 }
1512 if let Err(e) = std::fs::rename(&tmp, path) {
1513 warn!(
1514 from = %tmp.display(),
1515 to = %path.display(),
1516 error = %e,
1517 "adaptive: snapshot rename failed",
1518 );
1519 let _ = std::fs::remove_file(&tmp);
1522 }
1523}
1524
1525pub fn save_snapshot_with_timeout(path: PathBuf, channels: ChannelStart, timeout: Duration) {
1535 let handle = std::thread::spawn(move || {
1536 save_snapshot(&path, channels);
1537 });
1538 let started = Instant::now();
1542 let poll = Duration::from_millis(5);
1543 while started.elapsed() < timeout {
1544 if handle.is_finished() {
1545 let _ = handle.join();
1546 return;
1547 }
1548 std::thread::sleep(poll);
1549 }
1550 warn!(
1554 timeout_ms = timeout.as_millis() as u64,
1555 "adaptive: snapshot save timed out (data dir slow?); detaching writer thread"
1556 );
1557 drop(handle);
1558}
1559
1560#[cfg(test)]
1561#[allow(clippy::unwrap_used)]
1562mod tests {
1563 use super::*;
1564
1565 const HILL_TEST_START_CAP: usize = 16;
1566 const HILL_TEST_UP_PROBE_CAP: usize = 20;
1567 const HILL_TEST_NEXT_UP_PROBE_CAP: usize = 25;
1568 const HILL_TEST_DOWN_PROBE_CAP: usize = 12;
1569 const HILL_TEST_CHUNK_BYTES: u64 = 1_000;
1570 const HILL_TEST_BASE_LATENCY_MS: u64 = 100;
1571 const HILL_TEST_REJECT_LATENCY_MS: u64 = 130;
1572 const HILL_TEST_RETAINED_DOWN_LATENCY_MS: u64 = 75;
1573 const HILL_TEST_ASYNC_LATENCY_MS: u64 = 10;
1574
1575 fn cfg_for_tests() -> LimiterConfig {
1576 LimiterConfig {
1577 enabled: true,
1578 min_concurrency: 1,
1579 max_concurrency: 64,
1580 window_ops: 10,
1581 min_window_ops: 5,
1582 success_target: 0.9,
1583 timeout_ceiling: 0.2,
1584 latency_inflation_factor: 2.0,
1585 latency_ewma_alpha: 0.5,
1586 slow_start_ramp_threshold: 0,
1587 latency_decrease_enabled: true,
1588 }
1589 }
1590
1591 fn hill_cfg_for_tests() -> LimiterConfig {
1592 LimiterConfig {
1593 window_ops: 4,
1594 min_window_ops: 2,
1595 max_concurrency: 64,
1596 success_target: 0.9,
1597 timeout_ceiling: 0.2,
1598 ..cfg_for_tests()
1599 }
1600 }
1601
1602 fn fetch_hill_for_tests(start: usize, cfg: LimiterConfig) -> Limiter {
1603 Limiter::new_with_algorithm(start, cfg, LimiterAlgorithm::ThroughputHillClimb)
1604 }
1605
1606 fn observe_hill_success_epoch_with_latency(
1607 limiter: &Limiter,
1608 cfg: &LimiterConfig,
1609 bytes: u64,
1610 latency: Duration,
1611 ) {
1612 let samples = hill_epoch_target_samples(limiter.current(), cfg);
1613 for _ in 0..samples {
1614 limiter.observe_with_bytes(Outcome::Success, latency, bytes);
1615 }
1616 }
1617
1618 fn observe_hill_success_epoch(limiter: &Limiter, cfg: &LimiterConfig, bytes: u64) {
1619 observe_hill_success_epoch_with_latency(
1620 limiter,
1621 cfg,
1622 bytes,
1623 Duration::from_millis(HILL_TEST_BASE_LATENCY_MS),
1624 );
1625 }
1626
1627 fn adaptive_cfg_for_tests() -> AdaptiveConfig {
1632 let l = cfg_for_tests();
1633 AdaptiveConfig {
1634 enabled: l.enabled,
1635 min_concurrency: l.min_concurrency,
1636 max: ChannelMax {
1637 quote: l.max_concurrency,
1638 store: l.max_concurrency,
1639 fetch: l.max_concurrency,
1640 },
1641 window_ops: l.window_ops,
1642 min_window_ops: l.min_window_ops,
1643 success_target: l.success_target,
1644 timeout_ceiling: l.timeout_ceiling,
1645 latency_inflation_factor: l.latency_inflation_factor,
1646 latency_ewma_alpha: l.latency_ewma_alpha,
1647 }
1648 }
1649
1650 #[test]
1651 fn warm_start_keeps_slow_start_armed_below_protected_threshold() {
1652 let cfg = LimiterConfig {
1661 max_concurrency: 256,
1662 slow_start_ramp_threshold: 256,
1663 latency_decrease_enabled: false,
1664 ..cfg_for_tests()
1665 };
1666 let l = Limiter::new(64, cfg.clone());
1667 l.warm_start(20);
1668 assert_eq!(l.current(), 20);
1669 for _ in 0..cfg.window_ops {
1672 l.observe(Outcome::Success, Duration::from_millis(10));
1673 }
1674 assert_eq!(
1675 l.current(),
1676 40,
1677 "protected channel must double after warm_start, not crawl +1",
1678 );
1679
1680 let default_cfg = LimiterConfig {
1683 max_concurrency: 256,
1684 ..cfg_for_tests()
1685 };
1686 let d = Limiter::new(64, default_cfg.clone());
1687 d.warm_start(20);
1688 for _ in 0..default_cfg.window_ops {
1689 d.observe(Outcome::Success, Duration::from_millis(10));
1690 }
1691 assert_eq!(
1692 d.current(),
1693 21,
1694 "default channel must stay additive after warm_start",
1695 );
1696 }
1697
1698 #[test]
1699 fn slow_start_stays_armed_at_ceiling_with_max_threshold() {
1700 let base = LimiterConfig {
1709 max_concurrency: 256,
1710 latency_decrease_enabled: false,
1711 ..cfg_for_tests()
1712 };
1713 let fixed = Limiter::new(
1714 256,
1715 LimiterConfig {
1716 slow_start_ramp_threshold: usize::MAX,
1717 ..base.clone()
1718 },
1719 );
1720 let buggy = Limiter::new(
1721 256,
1722 LimiterConfig {
1723 slow_start_ramp_threshold: 256,
1724 ..base.clone()
1725 },
1726 );
1727 for l in [&fixed, &buggy] {
1728 for _ in 0..base.window_ops {
1729 l.observe(Outcome::Timeout, Duration::from_millis(10));
1730 }
1731 for _ in 0..(base.window_ops * 10) {
1732 l.observe(Outcome::Success, Duration::from_millis(10));
1733 }
1734 }
1735 assert!(
1736 fixed.current() > buggy.current(),
1737 "MAX-threshold limiter ({}) must out-recover the ceiling-threshold one ({})",
1738 fixed.current(),
1739 buggy.current(),
1740 );
1741 }
1742
1743 #[test]
1744 fn protected_slow_start_recovers_faster_than_additive() {
1745 let base = LimiterConfig {
1750 max_concurrency: 256,
1751 latency_decrease_enabled: false,
1752 ..cfg_for_tests()
1753 };
1754 let protected = Limiter::new(
1755 64,
1756 LimiterConfig {
1757 slow_start_ramp_threshold: 256,
1758 ..base.clone()
1759 },
1760 );
1761 let unprotected = Limiter::new(
1762 64,
1763 LimiterConfig {
1764 slow_start_ramp_threshold: 0,
1765 ..base.clone()
1766 },
1767 );
1768
1769 for l in [&protected, &unprotected] {
1771 for _ in 0..base.window_ops {
1772 l.observe(Outcome::Timeout, Duration::from_millis(10));
1773 }
1774 }
1775 for l in [&protected, &unprotected] {
1779 for _ in 0..(base.window_ops * 10) {
1780 l.observe(Outcome::Success, Duration::from_millis(10));
1781 }
1782 }
1783 assert!(
1784 protected.current() > unprotected.current(),
1785 "protected slow-start ({}) should recover faster than additive ({})",
1786 protected.current(),
1787 unprotected.current(),
1788 );
1789 }
1790
1791 #[test]
1792 fn latency_decrease_disabled_ignores_p95_inflation() {
1793 let cfg = LimiterConfig {
1799 max_concurrency: 256,
1800 slow_start_ramp_threshold: 256,
1801 latency_decrease_enabled: false,
1802 ..cfg_for_tests()
1803 };
1804 let l = Limiter::new(16, cfg.clone());
1805 for _ in 0..cfg.window_ops {
1807 l.observe(Outcome::Success, Duration::from_millis(5));
1808 }
1809 let after_baseline = l.current();
1810 for _ in 0..cfg.window_ops {
1814 l.observe(Outcome::Success, Duration::from_millis(500));
1815 }
1816 assert!(
1817 l.current() >= after_baseline,
1818 "latency inflation must not shrink the cap when the check is disabled: {} < {}",
1819 l.current(),
1820 after_baseline,
1821 );
1822 }
1823
1824 #[test]
1825 fn controller_sets_fetch_channel_download_tuning() {
1826 let c = AdaptiveController::new(ChannelStart::default(), AdaptiveConfig::default());
1829 assert!(
1830 !c.fetch.config.latency_decrease_enabled,
1831 "fetch latency-decrease must be disabled",
1832 );
1833 assert_eq!(
1834 c.fetch.config.slow_start_ramp_threshold,
1835 usize::MAX,
1836 "fetch slow-start must never exit (armed at every cap incl. ceiling)",
1837 );
1838 assert!(
1839 c.quote.config.latency_decrease_enabled,
1840 "quote must keep the latency-decrease check",
1841 );
1842 assert_eq!(
1843 c.quote.config.slow_start_ramp_threshold, 0,
1844 "quote must keep classic AIMD slow-start exit",
1845 );
1846 assert!(c.store.config.latency_decrease_enabled);
1847 assert_eq!(c.store.config.slow_start_ramp_threshold, 0);
1848 }
1849
1850 #[test]
1851 fn cold_start_clamps_into_bounds() {
1852 let cfg = cfg_for_tests();
1853 let l = Limiter::new(1000, cfg.clone());
1854 assert_eq!(l.current(), cfg.max_concurrency);
1855 let l = Limiter::new(0, cfg.clone());
1856 assert_eq!(l.current(), cfg.min_concurrency);
1857 }
1858
1859 #[test]
1860 fn slow_start_doubles_then_caps() {
1861 let cfg = cfg_for_tests();
1862 let l = Limiter::new(2, cfg.clone());
1863 for _ in 0..cfg.window_ops {
1865 l.observe(Outcome::Success, Duration::from_millis(50));
1866 }
1867 assert_eq!(l.current(), 4);
1868 for _ in 0..cfg.window_ops {
1869 l.observe(Outcome::Success, Duration::from_millis(50));
1870 }
1871 assert_eq!(l.current(), 8);
1872 }
1873
1874 #[test]
1875 fn first_failure_exits_slow_start() {
1876 let cfg = cfg_for_tests();
1877 let l = Limiter::new(4, cfg.clone());
1878 for _ in 0..6 {
1882 l.observe(Outcome::Success, Duration::from_millis(50));
1883 }
1884 for _ in 0..4 {
1885 l.observe(Outcome::Timeout, Duration::from_millis(50));
1886 }
1887 let after_stress = l.current();
1888 assert!(
1889 after_stress < 4,
1890 "stress should reduce concurrency from 4, got {after_stress}",
1891 );
1892 for _ in 0..(cfg.window_ops * 5) {
1900 l.observe(Outcome::Success, Duration::from_millis(50));
1901 }
1902 assert!(
1903 l.current() > after_stress,
1904 "expected recovery above {after_stress}, got {}",
1905 l.current(),
1906 );
1907 }
1908
1909 #[test]
1910 fn floor_holds_at_one() {
1911 let cfg = cfg_for_tests();
1912 let l = Limiter::new(2, cfg);
1913 for _ in 0..30 {
1914 l.observe(Outcome::Timeout, Duration::from_millis(50));
1915 }
1916 assert_eq!(l.current(), 1);
1917 }
1918
1919 #[test]
1920 fn application_errors_do_not_punish() {
1921 let cfg = cfg_for_tests();
1922 let l = Limiter::new(4, cfg.clone());
1923 for _ in 0..cfg.window_ops * 5 {
1930 l.observe(Outcome::ApplicationError, Duration::from_millis(50));
1931 }
1932 assert_eq!(
1933 l.current(),
1934 4,
1935 "ApplicationError must not move the cap; got {}",
1936 l.current()
1937 );
1938 }
1939
1940 #[test]
1941 fn latency_inflation_triggers_decrease() {
1942 let cfg = LimiterConfig {
1943 window_ops: 20,
1944 min_window_ops: 5,
1945 ..cfg_for_tests()
1946 };
1947 let l = Limiter::new(4, cfg.clone());
1948 for _ in 0..cfg.window_ops {
1950 l.observe(Outcome::Success, Duration::from_millis(50));
1951 }
1952 let after_baseline = l.current();
1953 for _ in 0..cfg.window_ops {
1955 l.observe(Outcome::Success, Duration::from_millis(500));
1956 }
1957 assert!(
1959 l.current() < after_baseline,
1960 "expected decrease from {after_baseline}, got {}",
1961 l.current(),
1962 );
1963 }
1964
1965 #[test]
1966 fn warm_start_overrides_current() {
1967 let cfg = cfg_for_tests();
1968 let l = Limiter::new(2, cfg);
1969 l.warm_start(20);
1970 assert_eq!(l.current(), 20);
1971 }
1972
1973 #[test]
1974 fn warm_start_clamps() {
1975 let cfg = cfg_for_tests();
1976 let l = Limiter::new(2, cfg.clone());
1977 l.warm_start(1_000_000);
1978 assert_eq!(l.current(), cfg.max_concurrency);
1979 }
1980
1981 #[test]
1982 fn disabled_controller_holds_steady() {
1983 let cfg = LimiterConfig {
1984 enabled: false,
1985 ..cfg_for_tests()
1986 };
1987 let l = Limiter::new(8, cfg);
1988 for _ in 0..50 {
1989 l.observe(Outcome::Timeout, Duration::from_millis(50));
1990 }
1991 assert_eq!(l.current(), 8);
1992 }
1993
1994 #[test]
1995 fn controller_snapshot_round_trips() {
1996 let c = AdaptiveController::new(
2002 ChannelStart {
2003 quote: 64,
2004 store: 16,
2005 fetch: 64,
2006 },
2007 adaptive_cfg_for_tests(),
2008 );
2009 let snap = c.snapshot();
2010 assert_eq!(snap.quote, 64);
2011 assert_eq!(snap.store, 16);
2012 assert_eq!(snap.fetch, 64);
2013
2014 let c2 = AdaptiveController::default();
2015 c2.warm_start(snap);
2016 assert_eq!(c2.quote.current(), 64);
2017 assert_eq!(c2.store.current(), 16);
2018 assert_eq!(c2.fetch.current(), 64);
2019 }
2020
2021 #[tokio::test]
2022 async fn observe_op_records_success() {
2023 let cfg = cfg_for_tests();
2024 let l = Limiter::new(4, cfg.clone());
2025 for _ in 0..cfg.window_ops {
2026 let _: Result<(), &str> =
2027 observe_op(&l, || async { Ok(()) }, |_e: &&str| Outcome::NetworkError).await;
2028 }
2029 assert_eq!(l.current(), 8);
2031 }
2032
2033 #[test]
2034 fn snapshot_round_trips_through_disk() {
2035 let dir = tempfile::tempdir().unwrap();
2036 let path = dir.path().join("client_adaptive.json");
2037 let snap = ChannelStart {
2038 quote: 24,
2039 store: 6,
2040 fetch: 12,
2041 };
2042 save_snapshot(&path, snap);
2043 let loaded = load_snapshot(&path).unwrap();
2044 assert_eq!(loaded.quote, 24);
2045 assert_eq!(loaded.store, 6);
2046 assert_eq!(loaded.fetch, 12);
2047 }
2048
2049 #[test]
2050 fn load_missing_returns_none() {
2051 let dir = tempfile::tempdir().unwrap();
2052 let path = dir.path().join("does_not_exist.json");
2053 assert!(load_snapshot(&path).is_none());
2054 }
2055
2056 #[test]
2057 fn load_corrupt_returns_none() {
2058 let dir = tempfile::tempdir().unwrap();
2059 let path = dir.path().join("bad.json");
2060 std::fs::write(&path, b"not valid json{{{").unwrap();
2061 assert!(load_snapshot(&path).is_none());
2062 }
2063
2064 #[test]
2065 fn load_wrong_schema_returns_none() {
2066 let dir = tempfile::tempdir().unwrap();
2067 let path = dir.path().join("future.json");
2068 let payload = r#"{"schema":999,"channels":{"quote":1,"store":1,"fetch":1}}"#;
2071 std::fs::write(&path, payload).unwrap();
2072 assert!(load_snapshot(&path).is_none());
2073 }
2074
2075 #[test]
2076 fn load_schema_one_preserves_quote_store_and_resets_fetch() {
2077 const LEGACY_QUOTE_CAP: usize = 48;
2078 const LEGACY_STORE_CAP: usize = 24;
2079 const LEGACY_FETCH_CAP: usize = 96;
2080
2081 let dir = tempfile::tempdir().unwrap();
2082 let path = dir.path().join("legacy.json");
2083 let payload = format!(
2084 r#"{{"schema":{},"channels":{{"quote":{},"store":{},"fetch":{}}}}}"#,
2085 PERSIST_SCHEMA_AIMD_FETCH, LEGACY_QUOTE_CAP, LEGACY_STORE_CAP, LEGACY_FETCH_CAP,
2086 );
2087 std::fs::write(&path, payload).unwrap();
2088
2089 let loaded = load_snapshot(&path).unwrap();
2090
2091 assert_eq!(loaded.quote, LEGACY_QUOTE_CAP);
2092 assert_eq!(loaded.store, LEGACY_STORE_CAP);
2093 assert_eq!(loaded.fetch, FETCH_COLD_START_CONCURRENCY);
2094 }
2095
2096 #[tokio::test]
2097 async fn observe_op_records_classified_error() {
2098 let cfg = cfg_for_tests();
2099 let l = Limiter::new(4, cfg.clone());
2100 for _ in 0..cfg.window_ops {
2101 let _: Result<(), &str> =
2102 observe_op(&l, || async { Err("boom") }, |_e: &&str| Outcome::Timeout).await;
2103 }
2104 assert!(l.current() < 4);
2105 }
2106
2107 #[test]
2117 fn no_regression_cold_start_at_least_static_defaults() {
2118 let s = ChannelStart::default();
2119 assert!(
2120 s.quote >= 32,
2121 "quote cold-start regressed: got {}, prior static was 32",
2122 s.quote,
2123 );
2124 assert!(
2125 s.store >= 8,
2126 "store cold-start regressed: got {}, prior static was 8",
2127 s.store,
2128 );
2129 assert_eq!(
2130 s.fetch, FETCH_COLD_START_CONCURRENCY,
2131 "fetch cold-start changed unexpectedly: got {}, expected {}",
2132 s.fetch, FETCH_COLD_START_CONCURRENCY,
2133 );
2134 }
2135
2136 #[test]
2140 fn controller_default_config_is_sane() {
2141 let c = AdaptiveController::default();
2142 let starts = ChannelStart::default();
2143 assert_eq!(c.quote.current(), starts.quote);
2144 assert_eq!(c.store.current(), starts.store);
2145 assert_eq!(c.fetch.current(), starts.fetch);
2146 assert_eq!(lock(&c.quote.inner).window.len(), 0);
2148 assert_eq!(lock(&c.store.inner).window.len(), 0);
2149 assert_eq!(lock(&c.fetch.inner).window.len(), 0);
2150 }
2151
2152 #[test]
2156 fn alternating_success_failure_collapses_to_floor() {
2157 let cfg = cfg_for_tests();
2163 let l = Limiter::new(8, cfg.clone());
2164 let mut min_observed = usize::MAX;
2165 let mut max_observed = 0usize;
2166 let mut floor_visits = 0usize;
2167 for i in 0..1000 {
2168 let outcome = if i % 2 == 0 {
2169 Outcome::Success
2170 } else {
2171 Outcome::Timeout
2172 };
2173 l.observe(outcome, Duration::from_millis(50));
2174 let cur = l.current();
2175 assert!(
2176 cur >= cfg.min_concurrency,
2177 "cap underflowed floor at iter {i}: got {cur}",
2178 );
2179 min_observed = min_observed.min(cur);
2180 max_observed = max_observed.max(cur);
2181 if cur == cfg.min_concurrency {
2182 floor_visits += 1;
2183 }
2184 }
2185 assert_eq!(
2186 min_observed, cfg.min_concurrency,
2187 "cap never reached the floor under 50% timeout rate"
2188 );
2189 assert!(
2190 max_observed >= 8,
2191 "cap never visited the start value: max_observed={max_observed}"
2192 );
2193 assert!(
2197 floor_visits > 500,
2198 "cap spent only {floor_visits}/1000 ticks at floor; expected mostly at floor"
2199 );
2200 assert_eq!(
2201 l.current(),
2202 cfg.min_concurrency,
2203 "controller did not settle at floor after 1000 alternations"
2204 );
2205 }
2206
2207 #[test]
2211 fn pure_success_stream_recovers_to_max() {
2212 let cfg = cfg_for_tests();
2213 let l = Limiter::new(cfg.min_concurrency, cfg.clone());
2214 for _ in 0..10_000 {
2215 l.observe(Outcome::Success, Duration::from_millis(5));
2216 }
2217 assert_eq!(
2218 l.current(),
2219 cfg.max_concurrency,
2220 "expected recovery to max ({}), got {}",
2221 cfg.max_concurrency,
2222 l.current(),
2223 );
2224 }
2225
2226 #[test]
2230 fn stress_then_heal_drives_floor_then_recovery() {
2231 let cfg = cfg_for_tests();
2232 let l = Limiter::new(8, cfg.clone());
2233 for _ in 0..100 {
2234 l.observe(Outcome::Timeout, Duration::from_millis(50));
2235 }
2236 let after_stress = l.current();
2237 assert_eq!(
2238 after_stress, cfg.min_concurrency,
2239 "stress should drive cap to floor, got {after_stress}",
2240 );
2241 for _ in 0..1_000 {
2242 l.observe(Outcome::Success, Duration::from_millis(10));
2243 }
2244 let after_heal = l.current();
2245 assert!(
2246 after_heal >= cfg.min_concurrency.saturating_add(4),
2247 "expected substantial recovery from floor, got {after_heal}",
2248 );
2249 }
2250
2251 #[test]
2255 fn baseline_does_not_grow_unbounded_under_slow_links() {
2256 let cfg = cfg_for_tests();
2257 let l = Limiter::new(2, cfg.clone());
2258 for _ in 0..(cfg.window_ops * 10) {
2259 l.observe(Outcome::Success, Duration::from_millis(500));
2260 }
2261 let baseline = lock(&l.inner).latency_baseline;
2262 let base = baseline.expect("baseline should be set after many healthy windows");
2263 assert!(
2264 base > Duration::ZERO,
2265 "baseline must not stay at ZERO, got {base:?}",
2266 );
2267 let lo = Duration::from_millis(250);
2269 let hi = Duration::from_millis(1000);
2270 assert!(
2271 base >= lo && base <= hi,
2272 "baseline drifted out of [{lo:?}, {hi:?}]: {base:?}",
2273 );
2274 }
2275
2276 #[test]
2281 fn baseline_initialized_only_after_first_healthy_window() {
2282 let cfg = cfg_for_tests();
2283 let l = Limiter::new(8, cfg.clone());
2284 for _ in 0..50 {
2285 l.observe(Outcome::Timeout, Duration::from_millis(50));
2286 }
2287 assert!(
2289 lock(&l.inner).latency_baseline.is_none(),
2290 "baseline must be None before any healthy window",
2291 );
2292 for _ in 0..(cfg.window_ops * 5) {
2294 l.observe(Outcome::Success, Duration::from_millis(20));
2295 }
2296 let baseline = lock(&l.inner).latency_baseline;
2297 assert!(
2298 baseline.is_some(),
2299 "baseline must be Some after healthy windows",
2300 );
2301 let base = baseline.unwrap_or_default();
2302 assert!(
2303 base > Duration::ZERO,
2304 "baseline must reflect real latency, got {base:?}",
2305 );
2306 }
2307
2308 #[test]
2311 fn min_concurrency_floor_holds_under_torrent_of_errors() {
2312 let cfg = cfg_for_tests();
2313 let l = Limiter::new(8, cfg.clone());
2314 for i in 0..50_000 {
2315 l.observe(Outcome::Timeout, Duration::from_millis(50));
2316 if i == 100 || i == 1_000 || i == 49_999 {
2317 let cur = l.current();
2318 assert_eq!(
2319 cur, cfg.min_concurrency,
2320 "floor breached at iter {i}: got {cur}",
2321 );
2322 }
2323 }
2324 }
2325
2326 #[test]
2328 fn max_concurrency_ceiling_holds_under_torrent_of_successes() {
2329 let cfg = cfg_for_tests();
2330 let start = cfg
2331 .max_concurrency
2332 .saturating_sub(1)
2333 .max(cfg.min_concurrency);
2334 let l = Limiter::new(start, cfg.clone());
2335 for i in 0..50_000 {
2336 l.observe(Outcome::Success, Duration::from_millis(5));
2337 if i == 100 || i == 1_000 || i == 49_999 {
2338 let cur = l.current();
2339 assert!(
2340 cur <= cfg.max_concurrency,
2341 "ceiling breached at iter {i}: got {cur} > {}",
2342 cfg.max_concurrency,
2343 );
2344 }
2345 }
2346 assert_eq!(l.current(), cfg.max_concurrency);
2347 }
2348
2349 #[test]
2355 fn saturating_arithmetic_handles_extreme_config() {
2356 let cfg = LimiterConfig {
2357 max_concurrency: usize::MAX / 2,
2358 ..cfg_for_tests()
2359 };
2360 let start = usize::MAX / 4;
2361 let l = Limiter::new(start, cfg.clone());
2362 for _ in 0..(cfg.window_ops * 10) {
2363 l.observe(Outcome::Success, Duration::from_millis(1));
2364 }
2365 assert_eq!(
2370 l.current(),
2371 cfg.max_concurrency,
2372 "saturating math survived but cap did not grow to ceiling"
2373 );
2374 }
2375
2376 #[test]
2383 fn window_eviction_is_fifo() {
2384 let cfg = LimiterConfig {
2385 window_ops: 10,
2386 min_window_ops: 5,
2387 success_target: 0.9,
2388 timeout_ceiling: 0.1,
2389 ..cfg_for_tests()
2390 };
2391 let l = Limiter::new(8, cfg.clone());
2392 for _ in 0..cfg.window_ops {
2397 l.observe(Outcome::Timeout, Duration::from_millis(50));
2398 }
2399 let after_stress = l.current();
2400 assert!(
2401 after_stress < 8,
2402 "expected cap to drop from 8 after pure-timeout window, got {after_stress}"
2403 );
2404 for _ in 0..(cfg.window_ops * 3) {
2409 l.observe(Outcome::Success, Duration::from_millis(20));
2410 }
2411 let after_recovery = l.current();
2412 assert!(
2415 after_recovery > after_stress,
2416 "FIFO eviction broken: cap stayed at {after_stress} after recovery successes (expected > {after_stress}, got {after_recovery})"
2417 );
2418 }
2419
2420 #[test]
2423 fn disabled_controller_returns_initial_value_invariantly() {
2424 let cfg = LimiterConfig {
2425 enabled: false,
2426 ..cfg_for_tests()
2427 };
2428 let initial = 8;
2429 let l = Limiter::new(initial, cfg);
2430 for i in 0..1_000 {
2431 let outcome = match i % 4 {
2432 0 => Outcome::Success,
2433 1 => Outcome::Timeout,
2434 2 => Outcome::NetworkError,
2435 _ => Outcome::ApplicationError,
2436 };
2437 l.observe(outcome, Duration::from_millis(50));
2438 assert_eq!(
2439 l.current(),
2440 initial,
2441 "disabled controller moved at iter {i}",
2442 );
2443 }
2444 }
2445
2446 #[tokio::test(flavor = "multi_thread", worker_threads = 4)]
2449 async fn concurrent_observations_do_not_corrupt_window() {
2450 let cfg = cfg_for_tests();
2451 let l = Limiter::new(4, cfg.clone());
2452 let mut handles = Vec::with_capacity(100);
2453 for _ in 0..100 {
2454 let l_clone = l.clone();
2455 handles.push(tokio::spawn(async move {
2456 for _ in 0..100 {
2457 l_clone.observe(Outcome::Success, Duration::from_millis(5));
2458 }
2459 }));
2460 }
2461 for h in handles {
2462 h.await.unwrap();
2463 }
2464 let cur = l.current();
2465 assert!(
2466 cur >= cfg.min_concurrency && cur <= cfg.max_concurrency,
2467 "cap out of bounds after concurrent observations: {cur}",
2468 );
2469 }
2470
2471 #[test]
2476 fn persisted_snapshot_warm_starts_above_cold_floor() {
2477 let dir = tempfile::tempdir().unwrap();
2478 let path = dir.path().join("client_adaptive.json");
2479 let saved = ChannelStart {
2482 quote: 64,
2483 store: 32,
2484 fetch: 128,
2485 };
2486 save_snapshot(&path, saved);
2487 let loaded = load_snapshot(&path).unwrap();
2488
2489 let low = ChannelStart {
2492 quote: 2,
2493 store: 2,
2494 fetch: 2,
2495 };
2496 let c = AdaptiveController::new(low, AdaptiveConfig::default());
2497 c.warm_start(loaded);
2498 assert_eq!(c.quote.current(), 64);
2499 assert_eq!(c.store.current(), 32);
2500 assert_eq!(c.fetch.current(), 128);
2501 }
2502
2503 #[test]
2507 fn save_load_round_trip_with_concurrent_writes() {
2508 use std::thread;
2509 let dir = tempfile::tempdir().unwrap();
2510 let path = dir.path().join("client_adaptive.json");
2511 let path_a = path.clone();
2512 let path_b = path.clone();
2513 let snap_a = ChannelStart {
2514 quote: 10,
2515 store: 10,
2516 fetch: 10,
2517 };
2518 let snap_b = ChannelStart {
2519 quote: 99,
2520 store: 99,
2521 fetch: 99,
2522 };
2523 let h_a = thread::spawn(move || {
2524 for _ in 0..50 {
2525 save_snapshot(&path_a, snap_a);
2526 }
2527 });
2528 let h_b = thread::spawn(move || {
2529 for _ in 0..50 {
2530 save_snapshot(&path_b, snap_b);
2531 }
2532 });
2533 h_a.join().unwrap();
2534 h_b.join().unwrap();
2535 let loaded = load_snapshot(&path).expect("file must be a valid snapshot, not torn");
2536 let valid = (loaded.quote == snap_a.quote
2537 && loaded.store == snap_a.store
2538 && loaded.fetch == snap_a.fetch)
2539 || (loaded.quote == snap_b.quote
2540 && loaded.store == snap_b.store
2541 && loaded.fetch == snap_b.fetch);
2542 assert!(valid, "loaded snapshot is neither A nor B: {loaded:?}",);
2543 }
2544
2545 #[test]
2548 fn save_snapshot_to_unwritable_dir_does_not_panic() {
2549 let path = PathBuf::from("/nonexistent_root_dir_xyz_for_test/sub/dir/client_adaptive.json");
2553 let snap = ChannelStart {
2554 quote: 1,
2555 store: 1,
2556 fetch: 1,
2557 };
2558 save_snapshot(&path, snap);
2560 assert!(!path.exists());
2562 }
2563
2564 #[test]
2567 fn load_snapshot_from_truncated_file_returns_none() {
2568 let dir = tempfile::tempdir().unwrap();
2569 let path = dir.path().join("truncated.json");
2570 std::fs::write(&path, br#"{"schema":1,"channels":{"quote":"#).unwrap();
2571 assert!(load_snapshot(&path).is_none());
2572 }
2573
2574 #[test]
2578 fn controller_perf_overhead_is_bounded() {
2579 let cfg = cfg_for_tests();
2580 let l = Limiter::new(8, cfg);
2581 let started = Instant::now();
2582 for _ in 0..100_000 {
2583 let _ = l.current();
2584 l.observe(Outcome::Success, Duration::from_micros(1));
2585 }
2586 let elapsed = started.elapsed();
2587 assert!(
2590 elapsed < Duration::from_millis(500),
2591 "100k observe+current pairs took {elapsed:?}, expected <500ms",
2592 );
2593 }
2594
2595 #[test]
2603 fn nan_and_out_of_range_config_does_not_panic() {
2604 let cfg = AdaptiveConfig {
2605 enabled: true,
2606 min_concurrency: 0, max: ChannelMax {
2608 quote: 0, store: 0,
2610 fetch: 0,
2611 },
2612 window_ops: 10,
2613 min_window_ops: 50, success_target: f64::NAN,
2615 timeout_ceiling: f64::INFINITY,
2616 latency_inflation_factor: f64::NEG_INFINITY,
2617 latency_ewma_alpha: f64::NAN,
2618 };
2619 let c = AdaptiveController::new(ChannelStart::default(), cfg);
2620 let post = &c.config;
2624 assert_eq!(
2625 post.min_concurrency, 1,
2626 "sanitize did not raise min_concurrency from 0"
2627 );
2628 assert!(
2629 post.success_target.is_finite() && (0.0..=1.0).contains(&post.success_target),
2630 "sanitize did not clamp success_target from NaN: {}",
2631 post.success_target
2632 );
2633 assert!(
2634 post.timeout_ceiling.is_finite() && (0.0..=1.0).contains(&post.timeout_ceiling),
2635 "sanitize did not clamp timeout_ceiling from Inf: {}",
2636 post.timeout_ceiling
2637 );
2638 assert!(
2639 post.latency_inflation_factor.is_finite() && post.latency_inflation_factor > 0.0,
2640 "sanitize did not fix latency_inflation_factor from -Inf: {}",
2641 post.latency_inflation_factor
2642 );
2643 assert!(
2644 post.latency_ewma_alpha.is_finite() && (0.0..=1.0).contains(&post.latency_ewma_alpha),
2645 "sanitize did not fix latency_ewma_alpha from NaN: {}",
2646 post.latency_ewma_alpha
2647 );
2648 assert!(
2649 post.min_window_ops <= post.window_ops,
2650 "sanitize did not clamp min_window_ops <= window_ops: min={} window={}",
2651 post.min_window_ops,
2652 post.window_ops
2653 );
2654 assert!(
2655 post.max.quote >= post.min_concurrency,
2656 "max.quote below min_concurrency"
2657 );
2658 for _ in 0..200 {
2661 c.store
2662 .observe(Outcome::Success, Duration::from_secs(99_999));
2663 c.store.observe(Outcome::Timeout, Duration::ZERO);
2664 }
2665 let cur = c.store.current();
2666 assert!(cur >= 1, "cap below floor: {cur}");
2667 }
2668
2669 #[test]
2676 fn transient_burst_does_not_pile_drive_to_floor() {
2677 let cfg = LimiterConfig {
2678 window_ops: 32,
2679 min_window_ops: 8,
2680 success_target: 0.95,
2681 timeout_ceiling: 0.10,
2682 ..cfg_for_tests()
2683 };
2684 let l = Limiter::new(32, cfg);
2685 for _ in 0..8 {
2689 l.observe(Outcome::Timeout, Duration::from_millis(10));
2690 }
2691 let after_burst = l.current();
2694 assert!(
2695 after_burst >= 16,
2696 "transient burst pile-drove cap from 32 to {after_burst}; expected >= 16",
2697 );
2698 }
2699
2700 #[tokio::test]
2705 async fn transport_errors_classify_as_capacity_signal() {
2706 use crate::data::client::classify_error;
2707 use crate::data::error::Error;
2708 let make_cfg = || LimiterConfig {
2709 window_ops: 16,
2710 min_window_ops: 5,
2711 success_target: 0.5,
2712 timeout_ceiling: 0.5,
2713 ..cfg_for_tests()
2714 };
2715 type ErrFactory = Box<dyn Fn() -> Error>;
2717 let cases: Vec<(&str, ErrFactory)> = vec![
2718 ("Network", Box::new(|| Error::Network("net".to_string()))),
2719 (
2720 "InsufficientPeers",
2721 Box::new(|| Error::InsufficientPeers("ip".to_string())),
2722 ),
2723 ("Io", Box::new(|| Error::Io(std::io::Error::other("io")))),
2724 ("Protocol", Box::new(|| Error::Protocol("p".to_string()))),
2725 ("Storage", Box::new(|| Error::Storage("s".to_string()))),
2726 (
2727 "PartialUpload",
2728 Box::new(|| Error::PartialUpload {
2729 stored: vec![],
2730 stored_count: 0,
2731 failed: vec![],
2732 failed_count: 0,
2733 total_chunks: 0,
2734 reason: "r".to_string(),
2735 }),
2736 ),
2737 ];
2738 for (name, mk) in &cases {
2739 let l = Limiter::new(8, make_cfg());
2740 for _ in 0..16 {
2741 let _: std::result::Result<(), Error> =
2742 observe_op(&l, || async { Err(mk()) }, classify_error).await;
2743 }
2744 let cur = l.current();
2748 assert!(
2749 cur < 8,
2750 "{name} not classified as capacity signal: cap stayed at {cur}",
2751 );
2752 }
2753 }
2754
2755 #[test]
2759 fn per_channel_ceilings_are_independent() {
2760 let cfg = AdaptiveConfig {
2761 max: ChannelMax {
2762 quote: 4, store: 8, fetch: 1024, },
2766 ..AdaptiveConfig::default()
2767 };
2768 let c = AdaptiveController::new(
2769 ChannelStart {
2770 quote: 4,
2771 store: 8,
2772 fetch: 64,
2773 },
2774 cfg,
2775 );
2776 for _ in 0..1000 {
2779 c.quote.observe(Outcome::Success, Duration::from_micros(10));
2780 c.store.observe(Outcome::Success, Duration::from_micros(10));
2781 c.fetch.observe(Outcome::Success, Duration::from_micros(10));
2782 }
2783 assert_eq!(c.quote.current(), 4, "quote should cap at 4");
2784 assert_eq!(c.store.current(), 8, "store should cap at 8");
2785 assert!(
2789 c.fetch.current() > 8 && c.fetch.current() <= 1024,
2790 "fetch did not use its independent ceiling; got {}",
2791 c.fetch.current()
2792 );
2793 }
2794
2795 #[test]
2796 fn fetch_hill_rejects_upward_probe_without_goodput_gain() {
2797 let cfg = hill_cfg_for_tests();
2798 let l = fetch_hill_for_tests(HILL_TEST_START_CAP, cfg.clone());
2799
2800 observe_hill_success_epoch(&l, &cfg, HILL_TEST_CHUNK_BYTES);
2801 assert_eq!(
2802 l.current(),
2803 HILL_TEST_UP_PROBE_CAP,
2804 "first healthy epoch should probe upward"
2805 );
2806
2807 observe_hill_success_epoch_with_latency(
2808 &l,
2809 &cfg,
2810 HILL_TEST_CHUNK_BYTES,
2811 Duration::from_millis(HILL_TEST_REJECT_LATENCY_MS),
2812 );
2813 assert_eq!(
2814 l.current(),
2815 HILL_TEST_START_CAP,
2816 "slower higher-cap wave should reject the upward probe"
2817 );
2818 assert_eq!(l.snapshot(), HILL_TEST_START_CAP);
2819 }
2820
2821 #[test]
2822 fn fetch_hill_accepts_upward_probe_with_goodput_gain() {
2823 let cfg = hill_cfg_for_tests();
2824 let l = fetch_hill_for_tests(HILL_TEST_START_CAP, cfg.clone());
2825
2826 observe_hill_success_epoch(&l, &cfg, HILL_TEST_CHUNK_BYTES);
2827 assert_eq!(l.current(), HILL_TEST_UP_PROBE_CAP);
2828
2829 observe_hill_success_epoch(&l, &cfg, HILL_TEST_CHUNK_BYTES);
2830 assert_eq!(
2831 l.snapshot(),
2832 HILL_TEST_UP_PROBE_CAP,
2833 "same-size chunks at same latency should promote the higher cap"
2834 );
2835 assert_eq!(
2836 l.current(),
2837 HILL_TEST_NEXT_UP_PROBE_CAP,
2838 "after accepting an upward probe, hill climber should probe higher"
2839 );
2840 }
2841
2842 #[test]
2843 fn fetch_hill_accepts_lower_probe_when_goodput_is_retained() {
2844 let cfg = hill_cfg_for_tests();
2845 let l = fetch_hill_for_tests(HILL_TEST_START_CAP, cfg.clone());
2846
2847 observe_hill_success_epoch(&l, &cfg, HILL_TEST_CHUNK_BYTES);
2848 observe_hill_success_epoch_with_latency(
2849 &l,
2850 &cfg,
2851 HILL_TEST_CHUNK_BYTES,
2852 Duration::from_millis(HILL_TEST_REJECT_LATENCY_MS),
2853 );
2854 assert_eq!(l.current(), HILL_TEST_START_CAP);
2855
2856 for _ in 0..(HILL_REJECT_COOLDOWN_EPOCHS + HILL_STABLE_PROBE_EPOCHS) {
2857 observe_hill_success_epoch(&l, &cfg, HILL_TEST_CHUNK_BYTES);
2858 }
2859 assert_eq!(
2860 l.current(),
2861 HILL_TEST_DOWN_PROBE_CAP,
2862 "stable best should eventually probe a lower cap"
2863 );
2864
2865 observe_hill_success_epoch_with_latency(
2866 &l,
2867 &cfg,
2868 HILL_TEST_CHUNK_BYTES,
2869 Duration::from_millis(HILL_TEST_RETAINED_DOWN_LATENCY_MS),
2870 );
2871 assert_eq!(
2872 l.snapshot(),
2873 HILL_TEST_DOWN_PROBE_CAP,
2874 "retained goodput at lower concurrency should become the new best"
2875 );
2876 }
2877
2878 #[tokio::test]
2879 async fn fetch_hill_accepts_constant_size_upward_probe_from_timed_ops() {
2880 let cfg = hill_cfg_for_tests();
2881 let l = fetch_hill_for_tests(HILL_TEST_START_CAP, cfg.clone());
2882 let total_ops = hill_epoch_target_samples(HILL_TEST_START_CAP, &cfg)
2883 + hill_epoch_target_samples(HILL_TEST_UP_PROBE_CAP, &cfg);
2884 let limiter_for_ops = l.clone();
2885
2886 let result: std::result::Result<Vec<()>, ()> =
2887 rebucketed_unordered(&l, 0..total_ops, move |_| {
2888 let limiter = limiter_for_ops.clone();
2889 async move {
2890 observe_op_with_success_bytes(
2891 &limiter,
2892 || async {
2893 tokio::time::sleep(Duration::from_millis(HILL_TEST_ASYNC_LATENCY_MS))
2894 .await;
2895 Ok::<(), ()>(())
2896 },
2897 |_| Outcome::NetworkError,
2898 |_| HILL_TEST_CHUNK_BYTES,
2899 )
2900 .await
2901 }
2902 })
2903 .await;
2904 result.unwrap();
2905
2906 assert_eq!(
2907 l.snapshot(),
2908 HILL_TEST_UP_PROBE_CAP,
2909 "timed constant-size chunks should prove the higher cap improves goodput"
2910 );
2911 assert_eq!(
2912 l.current(),
2913 HILL_TEST_NEXT_UP_PROBE_CAP,
2914 "accepted upward probe should immediately test the next cap"
2915 );
2916 }
2917
2918 #[test]
2919 fn fetch_hill_stress_cuts_before_full_epoch() {
2920 let cfg = LimiterConfig {
2921 window_ops: 8,
2922 min_window_ops: 4,
2923 ..hill_cfg_for_tests()
2924 };
2925 let l = fetch_hill_for_tests(16, cfg.clone());
2926
2927 for _ in 0..cfg.min_window_ops {
2928 l.observe(Outcome::Timeout, Duration::from_millis(10));
2929 }
2930
2931 assert_eq!(
2932 l.current(),
2933 8,
2934 "fetch hill climber should halve on early stress"
2935 );
2936 }
2937
2938 #[test]
2942 fn cold_start_at_least_prior_static_defaults() {
2943 let cs = ChannelStart::default();
2944 assert!(cs.quote >= 32, "quote cold-start regressed: {}", cs.quote);
2945 assert!(cs.store >= 8, "store cold-start regressed: {}", cs.store);
2946 assert_eq!(
2947 cs.fetch, FETCH_COLD_START_CONCURRENCY,
2948 "fetch cold-start changed unexpectedly"
2949 );
2950 }
2951
2952 #[test]
2965 fn sustained_stress_reaches_floor_within_bounded_ops() {
2966 let cfg = LimiterConfig {
2967 window_ops: 32,
2968 min_window_ops: 8,
2969 success_target: 0.95,
2970 timeout_ceiling: 0.10,
2971 max_concurrency: 64,
2972 ..cfg_for_tests()
2973 };
2974 let l = Limiter::new(64, cfg);
2975 let mut ops = 0usize;
2976 while l.current() > 1 && ops < 200 {
2977 l.observe(Outcome::Timeout, Duration::from_millis(10));
2978 ops += 1;
2979 }
2980 assert_eq!(
2981 l.current(),
2982 1,
2983 "controller did not reach floor within 200 observations under \
2984 sustained timeout stress; took {ops} ops, ended at cap {}",
2985 l.current()
2986 );
2987 }
2988
2989 #[test]
2994 fn default_controller_has_growth_headroom() {
2995 let c = AdaptiveController::default();
2996 let cs = ChannelStart::default();
2997 let max = ChannelMax::default();
2998 assert_eq!(c.quote.current(), cs.quote);
2999 assert_eq!(c.store.current(), cs.store);
3000 assert_eq!(c.fetch.current(), cs.fetch);
3001 assert!(
3002 max.quote > cs.quote,
3003 "no growth headroom for quote: max={} start={}",
3004 max.quote,
3005 cs.quote
3006 );
3007 assert!(
3008 max.store > cs.store,
3009 "no growth headroom for store: max={} start={}",
3010 max.store,
3011 cs.store
3012 );
3013 assert!(
3014 max.fetch > cs.fetch,
3015 "no growth headroom for fetch: max={} start={}",
3016 max.fetch,
3017 cs.fetch
3018 );
3019 }
3020
3021 #[test]
3028 fn warm_start_floors_at_cold_defaults() {
3029 let c = AdaptiveController::default();
3030 let cold = ChannelStart::default();
3031 let bad_snap = ChannelStart {
3033 quote: 1,
3034 store: 1,
3035 fetch: 1,
3036 };
3037 c.warm_start(bad_snap);
3038 assert_eq!(
3041 c.quote.current(),
3042 cold.quote,
3043 "quote warm_start did not floor at cold default"
3044 );
3045 assert_eq!(
3046 c.store.current(),
3047 cold.store,
3048 "store warm_start did not floor at cold default"
3049 );
3050 assert_eq!(
3051 c.fetch.current(),
3052 cold.fetch,
3053 "fetch warm_start did not floor at cold default"
3054 );
3055 }
3056
3057 #[test]
3060 fn warm_start_honors_values_above_cold_floor() {
3061 let c = AdaptiveController::default();
3062 let cold = ChannelStart::default();
3063 let snap = ChannelStart {
3064 quote: cold.quote * 2,
3065 store: cold.store * 4,
3066 fetch: cold.fetch * 2,
3067 };
3068 c.warm_start(snap);
3069 assert_eq!(c.quote.current(), snap.quote);
3070 assert_eq!(c.store.current(), snap.store);
3071 assert_eq!(c.fetch.current(), snap.fetch);
3072 }
3073
3074 #[tokio::test]
3081 async fn rebucketed_picks_up_cap_changes_mid_stream() {
3082 use std::sync::atomic::{AtomicUsize, Ordering as AtomicOrdering};
3083 use std::sync::Arc as StdArc;
3084 let cfg = LimiterConfig {
3085 min_concurrency: 1,
3086 max_concurrency: 32,
3087 ..cfg_for_tests()
3088 };
3089 let l = Limiter::new(4, cfg);
3090 let max_seen = StdArc::new(AtomicUsize::new(0));
3091 let in_flight = StdArc::new(AtomicUsize::new(0));
3092 let processed = StdArc::new(AtomicUsize::new(0));
3093 let l_for_bump = l.clone();
3094 let processed_for_bump = processed.clone();
3095 let bump_handle = tokio::spawn(async move {
3098 loop {
3099 tokio::time::sleep(Duration::from_millis(2)).await;
3100 if processed_for_bump.load(AtomicOrdering::Relaxed) >= 16 {
3101 l_for_bump.warm_start(16);
3102 return;
3103 }
3104 }
3105 });
3106 let _: Vec<()> = rebucketed(&l, 0..200usize, false, |_i| {
3107 let max_seen = max_seen.clone();
3108 let in_flight = in_flight.clone();
3109 let processed = processed.clone();
3110 async move {
3111 let cur = in_flight.fetch_add(1, AtomicOrdering::Relaxed) + 1;
3112 max_seen.fetch_max(cur, AtomicOrdering::Relaxed);
3113 tokio::time::sleep(Duration::from_millis(1)).await;
3114 in_flight.fetch_sub(1, AtomicOrdering::Relaxed);
3115 processed.fetch_add(1, AtomicOrdering::Relaxed);
3116 Ok::<(), &'static str>(())
3117 }
3118 })
3119 .await
3120 .unwrap();
3121 bump_handle.await.unwrap();
3122 let peak = max_seen.load(AtomicOrdering::Relaxed);
3126 assert!(
3127 peak > 4,
3128 "rebucketed did not pick up the mid-stream cap bump (peak in-flight = {peak})"
3129 );
3130 }
3131
3132 #[tokio::test]
3141 async fn observe_op_cancellation_drops_silently() {
3142 let cfg = LimiterConfig {
3143 window_ops: 16,
3144 min_window_ops: 4,
3145 ..cfg_for_tests()
3146 };
3147 let l = Limiter::new(4, cfg);
3148 let l_clone = l.clone();
3152 let fut = observe_op(
3153 &l_clone,
3154 || async {
3155 std::future::pending::<()>().await;
3156 Ok::<(), &'static str>(())
3157 },
3158 |_| Outcome::Timeout,
3159 );
3160 drop(fut);
3161 assert_eq!(l.current(), 4, "cancelled op moved the cap");
3163 for _ in 0..16 {
3168 let _: Result<(), &'static str> = observe_op(
3169 &l,
3170 || async { Ok(()) },
3171 |_| Outcome::NetworkError,
3173 )
3174 .await;
3175 }
3176 assert!(
3179 l.current() > 4,
3180 "cap did not grow after 16 successes; controller corrupted by cancellation? cap={}",
3181 l.current(),
3182 );
3183 }
3184
3185 #[test]
3192 fn save_snapshot_is_synchronous_and_durable() {
3193 let dir = tempfile::tempdir().unwrap();
3194 let path = dir.path().join("client_adaptive.json");
3195 let snap = ChannelStart {
3196 quote: 100,
3197 store: 50,
3198 fetch: 200,
3199 };
3200 save_snapshot(&path, snap);
3201 assert!(
3204 path.exists(),
3205 "save_snapshot did not write file synchronously"
3206 );
3207 let loaded = load_snapshot(&path).unwrap();
3208 assert_eq!(loaded.quote, 100);
3209 assert_eq!(loaded.store, 50);
3210 assert_eq!(loaded.fetch, 200);
3211 }
3212
3213 #[tokio::test]
3220 async fn warm_start_disables_slow_start_doubling() {
3221 let cfg = LimiterConfig {
3222 window_ops: 8,
3223 min_window_ops: 4,
3224 success_target: 0.9,
3225 ..cfg_for_tests()
3226 };
3227 let l = Limiter::new(2, cfg.clone());
3228 l.warm_start(16);
3231 assert_eq!(l.current(), 16);
3232 for _ in 0..cfg.window_ops {
3235 l.observe(Outcome::Success, Duration::from_millis(10));
3236 }
3237 assert_eq!(
3238 l.current(),
3239 17,
3240 "warm-start triggered slow-start doubling instead of additive +1"
3241 );
3242 }
3243
3244 #[test]
3249 fn controller_warm_start_floors_at_per_instance_cold_start() {
3250 let custom_cold = ChannelStart {
3251 quote: 2,
3252 store: 1,
3253 fetch: 4,
3254 };
3255 let c = AdaptiveController::new(custom_cold, AdaptiveConfig::default());
3256 c.warm_start(ChannelStart {
3258 quote: 1,
3259 store: 1,
3260 fetch: 1,
3261 });
3262 assert_eq!(c.quote.current(), 2);
3263 assert_eq!(c.store.current(), 1);
3264 assert_eq!(c.fetch.current(), 4);
3265 c.warm_start(ChannelStart {
3267 quote: 10,
3268 store: 10,
3269 fetch: 10,
3270 });
3271 assert_eq!(c.quote.current(), 10);
3272 assert_eq!(c.store.current(), 10);
3273 assert_eq!(c.fetch.current(), 10);
3274 }
3275
3276 #[test]
3280 fn warm_start_is_noop_when_adaptive_disabled() {
3281 let cfg = AdaptiveConfig {
3282 enabled: false,
3283 ..AdaptiveConfig::default()
3284 };
3285 let custom_cold = ChannelStart {
3286 quote: 5,
3287 store: 5,
3288 fetch: 5,
3289 };
3290 let c = AdaptiveController::new(custom_cold, cfg);
3291 c.warm_start(ChannelStart {
3292 quote: 100,
3293 store: 100,
3294 fetch: 100,
3295 });
3296 assert_eq!(c.quote.current(), 5, "warm_start moved cap when disabled");
3297 assert_eq!(c.store.current(), 5, "warm_start moved cap when disabled");
3298 assert_eq!(c.fetch.current(), 5, "warm_start moved cap when disabled");
3299 }
3300
3301 #[tokio::test]
3305 async fn rebucketed_unordered_is_rolling_not_fenced() {
3306 use std::sync::atomic::{AtomicUsize, Ordering as AtomicOrdering};
3307 use std::sync::Arc as StdArc;
3308 let cfg = LimiterConfig {
3309 min_concurrency: 1,
3310 max_concurrency: 8,
3311 window_ops: 100,
3312 min_window_ops: 50,
3313 ..cfg_for_tests()
3314 };
3315 let l = Limiter::new(4, cfg);
3316 let in_flight = StdArc::new(AtomicUsize::new(0));
3317 let max_in_flight = StdArc::new(AtomicUsize::new(0));
3318 let started = StdArc::new(AtomicUsize::new(0));
3319 let _: Vec<()> = rebucketed_unordered(&l, 0..20usize, |i| {
3320 let in_flight = in_flight.clone();
3321 let max_in_flight = max_in_flight.clone();
3322 let started = started.clone();
3323 async move {
3324 let cur = in_flight.fetch_add(1, AtomicOrdering::Relaxed) + 1;
3325 max_in_flight.fetch_max(cur, AtomicOrdering::Relaxed);
3326 started.fetch_add(1, AtomicOrdering::Relaxed);
3327 if i == 0 {
3333 tokio::time::sleep(Duration::from_millis(50)).await;
3334 } else {
3335 tokio::time::sleep(Duration::from_millis(1)).await;
3336 }
3337 in_flight.fetch_sub(1, AtomicOrdering::Relaxed);
3338 Ok::<(), &'static str>(())
3339 }
3340 })
3341 .await
3342 .unwrap();
3343 assert_eq!(started.load(AtomicOrdering::Relaxed), 20);
3346 let peak = max_in_flight.load(AtomicOrdering::Relaxed);
3347 assert!(
3348 peak >= 4,
3349 "rolling scheduler did not fill cap; peak in-flight = {peak}"
3350 );
3351 }
3352
3353 #[tokio::test]
3355 async fn rebucketed_ordered_preserves_input_order() {
3356 let cfg = LimiterConfig {
3357 min_concurrency: 1,
3358 max_concurrency: 4,
3359 ..cfg_for_tests()
3360 };
3361 let l = Limiter::new(4, cfg);
3362 let items: Vec<usize> = (0..50).collect();
3363 let result: Vec<usize> = rebucketed_ordered(
3364 &l,
3365 items.iter().copied().enumerate(),
3366 |(idx, v)| async move {
3367 let delay = (50 - v) as u64;
3369 tokio::time::sleep(Duration::from_micros(delay)).await;
3370 Ok::<_, &'static str>((idx, v * 10))
3371 },
3372 )
3373 .await
3374 .unwrap();
3375 assert_eq!(result.len(), 50);
3376 for (i, v) in result.iter().enumerate() {
3377 assert_eq!(*v, i * 10, "out of order at index {i}: got {v}");
3378 }
3379 }
3380
3381 #[tokio::test]
3386 async fn rebucketed_ordered_pairs_idx_with_payload_correctly() {
3387 let cfg = LimiterConfig {
3388 min_concurrency: 1,
3389 max_concurrency: 8,
3390 ..cfg_for_tests()
3391 };
3392 let l = Limiter::new(8, cfg);
3393 let items: Vec<(usize, u64)> = (0..40).map(|i| (i, 1000u64 + i as u64)).collect();
3398 let result: Vec<u64> = rebucketed_ordered(&l, items, |(idx, hash)| async move {
3399 let delay = (40 - idx) as u64; tokio::time::sleep(Duration::from_micros(delay)).await;
3401 Ok::<_, &'static str>((idx, hash * 7))
3403 })
3404 .await
3405 .unwrap();
3406 for (i, v) in result.iter().enumerate() {
3407 let expected = (1000 + i as u64) * 7;
3408 assert_eq!(
3409 *v, expected,
3410 "idx {i} paired with wrong content: {v}, expected {expected}"
3411 );
3412 }
3413 }
3414
3415 #[test]
3419 fn save_snapshot_temp_file_is_unique_per_call() {
3420 let dir = tempfile::tempdir().unwrap();
3421 let path = dir.path().join("client_adaptive.json");
3422 for i in 0..100 {
3429 save_snapshot(
3430 &path,
3431 ChannelStart {
3432 quote: i + 1,
3433 store: i + 1,
3434 fetch: i + 1,
3435 },
3436 );
3437 }
3438 let loaded = load_snapshot(&path).unwrap();
3439 assert_eq!(loaded.quote, 100);
3440 assert_eq!(loaded.store, 100);
3441 assert_eq!(loaded.fetch, 100);
3442 let leftover: Vec<_> = std::fs::read_dir(dir.path())
3444 .unwrap()
3445 .filter_map(|e| e.ok())
3446 .filter(|e| e.file_name().to_string_lossy().contains(".tmp."))
3447 .collect();
3448 assert!(
3449 leftover.is_empty(),
3450 "temp files leaked: {:?}",
3451 leftover.iter().map(|e| e.file_name()).collect::<Vec<_>>()
3452 );
3453 }
3454
3455 #[tokio::test]
3460 async fn rebucketed_empty_input_returns_empty() {
3461 let cfg = cfg_for_tests();
3462 let l = Limiter::new(4, cfg);
3463 let v: Vec<usize> = rebucketed_unordered(&l, std::iter::empty::<usize>(), |_| async {
3464 Ok::<_, &'static str>(42usize)
3465 })
3466 .await
3467 .unwrap();
3468 assert!(v.is_empty());
3469 let v: Vec<usize> = rebucketed_ordered(
3470 &l,
3471 std::iter::empty::<(usize, ())>(),
3472 |(idx, _)| async move { Ok::<_, &'static str>((idx, 42usize)) },
3473 )
3474 .await
3475 .unwrap();
3476 assert!(v.is_empty());
3477 }
3478
3479 #[tokio::test]
3481 async fn rebucketed_exactly_cap_items() {
3482 let cfg = LimiterConfig {
3483 min_concurrency: 1,
3484 max_concurrency: 4,
3485 ..cfg_for_tests()
3486 };
3487 let l = Limiter::new(4, cfg);
3488 let v: Vec<usize> =
3489 rebucketed_unordered(
3490 &l,
3491 0..4usize,
3492 |i| async move { Ok::<_, &'static str>(i * 2) },
3493 )
3494 .await
3495 .unwrap();
3496 assert_eq!(v.len(), 4);
3497 }
3498
3499 #[tokio::test]
3502 async fn rebucketed_preserves_first_error() {
3503 use std::sync::atomic::{AtomicUsize, Ordering as AtomicOrdering};
3504 use std::sync::Arc as StdArc;
3505 let cfg = LimiterConfig {
3506 min_concurrency: 1,
3507 max_concurrency: 4,
3508 ..cfg_for_tests()
3509 };
3510 let l = Limiter::new(4, cfg);
3511 let started = StdArc::new(AtomicUsize::new(0));
3512 let started_clone = started.clone();
3513 let result: Result<Vec<()>, &'static str> = rebucketed_unordered(&l, 0..20usize, |i| {
3514 let started = started_clone.clone();
3515 async move {
3516 started.fetch_add(1, AtomicOrdering::Relaxed);
3517 if i == 5 {
3518 tokio::time::sleep(Duration::from_micros(100)).await;
3521 return Err("first error");
3522 }
3523 if i == 10 {
3524 return Err("second error - should be ignored");
3525 }
3526 tokio::time::sleep(Duration::from_micros(50)).await;
3527 Ok(())
3528 }
3529 })
3530 .await;
3531 match result {
3532 Err(e) => assert_eq!(e, "first error", "wrong error preserved"),
3533 Ok(_) => panic!("expected error, got ok"),
3534 }
3535 let total = started.load(AtomicOrdering::Relaxed);
3541 assert!(
3542 (5..20).contains(&total),
3543 "started count out of range: {total}"
3544 );
3545 }
3546
3547 #[test]
3550 fn limiter_with_min_equal_max_is_pinned() {
3551 let cfg = LimiterConfig {
3552 min_concurrency: 5,
3553 max_concurrency: 5,
3554 ..cfg_for_tests()
3555 };
3556 let l = Limiter::new(5, cfg);
3557 for _ in 0..1000 {
3558 l.observe(Outcome::Success, Duration::from_millis(1));
3559 }
3560 assert_eq!(l.current(), 5, "cap moved despite min==max");
3561 for _ in 0..1000 {
3562 l.observe(Outcome::Timeout, Duration::from_millis(50));
3563 }
3564 assert_eq!(l.current(), 5, "cap moved despite min==max");
3565 }
3566
3567 #[test]
3570 fn ewma_alpha_zero_returns_prev() {
3571 let prev = Duration::from_millis(100);
3572 let sample = Duration::from_millis(500);
3573 let result = ewma(prev, sample, 0.0);
3574 assert_eq!(result, prev, "alpha=0 must return prev unchanged");
3575 }
3576
3577 #[test]
3580 fn ewma_alpha_one_returns_sample() {
3581 let prev = Duration::from_millis(100);
3582 let sample = Duration::from_millis(500);
3583 let result = ewma(prev, sample, 1.0);
3584 let diff = result.abs_diff(sample);
3586 assert!(
3587 diff <= Duration::from_millis(1),
3588 "alpha=1 should return sample; got {result:?}, expected ~{sample:?}"
3589 );
3590 }
3591
3592 #[test]
3594 fn ewma_alpha_half_returns_midpoint() {
3595 let prev = Duration::from_millis(200);
3596 let sample = Duration::from_millis(400);
3597 let result = ewma(prev, sample, 0.5);
3598 let expected = Duration::from_millis(300);
3599 let diff = result.abs_diff(expected);
3600 assert!(
3601 diff <= Duration::from_millis(1),
3602 "alpha=0.5 midpoint: got {result:?}, expected ~{expected:?}"
3603 );
3604 }
3605
3606 #[test]
3610 fn ewma_nan_alpha_returns_prev() {
3611 let prev = Duration::from_millis(100);
3612 let sample = Duration::from_millis(500);
3613 let result = ewma(prev, sample, f64::NAN);
3614 assert_eq!(result, prev);
3615 let result = ewma(prev, sample, f64::INFINITY);
3616 assert_eq!(result, prev);
3617 let result = ewma(prev, sample, f64::NEG_INFINITY);
3618 assert_eq!(result, prev);
3619 }
3620
3621 #[test]
3624 fn ewma_clamps_alpha_above_one() {
3625 let prev = Duration::from_millis(100);
3626 let sample = Duration::from_millis(500);
3627 let result = ewma(prev, sample, 2.5);
3628 assert!(result >= Duration::from_millis(499));
3630 assert!(result <= Duration::from_millis(501));
3631 }
3632
3633 #[test]
3637 fn window_full_of_application_errors_does_not_move_cap() {
3638 let cfg = cfg_for_tests();
3639 let l = Limiter::new(8, cfg.clone());
3640 for _ in 0..(cfg.window_ops * 5) {
3641 l.observe(Outcome::ApplicationError, Duration::from_millis(50));
3642 }
3643 assert_eq!(
3644 l.current(),
3645 8,
3646 "cap moved on pure-app-error window; should hold"
3647 );
3648 }
3649
3650 #[test]
3654 fn disabled_adaptive_controller_truly_inert() {
3655 let cfg = AdaptiveConfig {
3656 enabled: false,
3657 ..AdaptiveConfig::default()
3658 };
3659 let c = AdaptiveController::new(ChannelStart::default(), cfg);
3660 let baseline_quote = c.quote.current();
3661 let baseline_store = c.store.current();
3662 let baseline_fetch = c.fetch.current();
3663 for _ in 0..10000 {
3664 c.quote.observe(Outcome::Timeout, Duration::from_millis(1));
3665 c.store.observe(Outcome::Timeout, Duration::from_millis(1));
3666 c.fetch.observe(Outcome::Timeout, Duration::from_millis(1));
3667 }
3668 assert_eq!(c.quote.current(), baseline_quote);
3669 assert_eq!(c.store.current(), baseline_store);
3670 assert_eq!(c.fetch.current(), baseline_fetch);
3671 }
3672
3673 #[test]
3678 fn channel_state_is_independent() {
3679 let c = AdaptiveController::default();
3680 let q0 = c.quote.current();
3681 let f0 = c.fetch.current();
3682 let s0 = c.store.current();
3683 for _ in 0..1000 {
3684 c.store.observe(Outcome::Timeout, Duration::from_millis(1));
3685 }
3686 assert_eq!(
3688 c.store.current(),
3689 c.config.min_concurrency,
3690 "store did not reach floor after 1000 timeouts; cap={}",
3691 c.store.current()
3692 );
3693 assert!(c.store.current() < s0, "store cap did not move at all");
3694 assert_eq!(c.quote.current(), q0, "quote leaked from store stress");
3696 assert_eq!(c.fetch.current(), f0, "fetch leaked from store stress");
3697 }
3698
3699 #[test]
3705 fn sanitize_corrects_pathological_floats() {
3706 let mut cfg = AdaptiveConfig {
3707 success_target: f64::NAN,
3708 timeout_ceiling: 5.0,
3709 latency_inflation_factor: f64::NEG_INFINITY,
3710 latency_ewma_alpha: 2.5,
3711 window_ops: 4,
3712 min_window_ops: 10,
3713 ..AdaptiveConfig::default()
3714 };
3715 cfg.sanitize();
3716 assert!(cfg.success_target.is_finite());
3717 assert!((0.0..=1.0).contains(&cfg.success_target));
3718 assert!((0.0..=1.0).contains(&cfg.timeout_ceiling));
3719 assert!(cfg.latency_inflation_factor.is_finite());
3720 assert!(cfg.latency_inflation_factor > 0.0);
3721 assert!((0.0..=1.0).contains(&cfg.latency_ewma_alpha));
3722 assert!(
3723 cfg.min_window_ops <= cfg.window_ops,
3724 "min_window_ops {} > window_ops {}",
3725 cfg.min_window_ops,
3726 cfg.window_ops
3727 );
3728 }
3729
3730 #[test]
3735 fn channel_max_serde_round_trips() {
3736 let m = ChannelMax {
3737 quote: 7,
3738 store: 13,
3739 fetch: 200,
3740 };
3741 let json = serde_json::to_string(&m).unwrap();
3742 let back: ChannelMax = serde_json::from_str(&json).unwrap();
3743 assert_eq!(back.quote, 7);
3744 assert_eq!(back.store, 13);
3745 assert_eq!(back.fetch, 200);
3746 }
3747
3748 #[test]
3749 fn channel_start_serde_round_trips() {
3750 let s = ChannelStart {
3751 quote: 11,
3752 store: 22,
3753 fetch: 33,
3754 };
3755 let json = serde_json::to_string(&s).unwrap();
3756 let back: ChannelStart = serde_json::from_str(&json).unwrap();
3757 assert_eq!(back.quote, 11);
3758 assert_eq!(back.store, 22);
3759 assert_eq!(back.fetch, 33);
3760 }
3761
3762 #[tokio::test]
3767 async fn rebucketed_honors_cap_shrinkage_mid_stream() {
3768 use std::sync::atomic::{AtomicUsize, Ordering as AtomicOrdering};
3769 use std::sync::Arc as StdArc;
3770 let cfg = LimiterConfig {
3771 min_concurrency: 1,
3772 max_concurrency: 16,
3773 ..cfg_for_tests()
3774 };
3775 let l = Limiter::new(16, cfg);
3776 let in_flight = StdArc::new(AtomicUsize::new(0));
3777 let max_after_shrink = StdArc::new(AtomicUsize::new(0));
3778 let processed = StdArc::new(AtomicUsize::new(0));
3779 let shrunk = StdArc::new(std::sync::atomic::AtomicBool::new(false));
3780 let l_for_shrink = l.clone();
3781 let p_for_shrink = processed.clone();
3782 let shrunk_for_shrink = shrunk.clone();
3783 let shrink_handle = tokio::spawn(async move {
3784 loop {
3786 tokio::time::sleep(Duration::from_millis(2)).await;
3787 if p_for_shrink.load(AtomicOrdering::Relaxed) >= 50 {
3788 l_for_shrink.warm_start(2);
3789 shrunk_for_shrink.store(true, AtomicOrdering::Relaxed);
3790 return;
3791 }
3792 }
3793 });
3794 let _: Vec<()> = rebucketed_unordered(&l, 0..400usize, |_i| {
3795 let in_flight = in_flight.clone();
3796 let max_after_shrink = max_after_shrink.clone();
3797 let processed = processed.clone();
3798 let shrunk = shrunk.clone();
3799 async move {
3800 let cur = in_flight.fetch_add(1, AtomicOrdering::Relaxed) + 1;
3801 if shrunk.load(AtomicOrdering::Relaxed) {
3802 max_after_shrink.fetch_max(cur, AtomicOrdering::Relaxed);
3803 }
3804 tokio::time::sleep(Duration::from_millis(1)).await;
3805 in_flight.fetch_sub(1, AtomicOrdering::Relaxed);
3806 processed.fetch_add(1, AtomicOrdering::Relaxed);
3807 Ok::<(), &'static str>(())
3808 }
3809 })
3810 .await
3811 .unwrap();
3812 shrink_handle.await.unwrap();
3813 let peak = max_after_shrink.load(AtomicOrdering::Relaxed);
3814 assert!(
3819 peak <= 4,
3820 "rebucketed exceeded shrunk cap of 2: peak post-shrink in-flight = {peak}"
3821 );
3822 }
3823
3824 #[test]
3830 fn mixed_window_app_errors_with_capacity_signal() {
3831 let cfg = LimiterConfig {
3832 window_ops: 10,
3833 min_window_ops: 5,
3834 timeout_ceiling: 0.2,
3835 success_target: 0.9,
3836 ..cfg_for_tests()
3837 };
3838 let l = Limiter::new(8, cfg.clone());
3843 for _ in 0..5 {
3844 l.observe(Outcome::ApplicationError, Duration::from_millis(50));
3845 }
3846 for _ in 0..5 {
3847 l.observe(Outcome::Success, Duration::from_millis(50));
3848 }
3849 assert!(
3850 l.current() >= 8,
3851 "AppErrors falsely depressed the success rate; cap dropped from 8 to {}",
3852 l.current()
3853 );
3854 let l2 = Limiter::new(8, cfg);
3857 for _ in 0..5 {
3858 l2.observe(Outcome::ApplicationError, Duration::from_millis(50));
3859 }
3860 for _ in 0..5 {
3861 l2.observe(Outcome::Timeout, Duration::from_millis(50));
3862 }
3863 assert!(
3864 l2.current() < 8,
3865 "all-timeouts (with AppError padding) did not decrease cap; got {}",
3866 l2.current()
3867 );
3868 }
3869
3870 #[test]
3876 fn concurrent_save_load_no_torn_reads() {
3877 use std::sync::atomic::{AtomicBool, Ordering as AtomicOrdering};
3878 use std::thread;
3879 let dir = tempfile::tempdir().unwrap();
3880 let path = dir.path().join("snap.json");
3881 save_snapshot(
3883 &path,
3884 ChannelStart {
3885 quote: 1,
3886 store: 1,
3887 fetch: 1,
3888 },
3889 );
3890 let stop = std::sync::Arc::new(AtomicBool::new(false));
3891 let p_w = path.clone();
3892 let s_w = stop.clone();
3893 let writer = thread::spawn(move || {
3894 let mut i = 1usize;
3895 while !s_w.load(AtomicOrdering::Relaxed) {
3896 save_snapshot(
3897 &p_w,
3898 ChannelStart {
3899 quote: i,
3900 store: i,
3901 fetch: i,
3902 },
3903 );
3904 i = i.wrapping_add(1).max(1);
3905 }
3906 });
3907 let p_r = path.clone();
3908 let reader = thread::spawn(move || {
3909 let mut torn = 0usize;
3910 for _ in 0..2_000 {
3911 if let Some(snap) = load_snapshot(&p_r) {
3912 if snap.quote != snap.store || snap.store != snap.fetch {
3915 torn += 1;
3916 }
3917 }
3918 }
3919 torn
3920 });
3921 let torn = reader.join().unwrap();
3922 stop.store(true, AtomicOrdering::Relaxed);
3923 writer.join().unwrap();
3924 assert_eq!(
3925 torn, 0,
3926 "observed {torn} torn reads under concurrent writes"
3927 );
3928 }
3929
3930 #[test]
3938 fn save_with_timeout_returns_promptly_on_fast_failure() {
3939 let path = std::path::PathBuf::from("/nonexistent_root_xyz_test/snap.json");
3940 let snap = ChannelStart {
3941 quote: 1,
3942 store: 1,
3943 fetch: 1,
3944 };
3945 let started = Instant::now();
3946 save_snapshot_with_timeout(path, snap, Duration::from_secs(5));
3947 let elapsed = started.elapsed();
3948 assert!(
3951 elapsed < Duration::from_secs(1),
3952 "save_snapshot_with_timeout took {elapsed:?} on fast-failing path"
3953 );
3954 }
3955
3956 #[test]
3961 fn save_with_timeout_bounds_wall_time_on_hang() {
3962 let dir = tempfile::tempdir().unwrap();
3974 let path = dir.path().join("snap.json");
3975 let snap = ChannelStart {
3976 quote: 1,
3977 store: 1,
3978 fetch: 1,
3979 };
3980 let started = Instant::now();
3981 save_snapshot_with_timeout(path, snap, Duration::from_micros(1));
3984 let elapsed = started.elapsed();
3985 assert!(
3986 elapsed < Duration::from_millis(200),
3987 "timeout wrapper did not bound wall time: {elapsed:?}"
3988 );
3989 }
3990}