1use std::collections::BTreeMap;
4
5use serde::{Deserialize, Serialize};
6use serde_json::Value;
7
8use crate::procedure::{
9 HandlerFn, ProcedureDef, ProcedureType, SubscriptionDef, SubscriptionHandlerFn,
10};
11
/// Definition of one incoming message of a channel.
///
/// [`ChannelDef::expand`] turns every `IncomingDef` into a command procedure named
/// `"<channel>.<message>"`.
pub struct IncomingDef {
  /// Input schema of this message alone; `expand` merges it with the channel-level input
  /// schema to form the generated procedure's input schema.
  pub input_schema: Value,
  /// Output schema, copied unchanged onto the generated procedure.
  pub output_schema: Value,
  /// Optional error schema, copied unchanged onto the generated procedure.
  pub error_schema: Option<Value>,
  /// Handler installed on the generated procedure.
  pub handler: HandlerFn,
}
18
/// A named group of incoming messages and outgoing events that share one input schema.
///
/// Call [`ChannelDef::expand`] to turn it into plain procedure and subscription definitions.
pub struct ChannelDef {
  /// Channel name; used as the prefix of every generated procedure and subscription name.
  pub name: String,
  /// Channel-level input schema: merged into every incoming message's input and used as-is
  /// as the input of the generated `"<name>.events"` subscription.
  pub input_schema: Value,
  /// `(message name, definition)` pairs; each one becomes a command procedure.
  pub incoming: Vec<(String, IncomingDef)>,
  /// `(event name, payload schema)` pairs; together they form the subscription's output union.
  pub outgoing: Vec<(String, Value)>,
  /// Handler installed on the generated `"<name>.events"` subscription.
  pub subscribe_handler: SubscriptionHandlerFn,
}
26
/// Serializable description of a channel, returned by [`ChannelDef::expand`] next to the
/// generated procedures and subscription.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ChannelMeta {
  /// The channel-level input schema.
  pub input: Value,
  /// Metadata of every incoming message, keyed by message name.
  pub incoming: BTreeMap<String, IncomingMeta>,
  /// Payload schema of every outgoing event, keyed by event name.
  pub outgoing: BTreeMap<String, Value>,
}
33
/// Schemas of one incoming channel message as recorded in [`ChannelMeta`].
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct IncomingMeta {
  /// The message's own input schema, i.e. before it is merged with the channel input.
  pub input: Value,
  /// The message's output schema.
  pub output: Value,
  /// The message's error schema, if any. Left out of the serialized form when `None` and
  /// read back as `None` when the field is missing.
  #[serde(default, skip_serializing_if = "Option::is_none")]
  pub error: Option<Value>,
}
41
42impl ChannelDef {
43 pub fn expand(self) -> (Vec<ProcedureDef>, Vec<SubscriptionDef>, ChannelMeta) {
45 let mut procedures = Vec::new();
46 let mut incoming_meta = BTreeMap::new();
47
48 for (msg_name, msg_def) in self.incoming {
49 let merged_input = merge_object_schemas(&self.input_schema, &msg_def.input_schema);
50
51 procedures.push(ProcedureDef {
52 name: format!("{}.{}", self.name, msg_name),
53 proc_type: ProcedureType::Command,
54 input_schema: merged_input,
55 output_schema: msg_def.output_schema.clone(),
56 error_schema: msg_def.error_schema.clone(),
57 handler: msg_def.handler,
58 });
59
60 incoming_meta.insert(
61 msg_name,
62 IncomingMeta {
63 input: msg_def.input_schema,
64 output: msg_def.output_schema,
65 error: msg_def.error_schema,
66 },
67 );
68 }
69
70 let mut mapping = serde_json::Map::new();
72 let mut outgoing_meta = BTreeMap::new();
73 for (event_name, payload_schema) in &self.outgoing {
74 let mut variant = serde_json::Map::new();
75 let mut props = serde_json::Map::new();
76 props.insert("payload".to_string(), payload_schema.clone());
77 variant.insert("properties".to_string(), Value::Object(props));
78 mapping.insert(event_name.clone(), Value::Object(variant));
79 outgoing_meta.insert(event_name.clone(), payload_schema.clone());
80 }
81 let union_schema = serde_json::json!({
82 "discriminator": "type",
83 "mapping": Value::Object(mapping)
84 });
85
86 let subscriptions = vec![SubscriptionDef {
87 name: format!("{}.events", self.name),
88 input_schema: self.input_schema.clone(),
89 output_schema: union_schema,
90 error_schema: None,
91 handler: self.subscribe_handler,
92 }];
93
94 let meta =
95 ChannelMeta { input: self.input_schema, incoming: incoming_meta, outgoing: outgoing_meta };
96
97 (procedures, subscriptions, meta)
98 }
99}
100
101fn merge_object_schemas(channel: &Value, message: &Value) -> Value {
104 let mut merged = serde_json::Map::new();
105
106 let ch_props = channel.get("properties").and_then(|v| v.as_object());
107 let ch_opt = channel.get("optionalProperties").and_then(|v| v.as_object());
108 let msg_props = message.get("properties").and_then(|v| v.as_object());
109 let msg_opt = message.get("optionalProperties").and_then(|v| v.as_object());
110
111 let mut props = serde_json::Map::new();
112 if let Some(p) = ch_props {
113 props.extend(p.clone());
114 }
115 if let Some(p) = msg_props {
116 props.extend(p.clone());
117 }
118
119 let mut opt_props = serde_json::Map::new();
120 if let Some(p) = ch_opt {
121 opt_props.extend(p.clone());
122 }
123 if let Some(p) = msg_opt {
124 opt_props.extend(p.clone());
125 }
126
127 if !props.is_empty() || opt_props.is_empty() {
130 merged.insert("properties".to_string(), Value::Object(props));
131 }
132 if !opt_props.is_empty() {
133 merged.insert("optionalProperties".to_string(), Value::Object(opt_props));
134 }
135
136 Value::Object(merged)
137}
138
#[cfg(test)]
mod tests {
  use std::pin::Pin;
  use std::sync::Arc;
  use std::task::{Context, Poll};

