1use anyhow::Result;
11use async_trait::async_trait;
12use oxios_gateway::channel::Channel;
13use oxios_gateway::format::ChannelFormatter;
14use oxios_gateway::message::{IncomingMessage, OutgoingMessage};
15use oxios_gateway::GatewayInbox;
16use std::sync::atomic::{AtomicBool, Ordering};
17use std::sync::Arc;
18use tokio::sync::{mpsc, watch, Mutex};
19
20use crate::format::CliFormatter;
21use crate::session::Session;
22
23pub struct CliChannel {
28 incoming_rx: Mutex<Option<mpsc::Receiver<IncomingMessage>>>,
31 incoming_tx: mpsc::Sender<IncomingMessage>,
33 session: Arc<std::sync::Mutex<Session>>,
35 formatter: CliFormatter,
37 processing: Arc<AtomicBool>,
40}
41
42impl CliChannel {
43 pub fn new(buffer: usize) -> Self {
45 let (incoming_tx, incoming_rx) = mpsc::channel(buffer);
46 let session = Arc::new(std::sync::Mutex::new(Session::new(None)));
47 let processing = Arc::new(AtomicBool::new(false));
48
49 Self {
50 incoming_rx: Mutex::new(Some(incoming_rx)),
51 incoming_tx,
52 session,
53 formatter: CliFormatter,
54 processing,
55 }
56 }
57
58 pub fn sender(&self) -> mpsc::Sender<IncomingMessage> {
60 self.incoming_tx.clone()
61 }
62
63 pub fn handle(&self) -> CliChannelHandle {
65 CliChannelHandle {
66 incoming_tx: self.incoming_tx.clone(),
67 session: self.session.clone(),
68 processing: self.processing.clone(),
69 }
70 }
71
72 pub fn processing_flag(&self) -> Arc<AtomicBool> {
74 self.processing.clone()
75 }
76}
77
78#[async_trait]
79impl Channel for CliChannel {
80 fn name(&self) -> &str {
81 "cli"
82 }
83
84 async fn start(
85 &self,
86 tx: mpsc::Sender<GatewayInbox>,
87 mut shutdown: watch::Receiver<bool>,
88 ) -> Result<tokio::task::JoinHandle<()>> {
89 let internal_rx = self.incoming_rx.lock().await.take();
90 let Some(mut internal_rx) = internal_rx else {
91 anyhow::bail!("CLI channel already started (no receiver)");
92 };
93 let channel_name = self.name().to_owned();
94
95 let handle = tokio::spawn(async move {
96 loop {
97 tokio::select! {
98 msg = internal_rx.recv() => {
99 match msg {
100 Some(msg) => {
101 if tx.send((channel_name.clone(), msg)).await.is_err() {
102 break; }
104 }
105 None => break,
106 }
107 }
108 _ = shutdown.changed() => break,
109 }
110 }
111 tracing::info!(channel = %channel_name, "CLI channel stopped");
112 });
113
114 Ok(handle)
115 }
116
117 async fn send(&self, msg: OutgoingMessage) -> Result<()> {
118 let output = match &msg.meta {
119 Some(meta) if meta.error.is_some() => self.formatter.format_error(&msg),
120 Some(_) => self.formatter.format_success(&msg),
121 None => msg.content.clone(),
122 };
123 println!("{output}");
124 self.processing.store(false, Ordering::Relaxed);
125 Ok(())
126 }
127}
128
129impl std::fmt::Debug for CliChannel {
130 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
131 f.debug_struct("CliChannel").finish()
132 }
133}
134
135#[derive(Clone)]
137pub struct CliChannelHandle {
138 pub incoming_tx: mpsc::Sender<IncomingMessage>,
140 session: Arc<std::sync::Mutex<Session>>,
142 processing: Arc<AtomicBool>,
144}
145
146impl std::fmt::Debug for CliChannelHandle {
147 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
148 f.debug_struct("CliChannelHandle").finish()
149 }
150}
151
152impl CliChannelHandle {
153 pub fn from_channel(channel: &CliChannel) -> Self {
155 channel.handle()
156 }
157
158 pub async fn send_user_message(&self, content: String) -> Result<()> {
160 let mut msg = IncomingMessage::new("cli", "cli-user", &content);
161 {
162 let session = self.session.lock().unwrap_or_else(|e| {
163 tracing::error!("Mutex poisoned: {e}");
164 e.into_inner()
165 });
166 msg.metadata
167 .insert("session_id".to_owned(), session.id.to_string());
168 }
169 self.incoming_tx
170 .send(msg)
171 .await
172 .map_err(|e| anyhow::anyhow!("{e}"))?;
173 Ok(())
174 }
175
176 pub fn touch_session(&self) {
178 if let Ok(mut session) = self.session.lock() {
179 session.touch();
180 }
181 }
182
183 pub fn reset_session(&self) {
185 if let Ok(mut session) = self.session.lock() {
186 *session = Session::new(None);
187 }
188 }
189
190 pub fn session_id(&self) -> uuid::Uuid {
192 self.session.lock().map(|s| s.id).unwrap_or_default()
193 }
194
195 pub fn set_processing(&self, value: bool) {
197 self.processing.store(value, Ordering::Relaxed);
198 }
199
200 pub fn is_processing(&self) -> bool {
202 self.processing.load(Ordering::Relaxed)
203 }
204
205 pub async fn send_switch_model(&self, model_id: &str) -> Result<()> {
210 let mut msg = IncomingMessage::new("cli", "cli-user", format!("switch_model: {model_id}"));
211 msg.metadata
212 .insert("action".to_owned(), "switch_model".to_owned());
213 msg.metadata
214 .insert("model_id".to_owned(), model_id.to_owned());
215 {
216 let session = self.session.lock().unwrap_or_else(|e| {
217 tracing::error!("Mutex poisoned: {e}");
218 e.into_inner()
219 });
220 msg.metadata
221 .insert("session_id".to_owned(), session.id.to_string());
222 }
223 self.incoming_tx
224 .send(msg)
225 .await
226 .map_err(|e| anyhow::anyhow!("{e}"))?;
227 Ok(())
228 }
229
230 pub async fn send_switch_persona(&self, persona_id: &str) -> Result<()> {
235 let mut msg =
236 IncomingMessage::new("cli", "cli-user", format!("switch_persona: {persona_id}"));
237 msg.metadata
238 .insert("action".to_owned(), "switch_persona".to_owned());
239 msg.metadata
240 .insert("persona_id".to_owned(), persona_id.to_owned());
241 {
242 let session = self.session.lock().unwrap_or_else(|e| {
243 tracing::error!("Mutex poisoned: {e}");
244 e.into_inner()
245 });
246 msg.metadata
247 .insert("session_id".to_owned(), session.id.to_string());
248 }
249 self.incoming_tx
250 .send(msg)
251 .await
252 .map_err(|e| anyhow::anyhow!("{e}"))?;
253 Ok(())
254 }
255}