Skip to main content

oxidio_ctl/
channel.rs

1//! Control channel types for sending commands and receiving state updates.
2
3use tokio::sync::{ broadcast, mpsc };
4
5use oxidio_protocol::{ AppCommand, StateUpdate };
6
7
/// Size of the bounded command queue: how many commands may be waiting for the
/// receiver at any one time.
const COMMAND_CHANNEL_CAPACITY: usize = 256;
10
/// Size of the state-update broadcast channel: how many updates it is created
/// to hold for its subscribers.
const BROADCAST_CHANNEL_CAPACITY: usize = 64;
13
14
15/// A cloneable handle for sending commands into the control channel.
16///
17/// Any frontend (TUI, web, CLI oneshot) can clone this and send commands.
18#[derive( Clone )]
19pub struct CommandSender {
20    tx: mpsc::Sender<AppCommand>,
21}
22
23
24impl CommandSender {
25    /// Sends a command to the control channel.
26    ///
27    /// @param cmd - The command to send
28    pub async fn send( &self, cmd: AppCommand ) -> Result<(), mpsc::error::SendError<AppCommand>> {
29        self.tx.send( cmd ).await
30    }
31
32
33    /// Tries to send a command without blocking.
34    ///
35    /// Useful for synchronous contexts (e.g., TUI event handlers).
36    ///
37    /// @param cmd - The command to send
38    pub fn try_send( &self, cmd: AppCommand ) -> Result<(), mpsc::error::TrySendError<AppCommand>> {
39        self.tx.try_send( cmd )
40    }
41}
42
43
44/// Handle for creating senders and subscribers to the control channel.
45///
46/// This is the factory for creating client connections to the message bus.
47pub struct ControlChannel {
48    command_tx: mpsc::Sender<AppCommand>,
49    command_rx: Option<mpsc::Receiver<AppCommand>>,
50    broadcast_tx: broadcast::Sender<StateUpdate>,
51}
52
53
54impl ControlChannel {
55    /// Creates a new control channel.
56    ///
57    /// @returns A new ControlChannel instance
58    pub fn new() -> Self {
59        let ( command_tx, command_rx ) = mpsc::channel( COMMAND_CHANNEL_CAPACITY );
60        let ( broadcast_tx, _broadcast_rx ) = broadcast::channel( BROADCAST_CHANNEL_CAPACITY );
61
62        Self {
63            command_tx,
64            command_rx: Some( command_rx ),
65            broadcast_tx,
66        }
67    }
68
69
70    /// Creates a new `CommandSender` for a frontend client.
71    pub fn sender( &self ) -> CommandSender {
72        CommandSender {
73            tx: self.command_tx.clone(),
74        }
75    }
76
77
78    /// Subscribes to state updates from the command processor.
79    ///
80    /// @returns A broadcast receiver for state updates
81    pub fn subscribe( &self ) -> broadcast::Receiver<StateUpdate> {
82        self.broadcast_tx.subscribe()
83    }
84
85
86    /// Takes the command receiver (can only be called once).
87    ///
88    /// This is consumed by the `CommandProcessor` during initialization.
89    ///
90    /// @returns The command receiver, or None if already taken
91    pub fn take_command_rx( &mut self ) -> Option<mpsc::Receiver<AppCommand>> {
92        self.command_rx.take()
93    }
94
95
96    /// Gets a clone of the broadcast sender.
97    ///
98    /// Used by the `CommandProcessor` to broadcast state updates.
99    pub fn broadcast_tx( &self ) -> broadcast::Sender<StateUpdate> {
100        self.broadcast_tx.clone()
101    }
102}