1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
use crate::{sink::FanoutMany, BoxSink};
use futures::{
    channel::{
        mpsc::{self, Receiver, Sender},
        oneshot,
    },
    ready,
    stream::BoxStream,
    Future, Sink, Stream,
};
use log::error;
use pin_project_lite::pin_project;
use selium_protocol::traits::{ShutdownSink, ShutdownStream};
use selium_std::errors::Result;
use std::{
    fmt::Debug,
    pin::Pin,
    task::{Context, Poll},
};
use tokio_stream::StreamMap;

/// Buffer size passed to `mpsc::channel` when [`Topic::pair`] creates the
/// channel over which new [`Socket`]s are handed to a topic.
const SOCK_CHANNEL_SIZE: usize = 100;

/// Receiving half of a oneshot channel carrying `()`.
///
/// NOTE(review): this alias is not used in the visible part of this file;
/// presumably it signals that a topic should shut down — confirm against callers.
pub type TopicShutdown = oneshot::Receiver<()>;

/// An endpoint that can be registered with a [`Topic`] by sending it through
/// the `Sender` returned from [`Topic::pair`].
pub enum Socket<T, E> {
    /// A boxed stream of `Result<T>` items. Once registered, the topic polls it
    /// and writes each `Ok` item to its sink; `Err` items are logged and skipped.
    Stream(BoxStream<'static, Result<T>>),
    /// A boxed sink accepting `T`. Once registered, it is added to the topic's
    /// `FanoutMany` sink under a fresh numeric key.
    Sink(BoxSink<T, E>),
}

pin_project! {
    #[project = TopicProj]
    /// A future that moves items from a set of registered streams into a set of
    /// registered sinks.
    ///
    /// New streams and sinks arrive as [`Socket`] values over the channel whose
    /// sending half is returned by [`Topic::pair`]. The future completes once
    /// that channel has terminated and the sink has been flushed.
    #[must_use = "futures do nothing unless you `.await` or poll them"]
    pub struct Topic<T, E> {
        // Registered inbound streams, keyed by the id they were given on insertion.
        #[pin]
        stream: StreamMap<usize, BoxStream<'static, Result<T>>>,
        // Key to use for the next registered stream; incremented after each insert.
        next_stream_id: usize,
        // Registered outbound sinks, keyed by the id they were given on insertion.
        #[pin]
        sink: FanoutMany<usize, BoxSink<T, E>>,
        // Key to use for the next registered sink; incremented after each insert.
        next_sink_id: usize,
        // Receiver on which new sockets arrive; its termination ends the topic.
        #[pin]
        handle: Receiver<Socket<T, E>>,
        // An item read from `stream` that has not yet been handed to `sink`.
        buffered_item: Option<T>,
    }
}

impl<T, E> Topic<T, E> {
    /// Builds an empty topic together with the sender used to register sockets.
    ///
    /// The topic starts with no streams, no sinks and no buffered item. Every
    /// [`Socket`] sent on the returned `Sender` is picked up by the topic the
    /// next time it is polled. The channel is created with a buffer of
    /// `SOCK_CHANNEL_SIZE`.
    pub fn pair() -> (Self, Sender<Socket<T, E>>) {
        let (registrar, incoming) = mpsc::channel(SOCK_CHANNEL_SIZE);

        let topic = Self {
            handle: incoming,
            buffered_item: None,
            stream: StreamMap::new(),
            sink: FanoutMany::new(),
            next_stream_id: 0,
            next_sink_id: 0,
        };

        (topic, registrar)
    }
}

impl<T, E> Future for Topic<T, E>
where
    E: Debug + Unpin,
    T: Clone + Unpin,
{
    type Output = ();

    /// Drives the topic: registers newly arrived sockets, pulls items from the
    /// registered streams and writes them to the registered sinks.
    ///
    /// Resolves with `()` once the socket channel has terminated and the sink
    /// has been flushed; every registered stream and sink is shut down at that
    /// point. Returns `Poll::Pending` whenever the sink is not ready, or when
    /// neither the socket channel nor the streams have anything to offer.
    ///
    /// # Panics
    ///
    /// Panics if the sink reports an error from `poll_ready`, `start_send` or
    /// `poll_flush`. The code relies on the sink never failing, which cannot be
    /// verified from this file.
    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        let TopicProj {
            stream: mut inbound,
            next_stream_id: stream_id,
            sink: mut outbound,
            next_sink_id: sink_id,
            handle: mut registrations,
            buffered_item: pending_item,
        } = self.project();

        loop {
            // A previously read item must reach the sink before any other work
            // happens. The item is only taken out once the sink is ready, so it
            // survives an early `Pending` return.
            if pending_item.is_some() {
                ready!(outbound.as_mut().poll_ready(cx)).unwrap();
                let item = pending_item.take().unwrap();
                outbound.as_mut().start_send(item).unwrap();
            }

            // Pick up at most one newly registered socket per loop iteration.
            match registrations.as_mut().poll_next(cx) {
                Poll::Ready(Some(Socket::Stream(st))) => {
                    inbound.as_mut().insert(*stream_id, st);
                    *stream_id += 1;
                }
                Poll::Ready(Some(Socket::Sink(si))) => {
                    outbound.as_mut().insert(*sink_id, si);
                    *sink_id += 1;
                }
                // The registration channel is gone: flush what is queued,
                // shut every socket down and finish.
                Poll::Ready(None) => {
                    ready!(outbound.as_mut().poll_flush(cx)).unwrap();
                    for (_, s) in inbound.iter_mut() {
                        s.shutdown_stream();
                    }
                    for (_, s) in outbound.iter_mut() {
                        s.shutdown_sink();
                    }

                    return Poll::Ready(());
                }
                Poll::Pending => {
                    // With no streams to read and nothing buffered there is no
                    // work left; wait for a new socket.
                    if inbound.is_empty() && pending_item.is_none() {
                        return Poll::Pending;
                    }
                }
            }

            // Try to pull the next item from any registered stream.
            match inbound.as_mut().poll_next(cx) {
                // Got an item: stash it so the top of the loop sends it.
                Poll::Ready(Some((_, Ok(item)))) => *pending_item = Some(item),
                // A stream produced an error: log it and keep going.
                Poll::Ready(Some((_, Err(e)))) => {
                    error!("Received invalid message from stream: {e:?}")
                }
                // Every stream has finished: flush anything already sent.
                Poll::Ready(None) => ready!(outbound.as_mut().poll_flush(cx)).unwrap(),
                // Nothing to read right now: flush, then wait to be woken.
                Poll::Pending => {
                    ready!(outbound.as_mut().poll_flush(cx)).unwrap();
                    return Poll::Pending;
                }
            }
        }
    }
}