deck-orchestrator 0.1.1

Central mpsc hub coordinating LLM/MCP/store/sandbox tasks (ono-sendai)
Documentation
//! deck-orchestrator — the mpsc hub.
//!
//! Owns the `tokio::sync::mpsc` channels that connect the TUI to the LLM,
//! MCP, store and sandbox tasks. The TUI sends [`Command`]s in; each task
//! emits [`Event`]s out; a broadcast channel fans events to anyone who
//! subscribes (TUI, log file, future remote dashboard).

use std::sync::Arc;

use deck_core::{LlmBackend, Message, Role, SessionId, Store, ToolCall, ToolResult};
use futures::StreamExt;
use tokio::sync::{broadcast, mpsc};
use tokio::task::JoinSet;
use tracing::{error, warn};

#[derive(Debug, Clone)]
pub enum Command {
    /// Append a user turn and ask the LLM to continue.
    UserMessage { session: SessionId, content: String },
    /// Approve a pending tool call.
    ApproveTool { call_id: String },
    /// Deny a pending tool call.
    DenyTool { call_id: String },
    /// Graceful shutdown.
    Shutdown,
}

#[derive(Debug, Clone)]
pub enum Event {
    AssistantDelta {
        session: SessionId,
        text: String,
    },
    AssistantTurn {
        session: SessionId,
        message: Message,
    },
    ToolCallProposed {
        call: ToolCall,
    },
    ToolCallResult {
        result: ToolResult,
    },
    Error {
        message: String,
    },
}

/// Lightweight handle the TUI (and any other client) keeps. Cloneable;
/// internally just channel handles + a broadcast subscription factory.
#[derive(Clone)]
pub struct Handle {
    commands_tx: mpsc::Sender<Command>,
    events_tx: broadcast::Sender<Event>,
}

impl std::fmt::Debug for Handle {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("Handle").finish_non_exhaustive()
    }
}

impl Handle {
    /// Submit a command to the orchestrator. Returns an error if the
    /// orchestrator task has shut down. Wraps the channel-specific error
    /// type so the channel implementation stays an internal detail.
    pub async fn submit(&self, cmd: Command) -> deck_core::Result<()> {
        self.commands_tx
            .send(cmd)
            .await
            .map_err(|_| deck_core::DeckError::Orchestrator("runtime stopped".into()))
    }

    #[must_use]
    pub fn subscribe(&self) -> broadcast::Receiver<Event> {
        self.events_tx.subscribe()
    }
}

/// The owned runtime piece: holds the receiver and spawned task.
#[derive(Debug)]
pub struct Runtime {
    pub handle: Handle,
    join: tokio::task::JoinHandle<()>,
}

impl Runtime {
    pub fn spawn(llm: Arc<dyn LlmBackend>, store: Arc<dyn Store>, model: String) -> Self {
        let (commands_tx, commands_rx) = mpsc::channel::<Command>(64);
        let (events_tx, _) = broadcast::channel::<Event>(256);
        let handle = Handle {
            commands_tx,
            events_tx: events_tx.clone(),
        };
        let join = tokio::spawn(run_loop(commands_rx, events_tx, llm, store, model));
        Self { handle, join }
    }

    pub async fn shutdown(self) {
        let _ = self.handle.commands_tx.send(Command::Shutdown).await;
        let _ = self.join.await;
    }
}

async fn run_loop(
    mut commands_rx: mpsc::Receiver<Command>,
    events_tx: broadcast::Sender<Event>,
    llm: Arc<dyn LlmBackend>,
    store: Arc<dyn Store>,
    model: String,
) {
    // In-flight per-session tasks. JoinSet so `Shutdown` can drain them
    // instead of leaving writes to `store` racing against process exit.
    let mut in_flight: JoinSet<()> = JoinSet::new();
    loop {
        tokio::select! {
            cmd = commands_rx.recv() => {
                match cmd {
                    None | Some(Command::Shutdown) => break,
                    Some(Command::UserMessage { session, content }) => {
                        // Per-session spawn so a slow LLM stream on one
                        // session does not head-of-line block commands on
                        // other sessions (and so we do not hold the
                        // dispatcher across the entire `await` chain).
                        let events_tx = events_tx.clone();
                        let llm = llm.clone();
                        let store = store.clone();
                        let model = model.clone();
                        in_flight.spawn(async move {
                            if let Err(e) = handle_user_message(
                                &events_tx,
                                llm.as_ref(),
                                store.as_ref(),
                                &model,
                                session,
                                content,
                            )
                            .await
                            {
                                let _ = events_tx.send(Event::Error {
                                    message: e.to_string(),
                                });
                            }
                        });
                    }
                    Some(Command::ApproveTool { call_id } | Command::DenyTool { call_id }) => {
                        warn!(call_id, "tool approval not wired in 0.1");
                        let _ = events_tx.send(Event::Error {
                            message: format!(
                                "tool approval is not yet implemented in 0.1 (call_id={call_id})"
                            ),
                        });
                    }
                }
            }
            // Reap completed in-flight tasks so the JoinSet does not grow
            // unboundedly during a long session. We don't care about
            // individual task results here — errors already emit Event::Error.
            Some(_done) = in_flight.join_next(), if !in_flight.is_empty() => {}
        }
    }
    // Drain any remaining in-flight per-session tasks before returning,
    // so `Runtime::shutdown` is genuinely graceful.
    while in_flight.join_next().await.is_some() {}
}

