Skip to main content

liminal/protocol/
schema.rs

1use super::{
2    envelope::SchemaId,
3    error::ProtocolError,
4    frame::{Frame, FrameType, validate_stream},
5};
6
7/// Select the schema that will be used for a subscription stream.
8///
9/// An empty accepted-schema list is an explicit opt-out from schema enforcement
10/// and accepts the channel's declared schema. Otherwise matching is exact
11/// [`SchemaId`] equality against the channel schema hash.
12///
13/// # Errors
14///
15/// Returns [`ProtocolError::SchemaIncompatible`] when the subscriber provided a
16/// non-empty accepted-schema list that does not contain the channel schema.
17pub fn negotiate_schema(
18    channel_schema: SchemaId,
19    accepted_schemas: &[SchemaId],
20) -> Result<SchemaId, ProtocolError> {
21    if accepted_schemas.is_empty() || accepted_schemas.contains(&channel_schema) {
22        Ok(channel_schema)
23    } else {
24        Err(ProtocolError::SchemaIncompatible {
25            message: Some("subscriber does not accept channel schema".to_owned()),
26        })
27    }
28}
29
30/// Construct a `SubscribeError` frame for a failed subscription negotiation.
31///
32/// This keeps schema incompatibility visible on the subscription stream as an
33/// explicit protocol error frame with the stable numeric reason code carried by
34/// [`ProtocolError::reason_code`].
35///
36/// # Errors
37///
38/// Returns [`ProtocolError::InvalidStream`] when `stream_id` is not an
39/// application stream.
40pub fn subscribe_error_frame(
41    stream_id: u32,
42    error: &ProtocolError,
43) -> Result<Frame, ProtocolError> {
44    validate_stream(FrameType::SubscribeError, stream_id)?;
45    Ok(Frame::SubscribeError {
46        flags: 0,
47        stream_id,
48        reason_code: error.reason_code(),
49        message: error.message().map(str::to_owned),
50    })
51}
52
#[cfg(test)]
mod tests {
    use super::{negotiate_schema, subscribe_error_frame};
    use crate::protocol::{Frame, ProtocolError, SchemaId};

    /// Build a schema id whose every byte equals `seed`.
    fn schema(seed: u8) -> SchemaId {
        SchemaId::new([seed; SchemaId::WIRE_LEN])
    }

    /// A channel schema that appears in the accepted list is selected.
    #[test]
    fn negotiation_selects_channel_schema_when_accepted() -> Result<(), ProtocolError> {
        let channel = schema(0xA0);
        let accepted = [channel, schema(0xB0)];

        assert_eq!(negotiate_schema(channel, &accepted)?, channel);
        Ok(())
    }

    /// A non-empty list without the channel schema yields the dedicated
    /// schema-incompatible error, whose code differs from the codec error code.
    #[test]
    fn negotiation_reports_schema_incompatible_with_distinct_reason_code() {
        let channel = schema(0xA0);
        let accepted = [schema(0xB0), schema(0xC0)];

        let outcome = negotiate_schema(channel, &accepted);

        assert!(matches!(
            outcome,
            Err(ProtocolError::SchemaIncompatible { .. })
        ));
        // Fall back to 0 on unexpected success so the comparison below still runs.
        let code = outcome.map_or_else(|error| error.reason_code(), |_| 0);
        assert_eq!(code, ProtocolError::SCHEMA_INCOMPATIBLE_CODE);
        assert_ne!(code, ProtocolError::CODEC_ERROR_CODE);
    }

    /// An empty accepted list opts out of enforcement and accepts the channel schema.
    #[test]
    fn negotiation_accepts_empty_list_as_opt_out() -> Result<(), ProtocolError> {
        let channel = schema(0xA0);
        let nothing_listed: &[SchemaId] = &[];

        assert_eq!(negotiate_schema(channel, nothing_listed)?, channel);
        Ok(())
    }

    /// A schema id differing only in its final byte does not match.
    #[test]
    fn negotiation_requires_exact_schema_id_equality() {
        let mut almost = [0xAB; SchemaId::WIRE_LEN];
        almost[SchemaId::WIRE_LEN - 1] = 0xAC;
        let accepted = [SchemaId::new(almost)];

        let outcome = negotiate_schema(SchemaId::new([0xAB; SchemaId::WIRE_LEN]), &accepted);

        assert!(matches!(
            outcome,
            Err(ProtocolError::SchemaIncompatible { .. })
        ));
    }

    /// The negotiation error converts into a `SubscribeError` frame that keeps
    /// the stream id, the schema-incompatible reason code, and the message.
    #[test]
    fn schema_incompatible_error_builds_subscribe_error_frame() -> Result<(), ProtocolError> {
        let accepted = [schema(0xB0)];
        let Err(negotiation_error) = negotiate_schema(schema(0xA0), &accepted) else {
            return Err(ProtocolError::codec(
                "schema negotiation unexpectedly succeeded",
            ));
        };

        let frame = subscribe_error_frame(5, &negotiation_error)?;

        let carries_expected_error = matches!(
            &frame,
            Frame::SubscribeError {
                stream_id: 5,
                reason_code,
                message,
                ..
            } if *reason_code == ProtocolError::SCHEMA_INCOMPATIBLE_CODE
                && message.as_deref() == Some("subscriber does not accept channel schema")
        );
        assert!(carries_expected_error);
        Ok(())
    }
}