flowly_io/
file.rs

1use std::{
2    ops::{Deref, DerefMut},
3    path::{Path, PathBuf},
4    sync::Arc,
5};
6
7use bytes::Bytes;
8use flowly_core::{DataFrame, FrameSource, MemBlock};
9use flowly_service::{Context, Service};
10use glob::MatchOptions;
11use tokio::io::AsyncReadExt;
12
13use crate::error::Error;
14
/// Describes where a frame came from on the local filesystem.
///
/// `path` holds either a concrete file path (set by [`FileReader`]) or the
/// glob pattern that produced the item (set by [`DirReader`]). The
/// [`FrameSource`] impl reports it verbatim from both `url()` and `name()`.
///
/// The type name is misspelled but kept for backward compatibility. New code
/// can use the correctly spelled [`FileSource`] alias instead.
#[derive(Debug, Default, Clone, PartialEq, Eq)]
pub struct FileSouce {
    path: String,
}

/// Correctly spelled alias for [`FileSouce`].
pub type FileSource = FileSouce;
19
20impl FrameSource for FileSouce {
21    type Source = flowly_core::Void;
22
23    fn source(&self) -> &Self::Source {
24        unreachable!()
25    }
26
27    fn kind(&self) -> flowly_core::FrameSourceKind {
28        flowly_core::FrameSourceKind::File
29    }
30
31    fn url(&self) -> &str {
32        &self.path
33    }
34
35    fn name(&self) -> &str {
36        &self.path
37    }
38}
39
40#[derive(Debug, Clone, PartialEq, Eq)]
41pub struct WithSource<E> {
42    pub inner: E,
43    pub source: Arc<FileSouce>,
44}
45
46impl<E> WithSource<E> {
47    pub fn new(inner: E, source: Arc<FileSouce>) -> Self {
48        Self { inner, source }
49    }
50}
51
52impl<E> Deref for WithSource<E> {
53    type Target = E;
54
55    fn deref(&self) -> &Self::Target {
56        &self.inner
57    }
58}
59impl<E> DerefMut for WithSource<E> {
60    fn deref_mut(&mut self) -> &mut Self::Target {
61        &mut self.inner
62    }
63}
64
65impl<U: ?Sized, E: AsRef<U>> AsRef<U> for WithSource<E> {
66    fn as_ref(&self) -> &U {
67        self.inner.as_ref()
68    }
69}
70
71impl<E> DataFrame for WithSource<E>
72where
73    E: MemBlock + Clone,
74{
75    type Source = Arc<FileSouce>;
76    type Chunk = E;
77
78    fn source(&self) -> &Self::Source {
79        &self.source
80    }
81
82    fn chunks(&self) -> impl Send + Iterator<Item = <Self::Chunk as MemBlock>::Ref<'_>> {
83        std::iter::once(self.inner.borrow())
84    }
85
86    fn into_chunks(self) -> impl Send + Iterator<Item = Self::Chunk> {
87        std::iter::once(self.inner)
88    }
89}
90
91#[derive(Debug, Clone)]
92pub struct DirReader {
93    pattern: String,
94    options: MatchOptions,
95}
96
97impl DirReader {
98    pub fn new(pattern: String, options: MatchOptions) -> Self {
99        Self { pattern, options }
100    }
101}
102
103impl<P: AsRef<Path> + Send> Service<P> for DirReader {
104    type Out = Result<WithSource<PathBuf>, Error>;
105
106    fn handle(&mut self, dir: P, cx: &Context) -> impl futures::Stream<Item = Self::Out> + Send {
107        async_stream::stream! {
108            let pattern = format!("{}/{}", dir.as_ref().display().to_string().trim_end_matches('/'), self.pattern);
109            let shared = Arc::new(FileSouce { path: pattern });
110
111            match glob::glob_with(&shared.path, self.options) {
112                Ok(paths) => {
113                    for p in paths {
114                        match cx.abort_recv.has_changed() {
115                            Ok(true) | Err(_) => break,
116                            _ => ()
117                        }
118
119                        yield p.map(|inner| WithSource::new(inner , shared.clone())).map_err(Into::into);
120                    }
121                }
122
123                Err(err) => yield Err(err.into()),
124            }
125        }
126    }
127}
128
/// Service that streams a file's contents as byte chunks.
#[derive(Clone, Copy, Debug)]
pub struct FileReader {
    /// Requested size, in bytes, of each read from the file.
    chunk_size: usize,
}

impl FileReader {
    /// Creates a reader that reads the file `chunk_size` bytes at a time.
    pub fn new(chunk_size: usize) -> Self {
        FileReader { chunk_size }
    }
}

/// Defaults to 8 KiB reads.
impl Default for FileReader {
    fn default() -> Self {
        Self::new(8192)
    }
}
145
146impl<P: AsRef<Path> + Send + Sync> Service<P> for FileReader {
147    type Out = Result<WithSource<Bytes>, Error>;
148
149    fn handle(&mut self, path: P, cx: &Context) -> impl futures::Stream<Item = Self::Out> + Send {
150        async_stream::stream! {
151            let mut buf = vec![0u8; self.chunk_size];
152
153            match tokio::fs::File::open(&path).await {
154                Ok(mut file) => {
155                    let shared = Arc::new(FileSouce { path: path.as_ref().display().to_string() });
156
157                    loop {
158                        match cx.abort_recv.has_changed() {
159                            Ok(true) | Err(_) => break,
160                            _ => ()
161                        }
162
163                        yield match file.read(&mut buf[..]).await {
164                            Ok(0) => break,
165                            Ok(n) => Ok(WithSource::new(buf[0..n].to_vec().into(), shared.clone())),
166                            Err(err) => Err(err.into())
167                        };
168                    }
169                },
170                Err(err) => yield Err(err.into()),
171            }
172        }
173    }
174}