1use std::collections::HashMap;
13use std::fmt;
14
15use jsonschema::draft202012;
16
17use crate::schema_materialization::{
18 schema_properties, schema_property_to_context_type, schema_required_fields,
19};
20use crate::{AdapterProvides, EventId, EventPayload, EventTime, ExternalEvent, ExternalEventKind};
21
/// Errors produced while compiling event schemas or binding a semantic event payload.
///
/// Every variant carries the semantic event `kind` it concerns so the failure can be
/// reported against the schema or payload that caused it.
#[derive(Debug, Clone, PartialEq, Eq)]
#[non_exhaustive]
pub enum EventBindingError {
    /// No compiled schema is registered for the requested semantic kind.
    UnknownSemanticKind {
        kind: String,
    },
    /// The schema for `kind` could not be used; `detail` explains why.
    InvalidSchema {
        kind: String,
        detail: String,
    },
    /// The payload for `kind` was rejected; `detail` carries the underlying message.
    PayloadSchemaMismatch {
        kind: String,
        detail: String,
    },
    /// The payload for `kind` is not a JSON object.
    PayloadMustBeObject {
        kind: String,
    },
    /// The schema for `kind` declares `field` in a way that cannot be mapped to a
    /// context type; `detail` explains why.
    UnsupportedFieldType {
        kind: String,
        field: String,
        detail: String,
    },
    /// `field` is required by the schema for `kind` but the adapter provides no
    /// context key with that name.
    MissingContextProvision {
        kind: String,
        field: String,
    },
    /// The adapter context key for `field` has type `got`, while the schema for
    /// `kind` implies `expected`.
    ContextTypeMismatch {
        kind: String,
        field: String,
        expected: String,
        got: String,
    },
}
55
56impl fmt::Display for EventBindingError {
57 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
58 match self {
59 Self::UnknownSemanticKind { kind } => {
60 write!(f, "unknown semantic event kind '{kind}'")
61 }
62 Self::InvalidSchema { kind, detail } => {
63 write!(f, "invalid schema for semantic event kind '{kind}': {detail}")
64 }
65 Self::PayloadSchemaMismatch { kind, detail } => {
66 write!(f, "payload does not match schema for semantic event kind '{kind}': {detail}")
67 }
68 Self::PayloadMustBeObject { kind } => {
69 write!(f, "semantic event payload for kind '{kind}' must be a JSON object")
70 }
71 Self::UnsupportedFieldType {
72 kind,
73 field,
74 detail,
75 } => write!(
76 f,
77 "unsupported field type for '{kind}.{field}' in semantic event schema: {detail}"
78 ),
79 Self::MissingContextProvision { kind, field } => write!(
80 f,
81 "semantic event field '{kind}.{field}' is required but no matching adapter context key exists"
82 ),
83 Self::ContextTypeMismatch {
84 kind,
85 field,
86 expected,
87 got,
88 } => write!(
89 f,
90 "semantic event field '{kind}.{field}' type mismatch: expected context type '{expected}', got '{got}'"
91 ),
92 }
93 }
94}
95
96impl std::error::Error for EventBindingError {}
97
98#[derive(Debug)]
99pub struct EventBinder {
100 compiled: HashMap<String, CompiledSemanticKind>,
101}
102
103#[derive(Debug)]
104struct CompiledSemanticKind {
105 validator: jsonschema::Validator,
106}
107
108pub fn compile_event_binder(provides: &AdapterProvides) -> Result<EventBinder, EventBindingError> {
109 let mut compiled = HashMap::new();
110
111 for (semantic_kind, schema) in &provides.event_schemas {
112 let validator =
113 draft202012::new(schema).map_err(|err| EventBindingError::InvalidSchema {
114 kind: semantic_kind.clone(),
115 detail: err.to_string(),
116 })?;
117
118 let schema_object = schema
119 .as_object()
120 .ok_or_else(|| EventBindingError::InvalidSchema {
121 kind: semantic_kind.clone(),
122 detail: "schema must be a JSON object".to_string(),
123 })?;
124
125 let required_fields = schema_required_fields(schema_object);
126 let properties = schema_properties(schema_object);
127
128 for field in &required_fields {
129 let field_schema = properties.and_then(|map| map.get(*field)).ok_or_else(|| {
130 EventBindingError::UnsupportedFieldType {
131 kind: semantic_kind.clone(),
132 field: field.to_string(),
133 detail: "required field is not declared in payload_schema.properties"
134 .to_string(),
135 }
136 })?;
137
138 let expected_context_type =
139 schema_property_to_context_type(field_schema).map_err(|detail| {
140 EventBindingError::UnsupportedFieldType {
141 kind: semantic_kind.clone(),
142 field: field.to_string(),
143 detail,
144 }
145 })?;
146
147 let Some(context_key) = provides.context.get(*field) else {
148 return Err(EventBindingError::MissingContextProvision {
149 kind: semantic_kind.clone(),
150 field: field.to_string(),
151 });
152 };
153
154 if context_key.ty != expected_context_type {
155 return Err(EventBindingError::ContextTypeMismatch {
156 kind: semantic_kind.clone(),
157 field: field.to_string(),
158 expected: expected_context_type.to_string(),
159 got: context_key.ty.clone(),
160 });
161 }
162 }
163
164 compiled.insert(semantic_kind.clone(), CompiledSemanticKind { validator });
165 }
166
167 Ok(EventBinder { compiled })
168}
169
170pub fn bind_semantic_event_with_binder(
171 binder: &EventBinder,
172 event_id: EventId,
173 kind: ExternalEventKind,
174 at: EventTime,
175 semantic_kind: &str,
176 payload: serde_json::Value,
177) -> Result<ExternalEvent, EventBindingError> {
178 let compiled_kind = binder.compiled.get(semantic_kind).ok_or_else(|| {
179 EventBindingError::UnknownSemanticKind {
180 kind: semantic_kind.to_string(),
181 }
182 })?;
183
184 if let Err(err) = compiled_kind.validator.validate(&payload) {
185 return Err(EventBindingError::PayloadSchemaMismatch {
186 kind: semantic_kind.to_string(),
187 detail: err.to_string(),
188 });
189 }
190
191 let _payload_object =
192 payload
193 .as_object()
194 .ok_or_else(|| EventBindingError::PayloadMustBeObject {
195 kind: semantic_kind.to_string(),
196 })?;
197
198 let bytes =
199 serde_json::to_vec(&payload).map_err(|err| EventBindingError::PayloadSchemaMismatch {
200 kind: semantic_kind.to_string(),
201 detail: err.to_string(),
202 })?;
203
204 ExternalEvent::with_payload(event_id, kind, at, EventPayload { data: bytes }).map_err(|err| {
205 EventBindingError::PayloadSchemaMismatch {
206 kind: semantic_kind.to_string(),
207 detail: err.to_string(),
208 }
209 })
210}