1use crate::storage::schema::Value;
40use crate::storage::unified::{EntityData, UnifiedStore};
41use crate::utils::json::{parse_json, JsonValue};
42
43use std::time::{SystemTime, UNIX_EPOCH};
44
/// Config key under which the whole registry is persisted.
///
/// `save` writes the serialized array of entries under this key, and
/// `read_latest_registry_json` looks for rows whose `key` column equals it.
const REGISTRY_KEY: &str = "red.analytics.schema_registry.entries_json";
46
/// One registered version of an event schema, as stored in the registry.
#[derive(Debug, Clone, PartialEq)]
pub struct SchemaEntry {
    /// Name of the analytics event this schema describes.
    pub event_name: String,
    /// Version number; `register` assigns 1 to the first schema of an event
    /// and the previous highest version plus one afterwards.
    pub version: u32,
    /// The schema text exactly as it was passed to `register`.
    pub schema_json: String,
    /// Registration time in milliseconds since the Unix epoch.
    pub registered_at_ms: u128,
}
55
/// Reasons `register` can refuse a schema.
#[derive(Debug, Clone, PartialEq)]
pub enum SchemaError {
    /// The schema text is not parseable JSON; carries the parser's message.
    InvalidSchemaJson(String),
    /// The schema parsed but does not have the accepted shape (object with
    /// `type: "object"`, object `properties`, string-array `required`).
    /// Also returned when the previously stored schema can no longer be parsed.
    InvalidSchemaShape(String),
    /// The candidate schema is not backward compatible with the latest
    /// registered version of the same event.
    BreakingChange {
        /// Event whose schema was being re-registered.
        event_name: String,
        /// Version the candidate was compared against.
        previous_version: u32,
        /// Every incompatibility that was detected.
        offenders: Vec<BreakingChange>,
    },
}
73
/// A single backward-incompatible difference between two schema versions.
#[derive(Debug, Clone, PartialEq)]
pub enum BreakingChange {
    /// A field disappeared and a new field with the same declared type
    /// appeared; the pair is reported as a rename.
    Rename { from: String, to: String },
    /// A field kept its name but its declared type changed.
    Retype {
        field: String,
        from: String,
        to: String,
    },
    /// A field was removed without a same-typed replacement.
    Drop { field: String },
    /// A field became required, or a brand-new field was added as required.
    RequiredAdd { field: String },
}

