Skip to main content

reddb_server/runtime/
impl_migrations.rs

1//! Native migration execution: CREATE / APPLY / ROLLBACK / EXPLAIN MIGRATION
2
3use std::collections::HashMap;
4use std::sync::Arc;
5use std::time::{SystemTime, UNIX_EPOCH};
6
7use super::*;
8use crate::application::migration_collections as mc;
9use crate::application::migration_graph;
10use crate::application::migration_inference;
11use crate::application::vcs::{Author, CreateCommitInput};
12use crate::storage::query::ast::{
13    ApplyMigrationQuery, ApplyMigrationTarget, CreateMigrationQuery, ExplainMigrationQuery,
14    RollbackMigrationQuery,
15};
16use crate::storage::unified::entity::{EntityData, EntityId, EntityKind, RowData, UnifiedEntity};
17
/// Current wall-clock time as milliseconds since the Unix epoch.
///
/// Returns `0` when the system clock reports a time before the epoch, and
/// saturates at `i64::MAX` (instead of silently wrapping) if the millisecond
/// count does not fit in an `i64`.
fn now_ms() -> i64 {
    SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .map(|elapsed| i64::try_from(elapsed.as_millis()).unwrap_or(i64::MAX))
        .unwrap_or(0)
}
24
25fn val_text(v: &Value) -> Option<&str> {
26    if let Value::Text(s) = v {
27        Some(s.as_ref())
28    } else {
29        None
30    }
31}
32
33fn val_bool(v: &Value) -> Option<bool> {
34    if let Value::Boolean(b) = v {
35        Some(*b)
36    } else {
37        None
38    }
39}
40
41fn migration_author(rt: &RedDBRuntime) -> Author {
42    let store = rt.inner.db.store();
43    let name = store
44        .get_config("red.vcs.author.name")
45        .and_then(|v| {
46            if let Value::Text(s) = v {
47                Some(s.to_string())
48            } else {
49                None
50            }
51        })
52        .unwrap_or_else(|| "reddb".to_string());
53    let email = store
54        .get_config("red.vcs.author.email")
55        .and_then(|v| {
56            if let Value::Text(s) = v {
57                Some(s.to_string())
58            } else {
59                None
60            }
61        })
62        .unwrap_or_else(|| "reddb@localhost".to_string());
63    Author { name, email }
64}
65
66fn insert_meta_row(
67    store: &UnifiedStore,
68    collection: &str,
69    fields: HashMap<String, Value>,
70) -> RedDBResult<EntityId> {
71    let _ = store.get_or_create_collection(collection);
72    store
73        .insert_auto(
74            collection,
75            UnifiedEntity::new(
76                EntityId::new(0),
77                EntityKind::TableRow {
78                    table: Arc::from(collection),
79                    row_id: 0,
80                },
81                EntityData::Row(RowData {
82                    columns: Vec::new(),
83                    named: Some(fields),
84                    schema: None,
85                }),
86            ),
87        )
88        .map_err(|e| RedDBError::Internal(e.to_string()))
89}
90
91/// Find a migration row by name. Returns the entity and its named fields.
92fn find_migration(
93    store: &UnifiedStore,
94    name: &str,
95) -> Option<(UnifiedEntity, HashMap<String, Value>)> {
96    let manager = store.get_collection(mc::MIGRATIONS)?;
97    let results = manager.query_all(|entity| {
98        if let EntityData::Row(ref row) = entity.data {
99            if let Some(ref named) = row.named {
100                return named.get("name").and_then(|v| val_text(v)) == Some(name);
101            }
102        }
103        false
104    });
105    results.into_iter().find_map(|entity| {
106        if let EntityData::Row(ref row) = entity.data {
107            if let Some(ref named) = row.named {
108                return Some((entity.clone(), named.clone()));
109            }
110        }
111        None
112    })
113}
114
115/// Update a field on an existing migration row (by name).
116fn update_migration_field(
117    store: &UnifiedStore,
118    name: &str,
119    key: &str,
120    value: Value,
121) -> RedDBResult<()> {
122    let manager = store
123        .get_collection(mc::MIGRATIONS)
124        .ok_or_else(|| RedDBError::Internal("red_migrations collection not found".to_string()))?;
125    let results = manager.query_all(|entity| {
126        if let EntityData::Row(ref row) = entity.data {
127            if let Some(ref named) = row.named {
128                return named.get("name").and_then(|v| val_text(v)) == Some(name);
129            }
130        }
131        false
132    });
133    for mut entity in results {
134        if let EntityData::Row(ref mut row) = entity.data {
135            if let Some(ref mut named) = row.named {
136                named.insert(key.to_string(), value.clone());
137                manager
138                    .update(entity)
139                    .map_err(|e| RedDBError::Internal(e.to_string()))?;
140                return Ok(());
141            }
142        }
143    }
144    Err(RedDBError::NotFound(format!(
145        "migration '{name}' not found"
146    )))
147}
148
149/// Load all dependency edges from red_migration_deps.
150fn load_all_edges(store: &UnifiedStore) -> Vec<(String, String)> {
151    let Some(manager) = store.get_collection(mc::MIGRATION_DEPS) else {
152        return Vec::new();
153    };
154    manager
155        .query_all(|_| true)
156        .into_iter()
157        .filter_map(|entity| {
158            if let EntityData::Row(ref row) = entity.data {
159                if let Some(ref named) = row.named {
160                    let from = named
161                        .get("migration_id")
162                        .and_then(|v| val_text(v))?
163                        .to_string();
164                    let to = named
165                        .get("depends_on_id")
166                        .and_then(|v| val_text(v))?
167                        .to_string();
168                    return Some((from, to));
169                }
170            }
171            None
172        })
173        .collect()
174}
175
176/// Load all migration names from red_migrations.
177fn load_all_migration_names(store: &UnifiedStore) -> Vec<String> {
178    let Some(manager) = store.get_collection(mc::MIGRATIONS) else {
179        return Vec::new();
180    };
181    manager
182        .query_all(|_| true)
183        .into_iter()
184        .filter_map(|entity| {
185            if let EntityData::Row(ref row) = entity.data {
186                if let Some(ref named) = row.named {
187                    return named
188                        .get("name")
189                        .and_then(|v| val_text(v))
190                        .map(|s| s.to_string());
191                }
192            }
193            None
194        })
195        .collect()
196}
197
198impl RedDBRuntime {
199    /// CREATE MIGRATION — register a migration definition with status=pending.
200    pub fn execute_create_migration(
201        &self,
202        raw_query: &str,
203        q: &CreateMigrationQuery,
204    ) -> RedDBResult<RuntimeQueryResult> {
205        let store_arc = self.inner.db.store();
206        let store: &UnifiedStore = &store_arc;
207
208        if find_migration(store, &q.name).is_some() {
209            return Err(RedDBError::Query(format!(
210                "migration '{}' already exists",
211                q.name
212            )));
213        }
214
215        for dep in &q.depends_on {
216            if find_migration(store, dep).is_none() {
217                return Err(RedDBError::Query(format!(
218                    "migration '{dep}' referenced in DEPENDS ON does not exist"
219                )));
220            }
221        }
222
223        // Cycle detection: check that adding name → dep edges wouldn't create a cycle.
224        let existing_edges = load_all_edges(store);
225        for dep in &q.depends_on {
226            if migration_graph::would_create_cycle(&existing_edges, q.name.as_str(), dep) {
227                return Err(RedDBError::Query(format!(
228                    "adding DEPENDS ON '{dep}' to migration '{}' would create a dependency cycle",
229                    q.name
230                )));
231            }
232        }
233
234        let mut fields: HashMap<String, Value> = HashMap::new();
235        fields.insert("name".to_string(), Value::text(q.name.as_str()));
236        fields.insert("status".to_string(), Value::text("pending"));
237        fields.insert(
238            "kind".to_string(),
239            Value::text(if q.no_rollback { "data" } else { "ddl" }),
240        );
241        fields.insert("body".to_string(), Value::text(q.body.as_str()));
242        fields.insert(
243            "author".to_string(),
244            Value::text(migration_author(self).name.as_str()),
245        );
246        fields.insert("created_at".to_string(), Value::TimestampMs(now_ms()));
247        fields.insert("applied_at".to_string(), Value::Null);
248        fields.insert("rows_total".to_string(), Value::Null);
249        fields.insert("rows_processed".to_string(), Value::UnsignedInteger(0));
250        fields.insert("vcs_commit_hash".to_string(), Value::Null);
251        fields.insert("no_rollback".to_string(), Value::Boolean(q.no_rollback));
252        fields.insert(
253            "batch_size".to_string(),
254            q.batch_size
255                .map(Value::UnsignedInteger)
256                .unwrap_or(Value::Null),
257        );
258        insert_meta_row(store, mc::MIGRATIONS, fields)?;
259
260        for dep in &q.depends_on {
261            let mut dep_fields: HashMap<String, Value> = HashMap::new();
262            dep_fields.insert("migration_id".to_string(), Value::text(q.name.as_str()));
263            dep_fields.insert("depends_on_id".to_string(), Value::text(dep.as_str()));
264            dep_fields.insert("inferred".to_string(), Value::Boolean(false));
265            insert_meta_row(store, mc::MIGRATION_DEPS, dep_fields)?;
266        }
267
268        // Auto-infer additional dependency edges from static analysis of the body.
269        let existing_migrations: Vec<(String, String)> = store
270            .get_collection(mc::MIGRATIONS)
271            .map(|manager| {
272                manager
273                    .query_all(|_| true)
274                    .into_iter()
275                    .filter_map(|entity| {
276                        if let EntityData::Row(ref row) = entity.data {
277                            if let Some(ref named) = row.named {
278                                let name = named.get("name").and_then(|v| val_text(v))?.to_string();
279                                let body = named.get("body").and_then(|v| val_text(v))?.to_string();
280                                return Some((name, body));
281                            }
282                        }
283                        None
284                    })
285                    .collect()
286            })
287            .unwrap_or_default();
288        let explicit_deps: std::collections::HashSet<String> =
289            q.depends_on.iter().cloned().collect();
290        let inferred_edges = migration_inference::infer_dependencies(
291            q.name.as_str(),
292            q.body.as_str(),
293            &existing_migrations,
294        );
295        for (_, dep) in inferred_edges {
296            if explicit_deps.contains(&dep) {
297                continue; // already stored as explicit
298            }
299            // Only store if it wouldn't create a cycle (re-check with updated edge set).
300            let current_edges = load_all_edges(store);
301            if !migration_graph::would_create_cycle(&current_edges, q.name.as_str(), &dep) {
302                let mut dep_fields: HashMap<String, Value> = HashMap::new();
303                dep_fields.insert("migration_id".to_string(), Value::text(q.name.as_str()));
304                dep_fields.insert("depends_on_id".to_string(), Value::text(dep.as_str()));
305                dep_fields.insert("inferred".to_string(), Value::Boolean(true));
306                let _ = insert_meta_row(store, mc::MIGRATION_DEPS, dep_fields);
307            }
308        }
309
310        Ok(RuntimeQueryResult::ok_message(
311            raw_query.to_string(),
312            &format!("migration '{}' registered (pending)", q.name),
313            "create_migration",
314        ))
315    }
316
317    /// APPLY MIGRATION name [FOR TENANT id] | APPLY MIGRATION * [FOR TENANT id]
318    pub fn execute_apply_migration(
319        &self,
320        raw_query: &str,
321        q: &ApplyMigrationQuery,
322    ) -> RedDBResult<RuntimeQueryResult> {
323        // FOR TENANT * fans out to every known tenant.
324        if let Some(tenant) = &q.for_tenant {
325            if tenant == "*" {
326                return self.apply_migration_all_tenants(raw_query, q);
327            }
328            // FOR TENANT <specific_id>: set tenant context for this apply.
329            crate::runtime::impl_core::set_current_tenant(tenant.clone());
330        }
331
332        let result = match &q.target {
333            ApplyMigrationTarget::Named(name) => self.apply_single_migration(raw_query, name),
334            ApplyMigrationTarget::All => self.apply_all_pending(raw_query),
335        };
336
337        // Clear tenant override after apply so it doesn't leak.
338        if q.for_tenant.is_some() {
339            crate::runtime::impl_core::clear_current_tenant();
340        }
341
342        result
343    }
344
345    fn apply_all_pending(&self, raw_query: &str) -> RedDBResult<RuntimeQueryResult> {
346        let store_arc = self.inner.db.store();
347        let store: &UnifiedStore = &store_arc;
348        let pending = self.collect_pending_migrations(store);
349        if pending.is_empty() {
350            return Ok(RuntimeQueryResult::ok_message(
351                raw_query.to_string(),
352                "no pending migrations",
353                "apply_migration",
354            ));
355        }
356        let mut applied = 0u32;
357        let mut messages: Vec<String> = Vec::new();
358        for name in pending {
359            match self.apply_single_migration(raw_query, &name) {
360                Ok(_) => {
361                    applied += 1;
362                    messages.push(format!("applied: {name}"));
363                }
364                Err(e) => {
365                    messages.push(format!("failed: {name} — {e}"));
366                    break;
367                }
368            }
369        }
370        let summary = messages.join("; ");
371        Ok(RuntimeQueryResult::ok_message(
372            raw_query.to_string(),
373            &format!("applied {applied} migration(s): {summary}"),
374            "apply_migration",
375        ))
376    }
377
378    /// Fan out APPLY MIGRATION * to every known tenant in the auth store.
379    fn apply_migration_all_tenants(
380        &self,
381        raw_query: &str,
382        q: &ApplyMigrationQuery,
383    ) -> RedDBResult<RuntimeQueryResult> {
384        let tenant_ids = self.list_known_tenants();
385        if tenant_ids.is_empty() {
386            return Ok(RuntimeQueryResult::ok_message(
387                raw_query.to_string(),
388                "no tenants found — nothing applied",
389                "apply_migration",
390            ));
391        }
392        let mut results: Vec<String> = Vec::new();
393        for tenant in &tenant_ids {
394            crate::runtime::impl_core::set_current_tenant(tenant.clone());
395            let inner_q = ApplyMigrationQuery {
396                target: q.target.clone(),
397                for_tenant: None,
398            };
399            match self.execute_apply_migration(raw_query, &inner_q) {
400                Ok(r) => results.push(format!(
401                    "tenant={tenant}: {}",
402                    r.result
403                        .records
404                        .first()
405                        .and_then(|rec| rec.get("message"))
406                        .and_then(|v| val_text(v))
407                        .unwrap_or("ok")
408                )),
409                Err(e) => results.push(format!("tenant={tenant}: error — {e}")),
410            }
411            crate::runtime::impl_core::clear_current_tenant();
412        }
413        Ok(RuntimeQueryResult::ok_message(
414            raw_query.to_string(),
415            &results.join("; "),
416            "apply_migration",
417        ))
418    }
419
420    /// Collect distinct tenant IDs from the auth store.
421    fn list_known_tenants(&self) -> Vec<String> {
422        let auth_store = match self.inner.auth_store.read().clone() {
423            Some(s) => s,
424            None => return Vec::new(),
425        };
426        let users = auth_store.list_users_scoped(None);
427        let mut tenants: std::collections::HashSet<String> = std::collections::HashSet::new();
428        for u in users {
429            if let Some(ref t) = u.tenant_id {
430                tenants.insert(t.clone());
431            }
432        }
433        let mut out: Vec<String> = tenants.into_iter().collect();
434        out.sort();
435        out
436    }
437
438    fn collect_pending_migrations(&self, store: &UnifiedStore) -> Vec<String> {
439        // Collect only pending migrations.
440        let Some(manager) = store.get_collection(mc::MIGRATIONS) else {
441            return Vec::new();
442        };
443        let pending: Vec<String> = manager
444            .query_all(|entity| {
445                if let EntityData::Row(ref row) = entity.data {
446                    if let Some(ref named) = row.named {
447                        return named.get("status").and_then(|v| val_text(v)) == Some("pending");
448                    }
449                }
450                false
451            })
452            .into_iter()
453            .filter_map(|entity| {
454                if let EntityData::Row(ref row) = entity.data {
455                    if let Some(ref named) = row.named {
456                        return named
457                            .get("name")
458                            .and_then(|v| val_text(v))
459                            .map(|s| s.to_string());
460                    }
461                }
462                None
463            })
464            .collect();
465
466        // Sort topologically using the full edge set (includes applied migrations
467        // as anchors — only pending nodes end up in the output).
468        let all_edges = load_all_edges(store);
469        // Filter edges to only those between pending migrations.
470        let pending_set: std::collections::HashSet<&str> =
471            pending.iter().map(|s| s.as_str()).collect();
472        let relevant_edges: Vec<(String, String)> = all_edges
473            .into_iter()
474            .filter(|(m, d)| pending_set.contains(m.as_str()) && pending_set.contains(d.as_str()))
475            .collect();
476
477        match migration_graph::topological_sort(&pending, &relevant_edges) {
478            Ok(sorted) => sorted,
479            Err(_) => pending, // cycle shouldn't happen (guarded at CREATE time); fall back
480        }
481    }
482
483    fn apply_single_migration(
484        &self,
485        raw_query: &str,
486        name: &str,
487    ) -> RedDBResult<RuntimeQueryResult> {
488        let store_arc = self.inner.db.store();
489        let store: &UnifiedStore = &store_arc;
490
491        let (_, fields) = find_migration(store, name)
492            .ok_or_else(|| RedDBError::NotFound(format!("migration '{name}' not found")))?;
493
494        let status = fields.get("status").and_then(|v| val_text(v)).unwrap_or("");
495
496        if status == "applied" {
497            return Ok(RuntimeQueryResult::ok_message(
498                raw_query.to_string(),
499                &format!("migration '{name}' is already applied"),
500                "apply_migration",
501            ));
502        }
503
504        // Verify all dependencies are applied.
505        let deps = self.load_migration_deps(store, name);
506        for dep in &deps {
507            match find_migration(store, dep) {
508                Some((_, dep_fields)) => {
509                    let dep_status = dep_fields
510                        .get("status")
511                        .and_then(|v| val_text(v))
512                        .unwrap_or("");
513                    if dep_status != "applied" {
514                        return Err(RedDBError::Query(format!(
515                            "migration '{name}' depends on '{dep}' which is not yet applied"
516                        )));
517                    }
518                }
519                None => {
520                    return Err(RedDBError::Query(format!(
521                        "migration '{name}' depends on '{dep}' which does not exist"
522                    )));
523                }
524            }
525        }
526
527        let body = fields
528            .get("body")
529            .and_then(|v| val_text(v))
530            .unwrap_or("")
531            .to_string();
532        let batch_size = match fields.get("batch_size") {
533            Some(Value::UnsignedInteger(n)) => Some(*n),
534            _ => None,
535        };
536        let no_rollback = fields
537            .get("no_rollback")
538            .and_then(val_bool)
539            .unwrap_or(false);
540        let rows_processed_start = match fields.get("rows_processed") {
541            Some(Value::UnsignedInteger(n)) => *n,
542            _ => 0,
543        };
544
545        let apply_result = if let Some(batch) = batch_size {
546            self.apply_batched(store, name, &body, batch, rows_processed_start)
547        } else {
548            self.apply_statements(name, &body)
549        };
550
551        match apply_result {
552            Err(e) => {
553                let err_msg = e.to_string();
554                let _ = update_migration_field(store, name, "status", Value::text("failed"));
555                let _ = update_migration_field(store, name, "error", Value::text(err_msg.as_str()));
556                Err(RedDBError::Query(format!(
557                    "migration '{name}' failed: {err_msg}"
558                )))
559            }
560            Ok(rows_processed) => {
561                let author = migration_author(self);
562                let commit_hash = self
563                    .vcs_commit(CreateCommitInput {
564                        connection_id: 0,
565                        message: format!("migration: {name}"),
566                        author,
567                        committer: None,
568                        amend: false,
569                        allow_empty: true,
570                    })
571                    .map(|c| c.hash)
572                    .unwrap_or_default();
573
574                let _ = update_migration_field(store, name, "status", Value::text("applied"));
575                let _ =
576                    update_migration_field(store, name, "applied_at", Value::TimestampMs(now_ms()));
577                let _ = update_migration_field(
578                    store,
579                    name,
580                    "vcs_commit_hash",
581                    Value::text(commit_hash.as_str()),
582                );
583                if batch_size.is_some() {
584                    let _ = update_migration_field(
585                        store,
586                        name,
587                        "rows_processed",
588                        Value::UnsignedInteger(rows_processed),
589                    );
590                }
591                let msg = if no_rollback {
592                    format!(
593                        "migration '{name}' applied — {rows_processed} rows (no rollback, commit: {commit_hash})"
594                    )
595                } else {
596                    format!("migration '{name}' applied (commit: {commit_hash})")
597                };
598                Ok(RuntimeQueryResult::ok_message(
599                    raw_query.to_string(),
600                    &msg,
601                    "apply_migration",
602                ))
603            }
604        }
605    }
606
607    /// Execute a (possibly multi-statement) DDL body by splitting on `;`.
608    /// Returns Ok(0) — row count not tracked for DDL.
609    fn apply_statements(&self, name: &str, body: &str) -> RedDBResult<u64> {
610        let statements: Vec<&str> = body
611            .split(';')
612            .map(|s| s.trim())
613            .filter(|s| !s.is_empty())
614            .collect();
615        for stmt in statements {
616            self.execute_query(stmt).map_err(|e| {
617                RedDBError::Query(format!("statement in migration '{name}' failed: {e}"))
618            })?;
619        }
620        Ok(0)
621    }
622
623    /// Execute a data migration body in batches of `batch_size` rows,
624    /// persisting a checkpoint (`rows_processed`) after each batch.
625    /// Appends `LIMIT {batch_size}` to the body on each iteration;
626    /// stops when the engine reports fewer rows affected than the batch size.
627    fn apply_batched(
628        &self,
629        store: &UnifiedStore,
630        name: &str,
631        body: &str,
632        batch_size: u64,
633        initial_processed: u64,
634    ) -> RedDBResult<u64> {
635        let mut total = initial_processed;
636        loop {
637            let batch_body = format!("{body} LIMIT {batch_size}");
638            let result = self.execute_query(&batch_body).map_err(|e| {
639                RedDBError::Query(format!("batch in migration '{name}' failed: {e}"))
640            })?;
641            let affected = result.affected_rows;
642            total += affected;
643            // Persist checkpoint so a crash can resume from here.
644            let _ = update_migration_field(
645                store,
646                name,
647                "rows_processed",
648                Value::UnsignedInteger(total),
649            );
650            if affected < batch_size {
651                break;
652            }
653        }
654        Ok(total)
655    }
656
657    fn load_migration_deps(&self, store: &UnifiedStore, name: &str) -> Vec<String> {
658        let Some(manager) = store.get_collection(mc::MIGRATION_DEPS) else {
659            return Vec::new();
660        };
661        manager
662            .query_all(|entity| {
663                if let EntityData::Row(ref row) = entity.data {
664                    if let Some(ref named) = row.named {
665                        return named.get("migration_id").and_then(|v| val_text(v)) == Some(name);
666                    }
667                }
668                false
669            })
670            .into_iter()
671            .filter_map(|entity| {
672                if let EntityData::Row(ref row) = entity.data {
673                    if let Some(ref named) = row.named {
674                        return named
675                            .get("depends_on_id")
676                            .and_then(|v| val_text(v))
677                            .map(|s| s.to_string());
678                    }
679                }
680                None
681            })
682            .collect()
683    }
684
685    /// ROLLBACK MIGRATION name
686    pub fn execute_rollback_migration(
687        &self,
688        raw_query: &str,
689        q: &RollbackMigrationQuery,
690    ) -> RedDBResult<RuntimeQueryResult> {
691        let store_arc = self.inner.db.store();
692        let store: &UnifiedStore = &store_arc;
693
694        let (_, fields) = find_migration(store, &q.name)
695            .ok_or_else(|| RedDBError::NotFound(format!("migration '{}' not found", q.name)))?;
696
697        if fields
698            .get("no_rollback")
699            .and_then(val_bool)
700            .unwrap_or(false)
701        {
702            return Err(RedDBError::Query(format!(
703                "migration '{}' was declared NO ROLLBACK and cannot be rolled back",
704                q.name
705            )));
706        }
707
708        let status = fields.get("status").and_then(|v| val_text(v)).unwrap_or("");
709
710        if status != "applied" {
711            return Err(RedDBError::Query(format!(
712                "migration '{}' has status '{status}' — only applied migrations can be rolled back",
713                q.name
714            )));
715        }
716
717        let commit_hash = fields
718            .get("vcs_commit_hash")
719            .and_then(|v| val_text(v))
720            .unwrap_or("")
721            .to_string();
722
723        if !commit_hash.is_empty() {
724            let author = migration_author(self);
725            let _ = self.vcs_revert(0, &commit_hash, author);
726        }
727
728        let _ = update_migration_field(store, &q.name, "status", Value::text("pending"));
729        let _ = update_migration_field(store, &q.name, "applied_at", Value::Null);
730        let _ = update_migration_field(store, &q.name, "vcs_commit_hash", Value::Null);
731
732        Ok(RuntimeQueryResult::ok_message(
733            raw_query.to_string(),
734            &format!("migration '{}' rolled back (status: pending)", q.name),
735            "rollback_migration",
736        ))
737    }
738
739    /// EXPLAIN MIGRATION name
740    pub fn execute_explain_migration(
741        &self,
742        raw_query: &str,
743        q: &ExplainMigrationQuery,
744    ) -> RedDBResult<RuntimeQueryResult> {
745        let store_arc = self.inner.db.store();
746        let store: &UnifiedStore = &store_arc;
747
748        let (_, fields) = find_migration(store, &q.name)
749            .ok_or_else(|| RedDBError::NotFound(format!("migration '{}' not found", q.name)))?;
750
751        let status = fields
752            .get("status")
753            .and_then(|v| val_text(v))
754            .unwrap_or("unknown")
755            .to_string();
756        let body = fields
757            .get("body")
758            .and_then(|v| val_text(v))
759            .unwrap_or("")
760            .to_string();
761        let kind = fields
762            .get("kind")
763            .and_then(|v| val_text(v))
764            .unwrap_or("ddl")
765            .to_string();
766
767        let columns = vec![
768            "migration".to_string(),
769            "status".to_string(),
770            "kind".to_string(),
771            "body".to_string(),
772            "estimated_rows".to_string(),
773            "lock_duration_ms".to_string(),
774        ];
775
776        let row: Vec<(String, Value)> = vec![
777            ("migration".to_string(), Value::text(q.name.as_str())),
778            ("status".to_string(), Value::text(status.as_str())),
779            ("kind".to_string(), Value::text(kind.as_str())),
780            ("body".to_string(), Value::text(body.as_str())),
781            ("estimated_rows".to_string(), Value::Null),
782            ("lock_duration_ms".to_string(), Value::UnsignedInteger(0)),
783        ];
784
785        Ok(RuntimeQueryResult::ok_records(
786            raw_query.to_string(),
787            columns,
788            vec![row],
789            "explain_migration",
790        ))
791    }
792}