Skip to main content

icydb_core/db/schema_evolution/
planner.rs

1//! Module: db::schema_evolution::planner
2//! Responsibility: validate schema-evolution descriptors and derive migration plans.
3//! Does not own: migration execution or commit-marker recovery.
4//! Boundary: descriptor + model targets -> deterministic `MigrationPlan`.
5
6use crate::{
7    db::{
8        identity::EntityName,
9        migration::{MigrationPlan, MigrationStep},
10        schema_evolution::{
11            SchemaMigrationDescriptor, SchemaMigrationEntityTarget, SchemaMigrationRowOp,
12            SchemaMigrationStepIntent,
13        },
14    },
15    error::InternalError,
16};
17
///
/// SchemaMigrationPlanner
///
/// SchemaMigrationPlanner is the schema-aware derivation boundary above
/// `db::migration`.
/// It validates descriptor identity against known runtime models and emits one
/// deterministic row-op migration plan for the lower execution engine.
///
/// The planner holds the entity targets that descriptors are validated
/// against. `SchemaMigrationPlanner::new` rejects target lists in which two
/// entries share the same entity name, and lookups by name return the first
/// matching target.
///

#[derive(Clone, Debug)]
pub struct SchemaMigrationPlanner {
    // Entity targets known to this planner; `new` checks these for duplicate
    // names before storing them.
    entities: Vec<SchemaMigrationEntityTarget>,
}
31
32impl SchemaMigrationPlanner {
33    /// Build one planner from canonical schema-evolution entity targets.
34    pub fn new(entities: Vec<SchemaMigrationEntityTarget>) -> Result<Self, InternalError> {
35        // Phase 1: fail before planning if two runtime models claim the same
36        // canonical schema identity. Planning cannot be deterministic otherwise.
37        for (index, entity) in entities.iter().enumerate() {
38            if entities[..index]
39                .iter()
40                .any(|existing| existing.name() == entity.name())
41            {
42                return Err(InternalError::schema_evolution_duplicate_entity(
43                    entity.name().as_str(),
44                ));
45            }
46        }
47
48        Ok(Self { entities })
49    }
50
51    /// Build one planner directly from runtime entity models.
52    pub fn from_models(
53        models: &[&'static crate::model::EntityModel],
54    ) -> Result<Self, InternalError> {
55        let entities = models
56            .iter()
57            .copied()
58            .map(SchemaMigrationEntityTarget::from_model)
59            .collect::<Result<Vec<_>, _>>()?;
60
61        Self::new(entities)
62    }
63
64    /// Derive one deterministic low-level migration plan from a descriptor.
65    pub fn plan(
66        &self,
67        descriptor: &SchemaMigrationDescriptor,
68    ) -> Result<MigrationPlan, InternalError> {
69        // Phase 1: validate schema intent against canonical entity/model facts.
70        self.validate_intent(descriptor.intent())?;
71
72        // Phase 2: convert explicit schema-evolution row rewrites into migration
73        // row operations only after schema compatibility has been proven.
74        let row_ops = descriptor
75            .data_transformation()
76            .ok_or_else(|| {
77                InternalError::schema_evolution_row_ops_required(descriptor.migration_id().as_str())
78            })?
79            .row_ops();
80        if row_ops.is_empty() {
81            return Err(InternalError::schema_evolution_row_ops_required(
82                descriptor.migration_id().as_str(),
83            ));
84        }
85
86        for row_op in row_ops {
87            self.require_entity(row_op.target().name())?;
88        }
89
90        let migration_row_ops = descriptor
91            .clone()
92            .into_data_transformation()
93            .expect("descriptor data transformation already checked")
94            .into_row_ops()
95            .into_iter()
96            .map(SchemaMigrationRowOp::into_migration_row_op)
97            .collect::<Result<Vec<_>, _>>()?;
98        let step = MigrationStep::from_row_ops("schema_evolution_apply", migration_row_ops)?;
99
100        MigrationPlan::new(
101            descriptor.migration_id().as_str(),
102            descriptor.version(),
103            vec![step],
104        )
105    }
106
107    fn validate_intent(&self, intent: &SchemaMigrationStepIntent) -> Result<(), InternalError> {
108        match intent {
109            SchemaMigrationStepIntent::AddIndex { index } => {
110                let (entity, fields) = parse_index_name_parts(index.as_str())?;
111                let target = self.require_entity(entity)?;
112                for field in &fields {
113                    if target.model().resolve_field_slot(field).is_none() {
114                        return Err(InternalError::schema_evolution_unknown_field(
115                            target.name().as_str(),
116                            field,
117                        ));
118                    }
119                }
120                if target
121                    .model()
122                    .indexes()
123                    .iter()
124                    .any(|existing| existing.fields() == fields.as_slice())
125                {
126                    return Err(InternalError::schema_evolution_duplicate_index(
127                        target.name().as_str(),
128                        index.as_str(),
129                    ));
130                }
131            }
132        }
133
134        Ok(())
135    }
136
137    fn require_entity(
138        &self,
139        entity: EntityName,
140    ) -> Result<SchemaMigrationEntityTarget, InternalError> {
141        self.entities
142            .iter()
143            .copied()
144            .find(|target| target.name() == entity)
145            .ok_or_else(|| InternalError::schema_evolution_unknown_entity(entity.as_str()))
146    }
147}
148
149fn parse_index_name_parts(index: &str) -> Result<(EntityName, Vec<&str>), InternalError> {
150    let mut parts = index.split('|');
151    let entity = parts
152        .next()
153        .ok_or_else(|| InternalError::schema_evolution_invalid_index_name(index))?;
154    let entity = EntityName::try_from_str(entity).map_err(|err| {
155        InternalError::schema_evolution_invalid_identity(format!(
156            "invalid index entity segment '{entity}': {err}",
157        ))
158    })?;
159    let fields = parts.collect::<Vec<_>>();
160    if fields.is_empty() {
161        return Err(InternalError::schema_evolution_invalid_index_name(index));
162    }
163
164    Ok((entity, fields))
165}