ruchei_route/lib.rs
1//! [`Sink`]s with routing inspired by ZeroMQ's `ROUTER` sockets.
2//!
//! This model sits somewhere between explicit connection management and ZMQ-like routing, trying
//! to be a reasonable abstraction around both, with some trade-offs.
5//!
6//! ## [`Sink`]s
7//!
8//! - [`RouteSink::poll_ready_any`]
9//! - [`RouteSink::start_send`]
10//! - [`RouteSink::poll_flush_all`]
11//! - [`RouteSink::poll_close`]
12//!
13//! ## Proper [`RouteSink`]s
14//!
15//! - [`RouteSink::poll_ready`]
16//! - [`RouteSink::start_send`]
17//! - [`RouteSink::poll_flush`]
18//! - [`RouteSink::poll_close`]
19//!
20//! ## [`RouteSink`] as a trait union
21//!
22//! - [`RouteSink::is_routing`]
23//!
24//! ## Dynamicity
25//!
26//! ### Object safety
27//!
//! [`RouteSink`] is object safe.
29//!
30//! *However*, we don't provide any methods to upcast to [`Sink`], since we believe plain [`Sink`]
31//! and [`RouteSink`] traits don't represent our target usecases, *specifically networking*, which
32//! involve *`Stream`s* as a necessary component of the object. Since this crate doesn't depend on
33//! `futures-core`, we don't provide `dyn Stream + ...` either. Another consideration is to provide
34//! mechanisms for creating upcasting instead of providing upcasting itself, to, for example, allow
35//! for a more efficient FFI functionality.
36//!
37//! ### FFI
38//!
39//! ***Coming Soon...***
40
41#![no_std]
42#![cfg_attr(docsrs, feature(doc_auto_cfg))]
43#![cfg_attr(docsrs, feature(doc_cfg_hide))]
44#![cfg_attr(docsrs, doc(cfg_hide(doc)))]
45
46use core::{
47 pin::Pin,
48 task::{Context, Poll},
49};
50
51use futures_sink::Sink;
52
/// [`Sink`] with `route` provided to some methods.
///
/// This trait is considered weaker than [`Sink<(Route, Msg)>`] (thus is blanket-implemented for
/// it).
///
/// Unless specified otherwise by the implementor, all operations on [`RouteSink`] for all routes
/// (and all route-independent fallible operations) are considered invalid once an error occurs.
///
/// ## For [`Sink`]s
///
/// This trait is implemented for all [`Sink<(Route, Msg)>`]s:
///
/// - [`RouteSink::Error`] forwards [`Sink::Error`]
/// - [`RouteSink::poll_ready`] discards the route and calls [`RouteSink::poll_ready_any`]
/// - [`RouteSink::poll_ready_any`] forwards [`Sink::poll_ready`]
/// - [`RouteSink::start_send`] forwards [`Sink::start_send`]
/// - [`RouteSink::poll_flush`] discards the route and calls [`RouteSink::poll_flush_all`]
/// - [`RouteSink::poll_flush_all`] forwards [`Sink::poll_flush`]
/// - [`RouteSink::poll_close`] forwards [`Sink::poll_close`]
/// - [`RouteSink::is_routing`] returns `false`
///
/// **NOTE**: this arrangement conflates wakers from separate routes as one, potentially leading to
/// losing track of them. Check [`is_routing`] to find out whether the implementation comes from a
/// [`Sink`] (in which case [`is_routing`] is `false`).
///
/// [`is_routing`]: RouteSink::is_routing
/// [`Sink<(Route, Msg)>`]: Sink
#[must_use = "sinks do nothing unless polled"]
pub trait RouteSink<Route, Msg> {
    /// The type of value produced by the sink when an error occurs.
    ///
    /// See also: [`Sink::Error`] ([forwarded][RouteSink#for-sinks] by this associated type)
    type Error;

