use std::collections::HashMap;
use std::sync::Weak;
use std::time::Duration;
use tokio::sync::mpsc;
use tokio::task::JoinHandle;
use crate::mcp::McpManager;
/// Commands consumed by [`lifecycle_event_loop`] to manage per-server background tasks.
///
/// Every `server` field is the key under which the loop tracks that server's tasks.
#[derive(Debug, Clone)]
pub enum LifecycleEvent {
/// Start (or restart) the idle timer for `server`. Any timer already tracked for the
/// same server is aborted first; once `timeout` elapses the loop's timer task calls
/// `disconnect_server` on the manager, if the manager is still alive.
StartIdleTimer { server: String, timeout: Duration },
/// Abort the idle timer tracked for `server`, if there is one.
CancelIdleTimer { server: String },
/// Start (or restart) the periodic health-check task for `server`. Any health-check
/// task already tracked for the same server is aborted first.
StartHealthCheck { server: String },
/// Abort both the idle timer and the health-check task tracked for `server`.
ServerStopped { server: String },
/// Make the event loop stop receiving events; it aborts every remaining task on exit.
Shutdown,
}
/// Sending half of the unbounded channel that carries [`LifecycleEvent`]s.
pub type LifecycleTx = mpsc::UnboundedSender<LifecycleEvent>;
/// Receiving half of the unbounded channel that carries [`LifecycleEvent`]s.
pub type LifecycleRx = mpsc::UnboundedReceiver<LifecycleEvent>;
pub fn channel() -> (LifecycleTx, LifecycleRx) {
mpsc::unbounded_channel()
}
/// Runs the lifecycle event loop that owns all idle-timer and health-check tasks.
///
/// Events are read from `rx` until the channel yields `None` or a
/// [`LifecycleEvent::Shutdown`] arrives. At most one idle timer and one health-check
/// task are tracked per server name; starting a new one aborts the previous one first.
/// When the loop exits, every task still tracked is aborted.
///
/// `manager` is held only as a [`Weak`] reference. Spawned tasks upgrade it each time
/// they wake up and quietly finish if the manager is gone.
pub async fn lifecycle_event_loop(mut rx: LifecycleRx, manager: Weak<McpManager>) {
    let mut idle_timers: HashMap<String, JoinHandle<()>> = HashMap::new();
    let mut health_handles: HashMap<String, JoinHandle<()>> = HashMap::new();

    while let Some(event) = rx.recv().await {
        match event {
            LifecycleEvent::Shutdown => break,
            LifecycleEvent::CancelIdleTimer { server } => {
                cancel_task(&mut idle_timers, &server);
            }
            LifecycleEvent::ServerStopped { server } => {
                cancel_task(&mut idle_timers, &server);
                cancel_task(&mut health_handles, &server);
            }
            LifecycleEvent::StartIdleTimer { server, timeout } => {
                // Replace, never stack: the old timer is aborted before the new one is spawned.
                cancel_task(&mut idle_timers, &server);
                let task = spawn_idle_timer(manager.clone(), server.clone(), timeout);
                idle_timers.insert(server, task);
            }
            LifecycleEvent::StartHealthCheck { server } => {
                cancel_task(&mut health_handles, &server);
                let task = spawn_health_check(manager.clone(), server.clone());
                health_handles.insert(server, task);
            }
        }
    }

    // Nothing spawned by this loop is allowed to outlive it.
    idle_timers
        .into_values()
        .chain(health_handles.into_values())
        .for_each(|task| task.abort());
}

/// Removes the task tracked for `server` from `tasks`, if any, and aborts it.
fn cancel_task(tasks: &mut HashMap<String, JoinHandle<()>>, server: &str) {
    if let Some(task) = tasks.remove(server) {
        task.abort();
    }
}

/// Spawns a task that sleeps for `timeout` and then asks the manager to disconnect `server`.
fn spawn_idle_timer(
    manager: Weak<McpManager>,
    server: String,
    timeout: Duration,
) -> JoinHandle<()> {
    tokio::spawn(async move {
        tokio::time::sleep(timeout).await;
        let Some(mgr) = manager.upgrade() else {
            return;
        };
        if let Err(err) = mgr.disconnect_server(&server).await {
            tracing::debug!(
                "MCP: idle-disconnect for '{}' failed: {}",
                server,
                err
            );
        }
    })
}

/// Spawns a task that health-checks `server` every 30 seconds.
///
/// The task ends when the manager has been dropped or on the first failed check.
fn spawn_health_check(manager: Weak<McpManager>, server: String) -> JoinHandle<()> {
    const HEALTH_CHECK_INTERVAL: Duration = Duration::from_secs(30);

    tokio::spawn(async move {
        loop {
            tokio::time::sleep(HEALTH_CHECK_INTERVAL).await;
            // The strong reference lives for one iteration only, so the manager is
            // never kept alive while this task is sleeping.
            let Some(mgr) = manager.upgrade() else {
                break;
            };
            if let Err(err) = mgr.health_check_and_reconnect(&server).await {
                tracing::warn!(
                    "MCP: health check for keep-alive server '{}' failed: {}",
                    server,
                    err
                );
                break;
            }
        }
    })
}
#[cfg(test)]
mod tests {
    use super::*;

    /// An event pushed into the sender comes out of the receiver unchanged.
    #[tokio::test]
    async fn channel_round_trip() {
        let (tx, mut rx) = channel();
        let sent = LifecycleEvent::CancelIdleTimer {
            server: "test".into(),
        };
        tx.send(sent).unwrap();

        let received = rx.recv().await.unwrap();
        let LifecycleEvent::CancelIdleTimer { server } = received else {
            panic!("wrong event");
        };
        assert_eq!(server, "test");
    }

    /// The loop handles an event for an unknown server and exits once every sender is dropped.
    #[tokio::test]
    async fn lifecycle_event_loop_runs_to_completion() {
        let (tx, rx) = channel();
        // A dangling `Weak` stands in for a manager that no longer exists.
        let task = tokio::spawn(lifecycle_event_loop(rx, Weak::<McpManager>::new()));

        let cancel = LifecycleEvent::CancelIdleTimer { server: "x".into() };
        tx.send(cancel).unwrap();
        drop(tx);

        task.await.unwrap();
    }
}