Skip to main content

seam_server/
channel.rs

1/* src/server/core/rust/src/channel.rs */
2
3use std::collections::BTreeMap;
4
5use serde::{Deserialize, Serialize};
6use serde_json::Value;
7
8use crate::procedure::{
9	HandlerFn, ProcedureDef, ProcedureType, SubscriptionDef, SubscriptionHandlerFn,
10};
11
12pub struct IncomingDef {
13	pub input_schema: Value,
14	pub output_schema: Value,
15	pub error_schema: Option<Value>,
16	pub handler: HandlerFn,
17}
18
19pub struct ChannelDef {
20	pub name: String,
21	pub input_schema: Value,
22	pub incoming: Vec<(String, IncomingDef)>,
23	pub outgoing: Vec<(String, Value)>,
24	pub subscribe_handler: SubscriptionHandlerFn,
25}
26
27#[derive(Debug, Clone, Serialize, Deserialize)]
28pub struct ChannelMeta {
29	pub input: Value,
30	pub incoming: BTreeMap<String, IncomingMeta>,
31	pub outgoing: BTreeMap<String, Value>,
32}
33
34#[derive(Debug, Clone, Serialize, Deserialize)]
35pub struct IncomingMeta {
36	pub input: Value,
37	pub output: Value,
38	#[serde(default, skip_serializing_if = "Option::is_none")]
39	pub error: Option<Value>,
40}
41
42impl ChannelDef {
43	/// Expand channel to Level 0 primitives (commands + subscription) and metadata.
44	pub fn expand(self) -> (Vec<ProcedureDef>, Vec<SubscriptionDef>, ChannelMeta) {
45		let mut procedures = Vec::new();
46		let mut incoming_meta = BTreeMap::new();
47
48		for (msg_name, msg_def) in self.incoming {
49			let merged_input = merge_object_schemas(&self.input_schema, &msg_def.input_schema);
50
51			procedures.push(ProcedureDef {
52				name: format!("{}.{}", self.name, msg_name),
53				proc_type: ProcedureType::Command,
54				input_schema: merged_input,
55				output_schema: msg_def.output_schema.clone(),
56				error_schema: msg_def.error_schema.clone(),
57				context_keys: vec![],
58				suppress: None,
59				cache: None,
60				handler: msg_def.handler,
61			});
62
63			incoming_meta.insert(
64				msg_name,
65				IncomingMeta {
66					input: msg_def.input_schema,
67					output: msg_def.output_schema,
68					error: msg_def.error_schema,
69				},
70			);
71		}
72
73		// Build tagged union schema for outgoing events
74		let mut mapping = serde_json::Map::new();
75		let mut outgoing_meta = BTreeMap::new();
76		for (event_name, payload_schema) in &self.outgoing {
77			let mut variant = serde_json::Map::new();
78			let mut props = serde_json::Map::new();
79			props.insert("payload".to_string(), payload_schema.clone());
80			variant.insert("properties".to_string(), Value::Object(props));
81			mapping.insert(event_name.clone(), Value::Object(variant));
82			outgoing_meta.insert(event_name.clone(), payload_schema.clone());
83		}
84		let union_schema = serde_json::json!({
85			"discriminator": "type",
86			"mapping": Value::Object(mapping)
87		});
88
89		let subscriptions = vec![SubscriptionDef {
90			name: format!("{}.events", self.name),
91			input_schema: self.input_schema.clone(),
92			output_schema: union_schema,
93			error_schema: None,
94			context_keys: vec![],
95			suppress: None,
96			handler: self.subscribe_handler,
97		}];
98
99		let meta =
100			ChannelMeta { input: self.input_schema, incoming: incoming_meta, outgoing: outgoing_meta };
101
102		(procedures, subscriptions, meta)
103	}
104}
105
106/// Merge two JTD object schemas (properties + optionalProperties).
107/// Message properties override channel properties on key collision.
108fn merge_object_schemas(channel: &Value, message: &Value) -> Value {
109	let mut merged = serde_json::Map::new();
110
111	let ch_props = channel.get("properties").and_then(|v| v.as_object());
112	let ch_opt = channel.get("optionalProperties").and_then(|v| v.as_object());
113	let msg_props = message.get("properties").and_then(|v| v.as_object());
114	let msg_opt = message.get("optionalProperties").and_then(|v| v.as_object());
115
116	let mut props = serde_json::Map::new();
117	if let Some(p) = ch_props {
118		props.extend(p.clone());
119	}
120	if let Some(p) = msg_props {
121		props.extend(p.clone());
122	}
123
124	let mut opt_props = serde_json::Map::new();
125	if let Some(p) = ch_opt {
126		opt_props.extend(p.clone());
127	}
128	if let Some(p) = msg_opt {
129		opt_props.extend(p.clone());
130	}
131
132	// Always emit "properties" when there are required props, or when
133	// there are no optional props either (empty schema -> empty properties).
134	if !props.is_empty() || opt_props.is_empty() {
135		merged.insert("properties".to_string(), Value::Object(props));
136	}
137	if !opt_props.is_empty() {
138		merged.insert("optionalProperties".to_string(), Value::Object(opt_props));
139	}
140
141	Value::Object(merged)
142}
143
#[cfg(test)]
mod tests {
	use std::pin::Pin;
	use std::sync::Arc;
	use std::task::{Context, Poll};

