reovim_kernel/ipc/channel.rs
1//! Channel abstractions for inter-subsystem communication.
2//!
3//! This module provides thin wrappers around `std::sync::mpsc` channels,
4//! keeping the kernel runtime-agnostic (no tokio dependency).
5//!
6//! # Design Philosophy
7//!
8//! Following the "mechanisms not policy" principle:
9//! - Provides basic channel primitives
10//! - No opinion on how they should be used
11//! - Runtime integration is left to higher layers
12//!
13//! # Channel Types
14//!
15//! - **Unbounded MPSC**: Multiple producers, single consumer, unlimited buffer
16//! - **Bounded MPSC**: Multiple producers, single consumer, fixed capacity
17//! - **Oneshot**: Single-use channel for request-response patterns
18//!
19//! # Example
20//!
21//! ```
22//! use reovim_kernel::api::v1::*;
23//!
24//! // Unbounded channel
25//! let (tx, rx) = channel::<i32>();
26//! tx.send(42).unwrap();
27//! assert_eq!(rx.recv().unwrap(), 42);
28//!
29//! // Bounded channel with capacity 2
30//! let (tx, rx) = bounded::<i32>(2);
31//! tx.send(1).unwrap();
32//! tx.send(2).unwrap();
33//! // tx.send(3) would block until space is available
34//!
35//! // Oneshot for request-response
36//! let (tx, rx) = oneshot::<String>();
37//! tx.send("response".to_string()).unwrap();
38//! assert_eq!(rx.recv().unwrap(), "response");
39//! ```
40
41use std::{sync::mpsc, time::Duration};
42
43use reovim_arch::sync::Mutex;
44
45// ============================================================================
46// Unbounded Channel
47// ============================================================================
48
/// Sending half of an unbounded MPSC channel.
///
/// A thin newtype over `std::sync::mpsc::Sender`. Clone it to give another
/// producer a handle to the same channel.
pub struct Sender<T>(mpsc::Sender<T>);

// `Clone` is written by hand: `#[derive(Clone)]` would add a `T: Clone` bound,
// which the wrapped `mpsc::Sender<T>` does not need.
impl<T> Clone for Sender<T> {
    fn clone(&self) -> Self {
        let Self(inner) = self;
        Self(inner.clone())
    }
}
60
/// Receiving half of an unbounded MPSC channel.
///
/// A thin newtype over `std::sync::mpsc::Receiver`. It does not implement
/// `Clone`, so each channel has exactly one consumer.
pub struct Receiver<T>(mpsc::Receiver<T>);
65
/// Error produced by a failed send because the receiver was dropped.
///
/// The value that could not be delivered is handed back in field `0`.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct SendError<T>(pub T);

impl<T> std::fmt::Display for SendError<T> {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        // The payload is left out of the message, so `T` needs no `Display`.
        f.write_str("sending on a closed channel")
    }
}

// `Error` requires `Debug`, and the derived `Debug` exists only for `T: Debug`.
impl<T: std::fmt::Debug> std::error::Error for SendError<T> {}
77
/// Error produced by a failed blocking receive.
///
/// Returned when the channel is closed with nothing left to read. The
/// `recv_timeout` methods in this module also return it when the timeout
/// expires.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct RecvError;

impl std::fmt::Display for RecvError {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.write_str("receiving on a closed channel")
    }
}

impl std::error::Error for RecvError {}
89
/// Reason a non-blocking `try_recv` produced no value.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum TryRecvError {
    /// Nothing is queued right now.
    Empty,
    /// The channel has been closed.
    Disconnected,
}

impl std::fmt::Display for TryRecvError {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let text = match *self {
            Self::Empty => "channel is empty",
            Self::Disconnected => "channel is disconnected",
        };
        f.write_str(text)
    }
}

