1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
use futures::{future, Future, FutureExt};
use tokio::{
select,
sync::{broadcast, mpsc, oneshot},
task,
};
use tracing::{info, trace};
use wheelbuf::WheelBuf;
use casper_types::ProtocolVersion;
use super::{
sse_server::{BroadcastChannelMessage, NewSubscriberInfo, ServerSentEvent},
Config, SseData,
};
pub(super) async fn run(
config: Config,
api_version: ProtocolVersion,
server_with_shutdown: impl Future<Output = ()> + Send + 'static,
server_shutdown_sender: oneshot::Sender<()>,
mut data_receiver: mpsc::UnboundedReceiver<SseData>,
broadcaster: broadcast::Sender<BroadcastChannelMessage>,
mut new_subscriber_info_receiver: mpsc::UnboundedReceiver<NewSubscriberInfo>,
) {
let server_joiner = task::spawn(server_with_shutdown);
let mut event_index = 0_u32;
let mut buffer = WheelBuf::new(vec![
ServerSentEvent::initial_event(api_version);
config.event_stream_buffer_length as usize
]);
let event_stream_fut = async {
loop {
select! {
maybe_new_subscriber = new_subscriber_info_receiver.recv() => {
if let Some(subscriber) = maybe_new_subscriber {
let _ = subscriber
.initial_events_sender
.send(ServerSentEvent::initial_event(api_version));
if let Some(start_index) = subscriber.start_from {
for event in buffer
.iter()
.skip_while(|event| event.id.unwrap() < start_index)
{
let _ = subscriber.initial_events_sender.send(event.clone());
}
}
}
}
maybe_data = data_receiver.recv() => {
match maybe_data {
Some(data) => {
trace!("Event stream server received {:?}", data);
let event = ServerSentEvent { id: Some(event_index), data };
buffer.push(event.clone());
let message = BroadcastChannelMessage::ServerSentEvent(event);
let _ = broadcaster.send(message);
event_index = event_index.wrapping_add(1);
}
None => {
info!("shutting down HTTP server");
break;
}
}
}
}
}
};
let _ = future::select(server_joiner, event_stream_fut.boxed()).await;
let _ = broadcaster.send(BroadcastChannelMessage::Shutdown);
let _ = server_shutdown_sender.send(());
trace!("Event stream server stopped");
}