liminal/protocol/
schema.rs1use super::{
2 envelope::SchemaId,
3 error::ProtocolError,
4 frame::{Frame, FrameType, validate_stream},
5};
6
7pub fn negotiate_schema(
18 channel_schema: SchemaId,
19 accepted_schemas: &[SchemaId],
20) -> Result<SchemaId, ProtocolError> {
21 if accepted_schemas.is_empty() || accepted_schemas.contains(&channel_schema) {
22 Ok(channel_schema)
23 } else {
24 Err(ProtocolError::SchemaIncompatible {
25 message: Some("subscriber does not accept channel schema".to_owned()),
26 })
27 }
28}
29
30pub fn subscribe_error_frame(
41 stream_id: u32,
42 error: &ProtocolError,
43) -> Result<Frame, ProtocolError> {
44 validate_stream(FrameType::SubscribeError, stream_id)?;
45 Ok(Frame::SubscribeError {
46 flags: 0,
47 stream_id,
48 reason_code: error.reason_code(),
49 message: error.message().map(str::to_owned),
50 })
51}
52
53#[cfg(test)]
54mod tests {
55 use super::{negotiate_schema, subscribe_error_frame};
56 use crate::protocol::{Frame, ProtocolError, SchemaId};
57
58 #[test]
59 fn negotiation_selects_channel_schema_when_accepted() -> Result<(), ProtocolError> {
60 let hash_a = schema(0xA0);
61 let hash_b = schema(0xB0);
62
63 let selected = negotiate_schema(hash_a, &[hash_a, hash_b])?;
64
65 assert_eq!(selected, hash_a);
66 Ok(())
67 }
68
69 #[test]
70 fn negotiation_reports_schema_incompatible_with_distinct_reason_code() {
71 let hash_a = schema(0xA0);
72 let hash_b = schema(0xB0);
73 let hash_c = schema(0xC0);
74
75 let result = negotiate_schema(hash_a, &[hash_b, hash_c]);
76
77 assert!(matches!(
78 &result,
79 Err(ProtocolError::SchemaIncompatible { .. })
80 ));
81 let reason_code = result.err().map_or(0, |error| error.reason_code());
82 assert_eq!(reason_code, ProtocolError::SCHEMA_INCOMPATIBLE_CODE);
83 assert_ne!(reason_code, ProtocolError::CODEC_ERROR_CODE);
84 }
85
86 #[test]
87 fn negotiation_accepts_empty_list_as_opt_out() -> Result<(), ProtocolError> {
88 let hash_a = schema(0xA0);
89
90 let selected = negotiate_schema(hash_a, &[])?;
91
92 assert_eq!(selected, hash_a);
93 Ok(())
94 }
95
96 #[test]
97 fn negotiation_requires_exact_schema_id_equality() {
98 let channel_schema = SchemaId::new([0xAB; SchemaId::WIRE_LEN]);
99 let mut near_match = [0xAB; SchemaId::WIRE_LEN];
100 near_match[SchemaId::WIRE_LEN - 1] = 0xAC;
101
102 let result = negotiate_schema(channel_schema, &[SchemaId::new(near_match)]);
103
104 assert!(matches!(
105 result,
106 Err(ProtocolError::SchemaIncompatible { .. })
107 ));
108 }
109
110 #[test]
111 fn schema_incompatible_error_builds_subscribe_error_frame() -> Result<(), ProtocolError> {
112 let hash_a = schema(0xA0);
113 let hash_b = schema(0xB0);
114 let Err(error) = negotiate_schema(hash_a, &[hash_b]) else {
115 return Err(ProtocolError::codec(
116 "schema negotiation unexpectedly succeeded",
117 ));
118 };
119
120 let frame = subscribe_error_frame(5, &error)?;
121
122 assert!(matches!(
123 frame,
124 Frame::SubscribeError {
125 stream_id: 5,
126 reason_code,
127 message,
128 ..
129 } if reason_code == ProtocolError::SCHEMA_INCOMPATIBLE_CODE
130 && message.as_deref() == Some("subscriber does not accept channel schema")
131 ));
132 Ok(())
133 }
134
135 fn schema(seed: u8) -> SchemaId {
136 SchemaId::new([seed; SchemaId::WIRE_LEN])
137 }
138}