Skip to main content

ecl_adapter_fs/
lib.rs

1//! Filesystem source adapter for the ECL pipeline runner.
2//!
3//! Provides `FilesystemAdapter`, which implements `SourceAdapter` by walking
4//! a local directory tree, applying extension and glob filters, and reading
5//! file contents with blake3 hashing.
6
7#![forbid(unsafe_code)]
8#![warn(missing_docs)]
9#![deny(clippy::unwrap_used)]
10#![warn(clippy::expect_used)]
11#![deny(clippy::panic)]
12
13mod error;
14
15pub use error::FsAdapterError;
16
17use async_trait::async_trait;
18use async_walkdir::WalkDir;
19use chrono::{DateTime, Utc};
20use futures::StreamExt;
21use glob::Pattern;
22use std::collections::BTreeMap;
23use std::path::{Path, PathBuf};
24use tracing::debug;
25
26use ecl_pipeline_spec::SourceSpec;
27use ecl_pipeline_spec::source::{FilesystemSourceSpec, FilterAction, FilterRule};
28use ecl_pipeline_state::{Blake3Hash, ItemProvenance};
29use ecl_pipeline_topo::error::{ResolveError, SourceError};
30use ecl_pipeline_topo::{ExtractedDocument, SourceAdapter, SourceItem};
31
/// Filesystem source adapter.
///
/// Recursively walks a root directory, applying extension and glob filters
/// during enumeration. Fetches file content and computes blake3 hashes.
///
/// All fields are private; build an instance with
/// [`FilesystemAdapter::from_spec`] or [`FilesystemAdapter::from_fs_spec`].
#[derive(Debug)]
pub struct FilesystemAdapter {
    /// Root directory to scan. Item paths are expressed relative to this
    /// directory and are joined back onto it when an item is fetched.
    root: PathBuf,
    /// File extensions to include (empty = all). Entries are compared
    /// ASCII-case-insensitively against the path's extension, which never
    /// contains the leading dot (so `md`, not `.md`).
    extensions: Vec<String>,
    /// Compiled filter rules (include/exclude globs), kept in spec order.
    /// Order matters: the last rule that matches a path decides the outcome.
    filters: Vec<CompiledFilter>,
    /// Source name, embedded in the errors this adapter reports.
    source_name: String,
}
47
/// A compiled filter rule with a pre-parsed glob pattern.
///
/// Produced by `compile_filters` when the adapter is constructed, so the
/// glob is parsed once rather than for every path that is examined.
#[derive(Debug)]
struct CompiledFilter {
    /// Glob matched against an item's root-relative path string.
    pattern: Pattern,
    /// Outcome applied to paths the pattern matches.
    action: FilterAction,
}
54
55impl FilesystemAdapter {
56    /// Create a new `FilesystemAdapter` from a `SourceSpec`.
57    ///
58    /// # Errors
59    ///
60    /// Returns `ResolveError::UnknownAdapter` if the spec is not a filesystem source.
61    /// Returns `ResolveError::Io` if a glob pattern is invalid.
62    pub fn from_spec(source_name: &str, spec: &SourceSpec) -> Result<Self, ResolveError> {
63        let fs_spec = match spec {
64            SourceSpec::Filesystem(fs) => fs,
65            _ => {
66                return Err(ResolveError::UnknownAdapter {
67                    stage: source_name.to_string(),
68                    adapter: "filesystem".to_string(),
69                });
70            }
71        };
72
73        Self::from_fs_spec(source_name, fs_spec)
74    }
75
76    /// Create a new `FilesystemAdapter` directly from a `FilesystemSourceSpec`.
77    ///
78    /// # Errors
79    ///
80    /// Returns `ResolveError::Io` if a glob pattern is invalid.
81    pub fn from_fs_spec(
82        source_name: &str,
83        spec: &FilesystemSourceSpec,
84    ) -> Result<Self, ResolveError> {
85        let filters = compile_filters(&spec.filters)?;
86
87        Ok(Self {
88            root: spec.root.clone(),
89            extensions: spec.extensions.clone(),
90            filters,
91            source_name: source_name.to_string(),
92        })
93    }
94
95    /// Check whether a path passes the extension filter.
96    fn matches_extension(&self, path: &Path) -> bool {
97        if self.extensions.is_empty() {
98            return true;
99        }
100        path.extension()
101            .and_then(|ext| ext.to_str())
102            .is_some_and(|ext| {
103                self.extensions
104                    .iter()
105                    .any(|allowed| allowed.eq_ignore_ascii_case(ext))
106            })
107    }
108
109    /// Evaluate filter rules against a path. Rules are evaluated in order;
110    /// the last matching rule wins. If no rule matches, the item is included.
111    fn passes_filters(&self, path: &str) -> bool {
112        let mut included = true;
113        for filter in &self.filters {
114            if filter.pattern.matches(path) {
115                included = filter.action == FilterAction::Include;
116            }
117        }
118        included
119    }
120
121    /// Compute a relative path string for filtering and identification.
122    fn relative_path(&self, abs_path: &Path) -> String {
123        abs_path
124            .strip_prefix(&self.root)
125            .unwrap_or(abs_path)
126            .to_string_lossy()
127            .into_owned()
128    }
129
130    /// Detect MIME type from file extension.
131    fn mime_from_extension(path: &Path) -> String {
132        let ext = path.extension().and_then(|e| e.to_str()).unwrap_or("");
133        match ext.to_ascii_lowercase().as_str() {
134            "md" | "markdown" => "text/markdown".to_string(),
135            "txt" => "text/plain".to_string(),
136            "html" | "htm" => "text/html".to_string(),
137            "json" => "application/json".to_string(),
138            "toml" => "application/toml".to_string(),
139            "yaml" | "yml" => "application/yaml".to_string(),
140            "pdf" => "application/pdf".to_string(),
141            "csv" => "text/csv".to_string(),
142            "xml" => "application/xml".to_string(),
143            "rs" => "text/x-rust".to_string(),
144            "py" => "text/x-python".to_string(),
145            "js" => "text/javascript".to_string(),
146            "ts" => "text/typescript".to_string(),
147            _ => "application/octet-stream".to_string(),
148        }
149    }
150}
151
152/// Compile filter rules into glob patterns.
153fn compile_filters(rules: &[FilterRule]) -> Result<Vec<CompiledFilter>, ResolveError> {
154    rules
155        .iter()
156        .map(|rule| {
157            let pattern = Pattern::new(&rule.pattern).map_err(|e| {
158                ResolveError::Io(std::io::Error::new(
159                    std::io::ErrorKind::InvalidInput,
160                    format!("invalid glob pattern '{}': {e}", rule.pattern),
161                ))
162            })?;
163            Ok(CompiledFilter {
164                pattern,
165                action: rule.action.clone(),
166            })
167        })
168        .collect()
169}
170
171#[async_trait]
172impl SourceAdapter for FilesystemAdapter {
    /// Returns the fixed kind label for this adapter, `"filesystem"`.
    fn source_kind(&self) -> &str {
        "filesystem"
    }
176
177    async fn enumerate(&self) -> Result<Vec<SourceItem>, SourceError> {
178        let mut items = Vec::new();
179        let mut walker = WalkDir::new(&self.root);
180
181        while let Some(entry) = walker.next().await {
182            let entry = entry.map_err(|e| SourceError::Permanent {
183                source_name: self.source_name.clone(),
184                message: format!("directory walk error: {e}"),
185            })?;
186
187            let path = entry.path();
188
189            // Skip directories
190            let metadata =
191                tokio::fs::metadata(&path)
192                    .await
193                    .map_err(|e| SourceError::Transient {
194                        source_name: self.source_name.clone(),
195                        message: format!("failed to read metadata for {}: {e}", path.display()),
196                    })?;
197
198            if metadata.is_dir() {
199                continue;
200            }
201
202            // Apply extension filter
203            if !self.matches_extension(&path) {
204                debug!(path = %path.display(), "skipped: extension filter");
205                continue;
206            }
207
208            // Apply glob filters
209            let rel_path = self.relative_path(&path);
210            if !self.passes_filters(&rel_path) {
211                debug!(path = %path.display(), "skipped: glob filter");
212                continue;
213            }
214
215            let modified_at: Option<DateTime<Utc>> =
216                metadata.modified().ok().map(DateTime::<Utc>::from);
217
218            let display_name = path
219                .file_name()
220                .map(|n| n.to_string_lossy().into_owned())
221                .unwrap_or_else(|| rel_path.clone());
222
223            let mime_type = Self::mime_from_extension(&path);
224
225            items.push(SourceItem {
226                id: rel_path.clone(),
227                display_name,
228                mime_type,
229                path: rel_path,
230                modified_at,
231                source_hash: None,
232            });
233        }
234
235        // Sort for deterministic ordering
236        items.sort_by(|a, b| a.id.cmp(&b.id));
237        Ok(items)
238    }
239
240    async fn fetch(&self, item: &SourceItem) -> Result<ExtractedDocument, SourceError> {
241        let abs_path = self.root.join(&item.path);
242
243        let content = tokio::fs::read(&abs_path).await.map_err(|e| {
244            if e.kind() == std::io::ErrorKind::NotFound {
245                SourceError::NotFound {
246                    source_name: self.source_name.clone(),
247                    item_id: item.id.clone(),
248                }
249            } else {
250                SourceError::Transient {
251                    source_name: self.source_name.clone(),
252                    message: format!("failed to read {}: {e}", abs_path.display()),
253                }
254            }
255        })?;
256
257        let content_hash = Blake3Hash::new(blake3::hash(&content).to_hex().as_str());
258
259        let metadata = tokio::fs::metadata(&abs_path).await.ok();
260        let source_modified = metadata
261            .and_then(|m| m.modified().ok())
262            .map(DateTime::<Utc>::from);
263
264        let mut prov_metadata = BTreeMap::new();
265        prov_metadata.insert(
266            "path".to_string(),
267            serde_json::Value::String(item.path.clone()),
268        );
269
270        let provenance = ItemProvenance {
271            source_kind: "filesystem".to_string(),
272            metadata: prov_metadata,
273            source_modified,
274            extracted_at: Utc::now(),
275        };
276
277        Ok(ExtractedDocument {
278            id: item.id.clone(),
279            display_name: item.display_name.clone(),
280            content,
281            mime_type: item.mime_type.clone(),
282            provenance,
283            content_hash,
284        })
285    }
286}
287
#[cfg(test)]
#[allow(clippy::unwrap_used)]
mod tests {
    use super::*;
    use ecl_pipeline_spec::source::FilterRule;
    use std::fs;
    use tempfile::TempDir;

