Skip to main content

recoco_core/ops/sources/
local_file.rs

1// ReCoco is a Rust-only fork of CocoIndex, by [CocoIndex](https://CocoIndex)
2// Original code from CocoIndex is copyrighted by CocoIndex
3// SPDX-FileCopyrightText: 2025-2026 CocoIndex (upstream)
4// SPDX-FileContributor: CocoIndex Contributors
5//
6// All modifications from the upstream for ReCoco are copyrighted by Knitli Inc.
7// SPDX-FileCopyrightText: 2026 Knitli Inc. (ReCoco)
8// SPDX-FileContributor: Adam Poulemanos <adam@knit.li>
9//
10// Both the upstream CocoIndex code and the ReCoco modifications are licensed under the Apache-2.0 License.
11// SPDX-License-Identifier: Apache-2.0
12
13use async_stream::try_stream;
14use std::borrow::Cow;
15use std::fs::Metadata;
16use std::path::Path;
17use std::{path::PathBuf, sync::Arc};
18use tracing::warn;
19
20use super::shared::pattern_matcher::PatternMatcher;
21use crate::base::field_attrs;
22use crate::{fields_value, ops::sdk::*};
23
/// User-facing configuration of the `LocalFile` source.
#[derive(Debug, Serialize, Deserialize)]
pub struct Spec {
    /// Root directory to read from; row keys are file paths relative to it.
    path: String,
    /// When `true` the `content` field is raw bytes; otherwise the bytes are
    /// decoded into a string.
    binary: bool,
    /// Patterns handed to `PatternMatcher` to decide which files are included.
    included_patterns: Option<Vec<String>>,
    /// Patterns handed to `PatternMatcher` to decide which paths are excluded
    /// (also used to prune directories while listing).
    excluded_patterns: Option<Vec<String>>,
    /// Optional size limit in bytes: files strictly larger than this are not
    /// listed and are reported as non-existent when read.
    max_file_size: Option<i64>,
}
32
/// Runtime state of a `LocalFile` source, built from a [`Spec`] by [`Factory`].
struct Executor {
    /// Directory that listing starts from and that row keys are joined onto.
    root_path: PathBuf,
    /// Whether file content is returned as bytes (`true`) or decoded to a
    /// string (`false`).
    binary: bool,
    /// Decides which relative paths count as included files and which
    /// directories are skipped during traversal.
    pattern_matcher: PatternMatcher,
    /// Optional size limit in bytes; larger files are skipped when listing and
    /// reported as non-existent when read.
    max_file_size: Option<i64>,
}
39
40async fn ensure_metadata<'a>(
41    path: &Path,
42    metadata: &'a mut Option<Metadata>,
43) -> std::io::Result<&'a Metadata> {
44    if metadata.is_none() {
45        // Follow symlinks.
46        *metadata = Some(tokio::fs::metadata(path).await?);
47    }
48    Ok(metadata.as_ref().unwrap())
49}
50
51#[async_trait]
52impl SourceExecutor for Executor {
53    async fn list(
54        &self,
55        options: &SourceExecutorReadOptions,
56    ) -> Result<BoxStream<'async_trait, Result<Vec<PartialSourceRow>>>> {
57        let root_component_size = self.root_path.components().count();
58        let mut dirs = Vec::new();
59        dirs.push(Cow::Borrowed(&self.root_path));
60        let mut new_dirs = Vec::new();
61        let stream = try_stream! {
62            while let Some(dir) = dirs.pop() {
63                let mut entries = tokio::fs::read_dir(dir.as_ref()).await?;
64                while let Some(entry) = entries.next_entry().await? {
65                    let path = entry.path();
66                    let mut path_components = path.components();
67                    for _ in 0..root_component_size {
68                        path_components.next();
69                    }
70                    let Some(relative_path) = path_components.as_path().to_str() else {
71                        warn!("Skipped ill-formed file path: {}", path.display());
72                        continue;
73                    };
74                    // We stat per entry at most once when needed.
75                    let mut metadata: Option<Metadata> = None;
76
77                    // For symlinks, if the target doesn't exist, log and skip.
78                    let file_type = entry.file_type().await?;
79                    if file_type.is_symlink()
80                        && let Err(e) = ensure_metadata(&path, &mut metadata).await {
81                            if e.kind() == std::io::ErrorKind::NotFound {
82                                warn!("Skipped broken symlink: {}", path.display());
83                                continue;
84                            }
85                            Err(e)?;
86                        }
87                    let is_dir = if file_type.is_dir() {
88                        true
89                    } else if file_type.is_symlink() {
90                        // Follow symlinks to classify the target.
91                        ensure_metadata(&path, &mut metadata).await?.is_dir()
92                    } else {
93                        false
94                    };
95                    if is_dir {
96                        if !self.pattern_matcher.is_excluded(relative_path) {
97                            new_dirs.push(Cow::Owned(path));
98                        }
99                    } else if self.pattern_matcher.is_file_included(relative_path) {
100                        // Check file size limit
101                        if let Some(max_size) = self.max_file_size
102                            && let Ok(metadata) = ensure_metadata(&path, &mut metadata).await
103                            && metadata.len() > max_size as u64
104                        {
105                            continue;
106                        }
107                        let ordinal: Option<Ordinal> = if options.include_ordinal {
108                            let metadata = ensure_metadata(&path, &mut metadata).await?;
109                            Some(metadata.modified()?.try_into()?)
110                        } else {
111                            None
112                        };
113                        yield vec![PartialSourceRow {
114                            key: KeyValue::from_single_part(relative_path.to_string()),
115                            key_aux_info: serde_json::Value::Null,
116                            data: PartialSourceRowData {
117                                ordinal,
118                                content_version_fp: None,
119                                value: None,
120                            },
121                        }];
122                    }
123                }
124                dirs.extend(new_dirs.drain(..).rev());
125            }
126        };
127        Ok(stream.boxed())
128    }
129
130    async fn get_value(
131        &self,
132        key: &KeyValue,
133        _key_aux_info: &serde_json::Value,
134        options: &SourceExecutorReadOptions,
135    ) -> Result<PartialSourceRowData> {
136        let path = key.single_part()?.str_value()?.as_ref();
137        if !self.pattern_matcher.is_file_included(path) {
138            return Ok(PartialSourceRowData {
139                value: Some(SourceValue::NonExistence),
140                ordinal: Some(Ordinal::unavailable()),
141                content_version_fp: None,
142            });
143        }
144        let path = self.root_path.join(path);
145        let mut metadata: Option<Metadata> = None;
146        // Check file size limit
147        if let Some(max_size) = self.max_file_size
148            && let Ok(metadata) = ensure_metadata(&path, &mut metadata).await
149            && metadata.len() > max_size as u64
150        {
151            return Ok(PartialSourceRowData {
152                value: Some(SourceValue::NonExistence),
153                ordinal: Some(Ordinal::unavailable()),
154                content_version_fp: None,
155            });
156        }
157        let ordinal = if options.include_ordinal {
158            let metadata = ensure_metadata(&path, &mut metadata).await?;
159            Some(metadata.modified()?.try_into()?)
160        } else {
161            None
162        };
163        let value = if options.include_value {
164            match std::fs::read(path) {
165                Ok(content) => {
166                    let content = if self.binary {
167                        fields_value!(content)
168                    } else {
169                        let (s, _) = utils::bytes_decode::bytes_to_string(&content);
170                        fields_value!(s)
171                    };
172                    Some(SourceValue::Existence(content))
173                }
174                Err(e) if e.kind() == std::io::ErrorKind::NotFound => {
175                    Some(SourceValue::NonExistence)
176                }
177                Err(e) => Err(e)?,
178            }
179        } else {
180            None
181        };
182        Ok(PartialSourceRowData {
183            value,
184            ordinal,
185            content_version_fp: None,
186        })
187    }
188
189    fn provides_ordinal(&self) -> bool {
190        true
191    }
192}
193
/// Source factory whose `name()` is `"LocalFile"`; it builds the [`Executor`]
/// that lists and reads files under a local directory.
pub struct Factory;
195
196#[async_trait]
197impl SourceFactoryBase for Factory {
198    type Spec = Spec;
199
200    fn name(&self) -> &str {
201        "LocalFile"
202    }
203
204    async fn get_output_schema(
205        &self,
206        spec: &Spec,
207        _context: &FlowInstanceContext,
208    ) -> Result<EnrichedValueType> {
209        let mut struct_schema = StructSchema::default();
210        let mut schema_builder = StructSchemaBuilder::new(&mut struct_schema);
211        let filename_field = schema_builder.add_field(FieldSchema::new(
212            "filename",
213            make_output_type(BasicValueType::Str),
214        ));
215        schema_builder.add_field(FieldSchema::new(
216            "content",
217            make_output_type(if spec.binary {
218                BasicValueType::Bytes
219            } else {
220                BasicValueType::Str
221            })
222            .with_attr(
223                field_attrs::CONTENT_FILENAME,
224                serde_json::to_value(filename_field.to_field_ref())?,
225            ),
226        ));
227
228        Ok(make_output_type(TableSchema::new(
229            TableKind::KTable(KTableInfo { num_key_parts: 1 }),
230            struct_schema,
231        )))
232    }
233
234    async fn build_executor(
235        self: Arc<Self>,
236        _source_name: &str,
237        spec: Spec,
238        _context: Arc<FlowInstanceContext>,
239    ) -> Result<Box<dyn SourceExecutor>> {
240        Ok(Box::new(Executor {
241            root_path: PathBuf::from(spec.path),
242            binary: spec.binary,
243            pattern_matcher: PatternMatcher::new(spec.included_patterns, spec.excluded_patterns)?,
244            max_file_size: spec.max_file_size,
245        }))
246    }
247}