Skip to main content

ev/
migrate.rs

1//! `ev migrate` — backfill an existing decision history into the ledger.
2//!
3//! Four PURE, format-aware extractors turn a source substrate (`&str`) into a `Vec<MigrationRecord>`:
4//! a chat-room/git log (`## R<N>` records), the `to-human` RESOLVED/FLAG markdown blocks (the
5//! authority substrate), a `decisions-immutable` §N document, and an `escalation` log (the SAME
6//! RESOLVED/FLAG reader, path-parameterized). The extractors parse **rulings + structured
7//! rejected-roads only** — they NEVER NLP a free-text reason into a ground (`grounds_are_never_
8//! synthesized`): a road becomes a ground iff the source declares it structurally (a `rejected:`
9//! token), otherwise the record carries zero grounds and stays an honest capture.
10//!
11//! The command driver then runs an IDEMPOTENT backfill loop (deterministic source_key sort →
12//! prospective-parent compute_id → ticks_dir pre-check → skip-if-present) on top of the shared
13//! `capture::append`, plus a `--reconcile` join and a `--bind-check` harvest.
14
15use crate::canonical::compute_id;
16use crate::capture::{harvested_test_check, Decision};
17use crate::store::Store;
18use crate::tick::{Ground, Tick};
19use std::collections::HashMap;
20use std::path::Path;
21
22/// One extracted, not-yet-appended decision from a source substrate. `source_key` is the stable,
23/// deterministic dedup/sort key (e.g. `R2289`, `#555`, `§3`) used to order the backfill and to
24/// reconcile against the store; `observe` carries that key as a durable token so reconcile can read
25/// it back from the HASHED payload, not from the events log. Grounds are ONLY the structurally
26/// declared rejected-roads — never synthesized from prose.
27#[derive(Debug, Clone, PartialEq)]
28pub struct MigrationRecord {
29    pub source_key: String,
30    pub decision: String,
31    pub observe: String,
32    pub blame: Option<String>,
33    pub grounds: Vec<Ground>,
34}
35
/// Returns the first `#<n>` (issue) or `R<n>` / `r<n>` (round) token in `text`, where `<n>` is one
/// or more ASCII digits.
///
/// Tokens are maximal runs of ASCII alphanumerics and `#`, so `PR12` or `foo#3` never match. This
/// mirrors the `subject_refs` vocabulary in capture.rs but yields only the first hit, as a stable
/// key. The token is returned verbatim (a lowercase `r` prefix is not normalized).
fn first_round_or_issue_token(text: &str) -> Option<String> {
    // A key is a single `#`/`R`/`r` lead character followed by at least one ASCII digit.
    fn is_key(token: &str) -> bool {
        let mut chars = token.chars();
        match chars.next() {
            Some('#' | 'R' | 'r') => {
                let digits = chars.as_str();
                !digits.is_empty() && digits.bytes().all(|b| b.is_ascii_digit())
            }
            _ => false,
        }
    }

    text.split(|c: char| !c.is_ascii_alphanumeric() && c != '#')
        .find(|token| is_key(token))
        .map(str::to_owned)
}
49
50/// Parse the structurally-declared rejected-roads out of a block's lines. A road is declared ONLY by
51/// an explicit `rejected: <option>: <why>` (or `reject <option>: <why>`) line — never inferred from
52/// prose. Returns one `rejected:<option>` ground per declared road, in source order. A block with no
53/// such line yields zero grounds (the honesty contract: no synthesis).
54fn structured_rejected_roads(block: &str) -> Vec<Ground> {
55    let mut out = Vec::new();
56    for line in block.lines() {
57        let l = line.trim_start_matches(['-', '*', ' ', '\t']).trim();
58        let body = l
59            .strip_prefix("rejected:")
60            .or_else(|| l.strip_prefix("rejected "))
61            .or_else(|| l.strip_prefix("reject:"))
62            .or_else(|| l.strip_prefix("reject "));
63        if let Some(rest) = body {
64            if let Some((opt, why)) = rest.split_once(':') {
65                let (opt, why) = (opt.trim(), why.trim());
66                if !opt.is_empty() && !why.is_empty() {
67                    out.push(Ground {
68                        claim: why.to_string(),
69                        supports: format!("rejected:{opt}"),
70                        check: None,
71                    });
72                }
73            }
74        }
75    }
76    out
77}
78
79/// Build one MigrationRecord from a parsed (key, decision) header + its block body: observe carries the
80/// source_key as durable provenance, grounds are the structurally-declared rejected-roads only (never
81/// synthesized), blame is left for the backfill's `--blame` fallback. Shared by all three block extractors.
82fn flush_record(header: &Option<(String, String)>, body: &str, out: &mut Vec<MigrationRecord>) {
83    if let Some((key, decision)) = header {
84        out.push(MigrationRecord {
85            source_key: key.clone(),
86            decision: decision.clone(),
87            observe: key.clone(),
88            blame: None,
89            grounds: structured_rejected_roads(body),
90        });
91    }
92}
93
94/// The store-side durable key for a tick: its `round_id` if present, else the first round/`#<n>` token
95/// in the hashed `observe` — never the non-hashed events log. Shared by the idempotency index + reconcile,
96/// so the two never disagree on key precedence.
97fn store_key(raw: &serde_json::Value) -> Option<String> {
98    raw.get("round_id")
99        .and_then(|x| x.as_str())
100        .map(|s| s.to_string())
101        .or_else(|| {
102            raw.get("observe")
103                .and_then(|x| x.as_str())
104                .and_then(first_round_or_issue_token)
105        })
106}
107
108/// Extractor 1 — **gitlog / chat-room**: each `## R<N> …` header is one decision; the header text
109/// after the round token (and an optional `— ` em-dash separator) is the decision; any structurally
110/// declared rejected-road line in that record's body becomes a ground. The `R<N>`/`#<n>` token is the
111/// source_key and is carried into observe as a durable provenance token. Reasons are NEVER NLP'd.
112pub fn extract_gitlog(text: &str) -> Vec<MigrationRecord> {
113    let mut records = Vec::new();
114    let mut header: Option<(String, String)> = None; // (source_key, decision)
115    let mut body = String::new();
116    for line in text.lines() {
117        if let Some(rest) = line.strip_prefix("## ") {
118            flush_record(&header, &body, &mut records);
119            body.clear();
120            let key = first_round_or_issue_token(rest);
121            // The decision text is the header with the leading round token stripped + em-dash trimmed.
122            let decision = match key.as_deref() {
123                Some(k) => rest
124                    .split_once(k)
125                    .map(|x| x.1)
126                    .unwrap_or(rest)
127                    .trim_start_matches([' ', '—', '-', ':'])
128                    .trim()
129                    .to_string(),
130                None => rest.trim().to_string(),
131            };
132            header = key.map(|k| {
133                (
134                    k,
135                    if decision.is_empty() {
136                        rest.trim().into()
137                    } else {
138                        decision
139                    },
140                )
141            });
142        } else {
143            body.push_str(line);
144            body.push('\n');
145        }
146    }
147    flush_record(&header, &body, &mut records);
148    records
149}
150
151/// The shared RESOLVED / FLAG block reader (the authority substrate). A `### RESOLVED <key>: <decision>`
152/// or `### FLAG <key>: <decision>` header opens a block; the block's body is scanned for structured
153/// rejected-roads only. RESOLVED marks a user-ruled decision; FLAG marks an open one — both are
154/// captured (the ruling state is provenance, not a reason to drop the record). PATH-PARAMETERIZED by
155/// the caller: `to-human` and `escalation` are the SAME reader over different files (no hardcoded
156/// layout). Returns records in source order.
157fn read_resolved_flag_blocks(text: &str) -> Vec<MigrationRecord> {
158    let mut records = Vec::new();
159    let mut header: Option<(String, String)> = None;
160    let mut body = String::new();
161    for line in text.lines() {
162        let stripped = line
163            .trim_start_matches(['#', ' '])
164            .strip_prefix("RESOLVED")
165            .or_else(|| line.trim_start_matches(['#', ' ']).strip_prefix("FLAG"));
166        if let Some(rest) = stripped {
167            flush_record(&header, &body, &mut records);
168            body.clear();
169            let rest = rest.trim();
170            // `<key>: <decision>` — the key is the leading token before the first colon.
171            if let Some((key, decision)) = rest.split_once(':') {
172                let key = key.trim();
173                let source_key = first_round_or_issue_token(key).unwrap_or_else(|| key.to_string());
174                header = Some((source_key, decision.trim().to_string()));
175            } else {
176                let source_key =
177                    first_round_or_issue_token(rest).unwrap_or_else(|| rest.to_string());
178                header = Some((source_key, rest.to_string()));
179            }
180        } else {
181            body.push_str(line);
182            body.push('\n');
183        }
184    }
185    flush_record(&header, &body, &mut records);
186    records
187}
188
/// Extractor 2 — **to-human**: the RESOLVED/FLAG markdown blocks (the authority substrate).
///
/// A thin alias over the module's shared RESOLVED/FLAG block reader. It returns exactly what that
/// reader returns for `text`: one record per RESOLVED/FLAG header line, in source order.
pub fn extract_to_human(text: &str) -> Vec<MigrationRecord> {
    read_resolved_flag_blocks(text)
}
193
/// Extractor 4 — **escalation**: the SAME RESOLVED/FLAG reader, path-parameterized — escalation is
/// just the reader over a different file, with NO hardcoded layout of its own.
///
/// For the same input it returns exactly what `extract_to_human` returns. The caller decides which
/// file's contents to pass in.
pub fn extract_escalation(text: &str) -> Vec<MigrationRecord> {
    read_resolved_flag_blocks(text)
}
199
200/// Extractor 3 — **decisions-immutable**: a document split on `## N.` / `## §N` section headers, one
201/// decision per numbered section. The section number is the source_key; the header text after the
202/// number is the decision; structured rejected-roads in the section body become grounds.
203pub fn extract_decisions_immutable(text: &str) -> Vec<MigrationRecord> {
204    let mut records = Vec::new();
205    let mut header: Option<(String, String)> = None;
206    let mut body = String::new();
207    for line in text.lines() {
208        if let Some(rest) = line.strip_prefix("## ") {
209            // A numbered section header: `## 3. <decision>` or `## §3 <decision>`.
210            let rest = rest.trim();
211            let digits: String = rest
212                .trim_start_matches('§')
213                .chars()
214                .take_while(|c| c.is_ascii_digit())
215                .collect();
216            if !digits.is_empty() {
217                flush_record(&header, &body, &mut records);
218                body.clear();
219                let decision = rest
220                    .trim_start_matches('§')
221                    .trim_start_matches(|c: char| c.is_ascii_digit())
222                    .trim_start_matches(['.', ' ', ':', '—', '-'])
223                    .trim()
224                    .to_string();
225                header = Some((format!("§{digits}"), decision));
226                continue;
227            }
228        }
229        body.push_str(line);
230        body.push('\n');
231    }
232    flush_record(&header, &body, &mut records);
233    records
234}
235
/// Tally of a single (idempotent) backfill pass; rendered by the command layer.
#[derive(Default, Debug, PartialEq)]
pub struct BackfillSummary {
    /// Records appended to the ledger (or, under `--dry-run`, that would have been).
    pub imported: usize,
    /// Records whose source key was already present in the store, so nothing was written.
    pub skipped: usize,
    /// Skipped records whose stored parent differs from where this pass would place them. A
    /// back-dated mid-chain insert re-parented the chain around them; they are counted, never
    /// rewritten.
    pub relinked: usize,
    /// Records not appended because neither the source nor the `--blame` fallback supplied an
    /// author.
    pub source_only_gaps: usize,
}
247
248/// Map the store's existing decisions to their durable source key → (id, parent_id). The key is read
249/// from the HASHED payload: `round_id` if present, else the first round/#N token in `observe` — never
250/// from the non-hashed events log. This is the idempotency + re-link index for a backfill pass.
251fn store_key_index(
252    store: &Store,
253) -> Result<std::collections::HashMap<String, (String, String)>, String> {
254    let files = store
255        .read_all()
256        .map_err(|e| format!("reading store: {e}"))?;
257    let mut idx = std::collections::HashMap::new();
258    for (name, raw) in &files {
259        let key = store_key(raw);
260        let parent = raw
261            .get("parent_id")
262            .and_then(|x| x.as_str())
263            .unwrap_or("")
264            .to_string();
265        if let Some(k) = key {
266            idx.insert(k, (name.clone(), parent));
267        }
268    }
269    Ok(idx)
270}
271
272/// Run the idempotent backfill of `records` into the store at `repo`. Deterministic order: records
273/// are sorted by `source_key` first so a re-run replays the same chain. Idempotency is keyed on the
274/// durable `source_key` (carried into the hashed `observe` + the non-hashed `round_id`): a record
275/// whose key is already in the store is SKIPPED — chain-position-independent, so a re-run over a
276/// now-non-empty store writes nothing. The chain is kept by threading the PROSPECTIVE parent (the
277/// id we just wrote/found) instead of re-reading the live HEAD each step, so the lineage stays
278/// stable across re-runs. A skipped record whose stored parent differs from where it would now land
279/// is a back-dated mid-chain insert and is reported as re-linked. `blame_fallback` supplies the
280/// author for a record carrying none; a record with neither is a source-only gap (R5 stays intact —
281/// we never invent an author). `jurisdiction_map` (source_key → A/B/C/D bucket) tags each imported
282/// decision: a record whose key is in the map carries that jurisdiction, one absent imports untagged
283/// (None) — so the map is purely additive (an empty map ⇒ every record None, the prior behavior).
284/// jurisdiction is NON-hashed, so tagging never moves a tick id (idempotency holds across re-runs).
285/// `--dry-run` reports the would-import count but writes nothing.
286pub fn backfill(
287    repo: &Path,
288    mut records: Vec<MigrationRecord>,
289    blame_fallback: Option<&str>,
290    jurisdiction_map: &HashMap<String, String>,
291    dry_run: bool,
292) -> Result<BackfillSummary, String> {
293    records.sort_by(|a, b| a.source_key.cmp(&b.source_key));
294    let store = Store::at(repo);
295    if !store.exists() {
296        return Err("no .evolving/ store here — run `ev init` first".into());
297    }
298    let existing = store_key_index(&store)?;
299    // The prospective parent threads through the loop so the chain stays coherent across this pass:
300    // for a brand-new store it begins at the live HEAD; as records resolve it advances to each id.
301    // For relink detection we compare a found record's STORED parent against where this sorted pass
302    // would place it (`prospective_parent`) — equal ⇒ the chain is intact (a clean re-run reports
303    // 0); different ⇒ the chain was re-linked around it (a back-dated mid-chain insert).
304    let head = store
305        .read_head()
306        .map_err(|e| format!("reading HEAD: {e}"))?;
307    // Seed the prospective parent: if the FIRST sorted record is already the genesis (stored
308    // parent ""), the pass replays from genesis; otherwise it extends the current HEAD.
309    let first_is_stored_genesis = records
310        .first()
311        .and_then(|r| existing.get(&r.source_key))
312        .map(|(_, p)| p.is_empty())
313        .unwrap_or(false);
314    let mut prospective_parent = if first_is_stored_genesis {
315        String::new()
316    } else {
317        head
318    };
319    let mut summary = BackfillSummary::default();
320    for r in records {
321        // Idempotency PRE-CHECK on the durable source_key (chain-position-independent).
322        if let Some((existing_id, existing_parent)) = existing.get(&r.source_key) {
323            // A back-dated mid-chain insert: present, but its stored parent differs from where this
324            // pass would now place it — the chain was re-linked around it. Reported, never rewritten.
325            if *existing_parent != prospective_parent {
326                summary.relinked += 1;
327            }
328            // Keep the chain coherent for any later records in this same pass.
329            prospective_parent = existing_id.clone();
330            summary.skipped += 1;
331            continue;
332        }
333        let blame = match r.blame.as_deref().or(blame_fallback) {
334            Some(b) if !b.trim().is_empty() => b.trim().to_string(),
335            _ => {
336                // R5 stays intact: no author, no fabrication. Surface the gap; never invent a human.
337                summary.source_only_gaps += 1;
338                continue;
339            }
340        };
341        if dry_run {
342            // The id this record WOULD take at the prospective parent (no write). held_since is
343            // non-hashed, so this matches the id `append` computes on a real run — only the real
344            // path needs a write, so the probe lives here, not on the hot import path.
345            let probe = Tick {
346                id: String::new(),
347                parent_id: prospective_parent.clone(),
348                observe: r.observe.clone(),
349                decision: r.decision.clone(),
350                grounds: r.grounds.clone(),
351                status: "live".into(),
352                held_since: String::new(),
353                blame: blame.clone(),
354                authority: None,
355                jurisdiction: jurisdiction_map.get(&r.source_key).cloned(),
356                round_id: Some(r.source_key.clone()),
357            };
358            prospective_parent = compute_id(&probe);
359            summary.imported += 1;
360            continue;
361        }
362        let jurisdiction = jurisdiction_map.get(&r.source_key).cloned();
363        let written = crate::capture::append(
364            repo,
365            Decision {
366                observe: r.observe,
367                decision: r.decision,
368                grounds: r.grounds,
369                blame,
370                authority: None,
371                jurisdiction,
372                round_id: Some(r.source_key),
373            },
374        )?;
375        prospective_parent = written.id;
376        summary.imported += 1;
377    }
378    Ok(summary)
379}
380
/// Bucket counts from joining one source's rulings against the store.
///
/// Keys come from the HASHED `round_id` / `observe`, never from events.jsonl, so they are durable.
/// The three key buckets count distinct keys.
#[derive(Default, Debug, PartialEq)]
pub struct ReconcileReport {
    /// Keys present in both the source and the store.
    pub in_both: usize,
    /// Keys the source has but the ledger never captured (the capture gap).
    pub source_only: usize,
    /// Keys in the ledger that this source does not mention.
    pub store_only: usize,
    /// Store ticks from which no key could be derived (no `round_id`, no round token in `observe`).
    pub un_keyable: usize,
}
393
394/// Reconcile a source's extracted records against the store. The store-side key is read from each
395/// tick's HASHED payload — its `round_id` if present, else the first round/#N token in `observe` —
396/// so the join is durable (NOT dependent on the non-hashed events log). A source key with no store
397/// match is a SOURCE-ONLY gap (the capture gap to surface); a store key with no source match is
398/// STORE-ONLY; a store tick with no derivable key is counted separately as un-keyable.
399pub fn reconcile(
400    repo: &Path,
401    source_records: &[MigrationRecord],
402) -> Result<ReconcileReport, String> {
403    let store = Store::at(repo);
404    if !store.exists() {
405        return Err("no .evolving/ store here — run `ev init` first".into());
406    }
407    let files = store
408        .read_all()
409        .map_err(|e| format!("reading store: {e}"))?;
410    let mut store_keys: std::collections::HashSet<String> = std::collections::HashSet::new();
411    let mut un_keyable = 0usize;
412    for (_name, raw) in &files {
413        let key = store_key(raw);
414        match key {
415            Some(k) => {
416                store_keys.insert(k);
417            }
418            None => un_keyable += 1,
419        }
420    }
421    let source_keys: std::collections::HashSet<String> = source_records
422        .iter()
423        .map(|r| r.source_key.clone())
424        .collect();
425    let mut report = ReconcileReport {
426        un_keyable,
427        ..Default::default()
428    };
429    for k in &source_keys {
430        if store_keys.contains(k) {
431            report.in_both += 1;
432        } else {
433            report.source_only += 1;
434        }
435    }
436    report.store_only = store_keys
437        .iter()
438        .filter(|k| !source_keys.contains(*k))
439        .count();
440    Ok(report)
441}
442
/// The `--bind-check` harvest: build a harvested `Check::Test` for the given selector.
///
/// This delegates, argument for argument and unchanged, to `capture::harvested_test_check`, the
/// Task-5 migrate-only constructor. That constructor is documented as producing `counter_test`
/// None with full liveness (see capture.rs to confirm). It is the SAME constructor the
/// harvested-binding path uses, so there is no second half-harvest gate. The caller attaches the
/// returned check to a ground.
///
/// # Errors
/// Any `Err` from `harvested_test_check` is returned as-is.
pub fn bind_check(
    selector: String,
    verified_at_sha: String,
    platforms: Vec<String>,
    triggered_by: Vec<String>,
    surfaces: Vec<String>,
) -> Result<crate::tick::Check, String> {
    harvested_test_check(selector, verified_at_sha, platforms, triggered_by, surfaces)
}
455
// Given/when/then unit tests for the pure extractors. Each fixture is a multi-line string literal
// whose first line ends in `\`. That escape drops the newline (and the next line's leading
// whitespace), so the substrate text starts at column 0 exactly as a real source file would.
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn extract_gitlog_should_yield_one_record_per_round_header_when_given_a_chat_room_log() {
        // given: a chat-room log with two `## R<N>` decision records, one carrying a rejected road
        let text = "\
## R2289 QA — restore-safety counter DB-backed
- rejected: Redis: would add a new infra dependency
## R2290 Dev — ship the cross-pod drain
some prose nobody parses for grounds
";

        // when: the gitlog extractor reads it
        let recs = extract_gitlog(text);

        // then: two records, keyed by their round token, the first carrying the structured road
        assert_eq!(recs.len(), 2);
        assert_eq!(recs[0].source_key, "R2289");
        assert_eq!(recs[0].decision, "QA — restore-safety counter DB-backed");
        assert_eq!(recs[0].grounds.len(), 1);
        assert_eq!(recs[0].grounds[0].supports, "rejected:Redis");
        assert_eq!(recs[1].source_key, "R2290");
        assert!(recs[0].observe.contains("R2289"));
    }

    #[test]
    fn extract_to_human_should_read_a_resolved_block_when_given_the_authority_substrate() {
        // given: a to-human doc with a RESOLVED ruling and a FLAG (open) one
        let text = "\
### RESOLVED R555: restore-safety counter DB-backed; reject Redis
- rejected: Redis: a new infra dependency
### FLAG R600: multi-pod relax policy still open
";

        // when: the to-human extractor reads it
        let recs = extract_to_human(text);

        // then: both blocks are captured; the RESOLVED one carries its structured road
        // (the header's own "reject Redis" prose is part of the decision, not a ground)
        assert_eq!(recs.len(), 2);
        assert_eq!(recs[0].source_key, "R555");
        assert_eq!(
            recs[0].decision,
            "restore-safety counter DB-backed; reject Redis"
        );
        assert_eq!(recs[0].grounds.len(), 1);
        assert_eq!(recs[1].source_key, "R600");
    }

    #[test]
    fn extract_escalation_should_reuse_the_resolved_flag_reader_when_given_an_escalation_log() {
        // given: an escalation log in the SAME RESOLVED/FLAG shape (path-parameterized reader)
        let text = "### FLAG #1194: re-milestoned without sign-off\n";

        // when: the escalation extractor reads it
        let recs = extract_escalation(text);

        // then: it is read identically to to-human (no hardcoded layout of its own)
        assert_eq!(recs.len(), 1);
        assert_eq!(recs[0].source_key, "#1194");
        assert_eq!(recs[0].decision, "re-milestoned without sign-off");
    }

    #[test]
    fn extract_decisions_immutable_should_split_on_numbered_sections_when_given_a_doc() {
        // given: a decisions-immutable doc split into numbered sections
        let text = "\
## 1. freeze the retrieval schema for v2
- rejected: pgvector: would lock our schema
## 2. restore-safety counter DB-backed
";

        // when: the decisions-immutable extractor reads it
        let recs = extract_decisions_immutable(text);

        // then: one record per section, keyed by §N, the first carrying its structured road
        assert_eq!(recs.len(), 2);
        assert_eq!(recs[0].source_key, "§1");
        assert_eq!(recs[0].decision, "freeze the retrieval schema for v2");
        assert_eq!(recs[0].grounds.len(), 1);
        assert_eq!(recs[1].source_key, "§2");
    }

    #[test]
    fn grounds_are_never_synthesized_when_a_block_has_no_structured_rejected_road() {
        // given: a record whose body is pure prose mentioning a rejected option WITHOUT the
        // structured `rejected:<opt>: <why>` token — an NLP'able sentence we must NOT mine
        let text = "\
## R2289 we considered Redis but rejected it because it adds infra
this paragraph explains at length why redis was rejected, in prose
";

        // when: the gitlog extractor reads it
        let recs = extract_gitlog(text);

        // then: the record exists but carries ZERO grounds — reasons are never NLP'd into grounds
        assert_eq!(recs.len(), 1);
        assert!(
            recs[0].grounds.is_empty(),
            "a prose reason must NEVER become a ground (no synthesis)"
        );
    }
}