1use std::future::Future;
4use std::marker::PhantomData;
5use std::pin::Pin;
6
7use futures_channel::mpsc::{self};
8use futures_util::stream::{Fuse, FusedStream};
9use futures_util::task::{Context, Poll};
10use futures_util::{ready, Sink, Stream, StreamExt};
11use p2panda_core::prune::PruneFlag;
12use p2panda_core::{Body, Extension, Extensions, Header, Operation};
13use p2panda_store::{LogStore, OperationStore};
14use pin_project::pin_project;
15use pin_utils::pin_mut;
16
17use crate::macros::{delegate_access_inner, delegate_sink};
18use crate::operation::{ingest_operation, IngestError, IngestResult};
19
20pub trait IngestExt<S, L, E>: Stream<Item = (Header<E>, Option<Body>, Vec<u8>)> {
23 fn ingest(self, store: S, ooo_buffer_size: usize) -> Ingest<Self, S, L, E>
35 where
36 S: OperationStore<L, E> + LogStore<L, E>,
37 E: Extension<L> + Extension<PruneFlag> + Extensions,
38 Self: Sized,
39 {
40 Ingest::new(self, store, ooo_buffer_size)
41 }
42}
43
44impl<T: ?Sized, S, L, E> IngestExt<S, L, E> for T where
45 T: Stream<Item = (Header<E>, Option<Body>, Vec<u8>)>
46{
47}
48
49#[derive(Debug)]
51#[pin_project]
52#[must_use = "streams do nothing unless polled"]
53pub struct Ingest<St, S, L, E>
54where
55 St: Stream<Item = (Header<E>, Option<Body>, Vec<u8>)>,
56 E: Extension<L> + Extension<PruneFlag> + Extensions,
57 S: OperationStore<L, E> + LogStore<L, E>,
58{
59 #[pin]
60 stream: Fuse<St>,
61 store: S,
62 ooo_buffer_size: usize,
63 ooo_buffer_tx: mpsc::Sender<IngestAttempt<E>>,
64 #[pin]
65 ooo_buffer_rx: mpsc::Receiver<IngestAttempt<E>>,
66 _marker: PhantomData<L>,
67}
68
69impl<St, S, L, E> Ingest<St, S, L, E>
70where
71 St: Stream<Item = (Header<E>, Option<Body>, Vec<u8>)>,
72 S: OperationStore<L, E> + LogStore<L, E>,
73 E: Extension<L> + Extension<PruneFlag> + Extensions,
74{
75 pub(super) fn new(stream: St, store: S, ooo_buffer_size: usize) -> Ingest<St, S, L, E> {
76 let (ooo_buffer_tx, ooo_buffer_rx) = mpsc::channel::<IngestAttempt<E>>(ooo_buffer_size);
84
85 Ingest {
86 store,
87 stream: stream.fuse(),
88 ooo_buffer_size,
89 ooo_buffer_tx,
90 ooo_buffer_rx,
91 _marker: PhantomData,
92 }
93 }
94
95 delegate_access_inner!(stream, St, (.));
96}
97
98impl<St, S, L, E> Stream for Ingest<St, S, L, E>
99where
100 St: Stream<Item = (Header<E>, Option<Body>, Vec<u8>)>,
101 S: OperationStore<L, E> + LogStore<L, E>,
102 E: Extension<L> + Extension<PruneFlag> + Extensions,
103{
104 type Item = Result<Operation<E>, IngestError>;
105
106 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
107 let mut store = self.store.clone();
108 let mut this = self.project();
109 let mut park_buffer = false;
110
111 loop {
112 let res = {
114 if this.ooo_buffer_rx.size_hint().0 == *this.ooo_buffer_size {
117 ready!(this.ooo_buffer_rx.as_mut().poll_next(cx))
118 } else {
119 match this.stream.as_mut().poll_next(cx) {
122 Poll::Ready(Some((header, body, header_bytes))) => {
123 Some(IngestAttempt(header, body, header_bytes, 1))
124 }
125 Poll::Pending => {
126 if park_buffer {
134 return Poll::Pending;
135 }
136 ready!(this.ooo_buffer_rx.as_mut().poll_next(cx))
137 }
138 Poll::Ready(None) => match this.ooo_buffer_rx.as_mut().poll_next(cx) {
139 Poll::Ready(Some(attempt)) => Some(attempt),
140 Poll::Pending => None,
143 Poll::Ready(None) => None,
144 },
145 }
146 }
147 };
148 let Some(IngestAttempt(header, body, header_bytes, counter)) = res else {
149 return Poll::Ready(None);
151 };
152
153 let ingest_fut = async {
156 let log_id: L = header
157 .extract()
158 .ok_or(IngestError::MissingHeaderExtension("log_id".into()))?;
159 let prune_flag: PruneFlag = header
160 .extract()
161 .ok_or(IngestError::MissingHeaderExtension("prune_flag".into()))?;
162 ingest_operation::<S, L, E>(
163 &mut store,
164 header,
165 body,
166 header_bytes,
167 &log_id,
168 prune_flag.is_set(),
169 )
170 .await
171 };
172 pin_mut!(ingest_fut);
173 let ingest_res = ready!(ingest_fut.poll(cx));
174
175 match ingest_res {
179 Ok(IngestResult::Retry(header, body, header_bytes, num_missing)) => {
180 if counter > *this.ooo_buffer_size {
185 return Poll::Ready(Some(Err(IngestError::MaxAttemptsReached(
186 num_missing,
187 ))));
188 }
189
190 let Ok(_) = ready!(this.ooo_buffer_tx.poll_ready(cx)) else {
193 break Poll::Ready(None);
194 };
195
196 let Ok(_) = this.ooo_buffer_tx.start_send(IngestAttempt(
197 header,
198 body,
199 header_bytes,
200 counter + 1,
201 )) else {
202 break Poll::Ready(None);
203 };
204
205 park_buffer = true;
207
208 continue;
209 }
210 Ok(IngestResult::Complete(operation)) => {
211 return Poll::Ready(Some(Ok(operation)));
212 }
213 Err(err) => {
214 return Poll::Ready(Some(Err(err)));
216 }
217 }
218 }
219 }
220}
221
222impl<St: FusedStream, S, L, E> FusedStream for Ingest<St, S, L, E>
223where
224 St: Stream<Item = (Header<E>, Option<Body>, Vec<u8>)>,
225 S: OperationStore<L, E> + LogStore<L, E>,
226 E: Extension<L> + Extension<PruneFlag> + Extensions,
227{
228 fn is_terminated(&self) -> bool {
229 self.stream.is_terminated() && self.ooo_buffer_rx.is_terminated()
230 }
231}
232
233impl<St, S, L, E> Sink<(Header<E>, Option<Body>, Vec<u8>)> for Ingest<St, S, L, E>
234where
235 St: Stream<Item = (Header<E>, Option<Body>, Vec<u8>)>
236 + Sink<(Header<E>, Option<Body>, Vec<u8>)>,
237 S: OperationStore<L, E> + LogStore<L, E>,
238 E: Extension<L> + Extension<PruneFlag> + Extensions,
239{
240 type Error = St::Error;
241
242 delegate_sink!(stream, (Header<E>, Option<Body>, Vec<u8>));
243}
244
245#[derive(Debug)]
246struct IngestAttempt<E>(Header<E>, Option<Body>, Vec<u8>, usize);
247
#[cfg(test)]
mod tests {
    use std::time::Duration;

