bamboo_engine/mcp/manager/
lifecycle.rs1use tokio::time::{interval, Duration};
2
3use super::fingerprint::desired_proxy_fingerprint;
4use super::*;
5
6impl McpServerManager {
7 pub async fn start_server(&self, config: McpServerConfig) -> Result<()> {
9 let server_id = config.id.clone();
10
11 if self.runtimes.contains_key(&server_id) {
12 return Err(McpError::AlreadyRunning(server_id));
13 }
14
15 info!("Starting MCP server '{}'", server_id);
16
17 let runtime_proxy_fingerprint = desired_proxy_fingerprint(self.config.as_ref()).await;
18 let (client, tools) = self
19 .bootstrap_server_client(&server_id, &config, "start")
20 .await?;
21
22 let runtime = Arc::new(ServerRuntime {
24 config: config.clone(),
25 client: RwLock::new(client),
26 info: RwLock::new(RuntimeInfo {
27 status: ServerStatus::Ready,
28 last_error: None,
29 connected_at: Some(Utc::now()),
30 disconnected_at: None,
31 tool_count: tools.len(),
32 restart_count: 0,
33 last_ping_at: Some(Utc::now()),
34 }),
35 tools: RwLock::new(tools.clone()),
36 shutdown: AtomicBool::new(false),
37 reconnecting: AtomicBool::new(false),
38 qos: McpServerQos::new(McpQosConfig::default()),
39 proxy_fingerprint: runtime_proxy_fingerprint,
40 });
41
42 let aliases = self.index.register_server_tools(
44 &server_id,
45 &tools,
46 &config.allowed_tools,
47 &config.denied_tools,
48 );
49
50 info!(
51 "Registered {} MCP tools for server '{}'",
52 aliases.len(),
53 server_id
54 );
55
56 self.runtimes.insert(server_id.clone(), runtime.clone());
58
59 if let Some(ref tx) = self.event_tx {
61 let _ = tx
62 .send(McpEvent::ServerStatusChanged {
63 server_id: server_id.clone(),
64 status: ServerStatus::Ready,
65 error: None,
66 })
67 .await;
68
69 let tool_names: Vec<String> = aliases.into_iter().map(|a| a.alias).collect();
70 let _ = tx
71 .send(McpEvent::ToolsChanged {
72 server_id,
73 tools: tool_names,
74 })
75 .await;
76 }
77
78 self.start_health_check(runtime, config.healthcheck_interval_ms);
80
81 Ok(())
82 }
83
84 pub async fn stop_server(&self, server_id: &str) -> Result<()> {
86 let (_, runtime) = self
87 .runtimes
88 .remove(server_id)
89 .ok_or_else(|| McpError::NotRunning(server_id.to_string()))?;
90
91 info!("Stopping MCP server '{}'", server_id);
92
93 runtime.shutdown.store(true, Ordering::SeqCst);
94
95 let mut client = runtime.client.write().await;
97 if let Err(e) = client.disconnect().await {
98 warn!("Error disconnecting MCP server '{}': {}", server_id, e);
99 }
100
101 let mut info = runtime.info.write().await;
103 info.status = ServerStatus::Stopped;
104 info.disconnected_at = Some(Utc::now());
105
106 self.index.remove_server_tools(server_id);
108
109 if let Some(ref tx) = self.event_tx {
111 let _ = tx
112 .send(McpEvent::ServerStatusChanged {
113 server_id: server_id.to_string(),
114 status: ServerStatus::Stopped,
115 error: None,
116 })
117 .await;
118 }
119
120 info!("MCP server '{}' stopped", server_id);
121 Ok(())
122 }
123
124 pub async fn call_tool(
126 &self,
127 server_id: &str,
128 tool_name: &str,
129 args: serde_json::Value,
130 ) -> Result<crate::mcp::types::McpCallResult> {
131 let runtime = self
132 .runtimes
133 .get(server_id)
134 .ok_or_else(|| McpError::ServerNotFound(server_id.to_string()))?;
135
136 runtime.qos.check_circuit(server_id, tool_name).await?;
137 let _permit = runtime.qos.acquire_permit().await?;
138
139 let client = runtime.client.read().await;
140 let timeout = runtime.config.request_timeout_ms;
141 let result = client.call_tool(tool_name, args, timeout).await;
142 drop(client);
143
144 let result = match result {
145 Ok(result) => {
146 runtime.qos.record_success().await;
147 result
148 }
149 Err(error) => {
150 runtime
151 .qos
152 .record_failure(server_id, tool_name, &error)
153 .await;
154 return Err(error);
155 }
156 };
157
158 if let Some(ref tx) = self.event_tx {
160 let _ = tx
161 .send(McpEvent::ToolExecuted {
162 server_id: server_id.to_string(),
163 tool_name: tool_name.to_string(),
164 success: !result.is_error,
165 })
166 .await;
167 }
168
169 Ok(result)
170 }
171
172 pub fn get_tool_info(&self, server_id: &str, tool_name: &str) -> Option<McpTool> {
174 self.runtimes.get(server_id).and_then(|runtime| {
175 let tools = runtime.tools.try_read().ok()?;
176 tools.iter().find(|t| t.name == tool_name).cloned()
177 })
178 }
179
180 pub async fn refresh_tools(&self, server_id: &str) -> Result<()> {
182 let runtime = self
183 .runtimes
184 .get(server_id)
185 .ok_or_else(|| McpError::ServerNotFound(server_id.to_string()))?;
186
187 info!("Refreshing tools for MCP server '{}'", server_id);
188
189 let client = runtime.client.read().await;
190 let new_tools = client.list_tools(runtime.config.request_timeout_ms).await?;
191 drop(client);
192
193 let mut tools = runtime.tools.write().await;
195 *tools = new_tools.clone();
196 drop(tools);
197
198 let mut info = runtime.info.write().await;
200 info.tool_count = new_tools.len();
201
202 self.index.remove_server_tools(server_id);
204 let aliases = self.index.register_server_tools(
205 server_id,
206 &new_tools,
207 &runtime.config.allowed_tools,
208 &runtime.config.denied_tools,
209 );
210
211 info!(
212 "Refreshed {} tools for MCP server '{}'",
213 aliases.len(),
214 server_id
215 );
216
217 if let Some(ref tx) = self.event_tx {
219 let tool_names: Vec<String> = aliases.into_iter().map(|a| a.alias).collect();
220 let _ = tx
221 .send(McpEvent::ToolsChanged {
222 server_id: server_id.to_string(),
223 tools: tool_names,
224 })
225 .await;
226 }
227
228 Ok(())
229 }
230
231 fn start_health_check(&self, runtime: Arc<ServerRuntime>, interval_ms: u64) {
232 let server_id = runtime.config.id.clone();
233 let manager = Arc::new(self.clone());
234
235 tokio::spawn(async move {
236 let mut interval = interval(Duration::from_millis(interval_ms));
237
238 loop {
239 interval.tick().await;
240
241 if runtime.shutdown.load(Ordering::SeqCst) {
242 break;
243 }
244
245 if runtime.reconnecting.load(Ordering::SeqCst) {
247 continue;
248 }
249
250 let client = runtime.client.read().await;
251 match client.ping(runtime.config.request_timeout_ms).await {
252 Ok(_) => {
253 let mut info = runtime.info.write().await;
254 info.last_ping_at = Some(Utc::now());
255 if info.status == ServerStatus::Degraded {
256 info.status = ServerStatus::Ready;
257 if let Some(ref tx) = manager.event_tx {
259 let _ = tx
260 .send(McpEvent::ServerStatusChanged {
261 server_id: server_id.clone(),
262 status: ServerStatus::Ready,
263 error: None,
264 })
265 .await;
266 }
267 }
268 }
269 Err(e) => {
270 warn!("Health check failed for MCP server '{}': {}", server_id, e);
271
272 drop(client);
274
275 {
277 let mut info = runtime.info.write().await;
278 info.status = ServerStatus::Degraded;
279 info.last_error = Some(e.to_string());
280 }
281
282 if let Some(ref tx) = manager.event_tx {
284 let _ = tx
285 .send(McpEvent::ServerStatusChanged {
286 server_id: server_id.clone(),
287 status: ServerStatus::Degraded,
288 error: Some(e.to_string()),
289 })
290 .await;
291 }
292
293 if runtime.config.reconnect.enabled {
295 if let Err(reconnect_err) =
296 manager.attempt_reconnection(runtime.clone()).await
297 {
298 error!(
299 "Reconnection failed for MCP server '{}': {}",
300 server_id, reconnect_err
301 );
302 }
303 }
304 }
305 }
306 }
307 });
308 }
309}