ev/migrate.rs
1//! `ev migrate` — backfill an existing decision history into the ledger.
2//!
3//! Four PURE, format-aware extractors turn a source substrate (`&str`) into a `Vec<MigrationRecord>`:
4//! a chat-room/git log (`## R<N>` records), the `to-human` RESOLVED/FLAG markdown blocks (the
5//! authority substrate), a `decisions-immutable` §N document, and an `escalation` log (the SAME
6//! RESOLVED/FLAG reader, path-parameterized). The extractors parse **rulings + structured
7//! rejected-roads only** — they NEVER NLP a free-text reason into a ground (`grounds_are_never_
8//! synthesized`): a road becomes a ground iff the source declares it structurally (a `rejected:`
9//! token), otherwise the record carries zero grounds and stays an honest capture.
10//!
11//! The command driver then runs an IDEMPOTENT backfill loop (deterministic source_key sort →
12//! prospective-parent compute_id → ticks_dir pre-check → skip-if-present) on top of the shared
13//! `capture::append`, plus a `--reconcile` join and a `--bind-check` harvest.
14
15use crate::canonical::compute_id;
16use crate::capture::{harvested_test_check, Decision};
17use crate::store::Store;
18use crate::tick::{Ground, Tick};
19use std::collections::HashMap;
20use std::path::Path;
21
22/// One extracted, not-yet-appended decision from a source substrate. `source_key` is the stable,
23/// deterministic dedup/sort key (e.g. `R2289`, `#555`, `§3`) used to order the backfill and to
24/// reconcile against the store; `observe` carries that key as a durable token so reconcile can read
25/// it back from the HASHED payload, not from the events log. Grounds are ONLY the structurally
26/// declared rejected-roads — never synthesized from prose.
27#[derive(Debug, Clone, PartialEq)]
28pub struct MigrationRecord {
29 pub source_key: String,
30 pub decision: String,
31 pub observe: String,
32 pub blame: Option<String>,
33 pub grounds: Vec<Ground>,
34}
35
36/// A `#<n>` / `R<n>` provenance token (issue or round id), leading-char + all-digits. Mirrors the
37/// `subject_refs` vocabulary in capture.rs but returns the FIRST `R<n>`/`#<n>` as a stable key.
38fn first_round_or_issue_token(text: &str) -> Option<String> {
39 text.split(|c: char| !(c.is_ascii_alphanumeric() || c == '#'))
40 .find(|tok| {
41 let rest = tok
42 .strip_prefix('#')
43 .or_else(|| tok.strip_prefix('R'))
44 .or_else(|| tok.strip_prefix('r'));
45 matches!(rest, Some(d) if !d.is_empty() && d.bytes().all(|b| b.is_ascii_digit()))
46 })
47 .map(|t| t.to_string())
48}
49
50/// Parse the structurally-declared rejected-roads out of a block's lines. A road is declared ONLY by
51/// an explicit `rejected: <option>: <why>` (or `reject <option>: <why>`) line — never inferred from
52/// prose. Returns one `rejected:<option>` ground per declared road, in source order. A block with no
53/// such line yields zero grounds (the honesty contract: no synthesis).
54fn structured_rejected_roads(block: &str) -> Vec<Ground> {
55 let mut out = Vec::new();
56 for line in block.lines() {
57 let l = line.trim_start_matches(['-', '*', ' ', '\t']).trim();
58 let body = l
59 .strip_prefix("rejected:")
60 .or_else(|| l.strip_prefix("rejected "))
61 .or_else(|| l.strip_prefix("reject:"))
62 .or_else(|| l.strip_prefix("reject "));
63 if let Some(rest) = body {
64 if let Some((opt, why)) = rest.split_once(':') {
65 let (opt, why) = (opt.trim(), why.trim());
66 if !opt.is_empty() && !why.is_empty() {
67 out.push(Ground {
68 claim: why.to_string(),
69 supports: format!("rejected:{opt}"),
70 check: None,
71 });
72 }
73 }
74 }
75 }
76 out
77}
78
79/// Build one MigrationRecord from a parsed (key, decision) header + its block body: observe carries the
80/// source_key as durable provenance, grounds are the structurally-declared rejected-roads only (never
81/// synthesized), blame is left for the backfill's `--blame` fallback. Shared by all three block extractors.
82fn flush_record(header: &Option<(String, String)>, body: &str, out: &mut Vec<MigrationRecord>) {
83 if let Some((key, decision)) = header {
84 out.push(MigrationRecord {
85 source_key: key.clone(),
86 decision: decision.clone(),
87 observe: key.clone(),
88 blame: None,
89 grounds: structured_rejected_roads(body),
90 });
91 }
92}
93
94/// The store-side durable key for a tick: its `round_id` if present, else the first round/`#<n>` token
95/// in the hashed `observe` — never the non-hashed events log. Shared by the idempotency index + reconcile,
96/// so the two never disagree on key precedence.
97fn store_key(raw: &serde_json::Value) -> Option<String> {
98 raw.get("round_id")
99 .and_then(|x| x.as_str())
100 .map(|s| s.to_string())
101 .or_else(|| {
102 raw.get("observe")
103 .and_then(|x| x.as_str())
104 .and_then(first_round_or_issue_token)
105 })
106}
107
108/// Extractor 1 — **gitlog / chat-room**: each `## R<N> …` header is one decision; the header text
109/// after the round token (and an optional `— ` em-dash separator) is the decision; any structurally
110/// declared rejected-road line in that record's body becomes a ground. The `R<N>`/`#<n>` token is the
111/// source_key and is carried into observe as a durable provenance token. Reasons are NEVER NLP'd.
112pub fn extract_gitlog(text: &str) -> Vec<MigrationRecord> {
113 let mut records = Vec::new();
114 let mut header: Option<(String, String)> = None; // (source_key, decision)
115 let mut body = String::new();
116 for line in text.lines() {
117 if let Some(rest) = line.strip_prefix("## ") {
118 flush_record(&header, &body, &mut records);
119 body.clear();
120 let key = first_round_or_issue_token(rest);
121 // The decision text is the header with the leading round token stripped + em-dash trimmed.
122 let decision = match key.as_deref() {
123 Some(k) => rest
124 .split_once(k)
125 .map(|x| x.1)
126 .unwrap_or(rest)
127 .trim_start_matches([' ', '—', '-', ':'])
128 .trim()
129 .to_string(),
130 None => rest.trim().to_string(),
131 };
132 header = key.map(|k| {
133 (
134 k,
135 if decision.is_empty() {
136 rest.trim().into()
137 } else {
138 decision
139 },
140 )
141 });
142 } else {
143 body.push_str(line);
144 body.push('\n');
145 }
146 }
147 flush_record(&header, &body, &mut records);
148 records
149}
150
151/// The shared RESOLVED / FLAG block reader (the authority substrate). A `### RESOLVED <key>: <decision>`
152/// or `### FLAG <key>: <decision>` header opens a block; the block's body is scanned for structured
153/// rejected-roads only. RESOLVED marks a user-ruled decision; FLAG marks an open one — both are
154/// captured (the ruling state is provenance, not a reason to drop the record). PATH-PARAMETERIZED by
155/// the caller: `to-human` and `escalation` are the SAME reader over different files (no hardcoded
156/// layout). Returns records in source order.
157fn read_resolved_flag_blocks(text: &str) -> Vec<MigrationRecord> {
158 let mut records = Vec::new();
159 let mut header: Option<(String, String)> = None;
160 let mut body = String::new();
161 for line in text.lines() {
162 let stripped = line
163 .trim_start_matches(['#', ' '])
164 .strip_prefix("RESOLVED")
165 .or_else(|| line.trim_start_matches(['#', ' ']).strip_prefix("FLAG"));
166 if let Some(rest) = stripped {
167 flush_record(&header, &body, &mut records);
168 body.clear();
169 let rest = rest.trim();
170 // `<key>: <decision>` — the key is the leading token before the first colon.
171 if let Some((key, decision)) = rest.split_once(':') {
172 let key = key.trim();
173 let source_key = first_round_or_issue_token(key).unwrap_or_else(|| key.to_string());
174 header = Some((source_key, decision.trim().to_string()));
175 } else {
176 let source_key =
177 first_round_or_issue_token(rest).unwrap_or_else(|| rest.to_string());
178 header = Some((source_key, rest.to_string()));
179 }
180 } else {
181 body.push_str(line);
182 body.push('\n');
183 }
184 }
185 flush_record(&header, &body, &mut records);
186 records
187}
188
189/// Extractor 2 — **to-human**: the RESOLVED/FLAG markdown blocks (the authority substrate).
190pub fn extract_to_human(text: &str) -> Vec<MigrationRecord> {
191 read_resolved_flag_blocks(text)
192}
193
194/// Extractor 4 — **escalation**: the SAME RESOLVED/FLAG reader, path-parameterized — escalation is
195/// just the reader over a different file, with NO hardcoded layout of its own.
196pub fn extract_escalation(text: &str) -> Vec<MigrationRecord> {
197 read_resolved_flag_blocks(text)
198}
199
200/// Extractor 3 — **decisions-immutable**: a document split on `## N.` / `## §N` section headers, one
201/// decision per numbered section. The section number is the source_key; the header text after the
202/// number is the decision; structured rejected-roads in the section body become grounds.
203pub fn extract_decisions_immutable(text: &str) -> Vec<MigrationRecord> {
204 let mut records = Vec::new();
205 let mut header: Option<(String, String)> = None;
206 let mut body = String::new();
207 for line in text.lines() {
208 if let Some(rest) = line.strip_prefix("## ") {
209 // A numbered section header: `## 3. <decision>` or `## §3 <decision>`.
210 let rest = rest.trim();
211 let digits: String = rest
212 .trim_start_matches('§')
213 .chars()
214 .take_while(|c| c.is_ascii_digit())
215 .collect();
216 if !digits.is_empty() {
217 flush_record(&header, &body, &mut records);
218 body.clear();
219 let decision = rest
220 .trim_start_matches('§')
221 .trim_start_matches(|c: char| c.is_ascii_digit())
222 .trim_start_matches(['.', ' ', ':', '—', '-'])
223 .trim()
224 .to_string();
225 header = Some((format!("§{digits}"), decision));
226 continue;
227 }
228 }
229 body.push_str(line);
230 body.push('\n');
231 }
232 flush_record(&header, &body, &mut records);
233 records
234}
235
236/// The outcome of one backfill pass (idempotent): how many records were imported, skipped (already
237/// present by content-addressed id), re-linked (a back-dated mid-chain insert that re-parented), and
238/// how many were source-only gaps that could not be appended (e.g. a source lacking authors with no
239/// `--blame` fallback). Rendered by the command layer.
240#[derive(Debug, Default, PartialEq)]
241pub struct BackfillSummary {
242 pub imported: usize,
243 pub skipped: usize,
244 pub relinked: usize,
245 pub source_only_gaps: usize,
246}
247
248/// Map the store's existing decisions to their durable source key → (id, parent_id). The key is read
249/// from the HASHED payload: `round_id` if present, else the first round/#N token in `observe` — never
250/// from the non-hashed events log. This is the idempotency + re-link index for a backfill pass.
251fn store_key_index(
252 store: &Store,
253) -> Result<std::collections::HashMap<String, (String, String)>, String> {
254 let files = store
255 .read_all()
256 .map_err(|e| format!("reading store: {e}"))?;
257 let mut idx = std::collections::HashMap::new();
258 for (name, raw) in &files {
259 let key = store_key(raw);
260 let parent = raw
261 .get("parent_id")
262 .and_then(|x| x.as_str())
263 .unwrap_or("")
264 .to_string();
265 if let Some(k) = key {
266 idx.insert(k, (name.clone(), parent));
267 }
268 }
269 Ok(idx)
270}
271
272/// Run the idempotent backfill of `records` into the store at `repo`. Deterministic order: records
273/// are sorted by `source_key` first so a re-run replays the same chain. Idempotency is keyed on the
274/// durable `source_key` (carried into the hashed `observe` + the non-hashed `round_id`): a record
275/// whose key is already in the store is SKIPPED — chain-position-independent, so a re-run over a
276/// now-non-empty store writes nothing. The chain is kept by threading the PROSPECTIVE parent (the
277/// id we just wrote/found) instead of re-reading the live HEAD each step, so the lineage stays
278/// stable across re-runs. A skipped record whose stored parent differs from where it would now land
279/// is a back-dated mid-chain insert and is reported as re-linked. `blame_fallback` supplies the
280/// author for a record carrying none; a record with neither is a source-only gap (R5 stays intact —
281/// we never invent an author). `jurisdiction_map` (source_key → A/B/C/D bucket) tags each imported
282/// decision: a record whose key is in the map carries that jurisdiction, one absent imports untagged
283/// (None) — so the map is purely additive (an empty map ⇒ every record None, the prior behavior).
284/// jurisdiction is NON-hashed, so tagging never moves a tick id (idempotency holds across re-runs).
285/// `--dry-run` reports the would-import count but writes nothing.
286pub fn backfill(
287 repo: &Path,
288 mut records: Vec<MigrationRecord>,
289 blame_fallback: Option<&str>,
290 jurisdiction_map: &HashMap<String, String>,
291 dry_run: bool,
292) -> Result<BackfillSummary, String> {
293 records.sort_by(|a, b| a.source_key.cmp(&b.source_key));
294 let store = Store::at(repo);
295 if !store.exists() {
296 return Err("no .evolving/ store here — run `ev init` first".into());
297 }
298 let existing = store_key_index(&store)?;
299 // The prospective parent threads through the loop so the chain stays coherent across this pass:
300 // for a brand-new store it begins at the live HEAD; as records resolve it advances to each id.
301 // For relink detection we compare a found record's STORED parent against where this sorted pass
302 // would place it (`prospective_parent`) — equal ⇒ the chain is intact (a clean re-run reports
303 // 0); different ⇒ the chain was re-linked around it (a back-dated mid-chain insert).
304 let head = store
305 .read_head()
306 .map_err(|e| format!("reading HEAD: {e}"))?;
307 // Seed the prospective parent: if the FIRST sorted record is already the genesis (stored
308 // parent ""), the pass replays from genesis; otherwise it extends the current HEAD.
309 let first_is_stored_genesis = records
310 .first()
311 .and_then(|r| existing.get(&r.source_key))
312 .map(|(_, p)| p.is_empty())
313 .unwrap_or(false);
314 let mut prospective_parent = if first_is_stored_genesis {
315 String::new()
316 } else {
317 head
318 };
319 let mut summary = BackfillSummary::default();
320 for r in records {
321 // Idempotency PRE-CHECK on the durable source_key (chain-position-independent).
322 if let Some((existing_id, existing_parent)) = existing.get(&r.source_key) {
323 // A back-dated mid-chain insert: present, but its stored parent differs from where this
324 // pass would now place it — the chain was re-linked around it. Reported, never rewritten.
325 if *existing_parent != prospective_parent {
326 summary.relinked += 1;
327 }
328 // Keep the chain coherent for any later records in this same pass.
329 prospective_parent = existing_id.clone();
330 summary.skipped += 1;
331 continue;
332 }
333 let blame = match r.blame.as_deref().or(blame_fallback) {
334 Some(b) if !b.trim().is_empty() => b.trim().to_string(),
335 _ => {
336 // R5 stays intact: no author, no fabrication. Surface the gap; never invent a human.
337 summary.source_only_gaps += 1;
338 continue;
339 }
340 };
341 if dry_run {
342 // The id this record WOULD take at the prospective parent (no write). held_since is
343 // non-hashed, so this matches the id `append` computes on a real run — only the real
344 // path needs a write, so the probe lives here, not on the hot import path.
345 let probe = Tick {
346 id: String::new(),
347 parent_id: prospective_parent.clone(),
348 observe: r.observe.clone(),
349 decision: r.decision.clone(),
350 grounds: r.grounds.clone(),
351 status: "live".into(),
352 held_since: String::new(),
353 blame: blame.clone(),
354 authority: None,
355 jurisdiction: jurisdiction_map.get(&r.source_key).cloned(),
356 round_id: Some(r.source_key.clone()),
357 };
358 prospective_parent = compute_id(&probe);
359 summary.imported += 1;
360 continue;
361 }
362 let jurisdiction = jurisdiction_map.get(&r.source_key).cloned();
363 let written = crate::capture::append(
364 repo,
365 Decision {
366 observe: r.observe,
367 decision: r.decision,
368 grounds: r.grounds,
369 blame,
370 authority: None,
371 jurisdiction,
372 round_id: Some(r.source_key),
373 },
374 )?;
375 prospective_parent = written.id;
376 summary.imported += 1;
377 }
378 Ok(summary)
379}
380
381/// A reconcile bucket count: how many source rulings are IN BOTH the source and the store, how many
382/// are SOURCE-ONLY (the capture gap — a ruling the source has that the ledger never captured), how
383/// many are STORE-ONLY (in the ledger, absent from this source), and how many store ticks could not
384/// be keyed at all (no round token in their hashed observe). Keys come from the HASHED `observe` /
385/// `round_id`, never from events.jsonl, so they are durable.
386#[derive(Debug, Default, PartialEq)]
387pub struct ReconcileReport {
388 pub in_both: usize,
389 pub source_only: usize,
390 pub store_only: usize,
391 pub un_keyable: usize,
392}
393
394/// Reconcile a source's extracted records against the store. The store-side key is read from each
395/// tick's HASHED payload — its `round_id` if present, else the first round/#N token in `observe` —
396/// so the join is durable (NOT dependent on the non-hashed events log). A source key with no store
397/// match is a SOURCE-ONLY gap (the capture gap to surface); a store key with no source match is
398/// STORE-ONLY; a store tick with no derivable key is counted separately as un-keyable.
399pub fn reconcile(
400 repo: &Path,
401 source_records: &[MigrationRecord],
402) -> Result<ReconcileReport, String> {
403 let store = Store::at(repo);
404 if !store.exists() {
405 return Err("no .evolving/ store here — run `ev init` first".into());
406 }
407 let files = store
408 .read_all()
409 .map_err(|e| format!("reading store: {e}"))?;
410 let mut store_keys: std::collections::HashSet<String> = std::collections::HashSet::new();
411 let mut un_keyable = 0usize;
412 for (_name, raw) in &files {
413 let key = store_key(raw);
414 match key {
415 Some(k) => {
416 store_keys.insert(k);
417 }
418 None => un_keyable += 1,
419 }
420 }
421 let source_keys: std::collections::HashSet<String> = source_records
422 .iter()
423 .map(|r| r.source_key.clone())
424 .collect();
425 let mut report = ReconcileReport {
426 un_keyable,
427 ..Default::default()
428 };
429 for k in &source_keys {
430 if store_keys.contains(k) {
431 report.in_both += 1;
432 } else {
433 report.source_only += 1;
434 }
435 }
436 report.store_only = store_keys
437 .iter()
438 .filter(|k| !source_keys.contains(*k))
439 .count();
440 Ok(report)
441}
442
443/// The `--bind-check` harvest: build a harvested `Check::Test` (counter_test None, full liveness) for
444/// the given selector, reusing the Task-5 migrate-only constructor. This is the SAME constructor the
445/// harvested-binding path uses — no second half-harvest gate. The caller attaches it to a ground.
446pub fn bind_check(
447 selector: String,
448 verified_at_sha: String,
449 platforms: Vec<String>,
450 triggered_by: Vec<String>,
451 surfaces: Vec<String>,
452) -> Result<crate::tick::Check, String> {
453 harvested_test_check(selector, verified_at_sha, platforms, triggered_by, surfaces)
454}
455
456#[cfg(test)]
457mod tests {
458 use super::*;
459
460 #[test]
461 fn extract_gitlog_should_yield_one_record_per_round_header_when_given_a_chat_room_log() {
462 // given: a chat-room log with two `## R<N>` decision records, one carrying a rejected road
463 let text = "\
464## R2289 QA — restore-safety counter DB-backed
465- rejected: Redis: would add a new infra dependency
466## R2290 Dev — ship the cross-pod drain
467some prose nobody parses for grounds
468";
469
470 // when: the gitlog extractor reads it
471 let recs = extract_gitlog(text);
472
473 // then: two records, keyed by their round token, the first carrying the structured road
474 assert_eq!(recs.len(), 2);
475 assert_eq!(recs[0].source_key, "R2289");
476 assert_eq!(recs[0].decision, "QA — restore-safety counter DB-backed");
477 assert_eq!(recs[0].grounds.len(), 1);
478 assert_eq!(recs[0].grounds[0].supports, "rejected:Redis");
479 assert_eq!(recs[1].source_key, "R2290");
480 assert!(recs[0].observe.contains("R2289"));
481 }
482
483 #[test]
484 fn extract_to_human_should_read_a_resolved_block_when_given_the_authority_substrate() {
485 // given: a to-human doc with a RESOLVED ruling and a FLAG (open) one
486 let text = "\
487### RESOLVED R555: restore-safety counter DB-backed; reject Redis
488- rejected: Redis: a new infra dependency
489### FLAG R600: multi-pod relax policy still open
490";
491
492 // when: the to-human extractor reads it
493 let recs = extract_to_human(text);
494
495 // then: both blocks are captured; the RESOLVED one carries its structured road
496 assert_eq!(recs.len(), 2);
497 assert_eq!(recs[0].source_key, "R555");
498 assert_eq!(
499 recs[0].decision,
500 "restore-safety counter DB-backed; reject Redis"
501 );
502 assert_eq!(recs[0].grounds.len(), 1);
503 assert_eq!(recs[1].source_key, "R600");
504 }
505
506 #[test]
507 fn extract_escalation_should_reuse_the_resolved_flag_reader_when_given_an_escalation_log() {
508 // given: an escalation log in the SAME RESOLVED/FLAG shape (path-parameterized reader)
509 let text = "### FLAG #1194: re-milestoned without sign-off\n";
510
511 // when: the escalation extractor reads it
512 let recs = extract_escalation(text);
513
514 // then: it is read identically to to-human (no hardcoded layout of its own)
515 assert_eq!(recs.len(), 1);
516 assert_eq!(recs[0].source_key, "#1194");
517 assert_eq!(recs[0].decision, "re-milestoned without sign-off");
518 }
519
520 #[test]
521 fn extract_decisions_immutable_should_split_on_numbered_sections_when_given_a_doc() {
522 // given: a decisions-immutable doc split into numbered sections
523 let text = "\
524## 1. freeze the retrieval schema for v2
525- rejected: pgvector: would lock our schema
526## 2. restore-safety counter DB-backed
527";
528
529 // when: the decisions-immutable extractor reads it
530 let recs = extract_decisions_immutable(text);
531
532 // then: one record per section, keyed by §N, the first carrying its structured road
533 assert_eq!(recs.len(), 2);
534 assert_eq!(recs[0].source_key, "§1");
535 assert_eq!(recs[0].decision, "freeze the retrieval schema for v2");
536 assert_eq!(recs[0].grounds.len(), 1);
537 assert_eq!(recs[1].source_key, "§2");
538 }
539
540 #[test]
541 fn grounds_are_never_synthesized_when_a_block_has_no_structured_rejected_road() {
542 // given: a record whose body is pure prose mentioning a rejected option WITHOUT the
543 // structured `rejected:<opt>: <why>` token — an NLP'able sentence we must NOT mine
544 let text = "\
545## R2289 we considered Redis but rejected it because it adds infra
546this paragraph explains at length why redis was rejected, in prose
547";
548
549 // when: the gitlog extractor reads it
550 let recs = extract_gitlog(text);
551
552 // then: the record exists but carries ZERO grounds — reasons are never NLP'd into grounds
553 assert_eq!(recs.len(), 1);
554 assert!(
555 recs[0].grounds.is_empty(),
556 "a prose reason must NEVER become a ground (no synthesis)"
557 );
558 }
559}