impl std::error::Error for TryRecvError {}
109
110impl<T> Sender<T> {
111 /// Send a value through the channel.
112 ///
113 /// # Errors
114 ///
115 /// Returns `Err(SendError(value))` if the receiver has been dropped.
116 pub fn send(&self, value: T) -> Result<(), SendError<T>> {
117 self.0.send(value).map_err(|e| SendError(e.0))
118 }
119}
120
121impl<T> std::fmt::Debug for Sender<T> {
122 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
123 f.debug_struct("Sender").finish_non_exhaustive()
124 }
125}
126
127impl<T> Receiver<T> {
128 /// Block until a value is received.
129 ///
130 /// # Errors
131 ///
132 /// Returns `Err(RecvError)` if the channel is closed.
133 pub fn recv(&self) -> Result<T, RecvError> {
134 self.0.recv().map_err(|_| RecvError)
135 }
136
137 /// Try to receive without blocking.
138 ///
139 /// # Errors
140 ///
141 /// Returns `Err(TryRecvError::Empty)` if no message is available,
142 /// or `Err(TryRecvError::Disconnected)` if the channel is closed.
143 pub fn try_recv(&self) -> Result<T, TryRecvError> {
144 self.0.try_recv().map_err(|e| match e {
145 mpsc::TryRecvError::Empty => TryRecvError::Empty,
146 mpsc::TryRecvError::Disconnected => TryRecvError::Disconnected,
147 })
148 }
149
150 /// Block until a value is received or timeout expires.
151 ///
152 /// # Errors
153 ///
154 /// Returns `Err(RecvError)` on timeout or if channel is closed.
155 pub fn recv_timeout(&self, timeout: Duration) -> Result<T, RecvError> {
156 self.0.recv_timeout(timeout).map_err(|_| RecvError)
157 }
158
159 /// Create an iterator over received values.
160 pub fn iter(&self) -> impl Iterator<Item = T> + '_ {
161 self.0.iter()
162 }
163
164 /// Try to receive all available values without blocking.
165 pub fn try_iter(&self) -> impl Iterator<Item = T> + '_ {
166 self.0.try_iter()
167 }
168}
169
170impl<T> std::fmt::Debug for Receiver<T> {
171 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
172 f.debug_struct("Receiver").finish_non_exhaustive()
173 }
174}
175
176/// Create an unbounded MPSC channel.
177///
178/// # Example
179///
180/// ```
181/// use reovim_kernel::api::v1::*;
182///
183/// let (tx, rx) = channel::<i32>();
184/// tx.send(42).unwrap();
185/// assert_eq!(rx.recv().unwrap(), 42);
186/// ```
187#[must_use]
188pub fn channel<T>() -> (Sender<T>, Receiver<T>) {
189 let (tx, rx) = mpsc::channel();
190 (Sender(tx), Receiver(rx))
191}
192
193// ============================================================================
194// Bounded Channel
195// ============================================================================
196
/// Sending half of a bounded MPSC channel.
///
/// A thin newtype over `std::sync::mpsc::SyncSender`.
pub struct BoundedSender<T>(mpsc::SyncSender<T>);

// `Clone` is written by hand: `#[derive(Clone)]` would add a `T: Clone` bound,
// which the wrapped `mpsc::SyncSender<T>` does not need.
impl<T> Clone for BoundedSender<T> {
    fn clone(&self) -> Self {
        Self(mpsc::SyncSender::clone(&self.0))
    }
}
206
/// Receiving half of a bounded MPSC channel.
///
/// Wraps the same `std::sync::mpsc::Receiver` type as the unbounded `Receiver`.
pub struct BoundedReceiver<T>(mpsc::Receiver<T>);
209
/// Reason a non-blocking `try_send` did not deliver its value.
///
/// Both variants hand the undelivered value back to the caller.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum TrySendError<T> {
    /// The channel is full.
    Full(T),
    /// The channel has been closed.
    Disconnected(T),
}

impl<T> std::fmt::Display for TrySendError<T> {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        // The payload is left out of the message, so `T` needs no `Display`.
        let text = match self {
            Self::Full(_) => "channel is full",
            Self::Disconnected(_) => "channel is disconnected",
        };
        f.write_str(text)
    }
}

