Skip to main content

reddb_server/runtime/
analytics_schema_registry.rs

1//! Issue #577 — Analytics slice 2: `AnalyticsSchemaRegistry`.
2//! Issue #581 — Analytics slice 3: additive schema evolution +
3//! breaking-change rejection.
4//!
5//! Owns `(event_name, version) → schema_json` mappings persisted in
6//! `red_config`, validates payloads at insert time, and exposes the
7//! registered set for the `red.schema_registry` virtual table.
8//!
//! Re-registering an existing `event_name` is allowed iff the change
//! is *additive*, e.g. new optional fields (with or without a default).
//! The compatibility diff compares only each property's declared
//! `type` and its membership in `required`; other keywords such as
//! `maxLength` are not inspected, so changing them (widening *or*
//! narrowing) is accepted. A rename, retype, drop, optional→required
//! change, or brand-new required field is rejected with a
//! typed `SchemaError::BreakingChange { offenders }` whose `offenders`
//! list names every offending field together with the kind of break,
//! so the caller can fix the schema or pick a new `event_name` rather
//! than smuggle the incompatible change through the same one.
17//!
18//! Persistence shape: a single JSON document stored under
19//! `red.analytics.schema_registry.entries_json` as a Text value. It
20//! contains an array of entries `{event_name, version, schema_json,
21//! registered_at_ms}`. `red_config` is append-only, so we read by
22//! scanning that collection and keeping the row with the largest
23//! engine-assigned `EntityId` (most recent write) — same trick
24//! `signed_writes_kind` uses.
25//!
26//! The schema language is a minimal JSON Schema subset:
27//! ```json
28//! { "type": "object",
29//!   "properties": { "url": { "type": "string" } },
30//!   "required": ["url"] }
31//! ```
32//! Validation rules (v1):
33//! * payload must parse to a JSON object,
34//! * every key in `required` must be present,
35//! * every key in the payload must appear in `properties` (unknown
36//!   field rejected — strict mode),
37//! * for keys present in both, the type tag must match.
38
39use crate::storage::schema::Value;
40use crate::storage::unified::{EntityData, UnifiedStore};
41use crate::utils::json::{parse_json, JsonValue};
42
43use std::time::{SystemTime, UNIX_EPOCH};
44
/// `red_config` key under which the entire registry is persisted as a
/// single Text value holding a JSON array of entries.
const REGISTRY_KEY: &str = "red.analytics.schema_registry.entries_json";
46
/// A single `(event_name, version)` registration as persisted in the
/// registry document.
#[derive(Debug, Clone, PartialEq)]
pub struct SchemaEntry {
    /// Event the schema applies to.
    pub event_name: String,
    /// 1-based version; each accepted re-registration adds one.
    pub version: u32,
    /// Schema text exactly as it was handed to `register`.
    pub schema_json: String,
    /// Registration time in milliseconds since the Unix epoch.
    pub registered_at_ms: u128,
}
55
56#[derive(Debug, Clone, PartialEq)]
57pub enum SchemaError {
58    /// Schema text did not parse as JSON.
59    InvalidSchemaJson(String),
60    /// Schema parsed but did not match the expected
61    /// `{type:"object", properties:{}, required:[]}` shape.
62    InvalidSchemaShape(String),
63    /// A re-registration would break wire compatibility with the
64    /// previously registered version. `offenders` carries every
65    /// breaking change found in the diff so the caller can fix the
66    /// schema or pick a different `event_name` in one shot.
67    BreakingChange {
68        event_name: String,
69        previous_version: u32,
70        offenders: Vec<BreakingChange>,
71    },
72}
73
/// A single reason a candidate schema is not an additive successor of
/// the previously registered version. Carried inside
/// [`SchemaError::BreakingChange`].
#[derive(Debug, Clone, PartialEq)]
pub enum BreakingChange {
    /// A previous field vanished and a new field with the same declared
    /// type appeared. The pair is reported as one rename rather than a
    /// drop plus an add, since that is almost always what the caller
    /// meant; the message names the pairing so a wrong guess is obvious.
    Rename { from: String, to: String },
    /// A field's declared `type` changed.
    Retype {
        field: String,
        from: String,
        to: String,
    },
    /// A previously declared field no longer exists in the candidate.
    Drop { field: String },
    /// A formerly optional field is now required, or a brand-new field
    /// was added to `required` (rows written earlier cannot carry it).
    RequiredAdd { field: String },
}

