p2panda_stream/stream/
ingest.rs

1// SPDX-License-Identifier: MIT OR Apache-2.0
2
3use std::future::Future;
4use std::marker::PhantomData;
5use std::pin::Pin;
6
7use futures_channel::mpsc::{self};
8use futures_util::stream::{Fuse, FusedStream};
9use futures_util::task::{Context, Poll};
10use futures_util::{ready, Sink, Stream, StreamExt};
11use p2panda_core::prune::PruneFlag;
12use p2panda_core::{Body, Extension, Extensions, Header, Operation};
13use p2panda_store::{LogStore, OperationStore};
14use pin_project::pin_project;
15use pin_utils::pin_mut;
16
17use crate::macros::{delegate_access_inner, delegate_sink};
18use crate::operation::{ingest_operation, IngestError, IngestResult};
19
20/// An extension trait for `Stream`s that provides a convenient [`ingest`](IngestExt::ingest)
21/// method.
22pub trait IngestExt<S, L, E>: Stream<Item = (Header<E>, Option<Body>, Vec<u8>)> {
23    /// Checks incoming operations against their log integrity and persists them automatically in a
24    /// given store.
25    ///
26    /// Every given operation needs to implement a "prune flag" in their header as specified by the
27    /// p2panda protocol. Ingest will make sure to accordingly validate based on the given prune
28    /// status and automatically remove past items from the log.
29    ///
30    /// This ingest implementation holds an internal buffer for operations which come in "out of
31    /// order". The buffer size determines the maximum number of out-of-order operations in a row
32    /// this method can handle. This means that given a buffer size of for example 100, we can
33    /// handle a worst-case unordered, fully reversed log with 100 items without problem.
34    fn ingest(self, store: S, ooo_buffer_size: usize) -> Ingest<Self, S, L, E>
35    where
36        S: OperationStore<L, E> + LogStore<L, E>,
37        E: Extension<L> + Extension<PruneFlag> + Extensions,
38        Self: Sized,
39    {
40        Ingest::new(self, store, ooo_buffer_size)
41    }
42}
43
44impl<T: ?Sized, S, L, E> IngestExt<S, L, E> for T where
45    T: Stream<Item = (Header<E>, Option<Body>, Vec<u8>)>
46{
47}
48
/// Stream for the [`ingest`](IngestExt::ingest) method.
#[derive(Debug)]
#[pin_project]
#[must_use = "streams do nothing unless polled"]
pub struct Ingest<St, S, L, E>
where
    St: Stream<Item = (Header<E>, Option<Body>, Vec<u8>)>,
    E: Extension<L> + Extension<PruneFlag> + Extensions,
    S: OperationStore<L, E> + LogStore<L, E>,
{
    /// Inner stream of decoded operations, fused so it is safe to poll after termination.
    #[pin]
    stream: Fuse<St>,
    /// Store in which validated operations are persisted and logs pruned.
    store: S,
    /// Capacity of the out-of-order buffer; doubles as the max. number of re-attempts per
    /// operation (see `poll_next`).
    ooo_buffer_size: usize,
    /// Sender half of the internal out-of-order ("ooo") retry buffer.
    ooo_buffer_tx: mpsc::Sender<IngestAttempt<E>>,
    /// Receiver half of the internal out-of-order retry buffer.
    #[pin]
    ooo_buffer_rx: mpsc::Receiver<IngestAttempt<E>>,
    /// Marker for the log-id type `L`, which only occurs in trait bounds.
    _marker: PhantomData<L>,
}
68
69impl<St, S, L, E> Ingest<St, S, L, E>
70where
71    St: Stream<Item = (Header<E>, Option<Body>, Vec<u8>)>,
72    S: OperationStore<L, E> + LogStore<L, E>,
73    E: Extension<L> + Extension<PruneFlag> + Extensions,
74{
75    pub(super) fn new(stream: St, store: S, ooo_buffer_size: usize) -> Ingest<St, S, L, E> {
76        // @TODO(adz): We can optimize for the internal out-of-order buffer even more as it's FIFO
77        // nature is not optimal. A sorted list (by seq num, maybe even grouped by public key)
78        // might be more efficient, though I'm not sure about optimal implementations yet, so
79        // benchmarks and more real-world experience might make sense before we attempt any of
80        // this.
81        //
82        // Also, using an mpsc for the internal buffer seems overkill.
83        let (ooo_buffer_tx, ooo_buffer_rx) = mpsc::channel::<IngestAttempt<E>>(ooo_buffer_size);
84
85        Ingest {
86            store,
87            stream: stream.fuse(),
88            ooo_buffer_size,
89            ooo_buffer_tx,
90            ooo_buffer_rx,
91            _marker: PhantomData,
92        }
93    }
94
95    delegate_access_inner!(stream, St, (.));
96}
97
impl<St, S, L, E> Stream for Ingest<St, S, L, E>
where
    St: Stream<Item = (Header<E>, Option<Body>, Vec<u8>)>,
    S: OperationStore<L, E> + LogStore<L, E>,
    E: Extension<L> + Extension<PruneFlag> + Extensions,
{
    type Item = Result<Operation<E>, IngestError>;

    /// Drives the ingest pipeline: pulls the next operation from the external stream or the
    /// internal out-of-order buffer, validates and persists it, and either yields the result
    /// to the consumer or re-queues the operation for a later attempt.
    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        // Clone the store handle up-front; the pin-projection below borrows `self`.
        let mut store = self.store.clone();
        let mut this = self.project();
        // Set after a failed ingest got re-queued; makes us yield back to the runtime instead
        // of busy-looping over the buffer (see `Poll::Pending` branch below).
        let mut park_buffer = false;

        loop {
            // 1. Pull in the next item from the external stream or out-of-order buffer.
            let res = {
                // If the buffer ran full we prioritize pulling from it first, re-attempting
                // ingest. This avoids clogging up the pipeline.
                if this.ooo_buffer_rx.size_hint().0 == *this.ooo_buffer_size {
                    ready!(this.ooo_buffer_rx.as_mut().poll_next(cx))
                } else {
                    // Otherwise prefer pulling from the external stream first as freshly incoming
                    // data should be prioritized.
                    match this.stream.as_mut().poll_next(cx) {
                        Poll::Ready(Some((header, body, header_bytes))) => {
                            // Fresh item from the external stream: this is attempt no. 1.
                            Some(IngestAttempt(header, body, header_bytes, 1))
                        }
                        Poll::Pending => {
                            // If we're getting back to the buffer queue after a failed ingest
                            // attempt, we should "park" here instead and allow the runtime to try
                            // polling the stream again next time.
                            //
                            // Otherwise we run into a loop where the runtime will never have the
                            // chance again to take in new operations and we end up exhausting our
                            // re-attempt counter for no reason.
                            if park_buffer {
                                return Poll::Pending;
                            }
                            ready!(this.ooo_buffer_rx.as_mut().poll_next(cx))
                        }
                        Poll::Ready(None) => match this.ooo_buffer_rx.as_mut().poll_next(cx) {
                            Poll::Ready(Some(attempt)) => Some(attempt),
                            // If there's no value coming from the buffer _and_ the external stream is
                            // terminated, we can be sure nothing will come anymore.
                            Poll::Pending => None,
                            Poll::Ready(None) => None,
                        },
                    }
                }
            };
            let Some(IngestAttempt(header, body, header_bytes, counter)) = res else {
                // Both external stream and buffer stream have ended, so we stop here as well.
                return Poll::Ready(None);
            };

            // 2. Validate and check the log-integrity of the incoming operation. If it is valid it
            //    gets persisted and the log optionally pruned.
            let ingest_fut = async {
                // Both extensions are required by the p2panda protocol; a missing one is an
                // ingest error reported to the consumer.
                let log_id: L = header
                    .extract()
                    .ok_or(IngestError::MissingHeaderExtension("log_id".into()))?;
                let prune_flag: PruneFlag = header
                    .extract()
                    .ok_or(IngestError::MissingHeaderExtension("prune_flag".into()))?;
                ingest_operation::<S, L, E>(
                    &mut store,
                    header,
                    body,
                    header_bytes,
                    &log_id,
                    prune_flag.is_set(),
                )
                .await
            };
            pin_mut!(ingest_fut);
            let ingest_res = ready!(ingest_fut.poll(cx));

            // 3. If the operation arrived out-of-order we can push it back into the internal
            //    buffer and try again later (attempted for a configured number of times),
            //    otherwise forward the result of ingest to the consumer.
            match ingest_res {
                Ok(IngestResult::Retry(header, body, header_bytes, num_missing)) => {
                    // The number of max. re-attempts is equal to the size of the buffer. As long
                    // as the buffer is just a FIFO queue it doesn't make sense to optimize over
                    // different parameters as in a worst-case distribution of items (exact
                    // reverse) this will be the max. and min. required bound.
                    if counter > *this.ooo_buffer_size {
                        return Poll::Ready(Some(Err(IngestError::MaxAttemptsReached(
                            num_missing,
                        ))));
                    }

                    // Push operation back into the internal queue, if something goes wrong here
                    // this must be a critical failure.
                    let Ok(_) = ready!(this.ooo_buffer_tx.poll_ready(cx)) else {
                        break Poll::Ready(None);
                    };

                    // Re-queue with an incremented attempt counter.
                    let Ok(_) = this.ooo_buffer_tx.start_send(IngestAttempt(
                        header,
                        body,
                        header_bytes,
                        counter + 1,
                    )) else {
                        break Poll::Ready(None);
                    };

                    // In the next iteration we should prioritize the stream again.
                    park_buffer = true;

                    continue;
                }
                Ok(IngestResult::Complete(operation)) => {
                    return Poll::Ready(Some(Ok(operation)));
                }
                Err(err) => {
                    // Ingest failed and we want the stream consumers to be aware of that.
                    return Poll::Ready(Some(Err(err)));
                }
            }
        }
    }
}
221
222impl<St: FusedStream, S, L, E> FusedStream for Ingest<St, S, L, E>
223where
224    St: Stream<Item = (Header<E>, Option<Body>, Vec<u8>)>,
225    S: OperationStore<L, E> + LogStore<L, E>,
226    E: Extension<L> + Extension<PruneFlag> + Extensions,
227{
228    fn is_terminated(&self) -> bool {
229        self.stream.is_terminated() && self.ooo_buffer_rx.is_terminated()
230    }
231}
232
/// Forwards the `Sink` implementation of the inner stream, so items can be pushed into the
/// pipeline whenever the wrapped stream is itself a sink.
impl<St, S, L, E> Sink<(Header<E>, Option<Body>, Vec<u8>)> for Ingest<St, S, L, E>
where
    St: Stream<Item = (Header<E>, Option<Body>, Vec<u8>)>
        + Sink<(Header<E>, Option<Body>, Vec<u8>)>,
    S: OperationStore<L, E> + LogStore<L, E>,
    E: Extension<L> + Extension<PruneFlag> + Extensions,
{
    /// Errors surface directly from the underlying sink.
    type Error = St::Error;

    // Delegates all `Sink` methods to the inner stream.
    delegate_sink!(stream, (Header<E>, Option<Body>, Vec<u8>));
}
244
/// An operation queued for (re-)ingestion: header, optional body, raw header bytes and the
/// number of ingest attempts made so far (starting at 1 for fresh items).
#[derive(Debug)]
struct IngestAttempt<E>(Header<E>, Option<Body>, Vec<u8>, usize);
247
#[cfg(test)]
mod tests {
    use std::time::Duration;

