hermes_async_runtime_components/subscription/traits/subscription.rs
1use alloc::boxed::Box;
2use alloc::sync::Arc;
3use core::pin::Pin;
4
5use cgp::prelude::*;
6use futures_core::stream::Stream;
7
/**
    A [`Subscription`] is a multi-consumer abstraction over a single-consumer
    [`Stream`] construct. A [`Subscription`] value can be shared by wrapping
    it inside an `Arc<dyn Subscription>`. Each call to the
    [`subscribe`](Self::subscribe) method optionally returns a [`Stream`]
    that can be used by a single consumer.

    The expected behavior of a [`Subscription`] implementation is that the
    [`Stream`]s returned from multiple calls to [`subscribe`](Self::subscribe)
    should yield the same stream of items, modulo race conditions between
    the calls and errors from the underlying sources.

    A naive implementation of [`Subscription`] would subscribe to the
    underlying source, such as a network connection, each time
    [`subscribe`](Self::subscribe) is called. This may be inefficient, as each
    stream would have to open a new network connection, but it is simpler and
    more resilient to error conditions such as network disconnections. A simple
    way to implement a naive subscription is to use
    `CanCreateClosureSubscription` to turn a closure into a [`Subscription`].

    A [`Subscription`] implementation can be made efficient by sharing one
    incoming [`Stream`] with multiple consumers, multiplexing it into multiple
    outgoing [`Stream`]s inside a background task. An example implementation of
    this is `CanStreamSubscription`, which multiplexes a single stream into a
    [`Subscription`]. A more advanced version of this wrapping is provided by
    `CanMultiplexSubscription`, which wraps around a naive [`Subscription`] and
    performs both stream multiplexing and automatic recovery from a background
    task by calling the underlying `subscribe` function.

    A [`Subscription`] does not guarantee whether the returned [`Stream`] is
    finite or infinite (long-running). As a result, the [`Stream`] returned
    from [`subscribe`](Self::subscribe) may terminate, for example when the
    underlying source encounters an error such as a network disconnection.
    However, a long-running consumer may call [`subscribe`](Self::subscribe)
    again in an attempt to obtain a new [`Stream`].

    A [`Subscription`] can be terminated by an underlying controller, such as
    during program shutdown. When a subscription is terminated, it is expected
    to return `None` for all subsequent calls to [`subscribe`](Self::subscribe).
    A long-running consumer can treat the returned `None` as a signal that
    the subscription is terminated, and in turn terminate itself. The
    underlying controller is also expected to terminate all currently running
    [`Stream`]s, so that the running consumers receive the termination
    signal.
*/
#[async_trait::async_trait]
pub trait Subscription: Send + Sync + 'static {
    /**
        The type of the items yielded by the [`Stream`]s returned from
        [`subscribe`](Self::subscribe).
    */
    type Item: Async;

    /**
        If the subscription is still active, returns a new single-consumer
        [`Stream`] which yields the items that are produced _after_ the
        method is called.

        The items produced prior to the call to [`subscribe`](Self::subscribe)
        are lost. This allows the underlying subscription implementation
        to conserve memory by not storing all items that have been produced
        since the subscription was created.

        If the subscription is terminated, the method returns `None`.
        Callers that receive `None` should expect all subsequent calls to
        [`subscribe`](Self::subscribe) to also return `None`, and perform
        appropriate actions for termination.
    */
    async fn subscribe(
        &self,
    ) -> Option<Pin<Box<dyn Stream<Item = Self::Item> + Send + Sync + 'static>>>;
}
80
81#[async_trait::async_trait]
82impl<T: Async> Subscription for Box<dyn Subscription<Item = T>> {
83 type Item = T;
84
85 async fn subscribe(
86 &self,
87 ) -> Option<Pin<Box<dyn Stream<Item = Self::Item> + Send + Sync + 'static>>> {
88 self.as_ref().subscribe().await
89 }
90}
91
92#[async_trait::async_trait]
93impl<T: Async> Subscription for Arc<dyn Subscription<Item = T>> {
94 type Item = T;
95
96 async fn subscribe(
97 &self,
98 ) -> Option<Pin<Box<dyn Stream<Item = Self::Item> + Send + Sync + 'static>>> {
99 self.as_ref().subscribe().await
100 }
101}