Skip to main content

ev/
migrate.rs

1//! `ev migrate` — backfill an existing decision history into the ledger.
2//!
3//! Four PURE, format-aware extractors turn a source substrate (`&str`) into a `Vec<MigrationRecord>`:
4//! a chat-room/git log (`## R<N>` records), the `to-human` RESOLVED/FLAG markdown blocks (the
5//! authority substrate), a `decisions-immutable` §N document, and an `escalation` log (the SAME
6//! RESOLVED/FLAG reader, path-parameterized). The extractors parse **rulings + structured
7//! rejected-roads only** — they NEVER NLP a free-text reason into a ground (`grounds_are_never_
8//! synthesized`): a road becomes a ground iff the source declares it structurally (a `rejected:`
9//! token), otherwise the record carries zero grounds and stays an honest capture.
10//!
11//! The command driver then runs an IDEMPOTENT backfill loop (deterministic source_key sort →
12//! prospective-parent compute_id → ticks_dir pre-check → skip-if-present) on top of the shared
13//! `capture::append`, plus a `--reconcile` join and a `--bind-check` harvest.
14
15use crate::canonical::compute_id;
16use crate::capture::{harvested_test_check, Decision};
17use crate::store::Store;
18use crate::tick::{Ground, Tick};
19use std::path::Path;
20
21/// One extracted, not-yet-appended decision from a source substrate. `source_key` is the stable,
22/// deterministic dedup/sort key (e.g. `R2289`, `#555`, `§3`) used to order the backfill and to
23/// reconcile against the store; `observe` carries that key as a durable token so reconcile can read
24/// it back from the HASHED payload, not from the events log. Grounds are ONLY the structurally
25/// declared rejected-roads — never synthesized from prose.
26#[derive(Debug, Clone, PartialEq)]
27pub struct MigrationRecord {
28    pub source_key: String,
29    pub decision: String,
30    pub observe: String,
31    pub blame: Option<String>,
32    pub grounds: Vec<Ground>,
33}
34
/// Find the first provenance token in `text`: a `#`, `R` or `r` followed by one or more ASCII
/// digits and nothing else (for example `#555` or `R2289`).
///
/// The text is cut into candidates at every character that is neither ASCII alphanumeric nor
/// `#`. The matching candidate is returned unchanged (prefix and case included); `None` means no
/// candidate has the required shape.
fn first_round_or_issue_token(text: &str) -> Option<String> {
    let is_separator = |ch: char| !ch.is_ascii_alphanumeric() && ch != '#';
    for candidate in text.split(is_separator) {
        // Exactly one leading marker is removed, so `##1` is left with `#1` and does not qualify.
        let digits = match candidate.chars().next() {
            // The marker is a single ASCII byte, so slicing at 1 stays on a char boundary.
            Some('#') | Some('R') | Some('r') => &candidate[1..],
            _ => continue,
        };
        if !digits.is_empty() && digits.bytes().all(|b| b.is_ascii_digit()) {
            return Some(candidate.to_owned());
        }
    }
    None
}
48
49/// Parse the structurally-declared rejected-roads out of a block's lines. A road is declared ONLY by
50/// an explicit `rejected: <option>: <why>` (or `reject <option>: <why>`) line — never inferred from
51/// prose. Returns one `rejected:<option>` ground per declared road, in source order. A block with no
52/// such line yields zero grounds (the honesty contract: no synthesis).
53fn structured_rejected_roads(block: &str) -> Vec<Ground> {
54    let mut out = Vec::new();
55    for line in block.lines() {
56        let l = line.trim_start_matches(['-', '*', ' ', '\t']).trim();
57        let body = l
58            .strip_prefix("rejected:")
59            .or_else(|| l.strip_prefix("rejected "))
60            .or_else(|| l.strip_prefix("reject:"))
61            .or_else(|| l.strip_prefix("reject "));
62        if let Some(rest) = body {
63            if let Some((opt, why)) = rest.split_once(':') {
64                let (opt, why) = (opt.trim(), why.trim());
65                if !opt.is_empty() && !why.is_empty() {
66                    out.push(Ground {
67                        claim: why.to_string(),
68                        supports: format!("rejected:{opt}"),
69                        check: None,
70                    });
71                }
72            }
73        }
74    }
75    out
76}
77
78/// Build one MigrationRecord from a parsed (key, decision) header + its block body: observe carries the
79/// source_key as durable provenance, grounds are the structurally-declared rejected-roads only (never
80/// synthesized), blame is left for the backfill's `--blame` fallback. Shared by all three block extractors.
81fn flush_record(header: &Option<(String, String)>, body: &str, out: &mut Vec<MigrationRecord>) {
82    if let Some((key, decision)) = header {
83        out.push(MigrationRecord {
84            source_key: key.clone(),
85            decision: decision.clone(),
86            observe: key.clone(),
87            blame: None,
88            grounds: structured_rejected_roads(body),
89        });
90    }
91}
92
93/// The store-side durable key for a tick: its `round_id` if present, else the first round/`#<n>` token
94/// in the hashed `observe` — never the non-hashed events log. Shared by the idempotency index + reconcile,
95/// so the two never disagree on key precedence.
96fn store_key(raw: &serde_json::Value) -> Option<String> {
97    raw.get("round_id")
98        .and_then(|x| x.as_str())
99        .map(|s| s.to_string())
100        .or_else(|| {
101            raw.get("observe")
102                .and_then(|x| x.as_str())
103                .and_then(first_round_or_issue_token)
104        })
105}
106
107/// Extractor 1 — **gitlog / chat-room**: each `## R<N> …` header is one decision; the header text
108/// after the round token (and an optional `— ` em-dash separator) is the decision; any structurally
109/// declared rejected-road line in that record's body becomes a ground. The `R<N>`/`#<n>` token is the
110/// source_key and is carried into observe as a durable provenance token. Reasons are NEVER NLP'd.
111pub fn extract_gitlog(text: &str) -> Vec<MigrationRecord> {
112    let mut records = Vec::new();
113    let mut header: Option<(String, String)> = None; // (source_key, decision)
114    let mut body = String::new();
115    for line in text.lines() {
116        if let Some(rest) = line.strip_prefix("## ") {
117            flush_record(&header, &body, &mut records);
118            body.clear();
119            let key = first_round_or_issue_token(rest);
120            // The decision text is the header with the leading round token stripped + em-dash trimmed.
121            let decision = match key.as_deref() {
122                Some(k) => rest
123                    .split_once(k)
124                    .map(|x| x.1)
125                    .unwrap_or(rest)
126                    .trim_start_matches([' ', '—', '-', ':'])
127                    .trim()
128                    .to_string(),
129                None => rest.trim().to_string(),
130            };
131            header = key.map(|k| {
132                (
133                    k,
134                    if decision.is_empty() {
135                        rest.trim().into()
136                    } else {
137                        decision
138                    },
139                )
140            });
141        } else {
142            body.push_str(line);
143            body.push('\n');
144        }
145    }
146    flush_record(&header, &body, &mut records);
147    records
148}
149
150/// The shared RESOLVED / FLAG block reader (the authority substrate). A `### RESOLVED <key>: <decision>`
151/// or `### FLAG <key>: <decision>` header opens a block; the block's body is scanned for structured
152/// rejected-roads only. RESOLVED marks a user-ruled decision; FLAG marks an open one — both are
153/// captured (the ruling state is provenance, not a reason to drop the record). PATH-PARAMETERIZED by
154/// the caller: `to-human` and `escalation` are the SAME reader over different files (no hardcoded
155/// layout). Returns records in source order.
156fn read_resolved_flag_blocks(text: &str) -> Vec<MigrationRecord> {
157    let mut records = Vec::new();
158    let mut header: Option<(String, String)> = None;
159    let mut body = String::new();
160    for line in text.lines() {
161        let stripped = line
162            .trim_start_matches(['#', ' '])
163            .strip_prefix("RESOLVED")
164            .or_else(|| line.trim_start_matches(['#', ' ']).strip_prefix("FLAG"));
165        if let Some(rest) = stripped {
166            flush_record(&header, &body, &mut records);
167            body.clear();
168            let rest = rest.trim();
169            // `<key>: <decision>` — the key is the leading token before the first colon.
170            if let Some((key, decision)) = rest.split_once(':') {
171                let key = key.trim();
172                let source_key = first_round_or_issue_token(key).unwrap_or_else(|| key.to_string());
173                header = Some((source_key, decision.trim().to_string()));
174            } else {
175                let source_key =
176                    first_round_or_issue_token(rest).unwrap_or_else(|| rest.to_string());
177                header = Some((source_key, rest.to_string()));
178            }
179        } else {
180            body.push_str(line);
181            body.push('\n');
182        }
183    }
184    flush_record(&header, &body, &mut records);
185    records
186}
187
188/// Extractor 2 — **to-human**: the RESOLVED/FLAG markdown blocks (the authority substrate).
189pub fn extract_to_human(text: &str) -> Vec<MigrationRecord> {
190    read_resolved_flag_blocks(text)
191}
192
193/// Extractor 4 — **escalation**: the SAME RESOLVED/FLAG reader, path-parameterized — escalation is
194/// just the reader over a different file, with NO hardcoded layout of its own.
195pub fn extract_escalation(text: &str) -> Vec<MigrationRecord> {
196    read_resolved_flag_blocks(text)
197}
198
199/// Extractor 3 — **decisions-immutable**: a document split on `## N.` / `## §N` section headers, one
200/// decision per numbered section. The section number is the source_key; the header text after the
201/// number is the decision; structured rejected-roads in the section body become grounds.
202pub fn extract_decisions_immutable(text: &str) -> Vec<MigrationRecord> {
203    let mut records = Vec::new();
204    let mut header: Option<(String, String)> = None;
205    let mut body = String::new();
206    for line in text.lines() {
207        if let Some(rest) = line.strip_prefix("## ") {
208            // A numbered section header: `## 3. <decision>` or `## §3 <decision>`.
209            let rest = rest.trim();
210            let digits: String = rest
211                .trim_start_matches('§')
212                .chars()
213                .take_while(|c| c.is_ascii_digit())
214                .collect();
215            if !digits.is_empty() {
216                flush_record(&header, &body, &mut records);
217                body.clear();
218                let decision = rest
219                    .trim_start_matches('§')
220                    .trim_start_matches(|c: char| c.is_ascii_digit())
221                    .trim_start_matches(['.', ' ', ':', '—', '-'])
222                    .trim()
223                    .to_string();
224                header = Some((format!("§{digits}"), decision));
225                continue;
226            }
227        }
228        body.push_str(line);
229        body.push('\n');
230    }
231    flush_record(&header, &body, &mut records);
232    records
233}
234
/// Counters describing one backfill pass; rendered by the command layer.
#[derive(Debug, Default, PartialEq)]
pub struct BackfillSummary {
    /// Records appended to the store — or, on a dry run, records that would have been appended.
    pub imported: usize,
    /// Records left alone because their source key was already present in the store.
    pub skipped: usize,
    /// Skipped records whose stored parent differs from the parent this pass would have given
    /// them (a back-dated mid-chain insert). These are also counted in `skipped`.
    pub relinked: usize,
    /// Records that could not be appended because neither the record nor the fallback supplied a
    /// non-blank author.
    pub source_only_gaps: usize,
}
246
247/// Map the store's existing decisions to their durable source key → (id, parent_id). The key is read
248/// from the HASHED payload: `round_id` if present, else the first round/#N token in `observe` — never
249/// from the non-hashed events log. This is the idempotency + re-link index for a backfill pass.
250fn store_key_index(
251    store: &Store,
252) -> Result<std::collections::HashMap<String, (String, String)>, String> {
253    let files = store
254        .read_all()
255        .map_err(|e| format!("reading store: {e}"))?;
256    let mut idx = std::collections::HashMap::new();
257    for (name, raw) in &files {
258        let key = store_key(raw);
259        let parent = raw
260            .get("parent_id")
261            .and_then(|x| x.as_str())
262            .unwrap_or("")
263            .to_string();
264        if let Some(k) = key {
265            idx.insert(k, (name.clone(), parent));
266        }
267    }
268    Ok(idx)
269}
270
271/// Run the idempotent backfill of `records` into the store at `repo`. Deterministic order: records
272/// are sorted by `source_key` first so a re-run replays the same chain. Idempotency is keyed on the
273/// durable `source_key` (carried into the hashed `observe` + the non-hashed `round_id`): a record
274/// whose key is already in the store is SKIPPED — chain-position-independent, so a re-run over a
275/// now-non-empty store writes nothing. The chain is kept by threading the PROSPECTIVE parent (the
276/// id we just wrote/found) instead of re-reading the live HEAD each step, so the lineage stays
277/// stable across re-runs. A skipped record whose stored parent differs from where it would now land
278/// is a back-dated mid-chain insert and is reported as re-linked. `blame_fallback` supplies the
279/// author for a record carrying none; a record with neither is a source-only gap (R5 stays intact —
280/// we never invent an author). `--dry-run` reports the would-import count but writes nothing.
281pub fn backfill(
282    repo: &Path,
283    mut records: Vec<MigrationRecord>,
284    blame_fallback: Option<&str>,
285    dry_run: bool,
286) -> Result<BackfillSummary, String> {
287    records.sort_by(|a, b| a.source_key.cmp(&b.source_key));
288    let store = Store::at(repo);
289    if !store.exists() {
290        return Err("no .evolving/ store here — run `ev init` first".into());
291    }
292    let existing = store_key_index(&store)?;
293    // The prospective parent threads through the loop so the chain stays coherent across this pass:
294    // for a brand-new store it begins at the live HEAD; as records resolve it advances to each id.
295    // For relink detection we compare a found record's STORED parent against where this sorted pass
296    // would place it (`prospective_parent`) — equal ⇒ the chain is intact (a clean re-run reports
297    // 0); different ⇒ the chain was re-linked around it (a back-dated mid-chain insert).
298    let head = store
299        .read_head()
300        .map_err(|e| format!("reading HEAD: {e}"))?;
301    // Seed the prospective parent: if the FIRST sorted record is already the genesis (stored
302    // parent ""), the pass replays from genesis; otherwise it extends the current HEAD.
303    let first_is_stored_genesis = records
304        .first()
305        .and_then(|r| existing.get(&r.source_key))
306        .map(|(_, p)| p.is_empty())
307        .unwrap_or(false);
308    let mut prospective_parent = if first_is_stored_genesis {
309        String::new()
310    } else {
311        head
312    };
313    let mut summary = BackfillSummary::default();
314    for r in records {
315        // Idempotency PRE-CHECK on the durable source_key (chain-position-independent).
316        if let Some((existing_id, existing_parent)) = existing.get(&r.source_key) {
317            // A back-dated mid-chain insert: present, but its stored parent differs from where this
318            // pass would now place it — the chain was re-linked around it. Reported, never rewritten.
319            if *existing_parent != prospective_parent {
320                summary.relinked += 1;
321            }
322            // Keep the chain coherent for any later records in this same pass.
323            prospective_parent = existing_id.clone();
324            summary.skipped += 1;
325            continue;
326        }
327        let blame = match r.blame.as_deref().or(blame_fallback) {
328            Some(b) if !b.trim().is_empty() => b.trim().to_string(),
329            _ => {
330                // R5 stays intact: no author, no fabrication. Surface the gap; never invent a human.
331                summary.source_only_gaps += 1;
332                continue;
333            }
334        };
335        if dry_run {
336            // The id this record WOULD take at the prospective parent (no write). held_since is
337            // non-hashed, so this matches the id `append` computes on a real run — only the real
338            // path needs a write, so the probe lives here, not on the hot import path.
339            let probe = Tick {
340                id: String::new(),
341                parent_id: prospective_parent.clone(),
342                observe: r.observe.clone(),
343                decision: r.decision.clone(),
344                grounds: r.grounds.clone(),
345                status: "live".into(),
346                held_since: String::new(),
347                blame: blame.clone(),
348                authority: None,
349                jurisdiction: None,
350                round_id: Some(r.source_key.clone()),
351            };
352            prospective_parent = compute_id(&probe);
353            summary.imported += 1;
354            continue;
355        }
356        let written = crate::capture::append(
357            repo,
358            Decision {
359                observe: r.observe,
360                decision: r.decision,
361                grounds: r.grounds,
362                blame,
363                authority: None,
364                jurisdiction: None,
365                round_id: Some(r.source_key),
366            },
367        )?;
368        prospective_parent = written.id;
369        summary.imported += 1;
370    }
371    Ok(summary)
372}
373
/// Bucket counts produced by joining a source's keys against the store's keys.
#[derive(Debug, Default, PartialEq)]
pub struct ReconcileReport {
    /// Distinct source keys that also exist in the store.
    pub in_both: usize,
    /// Distinct source keys missing from the store — the capture gap.
    pub source_only: usize,
    /// Distinct store keys that this source does not contain.
    pub store_only: usize,
    /// Store ticks for which no key could be derived at all.
    pub un_keyable: usize,
}
386
387/// Reconcile a source's extracted records against the store. The store-side key is read from each
388/// tick's HASHED payload — its `round_id` if present, else the first round/#N token in `observe` —
389/// so the join is durable (NOT dependent on the non-hashed events log). A source key with no store
390/// match is a SOURCE-ONLY gap (the capture gap to surface); a store key with no source match is
391/// STORE-ONLY; a store tick with no derivable key is counted separately as un-keyable.
392pub fn reconcile(
393    repo: &Path,
394    source_records: &[MigrationRecord],
395) -> Result<ReconcileReport, String> {
396    let store = Store::at(repo);
397    if !store.exists() {
398        return Err("no .evolving/ store here — run `ev init` first".into());
399    }
400    let files = store
401        .read_all()
402        .map_err(|e| format!("reading store: {e}"))?;
403    let mut store_keys: std::collections::HashSet<String> = std::collections::HashSet::new();
404    let mut un_keyable = 0usize;
405    for (_name, raw) in &files {
406        let key = store_key(raw);
407        match key {
408            Some(k) => {
409                store_keys.insert(k);
410            }
411            None => un_keyable += 1,
412        }
413    }
414    let source_keys: std::collections::HashSet<String> = source_records
415        .iter()
416        .map(|r| r.source_key.clone())
417        .collect();
418    let mut report = ReconcileReport {
419        un_keyable,
420        ..Default::default()
421    };
422    for k in &source_keys {
423        if store_keys.contains(k) {
424            report.in_both += 1;
425        } else {
426            report.source_only += 1;
427        }
428    }
429    report.store_only = store_keys
430        .iter()
431        .filter(|k| !source_keys.contains(*k))
432        .count();
433    Ok(report)
434}
435
436/// The `--bind-check` harvest: build a harvested `Check::Test` (counter_test None, full liveness) for
437/// the given selector, reusing the Task-5 migrate-only constructor. This is the SAME constructor the
438/// harvested-binding path uses — no second half-harvest gate. The caller attaches it to a ground.
439pub fn bind_check(
440    selector: String,
441    verified_at_sha: String,
442    platforms: Vec<String>,
443    triggered_by: Vec<String>,
444    surfaces: Vec<String>,
445) -> Result<crate::tick::Check, String> {
446    harvested_test_check(selector, verified_at_sha, platforms, triggered_by, surfaces)
447}
448
#[cfg(test)]
mod tests {
    use super::*;