async fn handle_user_message(
    events_tx: &broadcast::Sender<Event>,
    llm: &dyn LlmBackend,
    store: &dyn Store,
    model: &str,
    session: SessionId,
    content: String,
) -> deck_core::Result<()> {
    let user_msg = Message {
        role: Role::User,
        content,
        tool_calls: vec![],
    };
    store.append(session, &user_msg).await?;
    let history = store.load(session).await?;
    let mut stream = llm.stream(model, &history).await?;
    let mut accumulated = String::new();
    while let Some(item) = stream.next().await {
        match item {
            Ok(delta) => {
                if !delta.content.is_empty() {
                    accumulated.push_str(&delta.content);
                    let _ = events_tx.send(Event::AssistantDelta {
                        session,
                        text: delta.content,
                    });
                }
            }
            Err(e) => {
                error!(error = %e, "llm stream error");
                let _ = events_tx.send(Event::Error {
                    message: e.to_string(),
                });
                return Err(e);
            }
        }
    }
    let asst_msg = Message {
        role: Role::Assistant,
        content: accumulated,
        tool_calls: vec![],
    };
    store.append(session, &asst_msg).await?;
    let _ = events_tx.send(Event::AssistantTurn {
        session,
        message: asst_msg,
    });
    Ok(())
}

#[cfg(test)]
mod tests {
    use super::*;
    use async_trait::async_trait;
    use deck_core::{DeckError, Message, Role};
    use futures::stream::{self, BoxStream};
    use tokio::sync::Mutex as AsyncMutex;

    struct EchoLlm;
    #[async_trait]
    impl LlmBackend for EchoLlm {
        fn id(&self) -> String {
            "echo".into()
        }
        async fn complete(&self, _model: &str, messages: &[Message]) -> deck_core::Result<Message> {
            let last = messages
                .last()
                .cloned()
                .ok_or_else(|| DeckError::Llm("empty".into()))?;
            Ok(Message {
                role: Role::Assistant,
                content: format!("echo:{}", last.content),
                tool_calls: vec![],
            })
        }
        async fn stream(
            &self,
            _model: &str,
            messages: &[Message],
        ) -> deck_core::Result<BoxStream<'static, deck_core::Result<Message>>> {
            let last = messages
                .last()
                .cloned()
                .ok_or_else(|| DeckError::Llm("empty".into()))?;
            let chunks: Vec<deck_core::Result<Message>> = format!("echo:{}", last.content)
                .chars()
                .map(|c| {
                    Ok(Message {
                        role: Role::Assistant,
                        content: c.to_string(),
                        tool_calls: vec![],
                    })
                })
                .collect();
            Ok(stream::iter(chunks).boxed())
        }
    }

    #[derive(Default, Clone)]
    struct MemStore {
        inner: Arc<AsyncMutex<std::collections::HashMap<SessionId, Vec<Message>>>>,
    }
    #[async_trait]
    impl Store for MemStore {
        async fn append(&self, s: SessionId, m: &Message) -> deck_core::Result<()> {
            self.inner
                .lock()
                .await
                .entry(s)
                .or_default()
                .push(m.clone());
            Ok(())
        }
        async fn load(&self, s: SessionId) -> deck_core::Result<Vec<Message>> {
            Ok(self.inner.lock().await.get(&s).cloned().unwrap_or_default())
        }
        async fn list(&self) -> deck_core::Result<Vec<SessionId>> {
            Ok(self.inner.lock().await.keys().copied().collect())
        }
    }

    #[tokio::test]
    async fn user_message_produces_assistant_turn() {
        let llm: Arc<dyn LlmBackend> = Arc::new(EchoLlm);
        let store: Arc<dyn Store> = Arc::new(MemStore::default());
        let rt = Runtime::spawn(llm, store.clone(), "test-model".into());
        let mut rx = rt.handle.subscribe();
        let session = SessionId::new();
        rt.handle
            .submit(Command::UserMessage {
                session,
                content: "hi".into(),
            })
            .await
            .unwrap();
        let mut got_turn = false;
        for _ in 0..100 {
            match tokio::time::timeout(std::time::Duration::from_millis(500), rx.recv()).await {
                Ok(Ok(Event::AssistantTurn { message, .. })) => {
                    assert!(message.content.contains("echo:hi"));
                    got_turn = true;
                    break;
                }
                Ok(Ok(_)) => continue,
                _ => break,
            }
        }
        assert!(got_turn);
        rt.shutdown().await;
        let history = store.load(session).await.unwrap();
        assert_eq!(history.len(), 2);
    }
}