use futures::{future, Future, FutureExt};
use tokio::{
select,
sync::{broadcast, mpsc, oneshot},
task,
};
use tracing::{info, trace};
use wheelbuf::WheelBuf;
use casper_types::ProtocolVersion;
use super::{
sse_server::{BroadcastChannelMessage, Id, NewSubscriberInfo, ServerSentEvent},
Config, EventIndex, SseData,
};
/// Drives the event stream: buffers incoming events, broadcasts them to connected clients and
/// replays buffered events to newly subscribed clients.
///
/// `server_with_shutdown` is spawned as its own task.  This function then services two channels
/// until either that task finishes or `data_receiver` yields `None` (all of its senders have been
/// dropped):
///
/// * `new_subscriber_info_receiver`: each new subscriber is first sent the initial (API version)
///   event, followed by the buffered events selected by its optional `start_from` ID.
/// * `data_receiver`: each `(EventIndex, SseData)` pair is turned into a `ServerSentEvent`,
///   pushed into the buffer and sent on `broadcaster`.
///
/// On exit a `BroadcastChannelMessage::Shutdown` is broadcast and `server_shutdown_sender` is
/// signalled.  Send failures are deliberately ignored throughout.
///
/// # Panics
///
/// Panics if a buffered event has no ID.  Every event pushed here is built with `id: Some(..)`,
/// so this relies on `buffer.iter()` yielding only events that were pushed.
pub(super) async fn run(
    config: Config,
    api_version: ProtocolVersion,
    server_with_shutdown: impl Future<Output = ()> + Send + 'static,
    server_shutdown_sender: oneshot::Sender<()>,
    mut data_receiver: mpsc::UnboundedReceiver<(EventIndex, SseData)>,
    broadcaster: broadcast::Sender<BroadcastChannelMessage>,
    mut new_subscriber_info_receiver: mpsc::UnboundedReceiver<NewSubscriberInfo>,
) {
    let server_joiner = task::spawn(server_with_shutdown);

    // Backing storage for the ring buffer of recent events, sized from the config.  The initial
    // event is only used as a filler value for the slots.
    let mut buffer = WheelBuf::new(vec![
        ServerSentEvent::initial_event(api_version);
        config.event_stream_buffer_length as usize
    ]);

    // Handle messages from the two channels: info on new client subscribers, and incoming events.
    let event_stream_fut = async {
        loop {
            select! {
                // The refutable `Some(..)` pattern matters: once every subscriber-info sender has
                // been dropped, `recv()` resolves to `None` immediately on each poll.  Binding it
                // irrefutably would make this loop spin.  With the pattern, `select!` disables
                // this branch on `None` and just waits on the data branch below.  That branch is
                // irrefutable, so the branches can never all be disabled and no `else` arm is
                // needed.
                Some(subscriber) = new_subscriber_info_receiver.recv() => {
                    // First send the client the initial event.  We don't care if this errors -
                    // the receiving side may already be gone.
                    let _ = subscriber
                        .initial_events_sender
                        .send(ServerSentEvent::initial_event(api_version));

                    // If the client supplied a start ID, replay the buffered events from the
                    // first one whose ID is not below it.
                    if let Some(start_index) = subscriber.start_from {
                        // If the oldest buffered ID is within `buffer_size` of either end of the
                        // ID range, the buffered IDs may straddle the wrap from `Id::MAX` to 0.
                        // In that case both sides of the comparison are shifted by `buffer_size`
                        // (wrapping) so that they are compared past the wrap point.
                        let buffer_size = buffer.capacity() as Id;
                        let in_wraparound_zone = buffer
                            .iter()
                            .next()
                            .map(|event| {
                                let id = event.id.unwrap();
                                id > Id::MAX - buffer_size || id < buffer_size
                            })
                            .unwrap_or_default();
                        for event in buffer.iter().skip_while(|event| {
                            if in_wraparound_zone {
                                event.id.unwrap().wrapping_add(buffer_size)
                                    < start_index.wrapping_add(buffer_size)
                            } else {
                                event.id.unwrap() < start_index
                            }
                        }) {
                            // As with the initial event, a failed send is not an error here.
                            let _ = subscriber.initial_events_sender.send(event.clone());
                        }
                    }
                }

                maybe_data = data_receiver.recv() => {
                    match maybe_data {
                        Some((event_index, data)) => {
                            // Buffer the data and broadcast it to subscribed clients.
                            trace!("Event stream server received {:?}", data);
                            let event = ServerSentEvent { id: Some(event_index), data };
                            buffer.push(event.clone());
                            let message = BroadcastChannelMessage::ServerSentEvent(event);
                            // The send result is ignored, e.g. there may be no receivers.
                            let _ = broadcaster.send(message);
                        }
                        None => {
                            // All data senders have been dropped - exit the loop.
                            info!("shutting down HTTP server");
                            break;
                        }
                    }
                }
            }
        }
    };

    // Wait for whichever finishes first: the spawned server task or the event loop above.
    let _ = future::select(server_joiner, event_stream_fut.boxed()).await;

    // Tell the per-client handlers to stop, then signal the server to shut down.
    let _ = broadcaster.send(BroadcastChannelMessage::Shutdown);
    let _ = server_shutdown_sender.send(());

    trace!("Event stream server stopped");
}