1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
use std::convert::Infallible;
use futures::{
future::{self, select},
FutureExt,
};
use hyper::server::{conn::AddrIncoming, Builder};
use tokio::{
select,
sync::{mpsc, oneshot},
};
use tracing::{info, trace};
use wheelbuf::WheelBuf;
use super::{
sse_server::{self, BroadcastChannelMessage, ServerSentEvent, SSE_INITIAL_EVENT},
Config, SseData,
};
/// Run the HTTP server.
///
/// `data_receiver` will provide the server with local events which should then be sent to all
/// subscribed clients.
pub(super) async fn run(
config: Config,
builder: Builder<AddrIncoming>,
mut data_receiver: mpsc::UnboundedReceiver<SseData>,
) {
// Event stream channels and filter.
let (broadcaster, mut new_subscriber_info_receiver, sse_filter) =
sse_server::create_channels_and_filter(config.broadcast_channel_size);
let service = warp_json_rpc::service(sse_filter);
// Start the server, passing a oneshot receiver to allow the server to be shut down gracefully.
let make_svc =
hyper::service::make_service_fn(move |_| future::ok::<_, Infallible>(service.clone()));
let (shutdown_sender, shutdown_receiver) = oneshot::channel::<()>();
let server = builder.serve(make_svc);
info!(address = %server.local_addr(), "started HTTP server");
let server_with_shutdown = server.with_graceful_shutdown(async {
shutdown_receiver.await.ok();
});
let server_joiner = tokio::spawn(server_with_shutdown);
// Initialize the index and buffer for the SSEs.
let mut event_index = 0_u32;
let mut buffer = WheelBuf::new(vec![
SSE_INITIAL_EVENT.clone();
config.event_stream_buffer_length as usize
]);
// Start handling received messages from the two channels; info on new client subscribers and
// incoming events announced by node components.
let event_stream_fut = async {
loop {
select! {
maybe_new_subscriber = new_subscriber_info_receiver.recv() => {
if let Some(subscriber) = maybe_new_subscriber {
// First send the client the `ApiVersion` event. We don't care if this
// errors - the client may have disconnected already.
let _ = subscriber.initial_events_sender.send(SSE_INITIAL_EVENT.clone());
// If the client supplied a "start_from" index, provide the buffered events.
// If they requested more than is buffered, just provide the whole buffer.
if let Some(start_index) = subscriber.start_from {
for event in buffer
.iter()
.skip_while(|event| event.id.unwrap() < start_index)
{
// As per sending `SSE_INITIAL_EVENT`, we don't care if this errors.
let _ = subscriber.initial_events_sender.send(event.clone());
}
}
}
}
maybe_data = data_receiver.recv() => {
match maybe_data {
Some(data) => {
// Buffer the data and broadcast it to subscribed clients.
trace!("Event stream server received {:?}", data);
let event = ServerSentEvent { id: Some(event_index), data };
buffer.push(event.clone());
let message = BroadcastChannelMessage::ServerSentEvent(event);
// This can validly fail if there are no connected clients, so don't log
// the error.
let _ = broadcaster.send(message);
event_index = event_index.wrapping_add(1);
}
None => {
// The data sender has been dropped - exit the loop.
info!("shutting down HTTP server");
break;
}
}
}
}
}
};
// Wait for the event stream future to exit, which will only happen if the last `data_sender`
// paired with `data_receiver` is dropped. `server_joiner` will never return here.
let _ = select(server_joiner, event_stream_fut.boxed()).await;
// Kill the event-stream handlers, and shut down the server.
let _ = broadcaster.send(BroadcastChannelMessage::Shutdown);
let _ = shutdown_sender.send(());
trace!("Event stream server stopped");
}