sea_streamer_file/
sink.rs1use flume::{bounded, unbounded, Receiver, Sender, TryRecvError};
2
3use crate::{
4 watcher::{new_watcher, FileEvent, Watcher},
5 AsyncFile, Bytes, FileErr,
6};
7use sea_streamer_runtime::{spawn_task, TaskHandle};
8
9pub trait ByteSink {
10 fn write(&mut self, bytes: Bytes) -> Result<(), FileErr>;
12}
13
14const CHUNK_SIZE: usize = 1024 * 1024; pub struct FileSink {
20 watcher: Option<Watcher>,
21 sender: Sender<Request>,
22 update: Receiver<Update>,
23 handle: TaskHandle<AsyncFile>,
24}
25
26pub struct FileSinkWriter {
28 sender: Sender<Request>,
29}
30
31#[derive(Debug)]
32enum Request {
33 Bytes(Bytes),
34 Flush(u32),
35 SyncAll,
36 End,
37}
38
39#[derive(Debug)]
40enum Update {
41 FileErr(FileErr),
42 Receipt(u32),
43}
44
45#[derive(Debug)]
46struct QuotaFull;
47
48impl FileSink {
49 pub fn new(mut file: AsyncFile, mut quota: u64) -> Result<Self, FileErr> {
50 let (sender, pending) = unbounded();
51 let (notify, update) = bounded(0);
52 let (watch, event) = unbounded();
53 let watcher = new_watcher(file.id(), watch)?;
54 quota -= file.size();
55
56 let handle = spawn_task(async move {
57 let mut buffer = Vec::new();
58
59 'outer: loop {
60 let mut request: Result<Request, TryRecvError> = match pending.recv_async().await {
61 Ok(request) => Ok(request),
62 Err(_) => break,
63 };
64
65 let request: Result<Option<Request>, Result<(), QuotaFull>> = loop {
66 match request {
67 Ok(Request::Bytes(mut bytes)) => {
68 let mut len = bytes.len() as u64;
69 if quota < len {
70 bytes = bytes.pop(quota as usize);
71 len = quota;
72 }
73 buffer.append(&mut bytes.bytes());
75
76 quota -= len;
77 if quota == 0 {
78 break Err(Err(QuotaFull));
79 }
80 if buffer.len() >= CHUNK_SIZE {
81 break (Ok(None));
82 }
83 }
85 Ok(request) => break Ok(Some(request)),
86 Err(TryRecvError::Disconnected) => break Err(Ok(())),
87 Err(TryRecvError::Empty) => break Ok(None),
88 }
89 request = pending.try_recv();
90 };
91
92 if !buffer.is_empty() {
93 if let Err(err) = file.write_all(&buffer).await {
95 std::mem::drop(pending); send_error(¬ify, err).await;
97 break 'outer;
98 }
99 buffer.truncate(0);
100 }
101
102 if let Err(Ok(())) = request {
103 break 'outer;
104 } else if let Err(Err(QuotaFull)) = request {
105 std::mem::drop(pending); send_error(¬ify, FileErr::FileLimitExceeded).await;
107 break 'outer;
108 }
109
110 match request.unwrap() {
111 Some(Request::Flush(marker)) => {
112 if let Err(err) = file.flush().await {
113 std::mem::drop(pending); send_error(¬ify, err).await;
115 break 'outer;
116 }
117 if notify.send_async(Update::Receipt(marker)).await.is_err() {
118 break 'outer;
119 }
120 }
121 request @ Some(Request::SyncAll | Request::End) => {
122 if let Err(err) = file.sync_all().await {
123 std::mem::drop(pending); send_error(¬ify, err).await;
125 break 'outer;
126 }
127 match request {
128 Some(Request::SyncAll) => {
129 if notify.send_async(Update::Receipt(u32::MAX)).await.is_err() {
130 break 'outer;
131 }
132 }
133 Some(Request::End) => {
134 break 'outer;
135 }
136 _ => unreachable!(),
137 }
138 }
139 Some(_) => {
140 unreachable!();
141 }
142 None => (),
143 }
144
145 loop {
146 match event.try_recv() {
147 Ok(FileEvent::Modify) => {}
148 Ok(FileEvent::Remove) => {
149 std::mem::drop(pending); send_error(¬ify, FileErr::FileRemoved).await;
151 break 'outer;
152 }
153 Ok(FileEvent::Error(e)) => {
154 std::mem::drop(pending); send_error(¬ify, FileErr::WatchError(e)).await;
156 break 'outer;
157 }
158 Err(TryRecvError::Disconnected) => {
159 break 'outer;
160 }
161 Ok(FileEvent::Rewatch) => {
162 log::warn!("Why are we receiving this?");
163 break 'outer;
164 }
165 Err(TryRecvError::Empty) => break,
166 }
167 }
168 }
169
170 log::debug!("FileSink task finish ({})", file.id().path());
171 file
172 });
173
174 async fn send_error(notify: &Sender<Update>, e: FileErr) {
175 if let Err(e) = notify.send_async(Update::FileErr(e)).await {
176 log::error!("{:?}", e.into_inner());
177 }
178 }
179
180 Ok(Self {
181 watcher: Some(watcher),
182 sender,
183 update,
184 handle,
185 })
186 }
187
188 fn return_err(&mut self) -> Result<(), FileErr> {
189 if self.watcher.is_some() {
190 self.watcher.take();
192 }
193
194 Err(loop {
195 match self.update.try_recv() {
196 Ok(Update::FileErr(err)) => break err,
197 Ok(_) => (),
198 Err(err) => {
199 panic!("The task should always wait until the error has been sent: {err}")
200 }
201 }
202 })
203 }
204
205 pub async fn flush(&mut self, marker: u32) -> Result<(), FileErr> {
206 if self.sender.send(Request::Flush(marker)).is_err() {
207 self.return_err()
208 } else {
209 match self.update.recv_async().await {
210 Ok(Update::Receipt(receipt)) => {
211 assert_eq!(receipt, marker);
212 Ok(())
213 }
214 Ok(Update::FileErr(err)) => Err(err),
215 Err(_) => Err(FileErr::TaskDead("FileSink::flush")),
216 }
217 }
218 }
219
220 pub async fn sync_all(&mut self) -> Result<(), FileErr> {
221 if self.sender.send(Request::SyncAll).is_err() {
222 self.return_err()
223 } else {
224 loop {
225 match self.update.recv_async().await {
226 Ok(Update::Receipt(u32::MAX)) => return Ok(()),
227 Ok(Update::Receipt(_)) => (),
228 Ok(Update::FileErr(err)) => return Err(err),
229 Err(_) => return Err(FileErr::TaskDead("FileSink::sync_all")),
230 }
231 }
232 }
233 }
234
235 pub async fn end(mut self) -> Result<AsyncFile, FileErr> {
236 if self.sender.send(Request::End).is_err() {
237 Err(self.return_err().err().unwrap())
238 } else {
239 self.handle
240 .await
241 .map_err(|_| FileErr::TaskDead("FileSink::end"))
242 }
243 }
244
245 pub fn as_writer(&mut self) -> FileSinkWriter {
246 FileSinkWriter {
247 sender: self.sender.clone(),
248 }
249 }
250}
251
252impl ByteSink for FileSink {
253 fn write(&mut self, bytes: Bytes) -> Result<(), FileErr> {
255 if self.sender.send(Request::Bytes(bytes)).is_err() {
256 self.return_err()
257 } else {
258 Ok(())
259 }
260 }
261}
262
263impl FileSinkWriter {
264 pub fn end(self) -> std::io::Result<()> {
265 Ok(())
266 }
267}
268
269impl std::io::Write for FileSinkWriter {
270 fn write(&mut self, buf: &[u8]) -> std::io::Result<usize> {
273 let len = buf.len();
274 use std::io::{Error, ErrorKind};
275 if self
276 .sender
277 .send(Request::Bytes(Bytes::from_slice(buf)))
278 .is_err()
279 {
280 Err(Error::new(ErrorKind::BrokenPipe, "Failed to write"))
281 } else {
282 Ok(len)
283 }
284 }
285
286 fn flush(&mut self) -> std::io::Result<()> {
288 Ok(())
289 }
290}