    use futures_util::stream::iter;
    use futures_util::{StreamExt, TryStreamExt};
    use p2panda_core::{Operation, RawOperation};
    use p2panda_store::MemoryStore;
    use tokio::sync::mpsc;
    use tokio::time;
    use tokio_stream::wrappers::ReceiverStream;

    use crate::operation::IngestError;
    use crate::stream::decode::DecodeExt;
    use crate::test_utils::{mock_stream, Extensions, StreamName};

    use super::IngestExt;

    /// Happy path: an in-order stream of operations is ingested without errors.
    #[tokio::test]
    async fn ingest() {
        let store = MemoryStore::<StreamName, Extensions>::new();

        let stream = mock_stream()
            .take(5)
            .decode()
            // Drop items which failed to decode; only successfully decoded operations are
            // forwarded to ingest.
            .filter_map(|item| async {
                match item {
                    Ok((header, body, header_bytes)) => Some((header, body, header_bytes)),
                    Err(_) => None,
                }
            })
            .ingest(store, 16);

        let res: Result<Vec<Operation<Extensions>>, IngestError> = stream.try_collect().await;
        assert!(res.is_ok());
    }

    /// Worst-case ordering: a fully reversed log must still ingest completely when the
    /// out-of-order buffer is at least as large as the sample set.
    #[tokio::test]
    async fn out_of_order() {
        let items_num = 10;
        let store = MemoryStore::<StreamName, Extensions>::new();

        let mut items: Vec<RawOperation> = mock_stream().take(items_num).collect().await;
        // Reverse all items, to ingest with a worst-case out-of-order sample set.
        items.reverse();

        let stream = iter(items)
            .decode()
            .filter_map(|item| async {
                match item {
                    Ok((header, body, header_bytes)) => Some((header, body, header_bytes)),
                    Err(_) => None,
                }
            })
            // Since the sample set ordering is worst-case (fully reversed), it makes sense to keep
            // the buffer size at least as big as the sample size. Like this we can guarantee that
            // ingest (and this test) will be successful.
            .ingest(store, items_num);

        let res: Vec<Operation<Extensions>> = stream.try_collect().await.expect("not fail");
        assert_eq!(res.len(), items_num);
    }

