Skip to main content

bamboo_engine/mcp/manager/lifecycle.rs

use tokio::time::{interval, Duration, MissedTickBehavior};

use super::fingerprint::desired_proxy_fingerprint;
use super::*;

6impl McpServerManager {
7    /// Start a new MCP server connection.
8    pub async fn start_server(&self, config: McpServerConfig) -> Result<()> {
9        let server_id = config.id.clone();
10
11        if self.runtimes.contains_key(&server_id) {
12            return Err(McpError::AlreadyRunning(server_id));
13        }
14
15        info!("Starting MCP server '{}'", server_id);
16
17        let runtime_proxy_fingerprint = desired_proxy_fingerprint(self.config.as_ref()).await;
18        let (client, tools) = self
19            .bootstrap_server_client(&server_id, &config, "start")
20            .await?;
21
22        // Create runtime
23        let runtime = Arc::new(ServerRuntime {
24            config: config.clone(),
25            client: RwLock::new(client),
26            info: RwLock::new(RuntimeInfo {
27                status: ServerStatus::Ready,
28                last_error: None,
29                connected_at: Some(Utc::now()),
30                disconnected_at: None,
31                tool_count: tools.len(),
32                restart_count: 0,
33                last_ping_at: Some(Utc::now()),
34            }),
35            tools: RwLock::new(tools.clone()),
36            shutdown: AtomicBool::new(false),
37            reconnecting: AtomicBool::new(false),
38            qos: McpServerQos::new(McpQosConfig::default()),
39            proxy_fingerprint: runtime_proxy_fingerprint,
40        });
41
42        // Register tools in index
43        let aliases = self.index.register_server_tools(
44            &server_id,
45            &tools,
46            &config.allowed_tools,
47            &config.denied_tools,
48        );
49
50        info!(
51            "Registered {} MCP tools for server '{}'",
52            aliases.len(),
53            server_id
54        );
55
56        // Store runtime
57        self.runtimes.insert(server_id.clone(), runtime.clone());
58
59        // Emit event
60        if let Some(ref tx) = self.event_tx {
61            let _ = tx
62                .send(McpEvent::ServerStatusChanged {
63                    server_id: server_id.clone(),
64                    status: ServerStatus::Ready,
65                    error: None,
66                })
67                .await;
68
69            let tool_names: Vec<String> = aliases.into_iter().map(|a| a.alias).collect();
70            let _ = tx
71                .send(McpEvent::ToolsChanged {
72                    server_id,
73                    tools: tool_names,
74                })
75                .await;
76        }
77
78        // Start health check task
79        self.start_health_check(runtime, config.healthcheck_interval_ms);
80
81        Ok(())
82    }
83
84    /// Stop an MCP server connection.
85    pub async fn stop_server(&self, server_id: &str) -> Result<()> {
86        let (_, runtime) = self
87            .runtimes
88            .remove(server_id)
89            .ok_or_else(|| McpError::NotRunning(server_id.to_string()))?;
90
91        info!("Stopping MCP server '{}'", server_id);
92
93        runtime.shutdown.store(true, Ordering::SeqCst);
94
95        // Disconnect client
96        let mut client = runtime.client.write().await;
97        if let Err(e) = client.disconnect().await {
98            warn!("Error disconnecting MCP server '{}': {}", server_id, e);
99        }
100
101        // Update info
102        let mut info = runtime.info.write().await;
103        info.status = ServerStatus::Stopped;
104        info.disconnected_at = Some(Utc::now());
105
106        // Remove tools from index
107        self.index.remove_server_tools(server_id);
108
109        // Emit event
110        if let Some(ref tx) = self.event_tx {
111            let _ = tx
112                .send(McpEvent::ServerStatusChanged {
113                    server_id: server_id.to_string(),
114                    status: ServerStatus::Stopped,
115                    error: None,
116                })
117                .await;
118        }
119
120        info!("MCP server '{}' stopped", server_id);
121        Ok(())
122    }
123
124    /// Call a tool on a specific server.
125    pub async fn call_tool(
126        &self,
127        server_id: &str,
128        tool_name: &str,
129        args: serde_json::Value,
130    ) -> Result<crate::mcp::types::McpCallResult> {
131        let runtime = self
132            .runtimes
133            .get(server_id)
134            .ok_or_else(|| McpError::ServerNotFound(server_id.to_string()))?;
135
136        runtime.qos.check_circuit(server_id, tool_name).await?;
137        let _permit = runtime.qos.acquire_permit().await?;
138
139        let client = runtime.client.read().await;
140        let timeout = runtime.config.request_timeout_ms;
141        let result = client.call_tool(tool_name, args, timeout).await;
142        drop(client);
143
144        let result = match result {
145            Ok(result) => {
146                runtime.qos.record_success().await;
147                result
148            }
149            Err(error) => {
150                runtime
151                    .qos
152                    .record_failure(server_id, tool_name, &error)
153                    .await;
154                return Err(error);
155            }
156        };
157
158        // Emit event
159        if let Some(ref tx) = self.event_tx {
160            let _ = tx
161                .send(McpEvent::ToolExecuted {
162                    server_id: server_id.to_string(),
163                    tool_name: tool_name.to_string(),
164                    success: !result.is_error,
165                })
166                .await;
167        }
168
169        Ok(result)
170    }
171
172    /// Get tool info for a specific tool.
173    pub fn get_tool_info(&self, server_id: &str, tool_name: &str) -> Option<McpTool> {
174        self.runtimes.get(server_id).and_then(|runtime| {
175            let tools = runtime.tools.try_read().ok()?;
176            tools.iter().find(|t| t.name == tool_name).cloned()
177        })
178    }
179
180    /// Refresh tools from a server.
181    pub async fn refresh_tools(&self, server_id: &str) -> Result<()> {
182        let runtime = self
183            .runtimes
184            .get(server_id)
185            .ok_or_else(|| McpError::ServerNotFound(server_id.to_string()))?;
186
187        info!("Refreshing tools for MCP server '{}'", server_id);
188
189        let client = runtime.client.read().await;
190        let new_tools = client.list_tools(runtime.config.request_timeout_ms).await?;
191        drop(client);
192
193        // Update tools
194        let mut tools = runtime.tools.write().await;
195        *tools = new_tools.clone();
196        drop(tools);
197
198        // Update info
199        let mut info = runtime.info.write().await;
200        info.tool_count = new_tools.len();
201
202        // Re-register tools
203        self.index.remove_server_tools(server_id);
204        let aliases = self.index.register_server_tools(
205            server_id,
206            &new_tools,
207            &runtime.config.allowed_tools,
208            &runtime.config.denied_tools,
209        );
210
211        info!(
212            "Refreshed {} tools for MCP server '{}'",
213            aliases.len(),
214            server_id
215        );
216
217        // Emit event
218        if let Some(ref tx) = self.event_tx {
219            let tool_names: Vec<String> = aliases.into_iter().map(|a| a.alias).collect();
220            let _ = tx
221                .send(McpEvent::ToolsChanged {
222                    server_id: server_id.to_string(),
223                    tools: tool_names,
224                })
225                .await;
226        }
227
228        Ok(())
229    }
230
231    fn start_health_check(&self, runtime: Arc<ServerRuntime>, interval_ms: u64) {
232        let server_id = runtime.config.id.clone();
233        let manager = Arc::new(self.clone());
234
235        tokio::spawn(async move {
236            let mut interval = interval(Duration::from_millis(interval_ms));
237
238            loop {
239                interval.tick().await;
240
241                if runtime.shutdown.load(Ordering::SeqCst) {
242                    break;
243                }
244
245                // Skip health check if currently reconnecting
246                if runtime.reconnecting.load(Ordering::SeqCst) {
247                    continue;
248                }
249
250                let client = runtime.client.read().await;
251                match client.ping(runtime.config.request_timeout_ms).await {
252                    Ok(_) => {
253                        let mut info = runtime.info.write().await;
254                        info.last_ping_at = Some(Utc::now());
255                        if info.status == ServerStatus::Degraded {
256                            info.status = ServerStatus::Ready;
257                            // Emit recovery event
258                            if let Some(ref tx) = manager.event_tx {
259                                let _ = tx
260                                    .send(McpEvent::ServerStatusChanged {
261                                        server_id: server_id.clone(),
262                                        status: ServerStatus::Ready,
263                                        error: None,
264                                    })
265                                    .await;
266                            }
267                        }
268                    }
269                    Err(e) => {
270                        warn!("Health check failed for MCP server '{}': {}", server_id, e);
271
272                        // Drop client lock before attempting reconnection
273                        drop(client);
274
275                        // Update status to Degraded
276                        {
277                            let mut info = runtime.info.write().await;
278                            info.status = ServerStatus::Degraded;
279                            info.last_error = Some(e.to_string());
280                        }
281
282                        // Emit degraded event
283                        if let Some(ref tx) = manager.event_tx {
284                            let _ = tx
285                                .send(McpEvent::ServerStatusChanged {
286                                    server_id: server_id.clone(),
287                                    status: ServerStatus::Degraded,
288                                    error: Some(e.to_string()),
289                                })
290                                .await;
291                        }
292
293                        // Attempt reconnection if enabled
294                        if runtime.config.reconnect.enabled {
295                            if let Err(reconnect_err) =
296                                manager.attempt_reconnection(runtime.clone()).await
297                            {
298                                error!(
299                                    "Reconnection failed for MCP server '{}': {}",
300                                    server_id, reconnect_err
301                                );
302                            }
303                        }
304                    }
305                }
306            }
307        });
308    }
309}