Skip to main content

seam_server/
channel.rs

1/* src/server/core/rust/src/channel.rs */
2
3use std::collections::BTreeMap;
4
5use serde::{Deserialize, Serialize};
6use serde_json::Value;
7
8use crate::procedure::{
9  HandlerFn, ProcedureDef, ProcedureType, SubscriptionDef, SubscriptionHandlerFn,
10};
11
/// Definition of one incoming channel message.
///
/// [`ChannelDef::expand`] turns each of these into a `Command` procedure.
pub struct IncomingDef {
  /// Schema of the message's own input; `expand` merges it with the channel's input schema.
  pub input_schema: Value,
  /// Output schema, passed through unchanged to the generated procedure.
  pub output_schema: Value,
  /// Optional error schema, passed through unchanged to the generated procedure.
  pub error_schema: Option<Value>,
  /// Handler installed on the generated procedure.
  pub handler: HandlerFn,
}
18
/// Declarative description of a channel: a set of incoming messages and
/// outgoing events that share one channel-level input schema.
pub struct ChannelDef {
  /// Channel name; used as the prefix of every generated procedure/subscription name.
  pub name: String,
  /// Channel-level input schema, merged into each incoming message's input
  /// and used as-is for the events subscription.
  pub input_schema: Value,
  /// `(message name, definition)` pairs, kept in declaration order.
  pub incoming: Vec<(String, IncomingDef)>,
  /// `(event name, payload schema)` pairs.
  pub outgoing: Vec<(String, Value)>,
  /// Handler installed on the generated `"<name>.events"` subscription.
  pub subscribe_handler: SubscriptionHandlerFn,
}
26
/// Serializable description of a channel, holding the original (unmerged)
/// schemas as produced by [`ChannelDef::expand`].
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ChannelMeta {
  /// The channel-level input schema.
  pub input: Value,
  /// Per-message metadata, keyed by message name.
  pub incoming: BTreeMap<String, IncomingMeta>,
  /// Payload schema of each outgoing event, keyed by event name.
  pub outgoing: BTreeMap<String, Value>,
}
33
/// Serializable schemas of a single incoming message.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct IncomingMeta {
  /// The message's own input schema (not merged with the channel input).
  pub input: Value,
  /// The message's output schema.
  pub output: Value,
  /// The message's error schema. Omitted from serialized output when `None`,
  /// and read back as `None` when the field is absent.
  #[serde(default, skip_serializing_if = "Option::is_none")]
  pub error: Option<Value>,
}
41
42impl ChannelDef {
43  /// Expand channel to Level 0 primitives (commands + subscription) and metadata.
44  pub fn expand(self) -> (Vec<ProcedureDef>, Vec<SubscriptionDef>, ChannelMeta) {
45    let mut procedures = Vec::new();
46    let mut incoming_meta = BTreeMap::new();
47
48    for (msg_name, msg_def) in self.incoming {
49      let merged_input = merge_object_schemas(&self.input_schema, &msg_def.input_schema);
50
51      procedures.push(ProcedureDef {
52        name: format!("{}.{}", self.name, msg_name),
53        proc_type: ProcedureType::Command,
54        input_schema: merged_input,
55        output_schema: msg_def.output_schema.clone(),
56        error_schema: msg_def.error_schema.clone(),
57        context_keys: vec![],
58        handler: msg_def.handler,
59      });
60
61      incoming_meta.insert(
62        msg_name,
63        IncomingMeta {
64          input: msg_def.input_schema,
65          output: msg_def.output_schema,
66          error: msg_def.error_schema,
67        },
68      );
69    }
70
71    // Build tagged union schema for outgoing events
72    let mut mapping = serde_json::Map::new();
73    let mut outgoing_meta = BTreeMap::new();
74    for (event_name, payload_schema) in &self.outgoing {
75      let mut variant = serde_json::Map::new();
76      let mut props = serde_json::Map::new();
77      props.insert("payload".to_string(), payload_schema.clone());
78      variant.insert("properties".to_string(), Value::Object(props));
79      mapping.insert(event_name.clone(), Value::Object(variant));
80      outgoing_meta.insert(event_name.clone(), payload_schema.clone());
81    }
82    let union_schema = serde_json::json!({
83      "discriminator": "type",
84      "mapping": Value::Object(mapping)
85    });
86
87    let subscriptions = vec![SubscriptionDef {
88      name: format!("{}.events", self.name),
89      input_schema: self.input_schema.clone(),
90      output_schema: union_schema,
91      error_schema: None,
92      context_keys: vec![],
93      handler: self.subscribe_handler,
94    }];
95
96    let meta =
97      ChannelMeta { input: self.input_schema, incoming: incoming_meta, outgoing: outgoing_meta };
98
99    (procedures, subscriptions, meta)
100  }
101}
102
103/// Merge two JTD object schemas (properties + optionalProperties).
104/// Message properties override channel properties on key collision.
105fn merge_object_schemas(channel: &Value, message: &Value) -> Value {
106  let mut merged = serde_json::Map::new();
107
108  let ch_props = channel.get("properties").and_then(|v| v.as_object());
109  let ch_opt = channel.get("optionalProperties").and_then(|v| v.as_object());
110  let msg_props = message.get("properties").and_then(|v| v.as_object());
111  let msg_opt = message.get("optionalProperties").and_then(|v| v.as_object());
112
113  let mut props = serde_json::Map::new();
114  if let Some(p) = ch_props {
115    props.extend(p.clone());
116  }
117  if let Some(p) = msg_props {
118    props.extend(p.clone());
119  }
120
121  let mut opt_props = serde_json::Map::new();
122  if let Some(p) = ch_opt {
123    opt_props.extend(p.clone());
124  }
125  if let Some(p) = msg_opt {
126    opt_props.extend(p.clone());
127  }
128
129  // Always emit "properties" when there are required props, or when
130  // there are no optional props either (empty schema -> empty properties).
131  if !props.is_empty() || opt_props.is_empty() {
132    merged.insert("properties".to_string(), Value::Object(props));
133  }
134  if !opt_props.is_empty() {
135    merged.insert("optionalProperties".to_string(), Value::Object(opt_props));
136  }
137
138  Value::Object(merged)
139}
140
// Unit tests for `ChannelDef::expand` and `merge_object_schemas`.
#[cfg(test)]
mod tests {
  use std::sync::Arc;

