1use std::future::Future;
4use std::marker::PhantomData;
5use std::pin::Pin;
6
7use futures_channel::mpsc::{self};
8use futures_util::stream::{Fuse, FusedStream};
9use futures_util::task::{Context, Poll};
10use futures_util::{FutureExt, Sink, Stream, StreamExt, ready};
11use p2panda_core::prune::PruneFlag;
12use p2panda_core::{Body, Extension, Extensions, Header, Operation};
13use p2panda_store::{LogStore, OperationStore};
14use pin_project::pin_project;
15
16use crate::macros::{delegate_access_inner, delegate_sink};
17use crate::operation::{IngestError, IngestResult, ingest_operation};
18
19pub trait IngestExt<S, L, E>: Stream<Item = (Header<E>, Option<Body>, Vec<u8>)> {
22 fn ingest(self, store: S, ooo_buffer_size: usize) -> Ingest<Self, S, L, E>
34 where
35 S: OperationStore<L, E> + LogStore<L, E>,
36 E: Extension<L> + Extension<PruneFlag> + Extensions,
37 Self: Sized,
38 {
39 Ingest::new(self, store, ooo_buffer_size)
40 }
41}
42
43impl<T: ?Sized, S, L, E> IngestExt<S, L, E> for T where
44 T: Stream<Item = (Header<E>, Option<Body>, Vec<u8>)>
45{
46}
47
48#[pin_project]
50#[must_use = "streams do nothing unless polled"]
51pub struct Ingest<St, S, L, E>
52where
53 St: Stream<Item = (Header<E>, Option<Body>, Vec<u8>)>,
54 E: Extension<L> + Extension<PruneFlag> + Extensions,
55 S: OperationStore<L, E> + LogStore<L, E>,
56{
57 #[pin]
58 stream: Fuse<St>,
59 store: S,
60 ooo_buffer_size: usize,
61 ooo_buffer_tx: mpsc::Sender<IngestAttempt<E>>,
62 #[pin]
63 ooo_buffer_rx: mpsc::Receiver<IngestAttempt<E>>,
64 ingest_fut: Option<Pin<IngestFut<E>>>,
65 _marker: PhantomData<L>,
66}
67
68impl<St, S, L, E> Ingest<St, S, L, E>
69where
70 St: Stream<Item = (Header<E>, Option<Body>, Vec<u8>)>,
71 S: OperationStore<L, E> + LogStore<L, E>,
72 E: Extension<L> + Extension<PruneFlag> + Extensions,
73{
74 pub(super) fn new(stream: St, store: S, ooo_buffer_size: usize) -> Ingest<St, S, L, E> {
75 let (ooo_buffer_tx, ooo_buffer_rx) = mpsc::channel::<IngestAttempt<E>>(ooo_buffer_size);
83
84 Ingest {
85 store,
86 stream: stream.fuse(),
87 ooo_buffer_size,
88 ooo_buffer_tx,
89 ooo_buffer_rx,
90 ingest_fut: None,
91 _marker: PhantomData,
92 }
93 }
94
95 delegate_access_inner!(stream, St, (.));
96}
97
98impl<St, S, L, E> Stream for Ingest<St, S, L, E>
99where
100 St: Stream<Item = (Header<E>, Option<Body>, Vec<u8>)>,
101 S: OperationStore<L, E> + LogStore<L, E> + 'static,
102 E: Extension<L> + Extension<PruneFlag> + Extensions + Send + Sync + 'static,
103 L: Send + Sync,
104{
105 type Item = Result<Operation<E>, IngestError>;
106
107 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
108 let mut this = self.project();
109 let mut park_buffer = false;
110
111 loop {
112 if let Some(ingest_fut) = this.ingest_fut.as_mut() {
115 let ingest_res = ready!(ingest_fut.poll_unpin(cx));
116 this.ingest_fut.take();
117
118 match ingest_res {
122 Ok((IngestResult::Retry(header, body, header_bytes, num_missing), counter)) => {
123 if counter > *this.ooo_buffer_size {
128 return Poll::Ready(Some(Err(IngestError::MaxAttemptsReached(
129 num_missing,
130 ))));
131 }
132
133 let Ok(_) = ready!(this.ooo_buffer_tx.poll_ready(cx)) else {
136 break Poll::Ready(None);
137 };
138
139 let Ok(_) = this.ooo_buffer_tx.start_send(IngestAttempt(
140 header,
141 body,
142 header_bytes,
143 counter + 1,
144 )) else {
145 break Poll::Ready(None);
146 };
147
148 park_buffer = true;
150
151 continue;
152 }
153 Ok((IngestResult::Complete(operation), _)) => {
154 return Poll::Ready(Some(Ok(operation)));
155 }
156 Err(err) => {
157 return Poll::Ready(Some(Err(err)));
159 }
160 }
161 }
162
163 let res = {
165 if this.ooo_buffer_rx.size_hint().0 == *this.ooo_buffer_size {
168 ready!(this.ooo_buffer_rx.as_mut().poll_next(cx))
169 } else {
170 match this.stream.as_mut().poll_next(cx) {
173 Poll::Ready(Some((header, body, header_bytes))) => {
174 Some(IngestAttempt(header, body, header_bytes, 1))
175 }
176 Poll::Pending => {
177 if park_buffer {
185 return Poll::Pending;
186 }
187 ready!(this.ooo_buffer_rx.as_mut().poll_next(cx))
188 }
189 Poll::Ready(None) => match this.ooo_buffer_rx.as_mut().poll_next(cx) {
190 Poll::Ready(Some(attempt)) => Some(attempt),
191 Poll::Pending => None,
194 Poll::Ready(None) => None,
195 },
196 }
197 }
198 };
199 let Some(IngestAttempt(header, body, header_bytes, counter)) = res else {
200 return Poll::Ready(None);
202 };
203
204 let mut store = this.store.clone();
207
208 let ingest_fut = async move {
209 let log_id = header
210 .extension()
211 .ok_or(IngestError::MissingHeaderExtension("log_id".into()))?;
212 let prune_flag: PruneFlag = header
213 .extension()
214 .ok_or(IngestError::MissingHeaderExtension("prune_flag".into()))?;
215
216 let ingest_res = ingest_operation::<S, L, E>(
217 &mut store,
218 header,
219 body,
220 header_bytes,
221 &log_id,
222 prune_flag.is_set(),
223 )
224 .await;
225
226 ingest_res.map(|res| (res, counter))
227 };
228
229 this.ingest_fut.replace(Box::pin(ingest_fut));
230 }
231 }
232}
233
234impl<St: FusedStream, S, L, E> FusedStream for Ingest<St, S, L, E>
235where
236 St: Stream<Item = (Header<E>, Option<Body>, Vec<u8>)>,
237 S: OperationStore<L, E> + LogStore<L, E> + 'static,
238 E: Extension<L> + Extension<PruneFlag> + Extensions + Send + Sync + 'static,
239 L: Send + Sync,
240{
241 fn is_terminated(&self) -> bool {
242 self.stream.is_terminated() && self.ooo_buffer_rx.is_terminated()
243 }
244}
245
246impl<St, S, L, E> Sink<(Header<E>, Option<Body>, Vec<u8>)> for Ingest<St, S, L, E>
247where
248 St: Stream<Item = (Header<E>, Option<Body>, Vec<u8>)>
249 + Sink<(Header<E>, Option<Body>, Vec<u8>)>,
250 S: OperationStore<L, E> + LogStore<L, E>,
251 E: Extension<L> + Extension<PruneFlag> + Extensions,
252{
253 type Error = St::Error;
254
255 delegate_sink!(stream, (Header<E>, Option<Body>, Vec<u8>));
256}
257
/// Ordinal of an ingest attempt for a single operation; the first attempt is `1`.
type AttemptCounter = usize;
259
260type IngestFut<E> =
261 Box<dyn Future<Output = Result<(IngestResult<E>, AttemptCounter), IngestError>> + Send>;
262
263#[derive(Debug)]
264struct IngestAttempt<E>(Header<E>, Option<Body>, Vec<u8>, AttemptCounter);
265
#[cfg(test)]
mod tests {
    use std::time::Duration;

