// agent_orchestrator/persistence/migration.rs

1use anyhow::{Context, Result};
2use rusqlite::Connection;
3use std::collections::HashSet;
4
5/// Describes a schema migration that can be applied to a persistence database.
6pub struct Migration {
7    /// Monotonic schema version assigned to the migration.
8    pub version: u32,
9    /// Stable migration identifier recorded in `schema_migrations`.
10    pub name: &'static str,
11    /// Migration function executed inside a transaction.
12    pub up: fn(&Connection) -> Result<()>,
13}
14
/// Lightweight, copyable metadata describing a registered migration.
///
/// Carries the version and name of a migration without its `up` function.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct MigrationDescriptor {
    /// Schema version the migration introduces.
    pub version: u32,
    /// Stable identifier of the migration.
    pub name: &'static str,
}
23
24impl MigrationDescriptor {
25    fn from_migration(migration: &Migration) -> Self {
26        Self {
27            version: migration.version,
28            name: migration.name,
29        }
30    }
31}
32
/// Snapshot of where a database's schema stands relative to a migration set.
///
/// Holds the applied and target versions together with the versions and names
/// of the migrations that have not been applied yet.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct SchemaStatus {
    /// Highest schema version already recorded in the database.
    pub current_version: u32,
    /// Highest schema version known to the running binary.
    pub target_version: u32,
    /// Versions of the migrations that are still pending, in order.
    pub pending_versions: Vec<u32>,
    /// Names of the migrations that are still pending, in order.
    pub pending_names: Vec<&'static str>,
}

