1use futures::stream::{self, FuturesUnordered, StreamExt};
51use serde::{Deserialize, Serialize};
52use std::collections::VecDeque;
53use std::path::{Path, PathBuf};
54use std::sync::atomic::{AtomicU64, Ordering};
55use std::sync::{Arc, Mutex, PoisonError};
56use std::time::{Duration, Instant};
57use tracing::{debug, warn};
58
/// Process-wide counter bumped once per `save_snapshot` call. Its value is
/// embedded in the temp-file name so successive saves from the same process
/// use distinct temp paths.
static SAVE_COUNTER: AtomicU64 = AtomicU64::new(0);

/// Fetch-channel starting concurrency: the default for `ChannelStart::fetch`
/// and the value `load_snapshot` resets fetch to when migrating a schema-1 file.
const FETCH_COLD_START_CONCURRENCY: usize = 4;

/// The hill-climb probe step is `best_concurrency / HILL_PROBE_STEP_DIVISOR`.
const HILL_PROBE_STEP_DIVISOR: usize = 4;

/// Lower bound on the hill-climb probe step.
const HILL_MIN_PROBE_STEP: usize = 1;

/// An upward probe is accepted only when its goodput is at least this
/// multiple of the recorded best goodput.
const HILL_UP_PROBE_ACCEPT_RATIO: f64 = 1.05;

/// A downward probe is accepted when its goodput is at least this multiple of
/// the recorded best goodput.
const HILL_DOWN_PROBE_ACCEPT_RATIO: f64 = 0.98;

/// Number of non-probing epochs to sit out after a rejected probe or a stress
/// decrease before stable epochs start counting again.
const HILL_REJECT_COOLDOWN_EPOCHS: usize = 2;

/// Number of stable (non-probing, post-cooldown) epochs required before the
/// next probe is launched.
const HILL_STABLE_PROBE_EPOCHS: usize = 3;

/// Divisor applied to the current concurrency when an epoch is stressed.
const HILL_STRESS_DECREASE_DIVISOR: usize = 2;

