Skip to main content

architect_sdk/handlers/
package.rs

1//! Package install/uninstall handlers. Install: zip upload, extract manifest + configs, apply configs, store manifest, reload model. Uninstall: revert migrations, delete _sys_* rows and package record. X-Tenant-ID is required.
2//! Config is stored in the architect DB (DATABASE_URL). Schemas/tables are created in ALL registered tenant databases (broadcast). Bootstrap endpoint handles new Database-strategy tenants added after install.
3
4use crate::config::{load_from_pool, resolve, FullConfig};
5use crate::db::pool::Pool;
6use crate::db::{parse_canonical, CanonicalType, Dialect};
7use crate::error::AppError;
8use crate::extractors::tenant::TenantId;
9use crate::handlers::config::{reload_model, replace_config};
10use crate::handlers::entity::{get_or_create_tenant_pool, resolve_tenant_context};
11use crate::migration::{
12    apply_migrations, compute_migration_plan, execute_migration_plan, revert_migrations,
13    MigrationPlan,
14};
15use crate::state::AppState;
16use crate::store::{
17    count_package_kind, delete_package_and_config, get_migration_plan, get_package,
18    list_package_ids, list_packages, mark_migration_plan_applied, save_migration_plan,
19    upsert_package,
20};
21use crate::tenant::TenantStrategy;
22use axum::extract::{Multipart, Path, State};
23use axum::Json;
24use serde::Deserialize;
25use serde_json::{json, Value};
26use std::collections::HashSet;
27use std::io::Cursor;
28use uuid::Uuid;
29use zip::ZipArchive;
30
31/// Return an error if the config contains `asset` or `asset[]` columns and no storage provider
32/// is configured. Called during package install and column config ingestion so the problem is
33/// caught before any DDL or `_sys_*` writes occur.
34pub(crate) fn reject_asset_columns_without_storage(
35    config: &FullConfig,
36    storage: &Option<std::sync::Arc<dyn crate::storage::StorageProvider>>,
37) -> Result<(), AppError> {
38    if storage.is_some() {
39        return Ok(());
40    }
41    let asset_cols: Vec<String> = config
42        .columns
43        .iter()
44        .filter(|c| {
45            matches!(
46                parse_canonical(&c.type_),
47                CanonicalType::Asset | CanonicalType::AssetArray
48            )
49        })
50        .map(|c| c.name.clone())
51        .collect();
52    if !asset_cols.is_empty() {
53        return Err(AppError::BadRequest(format!(
54            "Package defines asset column(s) [{}] but no storage provider is configured. \
55             Set STORAGE_PROVIDER (s3 | azure | gcs | rustfs) before installing packages \
56             that use asset or asset[] columns.",
57            asset_cols.join(", ")
58        )));
59    }
60    Ok(())
61}
62
63/// Per-tenant DDL execution result, included in the install/upgrade response.
64#[derive(serde::Serialize)]
65struct TenantMigrationOutcome {
66    /// Tenant ID, or "central_rls_db" for the shared architect DB used by RLS tenants without a dedicated URL.
67    target: String,
68    /// "database" or "rls"
69    strategy: String,
70    /// "applied" | "applied_with_warnings" | "failed"
71    status: String,
72    warnings: Vec<String>,
73    #[serde(skip_serializing_if = "Option::is_none")]
74    error: Option<String>,
75}
76
77/// All config kinds that may appear in a package zip (excluding schemas, which are derived from manifest).
78const CONFIG_KINDS: &[&str] = &[
79    "schemas",
80    "enums",
81    "tables",
82    "columns",
83    "indexes",
84    "relationships",
85    "api_entities",
86    "kv_stores",
87];
88
89/// Dependencies for each config kind: these must be applied before this kind.
90/// Order: most atomic and independent first (schemas, then enums/tables, then columns, etc.).
91fn dependencies(kind: &str) -> &'static [&'static str] {
92    match kind {
93        "schemas" => &[],
94        "enums" => &["schemas"],
95        "tables" => &["schemas"],
96        "columns" => &["tables"],
97        "indexes" => &["schemas", "tables"],
98        "relationships" => &["schemas", "tables", "columns"],
99        "api_entities" => &["tables"],
100        "kv_stores" => &[],
101        _ => &[],
102    }
103}
104
105/// Topological sort of config kinds so that dependencies are applied first.
106/// Returns order to apply: most atomic and independent first.
107fn config_apply_order() -> Vec<&'static str> {
108    let mut order = Vec::with_capacity(CONFIG_KINDS.len());
109    let mut done: HashSet<&'static str> = HashSet::new();
110    while order.len() < CONFIG_KINDS.len() {
111        let mut made_progress = false;
112        for &kind in CONFIG_KINDS {
113            if done.contains(kind) {
114                continue;
115            }
116            let deps = dependencies(kind);
117            if deps.iter().all(|d| done.contains(d)) {
118                order.push(kind);
119                done.insert(kind);
120                made_progress = true;
121            }
122        }
123        if !made_progress {
124            break;
125        }
126    }
127    order
128}
129
130/// Schema id used when manifest provides the schema name (no separate schemas.json).
131const DEFAULT_SCHEMA_ID: &str = "default";
132
133/// Build a `FullConfig` from the in-memory, per-kind config bodies read from the zip — the same
134/// shape `load_from_pool` produces, but without a round trip through the architect DB. Lets the
135/// installer validate and apply DDL before persisting anything.
136fn assemble_config(bodies: &[(&'static str, Vec<Value>)]) -> Result<FullConfig, AppError> {
137    fn de<T: serde::de::DeserializeOwned>(
138        bodies: &[(&'static str, Vec<Value>)],
139        kind: &str,
140    ) -> Result<Vec<T>, AppError> {
141        let arr = bodies
142            .iter()
143            .find(|(k, _)| *k == kind)
144            .map(|(_, v)| v.clone())
145            .unwrap_or_default();
146        serde_json::from_value(Value::Array(arr))
147            .map_err(|e| AppError::BadRequest(format!("invalid {}: {}", kind, e)))
148    }
149    Ok(FullConfig {
150        schemas: de(bodies, "schemas")?,
151        enums: de(bodies, "enums")?,
152        tables: de(bodies, "tables")?,
153        columns: de(bodies, "columns")?,
154        indexes: de(bodies, "indexes")?,
155        relationships: de(bodies, "relationships")?,
156        api_entities: de(bodies, "api_entities")?,
157        kv_stores: de(bodies, "kv_stores")?,
158    })
159}
160
161fn inject_schema_id(body: &mut [Value], schema_id: &str) {
162    for rec in body.iter_mut() {
163        if let Some(obj) = rec.as_object_mut() {
164            if !obj.contains_key("schema_id") {
165                obj.insert("schema_id".into(), Value::String(schema_id.to_string()));
166            }
167        }
168    }
169}
170
171fn inject_relationship_schema_ids(body: &mut [Value], schema_id: &str) {
172    for rec in body.iter_mut() {
173        if let Some(obj) = rec.as_object_mut() {
174            // Always default from_schema_id to this package's schema.
175            if !obj.contains_key("from_schema_id") {
176                obj.insert(
177                    "from_schema_id".into(),
178                    Value::String(schema_id.to_string()),
179                );
180            }
181            // Only default to_schema_id when this is NOT a cross-package relationship.
182            // Cross-package rels carry their own to_schema_id resolved at migration time.
183            let is_cross_package = obj
184                .get("to_package_id")
185                .and_then(Value::as_str)
186                .map(|s| !s.is_empty())
187                .unwrap_or(false);
188            if !is_cross_package && !obj.contains_key("to_schema_id") {
189                obj.insert("to_schema_id".into(), Value::String(schema_id.to_string()));
190            }
191        }
192    }
193}
194
195fn read_zip_entry_to_string<R: std::io::Read + std::io::Seek>(
196    archive: &mut ZipArchive<R>,
197    name: &str,
198) -> Result<String, AppError> {
199    let mut f = archive
200        .by_name(name)
201        .map_err(|e| AppError::BadRequest(e.to_string()))?;
202    let mut s = String::new();
203    std::io::Read::read_to_string(&mut f, &mut s)
204        .map_err(|e| AppError::BadRequest(e.to_string()))?;
205    Ok(s)
206}
207
208/// Read all records for a config kind from a zip archive.
209/// Tries `{kind}.json` first (flat file), then scans `{kind}/*.json` (subdirectory),
210/// merging all arrays in alphabetical order. Returns an empty vec if neither exists.
211fn read_kind_from_zip<R: std::io::Read + std::io::Seek>(
212    archive: &mut ZipArchive<R>,
213    kind: &str,
214) -> Result<Vec<Value>, AppError> {
215    let flat = format!("{}.json", kind);
216    if let Ok(content) = read_zip_entry_to_string(archive, &flat) {
217        return serde_json::from_str(&content)
218            .map_err(|e| AppError::BadRequest(format!("invalid {}: {}", flat, e)));
219    }
220
221    let prefix = format!("{}/", kind);
222    let mut names: Vec<String> = archive
223        .file_names()
224        .filter(|n| n.starts_with(&prefix) && n.ends_with(".json"))
225        .map(String::from)
226        .collect();
227    names.sort();
228
229    let mut merged: Vec<Value> = Vec::new();
230    for name in names {
231        let content = read_zip_entry_to_string(archive, &name)?;
232        let mut items: Vec<Value> = serde_json::from_str(&content)
233            .map_err(|e| AppError::BadRequest(format!("invalid {}: {}", name, e)))?;
234        merged.append(&mut items);
235    }
236    Ok(merged)
237}
238
239// ─── DDL broadcast helpers ───────────────────────────────────────────────────
240
241/// Apply DDL for one target pool — either the full `apply_migrations` (fresh install) or
242/// `execute_migration_plan` (upgrade). Returns a single `TenantMigrationOutcome`.
243#[allow(clippy::too_many_arguments)]
244async fn apply_ddl_to_pool(
245    migration_pool: &Pool,
246    config_pool: &Pool,
247    config: &FullConfig,
248    plan: Option<&MigrationPlan>,
249    package_id: &str,
250    target: &str,
251    strategy: &str,
252    from_version: Option<&str>,
253    to_version: &str,
254    rls_tenant_column: Option<&str>,
255    dialect: &dyn Dialect,
256    cross_package_configs: &std::collections::HashMap<String, FullConfig>,
257) -> TenantMigrationOutcome {
258    match plan {
259        // Upgrade path: execute the pre-computed diff plan.
260        Some(p) => {
261            let migration_id = Uuid::new_v4().to_string();
262            match execute_migration_plan(
263                migration_pool,
264                config_pool,
265                p,
266                &migration_id,
267                package_id,
268                target,
269                from_version,
270                to_version,
271            )
272            .await
273            {
274                Ok(result) => TenantMigrationOutcome {
275                    target: target.to_string(),
276                    strategy: strategy.to_string(),
277                    status: if result.warned > 0 {
278                        "applied_with_warnings".to_string()
279                    } else {
280                        "applied".to_string()
281                    },
282                    warnings: result.warnings,
283                    error: None,
284                },
285                Err(e) => {
286                    tracing::warn!(target, strategy, error = %e, "DDL broadcast failed (upgrade)");
287                    TenantMigrationOutcome {
288                        target: target.to_string(),
289                        strategy: strategy.to_string(),
290                        status: "failed".to_string(),
291                        warnings: vec![],
292                        error: Some(e.to_string()),
293                    }
294                }
295            }
296        }
297        // Fresh install path: apply the full schema.
298        None => {
299            match apply_migrations(
300                migration_pool,
301                config,
302                None,
303                rls_tenant_column,
304                dialect,
305                cross_package_configs,
306            )
307            .await
308            {
309                Ok(()) => TenantMigrationOutcome {
310                    target: target.to_string(),
311                    strategy: strategy.to_string(),
312                    status: "applied".to_string(),
313                    warnings: vec![],
314                    error: None,
315                },
316                Err(e) => {
317                    tracing::warn!(target, strategy, error = %e, "DDL broadcast failed (fresh install)");
318                    TenantMigrationOutcome {
319                        target: target.to_string(),
320                        strategy: strategy.to_string(),
321                        status: "failed".to_string(),
322                        warnings: vec![],
323                        error: Some(e.to_string()),
324                    }
325                }
326            }
327        }
328    }
329}
330
331/// Apply DDL for a package to every registered tenant database.
332///
333/// Targets (in order):
334/// 1. Central architect DB — once, if any RLS tenants share it (no dedicated database_url).
335/// 2. RLS tenants with a dedicated database_url — per unique URL, with RLS column enabled.
336/// 3. Database-strategy tenants — per tenant, without RLS column.
337///
338/// Failures on individual targets are collected as outcomes and do NOT abort the broadcast;
339/// the `_sys_*` config has already been committed and must not be rolled back here.
340async fn broadcast_ddl(
341    state: &AppState,
342    config_pool: &Pool,
343    config: &FullConfig,
344    old_config: Option<&FullConfig>,
345    package_id: &str,
346    from_version: Option<&str>,
347    to_version: &str,
348) -> Vec<TenantMigrationOutcome> {
349    let mut outcomes = Vec::new();
350
351    // Load all other installed packages so cross-package FK resolution works.
352    let cross_package_configs: std::collections::HashMap<String, FullConfig> = {
353        match list_package_ids(config_pool).await {
354            Ok(ids) => {
355                let mut map = std::collections::HashMap::new();
356                for pid in ids {
357                    if pid == package_id {
358                        continue;
359                    }
360                    match load_from_pool(config_pool, &pid).await {
361                        Ok(cfg) => {
362                            map.insert(pid, cfg);
363                        }
364                        Err(e) => {
365                            tracing::warn!(pkg = %pid, error = %e, "could not load cross-package config for FK resolution")
366                        }
367                    }
368                }
369                map
370            }
371            Err(e) => {
372                tracing::warn!(error = %e, "could not list packages for cross-package FK resolution");
373                std::collections::HashMap::new()
374            }
375        }
376    };
377
378    // Compute the migration plan once for upgrades (pure function, no DB calls).
379    // `_rls_tenant_column` is intentionally ignored by compute_migration_plan, so
380    // the same plan is valid for both RLS and Database targets.
381    let plan: Option<MigrationPlan> = match old_config {
382        Some(old) => {
383            match compute_migration_plan(
384                old,
385                config,
386                None,
387                None,
388                state.dialect.as_ref(),
389                &cross_package_configs,
390            ) {
391                Ok(p) => Some(p),
392                Err(e) => {
393                    tracing::error!(error = %e, "could not compute migration plan for broadcast");
394                    return vec![TenantMigrationOutcome {
395                        target: "all".to_string(),
396                        strategy: "n/a".to_string(),
397                        status: "failed".to_string(),
398                        warnings: vec![],
399                        error: Some(format!("migration plan error: {}", e)),
400                    }];
401                }
402            }
403        }
404        None => None,
405    };
406
407    // ── 1. Central DB — covers all RLS tenants without a dedicated database_url ──
408    if state.tenant_registry.has_shared_rls_tenants() {
409        let outcome = apply_ddl_to_pool(
410            &state.pool,
411            config_pool,
412            config,
413            plan.as_ref(),
414            package_id,
415            "central_rls_db",
416            "rls",
417            from_version,
418            to_version,
419            Some(crate::migration::RLS_TENANT_COLUMN),
420            state.dialect.as_ref(),
421            &cross_package_configs,
422        )
423        .await;
424        outcomes.push(outcome);
425    }
426
427    // ── 2. RLS tenants with their own dedicated DB ──
428    // Deduplicate by URL — multiple RLS tenants may share the same DB.
429    let mut seen_rls_urls: HashSet<String> = HashSet::new();
430    for (tid, db_url) in state.tenant_registry.rls_dedicated_db_targets() {
431        if !seen_rls_urls.insert(db_url.clone()) {
432            continue; // already migrated this DB
433        }
434        let pool = match get_or_create_tenant_pool(state, &tid, &db_url).await {
435            Ok(p) => p,
436            Err(e) => {
437                tracing::warn!(target = %tid, error = %e, "could not connect to dedicated RLS tenant DB");
438                outcomes.push(TenantMigrationOutcome {
439                    target: tid.clone(),
440                    strategy: "rls".to_string(),
441                    status: "failed".to_string(),
442                    warnings: vec![],
443                    error: Some(format!("connection failed: {}", e)),
444                });
445                continue;
446            }
447        };
448        let outcome = apply_ddl_to_pool(
449            &pool,
450            config_pool,
451            config,
452            plan.as_ref(),
453            package_id,
454            &tid,
455            "rls",
456            from_version,
457            to_version,
458            Some(crate::migration::RLS_TENANT_COLUMN),
459            state.dialect.as_ref(),
460            &cross_package_configs,
461        )
462        .await;
463        outcomes.push(outcome);
464    }
465
466    // ── 3. Database-strategy tenants (each has their own DB, no RLS column) ──
467    for (tid, db_url) in state.tenant_registry.database_tenant_targets() {
468        let pool = match get_or_create_tenant_pool(state, &tid, &db_url).await {
469            Ok(p) => p,
470            Err(e) => {
471                tracing::warn!(target = %tid, error = %e, "could not connect to Database tenant DB");
472                outcomes.push(TenantMigrationOutcome {
473                    target: tid.clone(),
474                    strategy: "database".to_string(),
475                    status: "failed".to_string(),
476                    warnings: vec![],
477                    error: Some(format!("connection failed: {}", e)),
478                });
479                continue;
480            }
481        };
482        let outcome = apply_ddl_to_pool(
483            &pool,
484            config_pool,
485            config,
486            plan.as_ref(),
487            package_id,
488            &tid,
489            "database",
490            from_version,
491            to_version,
492            None,
493            state.dialect.as_ref(),
494            &cross_package_configs,
495        )
496        .await;
497        outcomes.push(outcome);
498    }
499
500    outcomes
501}
502
503// ─────────────────────────────────────────────────────────────────────────────
504
505/// POST /api/v1/config/package: multipart form with file field containing a zip (manifest.json + config JSONs). X-Tenant-ID required.
506pub async fn install_package(
507    TenantId(tenant_id_opt): TenantId,
508    State(state): State<AppState>,
509    mut multipart: Multipart,
510) -> Result<impl axum::response::IntoResponse, AppError> {
511    let tenant_id = tenant_id_opt
512        .as_deref()
513        .filter(|s| !s.is_empty())
514        .ok_or_else(|| AppError::BadRequest("X-Tenant-ID header is required".into()))?;
515    state
516        .tenant_registry
517        .get(tenant_id)
518        .ok_or_else(|| AppError::NotFound(format!("tenant not found: {}", tenant_id)))?;
519
520    let mut zip_bytes: Option<Vec<u8>> = None;
521    while let Ok(Some(field)) = multipart.next_field().await {
522        let name = field.name().unwrap_or("").to_string();
523        if name == "file" || name == "package" {
524            let data = field
525                .bytes()
526                .await
527                .map_err(|e| AppError::BadRequest(e.to_string()))?;
528            zip_bytes = Some(data.to_vec());
529            break;
530        }
531    }
532    let zip_bytes = zip_bytes.ok_or_else(|| {
533        AppError::BadRequest("missing 'file' or 'package' field in multipart body".into())
534    })?;
535
536    let mut archive = ZipArchive::new(Cursor::new(zip_bytes))
537        .map_err(|e| AppError::BadRequest(format!("invalid zip: {}", e)))?;
538
539    let manifest_name = archive
540        .file_names()
541        .find(|n| *n == "manifest.json" || n.ends_with("/manifest.json"))
542        .map(String::from)
543        .ok_or_else(|| AppError::BadRequest("zip must contain manifest.json at root".into()))?;
544
545    let manifest_value: Value = {
546        let mut file = archive
547            .by_name(&manifest_name)
548            .map_err(|e| AppError::BadRequest(e.to_string()))?;
549        let mut buf = String::new();
550        std::io::Read::read_to_string(&mut file, &mut buf)
551            .map_err(|e| AppError::BadRequest(e.to_string()))?;
552        serde_json::from_str(&buf)
553            .map_err(|e| AppError::BadRequest(format!("invalid manifest.json: {}", e)))?
554    };
555
556    let manifest_obj = manifest_value
557        .as_object()
558        .ok_or_else(|| AppError::BadRequest("manifest.json must be an object".into()))?;
559    let id = manifest_obj
560        .get("id")
561        .and_then(Value::as_str)
562        .ok_or_else(|| AppError::BadRequest("manifest must have 'id' (string)".into()))?;
563    let _name = manifest_obj
564        .get("name")
565        .and_then(Value::as_str)
566        .ok_or_else(|| AppError::BadRequest("manifest must have 'name' (string)".into()))?;
567    let _version = manifest_obj
568        .get("version")
569        .and_then(Value::as_str)
570        .ok_or_else(|| AppError::BadRequest("manifest must have 'version' (string)".into()))?;
571    let schema_name = manifest_obj
572        .get("schema")
573        .and_then(Value::as_str)
574        .ok_or_else(|| {
575            AppError::BadRequest(
576                "manifest must have 'schema' (string) - the schema name for all configs".into(),
577            )
578        })?;
579
580    let ctx = resolve_tenant_context(&state, Some(tenant_id), Some(id)).await?;
581    let config_pool = ctx.config_pool();
582    // migration_pool and schema_override are no longer used directly — broadcast_ddl handles all targets.
583    let package_cache_key = ctx.package_cache_key().to_string();
584
585    // Check that all declared dependency packages are already installed.
586    if let Some(deps) = manifest_obj.get("dependencies").and_then(Value::as_array) {
587        let installed_ids: std::collections::HashSet<String> =
588            list_package_ids(config_pool).await?.into_iter().collect();
589        let missing: Vec<&str> = deps
590            .iter()
591            .filter_map(Value::as_str)
592            .filter(|dep| !installed_ids.contains(*dep))
593            .collect();
594        if !missing.is_empty() {
595            return Err(AppError::BadRequest(format!(
596                "package '{}' depends on [{}] which are not installed; install them first",
597                id,
598                missing.join(", ")
599            )));
600        }
601    }
602
603    let incoming_version = manifest_obj
604        .get("version")
605        .and_then(Value::as_str)
606        .unwrap_or("");
607
608    // For upgrades: load old config BEFORE replacing so we can diff
609    let is_upgrade = if let Some(existing) = get_package(config_pool, id).await? {
610        if existing.semantic_version.as_deref() == Some(incoming_version) {
611            return Err(AppError::Conflict(format!(
612                "package '{}' version '{}' is already installed",
613                id, incoming_version
614            )));
615        }
616        true
617    } else {
618        false
619    };
620
621    let old_config = if is_upgrade {
622        Some(
623            load_from_pool(config_pool, id)
624                .await
625                .map_err(AppError::Config)?,
626        )
627    } else {
628        None
629    };
630
631    let schemas_body: Vec<Value> = vec![serde_json::json!({
632        "id": DEFAULT_SCHEMA_ID,
633        "name": schema_name
634    })];
635
636    // Read and normalize every config kind from the zip into memory FIRST. Nothing is written to
637    // the architect DB or any tenant DB until the whole package validates and its DDL succeeds, so
638    // a bad package can never leave behind orphan `_sys_*` rows or half-created tables.
639    let apply_order = config_apply_order();
640    let mut bodies: Vec<(&'static str, Vec<Value>)> = Vec::with_capacity(apply_order.len());
641    for kind in &apply_order {
642        let body: Vec<Value> = if *kind == "schemas" {
643            schemas_body.clone()
644        } else {
645            let mut body = read_kind_from_zip(&mut archive, kind)?;
646            match *kind {
647                "enums" | "tables" | "indexes" => inject_schema_id(&mut body, DEFAULT_SCHEMA_ID),
648                "relationships" => inject_relationship_schema_ids(&mut body, DEFAULT_SCHEMA_ID),
649                _ => {}
650            }
651            body
652        };
653        bodies.push((*kind, body));
654    }
655
656    // Assemble and fully validate the config in memory (schema/type/reference checks, including
657    // enum schema-prefix typos) before touching any database.
658    let config = assemble_config(&bodies)?;
659    let new_model = resolve(&config)
660        .map_err(AppError::Config)?
661        .with_package_id(id);
662
663    // Reject the install if the package contains asset columns but no storage is configured.
664    reject_asset_columns_without_storage(&config, &state.storage)?;
665
666    // Broadcast DDL to every registered tenant database.
667    // For a fresh install old_config is None (apply_migrations). For an upgrade it is Some (compute_migration_plan + execute).
668    let tenant_outcomes = broadcast_ddl(
669        &state,
670        config_pool,
671        &config,
672        old_config.as_ref(),
673        id,
674        old_config
675            .as_ref()
676            .and_then(|_| manifest_value.get("version").and_then(Value::as_str)),
677        incoming_version,
678    )
679    .await;
680
681    // For a fresh install, abort if schema creation failed on any tenant — do NOT persist config,
682    // so the package is never recorded as installed when its tables do not exist. (Upgrades keep
683    // the prior best-effort behavior: the old version is already live and recorded.)
684    if !is_upgrade {
685        let failures: Vec<String> = tenant_outcomes
686            .iter()
687            .filter(|o| o.status == "failed")
688            .map(|o| format!("{}: {}", o.target, o.error.clone().unwrap_or_default()))
689            .collect();
690        if !failures.is_empty() {
691            return Err(AppError::BadRequest(format!(
692                "package '{}' installation failed during schema creation; no configuration was \
693                 saved. Errors: {}",
694                id,
695                failures.join("; ")
696            )));
697        }
698    }
699
700    // Schema creation succeeded — now persist the config to the architect DB.
701    let mut applied = Vec::with_capacity(bodies.len());
702    for (kind, body) in bodies {
703        replace_config(config_pool, kind, body, false, id, None).await?;
704        applied.push(kind.to_string());
705    }
706    upsert_package(config_pool, id, &manifest_value).await?;
707
708    let migration_warnings: Vec<String> = tenant_outcomes
709        .iter()
710        .flat_map(|o| o.warnings.iter().cloned())
711        .collect();
712
713    // Populate the in-memory ResolvedModel for every tenant cache slot.
714    {
715        let mut model_guard = state
716            .model
717            .write()
718            .map_err(|_| AppError::BadRequest("state lock".into()))?;
719        *model_guard = new_model.clone();
720        let mut pkg_guard = state
721            .package_models
722            .write()
723            .map_err(|_| AppError::BadRequest("state lock".into()))?;
724        // Shared key used by all RLS tenants.
725        pkg_guard.insert(id.to_string(), new_model.clone());
726        // Per-tenant keys used by each Database-strategy tenant.
727        for (tid, _) in state.tenant_registry.database_tenant_targets() {
728            pkg_guard.insert(format!("{}:{}", id, tid), new_model.clone());
729        }
730        // Keep the requesting tenant's own cache slot in sync (covers edge cases).
731        pkg_guard.insert(package_cache_key, new_model);
732    }
733
734    #[derive(serde::Serialize)]
735    struct PackageInstallResponse {
736        package: Value,
737        applied: Vec<String>,
738        warnings: Vec<String>,
739        /// DDL execution result for each tenant database that was targeted.
740        tenant_migrations: Vec<TenantMigrationOutcome>,
741    }
742    Ok((
743        axum::http::StatusCode::OK,
744        Json(crate::response::SuccessOne {
745            data: PackageInstallResponse {
746                package: manifest_value,
747                applied,
748                warnings: migration_warnings,
749                tenant_migrations: tenant_outcomes,
750            },
751            meta: None,
752        }),
753    ))
754}
755
756#[derive(Deserialize)]
757pub struct UninstallPath {
758    pub package_id: String,
759}
760
761/// DELETE /api/v1/config/package/:package_id — uninstall package: revert migrations in tenant DB, delete all _sys_* config and KV data, remove package record. X-Tenant-ID required.
762pub async fn uninstall_package(
763    TenantId(tenant_id_opt): TenantId,
764    State(state): State<AppState>,
765    Path(UninstallPath { package_id }): Path<UninstallPath>,
766) -> Result<impl axum::response::IntoResponse, AppError> {
767    let tenant_id = tenant_id_opt
768        .as_deref()
769        .filter(|s| !s.is_empty())
770        .ok_or_else(|| AppError::BadRequest("X-Tenant-ID header is required".into()))?;
771
772    let ctx = resolve_tenant_context(&state, Some(tenant_id), Some(&package_id)).await?;
773    let config_pool = ctx.config_pool();
774    let migration_pool = ctx.migration_pool();
775    let schema_override = ctx.schema_override();
776    let package_cache_key = ctx.package_cache_key().to_string();
777
778    let installed = list_package_ids(config_pool).await?;
779    if !installed.contains(&package_id) {
780        return Err(AppError::NotFound(format!(
781            "package not found: {}",
782            package_id
783        )));
784    }
785
786    // Block uninstall if another installed package declares this one as a dependency.
787    let all_packages = list_packages(config_pool).await?;
788    let dependents: Vec<String> = all_packages
789        .iter()
790        .filter(|row| row.id != package_id)
791        .filter(|row| {
792            row.payload
793                .get("dependencies")
794                .and_then(Value::as_array)
795                .map(|deps| deps.iter().any(|d| d.as_str() == Some(package_id.as_str())))
796                .unwrap_or(false)
797        })
798        .map(|row| row.id.clone())
799        .collect();
800    if !dependents.is_empty() {
801        return Err(AppError::Conflict(format!(
802            "cannot uninstall '{}': packages [{}] depend on it; uninstall them first",
803            package_id,
804            dependents.join(", ")
805        )));
806    }
807
808    let config = load_from_pool(config_pool, &package_id)
809        .await
810        .map_err(AppError::Config)?;
811    revert_migrations(migration_pool, &config, schema_override).await?;
812    delete_package_and_config(config_pool, &package_id).await?;
813
814    {
815        state
816            .package_models
817            .write()
818            .map_err(|_| AppError::BadRequest("state lock".into()))?
819            .remove(&package_cache_key);
820    }
821
822    // Reload default model when uninstall was on the central DB so in-memory state stays in sync (no process restart needed).
823    if std::ptr::eq(&state.pool as *const _, config_pool as *const _) {
824        let _ = reload_model(&state).await;
825    }
826
827    #[derive(serde::Serialize)]
828    struct UninstallResponse {
829        package_id: String,
830    }
831    Ok((
832        axum::http::StatusCode::OK,
833        Json(crate::response::SuccessOne {
834            data: UninstallResponse { package_id },
835            meta: None,
836        }),
837    ))
838}
839
840/// Build the stats + full config payload for a package by fetching all 8 config kinds in parallel.
841async fn package_detail_data(
842    pool: &Pool,
843    package_id: &str,
844) -> Result<Value, crate::error::AppError> {
845    use crate::handlers::config::get_config;
846
847    let (schemas, enums, tables, columns, indexes, relationships, api_entities, kv_stores) = tokio::try_join!(
848        get_config(pool, "schemas", package_id),
849        get_config(pool, "enums", package_id),
850        get_config(pool, "tables", package_id),
851        get_config(pool, "columns", package_id),
852        get_config(pool, "indexes", package_id),
853        get_config(pool, "relationships", package_id),
854        get_config(pool, "api_entities", package_id),
855        get_config(pool, "kv_stores", package_id),
856    )?;
857
858    Ok(json!({
859        "stats": {
860            "schemas": schemas.len(),
861            "enums": enums.len(),
862            "tables": tables.len(),
863            "columns": columns.len(),
864            "indexes": indexes.len(),
865            "relationships": relationships.len(),
866            "apiEntities": api_entities.len(),
867            "kvStores": kv_stores.len(),
868        },
869        "schemas": schemas,
870        "enums": enums,
871        "tables": tables,
872        "columns": columns,
873        "indexes": indexes,
874        "relationships": relationships,
875        "apiEntities": api_entities,
876        "kvStores": kv_stores,
877    }))
878}
879
880/// GET /api/v1/config/packages — list all installed packages with manifest info and per-kind counts.
881pub async fn list_packages_handler(
882    State(state): State<AppState>,
883) -> Result<impl axum::response::IntoResponse, crate::error::AppError> {
884    let packages = list_packages(&state.pool).await?;
885
886    let mut items: Vec<Value> = Vec::with_capacity(packages.len());
887    for pkg in packages {
888        let (schemas, enums, tables, columns, indexes, relationships, api_entities, kv_stores) = tokio::try_join!(
889            count_package_kind(&state.pool, "schemas", &pkg.id),
890            count_package_kind(&state.pool, "enums", &pkg.id),
891            count_package_kind(&state.pool, "tables", &pkg.id),
892            count_package_kind(&state.pool, "columns", &pkg.id),
893            count_package_kind(&state.pool, "indexes", &pkg.id),
894            count_package_kind(&state.pool, "relationships", &pkg.id),
895            count_package_kind(&state.pool, "api_entities", &pkg.id),
896            count_package_kind(&state.pool, "kv_stores", &pkg.id),
897        )?;
898
899        let name = pkg
900            .payload
901            .get("name")
902            .and_then(Value::as_str)
903            .map(String::from);
904        let version = pkg
905            .payload
906            .get("version")
907            .and_then(Value::as_str)
908            .map(String::from);
909        let schema = pkg
910            .payload
911            .get("schema")
912            .and_then(Value::as_str)
913            .map(String::from);
914        let dependencies: Vec<&str> = pkg
915            .payload
916            .get("dependencies")
917            .and_then(Value::as_array)
918            .map(|arr| arr.iter().filter_map(Value::as_str).collect())
919            .unwrap_or_default();
920
921        items.push(json!({
922            "id": pkg.id,
923            "name": name,
924            "version": version,
925            "schema": schema,
926            "installedVersion": pkg.version,
927            "updatedAt": pkg.updated_at,
928            "dependencies": dependencies,
929            "stats": {
930                "schemas": schemas,
931                "enums": enums,
932                "tables": tables,
933                "columns": columns,
934                "indexes": indexes,
935                "relationships": relationships,
936                "apiEntities": api_entities,
937                "kvStores": kv_stores,
938            },
939        }));
940    }
941
942    let count = items.len() as u64;
943    Ok((
944        axum::http::StatusCode::OK,
945        Json(crate::response::SuccessMany {
946            data: items,
947            meta: crate::response::MetaCount { count },
948        }),
949    ))
950}
951
952#[derive(Deserialize)]
953pub struct PackageIdPath {
954    pub package_id: String,
955}
956
957/// GET /api/v1/config/packages/:package_id — full details of one installed package including all config objects.
958pub async fn get_package_handler(
959    State(state): State<AppState>,
960    Path(PackageIdPath { package_id }): Path<PackageIdPath>,
961) -> Result<impl axum::response::IntoResponse, crate::error::AppError> {
962    let pkg = get_package(&state.pool, &package_id)
963        .await?
964        .ok_or_else(|| {
965            crate::error::AppError::NotFound(format!("package not found: {}", package_id))
966        })?;
967
968    let name = pkg
969        .payload
970        .get("name")
971        .and_then(Value::as_str)
972        .map(String::from);
973    let version = pkg
974        .payload
975        .get("version")
976        .and_then(Value::as_str)
977        .map(String::from);
978    let schema = pkg
979        .payload
980        .get("schema")
981        .and_then(Value::as_str)
982        .map(String::from);
983
984    let mut detail = package_detail_data(&state.pool, &package_id).await?;
985    let obj = detail.as_object_mut().unwrap();
986    obj.insert("id".into(), json!(pkg.id));
987    obj.insert("name".into(), json!(name));
988    obj.insert("version".into(), json!(version));
989    obj.insert("schema".into(), json!(schema));
990    obj.insert("installedVersion".into(), json!(pkg.version));
991    obj.insert("updatedAt".into(), json!(pkg.updated_at));
992    obj.insert("manifest".into(), pkg.payload);
993
994    Ok((
995        axum::http::StatusCode::OK,
996        Json(crate::response::SuccessOne {
997            data: detail,
998            meta: None,
999        }),
1000    ))
1001}
1002
1003// ─── Migration preview / apply ───────────────────────────────────────────────
1004
1005/// POST /api/v1/config/package/migration/preview
1006/// Upload a package zip to preview the migration plan without applying any changes.
1007/// The returned `migration_id` can be passed to the apply endpoint after review.
1008/// X-Tenant-ID required. Only valid for upgrades (package must already be installed).
1009pub async fn preview_migration_handler(
1010    TenantId(tenant_id_opt): TenantId,
1011    State(state): State<AppState>,
1012    mut multipart: Multipart,
1013) -> Result<impl axum::response::IntoResponse, AppError> {
1014    let tenant_id = tenant_id_opt
1015        .as_deref()
1016        .filter(|s| !s.is_empty())
1017        .ok_or_else(|| AppError::BadRequest("X-Tenant-ID header is required".into()))?;
1018
1019    let mut zip_bytes_raw: Option<Vec<u8>> = None;
1020    while let Ok(Some(field)) = multipart.next_field().await {
1021        let name = field.name().unwrap_or("").to_string();
1022        if name == "file" || name == "package" {
1023            let data = field
1024                .bytes()
1025                .await
1026                .map_err(|e| AppError::BadRequest(e.to_string()))?;
1027            zip_bytes_raw = Some(data.to_vec());
1028            break;
1029        }
1030    }
1031    let zip_bytes = zip_bytes_raw
1032        .ok_or_else(|| AppError::BadRequest("missing 'file' or 'package' field".into()))?;
1033
1034    let mut archive = ZipArchive::new(Cursor::new(zip_bytes.clone()))
1035        .map_err(|e| AppError::BadRequest(format!("invalid zip: {}", e)))?;
1036
1037    let manifest_name = archive
1038        .file_names()
1039        .find(|n| *n == "manifest.json" || n.ends_with("/manifest.json"))
1040        .map(String::from)
1041        .ok_or_else(|| AppError::BadRequest("zip must contain manifest.json".into()))?;
1042
1043    let manifest_value: Value = {
1044        let mut file = archive
1045            .by_name(&manifest_name)
1046            .map_err(|e| AppError::BadRequest(e.to_string()))?;
1047        let mut buf = String::new();
1048        std::io::Read::read_to_string(&mut file, &mut buf)
1049            .map_err(|e| AppError::BadRequest(e.to_string()))?;
1050        serde_json::from_str(&buf)
1051            .map_err(|e| AppError::BadRequest(format!("invalid manifest.json: {}", e)))?
1052    };
1053    let manifest_obj = manifest_value
1054        .as_object()
1055        .ok_or_else(|| AppError::BadRequest("manifest.json must be an object".into()))?;
1056
1057    let id = manifest_obj
1058        .get("id")
1059        .and_then(Value::as_str)
1060        .ok_or_else(|| AppError::BadRequest("manifest must have 'id'".into()))?;
1061    let incoming_version = manifest_obj
1062        .get("version")
1063        .and_then(Value::as_str)
1064        .unwrap_or("");
1065    let schema_name = manifest_obj
1066        .get("schema")
1067        .and_then(Value::as_str)
1068        .ok_or_else(|| AppError::BadRequest("manifest must have 'schema'".into()))?;
1069
1070    let existing = get_package(&state.pool, id).await?.ok_or_else(|| {
1071        AppError::NotFound(format!(
1072            "package '{}' is not installed — preview is only for upgrades",
1073            id
1074        ))
1075    })?;
1076
1077    if existing.semantic_version.as_deref() == Some(incoming_version) {
1078        return Err(AppError::Conflict(format!(
1079            "package '{}' version '{}' is already installed",
1080            id, incoming_version
1081        )));
1082    }
1083
1084    let from_version = existing.semantic_version.clone();
1085    let ctx = resolve_tenant_context(&state, Some(tenant_id), Some(id)).await?;
1086    let config_pool = ctx.config_pool();
1087
1088    let old_config = load_from_pool(config_pool, id)
1089        .await
1090        .map_err(AppError::Config)?;
1091
1092    // Build new FullConfig from the zip (same logic as install_package, without writing to DB)
1093    let schemas_body = vec![serde_json::json!({ "id": DEFAULT_SCHEMA_ID, "name": schema_name })];
1094    let config_kinds = [
1095        "schemas",
1096        "enums",
1097        "tables",
1098        "columns",
1099        "indexes",
1100        "relationships",
1101        "api_entities",
1102        "kv_stores",
1103    ];
1104    let mut all_values: std::collections::HashMap<String, Vec<Value>> =
1105        std::collections::HashMap::new();
1106    for kind in &config_kinds {
1107        let body: Vec<Value> = if *kind == "schemas" {
1108            serde_json::from_value(Value::Array(schemas_body.clone())).unwrap_or_default()
1109        } else {
1110            let mut body = read_kind_from_zip(&mut archive, kind).unwrap_or_default();
1111            match *kind {
1112                "enums" | "tables" | "indexes" => inject_schema_id(&mut body, DEFAULT_SCHEMA_ID),
1113                "relationships" => inject_relationship_schema_ids(&mut body, DEFAULT_SCHEMA_ID),
1114                _ => {}
1115            }
1116            body
1117        };
1118        all_values.insert(kind.to_string(), body);
1119    }
1120
1121    // Deserialize into FullConfig manually using the same logic as load_from_pool
1122    let new_config = build_full_config_from_values(&all_values)?;
1123
1124    let plan = compute_migration_plan(
1125        &old_config,
1126        &new_config,
1127        ctx.schema_override(),
1128        ctx.rls_tenant_column(),
1129        state.dialect.as_ref(),
1130        &std::collections::HashMap::new(),
1131    )
1132    .map_err(|e| AppError::BadRequest(format!("migration plan error: {}", e)))?;
1133
1134    let summary = plan.summary();
1135    let plan_json = serde_json::to_value(&plan).map_err(|e| AppError::BadRequest(e.to_string()))?;
1136    let migration_id = Uuid::new_v4().to_string();
1137
1138    save_migration_plan(
1139        config_pool,
1140        &migration_id,
1141        id,
1142        tenant_id,
1143        from_version.as_deref(),
1144        incoming_version,
1145        &plan_json,
1146        &zip_bytes,
1147    )
1148    .await?;
1149
1150    Ok((
1151        axum::http::StatusCode::OK,
1152        Json(crate::response::SuccessOne {
1153            data: json!({
1154                "migration_id": migration_id,
1155                "package_id": id,
1156                "from_version": from_version,
1157                "to_version": incoming_version,
1158                "expires_in_hours": 24,
1159                "summary": {
1160                    "total": summary.total,
1161                    "safe": summary.safe,
1162                    "best_effort": summary.best_effort,
1163                    "warn_only": summary.warn_only,
1164                },
1165                "steps": plan.steps,
1166            }),
1167            meta: None,
1168        }),
1169    ))
1170}
1171
1172#[derive(Deserialize)]
1173pub struct MigrationIdPath {
1174    pub migration_id: String,
1175}
1176
1177/// POST /api/v1/config/package/migration/apply/:migration_id
1178/// Apply a previously previewed migration plan. Idempotent: calling twice returns 409.
1179/// Applies config changes to _sys_* tables, executes DDL against the tenant DB, and writes audit records.
1180/// X-Tenant-ID required.
1181pub async fn apply_migration_handler(
1182    TenantId(tenant_id_opt): TenantId,
1183    State(state): State<AppState>,
1184    Path(MigrationIdPath { migration_id }): Path<MigrationIdPath>,
1185) -> Result<impl axum::response::IntoResponse, AppError> {
1186    let tenant_id = tenant_id_opt
1187        .as_deref()
1188        .filter(|s| !s.is_empty())
1189        .ok_or_else(|| AppError::BadRequest("X-Tenant-ID header is required".into()))?;
1190
1191    let row = get_migration_plan(&state.pool, &migration_id)
1192        .await?
1193        .ok_or_else(|| {
1194            AppError::NotFound(format!("migration plan '{}' not found", migration_id))
1195        })?;
1196
1197    if row.status == "applied" {
1198        return Err(AppError::Conflict(format!(
1199            "migration plan '{}' has already been applied",
1200            migration_id
1201        )));
1202    }
1203    if row.status != "pending" {
1204        return Err(AppError::BadRequest(format!(
1205            "migration plan '{}' has status '{}' and cannot be applied",
1206            migration_id, row.status
1207        )));
1208    }
1209
1210    let now = chrono::Utc::now();
1211    if now > row.expires_at {
1212        return Err(AppError::BadRequest(format!(
1213            "migration plan '{}' expired at {} — re-run preview to generate a new plan",
1214            migration_id, row.expires_at
1215        )));
1216    }
1217
1218    if row.tenant_id != tenant_id {
1219        return Err(AppError::BadRequest(format!(
1220            "migration plan '{}' was created for tenant '{}', not '{}'",
1221            migration_id, row.tenant_id, tenant_id
1222        )));
1223    }
1224
1225    let plan: MigrationPlan = serde_json::from_value(row.plan_json.clone())
1226        .map_err(|e| AppError::BadRequest(format!("corrupted migration plan: {}", e)))?;
1227
1228    let ctx = resolve_tenant_context(&state, Some(tenant_id), Some(&row.package_id)).await?;
1229    let config_pool = ctx.config_pool();
1230    let migration_pool = ctx.migration_pool();
1231    let package_cache_key = ctx.package_cache_key().to_string();
1232
1233    // Re-apply configs from the stored zip bytes
1234    let mut archive = ZipArchive::new(Cursor::new(row.zip_bytes.clone()))
1235        .map_err(|e| AppError::BadRequest(format!("stored zip corrupted: {}", e)))?;
1236
1237    let manifest_name = archive
1238        .file_names()
1239        .find(|n| *n == "manifest.json" || n.ends_with("/manifest.json"))
1240        .map(String::from)
1241        .ok_or_else(|| AppError::BadRequest("stored zip missing manifest.json".into()))?;
1242
1243    let manifest_value: Value = {
1244        let mut file = archive
1245            .by_name(&manifest_name)
1246            .map_err(|e| AppError::BadRequest(e.to_string()))?;
1247        let mut buf = String::new();
1248        std::io::Read::read_to_string(&mut file, &mut buf)
1249            .map_err(|e| AppError::BadRequest(e.to_string()))?;
1250        serde_json::from_str(&buf)
1251            .map_err(|e| AppError::BadRequest(format!("invalid manifest: {}", e)))?
1252    };
1253    let schema_name = manifest_value
1254        .get("schema")
1255        .and_then(Value::as_str)
1256        .ok_or_else(|| AppError::BadRequest("manifest missing 'schema'".into()))?;
1257
1258    let schemas_body = vec![serde_json::json!({ "id": DEFAULT_SCHEMA_ID, "name": schema_name })];
1259    let apply_order = config_apply_order();
1260    for kind in &apply_order {
1261        let body: Vec<Value> = if *kind == "schemas" {
1262            serde_json::from_value(Value::Array(schemas_body.clone()))
1263                .map_err(|e| AppError::BadRequest(format!("schemas body: {}", e)))?
1264        } else {
1265            let mut body = read_kind_from_zip(&mut archive, kind)?;
1266            match *kind {
1267                "enums" | "tables" | "indexes" => inject_schema_id(&mut body, DEFAULT_SCHEMA_ID),
1268                "relationships" => inject_relationship_schema_ids(&mut body, DEFAULT_SCHEMA_ID),
1269                _ => {}
1270            }
1271            body
1272        };
1273        replace_config(config_pool, kind, body, false, &row.package_id, None).await?;
1274    }
1275    upsert_package(config_pool, &row.package_id, &manifest_value).await?;
1276
1277    // Atomically mark plan as applied (prevents double-apply under concurrent requests)
1278    let claimed = mark_migration_plan_applied(config_pool, &migration_id).await?;
1279    if !claimed {
1280        return Err(AppError::Conflict(format!(
1281            "migration plan '{}' was applied by a concurrent request",
1282            migration_id
1283        )));
1284    }
1285
1286    // Execute the DDL plan with audit
1287    let result = execute_migration_plan(
1288        migration_pool,
1289        config_pool,
1290        &plan,
1291        &migration_id,
1292        &row.package_id,
1293        tenant_id,
1294        row.from_version.as_deref(),
1295        &row.to_version,
1296    )
1297    .await?;
1298
1299    // Reload in-memory model
1300    let new_config = load_from_pool(config_pool, &row.package_id)
1301        .await
1302        .map_err(AppError::Config)?;
1303    let new_model = resolve(&new_config)
1304        .map_err(AppError::Config)?
1305        .with_package_id(&row.package_id);
1306    {
1307        let mut guard = state
1308            .model
1309            .write()
1310            .map_err(|_| AppError::BadRequest("state lock".into()))?;
1311        *guard = new_model.clone();
1312        state
1313            .package_models
1314            .write()
1315            .map_err(|_| AppError::BadRequest("state lock".into()))?
1316            .insert(package_cache_key, new_model);
1317    }
1318
1319    Ok((
1320        axum::http::StatusCode::OK,
1321        Json(crate::response::SuccessOne {
1322            data: json!({
1323                "migration_id": migration_id,
1324                "package_id": row.package_id,
1325                "from_version": row.from_version,
1326                "to_version": row.to_version,
1327                "steps_applied": result.applied,
1328                "steps_warned": result.warned,
1329                "warnings": result.warnings,
1330            }),
1331            meta: None,
1332        }),
1333    ))
1334}
1335
1336// ─── Bootstrap ───────────────────────────────────────────────────────────────
1337
1338/// POST /api/v1/config/package/:package_id/bootstrap
1339///
1340/// Initialises a **new** Database-strategy tenant's database using the currently installed
1341/// package schema. Use this after adding a new tenant to `_sys_tenants` when the package is
1342/// already installed (calling `install_package` would return 409).
1343///
1344/// - Does NOT touch `_sys_*` tables or `_sys_packages` — config is unchanged.
1345/// - Calls `apply_migrations` which is idempotent (IF NOT EXISTS guards on tables/schemas/indexes).
1346/// - Returns 400 for RLS tenants: they share the central DB which already has the schema.
1347/// - X-Tenant-ID header must identify the new tenant to bootstrap.
1348pub async fn bootstrap_tenant_handler(
1349    TenantId(tenant_id_opt): TenantId,
1350    State(state): State<AppState>,
1351    Path(PackageIdPath { package_id }): Path<PackageIdPath>,
1352) -> Result<impl axum::response::IntoResponse, AppError> {
1353    let tenant_id = tenant_id_opt
1354        .as_deref()
1355        .filter(|s| !s.is_empty())
1356        .ok_or_else(|| AppError::BadRequest("X-Tenant-ID header is required".into()))?;
1357
1358    let entry = state
1359        .tenant_registry
1360        .get(tenant_id)
1361        .ok_or_else(|| AppError::NotFound(format!("tenant not found: {}", tenant_id)))?;
1362
1363    // Bootstrap is only needed for Database-strategy tenants.
1364    // RLS tenants share the central DB — their tables are created by a normal install.
1365    if !matches!(entry.strategy, TenantStrategy::Database) {
1366        return Err(AppError::BadRequest(
1367            "bootstrap only applies to Database-strategy tenants; RLS tenants share the central DB which is migrated by install_package".into(),
1368        ));
1369    }
1370
1371    let database_url = entry.database_url.as_deref().ok_or_else(|| {
1372        AppError::BadRequest(format!("tenant {}: missing database_url", tenant_id))
1373    })?;
1374
1375    // Package must already be installed in _sys_*.
1376    let _ = get_package(&state.pool, &package_id)
1377        .await?
1378        .ok_or_else(|| AppError::NotFound(format!("package '{}' is not installed", package_id)))?;
1379
1380    let config = load_from_pool(&state.pool, &package_id)
1381        .await
1382        .map_err(AppError::Config)?;
1383
1384    let pool = get_or_create_tenant_pool(&state, tenant_id, database_url).await?;
1385
1386    // apply_migrations is idempotent: safe on both an empty DB and an already-migrated one.
1387    apply_migrations(
1388        &pool,
1389        &config,
1390        None,
1391        None,
1392        state.dialect.as_ref(),
1393        &std::collections::HashMap::new(),
1394    )
1395    .await?;
1396
1397    // Populate the model cache for this tenant so entity routes resolve without a reload.
1398    let model = crate::config::resolve(&config)
1399        .map_err(AppError::Config)?
1400        .with_package_id(&package_id);
1401    {
1402        state
1403            .package_models
1404            .write()
1405            .map_err(|_| AppError::BadRequest("state lock".into()))?
1406            .insert(format!("{}:{}", package_id, tenant_id), model);
1407    }
1408
1409    Ok((
1410        axum::http::StatusCode::OK,
1411        Json(crate::response::SuccessOne {
1412            data: serde_json::json!({
1413                "tenant_id": tenant_id,
1414                "package_id": package_id,
1415                "status": "bootstrapped",
1416            }),
1417            meta: None,
1418        }),
1419    ))
1420}
1421
1422// ─────────────────────────────────────────────────────────────────────────────
1423
1424/// Build a FullConfig from pre-parsed per-kind value maps (used in preview, without touching the DB).
1425fn build_full_config_from_values(
1426    values: &std::collections::HashMap<String, Vec<Value>>,
1427) -> Result<crate::config::FullConfig, AppError> {
1428    fn parse_kind<T: serde::de::DeserializeOwned>(
1429        values: &std::collections::HashMap<String, Vec<Value>>,
1430        key: &str,
1431    ) -> Result<Vec<T>, AppError> {
1432        let arr = values.get(key).cloned().unwrap_or_default();
1433        arr.into_iter()
1434            .map(|v| {
1435                serde_json::from_value(v)
1436                    .map_err(|e| AppError::BadRequest(format!("{} parse error: {}", key, e)))
1437            })
1438            .collect()
1439    }
1440
1441    Ok(crate::config::FullConfig {
1442        schemas: parse_kind(values, "schemas")?,
1443        enums: parse_kind(values, "enums")?,
1444        tables: parse_kind(values, "tables")?,
1445        columns: parse_kind(values, "columns")?,
1446        indexes: parse_kind(values, "indexes")?,
1447        relationships: parse_kind(values, "relationships")?,
1448        api_entities: parse_kind(values, "api_entities")?,
1449        kv_stores: parse_kind(values, "kv_stores")?,
1450    })
1451}