sea_streamer_file/
sink.rs

1use flume::{bounded, unbounded, Receiver, Sender, TryRecvError};
2
3use crate::{
4    watcher::{new_watcher, FileEvent, Watcher},
5    AsyncFile, Bytes, FileErr,
6};
7use sea_streamer_runtime::{spawn_task, TaskHandle};
8
/// A destination that accepts chunks of bytes.
pub trait ByteSink {
    /// Hands `bytes` over to the sink.
    ///
    /// This should never block.
    ///
    /// # Errors
    ///
    /// Implementations return a [`FileErr`] when the bytes cannot be accepted.
    fn write(&mut self, bytes: Bytes) -> Result<(), FileErr>;
}
13
/// Threshold (1 MiB) at which the writer task stops accumulating queued bytes and writes them out.
const CHUNK_SIZE: usize = 1 << 20;
15
/// Buffered file writer.
///
/// Writes are queued and carried out by a background task. The task accumulates
/// consecutive writes and writes them to the file in one go.
///
/// If the file is removed from the file system, the stream ends: the sink stops and
/// reports [`FileErr::FileRemoved`].
pub struct FileSink {
    /// Watches the file; the background task consumes its events.
    /// Taken (dropped) on the error path so it doesn't leak.
    watcher: Option<Watcher>,
    /// Sends requests to the background task.
    sender: Sender<Request>,
    /// Receives receipts and errors from the background task.
    update: Receiver<Update>,
    /// Handle of the background task, which yields the file back when it finishes.
    handle: TaskHandle<AsyncFile>,
}
25
/// A delegate that implements [`std::io::Write`] by queueing bytes into the request
/// queue of the [`FileSink`] it was created from.
pub struct FileSinkWriter {
    /// A clone of the owning sink's request sender.
    sender: Sender<Request>,
}
30
/// A request sent to the background writer task.
#[derive(Debug)]
enum Request {
    /// Bytes to append to the file; consecutive `Bytes` requests are accumulated before writing.
    Bytes(Bytes),
    /// Write out the accumulated bytes and flush the file, then reply with `Update::Receipt(marker)`.
    Flush(u32),
    /// Write out the accumulated bytes and `sync_all` the file, then reply with `Update::Receipt(u32::MAX)`.
    SyncAll,
    /// Write out the accumulated bytes, `sync_all` the file, and stop the task.
    End,
}
38
/// A message sent from the background writer task back to the sink.
#[derive(Debug)]
enum Update {
    /// The error that stopped the task; the task exits right after sending it.
    FileErr(FileErr),
    /// Acknowledges a `Flush` (carrying its marker) or a `SyncAll` (carrying `u32::MAX`).
    Receipt(u32),
}
44
/// Signal used inside the writer task when the byte quota has been used up.
#[derive(Debug)]
struct QuotaFull;
47
48impl FileSink {
49    pub fn new(mut file: AsyncFile, mut quota: u64) -> Result<Self, FileErr> {
50        let (sender, pending) = unbounded();
51        let (notify, update) = bounded(0);
52        let (watch, event) = unbounded();
53        let watcher = new_watcher(file.id(), watch)?;
54        quota -= file.size();
55
56        let handle = spawn_task(async move {
57            let mut buffer = Vec::new();
58
59            'outer: loop {
60                let mut request: Result<Request, TryRecvError> = match pending.recv_async().await {
61                    Ok(request) => Ok(request),
62                    Err(_) => break,
63                };
64
65                let request: Result<Option<Request>, Result<(), QuotaFull>> = loop {
66                    match request {
67                        Ok(Request::Bytes(mut bytes)) => {
68                            let mut len = bytes.len() as u64;
69                            if quota < len {
70                                bytes = bytes.pop(quota as usize);
71                                len = quota;
72                            }
73                            // accumulate bytes ...
74                            buffer.append(&mut bytes.bytes());
75
76                            quota -= len;
77                            if quota == 0 {
78                                break Err(Err(QuotaFull));
79                            }
80                            if buffer.len() >= CHUNK_SIZE {
81                                break (Ok(None));
82                            }
83                            // continue; delay write until 1) some other request 2) some error 3) queue is empty
84                        }
85                        Ok(request) => break Ok(Some(request)),
86                        Err(TryRecvError::Disconnected) => break Err(Ok(())),
87                        Err(TryRecvError::Empty) => break Ok(None),
88                    }
89                    request = pending.try_recv();
90                };
91
92                if !buffer.is_empty() {
93                    // ... write all in one shot
94                    if let Err(err) = file.write_all(&buffer).await {
95                        std::mem::drop(pending); // trigger error
96                        send_error(&notify, err).await;
97                        break 'outer;
98                    }
99                    buffer.truncate(0);
100                }
101
102                if let Err(Ok(())) = request {
103                    break 'outer;
104                } else if let Err(Err(QuotaFull)) = request {
105                    std::mem::drop(pending); // trigger error
106                    send_error(&notify, FileErr::FileLimitExceeded).await;
107                    break 'outer;
108                }
109
110                match request.unwrap() {
111                    Some(Request::Flush(marker)) => {
112                        if let Err(err) = file.flush().await {
113                            std::mem::drop(pending); // trigger error
114                            send_error(&notify, err).await;
115                            break 'outer;
116                        }
117                        if notify.send_async(Update::Receipt(marker)).await.is_err() {
118                            break 'outer;
119                        }
120                    }
121                    request @ Some(Request::SyncAll | Request::End) => {
122                        if let Err(err) = file.sync_all().await {
123                            std::mem::drop(pending); // trigger error
124                            send_error(&notify, err).await;
125                            break 'outer;
126                        }
127                        match request {
128                            Some(Request::SyncAll) => {
129                                if notify.send_async(Update::Receipt(u32::MAX)).await.is_err() {
130                                    break 'outer;
131                                }
132                            }
133                            Some(Request::End) => {
134                                break 'outer;
135                            }
136                            _ => unreachable!(),
137                        }
138                    }
139                    Some(_) => {
140                        unreachable!();
141                    }
142                    None => (),
143                }
144
145                loop {
146                    match event.try_recv() {
147                        Ok(FileEvent::Modify) => {}
148                        Ok(FileEvent::Remove) => {
149                            std::mem::drop(pending); // trigger error
150                            send_error(&notify, FileErr::FileRemoved).await;
151                            break 'outer;
152                        }
153                        Ok(FileEvent::Error(e)) => {
154                            std::mem::drop(pending); // trigger error
155                            send_error(&notify, FileErr::WatchError(e)).await;
156                            break 'outer;
157                        }
158                        Err(TryRecvError::Disconnected) => {
159                            break 'outer;
160                        }
161                        Ok(FileEvent::Rewatch) => {
162                            log::warn!("Why are we receiving this?");
163                            break 'outer;
164                        }
165                        Err(TryRecvError::Empty) => break,
166                    }
167                }
168            }
169
170            log::debug!("FileSink task finish ({})", file.id().path());
171            file
172        });
173
174        async fn send_error(notify: &Sender<Update>, e: FileErr) {
175            if let Err(e) = notify.send_async(Update::FileErr(e)).await {
176                log::error!("{:?}", e.into_inner());
177            }
178        }
179
180        Ok(Self {
181            watcher: Some(watcher),
182            sender,
183            update,
184            handle,
185        })
186    }
187
188    fn return_err(&mut self) -> Result<(), FileErr> {
189        if self.watcher.is_some() {
190            // kill the watcher so we don't leak
191            self.watcher.take();
192        }
193
194        Err(loop {
195            match self.update.try_recv() {
196                Ok(Update::FileErr(err)) => break err,
197                Ok(_) => (),
198                Err(err) => {
199                    panic!("The task should always wait until the error has been sent: {err}")
200                }
201            }
202        })
203    }
204
205    pub async fn flush(&mut self, marker: u32) -> Result<(), FileErr> {
206        if self.sender.send(Request::Flush(marker)).is_err() {
207            self.return_err()
208        } else {
209            match self.update.recv_async().await {
210                Ok(Update::Receipt(receipt)) => {
211                    assert_eq!(receipt, marker);
212                    Ok(())
213                }
214                Ok(Update::FileErr(err)) => Err(err),
215                Err(_) => Err(FileErr::TaskDead("FileSink::flush")),
216            }
217        }
218    }
219
220    pub async fn sync_all(&mut self) -> Result<(), FileErr> {
221        if self.sender.send(Request::SyncAll).is_err() {
222            self.return_err()
223        } else {
224            loop {
225                match self.update.recv_async().await {
226                    Ok(Update::Receipt(u32::MAX)) => return Ok(()),
227                    Ok(Update::Receipt(_)) => (),
228                    Ok(Update::FileErr(err)) => return Err(err),
229                    Err(_) => return Err(FileErr::TaskDead("FileSink::sync_all")),
230                }
231            }
232        }
233    }
234
235    pub async fn end(mut self) -> Result<AsyncFile, FileErr> {
236        if self.sender.send(Request::End).is_err() {
237            Err(self.return_err().err().unwrap())
238        } else {
239            self.handle
240                .await
241                .map_err(|_| FileErr::TaskDead("FileSink::end"))
242        }
243    }
244
245    pub fn as_writer(&mut self) -> FileSinkWriter {
246        FileSinkWriter {
247            sender: self.sender.clone(),
248        }
249    }
250}
251
252impl ByteSink for FileSink {
253    /// This method never blocks
254    fn write(&mut self, bytes: Bytes) -> Result<(), FileErr> {
255        if self.sender.send(Request::Bytes(bytes)).is_err() {
256            self.return_err()
257        } else {
258            Ok(())
259        }
260    }
261}
262
263impl FileSinkWriter {
264    pub fn end(self) -> std::io::Result<()> {
265        Ok(())
266    }
267}
268
269impl std::io::Write for FileSinkWriter {
270    /// Nothing has actually been written, because it will be sent to another async thread and queued.
271    /// Call [`FileSink::flush`] afterwards.
272    fn write(&mut self, buf: &[u8]) -> std::io::Result<usize> {
273        let len = buf.len();
274        use std::io::{Error, ErrorKind};
275        if self
276            .sender
277            .send(Request::Bytes(Bytes::from_slice(buf)))
278            .is_err()
279        {
280            Err(Error::new(ErrorKind::BrokenPipe, "Failed to write"))
281        } else {
282            Ok(len)
283        }
284    }
285
286    /// This has no effect. Please call the async [`FileSink::flush`] and await it.
287    fn flush(&mut self) -> std::io::Result<()> {
288        Ok(())
289    }
290}