icydb_core/db/schema_evolution/
planner.rs1use crate::{
7 db::{
8 identity::EntityName,
9 migration::{MigrationPlan, MigrationStep},
10 schema_evolution::{
11 SchemaMigrationDescriptor, SchemaMigrationEntityTarget, SchemaMigrationRowOp,
12 SchemaMigrationStepIntent,
13 },
14 },
15 error::InternalError,
16};
17
18#[derive(Clone, Debug)]
28pub struct SchemaMigrationPlanner {
29 entities: Vec<SchemaMigrationEntityTarget>,
30}
31
32impl SchemaMigrationPlanner {
33 pub fn new(entities: Vec<SchemaMigrationEntityTarget>) -> Result<Self, InternalError> {
35 for (index, entity) in entities.iter().enumerate() {
38 if entities[..index]
39 .iter()
40 .any(|existing| existing.name() == entity.name())
41 {
42 return Err(InternalError::schema_evolution_duplicate_entity(
43 entity.name().as_str(),
44 ));
45 }
46 }
47
48 Ok(Self { entities })
49 }
50
51 pub fn from_models(
53 models: &[&'static crate::model::EntityModel],
54 ) -> Result<Self, InternalError> {
55 let entities = models
56 .iter()
57 .copied()
58 .map(SchemaMigrationEntityTarget::from_model)
59 .collect::<Result<Vec<_>, _>>()?;
60
61 Self::new(entities)
62 }
63
64 pub fn plan(
66 &self,
67 descriptor: &SchemaMigrationDescriptor,
68 ) -> Result<MigrationPlan, InternalError> {
69 self.validate_intent(descriptor.intent())?;
71
72 let row_ops = descriptor
75 .data_transformation()
76 .ok_or_else(|| {
77 InternalError::schema_evolution_row_ops_required(descriptor.migration_id().as_str())
78 })?
79 .row_ops();
80 if row_ops.is_empty() {
81 return Err(InternalError::schema_evolution_row_ops_required(
82 descriptor.migration_id().as_str(),
83 ));
84 }
85
86 for row_op in row_ops {
87 self.require_entity(row_op.target().name())?;
88 }
89
90 let migration_row_ops = descriptor
91 .clone()
92 .into_data_transformation()
93 .expect("descriptor data transformation already checked")
94 .into_row_ops()
95 .into_iter()
96 .map(SchemaMigrationRowOp::into_migration_row_op)
97 .collect::<Result<Vec<_>, _>>()?;
98 let step = MigrationStep::from_row_ops("schema_evolution_apply", migration_row_ops)?;
99
100 MigrationPlan::new(
101 descriptor.migration_id().as_str(),
102 descriptor.version(),
103 vec![step],
104 )
105 }
106
107 fn validate_intent(&self, intent: &SchemaMigrationStepIntent) -> Result<(), InternalError> {
108 match intent {
109 SchemaMigrationStepIntent::AddIndex { index } => {
110 let (entity, fields) = parse_index_name_parts(index.as_str())?;
111 let target = self.require_entity(entity)?;
112 for field in &fields {
113 if target.model().resolve_field_slot(field).is_none() {
114 return Err(InternalError::schema_evolution_unknown_field(
115 target.name().as_str(),
116 field,
117 ));
118 }
119 }
120 if target
121 .model()
122 .indexes()
123 .iter()
124 .any(|existing| existing.fields() == fields.as_slice())
125 {
126 return Err(InternalError::schema_evolution_duplicate_index(
127 target.name().as_str(),
128 index.as_str(),
129 ));
130 }
131 }
132 }
133
134 Ok(())
135 }
136
137 fn require_entity(
138 &self,
139 entity: EntityName,
140 ) -> Result<SchemaMigrationEntityTarget, InternalError> {
141 self.entities
142 .iter()
143 .copied()
144 .find(|target| target.name() == entity)
145 .ok_or_else(|| InternalError::schema_evolution_unknown_entity(entity.as_str()))
146 }
147}
148
149fn parse_index_name_parts(index: &str) -> Result<(EntityName, Vec<&str>), InternalError> {
150 let mut parts = index.split('|');
151 let entity = parts
152 .next()
153 .ok_or_else(|| InternalError::schema_evolution_invalid_index_name(index))?;
154 let entity = EntityName::try_from_str(entity).map_err(|err| {
155 InternalError::schema_evolution_invalid_identity(format!(
156 "invalid index entity segment '{entity}': {err}",
157 ))
158 })?;
159 let fields = parts.collect::<Vec<_>>();
160 if fields.is_empty() {
161 return Err(InternalError::schema_evolution_invalid_index_name(index));
162 }
163
164 Ok((entity, fields))
165}