1use std::sync::Arc;
2
3use futures::{FutureExt, Stream, TryStream, task};
4
5use super::{Inner, Topic};
6
/// An element yielded by a [`Subed`] subscription stream.
///
/// The variant records how the item was routed when it was taken off the
/// shared stream.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum Item<I> {
    /// The item's topic (after applying the fallback, if its own topic had no
    /// subscriber) equals the topic of the subscription that yielded it.
    Subscribed(I),

    /// No subscriber is registered for the item's topic or for its fallback
    /// topic, so it is handed to whichever subscription happened to poll.
    Unhandled(I),
}
15
16pub struct Subed<S: TryStream, T: Topic> {
18 inner: Arc<Inner<S, T>>,
19 topic: T,
20}
21
22impl<S: TryStream, T: Topic> Subed<S, T> {
23 pub(super) fn new(inner: Arc<Inner<S, T>>, topic: T) -> Self {
24 Self { inner, topic }
25 }
26}
27
28impl<S: TryStream, T: Topic> Drop for Subed<S, T> {
29 fn drop(&mut self) {
30 tracing::trace!("unsubscribing {:?}", self.topic);
31
32 self.inner.wakers.write().unwrap().remove(&self.topic);
33 }
34}
35
36impl<S: TryStream + Stream<Item = Result<S::Ok, S::Error>> + Unpin, T: Topic<Item = S::Ok>> Stream
37 for Subed<S, T>
38{
39 type Item = Result<Item<S::Ok>, S::Error>;
40
41 fn poll_next(
42 self: std::pin::Pin<&mut Self>,
43 cx: &mut std::task::Context<'_>,
44 ) -> std::task::Poll<Option<Self::Item>> {
45 match self.inner.wakers.read().unwrap().get(&self.topic) {
46 Some(waker) => waker.register(cx.waker()),
48
49 None => return task::Poll::Ready(None),
51 };
52
53 let mut stream = futures::ready!(self.inner.stream.lock().poll_unpin(cx));
54 let mut stream = std::pin::Pin::new(&mut *stream);
55
56 match futures::ready!(stream.as_mut().poll_peek(cx)) {
57 Some(Ok(item)) => {
58 let mut topic = T::topic(item);
59 let wakers = self.inner.wakers.read().unwrap();
60
61 if !wakers.contains_key(&topic) {
62 topic = topic.fallback();
63 }
64
65 if let Some(waker) = wakers.get(&topic)
66 && topic != self.topic
67 {
68 waker.wake();
71 task::Poll::Pending
72 } else if topic == self.topic {
73 stream.as_mut().poll_next(cx).map_ok(Item::Subscribed)
75 } else {
76 stream.as_mut().poll_next(cx).map_ok(Item::Unhandled)
78 }
79 }
80
81 Some(_) => stream.as_mut().poll_next(cx).map_ok(Item::Unhandled),
83
84 None => task::Poll::Ready(None),
86 }
87 }
88}