  use super::*;
  use crate::procedure::BoxStream;

  // Command handler stub: ignores both arguments and resolves to an empty JSON object.
  fn dummy_handler() -> HandlerFn {
    Arc::new(|_, _| Box::pin(async { Ok(serde_json::json!({})) }))
  }

  // Stream that is exhausted from the start.
  struct EmptyStream;

  impl futures_core::Stream for EmptyStream {
    type Item = Result<serde_json::Value, crate::errors::SeamError>;
    // Always reports end-of-stream without yielding an item.
    fn poll_next(
      self: std::pin::Pin<&mut Self>,
      _cx: &mut std::task::Context<'_>,
    ) -> std::task::Poll<Option<Self::Item>> {
      std::task::Poll::Ready(None)
    }
  }

  // Subscription handler stub: ignores both arguments and resolves to an `EmptyStream`.
  fn dummy_sub_handler() -> SubscriptionHandlerFn {
    Arc::new(|_, _| {
      Box::pin(async {
        let stream: BoxStream<Result<serde_json::Value, crate::errors::SeamError>> =
          Box::pin(EmptyStream);
        Ok(stream)
      })
    })
  }

  // A channel with one incoming message and one outgoing event expands to one
  // command, one subscription, and metadata holding the unmerged schemas.
  #[test]
  fn expand_produces_commands_and_subscription() {
    let channel = ChannelDef {
      name: "chat".to_string(),
      input_schema: serde_json::json!({"properties": {"roomId": {"type": "string"}}}),
      incoming: vec![(
        "send".to_string(),
        IncomingDef {
          input_schema: serde_json::json!({"properties": {"text": {"type": "string"}}}),
          output_schema: serde_json::json!({"properties": {"ok": {"type": "boolean"}}}),
          error_schema: None,
          handler: dummy_handler(),
        },
      )],
      outgoing: vec![(
        "message".to_string(),
        serde_json::json!({"properties": {"text": {"type": "string"}}}),
      )],
      subscribe_handler: dummy_sub_handler(),
    };

    let (procs, subs, meta) = channel.expand();

    // One command: chat.send
    assert_eq!(procs.len(), 1);
    assert_eq!(procs[0].name, "chat.send");
    assert_eq!(procs[0].proc_type, ProcedureType::Command);

    // Merged input: roomId (channel) + text (message)
    let input = &procs[0].input_schema;
    assert!(input["properties"]["roomId"].is_object());
    assert!(input["properties"]["text"].is_object());

    // One subscription: chat.events
    assert_eq!(subs.len(), 1);
    assert_eq!(subs[0].name, "chat.events");

    // Tagged union schema
    assert_eq!(subs[0].output_schema["discriminator"], "type");
    assert!(subs[0].output_schema["mapping"]["message"].is_object());

    // Meta preserves original (non-merged) schemas
    assert_eq!(meta.input, serde_json::json!({"properties": {"roomId": {"type": "string"}}}));
    assert!(meta.incoming.contains_key("send"));
    assert_eq!(
      meta.incoming["send"].input,
      serde_json::json!({"properties": {"text": {"type": "string"}}})
    );
    assert!(meta.outgoing.contains_key("message"));
  }