    /// Regression test: re-attempts must not be exhausted while the external stream is merely
    /// slow (pending), as opposed to terminated.
    #[tokio::test]
    async fn exhaust_re_attempts_too_early_bug() {
        // Related issue: https://github.com/p2panda/p2panda/issues/665
        let store = MemoryStore::<StreamName, Extensions>::new();
        let (tx, rx) = mpsc::channel::<RawOperation>(10);

        // Incoming operations in order: 1, 2, 3, 4, 5, 6, 7, 8, 9, 0 (<-- first operation in log
        // comes last in).
        let mut operations: Vec<RawOperation> = mock_stream().take(10).collect().await;
        operations.rotate_left(1);

        tokio::spawn(async move {
            // Reverse operations and pop one after another from the back to ingest.
            operations.reverse();
            while let Some(operation) = operations.pop() {
                let _ = tx.send(operation).await;

                // Waiting here is crucial to cause the bug: The polling logic will not receive a
                // new item directly from the stream but rather prioritize the buffer.
                time::sleep(Duration::from_millis(10)).await;
            }
        });

        let stream = ReceiverStream::new(rx)
            .decode()
            .filter_map(|item| async {
                match item {
                    Ok((header, body, header_bytes)) => Some((header, body, header_bytes)),
                    Err(_) => None,
                }
            })
            .ingest(store, 128); // out-of-order buffer is large enough

        let res: Vec<Operation<Extensions>> = stream.try_collect().await.expect("not fail");
        assert_eq!(res.len(), 10);
    }
}
347}