use anyhow::{anyhow, Result};
use chrono::{DateTime, Utc};
use std::collections::HashMap;
use tokio::sync::broadcast;
use tokio::task::JoinHandle;
use uuid::Uuid;
use crate::orchestrator::DxTool;
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub struct ToolId(Uuid);
impl ToolId {
pub fn new() -> Self {
Self(Uuid::new_v4())
}
}
impl Default for ToolId {
fn default() -> Self {
Self::new()
}
}
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum ToolStatus {
Stopped,
Starting,
Running,
Stopping,
Failed(String),
Completed,
}
#[derive(Debug, Clone)]
pub enum LifecycleEvent {
ToolStarting {
id: ToolId,
name: String,
},
ToolStarted {
id: ToolId,
name: String,
},
ToolStopping {
id: ToolId,
name: String,
},
ToolStopped {
id: ToolId,
name: String,
},
ToolFailed {
id: ToolId,
name: String,
error: String,
},
ToolCompleted {
id: ToolId,
name: String,
},
}
pub struct ToolState {
pub id: ToolId,
pub status: ToolStatus,
pub tool: Box<dyn DxTool>,
pub started_at: Option<DateTime<Utc>>,
pub stopped_at: Option<DateTime<Utc>>,
pub handle: Option<JoinHandle<()>>,
}
impl ToolState {
fn new(id: ToolId, tool: Box<dyn DxTool>) -> Self {
Self {
id,
status: ToolStatus::Stopped,
tool,
started_at: None,
stopped_at: None,
handle: None,
}
}
}
pub struct LifecycleManager {
tools: HashMap<ToolId, ToolState>,
event_bus: broadcast::Sender<LifecycleEvent>,
}
impl LifecycleManager {
pub fn new() -> Self {
let (event_bus, _) = broadcast::channel(1000);
Self {
tools: HashMap::new(),
event_bus,
}
}
pub fn register_tool(&mut self, tool: Box<dyn DxTool>) -> Result<ToolId> {
let id = ToolId::new();
let state = ToolState::new(id, tool);
self.tools.insert(id, state);
tracing::debug!("Registered tool with id: {:?}", id);
Ok(id)
}
pub async fn start_tool(&mut self, id: ToolId) -> Result<()> {
let state = self.tools.get_mut(&id)
.ok_or_else(|| anyhow!("Tool not found: {:?}", id))?;
if state.status == ToolStatus::Running {
return Err(anyhow!("Tool is already running: {:?}", id));
}
let tool_name = state.tool.name().to_string();
state.status = ToolStatus::Starting;
let _ = self.event_bus.send(LifecycleEvent::ToolStarting {
id,
name: tool_name.clone(),
});
state.status = ToolStatus::Running;
state.started_at = Some(Utc::now());
let _ = self.event_bus.send(LifecycleEvent::ToolStarted {
id,
name: tool_name,
});
tracing::info!("Started tool: {:?}", id);
Ok(())
}
pub async fn stop_tool(&mut self, id: ToolId) -> Result<()> {
let state = self.tools.get_mut(&id)
.ok_or_else(|| anyhow!("Tool not found: {:?}", id))?;
if state.status == ToolStatus::Stopped {
return Ok(());
}
let tool_name = state.tool.name().to_string();
state.status = ToolStatus::Stopping;
let _ = self.event_bus.send(LifecycleEvent::ToolStopping {
id,
name: tool_name.clone(),
});
if let Some(handle) = state.handle.take() {
handle.abort();
}
state.status = ToolStatus::Stopped;
state.stopped_at = Some(Utc::now());
let _ = self.event_bus.send(LifecycleEvent::ToolStopped {
id,
name: tool_name,
});
tracing::info!("Stopped tool: {:?}", id);
Ok(())
}
pub fn get_status(&self, id: ToolId) -> Option<ToolStatus> {
self.tools.get(&id).map(|state| state.status.clone())
}
pub fn list_tool_ids(&self) -> Vec<ToolId> {
self.tools.keys().copied().collect()
}
pub fn stop_all(&mut self) -> Result<()> {
let tool_ids: Vec<ToolId> = self.tools.keys().copied().collect();
for id in tool_ids {
if let Some(state) = self.tools.get(&id) {
if state.status == ToolStatus::Running {
let rt = tokio::runtime::Handle::try_current();
if let Ok(handle) = rt {
handle.block_on(async {
let _ = self.stop_tool(id).await;
});
}
}
}
}
Ok(())
}
pub fn subscribe(&self) -> broadcast::Receiver<LifecycleEvent> {
self.event_bus.subscribe()
}
pub fn running_count(&self) -> usize {
self.tools
.values()
.filter(|state| state.status == ToolStatus::Running)
.count()
}
pub fn total_count(&self) -> usize {
self.tools.len()
}
}
impl Default for LifecycleManager {
fn default() -> Self {
Self::new()
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::orchestrator::{ExecutionContext, ToolOutput};
struct TestTool {
name: String,
}
impl DxTool for TestTool {
fn name(&self) -> &str {
&self.name
}
fn version(&self) -> &str {
"1.0.0"
}
fn priority(&self) -> u32 {
50
}
fn execute(&mut self, _ctx: &ExecutionContext) -> Result<ToolOutput> {
Ok(ToolOutput::success())
}
}
#[tokio::test]
async fn test_register_tool() {
let mut manager = LifecycleManager::new();
let tool = Box::new(TestTool {
name: "test-tool".to_string(),
});
let id = manager.register_tool(tool).unwrap();
assert_eq!(manager.total_count(), 1);
assert_eq!(manager.get_status(id), Some(ToolStatus::Stopped));
}
#[tokio::test]
async fn test_start_stop_tool() {
let mut manager = LifecycleManager::new();
let tool = Box::new(TestTool {
name: "test-tool".to_string(),
});
let id = manager.register_tool(tool).unwrap();
manager.start_tool(id).await.unwrap();
assert_eq!(manager.get_status(id), Some(ToolStatus::Running));
assert_eq!(manager.running_count(), 1);
manager.stop_tool(id).await.unwrap();
assert_eq!(manager.get_status(id), Some(ToolStatus::Stopped));
assert_eq!(manager.running_count(), 0);
}
#[tokio::test]
async fn test_lifecycle_events() {
let mut manager = LifecycleManager::new();
let mut rx = manager.subscribe();
let tool = Box::new(TestTool {
name: "test-tool".to_string(),
});
let id = manager.register_tool(tool).unwrap();
manager.start_tool(id).await.unwrap();
if let Ok(event) = rx.try_recv() {
match event {
LifecycleEvent::ToolStarting { id: _, name } => {
assert_eq!(name, "test-tool");
}
_ => panic!("Expected ToolStarting event"),
}
}
}
}