1use crate::db::{pool::Pool, Dialect};
4use crate::error::AppError;
5use chrono::{DateTime, Utc};
6use std::collections::HashMap;
7
/// Returns the name of the schema that holds the architect system tables.
///
/// Reads the `ARCHITECT_SCHEMA` environment variable and falls back to
/// `"architect"` when the variable is unset, is not valid Unicode, or is
/// blank. A blank value is treated as unset because it would otherwise
/// produce qualified names such as `.table`.
pub fn architect_schema() -> String {
    std::env::var("ARCHITECT_SCHEMA")
        .ok()
        .filter(|name| !name.trim().is_empty())
        .unwrap_or_else(|| "architect".into())
}
12
13pub fn qualified_sys_table(table: &str) -> String {
15 format!("{}.{}", architect_schema(), table)
16}
17
/// Unqualified names of the per-package config tables.
///
/// `ensure_sys_tables` creates each of these together with a matching
/// `<name>_history` table, and `delete_package_and_config` removes a
/// package's rows from both.
const CONFIG_TABLES: &[&str] = &[
    "_sys_schemas",
    "_sys_enums",
    "_sys_tables",
    "_sys_columns",
    "_sys_indexes",
    "_sys_relationships",
    "_sys_api_entities",
    "_sys_kv_stores",
];
29
/// Package id used as the column default when `ensure_sys_tables` adds a
/// `package_id` column to an already existing config or history table.
pub const DEFAULT_PACKAGE_ID: &str = "_default";
32
/// Creates the architect schema (when the dialect supports schemas) and every
/// system table used by this module, then applies in-place column and
/// constraint upgrades.
///
/// The `CREATE ... IF NOT EXISTS` statements propagate their errors with `?`.
/// The follow-up `ALTER TABLE` statements are executed with their results
/// discarded (`let _ = ...`), so a failing upgrade statement does not abort
/// the function.
///
/// # Errors
/// Returns an error when creating the schema or any table fails, or when
/// `ensure_migration_tables` fails.
pub async fn ensure_sys_tables(pool: &Pool, dialect: &dyn Dialect) -> Result<(), AppError> {
    let schema = architect_schema();
    if dialect.supports_schemas() {
        sqlx::query(&format!("CREATE SCHEMA IF NOT EXISTS {}", schema))
            .execute(pool)
            .await?;
    }

    // One live table plus one `<table>_history` table per config kind.
    for table in CONFIG_TABLES {
        let q_table = qualified_sys_table(table);
        let ddl = format!(
            "CREATE TABLE IF NOT EXISTS {} (\
             id TEXT NOT NULL, \
             package_id TEXT NOT NULL, \
             payload {} NOT NULL, \
             updated_at {} NOT NULL DEFAULT {}, \
             version BIGINT NOT NULL DEFAULT 1, \
             PRIMARY KEY (id, package_id)\
             )",
            q_table,
            dialect.sys_json_type(),
            dialect.sys_timestamp_type(),
            dialect.now_fn(),
        );
        sqlx::query(&ddl).execute(pool).await?;
        // Best-effort column additions; errors are intentionally discarded.
        // NOTE(review): presumably these upgrade tables created before the
        // `version` / `package_id` columns existed — confirm.
        let alter_version = format!(
            "ALTER TABLE {} ADD COLUMN IF NOT EXISTS version BIGINT NOT NULL DEFAULT 1",
            q_table
        );
        let _ = sqlx::query(&alter_version).execute(pool).await;
        let alter_package = format!(
            "ALTER TABLE {} ADD COLUMN IF NOT EXISTS package_id TEXT NOT NULL DEFAULT '{}'",
            q_table, DEFAULT_PACKAGE_ID
        );
        let _ = sqlx::query(&alter_package).execute(pool).await;

        // History table: one row per (id, package_id, version).
        let history_table = qualified_sys_table(&format!("{}_history", table));
        let history_ddl = format!(
            "CREATE TABLE IF NOT EXISTS {} (\
             id TEXT NOT NULL, \
             package_id TEXT NOT NULL, \
             payload {} NOT NULL, \
             version BIGINT NOT NULL, \
             created_at {} NOT NULL DEFAULT {}, \
             PRIMARY KEY (id, package_id, version)\
             )",
            history_table,
            dialect.sys_json_type(),
            dialect.sys_timestamp_type(),
            dialect.now_fn(),
        );
        sqlx::query(&history_ddl).execute(pool).await?;
        let alter_history_package = format!(
            "ALTER TABLE {} ADD COLUMN IF NOT EXISTS package_id TEXT NOT NULL DEFAULT '{}'",
            history_table, DEFAULT_PACKAGE_ID
        );
        let _ = sqlx::query(&alter_history_package).execute(pool).await;
    }

    // Package registry table.
    let q_packages = qualified_sys_table("_sys_packages");
    let packages_ddl = format!(
        "CREATE TABLE IF NOT EXISTS {} (\
         id TEXT PRIMARY KEY, \
         payload {} NOT NULL, \
         updated_at {} NOT NULL DEFAULT {}, \
         version BIGINT NOT NULL DEFAULT 1, \
         semantic_version TEXT\
         )",
        q_packages,
        dialect.sys_json_type(),
        dialect.sys_timestamp_type(),
        dialect.now_fn(),
    );
    sqlx::query(&packages_ddl).execute(pool).await?;
    let alter_pkg_semver = format!(
        "ALTER TABLE {} ADD COLUMN IF NOT EXISTS semantic_version TEXT",
        q_packages
    );
    let _ = sqlx::query(&alter_pkg_semver).execute(pool).await;
    // Package history table, created with a (id, version) primary key.
    let q_packages_history = qualified_sys_table("_sys_packages_history");
    let packages_history_ddl = format!(
        "CREATE TABLE IF NOT EXISTS {} (\
         id TEXT NOT NULL, \
         payload {} NOT NULL, \
         version BIGINT NOT NULL, \
         created_at {} NOT NULL DEFAULT {}, \
         semantic_version TEXT, \
         PRIMARY KEY (id, version)\
         )",
        q_packages_history,
        dialect.sys_json_type(),
        dialect.sys_timestamp_type(),
        dialect.now_fn(),
    );
    sqlx::query(&packages_history_ddl).execute(pool).await?;
    let alter_pkg_hist_semver = format!(
        "ALTER TABLE {} ADD COLUMN IF NOT EXISTS semantic_version TEXT",
        q_packages_history
    );
    let _ = sqlx::query(&alter_pkg_hist_semver).execute(pool).await;
    // Best-effort: add a `history_id` column and drop the constraint named
    // `_sys_packages_history_pkey`.
    let add_history_id = format!(
        "ALTER TABLE {} ADD COLUMN IF NOT EXISTS history_id {}",
        q_packages_history,
        dialect.sys_bigserial_type()
    );
    let _ = sqlx::query(&add_history_id).execute(pool).await;
    let drop_old_pk = format!(
        "ALTER TABLE {} DROP CONSTRAINT IF EXISTS _sys_packages_history_pkey",
        q_packages_history
    );
    let _ = sqlx::query(&drop_old_pk).execute(pool).await;
    // Only for the dialect named "postgres": make `history_id` the primary
    // key unless a constraint with that name is already recorded in
    // `pg_constraint`.
    if dialect.name() == "postgres" {
        let add_new_pk_cond = format!(
            "DO $$ BEGIN IF NOT EXISTS (SELECT 1 FROM pg_constraint WHERE conname = '_sys_packages_history_history_id_pkey') THEN \
             ALTER TABLE {} ADD CONSTRAINT _sys_packages_history_history_id_pkey PRIMARY KEY (history_id); END IF; END $$",
            q_packages_history
        );
        let _ = sqlx::query(&add_new_pk_cond).execute(pool).await;
    }

    // Tenant registry table.
    let q_tenants = qualified_sys_table("_sys_tenants");
    let tenants_ddl = format!(
        "CREATE TABLE IF NOT EXISTS {} (\
         id TEXT PRIMARY KEY, \
         strategy TEXT NOT NULL, \
         database_url TEXT, \
         updated_at {} NOT NULL DEFAULT {}, \
         comment TEXT\
         )",
        q_tenants,
        dialect.sys_timestamp_type(),
        dialect.now_fn(),
    );
    sqlx::query(&tenants_ddl).execute(pool).await?;
    // Best-effort removal of a `schema_name` column.
    // NOTE(review): presumably a legacy column — confirm.
    let drop_schema_name = format!(
        "ALTER TABLE {} DROP COLUMN IF EXISTS schema_name",
        q_tenants
    );
    let _ = sqlx::query(&drop_schema_name).execute(pool).await;

    // Key/value data table keyed by (tenant_id, package_id, namespace, key).
    let q_kv_data = qualified_sys_table("_sys_kv_data");
    let kv_data_ddl = format!(
        "CREATE TABLE IF NOT EXISTS {} (\
         tenant_id TEXT NOT NULL, \
         package_id TEXT NOT NULL, \
         namespace TEXT NOT NULL, \
         key TEXT NOT NULL, \
         value {} NOT NULL, \
         updated_at {} NOT NULL DEFAULT {}, \
         PRIMARY KEY (tenant_id, package_id, namespace, key)\
         )",
        q_kv_data,
        dialect.sys_json_type(),
        dialect.sys_timestamp_type(),
        dialect.now_fn(),
    );
    sqlx::query(&kv_data_ddl).execute(pool).await?;
    // Best-effort: add `tenant_id` (default '_shared'), then drop and re-add
    // the primary key so that it covers the four key columns.
    let alter_kv_tenant = format!(
        "ALTER TABLE {} ADD COLUMN IF NOT EXISTS tenant_id TEXT NOT NULL DEFAULT '_shared'",
        q_kv_data
    );
    let _ = sqlx::query(&alter_kv_tenant).execute(pool).await;
    let drop_pk = format!(
        "ALTER TABLE {} DROP CONSTRAINT IF EXISTS _sys_kv_data_pkey",
        q_kv_data
    );
    let _ = sqlx::query(&drop_pk).execute(pool).await;
    let add_pk = format!(
        "ALTER TABLE {} ADD PRIMARY KEY (tenant_id, package_id, namespace, key)",
        q_kv_data
    );
    let _ = sqlx::query(&add_pk).execute(pool).await;
    // Only for the dialect named "postgres": convert `value` to JSONB.
    if dialect.name() == "postgres" {
        let alter_value_json = format!(
            "ALTER TABLE {} ALTER COLUMN value TYPE JSONB USING value::jsonb",
            q_kv_data
        );
        let _ = sqlx::query(&alter_value_json).execute(pool).await;
    }

    ensure_migration_tables(pool, dialect).await?;

    Ok(())
}
222
223async fn ensure_migration_tables(pool: &Pool, dialect: &dyn Dialect) -> Result<(), AppError> {
225 let q_plans = qualified_sys_table("_sys_migration_plans");
226 let expires_at_col = match dialect.default_now_plus_hours(24) {
227 Some(expr) => format!(
228 "expires_at {} NOT NULL DEFAULT {}",
229 dialect.sys_timestamp_type(),
230 expr
231 ),
232 None => format!("expires_at {}", dialect.sys_timestamp_type()),
233 };
234 sqlx::query(&format!(
235 "CREATE TABLE IF NOT EXISTS {} (\
236 id TEXT PRIMARY KEY, \
237 package_id TEXT NOT NULL, \
238 tenant_id TEXT NOT NULL, \
239 from_version TEXT, \
240 to_version TEXT NOT NULL, \
241 plan_json {} NOT NULL, \
242 zip_bytes {} NOT NULL, \
243 status TEXT NOT NULL DEFAULT 'pending', \
244 created_at {} NOT NULL DEFAULT {}, \
245 {}, \
246 applied_at {}\
247 )",
248 q_plans,
249 dialect.sys_json_type(),
250 dialect.sys_bytes_type(),
251 dialect.sys_timestamp_type(),
252 dialect.now_fn(),
253 expires_at_col,
254 dialect.sys_timestamp_type(),
255 ))
256 .execute(pool)
257 .await?;
258
259 let q_audit = qualified_sys_table("_sys_migration_audit");
260 sqlx::query(&format!(
261 "CREATE TABLE IF NOT EXISTS {} (\
262 id {} PRIMARY KEY, \
263 migration_plan_id TEXT NOT NULL, \
264 package_id TEXT NOT NULL, \
265 tenant_id TEXT NOT NULL, \
266 from_version TEXT, \
267 to_version TEXT NOT NULL, \
268 step_number INT NOT NULL, \
269 operation TEXT NOT NULL, \
270 schema_name TEXT NOT NULL, \
271 table_name TEXT, \
272 object_name TEXT NOT NULL, \
273 object_type TEXT NOT NULL, \
274 description TEXT NOT NULL, \
275 ddl TEXT, \
276 safety TEXT NOT NULL, \
277 risk TEXT NOT NULL, \
278 status TEXT NOT NULL, \
279 error_message TEXT, \
280 executed_at {} NOT NULL DEFAULT {}\
281 )",
282 q_audit,
283 dialect.sys_bigserial_type(),
284 dialect.sys_timestamp_type(),
285 dialect.now_fn(),
286 ))
287 .execute(pool)
288 .await?;
289
290 Ok(())
291}
292
/// One row of the `_sys_migration_plans` table, as returned by
/// `get_migration_plan`. Each field mirrors the column of the same name.
pub struct MigrationPlanRow {
    /// Primary key of the plan.
    pub id: String,
    /// Package the plan belongs to.
    pub package_id: String,
    /// Tenant the plan belongs to.
    pub tenant_id: String,
    /// Source version; the column is nullable.
    pub from_version: Option<String>,
    /// Target version.
    pub to_version: String,
    /// Plan document stored in the `plan_json` column.
    pub plan_json: serde_json::Value,
    /// Raw bytes stored in the `zip_bytes` column.
    pub zip_bytes: Vec<u8>,
    /// Status text; this module writes `'pending'` and `'applied'`.
    pub status: String,
    /// Creation timestamp.
    pub created_at: DateTime<Utc>,
    /// Expiry timestamp.
    pub expires_at: DateTime<Utc>,
    /// Set when the plan has been marked as applied; otherwise `None`.
    pub applied_at: Option<DateTime<Utc>>,
}
307
308#[allow(clippy::too_many_arguments)]
310pub async fn save_migration_plan(
311 pool: &Pool,
312 id: &str,
313 package_id: &str,
314 tenant_id: &str,
315 from_version: Option<&str>,
316 to_version: &str,
317 plan_json: &serde_json::Value,
318 zip_bytes: &[u8],
319) -> Result<(), AppError> {
320 let q = qualified_sys_table("_sys_migration_plans");
321 sqlx::query(&format!(
322 "INSERT INTO {} (id, package_id, tenant_id, from_version, to_version, plan_json, zip_bytes, status, created_at, expires_at) \
323 VALUES ($1, $2, $3, $4, $5, $6, $7, 'pending', NOW(), NOW() + INTERVAL '24 hours')",
324 q
325 ))
326 .bind(id)
327 .bind(package_id)
328 .bind(tenant_id)
329 .bind(from_version)
330 .bind(to_version)
331 .bind(plan_json)
332 .bind(zip_bytes)
333 .execute(pool)
334 .await?;
335 Ok(())
336}
337
338pub async fn get_migration_plan(
340 pool: &Pool,
341 id: &str,
342) -> Result<Option<MigrationPlanRow>, AppError> {
343 let q = qualified_sys_table("_sys_migration_plans");
344 #[allow(clippy::type_complexity)]
345 let row: Option<(String, String, String, Option<String>, String, serde_json::Value, Vec<u8>, String, DateTime<Utc>, DateTime<Utc>, Option<DateTime<Utc>>)> =
346 sqlx::query_as(&format!(
347 "SELECT id, package_id, tenant_id, from_version, to_version, plan_json, zip_bytes, status, created_at, expires_at, applied_at FROM {} WHERE id = $1",
348 q
349 ))
350 .bind(id)
351 .fetch_optional(pool)
352 .await
353 .map_err(AppError::Db)?;
354 Ok(row.map(
355 |(
356 id,
357 package_id,
358 tenant_id,
359 from_version,
360 to_version,
361 plan_json,
362 zip_bytes,
363 status,
364 created_at,
365 expires_at,
366 applied_at,
367 )| {
368 MigrationPlanRow {
369 id,
370 package_id,
371 tenant_id,
372 from_version,
373 to_version,
374 plan_json,
375 zip_bytes,
376 status,
377 created_at,
378 expires_at,
379 applied_at,
380 }
381 },
382 ))
383}
384
385pub async fn mark_migration_plan_applied(pool: &Pool, id: &str) -> Result<bool, AppError> {
387 let q = qualified_sys_table("_sys_migration_plans");
388 let result = sqlx::query(&format!(
389 "UPDATE {} SET status = 'applied', applied_at = NOW() WHERE id = $1 AND status = 'pending'",
390 q
391 ))
392 .bind(id)
393 .execute(pool)
394 .await?;
395 Ok(result.rows_affected() > 0)
396}
397
398#[allow(clippy::too_many_arguments)]
400pub async fn insert_migration_audit(
401 pool: &Pool,
402 migration_plan_id: &str,
403 package_id: &str,
404 tenant_id: &str,
405 from_version: Option<&str>,
406 to_version: &str,
407 step_number: i32,
408 operation: &str,
409 schema_name: &str,
410 table_name: Option<&str>,
411 object_name: &str,
412 object_type: &str,
413 description: &str,
414 ddl: Option<&str>,
415 safety: &str,
416 risk: &str,
417 status: &str,
418 error_message: Option<&str>,
419) -> Result<(), AppError> {
420 let q = qualified_sys_table("_sys_migration_audit");
421 sqlx::query(&format!(
422 "INSERT INTO {} (migration_plan_id, package_id, tenant_id, from_version, to_version, step_number, operation, schema_name, table_name, object_name, object_type, description, ddl, safety, risk, status, error_message, executed_at) \
423 VALUES ($1,$2,$3,$4,$5,$6,$7,$8,$9,$10,$11,$12,$13,$14,$15,$16,$17,NOW())",
424 q
425 ))
426 .bind(migration_plan_id)
427 .bind(package_id)
428 .bind(tenant_id)
429 .bind(from_version)
430 .bind(to_version)
431 .bind(step_number)
432 .bind(operation)
433 .bind(schema_name)
434 .bind(table_name)
435 .bind(object_name)
436 .bind(object_type)
437 .bind(description)
438 .bind(ddl)
439 .bind(safety)
440 .bind(risk)
441 .bind(status)
442 .bind(error_message)
443 .execute(pool)
444 .await?;
445 Ok(())
446}
447
448fn config_record_id(table: &str, rec: &serde_json::Value) -> Result<String, AppError> {
450 let id = rec.get("id").and_then(|v| v.as_str());
451 let entity_id = rec.get("entity_id").and_then(|v| v.as_str());
452 match (table, id, entity_id) {
453 ("_sys_api_entities", None, Some(eid)) => Ok(eid.to_string()),
454 (_, Some(id), _) => Ok(id.to_string()),
455 _ => Err(AppError::BadRequest(
456 "each config record must have an 'id' field (or 'entity_id' for api_entities)".into(),
457 )),
458 }
459}
460
461fn config_payloads_unchanged(
464 table: &str,
465 current: &HashMap<String, serde_json::Value>,
466 records: &[serde_json::Value],
467) -> Result<bool, AppError> {
468 if current.len() != records.len() {
469 return Ok(false);
470 }
471 for rec in records {
472 let id = config_record_id(table, rec)?;
473 match current.get(&id) {
474 None => return Ok(false),
475 Some(existing) if existing != rec => return Ok(false),
476 Some(_) => {}
477 }
478 }
479 Ok(true)
480}
481
482pub async fn replace_config_rows(
486 tx: &mut crate::db::pool::Connection,
487 table: &str,
488 package_id: &str,
489 records: &[serde_json::Value],
490) -> Result<(u64, i64), AppError> {
491 let q_table = qualified_sys_table(table);
492 let current_version: (Option<i64>,) = sqlx::query_as(&format!(
493 "SELECT COALESCE(MAX(version), 0) FROM {} WHERE package_id = $1",
494 q_table
495 ))
496 .bind(package_id)
497 .fetch_one(&mut *tx)
498 .await
499 .map_err(AppError::Db)?;
500 let current_version = current_version.0.unwrap_or(0);
501
502 let rows: Vec<(String, serde_json::Value)> = sqlx::query_as(&format!(
503 "SELECT id, payload FROM {} WHERE package_id = $1",
504 q_table
505 ))
506 .bind(package_id)
507 .fetch_all(&mut *tx)
508 .await
509 .map_err(AppError::Db)?;
510 let current: HashMap<String, serde_json::Value> = rows.into_iter().collect();
511
512 if config_payloads_unchanged(table, ¤t, records)? {
513 return Ok((0, current_version));
514 }
515
516 let history_table = qualified_sys_table(&format!("{}_history", table));
517 let new_version = current_version + 1;
518
519 sqlx::query(&format!(
520 "INSERT INTO {} (id, package_id, payload, version, created_at) SELECT id, package_id, payload, version, updated_at FROM {} WHERE package_id = $1",
521 history_table, q_table
522 ))
523 .bind(package_id)
524 .execute(&mut *tx)
525 .await?;
526
527 sqlx::query(&format!("DELETE FROM {} WHERE package_id = $1", q_table))
528 .bind(package_id)
529 .execute(&mut *tx)
530 .await?;
531
532 let mut count = 0u64;
533 for rec in records {
534 let id = config_record_id(table, rec)?;
535 sqlx::query(&format!(
536 "INSERT INTO {} (id, package_id, payload, updated_at, version) VALUES ($1, $2, $3, NOW(), $4)",
537 q_table
538 ))
539 .bind(&id)
540 .bind(package_id)
541 .bind(rec)
542 .bind(new_version)
543 .execute(&mut *tx)
544 .await?;
545 count += 1;
546 }
547 Ok((count, new_version))
548}
549
/// Unqualified name of the table holding the current row of each package.
const PACKAGES_TABLE: &str = "_sys_packages";
/// Unqualified name of the table holding archived package rows.
const PACKAGES_HISTORY_TABLE: &str = "_sys_packages_history";
552
/// One row of the `_sys_packages` table, as returned by `list_packages` and
/// `get_package`. Each field mirrors the column of the same name.
pub struct PackageRow {
    /// Primary key of the package.
    pub id: String,
    /// Package document stored in the `payload` column.
    pub payload: serde_json::Value,
    /// Numeric row version maintained by `upsert_package`.
    pub version: i64,
    /// Timestamp of the last write to this row.
    pub updated_at: DateTime<Utc>,
    /// Value of the nullable `semantic_version` column.
    pub semantic_version: Option<String>,
}
560
561pub async fn list_packages(pool: &Pool) -> Result<Vec<PackageRow>, AppError> {
563 let q = qualified_sys_table(PACKAGES_TABLE);
564 #[allow(clippy::type_complexity)]
565 let rows: Vec<(
566 String,
567 serde_json::Value,
568 i64,
569 DateTime<Utc>,
570 Option<String>,
571 )> = sqlx::query_as(&format!(
572 "SELECT id, payload, version, updated_at, semantic_version FROM {} ORDER BY id",
573 q
574 ))
575 .fetch_all(pool)
576 .await
577 .map_err(AppError::Db)?;
578 Ok(rows
579 .into_iter()
580 .map(
581 |(id, payload, version, updated_at, semantic_version)| PackageRow {
582 id,
583 payload,
584 version,
585 updated_at,
586 semantic_version,
587 },
588 )
589 .collect())
590}
591
592pub async fn get_package(pool: &Pool, id: &str) -> Result<Option<PackageRow>, AppError> {
594 let q = qualified_sys_table(PACKAGES_TABLE);
595 #[allow(clippy::type_complexity)]
596 let row: Option<(
597 String,
598 serde_json::Value,
599 i64,
600 DateTime<Utc>,
601 Option<String>,
602 )> = sqlx::query_as(&format!(
603 "SELECT id, payload, version, updated_at, semantic_version FROM {} WHERE id = $1",
604 q
605 ))
606 .bind(id)
607 .fetch_optional(pool)
608 .await
609 .map_err(AppError::Db)?;
610 Ok(row.map(
611 |(id, payload, version, updated_at, semantic_version)| PackageRow {
612 id,
613 payload,
614 version,
615 updated_at,
616 semantic_version,
617 },
618 ))
619}
620
621pub async fn count_package_kind(
623 pool: &Pool,
624 kind: &str,
625 package_id: &str,
626) -> Result<i64, AppError> {
627 let table = sys_table_for_kind(kind)
628 .ok_or_else(|| AppError::BadRequest(format!("unknown config kind: {}", kind)))?;
629 let q = qualified_sys_table(table);
630 let (count,): (i64,) =
631 sqlx::query_as(&format!("SELECT COUNT(*) FROM {} WHERE package_id = $1", q))
632 .bind(package_id)
633 .fetch_one(pool)
634 .await
635 .map_err(AppError::Db)?;
636 Ok(count)
637}
638
639pub async fn list_package_ids(pool: &Pool) -> Result<Vec<String>, AppError> {
641 let q = qualified_sys_table(PACKAGES_TABLE);
642 let rows: Vec<(String,)> = sqlx::query_as(&format!("SELECT id FROM {} ORDER BY id", q))
643 .fetch_all(pool)
644 .await
645 .map_err(AppError::Db)?;
646 Ok(rows.into_iter().map(|(id,)| id).collect())
647}
648
649pub async fn upsert_package(
652 pool: &Pool,
653 id: &str,
654 payload: &serde_json::Value,
655) -> Result<i64, AppError> {
656 let semantic_version = payload
657 .get("version")
658 .and_then(serde_json::Value::as_str)
659 .map(String::from)
660 .unwrap_or_default();
661
662 let q_packages = qualified_sys_table(PACKAGES_TABLE);
663 let q_packages_history = qualified_sys_table(PACKAGES_HISTORY_TABLE);
664 let mut tx = pool.begin().await?;
665 let current: Option<(serde_json::Value, i64, Option<String>)> = sqlx::query_as(&format!(
666 "SELECT payload, version, semantic_version FROM {} WHERE id = $1",
667 q_packages
668 ))
669 .bind(id)
670 .fetch_optional(&mut *tx)
671 .await
672 .map_err(AppError::Db)?;
673
674 let new_version = match ¤t {
675 Some((_, v, Some(ref old_semver))) if *old_semver == semantic_version => *v,
676 Some((_, v, _)) => v + 1,
677 None => 1,
678 };
679
680 if let Some((old_payload, old_version, old_semver)) = current {
681 sqlx::query(&format!(
682 "INSERT INTO {} (id, payload, version, created_at, semantic_version) VALUES ($1, $2, $3, NOW(), $4)",
683 q_packages_history
684 ))
685 .bind(id)
686 .bind(old_payload)
687 .bind(old_version)
688 .bind(old_semver)
689 .execute(&mut *tx)
690 .await?;
691 }
692
693 sqlx::query(&format!("DELETE FROM {} WHERE id = $1", q_packages))
694 .bind(id)
695 .execute(&mut *tx)
696 .await?;
697
698 let semver_param: Option<&str> = if semantic_version.is_empty() {
699 None
700 } else {
701 Some(semantic_version.as_str())
702 };
703 sqlx::query(&format!(
704 "INSERT INTO {} (id, payload, updated_at, version, semantic_version) VALUES ($1, $2, NOW(), $3, $4)",
705 q_packages
706 ))
707 .bind(id)
708 .bind(payload)
709 .bind(new_version)
710 .bind(semver_param)
711 .execute(&mut *tx)
712 .await?;
713
714 tx.commit().await?;
715 Ok(new_version)
716}
717
718pub async fn delete_package_and_config(pool: &Pool, package_id: &str) -> Result<(), AppError> {
721 let q_packages = qualified_sys_table(PACKAGES_TABLE);
722 let q_packages_history = qualified_sys_table(PACKAGES_HISTORY_TABLE);
723 let q_kv_data = qualified_sys_table("_sys_kv_data");
724
725 let mut tx = pool.begin().await?;
726
727 let current: Option<(serde_json::Value, i64, Option<String>)> = sqlx::query_as(&format!(
729 "SELECT payload, version, semantic_version FROM {} WHERE id = $1",
730 q_packages
731 ))
732 .bind(package_id)
733 .fetch_optional(&mut *tx)
734 .await
735 .map_err(AppError::Db)?;
736
737 if let Some((payload, version, semantic_version)) = current {
738 sqlx::query(&format!(
739 "INSERT INTO {} (id, payload, version, created_at, semantic_version) VALUES ($1, $2, $3, NOW(), $4)",
740 q_packages_history
741 ))
742 .bind(package_id)
743 .bind(payload)
744 .bind(version)
745 .bind(semantic_version)
746 .execute(&mut *tx)
747 .await?;
748 }
749
750 for table in CONFIG_TABLES {
752 let q_table = qualified_sys_table(table);
753 sqlx::query(&format!("DELETE FROM {} WHERE package_id = $1", q_table))
754 .bind(package_id)
755 .execute(&mut *tx)
756 .await?;
757 let history_table = qualified_sys_table(&format!("{}_history", table));
758 sqlx::query(&format!(
759 "DELETE FROM {} WHERE package_id = $1",
760 history_table
761 ))
762 .bind(package_id)
763 .execute(&mut *tx)
764 .await?;
765 }
766
767 sqlx::query(&format!("DELETE FROM {} WHERE package_id = $1", q_kv_data))
769 .bind(package_id)
770 .execute(&mut *tx)
771 .await?;
772
773 sqlx::query(&format!("DELETE FROM {} WHERE id = $1", q_packages))
775 .bind(package_id)
776 .execute(&mut *tx)
777 .await?;
778
779 tx.commit().await?;
780 Ok(())
781}
782
/// Opens a connection pool for `database_url` with at most
/// `max_connections` connections.
///
/// The pool type is selected at compile time by the `postgres`, `mysql`, or
/// `sqlite` cargo feature. Each enabled branch returns immediately, so the
/// first enabled one in source order decides the result.
///
/// # Errors
/// Returns `AppError::Db` when connecting fails, and `AppError::BadRequest`
/// when none of the three features is enabled.
pub async fn create_pool(database_url: &str, max_connections: u32) -> Result<Pool, AppError> {
    #[cfg(feature = "postgres")]
    return sqlx::postgres::PgPoolOptions::new()
        .max_connections(max_connections)
        .connect(database_url)
        .await
        .map_err(AppError::Db);

    #[cfg(feature = "mysql")]
    return sqlx::mysql::MySqlPoolOptions::new()
        .max_connections(max_connections)
        .connect(database_url)
        .await
        .map_err(AppError::Db);

    #[cfg(feature = "sqlite")]
    return sqlx::sqlite::SqlitePoolOptions::new()
        .max_connections(max_connections)
        .connect(database_url)
        .await
        .map_err(AppError::Db);

    // Compiled only when no dialect feature is enabled; it is then the
    // function's tail expression.
    #[cfg(not(any(feature = "postgres", feature = "mysql", feature = "sqlite")))]
    Err(AppError::BadRequest(
        "No database dialect feature enabled. Enable one of: postgres, mysql, sqlite.".into(),
    ))
}
815
816pub async fn ensure_database_exists(database_url: &str) -> Result<(), AppError> {
823 #[cfg(feature = "postgres")]
824 {
825 use sqlx::ConnectOptions as _;
826 use std::str::FromStr;
827 let (admin_url, db_name) = parse_db_name_from_url(database_url)?;
828 if db_name.is_empty() || db_name == "postgres" {
829 return Ok(());
830 }
831 let opts = sqlx::postgres::PgConnectOptions::from_str(&admin_url)
832 .map_err(|e| AppError::BadRequest(format!("invalid DATABASE_URL: {}", e)))?;
833 let mut conn: sqlx::PgConnection = opts.connect().await.map_err(AppError::Db)?;
834 let exists: (bool,) =
835 sqlx::query_as("SELECT EXISTS(SELECT 1 FROM pg_database WHERE datname = $1)")
836 .bind(&db_name)
837 .fetch_one(&mut conn)
838 .await
839 .map_err(AppError::Db)?;
840 if !exists.0 {
841 let quoted = quote_ident(&db_name);
842 sqlx::query(&format!("CREATE DATABASE {}", quoted))
843 .execute(&mut conn)
844 .await
845 .map_err(AppError::Db)?;
846 }
847 }
848 #[cfg(not(feature = "postgres"))]
849 let _ = database_url;
850 Ok(())
851}
852
853#[cfg(feature = "postgres")]
854fn parse_db_name_from_url(url: &str) -> Result<(String, String), AppError> {
855 let path_start = url
856 .rfind('/')
857 .ok_or_else(|| AppError::BadRequest("DATABASE_URL: no path".into()))?
858 + 1;
859 let path_and_query = url.get(path_start..).unwrap_or("");
860 let db_name = path_and_query.split('?').next().unwrap_or("").trim();
861 let base = url.get(..path_start).unwrap_or(url);
862 let admin_url = format!("{}postgres", base);
863 Ok((admin_url, db_name.to_string()))
864}
865
866#[cfg(feature = "postgres")]
/// Wraps `name` in double quotes for use as a PostgreSQL identifier.
///
/// Inside a quoted identifier a double quote is written as two double
/// quotes; backslash has no special meaning and is left unchanged.
fn quote_ident(name: &str) -> String {
    format!("\"{}\"", name.replace('"', "\"\""))
}
870
/// Maps a config kind (for example `"tables"`) to the unqualified name of its
/// system table (for example `"_sys_tables"`).
///
/// Returns `None` for any kind that is not listed; matching is exact and
/// case-sensitive.
pub fn sys_table_for_kind(kind: &str) -> Option<&'static str> {
    const KIND_TO_TABLE: [(&str, &str); 8] = [
        ("schemas", "_sys_schemas"),
        ("enums", "_sys_enums"),
        ("tables", "_sys_tables"),
        ("columns", "_sys_columns"),
        ("indexes", "_sys_indexes"),
        ("relationships", "_sys_relationships"),
        ("api_entities", "_sys_api_entities"),
        ("kv_stores", "_sys_kv_stores"),
    ];

    KIND_TO_TABLE
        .iter()
        .find(|(known_kind, _)| *known_kind == kind)
        .map(|&(_, table)| table)
}