1use crate::protocol::message::{
4 ActiveConversationInfo, ActiveConversationList, AgentEventMsg, AgentInfo, AgentList,
5 ClientMessage, CompactResponse, ConversationHistory, ConversationInfo, ConversationList,
6 CreateAgentMsg, CreateCronMsg, CronInfo, CronList, DaemonStats, ErrorMsg, InstallPluginMsg,
7 McpInfo, McpList, ModelInfo, ModelList, PluginEvent, PluginInfo, PluginList, PluginSearchList,
8 Pong, ProviderInfo, ProviderList, ProviderPresetInfo, ProviderPresetList, PublishEventMsg,
9 SendMsg, SendResponse, ServerMessage, ServiceLogOutput, SkillInfo, SkillList, SteerSessionMsg,
10 StreamEvent, StreamMsg, SubscribeEventMsg, SubscriptionInfo, SubscriptionList, UpdateAgentMsg,
11 UpsertMcpMsg, client_message, server_message,
12};
13use anyhow::Result;
14use futures_core::Stream;
15use futures_util::StreamExt;
16
17fn server_error(code: u32, message: String) -> ServerMessage {
19 ServerMessage {
20 msg: Some(server_message::Msg::Error(ErrorMsg { code, message })),
21 }
22}
23
24fn server_pong() -> ServerMessage {
26 ServerMessage {
27 msg: Some(server_message::Msg::Pong(Pong {})),
28 }
29}
30
31fn result_to_msg<T: Into<ServerMessage>>(result: Result<T>) -> ServerMessage {
33 match result {
34 Ok(resp) => resp.into(),
35 Err(e) => server_error(500, e.to_string()),
36 }
37}
38
39pub trait Server: Sync {
49 fn send(&self, req: SendMsg) -> impl std::future::Future<Output = Result<SendResponse>> + Send;
51
52 fn stream(&self, req: StreamMsg) -> impl Stream<Item = Result<StreamEvent>> + Send;
54
55 fn ping(&self) -> impl std::future::Future<Output = Result<()>> + Send;
57
58 fn list_conversations_active(
60 &self,
61 ) -> impl std::future::Future<Output = Result<Vec<ActiveConversationInfo>>> + Send;
62
63 fn kill_conversation(
65 &self,
66 agent: String,
67 sender: String,
68 ) -> impl std::future::Future<Output = Result<bool>> + Send;
69
70 fn subscribe_events(&self) -> impl Stream<Item = Result<AgentEventMsg>> + Send;
72
73 fn reload(&self) -> impl std::future::Future<Output = Result<()>> + Send;
75
76 fn get_stats(&self) -> impl std::future::Future<Output = Result<DaemonStats>> + Send;
78
79 fn create_cron(
81 &self,
82 req: CreateCronMsg,
83 ) -> impl std::future::Future<Output = Result<CronInfo>> + Send;
84
85 fn delete_cron(&self, id: u64) -> impl std::future::Future<Output = Result<bool>> + Send;
87
88 fn list_crons(&self) -> impl std::future::Future<Output = Result<CronList>> + Send;
90
91 fn subscribe_event(
93 &self,
94 req: SubscribeEventMsg,
95 ) -> impl std::future::Future<Output = Result<SubscriptionInfo>> + Send;
96
97 fn unsubscribe_event(&self, id: u64) -> impl std::future::Future<Output = Result<bool>> + Send;
99
100 fn list_subscriptions(
102 &self,
103 ) -> impl std::future::Future<Output = Result<SubscriptionList>> + Send;
104
105 fn publish_event(
107 &self,
108 req: PublishEventMsg,
109 ) -> impl std::future::Future<Output = Result<()>> + Send;
110
111 fn compact_conversation(
113 &self,
114 agent: String,
115 sender: String,
116 ) -> impl std::future::Future<Output = Result<String>> + Send;
117
118 fn reply_to_ask(
120 &self,
121 agent: String,
122 sender: String,
123 content: String,
124 ) -> impl std::future::Future<Output = Result<()>> + Send;
125
126 fn steer_session(
128 &self,
129 req: SteerSessionMsg,
130 ) -> impl std::future::Future<Output = Result<()>> + Send;
131
132 fn list_agents(&self) -> impl std::future::Future<Output = Result<Vec<AgentInfo>>> + Send;
134
135 fn get_agent(
137 &self,
138 name: String,
139 ) -> impl std::future::Future<Output = Result<AgentInfo>> + Send;
140
141 fn create_agent(
143 &self,
144 req: CreateAgentMsg,
145 ) -> impl std::future::Future<Output = Result<AgentInfo>> + Send;
146
147 fn update_agent(
149 &self,
150 req: UpdateAgentMsg,
151 ) -> impl std::future::Future<Output = Result<AgentInfo>> + Send;
152
153 fn delete_agent(&self, name: String) -> impl std::future::Future<Output = Result<bool>> + Send;
155
156 fn rename_agent(
158 &self,
159 old_name: String,
160 new_name: String,
161 ) -> impl std::future::Future<Output = Result<AgentInfo>> + Send;
162
163 fn list_providers(&self)
165 -> impl std::future::Future<Output = Result<Vec<ProviderInfo>>> + Send;
166
167 fn install_plugin(
169 &self,
170 req: InstallPluginMsg,
171 ) -> impl Stream<Item = Result<PluginEvent>> + Send;
172
173 fn uninstall_plugin(&self, plugin: String) -> impl Stream<Item = Result<PluginEvent>> + Send;
175
176 fn list_plugins(&self) -> impl std::future::Future<Output = Result<Vec<PluginInfo>>> + Send;
178
179 fn search_plugins(
181 &self,
182 query: String,
183 ) -> impl std::future::Future<Output = Result<Vec<PluginInfo>>> + Send;
184
185 fn list_skills(&self) -> impl std::future::Future<Output = Result<Vec<SkillInfo>>> + Send;
187
188 fn list_models(&self) -> impl std::future::Future<Output = Result<Vec<ModelInfo>>> + Send;
190
191 fn list_conversations(
193 &self,
194 agent: String,
195 sender: String,
196 ) -> impl std::future::Future<Output = Result<Vec<ConversationInfo>>> + Send;
197
198 fn get_conversation_history(
200 &self,
201 file_path: String,
202 ) -> impl std::future::Future<Output = Result<ConversationHistory>> + Send;
203
204 fn delete_conversation(
206 &self,
207 file_path: String,
208 ) -> impl std::future::Future<Output = Result<()>> + Send;
209
210 fn list_mcps(&self) -> impl std::future::Future<Output = Result<Vec<McpInfo>>> + Send;
212
213 fn upsert_mcp(
215 &self,
216 req: UpsertMcpMsg,
217 ) -> impl std::future::Future<Output = Result<McpInfo>> + Send;
218
219 fn delete_mcp(&self, name: String) -> impl std::future::Future<Output = Result<bool>> + Send;
221
222 fn set_provider(
224 &self,
225 name: String,
226 config: String,
227 ) -> impl std::future::Future<Output = Result<ProviderInfo>> + Send;
228
229 fn delete_provider(&self, name: String)
231 -> impl std::future::Future<Output = Result<()>> + Send;
232
233 fn set_active_model(
235 &self,
236 model: String,
237 ) -> impl std::future::Future<Output = Result<()>> + Send;
238
239 fn list_provider_presets(
241 &self,
242 ) -> impl std::future::Future<Output = Result<Vec<ProviderPresetInfo>>> + Send;
243
244 fn start_service(
246 &self,
247 name: String,
248 force: bool,
249 ) -> impl std::future::Future<Output = Result<()>> + Send;
250
251 fn stop_service(&self, name: String) -> impl std::future::Future<Output = Result<()>> + Send;
253
254 fn service_logs(
256 &self,
257 name: String,
258 lines: u32,
259 ) -> impl std::future::Future<Output = Result<String>> + Send;
260
261 fn dispatch_extension(
266 &self,
267 _payload: Vec<u8>,
268 ) -> impl std::future::Future<Output = Result<Vec<u8>>> + Send {
269 async { anyhow::bail!("extension not supported") }
270 }
271
272 fn dispatch(&self, msg: ClientMessage) -> impl Stream<Item = ServerMessage> + Send + '_ {
277 async_stream::stream! {
278 let Some(inner) = msg.msg else {
279 yield server_error(400, "empty client message".to_string());
280 return;
281 };
282
283 match inner {
284 client_message::Msg::Send(send_msg) => {
285 yield result_to_msg(self.send(send_msg).await);
286 }
287 client_message::Msg::Stream(stream_msg) => {
288 let s = self.stream(stream_msg);
289 tokio::pin!(s);
290 while let Some(result) = s.next().await {
291 yield result_to_msg(result);
292 }
293 }
294 client_message::Msg::Ping(_) => {
295 yield match self.ping().await {
296 Ok(()) => server_pong(),
297 Err(e) => server_error(500, e.to_string()),
298 };
299 }
300 client_message::Msg::ListActiveConversations(_req) => {
301 yield match self.list_conversations_active().await {
302 Ok(conversations) => ServerMessage {
303 msg: Some(server_message::Msg::ActiveConversations(ActiveConversationList { conversations })),
304 },
305 Err(e) => server_error(500, e.to_string()),
306 };
307 }
308 client_message::Msg::Kill(kill_msg) => {
309 yield match self.kill_conversation(kill_msg.agent.clone(), kill_msg.sender.clone()).await {
310 Ok(true) => server_pong(),
311 Ok(false) => server_error(
312 404,
313 format!("conversation not found for agent='{}' sender='{}'", kill_msg.agent, kill_msg.sender),
314 ),
315 Err(e) => server_error(500, e.to_string()),
316 };
317 }
318 client_message::Msg::GetConfig(_) => {
319 yield server_error(410, "GetConfig is deprecated — use GetStats and granular APIs".into());
320 }
321 client_message::Msg::SubscribeEvents(_) => {
322 let s = self.subscribe_events();
323 tokio::pin!(s);
324 while let Some(result) = s.next().await {
325 yield result_to_msg(result);
326 }
327 }
328 client_message::Msg::Reload(_) => {
329 yield match self.reload().await {
330 Ok(()) => server_pong(),
331 Err(e) => server_error(500, e.to_string()),
332 };
333 }
334 client_message::Msg::ReplyToAsk(msg) => {
335 yield match self.reply_to_ask(msg.agent, msg.sender, msg.content).await {
336 Ok(()) => server_pong(),
337 Err(e) => server_error(404, e.to_string()),
338 };
339 }
340 client_message::Msg::SteerSession(req) => {
341 yield match self.steer_session(req).await {
342 Ok(()) => server_pong(),
343 Err(e) => server_error(500, e.to_string()),
344 };
345 }
346 client_message::Msg::GetStats(_) => {
347 yield match self.get_stats().await {
348 Ok(stats) => ServerMessage {
349 msg: Some(server_message::Msg::Stats(stats)),
350 },
351 Err(e) => server_error(500, e.to_string()),
352 };
353 }
354 client_message::Msg::CreateCron(req) => {
355 yield match self.create_cron(req).await {
356 Ok(info) => ServerMessage {
357 msg: Some(server_message::Msg::CronInfo(info)),
358 },
359 Err(e) => server_error(500, e.to_string()),
360 };
361 }
362 client_message::Msg::DeleteCron(req) => {
363 yield match self.delete_cron(req.id).await {
364 Ok(true) => server_pong(),
365 Ok(false) => server_error(404, format!("cron {} not found", req.id)),
366 Err(e) => server_error(500, e.to_string()),
367 };
368 }
369 client_message::Msg::ListCrons(_) => {
370 yield match self.list_crons().await {
371 Ok(list) => ServerMessage {
372 msg: Some(server_message::Msg::CronList(list)),
373 },
374 Err(e) => server_error(500, e.to_string()),
375 };
376 }
377 client_message::Msg::SubscribeEvent(req) => {
378 yield match self.subscribe_event(req).await {
379 Ok(info) => ServerMessage {
380 msg: Some(server_message::Msg::SubscriptionInfo(info)),
381 },
382 Err(e) => server_error(500, e.to_string()),
383 };
384 }
385 client_message::Msg::UnsubscribeEvent(req) => {
386 yield match self.unsubscribe_event(req.id).await {
387 Ok(true) => server_pong(),
388 Ok(false) => server_error(404, format!("subscription {} not found", req.id)),
389 Err(e) => server_error(500, e.to_string()),
390 };
391 }
392 client_message::Msg::ListSubscriptions(_) => {
393 yield match self.list_subscriptions().await {
394 Ok(list) => ServerMessage {
395 msg: Some(server_message::Msg::SubscriptionList(list)),
396 },
397 Err(e) => server_error(500, e.to_string()),
398 };
399 }
400 client_message::Msg::PublishEvent(req) => {
401 yield match self.publish_event(req).await {
402 Ok(()) => server_pong(),
403 Err(e) => server_error(500, e.to_string()),
404 };
405 }
406 client_message::Msg::Compact(req) => {
407 yield match self.compact_conversation(req.agent, req.sender).await {
408 Ok(summary) => ServerMessage {
409 msg: Some(server_message::Msg::Compact(CompactResponse { summary })),
410 },
411 Err(e) => server_error(500, e.to_string()),
412 };
413 }
414 client_message::Msg::ListAgents(_) => {
415 yield match self.list_agents().await {
416 Ok(agents) => ServerMessage {
417 msg: Some(server_message::Msg::AgentList(AgentList { agents })),
418 },
419 Err(e) => server_error(500, e.to_string()),
420 };
421 }
422 client_message::Msg::GetAgent(req) => {
423 yield match self.get_agent(req.name).await {
424 Ok(info) => ServerMessage {
425 msg: Some(server_message::Msg::AgentInfo(info)),
426 },
427 Err(e) => server_error(404, e.to_string()),
428 };
429 }
430 client_message::Msg::CreateAgent(req) => {
431 yield match self.create_agent(req).await {
432 Ok(info) => ServerMessage {
433 msg: Some(server_message::Msg::AgentInfo(info)),
434 },
435 Err(e) => server_error(500, e.to_string()),
436 };
437 }
438 client_message::Msg::UpdateAgent(req) => {
439 yield match self.update_agent(req).await {
440 Ok(info) => ServerMessage {
441 msg: Some(server_message::Msg::AgentInfo(info)),
442 },
443 Err(e) => server_error(500, e.to_string()),
444 };
445 }
446 client_message::Msg::DeleteAgent(req) => {
447 yield match self.delete_agent(req.name.clone()).await {
448 Ok(true) => server_pong(),
449 Ok(false) => server_error(
450 404,
451 format!("agent '{}' not found in local manifest", req.name),
452 ),
453 Err(e) => server_error(500, e.to_string()),
454 };
455 }
456 client_message::Msg::RenameAgent(req) => {
457 yield match self.rename_agent(req.old_name, req.new_name).await {
458 Ok(info) => ServerMessage {
459 msg: Some(server_message::Msg::AgentInfo(info)),
460 },
461 Err(e) => server_error(500, e.to_string()),
462 };
463 }
464 client_message::Msg::ListProviders(_) => {
465 yield match self.list_providers().await {
466 Ok(providers) => ServerMessage {
467 msg: Some(server_message::Msg::ProviderList(ProviderList {
468 providers,
469 })),
470 },
471 Err(e) => server_error(500, e.to_string()),
472 };
473 }
474 client_message::Msg::InstallPlugin(req) => {
475 let s = self.install_plugin(req);
476 tokio::pin!(s);
477 while let Some(result) = s.next().await {
478 yield result_to_msg(result);
479 }
480 }
481 client_message::Msg::UninstallPlugin(req) => {
482 let s = self.uninstall_plugin(req.plugin);
483 tokio::pin!(s);
484 while let Some(result) = s.next().await {
485 yield result_to_msg(result);
486 }
487 }
488 client_message::Msg::ListPlugins(_) => {
489 yield match self.list_plugins().await {
490 Ok(plugins) => ServerMessage {
491 msg: Some(server_message::Msg::PluginList(PluginList {
492 plugins,
493 })),
494 },
495 Err(e) => server_error(500, e.to_string()),
496 };
497 }
498 client_message::Msg::SearchPlugins(req) => {
499 yield match self.search_plugins(req.query).await {
500 Ok(plugins) => ServerMessage {
501 msg: Some(server_message::Msg::PluginSearchList(PluginSearchList {
502 plugins,
503 })),
504 },
505 Err(e) => server_error(500, e.to_string()),
506 };
507 }
508 client_message::Msg::StartService(req) => {
509 yield match self.start_service(req.name, req.force).await {
510 Ok(()) => server_pong(),
511 Err(e) => server_error(500, e.to_string()),
512 };
513 }
514 client_message::Msg::StopService(req) => {
515 yield match self.stop_service(req.name).await {
516 Ok(()) => server_pong(),
517 Err(e) => server_error(500, e.to_string()),
518 };
519 }
520 client_message::Msg::ServiceLogs(req) => {
521 yield match self.service_logs(req.name, req.lines).await {
522 Ok(content) => ServerMessage {
523 msg: Some(server_message::Msg::ServiceLogOutput(
524 ServiceLogOutput { content },
525 )),
526 },
527 Err(e) => server_error(500, e.to_string()),
528 };
529 }
530 client_message::Msg::ListSkills(_) => {
531 yield match self.list_skills().await {
532 Ok(skills) => ServerMessage {
533 msg: Some(server_message::Msg::SkillList(SkillList { skills })),
534 },
535 Err(e) => server_error(500, e.to_string()),
536 };
537 }
538 client_message::Msg::ListModels(_) => {
539 yield match self.list_models().await {
540 Ok(models) => ServerMessage {
541 msg: Some(server_message::Msg::ModelList(ModelList { models })),
542 },
543 Err(e) => server_error(500, e.to_string()),
544 };
545 }
546 client_message::Msg::ListConversations(req) => {
547 yield match self.list_conversations(req.agent, req.sender).await {
548 Ok(conversations) => ServerMessage {
549 msg: Some(server_message::Msg::ConversationList(ConversationList {
550 conversations,
551 })),
552 },
553 Err(e) => server_error(500, e.to_string()),
554 };
555 }
556 client_message::Msg::GetConversationHistory(req) => {
557 yield result_to_msg(self.get_conversation_history(req.file_path).await);
558 }
559 client_message::Msg::DeleteConversation(req) => {
560 yield match self.delete_conversation(req.file_path).await {
561 Ok(()) => server_pong(),
562 Err(e) => server_error(500, e.to_string()),
563 };
564 }
565 client_message::Msg::ListMcps(_) => {
566 yield match self.list_mcps().await {
567 Ok(mcps) => ServerMessage {
568 msg: Some(server_message::Msg::McpList(McpList { mcps })),
569 },
570 Err(e) => server_error(500, e.to_string()),
571 };
572 }
573 client_message::Msg::UpsertMcp(req) => {
574 yield match self.upsert_mcp(req).await {
575 Ok(info) => ServerMessage {
576 msg: Some(server_message::Msg::McpInfo(info)),
577 },
578 Err(e) => server_error(500, e.to_string()),
579 };
580 }
581 client_message::Msg::DeleteMcp(req) => {
582 yield match self.delete_mcp(req.name.clone()).await {
583 Ok(true) => server_pong(),
584 Ok(false) => server_error(404, format!("mcp '{}' not found", req.name)),
585 Err(e) => server_error(500, e.to_string()),
586 };
587 }
588 client_message::Msg::SetProvider(req) => {
589 yield match self.set_provider(req.name, req.config).await {
590 Ok(info) => ServerMessage {
591 msg: Some(server_message::Msg::ProviderList(ProviderList {
592 providers: vec![info],
593 })),
594 },
595 Err(e) => server_error(500, e.to_string()),
596 };
597 }
598 client_message::Msg::DeleteProvider(req) => {
599 yield match self.delete_provider(req.name.clone()).await {
600 Ok(()) => server_pong(),
601 Err(e) => server_error(500, e.to_string()),
602 };
603 }
604 client_message::Msg::SetActiveModel(req) => {
605 yield match self.set_active_model(req.model).await {
606 Ok(()) => server_pong(),
607 Err(e) => server_error(500, e.to_string()),
608 };
609 }
610 client_message::Msg::ListProviderPresets(_) => {
611 yield match self.list_provider_presets().await {
612 Ok(presets) => ServerMessage {
613 msg: Some(server_message::Msg::ProviderPresetList(
614 ProviderPresetList { presets },
615 )),
616 },
617 Err(e) => server_error(500, e.to_string()),
618 };
619 }
620 client_message::Msg::Extension(payload) => {
621 yield match self.dispatch_extension(payload).await {
622 Ok(response) => ServerMessage {
623 msg: Some(server_message::Msg::Extension(response)),
624 },
625 Err(e) => server_error(500, e.to_string()),
626 };
627 }
628 }
629 }
630 }
631}