1use crate::config::types::ColumnTypeConfig;
26use crate::config::ResolvedEntity;
27use crate::db::{parse_canonical, CanonicalType, Dialect};
28use crate::error::AppError;
29use crate::store::qualified_sys_table;
30use serde::Deserialize;
31use serde_json::Value;
32use std::collections::HashMap;
33
34pub const REGISTRY_NAMESPACE: &str = "__extensible_fields__";
36
37pub const REGISTRY_CACHE_TTL: std::time::Duration = std::time::Duration::from_secs(60);
41
42#[derive(Clone)]
44pub struct CachedRegistry {
45 pub registry: ExtensibleRegistry,
46 pub loaded_at: std::time::Instant,
47}
48
49pub type RegistryCache = std::sync::Arc<
52 std::sync::RwLock<std::collections::HashMap<(String, String, String), CachedRegistry>>,
53>;
54
55fn default_true() -> bool {
56 true
57}
58
59#[derive(Clone, Debug, Deserialize)]
61#[serde(rename_all = "camelCase")]
62pub struct ExtensibleFieldDef {
63 pub key: String,
65 #[serde(rename = "type")]
68 pub type_: String,
69 #[serde(default)]
71 pub required: bool,
72 #[serde(default = "default_true")]
74 pub filterable: bool,
75 #[serde(default = "default_true")]
77 pub sortable: bool,
78 #[serde(default)]
80 pub allowed: Option<Vec<Value>>,
81 #[serde(default)]
83 pub min: Option<f64>,
84 #[serde(default)]
86 pub max: Option<f64>,
87 #[serde(default)]
89 pub max_length: Option<u32>,
90 #[serde(default)]
92 pub min_length: Option<u32>,
93 #[serde(default)]
95 pub pattern: Option<String>,
96}
97
98impl ExtensibleFieldDef {
99 pub fn canonical(&self) -> CanonicalType {
101 parse_canonical(&ColumnTypeConfig::Simple(self.type_.clone()))
102 }
103}
104
105#[derive(Clone, Debug, Default)]
107pub struct ExtensibleRegistry {
108 bags: HashMap<String, HashMap<String, ExtensibleFieldDef>>,
109}
110
111impl ExtensibleRegistry {
112 pub fn is_empty(&self) -> bool {
114 self.bags.is_empty()
115 }
116
117 pub fn bag(&self, column: &str) -> Option<&HashMap<String, ExtensibleFieldDef>> {
119 self.bags.get(column)
120 }
121
122 pub fn field(&self, column: &str, key: &str) -> Option<&ExtensibleFieldDef> {
124 self.bags.get(column).and_then(|b| b.get(key))
125 }
126
127 pub fn from_value(v: Value) -> Result<Self, AppError> {
129 let raw: HashMap<String, Vec<ExtensibleFieldDef>> =
130 serde_json::from_value(v).map_err(|e| {
131 AppError::Validation(format!("invalid extensible-fields registry: {}", e))
132 })?;
133 let mut bags = HashMap::new();
134 for (column, defs) in raw {
135 let mut by_key = HashMap::new();
136 for def in defs {
137 by_key.insert(def.key.clone(), def);
138 }
139 bags.insert(column, by_key);
140 }
141 Ok(ExtensibleRegistry { bags })
142 }
143}
144
145pub async fn load_registry(
150 pool: &crate::db::pool::Pool,
151 dialect: &dyn Dialect,
152 tenant_id: &str,
153 package_id: &str,
154 path_segment: &str,
155) -> Result<ExtensibleRegistry, AppError> {
156 let q_table = qualified_sys_table("_sys_kv_data");
157 let sql = format!(
158 "SELECT value FROM {} WHERE tenant_id = {} AND package_id = {} AND namespace = {} AND key = {}",
159 q_table,
160 dialect.placeholder(1),
161 dialect.placeholder(2),
162 dialect.placeholder(3),
163 dialect.placeholder(4),
164 );
165 let row: Option<(Value,)> = sqlx::query_as(&sql)
166 .bind(tenant_id)
167 .bind(package_id)
168 .bind(REGISTRY_NAMESPACE)
169 .bind(path_segment)
170 .fetch_optional(pool)
171 .await?;
172 match row {
173 Some((v,)) => ExtensibleRegistry::from_value(v),
174 None => Ok(ExtensibleRegistry::default()),
175 }
176}
177
178pub async fn load_registry_raw(
182 pool: &crate::db::pool::Pool,
183 dialect: &dyn Dialect,
184 tenant_id: &str,
185 package_id: &str,
186 path_segment: &str,
187) -> Result<Option<Value>, AppError> {
188 let q_table = qualified_sys_table("_sys_kv_data");
189 let sql = format!(
190 "SELECT value FROM {} WHERE tenant_id = {} AND package_id = {} AND namespace = {} AND key = {}",
191 q_table,
192 dialect.placeholder(1),
193 dialect.placeholder(2),
194 dialect.placeholder(3),
195 dialect.placeholder(4),
196 );
197 let row: Option<(Value,)> = sqlx::query_as(&sql)
198 .bind(tenant_id)
199 .bind(package_id)
200 .bind(REGISTRY_NAMESPACE)
201 .bind(path_segment)
202 .fetch_optional(pool)
203 .await?;
204 Ok(row.map(|(v,)| v))
205}
206
207pub async fn store_registry(
211 pool: &crate::db::pool::Pool,
212 dialect: &dyn Dialect,
213 tenant_id: &str,
214 package_id: &str,
215 path_segment: &str,
216 value: &Value,
217) -> Result<(), AppError> {
218 let q_table = qualified_sys_table("_sys_kv_data");
219 let now = dialect.now_fn();
220 let (p1, p2, p3, p4, p5) = (
221 dialect.placeholder(1),
222 dialect.placeholder(2),
223 dialect.placeholder(3),
224 dialect.placeholder(4),
225 dialect.placeholder(5),
226 );
227
228 let update_sql = format!(
233 "UPDATE {tbl} SET value = {p1}, updated_at = {now} \
234 WHERE tenant_id = {p2} AND package_id = {p3} AND namespace = {p4} AND key = {p5}",
235 tbl = q_table,
236 );
237 let affected = sqlx::query(&update_sql)
238 .bind(value)
239 .bind(tenant_id)
240 .bind(package_id)
241 .bind(REGISTRY_NAMESPACE)
242 .bind(path_segment)
243 .execute(pool)
244 .await?
245 .rows_affected();
246
247 if affected == 0 {
248 let insert_sql = format!(
249 "INSERT INTO {tbl} (tenant_id, package_id, namespace, key, value, updated_at) \
250 VALUES ({p1}, {p2}, {p3}, {p4}, {p5}, {now})",
251 tbl = q_table,
252 );
253 sqlx::query(&insert_sql)
254 .bind(tenant_id)
255 .bind(package_id)
256 .bind(REGISTRY_NAMESPACE)
257 .bind(path_segment)
258 .bind(value)
259 .execute(pool)
260 .await?;
261 }
262 Ok(())
263}
264
265pub async fn delete_registry(
267 pool: &crate::db::pool::Pool,
268 dialect: &dyn Dialect,
269 tenant_id: &str,
270 package_id: &str,
271 path_segment: &str,
272) -> Result<bool, AppError> {
273 let q_table = qualified_sys_table("_sys_kv_data");
274 let sql = format!(
275 "DELETE FROM {} WHERE tenant_id = {} AND package_id = {} AND namespace = {} AND key = {}",
276 q_table,
277 dialect.placeholder(1),
278 dialect.placeholder(2),
279 dialect.placeholder(3),
280 dialect.placeholder(4),
281 );
282 let result = sqlx::query(&sql)
283 .bind(tenant_id)
284 .bind(package_id)
285 .bind(REGISTRY_NAMESPACE)
286 .bind(path_segment)
287 .execute(pool)
288 .await?;
289 Ok(result.rows_affected() > 0)
290}
291
292pub fn validate_registry_document(
296 value: &Value,
297 extensible_columns: &[String],
298 path_segment: &str,
299) -> Result<ExtensibleRegistry, AppError> {
300 let obj = value.as_object().ok_or_else(|| {
301 AppError::Validation(
302 "registry must be a JSON object mapping column name -> field definitions".into(),
303 )
304 })?;
305 for column in obj.keys() {
306 if !extensible_columns.iter().any(|c| c == column) {
307 return Err(AppError::Validation(format!(
308 "'{}' is not an extensible column on '{}' (declare it with \"extensible\": true)",
309 column, path_segment
310 )));
311 }
312 }
313 ExtensibleRegistry::from_value(value.clone())
315}
316
317pub fn index_ddl(
330 schema: &str,
331 table: &str,
332 registry: &ExtensibleRegistry,
333 dialect: &dyn Dialect,
334 rls_predicate: Option<(&str, &str)>,
335) -> Vec<String> {
336 let qualified = if dialect.supports_schemas() {
337 format!(
338 "{}.{}",
339 dialect.quote_ident(schema),
340 dialect.quote_ident(table)
341 )
342 } else {
343 dialect.quote_ident(table)
344 };
345 let if_not_exists = if dialect.name() == "mysql" {
346 ""
347 } else {
348 "IF NOT EXISTS "
349 };
350 let where_clause = rls_predicate
351 .map(|(col, tid)| {
352 format!(
353 " WHERE {} = '{}'",
354 dialect.quote_ident(col),
355 tid.replace('\'', "''")
356 )
357 })
358 .unwrap_or_default();
359
360 let mut out = Vec::new();
361 let mut columns: Vec<&String> = registry.bags.keys().collect();
363 columns.sort();
364 for column in columns {
365 let bag = ®istry.bags[column];
366 let mut keys: Vec<&String> = bag.keys().collect();
367 keys.sort();
368 for key in keys {
369 let def = &bag[key];
370 if !def.filterable && !def.sortable {
371 continue;
372 }
373 let canonical = def.canonical();
374 let expr = dialect.json_extract_typed(&dialect.quote_ident(column), key, &canonical);
375 let tenant_suffix = rls_predicate.map(|(_, tid)| tid).unwrap_or("");
376 let name = index_name(table, column, key, tenant_suffix);
377 out.push(format!(
378 "CREATE INDEX {}{} ON {} ({}){}",
379 if_not_exists,
380 dialect.quote_ident(&name),
381 qualified,
382 expr,
383 where_clause
384 ));
385 }
386 }
387 out
388}
389
390fn index_name(table: &str, column: &str, key: &str, tenant: &str) -> String {
392 let mut raw = format!("xf_{}_{}_{}", table, column, key);
393 if !tenant.is_empty() {
394 raw.push('_');
395 raw.push_str(tenant);
396 }
397 let mut s: String = raw
398 .chars()
399 .map(|c| {
400 if c.is_ascii_alphanumeric() {
401 c.to_ascii_lowercase()
402 } else {
403 '_'
404 }
405 })
406 .collect();
407 if s.len() > 60 {
409 s.truncate(60);
410 }
411 s
412}
413
414pub async fn apply_indexes(
418 pool: &crate::db::pool::Pool,
419 statements: &[String],
420) -> (Vec<String>, Vec<(String, String)>) {
421 let mut applied = Vec::new();
422 let mut errors = Vec::new();
423 for stmt in statements {
424 match sqlx::query(stmt).execute(pool).await {
425 Ok(_) => applied.push(stmt.clone()),
426 Err(e) => errors.push((stmt.clone(), e.to_string())),
427 }
428 }
429 (applied, errors)
430}
431
432#[derive(Clone, Copy, PartialEq, Eq)]
435pub enum ValidateMode {
436 Full,
437 Partial,
438}
439
440pub fn validate_extensible_fields(
444 body: &HashMap<String, Value>,
445 entity: &ResolvedEntity,
446 registry: &ExtensibleRegistry,
447 mode: ValidateMode,
448) -> Result<(), AppError> {
449 for column in &entity.extensible_columns {
450 let present = body.get(column);
451
452 let obj = match present {
454 Some(Value::Null) | None => None,
455 Some(Value::Object(o)) => Some(o),
456 Some(_) => {
457 return Err(AppError::Validation(format!(
458 "extensible-fields column '{}' must be a JSON object",
459 column
460 )))
461 }
462 };
463
464 let bag = registry.bag(column);
465
466 if obj.is_some_and(|o| !o.is_empty()) && bag.is_none() {
469 return Err(AppError::Validation(format!(
470 "no extensible-field registry declared for column '{}' (namespace '{}', key '{}')",
471 column, REGISTRY_NAMESPACE, entity.path_segment
472 )));
473 }
474
475 if let (Some(o), Some(bag)) = (obj, bag) {
477 for (key, val) in o {
478 let def = bag.get(key).ok_or_else(|| {
479 AppError::Validation(format!("unknown extensible field '{}.{}'", column, key))
480 })?;
481 validate_one(column, def, val)?;
482 }
483 }
484
485 if mode == ValidateMode::Full {
487 if let Some(bag) = bag {
488 for def in bag.values().filter(|d| d.required) {
489 let provided = obj.and_then(|o| o.get(&def.key));
490 if matches!(provided, None | Some(Value::Null)) {
491 return Err(AppError::Validation(format!(
492 "missing required extensible field '{}.{}'",
493 column, def.key
494 )));
495 }
496 }
497 }
498 }
499 }
500 Ok(())
501}
502
503fn validate_one(column: &str, def: &ExtensibleFieldDef, val: &Value) -> Result<(), AppError> {
504 if val.is_null() {
505 return Ok(());
506 }
507 let label = format!("{}.{}", column, def.key);
508 let canonical = def.canonical();
509 let category = crate::db::type_category(&canonical);
510 use crate::db::TypeCategory;
511
512 match category {
513 TypeCategory::Int | TypeCategory::Float => {
514 let n = val.as_f64().ok_or_else(|| {
515 AppError::Validation(format!("extensible field '{}' must be a number", label))
516 })?;
517 if category == TypeCategory::Int && val.as_i64().is_none() && n.fract() != 0.0 {
518 return Err(AppError::Validation(format!(
519 "extensible field '{}' must be an integer",
520 label
521 )));
522 }
523 if let Some(min) = def.min {
524 if n < min {
525 return Err(AppError::Validation(format!(
526 "extensible field '{}' must be >= {}",
527 label, min
528 )));
529 }
530 }
531 if let Some(max) = def.max {
532 if n > max {
533 return Err(AppError::Validation(format!(
534 "extensible field '{}' must be <= {}",
535 label, max
536 )));
537 }
538 }
539 }
540 TypeCategory::Bool => {
541 if !val.is_boolean() {
542 return Err(AppError::Validation(format!(
543 "extensible field '{}' must be a boolean",
544 label
545 )));
546 }
547 }
548 TypeCategory::Text
549 | TypeCategory::Uuid
550 | TypeCategory::Date
551 | TypeCategory::Timestamp
552 | TypeCategory::Time => {
553 let s = val.as_str().ok_or_else(|| {
554 AppError::Validation(format!("extensible field '{}' must be a string", label))
555 })?;
556 if let Some(maxl) = def.max_length {
557 if s.chars().count() > maxl as usize {
558 return Err(AppError::Validation(format!(
559 "extensible field '{}' exceeds max length {}",
560 label, maxl
561 )));
562 }
563 }
564 if let Some(minl) = def.min_length {
565 if s.chars().count() < minl as usize {
566 return Err(AppError::Validation(format!(
567 "extensible field '{}' is shorter than min length {}",
568 label, minl
569 )));
570 }
571 }
572 if let Some(pat) = &def.pattern {
573 let re = regex::Regex::new(pat).map_err(|e| {
574 AppError::Validation(format!(
575 "extensible field '{}' has an invalid pattern: {}",
576 label, e
577 ))
578 })?;
579 if !re.is_match(s) {
580 return Err(AppError::Validation(format!(
581 "extensible field '{}' does not match required pattern",
582 label
583 )));
584 }
585 }
586 }
587 _ => {}
589 }
590
591 if let Some(allowed) = &def.allowed {
592 if !allowed.iter().any(|a| a == val) {
593 return Err(AppError::Validation(format!(
594 "extensible field '{}' has a value that is not allowed",
595 label
596 )));
597 }
598 }
599 Ok(())
600}
601
602#[cfg(test)]
603mod tests {
604 use super::*;
605 use crate::config::resolved::{PkType, ResolvedEntity};
606 use serde_json::json;
607 use std::collections::HashSet;
608
609 fn entity_with_bag(column: &str) -> ResolvedEntity {
610 ResolvedEntity {
611 table_id: "t".into(),
612 schema_name: "s".into(),
613 table_name: "products".into(),
614 path_segment: "products".into(),
615 pk_columns: vec!["id".into()],
616 pk_type: PkType::Uuid,
617 columns: vec![],
618 operations: vec![],
619 sensitive_columns: HashSet::new(),
620 includes: vec![],
621 validation: HashMap::new(),
622 events: vec![],
623 archive_field: None,
624 package_id: "_default".into(),
625 audit_log: false,
626 parent_ref_column: None,
627 versioning: None,
628 mcp: None,
629 extensible_columns: vec![column.into()],
630 }
631 }
632
633 fn registry() -> ExtensibleRegistry {
634 ExtensibleRegistry::from_value(json!({
635 "attributes": [
636 {"key": "warrantyMonths", "type": "int", "min": 0, "required": true},
637 {"key": "energyRating", "type": "text", "maxLength": 3},
638 {"key": "notes", "type": "text", "sortable": false, "filterable": false}
639 ]
640 }))
641 .unwrap()
642 }
643
644 fn body(attrs: Value) -> HashMap<String, Value> {
645 let mut m = HashMap::new();
646 m.insert("attributes".to_string(), attrs);
647 m
648 }
649
650 #[test]
651 fn accepts_valid_extensible_fields() {
652 let e = entity_with_bag("attributes");
653 let reg = registry();
654 let b = body(json!({"warrantyMonths": 24, "energyRating": "A++"}));
655 assert!(validate_extensible_fields(&b, &e, ®, ValidateMode::Full).is_ok());
656 }
657
658 #[test]
659 fn rejects_unknown_key() {
660 let e = entity_with_bag("attributes");
661 let reg = registry();
662 let b = body(json!({"warrantyMonths": 24, "bogus": 1}));
663 let err = validate_extensible_fields(&b, &e, ®, ValidateMode::Partial).unwrap_err();
664 assert!(format!("{:?}", err).contains("unknown extensible field"));
665 }
666
667 #[test]
668 fn rejects_type_mismatch_and_bounds() {
669 let e = entity_with_bag("attributes");
670 let reg = registry();
671 let b = body(json!({"warrantyMonths": "x"}));
673 assert!(validate_extensible_fields(&b, &e, ®, ValidateMode::Partial).is_err());
674 let b = body(json!({"warrantyMonths": -1}));
676 assert!(validate_extensible_fields(&b, &e, ®, ValidateMode::Partial).is_err());
677 let b = body(json!({"energyRating": "TOOLONG"}));
679 assert!(validate_extensible_fields(&b, &e, ®, ValidateMode::Partial).is_err());
680 }
681
682 #[test]
683 fn enforces_required_on_create_only() {
684 let e = entity_with_bag("attributes");
685 let reg = registry();
686 let b = body(json!({"energyRating": "A"}));
687 assert!(validate_extensible_fields(&b, &e, ®, ValidateMode::Full).is_err());
689 assert!(validate_extensible_fields(&b, &e, ®, ValidateMode::Partial).is_ok());
691 }
692
693 #[test]
694 fn rejects_extensible_fields_without_registry() {
695 let e = entity_with_bag("attributes");
696 let empty = ExtensibleRegistry::default();
697 let b = body(json!({"warrantyMonths": 24}));
698 assert!(validate_extensible_fields(&b, &e, &empty, ValidateMode::Partial).is_err());
699 }
700
701 #[test]
702 fn ignores_absent_bag_when_no_required() {
703 let mut e = entity_with_bag("attributes");
704 e.extensible_columns = vec!["other".into()]; let reg = registry();
706 let b: HashMap<String, Value> = HashMap::new();
707 assert!(validate_extensible_fields(&b, &e, ®, ValidateMode::Partial).is_ok());
708 }
709
710 #[test]
713 fn registry_document_accepts_known_columns() {
714 let cols = vec!["attributes".to_string(), "specs".to_string()];
715 let doc = json!({
716 "attributes": [{"key": "warrantyMonths", "type": "int"}],
717 "specs": [{"key": "voltage", "type": "decimal"}]
718 });
719 assert!(validate_registry_document(&doc, &cols, "products").is_ok());
720 }
721
722 #[test]
723 fn registry_document_rejects_unknown_column() {
724 let cols = vec!["attributes".to_string()];
725 let doc = json!({ "not_a_bag": [{"key": "x", "type": "int"}] });
726 let err = validate_registry_document(&doc, &cols, "products").unwrap_err();
727 assert!(format!("{:?}", err).contains("not an extensible column"));
728 }
729
730 #[test]
731 fn registry_document_rejects_non_object() {
732 let cols = vec!["attributes".to_string()];
733 assert!(validate_registry_document(&json!([1, 2, 3]), &cols, "products").is_err());
734 }
735
736 #[test]
737 fn registry_document_rejects_malformed_def() {
738 let cols = vec!["attributes".to_string()];
739 let doc = json!({ "attributes": [{"key": "warrantyMonths"}] });
741 assert!(validate_registry_document(&doc, &cols, "products").is_err());
742 }
743
744 fn index_registry() -> ExtensibleRegistry {
747 ExtensibleRegistry::from_value(json!({
748 "attributes": [
749 {"key": "warrantyMonths", "type": "int", "filterable": true, "sortable": true},
750 {"key": "internalNote", "type": "text", "filterable": false, "sortable": false}
751 ]
752 }))
753 .unwrap()
754 }
755
756 #[test]
757 fn index_ddl_covers_only_queryable_fields() {
758 let dialect = crate::db::active_dialect();
759 let stmts = index_ddl(
760 "main",
761 "products",
762 &index_registry(),
763 dialect.as_ref(),
764 None,
765 );
766 assert_eq!(stmts.len(), 1, "got: {:?}", stmts);
768 assert!(stmts[0].contains("CREATE INDEX"));
769 assert!(stmts[0].contains("warrantyMonths"), "got: {}", stmts[0]);
770 assert!(
771 !stmts[0].contains("internalNote"),
772 "non-queryable field must not be indexed"
773 );
774 }
775
776 #[test]
777 fn index_ddl_adds_partial_predicate_for_rls() {
778 let dialect = crate::db::active_dialect();
779 let stmts = index_ddl(
780 "main",
781 "products",
782 &index_registry(),
783 dialect.as_ref(),
784 Some(("tenant_id", "acme")),
785 );
786 assert_eq!(stmts.len(), 1);
787 assert!(stmts[0].contains("WHERE"), "got: {}", stmts[0]);
788 assert!(stmts[0].contains("acme"), "got: {}", stmts[0]);
789 }
790
791 #[test]
792 fn index_ddl_escapes_tenant_in_predicate() {
793 let dialect = crate::db::active_dialect();
794 let stmts = index_ddl(
795 "main",
796 "products",
797 &index_registry(),
798 dialect.as_ref(),
799 Some(("tenant_id", "a'b")),
800 );
801 assert!(stmts[0].contains("'a''b'"), "got: {}", stmts[0]);
802 }
803
804 #[test]
807 fn registry_cache_insert_get_evict_and_ttl() {
808 let cache: RegistryCache = Default::default();
809 let key = (
810 "acme".to_string(),
811 "_default".to_string(),
812 "products".to_string(),
813 );
814 let entry = CachedRegistry {
815 registry: index_registry(),
816 loaded_at: std::time::Instant::now(),
817 };
818 cache.write().unwrap().insert(key.clone(), entry);
819
820 {
822 let c = cache.read().unwrap();
823 let got = c.get(&key).expect("entry present");
824 assert!(got.loaded_at.elapsed() < REGISTRY_CACHE_TTL);
825 assert!(got.registry.field("attributes", "warrantyMonths").is_some());
826 }
827
828 cache.write().unwrap().remove(&key);
830 assert!(cache.read().unwrap().get(&key).is_none());
831 }
832}