// p2panda_stream/stream/ingest.rs
1// SPDX-License-Identifier: MIT OR Apache-2.0
2
3use std::future::Future;
4use std::marker::PhantomData;
5use std::pin::Pin;
6
7use futures_channel::mpsc::{self};
8use futures_util::stream::{Fuse, FusedStream};
9use futures_util::task::{Context, Poll};
10use futures_util::{FutureExt, Sink, Stream, StreamExt, ready};
11use p2panda_core::prune::PruneFlag;
12use p2panda_core::{Body, Extension, Extensions, Header, Operation};
13use p2panda_store::{LogStore, OperationStore};
14use pin_project::pin_project;
15
16use crate::macros::{delegate_access_inner, delegate_sink};
17use crate::operation::{IngestError, IngestResult, ingest_operation};
18
19/// An extension trait for `Stream`s that provides a convenient [`ingest`](IngestExt::ingest)
20/// method.
21pub trait IngestExt<S, L, E>: Stream<Item = (Header<E>, Option<Body>, Vec<u8>)> {
22    /// Checks incoming operations against their log integrity and persists them automatically in a
23    /// given store.
24    ///
25    /// Every given operation needs to implement a "prune flag" in their header as specified by the
26    /// p2panda protocol. Ingest will make sure to accordingly validate based on the given prune
27    /// status and automatically remove past items from the log.
28    ///
29    /// This ingest implementation holds an internal buffer for operations which come in "out of
30    /// order". The buffer size determines the maximum number of out-of-order operations in a row
31    /// this method can handle. This means that given a buffer size of for example 100, we can
32    /// handle a worst-case unordered, fully reversed log with 100 items without problem.
33    fn ingest(self, store: S, ooo_buffer_size: usize) -> Ingest<Self, S, L, E>
34    where
35        S: OperationStore<L, E> + LogStore<L, E>,
36        E: Extension<L> + Extension<PruneFlag> + Extensions,
37        Self: Sized,
38    {
39        Ingest::new(self, store, ooo_buffer_size)
40    }
41}
42
43impl<T: ?Sized, S, L, E> IngestExt<S, L, E> for T where
44    T: Stream<Item = (Header<E>, Option<Body>, Vec<u8>)>
45{
46}
47
/// Stream for the [`ingest`](IngestExt::ingest) method.
#[pin_project]
#[must_use = "streams do nothing unless polled"]
pub struct Ingest<St, S, L, E>
where
    St: Stream<Item = (Header<E>, Option<Body>, Vec<u8>)>,
    E: Extension<L> + Extension<PruneFlag> + Extensions,
    S: OperationStore<L, E> + LogStore<L, E>,
{
    /// Inner stream of incoming operations, fused so it is safe to poll after termination.
    #[pin]
    stream: Fuse<St>,
    /// Store used to persist validated operations and prune their logs.
    store: S,
    /// Capacity of the out-of-order buffer; also the maximum number of ingest re-attempts
    /// per operation.
    ooo_buffer_size: usize,
    /// Sending half of the internal out-of-order buffer (operations pushed back for retry).
    ooo_buffer_tx: mpsc::Sender<IngestAttempt<E>>,
    /// Receiving half of the internal out-of-order buffer.
    #[pin]
    ooo_buffer_rx: mpsc::Receiver<IngestAttempt<E>>,
    /// In-flight validation/persist future for the operation currently being ingested, if any.
    ingest_fut: Option<Pin<IngestFut<E>>>,
    /// `L` (the log id type) is only used in trait bounds, not stored.
    _marker: PhantomData<L>,
}
67
impl<St, S, L, E> Ingest<St, S, L, E>
where
    St: Stream<Item = (Header<E>, Option<Body>, Vec<u8>)>,
    S: OperationStore<L, E> + LogStore<L, E>,
    E: Extension<L> + Extension<PruneFlag> + Extensions,
{
    /// Creates a new `Ingest` stream wrapping `stream`, persisting into `store` and holding up
    /// to `ooo_buffer_size` out-of-order operations for re-attempted ingestion.
    pub(super) fn new(stream: St, store: S, ooo_buffer_size: usize) -> Ingest<St, S, L, E> {
        // @TODO(adz): We can optimize for the internal out-of-order buffer even more as it's FIFO
        // nature is not optimal. A sorted list (by seq num, maybe even grouped by public key)
        // might be more efficient, though I'm not sure about optimal implementations yet, so
        // benchmarks and more real-world experience might make sense before we attempt any of
        // this.
        //
        // Also, using an mpsc for the internal buffer seems overkill.
        let (ooo_buffer_tx, ooo_buffer_rx) = mpsc::channel::<IngestAttempt<E>>(ooo_buffer_size);

        Ingest {
            store,
            // Fusing guarantees the inner stream is never polled again after returning `None`.
            stream: stream.fuse(),
            ooo_buffer_size,
            ooo_buffer_tx,
            ooo_buffer_rx,
            ingest_fut: None,
            _marker: PhantomData,
        }
    }

    delegate_access_inner!(stream, St, (.));
}
97
impl<St, S, L, E> Stream for Ingest<St, S, L, E>
where
    St: Stream<Item = (Header<E>, Option<Body>, Vec<u8>)>,
    S: OperationStore<L, E> + LogStore<L, E> + 'static,
    E: Extension<L> + Extension<PruneFlag> + Extensions + Send + Sync + 'static,
    L: Send + Sync,
{
    type Item = Result<Operation<E>, IngestError>;

    // Drives a small state machine per poll: (1) finish any in-flight ingest future, (2) route
    // its result (retry / complete / outdated / error), (3) pull the next operation from either
    // the external stream or the internal out-of-order buffer, (4) kick off a new ingest future
    // for it, then loop.
    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        let mut this = self.project();
        // Set after a failed or skipped ingest attempt so that, if the external stream is
        // pending, we yield to the runtime instead of draining the buffer in a tight loop.
        let mut park_buffer = false;

        loop {
            // 1. Attempt validating and ingesting operation (this is a future we need to poll
            //    a while until it resolves).
            if let Some(ingest_fut) = this.ingest_fut.as_mut() {
                let ingest_res = ready!(ingest_fut.poll_unpin(cx));
                // Drop the resolved future so it is never polled again after completion.
                this.ingest_fut.take();

                // 2. If the operation arrived out-of-order we can push it back into the internal
                //    buffer and try again later (attempted for a configured number of times),
                //    otherwise forward the result to the stream consumer.
                match ingest_res {
                    Ok((IngestResult::Retry(header, body, header_bytes, num_missing), counter)) => {
                        // The number of max. reattempts is equal the size of the buffer. As long as
                        // the buffer is just a FIFO queue it doesn't make sense to optimize over
                        // different parameters as in a worst-case distribution of items (exact
                        // reverse) this will be the max. and min. required bound.
                        if counter > *this.ooo_buffer_size {
                            return Poll::Ready(Some(Err(IngestError::MaxAttemptsReached(
                                num_missing,
                            ))));
                        }

                        // Push operation back into the internal queue, if something goes wrong here
                        // this must be a critical failure.
                        //
                        // NOTE(review): if `poll_ready` returned `Pending` here the operation
                        // would be dropped; presumably the channel (capacity `ooo_buffer_size`
                        // plus one slot per sender) can never be full at this point since we
                        // just popped an item — TODO confirm this invariant.
                        let Ok(_) = ready!(this.ooo_buffer_tx.poll_ready(cx)) else {
                            break Poll::Ready(None);
                        };

                        let Ok(_) = this.ooo_buffer_tx.start_send(IngestAttempt(
                            header,
                            body,
                            header_bytes,
                            counter + 1,
                        )) else {
                            break Poll::Ready(None);
                        };

                        // In the next iteration we should prioritise the stream again.
                        park_buffer = true;

                        continue;
                    }
                    Ok((IngestResult::Complete(operation), _)) => {
                        // Operation validated and persisted; hand it to the consumer.
                        return Poll::Ready(Some(Ok(operation)));
                    }
                    Ok((IngestResult::Outdated(_), _)) => {
                        // Ignore "outdated" operations. This can happen if we've processed an
                        // operation which was removed concurrently by a newer one (via pruning) in
                        // the same log.

                        // In the next iteration we should prioritise the stream again.
                        park_buffer = true;

                        continue;
                    }
                    Err(err) => {
                        // Ingest failed and we want the stream consumers to be aware of that.
                        return Poll::Ready(Some(Err(err)));
                    }
                }
            }

            // 3. Pull in the next item from the external stream or out-of-order buffer.
            let res = {
                // If the buffer ran full we prioritise pulling from it first, re-attempting
                // ingest. This avoids clogging up the pipeline.
                if this.ooo_buffer_rx.size_hint().0 == *this.ooo_buffer_size {
                    ready!(this.ooo_buffer_rx.as_mut().poll_next(cx))
                } else {
                    // Otherwise prefer pulling from the external stream first as freshly incoming
                    // data should be prioritised.
                    match this.stream.as_mut().poll_next(cx) {
                        Poll::Ready(Some((header, body, header_bytes))) => {
                            // Fresh operation: this is its first ingest attempt.
                            Some(IngestAttempt(header, body, header_bytes, 1))
                        }
                        Poll::Pending => {
                            // If we're getting back to the buffer queue after a failed ingest
                            // attempt, we should "park" here instead and allow the runtime to try
                            // polling the stream again next time.
                            //
                            // Otherwise we run into a loop where the runtime will never have the
                            // chance again to take in new operations and we end up exhausting our
                            // re-attempt counter for no reason.
                            if park_buffer {
                                return Poll::Pending;
                            }
                            ready!(this.ooo_buffer_rx.as_mut().poll_next(cx))
                        }
                        Poll::Ready(None) => match this.ooo_buffer_rx.as_mut().poll_next(cx) {
                            Poll::Ready(Some(attempt)) => Some(attempt),
                            // If there's no value coming from the buffer _and_ the external stream is
                            // terminated, we can be sure nothing will come anymore.
                            Poll::Pending => None,
                            Poll::Ready(None) => None,
                        },
                    }
                }
            };
            let Some(IngestAttempt(header, body, header_bytes, counter)) = res else {
                // Both external stream and buffer stream has ended, so we stop here as well.
                return Poll::Ready(None);
            };

            // 4. Validate and check the log-integrity of the incoming operation. If it is valid it
            //    gets persisted and the log optionally pruned.
            let mut store = this.store.clone();

            let ingest_fut = async move {
                // Both extensions are mandatory per the p2panda protocol; their absence is an
                // ingest error reported to the consumer.
                let log_id = header
                    .extension()
                    .ok_or(IngestError::MissingHeaderExtension("log_id".into()))?;
                let prune_flag: PruneFlag = header
                    .extension()
                    .ok_or(IngestError::MissingHeaderExtension("prune_flag".into()))?;

                let ingest_res = ingest_operation::<S, L, E>(
                    &mut store,
                    header,
                    body,
                    header_bytes,
                    &log_id,
                    prune_flag.is_set(),
                )
                .await;

                // Thread the attempt counter through so step 2 can enforce the retry limit.
                ingest_res.map(|res| (res, counter))
            };

            this.ingest_fut.replace(Box::pin(ingest_fut));
        }
    }
}
243
244impl<St: FusedStream, S, L, E> FusedStream for Ingest<St, S, L, E>
245where
246    St: Stream<Item = (Header<E>, Option<Body>, Vec<u8>)>,
247    S: OperationStore<L, E> + LogStore<L, E> + 'static,
248    E: Extension<L> + Extension<PruneFlag> + Extensions + Send + Sync + 'static,
249    L: Send + Sync,
250{
251    fn is_terminated(&self) -> bool {
252        self.stream.is_terminated() && self.ooo_buffer_rx.is_terminated()
253    }
254}
255
// When the inner stream is also a `Sink` for the same item type, forward all `Sink` behaviour
// straight through to it so `Ingest` can sit in the middle of a duplex pipeline.
impl<St, S, L, E> Sink<(Header<E>, Option<Body>, Vec<u8>)> for Ingest<St, S, L, E>
where
    St: Stream<Item = (Header<E>, Option<Body>, Vec<u8>)>
        + Sink<(Header<E>, Option<Body>, Vec<u8>)>,
    S: OperationStore<L, E> + LogStore<L, E>,
    E: Extension<L> + Extension<PruneFlag> + Extensions,
{
    type Error = St::Error;

    delegate_sink!(stream, (Header<E>, Option<Body>, Vec<u8>));
}
267
/// Number of times an operation has been attempted for ingest (starts at 1).
type AttemptCounter = usize;

