1use std::collections::BTreeMap;
4
5use serde::{Deserialize, Serialize};
6use serde_json::Value;
7
8use crate::procedure::{
9 HandlerFn, ProcedureDef, ProcedureType, SubscriptionDef, SubscriptionHandlerFn,
10};
11
/// Definition of one incoming message that clients can send on a channel.
///
/// [`ChannelDef::expand`] turns every entry into a command procedure named
/// `<channel>.<message>`.
pub struct IncomingDef {
  /// Schema of the message's own input fields. `expand` merges it with the
  /// channel-level input schema to build the generated procedure's input.
  pub input_schema: Value,
  /// Schema copied unchanged into the generated procedure's output.
  pub output_schema: Value,
  /// Optional error schema, copied unchanged into the generated procedure.
  pub error_schema: Option<Value>,
  /// Handler installed on the generated procedure.
  pub handler: HandlerFn,
}
18
/// Declarative description of a channel: a named group of incoming messages
/// and outgoing events that share one channel-level input schema.
///
/// Consumed by [`ChannelDef::expand`], which produces the individual
/// procedure and subscription definitions.
pub struct ChannelDef {
  /// Channel name; used as the prefix of every generated definition name.
  pub name: String,
  /// Channel-level input schema. It is merged into each incoming message's
  /// input and used as-is for the generated subscription's input.
  pub input_schema: Value,
  /// Incoming messages as `(message name, definition)` pairs.
  pub incoming: Vec<(String, IncomingDef)>,
  /// Outgoing events as `(event name, payload schema)` pairs.
  pub outgoing: Vec<(String, Value)>,
  /// Handler installed on the generated `<name>.events` subscription.
  pub subscribe_handler: SubscriptionHandlerFn,
}
26
/// Serializable summary of a channel, returned by [`ChannelDef::expand`]
/// alongside the generated definitions.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ChannelMeta {
  /// The channel-level input schema, unmodified.
  pub input: Value,
  /// Per-message schemas, keyed by message name.
  pub incoming: BTreeMap<String, IncomingMeta>,
  /// Payload schema of each outgoing event, keyed by event name.
  pub outgoing: BTreeMap<String, Value>,
}
33
/// Serializable schemas of a single incoming message.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct IncomingMeta {
  /// The message's own input schema (not merged with the channel input).
  pub input: Value,
  /// The message's output schema.
  pub output: Value,
  /// The message's error schema. Left out of the serialized form when `None`
  /// and read back as `None` when the field is absent.
  #[serde(default, skip_serializing_if = "Option::is_none")]
  pub error: Option<Value>,
}
41
42impl ChannelDef {
43 pub fn expand(self) -> (Vec<ProcedureDef>, Vec<SubscriptionDef>, ChannelMeta) {
45 let mut procedures = Vec::new();
46 let mut incoming_meta = BTreeMap::new();
47
48 for (msg_name, msg_def) in self.incoming {
49 let merged_input = merge_object_schemas(&self.input_schema, &msg_def.input_schema);
50
51 procedures.push(ProcedureDef {
52 name: format!("{}.{}", self.name, msg_name),
53 proc_type: ProcedureType::Command,
54 input_schema: merged_input,
55 output_schema: msg_def.output_schema.clone(),
56 error_schema: msg_def.error_schema.clone(),
57 context_keys: vec![],
58 suppress: None,
59 cache: None,
60 handler: msg_def.handler,
61 });
62
63 incoming_meta.insert(
64 msg_name,
65 IncomingMeta {
66 input: msg_def.input_schema,
67 output: msg_def.output_schema,
68 error: msg_def.error_schema,
69 },
70 );
71 }
72
73 let mut mapping = serde_json::Map::new();
75 let mut outgoing_meta = BTreeMap::new();
76 for (event_name, payload_schema) in &self.outgoing {
77 let mut variant = serde_json::Map::new();
78 let mut props = serde_json::Map::new();
79 props.insert("payload".to_string(), payload_schema.clone());
80 variant.insert("properties".to_string(), Value::Object(props));
81 mapping.insert(event_name.clone(), Value::Object(variant));
82 outgoing_meta.insert(event_name.clone(), payload_schema.clone());
83 }
84 let union_schema = serde_json::json!({
85 "discriminator": "type",
86 "mapping": Value::Object(mapping)
87 });
88
89 let subscriptions = vec![SubscriptionDef {
90 name: format!("{}.events", self.name),
91 input_schema: self.input_schema.clone(),
92 output_schema: union_schema,
93 error_schema: None,
94 context_keys: vec![],
95 suppress: None,
96 handler: self.subscribe_handler,
97 }];
98
99 let meta =
100 ChannelMeta { input: self.input_schema, incoming: incoming_meta, outgoing: outgoing_meta };
101
102 (procedures, subscriptions, meta)
103 }
104}
105
106fn merge_object_schemas(channel: &Value, message: &Value) -> Value {
109 let mut merged = serde_json::Map::new();
110
111 let ch_props = channel.get("properties").and_then(|v| v.as_object());
112 let ch_opt = channel.get("optionalProperties").and_then(|v| v.as_object());
113 let msg_props = message.get("properties").and_then(|v| v.as_object());
114 let msg_opt = message.get("optionalProperties").and_then(|v| v.as_object());
115
116 let mut props = serde_json::Map::new();
117 if let Some(p) = ch_props {
118 props.extend(p.clone());
119 }
120 if let Some(p) = msg_props {
121 props.extend(p.clone());
122 }
123
124 let mut opt_props = serde_json::Map::new();
125 if let Some(p) = ch_opt {
126 opt_props.extend(p.clone());
127 }
128 if let Some(p) = msg_opt {
129 opt_props.extend(p.clone());
130 }
131
132 if !props.is_empty() || opt_props.is_empty() {
135 merged.insert("properties".to_string(), Value::Object(props));
136 }
137 if !opt_props.is_empty() {
138 merged.insert("optionalProperties".to_string(), Value::Object(opt_props));
139 }
140
141 Value::Object(merged)
142}
143
#[cfg(test)]
mod tests {
  use std::pin::Pin;
  use std::sync::Arc;
  use std::task::{Context, Poll};

