Skip to main content

cersei_memory/
graph_migrate.rs

1//! Graph schema versioning and migration engine.
2//!
3//! Stores a `(:SchemaVersion)` node inside the graph. On open, the code checks
4//! the graph version against `CURRENT_SCHEMA_VERSION` and runs sequential
5//! migrations if the graph is behind. Each migration is idempotent.
6//!
7//! ## Version History
8//! - v0: Pre-versioning (no SchemaVersion node)
9//! - v1: Stamp version node (current schema, no data changes)
10//! - v2: Add `last_validated_at`, `decay_rate`, `embedding_model_version` to Memory nodes
11
12#[cfg(feature = "graph")]
13use grafeo::GrafeoDB;
14
/// Schema version this build of the code expects a graph to be stamped with.
///
/// `check_version` compares the version stored in the graph against this
/// value, and `run_migrations` has one migration step for every version
/// below it.
pub const CURRENT_SCHEMA_VERSION: u32 = 2;
17
/// Outcome of comparing a graph's stored schema version with the version
/// this code expects.
///
/// The enum only carries `u32` values, so it is cheap to copy and supports
/// total equality (`Eq`) in addition to `PartialEq`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum VersionCheck {
    /// Graph is at the expected version; nothing to do.
    UpToDate,
    /// Graph is behind the code and needs migrating from `from` up to `to`.
    NeedsMigration { from: u32, to: u32 },
    /// Graph is ahead of this code (it was previously opened by newer code).
    CodeBehind {
        /// Version found in the graph.
        graph_version: u32,
        /// Version this code expects.
        code_version: u32,
    },
}
31
32// ─── GQL queries for version management ────────────────────────────────────
33
mod queries {
    /// Returns the `version` property of every `(:SchemaVersion)` node.
    pub const READ_VERSION: &str = "MATCH (v:SchemaVersion) RETURN v.version";

    /// Returns `id` and `created_at` for every `:Memory` node.
    ///
    /// NOTE(review): nothing in this file executes this query. The v1→v2
    /// migration deliberately does not iterate existing nodes and relies on
    /// read-time defaults instead, so this constant looks like a leftover
    /// from an earlier design — confirm before removing.
    pub const MATCH_ALL_MEMORIES: &str = "MATCH (m:Memory) RETURN m.id, m.created_at";

