Skip to main content

architect_sdk/
store.rs

1//! _sys_* table DDL and config persistence. All _sys_* tables live in a schema named from `ARCHITECT_SCHEMA` env (default `architect`).
2
3use crate::db::{pool::Pool, Dialect};
4use crate::error::AppError;
5use chrono::{DateTime, Utc};
6use std::collections::HashMap;
7
/// Schema name for the `_sys_*` tables.
///
/// Read from the `ARCHITECT_SCHEMA` environment variable. Falls back to `architect` when the
/// variable is unset, is not valid Unicode, or is blank; a blank value is treated as unset
/// because it would otherwise yield an invalid qualified name such as `._sys_schemas`.
///
/// The value is not validated or quoted here, so it must be a valid PostgreSQL identifier.
pub fn architect_schema() -> String {
    std::env::var("ARCHITECT_SCHEMA")
        .ok()
        .filter(|name| !name.trim().is_empty())
        .unwrap_or_else(|| "architect".into())
}
12
13/// Returns schema-qualified table name for _sys_* tables (e.g. "architect._sys_schemas").
14pub fn qualified_sys_table(table: &str) -> String {
15    format!("{}.{}", architect_schema(), table)
16}
17
/// Config tables (each row is keyed by id + package_id). Excludes _sys_packages.
///
/// `ensure_sys_tables` creates each of these together with a `<name>_history` companion table,
/// and `delete_package_and_config` purges both for a given package.
/// The same eight names are also listed in `sys_table_for_kind`; keep the two in sync.
const CONFIG_TABLES: &[&str] = &[
    "_sys_schemas",
    "_sys_enums",
    "_sys_tables",
    "_sys_columns",
    "_sys_indexes",
    "_sys_relationships",
    "_sys_api_entities",
    "_sys_kv_stores",
];
29
/// Package id used when config is posted directly (no package install). Ensures (id, package_id) is unique per package.
/// Also used by `ensure_sys_tables` as the column default when it adds `package_id` to a table
/// that was created without it.
pub const DEFAULT_PACKAGE_ID: &str = "_default";
32
/// Create schema from `ARCHITECT_SCHEMA` env if not exists, then _sys_* tables.
/// Config tables have (id, package_id) as composite primary key; _sys_packages has id only.
///
/// The function runs two kinds of statements:
/// - `CREATE SCHEMA` / `CREATE TABLE IF NOT EXISTS` statements, whose errors are propagated.
/// - Follow-up `ALTER TABLE` migration statements that bring older tables up to the current
///   layout. Their results are discarded (`let _ = ...`), so a failure there is silently ignored.
///
/// Finishes by calling `ensure_migration_tables`.
///
/// # Errors
/// Returns an error if creating the schema or any of the tables fails.
pub async fn ensure_sys_tables(pool: &Pool, dialect: &dyn Dialect) -> Result<(), AppError> {
    let schema = architect_schema();
    // The schema is only created for dialects that report schema support.
    // NOTE(review): the schema name is interpolated unquoted — it must be a plain, trusted identifier.
    if dialect.supports_schemas() {
        sqlx::query(&format!("CREATE SCHEMA IF NOT EXISTS {}", schema))
            .execute(pool)
            .await?;
    }

    // One current-state table and one history table per config kind.
    for table in CONFIG_TABLES {
        let q_table = qualified_sys_table(table);
        let ddl = format!(
            "CREATE TABLE IF NOT EXISTS {} (\
                id TEXT NOT NULL, \
                package_id TEXT NOT NULL, \
                payload {} NOT NULL, \
                updated_at {} NOT NULL DEFAULT {}, \
                version BIGINT NOT NULL DEFAULT 1, \
                PRIMARY KEY (id, package_id)\
            )",
            q_table,
            dialect.sys_json_type(),
            dialect.sys_timestamp_type(),
            dialect.now_fn(),
        );
        sqlx::query(&ddl).execute(pool).await?;
        // Best-effort column additions for tables created before `version` / `package_id` existed.
        let alter_version = format!(
            "ALTER TABLE {} ADD COLUMN IF NOT EXISTS version BIGINT NOT NULL DEFAULT 1",
            q_table
        );
        let _ = sqlx::query(&alter_version).execute(pool).await;
        let alter_package = format!(
            "ALTER TABLE {} ADD COLUMN IF NOT EXISTS package_id TEXT NOT NULL DEFAULT '{}'",
            q_table, DEFAULT_PACKAGE_ID
        );
        let _ = sqlx::query(&alter_package).execute(pool).await;

        // History table: one row per (id, package_id, version).
        let history_table = qualified_sys_table(&format!("{}_history", table));
        let history_ddl = format!(
            "CREATE TABLE IF NOT EXISTS {} (\
                id TEXT NOT NULL, \
                package_id TEXT NOT NULL, \
                payload {} NOT NULL, \
                version BIGINT NOT NULL, \
                created_at {} NOT NULL DEFAULT {}, \
                PRIMARY KEY (id, package_id, version)\
            )",
            history_table,
            dialect.sys_json_type(),
            dialect.sys_timestamp_type(),
            dialect.now_fn(),
        );
        sqlx::query(&history_ddl).execute(pool).await?;
        let alter_history_package = format!(
            "ALTER TABLE {} ADD COLUMN IF NOT EXISTS package_id TEXT NOT NULL DEFAULT '{}'",
            history_table, DEFAULT_PACKAGE_ID
        );
        let _ = sqlx::query(&alter_history_package).execute(pool).await;
    }

    // _sys_packages: keyed by id alone, with an optional semantic_version column.
    let q_packages = qualified_sys_table("_sys_packages");
    let packages_ddl = format!(
        "CREATE TABLE IF NOT EXISTS {} (\
            id TEXT PRIMARY KEY, \
            payload {} NOT NULL, \
            updated_at {} NOT NULL DEFAULT {}, \
            version BIGINT NOT NULL DEFAULT 1, \
            semantic_version TEXT\
        )",
        q_packages,
        dialect.sys_json_type(),
        dialect.sys_timestamp_type(),
        dialect.now_fn(),
    );
    sqlx::query(&packages_ddl).execute(pool).await?;
    let alter_pkg_semver = format!(
        "ALTER TABLE {} ADD COLUMN IF NOT EXISTS semantic_version TEXT",
        q_packages
    );
    let _ = sqlx::query(&alter_pkg_semver).execute(pool).await;
    // _sys_packages_history: created with PRIMARY KEY (id, version), then migrated below.
    let q_packages_history = qualified_sys_table("_sys_packages_history");
    let packages_history_ddl = format!(
        "CREATE TABLE IF NOT EXISTS {} (\
            id TEXT NOT NULL, \
            payload {} NOT NULL, \
            version BIGINT NOT NULL, \
            created_at {} NOT NULL DEFAULT {}, \
            semantic_version TEXT, \
            PRIMARY KEY (id, version)\
        )",
        q_packages_history,
        dialect.sys_json_type(),
        dialect.sys_timestamp_type(),
        dialect.now_fn(),
    );
    sqlx::query(&packages_history_ddl).execute(pool).await?;
    let alter_pkg_hist_semver = format!(
        "ALTER TABLE {} ADD COLUMN IF NOT EXISTS semantic_version TEXT",
        q_packages_history
    );
    let _ = sqlx::query(&alter_pkg_hist_semver).execute(pool).await;
    // Migrate to surrogate PK so multiple uninstalls of the same package (same id/version) don't violate uniqueness
    let add_history_id = format!(
        "ALTER TABLE {} ADD COLUMN IF NOT EXISTS history_id {}",
        q_packages_history,
        dialect.sys_bigserial_type()
    );
    let _ = sqlx::query(&add_history_id).execute(pool).await;
    // Drops the (id, version) key under its default constraint name; best-effort on every call.
    let drop_old_pk = format!(
        "ALTER TABLE {} DROP CONSTRAINT IF EXISTS _sys_packages_history_pkey",
        q_packages_history
    );
    let _ = sqlx::query(&drop_old_pk).execute(pool).await;
    // The replacement key on history_id is only added for the postgres dialect (uses a DO block).
    // NOTE(review): the pg_constraint lookup matches on constraint name only, not on schema.
    if dialect.name() == "postgres" {
        let add_new_pk_cond = format!(
            "DO $$ BEGIN IF NOT EXISTS (SELECT 1 FROM pg_constraint WHERE conname = '_sys_packages_history_history_id_pkey') THEN \
             ALTER TABLE {} ADD CONSTRAINT _sys_packages_history_history_id_pkey PRIMARY KEY (history_id); END IF; END $$",
            q_packages_history
        );
        let _ = sqlx::query(&add_new_pk_cond).execute(pool).await;
    }

    // _sys_tenants: one row per tenant id.
    let q_tenants = qualified_sys_table("_sys_tenants");
    let tenants_ddl = format!(
        "CREATE TABLE IF NOT EXISTS {} (\
            id TEXT PRIMARY KEY, \
            strategy TEXT NOT NULL, \
            database_url TEXT, \
            updated_at {} NOT NULL DEFAULT {}, \
            comment TEXT\
        )",
        q_tenants,
        dialect.sys_timestamp_type(),
        dialect.now_fn(),
    );
    sqlx::query(&tenants_ddl).execute(pool).await?;
    // Best-effort removal of a `schema_name` column that is no longer part of the table layout above.
    let drop_schema_name = format!(
        "ALTER TABLE {} DROP COLUMN IF EXISTS schema_name",
        q_tenants
    );
    let _ = sqlx::query(&drop_schema_name).execute(pool).await;

    // _sys_kv_data: key/value rows scoped by tenant, package and namespace.
    let q_kv_data = qualified_sys_table("_sys_kv_data");
    let kv_data_ddl = format!(
        "CREATE TABLE IF NOT EXISTS {} (\
            tenant_id TEXT NOT NULL, \
            package_id TEXT NOT NULL, \
            namespace TEXT NOT NULL, \
            key TEXT NOT NULL, \
            value {} NOT NULL, \
            updated_at {} NOT NULL DEFAULT {}, \
            PRIMARY KEY (tenant_id, package_id, namespace, key)\
        )",
        q_kv_data,
        dialect.sys_json_type(),
        dialect.sys_timestamp_type(),
        dialect.now_fn(),
    );
    sqlx::query(&kv_data_ddl).execute(pool).await?;
    // Migrate existing tables that had no tenant_id: add column and new PK.
    let alter_kv_tenant = format!(
        "ALTER TABLE {} ADD COLUMN IF NOT EXISTS tenant_id TEXT NOT NULL DEFAULT '_shared'",
        q_kv_data
    );
    let _ = sqlx::query(&alter_kv_tenant).execute(pool).await;
    // NOTE(review): the primary key is dropped and re-added on every call, not only when a
    // migration is needed; if the re-add fails the error is discarded — confirm this is intended.
    let drop_pk = format!(
        "ALTER TABLE {} DROP CONSTRAINT IF EXISTS _sys_kv_data_pkey",
        q_kv_data
    );
    let _ = sqlx::query(&drop_pk).execute(pool).await;
    let add_pk = format!(
        "ALTER TABLE {} ADD PRIMARY KEY (tenant_id, package_id, namespace, key)",
        q_kv_data
    );
    let _ = sqlx::query(&add_pk).execute(pool).await;
    // Ensure value column is JSON type (for existing tables that had value as text).
    if dialect.name() == "postgres" {
        let alter_value_json = format!(
            "ALTER TABLE {} ALTER COLUMN value TYPE JSONB USING value::jsonb",
            q_kv_data
        );
        let _ = sqlx::query(&alter_value_json).execute(pool).await;
    }

    // Migration plan / audit tables are created by a dedicated helper; its errors propagate.
    ensure_migration_tables(pool, dialect).await?;

    Ok(())
}
222
223/// Create _sys_migration_plans and _sys_migration_audit tables if they don't exist.
224async fn ensure_migration_tables(pool: &Pool, dialect: &dyn Dialect) -> Result<(), AppError> {
225    let q_plans = qualified_sys_table("_sys_migration_plans");
226    let expires_at_col = match dialect.default_now_plus_hours(24) {
227        Some(expr) => format!(
228            "expires_at {} NOT NULL DEFAULT {}",
229            dialect.sys_timestamp_type(),
230            expr
231        ),
232        None => format!("expires_at {}", dialect.sys_timestamp_type()),
233    };
234    sqlx::query(&format!(
235        "CREATE TABLE IF NOT EXISTS {} (\
236            id TEXT PRIMARY KEY, \
237            package_id TEXT NOT NULL, \
238            tenant_id TEXT NOT NULL, \
239            from_version TEXT, \
240            to_version TEXT NOT NULL, \
241            plan_json {} NOT NULL, \
242            zip_bytes {} NOT NULL, \
243            status TEXT NOT NULL DEFAULT 'pending', \
244            created_at {} NOT NULL DEFAULT {}, \
245            {}, \
246            applied_at {}\
247        )",
248        q_plans,
249        dialect.sys_json_type(),
250        dialect.sys_bytes_type(),
251        dialect.sys_timestamp_type(),
252        dialect.now_fn(),
253        expires_at_col,
254        dialect.sys_timestamp_type(),
255    ))
256    .execute(pool)
257    .await?;
258
259    let q_audit = qualified_sys_table("_sys_migration_audit");
260    sqlx::query(&format!(
261        "CREATE TABLE IF NOT EXISTS {} (\
262            id {} PRIMARY KEY, \
263            migration_plan_id TEXT NOT NULL, \
264            package_id TEXT NOT NULL, \
265            tenant_id TEXT NOT NULL, \
266            from_version TEXT, \
267            to_version TEXT NOT NULL, \
268            step_number INT NOT NULL, \
269            operation TEXT NOT NULL, \
270            schema_name TEXT NOT NULL, \
271            table_name TEXT, \
272            object_name TEXT NOT NULL, \
273            object_type TEXT NOT NULL, \
274            description TEXT NOT NULL, \
275            ddl TEXT, \
276            safety TEXT NOT NULL, \
277            risk TEXT NOT NULL, \
278            status TEXT NOT NULL, \
279            error_message TEXT, \
280            executed_at {} NOT NULL DEFAULT {}\
281        )",
282        q_audit,
283        dialect.sys_bigserial_type(),
284        dialect.sys_timestamp_type(),
285        dialect.now_fn(),
286    ))
287    .execute(pool)
288    .await?;
289
290    Ok(())
291}
292
293/// Row returned from _sys_migration_plans.
294pub struct MigrationPlanRow {
295    pub id: String,
296    pub package_id: String,
297    pub tenant_id: String,
298    pub from_version: Option<String>,
299    pub to_version: String,
300    pub plan_json: serde_json::Value,
301    pub zip_bytes: Vec<u8>,
302    pub status: String,
303    pub created_at: DateTime<Utc>,
304    pub expires_at: DateTime<Utc>,
305    pub applied_at: Option<DateTime<Utc>>,
306}
307
308/// Persist a migration plan (zip bytes + serialized steps) for later confirmation.
309#[allow(clippy::too_many_arguments)]
310pub async fn save_migration_plan(
311    pool: &Pool,
312    id: &str,
313    package_id: &str,
314    tenant_id: &str,
315    from_version: Option<&str>,
316    to_version: &str,
317    plan_json: &serde_json::Value,
318    zip_bytes: &[u8],
319) -> Result<(), AppError> {
320    let q = qualified_sys_table("_sys_migration_plans");
321    sqlx::query(&format!(
322        "INSERT INTO {} (id, package_id, tenant_id, from_version, to_version, plan_json, zip_bytes, status, created_at, expires_at) \
323         VALUES ($1, $2, $3, $4, $5, $6, $7, 'pending', NOW(), NOW() + INTERVAL '24 hours')",
324        q
325    ))
326    .bind(id)
327    .bind(package_id)
328    .bind(tenant_id)
329    .bind(from_version)
330    .bind(to_version)
331    .bind(plan_json)
332    .bind(zip_bytes)
333    .execute(pool)
334    .await?;
335    Ok(())
336}
337
338/// Fetch a migration plan by id, or None if not found.
339pub async fn get_migration_plan(
340    pool: &Pool,
341    id: &str,
342) -> Result<Option<MigrationPlanRow>, AppError> {
343    let q = qualified_sys_table("_sys_migration_plans");
344    #[allow(clippy::type_complexity)]
345    let row: Option<(String, String, String, Option<String>, String, serde_json::Value, Vec<u8>, String, DateTime<Utc>, DateTime<Utc>, Option<DateTime<Utc>>)> =
346        sqlx::query_as(&format!(
347            "SELECT id, package_id, tenant_id, from_version, to_version, plan_json, zip_bytes, status, created_at, expires_at, applied_at FROM {} WHERE id = $1",
348            q
349        ))
350        .bind(id)
351        .fetch_optional(pool)
352        .await
353        .map_err(AppError::Db)?;
354    Ok(row.map(
355        |(
356            id,
357            package_id,
358            tenant_id,
359            from_version,
360            to_version,
361            plan_json,
362            zip_bytes,
363            status,
364            created_at,
365            expires_at,
366            applied_at,
367        )| {
368            MigrationPlanRow {
369                id,
370                package_id,
371                tenant_id,
372                from_version,
373                to_version,
374                plan_json,
375                zip_bytes,
376                status,
377                created_at,
378                expires_at,
379                applied_at,
380            }
381        },
382    ))
383}
384
385/// Atomically mark a migration plan as applied. Returns false if already applied or not found.
386pub async fn mark_migration_plan_applied(pool: &Pool, id: &str) -> Result<bool, AppError> {
387    let q = qualified_sys_table("_sys_migration_plans");
388    let result = sqlx::query(&format!(
389        "UPDATE {} SET status = 'applied', applied_at = NOW() WHERE id = $1 AND status = 'pending'",
390        q
391    ))
392    .bind(id)
393    .execute(pool)
394    .await?;
395    Ok(result.rows_affected() > 0)
396}
397
398/// Append one audit record for a migration step execution.
399#[allow(clippy::too_many_arguments)]
400pub async fn insert_migration_audit(
401    pool: &Pool,
402    migration_plan_id: &str,
403    package_id: &str,
404    tenant_id: &str,
405    from_version: Option<&str>,
406    to_version: &str,
407    step_number: i32,
408    operation: &str,
409    schema_name: &str,
410    table_name: Option<&str>,
411    object_name: &str,
412    object_type: &str,
413    description: &str,
414    ddl: Option<&str>,
415    safety: &str,
416    risk: &str,
417    status: &str,
418    error_message: Option<&str>,
419) -> Result<(), AppError> {
420    let q = qualified_sys_table("_sys_migration_audit");
421    sqlx::query(&format!(
422        "INSERT INTO {} (migration_plan_id, package_id, tenant_id, from_version, to_version, step_number, operation, schema_name, table_name, object_name, object_type, description, ddl, safety, risk, status, error_message, executed_at) \
423         VALUES ($1,$2,$3,$4,$5,$6,$7,$8,$9,$10,$11,$12,$13,$14,$15,$16,$17,NOW())",
424        q
425    ))
426    .bind(migration_plan_id)
427    .bind(package_id)
428    .bind(tenant_id)
429    .bind(from_version)
430    .bind(to_version)
431    .bind(step_number)
432    .bind(operation)
433    .bind(schema_name)
434    .bind(table_name)
435    .bind(object_name)
436    .bind(object_type)
437    .bind(description)
438    .bind(ddl)
439    .bind(safety)
440    .bind(risk)
441    .bind(status)
442    .bind(error_message)
443    .execute(pool)
444    .await?;
445    Ok(())
446}
447
448/// Resolve the storage id for a config record. For api_entities, entity_id is used when id is absent.
449fn config_record_id(table: &str, rec: &serde_json::Value) -> Result<String, AppError> {
450    let id = rec.get("id").and_then(|v| v.as_str());
451    let entity_id = rec.get("entity_id").and_then(|v| v.as_str());
452    match (table, id, entity_id) {
453        ("_sys_api_entities", None, Some(eid)) => Ok(eid.to_string()),
454        (_, Some(id), _) => Ok(id.to_string()),
455        _ => Err(AppError::BadRequest(
456            "each config record must have an 'id' field (or 'entity_id' for api_entities)".into(),
457        )),
458    }
459}
460
461/// Deep-compare incoming records with current stored payloads (by id).
462/// Returns true if they are identical (same ids and same payload per id).
463fn config_payloads_unchanged(
464    table: &str,
465    current: &HashMap<String, serde_json::Value>,
466    records: &[serde_json::Value],
467) -> Result<bool, AppError> {
468    if current.len() != records.len() {
469        return Ok(false);
470    }
471    for rec in records {
472        let id = config_record_id(table, rec)?;
473        match current.get(&id) {
474            None => return Ok(false),
475            Some(existing) if existing != rec => return Ok(false),
476            Some(_) => {}
477        }
478    }
479    Ok(true)
480}
481
482/// Replace all rows for a config type for one package: copy current (for this package_id) to history, delete, insert with new version.
483/// If incoming payloads are deep-equal to current, no write is performed and no new version is created.
484/// Returns (count inserted, version). Call within transaction for atomicity.
485pub async fn replace_config_rows(
486    tx: &mut crate::db::pool::Connection,
487    table: &str,
488    package_id: &str,
489    records: &[serde_json::Value],
490) -> Result<(u64, i64), AppError> {
491    let q_table = qualified_sys_table(table);
492    let current_version: (Option<i64>,) = sqlx::query_as(&format!(
493        "SELECT COALESCE(MAX(version), 0) FROM {} WHERE package_id = $1",
494        q_table
495    ))
496    .bind(package_id)
497    .fetch_one(&mut *tx)
498    .await
499    .map_err(AppError::Db)?;
500    let current_version = current_version.0.unwrap_or(0);
501
502    let rows: Vec<(String, serde_json::Value)> = sqlx::query_as(&format!(
503        "SELECT id, payload FROM {} WHERE package_id = $1",
504        q_table
505    ))
506    .bind(package_id)
507    .fetch_all(&mut *tx)
508    .await
509    .map_err(AppError::Db)?;
510    let current: HashMap<String, serde_json::Value> = rows.into_iter().collect();
511
512    if config_payloads_unchanged(table, &current, records)? {
513        return Ok((0, current_version));
514    }
515
516    let history_table = qualified_sys_table(&format!("{}_history", table));
517    let new_version = current_version + 1;
518
519    sqlx::query(&format!(
520        "INSERT INTO {} (id, package_id, payload, version, created_at) SELECT id, package_id, payload, version, updated_at FROM {} WHERE package_id = $1",
521        history_table, q_table
522    ))
523    .bind(package_id)
524    .execute(&mut *tx)
525    .await?;
526
527    sqlx::query(&format!("DELETE FROM {} WHERE package_id = $1", q_table))
528        .bind(package_id)
529        .execute(&mut *tx)
530        .await?;
531
532    let mut count = 0u64;
533    for rec in records {
534        let id = config_record_id(table, rec)?;
535        sqlx::query(&format!(
536            "INSERT INTO {} (id, package_id, payload, updated_at, version) VALUES ($1, $2, $3, NOW(), $4)",
537            q_table
538        ))
539        .bind(&id)
540        .bind(package_id)
541        .bind(rec)
542        .bind(new_version)
543        .execute(&mut *tx)
544        .await?;
545        count += 1;
546    }
547    Ok((count, new_version))
548}
549
/// Unqualified name of the table holding one row per installed package.
const PACKAGES_TABLE: &str = "_sys_packages";
/// Unqualified name of the table that receives previous package rows on upsert and delete.
const PACKAGES_HISTORY_TABLE: &str = "_sys_packages_history";
552
553pub struct PackageRow {
554    pub id: String,
555    pub payload: serde_json::Value,
556    pub version: i64,
557    pub updated_at: DateTime<Utc>,
558    pub semantic_version: Option<String>,
559}
560
561/// List all rows from _sys_packages ordered by id.
562pub async fn list_packages(pool: &Pool) -> Result<Vec<PackageRow>, AppError> {
563    let q = qualified_sys_table(PACKAGES_TABLE);
564    #[allow(clippy::type_complexity)]
565    let rows: Vec<(
566        String,
567        serde_json::Value,
568        i64,
569        DateTime<Utc>,
570        Option<String>,
571    )> = sqlx::query_as(&format!(
572        "SELECT id, payload, version, updated_at, semantic_version FROM {} ORDER BY id",
573        q
574    ))
575    .fetch_all(pool)
576    .await
577    .map_err(AppError::Db)?;
578    Ok(rows
579        .into_iter()
580        .map(
581            |(id, payload, version, updated_at, semantic_version)| PackageRow {
582                id,
583                payload,
584                version,
585                updated_at,
586                semantic_version,
587            },
588        )
589        .collect())
590}
591
592/// Fetch a single package row by id, or None if not installed.
593pub async fn get_package(pool: &Pool, id: &str) -> Result<Option<PackageRow>, AppError> {
594    let q = qualified_sys_table(PACKAGES_TABLE);
595    #[allow(clippy::type_complexity)]
596    let row: Option<(
597        String,
598        serde_json::Value,
599        i64,
600        DateTime<Utc>,
601        Option<String>,
602    )> = sqlx::query_as(&format!(
603        "SELECT id, payload, version, updated_at, semantic_version FROM {} WHERE id = $1",
604        q
605    ))
606    .bind(id)
607    .fetch_optional(pool)
608    .await
609    .map_err(AppError::Db)?;
610    Ok(row.map(
611        |(id, payload, version, updated_at, semantic_version)| PackageRow {
612            id,
613            payload,
614            version,
615            updated_at,
616            semantic_version,
617        },
618    ))
619}
620
621/// Count rows in a config table for a given package.
622pub async fn count_package_kind(
623    pool: &Pool,
624    kind: &str,
625    package_id: &str,
626) -> Result<i64, AppError> {
627    let table = sys_table_for_kind(kind)
628        .ok_or_else(|| AppError::BadRequest(format!("unknown config kind: {}", kind)))?;
629    let q = qualified_sys_table(table);
630    let (count,): (i64,) =
631        sqlx::query_as(&format!("SELECT COUNT(*) FROM {} WHERE package_id = $1", q))
632            .bind(package_id)
633            .fetch_one(pool)
634            .await
635            .map_err(AppError::Db)?;
636    Ok(count)
637}
638
639/// List all package ids from _sys_packages (what is installed in the DB). Used to generate OpenAPI spec from _sys_* config.
640pub async fn list_package_ids(pool: &Pool) -> Result<Vec<String>, AppError> {
641    let q = qualified_sys_table(PACKAGES_TABLE);
642    let rows: Vec<(String,)> = sqlx::query_as(&format!("SELECT id FROM {} ORDER BY id", q))
643        .fetch_all(pool)
644        .await
645        .map_err(AppError::Db)?;
646    Ok(rows.into_iter().map(|(id,)| id).collect())
647}
648
649/// Upsert one package row by id: copy current to history if exists, then insert or replace with new payload.
650/// Semantic version is read from payload.version (e.g. manifest "version": "1.0.0"). Version is only incremented when semantic_version changes.
651pub async fn upsert_package(
652    pool: &Pool,
653    id: &str,
654    payload: &serde_json::Value,
655) -> Result<i64, AppError> {
656    let semantic_version = payload
657        .get("version")
658        .and_then(serde_json::Value::as_str)
659        .map(String::from)
660        .unwrap_or_default();
661
662    let q_packages = qualified_sys_table(PACKAGES_TABLE);
663    let q_packages_history = qualified_sys_table(PACKAGES_HISTORY_TABLE);
664    let mut tx = pool.begin().await?;
665    let current: Option<(serde_json::Value, i64, Option<String>)> = sqlx::query_as(&format!(
666        "SELECT payload, version, semantic_version FROM {} WHERE id = $1",
667        q_packages
668    ))
669    .bind(id)
670    .fetch_optional(&mut *tx)
671    .await
672    .map_err(AppError::Db)?;
673
674    let new_version = match &current {
675        Some((_, v, Some(ref old_semver))) if *old_semver == semantic_version => *v,
676        Some((_, v, _)) => v + 1,
677        None => 1,
678    };
679
680    if let Some((old_payload, old_version, old_semver)) = current {
681        sqlx::query(&format!(
682            "INSERT INTO {} (id, payload, version, created_at, semantic_version) VALUES ($1, $2, $3, NOW(), $4)",
683            q_packages_history
684        ))
685        .bind(id)
686        .bind(old_payload)
687        .bind(old_version)
688        .bind(old_semver)
689        .execute(&mut *tx)
690        .await?;
691    }
692
693    sqlx::query(&format!("DELETE FROM {} WHERE id = $1", q_packages))
694        .bind(id)
695        .execute(&mut *tx)
696        .await?;
697
698    let semver_param: Option<&str> = if semantic_version.is_empty() {
699        None
700    } else {
701        Some(semantic_version.as_str())
702    };
703    sqlx::query(&format!(
704        "INSERT INTO {} (id, payload, updated_at, version, semantic_version) VALUES ($1, $2, NOW(), $3, $4)",
705        q_packages
706    ))
707    .bind(id)
708    .bind(payload)
709    .bind(new_version)
710    .bind(semver_param)
711    .execute(&mut *tx)
712    .await?;
713
714    tx.commit().await?;
715    Ok(new_version)
716}
717
718/// Delete all config rows and KV data for a package, then remove the package record.
719/// Copies the current package row to _sys_packages_history before delete. Call after reverting migrations on the tenant DB.
720pub async fn delete_package_and_config(pool: &Pool, package_id: &str) -> Result<(), AppError> {
721    let q_packages = qualified_sys_table(PACKAGES_TABLE);
722    let q_packages_history = qualified_sys_table(PACKAGES_HISTORY_TABLE);
723    let q_kv_data = qualified_sys_table("_sys_kv_data");
724
725    let mut tx = pool.begin().await?;
726
727    // Copy current package row to history (if exists)
728    let current: Option<(serde_json::Value, i64, Option<String>)> = sqlx::query_as(&format!(
729        "SELECT payload, version, semantic_version FROM {} WHERE id = $1",
730        q_packages
731    ))
732    .bind(package_id)
733    .fetch_optional(&mut *tx)
734    .await
735    .map_err(AppError::Db)?;
736
737    if let Some((payload, version, semantic_version)) = current {
738        sqlx::query(&format!(
739            "INSERT INTO {} (id, payload, version, created_at, semantic_version) VALUES ($1, $2, $3, NOW(), $4)",
740            q_packages_history
741        ))
742        .bind(package_id)
743        .bind(payload)
744        .bind(version)
745        .bind(semantic_version)
746        .execute(&mut *tx)
747        .await?;
748    }
749
750    // Delete from each config table and its history (by package_id)
751    for table in CONFIG_TABLES {
752        let q_table = qualified_sys_table(table);
753        sqlx::query(&format!("DELETE FROM {} WHERE package_id = $1", q_table))
754            .bind(package_id)
755            .execute(&mut *tx)
756            .await?;
757        let history_table = qualified_sys_table(&format!("{}_history", table));
758        sqlx::query(&format!(
759            "DELETE FROM {} WHERE package_id = $1",
760            history_table
761        ))
762        .bind(package_id)
763        .execute(&mut *tx)
764        .await?;
765    }
766
767    // Delete KV data for this package
768    sqlx::query(&format!("DELETE FROM {} WHERE package_id = $1", q_kv_data))
769        .bind(package_id)
770        .execute(&mut *tx)
771        .await?;
772
773    // Delete package row
774    sqlx::query(&format!("DELETE FROM {} WHERE id = $1", q_packages))
775        .bind(package_id)
776        .execute(&mut *tx)
777        .await?;
778
779    tx.commit().await?;
780    Ok(())
781}
782
/// Create a connection pool for the compiled-in dialect.
///
/// This is the recommended way to build a pool in consumer binaries — it uses the correct
/// pool type for whichever dialect feature is active without requiring `#[cfg(feature = ...)]`
/// in caller code.
///
/// `max_connections` is passed to the pool options as the pool's connection limit.
///
/// # Errors
/// Returns `AppError::Db` when connecting fails, or `AppError::BadRequest` when the crate was
/// built without any of the `postgres`, `mysql` or `sqlite` features.
pub async fn create_pool(database_url: &str, max_connections: u32) -> Result<Pool, AppError> {
    // Each enabled dialect branch returns immediately.
    // NOTE(review): presumably exactly one dialect feature is enabled per build — TODO confirm;
    // with several enabled, only the first branch below would ever run.
    #[cfg(feature = "postgres")]
    return sqlx::postgres::PgPoolOptions::new()
        .max_connections(max_connections)
        .connect(database_url)
        .await
        .map_err(AppError::Db);

    #[cfg(feature = "mysql")]
    return sqlx::mysql::MySqlPoolOptions::new()
        .max_connections(max_connections)
        .connect(database_url)
        .await
        .map_err(AppError::Db);

    #[cfg(feature = "sqlite")]
    return sqlx::sqlite::SqlitePoolOptions::new()
        .max_connections(max_connections)
        .connect(database_url)
        .await
        .map_err(AppError::Db);

    // Only compiled when no dialect feature is enabled.
    #[cfg(not(any(feature = "postgres", feature = "mysql", feature = "sqlite")))]
    Err(AppError::BadRequest(
        "No database dialect feature enabled. Enable one of: postgres, mysql, sqlite.".into(),
    ))
}
815
816/// Ensure the database in `database_url` exists; create it if not.
817///
818/// - **Postgres**: connects to the admin `postgres` database and runs `CREATE DATABASE`.
819/// - **SQLite**: the database file is created automatically on first connect — this is a no-op.
820/// - **MySQL**: database auto-creation is not supported here; create the database manually or
821///   rely on connection-string options like `createDatabaseIfNotExist=true`.
822pub async fn ensure_database_exists(database_url: &str) -> Result<(), AppError> {
823    #[cfg(feature = "postgres")]
824    {
825        use sqlx::ConnectOptions as _;
826        use std::str::FromStr;
827        let (admin_url, db_name) = parse_db_name_from_url(database_url)?;
828        if db_name.is_empty() || db_name == "postgres" {
829            return Ok(());
830        }
831        let opts = sqlx::postgres::PgConnectOptions::from_str(&admin_url)
832            .map_err(|e| AppError::BadRequest(format!("invalid DATABASE_URL: {}", e)))?;
833        let mut conn: sqlx::PgConnection = opts.connect().await.map_err(AppError::Db)?;
834        let exists: (bool,) =
835            sqlx::query_as("SELECT EXISTS(SELECT 1 FROM pg_database WHERE datname = $1)")
836                .bind(&db_name)
837                .fetch_one(&mut conn)
838                .await
839                .map_err(AppError::Db)?;
840        if !exists.0 {
841            let quoted = quote_ident(&db_name);
842            sqlx::query(&format!("CREATE DATABASE {}", quoted))
843                .execute(&mut conn)
844                .await
845                .map_err(AppError::Db)?;
846        }
847    }
848    #[cfg(not(feature = "postgres"))]
849    let _ = database_url;
850    Ok(())
851}
852
853#[cfg(feature = "postgres")]
854fn parse_db_name_from_url(url: &str) -> Result<(String, String), AppError> {
855    let path_start = url
856        .rfind('/')
857        .ok_or_else(|| AppError::BadRequest("DATABASE_URL: no path".into()))?
858        + 1;
859    let path_and_query = url.get(path_start..).unwrap_or("");
860    let db_name = path_and_query.split('?').next().unwrap_or("").trim();
861    let base = url.get(..path_start).unwrap_or(url);
862    let admin_url = format!("{}postgres", base);
863    Ok((admin_url, db_name.to_string()))
864}
865
866#[cfg(feature = "postgres")]
/// Quotes `name` as a PostgreSQL delimited identifier.
///
/// The name is wrapped in double quotes and every embedded double quote is doubled, which is
/// the escape PostgreSQL uses inside a quoted identifier. Backslashes are ordinary characters
/// there and are left untouched.
fn quote_ident(name: &str) -> String {
    format!("\"{}\"", name.replace('"', "\"\""))
}
870
/// Maps a config kind name (e.g. `"tables"`) to the `_sys_*` table that stores it.
///
/// Matching is exact and case-sensitive; returns `None` for an unrecognised kind.
pub fn sys_table_for_kind(kind: &str) -> Option<&'static str> {
    // (kind, table) pairs.
    const KIND_TABLES: [(&str, &str); 8] = [
        ("schemas", "_sys_schemas"),
        ("enums", "_sys_enums"),
        ("tables", "_sys_tables"),
        ("columns", "_sys_columns"),
        ("indexes", "_sys_indexes"),
        ("relationships", "_sys_relationships"),
        ("api_entities", "_sys_api_entities"),
        ("kv_stores", "_sys_kv_stores"),
    ];
    KIND_TABLES
        .iter()
        .find(|entry| entry.0 == kind)
        .map(|entry| entry.1)
}