impl SchemaStatus {
    /// Returns `true` when there are no pending migrations, i.e.
    /// `pending_versions` is empty.
    pub fn is_current(&self) -> bool {
        self.pending_versions.is_empty()
    }
}
52
53#[derive(Debug, Clone, PartialEq, Eq, Default)]
54/// Summarizes which migrations were applied during a `run_pending` invocation.
55pub struct AppliedMigrationSummary {
56    /// Ordered descriptors for migrations applied in the current run.
57    pub applied: Vec<MigrationDescriptor>,
58}
59
60impl AppliedMigrationSummary {
61    /// Returns the number of migrations applied in the current run.
62    pub fn count(&self) -> u32 {
63        self.applied.len() as u32
64    }
65
66    /// Returns `true` when no migrations were applied.
67    pub fn is_empty(&self) -> bool {
68        self.applied.is_empty()
69    }
70}
71
72fn ensure_schema_migrations_table(conn: &Connection) -> Result<()> {
73    conn.execute_batch(
74        "CREATE TABLE IF NOT EXISTS schema_migrations (
75            version INTEGER PRIMARY KEY,
76            name TEXT NOT NULL,
77            applied_at TEXT NOT NULL
78        )",
79    )
80    .context("failed to create schema_migrations table")?;
81    Ok(())
82}
83
84/// Returns the highest applied schema version for the given database.
85pub fn current_version(conn: &Connection) -> Result<u32> {
86    ensure_schema_migrations_table(conn)?;
87
88    let version: u32 = conn
89        .query_row(
90            "SELECT COALESCE(MAX(version), 0) FROM schema_migrations",
91            [],
92            |row| row.get(0),
93        )
94        .context("failed to read current schema version")?;
95    Ok(version)
96}
97
98/// Returns the full ordered list of schema migrations known to this binary.
99pub fn registered_migrations() -> Vec<Migration> {
100    vec![
101        Migration {
102            version: 1,
103            name: "m0001_baseline_schema",
104            up: crate::persistence::migration_steps::m0001_baseline_schema,
105        },
106        Migration {
107            version: 2,
108            name: "m0002_backfill_legacy_defaults",
109            up: crate::persistence::migration_steps::m0002_backfill_historical_defaults,
110        },
111        Migration {
112            version: 3,
113            name: "m0003_events_promote_columns",
114            up: crate::persistence::migration_steps::m0003_events_promote_columns,
115        },
116        Migration {
117            version: 4,
118            name: "m0004_events_backfill_promoted",
119            up: crate::persistence::migration_steps::m0004_events_backfill_promoted,
120        },
121        Migration {
122            version: 5,
123            name: "m0005_add_task_lookup_indexes",
124            up: crate::persistence::migration_steps::m0005_add_task_lookup_indexes,
125        },
126        Migration {
127            version: 6,
128            name: "m0006_add_pipeline_vars_json",
129            up: crate::persistence::migration_steps::m0006_add_pipeline_vars_json,
130        },
131        Migration {
132            version: 7,
133            name: "m0007_workflow_store_entries",
134            up: crate::persistence::migration_steps::m0007_workflow_store_entries,
135        },
136        Migration {
137            version: 8,
138            name: "m0008_workflow_primitives",
139            up: crate::persistence::migration_steps::m0008_workflow_primitives,
140        },
141        Migration {
142            version: 9,
143            name: "m0009_normalize_unspecified_agent_ids",
144            up: crate::persistence::migration_steps::m0009_normalize_unspecified_agent_ids,
145        },
146        Migration {
147            version: 10,
148            name: "m0010_per_resource_persistence",
149            up: crate::persistence::migration_steps::m0010_per_resource_persistence,
150        },
151        Migration {
152            version: 11,
153            name: "m0011_finalize_resource_migration",
154            up: crate::persistence::migration_steps::m0011_finalize_resource_migration,
155        },
156        Migration {
157            version: 12,
158            name: "m0012_drop_legacy_orchestrator_config_blob",
159            up: crate::persistence::migration_steps::m0012_drop_legacy_orchestrator_config_blob,
160        },
161        Migration {
162            version: 13,
163            name: "m0013_control_plane_audit",
164            up: crate::persistence::migration_steps::m0013_control_plane_audit,
165        },
166        Migration {
167            version: 14,
168            name: "m0014_task_graph_debug_tables",
169            up: crate::persistence::migration_steps::m0014_task_graph_debug_tables,
170        },
171        Migration {
172            version: 15,
173            name: "m0015_control_plane_audit_rejection_stage",
174            up: crate::persistence::migration_steps::m0015_control_plane_audit_rejection_stage,
175        },
176        Migration {
177            version: 16,
178            name: "m0016_secret_key_lifecycle",
179            up: crate::persistence::migration_steps::m0016_secret_key_lifecycle,
180        },
181        Migration {
182            version: 17,
183            name: "m0017_control_plane_protection_fields",
184            up: crate::persistence::migration_steps::m0017_control_plane_protection_fields,
185        },
186        Migration {
187            version: 18,
188            name: "m0018_trigger_state",
189            up: crate::persistence::migration_steps::m0018_trigger_state,
190        },
191        Migration {
192            version: 19,
193            name: "m0019_daemon_incarnation",
194            up: crate::persistence::migration_steps::m0019_daemon_incarnation,
195        },
196        Migration {
197            version: 20,
198            name: "m0020_command_template_column",
199            up: crate::persistence::migration_steps::m0020_command_template_column,
200        },
201        Migration {
202            version: 21,
203            name: "m0021_command_rule_index_column",
204            up: crate::persistence::migration_steps::m0021_command_rule_index_column,
205        },
206        Migration {
207            version: 22,
208            name: "m0022_plugin_audit",
209            up: crate::persistence::migration_steps::m0022_plugin_audit,
210        },
211        Migration {
212            version: 23,
213            name: "m0023_task_step_filter_and_initial_vars",
214            up: crate::persistence::migration_steps::m0023_task_step_filter_and_initial_vars,
215        },
216    ]
217}
218
219/// Converts migration definitions into lightweight public descriptors.
220pub fn descriptors(migrations: &[Migration]) -> Vec<MigrationDescriptor> {
221    migrations
222        .iter()
223        .map(MigrationDescriptor::from_migration)
224        .collect()
225}
226
227/// Returns descriptors for every registered migration.
228pub fn registered_descriptors() -> Vec<MigrationDescriptor> {
229    descriptors(&registered_migrations())
230}
231
232/// Computes the database schema status against the provided migration set.
233pub fn status(conn: &Connection, migrations: &[Migration]) -> Result<SchemaStatus> {
234    let current_version = current_version(conn)?;
235    let descriptors = descriptors(migrations);
236    let pending = descriptors
237        .iter()
238        .filter(|migration| migration.version > current_version)
239        .copied()
240        .collect::<Vec<_>>();
241
242    Ok(SchemaStatus {
243        current_version,
244        target_version: descriptors
245            .last()
246            .map(|migration| migration.version)
247            .unwrap_or(0),
248        pending_versions: pending.iter().map(|migration| migration.version).collect(),
249        pending_names: pending.iter().map(|migration| migration.name).collect(),
250    })
251}
252
253/// Computes the database schema status against all registered migrations.
254pub fn registered_status(conn: &Connection) -> Result<SchemaStatus> {
255    let migrations = registered_migrations();
256    status(conn, &migrations)
257}
258
259/// Returns every schema version already recorded in `schema_migrations`.
260pub fn applied_versions(conn: &Connection) -> Result<Vec<u32>> {
261    ensure_schema_migrations_table(conn)?;
262
263    let mut stmt = conn
264        .prepare("SELECT version FROM schema_migrations ORDER BY version ASC")
265        .context("failed to prepare applied schema versions query")?;
266    let rows = stmt
267        .query_map([], |row| row.get::<_, u32>(0))
268        .context("failed to query applied schema versions")?;
269
270    rows.collect::<std::result::Result<Vec<_>, _>>()
271        .context("failed to collect applied schema versions")
272}
273
/// Applied/not-applied status of one registered migration.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct RegisteredMigrationStatus {
    /// Schema version this row describes.
    pub version: u32,
    /// Stable identifier of the migration.
    pub name: &'static str,
    /// `true` when the version is present in `schema_migrations`.
    pub applied: bool,
}
284
285/// Returns the applied status for every registered migration.
286pub fn registered_migration_statuses(conn: &Connection) -> Result<Vec<RegisteredMigrationStatus>> {
287    let applied = applied_versions(conn)?.into_iter().collect::<HashSet<_>>();
288    Ok(registered_descriptors()
289        .into_iter()
290        .map(|descriptor| RegisteredMigrationStatus {
291            version: descriptor.version,
292            name: descriptor.name,
293            applied: applied.contains(&descriptor.version),
294        })
295        .collect())
296}
297
298/// Applies every migration newer than the current schema version.
299pub fn run_pending(conn: &Connection, migrations: &[Migration]) -> Result<AppliedMigrationSummary> {
300    let current = current_version(conn)?;
301    let mut applied = Vec::new();
302
303    for migration in migrations {
304        if migration.version <= current {
305            continue;
306        }
307
308        let tx = conn.unchecked_transaction().with_context(|| {
309            format!(
310                "failed to begin transaction for migration {}",
311                migration.name
312            )
313        })?;
314
315        (migration.up)(&tx).with_context(|| format!("migration {} failed", migration.name))?;
316
317        tx.execute(
318            "INSERT INTO schema_migrations (version, name, applied_at) VALUES (?1, ?2, datetime('now'))",
319            rusqlite::params![migration.version, migration.name],
320        )
321        .with_context(|| format!("failed to record migration version {}", migration.version))?;
322
323        tx.commit()
324            .with_context(|| format!("failed to commit migration {}", migration.name))?;
325
326        applied.push(MigrationDescriptor::from_migration(migration));
327    }
328
329    Ok(AppliedMigrationSummary { applied })
330}
331
#[cfg(test)]
mod tests {
    use super::*;

    /// Opens a fresh in-memory SQLite database for a single test.
    fn blank_database() -> Connection {
        Connection::open_in_memory().expect("open in-memory sqlite")
    }

    /// A blank database must report version 0 with every registered migration
    /// pending.
    #[test]
    fn registered_status_reports_pending_for_blank_database() {
        let conn = blank_database();
        let known = registered_descriptors();
        let newest = known.last().expect("at least one migration").version;

        let status = registered_status(&conn).expect("registered status");

        assert_eq!(status.current_version, 0);
        assert_eq!(status.target_version, newest);
        assert!(!status.is_current());
        assert_eq!(status.pending_versions.len(), known.len());
    }

    /// Running a single no-op migration must be reflected in the summary.
    #[test]
    fn run_pending_summary_reports_applied_descriptors() {
        let conn = blank_database();
        let migrations = [Migration {
            version: 1,
            name: "m0001_test_only",
            up: |_conn| Ok(()),
        }];
        let expected = vec![MigrationDescriptor {
            version: 1,
            name: "m0001_test_only",
        }];

        let summary = run_pending(&conn, &migrations).expect("run pending");

        assert_eq!(summary.count(), 1);
        assert!(!summary.is_empty());
        assert_eq!(summary.applied, expected);
    }
}