use super::{
envelope::SchemaId,
error::ProtocolError,
frame::{Frame, FrameType, validate_stream},
};
pub fn negotiate_schema(
channel_schema: SchemaId,
accepted_schemas: &[SchemaId],
) -> Result<SchemaId, ProtocolError> {
if accepted_schemas.is_empty() || accepted_schemas.contains(&channel_schema) {
Ok(channel_schema)
} else {
Err(ProtocolError::SchemaIncompatible {
message: Some("subscriber does not accept channel schema".to_owned()),
})
}
}
pub fn subscribe_error_frame(
stream_id: u32,
error: &ProtocolError,
) -> Result<Frame, ProtocolError> {
validate_stream(FrameType::SubscribeError, stream_id)?;
Ok(Frame::SubscribeError {
flags: 0,
stream_id,
reason_code: error.reason_code(),
message: error.message().map(str::to_owned),
})
}
#[cfg(test)]
mod tests {
use super::{negotiate_schema, subscribe_error_frame};
use crate::protocol::{Frame, ProtocolError, SchemaId};
#[test]
fn negotiation_selects_channel_schema_when_accepted() -> Result<(), ProtocolError> {
let hash_a = schema(0xA0);
let hash_b = schema(0xB0);
let selected = negotiate_schema(hash_a, &[hash_a, hash_b])?;
assert_eq!(selected, hash_a);
Ok(())
}
#[test]
fn negotiation_reports_schema_incompatible_with_distinct_reason_code() {
let hash_a = schema(0xA0);
let hash_b = schema(0xB0);
let hash_c = schema(0xC0);
let result = negotiate_schema(hash_a, &[hash_b, hash_c]);
assert!(matches!(
&result,
Err(ProtocolError::SchemaIncompatible { .. })
));
let reason_code = result.err().map_or(0, |error| error.reason_code());
assert_eq!(reason_code, ProtocolError::SCHEMA_INCOMPATIBLE_CODE);
assert_ne!(reason_code, ProtocolError::CODEC_ERROR_CODE);
}
#[test]
fn negotiation_accepts_empty_list_as_opt_out() -> Result<(), ProtocolError> {
let hash_a = schema(0xA0);
let selected = negotiate_schema(hash_a, &[])?;
assert_eq!(selected, hash_a);
Ok(())
}
#[test]
fn negotiation_requires_exact_schema_id_equality() {
let channel_schema = SchemaId::new([0xAB; SchemaId::WIRE_LEN]);
let mut near_match = [0xAB; SchemaId::WIRE_LEN];
near_match[SchemaId::WIRE_LEN - 1] = 0xAC;
let result = negotiate_schema(channel_schema, &[SchemaId::new(near_match)]);
assert!(matches!(
result,
Err(ProtocolError::SchemaIncompatible { .. })
));
}
#[test]
fn schema_incompatible_error_builds_subscribe_error_frame() -> Result<(), ProtocolError> {
let hash_a = schema(0xA0);
let hash_b = schema(0xB0);
let Err(error) = negotiate_schema(hash_a, &[hash_b]) else {
return Err(ProtocolError::codec(
"schema negotiation unexpectedly succeeded",
));
};
let frame = subscribe_error_frame(5, &error)?;
assert!(matches!(
frame,
Frame::SubscribeError {
stream_id: 5,
reason_code,
message,
..
} if reason_code == ProtocolError::SCHEMA_INCOMPATIBLE_CODE
&& message.as_deref() == Some("subscriber does not accept channel schema")
));
Ok(())
}
fn schema(seed: u8) -> SchemaId {
SchemaId::new([seed; SchemaId::WIRE_LEN])
}
}