sea_streamer_file/
source.rs

1use std::time::Duration;
2
3use flume::{bounded, r#async::RecvStream, unbounded, Receiver, Sender, TryRecvError};
4use sea_streamer_types::{
5    export::futures::{Future, StreamExt},
6    SeqPos,
7};
8
9use crate::{
10    watcher::{new_watcher, FileEvent, Watcher},
11    AsyncFile, ByteBuffer, Bytes, FileErr, FileId, ReadFrom,
12};
13use sea_streamer_runtime::{spawn_task, timeout, TaskHandle};
14
/// A source of bytes that can be asked for a given number of bytes at a time.
pub trait ByteSource {
    /// The future returned by [`ByteSource::request_bytes`].
    ///
    /// It may borrow from the source for `'a` and resolves to the requested
    /// bytes, or to a [`FileErr`].
    type Future<'a>: Future<Output = Result<Bytes, FileErr>>
    where
        Self: 'a;

    /// Request `size` bytes from the source.
    ///
    /// The returned future holds the mutable borrow of `self` for as long as it lives.
    // The explicit `'a` spells out that the returned future borrows `self`;
    // the lint is silenced because clippy would ask for it to be elided.
    #[allow(clippy::needless_lifetimes)]
    fn request_bytes<'a>(&'a mut self, size: usize) -> Self::Future<'a>;
}
23
/// `FileSource` treats files as a live stream of bytes.
/// It will read until the end, and will resume reading when the file grows.
/// It relies on `notify::RecommendedWatcher`, which is the OS's native notify mechanism.
/// The async API allows you to request how many bytes you need, and it will wait for those
/// bytes to come in a non-blocking fashion.
///
/// If the file is removed from the file system, the stream ends.
pub struct FileSource {
    /// The file watcher. It is never read after construction (hence `dead_code`);
    /// presumably it is stored only to keep the watcher alive for as long as the
    /// source exists — TODO confirm against `watcher::Watcher`.
    #[allow(dead_code)]
    watcher: Watcher,
    /// Receiving end of the channel the background task pushes chunks (or an error) into.
    receiver: Receiver<Result<Bytes, FileErr>>,
    /// Handle of the background read task. It is `None` only after `end` has taken it
    /// and before a new task has been spawned.
    handle: Option<TaskHandle<(AsyncFile, Receiver<FileEvent>)>>,
    /// Sender of the file event channel. A clone is handed to the watcher; this copy is
    /// used to wake up or stop the background task.
    notify: Sender<FileEvent>,
    /// Number of bytes (as a file offset) that have been handed out to the user.
    offset: u64,
    /// Largest file size known so far.
    file_size: u64,
    /// Bytes received from the task but not yet handed out.
    buffer: ByteBuffer,
}
41
42fn new_channel<T>() -> (Sender<T>, Receiver<T>) {
43    // This prevents over-read
44    bounded(1024)
45}
46
47impl FileSource {
48    pub async fn new(file_id: FileId, read_from: ReadFrom) -> Result<Self, FileErr> {
49        let mut file = AsyncFile::new_r(file_id).await?;
50        let offset = if matches!(read_from, ReadFrom::End) {
51            file.seek(SeqPos::End).await?
52        } else {
53            0
54        };
55        let buffer = ByteBuffer::new();
56        Self::new_with(file, offset, buffer)
57    }
58
59    pub(crate) fn new_with(
60        file: AsyncFile,
61        offset: u64,
62        buffer: ByteBuffer,
63    ) -> Result<Self, FileErr> {
64        let (sender, receiver) = new_channel();
65        let (notify, event) = unbounded();
66        let watcher = new_watcher(file.id(), notify.clone())?;
67        let file_size = file.size();
68
69        let handle = Self::spawn_task(file, sender, event);
70
71        Ok(Self {
72            watcher,
73            receiver,
74            handle: Some(handle),
75            notify,
76            offset,
77            file_size,
78            buffer,
79        })
80    }
81
    /// Offset in the file that has been read up to (i.e. by the user, not by the OS).
    ///
    /// It advances only when bytes are handed out to the caller, and is reset by a
    /// successful seek. Bytes that were received but are still sitting in the
    /// internal buffer are not counted.
    #[inline]
    pub fn offset(&self) -> u64 {
        self.offset
    }
87
    /// Known size of the current file. This is not always up to date,
    /// the file may have grown larger but not yet known.
    ///
    /// The value is raised whenever newly received bytes or a seek reach past the
    /// previously known size; it is never lowered.
    #[inline]
    pub fn file_size(&self) -> u64 {
        self.file_size
    }
94
95    fn spawn_task(
96        mut file: AsyncFile,
97        sender: Sender<Result<Bytes, FileErr>>,
98        events: Receiver<FileEvent>,
99    ) -> TaskHandle<(AsyncFile, Receiver<FileEvent>)> {
100        spawn_task(async move {
101            let mut wait = 0;
102            'outer: while !sender.is_disconnected() {
103                let bytes = match file.read().await {
104                    Ok(bytes) => bytes,
105                    Err(err) => {
106                        send_error(&sender, err).await;
107                        break;
108                    }
109                };
110                if !bytes.is_empty() {
111                    wait = 0;
112                    if sender.send_async(Ok(bytes)).await.is_err() {
113                        break;
114                    }
115                } else {
116                    // drain all remaining events
117                    loop {
118                        match events.try_recv() {
119                            Ok(FileEvent::Modify) => {}
120                            Ok(FileEvent::Remove) => {
121                                send_error(&sender, FileErr::FileRemoved).await;
122                                break 'outer;
123                            }
124                            Ok(FileEvent::Error(e)) => {
125                                send_error(&sender, FileErr::WatchError(e)).await;
126                                break 'outer;
127                            }
128                            Ok(FileEvent::Rewatch) => {
129                                log::debug!("FileSource: Rewatch");
130                                break 'outer;
131                            }
132                            Err(TryRecvError::Disconnected) => {
133                                break 'outer;
134                            }
135                            Err(TryRecvError::Empty) => break,
136                        }
137                    }
138                    // sleep, but there is no guarantee that OS will notify us timely, or at all
139                    let result = timeout(Duration::from_millis(wait), events.recv_async()).await;
140                    match result {
141                        Ok(event) => match event {
142                            Ok(FileEvent::Modify) => {
143                                // continue;
144                            }
145                            Ok(FileEvent::Remove) => {
146                                send_error(&sender, FileErr::FileRemoved).await;
147                                break 'outer;
148                            }
149                            Ok(FileEvent::Error(e)) => {
150                                send_error(&sender, FileErr::WatchError(e)).await;
151                                break 'outer;
152                            }
153                            Ok(FileEvent::Rewatch) => {
154                                log::debug!("FileSource: Rewatch");
155                                break 'outer;
156                            }
157                            Err(_) => {
158                                break 'outer;
159                            }
160                        },
161                        Err(_) => {
162                            // timed out
163                            // an exponential backoff from 1..1024
164                            wait = std::cmp::min(1.max(wait * 2), 1024);
165                        }
166                    }
167                }
168            }
169            log::debug!("FileSource task finish ({})", file.id().path());
170
171            async fn send_error(sender: &Sender<Result<Bytes, FileErr>>, e: FileErr) {
172                if let Err(e) = sender.send_async(Err(e)).await {
173                    log::error!("{}", e.into_inner().err().unwrap());
174                }
175            }
176
177            (file, events)
178        })
179    }
180
181    /// Stream bytes from file. If there is no bytes, it will wait until there are,
182    /// like `tail -f`.
183    ///
184    /// If there are some bytes in the buffer, it yields immediately.
185    pub async fn stream_bytes(&mut self) -> Result<Bytes, FileErr> {
186        loop {
187            let size = self.buffer.size();
188            if size > 0 {
189                self.offset += size as u64;
190                return Ok(self.buffer.consume(size));
191            }
192            self.receive().await?;
193        }
194    }
195
196    async fn receive(&mut self) -> Result<(), FileErr> {
197        match self.receiver.recv_async().await {
198            Ok(Ok(bytes)) => {
199                self.buffer.append(bytes);
200                self.file_size =
201                    std::cmp::max(self.file_size, self.offset + self.buffer.size() as u64);
202                Ok(())
203            }
204            Ok(Err(e)) => Err(e),
205            Err(_) => {
206                // Channel closed
207                Err(FileErr::TaskDead("FileSource::receive"))
208            }
209        }
210    }
211
212    /// Seek the file stream to a different position.
213    /// SeqNo is regarded as byte offset.
214    /// Returns the file offset after sought.
215    ///
216    /// Warning: This future must not be canceled.
217    pub async fn seek(&mut self, to: SeqPos) -> Result<u64, FileErr> {
218        let (mut file, sender, event, _) = self.end().await;
219        // Seek!
220        self.offset = file.seek(to).await?;
221        self.file_size = std::cmp::max(self.file_size, self.offset);
222        // Spawn new task
223        event.drain();
224        self.handle = Some(Self::spawn_task(file, sender, event));
225        Ok(self.offset)
226    }
227
228    /// Drain all bytes and end the stream
229    pub async fn drain(mut self) -> ByteBuffer {
230        // Prompt it to make a final read
231        self.notify
232            .send(FileEvent::Modify) // unbounded, never blocks
233            .expect("FileSource: task panic");
234        // Stop the task
235        let (_, _, _, mut buffer) = self.end().await;
236        // Drain all bytes in the channel
237        let mut drain = self.receiver.drain();
238        while let Some(Ok(bytes)) = drain.next() {
239            buffer.append(bytes);
240        }
241        buffer
242    }
243
244    /// Stop the existing task and reclaim it's resources.
245    /// Also clears the buffer.
246    pub(crate) async fn end(
247        &mut self,
248    ) -> (
249        AsyncFile,
250        Sender<Result<Bytes, FileErr>>,
251        Receiver<FileEvent>,
252        ByteBuffer,
253    ) {
254        // Create a fresh channel
255        let (sender, receiver) = new_channel();
256        // Drops the old channel; this may stop the task
257        self.receiver = receiver;
258        // Notify the task in case it is sleeping
259        self.notify
260            .send(FileEvent::Rewatch) // unbounded, never blocks
261            .expect("FileSource: task panic");
262        // Wait for task exit
263        let (file, event) = self
264            .handle
265            .take()
266            .expect("This future must not be canceled")
267            .await
268            .expect("FileSource: task panic");
269        // Clear the buffer
270        let buffer = self.buffer.take();
271        // Return
272        (file, sender, event, buffer)
273    }
274}
275
276impl ByteSource for FileSource {
277    type Future<'a> = FileSourceFuture<'a>;
278
279    /// Stream N bytes from file. If there is not enough bytes, it will wait until there are,
280    /// like `tail -f`.
281    ///
282    /// If there are enough bytes in the buffer, it yields immediately.
283    fn request_bytes(&mut self, size: usize) -> Self::Future<'_> {
284        FileSourceFuture {
285            size,
286            offset: &mut self.offset,
287            file_size: &mut self.file_size,
288            buffer: &mut self.buffer,
289            stream: self.receiver.stream(),
290        }
291    }
292}
293
/// The future returned by `FileSource::request_bytes`.
///
/// It borrows the relevant fields of the `FileSource` for `'a` and resolves once
/// `size` bytes can be taken out of the buffer.
pub struct FileSourceFuture<'a> {
    /// Number of bytes requested.
    size: usize,
    /// The source's offset; advanced by `size` when the future resolves successfully.
    offset: &'a mut u64,
    /// The source's known file size; raised as more bytes are received.
    file_size: &'a mut u64,
    /// The source's buffer; incoming chunks are appended until it holds `size` bytes.
    buffer: &'a mut ByteBuffer,
    /// Stream over the channel fed by the background read task.
    stream: RecvStream<'a, Result<Bytes, FileErr>>,
}
301
302/// A hand unrolled version of the following
303/// ```ignore
304/// loop {
305///     if self.buffer.size() >= size {
306///         return Ok(self.buffer.consume(size));
307///     }
308///     self.receive().await?;
309/// }
310/// ```
311impl<'a> Future for FileSourceFuture<'a> {
312    type Output = Result<Bytes, FileErr>;
313
314    fn poll(
315        mut self: std::pin::Pin<&mut Self>,
316        cx: &mut std::task::Context<'_>,
317    ) -> std::task::Poll<Self::Output> {
318        use std::task::Poll::{Pending, Ready};
319
320        loop {
321            if self.buffer.size() >= self.size {
322                let size = self.size;
323                *self.offset += size as u64;
324                return Ready(Ok(self.buffer.consume(size)));
325            }
326
327            match self.stream.poll_next_unpin(cx) {
328                Ready(res) => match res {
329                    Some(Ok(bytes)) => {
330                        self.buffer.append(bytes);
331                        *self.file_size = std::cmp::max(
332                            *self.file_size,
333                            *self.offset + self.buffer.size() as u64,
334                        );
335                    }
336                    Some(Err(e)) => {
337                        return Ready(Err(e));
338                    }
339                    None => {
340                        // Channel closed
341                        return Ready(Err(FileErr::TaskDead("Source FileSourceFuture")));
342                    }
343                },
344                Pending => {
345                    return Pending;
346                }
347            }
348        }
349    }
350}