1use crate::storage::schema::Value;
40use crate::storage::unified::{EntityData, UnifiedStore};
41use crate::utils::json::{parse_json, JsonValue};
42
43use std::time::{SystemTime, UNIX_EPOCH};
44
45const REGISTRY_KEY: &str = "red.analytics.schema_registry.entries_json";
46
/// One registered version of an analytics event schema, as kept in the registry.
#[derive(Debug, Clone, PartialEq)]
pub struct SchemaEntry {
    /// Name of the event this schema describes.
    pub event_name: String,
    /// Version number. `register` assigns 1 to the first schema of an event and
    /// the previous highest version plus one to every later one.
    pub version: u32,
    /// The schema document text exactly as it was handed to `register`.
    pub schema_json: String,
    /// Registration time in milliseconds since the Unix epoch, taken from
    /// `now_ms` (which yields 0 if the clock reports a pre-epoch time).
    pub registered_at_ms: u128,
}
55
/// Reasons `register` can refuse a schema.
#[derive(Debug, Clone, PartialEq)]
pub enum SchemaError {
    /// The schema text could not be parsed as JSON; carries the parser's message.
    InvalidSchemaJson(String),
    /// The schema parsed but does not have the expected shape (not an object,
    /// `type` missing or not `"object"`, `properties` not an object, `required`
    /// not an array of strings). `register` also uses this variant when the
    /// previously stored schema for the event can no longer be parsed.
    InvalidSchemaShape(String),
    /// The candidate schema is not a purely additive evolution of the latest
    /// registered version.
    BreakingChange {
        /// Event whose registration was rejected.
        event_name: String,
        /// Version the candidate was compared against.
        previous_version: u32,
        /// Every incompatible difference found by `diff_for_breaking_changes`.
        offenders: Vec<BreakingChange>,
    },
}
73
/// A single incompatible difference between two versions of an event schema.
#[derive(Debug, Clone, PartialEq)]
pub enum BreakingChange {
    /// A field disappeared while a new field of the same declared type appeared.
    Rename { from: String, to: String },
    /// A field kept its name but its declared `type` changed.
    Retype {
        field: String,
        from: String,
        to: String,
    },
    /// A field disappeared and no replacement of the same type was found.
    Drop { field: String },
    /// A field became required, or a brand-new field was introduced as required.
    RequiredAdd { field: String },
}