    // ── Shared fixtures ────────────────────────────────────────────

    /// Owned copies of a list of extension literals.
    fn owned(extensions: &[&str]) -> Vec<String> {
        extensions.iter().map(|ext| (*ext).to_string()).collect()
    }

    /// A filter rule for `pattern` with the given action.
    fn rule(pattern: &str, action: FilterAction) -> FilterRule {
        FilterRule {
            pattern: pattern.to_string(),
            action,
        }
    }

    /// A spec rooted at `root` with explicit filters and extensions.
    fn spec_with(
        root: &Path,
        filters: Vec<FilterRule>,
        extensions: &[&str],
    ) -> FilesystemSourceSpec {
        FilesystemSourceSpec {
            root: root.to_path_buf(),
            filters,
            extensions: owned(extensions),
            stream: None,
        }
    }

    /// A spec rooted at `root` with no filters and no extension allow-list.
    fn make_fs_spec(root: &Path) -> FilesystemSourceSpec {
        spec_with(root, vec![], &[])
    }

    /// An unfiltered adapter rooted at `root`.
    fn make_adapter(root: &Path) -> FilesystemAdapter {
        FilesystemAdapter::from_fs_spec("test-source", &make_fs_spec(root)).unwrap()
    }

    /// An adapter assembled field by field, bypassing spec handling.
    fn bare_adapter(root: &str, extensions: &[&str]) -> FilesystemAdapter {
        FilesystemAdapter {
            root: PathBuf::from(root),
            extensions: owned(extensions),
            filters: vec![],
            source_name: "test".to_string(),
        }
    }

