Skip to main content

crabtalk_daemon/daemon/
protocol.rs

1//! Server trait implementation for the Daemon.
2
3use crate::daemon::Daemon;
4use anyhow::{Context, Result};
5use futures_util::{StreamExt, pin_mut};
6use std::sync::Arc;
7use wcore::AgentEvent;
8use wcore::protocol::{
9    api::Server,
10    message::{
11        AgentEventMsg, AskOption, AskQuestion, AskUserEvent, SendMsg, SendResponse, SessionInfo,
12        StreamChunk, StreamEnd, StreamEvent, StreamMsg, StreamStart, StreamThinking, ToolCallInfo,
13        ToolResultEvent, ToolStartEvent, ToolsCompleteEvent, stream_event,
14    },
15};
16
17impl Server for Daemon {
18    async fn send(&self, req: SendMsg) -> Result<SendResponse> {
19        let rt: Arc<_> = self.runtime.read().await.clone();
20        let sender = req.sender.as_deref().unwrap_or("");
21        let created_by = if sender.is_empty() { "user" } else { sender };
22        let cwd = req.cwd.map(std::path::PathBuf::from);
23        let (session_id, is_new) = match req.session {
24            Some(id) => (id, false),
25            None => {
26                let id = rt.create_session(&req.agent, created_by).await?;
27                if let Some(ref cwd) = cwd {
28                    rt.hook.session_cwds.lock().await.insert(id, cwd.clone());
29                }
30                (id, true)
31            }
32        };
33        let response = rt.send_to(session_id, &req.content, sender).await?;
34        if is_new {
35            rt.hook.session_cwds.lock().await.remove(&session_id);
36            rt.close_session(session_id).await;
37        }
38        Ok(SendResponse {
39            agent: req.agent,
40            content: response.final_response.unwrap_or_default(),
41            session: session_id,
42        })
43    }
44
45    fn stream(
46        &self,
47        req: StreamMsg,
48    ) -> impl futures_core::Stream<Item = Result<StreamEvent>> + Send {
49        let runtime = self.runtime.clone();
50        let agent = req.agent;
51        let content = req.content;
52        let req_session = req.session;
53        let sender = req.sender.unwrap_or_default();
54        let cwd = req.cwd.map(std::path::PathBuf::from);
55        async_stream::try_stream! {
56            let rt: Arc<_> = runtime.read().await.clone();
57            let created_by = if sender.is_empty() { "user".into() } else { sender.clone() };
58            let (session_id, is_new) = match req_session {
59                Some(id) => (id, false),
60                None => {
61                    let id = rt.create_session(&agent, created_by.as_str()).await?;
62                    if let Some(ref cwd) = cwd {
63                        rt.hook.session_cwds.lock().await.insert(id, cwd.clone());
64                    }
65                    (id, true)
66                }
67            };
68
69            yield StreamEvent { event: Some(stream_event::Event::Start(StreamStart { agent: agent.clone(), session: session_id })) };
70
71            let stream = rt.stream_to(session_id, &content, &sender);
72            pin_mut!(stream);
73            while let Some(event) = stream.next().await {
74                match event {
75                    AgentEvent::TextDelta(text) => {
76                        yield StreamEvent { event: Some(stream_event::Event::Chunk(StreamChunk { content: text })) };
77                    }
78                    AgentEvent::ThinkingDelta(text) => {
79                        yield StreamEvent { event: Some(stream_event::Event::Thinking(StreamThinking { content: text })) };
80                    }
81                    AgentEvent::ToolCallsStart(calls) => {
82                        // Extract structured questions from ask_user calls.
83                        let ask_questions: Vec<AskQuestion> = calls
84                            .iter()
85                            .filter(|c| c.function.name == "ask_user")
86                            .filter_map(|c| {
87                                serde_json::from_str::<crate::hook::system::ask_user::AskUser>(&c.function.arguments)
88                                    .ok()
89                            })
90                            .flat_map(|a| a.questions)
91                            .map(|q| AskQuestion {
92                                question: q.question,
93                                header: q.header,
94                                options: q.options.into_iter().map(|o| AskOption {
95                                    label: o.label,
96                                    description: o.description,
97                                }).collect(),
98                                multi_select: q.multi_select,
99                            })
100                            .collect();
101
102                        yield StreamEvent { event: Some(stream_event::Event::ToolStart(ToolStartEvent {
103                            calls: calls.into_iter().map(|c| ToolCallInfo {
104                                name: c.function.name.to_string(),
105                                arguments: c.function.arguments,
106                            }).collect(),
107                        })) };
108
109                        if !ask_questions.is_empty() {
110                            yield StreamEvent { event: Some(stream_event::Event::AskUser(AskUserEvent { questions: ask_questions })) };
111                        }
112                    }
113                    AgentEvent::ToolResult { call_id, output } => {
114                        yield StreamEvent { event: Some(stream_event::Event::ToolResult(ToolResultEvent { call_id: call_id.to_string(), output })) };
115                    }
116                    AgentEvent::ToolCallsComplete => {
117                        yield StreamEvent { event: Some(stream_event::Event::ToolsComplete(ToolsCompleteEvent {})) };
118                    }
119                    AgentEvent::Compact { .. } => {
120                        // Compact events are handled by on_event in the hook layer.
121                    }
122                    AgentEvent::Done(resp) => {
123                        if let wcore::AgentStopReason::Error(e) = &resp.stop_reason {
124                            if is_new {
125                                rt.hook.session_cwds.lock().await.remove(&session_id);
126                                rt.close_session(session_id).await;
127                            }
128                            Err(anyhow::anyhow!("{e}"))?;
129                        }
130                        break;
131                    }
132                }
133            }
134            yield StreamEvent { event: Some(stream_event::Event::End(StreamEnd { agent: agent.clone() })) };
135        }
136    }
137
138    async fn ping(&self) -> Result<()> {
139        Ok(())
140    }
141
142    async fn list_sessions(&self) -> Result<Vec<SessionInfo>> {
143        let rt = self.runtime.read().await.clone();
144        let sessions = rt.sessions().await;
145        let mut infos = Vec::with_capacity(sessions.len());
146        for s in sessions {
147            let s = s.lock().await;
148            let active = rt.is_active(s.id).await;
149            infos.push(SessionInfo {
150                id: s.id,
151                agent: s.agent.to_string(),
152                created_by: s.created_by.to_string(),
153                message_count: s.history.len() as u64,
154                alive_secs: s.created_at.elapsed().as_secs(),
155                active,
156            });
157        }
158        Ok(infos)
159    }
160
161    async fn kill_session(&self, session: u64) -> Result<bool> {
162        let rt = self.runtime.read().await.clone();
163        // Drop any pending ask_user oneshot so dispatch_ask_user unblocks immediately.
164        rt.hook.pending_asks.lock().await.remove(&session);
165        rt.hook.session_cwds.lock().await.remove(&session);
166        Ok(rt.close_session(session).await)
167    }
168
169    fn subscribe_events(&self) -> impl futures_core::Stream<Item = Result<AgentEventMsg>> + Send {
170        let runtime = self.runtime.clone();
171        async_stream::try_stream! {
172            let rt = runtime.read().await.clone();
173            let mut rx = rt.hook.subscribe_events();
174            loop {
175                match rx.recv().await {
176                    Ok(event) => yield event,
177                    Err(tokio::sync::broadcast::error::RecvError::Closed) => break,
178                    Err(tokio::sync::broadcast::error::RecvError::Lagged(_)) => continue,
179                }
180            }
181        }
182    }
183
184    async fn get_config(&self) -> Result<String> {
185        let config = self.load_config()?;
186        serde_json::to_string(&config).context("failed to serialize config")
187    }
188
189    async fn set_config(&self, config: String) -> Result<()> {
190        let parsed: crate::DaemonConfig =
191            serde_json::from_str(&config).context("invalid DaemonConfig JSON")?;
192        let toml_str =
193            toml::to_string_pretty(&parsed).context("failed to serialize config to TOML")?;
194        let config_path = self.config_dir.join(wcore::paths::CONFIG_FILE);
195        std::fs::write(&config_path, toml_str)
196            .with_context(|| format!("failed to write {}", config_path.display()))?;
197        self.reload().await
198    }
199
200    async fn reload(&self) -> Result<()> {
201        self.reload().await
202    }
203
204    async fn reply_to_ask(&self, session: u64, content: String) -> Result<()> {
205        let rt = self.runtime.read().await.clone();
206        // Try to find and deliver the reply. Retry once after a brief delay
207        // in case the ask_user dispatch hasn't inserted the oneshot yet.
208        if let Some(tx) = rt.hook.pending_asks.lock().await.remove(&session) {
209            let _ = tx.send(content);
210            return Ok(());
211        }
212        tokio::time::sleep(std::time::Duration::from_millis(100)).await;
213        if let Some(tx) = rt.hook.pending_asks.lock().await.remove(&session) {
214            let _ = tx.send(content);
215            return Ok(());
216        }
217        anyhow::bail!("no pending ask_user for session {session}")
218    }
219}
220
221impl Daemon {
222    /// Load the current `DaemonConfig` from disk.
223    fn load_config(&self) -> Result<crate::DaemonConfig> {
224        crate::DaemonConfig::load(&self.config_dir.join(wcore::paths::CONFIG_FILE))
225    }
226}