    /// The source keys of `recs`, in order — lets a test check count and keys in one assertion.
    fn keys(recs: &[MigrationRecord]) -> Vec<&str> {
        recs.iter().map(|r| r.source_key.as_str()).collect()
    }

    #[test]
    fn extract_gitlog_should_yield_one_record_per_round_header_when_given_a_chat_room_log() {
        // given: two `## R<N>` records; only the first declares a rejected road
        let log = "\
## R2289 QA — restore-safety counter DB-backed
- rejected: Redis: would add a new infra dependency
## R2290 Dev — ship the cross-pod drain
some prose nobody parses for grounds
";

        // when
        let recs = extract_gitlog(log);

        // then: one record per header, keyed by round token; the road is attached to the first
        assert_eq!(keys(&recs), ["R2289", "R2290"]);
        let first = &recs[0];
        assert_eq!(first.decision, "QA — restore-safety counter DB-backed");
        assert_eq!(first.grounds.len(), 1);
        assert_eq!(first.grounds[0].supports, "rejected:Redis");
        assert!(first.observe.contains("R2289"));
    }

    #[test]
    fn extract_to_human_should_read_a_resolved_block_when_given_the_authority_substrate() {
        // given: one RESOLVED ruling with a declared road, followed by an open FLAG
        let doc = "\
### RESOLVED R555: restore-safety counter DB-backed; reject Redis
- rejected: Redis: a new infra dependency
### FLAG R600: multi-pod relax policy still open
";

        // when
        let recs = extract_to_human(doc);

        // then: both blocks are captured and the RESOLVED one carries its road
        assert_eq!(keys(&recs), ["R555", "R600"]);
        assert_eq!(
            recs[0].decision,
            "restore-safety counter DB-backed; reject Redis"
        );
        assert_eq!(recs[0].grounds.len(), 1);
    }

