Skip to main content

ev/
migrate.rs

1//! `ev migrate` — backfill an existing decision history into the ledger.
2//!
3//! Four PURE, format-aware extractors turn a source substrate (`&str`) into a `Vec<MigrationRecord>`:
4//! a chat-room/git log (`## R<N>` records), the `to-human` RESOLVED/FLAG markdown blocks (the
5//! authority substrate), a `decisions-immutable` §N document, and an `escalation` log (the SAME
6//! RESOLVED/FLAG reader, path-parameterized). The extractors parse **rulings + structured
//! rejected-roads only** — they NEVER NLP a free-text reason into a ground
//! (`grounds_are_never_synthesized`): a road becomes a ground iff the source declares it
//! structurally (a `rejected:` token), otherwise the record carries zero grounds and stays an
//! honest capture.
10//!
11//! The command driver then runs an IDEMPOTENT backfill loop (deterministic source_key sort →
12//! prospective-parent compute_id → ticks_dir pre-check → skip-if-present) on top of the shared
13//! `capture::append`, plus a `--reconcile` join and a `--bind-check` harvest.
14
15use crate::canonical::compute_id;
16use crate::capture::{harvested_test_check, Decision};
17use crate::store::Store;
18use crate::tick::{Ground, Tick};
19use std::collections::HashMap;
20use std::path::Path;
21
22/// One extracted, not-yet-appended decision from a source substrate. `source_key` is the stable,
23/// deterministic dedup/sort key (e.g. `R2289`, `#555`, `§3`) used to order the backfill and to
24/// reconcile against the store; `observe` carries that key as a durable token so reconcile can read
25/// it back from the HASHED payload, not from the events log. Grounds are ONLY the structurally
26/// declared rejected-roads — never synthesized from prose.
27#[derive(Debug, Clone, PartialEq)]
28pub struct MigrationRecord {
29    pub source_key: String,
30    pub decision: String,
31    pub observe: String,
32    pub blame: Option<String>,
33    pub grounds: Vec<Ground>,
34    // The bookkeeping tags a producer may declare. The four built-in extractors leave them at the
35    // legacy defaults (authority None, jurisdiction None — so the `--jurisdiction-map` fills it —,
36    // source_ref = the source_key token, provenance None); the canonical reader populates them from the
37    // wire record so an imported ruling lands with its true authority / jurisdiction / provenance.
38    pub authority: Option<String>,
39    pub jurisdiction: Option<String>,
40    pub source_ref: Option<serde_json::Value>,
41    pub provenance: Option<String>,
42}
43
/// Find the first `#<n>` / `R<n>` / `r<n>` provenance token (issue or round id) in `text`.
///
/// The text is cut into candidates at every character that is neither ASCII-alphanumeric nor `#`.
/// A candidate qualifies when it is exactly one leading `#`, `R` or `r` followed by one or more
/// ASCII digits. Returns the first qualifying candidate as an owned string, or `None`.
fn first_round_or_issue_token(text: &str) -> Option<String> {
    for candidate in text.split(|c: char| !c.is_ascii_alphanumeric() && c != '#') {
        let mut chars = candidate.chars();
        let has_marker = matches!(chars.next(), Some('#') | Some('R') | Some('r'));
        // Whatever follows the marker character must be a non-empty run of digits.
        let number = chars.as_str();
        if has_marker && !number.is_empty() && number.bytes().all(|b| b.is_ascii_digit()) {
            return Some(candidate.to_owned());
        }
    }
    None
}
57
58/// Parse the structurally-declared rejected-roads out of a block's lines. A road is declared ONLY by
59/// an explicit `rejected: <option>: <why>` (or `reject <option>: <why>`) line — never inferred from
60/// prose. Returns one `rejected:<option>` ground per declared road, in source order. A block with no
61/// such line yields zero grounds (the honesty contract: no synthesis).
62fn structured_rejected_roads(block: &str) -> Vec<Ground> {
63    let mut out = Vec::new();
64    for line in block.lines() {
65        let l = line.trim_start_matches(['-', '*', ' ', '\t']).trim();
66        let body = l
67            .strip_prefix("rejected:")
68            .or_else(|| l.strip_prefix("rejected "))
69            .or_else(|| l.strip_prefix("reject:"))
70            .or_else(|| l.strip_prefix("reject "));
71        if let Some(rest) = body {
72            if let Some((opt, why)) = rest.split_once(':') {
73                let (opt, why) = (opt.trim(), why.trim());
74                if !opt.is_empty() && !why.is_empty() {
75                    out.push(Ground {
76                        claim: why.to_string(),
77                        supports: format!("rejected:{opt}"),
78                        check: None,
79                    });
80                }
81            }
82        }
83    }
84    out
85}
86
87/// Build one MigrationRecord from a parsed (key, decision) header + its block body: observe carries the
88/// source_key as durable provenance, grounds are the structurally-declared rejected-roads only (never
89/// synthesized), blame is left for the backfill's `--blame` fallback. Shared by all three block extractors.
90fn flush_record(header: &Option<(String, String)>, body: &str, out: &mut Vec<MigrationRecord>) {
91    if let Some((key, decision)) = header {
92        out.push(MigrationRecord {
93            source_key: key.clone(),
94            decision: decision.clone(),
95            observe: key.clone(),
96            blame: None,
97            grounds: structured_rejected_roads(body),
98            // Legacy defaults: no inline authority/provenance, source_ref = the source_key token, and
99            // jurisdiction left None so the `--jurisdiction-map` remains the sole tagger on this path.
100            authority: None,
101            jurisdiction: None,
102            source_ref: Some(serde_json::Value::String(key.clone())),
103            provenance: None,
104        });
105    }
106}
107
108/// The store-side durable key for a tick: the dedup key derived from its opaque `source_ref` if
109/// present (a string verbatim, or an object's deterministic JSON — see `source_ref_key`), else the
110/// first round/`#<n>` token in the hashed `observe` — never the non-hashed events log. Shared by the
111/// idempotency index + reconcile, so the two never disagree on key precedence.
112fn store_key(raw: &serde_json::Value) -> Option<String> {
113    raw.get("source_ref")
114        .map(crate::tick::source_ref_key)
115        .or_else(|| {
116            raw.get("observe")
117                .and_then(|x| x.as_str())
118                .and_then(first_round_or_issue_token)
119        })
120}
121
/// Every key a Canonical Decision Intake line is allowed to carry.
///
/// The wire envelope is strict: the canonical reader passes this list to `only_keys`, so a line
/// with any key outside it fails ingest rather than being carried along.
const CANONICAL_KEYS: &[&str] = &[
    // Record discriminator.
    "kind",
    // Decision payload.
    "decision",
    "observe",
    "grounds",
    "blame",
    // Bookkeeping tags.
    "authority",
    "jurisdiction",
    "source_ref",
    "provenance",
];
136
137/// Parse a **Canonical Decision Intake** stream (JSONL) into `MigrationRecord`s — the format-neutral
138/// intake both an adopter's legacy adapter and a future live runner emit. This IS the trust boundary:
139/// the producer supplies STRUCTURE, and ev RE-VALIDATES it here through the very read-path validators
140/// (`ground_from_value`, the vocab checks) that guard an on-disk tick — never a parallel serde decode
141/// that could trust an unchecked `Ground`. Per line: skip blank / `#`-comment lines; require the fixed
142/// `kind` discriminator and reject any unknown envelope key loudly; require a non-empty `decision` and
143/// a `grounds` array (which may be empty — the honest zero-grounds capture); validate every declared
144/// tag against its closed vocabulary. The durable dedup/sort key mirrors `store_key`: the opaque
145/// `source_ref`'s derived key, else the first round/`#issue` token in `observe`.
146pub fn canonical_records(text: &str) -> Result<Vec<MigrationRecord>, String> {
147    use crate::capture::validate_authority;
148    use crate::tick::{
149        ground_from_value, only_keys, req_str, source_ref_key, validate_jurisdiction,
150        validate_provenance, validate_source_ref,
151    };
152    let mut out = Vec::new();
153    for (i, raw_line) in text.lines().enumerate() {
154        let n = i + 1;
155        let line = raw_line.trim();
156        if line.is_empty() || line.starts_with('#') {
157            continue;
158        }
159        let v: serde_json::Value =
160            serde_json::from_str(line).map_err(|e| format!("canonical line {n}: not JSON: {e}"))?;
161        let obj = v
162            .as_object()
163            .ok_or_else(|| format!("canonical line {n}: not a JSON object"))?;
164        only_keys(obj, CANONICAL_KEYS, &format!("canonical line {n}"))?;
165        match obj.get("kind").and_then(|x| x.as_str()) {
166            Some("ev-decision-intake") => {}
167            other => {
168                return Err(format!(
169                    "canonical line {n}: not an ev-decision-intake record (kind={other:?})"
170                ))
171            }
172        }
173        let decision = req_str(obj, "decision").map_err(|e| format!("canonical line {n}: {e}"))?;
174        if decision.trim().is_empty() {
175            return Err(format!("canonical line {n}: decision is empty"));
176        }
177        let observe = obj
178            .get("observe")
179            .and_then(|x| x.as_str())
180            .unwrap_or("")
181            .to_string();
182        let grounds_v = obj
183            .get("grounds")
184            .and_then(|x| x.as_array())
185            .ok_or_else(|| format!("canonical line {n}: grounds missing/not array"))?;
186        let mut grounds = Vec::new();
187        for gv in grounds_v {
188            grounds.push(ground_from_value(gv).map_err(|e| format!("canonical line {n}: {e}"))?);
189        }
190        let blame = obj
191            .get("blame")
192            .and_then(|x| x.as_str())
193            .map(str::to_string);
194        // One validated optional string tag: absent → None; present → vocab-checked, with the line
195        // number threaded into the error. (source_ref is a raw Value, so it stays its own arm below.)
196        let opt_tag = |key: &str,
197                       validate: fn(&str) -> Result<(), String>|
198         -> Result<Option<String>, String> {
199            match obj.get(key).and_then(|x| x.as_str()) {
200                None => Ok(None),
201                Some(v) => {
202                    validate(v).map_err(|e| format!("canonical line {n}: {e}"))?;
203                    Ok(Some(v.to_string()))
204                }
205            }
206        };
207        let authority = opt_tag("authority", validate_authority)?;
208        let jurisdiction = opt_tag("jurisdiction", validate_jurisdiction)?;
209        let provenance = opt_tag("provenance", validate_provenance)?;
210        let source_ref = match obj.get("source_ref") {
211            None => None,
212            Some(rv) => {
213                validate_source_ref(rv).map_err(|e| format!("canonical line {n}: {e}"))?;
214                Some(rv.clone())
215            }
216        };
217        // The dedup/sort key mirrors store_key's precedence: the source_ref's derived key, else the
218        // first round/`#issue` token in observe. A record that yields NEITHER has no durable identity,
219        // so re-imports could not be idempotent and distinct records would collide on the empty key —
220        // reject it at the door (mirroring the strict envelope), rather than silently keying it "".
221        let source_key = source_ref
222            .as_ref()
223            .map(source_ref_key)
224            .or_else(|| first_round_or_issue_token(&observe))
225            .filter(|k| !k.is_empty());
226        let source_key = match source_key {
227            Some(k) => k,
228            None => {
229                return Err(format!(
230                    "canonical line {n}: a record needs a source_ref (or a round/#issue token in observe) for idempotent re-import"
231                ))
232            }
233        };
234        out.push(MigrationRecord {
235            source_key,
236            decision,
237            observe,
238            blame,
239            grounds,
240            authority,
241            jurisdiction,
242            source_ref,
243            provenance,
244        });
245    }
246    Ok(out)
247}
248
249/// Extractor 1 — **gitlog / chat-room**: each `## R<N> …` header is one decision; the header text
250/// after the round token (and an optional `— ` em-dash separator) is the decision; any structurally
251/// declared rejected-road line in that record's body becomes a ground. The `R<N>`/`#<n>` token is the
252/// source_key and is carried into observe as a durable provenance token. Reasons are NEVER NLP'd.
253pub fn extract_gitlog(text: &str) -> Vec<MigrationRecord> {
254    let mut records = Vec::new();
255    let mut header: Option<(String, String)> = None; // (source_key, decision)
256    let mut body = String::new();
257    for line in text.lines() {
258        if let Some(rest) = line.strip_prefix("## ") {
259            flush_record(&header, &body, &mut records);
260            body.clear();
261            let key = first_round_or_issue_token(rest);
262            // The decision text is the header with the leading round token stripped + em-dash trimmed.
263            let decision = match key.as_deref() {
264                Some(k) => rest
265                    .split_once(k)
266                    .map(|x| x.1)
267                    .unwrap_or(rest)
268                    .trim_start_matches([' ', '—', '-', ':'])
269                    .trim()
270                    .to_string(),
271                None => rest.trim().to_string(),
272            };
273            header = key.map(|k| {
274                (
275                    k,
276                    if decision.is_empty() {
277                        rest.trim().into()
278                    } else {
279                        decision
280                    },
281                )
282            });
283        } else {
284            body.push_str(line);
285            body.push('\n');
286        }
287    }
288    flush_record(&header, &body, &mut records);
289    records
290}
291
292/// The shared RESOLVED / FLAG block reader (the authority substrate). A `### RESOLVED <key>: <decision>`
293/// or `### FLAG <key>: <decision>` header opens a block; the block's body is scanned for structured
294/// rejected-roads only. RESOLVED marks a user-ruled decision; FLAG marks an open one — both are
295/// captured (the ruling state is provenance, not a reason to drop the record). PATH-PARAMETERIZED by
296/// the caller: `to-human` and `escalation` are the SAME reader over different files (no hardcoded
297/// layout). Returns records in source order.
298fn read_resolved_flag_blocks(text: &str) -> Vec<MigrationRecord> {
299    let mut records = Vec::new();
300    let mut header: Option<(String, String)> = None;
301    let mut body = String::new();
302    for line in text.lines() {
303        let stripped = line
304            .trim_start_matches(['#', ' '])
305            .strip_prefix("RESOLVED")
306            .or_else(|| line.trim_start_matches(['#', ' ']).strip_prefix("FLAG"));
307        if let Some(rest) = stripped {
308            flush_record(&header, &body, &mut records);
309            body.clear();
310            let rest = rest.trim();
311            // `<key>: <decision>` — the key is the leading token before the first colon.
312            if let Some((key, decision)) = rest.split_once(':') {
313                let key = key.trim();
314                let source_key = first_round_or_issue_token(key).unwrap_or_else(|| key.to_string());
315                header = Some((source_key, decision.trim().to_string()));
316            } else {
317                let source_key =
318                    first_round_or_issue_token(rest).unwrap_or_else(|| rest.to_string());
319                header = Some((source_key, rest.to_string()));
320            }
321        } else {
322            body.push_str(line);
323            body.push('\n');
324        }
325    }
326    flush_record(&header, &body, &mut records);
327    records
328}
329
330/// Extractor 2 — **to-human**: the RESOLVED/FLAG markdown blocks (the authority substrate).
331pub fn extract_to_human(text: &str) -> Vec<MigrationRecord> {
332    read_resolved_flag_blocks(text)
333}
334
335/// Extractor 4 — **escalation**: the SAME RESOLVED/FLAG reader, path-parameterized — escalation is
336/// just the reader over a different file, with NO hardcoded layout of its own.
337pub fn extract_escalation(text: &str) -> Vec<MigrationRecord> {
338    read_resolved_flag_blocks(text)
339}
340
341/// Extractor 3 — **decisions-immutable**: a document split on `## N.` / `## §N` section headers, one
342/// decision per numbered section. The section number is the source_key; the header text after the
343/// number is the decision; structured rejected-roads in the section body become grounds.
344pub fn extract_decisions_immutable(text: &str) -> Vec<MigrationRecord> {
345    let mut records = Vec::new();
346    let mut header: Option<(String, String)> = None;
347    let mut body = String::new();
348    for line in text.lines() {
349        if let Some(rest) = line.strip_prefix("## ") {
350            // A numbered section header: `## 3. <decision>` or `## §3 <decision>`.
351            let rest = rest.trim();
352            let digits: String = rest
353                .trim_start_matches('§')
354                .chars()
355                .take_while(|c| c.is_ascii_digit())
356                .collect();
357            if !digits.is_empty() {
358                flush_record(&header, &body, &mut records);
359                body.clear();
360                let decision = rest
361                    .trim_start_matches('§')
362                    .trim_start_matches(|c: char| c.is_ascii_digit())
363                    .trim_start_matches(['.', ' ', ':', '—', '-'])
364                    .trim()
365                    .to_string();
366                header = Some((format!("§{digits}"), decision));
367                continue;
368            }
369        }
370        body.push_str(line);
371        body.push('\n');
372    }
373    flush_record(&header, &body, &mut records);
374    records
375}
376
/// Counters describing what one idempotent backfill pass did. Rendered by the command layer.
#[derive(Debug, Default, PartialEq)]
pub struct BackfillSummary {
    /// Records appended to the ledger (or, on a dry run, that would have been).
    pub imported: usize,
    /// Records whose durable key was already present, so nothing was written for them.
    pub skipped: usize,
    /// Already-stored records whose stored parent differs from where this pass would place them —
    /// a back-dated mid-chain insert. Reported only, never rewritten.
    pub relinked: usize,
    /// Records that could not be appended because neither the source nor the `--blame` fallback
    /// supplied an author.
    pub source_only_gaps: usize,
    /// Re-imported records whose resolved non-hashed tags (authority / jurisdiction / provenance)
    /// differ from the stored tick. Ticks are immutable, so the difference is reported and never
    /// applied — surfaced so a corrected ruling is not invisibly dropped.
    pub discrepancies: usize,
}
392
393/// Map the store's existing decisions to their durable source key → (id, parent_id). The key is the
394/// derived dedup key of the non-hashed `source_ref` if present, else the first round/#N token in the
395/// hashed `observe` — never the non-hashed events log. The idempotency + re-link index for a backfill.
396fn store_key_index(
397    store: &Store,
398) -> Result<std::collections::HashMap<String, (String, String)>, String> {
399    let files = store
400        .read_all()
401        .map_err(|e| format!("reading store: {e}"))?;
402    let mut idx = std::collections::HashMap::new();
403    for (name, raw) in &files {
404        let key = store_key(raw);
405        let parent = raw
406            .get("parent_id")
407            .and_then(|x| x.as_str())
408            .unwrap_or("")
409            .to_string();
410        if let Some(k) = key {
411            idx.insert(k, (name.clone(), parent));
412        }
413    }
414    Ok(idx)
415}
416
417/// Run the idempotent backfill of `records` into the store at `repo`. Deterministic order: records
418/// are sorted by `source_key` first so a re-run replays the same chain. Idempotency is keyed on the
419/// durable `source_key` (the non-hashed `source_ref`'s derived key, or a token in the hashed `observe`): a record
420/// whose key is already in the store is SKIPPED — chain-position-independent, so a re-run over a
421/// now-non-empty store writes nothing. The chain is kept by threading the PROSPECTIVE parent (the
422/// id we just wrote/found) instead of re-reading the live HEAD each step, so the lineage stays
423/// stable across re-runs. A skipped record whose stored parent differs from where it would now land
424/// is a back-dated mid-chain insert and is reported as re-linked. `blame_fallback` supplies the
425/// author for a record carrying none; a record with neither is a source-only gap (R5 stays intact —
426/// we never invent an author). `jurisdiction_map` (source_key → A/B/C/D bucket) tags each imported
427/// decision: a record whose key is in the map carries that jurisdiction, one absent imports untagged
428/// (None) — so the map is purely additive (an empty map ⇒ every record None, the prior behavior).
429/// jurisdiction is NON-hashed, so tagging never moves a tick id (idempotency holds across re-runs).
430/// `--dry-run` reports the would-import count but writes nothing.
431pub fn backfill(
432    repo: &Path,
433    mut records: Vec<MigrationRecord>,
434    blame_fallback: Option<&str>,
435    jurisdiction_map: &HashMap<String, String>,
436    dry_run: bool,
437) -> Result<BackfillSummary, String> {
438    records.sort_by(|a, b| a.source_key.cmp(&b.source_key));
439    let store = Store::at(repo);
440    if !store.exists() {
441        return Err("no .evolving/ store here — run `ev init` first".into());
442    }
443    // The running source_key index, seeded from the store and EXTENDED as each record is written, so a
444    // WITHIN-pass duplicate key (two records — e.g. a gitlog R555 and a to-human R555 across two
445    // --source files — sharing a key but absent from the store) routes into the skip/report arm instead
446    // of silently double-importing. `initial_keys` remembers the seed so a within-pass duplicate is not
447    // misreported as a back-dated relink.
448    let mut existing = store_key_index(&store)?;
449    let initial_keys: std::collections::HashSet<String> = existing.keys().cloned().collect();
450    // The prospective parent threads through the loop so the chain stays coherent across this pass:
451    // for a brand-new store it begins at the live HEAD; as records resolve it advances to each id.
452    // For relink detection we compare a found record's STORED parent against where this sorted pass
453    // would place it (`prospective_parent`) — equal ⇒ the chain is intact (a clean re-run reports
454    // 0); different ⇒ the chain was re-linked around it (a back-dated mid-chain insert).
455    let head = store
456        .read_head()
457        .map_err(|e| format!("reading HEAD: {e}"))?;
458    // Seed the prospective parent: if the FIRST sorted record is already the genesis (stored
459    // parent ""), the pass replays from genesis; otherwise it extends the current HEAD.
460    let first_is_stored_genesis = records
461        .first()
462        .and_then(|r| existing.get(&r.source_key))
463        .map(|(_, p)| p.is_empty())
464        .unwrap_or(false);
465    let mut prospective_parent = if first_is_stored_genesis {
466        String::new()
467    } else {
468        head
469    };
470    let mut summary = BackfillSummary::default();
471    for r in records {
472        // Resolve the declared non-hashed tags the SAME way the write path does, BEFORE the skip
473        // check — so the idempotency-skip arm can compare them against the stored tick, and a
474        // jurisdiction conflict is caught whether or not the record is new. Inline jurisdiction WINS
475        // over the `--jurisdiction-map`; the map fills only a record that declares none; a record
476        // declaring a DIFFERENT bucket than the map is a hard error (two sources of truth disagree).
477        let jurisdiction = match (
478            r.jurisdiction.as_deref(),
479            jurisdiction_map.get(&r.source_key),
480        ) {
481            (Some(inline), Some(mapped)) if inline != mapped => {
482                return Err(format!(
483                    "source {:?}: inline jurisdiction {inline:?} conflicts with the --jurisdiction-map entry {mapped:?}",
484                    r.source_key
485                ));
486            }
487            (Some(inline), _) => Some(inline.to_string()),
488            (None, mapped) => mapped.cloned(),
489        };
490        let authority = r.authority.clone();
491        let source_ref = r.source_ref.clone();
492        // The migrate verb backfills HISTORY: a record with no declared provenance is stamped
493        // `imported`. An explicit value (a live runner emitting `agent-proposed` / `human-now`) wins.
494        // `ev decide` / `ev guard` never reach here, so fresh authorship is never stamped imported.
495        let provenance = r
496            .provenance
497            .clone()
498            .or_else(|| Some("imported".to_string()));
499
500        // Idempotency PRE-CHECK on the durable source_key (chain-position-independent).
501        if let Some((existing_id, existing_parent)) = existing.get(&r.source_key) {
502            // A back-dated mid-chain insert: present in the INITIAL store, but its stored parent differs
503            // from where this pass would now place it — the chain was re-linked around it. Reported,
504            // never rewritten. Gated on `initial_keys` so a within-pass duplicate (added to `existing`
505            // this pass) is not misreported as a relink.
506            if initial_keys.contains(&r.source_key) && *existing_parent != prospective_parent {
507                summary.relinked += 1;
508            }
509            // A re-import NEVER rewrites a tick (immutability). But if the record's RESOLVED non-hashed
510            // tags differ from the stored tick, that is a real faithfulness difference — SURFACE it
511            // loudly, never drop it silently (a silent skip of a corrected authority is the false-green
512            // ev exists to refuse). The human resolves it with `ev correct`. Mirrors the re-linked
513            // report: detect a difference on a present record, report it, never rewrite.
514            if let Ok(Some(stored)) = store.read_tick(existing_id) {
515                let diffs: Vec<String> = [
516                    ("authority", &stored.authority, &authority),
517                    ("jurisdiction", &stored.jurisdiction, &jurisdiction),
518                    ("provenance", &stored.provenance, &provenance),
519                ]
520                .iter()
521                .filter(|(_, s, i)| s != i)
522                .map(|(label, s, i)| format!("{label} stored={s:?} incoming={i:?}"))
523                .collect();
524                if !diffs.is_empty() {
525                    summary.discrepancies += 1;
526                    eprintln!(
527                        "discrepancy: source {:?} (tick {existing_id}): {} — NOT applied (ticks are immutable; resolve with `ev correct {existing_id}`)",
528                        r.source_key,
529                        diffs.join("; ")
530                    );
531                }
532            }
533            // Keep the chain coherent for any later records in this same pass.
534            prospective_parent = existing_id.clone();
535            summary.skipped += 1;
536            continue;
537        }
538        let blame = match r.blame.as_deref().or(blame_fallback) {
539            Some(b) if !b.trim().is_empty() => b.trim().to_string(),
540            _ => {
541                // R5 stays intact: no author, no fabrication. Surface the gap; never invent a human.
542                summary.source_only_gaps += 1;
543                continue;
544            }
545        };
546        // Ingest-boundary structural gates — the SAME refusals `ev verify` enforces at rest, applied at
547        // the door so a malformed record never lands. A C/D (detect-only) decision may carry no runnable
548        // Test check (one shared predicate with verify, so they cannot drift):
549        if crate::tick::detect_only_carries_test(jurisdiction.as_deref(), &r.grounds) {
550            return Err(format!(
551                "source {:?}: a {} jurisdiction (detect-only) decision cannot carry a runnable test check",
552                r.source_key,
553                jurisdiction.as_deref().unwrap_or("")
554            ));
555        }
556        // And a harvested check (a Test with no counter-test) is allowed ONLY for imported history — a
557        // fresh `agent-proposed` binding must prove falsifiability with a counter-test, exactly as decide.
558        for g in &r.grounds {
559            if let Some(crate::tick::Check::Test {
560                counter_test: None, ..
561            }) = &g.check
562            {
563                if provenance.as_deref() != Some("imported") {
564                    return Err(format!(
565                        "source {:?}: a harvested test check (no counter-test) is allowed only for imported history, not {}",
566                        r.source_key,
567                        provenance.as_deref().unwrap_or("human-now")
568                    ));
569                }
570            }
571        }
572        // A rejected-road TRIPWIRE (a Test check on a rejected: road) is a GATING capability, so the
573        // canonical door admits it under the SAME authoring rule decide/guard enforce: the decision
574        // must be authority=user-ruled (a human's deliberate closed-road ruling) AND the check must
575        // carry a counter-test (no harvested/non-falsifiable rejected-road tripwire — stricter than
576        // the general harvested gate above, which lets imported history harvest a chosen ground). This
577        // makes the user-ruled-only rule STRUCTURAL across every producer, closing the bypass where a
578        // hand-crafted canonical line slips a rejected-road check past ground_from_value's permissive
579        // parse. (verify stays permissive at rest: authority is mutable via `ev correct`, so a later
580        // re-tag must not retroactively invalidate a legitimately-authored tripwire.)
581        for g in &r.grounds {
582            if g.supports.starts_with("rejected:") {
583                if let Some(crate::tick::Check::Test { counter_test, .. }) = &g.check {
584                    if authority.as_deref() != Some("user-ruled") {
585                        return Err(format!(
586                            "source {:?}: a rejected road can carry a tripwire test only when authority=user-ruled",
587                            r.source_key
588                        ));
589                    }
590                    if counter_test.is_none() {
591                        return Err(format!(
592                            "source {:?}: a rejected-road tripwire requires a counter-test (no harvested tripwire)",
593                            r.source_key
594                        ));
595                    }
596                }
597            }
598        }
599        if dry_run {
600            // The id this record WOULD take at the prospective parent (no write). held_since is
601            // non-hashed, so this matches the id `append` computes on a real run — only the real
602            // path needs a write, so the probe lives here, not on the hot import path.
603            let probe = Tick {
604                id: String::new(),
605                parent_id: prospective_parent.clone(),
606                observe: r.observe.clone(),
607                decision: r.decision.clone(),
608                grounds: r.grounds.clone(),
609                status: "live".into(),
610                held_since: String::new(),
611                blame: blame.clone(),
612                authority: authority.clone(),
613                jurisdiction: jurisdiction.clone(),
614                source_ref: source_ref.clone(),
615                provenance: provenance.clone(),
616            };
617            let probe_id = compute_id(&probe);
618            // Extend the running index so a later same-key record this pass routes into the skip arm.
619            existing.insert(
620                r.source_key.clone(),
621                (probe_id.clone(), prospective_parent.clone()),
622            );
623            prospective_parent = probe_id;
624            summary.imported += 1;
625            continue;
626        }
627        let written = crate::capture::append(
628            repo,
629            Decision {
630                observe: r.observe,
631                decision: r.decision,
632                grounds: r.grounds,
633                blame,
634                authority,
635                jurisdiction,
636                source_ref,
637                provenance,
638            },
639        )?;
640        // Extend the running index so a later same-key record this pass routes into the skip arm
641        // (a within-pass duplicate is detected + reported, never silently double-imported). r.source_key
642        // (an owned field untouched by the partial move above) and prospective_parent move in directly.
643        existing.insert(r.source_key, (written.id.clone(), prospective_parent));
644        prospective_parent = written.id;
645        summary.imported += 1;
646    }
647    Ok(summary)
648}
649
/// Bucket counts produced by [`reconcile`] when joining a source's rulings against the store.
///
/// The first three buckets count DISTINCT keys (both sides are collected into sets before the
/// join); `un_keyable` counts store ticks. Keys come from the non-hashed `source_ref` or the hashed
/// `observe`, never from events.jsonl, so they are durable.
#[derive(Debug, Default, PartialEq)]
pub struct ReconcileReport {
    /// Source keys that are also present among the store's keys.
    pub in_both: usize,
    /// Source keys with no store match — the capture gap: a ruling the source has that the ledger
    /// never captured.
    pub source_only: usize,
    /// Store keys with no match in this source — in the ledger, absent from this source.
    pub store_only: usize,
    /// Store ticks for which no key could be derived at all (no round token in their hashed
    /// observe); counted separately and never joined.
    pub un_keyable: usize,
}
662
663/// Reconcile a source's extracted records against the store. The store-side key is read from each
664/// the derived key of its non-hashed `source_ref` if present, else the first round/#N token in the
665/// hashed `observe` — so the join is durable (NOT dependent on the events log). A source key with no store
666/// match is a SOURCE-ONLY gap (the capture gap to surface); a store key with no source match is
667/// STORE-ONLY; a store tick with no derivable key is counted separately as un-keyable.
668pub fn reconcile(
669    repo: &Path,
670    source_records: &[MigrationRecord],
671) -> Result<ReconcileReport, String> {
672    let store = Store::at(repo);
673    if !store.exists() {
674        return Err("no .evolving/ store here — run `ev init` first".into());
675    }
676    let files = store
677        .read_all()
678        .map_err(|e| format!("reading store: {e}"))?;
679    let mut store_keys: std::collections::HashSet<String> = std::collections::HashSet::new();
680    let mut un_keyable = 0usize;
681    for (_name, raw) in &files {
682        let key = store_key(raw);
683        match key {
684            Some(k) => {
685                store_keys.insert(k);
686            }
687            None => un_keyable += 1,
688        }
689    }
690    let source_keys: std::collections::HashSet<String> = source_records
691        .iter()
692        .map(|r| r.source_key.clone())
693        .collect();
694    let mut report = ReconcileReport {
695        un_keyable,
696        ..Default::default()
697    };
698    for k in &source_keys {
699        if store_keys.contains(k) {
700            report.in_both += 1;
701        } else {
702            report.source_only += 1;
703        }
704    }
705    report.store_only = store_keys
706        .iter()
707        .filter(|k| !source_keys.contains(*k))
708        .count();
709    Ok(report)
710}
711
/// The `--bind-check` harvest: build a harvested `Check::Test` (counter_test None, full liveness) for
/// the given selector, reusing the Task-5 migrate-only constructor. This is the SAME constructor the
/// harvested-binding path uses — no second half-harvest gate. The caller attaches it to a ground.
///
/// This function adds no logic of its own: all five arguments are forwarded, in order and
/// unchanged, to `harvested_test_check`, and that call's result is returned as-is.
///
/// # Errors
///
/// Whatever `harvested_test_check` returns as `Err` is propagated unchanged. Which inputs it
/// rejects is defined by that constructor, not here.
pub fn bind_check(
    selector: String,
    verified_at_sha: String,
    platforms: Vec<String>,
    triggered_by: Vec<String>,
    surfaces: Vec<String>,
) -> Result<crate::tick::Check, String> {
    // Pure delegation, so the migrate path cannot drift from the shared constructor.
    harvested_test_check(selector, verified_at_sha, platforms, triggered_by, surfaces)
}
724
// Unit tests for the pure extractors and the canonical intake reader. Each test follows a
// given / when / then layout. Negative canonical-reader tests always feed a line that is valid in
// every respect EXCEPT the field under test (in particular it carries a `source_ref`), so a
// rejection can only be attributed to that field.
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn extract_gitlog_should_yield_one_record_per_round_header_when_given_a_chat_room_log() {
        // given: a chat-room log with two `## R<N>` decision records, one carrying a rejected road
        let text = "\
## R2289 QA — restore-safety counter DB-backed
- rejected: Redis: would add a new infra dependency
## R2290 Dev — ship the cross-pod drain
some prose nobody parses for grounds
";

        // when: the gitlog extractor reads it
        let recs = extract_gitlog(text);

        // then: two records, keyed by their round token, the first carrying the structured road
        assert_eq!(recs.len(), 2);
        assert_eq!(recs[0].source_key, "R2289");
        assert_eq!(recs[0].decision, "QA — restore-safety counter DB-backed");
        assert_eq!(recs[0].grounds.len(), 1);
        assert_eq!(recs[0].grounds[0].supports, "rejected:Redis");
        assert_eq!(recs[1].source_key, "R2290");
        assert!(recs[0].observe.contains("R2289"));
    }

    #[test]
    fn extract_to_human_should_read_a_resolved_block_when_given_the_authority_substrate() {
        // given: a to-human doc with a RESOLVED ruling and a FLAG (open) one
        let text = "\
### RESOLVED R555: restore-safety counter DB-backed; reject Redis
- rejected: Redis: a new infra dependency
### FLAG R600: multi-pod relax policy still open
";

        // when: the to-human extractor reads it
        let recs = extract_to_human(text);

        // then: both blocks are captured; the RESOLVED one carries its structured road
        assert_eq!(recs.len(), 2);
        assert_eq!(recs[0].source_key, "R555");
        assert_eq!(
            recs[0].decision,
            "restore-safety counter DB-backed; reject Redis"
        );
        assert_eq!(recs[0].grounds.len(), 1);
        assert_eq!(recs[1].source_key, "R600");
    }

    #[test]
    fn extract_escalation_should_reuse_the_resolved_flag_reader_when_given_an_escalation_log() {
        // given: an escalation log in the SAME RESOLVED/FLAG shape (path-parameterized reader)
        let text = "### FLAG #1194: re-milestoned without sign-off\n";

        // when: the escalation extractor reads it
        let recs = extract_escalation(text);

        // then: it is read identically to to-human (no hardcoded layout of its own)
        assert_eq!(recs.len(), 1);
        assert_eq!(recs[0].source_key, "#1194");
        assert_eq!(recs[0].decision, "re-milestoned without sign-off");
    }

    #[test]
    fn extract_decisions_immutable_should_split_on_numbered_sections_when_given_a_doc() {
        // given: a decisions-immutable doc split into numbered sections
        let text = "\
## 1. freeze the retrieval schema for v2
- rejected: pgvector: would lock our schema
## 2. restore-safety counter DB-backed
";

        // when: the decisions-immutable extractor reads it
        let recs = extract_decisions_immutable(text);

        // then: one record per section, keyed by §N, the first carrying its structured road
        assert_eq!(recs.len(), 2);
        assert_eq!(recs[0].source_key, "§1");
        assert_eq!(recs[0].decision, "freeze the retrieval schema for v2");
        assert_eq!(recs[0].grounds.len(), 1);
        assert_eq!(recs[1].source_key, "§2");
    }

    #[test]
    fn grounds_are_never_synthesized_when_a_block_has_no_structured_rejected_road() {
        // given: a record whose body is pure prose mentioning a rejected option WITHOUT the
        // structured `rejected:<opt>: <why>` token — an NLP'able sentence we must NOT mine
        let text = "\
## R2289 we considered Redis but rejected it because it adds infra
this paragraph explains at length why redis was rejected, in prose
";

        // when: the gitlog extractor reads it
        let recs = extract_gitlog(text);

        // then: the record exists but carries ZERO grounds — reasons are never NLP'd into grounds
        assert_eq!(recs.len(), 1);
        assert!(
            recs[0].grounds.is_empty(),
            "a prose reason must NEVER become a ground (no synthesis)"
        );
    }

    // --- canonical intake reader (the trust boundary) ---

    fn canonical_line(extra: &str) -> String {
        // a minimal valid ev-decision-intake line (carrying a source_ref so it has a durable dedup
        // key), with room to splice in extra fields. Tests that OVERRIDE source_ref build inline.
        format!(
            "{{\"kind\":\"ev-decision-intake\",\"decision\":\"no Redis\",\"grounds\":[],\"source_ref\":\"R1\"{extra}}}"
        )
    }

    #[test]
    fn canonical_reader_should_parse_a_full_ruling_record_when_given_a_valid_line() {
        // given: a full ev-decision-intake ruling carrying every declared tag
        let text = "{\"kind\":\"ev-decision-intake\",\"decision\":\"rate-limit at the edge\",\
\"observe\":\"round R1043\",\"grounds\":[{\"claim\":\"edge sees every request\",\"supports\":\"chosen\"},\
{\"claim\":\"app tier double-counts\",\"supports\":\"rejected:app-tier\"}],\"blame\":\"Wang Yu\",\
\"authority\":\"user-ruled\",\"jurisdiction\":\"C\",\"source_ref\":\"R1043\",\"provenance\":\"imported\"}";

        // when: the canonical reader parses it
        let recs = canonical_records(text).expect("valid record");

        // then: every field maps onto the record, grounds re-parsed through the read-path validator
        assert_eq!(recs.len(), 1);
        let r = &recs[0];
        assert_eq!(r.decision, "rate-limit at the edge");
        assert_eq!(r.grounds.len(), 2);
        assert_eq!(r.grounds[1].supports, "rejected:app-tier");
        assert_eq!(r.blame.as_deref(), Some("Wang Yu"));
        assert_eq!(r.authority.as_deref(), Some("user-ruled"));
        assert_eq!(r.jurisdiction.as_deref(), Some("C"));
        assert_eq!(r.source_ref, Some(serde_json::json!("R1043")));
        assert_eq!(r.source_key, "R1043");
        assert_eq!(r.provenance.as_deref(), Some("imported"));
    }

    #[test]
    fn canonical_reader_should_reject_a_line_whose_kind_is_not_ev_decision_intake() {
        // given: a JSON line with the wrong envelope kind (a mis-piped non-intake file). It carries
        // a source_ref so it is keyable — the wrong kind is the ONLY thing wrong with it, and the
        // un-keyable-record refusal cannot make this test pass on its own.
        let text =
            "{\"kind\":\"something-else\",\"decision\":\"x\",\"grounds\":[],\"source_ref\":\"R1\"}";

        // when: the canonical reader parses it
        let result = canonical_records(text);

        // then: it loud-fails (the wire envelope is strict, not forward-compat-tolerant)
        assert!(result.is_err());
    }

    #[test]
    fn canonical_reader_should_reject_an_unknown_envelope_key() {
        // given: an otherwise-valid line carrying a key outside the closed envelope set
        let text = canonical_line(",\"emoji\":\"✅\"");

        // when: the canonical reader parses it
        let result = canonical_records(&text);

        // then: the unknown key is rejected at the door (no format bleeds into core)
        assert!(result.is_err());
    }

    #[test]
    fn canonical_reader_should_reject_a_malformed_ground_via_ground_from_value() {
        // given: a line whose ground has an invalid supports (not chosen / rejected:<opt>). It
        // carries a source_ref so it is keyable — the malformed ground is the ONLY thing wrong with
        // it, and the un-keyable-record refusal cannot make this test pass on its own.
        let text = "{\"kind\":\"ev-decision-intake\",\"decision\":\"x\",\
\"grounds\":[{\"claim\":\"c\",\"supports\":\"maybe\"}],\"source_ref\":\"R1\"}";

        // when: the canonical reader parses it
        let result = canonical_records(text);

        // then: it fails through the SAME read-path validator a stored tick uses (the trust boundary)
        assert!(result.is_err());
    }

    #[test]
    fn canonical_reader_should_import_zero_grounds_when_grounds_is_empty() {
        // given: a valid line with an empty grounds array (the honest zero-grounds capture, e.g. a FLAG)
        let text = canonical_line("");

        // when: the canonical reader parses it
        let recs = canonical_records(&text).expect("zero-grounds is first-class");

        // then: the record imports with no grounds (never synthesized)
        assert_eq!(recs.len(), 1);
        assert!(recs[0].grounds.is_empty());
    }

    #[test]
    fn canonical_reader_should_take_source_ref_verbatim_without_resniffing_tokens() {
        // given: a line whose source_ref is an opaque key and whose observe carries a DIFFERENT token
        let text = "{\"kind\":\"ev-decision-intake\",\"decision\":\"no Redis\",\"grounds\":[],\
\"observe\":\"see R2289\",\"source_ref\":\"ticket-42\"}";

        // when: the canonical reader parses it
        let recs = canonical_records(text).expect("valid");

        // then: source_ref and the dedup key are the verbatim source_ref — never re-sniffed from observe
        assert_eq!(recs[0].source_ref, Some(serde_json::json!("ticket-42")));
        assert_eq!(recs[0].source_key, "ticket-42");
    }

    #[test]
    fn canonical_reader_should_key_a_structured_source_ref_by_its_deterministic_json() {
        // given: a line whose source_ref is a STRUCTURED object (richer than a string)
        let text = "{\"kind\":\"ev-decision-intake\",\"decision\":\"no Redis\",\"grounds\":[],\
\"source_ref\":{\"round\":\"R1\",\"sprint\":\"S7\"}}";

        // when: the canonical reader parses it
        let recs = canonical_records(text).expect("valid");

        // then: the object is carried opaquely and the dedup key is its deterministic (sorted) JSON
        assert_eq!(
            recs[0].source_ref,
            Some(serde_json::json!({"round": "R1", "sprint": "S7"}))
        );
        assert_eq!(recs[0].source_key, "{\"round\":\"R1\",\"sprint\":\"S7\"}");
    }

    #[test]
    fn canonical_reader_should_skip_blank_and_comment_lines() {
        // given: a stream padded with a blank line and a #-comment around one record
        let text = format!("\n# a comment\n{}\n\n", canonical_line(""));

        // when: the canonical reader parses it
        let recs = canonical_records(&text).expect("valid");

        // then: only the real record is read (blank/comment lines are skipped, not errors)
        assert_eq!(recs.len(), 1);
    }

    #[test]
    fn canonical_reader_should_reject_a_record_with_no_source_ref_and_no_observe_token() {
        // given: a canonical line with NO source_ref and an observe carrying NO round/#issue token
        let text = "{\"kind\":\"ev-decision-intake\",\"decision\":\"x\",\"grounds\":[],\
\"observe\":\"no token here\"}";

        // when: the canonical reader parses it
        let result = canonical_records(text);

        // then: it is rejected — a record with no durable key cannot be re-imported idempotently
        assert!(
            result.is_err(),
            "an un-keyable record must be refused at the door"
        );
    }

    #[test]
    fn canonical_reader_should_reject_an_out_of_vocab_provenance() {
        // given: a line whose provenance is outside the closed vocabulary
        let text = canonical_line(",\"provenance\":\"self-asserted\"");

        // when: the canonical reader parses it
        let result = canonical_records(&text);

        // then: it fails (provenance is vocab-validated at the boundary, like jurisdiction/authority)
        assert!(result.is_err());
    }
}