lex_vcs/migrate.rs
1//! Operation-format migration (#244).
2//!
3//! When [`crate::OperationFormat`] gains a new variant, every existing
4//! `OpId` rotates: the canonical pre-image bytes change, so the
5//! SHA-256 digest changes. A naïve in-place rewrite would also
6//! invalidate every parent reference in dependent ops.
7//!
8//! This module computes a [`MigrationPlan`] — a topologically-ordered
9//! list of [`MigrationStep`]s, one per op, each containing the new
10//! `OpId` and the new [`OperationRecord`] with parents already
11//! remapped — and then writes the new files in a two-phase
12//! `apply_migration`: write-new, then delete-old. The intermediate
13//! state has both old and new files coexisting (each consistent
14//! within its own version), so a crash mid-migration leaves the
15//! store readable.
16//!
17//! # What's in scope
18//!
19//! - The op log under `<root>/ops/<op_id>.json`. Parents are remapped
20//! transitively; the topological sort guarantees a parent is
21//! migrated before any child.
22//!
23//! # What's NOT in scope (yet)
24//!
25//! - **Branch heads** (`<root>/branches/<name>.json`) reference op_ids.
26//! The CLI ([`lex store migrate-ops`](../../../lex_cli/index.html))
27//! walks the branch directory after `apply_migration` and rewrites
28//! each branch's `head_op` through the returned mapping.
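//! - **Attestations** carry an `op_id` field and their own `attestation_id`
//!   is computed including that op_id, so they cascade. [`apply_migration`]
//!   does not touch them; the cascade is planned by
//!   [`plan_attestation_migration`] (fed with [`MigrationPlan::mapping`])
//!   and applied by [`apply_attestation_migration`] (#258).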
32//! - **Intents** don't reference op_ids; ops reference intents. No
33//! action needed here.
34
35use crate::op_log::OpLog;
36use crate::operation::{OpId, Operation, OperationFormat, OperationRecord};
37use std::collections::{BTreeMap, BTreeSet, VecDeque};
38use std::io;
39
40/// Describes the work needed to move an op log from one canonical
41/// form to another. Built by [`plan_migration`]; consumed by
42/// [`apply_migration`].
43#[derive(Debug, Clone, PartialEq, Eq)]
44pub struct MigrationPlan {
45 pub from: OperationFormat,
46 pub to: OperationFormat,
47 /// Steps in topological order — every step's parents have
48 /// already appeared earlier in the list, so applying in order
49 /// keeps the partial DAG self-consistent.
50 pub steps: Vec<MigrationStep>,
51}
52
53/// One op's migration plan.
54#[derive(Debug, Clone, PartialEq, Eq)]
55pub struct MigrationStep {
56 pub old_op_id: OpId,
57 pub new_op_id: OpId,
58 /// The [`OperationRecord`] that will replace `old_op_id`'s file.
59 /// Its `op.parents` already reference the *new* op_ids; its
60 /// `op_id` field equals `new_op_id`; its `format_version` equals
61 /// the plan's `to` field.
62 pub new_record: OperationRecord,
63}
64
65impl MigrationPlan {
66 /// `true` if every step's old and new `op_id` agree — applying
67 /// the plan would be a no-op. Note that `from == to` does **not**
68 /// imply this: tests inject custom encoders that produce
69 /// different bytes for the same source format, in which case
70 /// `from == to == V1` but the migration is meaningful.
71 pub fn is_no_op(&self) -> bool {
72 self.steps.iter().all(|s| s.old_op_id == s.new_op_id)
73 }
74
75 /// Old → new op_id mapping in deterministic order.
76 pub fn mapping(&self) -> BTreeMap<OpId, OpId> {
77 self.steps
78 .iter()
79 .map(|s| (s.old_op_id.clone(), s.new_op_id.clone()))
80 .collect()
81 }
82}
83
84/// Plan a migration to `target`, using the encoder paired with that
85/// format. Production callers want this; tests that need to inject a
86/// different encoder (to simulate a future variant without adding it
87/// to the production enum) should use [`plan_migration_with_encoder`].
88pub fn plan_migration(log: &OpLog, target: OperationFormat) -> io::Result<MigrationPlan> {
89 plan_migration_with_encoder(log, target, |op| op.canonical_bytes_in(target))
90}
91
92/// Plan a migration with a custom canonical encoder. Used by the
93/// conformance test in `tests/migrate.rs` to simulate a hypothetical
94/// V2 by adding a synthetic suffix to V1's pre-image, without
95/// requiring a placeholder variant in production code.
96pub fn plan_migration_with_encoder<F>(
97 log: &OpLog,
98 target: OperationFormat,
99 encoder: F,
100) -> io::Result<MigrationPlan>
101where
102 F: Fn(&Operation) -> Vec<u8>,
103{
104 let all = log.list_all()?;
105 let mut by_id: BTreeMap<OpId, OperationRecord> = BTreeMap::new();
106 for rec in all {
107 by_id.insert(rec.op_id.clone(), rec);
108 }
109 let topo = topological_sort(&by_id)?;
110 let from = detect_from_format(&by_id);
111
112 let mut mapping: BTreeMap<OpId, OpId> = BTreeMap::new();
113 let mut steps = Vec::with_capacity(topo.len());
114 for old_id in topo {
115 let rec = by_id
116 .get(&old_id)
117 .ok_or_else(|| io::Error::other(format!("op {old_id} disappeared during migration")))?
118 .clone();
119 let remapped_parents: Vec<OpId> = rec
120 .op
121 .parents
122 .iter()
123 .map(|p| {
124 mapping
125 .get(p)
126 .cloned()
127 // A parent without a mapping is a parent we
128 // didn't see during list_all — treat as a
129 // dangling reference and preserve the original
130 // string. The apply step will surface this as a
131 // broken DAG when it tries to chase the missing
132 // op file.
133 .unwrap_or_else(|| p.clone())
134 })
135 .collect();
136 let new_op = Operation {
137 kind: rec.op.kind.clone(),
138 parents: remapped_parents,
139 intent_id: rec.op.intent_id.clone(),
140 };
141 let new_bytes = encoder(&new_op);
142 let new_op_id = crate::canonical::hash_bytes(&new_bytes);
143 let new_record = OperationRecord {
144 op_id: new_op_id.clone(),
145 format_version: target,
146 op: new_op,
147 produces: rec.produces.clone(),
148 };
149 mapping.insert(old_id.clone(), new_op_id.clone());
150 steps.push(MigrationStep {
151 old_op_id: old_id,
152 new_op_id,
153 new_record,
154 });
155 }
156
157 Ok(MigrationPlan {
158 from,
159 to: target,
160 steps,
161 })
162}
163
164/// Apply a [`MigrationPlan`] to a live op log. Two-phase:
165///
166/// 1. Write every new `<new_op_id>.json` file. Idempotent — pre-
167/// existing files (including the rare case where new == old) are
168/// no-ops.
169/// 2. Delete every old `<old_op_id>.json` file whose new id is
170/// different.
171///
172/// On crash between phases the store is double-sized but readable;
173/// re-running the plan converges. **Branch heads and attestations
174/// are not rewritten by this function** — see module docs.
175pub fn apply_migration(log: &OpLog, plan: &MigrationPlan) -> io::Result<()> {
176 if plan.is_no_op() {
177 return Ok(());
178 }
179
180 // Phase 1: write all new records.
181 for step in &plan.steps {
182 log.put(&step.new_record)?;
183 }
184
185 // Phase 2: delete old files whose ids changed. We collect the
186 // set of new ids first so we never delete a file that's also
187 // a new file (the `new == old` case).
188 let new_ids: BTreeSet<&OpId> = plan.steps.iter().map(|s| &s.new_op_id).collect();
189 for step in &plan.steps {
190 if step.old_op_id != step.new_op_id && !new_ids.contains(&step.old_op_id) {
191 log.delete(&step.old_op_id)?;
192 }
193 }
194
195 Ok(())
196}
197
198// ---------------------------------------------- attestation cascade (#258)
199
200/// Plan + apply step for one attestation whose `op_id` is in the
201/// op-migration mapping (#258).
202#[derive(Debug, Clone, PartialEq, Eq)]
203pub struct AttestationMigrationStep {
204 pub old: crate::attestation::Attestation,
205 pub new: crate::attestation::Attestation,
206}
207
208impl AttestationMigrationStep {
209 /// `true` when nothing actually rotated — the new and old
210 /// `attestation_id`s match, so apply is a no-op.
211 pub fn is_no_op(&self) -> bool {
212 self.old.attestation_id == self.new.attestation_id
213 }
214}
215
216/// Plan an attestation migration that follows an op-id rotation.
217/// `op_mapping` is `MigrationPlan::mapping()` — old op_id → new
218/// op_id. For every attestation whose `op_id` is in the mapping
219/// (i.e. the op got rotated), build a new attestation pointing at
220/// the new op_id and re-derive its `attestation_id` from the
221/// canonical form.
222///
223/// Attestations whose `op_id` is `None` (`Override`,
224/// `ProducerBlock`, `Defer`, etc.) are unaffected — they don't
225/// participate in the cascade.
226///
227/// Steps are returned in deterministic order (sorted by old
228/// `attestation_id`), independent of filesystem listing order, so
229/// `--dry-run` output is reproducible.
230pub fn plan_attestation_migration(
231 log: &crate::attestation::AttestationLog,
232 op_mapping: &BTreeMap<OpId, OpId>,
233) -> io::Result<Vec<AttestationMigrationStep>> {
234 if op_mapping.is_empty() {
235 return Ok(Vec::new());
236 }
237 let mut all = log.list_all()?;
238 all.sort_by(|a, b| a.attestation_id.cmp(&b.attestation_id));
239 let mut steps = Vec::new();
240 for old in all {
241 let Some(old_op_id) = old.op_id.as_ref() else { continue };
242 let Some(new_op_id) = op_mapping.get(old_op_id) else { continue };
243 if old_op_id == new_op_id {
244 // Mapping recorded but nothing rotated for this op
245 // (e.g. plan.is_no_op()); skip rather than emit a
246 // pointless step.
247 continue;
248 }
249 // Re-derive the attestation under the new op_id. Same
250 // stage_id, intent_id, kind, result, producer, cost,
251 // timestamp — only op_id changes.
252 let new = crate::attestation::Attestation::with_timestamp(
253 old.stage_id.clone(),
254 Some(new_op_id.clone()),
255 old.intent_id.clone(),
256 old.kind.clone(),
257 old.result.clone(),
258 old.produced_by.clone(),
259 old.cost.clone(),
260 old.timestamp,
261 );
262 steps.push(AttestationMigrationStep { old, new });
263 }
264 Ok(steps)
265}
266
267/// Apply an attestation migration plan. Two-phase: write all new
268/// attestations (with their new attestation_ids) first, then
269/// delete the old ones whose ids changed. Mirrors
270/// [`apply_migration`]'s crash-safety story — between phases the
271/// log is double-sized but readable; re-running converges.
272pub fn apply_attestation_migration(
273 log: &crate::attestation::AttestationLog,
274 steps: &[AttestationMigrationStep],
275) -> io::Result<()> {
276 if steps.is_empty() {
277 return Ok(());
278 }
279 // Phase 1: write all new attestations (with new
280 // attestation_ids and their by-stage / by-run index entries).
281 for step in steps {
282 if step.is_no_op() {
283 continue;
284 }
285 log.put(&step.new)?;
286 }
287 // Phase 2: delete the old ones whose ids changed. Collect new
288 // ids first so we never delete a file that's also a new file
289 // (the rare new == old case is filtered by `is_no_op`).
290 let new_ids: BTreeSet<&crate::attestation::AttestationId> =
291 steps.iter().map(|s| &s.new.attestation_id).collect();
292 for step in steps {
293 if step.is_no_op() {
294 continue;
295 }
296 if !new_ids.contains(&step.old.attestation_id) {
297 log.delete(&step.old)?;
298 }
299 }
300 Ok(())
301}
302
303/// Topological sort of the op DAG (parents before children). Stable
304/// across runs because we process nodes in `BTreeMap` iteration
305/// order — sorted by `op_id` — which matches the deterministic
306/// canonical-form requirement.
307fn topological_sort(by_id: &BTreeMap<OpId, OperationRecord>) -> io::Result<Vec<OpId>> {
308 let mut indegree: BTreeMap<&OpId, usize> = BTreeMap::new();
309 for id in by_id.keys() {
310 indegree.insert(id, 0);
311 }
312 for rec in by_id.values() {
313 for parent in &rec.op.parents {
314 // Only count parents that are present in the log; a
315 // missing parent is a dangling reference, not a cycle.
316 if by_id.contains_key(parent) {
317 *indegree.entry(&rec.op_id).or_insert(0) += 1;
318 }
319 }
320 }
321
322 // Kahn: start with all zero-indegree nodes, process in BTreeMap
323 // order (which is sorted). Each processed node decrements its
324 // children's indegrees.
325 let mut queue: VecDeque<OpId> = indegree
326 .iter()
327 .filter(|(_, d)| **d == 0)
328 .map(|(id, _)| (*id).clone())
329 .collect();
330 let mut out = Vec::with_capacity(by_id.len());
331 while let Some(id) = queue.pop_front() {
332 out.push(id.clone());
333 // Find children of `id` — ops whose parents include `id`.
334 // BTreeMap iteration is sorted, so children-discovery order
335 // is deterministic.
336 for (child_id, child_rec) in by_id {
337 if child_rec.op.parents.contains(&id) {
338 let d = indegree.get_mut(child_id).expect("indegree present");
339 *d -= 1;
340 if *d == 0 {
341 queue.push_back(child_id.clone());
342 }
343 }
344 }
345 }
346
347 if out.len() != by_id.len() {
348 return Err(io::Error::other(format!(
349 "op log has a cycle or unreachable component: {} of {} ops topologically sortable",
350 out.len(),
351 by_id.len(),
352 )));
353 }
354 Ok(out)
355}
356
357/// Best-effort guess at the source format. If every record carries
358/// V1 (or is missing the field — `serde(default)` deserializes to
359/// V1), the answer is V1. A mixed log (which today shouldn't happen)
360/// surfaces the most-common version; future variants will need a
361/// finer answer when partial migrations exist.
362fn detect_from_format(by_id: &BTreeMap<OpId, OperationRecord>) -> OperationFormat {
363 let mut counts: BTreeMap<OperationFormat, usize> = BTreeMap::new();
364 for rec in by_id.values() {
365 *counts.entry(rec.format_version).or_default() += 1;
366 }
367 counts
368 .into_iter()
369 .max_by_key(|(_, n)| *n)
370 .map(|(f, _)| f)
371 .unwrap_or(OperationFormat::V1)
372}
373
374#[cfg(test)]
375mod tests {
376 use super::*;
377 use crate::operation::{OperationKind, StageTransition};
378 use std::collections::BTreeSet;
379
380 fn add_op(parent: Option<&OpId>, sig: &str, stg: &str) -> OperationRecord {
381 let parents: Vec<OpId> = parent.cloned().into_iter().collect();
382 OperationRecord::new(
383 Operation::new(
384 OperationKind::AddFunction {
385 sig_id: sig.into(),
386 stage_id: stg.into(),
387 effects: BTreeSet::new(),
388 budget_cost: None,
389 },
390 parents,
391 ),
392 StageTransition::Create {
393 sig_id: sig.into(),
394 stage_id: stg.into(),
395 },
396 )
397 }
398
#[test]
fn migration_to_same_format_is_a_no_op() {
    let dir = tempfile::tempdir().unwrap();
    let log = OpLog::open(dir.path()).unwrap();
    let root = add_op(None, "fac", "s0");
    let child = add_op(Some(&root.op_id), "fac2", "s1");
    for rec in [&root, &child] {
        log.put(rec).unwrap();
    }

    let plan = plan_migration(&log, OperationFormat::V1).unwrap();
    assert_eq!(
        (plan.from, plan.to),
        (OperationFormat::V1, OperationFormat::V1)
    );
    assert!(plan.is_no_op());
    assert!(plan.steps.iter().all(|step| step.old_op_id == step.new_op_id));

    // A no-op plan must leave the store untouched: both ops stay readable.
    apply_migration(&log, &plan).unwrap();
    for rec in [&root, &child] {
        assert!(log.get(&rec.op_id).unwrap().is_some());
    }
}
420
#[test]
fn topological_sort_orders_parents_before_children() {
    let dir = tempfile::tempdir().unwrap();
    let log = OpLog::open(dir.path()).unwrap();

    // Linear chain: a <- b <- c.
    let a = add_op(None, "fac", "s0");
    log.put(&a).unwrap();
    let b = add_op(Some(&a.op_id), "fac2", "s1");
    log.put(&b).unwrap();
    let c = add_op(Some(&b.op_id), "fac3", "s2");
    log.put(&c).unwrap();

    let plan = plan_migration(&log, OperationFormat::V1).unwrap();
    let index_of = |id: &OpId| {
        plan.steps
            .iter()
            .position(|step| step.old_op_id == *id)
            .unwrap()
    };
    assert!(index_of(&a.op_id) < index_of(&b.op_id));
    assert!(index_of(&b.op_id) < index_of(&c.op_id));
}
438
#[test]
fn empty_log_yields_empty_plan() {
    let dir = tempfile::tempdir().unwrap();
    let plan = plan_migration(&OpLog::open(dir.path()).unwrap(), OperationFormat::V1).unwrap();
    assert!(plan.steps.is_empty(), "an empty log has nothing to migrate");
    assert!(plan.is_no_op());
}
447}