Skip to main content

telex/
channel.rs

//! Channel and port primitives for external event sources.
//!
//! `ChannelHandle<T>` is a typed inbound channel: external threads send `T`,
//! the run loop drains them each frame, and render code reads the messages.
//!
//! `PortHandle<In, Out>` is a bidirectional port: inbound `In` messages plus
//! an outbound `Sender<Out>` for sending data back to external systems.
8
9use std::cell::RefCell;
10use std::rc::Rc;
11use std::sync::atomic::{AtomicBool, Ordering};
12use std::sync::mpsc;
13use std::sync::Arc;
14use std::time::Duration;
15
/// Type-erased view of a channel, so the run loop can service every
/// registered channel without knowing the message type each one carries.
pub trait ChannelDrain {
    /// Pull all pending messages off the mpsc receiver and into the
    /// per-frame buffer.
    fn drain(&self);

    /// Empty the per-frame buffer (called at the start of each frame).
    fn clear(&self);

    /// Report whether the current frame buffer holds at least one message.
    fn has_messages(&self) -> bool;
}
26
/// A sender that wakes the event loop when a message is sent.
///
/// Wraps `mpsc::Sender<T>` and sets a shared wake flag after each successful
/// send, so the run loop polls with zero timeout on the next iteration.
pub struct WakingSender<T> {
    /// Underlying mpsc sender that carries the message.
    tx: mpsc::Sender<T>,
    /// Wake flag shared with the event loop.
    wake: Arc<AtomicBool>,
}

// Implemented by hand so that cloning does not require `T: Clone`
// (a derive would add that bound).
impl<T> Clone for WakingSender<T> {
    fn clone(&self) -> Self {
        Self {
            tx: self.tx.clone(),
            wake: Arc::clone(&self.wake),
        }
    }
}

impl<T> WakingSender<T> {
    /// Send a value, waking the event loop.
    ///
    /// # Errors
    ///
    /// Returns the `mpsc::SendError` from the underlying sender (carrying
    /// `value` back) when the receiving side has been dropped. The wake flag
    /// is left untouched in that case.
    pub fn send(&self, value: T) -> Result<(), mpsc::SendError<T>> {
        self.tx.send(value)?;
        // Raise the flag only once the message is actually queued, so the
        // loop is never woken for a message that was not delivered.
        self.wake.store(true, Ordering::Release);
        Ok(())
    }
}

/// A typed inbound channel handle.
///
/// The `tx()` method returns a [`WakingSender<T>`] that can be cloned and
/// sent to other threads. Each frame, the run loop drains the receiver and
/// stores messages in `messages`. Components read messages with `get()`.
///
/// # Example
/// ```rust,ignore
/// let ch = channel!(cx, String);
///
/// // In an effect, hand the sender to an external thread
/// let tx = ch.tx();
/// effect_once!(cx, move || {
///     std::thread::spawn(move || {
///         tx.send("hello from thread".to_string()).ok();
///     });
///     || {}
/// });
///
/// // In render, read this frame's messages
/// for msg in ch.get() {
///     // ...
/// }
/// ```
pub struct ChannelHandle<T> {
    /// Shared state; cloning the handle clones this `Rc`, not the channel.
    inner: Rc<ChannelInner<T>>,
}

