hermes_async_runtime_components/subscription/traits/
subscription.rs

1use alloc::boxed::Box;
2use alloc::sync::Arc;
3use core::pin::Pin;
4
5use cgp::prelude::*;
6use futures_core::stream::Stream;
7
8/**
9    A [`Subscription`] is a multi-consumer abstraction over a single-consumer
10    [`Stream`] construct. A [`Subscription`] value can be shared by wrapping
11    it inside an `Arc<dyn Subscription>`. Each call to the
12    [`subscribe`](Self::subscribe) method would optionally return a [`Stream`]
13    that can be used by a single consumer.
14
15    The expected behavior of a [`Subscription`] implementation is that the
16    [`Stream`]s returned from multiple calls to [`subscribe`](Self::subscribe)
17    should yield the same stream of items, modulo the race conditions between
18    each calls and errors from underlying sources.
19
20    A naive implementation of [`Subscription`] would subscribe from multiple
21    underlying sources, such as a network connection, each time
22    [`subscribe`](Self::subscribe) is called. This may be inefficient as each
23    stream would have to open new network connections, but it is simpler and
24    more resilient to error conditions such as network disconnections. A simple
25    way to implement a naive subscription is to use
26    `CanCreateClosureSubscription` to turn a closure into a [`Subscription`].
27
28    A [`Subscription`] implementation could be made efficient by sharing one
29    incoming [`Stream`] with multiple consumers, by multiplexing them to multiple
30    outgoing [`Stream`]s inside a background task. An example implementation of
31    this is `CanStreamSubscription`, which multiplexes a single stream into a
32    [`Subscription`]. A more advanced version of wrapping is provided by
33    `CanMultiplexSubscription`, which wraps around a naive [`Subscription`] and
34    perform both stream multiplexing and auto recovery from a background task by
35    calling the underlying `subscribe` function.
36
37    A [`Subscription`] do not guarantee whether the returned [`Stream`] is
38    finite or infinite (long-running). As a result, the [`Stream`] returned
39    from [`subscribe`](Self::subscribe) may terminate, in case if there is
40    underlying source encounter errors such as network disconnection. However,
41    a long-running consumer may call [`subscribe`](Self::subscribe) again in
42    attempt to obtain a new [`Stream`].
43
44    A [`Subscription`] can be terminated by an underlying controller, such as
45    during program shutdown. When a subscription is terminated, it is expected
46    to return `None` for all subsequent calls to [`subscribe`](Self::subscribe).
47    A long-running consumer can treat the returned `None` as a signal that
48    the subscription is terminated, and in turns terminate itself. The
49    underlying controller is also expected to terminate all currently running
50    [`Stream`]s, so that the running consumers would receive the termination
51    signal.
52*/
53#[async_trait::async_trait]
54pub trait Subscription: Send + Sync + 'static {
55    /**
56       The item that is yielded in the [`Stream`]s returned from
57       [`subscribe`](Self::subscribe).
58    */
59    type Item: Async;
60
61    /**
62       If the subscription is still active, returns a new single consumer
63       [`Stream`] which would produce a stream of items that are produced
64       _after_ the method is called.
65
66       The items produced prior to the call to [`subscribe`](Self::subscribe)
67       are lost. This is to allow the underlying subscription implementation
68       to preserve memory and not store all items that are produced since the
69       subscription is created.
70
71       If the subscription is terminated, the method would return `None`.
72       Callers that receive `None` should expect all subsequent calls to
73       [`subscribe`](Self::subscribe) to also return `None`, and perform
74       appropriate actions for termination.
75    */
76    async fn subscribe(
77        &self,
78    ) -> Option<Pin<Box<dyn Stream<Item = Self::Item> + Send + Sync + 'static>>>;
79}
80
81#[async_trait::async_trait]
82impl<T: Async> Subscription for Box<dyn Subscription<Item = T>> {
83    type Item = T;
84
85    async fn subscribe(
86        &self,
87    ) -> Option<Pin<Box<dyn Stream<Item = Self::Item> + Send + Sync + 'static>>> {
88        self.as_ref().subscribe().await
89    }
90}
91
92#[async_trait::async_trait]
93impl<T: Async> Subscription for Arc<dyn Subscription<Item = T>> {
94    type Item = T;
95
96    async fn subscribe(
97        &self,
98    ) -> Option<Pin<Box<dyn Stream<Item = Self::Item> + Send + Sync + 'static>>> {
99        self.as_ref().subscribe().await
100    }
101}