oxidio_ctl/channel.rs
1//! Control channel types for sending commands and receiving state updates.
2
3use tokio::sync::{ broadcast, mpsc };
4
5use oxidio_protocol::{ AppCommand, StateUpdate };
6
7
/// Capacity of the bounded command channel.
///
/// Passed to `mpsc::channel` in `ControlChannel::new`; it is the buffer size
/// for commands that have been sent but not yet received.
const COMMAND_CHANNEL_CAPACITY: usize = 256;

/// Capacity of the state-update broadcast channel.
///
/// Passed to `broadcast::channel` in `ControlChannel::new`.
const BROADCAST_CHANNEL_CAPACITY: usize = 64;
13
14
15/// A cloneable handle for sending commands into the control channel.
16///
17/// Any frontend (TUI, web, CLI oneshot) can clone this and send commands.
18#[derive( Clone )]
19pub struct CommandSender {
20 tx: mpsc::Sender<AppCommand>,
21}
22
23
24impl CommandSender {
25 /// Sends a command to the control channel.
26 ///
27 /// @param cmd - The command to send
28 pub async fn send( &self, cmd: AppCommand ) -> Result<(), mpsc::error::SendError<AppCommand>> {
29 self.tx.send( cmd ).await
30 }
31
32
33 /// Tries to send a command without blocking.
34 ///
35 /// Useful for synchronous contexts (e.g., TUI event handlers).
36 ///
37 /// @param cmd - The command to send
38 pub fn try_send( &self, cmd: AppCommand ) -> Result<(), mpsc::error::TrySendError<AppCommand>> {
39 self.tx.try_send( cmd )
40 }
41}
42
43
44/// Handle for creating senders and subscribers to the control channel.
45///
46/// This is the factory for creating client connections to the message bus.
47pub struct ControlChannel {
48 command_tx: mpsc::Sender<AppCommand>,
49 command_rx: Option<mpsc::Receiver<AppCommand>>,
50 broadcast_tx: broadcast::Sender<StateUpdate>,
51}
52
53
54impl ControlChannel {
55 /// Creates a new control channel.
56 ///
57 /// @returns A new ControlChannel instance
58 pub fn new() -> Self {
59 let ( command_tx, command_rx ) = mpsc::channel( COMMAND_CHANNEL_CAPACITY );
60 let ( broadcast_tx, _broadcast_rx ) = broadcast::channel( BROADCAST_CHANNEL_CAPACITY );
61
62 Self {
63 command_tx,
64 command_rx: Some( command_rx ),
65 broadcast_tx,
66 }
67 }
68
69
70 /// Creates a new `CommandSender` for a frontend client.
71 pub fn sender( &self ) -> CommandSender {
72 CommandSender {
73 tx: self.command_tx.clone(),
74 }
75 }
76
77
78 /// Subscribes to state updates from the command processor.
79 ///
80 /// @returns A broadcast receiver for state updates
81 pub fn subscribe( &self ) -> broadcast::Receiver<StateUpdate> {
82 self.broadcast_tx.subscribe()
83 }
84
85
86 /// Takes the command receiver (can only be called once).
87 ///
88 /// This is consumed by the `CommandProcessor` during initialization.
89 ///
90 /// @returns The command receiver, or None if already taken
91 pub fn take_command_rx( &mut self ) -> Option<mpsc::Receiver<AppCommand>> {
92 self.command_rx.take()
93 }
94
95
96 /// Gets a clone of the broadcast sender.
97 ///
98 /// Used by the `CommandProcessor` to broadcast state updates.
99 pub fn broadcast_tx( &self ) -> broadcast::Sender<StateUpdate> {
100 self.broadcast_tx.clone()
101 }
102}