1use std::collections::HashMap;
4use std::sync::Arc;
5use std::time::{SystemTime, UNIX_EPOCH};
6
7use super::*;
8use crate::application::migration_collections as mc;
9use crate::application::migration_graph;
10use crate::application::migration_inference;
11use crate::application::vcs::{Author, CreateCommitInput};
12use crate::storage::query::ast::{
13 ApplyMigrationQuery, ApplyMigrationTarget, CreateMigrationQuery, ExplainMigrationQuery,
14 RollbackMigrationQuery,
15};
16use crate::storage::unified::entity::{EntityData, EntityId, EntityKind, RowData, UnifiedEntity};
17
/// Returns the current wall-clock time as milliseconds since the Unix epoch.
///
/// Yields `0` when the system clock reports a moment before the epoch.
fn now_ms() -> i64 {
    match SystemTime::now().duration_since(UNIX_EPOCH) {
        Ok(elapsed) => elapsed.as_millis() as i64,
        Err(_) => 0,
    }
}
24
25fn val_text(v: &Value) -> Option<&str> {
26 if let Value::Text(s) = v {
27 Some(s.as_ref())
28 } else {
29 None
30 }
31}
32
33fn val_bool(v: &Value) -> Option<bool> {
34 if let Value::Boolean(b) = v {
35 Some(*b)
36 } else {
37 None
38 }
39}
40
41fn migration_author(rt: &RedDBRuntime) -> Author {
42 let store = rt.inner.db.store();
43 let name = store
44 .get_config("red.vcs.author.name")
45 .and_then(|v| {
46 if let Value::Text(s) = v {
47 Some(s.to_string())
48 } else {
49 None
50 }
51 })
52 .unwrap_or_else(|| "reddb".to_string());
53 let email = store
54 .get_config("red.vcs.author.email")
55 .and_then(|v| {
56 if let Value::Text(s) = v {
57 Some(s.to_string())
58 } else {
59 None
60 }
61 })
62 .unwrap_or_else(|| "reddb@localhost".to_string());
63 Author { name, email }
64}
65
66fn insert_meta_row(
67 store: &UnifiedStore,
68 collection: &str,
69 fields: HashMap<String, Value>,
70) -> RedDBResult<EntityId> {
71 let _ = store.get_or_create_collection(collection);
72 store
73 .insert_auto(
74 collection,
75 UnifiedEntity::new(
76 EntityId::new(0),
77 EntityKind::TableRow {
78 table: Arc::from(collection),
79 row_id: 0,
80 },
81 EntityData::Row(RowData {
82 columns: Vec::new(),
83 named: Some(fields),
84 schema: None,
85 }),
86 ),
87 )
88 .map_err(|e| RedDBError::Internal(e.to_string()))
89}
90
91fn find_migration(
93 store: &UnifiedStore,
94 name: &str,
95) -> Option<(UnifiedEntity, HashMap<String, Value>)> {
96 let manager = store.get_collection(mc::MIGRATIONS)?;
97 let results = manager.query_all(|entity| {
98 if let EntityData::Row(ref row) = entity.data {
99 if let Some(ref named) = row.named {
100 return named.get("name").and_then(|v| val_text(v)) == Some(name);
101 }
102 }
103 false
104 });
105 results.into_iter().find_map(|entity| {
106 if let EntityData::Row(ref row) = entity.data {
107 if let Some(ref named) = row.named {
108 return Some((entity.clone(), named.clone()));
109 }
110 }
111 None
112 })
113}
114
115fn update_migration_field(
117 store: &UnifiedStore,
118 name: &str,
119 key: &str,
120 value: Value,
121) -> RedDBResult<()> {
122 let manager = store
123 .get_collection(mc::MIGRATIONS)
124 .ok_or_else(|| RedDBError::Internal("red_migrations collection not found".to_string()))?;
125 let results = manager.query_all(|entity| {
126 if let EntityData::Row(ref row) = entity.data {
127 if let Some(ref named) = row.named {
128 return named.get("name").and_then(|v| val_text(v)) == Some(name);
129 }
130 }
131 false
132 });
133 for mut entity in results {
134 if let EntityData::Row(ref mut row) = entity.data {
135 if let Some(ref mut named) = row.named {
136 named.insert(key.to_string(), value.clone());
137 manager
138 .update(entity)
139 .map_err(|e| RedDBError::Internal(e.to_string()))?;
140 return Ok(());
141 }
142 }
143 }
144 Err(RedDBError::NotFound(format!(
145 "migration '{name}' not found"
146 )))
147}
148
149fn load_all_edges(store: &UnifiedStore) -> Vec<(String, String)> {
151 let Some(manager) = store.get_collection(mc::MIGRATION_DEPS) else {
152 return Vec::new();
153 };
154 manager
155 .query_all(|_| true)
156 .into_iter()
157 .filter_map(|entity| {
158 if let EntityData::Row(ref row) = entity.data {
159 if let Some(ref named) = row.named {
160 let from = named
161 .get("migration_id")
162 .and_then(|v| val_text(v))?
163 .to_string();
164 let to = named
165 .get("depends_on_id")
166 .and_then(|v| val_text(v))?
167 .to_string();
168 return Some((from, to));
169 }
170 }
171 None
172 })
173 .collect()
174}
175
176fn load_all_migration_names(store: &UnifiedStore) -> Vec<String> {
178 let Some(manager) = store.get_collection(mc::MIGRATIONS) else {
179 return Vec::new();
180 };
181 manager
182 .query_all(|_| true)
183 .into_iter()
184 .filter_map(|entity| {
185 if let EntityData::Row(ref row) = entity.data {
186 if let Some(ref named) = row.named {
187 return named
188 .get("name")
189 .and_then(|v| val_text(v))
190 .map(|s| s.to_string());
191 }
192 }
193 None
194 })
195 .collect()
196}
197
198impl RedDBRuntime {
199 pub fn execute_create_migration(
201 &self,
202 raw_query: &str,
203 q: &CreateMigrationQuery,
204 ) -> RedDBResult<RuntimeQueryResult> {
205 let store_arc = self.inner.db.store();
206 let store: &UnifiedStore = &store_arc;
207
208 if find_migration(store, &q.name).is_some() {
209 return Err(RedDBError::Query(format!(
210 "migration '{}' already exists",
211 q.name
212 )));
213 }
214
215 for dep in &q.depends_on {
216 if find_migration(store, dep).is_none() {
217 return Err(RedDBError::Query(format!(
218 "migration '{dep}' referenced in DEPENDS ON does not exist"
219 )));
220 }
221 }
222
223 let existing_edges = load_all_edges(store);
225 for dep in &q.depends_on {
226 if migration_graph::would_create_cycle(&existing_edges, q.name.as_str(), dep) {
227 return Err(RedDBError::Query(format!(
228 "adding DEPENDS ON '{dep}' to migration '{}' would create a dependency cycle",
229 q.name
230 )));
231 }
232 }
233
234 let mut fields: HashMap<String, Value> = HashMap::new();
235 fields.insert("name".to_string(), Value::text(q.name.as_str()));
236 fields.insert("status".to_string(), Value::text("pending"));
237 fields.insert(
238 "kind".to_string(),
239 Value::text(if q.no_rollback { "data" } else { "ddl" }),
240 );
241 fields.insert("body".to_string(), Value::text(q.body.as_str()));
242 fields.insert(
243 "author".to_string(),
244 Value::text(migration_author(self).name.as_str()),
245 );
246 fields.insert("created_at".to_string(), Value::TimestampMs(now_ms()));
247 fields.insert("applied_at".to_string(), Value::Null);
248 fields.insert("rows_total".to_string(), Value::Null);
249 fields.insert("rows_processed".to_string(), Value::UnsignedInteger(0));
250 fields.insert("vcs_commit_hash".to_string(), Value::Null);
251 fields.insert("no_rollback".to_string(), Value::Boolean(q.no_rollback));
252 fields.insert(
253 "batch_size".to_string(),
254 q.batch_size
255 .map(Value::UnsignedInteger)
256 .unwrap_or(Value::Null),
257 );
258 insert_meta_row(store, mc::MIGRATIONS, fields)?;
259
260 for dep in &q.depends_on {
261 let mut dep_fields: HashMap<String, Value> = HashMap::new();
262 dep_fields.insert("migration_id".to_string(), Value::text(q.name.as_str()));
263 dep_fields.insert("depends_on_id".to_string(), Value::text(dep.as_str()));
264 dep_fields.insert("inferred".to_string(), Value::Boolean(false));
265 insert_meta_row(store, mc::MIGRATION_DEPS, dep_fields)?;
266 }
267
268 let existing_migrations: Vec<(String, String)> = store
270 .get_collection(mc::MIGRATIONS)
271 .map(|manager| {
272 manager
273 .query_all(|_| true)
274 .into_iter()
275 .filter_map(|entity| {
276 if let EntityData::Row(ref row) = entity.data {
277 if let Some(ref named) = row.named {
278 let name = named.get("name").and_then(|v| val_text(v))?.to_string();
279 let body = named.get("body").and_then(|v| val_text(v))?.to_string();
280 return Some((name, body));
281 }
282 }
283 None
284 })
285 .collect()
286 })
287 .unwrap_or_default();
288 let explicit_deps: std::collections::HashSet<String> =
289 q.depends_on.iter().cloned().collect();
290 let inferred_edges = migration_inference::infer_dependencies(
291 q.name.as_str(),
292 q.body.as_str(),
293 &existing_migrations,
294 );
295 for (_, dep) in inferred_edges {
296 if explicit_deps.contains(&dep) {
297 continue; }
299 let current_edges = load_all_edges(store);
301 if !migration_graph::would_create_cycle(¤t_edges, q.name.as_str(), &dep) {
302 let mut dep_fields: HashMap<String, Value> = HashMap::new();
303 dep_fields.insert("migration_id".to_string(), Value::text(q.name.as_str()));
304 dep_fields.insert("depends_on_id".to_string(), Value::text(dep.as_str()));
305 dep_fields.insert("inferred".to_string(), Value::Boolean(true));
306 let _ = insert_meta_row(store, mc::MIGRATION_DEPS, dep_fields);
307 }
308 }
309
310 Ok(RuntimeQueryResult::ok_message(
311 raw_query.to_string(),
312 &format!("migration '{}' registered (pending)", q.name),
313 "create_migration",
314 ))
315 }
316
317 pub fn execute_apply_migration(
319 &self,
320 raw_query: &str,
321 q: &ApplyMigrationQuery,
322 ) -> RedDBResult<RuntimeQueryResult> {
323 if let Some(tenant) = &q.for_tenant {
325 if tenant == "*" {
326 return self.apply_migration_all_tenants(raw_query, q);
327 }
328 crate::runtime::impl_core::set_current_tenant(tenant.clone());
330 }
331
332 let result = match &q.target {
333 ApplyMigrationTarget::Named(name) => self.apply_single_migration(raw_query, name),
334 ApplyMigrationTarget::All => self.apply_all_pending(raw_query),
335 };
336
337 if q.for_tenant.is_some() {
339 crate::runtime::impl_core::clear_current_tenant();
340 }
341
342 result
343 }
344
345 fn apply_all_pending(&self, raw_query: &str) -> RedDBResult<RuntimeQueryResult> {
346 let store_arc = self.inner.db.store();
347 let store: &UnifiedStore = &store_arc;
348 let pending = self.collect_pending_migrations(store);
349 if pending.is_empty() {
350 return Ok(RuntimeQueryResult::ok_message(
351 raw_query.to_string(),
352 "no pending migrations",
353 "apply_migration",
354 ));
355 }
356 let mut applied = 0u32;
357 let mut messages: Vec<String> = Vec::new();
358 for name in pending {
359 match self.apply_single_migration(raw_query, &name) {
360 Ok(_) => {
361 applied += 1;
362 messages.push(format!("applied: {name}"));
363 }
364 Err(e) => {
365 messages.push(format!("failed: {name} — {e}"));
366 break;
367 }
368 }
369 }
370 let summary = messages.join("; ");
371 Ok(RuntimeQueryResult::ok_message(
372 raw_query.to_string(),
373 &format!("applied {applied} migration(s): {summary}"),
374 "apply_migration",
375 ))
376 }
377
378 fn apply_migration_all_tenants(
380 &self,
381 raw_query: &str,
382 q: &ApplyMigrationQuery,
383 ) -> RedDBResult<RuntimeQueryResult> {
384 let tenant_ids = self.list_known_tenants();
385 if tenant_ids.is_empty() {
386 return Ok(RuntimeQueryResult::ok_message(
387 raw_query.to_string(),
388 "no tenants found — nothing applied",
389 "apply_migration",
390 ));
391 }
392 let mut results: Vec<String> = Vec::new();
393 for tenant in &tenant_ids {
394 crate::runtime::impl_core::set_current_tenant(tenant.clone());
395 let inner_q = ApplyMigrationQuery {
396 target: q.target.clone(),
397 for_tenant: None,
398 };
399 match self.execute_apply_migration(raw_query, &inner_q) {
400 Ok(r) => results.push(format!(
401 "tenant={tenant}: {}",
402 r.result
403 .records
404 .first()
405 .and_then(|rec| rec.get("message"))
406 .and_then(|v| val_text(v))
407 .unwrap_or("ok")
408 )),
409 Err(e) => results.push(format!("tenant={tenant}: error — {e}")),
410 }
411 crate::runtime::impl_core::clear_current_tenant();
412 }
413 Ok(RuntimeQueryResult::ok_message(
414 raw_query.to_string(),
415 &results.join("; "),
416 "apply_migration",
417 ))
418 }
419
420 fn list_known_tenants(&self) -> Vec<String> {
422 let auth_store = match self.inner.auth_store.read().clone() {
423 Some(s) => s,
424 None => return Vec::new(),
425 };
426 let users = auth_store.list_users_scoped(None);
427 let mut tenants: std::collections::HashSet<String> = std::collections::HashSet::new();
428 for u in users {
429 if let Some(ref t) = u.tenant_id {
430 tenants.insert(t.clone());
431 }
432 }
433 let mut out: Vec<String> = tenants.into_iter().collect();
434 out.sort();
435 out
436 }
437
438 fn collect_pending_migrations(&self, store: &UnifiedStore) -> Vec<String> {
439 let Some(manager) = store.get_collection(mc::MIGRATIONS) else {
441 return Vec::new();
442 };
443 let pending: Vec<String> = manager
444 .query_all(|entity| {
445 if let EntityData::Row(ref row) = entity.data {
446 if let Some(ref named) = row.named {
447 return named.get("status").and_then(|v| val_text(v)) == Some("pending");
448 }
449 }
450 false
451 })
452 .into_iter()
453 .filter_map(|entity| {
454 if let EntityData::Row(ref row) = entity.data {
455 if let Some(ref named) = row.named {
456 return named
457 .get("name")
458 .and_then(|v| val_text(v))
459 .map(|s| s.to_string());
460 }
461 }
462 None
463 })
464 .collect();
465
466 let all_edges = load_all_edges(store);
469 let pending_set: std::collections::HashSet<&str> =
471 pending.iter().map(|s| s.as_str()).collect();
472 let relevant_edges: Vec<(String, String)> = all_edges
473 .into_iter()
474 .filter(|(m, d)| pending_set.contains(m.as_str()) && pending_set.contains(d.as_str()))
475 .collect();
476
477 match migration_graph::topological_sort(&pending, &relevant_edges) {
478 Ok(sorted) => sorted,
479 Err(_) => pending, }
481 }
482
483 fn apply_single_migration(
484 &self,
485 raw_query: &str,
486 name: &str,
487 ) -> RedDBResult<RuntimeQueryResult> {
488 let store_arc = self.inner.db.store();
489 let store: &UnifiedStore = &store_arc;
490
491 let (_, fields) = find_migration(store, name)
492 .ok_or_else(|| RedDBError::NotFound(format!("migration '{name}' not found")))?;
493
494 let status = fields.get("status").and_then(|v| val_text(v)).unwrap_or("");
495
496 if status == "applied" {
497 return Ok(RuntimeQueryResult::ok_message(
498 raw_query.to_string(),
499 &format!("migration '{name}' is already applied"),
500 "apply_migration",
501 ));
502 }
503
504 let deps = self.load_migration_deps(store, name);
506 for dep in &deps {
507 match find_migration(store, dep) {
508 Some((_, dep_fields)) => {
509 let dep_status = dep_fields
510 .get("status")
511 .and_then(|v| val_text(v))
512 .unwrap_or("");
513 if dep_status != "applied" {
514 return Err(RedDBError::Query(format!(
515 "migration '{name}' depends on '{dep}' which is not yet applied"
516 )));
517 }
518 }
519 None => {
520 return Err(RedDBError::Query(format!(
521 "migration '{name}' depends on '{dep}' which does not exist"
522 )));
523 }
524 }
525 }
526
527 let body = fields
528 .get("body")
529 .and_then(|v| val_text(v))
530 .unwrap_or("")
531 .to_string();
532 let batch_size = match fields.get("batch_size") {
533 Some(Value::UnsignedInteger(n)) => Some(*n),
534 _ => None,
535 };
536 let no_rollback = fields
537 .get("no_rollback")
538 .and_then(val_bool)
539 .unwrap_or(false);
540 let rows_processed_start = match fields.get("rows_processed") {
541 Some(Value::UnsignedInteger(n)) => *n,
542 _ => 0,
543 };
544
545 let apply_result = if let Some(batch) = batch_size {
546 self.apply_batched(store, name, &body, batch, rows_processed_start)
547 } else {
548 self.apply_statements(name, &body)
549 };
550
551 match apply_result {
552 Err(e) => {
553 let err_msg = e.to_string();
554 let _ = update_migration_field(store, name, "status", Value::text("failed"));
555 let _ = update_migration_field(store, name, "error", Value::text(err_msg.as_str()));
556 Err(RedDBError::Query(format!(
557 "migration '{name}' failed: {err_msg}"
558 )))
559 }
560 Ok(rows_processed) => {
561 let author = migration_author(self);
562 let commit_hash = self
563 .vcs_commit(CreateCommitInput {
564 connection_id: 0,
565 message: format!("migration: {name}"),
566 author,
567 committer: None,
568 amend: false,
569 allow_empty: true,
570 })
571 .map(|c| c.hash)
572 .unwrap_or_default();
573
574 let _ = update_migration_field(store, name, "status", Value::text("applied"));
575 let _ =
576 update_migration_field(store, name, "applied_at", Value::TimestampMs(now_ms()));
577 let _ = update_migration_field(
578 store,
579 name,
580 "vcs_commit_hash",
581 Value::text(commit_hash.as_str()),
582 );
583 if batch_size.is_some() {
584 let _ = update_migration_field(
585 store,
586 name,
587 "rows_processed",
588 Value::UnsignedInteger(rows_processed),
589 );
590 }
591 let msg = if no_rollback {
592 format!(
593 "migration '{name}' applied — {rows_processed} rows (no rollback, commit: {commit_hash})"
594 )
595 } else {
596 format!("migration '{name}' applied (commit: {commit_hash})")
597 };
598 Ok(RuntimeQueryResult::ok_message(
599 raw_query.to_string(),
600 &msg,
601 "apply_migration",
602 ))
603 }
604 }
605 }
606
607 fn apply_statements(&self, name: &str, body: &str) -> RedDBResult<u64> {
610 let statements: Vec<&str> = body
611 .split(';')
612 .map(|s| s.trim())
613 .filter(|s| !s.is_empty())
614 .collect();
615 for stmt in statements {
616 self.execute_query(stmt).map_err(|e| {
617 RedDBError::Query(format!("statement in migration '{name}' failed: {e}"))
618 })?;
619 }
620 Ok(0)
621 }
622
623 fn apply_batched(
628 &self,
629 store: &UnifiedStore,
630 name: &str,
631 body: &str,
632 batch_size: u64,
633 initial_processed: u64,
634 ) -> RedDBResult<u64> {
635 let mut total = initial_processed;
636 loop {
637 let batch_body = format!("{body} LIMIT {batch_size}");
638 let result = self.execute_query(&batch_body).map_err(|e| {
639 RedDBError::Query(format!("batch in migration '{name}' failed: {e}"))
640 })?;
641 let affected = result.affected_rows;
642 total += affected;
643 let _ = update_migration_field(
645 store,
646 name,
647 "rows_processed",
648 Value::UnsignedInteger(total),
649 );
650 if affected < batch_size {
651 break;
652 }
653 }
654 Ok(total)
655 }
656
657 fn load_migration_deps(&self, store: &UnifiedStore, name: &str) -> Vec<String> {
658 let Some(manager) = store.get_collection(mc::MIGRATION_DEPS) else {
659 return Vec::new();
660 };
661 manager
662 .query_all(|entity| {
663 if let EntityData::Row(ref row) = entity.data {
664 if let Some(ref named) = row.named {
665 return named.get("migration_id").and_then(|v| val_text(v)) == Some(name);
666 }
667 }
668 false
669 })
670 .into_iter()
671 .filter_map(|entity| {
672 if let EntityData::Row(ref row) = entity.data {
673 if let Some(ref named) = row.named {
674 return named
675 .get("depends_on_id")
676 .and_then(|v| val_text(v))
677 .map(|s| s.to_string());
678 }
679 }
680 None
681 })
682 .collect()
683 }
684
685 pub fn execute_rollback_migration(
687 &self,
688 raw_query: &str,
689 q: &RollbackMigrationQuery,
690 ) -> RedDBResult<RuntimeQueryResult> {
691 let store_arc = self.inner.db.store();
692 let store: &UnifiedStore = &store_arc;
693
694 let (_, fields) = find_migration(store, &q.name)
695 .ok_or_else(|| RedDBError::NotFound(format!("migration '{}' not found", q.name)))?;
696
697 if fields
698 .get("no_rollback")
699 .and_then(val_bool)
700 .unwrap_or(false)
701 {
702 return Err(RedDBError::Query(format!(
703 "migration '{}' was declared NO ROLLBACK and cannot be rolled back",
704 q.name
705 )));
706 }
707
708 let status = fields.get("status").and_then(|v| val_text(v)).unwrap_or("");
709
710 if status != "applied" {
711 return Err(RedDBError::Query(format!(
712 "migration '{}' has status '{status}' — only applied migrations can be rolled back",
713 q.name
714 )));
715 }
716
717 let commit_hash = fields
718 .get("vcs_commit_hash")
719 .and_then(|v| val_text(v))
720 .unwrap_or("")
721 .to_string();
722
723 if !commit_hash.is_empty() {
724 let author = migration_author(self);
725 let _ = self.vcs_revert(0, &commit_hash, author);
726 }
727
728 let _ = update_migration_field(store, &q.name, "status", Value::text("pending"));
729 let _ = update_migration_field(store, &q.name, "applied_at", Value::Null);
730 let _ = update_migration_field(store, &q.name, "vcs_commit_hash", Value::Null);
731
732 Ok(RuntimeQueryResult::ok_message(
733 raw_query.to_string(),
734 &format!("migration '{}' rolled back (status: pending)", q.name),
735 "rollback_migration",
736 ))
737 }
738
739 pub fn execute_explain_migration(
741 &self,
742 raw_query: &str,
743 q: &ExplainMigrationQuery,
744 ) -> RedDBResult<RuntimeQueryResult> {
745 let store_arc = self.inner.db.store();
746 let store: &UnifiedStore = &store_arc;
747
748 let (_, fields) = find_migration(store, &q.name)
749 .ok_or_else(|| RedDBError::NotFound(format!("migration '{}' not found", q.name)))?;
750
751 let status = fields
752 .get("status")
753 .and_then(|v| val_text(v))
754 .unwrap_or("unknown")
755 .to_string();
756 let body = fields
757 .get("body")
758 .and_then(|v| val_text(v))
759 .unwrap_or("")
760 .to_string();
761 let kind = fields
762 .get("kind")
763 .and_then(|v| val_text(v))
764 .unwrap_or("ddl")
765 .to_string();
766
767 let columns = vec![
768 "migration".to_string(),
769 "status".to_string(),
770 "kind".to_string(),
771 "body".to_string(),
772 "estimated_rows".to_string(),
773 "lock_duration_ms".to_string(),
774 ];
775
776 let row: Vec<(String, Value)> = vec![
777 ("migration".to_string(), Value::text(q.name.as_str())),
778 ("status".to_string(), Value::text(status.as_str())),
779 ("kind".to_string(), Value::text(kind.as_str())),
780 ("body".to_string(), Value::text(body.as_str())),
781 ("estimated_rows".to_string(), Value::Null),
782 ("lock_duration_ms".to_string(), Value::UnsignedInteger(0)),
783 ];
784
785 Ok(RuntimeQueryResult::ok_records(
786 raw_query.to_string(),
787 columns,
788 vec![row],
789 "explain_migration",
790 ))
791 }
792}