    /// Build the `INSERT` statement that creates the `(:SchemaVersion)` node.
    ///
    /// * `version`  – schema version to record (written as a bare number).
    /// * `now`      – timestamp stored as `migrated_at`.
    /// * `code_ver` – crate version stored as `code_version`.
    ///
    /// `now` and `code_ver` are placed between single quotes verbatim; no
    /// escaping is performed, so they must not contain a `'`.
    pub fn insert_version(version: u32, now: &str, code_ver: &str) -> String {
        // One entry per node property, joined into the `{...}` property map.
        let properties = [
            String::from("singleton: 'schema_version'"),
            format!("version: {}", version),
            format!("migrated_at: '{}'", now),
            format!("code_version: '{}'", code_ver),
        ];
        format!("INSERT (:SchemaVersion {{{}}})", properties.join(", "))
    }
}
52
53// ─── Version check ─────────────────────────────────────────────────────────
54
/// Check the graph's schema version against the code's expected version.
///
/// Runs the `READ_VERSION` query and compares the stored version with
/// [`CURRENT_SCHEMA_VERSION`]:
///
/// * equal   → [`VersionCheck::UpToDate`]
/// * lower   → [`VersionCheck::NeedsMigration`]
/// * higher  → [`VersionCheck::CodeBehind`]
///
/// A graph without a `SchemaVersion` node, a failing query, or a version
/// value that cannot be read as a `u32` is treated as v0 (pre-versioning).
/// The last two cases are logged so they are not mistaken for a genuinely
/// fresh graph.
///
/// If several `SchemaVersion` nodes exist (for example because an earlier
/// delete-then-insert stamp left the old node behind), the highest version
/// wins: a version is only stamped after its migrations have completed.
#[cfg(feature = "graph")]
pub fn check_version(db: &GrafeoDB) -> VersionCheck {
    let session = db.session();

    let graph_ver = match session.execute(queries::READ_VERSION) {
        Ok(result) => result
            .iter()
            .map(|row| {
                // The value is rendered through `Display` and parsed back;
                // anything that is not a plain non-negative integer counts as v0.
                let rendered = row.first().map(|value| value.to_string());
                match rendered.as_deref().map(str::parse::<u32>) {
                    Some(Ok(version)) => version,
                    _ => {
                        tracing::warn!(
                            "Unreadable SchemaVersion value {:?}; treating it as v0",
                            rendered
                        );
                        0
                    }
                }
            })
            .max()
            // No SchemaVersion node → pre-versioning (v0)
            .unwrap_or(0),
        Err(e) => {
            // Query failed → treat as v0, but leave a trace of the reason.
            tracing::debug!("Schema version query failed, treating graph as v0: {}", e);
            0
        }
    };

    match graph_ver.cmp(&CURRENT_SCHEMA_VERSION) {
        std::cmp::Ordering::Equal => VersionCheck::UpToDate,
        std::cmp::Ordering::Less => VersionCheck::NeedsMigration {
            from: graph_ver,
            to: CURRENT_SCHEMA_VERSION,
        },
        std::cmp::Ordering::Greater => VersionCheck::CodeBehind {
            graph_version: graph_ver,
            code_version: CURRENT_SCHEMA_VERSION,
        },
    }
}
99
100/// Fallback when graph feature is disabled.
101#[cfg(not(feature = "graph"))]
102pub fn check_version(_db: &()) -> VersionCheck {
103    VersionCheck::UpToDate
104}
105
106// ─── Migration runner ──────────────────────────────────────────────────────
107
/// Run all necessary migrations from `from` to `to`.
///
/// Steps are applied one version at a time (`from → from+1 → … → to`), and
/// the `SchemaVersion` node is stamped with `to` once every step succeeded.
/// Each step is idempotent, so re-running after a failure is safe.
///
/// If `from` is greater than `to` the graph is already newer than requested.
/// Nothing is migrated and the version stamp is left untouched, because
/// overwriting it with the lower `to` would misreport the schema.
///
/// # Errors
///
/// Returns `CerseiError::Config` when no migration step is known for one of
/// the intermediate versions, and propagates any error from a migration step
/// or from stamping the final version.
#[cfg(feature = "graph")]
pub fn run_migrations(db: &GrafeoDB, from: u32, to: u32) -> cersei_types::Result<()> {
    if from > to {
        tracing::warn!(
            "Graph schema v{} is newer than requested v{}; leaving version stamp untouched",
            from,
            to
        );
        return Ok(());
    }

    tracing::info!("Migrating graph schema from v{} to v{}", from, to);

    for step in from..to {
        match step {
            0 => migrate_v0_to_v1(db)?,
            1 => migrate_v1_to_v2(db)?,
            _ => {
                return Err(cersei_types::CerseiError::Config(format!(
                    "Unknown migration: v{} → v{}",
                    step,
                    step + 1
                )));
            }
        }
    }

    // Stamp the final version
    stamp_version(db, to)?;

    tracing::info!("Graph schema migration complete: v{}", to);
    Ok(())
}
136
137/// Fallback when graph feature is disabled.
138#[cfg(not(feature = "graph"))]
139pub fn run_migrations(_db: &(), _from: u32, _to: u32) -> cersei_types::Result<()> {
140    Ok(())
141}
142
143// ─── Version stamping ──────────────────────────────────────────────────────
144
/// Record `version` as the graph's schema version.
///
/// Replaces the `(:SchemaVersion)` node by deleting any existing one and
/// inserting a fresh node carrying `version`, the current UTC time
/// (RFC 3339) and this crate's `CARGO_PKG_VERSION`.
///
/// The delete is best-effort: a failure is logged and the insert still
/// runs. This may leave more than one `SchemaVersion` node behind, which
/// is why the failure is surfaced in the log rather than discarded.
///
/// # Errors
///
/// Returns `CerseiError::Config` if the insert fails.
#[cfg(feature = "graph")]
fn stamp_version(db: &GrafeoDB, version: u32) -> cersei_types::Result<()> {
    let session = db.session();

    // Delete old version node if exists, then insert fresh one.
    // This is simpler than trying to UPDATE which Grafeo may not support.
    if let Err(e) = session.execute("MATCH (v:SchemaVersion) DELETE v") {
        tracing::warn!(
            "Could not delete previous SchemaVersion node (continuing with insert): {}",
            e
        );
    }

    let now = chrono::Utc::now().to_rfc3339();
    let code_ver = env!("CARGO_PKG_VERSION");

    session
        .execute(&queries::insert_version(version, &now, code_ver))
        .map_err(|e| {
            cersei_types::CerseiError::Config(format!("Failed to stamp schema version: {}", e))
        })?;

    Ok(())
}
162
163// ─── Individual migrations ─────────────────────────────────────────────────
164
/// v0 → v1: Stamp initial schema version. No data changes needed —
/// v1 IS the pre-existing schema.
///
/// The database handle is accepted only so every migration step has the
/// same signature; it is not used. The version node itself is written by
/// `stamp_version`, which `run_migrations` calls after all steps.
#[cfg(feature = "graph")]
fn migrate_v0_to_v1(_db: &GrafeoDB) -> cersei_types::Result<()> {
    tracing::debug!("Running migration v0 → v1 (stamp version, no data changes)");
    // Nothing to change — v1 is the original schema.
    Ok(())
}
174
/// v1 → v2: Introduce confidence-decay and embedding fields on `:Memory`.
///
/// Fields that are new in v2:
/// - last_validated_at (String, defaults to created_at)
/// - decay_rate (f32, defaults to 0.01)
/// - embedding_model_version (String, defaults to "")
///
/// This step does not touch any stored data and does not use the database
/// handle (it is accepted only to keep migration signatures uniform).
/// Idempotent: re-running is trivially safe because nothing is modified.
#[cfg(feature = "graph")]
fn migrate_v1_to_v2(_db: &GrafeoDB) -> cersei_types::Result<()> {
    tracing::debug!("Running migration v1 → v2 (add decay/embedding fields)");

    // Grafeo is schema-less. "Adding fields" means new INSERT statements include them.
    // Old nodes simply lack these properties — the read-time helpers in graph.rs
    // (effective_confidence, prop_or_default) handle missing fields with defaults.
    //
    // We don't iterate and SET each existing node because:
    // 1. Grafeo may not support MATCH ... SET syntax
    // 2. The read-time defaults produce identical behavior
    // 3. Iterating thousands of nodes in a migration is slow and fragile
    //
    // The tradeoff: old nodes never get the physical properties, but they
    // behave identically through the API because defaults are applied at read time.

    Ok(())
}
203
204// ─── Confidence decay ──────────────────────────────────────────────────────
205
206/// Calculate effective confidence with time-based decay.
207///
208/// `effective = base_confidence - (decay_rate * days_since_validation)`
209///
210/// Clamped to [0.0, 1.0]. If `last_validated_at` is invalid or missing,
211/// returns `base_confidence` unchanged (no decay).
212pub fn effective_confidence(base: f32, decay_rate: f32, last_validated_at: &str) -> f32 {
213    if last_validated_at.is_empty() || decay_rate <= 0.0 {
214        return base.clamp(0.0, 1.0);
215    }
216
217    let validated = match chrono::DateTime::parse_from_rfc3339(last_validated_at) {
218        Ok(dt) => dt.with_timezone(&chrono::Utc),
219        Err(_) => return base.clamp(0.0, 1.0),
220    };
221
222    let days = (chrono::Utc::now() - validated).num_days().max(0) as f32;
223    (base - decay_rate * days).clamp(0.0, 1.0)
224}
225
226// ─── Tests ─────────────────────────────────────────────────────────────────
227
/// Unit tests for confidence decay and, with the `graph` feature enabled,
/// for schema version checking and migration against an in-memory graph.
#[cfg(test)]
mod tests {
    use super::*;

