1use async_trait::async_trait;
6use parking_lot::RwLock;
7use std::collections::HashMap;
8use std::sync::Arc;
9use std::time::{Duration, SystemTime};
10use tokio::sync::{mpsc, Notify};
11use tokio::time::interval;
12
13use crate::types::*;
14
15#[async_trait]
17pub trait LifecycleController {
18 async fn initialize_agent(&self, config: AgentConfig) -> Result<AgentId, LifecycleError>;
20
21 async fn start_agent(&self, agent_id: AgentId) -> Result<(), LifecycleError>;
23
24 async fn suspend_agent(&self, agent_id: AgentId) -> Result<(), LifecycleError>;
26
27 async fn resume_agent(&self, agent_id: AgentId) -> Result<(), LifecycleError>;
29
30 async fn terminate_agent(&self, agent_id: AgentId) -> Result<(), LifecycleError>;
32
33 async fn get_agent_state(&self, agent_id: AgentId) -> Result<AgentState, LifecycleError>;
35
36 async fn get_agents_by_state(&self, state: AgentState) -> Vec<AgentId>;
38
39 async fn shutdown(&self) -> Result<(), LifecycleError>;
41
42 async fn check_health(&self) -> Result<ComponentHealth, LifecycleError>;
44}
45
/// Tunables for a lifecycle controller.
#[derive(Debug, Clone)]
pub struct LifecycleConfig {
    /// Upper bound on the number of simultaneously registered agents.
    pub max_agents: usize,
    /// Time budget for agent initialization.
    /// NOTE(review): not read anywhere in this file — confirm whether it is enforced elsewhere.
    pub initialization_timeout: Duration,
    /// Time budget for agent termination.
    /// NOTE(review): not read anywhere in this file — confirm whether it is enforced elsewhere.
    pub termination_timeout: Duration,
    /// Period of the background state-monitor task.
    pub state_check_interval: Duration,
    /// Whether the monitor moves `Failed` agents back to `Initializing`.
    pub enable_auto_recovery: bool,
    /// Maximum number of automatic restarts per agent.
    pub max_restart_attempts: u32,
}
56
57impl Default for LifecycleConfig {
58 fn default() -> Self {
59 Self {
60 max_agents: 10000,
61 initialization_timeout: Duration::from_secs(30),
62 termination_timeout: Duration::from_secs(10),
63 state_check_interval: Duration::from_secs(5),
64 enable_auto_recovery: true,
65 max_restart_attempts: 3,
66 }
67 }
68}
69
70pub struct DefaultLifecycleController {
72 config: LifecycleConfig,
73 agents: Arc<RwLock<HashMap<AgentId, AgentInstance>>>,
74 state_machine: Arc<AgentStateMachine>,
75 event_sender: mpsc::UnboundedSender<LifecycleEvent>,
76 shutdown_notify: Arc<Notify>,
77 is_running: Arc<RwLock<bool>>,
78}
79
80impl DefaultLifecycleController {
81 pub async fn new(config: LifecycleConfig) -> Result<Self, LifecycleError> {
83 let agents = Arc::new(RwLock::new(HashMap::new()));
84 let state_machine = Arc::new(AgentStateMachine::new());
85 let (event_sender, event_receiver) = mpsc::unbounded_channel();
86 let shutdown_notify = Arc::new(Notify::new());
87 let is_running = Arc::new(RwLock::new(true));
88
89 let controller = Self {
90 config,
91 agents,
92 state_machine,
93 event_sender,
94 shutdown_notify,
95 is_running,
96 };
97
98 controller.start_event_loop(event_receiver).await;
100 controller.start_state_monitor().await;
101
102 Ok(controller)
103 }
104
105 async fn start_event_loop(&self, mut event_receiver: mpsc::UnboundedReceiver<LifecycleEvent>) {
107 let agents = self.agents.clone();
108 let state_machine = self.state_machine.clone();
109 let shutdown_notify = self.shutdown_notify.clone();
110 let is_running = self.is_running.clone();
111
112 tokio::spawn(async move {
113 loop {
114 tokio::select! {
115 event = event_receiver.recv() => {
116 if !*is_running.read() {
117 break;
118 }
119 if let Some(event) = event {
120 Self::process_lifecycle_event(event, &agents, &state_machine).await;
121 } else {
122 break;
123 }
124 }
125 _ = shutdown_notify.notified() => {
126 break;
127 }
128 }
129 }
130 });
131 }
132
133 async fn start_state_monitor(&self) {
135 let agents = self.agents.clone();
136 let state_machine = self.state_machine.clone();
137 let shutdown_notify = self.shutdown_notify.clone();
138 let is_running = self.is_running.clone();
139 let check_interval = self.config.state_check_interval;
140 let enable_auto_recovery = self.config.enable_auto_recovery;
141 let max_restart_attempts = self.config.max_restart_attempts;
142
143 tokio::spawn(async move {
144 let mut interval = interval(check_interval);
145
146 loop {
147 tokio::select! {
148 _ = interval.tick() => {
149 if !*is_running.read() {
150 break;
151 }
152
153 Self::monitor_agent_states(&agents, &state_machine, enable_auto_recovery, max_restart_attempts).await;
154 }
155 _ = shutdown_notify.notified() => {
156 break;
157 }
158 }
159 }
160 });
161 }
162
163 async fn process_lifecycle_event(
165 event: LifecycleEvent,
166 agents: &Arc<RwLock<HashMap<AgentId, AgentInstance>>>,
167 state_machine: &Arc<AgentStateMachine>,
168 ) {
169 match event {
170 LifecycleEvent::StateTransition {
171 agent_id,
172 from_state,
173 to_state,
174 } => {
175 if let Some(agent) = agents.write().get_mut(&agent_id) {
176 if state_machine.is_valid_transition(&from_state, &to_state) {
177 agent.state = to_state.clone();
178 agent.last_state_change = SystemTime::now();
179
180 tracing::info!(
181 "Agent {} transitioned from {:?} to {:?}",
182 agent_id,
183 from_state,
184 to_state
185 );
186 } else {
187 tracing::error!(
188 "Invalid state transition for agent {}: {:?} -> {:?}",
189 agent_id,
190 from_state,
191 to_state
192 );
193 }
194 }
195 }
196 LifecycleEvent::AgentError {
197 agent_id,
198 error,
199 timestamp,
200 } => {
201 tracing::error!(
202 "Agent {} encountered error: {} at {:?}",
203 agent_id,
204 error,
205 timestamp
206 );
207
208 if let Some(agent) = agents.write().get_mut(&agent_id) {
209 agent.error_count += 1;
210 agent.last_error = Some(error);
211 if state_machine.is_valid_transition(&agent.state, &AgentState::Failed) {
213 agent.state = AgentState::Failed;
214 agent.last_state_change = timestamp;
215 } else {
216 tracing::warn!(
217 "Cannot transition agent {} to Failed state from {:?}",
218 agent_id,
219 agent.state
220 );
221 }
222 }
223 }
224 LifecycleEvent::ResourceExhausted {
225 agent_id,
226 resource_type,
227 timestamp,
228 } => {
229 tracing::warn!(
230 "Agent {} exhausted resource {} at {:?}",
231 agent_id,
232 resource_type,
233 timestamp
234 );
235
236 if let Some(agent) = agents.write().get_mut(&agent_id) {
237 if state_machine.is_valid_transition(&agent.state, &AgentState::Suspended) {
239 agent.state = AgentState::Suspended;
240 agent.last_state_change = timestamp;
241 } else {
242 tracing::warn!(
243 "Cannot transition agent {} to Suspended state from {:?}",
244 agent_id,
245 agent.state
246 );
247 }
248 }
249 }
250 }
251 }
252
253 async fn monitor_agent_states(
255 agents: &Arc<RwLock<HashMap<AgentId, AgentInstance>>>,
256 state_machine: &Arc<AgentStateMachine>,
257 enable_auto_recovery: bool,
258 max_restart_attempts: u32,
259 ) {
260 let mut agents_to_restart = Vec::new();
261 let mut error_events = Vec::new();
262 let mut resource_events = Vec::new();
263
264 {
265 let agents_read = agents.read();
266 for (agent_id, agent) in agents_read.iter() {
267 if enable_auto_recovery
269 && agent.state == AgentState::Failed
270 && agent.restart_count < max_restart_attempts
271 {
272 agents_to_restart.push(*agent_id);
273 }
274
275 let time_in_state = SystemTime::now()
277 .duration_since(agent.last_state_change)
278 .unwrap_or_default();
279
280 if time_in_state > Duration::from_secs(300) {
281 match agent.state {
283 AgentState::Initializing | AgentState::Terminating => {
284 tracing::warn!(
285 "Agent {} stuck in {:?} state for {:?}",
286 agent_id,
287 agent.state,
288 time_in_state
289 );
290 error_events.push(LifecycleEvent::AgentError {
292 agent_id: *agent_id,
293 error: format!(
294 "Agent stuck in {:?} state for {:?}",
295 agent.state, time_in_state
296 ),
297 timestamp: SystemTime::now(),
298 });
299 }
300 _ => {}
301 }
302 }
303
304 if agent.error_count > 5 && agent.state == AgentState::Running {
306 resource_events.push(LifecycleEvent::ResourceExhausted {
307 agent_id: *agent_id,
308 resource_type: "error_threshold".to_string(),
309 timestamp: SystemTime::now(),
310 });
311 }
312 }
313 }
314
315 for agent_id in agents_to_restart {
317 if let Some(agent) = agents.write().get_mut(&agent_id) {
318 if state_machine.is_valid_transition(&agent.state, &AgentState::Initializing) {
320 agent.restart_count += 1;
321 agent.state = AgentState::Initializing;
322 agent.last_state_change = SystemTime::now();
323
324 tracing::info!(
325 "Auto-restarting failed agent {} (attempt {})",
326 agent_id,
327 agent.restart_count
328 );
329 } else {
330 tracing::warn!(
331 "Cannot restart agent {} from state {:?}",
332 agent_id,
333 agent.state
334 );
335 }
336 }
337 }
338
339 for event in error_events {
341 Self::process_lifecycle_event(event, agents, state_machine).await;
342 }
343 for event in resource_events {
344 Self::process_lifecycle_event(event, agents, state_machine).await;
345 }
346 }
347
348 fn send_event(&self, event: LifecycleEvent) -> Result<(), LifecycleError> {
350 self.event_sender
351 .send(event)
352 .map_err(|_| LifecycleError::EventProcessingFailed {
353 reason: "Failed to send lifecycle event".to_string(),
354 })
355 }
356}
357
358#[async_trait]
359impl LifecycleController for DefaultLifecycleController {
360 async fn initialize_agent(&self, config: AgentConfig) -> Result<AgentId, LifecycleError> {
361 if !*self.is_running.read() {
362 return Err(LifecycleError::ShuttingDown);
363 }
364
365 let agents_count = self.agents.read().len();
366 if agents_count >= self.config.max_agents {
367 return Err(LifecycleError::ResourceExhausted {
368 reason: format!(
369 "Agent slots exhausted: {} / {}",
370 agents_count, self.config.max_agents
371 ),
372 });
373 }
374
375 let agent_id = config.id;
376 let instance = AgentInstance::new(config);
377
378 self.agents.write().insert(agent_id, instance);
380
381 self.send_event(LifecycleEvent::StateTransition {
383 agent_id,
384 from_state: AgentState::Created,
385 to_state: AgentState::Initializing,
386 })?;
387
388 tracing::info!("Initialized agent {}", agent_id);
389 Ok(agent_id)
390 }
391
392 async fn start_agent(&self, agent_id: AgentId) -> Result<(), LifecycleError> {
393 let current_state = {
394 let agents = self.agents.read();
395 agents
396 .get(&agent_id)
397 .map(|agent| agent.state.clone())
398 .ok_or(LifecycleError::AgentNotFound { agent_id })?
399 };
400
401 if !self
402 .state_machine
403 .is_valid_transition(¤t_state, &AgentState::Running)
404 {
405 return Err(LifecycleError::InvalidStateTransition {
406 from: format!("{:?}", current_state),
407 to: format!("{:?}", AgentState::Running),
408 });
409 }
410
411 self.send_event(LifecycleEvent::StateTransition {
412 agent_id,
413 from_state: current_state,
414 to_state: AgentState::Running,
415 })?;
416
417 tracing::info!("Started agent {}", agent_id);
418 Ok(())
419 }
420
421 async fn suspend_agent(&self, agent_id: AgentId) -> Result<(), LifecycleError> {
422 let current_state = {
423 let agents = self.agents.read();
424 agents
425 .get(&agent_id)
426 .map(|agent| agent.state.clone())
427 .ok_or(LifecycleError::AgentNotFound { agent_id })?
428 };
429
430 if !self
431 .state_machine
432 .is_valid_transition(¤t_state, &AgentState::Suspended)
433 {
434 return Err(LifecycleError::InvalidStateTransition {
435 from: format!("{:?}", current_state),
436 to: format!("{:?}", AgentState::Suspended),
437 });
438 }
439
440 self.send_event(LifecycleEvent::StateTransition {
441 agent_id,
442 from_state: current_state,
443 to_state: AgentState::Suspended,
444 })?;
445
446 tracing::info!("Suspended agent {}", agent_id);
447 Ok(())
448 }
449
450 async fn resume_agent(&self, agent_id: AgentId) -> Result<(), LifecycleError> {
451 let current_state = {
452 let agents = self.agents.read();
453 agents
454 .get(&agent_id)
455 .map(|agent| agent.state.clone())
456 .ok_or(LifecycleError::AgentNotFound { agent_id })?
457 };
458
459 if current_state != AgentState::Suspended {
460 return Err(LifecycleError::InvalidStateTransition {
461 from: format!("{:?}", current_state),
462 to: format!("{:?}", AgentState::Running),
463 });
464 }
465
466 self.send_event(LifecycleEvent::StateTransition {
467 agent_id,
468 from_state: current_state,
469 to_state: AgentState::Running,
470 })?;
471
472 tracing::info!("Resumed agent {}", agent_id);
473 Ok(())
474 }
475
476 async fn terminate_agent(&self, agent_id: AgentId) -> Result<(), LifecycleError> {
477 let current_state = {
478 let agents = self.agents.read();
479 agents
480 .get(&agent_id)
481 .map(|agent| agent.state.clone())
482 .ok_or(LifecycleError::AgentNotFound { agent_id })?
483 };
484
485 self.send_event(LifecycleEvent::StateTransition {
486 agent_id,
487 from_state: current_state,
488 to_state: AgentState::Terminating,
489 })?;
490
491 tokio::time::sleep(Duration::from_millis(100)).await;
493
494 self.send_event(LifecycleEvent::StateTransition {
495 agent_id,
496 from_state: AgentState::Terminating,
497 to_state: AgentState::Terminated,
498 })?;
499
500 self.agents.write().remove(&agent_id);
502
503 tracing::info!("Terminated agent {}", agent_id);
504 Ok(())
505 }
506
507 async fn get_agent_state(&self, agent_id: AgentId) -> Result<AgentState, LifecycleError> {
508 let agents = self.agents.read();
509 agents
510 .get(&agent_id)
511 .map(|agent| agent.state.clone())
512 .ok_or(LifecycleError::AgentNotFound { agent_id })
513 }
514
515 async fn get_agents_by_state(&self, state: AgentState) -> Vec<AgentId> {
516 let agents = self.agents.read();
517 agents
518 .iter()
519 .filter(|(_, agent)| agent.state == state)
520 .map(|(id, _)| *id)
521 .collect()
522 }
523
524 async fn shutdown(&self) -> Result<(), LifecycleError> {
525 tracing::info!("Shutting down lifecycle controller");
526
527 *self.is_running.write() = false;
528 self.shutdown_notify.notify_waiters();
529
530 let agent_ids: Vec<AgentId> = self.agents.read().keys().copied().collect();
532
533 for agent_id in agent_ids {
534 if let Err(e) = self.terminate_agent(agent_id).await {
535 tracing::error!(
536 "Failed to terminate agent {} during shutdown: {}",
537 agent_id,
538 e
539 );
540 }
541 }
542
543 Ok(())
544 }
545
546 async fn check_health(&self) -> Result<ComponentHealth, LifecycleError> {
547 let is_running = *self.is_running.read();
548 if !is_running {
549 return Ok(ComponentHealth::unhealthy(
550 "Lifecycle controller is shut down".to_string(),
551 ));
552 }
553
554 let agents = self.agents.read();
555 let total_agents = agents.len();
556
557 let mut state_counts = std::collections::HashMap::new();
559 let mut failed_count = 0;
560 let mut stuck_count = 0;
561
562 for agent in agents.values() {
563 *state_counts.entry(agent.state.clone()).or_insert(0) += 1;
564
565 if agent.state == AgentState::Failed {
566 failed_count += 1;
567 }
568
569 let time_in_state = SystemTime::now()
571 .duration_since(agent.last_state_change)
572 .unwrap_or_default();
573
574 if time_in_state > Duration::from_secs(300)
575 && matches!(
576 agent.state,
577 AgentState::Initializing | AgentState::Terminating
578 )
579 {
580 stuck_count += 1;
581 }
582 }
583
584 let capacity_usage = total_agents as f64 / self.config.max_agents as f64;
585
586 let status = if stuck_count > 0 {
587 ComponentHealth::degraded(format!(
588 "{} agents stuck in transitional states",
589 stuck_count
590 ))
591 } else if failed_count > total_agents / 4 {
592 ComponentHealth::degraded(format!(
593 "High failure rate: {}/{} agents failed",
594 failed_count, total_agents
595 ))
596 } else if capacity_usage > 0.9 {
597 ComponentHealth::degraded(format!(
598 "Near capacity: {}/{} agent slots used",
599 total_agents, self.config.max_agents
600 ))
601 } else {
602 ComponentHealth::healthy(Some(format!(
603 "Managing {} agents across {} states",
604 total_agents,
605 state_counts.len()
606 )))
607 };
608
609 let mut health = status
610 .with_metric("total_agents".to_string(), total_agents.to_string())
611 .with_metric("failed_agents".to_string(), failed_count.to_string())
612 .with_metric("stuck_agents".to_string(), stuck_count.to_string())
613 .with_metric(
614 "capacity_usage".to_string(),
615 format!("{:.2}", capacity_usage),
616 )
617 .with_metric("max_agents".to_string(), self.config.max_agents.to_string());
618
619 for (state, count) in state_counts {
621 health = health.with_metric(
622 format!("state_{:?}", state).to_lowercase(),
623 count.to_string(),
624 );
625 }
626
627 Ok(health)
628 }
629}
630
631pub struct AgentStateMachine {
633 valid_transitions: HashMap<AgentState, Vec<AgentState>>,
634}
635
636impl Default for AgentStateMachine {
637 fn default() -> Self {
638 Self::new()
639 }
640}
641
642impl AgentStateMachine {
643 pub fn new() -> Self {
644 let mut valid_transitions = HashMap::new();
645
646 valid_transitions.insert(AgentState::Created, vec![AgentState::Initializing]);
648 valid_transitions.insert(
649 AgentState::Initializing,
650 vec![AgentState::Ready, AgentState::Failed],
651 );
652 valid_transitions.insert(
653 AgentState::Ready,
654 vec![
655 AgentState::Running,
656 AgentState::Suspended,
657 AgentState::Terminating,
658 ],
659 );
660 valid_transitions.insert(
661 AgentState::Running,
662 vec![
663 AgentState::Suspended,
664 AgentState::Completed,
665 AgentState::Failed,
666 AgentState::Terminating,
667 ],
668 );
669 valid_transitions.insert(
670 AgentState::Suspended,
671 vec![AgentState::Running, AgentState::Terminating],
672 );
673 valid_transitions.insert(AgentState::Completed, vec![AgentState::Terminating]);
674 valid_transitions.insert(
675 AgentState::Failed,
676 vec![AgentState::Initializing, AgentState::Terminating],
677 );
678 valid_transitions.insert(AgentState::Terminating, vec![AgentState::Terminated]);
679 valid_transitions.insert(AgentState::Terminated, vec![]); Self { valid_transitions }
682 }
683
684 pub fn is_valid_transition(&self, from: &AgentState, to: &AgentState) -> bool {
685 self.valid_transitions
686 .get(from)
687 .map(|transitions| transitions.contains(to))
688 .unwrap_or(false)
689 }
690}
691
692#[derive(Debug, Clone)]
694enum LifecycleEvent {
695 StateTransition {
696 agent_id: AgentId,
697 from_state: AgentState,
698 to_state: AgentState,
699 },
700 AgentError {
701 agent_id: AgentId,
702 error: String,
703 timestamp: SystemTime,
704 },
705 ResourceExhausted {
706 agent_id: AgentId,
707 resource_type: String,
708 timestamp: SystemTime,
709 },
710}
711
#[cfg(test)]
mod tests {
    use super::*;
    use crate::types::{ExecutionMode, Priority, ResourceLimits, SecurityTier};
    use std::collections::HashMap;

