1use futures::stream::{self, FuturesUnordered, StreamExt};
51use serde::{Deserialize, Serialize};
52use std::collections::VecDeque;
53use std::path::{Path, PathBuf};
54use std::sync::atomic::{AtomicU64, Ordering};
55use std::sync::{Arc, Mutex, PoisonError};
56use std::time::{Duration, Instant};
57use tracing::{debug, warn};
58
/// Process-wide counter incremented once per `save_snapshot` call. Its value
/// is embedded in the temp-file name (presumably so that overlapping saves
/// from the same process do not pick the same temp path).
static SAVE_COUNTER: AtomicU64 = AtomicU64::new(0);

/// Starting concurrency for the fetch channel: used by
/// `ChannelStart::default` and when `load_snapshot` migrates a schema-1 file.
const FETCH_COLD_START_CONCURRENCY: usize = 4;

/// A hill-climb probe moves by `best_concurrency / HILL_PROBE_STEP_DIVISOR`.
const HILL_PROBE_STEP_DIVISOR: usize = 4;

/// Lower bound on the hill-climb probe step.
const HILL_MIN_PROBE_STEP: usize = 1;

/// An upward probe is accepted only if its goodput is at least the best
/// goodput multiplied by this ratio.
const HILL_UP_PROBE_ACCEPT_RATIO: f64 = 1.05;

/// A downward probe is accepted only if its goodput is at least the best
/// goodput multiplied by this ratio.
const HILL_DOWN_PROBE_ACCEPT_RATIO: f64 = 0.98;

/// Value `cooldown_epochs` is set to after a rejected probe or a stress
/// decrease.
const HILL_REJECT_COOLDOWN_EPOCHS: usize = 2;

/// Number of stable (non-probe, non-cooldown) epochs after which a new probe
/// is started.
const HILL_STABLE_PROBE_EPOCHS: usize = 3;

/// Divisor applied to the current concurrency when an epoch is stressed.
const HILL_STRESS_DECREASE_DIVISOR: usize = 2;

