live-feed 0.1.0

Publisher SDK for advertising and serving live data feeds. Consumers use the live-stream crate.
Documentation
//! In-memory publisher subscribe-validation contract.

use live_data::{
    Endpoint, FeedDescriptor, SubscribeAck, SubscribeError, SubscribeErrorCode, SubscribeResponse,
    SubscriptionDescriptor,
};

use crate::registry::ManifestRegistry;
use crate::validate::{ValidationError, negotiate_subscription};

/// In-memory publisher backed by a [`ManifestRegistry`].
///
#[derive(Debug, Clone)]
pub struct LocalPublisher {
    registry: ManifestRegistry,
    default_endpoint: Endpoint,
}

impl LocalPublisher {
    pub fn new(server_version: impl Into<String>, default_endpoint: Endpoint) -> Self {
        Self {
            registry: ManifestRegistry::new(server_version),
            default_endpoint,
        }
    }

    pub fn registry(&self) -> &ManifestRegistry {
        &self.registry
    }

    pub fn registry_mut(&mut self) -> &mut ManifestRegistry {
        &mut self.registry
    }

    /// Default endpoint advertised on successful subscriptions. The
    /// negotiated transport's tag is paired with this endpoint's
    /// address to form the [`SubscribeAck::endpoint`].
    pub fn default_endpoint(&self) -> &Endpoint {
        &self.default_endpoint
    }

    pub fn register(&mut self, descriptor: FeedDescriptor) -> Result<(), ValidationError> {
        self.registry.register(descriptor)
    }

    /// Validate a subscription request and produce the matching
    /// [`SubscribeResponse`]. Unknown feeds and capability violations
    /// produce `Err` variants with descriptive codes; successful
    /// negotiations produce an `Ack` pointing at the publisher's
    /// default endpoint with the negotiated transport.
    pub fn handle_subscribe(&self, req: &SubscriptionDescriptor) -> SubscribeResponse {
        let descriptor = match self.registry.get(&req.feed) {
            Some(d) => d,
            None => {
                return SubscribeResponse::Err(SubscribeError {
                    code: SubscribeErrorCode::UnknownFeed,
                    message: format!("unknown feed '{}'", req.feed),
                });
            }
        };
        match negotiate_subscription(descriptor, req) {
            Ok(neg) => SubscribeResponse::Ack(SubscribeAck {
                endpoint: Endpoint::new(neg.transport, self.default_endpoint.address.clone()),
                format: neg.format,
                negotiated_columns: neg.columns,
            }),
            Err(e) => SubscribeResponse::Err(map_validation(e)),
        }
    }
}

/// Translate a [`ValidationError`] into the wire-shaped
/// [`SubscribeError`] code that consumers see.
fn map_validation(e: ValidationError) -> SubscribeError {
    let (code, message) = match &e {
        ValidationError::UnknownFeed(_) => (SubscribeErrorCode::UnknownFeed, format!("{e}")),
        ValidationError::UnknownColumn { .. }
        | ValidationError::FilterNotSupported { .. }
        | ValidationError::SamplingNotSupported { .. } => {
            (SubscribeErrorCode::UnsupportedCapability, format!("{e}"))
        }
        ValidationError::TransportNotSupported { .. } => {
            (SubscribeErrorCode::UnsupportedTransport, format!("{e}"))
        }
        ValidationError::FormatNotSupported { .. } => {
            (SubscribeErrorCode::UnsupportedFormat, format!("{e}"))
        }
        _ => (SubscribeErrorCode::Internal, format!("{e}")),
    };
    SubscribeError { code, message }
}