subable/
subed.rs

1use std::sync::Arc;
2
3use futures::{FutureExt, Stream, TryStream, task};
4
5use super::{Inner, Topic};
6
/// An item yielded by a _subscription_, tagged with how it was routed.
///
/// The payload `I` is the underlying stream's success value; the variant records whether the
/// item's topic matched the yielding subscription's own topic.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum Item<I> {
    /// The item's topic is the one the yielding subscription is registered under.
    Subscribed(I),

    /// No subscription is registered for the item's topic (nor for its fallback topic), so it
    /// is handed to whichever subscription happened to poll the stream.
    Unhandled(I),
}
15
16/// A _subscription_ to a [`Topic`] yielding this topic's message.
17pub struct Subed<S: TryStream, T: Topic> {
18    inner: Arc<Inner<S, T>>,
19    topic: T,
20}
21
22impl<S: TryStream, T: Topic> Subed<S, T> {
23    pub(super) fn new(inner: Arc<Inner<S, T>>, topic: T) -> Self {
24        Self { inner, topic }
25    }
26}
27
28impl<S: TryStream, T: Topic> Drop for Subed<S, T> {
29    fn drop(&mut self) {
30        tracing::trace!("unsubscribing {:?}", self.topic);
31
32        self.inner.wakers.write().unwrap().remove(&self.topic);
33    }
34}
35
36impl<S: TryStream + Stream<Item = Result<S::Ok, S::Error>> + Unpin, T: Topic<Item = S::Ok>> Stream
37    for Subed<S, T>
38{
39    type Item = Result<Item<S::Ok>, S::Error>;
40
41    fn poll_next(
42        self: std::pin::Pin<&mut Self>,
43        cx: &mut std::task::Context<'_>,
44    ) -> std::task::Poll<Option<Self::Item>> {
45        match self.inner.wakers.read().unwrap().get(&self.topic) {
46            // Register the task for wake-up
47            Some(waker) => waker.register(cx.waker()),
48
49            // If the waker isn't registered, that means the stream is closed
50            None => return task::Poll::Ready(None),
51        };
52
53        let mut stream = futures::ready!(self.inner.stream.lock().poll_unpin(cx));
54        let mut stream = std::pin::Pin::new(&mut *stream);
55
56        match futures::ready!(stream.as_mut().poll_peek(cx)) {
57            Some(Ok(item)) => {
58                let mut topic = T::topic(item);
59                let wakers = self.inner.wakers.read().unwrap();
60
61                if !wakers.contains_key(&topic) {
62                    topic = topic.fallback();
63                }
64
65                if let Some(waker) = wakers.get(&topic)
66                    && topic != self.topic
67                {
68                    // The item is destined to another task, wake it and stay pending
69
70                    waker.wake();
71                    task::Poll::Pending
72                } else if topic == self.topic {
73                    // The item is for us, pop it as `Match`
74                    stream.as_mut().poll_next(cx).map_ok(Item::Subscribed)
75                } else {
76                    // The item is unhandled, pop it as `Default`
77                    stream.as_mut().poll_next(cx).map_ok(Item::Unhandled)
78                }
79            }
80
81            // The stream errored, pop it from the stream
82            Some(_) => stream.as_mut().poll_next(cx).map_ok(Item::Unhandled),
83
84            // The stream ended, return `None`
85            None => task::Poll::Ready(None),
86        }
87    }
88}