    /// Attempts to prepare the [`RouteSink`] to receive a message on a specified route.
    ///
    /// Must return `Poll::Ready(Ok(()))` before each call to [`start_send`] on the same route.
    ///
    /// See also: [`Sink::poll_ready`], [`RouteSink::poll_ready_any`]
    ///
    /// [`start_send`]: RouteSink::start_send
    fn poll_ready(
        self: Pin<&mut Self>,
        route: &Route,
        cx: &mut Context<'_>,
    ) -> Poll<Result<(), Self::Error>>;

    /// Attempts to prepare the [`RouteSink`] to receive a message on an arbitrary route.
    ///
    /// Forever pending by default. Must be implemented when [`is_routing`] returns `false`.
    ///
    /// If this returns `Poll::Ready(Ok(()))`, a call to [`start_send`] should be as correct as
    /// after [`poll_ready`] with a specific route.
    ///
    /// See also: [`Sink::poll_ready`] ([forwarded][RouteSink#for-sinks] by this method),
    /// [`RouteSink::poll_ready`]
    ///
    /// [`start_send`]: RouteSink::start_send
    /// [`poll_ready`]: RouteSink::poll_ready
    /// [`is_routing`]: RouteSink::is_routing
    fn poll_ready_any(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
        // The context is deliberately ignored: the default never registers a waker, so it stays
        // pending forever unless the implementor overrides this method.
        let _ = cx;
        Poll::Pending
    }

    /// Begins the process of sending a message to the [`RouteSink`] on a specified route.
    ///
    /// Each call to this method must be preceded by [`poll_ready`] returning `Poll::Ready(Ok(()))`
    /// on the same route (or [`poll_ready_any`] without a route).
    ///
    /// May or may not trigger the actual sending process. To guarantee the delivery, use either
    /// [`poll_flush`] on the same route, [`poll_flush_all`] if supported, or [`poll_close`] on the
    /// whole sink.
    ///
    /// See also: [`Sink::start_send`] ([forwarded][RouteSink#for-sinks] by this method)
    ///
    /// [`poll_ready`]: RouteSink::poll_ready
    /// [`poll_ready_any`]: RouteSink::poll_ready_any
    /// [`poll_flush`]: RouteSink::poll_flush
    /// [`poll_flush_all`]: RouteSink::poll_flush_all
    /// [`poll_close`]: RouteSink::poll_close
    fn start_send(self: Pin<&mut Self>, route: Route, msg: Msg) -> Result<(), Self::Error>;

    /// Flushes all the remaining items on a specified route.
    ///
    /// Returns `Poll::Ready(Ok(()))` once all items sent via [`start_send`] on that route have been
    /// flushed from the buffer to the underlying transport.
    ///
    /// See also: [`Sink::poll_flush`], [`RouteSink::poll_flush_all`]
    ///
    /// [`start_send`]: RouteSink::start_send
    fn poll_flush(
        self: Pin<&mut Self>,
        route: &Route,
        cx: &mut Context<'_>,
    ) -> Poll<Result<(), Self::Error>>;

    /// Flushes all the routes.
    ///
    /// Forever pending by default. Must be implemented when [`is_routing`] returns `false`.
    ///
    /// Returns `Poll::Ready(Ok(()))` once all items sent via [`start_send`] on all routes have been
    /// flushed from the buffer to the underlying transport.
    ///
    /// See also: [`Sink::poll_flush`] ([forwarded][RouteSink#for-sinks] by this method),
    /// [`RouteSink::poll_flush`]
    ///
    /// [`start_send`]: RouteSink::start_send
    /// [`is_routing`]: RouteSink::is_routing
    fn poll_flush_all(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
        // Same as `poll_ready_any`: no waker is registered, so the default never completes.
        let _ = cx;
        Poll::Pending
    }