/// Boxed future resolving to the ingest outcome paired with the attempt counter of the
/// operation it processed.
type IngestFut<E> =
    Box<dyn Future<Output = Result<(IngestResult<E>, AttemptCounter), IngestError>> + Send>;

/// A single ingest attempt: header, optional body, raw header bytes and how many attempts
/// (including this one) have been made so far.
#[derive(Debug)]
struct IngestAttempt<E>(Header<E>, Option<Body>, Vec<u8>, AttemptCounter);
275
#[cfg(test)]
mod tests {
    use std::time::Duration;

    use futures_util::stream::iter;
    use futures_util::{StreamExt, TryStreamExt};
    use p2panda_core::{Operation, RawOperation};
    use p2panda_store::MemoryStore;
    use p2panda_store::sqlite::store::SqliteStore;
    use p2panda_store::sqlite::test_utils::initialize_sqlite_db;
    use tokio::sync::mpsc;
    use tokio::time;
    use tokio_stream::wrappers::ReceiverStream;

    use crate::operation::IngestError;
    use crate::stream::decode::DecodeExt;
    use crate::test_utils::{Extensions, StreamName, mock_stream};

    use super::IngestExt;

    // Happy path: a small, in-order stream ingests without errors.
    #[tokio::test]
    async fn ingest() {
        let store = MemoryStore::<StreamName, Extensions>::new();

        let stream = mock_stream()
            .take(5)
            .decode()
            // Drop undecodable items; the verbose `match` on the Result is exactly
            // `Result::ok`, so use the idiomatic form (consistent with the other tests).
            .filter_map(|item| async { item.ok() })
            .ingest(store, 16);

        let res: Result<Vec<Operation<Extensions>>, IngestError> = stream.try_collect().await;
        assert!(res.is_ok());
    }