    /// Populate `dir` with three top-level files and two files under `sub/`.
    fn create_test_files(dir: &Path) {
        fs::create_dir_all(dir.join("sub")).unwrap();

        let text_files = [
            ("readme.md", "# Hello"),
            ("notes.txt", "Some notes"),
            ("data.json", r#"{"key": "value"}"#),
            ("sub/nested.md", "# Nested"),
        ];
        for (name, body) in text_files {
            fs::write(dir.join(name), body).unwrap();
        }

        fs::write(dir.join("sub/image.png"), [0x89, 0x50, 0x4E, 0x47]).unwrap();
    }

    // ── Construction tests ─────────────────────────────────────────

    #[test]
    fn test_from_spec_filesystem_source() {
        let spec = SourceSpec::Filesystem(make_fs_spec(Path::new("/tmp")));
        let adapter = FilesystemAdapter::from_spec("local", &spec).unwrap();
        assert_eq!(adapter.source_kind(), "filesystem");
        assert_eq!(adapter.root, PathBuf::from("/tmp"));
    }

    #[test]
    fn test_from_spec_wrong_kind_returns_error() {
        use ecl_pipeline_spec::source::{CredentialRef, SlackSourceSpec};

        let spec = SourceSpec::Slack(SlackSourceSpec {
            credentials: CredentialRef::ApplicationDefault,
            channels: vec![],
            thread_depth: 0,
            modified_after: None,
            stream: None,
        });
        assert!(FilesystemAdapter::from_spec("local", &spec).is_err());
    }

    #[test]
    fn test_from_fs_spec_with_extensions() {
        let spec = spec_with(Path::new("/tmp"), vec![], &["md", "txt"]);
        let adapter = FilesystemAdapter::from_fs_spec("local", &spec).unwrap();
        assert_eq!(adapter.extensions, vec!["md", "txt"]);
    }

    #[test]
    fn test_from_fs_spec_with_filters() {
        let filters = vec![
            rule("**/*.md", FilterAction::Include),
            rule("**/Archive/**", FilterAction::Exclude),
        ];
        let spec = spec_with(Path::new("/tmp"), filters, &[]);
        let adapter = FilesystemAdapter::from_fs_spec("local", &spec).unwrap();
        assert_eq!(adapter.filters.len(), 2);
    }

    #[test]
    fn test_invalid_glob_pattern_returns_error() {
        let filters = vec![rule("[invalid", FilterAction::Include)];
        let spec = spec_with(Path::new("/tmp"), filters, &[]);
        assert!(FilesystemAdapter::from_fs_spec("local", &spec).is_err());
    }

    // ── Filter tests ───────────────────────────────────────────────

    #[test]
    fn test_matches_extension_empty_allows_all() {
        let adapter = bare_adapter("/tmp", &[]);
        for name in ["file.md", "file.rs", "file"] {
            assert!(adapter.matches_extension(Path::new(name)));
        }
    }

    #[test]
    fn test_matches_extension_filters_correctly() {
        let adapter = bare_adapter("/tmp", &["md", "txt"]);
        for accepted in ["readme.md", "notes.txt"] {
            assert!(adapter.matches_extension(Path::new(accepted)));
        }
        for rejected in ["data.json", "no_ext"] {
            assert!(!adapter.matches_extension(Path::new(rejected)));
        }
    }

    #[test]
    fn test_matches_extension_case_insensitive() {
        let adapter = bare_adapter("/tmp", &["md"]);
        for name in ["readme.MD", "readme.Md"] {
            assert!(adapter.matches_extension(Path::new(name)));
        }
    }

    #[test]
    fn test_passes_filters_no_rules_includes_all() {
        let adapter = bare_adapter("/tmp", &[]);
        assert!(adapter.passes_filters("any/path.md"));
    }

    #[test]
    fn test_passes_filters_exclude_rule() {
        let filters = vec![rule("**/Archive/**", FilterAction::Exclude)];
        let spec = spec_with(Path::new("/tmp"), filters, &[]);
        let adapter = FilesystemAdapter::from_fs_spec("test", &spec).unwrap();
        assert!(!adapter.passes_filters("Archive/old.md"));
        assert!(adapter.passes_filters("docs/new.md"));
    }

    #[test]
    fn test_passes_filters_last_rule_wins() {
        let filters = vec![
            rule("**/*.md", FilterAction::Exclude),
            rule("**/important/*.md", FilterAction::Include),
        ];
        let spec = spec_with(Path::new("/tmp"), filters, &[]);
        let adapter = FilesystemAdapter::from_fs_spec("test", &spec).unwrap();
        // The blanket .md exclusion applies, except where the later rule re-includes.
        assert!(!adapter.passes_filters("docs/readme.md"));
        assert!(adapter.passes_filters("important/readme.md"));
    }

    // ── MIME detection tests ───────────────────────────────────────

    #[test]
    fn test_mime_from_extension() {
        let cases = [
            ("x.md", "text/markdown"),
            ("x.txt", "text/plain"),
            ("x.json", "application/json"),
            ("x.pdf", "application/pdf"),
            ("x.unknown", "application/octet-stream"),
            ("no_ext", "application/octet-stream"),
        ];
        for (file, expected) in cases {
            assert_eq!(
                FilesystemAdapter::mime_from_extension(Path::new(file)),
                expected
            );
        }
    }

    // ── Enumerate tests ────────────────────────────────────────────

    #[tokio::test]
    async fn test_enumerate_empty_directory() {
        let tmp = TempDir::new().unwrap();
        let listed = make_adapter(tmp.path()).enumerate().await.unwrap();
        assert!(listed.is_empty());
    }

    #[tokio::test]
    async fn test_enumerate_finds_all_files() {
        let tmp = TempDir::new().unwrap();
        create_test_files(tmp.path());
        let listed = make_adapter(tmp.path()).enumerate().await.unwrap();
        assert_eq!(listed.len(), 5);
    }

    #[tokio::test]
    async fn test_enumerate_applies_extension_filter() {
        let tmp = TempDir::new().unwrap();
        create_test_files(tmp.path());
        let spec = spec_with(tmp.path(), vec![], &["md"]);
        let adapter = FilesystemAdapter::from_fs_spec("test", &spec).unwrap();
        let listed = adapter.enumerate().await.unwrap();
        // Only readme.md and sub/nested.md carry the .md extension.
        assert_eq!(listed.len(), 2);
        assert!(listed.iter().all(|item| item.path.ends_with(".md")));
    }

    #[tokio::test]
    async fn test_enumerate_applies_glob_filter() {
        let tmp = TempDir::new().unwrap();
        create_test_files(tmp.path());
        let filters = vec![rule("sub/**", FilterAction::Exclude)];
        let spec = spec_with(tmp.path(), filters, &[]);
        let adapter = FilesystemAdapter::from_fs_spec("test", &spec).unwrap();
        let listed = adapter.enumerate().await.unwrap();
        // sub/nested.md and sub/image.png are excluded, leaving the three top-level files.
        assert_eq!(listed.len(), 3);
        assert!(listed.iter().all(|item| !item.path.starts_with("sub/")));
    }

    #[tokio::test]
    async fn test_enumerate_returns_sorted_results() {
        let tmp = TempDir::new().unwrap();
        create_test_files(tmp.path());
        let listed = make_adapter(tmp.path()).enumerate().await.unwrap();
        let ids: Vec<&str> = listed.iter().map(|item| item.id.as_str()).collect();
        let mut expected_order = ids.clone();
        expected_order.sort();
        assert_eq!(ids, expected_order);
    }

    #[tokio::test]
    async fn test_enumerate_source_items_have_correct_fields() {
        let tmp = TempDir::new().unwrap();
        fs::write(tmp.path().join("test.md"), "# Test").unwrap();
        let listed = make_adapter(tmp.path()).enumerate().await.unwrap();
        assert_eq!(listed.len(), 1);
        let only = &listed[0];
        assert_eq!(only.id, "test.md");
        assert_eq!(only.display_name, "test.md");
        assert_eq!(only.mime_type, "text/markdown");
        assert_eq!(only.path, "test.md");
        assert!(only.modified_at.is_some());
        assert!(only.source_hash.is_none());
    }

    // ── Fetch tests ────────────────────────────────────────────────

    #[tokio::test]
    async fn test_fetch_reads_content() {
        let tmp = TempDir::new().unwrap();
        let body = "# Hello World";
        fs::write(tmp.path().join("test.md"), body).unwrap();
        let adapter = make_adapter(tmp.path());
        let listed = adapter.enumerate().await.unwrap();
        let doc = adapter.fetch(&listed[0]).await.unwrap();
        assert_eq!(doc.content, body.as_bytes());
        assert_eq!(doc.id, "test.md");
        assert_eq!(doc.display_name, "test.md");
        assert_eq!(doc.mime_type, "text/markdown");
    }

    #[tokio::test]
    async fn test_fetch_computes_blake3_hash() {
        let tmp = TempDir::new().unwrap();
        let body = b"hash me";
        fs::write(tmp.path().join("test.txt"), body).unwrap();
        let adapter = make_adapter(tmp.path());
        let listed = adapter.enumerate().await.unwrap();
        let doc = adapter.fetch(&listed[0]).await.unwrap();
        let expected = blake3::hash(body).to_hex().to_string();
        assert_eq!(doc.content_hash.as_str(), expected);
    }

    #[tokio::test]
    async fn test_fetch_includes_provenance() {
        let tmp = TempDir::new().unwrap();
        fs::write(tmp.path().join("test.md"), "content").unwrap();
        let adapter = make_adapter(tmp.path());
        let listed = adapter.enumerate().await.unwrap();
        let doc = adapter.fetch(&listed[0]).await.unwrap();
        assert_eq!(doc.provenance.source_kind, "filesystem");
        assert!(doc.provenance.metadata.contains_key("path"));
        assert!(doc.provenance.source_modified.is_some());
    }

    #[tokio::test]
    async fn test_fetch_not_found_returns_error() {
        let tmp = TempDir::new().unwrap();
        let adapter = make_adapter(tmp.path());
        let missing = "nonexistent.txt".to_string();
        let fake_item = SourceItem {
            id: missing.clone(),
            display_name: missing.clone(),
            mime_type: "text/plain".to_string(),
            path: missing,
            modified_at: None,
            source_hash: None,
        };
        let outcome = adapter.fetch(&fake_item).await;
        assert!(outcome.is_err());
        assert!(matches!(outcome, Err(SourceError::NotFound { .. })));
    }

    // ── Relative path tests ────────────────────────────────────────

    #[test]
    fn test_relative_path() {
        let adapter = bare_adapter("/data/root", &[]);
        let cases = [
            ("/data/root/sub/file.md", "sub/file.md"),
            ("/data/root/file.md", "file.md"),
        ];
        for (absolute, expected) in cases {
            assert_eq!(adapter.relative_path(Path::new(absolute)), expected);
        }
    }

    // ── Compile filters test ───────────────────────────────────────

    #[test]
    fn test_compile_filters_valid_patterns() {
        let rules = vec![
            rule("**/*.md", FilterAction::Include),
            rule("Archive/**", FilterAction::Exclude),
        ];
        let compiled = compile_filters(&rules).unwrap();
        assert_eq!(compiled.len(), 2);
    }

    #[test]
    fn test_compile_filters_invalid_pattern() {
        let rules = vec![rule("[bad", FilterAction::Include)];
        assert!(compile_filters(&rules).is_err());
    }
}