    /// A zero decay rate or a missing timestamp leaves the confidence unchanged.
    #[test]
    fn test_effective_confidence_no_decay() {
        assert_eq!(effective_confidence(0.9, 0.0, ""), 0.9);
        assert_eq!(effective_confidence(0.9, 0.0, "2024-01-01T00:00:00Z"), 0.9);
    }

    /// A very old validation date decays the confidence all the way to zero.
    #[test]
    fn test_effective_confidence_with_decay() {
        // Use a date far in the past for deterministic test
        let old = "2020-01-01T00:00:00Z";
        let result = effective_confidence(0.9, 0.01, old);
        // Should be significantly decayed (>2000 days * 0.01 = >20, clamped to 0.0)
        assert_eq!(result, 0.0);
    }

    /// A timestamp taken just now yields (almost) no decay.
    #[test]
    fn test_effective_confidence_recent() {
        let now = chrono::Utc::now().to_rfc3339();
        let result = effective_confidence(0.9, 0.01, &now);
        // Just validated — should be ~0.9 (0 days decay)
        assert!((result - 0.9).abs() < 0.02);
    }

    /// An unparseable timestamp disables decay instead of failing.
    #[test]
    fn test_effective_confidence_invalid_date() {
        assert_eq!(effective_confidence(0.8, 0.01, "not-a-date"), 0.8);
    }

