Skip to main content

stygian_graph/adapters/
document.rs

1//! File system document source adapter.
2//!
3//! Implements [`DocumentSourcePort`] and [`ScrapingService`] for reading files
4//! from the local file system.  Supports glob-based file discovery, recursive
5//! directory traversal, and MIME-type detection.
6//!
7//! # Example
8//!
9//! ```no_run
10//! use stygian_graph::adapters::document::DocumentSource;
11//! use stygian_graph::ports::document_source::{DocumentSourcePort, DocumentQuery};
12//! use std::path::PathBuf;
13//!
14//! # async fn example() {
15//! let source = DocumentSource::new();
16//! let query = DocumentQuery {
17//!     path: PathBuf::from("data/"),
18//!     recursive: true,
19//!     glob_pattern: Some("*.json".into()),
20//! };
21//! let docs = source.read_documents(query).await.unwrap();
22//! # }
23//! ```
24
25use async_trait::async_trait;
26use serde_json::json;
27use std::path::{Path, PathBuf};
28use tokio::fs;
29
30use crate::domain::error::{Result, ServiceError, StygianError};
31use crate::ports::document_source::{Document, DocumentQuery, DocumentSourcePort};
32use crate::ports::{ScrapingService, ServiceInput, ServiceOutput};
33
34// ─────────────────────────────────────────────────────────────────────────────
35// DocumentSource
36// ─────────────────────────────────────────────────────────────────────────────
37
/// Adapter: local file system as a pipeline data source.
///
/// Reads files from disk and returns their content as [`Document`] structs.
/// Also implements [`ScrapingService`] for DAG pipeline integration.
///
/// The type carries no state; the private unit field only prevents
/// construction by struct literal outside this module, so callers go through
/// [`DocumentSource::new`] or [`Default`].
///
/// # Example
///
/// ```
/// use stygian_graph::adapters::document::DocumentSource;
///
/// let source = DocumentSource::new();
/// assert_eq!(source.source_name(), "filesystem");
/// ```
#[derive(Debug, Clone)]
pub struct DocumentSource {
    _priv: (),
}
54
55impl DocumentSource {
56    /// Create a new file system document source.
57    ///
58    /// # Example
59    ///
60    /// ```
61    /// use stygian_graph::adapters::document::DocumentSource;
62    ///
63    /// let source = DocumentSource::new();
64    /// ```
65    #[must_use]
66    pub fn new() -> Self {
67        Self { _priv: () }
68    }
69
70    /// Return the name of this source.
71    #[must_use]
72    pub fn source_name(&self) -> &str {
73        "filesystem"
74    }
75
76    /// Collect file paths matching the query.
77    async fn collect_paths(query: &DocumentQuery) -> Result<Vec<PathBuf>> {
78        let meta = fs::metadata(&query.path).await.map_err(|e| {
79            StygianError::Service(ServiceError::Unavailable(format!(
80                "cannot access {}: {e}",
81                query.path.display()
82            )))
83        })?;
84
85        if meta.is_file() {
86            return Ok(vec![query.path.clone()]);
87        }
88
89        let mut paths = Vec::new();
90        Self::walk_dir(
91            &query.path,
92            query.recursive,
93            &query.glob_pattern,
94            &mut paths,
95        )
96        .await?;
97        Ok(paths)
98    }
99
100    /// Recursively walk a directory collecting matching files.
101    async fn walk_dir(
102        dir: &Path,
103        recursive: bool,
104        glob: &Option<String>,
105        out: &mut Vec<PathBuf>,
106    ) -> Result<()> {
107        let mut entries = fs::read_dir(dir).await.map_err(|e| {
108            StygianError::Service(ServiceError::Unavailable(format!(
109                "cannot read directory {}: {e}",
110                dir.display()
111            )))
112        })?;
113
114        while let Some(entry) = entries.next_entry().await.map_err(|e| {
115            StygianError::Service(ServiceError::Unavailable(format!("readdir error: {e}")))
116        })? {
117            let path = entry.path();
118            let ft = entry.file_type().await.map_err(|e| {
119                StygianError::Service(ServiceError::Unavailable(format!("file_type error: {e}")))
120            })?;
121
122            if ft.is_symlink() {
123                // Follow symlinks: re-check metadata to determine the target type.
124                match fs::metadata(&path).await {
125                    Ok(meta) if meta.is_dir() && recursive => {
126                        Box::pin(Self::walk_dir(&path, recursive, glob, out)).await?;
127                    }
128                    Ok(meta) if meta.is_file() => {
129                        if let Some(pattern) = glob {
130                            if let Some(name) = path.file_name().and_then(|n| n.to_str())
131                                && Self::glob_matches(pattern, name)
132                            {
133                                out.push(path);
134                            }
135                        } else {
136                            out.push(path);
137                        }
138                    }
139                    // Broken symlink or permission error — skip silently.
140                    _ => {}
141                }
142            } else if ft.is_dir() && recursive {
143                Box::pin(Self::walk_dir(&path, recursive, glob, out)).await?;
144            } else if ft.is_file() {
145                if let Some(pattern) = glob {
146                    if let Some(name) = path.file_name().and_then(|n| n.to_str())
147                        && Self::glob_matches(pattern, name)
148                    {
149                        out.push(path);
150                    }
151                } else {
152                    out.push(path);
153                }
154            }
155        }
156
157        Ok(())
158    }
159
160    /// Simple glob matching supporting `*` wildcard.
161    ///
162    /// Matching is case-insensitive to behave consistently across
163    /// Windows (NTFS), macOS (APFS/HFS+), and Linux (ext4).
164    fn glob_matches(pattern: &str, name: &str) -> bool {
165        if pattern == "*" {
166            return true;
167        }
168        let pattern_lower = pattern.to_ascii_lowercase();
169        let name_lower = name.to_ascii_lowercase();
170        if let Some(ext) = pattern_lower.strip_prefix("*.") {
171            return name_lower.ends_with(&format!(".{ext}"));
172        }
173        if let Some(prefix) = pattern_lower.strip_suffix('*') {
174            return name_lower.starts_with(prefix);
175        }
176        pattern_lower == name_lower
177    }
178
179    /// Infer MIME type from file extension.
180    ///
181    /// Extension matching is case-insensitive so `.JSON`, `.Json`, etc.
182    /// resolve correctly on all platforms.
183    fn infer_mime(path: &Path) -> Option<String> {
184        let ext_raw = path.extension()?.to_str()?;
185        let ext = ext_raw.to_ascii_lowercase();
186        let mime = match ext.as_str() {
187            "json" => "application/json",
188            "csv" => "text/csv",
189            "xml" => "application/xml",
190            "html" | "htm" => "text/html",
191            "md" | "markdown" => "text/markdown",
192            "txt" => "text/plain",
193            "yaml" | "yml" => "application/yaml",
194            "toml" => "application/toml",
195            "pdf" => "application/pdf",
196            "png" => "image/png",
197            "jpg" | "jpeg" => "image/jpeg",
198            "gif" => "image/gif",
199            "svg" => "image/svg+xml",
200            _ => return None,
201        };
202        Some(mime.to_string())
203    }
204}
205
206impl Default for DocumentSource {
207    fn default() -> Self {
208        Self::new()
209    }
210}
211
212// ─────────────────────────────────────────────────────────────────────────────
213// DocumentSourcePort
214// ─────────────────────────────────────────────────────────────────────────────
215
216#[async_trait]
217impl DocumentSourcePort for DocumentSource {
218    async fn read_documents(&self, query: DocumentQuery) -> Result<Vec<Document>> {
219        let paths = Self::collect_paths(&query).await?;
220        let mut docs = Vec::with_capacity(paths.len());
221
222        for path in paths {
223            let content = fs::read_to_string(&path).await.map_err(|e| {
224                StygianError::Service(ServiceError::InvalidResponse(format!(
225                    "cannot read {}: {e}",
226                    path.display()
227                )))
228            })?;
229            let size_bytes = content.len() as u64;
230            let mime_type = Self::infer_mime(&path);
231
232            docs.push(Document {
233                path,
234                content,
235                mime_type,
236                size_bytes,
237            });
238        }
239
240        Ok(docs)
241    }
242
243    fn source_name(&self) -> &str {
244        "filesystem"
245    }
246}
247
248// ─────────────────────────────────────────────────────────────────────────────
249// ScrapingService (DAG integration)
250// ─────────────────────────────────────────────────────────────────────────────
251
252#[async_trait]
253impl ScrapingService for DocumentSource {
254    /// Read files from the local file system.
255    ///
256    /// Expected params:
257    /// ```json
258    /// { "path": "data/", "recursive": true, "glob_pattern": "*.csv" }
259    /// ```
260    ///
261    /// # Example
262    ///
263    /// ```no_run
264    /// # use stygian_graph::ports::{ScrapingService, ServiceInput};
265    /// # use stygian_graph::adapters::document::DocumentSource;
266    /// # use serde_json::json;
267    /// # async fn example() {
268    /// let source = DocumentSource::new();
269    /// let input = ServiceInput {
270    ///     url: String::new(),
271    ///     params: json!({"path": "data/report.json"}),
272    /// };
273    /// let result = source.execute(input).await.unwrap();
274    /// # }
275    /// ```
276    async fn execute(&self, input: ServiceInput) -> Result<ServiceOutput> {
277        let path_str = input.params["path"].as_str().ok_or_else(|| {
278            StygianError::Service(ServiceError::InvalidResponse(
279                "missing 'path' in params".into(),
280            ))
281        })?;
282
283        let query = DocumentQuery {
284            path: PathBuf::from(path_str),
285            recursive: input.params["recursive"].as_bool().unwrap_or(false),
286            glob_pattern: input.params["glob_pattern"].as_str().map(String::from),
287        };
288
289        let docs = self.read_documents(query).await?;
290        let doc_count = docs.len();
291
292        Ok(ServiceOutput {
293            data: serde_json::to_string(&docs).unwrap_or_default(),
294            metadata: json!({
295                "source": "filesystem",
296                "document_count": doc_count,
297            }),
298        })
299    }
300
301    fn name(&self) -> &'static str {
302        "document"
303    }
304}
305
306// ─────────────────────────────────────────────────────────────────────────────
307// Tests
308// ─────────────────────────────────────────────────────────────────────────────
309
#[cfg(test)]
mod tests {
    use super::*;

