use crate::op_log::OpLog;
use crate::operation::{OpId, Operation, OperationFormat, OperationRecord};
use std::collections::{BTreeMap, BTreeSet, VecDeque};
use std::io;
/// The full set of rewrites needed to move an op log to another
/// [`OperationFormat`].
///
/// Built by [`plan_migration`] / [`plan_migration_with_encoder`] and consumed
/// by [`apply_migration`].
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct MigrationPlan {
/// Format the log is migrating away from. The planner in this file fills it
/// with the most common `format_version` among the stored records.
pub from: OperationFormat,
/// Format every rewritten record is stamped with.
pub to: OperationFormat,
/// One step per op in the log. The planner emits them in topological order,
/// so a parent's step precedes the steps of its children.
pub steps: Vec<MigrationStep>,
}
/// The rewrite of a single op: its id before and after the migration, plus the
/// record to store under the new id.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct MigrationStep {
/// Id under which the op is stored before the migration.
pub old_op_id: OpId,
/// Id of the rewritten op; equal to `old_op_id` when the rewrite did not
/// change the id.
pub new_op_id: OpId,
/// Record to write to the log for `new_op_id`.
pub new_record: OperationRecord,
}
impl MigrationPlan {
    /// Returns `true` when no step changes an op id.
    ///
    /// A plan without any steps is a no-op as well.
    pub fn is_no_op(&self) -> bool {
        !self
            .steps
            .iter()
            .any(|step| step.old_op_id != step.new_op_id)
    }

