Skip to main content

oxios_cli/
channel.rs

1//! CLI channel implementation.
2//!
3//! Implements the [`Channel`] trait from `oxios-gateway` so the CLI
4//! can plug into the gateway like any other channel.
5//!
//! Message flow:
//! - **Incoming**: user-typed input → internal `mpsc` queue → Gateway → Kernel
//! - **Outgoing**: Kernel → Gateway → `Channel::send` → stdout (printed directly, no queue)
9
10use anyhow::Result;
11use async_trait::async_trait;
12use oxios_gateway::channel::Channel;
13use oxios_gateway::format::ChannelFormatter;
14use oxios_gateway::message::{IncomingMessage, OutgoingMessage};
15use oxios_gateway::GatewayInbox;
16use std::sync::atomic::{AtomicBool, Ordering};
17use std::sync::Arc;
18use tokio::sync::{mpsc, watch, Mutex};
19
20use crate::format::CliFormatter;
21use crate::session::Session;
22
23/// The CLI channel adapter.
24///
25/// Bridges the interactive readline loop with the gateway's channel
26/// interface using mpsc channels for message passing.
27pub struct CliChannel {
28    /// Receiver for incoming messages (user input from readline).
29    /// `Option` so `start()` can take ownership via `take()`.
30    incoming_rx: Mutex<Option<mpsc::Receiver<IncomingMessage>>>,
31    /// Sender for injecting incoming messages.
32    incoming_tx: mpsc::Sender<IncomingMessage>,
33    /// Current session metadata.
34    session: Arc<std::sync::Mutex<Session>>,
35    /// CLI response formatter.
36    formatter: CliFormatter,
37    /// Shared flag indicating whether a request is currently being processed.
38    /// Set to `true` by the interactive loop on send, `false` by `send()` on response.
39    processing: Arc<AtomicBool>,
40}
41
42impl CliChannel {
43    /// Creates a new CLI channel with the given buffer size.
44    pub fn new(buffer: usize) -> Self {
45        let (incoming_tx, incoming_rx) = mpsc::channel(buffer);
46        let session = Arc::new(std::sync::Mutex::new(Session::new(None)));
47        let processing = Arc::new(AtomicBool::new(false));
48
49        Self {
50            incoming_rx: Mutex::new(Some(incoming_rx)),
51            incoming_tx,
52            session,
53            formatter: CliFormatter,
54            processing,
55        }
56    }
57
58    /// Returns a sender that can be used to inject incoming messages.
59    pub fn sender(&self) -> mpsc::Sender<IncomingMessage> {
60        self.incoming_tx.clone()
61    }
62
63    /// Returns a handle for injecting messages from outside the channel.
64    pub fn handle(&self) -> CliChannelHandle {
65        CliChannelHandle {
66            incoming_tx: self.incoming_tx.clone(),
67            session: self.session.clone(),
68            processing: self.processing.clone(),
69        }
70    }
71
72    /// Returns a clone of the shared processing flag.
73    pub fn processing_flag(&self) -> Arc<AtomicBool> {
74        self.processing.clone()
75    }
76}
77
78#[async_trait]
79impl Channel for CliChannel {
80    fn name(&self) -> &str {
81        "cli"
82    }
83
84    async fn start(
85        &self,
86        tx: mpsc::Sender<GatewayInbox>,
87        mut shutdown: watch::Receiver<bool>,
88    ) -> Result<tokio::task::JoinHandle<()>> {
89        let internal_rx = self.incoming_rx.lock().await.take();
90        let Some(mut internal_rx) = internal_rx else {
91            anyhow::bail!("CLI channel already started (no receiver)");
92        };
93        let channel_name = self.name().to_owned();
94
95        let handle = tokio::spawn(async move {
96            loop {
97                tokio::select! {
98                    msg = internal_rx.recv() => {
99                        match msg {
100                            Some(msg) => {
101                                if tx.send((channel_name.clone(), msg)).await.is_err() {
102                                    break; // Gateway receiver closed
103                                }
104                            }
105                            None => break,
106                        }
107                    }
108                    _ = shutdown.changed() => break,
109                }
110            }
111            tracing::info!(channel = %channel_name, "CLI channel stopped");
112        });
113
114        Ok(handle)
115    }
116
117    async fn send(&self, msg: OutgoingMessage) -> Result<()> {
118        let output = match &msg.meta {
119            Some(meta) if meta.error.is_some() => self.formatter.format_error(&msg),
120            Some(_) => self.formatter.format_success(&msg),
121            None => msg.content.clone(),
122        };
123        println!("{output}");
124        self.processing.store(false, Ordering::Relaxed);
125        Ok(())
126    }
127}
128
129impl std::fmt::Debug for CliChannel {
130    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
131        f.debug_struct("CliChannel").finish()
132    }
133}
134
135/// Handle to the CLI channel, used to inject messages from the readline loop.
136#[derive(Clone)]
137pub struct CliChannelHandle {
138    /// Sender for injecting incoming messages into the gateway pipeline.
139    pub incoming_tx: mpsc::Sender<IncomingMessage>,
140    /// Shared session reference.
141    session: Arc<std::sync::Mutex<Session>>,
142    /// Shared processing flag (set `true` on send, `false` on response).
143    processing: Arc<AtomicBool>,
144}
145
146impl std::fmt::Debug for CliChannelHandle {
147    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
148        f.debug_struct("CliChannelHandle").finish()
149    }
150}
151
152impl CliChannelHandle {
153    /// Creates a handle from a CliChannel.
154    pub fn from_channel(channel: &CliChannel) -> Self {
155        channel.handle()
156    }
157
158    /// Send a user message into the gateway pipeline.
159    pub async fn send_user_message(&self, content: String) -> Result<()> {
160        let mut msg = IncomingMessage::new("cli", "cli-user", &content);
161        {
162            let session = self.session.lock().unwrap_or_else(|e| {
163                tracing::error!("Mutex poisoned: {e}");
164                e.into_inner()
165            });
166            msg.metadata
167                .insert("session_id".to_owned(), session.id.to_string());
168        }
169        self.incoming_tx
170            .send(msg)
171            .await
172            .map_err(|e| anyhow::anyhow!("{e}"))?;
173        Ok(())
174    }
175
176    /// Touch the session (update activity).
177    pub fn touch_session(&self) {
178        if let Ok(mut session) = self.session.lock() {
179            session.touch();
180        }
181    }
182
183    /// Reset the session (create a new one).
184    pub fn reset_session(&self) {
185        if let Ok(mut session) = self.session.lock() {
186            *session = Session::new(None);
187        }
188    }
189
190    /// Get the current session ID.
191    pub fn session_id(&self) -> uuid::Uuid {
192        self.session.lock().map(|s| s.id).unwrap_or_default()
193    }
194
195    /// Mark that a request is being processed.
196    pub fn set_processing(&self, value: bool) {
197        self.processing.store(value, Ordering::Relaxed);
198    }
199
200    /// Check whether a request is currently being processed.
201    pub fn is_processing(&self) -> bool {
202        self.processing.load(Ordering::Relaxed)
203    }
204
205    /// Send a switch_model action to the gateway.
206    ///
207    /// The gateway detects the `action` metadata and routes to `EngineApi::set_model()`
208    /// instead of the orchestrator.
209    pub async fn send_switch_model(&self, model_id: &str) -> Result<()> {
210        let mut msg = IncomingMessage::new("cli", "cli-user", format!("switch_model: {model_id}"));
211        msg.metadata
212            .insert("action".to_owned(), "switch_model".to_owned());
213        msg.metadata
214            .insert("model_id".to_owned(), model_id.to_owned());
215        {
216            let session = self.session.lock().unwrap_or_else(|e| {
217                tracing::error!("Mutex poisoned: {e}");
218                e.into_inner()
219            });
220            msg.metadata
221                .insert("session_id".to_owned(), session.id.to_string());
222        }
223        self.incoming_tx
224            .send(msg)
225            .await
226            .map_err(|e| anyhow::anyhow!("{e}"))?;
227        Ok(())
228    }
229
230    /// Send a switch_persona action to the gateway.
231    ///
232    /// The gateway detects the `action` metadata and routes to `PersonaApi::set_active()`
233    /// instead of the orchestrator.
234    pub async fn send_switch_persona(&self, persona_id: &str) -> Result<()> {
235        let mut msg =
236            IncomingMessage::new("cli", "cli-user", format!("switch_persona: {persona_id}"));
237        msg.metadata
238            .insert("action".to_owned(), "switch_persona".to_owned());
239        msg.metadata
240            .insert("persona_id".to_owned(), persona_id.to_owned());
241        {
242            let session = self.session.lock().unwrap_or_else(|e| {
243                tracing::error!("Mutex poisoned: {e}");
244                e.into_inner()
245            });
246            msg.metadata
247                .insert("session_id".to_owned(), session.id.to_string());
248        }
249        self.incoming_tx
250            .send(msg)
251            .await
252            .map_err(|e| anyhow::anyhow!("{e}"))?;
253        Ok(())
254    }
255}