1use crate::config::{load_from_pool, resolve, FullConfig};
5use crate::db::pool::Pool;
6use crate::db::{parse_canonical, CanonicalType, Dialect};
7use crate::error::AppError;
8use crate::extractors::tenant::TenantId;
9use crate::handlers::config::{reload_model, replace_config};
10use crate::handlers::entity::{get_or_create_tenant_pool, resolve_tenant_context};
11use crate::migration::{
12 apply_migrations, compute_migration_plan, execute_migration_plan, revert_migrations,
13 MigrationPlan,
14};
15use crate::state::AppState;
16use crate::store::{
17 count_package_kind, delete_package_and_config, get_migration_plan, get_package,
18 list_package_ids, list_packages, mark_migration_plan_applied, save_migration_plan,
19 upsert_package,
20};
21use crate::tenant::TenantStrategy;
22use axum::extract::{Multipart, Path, State};
23use axum::Json;
24use serde::Deserialize;
25use serde_json::{json, Value};
26use std::collections::HashSet;
27use std::io::Cursor;
28use uuid::Uuid;
29use zip::ZipArchive;
30
31pub(crate) fn reject_asset_columns_without_storage(
35 config: &FullConfig,
36 storage: &Option<std::sync::Arc<dyn crate::storage::StorageProvider>>,
37) -> Result<(), AppError> {
38 if storage.is_some() {
39 return Ok(());
40 }
41 let asset_cols: Vec<String> = config
42 .columns
43 .iter()
44 .filter(|c| {
45 matches!(
46 parse_canonical(&c.type_),
47 CanonicalType::Asset | CanonicalType::AssetArray
48 )
49 })
50 .map(|c| c.name.clone())
51 .collect();
52 if !asset_cols.is_empty() {
53 return Err(AppError::BadRequest(format!(
54 "Package defines asset column(s) [{}] but no storage provider is configured. \
55 Set STORAGE_PROVIDER (s3 | azure | gcs | rustfs) before installing packages \
56 that use asset or asset[] columns.",
57 asset_cols.join(", ")
58 )));
59 }
60 Ok(())
61}
62
/// Result of applying a package's DDL to a single migration target.
///
/// One value is produced per target by `apply_ddl_to_pool` / `broadcast_ddl`
/// and serialized into the install response.
#[derive(serde::Serialize)]
struct TenantMigrationOutcome {
    /// Identifier of the target: a tenant id, `"central_rls_db"` for the shared
    /// RLS database, or `"all"` when the migration plan itself could not be
    /// computed.
    target: String,
    /// Strategy label of the target as set by the caller (`"rls"`,
    /// `"database"`, or `"n/a"` in this file).
    strategy: String,
    /// `"applied"`, `"applied_with_warnings"` or `"failed"`.
    status: String,
    /// Warnings reported by the migration run; empty on failure.
    warnings: Vec<String>,
    /// Error text when `status` is `"failed"`; omitted from JSON when `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    error: Option<String>,
}
76
/// Every config kind a package can contain, in declaration order.
const CONFIG_KINDS: &[&str] = &[
    "schemas",
    "enums",
    "tables",
    "columns",
    "indexes",
    "relationships",
    "api_entities",
    "kv_stores",
];

/// Returns the config kinds that must be applied before `kind`.
///
/// Kinds without prerequisites, and unknown kinds, yield an empty slice.
fn dependencies(kind: &str) -> &'static [&'static str] {
    match kind {
        "enums" | "tables" => &["schemas"],
        "columns" | "api_entities" => &["tables"],
        "indexes" => &["schemas", "tables"],
        "relationships" => &["schemas", "tables", "columns"],
        // "schemas", "kv_stores" and anything unrecognised have no prerequisites.
        _ => &[],
    }
}

