live-data 0.1.0

Shared descriptor, manifest, and subscription types for the live-feed publisher SDK and the live-stream consumer SDK.
Documentation
//! Subscription request and response types.
//!
//! [`SubscriptionDescriptor`] is what a consumer sends; the publisher replies
//! with [`SubscribeResponse`], either an [`SubscribeAck`] pointing at the
//! endpoint to read from, or a [`SubscribeError`].

use serde::{Deserialize, Serialize};

use crate::endpoint::Endpoint;
use crate::transport::{FormatPreference, TransportPreference};

/// Request to open one feed.
///
/// Every field except `feed` is optional and represents a *preference*. A 0.1.0
/// publisher honours `columns` and ignores `filter` and `sampling` unless
/// [`Capabilities`](crate::Capabilities) advertises support.
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
pub struct SubscriptionDescriptor {
    /// Name of the feed, matching a [`FeedDescriptor::name`](crate::FeedDescriptor::name).
    pub feed: String,
    /// Subset of columns to receive. `None` means "all columns".
    #[serde(default)]
    pub columns: Option<Vec<String>>,
    /// Row-level filter applied server-side. 0.1.0 servers refuse anything
    /// other than [`FilterExpr::Empty`].
    #[serde(default = "FilterExpr::empty")]
    pub filter: FilterExpr,
    /// Optional sampling policy, e.g. "every Nth batch".
    #[serde(default)]
    pub sampling: Option<Sampling>,
    /// Ordered transport preference. Empty means "any".
    #[serde(default)]
    pub transport_pref: TransportPreference,
    /// Preferred batch format. `None` means "publisher's default".
    #[serde(default)]
    pub format_pref: Option<FormatPreference>,
}

impl SubscriptionDescriptor {
    pub fn new(feed: impl Into<String>) -> Self {
        Self {
            feed: feed.into(),
            columns: None,
            filter: FilterExpr::empty(),
            sampling: None,
            transport_pref: TransportPreference::any(),
            format_pref: None,
        }
    }

    pub fn columns<I, S>(mut self, cols: I) -> Self
    where
        I: IntoIterator<Item = S>,
        S: Into<String>,
    {
        self.columns = Some(cols.into_iter().map(Into::into).collect());
        self
    }

    pub fn transport_pref(mut self, pref: TransportPreference) -> Self {
        self.transport_pref = pref;
        self
    }

    pub fn format(mut self, fmt: FormatPreference) -> Self {
        self.format_pref = Some(fmt);
        self
    }

    pub fn sampling(mut self, sampling: Sampling) -> Self {
        self.sampling = Some(sampling);
        self
    }
}

/// Optional sampling policy that can be applied server-side.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
pub struct Sampling {
    /// Emit every Nth batch. A value of 1 is equivalent to no sampling.
    pub stride: u32,
}

impl Sampling {
    pub fn every(stride: u32) -> Self {
        Self { stride: stride.max(1) }
    }
}

/// Server-side filter expression.
///
/// 0.1.0 deliberately exposes only `Empty`. The enum is `#[non_exhaustive]` and
/// tagged so future filter shapes can be added without breaking the wire format
/// or downstream pattern matchers.
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
#[serde(tag = "kind", rename_all = "snake_case")]
#[non_exhaustive]
pub enum FilterExpr {
    /// No filter, accept every row.
    Empty,
}

impl FilterExpr {
    pub fn empty() -> Self {
        Self::Empty
    }

    pub fn is_empty(&self) -> bool {
        matches!(self, Self::Empty)
    }
}

impl Default for FilterExpr {
    fn default() -> Self {
        Self::Empty
    }
}

/// Publisher's reply to a [`SubscriptionDescriptor`].
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
#[serde(tag = "outcome", rename_all = "snake_case")]
#[non_exhaustive]
pub enum SubscribeResponse {
    Ack(SubscribeAck),
    Err(SubscribeError),
}

/// Successful subscription. Tells the consumer where to read from and what to
/// expect.
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
pub struct SubscribeAck {
    /// Endpoint to open for batches. May be the same connection the request
    /// was sent on, or a different one the publisher advertised.
    pub endpoint: Endpoint,
    /// Format batches will be encoded in.
    pub format: FormatPreference,
    /// Final column set if projection was applied. `None` means the full
    /// schema is delivered.
    #[serde(default)]
    pub negotiated_columns: Option<Vec<String>>,
}

/// Reason a subscription was refused.
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
pub struct SubscribeError {
    pub code: SubscribeErrorCode,
    pub message: String,
}

#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
#[non_exhaustive]
pub enum SubscribeErrorCode {
    /// No feed with the requested name.
    UnknownFeed,
    /// Server cannot satisfy the transport preference.
    UnsupportedTransport,
    /// Server cannot serialise in the requested format.
    UnsupportedFormat,
    /// Server cannot honour the requested capability, e.g. a non-empty filter.
    UnsupportedCapability,
    /// Generic server-side failure with details in the message.
    Internal,
}