/// State shared by every clone of a `ChannelHandle`.
struct ChannelInner<T> {
    /// Prototype sender, cloned into each `WakingSender` handed out.
    tx: mpsc::Sender<T>,
    /// Event-loop wake flag, shared with every `WakingSender` handed out.
    wake: Arc<AtomicBool>,
    /// Receiving end of the mpsc channel.
    rx: RefCell<mpsc::Receiver<T>>,
    /// Messages buffered for the current frame.
    messages: RefCell<Vec<T>>,
}
90
91impl<T> Clone for ChannelHandle<T> {
92    fn clone(&self) -> Self {
93        Self {
94            inner: Rc::clone(&self.inner),
95        }
96    }
97}
98
99impl<T: 'static> ChannelHandle<T> {
100    /// Create a new channel handle with an event-loop wake flag.
101    pub fn new(wake: Arc<AtomicBool>) -> Self {
102        let (tx, rx) = mpsc::channel();
103        Self {
104            inner: Rc::new(ChannelInner {
105                tx,
106                wake,
107                rx: RefCell::new(rx),
108                messages: RefCell::new(Vec::new()),
109            }),
110        }
111    }
112
113    /// Get a cloneable sender that wakes the event loop on send.
114    pub fn tx(&self) -> WakingSender<T> {
115        WakingSender {
116            tx: self.inner.tx.clone(),
117            wake: Arc::clone(&self.inner.wake),
118        }
119    }
120
121    /// Get this frame's messages (populated by the run loop's drain pass).
122    pub fn get(&self) -> Vec<T>
123    where
124        T: Clone,
125    {
126        self.inner.messages.borrow().clone()
127    }
128
129    /// Get the number of messages this frame.
130    pub fn len(&self) -> usize {
131        self.inner.messages.borrow().len()
132    }
133
134    /// Check if there are no messages this frame.
135    pub fn is_empty(&self) -> bool {
136        self.inner.messages.borrow().is_empty()
137    }
138}
139
140impl<T: 'static> ChannelDrain for ChannelHandle<T> {
141    fn drain(&self) {
142        let rx = self.inner.rx.borrow();
143        let mut messages = self.inner.messages.borrow_mut();
144        loop {
145            match rx.try_recv() {
146                Ok(msg) => messages.push(msg),
147                Err(mpsc::TryRecvError::Empty) => break,
148                Err(mpsc::TryRecvError::Disconnected) => break,
149            }
150        }
151    }
152
153    fn clear(&self) {
154        self.inner.messages.borrow_mut().clear();
155    }
156
157    fn has_messages(&self) -> bool {
158        !self.inner.messages.borrow().is_empty()
159    }
160}
161
162/// A bidirectional port handle: inbound `In` + outbound `Sender<Out>`.
163///
164/// Use `port.rx` for inbound messages and `port.tx()` for the outbound sender.
165///
166/// # Example
167/// ```rust,ignore
168/// let midi = port!(cx, MidiIn, MidiOut);
169///
170/// // Send the port's senders to an external connection
171/// let tx_in = midi.rx.tx();   // external thread sends MidiIn here
172/// let tx_out = midi.tx();     // component sends MidiOut here
173///
174/// effect_once!(cx, move || {
175///     let conn = start_midi(tx_in, tx_out);
176///     move || drop(conn)
177/// });
178///
179/// // Read inbound messages
180/// for msg in midi.rx.get() { /* ... */ }
181/// ```
182pub struct PortHandle<In, Out> {
183    /// Inbound channel (external -> component).
184    pub rx: ChannelHandle<In>,
185    /// Outbound sender (component -> external).
186    outbound_tx: mpsc::Sender<Out>,
187    /// Outbound receiver (for external thread to consume).
188    outbound_rx: Rc<RefCell<Option<mpsc::Receiver<Out>>>>,
189}
190
191impl<In, Out> Clone for PortHandle<In, Out> {
192    fn clone(&self) -> Self {
193        Self {
194            rx: self.rx.clone(),
195            outbound_tx: self.outbound_tx.clone(),
196            outbound_rx: Rc::clone(&self.outbound_rx),
197        }
198    }
199}
200
201impl<In: 'static, Out: 'static> PortHandle<In, Out> {
202    /// Create a new bidirectional port with an event-loop wake flag.
203    pub fn new(wake: Arc<AtomicBool>) -> Self {
204        let (outbound_tx, outbound_rx) = mpsc::channel();
205        Self {
206            rx: ChannelHandle::new(wake),
207            outbound_tx,
208            outbound_rx: Rc::new(RefCell::new(Some(outbound_rx))),
209        }
210    }
211
212    /// Get a cloneable `Sender<Out>` for sending outbound messages.
213    pub fn tx(&self) -> mpsc::Sender<Out> {
214        self.outbound_tx.clone()
215    }
216
217    /// Take the outbound receiver (can only be called once).
218    ///
219    /// This is intended for passing to an external thread that consumes
220    /// outbound messages. Returns `None` if already taken.
221    pub fn take_outbound_rx(&self) -> Option<mpsc::Receiver<Out>> {
222        self.outbound_rx.borrow_mut().take()
223    }
224}
225
/// Internal handle backing `use_interval`: a timer thread produces ticks,
/// and the callback is invoked on each frame in which ticks were received.
pub(crate) struct IntervalHandle {
    /// Shared state; clones of the handle point at the same interval.
    inner: Rc<IntervalInner>,
}

/// State shared by every clone of an `IntervalHandle`.
struct IntervalInner {
    /// Receiving end of the tick channel fed by the timer thread.
    rx: RefCell<mpsc::Receiver<()>>,
    /// Callback to invoke when a tick has been observed, if one is set.
    callback: RefCell<Option<Rc<dyn Fn()>>>,
    /// Whether a tick has been observed since the last `clear`.
    ticked: RefCell<bool>,
}
237
238impl Clone for IntervalHandle {
239    fn clone(&self) -> Self {
240        Self {
241            inner: Rc::clone(&self.inner),
242        }
243    }
244}
245
246impl IntervalHandle {
247    /// Create a new interval that fires at the given duration.
248    pub(crate) fn new(duration: Duration, callback: Rc<dyn Fn()>, wake: Arc<AtomicBool>) -> Self {
249        let (tx, rx) = mpsc::channel();
250
251        // Spawn timer thread
252        std::thread::spawn(move || loop {
253            std::thread::sleep(duration);
254            if tx.send(()).is_err() {
255                break; // Receiver dropped, stop
256            }
257            wake.store(true, Ordering::Release);
258        });
259
260        Self {
261            inner: Rc::new(IntervalInner {
262                rx: RefCell::new(rx),
263                callback: RefCell::new(Some(callback)),
264                ticked: RefCell::new(false),
265            }),
266        }
267    }
268}
269
270impl ChannelDrain for IntervalHandle {
271    fn drain(&self) {
272        let rx = self.inner.rx.borrow();
273        let mut ticked = false;
274        loop {
275            match rx.try_recv() {
276                Ok(()) => ticked = true,
277                Err(mpsc::TryRecvError::Empty) => break,
278                Err(mpsc::TryRecvError::Disconnected) => break,
279            }
280        }
281        if ticked {
282            *self.inner.ticked.borrow_mut() = true;
283            if let Some(cb) = self.inner.callback.borrow().as_ref() {
284                cb();
285            }
286        }
287    }
288
289    fn clear(&self) {
290        *self.inner.ticked.borrow_mut() = false;
291    }
292
293    fn has_messages(&self) -> bool {
294        *self.inner.ticked.borrow()
295    }
296}