    #[test]
    fn extract_escalation_should_reuse_the_resolved_flag_reader_when_given_an_escalation_log() {
        // given: an escalation log written in the RESOLVED/FLAG shape
        let log = "### FLAG #1194: re-milestoned without sign-off\n";

        // when
        let recs = extract_escalation(log);

        // then: it parses exactly as a to-human block would
        assert_eq!(keys(&recs), ["#1194"]);
        assert_eq!(recs[0].decision, "re-milestoned without sign-off");
    }

    #[test]
    fn extract_decisions_immutable_should_split_on_numbered_sections_when_given_a_doc() {
        // given: two numbered sections, the first declaring a rejected road
        let doc = "\
## 1. freeze the retrieval schema for v2
- rejected: pgvector: would lock our schema
## 2. restore-safety counter DB-backed
";

        // when
        let recs = extract_decisions_immutable(doc);

        // then: one record per section, keyed `§N`; the road is attached to the first
        assert_eq!(keys(&recs), ["§1", "§2"]);
        assert_eq!(recs[0].decision, "freeze the retrieval schema for v2");
        assert_eq!(recs[0].grounds.len(), 1);
    }

    #[test]
    fn grounds_are_never_synthesized_when_a_block_has_no_structured_rejected_road() {
        // given: prose that talks about a rejected option but never uses the structured
        // `rejected:<opt>: <why>` form — exactly the kind of sentence that must not be mined
        let log = "\
## R2289 we considered Redis but rejected it because it adds infra
this paragraph explains at length why redis was rejected, in prose
";

        // when
        let recs = extract_gitlog(log);

        // then: the record is captured, with no grounds at all
        assert_eq!(recs.len(), 1);
        assert!(
            recs[0].grounds.is_empty(),
            "a prose reason must NEVER become a ground (no synthesis)"
        );
    }
}