1use std::collections::BTreeMap;
4
5use serde::{Deserialize, Serialize};
6use serde_json::Value;
7
8use crate::procedure::{
9 HandlerFn, ProcedureDef, ProcedureType, SubscriptionDef, SubscriptionHandlerFn,
10};
11
/// Definition of one incoming message of a channel.
///
/// [`ChannelDef::expand`] turns every entry into a `ProcedureType::Command`
/// procedure named `<channel name>.<message name>`.
pub struct IncomingDef {
  /// Schema of the message-specific input. When the procedure is built it is
  /// merged with the owning channel's `input_schema`; the unmerged value is
  /// what ends up in [`IncomingMeta::input`].
  pub input_schema: Value,
  /// Output schema, copied unchanged to the generated procedure and to the
  /// channel metadata.
  pub output_schema: Value,
  /// Optional error schema, copied unchanged to the generated procedure and
  /// to the channel metadata.
  pub error_schema: Option<Value>,
  /// Handler installed on the generated procedure.
  pub handler: HandlerFn,
}
18
/// Declarative description of a channel: a set of named incoming messages
/// plus a set of named outgoing events that share one channel-level input.
///
/// Call [`ChannelDef::expand`] to lower it into procedure and subscription
/// definitions.
pub struct ChannelDef {
  /// Channel name; used as the prefix of every generated procedure and
  /// subscription name.
  pub name: String,
  /// Channel-level input schema. It is merged into every incoming message's
  /// input and used as-is for the generated subscription's input.
  pub input_schema: Value,
  /// Incoming messages as `(message name, definition)` pairs, expanded in
  /// the order given.
  pub incoming: Vec<(String, IncomingDef)>,
  /// Outgoing events as `(event name, payload schema)` pairs.
  pub outgoing: Vec<(String, Value)>,
  /// Handler installed on the generated `<channel name>.events` subscription.
  pub subscribe_handler: SubscriptionHandlerFn,
}
26
/// Serializable summary of a channel, returned by [`ChannelDef::expand`]
/// alongside the generated definitions. Contains schemas only, no handlers.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ChannelMeta {
  /// The channel-level input schema, unchanged.
  pub input: Value,
  /// Per-message schemas keyed by message name.
  pub incoming: BTreeMap<String, IncomingMeta>,
  /// Payload schema of each outgoing event keyed by event name.
  pub outgoing: BTreeMap<String, Value>,
}
33
/// Schemas of a single incoming message as recorded in [`ChannelMeta`].
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct IncomingMeta {
  /// The message's own input schema (not merged with the channel input).
  pub input: Value,
  /// The message's output schema.
  pub output: Value,
  /// The message's error schema, if any. Left out of the serialized form
  /// when `None`, and read back as `None` when the field is absent.
  #[serde(default, skip_serializing_if = "Option::is_none")]
  pub error: Option<Value>,
}
41
42impl ChannelDef {
43 pub fn expand(self) -> (Vec<ProcedureDef>, Vec<SubscriptionDef>, ChannelMeta) {
45 let mut procedures = Vec::new();
46 let mut incoming_meta = BTreeMap::new();
47
48 for (msg_name, msg_def) in self.incoming {
49 let merged_input = merge_object_schemas(&self.input_schema, &msg_def.input_schema);
50
51 procedures.push(ProcedureDef {
52 name: format!("{}.{}", self.name, msg_name),
53 proc_type: ProcedureType::Command,
54 input_schema: merged_input,
55 output_schema: msg_def.output_schema.clone(),
56 error_schema: msg_def.error_schema.clone(),
57 context_keys: vec![],
58 handler: msg_def.handler,
59 });
60
61 incoming_meta.insert(
62 msg_name,
63 IncomingMeta {
64 input: msg_def.input_schema,
65 output: msg_def.output_schema,
66 error: msg_def.error_schema,
67 },
68 );
69 }
70
71 let mut mapping = serde_json::Map::new();
73 let mut outgoing_meta = BTreeMap::new();
74 for (event_name, payload_schema) in &self.outgoing {
75 let mut variant = serde_json::Map::new();
76 let mut props = serde_json::Map::new();
77 props.insert("payload".to_string(), payload_schema.clone());
78 variant.insert("properties".to_string(), Value::Object(props));
79 mapping.insert(event_name.clone(), Value::Object(variant));
80 outgoing_meta.insert(event_name.clone(), payload_schema.clone());
81 }
82 let union_schema = serde_json::json!({
83 "discriminator": "type",
84 "mapping": Value::Object(mapping)
85 });
86
87 let subscriptions = vec![SubscriptionDef {
88 name: format!("{}.events", self.name),
89 input_schema: self.input_schema.clone(),
90 output_schema: union_schema,
91 error_schema: None,
92 context_keys: vec![],
93 handler: self.subscribe_handler,
94 }];
95
96 let meta =
97 ChannelMeta { input: self.input_schema, incoming: incoming_meta, outgoing: outgoing_meta };
98
99 (procedures, subscriptions, meta)
100 }
101}
102
103fn merge_object_schemas(channel: &Value, message: &Value) -> Value {
106 let mut merged = serde_json::Map::new();
107
108 let ch_props = channel.get("properties").and_then(|v| v.as_object());
109 let ch_opt = channel.get("optionalProperties").and_then(|v| v.as_object());
110 let msg_props = message.get("properties").and_then(|v| v.as_object());
111 let msg_opt = message.get("optionalProperties").and_then(|v| v.as_object());
112
113 let mut props = serde_json::Map::new();
114 if let Some(p) = ch_props {
115 props.extend(p.clone());
116 }
117 if let Some(p) = msg_props {
118 props.extend(p.clone());
119 }
120
121 let mut opt_props = serde_json::Map::new();
122 if let Some(p) = ch_opt {
123 opt_props.extend(p.clone());
124 }
125 if let Some(p) = msg_opt {
126 opt_props.extend(p.clone());
127 }
128
129 if !props.is_empty() || opt_props.is_empty() {
132 merged.insert("properties".to_string(), Value::Object(props));
133 }
134 if !opt_props.is_empty() {
135 merged.insert("optionalProperties".to_string(), Value::Object(opt_props));
136 }
137
138 Value::Object(merged)
139}
140
#[cfg(test)]
mod tests {
  use std::pin::Pin;
  use std::sync::Arc;
  use std::task::{Context, Poll};

