Skip to main content

reovim_kernel/ipc/
channel.rs

1//! Channel abstractions for inter-subsystem communication.
2//!
3//! This module provides thin wrappers around `std::sync::mpsc` channels,
4//! keeping the kernel runtime-agnostic (no tokio dependency).
5//!
6//! # Design Philosophy
7//!
8//! Following the "mechanisms not policy" principle:
9//! - Provides basic channel primitives
10//! - No opinion on how they should be used
11//! - Runtime integration is left to higher layers
12//!
13//! # Channel Types
14//!
15//! - **Unbounded MPSC**: Multiple producers, single consumer, unlimited buffer
16//! - **Bounded MPSC**: Multiple producers, single consumer, fixed capacity
17//! - **Oneshot**: Single-use channel for request-response patterns
18//!
19//! # Example
20//!
21//! ```
22//! use reovim_kernel::api::v1::*;
23//!
24//! // Unbounded channel
25//! let (tx, rx) = channel::<i32>();
26//! tx.send(42).unwrap();
27//! assert_eq!(rx.recv().unwrap(), 42);
28//!
29//! // Bounded channel with capacity 2
30//! let (tx, rx) = bounded::<i32>(2);
31//! tx.send(1).unwrap();
32//! tx.send(2).unwrap();
33//! // tx.send(3) would block until space is available
34//!
35//! // Oneshot for request-response
36//! let (tx, rx) = oneshot::<String>();
37//! tx.send("response".to_string()).unwrap();
38//! assert_eq!(rx.recv().unwrap(), "response");
39//! ```
40
41use std::{sync::mpsc, time::Duration};
42
43use reovim_arch::sync::Mutex;
44
45// ============================================================================
46// Unbounded Channel
47// ============================================================================
48
49/// Sender for unbounded MPSC channel.
50///
51/// Cloning creates a new sender to the same channel.
52pub struct Sender<T>(mpsc::Sender<T>);
53
54// Manual Clone impl because mpsc::Sender<T> is Clone without requiring T: Clone
55impl<T> Clone for Sender<T> {
56    fn clone(&self) -> Self {
57        Self(self.0.clone())
58    }
59}
60
61/// Receiver for unbounded MPSC channel.
62///
63/// Only one receiver exists per channel.
64pub struct Receiver<T>(mpsc::Receiver<T>);
65
66/// Error returned when sending fails because the receiver was dropped.
67#[derive(Debug, Clone, PartialEq, Eq)]
68pub struct SendError<T>(pub T);
69
70impl<T> std::fmt::Display for SendError<T> {
71    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
72        write!(f, "sending on a closed channel")
73    }
74}
75
76impl<T: std::fmt::Debug> std::error::Error for SendError<T> {}
77
78/// Error returned when receiving fails because the channel is empty and closed.
79#[derive(Debug, Clone, Copy, PartialEq, Eq)]
80pub struct RecvError;
81
82impl std::fmt::Display for RecvError {
83    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
84        write!(f, "receiving on a closed channel")
85    }
86}
87
88impl std::error::Error for RecvError {}
89
90/// Error returned when `try_recv` fails.
91#[derive(Debug, Clone, Copy, PartialEq, Eq)]
92pub enum TryRecvError {
93    /// No message available.
94    Empty,
95    /// Channel is closed.
96    Disconnected,
97}
98
99impl std::fmt::Display for TryRecvError {
100    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
101        match self {
102            Self::Empty => write!(f, "channel is empty"),
103            Self::Disconnected => write!(f, "channel is disconnected"),
104        }
105    }
106}
107
108impl std::error::Error for TryRecvError {}
109
110impl<T> Sender<T> {
111    /// Send a value through the channel.
112    ///
113    /// # Errors
114    ///
115    /// Returns `Err(SendError(value))` if the receiver has been dropped.
116    pub fn send(&self, value: T) -> Result<(), SendError<T>> {
117        self.0.send(value).map_err(|e| SendError(e.0))
118    }
119}
120
121impl<T> std::fmt::Debug for Sender<T> {
122    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
123        f.debug_struct("Sender").finish_non_exhaustive()
124    }
125}
126
127impl<T> Receiver<T> {
128    /// Block until a value is received.
129    ///
130    /// # Errors
131    ///
132    /// Returns `Err(RecvError)` if the channel is closed.
133    pub fn recv(&self) -> Result<T, RecvError> {
134        self.0.recv().map_err(|_| RecvError)
135    }
136
137    /// Try to receive without blocking.
138    ///
139    /// # Errors
140    ///
141    /// Returns `Err(TryRecvError::Empty)` if no message is available,
142    /// or `Err(TryRecvError::Disconnected)` if the channel is closed.
143    pub fn try_recv(&self) -> Result<T, TryRecvError> {
144        self.0.try_recv().map_err(|e| match e {
145            mpsc::TryRecvError::Empty => TryRecvError::Empty,
146            mpsc::TryRecvError::Disconnected => TryRecvError::Disconnected,
147        })
148    }
149
150    /// Block until a value is received or timeout expires.
151    ///
152    /// # Errors
153    ///
154    /// Returns `Err(RecvError)` on timeout or if channel is closed.
155    pub fn recv_timeout(&self, timeout: Duration) -> Result<T, RecvError> {
156        self.0.recv_timeout(timeout).map_err(|_| RecvError)
157    }
158
159    /// Create an iterator over received values.
160    pub fn iter(&self) -> impl Iterator<Item = T> + '_ {
161        self.0.iter()
162    }
163
164    /// Try to receive all available values without blocking.
165    pub fn try_iter(&self) -> impl Iterator<Item = T> + '_ {
166        self.0.try_iter()
167    }
168}
169
170impl<T> std::fmt::Debug for Receiver<T> {
171    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
172        f.debug_struct("Receiver").finish_non_exhaustive()
173    }
174}
175
176/// Create an unbounded MPSC channel.
177///
178/// # Example
179///
180/// ```
181/// use reovim_kernel::api::v1::*;
182///
183/// let (tx, rx) = channel::<i32>();
184/// tx.send(42).unwrap();
185/// assert_eq!(rx.recv().unwrap(), 42);
186/// ```
187#[must_use]
188pub fn channel<T>() -> (Sender<T>, Receiver<T>) {
189    let (tx, rx) = mpsc::channel();
190    (Sender(tx), Receiver(rx))
191}
192
193// ============================================================================
194// Bounded Channel
195// ============================================================================
196
197/// Sender for bounded MPSC channel.
198pub struct BoundedSender<T>(mpsc::SyncSender<T>);
199
200// Manual Clone impl because mpsc::SyncSender<T> is Clone without requiring T: Clone
201impl<T> Clone for BoundedSender<T> {
202    fn clone(&self) -> Self {
203        Self(self.0.clone())
204    }
205}
206
207/// Receiver for bounded MPSC channel (same as unbounded).
208pub struct BoundedReceiver<T>(mpsc::Receiver<T>);
209
210/// Error returned when `try_send` fails.
211#[derive(Debug, Clone, PartialEq, Eq)]
212pub enum TrySendError<T> {
213    /// Channel is full.
214    Full(T),
215    /// Channel is closed.
216    Disconnected(T),
217}
218
219impl<T> std::fmt::Display for TrySendError<T> {
220    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
221        match self {
222            Self::Full(_) => write!(f, "channel is full"),
223            Self::Disconnected(_) => write!(f, "channel is disconnected"),
224        }
225    }
226}
227
228impl<T: std::fmt::Debug> std::error::Error for TrySendError<T> {}
229
230impl<T> BoundedSender<T> {
231    /// Send a value, blocking if the channel is full.
232    ///
233    /// # Errors
234    ///
235    /// Returns `Err(SendError(value))` if the receiver has been dropped.
236    pub fn send(&self, value: T) -> Result<(), SendError<T>> {
237        self.0.send(value).map_err(|e| SendError(e.0))
238    }
239
240    /// Try to send without blocking.
241    ///
242    /// # Errors
243    ///
244    /// Returns `Err(TrySendError::Full(value))` if the channel is full,
245    /// or `Err(TrySendError::Disconnected(value))` if the receiver was dropped.
246    pub fn try_send(&self, value: T) -> Result<(), TrySendError<T>> {
247        self.0.try_send(value).map_err(|e| match e {
248            mpsc::TrySendError::Full(v) => TrySendError::Full(v),
249            mpsc::TrySendError::Disconnected(v) => TrySendError::Disconnected(v),
250        })
251    }
252}
253
254impl<T> std::fmt::Debug for BoundedSender<T> {
255    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
256        f.debug_struct("BoundedSender").finish_non_exhaustive()
257    }
258}
259
260impl<T> BoundedReceiver<T> {
261    /// Block until a value is received.
262    ///
263    /// # Errors
264    ///
265    /// Returns `Err(RecvError)` if the channel is closed.
266    pub fn recv(&self) -> Result<T, RecvError> {
267        self.0.recv().map_err(|_| RecvError)
268    }
269
270    /// Try to receive without blocking.
271    ///
272    /// # Errors
273    ///
274    /// Returns `Err(TryRecvError::Empty)` if no message is available,
275    /// or `Err(TryRecvError::Disconnected)` if the channel is closed.
276    #[cfg_attr(coverage_nightly, coverage(off))]
277    pub fn try_recv(&self) -> Result<T, TryRecvError> {
278        self.0.try_recv().map_err(|e| match e {
279            mpsc::TryRecvError::Empty => TryRecvError::Empty,
280            mpsc::TryRecvError::Disconnected => TryRecvError::Disconnected,
281        })
282    }
283
284    /// Block until a value is received or timeout expires.
285    ///
286    /// # Errors
287    ///
288    /// Returns `Err(RecvError)` on timeout or if the channel is closed.
289    pub fn recv_timeout(&self, timeout: Duration) -> Result<T, RecvError> {
290        self.0.recv_timeout(timeout).map_err(|_| RecvError)
291    }
292
293    /// Create an iterator over received values.
294    pub fn iter(&self) -> impl Iterator<Item = T> + '_ {
295        self.0.iter()
296    }
297
298    /// Try to receive all available values without blocking.
299    pub fn try_iter(&self) -> impl Iterator<Item = T> + '_ {
300        self.0.try_iter()
301    }
302}
303
304impl<T> std::fmt::Debug for BoundedReceiver<T> {
305    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
306        f.debug_struct("BoundedReceiver").finish_non_exhaustive()
307    }
308}
309
310/// Create a bounded MPSC channel with the specified capacity.
311///
312/// Senders will block when the channel is full.
313///
314/// # Example
315///
316/// ```
317/// use reovim_kernel::api::v1::*;
318///
319/// let (tx, rx) = bounded::<i32>(2);
320/// tx.send(1).unwrap();
321/// tx.send(2).unwrap();
322/// // tx.send(3) would block until space is available
323///
324/// assert_eq!(rx.recv().unwrap(), 1);
325/// assert_eq!(rx.recv().unwrap(), 2);
326/// ```
327#[must_use]
328pub fn bounded<T>(capacity: usize) -> (BoundedSender<T>, BoundedReceiver<T>) {
329    let (tx, rx) = mpsc::sync_channel(capacity);
330    (BoundedSender(tx), BoundedReceiver(rx))
331}
332
333// ============================================================================
334// Oneshot Channel
335// ============================================================================
336
337/// Sender for oneshot channel.
338///
339/// Can only send a single value.
340pub struct OneshotSender<T> {
341    inner: Mutex<Option<mpsc::SyncSender<T>>>,
342}
343
344/// Receiver for oneshot channel.
345///
346/// Can only receive a single value.
347pub struct OneshotReceiver<T>(mpsc::Receiver<T>);
348
349impl<T> OneshotSender<T> {
350    /// Send the value, consuming the sender.
351    ///
352    /// This can only be called once.
353    ///
354    /// # Errors
355    ///
356    /// Returns `Err(value)` if the receiver has been dropped.
357    #[cfg_attr(coverage_nightly, coverage(off))]
358    pub fn send(self, value: T) -> Result<(), T> {
359        let sender = self.inner.lock().take();
360        match sender {
361            Some(tx) => tx.send(value).map_err(|e| e.0),
362            None => Err(value),
363        }
364    }
365
366    /// Check if the receiver is still waiting.
367    #[must_use]
368    pub fn is_connected(&self) -> bool {
369        self.inner.lock().is_some()
370    }
371}
372
373impl<T> std::fmt::Debug for OneshotSender<T> {
374    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
375        f.debug_struct("OneshotSender")
376            .field("connected", &self.is_connected())
377            .finish()
378    }
379}
380
381impl<T> OneshotReceiver<T> {
382    /// Block until the value is received.
383    ///
384    /// # Errors
385    ///
386    /// Returns `Err(RecvError)` if the sender was dropped without sending.
387    pub fn recv(self) -> Result<T, RecvError> {
388        self.0.recv().map_err(|_| RecvError)
389    }
390
391    /// Try to receive without blocking.
392    ///
393    /// # Errors
394    ///
395    /// Returns `Err(TryRecvError::Empty)` if no value is available yet,
396    /// or `Err(TryRecvError::Disconnected)` if the sender was dropped.
397    pub fn try_recv(&self) -> Result<T, TryRecvError> {
398        self.0.try_recv().map_err(|e| match e {
399            mpsc::TryRecvError::Empty => TryRecvError::Empty,
400            mpsc::TryRecvError::Disconnected => TryRecvError::Disconnected,
401        })
402    }
403
404    /// Block until received or timeout expires.
405    ///
406    /// # Errors
407    ///
408    /// Returns `Err(RecvError)` on timeout or if the sender was dropped.
409    pub fn recv_timeout(&self, timeout: Duration) -> Result<T, RecvError> {
410        self.0.recv_timeout(timeout).map_err(|_| RecvError)
411    }
412}
413
414impl<T> std::fmt::Debug for OneshotReceiver<T> {
415    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
416        f.debug_struct("OneshotReceiver").finish_non_exhaustive()
417    }
418}
419
420/// Create a oneshot channel for single-value communication.
421///
422/// Useful for request-response patterns where exactly one response is expected.
423///
424/// # Example
425///
426/// ```
427/// use reovim_kernel::api::v1::*;
428/// use std::thread;
429///
430/// let (tx, rx) = oneshot::<String>();
431///
432/// thread::spawn(move || {
433///     tx.send("response".to_string()).unwrap();
434/// });
435///
436/// assert_eq!(rx.recv().unwrap(), "response");
437/// ```
438#[must_use]
439pub fn oneshot<T>() -> (OneshotSender<T>, OneshotReceiver<T>) {
440    let (tx, rx) = mpsc::sync_channel(1);
441    (
442        OneshotSender {
443            inner: Mutex::new(Some(tx)),
444        },
445        OneshotReceiver(rx),
446    )
447}