Skip to main content

seam_server/
channel.rs

1/* src/server/core/rust/src/channel.rs */
2
3use std::collections::BTreeMap;
4
5use serde::{Deserialize, Serialize};
6use serde_json::Value;
7
8use crate::procedure::{
9  HandlerFn, ProcedureDef, ProcedureType, SubscriptionDef, SubscriptionHandlerFn,
10};
11
12pub struct IncomingDef {
13  pub input_schema: Value,
14  pub output_schema: Value,
15  pub error_schema: Option<Value>,
16  pub handler: HandlerFn,
17}
18
19pub struct ChannelDef {
20  pub name: String,
21  pub input_schema: Value,
22  pub incoming: Vec<(String, IncomingDef)>,
23  pub outgoing: Vec<(String, Value)>,
24  pub subscribe_handler: SubscriptionHandlerFn,
25}
26
27#[derive(Debug, Clone, Serialize, Deserialize)]
28pub struct ChannelMeta {
29  pub input: Value,
30  pub incoming: BTreeMap<String, IncomingMeta>,
31  pub outgoing: BTreeMap<String, Value>,
32}
33
34#[derive(Debug, Clone, Serialize, Deserialize)]
35pub struct IncomingMeta {
36  pub input: Value,
37  pub output: Value,
38  #[serde(default, skip_serializing_if = "Option::is_none")]
39  pub error: Option<Value>,
40}
41
42impl ChannelDef {
43  /// Expand channel to Level 0 primitives (commands + subscription) and metadata.
44  pub fn expand(self) -> (Vec<ProcedureDef>, Vec<SubscriptionDef>, ChannelMeta) {
45    let mut procedures = Vec::new();
46    let mut incoming_meta = BTreeMap::new();
47
48    for (msg_name, msg_def) in self.incoming {
49      let merged_input = merge_object_schemas(&self.input_schema, &msg_def.input_schema);
50
51      procedures.push(ProcedureDef {
52        name: format!("{}.{}", self.name, msg_name),
53        proc_type: ProcedureType::Command,
54        input_schema: merged_input,
55        output_schema: msg_def.output_schema.clone(),
56        error_schema: msg_def.error_schema.clone(),
57        handler: msg_def.handler,
58      });
59
60      incoming_meta.insert(
61        msg_name,
62        IncomingMeta {
63          input: msg_def.input_schema,
64          output: msg_def.output_schema,
65          error: msg_def.error_schema,
66        },
67      );
68    }
69
70    // Build tagged union schema for outgoing events
71    let mut mapping = serde_json::Map::new();
72    let mut outgoing_meta = BTreeMap::new();
73    for (event_name, payload_schema) in &self.outgoing {
74      let mut variant = serde_json::Map::new();
75      let mut props = serde_json::Map::new();
76      props.insert("payload".to_string(), payload_schema.clone());
77      variant.insert("properties".to_string(), Value::Object(props));
78      mapping.insert(event_name.clone(), Value::Object(variant));
79      outgoing_meta.insert(event_name.clone(), payload_schema.clone());
80    }
81    let union_schema = serde_json::json!({
82      "discriminator": "type",
83      "mapping": Value::Object(mapping)
84    });
85
86    let subscriptions = vec![SubscriptionDef {
87      name: format!("{}.events", self.name),
88      input_schema: self.input_schema.clone(),
89      output_schema: union_schema,
90      error_schema: None,
91      handler: self.subscribe_handler,
92    }];
93
94    let meta =
95      ChannelMeta { input: self.input_schema, incoming: incoming_meta, outgoing: outgoing_meta };
96
97    (procedures, subscriptions, meta)
98  }
99}
100
101/// Merge two JTD object schemas (properties + optionalProperties).
102/// Message properties override channel properties on key collision.
103fn merge_object_schemas(channel: &Value, message: &Value) -> Value {
104  let mut merged = serde_json::Map::new();
105
106  let ch_props = channel.get("properties").and_then(|v| v.as_object());
107  let ch_opt = channel.get("optionalProperties").and_then(|v| v.as_object());
108  let msg_props = message.get("properties").and_then(|v| v.as_object());
109  let msg_opt = message.get("optionalProperties").and_then(|v| v.as_object());
110
111  let mut props = serde_json::Map::new();
112  if let Some(p) = ch_props {
113    props.extend(p.clone());
114  }
115  if let Some(p) = msg_props {
116    props.extend(p.clone());
117  }
118
119  let mut opt_props = serde_json::Map::new();
120  if let Some(p) = ch_opt {
121    opt_props.extend(p.clone());
122  }
123  if let Some(p) = msg_opt {
124    opt_props.extend(p.clone());
125  }
126
127  // Always emit "properties" when there are required props, or when
128  // there are no optional props either (empty schema -> empty properties).
129  if !props.is_empty() || opt_props.is_empty() {
130    merged.insert("properties".to_string(), Value::Object(props));
131  }
132  if !opt_props.is_empty() {
133    merged.insert("optionalProperties".to_string(), Value::Object(opt_props));
134  }
135
136  Value::Object(merged)
137}
138
#[cfg(test)]
mod tests {
  use std::pin::Pin;
  use std::sync::Arc;
  use std::task::{Context, Poll};