impl BreakingChange {
    /// Short, stable one-line description used in error bodies.
    pub fn describe(&self) -> String {
        match self {
            Self::Rename { from, to } => format!("renamed field '{from}' to '{to}'"),
            Self::Retype { field, from, to } => {
                format!("retyped field '{field}' from {from} to {to}")
            }
            Self::Drop { field } => format!("dropped field '{field}'"),
            Self::RequiredAdd { field } => format!("required-add for field '{field}'"),
        }
    }
}
113
/// Why [`validate`] rejected a payload.
#[derive(Debug, Clone, PartialEq)]
pub enum ValidationError {
    /// No schema is registered for this `event_name`. The insert path
    /// may treat this as "no validation"; the variant exists so library
    /// consumers can branch on it explicitly.
    UnknownEventName(String),
    /// The payload text did not parse as JSON. Also used, with a
    /// `schema corrupt:` prefix, when the stored schema fails to parse.
    InvalidPayloadJson(String),
    /// The payload parsed, but is not a JSON object.
    PayloadNotObject,
    /// The payload lacks a field listed in the schema's `required`.
    MissingRequiredField {
        event_name: String,
        version: u32,
        field: String,
    },
    /// The payload carries a field the schema does not declare in
    /// `properties`. Strict mode: there is no
    /// `additionalProperties: true` escape hatch.
    UnknownField {
        event_name: String,
        version: u32,
        field: String,
    },
    /// A field's JSON type differs from the property's declared `type`.
    TypeMismatch {
        event_name: String,
        version: u32,
        field: String,
        expected: String,
        got: String,
    },
}
146
/// Milliseconds since the Unix epoch, or `0` when the system clock
/// reports a time before the epoch.
fn now_ms() -> u128 {
    match SystemTime::now().duration_since(UNIX_EPOCH) {
        Ok(elapsed) => elapsed.as_millis(),
        Err(_) => 0,
    }
}
153
154/// Read the *latest* Text payload for the registry key out of
155/// `red_config`. `red_config` is append-only — `UnifiedStore::get_config`
156/// returns the first matching row, but the most-recent write wins
157/// for us. We sort by `EntityId` descending and keep the first
158/// matching row — identical to `signed_writes_kind::read_latest_config`.
159fn read_latest_registry_json(store: &UnifiedStore) -> Option<String> {
160    let manager = store.get_collection("red_config")?;
161    let mut all = manager.query_all(|_| true);
162    all.sort_by(|a, b| b.id.raw().cmp(&a.id.raw()));
163    for entity in all {
164        let EntityData::Row(row) = &entity.data else {
165            continue;
166        };
167        let Some(named) = &row.named else { continue };
168        let matches = matches!(
169            named.get("key"),
170            Some(Value::Text(s)) if s.as_ref() == REGISTRY_KEY
171        );
172        if matches {
173            if let Some(Value::Text(s)) = named.get("value") {
174                return Some(s.to_string());
175            }
176        }
177    }
178    None
179}
180
181fn load(store: &UnifiedStore) -> Vec<SchemaEntry> {
182    let raw = match read_latest_registry_json(store) {
183        Some(s) => s,
184        None => return Vec::new(),
185    };
186    let Ok(parsed) = parse_json(&raw) else {
187        return Vec::new();
188    };
189    let Some(arr) = parsed.as_array() else {
190        return Vec::new();
191    };
192    let mut out = Vec::with_capacity(arr.len());
193    for item in arr {
194        let Some(obj) = item.as_object() else {
195            continue;
196        };
197        let lookup = |k: &str| obj.iter().find(|(key, _)| key == k).map(|(_, v)| v);
198        let Some(event_name) = lookup("event_name").and_then(JsonValue::as_str) else {
199            continue;
200        };
201        let Some(version) = lookup("version").and_then(JsonValue::as_f64) else {
202            continue;
203        };
204        let Some(schema_json) = lookup("schema_json").and_then(JsonValue::as_str) else {
205            continue;
206        };
207        let Some(registered_at_ms) = lookup("registered_at_ms").and_then(JsonValue::as_f64) else {
208            continue;
209        };
210        out.push(SchemaEntry {
211            event_name: event_name.to_string(),
212            version: version as u32,
213            schema_json: schema_json.to_string(),
214            registered_at_ms: registered_at_ms as u128,
215        });
216    }
217    out
218}
219
220fn entry_to_json(e: &SchemaEntry) -> crate::serde_json::Value {
221    let mut obj = crate::serde_json::Map::new();
222    obj.insert(
223        "event_name".to_string(),
224        crate::serde_json::Value::String(e.event_name.clone()),
225    );
226    obj.insert(
227        "version".to_string(),
228        crate::serde_json::Value::Number(e.version as f64),
229    );
230    obj.insert(
231        "schema_json".to_string(),
232        crate::serde_json::Value::String(e.schema_json.clone()),
233    );
234    obj.insert(
235        "registered_at_ms".to_string(),
236        crate::serde_json::Value::Number(e.registered_at_ms as f64),
237    );
238    crate::serde_json::Value::Object(obj)
239}
240
241fn save(store: &UnifiedStore, entries: &[SchemaEntry]) {
242    let arr = crate::serde_json::Value::Array(entries.iter().map(entry_to_json).collect());
243    // Store the array as one Text value, not as a flattened tree,
244    // so `set_config_tree` writes a single row whose `value` column
245    // round-trips back into the same JSON bytes.
246    let wrapped = crate::serde_json::Value::String(arr.to_string());
247    store.set_config_tree(REGISTRY_KEY, &wrapped);
248}
249
250/// Parse + minimal shape check on the schema string. Returns the
251/// canonical re-serialised form, so the registry stores a normalised
252/// representation regardless of caller whitespace / key ordering.
253fn validate_schema_shape(schema_json: &str) -> Result<JsonValue, SchemaError> {
254    let parsed = parse_json(schema_json)
255        .map_err(|err| SchemaError::InvalidSchemaJson(err.to_string()))?;
256    let Some(obj) = parsed.as_object() else {
257        return Err(SchemaError::InvalidSchemaShape(
258            "schema must be a JSON object".to_string(),
259        ));
260    };
261    let lookup = |k: &str| obj.iter().find(|(key, _)| key == k).map(|(_, v)| v);
262    match lookup("type").and_then(JsonValue::as_str) {
263        Some("object") => {}
264        Some(other) => {
265            return Err(SchemaError::InvalidSchemaShape(format!(
266                "schema `type` must be \"object\", got \"{other}\""
267            )));
268        }
269        None => {
270            return Err(SchemaError::InvalidSchemaShape(
271                "schema must declare `type`".to_string(),
272            ));
273        }
274    }
275    if let Some(props) = lookup("properties") {
276        if props.as_object().is_none() {
277            return Err(SchemaError::InvalidSchemaShape(
278                "schema `properties` must be an object".to_string(),
279            ));
280        }
281    }
282    if let Some(req) = lookup("required") {
283        let Some(arr) = req.as_array() else {
284            return Err(SchemaError::InvalidSchemaShape(
285                "schema `required` must be an array of strings".to_string(),
286            ));
287        };
288        for item in arr {
289            if item.as_str().is_none() {
290                return Err(SchemaError::InvalidSchemaShape(
291                    "schema `required` must be an array of strings".to_string(),
292                ));
293            }
294        }
295    }
296    Ok(parsed)
297}
298
299/// Register a schema for `event_name`.
300///
301/// * First registration → returns version `1`.
302/// * Additive successor → returns `previous_version + 1`.
303/// * Anything else → `SchemaError::BreakingChange { offenders }` with
304///   every break the diff turned up so the caller can fix them all in
305///   one round-trip.
306pub fn register(
307    store: &UnifiedStore,
308    event_name: &str,
309    schema_json: &str,
310) -> Result<u32, SchemaError> {
311    let candidate = validate_schema_shape(schema_json)?;
312    let mut entries = load(store);
313
314    let previous = entries
315        .iter()
316        .filter(|e| e.event_name == event_name)
317        .max_by_key(|e| e.version)
318        .cloned();
319
320    let next_version = match previous {
321        None => 1,
322        Some(prev) => {
323            let prev_schema = parse_json(&prev.schema_json).map_err(|e| {
324                SchemaError::InvalidSchemaShape(format!(
325                    "previously registered schema for {event_name} v{} is corrupt: {e}",
326                    prev.version
327                ))
328            })?;
329            let offenders = diff_for_breaking_changes(&prev_schema, &candidate);
330            if !offenders.is_empty() {
331                return Err(SchemaError::BreakingChange {
332                    event_name: event_name.to_string(),
333                    previous_version: prev.version,
334                    offenders,
335                });
336            }
337            prev.version + 1
338        }
339    };
340
341    entries.push(SchemaEntry {
342        event_name: event_name.to_string(),
343        version: next_version,
344        schema_json: schema_json.to_string(),
345        registered_at_ms: now_ms(),
346    });
347    save(store, &entries);
348    Ok(next_version)
349}
350
351/// Extract `(field, type, is_required)` triples from a parsed schema
352/// object. `type` is the declared JSON-Schema `type` string for the
353/// property, or `""` when none was declared.
354fn schema_fields(schema: &JsonValue) -> Vec<(String, String, bool)> {
355    let Some(obj) = schema.as_object() else {
356        return Vec::new();
357    };
358    let properties: &[(String, JsonValue)] = obj
359        .iter()
360        .find(|(k, _)| k == "properties")
361        .and_then(|(_, v)| v.as_object())
362        .unwrap_or(&[]);
363    let required: Vec<&str> = obj
364        .iter()
365        .find(|(k, _)| k == "required")
366        .and_then(|(_, v)| v.as_array())
367        .map(|arr| arr.iter().filter_map(JsonValue::as_str).collect())
368        .unwrap_or_default();
369    properties
370        .iter()
371        .map(|(name, prop)| {
372            let ty = prop
373                .as_object()
374                .and_then(|entries| entries.iter().find(|(k, _)| k == "type"))
375                .and_then(|(_, v)| v.as_str())
376                .unwrap_or("")
377                .to_string();
378            let req = required.iter().any(|r| *r == name.as_str());
379            (name.clone(), ty, req)
380        })
381        .collect()
382}
383
384/// Diff a previously registered schema against a candidate and return
385/// every breaking change. Empty result == additive (or identical).
386///
387/// The diff intentionally pairs unmatched drops + adds of the same
388/// declared type as a [`BreakingChange::Rename`] — the caller is told
389/// which pair we associated so they can disambiguate if our guess is
390/// wrong.
391fn diff_for_breaking_changes(prev: &JsonValue, next: &JsonValue) -> Vec<BreakingChange> {
392    let prev_fields = schema_fields(prev);
393    let next_fields = schema_fields(next);
394
395    let mut breaks = Vec::new();
396    let mut dropped: Vec<(String, String)> = Vec::new();
397    // (name, type, required) for fields present in next but not prev.
398    let mut added: Vec<(String, String, bool)> = Vec::new();
399
400    for (name, prev_type, prev_required) in &prev_fields {
401        match next_fields.iter().find(|(n, _, _)| n == name) {
402            Some((_, next_type, next_required)) => {
403                if prev_type != next_type && !prev_type.is_empty() && !next_type.is_empty() {
404                    breaks.push(BreakingChange::Retype {
405                        field: name.clone(),
406                        from: prev_type.clone(),
407                        to: next_type.clone(),
408                    });
409                }
410                if !prev_required && *next_required {
411                    breaks.push(BreakingChange::RequiredAdd {
412                        field: name.clone(),
413                    });
414                }
415            }
416            None => dropped.push((name.clone(), prev_type.clone())),
417        }
418    }
419
420    for (name, next_type, next_required) in &next_fields {
421        if prev_fields.iter().any(|(n, _, _)| n == name) {
422            continue;
423        }
424        added.push((name.clone(), next_type.clone(), *next_required));
425    }
426
427    // Pair drops with same-typed additions first → rename. A paired
428    // addition is *not* also reported as RequiredAdd even if the new
429    // version flagged it required: the user's intent was a rename,
430    // and surfacing both would just be noise for the same root cause.
431    for (drop_name, drop_type) in dropped {
432        let paired = added
433            .iter()
434            .position(|(_, ty, _)| ty == &drop_type && !drop_type.is_empty());
435        match paired {
436            Some(idx) => {
437                let (add_name, _, _) = added.remove(idx);
438                breaks.push(BreakingChange::Rename {
439                    from: drop_name,
440                    to: add_name,
441                });
442            }
443            None => breaks.push(BreakingChange::Drop { field: drop_name }),
444        }
445    }
446
447    // Unpaired added fields: required-add is breaking, optional-add
448    // is additive (the happy path).
449    for (name, _, required) in added {
450        if required {
451            breaks.push(BreakingChange::RequiredAdd { field: name });
452        }
453    }
454
455    breaks
456}
457
458/// Return `(version, schema_json)` for the latest registered schema
459/// of `event_name`, or `None` if nothing is registered. Since slice
460/// 2 only allows version 1 per event, "latest" == "the one row that
461/// exists". Once evolution lands, the resolver will keep the
462/// max-version row per event_name.
463pub fn latest(store: &UnifiedStore, event_name: &str) -> Option<(u32, String)> {
464    let entries = load(store);
465    entries
466        .into_iter()
467        .filter(|e| e.event_name == event_name)
468        .max_by_key(|e| e.version)
469        .map(|e| (e.version, e.schema_json))
470}
471
472/// Snapshot every registered schema. Used by the
473/// `red.schema_registry` virtual table.
474pub fn list(store: &UnifiedStore) -> Vec<SchemaEntry> {
475    load(store)
476}
477
478fn json_type_name(v: &JsonValue) -> &'static str {
479    match v {
480        JsonValue::Null => "null",
481        JsonValue::Bool(_) => "boolean",
482        JsonValue::Number(_) => "number",
483        JsonValue::String(_) => "string",
484        JsonValue::Array(_) => "array",
485        JsonValue::Object(_) => "object",
486    }
487}
488
489fn type_matches(expected: &str, got: &JsonValue) -> bool {
490    match expected {
491        "string" => matches!(got, JsonValue::String(_)),
492        "boolean" => matches!(got, JsonValue::Bool(_)),
493        "array" => matches!(got, JsonValue::Array(_)),
494        "object" => matches!(got, JsonValue::Object(_)),
495        "null" => matches!(got, JsonValue::Null),
496        "number" => matches!(got, JsonValue::Number(_)),
497        "integer" => match got {
498            JsonValue::Number(n) => *n == n.trunc(),
499            _ => false,
500        },
501        _ => false,
502    }
503}
504
505/// Validate `payload` (a JSON string) against the latest schema
506/// registered for `event_name`. Returns `Ok(())` if the payload
507/// matches; `Err(ValidationError)` with a typed reason otherwise.
508///
509/// `UnknownEventName` is returned when no schema is registered —
510/// the insert path treats that as "no validation, accept" for
511/// back-compat with `timeseries` rows that don't carry an
512/// `event_name` registered yet.
513pub fn validate(
514    store: &UnifiedStore,
515    event_name: &str,
516    payload_json: &str,
517) -> Result<(), ValidationError> {
518    let Some((version, schema_json)) = latest(store, event_name) else {
519        return Err(ValidationError::UnknownEventName(event_name.to_string()));
520    };
521    let schema = parse_json(&schema_json)
522        .map_err(|e| ValidationError::InvalidPayloadJson(format!("schema corrupt: {e}")))?;
523    let payload = parse_json(payload_json)
524        .map_err(|e| ValidationError::InvalidPayloadJson(e.to_string()))?;
525    let Some(payload_obj) = payload.as_object() else {
526        return Err(ValidationError::PayloadNotObject);
527    };
528    let schema_obj = schema.as_object().unwrap_or(&[]);
529    let properties: &[(String, JsonValue)] = schema_obj
530        .iter()
531        .find(|(k, _)| k == "properties")
532        .and_then(|(_, v)| v.as_object())
533        .unwrap_or(&[]);
534    let required: Vec<&str> = schema_obj
535        .iter()
536        .find(|(k, _)| k == "required")
537        .and_then(|(_, v)| v.as_array())
538        .map(|arr| arr.iter().filter_map(JsonValue::as_str).collect())
539        .unwrap_or_default();
540
541    // Required-field check first so callers see the missing-field
542    // error before the unknown-field error when both could fire.
543    for req in &required {
544        if !payload_obj.iter().any(|(k, _)| k == *req) {
545            return Err(ValidationError::MissingRequiredField {
546                event_name: event_name.to_string(),
547                version,
548                field: (*req).to_string(),
549            });
550        }
551    }
552    // Strict mode: every payload key must appear in properties.
553    for (key, value) in payload_obj {
554        let Some((_, prop)) = properties.iter().find(|(k, _)| k == key) else {
555            return Err(ValidationError::UnknownField {
556                event_name: event_name.to_string(),
557                version,
558                field: key.clone(),
559            });
560        };
561        let expected_type = prop
562            .as_object()
563            .and_then(|entries| entries.iter().find(|(k, _)| k == "type"))
564            .and_then(|(_, v)| v.as_str())
565            .unwrap_or("");
566        if expected_type.is_empty() {
567            continue;
568        }
569        if !type_matches(expected_type, value) {
570            return Err(ValidationError::TypeMismatch {
571                event_name: event_name.to_string(),
572                version,
573                field: key.clone(),
574                expected: expected_type.to_string(),
575                got: json_type_name(value).to_string(),
576            });
577        }
578    }
579    Ok(())
580}
581
582/// Map a [`ValidationError`] onto a [`RedDBError`] with a marker
583/// prefix the transport layer can pattern-match for status codes.
584/// The exact HTTP mapping is wired up alongside the broader analytics
585/// transport work; here we keep the body shape stable so callers can
586/// already parse it.
587pub fn validation_error_to_reddb(err: ValidationError) -> crate::api::RedDBError {
588    let body = match &err {
589        ValidationError::UnknownEventName(name) => {
590            format!("AnalyticsSchemaError:UnknownEventName:{name}")
591        }
592        ValidationError::InvalidPayloadJson(reason) => {
593            format!("AnalyticsSchemaError:InvalidPayloadJson:{reason}")
594        }
595        ValidationError::PayloadNotObject => {
596            "AnalyticsSchemaError:PayloadNotObject".to_string()
597        }
598        ValidationError::MissingRequiredField {
599            event_name,
600            version,
601            field,
602        } => format!(
603            "AnalyticsSchemaError:MissingRequiredField:{event_name}:v{version}:{field}"
604        ),
605        ValidationError::UnknownField {
606            event_name,
607            version,
608            field,
609        } => format!(
610            "AnalyticsSchemaError:UnknownField:{event_name}:v{version}:{field}"
611        ),
612        ValidationError::TypeMismatch {
613            event_name,
614            version,
615            field,
616            expected,
617            got,
618        } => format!(
619            "AnalyticsSchemaError:TypeMismatch:{event_name}:v{version}:{field}:{expected}:{got}"
620        ),
621    };
622    crate::api::RedDBError::InvalidOperation(body)
623}
624
625#[cfg(test)]
626mod tests {
627    use super::*;
628
629    fn store() -> UnifiedStore {
630        UnifiedStore::new()
631    }
632
633    const PAGE_VIEW_SCHEMA: &str = r#"{
634        "type": "object",
635        "properties": {
636            "url": {"type": "string"},
637            "user_id": {"type": "integer"}
638        },
639        "required": ["url"]
640    }"#;
641
642    #[test]
643    fn first_registration_is_version_1() {
644        let s = store();
645        let v = register(&s, "page_view", PAGE_VIEW_SCHEMA).expect("register ok");
646        assert_eq!(v, 1);
647        let (latest_v, _) = latest(&s, "page_view").expect("latest present");
648        assert_eq!(latest_v, 1);
649    }
650
651    #[test]
652    fn re_registering_identical_schema_bumps_to_next_version() {
653        // Slice 3 (#581): re-registering an identical schema is the
654        // degenerate additive case — no fields changed, so it must
655        // be accepted as v2.
656        let s = store();
657        register(&s, "page_view", PAGE_VIEW_SCHEMA).unwrap();
658        let v = register(&s, "page_view", PAGE_VIEW_SCHEMA).expect("identical is additive");
659        assert_eq!(v, 2);
660    }
661
662    // --- slice 3 (#581): additive evolution + breaking-change rejection ---
663
664    const PURCHASE_V1: &str =
665        r#"{"type":"object","properties":{"amount":{"type":"number"}},"required":["amount"]}"#;
666
667    #[test]
668    fn additive_optional_field_is_accepted_as_v2() {
669        let s = store();
670        register(&s, "purchase", PURCHASE_V1).unwrap();
671        let v2 = register(
672            &s,
673            "purchase",
674            r#"{"type":"object",
675                "properties":{"amount":{"type":"number"},
676                              "discount_code":{"type":"string"}},
677                "required":["amount"]}"#,
678        )
679        .expect("optional add is additive");
680        assert_eq!(v2, 2);
681        let (latest_v, _) = latest(&s, "purchase").unwrap();
682        assert_eq!(latest_v, 2);
683    }
684
685    #[test]
686    fn additive_optional_field_with_default_is_accepted() {
687        let s = store();
688        register(&s, "purchase", PURCHASE_V1).unwrap();
689        let v2 = register(
690            &s,
691            "purchase",
692            r#"{"type":"object",
693                "properties":{"amount":{"type":"number"},
694                              "currency":{"type":"string","default":"USD"}},
695                "required":["amount"]}"#,
696        )
697        .expect("optional add with default is additive");
698        assert_eq!(v2, 2);
699    }
700
701    #[test]
702    fn widening_string_max_length_is_accepted() {
703        let s = store();
704        register(
705            &s,
706            "ev",
707            r#"{"type":"object","properties":{"name":{"type":"string","maxLength":32}},"required":["name"]}"#,
708        )
709        .unwrap();
710        let v2 = register(
711            &s,
712            "ev",
713            r#"{"type":"object","properties":{"name":{"type":"string","maxLength":128}},"required":["name"]}"#,
714        )
715        .expect("widening maxLength is additive");
716        assert_eq!(v2, 2);
717    }
718
719    #[test]
720    fn breaking_rename_is_rejected() {
721        let s = store();
722        register(&s, "purchase", PURCHASE_V1).unwrap();
723        let err = register(
724            &s,
725            "purchase",
726            r#"{"type":"object","properties":{"total":{"type":"number"}},"required":["total"]}"#,
727        )
728        .unwrap_err();
729        match err {
730            SchemaError::BreakingChange {
731                event_name,
732                previous_version,
733                offenders,
734            } => {
735                assert_eq!(event_name, "purchase");
736                assert_eq!(previous_version, 1);
737                assert!(
738                    offenders.iter().any(|b| matches!(
739                        b,
740                        BreakingChange::Rename { from, to }
741                            if from == "amount" && to == "total"
742                    )),
743                    "expected Rename(amount->total), got {offenders:?}"
744                );
745            }
746            other => panic!("expected BreakingChange, got {other:?}"),
747        }
748    }
749
750    #[test]
751    fn breaking_retype_is_rejected() {
752        let s = store();
753        register(&s, "purchase", PURCHASE_V1).unwrap();
754        let err = register(
755            &s,
756            "purchase",
757            r#"{"type":"object","properties":{"amount":{"type":"string"}},"required":["amount"]}"#,
758        )
759        .unwrap_err();
760        let SchemaError::BreakingChange { offenders, .. } = err else {
761            panic!("expected BreakingChange");
762        };
763        assert!(offenders.iter().any(|b| matches!(
764            b,
765            BreakingChange::Retype { field, from, to }
766                if field == "amount" && from == "number" && to == "string"
767        )));
768    }
769
770    #[test]
771    fn breaking_drop_is_rejected() {
772        let s = store();
773        register(
774            &s,
775            "ev",
776            r#"{"type":"object",
777                "properties":{"a":{"type":"number"},"b":{"type":"boolean"}},
778                "required":["a"]}"#,
779        )
780        .unwrap();
781        let err = register(
782            &s,
783            "ev",
784            r#"{"type":"object","properties":{"a":{"type":"number"}},"required":["a"]}"#,
785        )
786        .unwrap_err();
787        let SchemaError::BreakingChange { offenders, .. } = err else {
788            panic!("expected BreakingChange");
789        };
790        assert!(offenders
791            .iter()
792            .any(|b| matches!(b, BreakingChange::Drop { field } if field == "b")));
793    }
794
795    #[test]
796    fn breaking_optional_to_required_is_rejected() {
797        let s = store();
798        register(
799            &s,
800            "ev",
801            r#"{"type":"object",
802                "properties":{"a":{"type":"number"},"b":{"type":"string"}},
803                "required":["a"]}"#,
804        )
805        .unwrap();
806        let err = register(
807            &s,
808            "ev",
809            r#"{"type":"object",
810                "properties":{"a":{"type":"number"},"b":{"type":"string"}},
811                "required":["a","b"]}"#,
812        )
813        .unwrap_err();
814        let SchemaError::BreakingChange { offenders, .. } = err else {
815            panic!("expected BreakingChange");
816        };
817        assert!(offenders
818            .iter()
819            .any(|b| matches!(b, BreakingChange::RequiredAdd { field } if field == "b")));
820    }
821
822    #[test]
823    fn multi_field_break_reports_every_offender() {
824        let s = store();
825        register(
826            &s,
827            "ev",
828            r#"{"type":"object",
829                "properties":{"a":{"type":"number"},
830                              "b":{"type":"string"},
831                              "c":{"type":"boolean"}},
832                "required":["a"]}"#,
833        )
834        .unwrap();
835        // Retype `a` (number → string), drop `c`, and add brand-new
836        // required field `d`. Three independent breaks in one diff.
837        let err = register(
838            &s,
839            "ev",
840            r#"{"type":"object",
841                "properties":{"a":{"type":"string"},
842                              "b":{"type":"string"},
843                              "d":{"type":"integer"}},
844                "required":["a","d"]}"#,
845        )
846        .unwrap_err();
847        let SchemaError::BreakingChange { offenders, .. } = err else {
848            panic!("expected BreakingChange");
849        };
850        assert!(offenders
851            .iter()
852            .any(|b| matches!(b, BreakingChange::Retype { field, .. } if field == "a")));
853        assert!(offenders
854            .iter()
855            .any(|b| matches!(b, BreakingChange::Drop { field } if field == "c")));
856        assert!(offenders
857            .iter()
858            .any(|b| matches!(b, BreakingChange::RequiredAdd { field } if field == "d")));
859    }
860
861    #[test]
862    fn validate_resolves_to_latest_version_after_evolution() {
863        // After an additive evolution, validate() must use v2's
864        // strict-properties set — a payload using only v1 fields
865        // still passes; a payload using v2's new optional field
866        // also passes; an unknown field still rejects.
867        let s = store();
868        register(&s, "purchase", PURCHASE_V1).unwrap();
869        register(
870            &s,
871            "purchase",
872            r#"{"type":"object",
873                "properties":{"amount":{"type":"number"},
874                              "discount_code":{"type":"string"}},
875                "required":["amount"]}"#,
876        )
877        .unwrap();
878        validate(&s, "purchase", r#"{"amount":1.0}"#).expect("v1-shape still valid");
879        validate(&s, "purchase", r#"{"amount":1.0,"discount_code":"X"}"#)
880            .expect("v2-only field accepted");
881        let err = validate(&s, "purchase", r#"{"amount":1.0,"mystery":1}"#).unwrap_err();
882        assert!(matches!(err, ValidationError::UnknownField { version, .. } if version == 2));
883    }
884
885    #[test]
886    fn list_returns_every_version_not_just_latest() {
887        // red.schema_registry virtual table is fed by list(); slice 3
888        // contract is "every version, not just the latest".
889        let s = store();
890        register(&s, "purchase", PURCHASE_V1).unwrap();
891        register(
892            &s,
893            "purchase",
894            r#"{"type":"object",
895                "properties":{"amount":{"type":"number"},
896                              "discount_code":{"type":"string"}},
897                "required":["amount"]}"#,
898        )
899        .unwrap();
900        let purchase_versions: Vec<u32> = list(&s)
901            .into_iter()
902            .filter(|e| e.event_name == "purchase")
903            .map(|e| e.version)
904            .collect();
905        let mut sorted = purchase_versions.clone();
906        sorted.sort();
907        assert_eq!(sorted, vec![1, 2], "expected both versions, got {purchase_versions:?}");
908    }
909
910    #[test]
911    fn invalid_schema_json_rejected_at_register() {
912        let s = store();
913        let err = register(&s, "x", "{not json").unwrap_err();
914        assert!(matches!(err, SchemaError::InvalidSchemaJson(_)));
915    }
916
917    #[test]
918    fn schema_must_be_type_object() {
919        let s = store();
920        let err = register(&s, "x", r#"{"type":"string"}"#).unwrap_err();
921        assert!(matches!(err, SchemaError::InvalidSchemaShape(_)));
922    }
923
924    #[test]
925    fn validate_happy_path_accepts_known_fields() {
926        let s = store();
927        register(&s, "page_view", PAGE_VIEW_SCHEMA).unwrap();
928        validate(&s, "page_view", r#"{"url":"/x","user_id":42}"#).expect("ok");
929        validate(&s, "page_view", r#"{"url":"/y"}"#).expect("ok without optional");
930    }
931
932    #[test]
933    fn validate_rejects_unknown_field() {
934        let s = store();
935        register(&s, "page_view", PAGE_VIEW_SCHEMA).unwrap();
936        let err = validate(&s, "page_view", r#"{"url":"/x","mystery":1}"#).unwrap_err();
937        match err {
938            ValidationError::UnknownField { field, .. } => assert_eq!(field, "mystery"),
939            other => panic!("expected UnknownField, got {other:?}"),
940        }
941    }
942
943    #[test]
944    fn validate_rejects_missing_required_field() {
945        let s = store();
946        register(&s, "page_view", PAGE_VIEW_SCHEMA).unwrap();
947        let err = validate(&s, "page_view", r#"{}"#).unwrap_err();
948        match err {
949            ValidationError::MissingRequiredField { field, .. } => assert_eq!(field, "url"),
950            other => panic!("expected MissingRequiredField, got {other:?}"),
951        }
952    }
953
954    #[test]
955    fn validate_rejects_type_mismatch() {
956        let s = store();
957        register(&s, "page_view", PAGE_VIEW_SCHEMA).unwrap();
958        let err = validate(&s, "page_view", r#"{"url":123}"#).unwrap_err();
959        match err {
960            ValidationError::TypeMismatch {
961                field, expected, got, ..
962            } => {
963                assert_eq!(field, "url");
964                assert_eq!(expected, "string");
965                assert_eq!(got, "number");
966            }
967            other => panic!("expected TypeMismatch, got {other:?}"),
968        }
969    }
970
971    #[test]
972    fn validate_unknown_event_name() {
973        let s = store();
974        let err = validate(&s, "nope", r#"{}"#).unwrap_err();
975        assert!(matches!(err, ValidationError::UnknownEventName(name) if name == "nope"));
976    }
977
978    #[test]
979    fn validate_payload_must_be_object() {
980        let s = store();
981        register(&s, "page_view", PAGE_VIEW_SCHEMA).unwrap();
982        let err = validate(&s, "page_view", r#""hello""#).unwrap_err();
983        assert!(matches!(err, ValidationError::PayloadNotObject));
984    }
985
986    #[test]
987    fn list_returns_every_registered_event() {
988        let s = store();
989        register(&s, "page_view", PAGE_VIEW_SCHEMA).unwrap();
990        register(
991            &s,
992            "signup",
993            r#"{"type":"object","properties":{"email":{"type":"string"}},"required":["email"]}"#,
994        )
995        .unwrap();
996        let mut names: Vec<String> = list(&s).into_iter().map(|e| e.event_name).collect();
997        names.sort();
998        assert_eq!(names, vec!["page_view".to_string(), "signup".to_string()]);
999        assert!(list(&s).iter().all(|e| e.version == 1));
1000        assert!(list(&s).iter().all(|e| e.registered_at_ms > 0));
1001    }
1002
1003    #[test]
1004    fn persistence_smoke_latest_survives_restart() {
1005        // Slice-2 "engine restart" is simulated by handing the same
1006        // store handle to a second `latest()` call after the
1007        // original `register` returns. The real engine restart wires
1008        // through the same `UnifiedStore` API — we exercise the
1009        // serialise/deserialise path here, which is what survives
1010        // process restart on a durable backend.
1011        let s = store();
1012        register(&s, "page_view", PAGE_VIEW_SCHEMA).unwrap();
1013        let raw =
1014            read_latest_registry_json(&s).expect("registry json must be persisted on register");
1015        assert!(raw.contains("page_view"));
1016        // Round-trip through a fresh load that reuses only the
1017        // public read path:
1018        let (v, schema) = latest(&s, "page_view").expect("latest after persist");
1019        assert_eq!(v, 1);
1020        assert!(schema.contains("\"url\""));
1021    }
1022
1023    #[test]
1024    fn validation_error_maps_to_invalid_operation_with_typed_marker() {
1025        let err = validation_error_to_reddb(ValidationError::MissingRequiredField {
1026            event_name: "page_view".to_string(),
1027            version: 1,
1028            field: "url".to_string(),
1029        });
1030        match err {
1031            crate::api::RedDBError::InvalidOperation(body) => {
1032                assert!(
1033                    body.starts_with("AnalyticsSchemaError:MissingRequiredField:page_view:v1:url"),
1034                    "unexpected body: {body}"
1035                );
1036            }
1037            other => panic!("expected InvalidOperation, got {other:?}"),
1038        }
1039    }
1040}