recoco_core/ops/sources/
local_file.rs1use async_stream::try_stream;
14use std::borrow::Cow;
15use std::fs::Metadata;
16use std::path::Path;
17use std::{path::PathBuf, sync::Arc};
18use tracing::warn;
19
20use super::shared::pattern_matcher::PatternMatcher;
21use crate::base::field_attrs;
22use crate::{fields_value, ops::sdk::*};
23
24#[derive(Debug, Serialize, Deserialize)]
25pub struct Spec {
26 path: String,
27 binary: bool,
28 included_patterns: Option<Vec<String>>,
29 excluded_patterns: Option<Vec<String>>,
30 max_file_size: Option<i64>,
31}
32
33struct Executor {
34 root_path: PathBuf,
35 binary: bool,
36 pattern_matcher: PatternMatcher,
37 max_file_size: Option<i64>,
38}
39
40async fn ensure_metadata<'a>(
41 path: &Path,
42 metadata: &'a mut Option<Metadata>,
43) -> std::io::Result<&'a Metadata> {
44 if metadata.is_none() {
45 *metadata = Some(tokio::fs::metadata(path).await?);
47 }
48 Ok(metadata.as_ref().unwrap())
49}
50
51#[async_trait]
52impl SourceExecutor for Executor {
53 async fn list(
54 &self,
55 options: &SourceExecutorReadOptions,
56 ) -> Result<BoxStream<'async_trait, Result<Vec<PartialSourceRow>>>> {
57 let root_component_size = self.root_path.components().count();
58 let mut dirs = Vec::new();
59 dirs.push(Cow::Borrowed(&self.root_path));
60 let mut new_dirs = Vec::new();
61 let stream = try_stream! {
62 while let Some(dir) = dirs.pop() {
63 let mut entries = tokio::fs::read_dir(dir.as_ref()).await?;
64 while let Some(entry) = entries.next_entry().await? {
65 let path = entry.path();
66 let mut path_components = path.components();
67 for _ in 0..root_component_size {
68 path_components.next();
69 }
70 let Some(relative_path) = path_components.as_path().to_str() else {
71 warn!("Skipped ill-formed file path: {}", path.display());
72 continue;
73 };
74 let mut metadata: Option<Metadata> = None;
76
77 let file_type = entry.file_type().await?;
79 if file_type.is_symlink()
80 && let Err(e) = ensure_metadata(&path, &mut metadata).await {
81 if e.kind() == std::io::ErrorKind::NotFound {
82 warn!("Skipped broken symlink: {}", path.display());
83 continue;
84 }
85 Err(e)?;
86 }
87 let is_dir = if file_type.is_dir() {
88 true
89 } else if file_type.is_symlink() {
90 ensure_metadata(&path, &mut metadata).await?.is_dir()
92 } else {
93 false
94 };
95 if is_dir {
96 if !self.pattern_matcher.is_excluded(relative_path) {
97 new_dirs.push(Cow::Owned(path));
98 }
99 } else if self.pattern_matcher.is_file_included(relative_path) {
100 if let Some(max_size) = self.max_file_size
102 && let Ok(metadata) = ensure_metadata(&path, &mut metadata).await
103 && metadata.len() > max_size as u64
104 {
105 continue;
106 }
107 let ordinal: Option<Ordinal> = if options.include_ordinal {
108 let metadata = ensure_metadata(&path, &mut metadata).await?;
109 Some(metadata.modified()?.try_into()?)
110 } else {
111 None
112 };
113 yield vec![PartialSourceRow {
114 key: KeyValue::from_single_part(relative_path.to_string()),
115 key_aux_info: serde_json::Value::Null,
116 data: PartialSourceRowData {
117 ordinal,
118 content_version_fp: None,
119 value: None,
120 },
121 }];
122 }
123 }
124 dirs.extend(new_dirs.drain(..).rev());
125 }
126 };
127 Ok(stream.boxed())
128 }
129
130 async fn get_value(
131 &self,
132 key: &KeyValue,
133 _key_aux_info: &serde_json::Value,
134 options: &SourceExecutorReadOptions,
135 ) -> Result<PartialSourceRowData> {
136 let path = key.single_part()?.str_value()?.as_ref();
137 if !self.pattern_matcher.is_file_included(path) {
138 return Ok(PartialSourceRowData {
139 value: Some(SourceValue::NonExistence),
140 ordinal: Some(Ordinal::unavailable()),
141 content_version_fp: None,
142 });
143 }
144 let path = self.root_path.join(path);
145 let mut metadata: Option<Metadata> = None;
146 if let Some(max_size) = self.max_file_size
148 && let Ok(metadata) = ensure_metadata(&path, &mut metadata).await
149 && metadata.len() > max_size as u64
150 {
151 return Ok(PartialSourceRowData {
152 value: Some(SourceValue::NonExistence),
153 ordinal: Some(Ordinal::unavailable()),
154 content_version_fp: None,
155 });
156 }
157 let ordinal = if options.include_ordinal {
158 let metadata = ensure_metadata(&path, &mut metadata).await?;
159 Some(metadata.modified()?.try_into()?)
160 } else {
161 None
162 };
163 let value = if options.include_value {
164 match std::fs::read(path) {
165 Ok(content) => {
166 let content = if self.binary {
167 fields_value!(content)
168 } else {
169 let (s, _) = utils::bytes_decode::bytes_to_string(&content);
170 fields_value!(s)
171 };
172 Some(SourceValue::Existence(content))
173 }
174 Err(e) if e.kind() == std::io::ErrorKind::NotFound => {
175 Some(SourceValue::NonExistence)
176 }
177 Err(e) => Err(e)?,
178 }
179 } else {
180 None
181 };
182 Ok(PartialSourceRowData {
183 value,
184 ordinal,
185 content_version_fp: None,
186 })
187 }
188
189 fn provides_ordinal(&self) -> bool {
190 true
191 }
192}
193
/// Stateless factory for the `"LocalFile"` source; carries no data of its own.
pub struct Factory;
195
196#[async_trait]
197impl SourceFactoryBase for Factory {
198 type Spec = Spec;
199
200 fn name(&self) -> &str {
201 "LocalFile"
202 }
203
204 async fn get_output_schema(
205 &self,
206 spec: &Spec,
207 _context: &FlowInstanceContext,
208 ) -> Result<EnrichedValueType> {
209 let mut struct_schema = StructSchema::default();
210 let mut schema_builder = StructSchemaBuilder::new(&mut struct_schema);
211 let filename_field = schema_builder.add_field(FieldSchema::new(
212 "filename",
213 make_output_type(BasicValueType::Str),
214 ));
215 schema_builder.add_field(FieldSchema::new(
216 "content",
217 make_output_type(if spec.binary {
218 BasicValueType::Bytes
219 } else {
220 BasicValueType::Str
221 })
222 .with_attr(
223 field_attrs::CONTENT_FILENAME,
224 serde_json::to_value(filename_field.to_field_ref())?,
225 ),
226 ));
227
228 Ok(make_output_type(TableSchema::new(
229 TableKind::KTable(KTableInfo { num_key_parts: 1 }),
230 struct_schema,
231 )))
232 }
233
234 async fn build_executor(
235 self: Arc<Self>,
236 _source_name: &str,
237 spec: Spec,
238 _context: Arc<FlowInstanceContext>,
239 ) -> Result<Box<dyn SourceExecutor>> {
240 Ok(Box::new(Executor {
241 root_path: PathBuf::from(spec.path),
242 binary: spec.binary,
243 pattern_matcher: PatternMatcher::new(spec.included_patterns, spec.excluded_patterns)?,
244 max_file_size: spec.max_file_size,
245 }))
246 }
247}