    /// Shorthand for the glob matcher under test.
    fn matches(pattern: &str, name: &str) -> bool {
        DocumentSource::glob_matches(pattern, name)
    }

    /// Shorthand for MIME inference on a bare file name.
    fn mime_of(file: &str) -> Option<String> {
        DocumentSource::infer_mime(Path::new(file))
    }

    // A pattern without a wildcard must equal the name.
    #[test]
    fn glob_matches_exact() {
        assert!(matches("report.csv", "report.csv"));
        assert!(!matches("report.csv", "other.csv"));
    }

    // `*.ext` selects by extension.
    #[test]
    fn glob_matches_extension() {
        assert!(matches("*.csv", "data.csv"));
        assert!(!matches("*.csv", "data.json"));
    }

    // Case differences in either the pattern or the name are ignored.
    #[test]
    fn glob_matches_case_insensitive() {
        assert!(matches("*.JSON", "data.json"));
        assert!(matches("*.csv", "DATA.CSV"));
        assert!(matches("Report*", "report_2024.csv"));
    }

    // `prefix*` selects by leading text.
    #[test]
    fn glob_matches_prefix() {
        assert!(matches("report*", "report_2024.csv"));
        assert!(!matches("report*", "data.csv"));
    }

    // A lone `*` accepts anything.
    #[test]
    fn glob_matches_star() {
        assert!(matches("*", "anything.txt"));
    }

    #[test]
    fn infer_mime_known_types() {
        assert_eq!(mime_of("data.json").as_deref(), Some("application/json"));
        assert_eq!(mime_of("data.csv").as_deref(), Some("text/csv"));
        assert_eq!(mime_of("doc.pdf").as_deref(), Some("application/pdf"));
    }

    #[test]
    fn infer_mime_case_insensitive() {
        assert_eq!(mime_of("DATA.JSON").as_deref(), Some("application/json"));
        assert_eq!(mime_of("photo.JPG").as_deref(), Some("image/jpeg"));
    }

    #[test]
    fn infer_mime_unknown() {
        assert!(mime_of("data.xyz").is_none());
    }

    // A path that does not exist must surface as an error, not an empty list.
    #[tokio::test]
    async fn read_nonexistent_path_returns_error() {
        let query = DocumentQuery {
            path: PathBuf::from("/nonexistent/path/that/does/not/exist"),
            recursive: false,
            glob_pattern: None,
        };
        let outcome = DocumentSource::new().read_documents(query).await;
        assert!(outcome.is_err());
    }
}