/// Orders `CONFIG_KINDS` so that each kind appears after all of its
/// `dependencies`.
///
/// The list is swept repeatedly, emitting every kind whose prerequisites have
/// already been emitted. If a sweep emits nothing (an unsatisfiable
/// dependency), the loop stops and the partial order is returned.
fn config_apply_order() -> Vec<&'static str> {
    let total = CONFIG_KINDS.len();
    let mut resolved: HashSet<&'static str> = HashSet::new();
    let mut sequence: Vec<&'static str> = Vec::with_capacity(total);

    while sequence.len() < total {
        let emitted_before = sequence.len();
        for kind in CONFIG_KINDS.iter().copied() {
            let ready = !resolved.contains(kind)
                && dependencies(kind).iter().all(|dep| resolved.contains(dep));
            if ready {
                sequence.push(kind);
                resolved.insert(kind);
            }
        }
        // A sweep that added nothing can never make progress later either.
        if sequence.len() == emitted_before {
            break;
        }
    }
    sequence
}
129
/// Id given to the single schema record synthesized from the manifest's
/// `schema` name, and injected as the schema reference into package records
/// that do not specify one.
const DEFAULT_SCHEMA_ID: &str = "default";
132
133fn assemble_config(bodies: &[(&'static str, Vec<Value>)]) -> Result<FullConfig, AppError> {
137 fn de<T: serde::de::DeserializeOwned>(
138 bodies: &[(&'static str, Vec<Value>)],
139 kind: &str,
140 ) -> Result<Vec<T>, AppError> {
141 let arr = bodies
142 .iter()
143 .find(|(k, _)| *k == kind)
144 .map(|(_, v)| v.clone())
145 .unwrap_or_default();
146 serde_json::from_value(Value::Array(arr))
147 .map_err(|e| AppError::BadRequest(format!("invalid {}: {}", kind, e)))
148 }
149 Ok(FullConfig {
150 schemas: de(bodies, "schemas")?,
151 enums: de(bodies, "enums")?,
152 tables: de(bodies, "tables")?,
153 columns: de(bodies, "columns")?,
154 indexes: de(bodies, "indexes")?,
155 relationships: de(bodies, "relationships")?,
156 api_entities: de(bodies, "api_entities")?,
157 kv_stores: de(bodies, "kv_stores")?,
158 })
159}
160
161fn inject_schema_id(body: &mut [Value], schema_id: &str) {
162 for rec in body.iter_mut() {
163 if let Some(obj) = rec.as_object_mut() {
164 if !obj.contains_key("schema_id") {
165 obj.insert("schema_id".into(), Value::String(schema_id.to_string()));
166 }
167 }
168 }
169}
170
171fn inject_relationship_schema_ids(body: &mut [Value], schema_id: &str) {
172 for rec in body.iter_mut() {
173 if let Some(obj) = rec.as_object_mut() {
174 if !obj.contains_key("from_schema_id") {
176 obj.insert(
177 "from_schema_id".into(),
178 Value::String(schema_id.to_string()),
179 );
180 }
181 let is_cross_package = obj
184 .get("to_package_id")
185 .and_then(Value::as_str)
186 .map(|s| !s.is_empty())
187 .unwrap_or(false);
188 if !is_cross_package && !obj.contains_key("to_schema_id") {
189 obj.insert("to_schema_id".into(), Value::String(schema_id.to_string()));
190 }
191 }
192 }
193}
194
195fn read_zip_entry_to_string<R: std::io::Read + std::io::Seek>(
196 archive: &mut ZipArchive<R>,
197 name: &str,
198) -> Result<String, AppError> {
199 let mut f = archive
200 .by_name(name)
201 .map_err(|e| AppError::BadRequest(e.to_string()))?;
202 let mut s = String::new();
203 std::io::Read::read_to_string(&mut f, &mut s)
204 .map_err(|e| AppError::BadRequest(e.to_string()))?;
205 Ok(s)
206}
207
208fn read_kind_from_zip<R: std::io::Read + std::io::Seek>(
212 archive: &mut ZipArchive<R>,
213 kind: &str,
214) -> Result<Vec<Value>, AppError> {
215 let flat = format!("{}.json", kind);
216 if let Ok(content) = read_zip_entry_to_string(archive, &flat) {
217 return serde_json::from_str(&content)
218 .map_err(|e| AppError::BadRequest(format!("invalid {}: {}", flat, e)));
219 }
220
221 let prefix = format!("{}/", kind);
222 let mut names: Vec<String> = archive
223 .file_names()
224 .filter(|n| n.starts_with(&prefix) && n.ends_with(".json"))
225 .map(String::from)
226 .collect();
227 names.sort();
228
229 let mut merged: Vec<Value> = Vec::new();
230 for name in names {
231 let content = read_zip_entry_to_string(archive, &name)?;
232 let mut items: Vec<Value> = serde_json::from_str(&content)
233 .map_err(|e| AppError::BadRequest(format!("invalid {}: {}", name, e)))?;
234 merged.append(&mut items);
235 }
236 Ok(merged)
237}
238
239#[allow(clippy::too_many_arguments)]
244async fn apply_ddl_to_pool(
245 migration_pool: &Pool,
246 config_pool: &Pool,
247 config: &FullConfig,
248 plan: Option<&MigrationPlan>,
249 package_id: &str,
250 target: &str,
251 strategy: &str,
252 from_version: Option<&str>,
253 to_version: &str,
254 rls_tenant_column: Option<&str>,
255 dialect: &dyn Dialect,
256 cross_package_configs: &std::collections::HashMap<String, FullConfig>,
257) -> TenantMigrationOutcome {
258 match plan {
259 Some(p) => {
261 let migration_id = Uuid::new_v4().to_string();
262 match execute_migration_plan(
263 migration_pool,
264 config_pool,
265 p,
266 &migration_id,
267 package_id,
268 target,
269 from_version,
270 to_version,
271 )
272 .await
273 {
274 Ok(result) => TenantMigrationOutcome {
275 target: target.to_string(),
276 strategy: strategy.to_string(),
277 status: if result.warned > 0 {
278 "applied_with_warnings".to_string()
279 } else {
280 "applied".to_string()
281 },
282 warnings: result.warnings,
283 error: None,
284 },
285 Err(e) => {
286 tracing::warn!(target, strategy, error = %e, "DDL broadcast failed (upgrade)");
287 TenantMigrationOutcome {
288 target: target.to_string(),
289 strategy: strategy.to_string(),
290 status: "failed".to_string(),
291 warnings: vec![],
292 error: Some(e.to_string()),
293 }
294 }
295 }
296 }
297 None => {
299 match apply_migrations(
300 migration_pool,
301 config,
302 None,
303 rls_tenant_column,
304 dialect,
305 cross_package_configs,
306 )
307 .await
308 {
309 Ok(()) => TenantMigrationOutcome {
310 target: target.to_string(),
311 strategy: strategy.to_string(),
312 status: "applied".to_string(),
313 warnings: vec![],
314 error: None,
315 },
316 Err(e) => {
317 tracing::warn!(target, strategy, error = %e, "DDL broadcast failed (fresh install)");
318 TenantMigrationOutcome {
319 target: target.to_string(),
320 strategy: strategy.to_string(),
321 status: "failed".to_string(),
322 warnings: vec![],
323 error: Some(e.to_string()),
324 }
325 }
326 }
327 }
328 }
329}
330
331async fn broadcast_ddl(
341 state: &AppState,
342 config_pool: &Pool,
343 config: &FullConfig,
344 old_config: Option<&FullConfig>,
345 package_id: &str,
346 from_version: Option<&str>,
347 to_version: &str,
348) -> Vec<TenantMigrationOutcome> {
349 let mut outcomes = Vec::new();
350
351 let cross_package_configs: std::collections::HashMap<String, FullConfig> = {
353 match list_package_ids(config_pool).await {
354 Ok(ids) => {
355 let mut map = std::collections::HashMap::new();
356 for pid in ids {
357 if pid == package_id {
358 continue;
359 }
360 match load_from_pool(config_pool, &pid).await {
361 Ok(cfg) => {
362 map.insert(pid, cfg);
363 }
364 Err(e) => {
365 tracing::warn!(pkg = %pid, error = %e, "could not load cross-package config for FK resolution")
366 }
367 }
368 }
369 map
370 }
371 Err(e) => {
372 tracing::warn!(error = %e, "could not list packages for cross-package FK resolution");
373 std::collections::HashMap::new()
374 }
375 }
376 };
377
378 let plan: Option<MigrationPlan> = match old_config {
382 Some(old) => {
383 match compute_migration_plan(
384 old,
385 config,
386 None,
387 None,
388 state.dialect.as_ref(),
389 &cross_package_configs,
390 ) {
391 Ok(p) => Some(p),
392 Err(e) => {
393 tracing::error!(error = %e, "could not compute migration plan for broadcast");
394 return vec![TenantMigrationOutcome {
395 target: "all".to_string(),
396 strategy: "n/a".to_string(),
397 status: "failed".to_string(),
398 warnings: vec![],
399 error: Some(format!("migration plan error: {}", e)),
400 }];
401 }
402 }
403 }
404 None => None,
405 };
406
407 if state.tenant_registry.has_shared_rls_tenants() {
409 let outcome = apply_ddl_to_pool(
410 &state.pool,
411 config_pool,
412 config,
413 plan.as_ref(),
414 package_id,
415 "central_rls_db",
416 "rls",
417 from_version,
418 to_version,
419 Some(crate::migration::RLS_TENANT_COLUMN),
420 state.dialect.as_ref(),
421 &cross_package_configs,
422 )
423 .await;
424 outcomes.push(outcome);
425 }
426
427 let mut seen_rls_urls: HashSet<String> = HashSet::new();
430 for (tid, db_url) in state.tenant_registry.rls_dedicated_db_targets() {
431 if !seen_rls_urls.insert(db_url.clone()) {
432 continue; }
434 let pool = match get_or_create_tenant_pool(state, &tid, &db_url).await {
435 Ok(p) => p,
436 Err(e) => {
437 tracing::warn!(target = %tid, error = %e, "could not connect to dedicated RLS tenant DB");
438 outcomes.push(TenantMigrationOutcome {
439 target: tid.clone(),
440 strategy: "rls".to_string(),
441 status: "failed".to_string(),
442 warnings: vec![],
443 error: Some(format!("connection failed: {}", e)),
444 });
445 continue;
446 }
447 };
448 let outcome = apply_ddl_to_pool(
449 &pool,
450 config_pool,
451 config,
452 plan.as_ref(),
453 package_id,
454 &tid,
455 "rls",
456 from_version,
457 to_version,
458 Some(crate::migration::RLS_TENANT_COLUMN),
459 state.dialect.as_ref(),
460 &cross_package_configs,
461 )
462 .await;
463 outcomes.push(outcome);
464 }
465
466 for (tid, db_url) in state.tenant_registry.database_tenant_targets() {
468 let pool = match get_or_create_tenant_pool(state, &tid, &db_url).await {
469 Ok(p) => p,
470 Err(e) => {
471 tracing::warn!(target = %tid, error = %e, "could not connect to Database tenant DB");
472 outcomes.push(TenantMigrationOutcome {
473 target: tid.clone(),
474 strategy: "database".to_string(),
475 status: "failed".to_string(),
476 warnings: vec![],
477 error: Some(format!("connection failed: {}", e)),
478 });
479 continue;
480 }
481 };
482 let outcome = apply_ddl_to_pool(
483 &pool,
484 config_pool,
485 config,
486 plan.as_ref(),
487 package_id,
488 &tid,
489 "database",
490 from_version,
491 to_version,
492 None,
493 state.dialect.as_ref(),
494 &cross_package_configs,
495 )
496 .await;
497 outcomes.push(outcome);
498 }
499
500 outcomes
501}
502
503pub async fn install_package(
507 TenantId(tenant_id_opt): TenantId,
508 State(state): State<AppState>,
509 mut multipart: Multipart,
510) -> Result<impl axum::response::IntoResponse, AppError> {
511 let tenant_id = tenant_id_opt
512 .as_deref()
513 .filter(|s| !s.is_empty())
514 .ok_or_else(|| AppError::BadRequest("X-Tenant-ID header is required".into()))?;
515 state
516 .tenant_registry
517 .get(tenant_id)
518 .ok_or_else(|| AppError::NotFound(format!("tenant not found: {}", tenant_id)))?;
519
520 let mut zip_bytes: Option<Vec<u8>> = None;
521 while let Ok(Some(field)) = multipart.next_field().await {
522 let name = field.name().unwrap_or("").to_string();
523 if name == "file" || name == "package" {
524 let data = field
525 .bytes()
526 .await
527 .map_err(|e| AppError::BadRequest(e.to_string()))?;
528 zip_bytes = Some(data.to_vec());
529 break;
530 }
531 }
532 let zip_bytes = zip_bytes.ok_or_else(|| {
533 AppError::BadRequest("missing 'file' or 'package' field in multipart body".into())
534 })?;
535
536 let mut archive = ZipArchive::new(Cursor::new(zip_bytes))
537 .map_err(|e| AppError::BadRequest(format!("invalid zip: {}", e)))?;
538
539 let manifest_name = archive
540 .file_names()
541 .find(|n| *n == "manifest.json" || n.ends_with("/manifest.json"))
542 .map(String::from)
543 .ok_or_else(|| AppError::BadRequest("zip must contain manifest.json at root".into()))?;
544
545 let manifest_value: Value = {
546 let mut file = archive
547 .by_name(&manifest_name)
548 .map_err(|e| AppError::BadRequest(e.to_string()))?;
549 let mut buf = String::new();
550 std::io::Read::read_to_string(&mut file, &mut buf)
551 .map_err(|e| AppError::BadRequest(e.to_string()))?;
552 serde_json::from_str(&buf)
553 .map_err(|e| AppError::BadRequest(format!("invalid manifest.json: {}", e)))?
554 };
555
556 let manifest_obj = manifest_value
557 .as_object()
558 .ok_or_else(|| AppError::BadRequest("manifest.json must be an object".into()))?;
559 let id = manifest_obj
560 .get("id")
561 .and_then(Value::as_str)
562 .ok_or_else(|| AppError::BadRequest("manifest must have 'id' (string)".into()))?;
563 let _name = manifest_obj
564 .get("name")
565 .and_then(Value::as_str)
566 .ok_or_else(|| AppError::BadRequest("manifest must have 'name' (string)".into()))?;
567 let _version = manifest_obj
568 .get("version")
569 .and_then(Value::as_str)
570 .ok_or_else(|| AppError::BadRequest("manifest must have 'version' (string)".into()))?;
571 let schema_name = manifest_obj
572 .get("schema")
573 .and_then(Value::as_str)
574 .ok_or_else(|| {
575 AppError::BadRequest(
576 "manifest must have 'schema' (string) - the schema name for all configs".into(),
577 )
578 })?;
579
580 let ctx = resolve_tenant_context(&state, Some(tenant_id), Some(id)).await?;
581 let config_pool = ctx.config_pool();
582 let package_cache_key = ctx.package_cache_key().to_string();
584
585 if let Some(deps) = manifest_obj.get("dependencies").and_then(Value::as_array) {
587 let installed_ids: std::collections::HashSet<String> =
588 list_package_ids(config_pool).await?.into_iter().collect();
589 let missing: Vec<&str> = deps
590 .iter()
591 .filter_map(Value::as_str)
592 .filter(|dep| !installed_ids.contains(*dep))
593 .collect();
594 if !missing.is_empty() {
595 return Err(AppError::BadRequest(format!(
596 "package '{}' depends on [{}] which are not installed; install them first",
597 id,
598 missing.join(", ")
599 )));
600 }
601 }
602
603 let incoming_version = manifest_obj
604 .get("version")
605 .and_then(Value::as_str)
606 .unwrap_or("");
607
608 let is_upgrade = if let Some(existing) = get_package(config_pool, id).await? {
610 if existing.semantic_version.as_deref() == Some(incoming_version) {
611 return Err(AppError::Conflict(format!(
612 "package '{}' version '{}' is already installed",
613 id, incoming_version
614 )));
615 }
616 true
617 } else {
618 false
619 };
620
621 let old_config = if is_upgrade {
622 Some(
623 load_from_pool(config_pool, id)
624 .await
625 .map_err(AppError::Config)?,
626 )
627 } else {
628 None
629 };
630
631 let schemas_body: Vec<Value> = vec![serde_json::json!({
632 "id": DEFAULT_SCHEMA_ID,
633 "name": schema_name
634 })];
635
636 let apply_order = config_apply_order();
640 let mut bodies: Vec<(&'static str, Vec<Value>)> = Vec::with_capacity(apply_order.len());
641 for kind in &apply_order {
642 let body: Vec<Value> = if *kind == "schemas" {
643 schemas_body.clone()
644 } else {
645 let mut body = read_kind_from_zip(&mut archive, kind)?;
646 match *kind {
647 "enums" | "tables" | "indexes" => inject_schema_id(&mut body, DEFAULT_SCHEMA_ID),
648 "relationships" => inject_relationship_schema_ids(&mut body, DEFAULT_SCHEMA_ID),
649 _ => {}
650 }
651 body
652 };
653 bodies.push((*kind, body));
654 }
655
656 let config = assemble_config(&bodies)?;
659 let new_model = resolve(&config)
660 .map_err(AppError::Config)?
661 .with_package_id(id);
662
663 reject_asset_columns_without_storage(&config, &state.storage)?;
665
666 let tenant_outcomes = broadcast_ddl(
669 &state,
670 config_pool,
671 &config,
672 old_config.as_ref(),
673 id,
674 old_config
675 .as_ref()
676 .and_then(|_| manifest_value.get("version").and_then(Value::as_str)),
677 incoming_version,
678 )
679 .await;
680
681 if !is_upgrade {
685 let failures: Vec<String> = tenant_outcomes
686 .iter()
687 .filter(|o| o.status == "failed")
688 .map(|o| format!("{}: {}", o.target, o.error.clone().unwrap_or_default()))
689 .collect();
690 if !failures.is_empty() {
691 return Err(AppError::BadRequest(format!(
692 "package '{}' installation failed during schema creation; no configuration was \
693 saved. Errors: {}",
694 id,
695 failures.join("; ")
696 )));
697 }
698 }
699
700 let mut applied = Vec::with_capacity(bodies.len());
702 for (kind, body) in bodies {
703 replace_config(config_pool, kind, body, false, id, None).await?;
704 applied.push(kind.to_string());
705 }
706 upsert_package(config_pool, id, &manifest_value).await?;
707
708 let migration_warnings: Vec<String> = tenant_outcomes
709 .iter()
710 .flat_map(|o| o.warnings.iter().cloned())
711 .collect();
712
713 {
715 let mut model_guard = state
716 .model
717 .write()
718 .map_err(|_| AppError::BadRequest("state lock".into()))?;
719 *model_guard = new_model.clone();
720 let mut pkg_guard = state
721 .package_models
722 .write()
723 .map_err(|_| AppError::BadRequest("state lock".into()))?;
724 pkg_guard.insert(id.to_string(), new_model.clone());
726 for (tid, _) in state.tenant_registry.database_tenant_targets() {
728 pkg_guard.insert(format!("{}:{}", id, tid), new_model.clone());
729 }
730 pkg_guard.insert(package_cache_key, new_model);
732 }
733
734 #[derive(serde::Serialize)]
735 struct PackageInstallResponse {
736 package: Value,
737 applied: Vec<String>,
738 warnings: Vec<String>,
739 tenant_migrations: Vec<TenantMigrationOutcome>,
741 }
742 Ok((
743 axum::http::StatusCode::OK,
744 Json(crate::response::SuccessOne {
745 data: PackageInstallResponse {
746 package: manifest_value,
747 applied,
748 warnings: migration_warnings,
749 tenant_migrations: tenant_outcomes,
750 },
751 meta: None,
752 }),
753 ))
754}
755
/// Path parameters extracted for `uninstall_package`.
#[derive(Deserialize)]
pub struct UninstallPath {
    /// Id of the package to uninstall.
    pub package_id: String,
}
760
761pub async fn uninstall_package(
763 TenantId(tenant_id_opt): TenantId,
764 State(state): State<AppState>,
765 Path(UninstallPath { package_id }): Path<UninstallPath>,
766) -> Result<impl axum::response::IntoResponse, AppError> {
767 let tenant_id = tenant_id_opt
768 .as_deref()
769 .filter(|s| !s.is_empty())
770 .ok_or_else(|| AppError::BadRequest("X-Tenant-ID header is required".into()))?;
771
772 let ctx = resolve_tenant_context(&state, Some(tenant_id), Some(&package_id)).await?;
773 let config_pool = ctx.config_pool();
774 let migration_pool = ctx.migration_pool();
775 let schema_override = ctx.schema_override();
776 let package_cache_key = ctx.package_cache_key().to_string();
777
778 let installed = list_package_ids(config_pool).await?;
779 if !installed.contains(&package_id) {
780 return Err(AppError::NotFound(format!(
781 "package not found: {}",
782 package_id
783 )));
784 }
785
786 let all_packages = list_packages(config_pool).await?;
788 let dependents: Vec<String> = all_packages
789 .iter()
790 .filter(|row| row.id != package_id)
791 .filter(|row| {
792 row.payload
793 .get("dependencies")
794 .and_then(Value::as_array)
795 .map(|deps| deps.iter().any(|d| d.as_str() == Some(package_id.as_str())))
796 .unwrap_or(false)
797 })
798 .map(|row| row.id.clone())
799 .collect();
800 if !dependents.is_empty() {
801 return Err(AppError::Conflict(format!(
802 "cannot uninstall '{}': packages [{}] depend on it; uninstall them first",
803 package_id,
804 dependents.join(", ")
805 )));
806 }
807
808 let config = load_from_pool(config_pool, &package_id)
809 .await
810 .map_err(AppError::Config)?;
811 revert_migrations(migration_pool, &config, schema_override).await?;
812 delete_package_and_config(config_pool, &package_id).await?;
813
814 {
815 state
816 .package_models
817 .write()
818 .map_err(|_| AppError::BadRequest("state lock".into()))?
819 .remove(&package_cache_key);
820 }
821
822 if std::ptr::eq(&state.pool as *const _, config_pool as *const _) {
824 let _ = reload_model(&state).await;
825 }
826
827 #[derive(serde::Serialize)]
828 struct UninstallResponse {
829 package_id: String,
830 }
831 Ok((
832 axum::http::StatusCode::OK,
833 Json(crate::response::SuccessOne {
834 data: UninstallResponse { package_id },
835 meta: None,
836 }),
837 ))
838}
839
840async fn package_detail_data(
842 pool: &Pool,
843 package_id: &str,
844) -> Result<Value, crate::error::AppError> {
845 use crate::handlers::config::get_config;
846
847 let (schemas, enums, tables, columns, indexes, relationships, api_entities, kv_stores) = tokio::try_join!(
848 get_config(pool, "schemas", package_id),
849 get_config(pool, "enums", package_id),
850 get_config(pool, "tables", package_id),
851 get_config(pool, "columns", package_id),
852 get_config(pool, "indexes", package_id),
853 get_config(pool, "relationships", package_id),
854 get_config(pool, "api_entities", package_id),
855 get_config(pool, "kv_stores", package_id),
856 )?;
857
858 Ok(json!({
859 "stats": {
860 "schemas": schemas.len(),
861 "enums": enums.len(),
862 "tables": tables.len(),
863 "columns": columns.len(),
864 "indexes": indexes.len(),
865 "relationships": relationships.len(),
866 "apiEntities": api_entities.len(),
867 "kvStores": kv_stores.len(),
868 },
869 "schemas": schemas,
870 "enums": enums,
871 "tables": tables,
872 "columns": columns,
873 "indexes": indexes,
874 "relationships": relationships,
875 "apiEntities": api_entities,
876 "kvStores": kv_stores,
877 }))
878}
879
880pub async fn list_packages_handler(
882 State(state): State<AppState>,
883) -> Result<impl axum::response::IntoResponse, crate::error::AppError> {
884 let packages = list_packages(&state.pool).await?;
885
886 let mut items: Vec<Value> = Vec::with_capacity(packages.len());
887 for pkg in packages {
888 let (schemas, enums, tables, columns, indexes, relationships, api_entities, kv_stores) = tokio::try_join!(
889 count_package_kind(&state.pool, "schemas", &pkg.id),
890 count_package_kind(&state.pool, "enums", &pkg.id),
891 count_package_kind(&state.pool, "tables", &pkg.id),
892 count_package_kind(&state.pool, "columns", &pkg.id),
893 count_package_kind(&state.pool, "indexes", &pkg.id),
894 count_package_kind(&state.pool, "relationships", &pkg.id),
895 count_package_kind(&state.pool, "api_entities", &pkg.id),
896 count_package_kind(&state.pool, "kv_stores", &pkg.id),
897 )?;
898
899 let name = pkg
900 .payload
901 .get("name")
902 .and_then(Value::as_str)
903 .map(String::from);
904 let version = pkg
905 .payload
906 .get("version")
907 .and_then(Value::as_str)
908 .map(String::from);
909 let schema = pkg
910 .payload
911 .get("schema")
912 .and_then(Value::as_str)
913 .map(String::from);
914 let dependencies: Vec<&str> = pkg
915 .payload
916 .get("dependencies")
917 .and_then(Value::as_array)
918 .map(|arr| arr.iter().filter_map(Value::as_str).collect())
919 .unwrap_or_default();
920
921 items.push(json!({
922 "id": pkg.id,
923 "name": name,
924 "version": version,
925 "schema": schema,
926 "installedVersion": pkg.version,
927 "updatedAt": pkg.updated_at,
928 "dependencies": dependencies,
929 "stats": {
930 "schemas": schemas,
931 "enums": enums,
932 "tables": tables,
933 "columns": columns,
934 "indexes": indexes,
935 "relationships": relationships,
936 "apiEntities": api_entities,
937 "kvStores": kv_stores,
938 },
939 }));
940 }
941
942 let count = items.len() as u64;
943 Ok((
944 axum::http::StatusCode::OK,
945 Json(crate::response::SuccessMany {
946 data: items,
947 meta: crate::response::MetaCount { count },
948 }),
949 ))
950}
951
/// Path parameters extracted for `get_package_handler`.
#[derive(Deserialize)]
pub struct PackageIdPath {
    /// Id of the package to look up.
    pub package_id: String,
}
956
957pub async fn get_package_handler(
959 State(state): State<AppState>,
960 Path(PackageIdPath { package_id }): Path<PackageIdPath>,
961) -> Result<impl axum::response::IntoResponse, crate::error::AppError> {
962 let pkg = get_package(&state.pool, &package_id)
963 .await?
964 .ok_or_else(|| {
965 crate::error::AppError::NotFound(format!("package not found: {}", package_id))
966 })?;
967
968 let name = pkg
969 .payload
970 .get("name")
971 .and_then(Value::as_str)
972 .map(String::from);
973 let version = pkg
974 .payload
975 .get("version")
976 .and_then(Value::as_str)
977 .map(String::from);
978 let schema = pkg
979 .payload
980 .get("schema")
981 .and_then(Value::as_str)
982 .map(String::from);
983
984 let mut detail = package_detail_data(&state.pool, &package_id).await?;
985 let obj = detail.as_object_mut().unwrap();
986 obj.insert("id".into(), json!(pkg.id));
987 obj.insert("name".into(), json!(name));
988 obj.insert("version".into(), json!(version));
989 obj.insert("schema".into(), json!(schema));
990 obj.insert("installedVersion".into(), json!(pkg.version));
991 obj.insert("updatedAt".into(), json!(pkg.updated_at));
992 obj.insert("manifest".into(), pkg.payload);
993
994 Ok((
995 axum::http::StatusCode::OK,
996 Json(crate::response::SuccessOne {
997 data: detail,
998 meta: None,
999 }),
1000 ))
1001}
1002
1003pub async fn preview_migration_handler(
1010 TenantId(tenant_id_opt): TenantId,
1011 State(state): State<AppState>,
1012 mut multipart: Multipart,
1013) -> Result<impl axum::response::IntoResponse, AppError> {
1014 let tenant_id = tenant_id_opt
1015 .as_deref()
1016 .filter(|s| !s.is_empty())
1017 .ok_or_else(|| AppError::BadRequest("X-Tenant-ID header is required".into()))?;
1018
1019 let mut zip_bytes_raw: Option<Vec<u8>> = None;
1020 while let Ok(Some(field)) = multipart.next_field().await {
1021 let name = field.name().unwrap_or("").to_string();
1022 if name == "file" || name == "package" {
1023 let data = field
1024 .bytes()
1025 .await
1026 .map_err(|e| AppError::BadRequest(e.to_string()))?;
1027 zip_bytes_raw = Some(data.to_vec());
1028 break;
1029 }
1030 }
1031 let zip_bytes = zip_bytes_raw
1032 .ok_or_else(|| AppError::BadRequest("missing 'file' or 'package' field".into()))?;
1033
1034 let mut archive = ZipArchive::new(Cursor::new(zip_bytes.clone()))
1035 .map_err(|e| AppError::BadRequest(format!("invalid zip: {}", e)))?;
1036
1037 let manifest_name = archive
1038 .file_names()
1039 .find(|n| *n == "manifest.json" || n.ends_with("/manifest.json"))
1040 .map(String::from)
1041 .ok_or_else(|| AppError::BadRequest("zip must contain manifest.json".into()))?;
1042
1043 let manifest_value: Value = {
1044 let mut file = archive
1045 .by_name(&manifest_name)
1046 .map_err(|e| AppError::BadRequest(e.to_string()))?;
1047 let mut buf = String::new();
1048 std::io::Read::read_to_string(&mut file, &mut buf)
1049 .map_err(|e| AppError::BadRequest(e.to_string()))?;
1050 serde_json::from_str(&buf)
1051 .map_err(|e| AppError::BadRequest(format!("invalid manifest.json: {}", e)))?
1052 };
1053 let manifest_obj = manifest_value
1054 .as_object()
1055 .ok_or_else(|| AppError::BadRequest("manifest.json must be an object".into()))?;
1056
1057 let id = manifest_obj
1058 .get("id")
1059 .and_then(Value::as_str)
1060 .ok_or_else(|| AppError::BadRequest("manifest must have 'id'".into()))?;
1061 let incoming_version = manifest_obj
1062 .get("version")
1063 .and_then(Value::as_str)
1064 .unwrap_or("");
1065 let schema_name = manifest_obj
1066 .get("schema")
1067 .and_then(Value::as_str)
1068 .ok_or_else(|| AppError::BadRequest("manifest must have 'schema'".into()))?;
1069
1070 let existing = get_package(&state.pool, id).await?.ok_or_else(|| {
1071 AppError::NotFound(format!(
1072 "package '{}' is not installed — preview is only for upgrades",
1073 id
1074 ))
1075 })?;
1076
1077 if existing.semantic_version.as_deref() == Some(incoming_version) {
1078 return Err(AppError::Conflict(format!(
1079 "package '{}' version '{}' is already installed",
1080 id, incoming_version
1081 )));
1082 }
1083
1084 let from_version = existing.semantic_version.clone();
1085 let ctx = resolve_tenant_context(&state, Some(tenant_id), Some(id)).await?;
1086 let config_pool = ctx.config_pool();
1087
1088 let old_config = load_from_pool(config_pool, id)
1089 .await
1090 .map_err(AppError::Config)?;
1091
1092 let schemas_body = vec![serde_json::json!({ "id": DEFAULT_SCHEMA_ID, "name": schema_name })];
1094 let config_kinds = [
1095 "schemas",
1096 "enums",
1097 "tables",
1098 "columns",
1099 "indexes",
1100 "relationships",
1101 "api_entities",
1102 "kv_stores",
1103 ];
1104 let mut all_values: std::collections::HashMap<String, Vec<Value>> =
1105 std::collections::HashMap::new();
1106 for kind in &config_kinds {
1107 let body: Vec<Value> = if *kind == "schemas" {
1108 serde_json::from_value(Value::Array(schemas_body.clone())).unwrap_or_default()
1109 } else {
1110 let mut body = read_kind_from_zip(&mut archive, kind).unwrap_or_default();
1111 match *kind {
1112 "enums" | "tables" | "indexes" => inject_schema_id(&mut body, DEFAULT_SCHEMA_ID),
1113 "relationships" => inject_relationship_schema_ids(&mut body, DEFAULT_SCHEMA_ID),
1114 _ => {}
1115 }
1116 body
1117 };
1118 all_values.insert(kind.to_string(), body);
1119 }
1120
1121 let new_config = build_full_config_from_values(&all_values)?;
1123
1124 let plan = compute_migration_plan(
1125 &old_config,
1126 &new_config,
1127 ctx.schema_override(),
1128 ctx.rls_tenant_column(),
1129 state.dialect.as_ref(),
1130 &std::collections::HashMap::new(),
1131 )
1132 .map_err(|e| AppError::BadRequest(format!("migration plan error: {}", e)))?;
1133
1134 let summary = plan.summary();
1135 let plan_json = serde_json::to_value(&plan).map_err(|e| AppError::BadRequest(e.to_string()))?;
1136 let migration_id = Uuid::new_v4().to_string();
1137
1138 save_migration_plan(
1139 config_pool,
1140 &migration_id,
1141 id,
1142 tenant_id,
1143 from_version.as_deref(),
1144 incoming_version,
1145 &plan_json,
1146 &zip_bytes,
1147 )
1148 .await?;
1149
1150 Ok((
1151 axum::http::StatusCode::OK,
1152 Json(crate::response::SuccessOne {
1153 data: json!({
1154 "migration_id": migration_id,
1155 "package_id": id,
1156 "from_version": from_version,
1157 "to_version": incoming_version,
1158 "expires_in_hours": 24,
1159 "summary": {
1160 "total": summary.total,
1161 "safe": summary.safe,
1162 "best_effort": summary.best_effort,
1163 "warn_only": summary.warn_only,
1164 },
1165 "steps": plan.steps,
1166 }),
1167 meta: None,
1168 }),
1169 ))
1170}
1171
/// Path parameters extracted for `apply_migration_handler`.
#[derive(Deserialize)]
pub struct MigrationIdPath {
    /// Id of the stored migration plan to apply.
    pub migration_id: String,
}
1176
1177pub async fn apply_migration_handler(
1182 TenantId(tenant_id_opt): TenantId,
1183 State(state): State<AppState>,
1184 Path(MigrationIdPath { migration_id }): Path<MigrationIdPath>,
1185) -> Result<impl axum::response::IntoResponse, AppError> {
1186 let tenant_id = tenant_id_opt
1187 .as_deref()
1188 .filter(|s| !s.is_empty())
1189 .ok_or_else(|| AppError::BadRequest("X-Tenant-ID header is required".into()))?;
1190
1191 let row = get_migration_plan(&state.pool, &migration_id)
1192 .await?
1193 .ok_or_else(|| {
1194 AppError::NotFound(format!("migration plan '{}' not found", migration_id))
1195 })?;
1196
1197 if row.status == "applied" {
1198 return Err(AppError::Conflict(format!(
1199 "migration plan '{}' has already been applied",
1200 migration_id
1201 )));
1202 }
1203 if row.status != "pending" {
1204 return Err(AppError::BadRequest(format!(
1205 "migration plan '{}' has status '{}' and cannot be applied",
1206 migration_id, row.status
1207 )));
1208 }
1209
1210 let now = chrono::Utc::now();
1211 if now > row.expires_at {
1212 return Err(AppError::BadRequest(format!(
1213 "migration plan '{}' expired at {} — re-run preview to generate a new plan",
1214 migration_id, row.expires_at
1215 )));
1216 }
1217
1218 if row.tenant_id != tenant_id {
1219 return Err(AppError::BadRequest(format!(
1220 "migration plan '{}' was created for tenant '{}', not '{}'",
1221 migration_id, row.tenant_id, tenant_id
1222 )));
1223 }
1224
1225 let plan: MigrationPlan = serde_json::from_value(row.plan_json.clone())
1226 .map_err(|e| AppError::BadRequest(format!("corrupted migration plan: {}", e)))?;
1227
1228 let ctx = resolve_tenant_context(&state, Some(tenant_id), Some(&row.package_id)).await?;
1229 let config_pool = ctx.config_pool();
1230 let migration_pool = ctx.migration_pool();
1231 let package_cache_key = ctx.package_cache_key().to_string();
1232
1233 let mut archive = ZipArchive::new(Cursor::new(row.zip_bytes.clone()))
1235 .map_err(|e| AppError::BadRequest(format!("stored zip corrupted: {}", e)))?;
1236
1237 let manifest_name = archive
1238 .file_names()
1239 .find(|n| *n == "manifest.json" || n.ends_with("/manifest.json"))
1240 .map(String::from)
1241 .ok_or_else(|| AppError::BadRequest("stored zip missing manifest.json".into()))?;
1242
1243 let manifest_value: Value = {
1244 let mut file = archive
1245 .by_name(&manifest_name)
1246 .map_err(|e| AppError::BadRequest(e.to_string()))?;
1247 let mut buf = String::new();
1248 std::io::Read::read_to_string(&mut file, &mut buf)
1249 .map_err(|e| AppError::BadRequest(e.to_string()))?;
1250 serde_json::from_str(&buf)
1251 .map_err(|e| AppError::BadRequest(format!("invalid manifest: {}", e)))?
1252 };
1253 let schema_name = manifest_value
1254 .get("schema")
1255 .and_then(Value::as_str)
1256 .ok_or_else(|| AppError::BadRequest("manifest missing 'schema'".into()))?;
1257
1258 let schemas_body = vec![serde_json::json!({ "id": DEFAULT_SCHEMA_ID, "name": schema_name })];
1259 let apply_order = config_apply_order();
1260 for kind in &apply_order {
1261 let body: Vec<Value> = if *kind == "schemas" {
1262 serde_json::from_value(Value::Array(schemas_body.clone()))
1263 .map_err(|e| AppError::BadRequest(format!("schemas body: {}", e)))?
1264 } else {
1265 let mut body = read_kind_from_zip(&mut archive, kind)?;
1266 match *kind {
1267 "enums" | "tables" | "indexes" => inject_schema_id(&mut body, DEFAULT_SCHEMA_ID),
1268 "relationships" => inject_relationship_schema_ids(&mut body, DEFAULT_SCHEMA_ID),
1269 _ => {}
1270 }
1271 body
1272 };
1273 replace_config(config_pool, kind, body, false, &row.package_id, None).await?;
1274 }
1275 upsert_package(config_pool, &row.package_id, &manifest_value).await?;
1276
1277 let claimed = mark_migration_plan_applied(config_pool, &migration_id).await?;
1279 if !claimed {
1280 return Err(AppError::Conflict(format!(
1281 "migration plan '{}' was applied by a concurrent request",
1282 migration_id
1283 )));
1284 }
1285
1286 let result = execute_migration_plan(
1288 migration_pool,
1289 config_pool,
1290 &plan,
1291 &migration_id,
1292 &row.package_id,
1293 tenant_id,
1294 row.from_version.as_deref(),
1295 &row.to_version,
1296 )
1297 .await?;
1298
1299 let new_config = load_from_pool(config_pool, &row.package_id)
1301 .await
1302 .map_err(AppError::Config)?;
1303 let new_model = resolve(&new_config)
1304 .map_err(AppError::Config)?
1305 .with_package_id(&row.package_id);
1306 {
1307 let mut guard = state
1308 .model
1309 .write()
1310 .map_err(|_| AppError::BadRequest("state lock".into()))?;
1311 *guard = new_model.clone();
1312 state
1313 .package_models
1314 .write()
1315 .map_err(|_| AppError::BadRequest("state lock".into()))?
1316 .insert(package_cache_key, new_model);
1317 }
1318
1319 Ok((
1320 axum::http::StatusCode::OK,
1321 Json(crate::response::SuccessOne {
1322 data: json!({
1323 "migration_id": migration_id,
1324 "package_id": row.package_id,
1325 "from_version": row.from_version,
1326 "to_version": row.to_version,
1327 "steps_applied": result.applied,
1328 "steps_warned": result.warned,
1329 "warnings": result.warnings,
1330 }),
1331 meta: None,
1332 }),
1333 ))
1334}
1335
1336pub async fn bootstrap_tenant_handler(
1349 TenantId(tenant_id_opt): TenantId,
1350 State(state): State<AppState>,
1351 Path(PackageIdPath { package_id }): Path<PackageIdPath>,
1352) -> Result<impl axum::response::IntoResponse, AppError> {
1353 let tenant_id = tenant_id_opt
1354 .as_deref()
1355 .filter(|s| !s.is_empty())
1356 .ok_or_else(|| AppError::BadRequest("X-Tenant-ID header is required".into()))?;
1357
1358 let entry = state
1359 .tenant_registry
1360 .get(tenant_id)
1361 .ok_or_else(|| AppError::NotFound(format!("tenant not found: {}", tenant_id)))?;
1362
1363 if !matches!(entry.strategy, TenantStrategy::Database) {
1366 return Err(AppError::BadRequest(
1367 "bootstrap only applies to Database-strategy tenants; RLS tenants share the central DB which is migrated by install_package".into(),
1368 ));
1369 }
1370
1371 let database_url = entry.database_url.as_deref().ok_or_else(|| {
1372 AppError::BadRequest(format!("tenant {}: missing database_url", tenant_id))
1373 })?;
1374
1375 let _ = get_package(&state.pool, &package_id)
1377 .await?
1378 .ok_or_else(|| AppError::NotFound(format!("package '{}' is not installed", package_id)))?;
1379
1380 let config = load_from_pool(&state.pool, &package_id)
1381 .await
1382 .map_err(AppError::Config)?;
1383
1384 let pool = get_or_create_tenant_pool(&state, tenant_id, database_url).await?;
1385
1386 apply_migrations(
1388 &pool,
1389 &config,
1390 None,
1391 None,
1392 state.dialect.as_ref(),
1393 &std::collections::HashMap::new(),
1394 )
1395 .await?;
1396
1397 let model = crate::config::resolve(&config)
1399 .map_err(AppError::Config)?
1400 .with_package_id(&package_id);
1401 {
1402 state
1403 .package_models
1404 .write()
1405 .map_err(|_| AppError::BadRequest("state lock".into()))?
1406 .insert(format!("{}:{}", package_id, tenant_id), model);
1407 }
1408
1409 Ok((
1410 axum::http::StatusCode::OK,
1411 Json(crate::response::SuccessOne {
1412 data: serde_json::json!({
1413 "tenant_id": tenant_id,
1414 "package_id": package_id,
1415 "status": "bootstrapped",
1416 }),
1417 meta: None,
1418 }),
1419 ))
1420}
1421
1422fn build_full_config_from_values(
1426 values: &std::collections::HashMap<String, Vec<Value>>,
1427) -> Result<crate::config::FullConfig, AppError> {
1428 fn parse_kind<T: serde::de::DeserializeOwned>(
1429 values: &std::collections::HashMap<String, Vec<Value>>,
1430 key: &str,
1431 ) -> Result<Vec<T>, AppError> {
1432 let arr = values.get(key).cloned().unwrap_or_default();
1433 arr.into_iter()
1434 .map(|v| {
1435 serde_json::from_value(v)
1436 .map_err(|e| AppError::BadRequest(format!("{} parse error: {}", key, e)))
1437 })
1438 .collect()
1439 }
1440
1441 Ok(crate::config::FullConfig {
1442 schemas: parse_kind(values, "schemas")?,
1443 enums: parse_kind(values, "enums")?,
1444 tables: parse_kind(values, "tables")?,
1445 columns: parse_kind(values, "columns")?,
1446 indexes: parse_kind(values, "indexes")?,
1447 relationships: parse_kind(values, "relationships")?,
1448 api_entities: parse_kind(values, "api_entities")?,
1449 kv_stores: parse_kind(values, "kv_stores")?,
1450 })
1451}