impl BreakingChange {
    /// Renders the change as a short human-readable sentence fragment.
    pub fn describe(&self) -> String {
        match self {
            Self::Drop { field } => format!("dropped field '{field}'"),
            Self::RequiredAdd { field } => format!("required-add for field '{field}'"),
            Self::Rename { from, to } => format!("renamed field '{from}' to '{to}'"),
            Self::Retype { field, from, to } => {
                format!("retyped field '{field}' from {from} to {to}")
            }
        }
    }
}
113
/// Reasons `validate` can reject an event payload.
#[derive(Debug, Clone, PartialEq)]
pub enum ValidationError {
    /// No schema has been registered under the given event name.
    UnknownEventName(String),
    /// The payload text is not parseable JSON. `validate` also uses this
    /// variant (with a "schema corrupt" prefix) when the stored schema itself
    /// fails to parse.
    InvalidPayloadJson(String),
    /// The payload parsed, but its top-level value is not a JSON object.
    PayloadNotObject,
    /// A field listed in the schema's `required` array is absent.
    MissingRequiredField {
        event_name: String,
        /// Schema version the payload was checked against.
        version: u32,
        field: String,
    },
    /// The payload contains a key that is not declared in `properties`.
    UnknownField {
        event_name: String,
        /// Schema version the payload was checked against.
        version: u32,
        field: String,
    },
    /// A field's value does not match the type declared for it.
    TypeMismatch {
        event_name: String,
        /// Schema version the payload was checked against.
        version: u32,
        field: String,
        /// Type name declared in the schema.
        expected: String,
        /// JSON type name of the value actually supplied.
        got: String,
    },
}
146
/// Current wall-clock time in milliseconds since the Unix epoch.
///
/// Returns 0 when the system clock reports a time before the epoch.
fn now_ms() -> u128 {
    match SystemTime::now().duration_since(UNIX_EPOCH) {
        Ok(elapsed) => elapsed.as_millis(),
        Err(_) => 0,
    }
}
153
154fn read_latest_registry_json(store: &UnifiedStore) -> Option<String> {
160 let manager = store.get_collection("red_config")?;
161 let mut all = manager.query_all(|_| true);
162 all.sort_by_key(|b| std::cmp::Reverse(b.id.raw()));
163 for entity in all {
164 let EntityData::Row(row) = &entity.data else {
165 continue;
166 };
167 let Some(named) = &row.named else { continue };
168 let matches = matches!(
169 named.get("key"),
170 Some(Value::Text(s)) if s.as_ref() == REGISTRY_KEY
171 );
172 if matches {
173 if let Some(Value::Text(s)) = named.get("value") {
174 return Some(s.to_string());
175 }
176 }
177 }
178 None
179}
180
181fn load(store: &UnifiedStore) -> Vec<SchemaEntry> {
182 let raw = match read_latest_registry_json(store) {
183 Some(s) => s,
184 None => return Vec::new(),
185 };
186 let Ok(parsed) = parse_json(&raw) else {
187 return Vec::new();
188 };
189 let Some(arr) = parsed.as_array() else {
190 return Vec::new();
191 };
192 let mut out = Vec::with_capacity(arr.len());
193 for item in arr {
194 let Some(obj) = item.as_object() else {
195 continue;
196 };
197 let lookup = |k: &str| obj.iter().find(|(key, _)| key == k).map(|(_, v)| v);
198 let Some(event_name) = lookup("event_name").and_then(JsonValue::as_str) else {
199 continue;
200 };
201 let Some(version) = lookup("version").and_then(JsonValue::as_f64) else {
202 continue;
203 };
204 let Some(schema_json) = lookup("schema_json").and_then(JsonValue::as_str) else {
205 continue;
206 };
207 let Some(registered_at_ms) = lookup("registered_at_ms").and_then(JsonValue::as_f64) else {
208 continue;
209 };
210 out.push(SchemaEntry {
211 event_name: event_name.to_string(),
212 version: version as u32,
213 schema_json: schema_json.to_string(),
214 registered_at_ms: registered_at_ms as u128,
215 });
216 }
217 out
218}
219
220fn entry_to_json(e: &SchemaEntry) -> crate::serde_json::Value {
221 let mut obj = crate::serde_json::Map::new();
222 obj.insert(
223 "event_name".to_string(),
224 crate::serde_json::Value::String(e.event_name.clone()),
225 );
226 obj.insert(
227 "version".to_string(),
228 crate::serde_json::Value::Number(e.version as f64),
229 );
230 obj.insert(
231 "schema_json".to_string(),
232 crate::serde_json::Value::String(e.schema_json.clone()),
233 );
234 obj.insert(
235 "registered_at_ms".to_string(),
236 crate::serde_json::Value::Number(e.registered_at_ms as f64),
237 );
238 crate::serde_json::Value::Object(obj)
239}
240
241fn save(store: &UnifiedStore, entries: &[SchemaEntry]) {
242 let arr = crate::serde_json::Value::Array(entries.iter().map(entry_to_json).collect());
243 let wrapped = crate::serde_json::Value::String(arr.to_string());
247 store.set_config_tree(REGISTRY_KEY, &wrapped);
248}
249
250fn validate_schema_shape(schema_json: &str) -> Result<JsonValue, SchemaError> {
254 let parsed =
255 parse_json(schema_json).map_err(|err| SchemaError::InvalidSchemaJson(err.to_string()))?;
256 let Some(obj) = parsed.as_object() else {
257 return Err(SchemaError::InvalidSchemaShape(
258 "schema must be a JSON object".to_string(),
259 ));
260 };
261 let lookup = |k: &str| obj.iter().find(|(key, _)| key == k).map(|(_, v)| v);
262 match lookup("type").and_then(JsonValue::as_str) {
263 Some("object") => {}
264 Some(other) => {
265 return Err(SchemaError::InvalidSchemaShape(format!(
266 "schema `type` must be \"object\", got \"{other}\""
267 )));
268 }
269 None => {
270 return Err(SchemaError::InvalidSchemaShape(
271 "schema must declare `type`".to_string(),
272 ));
273 }
274 }
275 if let Some(props) = lookup("properties") {
276 if props.as_object().is_none() {
277 return Err(SchemaError::InvalidSchemaShape(
278 "schema `properties` must be an object".to_string(),
279 ));
280 }
281 }
282 if let Some(req) = lookup("required") {
283 let Some(arr) = req.as_array() else {
284 return Err(SchemaError::InvalidSchemaShape(
285 "schema `required` must be an array of strings".to_string(),
286 ));
287 };
288 for item in arr {
289 if item.as_str().is_none() {
290 return Err(SchemaError::InvalidSchemaShape(
291 "schema `required` must be an array of strings".to_string(),
292 ));
293 }
294 }
295 }
296 Ok(parsed)
297}
298
299pub fn register(
307 store: &UnifiedStore,
308 event_name: &str,
309 schema_json: &str,
310) -> Result<u32, SchemaError> {
311 let candidate = validate_schema_shape(schema_json)?;
312 let mut entries = load(store);
313
314 let previous = entries
315 .iter()
316 .filter(|e| e.event_name == event_name)
317 .max_by_key(|e| e.version)
318 .cloned();
319
320 let next_version = match previous {
321 None => 1,
322 Some(prev) => {
323 let prev_schema = parse_json(&prev.schema_json).map_err(|e| {
324 SchemaError::InvalidSchemaShape(format!(
325 "previously registered schema for {event_name} v{} is corrupt: {e}",
326 prev.version
327 ))
328 })?;
329 let offenders = diff_for_breaking_changes(&prev_schema, &candidate);
330 if !offenders.is_empty() {
331 return Err(SchemaError::BreakingChange {
332 event_name: event_name.to_string(),
333 previous_version: prev.version,
334 offenders,
335 });
336 }
337 prev.version + 1
338 }
339 };
340
341 entries.push(SchemaEntry {
342 event_name: event_name.to_string(),
343 version: next_version,
344 schema_json: schema_json.to_string(),
345 registered_at_ms: now_ms(),
346 });
347 save(store, &entries);
348 Ok(next_version)
349}
350
351fn schema_fields(schema: &JsonValue) -> Vec<(String, String, bool)> {
355 let Some(obj) = schema.as_object() else {
356 return Vec::new();
357 };
358 let properties: &[(String, JsonValue)] = obj
359 .iter()
360 .find(|(k, _)| k == "properties")
361 .and_then(|(_, v)| v.as_object())
362 .unwrap_or(&[]);
363 let required: Vec<&str> = obj
364 .iter()
365 .find(|(k, _)| k == "required")
366 .and_then(|(_, v)| v.as_array())
367 .map(|arr| arr.iter().filter_map(JsonValue::as_str).collect())
368 .unwrap_or_default();
369 properties
370 .iter()
371 .map(|(name, prop)| {
372 let ty = prop
373 .as_object()
374 .and_then(|entries| entries.iter().find(|(k, _)| k == "type"))
375 .and_then(|(_, v)| v.as_str())
376 .unwrap_or("")
377 .to_string();
378 let req = required.contains(&name.as_str());
379 (name.clone(), ty, req)
380 })
381 .collect()
382}
383
384fn diff_for_breaking_changes(prev: &JsonValue, next: &JsonValue) -> Vec<BreakingChange> {
392 let prev_fields = schema_fields(prev);
393 let next_fields = schema_fields(next);
394
395 let mut breaks = Vec::new();
396 let mut dropped: Vec<(String, String)> = Vec::new();
397 let mut added: Vec<(String, String, bool)> = Vec::new();
399
400 for (name, prev_type, prev_required) in &prev_fields {
401 match next_fields.iter().find(|(n, _, _)| n == name) {
402 Some((_, next_type, next_required)) => {
403 if prev_type != next_type && !prev_type.is_empty() && !next_type.is_empty() {
404 breaks.push(BreakingChange::Retype {
405 field: name.clone(),
406 from: prev_type.clone(),
407 to: next_type.clone(),
408 });
409 }
410 if !prev_required && *next_required {
411 breaks.push(BreakingChange::RequiredAdd {
412 field: name.clone(),
413 });
414 }
415 }
416 None => dropped.push((name.clone(), prev_type.clone())),
417 }
418 }
419
420 for (name, next_type, next_required) in &next_fields {
421 if prev_fields.iter().any(|(n, _, _)| n == name) {
422 continue;
423 }
424 added.push((name.clone(), next_type.clone(), *next_required));
425 }
426
427 for (drop_name, drop_type) in dropped {
432 let paired = added
433 .iter()
434 .position(|(_, ty, _)| ty == &drop_type && !drop_type.is_empty());
435 match paired {
436 Some(idx) => {
437 let (add_name, _, _) = added.remove(idx);
438 breaks.push(BreakingChange::Rename {
439 from: drop_name,
440 to: add_name,
441 });
442 }
443 None => breaks.push(BreakingChange::Drop { field: drop_name }),
444 }
445 }
446
447 for (name, _, required) in added {
450 if required {
451 breaks.push(BreakingChange::RequiredAdd { field: name });
452 }
453 }
454
455 breaks
456}
457
458pub fn latest(store: &UnifiedStore, event_name: &str) -> Option<(u32, String)> {
464 let entries = load(store);
465 entries
466 .into_iter()
467 .filter(|e| e.event_name == event_name)
468 .max_by_key(|e| e.version)
469 .map(|e| (e.version, e.schema_json))
470}
471
/// Returns every stored schema entry — all versions of all events — exactly
/// as `load` yields them.
pub fn list(store: &UnifiedStore) -> Vec<SchemaEntry> {
    load(store)
}
477
/// Maps a JSON value to the type name used in `TypeMismatch::got`.
///
/// Every number is reported as `"number"`; no separate `"integer"` name is
/// produced here.
fn json_type_name(v: &JsonValue) -> &'static str {
    match v {
        JsonValue::Null => "null",
        JsonValue::Bool(_) => "boolean",
        JsonValue::Number(_) => "number",
        JsonValue::String(_) => "string",
        JsonValue::Array(_) => "array",
        JsonValue::Object(_) => "object",
    }
}
488
489fn type_matches(expected: &str, got: &JsonValue) -> bool {
490 match expected {
491 "string" => matches!(got, JsonValue::String(_)),
492 "boolean" => matches!(got, JsonValue::Bool(_)),
493 "array" => matches!(got, JsonValue::Array(_)),
494 "object" => matches!(got, JsonValue::Object(_)),
495 "null" => matches!(got, JsonValue::Null),
496 "number" => matches!(got, JsonValue::Number(_)),
497 "integer" => match got {
498 JsonValue::Number(n) => *n == n.trunc(),
499 _ => false,
500 },
501 _ => false,
502 }
503}
504
505pub fn validate(
514 store: &UnifiedStore,
515 event_name: &str,
516 payload_json: &str,
517) -> Result<(), ValidationError> {
518 let Some((version, schema_json)) = latest(store, event_name) else {
519 return Err(ValidationError::UnknownEventName(event_name.to_string()));
520 };
521 let schema = parse_json(&schema_json)
522 .map_err(|e| ValidationError::InvalidPayloadJson(format!("schema corrupt: {e}")))?;
523 let payload =
524 parse_json(payload_json).map_err(|e| ValidationError::InvalidPayloadJson(e.to_string()))?;
525 let Some(payload_obj) = payload.as_object() else {
526 return Err(ValidationError::PayloadNotObject);
527 };
528 let schema_obj = schema.as_object().unwrap_or(&[]);
529 let properties: &[(String, JsonValue)] = schema_obj
530 .iter()
531 .find(|(k, _)| k == "properties")
532 .and_then(|(_, v)| v.as_object())
533 .unwrap_or(&[]);
534 let required: Vec<&str> = schema_obj
535 .iter()
536 .find(|(k, _)| k == "required")
537 .and_then(|(_, v)| v.as_array())
538 .map(|arr| arr.iter().filter_map(JsonValue::as_str).collect())
539 .unwrap_or_default();
540
541 for req in &required {
544 if !payload_obj.iter().any(|(k, _)| k == *req) {
545 return Err(ValidationError::MissingRequiredField {
546 event_name: event_name.to_string(),
547 version,
548 field: (*req).to_string(),
549 });
550 }
551 }
552 for (key, value) in payload_obj {
554 let Some((_, prop)) = properties.iter().find(|(k, _)| k == key) else {
555 return Err(ValidationError::UnknownField {
556 event_name: event_name.to_string(),
557 version,
558 field: key.clone(),
559 });
560 };
561 let expected_type = prop
562 .as_object()
563 .and_then(|entries| entries.iter().find(|(k, _)| k == "type"))
564 .and_then(|(_, v)| v.as_str())
565 .unwrap_or("");
566 if expected_type.is_empty() {
567 continue;
568 }
569 if !type_matches(expected_type, value) {
570 return Err(ValidationError::TypeMismatch {
571 event_name: event_name.to_string(),
572 version,
573 field: key.clone(),
574 expected: expected_type.to_string(),
575 got: json_type_name(value).to_string(),
576 });
577 }
578 }
579 Ok(())
580}
581
582pub fn validation_error_to_reddb(err: ValidationError) -> crate::api::RedDBError {
588 let body = match &err {
589 ValidationError::UnknownEventName(name) => {
590 format!("AnalyticsSchemaError:UnknownEventName:{name}")
591 }
592 ValidationError::InvalidPayloadJson(reason) => {
593 format!("AnalyticsSchemaError:InvalidPayloadJson:{reason}")
594 }
595 ValidationError::PayloadNotObject => "AnalyticsSchemaError:PayloadNotObject".to_string(),
596 ValidationError::MissingRequiredField {
597 event_name,
598 version,
599 field,
600 } => format!("AnalyticsSchemaError:MissingRequiredField:{event_name}:v{version}:{field}"),
601 ValidationError::UnknownField {
602 event_name,
603 version,
604 field,
605 } => format!("AnalyticsSchemaError:UnknownField:{event_name}:v{version}:{field}"),
606 ValidationError::TypeMismatch {
607 event_name,
608 version,
609 field,
610 expected,
611 got,
612 } => format!(
613 "AnalyticsSchemaError:TypeMismatch:{event_name}:v{version}:{field}:{expected}:{got}"
614 ),
615 };
616 crate::api::RedDBError::InvalidOperation(body)
617}
618
/// Unit tests for registration, compatibility diffing, payload validation
/// and error mapping. Each test starts from a fresh `UnifiedStore::new()`.
#[cfg(test)]
mod tests {
    use super::*;

