use anyhow::Result;
use async_trait::async_trait;
use oxios_gateway::channel::Channel;
use oxios_gateway::format::ChannelFormatter;
use oxios_gateway::message::{IncomingMessage, OutgoingMessage};
use oxios_gateway::GatewayInbox;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use tokio::sync::{mpsc, watch, Mutex};
use crate::format::CliFormatter;
use crate::session::Session;
/// Gateway channel backed by the local command line.
///
/// Messages are queued on an internal mpsc channel; `start` forwards them to
/// the gateway inbox, and `send` prints outgoing messages to stdout.
pub struct CliChannel {
/// Receiving half of the internal queue. Wrapped in `Mutex<Option<..>>` so
/// that `start` can take ownership of it exactly once through `&self`.
incoming_rx: Mutex<Option<mpsc::Receiver<IncomingMessage>>>,
/// Sending half of the internal queue; cloned out via `sender` and `handle`.
incoming_tx: mpsc::Sender<IncomingMessage>,
/// Session state shared with every `CliChannelHandle` created by `handle`.
session: Arc<std::sync::Mutex<Session>>,
/// Formatter used by `send` to render outgoing messages that carry metadata.
formatter: CliFormatter,
/// Flag shared with handles; `send` resets it to `false` after printing.
processing: Arc<AtomicBool>,
}
impl CliChannel {
    /// Creates a CLI channel whose internal queue holds up to `buffer` messages.
    ///
    /// A `buffer` of `0` is raised to `1`: `tokio::sync::mpsc::channel` panics
    /// when asked for a zero-capacity channel, and a capacity of one is the
    /// closest working equivalent. The channel starts with a fresh session and
    /// the processing flag cleared.
    ///
    /// NOTE(review): tokio may still panic for capacities above its internal
    /// maximum; that upper bound is not clamped here.
    pub fn new(buffer: usize) -> Self {
        // Guard against the zero-capacity panic in tokio's bounded channel.
        let capacity = buffer.max(1);
        let (incoming_tx, incoming_rx) = mpsc::channel(capacity);
        Self {
            incoming_rx: Mutex::new(Some(incoming_rx)),
            incoming_tx,
            session: Arc::new(std::sync::Mutex::new(Session::new(None))),
            formatter: CliFormatter,
            processing: Arc::new(AtomicBool::new(false)),
        }
    }

    /// Returns a new sender feeding this channel's internal queue.
    pub fn sender(&self) -> mpsc::Sender<IncomingMessage> {
        self.incoming_tx.clone()
    }

    /// Returns a cloneable handle sharing this channel's queue, session and
    /// processing flag.
    pub fn handle(&self) -> CliChannelHandle {
        CliChannelHandle {
            incoming_tx: self.incoming_tx.clone(),
            session: Arc::clone(&self.session),
            processing: Arc::clone(&self.processing),
        }
    }

    /// Returns a shared reference to the processing flag.
    pub fn processing_flag(&self) -> Arc<AtomicBool> {
        Arc::clone(&self.processing)
    }
}
#[async_trait]
impl Channel for CliChannel {
    /// Name under which this channel tags the messages it forwards.
    fn name(&self) -> &str {
        "cli"
    }

    /// Spawns the task that forwards queued CLI messages to the gateway inbox.
    ///
    /// The task ends when the internal queue closes, when the gateway inbox
    /// rejects a message, or when `shutdown` reports a change (or its sender
    /// is gone).
    ///
    /// # Errors
    ///
    /// Fails if the receiver was already taken by an earlier call to `start`.
    async fn start(
        &self,
        tx: mpsc::Sender<GatewayInbox>,
        mut shutdown: watch::Receiver<bool>,
    ) -> Result<tokio::task::JoinHandle<()>> {
        // The receiver can only be claimed once; a second start finds `None`.
        let mut internal_rx = self
            .incoming_rx
            .lock()
            .await
            .take()
            .ok_or_else(|| anyhow::anyhow!("CLI channel already started (no receiver)"))?;
        let channel_name = String::from(self.name());

        let forwarder = tokio::spawn(async move {
            loop {
                // Resolve to the next queued message, or `None` when either the
                // queue is closed or the shutdown watch fires.
                let next = tokio::select! {
                    received = internal_rx.recv() => received,
                    _ = shutdown.changed() => None,
                };
                let Some(incoming) = next else {
                    break;
                };
                let delivered = tx.send((channel_name.clone(), incoming)).await;
                if delivered.is_err() {
                    break;
                }
            }
            tracing::info!(channel = %channel_name, "CLI channel stopped");
        });

        Ok(forwarder)
    }

    /// Prints an outgoing message to stdout and clears the processing flag.
    ///
    /// Messages with metadata go through the formatter (error or success
    /// rendering depending on whether `meta.error` is set); messages without
    /// metadata are printed verbatim.
    async fn send(&self, msg: OutgoingMessage) -> Result<()> {
        let has_error = msg.meta.as_ref().map(|meta| meta.error.is_some());
        let output = match has_error {
            Some(true) => self.formatter.format_error(&msg),
            Some(false) => self.formatter.format_success(&msg),
            None => msg.content.clone(),
        };
        println!("{output}");
        self.processing.store(false, Ordering::Relaxed);
        Ok(())
    }
}
/// Opaque `Debug` output: only the type name is printed, no fields.
impl std::fmt::Debug for CliChannel {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.write_str("CliChannel")
    }
}
/// Cloneable handle onto a `CliChannel`, sharing its queue, session and
/// processing flag.
#[derive(Clone)]
pub struct CliChannelHandle {
/// Sender feeding the channel's internal message queue.
pub incoming_tx: mpsc::Sender<IncomingMessage>,
/// Session state shared with the originating channel and other handles.
session: Arc<std::sync::Mutex<Session>>,
/// Processing flag shared with the originating channel.
processing: Arc<AtomicBool>,
}
/// Opaque `Debug` output: only the type name is printed, no fields.
impl std::fmt::Debug for CliChannelHandle {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.write_str("CliChannelHandle")
    }
}
impl CliChannelHandle {
pub fn from_channel(channel: &CliChannel) -> Self {
channel.handle()
}
pub async fn send_user_message(&self, content: String) -> Result<()> {
let mut msg = IncomingMessage::new("cli", "cli-user", &content);
{
let session = self.session.lock().unwrap_or_else(|e| {
tracing::error!("Mutex poisoned: {e}");
e.into_inner()
});
msg.metadata
.insert("session_id".to_owned(), session.id.to_string());
}
self.incoming_tx
.send(msg)
.await
.map_err(|e| anyhow::anyhow!("{e}"))?;
Ok(())
}
pub fn touch_session(&self) {
if let Ok(mut session) = self.session.lock() {
session.touch();
}
}
pub fn reset_session(&self) {
if let Ok(mut session) = self.session.lock() {
*session = Session::new(None);
}
}
pub fn session_id(&self) -> uuid::Uuid {
self.session.lock().map(|s| s.id).unwrap_or_default()
}
pub fn set_processing(&self, value: bool) {
self.processing.store(value, Ordering::Relaxed);
}
pub fn is_processing(&self) -> bool {
self.processing.load(Ordering::Relaxed)
}
}