impl BreakingChange {
    /// Renders the change as a short human-readable phrase.
    pub fn describe(&self) -> String {
        match self {
            Self::Drop { field } => format!("dropped field '{field}'"),
            Self::RequiredAdd { field } => format!("required-add for field '{field}'"),
            Self::Rename { from, to } => format!("renamed field '{from}' to '{to}'"),
            Self::Retype { field, from, to } => {
                format!("retyped field '{field}' from {from} to {to}")
            }
        }
    }
}
113
/// Reasons `validate` can reject an event payload.
#[derive(Debug, Clone, PartialEq)]
pub enum ValidationError {
    /// No schema has been registered under the given event name.
    UnknownEventName(String),
    /// The payload text is not valid JSON. `validate` also uses this variant,
    /// with a "schema corrupt: " prefix, when the stored schema fails to parse.
    InvalidPayloadJson(String),
    /// The payload parsed but its top-level value is not a JSON object.
    PayloadNotObject,
    /// A field listed in the schema's `required` array is absent from the payload.
    MissingRequiredField {
        event_name: String,
        /// Schema version the payload was checked against.
        version: u32,
        field: String,
    },
    /// The payload carries a field the schema's `properties` does not declare.
    UnknownField {
        event_name: String,
        /// Schema version the payload was checked against.
        version: u32,
        field: String,
    },
    /// A payload value does not match the `type` declared for its field.
    TypeMismatch {
        event_name: String,
        /// Schema version the payload was checked against.
        version: u32,
        field: String,
        /// Type name declared in the schema.
        expected: String,
        /// JSON type name of the value actually supplied (see `json_type_name`).
        got: String,
    },
}
146
/// Returns the current wall-clock time as milliseconds since the Unix epoch.
///
/// Yields `0` when the system clock reports a moment before the epoch, so
/// callers never have to deal with an error.
fn now_ms() -> u128 {
    match SystemTime::now().duration_since(UNIX_EPOCH) {
        Ok(elapsed) => elapsed.as_millis(),
        Err(_) => 0,
    }
}
153
154fn read_latest_registry_json(store: &UnifiedStore) -> Option<String> {
160 let manager = store.get_collection("red_config")?;
161 let mut all = manager.query_all(|_| true);
162 all.sort_by(|a, b| b.id.raw().cmp(&a.id.raw()));
163 for entity in all {
164 let EntityData::Row(row) = &entity.data else {
165 continue;
166 };
167 let Some(named) = &row.named else { continue };
168 let matches = matches!(
169 named.get("key"),
170 Some(Value::Text(s)) if s.as_ref() == REGISTRY_KEY
171 );
172 if matches {
173 if let Some(Value::Text(s)) = named.get("value") {
174 return Some(s.to_string());
175 }
176 }
177 }
178 None
179}
180
181fn load(store: &UnifiedStore) -> Vec<SchemaEntry> {
182 let raw = match read_latest_registry_json(store) {
183 Some(s) => s,
184 None => return Vec::new(),
185 };
186 let Ok(parsed) = parse_json(&raw) else {
187 return Vec::new();
188 };
189 let Some(arr) = parsed.as_array() else {
190 return Vec::new();
191 };
192 let mut out = Vec::with_capacity(arr.len());
193 for item in arr {
194 let Some(obj) = item.as_object() else {
195 continue;
196 };
197 let lookup = |k: &str| obj.iter().find(|(key, _)| key == k).map(|(_, v)| v);
198 let Some(event_name) = lookup("event_name").and_then(JsonValue::as_str) else {
199 continue;
200 };
201 let Some(version) = lookup("version").and_then(JsonValue::as_f64) else {
202 continue;
203 };
204 let Some(schema_json) = lookup("schema_json").and_then(JsonValue::as_str) else {
205 continue;
206 };
207 let Some(registered_at_ms) = lookup("registered_at_ms").and_then(JsonValue::as_f64) else {
208 continue;
209 };
210 out.push(SchemaEntry {
211 event_name: event_name.to_string(),
212 version: version as u32,
213 schema_json: schema_json.to_string(),
214 registered_at_ms: registered_at_ms as u128,
215 });
216 }
217 out
218}
219
220fn entry_to_json(e: &SchemaEntry) -> crate::serde_json::Value {
221 let mut obj = crate::serde_json::Map::new();
222 obj.insert(
223 "event_name".to_string(),
224 crate::serde_json::Value::String(e.event_name.clone()),
225 );
226 obj.insert(
227 "version".to_string(),
228 crate::serde_json::Value::Number(e.version as f64),
229 );
230 obj.insert(
231 "schema_json".to_string(),
232 crate::serde_json::Value::String(e.schema_json.clone()),
233 );
234 obj.insert(
235 "registered_at_ms".to_string(),
236 crate::serde_json::Value::Number(e.registered_at_ms as f64),
237 );
238 crate::serde_json::Value::Object(obj)
239}
240
241fn save(store: &UnifiedStore, entries: &[SchemaEntry]) {
242 let arr = crate::serde_json::Value::Array(entries.iter().map(entry_to_json).collect());
243 let wrapped = crate::serde_json::Value::String(arr.to_string());
247 store.set_config_tree(REGISTRY_KEY, &wrapped);
248}
249
250fn validate_schema_shape(schema_json: &str) -> Result<JsonValue, SchemaError> {
254 let parsed = parse_json(schema_json)
255 .map_err(|err| SchemaError::InvalidSchemaJson(err.to_string()))?;
256 let Some(obj) = parsed.as_object() else {
257 return Err(SchemaError::InvalidSchemaShape(
258 "schema must be a JSON object".to_string(),
259 ));
260 };
261 let lookup = |k: &str| obj.iter().find(|(key, _)| key == k).map(|(_, v)| v);
262 match lookup("type").and_then(JsonValue::as_str) {
263 Some("object") => {}
264 Some(other) => {
265 return Err(SchemaError::InvalidSchemaShape(format!(
266 "schema `type` must be \"object\", got \"{other}\""
267 )));
268 }
269 None => {
270 return Err(SchemaError::InvalidSchemaShape(
271 "schema must declare `type`".to_string(),
272 ));
273 }
274 }
275 if let Some(props) = lookup("properties") {
276 if props.as_object().is_none() {
277 return Err(SchemaError::InvalidSchemaShape(
278 "schema `properties` must be an object".to_string(),
279 ));
280 }
281 }
282 if let Some(req) = lookup("required") {
283 let Some(arr) = req.as_array() else {
284 return Err(SchemaError::InvalidSchemaShape(
285 "schema `required` must be an array of strings".to_string(),
286 ));
287 };
288 for item in arr {
289 if item.as_str().is_none() {
290 return Err(SchemaError::InvalidSchemaShape(
291 "schema `required` must be an array of strings".to_string(),
292 ));
293 }
294 }
295 }
296 Ok(parsed)
297}
298
299pub fn register(
307 store: &UnifiedStore,
308 event_name: &str,
309 schema_json: &str,
310) -> Result<u32, SchemaError> {
311 let candidate = validate_schema_shape(schema_json)?;
312 let mut entries = load(store);
313
314 let previous = entries
315 .iter()
316 .filter(|e| e.event_name == event_name)
317 .max_by_key(|e| e.version)
318 .cloned();
319
320 let next_version = match previous {
321 None => 1,
322 Some(prev) => {
323 let prev_schema = parse_json(&prev.schema_json).map_err(|e| {
324 SchemaError::InvalidSchemaShape(format!(
325 "previously registered schema for {event_name} v{} is corrupt: {e}",
326 prev.version
327 ))
328 })?;
329 let offenders = diff_for_breaking_changes(&prev_schema, &candidate);
330 if !offenders.is_empty() {
331 return Err(SchemaError::BreakingChange {
332 event_name: event_name.to_string(),
333 previous_version: prev.version,
334 offenders,
335 });
336 }
337 prev.version + 1
338 }
339 };
340
341 entries.push(SchemaEntry {
342 event_name: event_name.to_string(),
343 version: next_version,
344 schema_json: schema_json.to_string(),
345 registered_at_ms: now_ms(),
346 });
347 save(store, &entries);
348 Ok(next_version)
349}
350
351fn schema_fields(schema: &JsonValue) -> Vec<(String, String, bool)> {
355 let Some(obj) = schema.as_object() else {
356 return Vec::new();
357 };
358 let properties: &[(String, JsonValue)] = obj
359 .iter()
360 .find(|(k, _)| k == "properties")
361 .and_then(|(_, v)| v.as_object())
362 .unwrap_or(&[]);
363 let required: Vec<&str> = obj
364 .iter()
365 .find(|(k, _)| k == "required")
366 .and_then(|(_, v)| v.as_array())
367 .map(|arr| arr.iter().filter_map(JsonValue::as_str).collect())
368 .unwrap_or_default();
369 properties
370 .iter()
371 .map(|(name, prop)| {
372 let ty = prop
373 .as_object()
374 .and_then(|entries| entries.iter().find(|(k, _)| k == "type"))
375 .and_then(|(_, v)| v.as_str())
376 .unwrap_or("")
377 .to_string();
378 let req = required.iter().any(|r| *r == name.as_str());
379 (name.clone(), ty, req)
380 })
381 .collect()
382}
383
384fn diff_for_breaking_changes(prev: &JsonValue, next: &JsonValue) -> Vec<BreakingChange> {
392 let prev_fields = schema_fields(prev);
393 let next_fields = schema_fields(next);
394
395 let mut breaks = Vec::new();
396 let mut dropped: Vec<(String, String)> = Vec::new();
397 let mut added: Vec<(String, String, bool)> = Vec::new();
399
400 for (name, prev_type, prev_required) in &prev_fields {
401 match next_fields.iter().find(|(n, _, _)| n == name) {
402 Some((_, next_type, next_required)) => {
403 if prev_type != next_type && !prev_type.is_empty() && !next_type.is_empty() {
404 breaks.push(BreakingChange::Retype {
405 field: name.clone(),
406 from: prev_type.clone(),
407 to: next_type.clone(),
408 });
409 }
410 if !prev_required && *next_required {
411 breaks.push(BreakingChange::RequiredAdd {
412 field: name.clone(),
413 });
414 }
415 }
416 None => dropped.push((name.clone(), prev_type.clone())),
417 }
418 }
419
420 for (name, next_type, next_required) in &next_fields {
421 if prev_fields.iter().any(|(n, _, _)| n == name) {
422 continue;
423 }
424 added.push((name.clone(), next_type.clone(), *next_required));
425 }
426
427 for (drop_name, drop_type) in dropped {
432 let paired = added
433 .iter()
434 .position(|(_, ty, _)| ty == &drop_type && !drop_type.is_empty());
435 match paired {
436 Some(idx) => {
437 let (add_name, _, _) = added.remove(idx);
438 breaks.push(BreakingChange::Rename {
439 from: drop_name,
440 to: add_name,
441 });
442 }
443 None => breaks.push(BreakingChange::Drop { field: drop_name }),
444 }
445 }
446
447 for (name, _, required) in added {
450 if required {
451 breaks.push(BreakingChange::RequiredAdd { field: name });
452 }
453 }
454
455 breaks
456}
457
458pub fn latest(store: &UnifiedStore, event_name: &str) -> Option<(u32, String)> {
464 let entries = load(store);
465 entries
466 .into_iter()
467 .filter(|e| e.event_name == event_name)
468 .max_by_key(|e| e.version)
469 .map(|e| (e.version, e.schema_json))
470}
471
/// Returns every entry currently stored in the registry — all events and all
/// of their versions — exactly as `load` decodes them.
pub fn list(store: &UnifiedStore) -> Vec<SchemaEntry> {
    load(store)
}
477
478fn json_type_name(v: &JsonValue) -> &'static str {
479 match v {
480 JsonValue::Null => "null",
481 JsonValue::Bool(_) => "boolean",
482 JsonValue::Number(_) => "number",
483 JsonValue::String(_) => "string",
484 JsonValue::Array(_) => "array",
485 JsonValue::Object(_) => "object",
486 }
487}
488
489fn type_matches(expected: &str, got: &JsonValue) -> bool {
490 match expected {
491 "string" => matches!(got, JsonValue::String(_)),
492 "boolean" => matches!(got, JsonValue::Bool(_)),
493 "array" => matches!(got, JsonValue::Array(_)),
494 "object" => matches!(got, JsonValue::Object(_)),
495 "null" => matches!(got, JsonValue::Null),
496 "number" => matches!(got, JsonValue::Number(_)),
497 "integer" => match got {
498 JsonValue::Number(n) => *n == n.trunc(),
499 _ => false,
500 },
501 _ => false,
502 }
503}
504
505pub fn validate(
514 store: &UnifiedStore,
515 event_name: &str,
516 payload_json: &str,
517) -> Result<(), ValidationError> {
518 let Some((version, schema_json)) = latest(store, event_name) else {
519 return Err(ValidationError::UnknownEventName(event_name.to_string()));
520 };
521 let schema = parse_json(&schema_json)
522 .map_err(|e| ValidationError::InvalidPayloadJson(format!("schema corrupt: {e}")))?;
523 let payload = parse_json(payload_json)
524 .map_err(|e| ValidationError::InvalidPayloadJson(e.to_string()))?;
525 let Some(payload_obj) = payload.as_object() else {
526 return Err(ValidationError::PayloadNotObject);
527 };
528 let schema_obj = schema.as_object().unwrap_or(&[]);
529 let properties: &[(String, JsonValue)] = schema_obj
530 .iter()
531 .find(|(k, _)| k == "properties")
532 .and_then(|(_, v)| v.as_object())
533 .unwrap_or(&[]);
534 let required: Vec<&str> = schema_obj
535 .iter()
536 .find(|(k, _)| k == "required")
537 .and_then(|(_, v)| v.as_array())
538 .map(|arr| arr.iter().filter_map(JsonValue::as_str).collect())
539 .unwrap_or_default();
540
541 for req in &required {
544 if !payload_obj.iter().any(|(k, _)| k == *req) {
545 return Err(ValidationError::MissingRequiredField {
546 event_name: event_name.to_string(),
547 version,
548 field: (*req).to_string(),
549 });
550 }
551 }
552 for (key, value) in payload_obj {
554 let Some((_, prop)) = properties.iter().find(|(k, _)| k == key) else {
555 return Err(ValidationError::UnknownField {
556 event_name: event_name.to_string(),
557 version,
558 field: key.clone(),
559 });
560 };
561 let expected_type = prop
562 .as_object()
563 .and_then(|entries| entries.iter().find(|(k, _)| k == "type"))
564 .and_then(|(_, v)| v.as_str())
565 .unwrap_or("");
566 if expected_type.is_empty() {
567 continue;
568 }
569 if !type_matches(expected_type, value) {
570 return Err(ValidationError::TypeMismatch {
571 event_name: event_name.to_string(),
572 version,
573 field: key.clone(),
574 expected: expected_type.to_string(),
575 got: json_type_name(value).to_string(),
576 });
577 }
578 }
579 Ok(())
580}
581
582pub fn validation_error_to_reddb(err: ValidationError) -> crate::api::RedDBError {
588 let body = match &err {
589 ValidationError::UnknownEventName(name) => {
590 format!("AnalyticsSchemaError:UnknownEventName:{name}")
591 }
592 ValidationError::InvalidPayloadJson(reason) => {
593 format!("AnalyticsSchemaError:InvalidPayloadJson:{reason}")
594 }
595 ValidationError::PayloadNotObject => {
596 "AnalyticsSchemaError:PayloadNotObject".to_string()
597 }
598 ValidationError::MissingRequiredField {
599 event_name,
600 version,
601 field,
602 } => format!(
603 "AnalyticsSchemaError:MissingRequiredField:{event_name}:v{version}:{field}"
604 ),
605 ValidationError::UnknownField {
606 event_name,
607 version,
608 field,
609 } => format!(
610 "AnalyticsSchemaError:UnknownField:{event_name}:v{version}:{field}"
611 ),
612 ValidationError::TypeMismatch {
613 event_name,
614 version,
615 field,
616 expected,
617 got,
618 } => format!(
619 "AnalyticsSchemaError:TypeMismatch:{event_name}:v{version}:{field}:{expected}:{got}"
620 ),
621 };
622 crate::api::RedDBError::InvalidOperation(body)
623}
624
// Unit tests for the schema registry: versioning, breaking-change detection,
// payload validation, listing, persistence and error mapping.
#[cfg(test)]
mod tests {
    use super::*;

