use crate::protocol::message::{
ActiveConversationInfo, ActiveConversationList, AgentEventMsg, AgentInfo, AgentList,
ClientMessage, CompactResponse, ConversationHistory, ConversationInfo, ConversationList,
CreateAgentMsg, CreateCronMsg, CronInfo, CronList, DaemonStats, ErrorMsg, InstallPluginMsg,
McpInfo, McpList, ModelInfo, ModelList, PluginEvent, PluginInfo, PluginList, PluginSearchList,
Pong, ProviderInfo, ProviderList, ProviderPresetInfo, ProviderPresetList, PublishEventMsg,
ResourceKind, SendMsg, SendResponse, ServerMessage, ServiceLogOutput, SkillInfo, SkillList,
SteerSessionMsg, StreamEvent, StreamMsg, SubscribeEventMsg, SubscriptionInfo, SubscriptionList,
UpdateAgentMsg, client_message, server_message,
};
use anyhow::Result;
use futures_core::Stream;
use futures_util::StreamExt;
fn server_error(code: u32, message: String) -> ServerMessage {
ServerMessage {
msg: Some(server_message::Msg::Error(ErrorMsg { code, message })),
}
}
fn server_pong() -> ServerMessage {
ServerMessage {
msg: Some(server_message::Msg::Pong(Pong {})),
}
}
fn result_to_msg<T: Into<ServerMessage>>(result: Result<T>) -> ServerMessage {
match result {
Ok(resp) => resp.into(),
Err(e) => server_error(500, e.to_string()),
}
}
pub trait Server: Sync {
fn send(&self, req: SendMsg) -> impl std::future::Future<Output = Result<SendResponse>> + Send;
fn stream(&self, req: StreamMsg) -> impl Stream<Item = Result<StreamEvent>> + Send;
fn ping(&self) -> impl std::future::Future<Output = Result<()>> + Send;
fn list_conversations_active(
&self,
) -> impl std::future::Future<Output = Result<Vec<ActiveConversationInfo>>> + Send;
fn kill_conversation(
&self,
agent: String,
sender: String,
) -> impl std::future::Future<Output = Result<bool>> + Send;
fn subscribe_events(&self) -> impl Stream<Item = Result<AgentEventMsg>> + Send;
fn reload(&self) -> impl std::future::Future<Output = Result<()>> + Send;
fn get_stats(&self) -> impl std::future::Future<Output = Result<DaemonStats>> + Send;
fn create_cron(
&self,
req: CreateCronMsg,
) -> impl std::future::Future<Output = Result<CronInfo>> + Send;
fn delete_cron(&self, id: u64) -> impl std::future::Future<Output = Result<bool>> + Send;
fn list_crons(&self) -> impl std::future::Future<Output = Result<CronList>> + Send;
fn subscribe_event(
&self,
req: SubscribeEventMsg,
) -> impl std::future::Future<Output = Result<SubscriptionInfo>> + Send;
fn unsubscribe_event(&self, id: u64) -> impl std::future::Future<Output = Result<bool>> + Send;
fn list_subscriptions(
&self,
) -> impl std::future::Future<Output = Result<SubscriptionList>> + Send;
fn publish_event(
&self,
req: PublishEventMsg,
) -> impl std::future::Future<Output = Result<()>> + Send;
fn compact_conversation(
&self,
agent: String,
sender: String,
) -> impl std::future::Future<Output = Result<String>> + Send;
fn reply_to_ask(
&self,
agent: String,
sender: String,
content: String,
) -> impl std::future::Future<Output = Result<()>> + Send;
fn steer_session(
&self,
req: SteerSessionMsg,
) -> impl std::future::Future<Output = Result<()>> + Send;
fn list_agents(&self) -> impl std::future::Future<Output = Result<Vec<AgentInfo>>> + Send;
fn get_agent(
&self,
name: String,
) -> impl std::future::Future<Output = Result<AgentInfo>> + Send;
fn create_agent(
&self,
req: CreateAgentMsg,
) -> impl std::future::Future<Output = Result<AgentInfo>> + Send;
fn update_agent(
&self,
req: UpdateAgentMsg,
) -> impl std::future::Future<Output = Result<AgentInfo>> + Send;
fn delete_agent(&self, name: String) -> impl std::future::Future<Output = Result<bool>> + Send;
fn list_providers(&self)
-> impl std::future::Future<Output = Result<Vec<ProviderInfo>>> + Send;
fn install_plugin(
&self,
req: InstallPluginMsg,
) -> impl Stream<Item = Result<PluginEvent>> + Send;
fn uninstall_plugin(&self, plugin: String) -> impl Stream<Item = Result<PluginEvent>> + Send;
fn list_plugins(&self) -> impl std::future::Future<Output = Result<Vec<PluginInfo>>> + Send;
fn search_plugins(
&self,
query: String,
) -> impl std::future::Future<Output = Result<Vec<PluginInfo>>> + Send;
fn list_skills(&self) -> impl std::future::Future<Output = Result<Vec<SkillInfo>>> + Send;
fn list_models(&self) -> impl std::future::Future<Output = Result<Vec<ModelInfo>>> + Send;
fn set_enabled(
&self,
kind: ResourceKind,
name: String,
enabled: bool,
) -> impl std::future::Future<Output = Result<()>> + Send;
fn list_conversations(
&self,
agent: String,
sender: String,
) -> impl std::future::Future<Output = Result<Vec<ConversationInfo>>> + Send;
fn get_conversation_history(
&self,
file_path: String,
) -> impl std::future::Future<Output = Result<ConversationHistory>> + Send;
fn delete_conversation(
&self,
file_path: String,
) -> impl std::future::Future<Output = Result<()>> + Send;
fn list_mcps(&self) -> impl std::future::Future<Output = Result<Vec<McpInfo>>> + Send;
fn set_local_mcps(
&self,
mcps: Vec<McpInfo>,
) -> impl std::future::Future<Output = Result<()>> + Send;
fn set_provider(
&self,
name: String,
config: String,
) -> impl std::future::Future<Output = Result<ProviderInfo>> + Send;
fn delete_provider(&self, name: String)
-> impl std::future::Future<Output = Result<()>> + Send;
fn set_active_model(
&self,
model: String,
) -> impl std::future::Future<Output = Result<()>> + Send;
fn list_provider_presets(
&self,
) -> impl std::future::Future<Output = Result<Vec<ProviderPresetInfo>>> + Send;
fn start_service(
&self,
name: String,
force: bool,
) -> impl std::future::Future<Output = Result<()>> + Send;
fn stop_service(&self, name: String) -> impl std::future::Future<Output = Result<()>> + Send;
fn service_logs(
&self,
name: String,
lines: u32,
) -> impl std::future::Future<Output = Result<String>> + Send;
fn dispatch_extension(
&self,
_payload: Vec<u8>,
) -> impl std::future::Future<Output = Result<Vec<u8>>> + Send {
async { anyhow::bail!("extension not supported") }
}
fn dispatch(&self, msg: ClientMessage) -> impl Stream<Item = ServerMessage> + Send + '_ {
async_stream::stream! {
let Some(inner) = msg.msg else {
yield server_error(400, "empty client message".to_string());
return;
};
match inner {
client_message::Msg::Send(send_msg) => {
yield result_to_msg(self.send(send_msg).await);
}
client_message::Msg::Stream(stream_msg) => {
let s = self.stream(stream_msg);
tokio::pin!(s);
while let Some(result) = s.next().await {
yield result_to_msg(result);
}
}
client_message::Msg::Ping(_) => {
yield match self.ping().await {
Ok(()) => server_pong(),
Err(e) => server_error(500, e.to_string()),
};
}
client_message::Msg::ListActiveConversations(_req) => {
yield match self.list_conversations_active().await {
Ok(conversations) => ServerMessage {
msg: Some(server_message::Msg::ActiveConversations(ActiveConversationList { conversations })),
},
Err(e) => server_error(500, e.to_string()),
};
}
client_message::Msg::Kill(kill_msg) => {
yield match self.kill_conversation(kill_msg.agent.clone(), kill_msg.sender.clone()).await {
Ok(true) => server_pong(),
Ok(false) => server_error(
404,
format!("conversation not found for agent='{}' sender='{}'", kill_msg.agent, kill_msg.sender),
),
Err(e) => server_error(500, e.to_string()),
};
}
client_message::Msg::GetConfig(_) => {
yield server_error(410, "GetConfig is deprecated — use GetStats and granular APIs".into());
}
client_message::Msg::SubscribeEvents(_) => {
let s = self.subscribe_events();
tokio::pin!(s);
while let Some(result) = s.next().await {
yield result_to_msg(result);
}
}
client_message::Msg::Reload(_) => {
yield match self.reload().await {
Ok(()) => server_pong(),
Err(e) => server_error(500, e.to_string()),
};
}
client_message::Msg::ReplyToAsk(msg) => {
yield match self.reply_to_ask(msg.agent, msg.sender, msg.content).await {
Ok(()) => server_pong(),
Err(e) => server_error(404, e.to_string()),
};
}
client_message::Msg::SteerSession(req) => {
yield match self.steer_session(req).await {
Ok(()) => server_pong(),
Err(e) => server_error(500, e.to_string()),
};
}
client_message::Msg::GetStats(_) => {
yield match self.get_stats().await {
Ok(stats) => ServerMessage {
msg: Some(server_message::Msg::Stats(stats)),
},
Err(e) => server_error(500, e.to_string()),
};
}
client_message::Msg::CreateCron(req) => {
yield match self.create_cron(req).await {
Ok(info) => ServerMessage {
msg: Some(server_message::Msg::CronInfo(info)),
},
Err(e) => server_error(500, e.to_string()),
};
}
client_message::Msg::DeleteCron(req) => {
yield match self.delete_cron(req.id).await {
Ok(true) => server_pong(),
Ok(false) => server_error(404, format!("cron {} not found", req.id)),
Err(e) => server_error(500, e.to_string()),
};
}
client_message::Msg::ListCrons(_) => {
yield match self.list_crons().await {
Ok(list) => ServerMessage {
msg: Some(server_message::Msg::CronList(list)),
},
Err(e) => server_error(500, e.to_string()),
};
}
client_message::Msg::SubscribeEvent(req) => {
yield match self.subscribe_event(req).await {
Ok(info) => ServerMessage {
msg: Some(server_message::Msg::SubscriptionInfo(info)),
},
Err(e) => server_error(500, e.to_string()),
};
}
client_message::Msg::UnsubscribeEvent(req) => {
yield match self.unsubscribe_event(req.id).await {
Ok(true) => server_pong(),
Ok(false) => server_error(404, format!("subscription {} not found", req.id)),
Err(e) => server_error(500, e.to_string()),
};
}
client_message::Msg::ListSubscriptions(_) => {
yield match self.list_subscriptions().await {
Ok(list) => ServerMessage {
msg: Some(server_message::Msg::SubscriptionList(list)),
},
Err(e) => server_error(500, e.to_string()),
};
}
client_message::Msg::PublishEvent(req) => {
yield match self.publish_event(req).await {
Ok(()) => server_pong(),
Err(e) => server_error(500, e.to_string()),
};
}
client_message::Msg::Compact(req) => {
yield match self.compact_conversation(req.agent, req.sender).await {
Ok(summary) => ServerMessage {
msg: Some(server_message::Msg::Compact(CompactResponse { summary })),
},
Err(e) => server_error(500, e.to_string()),
};
}
client_message::Msg::ListAgents(_) => {
yield match self.list_agents().await {
Ok(agents) => ServerMessage {
msg: Some(server_message::Msg::AgentList(AgentList { agents })),
},
Err(e) => server_error(500, e.to_string()),
};
}
client_message::Msg::GetAgent(req) => {
yield match self.get_agent(req.name).await {
Ok(info) => ServerMessage {
msg: Some(server_message::Msg::AgentInfo(info)),
},
Err(e) => server_error(404, e.to_string()),
};
}
client_message::Msg::CreateAgent(req) => {
yield match self.create_agent(req).await {
Ok(info) => ServerMessage {
msg: Some(server_message::Msg::AgentInfo(info)),
},
Err(e) => server_error(500, e.to_string()),
};
}
client_message::Msg::UpdateAgent(req) => {
yield match self.update_agent(req).await {
Ok(info) => ServerMessage {
msg: Some(server_message::Msg::AgentInfo(info)),
},
Err(e) => server_error(500, e.to_string()),
};
}
client_message::Msg::DeleteAgent(req) => {
yield match self.delete_agent(req.name.clone()).await {
Ok(true) => server_pong(),
Ok(false) => server_error(
404,
format!("agent '{}' not found in local manifest", req.name),
),
Err(e) => server_error(500, e.to_string()),
};
}
client_message::Msg::ListProviders(_) => {
yield match self.list_providers().await {
Ok(providers) => ServerMessage {
msg: Some(server_message::Msg::ProviderList(ProviderList {
providers,
})),
},
Err(e) => server_error(500, e.to_string()),
};
}
client_message::Msg::InstallPlugin(req) => {
let s = self.install_plugin(req);
tokio::pin!(s);
while let Some(result) = s.next().await {
yield result_to_msg(result);
}
}
client_message::Msg::UninstallPlugin(req) => {
let s = self.uninstall_plugin(req.plugin);
tokio::pin!(s);
while let Some(result) = s.next().await {
yield result_to_msg(result);
}
}
client_message::Msg::ListPlugins(_) => {
yield match self.list_plugins().await {
Ok(plugins) => ServerMessage {
msg: Some(server_message::Msg::PluginList(PluginList {
plugins,
})),
},
Err(e) => server_error(500, e.to_string()),
};
}
client_message::Msg::SearchPlugins(req) => {
yield match self.search_plugins(req.query).await {
Ok(plugins) => ServerMessage {
msg: Some(server_message::Msg::PluginSearchList(PluginSearchList {
plugins,
})),
},
Err(e) => server_error(500, e.to_string()),
};
}
client_message::Msg::StartService(req) => {
yield match self.start_service(req.name, req.force).await {
Ok(()) => server_pong(),
Err(e) => server_error(500, e.to_string()),
};
}
client_message::Msg::StopService(req) => {
yield match self.stop_service(req.name).await {
Ok(()) => server_pong(),
Err(e) => server_error(500, e.to_string()),
};
}
client_message::Msg::ServiceLogs(req) => {
yield match self.service_logs(req.name, req.lines).await {
Ok(content) => ServerMessage {
msg: Some(server_message::Msg::ServiceLogOutput(
ServiceLogOutput { content },
)),
},
Err(e) => server_error(500, e.to_string()),
};
}
client_message::Msg::ListSkills(_) => {
yield match self.list_skills().await {
Ok(skills) => ServerMessage {
msg: Some(server_message::Msg::SkillList(SkillList { skills })),
},
Err(e) => server_error(500, e.to_string()),
};
}
client_message::Msg::ListModels(_) => {
yield match self.list_models().await {
Ok(models) => ServerMessage {
msg: Some(server_message::Msg::ModelList(ModelList { models })),
},
Err(e) => server_error(500, e.to_string()),
};
}
client_message::Msg::SetEnabled(req) => {
let kind = ResourceKind::try_from(req.kind)
.unwrap_or(ResourceKind::Unknown);
yield match self.set_enabled(kind, req.name, req.enabled).await {
Ok(()) => server_pong(),
Err(e) => server_error(500, e.to_string()),
};
}
client_message::Msg::ListConversations(req) => {
yield match self.list_conversations(req.agent, req.sender).await {
Ok(conversations) => ServerMessage {
msg: Some(server_message::Msg::ConversationList(ConversationList {
conversations,
})),
},
Err(e) => server_error(500, e.to_string()),
};
}
client_message::Msg::GetConversationHistory(req) => {
yield result_to_msg(self.get_conversation_history(req.file_path).await);
}
client_message::Msg::DeleteConversation(req) => {
yield match self.delete_conversation(req.file_path).await {
Ok(()) => server_pong(),
Err(e) => server_error(500, e.to_string()),
};
}
client_message::Msg::ListMcps(_) => {
yield match self.list_mcps().await {
Ok(mcps) => ServerMessage {
msg: Some(server_message::Msg::McpList(McpList { mcps })),
},
Err(e) => server_error(500, e.to_string()),
};
}
client_message::Msg::SetLocalMcps(req) => {
yield match self.set_local_mcps(req.mcps).await {
Ok(()) => server_pong(),
Err(e) => server_error(500, e.to_string()),
};
}
client_message::Msg::SetProvider(req) => {
yield match self.set_provider(req.name, req.config).await {
Ok(info) => ServerMessage {
msg: Some(server_message::Msg::ProviderList(ProviderList {
providers: vec![info],
})),
},
Err(e) => server_error(500, e.to_string()),
};
}
client_message::Msg::DeleteProvider(req) => {
yield match self.delete_provider(req.name.clone()).await {
Ok(()) => server_pong(),
Err(e) => server_error(500, e.to_string()),
};
}
client_message::Msg::SetActiveModel(req) => {
yield match self.set_active_model(req.model).await {
Ok(()) => server_pong(),
Err(e) => server_error(500, e.to_string()),
};
}
client_message::Msg::ListProviderPresets(_) => {
yield match self.list_provider_presets().await {
Ok(presets) => ServerMessage {
msg: Some(server_message::Msg::ProviderPresetList(
ProviderPresetList { presets },
)),
},
Err(e) => server_error(500, e.to_string()),
};
}
client_message::Msg::Extension(payload) => {
yield match self.dispatch_extension(payload).await {
Ok(response) => ServerMessage {
msg: Some(server_message::Msg::Extension(response)),
},
Err(e) => server_error(500, e.to_string()),
};
}
}
}
}
}