// `Error` requires `Debug`, and the derived `Debug` exists only for `T: Debug`.
impl<T: std::fmt::Debug> std::error::Error for TrySendError<T> {}
229
230impl<T> BoundedSender<T> {
231 /// Send a value, blocking if the channel is full.
232 ///
233 /// # Errors
234 ///
235 /// Returns `Err(SendError(value))` if the receiver has been dropped.
236 pub fn send(&self, value: T) -> Result<(), SendError<T>> {
237 self.0.send(value).map_err(|e| SendError(e.0))
238 }
239
240 /// Try to send without blocking.
241 ///
242 /// # Errors
243 ///
244 /// Returns `Err(TrySendError::Full(value))` if the channel is full,
245 /// or `Err(TrySendError::Disconnected(value))` if the receiver was dropped.
246 pub fn try_send(&self, value: T) -> Result<(), TrySendError<T>> {
247 self.0.try_send(value).map_err(|e| match e {
248 mpsc::TrySendError::Full(v) => TrySendError::Full(v),
249 mpsc::TrySendError::Disconnected(v) => TrySendError::Disconnected(v),
250 })
251 }
252}
253
254impl<T> std::fmt::Debug for BoundedSender<T> {
255 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
256 f.debug_struct("BoundedSender").finish_non_exhaustive()
257 }
258}
259
260impl<T> BoundedReceiver<T> {
261 /// Block until a value is received.
262 ///
263 /// # Errors
264 ///
265 /// Returns `Err(RecvError)` if the channel is closed.
266 pub fn recv(&self) -> Result<T, RecvError> {
267 self.0.recv().map_err(|_| RecvError)
268 }
269
270 /// Try to receive without blocking.
271 ///
272 /// # Errors
273 ///
274 /// Returns `Err(TryRecvError::Empty)` if no message is available,
275 /// or `Err(TryRecvError::Disconnected)` if the channel is closed.
276 #[cfg_attr(coverage_nightly, coverage(off))]
277 pub fn try_recv(&self) -> Result<T, TryRecvError> {
278 self.0.try_recv().map_err(|e| match e {
279 mpsc::TryRecvError::Empty => TryRecvError::Empty,
280 mpsc::TryRecvError::Disconnected => TryRecvError::Disconnected,
281 })
282 }
283
284 /// Block until a value is received or timeout expires.
285 ///
286 /// # Errors
287 ///
288 /// Returns `Err(RecvError)` on timeout or if the channel is closed.
289 pub fn recv_timeout(&self, timeout: Duration) -> Result<T, RecvError> {
290 self.0.recv_timeout(timeout).map_err(|_| RecvError)
291 }
292
293 /// Create an iterator over received values.
294 pub fn iter(&self) -> impl Iterator<Item = T> + '_ {
295 self.0.iter()
296 }
297
298 /// Try to receive all available values without blocking.
299 pub fn try_iter(&self) -> impl Iterator<Item = T> + '_ {
300 self.0.try_iter()
301 }
302}
303
304impl<T> std::fmt::Debug for BoundedReceiver<T> {
305 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
306 f.debug_struct("BoundedReceiver").finish_non_exhaustive()
307 }
308}
309
310/// Create a bounded MPSC channel with the specified capacity.
311///
312/// Senders will block when the channel is full.
313///
314/// # Example
315///
316/// ```
317/// use reovim_kernel::api::v1::*;
318///
319/// let (tx, rx) = bounded::<i32>(2);
320/// tx.send(1).unwrap();
321/// tx.send(2).unwrap();
322/// // tx.send(3) would block until space is available
323///
324/// assert_eq!(rx.recv().unwrap(), 1);
325/// assert_eq!(rx.recv().unwrap(), 2);
326/// ```
327#[must_use]
328pub fn bounded<T>(capacity: usize) -> (BoundedSender<T>, BoundedReceiver<T>) {
329 let (tx, rx) = mpsc::sync_channel(capacity);
330 (BoundedSender(tx), BoundedReceiver(rx))
331}
332
333// ============================================================================
334// Oneshot Channel
335// ============================================================================
336
337/// Sender for oneshot channel.
338///
339/// Can only send a single value.
340pub struct OneshotSender<T> {
341 inner: Mutex<Option<mpsc::SyncSender<T>>>,
342}
343
/// Receiving half of a oneshot channel.
///
/// A thin newtype over `std::sync::mpsc::Receiver`; it is meant to yield a
/// single value.
pub struct OneshotReceiver<T>(mpsc::Receiver<T>);
348
349impl<T> OneshotSender<T> {
350 /// Send the value, consuming the sender.
351 ///
352 /// This can only be called once.
353 ///
354 /// # Errors
355 ///
356 /// Returns `Err(value)` if the receiver has been dropped.
357 #[cfg_attr(coverage_nightly, coverage(off))]
358 pub fn send(self, value: T) -> Result<(), T> {
359 let sender = self.inner.lock().take();
360 match sender {
361 Some(tx) => tx.send(value).map_err(|e| e.0),
362 None => Err(value),
363 }
364 }
365
366 /// Check if the receiver is still waiting.
367 #[must_use]
368 pub fn is_connected(&self) -> bool {
369 self.inner.lock().is_some()
370 }
371}
372
373impl<T> std::fmt::Debug for OneshotSender<T> {
374 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
375 f.debug_struct("OneshotSender")
376 .field("connected", &self.is_connected())
377 .finish()
378 }
379}
380
381impl<T> OneshotReceiver<T> {
382 /// Block until the value is received.
383 ///
384 /// # Errors
385 ///
386 /// Returns `Err(RecvError)` if the sender was dropped without sending.
387 pub fn recv(self) -> Result<T, RecvError> {
388 self.0.recv().map_err(|_| RecvError)
389 }
390
391 /// Try to receive without blocking.
392 ///
393 /// # Errors
394 ///
395 /// Returns `Err(TryRecvError::Empty)` if no value is available yet,
396 /// or `Err(TryRecvError::Disconnected)` if the sender was dropped.
397 pub fn try_recv(&self) -> Result<T, TryRecvError> {
398 self.0.try_recv().map_err(|e| match e {
399 mpsc::TryRecvError::Empty => TryRecvError::Empty,
400 mpsc::TryRecvError::Disconnected => TryRecvError::Disconnected,
401 })
402 }
403
404 /// Block until received or timeout expires.
405 ///
406 /// # Errors
407 ///
408 /// Returns `Err(RecvError)` on timeout or if the sender was dropped.
409 pub fn recv_timeout(&self, timeout: Duration) -> Result<T, RecvError> {
410 self.0.recv_timeout(timeout).map_err(|_| RecvError)
411 }
412}
413
414impl<T> std::fmt::Debug for OneshotReceiver<T> {
415 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
416 f.debug_struct("OneshotReceiver").finish_non_exhaustive()
417 }
418}
419
420/// Create a oneshot channel for single-value communication.
421///
422/// Useful for request-response patterns where exactly one response is expected.
423///
424/// # Example
425///
426/// ```
427/// use reovim_kernel::api::v1::*;
428/// use std::thread;
429///
430/// let (tx, rx) = oneshot::<String>();
431///
432/// thread::spawn(move || {
433/// tx.send("response".to_string()).unwrap();
434/// });
435///
436/// assert_eq!(rx.recv().unwrap(), "response");
437/// ```
438#[must_use]
439pub fn oneshot<T>() -> (OneshotSender<T>, OneshotReceiver<T>) {
440 let (tx, rx) = mpsc::sync_channel(1);
441 (
442 OneshotSender {
443 inner: Mutex::new(Some(tx)),
444 },
445 OneshotReceiver(rx),
446 )
447}