external_buffered_stream/
lib.rs1mod buffer;
2mod error;
3mod runtime;
4mod serde;
5
6pub use buffer::*;
7pub use error::*;
8pub use serde::*;
9
10use std::{
11 marker::PhantomData,
12 pin::Pin,
13 sync::Arc,
14 task::{Context, Poll},
15};
16
17use futures::{channel::mpsc, Future, SinkExt, Stream, StreamExt};
18
19pub struct ExternalBufferedStream<T, B, S>
20where
21 T: Send,
22 B: ExternalBuffer<T>,
23 S: Stream<Item = T>,
24{
25 buffer: Arc<B>,
26 _source: PhantomData<S>,
27 notify: mpsc::UnboundedReceiver<()>,
28
29 pending: Option<Pin<Box<dyn Future<Output = Result<Option<T>, Error>> + Send>>>,
31}
32
33impl<T, B, S> ExternalBufferedStream<T, B, S>
34where
35 T: Send,
36 B: ExternalBuffer<T> + 'static,
37 S: Stream<Item = T> + Send + 'static,
38{
39 pub fn new(source: S, buffer: B) -> Self {
40 let source = Box::pin(source);
41
42 let buffer = Arc::new(buffer);
43 let buffer_clone = buffer.clone();
44
45 let (notify_tx, notify_rx) = mpsc::unbounded::<()>();
46
47 let handle_source = async move {
48 let mut source = source;
49 let mut notify_tx = notify_tx;
50 while let Some(item) = source.next().await {
51 match buffer_clone.push(item).await {
52 Ok(()) => match notify_tx.send(()).await {
53 Ok(_) => {}
54 Err(e) => {
55 log::error!("Failed to notify: {:?}", e);
56 break;
57 }
58 },
59 Err(e) => {
60 log::error!("Failed to push item to buffer: {:?}", e);
61 break;
62 }
63 }
64 }
65 log::info!("Source of external buffer stream is ended.");
66 };
67 runtime::spawn(handle_source);
68
69 ExternalBufferedStream {
70 buffer,
71 _source: PhantomData,
72 notify: notify_rx,
73 pending: None,
74 }
75 }
76}
77
78impl<T, B, S> Stream for ExternalBufferedStream<T, B, S>
79where
80 T: Send,
81 B: ExternalBuffer<T> + 'static,
82 S: Stream<Item = T> + Send + 'static,
83{
84 type Item = T;
85
86 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
87 let this = unsafe { self.get_unchecked_mut() };
89
90 loop {
91 if this.pending.is_none() {
92 let buffer = this.buffer.clone();
93 this.pending = Some(Box::pin(async move { buffer.shift().await }));
94 }
95
96 if let Some(pending) = this.pending.as_mut() {
97 match pending.as_mut().poll(cx) {
98 Poll::Ready(result) => {
99 this.pending = None;
100
101 match result {
102 Ok(Some(item)) => {
103 return Poll::Ready(Some(item));
104 }
105 Ok(None) => {
106 let mut has_new = false;
107 let is_end = loop {
108 match (&mut this.notify).poll_next_unpin(cx) {
110 Poll::Ready(Some(_)) => {
111 has_new = true;
112 continue;
114 }
115 Poll::Ready(None) => break true,
116 Poll::Pending => break false,
117 }
118 };
119 if has_new {
120 continue;
121 } else if is_end {
122 return Poll::Ready(None);
123 } else {
124 return Poll::Pending;
125 }
126 }
127 Err(err) => {
128 log::error!("external buffer shift return error: {}", err);
129 return Poll::Ready(None);
130 }
131 }
132 }
133 Poll::Pending => {
134 return Poll::Pending;
135 }
136 }
137 }
138 }
139 }
140}
141
142#[cfg(feature = "default")]
143pub fn create_external_buffered_stream<T, S, P>(
144 stream: S,
145 path: P,
146) -> Result<ExternalBufferedStream<T, ExternalBufferSled, S>, Error>
147where
148 T: ExternalBufferSerde + Send + 'static,
149 S: Stream<Item = T> + Send + Sync + 'static,
150 P: AsRef<std::path::Path>,
151{
152 Ok(ExternalBufferedStream::new(
153 stream,
154 ExternalBufferSled::new(path)?,
155 ))
156}
157
158#[cfg(feature = "queue")]
159pub fn create_queued_stream<T, S>(
160 stream: S,
161) -> Result<ExternalBufferedStream<T, ExternalBufferQueue<T>, S>, Error>
162where
163 T: Ord + Send + 'static,
164 S: Stream<Item = T> + Send + Sync + 'static,
165{
166 Ok(ExternalBufferedStream::new(
167 stream,
168 ExternalBufferQueue::new(),
169 ))
170}