    /// Flushes and closes all the routes.
    ///
    /// All other operations on the sink are considered invalid as soon as it has been polled for
    /// close for the first time.
    ///
    /// [`RouteSink`] doesn't provide a method to close a specific route yet, since most interfaces
    /// that we've looked into don't. It might be added later on with an empty (returning
    /// `Poll::Ready(Ok(()))`) implementation (so as not to break compatibility). If you really need
    /// that type of control, it is recommended to use message transports that do explicit sessions
    /// or connections, like TCP, [TIPC] or WebSocket.
    ///
    /// See also: [`Sink::poll_close`] ([forwarded][RouteSink#for-sinks] by this method)
    ///
    /// [TIPC]: <https://tipc.sourceforge.net/>
    fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>>;

    /// Whether this [`RouteSink`] itself is doing any actual routing and keeping wakers separate.
    ///
    /// The value must be constant for the whole lifetime of the sink. It is defined on `&self`
    /// instead of the type itself to keep the trait object-safe.
    ///
    /// If the value is `false`, it should be taken as [`poll_ready`] and [`poll_flush`] being
    /// route-unaware and the whole thing acting like a regular [`Sink`]. This is important for
    /// systems that normally assume each ready/flushed route state to be independent (thus not
    /// polling other routes once one of them is done, which leaves them unprocessed forever).
    ///
    /// Another implication of not routing is that [`poll_ready_any`] and [`poll_flush_all`] are
    /// valid and should succeed or fail eventually.
    ///
    /// [`poll_ready`]: RouteSink::poll_ready
    /// [`poll_ready_any`]: RouteSink::poll_ready_any
    /// [`poll_flush`]: RouteSink::poll_flush
    /// [`poll_flush_all`]: RouteSink::poll_flush_all
    #[must_use]
    fn is_routing(&self) -> bool {
        true
    }
}
205
206/// See [notes][RouteSink#for-sinks]
207#[diagnostic::do_not_recommend]
208impl<Route, Msg, E, T: ?Sized + Sink<(Route, Msg), Error = E>> RouteSink<Route, Msg> for T {
209 type Error = E;
210
211 fn poll_ready(
212 self: Pin<&mut Self>,
213 _: &Route,
214 cx: &mut Context<'_>,
215 ) -> Poll<Result<(), Self::Error>> {
216 self.poll_ready_any(cx)
217 }
218
219 fn poll_ready_any(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
220 Sink::poll_ready(self, cx)
221 }
222
223 fn start_send(self: Pin<&mut Self>, route: Route, msg: Msg) -> Result<(), Self::Error> {
224 Sink::start_send(self, (route, msg))
225 }
226
227 fn poll_flush(
228 self: Pin<&mut Self>,
229 _: &Route,
230 cx: &mut Context<'_>,
231 ) -> Poll<Result<(), Self::Error>> {
232 self.poll_flush_all(cx)
233 }
234
235 fn poll_flush_all(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
236 Sink::poll_flush(self, cx)
237 }
238
239 fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
240 Sink::poll_close(self, cx)
241 }
242
243 fn is_routing(&self) -> bool {
244 false
245 }
246}
247
248/// [`Sink`] for [`RouteSink`] and a specific route.
249#[derive(Debug)]
250#[must_use = "sinks do nothing unless polled"]
251pub struct WithRoute<'a, T: ?Sized, Route> {
252 route_sink: Pin<&'a mut T>,
253 route: Route,
254}
255
256impl<T: ?Sized, Route> Unpin for WithRoute<'_, T, Route> {}
257
258impl<Route: Clone, Msg, E, T: ?Sized + RouteSink<Route, Msg, Error = E>> Sink<Msg>
259 for WithRoute<'_, T, Route>
260{
261 type Error = E;
262
263 fn poll_ready(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
264 let this = self.get_mut();
265 this.route_sink.as_mut().poll_ready(&this.route, cx)
266 }
267
268 fn start_send(self: Pin<&mut Self>, msg: Msg) -> Result<(), Self::Error> {
269 let this = self.get_mut();
270 this.route_sink.as_mut().start_send(this.route.clone(), msg)
271 }
272
273 fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
274 let this = self.get_mut();
275 this.route_sink.as_mut().poll_flush(&this.route, cx)
276 }
277
278 fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
279 self.get_mut().route_sink.as_mut().poll_close(cx)
280 }
281}
282
283/// Extension trait to make [`WithRoute`] from some mutable (pinned) reference to [`RouteSink`].
284pub trait RouteExt<Route> {
285 /// [`RouteSink`] being mutably referred to.
286 type T: ?Sized;
287
288 /// Create a route-specific [`Sink`].
289 fn route(&mut self, route: Route) -> WithRoute<'_, Self::T, Route>;
290}
291
292impl<T: ?Sized, Route> RouteExt<Route> for Pin<&'_ mut T> {
293 type T = T;
294
295 fn route(&mut self, route: Route) -> WithRoute<'_, Self::T, Route> {
296 WithRoute {
297 route_sink: self.as_mut(),
298 route,
299 }
300 }
301}
302
303impl<T: ?Sized + Unpin, Route> RouteExt<Route> for &'_ mut T {
304 type T = T;
305
306 fn route(&mut self, route: Route) -> WithRoute<'_, Self::T, Route> {
307 WithRoute {
308 route_sink: Pin::new(self),
309 route,
310 }
311 }
312}
313
/// Assert that [`RouteSink`] is derived from [`Sink`].
///
/// Thin wrapper around a [`RouteSink`]; the wrapped value is a public, structurally pinned field.
#[cfg(feature = "unroute")]
#[pin_project::pin_project]
#[derive(Debug, Default, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub struct Unroute<S>(#[pin] pub S);

