1use std::future::Future;
4use std::marker::PhantomData;
5use std::pin::Pin;
6
7use futures_channel::mpsc::{self};
8use futures_util::stream::{Fuse, FusedStream};
9use futures_util::task::{Context, Poll};
10use futures_util::{FutureExt, Sink, Stream, StreamExt, ready};
11use p2panda_core::prune::PruneFlag;
12use p2panda_core::{Body, Extension, Extensions, Header, Operation};
13use p2panda_store::{LogStore, OperationStore};
14use pin_project::pin_project;
15
16use crate::macros::{delegate_access_inner, delegate_sink};
17use crate::operation::{IngestError, IngestResult, ingest_operation};
18
19pub trait IngestExt<S, L, E>: Stream<Item = (Header<E>, Option<Body>, Vec<u8>)> {
22 fn ingest(self, store: S, ooo_buffer_size: usize) -> Ingest<Self, S, L, E>
34 where
35 S: OperationStore<L, E> + LogStore<L, E>,
36 E: Extension<L> + Extension<PruneFlag> + Extensions,
37 Self: Sized,
38 {
39 Ingest::new(self, store, ooo_buffer_size)
40 }
41}
42
43impl<T: ?Sized, S, L, E> IngestExt<S, L, E> for T where
44 T: Stream<Item = (Header<E>, Option<Body>, Vec<u8>)>
45{
46}
47
48#[pin_project]
50#[must_use = "streams do nothing unless polled"]
51pub struct Ingest<St, S, L, E>
52where
53 St: Stream<Item = (Header<E>, Option<Body>, Vec<u8>)>,
54 E: Extension<L> + Extension<PruneFlag> + Extensions,
55 S: OperationStore<L, E> + LogStore<L, E>,
56{
57 #[pin]
58 stream: Fuse<St>,
59 store: S,
60 ooo_buffer_size: usize,
61 ooo_buffer_tx: mpsc::Sender<IngestAttempt<E>>,
62 #[pin]
63 ooo_buffer_rx: mpsc::Receiver<IngestAttempt<E>>,
64 ingest_fut: Option<Pin<IngestFut<E>>>,
65 _marker: PhantomData<L>,
66}
67
68impl<St, S, L, E> Ingest<St, S, L, E>
69where
70 St: Stream<Item = (Header<E>, Option<Body>, Vec<u8>)>,
71 S: OperationStore<L, E> + LogStore<L, E>,
72 E: Extension<L> + Extension<PruneFlag> + Extensions,
73{
74 pub(super) fn new(stream: St, store: S, ooo_buffer_size: usize) -> Ingest<St, S, L, E> {
75 let (ooo_buffer_tx, ooo_buffer_rx) = mpsc::channel::<IngestAttempt<E>>(ooo_buffer_size);
83
84 Ingest {
85 store,
86 stream: stream.fuse(),
87 ooo_buffer_size,
88 ooo_buffer_tx,
89 ooo_buffer_rx,
90 ingest_fut: None,
91 _marker: PhantomData,
92 }
93 }
94
95 delegate_access_inner!(stream, St, (.));
96}
97
98impl<St, S, L, E> Stream for Ingest<St, S, L, E>
99where
100 St: Stream<Item = (Header<E>, Option<Body>, Vec<u8>)>,
101 S: OperationStore<L, E> + LogStore<L, E> + 'static,
102 E: Extension<L> + Extension<PruneFlag> + Extensions + Send + Sync + 'static,
103 L: Send + Sync,
104{
105 type Item = Result<Operation<E>, IngestError>;
106
107 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
108 let mut this = self.project();
109 let mut park_buffer = false;
110
111 loop {
112 if let Some(ingest_fut) = this.ingest_fut.as_mut() {
115 let ingest_res = ready!(ingest_fut.poll_unpin(cx));
116 this.ingest_fut.take();
117
118 match ingest_res {
122 Ok((IngestResult::Retry(header, body, header_bytes, num_missing), counter)) => {
123 if counter > *this.ooo_buffer_size {
128 return Poll::Ready(Some(Err(IngestError::MaxAttemptsReached(
129 num_missing,
130 ))));
131 }
132
133 let Ok(_) = ready!(this.ooo_buffer_tx.poll_ready(cx)) else {
136 break Poll::Ready(None);
137 };
138
139 let Ok(_) = this.ooo_buffer_tx.start_send(IngestAttempt(
140 header,
141 body,
142 header_bytes,
143 counter + 1,
144 )) else {
145 break Poll::Ready(None);
146 };
147
148 park_buffer = true;
150
151 continue;
152 }
153 Ok((IngestResult::Complete(operation), _)) => {
154 return Poll::Ready(Some(Ok(operation)));
155 }
156 Ok((IngestResult::Outdated(_), _)) => {
157 park_buffer = true;
163
164 continue;
165 }
166 Err(err) => {
167 return Poll::Ready(Some(Err(err)));
169 }
170 }
171 }
172
173 let res = {
175 if this.ooo_buffer_rx.size_hint().0 == *this.ooo_buffer_size {
178 ready!(this.ooo_buffer_rx.as_mut().poll_next(cx))
179 } else {
180 match this.stream.as_mut().poll_next(cx) {
183 Poll::Ready(Some((header, body, header_bytes))) => {
184 Some(IngestAttempt(header, body, header_bytes, 1))
185 }
186 Poll::Pending => {
187 if park_buffer {
195 return Poll::Pending;
196 }
197 ready!(this.ooo_buffer_rx.as_mut().poll_next(cx))
198 }
199 Poll::Ready(None) => match this.ooo_buffer_rx.as_mut().poll_next(cx) {
200 Poll::Ready(Some(attempt)) => Some(attempt),
201 Poll::Pending => None,
204 Poll::Ready(None) => None,
205 },
206 }
207 }
208 };
209 let Some(IngestAttempt(header, body, header_bytes, counter)) = res else {
210 return Poll::Ready(None);
212 };
213
214 let mut store = this.store.clone();
217
218 let ingest_fut = async move {
219 let log_id = header
220 .extension()
221 .ok_or(IngestError::MissingHeaderExtension("log_id".into()))?;
222 let prune_flag: PruneFlag = header
223 .extension()
224 .ok_or(IngestError::MissingHeaderExtension("prune_flag".into()))?;
225
226 let ingest_res = ingest_operation::<S, L, E>(
227 &mut store,
228 header,
229 body,
230 header_bytes,
231 &log_id,
232 prune_flag.is_set(),
233 )
234 .await;
235
236 ingest_res.map(|res| (res, counter))
237 };
238
239 this.ingest_fut.replace(Box::pin(ingest_fut));
240 }
241 }
242}
243
244impl<St: FusedStream, S, L, E> FusedStream for Ingest<St, S, L, E>
245where
246 St: Stream<Item = (Header<E>, Option<Body>, Vec<u8>)>,
247 S: OperationStore<L, E> + LogStore<L, E> + 'static,
248 E: Extension<L> + Extension<PruneFlag> + Extensions + Send + Sync + 'static,
249 L: Send + Sync,
250{
251 fn is_terminated(&self) -> bool {
252 self.stream.is_terminated() && self.ooo_buffer_rx.is_terminated()
253 }
254}
255
256impl<St, S, L, E> Sink<(Header<E>, Option<Body>, Vec<u8>)> for Ingest<St, S, L, E>
257where
258 St: Stream<Item = (Header<E>, Option<Body>, Vec<u8>)>
259 + Sink<(Header<E>, Option<Body>, Vec<u8>)>,
260 S: OperationStore<L, E> + LogStore<L, E>,
261 E: Extension<L> + Extension<PruneFlag> + Extensions,
262{
263 type Error = St::Error;
264
265 delegate_sink!(stream, (Header<E>, Option<Body>, Vec<u8>));
266}
267
/// Number of the ingest attempt for a single operation; the first attempt is `1`.
type AttemptCounter = usize;
269
270type IngestFut<E> =
271 Box<dyn Future<Output = Result<(IngestResult<E>, AttemptCounter), IngestError>> + Send>;
272
273#[derive(Debug)]
274struct IngestAttempt<E>(Header<E>, Option<Body>, Vec<u8>, AttemptCounter);
275
#[cfg(test)]
mod tests {
    use std::time::Duration;

