Skip to main content

mana_core/sqlite/
rebuild.rs

1use std::fs;
2use std::path::{Path, PathBuf};
3
4use anyhow::{Context, Result};
5use rusqlite::{params, Transaction};
6
7use crate::unit::{Unit, UnitType};
8
9use super::freshness::{
10    invalid_source_file_metadata, source_file_metadata_with_kind, SourceFileKind,
11    SourceFileMetadata, SourceFileStatus,
12};
13use super::Index;
14
/// Summary of a full index rebuild.
///
/// `Default` yields the all-zero report, which is the starting state of a
/// rebuild before any file has been processed.
#[derive(Debug, Clone, Default, PartialEq, Eq)]
pub struct RebuildReport {
    /// Number of source files that parsed successfully and were indexed as units.
    pub valid_units: usize,
    /// Number of source files that failed to parse and were recorded as invalid.
    pub invalid_files: usize,
}
20
21impl Index {
22    pub fn rebuild_from_canonical_files(&mut self, mana_dir: &Path) -> Result<RebuildReport> {
23        let files = discover_unit_files(mana_dir)?;
24        let tx = self.connection_mut().transaction()?;
25
26        clear_indexed_rows(&tx)?;
27
28        let mut report = RebuildReport {
29            valid_units: 0,
30            invalid_files: 0,
31        };
32
33        for source in files {
34            match Unit::from_file(&source.path) {
35                Ok(mut unit) => {
36                    unit.is_archived = source.is_archived;
37                    let metadata = source_file_metadata_with_kind(
38                        &source.path,
39                        Some(unit.id.clone()),
40                        if source.is_archived {
41                            SourceFileKind::Archive
42                        } else {
43                            SourceFileKind::Unit
44                        },
45                        SourceFileStatus::Valid,
46                    )?;
47                    record_source_file_tx(&tx, &metadata)?;
48                    insert_unit_tx(&tx, &unit, &metadata)?;
49                    report.valid_units += 1;
50                }
51                Err(error) => {
52                    let metadata = invalid_source_file_metadata(
53                        &source.path,
54                        if source.is_archived {
55                            SourceFileKind::Archive
56                        } else {
57                            SourceFileKind::Unit
58                        },
59                        SourceFileStatus::InvalidParse,
60                        error.to_string(),
61                    )?;
62                    record_source_file_tx(&tx, &metadata)?;
63                    insert_diagnostic_tx(
64                        &tx,
65                        "error",
66                        "parse",
67                        Some(&metadata.path),
68                        None,
69                        Some("frontmatter"),
70                        metadata
71                            .error_message
72                            .as_deref()
73                            .unwrap_or("failed to parse unit file"),
74                    )?;
75                    report.invalid_files += 1;
76                }
77            }
78        }
79
80        set_meta_tx(&tx, "last_full_rebuild_at", &super::timestamp_now())?;
81        set_meta_tx(&tx, "stale", "false")?;
82        set_meta_tx(&tx, "stale_reason", "")?;
83        tx.commit()?;
84
85        Ok(report)
86    }
87}
88
/// A candidate unit file found on disk during discovery.
#[derive(Clone, Debug)]
struct UnitSourceFile {
    /// Location of the file as produced by the directory walk.
    path: PathBuf,
    /// Whether the file was found while walking in archived mode.
    is_archived: bool,
}
94
95fn discover_unit_files(mana_dir: &Path) -> Result<Vec<UnitSourceFile>> {
96    let mut files = Vec::new();
97    collect_unit_files_in_dir(mana_dir, false, &mut files)?;
98
99    let archive_dir = mana_dir.join("archive");
100    if archive_dir.is_dir() {
101        collect_unit_files_recursive(&archive_dir, true, &mut files)?;
102    }
103
104    files.sort_by(|a, b| a.path.cmp(&b.path));
105    Ok(files)
106}
107
108fn collect_unit_files_in_dir(
109    dir: &Path,
110    is_archived: bool,
111    files: &mut Vec<UnitSourceFile>,
112) -> Result<()> {
113    for entry in fs::read_dir(dir)
114        .with_context(|| format!("failed to read mana directory: {}", dir.display()))?
115    {
116        let entry = entry?;
117        let path = entry.path();
118        if path.is_file() && is_unit_file(&path) {
119            files.push(UnitSourceFile { path, is_archived });
120        } else if path.is_dir()
121            && path.file_name().and_then(|name| name.to_str()) != Some("archive")
122        {
123            collect_unit_files_recursive(&path, is_archived, files)?;
124        }
125    }
126    Ok(())
127}
128
129fn collect_unit_files_recursive(
130    dir: &Path,
131    is_archived: bool,
132    files: &mut Vec<UnitSourceFile>,
133) -> Result<()> {
134    for entry in fs::read_dir(dir)
135        .with_context(|| format!("failed to read archive directory: {}", dir.display()))?
136    {
137        let entry = entry?;
138        let path = entry.path();
139        if path.is_dir() {
140            collect_unit_files_recursive(&path, is_archived, files)?;
141        } else if path.is_file() && is_unit_file(&path) {
142            files.push(UnitSourceFile { path, is_archived });
143        }
144    }
145    Ok(())
146}
147
/// Decides from the file name alone whether `path` looks like a unit file.
///
/// Rules, in order:
/// - a path without a UTF-8 file name is rejected;
/// - the names `config.yaml`, `index.yaml`, `unit.yaml` and `archive.yaml`
///   are rejected;
/// - any other `.yaml` file is accepted;
/// - a `.md` file is accepted only when its file name contains a `-`;
/// - everything else is rejected.
fn is_unit_file(path: &Path) -> bool {
    const RESERVED_NAMES: [&str; 4] = ["config.yaml", "index.yaml", "unit.yaml", "archive.yaml"];

    let name = match path.file_name().and_then(|raw| raw.to_str()) {
        Some(name) => name,
        None => return false,
    };

    if RESERVED_NAMES.contains(&name) {
        return false;
    }

    let extension = path.extension().and_then(|raw| raw.to_str());
    match extension {
        Some("yaml") => true,
        Some("md") => name.contains('-'),
        _ => false,
    }
}
164
165fn clear_indexed_rows(tx: &Transaction<'_>) -> Result<()> {
166    for table in [
167        "index_diagnostics",
168        "context_edges",
169        "facts",
170        "unit_history",
171        "unit_attempts",
172        "unit_decisions",
173        "unit_artifacts",
174        "unit_dependencies",
175        "unit_paths",
176        "unit_labels",
177        "units",
178        "source_files",
179    ] {
180        tx.execute(&format!("DELETE FROM {table}"), [])?;
181    }
182    Ok(())
183}
184
185fn record_source_file_tx(tx: &Transaction<'_>, metadata: &SourceFileMetadata) -> Result<()> {
186    tx.execute(
187        r#"
188        INSERT INTO source_files (
189            path, unit_id, kind, hash, mtime, size, indexed_at, status,
190            error_kind, error_message, error_field
191        ) VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10, ?11)
192        ON CONFLICT(path) DO UPDATE SET
193            unit_id = excluded.unit_id,
194            kind = excluded.kind,
195            hash = excluded.hash,
196            mtime = excluded.mtime,
197            size = excluded.size,
198            indexed_at = excluded.indexed_at,
199            status = excluded.status,
200            error_kind = excluded.error_kind,
201            error_message = excluded.error_message,
202            error_field = excluded.error_field
203        "#,
204        params![
205            metadata.path,
206            metadata.unit_id,
207            metadata.kind.as_str(),
208            metadata.hash,
209            metadata.mtime,
210            metadata.size,
211            super::timestamp_now(),
212            metadata.status.as_str(),
213            metadata.error_kind,
214            metadata.error_message,
215            metadata.error_field,
216        ],
217    )?;
218    Ok(())
219}
220
221fn insert_unit_tx(tx: &Transaction<'_>, unit: &Unit, metadata: &SourceFileMetadata) -> Result<()> {
222    let source_hash = metadata.hash.clone().unwrap_or_default();
223    let indexed_at = super::timestamp_now();
224    tx.execute(
225        r#"
226        INSERT INTO units (
227            id, title, handle, slug, status, priority, kind, unit_type, feature,
228            created_at, updated_at, closed_at, close_reason, description, acceptance,
229            notes, design, parent, assignee, claimed_by, claimed_at, is_archived,
230            verify, verify_fast, fail_first, checkpoint, verify_hash, attempts,
231            max_attempts, max_loops, verify_timeout, last_verified, stale_after,
232            created_by, model, autonomy_disposition, outputs_json, on_fail_json,
233            on_close_json, source_path, source_hash, indexed_at
234        ) VALUES (
235            ?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9,
236            ?10, ?11, ?12, ?13, ?14, ?15,
237            ?16, ?17, ?18, ?19, ?20, ?21, ?22,
238            ?23, ?24, ?25, ?26, ?27, ?28,
239            ?29, ?30, ?31, ?32, ?33,
240            ?34, ?35, ?36, ?37, ?38,
241            ?39, ?40, ?41, ?42
242        )
243        "#,
244        params![
245            unit.id,
246            unit.title,
247            unit.handle,
248            unit.slug,
249            unit.status.to_string(),
250            i64::from(unit.priority),
251            unit_type_as_str(unit.kind),
252            unit.unit_type,
253            unit.feature,
254            unit.created_at.to_rfc3339(),
255            unit.updated_at.to_rfc3339(),
256            unit.closed_at.map(|value| value.to_rfc3339()),
257            unit.close_reason,
258            unit.description,
259            unit.acceptance,
260            unit.notes,
261            unit.design,
262            unit.parent,
263            unit.assignee,
264            unit.claimed_by,
265            unit.claimed_at.map(|value| value.to_rfc3339()),
266            unit.is_archived,
267            unit.verify,
268            unit.verify_fast,
269            unit.fail_first,
270            unit.checkpoint,
271            unit.verify_hash,
272            i64::from(unit.attempts),
273            i64::from(unit.max_attempts),
274            unit.max_loops.map(i64::from),
275            unit.verify_timeout
276                .and_then(|value| i64::try_from(value).ok()),
277            unit.last_verified.map(|value| value.to_rfc3339()),
278            unit.stale_after.map(|value| value.to_rfc3339()),
279            unit.created_by,
280            unit.model,
281            json_string(&unit.autonomy_disposition)?,
282            json_string(&unit.outputs)?,
283            json_string(&unit.on_fail)?,
284            json_string(&unit.on_close)?,
285            metadata.path,
286            source_hash,
287            indexed_at,
288        ],
289    )?;
290
291    insert_strings_tx(tx, "unit_labels", "label", &unit.id, &unit.labels)?;
292    insert_strings_tx(tx, "unit_paths", "path", &unit.id, &unit.paths)?;
293    insert_strings_tx(
294        tx,
295        "unit_dependencies",
296        "dep_id",
297        &unit.id,
298        &unit.dependencies,
299    )?;
300    insert_artifacts_tx(tx, &unit.id, "produces", &unit.produces)?;
301    insert_artifacts_tx(tx, &unit.id, "requires", &unit.requires)?;
302    insert_decisions_tx(tx, &unit.id, &unit.decisions)?;
303    insert_attempts_tx(tx, &unit.id, &unit.attempt_log)?;
304    insert_history_tx(tx, &unit.id, &unit.history)?;
305
306    if matches!(unit.kind, UnitType::Fact) || unit.unit_type == "fact" {
307        tx.execute(
308            "INSERT INTO facts (unit_id, last_verified, stale_after, score_hint) VALUES (?1, ?2, ?3, NULL)",
309            params![
310                unit.id,
311                unit.last_verified.map(|value| value.to_rfc3339()),
312                unit.stale_after.map(|value| value.to_rfc3339()),
313            ],
314        )?;
315    }
316
317    Ok(())
318}
319
320fn insert_strings_tx(
321    tx: &Transaction<'_>,
322    table: &str,
323    column: &str,
324    unit_id: &str,
325    values: &[String],
326) -> Result<()> {
327    let sql = format!("INSERT INTO {table} (unit_id, {column}, position) VALUES (?1, ?2, ?3)");
328    for (position, value) in values.iter().enumerate() {
329        tx.execute(&sql, params![unit_id, value, position as i64])?;
330    }
331    Ok(())
332}
333
334fn insert_artifacts_tx(
335    tx: &Transaction<'_>,
336    unit_id: &str,
337    direction: &str,
338    values: &[String],
339) -> Result<()> {
340    for (position, value) in values.iter().enumerate() {
341        tx.execute(
342            "INSERT INTO unit_artifacts (unit_id, direction, artifact, position) VALUES (?1, ?2, ?3, ?4)",
343            params![unit_id, direction, value, position as i64],
344        )?;
345    }
346    Ok(())
347}
348
349fn insert_decisions_tx(tx: &Transaction<'_>, unit_id: &str, decisions: &[String]) -> Result<()> {
350    for (index, decision) in decisions.iter().enumerate() {
351        tx.execute(
352            "INSERT INTO unit_decisions (unit_id, decision_index, text, resolved) VALUES (?1, ?2, ?3, 0)",
353            params![unit_id, index as i64, decision],
354        )?;
355    }
356    Ok(())
357}
358
359fn insert_attempts_tx(
360    tx: &Transaction<'_>,
361    unit_id: &str,
362    attempts: &[crate::unit::types::AttemptRecord],
363) -> Result<()> {
364    for (index, attempt) in attempts.iter().enumerate() {
365        tx.execute(
366            "INSERT INTO unit_attempts (unit_id, attempt_index, num, outcome, notes, raw_json) VALUES (?1, ?2, ?3, ?4, ?5, ?6)",
367            params![
368                unit_id,
369                index as i64,
370                i64::from(attempt.num),
371                json_string(&attempt.outcome)?,
372                attempt.notes,
373                json_string(attempt)?,
374            ],
375        )?;
376    }
377    Ok(())
378}
379
380fn insert_history_tx(
381    tx: &Transaction<'_>,
382    unit_id: &str,
383    history: &[crate::unit::types::RunRecord],
384) -> Result<()> {
385    for (index, record) in history.iter().enumerate() {
386        tx.execute(
387            "INSERT INTO unit_history (unit_id, history_index, started_at, finished_at, status, exit_code, raw_json) VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7)",
388            params![
389                unit_id,
390                index as i64,
391                record.started_at.to_rfc3339(),
392                record.finished_at.map(|value| value.to_rfc3339()),
393                json_string(&record.result)?,
394                record.exit_code,
395                json_string(record)?,
396            ],
397        )?;
398    }
399    Ok(())
400}
401
402fn insert_diagnostic_tx(
403    tx: &Transaction<'_>,
404    severity: &str,
405    kind: &str,
406    source_path: Option<&str>,
407    unit_id: Option<&str>,
408    field: Option<&str>,
409    message: &str,
410) -> Result<()> {
411    tx.execute(
412        "INSERT INTO index_diagnostics (severity, kind, source_path, unit_id, field, message, created_at) VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7)",
413        params![severity, kind, source_path, unit_id, field, message, super::timestamp_now()],
414    )?;
415    Ok(())
416}
417
418fn set_meta_tx(tx: &Transaction<'_>, key: &str, value: &str) -> Result<()> {
419    tx.execute(
420        r#"
421        INSERT INTO index_meta (key, value) VALUES (?1, ?2)
422        ON CONFLICT(key) DO UPDATE SET value = excluded.value
423        "#,
424        params![key, value],
425    )?;
426    Ok(())
427}
428
429fn json_string<T: serde::Serialize>(value: &T) -> Result<Option<String>> {
430    serde_json::to_string(value)
431        .map(Some)
432        .context("failed to serialize indexed JSON value")
433}
434
435fn unit_type_as_str(kind: UnitType) -> &'static str {
436    match kind {
437        UnitType::Epic => "epic",
438        UnitType::Task => "task",
439        UnitType::Fact => "fact",
440    }
441}