1use std::sync::Arc;
9
10use deck_core::{LlmBackend, Message, Role, SessionId, Store, ToolCall, ToolResult};
11use futures::StreamExt;
12use tokio::sync::{broadcast, mpsc};
13use tokio::task::JoinSet;
14use tracing::{error, warn};
15
16#[derive(Debug, Clone)]
17pub enum Command {
18 UserMessage { session: SessionId, content: String },
20 ApproveTool { call_id: String },
22 DenyTool { call_id: String },
24 Shutdown,
26}
27
28#[derive(Debug, Clone)]
29pub enum Event {
30 AssistantDelta {
31 session: SessionId,
32 text: String,
33 },
34 AssistantTurn {
35 session: SessionId,
36 message: Message,
37 },
38 ToolCallProposed {
39 call: ToolCall,
40 },
41 ToolCallResult {
42 result: ToolResult,
43 },
44 Error {
45 message: String,
46 },
47}
48
49#[derive(Clone)]
52pub struct Handle {
53 commands_tx: mpsc::Sender<Command>,
54 events_tx: broadcast::Sender<Event>,
55}
56
57impl std::fmt::Debug for Handle {
58 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
59 f.debug_struct("Handle").finish_non_exhaustive()
60 }
61}
62
63impl Handle {
64 pub async fn submit(&self, cmd: Command) -> deck_core::Result<()> {
68 self.commands_tx
69 .send(cmd)
70 .await
71 .map_err(|_| deck_core::DeckError::Orchestrator("runtime stopped".into()))
72 }
73
74 #[must_use]
75 pub fn subscribe(&self) -> broadcast::Receiver<Event> {
76 self.events_tx.subscribe()
77 }
78}
79
80#[derive(Debug)]
82pub struct Runtime {
83 pub handle: Handle,
84 join: tokio::task::JoinHandle<()>,
85}
86
87impl Runtime {
88 pub fn spawn(llm: Arc<dyn LlmBackend>, store: Arc<dyn Store>, model: String) -> Self {
89 let (commands_tx, commands_rx) = mpsc::channel::<Command>(64);
90 let (events_tx, _) = broadcast::channel::<Event>(256);
91 let handle = Handle {
92 commands_tx,
93 events_tx: events_tx.clone(),
94 };
95 let join = tokio::spawn(run_loop(commands_rx, events_tx, llm, store, model));
96 Self { handle, join }
97 }
98
99 pub async fn shutdown(self) {
100 let _ = self.handle.commands_tx.send(Command::Shutdown).await;
101 let _ = self.join.await;
102 }
103}
104
105async fn run_loop(
106 mut commands_rx: mpsc::Receiver<Command>,
107 events_tx: broadcast::Sender<Event>,
108 llm: Arc<dyn LlmBackend>,
109 store: Arc<dyn Store>,
110 model: String,
111) {
112 let mut in_flight: JoinSet<()> = JoinSet::new();
115 loop {
116 tokio::select! {
117 cmd = commands_rx.recv() => {
118 match cmd {
119 None | Some(Command::Shutdown) => break,
120 Some(Command::UserMessage { session, content }) => {
121 let events_tx = events_tx.clone();
126 let llm = llm.clone();
127 let store = store.clone();
128 let model = model.clone();
129 in_flight.spawn(async move {
130 if let Err(e) = handle_user_message(
131 &events_tx,
132 llm.as_ref(),
133 store.as_ref(),
134 &model,
135 session,
136 content,
137 )
138 .await
139 {
140 let _ = events_tx.send(Event::Error {
141 message: e.to_string(),
142 });
143 }
144 });
145 }
146 Some(Command::ApproveTool { call_id } | Command::DenyTool { call_id }) => {
147 warn!(call_id, "tool approval not wired in 0.1");
148 let _ = events_tx.send(Event::Error {
149 message: format!(
150 "tool approval is not yet implemented in 0.1 (call_id={call_id})"
151 ),
152 });
153 }
154 }
155 }
156 Some(_done) = in_flight.join_next(), if !in_flight.is_empty() => {}
160 }
161 }
162 while in_flight.join_next().await.is_some() {}
165}
166
167async fn handle_user_message(
168 events_tx: &broadcast::Sender<Event>,
169 llm: &dyn LlmBackend,
170 store: &dyn Store,
171 model: &str,
172 session: SessionId,
173 content: String,
174) -> deck_core::Result<()> {
175 let user_msg = Message {
176 role: Role::User,
177 content,
178 tool_calls: vec![],
179 };
180 store.append(session, &user_msg).await?;
181 let history = store.load(session).await?;
182 let mut stream = llm.stream(model, &history).await?;
183 let mut accumulated = String::new();
184 while let Some(item) = stream.next().await {
185 match item {
186 Ok(delta) => {
187 if !delta.content.is_empty() {
188 accumulated.push_str(&delta.content);
189 let _ = events_tx.send(Event::AssistantDelta {
190 session,
191 text: delta.content,
192 });
193 }
194 }
195 Err(e) => {
196 error!(error = %e, "llm stream error");
197 let _ = events_tx.send(Event::Error {
198 message: e.to_string(),
199 });
200 return Err(e);
201 }
202 }
203 }
204 let asst_msg = Message {
205 role: Role::Assistant,
206 content: accumulated,
207 tool_calls: vec![],
208 };
209 store.append(session, &asst_msg).await?;
210 let _ = events_tx.send(Event::AssistantTurn {
211 session,
212 message: asst_msg,
213 });
214 Ok(())
215}
216
#[cfg(test)]
mod tests {
    use super::*;
    use async_trait::async_trait;
    use deck_core::{DeckError, Message, Role};
    use futures::stream::{self, BoxStream};
    use std::collections::HashMap;
    use std::time::Duration;
    use tokio::sync::Mutex as AsyncMutex;