    use futures_util::stream::iter;
    use futures_util::{StreamExt, TryStreamExt};
    use p2panda_core::{Operation, RawOperation};
    use p2panda_store::MemoryStore;
    use p2panda_store::sqlite::store::SqliteStore;
    use p2panda_store::sqlite::test_utils::initialize_sqlite_db;
    use tokio::sync::mpsc;
    use tokio::time;
    use tokio_stream::wrappers::ReceiverStream;

    use crate::operation::IngestError;
    use crate::stream::decode::DecodeExt;
    use crate::test_utils::{Extensions, StreamName, mock_stream};

    use super::IngestExt;

    #[tokio::test]
    async fn ingest() {
        let store = MemoryStore::<StreamName, Extensions>::new();

        let stream = mock_stream()
            .take(5)
            .decode()
            .filter_map(|item| async { item.ok() })
            .ingest(store, 16);

        let res: Result<Vec<Operation<Extensions>>, IngestError> = stream.try_collect().await;
        // Every operation must actually come out of the stream, not just "no error".
        let operations = res.expect("not fail");
        assert_eq!(operations.len(), 5);
    }

    #[tokio::test]
    async fn out_of_order() {
        let items_num = 10;
        let store = MemoryStore::<StreamName, Extensions>::new();

        let mut items: Vec<RawOperation> = mock_stream().take(items_num).collect().await;
        items.reverse();

        let stream = iter(items)
            .decode()
            .filter_map(|item| async { item.ok() })
            .ingest(store, items_num);

        let res: Vec<Operation<Extensions>> = stream.try_collect().await.expect("not fail");
        assert_eq!(res.len(), items_num);
    }

    #[tokio::test]
    async fn ingest_async_store_bug() {
        let pool = initialize_sqlite_db().await;
        let store = SqliteStore::<StreamName, Extensions>::new(pool);
        let stream = mock_stream()
            .take(5)
            .decode()
            .filter_map(|item| async { item.ok() })
            .ingest(store, 16);
        let res: Vec<Operation<Extensions>> = stream.try_collect().await.expect("no fail");
        assert_eq!(res.len(), 5);
    }

    #[tokio::test]
    async fn exhaust_re_attempts_too_early_bug() {
        let store = MemoryStore::<StreamName, Extensions>::new();
        let (tx, rx) = mpsc::channel::<RawOperation>(10);

        // Move the first operation to the end, so it arrives after all others (which presumably
        // depend on it and have to wait in the out-of-order buffer).
        let mut operations: Vec<RawOperation> = mock_stream().take(10).collect().await;
        operations.rotate_left(1);

        tokio::spawn(async move {
            // Deliver operations one by one with a short delay, so the ingest stream repeatedly
            // sees a pending upstream in-between.
            for operation in operations {
                let _ = tx.send(operation).await;
                time::sleep(Duration::from_millis(10)).await;
            }
        });

        let stream = ReceiverStream::new(rx)
            .decode()
            .filter_map(|item| async { item.ok() })
            .ingest(store, 128);

        let res: Vec<Operation<Extensions>> = stream.try_collect().await.expect("not fail");
        assert_eq!(res.len(), 10);
    }
}