1use std::time::Duration;
2
3use flume::{bounded, r#async::RecvStream, unbounded, Receiver, Sender, TryRecvError};
4use sea_streamer_types::{
5 export::futures::{Future, StreamExt},
6 SeqPos,
7};
8
9use crate::{
10 watcher::{new_watcher, FileEvent, Watcher},
11 AsyncFile, ByteBuffer, Bytes, FileErr, FileId, ReadFrom,
12};
13use sea_streamer_runtime::{spawn_task, timeout, TaskHandle};
14
/// A source of bytes that can be asked, asynchronously, for a given number of bytes.
pub trait ByteSource {
    /// The future returned by `request_bytes`.
    ///
    /// It may borrow from the source for `'a` and resolves to either the bytes
    /// or a [`FileErr`].
    type Future<'a>: Future<Output = Result<Bytes, FileErr>>
    where
        Self: 'a;

    /// Request `size` bytes from this source.
    ///
    /// The returned future holds the mutable borrow of `self` for as long as it lives.
    // The lifetime is spelled out explicitly, hence the lint is silenced.
    #[allow(clippy::needless_lifetimes)]
    fn request_bytes<'a>(&'a mut self, size: usize) -> Self::Future<'a>;
}
23
/// Reads a file through a background task and exposes its content as a stream of bytes.
///
/// The task reads the file and pushes the chunks into a channel; when a read comes back
/// empty it waits for watcher events (or a timeout) before reading again, so the source
/// keeps producing bytes if the file grows.
pub struct FileSource {
    /// Never read; stored so that the watcher created in `new_with` lives as long as the
    /// source (presumably dropping it stops the file watch — TODO confirm).
    #[allow(dead_code)]
    watcher: Watcher,
    /// Receiving end of the channel on which the background task delivers chunks or an error.
    receiver: Receiver<Result<Bytes, FileErr>>,
    /// Handle of the background task; it resolves to the file and the event receiver once
    /// the task finishes. It is `None` after `end` has taken it and until a new task is spawned.
    handle: Option<TaskHandle<(AsyncFile, Receiver<FileEvent>)>>,
    /// Sending end of the event channel. A clone is given to the watcher; the source itself
    /// uses it to signal the background task.
    notify: Sender<FileEvent>,
    /// File offset of the next byte that will be handed out to the caller.
    offset: u64,
    /// Largest file size known so far.
    file_size: u64,
    /// Bytes received from the background task but not yet handed out.
    buffer: ByteBuffer,
}
41
42fn new_channel<T>() -> (Sender<T>, Receiver<T>) {
43 bounded(1024)
45}
46
47impl FileSource {
48 pub async fn new(file_id: FileId, read_from: ReadFrom) -> Result<Self, FileErr> {
49 let mut file = AsyncFile::new_r(file_id).await?;
50 let offset = if matches!(read_from, ReadFrom::End) {
51 file.seek(SeqPos::End).await?
52 } else {
53 0
54 };
55 let buffer = ByteBuffer::new();
56 Self::new_with(file, offset, buffer)
57 }
58
59 pub(crate) fn new_with(
60 file: AsyncFile,
61 offset: u64,
62 buffer: ByteBuffer,
63 ) -> Result<Self, FileErr> {
64 let (sender, receiver) = new_channel();
65 let (notify, event) = unbounded();
66 let watcher = new_watcher(file.id(), notify.clone())?;
67 let file_size = file.size();
68
69 let handle = Self::spawn_task(file, sender, event);
70
71 Ok(Self {
72 watcher,
73 receiver,
74 handle: Some(handle),
75 notify,
76 offset,
77 file_size,
78 buffer,
79 })
80 }
81
/// File offset of the next byte that will be handed out, i.e. how far the caller has
/// consumed the file.
#[inline]
pub fn offset(&self) -> u64 {
    self.offset
}
87
/// Largest file size known so far.
///
/// This is only updated when bytes are received or after a seek, so the file on disk
/// may already be larger.
#[inline]
pub fn file_size(&self) -> u64 {
    self.file_size
}
94
95 fn spawn_task(
96 mut file: AsyncFile,
97 sender: Sender<Result<Bytes, FileErr>>,
98 events: Receiver<FileEvent>,
99 ) -> TaskHandle<(AsyncFile, Receiver<FileEvent>)> {
100 spawn_task(async move {
101 let mut wait = 0;
102 'outer: while !sender.is_disconnected() {
103 let bytes = match file.read().await {
104 Ok(bytes) => bytes,
105 Err(err) => {
106 send_error(&sender, err).await;
107 break;
108 }
109 };
110 if !bytes.is_empty() {
111 wait = 0;
112 if sender.send_async(Ok(bytes)).await.is_err() {
113 break;
114 }
115 } else {
116 loop {
118 match events.try_recv() {
119 Ok(FileEvent::Modify) => {}
120 Ok(FileEvent::Remove) => {
121 send_error(&sender, FileErr::FileRemoved).await;
122 break 'outer;
123 }
124 Ok(FileEvent::Error(e)) => {
125 send_error(&sender, FileErr::WatchError(e)).await;
126 break 'outer;
127 }
128 Ok(FileEvent::Rewatch) => {
129 log::debug!("FileSource: Rewatch");
130 break 'outer;
131 }
132 Err(TryRecvError::Disconnected) => {
133 break 'outer;
134 }
135 Err(TryRecvError::Empty) => break,
136 }
137 }
138 let result = timeout(Duration::from_millis(wait), events.recv_async()).await;
140 match result {
141 Ok(event) => match event {
142 Ok(FileEvent::Modify) => {
143 }
145 Ok(FileEvent::Remove) => {
146 send_error(&sender, FileErr::FileRemoved).await;
147 break 'outer;
148 }
149 Ok(FileEvent::Error(e)) => {
150 send_error(&sender, FileErr::WatchError(e)).await;
151 break 'outer;
152 }
153 Ok(FileEvent::Rewatch) => {
154 log::debug!("FileSource: Rewatch");
155 break 'outer;
156 }
157 Err(_) => {
158 break 'outer;
159 }
160 },
161 Err(_) => {
162 wait = std::cmp::min(1.max(wait * 2), 1024);
165 }
166 }
167 }
168 }
169 log::debug!("FileSource task finish ({})", file.id().path());
170
171 async fn send_error(sender: &Sender<Result<Bytes, FileErr>>, e: FileErr) {
172 if let Err(e) = sender.send_async(Err(e)).await {
173 log::error!("{}", e.into_inner().err().unwrap());
174 }
175 }
176
177 (file, events)
178 })
179 }
180
181 pub async fn stream_bytes(&mut self) -> Result<Bytes, FileErr> {
186 loop {
187 let size = self.buffer.size();
188 if size > 0 {
189 self.offset += size as u64;
190 return Ok(self.buffer.consume(size));
191 }
192 self.receive().await?;
193 }
194 }
195
196 async fn receive(&mut self) -> Result<(), FileErr> {
197 match self.receiver.recv_async().await {
198 Ok(Ok(bytes)) => {
199 self.buffer.append(bytes);
200 self.file_size =
201 std::cmp::max(self.file_size, self.offset + self.buffer.size() as u64);
202 Ok(())
203 }
204 Ok(Err(e)) => Err(e),
205 Err(_) => {
206 Err(FileErr::TaskDead("FileSource::receive"))
208 }
209 }
210 }
211
212 pub async fn seek(&mut self, to: SeqPos) -> Result<u64, FileErr> {
218 let (mut file, sender, event, _) = self.end().await;
219 self.offset = file.seek(to).await?;
221 self.file_size = std::cmp::max(self.file_size, self.offset);
222 event.drain();
224 self.handle = Some(Self::spawn_task(file, sender, event));
225 Ok(self.offset)
226 }
227
228 pub async fn drain(mut self) -> ByteBuffer {
230 self.notify
232 .send(FileEvent::Modify) .expect("FileSource: task panic");
234 let (_, _, _, mut buffer) = self.end().await;
236 let mut drain = self.receiver.drain();
238 while let Some(Ok(bytes)) = drain.next() {
239 buffer.append(bytes);
240 }
241 buffer
242 }
243
244 pub(crate) async fn end(
247 &mut self,
248 ) -> (
249 AsyncFile,
250 Sender<Result<Bytes, FileErr>>,
251 Receiver<FileEvent>,
252 ByteBuffer,
253 ) {
254 let (sender, receiver) = new_channel();
256 self.receiver = receiver;
258 self.notify
260 .send(FileEvent::Rewatch) .expect("FileSource: task panic");
262 let (file, event) = self
264 .handle
265 .take()
266 .expect("This future must not be canceled")
267 .await
268 .expect("FileSource: task panic");
269 let buffer = self.buffer.take();
271 (file, sender, event, buffer)
273 }
274}
275
276impl ByteSource for FileSource {
277 type Future<'a> = FileSourceFuture<'a>;
278
279 fn request_bytes(&mut self, size: usize) -> Self::Future<'_> {
284 FileSourceFuture {
285 size,
286 offset: &mut self.offset,
287 file_size: &mut self.file_size,
288 buffer: &mut self.buffer,
289 stream: self.receiver.stream(),
290 }
291 }
292}
293
/// Future returned by `FileSource::request_bytes`.
///
/// It borrows the pieces of the `FileSource` it needs to update while waiting for bytes.
pub struct FileSourceFuture<'a> {
    /// Number of bytes requested.
    size: usize,
    /// The source's offset; advanced by `size` when the future resolves successfully.
    offset: &'a mut u64,
    /// The source's known file size; raised as more bytes are received.
    file_size: &'a mut u64,
    /// The source's buffer; received bytes are appended here until `size` bytes are available.
    buffer: &'a mut ByteBuffer,
    /// Stream over the channel on which the background task delivers chunks or an error.
    stream: RecvStream<'a, Result<Bytes, FileErr>>,
}
301
302impl<'a> Future for FileSourceFuture<'a> {
312 type Output = Result<Bytes, FileErr>;
313
314 fn poll(
315 mut self: std::pin::Pin<&mut Self>,
316 cx: &mut std::task::Context<'_>,
317 ) -> std::task::Poll<Self::Output> {
318 use std::task::Poll::{Pending, Ready};
319
320 loop {
321 if self.buffer.size() >= self.size {
322 let size = self.size;
323 *self.offset += size as u64;
324 return Ready(Ok(self.buffer.consume(size)));
325 }
326
327 match self.stream.poll_next_unpin(cx) {
328 Ready(res) => match res {
329 Some(Ok(bytes)) => {
330 self.buffer.append(bytes);
331 *self.file_size = std::cmp::max(
332 *self.file_size,
333 *self.offset + self.buffer.size() as u64,
334 );
335 }
336 Some(Err(e)) => {
337 return Ready(Err(e));
338 }
339 None => {
340 return Ready(Err(FileErr::TaskDead("Source FileSourceFuture")));
342 }
343 },
344 Pending => {
345 return Pending;
346 }
347 }
348 }
349 }
350}