1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
use std::sync::Arc;
use fuel_core_services::{
stream::{
BoxStream,
IntoBoxStream,
},
SharedMutex,
};
use fuel_core_types::blockchain::primitives::BlockHeight;
use futures::stream::StreamExt;
use tokio::sync::Notify;
use crate::state::State;
#[cfg(test)]
mod tests;
/// A block height pulled from the merged height stream, tagged with the
/// stream it originated from.
pub(crate) enum IncomingHeight {
/// A height taken from the `height_stream` passed to `SyncHeights::new`;
/// `SyncHeights::sync` forwards it to `State::observe`.
Observed(BlockHeight),
/// A height taken from the `committed_height_stream` passed to
/// `SyncHeights::new`; `SyncHeights::sync` forwards it to `State::commit`.
Committed(BlockHeight),
}
/// Applies incoming block heights to the shared [`State`] and signals a
/// [`Notify`] handle when an observed height reports a state change.
pub(crate) struct SyncHeights {
/// Merged stream of observed and committed heights, consumed one item at a
/// time by `sync`.
height_stream: BoxStream<IncomingHeight>,
/// Shared state updated through `SharedMutex::apply` on every incoming height.
state: SharedMutex<State>,
/// Notified (one waiter) when `State::observe` returns `true`.
/// NOTE(review): presumably wakes the task that acts on new heights — confirm
/// against the holder of the other `Arc<Notify>` clone.
notify: Arc<Notify>,
}
impl SyncHeights {
    /// Creates a `SyncHeights` that listens to both height sources at once.
    ///
    /// Items from `height_stream` are tagged as [`IncomingHeight::Observed`] and
    /// items from `committed_height_stream` as [`IncomingHeight::Committed`];
    /// the two tagged streams are then merged into a single boxed stream.
    pub(crate) fn new(
        height_stream: BoxStream<BlockHeight>,
        committed_height_stream: BoxStream<BlockHeight>,
        state: SharedMutex<State>,
        notify: Arc<Notify>,
    ) -> Self {
        let observed = height_stream.map(IncomingHeight::Observed);
        let committed = committed_height_stream.map(IncomingHeight::Committed);
        Self {
            height_stream: futures::stream::select(observed, committed).into_boxed(),
            state,
            notify,
        }
    }

    /// Waits for the next incoming height and applies it to the shared state.
    ///
    /// An observed height is passed to `State::observe`; if that call reports a
    /// change, one waiter on the `Notify` handle is woken. A committed height is
    /// passed to `State::commit` and never triggers a notification.
    ///
    /// Returns `None` once the merged height stream has ended, `Some(())`
    /// otherwise.
    #[tracing::instrument(skip(self))]
    pub(crate) async fn sync(&mut self) -> Option<()> {
        let incoming = self.height_stream.next().await?;
        match incoming {
            IncomingHeight::Observed(observed) => {
                // Bind the result first so the state update has fully completed
                // before the notification is sent.
                let changed = self.state.apply(|state| state.observe(*observed));
                if changed {
                    self.notify.notify_one();
                }
            }
            IncomingHeight::Committed(committed) => {
                self.state.apply(|state| state.commit(*committed));
            }
        }
        Some(())
    }

    /// Replaces the internal height stream with the result of applying `f` to it.
    ///
    /// The current stream is moved out by value, so a never-ready placeholder
    /// stream occupies the field while `f` runs.
    pub(crate) fn map_stream(
        &mut self,
        f: impl FnOnce(BoxStream<IncomingHeight>) -> BoxStream<IncomingHeight>,
    ) {
        let placeholder: BoxStream<IncomingHeight> = futures::stream::pending().into_boxed();
        let current = core::mem::replace(&mut self.height_stream, placeholder);
        self.height_stream = f(current);
    }
}