	use serde_json::json;

	use super::*;
	use crate::errors::SeamError;
	use crate::procedure::BoxStream;

	/// JTD object schema with a single required property of the given type.
	fn typed_object(prop: &str, jtd_type: &str) -> Value {
		json!({"properties": {prop: {"type": jtd_type}}})
	}

	/// Command handler that ignores its arguments and resolves to `{}`.
	fn noop_handler() -> HandlerFn {
		Arc::new(|_, _| Box::pin(async { Ok(json!({})) }))
	}

	/// Stream that reports completion on the very first poll.
	struct FinishedStream;

	impl futures_core::Stream for FinishedStream {
		type Item = Result<Value, SeamError>;

		fn poll_next(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
			Poll::Ready(None)
		}
	}

	/// Subscription handler that yields an already-finished stream.
	fn empty_subscription() -> SubscriptionHandlerFn {
		Arc::new(|_params| {
			Box::pin(async {
				let stream: BoxStream<Result<Value, SeamError>> = Box::pin(FinishedStream);
				Ok(stream)
			})
		})
	}

	#[test]
	fn expand_produces_commands_and_subscription() {
		let room_input = typed_object("roomId", "string");
		let text_schema = typed_object("text", "string");

		let send = IncomingDef {
			input_schema: text_schema.clone(),
			output_schema: typed_object("ok", "boolean"),
			error_schema: None,
			handler: noop_handler(),
		};
		let channel = ChannelDef {
			name: "chat".to_string(),
			input_schema: room_input.clone(),
			incoming: vec![("send".to_string(), send)],
			outgoing: vec![("message".to_string(), text_schema.clone())],
			subscribe_handler: empty_subscription(),
		};

		let (procs, subs, meta) = channel.expand();

		// Exactly one command, named `chat.send`.
		assert_eq!(procs.len(), 1);
		let send_proc = &procs[0];
		assert_eq!(send_proc.name, "chat.send");
		assert_eq!(send_proc.proc_type, ProcedureType::Command);

		// Its input carries both the channel's roomId and the message's text.
		let merged_props = &send_proc.input_schema["properties"];
		assert!(merged_props["roomId"].is_object());
		assert!(merged_props["text"].is_object());

		// Exactly one subscription, named `chat.events`.
		assert_eq!(subs.len(), 1);
		let events = &subs[0];
		assert_eq!(events.name, "chat.events");

		// Its output is the tagged union over outgoing events.
		assert_eq!(events.output_schema["discriminator"], "type");
		assert!(events.output_schema["mapping"]["message"].is_object());

		// Metadata keeps the schemas as declared, without merging.
		assert_eq!(meta.input, room_input);
		assert!(meta.incoming.contains_key("send"));
		assert_eq!(meta.incoming["send"].input, text_schema);
		assert!(meta.outgoing.contains_key("message"));
	}

	#[test]
	fn expand_multiple_incoming_and_outgoing() {
		let x_schema = typed_object("x", "int32");

		let make_move = IncomingDef {
			input_schema: x_schema.clone(),
			output_schema: json!({}),
			error_schema: Some(typed_object("code", "string")),
			handler: noop_handler(),
		};
		let resign = IncomingDef {
			input_schema: json!({}),
			output_schema: json!({}),
			error_schema: None,
			handler: noop_handler(),
		};
		let channel = ChannelDef {
			name: "game".to_string(),
			input_schema: typed_object("gameId", "string"),
			incoming: vec![("move".to_string(), make_move), ("resign".to_string(), resign)],
			outgoing: vec![
				("moved".to_string(), x_schema),
				("ended".to_string(), typed_object("winner", "string")),
			],
			subscribe_handler: empty_subscription(),
		};

		let (procs, subs, meta) = channel.expand();

		// Commands come out in declaration order.
		let names: Vec<&str> = procs.iter().map(|p| p.name.as_str()).collect();
		assert_eq!(names, ["game.move", "game.resign"]);
		assert!(procs[0].error_schema.is_some());
		assert!(procs[1].error_schema.is_none());

		assert_eq!(subs.len(), 1);
		assert_eq!(meta.incoming.len(), 2);
		assert_eq!(meta.outgoing.len(), 2);
		assert!(meta.incoming["move"].error.is_some());
		assert!(meta.incoming["resign"].error.is_none());
	}

	#[test]
	fn merge_object_schemas_combines_properties() {
		let channel = json!({
			"properties": {"a": {"type": "string"}},
			"optionalProperties": {"b": {"type": "int32"}}
		});
		let message = typed_object("c", "boolean");

		let merged = merge_object_schemas(&channel, &message);

		let required = &merged["properties"];
		assert!(required["a"].is_object());
		assert!(required["c"].is_object());
		assert!(merged["optionalProperties"]["b"].is_object());
	}

	#[test]
	fn merge_empty_schemas() {
		let empty = json!({});
		// Both empty -> produces empty "properties"
		assert_eq!(merge_object_schemas(&empty, &empty), json!({"properties": {}}));
	}
}
296}