  use serde_json::{json, Value};

  use super::*;
  use crate::errors::SeamError;
  use crate::procedure::BoxStream;

  /// Stream that is already exhausted: every poll reports the end of the stream.
  struct EmptyStream;

  impl futures_core::Stream for EmptyStream {
    type Item = Result<Value, SeamError>;

    fn poll_next(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
      Poll::Ready(None)
    }
  }

  /// Command handler that ignores its argument and resolves to `{}`.
  fn dummy_handler() -> HandlerFn {
    Arc::new(|_input| Box::pin(async { Ok(json!({})) }))
  }

  /// Subscription handler that ignores its argument and resolves to an empty stream.
  fn dummy_sub_handler() -> SubscriptionHandlerFn {
    Arc::new(|_input| {
      Box::pin(async {
        let no_events: BoxStream<Result<Value, SeamError>> = Box::pin(EmptyStream);
        Ok(no_events)
      })
    })
  }

  #[test]
  fn expand_produces_commands_and_subscription() {
    let room_input = json!({"properties": {"roomId": {"type": "string"}}});
    let text_schema = json!({"properties": {"text": {"type": "string"}}});

    let send = IncomingDef {
      input_schema: text_schema.clone(),
      output_schema: json!({"properties": {"ok": {"type": "boolean"}}}),
      error_schema: None,
      handler: dummy_handler(),
    };
    let (procs, subs, meta) = ChannelDef {
      name: "chat".to_string(),
      input_schema: room_input.clone(),
      incoming: vec![("send".to_string(), send)],
      outgoing: vec![("message".to_string(), text_schema.clone())],
      subscribe_handler: dummy_sub_handler(),
    }
    .expand();

    // The single incoming message becomes one command with the merged input schema.
    assert_eq!(procs.len(), 1);
    let command = &procs[0];
    assert_eq!(command.name, "chat.send");
    assert_eq!(command.proc_type, ProcedureType::Command);
    for field in ["roomId", "text"] {
      assert!(command.input_schema["properties"][field].is_object());
    }

    // The outgoing events are exposed through one discriminated-union subscription.
    assert_eq!(subs.len(), 1);
    let events = &subs[0];
    assert_eq!(events.name, "chat.events");
    assert_eq!(events.output_schema["discriminator"], "type");
    assert!(events.output_schema["mapping"]["message"].is_object());

    // The metadata records the schemas as they were declared, without merging.
    assert_eq!(meta.input, room_input);
    assert!(meta.incoming.contains_key("send"));
    assert_eq!(meta.incoming["send"].input, text_schema);
    assert!(meta.outgoing.contains_key("message"));
  }

  #[test]
  fn expand_multiple_incoming_and_outgoing() {
    let make_move = IncomingDef {
      input_schema: json!({"properties": {"x": {"type": "int32"}}}),
      output_schema: json!({}),
      error_schema: Some(json!({"properties": {"code": {"type": "string"}}})),
      handler: dummy_handler(),
    };
    let resign = IncomingDef {
      input_schema: json!({}),
      output_schema: json!({}),
      error_schema: None,
      handler: dummy_handler(),
    };
    let (procs, subs, meta) = ChannelDef {
      name: "game".to_string(),
      input_schema: json!({"properties": {"gameId": {"type": "string"}}}),
      incoming: vec![("move".to_string(), make_move), ("resign".to_string(), resign)],
      outgoing: vec![
        ("moved".to_string(), json!({"properties": {"x": {"type": "int32"}}})),
        ("ended".to_string(), json!({"properties": {"winner": {"type": "string"}}})),
      ],
      subscribe_handler: dummy_sub_handler(),
    }
    .expand();

    // Commands come out in declaration order, each carrying its own error schema.
    let names: Vec<&str> = procs.iter().map(|p| p.name.as_str()).collect();
    assert_eq!(names, ["game.move", "game.resign"]);
    assert!(procs[0].error_schema.is_some());
    assert!(procs[1].error_schema.is_none());

    assert_eq!(subs.len(), 1);
    assert_eq!(meta.incoming.len(), 2);
    assert_eq!(meta.outgoing.len(), 2);
    assert!(meta.incoming["move"].error.is_some());
    assert!(meta.incoming["resign"].error.is_none());
  }

  #[test]
  fn merge_object_schemas_combines_properties() {
    let channel = json!({
      "properties": {"a": {"type": "string"}},
      "optionalProperties": {"b": {"type": "int32"}}
    });
    let message = json!({"properties": {"c": {"type": "boolean"}}});

    let merged = merge_object_schemas(&channel, &message);

    for required in ["a", "c"] {
      assert!(merged["properties"][required].is_object());
    }
    assert!(merged["optionalProperties"]["b"].is_object());
  }

  #[test]
  fn merge_empty_schemas() {
    let empty = json!({});
    assert_eq!(merge_object_schemas(&empty, &empty), json!({"properties": {}}));
  }
}