codemp/api/
controller.rs

1//! # Controller
2//!
3//! A bidirectional stream handler to easily manage asynchronous operations between local buffers
4//! and the server.
5
6use crate::errors::ControllerResult;
7
8// note that we don't use thiserror's #[from] because we don't want the error structs to contain
9// these foreign types, and also we want these to be easily constructable
10
11/// Asynchronous and thread-safe handle to a generic bidirectional stream. Exists as a combination
12/// of [`AsyncSender`] and [`AsyncReceiver`].
13///
14/// This generic trait is implemented by actors managing stream procedures, and will generally
15/// imply a background worker.
16///
17/// Events can be enqueued for dispatching without blocking with [`AsyncSender::send`].
18///
19/// For receiving events from the server, an asynchronous API with [`AsyncReceiver::recv`] is
20/// provided; if that is not feasible, consider using [`AsyncReceiver::callback`] or, alternatively,
21/// [`AsyncReceiver::poll`] combined with [`AsyncReceiver::try_recv`].
22///
23/// Every [`Controller`]'s worker will stop cleanly when all references to its [`Controller`] have
24/// been dropped.
25///
26/// [`crate::ext::select_buffer`] may provide a useful helper for managing multiple controllers.
27#[allow(async_fn_in_trait)]
28#[cfg_attr(feature = "async-trait", async_trait::async_trait)]
29pub trait Controller<Tx, Rx = Tx>: AsyncSender<Tx> + AsyncReceiver<Rx>
30where
31	Tx: Sized + Sync + Send,
32	Rx: Sized + Sync + Send,
33{
34}
35
36/// Asynchronous and thread-safe handle to send data over a stream.
37/// See [`Controller`]'s documentation for details.
38///
39/// Details about the receiving end are left to the implementor.
40pub trait AsyncSender<T: Sized + Send + Sync>: Sized + Send + Sync {
41	/// Enqueue a new value to be sent to all other users without blocking
42	fn send(&self, x: T) -> ControllerResult<()>;
43}
44
45/// Asynchronous and thread-safe handle to receive data from a stream.
46/// See [`Controller`]'s documentation for details.
47///
48/// Details about the sender are left to the implementor.
49#[allow(async_fn_in_trait)]
50#[cfg_attr(feature = "async-trait", async_trait::async_trait)]
51pub trait AsyncReceiver<T: Sized + Send + Sync>: Sized + Send + Sync {
52	/// Block until a value is available and returns it.
53	async fn recv(&self) -> ControllerResult<T> {
54		loop {
55			self.poll().await?;
56			if let Some(x) = self.try_recv().await? {
57				break Ok(x);
58			}
59		}
60	}
61
62	/// Register a callback to be called on receive.
63	///
64	/// There can only be one callback registered at any given time.
65	fn callback(&self, cb: impl Into<ControllerCallback<Self>>);
66
67	/// Clear the currently registered callback.
68	fn clear_callback(&self);
69
70	/// Block until a value is available, without consuming it.
71	async fn poll(&self) -> ControllerResult<()>;
72
73	/// Attempt to receive a value, return None if nothing is currently available.
74	async fn try_recv(&self) -> ControllerResult<Option<T>>;
75}
76
77/// Type wrapper for Boxed dynamic callback.
78pub struct ControllerCallback<T>(pub Box<dyn Sync + Send + Fn(T)>);
79
80impl<T> ControllerCallback<T> {
81	pub(crate) fn call(&self, x: T) {
82		self.0(x)
83	}
84}
85
86impl<T> std::fmt::Debug for ControllerCallback<T> {
87	fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
88		write!(f, "ControllerCallback {{ {:p} }}", self.0)
89	}
90}
91
92impl<T, X: Sync + Send + Fn(T) + 'static> From<X> for ControllerCallback<T> {
93	fn from(value: X) -> Self {
94		Self(Box::new(value))
95	}
96}