    use futures_util::stream::iter;
    use futures_util::{StreamExt, TryStreamExt};
    use p2panda_core::{Operation, RawOperation};
    use p2panda_store::MemoryStore;
    use p2panda_store::sqlite::store::SqliteStore;
    use p2panda_store::sqlite::test_utils::initialize_sqlite_db;
    use tokio::sync::mpsc;
    use tokio::time;
    use tokio_stream::wrappers::ReceiverStream;

    use crate::operation::IngestError;
    use crate::stream::decode::DecodeExt;
    use crate::test_utils::{Extensions, StreamName, mock_stream};

    use super::IngestExt;

    /// Five operations from the mock stream ingest without error.
    #[tokio::test]
    async fn ingest() {
        let store = MemoryStore::<StreamName, Extensions>::new();

        // Operations which fail to decode are silently dropped before ingest.
        let ingested = mock_stream()
            .take(5)
            .decode()
            .filter_map(|item| async { item.ok() })
            .ingest(store, 16);

        let res: Result<Vec<Operation<Extensions>>, IngestError> = ingested.try_collect().await;
        assert!(res.is_ok());
    }

    /// A fully reversed sequence of operations is ingested completely when the buffer is as
    /// large as the number of operations.
    #[tokio::test]
    async fn out_of_order() {
        let items_num = 10;
        let store = MemoryStore::<StreamName, Extensions>::new();

        let mut reversed: Vec<RawOperation> = mock_stream().take(items_num).collect().await;
        reversed.reverse();

        let ingested = iter(reversed)
            .decode()
            .filter_map(|item| async { item.ok() })
            .ingest(store, items_num);

        let res: Vec<Operation<Extensions>> = ingested.try_collect().await.expect("not fail");
        assert_eq!(res.len(), items_num);
    }

