Skip to main content

nexus_memory_agent/
inbox.rs

//! Inbox scanner: walks the inbox directory and ingests files that have not yet been processed.
2
3use std::path::Path;
4use tokio::fs;
5use tokio::time::Duration;
6use tracing::{debug, error, info, warn};
7use walkdir::WalkDir;
8
9use nexus_core::config::AgentConfig;
10use nexus_storage::repository::{MemoryRepository, ProcessedFileRepository};
11
12use crate::error::AgentError;
13use crate::ingest::IngestService;
14
15pub struct InboxScanner {
16    config: AgentConfig,
17    ingest_service: IngestService,
18}
19
/// Largest inbox file, in bytes, that the scanner will read: 10 MiB
/// (10 × 2^20 bytes). Files above this size are marked failed without being read.
const MAX_INBOX_FILE_SIZE: u64 = 10 << 20;
22
23impl InboxScanner {
24    pub fn new(config: AgentConfig, ingest_service: IngestService) -> Self {
25        Self {
26            config,
27            ingest_service,
28        }
29    }
30
31    pub async fn run(
32        &self,
33        namespace_id: i64,
34        processed_repo: &ProcessedFileRepository<'_>,
35        memory_repo: &MemoryRepository,
36    ) -> Result<ScanResult, AgentError> {
37        let inbox_path = Path::new(&self.config.inbox_dir);
38
39        if !inbox_path.exists() {
40            info!(path = %inbox_path.display(), "Inbox directory does not exist, creating");
41            fs::create_dir_all(inbox_path)
42                .await
43                .map_err(AgentError::Io)?;
44            return Ok(ScanResult::default());
45        }
46
47        debug!(path = %inbox_path.display(), "Scanning inbox");
48
49        let mut processed = 0;
50        let mut failed = 0;
51
52        // Batch-fetch all completed file paths to avoid N+1 queries
53        let completed_paths = match processed_repo.get_completed_paths(namespace_id).await {
54            Ok(paths) => paths,
55            Err(e) => {
56                error!(error = %e, namespace_id, "Failed to fetch completed paths, falling back to per-file checks");
57                std::collections::HashSet::new()
58            }
59        };
60
61        // Walk the inbox directory
62        for entry in WalkDir::new(inbox_path)
63            .follow_links(false)
64            .into_iter()
65            .filter_map(|e| e.ok())
66            .filter(|e| e.file_type().is_file())
67        {
68            let path = entry.path();
69            let path_str = path.to_string_lossy().to_string();
70
71            // Skip already-processed files (in-memory check)
72            if completed_paths.contains(&path_str) {
73                continue;
74            }
75
76            // Mark as processing
77            let file_id = match processed_repo
78                .mark_processing(namespace_id, &path_str, None)
79                .await
80            {
81                Ok(id) => id,
82                Err(e) => {
83                    error!(path = %path.display(), error = %e, namespace_id, "Failed to mark file as processing");
84                    continue;
85                }
86            };
87
88            // Check file size before reading
89            let metadata = match fs::metadata(path).await {
90                Ok(m) => m,
91                Err(e) => {
92                    error!(path = %path.display(), error = %e, "Failed to read file metadata");
93                    continue;
94                }
95            };
96            if metadata.len() > MAX_INBOX_FILE_SIZE {
97                warn!(
98                    path = %path.display(),
99                    size = metadata.len(),
100                    max = MAX_INBOX_FILE_SIZE,
101                    "Skipping file exceeding size limit"
102                );
103                if let Err(e) = processed_repo
104                    .mark_failed(
105                        file_id,
106                        &format!(
107                            "File too large ({} bytes, max {})",
108                            metadata.len(),
109                            MAX_INBOX_FILE_SIZE
110                        ),
111                    )
112                    .await
113                {
114                    warn!(file_id = file_id, error = %e, "Failed to mark file as failed");
115                }
116                failed += 1;
117                continue;
118            }
119
120            // Read and process file
121            match fs::read_to_string(path).await {
122                Ok(content) => {
123                    let source = format!(
124                        "inbox/{}",
125                        path.file_name()
126                            .map(|n| n.to_string_lossy().to_string())
127                            .unwrap_or_else(|| "unknown".to_string())
128                    );
129
130                    match self
131                        .ingest_service
132                        .ingest(&content, &source, namespace_id, memory_repo)
133                        .await
134                    {
135                        Ok(memory_id) => {
136                            if let Err(e) = processed_repo.mark_processed(file_id, memory_id).await
137                            {
138                                warn!(
139                                    file_id = file_id,
140                                    error = %e,
141                                    "Failed to mark file as processed"
142                                );
143                            }
144                            processed += 1;
145                        }
146                        Err(e) => {
147                            error!(path = %path.display(), error = %e, namespace_id, "Failed to ingest file");
148                            if let Err(e2) =
149                                processed_repo.mark_failed(file_id, &e.to_string()).await
150                            {
151                                warn!(
152                                    file_id = file_id,
153                                    error = %e2,
154                                    "Failed to mark file as failed"
155                                );
156                            }
157                            failed += 1;
158                        }
159                    }
160                }
161                Err(e) => {
162                    error!(path = %path.display(), error = %e, namespace_id, "Failed to read file");
163                    if let Err(e2) = processed_repo.mark_failed(file_id, &e.to_string()).await {
164                        warn!(
165                            file_id = file_id,
166                            error = %e2,
167                            "Failed to mark file as failed"
168                        );
169                    }
170                    failed += 1;
171                }
172            }
173        }
174
175        info!(processed, failed, "Inbox scan complete");
176        Ok(ScanResult { processed, failed })
177    }
178
179    pub fn scan_interval(&self) -> Duration {
180        Duration::from_secs(self.config.scan_interval_secs)
181    }
182}
183
/// Outcome counts for a single inbox scan pass.
///
/// This is a plain pair of counters, so it is `Copy` and comparable. Callers
/// and tests can inspect or assert on it directly.
#[derive(Debug, Default, Clone, Copy, PartialEq, Eq)]
pub struct ScanResult {
    /// Number of files ingested successfully during the pass.
    pub processed: u64,
    /// Number of files whose processing failed during the pass.
    pub failed: u64,
}