Skip to main content

reifydb_core/encoded/schema/
evolution.rs

1// SPDX-License-Identifier: Apache-2.0
2// Copyright (c) 2025 ReifyDB
3
4//! RowSchema evolution and resolution utilities.
5//!
6//! SchemaResolver handles reading data written with one schema
7//! using a different (but compatible) target schema.
8
9use reifydb_type::value::constraint::TypeConstraint;
10
11use crate::encoded::schema::RowSchema;
12
13/// Describes how to map a source field to a target field during schema evolution.
14#[derive(Debug, Clone)]
15pub enum FieldMapping {
16	/// Field exists in both schemas at the given source index
17	Direct {
18		source_index: usize,
19	},
20	/// Field is new in target schema, use default value
21	UseDefault,
22	/// Field was removed (source has it, target doesn't) - skip during read
23	Removed,
24}
25
26/// Resolves differences between source and target schemas.
27///
28/// Used when reading data that was written with an older schema version
29/// using a newer schema, or vice versa.
30#[derive(Debug)]
31pub struct SchemaResolver {
32	/// The schema the data was written with
33	source: RowSchema,
34	/// The schema we want to read as
35	target: RowSchema,
36	/// Mapping from target field index to source field
37	mappings: Vec<FieldMapping>,
38}
39
40impl SchemaResolver {
41	/// Create a resolver to read data from source schema as target schema.
42	///
43	/// Returns None if the schemas are incompatible (e.g., type mismatch
44	/// on same-named field without valid widening path).
45	pub fn new(source: RowSchema, target: RowSchema) -> Option<Self> {
46		// If fingerprints match, no resolution needed - schemas are identical
47		if source.fingerprint() == target.fingerprint() {
48			return Some(Self {
49				mappings: (0..target.field_count())
50					.map(|i| FieldMapping::Direct {
51						source_index: i,
52					})
53					.collect(),
54				source,
55				target,
56			});
57		}
58
59		let mut mappings = Vec::with_capacity(target.field_count());
60
61		for target_field in target.fields() {
62			if let Some((source_idx, source_field)) =
63				source.fields().iter().enumerate().find(|(_, f)| f.name == target_field.name)
64			{
65				// Field exists in both - check type compatibility
66				if !Self::types_compatible(&source_field.constraint, &target_field.constraint) {
67					return None; // Incompatible types
68				}
69				mappings.push(FieldMapping::Direct {
70					source_index: source_idx,
71				});
72			} else {
73				// Field only in target - needs default
74				mappings.push(FieldMapping::UseDefault);
75			}
76		}
77
78		Some(Self {
79			source,
80			target,
81			mappings,
82		})
83	}
84
85	/// Check if source constraint can be read as target constraint.
86	/// For now, just compares base types - constraint widening could be added later.
87	fn types_compatible(source: &TypeConstraint, target: &TypeConstraint) -> bool {
88		let source_type = source.get_type();
89		let target_type = target.get_type();
90
91		if source_type == target_type {
92			return true;
93		}
94
95		// Type widening would go here
96		// For now, only identical types are compatible
97		false
98	}
99
100	/// Get the source schema
101	pub fn source(&self) -> &RowSchema {
102		&self.source
103	}
104
105	/// Get the target schema
106	pub fn target(&self) -> &RowSchema {
107		&self.target
108	}
109
110	/// Get the field mappings
111	pub fn mappings(&self) -> &[FieldMapping] {
112		&self.mappings
113	}
114
115	/// Check if this is an identity mapping (source == target)
116	pub fn is_identity(&self) -> bool {
117		self.source.fingerprint() == self.target.fingerprint()
118	}
119}
120
121#[cfg(test)]
122mod tests {
123	use reifydb_type::value::r#type::Type;
124
125	use super::*;
126	use crate::encoded::schema::RowSchemaField;
127
128	#[test]
129	fn test_resolver_identity() {
130		let fields = vec![
131			RowSchemaField::unconstrained("a", Type::Int4),
132			RowSchemaField::unconstrained("b", Type::Utf8),
133		];
134
135		let schema = RowSchema::new(fields);
136		let resolver = SchemaResolver::new(schema.clone(), schema.clone()).unwrap();
137
138		assert!(resolver.is_identity());
139		assert_eq!(resolver.mappings().len(), 2);
140	}
141
142	#[test]
143	fn test_resolver_added_field() {
144		let source_fields = vec![RowSchemaField::unconstrained("a", Type::Int4)];
145
146		let target_fields = vec![
147			RowSchemaField::unconstrained("a", Type::Int4),
148			RowSchemaField::unconstrained("b", Type::Utf8), // new field
149		];
150
151		let source = RowSchema::new(source_fields);
152		let target = RowSchema::new(target_fields);
153
154		let resolver = SchemaResolver::new(source, target).unwrap();
155
156		assert!(!resolver.is_identity());
157		assert!(matches!(
158			resolver.mappings()[0],
159			FieldMapping::Direct {
160				source_index: 0
161			}
162		));
163		assert!(matches!(resolver.mappings()[1], FieldMapping::UseDefault));
164	}
165
166	#[test]
167	fn test_resolver_incompatible_types() {
168		let source_fields = vec![RowSchemaField::unconstrained("a", Type::Int4)];
169		let target_fields = vec![RowSchemaField::unconstrained("a", Type::Utf8)]; // type changed
170
171		let source = RowSchema::new(source_fields);
172		let target = RowSchema::new(target_fields);
173
174		// Should return None due to incompatible types
175		assert!(SchemaResolver::new(source, target).is_none());
176	}
177}