// nexus_memory_agent/inbox.rs
use std::path::Path;
4use tokio::fs;
5use tokio::time::Duration;
6use tracing::{debug, error, info, warn};
7use walkdir::WalkDir;
8
9use nexus_core::config::AgentConfig;
10use nexus_storage::repository::{MemoryRepository, ProcessedFileRepository};
11
12use crate::error::AgentError;
13use crate::ingest::IngestService;
14
15pub struct InboxScanner {
16 config: AgentConfig,
17 ingest_service: IngestService,
18}
19
/// Largest inbox file, in bytes, that will be read and ingested (10 MiB).
/// Files above this size are recorded as failed instead of being loaded.
const MAX_INBOX_FILE_SIZE: u64 = 10 * (1 << 20);
22
23impl InboxScanner {
24 pub fn new(config: AgentConfig, ingest_service: IngestService) -> Self {
25 Self {
26 config,
27 ingest_service,
28 }
29 }
30
31 pub async fn run(
32 &self,
33 namespace_id: i64,
34 processed_repo: &ProcessedFileRepository<'_>,
35 memory_repo: &MemoryRepository,
36 ) -> Result<ScanResult, AgentError> {
37 let inbox_path = Path::new(&self.config.inbox_dir);
38
39 if !inbox_path.exists() {
40 info!(path = %inbox_path.display(), "Inbox directory does not exist, creating");
41 fs::create_dir_all(inbox_path)
42 .await
43 .map_err(AgentError::Io)?;
44 return Ok(ScanResult::default());
45 }
46
47 debug!(path = %inbox_path.display(), "Scanning inbox");
48
49 let mut processed = 0;
50 let mut failed = 0;
51
52 let completed_paths = match processed_repo.get_completed_paths(namespace_id).await {
54 Ok(paths) => paths,
55 Err(e) => {
56 error!(error = %e, namespace_id, "Failed to fetch completed paths, falling back to per-file checks");
57 std::collections::HashSet::new()
58 }
59 };
60
61 for entry in WalkDir::new(inbox_path)
63 .follow_links(false)
64 .into_iter()
65 .filter_map(|e| e.ok())
66 .filter(|e| e.file_type().is_file())
67 {
68 let path = entry.path();
69 let path_str = path.to_string_lossy().to_string();
70
71 if completed_paths.contains(&path_str) {
73 continue;
74 }
75
76 let file_id = match processed_repo
78 .mark_processing(namespace_id, &path_str, None)
79 .await
80 {
81 Ok(id) => id,
82 Err(e) => {
83 error!(path = %path.display(), error = %e, namespace_id, "Failed to mark file as processing");
84 continue;
85 }
86 };
87
88 let metadata = match fs::metadata(path).await {
90 Ok(m) => m,
91 Err(e) => {
92 error!(path = %path.display(), error = %e, "Failed to read file metadata");
93 continue;
94 }
95 };
96 if metadata.len() > MAX_INBOX_FILE_SIZE {
97 warn!(
98 path = %path.display(),
99 size = metadata.len(),
100 max = MAX_INBOX_FILE_SIZE,
101 "Skipping file exceeding size limit"
102 );
103 if let Err(e) = processed_repo
104 .mark_failed(
105 file_id,
106 &format!(
107 "File too large ({} bytes, max {})",
108 metadata.len(),
109 MAX_INBOX_FILE_SIZE
110 ),
111 )
112 .await
113 {
114 warn!(file_id = file_id, error = %e, "Failed to mark file as failed");
115 }
116 failed += 1;
117 continue;
118 }
119
120 match fs::read_to_string(path).await {
122 Ok(content) => {
123 let source = format!(
124 "inbox/{}",
125 path.file_name()
126 .map(|n| n.to_string_lossy().to_string())
127 .unwrap_or_else(|| "unknown".to_string())
128 );
129
130 match self
131 .ingest_service
132 .ingest(&content, &source, namespace_id, memory_repo)
133 .await
134 {
135 Ok(memory_id) => {
136 if let Err(e) = processed_repo.mark_processed(file_id, memory_id).await
137 {
138 warn!(
139 file_id = file_id,
140 error = %e,
141 "Failed to mark file as processed"
142 );
143 }
144 processed += 1;
145 }
146 Err(e) => {
147 error!(path = %path.display(), error = %e, namespace_id, "Failed to ingest file");
148 if let Err(e2) =
149 processed_repo.mark_failed(file_id, &e.to_string()).await
150 {
151 warn!(
152 file_id = file_id,
153 error = %e2,
154 "Failed to mark file as failed"
155 );
156 }
157 failed += 1;
158 }
159 }
160 }
161 Err(e) => {
162 error!(path = %path.display(), error = %e, namespace_id, "Failed to read file");
163 if let Err(e2) = processed_repo.mark_failed(file_id, &e.to_string()).await {
164 warn!(
165 file_id = file_id,
166 error = %e2,
167 "Failed to mark file as failed"
168 );
169 }
170 failed += 1;
171 }
172 }
173 }
174
175 info!(processed, failed, "Inbox scan complete");
176 Ok(ScanResult { processed, failed })
177 }
178
179 pub fn scan_interval(&self) -> Duration {
180 Duration::from_secs(self.config.scan_interval_secs)
181 }
182}
183
/// Counters produced by a single inbox scan.
///
/// `Default` yields zero for both counters.
#[derive(Debug, Default, Clone, Copy, PartialEq, Eq)]
pub struct ScanResult {
    /// Number of files that were ingested successfully.
    pub processed: u64,
    /// Number of files recorded as failed during the scan.
    pub failed: u64,
}
188}