    use futures_util::stream::iter;
    use futures_util::{StreamExt, TryStreamExt};
    use p2panda_core::{Operation, RawOperation};
    use p2panda_store::MemoryStore;
    use tokio::sync::mpsc;
    use tokio::time;
    use tokio_stream::wrappers::ReceiverStream;

    use crate::operation::IngestError;
    use crate::stream::decode::DecodeExt;
    use crate::test_utils::{mock_stream, Extensions, StreamName};

    use super::IngestExt;

    #[tokio::test]
    async fn ingest() {
        let store = MemoryStore::<StreamName, Extensions>::new();

        // Items that failed to decode are skipped; everything else is ingested.
        let stream = mock_stream()
            .take(5)
            .decode()
            .filter_map(|item| async move { item.ok() })
            .ingest(store, 16);

        let res: Result<Vec<Operation<Extensions>>, IngestError> = stream.try_collect().await;
        assert!(res.is_ok());
    }

    #[tokio::test]
    async fn out_of_order() {
        let items_num = 10;
        let store = MemoryStore::<StreamName, Extensions>::new();

        // Deliver the mock operations in reverse order.
        let items: Vec<RawOperation> = mock_stream().take(items_num).collect().await;

        let stream = iter(items.into_iter().rev())
            .decode()
            .filter_map(|item| async move { item.ok() })
            .ingest(store, items_num);

        let res: Vec<Operation<Extensions>> = stream.try_collect().await.expect("not fail");
        assert_eq!(res.len(), items_num);
    }

    #[tokio::test]
    async fn exhaust_re_attempts_too_early_bug() {
        let store = MemoryStore::<StreamName, Extensions>::new();
        let (tx, rx) = mpsc::channel::<RawOperation>(10);

        // Move the first mock operation to the very end of the sequence.
        let mut operations: Vec<RawOperation> = mock_stream().take(10).collect().await;
        operations.rotate_left(1);

        // Feed the operations one by one, with a short pause between each send.
        tokio::spawn(async move {
            for operation in operations {
                let _ = tx.send(operation).await;
                time::sleep(Duration::from_millis(10)).await;
            }
        });

        let stream = ReceiverStream::new(rx)
            .decode()
            .filter_map(|item| async move { item.ok() })
            .ingest(store, 128);

        let res: Vec<Operation<Extensions>> = stream.try_collect().await.expect("not fail");
        assert_eq!(res.len(), 10);
    }
}