flowly_io/
file.rs

1use std::{
2    ops::{Deref, DerefMut},
3    path::{Path, PathBuf},
4    pin::pin,
5    sync::Arc,
6};
7
8use bytes::Bytes;
9use flowly_service::Service;
10use futures::StreamExt;
11use glob::MatchOptions;
12use tokio::io::AsyncReadExt;
13
14use crate::error::Error;
15
16#[derive(Debug, Clone, PartialEq, Eq)]
17pub struct Spread<E, S> {
18    pub inner: E,
19    pub shared: Arc<S>,
20}
21
22impl<E, S> Spread<E, S> {
23    pub fn new(inner: E, shared: Arc<S>) -> Self {
24        Self { inner, shared }
25    }
26}
27
28impl<E, S> Deref for Spread<E, S> {
29    type Target = E;
30
31    fn deref(&self) -> &Self::Target {
32        &self.inner
33    }
34}
35impl<E, S> DerefMut for Spread<E, S> {
36    fn deref_mut(&mut self) -> &mut Self::Target {
37        &mut self.inner
38    }
39}
40
41impl<U: ?Sized, E: AsRef<U>, S> AsRef<U> for Spread<E, S> {
42    fn as_ref(&self) -> &U {
43        self.inner.as_ref()
44    }
45}
46
47#[derive(Debug, Clone)]
48pub struct DirReader {
49    pattern: String,
50    options: MatchOptions,
51}
52
53impl DirReader {
54    pub fn new(pattern: String, options: MatchOptions) -> Self {
55        Self { pattern, options }
56    }
57}
58
59impl<P, E> Service<Result<P, E>> for DirReader
60where
61    P: AsRef<Path> + Send + Sync,
62    E: std::error::Error + Send + Sync + 'static,
63{
64    type Out = Result<Spread<PathBuf, P>, Error<E>>;
65
66    fn handle(
67        self,
68        input: impl futures::Stream<Item = Result<P, E>> + Send,
69    ) -> impl futures::Stream<Item = Self::Out> + Send {
70        async_stream::stream! {
71            let mut stream = pin!(input);
72
73            while let Some(res) = stream.next().await {
74                match res {
75                    Ok(dir) => {
76                        let pattern = format!("{}/{}", dir.as_ref().display().to_string().trim_end_matches('/'), self.pattern);
77                        let shared = Arc::new(dir);
78
79                        match glob::glob_with(&pattern, self.options) {
80                            Ok(paths) => {
81                                for p in paths {
82                                    yield p.map(|inner| Spread::new(inner , shared.clone())).map_err(Into::into);
83                                }
84                            }
85
86                            Err(err) => yield Err(err.into()),
87                        }
88                    }
89
90                    Err(err) => yield Err(Error::Other(err)),
91                }
92
93            }
94
95        }
96    }
97}
98
99#[derive(Debug, Clone, Copy)]
100pub struct FileReader {
101    chunk_size: usize,
102}
103
104impl FileReader {
105    pub fn new(chunk_size: usize) -> Self {
106        Self { chunk_size }
107    }
108}
109
110impl Default for FileReader {
111    fn default() -> Self {
112        Self { chunk_size: 8192 }
113    }
114}
115
116impl<P: AsRef<Path> + Send + Sync, E: std::error::Error + Send + Sync + 'static>
117    Service<Result<P, E>> for FileReader
118{
119    type Out = Result<Spread<Bytes, P>, Error<E>>;
120
121    fn handle(
122        self,
123        input: impl futures::Stream<Item = Result<P, E>> + Send,
124    ) -> impl futures::Stream<Item = Self::Out> + Send {
125        async_stream::stream! {
126            let mut input = pin!(input);
127            let mut buf = vec![0u8; self.chunk_size];
128
129            while let Some(res) = input.next().await  {
130                match res {
131                    Ok(path) => {
132                        println!("path {}", path.as_ref().display());
133                        match tokio::fs::File::open(&path).await {
134                            Ok(mut file) => {
135                                let shared = Arc::new(path);
136                                loop {
137                                    yield match file.read(&mut buf[..]).await {
138                                        Ok(0) => break,
139                                        Ok(n) => Ok(Spread::new(buf[0..n].to_vec().into(), shared.clone())),
140                                        Err(err) => Err(err.into())
141                                    };
142                                }
143                            },
144                            Err(err) => yield Err(err.into()),
145                        }
146                    }
147
148                    Err(err) => yield Err(Error::Other(err)),
149                }
150            }
151        }
152    }
153}