  use serde_json::json;

  use super::*;
  use crate::errors::SeamError;
  use crate::procedure::BoxStream;

  /// Command handler that ignores its input and resolves to an empty JSON object.
  fn stub_handler() -> HandlerFn {
    Arc::new(|_| Box::pin(async { Ok(json!({})) }))
  }

  /// Stream that finishes immediately without yielding any item.
  struct EmptyStream;

  impl futures_core::Stream for EmptyStream {
    type Item = Result<Value, SeamError>;

    fn poll_next(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
      Poll::Ready(None)
    }
  }

  /// Subscription handler that ignores its input and resolves to an `EmptyStream`.
  fn stub_subscription() -> SubscriptionHandlerFn {
    Arc::new(|_| {
      Box::pin(async {
        let events: BoxStream<Result<Value, SeamError>> = Box::pin(EmptyStream);
        Ok(events)
      })
    })
  }

  #[test]
  fn expand_produces_commands_and_subscription() {
    let room_input = json!({"properties": {"roomId": {"type": "string"}}});
    let text_schema = json!({"properties": {"text": {"type": "string"}}});

    let send = IncomingDef {
      input_schema: text_schema.clone(),
      output_schema: json!({"properties": {"ok": {"type": "boolean"}}}),
      error_schema: None,
      handler: stub_handler(),
    };
    let channel = ChannelDef {
      name: "chat".to_string(),
      input_schema: room_input.clone(),
      incoming: vec![("send".to_string(), send)],
      outgoing: vec![("message".to_string(), text_schema.clone())],
      subscribe_handler: stub_subscription(),
    };

    let (procs, subs, meta) = channel.expand();

    // Exactly one command, named after the channel and the message.
    assert_eq!(procs.len(), 1);
    let send_cmd = &procs[0];
    assert_eq!(send_cmd.name, "chat.send");
    assert_eq!(send_cmd.proc_type, ProcedureType::Command);

    // Its input carries both the channel's roomId and the message's text.
    assert!(send_cmd.input_schema["properties"]["roomId"].is_object());
    assert!(send_cmd.input_schema["properties"]["text"].is_object());

    // Exactly one subscription, exposing the tagged union of events.
    assert_eq!(subs.len(), 1);
    let events = &subs[0];
    assert_eq!(events.name, "chat.events");
    assert_eq!(events.output_schema["discriminator"], "type");
    assert!(events.output_schema["mapping"]["message"].is_object());

    // Metadata keeps the schemas as declared, not the merged ones.
    assert_eq!(meta.input, room_input);
    assert!(meta.incoming.contains_key("send"));
    assert_eq!(meta.incoming["send"].input, text_schema);
    assert!(meta.outgoing.contains_key("message"));
  }

  #[test]
  fn expand_multiple_incoming_and_outgoing() {
    let make_move = IncomingDef {
      input_schema: json!({"properties": {"x": {"type": "int32"}}}),
      output_schema: json!({}),
      error_schema: Some(json!({"properties": {"code": {"type": "string"}}})),
      handler: stub_handler(),
    };
    let resign = IncomingDef {
      input_schema: json!({}),
      output_schema: json!({}),
      error_schema: None,
      handler: stub_handler(),
    };
    let channel = ChannelDef {
      name: "game".to_string(),
      input_schema: json!({"properties": {"gameId": {"type": "string"}}}),
      incoming: vec![("move".to_string(), make_move), ("resign".to_string(), resign)],
      outgoing: vec![
        ("moved".to_string(), json!({"properties": {"x": {"type": "int32"}}})),
        ("ended".to_string(), json!({"properties": {"winner": {"type": "string"}}})),
      ],
      subscribe_handler: stub_subscription(),
    };

    let (procs, subs, meta) = channel.expand();

    // Commands follow declaration order and keep their own error schemas.
    assert_eq!(procs.len(), 2);
    assert_eq!(procs[0].name, "game.move");
    assert_eq!(procs[1].name, "game.resign");
    assert!(procs[0].error_schema.is_some());
    assert!(procs[1].error_schema.is_none());

    assert_eq!(subs.len(), 1);
    assert_eq!(meta.incoming.len(), 2);
    assert_eq!(meta.outgoing.len(), 2);
    assert!(meta.incoming["move"].error.is_some());
    assert!(meta.incoming["resign"].error.is_none());
  }

  #[test]
  fn merge_object_schemas_combines_properties() {
    let channel_schema = json!({
      "properties": {"a": {"type": "string"}},
      "optionalProperties": {"b": {"type": "int32"}}
    });
    let message_schema = json!({
      "properties": {"c": {"type": "boolean"}}
    });

    let merged = merge_object_schemas(&channel_schema, &message_schema);

    assert!(merged["properties"]["a"].is_object());
    assert!(merged["properties"]["c"].is_object());
    assert!(merged["optionalProperties"]["b"].is_object());
  }

  #[test]
  fn merge_empty_schemas() {
    let empty = json!({});
    let merged = merge_object_schemas(&empty, &empty);
    // Both empty -> produces empty "properties"
    assert_eq!(merged, json!({"properties": {}}));
  }
}
291}