    /// Text the echo backend answers with: `echo:` followed by the last message.
    fn echo_text(messages: &[Message]) -> deck_core::Result<String> {
        match messages.last() {
            Some(last) => Ok(format!("echo:{}", last.content)),
            None => Err(DeckError::Llm("empty".into())),
        }
    }

    /// Builds an assistant message without tool calls.
    fn assistant(content: String) -> Message {
        Message {
            role: Role::Assistant,
            content,
            tool_calls: vec![],
        }
    }

    /// Backend that echoes the last message, streaming it one character per chunk.
    struct EchoLlm;

    #[async_trait]
    impl LlmBackend for EchoLlm {
        fn id(&self) -> String {
            "echo".into()
        }

        async fn complete(&self, _model: &str, messages: &[Message]) -> deck_core::Result<Message> {
            let reply = echo_text(messages)?;
            Ok(assistant(reply))
        }

        async fn stream(
            &self,
            _model: &str,
            messages: &[Message],
        ) -> deck_core::Result<BoxStream<'static, deck_core::Result<Message>>> {
            let reply = echo_text(messages)?;
            let chunks: Vec<deck_core::Result<Message>> = reply
                .chars()
                .map(|ch| Ok(assistant(ch.to_string())))
                .collect();
            Ok(stream::iter(chunks).boxed())
        }
    }

    /// Per-session message lists kept in memory.
    type Sessions = HashMap<SessionId, Vec<Message>>;

    /// In-memory `Store` used by the tests.
    #[derive(Default, Clone)]
    struct MemStore {
        inner: Arc<AsyncMutex<Sessions>>,
    }

    #[async_trait]
    impl Store for MemStore {
        async fn append(&self, session: SessionId, message: &Message) -> deck_core::Result<()> {
            let mut sessions = self.inner.lock().await;
            sessions.entry(session).or_default().push(message.clone());
            Ok(())
        }

        async fn load(&self, session: SessionId) -> deck_core::Result<Vec<Message>> {
            let sessions = self.inner.lock().await;
            Ok(sessions.get(&session).cloned().unwrap_or_default())
        }

        async fn list(&self) -> deck_core::Result<Vec<SessionId>> {
            let sessions = self.inner.lock().await;
            Ok(sessions.keys().copied().collect())
        }
    }

    /// A user message yields an `AssistantTurn` and leaves two messages in the store.
    #[tokio::test]
    async fn user_message_produces_assistant_turn() {
        let llm: Arc<dyn LlmBackend> = Arc::new(EchoLlm);
        let store: Arc<dyn Store> = Arc::new(MemStore::default());
        let rt = Runtime::spawn(llm, Arc::clone(&store), "test-model".into());

        // Subscribe before submitting so no event can be missed.
        let mut rx = rt.handle.subscribe();
        let session = SessionId::new();
        let command = Command::UserMessage {
            session,
            content: "hi".into(),
        };
        rt.handle.submit(command).await.unwrap();

        let per_event_timeout = Duration::from_millis(500);
        let mut got_turn = false;
        for _ in 0..100 {
            // A timeout or a receive error ends the wait.
            let event = match tokio::time::timeout(per_event_timeout, rx.recv()).await {
                Ok(Ok(event)) => event,
                _ => break,
            };
            if let Event::AssistantTurn { message, .. } = event {
                assert!(message.content.contains("echo:hi"));
                got_turn = true;
                break;
            }
        }
        assert!(got_turn);

        rt.shutdown().await;
        let history = store.load(session).await.unwrap();
        assert_eq!(history.len(), 2);
    }
}