    /// Ingest works against the SQLite-backed store as well.
    #[tokio::test]
    async fn ingest_async_store_bug() {
        let pool = initialize_sqlite_db().await;
        let store = SqliteStore::<StreamName, Extensions>::new(pool);

        let ingested = mock_stream()
            .take(5)
            .decode()
            .filter_map(|item| async { item.ok() })
            .ingest(store, 16);

        let res: Vec<Operation<Extensions>> = ingested.try_collect().await.expect("no fail");
        assert_eq!(res.len(), 5);
    }

    /// The first operation arrives last and all operations trickle in slowly; every one of them
    /// must still be ingested.
    #[tokio::test]
    async fn exhaust_re_attempts_too_early_bug() {
        let store = MemoryStore::<StreamName, Extensions>::new();
        let (tx, rx) = mpsc::channel::<RawOperation>(10);

        // Move the first operation to the very end of the sequence.
        let mut operations: Vec<RawOperation> = mock_stream().take(10).collect().await;
        operations.rotate_left(1);

        // Send the operations one by one with a short pause in between.
        tokio::spawn(async move {
            for operation in operations {
                let _ = tx.send(operation).await;

                time::sleep(Duration::from_millis(10)).await;
            }
        });

        let ingested = ReceiverStream::new(rx)
            .decode()
            .filter_map(|item| async { item.ok() })
            .ingest(store, 128);

        let res: Vec<Operation<Extensions>> = ingested.try_collect().await.expect("not fail");
        assert_eq!(res.len(), 10);
    }
}