1use std::{
2 ops::{Deref, DerefMut},
3 path::{Path, PathBuf},
4 pin::pin,
5 sync::Arc,
6};
7
8use bytes::Bytes;
9use flowly_service::Service;
10use futures::StreamExt;
11use glob::MatchOptions;
12use tokio::io::AsyncReadExt;
13
14use crate::error::Error;
15
/// A value (`inner`) paired with context (`shared`) that is shared, via
/// [`Arc`], between every item derived from the same source.
///
/// In this file the readers use it to tag each emitted path or byte chunk
/// with the input it originated from.
#[derive(Debug, PartialEq, Eq)]
pub struct Spread<E, S> {
    /// The per-item payload.
    pub inner: E,
    /// Context shared by all items produced from the same input.
    pub shared: Arc<S>,
}

// `Clone` is implemented by hand because `#[derive(Clone)]` would also demand
// `S: Clone`, although cloning an `Arc<S>` only bumps a reference count and
// works for any `S`.
impl<E: Clone, S> Clone for Spread<E, S> {
    fn clone(&self) -> Self {
        Self {
            inner: self.inner.clone(),
            shared: Arc::clone(&self.shared),
        }
    }
}

impl<E, S> Spread<E, S> {
    /// Wraps `inner` together with an already shared `shared` context.
    pub fn new(inner: E, shared: Arc<S>) -> Self {
        Self { inner, shared }
    }
}
27
28impl<E, S> Deref for Spread<E, S> {
29 type Target = E;
30
31 fn deref(&self) -> &Self::Target {
32 &self.inner
33 }
34}
35impl<E, S> DerefMut for Spread<E, S> {
36 fn deref_mut(&mut self) -> &mut Self::Target {
37 &mut self.inner
38 }
39}
40
41impl<U: ?Sized, E: AsRef<U>, S> AsRef<U> for Spread<E, S> {
42 fn as_ref(&self) -> &U {
43 self.inner.as_ref()
44 }
45}
46
47#[derive(Debug, Clone)]
48pub struct DirReader {
49 pattern: String,
50 options: MatchOptions,
51}
52
53impl DirReader {
54 pub fn new(pattern: String, options: MatchOptions) -> Self {
55 Self { pattern, options }
56 }
57}
58
59impl<P, E> Service<Result<P, E>> for DirReader
60where
61 P: AsRef<Path> + Send + Sync,
62 E: std::error::Error + Send + Sync + 'static,
63{
64 type Out = Result<Spread<PathBuf, P>, Error<E>>;
65
66 fn handle(
67 self,
68 input: impl futures::Stream<Item = Result<P, E>> + Send,
69 ) -> impl futures::Stream<Item = Self::Out> + Send {
70 async_stream::stream! {
71 let mut stream = pin!(input);
72
73 while let Some(res) = stream.next().await {
74 match res {
75 Ok(dir) => {
76 let pattern = format!("{}/{}", dir.as_ref().display().to_string().trim_end_matches('/'), self.pattern);
77 let shared = Arc::new(dir);
78
79 match glob::glob_with(&pattern, self.options) {
80 Ok(paths) => {
81 for p in paths {
82 yield p.map(|inner| Spread::new(inner , shared.clone())).map_err(Into::into);
83 }
84 }
85
86 Err(err) => yield Err(err.into()),
87 }
88 }
89
90 Err(err) => yield Err(Error::Other(err)),
91 }
92
93 }
94
95 }
96 }
97}
98
/// Service configuration for reading files as a stream of byte chunks.
#[derive(Debug, Clone, Copy)]
pub struct FileReader {
    /// Size, in bytes, of the buffer used for each read.
    chunk_size: usize,
}

impl FileReader {
    /// Creates a reader that uses a read buffer of `chunk_size` bytes.
    pub fn new(chunk_size: usize) -> Self {
        FileReader { chunk_size }
    }
}

impl Default for FileReader {
    /// Uses an 8192-byte read buffer.
    fn default() -> Self {
        Self::new(8192)
    }
}
115
116impl<P: AsRef<Path> + Send + Sync, E: std::error::Error + Send + Sync + 'static>
117 Service<Result<P, E>> for FileReader
118{
119 type Out = Result<Spread<Bytes, P>, Error<E>>;
120
121 fn handle(
122 self,
123 input: impl futures::Stream<Item = Result<P, E>> + Send,
124 ) -> impl futures::Stream<Item = Self::Out> + Send {
125 async_stream::stream! {
126 let mut input = pin!(input);
127 let mut buf = vec![0u8; self.chunk_size];
128
129 while let Some(res) = input.next().await {
130 match res {
131 Ok(path) => {
132 println!("path {}", path.as_ref().display());
133 match tokio::fs::File::open(&path).await {
134 Ok(mut file) => {
135 let shared = Arc::new(path);
136 loop {
137 yield match file.read(&mut buf[..]).await {
138 Ok(0) => break,
139 Ok(n) => Ok(Spread::new(buf[0..n].to_vec().into(), shared.clone())),
140 Err(err) => Err(err.into())
141 };
142 }
143 },
144 Err(err) => yield Err(err.into()),
145 }
146 }
147
148 Err(err) => yield Err(Error::Other(err)),
149 }
150 }
151 }
152 }
153}