  use serde_json::json;

  use super::*;
  use crate::errors::SeamError;
  use crate::procedure::BoxStream;

  /// Stream that finishes immediately without yielding any item.
  struct EmptyStream;

  impl futures_core::Stream for EmptyStream {
    type Item = Result<Value, SeamError>;

    fn poll_next(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
      Poll::Ready(None)
    }
  }

  /// Command handler that ignores its arguments and resolves to `{}`.
  fn dummy_handler() -> HandlerFn {
    Arc::new(|_, _| Box::pin(async { Ok(json!({})) }))
  }

  /// Subscription handler that resolves to an already-finished stream.
  fn dummy_sub_handler() -> SubscriptionHandlerFn {
    Arc::new(|_params| {
      Box::pin(async {
        let stream: BoxStream<Result<Value, SeamError>> = Box::pin(EmptyStream);
        Ok(stream)
      })
    })
  }

  #[test]
  fn expand_produces_commands_and_subscription() {
    let room_input = json!({"properties": {"roomId": {"type": "string"}}});
    let text_schema = json!({"properties": {"text": {"type": "string"}}});

    let send = IncomingDef {
      input_schema: text_schema.clone(),
      output_schema: json!({"properties": {"ok": {"type": "boolean"}}}),
      error_schema: None,
      handler: dummy_handler(),
    };
    let chat = ChannelDef {
      name: "chat".to_string(),
      input_schema: room_input.clone(),
      incoming: vec![("send".to_string(), send)],
      outgoing: vec![("message".to_string(), text_schema.clone())],
      subscribe_handler: dummy_sub_handler(),
    };

    let (procedures, subscriptions, meta) = chat.expand();

    // Exactly one command, named after channel and message.
    assert_eq!(procedures.len(), 1);
    let command = &procedures[0];
    assert_eq!(command.name, "chat.send");
    assert_eq!(command.proc_type, ProcedureType::Command);

    // Its input carries both the channel field and the message field.
    assert!(command.input_schema["properties"]["roomId"].is_object());
    assert!(command.input_schema["properties"]["text"].is_object());

    // Exactly one subscription, exposing a tagged union of events.
    assert_eq!(subscriptions.len(), 1);
    let events = &subscriptions[0];
    assert_eq!(events.name, "chat.events");
    assert_eq!(events.output_schema["discriminator"], "type");
    assert!(events.output_schema["mapping"]["message"].is_object());

    // Metadata keeps the original, unmerged schemas.
    assert_eq!(meta.input, room_input);
    assert!(meta.incoming.contains_key("send"));
    assert_eq!(meta.incoming["send"].input, text_schema);
    assert!(meta.outgoing.contains_key("message"));
  }

  #[test]
  fn expand_multiple_incoming_and_outgoing() {
    let move_msg = IncomingDef {
      input_schema: json!({"properties": {"x": {"type": "int32"}}}),
      output_schema: json!({}),
      error_schema: Some(json!({"properties": {"code": {"type": "string"}}})),
      handler: dummy_handler(),
    };
    let resign_msg = IncomingDef {
      input_schema: json!({}),
      output_schema: json!({}),
      error_schema: None,
      handler: dummy_handler(),
    };
    let game = ChannelDef {
      name: "game".to_string(),
      input_schema: json!({"properties": {"gameId": {"type": "string"}}}),
      incoming: vec![("move".to_string(), move_msg), ("resign".to_string(), resign_msg)],
      outgoing: vec![
        ("moved".to_string(), json!({"properties": {"x": {"type": "int32"}}})),
        ("ended".to_string(), json!({"properties": {"winner": {"type": "string"}}})),
      ],
      subscribe_handler: dummy_sub_handler(),
    };

    let (procedures, subscriptions, meta) = game.expand();

    // Procedures follow declaration order and keep their error schemas.
    assert_eq!(procedures.len(), 2);
    assert_eq!(procedures[0].name, "game.move");
    assert_eq!(procedures[1].name, "game.resign");
    assert!(procedures[0].error_schema.is_some());
    assert!(procedures[1].error_schema.is_none());

    assert_eq!(subscriptions.len(), 1);
    assert_eq!(meta.incoming.len(), 2);
    assert_eq!(meta.outgoing.len(), 2);
    assert!(meta.incoming["move"].error.is_some());
    assert!(meta.incoming["resign"].error.is_none());
  }

  #[test]
  fn merge_object_schemas_combines_properties() {
    let channel_schema = json!({
      "properties": {"a": {"type": "string"}},
      "optionalProperties": {"b": {"type": "int32"}}
    });
    let message_schema = json!({
      "properties": {"c": {"type": "boolean"}}
    });

    let combined = merge_object_schemas(&channel_schema, &message_schema);

    assert!(combined["properties"]["a"].is_object());
    assert!(combined["properties"]["c"].is_object());
    assert!(combined["optionalProperties"]["b"].is_object());
  }

  #[test]
  fn merge_empty_schemas() {
    let empty = json!({});
    let combined = merge_object_schemas(&empty, &empty);
    assert_eq!(combined, json!({"properties": {}}));
  }
}