codemp/api/controller.rs
1//! # Controller
2//!
3//! A bidirectional stream handler to easily manage asynchronous operations between local buffers
4//! and the server.
5
6use crate::errors::ControllerResult;
7
8// note that we don't use thiserror's #[from] because we don't want the error structs to contain
9// these foreign types, and also we want these to be easily constructable
10
11/// Asynchronous and thread-safe handle to a generic bidirectional stream. Exists as a combination
12/// of [`AsyncSender`] and [`AsyncReceiver`].
13///
14/// This generic trait is implemented by actors managing stream procedures, and will generally
15/// imply a background worker.
16///
17/// Events can be enqueued for dispatching without blocking with [`AsyncSender::send`].
18///
19/// For receiving events from the server, an asynchronous API with [`AsyncReceiver::recv`] is
20/// provided; if that is not feasible, consider using [`AsyncReceiver::callback`] or, alternatively,
21/// [`AsyncReceiver::poll`] combined with [`AsyncReceiver::try_recv`].
22///
23/// Every [`Controller`]'s worker will stop cleanly when all references to its [`Controller`] have
24/// been dropped.
25///
26/// [`crate::ext::select_buffer`] may provide a useful helper for managing multiple controllers.
27#[allow(async_fn_in_trait)]
28#[cfg_attr(feature = "async-trait", async_trait::async_trait)]
29pub trait Controller<Tx, Rx = Tx>: AsyncSender<Tx> + AsyncReceiver<Rx>
30where
31 Tx: Sized + Sync + Send,
32 Rx: Sized + Sync + Send,
33{
34}
35
36/// Asynchronous and thread-safe handle to send data over a stream.
37/// See [`Controller`]'s documentation for details.
38///
39/// Details about the receiving end are left to the implementor.
40pub trait AsyncSender<T: Sized + Send + Sync>: Sized + Send + Sync {
41 /// Enqueue a new value to be sent to all other users without blocking
42 fn send(&self, x: T) -> ControllerResult<()>;
43}
44
45/// Asynchronous and thread-safe handle to receive data from a stream.
46/// See [`Controller`]'s documentation for details.
47///
48/// Details about the sender are left to the implementor.
49#[allow(async_fn_in_trait)]
50#[cfg_attr(feature = "async-trait", async_trait::async_trait)]
51pub trait AsyncReceiver<T: Sized + Send + Sync>: Sized + Send + Sync {
52 /// Block until a value is available and returns it.
53 async fn recv(&self) -> ControllerResult<T> {
54 loop {
55 self.poll().await?;
56 if let Some(x) = self.try_recv().await? {
57 break Ok(x);
58 }
59 }
60 }
61
62 /// Register a callback to be called on receive.
63 ///
64 /// There can only be one callback registered at any given time.
65 fn callback(&self, cb: impl Into<ControllerCallback<Self>>);
66
67 /// Clear the currently registered callback.
68 fn clear_callback(&self);
69
70 /// Block until a value is available, without consuming it.
71 async fn poll(&self) -> ControllerResult<()>;
72
73 /// Attempt to receive a value, return None if nothing is currently available.
74 async fn try_recv(&self) -> ControllerResult<Option<T>>;
75}
76
77/// Type wrapper for Boxed dynamic callback.
78pub struct ControllerCallback<T>(pub Box<dyn Sync + Send + Fn(T)>);
79
80impl<T> ControllerCallback<T> {
81 pub(crate) fn call(&self, x: T) {
82 self.0(x)
83 }
84}
85
86impl<T> std::fmt::Debug for ControllerCallback<T> {
87 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
88 write!(f, "ControllerCallback {{ {:p} }}", self.0)
89 }
90}
91
92impl<T, X: Sync + Send + Fn(T) + 'static> From<X> for ControllerCallback<T> {
93 fn from(value: X) -> Self {
94 Self(Box::new(value))
95 }
96}