p2panda_stream/stream/ingest.rs

// SPDX-License-Identifier: MIT OR Apache-2.0

use std::future::Future;
use std::marker::PhantomData;
use std::pin::Pin;

use futures_channel::mpsc::{self};
use futures_util::stream::{Fuse, FusedStream};
use futures_util::task::{Context, Poll};
use futures_util::{FutureExt, Sink, Stream, StreamExt, ready};
use p2panda_core::prune::PruneFlag;
use p2panda_core::{Body, Extension, Extensions, Header, Operation};
use p2panda_store::{LogStore, OperationStore};
use pin_project::pin_project;

use crate::macros::{delegate_access_inner, delegate_sink};
use crate::operation::{IngestError, IngestResult, ingest_operation};

/// An extension trait for `Stream`s that provides a convenient [`ingest`](IngestExt::ingest)
/// method.
pub trait IngestExt<S, L, E>: Stream<Item = (Header<E>, Option<Body>, Vec<u8>)> {
    /// Checks incoming operations for log integrity and automatically persists them in the given
    /// store.
    ///
    /// Every operation needs to carry a "prune flag" in its header, as specified by the p2panda
    /// protocol. Ingest validates each operation according to its prune status and automatically
    /// removes past items from the log.
    ///
    /// This ingest implementation holds an internal buffer for operations which arrive "out of
    /// order". The buffer size determines the maximum number of consecutive out-of-order
    /// operations this method can handle. Given a buffer size of, for example, 100, we can handle
    /// a worst-case unordered, fully reversed log of 100 items without problems.
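    ///
    /// # Example
    ///
    /// A minimal usage sketch (not part of the crate's test suite): it assumes a `MemoryStore`
    /// and an upstream `decoded_stream` which already yields `(Header<E>, Option<Body>, Vec<u8>)`
    /// tuples, for example the output of a decoding stage.
    ///
    /// ```ignore
    /// use futures_util::TryStreamExt;
    /// use p2panda_store::MemoryStore;
    ///
    /// let store = MemoryStore::new();
    ///
    /// // Buffer up to 16 consecutive out-of-order operations before giving up on an item.
    /// let mut operations = decoded_stream.ingest(store, 16);
    ///
    /// // Every operation yielded here has been validated and persisted in the store.
    /// while let Some(operation) = operations.try_next().await? {
    ///     // ... hand the operation to the application ...
    /// }
    /// ```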
    fn ingest(self, store: S, ooo_buffer_size: usize) -> Ingest<Self, S, L, E>
    where
        S: OperationStore<L, E> + LogStore<L, E>,
        E: Extension<L> + Extension<PruneFlag> + Extensions,
        Self: Sized,
    {
        Ingest::new(self, store, ooo_buffer_size)
    }
}

impl<T: ?Sized, S, L, E> IngestExt<S, L, E> for T where
    T: Stream<Item = (Header<E>, Option<Body>, Vec<u8>)>
{
}

/// Stream for the [`ingest`](IngestExt::ingest) method.
#[pin_project]
#[must_use = "streams do nothing unless polled"]
pub struct Ingest<St, S, L, E>
where
    St: Stream<Item = (Header<E>, Option<Body>, Vec<u8>)>,
    E: Extension<L> + Extension<PruneFlag> + Extensions,
    S: OperationStore<L, E> + LogStore<L, E>,
{
    #[pin]
    stream: Fuse<St>,
    store: S,
    ooo_buffer_size: usize,
    ooo_buffer_tx: mpsc::Sender<IngestAttempt<E>>,
    #[pin]
    ooo_buffer_rx: mpsc::Receiver<IngestAttempt<E>>,
    ingest_fut: Option<Pin<IngestFut<E>>>,
    _marker: PhantomData<L>,
}

impl<St, S, L, E> Ingest<St, S, L, E>
where
    St: Stream<Item = (Header<E>, Option<Body>, Vec<u8>)>,
    S: OperationStore<L, E> + LogStore<L, E>,
    E: Extension<L> + Extension<PruneFlag> + Extensions,
{
    pub(super) fn new(stream: St, store: S, ooo_buffer_size: usize) -> Ingest<St, S, L, E> {
        // @TODO(adz): We can optimize the internal out-of-order buffer further, as its FIFO
        // nature is not optimal. A sorted list (by seq num, maybe even grouped by public key)
        // might be more efficient, though I'm not sure about optimal implementations yet, so
        // benchmarks and more real-world experience might make sense before we attempt any of
        // this.
        //
        // Also, using an mpsc channel for the internal buffer seems overkill.
        let (ooo_buffer_tx, ooo_buffer_rx) = mpsc::channel::<IngestAttempt<E>>(ooo_buffer_size);

        Ingest {
            store,
            stream: stream.fuse(),
            ooo_buffer_size,
            ooo_buffer_tx,
            ooo_buffer_rx,
            ingest_fut: None,
            _marker: PhantomData,
        }
    }

    delegate_access_inner!(stream, St, (.));
}

impl<St, S, L, E> Stream for Ingest<St, S, L, E>
where
    St: Stream<Item = (Header<E>, Option<Body>, Vec<u8>)>,
    S: OperationStore<L, E> + LogStore<L, E> + 'static,
    E: Extension<L> + Extension<PruneFlag> + Extensions + Send + Sync + 'static,
    L: Send + Sync,
{
    type Item = Result<Operation<E>, IngestError>;

    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        let mut this = self.project();
        let mut park_buffer = false;

        loop {
            // 1. Attempt to validate and ingest the operation (this is a future we need to poll
            //    until it resolves).
            if let Some(ingest_fut) = this.ingest_fut.as_mut() {
                let ingest_res = ready!(ingest_fut.poll_unpin(cx));
                this.ingest_fut.take();

                // 2. If the operation arrived out-of-order we push it back into the internal
                //    buffer and try again later (re-attempted for a configured number of times),
                //    otherwise we forward the result to the stream consumer.
                match ingest_res {
                    Ok((IngestResult::Retry(header, body, header_bytes, num_missing), counter)) => {
                        // The maximum number of re-attempts is equal to the size of the buffer. As
                        // long as the buffer is just a FIFO queue it doesn't make sense to optimize
                        // over different parameters, as in a worst-case distribution of items
                        // (exact reverse order) this is both the maximum and minimum required
                        // bound.
                        if counter > *this.ooo_buffer_size {
                            return Poll::Ready(Some(Err(IngestError::MaxAttemptsReached(
                                num_missing,
                            ))));
                        }

                        // Push the operation back into the internal queue; if something goes wrong
                        // here it must be a critical failure.
                        let Ok(_) = ready!(this.ooo_buffer_tx.poll_ready(cx)) else {
                            break Poll::Ready(None);
                        };

                        let Ok(_) = this.ooo_buffer_tx.start_send(IngestAttempt(
                            header,
                            body,
                            header_bytes,
                            counter + 1,
                        )) else {
                            break Poll::Ready(None);
                        };

                        // In the next iteration we should prioritize the stream again.
                        park_buffer = true;

                        continue;
                    }
                    Ok((IngestResult::Complete(operation), _)) => {
                        return Poll::Ready(Some(Ok(operation)));
                    }
                    Err(err) => {
                        // Ingest failed and we want the stream consumers to be aware of that.
                        return Poll::Ready(Some(Err(err)));
                    }
                }
            }

            // 3. Pull in the next item from the external stream or out-of-order buffer.
            let res = {
                // If the buffer has run full we prioritize pulling from it first, re-attempting
                // ingest. This avoids clogging up the pipeline.
                if this.ooo_buffer_rx.size_hint().0 == *this.ooo_buffer_size {
                    ready!(this.ooo_buffer_rx.as_mut().poll_next(cx))
                } else {
                    // Otherwise prefer pulling from the external stream first, as freshly incoming
                    // data should be prioritized.
                    match this.stream.as_mut().poll_next(cx) {
                        Poll::Ready(Some((header, body, header_bytes))) => {
                            Some(IngestAttempt(header, body, header_bytes, 1))
                        }
                        Poll::Pending => {
                            // If we're getting back to the buffer queue after a failed ingest
                            // attempt, we should "park" here instead and allow the runtime to try
                            // polling the stream again next time.
                            //
                            // Otherwise we run into a loop where the runtime never again has the
                            // chance to take in new operations and we end up exhausting our
                            // re-attempt counter for no reason.
                            if park_buffer {
                                return Poll::Pending;
                            }
                            ready!(this.ooo_buffer_rx.as_mut().poll_next(cx))
                        }
                        Poll::Ready(None) => match this.ooo_buffer_rx.as_mut().poll_next(cx) {
                            Poll::Ready(Some(attempt)) => Some(attempt),
                            // If there's no value coming from the buffer _and_ the external stream
                            // is terminated, we can be sure nothing will come anymore.
                            Poll::Pending => None,
                            Poll::Ready(None) => None,
                        },
                    }
                }
            };
            let Some(IngestAttempt(header, body, header_bytes, counter)) = res else {
                // Both the external stream and the buffer stream have ended, so we stop here as
                // well.
                return Poll::Ready(None);
            };

            // 4. Validate and check the log integrity of the incoming operation. If it is valid,
            //    it gets persisted and the log is optionally pruned.
            let mut store = this.store.clone();

            let ingest_fut = async move {
                let log_id = header
                    .extension()
                    .ok_or(IngestError::MissingHeaderExtension("log_id".into()))?;
                let prune_flag: PruneFlag = header
                    .extension()
                    .ok_or(IngestError::MissingHeaderExtension("prune_flag".into()))?;

                let ingest_res = ingest_operation::<S, L, E>(
                    &mut store,
                    header,
                    body,
                    header_bytes,
                    &log_id,
                    prune_flag.is_set(),
                )
                .await;

                ingest_res.map(|res| (res, counter))
            };

            this.ingest_fut.replace(Box::pin(ingest_fut));
        }
    }
}

impl<St: FusedStream, S, L, E> FusedStream for Ingest<St, S, L, E>
where
    St: Stream<Item = (Header<E>, Option<Body>, Vec<u8>)>,
    S: OperationStore<L, E> + LogStore<L, E> + 'static,
    E: Extension<L> + Extension<PruneFlag> + Extensions + Send + Sync + 'static,
    L: Send + Sync,
{
    fn is_terminated(&self) -> bool {
        self.stream.is_terminated() && self.ooo_buffer_rx.is_terminated()
    }
}

impl<St, S, L, E> Sink<(Header<E>, Option<Body>, Vec<u8>)> for Ingest<St, S, L, E>
where
    St: Stream<Item = (Header<E>, Option<Body>, Vec<u8>)>
        + Sink<(Header<E>, Option<Body>, Vec<u8>)>,
    S: OperationStore<L, E> + LogStore<L, E>,
    E: Extension<L> + Extension<PruneFlag> + Extensions,
{
    type Error = St::Error;

    delegate_sink!(stream, (Header<E>, Option<Body>, Vec<u8>));
}

/// Number of times ingest has been attempted for an operation so far.
type AttemptCounter = usize;

/// Boxed future resolving to the result of a single ingest attempt, together with its attempt
/// counter.
type IngestFut<E> =
    Box<dyn Future<Output = Result<(IngestResult<E>, AttemptCounter), IngestError>> + Send>;

/// Operation (header, optional body and raw header bytes) queued for ingest, together with its
/// attempt counter.
#[derive(Debug)]
struct IngestAttempt<E>(Header<E>, Option<Body>, Vec<u8>, AttemptCounter);

#[cfg(test)]
mod tests {
    use std::time::Duration;

    use futures_util::stream::iter;
    use futures_util::{StreamExt, TryStreamExt};
    use p2panda_core::{Operation, RawOperation};
    use p2panda_store::MemoryStore;
    use p2panda_store::sqlite::store::SqliteStore;
    use p2panda_store::sqlite::test_utils::initialize_sqlite_db;
    use tokio::sync::mpsc;
    use tokio::time;
    use tokio_stream::wrappers::ReceiverStream;

    use crate::operation::IngestError;
    use crate::stream::decode::DecodeExt;
    use crate::test_utils::{Extensions, StreamName, mock_stream};

    use super::IngestExt;

    #[tokio::test]
    async fn ingest() {
        let store = MemoryStore::<StreamName, Extensions>::new();

        let stream = mock_stream()
            .take(5)
            .decode()
            .filter_map(|item| async {
                match item {
                    Ok((header, body, header_bytes)) => Some((header, body, header_bytes)),
                    Err(_) => None,
                }
            })
            .ingest(store, 16);

        let res: Result<Vec<Operation<Extensions>>, IngestError> = stream.try_collect().await;
        assert!(res.is_ok());
    }

    #[tokio::test]
    async fn out_of_order() {
        let items_num = 10;
        let store = MemoryStore::<StreamName, Extensions>::new();

        let mut items: Vec<RawOperation> = mock_stream().take(items_num).collect().await;
        // Reverse all items to ingest a worst-case out-of-order sample set.
        items.reverse();

        let stream = iter(items)
            .decode()
            .filter_map(|item| async {
                match item {
                    Ok((header, body, header_bytes)) => Some((header, body, header_bytes)),
                    Err(_) => None,
                }
            })
            // Since the sample set ordering is worst-case (fully reversed), it makes sense to keep
            // the buffer size at least as big as the sample size. This way we can guarantee that
            // ingest (and this test) will be successful.
            .ingest(store, items_num);

        let res: Vec<Operation<Extensions>> = stream.try_collect().await.expect("not fail");
        assert_eq!(res.len(), items_num);
    }

    #[tokio::test]
    async fn ingest_async_store_bug() {
        // Related issue: https://github.com/p2panda/p2panda/issues/694
        let pool = initialize_sqlite_db().await;
        let store = SqliteStore::<StreamName, Extensions>::new(pool);
        let stream = mock_stream()
            .take(5)
            .decode()
            .filter_map(|item| async { item.ok() })
            .ingest(store, 16);
        let res: Vec<Operation<Extensions>> = stream.try_collect().await.expect("no fail");
        assert_eq!(res.len(), 5);
    }

    #[tokio::test]
    async fn exhaust_re_attempts_too_early_bug() {
        // Related issue: https://github.com/p2panda/p2panda/issues/665
        let store = MemoryStore::<StreamName, Extensions>::new();
        let (tx, rx) = mpsc::channel::<RawOperation>(10);

        // Incoming operations in order: 1, 2, 3, 4, 5, 6, 7, 8, 9, 0 (<-- the first operation in
        // the log comes in last).
        let mut operations: Vec<RawOperation> = mock_stream().take(10).collect().await;
        operations.rotate_left(1);

        tokio::spawn(async move {
            // Reverse the operations and pop them one after another from the back to ingest.
            operations.reverse();
            while let Some(operation) = operations.pop() {
                let _ = tx.send(operation).await;

                // Waiting here is crucial to cause the bug: the polling logic will not receive a
                // new item directly from the stream but will rather prioritize the buffer.
                time::sleep(Duration::from_millis(10)).await;
            }
        });

        let stream = ReceiverStream::new(rx)
            .decode()
            .filter_map(|item| async {
                match item {
                    Ok((header, body, header_bytes)) => Some((header, body, header_bytes)),
                    Err(_) => None,
                }
            })
            .ingest(store, 128); // The out-of-order buffer is large enough.

        let res: Vec<Operation<Extensions>> = stream.try_collect().await.expect("not fail");
        assert_eq!(res.len(), 10);
    }
}