    // Worst-case ordering: a fully reversed log must still ingest completely, as long as the
    // out-of-order buffer is at least as large as the sample set.
    #[tokio::test]
    async fn out_of_order() {
        let items_num = 10;
        let store = MemoryStore::<StreamName, Extensions>::new();

        let mut items: Vec<RawOperation> = mock_stream().take(items_num).collect().await;
        // Reverse all items, to ingest with a worst-case out-of-order sample set.
        items.reverse();

        let stream = iter(items)
            .decode()
            .filter_map(|item| async { item.ok() })
            // Since the sample set ordering is worst-case (fully reversed), it makes sense to keep
            // the buffer size at least as big as the sample size. Like this we can guarantee that
            // ingest (and this test) will be successful.
            .ingest(store, items_num);

        let res: Vec<Operation<Extensions>> = stream.try_collect().await.expect("not fail");
        assert_eq!(res.len(), items_num);
    }

    // Regression test: ingest must also work against a store whose operations are genuinely
    // asynchronous (SQLite), not just the in-memory store.
    #[tokio::test]
    async fn ingest_async_store_bug() {
        // Related issue: https://github.com/p2panda/p2panda/issues/694
        let pool = initialize_sqlite_db().await;
        let store = SqliteStore::<StreamName, Extensions>::new(pool);
        let stream = mock_stream()
            .take(5)
            .decode()
            .filter_map(|item| async { item.ok() })
            .ingest(store, 16);
        let res: Vec<Operation<Extensions>> = stream.try_collect().await.expect("no fail");
        assert_eq!(res.len(), 5);
    }