  // Several incoming messages keep their declaration order in the procedure
  // list, and each message's error schema (present or absent) is carried into
  // both the procedure and the metadata.
  #[test]
  fn expand_multiple_incoming_and_outgoing() {
    let channel = ChannelDef {
      name: "game".to_string(),
      input_schema: serde_json::json!({"properties": {"gameId": {"type": "string"}}}),
      incoming: vec![
        (
          "move".to_string(),
          IncomingDef {
            input_schema: serde_json::json!({"properties": {"x": {"type": "int32"}}}),
            output_schema: serde_json::json!({}),
            error_schema: Some(serde_json::json!({"properties": {"code": {"type": "string"}}})),
            handler: dummy_handler(),
          },
        ),
        (
          "resign".to_string(),
          IncomingDef {
            input_schema: serde_json::json!({}),
            output_schema: serde_json::json!({}),
            error_schema: None,
            handler: dummy_handler(),
          },
        ),
      ],
      outgoing: vec![
        ("moved".to_string(), serde_json::json!({"properties": {"x": {"type": "int32"}}})),
        ("ended".to_string(), serde_json::json!({"properties": {"winner": {"type": "string"}}})),
      ],
      subscribe_handler: dummy_sub_handler(),
    };

    let (procs, subs, meta) = channel.expand();

    assert_eq!(procs.len(), 2);
    assert_eq!(procs[0].name, "game.move");
    assert_eq!(procs[1].name, "game.resign");
    assert!(procs[0].error_schema.is_some());
    assert!(procs[1].error_schema.is_none());

    assert_eq!(subs.len(), 1);
    assert_eq!(meta.incoming.len(), 2);
    assert_eq!(meta.outgoing.len(), 2);
    assert!(meta.incoming["move"].error.is_some());
    assert!(meta.incoming["resign"].error.is_none());
  }

  // Required properties from both sides and optional properties from the
  // channel all survive the merge.
  #[test]
  fn merge_object_schemas_combines_properties() {
    let channel = serde_json::json!({
      "properties": {"a": {"type": "string"}},
      "optionalProperties": {"b": {"type": "int32"}}
    });
    let message = serde_json::json!({
      "properties": {"c": {"type": "boolean"}}
    });
    let merged = merge_object_schemas(&channel, &message);
    assert!(merged["properties"]["a"].is_object());
    assert!(merged["properties"]["c"].is_object());
    assert!(merged["optionalProperties"]["b"].is_object());
  }

  // Merging two empty schemas still yields a "properties" member.
  #[test]
  fn merge_empty_schemas() {
    let merged = merge_object_schemas(&serde_json::json!({}), &serde_json::json!({}));
    // Both empty -> produces empty "properties"
    assert_eq!(merged, serde_json::json!({"properties": {}}));
  }
}
293}