ruchei_route/
lib.rs

1//! [`Sink`]s with routing inspired by ZeroMQ's `ROUTER` sockets.
2//!
3//! This model sits somewhere between explicit connection management and ZMQ-like routing trying to
4//! be a reasonable abstraction around both, with some trade-offs.
5//!
6//! ## [`Sink`]s
7//!
8//! - [`RouteSink::poll_ready_any`]
9//! - [`RouteSink::start_send`]
10//! - [`RouteSink::poll_flush_all`]
11//! - [`RouteSink::poll_close`]
12//!
13//! ## Proper [`RouteSink`]s
14//!
15//! - [`RouteSink::poll_ready`]
16//! - [`RouteSink::start_send`]
17//! - [`RouteSink::poll_flush`]
18//! - [`RouteSink::poll_close`]
19//!
20//! ## [`RouteSink`] as a trait union
21//!
22//! - [`RouteSink::is_routing`]
23//!
24//! ## Dynamicity
25//!
26//! ### Object safety
27//!
28//! [`RouteSink`] method is object safe.
29//!
30//! *However*, we don't provide any methods to upcast to [`Sink`], since we believe plain [`Sink`]
31//! and [`RouteSink`] traits don't represent our target usecases, *specifically networking*, which
32//! involve *`Stream`s* as a necessary component of the object. Since this crate doesn't depend on
33//! `futures-core`, we don't provide `dyn Stream + ...` either. Another consideration is to provide
34//! mechanisms for creating upcasting instead of providing upcasting itself, to, for example, allow
35//! for a more efficient FFI functionality.
36//!
37//! ### FFI
38//!
39//! ***Coming Soon...***
40
41#![no_std]
42#![cfg_attr(docsrs, feature(doc_auto_cfg))]
43#![cfg_attr(docsrs, feature(doc_cfg_hide))]
44#![cfg_attr(docsrs, doc(cfg_hide(doc)))]
45
46use core::{
47    pin::Pin,
48    task::{Context, Poll},
49};
50
51use futures_sink::Sink;
52
53/// [`Sink`] with `route` provided to some methods.
54///
55/// This trait is considered weaker than [`Sink<(Route, Msg)>`] (thus is blanked-implemented for it).
56///
57/// Unless specified otherwise by the implementor, all operations on [`RouteSink`] for all routes
58/// (and all route-independent fallible operations) are considered invalid once an error occurs.
59///
60/// ## For [`Sink`]s
61///
62/// This trait is implemented for all [`Sink<(Route, Msg)>`]s:
63///
64/// - [`RouteSink::Error`] forwards [`Sink::Error`]
65/// - [`RouteSink::poll_ready`] discards the route and calls [`RouteSink::poll_ready_any`]
66/// - [`RouteSink::poll_ready_any`] forwards [`Sink::poll_ready`]
67/// - [`RouteSink::start_send`] forwards [`Sink::start_send`]
68/// - [`RouteSink::poll_flush`] discards the route and calls [`RouteSink::poll_flush_all`]
69/// - [`RouteSink::poll_flush_all`] forwards [`Sink::poll_flush`]
70/// - [`RouteSink::poll_close`] forwards [`Sink::poll_close`]
71/// - [`RouteSink::is_routing`] returns `false`
72///
73/// **NOTE**: this arrangement conflates wakers from separate routes as one, potentially leading to
74/// losing track of them. Check for [`is_routing`] to check if the implementation comes from a
75/// [`Sink`] (in which case [`is_routing`] is `false`).
76///
77/// [`is_routing`]: RouteSink::is_routing
78/// [`Sink<(Route, Msg)>`]: Sink
79#[must_use = "sinks do nothing unless polled"]
80pub trait RouteSink<Route, Msg> {
81    /// The type of value produced by the sink when an error occurs.
82    ///
83    /// See also: [`Sink::Error`] ([forwarded][RouteSink#for-sinks] by this associated type)
84    type Error;
85
86    /// Attempts to prepare the [`RouteSink`] to receive a message on a specified route.
87    ///
88    /// Must return `Poll::Ready(Ok(()))` before each call to [`start_send`] on the same route.
89    ///
90    /// See also: [`Sink::poll_ready`], [`RouteSink::poll_ready_any`]
91    ///
92    /// [`start_send`]: RouteSink::start_send
93    fn poll_ready(
94        self: Pin<&mut Self>,
95        route: &Route,
96        cx: &mut Context<'_>,
97    ) -> Poll<Result<(), Self::Error>>;
98
99    /// Attempt to prepare the [`RouteSink`] to receive a message on an arbitrary route.
100    ///
101    /// Forever pending by default. Must be implemented when [`is_routing`] returns `false`.
102    ///
103    /// If this returns `Poll::Ready(Ok(()))`, call to [`start_send`] should be as correct as after
104    /// [`poll_ready`] with a specific route.
105    ///
106    /// See also: [`Sink::poll_ready`] ([forwarded][RouteSink#for-sinks] by this method),
107    /// [`RouteSink::poll_ready`]
108    ///
109    /// [`start_send`]: RouteSink::start_send
110    /// [`poll_ready`]: RouteSink::poll_ready
111    /// [`is_routing`]: RouteSink::is_routing
112    fn poll_ready_any(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
113        let _ = cx;
114        Poll::Pending
115    }
116
117    /// Begin the process of sending a message to the [`RouteSink`] on a specified route.
118    ///
119    /// Each call to this method must be preceeded by [`poll_ready`] returning `Poll::Ready(Ok(()))`
120    /// on the same route (or [`poll_ready_any`] without a route).
121    ///
122    /// May or may not trigger the actual sending process. To guarantee the delivery, use either
123    /// [`poll_flush`] on the same route, [`poll_flush_all`] if supported, or [`poll_close`] on the
124    /// whole sink.
125    ///
126    /// See also: [`Sink::start_send`] ([forwarded][RouteSink#for-sinks] by this method)
127    ///
128    /// [`poll_ready`]: RouteSink::poll_ready
129    /// [`poll_ready_any`]: RouteSink::poll_ready_any
130    /// [`poll_flush`]: RouteSink::poll_flush
131    /// [`poll_flush_all`]: RouteSink::poll_flush_all
132    /// [`poll_close`]: RouteSink::poll_close
133    fn start_send(self: Pin<&mut Self>, route: Route, msg: Msg) -> Result<(), Self::Error>;
134
135    /// Flush all the remaining items on a specified route.
136    ///
137    /// Forever pending by default. Must be implemented when [`is_routing`] returns `false`.
138    ///
139    /// Returns `Poll::Ready(Ok(()))` once all items sent via [`start_send`] on that route have been
140    /// flushed from the buffer to the underlying transport.
141    ///
142    /// See also: [`Sink::poll_flush`], [`RouteSink::poll_flush_all`]
143    ///
144    /// [`start_send`]: RouteSink::start_send
145    /// [`is_routing`]: RouteSink::is_routing
146    fn poll_flush(
147        self: Pin<&mut Self>,
148        route: &Route,
149        cx: &mut Context<'_>,
150    ) -> Poll<Result<(), Self::Error>>;
151
152    /// Flush all the routes.
153    ///
154    /// Returns `Poll::Ready(Ok(()))` once all items sent via [`start_send`] on all routes have been
155    /// flushed from the buffer to the underlying transport.
156    ///
157    /// See also: [`Sink::poll_flush`] ([forwarded][RouteSink#for-sinks] by this method),
158    /// [`RouteSink::poll_flush`]
159    ///
160    /// [`start_send`]: RouteSink::start_send
161    /// flushed from the buffer to the underlying transport.
162    fn poll_flush_all(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
163        let _ = cx;
164        Poll::Pending
165    }
166
167    /// Flush and close all the routes.
168    ///
169    /// All other operations on the sink are considered invalid as soon as it has been polled for
170    /// close for the first time.
171    ///
172    /// [`RouteSink`] doesn't provide a method to close a specific route yet, since most interfaces,
173    /// that we've looked into, don't. It might be added later on with an empty (returning
174    /// `Poll::Ready(Ok(()))`) implementation (as not to break compatibility). If you really need
175    /// that type of control, it is recommended to use message transports that do explicit sessions
176    /// or connections, like TCP, [TIPC] or WebSocket.
177    ///
178    /// See also: [`Sink::poll_close`] ([forwarded][RouteSink#for-sinks] by this method)
179    ///
180    /// [TIPC]: <https://tipc.sourceforge.net/>
181    fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>>;
182
183    /// Whether this [`RouteSink`] itself is doing any actual routing and keeping wakers separate.
184    ///
185    /// The value must be constant for the whole lifetime of the sink. It is defined on `&self`
186    /// instead of the type itself to keep the trait object-safe.
187    ///
188    /// If the value is `false`, it should be taken as [`poll_ready`] and [`poll_flush`] being
189    /// route-unaware and the whole thing acting like a regular [`Sink`]. This is important for
190    /// systems that normally assume each ready/flushed route state to be independent (thus not
191    /// polling other routes once one of them is done, which leaves them unprocessed forever).
192    ///
193    /// Another implication of not routing is that [`poll_ready_any`] and [`poll_flush_all`] are
194    /// valid and should succeed or fail eventually.
195    ///
196    /// [`poll_ready`]: RouteSink::poll_ready
197    /// [`poll_ready_any`]: RouteSink::poll_ready_any
198    /// [`poll_flush`]: RouteSink::poll_flush
199    /// [`poll_flush_all`]: RouteSink::poll_flush_all
200    #[must_use]
201    fn is_routing(&self) -> bool {
202        true
203    }
204}
205
206/// See [notes][RouteSink#for-sinks]
207#[diagnostic::do_not_recommend]
208impl<Route, Msg, E, T: ?Sized + Sink<(Route, Msg), Error = E>> RouteSink<Route, Msg> for T {
209    type Error = E;
210
211    fn poll_ready(
212        self: Pin<&mut Self>,
213        _: &Route,
214        cx: &mut Context<'_>,
215    ) -> Poll<Result<(), Self::Error>> {
216        self.poll_ready_any(cx)
217    }
218
219    fn poll_ready_any(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
220        Sink::poll_ready(self, cx)
221    }
222
223    fn start_send(self: Pin<&mut Self>, route: Route, msg: Msg) -> Result<(), Self::Error> {
224        Sink::start_send(self, (route, msg))
225    }
226
227    fn poll_flush(
228        self: Pin<&mut Self>,
229        _: &Route,
230        cx: &mut Context<'_>,
231    ) -> Poll<Result<(), Self::Error>> {
232        self.poll_flush_all(cx)
233    }
234
235    fn poll_flush_all(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
236        Sink::poll_flush(self, cx)
237    }
238
239    fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
240        Sink::poll_close(self, cx)
241    }
242
243    fn is_routing(&self) -> bool {
244        false
245    }
246}
247
248/// [`Sink`] for [`RouteSink`] and a specific route.
249#[derive(Debug)]
250#[must_use = "sinks do nothing unless polled"]
251pub struct WithRoute<'a, T: ?Sized, Route> {
252    route_sink: Pin<&'a mut T>,
253    route: Route,
254}
255
256impl<T: ?Sized, Route> Unpin for WithRoute<'_, T, Route> {}
257
258impl<Route: Clone, Msg, E, T: ?Sized + RouteSink<Route, Msg, Error = E>> Sink<Msg>
259    for WithRoute<'_, T, Route>
260{
261    type Error = E;
262
263    fn poll_ready(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
264        let this = self.get_mut();
265        this.route_sink.as_mut().poll_ready(&this.route, cx)
266    }
267
268    fn start_send(self: Pin<&mut Self>, msg: Msg) -> Result<(), Self::Error> {
269        let this = self.get_mut();
270        this.route_sink.as_mut().start_send(this.route.clone(), msg)
271    }
272
273    fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
274        let this = self.get_mut();
275        this.route_sink.as_mut().poll_flush(&this.route, cx)
276    }
277
278    fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
279        self.get_mut().route_sink.as_mut().poll_close(cx)
280    }
281}
282
283/// Extension trait to make [`WithRoute`] from some mutable (pinned) reference to [`RouteSink`].
284pub trait RouteExt<Route> {
285    /// [`RouteSink`] being mutably referred to.
286    type T: ?Sized;
287
288    /// Create a route-specific [`Sink`].
289    fn route(&mut self, route: Route) -> WithRoute<'_, Self::T, Route>;
290}
291
292impl<T: ?Sized, Route> RouteExt<Route> for Pin<&'_ mut T> {
293    type T = T;
294
295    fn route(&mut self, route: Route) -> WithRoute<'_, Self::T, Route> {
296        WithRoute {
297            route_sink: self.as_mut(),
298            route,
299        }
300    }
301}
302
303impl<T: ?Sized + Unpin, Route> RouteExt<Route> for &'_ mut T {
304    type T = T;
305
306    fn route(&mut self, route: Route) -> WithRoute<'_, Self::T, Route> {
307        WithRoute {
308            route_sink: Pin::new(self),
309            route,
310        }
311    }
312}
313
314/// Assert that [`RouteSink`] is derived from [`Sink`].
315#[cfg(feature = "unroute")]
316#[pin_project::pin_project]
317#[derive(Debug, Default, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash)]
318pub struct Unroute<S>(#[pin] pub S);
319
320#[cfg(feature = "unroute")]
321impl<S> Unroute<S> {
322    /// Create a [`Sink`] out of a [`RouteSink`] and asserts that it's derived from a [`Sink`].
323    ///
324    /// # Panics
325    ///
326    /// Panics if [`RouteSink::is_routing`] is `true`.
327    #[must_use]
328    pub fn new_checked<Route, Msg>(sink: S) -> Self
329    where
330        S: RouteSink<Route, Msg>,
331    {
332        assert!(!sink.is_routing());
333        Self(sink)
334    }
335}
336
337#[cfg(feature = "unroute")]
338impl<S: futures_core::Stream> futures_core::Stream for Unroute<S> {
339    type Item = S::Item;
340
341    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
342        self.project().0.poll_next(cx)
343    }
344}
345
346#[cfg(feature = "unroute")]
347impl<Route, Msg, S: RouteSink<Route, Msg>> Sink<(Route, Msg)> for Unroute<S> {
348    type Error = S::Error;
349
350    fn poll_ready(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
351        // This used to be a `debug_assert!`. Then we got hit with violation of this precondition in
352        // a release build. On `RouteSink`s statically known to be `Sink`s this should be zero-cost,
353        // as it inlines `false`, and thus removes the assert altogether. So it is now an `assert!`.
354        assert!(!self.0.is_routing());
355        self.project().0.poll_ready_any(cx)
356    }
357
358    fn start_send(self: Pin<&mut Self>, (route, msg): (Route, Msg)) -> Result<(), Self::Error> {
359        assert!(!self.0.is_routing());
360        self.project().0.start_send(route, msg)
361    }
362
363    fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
364        assert!(!self.0.is_routing());
365        self.project().0.poll_flush_all(cx)
366    }
367
368    fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
369        self.project().0.poll_close(cx)
370    }
371}