foxglove 0.24.0

Foxglove SDK
Documentation
//! Server advertise message types.

use std::borrow::Cow;
use std::collections::BTreeMap;
use std::sync::Arc;

use serde::{Deserialize, Serialize};

use crate::RawChannel;
use crate::Schema as CrateSchema;
use crate::protocol::JsonMessage;
use crate::protocol::schema::{self, Schema};

/// Server advertise message.
///
/// Spec: <https://github.com/foxglove/ws-protocol/blob/main/docs/spec.md#advertise>
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
#[serde(tag = "op", rename = "advertise", rename_all = "camelCase")]
pub struct Advertise<'a> {
    /// Advertised channels.
    #[serde(borrow)]
    pub channels: Vec<Channel<'a>>,
}

impl<'a> Advertise<'a> {
    /// Creates a new advertise message.
    pub fn new(channels: impl IntoIterator<Item = Channel<'a>>) -> Self {
        Self {
            channels: channels.into_iter().collect(),
        }
    }

    /// Returns an owned version of this message.
    pub fn into_owned(self) -> Advertise<'static> {
        Advertise {
            channels: self.channels.into_iter().map(|c| c.into_owned()).collect(),
        }
    }
}

impl JsonMessage for Advertise<'_> {}

/// Server channel advertisement.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct Channel<'a> {
    /// Channel ID.
    pub id: u64,
    /// Topic name.
    #[serde(borrow)]
    pub topic: Cow<'a, str>,
    /// Message encoding.
    #[serde(borrow)]
    pub encoding: Cow<'a, str>,
    /// Schema name.
    #[serde(borrow)]
    pub schema_name: Cow<'a, str>,
    /// Schema encoding.
    #[serde(borrow, skip_serializing_if = "Option::is_none")]
    pub schema_encoding: Option<Cow<'a, str>>,
    /// Schema data.
    ///
    /// This is the protocol-encoded form. You can use the [`Channel::schema`] method to
    /// decode it.
    #[serde(borrow)]
    pub schema: Cow<'a, str>,
    /// Channel metadata.
    #[serde(default, skip_serializing_if = "BTreeMap::is_empty")]
    pub metadata: BTreeMap<String, String>,
}

impl<'a> Channel<'a> {
    /// Creates a new builder for a channel advertisement.
    #[must_use]
    pub fn builder(
        id: u64,
        topic: impl Into<Cow<'a, str>>,
        encoding: impl Into<Cow<'a, str>>,
    ) -> ChannelBuilder<'a> {
        ChannelBuilder {
            id,
            topic: topic.into(),
            encoding: encoding.into(),
            schema: None,
            metadata: BTreeMap::new(),
        }
    }

    /// Returns the decoded schema data.
    pub fn decode_schema(&self) -> Result<Vec<u8>, schema::DecodeError> {
        if let Some(schema_encoding) = self.schema_encoding.as_ref() {
            schema::decode_schema_data(schema_encoding, &self.schema)
        } else {
            Err(schema::DecodeError::MissingSchema)
        }
    }

    /// Returns an owned version of this channel.
    pub fn into_owned(self) -> Channel<'static> {
        Channel {
            id: self.id,
            topic: self.topic.into_owned().into(),
            encoding: self.encoding.into_owned().into(),
            schema_name: self.schema_name.into_owned().into(),
            schema_encoding: self.schema_encoding.map(|s| s.into_owned().into()),
            schema: self.schema.into_owned().into(),
            metadata: self.metadata,
        }
    }
}

impl<'a> TryFrom<Channel<'a>> for Schema<'a> {
    type Error = schema::DecodeError;

    fn try_from(value: Channel<'a>) -> Result<Self, schema::DecodeError> {
        let schema = value.decode_schema()?;
        Ok(Schema::new(
            value.schema_name,
            value.schema_encoding.unwrap_or_default(),
            schema,
        ))
    }
}

/// Server channel advertisement builder.
pub struct ChannelBuilder<'a> {
    id: u64,
    topic: Cow<'a, str>,
    encoding: Cow<'a, str>,
    schema: Option<Schema<'a>>,
    metadata: BTreeMap<String, String>,
}

