crabtalk_daemon/daemon/
protocol.rs1use crate::daemon::Daemon;
4use anyhow::{Context, Result};
5use futures_util::{StreamExt, pin_mut};
6use std::sync::Arc;
7use wcore::AgentEvent;
8use wcore::protocol::{
9 api::Server,
10 message::{
11 AgentEventMsg, AskOption, AskQuestion, AskUserEvent, SendMsg, SendResponse, SessionInfo,
12 StreamChunk, StreamEnd, StreamEvent, StreamMsg, StreamStart, StreamThinking, ToolCallInfo,
13 ToolResultEvent, ToolStartEvent, ToolsCompleteEvent, stream_event,
14 },
15};
16
17impl Server for Daemon {
18 async fn send(&self, req: SendMsg) -> Result<SendResponse> {
19 let rt: Arc<_> = self.runtime.read().await.clone();
20 let sender = req.sender.as_deref().unwrap_or("");
21 let created_by = if sender.is_empty() { "user" } else { sender };
22 let cwd = req.cwd.map(std::path::PathBuf::from);
23 let (session_id, is_new) = match req.session {
24 Some(id) => (id, false),
25 None => {
26 let id = rt.create_session(&req.agent, created_by).await?;
27 if let Some(ref cwd) = cwd {
28 rt.hook.session_cwds.lock().await.insert(id, cwd.clone());
29 }
30 (id, true)
31 }
32 };
33 let response = rt.send_to(session_id, &req.content, sender).await?;
34 if is_new {
35 rt.hook.session_cwds.lock().await.remove(&session_id);
36 rt.close_session(session_id).await;
37 }
38 Ok(SendResponse {
39 agent: req.agent,
40 content: response.final_response.unwrap_or_default(),
41 session: session_id,
42 })
43 }
44
45 fn stream(
46 &self,
47 req: StreamMsg,
48 ) -> impl futures_core::Stream<Item = Result<StreamEvent>> + Send {
49 let runtime = self.runtime.clone();
50 let agent = req.agent;
51 let content = req.content;
52 let req_session = req.session;
53 let sender = req.sender.unwrap_or_default();
54 let cwd = req.cwd.map(std::path::PathBuf::from);
55 async_stream::try_stream! {
56 let rt: Arc<_> = runtime.read().await.clone();
57 let created_by = if sender.is_empty() { "user".into() } else { sender.clone() };
58 let (session_id, is_new) = match req_session {
59 Some(id) => (id, false),
60 None => {
61 let id = rt.create_session(&agent, created_by.as_str()).await?;
62 if let Some(ref cwd) = cwd {
63 rt.hook.session_cwds.lock().await.insert(id, cwd.clone());
64 }
65 (id, true)
66 }
67 };
68
69 yield StreamEvent { event: Some(stream_event::Event::Start(StreamStart { agent: agent.clone(), session: session_id })) };
70
71 let stream = rt.stream_to(session_id, &content, &sender);
72 pin_mut!(stream);
73 while let Some(event) = stream.next().await {
74 match event {
75 AgentEvent::TextDelta(text) => {
76 yield StreamEvent { event: Some(stream_event::Event::Chunk(StreamChunk { content: text })) };
77 }
78 AgentEvent::ThinkingDelta(text) => {
79 yield StreamEvent { event: Some(stream_event::Event::Thinking(StreamThinking { content: text })) };
80 }
81 AgentEvent::ToolCallsStart(calls) => {
82 let ask_questions: Vec<AskQuestion> = calls
84 .iter()
85 .filter(|c| c.function.name == "ask_user")
86 .filter_map(|c| {
87 serde_json::from_str::<crate::hook::system::ask_user::AskUser>(&c.function.arguments)
88 .ok()
89 })
90 .flat_map(|a| a.questions)
91 .map(|q| AskQuestion {
92 question: q.question,
93 header: q.header,
94 options: q.options.into_iter().map(|o| AskOption {
95 label: o.label,
96 description: o.description,
97 }).collect(),
98 multi_select: q.multi_select,
99 })
100 .collect();
101
102 yield StreamEvent { event: Some(stream_event::Event::ToolStart(ToolStartEvent {
103 calls: calls.into_iter().map(|c| ToolCallInfo {
104 name: c.function.name.to_string(),
105 arguments: c.function.arguments,
106 }).collect(),
107 })) };
108
109 if !ask_questions.is_empty() {
110 yield StreamEvent { event: Some(stream_event::Event::AskUser(AskUserEvent { questions: ask_questions })) };
111 }
112 }
113 AgentEvent::ToolResult { call_id, output } => {
114 yield StreamEvent { event: Some(stream_event::Event::ToolResult(ToolResultEvent { call_id: call_id.to_string(), output })) };
115 }
116 AgentEvent::ToolCallsComplete => {
117 yield StreamEvent { event: Some(stream_event::Event::ToolsComplete(ToolsCompleteEvent {})) };
118 }
119 AgentEvent::Compact { .. } => {
120 }
122 AgentEvent::Done(resp) => {
123 if let wcore::AgentStopReason::Error(e) = &resp.stop_reason {
124 if is_new {
125 rt.hook.session_cwds.lock().await.remove(&session_id);
126 rt.close_session(session_id).await;
127 }
128 Err(anyhow::anyhow!("{e}"))?;
129 }
130 break;
131 }
132 }
133 }
134 yield StreamEvent { event: Some(stream_event::Event::End(StreamEnd { agent: agent.clone() })) };
135 }
136 }
137
138 async fn ping(&self) -> Result<()> {
139 Ok(())
140 }
141
142 async fn list_sessions(&self) -> Result<Vec<SessionInfo>> {
143 let rt = self.runtime.read().await.clone();
144 let sessions = rt.sessions().await;
145 let mut infos = Vec::with_capacity(sessions.len());
146 for s in sessions {
147 let s = s.lock().await;
148 let active = rt.is_active(s.id).await;
149 infos.push(SessionInfo {
150 id: s.id,
151 agent: s.agent.to_string(),
152 created_by: s.created_by.to_string(),
153 message_count: s.history.len() as u64,
154 alive_secs: s.created_at.elapsed().as_secs(),
155 active,
156 });
157 }
158 Ok(infos)
159 }
160
161 async fn kill_session(&self, session: u64) -> Result<bool> {
162 let rt = self.runtime.read().await.clone();
163 rt.hook.pending_asks.lock().await.remove(&session);
165 rt.hook.session_cwds.lock().await.remove(&session);
166 Ok(rt.close_session(session).await)
167 }
168
169 fn subscribe_events(&self) -> impl futures_core::Stream<Item = Result<AgentEventMsg>> + Send {
170 let runtime = self.runtime.clone();
171 async_stream::try_stream! {
172 let rt = runtime.read().await.clone();
173 let mut rx = rt.hook.subscribe_events();
174 loop {
175 match rx.recv().await {
176 Ok(event) => yield event,
177 Err(tokio::sync::broadcast::error::RecvError::Closed) => break,
178 Err(tokio::sync::broadcast::error::RecvError::Lagged(_)) => continue,
179 }
180 }
181 }
182 }
183
184 async fn get_config(&self) -> Result<String> {
185 let config = self.load_config()?;
186 serde_json::to_string(&config).context("failed to serialize config")
187 }
188
189 async fn set_config(&self, config: String) -> Result<()> {
190 let parsed: crate::DaemonConfig =
191 serde_json::from_str(&config).context("invalid DaemonConfig JSON")?;
192 let toml_str =
193 toml::to_string_pretty(&parsed).context("failed to serialize config to TOML")?;
194 let config_path = self.config_dir.join(wcore::paths::CONFIG_FILE);
195 std::fs::write(&config_path, toml_str)
196 .with_context(|| format!("failed to write {}", config_path.display()))?;
197 self.reload().await
198 }
199
200 async fn reload(&self) -> Result<()> {
201 self.reload().await
202 }
203
204 async fn reply_to_ask(&self, session: u64, content: String) -> Result<()> {
205 let rt = self.runtime.read().await.clone();
206 if let Some(tx) = rt.hook.pending_asks.lock().await.remove(&session) {
209 let _ = tx.send(content);
210 return Ok(());
211 }
212 tokio::time::sleep(std::time::Duration::from_millis(100)).await;
213 if let Some(tx) = rt.hook.pending_asks.lock().await.remove(&session) {
214 let _ = tx.send(content);
215 return Ok(());
216 }
217 anyhow::bail!("no pending ask_user for session {session}")
218 }
219}
220
221impl Daemon {
222 fn load_config(&self) -> Result<crate::DaemonConfig> {
224 crate::DaemonConfig::load(&self.config_dir.join(wcore::paths::CONFIG_FILE))
225 }
226}