// selium_server/topic/pubsub.rs

use super::config::SharedTopicConfig;
use crate::BoxSink;
use bytes::Bytes;
use futures::{
    channel::mpsc::{self, Receiver, Sender},
    stream::BoxStream,
    Future, SinkExt, StreamExt,
};
use selium_log::{
    data::LogIterator,
    message::{Message, MessageSlice},
    MessageLog,
};
use selium_protocol::{BatchPayload, Frame, MessagePayload, Offset};
use selium_std::errors::{Result, SeliumError, TopicError};
use std::{pin::Pin, sync::Arc, time::Duration};
use tokio::select;
use tokio_stream::StreamMap;
use tokio_util::sync::CancellationToken;

21pub type SharedLog = Arc<MessageLog>;
22pub type ReadFut = Pin<Box<dyn Future<Output = Result<MessageSlice>> + Send>>;
23pub type SleepFut = Pin<Box<dyn Future<Output = ()> + Send>>;
24
25const SOCK_CHANNEL_SIZE: usize = 100;
26
27pub enum Socket {
28    Stream(BoxStream<'static, Result<Frame>>),
29    Sink(BoxSink<Frame, SeliumError>, Offset),
30}
31
32pub struct Subscriber {
33    offset: u64,
34    log: SharedLog,
35    sink: BoxSink<Frame, SeliumError>,
36    buffered_slice: Option<LogIterator>,
37}
38
39impl Subscriber {
40    pub fn new(offset: u64, log: SharedLog, sink: BoxSink<Frame, SeliumError>) -> Self {
41        Self {
42            offset,
43            log: log.clone(),
44            sink,
45            buffered_slice: None,
46        }
47    }
48
49    async fn read_messages(&mut self) {
50        if let Some(slice) = self.buffered_slice.as_mut() {
51            while let Ok(Some(message)) = slice.next().await {
52                let batch_size = message.headers().batch_size();
53                let records = Bytes::copy_from_slice(message.records());
54
55                let frame = if batch_size > 1 {
56                    Frame::BatchMessage(BatchPayload {
57                        message: records,
58                        size: batch_size,
59                    })
60                } else {
61                    Frame::Message(MessagePayload {
62                        headers: None,
63                        message: records,
64                    })
65                };
66
67                let _ = self.sink.send(frame).await;
68            }
69        }
70    }
71
72    async fn poll_for_messages(&mut self, interval: Duration) -> Result<()> {
73        let slice = self
74            .log
75            .read_slice(self.offset, None)
76            .await
77            .map_err(SeliumError::Log)?;
78
79        self.offset = slice.end_offset();
80        self.buffered_slice = slice.messages();
81
82        if self.buffered_slice.is_some() {
83            self.read_messages().await;
84        } else {
85            tokio::time::sleep(interval).await;
86        }
87
88        Ok(())
89    }
90}
91
92pub struct Subscribers {
93    notify: Receiver<Pin<Box<Subscriber>>>,
94    token: CancellationToken,
95    config: SharedTopicConfig,
96}
97
98impl Subscribers {
99    pub fn new(config: SharedTopicConfig) -> (Sender<Pin<Box<Subscriber>>>, Self) {
100        let (tx, notify) = mpsc::channel(SOCK_CHANNEL_SIZE);
101        let token = CancellationToken::new();
102        let subscribers = Self {
103            notify,
104            token,
105            config,
106        };
107        (tx, subscribers)
108    }
109
110    pub async fn run(&mut self) {
111        while let Some(mut subscriber) = self.notify.next().await {
112            let token = self.token.clone();
113            let polling_interval = self.config.polling_interval;
114
115            tokio::spawn(async move {
116                loop {
117                    select! {
118                        _ = token.cancelled() => {
119                            break;
120                        },
121                        _ = subscriber.poll_for_messages(polling_interval) => {
122                            continue;
123                        }
124                    }
125                }
126            });
127        }
128    }
129}
130
131pub struct Topic {
132    publishers: StreamMap<usize, BoxStream<'static, Result<Frame>>>,
133    next_stream_id: usize,
134    notify: Sender<Pin<Box<Subscriber>>>,
135    handle: Receiver<Socket>,
136    log: SharedLog,
137}
138
139impl Topic {
140    pub fn pair(log: MessageLog, config: SharedTopicConfig) -> (Self, Sender<Socket>) {
141        let log = Arc::new(log);
142        let (tx, rx) = mpsc::channel(SOCK_CHANNEL_SIZE);
143        let publishers = StreamMap::new();
144        let (notify, mut subscribers) = Subscribers::new(config);
145        tokio::spawn(async move { subscribers.run().await });
146
147        (
148            Self {
149                log,
150                publishers,
151                notify,
152                next_stream_id: 0,
153                handle: rx,
154            },
155            tx,
156        )
157    }
158
159    pub async fn run(&mut self) -> Result<()> {
160        loop {
161            tokio::select! {
162                Some((_, Ok(frame))) = self.publishers.next() => {
163                    if let Frame::Message(_) | Frame::BatchMessage(_) = frame {
164                        let batch_size = frame.batch_size().unwrap();
165                        let message = frame.message().unwrap();
166                        let message = Message::batch(message, batch_size, 1);
167                        self.log.write(message).await?;
168                    }
169                },
170                Some(socket) = self.handle.next() => match socket {
171                    Socket::Stream(st) => {
172                        self.publishers.insert(self.next_stream_id, st);
173                        self.next_stream_id += 1;
174                    }
175                    Socket::Sink(si, offset) => {
176                        let entries = self.log.number_of_entries().await;
177
178                        let log_offset = match offset {
179                            Offset::FromBeginning(offset) => offset,
180                            Offset::FromEnd(offset) => entries.checked_sub(offset).unwrap_or(entries)
181                        };
182
183                        let subscriber = Box::pin(Subscriber::new(log_offset, self.log.clone(), si));
184
185                        self.notify
186                            .send(subscriber)
187                            .await
188                            .map_err(TopicError::NotifySubscribers)?;
189                    }
190                }
191            }
192        }
193    }
194}