    /// Out-of-range base values are clamped into [0.0, 1.0].
    #[test]
    fn test_effective_confidence_clamps() {
        assert_eq!(effective_confidence(1.5, 0.0, ""), 1.0);
        assert_eq!(effective_confidence(-0.5, 0.0, ""), 0.0);
    }

    /// A brand-new graph has no version node and must be migrated from v0.
    #[cfg(feature = "graph")]
    #[test]
    fn test_check_version_fresh_graph() {
        let db = GrafeoDB::new_in_memory();
        let check = check_version(&db);
        assert_eq!(
            check,
            VersionCheck::NeedsMigration {
                from: 0,
                to: CURRENT_SCHEMA_VERSION
            }
        );
    }

    /// Migrating a fresh graph brings it to the current schema version.
    #[cfg(feature = "graph")]
    #[test]
    fn test_migration_and_recheck() {
        let db = GrafeoDB::new_in_memory();

        // Fresh graph → needs migration
        let check = check_version(&db);
        assert_eq!(
            check,
            VersionCheck::NeedsMigration {
                from: 0,
                to: CURRENT_SCHEMA_VERSION
            }
        );

        // Run migrations
        run_migrations(&db, 0, CURRENT_SCHEMA_VERSION).unwrap();

        // Now should be up to date
        let check = check_version(&db);
        assert_eq!(check, VersionCheck::UpToDate);
    }

    /// Running the full migration chain twice must succeed both times.
    #[cfg(feature = "graph")]
    #[test]
    fn test_migration_idempotent() {
        let db = GrafeoDB::new_in_memory();

        // Run migrations twice — should not fail
        run_migrations(&db, 0, CURRENT_SCHEMA_VERSION).unwrap();
        run_migrations(&db, 0, CURRENT_SCHEMA_VERSION).unwrap();

        let check = check_version(&db);
        assert_eq!(check, VersionCheck::UpToDate);
    }
}