1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123
use futures::{future, Future, FutureExt};
use tokio::{
select,
sync::{broadcast, mpsc, oneshot},
task,
};
use tracing::{info, trace};
use wheelbuf::WheelBuf;
use casper_types::ProtocolVersion;
use super::{
sse_server::{BroadcastChannelMessage, Id, NewSubscriberInfo, ServerSentEvent},
Config, EventIndex, SseData,
};
/// Run the HTTP server.
///
/// * `server_with_shutdown` is the actual server as a future which can be gracefully shut down.
/// * `server_shutdown_sender` is the channel by which the server will be notified to shut down.
/// * `data_receiver` will provide the server with local events which should then be sent to all
/// subscribed clients.
/// * `broadcaster` is used by the server to send events to each subscribed client after receiving
/// them via the `data_receiver`.
/// * `new_subscriber_info_receiver` is used to notify the server of the details of a new client
/// having subscribed to the event stream. It allows the server to populate that client's stream
/// with the requested number of historical events.
pub(super) async fn run(
config: Config,
api_version: ProtocolVersion,
server_with_shutdown: impl Future<Output = ()> + Send + 'static,
server_shutdown_sender: oneshot::Sender<()>,
mut data_receiver: mpsc::UnboundedReceiver<(EventIndex, SseData)>,
broadcaster: broadcast::Sender<BroadcastChannelMessage>,
mut new_subscriber_info_receiver: mpsc::UnboundedReceiver<NewSubscriberInfo>,
) {
let server_joiner = task::spawn(server_with_shutdown);
// Initialize the index and buffer for the SSEs.
let mut buffer = WheelBuf::new(vec![
ServerSentEvent::initial_event(api_version);
config.event_stream_buffer_length as usize
]);
// Start handling received messages from the two channels; info on new client subscribers and
// incoming events announced by node components.
let event_stream_fut = async {
loop {
select! {
maybe_new_subscriber = new_subscriber_info_receiver.recv() => {
if let Some(subscriber) = maybe_new_subscriber {
// First send the client the `ApiVersion` event. We don't care if this
// errors - the client may have disconnected already.
let _ = subscriber
.initial_events_sender
.send(ServerSentEvent::initial_event(api_version));
// If the client supplied a "start_from" index, provide the buffered events.
// If they requested more than is buffered, just provide the whole buffer.
if let Some(start_index) = subscriber.start_from {
// If the buffer's first event ID is in the range [0, buffer size) or
// (Id::MAX - buffer size, Id::MAX], then the events in the buffer are
// considered to have their IDs wrapping round, or that was recently the
// case. In this case, we add `buffer.capacity()` to `start_index` and
// the buffered events' IDs when considering which events to include in
// the requested initial events, effectively shifting all the IDs past
// the wrapping transition.
let buffer_size = buffer.capacity() as Id;
let in_wraparound_zone = buffer
.iter()
.next()
.map(|event| {
let id = event.id.unwrap();
id > Id::MAX - buffer_size || id < buffer_size
})
.unwrap_or_default();
for event in buffer.iter().skip_while(|event| {
if in_wraparound_zone {
event.id.unwrap().wrapping_add(buffer_size)
< start_index.wrapping_add(buffer_size)
} else {
event.id.unwrap() < start_index
}
}) {
// As per sending `SSE_INITIAL_EVENT`, we don't care if this errors.
let _ = subscriber.initial_events_sender.send(event.clone());
}
}
}
}
maybe_data = data_receiver.recv() => {
match maybe_data {
Some((event_index, data)) => {
// Buffer the data and broadcast it to subscribed clients.
trace!("Event stream server received {:?}", data);
let event = ServerSentEvent { id: Some(event_index), data };
buffer.push(event.clone());
let message = BroadcastChannelMessage::ServerSentEvent(event);
// This can validly fail if there are no connected clients, so don't log
// the error.
let _ = broadcaster.send(message);
}
None => {
// The data sender has been dropped - exit the loop.
info!("shutting down HTTP server");
break;
}
}
}
}
}
};
// Wait for the event stream future to exit, which will only happen if the last `data_sender`
// paired with `data_receiver` is dropped. `server_joiner` will never return here.
let _ = future::select(server_joiner, event_stream_fut.boxed()).await;
// Kill the event-stream handlers, and shut down the server.
let _ = broadcaster.send(BroadcastChannelMessage::Shutdown);
let _ = server_shutdown_sender.send(());
trace!("Event stream server stopped");
}