essential_storage/
streams.rs1use std::time::Duration;
2
3#[cfg(test)]
4mod tests;
5
6#[derive(Clone)]
8pub struct Notify {
9 contracts: tokio::sync::watch::Sender<()>,
10 blocks: tokio::sync::watch::Sender<()>,
11}
12
13#[derive(Clone)]
15pub struct NewData(tokio::sync::watch::Receiver<()>);
16
17#[derive(Clone, Debug, PartialEq, Eq, PartialOrd, Ord, Hash)]
19pub struct StreamState {
20 state: State,
21 start: Start,
22}
23
24#[derive(Clone, Debug, PartialEq, Eq, PartialOrd, Ord, Hash)]
25enum State {
26 Pos(Pos),
27 Done,
28}
29
30#[derive(Clone, Debug, PartialEq, Eq, PartialOrd, Ord, Hash)]
31struct Pos {
32 page: usize,
33 index: usize,
34}
35
36#[derive(Default, Clone, Debug, PartialEq, Eq, PartialOrd, Ord, Hash)]
38pub struct GetData {
39 pub page: usize,
41 pub time: Option<Duration>,
43 pub number: Option<u64>,
45}
46
47#[derive(Default, Clone, Debug, PartialEq, Eq, PartialOrd, Ord, Hash)]
48struct Start {
49 time: Option<Duration>,
50 number: Option<u64>,
51}
52
53pub async fn next_data<F, Fut, D>(
55 mut new_data: NewData,
56 state: StreamState,
57 page_size: usize,
58 get_data: F,
59) -> Option<(Vec<anyhow::Result<D>>, StreamState)>
60where
61 F: Fn(GetData) -> Fut,
62 Fut: std::future::Future<Output = anyhow::Result<Vec<D>>>,
63{
64 let pos = match state.state {
66 State::Pos(pos) => pos,
67 State::Done => return None,
68 };
69
70 loop {
72 let data = get_data(GetData {
74 page: pos.page,
75 time: state.start.time,
76 number: state.start.number,
77 })
78 .await;
79
80 match data {
81 Ok(data) if data.get(pos.index..).filter(|d| !d.is_empty()).is_none() => {
83 match new_data.wait().await {
84 Ok(_) => continue,
86 Err(_) => return None,
89 }
90 }
91 Ok(mut data) => {
93 let next_page = if data.len() >= page_size {
95 Pos {
96 page: pos.page + 1,
97 index: 0,
98 }
99 } else {
100 Pos {
101 page: pos.page,
102 index: data.len(),
103 }
104 };
105
106 return Some((
108 data.drain(pos.index..).map(Ok).collect::<Vec<_>>(),
109 StreamState {
110 state: State::Pos(next_page),
111 start: state.start,
112 },
113 ));
114 }
115 Err(e) => {
117 return Some((
118 vec![Err(e)],
119 StreamState {
120 state: State::Done,
121 start: state.start,
122 },
123 ))
124 }
125 }
126 }
127}
128
129impl StreamState {
130 pub fn new(page: Option<usize>, time: Option<Duration>, number: Option<u64>) -> Self {
132 let page = page.unwrap_or(0);
133 Self {
134 state: State::Pos(Pos { page, index: 0 }),
135 start: Start { time, number },
136 }
137 }
138}
139
140impl Notify {
141 pub fn new() -> Self {
143 let (contracts, _) = tokio::sync::watch::channel(());
144 let (blocks, _) = tokio::sync::watch::channel(());
145 Self { contracts, blocks }
146 }
147
148 pub fn notify_new_contracts(&self) {
150 let _ = self.contracts.send(());
153 }
154
155 pub fn notify_new_blocks(&self) {
157 let _ = self.blocks.send(());
160 }
161
162 pub fn subscribe_contracts(&self) -> NewData {
164 NewData(self.contracts.subscribe())
165 }
166
167 pub fn subscribe_blocks(&self) -> NewData {
169 NewData(self.blocks.subscribe())
170 }
171}
172
173impl NewData {
174 pub async fn wait(&mut self) -> anyhow::Result<()> {
177 self.0
178 .changed()
179 .await
180 .map_err(|_| anyhow::anyhow!("channel closed"))
181 }
182}
183
184impl Default for Notify {
185 fn default() -> Self {
186 Self::new()
187 }
188}
189
190impl Default for StreamState {
191 fn default() -> Self {
192 StreamState {
193 state: State::Pos(Pos { page: 0, index: 0 }),
194 start: Start::default(),
195 }
196 }
197}