impl<'a> ChannelBuilder<'a> {
    /// Adds a schema to the channel advertisement.
    #[must_use]
    pub fn with_schema(mut self, schema: Schema<'a>) -> Self {
        self.schema = Some(schema);
        self
    }

    /// Adds metadata to the channel advertisement.
    #[must_use]
    pub fn with_metadata(mut self, metadata: BTreeMap<String, String>) -> Self {
        self.metadata = metadata;
        self
    }

    /// Constructs the channel advertisement.
    pub fn build(self) -> Result<Channel<'a>, schema::EncodeError> {
        match self.schema {
            None => {
                if schema::is_schema_required(&self.encoding) {
                    Err(schema::EncodeError::MissingSchema)
                } else {
                    Ok(Channel {
                        id: self.id,
                        topic: self.topic,
                        encoding: self.encoding,
                        schema_name: "".into(),
                        schema_encoding: None,
                        schema: Cow::Borrowed(""),
                        metadata: self.metadata,
                    })
                }
            }
            Some(schema) => Ok(Channel {
                id: self.id,
                topic: self.topic,
                encoding: self.encoding,
                schema: schema::encode_schema_data(&schema.encoding, schema.data)?,
                schema_name: schema.name,
                schema_encoding: Some(schema.encoding),
                metadata: self.metadata,
            }),
        }
    }
}

impl<'a> From<&'a CrateSchema> for Schema<'a> {
    fn from(schema: &'a CrateSchema) -> Self {
        Self::new(&schema.name, &schema.encoding, schema.data.clone())
    }
}

impl<'a> TryFrom<&'a RawChannel> for Channel<'a> {
    type Error = schema::EncodeError;

    fn try_from(ch: &'a RawChannel) -> Result<Self, Self::Error> {
        let mut builder = Self::builder(ch.id().into(), ch.topic(), ch.message_encoding());
        if let Some(s) = ch.schema() {
            builder = builder.with_schema(s.into());
        }
        let metadata = ch.metadata();
        if !metadata.is_empty() {
            builder = builder.with_metadata(metadata.clone());
        }
        builder.build()
    }
}

fn maybe_advertise_channel(channel: &Arc<RawChannel>) -> Option<Channel<'_>> {
    channel
        .as_ref()
        .try_into()
        .inspect_err(|err| match err {
            schema::EncodeError::MissingSchema => {
                tracing::error!(
                    "Ignoring advertise channel for {} because a schema is required",
                    channel.topic()
                );
            }
            err => {
                tracing::error!("Error advertising channel to client: {err}");
            }
        })
        .ok()
}

/// Creates an advertise message for the specified channels.
pub fn advertise_channels<'a>(
    channels: impl IntoIterator<Item = &'a Arc<RawChannel>>,
) -> Advertise<'a> {
    Advertise::new(channels.into_iter().filter_map(maybe_advertise_channel))
}

#[cfg(test)]
mod tests {
    use super::*;

    fn message() -> Advertise<'static> {
        Advertise::new([
            Channel::builder(10, "/t1", "json").build().unwrap(),
            Channel::builder(20, "/t2", "json")
                .with_schema(Schema::new(
                    "t2-schema",
                    "jsonschema",
                    br#"{"type": "object"}"#,
                ))
                .build()
                .unwrap(),
            Channel::builder(30, "/t3", "protobuf")
                .with_schema(Schema::new(
                    "t3-schema",
                    "protobuf",
                    &[0xde, 0xad, 0xbe, 0xef],
                ))
                .build()
                .unwrap(),
            Channel::builder(40, "/t4", "json")
                .with_metadata(BTreeMap::from([(
                    "foxglove.hasVideoTrack".to_string(),
                    "true".to_string(),
                )]))
                .build()
                .unwrap(),
        ])
    }

    #[test]
    fn test_encode() {
        insta::assert_json_snapshot!(message());
    }

    #[test]
    fn test_roundtrip() {
        let orig = message();
        let buf = orig.to_string();
        let parsed: Advertise = serde_json::from_str(&buf).unwrap();
        assert_eq!(parsed, orig);
    }
}