/// Multiplier on the current concurrency when computing how many samples an
/// epoch must collect (see `hill_epoch_target_samples`).
const HILL_EPOCH_FULL_WAVES: usize = 2;
95
/// Locks `m`, recovering the guard even if the mutex was poisoned by a
/// panicking holder.
fn lock<T>(m: &Mutex<T>) -> std::sync::MutexGuard<'_, T> {
    match m.lock() {
        Ok(guard) => guard,
        // A poisoned mutex still holds usable data; take the guard anyway.
        Err(poisoned) => poisoned.into_inner(),
    }
}
101
/// Classification of a finished operation, as reported to a `Limiter`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum Outcome {
    /// Counted as a success; its latency feeds the p95 calculations.
    Success,
    /// Counted toward the timeout rate and against the success rate.
    Timeout,
    /// Counted against the success rate.
    NetworkError,
    /// Excluded from the success/timeout rate calculations.
    ApplicationError,
}
116
/// Minimum concurrency enforced for the fetch channel by
/// `AdaptiveController::new`, regardless of the configured `min_concurrency`.
const FETCH_MIN_FLOOR: usize = 4;
127
/// Per-channel upper bounds on concurrency. `AdaptiveConfig::sanitize`
/// raises each bound to at least `min_concurrency`.
#[derive(Debug, Clone, Copy, Serialize, Deserialize)]
pub struct ChannelMax {
    /// Upper bound for the quote channel.
    pub quote: usize,
    /// Upper bound for the store channel.
    pub store: usize,
    /// Upper bound for the fetch channel.
    pub fetch: usize,
}
137
138impl Default for ChannelMax {
139 fn default() -> Self {
140 Self {
145 quote: 128,
146 store: 64,
147 fetch: 256,
148 }
149 }
150}
151
/// Settings shared by all channels of an `AdaptiveController`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct AdaptiveConfig {
    /// When false, limiters ignore observations and
    /// `AdaptiveController::warm_start` is a no-op.
    pub enabled: bool,
    /// Lower bound on concurrency (sanitized to at least 1).
    pub min_concurrency: usize,
    /// Per-channel upper bounds on concurrency.
    pub max: ChannelMax,
    /// Capacity of each limiter's sliding sample window.
    pub window_ops: usize,
    /// Minimum number of samples required before a window is evaluated.
    pub min_window_ops: usize,
    /// A success rate below this value triggers a decrease.
    pub success_target: f64,
    /// A timeout rate above this value triggers a decrease.
    pub timeout_ceiling: f64,
    /// Multiplier on the baseline latency; a p95 above the product counts as
    /// inflated.
    pub latency_inflation_factor: f64,
    /// Smoothing factor for the exponentially weighted moving averages.
    pub latency_ewma_alpha: f64,
}
185
186impl AdaptiveConfig {
187 pub fn sanitize(&mut self) {
193 if !self.latency_ewma_alpha.is_finite() {
194 self.latency_ewma_alpha = 0.2;
195 }
196 self.latency_ewma_alpha = self.latency_ewma_alpha.clamp(0.0, 1.0);
197 if !self.success_target.is_finite() {
198 self.success_target = 0.95;
199 }
200 self.success_target = self.success_target.clamp(0.0, 1.0);
201 if !self.timeout_ceiling.is_finite() {
202 self.timeout_ceiling = 0.10;
203 }
204 self.timeout_ceiling = self.timeout_ceiling.clamp(0.0, 1.0);
205 if !self.latency_inflation_factor.is_finite() || self.latency_inflation_factor <= 0.0 {
206 self.latency_inflation_factor = 4.0;
207 }
208 self.min_concurrency = self.min_concurrency.max(1);
209 self.window_ops = self.window_ops.max(1);
210 self.min_window_ops = self.min_window_ops.max(1).min(self.window_ops);
211 self.max.quote = self.max.quote.max(self.min_concurrency);
212 self.max.store = self.max.store.max(self.min_concurrency);
213 self.max.fetch = self.max.fetch.max(self.min_concurrency);
214 }
215}
216
217impl Default for AdaptiveConfig {
218 fn default() -> Self {
219 Self {
220 enabled: true,
221 min_concurrency: 1,
222 max: ChannelMax::default(),
223 window_ops: 32,
224 min_window_ops: 8,
225 success_target: 0.95,
226 timeout_ceiling: 0.10,
227 latency_inflation_factor: 4.0,
234 latency_ewma_alpha: 0.2,
235 }
236 }
237}
238
/// Per-channel starting concurrency. Also the shape returned by
/// `AdaptiveController::snapshot` and stored in persisted snapshots.
#[derive(Debug, Clone, Copy, Serialize, Deserialize)]
pub struct ChannelStart {
    /// Starting concurrency for the quote channel.
    pub quote: usize,
    /// Starting concurrency for the store channel.
    pub store: usize,
    /// Starting concurrency for the fetch channel.
    pub fetch: usize,
}
255
256impl Default for ChannelStart {
257 fn default() -> Self {
258 Self {
259 quote: 32,
260 store: 8,
261 fetch: FETCH_COLD_START_CONCURRENCY,
262 }
263 }
264}
265
/// One observed operation stored in a limiter's sliding window.
#[derive(Debug, Clone, Copy)]
struct Sample {
    /// How the operation ended.
    outcome: Outcome,
    /// How long the operation took.
    latency: Duration,
}
272
/// Strategy a `Limiter` uses to adjust its concurrency.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum LimiterAlgorithm {
    /// Window evaluation via `evaluate` / `apply_decision`.
    Aimd,
    /// Epoch-based goodput probing via `observe_hill_climb`.
    ThroughputHillClimb,
}
280
/// Direction of a hill-climb probe relative to the best known concurrency.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum ProbeDirection {
    /// Probe a higher concurrency.
    Up,
    /// Probe a lower concurrency.
    Down,
}
287
/// Mutable state of the throughput hill-climb algorithm: counters for the
/// epoch currently being measured plus the best operating point seen so far.
#[derive(Debug)]
struct HillClimbState {
    /// Earliest operation start time observed in the current epoch.
    epoch_started: Option<Instant>,
    /// Number of observations in the current epoch (all outcomes).
    epoch_samples: usize,
    /// Successful operations in the current epoch.
    epoch_successes: usize,
    /// Timed-out operations in the current epoch.
    epoch_timeouts: usize,
    /// Network-error operations in the current epoch.
    epoch_net_errors: usize,
    /// Bytes reported by successful operations in the current epoch.
    epoch_bytes: u64,
    /// Latencies of successful operations in the current epoch.
    epoch_latencies: Vec<Duration>,
    /// Reference goodput that probes are compared against; `None` until a
    /// first epoch completes and after a stress decrease.
    best_goodput_per_sec: Option<f64>,
    /// Reference p95 latency that probes are compared against.
    best_latency_p95: Option<Duration>,
    /// Concurrency regarded as best so far; probes step away from this value.
    best_concurrency: usize,
    /// Non-probe epochs counted toward `HILL_STABLE_PROBE_EPOCHS`.
    stable_epochs: usize,
    /// Remaining non-probe epochs to sit out before stable epochs are counted.
    cooldown_epochs: usize,
    /// Direction the next stability-triggered probe will take.
    next_probe: ProbeDirection,
    /// Direction of the probe currently being measured, if any.
    active_probe: Option<ProbeDirection>,
}
306
307impl HillClimbState {
308 fn new(start: usize, epoch_capacity: usize) -> Self {
309 Self {
310 epoch_started: None,
311 epoch_samples: 0,
312 epoch_successes: 0,
313 epoch_timeouts: 0,
314 epoch_net_errors: 0,
315 epoch_bytes: 0,
316 epoch_latencies: Vec::with_capacity(epoch_capacity),
317 best_goodput_per_sec: None,
318 best_latency_p95: None,
319 best_concurrency: start,
320 stable_epochs: 0,
321 cooldown_epochs: 0,
322 next_probe: ProbeDirection::Up,
323 active_probe: None,
324 }
325 }
326
327 fn reset_epoch(&mut self) {
328 self.epoch_started = None;
329 self.epoch_samples = 0;
330 self.epoch_successes = 0;
331 self.epoch_timeouts = 0;
332 self.epoch_net_errors = 0;
333 self.epoch_bytes = 0;
334 self.epoch_latencies.clear();
335 }
336
337 fn capacity_total(&self) -> usize {
338 self.epoch_successes + self.epoch_timeouts + self.epoch_net_errors
339 }
340}
341
/// Settings for a single `Limiter`.
#[derive(Debug, Clone)]
pub struct LimiterConfig {
    /// When false, the limiter ignores all observations.
    pub enabled: bool,
    /// Lower bound on concurrency (sanitized to at least 1).
    pub min_concurrency: usize,
    /// Upper bound on concurrency (sanitized to at least `min_concurrency`).
    pub max_concurrency: usize,
    /// Capacity of the sliding sample window.
    pub window_ops: usize,
    /// Minimum number of samples required before the window is evaluated.
    pub min_window_ops: usize,
    /// A success rate below this value triggers a decrease.
    pub success_target: f64,
    /// A timeout rate above this value triggers a decrease.
    pub timeout_ceiling: f64,
    /// Multiplier on the baseline latency; a p95 above the product counts as
    /// inflated.
    pub latency_inflation_factor: f64,
    /// Smoothing factor for the exponentially weighted moving averages.
    pub latency_ewma_alpha: f64,
    /// A decrease taken while the current concurrency is at or above this
    /// value ends the doubling (slow-start) phase.
    pub slow_start_ramp_threshold: usize,
    /// Whether an inflated p95 latency alone may trigger a decrease.
    pub latency_decrease_enabled: bool,
    /// When true, a decrease does not reset `samples_since_increase`.
    pub retain_increase_credit_on_decrease: bool,
}
400
401impl LimiterConfig {
402 fn from_adaptive(cfg: &AdaptiveConfig, max_for_channel: usize) -> Self {
403 Self {
404 enabled: cfg.enabled,
405 min_concurrency: cfg.min_concurrency,
406 max_concurrency: max_for_channel.max(cfg.min_concurrency),
407 window_ops: cfg.window_ops,
408 min_window_ops: cfg.min_window_ops,
409 success_target: cfg.success_target,
410 timeout_ceiling: cfg.timeout_ceiling,
411 latency_inflation_factor: cfg.latency_inflation_factor,
412 latency_ewma_alpha: cfg.latency_ewma_alpha,
413 slow_start_ramp_threshold: 0,
416 latency_decrease_enabled: true,
417 retain_increase_credit_on_decrease: false,
418 }
419 }
420
421 fn sanitize(&mut self) {
427 if !self.latency_ewma_alpha.is_finite() {
428 self.latency_ewma_alpha = 0.2;
429 }
430 self.latency_ewma_alpha = self.latency_ewma_alpha.clamp(0.0, 1.0);
431 if !self.success_target.is_finite() {
432 self.success_target = 0.95;
433 }
434 self.success_target = self.success_target.clamp(0.0, 1.0);
435 if !self.timeout_ceiling.is_finite() {
436 self.timeout_ceiling = 0.10;
437 }
438 self.timeout_ceiling = self.timeout_ceiling.clamp(0.0, 1.0);
439 if !self.latency_inflation_factor.is_finite() || self.latency_inflation_factor <= 0.0 {
440 self.latency_inflation_factor = 4.0;
441 }
442 self.min_concurrency = self.min_concurrency.max(1);
443 self.window_ops = self.window_ops.max(1);
444 self.min_window_ops = self.min_window_ops.max(1).min(self.window_ops);
445 self.max_concurrency = self.max_concurrency.max(self.min_concurrency);
446 }
447}
448
/// Adaptive concurrency limiter. Clones share the same state and
/// configuration because both are held behind `Arc`.
#[derive(Debug, Clone)]
pub struct Limiter {
    /// Mutable limiter state, guarded by a mutex.
    inner: Arc<Mutex<LimiterInner>>,
    /// Sanitized configuration.
    config: Arc<LimiterConfig>,
    /// Adjustment strategy chosen at construction.
    algorithm: LimiterAlgorithm,
}
460
/// Mutable state behind a `Limiter`'s mutex.
#[derive(Debug)]
struct LimiterInner {
    /// Current concurrency limit.
    current: usize,
    /// Sliding window of the most recent samples (at most `window_ops`).
    window: VecDeque<Sample>,
    /// Samples observed since the last applied increase or resetting decrease.
    samples_since_increase: usize,
    /// Samples observed since the last applied increase or decrease.
    samples_since_decrease: usize,
    /// Smoothed p95 latency, updated each time an increase is applied.
    latency_baseline: Option<Duration>,
    /// While false an increase doubles the limit; once true it adds one.
    left_slow_start: bool,
    /// State for the throughput hill-climb algorithm.
    hill: HillClimbState,
}
487
488impl Limiter {
489 #[must_use]
494 pub fn new(start: usize, config: LimiterConfig) -> Self {
495 Self::new_with_algorithm(start, config, LimiterAlgorithm::Aimd)
496 }
497
498 fn new_with_algorithm(
499 start: usize,
500 config: LimiterConfig,
501 algorithm: LimiterAlgorithm,
502 ) -> Self {
503 let mut config = config;
504 config.sanitize();
505 let clamped = start.clamp(config.min_concurrency, config.max_concurrency.max(1));
506 let window_cap = config.window_ops;
507 Self {
508 inner: Arc::new(Mutex::new(LimiterInner {
509 current: clamped,
510 window: VecDeque::with_capacity(window_cap),
511 samples_since_increase: 0,
512 samples_since_decrease: 0,
513 latency_baseline: None,
514 left_slow_start: false,
515 hill: HillClimbState::new(clamped, window_cap),
516 })),
517 config: Arc::new(config),
518 algorithm,
519 }
520 }
521
522 #[must_use]
526 pub fn current(&self) -> usize {
527 lock(&self.inner).current
528 }
529
530 pub fn observe(&self, outcome: Outcome, latency: Duration) {
533 self.observe_with_bytes(outcome, latency, 0);
534 }
535
536 pub fn observe_with_bytes(&self, outcome: Outcome, latency: Duration, bytes: u64) {
539 let observed_at = Instant::now();
540 let operation_started = observed_at.checked_sub(latency).unwrap_or(observed_at);
541 self.observe_with_timing(outcome, latency, bytes, operation_started);
542 }
543
544 fn observe_with_timing(
545 &self,
546 outcome: Outcome,
547 latency: Duration,
548 bytes: u64,
549 operation_started: Instant,
550 ) {
551 if !self.config.enabled {
552 return;
553 }
554 let mut g = lock(&self.inner);
555 if g.window.len() == self.config.window_ops {
556 g.window.pop_front();
557 }
558 g.window.push_back(Sample { outcome, latency });
559 if self.algorithm == LimiterAlgorithm::ThroughputHillClimb {
560 observe_hill_climb(
561 &mut g,
562 outcome,
563 latency,
564 bytes,
565 operation_started,
566 &self.config,
567 );
568 return;
569 }
570 g.samples_since_increase = g.samples_since_increase.saturating_add(1);
571 g.samples_since_decrease = g.samples_since_decrease.saturating_add(1);
572 if g.window.len() < self.config.min_window_ops {
573 return;
574 }
575 let decision = evaluate(&g.window, &self.config, g.latency_baseline);
576 apply_decision(&mut g, decision, &self.config);
577 }
578
579 pub fn warm_start(&self, start: usize) {
602 let clamped = start.clamp(
603 self.config.min_concurrency,
604 self.config.max_concurrency.max(1),
605 );
606 let mut g = lock(&self.inner);
607 g.current = clamped;
608 g.left_slow_start = clamped >= self.config.slow_start_ramp_threshold;
609 g.hill = HillClimbState::new(clamped, self.config.window_ops);
610 }
611
612 #[must_use]
614 pub fn snapshot(&self) -> usize {
615 let g = lock(&self.inner);
616 if self.algorithm == LimiterAlgorithm::ThroughputHillClimb {
617 g.hill.best_concurrency
618 } else {
619 g.current
620 }
621 }
622}
623
/// Summary of one completed hill-climb epoch.
#[derive(Debug, Clone, Copy)]
struct HillEpochStats {
    /// Bytes per second when any bytes were reported, otherwise successful
    /// operations per second.
    goodput_per_sec: f64,
    /// p95 latency of the epoch's successful operations.
    latency_p95: Option<Duration>,
}
629
/// Result of evaluating a sample window.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum Decision {
    /// The window looks healthy; concurrency may grow.
    Increase,
    /// The window shows stress; concurrency should shrink.
    Decrease,
    /// Not enough information; leave concurrency unchanged.
    Hold,
}
640
641fn evaluate(
642 window: &VecDeque<Sample>,
643 cfg: &LimiterConfig,
644 baseline: Option<Duration>,
645) -> Decision {
646 let mut successes = 0usize;
651 let mut timeouts = 0usize;
652 let mut net_errors = 0usize;
653 let mut latencies: Vec<Duration> = Vec::with_capacity(window.len());
654 for s in window {
655 match s.outcome {
656 Outcome::Success => {
657 successes += 1;
658 latencies.push(s.latency);
659 }
660 Outcome::Timeout => timeouts += 1,
661 Outcome::NetworkError => net_errors += 1,
662 Outcome::ApplicationError => {}
663 }
664 }
665 let capacity_total = successes + timeouts + net_errors;
666 if capacity_total < cfg.min_window_ops {
667 return Decision::Hold;
669 }
670 let total_f = capacity_total as f64;
671 let success_rate = successes as f64 / total_f;
672 let timeout_rate = timeouts as f64 / total_f;
673
674 if success_rate < cfg.success_target || timeout_rate > cfg.timeout_ceiling {
675 return Decision::Decrease;
676 }
677
678 if let Some(p95) = p95_of(&mut latencies) {
679 if cfg.latency_decrease_enabled {
680 if let Some(base) = baseline {
681 let limit = base.mul_f64(cfg.latency_inflation_factor);
682 if p95 > limit {
683 return Decision::Decrease;
684 }
685 }
686 }
687 Decision::Increase
688 } else {
689 Decision::Hold
690 }
691}
692
693fn apply_decision(inner: &mut LimiterInner, decision: Decision, cfg: &LimiterConfig) {
694 match decision {
695 Decision::Increase => {
696 if inner.samples_since_increase < cfg.window_ops {
699 return;
700 }
701 let p95 = window_p95(&inner.window);
702 inner.latency_baseline = Some(match inner.latency_baseline {
703 None => p95,
704 Some(prev) => ewma(prev, p95, cfg.latency_ewma_alpha),
705 });
706 let next = if inner.left_slow_start {
707 inner.current.saturating_add(1)
708 } else {
709 inner.current.saturating_mul(2)
710 };
711 let next = next.min(cfg.max_concurrency).max(cfg.min_concurrency);
712 if next != inner.current {
713 debug!(
714 from = inner.current,
715 to = next,
716 slow_start = !inner.left_slow_start,
717 "adaptive: increase",
718 );
719 }
720 inner.current = next;
721 inner.samples_since_increase = 0;
722 inner.samples_since_decrease = 0;
723 }
724 Decision::Decrease => {
725 if inner.samples_since_decrease < cfg.min_window_ops {
730 return;
731 }
732 if inner.current >= cfg.slow_start_ramp_threshold {
739 inner.left_slow_start = true;
740 }
741 let next = (inner.current / 2).max(cfg.min_concurrency);
742 if next != inner.current {
743 debug!(from = inner.current, to = next, "adaptive: decrease");
744 }
745 inner.current = next;
746 if !cfg.retain_increase_credit_on_decrease {
751 inner.samples_since_increase = 0;
752 }
753 inner.samples_since_decrease = 0;
754 }
755 Decision::Hold => {}
756 }
757}
758
/// Returns the 95th-percentile latency (nearest-rank) of `latencies`, or
/// `None` for an empty slice. A non-empty slice is sorted in place.
fn p95_of(latencies: &mut [Duration]) -> Option<Duration> {
    let count = latencies.len();
    // `checked_sub` doubles as the emptiness check.
    let last = count.checked_sub(1)?;
    latencies.sort_unstable();
    let rank = (count as f64 * 0.95).ceil() as usize;
    let position = rank.saturating_sub(1).min(last);
    latencies.get(position).copied()
}
771
772fn window_p95(window: &VecDeque<Sample>) -> Duration {
773 let mut latencies: Vec<Duration> = window
774 .iter()
775 .filter(|s| matches!(s.outcome, Outcome::Success))
776 .map(|s| s.latency)
777 .collect();
778 p95_of(&mut latencies).unwrap_or(Duration::ZERO)
779}
780
/// Exponentially weighted moving average of two durations:
/// `(1 - alpha) * prev + alpha * sample`.
///
/// `alpha` is clamped into `[0, 1]`; a non-finite `alpha` returns `prev`
/// unchanged. If the blended value cannot be represented as a `Duration`
/// (negative, non-finite, or too large), `prev` is returned instead of
/// panicking.
fn ewma(prev: Duration, sample: Duration, alpha: f64) -> Duration {
    if !alpha.is_finite() {
        return prev;
    }
    let alpha = alpha.clamp(0.0, 1.0);
    let prev_ms = prev.as_secs_f64() * 1000.0;
    let sample_ms = sample.as_secs_f64() * 1000.0;
    let new_ms = (1.0 - alpha) * prev_ms + alpha * sample_ms;
    // `Duration::from_secs_f64` panics on overflow, which floating-point
    // rounding can trigger for inputs near `Duration::MAX`; the fallible
    // conversion also rejects negative and non-finite values.
    Duration::try_from_secs_f64(new_ms / 1000.0).unwrap_or(prev)
}
795
796fn observe_hill_climb(
797 inner: &mut LimiterInner,
798 outcome: Outcome,
799 latency: Duration,
800 bytes: u64,
801 operation_started: Instant,
802 cfg: &LimiterConfig,
803) {
804 match inner.hill.epoch_started {
805 Some(epoch_started) if epoch_started <= operation_started => {}
806 _ => inner.hill.epoch_started = Some(operation_started),
807 }
808 inner.hill.epoch_samples = inner.hill.epoch_samples.saturating_add(1);
809 match outcome {
810 Outcome::Success => {
811 inner.hill.epoch_successes = inner.hill.epoch_successes.saturating_add(1);
812 inner.hill.epoch_bytes = inner.hill.epoch_bytes.saturating_add(bytes);
813 inner.hill.epoch_latencies.push(latency);
814 }
815 Outcome::Timeout => {
816 inner.hill.epoch_timeouts = inner.hill.epoch_timeouts.saturating_add(1);
817 }
818 Outcome::NetworkError => {
819 inner.hill.epoch_net_errors = inner.hill.epoch_net_errors.saturating_add(1);
820 }
821 Outcome::ApplicationError => {}
822 }
823
824 if hill_epoch_stressed(&inner.hill, cfg) {
825 apply_hill_stress(inner, cfg);
826 return;
827 }
828
829 if inner.hill.epoch_samples < hill_epoch_target_samples(inner.current, cfg) {
830 return;
831 }
832
833 if let Some(stats) = hill_epoch_stats(&inner.hill, cfg) {
834 apply_hill_epoch(inner, stats, cfg);
835 }
836 inner.hill.reset_epoch();
837}
838
839fn hill_epoch_target_samples(current: usize, cfg: &LimiterConfig) -> usize {
840 cfg.window_ops
841 .max(current.saturating_mul(HILL_EPOCH_FULL_WAVES))
842 .max(cfg.min_window_ops)
843}
844
845fn hill_epoch_stressed(hill: &HillClimbState, cfg: &LimiterConfig) -> bool {
846 let capacity_total = hill.capacity_total();
847 if capacity_total < cfg.min_window_ops {
848 return false;
849 }
850 let total_f = capacity_total as f64;
851 let success_rate = hill.epoch_successes as f64 / total_f;
852 let timeout_rate = hill.epoch_timeouts as f64 / total_f;
853 success_rate < cfg.success_target || timeout_rate > cfg.timeout_ceiling
854}
855
856fn hill_epoch_stats(hill: &HillClimbState, cfg: &LimiterConfig) -> Option<HillEpochStats> {
857 let capacity_total = hill.capacity_total();
858 if capacity_total < cfg.min_window_ops || hill.epoch_successes == 0 {
859 return None;
860 }
861 let mut latencies = hill.epoch_latencies.clone();
862 let latency_p95 = p95_of(&mut latencies);
863 let max_latency = latencies.iter().copied().max().unwrap_or(Duration::ZERO);
864 let wall_elapsed = hill.epoch_started.map_or(Duration::ZERO, |s| s.elapsed());
865 let elapsed = wall_elapsed.max(max_latency);
866 let elapsed_secs = elapsed.as_secs_f64();
867 if !elapsed_secs.is_finite() || elapsed_secs <= 0.0 {
868 return None;
869 }
870
871 let units = if hill.epoch_bytes > 0 {
874 hill.epoch_bytes as f64
875 } else {
876 hill.epoch_successes as f64
877 };
878 Some(HillEpochStats {
879 goodput_per_sec: units / elapsed_secs,
880 latency_p95,
881 })
882}
883
884fn apply_hill_stress(inner: &mut LimiterInner, cfg: &LimiterConfig) {
885 let next = (inner.current / HILL_STRESS_DECREASE_DIVISOR)
886 .max(cfg.min_concurrency)
887 .min(cfg.max_concurrency);
888 if next != inner.current {
889 debug!(
890 from = inner.current,
891 to = next,
892 "adaptive: fetch hill stress decrease"
893 );
894 }
895 inner.current = next;
896 inner.hill.best_concurrency = next;
897 inner.hill.best_goodput_per_sec = None;
898 inner.hill.best_latency_p95 = None;
899 inner.hill.stable_epochs = 0;
900 inner.hill.cooldown_epochs = HILL_REJECT_COOLDOWN_EPOCHS;
901 inner.hill.active_probe = None;
902 inner.hill.next_probe = ProbeDirection::Up;
903 inner.hill.reset_epoch();
904}
905
906fn apply_hill_epoch(inner: &mut LimiterInner, stats: HillEpochStats, cfg: &LimiterConfig) {
907 let Some(best_goodput) = inner.hill.best_goodput_per_sec else {
908 inner.hill.best_goodput_per_sec = Some(stats.goodput_per_sec);
909 inner.hill.best_latency_p95 = stats.latency_p95;
910 inner.hill.best_concurrency = inner.current;
911 probe_hill_neighbor(inner, ProbeDirection::Up, cfg);
912 return;
913 };
914
915 match inner.hill.active_probe {
916 Some(ProbeDirection::Up) => {
917 let improved = stats.goodput_per_sec >= best_goodput * HILL_UP_PROBE_ACCEPT_RATIO;
918 if improved
919 && hill_latency_acceptable(stats.latency_p95, inner.hill.best_latency_p95, cfg)
920 {
921 accept_hill_probe(inner, stats, cfg);
922 probe_hill_neighbor(inner, ProbeDirection::Up, cfg);
923 } else {
924 reject_hill_probe(inner);
925 }
926 }
927 Some(ProbeDirection::Down) => {
928 let retained = stats.goodput_per_sec >= best_goodput * HILL_DOWN_PROBE_ACCEPT_RATIO;
929 if retained
930 && hill_latency_acceptable(stats.latency_p95, inner.hill.best_latency_p95, cfg)
931 {
932 accept_hill_probe(inner, stats, cfg);
933 inner.hill.next_probe = ProbeDirection::Up;
934 } else {
935 reject_hill_probe(inner);
936 }
937 }
938 None => {
939 refresh_hill_best(inner, stats, cfg);
940 if inner.hill.cooldown_epochs > 0 {
941 inner.hill.cooldown_epochs -= 1;
942 return;
943 }
944 inner.hill.stable_epochs = inner.hill.stable_epochs.saturating_add(1);
945 if inner.hill.stable_epochs >= HILL_STABLE_PROBE_EPOCHS {
946 let direction = inner.hill.next_probe;
947 inner.hill.next_probe = match direction {
948 ProbeDirection::Up => ProbeDirection::Down,
949 ProbeDirection::Down => ProbeDirection::Up,
950 };
951 probe_hill_neighbor(inner, direction, cfg);
952 }
953 }
954 }
955}
956
957fn refresh_hill_best(inner: &mut LimiterInner, stats: HillEpochStats, cfg: &LimiterConfig) {
958 inner.hill.best_goodput_per_sec = Some(match inner.hill.best_goodput_per_sec {
959 Some(prev) => ewma_f64(prev, stats.goodput_per_sec, cfg.latency_ewma_alpha),
960 None => stats.goodput_per_sec,
961 });
962 if let Some(latency_p95) = stats.latency_p95 {
963 inner.hill.best_latency_p95 = Some(match inner.hill.best_latency_p95 {
964 Some(prev) => ewma(prev, latency_p95, cfg.latency_ewma_alpha),
965 None => latency_p95,
966 });
967 }
968}
969
970fn hill_latency_acceptable(
971 candidate: Option<Duration>,
972 best: Option<Duration>,
973 cfg: &LimiterConfig,
974) -> bool {
975 match (candidate, best) {
976 (Some(candidate), Some(best)) => candidate <= best.mul_f64(cfg.latency_inflation_factor),
977 _ => true,
978 }
979}
980
/// Exponentially weighted moving average of two numbers:
/// `(1 - alpha) * prev + alpha * sample`, with `alpha` clamped into `[0, 1]`.
/// Returns `prev` when `alpha` is not finite or when the blended value is
/// non-finite or negative.
fn ewma_f64(prev: f64, sample: f64, alpha: f64) -> f64 {
    if !alpha.is_finite() {
        return prev;
    }
    let weight = alpha.clamp(0.0, 1.0);
    let blended = (1.0 - weight) * prev + weight * sample;
    if blended.is_nan() || blended.is_infinite() || blended < 0.0 {
        prev
    } else {
        blended
    }
}
994
995fn accept_hill_probe(inner: &mut LimiterInner, stats: HillEpochStats, cfg: &LimiterConfig) {
996 debug!(
997 concurrency = inner.current,
998 goodput_per_sec = stats.goodput_per_sec,
999 "adaptive: fetch hill accepted probe"
1000 );
1001 inner.hill.best_concurrency = inner.current;
1002 inner.hill.best_goodput_per_sec = Some(stats.goodput_per_sec);
1003 inner.hill.best_latency_p95 = stats.latency_p95;
1004 inner.hill.active_probe = None;
1005 inner.hill.cooldown_epochs = 0;
1006 inner.hill.stable_epochs = 0;
1007 inner.current = inner
1008 .hill
1009 .best_concurrency
1010 .clamp(cfg.min_concurrency, cfg.max_concurrency);
1011}
1012
1013fn reject_hill_probe(inner: &mut LimiterInner) {
1014 let from = inner.current;
1015 let to = inner.hill.best_concurrency;
1016 let rejected_direction = inner.hill.active_probe;
1017 if from != to {
1018 debug!(from, to, "adaptive: fetch hill rejected probe");
1019 }
1020 inner.current = to;
1021 inner.hill.active_probe = None;
1022 if let Some(direction) = rejected_direction {
1023 inner.hill.next_probe = match direction {
1024 ProbeDirection::Up => ProbeDirection::Down,
1025 ProbeDirection::Down => ProbeDirection::Up,
1026 };
1027 }
1028 inner.hill.cooldown_epochs = HILL_REJECT_COOLDOWN_EPOCHS;
1029 inner.hill.stable_epochs = 0;
1030}
1031
1032fn probe_hill_neighbor(inner: &mut LimiterInner, direction: ProbeDirection, cfg: &LimiterConfig) {
1033 let best = inner.hill.best_concurrency;
1034 let step = (best / HILL_PROBE_STEP_DIVISOR).max(HILL_MIN_PROBE_STEP);
1035 let candidate = match direction {
1036 ProbeDirection::Up => best.saturating_add(step).min(cfg.max_concurrency),
1037 ProbeDirection::Down => best.saturating_sub(step).max(cfg.min_concurrency),
1038 };
1039 if candidate == best {
1040 inner.current = best;
1041 inner.hill.active_probe = None;
1042 inner.hill.stable_epochs = 0;
1043 return;
1044 }
1045 debug!(
1046 from = best,
1047 to = candidate,
1048 ?direction,
1049 "adaptive: fetch hill probing"
1050 );
1051 inner.current = candidate;
1052 inner.hill.active_probe = Some(direction);
1053 inner.hill.stable_epochs = 0;
1054}
1055
/// Bundle of one limiter per channel plus the configuration they were built
/// from.
#[derive(Debug, Clone)]
pub struct AdaptiveController {
    /// Limiter for the quote channel (AIMD).
    pub quote: Limiter,
    /// Limiter for the store channel (AIMD).
    pub store: Limiter,
    /// Limiter for the fetch channel (throughput hill-climb).
    pub fetch: Limiter,
    /// Sanitized shared configuration.
    pub(crate) config: AdaptiveConfig,
    /// Starting values passed at construction; `warm_start` never resumes
    /// below them.
    cold_start: ChannelStart,
}
1076
1077impl AdaptiveController {
1078 #[must_use]
1083 pub fn new(start: ChannelStart, config: AdaptiveConfig) -> Self {
1084 let mut config = config;
1085 config.sanitize();
1086 let quote_cfg = LimiterConfig::from_adaptive(&config, config.max.quote);
1087 let mut store_cfg = LimiterConfig::from_adaptive(&config, config.max.store);
1088 store_cfg.latency_decrease_enabled = false;
1111 store_cfg.slow_start_ramp_threshold = usize::MAX;
1112 store_cfg.retain_increase_credit_on_decrease = true;
1133 store_cfg.success_target = 0.88;
1134 let mut fetch_cfg = LimiterConfig::from_adaptive(&config, config.max.fetch);
1135 fetch_cfg.min_concurrency = fetch_cfg.min_concurrency.max(FETCH_MIN_FLOOR);
1152 fetch_cfg.max_concurrency = fetch_cfg.max_concurrency.max(fetch_cfg.min_concurrency);
1155 fetch_cfg.slow_start_ramp_threshold = usize::MAX;
1180 fetch_cfg.latency_decrease_enabled = false;
1181 Self {
1182 quote: Limiter::new(start.quote, quote_cfg),
1183 store: Limiter::new(start.store, store_cfg),
1184 fetch: Limiter::new_with_algorithm(
1185 start.fetch,
1186 fetch_cfg,
1187 LimiterAlgorithm::ThroughputHillClimb,
1188 ),
1189 config,
1190 cold_start: start,
1191 }
1192 }
1193
1194 #[must_use]
1196 pub fn snapshot(&self) -> ChannelStart {
1197 ChannelStart {
1198 quote: self.quote.snapshot(),
1199 store: self.store.snapshot(),
1200 fetch: self.fetch.snapshot(),
1201 }
1202 }
1203
1204 #[must_use]
1210 pub fn config(&self) -> &AdaptiveConfig {
1211 &self.config
1212 }
1213
1214 pub fn warm_start(&self, snapshot: ChannelStart) {
1228 if !self.config.enabled {
1229 return;
1230 }
1231 self.quote
1232 .warm_start(snapshot.quote.max(self.cold_start.quote));
1233 self.store
1234 .warm_start(snapshot.store.max(self.cold_start.store));
1235 self.fetch
1236 .warm_start(snapshot.fetch.max(self.cold_start.fetch));
1237 }
1238}
1239
1240impl Default for AdaptiveController {
1241 fn default() -> Self {
1242 Self::new(ChannelStart::default(), AdaptiveConfig::default())
1243 }
1244}
1245
/// Drop guard that reports an operation to a limiter. Nothing is reported
/// unless `finish` / `finish_with_bytes` was called before the guard drops.
struct ObserveGuard<'a> {
    /// Limiter that receives the observation on drop.
    limiter: &'a Limiter,
    /// When the guard (and thus the operation) started.
    started: Instant,
    /// Recorded outcome, latency and bytes; `None` until finished.
    outcome: Option<(Outcome, Duration, u64)>,
}
1258
1259impl<'a> ObserveGuard<'a> {
1260 fn new(limiter: &'a Limiter) -> Self {
1261 Self {
1262 limiter,
1263 started: Instant::now(),
1264 outcome: None,
1265 }
1266 }
1267 fn finish(&mut self, outcome: Outcome) {
1268 self.finish_with_bytes(outcome, 0);
1269 }
1270
1271 fn finish_with_bytes(&mut self, outcome: Outcome, bytes: u64) {
1272 self.outcome = Some((outcome, self.started.elapsed(), bytes));
1273 }
1274}
1275
1276impl Drop for ObserveGuard<'_> {
1277 fn drop(&mut self) {
1278 if let Some((outcome, latency, bytes)) = self.outcome.take() {
1279 self.limiter
1280 .observe_with_timing(outcome, latency, bytes, self.started);
1281 }
1282 }
1283}
1284
1285pub async fn observe_op<T, E, F, Fut, C>(limiter: &Limiter, op: F, classify: C) -> Result<T, E>
1300where
1301 F: FnOnce() -> Fut,
1302 Fut: std::future::Future<Output = Result<T, E>>,
1303 C: FnOnce(&E) -> Outcome,
1304{
1305 let mut guard = ObserveGuard::new(limiter);
1306 let result = op().await;
1307 let outcome = match &result {
1308 Ok(_) => Outcome::Success,
1309 Err(e) => classify(e),
1310 };
1311 guard.finish(outcome);
1312 drop(guard); result
1314}
1315
1316pub async fn observe_op_with_success_bytes<T, E, F, Fut, C, B>(
1320 limiter: &Limiter,
1321 op: F,
1322 classify: C,
1323 success_bytes: B,
1324) -> Result<T, E>
1325where
1326 F: FnOnce() -> Fut,
1327 Fut: std::future::Future<Output = Result<T, E>>,
1328 C: FnOnce(&E) -> Outcome,
1329 B: FnOnce(&T) -> u64,
1330{
1331 let mut guard = ObserveGuard::new(limiter);
1332 let result = op().await;
1333 match &result {
1334 Ok(value) => guard.finish_with_bytes(Outcome::Success, success_bytes(value)),
1335 Err(e) => guard.finish_with_bytes(classify(e), 0),
1336 }
1337 drop(guard);
1338 result
1339}
1340
1341pub async fn rebucketed_unordered<I, T, E, F, Fut>(
1356 limiter: &Limiter,
1357 items: I,
1358 mut op: F,
1359) -> Result<Vec<T>, E>
1360where
1361 I: IntoIterator,
1362 F: FnMut(I::Item) -> Fut,
1363 Fut: std::future::Future<Output = Result<T, E>>,
1364{
1365 let mut iter = items.into_iter().peekable();
1366 let mut in_flight: FuturesUnordered<Fut> = FuturesUnordered::new();
1367 let mut results = Vec::new();
1368 let mut pending_err: Option<E> = None;
1369 loop {
1370 if pending_err.is_none() {
1373 let cap = limiter.current().max(1);
1374 while in_flight.len() < cap {
1375 match iter.next() {
1376 Some(item) => in_flight.push(op(item)),
1377 None => break,
1378 }
1379 }
1380 }
1381 if in_flight.is_empty() {
1382 break;
1383 }
1384 match in_flight.next().await {
1385 Some(Ok(v)) => results.push(v),
1386 Some(Err(e)) => {
1387 if pending_err.is_none() {
1388 pending_err = Some(e);
1389 }
1390 }
1391 None => break,
1392 }
1393 }
1394 match pending_err {
1395 Some(e) => Err(e),
1396 None => Ok(results),
1397 }
1398}
1399
1400pub async fn rebucketed_ordered<I, U, E, F, Fut>(
1413 limiter: &Limiter,
1414 items: I,
1415 op: F,
1416) -> Result<Vec<U>, E>
1417where
1418 I: IntoIterator,
1419 F: FnMut(I::Item) -> Fut,
1420 Fut: std::future::Future<Output = Result<(usize, U), E>>,
1421{
1422 let mut indexed = rebucketed_unordered(limiter, items, op).await?;
1423 indexed.sort_by_key(|(idx, _)| *idx);
1424 Ok(indexed.into_iter().map(|(_, v)| v).collect())
1425}
1426
1427pub async fn rebucketed<I, T, E, F, Fut>(
1433 limiter: &Limiter,
1434 items: I,
1435 ordered: bool,
1436 mut op: F,
1437) -> Result<Vec<T>, E>
1438where
1439 I: IntoIterator,
1440 F: FnMut(I::Item) -> Fut,
1441 Fut: std::future::Future<Output = Result<T, E>>,
1442{
1443 if !ordered {
1444 return rebucketed_unordered(limiter, items, op).await;
1445 }
1446 let mut iter = items.into_iter();
1447 let mut results = Vec::new();
1448 let mut pending_err: Option<E> = None;
1449 loop {
1450 if pending_err.is_some() {
1451 break;
1452 }
1453 let cap = limiter.current().max(1);
1454 let mut batch = Vec::with_capacity(cap);
1455 for item in iter.by_ref().take(cap) {
1456 batch.push(op(item));
1457 }
1458 if batch.is_empty() {
1459 break;
1460 }
1461 let mut s = stream::iter(batch).buffered(cap);
1462 while let Some(r) = s.next().await {
1463 match r {
1464 Ok(v) => results.push(v),
1465 Err(e) => {
1466 if pending_err.is_none() {
1467 pending_err = Some(e);
1468 }
1469 }
1470 }
1471 }
1472 }
1473 match pending_err {
1474 Some(e) => Err(e),
1475 None => Ok(results),
1476 }
1477}
1478
/// On-disk representation of a snapshot, serialized as JSON.
#[derive(Debug, Clone, Serialize, Deserialize)]
struct PersistedState {
    /// Schema version of the file; checked by `load_snapshot`.
    schema: u32,
    /// Per-channel concurrency values.
    channels: ChannelStart,
}
1488
/// Schema version written by `save_snapshot` and loaded as-is.
const PERSIST_SCHEMA: u32 = 2;
/// Older schema version; on load, quote/store are kept and fetch is reset to
/// `FETCH_COLD_START_CONCURRENCY`.
const PERSIST_SCHEMA_AIMD_FETCH: u32 = 1;
/// File name of the snapshot inside the data directory.
const PERSIST_FILENAME: &str = "client_adaptive.json";
1492
1493#[must_use]
1497pub fn default_persist_path() -> Option<PathBuf> {
1498 crate::config::data_dir()
1499 .ok()
1500 .map(|d| d.join(PERSIST_FILENAME))
1501}
1502
1503#[must_use]
1509pub fn load_snapshot(path: &Path) -> Option<ChannelStart> {
1510 let bytes = std::fs::read(path).ok()?;
1511 let state: PersistedState = match serde_json::from_slice(&bytes) {
1512 Ok(s) => s,
1513 Err(e) => {
1514 warn!(path = %path.display(), error = %e, "adaptive: corrupt snapshot, ignoring");
1515 return None;
1516 }
1517 };
1518 match state.schema {
1519 PERSIST_SCHEMA => Some(state.channels),
1520 PERSIST_SCHEMA_AIMD_FETCH => {
1521 debug!(
1522 path = %path.display(),
1523 "adaptive: migrating schema-1 snapshot, preserving quote/store and resetting fetch",
1524 );
1525 Some(ChannelStart {
1526 fetch: FETCH_COLD_START_CONCURRENCY,
1527 ..state.channels
1528 })
1529 }
1530 schema => {
1531 debug!(
1532 path = %path.display(),
1533 schema,
1534 expected = PERSIST_SCHEMA,
1535 "adaptive: snapshot schema mismatch, ignoring",
1536 );
1537 None
1538 }
1539 }
1540}
1541
1542pub fn save_snapshot(path: &Path, channels: ChannelStart) {
1545 let state = PersistedState {
1546 schema: PERSIST_SCHEMA,
1547 channels,
1548 };
1549 let bytes = match serde_json::to_vec_pretty(&state) {
1550 Ok(b) => b,
1551 Err(e) => {
1552 warn!(error = %e, "adaptive: snapshot serialize failed");
1553 return;
1554 }
1555 };
1556 if let Some(parent) = path.parent() {
1557 if let Err(e) = std::fs::create_dir_all(parent) {
1558 warn!(path = %parent.display(), error = %e, "adaptive: snapshot mkdir failed");
1559 return;
1560 }
1561 }
1562 let nanos = std::time::SystemTime::now()
1569 .duration_since(std::time::UNIX_EPOCH)
1570 .map(|d| d.subsec_nanos())
1571 .unwrap_or(0);
1572 let counter = SAVE_COUNTER.fetch_add(1, Ordering::Relaxed);
1573 let tmp = path.with_extension(format!(
1574 "json.tmp.{}.{}.{}",
1575 std::process::id(),
1576 counter,
1577 nanos
1578 ));
1579 if let Err(e) = std::fs::write(&tmp, &bytes) {
1580 warn!(path = %tmp.display(), error = %e, "adaptive: snapshot write failed");
1581 return;
1582 }
1583 if let Err(e) = std::fs::rename(&tmp, path) {
1584 warn!(
1585 from = %tmp.display(),
1586 to = %path.display(),
1587 error = %e,
1588 "adaptive: snapshot rename failed",
1589 );
1590 let _ = std::fs::remove_file(&tmp);
1593 }
1594}
1595
1596pub fn save_snapshot_with_timeout(path: PathBuf, channels: ChannelStart, timeout: Duration) {
1606 let handle = std::thread::spawn(move || {
1607 save_snapshot(&path, channels);
1608 });
1609 let started = Instant::now();
1613 let poll = Duration::from_millis(5);
1614 while started.elapsed() < timeout {
1615 if handle.is_finished() {
1616 let _ = handle.join();
1617 return;
1618 }
1619 std::thread::sleep(poll);
1620 }
1621 warn!(
1625 timeout_ms = timeout.as_millis() as u64,
1626 "adaptive: snapshot save timed out (data dir slow?); detaching writer thread"
1627 );
1628 drop(handle);
1629}
1630
1631#[cfg(test)]
1632#[allow(clippy::unwrap_used)]
1633mod tests {
1634 use super::*;
1635
    // Shared fixtures for the hill-climb tests.
    // NOTE(review): the tests that consume these constants are outside this
    // view; the descriptions below are inferred from the names and values and
    // should be confirmed against those tests.

    // Starting concurrency cap.
    const HILL_TEST_START_CAP: usize = 16;
    // Presumably the first upward probe from 16 (16 + 16 / 4).
    const HILL_TEST_UP_PROBE_CAP: usize = 20;
    // Presumably the following upward probe from 20 (20 + 20 / 4).
    const HILL_TEST_NEXT_UP_PROBE_CAP: usize = 25;
    // Presumably the downward probe from 16 (16 - 16 / 4).
    const HILL_TEST_DOWN_PROBE_CAP: usize = 12;
    // Byte count reported per observed operation.
    const HILL_TEST_CHUNK_BYTES: u64 = 1_000;
    // Latency (ms) used for ordinary successful observations.
    const HILL_TEST_BASE_LATENCY_MS: u64 = 100;
    // Latency (ms) presumably slow enough for a probe to be rejected.
    const HILL_TEST_REJECT_LATENCY_MS: u64 = 130;
    // Latency (ms) presumably fast enough for a downward probe to be kept.
    const HILL_TEST_RETAINED_DOWN_LATENCY_MS: u64 = 75;
    // Latency (ms) presumably used by the async hill-climb tests.
    const HILL_TEST_ASYNC_LATENCY_MS: u64 = 10;
1645
1646 fn cfg_for_tests() -> LimiterConfig {
1647 LimiterConfig {
1648 enabled: true,
1649 min_concurrency: 1,
1650 max_concurrency: 64,
1651 window_ops: 10,
1652 min_window_ops: 5,
1653 success_target: 0.9,
1654 timeout_ceiling: 0.2,
1655 latency_inflation_factor: 2.0,
1656 latency_ewma_alpha: 0.5,
1657 slow_start_ramp_threshold: 0,
1658 latency_decrease_enabled: true,
1659 retain_increase_credit_on_decrease: false,
1660 }
1661 }
1662
1663 fn hill_cfg_for_tests() -> LimiterConfig {
1664 LimiterConfig {
1665 window_ops: 4,
1666 min_window_ops: 2,
1667 max_concurrency: 64,
1668 success_target: 0.9,
1669 timeout_ceiling: 0.2,
1670 ..cfg_for_tests()
1671 }
1672 }
1673
1674 fn fetch_hill_for_tests(start: usize, cfg: LimiterConfig) -> Limiter {
1675 Limiter::new_with_algorithm(start, cfg, LimiterAlgorithm::ThroughputHillClimb)
1676 }
1677
1678 fn observe_hill_success_epoch_with_latency(
1679 limiter: &Limiter,
1680 cfg: &LimiterConfig,
1681 bytes: u64,
1682 latency: Duration,
1683 ) {
1684 let samples = hill_epoch_target_samples(limiter.current(), cfg);
1685 for _ in 0..samples {
1686 limiter.observe_with_bytes(Outcome::Success, latency, bytes);
1687 }
1688 }
1689
1690 fn observe_hill_success_epoch(limiter: &Limiter, cfg: &LimiterConfig, bytes: u64) {
1691 observe_hill_success_epoch_with_latency(
1692 limiter,
1693 cfg,
1694 bytes,
1695 Duration::from_millis(HILL_TEST_BASE_LATENCY_MS),
1696 );
1697 }
1698
1699 fn adaptive_cfg_for_tests() -> AdaptiveConfig {
1704 let l = cfg_for_tests();
1705 AdaptiveConfig {
1706 enabled: l.enabled,
1707 min_concurrency: l.min_concurrency,
1708 max: ChannelMax {
1709 quote: l.max_concurrency,
1710 store: l.max_concurrency,
1711 fetch: l.max_concurrency,
1712 },
1713 window_ops: l.window_ops,
1714 min_window_ops: l.min_window_ops,
1715 success_target: l.success_target,
1716 timeout_ceiling: l.timeout_ceiling,
1717 latency_inflation_factor: l.latency_inflation_factor,
1718 latency_ewma_alpha: l.latency_ewma_alpha,
1719 }
1720 }
1721
1722 #[test]
1723 fn warm_start_keeps_slow_start_armed_below_protected_threshold() {
1724 let cfg = LimiterConfig {
1733 max_concurrency: 256,
1734 slow_start_ramp_threshold: 256,
1735 latency_decrease_enabled: false,
1736 ..cfg_for_tests()
1737 };
1738 let l = Limiter::new(64, cfg.clone());
1739 l.warm_start(20);
1740 assert_eq!(l.current(), 20);
1741 for _ in 0..cfg.window_ops {
1744 l.observe(Outcome::Success, Duration::from_millis(10));
1745 }
1746 assert_eq!(
1747 l.current(),
1748 40,
1749 "protected channel must double after warm_start, not crawl +1",
1750 );
1751
1752 let default_cfg = LimiterConfig {
1755 max_concurrency: 256,
1756 ..cfg_for_tests()
1757 };
1758 let d = Limiter::new(64, default_cfg.clone());
1759 d.warm_start(20);
1760 for _ in 0..default_cfg.window_ops {
1761 d.observe(Outcome::Success, Duration::from_millis(10));
1762 }
1763 assert_eq!(
1764 d.current(),
1765 21,
1766 "default channel must stay additive after warm_start",
1767 );
1768 }
1769
1770 #[test]
1771 fn slow_start_stays_armed_at_ceiling_with_max_threshold() {
1772 let base = LimiterConfig {
1781 max_concurrency: 256,
1782 latency_decrease_enabled: false,
1783 ..cfg_for_tests()
1784 };
1785 let fixed = Limiter::new(
1786 256,
1787 LimiterConfig {
1788 slow_start_ramp_threshold: usize::MAX,
1789 ..base.clone()
1790 },
1791 );
1792 let buggy = Limiter::new(
1793 256,
1794 LimiterConfig {
1795 slow_start_ramp_threshold: 256,
1796 ..base.clone()
1797 },
1798 );
1799 for l in [&fixed, &buggy] {
1800 for _ in 0..base.window_ops {
1801 l.observe(Outcome::Timeout, Duration::from_millis(10));
1802 }
1803 for _ in 0..(base.window_ops * 10) {
1804 l.observe(Outcome::Success, Duration::from_millis(10));
1805 }
1806 }
1807 assert!(
1808 fixed.current() > buggy.current(),
1809 "MAX-threshold limiter ({}) must out-recover the ceiling-threshold one ({})",
1810 fixed.current(),
1811 buggy.current(),
1812 );
1813 }
1814
1815 #[test]
1816 fn protected_slow_start_recovers_faster_than_additive() {
1817 let base = LimiterConfig {
1822 max_concurrency: 256,
1823 latency_decrease_enabled: false,
1824 ..cfg_for_tests()
1825 };
1826 let protected = Limiter::new(
1827 64,
1828 LimiterConfig {
1829 slow_start_ramp_threshold: 256,
1830 ..base.clone()
1831 },
1832 );
1833 let unprotected = Limiter::new(
1834 64,
1835 LimiterConfig {
1836 slow_start_ramp_threshold: 0,
1837 ..base.clone()
1838 },
1839 );
1840
1841 for l in [&protected, &unprotected] {
1843 for _ in 0..base.window_ops {
1844 l.observe(Outcome::Timeout, Duration::from_millis(10));
1845 }
1846 }
1847 for l in [&protected, &unprotected] {
1851 for _ in 0..(base.window_ops * 10) {
1852 l.observe(Outcome::Success, Duration::from_millis(10));
1853 }
1854 }
1855 assert!(
1856 protected.current() > unprotected.current(),
1857 "protected slow-start ({}) should recover faster than additive ({})",
1858 protected.current(),
1859 unprotected.current(),
1860 );
1861 }
1862
1863 #[test]
1864 fn latency_decrease_disabled_ignores_p95_inflation() {
1865 let cfg = LimiterConfig {
1871 max_concurrency: 256,
1872 slow_start_ramp_threshold: 256,
1873 latency_decrease_enabled: false,
1874 ..cfg_for_tests()
1875 };
1876 let l = Limiter::new(16, cfg.clone());
1877 for _ in 0..cfg.window_ops {
1879 l.observe(Outcome::Success, Duration::from_millis(5));
1880 }
1881 let after_baseline = l.current();
1882 for _ in 0..cfg.window_ops {
1886 l.observe(Outcome::Success, Duration::from_millis(500));
1887 }
1888 assert!(
1889 l.current() >= after_baseline,
1890 "latency inflation must not shrink the cap when the check is disabled: {} < {}",
1891 l.current(),
1892 after_baseline,
1893 );
1894 }
1895
1896 #[test]
1897 fn controller_sets_fetch_channel_download_tuning() {
1898 let c = AdaptiveController::new(ChannelStart::default(), AdaptiveConfig::default());
1902 assert!(
1903 !c.fetch.config.latency_decrease_enabled,
1904 "fetch latency-decrease must be disabled",
1905 );
1906 assert_eq!(
1907 c.fetch.config.slow_start_ramp_threshold,
1908 usize::MAX,
1909 "fetch slow-start must never exit (armed at every cap incl. ceiling)",
1910 );
1911 assert!(
1912 c.quote.config.latency_decrease_enabled,
1913 "quote must keep the latency-decrease check",
1914 );
1915 assert_eq!(
1916 c.quote.config.slow_start_ramp_threshold, 0,
1917 "quote must keep classic AIMD slow-start exit",
1918 );
1919 assert!(
1920 !c.quote.config.retain_increase_credit_on_decrease,
1921 "quote must keep the classic gate (Decrease resets the increase counter)",
1922 );
1923 assert!(
1924 c.store.config.retain_increase_credit_on_decrease,
1925 "store must retain increase credit across a Decrease (V2-554)",
1926 );
1927 assert!(
1928 (c.store.config.success_target - 0.88).abs() < f64::EPSILON,
1929 "store must relax success_target to 0.88 so a few-percent shortfall still ramps (V2-554), got {}",
1930 c.store.config.success_target,
1931 );
1932 assert!(
1933 (c.quote.config.success_target - c.config().success_target).abs() < f64::EPSILON,
1934 "quote must keep the global success_target",
1935 );
1936 assert!(
1940 !c.store.config.latency_decrease_enabled,
1941 "store latency-decrease must be disabled (verification variance is not congestion)",
1942 );
1943 assert_eq!(
1944 c.store.config.slow_start_ramp_threshold,
1945 usize::MAX,
1946 "store slow-start must never exit so a transient Decrease re-doubles",
1947 );
1948 assert_eq!(
1951 c.store.current(),
1952 ChannelStart::default().store,
1953 "store cold-start floor must remain unchanged at 8",
1954 );
1955 }
1956
1957 #[test]
1958 fn store_channel_ramps_and_recovers_under_v2_468_tuning() {
1959 let mut adaptive = adaptive_cfg_for_tests();
1965 adaptive.max.store = 256;
1967 let c = AdaptiveController::new(
1968 ChannelStart {
1969 quote: 8,
1970 store: 8,
1971 fetch: 8,
1972 },
1973 adaptive,
1974 );
1975 let store = &c.store;
1976 let win = c.config().window_ops;
1977
1978 for _ in 0..win {
1981 store.observe(Outcome::Success, Duration::from_millis(5));
1982 }
1983 let after_baseline = store.current();
1984 assert!(after_baseline >= 8, "store should ramp on healthy windows");
1985 for _ in 0..win {
1986 store.observe(Outcome::Success, Duration::from_secs(30));
1987 }
1988 assert!(
1989 store.current() >= after_baseline,
1990 "verification-latency p95 must not shrink store cap: {} < {}",
1991 store.current(),
1992 after_baseline,
1993 );
1994
1995 let before_stress = store.current();
1997 for _ in 0..win {
1998 store.observe(Outcome::Timeout, Duration::from_millis(50));
1999 }
2000 let after_stress = store.current();
2001 assert!(
2002 after_stress < before_stress,
2003 "timeout-rate breach must still cut the store cap: {after_stress} !< {before_stress}",
2004 );
2005
2006 for _ in 0..(win * 8) {
2012 store.observe(Outcome::Success, Duration::from_millis(5));
2013 }
2014 assert!(
2015 store.current() >= before_stress,
2016 "store must re-double back to {before_stress} after a transient Decrease, got {}",
2017 store.current(),
2018 );
2019 }
2020
2021 #[test]
2022 fn store_application_rejections_do_not_move_cap() {
2023 let mut adaptive = adaptive_cfg_for_tests();
2027 adaptive.max.store = 256;
2028 let c = AdaptiveController::new(
2029 ChannelStart {
2030 quote: 8,
2031 store: 8,
2032 fetch: 8,
2033 },
2034 adaptive,
2035 );
2036 let store = &c.store;
2037 let start = store.current();
2038 for _ in 0..(c.config().window_ops * 5) {
2039 store.observe(Outcome::ApplicationError, Duration::from_secs(30));
2040 }
2041 assert_eq!(
2042 store.current(),
2043 start,
2044 "remote app-rejections must not move the store cap",
2045 );
2046 }
2047
2048 #[test]
2049 fn store_gate_rebalance_ramps_where_classic_gate_pins() {
2050 let build = |success_target: f64, retain: bool| {
2061 let mut cfg = cfg_for_tests();
2062 cfg.min_concurrency = 1;
2063 cfg.max_concurrency = 256;
2064 cfg.window_ops = 32;
2065 cfg.min_window_ops = 8;
2066 cfg.success_target = success_target;
2067 cfg.timeout_ceiling = 0.10;
2068 cfg.slow_start_ramp_threshold = usize::MAX;
2071 cfg.latency_decrease_enabled = false;
2072 cfg.retain_increase_credit_on_decrease = retain;
2073 Limiter::new(8, cfg)
2074 };
2075 let fixed = build(0.88, true);
2076 let classic = build(0.95, false);
2077
2078 for _ in 0..80 {
2079 for _ in 0..20 {
2080 fixed.observe(Outcome::Success, Duration::from_millis(5));
2081 classic.observe(Outcome::Success, Duration::from_millis(5));
2082 }
2083 fixed.observe(Outcome::NetworkError, Duration::from_millis(5));
2084 classic.observe(Outcome::NetworkError, Duration::from_millis(5));
2085 }
2086
2087 assert!(
2090 fixed.current() >= 32,
2091 "rebalanced store gate must ramp off the floor under ~5% shortfall, got {}",
2092 fixed.current(),
2093 );
2094 assert!(
2097 classic.current() <= 8,
2098 "classic gate should stay pinned near the floor on the same input, got {}",
2099 classic.current(),
2100 );
2101 assert!(
2102 fixed.current() > classic.current(),
2103 "rebalanced gate {} must out-grow the classic gate {}",
2104 fixed.current(),
2105 classic.current(),
2106 );
2107 }
2108
2109 #[test]
2110 fn cold_start_clamps_into_bounds() {
2111 let cfg = cfg_for_tests();
2112 let l = Limiter::new(1000, cfg.clone());
2113 assert_eq!(l.current(), cfg.max_concurrency);
2114 let l = Limiter::new(0, cfg.clone());
2115 assert_eq!(l.current(), cfg.min_concurrency);
2116 }
2117
2118 #[test]
2119 fn slow_start_doubles_then_caps() {
2120 let cfg = cfg_for_tests();
2121 let l = Limiter::new(2, cfg.clone());
2122 for _ in 0..cfg.window_ops {
2124 l.observe(Outcome::Success, Duration::from_millis(50));
2125 }
2126 assert_eq!(l.current(), 4);
2127 for _ in 0..cfg.window_ops {
2128 l.observe(Outcome::Success, Duration::from_millis(50));
2129 }
2130 assert_eq!(l.current(), 8);
2131 }
2132
2133 #[test]
2134 fn first_failure_exits_slow_start() {
2135 let cfg = cfg_for_tests();
2136 let l = Limiter::new(4, cfg.clone());
2137 for _ in 0..6 {
2141 l.observe(Outcome::Success, Duration::from_millis(50));
2142 }
2143 for _ in 0..4 {
2144 l.observe(Outcome::Timeout, Duration::from_millis(50));
2145 }
2146 let after_stress = l.current();
2147 assert!(
2148 after_stress < 4,
2149 "stress should reduce concurrency from 4, got {after_stress}",
2150 );
2151 for _ in 0..(cfg.window_ops * 5) {
2159 l.observe(Outcome::Success, Duration::from_millis(50));
2160 }
2161 assert!(
2162 l.current() > after_stress,
2163 "expected recovery above {after_stress}, got {}",
2164 l.current(),
2165 );
2166 }
2167
2168 #[test]
2169 fn floor_holds_at_one() {
2170 let cfg = cfg_for_tests();
2171 let l = Limiter::new(2, cfg);
2172 for _ in 0..30 {
2173 l.observe(Outcome::Timeout, Duration::from_millis(50));
2174 }
2175 assert_eq!(l.current(), 1);
2176 }
2177
2178 #[test]
2179 fn application_errors_do_not_punish() {
2180 let cfg = cfg_for_tests();
2181 let l = Limiter::new(4, cfg.clone());
2182 for _ in 0..cfg.window_ops * 5 {
2189 l.observe(Outcome::ApplicationError, Duration::from_millis(50));
2190 }
2191 assert_eq!(
2192 l.current(),
2193 4,
2194 "ApplicationError must not move the cap; got {}",
2195 l.current()
2196 );
2197 }
2198
2199 #[test]
2200 fn latency_inflation_triggers_decrease() {
2201 let cfg = LimiterConfig {
2202 window_ops: 20,
2203 min_window_ops: 5,
2204 ..cfg_for_tests()
2205 };
2206 let l = Limiter::new(4, cfg.clone());
2207 for _ in 0..cfg.window_ops {
2209 l.observe(Outcome::Success, Duration::from_millis(50));
2210 }
2211 let after_baseline = l.current();
2212 for _ in 0..cfg.window_ops {
2214 l.observe(Outcome::Success, Duration::from_millis(500));
2215 }
2216 assert!(
2218 l.current() < after_baseline,
2219 "expected decrease from {after_baseline}, got {}",
2220 l.current(),
2221 );
2222 }
2223
2224 #[test]
2225 fn warm_start_overrides_current() {
2226 let cfg = cfg_for_tests();
2227 let l = Limiter::new(2, cfg);
2228 l.warm_start(20);
2229 assert_eq!(l.current(), 20);
2230 }
2231
2232 #[test]
2233 fn warm_start_clamps() {
2234 let cfg = cfg_for_tests();
2235 let l = Limiter::new(2, cfg.clone());
2236 l.warm_start(1_000_000);
2237 assert_eq!(l.current(), cfg.max_concurrency);
2238 }
2239
2240 #[test]
2241 fn disabled_controller_holds_steady() {
2242 let cfg = LimiterConfig {
2243 enabled: false,
2244 ..cfg_for_tests()
2245 };
2246 let l = Limiter::new(8, cfg);
2247 for _ in 0..50 {
2248 l.observe(Outcome::Timeout, Duration::from_millis(50));
2249 }
2250 assert_eq!(l.current(), 8);
2251 }
2252
2253 #[test]
2254 fn controller_snapshot_round_trips() {
2255 let c = AdaptiveController::new(
2261 ChannelStart {
2262 quote: 64,
2263 store: 16,
2264 fetch: 64,
2265 },
2266 adaptive_cfg_for_tests(),
2267 );
2268 let snap = c.snapshot();
2269 assert_eq!(snap.quote, 64);
2270 assert_eq!(snap.store, 16);
2271 assert_eq!(snap.fetch, 64);
2272
2273 let c2 = AdaptiveController::default();
2274 c2.warm_start(snap);
2275 assert_eq!(c2.quote.current(), 64);
2276 assert_eq!(c2.store.current(), 16);
2277 assert_eq!(c2.fetch.current(), 64);
2278 }
2279
2280 #[tokio::test]
2281 async fn observe_op_records_success() {
2282 let cfg = cfg_for_tests();
2283 let l = Limiter::new(4, cfg.clone());
2284 for _ in 0..cfg.window_ops {
2285 let _: Result<(), &str> =
2286 observe_op(&l, || async { Ok(()) }, |_e: &&str| Outcome::NetworkError).await;
2287 }
2288 assert_eq!(l.current(), 8);
2290 }
2291
2292 #[test]
2293 fn snapshot_round_trips_through_disk() {
2294 let dir = tempfile::tempdir().unwrap();
2295 let path = dir.path().join("client_adaptive.json");
2296 let snap = ChannelStart {
2297 quote: 24,
2298 store: 6,
2299 fetch: 12,
2300 };
2301 save_snapshot(&path, snap);
2302 let loaded = load_snapshot(&path).unwrap();
2303 assert_eq!(loaded.quote, 24);
2304 assert_eq!(loaded.store, 6);
2305 assert_eq!(loaded.fetch, 12);
2306 }
2307
2308 #[test]
2309 fn load_missing_returns_none() {
2310 let dir = tempfile::tempdir().unwrap();
2311 let path = dir.path().join("does_not_exist.json");
2312 assert!(load_snapshot(&path).is_none());
2313 }
2314
2315 #[test]
2316 fn load_corrupt_returns_none() {
2317 let dir = tempfile::tempdir().unwrap();
2318 let path = dir.path().join("bad.json");
2319 std::fs::write(&path, b"not valid json{{{").unwrap();
2320 assert!(load_snapshot(&path).is_none());
2321 }
2322
2323 #[test]
2324 fn load_wrong_schema_returns_none() {
2325 let dir = tempfile::tempdir().unwrap();
2326 let path = dir.path().join("future.json");
2327 let payload = r#"{"schema":999,"channels":{"quote":1,"store":1,"fetch":1}}"#;
2330 std::fs::write(&path, payload).unwrap();
2331 assert!(load_snapshot(&path).is_none());
2332 }
2333
2334 #[test]
2335 fn load_schema_one_preserves_quote_store_and_resets_fetch() {
2336 const LEGACY_QUOTE_CAP: usize = 48;
2337 const LEGACY_STORE_CAP: usize = 24;
2338 const LEGACY_FETCH_CAP: usize = 96;
2339
2340 let dir = tempfile::tempdir().unwrap();
2341 let path = dir.path().join("legacy.json");
2342 let payload = format!(
2343 r#"{{"schema":{},"channels":{{"quote":{},"store":{},"fetch":{}}}}}"#,
2344 PERSIST_SCHEMA_AIMD_FETCH, LEGACY_QUOTE_CAP, LEGACY_STORE_CAP, LEGACY_FETCH_CAP,
2345 );
2346 std::fs::write(&path, payload).unwrap();
2347
2348 let loaded = load_snapshot(&path).unwrap();
2349
2350 assert_eq!(loaded.quote, LEGACY_QUOTE_CAP);
2351 assert_eq!(loaded.store, LEGACY_STORE_CAP);
2352 assert_eq!(loaded.fetch, FETCH_COLD_START_CONCURRENCY);
2353 }
2354
2355 #[tokio::test]
2356 async fn observe_op_records_classified_error() {
2357 let cfg = cfg_for_tests();
2358 let l = Limiter::new(4, cfg.clone());
2359 for _ in 0..cfg.window_ops {
2360 let _: Result<(), &str> =
2361 observe_op(&l, || async { Err("boom") }, |_e: &&str| Outcome::Timeout).await;
2362 }
2363 assert!(l.current() < 4);
2364 }
2365
2366 #[test]
2376 fn no_regression_cold_start_at_least_static_defaults() {
2377 let s = ChannelStart::default();
2378 assert!(
2379 s.quote >= 32,
2380 "quote cold-start regressed: got {}, prior static was 32",
2381 s.quote,
2382 );
2383 assert!(
2384 s.store >= 8,
2385 "store cold-start regressed: got {}, prior static was 8",
2386 s.store,
2387 );
2388 assert_eq!(
2389 s.fetch, FETCH_COLD_START_CONCURRENCY,
2390 "fetch cold-start changed unexpectedly: got {}, expected {}",
2391 s.fetch, FETCH_COLD_START_CONCURRENCY,
2392 );
2393 }
2394
2395 #[test]
2399 fn controller_default_config_is_sane() {
2400 let c = AdaptiveController::default();
2401 let starts = ChannelStart::default();
2402 assert_eq!(c.quote.current(), starts.quote);
2403 assert_eq!(c.store.current(), starts.store);
2404 assert_eq!(c.fetch.current(), starts.fetch);
2405 assert_eq!(lock(&c.quote.inner).window.len(), 0);
2407 assert_eq!(lock(&c.store.inner).window.len(), 0);
2408 assert_eq!(lock(&c.fetch.inner).window.len(), 0);
2409 }
2410
2411 #[test]
2415 fn alternating_success_failure_collapses_to_floor() {
2416 let cfg = cfg_for_tests();
2422 let l = Limiter::new(8, cfg.clone());
2423 let mut min_observed = usize::MAX;
2424 let mut max_observed = 0usize;
2425 let mut floor_visits = 0usize;
2426 for i in 0..1000 {
2427 let outcome = if i % 2 == 0 {
2428 Outcome::Success
2429 } else {
2430 Outcome::Timeout
2431 };
2432 l.observe(outcome, Duration::from_millis(50));
2433 let cur = l.current();
2434 assert!(
2435 cur >= cfg.min_concurrency,
2436 "cap underflowed floor at iter {i}: got {cur}",
2437 );
2438 min_observed = min_observed.min(cur);
2439 max_observed = max_observed.max(cur);
2440 if cur == cfg.min_concurrency {
2441 floor_visits += 1;
2442 }
2443 }
2444 assert_eq!(
2445 min_observed, cfg.min_concurrency,
2446 "cap never reached the floor under 50% timeout rate"
2447 );
2448 assert!(
2449 max_observed >= 8,
2450 "cap never visited the start value: max_observed={max_observed}"
2451 );
2452 assert!(
2456 floor_visits > 500,
2457 "cap spent only {floor_visits}/1000 ticks at floor; expected mostly at floor"
2458 );
2459 assert_eq!(
2460 l.current(),
2461 cfg.min_concurrency,
2462 "controller did not settle at floor after 1000 alternations"
2463 );
2464 }
2465
2466 #[test]
2470 fn pure_success_stream_recovers_to_max() {
2471 let cfg = cfg_for_tests();
2472 let l = Limiter::new(cfg.min_concurrency, cfg.clone());
2473 for _ in 0..10_000 {
2474 l.observe(Outcome::Success, Duration::from_millis(5));
2475 }
2476 assert_eq!(
2477 l.current(),
2478 cfg.max_concurrency,
2479 "expected recovery to max ({}), got {}",
2480 cfg.max_concurrency,
2481 l.current(),
2482 );
2483 }
2484
2485 #[test]
2489 fn stress_then_heal_drives_floor_then_recovery() {
2490 let cfg = cfg_for_tests();
2491 let l = Limiter::new(8, cfg.clone());
2492 for _ in 0..100 {
2493 l.observe(Outcome::Timeout, Duration::from_millis(50));
2494 }
2495 let after_stress = l.current();
2496 assert_eq!(
2497 after_stress, cfg.min_concurrency,
2498 "stress should drive cap to floor, got {after_stress}",
2499 );
2500 for _ in 0..1_000 {
2501 l.observe(Outcome::Success, Duration::from_millis(10));
2502 }
2503 let after_heal = l.current();
2504 assert!(
2505 after_heal >= cfg.min_concurrency.saturating_add(4),
2506 "expected substantial recovery from floor, got {after_heal}",
2507 );
2508 }
2509
2510 #[test]
2514 fn baseline_does_not_grow_unbounded_under_slow_links() {
2515 let cfg = cfg_for_tests();
2516 let l = Limiter::new(2, cfg.clone());
2517 for _ in 0..(cfg.window_ops * 10) {
2518 l.observe(Outcome::Success, Duration::from_millis(500));
2519 }
2520 let baseline = lock(&l.inner).latency_baseline;
2521 let base = baseline.expect("baseline should be set after many healthy windows");
2522 assert!(
2523 base > Duration::ZERO,
2524 "baseline must not stay at ZERO, got {base:?}",
2525 );
2526 let lo = Duration::from_millis(250);
2528 let hi = Duration::from_millis(1000);
2529 assert!(
2530 base >= lo && base <= hi,
2531 "baseline drifted out of [{lo:?}, {hi:?}]: {base:?}",
2532 );
2533 }
2534
2535 #[test]
2540 fn baseline_initialized_only_after_first_healthy_window() {
2541 let cfg = cfg_for_tests();
2542 let l = Limiter::new(8, cfg.clone());
2543 for _ in 0..50 {
2544 l.observe(Outcome::Timeout, Duration::from_millis(50));
2545 }
2546 assert!(
2548 lock(&l.inner).latency_baseline.is_none(),
2549 "baseline must be None before any healthy window",
2550 );
2551 for _ in 0..(cfg.window_ops * 5) {
2553 l.observe(Outcome::Success, Duration::from_millis(20));
2554 }
2555 let baseline = lock(&l.inner).latency_baseline;
2556 assert!(
2557 baseline.is_some(),
2558 "baseline must be Some after healthy windows",
2559 );
2560 let base = baseline.unwrap_or_default();
2561 assert!(
2562 base > Duration::ZERO,
2563 "baseline must reflect real latency, got {base:?}",
2564 );
2565 }
2566
2567 #[test]
2570 fn min_concurrency_floor_holds_under_torrent_of_errors() {
2571 let cfg = cfg_for_tests();
2572 let l = Limiter::new(8, cfg.clone());
2573 for i in 0..50_000 {
2574 l.observe(Outcome::Timeout, Duration::from_millis(50));
2575 if i == 100 || i == 1_000 || i == 49_999 {
2576 let cur = l.current();
2577 assert_eq!(
2578 cur, cfg.min_concurrency,
2579 "floor breached at iter {i}: got {cur}",
2580 );
2581 }
2582 }
2583 }
2584
2585 #[test]
2587 fn max_concurrency_ceiling_holds_under_torrent_of_successes() {
2588 let cfg = cfg_for_tests();
2589 let start = cfg
2590 .max_concurrency
2591 .saturating_sub(1)
2592 .max(cfg.min_concurrency);
2593 let l = Limiter::new(start, cfg.clone());
2594 for i in 0..50_000 {
2595 l.observe(Outcome::Success, Duration::from_millis(5));
2596 if i == 100 || i == 1_000 || i == 49_999 {
2597 let cur = l.current();
2598 assert!(
2599 cur <= cfg.max_concurrency,
2600 "ceiling breached at iter {i}: got {cur} > {}",
2601 cfg.max_concurrency,
2602 );
2603 }
2604 }
2605 assert_eq!(l.current(), cfg.max_concurrency);
2606 }
2607
2608 #[test]
2614 fn saturating_arithmetic_handles_extreme_config() {
2615 let cfg = LimiterConfig {
2616 max_concurrency: usize::MAX / 2,
2617 ..cfg_for_tests()
2618 };
2619 let start = usize::MAX / 4;
2620 let l = Limiter::new(start, cfg.clone());
2621 for _ in 0..(cfg.window_ops * 10) {
2622 l.observe(Outcome::Success, Duration::from_millis(1));
2623 }
2624 assert_eq!(
2629 l.current(),
2630 cfg.max_concurrency,
2631 "saturating math survived but cap did not grow to ceiling"
2632 );
2633 }
2634
2635 #[test]
2642 fn window_eviction_is_fifo() {
2643 let cfg = LimiterConfig {
2644 window_ops: 10,
2645 min_window_ops: 5,
2646 success_target: 0.9,
2647 timeout_ceiling: 0.1,
2648 ..cfg_for_tests()
2649 };
2650 let l = Limiter::new(8, cfg.clone());
2651 for _ in 0..cfg.window_ops {
2656 l.observe(Outcome::Timeout, Duration::from_millis(50));
2657 }
2658 let after_stress = l.current();
2659 assert!(
2660 after_stress < 8,
2661 "expected cap to drop from 8 after pure-timeout window, got {after_stress}"
2662 );
2663 for _ in 0..(cfg.window_ops * 3) {
2668 l.observe(Outcome::Success, Duration::from_millis(20));
2669 }
2670 let after_recovery = l.current();
2671 assert!(
2674 after_recovery > after_stress,
2675 "FIFO eviction broken: cap stayed at {after_stress} after recovery successes (expected > {after_stress}, got {after_recovery})"
2676 );
2677 }
2678
2679 #[test]
2682 fn disabled_controller_returns_initial_value_invariantly() {
2683 let cfg = LimiterConfig {
2684 enabled: false,
2685 ..cfg_for_tests()
2686 };
2687 let initial = 8;
2688 let l = Limiter::new(initial, cfg);
2689 for i in 0..1_000 {
2690 let outcome = match i % 4 {
2691 0 => Outcome::Success,
2692 1 => Outcome::Timeout,
2693 2 => Outcome::NetworkError,
2694 _ => Outcome::ApplicationError,
2695 };
2696 l.observe(outcome, Duration::from_millis(50));
2697 assert_eq!(
2698 l.current(),
2699 initial,
2700 "disabled controller moved at iter {i}",
2701 );
2702 }
2703 }
2704
2705 #[tokio::test(flavor = "multi_thread", worker_threads = 4)]
2708 async fn concurrent_observations_do_not_corrupt_window() {
2709 let cfg = cfg_for_tests();
2710 let l = Limiter::new(4, cfg.clone());
2711 let mut handles = Vec::with_capacity(100);
2712 for _ in 0..100 {
2713 let l_clone = l.clone();
2714 handles.push(tokio::spawn(async move {
2715 for _ in 0..100 {
2716 l_clone.observe(Outcome::Success, Duration::from_millis(5));
2717 }
2718 }));
2719 }
2720 for h in handles {
2721 h.await.unwrap();
2722 }
2723 let cur = l.current();
2724 assert!(
2725 cur >= cfg.min_concurrency && cur <= cfg.max_concurrency,
2726 "cap out of bounds after concurrent observations: {cur}",
2727 );
2728 }
2729
2730 #[test]
2735 fn persisted_snapshot_warm_starts_above_cold_floor() {
2736 let dir = tempfile::tempdir().unwrap();
2737 let path = dir.path().join("client_adaptive.json");
2738 let saved = ChannelStart {
2741 quote: 64,
2742 store: 32,
2743 fetch: 128,
2744 };
2745 save_snapshot(&path, saved);
2746 let loaded = load_snapshot(&path).unwrap();
2747
2748 let low = ChannelStart {
2751 quote: 2,
2752 store: 2,
2753 fetch: 2,
2754 };
2755 let c = AdaptiveController::new(low, AdaptiveConfig::default());
2756 c.warm_start(loaded);
2757 assert_eq!(c.quote.current(), 64);
2758 assert_eq!(c.store.current(), 32);
2759 assert_eq!(c.fetch.current(), 128);
2760 }
2761
2762 #[test]
2766 fn save_load_round_trip_with_concurrent_writes() {
2767 use std::thread;
2768 let dir = tempfile::tempdir().unwrap();
2769 let path = dir.path().join("client_adaptive.json");
2770 let path_a = path.clone();
2771 let path_b = path.clone();
2772 let snap_a = ChannelStart {
2773 quote: 10,
2774 store: 10,
2775 fetch: 10,
2776 };
2777 let snap_b = ChannelStart {
2778 quote: 99,
2779 store: 99,
2780 fetch: 99,
2781 };
2782 let h_a = thread::spawn(move || {
2783 for _ in 0..50 {
2784 save_snapshot(&path_a, snap_a);
2785 }
2786 });
2787 let h_b = thread::spawn(move || {
2788 for _ in 0..50 {
2789 save_snapshot(&path_b, snap_b);
2790 }
2791 });
2792 h_a.join().unwrap();
2793 h_b.join().unwrap();
2794 let loaded = load_snapshot(&path).expect("file must be a valid snapshot, not torn");
2795 let valid = (loaded.quote == snap_a.quote
2796 && loaded.store == snap_a.store
2797 && loaded.fetch == snap_a.fetch)
2798 || (loaded.quote == snap_b.quote
2799 && loaded.store == snap_b.store
2800 && loaded.fetch == snap_b.fetch);
2801 assert!(valid, "loaded snapshot is neither A nor B: {loaded:?}",);
2802 }
2803
2804 #[test]
2807 fn save_snapshot_to_unwritable_dir_does_not_panic() {
2808 let path = PathBuf::from("/nonexistent_root_dir_xyz_for_test/sub/dir/client_adaptive.json");
2812 let snap = ChannelStart {
2813 quote: 1,
2814 store: 1,
2815 fetch: 1,
2816 };
2817 save_snapshot(&path, snap);
2819 assert!(!path.exists());
2821 }
2822
2823 #[test]
2826 fn load_snapshot_from_truncated_file_returns_none() {
2827 let dir = tempfile::tempdir().unwrap();
2828 let path = dir.path().join("truncated.json");
2829 std::fs::write(&path, br#"{"schema":1,"channels":{"quote":"#).unwrap();
2830 assert!(load_snapshot(&path).is_none());
2831 }
2832
2833 #[test]
2837 fn controller_perf_overhead_is_bounded() {
2838 let cfg = cfg_for_tests();
2839 let l = Limiter::new(8, cfg);
2840 let started = Instant::now();
2841 for _ in 0..100_000 {
2842 let _ = l.current();
2843 l.observe(Outcome::Success, Duration::from_micros(1));
2844 }
2845 let elapsed = started.elapsed();
2846 assert!(
2849 elapsed < Duration::from_millis(500),
2850 "100k observe+current pairs took {elapsed:?}, expected <500ms",
2851 );
2852 }
2853
#[test]
/// Hands the controller a config in which every knob is degenerate (zero
/// limits, NaN / infinite ratios, `min_window_ops` above `window_ops`), then
/// checks that the config held by the controller is usable and that extreme
/// observations on the store channel never push its cap below 1.
fn nan_and_out_of_range_config_does_not_panic() {
    let hostile = AdaptiveConfig {
        enabled: true,
        min_concurrency: 0,
        max: ChannelMax {
            quote: 0,
            store: 0,
            fetch: 0,
        },
        window_ops: 10,
        min_window_ops: 50,
        success_target: f64::NAN,
        timeout_ceiling: f64::INFINITY,
        latency_inflation_factor: f64::NEG_INFINITY,
        latency_ewma_alpha: f64::NAN,
    };
    let controller = AdaptiveController::new(ChannelStart::default(), hostile);
    let sanitized = &controller.config;
    // A ratio is usable only when it is finite and lies within [0, 1].
    let is_unit_ratio = |x: f64| x.is_finite() && (0.0..=1.0).contains(&x);

    assert_eq!(
        sanitized.min_concurrency, 1,
        "sanitize did not raise min_concurrency from 0"
    );
    assert!(
        is_unit_ratio(sanitized.success_target),
        "sanitize did not clamp success_target from NaN: {}",
        sanitized.success_target
    );
    assert!(
        is_unit_ratio(sanitized.timeout_ceiling),
        "sanitize did not clamp timeout_ceiling from Inf: {}",
        sanitized.timeout_ceiling
    );
    assert!(
        sanitized.latency_inflation_factor.is_finite()
            && sanitized.latency_inflation_factor > 0.0,
        "sanitize did not fix latency_inflation_factor from -Inf: {}",
        sanitized.latency_inflation_factor
    );
    assert!(
        is_unit_ratio(sanitized.latency_ewma_alpha),
        "sanitize did not fix latency_ewma_alpha from NaN: {}",
        sanitized.latency_ewma_alpha
    );
    assert!(
        sanitized.min_window_ops <= sanitized.window_ops,
        "sanitize did not clamp min_window_ops <= window_ops: min={} window={}",
        sanitized.min_window_ops,
        sanitized.window_ops
    );
    assert!(
        sanitized.max.quote >= sanitized.min_concurrency,
        "max.quote below min_concurrency"
    );

    // Alternate an absurdly slow success with a zero-latency timeout.
    let absurdly_slow = Duration::from_secs(99_999);
    (0..200).for_each(|_| {
        controller.store.observe(Outcome::Success, absurdly_slow);
        controller.store.observe(Outcome::Timeout, Duration::ZERO);
    });
    let cur = controller.store.current();
    assert!(cur >= 1, "cap below floor: {cur}");
}
2927
#[test]
/// A single run of 8 timeouts (equal to `min_window_ops`) must not collapse
/// the cap: starting from 32, it has to remain at 16 or above.
fn transient_burst_does_not_pile_drive_to_floor() {
    let limiter = Limiter::new(
        32,
        LimiterConfig {
            window_ops: 32,
            min_window_ops: 8,
            success_target: 0.95,
            timeout_ceiling: 0.10,
            ..cfg_for_tests()
        },
    );
    (0..8).for_each(|_| {
        limiter.observe(Outcome::Timeout, Duration::from_millis(10));
    });
    let after_burst = limiter.current();
    assert!(
        after_burst >= 16,
        "transient burst pile-drove cap from 32 to {after_burst}; expected >= 16",
    );
}
2958
2959 #[tokio::test]
2964 async fn transport_errors_classify_as_capacity_signal() {
2965 use crate::data::client::classify_error;
2966 use crate::data::error::Error;
2967 let make_cfg = || LimiterConfig {
2968 window_ops: 16,
2969 min_window_ops: 5,
2970 success_target: 0.5,
2971 timeout_ceiling: 0.5,
2972 ..cfg_for_tests()
2973 };
2974 type ErrFactory = Box<dyn Fn() -> Error>;
2976 let cases: Vec<(&str, ErrFactory)> = vec![
2977 ("Network", Box::new(|| Error::Network("net".to_string()))),
2978 (
2979 "InsufficientPeers",
2980 Box::new(|| Error::InsufficientPeers("ip".to_string())),
2981 ),
2982 ("Io", Box::new(|| Error::Io(std::io::Error::other("io")))),
2983 ("Protocol", Box::new(|| Error::Protocol("p".to_string()))),
2984 ("Storage", Box::new(|| Error::Storage("s".to_string()))),
2985 (
2986 "PartialUpload",
2987 Box::new(|| Error::PartialUpload {
2988 stored: vec![],
2989 stored_count: 0,
2990 failed: vec![],
2991 failed_count: 0,
2992 total_chunks: 0,
2993 spend: Box::new(crate::data::error::PartialUploadSpend {
2994 storage_cost_atto: "0".to_string(),
2995 gas_cost_wei: 0,
2996 }),
2997 reason: "r".to_string(),
2998 }),
2999 ),
3000 ];
3001 for (name, mk) in &cases {
3002 let l = Limiter::new(8, make_cfg());
3003 for _ in 0..16 {
3004 let _: std::result::Result<(), Error> =
3005 observe_op(&l, || async { Err(mk()) }, classify_error).await;
3006 }
3007 let cur = l.current();
3011 assert!(
3012 cur < 8,
3013 "{name} not classified as capacity signal: cap stayed at {cur}",
3014 );
3015 }
3016 }
3017
#[test]
/// With quote and store started at their ceilings (4 and 8) and fetch given
/// a ceiling of 1024, 1000 fast successes per channel must leave quote and
/// store pinned while fetch ends above 8 and no higher than 1024.
fn per_channel_ceilings_are_independent() {
    let ceilings = ChannelMax {
        quote: 4,
        store: 8,
        fetch: 1024,
    };
    let starts = ChannelStart {
        quote: 4,
        store: 8,
        fetch: 64,
    };
    let controller = AdaptiveController::new(
        starts,
        AdaptiveConfig {
            max: ceilings,
            ..AdaptiveConfig::default()
        },
    );

    let mut round = 0;
    while round < 1000 {
        controller
            .quote
            .observe(Outcome::Success, Duration::from_micros(10));
        controller
            .store
            .observe(Outcome::Success, Duration::from_micros(10));
        controller
            .fetch
            .observe(Outcome::Success, Duration::from_micros(10));
        round += 1;
    }

    assert_eq!(controller.quote.current(), 4, "quote should cap at 4");
    assert_eq!(controller.store.current(), 8, "store should cap at 8");
    assert!(
        controller.fetch.current() > 8 && controller.fetch.current() <= 1024,
        "fetch did not use its independent ceiling; got {}",
        controller.fetch.current()
    );
}
3057
#[test]
/// After one healthy epoch the hill climber probes upward; a following epoch
/// observed at the "reject" latency must bring both the live cap and the
/// snapshot back to the starting cap.
fn fetch_hill_rejects_upward_probe_without_goodput_gain() {
    let hill_cfg = hill_cfg_for_tests();
    let hill = fetch_hill_for_tests(HILL_TEST_START_CAP, hill_cfg.clone());

    // Epoch 1: healthy baseline at the starting cap.
    observe_hill_success_epoch(&hill, &hill_cfg, HILL_TEST_CHUNK_BYTES);
    assert_eq!(
        hill.current(),
        HILL_TEST_UP_PROBE_CAP,
        "first healthy epoch should probe upward"
    );

    // Epoch 2: same chunk size, but at the reject latency.
    let reject_latency = Duration::from_millis(HILL_TEST_REJECT_LATENCY_MS);
    observe_hill_success_epoch_with_latency(
        &hill,
        &hill_cfg,
        HILL_TEST_CHUNK_BYTES,
        reject_latency,
    );
    assert_eq!(
        hill.current(),
        HILL_TEST_START_CAP,
        "slower higher-cap wave should reject the upward probe"
    );
    assert_eq!(hill.snapshot(), HILL_TEST_START_CAP);
}
3083
#[test]
/// Two consecutive healthy epochs with identical chunk size: the first moves
/// the live cap to the upward probe, the second must promote that probe to
/// the snapshot and move the live cap on to the next higher probe.
fn fetch_hill_accepts_upward_probe_with_goodput_gain() {
    let hill_cfg = hill_cfg_for_tests();
    let hill = fetch_hill_for_tests(HILL_TEST_START_CAP, hill_cfg.clone());

    // Baseline epoch.
    observe_hill_success_epoch(&hill, &hill_cfg, HILL_TEST_CHUNK_BYTES);
    assert_eq!(hill.current(), HILL_TEST_UP_PROBE_CAP);

    // Probe epoch, observed through the same helper as the baseline.
    observe_hill_success_epoch(&hill, &hill_cfg, HILL_TEST_CHUNK_BYTES);
    assert_eq!(
        hill.snapshot(),
        HILL_TEST_UP_PROBE_CAP,
        "same-size chunks at same latency should promote the higher cap"
    );
    assert_eq!(
        hill.current(),
        HILL_TEST_NEXT_UP_PROBE_CAP,
        "after accepting an upward probe, hill climber should probe higher"
    );
}
3104
#[test]
/// Drives the hill climber through a rejected upward probe, then through
/// cooldown plus stable epochs until it probes downward, and finally checks
/// that a down-probe epoch at the "retained" latency becomes the snapshot.
fn fetch_hill_accepts_lower_probe_when_goodput_is_retained() {
    let hill_cfg = hill_cfg_for_tests();
    let hill = fetch_hill_for_tests(HILL_TEST_START_CAP, hill_cfg.clone());

    // Baseline epoch followed by an upward probe at the reject latency.
    observe_hill_success_epoch(&hill, &hill_cfg, HILL_TEST_CHUNK_BYTES);
    let reject_latency = Duration::from_millis(HILL_TEST_REJECT_LATENCY_MS);
    observe_hill_success_epoch_with_latency(
        &hill,
        &hill_cfg,
        HILL_TEST_CHUNK_BYTES,
        reject_latency,
    );
    assert_eq!(hill.current(), HILL_TEST_START_CAP);

    // Sit through the cooldown and the stable epochs.
    let quiet_epochs = HILL_REJECT_COOLDOWN_EPOCHS + HILL_STABLE_PROBE_EPOCHS;
    (0..quiet_epochs).for_each(|_| {
        observe_hill_success_epoch(&hill, &hill_cfg, HILL_TEST_CHUNK_BYTES);
    });
    assert_eq!(
        hill.current(),
        HILL_TEST_DOWN_PROBE_CAP,
        "stable best should eventually probe a lower cap"
    );

    // Down-probe epoch at the retained latency.
    let retained_latency = Duration::from_millis(HILL_TEST_RETAINED_DOWN_LATENCY_MS);
    observe_hill_success_epoch_with_latency(
        &hill,
        &hill_cfg,
        HILL_TEST_CHUNK_BYTES,
        retained_latency,
    );
    assert_eq!(
        hill.snapshot(),
        HILL_TEST_DOWN_PROBE_CAP,
        "retained goodput at lower concurrency should become the new best"
    );
}
3140
3141 #[tokio::test]
3142 async fn fetch_hill_records_constant_size_timed_ops_without_stress() {
3143 let cfg = hill_cfg_for_tests();
3144 let l = fetch_hill_for_tests(HILL_TEST_START_CAP, cfg.clone());
3145 let total_ops = hill_epoch_target_samples(HILL_TEST_START_CAP, &cfg)
3146 + hill_epoch_target_samples(HILL_TEST_UP_PROBE_CAP, &cfg);
3147 let limiter_for_ops = l.clone();
3148
3149 let result: std::result::Result<Vec<()>, ()> =
3150 rebucketed_unordered(&l, 0..total_ops, move |_| {
3151 let limiter = limiter_for_ops.clone();
3152 async move {
3153 observe_op_with_success_bytes(
3154 &limiter,
3155 || async {
3156 tokio::time::sleep(Duration::from_millis(HILL_TEST_ASYNC_LATENCY_MS))
3157 .await;
3158 Ok::<(), ()>(())
3159 },
3160 |_| Outcome::NetworkError,
3161 |_| HILL_TEST_CHUNK_BYTES,
3162 )
3163 .await
3164 }
3165 })
3166 .await;
3167 result.unwrap();
3168
3169 let snapshot = l.snapshot();
3174 assert!(
3175 matches!(snapshot, HILL_TEST_START_CAP | HILL_TEST_UP_PROBE_CAP),
3176 "timed successes should finish at the existing or accepted best cap, got {snapshot}"
3177 );
3178 let current = l.current();
3179 assert!(
3180 matches!(current, HILL_TEST_START_CAP | HILL_TEST_NEXT_UP_PROBE_CAP),
3181 "timed successes should leave the controller unstressed, got {current}"
3182 );
3183 }
3184
#[test]
/// Feeding exactly `min_window_ops` timeouts to a fetch hill climber started
/// at 16 must already halve the cap to 8.
fn fetch_hill_stress_cuts_before_full_epoch() {
    let stress_cfg = LimiterConfig {
        window_ops: 8,
        min_window_ops: 4,
        ..hill_cfg_for_tests()
    };
    let hill = fetch_hill_for_tests(16, stress_cfg.clone());

    let mut fed = 0;
    while fed < stress_cfg.min_window_ops {
        hill.observe(Outcome::Timeout, Duration::from_millis(10));
        fed += 1;
    }

    assert_eq!(
        hill.current(),
        8,
        "fetch hill climber should halve on early stress"
    );
}
3204
#[test]
/// Pins the default cold-start values: quote >= 32, store >= 8, and fetch
/// exactly `FETCH_COLD_START_CONCURRENCY`.
fn cold_start_at_least_prior_static_defaults() {
    let ChannelStart {
        quote,
        store,
        fetch,
    } = ChannelStart::default();
    assert!(quote >= 32, "quote cold-start regressed: {}", quote);
    assert!(store >= 8, "store cold-start regressed: {}", store);
    assert_eq!(
        fetch, FETCH_COLD_START_CONCURRENCY,
        "fetch cold-start changed unexpectedly"
    );
}
3218
#[test]
/// Under nothing but timeouts, a limiter started at 64 must reach a cap of 1
/// within at most 200 observations.
fn sustained_stress_reaches_floor_within_bounded_ops() {
    let l = Limiter::new(
        64,
        LimiterConfig {
            window_ops: 32,
            min_window_ops: 8,
            success_target: 0.95,
            timeout_ceiling: 0.10,
            max_concurrency: 64,
            ..cfg_for_tests()
        },
    );
    let mut ops = 0usize;
    loop {
        // Stop as soon as the floor is hit or the observation budget is spent.
        if l.current() <= 1 || ops >= 200 {
            break;
        }
        l.observe(Outcome::Timeout, Duration::from_millis(10));
        ops += 1;
    }
    assert_eq!(
        l.current(),
        1,
        "controller did not reach floor within 200 observations under \
         sustained timeout stress; took {ops} ops, ended at cap {}",
        l.current()
    );
}
3255
#[test]
/// A default controller starts every channel at its `ChannelStart` default,
/// and each default ceiling is strictly above the matching start value.
fn default_controller_has_growth_headroom() {
    let controller = AdaptiveController::default();
    let start = ChannelStart::default();
    let ceiling = ChannelMax::default();

    // Live caps equal the cold-start defaults.
    assert_eq!(controller.quote.current(), start.quote);
    assert_eq!(controller.store.current(), start.store);
    assert_eq!(controller.fetch.current(), start.fetch);

    // Every channel has room to grow.
    assert!(
        ceiling.quote > start.quote,
        "no growth headroom for quote: max={} start={}",
        ceiling.quote,
        start.quote
    );
    assert!(
        ceiling.store > start.store,
        "no growth headroom for store: max={} start={}",
        ceiling.store,
        start.store
    );
    assert!(
        ceiling.fetch > start.fetch,
        "no growth headroom for fetch: max={} start={}",
        ceiling.fetch,
        start.fetch
    );
}
3287
#[test]
/// Warm-starting a default controller from an all-ones snapshot must leave
/// every channel at its cold-start default rather than at 1.
fn warm_start_floors_at_cold_defaults() {
    let controller = AdaptiveController::default();
    let cold = ChannelStart::default();

    controller.warm_start(ChannelStart {
        quote: 1,
        store: 1,
        fetch: 1,
    });

    assert_eq!(
        controller.quote.current(),
        cold.quote,
        "quote warm_start did not floor at cold default"
    );
    assert_eq!(
        controller.store.current(),
        cold.store,
        "store warm_start did not floor at cold default"
    );
    assert_eq!(
        controller.fetch.current(),
        cold.fetch,
        "fetch warm_start did not floor at cold default"
    );
}
3323
#[test]
/// Snapshot values above the cold-start defaults (2x quote, 4x store,
/// 2x fetch) must be adopted verbatim by `warm_start`.
fn warm_start_honors_values_above_cold_floor() {
    let controller = AdaptiveController::default();
    let ChannelStart {
        quote,
        store,
        fetch,
    } = ChannelStart::default();
    let snap = ChannelStart {
        quote: quote * 2,
        store: store * 4,
        fetch: fetch * 2,
    };
    controller.warm_start(snap);
    assert_eq!(controller.quote.current(), snap.quote);
    assert_eq!(controller.store.current(), snap.store);
    assert_eq!(controller.fetch.current(), snap.fetch);
}
3340
3341 #[tokio::test]
3348 async fn rebucketed_picks_up_cap_changes_mid_stream() {
3349 use std::sync::atomic::{AtomicUsize, Ordering as AtomicOrdering};
3350 use std::sync::Arc as StdArc;
3351 let cfg = LimiterConfig {
3352 min_concurrency: 1,
3353 max_concurrency: 32,
3354 ..cfg_for_tests()
3355 };
3356 let l = Limiter::new(4, cfg);
3357 let max_seen = StdArc::new(AtomicUsize::new(0));
3358 let in_flight = StdArc::new(AtomicUsize::new(0));
3359 let processed = StdArc::new(AtomicUsize::new(0));
3360 let l_for_bump = l.clone();
3361 let processed_for_bump = processed.clone();
3362 let bump_handle = tokio::spawn(async move {
3365 loop {
3366 tokio::time::sleep(Duration::from_millis(2)).await;
3367 if processed_for_bump.load(AtomicOrdering::Relaxed) >= 16 {
3368 l_for_bump.warm_start(16);
3369 return;
3370 }
3371 }
3372 });
3373 let _: Vec<()> = rebucketed(&l, 0..200usize, false, |_i| {
3374 let max_seen = max_seen.clone();
3375 let in_flight = in_flight.clone();
3376 let processed = processed.clone();
3377 async move {
3378 let cur = in_flight.fetch_add(1, AtomicOrdering::Relaxed) + 1;
3379 max_seen.fetch_max(cur, AtomicOrdering::Relaxed);
3380 tokio::time::sleep(Duration::from_millis(1)).await;
3381 in_flight.fetch_sub(1, AtomicOrdering::Relaxed);
3382 processed.fetch_add(1, AtomicOrdering::Relaxed);
3383 Ok::<(), &'static str>(())
3384 }
3385 })
3386 .await
3387 .unwrap();
3388 bump_handle.await.unwrap();
3389 let peak = max_seen.load(AtomicOrdering::Relaxed);
3393 assert!(
3394 peak > 4,
3395 "rebucketed did not pick up the mid-stream cap bump (peak in-flight = {peak})"
3396 );
3397 }
3398
3399 #[tokio::test]
3408 async fn observe_op_cancellation_drops_silently() {
3409 let cfg = LimiterConfig {
3410 window_ops: 16,
3411 min_window_ops: 4,
3412 ..cfg_for_tests()
3413 };
3414 let l = Limiter::new(4, cfg);
3415 let l_clone = l.clone();
3419 let fut = observe_op(
3420 &l_clone,
3421 || async {
3422 std::future::pending::<()>().await;
3423 Ok::<(), &'static str>(())
3424 },
3425 |_| Outcome::Timeout,
3426 );
3427 drop(fut);
3428 assert_eq!(l.current(), 4, "cancelled op moved the cap");
3430 for _ in 0..16 {
3435 let _: Result<(), &'static str> = observe_op(
3436 &l,
3437 || async { Ok(()) },
3438 |_| Outcome::NetworkError,
3440 )
3441 .await;
3442 }
3443 assert!(
3446 l.current() > 4,
3447 "cap did not grow after 16 successes; controller corrupted by cancellation? cap={}",
3448 l.current(),
3449 );
3450 }
3451
#[test]
/// After `save_snapshot` returns, the file must already exist on disk and
/// `load_snapshot` must read back the same three channel values.
fn save_snapshot_is_synchronous_and_durable() {
    let scratch = tempfile::tempdir().unwrap();
    let target = scratch.path().join("client_adaptive.json");

    save_snapshot(
        &target,
        ChannelStart {
            quote: 100,
            store: 50,
            fetch: 200,
        },
    );
    assert!(
        target.exists(),
        "save_snapshot did not write file synchronously"
    );

    let loaded = load_snapshot(&target).unwrap();
    assert_eq!(loaded.quote, 100);
    assert_eq!(loaded.store, 50);
    assert_eq!(loaded.fetch, 200);
}
3479
3480 #[tokio::test]
3487 async fn warm_start_disables_slow_start_doubling() {
3488 let cfg = LimiterConfig {
3489 window_ops: 8,
3490 min_window_ops: 4,
3491 success_target: 0.9,
3492 ..cfg_for_tests()
3493 };
3494 let l = Limiter::new(2, cfg.clone());
3495 l.warm_start(16);
3498 assert_eq!(l.current(), 16);
3499 for _ in 0..cfg.window_ops {
3502 l.observe(Outcome::Success, Duration::from_millis(10));
3503 }
3504 assert_eq!(
3505 l.current(),
3506 17,
3507 "warm-start triggered slow-start doubling instead of additive +1"
3508 );
3509 }
3510
#[test]
/// The warm-start floor is the cold start the controller was built with:
/// an all-ones snapshot leaves (2, 1, 4) in place, while an all-tens
/// snapshot is adopted on every channel.
fn controller_warm_start_floors_at_per_instance_cold_start() {
    let uniform = |n: usize| ChannelStart {
        quote: n,
        store: n,
        fetch: n,
    };
    let controller = AdaptiveController::new(
        ChannelStart {
            quote: 2,
            store: 1,
            fetch: 4,
        },
        AdaptiveConfig::default(),
    );

    // Snapshot at or below the per-instance cold start.
    controller.warm_start(uniform(1));
    assert_eq!(controller.quote.current(), 2);
    assert_eq!(controller.store.current(), 1);
    assert_eq!(controller.fetch.current(), 4);

    // Snapshot above the per-instance cold start.
    controller.warm_start(uniform(10));
    assert_eq!(controller.quote.current(), 10);
    assert_eq!(controller.store.current(), 10);
    assert_eq!(controller.fetch.current(), 10);
}
3542
#[test]
/// With `enabled: false`, a warm start to 100 on every channel must leave
/// all three caps at the cold-start value of 5.
fn warm_start_is_noop_when_adaptive_disabled() {
    let controller = AdaptiveController::new(
        ChannelStart {
            quote: 5,
            store: 5,
            fetch: 5,
        },
        AdaptiveConfig {
            enabled: false,
            ..AdaptiveConfig::default()
        },
    );

    let eager_snapshot = ChannelStart {
        quote: 100,
        store: 100,
        fetch: 100,
    };
    controller.warm_start(eager_snapshot);

    assert_eq!(
        controller.quote.current(),
        5,
        "warm_start moved cap when disabled"
    );
    assert_eq!(
        controller.store.current(),
        5,
        "warm_start moved cap when disabled"
    );
    assert_eq!(
        controller.fetch.current(),
        5,
        "warm_start moved cap when disabled"
    );
}
3567
3568 #[tokio::test]
3572 async fn rebucketed_unordered_is_rolling_not_fenced() {
3573 use std::sync::atomic::{AtomicUsize, Ordering as AtomicOrdering};
3574 use std::sync::Arc as StdArc;
3575 let cfg = LimiterConfig {
3576 min_concurrency: 1,
3577 max_concurrency: 8,
3578 window_ops: 100,
3579 min_window_ops: 50,
3580 ..cfg_for_tests()
3581 };
3582 let l = Limiter::new(4, cfg);
3583 let in_flight = StdArc::new(AtomicUsize::new(0));
3584 let max_in_flight = StdArc::new(AtomicUsize::new(0));
3585 let started = StdArc::new(AtomicUsize::new(0));
3586 let _: Vec<()> = rebucketed_unordered(&l, 0..20usize, |i| {
3587 let in_flight = in_flight.clone();
3588 let max_in_flight = max_in_flight.clone();
3589 let started = started.clone();
3590 async move {
3591 let cur = in_flight.fetch_add(1, AtomicOrdering::Relaxed) + 1;
3592 max_in_flight.fetch_max(cur, AtomicOrdering::Relaxed);
3593 started.fetch_add(1, AtomicOrdering::Relaxed);
3594 if i == 0 {
3600 tokio::time::sleep(Duration::from_millis(50)).await;
3601 } else {
3602 tokio::time::sleep(Duration::from_millis(1)).await;
3603 }
3604 in_flight.fetch_sub(1, AtomicOrdering::Relaxed);
3605 Ok::<(), &'static str>(())
3606 }
3607 })
3608 .await
3609 .unwrap();
3610 assert_eq!(started.load(AtomicOrdering::Relaxed), 20);
3613 let peak = max_in_flight.load(AtomicOrdering::Relaxed);
3614 assert!(
3615 peak >= 4,
3616 "rolling scheduler did not fill cap; peak in-flight = {peak}"
3617 );
3618 }
3619
3620 #[tokio::test]
3622 async fn rebucketed_ordered_preserves_input_order() {
3623 let cfg = LimiterConfig {
3624 min_concurrency: 1,
3625 max_concurrency: 4,
3626 ..cfg_for_tests()
3627 };
3628 let l = Limiter::new(4, cfg);
3629 let items: Vec<usize> = (0..50).collect();
3630 let result: Vec<usize> = rebucketed_ordered(
3631 &l,
3632 items.iter().copied().enumerate(),
3633 |(idx, v)| async move {
3634 let delay = (50 - v) as u64;
3636 tokio::time::sleep(Duration::from_micros(delay)).await;
3637 Ok::<_, &'static str>((idx, v * 10))
3638 },
3639 )
3640 .await
3641 .unwrap();
3642 assert_eq!(result.len(), 50);
3643 for (i, v) in result.iter().enumerate() {
3644 assert_eq!(*v, i * 10, "out of order at index {i}: got {v}");
3645 }
3646 }
3647
3648 #[tokio::test]
3653 async fn rebucketed_ordered_pairs_idx_with_payload_correctly() {
3654 let cfg = LimiterConfig {
3655 min_concurrency: 1,
3656 max_concurrency: 8,
3657 ..cfg_for_tests()
3658 };
3659 let l = Limiter::new(8, cfg);
3660 let items: Vec<(usize, u64)> = (0..40).map(|i| (i, 1000u64 + i as u64)).collect();
3665 let result: Vec<u64> = rebucketed_ordered(&l, items, |(idx, hash)| async move {
3666 let delay = (40 - idx) as u64; tokio::time::sleep(Duration::from_micros(delay)).await;
3668 Ok::<_, &'static str>((idx, hash * 7))
3670 })
3671 .await
3672 .unwrap();
3673 for (i, v) in result.iter().enumerate() {
3674 let expected = (1000 + i as u64) * 7;
3675 assert_eq!(
3676 *v, expected,
3677 "idx {i} paired with wrong content: {v}, expected {expected}"
3678 );
3679 }
3680 }
3681
#[test]
/// Saves 100 successive snapshots to the same path, then checks that the
/// last one is what loads back and that no file whose name contains
/// `.tmp.` remains in the directory.
fn save_snapshot_temp_file_is_unique_per_call() {
    let scratch = tempfile::tempdir().unwrap();
    let target = scratch.path().join("client_adaptive.json");

    for value in 1..=100 {
        save_snapshot(
            &target,
            ChannelStart {
                quote: value,
                store: value,
                fetch: value,
            },
        );
    }

    let loaded = load_snapshot(&target).unwrap();
    assert_eq!(loaded.quote, 100);
    assert_eq!(loaded.store, 100);
    assert_eq!(loaded.fetch, 100);

    // Names of any leftover temp files in the directory.
    let leaked: Vec<_> = std::fs::read_dir(scratch.path())
        .unwrap()
        .filter_map(Result::ok)
        .map(|entry| entry.file_name())
        .filter(|file_name| file_name.to_string_lossy().contains(".tmp."))
        .collect();
    assert!(leaked.is_empty(), "temp files leaked: {:?}", leaked);
}
3721
3722 #[tokio::test]
3727 async fn rebucketed_empty_input_returns_empty() {
3728 let cfg = cfg_for_tests();
3729 let l = Limiter::new(4, cfg);
3730 let v: Vec<usize> = rebucketed_unordered(&l, std::iter::empty::<usize>(), |_| async {
3731 Ok::<_, &'static str>(42usize)
3732 })
3733 .await
3734 .unwrap();
3735 assert!(v.is_empty());
3736 let v: Vec<usize> = rebucketed_ordered(
3737 &l,
3738 std::iter::empty::<(usize, ())>(),
3739 |(idx, _)| async move { Ok::<_, &'static str>((idx, 42usize)) },
3740 )
3741 .await
3742 .unwrap();
3743 assert!(v.is_empty());
3744 }
3745
3746 #[tokio::test]
3748 async fn rebucketed_exactly_cap_items() {
3749 let cfg = LimiterConfig {
3750 min_concurrency: 1,
3751 max_concurrency: 4,
3752 ..cfg_for_tests()
3753 };
3754 let l = Limiter::new(4, cfg);
3755 let v: Vec<usize> =
3756 rebucketed_unordered(
3757 &l,
3758 0..4usize,
3759 |i| async move { Ok::<_, &'static str>(i * 2) },
3760 )
3761 .await
3762 .unwrap();
3763 assert_eq!(v.len(), 4);
3764 }
3765
3766 #[tokio::test]
3769 async fn rebucketed_preserves_first_error() {
3770 use std::sync::atomic::{AtomicUsize, Ordering as AtomicOrdering};
3771 use std::sync::Arc as StdArc;
3772 let cfg = LimiterConfig {
3773 min_concurrency: 1,
3774 max_concurrency: 4,
3775 ..cfg_for_tests()
3776 };
3777 let l = Limiter::new(4, cfg);
3778 let started = StdArc::new(AtomicUsize::new(0));
3779 let started_clone = started.clone();
3780 let result: Result<Vec<()>, &'static str> = rebucketed_unordered(&l, 0..20usize, |i| {
3781 let started = started_clone.clone();
3782 async move {
3783 started.fetch_add(1, AtomicOrdering::Relaxed);
3784 if i == 5 {
3785 tokio::time::sleep(Duration::from_micros(100)).await;
3788 return Err("first error");
3789 }
3790 if i == 10 {
3791 return Err("second error - should be ignored");
3792 }
3793 tokio::time::sleep(Duration::from_micros(50)).await;
3794 Ok(())
3795 }
3796 })
3797 .await;
3798 match result {
3799 Err(e) => assert_eq!(e, "first error", "wrong error preserved"),
3800 Ok(_) => panic!("expected error, got ok"),
3801 }
3802 let total = started.load(AtomicOrdering::Relaxed);
3808 assert!(
3809 (5..20).contains(&total),
3810 "started count out of range: {total}"
3811 );
3812 }
3813
#[test]
/// With `min_concurrency == max_concurrency == 5`, neither 1000 successes
/// nor 1000 timeouts may move the cap away from 5.
fn limiter_with_min_equal_max_is_pinned() {
    let pinned = Limiter::new(
        5,
        LimiterConfig {
            min_concurrency: 5,
            max_concurrency: 5,
            ..cfg_for_tests()
        },
    );

    // Phase 1: pressure to grow.
    (0..1000).for_each(|_| {
        pinned.observe(Outcome::Success, Duration::from_millis(1));
    });
    assert_eq!(pinned.current(), 5, "cap moved despite min==max");

    // Phase 2: pressure to shrink.
    (0..1000).for_each(|_| {
        pinned.observe(Outcome::Timeout, Duration::from_millis(50));
    });
    assert_eq!(pinned.current(), 5, "cap moved despite min==max");
}
3833
#[test]
/// With `alpha = 0.0` the new sample carries no weight, so `ewma` must hand
/// back the previous value exactly.
fn ewma_alpha_zero_returns_prev() {
    let (previous, incoming) = (Duration::from_millis(100), Duration::from_millis(500));
    let smoothed = ewma(previous, incoming, 0.0);
    assert_eq!(smoothed, previous, "alpha=0 must return prev unchanged");
}
3843
#[test]
/// With `alpha = 1.0` the result must equal the new sample to within 1ms.
fn ewma_alpha_one_returns_sample() {
    let sample = Duration::from_millis(500);
    let result = ewma(Duration::from_millis(100), sample, 1.0);
    let tolerance = Duration::from_millis(1);
    assert!(
        result.abs_diff(sample) <= tolerance,
        "alpha=1 should return sample; got {result:?}, expected ~{sample:?}"
    );
}
3858
#[test]
/// With `alpha = 0.5`, smoothing 200ms toward 400ms must land on 300ms to
/// within 1ms.
fn ewma_alpha_half_returns_midpoint() {
    let expected = Duration::from_millis(300);
    let result = ewma(
        Duration::from_millis(200),
        Duration::from_millis(400),
        0.5,
    );
    let tolerance = Duration::from_millis(1);
    assert!(
        result.abs_diff(expected) <= tolerance,
        "alpha=0.5 midpoint: got {result:?}, expected ~{expected:?}"
    );
}
3872
#[test]
/// A non-finite `alpha` (NaN, +Inf, -Inf) must make `ewma` return the
/// previous value unchanged.
fn ewma_nan_alpha_returns_prev() {
    let prev = Duration::from_millis(100);
    let sample = Duration::from_millis(500);
    for bad_alpha in [f64::NAN, f64::INFINITY, f64::NEG_INFINITY] {
        let result = ewma(prev, sample, bad_alpha);
        assert_eq!(result, prev);
    }
}
3887
#[test]
/// An `alpha` of 2.5 must behave like 1.0: the result has to sit within 1ms
/// of the 500ms sample.
fn ewma_clamps_alpha_above_one() {
    let (previous, incoming) = (Duration::from_millis(100), Duration::from_millis(500));
    let result = ewma(previous, incoming, 2.5);
    // Lower and upper bound are checked separately.
    assert!(result >= Duration::from_millis(499));
    assert!(result <= Duration::from_millis(501));
}
3899
#[test]
/// Five full windows consisting solely of `ApplicationError` outcomes must
/// leave the cap at its starting value of 8.
fn window_full_of_application_errors_does_not_move_cap() {
    let test_cfg = cfg_for_tests();
    let total_observations = test_cfg.window_ops * 5;
    let limiter = Limiter::new(8, test_cfg.clone());

    let mut sent = 0;
    while sent < total_observations {
        limiter.observe(Outcome::ApplicationError, Duration::from_millis(50));
        sent += 1;
    }

    assert_eq!(
        limiter.current(),
        8,
        "cap moved on pure-app-error window; should hold"
    );
}
3916
#[test]
/// With `enabled: false`, 10,000 timeouts per channel must leave every cap
/// exactly where it started.
fn disabled_adaptive_controller_truly_inert() {
    let controller = AdaptiveController::new(
        ChannelStart::default(),
        AdaptiveConfig {
            enabled: false,
            ..AdaptiveConfig::default()
        },
    );
    let (baseline_quote, baseline_store, baseline_fetch) = (
        controller.quote.current(),
        controller.store.current(),
        controller.fetch.current(),
    );

    (0..10000).for_each(|_| {
        controller
            .quote
            .observe(Outcome::Timeout, Duration::from_millis(1));
        controller
            .store
            .observe(Outcome::Timeout, Duration::from_millis(1));
        controller
            .fetch
            .observe(Outcome::Timeout, Duration::from_millis(1));
    });

    assert_eq!(controller.quote.current(), baseline_quote);
    assert_eq!(controller.store.current(), baseline_store);
    assert_eq!(controller.fetch.current(), baseline_fetch);
}
3939
#[test]
/// Hammering only the store channel with 1000 timeouts must drive store to
/// `min_concurrency` while the quote and fetch caps stay untouched.
fn channel_state_is_independent() {
    let controller = AdaptiveController::default();
    let quote_before = controller.quote.current();
    let fetch_before = controller.fetch.current();
    let store_before = controller.store.current();

    let mut sent = 0;
    while sent < 1000 {
        controller
            .store
            .observe(Outcome::Timeout, Duration::from_millis(1));
        sent += 1;
    }

    assert_eq!(
        controller.store.current(),
        controller.config.min_concurrency,
        "store did not reach floor after 1000 timeouts; cap={}",
        controller.store.current()
    );
    assert!(
        controller.store.current() < store_before,
        "store cap did not move at all"
    );
    assert_eq!(
        controller.quote.current(),
        quote_before,
        "quote leaked from store stress"
    );
    assert_eq!(
        controller.fetch.current(),
        fetch_before,
        "fetch leaked from store stress"
    );
}
3965
#[test]
/// Calls `sanitize()` directly on a config with NaN, out-of-range, and
/// infinite floats plus `min_window_ops > window_ops`, and checks every
/// field lands in a usable range.
fn sanitize_corrects_pathological_floats() {
    let mut cfg = AdaptiveConfig {
        // Window sizes in the wrong order.
        min_window_ops: 10,
        window_ops: 4,
        // Floats that are NaN, infinite, or outside [0, 1].
        latency_ewma_alpha: 2.5,
        latency_inflation_factor: f64::NEG_INFINITY,
        timeout_ceiling: 5.0,
        success_target: f64::NAN,
        ..AdaptiveConfig::default()
    };

    cfg.sanitize();

    assert!(cfg.success_target.is_finite());
    assert!((0.0..=1.0).contains(&cfg.success_target));
    assert!((0.0..=1.0).contains(&cfg.timeout_ceiling));
    assert!(cfg.latency_inflation_factor.is_finite());
    assert!(cfg.latency_inflation_factor > 0.0);
    assert!((0.0..=1.0).contains(&cfg.latency_ewma_alpha));
    assert!(
        cfg.min_window_ops <= cfg.window_ops,
        "min_window_ops {} > window_ops {}",
        cfg.min_window_ops,
        cfg.window_ops
    );
}
3996
#[test]
/// `ChannelMax` must survive a JSON encode / decode cycle with all three
/// fields intact.
fn channel_max_serde_round_trips() {
    let encoded = serde_json::to_string(&ChannelMax {
        quote: 7,
        store: 13,
        fetch: 200,
    })
    .unwrap();
    let decoded: ChannelMax = serde_json::from_str(&encoded).unwrap();
    assert_eq!(decoded.quote, 7);
    assert_eq!(decoded.store, 13);
    assert_eq!(decoded.fetch, 200);
}
4014
#[test]
/// `ChannelStart` must survive a JSON encode / decode cycle with all three
/// fields intact.
fn channel_start_serde_round_trips() {
    let encoded = serde_json::to_string(&ChannelStart {
        quote: 11,
        store: 22,
        fetch: 33,
    })
    .unwrap();
    let decoded: ChannelStart = serde_json::from_str(&encoded).unwrap();
    assert_eq!(decoded.quote, 11);
    assert_eq!(decoded.store, 22);
    assert_eq!(decoded.fetch, 33);
}
4028
4029 #[tokio::test]
4034 async fn rebucketed_honors_cap_shrinkage_mid_stream() {
4035 use std::sync::atomic::{AtomicUsize, Ordering as AtomicOrdering};
4036 use std::sync::Arc as StdArc;
4037 let cfg = LimiterConfig {
4038 min_concurrency: 1,
4039 max_concurrency: 16,
4040 ..cfg_for_tests()
4041 };
4042 let l = Limiter::new(16, cfg);
4043 let in_flight = StdArc::new(AtomicUsize::new(0));
4044 let max_after_shrink = StdArc::new(AtomicUsize::new(0));
4045 let processed = StdArc::new(AtomicUsize::new(0));
4046 let shrunk = StdArc::new(std::sync::atomic::AtomicBool::new(false));
4047 let l_for_shrink = l.clone();
4048 let p_for_shrink = processed.clone();
4049 let shrunk_for_shrink = shrunk.clone();
4050 let shrink_handle = tokio::spawn(async move {
4051 loop {
4053 tokio::time::sleep(Duration::from_millis(2)).await;
4054 if p_for_shrink.load(AtomicOrdering::Relaxed) >= 50 {
4055 l_for_shrink.warm_start(2);
4056 shrunk_for_shrink.store(true, AtomicOrdering::Relaxed);
4057 return;
4058 }
4059 }
4060 });
4061 let _: Vec<()> = rebucketed_unordered(&l, 0..400usize, |_i| {
4062 let in_flight = in_flight.clone();
4063 let max_after_shrink = max_after_shrink.clone();
4064 let processed = processed.clone();
4065 let shrunk = shrunk.clone();
4066 async move {
4067 let cur = in_flight.fetch_add(1, AtomicOrdering::Relaxed) + 1;
4068 if shrunk.load(AtomicOrdering::Relaxed) {
4069 max_after_shrink.fetch_max(cur, AtomicOrdering::Relaxed);
4070 }
4071 tokio::time::sleep(Duration::from_millis(1)).await;
4072 in_flight.fetch_sub(1, AtomicOrdering::Relaxed);
4073 processed.fetch_add(1, AtomicOrdering::Relaxed);
4074 Ok::<(), &'static str>(())
4075 }
4076 })
4077 .await
4078 .unwrap();
4079 shrink_handle.await.unwrap();
4080 let peak = max_after_shrink.load(AtomicOrdering::Relaxed);
4081 assert!(
4086 peak <= 4,
4087 "rebucketed exceeded shrunk cap of 2: peak post-shrink in-flight = {peak}"
4088 );
4089 }
4090
#[test]
/// Two mixed windows of 5 `ApplicationError`s followed by 5 other outcomes:
/// when the other outcomes are successes the cap must not drop below 8;
/// when they are timeouts the cap must drop.
fn mixed_window_app_errors_with_capacity_signal() {
    let mixed_cfg = LimiterConfig {
        window_ops: 10,
        min_window_ops: 5,
        timeout_ceiling: 0.2,
        success_target: 0.9,
        ..cfg_for_tests()
    };
    let latency = Duration::from_millis(50);

    // Case 1: application errors padded with successes.
    let healthy = Limiter::new(8, mixed_cfg.clone());
    for outcome in [Outcome::ApplicationError, Outcome::Success] {
        for _ in 0..5 {
            healthy.observe(outcome, latency);
        }
    }
    assert!(
        healthy.current() >= 8,
        "AppErrors falsely depressed the success rate; cap dropped from 8 to {}",
        healthy.current()
    );

    // Case 2: application errors padded with timeouts.
    let stressed = Limiter::new(8, mixed_cfg);
    for outcome in [Outcome::ApplicationError, Outcome::Timeout] {
        for _ in 0..5 {
            stressed.observe(outcome, latency);
        }
    }
    assert!(
        stressed.current() < 8,
        "all-timeouts (with AppError padding) did not decrease cap; got {}",
        stressed.current()
    );
}
4136
#[test]
/// Runs a writer thread that continuously saves snapshots whose three
/// fields are always equal, while a reader thread loads the same path 2000
/// times. Any successfully loaded snapshot whose fields differ counts as a
/// torn read; the test requires zero of them.
///
/// The writer is signalled and joined before the reader's join result is
/// unwrapped, so a panic in the reader cannot leave the writer spinning on
/// `save_snapshot` for the rest of the test process.
fn concurrent_save_load_no_torn_reads() {
    use std::sync::atomic::{AtomicBool, Ordering as AtomicOrdering};
    use std::thread;

    let dir = tempfile::tempdir().unwrap();
    let path = dir.path().join("snap.json");
    // Seed the file so the reader has something to load from the start.
    save_snapshot(
        &path,
        ChannelStart {
            quote: 1,
            store: 1,
            fetch: 1,
        },
    );

    let stop = std::sync::Arc::new(AtomicBool::new(false));

    let writer = {
        let path = path.clone();
        let stop = stop.clone();
        thread::spawn(move || {
            let mut i = 1usize;
            while !stop.load(AtomicOrdering::Relaxed) {
                save_snapshot(
                    &path,
                    ChannelStart {
                        quote: i,
                        store: i,
                        fetch: i,
                    },
                );
                // Wrap around without ever writing 0.
                i = i.wrapping_add(1).max(1);
            }
        })
    };

    let reader = {
        let path = path.clone();
        thread::spawn(move || {
            // Failed loads are skipped; only parsed snapshots are checked.
            (0..2_000)
                .filter_map(|_| load_snapshot(&path))
                .filter(|snap| snap.quote != snap.store || snap.store != snap.fetch)
                .count()
        })
    };

    // Stop and join the writer unconditionally before inspecting either
    // join result, so no thread outlives a failing test.
    let reader_outcome = reader.join();
    stop.store(true, AtomicOrdering::Relaxed);
    let writer_outcome = writer.join();

    let torn = reader_outcome.unwrap();
    writer_outcome.unwrap();
    assert_eq!(
        torn, 0,
        "observed {torn} torn reads under concurrent writes"
    );
}
4196
#[test]
/// Saving to a path under a root directory that is expected not to exist,
/// with a 5s timeout, must return in under one second rather than waiting
/// out the full timeout.
fn save_with_timeout_returns_promptly_on_fast_failure() {
    let unwritable = std::path::PathBuf::from("/nonexistent_root_xyz_test/snap.json");
    let started = Instant::now();
    save_snapshot_with_timeout(
        unwritable,
        ChannelStart {
            quote: 1,
            store: 1,
            fetch: 1,
        },
        Duration::from_secs(5),
    );
    let elapsed = started.elapsed();
    assert!(
        elapsed < Duration::from_secs(1),
        "save_snapshot_with_timeout took {elapsed:?} on fast-failing path"
    );
}
4222
#[test]
/// With a 1-microsecond timeout, `save_snapshot_with_timeout` must hand
/// control back to the caller within 200ms.
fn save_with_timeout_bounds_wall_time_on_hang() {
    let scratch = tempfile::tempdir().unwrap();
    let target = scratch.path().join("snap.json");
    let started = Instant::now();
    save_snapshot_with_timeout(
        target,
        ChannelStart {
            quote: 1,
            store: 1,
            fetch: 1,
        },
        Duration::from_micros(1),
    );
    let elapsed = started.elapsed();
    assert!(
        elapsed < Duration::from_millis(200),
        "timeout wrapper did not bound wall time: {elapsed:?}"
    );
}
4257}