external_buffered_stream/
lib.rs

1mod buffer;
2mod error;
3mod runtime;
4mod serde;
5
6pub use buffer::*;
7pub use error::*;
8pub use serde::*;
9
10use std::{
11    marker::PhantomData,
12    pin::Pin,
13    sync::Arc,
14    task::{Context, Poll},
15};
16
17use futures::{channel::mpsc, Future, SinkExt, Stream, StreamExt};
18
19pub struct ExternalBufferedStream<T, B, S>
20where
21    T: Send,
22    B: ExternalBuffer<T>,
23    S: Stream<Item = T>,
24{
25    buffer: Arc<B>,
26    _source: PhantomData<S>,
27    notify: mpsc::UnboundedReceiver<()>,
28
29    // the pending future that be polled by the stream consumer
30    pending: Option<Pin<Box<dyn Future<Output = Result<Option<T>, Error>> + Send>>>,
31}
32
33impl<T, B, S> ExternalBufferedStream<T, B, S>
34where
35    T: Send,
36    B: ExternalBuffer<T> + 'static,
37    S: Stream<Item = T> + Send + 'static,
38{
39    pub fn new(source: S, buffer: B) -> Self {
40        let source = Box::pin(source);
41
42        let buffer = Arc::new(buffer);
43        let buffer_clone = buffer.clone();
44
45        let (notify_tx, notify_rx) = mpsc::unbounded::<()>();
46
47        let handle_source = async move {
48            let mut source = source;
49            let mut notify_tx = notify_tx;
50            while let Some(item) = source.next().await {
51                match buffer_clone.push(item).await {
52                    Ok(()) => match notify_tx.send(()).await {
53                        Ok(_) => {}
54                        Err(e) => {
55                            log::error!("Failed to notify: {:?}", e);
56                            break;
57                        }
58                    },
59                    Err(e) => {
60                        log::error!("Failed to push item to buffer: {:?}", e);
61                        break;
62                    }
63                }
64            }
65            log::info!("Source of external buffer stream is ended.");
66        };
67        runtime::spawn(handle_source);
68
69        ExternalBufferedStream {
70            buffer,
71            _source: PhantomData,
72            notify: notify_rx,
73            pending: None,
74        }
75    }
76}
77
78impl<T, B, S> Stream for ExternalBufferedStream<T, B, S>
79where
80    T: Send,
81    B: ExternalBuffer<T> + 'static,
82    S: Stream<Item = T> + Send + 'static,
83{
84    type Item = T;
85
86    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
87        // S is PhantomData, so here is safe to get mut
88        let this = unsafe { self.get_unchecked_mut() };
89
90        loop {
91            if this.pending.is_none() {
92                let buffer = this.buffer.clone();
93                this.pending = Some(Box::pin(async move { buffer.shift().await }));
94            }
95
96            if let Some(pending) = this.pending.as_mut() {
97                match pending.as_mut().poll(cx) {
98                    Poll::Ready(result) => {
99                        this.pending = None;
100
101                        match result {
102                            Ok(Some(item)) => {
103                                return Poll::Ready(Some(item));
104                            }
105                            Ok(None) => {
106                                let mut has_new = false;
107                                let is_end = loop {
108                                    // wait notify and consume all
109                                    match (&mut this.notify).poll_next_unpin(cx) {
110                                        Poll::Ready(Some(_)) => {
111                                            has_new = true;
112                                            // 消费所有通知
113                                            continue;
114                                        }
115                                        Poll::Ready(None) => break true,
116                                        Poll::Pending => break false,
117                                    }
118                                };
119                                if has_new {
120                                    continue;
121                                } else if is_end {
122                                    return Poll::Ready(None);
123                                } else {
124                                    return Poll::Pending;
125                                }
126                            }
127                            Err(err) => {
128                                log::error!("external buffer shift return error: {}", err);
129                                return Poll::Ready(None);
130                            }
131                        }
132                    }
133                    Poll::Pending => {
134                        return Poll::Pending;
135                    }
136                }
137            }
138        }
139    }
140}
141
/// Wraps `stream` in an [`ExternalBufferedStream`] whose buffer is an
/// `ExternalBufferSled` created from `path`.
///
/// The bounds on `S` match those of `ExternalBufferedStream::new`
/// (`Send + 'static`); the source stream does not need to be `Sync`.
///
/// # Errors
///
/// Returns the error produced by `ExternalBufferSled::new(path)` if the
/// buffer cannot be created.
#[cfg(feature = "default")]
pub fn create_external_buffered_stream<T, S, P>(
    stream: S,
    path: P,
) -> Result<ExternalBufferedStream<T, ExternalBufferSled, S>, Error>
where
    T: ExternalBufferSerde + Send + 'static,
    S: Stream<Item = T> + Send + 'static,
    P: AsRef<std::path::Path>,
{
    let buffer = ExternalBufferSled::new(path)?;
    Ok(ExternalBufferedStream::new(stream, buffer))
}
157
/// Wraps `stream` in an [`ExternalBufferedStream`] whose buffer is a freshly
/// created `ExternalBufferQueue`.
///
/// The bounds on `S` match those of `ExternalBufferedStream::new`
/// (`Send + 'static`); the source stream does not need to be `Sync`.
///
/// # Errors
///
/// This function currently always returns `Ok`; the `Result` return type is
/// kept so existing callers continue to compile.
#[cfg(feature = "queue")]
pub fn create_queued_stream<T, S>(
    stream: S,
) -> Result<ExternalBufferedStream<T, ExternalBufferQueue<T>, S>, Error>
where
    T: Ord + Send + 'static,
    S: Stream<Item = T> + Send + 'static,
{
    let buffer = ExternalBufferQueue::new();
    Ok(ExternalBufferedStream::new(stream, buffer))
}
170}