/// An epoch collects at least `current * HILL_EPOCH_FULL_WAVES` samples.
const HILL_EPOCH_FULL_WAVES: usize = 2;
95
/// Locks `m`, recovering the guard even when the mutex is poisoned.
///
/// A poisoned mutex only means another thread panicked while holding it; the
/// protected value is still handed back instead of propagating the panic.
fn lock<T>(m: &Mutex<T>) -> std::sync::MutexGuard<'_, T> {
    match m.lock() {
        Ok(guard) => guard,
        Err(poisoned) => poisoned.into_inner(),
    }
}
101
/// Classification of one finished operation, as reported to a `Limiter`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum Outcome {
    /// The operation succeeded; its latency feeds the p95 calculations.
    Success,
    /// The operation timed out; counts against both the success rate and the
    /// timeout rate.
    Timeout,
    /// The operation failed at the network level; counts against the success
    /// rate but not the timeout rate.
    NetworkError,
    /// The operation failed for an application reason; it is excluded from the
    /// capacity statistics entirely.
    ApplicationError,
}
116
/// Minimum concurrency floor that `AdaptiveController::new` applies to the
/// fetch channel regardless of the configured `min_concurrency`.
const FETCH_MIN_FLOOR: usize = 4;
127
/// Per-channel upper bounds on concurrency.
#[derive(Debug, Clone, Copy, Serialize, Deserialize)]
pub struct ChannelMax {
    /// Upper bound for the quote channel.
    pub quote: usize,
    /// Upper bound for the store channel.
    pub store: usize,
    /// Upper bound for the fetch channel.
    pub fetch: usize,
}
137
138impl Default for ChannelMax {
139 fn default() -> Self {
140 Self {
145 quote: 128,
146 store: 64,
147 fetch: 256,
148 }
149 }
150}
151
/// User-facing configuration shared by all channels of an `AdaptiveController`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct AdaptiveConfig {
    /// When `false`, limiters ignore observations and warm starts are skipped.
    pub enabled: bool,
    /// Lower bound on concurrency for every channel (`sanitize` forces >= 1).
    pub min_concurrency: usize,
    /// Per-channel upper bounds on concurrency.
    pub max: ChannelMax,
    /// Maximum number of samples kept in a limiter's sliding window.
    pub window_ops: usize,
    /// Minimum number of samples required before a limiter makes a decision.
    pub min_window_ops: usize,
    /// Success rate below which a limiter decreases concurrency.
    pub success_target: f64,
    /// Timeout rate above which a limiter decreases concurrency.
    pub timeout_ceiling: f64,
    /// Multiple of the latency baseline that the observed p95 may reach before
    /// it is treated as a reason to decrease (or to reject a probe).
    pub latency_inflation_factor: f64,
    /// Weight given to the newest sample when smoothing baselines with an EWMA.
    pub latency_ewma_alpha: f64,
}
185
186impl AdaptiveConfig {
187 pub fn sanitize(&mut self) {
193 if !self.latency_ewma_alpha.is_finite() {
194 self.latency_ewma_alpha = 0.2;
195 }
196 self.latency_ewma_alpha = self.latency_ewma_alpha.clamp(0.0, 1.0);
197 if !self.success_target.is_finite() {
198 self.success_target = 0.95;
199 }
200 self.success_target = self.success_target.clamp(0.0, 1.0);
201 if !self.timeout_ceiling.is_finite() {
202 self.timeout_ceiling = 0.10;
203 }
204 self.timeout_ceiling = self.timeout_ceiling.clamp(0.0, 1.0);
205 if !self.latency_inflation_factor.is_finite() || self.latency_inflation_factor <= 0.0 {
206 self.latency_inflation_factor = 4.0;
207 }
208 self.min_concurrency = self.min_concurrency.max(1);
209 self.window_ops = self.window_ops.max(1);
210 self.min_window_ops = self.min_window_ops.max(1).min(self.window_ops);
211 self.max.quote = self.max.quote.max(self.min_concurrency);
212 self.max.store = self.max.store.max(self.min_concurrency);
213 self.max.fetch = self.max.fetch.max(self.min_concurrency);
214 }
215}
216
217impl Default for AdaptiveConfig {
218 fn default() -> Self {
219 Self {
220 enabled: true,
221 min_concurrency: 1,
222 max: ChannelMax::default(),
223 window_ops: 32,
224 min_window_ops: 8,
225 success_target: 0.95,
226 timeout_ceiling: 0.10,
227 latency_inflation_factor: 4.0,
234 latency_ewma_alpha: 0.2,
235 }
236 }
237}
238
/// Per-channel concurrency values used as starting points; also the shape of
/// the persisted snapshot.
#[derive(Debug, Clone, Copy, Serialize, Deserialize)]
pub struct ChannelStart {
    /// Starting concurrency for the quote channel.
    pub quote: usize,
    /// Starting concurrency for the store channel.
    pub store: usize,
    /// Starting concurrency for the fetch channel.
    pub fetch: usize,
}
255
256impl Default for ChannelStart {
257 fn default() -> Self {
258 Self {
259 quote: 32,
260 store: 8,
261 fetch: FETCH_COLD_START_CONCURRENCY,
262 }
263 }
264}
265
/// One observation stored in a limiter's sliding window.
#[derive(Debug, Clone, Copy)]
struct Sample {
    /// How the operation ended.
    outcome: Outcome,
    /// How long the operation took.
    latency: Duration,
}
272
/// Which control loop a `Limiter` runs when it receives an observation.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum LimiterAlgorithm {
    /// Window-based increase/decrease driven by `evaluate` and `apply_decision`.
    Aimd,
    /// Epoch-based goodput probing driven by `observe_hill_climb`.
    ThroughputHillClimb,
}
280
/// Direction of a hill-climb probe relative to the best known concurrency.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum ProbeDirection {
    /// Try a higher concurrency.
    Up,
    /// Try a lower concurrency.
    Down,
}
287
/// Mutable state of the throughput hill-climb: counters for the epoch in
/// progress plus the best operating point found so far.
#[derive(Debug)]
struct HillClimbState {
    /// Earliest operation start time seen in the current epoch, if any.
    epoch_started: Option<Instant>,
    /// Observations of any outcome recorded in the current epoch.
    epoch_samples: usize,
    /// Successful operations in the current epoch.
    epoch_successes: usize,
    /// Timed-out operations in the current epoch.
    epoch_timeouts: usize,
    /// Network-failed operations in the current epoch.
    epoch_net_errors: usize,
    /// Bytes reported by successful operations in the current epoch.
    epoch_bytes: u64,
    /// Latencies of successful operations in the current epoch.
    epoch_latencies: Vec<Duration>,
    /// Goodput recorded for the best known concurrency.
    best_goodput_per_sec: Option<f64>,
    /// p95 latency recorded for the best known concurrency.
    best_latency_p95: Option<Duration>,
    /// Best known concurrency; probes are taken relative to it.
    best_concurrency: usize,
    /// Consecutive non-probing epochs counted toward the next probe.
    stable_epochs: usize,
    /// Remaining epochs to sit out before stable epochs are counted.
    cooldown_epochs: usize,
    /// Direction the next scheduled probe will take.
    next_probe: ProbeDirection,
    /// Direction of the probe being measured, or `None` when not probing.
    active_probe: Option<ProbeDirection>,
}
306
307impl HillClimbState {
308 fn new(start: usize, epoch_capacity: usize) -> Self {
309 Self {
310 epoch_started: None,
311 epoch_samples: 0,
312 epoch_successes: 0,
313 epoch_timeouts: 0,
314 epoch_net_errors: 0,
315 epoch_bytes: 0,
316 epoch_latencies: Vec::with_capacity(epoch_capacity),
317 best_goodput_per_sec: None,
318 best_latency_p95: None,
319 best_concurrency: start,
320 stable_epochs: 0,
321 cooldown_epochs: 0,
322 next_probe: ProbeDirection::Up,
323 active_probe: None,
324 }
325 }
326
327 fn reset_epoch(&mut self) {
328 self.epoch_started = None;
329 self.epoch_samples = 0;
330 self.epoch_successes = 0;
331 self.epoch_timeouts = 0;
332 self.epoch_net_errors = 0;
333 self.epoch_bytes = 0;
334 self.epoch_latencies.clear();
335 }
336
337 fn capacity_total(&self) -> usize {
338 self.epoch_successes + self.epoch_timeouts + self.epoch_net_errors
339 }
340}
341
/// Configuration for a single `Limiter`.
#[derive(Debug, Clone)]
pub struct LimiterConfig {
    /// When `false`, observations are ignored.
    pub enabled: bool,
    /// Lower bound on the concurrency limit.
    pub min_concurrency: usize,
    /// Upper bound on the concurrency limit.
    pub max_concurrency: usize,
    /// Maximum number of samples kept in the sliding window.
    pub window_ops: usize,
    /// Minimum number of samples required before a decision is made.
    pub min_window_ops: usize,
    /// Success rate below which concurrency is decreased.
    pub success_target: f64,
    /// Timeout rate above which concurrency is decreased.
    pub timeout_ceiling: f64,
    /// Multiple of the latency baseline the observed p95 may reach before it
    /// counts as a reason to decrease or to reject a probe.
    pub latency_inflation_factor: f64,
    /// Weight given to the newest sample when smoothing baselines with an EWMA.
    pub latency_ewma_alpha: f64,
    /// Concurrency at or above which a decrease (or warm start) ends slow
    /// start, switching increases from doubling to +1.
    pub slow_start_ramp_threshold: usize,
    /// Whether p95 latency inflation over the baseline may trigger a decrease.
    pub latency_decrease_enabled: bool,
}
383
384impl LimiterConfig {
385 fn from_adaptive(cfg: &AdaptiveConfig, max_for_channel: usize) -> Self {
386 Self {
387 enabled: cfg.enabled,
388 min_concurrency: cfg.min_concurrency,
389 max_concurrency: max_for_channel.max(cfg.min_concurrency),
390 window_ops: cfg.window_ops,
391 min_window_ops: cfg.min_window_ops,
392 success_target: cfg.success_target,
393 timeout_ceiling: cfg.timeout_ceiling,
394 latency_inflation_factor: cfg.latency_inflation_factor,
395 latency_ewma_alpha: cfg.latency_ewma_alpha,
396 slow_start_ramp_threshold: 0,
399 latency_decrease_enabled: true,
400 }
401 }
402
403 fn sanitize(&mut self) {
409 if !self.latency_ewma_alpha.is_finite() {
410 self.latency_ewma_alpha = 0.2;
411 }
412 self.latency_ewma_alpha = self.latency_ewma_alpha.clamp(0.0, 1.0);
413 if !self.success_target.is_finite() {
414 self.success_target = 0.95;
415 }
416 self.success_target = self.success_target.clamp(0.0, 1.0);
417 if !self.timeout_ceiling.is_finite() {
418 self.timeout_ceiling = 0.10;
419 }
420 self.timeout_ceiling = self.timeout_ceiling.clamp(0.0, 1.0);
421 if !self.latency_inflation_factor.is_finite() || self.latency_inflation_factor <= 0.0 {
422 self.latency_inflation_factor = 4.0;
423 }
424 self.min_concurrency = self.min_concurrency.max(1);
425 self.window_ops = self.window_ops.max(1);
426 self.min_window_ops = self.min_window_ops.max(1).min(self.window_ops);
427 self.max_concurrency = self.max_concurrency.max(self.min_concurrency);
428 }
429}
430
/// Adaptive concurrency limit for one channel.
///
/// Cloning is cheap: clones share the same state and configuration through
/// `Arc`s.
#[derive(Debug, Clone)]
pub struct Limiter {
    /// Shared mutable state, guarded by a mutex.
    inner: Arc<Mutex<LimiterInner>>,
    /// Sanitized configuration, fixed at construction.
    config: Arc<LimiterConfig>,
    /// Control loop run on every observation.
    algorithm: LimiterAlgorithm,
}
442
/// Mutex-protected state behind a `Limiter`.
#[derive(Debug)]
struct LimiterInner {
    /// Current concurrency limit.
    current: usize,
    /// Sliding window of the most recent samples (at most `window_ops`).
    window: VecDeque<Sample>,
    /// Samples observed since the limit last changed; gates increases.
    samples_since_increase: usize,
    /// Samples observed since the limit last changed; gates decreases.
    samples_since_decrease: usize,
    /// Smoothed p95 latency recorded at increases, used to detect inflation.
    latency_baseline: Option<Duration>,
    /// Once `true`, increases add 1 instead of doubling.
    left_slow_start: bool,
    /// State used only by the throughput hill-climb algorithm.
    hill: HillClimbState,
}
469
470impl Limiter {
471 #[must_use]
476 pub fn new(start: usize, config: LimiterConfig) -> Self {
477 Self::new_with_algorithm(start, config, LimiterAlgorithm::Aimd)
478 }
479
480 fn new_with_algorithm(
481 start: usize,
482 config: LimiterConfig,
483 algorithm: LimiterAlgorithm,
484 ) -> Self {
485 let mut config = config;
486 config.sanitize();
487 let clamped = start.clamp(config.min_concurrency, config.max_concurrency.max(1));
488 let window_cap = config.window_ops;
489 Self {
490 inner: Arc::new(Mutex::new(LimiterInner {
491 current: clamped,
492 window: VecDeque::with_capacity(window_cap),
493 samples_since_increase: 0,
494 samples_since_decrease: 0,
495 latency_baseline: None,
496 left_slow_start: false,
497 hill: HillClimbState::new(clamped, window_cap),
498 })),
499 config: Arc::new(config),
500 algorithm,
501 }
502 }
503
504 #[must_use]
508 pub fn current(&self) -> usize {
509 lock(&self.inner).current
510 }
511
512 pub fn observe(&self, outcome: Outcome, latency: Duration) {
515 self.observe_with_bytes(outcome, latency, 0);
516 }
517
518 pub fn observe_with_bytes(&self, outcome: Outcome, latency: Duration, bytes: u64) {
521 let observed_at = Instant::now();
522 let operation_started = observed_at.checked_sub(latency).unwrap_or(observed_at);
523 self.observe_with_timing(outcome, latency, bytes, operation_started);
524 }
525
526 fn observe_with_timing(
527 &self,
528 outcome: Outcome,
529 latency: Duration,
530 bytes: u64,
531 operation_started: Instant,
532 ) {
533 if !self.config.enabled {
534 return;
535 }
536 let mut g = lock(&self.inner);
537 if g.window.len() == self.config.window_ops {
538 g.window.pop_front();
539 }
540 g.window.push_back(Sample { outcome, latency });
541 if self.algorithm == LimiterAlgorithm::ThroughputHillClimb {
542 observe_hill_climb(
543 &mut g,
544 outcome,
545 latency,
546 bytes,
547 operation_started,
548 &self.config,
549 );
550 return;
551 }
552 g.samples_since_increase = g.samples_since_increase.saturating_add(1);
553 g.samples_since_decrease = g.samples_since_decrease.saturating_add(1);
554 if g.window.len() < self.config.min_window_ops {
555 return;
556 }
557 let decision = evaluate(&g.window, &self.config, g.latency_baseline);
558 apply_decision(&mut g, decision, &self.config);
559 }
560
561 pub fn warm_start(&self, start: usize) {
584 let clamped = start.clamp(
585 self.config.min_concurrency,
586 self.config.max_concurrency.max(1),
587 );
588 let mut g = lock(&self.inner);
589 g.current = clamped;
590 g.left_slow_start = clamped >= self.config.slow_start_ramp_threshold;
591 g.hill = HillClimbState::new(clamped, self.config.window_ops);
592 }
593
594 #[must_use]
596 pub fn snapshot(&self) -> usize {
597 let g = lock(&self.inner);
598 if self.algorithm == LimiterAlgorithm::ThroughputHillClimb {
599 g.hill.best_concurrency
600 } else {
601 g.current
602 }
603 }
604}
605
/// Summary of one completed hill-climb epoch.
#[derive(Debug, Clone, Copy)]
struct HillEpochStats {
    /// Bytes per second when bytes were reported, otherwise successes per second.
    goodput_per_sec: f64,
    /// p95 latency of the epoch's successful operations, if any.
    latency_p95: Option<Duration>,
}
611
/// Verdict produced by `evaluate` for the AIMD algorithm.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum Decision {
    /// The window looks healthy; concurrency may grow.
    Increase,
    /// The window shows failures, timeouts or latency inflation; shrink.
    Decrease,
    /// Not enough capacity-relevant data to decide.
    Hold,
}
622
623fn evaluate(
624 window: &VecDeque<Sample>,
625 cfg: &LimiterConfig,
626 baseline: Option<Duration>,
627) -> Decision {
628 let mut successes = 0usize;
633 let mut timeouts = 0usize;
634 let mut net_errors = 0usize;
635 let mut latencies: Vec<Duration> = Vec::with_capacity(window.len());
636 for s in window {
637 match s.outcome {
638 Outcome::Success => {
639 successes += 1;
640 latencies.push(s.latency);
641 }
642 Outcome::Timeout => timeouts += 1,
643 Outcome::NetworkError => net_errors += 1,
644 Outcome::ApplicationError => {}
645 }
646 }
647 let capacity_total = successes + timeouts + net_errors;
648 if capacity_total < cfg.min_window_ops {
649 return Decision::Hold;
651 }
652 let total_f = capacity_total as f64;
653 let success_rate = successes as f64 / total_f;
654 let timeout_rate = timeouts as f64 / total_f;
655
656 if success_rate < cfg.success_target || timeout_rate > cfg.timeout_ceiling {
657 return Decision::Decrease;
658 }
659
660 if let Some(p95) = p95_of(&mut latencies) {
661 if cfg.latency_decrease_enabled {
662 if let Some(base) = baseline {
663 let limit = base.mul_f64(cfg.latency_inflation_factor);
664 if p95 > limit {
665 return Decision::Decrease;
666 }
667 }
668 }
669 Decision::Increase
670 } else {
671 Decision::Hold
672 }
673}
674
675fn apply_decision(inner: &mut LimiterInner, decision: Decision, cfg: &LimiterConfig) {
676 match decision {
677 Decision::Increase => {
678 if inner.samples_since_increase < cfg.window_ops {
681 return;
682 }
683 let p95 = window_p95(&inner.window);
684 inner.latency_baseline = Some(match inner.latency_baseline {
685 None => p95,
686 Some(prev) => ewma(prev, p95, cfg.latency_ewma_alpha),
687 });
688 let next = if inner.left_slow_start {
689 inner.current.saturating_add(1)
690 } else {
691 inner.current.saturating_mul(2)
692 };
693 let next = next.min(cfg.max_concurrency).max(cfg.min_concurrency);
694 if next != inner.current {
695 debug!(
696 from = inner.current,
697 to = next,
698 slow_start = !inner.left_slow_start,
699 "adaptive: increase",
700 );
701 }
702 inner.current = next;
703 inner.samples_since_increase = 0;
704 inner.samples_since_decrease = 0;
705 }
706 Decision::Decrease => {
707 if inner.samples_since_decrease < cfg.min_window_ops {
712 return;
713 }
714 if inner.current >= cfg.slow_start_ramp_threshold {
721 inner.left_slow_start = true;
722 }
723 let next = (inner.current / 2).max(cfg.min_concurrency);
724 if next != inner.current {
725 debug!(from = inner.current, to = next, "adaptive: decrease");
726 }
727 inner.current = next;
728 inner.samples_since_increase = 0;
729 inner.samples_since_decrease = 0;
730 }
731 Decision::Hold => {}
732 }
733}
734
/// Returns the 95th-percentile latency (nearest-rank), or `None` for an empty
/// slice. Sorts `latencies` in place as a side effect.
fn p95_of(latencies: &mut [Duration]) -> Option<Duration> {
    let count = latencies.len();
    if count == 0 {
        return None;
    }
    latencies.sort_unstable();
    // Nearest-rank: ceil(0.95 * n) is a 1-based rank; convert to an index.
    let rank = (count as f64 * 0.95).ceil() as usize;
    let index = rank.saturating_sub(1).min(count - 1);
    Some(latencies[index])
}
747
748fn window_p95(window: &VecDeque<Sample>) -> Duration {
749 let mut latencies: Vec<Duration> = window
750 .iter()
751 .filter(|s| matches!(s.outcome, Outcome::Success))
752 .map(|s| s.latency)
753 .collect();
754 p95_of(&mut latencies).unwrap_or(Duration::ZERO)
755}
756
/// Exponentially weighted moving average of two durations.
///
/// * `prev` - the previous smoothed value.
/// * `sample` - the new observation.
/// * `alpha` - weight of `sample`, clamped to `[0, 1]`.
///
/// Returns `prev` unchanged when `alpha` is not finite or when the blended
/// value is negative, not finite, or not representable as a `Duration`.
fn ewma(prev: Duration, sample: Duration, alpha: f64) -> Duration {
    if !alpha.is_finite() {
        return prev;
    }
    let alpha = alpha.clamp(0.0, 1.0);
    let prev_ms = prev.as_secs_f64() * 1000.0;
    let sample_ms = sample.as_secs_f64() * 1000.0;
    let new_ms = (1.0 - alpha) * prev_ms + alpha * sample_ms;
    if !new_ms.is_finite() || new_ms < 0.0 {
        return prev;
    }
    // `Duration::from_secs_f64` panics on overflow, and inputs near
    // `Duration::MAX` round up to 2^64 seconds; convert fallibly instead.
    Duration::try_from_secs_f64(new_ms / 1000.0).unwrap_or(prev)
}
771
772fn observe_hill_climb(
773 inner: &mut LimiterInner,
774 outcome: Outcome,
775 latency: Duration,
776 bytes: u64,
777 operation_started: Instant,
778 cfg: &LimiterConfig,
779) {
780 match inner.hill.epoch_started {
781 Some(epoch_started) if epoch_started <= operation_started => {}
782 _ => inner.hill.epoch_started = Some(operation_started),
783 }
784 inner.hill.epoch_samples = inner.hill.epoch_samples.saturating_add(1);
785 match outcome {
786 Outcome::Success => {
787 inner.hill.epoch_successes = inner.hill.epoch_successes.saturating_add(1);
788 inner.hill.epoch_bytes = inner.hill.epoch_bytes.saturating_add(bytes);
789 inner.hill.epoch_latencies.push(latency);
790 }
791 Outcome::Timeout => {
792 inner.hill.epoch_timeouts = inner.hill.epoch_timeouts.saturating_add(1);
793 }
794 Outcome::NetworkError => {
795 inner.hill.epoch_net_errors = inner.hill.epoch_net_errors.saturating_add(1);
796 }
797 Outcome::ApplicationError => {}
798 }
799
800 if hill_epoch_stressed(&inner.hill, cfg) {
801 apply_hill_stress(inner, cfg);
802 return;
803 }
804
805 if inner.hill.epoch_samples < hill_epoch_target_samples(inner.current, cfg) {
806 return;
807 }
808
809 if let Some(stats) = hill_epoch_stats(&inner.hill, cfg) {
810 apply_hill_epoch(inner, stats, cfg);
811 }
812 inner.hill.reset_epoch();
813}
814
815fn hill_epoch_target_samples(current: usize, cfg: &LimiterConfig) -> usize {
816 cfg.window_ops
817 .max(current.saturating_mul(HILL_EPOCH_FULL_WAVES))
818 .max(cfg.min_window_ops)
819}
820
821fn hill_epoch_stressed(hill: &HillClimbState, cfg: &LimiterConfig) -> bool {
822 let capacity_total = hill.capacity_total();
823 if capacity_total < cfg.min_window_ops {
824 return false;
825 }
826 let total_f = capacity_total as f64;
827 let success_rate = hill.epoch_successes as f64 / total_f;
828 let timeout_rate = hill.epoch_timeouts as f64 / total_f;
829 success_rate < cfg.success_target || timeout_rate > cfg.timeout_ceiling
830}
831
832fn hill_epoch_stats(hill: &HillClimbState, cfg: &LimiterConfig) -> Option<HillEpochStats> {
833 let capacity_total = hill.capacity_total();
834 if capacity_total < cfg.min_window_ops || hill.epoch_successes == 0 {
835 return None;
836 }
837 let mut latencies = hill.epoch_latencies.clone();
838 let latency_p95 = p95_of(&mut latencies);
839 let max_latency = latencies.iter().copied().max().unwrap_or(Duration::ZERO);
840 let wall_elapsed = hill.epoch_started.map_or(Duration::ZERO, |s| s.elapsed());
841 let elapsed = wall_elapsed.max(max_latency);
842 let elapsed_secs = elapsed.as_secs_f64();
843 if !elapsed_secs.is_finite() || elapsed_secs <= 0.0 {
844 return None;
845 }
846
847 let units = if hill.epoch_bytes > 0 {
850 hill.epoch_bytes as f64
851 } else {
852 hill.epoch_successes as f64
853 };
854 Some(HillEpochStats {
855 goodput_per_sec: units / elapsed_secs,
856 latency_p95,
857 })
858}
859
860fn apply_hill_stress(inner: &mut LimiterInner, cfg: &LimiterConfig) {
861 let next = (inner.current / HILL_STRESS_DECREASE_DIVISOR)
862 .max(cfg.min_concurrency)
863 .min(cfg.max_concurrency);
864 if next != inner.current {
865 debug!(
866 from = inner.current,
867 to = next,
868 "adaptive: fetch hill stress decrease"
869 );
870 }
871 inner.current = next;
872 inner.hill.best_concurrency = next;
873 inner.hill.best_goodput_per_sec = None;
874 inner.hill.best_latency_p95 = None;
875 inner.hill.stable_epochs = 0;
876 inner.hill.cooldown_epochs = HILL_REJECT_COOLDOWN_EPOCHS;
877 inner.hill.active_probe = None;
878 inner.hill.next_probe = ProbeDirection::Up;
879 inner.hill.reset_epoch();
880}
881
882fn apply_hill_epoch(inner: &mut LimiterInner, stats: HillEpochStats, cfg: &LimiterConfig) {
883 let Some(best_goodput) = inner.hill.best_goodput_per_sec else {
884 inner.hill.best_goodput_per_sec = Some(stats.goodput_per_sec);
885 inner.hill.best_latency_p95 = stats.latency_p95;
886 inner.hill.best_concurrency = inner.current;
887 probe_hill_neighbor(inner, ProbeDirection::Up, cfg);
888 return;
889 };
890
891 match inner.hill.active_probe {
892 Some(ProbeDirection::Up) => {
893 let improved = stats.goodput_per_sec >= best_goodput * HILL_UP_PROBE_ACCEPT_RATIO;
894 if improved
895 && hill_latency_acceptable(stats.latency_p95, inner.hill.best_latency_p95, cfg)
896 {
897 accept_hill_probe(inner, stats, cfg);
898 probe_hill_neighbor(inner, ProbeDirection::Up, cfg);
899 } else {
900 reject_hill_probe(inner);
901 }
902 }
903 Some(ProbeDirection::Down) => {
904 let retained = stats.goodput_per_sec >= best_goodput * HILL_DOWN_PROBE_ACCEPT_RATIO;
905 if retained
906 && hill_latency_acceptable(stats.latency_p95, inner.hill.best_latency_p95, cfg)
907 {
908 accept_hill_probe(inner, stats, cfg);
909 inner.hill.next_probe = ProbeDirection::Up;
910 } else {
911 reject_hill_probe(inner);
912 }
913 }
914 None => {
915 refresh_hill_best(inner, stats, cfg);
916 if inner.hill.cooldown_epochs > 0 {
917 inner.hill.cooldown_epochs -= 1;
918 return;
919 }
920 inner.hill.stable_epochs = inner.hill.stable_epochs.saturating_add(1);
921 if inner.hill.stable_epochs >= HILL_STABLE_PROBE_EPOCHS {
922 let direction = inner.hill.next_probe;
923 inner.hill.next_probe = match direction {
924 ProbeDirection::Up => ProbeDirection::Down,
925 ProbeDirection::Down => ProbeDirection::Up,
926 };
927 probe_hill_neighbor(inner, direction, cfg);
928 }
929 }
930 }
931}
932
933fn refresh_hill_best(inner: &mut LimiterInner, stats: HillEpochStats, cfg: &LimiterConfig) {
934 inner.hill.best_goodput_per_sec = Some(match inner.hill.best_goodput_per_sec {
935 Some(prev) => ewma_f64(prev, stats.goodput_per_sec, cfg.latency_ewma_alpha),
936 None => stats.goodput_per_sec,
937 });
938 if let Some(latency_p95) = stats.latency_p95 {
939 inner.hill.best_latency_p95 = Some(match inner.hill.best_latency_p95 {
940 Some(prev) => ewma(prev, latency_p95, cfg.latency_ewma_alpha),
941 None => latency_p95,
942 });
943 }
944}
945
946fn hill_latency_acceptable(
947 candidate: Option<Duration>,
948 best: Option<Duration>,
949 cfg: &LimiterConfig,
950) -> bool {
951 match (candidate, best) {
952 (Some(candidate), Some(best)) => candidate <= best.mul_f64(cfg.latency_inflation_factor),
953 _ => true,
954 }
955}
956
/// Exponentially weighted moving average of two floats, with `alpha` (the
/// weight of `sample`) clamped to `[0, 1]`.
///
/// Returns `prev` unchanged when `alpha` is not finite or when the blended
/// value is negative or not finite.
fn ewma_f64(prev: f64, sample: f64, alpha: f64) -> f64 {
    if !alpha.is_finite() {
        return prev;
    }
    let weight = alpha.clamp(0.0, 1.0);
    let blended = (1.0 - weight) * prev + weight * sample;
    let usable = blended.is_finite() && blended >= 0.0;
    if usable {
        blended
    } else {
        prev
    }
}
970
971fn accept_hill_probe(inner: &mut LimiterInner, stats: HillEpochStats, cfg: &LimiterConfig) {
972 debug!(
973 concurrency = inner.current,
974 goodput_per_sec = stats.goodput_per_sec,
975 "adaptive: fetch hill accepted probe"
976 );
977 inner.hill.best_concurrency = inner.current;
978 inner.hill.best_goodput_per_sec = Some(stats.goodput_per_sec);
979 inner.hill.best_latency_p95 = stats.latency_p95;
980 inner.hill.active_probe = None;
981 inner.hill.cooldown_epochs = 0;
982 inner.hill.stable_epochs = 0;
983 inner.current = inner
984 .hill
985 .best_concurrency
986 .clamp(cfg.min_concurrency, cfg.max_concurrency);
987}
988
989fn reject_hill_probe(inner: &mut LimiterInner) {
990 let from = inner.current;
991 let to = inner.hill.best_concurrency;
992 let rejected_direction = inner.hill.active_probe;
993 if from != to {
994 debug!(from, to, "adaptive: fetch hill rejected probe");
995 }
996 inner.current = to;
997 inner.hill.active_probe = None;
998 if let Some(direction) = rejected_direction {
999 inner.hill.next_probe = match direction {
1000 ProbeDirection::Up => ProbeDirection::Down,
1001 ProbeDirection::Down => ProbeDirection::Up,
1002 };
1003 }
1004 inner.hill.cooldown_epochs = HILL_REJECT_COOLDOWN_EPOCHS;
1005 inner.hill.stable_epochs = 0;
1006}
1007
1008fn probe_hill_neighbor(inner: &mut LimiterInner, direction: ProbeDirection, cfg: &LimiterConfig) {
1009 let best = inner.hill.best_concurrency;
1010 let step = (best / HILL_PROBE_STEP_DIVISOR).max(HILL_MIN_PROBE_STEP);
1011 let candidate = match direction {
1012 ProbeDirection::Up => best.saturating_add(step).min(cfg.max_concurrency),
1013 ProbeDirection::Down => best.saturating_sub(step).max(cfg.min_concurrency),
1014 };
1015 if candidate == best {
1016 inner.current = best;
1017 inner.hill.active_probe = None;
1018 inner.hill.stable_epochs = 0;
1019 return;
1020 }
1021 debug!(
1022 from = best,
1023 to = candidate,
1024 ?direction,
1025 "adaptive: fetch hill probing"
1026 );
1027 inner.current = candidate;
1028 inner.hill.active_probe = Some(direction);
1029 inner.hill.stable_epochs = 0;
1030}
1031
/// Bundle of the three per-channel limiters plus the configuration they were
/// built from.
#[derive(Debug, Clone)]
pub struct AdaptiveController {
    /// Limiter for the quote channel (AIMD).
    pub quote: Limiter,
    /// Limiter for the store channel (AIMD, latency decreases disabled).
    pub store: Limiter,
    /// Limiter for the fetch channel (throughput hill-climb).
    pub fetch: Limiter,
    /// Sanitized shared configuration.
    pub(crate) config: AdaptiveConfig,
    /// Starting values passed to `new`; they act as a floor for warm starts.
    cold_start: ChannelStart,
}
1052
1053impl AdaptiveController {
1054 #[must_use]
1059 pub fn new(start: ChannelStart, config: AdaptiveConfig) -> Self {
1060 let mut config = config;
1061 config.sanitize();
1062 let quote_cfg = LimiterConfig::from_adaptive(&config, config.max.quote);
1063 let mut store_cfg = LimiterConfig::from_adaptive(&config, config.max.store);
1064 store_cfg.latency_decrease_enabled = false;
1087 store_cfg.slow_start_ramp_threshold = usize::MAX;
1088 let mut fetch_cfg = LimiterConfig::from_adaptive(&config, config.max.fetch);
1089 fetch_cfg.min_concurrency = fetch_cfg.min_concurrency.max(FETCH_MIN_FLOOR);
1106 fetch_cfg.max_concurrency = fetch_cfg.max_concurrency.max(fetch_cfg.min_concurrency);
1109 fetch_cfg.slow_start_ramp_threshold = usize::MAX;
1134 fetch_cfg.latency_decrease_enabled = false;
1135 Self {
1136 quote: Limiter::new(start.quote, quote_cfg),
1137 store: Limiter::new(start.store, store_cfg),
1138 fetch: Limiter::new_with_algorithm(
1139 start.fetch,
1140 fetch_cfg,
1141 LimiterAlgorithm::ThroughputHillClimb,
1142 ),
1143 config,
1144 cold_start: start,
1145 }
1146 }
1147
1148 #[must_use]
1150 pub fn snapshot(&self) -> ChannelStart {
1151 ChannelStart {
1152 quote: self.quote.snapshot(),
1153 store: self.store.snapshot(),
1154 fetch: self.fetch.snapshot(),
1155 }
1156 }
1157
1158 #[must_use]
1164 pub fn config(&self) -> &AdaptiveConfig {
1165 &self.config
1166 }
1167
1168 pub fn warm_start(&self, snapshot: ChannelStart) {
1182 if !self.config.enabled {
1183 return;
1184 }
1185 self.quote
1186 .warm_start(snapshot.quote.max(self.cold_start.quote));
1187 self.store
1188 .warm_start(snapshot.store.max(self.cold_start.store));
1189 self.fetch
1190 .warm_start(snapshot.fetch.max(self.cold_start.fetch));
1191 }
1192}
1193
1194impl Default for AdaptiveController {
1195 fn default() -> Self {
1196 Self::new(ChannelStart::default(), AdaptiveConfig::default())
1197 }
1198}
1199
/// Drop guard that reports one operation to a limiter.
///
/// Nothing is reported unless `finish`/`finish_with_bytes` was called before
/// the guard is dropped.
struct ObserveGuard<'a> {
    /// Limiter that receives the observation.
    limiter: &'a Limiter,
    /// When the guard was created, used as the operation's start time.
    started: Instant,
    /// Recorded result: outcome, latency and bytes; `None` until finished.
    outcome: Option<(Outcome, Duration, u64)>,
}
1212
1213impl<'a> ObserveGuard<'a> {
1214 fn new(limiter: &'a Limiter) -> Self {
1215 Self {
1216 limiter,
1217 started: Instant::now(),
1218 outcome: None,
1219 }
1220 }
1221 fn finish(&mut self, outcome: Outcome) {
1222 self.finish_with_bytes(outcome, 0);
1223 }
1224
1225 fn finish_with_bytes(&mut self, outcome: Outcome, bytes: u64) {
1226 self.outcome = Some((outcome, self.started.elapsed(), bytes));
1227 }
1228}
1229
1230impl Drop for ObserveGuard<'_> {
1231 fn drop(&mut self) {
1232 if let Some((outcome, latency, bytes)) = self.outcome.take() {
1233 self.limiter
1234 .observe_with_timing(outcome, latency, bytes, self.started);
1235 }
1236 }
1237}
1238
1239pub async fn observe_op<T, E, F, Fut, C>(limiter: &Limiter, op: F, classify: C) -> Result<T, E>
1254where
1255 F: FnOnce() -> Fut,
1256 Fut: std::future::Future<Output = Result<T, E>>,
1257 C: FnOnce(&E) -> Outcome,
1258{
1259 let mut guard = ObserveGuard::new(limiter);
1260 let result = op().await;
1261 let outcome = match &result {
1262 Ok(_) => Outcome::Success,
1263 Err(e) => classify(e),
1264 };
1265 guard.finish(outcome);
1266 drop(guard); result
1268}
1269
1270pub async fn observe_op_with_success_bytes<T, E, F, Fut, C, B>(
1274 limiter: &Limiter,
1275 op: F,
1276 classify: C,
1277 success_bytes: B,
1278) -> Result<T, E>
1279where
1280 F: FnOnce() -> Fut,
1281 Fut: std::future::Future<Output = Result<T, E>>,
1282 C: FnOnce(&E) -> Outcome,
1283 B: FnOnce(&T) -> u64,
1284{
1285 let mut guard = ObserveGuard::new(limiter);
1286 let result = op().await;
1287 match &result {
1288 Ok(value) => guard.finish_with_bytes(Outcome::Success, success_bytes(value)),
1289 Err(e) => guard.finish_with_bytes(classify(e), 0),
1290 }
1291 drop(guard);
1292 result
1293}
1294
1295pub async fn rebucketed_unordered<I, T, E, F, Fut>(
1310 limiter: &Limiter,
1311 items: I,
1312 mut op: F,
1313) -> Result<Vec<T>, E>
1314where
1315 I: IntoIterator,
1316 F: FnMut(I::Item) -> Fut,
1317 Fut: std::future::Future<Output = Result<T, E>>,
1318{
1319 let mut iter = items.into_iter().peekable();
1320 let mut in_flight: FuturesUnordered<Fut> = FuturesUnordered::new();
1321 let mut results = Vec::new();
1322 let mut pending_err: Option<E> = None;
1323 loop {
1324 if pending_err.is_none() {
1327 let cap = limiter.current().max(1);
1328 while in_flight.len() < cap {
1329 match iter.next() {
1330 Some(item) => in_flight.push(op(item)),
1331 None => break,
1332 }
1333 }
1334 }
1335 if in_flight.is_empty() {
1336 break;
1337 }
1338 match in_flight.next().await {
1339 Some(Ok(v)) => results.push(v),
1340 Some(Err(e)) => {
1341 if pending_err.is_none() {
1342 pending_err = Some(e);
1343 }
1344 }
1345 None => break,
1346 }
1347 }
1348 match pending_err {
1349 Some(e) => Err(e),
1350 None => Ok(results),
1351 }
1352}
1353
1354pub async fn rebucketed_ordered<I, U, E, F, Fut>(
1367 limiter: &Limiter,
1368 items: I,
1369 op: F,
1370) -> Result<Vec<U>, E>
1371where
1372 I: IntoIterator,
1373 F: FnMut(I::Item) -> Fut,
1374 Fut: std::future::Future<Output = Result<(usize, U), E>>,
1375{
1376 let mut indexed = rebucketed_unordered(limiter, items, op).await?;
1377 indexed.sort_by_key(|(idx, _)| *idx);
1378 Ok(indexed.into_iter().map(|(_, v)| v).collect())
1379}
1380
1381pub async fn rebucketed<I, T, E, F, Fut>(
1387 limiter: &Limiter,
1388 items: I,
1389 ordered: bool,
1390 mut op: F,
1391) -> Result<Vec<T>, E>
1392where
1393 I: IntoIterator,
1394 F: FnMut(I::Item) -> Fut,
1395 Fut: std::future::Future<Output = Result<T, E>>,
1396{
1397 if !ordered {
1398 return rebucketed_unordered(limiter, items, op).await;
1399 }
1400 let mut iter = items.into_iter();
1401 let mut results = Vec::new();
1402 let mut pending_err: Option<E> = None;
1403 loop {
1404 if pending_err.is_some() {
1405 break;
1406 }
1407 let cap = limiter.current().max(1);
1408 let mut batch = Vec::with_capacity(cap);
1409 for item in iter.by_ref().take(cap) {
1410 batch.push(op(item));
1411 }
1412 if batch.is_empty() {
1413 break;
1414 }
1415 let mut s = stream::iter(batch).buffered(cap);
1416 while let Some(r) = s.next().await {
1417 match r {
1418 Ok(v) => results.push(v),
1419 Err(e) => {
1420 if pending_err.is_none() {
1421 pending_err = Some(e);
1422 }
1423 }
1424 }
1425 }
1426 }
1427 match pending_err {
1428 Some(e) => Err(e),
1429 None => Ok(results),
1430 }
1431}
1432
/// On-disk form of a controller snapshot.
#[derive(Debug, Clone, Serialize, Deserialize)]
struct PersistedState {
    /// Schema version; `load_snapshot` uses it to decide how to interpret the file.
    schema: u32,
    /// Saved per-channel concurrency values.
    channels: ChannelStart,
}
1442
/// Schema version written by `save_snapshot` and accepted as-is by `load_snapshot`.
const PERSIST_SCHEMA: u32 = 2;
/// Older schema version: quote/store are kept on load, fetch is reset to its
/// cold-start value.
const PERSIST_SCHEMA_AIMD_FETCH: u32 = 1;
/// File name of the snapshot inside the data directory.
const PERSIST_FILENAME: &str = "client_adaptive.json";
1446
1447#[must_use]
1451pub fn default_persist_path() -> Option<PathBuf> {
1452 crate::config::data_dir()
1453 .ok()
1454 .map(|d| d.join(PERSIST_FILENAME))
1455}
1456
1457#[must_use]
1463pub fn load_snapshot(path: &Path) -> Option<ChannelStart> {
1464 let bytes = std::fs::read(path).ok()?;
1465 let state: PersistedState = match serde_json::from_slice(&bytes) {
1466 Ok(s) => s,
1467 Err(e) => {
1468 warn!(path = %path.display(), error = %e, "adaptive: corrupt snapshot, ignoring");
1469 return None;
1470 }
1471 };
1472 match state.schema {
1473 PERSIST_SCHEMA => Some(state.channels),
1474 PERSIST_SCHEMA_AIMD_FETCH => {
1475 debug!(
1476 path = %path.display(),
1477 "adaptive: migrating schema-1 snapshot, preserving quote/store and resetting fetch",
1478 );
1479 Some(ChannelStart {
1480 fetch: FETCH_COLD_START_CONCURRENCY,
1481 ..state.channels
1482 })
1483 }
1484 schema => {
1485 debug!(
1486 path = %path.display(),
1487 schema,
1488 expected = PERSIST_SCHEMA,
1489 "adaptive: snapshot schema mismatch, ignoring",
1490 );
1491 None
1492 }
1493 }
1494}
1495
1496pub fn save_snapshot(path: &Path, channels: ChannelStart) {
1499 let state = PersistedState {
1500 schema: PERSIST_SCHEMA,
1501 channels,
1502 };
1503 let bytes = match serde_json::to_vec_pretty(&state) {
1504 Ok(b) => b,
1505 Err(e) => {
1506 warn!(error = %e, "adaptive: snapshot serialize failed");
1507 return;
1508 }
1509 };
1510 if let Some(parent) = path.parent() {
1511 if let Err(e) = std::fs::create_dir_all(parent) {
1512 warn!(path = %parent.display(), error = %e, "adaptive: snapshot mkdir failed");
1513 return;
1514 }
1515 }
1516 let nanos = std::time::SystemTime::now()
1523 .duration_since(std::time::UNIX_EPOCH)
1524 .map(|d| d.subsec_nanos())
1525 .unwrap_or(0);
1526 let counter = SAVE_COUNTER.fetch_add(1, Ordering::Relaxed);
1527 let tmp = path.with_extension(format!(
1528 "json.tmp.{}.{}.{}",
1529 std::process::id(),
1530 counter,
1531 nanos
1532 ));
1533 if let Err(e) = std::fs::write(&tmp, &bytes) {
1534 warn!(path = %tmp.display(), error = %e, "adaptive: snapshot write failed");
1535 return;
1536 }
1537 if let Err(e) = std::fs::rename(&tmp, path) {
1538 warn!(
1539 from = %tmp.display(),
1540 to = %path.display(),
1541 error = %e,
1542 "adaptive: snapshot rename failed",
1543 );
1544 let _ = std::fs::remove_file(&tmp);
1547 }
1548}
1549
1550pub fn save_snapshot_with_timeout(path: PathBuf, channels: ChannelStart, timeout: Duration) {
1560 let handle = std::thread::spawn(move || {
1561 save_snapshot(&path, channels);
1562 });
1563 let started = Instant::now();
1567 let poll = Duration::from_millis(5);
1568 while started.elapsed() < timeout {
1569 if handle.is_finished() {
1570 let _ = handle.join();
1571 return;
1572 }
1573 std::thread::sleep(poll);
1574 }
1575 warn!(
1579 timeout_ms = timeout.as_millis() as u64,
1580 "adaptive: snapshot save timed out (data dir slow?); detaching writer thread"
1581 );
1582 drop(handle);
1583}
1584
1585#[cfg(test)]
1586#[allow(clippy::unwrap_used)]
1587mod tests {
1588 use super::*;
1589
    // Fixtures for the throughput hill-climb (`fetch_hill_*`) tests.
    // Cap the hill-climb limiter is constructed with.
    const HILL_TEST_START_CAP: usize = 16;
    // Cap the tests expect once the first healthy epoch probes upward from the start cap.
    const HILL_TEST_UP_PROBE_CAP: usize = 20;
    // NOTE(review): presumably the cap of the next upward probe after an accepted one;
    // its use is outside this excerpt — confirm.
    const HILL_TEST_NEXT_UP_PROBE_CAP: usize = 25;
    // NOTE(review): presumably the cap of a downward probe from the start cap;
    // its use is outside this excerpt — confirm.
    const HILL_TEST_DOWN_PROBE_CAP: usize = 12;
    // Byte count reported with every successful sample fed to the limiter.
    const HILL_TEST_CHUNK_BYTES: u64 = 1_000;
    // Latency (ms) of samples in an ordinary healthy epoch.
    const HILL_TEST_BASE_LATENCY_MS: u64 = 100;
    // Latency (ms) of the slower epoch that makes an upward probe get rejected.
    const HILL_TEST_REJECT_LATENCY_MS: u64 = 130;
    // NOTE(review): presumably the latency (ms) at which a downward probe is kept;
    // its use is outside this excerpt — confirm.
    const HILL_TEST_RETAINED_DOWN_LATENCY_MS: u64 = 75;
    // NOTE(review): presumably the latency (ms) used by async hill-climb tests;
    // its use is outside this excerpt — confirm.
    const HILL_TEST_ASYNC_LATENCY_MS: u64 = 10;
1599
1600 fn cfg_for_tests() -> LimiterConfig {
1601 LimiterConfig {
1602 enabled: true,
1603 min_concurrency: 1,
1604 max_concurrency: 64,
1605 window_ops: 10,
1606 min_window_ops: 5,
1607 success_target: 0.9,
1608 timeout_ceiling: 0.2,
1609 latency_inflation_factor: 2.0,
1610 latency_ewma_alpha: 0.5,
1611 slow_start_ramp_threshold: 0,
1612 latency_decrease_enabled: true,
1613 }
1614 }
1615
1616 fn hill_cfg_for_tests() -> LimiterConfig {
1617 LimiterConfig {
1618 window_ops: 4,
1619 min_window_ops: 2,
1620 max_concurrency: 64,
1621 success_target: 0.9,
1622 timeout_ceiling: 0.2,
1623 ..cfg_for_tests()
1624 }
1625 }
1626
1627 fn fetch_hill_for_tests(start: usize, cfg: LimiterConfig) -> Limiter {
1628 Limiter::new_with_algorithm(start, cfg, LimiterAlgorithm::ThroughputHillClimb)
1629 }
1630
1631 fn observe_hill_success_epoch_with_latency(
1632 limiter: &Limiter,
1633 cfg: &LimiterConfig,
1634 bytes: u64,
1635 latency: Duration,
1636 ) {
1637 let samples = hill_epoch_target_samples(limiter.current(), cfg);
1638 for _ in 0..samples {
1639 limiter.observe_with_bytes(Outcome::Success, latency, bytes);
1640 }
1641 }
1642
1643 fn observe_hill_success_epoch(limiter: &Limiter, cfg: &LimiterConfig, bytes: u64) {
1644 observe_hill_success_epoch_with_latency(
1645 limiter,
1646 cfg,
1647 bytes,
1648 Duration::from_millis(HILL_TEST_BASE_LATENCY_MS),
1649 );
1650 }
1651
1652 fn adaptive_cfg_for_tests() -> AdaptiveConfig {
1657 let l = cfg_for_tests();
1658 AdaptiveConfig {
1659 enabled: l.enabled,
1660 min_concurrency: l.min_concurrency,
1661 max: ChannelMax {
1662 quote: l.max_concurrency,
1663 store: l.max_concurrency,
1664 fetch: l.max_concurrency,
1665 },
1666 window_ops: l.window_ops,
1667 min_window_ops: l.min_window_ops,
1668 success_target: l.success_target,
1669 timeout_ceiling: l.timeout_ceiling,
1670 latency_inflation_factor: l.latency_inflation_factor,
1671 latency_ewma_alpha: l.latency_ewma_alpha,
1672 }
1673 }
1674
1675 #[test]
1676 fn warm_start_keeps_slow_start_armed_below_protected_threshold() {
1677 let cfg = LimiterConfig {
1686 max_concurrency: 256,
1687 slow_start_ramp_threshold: 256,
1688 latency_decrease_enabled: false,
1689 ..cfg_for_tests()
1690 };
1691 let l = Limiter::new(64, cfg.clone());
1692 l.warm_start(20);
1693 assert_eq!(l.current(), 20);
1694 for _ in 0..cfg.window_ops {
1697 l.observe(Outcome::Success, Duration::from_millis(10));
1698 }
1699 assert_eq!(
1700 l.current(),
1701 40,
1702 "protected channel must double after warm_start, not crawl +1",
1703 );
1704
1705 let default_cfg = LimiterConfig {
1708 max_concurrency: 256,
1709 ..cfg_for_tests()
1710 };
1711 let d = Limiter::new(64, default_cfg.clone());
1712 d.warm_start(20);
1713 for _ in 0..default_cfg.window_ops {
1714 d.observe(Outcome::Success, Duration::from_millis(10));
1715 }
1716 assert_eq!(
1717 d.current(),
1718 21,
1719 "default channel must stay additive after warm_start",
1720 );
1721 }
1722
1723 #[test]
1724 fn slow_start_stays_armed_at_ceiling_with_max_threshold() {
1725 let base = LimiterConfig {
1734 max_concurrency: 256,
1735 latency_decrease_enabled: false,
1736 ..cfg_for_tests()
1737 };
1738 let fixed = Limiter::new(
1739 256,
1740 LimiterConfig {
1741 slow_start_ramp_threshold: usize::MAX,
1742 ..base.clone()
1743 },
1744 );
1745 let buggy = Limiter::new(
1746 256,
1747 LimiterConfig {
1748 slow_start_ramp_threshold: 256,
1749 ..base.clone()
1750 },
1751 );
1752 for l in [&fixed, &buggy] {
1753 for _ in 0..base.window_ops {
1754 l.observe(Outcome::Timeout, Duration::from_millis(10));
1755 }
1756 for _ in 0..(base.window_ops * 10) {
1757 l.observe(Outcome::Success, Duration::from_millis(10));
1758 }
1759 }
1760 assert!(
1761 fixed.current() > buggy.current(),
1762 "MAX-threshold limiter ({}) must out-recover the ceiling-threshold one ({})",
1763 fixed.current(),
1764 buggy.current(),
1765 );
1766 }
1767
1768 #[test]
1769 fn protected_slow_start_recovers_faster_than_additive() {
1770 let base = LimiterConfig {
1775 max_concurrency: 256,
1776 latency_decrease_enabled: false,
1777 ..cfg_for_tests()
1778 };
1779 let protected = Limiter::new(
1780 64,
1781 LimiterConfig {
1782 slow_start_ramp_threshold: 256,
1783 ..base.clone()
1784 },
1785 );
1786 let unprotected = Limiter::new(
1787 64,
1788 LimiterConfig {
1789 slow_start_ramp_threshold: 0,
1790 ..base.clone()
1791 },
1792 );
1793
1794 for l in [&protected, &unprotected] {
1796 for _ in 0..base.window_ops {
1797 l.observe(Outcome::Timeout, Duration::from_millis(10));
1798 }
1799 }
1800 for l in [&protected, &unprotected] {
1804 for _ in 0..(base.window_ops * 10) {
1805 l.observe(Outcome::Success, Duration::from_millis(10));
1806 }
1807 }
1808 assert!(
1809 protected.current() > unprotected.current(),
1810 "protected slow-start ({}) should recover faster than additive ({})",
1811 protected.current(),
1812 unprotected.current(),
1813 );
1814 }
1815
1816 #[test]
1817 fn latency_decrease_disabled_ignores_p95_inflation() {
1818 let cfg = LimiterConfig {
1824 max_concurrency: 256,
1825 slow_start_ramp_threshold: 256,
1826 latency_decrease_enabled: false,
1827 ..cfg_for_tests()
1828 };
1829 let l = Limiter::new(16, cfg.clone());
1830 for _ in 0..cfg.window_ops {
1832 l.observe(Outcome::Success, Duration::from_millis(5));
1833 }
1834 let after_baseline = l.current();
1835 for _ in 0..cfg.window_ops {
1839 l.observe(Outcome::Success, Duration::from_millis(500));
1840 }
1841 assert!(
1842 l.current() >= after_baseline,
1843 "latency inflation must not shrink the cap when the check is disabled: {} < {}",
1844 l.current(),
1845 after_baseline,
1846 );
1847 }
1848
1849 #[test]
1850 fn controller_sets_fetch_channel_download_tuning() {
1851 let c = AdaptiveController::new(ChannelStart::default(), AdaptiveConfig::default());
1855 assert!(
1856 !c.fetch.config.latency_decrease_enabled,
1857 "fetch latency-decrease must be disabled",
1858 );
1859 assert_eq!(
1860 c.fetch.config.slow_start_ramp_threshold,
1861 usize::MAX,
1862 "fetch slow-start must never exit (armed at every cap incl. ceiling)",
1863 );
1864 assert!(
1865 c.quote.config.latency_decrease_enabled,
1866 "quote must keep the latency-decrease check",
1867 );
1868 assert_eq!(
1869 c.quote.config.slow_start_ramp_threshold, 0,
1870 "quote must keep classic AIMD slow-start exit",
1871 );
1872 assert!(
1876 !c.store.config.latency_decrease_enabled,
1877 "store latency-decrease must be disabled (verification variance is not congestion)",
1878 );
1879 assert_eq!(
1880 c.store.config.slow_start_ramp_threshold,
1881 usize::MAX,
1882 "store slow-start must never exit so a transient Decrease re-doubles",
1883 );
1884 assert_eq!(
1887 c.store.current(),
1888 ChannelStart::default().store,
1889 "store cold-start floor must remain unchanged at 8",
1890 );
1891 }
1892
1893 #[test]
1894 fn store_channel_ramps_and_recovers_under_v2_468_tuning() {
1895 let mut adaptive = adaptive_cfg_for_tests();
1901 adaptive.max.store = 256;
1903 let c = AdaptiveController::new(
1904 ChannelStart {
1905 quote: 8,
1906 store: 8,
1907 fetch: 8,
1908 },
1909 adaptive,
1910 );
1911 let store = &c.store;
1912 let win = c.config().window_ops;
1913
1914 for _ in 0..win {
1917 store.observe(Outcome::Success, Duration::from_millis(5));
1918 }
1919 let after_baseline = store.current();
1920 assert!(after_baseline >= 8, "store should ramp on healthy windows");
1921 for _ in 0..win {
1922 store.observe(Outcome::Success, Duration::from_secs(30));
1923 }
1924 assert!(
1925 store.current() >= after_baseline,
1926 "verification-latency p95 must not shrink store cap: {} < {}",
1927 store.current(),
1928 after_baseline,
1929 );
1930
1931 let before_stress = store.current();
1933 for _ in 0..win {
1934 store.observe(Outcome::Timeout, Duration::from_millis(50));
1935 }
1936 let after_stress = store.current();
1937 assert!(
1938 after_stress < before_stress,
1939 "timeout-rate breach must still cut the store cap: {after_stress} !< {before_stress}",
1940 );
1941
1942 for _ in 0..(win * 8) {
1948 store.observe(Outcome::Success, Duration::from_millis(5));
1949 }
1950 assert!(
1951 store.current() >= before_stress,
1952 "store must re-double back to {before_stress} after a transient Decrease, got {}",
1953 store.current(),
1954 );
1955 }
1956
1957 #[test]
1958 fn store_application_rejections_do_not_move_cap() {
1959 let mut adaptive = adaptive_cfg_for_tests();
1963 adaptive.max.store = 256;
1964 let c = AdaptiveController::new(
1965 ChannelStart {
1966 quote: 8,
1967 store: 8,
1968 fetch: 8,
1969 },
1970 adaptive,
1971 );
1972 let store = &c.store;
1973 let start = store.current();
1974 for _ in 0..(c.config().window_ops * 5) {
1975 store.observe(Outcome::ApplicationError, Duration::from_secs(30));
1976 }
1977 assert_eq!(
1978 store.current(),
1979 start,
1980 "remote app-rejections must not move the store cap",
1981 );
1982 }
1983
1984 #[test]
1985 fn cold_start_clamps_into_bounds() {
1986 let cfg = cfg_for_tests();
1987 let l = Limiter::new(1000, cfg.clone());
1988 assert_eq!(l.current(), cfg.max_concurrency);
1989 let l = Limiter::new(0, cfg.clone());
1990 assert_eq!(l.current(), cfg.min_concurrency);
1991 }
1992
1993 #[test]
1994 fn slow_start_doubles_then_caps() {
1995 let cfg = cfg_for_tests();
1996 let l = Limiter::new(2, cfg.clone());
1997 for _ in 0..cfg.window_ops {
1999 l.observe(Outcome::Success, Duration::from_millis(50));
2000 }
2001 assert_eq!(l.current(), 4);
2002 for _ in 0..cfg.window_ops {
2003 l.observe(Outcome::Success, Duration::from_millis(50));
2004 }
2005 assert_eq!(l.current(), 8);
2006 }
2007
2008 #[test]
2009 fn first_failure_exits_slow_start() {
2010 let cfg = cfg_for_tests();
2011 let l = Limiter::new(4, cfg.clone());
2012 for _ in 0..6 {
2016 l.observe(Outcome::Success, Duration::from_millis(50));
2017 }
2018 for _ in 0..4 {
2019 l.observe(Outcome::Timeout, Duration::from_millis(50));
2020 }
2021 let after_stress = l.current();
2022 assert!(
2023 after_stress < 4,
2024 "stress should reduce concurrency from 4, got {after_stress}",
2025 );
2026 for _ in 0..(cfg.window_ops * 5) {
2034 l.observe(Outcome::Success, Duration::from_millis(50));
2035 }
2036 assert!(
2037 l.current() > after_stress,
2038 "expected recovery above {after_stress}, got {}",
2039 l.current(),
2040 );
2041 }
2042
2043 #[test]
2044 fn floor_holds_at_one() {
2045 let cfg = cfg_for_tests();
2046 let l = Limiter::new(2, cfg);
2047 for _ in 0..30 {
2048 l.observe(Outcome::Timeout, Duration::from_millis(50));
2049 }
2050 assert_eq!(l.current(), 1);
2051 }
2052
2053 #[test]
2054 fn application_errors_do_not_punish() {
2055 let cfg = cfg_for_tests();
2056 let l = Limiter::new(4, cfg.clone());
2057 for _ in 0..cfg.window_ops * 5 {
2064 l.observe(Outcome::ApplicationError, Duration::from_millis(50));
2065 }
2066 assert_eq!(
2067 l.current(),
2068 4,
2069 "ApplicationError must not move the cap; got {}",
2070 l.current()
2071 );
2072 }
2073
2074 #[test]
2075 fn latency_inflation_triggers_decrease() {
2076 let cfg = LimiterConfig {
2077 window_ops: 20,
2078 min_window_ops: 5,
2079 ..cfg_for_tests()
2080 };
2081 let l = Limiter::new(4, cfg.clone());
2082 for _ in 0..cfg.window_ops {
2084 l.observe(Outcome::Success, Duration::from_millis(50));
2085 }
2086 let after_baseline = l.current();
2087 for _ in 0..cfg.window_ops {
2089 l.observe(Outcome::Success, Duration::from_millis(500));
2090 }
2091 assert!(
2093 l.current() < after_baseline,
2094 "expected decrease from {after_baseline}, got {}",
2095 l.current(),
2096 );
2097 }
2098
2099 #[test]
2100 fn warm_start_overrides_current() {
2101 let cfg = cfg_for_tests();
2102 let l = Limiter::new(2, cfg);
2103 l.warm_start(20);
2104 assert_eq!(l.current(), 20);
2105 }
2106
2107 #[test]
2108 fn warm_start_clamps() {
2109 let cfg = cfg_for_tests();
2110 let l = Limiter::new(2, cfg.clone());
2111 l.warm_start(1_000_000);
2112 assert_eq!(l.current(), cfg.max_concurrency);
2113 }
2114
2115 #[test]
2116 fn disabled_controller_holds_steady() {
2117 let cfg = LimiterConfig {
2118 enabled: false,
2119 ..cfg_for_tests()
2120 };
2121 let l = Limiter::new(8, cfg);
2122 for _ in 0..50 {
2123 l.observe(Outcome::Timeout, Duration::from_millis(50));
2124 }
2125 assert_eq!(l.current(), 8);
2126 }
2127
2128 #[test]
2129 fn controller_snapshot_round_trips() {
2130 let c = AdaptiveController::new(
2136 ChannelStart {
2137 quote: 64,
2138 store: 16,
2139 fetch: 64,
2140 },
2141 adaptive_cfg_for_tests(),
2142 );
2143 let snap = c.snapshot();
2144 assert_eq!(snap.quote, 64);
2145 assert_eq!(snap.store, 16);
2146 assert_eq!(snap.fetch, 64);
2147
2148 let c2 = AdaptiveController::default();
2149 c2.warm_start(snap);
2150 assert_eq!(c2.quote.current(), 64);
2151 assert_eq!(c2.store.current(), 16);
2152 assert_eq!(c2.fetch.current(), 64);
2153 }
2154
2155 #[tokio::test]
2156 async fn observe_op_records_success() {
2157 let cfg = cfg_for_tests();
2158 let l = Limiter::new(4, cfg.clone());
2159 for _ in 0..cfg.window_ops {
2160 let _: Result<(), &str> =
2161 observe_op(&l, || async { Ok(()) }, |_e: &&str| Outcome::NetworkError).await;
2162 }
2163 assert_eq!(l.current(), 8);
2165 }
2166
2167 #[test]
2168 fn snapshot_round_trips_through_disk() {
2169 let dir = tempfile::tempdir().unwrap();
2170 let path = dir.path().join("client_adaptive.json");
2171 let snap = ChannelStart {
2172 quote: 24,
2173 store: 6,
2174 fetch: 12,
2175 };
2176 save_snapshot(&path, snap);
2177 let loaded = load_snapshot(&path).unwrap();
2178 assert_eq!(loaded.quote, 24);
2179 assert_eq!(loaded.store, 6);
2180 assert_eq!(loaded.fetch, 12);
2181 }
2182
2183 #[test]
2184 fn load_missing_returns_none() {
2185 let dir = tempfile::tempdir().unwrap();
2186 let path = dir.path().join("does_not_exist.json");
2187 assert!(load_snapshot(&path).is_none());
2188 }
2189
2190 #[test]
2191 fn load_corrupt_returns_none() {
2192 let dir = tempfile::tempdir().unwrap();
2193 let path = dir.path().join("bad.json");
2194 std::fs::write(&path, b"not valid json{{{").unwrap();
2195 assert!(load_snapshot(&path).is_none());
2196 }
2197
2198 #[test]
2199 fn load_wrong_schema_returns_none() {
2200 let dir = tempfile::tempdir().unwrap();
2201 let path = dir.path().join("future.json");
2202 let payload = r#"{"schema":999,"channels":{"quote":1,"store":1,"fetch":1}}"#;
2205 std::fs::write(&path, payload).unwrap();
2206 assert!(load_snapshot(&path).is_none());
2207 }
2208
2209 #[test]
2210 fn load_schema_one_preserves_quote_store_and_resets_fetch() {
2211 const LEGACY_QUOTE_CAP: usize = 48;
2212 const LEGACY_STORE_CAP: usize = 24;
2213 const LEGACY_FETCH_CAP: usize = 96;
2214
2215 let dir = tempfile::tempdir().unwrap();
2216 let path = dir.path().join("legacy.json");
2217 let payload = format!(
2218 r#"{{"schema":{},"channels":{{"quote":{},"store":{},"fetch":{}}}}}"#,
2219 PERSIST_SCHEMA_AIMD_FETCH, LEGACY_QUOTE_CAP, LEGACY_STORE_CAP, LEGACY_FETCH_CAP,
2220 );
2221 std::fs::write(&path, payload).unwrap();
2222
2223 let loaded = load_snapshot(&path).unwrap();
2224
2225 assert_eq!(loaded.quote, LEGACY_QUOTE_CAP);
2226 assert_eq!(loaded.store, LEGACY_STORE_CAP);
2227 assert_eq!(loaded.fetch, FETCH_COLD_START_CONCURRENCY);
2228 }
2229
2230 #[tokio::test]
2231 async fn observe_op_records_classified_error() {
2232 let cfg = cfg_for_tests();
2233 let l = Limiter::new(4, cfg.clone());
2234 for _ in 0..cfg.window_ops {
2235 let _: Result<(), &str> =
2236 observe_op(&l, || async { Err("boom") }, |_e: &&str| Outcome::Timeout).await;
2237 }
2238 assert!(l.current() < 4);
2239 }
2240
2241 #[test]
2251 fn no_regression_cold_start_at_least_static_defaults() {
2252 let s = ChannelStart::default();
2253 assert!(
2254 s.quote >= 32,
2255 "quote cold-start regressed: got {}, prior static was 32",
2256 s.quote,
2257 );
2258 assert!(
2259 s.store >= 8,
2260 "store cold-start regressed: got {}, prior static was 8",
2261 s.store,
2262 );
2263 assert_eq!(
2264 s.fetch, FETCH_COLD_START_CONCURRENCY,
2265 "fetch cold-start changed unexpectedly: got {}, expected {}",
2266 s.fetch, FETCH_COLD_START_CONCURRENCY,
2267 );
2268 }
2269
2270 #[test]
2274 fn controller_default_config_is_sane() {
2275 let c = AdaptiveController::default();
2276 let starts = ChannelStart::default();
2277 assert_eq!(c.quote.current(), starts.quote);
2278 assert_eq!(c.store.current(), starts.store);
2279 assert_eq!(c.fetch.current(), starts.fetch);
2280 assert_eq!(lock(&c.quote.inner).window.len(), 0);
2282 assert_eq!(lock(&c.store.inner).window.len(), 0);
2283 assert_eq!(lock(&c.fetch.inner).window.len(), 0);
2284 }
2285
2286 #[test]
2290 fn alternating_success_failure_collapses_to_floor() {
2291 let cfg = cfg_for_tests();
2297 let l = Limiter::new(8, cfg.clone());
2298 let mut min_observed = usize::MAX;
2299 let mut max_observed = 0usize;
2300 let mut floor_visits = 0usize;
2301 for i in 0..1000 {
2302 let outcome = if i % 2 == 0 {
2303 Outcome::Success
2304 } else {
2305 Outcome::Timeout
2306 };
2307 l.observe(outcome, Duration::from_millis(50));
2308 let cur = l.current();
2309 assert!(
2310 cur >= cfg.min_concurrency,
2311 "cap underflowed floor at iter {i}: got {cur}",
2312 );
2313 min_observed = min_observed.min(cur);
2314 max_observed = max_observed.max(cur);
2315 if cur == cfg.min_concurrency {
2316 floor_visits += 1;
2317 }
2318 }
2319 assert_eq!(
2320 min_observed, cfg.min_concurrency,
2321 "cap never reached the floor under 50% timeout rate"
2322 );
2323 assert!(
2324 max_observed >= 8,
2325 "cap never visited the start value: max_observed={max_observed}"
2326 );
2327 assert!(
2331 floor_visits > 500,
2332 "cap spent only {floor_visits}/1000 ticks at floor; expected mostly at floor"
2333 );
2334 assert_eq!(
2335 l.current(),
2336 cfg.min_concurrency,
2337 "controller did not settle at floor after 1000 alternations"
2338 );
2339 }
2340
2341 #[test]
2345 fn pure_success_stream_recovers_to_max() {
2346 let cfg = cfg_for_tests();
2347 let l = Limiter::new(cfg.min_concurrency, cfg.clone());
2348 for _ in 0..10_000 {
2349 l.observe(Outcome::Success, Duration::from_millis(5));
2350 }
2351 assert_eq!(
2352 l.current(),
2353 cfg.max_concurrency,
2354 "expected recovery to max ({}), got {}",
2355 cfg.max_concurrency,
2356 l.current(),
2357 );
2358 }
2359
2360 #[test]
2364 fn stress_then_heal_drives_floor_then_recovery() {
2365 let cfg = cfg_for_tests();
2366 let l = Limiter::new(8, cfg.clone());
2367 for _ in 0..100 {
2368 l.observe(Outcome::Timeout, Duration::from_millis(50));
2369 }
2370 let after_stress = l.current();
2371 assert_eq!(
2372 after_stress, cfg.min_concurrency,
2373 "stress should drive cap to floor, got {after_stress}",
2374 );
2375 for _ in 0..1_000 {
2376 l.observe(Outcome::Success, Duration::from_millis(10));
2377 }
2378 let after_heal = l.current();
2379 assert!(
2380 after_heal >= cfg.min_concurrency.saturating_add(4),
2381 "expected substantial recovery from floor, got {after_heal}",
2382 );
2383 }
2384
2385 #[test]
2389 fn baseline_does_not_grow_unbounded_under_slow_links() {
2390 let cfg = cfg_for_tests();
2391 let l = Limiter::new(2, cfg.clone());
2392 for _ in 0..(cfg.window_ops * 10) {
2393 l.observe(Outcome::Success, Duration::from_millis(500));
2394 }
2395 let baseline = lock(&l.inner).latency_baseline;
2396 let base = baseline.expect("baseline should be set after many healthy windows");
2397 assert!(
2398 base > Duration::ZERO,
2399 "baseline must not stay at ZERO, got {base:?}",
2400 );
2401 let lo = Duration::from_millis(250);
2403 let hi = Duration::from_millis(1000);
2404 assert!(
2405 base >= lo && base <= hi,
2406 "baseline drifted out of [{lo:?}, {hi:?}]: {base:?}",
2407 );
2408 }
2409
2410 #[test]
2415 fn baseline_initialized_only_after_first_healthy_window() {
2416 let cfg = cfg_for_tests();
2417 let l = Limiter::new(8, cfg.clone());
2418 for _ in 0..50 {
2419 l.observe(Outcome::Timeout, Duration::from_millis(50));
2420 }
2421 assert!(
2423 lock(&l.inner).latency_baseline.is_none(),
2424 "baseline must be None before any healthy window",
2425 );
2426 for _ in 0..(cfg.window_ops * 5) {
2428 l.observe(Outcome::Success, Duration::from_millis(20));
2429 }
2430 let baseline = lock(&l.inner).latency_baseline;
2431 assert!(
2432 baseline.is_some(),
2433 "baseline must be Some after healthy windows",
2434 );
2435 let base = baseline.unwrap_or_default();
2436 assert!(
2437 base > Duration::ZERO,
2438 "baseline must reflect real latency, got {base:?}",
2439 );
2440 }
2441
2442 #[test]
2445 fn min_concurrency_floor_holds_under_torrent_of_errors() {
2446 let cfg = cfg_for_tests();
2447 let l = Limiter::new(8, cfg.clone());
2448 for i in 0..50_000 {
2449 l.observe(Outcome::Timeout, Duration::from_millis(50));
2450 if i == 100 || i == 1_000 || i == 49_999 {
2451 let cur = l.current();
2452 assert_eq!(
2453 cur, cfg.min_concurrency,
2454 "floor breached at iter {i}: got {cur}",
2455 );
2456 }
2457 }
2458 }
2459
2460 #[test]
2462 fn max_concurrency_ceiling_holds_under_torrent_of_successes() {
2463 let cfg = cfg_for_tests();
2464 let start = cfg
2465 .max_concurrency
2466 .saturating_sub(1)
2467 .max(cfg.min_concurrency);
2468 let l = Limiter::new(start, cfg.clone());
2469 for i in 0..50_000 {
2470 l.observe(Outcome::Success, Duration::from_millis(5));
2471 if i == 100 || i == 1_000 || i == 49_999 {
2472 let cur = l.current();
2473 assert!(
2474 cur <= cfg.max_concurrency,
2475 "ceiling breached at iter {i}: got {cur} > {}",
2476 cfg.max_concurrency,
2477 );
2478 }
2479 }
2480 assert_eq!(l.current(), cfg.max_concurrency);
2481 }
2482
2483 #[test]
2489 fn saturating_arithmetic_handles_extreme_config() {
2490 let cfg = LimiterConfig {
2491 max_concurrency: usize::MAX / 2,
2492 ..cfg_for_tests()
2493 };
2494 let start = usize::MAX / 4;
2495 let l = Limiter::new(start, cfg.clone());
2496 for _ in 0..(cfg.window_ops * 10) {
2497 l.observe(Outcome::Success, Duration::from_millis(1));
2498 }
2499 assert_eq!(
2504 l.current(),
2505 cfg.max_concurrency,
2506 "saturating math survived but cap did not grow to ceiling"
2507 );
2508 }
2509
2510 #[test]
2517 fn window_eviction_is_fifo() {
2518 let cfg = LimiterConfig {
2519 window_ops: 10,
2520 min_window_ops: 5,
2521 success_target: 0.9,
2522 timeout_ceiling: 0.1,
2523 ..cfg_for_tests()
2524 };
2525 let l = Limiter::new(8, cfg.clone());
2526 for _ in 0..cfg.window_ops {
2531 l.observe(Outcome::Timeout, Duration::from_millis(50));
2532 }
2533 let after_stress = l.current();
2534 assert!(
2535 after_stress < 8,
2536 "expected cap to drop from 8 after pure-timeout window, got {after_stress}"
2537 );
2538 for _ in 0..(cfg.window_ops * 3) {
2543 l.observe(Outcome::Success, Duration::from_millis(20));
2544 }
2545 let after_recovery = l.current();
2546 assert!(
2549 after_recovery > after_stress,
2550 "FIFO eviction broken: cap stayed at {after_stress} after recovery successes (expected > {after_stress}, got {after_recovery})"
2551 );
2552 }
2553
2554 #[test]
2557 fn disabled_controller_returns_initial_value_invariantly() {
2558 let cfg = LimiterConfig {
2559 enabled: false,
2560 ..cfg_for_tests()
2561 };
2562 let initial = 8;
2563 let l = Limiter::new(initial, cfg);
2564 for i in 0..1_000 {
2565 let outcome = match i % 4 {
2566 0 => Outcome::Success,
2567 1 => Outcome::Timeout,
2568 2 => Outcome::NetworkError,
2569 _ => Outcome::ApplicationError,
2570 };
2571 l.observe(outcome, Duration::from_millis(50));
2572 assert_eq!(
2573 l.current(),
2574 initial,
2575 "disabled controller moved at iter {i}",
2576 );
2577 }
2578 }
2579
2580 #[tokio::test(flavor = "multi_thread", worker_threads = 4)]
2583 async fn concurrent_observations_do_not_corrupt_window() {
2584 let cfg = cfg_for_tests();
2585 let l = Limiter::new(4, cfg.clone());
2586 let mut handles = Vec::with_capacity(100);
2587 for _ in 0..100 {
2588 let l_clone = l.clone();
2589 handles.push(tokio::spawn(async move {
2590 for _ in 0..100 {
2591 l_clone.observe(Outcome::Success, Duration::from_millis(5));
2592 }
2593 }));
2594 }
2595 for h in handles {
2596 h.await.unwrap();
2597 }
2598 let cur = l.current();
2599 assert!(
2600 cur >= cfg.min_concurrency && cur <= cfg.max_concurrency,
2601 "cap out of bounds after concurrent observations: {cur}",
2602 );
2603 }
2604
2605 #[test]
2610 fn persisted_snapshot_warm_starts_above_cold_floor() {
2611 let dir = tempfile::tempdir().unwrap();
2612 let path = dir.path().join("client_adaptive.json");
2613 let saved = ChannelStart {
2616 quote: 64,
2617 store: 32,
2618 fetch: 128,
2619 };
2620 save_snapshot(&path, saved);
2621 let loaded = load_snapshot(&path).unwrap();
2622
2623 let low = ChannelStart {
2626 quote: 2,
2627 store: 2,
2628 fetch: 2,
2629 };
2630 let c = AdaptiveController::new(low, AdaptiveConfig::default());
2631 c.warm_start(loaded);
2632 assert_eq!(c.quote.current(), 64);
2633 assert_eq!(c.store.current(), 32);
2634 assert_eq!(c.fetch.current(), 128);
2635 }
2636
2637 #[test]
2641 fn save_load_round_trip_with_concurrent_writes() {
2642 use std::thread;
2643 let dir = tempfile::tempdir().unwrap();
2644 let path = dir.path().join("client_adaptive.json");
2645 let path_a = path.clone();
2646 let path_b = path.clone();
2647 let snap_a = ChannelStart {
2648 quote: 10,
2649 store: 10,
2650 fetch: 10,
2651 };
2652 let snap_b = ChannelStart {
2653 quote: 99,
2654 store: 99,
2655 fetch: 99,
2656 };
2657 let h_a = thread::spawn(move || {
2658 for _ in 0..50 {
2659 save_snapshot(&path_a, snap_a);
2660 }
2661 });
2662 let h_b = thread::spawn(move || {
2663 for _ in 0..50 {
2664 save_snapshot(&path_b, snap_b);
2665 }
2666 });
2667 h_a.join().unwrap();
2668 h_b.join().unwrap();
2669 let loaded = load_snapshot(&path).expect("file must be a valid snapshot, not torn");
2670 let valid = (loaded.quote == snap_a.quote
2671 && loaded.store == snap_a.store
2672 && loaded.fetch == snap_a.fetch)
2673 || (loaded.quote == snap_b.quote
2674 && loaded.store == snap_b.store
2675 && loaded.fetch == snap_b.fetch);
2676 assert!(valid, "loaded snapshot is neither A nor B: {loaded:?}",);
2677 }
2678
2679 #[test]
2682 fn save_snapshot_to_unwritable_dir_does_not_panic() {
2683 let path = PathBuf::from("/nonexistent_root_dir_xyz_for_test/sub/dir/client_adaptive.json");
2687 let snap = ChannelStart {
2688 quote: 1,
2689 store: 1,
2690 fetch: 1,
2691 };
2692 save_snapshot(&path, snap);
2694 assert!(!path.exists());
2696 }
2697
2698 #[test]
2701 fn load_snapshot_from_truncated_file_returns_none() {
2702 let dir = tempfile::tempdir().unwrap();
2703 let path = dir.path().join("truncated.json");
2704 std::fs::write(&path, br#"{"schema":1,"channels":{"quote":"#).unwrap();
2705 assert!(load_snapshot(&path).is_none());
2706 }
2707
2708 #[test]
2712 fn controller_perf_overhead_is_bounded() {
2713 let cfg = cfg_for_tests();
2714 let l = Limiter::new(8, cfg);
2715 let started = Instant::now();
2716 for _ in 0..100_000 {
2717 let _ = l.current();
2718 l.observe(Outcome::Success, Duration::from_micros(1));
2719 }
2720 let elapsed = started.elapsed();
2721 assert!(
2724 elapsed < Duration::from_millis(500),
2725 "100k observe+current pairs took {elapsed:?}, expected <500ms",
2726 );
2727 }
2728
2729 #[test]
2737 fn nan_and_out_of_range_config_does_not_panic() {
2738 let cfg = AdaptiveConfig {
2739 enabled: true,
2740 min_concurrency: 0, max: ChannelMax {
2742 quote: 0, store: 0,
2744 fetch: 0,
2745 },
2746 window_ops: 10,
2747 min_window_ops: 50, success_target: f64::NAN,
2749 timeout_ceiling: f64::INFINITY,
2750 latency_inflation_factor: f64::NEG_INFINITY,
2751 latency_ewma_alpha: f64::NAN,
2752 };
2753 let c = AdaptiveController::new(ChannelStart::default(), cfg);
2754 let post = &c.config;
2758 assert_eq!(
2759 post.min_concurrency, 1,
2760 "sanitize did not raise min_concurrency from 0"
2761 );
2762 assert!(
2763 post.success_target.is_finite() && (0.0..=1.0).contains(&post.success_target),
2764 "sanitize did not clamp success_target from NaN: {}",
2765 post.success_target
2766 );
2767 assert!(
2768 post.timeout_ceiling.is_finite() && (0.0..=1.0).contains(&post.timeout_ceiling),
2769 "sanitize did not clamp timeout_ceiling from Inf: {}",
2770 post.timeout_ceiling
2771 );
2772 assert!(
2773 post.latency_inflation_factor.is_finite() && post.latency_inflation_factor > 0.0,
2774 "sanitize did not fix latency_inflation_factor from -Inf: {}",
2775 post.latency_inflation_factor
2776 );
2777 assert!(
2778 post.latency_ewma_alpha.is_finite() && (0.0..=1.0).contains(&post.latency_ewma_alpha),
2779 "sanitize did not fix latency_ewma_alpha from NaN: {}",
2780 post.latency_ewma_alpha
2781 );
2782 assert!(
2783 post.min_window_ops <= post.window_ops,
2784 "sanitize did not clamp min_window_ops <= window_ops: min={} window={}",
2785 post.min_window_ops,
2786 post.window_ops
2787 );
2788 assert!(
2789 post.max.quote >= post.min_concurrency,
2790 "max.quote below min_concurrency"
2791 );
2792 for _ in 0..200 {
2795 c.store
2796 .observe(Outcome::Success, Duration::from_secs(99_999));
2797 c.store.observe(Outcome::Timeout, Duration::ZERO);
2798 }
2799 let cur = c.store.current();
2800 assert!(cur >= 1, "cap below floor: {cur}");
2801 }
2802
2803 #[test]
2810 fn transient_burst_does_not_pile_drive_to_floor() {
2811 let cfg = LimiterConfig {
2812 window_ops: 32,
2813 min_window_ops: 8,
2814 success_target: 0.95,
2815 timeout_ceiling: 0.10,
2816 ..cfg_for_tests()
2817 };
2818 let l = Limiter::new(32, cfg);
2819 for _ in 0..8 {
2823 l.observe(Outcome::Timeout, Duration::from_millis(10));
2824 }
2825 let after_burst = l.current();
2828 assert!(
2829 after_burst >= 16,
2830 "transient burst pile-drove cap from 32 to {after_burst}; expected >= 16",
2831 );
2832 }
2833
2834 #[tokio::test]
2839 async fn transport_errors_classify_as_capacity_signal() {
2840 use crate::data::client::classify_error;
2841 use crate::data::error::Error;
2842 let make_cfg = || LimiterConfig {
2843 window_ops: 16,
2844 min_window_ops: 5,
2845 success_target: 0.5,
2846 timeout_ceiling: 0.5,
2847 ..cfg_for_tests()
2848 };
2849 type ErrFactory = Box<dyn Fn() -> Error>;
2851 let cases: Vec<(&str, ErrFactory)> = vec![
2852 ("Network", Box::new(|| Error::Network("net".to_string()))),
2853 (
2854 "InsufficientPeers",
2855 Box::new(|| Error::InsufficientPeers("ip".to_string())),
2856 ),
2857 ("Io", Box::new(|| Error::Io(std::io::Error::other("io")))),
2858 ("Protocol", Box::new(|| Error::Protocol("p".to_string()))),
2859 ("Storage", Box::new(|| Error::Storage("s".to_string()))),
2860 (
2861 "PartialUpload",
2862 Box::new(|| Error::PartialUpload {
2863 stored: vec![],
2864 stored_count: 0,
2865 failed: vec![],
2866 failed_count: 0,
2867 total_chunks: 0,
2868 spend: Box::new(crate::data::error::PartialUploadSpend {
2869 storage_cost_atto: "0".to_string(),
2870 gas_cost_wei: 0,
2871 }),
2872 reason: "r".to_string(),
2873 }),
2874 ),
2875 ];
2876 for (name, mk) in &cases {
2877 let l = Limiter::new(8, make_cfg());
2878 for _ in 0..16 {
2879 let _: std::result::Result<(), Error> =
2880 observe_op(&l, || async { Err(mk()) }, classify_error).await;
2881 }
2882 let cur = l.current();
2886 assert!(
2887 cur < 8,
2888 "{name} not classified as capacity signal: cap stayed at {cur}",
2889 );
2890 }
2891 }
2892
#[test]
/// Configures distinct ceilings (quote 4, store 8, fetch 1024), feeds each
/// channel 1000 fast successes, and checks quote and store sit at 4 and 8
/// while fetch ends above 8 and no higher than 1024.
fn per_channel_ceilings_are_independent() {
    let ceilings = ChannelMax {
        quote: 4,
        store: 8,
        fetch: 1024,
    };
    let starts = ChannelStart {
        quote: 4,
        store: 8,
        fetch: 64,
    };
    let c = AdaptiveController::new(
        starts,
        AdaptiveConfig {
            max: ceilings,
            ..AdaptiveConfig::default()
        },
    );

    let fast = Duration::from_micros(10);
    for _ in 0..1000 {
        c.quote.observe(Outcome::Success, fast);
        c.store.observe(Outcome::Success, fast);
        c.fetch.observe(Outcome::Success, fast);
    }

    assert_eq!(c.quote.current(), 4, "quote should cap at 4");
    assert_eq!(c.store.current(), 8, "store should cap at 8");

    let fetch_cap = c.fetch.current();
    assert!(
        (9..=1024).contains(&fetch_cap),
        "fetch did not use its independent ceiling; got {}",
        fetch_cap
    );
}
2932
#[test]
/// One healthy epoch must move the limiter to the upward probe cap; a second
/// epoch observed at `HILL_TEST_REJECT_LATENCY_MS` must bring both
/// `current()` and `snapshot()` back to the starting cap.
fn fetch_hill_rejects_upward_probe_without_goodput_gain() {
    let cfg = hill_cfg_for_tests();
    let hill = fetch_hill_for_tests(HILL_TEST_START_CAP, cfg.clone());
    let slow_latency = Duration::from_millis(HILL_TEST_REJECT_LATENCY_MS);

    // Epoch 1: baseline at the starting cap.
    observe_hill_success_epoch(&hill, &cfg, HILL_TEST_CHUNK_BYTES);
    assert_eq!(
        hill.current(),
        HILL_TEST_UP_PROBE_CAP,
        "first healthy epoch should probe upward"
    );

    // Epoch 2: the probe epoch, observed with the slower latency.
    observe_hill_success_epoch_with_latency(&hill, &cfg, HILL_TEST_CHUNK_BYTES, slow_latency);
    assert_eq!(
        hill.current(),
        HILL_TEST_START_CAP,
        "slower higher-cap wave should reject the upward probe"
    );
    assert_eq!(hill.snapshot(), HILL_TEST_START_CAP);
}
2958
#[test]
/// Two identical healthy epochs: after the first the limiter probes upward,
/// after the second `snapshot()` must equal the probed cap and `current()`
/// must already be at the next upward probe cap.
fn fetch_hill_accepts_upward_probe_with_goodput_gain() {
    let cfg = hill_cfg_for_tests();
    let hill = fetch_hill_for_tests(HILL_TEST_START_CAP, cfg.clone());
    let run_epoch = || {
        observe_hill_success_epoch(&hill, &cfg, HILL_TEST_CHUNK_BYTES);
    };

    run_epoch();
    assert_eq!(hill.current(), HILL_TEST_UP_PROBE_CAP);

    run_epoch();
    assert_eq!(
        hill.snapshot(),
        HILL_TEST_UP_PROBE_CAP,
        "same-size chunks at same latency should promote the higher cap"
    );
    assert_eq!(
        hill.current(),
        HILL_TEST_NEXT_UP_PROBE_CAP,
        "after accepting an upward probe, hill climber should probe higher"
    );
}
2979
#[test]
/// Rejects an upward probe first, then runs
/// `HILL_REJECT_COOLDOWN_EPOCHS + HILL_STABLE_PROBE_EPOCHS` healthy epochs and
/// expects a downward probe; an epoch at
/// `HILL_TEST_RETAINED_DOWN_LATENCY_MS` must then make the lower cap the
/// snapshot value.
fn fetch_hill_accepts_lower_probe_when_goodput_is_retained() {
    let cfg = hill_cfg_for_tests();
    let hill = fetch_hill_for_tests(HILL_TEST_START_CAP, cfg.clone());
    let reject_latency = Duration::from_millis(HILL_TEST_REJECT_LATENCY_MS);
    let retained_latency = Duration::from_millis(HILL_TEST_RETAINED_DOWN_LATENCY_MS);

    // Probe upward, then reject that probe with a slower epoch.
    observe_hill_success_epoch(&hill, &cfg, HILL_TEST_CHUNK_BYTES);
    observe_hill_success_epoch_with_latency(&hill, &cfg, HILL_TEST_CHUNK_BYTES, reject_latency);
    assert_eq!(hill.current(), HILL_TEST_START_CAP);

    // Sit through the cooldown plus the stable-epoch requirement.
    let settle_epochs = HILL_REJECT_COOLDOWN_EPOCHS + HILL_STABLE_PROBE_EPOCHS;
    (0..settle_epochs).for_each(|_| {
        observe_hill_success_epoch(&hill, &cfg, HILL_TEST_CHUNK_BYTES);
    });
    assert_eq!(
        hill.current(),
        HILL_TEST_DOWN_PROBE_CAP,
        "stable best should eventually probe a lower cap"
    );

    // The downward probe epoch itself.
    observe_hill_success_epoch_with_latency(&hill, &cfg, HILL_TEST_CHUNK_BYTES, retained_latency);
    assert_eq!(
        hill.snapshot(),
        HILL_TEST_DOWN_PROBE_CAP,
        "retained goodput at lower concurrency should become the new best"
    );
}
3015
3016 #[tokio::test]
3017 async fn fetch_hill_records_constant_size_timed_ops_without_stress() {
3018 let cfg = hill_cfg_for_tests();
3019 let l = fetch_hill_for_tests(HILL_TEST_START_CAP, cfg.clone());
3020 let total_ops = hill_epoch_target_samples(HILL_TEST_START_CAP, &cfg)
3021 + hill_epoch_target_samples(HILL_TEST_UP_PROBE_CAP, &cfg);
3022 let limiter_for_ops = l.clone();
3023
3024 let result: std::result::Result<Vec<()>, ()> =
3025 rebucketed_unordered(&l, 0..total_ops, move |_| {
3026 let limiter = limiter_for_ops.clone();
3027 async move {
3028 observe_op_with_success_bytes(
3029 &limiter,
3030 || async {
3031 tokio::time::sleep(Duration::from_millis(HILL_TEST_ASYNC_LATENCY_MS))
3032 .await;
3033 Ok::<(), ()>(())
3034 },
3035 |_| Outcome::NetworkError,
3036 |_| HILL_TEST_CHUNK_BYTES,
3037 )
3038 .await
3039 }
3040 })
3041 .await;
3042 result.unwrap();
3043
3044 let snapshot = l.snapshot();
3049 assert!(
3050 matches!(snapshot, HILL_TEST_START_CAP | HILL_TEST_UP_PROBE_CAP),
3051 "timed successes should finish at the existing or accepted best cap, got {snapshot}"
3052 );
3053 let current = l.current();
3054 assert!(
3055 matches!(current, HILL_TEST_START_CAP | HILL_TEST_NEXT_UP_PROBE_CAP),
3056 "timed successes should leave the controller unstressed, got {current}"
3057 );
3058 }
3059
#[test]
/// With `min_window_ops` set to 4, four timeouts against a hill limiter
/// started at 16 must bring `current()` down to 8.
fn fetch_hill_stress_cuts_before_full_epoch() {
    let cfg = LimiterConfig {
        window_ops: 8,
        min_window_ops: 4,
        ..hill_cfg_for_tests()
    };
    let hill = fetch_hill_for_tests(16, cfg.clone());

    let timeout_latency = Duration::from_millis(10);
    (0..cfg.min_window_ops).for_each(|_| {
        hill.observe(Outcome::Timeout, timeout_latency);
    });

    assert_eq!(
        hill.current(),
        8,
        "fetch hill climber should halve on early stress"
    );
}
3079
#[test]
/// Pins the default cold-start values: quote at least 32, store at least 8,
/// and fetch exactly `FETCH_COLD_START_CONCURRENCY`.
fn cold_start_at_least_prior_static_defaults() {
    let ChannelStart {
        quote,
        store,
        fetch,
    } = ChannelStart::default();

    assert!(quote >= 32, "quote cold-start regressed: {}", quote);
    assert!(store >= 8, "store cold-start regressed: {}", store);
    assert_eq!(
        fetch, FETCH_COLD_START_CONCURRENCY,
        "fetch cold-start changed unexpectedly"
    );
}
3093
#[test]
/// Feeds timeouts one at a time into a limiter started at 64 until the cap
/// reaches 1 or 200 observations have been made, then requires the cap to
/// be exactly 1.
fn sustained_stress_reaches_floor_within_bounded_ops() {
    let limiter = Limiter::new(
        64,
        LimiterConfig {
            window_ops: 32,
            min_window_ops: 8,
            success_target: 0.95,
            timeout_ceiling: 0.10,
            max_concurrency: 64,
            ..cfg_for_tests()
        },
    );

    let timeout_latency = Duration::from_millis(10);
    let mut ops = 0usize;
    loop {
        // Stop as soon as the floor is hit or the observation budget is spent.
        if limiter.current() <= 1 || ops >= 200 {
            break;
        }
        limiter.observe(Outcome::Timeout, timeout_latency);
        ops += 1;
    }

    assert_eq!(
        limiter.current(),
        1,
        "controller did not reach floor within 200 observations under \
         sustained timeout stress; took {ops} ops, ended at cap {}",
        limiter.current()
    );
}
3130
#[test]
/// A default controller must start each channel at the default
/// `ChannelStart` value, and each default `ChannelMax` value must be strictly
/// greater than the matching start value.
fn default_controller_has_growth_headroom() {
    let controller = AdaptiveController::default();
    let start = ChannelStart::default();
    let ceiling = ChannelMax::default();

    // Starting caps mirror the cold-start defaults.
    assert_eq!(controller.quote.current(), start.quote);
    assert_eq!(controller.store.current(), start.store);
    assert_eq!(controller.fetch.current(), start.fetch);

    // Every channel has room to grow.
    let (quote_max, quote_start) = (ceiling.quote, start.quote);
    assert!(
        quote_max > quote_start,
        "no growth headroom for quote: max={} start={}",
        quote_max,
        quote_start
    );
    let (store_max, store_start) = (ceiling.store, start.store);
    assert!(
        store_max > store_start,
        "no growth headroom for store: max={} start={}",
        store_max,
        store_start
    );
    let (fetch_max, fetch_start) = (ceiling.fetch, start.fetch);
    assert!(
        fetch_max > fetch_start,
        "no growth headroom for fetch: max={} start={}",
        fetch_max,
        fetch_start
    );
}
3162
#[test]
/// Warm-starting a default controller with all channels at 1 must leave each
/// channel at its default cold-start value.
fn warm_start_floors_at_cold_defaults() {
    let controller = AdaptiveController::default();
    let cold = ChannelStart::default();

    controller.warm_start(ChannelStart {
        quote: 1,
        store: 1,
        fetch: 1,
    });

    let quote_cap = controller.quote.current();
    assert_eq!(
        quote_cap, cold.quote,
        "quote warm_start did not floor at cold default"
    );
    let store_cap = controller.store.current();
    assert_eq!(
        store_cap, cold.store,
        "store warm_start did not floor at cold default"
    );
    let fetch_cap = controller.fetch.current();
    assert_eq!(
        fetch_cap, cold.fetch,
        "fetch warm_start did not floor at cold default"
    );
}
3198
#[test]
/// Warm-starting a default controller with multiples of the cold-start
/// defaults (2x quote, 4x store, 2x fetch) must apply those values verbatim.
fn warm_start_honors_values_above_cold_floor() {
    let controller = AdaptiveController::default();
    let cold = ChannelStart::default();
    let (want_quote, want_store, want_fetch) = (cold.quote * 2, cold.store * 4, cold.fetch * 2);

    controller.warm_start(ChannelStart {
        quote: want_quote,
        store: want_store,
        fetch: want_fetch,
    });

    assert_eq!(controller.quote.current(), want_quote);
    assert_eq!(controller.store.current(), want_store);
    assert_eq!(controller.fetch.current(), want_fetch);
}
3215
3216 #[tokio::test]
3223 async fn rebucketed_picks_up_cap_changes_mid_stream() {
3224 use std::sync::atomic::{AtomicUsize, Ordering as AtomicOrdering};
3225 use std::sync::Arc as StdArc;
3226 let cfg = LimiterConfig {
3227 min_concurrency: 1,
3228 max_concurrency: 32,
3229 ..cfg_for_tests()
3230 };
3231 let l = Limiter::new(4, cfg);
3232 let max_seen = StdArc::new(AtomicUsize::new(0));
3233 let in_flight = StdArc::new(AtomicUsize::new(0));
3234 let processed = StdArc::new(AtomicUsize::new(0));
3235 let l_for_bump = l.clone();
3236 let processed_for_bump = processed.clone();
3237 let bump_handle = tokio::spawn(async move {
3240 loop {
3241 tokio::time::sleep(Duration::from_millis(2)).await;
3242 if processed_for_bump.load(AtomicOrdering::Relaxed) >= 16 {
3243 l_for_bump.warm_start(16);
3244 return;
3245 }
3246 }
3247 });
3248 let _: Vec<()> = rebucketed(&l, 0..200usize, false, |_i| {
3249 let max_seen = max_seen.clone();
3250 let in_flight = in_flight.clone();
3251 let processed = processed.clone();
3252 async move {
3253 let cur = in_flight.fetch_add(1, AtomicOrdering::Relaxed) + 1;
3254 max_seen.fetch_max(cur, AtomicOrdering::Relaxed);
3255 tokio::time::sleep(Duration::from_millis(1)).await;
3256 in_flight.fetch_sub(1, AtomicOrdering::Relaxed);
3257 processed.fetch_add(1, AtomicOrdering::Relaxed);
3258 Ok::<(), &'static str>(())
3259 }
3260 })
3261 .await
3262 .unwrap();
3263 bump_handle.await.unwrap();
3264 let peak = max_seen.load(AtomicOrdering::Relaxed);
3268 assert!(
3269 peak > 4,
3270 "rebucketed did not pick up the mid-stream cap bump (peak in-flight = {peak})"
3271 );
3272 }
3273
3274 #[tokio::test]
3283 async fn observe_op_cancellation_drops_silently() {
3284 let cfg = LimiterConfig {
3285 window_ops: 16,
3286 min_window_ops: 4,
3287 ..cfg_for_tests()
3288 };
3289 let l = Limiter::new(4, cfg);
3290 let l_clone = l.clone();
3294 let fut = observe_op(
3295 &l_clone,
3296 || async {
3297 std::future::pending::<()>().await;
3298 Ok::<(), &'static str>(())
3299 },
3300 |_| Outcome::Timeout,
3301 );
3302 drop(fut);
3303 assert_eq!(l.current(), 4, "cancelled op moved the cap");
3305 for _ in 0..16 {
3310 let _: Result<(), &'static str> = observe_op(
3311 &l,
3312 || async { Ok(()) },
3313 |_| Outcome::NetworkError,
3315 )
3316 .await;
3317 }
3318 assert!(
3321 l.current() > 4,
3322 "cap did not grow after 16 successes; controller corrupted by cancellation? cap={}",
3323 l.current(),
3324 );
3325 }
3326
#[test]
/// Saves a snapshot into a temp directory, requires the file to exist as soon
/// as `save_snapshot` returns, and checks the values read back by
/// `load_snapshot`.
fn save_snapshot_is_synchronous_and_durable() {
    let scratch = tempfile::tempdir().unwrap();
    let path = scratch.path().join("client_adaptive.json");

    save_snapshot(
        &path,
        ChannelStart {
            quote: 100,
            store: 50,
            fetch: 200,
        },
    );
    assert!(
        path.exists(),
        "save_snapshot did not write file synchronously"
    );

    let reloaded = load_snapshot(&path).unwrap();
    assert_eq!(reloaded.quote, 100);
    assert_eq!(reloaded.store, 50);
    assert_eq!(reloaded.fetch, 200);
}
3354
3355 #[tokio::test]
3362 async fn warm_start_disables_slow_start_doubling() {
3363 let cfg = LimiterConfig {
3364 window_ops: 8,
3365 min_window_ops: 4,
3366 success_target: 0.9,
3367 ..cfg_for_tests()
3368 };
3369 let l = Limiter::new(2, cfg.clone());
3370 l.warm_start(16);
3373 assert_eq!(l.current(), 16);
3374 for _ in 0..cfg.window_ops {
3377 l.observe(Outcome::Success, Duration::from_millis(10));
3378 }
3379 assert_eq!(
3380 l.current(),
3381 17,
3382 "warm-start triggered slow-start doubling instead of additive +1"
3383 );
3384 }
3385
#[test]
/// A controller built with cold-start values (2, 1, 4) must keep those values
/// when warm-started with all ones, and must adopt 10 on every channel when
/// warm-started with all tens.
fn controller_warm_start_floors_at_per_instance_cold_start() {
    // Same value on every channel.
    let uniform = |n: usize| ChannelStart {
        quote: n,
        store: n,
        fetch: n,
    };
    let controller = AdaptiveController::new(
        ChannelStart {
            quote: 2,
            store: 1,
            fetch: 4,
        },
        AdaptiveConfig::default(),
    );

    controller.warm_start(uniform(1));
    assert_eq!(controller.quote.current(), 2);
    assert_eq!(controller.store.current(), 1);
    assert_eq!(controller.fetch.current(), 4);

    controller.warm_start(uniform(10));
    assert_eq!(controller.quote.current(), 10);
    assert_eq!(controller.store.current(), 10);
    assert_eq!(controller.fetch.current(), 10);
}
3417
#[test]
/// With `enabled: false`, warm-starting every channel to 100 must leave all
/// three caps at the construction value of 5.
fn warm_start_is_noop_when_adaptive_disabled() {
    let uniform = |n: usize| ChannelStart {
        quote: n,
        store: n,
        fetch: n,
    };
    let disabled_cfg = AdaptiveConfig {
        enabled: false,
        ..AdaptiveConfig::default()
    };
    let controller = AdaptiveController::new(uniform(5), disabled_cfg);

    controller.warm_start(uniform(100));

    assert_eq!(
        controller.quote.current(),
        5,
        "warm_start moved cap when disabled"
    );
    assert_eq!(
        controller.store.current(),
        5,
        "warm_start moved cap when disabled"
    );
    assert_eq!(
        controller.fetch.current(),
        5,
        "warm_start moved cap when disabled"
    );
}
3442
3443 #[tokio::test]
3447 async fn rebucketed_unordered_is_rolling_not_fenced() {
3448 use std::sync::atomic::{AtomicUsize, Ordering as AtomicOrdering};
3449 use std::sync::Arc as StdArc;
3450 let cfg = LimiterConfig {
3451 min_concurrency: 1,
3452 max_concurrency: 8,
3453 window_ops: 100,
3454 min_window_ops: 50,
3455 ..cfg_for_tests()
3456 };
3457 let l = Limiter::new(4, cfg);
3458 let in_flight = StdArc::new(AtomicUsize::new(0));
3459 let max_in_flight = StdArc::new(AtomicUsize::new(0));
3460 let started = StdArc::new(AtomicUsize::new(0));
3461 let _: Vec<()> = rebucketed_unordered(&l, 0..20usize, |i| {
3462 let in_flight = in_flight.clone();
3463 let max_in_flight = max_in_flight.clone();
3464 let started = started.clone();
3465 async move {
3466 let cur = in_flight.fetch_add(1, AtomicOrdering::Relaxed) + 1;
3467 max_in_flight.fetch_max(cur, AtomicOrdering::Relaxed);
3468 started.fetch_add(1, AtomicOrdering::Relaxed);
3469 if i == 0 {
3475 tokio::time::sleep(Duration::from_millis(50)).await;
3476 } else {
3477 tokio::time::sleep(Duration::from_millis(1)).await;
3478 }
3479 in_flight.fetch_sub(1, AtomicOrdering::Relaxed);
3480 Ok::<(), &'static str>(())
3481 }
3482 })
3483 .await
3484 .unwrap();
3485 assert_eq!(started.load(AtomicOrdering::Relaxed), 20);
3488 let peak = max_in_flight.load(AtomicOrdering::Relaxed);
3489 assert!(
3490 peak >= 4,
3491 "rolling scheduler did not fill cap; peak in-flight = {peak}"
3492 );
3493 }
3494
3495 #[tokio::test]
3497 async fn rebucketed_ordered_preserves_input_order() {
3498 let cfg = LimiterConfig {
3499 min_concurrency: 1,
3500 max_concurrency: 4,
3501 ..cfg_for_tests()
3502 };
3503 let l = Limiter::new(4, cfg);
3504 let items: Vec<usize> = (0..50).collect();
3505 let result: Vec<usize> = rebucketed_ordered(
3506 &l,
3507 items.iter().copied().enumerate(),
3508 |(idx, v)| async move {
3509 let delay = (50 - v) as u64;
3511 tokio::time::sleep(Duration::from_micros(delay)).await;
3512 Ok::<_, &'static str>((idx, v * 10))
3513 },
3514 )
3515 .await
3516 .unwrap();
3517 assert_eq!(result.len(), 50);
3518 for (i, v) in result.iter().enumerate() {
3519 assert_eq!(*v, i * 10, "out of order at index {i}: got {v}");
3520 }
3521 }
3522
3523 #[tokio::test]
3528 async fn rebucketed_ordered_pairs_idx_with_payload_correctly() {
3529 let cfg = LimiterConfig {
3530 min_concurrency: 1,
3531 max_concurrency: 8,
3532 ..cfg_for_tests()
3533 };
3534 let l = Limiter::new(8, cfg);
3535 let items: Vec<(usize, u64)> = (0..40).map(|i| (i, 1000u64 + i as u64)).collect();
3540 let result: Vec<u64> = rebucketed_ordered(&l, items, |(idx, hash)| async move {
3541 let delay = (40 - idx) as u64; tokio::time::sleep(Duration::from_micros(delay)).await;
3543 Ok::<_, &'static str>((idx, hash * 7))
3545 })
3546 .await
3547 .unwrap();
3548 for (i, v) in result.iter().enumerate() {
3549 let expected = (1000 + i as u64) * 7;
3550 assert_eq!(
3551 *v, expected,
3552 "idx {i} paired with wrong content: {v}, expected {expected}"
3553 );
3554 }
3555 }
3556
#[test]
/// Saves 100 snapshots (values 1 through 100) to the same path, checks the
/// last one is what loads back, and requires that no directory entry whose
/// name contains `.tmp.` is left behind.
fn save_snapshot_temp_file_is_unique_per_call() {
    let scratch = tempfile::tempdir().unwrap();
    let path = scratch.path().join("client_adaptive.json");

    for n in 1..=100 {
        save_snapshot(
            &path,
            ChannelStart {
                quote: n,
                store: n,
                fetch: n,
            },
        );
    }

    let reloaded = load_snapshot(&path).unwrap();
    assert_eq!(reloaded.quote, 100);
    assert_eq!(reloaded.store, 100);
    assert_eq!(reloaded.fetch, 100);

    // Unreadable directory entries are skipped rather than failing the test.
    let leaked_names: Vec<_> = std::fs::read_dir(scratch.path())
        .unwrap()
        .filter_map(Result::ok)
        .map(|entry| entry.file_name())
        .filter(|name| name.to_string_lossy().contains(".tmp."))
        .collect();
    assert!(
        leaked_names.is_empty(),
        "temp files leaked: {:?}",
        leaked_names
    );
}
3596
3597 #[tokio::test]
3602 async fn rebucketed_empty_input_returns_empty() {
3603 let cfg = cfg_for_tests();
3604 let l = Limiter::new(4, cfg);
3605 let v: Vec<usize> = rebucketed_unordered(&l, std::iter::empty::<usize>(), |_| async {
3606 Ok::<_, &'static str>(42usize)
3607 })
3608 .await
3609 .unwrap();
3610 assert!(v.is_empty());
3611 let v: Vec<usize> = rebucketed_ordered(
3612 &l,
3613 std::iter::empty::<(usize, ())>(),
3614 |(idx, _)| async move { Ok::<_, &'static str>((idx, 42usize)) },
3615 )
3616 .await
3617 .unwrap();
3618 assert!(v.is_empty());
3619 }
3620
3621 #[tokio::test]
3623 async fn rebucketed_exactly_cap_items() {
3624 let cfg = LimiterConfig {
3625 min_concurrency: 1,
3626 max_concurrency: 4,
3627 ..cfg_for_tests()
3628 };
3629 let l = Limiter::new(4, cfg);
3630 let v: Vec<usize> =
3631 rebucketed_unordered(
3632 &l,
3633 0..4usize,
3634 |i| async move { Ok::<_, &'static str>(i * 2) },
3635 )
3636 .await
3637 .unwrap();
3638 assert_eq!(v.len(), 4);
3639 }
3640
3641 #[tokio::test]
3644 async fn rebucketed_preserves_first_error() {
3645 use std::sync::atomic::{AtomicUsize, Ordering as AtomicOrdering};
3646 use std::sync::Arc as StdArc;
3647 let cfg = LimiterConfig {
3648 min_concurrency: 1,
3649 max_concurrency: 4,
3650 ..cfg_for_tests()
3651 };
3652 let l = Limiter::new(4, cfg);
3653 let started = StdArc::new(AtomicUsize::new(0));
3654 let started_clone = started.clone();
3655 let result: Result<Vec<()>, &'static str> = rebucketed_unordered(&l, 0..20usize, |i| {
3656 let started = started_clone.clone();
3657 async move {
3658 started.fetch_add(1, AtomicOrdering::Relaxed);
3659 if i == 5 {
3660 tokio::time::sleep(Duration::from_micros(100)).await;
3663 return Err("first error");
3664 }
3665 if i == 10 {
3666 return Err("second error - should be ignored");
3667 }
3668 tokio::time::sleep(Duration::from_micros(50)).await;
3669 Ok(())
3670 }
3671 })
3672 .await;
3673 match result {
3674 Err(e) => assert_eq!(e, "first error", "wrong error preserved"),
3675 Ok(_) => panic!("expected error, got ok"),
3676 }
3677 let total = started.load(AtomicOrdering::Relaxed);
3683 assert!(
3684 (5..20).contains(&total),
3685 "started count out of range: {total}"
3686 );
3687 }
3688
#[test]
/// With `min_concurrency == max_concurrency == 5`, neither 1000 successes nor
/// 1000 subsequent timeouts may move the cap away from 5.
fn limiter_with_min_equal_max_is_pinned() {
    let limiter = Limiter::new(
        5,
        LimiterConfig {
            min_concurrency: 5,
            max_concurrency: 5,
            ..cfg_for_tests()
        },
    );

    // Successes first, then timeouts; the cap is checked after each phase.
    let phases = [
        (Outcome::Success, Duration::from_millis(1)),
        (Outcome::Timeout, Duration::from_millis(50)),
    ];
    for (outcome, latency) in phases {
        for _ in 0..1000 {
            limiter.observe(outcome, latency);
        }
        assert_eq!(limiter.current(), 5, "cap moved despite min==max");
    }
}
3708
#[test]
/// `ewma` with `alpha = 0.0` must return the previous value exactly.
fn ewma_alpha_zero_returns_prev() {
    let prev = Duration::from_millis(100);
    assert_eq!(
        ewma(prev, Duration::from_millis(500), 0.0),
        prev,
        "alpha=0 must return prev unchanged"
    );
}
3718
#[test]
/// `ewma` with `alpha = 1.0` must return the sample, to within 1 ms.
fn ewma_alpha_one_returns_sample() {
    let sample = Duration::from_millis(500);
    let result = ewma(Duration::from_millis(100), sample, 1.0);

    let tolerance = Duration::from_millis(1);
    assert!(
        result.abs_diff(sample) <= tolerance,
        "alpha=1 should return sample; got {result:?}, expected ~{sample:?}"
    );
}
3733
#[test]
/// `ewma` of 200 ms and 400 ms with `alpha = 0.5` must land within 1 ms of
/// 300 ms.
fn ewma_alpha_half_returns_midpoint() {
    let expected = Duration::from_millis(300);
    let result = ewma(Duration::from_millis(200), Duration::from_millis(400), 0.5);

    let tolerance = Duration::from_millis(1);
    assert!(
        result.abs_diff(expected) <= tolerance,
        "alpha=0.5 midpoint: got {result:?}, expected ~{expected:?}"
    );
}
3747
#[test]
/// `ewma` must return the previous value unchanged when `alpha` is NaN,
/// positive infinity, or negative infinity.
fn ewma_nan_alpha_returns_prev() {
    let prev = Duration::from_millis(100);
    let sample = Duration::from_millis(500);

    for alpha in [f64::NAN, f64::INFINITY, f64::NEG_INFINITY] {
        let result = ewma(prev, sample, alpha);
        assert_eq!(result, prev);
    }
}
3762
#[test]
/// `ewma` with `alpha = 2.5` must produce a value between 499 ms and 501 ms
/// for a 500 ms sample, i.e. the same as the sample within 1 ms.
fn ewma_clamps_alpha_above_one() {
    let lower = Duration::from_millis(499);
    let upper = Duration::from_millis(501);

    let result = ewma(Duration::from_millis(100), Duration::from_millis(500), 2.5);

    assert!(result >= lower);
    assert!(result <= upper);
}
3774
#[test]
/// Five windows' worth of `ApplicationError` observations must leave a
/// limiter started at 8 still at 8.
fn window_full_of_application_errors_does_not_move_cap() {
    let cfg = cfg_for_tests();
    let limiter = Limiter::new(8, cfg.clone());

    let total_observations = cfg.window_ops * 5;
    let latency = Duration::from_millis(50);
    (0..total_observations).for_each(|_| {
        limiter.observe(Outcome::ApplicationError, latency);
    });

    assert_eq!(
        limiter.current(),
        8,
        "cap moved on pure-app-error window; should hold"
    );
}
3791
#[test]
/// With `enabled: false`, 10000 timeouts on every channel must leave all
/// three caps exactly where they started.
fn disabled_adaptive_controller_truly_inert() {
    let controller = AdaptiveController::new(
        ChannelStart::default(),
        AdaptiveConfig {
            enabled: false,
            ..AdaptiveConfig::default()
        },
    );
    let before = (
        controller.quote.current(),
        controller.store.current(),
        controller.fetch.current(),
    );

    let latency = Duration::from_millis(1);
    for _ in 0..10000 {
        controller.quote.observe(Outcome::Timeout, latency);
        controller.store.observe(Outcome::Timeout, latency);
        controller.fetch.observe(Outcome::Timeout, latency);
    }

    assert_eq!(controller.quote.current(), before.0);
    assert_eq!(controller.store.current(), before.1);
    assert_eq!(controller.fetch.current(), before.2);
}
3814
#[test]
/// Applies 1000 timeouts to the store channel only; store must end at
/// `config.min_concurrency` (and below its starting value) while quote and
/// fetch keep their starting caps.
fn channel_state_is_independent() {
    let controller = AdaptiveController::default();
    let quote_before = controller.quote.current();
    let fetch_before = controller.fetch.current();
    let store_before = controller.store.current();

    let latency = Duration::from_millis(1);
    (0..1000).for_each(|_| {
        controller.store.observe(Outcome::Timeout, latency);
    });

    assert_eq!(
        controller.store.current(),
        controller.config.min_concurrency,
        "store did not reach floor after 1000 timeouts; cap={}",
        controller.store.current()
    );
    assert!(
        controller.store.current() < store_before,
        "store cap did not move at all"
    );
    assert_eq!(
        controller.quote.current(),
        quote_before,
        "quote leaked from store stress"
    );
    assert_eq!(
        controller.fetch.current(),
        fetch_before,
        "fetch leaked from store stress"
    );
}
3840
#[test]
/// Builds a config with NaN, out-of-range and infinite float fields and with
/// `min_window_ops` larger than `window_ops`, then checks `sanitize` leaves
/// every field finite / in range and `min_window_ops <= window_ops`.
fn sanitize_corrects_pathological_floats() {
    let mut cfg = AdaptiveConfig {
        window_ops: 4,
        min_window_ops: 10,
        success_target: f64::NAN,
        timeout_ceiling: 5.0,
        latency_inflation_factor: f64::NEG_INFINITY,
        latency_ewma_alpha: 2.5,
        ..AdaptiveConfig::default()
    };

    cfg.sanitize();

    let unit_interval = 0.0..=1.0;

    assert!(cfg.success_target.is_finite());
    assert!(unit_interval.contains(&cfg.success_target));
    assert!(unit_interval.contains(&cfg.timeout_ceiling));

    assert!(cfg.latency_inflation_factor.is_finite());
    assert!(cfg.latency_inflation_factor > 0.0);

    assert!(unit_interval.contains(&cfg.latency_ewma_alpha));

    let (min_ops, ops) = (cfg.min_window_ops, cfg.window_ops);
    assert!(
        min_ops <= ops,
        "min_window_ops {} > window_ops {}",
        min_ops,
        ops
    );
}
3871
#[test]
/// Serializes a `ChannelMax` to JSON and back, and checks all three fields
/// survive unchanged.
fn channel_max_serde_round_trips() {
    let original = ChannelMax {
        quote: 7,
        store: 13,
        fetch: 200,
    };

    let encoded = serde_json::to_string(&original).unwrap();
    let decoded: ChannelMax = serde_json::from_str(&encoded).unwrap();

    assert_eq!(decoded.quote, 7);
    assert_eq!(decoded.store, 13);
    assert_eq!(decoded.fetch, 200);
}
3889
#[test]
/// Serializes a `ChannelStart` to JSON and back, and checks all three fields
/// survive unchanged.
fn channel_start_serde_round_trips() {
    let original = ChannelStart {
        quote: 11,
        store: 22,
        fetch: 33,
    };

    let encoded = serde_json::to_string(&original).unwrap();
    let decoded: ChannelStart = serde_json::from_str(&encoded).unwrap();

    assert_eq!(decoded.quote, 11);
    assert_eq!(decoded.store, 22);
    assert_eq!(decoded.fetch, 33);
}
3903
3904 #[tokio::test]
3909 async fn rebucketed_honors_cap_shrinkage_mid_stream() {
3910 use std::sync::atomic::{AtomicUsize, Ordering as AtomicOrdering};
3911 use std::sync::Arc as StdArc;
3912 let cfg = LimiterConfig {
3913 min_concurrency: 1,
3914 max_concurrency: 16,
3915 ..cfg_for_tests()
3916 };
3917 let l = Limiter::new(16, cfg);
3918 let in_flight = StdArc::new(AtomicUsize::new(0));
3919 let max_after_shrink = StdArc::new(AtomicUsize::new(0));
3920 let processed = StdArc::new(AtomicUsize::new(0));
3921 let shrunk = StdArc::new(std::sync::atomic::AtomicBool::new(false));
3922 let l_for_shrink = l.clone();
3923 let p_for_shrink = processed.clone();
3924 let shrunk_for_shrink = shrunk.clone();
3925 let shrink_handle = tokio::spawn(async move {
3926 loop {
3928 tokio::time::sleep(Duration::from_millis(2)).await;
3929 if p_for_shrink.load(AtomicOrdering::Relaxed) >= 50 {
3930 l_for_shrink.warm_start(2);
3931 shrunk_for_shrink.store(true, AtomicOrdering::Relaxed);
3932 return;
3933 }
3934 }
3935 });
3936 let _: Vec<()> = rebucketed_unordered(&l, 0..400usize, |_i| {
3937 let in_flight = in_flight.clone();
3938 let max_after_shrink = max_after_shrink.clone();
3939 let processed = processed.clone();
3940 let shrunk = shrunk.clone();
3941 async move {
3942 let cur = in_flight.fetch_add(1, AtomicOrdering::Relaxed) + 1;
3943 if shrunk.load(AtomicOrdering::Relaxed) {
3944 max_after_shrink.fetch_max(cur, AtomicOrdering::Relaxed);
3945 }
3946 tokio::time::sleep(Duration::from_millis(1)).await;
3947 in_flight.fetch_sub(1, AtomicOrdering::Relaxed);
3948 processed.fetch_add(1, AtomicOrdering::Relaxed);
3949 Ok::<(), &'static str>(())
3950 }
3951 })
3952 .await
3953 .unwrap();
3954 shrink_handle.await.unwrap();
3955 let peak = max_after_shrink.load(AtomicOrdering::Relaxed);
3956 assert!(
3961 peak <= 4,
3962 "rebucketed exceeded shrunk cap of 2: peak post-shrink in-flight = {peak}"
3963 );
3964 }
3965
#[test]
/// Two scenarios on limiters started at 8, each fed five
/// `ApplicationError`s first: followed by five successes the cap must stay at
/// 8 or above; followed by five timeouts the cap must drop below 8.
fn mixed_window_app_errors_with_capacity_signal() {
    let cfg = LimiterConfig {
        window_ops: 10,
        min_window_ops: 5,
        timeout_ceiling: 0.2,
        success_target: 0.9,
        ..cfg_for_tests()
    };
    // Feed five observations of one outcome at a fixed 50 ms latency.
    let feed_five = |limiter: &Limiter, outcome: Outcome| {
        for _ in 0..5 {
            limiter.observe(outcome, Duration::from_millis(50));
        }
    };

    // Scenario 1: application errors followed by successes.
    let healthy = Limiter::new(8, cfg.clone());
    feed_five(&healthy, Outcome::ApplicationError);
    feed_five(&healthy, Outcome::Success);
    assert!(
        healthy.current() >= 8,
        "AppErrors falsely depressed the success rate; cap dropped from 8 to {}",
        healthy.current()
    );

    // Scenario 2: application errors followed by timeouts.
    let stressed = Limiter::new(8, cfg);
    feed_five(&stressed, Outcome::ApplicationError);
    feed_five(&stressed, Outcome::Timeout);
    assert!(
        stressed.current() < 8,
        "all-timeouts (with AppError padding) did not decrease cap; got {}",
        stressed.current()
    );
}
4011
#[test]
/// Runs a writer thread that repeatedly saves snapshots whose three fields
/// are always equal, alongside a reader thread that loads the same path 2000
/// times, and fails if any loaded snapshot has mismatched fields.
///
/// Two safeguards beyond the torn-read count:
/// - The writer is always told to stop and is joined *before* the reader's
///   result is unwrapped, so a reader panic cannot leave the writer spinning.
/// - At least one load must have succeeded; otherwise the torn-read check
///   never ran and a pass would be meaningless.
fn concurrent_save_load_no_torn_reads() {
    use std::sync::atomic::{AtomicBool, Ordering as AtomicOrdering};
    use std::thread;
    let dir = tempfile::tempdir().unwrap();
    let path = dir.path().join("snap.json");
    // Seed the file so the reader has something to load from its first attempt.
    save_snapshot(
        &path,
        ChannelStart {
            quote: 1,
            store: 1,
            fetch: 1,
        },
    );
    let stop = std::sync::Arc::new(AtomicBool::new(false));
    let p_w = path.clone();
    let s_w = stop.clone();
    let writer = thread::spawn(move || {
        let mut i = 1usize;
        while !s_w.load(AtomicOrdering::Relaxed) {
            save_snapshot(
                &p_w,
                ChannelStart {
                    quote: i,
                    store: i,
                    fetch: i,
                },
            );
            // Never write 0, even after wrap-around.
            i = i.wrapping_add(1).max(1);
        }
    });
    let p_r = path.clone();
    let reader = thread::spawn(move || {
        let mut loaded = 0usize;
        let mut torn = 0usize;
        for _ in 0..2_000 {
            if let Some(snap) = load_snapshot(&p_r) {
                loaded += 1;
                if snap.quote != snap.store || snap.store != snap.fetch {
                    torn += 1;
                }
            }
        }
        (loaded, torn)
    });
    // Hold the reader's result (even a panic) until the writer has been stopped.
    let reader_outcome = reader.join();
    stop.store(true, AtomicOrdering::Relaxed);
    writer.join().unwrap();
    let (loaded, torn) = reader_outcome.unwrap();
    assert!(
        loaded > 0,
        "reader never loaded a snapshot; torn-read check was vacuous"
    );
    assert_eq!(
        torn, 0,
        "observed {torn} torn reads under concurrent writes"
    );
}
4071
#[test]
/// Calls `save_snapshot_with_timeout` with a 5 s timeout on a path under
/// `/nonexistent_root_xyz_test` and requires the call to return in under one
/// second.
fn save_with_timeout_returns_promptly_on_fast_failure() {
    // NOTE(review): presumably this root directory does not exist, so the save fails fast — confirm.
    let target = std::path::PathBuf::from("/nonexistent_root_xyz_test/snap.json");
    let generous_timeout = Duration::from_secs(5);

    let clock = Instant::now();
    save_snapshot_with_timeout(
        target,
        ChannelStart {
            quote: 1,
            store: 1,
            fetch: 1,
        },
        generous_timeout,
    );
    let elapsed = clock.elapsed();

    assert!(
        elapsed < Duration::from_secs(1),
        "save_snapshot_with_timeout took {elapsed:?} on fast-failing path"
    );
}
4097
#[test]
/// Calls `save_snapshot_with_timeout` with a 1 µs timeout on a path inside a
/// temp directory and requires the call to return in under 200 ms.
fn save_with_timeout_bounds_wall_time_on_hang() {
    let scratch = tempfile::tempdir().unwrap();
    let target = scratch.path().join("snap.json");
    let tiny_timeout = Duration::from_micros(1);

    let clock = Instant::now();
    save_snapshot_with_timeout(
        target,
        ChannelStart {
            quote: 1,
            store: 1,
            fetch: 1,
        },
        tiny_timeout,
    );
    let elapsed = clock.elapsed();

    assert!(
        elapsed < Duration::from_millis(200),
        "timeout wrapper did not bound wall time: {elapsed:?}"
    );
}
4132}