1use std::fs;
2use std::path::{Path, PathBuf};
3
4use anyhow::{Context, Result};
5use rusqlite::{params, Transaction};
6
7use crate::unit::{Unit, UnitType};
8
9use super::freshness::{
10 invalid_source_file_metadata, source_file_metadata_with_kind, SourceFileKind,
11 SourceFileMetadata, SourceFileStatus,
12};
13use super::Index;
14
/// Summary returned by [`Index::rebuild_from_canonical_files`].
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct RebuildReport {
    /// Number of discovered files that parsed as a unit and were inserted.
    pub valid_units: usize,
    /// Number of discovered files that failed to parse and were recorded as
    /// invalid source files with a diagnostic.
    pub invalid_files: usize,
}
20
21impl Index {
22 pub fn rebuild_from_canonical_files(&mut self, mana_dir: &Path) -> Result<RebuildReport> {
23 let files = discover_unit_files(mana_dir)?;
24 let tx = self.connection_mut().transaction()?;
25
26 clear_indexed_rows(&tx)?;
27
28 let mut report = RebuildReport {
29 valid_units: 0,
30 invalid_files: 0,
31 };
32
33 for source in files {
34 match Unit::from_file(&source.path) {
35 Ok(mut unit) => {
36 unit.is_archived = source.is_archived;
37 let metadata = source_file_metadata_with_kind(
38 &source.path,
39 Some(unit.id.clone()),
40 if source.is_archived {
41 SourceFileKind::Archive
42 } else {
43 SourceFileKind::Unit
44 },
45 SourceFileStatus::Valid,
46 )?;
47 record_source_file_tx(&tx, &metadata)?;
48 insert_unit_tx(&tx, &unit, &metadata)?;
49 report.valid_units += 1;
50 }
51 Err(error) => {
52 let metadata = invalid_source_file_metadata(
53 &source.path,
54 if source.is_archived {
55 SourceFileKind::Archive
56 } else {
57 SourceFileKind::Unit
58 },
59 SourceFileStatus::InvalidParse,
60 error.to_string(),
61 )?;
62 record_source_file_tx(&tx, &metadata)?;
63 insert_diagnostic_tx(
64 &tx,
65 "error",
66 "parse",
67 Some(&metadata.path),
68 None,
69 Some("frontmatter"),
70 metadata
71 .error_message
72 .as_deref()
73 .unwrap_or("failed to parse unit file"),
74 )?;
75 report.invalid_files += 1;
76 }
77 }
78 }
79
80 set_meta_tx(&tx, "last_full_rebuild_at", &super::timestamp_now())?;
81 set_meta_tx(&tx, "stale", "false")?;
82 set_meta_tx(&tx, "stale_reason", "")?;
83 tx.commit()?;
84
85 Ok(report)
86 }
87}
88
/// A candidate unit file found on disk during discovery.
#[derive(Debug, Clone)]
struct UnitSourceFile {
    /// Location of the file as produced by the directory walk.
    path: PathBuf,
    /// `true` when the file was found under the `archive` directory of the
    /// mana directory.
    is_archived: bool,
}
94
95fn discover_unit_files(mana_dir: &Path) -> Result<Vec<UnitSourceFile>> {
96 let mut files = Vec::new();
97 collect_unit_files_in_dir(mana_dir, false, &mut files)?;
98
99 let archive_dir = mana_dir.join("archive");
100 if archive_dir.is_dir() {
101 collect_unit_files_recursive(&archive_dir, true, &mut files)?;
102 }
103
104 files.sort_by(|a, b| a.path.cmp(&b.path));
105 Ok(files)
106}
107
108fn collect_unit_files_in_dir(
109 dir: &Path,
110 is_archived: bool,
111 files: &mut Vec<UnitSourceFile>,
112) -> Result<()> {
113 for entry in fs::read_dir(dir)
114 .with_context(|| format!("failed to read mana directory: {}", dir.display()))?
115 {
116 let entry = entry?;
117 let path = entry.path();
118 if path.is_file() && is_unit_file(&path) {
119 files.push(UnitSourceFile { path, is_archived });
120 } else if path.is_dir()
121 && path.file_name().and_then(|name| name.to_str()) != Some("archive")
122 {
123 collect_unit_files_recursive(&path, is_archived, files)?;
124 }
125 }
126 Ok(())
127}
128
129fn collect_unit_files_recursive(
130 dir: &Path,
131 is_archived: bool,
132 files: &mut Vec<UnitSourceFile>,
133) -> Result<()> {
134 for entry in fs::read_dir(dir)
135 .with_context(|| format!("failed to read archive directory: {}", dir.display()))?
136 {
137 let entry = entry?;
138 let path = entry.path();
139 if path.is_dir() {
140 collect_unit_files_recursive(&path, is_archived, files)?;
141 } else if path.is_file() && is_unit_file(&path) {
142 files.push(UnitSourceFile { path, is_archived });
143 }
144 }
145 Ok(())
146}
147
/// Decides from the file name alone whether `path` looks like a unit file.
///
/// Returns `false` when the final path component is missing or not valid
/// UTF-8, or when it is one of the reserved names `config.yaml`,
/// `index.yaml`, `unit.yaml`, or `archive.yaml`. Otherwise any `.yaml` file
/// qualifies, and a `.md` file qualifies only when its name contains a `-`.
fn is_unit_file(path: &Path) -> bool {
    const RESERVED_NAMES: [&str; 4] = ["config.yaml", "index.yaml", "unit.yaml", "archive.yaml"];

    let filename = match path.file_name().and_then(|name| name.to_str()) {
        Some(name) => name,
        None => return false,
    };
    if RESERVED_NAMES.contains(&filename) {
        return false;
    }

    let extension = path.extension().and_then(|ext| ext.to_str());
    if extension == Some("yaml") {
        true
    } else {
        extension == Some("md") && filename.contains('-')
    }
}
164
165fn clear_indexed_rows(tx: &Transaction<'_>) -> Result<()> {
166 for table in [
167 "index_diagnostics",
168 "context_edges",
169 "facts",
170 "unit_history",
171 "unit_attempts",
172 "unit_decisions",
173 "unit_artifacts",
174 "unit_dependencies",
175 "unit_paths",
176 "unit_labels",
177 "units",
178 "source_files",
179 ] {
180 tx.execute(&format!("DELETE FROM {table}"), [])?;
181 }
182 Ok(())
183}
184
185fn record_source_file_tx(tx: &Transaction<'_>, metadata: &SourceFileMetadata) -> Result<()> {
186 tx.execute(
187 r#"
188 INSERT INTO source_files (
189 path, unit_id, kind, hash, mtime, size, indexed_at, status,
190 error_kind, error_message, error_field
191 ) VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10, ?11)
192 ON CONFLICT(path) DO UPDATE SET
193 unit_id = excluded.unit_id,
194 kind = excluded.kind,
195 hash = excluded.hash,
196 mtime = excluded.mtime,
197 size = excluded.size,
198 indexed_at = excluded.indexed_at,
199 status = excluded.status,
200 error_kind = excluded.error_kind,
201 error_message = excluded.error_message,
202 error_field = excluded.error_field
203 "#,
204 params![
205 metadata.path,
206 metadata.unit_id,
207 metadata.kind.as_str(),
208 metadata.hash,
209 metadata.mtime,
210 metadata.size,
211 super::timestamp_now(),
212 metadata.status.as_str(),
213 metadata.error_kind,
214 metadata.error_message,
215 metadata.error_field,
216 ],
217 )?;
218 Ok(())
219}
220
/// Inserts `unit` into the `units` table together with its dependent rows.
///
/// After the main row, the unit's labels, paths, dependencies, produced and
/// required artifacts, decisions, attempt log, and run history are written
/// through the sibling `insert_*_tx` helpers. A `facts` row is added when the
/// unit is a fact. `metadata` supplies the source path and hash stored with
/// the unit.
///
/// The positional parameters below must stay in the same order as the column
/// list of the `INSERT` statement.
///
/// # Errors
///
/// Fails when JSON serialization of a field or any database statement fails.
fn insert_unit_tx(tx: &Transaction<'_>, unit: &Unit, metadata: &SourceFileMetadata) -> Result<()> {
    // A missing hash is stored as the type's default value instead of NULL.
    let source_hash = metadata.hash.clone().unwrap_or_default();
    let indexed_at = super::timestamp_now();
    tx.execute(
        r#"
        INSERT INTO units (
            id, title, handle, slug, status, priority, kind, unit_type, feature,
            created_at, updated_at, closed_at, close_reason, description, acceptance,
            notes, design, parent, assignee, claimed_by, claimed_at, is_archived,
            verify, verify_fast, fail_first, checkpoint, verify_hash, attempts,
            max_attempts, max_loops, verify_timeout, last_verified, stale_after,
            created_by, model, autonomy_disposition, outputs_json, on_fail_json,
            on_close_json, source_path, source_hash, indexed_at
        ) VALUES (
            ?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9,
            ?10, ?11, ?12, ?13, ?14, ?15,
            ?16, ?17, ?18, ?19, ?20, ?21, ?22,
            ?23, ?24, ?25, ?26, ?27, ?28,
            ?29, ?30, ?31, ?32, ?33,
            ?34, ?35, ?36, ?37, ?38,
            ?39, ?40, ?41, ?42
        )
        "#,
        params![
            unit.id,
            unit.title,
            unit.handle,
            unit.slug,
            unit.status.to_string(),
            i64::from(unit.priority),
            unit_type_as_str(unit.kind),
            unit.unit_type,
            unit.feature,
            // Timestamps are stored as RFC 3339 text.
            unit.created_at.to_rfc3339(),
            unit.updated_at.to_rfc3339(),
            unit.closed_at.map(|value| value.to_rfc3339()),
            unit.close_reason,
            unit.description,
            unit.acceptance,
            unit.notes,
            unit.design,
            unit.parent,
            unit.assignee,
            unit.claimed_by,
            unit.claimed_at.map(|value| value.to_rfc3339()),
            unit.is_archived,
            unit.verify,
            unit.verify_fast,
            unit.fail_first,
            unit.checkpoint,
            unit.verify_hash,
            i64::from(unit.attempts),
            i64::from(unit.max_attempts),
            unit.max_loops.map(i64::from),
            // A timeout that does not fit in an i64 is bound as `None`
            // rather than failing the insert.
            unit.verify_timeout
                .and_then(|value| i64::try_from(value).ok()),
            unit.last_verified.map(|value| value.to_rfc3339()),
            unit.stale_after.map(|value| value.to_rfc3339()),
            unit.created_by,
            unit.model,
            // Structured fields are stored as serialized JSON text.
            json_string(&unit.autonomy_disposition)?,
            json_string(&unit.outputs)?,
            json_string(&unit.on_fail)?,
            json_string(&unit.on_close)?,
            metadata.path,
            source_hash,
            indexed_at,
        ],
    )?;

    // Ordered child collections; each helper records the element position.
    insert_strings_tx(tx, "unit_labels", "label", &unit.id, &unit.labels)?;
    insert_strings_tx(tx, "unit_paths", "path", &unit.id, &unit.paths)?;
    insert_strings_tx(
        tx,
        "unit_dependencies",
        "dep_id",
        &unit.id,
        &unit.dependencies,
    )?;
    insert_artifacts_tx(tx, &unit.id, "produces", &unit.produces)?;
    insert_artifacts_tx(tx, &unit.id, "requires", &unit.requires)?;
    insert_decisions_tx(tx, &unit.id, &unit.decisions)?;
    insert_attempts_tx(tx, &unit.id, &unit.attempt_log)?;
    insert_history_tx(tx, &unit.id, &unit.history)?;

    // A unit counts as a fact when either the typed `kind` or the textual
    // `unit_type` says so.
    if matches!(unit.kind, UnitType::Fact) || unit.unit_type == "fact" {
        tx.execute(
            "INSERT INTO facts (unit_id, last_verified, stale_after, score_hint) VALUES (?1, ?2, ?3, NULL)",
            params![
                unit.id,
                unit.last_verified.map(|value| value.to_rfc3339()),
                unit.stale_after.map(|value| value.to_rfc3339()),
            ],
        )?;
    }

    Ok(())
}
319
320fn insert_strings_tx(
321 tx: &Transaction<'_>,
322 table: &str,
323 column: &str,
324 unit_id: &str,
325 values: &[String],
326) -> Result<()> {
327 let sql = format!("INSERT INTO {table} (unit_id, {column}, position) VALUES (?1, ?2, ?3)");
328 for (position, value) in values.iter().enumerate() {
329 tx.execute(&sql, params![unit_id, value, position as i64])?;
330 }
331 Ok(())
332}
333
334fn insert_artifacts_tx(
335 tx: &Transaction<'_>,
336 unit_id: &str,
337 direction: &str,
338 values: &[String],
339) -> Result<()> {
340 for (position, value) in values.iter().enumerate() {
341 tx.execute(
342 "INSERT INTO unit_artifacts (unit_id, direction, artifact, position) VALUES (?1, ?2, ?3, ?4)",
343 params![unit_id, direction, value, position as i64],
344 )?;
345 }
346 Ok(())
347}
348
349fn insert_decisions_tx(tx: &Transaction<'_>, unit_id: &str, decisions: &[String]) -> Result<()> {
350 for (index, decision) in decisions.iter().enumerate() {
351 tx.execute(
352 "INSERT INTO unit_decisions (unit_id, decision_index, text, resolved) VALUES (?1, ?2, ?3, 0)",
353 params![unit_id, index as i64, decision],
354 )?;
355 }
356 Ok(())
357}
358
359fn insert_attempts_tx(
360 tx: &Transaction<'_>,
361 unit_id: &str,
362 attempts: &[crate::unit::types::AttemptRecord],
363) -> Result<()> {
364 for (index, attempt) in attempts.iter().enumerate() {
365 tx.execute(
366 "INSERT INTO unit_attempts (unit_id, attempt_index, num, outcome, notes, raw_json) VALUES (?1, ?2, ?3, ?4, ?5, ?6)",
367 params![
368 unit_id,
369 index as i64,
370 i64::from(attempt.num),
371 json_string(&attempt.outcome)?,
372 attempt.notes,
373 json_string(attempt)?,
374 ],
375 )?;
376 }
377 Ok(())
378}
379
380fn insert_history_tx(
381 tx: &Transaction<'_>,
382 unit_id: &str,
383 history: &[crate::unit::types::RunRecord],
384) -> Result<()> {
385 for (index, record) in history.iter().enumerate() {
386 tx.execute(
387 "INSERT INTO unit_history (unit_id, history_index, started_at, finished_at, status, exit_code, raw_json) VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7)",
388 params![
389 unit_id,
390 index as i64,
391 record.started_at.to_rfc3339(),
392 record.finished_at.map(|value| value.to_rfc3339()),
393 json_string(&record.result)?,
394 record.exit_code,
395 json_string(record)?,
396 ],
397 )?;
398 }
399 Ok(())
400}
401
402fn insert_diagnostic_tx(
403 tx: &Transaction<'_>,
404 severity: &str,
405 kind: &str,
406 source_path: Option<&str>,
407 unit_id: Option<&str>,
408 field: Option<&str>,
409 message: &str,
410) -> Result<()> {
411 tx.execute(
412 "INSERT INTO index_diagnostics (severity, kind, source_path, unit_id, field, message, created_at) VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7)",
413 params![severity, kind, source_path, unit_id, field, message, super::timestamp_now()],
414 )?;
415 Ok(())
416}
417
418fn set_meta_tx(tx: &Transaction<'_>, key: &str, value: &str) -> Result<()> {
419 tx.execute(
420 r#"
421 INSERT INTO index_meta (key, value) VALUES (?1, ?2)
422 ON CONFLICT(key) DO UPDATE SET value = excluded.value
423 "#,
424 params![key, value],
425 )?;
426 Ok(())
427}
428
429fn json_string<T: serde::Serialize>(value: &T) -> Result<Option<String>> {
430 serde_json::to_string(value)
431 .map(Some)
432 .context("failed to serialize indexed JSON value")
433}
434
435fn unit_type_as_str(kind: UnitType) -> &'static str {
436 match kind {
437 UnitType::Epic => "epic",
438 UnitType::Task => "task",
439 UnitType::Fact => "fact",
440 }
441}