#[cfg(feature = "unroute")]
impl<S> Unroute<S> {
    /// Wraps a [`RouteSink`], asserting up front that it is derived from a [`Sink`].
    ///
    /// # Panics
    ///
    /// Panics if [`RouteSink::is_routing`] is `true`.
    #[must_use]
    pub fn new_checked<Route, Msg>(sink: S) -> Self
    where
        S: RouteSink<Route, Msg>,
    {
        // Fail at construction time rather than on the first poll.
        assert!(!sink.is_routing());
        Unroute(sink)
    }
}
336
/// Transparent [`Stream`](futures_core::Stream) passthrough to the wrapped value.
#[cfg(feature = "unroute")]
impl<S> futures_core::Stream for Unroute<S>
where
    S: futures_core::Stream,
{
    type Item = S::Item;

    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        let inner = self.project().0;
        <S as futures_core::Stream>::poll_next(inner, cx)
    }
}
345
/// Exposes the wrapped [`RouteSink`] as a [`Sink`] of `(Route, Msg)` pairs, using the
/// route-agnostic `poll_ready_any`/`poll_flush_all` operations.
#[cfg(feature = "unroute")]
impl<Route, Msg, S> Sink<(Route, Msg)> for Unroute<S>
where
    S: RouteSink<Route, Msg>,
{
    type Error = S::Error;

    fn poll_ready(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
        // This used to be a `debug_assert!`. Then we got hit with violation of this precondition in
        // a release build. On `RouteSink`s statically known to be `Sink`s this should be zero-cost,
        // as it inlines `false`, and thus removes the assert altogether. So it is now an `assert!`.
        assert!(!self.0.is_routing());
        let inner = self.project().0;
        <S as RouteSink<Route, Msg>>::poll_ready_any(inner, cx)
    }

    fn start_send(self: Pin<&mut Self>, item: (Route, Msg)) -> Result<(), Self::Error> {
        assert!(!self.0.is_routing());
        let (route, msg) = item;
        let inner = self.project().0;
        <S as RouteSink<Route, Msg>>::start_send(inner, route, msg)
    }

    fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
        assert!(!self.0.is_routing());
        let inner = self.project().0;
        <S as RouteSink<Route, Msg>>::poll_flush_all(inner, cx)
    }

    /// Closing goes straight to the wrapped sink, without the routing assertion.
    fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
        let inner = self.project().0;
        <S as RouteSink<Route, Msg>>::poll_close(inner, cx)
    }
}