    /// Fresh store for a single test.
    fn store() -> UnifiedStore {
        UnifiedStore::new()
    }

    /// Schema with one required string field and one optional integer field.
    const PAGE_VIEW_SCHEMA: &str = r#"{
        "type": "object",
        "properties": {
            "url": {"type": "string"},
            "user_id": {"type": "integer"}
        },
        "required": ["url"]
    }"#;

    // The first schema registered for an event is version 1 and is what `latest` returns.
    #[test]
    fn first_registration_is_version_1() {
        let s = store();
        let v = register(&s, "page_view", PAGE_VIEW_SCHEMA).expect("register ok");
        assert_eq!(v, 1);
        let (latest_v, _) = latest(&s, "page_view").expect("latest present");
        assert_eq!(latest_v, 1);
    }

    // Registering the same schema again is not deduplicated: it yields version 2.
    #[test]
    fn re_registering_identical_schema_bumps_to_next_version() {
        let s = store();
        register(&s, "page_view", PAGE_VIEW_SCHEMA).unwrap();
        let v = register(&s, "page_view", PAGE_VIEW_SCHEMA).expect("identical is additive");
        assert_eq!(v, 2);
    }

    /// Baseline schema with a single required numeric field.
    const PURCHASE_V1: &str =
        r#"{"type":"object","properties":{"amount":{"type":"number"}},"required":["amount"]}"#;

    // Adding a new optional field is accepted and becomes version 2.
    #[test]
    fn additive_optional_field_is_accepted_as_v2() {
        let s = store();
        register(&s, "purchase", PURCHASE_V1).unwrap();
        let v2 = register(
            &s,
            "purchase",
            r#"{"type":"object",
                "properties":{"amount":{"type":"number"},
                              "discount_code":{"type":"string"}},
                "required":["amount"]}"#,
        )
        .expect("optional add is additive");
        assert_eq!(v2, 2);
        let (latest_v, _) = latest(&s, "purchase").unwrap();
        assert_eq!(latest_v, 2);
    }

    // Extra keywords on a new optional property (here `default`) do not affect acceptance.
    #[test]
    fn additive_optional_field_with_default_is_accepted() {
        let s = store();
        register(&s, "purchase", PURCHASE_V1).unwrap();
        let v2 = register(
            &s,
            "purchase",
            r#"{"type":"object",
                "properties":{"amount":{"type":"number"},
                              "currency":{"type":"string","default":"USD"}},
                "required":["amount"]}"#,
        )
        .expect("optional add with default is additive");
        assert_eq!(v2, 2);
    }

    // Changing only `maxLength` leaves name, type and required-ness intact, so it is accepted.
    #[test]
    fn widening_string_max_length_is_accepted() {
        let s = store();
        register(
            &s,
            "ev",
            r#"{"type":"object","properties":{"name":{"type":"string","maxLength":32}},"required":["name"]}"#,
        )
        .unwrap();
        let v2 = register(
            &s,
            "ev",
            r#"{"type":"object","properties":{"name":{"type":"string","maxLength":128}},"required":["name"]}"#,
        )
        .expect("widening maxLength is additive");
        assert_eq!(v2, 2);
    }

    // A dropped field plus a same-typed new field is reported as a rename.
    #[test]
    fn breaking_rename_is_rejected() {
        let s = store();
        register(&s, "purchase", PURCHASE_V1).unwrap();
        let err = register(
            &s,
            "purchase",
            r#"{"type":"object","properties":{"total":{"type":"number"}},"required":["total"]}"#,
        )
        .unwrap_err();
        match err {
            SchemaError::BreakingChange {
                event_name,
                previous_version,
                offenders,
            } => {
                assert_eq!(event_name, "purchase");
                assert_eq!(previous_version, 1);
                assert!(
                    offenders.iter().any(|b| matches!(
                        b,
                        BreakingChange::Rename { from, to }
                            if from == "amount" && to == "total"
                    )),
                    "expected Rename(amount->total), got {offenders:?}"
                );
            }
            other => panic!("expected BreakingChange, got {other:?}"),
        }
    }

    // Same field name with a different declared type is reported as a retype.
    #[test]
    fn breaking_retype_is_rejected() {
        let s = store();
        register(&s, "purchase", PURCHASE_V1).unwrap();
        let err = register(
            &s,
            "purchase",
            r#"{"type":"object","properties":{"amount":{"type":"string"}},"required":["amount"]}"#,
        )
        .unwrap_err();
        let SchemaError::BreakingChange { offenders, .. } = err else {
            panic!("expected BreakingChange");
        };
        assert!(offenders.iter().any(|b| matches!(
            b,
            BreakingChange::Retype { field, from, to }
                if field == "amount" && from == "number" && to == "string"
        )));
    }

    // Removing a field with no same-typed replacement is reported as a drop.
    #[test]
    fn breaking_drop_is_rejected() {
        let s = store();
        register(
            &s,
            "ev",
            r#"{"type":"object",
                "properties":{"a":{"type":"number"},"b":{"type":"boolean"}},
                "required":["a"]}"#,
        )
        .unwrap();
        let err = register(
            &s,
            "ev",
            r#"{"type":"object","properties":{"a":{"type":"number"}},"required":["a"]}"#,
        )
        .unwrap_err();
        let SchemaError::BreakingChange { offenders, .. } = err else {
            panic!("expected BreakingChange");
        };
        assert!(offenders
            .iter()
            .any(|b| matches!(b, BreakingChange::Drop { field } if field == "b")));
    }

    // Promoting an existing optional field to required is reported as a required-add.
    #[test]
    fn breaking_optional_to_required_is_rejected() {
        let s = store();
        register(
            &s,
            "ev",
            r#"{"type":"object",
                "properties":{"a":{"type":"number"},"b":{"type":"string"}},
                "required":["a"]}"#,
        )
        .unwrap();
        let err = register(
            &s,
            "ev",
            r#"{"type":"object",
                "properties":{"a":{"type":"number"},"b":{"type":"string"}},
                "required":["a","b"]}"#,
        )
        .unwrap_err();
        let SchemaError::BreakingChange { offenders, .. } = err else {
            panic!("expected BreakingChange");
        };
        assert!(offenders
            .iter()
            .any(|b| matches!(b, BreakingChange::RequiredAdd { field } if field == "b")));
    }

    // One candidate with several incompatibilities: `a` is retyped, `c` is
    // dropped (no added field shares its boolean type), and `d` is a new
    // required field. All three must appear in the offender list.
    #[test]
    fn multi_field_break_reports_every_offender() {
        let s = store();
        register(
            &s,
            "ev",
            r#"{"type":"object",
                "properties":{"a":{"type":"number"},
                              "b":{"type":"string"},
                              "c":{"type":"boolean"}},
                "required":["a"]}"#,
        )
        .unwrap();
        let err = register(
            &s,
            "ev",
            r#"{"type":"object",
                "properties":{"a":{"type":"string"},
                              "b":{"type":"string"},
                              "d":{"type":"integer"}},
                "required":["a","d"]}"#,
        )
        .unwrap_err();
        let SchemaError::BreakingChange { offenders, .. } = err else {
            panic!("expected BreakingChange");
        };
        assert!(offenders
            .iter()
            .any(|b| matches!(b, BreakingChange::Retype { field, .. } if field == "a")));
        assert!(offenders
            .iter()
            .any(|b| matches!(b, BreakingChange::Drop { field } if field == "c")));
        assert!(offenders
            .iter()
            .any(|b| matches!(b, BreakingChange::RequiredAdd { field } if field == "d")));
    }

    // After an additive evolution, validation runs against version 2: the
    // old payload shape still passes, the new field is accepted, and errors
    // report version 2.
    #[test]
    fn validate_resolves_to_latest_version_after_evolution() {
        let s = store();
        register(&s, "purchase", PURCHASE_V1).unwrap();
        register(
            &s,
            "purchase",
            r#"{"type":"object",
                "properties":{"amount":{"type":"number"},
                              "discount_code":{"type":"string"}},
                "required":["amount"]}"#,
        )
        .unwrap();
        validate(&s, "purchase", r#"{"amount":1.0}"#).expect("v1-shape still valid");
        validate(&s, "purchase", r#"{"amount":1.0,"discount_code":"X"}"#)
            .expect("v2-only field accepted");
        let err = validate(&s, "purchase", r#"{"amount":1.0,"mystery":1}"#).unwrap_err();
        assert!(matches!(err, ValidationError::UnknownField { version, .. } if version == 2));
    }

    // `list` keeps the full history of an event, not only its newest version.
    #[test]
    fn list_returns_every_version_not_just_latest() {
        let s = store();
        register(&s, "purchase", PURCHASE_V1).unwrap();
        register(
            &s,
            "purchase",
            r#"{"type":"object",
                "properties":{"amount":{"type":"number"},
                              "discount_code":{"type":"string"}},
                "required":["amount"]}"#,
        )
        .unwrap();
        let purchase_versions: Vec<u32> = list(&s)
            .into_iter()
            .filter(|e| e.event_name == "purchase")
            .map(|e| e.version)
            .collect();
        let mut sorted = purchase_versions.clone();
        sorted.sort();
        assert_eq!(
            sorted,
            vec![1, 2],
            "expected both versions, got {purchase_versions:?}"
        );
    }

    // Unparseable schema text is rejected with `InvalidSchemaJson`.
    #[test]
    fn invalid_schema_json_rejected_at_register() {
        let s = store();
        let err = register(&s, "x", "{not json").unwrap_err();
        assert!(matches!(err, SchemaError::InvalidSchemaJson(_)));
    }

    // A top-level `type` other than "object" is a shape error.
    #[test]
    fn schema_must_be_type_object() {
        let s = store();
        let err = register(&s, "x", r#"{"type":"string"}"#).unwrap_err();
        assert!(matches!(err, SchemaError::InvalidSchemaShape(_)));
    }

    // Payloads using only declared fields pass, with or without the optional one.
    #[test]
    fn validate_happy_path_accepts_known_fields() {
        let s = store();
        register(&s, "page_view", PAGE_VIEW_SCHEMA).unwrap();
        validate(&s, "page_view", r#"{"url":"/x","user_id":42}"#).expect("ok");
        validate(&s, "page_view", r#"{"url":"/y"}"#).expect("ok without optional");
    }

    // A payload key that is not declared in `properties` is rejected by name.
    #[test]
    fn validate_rejects_unknown_field() {
        let s = store();
        register(&s, "page_view", PAGE_VIEW_SCHEMA).unwrap();
        let err = validate(&s, "page_view", r#"{"url":"/x","mystery":1}"#).unwrap_err();
        match err {
            ValidationError::UnknownField { field, .. } => assert_eq!(field, "mystery"),
            other => panic!("expected UnknownField, got {other:?}"),
        }
    }

    // An empty payload is missing the required `url` field.
    #[test]
    fn validate_rejects_missing_required_field() {
        let s = store();
        register(&s, "page_view", PAGE_VIEW_SCHEMA).unwrap();
        let err = validate(&s, "page_view", r#"{}"#).unwrap_err();
        match err {
            ValidationError::MissingRequiredField { field, .. } => assert_eq!(field, "url"),
            other => panic!("expected MissingRequiredField, got {other:?}"),
        }
    }

    // A number supplied for a string field reports both expected and actual type names.
    #[test]
    fn validate_rejects_type_mismatch() {
        let s = store();
        register(&s, "page_view", PAGE_VIEW_SCHEMA).unwrap();
        let err = validate(&s, "page_view", r#"{"url":123}"#).unwrap_err();
        match err {
            ValidationError::TypeMismatch {
                field,
                expected,
                got,
                ..
            } => {
                assert_eq!(field, "url");
                assert_eq!(expected, "string");
                assert_eq!(got, "number");
            }
            other => panic!("expected TypeMismatch, got {other:?}"),
        }
    }

    // Validating against an event with no registered schema fails with its name.
    #[test]
    fn validate_unknown_event_name() {
        let s = store();
        let err = validate(&s, "nope", r#"{}"#).unwrap_err();
        assert!(matches!(err, ValidationError::UnknownEventName(name) if name == "nope"));
    }

    // A payload that is valid JSON but not an object is rejected.
    #[test]
    fn validate_payload_must_be_object() {
        let s = store();
        register(&s, "page_view", PAGE_VIEW_SCHEMA).unwrap();
        let err = validate(&s, "page_view", r#""hello""#).unwrap_err();
        assert!(matches!(err, ValidationError::PayloadNotObject));
    }

    // Schemas of different events coexist, each at version 1 with a non-zero timestamp.
    #[test]
    fn list_returns_every_registered_event() {
        let s = store();
        register(&s, "page_view", PAGE_VIEW_SCHEMA).unwrap();
        register(
            &s,
            "signup",
            r#"{"type":"object","properties":{"email":{"type":"string"}},"required":["email"]}"#,
        )
        .unwrap();
        let mut names: Vec<String> = list(&s).into_iter().map(|e| e.event_name).collect();
        names.sort();
        assert_eq!(names, vec!["page_view".to_string(), "signup".to_string()]);
        assert!(list(&s).iter().all(|e| e.version == 1));
        assert!(list(&s).iter().all(|e| e.registered_at_ms > 0));
    }

    // Smoke check of the persistence path. Note that no restart is simulated
    // here: the test only verifies that `register` wrote a registry blob that
    // `read_latest_registry_json` can read back and that `latest` resolves
    // from it on the same store instance.
    #[test]
    fn persistence_smoke_latest_survives_restart() {
        let s = store();
        register(&s, "page_view", PAGE_VIEW_SCHEMA).unwrap();
        let raw =
            read_latest_registry_json(&s).expect("registry json must be persisted on register");
        assert!(raw.contains("page_view"));
        let (v, schema) = latest(&s, "page_view").expect("latest after persist");
        assert_eq!(v, 1);
        assert!(schema.contains("\"url\""));
    }

    // The RedDB error body starts with the typed `AnalyticsSchemaError:` marker.
    #[test]
    fn validation_error_maps_to_invalid_operation_with_typed_marker() {
        let err = validation_error_to_reddb(ValidationError::MissingRequiredField {
            event_name: "page_view".to_string(),
            version: 1,
            field: "url".to_string(),
        });
        match err {
            crate::api::RedDBError::InvalidOperation(body) => {
                assert!(
                    body.starts_with("AnalyticsSchemaError:MissingRequiredField:page_view:v1:url"),
                    "unexpected body: {body}"
                );
            }
            other => panic!("expected InvalidOperation, got {other:?}"),
        }
    }
}