  use serde_json::json;

  use super::*;
  use crate::errors::SeamError;
  use crate::procedure::BoxStream;

  /// Command handler stub that always resolves to an empty JSON object.
  fn dummy_handler() -> HandlerFn {
    Arc::new(|_, _| Box::pin(async { Ok(json!({})) }))
  }

  /// Stream that finishes immediately without yielding any item.
  struct EmptyStream;

  impl futures_core::Stream for EmptyStream {
    type Item = Result<Value, SeamError>;

    fn poll_next(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
      Poll::Ready(None)
    }
  }

  /// Subscription handler stub that resolves to an [`EmptyStream`].
  fn dummy_sub_handler() -> SubscriptionHandlerFn {
    Arc::new(|_, _| {
      Box::pin(async {
        let empty: BoxStream<Result<Value, SeamError>> = Box::pin(EmptyStream);
        Ok(empty)
      })
    })
  }

  #[test]
  fn expand_produces_commands_and_subscription() {
    let room_schema = json!({"properties": {"roomId": {"type": "string"}}});
    let text_schema = json!({"properties": {"text": {"type": "string"}}});

    let send = IncomingDef {
      input_schema: text_schema.clone(),
      output_schema: json!({"properties": {"ok": {"type": "boolean"}}}),
      error_schema: None,
      handler: dummy_handler(),
    };
    let chat = ChannelDef {
      name: "chat".to_string(),
      input_schema: room_schema.clone(),
      incoming: vec![("send".to_string(), send)],
      outgoing: vec![("message".to_string(), text_schema.clone())],
      subscribe_handler: dummy_sub_handler(),
    };

    let (commands, subscriptions, meta) = chat.expand();

    // Exactly one command, named after channel and message.
    assert_eq!(commands.len(), 1);
    let command = &commands[0];
    assert_eq!(command.name, "chat.send");
    assert_eq!(command.proc_type, ProcedureType::Command);

    // Its input carries both the channel-level and the message-level field.
    let merged_props = &command.input_schema["properties"];
    assert!(merged_props["roomId"].is_object());
    assert!(merged_props["text"].is_object());

    // Exactly one subscription, exposing the tagged union of events.
    assert_eq!(subscriptions.len(), 1);
    let events = &subscriptions[0];
    assert_eq!(events.name, "chat.events");
    assert_eq!(events.output_schema["discriminator"], "type");
    assert!(events.output_schema["mapping"]["message"].is_object());

    // Metadata keeps the original, unmerged schemas.
    assert_eq!(meta.input, room_schema);
    assert!(meta.incoming.contains_key("send"));
    assert_eq!(meta.incoming["send"].input, text_schema);
    assert!(meta.outgoing.contains_key("message"));
  }

  #[test]
  fn expand_multiple_incoming_and_outgoing() {
    let x_schema = json!({"properties": {"x": {"type": "int32"}}});

    let make_move = IncomingDef {
      input_schema: x_schema.clone(),
      output_schema: json!({}),
      error_schema: Some(json!({"properties": {"code": {"type": "string"}}})),
      handler: dummy_handler(),
    };
    let resign = IncomingDef {
      input_schema: json!({}),
      output_schema: json!({}),
      error_schema: None,
      handler: dummy_handler(),
    };
    let game = ChannelDef {
      name: "game".to_string(),
      input_schema: json!({"properties": {"gameId": {"type": "string"}}}),
      incoming: vec![("move".to_string(), make_move), ("resign".to_string(), resign)],
      outgoing: vec![
        ("moved".to_string(), x_schema),
        ("ended".to_string(), json!({"properties": {"winner": {"type": "string"}}})),
      ],
      subscribe_handler: dummy_sub_handler(),
    };

    let (commands, subscriptions, meta) = game.expand();

    // Commands come out in declaration order, each with its own error schema.
    assert_eq!(commands.len(), 2);
    assert_eq!(commands[0].name, "game.move");
    assert_eq!(commands[1].name, "game.resign");
    assert!(commands[0].error_schema.is_some());
    assert!(commands[1].error_schema.is_none());

    assert_eq!(subscriptions.len(), 1);
    assert_eq!(meta.incoming.len(), 2);
    assert_eq!(meta.outgoing.len(), 2);
    assert!(meta.incoming["move"].error.is_some());
    assert!(meta.incoming["resign"].error.is_none());
  }

  #[test]
  fn merge_object_schemas_combines_properties() {
    let channel_schema = json!({
      "properties": {"a": {"type": "string"}},
      "optionalProperties": {"b": {"type": "int32"}}
    });
    let message_schema = json!({
      "properties": {"c": {"type": "boolean"}}
    });

    let merged = merge_object_schemas(&channel_schema, &message_schema);

    let required = &merged["properties"];
    assert!(required["a"].is_object());
    assert!(required["c"].is_object());
    assert!(merged["optionalProperties"]["b"].is_object());
  }

  #[test]
  fn merge_empty_schemas() {
    let empty = json!({});
    let merged = merge_object_schemas(&empty, &empty);
    assert_eq!(merged, json!({"properties": {}}));
  }
}