    /// Pause that lets the background event loop apply queued events.
    const SETTLE: Duration = Duration::from_millis(50);

    /// Builds a controller with the default configuration.
    async fn spawn_controller() -> DefaultLifecycleController {
        DefaultLifecycleController::new(LifecycleConfig::default())
            .await
            .unwrap()
    }

    /// Minimal agent configuration with a fresh id.
    fn create_test_config() -> AgentConfig {
        AgentConfig {
            id: AgentId::new(),
            name: "test".to_string(),
            dsl_source: "test".to_string(),
            execution_mode: ExecutionMode::Ephemeral,
            security_tier: SecurityTier::Tier1,
            resource_limits: ResourceLimits::default(),
            capabilities: vec![],
            policies: vec![],
            metadata: HashMap::new(),
            priority: Priority::Normal,
        }
    }

    #[tokio::test]
    async fn test_agent_initialization() {
        let controller = spawn_controller().await;
        let agent_id = controller
            .initialize_agent(create_test_config())
            .await
            .unwrap();

        tokio::time::sleep(SETTLE).await;

        assert_eq!(
            controller.get_agent_state(agent_id).await.unwrap(),
            AgentState::Initializing
        );
    }

    #[tokio::test]
    async fn test_state_transitions() {
        let controller = spawn_controller().await;
        let agent_id = controller
            .initialize_agent(create_test_config())
            .await
            .unwrap();
        tokio::time::sleep(SETTLE).await;

        // There is no public API for Initializing -> Ready, so inject the event directly.
        let ready = LifecycleEvent::StateTransition {
            agent_id,
            from_state: AgentState::Initializing,
            to_state: AgentState::Ready,
        };
        controller.send_event(ready).unwrap();
        tokio::time::sleep(SETTLE).await;

        controller.start_agent(agent_id).await.unwrap();
        tokio::time::sleep(SETTLE).await;

        assert_eq!(
            controller.get_agent_state(agent_id).await.unwrap(),
            AgentState::Running
        );
    }

    #[tokio::test]
    async fn test_agent_termination() {
        let controller = spawn_controller().await;
        let agent_id = controller
            .initialize_agent(create_test_config())
            .await
            .unwrap();
        tokio::time::sleep(SETTLE).await;

        controller.terminate_agent(agent_id).await.unwrap();
        tokio::time::sleep(Duration::from_millis(150)).await;

        assert!(controller.get_agent_state(agent_id).await.is_err());
    }

    #[test]
    fn test_state_machine() {
        let machine = AgentStateMachine::new();

        let allowed = [
            (AgentState::Created, AgentState::Initializing),
            (AgentState::Initializing, AgentState::Ready),
            (AgentState::Ready, AgentState::Running),
        ];
        for (from, to) in allowed {
            assert!(
                machine.is_valid_transition(&from, &to),
                "{:?} -> {:?} should be allowed",
                from,
                to
            );
        }

        let forbidden = [
            (AgentState::Created, AgentState::Running),
            (AgentState::Terminated, AgentState::Running),
        ];
        for (from, to) in forbidden {
            assert!(
                !machine.is_valid_transition(&from, &to),
                "{:?} -> {:?} should be rejected",
                from,
                to
            );
        }
    }
}