1use chrono::{DateTime, Utc};
7use serde::{Deserialize, Serialize};
8use std::collections::HashMap;
9
/// Maximum number of recent successful-run durations kept in
/// `AgentMetrics::duration_samples` for the p95 estimate.
const P95_SAMPLE_LIMIT: usize = 100;
11
/// Per-agent run statistics consumed by [`calculate_agent_score`] and updated in
/// place by [`MetricsCollector`].
///
/// NOTE(review): the derived `Default` leaves `recent_success_rate` at `0.0`,
/// whereas [`MetricsCollector::new_agent_metrics`] seeds it at `0.5`; confirm which
/// starting value callers expect.
#[derive(Debug, Clone, Serialize, Deserialize, Default)]
pub struct AgentMetrics {
    /// Number of recorded runs (successes plus failures).
    pub total_runs: u32,
    /// Number of runs recorded through `MetricsCollector::record_success`.
    pub successful_runs: u32,
    /// Number of runs recorded through `MetricsCollector::record_failure`.
    pub failed_runs: u32,
    /// Smoothed duration of successful runs, in milliseconds.
    ///
    /// NOTE(review): maintained as an exponential moving average (alpha 0.3), updated
    /// the same way as `recent_avg_duration_ms`, not as an arithmetic mean.
    pub avg_duration_ms: u64,
    /// Nearest-rank 95th percentile of `duration_samples`, in milliseconds.
    pub p95_duration_ms: u64,
    /// Exponential moving average of run outcomes (success = 1.0, failure = 0.0).
    pub recent_success_rate: f32,
    /// Exponential moving average of successful-run durations, in milliseconds.
    pub recent_avg_duration_ms: u64,
    /// Current load counter; scoring subtracts 10 points per unit (at most 50).
    pub current_load: u32,
    /// When the most recent run (success or failure) was recorded.
    pub last_used_at: Option<DateTime<Utc>>,
    /// Durations of recent successful runs, oldest first; bounded by
    /// `P95_SAMPLE_LIMIT` when maintained through `MetricsCollector::record_success`.
    /// Missing on input deserializes to empty, and an empty list is not serialized.
    #[serde(default, skip_serializing_if = "Vec::is_empty")]
    pub duration_samples: Vec<u64>,
}
37
/// Success/failure tally for a single capability of an agent.
#[derive(Debug, Clone, Serialize, Deserialize, Default)]
pub struct CapabilityHealth {
    /// Number of successful outcomes recorded for this capability.
    pub success_count: u32,
    /// Number of failed outcomes recorded for this capability.
    pub failure_count: u32,
    /// Time of the most recent failure, if any.
    /// NOTE(review): not written anywhere in this file — confirm against the updater.
    pub last_error_at: Option<DateTime<Utc>>,
}
48
49impl CapabilityHealth {
50 pub fn success_rate(&self) -> f32 {
52 let total = self.success_count + self.failure_count;
53 if total == 0 {
54 return 0.5; }
56 self.success_count as f32 / total as f32
57 }
58}
59
/// Health bookkeeping for an agent, consulted by [`calculate_agent_score`].
#[derive(Debug, Clone, Serialize, Deserialize, Default)]
pub struct AgentHealthState {
    /// While this instant is still in the future the agent is treated as unhealthy
    /// (a flat -100 health penalty when scored); `None` means healthy.
    pub diseased_until: Option<DateTime<Utc>>,
    /// Consecutive error count (presumably reset on success — TODO confirm); each one
    /// costs 15 points of health penalty when the agent is otherwise healthy.
    pub consecutive_errors: u32,
    /// Running error total. NOTE(review): not read in this file.
    pub total_lifetime_errors: u32,
    /// Per-capability health, presumably keyed by capability name — verify against callers.
    pub capability_health: HashMap<String, CapabilityHealth>,
}
72
/// Scheduling lifecycle of an agent.
///
/// Serialized in `snake_case` (e.g. `"cordoned"`); `as_str` returns capitalized
/// names instead.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize, Default)]
#[serde(rename_all = "snake_case")]
pub enum AgentLifecycleState {
    /// Default state, and the only one for which `is_schedulable` returns `true`.
    #[default]
    Active,
    /// NOTE(review): presumably "accepts no new work" — confirm intended semantics.
    Cordoned,
    /// NOTE(review): presumably finishing in-flight work before `Drained` — confirm.
    Draining,
    /// NOTE(review): presumably no work remaining — confirm.
    Drained,
}
88
89impl AgentLifecycleState {
90 pub fn as_str(&self) -> &'static str {
92 match self {
93 Self::Active => "Active",
94 Self::Cordoned => "Cordoned",
95 Self::Draining => "Draining",
96 Self::Drained => "Drained",
97 }
98 }
99
100 pub fn is_schedulable(&self) -> bool {
102 matches!(self, Self::Active)
103 }
104}
105
/// Mutable scheduling state of an agent: its lifecycle plus drain bookkeeping.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct AgentRuntimeState {
    /// Current lifecycle state; `Default` starts it at `Active`.
    pub lifecycle: AgentLifecycleState,
    /// When a drain was requested, if any — presumably set on entering `Draining`; TODO confirm.
    pub drain_requested_at: Option<DateTime<Utc>>,
    /// Optional drain timeout; seconds judging by the field name — confirm.
    pub drain_timeout_secs: Option<u64>,
    /// Work items currently in flight on this agent, presumably — confirm with the updater.
    pub in_flight_items: u32,
}
118
119impl Default for AgentRuntimeState {
120 fn default() -> Self {
121 Self {
122 lifecycle: AgentLifecycleState::Active,
123 drain_requested_at: None,
124 drain_timeout_secs: None,
125 in_flight_items: 0,
126 }
127 }
128}
129
130pub use orchestrator_config::selection::{SelectionStrategy, SelectionWeights};
131
/// Parameters describing how an agent should be chosen for a request.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SelectionRequirement {
    /// Capability the request needs, presumably. NOTE(review): not read in this file.
    pub capability: String,
    /// How [`calculate_agent_score`] blends the component scores.
    pub strategy: SelectionStrategy,
    /// Component weights; within `calculate_agent_score` only the `Adaptive` strategy reads them.
    pub weights: SelectionWeights,
    /// Load limit (default 5). NOTE(review): not read in this file; presumably used to
    /// filter candidates elsewhere.
    pub max_load: u32,
    /// Default `true`. NOTE(review): not read in this file — `calculate_agent_score`
    /// computes health penalties regardless of this flag.
    pub consider_health: bool,
    /// Default `true`. NOTE(review): not read in this file.
    pub capability_aware: bool,
}
148
149impl Default for SelectionRequirement {
150 fn default() -> Self {
151 Self {
152 capability: String::new(),
153 strategy: SelectionStrategy::Adaptive,
154 weights: SelectionWeights::default(),
155 max_load: 5,
156 consider_health: true,
157 capability_aware: true,
158 }
159 }
160}
161
/// Score breakdown for one agent, as produced by [`calculate_agent_score`].
#[derive(Debug, Clone)]
pub struct AgentScore {
    /// Identifier of the scored agent.
    pub agent_id: String,
    /// Strategy-dependent blend of the components below.
    pub total_score: f32,
    /// `100 - cost`, with a missing cost treated as 50.
    pub cost_score: f32,
    /// Success ratio × 100 (50 when no metrics are available).
    pub success_rate_score: f32,
    /// `60000 / avg_duration_ms`, capped at 100 (50 without a usable average).
    pub performance_score: f32,
    /// Non-positive: -10 per unit of load, floored at -50.
    pub load_penalty: f32,
    /// Non-positive: -100 while diseased, otherwise -15 per consecutive error.
    pub health_penalty: f32,
}
181
182pub struct MetricsCollector;
184
185impl MetricsCollector {
186 pub fn new_agent_metrics() -> AgentMetrics {
188 AgentMetrics {
189 total_runs: 0,
190 successful_runs: 0,
191 failed_runs: 0,
192 avg_duration_ms: 0,
193 p95_duration_ms: 0,
194 recent_success_rate: 0.5,
195 recent_avg_duration_ms: 0,
196 current_load: 0,
197 last_used_at: None,
198 duration_samples: Vec::new(),
199 }
200 }
201
202 pub fn record_success(metrics: &mut AgentMetrics, duration_ms: u64) {
204 metrics.total_runs += 1;
205 metrics.successful_runs += 1;
206 metrics.last_used_at = Some(Utc::now());
207
208 if metrics.total_runs == 1 {
210 metrics.avg_duration_ms = duration_ms;
211 metrics.recent_avg_duration_ms = duration_ms;
212 } else {
213 let alpha = 0.3;
215 metrics.avg_duration_ms = ((1.0 - alpha) * metrics.avg_duration_ms as f64
216 + alpha * duration_ms as f64) as u64;
217 metrics.recent_avg_duration_ms = ((1.0 - alpha) * metrics.recent_avg_duration_ms as f64
218 + alpha * duration_ms as f64) as u64;
219 }
220
221 if metrics.duration_samples.len() >= P95_SAMPLE_LIMIT {
223 metrics.duration_samples.remove(0);
224 }
225 metrics.duration_samples.push(duration_ms);
226
227 let mut sorted = metrics.duration_samples.clone();
229 sorted.sort_unstable();
230 let idx = ((sorted.len() as f64) * 0.95).ceil() as usize;
231 metrics.p95_duration_ms = sorted[idx.min(sorted.len()) - 1];
232
233 let alpha = 0.3;
235 metrics.recent_success_rate = (1.0 - alpha) * metrics.recent_success_rate + alpha * 1.0;
236 }
237
238 pub fn record_failure(metrics: &mut AgentMetrics) {
240 metrics.total_runs += 1;
241 metrics.failed_runs += 1;
242 metrics.last_used_at = Some(Utc::now());
243
244 let alpha = 0.3;
246 metrics.recent_success_rate = (1.0 - alpha) * metrics.recent_success_rate + alpha * 0.0;
247 }
248
249 pub fn increment_load(metrics: &mut AgentMetrics) {
251 metrics.current_load += 1;
252 }
253
254 pub fn decrement_load(metrics: &mut AgentMetrics) {
256 if metrics.current_load > 0 {
257 metrics.current_load -= 1;
258 }
259 }
260}
261
262pub fn calculate_agent_score(
264 agent_id: &str,
265 cost: Option<u32>,
266 metrics: &Option<AgentMetrics>,
267 health: &Option<AgentHealthState>,
268 requirement: &SelectionRequirement,
269) -> AgentScore {
270 let cost_score = 100.0 - (cost.unwrap_or(50) as f32);
272
273 let success_rate_score = if let Some(m) = metrics {
275 if m.total_runs > 0 {
276 (m.successful_runs as f32 / m.total_runs as f32) * 100.0
277 } else {
278 m.recent_success_rate * 100.0
279 }
280 } else {
281 50.0 };
283
284 let performance_score = if let Some(m) = metrics {
287 if m.avg_duration_ms > 0 {
288 (60000.0 / m.avg_duration_ms as f32).min(100.0)
289 } else {
290 50.0
291 }
292 } else {
293 50.0
294 };
295
296 let load_penalty = if let Some(m) = metrics {
298 -(m.current_load as f32 * 10.0).min(50.0)
299 } else {
300 0.0
301 };
302
303 let health_penalty = if let Some(h) = health {
305 if !is_agent_globally_healthy(h) {
306 -100.0
307 } else if h.consecutive_errors > 0 {
308 -(h.consecutive_errors as f32 * 15.0)
309 } else {
310 0.0
311 }
312 } else {
313 0.0
314 };
315
316 let total_score = match requirement.strategy {
318 SelectionStrategy::CostBased => cost_score * 1.0,
319 SelectionStrategy::SuccessRateWeighted => cost_score * 0.2 + success_rate_score * 0.8,
320 SelectionStrategy::PerformanceFirst => {
321 cost_score * 0.2 + performance_score * 0.6 + success_rate_score * 0.2
322 }
323 SelectionStrategy::Adaptive => {
324 cost_score * requirement.weights.cost
325 + success_rate_score * requirement.weights.success_rate
326 + performance_score * requirement.weights.performance
327 + load_penalty * requirement.weights.load
328 + health_penalty
329 }
330 SelectionStrategy::LoadBalanced => {
331 cost_score * 0.2 + success_rate_score * 0.3 + (100.0 + load_penalty).max(0.0) * 0.5
332 }
333 SelectionStrategy::CapabilityAware => {
334 cost_score * 0.15
336 + success_rate_score * 0.35
337 + performance_score * 0.2
338 + health_penalty.max(-50.0) }
340 };
341
342 AgentScore {
343 agent_id: agent_id.to_string(),
344 total_score,
345 cost_score,
346 success_rate_score,
347 performance_score,
348 load_penalty,
349 health_penalty,
350 }
351}
352
353fn is_agent_globally_healthy(health: &AgentHealthState) -> bool {
355 match health.diseased_until {
356 None => true,
357 Some(until) => Utc::now() >= until,
358 }
359}
360
// Unit tests covering metrics bookkeeping, capability health, and the scoring
// function under each selection strategy.
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_new_agent_metrics() {
        let metrics = MetricsCollector::new_agent_metrics();
        assert_eq!(metrics.total_runs, 0);
        // The constructor seeds the smoothed success rate at 0.5.
        assert_eq!(metrics.recent_success_rate, 0.5);
    }

    #[test]
    fn test_record_success() {
        let mut metrics = AgentMetrics::default();
        MetricsCollector::record_success(&mut metrics, 1000);
        assert_eq!(metrics.total_runs, 1);
        assert_eq!(metrics.successful_runs, 1);
        // The first successful run seeds the duration average directly.
        assert_eq!(metrics.avg_duration_ms, 1000);
    }

    #[test]
    fn test_record_failure() {
        let mut metrics = AgentMetrics::default();
        MetricsCollector::record_failure(&mut metrics);
        assert_eq!(metrics.total_runs, 1);
        assert_eq!(metrics.failed_runs, 1);
    }

    #[test]
    fn test_capability_health_rate() {
        // 8 successes out of 10 outcomes.
        let cap_health = CapabilityHealth {
            success_count: 8,
            failure_count: 2,
            last_error_at: None,
        };
        assert!((cap_health.success_rate() - 0.8).abs() < 0.001);
    }

    #[test]
    fn test_agent_score_calculation() {
        let cost = Some(30);
        let metrics = Some(AgentMetrics {
            total_runs: 10,
            successful_runs: 8,
            failed_runs: 2,
            avg_duration_ms: 5000,
            p95_duration_ms: 8000,
            recent_success_rate: 0.8,
            recent_avg_duration_ms: 5000,
            current_load: 1,
            last_used_at: None,
            duration_samples: Vec::new(),
        });
        let health = Some(AgentHealthState::default());

        // The default requirement uses the Adaptive strategy.
        let req = SelectionRequirement::default();
        let score = calculate_agent_score("test_agent", cost, &metrics, &health, &req);

        assert!(score.total_score > 0.0);
        // cost_score = 100 - 30.
        assert_eq!(score.cost_score, 70.0);
    }

    // Builds metrics from the four fields most scoring tests vary; everything else
    // comes from the derived `Default`.
    fn create_test_metrics(
        total_runs: u32,
        successful_runs: u32,
        avg_duration_ms: u64,
        current_load: u32,
    ) -> AgentMetrics {
        AgentMetrics {
            total_runs,
            successful_runs,
            avg_duration_ms,
            current_load,
            ..Default::default()
        }
    }

    #[test]
    fn test_selection_strategy_cost_based() {
        // CostBased ranks purely on cost: 100 - 25 = 75.
        let cost = Some(25);
        let metrics = Some(create_test_metrics(10, 8, 5000, 1));
        let health = Some(AgentHealthState::default());
        let requirement = SelectionRequirement {
            strategy: SelectionStrategy::CostBased,
            ..Default::default()
        };

        let score = calculate_agent_score("test_agent", cost, &metrics, &health, &requirement);

        assert!((score.total_score - 75.0).abs() < 0.01);
        assert!((score.cost_score - 75.0).abs() < 0.01);
    }

    #[test]
    fn test_selection_strategy_success_rate_weighted() {
        let cost = Some(30);
        // 8 of 10 runs succeeded -> success_rate_score = 80.
        let metrics = Some(create_test_metrics(10, 8, 5000, 0));
        let health = Some(AgentHealthState::default());
        let requirement = SelectionRequirement {
            strategy: SelectionStrategy::SuccessRateWeighted,
            ..Default::default()
        };

        let score = calculate_agent_score("test_agent", cost, &metrics, &health, &requirement);

        // 20% cost score (70) + 80% success rate score (80).
        let expected = 70.0 * 0.2 + 80.0 * 0.8;
        assert!((score.total_score - expected).abs() < 0.01);
    }

    #[test]
    fn test_selection_strategy_performance_first() {
        let cost = Some(20);
        // 60000 / 3000 ms -> performance_score = 20.
        let metrics = Some(create_test_metrics(10, 8, 3000, 0));
        let health = Some(AgentHealthState::default());
        let requirement = SelectionRequirement {
            strategy: SelectionStrategy::PerformanceFirst,
            ..Default::default()
        };

        let score = calculate_agent_score("test_agent", cost, &metrics, &health, &requirement);

        // cost 80, performance 20, success rate 80.
        let expected = 80.0 * 0.2 + 20.0 * 0.6 + 80.0 * 0.2;
        assert!((score.total_score - expected).abs() < 0.01);
    }

    #[test]
    fn test_selection_strategy_load_balanced() {
        let cost = Some(30);
        // Load 3 -> load_penalty = -30.
        let metrics = Some(create_test_metrics(10, 8, 5000, 3));
        let health = Some(AgentHealthState::default());
        let requirement = SelectionRequirement {
            strategy: SelectionStrategy::LoadBalanced,
            ..Default::default()
        };

        let score = calculate_agent_score("test_agent", cost, &metrics, &health, &requirement);

        // The load term is (100 + penalty) clamped at 0, weighted 50%.
        let expected = 70.0 * 0.2 + 80.0 * 0.3 + (100.0 + (-30.0_f32)).max(0.0) * 0.5;
        assert!((score.total_score - expected).abs() < 0.01);
    }

    #[test]
    fn test_load_balanced_score_never_negative() {
        // Worst case: no successes, load 10 (penalty floored at -50), cost 99.
        let metrics = Some(create_test_metrics(10, 0, 5000, 10));
        let health = Some(AgentHealthState::default());
        let requirement = SelectionRequirement {
            strategy: SelectionStrategy::LoadBalanced,
            ..Default::default()
        };
        let score = calculate_agent_score("overloaded", Some(99), &metrics, &health, &requirement);
        assert!(
            score.total_score >= 0.0,
            "LoadBalanced score should never be negative, got {}",
            score.total_score
        );
    }

    #[test]
    fn test_load_balanced_low_load_scores_higher() {
        let health = Some(AgentHealthState::default());
        let requirement = SelectionRequirement {
            strategy: SelectionStrategy::LoadBalanced,
            ..Default::default()
        };

        // Two otherwise identical agents, differing only in load (1 vs 4).
        let metrics_a = Some(create_test_metrics(10, 8, 5000, 1));
        let score_a = calculate_agent_score("agent_a", Some(30), &metrics_a, &health, &requirement);

        let metrics_b = Some(create_test_metrics(10, 8, 5000, 4));
        let score_b = calculate_agent_score("agent_b", Some(30), &metrics_b, &health, &requirement);

        assert!(
            score_a.total_score > score_b.total_score,
            "Low-load agent should score higher: a={}, b={}",
            score_a.total_score,
            score_b.total_score
        );
    }

    #[test]
    fn test_selection_strategy_capability_aware() {
        let cost = Some(40);
        // 9/10 successes -> 90; 60000 / 4000 ms -> 15.
        let metrics = Some(create_test_metrics(10, 9, 4000, 1));
        let health = Some(AgentHealthState {
            // 2 consecutive errors -> health_penalty = -30 (within the -50 floor).
            consecutive_errors: 2,
            ..Default::default()
        });
        let requirement = SelectionRequirement {
            strategy: SelectionStrategy::CapabilityAware,
            ..Default::default()
        };

        let score = calculate_agent_score("test_agent", cost, &metrics, &health, &requirement);

        let expected = 60.0 * 0.15 + 90.0 * 0.35 + 15.0 * 0.2 + (-30.0_f32).max(-50.0);
        assert!((score.total_score - expected).abs() < 0.01);
    }

    #[test]
    fn test_ema_convergence_success() {
        let mut metrics = AgentMetrics::default();
        // Starting from 0.0, ten successes give 1 - 0.7^10 (about 0.97).
        for _ in 0..10 {
            MetricsCollector::record_success(&mut metrics, 1000);
        }

        assert!(
            metrics.recent_success_rate > 0.95,
            "Expected rate > 0.95, got {}",
            metrics.recent_success_rate
        );
    }

    #[test]
    fn test_ema_convergence_failure() {
        let mut metrics = AgentMetrics::default();
        // Starting from the derived default of 0.0, failures keep the rate at 0.0.
        for _ in 0..10 {
            MetricsCollector::record_failure(&mut metrics);
        }

        assert!(
            metrics.recent_success_rate < 0.05,
            "Expected rate < 0.05, got {}",
            metrics.recent_success_rate
        );
    }

    #[test]
    fn test_ema_convergence_mixed() {
        let mut metrics = AgentMetrics::default();
        // Five successes lift the rate to 1 - 0.7^5 (about 0.83).
        for _ in 0..5 {
            MetricsCollector::record_success(&mut metrics, 1000);
        }
        let rate_after_success = metrics.recent_success_rate;

        // Three failures scale it by 0.7^3 (about 0.34).
        for _ in 0..3 {
            MetricsCollector::record_failure(&mut metrics);
        }

        assert!(
            metrics.recent_success_rate < rate_after_success,
            "Rate should decrease after failures: before={}, after={}",
            rate_after_success,
            metrics.recent_success_rate
        );

        // Exponential decay never reaches exactly zero from a positive start.
        assert!(metrics.recent_success_rate > 0.0);
    }

    #[test]
    fn test_boundary_zero_total_runs() {
        let cost = Some(30);
        // With no runs, scoring falls back to recent_success_rate * 100.
        let metrics = Some(AgentMetrics {
            total_runs: 0,
            recent_success_rate: 0.5,
            ..Default::default()
        });
        let health = Some(AgentHealthState::default());
        let requirement = SelectionRequirement {
            strategy: SelectionStrategy::SuccessRateWeighted,
            ..Default::default()
        };

        let score = calculate_agent_score("test_agent", cost, &metrics, &health, &requirement);

        let expected = 70.0 * 0.2 + 50.0 * 0.8;
        assert!((score.success_rate_score - 50.0).abs() < 0.01);
        assert!((score.total_score - expected).abs() < 0.01);
    }

    #[test]
    fn test_boundary_none_metrics() {
        let cost = Some(40);
        let metrics: Option<AgentMetrics> = None;
        let health = Some(AgentHealthState::default());
        let requirement = SelectionRequirement {
            strategy: SelectionStrategy::PerformanceFirst,
            ..Default::default()
        };

        let score = calculate_agent_score("test_agent", cost, &metrics, &health, &requirement);

        // Without metrics, success rate and performance both default to 50.
        assert!((score.success_rate_score - 50.0).abs() < 0.01);
        assert!((score.performance_score - 50.0).abs() < 0.01);
        let expected = 60.0 * 0.2 + 50.0 * 0.6 + 50.0 * 0.2;
        assert!((score.total_score - expected).abs() < 0.01);
    }

    #[test]
    fn test_boundary_none_health() {
        let cost = Some(30);
        let metrics = Some(create_test_metrics(10, 8, 5000, 0));
        let health: Option<AgentHealthState> = None;
        let requirement = SelectionRequirement::default();

        let score = calculate_agent_score("test_agent", cost, &metrics, &health, &requirement);

        // Missing health data means no penalty.
        assert!((score.health_penalty - 0.0).abs() < 0.01);
    }

    #[test]
    fn test_boundary_max_load() {
        let cost = Some(30);
        // Load 10 would be -100, but the penalty is floored at -50.
        let metrics = Some(create_test_metrics(10, 8, 5000, 10));
        let health = Some(AgentHealthState::default());
        let requirement = SelectionRequirement::default();

        let score = calculate_agent_score("test_agent", cost, &metrics, &health, &requirement);

        assert!(
            (score.load_penalty - (-50.0)).abs() < 0.01,
            "Expected load_penalty -50.0, got {}",
            score.load_penalty
        );
    }

    #[test]
    fn test_health_penalty_diseased_agent() {
        let cost = Some(30);
        let metrics = Some(create_test_metrics(10, 8, 5000, 0));
        // Diseased for another hour -> flat -100 penalty.
        let health = Some(AgentHealthState {
            diseased_until: Some(Utc::now() + chrono::Duration::seconds(3600)),
            ..Default::default()
        });
        let requirement = SelectionRequirement::default();

        let score = calculate_agent_score("test_agent", cost, &metrics, &health, &requirement);

        assert!(
            (score.health_penalty - (-100.0)).abs() < 0.01,
            "Expected health_penalty -100.0, got {}",
            score.health_penalty
        );
    }

    #[test]
    fn test_health_penalty_consecutive_errors() {
        let cost = Some(30);
        let metrics = Some(create_test_metrics(10, 8, 5000, 0));
        // 3 consecutive errors at -15 each.
        let health = Some(AgentHealthState {
            consecutive_errors: 3,
            ..Default::default()
        });
        let requirement = SelectionRequirement::default();

        let score = calculate_agent_score("test_agent", cost, &metrics, &health, &requirement);

        assert!(
            (score.health_penalty - (-45.0)).abs() < 0.01,
            "Expected health_penalty -45.0, got {}",
            score.health_penalty
        );
    }

    #[test]
    fn test_capability_health_zero_total() {
        // No outcomes recorded -> success_rate falls back to 0.5.
        let cap_health = CapabilityHealth {
            success_count: 0,
            failure_count: 0,
            last_error_at: None,
        };

        assert!(
            (cap_health.success_rate() - 0.5).abs() < 0.001,
            "Expected success_rate 0.5, got {}",
            cap_health.success_rate()
        );
    }

    #[test]
    fn test_load_decrement_from_zero() {
        let mut metrics = AgentMetrics {
            current_load: 0,
            ..Default::default()
        };

        // Decrementing at zero must not underflow.
        MetricsCollector::decrement_load(&mut metrics);
        assert_eq!(
            metrics.current_load, 0,
            "Expected load to stay at 0, got {}",
            metrics.current_load
        );
    }

    #[test]
    fn test_load_increment_decrement_cycle() {
        let mut metrics = AgentMetrics::default();

        assert_eq!(metrics.current_load, 0);

        MetricsCollector::increment_load(&mut metrics);
        assert_eq!(metrics.current_load, 1);

        MetricsCollector::increment_load(&mut metrics);
        assert_eq!(metrics.current_load, 2);

        MetricsCollector::decrement_load(&mut metrics);
        assert_eq!(metrics.current_load, 1);

        MetricsCollector::decrement_load(&mut metrics);
        assert_eq!(metrics.current_load, 0);

        // An extra decrement stays clamped at zero.
        MetricsCollector::decrement_load(&mut metrics);
        assert_eq!(metrics.current_load, 0);
    }

    #[test]
    fn test_p95_calculated_after_multiple_runs() {
        let mut metrics = AgentMetrics::default();
        // Samples 100..=2000; the nearest-rank p95 of 20 samples is the 19th smallest.
        for i in 1..=20 {
            MetricsCollector::record_success(&mut metrics, i * 100);
        }
        assert_eq!(metrics.p95_duration_ms, 1900);
        assert_eq!(metrics.duration_samples.len(), 20);
    }

    #[test]
    fn test_p95_single_sample() {
        // With one sample, that sample is the p95.
        let mut metrics = AgentMetrics::default();
        MetricsCollector::record_success(&mut metrics, 500);
        assert_eq!(metrics.p95_duration_ms, 500);
    }

    #[test]
    fn test_p95_circular_buffer_evicts_oldest() {
        let mut metrics = AgentMetrics::default();
        // 110 samples exceed the limit, so the oldest 10 (10..=100) are evicted.
        for i in 1..=110 {
            MetricsCollector::record_success(&mut metrics, i * 10);
        }
        assert_eq!(metrics.duration_samples.len(), P95_SAMPLE_LIMIT);
        assert!(!metrics.duration_samples.contains(&10));
    }
}