//! Filesystem source adapter for the ECL pipeline.
//!
//! [`FilesystemAdapter`] walks a local directory tree, applies extension and
//! glob filters, and implements [`SourceAdapter`] so matching files can be
//! enumerated as [`SourceItem`]s and fetched as [`ExtractedDocument`]s.
#![forbid(unsafe_code)]
#![warn(missing_docs)]
#![deny(clippy::unwrap_used)]
#![warn(clippy::expect_used)]
#![deny(clippy::panic)]

// Crate-specific error types; `FsAdapterError` is re-exported at the crate root.
mod error;

pub use error::FsAdapterError;
16
17use async_trait::async_trait;
18use async_walkdir::WalkDir;
19use chrono::{DateTime, Utc};
20use futures::StreamExt;
21use glob::Pattern;
22use std::collections::BTreeMap;
23use std::path::{Path, PathBuf};
24use tracing::debug;
25
26use ecl_pipeline_spec::SourceSpec;
27use ecl_pipeline_spec::source::{FilesystemSourceSpec, FilterAction, FilterRule};
28use ecl_pipeline_state::{Blake3Hash, ItemProvenance};
29use ecl_pipeline_topo::error::{ResolveError, SourceError};
30use ecl_pipeline_topo::{ExtractedDocument, SourceAdapter, SourceItem};
31
/// A [`SourceAdapter`] that enumerates and reads files beneath a local directory.
///
/// Construct it with [`FilesystemAdapter::from_spec`] or
/// [`FilesystemAdapter::from_fs_spec`].
#[derive(Debug)]
pub struct FilesystemAdapter {
    /// Directory walked by `enumerate`; item paths are relative to it.
    root: PathBuf,
    /// Allowed file extensions, compared ASCII case-insensitively against
    /// `Path::extension` (which excludes the dot). Empty means "allow all".
    extensions: Vec<String>,
    /// Compiled glob filters, evaluated in order; the last matching rule wins.
    filters: Vec<CompiledFilter>,
    /// Source name copied into the `SourceError` values this adapter returns.
    source_name: String,
}
47
/// A [`FilterRule`] whose glob pattern has been compiled once, at adapter construction.
#[derive(Debug)]
struct CompiledFilter {
    /// Pattern matched against an item's root-relative path.
    pattern: Pattern,
    /// Whether a path matching `pattern` is included or excluded.
    action: FilterAction,
}
54
55impl FilesystemAdapter {
56 pub fn from_spec(source_name: &str, spec: &SourceSpec) -> Result<Self, ResolveError> {
63 let fs_spec = match spec {
64 SourceSpec::Filesystem(fs) => fs,
65 _ => {
66 return Err(ResolveError::UnknownAdapter {
67 stage: source_name.to_string(),
68 adapter: "filesystem".to_string(),
69 });
70 }
71 };
72
73 Self::from_fs_spec(source_name, fs_spec)
74 }
75
76 pub fn from_fs_spec(
82 source_name: &str,
83 spec: &FilesystemSourceSpec,
84 ) -> Result<Self, ResolveError> {
85 let filters = compile_filters(&spec.filters)?;
86
87 Ok(Self {
88 root: spec.root.clone(),
89 extensions: spec.extensions.clone(),
90 filters,
91 source_name: source_name.to_string(),
92 })
93 }
94
95 fn matches_extension(&self, path: &Path) -> bool {
97 if self.extensions.is_empty() {
98 return true;
99 }
100 path.extension()
101 .and_then(|ext| ext.to_str())
102 .is_some_and(|ext| {
103 self.extensions
104 .iter()
105 .any(|allowed| allowed.eq_ignore_ascii_case(ext))
106 })
107 }
108
109 fn passes_filters(&self, path: &str) -> bool {
112 let mut included = true;
113 for filter in &self.filters {
114 if filter.pattern.matches(path) {
115 included = filter.action == FilterAction::Include;
116 }
117 }
118 included
119 }
120
121 fn relative_path(&self, abs_path: &Path) -> String {
123 abs_path
124 .strip_prefix(&self.root)
125 .unwrap_or(abs_path)
126 .to_string_lossy()
127 .into_owned()
128 }
129
130 fn mime_from_extension(path: &Path) -> String {
132 let ext = path.extension().and_then(|e| e.to_str()).unwrap_or("");
133 match ext.to_ascii_lowercase().as_str() {
134 "md" | "markdown" => "text/markdown".to_string(),
135 "txt" => "text/plain".to_string(),
136 "html" | "htm" => "text/html".to_string(),
137 "json" => "application/json".to_string(),
138 "toml" => "application/toml".to_string(),
139 "yaml" | "yml" => "application/yaml".to_string(),
140 "pdf" => "application/pdf".to_string(),
141 "csv" => "text/csv".to_string(),
142 "xml" => "application/xml".to_string(),
143 "rs" => "text/x-rust".to_string(),
144 "py" => "text/x-python".to_string(),
145 "js" => "text/javascript".to_string(),
146 "ts" => "text/typescript".to_string(),
147 _ => "application/octet-stream".to_string(),
148 }
149 }
150}
151
152fn compile_filters(rules: &[FilterRule]) -> Result<Vec<CompiledFilter>, ResolveError> {
154 rules
155 .iter()
156 .map(|rule| {
157 let pattern = Pattern::new(&rule.pattern).map_err(|e| {
158 ResolveError::Io(std::io::Error::new(
159 std::io::ErrorKind::InvalidInput,
160 format!("invalid glob pattern '{}': {e}", rule.pattern),
161 ))
162 })?;
163 Ok(CompiledFilter {
164 pattern,
165 action: rule.action.clone(),
166 })
167 })
168 .collect()
169}
170
171#[async_trait]
172impl SourceAdapter for FilesystemAdapter {
173 fn source_kind(&self) -> &str {
174 "filesystem"
175 }
176
177 async fn enumerate(&self) -> Result<Vec<SourceItem>, SourceError> {
178 let mut items = Vec::new();
179 let mut walker = WalkDir::new(&self.root);
180
181 while let Some(entry) = walker.next().await {
182 let entry = entry.map_err(|e| SourceError::Permanent {
183 source_name: self.source_name.clone(),
184 message: format!("directory walk error: {e}"),
185 })?;
186
187 let path = entry.path();
188
189 let metadata =
191 tokio::fs::metadata(&path)
192 .await
193 .map_err(|e| SourceError::Transient {
194 source_name: self.source_name.clone(),
195 message: format!("failed to read metadata for {}: {e}", path.display()),
196 })?;
197
198 if metadata.is_dir() {
199 continue;
200 }
201
202 if !self.matches_extension(&path) {
204 debug!(path = %path.display(), "skipped: extension filter");
205 continue;
206 }
207
208 let rel_path = self.relative_path(&path);
210 if !self.passes_filters(&rel_path) {
211 debug!(path = %path.display(), "skipped: glob filter");
212 continue;
213 }
214
215 let modified_at: Option<DateTime<Utc>> =
216 metadata.modified().ok().map(DateTime::<Utc>::from);
217
218 let display_name = path
219 .file_name()
220 .map(|n| n.to_string_lossy().into_owned())
221 .unwrap_or_else(|| rel_path.clone());
222
223 let mime_type = Self::mime_from_extension(&path);
224
225 items.push(SourceItem {
226 id: rel_path.clone(),
227 display_name,
228 mime_type,
229 path: rel_path,
230 modified_at,
231 source_hash: None,
232 });
233 }
234
235 items.sort_by(|a, b| a.id.cmp(&b.id));
237 Ok(items)
238 }
239
240 async fn fetch(&self, item: &SourceItem) -> Result<ExtractedDocument, SourceError> {
241 let abs_path = self.root.join(&item.path);
242
243 let content = tokio::fs::read(&abs_path).await.map_err(|e| {
244 if e.kind() == std::io::ErrorKind::NotFound {
245 SourceError::NotFound {
246 source_name: self.source_name.clone(),
247 item_id: item.id.clone(),
248 }
249 } else {
250 SourceError::Transient {
251 source_name: self.source_name.clone(),
252 message: format!("failed to read {}: {e}", abs_path.display()),
253 }
254 }
255 })?;
256
257 let content_hash = Blake3Hash::new(blake3::hash(&content).to_hex().as_str());
258
259 let metadata = tokio::fs::metadata(&abs_path).await.ok();
260 let source_modified = metadata
261 .and_then(|m| m.modified().ok())
262 .map(DateTime::<Utc>::from);
263
264 let mut prov_metadata = BTreeMap::new();
265 prov_metadata.insert(
266 "path".to_string(),
267 serde_json::Value::String(item.path.clone()),
268 );
269
270 let provenance = ItemProvenance {
271 source_kind: "filesystem".to_string(),
272 metadata: prov_metadata,
273 source_modified,
274 extracted_at: Utc::now(),
275 };
276
277 Ok(ExtractedDocument {
278 id: item.id.clone(),
279 display_name: item.display_name.clone(),
280 content,
281 mime_type: item.mime_type.clone(),
282 provenance,
283 content_hash,
284 })
285 }
286}
287
#[cfg(test)]
// Tests deliberately panic on setup or assertion failure; `unwrap` keeps them terse.
#[allow(clippy::unwrap_used)]
mod tests {
    use super::*;
    use ecl_pipeline_spec::source::FilterRule;
    use std::fs;
    use tempfile::TempDir;

