Skip to main content

deck_orchestrator/
lib.rs

1//! deck-orchestrator — the mpsc hub.
2//!
3//! Owns the `tokio::sync::mpsc` channels that connect the TUI to the LLM,
4//! MCP, store and sandbox tasks. The TUI sends [`Command`]s in; each task
5//! emits [`Event`]s out; a broadcast channel fans events to anyone who
6//! subscribes (TUI, log file, future remote dashboard).
7
8use std::sync::Arc;
9
10use deck_core::{LlmBackend, Message, Role, SessionId, Store, ToolCall, ToolResult};
11use futures::StreamExt;
12use tokio::sync::{broadcast, mpsc};
13use tokio::task::JoinSet;
14use tracing::{error, warn};
15
16#[derive(Debug, Clone)]
17pub enum Command {
18    /// Append a user turn and ask the LLM to continue.
19    UserMessage { session: SessionId, content: String },
20    /// Approve a pending tool call.
21    ApproveTool { call_id: String },
22    /// Deny a pending tool call.
23    DenyTool { call_id: String },
24    /// Graceful shutdown.
25    Shutdown,
26}
27
28#[derive(Debug, Clone)]
29pub enum Event {
30    AssistantDelta {
31        session: SessionId,
32        text: String,
33    },
34    AssistantTurn {
35        session: SessionId,
36        message: Message,
37    },
38    ToolCallProposed {
39        call: ToolCall,
40    },
41    ToolCallResult {
42        result: ToolResult,
43    },
44    Error {
45        message: String,
46    },
47}
48
49/// Lightweight handle the TUI (and any other client) keeps. Cloneable;
50/// internally just channel handles + a broadcast subscription factory.
51#[derive(Clone)]
52pub struct Handle {
53    commands_tx: mpsc::Sender<Command>,
54    events_tx: broadcast::Sender<Event>,
55}
56
57impl std::fmt::Debug for Handle {
58    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
59        f.debug_struct("Handle").finish_non_exhaustive()
60    }
61}
62
63impl Handle {
64    /// Submit a command to the orchestrator. Returns an error if the
65    /// orchestrator task has shut down. Wraps the channel-specific error
66    /// type so the channel implementation stays an internal detail.
67    pub async fn submit(&self, cmd: Command) -> deck_core::Result<()> {
68        self.commands_tx
69            .send(cmd)
70            .await
71            .map_err(|_| deck_core::DeckError::Orchestrator("runtime stopped".into()))
72    }
73
74    #[must_use]
75    pub fn subscribe(&self) -> broadcast::Receiver<Event> {
76        self.events_tx.subscribe()
77    }
78}
79
80/// The owned runtime piece: holds the receiver and spawned task.
81#[derive(Debug)]
82pub struct Runtime {
83    pub handle: Handle,
84    join: tokio::task::JoinHandle<()>,
85}
86
87impl Runtime {
88    pub fn spawn(llm: Arc<dyn LlmBackend>, store: Arc<dyn Store>, model: String) -> Self {
89        let (commands_tx, commands_rx) = mpsc::channel::<Command>(64);
90        let (events_tx, _) = broadcast::channel::<Event>(256);
91        let handle = Handle {
92            commands_tx,
93            events_tx: events_tx.clone(),
94        };
95        let join = tokio::spawn(run_loop(commands_rx, events_tx, llm, store, model));
96        Self { handle, join }
97    }
98
99    pub async fn shutdown(self) {
100        let _ = self.handle.commands_tx.send(Command::Shutdown).await;
101        let _ = self.join.await;
102    }
103}
104
105async fn run_loop(
106    mut commands_rx: mpsc::Receiver<Command>,
107    events_tx: broadcast::Sender<Event>,
108    llm: Arc<dyn LlmBackend>,
109    store: Arc<dyn Store>,
110    model: String,
111) {
112    // In-flight per-session tasks. JoinSet so `Shutdown` can drain them
113    // instead of leaving writes to `store` racing against process exit.
114    let mut in_flight: JoinSet<()> = JoinSet::new();
115    loop {
116        tokio::select! {
117            cmd = commands_rx.recv() => {
118                match cmd {
119                    None | Some(Command::Shutdown) => break,
120                    Some(Command::UserMessage { session, content }) => {
121                        // Per-session spawn so a slow LLM stream on one
122                        // session does not head-of-line block commands on
123                        // other sessions (and so we do not hold the
124                        // dispatcher across the entire `await` chain).
125                        let events_tx = events_tx.clone();
126                        let llm = llm.clone();
127                        let store = store.clone();
128                        let model = model.clone();
129                        in_flight.spawn(async move {
130                            if let Err(e) = handle_user_message(
131                                &events_tx,
132                                llm.as_ref(),
133                                store.as_ref(),
134                                &model,
135                                session,
136                                content,
137                            )
138                            .await
139                            {
140                                let _ = events_tx.send(Event::Error {
141                                    message: e.to_string(),
142                                });
143                            }
144                        });
145                    }
146                    Some(Command::ApproveTool { call_id } | Command::DenyTool { call_id }) => {
147                        warn!(call_id, "tool approval not wired in 0.1");
148                        let _ = events_tx.send(Event::Error {
149                            message: format!(
150                                "tool approval is not yet implemented in 0.1 (call_id={call_id})"
151                            ),
152                        });
153                    }
154                }
155            }
156            // Reap completed in-flight tasks so the JoinSet does not grow
157            // unboundedly during a long session. We don't care about
158            // individual task results here — errors already emit Event::Error.
159            Some(_done) = in_flight.join_next(), if !in_flight.is_empty() => {}
160        }
161    }
162    // Drain any remaining in-flight per-session tasks before returning,
163    // so `Runtime::shutdown` is genuinely graceful.
164    while in_flight.join_next().await.is_some() {}
165}
166
167async fn handle_user_message(
168    events_tx: &broadcast::Sender<Event>,
169    llm: &dyn LlmBackend,
170    store: &dyn Store,
171    model: &str,
172    session: SessionId,
173    content: String,
174) -> deck_core::Result<()> {
175    let user_msg = Message {
176        role: Role::User,
177        content,
178        tool_calls: vec![],
179    };
180    store.append(session, &user_msg).await?;
181    let history = store.load(session).await?;
182    let mut stream = llm.stream(model, &history).await?;
183    let mut accumulated = String::new();
184    while let Some(item) = stream.next().await {
185        match item {
186            Ok(delta) => {
187                if !delta.content.is_empty() {
188                    accumulated.push_str(&delta.content);
189                    let _ = events_tx.send(Event::AssistantDelta {
190                        session,
191                        text: delta.content,
192                    });
193                }
194            }
195            Err(e) => {
196                error!(error = %e, "llm stream error");
197                let _ = events_tx.send(Event::Error {
198                    message: e.to_string(),
199                });
200                return Err(e);
201            }
202        }
203    }
204    let asst_msg = Message {
205        role: Role::Assistant,
206        content: accumulated,
207        tool_calls: vec![],
208    };
209    store.append(session, &asst_msg).await?;
210    let _ = events_tx.send(Event::AssistantTurn {
211        session,
212        message: asst_msg,
213    });
214    Ok(())
215}
216
217#[cfg(test)]
218mod tests {
219    use super::*;
220    use async_trait::async_trait;
221    use deck_core::{DeckError, Message, Role};
222    use futures::stream::{self, BoxStream};
223    use tokio::sync::Mutex as AsyncMutex;
224
225    struct EchoLlm;
226    #[async_trait]
227    impl LlmBackend for EchoLlm {
228        fn id(&self) -> String {
229            "echo".into()
230        }
231        async fn complete(&self, _model: &str, messages: &[Message]) -> deck_core::Result<Message> {
232            let last = messages
233                .last()
234                .cloned()
235                .ok_or_else(|| DeckError::Llm("empty".into()))?;
236            Ok(Message {
237                role: Role::Assistant,
238                content: format!("echo:{}", last.content),
239                tool_calls: vec![],
240            })
241        }
242        async fn stream(
243            &self,
244            _model: &str,
245            messages: &[Message],
246        ) -> deck_core::Result<BoxStream<'static, deck_core::Result<Message>>> {
247            let last = messages
248                .last()
249                .cloned()
250                .ok_or_else(|| DeckError::Llm("empty".into()))?;
251            let chunks: Vec<deck_core::Result<Message>> = format!("echo:{}", last.content)
252                .chars()
253                .map(|c| {
254                    Ok(Message {
255                        role: Role::Assistant,
256                        content: c.to_string(),
257                        tool_calls: vec![],
258                    })
259                })
260                .collect();
261            Ok(stream::iter(chunks).boxed())
262        }
263    }
264
265    #[derive(Default, Clone)]
266    struct MemStore {
267        inner: Arc<AsyncMutex<std::collections::HashMap<SessionId, Vec<Message>>>>,
268    }
269    #[async_trait]
270    impl Store for MemStore {
271        async fn append(&self, s: SessionId, m: &Message) -> deck_core::Result<()> {
272            self.inner
273                .lock()
274                .await
275                .entry(s)
276                .or_default()
277                .push(m.clone());
278            Ok(())
279        }
280        async fn load(&self, s: SessionId) -> deck_core::Result<Vec<Message>> {
281            Ok(self.inner.lock().await.get(&s).cloned().unwrap_or_default())
282        }
283        async fn list(&self) -> deck_core::Result<Vec<SessionId>> {
284            Ok(self.inner.lock().await.keys().copied().collect())
285        }
286    }
287
288    #[tokio::test]
289    async fn user_message_produces_assistant_turn() {
290        let llm: Arc<dyn LlmBackend> = Arc::new(EchoLlm);
291        let store: Arc<dyn Store> = Arc::new(MemStore::default());
292        let rt = Runtime::spawn(llm, store.clone(), "test-model".into());
293        let mut rx = rt.handle.subscribe();
294        let session = SessionId::new();
295        rt.handle
296            .submit(Command::UserMessage {
297                session,
298                content: "hi".into(),
299            })
300            .await
301            .unwrap();
302        let mut got_turn = false;
303        for _ in 0..100 {
304            match tokio::time::timeout(std::time::Duration::from_millis(500), rx.recv()).await {
305                Ok(Ok(Event::AssistantTurn { message, .. })) => {
306                    assert!(message.content.contains("echo:hi"));
307                    got_turn = true;
308                    break;
309                }
310                Ok(Ok(_)) => continue,
311                _ => break,
312            }
313        }
314        assert!(got_turn);
315        rt.shutdown().await;
316        let history = store.load(session).await.unwrap();
317        assert_eq!(history.len(), 2);
318    }
319}