    // Fresh store for each test so registrations never leak between tests.
    fn store() -> UnifiedStore {
        UnifiedStore::new()
    }

    // Baseline schema: required string `url`, optional integer `user_id`.
    const PAGE_VIEW_SCHEMA: &str = r#"{
        "type": "object",
        "properties": {
            "url": {"type": "string"},
            "user_id": {"type": "integer"}
        },
        "required": ["url"]
    }"#;

    // The first schema registered for an event gets version 1.
    #[test]
    fn first_registration_is_version_1() {
        let s = store();
        let v = register(&s, "page_view", PAGE_VIEW_SCHEMA).expect("register ok");
        assert_eq!(v, 1);
        let (latest_v, _) = latest(&s, "page_view").expect("latest present");
        assert_eq!(latest_v, 1);
    }

    // An unchanged schema has no breaking differences, so it is accepted and
    // still receives a new version number.
    #[test]
    fn re_registering_identical_schema_bumps_to_next_version() {
        let s = store();
        register(&s, "page_view", PAGE_VIEW_SCHEMA).unwrap();
        let v = register(&s, "page_view", PAGE_VIEW_SCHEMA).expect("identical is additive");
        assert_eq!(v, 2);
    }

    // Baseline schema with a single required number field `amount`.
    const PURCHASE_V1: &str =
        r#"{"type":"object","properties":{"amount":{"type":"number"}},"required":["amount"]}"#;

    // Adding a new optional field is not a breaking change.
    #[test]
    fn additive_optional_field_is_accepted_as_v2() {
        let s = store();
        register(&s, "purchase", PURCHASE_V1).unwrap();
        let v2 = register(
            &s,
            "purchase",
            r#"{"type":"object",
                "properties":{"amount":{"type":"number"},
                              "discount_code":{"type":"string"}},
                "required":["amount"]}"#,
        )
        .expect("optional add is additive");
        assert_eq!(v2, 2);
        let (latest_v, _) = latest(&s, "purchase").unwrap();
        assert_eq!(latest_v, 2);
    }

    // Extra keywords on a property (here `default`) do not affect the diff.
    #[test]
    fn additive_optional_field_with_default_is_accepted() {
        let s = store();
        register(&s, "purchase", PURCHASE_V1).unwrap();
        let v2 = register(
            &s,
            "purchase",
            r#"{"type":"object",
                "properties":{"amount":{"type":"number"},
                              "currency":{"type":"string","default":"USD"}},
                "required":["amount"]}"#,
        )
        .expect("optional add with default is additive");
        assert_eq!(v2, 2);
    }

    // Changing `maxLength` leaves name, type and required-ness untouched, so
    // the diff reports nothing.
    #[test]
    fn widening_string_max_length_is_accepted() {
        let s = store();
        register(
            &s,
            "ev",
            r#"{"type":"object","properties":{"name":{"type":"string","maxLength":32}},"required":["name"]}"#,
        )
        .unwrap();
        let v2 = register(
            &s,
            "ev",
            r#"{"type":"object","properties":{"name":{"type":"string","maxLength":128}},"required":["name"]}"#,
        )
        .expect("widening maxLength is additive");
        assert_eq!(v2, 2);
    }

    // A dropped field plus an added field of the same type is reported as a
    // rename.
    #[test]
    fn breaking_rename_is_rejected() {
        let s = store();
        register(&s, "purchase", PURCHASE_V1).unwrap();
        let err = register(
            &s,
            "purchase",
            r#"{"type":"object","properties":{"total":{"type":"number"}},"required":["total"]}"#,
        )
        .unwrap_err();
        match err {
            SchemaError::BreakingChange {
                event_name,
                previous_version,
                offenders,
            } => {
                assert_eq!(event_name, "purchase");
                assert_eq!(previous_version, 1);
                assert!(
                    offenders.iter().any(|b| matches!(
                        b,
                        BreakingChange::Rename { from, to }
                            if from == "amount" && to == "total"
                    )),
                    "expected Rename(amount->total), got {offenders:?}"
                );
            }
            other => panic!("expected BreakingChange, got {other:?}"),
        }
    }

    // Changing a field's declared type is reported as a retype.
    #[test]
    fn breaking_retype_is_rejected() {
        let s = store();
        register(&s, "purchase", PURCHASE_V1).unwrap();
        let err = register(
            &s,
            "purchase",
            r#"{"type":"object","properties":{"amount":{"type":"string"}},"required":["amount"]}"#,
        )
        .unwrap_err();
        let SchemaError::BreakingChange { offenders, .. } = err else {
            panic!("expected BreakingChange");
        };
        assert!(offenders.iter().any(|b| matches!(
            b,
            BreakingChange::Retype { field, from, to }
                if field == "amount" && from == "number" && to == "string"
        )));
    }

    // Removing a field with no same-typed replacement is reported as a drop.
    #[test]
    fn breaking_drop_is_rejected() {
        let s = store();
        register(
            &s,
            "ev",
            r#"{"type":"object",
                "properties":{"a":{"type":"number"},"b":{"type":"boolean"}},
                "required":["a"]}"#,
        )
        .unwrap();
        let err = register(
            &s,
            "ev",
            r#"{"type":"object","properties":{"a":{"type":"number"}},"required":["a"]}"#,
        )
        .unwrap_err();
        let SchemaError::BreakingChange { offenders, .. } = err else {
            panic!("expected BreakingChange");
        };
        assert!(offenders
            .iter()
            .any(|b| matches!(b, BreakingChange::Drop { field } if field == "b")));
    }

    // Promoting an existing optional field to required is a breaking change.
    #[test]
    fn breaking_optional_to_required_is_rejected() {
        let s = store();
        register(
            &s,
            "ev",
            r#"{"type":"object",
                "properties":{"a":{"type":"number"},"b":{"type":"string"}},
                "required":["a"]}"#,
        )
        .unwrap();
        let err = register(
            &s,
            "ev",
            r#"{"type":"object",
                "properties":{"a":{"type":"number"},"b":{"type":"string"}},
                "required":["a","b"]}"#,
        )
        .unwrap_err();
        let SchemaError::BreakingChange { offenders, .. } = err else {
            panic!("expected BreakingChange");
        };
        assert!(offenders
            .iter()
            .any(|b| matches!(b, BreakingChange::RequiredAdd { field } if field == "b")));
    }

    // One candidate with three independent problems (retype of `a`, drop of
    // `c`, new required `d`) must report all of them, not just the first.
    #[test]
    fn multi_field_break_reports_every_offender() {
        let s = store();
        register(
            &s,
            "ev",
            r#"{"type":"object",
                "properties":{"a":{"type":"number"},
                              "b":{"type":"string"},
                              "c":{"type":"boolean"}},
                "required":["a"]}"#,
        )
        .unwrap();
        let err = register(
            &s,
            "ev",
            r#"{"type":"object",
                "properties":{"a":{"type":"string"},
                              "b":{"type":"string"},
                              "d":{"type":"integer"}},
                "required":["a","d"]}"#,
        )
        .unwrap_err();
        let SchemaError::BreakingChange { offenders, .. } = err else {
            panic!("expected BreakingChange");
        };
        assert!(offenders
            .iter()
            .any(|b| matches!(b, BreakingChange::Retype { field, .. } if field == "a")));
        assert!(offenders
            .iter()
            .any(|b| matches!(b, BreakingChange::Drop { field } if field == "c")));
        assert!(offenders
            .iter()
            .any(|b| matches!(b, BreakingChange::RequiredAdd { field } if field == "d")));
    }

    // After an additive evolution, validation runs against version 2: old-shape
    // payloads still pass, the new field is accepted, and errors carry version 2.
    #[test]
    fn validate_resolves_to_latest_version_after_evolution() {
        let s = store();
        register(&s, "purchase", PURCHASE_V1).unwrap();
        register(
            &s,
            "purchase",
            r#"{"type":"object",
                "properties":{"amount":{"type":"number"},
                              "discount_code":{"type":"string"}},
                "required":["amount"]}"#,
        )
        .unwrap();
        validate(&s, "purchase", r#"{"amount":1.0}"#).expect("v1-shape still valid");
        validate(&s, "purchase", r#"{"amount":1.0,"discount_code":"X"}"#)
            .expect("v2-only field accepted");
        let err = validate(&s, "purchase", r#"{"amount":1.0,"mystery":1}"#).unwrap_err();
        assert!(matches!(err, ValidationError::UnknownField { version, .. } if version == 2));
    }

    // `list` keeps the full version history of an event, not only the newest.
    #[test]
    fn list_returns_every_version_not_just_latest() {
        let s = store();
        register(&s, "purchase", PURCHASE_V1).unwrap();
        register(
            &s,
            "purchase",
            r#"{"type":"object",
                "properties":{"amount":{"type":"number"},
                              "discount_code":{"type":"string"}},
                "required":["amount"]}"#,
        )
        .unwrap();
        let purchase_versions: Vec<u32> = list(&s)
            .into_iter()
            .filter(|e| e.event_name == "purchase")
            .map(|e| e.version)
            .collect();
        let mut sorted = purchase_versions.clone();
        sorted.sort();
        assert_eq!(sorted, vec![1, 2], "expected both versions, got {purchase_versions:?}");
    }

    // Text that is not JSON is rejected before anything is stored.
    #[test]
    fn invalid_schema_json_rejected_at_register() {
        let s = store();
        let err = register(&s, "x", "{not json").unwrap_err();
        assert!(matches!(err, SchemaError::InvalidSchemaJson(_)));
    }

    // Only schemas whose top-level `type` is "object" are accepted.
    #[test]
    fn schema_must_be_type_object() {
        let s = store();
        let err = register(&s, "x", r#"{"type":"string"}"#).unwrap_err();
        assert!(matches!(err, SchemaError::InvalidSchemaShape(_)));
    }

    // Payloads with all declared fields, or only the required ones, pass.
    #[test]
    fn validate_happy_path_accepts_known_fields() {
        let s = store();
        register(&s, "page_view", PAGE_VIEW_SCHEMA).unwrap();
        validate(&s, "page_view", r#"{"url":"/x","user_id":42}"#).expect("ok");
        validate(&s, "page_view", r#"{"url":"/y"}"#).expect("ok without optional");
    }

    // Fields not declared under `properties` are rejected by name.
    #[test]
    fn validate_rejects_unknown_field() {
        let s = store();
        register(&s, "page_view", PAGE_VIEW_SCHEMA).unwrap();
        let err = validate(&s, "page_view", r#"{"url":"/x","mystery":1}"#).unwrap_err();
        match err {
            ValidationError::UnknownField { field, .. } => assert_eq!(field, "mystery"),
            other => panic!("expected UnknownField, got {other:?}"),
        }
    }

    // An empty payload is missing the required `url` field.
    #[test]
    fn validate_rejects_missing_required_field() {
        let s = store();
        register(&s, "page_view", PAGE_VIEW_SCHEMA).unwrap();
        let err = validate(&s, "page_view", r#"{}"#).unwrap_err();
        match err {
            ValidationError::MissingRequiredField { field, .. } => assert_eq!(field, "url"),
            other => panic!("expected MissingRequiredField, got {other:?}"),
        }
    }

    // A number supplied for a string field reports both expected and actual type.
    #[test]
    fn validate_rejects_type_mismatch() {
        let s = store();
        register(&s, "page_view", PAGE_VIEW_SCHEMA).unwrap();
        let err = validate(&s, "page_view", r#"{"url":123}"#).unwrap_err();
        match err {
            ValidationError::TypeMismatch {
                field, expected, got, ..
            } => {
                assert_eq!(field, "url");
                assert_eq!(expected, "string");
                assert_eq!(got, "number");
            }
            other => panic!("expected TypeMismatch, got {other:?}"),
        }
    }

    // Validating against an event that was never registered names that event.
    #[test]
    fn validate_unknown_event_name() {
        let s = store();
        let err = validate(&s, "nope", r#"{}"#).unwrap_err();
        assert!(matches!(err, ValidationError::UnknownEventName(name) if name == "nope"));
    }

    // A valid JSON value that is not an object (here a string) is rejected.
    #[test]
    fn validate_payload_must_be_object() {
        let s = store();
        register(&s, "page_view", PAGE_VIEW_SCHEMA).unwrap();
        let err = validate(&s, "page_view", r#""hello""#).unwrap_err();
        assert!(matches!(err, ValidationError::PayloadNotObject));
    }

    // `list` spans all events; each entry here is a first version with a
    // non-zero registration timestamp.
    #[test]
    fn list_returns_every_registered_event() {
        let s = store();
        register(&s, "page_view", PAGE_VIEW_SCHEMA).unwrap();
        register(
            &s,
            "signup",
            r#"{"type":"object","properties":{"email":{"type":"string"}},"required":["email"]}"#,
        )
        .unwrap();
        let mut names: Vec<String> = list(&s).into_iter().map(|e| e.event_name).collect();
        names.sort();
        assert_eq!(names, vec!["page_view".to_string(), "signup".to_string()]);
        assert!(list(&s).iter().all(|e| e.version == 1));
        assert!(list(&s).iter().all(|e| e.registered_at_ms > 0));
    }

    // Checks that `register` writes the registry JSON into the store and that
    // `latest` reads it back from there.
    // NOTE(review): despite the test name, no store restart is simulated here.
    #[test]
    fn persistence_smoke_latest_survives_restart() {
        let s = store();
        register(&s, "page_view", PAGE_VIEW_SCHEMA).unwrap();
        let raw =
            read_latest_registry_json(&s).expect("registry json must be persisted on register");
        assert!(raw.contains("page_view"));
        let (v, schema) = latest(&s, "page_view").expect("latest after persist");
        assert_eq!(v, 1);
        assert!(schema.contains("\"url\""));
    }

    // The error mapping produces an `InvalidOperation` whose message starts
    // with the `AnalyticsSchemaError:<Variant>:...` marker.
    #[test]
    fn validation_error_maps_to_invalid_operation_with_typed_marker() {
        let err = validation_error_to_reddb(ValidationError::MissingRequiredField {
            event_name: "page_view".to_string(),
            version: 1,
            field: "url".to_string(),
        });
        match err {
            crate::api::RedDBError::InvalidOperation(body) => {
                assert!(
                    body.starts_with("AnalyticsSchemaError:MissingRequiredField:page_view:v1:url"),
                    "unexpected body: {body}"
                );
            }
            other => panic!("expected InvalidOperation, got {other:?}"),
        }
    }
}