    /// Returns the old-id → new-id table described by the plan's steps.
    pub fn mapping(&self) -> BTreeMap<OpId, OpId> {
        let mut old_to_new = BTreeMap::new();
        for step in &self.steps {
            old_to_new.insert(step.old_op_id.clone(), step.new_op_id.clone());
        }
        old_to_new
    }
}
pub fn plan_migration(log: &OpLog, target: OperationFormat) -> io::Result<MigrationPlan> {
plan_migration_with_encoder(log, target, |op| op.canonical_bytes_in(target))
}
/// Plans the migration of every op in `log` to `target`, using `encoder` to
/// produce the bytes each rewritten op is hashed from.
///
/// Ops are visited parents-first. For every op the parent ids are replaced by
/// the parents' already-computed new ids, the rewritten op is passed to
/// `encoder`, and the hash of the returned bytes becomes the op's new id.
/// Parent ids that do not belong to any record in the log are kept unchanged.
///
/// The only call made on `log` here is `list_all`; use [`apply_migration`] to
/// persist the resulting plan.
///
/// # Errors
///
/// Returns an error if listing the log fails or if the ops cannot be
/// topologically ordered.
pub fn plan_migration_with_encoder<F>(
    log: &OpLog,
    target: OperationFormat,
    encoder: F,
) -> io::Result<MigrationPlan>
where
    F: Fn(&Operation) -> Vec<u8>,
{
    // Index the records by id; a later record with the same id replaces an
    // earlier one.
    let by_id: BTreeMap<OpId, OperationRecord> = log
        .list_all()?
        .into_iter()
        .map(|rec| (rec.op_id.clone(), rec))
        .collect();

    let topo = topological_sort(&by_id)?;
    let from = detect_from_format(&by_id);

    let mut mapping: BTreeMap<OpId, OpId> = BTreeMap::new();
    let mut steps = Vec::with_capacity(topo.len());
    for old_id in topo {
        // Borrow the record instead of cloning it: only the pieces that end
        // up in the new record need to be copied.
        let rec = by_id.get(&old_id).ok_or_else(|| {
            io::Error::other(format!("op {old_id} disappeared during migration"))
        })?;

        // Parents come earlier in `topo`, so their new ids are already in
        // `mapping`; anything missing is a parent outside the log and keeps
        // its original id.
        let parents: Vec<OpId> = rec
            .op
            .parents
            .iter()
            .map(|p| mapping.get(p).unwrap_or(p).clone())
            .collect();

        let new_op = Operation {
            kind: rec.op.kind.clone(),
            parents,
            intent_id: rec.op.intent_id.clone(),
        };
        let new_op_id = crate::canonical::hash_bytes(&encoder(&new_op));
        let new_record = OperationRecord {
            op_id: new_op_id.clone(),
            format_version: target,
            op: new_op,
            produces: rec.produces.clone(),
        };

        mapping.insert(old_id.clone(), new_op_id.clone());
        steps.push(MigrationStep {
            old_op_id: old_id,
            new_op_id,
            new_record,
        });
    }

    Ok(MigrationPlan {
        from,
        to: target,
        steps,
    })
}
/// Applies `plan` to `log`.
///
/// Nothing is touched when the plan is a no-op. Otherwise every step's new
/// record is written first; afterwards each old id that changed and is not
/// the new id of any step is deleted.
///
/// # Errors
///
/// Stops at, and returns, the first error reported by `log.put` or
/// `log.delete`; writes and deletes performed before that point are not
/// undone by this function.
pub fn apply_migration(log: &OpLog, plan: &MigrationPlan) -> io::Result<()> {
    if !plan.is_no_op() {
        let steps = &plan.steps;

        // Write pass: all new records land before anything is removed.
        for step in steps {
            log.put(&step.new_record)?;
        }

        // Delete pass: never remove an id that some step just wrote.
        let live_ids: BTreeSet<&OpId> = steps.iter().map(|step| &step.new_op_id).collect();
        let superseded = steps.iter().filter(|step| {
            step.old_op_id != step.new_op_id && !live_ids.contains(&step.old_op_id)
        });
        for step in superseded {
            log.delete(&step.old_op_id)?;
        }
    }
    Ok(())
}
/// Orders the ids in `by_id` so that every op comes after all of its parents
/// that are themselves present in `by_id` (Kahn's algorithm).
///
/// Parent ids that are not keys of `by_id` are ignored. Ops whose in-log
/// parents are all emitted become ready in key order, and ready ops are
/// emitted first-in, first-out.
///
/// # Errors
///
/// Returns an error when not every op could be ordered, which happens when
/// the parent links contain a cycle (including an op that lists itself).
fn topological_sort(by_id: &BTreeMap<OpId, OperationRecord>) -> io::Result<Vec<OpId>> {
    let mut indegree: BTreeMap<&OpId, usize> = by_id.keys().map(|id| (id, 0)).collect();

    // parent -> children, one entry per occurrence of the parent in the
    // child's `parents` list. Counting and later discounting the same edge
    // occurrences keeps the indegree balanced even when a parent id is listed
    // more than once, and avoids rescanning the whole map for every op.
    let mut children: BTreeMap<&OpId, Vec<&OpId>> = BTreeMap::new();
    for (child_id, rec) in by_id {
        for parent in &rec.op.parents {
            if by_id.contains_key(parent) {
                children.entry(parent).or_default().push(child_id);
                *indegree.entry(child_id).or_default() += 1;
            }
        }
    }

    let mut queue: VecDeque<&OpId> = indegree
        .iter()
        .filter(|&(_, &d)| d == 0)
        .map(|(&id, _)| id)
        .collect();

    let mut out = Vec::with_capacity(by_id.len());
    while let Some(id) = queue.pop_front() {
        out.push(id.clone());
        if let Some(kids) = children.get(id) {
            for &child in kids {
                let d = indegree.get_mut(child).expect("indegree present");
                *d -= 1;
                if *d == 0 {
                    queue.push_back(child);
                }
            }
        }
    }

    if out.len() != by_id.len() {
        return Err(io::Error::other(format!(
            "op log has a cycle or unreachable component: {} of {} ops topologically sortable",
            out.len(),
            by_id.len(),
        )));
    }
    Ok(out)
}
/// Returns the `format_version` carried by the most records in `by_id`.
///
/// When several formats are tied for the highest count, the greatest of them
/// (by `OperationFormat`'s ordering) is returned. An empty map yields
/// `OperationFormat::V1`.
fn detect_from_format(by_id: &BTreeMap<OpId, OperationRecord>) -> OperationFormat {
    let mut tally: BTreeMap<OperationFormat, usize> = BTreeMap::new();
    for format in by_id.values().map(|rec| rec.format_version) {
        *tally.entry(format).or_insert(0) += 1;
    }

    // Scan in ascending format order; `>=` lets a later format replace an
    // earlier one with the same count.
    let mut winner: Option<(OperationFormat, usize)> = None;
    for (format, seen) in tally {
        let takes_lead = match winner {
            Some((_, best)) => seen >= best,
            None => true,
        };
        if takes_lead {
            winner = Some((format, seen));
        }
    }

    match winner {
        Some((format, _)) => format,
        None => OperationFormat::V1,
    }
}
// Unit tests for migration planning and application, run against an `OpLog`
// opened in a temporary directory.
#[cfg(test)]
mod tests {
use super::*;
use crate::operation::{OperationKind, StageTransition};
use std::collections::BTreeSet;
/// Builds an `AddFunction` record for `sig`/`stg` with no effects and no
/// budget cost, paired with a `Create` stage transition. `parent`, when
/// given, becomes the op's only parent.
fn add_op(parent: Option<&OpId>, sig: &str, stg: &str) -> OperationRecord {
let parents: Vec<OpId> = parent.cloned().into_iter().collect();
OperationRecord::new(
Operation::new(
OperationKind::AddFunction {
sig_id: sig.into(),
stage_id: stg.into(),
effects: BTreeSet::new(),
budget_cost: None,
},
parents,
),
StageTransition::Create {
sig_id: sig.into(),
stage_id: stg.into(),
},
)
}
/// Planning a V1 migration for two freshly stored ops must leave every id
/// unchanged, and applying that plan must keep both ops in the log.
#[test]
fn migration_to_same_format_is_a_no_op() {
let tmp = tempfile::tempdir().unwrap();
let log = OpLog::open(tmp.path()).unwrap();
let a = add_op(None, "fac", "s0");
log.put(&a).unwrap();
let b = add_op(Some(&a.op_id), "fac2", "s1");
log.put(&b).unwrap();
let plan = plan_migration(&log, OperationFormat::V1).unwrap();
assert_eq!(plan.from, OperationFormat::V1);
assert_eq!(plan.to, OperationFormat::V1);
assert!(plan.is_no_op());
for step in &plan.steps {
assert_eq!(step.old_op_id, step.new_op_id);
}
apply_migration(&log, &plan).unwrap();
// Both original ops are still retrievable after applying the plan.
assert!(log.get(&a.op_id).unwrap().is_some());
assert!(log.get(&b.op_id).unwrap().is_some());
}
/// For a chain a <- b <- c, the plan's steps must list a before b and b
/// before c.
#[test]
fn topological_sort_orders_parents_before_children() {
let tmp = tempfile::tempdir().unwrap();
let log = OpLog::open(tmp.path()).unwrap();
let a = add_op(None, "fac", "s0");
log.put(&a).unwrap();
let b = add_op(Some(&a.op_id), "fac2", "s1");
log.put(&b).unwrap();
let c = add_op(Some(&b.op_id), "fac3", "s2");
log.put(&c).unwrap();
let plan = plan_migration(&log, OperationFormat::V1).unwrap();
let order: Vec<_> = plan.steps.iter().map(|s| s.old_op_id.as_str()).collect();
// Index of an op id within the planned step order.
let pos = |id: &str| order.iter().position(|x| *x == id).unwrap();
assert!(pos(&a.op_id) < pos(&b.op_id));
assert!(pos(&b.op_id) < pos(&c.op_id));
}
/// An empty log produces a plan with no steps, which counts as a no-op.
#[test]
fn empty_log_yields_empty_plan() {
let tmp = tempfile::tempdir().unwrap();
let log = OpLog::open(tmp.path()).unwrap();
let plan = plan_migration(&log, OperationFormat::V1).unwrap();
assert!(plan.steps.is_empty());
assert!(plan.is_no_op());
}
}