// essential_storage/streams.rs

use std::time::Duration;
2
// Unit tests are kept in a separate `tests` module file and are only
// compiled for test builds.
#[cfg(test)]
mod tests;
5
6/// Notify that there are new contracts or blocks.
7#[derive(Clone)]
8pub struct Notify {
9    contracts: tokio::sync::watch::Sender<()>,
10    blocks: tokio::sync::watch::Sender<()>,
11}
12
13/// Wait for new data.
14#[derive(Clone)]
15pub struct NewData(tokio::sync::watch::Receiver<()>);
16
17/// State of the stream.
18#[derive(Clone, Debug, PartialEq, Eq, PartialOrd, Ord, Hash)]
19pub struct StreamState {
20    state: State,
21    start: Start,
22}
23
24#[derive(Clone, Debug, PartialEq, Eq, PartialOrd, Ord, Hash)]
25enum State {
26    Pos(Pos),
27    Done,
28}
29
/// Cursor into paged data.
#[derive(Clone, Debug, PartialEq, Eq, PartialOrd, Ord, Hash)]
struct Pos {
    /// Page currently being read.
    page: usize,
    /// Number of items on `page` that have already been returned; the next
    /// batch starts at this offset within the page.
    index: usize,
}
35
/// Query describing which page of data to fetch and from where to start.
///
/// Built by [`next_data`] and handed to the caller-supplied fetch closure.
#[derive(Default, Clone, Debug, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub struct GetData {
    /// Page number to get data from.
    pub page: usize,
    /// Optional time to get data from.
    pub time: Option<Duration>,
    /// Optional number to get data from.
    // NOTE(review): presumably a block number; confirm against the callers.
    pub number: Option<u64>,
}
46
/// Start bounds a stream was created with.
///
/// Both values are copied into every [`GetData`] query issued for the stream.
#[derive(Default, Clone, Debug, PartialEq, Eq, PartialOrd, Ord, Hash)]
struct Start {
    /// Optional time to get data from.
    time: Option<Duration>,
    /// Optional number to get data from.
    number: Option<u64>,
}
52
53/// Get the next data in the stream.
54pub async fn next_data<F, Fut, D>(
55    mut new_data: NewData,
56    state: StreamState,
57    page_size: usize,
58    get_data: F,
59) -> Option<(Vec<anyhow::Result<D>>, StreamState)>
60where
61    F: Fn(GetData) -> Fut,
62    Fut: std::future::Future<Output = anyhow::Result<Vec<D>>>,
63{
64    // Check if the stream is done due to returning an error.
65    let pos = match state.state {
66        State::Pos(pos) => pos,
67        State::Done => return None,
68    };
69
70    // Loop while there are no new data to return but there has been a change in contracts.
71    loop {
72        // List the data for this page
73        let data = get_data(GetData {
74            page: pos.page,
75            time: state.start.time,
76            number: state.start.number,
77        })
78        .await;
79
80        match data {
81            // If there are no data for this page, await a change.
82            Ok(data) if data.get(pos.index..).filter(|d| !d.is_empty()).is_none() => {
83                match new_data.wait().await {
84                    // Got a change, get data again.
85                    Ok(_) => continue,
86                    // The new data channel was closed, this means
87                    // the program is shutting down. Close this stream.
88                    Err(_) => return None,
89                }
90            }
91            // There is some data to return.
92            Ok(mut data) => {
93                // Calculate the next page and index.
94                let next_page = if data.len() >= page_size {
95                    Pos {
96                        page: pos.page + 1,
97                        index: 0,
98                    }
99                } else {
100                    Pos {
101                        page: pos.page,
102                        index: data.len(),
103                    }
104                };
105
106                // Drain just the new data (this should never be empty due to the above check).
107                return Some((
108                    data.drain(pos.index..).map(Ok).collect::<Vec<_>>(),
109                    StreamState {
110                        state: State::Pos(next_page),
111                        start: state.start,
112                    },
113                ));
114            }
115            // Got an error so return the error and mark the stream as done.
116            Err(e) => {
117                return Some((
118                    vec![Err(e)],
119                    StreamState {
120                        state: State::Done,
121                        start: state.start,
122                    },
123                ))
124            }
125        }
126    }
127}
128
129impl StreamState {
130    /// Create a new stream state from a page.
131    pub fn new(page: Option<usize>, time: Option<Duration>, number: Option<u64>) -> Self {
132        let page = page.unwrap_or(0);
133        Self {
134            state: State::Pos(Pos { page, index: 0 }),
135            start: Start { time, number },
136        }
137    }
138}
139
140impl Notify {
141    /// Create a new notify.
142    pub fn new() -> Self {
143        let (contracts, _) = tokio::sync::watch::channel(());
144        let (blocks, _) = tokio::sync::watch::channel(());
145        Self { contracts, blocks }
146    }
147
148    /// Notify that there are new contracts.
149    pub fn notify_new_contracts(&self) {
150        // There might not be any subscribers so we
151        // need to ignore the error.
152        let _ = self.contracts.send(());
153    }
154
155    /// Notify that there are new blocks.
156    pub fn notify_new_blocks(&self) {
157        // There might not be any subscribers so we
158        // need to ignore the error.
159        let _ = self.blocks.send(());
160    }
161
162    /// Subscribe to new contracts.
163    pub fn subscribe_contracts(&self) -> NewData {
164        NewData(self.contracts.subscribe())
165    }
166
167    /// Subscribe to new blocks.
168    pub fn subscribe_blocks(&self) -> NewData {
169        NewData(self.blocks.subscribe())
170    }
171}
172
173impl NewData {
174    /// Wait for new data.
175    /// Returns an error if the channel is closed.
176    pub async fn wait(&mut self) -> anyhow::Result<()> {
177        self.0
178            .changed()
179            .await
180            .map_err(|_| anyhow::anyhow!("channel closed"))
181    }
182}
183
184impl Default for Notify {
185    fn default() -> Self {
186        Self::new()
187    }
188}
189
190impl Default for StreamState {
191    fn default() -> Self {
192        StreamState {
193            state: State::Pos(Pos { page: 0, index: 0 }),
194            start: Start::default(),
195        }
196    }
197}