use std::borrow::Cow;
use std::sync::Arc;
use serde::{Deserialize, Serialize};
use crate::protocol::schema::{self, Schema};
use crate::protocol::JsonMessage;
use crate::RawChannel;
use crate::Schema as CrateSchema;
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
#[serde(tag = "op", rename = "advertise", rename_all = "camelCase")]
pub struct Advertise<'a> {
#[serde(borrow)]
pub channels: Vec<Channel<'a>>,
}
impl<'a> Advertise<'a> {
pub fn new(channels: impl IntoIterator<Item = Channel<'a>>) -> Self {
Self {
channels: channels.into_iter().collect(),
}
}
pub fn into_owned(self) -> Advertise<'static> {
Advertise {
channels: self.channels.into_iter().map(|c| c.into_owned()).collect(),
}
}
}
impl JsonMessage for Advertise<'_> {}
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct Channel<'a> {
pub id: u64,
#[serde(borrow)]
pub topic: Cow<'a, str>,
#[serde(borrow)]
pub encoding: Cow<'a, str>,
#[serde(borrow)]
pub schema_name: Cow<'a, str>,
#[serde(borrow, skip_serializing_if = "Option::is_none")]
pub schema_encoding: Option<Cow<'a, str>>,
#[serde(borrow)]
pub schema: Cow<'a, str>,
}
impl<'a> Channel<'a> {
#[must_use]
pub fn builder(
id: u64,
topic: impl Into<Cow<'a, str>>,
encoding: impl Into<Cow<'a, str>>,
) -> ChannelBuilder<'a> {
ChannelBuilder {
id,
topic: topic.into(),
encoding: encoding.into(),
schema: None,
}
}
pub fn decode_schema(&self) -> Result<Vec<u8>, schema::DecodeError> {
if let Some(schema_encoding) = self.schema_encoding.as_ref() {
schema::decode_schema_data(schema_encoding, &self.schema)
} else {
Err(schema::DecodeError::MissingSchema)
}
}
pub fn into_owned(self) -> Channel<'static> {
Channel {
id: self.id,
topic: self.topic.into_owned().into(),
encoding: self.encoding.into_owned().into(),
schema_name: self.schema_name.into_owned().into(),
schema_encoding: self.schema_encoding.map(|s| s.into_owned().into()),
schema: self.schema.into_owned().into(),
}
}
}
impl<'a> TryFrom<Channel<'a>> for Schema<'a> {
type Error = schema::DecodeError;
fn try_from(value: Channel<'a>) -> Result<Self, schema::DecodeError> {
let schema = value.decode_schema()?;
Ok(Schema::new(
value.schema_name,
value.schema_encoding.unwrap_or_default(),
schema,
))
}
}
pub struct ChannelBuilder<'a> {
id: u64,
topic: Cow<'a, str>,
encoding: Cow<'a, str>,
schema: Option<Schema<'a>>,
}
impl<'a> ChannelBuilder<'a> {
#[must_use]
pub fn with_schema(mut self, schema: Schema<'a>) -> Self {
self.schema = Some(schema);
self
}
pub fn build(self) -> Result<Channel<'a>, schema::EncodeError> {
match self.schema {
None => {
if schema::is_schema_required(&self.encoding) {
Err(schema::EncodeError::MissingSchema)
} else {
Ok(Channel {
id: self.id,
topic: self.topic,
encoding: self.encoding,
schema_name: "".into(),
schema_encoding: None,
schema: Cow::Borrowed(""),
})
}
}
Some(schema) => Ok(Channel {
id: self.id,
topic: self.topic,
encoding: self.encoding,
schema: schema::encode_schema_data(&schema.encoding, schema.data)?,
schema_name: schema.name,
schema_encoding: Some(schema.encoding),
}),
}
}
}
impl<'a> From<&'a CrateSchema> for Schema<'a> {
fn from(schema: &'a CrateSchema) -> Self {
Self::new(&schema.name, &schema.encoding, schema.data.clone())
}
}
impl<'a> TryFrom<&'a RawChannel> for Channel<'a> {
type Error = schema::EncodeError;
fn try_from(ch: &'a RawChannel) -> Result<Self, Self::Error> {
let mut builder = Self::builder(ch.id().into(), ch.topic(), ch.message_encoding());
if let Some(s) = ch.schema() {
builder = builder.with_schema(s.into());
}
builder.build()
}
}
fn maybe_advertise_channel(channel: &Arc<RawChannel>) -> Option<Channel<'_>> {
channel
.as_ref()
.try_into()
.inspect_err(|err| match err {
schema::EncodeError::MissingSchema => {
tracing::error!(
"Ignoring advertise channel for {} because a schema is required",
channel.topic()
);
}
err => {
tracing::error!("Error advertising channel to client: {err}");
}
})
.ok()
}
pub fn advertise_channels<'a>(
channels: impl IntoIterator<Item = &'a Arc<RawChannel>>,
) -> Advertise<'a> {
Advertise::new(channels.into_iter().filter_map(maybe_advertise_channel))
}
#[cfg(test)]
mod tests {
use super::*;
fn message() -> Advertise<'static> {
Advertise::new([
Channel::builder(10, "/t1", "json").build().unwrap(),
Channel::builder(20, "/t2", "json")
.with_schema(Schema::new(
"t2-schema",
"jsonschema",
br#"{"type": "object"}"#,
))
.build()
.unwrap(),
Channel::builder(30, "/t3", "protobuf")
.with_schema(Schema::new(
"t3-schema",
"protobuf",
&[0xde, 0xad, 0xbe, 0xef],
))
.build()
.unwrap(),
])
}
#[test]
fn test_encode() {
insta::assert_json_snapshot!(message());
}
#[test]
fn test_roundtrip() {
let orig = message();
let buf = orig.to_string();
let parsed: Advertise = serde_json::from_str(&buf).unwrap();
assert_eq!(parsed, orig);
}
}