Skip to main content

lex_vcs/
migrate.rs

1//! Operation-format migration (#244).
2//!
3//! When [`crate::OperationFormat`] gains a new variant, every existing
4//! `OpId` rotates: the canonical pre-image bytes change, so the
5//! SHA-256 digest changes. A naïve in-place rewrite would also
6//! invalidate every parent reference in dependent ops.
7//!
8//! This module computes a [`MigrationPlan`] — a topologically-ordered
9//! list of [`MigrationStep`]s, one per op, each containing the new
10//! `OpId` and the new [`OperationRecord`] with parents already
11//! remapped — and then writes the new files in a two-phase
12//! `apply_migration`: write-new, then delete-old. The intermediate
13//! state has both old and new files coexisting (each consistent
14//! within its own version), so a crash mid-migration leaves the
15//! store readable.
16//!
17//! # What's in scope
18//!
19//! - The op log under `<root>/ops/<op_id>.json`. Parents are remapped
20//!   transitively; the topological sort guarantees a parent is
21//!   migrated before any child.
22//!
23//! # What's NOT in scope (yet)
24//!
25//! - **Branch heads** (`<root>/branches/<name>.json`) reference op_ids.
26//!   The CLI ([`lex store migrate-ops`](../../../lex_cli/index.html))
27//!   walks the branch directory after `apply_migration` and rewrites
28//!   each branch's `head_op` through the returned mapping.
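//! - **Attestations** carry an `op_id` field and their own `attestation_id`
//!   is computed including that op_id, so they cascade. [`apply_migration`]
//!   does not rewrite them; the cascade (#258) is a separate pass: feed
//!   [`MigrationPlan::mapping`] to [`plan_attestation_migration`] and then
//!   run [`apply_attestation_migration`]. See the CHANGELOG entry for #244
//!   for background.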
32//! - **Intents** don't reference op_ids; ops reference intents. No
33//!   action needed here.
34
35use crate::op_log::OpLog;
36use crate::operation::{OpId, Operation, OperationFormat, OperationRecord};
37use std::collections::{BTreeMap, BTreeSet, VecDeque};
38use std::io;
39
40/// Describes the work needed to move an op log from one canonical
41/// form to another. Built by [`plan_migration`]; consumed by
42/// [`apply_migration`].
43#[derive(Debug, Clone, PartialEq, Eq)]
44pub struct MigrationPlan {
45    pub from: OperationFormat,
46    pub to: OperationFormat,
47    /// Steps in topological order — every step's parents have
48    /// already appeared earlier in the list, so applying in order
49    /// keeps the partial DAG self-consistent.
50    pub steps: Vec<MigrationStep>,
51}
52
53/// One op's migration plan.
54#[derive(Debug, Clone, PartialEq, Eq)]
55pub struct MigrationStep {
56    pub old_op_id: OpId,
57    pub new_op_id: OpId,
58    /// The [`OperationRecord`] that will replace `old_op_id`'s file.
59    /// Its `op.parents` already reference the *new* op_ids; its
60    /// `op_id` field equals `new_op_id`; its `format_version` equals
61    /// the plan's `to` field.
62    pub new_record: OperationRecord,
63}
64
65impl MigrationPlan {
66    /// `true` if every step's old and new `op_id` agree — applying
67    /// the plan would be a no-op. Note that `from == to` does **not**
68    /// imply this: tests inject custom encoders that produce
69    /// different bytes for the same source format, in which case
70    /// `from == to == V1` but the migration is meaningful.
71    pub fn is_no_op(&self) -> bool {
72        self.steps.iter().all(|s| s.old_op_id == s.new_op_id)
73    }
74
75    /// Old → new op_id mapping in deterministic order.
76    pub fn mapping(&self) -> BTreeMap<OpId, OpId> {
77        self.steps
78            .iter()
79            .map(|s| (s.old_op_id.clone(), s.new_op_id.clone()))
80            .collect()
81    }
82}
83
84/// Plan a migration to `target`, using the encoder paired with that
85/// format. Production callers want this; tests that need to inject a
86/// different encoder (to simulate a future variant without adding it
87/// to the production enum) should use [`plan_migration_with_encoder`].
88pub fn plan_migration(log: &OpLog, target: OperationFormat) -> io::Result<MigrationPlan> {
89    plan_migration_with_encoder(log, target, |op| op.canonical_bytes_in(target))
90}
91
92/// Plan a migration with a custom canonical encoder. Used by the
93/// conformance test in `tests/migrate.rs` to simulate a hypothetical
94/// V2 by adding a synthetic suffix to V1's pre-image, without
95/// requiring a placeholder variant in production code.
96pub fn plan_migration_with_encoder<F>(
97    log: &OpLog,
98    target: OperationFormat,
99    encoder: F,
100) -> io::Result<MigrationPlan>
101where
102    F: Fn(&Operation) -> Vec<u8>,
103{
104    let all = log.list_all()?;
105    let mut by_id: BTreeMap<OpId, OperationRecord> = BTreeMap::new();
106    for rec in all {
107        by_id.insert(rec.op_id.clone(), rec);
108    }
109    let topo = topological_sort(&by_id)?;
110    let from = detect_from_format(&by_id);
111
112    let mut mapping: BTreeMap<OpId, OpId> = BTreeMap::new();
113    let mut steps = Vec::with_capacity(topo.len());
114    for old_id in topo {
115        let rec = by_id
116            .get(&old_id)
117            .ok_or_else(|| io::Error::other(format!("op {old_id} disappeared during migration")))?
118            .clone();
119        let remapped_parents: Vec<OpId> = rec
120            .op
121            .parents
122            .iter()
123            .map(|p| {
124                mapping
125                    .get(p)
126                    .cloned()
127                    // A parent without a mapping is a parent we
128                    // didn't see during list_all — treat as a
129                    // dangling reference and preserve the original
130                    // string. The apply step will surface this as a
131                    // broken DAG when it tries to chase the missing
132                    // op file.
133                    .unwrap_or_else(|| p.clone())
134            })
135            .collect();
136        let new_op = Operation {
137            kind: rec.op.kind.clone(),
138            parents: remapped_parents,
139            intent_id: rec.op.intent_id.clone(),
140        };
141        let new_bytes = encoder(&new_op);
142        let new_op_id = crate::canonical::hash_bytes(&new_bytes);
143        let new_record = OperationRecord {
144            op_id: new_op_id.clone(),
145            format_version: target,
146            op: new_op,
147            produces: rec.produces.clone(),
148        };
149        mapping.insert(old_id.clone(), new_op_id.clone());
150        steps.push(MigrationStep {
151            old_op_id: old_id,
152            new_op_id,
153            new_record,
154        });
155    }
156
157    Ok(MigrationPlan {
158        from,
159        to: target,
160        steps,
161    })
162}
163
164/// Apply a [`MigrationPlan`] to a live op log. Two-phase:
165///
166/// 1. Write every new `<new_op_id>.json` file. Idempotent — pre-
167///    existing files (including the rare case where new == old) are
168///    no-ops.
169/// 2. Delete every old `<old_op_id>.json` file whose new id is
170///    different.
171///
172/// On crash between phases the store is double-sized but readable;
173/// re-running the plan converges. **Branch heads and attestations
174/// are not rewritten by this function** — see module docs.
175pub fn apply_migration(log: &OpLog, plan: &MigrationPlan) -> io::Result<()> {
176    if plan.is_no_op() {
177        return Ok(());
178    }
179
180    // Phase 1: write all new records.
181    for step in &plan.steps {
182        log.put(&step.new_record)?;
183    }
184
185    // Phase 2: delete old files whose ids changed. We collect the
186    // set of new ids first so we never delete a file that's also
187    // a new file (the `new == old` case).
188    let new_ids: BTreeSet<&OpId> = plan.steps.iter().map(|s| &s.new_op_id).collect();
189    for step in &plan.steps {
190        if step.old_op_id != step.new_op_id && !new_ids.contains(&step.old_op_id) {
191            log.delete(&step.old_op_id)?;
192        }
193    }
194
195    Ok(())
196}
197
198// ---------------------------------------------- attestation cascade (#258)
199
200/// Plan + apply step for one attestation whose `op_id` is in the
201/// op-migration mapping (#258).
202#[derive(Debug, Clone, PartialEq, Eq)]
203pub struct AttestationMigrationStep {
204    pub old: crate::attestation::Attestation,
205    pub new: crate::attestation::Attestation,
206}
207
208impl AttestationMigrationStep {
209    /// `true` when nothing actually rotated — the new and old
210    /// `attestation_id`s match, so apply is a no-op.
211    pub fn is_no_op(&self) -> bool {
212        self.old.attestation_id == self.new.attestation_id
213    }
214}
215
216/// Plan an attestation migration that follows an op-id rotation.
217/// `op_mapping` is `MigrationPlan::mapping()` — old op_id → new
218/// op_id. For every attestation whose `op_id` is in the mapping
219/// (i.e. the op got rotated), build a new attestation pointing at
220/// the new op_id and re-derive its `attestation_id` from the
221/// canonical form.
222///
223/// Attestations whose `op_id` is `None` (`Override`,
224/// `ProducerBlock`, `Defer`, etc.) are unaffected — they don't
225/// participate in the cascade.
226///
227/// Steps are returned in deterministic order (sorted by old
228/// `attestation_id`), independent of filesystem listing order, so
229/// `--dry-run` output is reproducible.
230pub fn plan_attestation_migration(
231    log: &crate::attestation::AttestationLog,
232    op_mapping: &BTreeMap<OpId, OpId>,
233) -> io::Result<Vec<AttestationMigrationStep>> {
234    if op_mapping.is_empty() {
235        return Ok(Vec::new());
236    }
237    let mut all = log.list_all()?;
238    all.sort_by(|a, b| a.attestation_id.cmp(&b.attestation_id));
239    let mut steps = Vec::new();
240    for old in all {
241        let Some(old_op_id) = old.op_id.as_ref() else { continue };
242        let Some(new_op_id) = op_mapping.get(old_op_id) else { continue };
243        if old_op_id == new_op_id {
244            // Mapping recorded but nothing rotated for this op
245            // (e.g. plan.is_no_op()); skip rather than emit a
246            // pointless step.
247            continue;
248        }
249        // Re-derive the attestation under the new op_id. Same
250        // stage_id, intent_id, kind, result, producer, cost,
251        // timestamp — only op_id changes.
252        let new = crate::attestation::Attestation::with_timestamp(
253            old.stage_id.clone(),
254            Some(new_op_id.clone()),
255            old.intent_id.clone(),
256            old.kind.clone(),
257            old.result.clone(),
258            old.produced_by.clone(),
259            old.cost.clone(),
260            old.timestamp,
261        );
262        steps.push(AttestationMigrationStep { old, new });
263    }
264    Ok(steps)
265}
266
267/// Apply an attestation migration plan. Two-phase: write all new
268/// attestations (with their new attestation_ids) first, then
269/// delete the old ones whose ids changed. Mirrors
270/// [`apply_migration`]'s crash-safety story — between phases the
271/// log is double-sized but readable; re-running converges.
272pub fn apply_attestation_migration(
273    log: &crate::attestation::AttestationLog,
274    steps: &[AttestationMigrationStep],
275) -> io::Result<()> {
276    if steps.is_empty() {
277        return Ok(());
278    }
279    // Phase 1: write all new attestations (with new
280    // attestation_ids and their by-stage / by-run index entries).
281    for step in steps {
282        if step.is_no_op() {
283            continue;
284        }
285        log.put(&step.new)?;
286    }
287    // Phase 2: delete the old ones whose ids changed. Collect new
288    // ids first so we never delete a file that's also a new file
289    // (the rare new == old case is filtered by `is_no_op`).
290    let new_ids: BTreeSet<&crate::attestation::AttestationId> =
291        steps.iter().map(|s| &s.new.attestation_id).collect();
292    for step in steps {
293        if step.is_no_op() {
294            continue;
295        }
296        if !new_ids.contains(&step.old.attestation_id) {
297            log.delete(&step.old)?;
298        }
299    }
300    Ok(())
301}
302
303/// Topological sort of the op DAG (parents before children). Stable
304/// across runs because we process nodes in `BTreeMap` iteration
305/// order — sorted by `op_id` — which matches the deterministic
306/// canonical-form requirement.
307fn topological_sort(by_id: &BTreeMap<OpId, OperationRecord>) -> io::Result<Vec<OpId>> {
308    let mut indegree: BTreeMap<&OpId, usize> = BTreeMap::new();
309    for id in by_id.keys() {
310        indegree.insert(id, 0);
311    }
312    for rec in by_id.values() {
313        for parent in &rec.op.parents {
314            // Only count parents that are present in the log; a
315            // missing parent is a dangling reference, not a cycle.
316            if by_id.contains_key(parent) {
317                *indegree.entry(&rec.op_id).or_insert(0) += 1;
318            }
319        }
320    }
321
322    // Kahn: start with all zero-indegree nodes, process in BTreeMap
323    // order (which is sorted). Each processed node decrements its
324    // children's indegrees.
325    let mut queue: VecDeque<OpId> = indegree
326        .iter()
327        .filter(|(_, d)| **d == 0)
328        .map(|(id, _)| (*id).clone())
329        .collect();
330    let mut out = Vec::with_capacity(by_id.len());
331    while let Some(id) = queue.pop_front() {
332        out.push(id.clone());
333        // Find children of `id` — ops whose parents include `id`.
334        // BTreeMap iteration is sorted, so children-discovery order
335        // is deterministic.
336        for (child_id, child_rec) in by_id {
337            if child_rec.op.parents.contains(&id) {
338                let d = indegree.get_mut(child_id).expect("indegree present");
339                *d -= 1;
340                if *d == 0 {
341                    queue.push_back(child_id.clone());
342                }
343            }
344        }
345    }
346
347    if out.len() != by_id.len() {
348        return Err(io::Error::other(format!(
349            "op log has a cycle or unreachable component: {} of {} ops topologically sortable",
350            out.len(),
351            by_id.len(),
352        )));
353    }
354    Ok(out)
355}
356
357/// Best-effort guess at the source format. If every record carries
358/// V1 (or is missing the field — `serde(default)` deserializes to
359/// V1), the answer is V1. A mixed log (which today shouldn't happen)
360/// surfaces the most-common version; future variants will need a
361/// finer answer when partial migrations exist.
362fn detect_from_format(by_id: &BTreeMap<OpId, OperationRecord>) -> OperationFormat {
363    let mut counts: BTreeMap<OperationFormat, usize> = BTreeMap::new();
364    for rec in by_id.values() {
365        *counts.entry(rec.format_version).or_default() += 1;
366    }
367    counts
368        .into_iter()
369        .max_by_key(|(_, n)| *n)
370        .map(|(f, _)| f)
371        .unwrap_or(OperationFormat::V1)
372}
373
#[cfg(test)]
mod tests {
    use super::*;
    use crate::operation::{OperationKind, StageTransition};
    use std::collections::BTreeSet;

    /// Build an `AddFunction` record for `sig`/`stg`, optionally
    /// chained onto `parent`.
    fn add_op(parent: Option<&OpId>, sig: &str, stg: &str) -> OperationRecord {
        let parents: Vec<OpId> = match parent {
            Some(p) => vec![p.clone()],
            None => Vec::new(),
        };
        let kind = OperationKind::AddFunction {
            sig_id: sig.into(),
            stage_id: stg.into(),
            effects: BTreeSet::new(),
            budget_cost: None,
        };
        let transition = StageTransition::Create {
            sig_id: sig.into(),
            stage_id: stg.into(),
        };
        OperationRecord::new(Operation::new(kind, parents), transition)
    }

    /// Open an op log in a fresh temporary directory. The directory
    /// guard is returned so it outlives the log for the whole test.
    fn scratch_log() -> (tempfile::TempDir, OpLog) {
        let dir = tempfile::tempdir().unwrap();
        let log = OpLog::open(dir.path()).unwrap();
        (dir, log)
    }

    #[test]
    fn migration_to_same_format_is_a_no_op() {
        let (_dir, log) = scratch_log();
        let root = add_op(None, "fac", "s0");
        log.put(&root).unwrap();
        let child = add_op(Some(&root.op_id), "fac2", "s1");
        log.put(&child).unwrap();

        let plan = plan_migration(&log, OperationFormat::V1).unwrap();
        assert_eq!(plan.from, OperationFormat::V1);
        assert_eq!(plan.to, OperationFormat::V1);
        assert!(plan.is_no_op());
        for step in &plan.steps {
            assert_eq!(step.old_op_id, step.new_op_id);
        }

        // Applying a no-op plan must leave both records in place.
        apply_migration(&log, &plan).unwrap();
        assert!(log.get(&root.op_id).unwrap().is_some());
        assert!(log.get(&child.op_id).unwrap().is_some());
    }

    #[test]
    fn topological_sort_orders_parents_before_children() {
        let (_dir, log) = scratch_log();
        let first = add_op(None, "fac", "s0");
        log.put(&first).unwrap();
        let second = add_op(Some(&first.op_id), "fac2", "s1");
        log.put(&second).unwrap();
        let third = add_op(Some(&second.op_id), "fac3", "s2");
        log.put(&third).unwrap();

        let plan = plan_migration(&log, OperationFormat::V1).unwrap();
        // Index of the step that migrates the given op.
        let rank = |id: &OpId| {
            plan.steps
                .iter()
                .position(|step| &step.old_op_id == id)
                .unwrap()
        };
        assert!(rank(&first.op_id) < rank(&second.op_id));
        assert!(rank(&second.op_id) < rank(&third.op_id));
    }

    #[test]
    fn empty_log_yields_empty_plan() {
        let (_dir, log) = scratch_log();
        let plan = plan_migration(&log, OperationFormat::V1).unwrap();
        assert!(plan.steps.is_empty());
        assert!(plan.is_no_op());
    }
}