    /// Builds a spec rooted at `root` with no filters and no extension allow-list.
    fn make_fs_spec(root: &Path) -> FilesystemSourceSpec {
        FilesystemSourceSpec {
            root: root.to_path_buf(),
            filters: vec![],
            extensions: vec![],
            stream: None,
        }
    }

    /// Builds an unfiltered adapter named `test-source` rooted at `root`.
    fn make_adapter(root: &Path) -> FilesystemAdapter {
        FilesystemAdapter::from_fs_spec("test-source", &make_fs_spec(root)).unwrap()
    }

    /// Creates five files in `dir`: three at the top level and two under `sub/`.
    fn create_test_files(dir: &Path) {
        fs::write(dir.join("readme.md"), "# Hello").unwrap();
        fs::write(dir.join("notes.txt"), "Some notes").unwrap();
        fs::write(dir.join("data.json"), r#"{"key": "value"}"#).unwrap();
        fs::create_dir_all(dir.join("sub")).unwrap();
        fs::write(dir.join("sub/nested.md"), "# Nested").unwrap();
        fs::write(dir.join("sub/image.png"), [0x89, 0x50, 0x4E, 0x47]).unwrap();
    }

    #[test]
    fn test_from_spec_filesystem_source() {
        let spec = SourceSpec::Filesystem(FilesystemSourceSpec {
            root: PathBuf::from("/tmp"),
            filters: vec![],
            extensions: vec![],
            stream: None,
        });
        let adapter = FilesystemAdapter::from_spec("local", &spec).unwrap();
        assert_eq!(adapter.source_kind(), "filesystem");
        assert_eq!(adapter.root, PathBuf::from("/tmp"));
    }

    #[test]
    fn test_from_spec_wrong_kind_returns_error() {
        let spec = SourceSpec::Slack(ecl_pipeline_spec::source::SlackSourceSpec {
            credentials: ecl_pipeline_spec::source::CredentialRef::ApplicationDefault,
            channels: vec![],
            thread_depth: 0,
            modified_after: None,
            stream: None,
        });
        let result = FilesystemAdapter::from_spec("local", &spec);
        assert!(matches!(
            result,
            Err(ResolveError::UnknownAdapter { .. })
        ));
    }

    #[test]
    fn test_from_fs_spec_with_extensions() {
        let spec = FilesystemSourceSpec {
            root: PathBuf::from("/tmp"),
            filters: vec![],
            extensions: vec!["md".to_string(), "txt".to_string()],
            stream: None,
        };
        let adapter = FilesystemAdapter::from_fs_spec("local", &spec).unwrap();
        assert_eq!(adapter.extensions, vec!["md", "txt"]);
    }

    #[test]
    fn test_from_fs_spec_with_filters() {
        let spec = FilesystemSourceSpec {
            root: PathBuf::from("/tmp"),
            filters: vec![
                FilterRule {
                    pattern: "**/*.md".to_string(),
                    action: FilterAction::Include,
                },
                FilterRule {
                    pattern: "**/Archive/**".to_string(),
                    action: FilterAction::Exclude,
                },
            ],
            extensions: vec![],
            stream: None,
        };
        let adapter = FilesystemAdapter::from_fs_spec("local", &spec).unwrap();
        assert_eq!(adapter.filters.len(), 2);
    }

    #[test]
    fn test_invalid_glob_pattern_returns_error() {
        let spec = FilesystemSourceSpec {
            root: PathBuf::from("/tmp"),
            filters: vec![FilterRule {
                pattern: "[invalid".to_string(),
                action: FilterAction::Include,
            }],
            extensions: vec![],
            stream: None,
        };
        let result = FilesystemAdapter::from_fs_spec("local", &spec);
        assert!(result.is_err());
    }

    #[test]
    fn test_matches_extension_empty_allows_all() {
        let adapter = FilesystemAdapter {
            root: PathBuf::from("/tmp"),
            extensions: vec![],
            filters: vec![],
            source_name: "test".to_string(),
        };
        assert!(adapter.matches_extension(Path::new("file.md")));
        assert!(adapter.matches_extension(Path::new("file.rs")));
        assert!(adapter.matches_extension(Path::new("file")));
    }

    #[test]
    fn test_matches_extension_filters_correctly() {
        let adapter = FilesystemAdapter {
            root: PathBuf::from("/tmp"),
            extensions: vec!["md".to_string(), "txt".to_string()],
            filters: vec![],
            source_name: "test".to_string(),
        };
        assert!(adapter.matches_extension(Path::new("readme.md")));
        assert!(adapter.matches_extension(Path::new("notes.txt")));
        assert!(!adapter.matches_extension(Path::new("data.json")));
        assert!(!adapter.matches_extension(Path::new("no_ext")));
    }

    #[test]
    fn test_matches_extension_case_insensitive() {
        let adapter = FilesystemAdapter {
            root: PathBuf::from("/tmp"),
            extensions: vec!["md".to_string()],
            filters: vec![],
            source_name: "test".to_string(),
        };
        assert!(adapter.matches_extension(Path::new("readme.MD")));
        assert!(adapter.matches_extension(Path::new("readme.Md")));
    }

    #[test]
    fn test_passes_filters_no_rules_includes_all() {
        let adapter = FilesystemAdapter {
            root: PathBuf::from("/tmp"),
            extensions: vec![],
            filters: vec![],
            source_name: "test".to_string(),
        };
        assert!(adapter.passes_filters("any/path.md"));
    }

    #[test]
    fn test_passes_filters_exclude_rule() {
        let spec = FilesystemSourceSpec {
            root: PathBuf::from("/tmp"),
            filters: vec![FilterRule {
                pattern: "**/Archive/**".to_string(),
                action: FilterAction::Exclude,
            }],
            extensions: vec![],
            stream: None,
        };
        let adapter = FilesystemAdapter::from_fs_spec("test", &spec).unwrap();
        assert!(!adapter.passes_filters("Archive/old.md"));
        assert!(adapter.passes_filters("docs/new.md"));
    }

    #[test]
    fn test_passes_filters_last_rule_wins() {
        let spec = FilesystemSourceSpec {
            root: PathBuf::from("/tmp"),
            filters: vec![
                FilterRule {
                    pattern: "**/*.md".to_string(),
                    action: FilterAction::Exclude,
                },
                FilterRule {
                    pattern: "**/important/*.md".to_string(),
                    action: FilterAction::Include,
                },
            ],
            extensions: vec![],
            stream: None,
        };
        let adapter = FilesystemAdapter::from_fs_spec("test", &spec).unwrap();
        // Only the first (exclude) rule matches this path.
        assert!(!adapter.passes_filters("docs/readme.md"));
        // Both rules match; the later include rule wins.
        assert!(adapter.passes_filters("important/readme.md"));
    }

    #[test]
    fn test_mime_from_extension() {
        assert_eq!(
            FilesystemAdapter::mime_from_extension(Path::new("x.md")),
            "text/markdown"
        );
        assert_eq!(
            FilesystemAdapter::mime_from_extension(Path::new("x.txt")),
            "text/plain"
        );
        assert_eq!(
            FilesystemAdapter::mime_from_extension(Path::new("x.json")),
            "application/json"
        );
        assert_eq!(
            FilesystemAdapter::mime_from_extension(Path::new("x.pdf")),
            "application/pdf"
        );
        assert_eq!(
            FilesystemAdapter::mime_from_extension(Path::new("x.unknown")),
            "application/octet-stream"
        );
        assert_eq!(
            FilesystemAdapter::mime_from_extension(Path::new("no_ext")),
            "application/octet-stream"
        );
    }

    #[tokio::test]
    async fn test_enumerate_empty_directory() {
        let tmp = TempDir::new().unwrap();
        let adapter = make_adapter(tmp.path());
        let items = adapter.enumerate().await.unwrap();
        assert!(items.is_empty());
    }

    #[tokio::test]
    async fn test_enumerate_finds_all_files() {
        let tmp = TempDir::new().unwrap();
        create_test_files(tmp.path());
        let adapter = make_adapter(tmp.path());
        let items = adapter.enumerate().await.unwrap();
        assert_eq!(items.len(), 5);
    }

    #[tokio::test]
    async fn test_enumerate_applies_extension_filter() {
        let tmp = TempDir::new().unwrap();
        create_test_files(tmp.path());
        let spec = FilesystemSourceSpec {
            root: tmp.path().to_path_buf(),
            filters: vec![],
            extensions: vec!["md".to_string()],
            stream: None,
        };
        let adapter = FilesystemAdapter::from_fs_spec("test", &spec).unwrap();
        let items = adapter.enumerate().await.unwrap();
        assert_eq!(items.len(), 2);
        assert!(items.iter().all(|i| i.path.ends_with(".md")));
    }

    #[tokio::test]
    async fn test_enumerate_applies_glob_filter() {
        let tmp = TempDir::new().unwrap();
        create_test_files(tmp.path());
        let spec = FilesystemSourceSpec {
            root: tmp.path().to_path_buf(),
            filters: vec![FilterRule {
                pattern: "sub/**".to_string(),
                action: FilterAction::Exclude,
            }],
            extensions: vec![],
            stream: None,
        };
        let adapter = FilesystemAdapter::from_fs_spec("test", &spec).unwrap();
        let items = adapter.enumerate().await.unwrap();
        // Only the three top-level files remain.
        assert_eq!(items.len(), 3);
        assert!(items.iter().all(|i| !i.path.starts_with("sub/")));
    }

    #[tokio::test]
    async fn test_enumerate_returns_sorted_results() {
        let tmp = TempDir::new().unwrap();
        create_test_files(tmp.path());
        let adapter = make_adapter(tmp.path());
        let items = adapter.enumerate().await.unwrap();
        let ids: Vec<&str> = items.iter().map(|i| i.id.as_str()).collect();
        let mut sorted = ids.clone();
        sorted.sort();
        assert_eq!(ids, sorted);
    }

    #[tokio::test]
    async fn test_enumerate_source_items_have_correct_fields() {
        let tmp = TempDir::new().unwrap();
        fs::write(tmp.path().join("test.md"), "# Test").unwrap();
        let adapter = make_adapter(tmp.path());
        let items = adapter.enumerate().await.unwrap();
        assert_eq!(items.len(), 1);
        let item = &items[0];
        assert_eq!(item.id, "test.md");
        assert_eq!(item.display_name, "test.md");
        assert_eq!(item.mime_type, "text/markdown");
        assert_eq!(item.path, "test.md");
        assert!(item.modified_at.is_some());
        assert!(item.source_hash.is_none());
    }

    #[tokio::test]
    async fn test_fetch_reads_content() {
        let tmp = TempDir::new().unwrap();
        let content = "# Hello World";
        fs::write(tmp.path().join("test.md"), content).unwrap();
        let adapter = make_adapter(tmp.path());
        let items = adapter.enumerate().await.unwrap();
        let doc = adapter.fetch(&items[0]).await.unwrap();
        assert_eq!(doc.content, content.as_bytes());
        assert_eq!(doc.id, "test.md");
        assert_eq!(doc.display_name, "test.md");
        assert_eq!(doc.mime_type, "text/markdown");
    }

    #[tokio::test]
    async fn test_fetch_computes_blake3_hash() {
        let tmp = TempDir::new().unwrap();
        let content = b"hash me";
        fs::write(tmp.path().join("test.txt"), content).unwrap();
        let adapter = make_adapter(tmp.path());
        let items = adapter.enumerate().await.unwrap();
        let doc = adapter.fetch(&items[0]).await.unwrap();
        let expected = blake3::hash(content).to_hex().to_string();
        assert_eq!(doc.content_hash.as_str(), expected);
    }

    #[tokio::test]
    async fn test_fetch_includes_provenance() {
        let tmp = TempDir::new().unwrap();
        fs::write(tmp.path().join("test.md"), "content").unwrap();
        let adapter = make_adapter(tmp.path());
        let items = adapter.enumerate().await.unwrap();
        let doc = adapter.fetch(&items[0]).await.unwrap();
        assert_eq!(doc.provenance.source_kind, "filesystem");
        assert!(doc.provenance.metadata.contains_key("path"));
        assert!(doc.provenance.source_modified.is_some());
    }

    #[tokio::test]
    async fn test_fetch_provenance_path_matches_item_path() {
        let tmp = TempDir::new().unwrap();
        fs::write(tmp.path().join("test.md"), "content").unwrap();
        let adapter = make_adapter(tmp.path());
        let items = adapter.enumerate().await.unwrap();
        let doc = adapter.fetch(&items[0]).await.unwrap();
        assert_eq!(
            doc.provenance.metadata.get("path"),
            Some(&serde_json::Value::String("test.md".to_string()))
        );
    }

    #[tokio::test]
    async fn test_fetch_not_found_returns_error() {
        let tmp = TempDir::new().unwrap();
        let adapter = make_adapter(tmp.path());
        let fake_item = SourceItem {
            id: "nonexistent.txt".to_string(),
            display_name: "nonexistent.txt".to_string(),
            mime_type: "text/plain".to_string(),
            path: "nonexistent.txt".to_string(),
            modified_at: None,
            source_hash: None,
        };
        let result = adapter.fetch(&fake_item).await;
        assert!(matches!(result, Err(SourceError::NotFound { .. })));
    }

    #[test]
    fn test_relative_path() {
        let adapter = FilesystemAdapter {
            root: PathBuf::from("/data/root"),
            extensions: vec![],
            filters: vec![],
            source_name: "test".to_string(),
        };
        assert_eq!(
            adapter.relative_path(Path::new("/data/root/sub/file.md")),
            "sub/file.md"
        );
        assert_eq!(
            adapter.relative_path(Path::new("/data/root/file.md")),
            "file.md"
        );
    }

    #[test]
    fn test_compile_filters_valid_patterns() {
        let rules = vec![
            FilterRule {
                pattern: "**/*.md".to_string(),
                action: FilterAction::Include,
            },
            FilterRule {
                pattern: "Archive/**".to_string(),
                action: FilterAction::Exclude,
            },
        ];
        let compiled = compile_filters(&rules).unwrap();
        assert_eq!(compiled.len(), 2);
    }

    #[test]
    fn test_compile_filters_invalid_pattern() {
        let rules = vec![FilterRule {
            pattern: "[bad".to_string(),
            action: FilterAction::Include,
        }];
        assert!(compile_filters(&rules).is_err());
    }
}