1use anyhow::{Context, Result};
2use rusqlite::Connection;
3use std::collections::HashSet;
4
5pub struct Migration {
7 pub version: u32,
9 pub name: &'static str,
11 pub up: fn(&Connection) -> Result<()>,
13}
14
15#[derive(Debug, Clone, Copy, PartialEq, Eq)]
16pub struct MigrationDescriptor {
18 pub version: u32,
20 pub name: &'static str,
22}
23
24impl MigrationDescriptor {
25 fn from_migration(migration: &Migration) -> Self {
26 Self {
27 version: migration.version,
28 name: migration.name,
29 }
30 }
31}
32
/// Snapshot comparing the version recorded in the database with a set of migrations.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct SchemaStatus {
    /// Version currently recorded in the database (0 when nothing has been applied).
    pub current_version: u32,
    /// Version the migration set is expected to bring the schema to (0 for an empty set).
    pub target_version: u32,
    /// Versions of the migrations that still have to be applied.
    pub pending_versions: Vec<u32>,
    /// Names of the pending migrations, in the same order as `pending_versions`.
    pub pending_names: Vec<&'static str>,
}

impl SchemaStatus {
    /// Returns `true` when no migration is pending.
    pub fn is_current(&self) -> bool {
        let Self {
            pending_versions, ..
        } = self;
        pending_versions.is_empty()
    }
}
52
53#[derive(Debug, Clone, PartialEq, Eq, Default)]
54pub struct AppliedMigrationSummary {
56 pub applied: Vec<MigrationDescriptor>,
58}
59
60impl AppliedMigrationSummary {
61 pub fn count(&self) -> u32 {
63 self.applied.len() as u32
64 }
65
66 pub fn is_empty(&self) -> bool {
68 self.applied.is_empty()
69 }
70}
71
72fn ensure_schema_migrations_table(conn: &Connection) -> Result<()> {
73 conn.execute_batch(
74 "CREATE TABLE IF NOT EXISTS schema_migrations (
75 version INTEGER PRIMARY KEY,
76 name TEXT NOT NULL,
77 applied_at TEXT NOT NULL
78 )",
79 )
80 .context("failed to create schema_migrations table")?;
81 Ok(())
82}
83
84pub fn current_version(conn: &Connection) -> Result<u32> {
86 ensure_schema_migrations_table(conn)?;
87
88 let version: u32 = conn
89 .query_row(
90 "SELECT COALESCE(MAX(version), 0) FROM schema_migrations",
91 [],
92 |row| row.get(0),
93 )
94 .context("failed to read current schema version")?;
95 Ok(version)
96}
97
98pub fn registered_migrations() -> Vec<Migration> {
100 vec![
101 Migration {
102 version: 1,
103 name: "m0001_baseline_schema",
104 up: crate::persistence::migration_steps::m0001_baseline_schema,
105 },
106 Migration {
107 version: 2,
108 name: "m0002_backfill_legacy_defaults",
109 up: crate::persistence::migration_steps::m0002_backfill_historical_defaults,
110 },
111 Migration {
112 version: 3,
113 name: "m0003_events_promote_columns",
114 up: crate::persistence::migration_steps::m0003_events_promote_columns,
115 },
116 Migration {
117 version: 4,
118 name: "m0004_events_backfill_promoted",
119 up: crate::persistence::migration_steps::m0004_events_backfill_promoted,
120 },
121 Migration {
122 version: 5,
123 name: "m0005_add_task_lookup_indexes",
124 up: crate::persistence::migration_steps::m0005_add_task_lookup_indexes,
125 },
126 Migration {
127 version: 6,
128 name: "m0006_add_pipeline_vars_json",
129 up: crate::persistence::migration_steps::m0006_add_pipeline_vars_json,
130 },
131 Migration {
132 version: 7,
133 name: "m0007_workflow_store_entries",
134 up: crate::persistence::migration_steps::m0007_workflow_store_entries,
135 },
136 Migration {
137 version: 8,
138 name: "m0008_workflow_primitives",
139 up: crate::persistence::migration_steps::m0008_workflow_primitives,
140 },
141 Migration {
142 version: 9,
143 name: "m0009_normalize_unspecified_agent_ids",
144 up: crate::persistence::migration_steps::m0009_normalize_unspecified_agent_ids,
145 },
146 Migration {
147 version: 10,
148 name: "m0010_per_resource_persistence",
149 up: crate::persistence::migration_steps::m0010_per_resource_persistence,
150 },
151 Migration {
152 version: 11,
153 name: "m0011_finalize_resource_migration",
154 up: crate::persistence::migration_steps::m0011_finalize_resource_migration,
155 },
156 Migration {
157 version: 12,
158 name: "m0012_drop_legacy_orchestrator_config_blob",
159 up: crate::persistence::migration_steps::m0012_drop_legacy_orchestrator_config_blob,
160 },
161 Migration {
162 version: 13,
163 name: "m0013_control_plane_audit",
164 up: crate::persistence::migration_steps::m0013_control_plane_audit,
165 },
166 Migration {
167 version: 14,
168 name: "m0014_task_graph_debug_tables",
169 up: crate::persistence::migration_steps::m0014_task_graph_debug_tables,
170 },
171 Migration {
172 version: 15,
173 name: "m0015_control_plane_audit_rejection_stage",
174 up: crate::persistence::migration_steps::m0015_control_plane_audit_rejection_stage,
175 },
176 Migration {
177 version: 16,
178 name: "m0016_secret_key_lifecycle",
179 up: crate::persistence::migration_steps::m0016_secret_key_lifecycle,
180 },
181 Migration {
182 version: 17,
183 name: "m0017_control_plane_protection_fields",
184 up: crate::persistence::migration_steps::m0017_control_plane_protection_fields,
185 },
186 Migration {
187 version: 18,
188 name: "m0018_trigger_state",
189 up: crate::persistence::migration_steps::m0018_trigger_state,
190 },
191 Migration {
192 version: 19,
193 name: "m0019_daemon_incarnation",
194 up: crate::persistence::migration_steps::m0019_daemon_incarnation,
195 },
196 Migration {
197 version: 20,
198 name: "m0020_command_template_column",
199 up: crate::persistence::migration_steps::m0020_command_template_column,
200 },
201 Migration {
202 version: 21,
203 name: "m0021_command_rule_index_column",
204 up: crate::persistence::migration_steps::m0021_command_rule_index_column,
205 },
206 Migration {
207 version: 22,
208 name: "m0022_plugin_audit",
209 up: crate::persistence::migration_steps::m0022_plugin_audit,
210 },
211 Migration {
212 version: 23,
213 name: "m0023_task_step_filter_and_initial_vars",
214 up: crate::persistence::migration_steps::m0023_task_step_filter_and_initial_vars,
215 },
216 ]
217}
218
219pub fn descriptors(migrations: &[Migration]) -> Vec<MigrationDescriptor> {
221 migrations
222 .iter()
223 .map(MigrationDescriptor::from_migration)
224 .collect()
225}
226
227pub fn registered_descriptors() -> Vec<MigrationDescriptor> {
229 descriptors(®istered_migrations())
230}
231
232pub fn status(conn: &Connection, migrations: &[Migration]) -> Result<SchemaStatus> {
234 let current_version = current_version(conn)?;
235 let descriptors = descriptors(migrations);
236 let pending = descriptors
237 .iter()
238 .filter(|migration| migration.version > current_version)
239 .copied()
240 .collect::<Vec<_>>();
241
242 Ok(SchemaStatus {
243 current_version,
244 target_version: descriptors
245 .last()
246 .map(|migration| migration.version)
247 .unwrap_or(0),
248 pending_versions: pending.iter().map(|migration| migration.version).collect(),
249 pending_names: pending.iter().map(|migration| migration.name).collect(),
250 })
251}
252
253pub fn registered_status(conn: &Connection) -> Result<SchemaStatus> {
255 let migrations = registered_migrations();
256 status(conn, &migrations)
257}
258
259pub fn applied_versions(conn: &Connection) -> Result<Vec<u32>> {
261 ensure_schema_migrations_table(conn)?;
262
263 let mut stmt = conn
264 .prepare("SELECT version FROM schema_migrations ORDER BY version ASC")
265 .context("failed to prepare applied schema versions query")?;
266 let rows = stmt
267 .query_map([], |row| row.get::<_, u32>(0))
268 .context("failed to query applied schema versions")?;
269
270 rows.collect::<std::result::Result<Vec<_>, _>>()
271 .context("failed to collect applied schema versions")
272}
273
/// Applied/pending state of one built-in migration.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct RegisteredMigrationStatus {
    /// Version number of the migration.
    pub version: u32,
    /// Name of the migration.
    pub name: &'static str,
    /// `true` when this version is recorded in `schema_migrations`.
    pub applied: bool,
}
284
285pub fn registered_migration_statuses(conn: &Connection) -> Result<Vec<RegisteredMigrationStatus>> {
287 let applied = applied_versions(conn)?.into_iter().collect::<HashSet<_>>();
288 Ok(registered_descriptors()
289 .into_iter()
290 .map(|descriptor| RegisteredMigrationStatus {
291 version: descriptor.version,
292 name: descriptor.name,
293 applied: applied.contains(&descriptor.version),
294 })
295 .collect())
296}
297
298pub fn run_pending(conn: &Connection, migrations: &[Migration]) -> Result<AppliedMigrationSummary> {
300 let current = current_version(conn)?;
301 let mut applied = Vec::new();
302
303 for migration in migrations {
304 if migration.version <= current {
305 continue;
306 }
307
308 let tx = conn.unchecked_transaction().with_context(|| {
309 format!(
310 "failed to begin transaction for migration {}",
311 migration.name
312 )
313 })?;
314
315 (migration.up)(&tx).with_context(|| format!("migration {} failed", migration.name))?;
316
317 tx.execute(
318 "INSERT INTO schema_migrations (version, name, applied_at) VALUES (?1, ?2, datetime('now'))",
319 rusqlite::params![migration.version, migration.name],
320 )
321 .with_context(|| format!("failed to record migration version {}", migration.version))?;
322
323 tx.commit()
324 .with_context(|| format!("failed to commit migration {}", migration.name))?;
325
326 applied.push(MigrationDescriptor::from_migration(migration));
327 }
328
329 Ok(AppliedMigrationSummary { applied })
330}
331
#[cfg(test)]
mod tests {
    use super::*;

    /// A blank in-memory database has version 0, so every built-in migration is pending.
    #[test]
    fn registered_status_reports_pending_for_blank_database() {
        let conn = Connection::open_in_memory().expect("open in-memory sqlite");
        let registered = registered_descriptors();
        let expected_target = registered.last().expect("at least one migration").version;

        let status = registered_status(&conn).expect("registered status");

        assert_eq!(status.current_version, 0);
        assert_eq!(status.target_version, expected_target);
        assert!(!status.is_current());
        assert_eq!(status.pending_versions.len(), registered.len());
    }

    /// Running a single no-op migration reports exactly that migration in the summary.
    #[test]
    fn run_pending_summary_reports_applied_descriptors() {
        let conn = Connection::open_in_memory().expect("open in-memory sqlite");
        let only_step = Migration {
            version: 1,
            name: "m0001_test_only",
            up: |_conn| Ok(()),
        };
        let expected = MigrationDescriptor {
            version: 1,
            name: "m0001_test_only",
        };

        let summary = run_pending(&conn, &[only_step]).expect("run pending");

        assert_eq!(summary.count(), 1);
        assert!(!summary.is_empty());
        assert_eq!(summary.applied, vec![expected]);
    }
}