selium_server/topic/
pubsub.rs1use super::config::SharedTopicConfig;
2use crate::BoxSink;
3use bytes::Bytes;
4use futures::{
5 channel::mpsc::{self, Receiver, Sender},
6 stream::BoxStream,
7 Future, SinkExt, StreamExt,
8};
9use selium_log::{
10 data::LogIterator,
11 message::{Message, MessageSlice},
12 MessageLog,
13};
14use selium_protocol::{BatchPayload, Frame, MessagePayload, Offset};
15use selium_std::errors::{Result, SeliumError, TopicError};
16use std::{pin::Pin, sync::Arc, time::Duration};
17use tokio::select;
18use tokio_stream::StreamMap;
19use tokio_util::sync::CancellationToken;
20
21pub type SharedLog = Arc<MessageLog>;
22pub type ReadFut = Pin<Box<dyn Future<Output = Result<MessageSlice>> + Send>>;
23pub type SleepFut = Pin<Box<dyn Future<Output = ()> + Send>>;
24
25const SOCK_CHANNEL_SIZE: usize = 100;
26
27pub enum Socket {
28 Stream(BoxStream<'static, Result<Frame>>),
29 Sink(BoxSink<Frame, SeliumError>, Offset),
30}
31
32pub struct Subscriber {
33 offset: u64,
34 log: SharedLog,
35 sink: BoxSink<Frame, SeliumError>,
36 buffered_slice: Option<LogIterator>,
37}
38
39impl Subscriber {
40 pub fn new(offset: u64, log: SharedLog, sink: BoxSink<Frame, SeliumError>) -> Self {
41 Self {
42 offset,
43 log: log.clone(),
44 sink,
45 buffered_slice: None,
46 }
47 }
48
49 async fn read_messages(&mut self) {
50 if let Some(slice) = self.buffered_slice.as_mut() {
51 while let Ok(Some(message)) = slice.next().await {
52 let batch_size = message.headers().batch_size();
53 let records = Bytes::copy_from_slice(message.records());
54
55 let frame = if batch_size > 1 {
56 Frame::BatchMessage(BatchPayload {
57 message: records,
58 size: batch_size,
59 })
60 } else {
61 Frame::Message(MessagePayload {
62 headers: None,
63 message: records,
64 })
65 };
66
67 let _ = self.sink.send(frame).await;
68 }
69 }
70 }
71
72 async fn poll_for_messages(&mut self, interval: Duration) -> Result<()> {
73 let slice = self
74 .log
75 .read_slice(self.offset, None)
76 .await
77 .map_err(SeliumError::Log)?;
78
79 self.offset = slice.end_offset();
80 self.buffered_slice = slice.messages();
81
82 if self.buffered_slice.is_some() {
83 self.read_messages().await;
84 } else {
85 tokio::time::sleep(interval).await;
86 }
87
88 Ok(())
89 }
90}
91
92pub struct Subscribers {
93 notify: Receiver<Pin<Box<Subscriber>>>,
94 token: CancellationToken,
95 config: SharedTopicConfig,
96}
97
98impl Subscribers {
99 pub fn new(config: SharedTopicConfig) -> (Sender<Pin<Box<Subscriber>>>, Self) {
100 let (tx, notify) = mpsc::channel(SOCK_CHANNEL_SIZE);
101 let token = CancellationToken::new();
102 let subscribers = Self {
103 notify,
104 token,
105 config,
106 };
107 (tx, subscribers)
108 }
109
110 pub async fn run(&mut self) {
111 while let Some(mut subscriber) = self.notify.next().await {
112 let token = self.token.clone();
113 let polling_interval = self.config.polling_interval;
114
115 tokio::spawn(async move {
116 loop {
117 select! {
118 _ = token.cancelled() => {
119 break;
120 },
121 _ = subscriber.poll_for_messages(polling_interval) => {
122 continue;
123 }
124 }
125 }
126 });
127 }
128 }
129}
130
131pub struct Topic {
132 publishers: StreamMap<usize, BoxStream<'static, Result<Frame>>>,
133 next_stream_id: usize,
134 notify: Sender<Pin<Box<Subscriber>>>,
135 handle: Receiver<Socket>,
136 log: SharedLog,
137}
138
139impl Topic {
140 pub fn pair(log: MessageLog, config: SharedTopicConfig) -> (Self, Sender<Socket>) {
141 let log = Arc::new(log);
142 let (tx, rx) = mpsc::channel(SOCK_CHANNEL_SIZE);
143 let publishers = StreamMap::new();
144 let (notify, mut subscribers) = Subscribers::new(config);
145 tokio::spawn(async move { subscribers.run().await });
146
147 (
148 Self {
149 log,
150 publishers,
151 notify,
152 next_stream_id: 0,
153 handle: rx,
154 },
155 tx,
156 )
157 }
158
159 pub async fn run(&mut self) -> Result<()> {
160 loop {
161 tokio::select! {
162 Some((_, Ok(frame))) = self.publishers.next() => {
163 if let Frame::Message(_) | Frame::BatchMessage(_) = frame {
164 let batch_size = frame.batch_size().unwrap();
165 let message = frame.message().unwrap();
166 let message = Message::batch(message, batch_size, 1);
167 self.log.write(message).await?;
168 }
169 },
170 Some(socket) = self.handle.next() => match socket {
171 Socket::Stream(st) => {
172 self.publishers.insert(self.next_stream_id, st);
173 self.next_stream_id += 1;
174 }
175 Socket::Sink(si, offset) => {
176 let entries = self.log.number_of_entries().await;
177
178 let log_offset = match offset {
179 Offset::FromBeginning(offset) => offset,
180 Offset::FromEnd(offset) => entries.checked_sub(offset).unwrap_or(entries)
181 };
182
183 let subscriber = Box::pin(Subscriber::new(log_offset, self.log.clone(), si));
184
185 self.notify
186 .send(subscriber)
187 .await
188 .map_err(TopicError::NotifySubscribers)?;
189 }
190 }
191 }
192 }
193 }
194}