1use std::{
2 ops::{Deref, DerefMut},
3 path::{Path, PathBuf},
4 sync::Arc,
5};
6
7use bytes::Bytes;
8use flowly_core::{DataFrame, FrameSource, MemBlock};
9use flowly_service::{Context, Service};
10use glob::MatchOptions;
11use tokio::io::AsyncReadExt;
12
13use crate::error::Error;
14
/// Describes where a frame came from on the file system.
///
/// The only piece of state is `path`, which is reported both as the URL and
/// as the name of the source by the `FrameSource` implementation.
///
/// NOTE(review): the type name looks like a misspelling of `FileSource`; it is
/// kept as-is because it is part of the public API.
#[derive(Clone, Debug, Default, Eq, PartialEq)]
pub struct FileSouce {
    /// Path (or glob pattern) this source was created from.
    path: String,
}
19
20impl FrameSource for FileSouce {
21 type Source = flowly_core::Void;
22
23 fn source(&self) -> &Self::Source {
24 unreachable!()
25 }
26
27 fn kind(&self) -> flowly_core::FrameSourceKind {
28 flowly_core::FrameSourceKind::File
29 }
30
31 fn url(&self) -> &str {
32 &self.path
33 }
34
35 fn name(&self) -> &str {
36 &self.path
37 }
38}
39
40#[derive(Debug, Clone, PartialEq, Eq)]
41pub struct WithSource<E> {
42 pub inner: E,
43 pub source: Arc<FileSouce>,
44}
45
46impl<E> WithSource<E> {
47 pub fn new(inner: E, source: Arc<FileSouce>) -> Self {
48 Self { inner, source }
49 }
50}
51
52impl<E> Deref for WithSource<E> {
53 type Target = E;
54
55 fn deref(&self) -> &Self::Target {
56 &self.inner
57 }
58}
59impl<E> DerefMut for WithSource<E> {
60 fn deref_mut(&mut self) -> &mut Self::Target {
61 &mut self.inner
62 }
63}
64
65impl<U: ?Sized, E: AsRef<U>> AsRef<U> for WithSource<E> {
66 fn as_ref(&self) -> &U {
67 self.inner.as_ref()
68 }
69}
70
71impl<E> DataFrame for WithSource<E>
72where
73 E: MemBlock + Clone,
74{
75 type Source = Arc<FileSouce>;
76 type Chunk = E;
77
78 fn source(&self) -> &Self::Source {
79 &self.source
80 }
81
82 fn chunks(&self) -> impl Send + Iterator<Item = <Self::Chunk as MemBlock>::Ref<'_>> {
83 std::iter::once(self.inner.borrow())
84 }
85
86 fn into_chunks(self) -> impl Send + Iterator<Item = Self::Chunk> {
87 std::iter::once(self.inner)
88 }
89}
90
91#[derive(Debug, Clone)]
92pub struct DirReader {
93 pattern: String,
94 options: MatchOptions,
95}
96
97impl DirReader {
98 pub fn new(pattern: String, options: MatchOptions) -> Self {
99 Self { pattern, options }
100 }
101}
102
103impl<P: AsRef<Path> + Send> Service<P> for DirReader {
104 type Out = Result<WithSource<PathBuf>, Error>;
105
106 fn handle(&self, dir: P, cx: &Context) -> impl futures::Stream<Item = Self::Out> + Send {
107 async_stream::stream! {
108 let pattern = format!("{}/{}", dir.as_ref().display().to_string().trim_end_matches('/'), self.pattern);
109 let shared = Arc::new(FileSouce { path: pattern });
110
111 match glob::glob_with(&shared.path, self.options) {
112 Ok(paths) => {
113 for p in paths {
114 match cx.abort_recv.has_changed() {
115 Ok(true) | Err(_) => break,
116 _ => ()
117 }
118
119 yield p.map(|inner| WithSource::new(inner , shared.clone())).map_err(Into::into);
120 }
121 }
122
123 Err(err) => yield Err(err.into()),
124 }
125 }
126 }
127}
128
/// Service that reads a file and emits its content in fixed-size chunks.
#[derive(Clone, Copy, Debug)]
pub struct FileReader {
    /// Size in bytes of the read buffer, i.e. the maximum size of one chunk.
    chunk_size: usize,
}
133
134impl FileReader {
135 pub fn new(chunk_size: usize) -> Self {
136 Self { chunk_size }
137 }
138}
139
140impl Default for FileReader {
141 fn default() -> Self {
142 Self { chunk_size: 8192 }
143 }
144}
145
146impl<P: AsRef<Path> + Send + Sync> Service<P> for FileReader {
147 type Out = Result<WithSource<Bytes>, Error>;
148
149 fn handle(&self, path: P, cx: &Context) -> impl futures::Stream<Item = Self::Out> + Send {
150 async_stream::stream! {
151 let mut buf = vec![0u8; self.chunk_size];
152
153 match tokio::fs::File::open(&path).await {
154 Ok(mut file) => {
155 let shared = Arc::new(FileSouce { path: path.as_ref().display().to_string() });
156
157 loop {
158 match cx.abort_recv.has_changed() {
159 Ok(true) | Err(_) => break,
160 _ => ()
161 }
162
163 yield match file.read(&mut buf[..]).await {
164 Ok(0) => break,
165 Ok(n) => Ok(WithSource::new(buf[0..n].to_vec().into(), shared.clone())),
166 Err(err) => Err(err.into())
167 };
168 }
169 },
170 Err(err) => yield Err(err.into()),
171 }
172 }
173 }
174}