Skip to main content

oxios_cli/
channel.rs

1//! CLI channel implementation.
2//!
3//! Implements the [`Channel`] trait from `oxios-gateway` so the CLI
4//! can plug into the gateway like any other channel.
5//!
6//! Uses `mpsc` channels to bridge:
7//! - **Incoming**: User typed input → mpsc → Gateway → Kernel
8//! - **Outgoing**: Kernel → Gateway → mpsc → stdout
9
10use anyhow::Result;
11use async_trait::async_trait;
12use oxios_gateway::channel::Channel;
13use oxios_gateway::message::{IncomingMessage, OutgoingMessage};
14use std::sync::Arc;
15use tokio::sync::mpsc;
16use tokio::sync::Mutex;
17
18use crate::session::Session;
19
/// The CLI channel adapter.
///
/// Bridges the interactive readline loop with the gateway's channel
/// interface using mpsc channels for message passing.
///
/// The channel keeps both ends of the incoming queue: the receiver is
/// drained by `Channel::receive`, while clones of the sender are handed out
/// through `sender()` and `handle()` so other code can enqueue user input.
pub struct CliChannel {
    /// Receiver for incoming messages (user input from readline).
    ///
    /// Wrapped in an async mutex because `receive` only has `&self` but
    /// needs mutable access to the receiver while it awaits a message.
    incoming_rx: Mutex<mpsc::Receiver<IncomingMessage>>,
    /// Sender for injecting incoming messages.
    ///
    /// Retained here so that `sender()` and `handle()` can clone it on demand.
    incoming_tx: mpsc::Sender<IncomingMessage>,
    /// Current session metadata.
    ///
    /// Shared (via `Arc`) with every `CliChannelHandle` created by `handle()`;
    /// guarded by a synchronous `std::sync::Mutex`.
    session: Arc<std::sync::Mutex<Session>>,
}
32
33impl CliChannel {
34    /// Creates a new CLI channel with the given buffer size.
35    pub fn new(buffer: usize) -> Self {
36        let (incoming_tx, incoming_rx) = mpsc::channel(buffer);
37        let session = Arc::new(std::sync::Mutex::new(Session::new(None)));
38
39        Self {
40            incoming_rx: Mutex::new(incoming_rx),
41            incoming_tx,
42            session,
43        }
44    }
45
46    /// Returns a sender that can be used to inject incoming messages.
47    pub fn sender(&self) -> mpsc::Sender<IncomingMessage> {
48        self.incoming_tx.clone()
49    }
50
51    /// Returns a handle for injecting messages from outside the channel.
52    pub fn handle(&self) -> CliChannelHandle {
53        CliChannelHandle {
54            incoming_tx: self.incoming_tx.clone(),
55            session: self.session.clone(),
56        }
57    }
58}
59
60#[async_trait]
61impl Channel for CliChannel {
62    fn name(&self) -> &str {
63        "cli"
64    }
65
66    async fn receive(&self) -> Result<Option<IncomingMessage>> {
67        let mut rx = self.incoming_rx.lock().await;
68        Ok(rx.recv().await)
69    }
70
71    async fn send(&self, msg: OutgoingMessage) -> Result<()> {
72        // Print the response to stdout.
73        println!("{}", msg.content);
74        Ok(())
75    }
76}
77
78impl std::fmt::Debug for CliChannel {
79    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
80        f.debug_struct("CliChannel").finish()
81    }
82}
83
/// Handle to the CLI channel, used to inject messages from the readline loop.
///
/// Cloning a handle is cheap: the clone gets another sender for the same
/// queue and another reference to the same session.
#[derive(Debug, Clone)]
pub struct CliChannelHandle {
    /// Sender for injecting incoming messages into the gateway pipeline.
    ///
    /// Public, so callers may also send pre-built `IncomingMessage`s
    /// directly instead of going through `send_user_message`.
    pub incoming_tx: mpsc::Sender<IncomingMessage>,
    /// Shared session reference.
    ///
    /// Points at the same `Session` as the `CliChannel` this handle was
    /// created from, behind a synchronous `std::sync::Mutex`.
    session: Arc<std::sync::Mutex<Session>>,
}
92
93impl CliChannelHandle {
94    /// Creates a handle from a CliChannel.
95    pub fn from_channel(channel: &CliChannel) -> Self {
96        channel.handle()
97    }
98
99    /// Send a user message into the gateway pipeline.
100    pub async fn send_user_message(&self, content: String) -> Result<()> {
101        let mut msg = IncomingMessage::new("cli", "cli-user", &content);
102        {
103            let session = self.session.lock().unwrap();
104            msg.metadata
105                .insert("session_id".to_owned(), session.id.to_string());
106        }
107        self.incoming_tx
108            .send(msg)
109            .await
110            .map_err(|e| anyhow::anyhow!("{e}"))?;
111        Ok(())
112    }
113
114    /// Touch the session (update activity).
115    pub fn touch_session(&self) {
116        if let Ok(mut session) = self.session.lock() {
117            session.touch();
118        }
119    }
120
121    /// Reset the session (create a new one).
122    pub fn reset_session(&self) {
123        if let Ok(mut session) = self.session.lock() {
124            *session = Session::new(None);
125        }
126    }
127
128    /// Get the current session ID.
129    pub fn session_id(&self) -> uuid::Uuid {
130        self.session.lock().map(|s| s.id).unwrap_or_default()
131    }
132}