    // Regression test: re-attempt counters must not be exhausted while the external stream is
    // merely slow (pending), only when the buffer genuinely cannot make progress.
    #[tokio::test]
    async fn exhaust_re_attempts_too_early_bug() {
        // Related issue: https://github.com/p2panda/p2panda/issues/665
        let store = MemoryStore::<StreamName, Extensions>::new();
        let (tx, rx) = mpsc::channel::<RawOperation>(10);

        // Incoming operations in order: 1, 2, 3, 4, 5, 6, 7, 8, 9, 0 (<-- first operation in log
        // comes last in).
        let mut operations: Vec<RawOperation> = mock_stream().take(10).collect().await;
        operations.rotate_left(1);

        tokio::spawn(async move {
            // Reverse operations and pop one after another from the back to ingest.
            operations.reverse();
            while let Some(operation) = operations.pop() {
                let _ = tx.send(operation).await;

                // Waiting here is crucial to cause the bug: The polling logic will not receive a
                // new item directly from the stream but rather prioritise the buffer.
                time::sleep(Duration::from_millis(10)).await;
            }
        });

        let stream = ReceiverStream::new(rx)
            .decode()
            .filter_map(|item| async { item.ok() })
            .ingest(store, 128); // out-of-order buffer is large enough

        let res: Vec<Operation<Extensions>> = stream.try_collect().await.expect("not fail");
        assert_eq!(res.len(), 10);
    }
}
391}