Skip to main content

live_data/
subscription.rs

//! Subscription request and response types.
//!
//! [`SubscriptionDescriptor`] is what a consumer sends; the publisher replies
//! with [`SubscribeResponse`], either a [`SubscribeAck`] pointing at the
//! endpoint to read from, or a [`SubscribeError`].
6
7use serde::{Deserialize, Serialize};
8
9use crate::endpoint::Endpoint;
10use crate::transport::{FormatPreference, TransportPreference};
11
/// Request to open one feed.
///
/// Every field except `feed` is optional and represents a *preference*. A 0.1.0
/// publisher honours `columns` and ignores `filter` and `sampling` unless
/// [`Capabilities`](crate::Capabilities) advertises support.
///
/// Every field other than `feed` carries a serde `default` attribute, so it
/// may be omitted from the serialized form and is filled in with its default
/// value on deserialization.
///
/// NOTE(review): the paragraph above says a 0.1.0 publisher *ignores*
/// `filter`, while the `filter` field documentation says 0.1.0 servers
/// *refuse* anything other than the empty filter. Confirm which behaviour is
/// intended and align the two statements.
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
pub struct SubscriptionDescriptor {
    /// Name of the feed, matching a [`FeedDescriptor::name`](crate::FeedDescriptor::name).
    ///
    /// This is the only field without a serde default, so it must be present
    /// when deserializing.
    pub feed: String,
    /// Subset of columns to receive. `None` means "all columns".
    #[serde(default)]
    pub columns: Option<Vec<String>>,
    /// Row-level filter applied server-side. 0.1.0 servers refuse anything
    /// other than [`FilterExpr::Empty`].
    ///
    /// When absent from the input, the value comes from [`FilterExpr::empty`].
    #[serde(default = "FilterExpr::empty")]
    pub filter: FilterExpr,
    /// Optional sampling policy, e.g. "every Nth batch".
    #[serde(default)]
    pub sampling: Option<Sampling>,
    /// Ordered transport preference. Empty means "any".
    ///
    /// When absent from the input, the value comes from
    /// `TransportPreference::default()`.
    #[serde(default)]
    pub transport_pref: TransportPreference,
    /// Preferred batch format. `None` means "publisher's default".
    #[serde(default)]
    pub format_pref: Option<FormatPreference>,
}
38
39impl SubscriptionDescriptor {
40    pub fn new(feed: impl Into<String>) -> Self {
41        Self {
42            feed: feed.into(),
43            columns: None,
44            filter: FilterExpr::empty(),
45            sampling: None,
46            transport_pref: TransportPreference::any(),
47            format_pref: None,
48        }
49    }
50
51    pub fn columns<I, S>(mut self, cols: I) -> Self
52    where
53        I: IntoIterator<Item = S>,
54        S: Into<String>,
55    {
56        self.columns = Some(cols.into_iter().map(Into::into).collect());
57        self
58    }
59
60    pub fn transport_pref(mut self, pref: TransportPreference) -> Self {
61        self.transport_pref = pref;
62        self
63    }
64
65    pub fn format(mut self, fmt: FormatPreference) -> Self {
66        self.format_pref = Some(fmt);
67        self
68    }
69
70    pub fn sampling(mut self, sampling: Sampling) -> Self {
71        self.sampling = Some(sampling);
72        self
73    }
74}
75
/// Optional sampling policy that can be applied server-side.
///
/// Prefer [`Sampling::every`] for construction: it raises a zero stride to 1.
/// The field is public and the type derives `Deserialize` without validation,
/// so a value built from a struct literal or from the wire can still carry
/// `stride == 0`.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
pub struct Sampling {
    /// Emit every Nth batch. A value of 1 is equivalent to no sampling.
    pub stride: u32,
}
82
83impl Sampling {
84    pub fn every(stride: u32) -> Self {
85        Self { stride: stride.max(1) }
86    }
87}
88
/// Server-side filter expression.
///
/// 0.1.0 deliberately exposes only `Empty`. The enum is `#[non_exhaustive]` and
/// tagged so future filter shapes can be added without breaking the wire format
/// or downstream pattern matchers.
///
/// On the wire the variant is identified by a `kind` field whose value is the
/// variant name in snake_case (`"empty"` for [`FilterExpr::Empty`]).
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
#[serde(tag = "kind", rename_all = "snake_case")]
#[non_exhaustive]
pub enum FilterExpr {
    /// No filter, accept every row.
    Empty,
}
101
102impl FilterExpr {
103    pub fn empty() -> Self {
104        Self::Empty
105    }
106
107    pub fn is_empty(&self) -> bool {
108        matches!(self, Self::Empty)
109    }
110}
111
112impl Default for FilterExpr {
113    fn default() -> Self {
114        Self::Empty
115    }
116}
117
/// Publisher's reply to a [`SubscriptionDescriptor`].
///
/// On the wire the variant is identified by an `outcome` field whose value is
/// the variant name in snake_case (`"ack"` or `"err"`). The enum is
/// `#[non_exhaustive]`, so matches outside this crate need a wildcard arm.
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
#[serde(tag = "outcome", rename_all = "snake_case")]
#[non_exhaustive]
pub enum SubscribeResponse {
    /// The subscription was accepted; carries the details in a [`SubscribeAck`].
    Ack(SubscribeAck),
    /// The subscription was refused; carries the reason in a [`SubscribeError`].
    Err(SubscribeError),
}
126
/// Successful subscription. Tells the consumer where to read from and what to
/// expect.
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
pub struct SubscribeAck {
    /// Endpoint to open for batches. May be the same connection the request
    /// was sent on, or a different one the publisher advertised.
    pub endpoint: Endpoint,
    /// Format batches will be encoded in.
    pub format: FormatPreference,
    /// Final column set if projection was applied. `None` means the full
    /// schema is delivered.
    ///
    /// The field may be omitted from the serialized form; it then
    /// deserializes as `None`.
    #[serde(default)]
    pub negotiated_columns: Option<Vec<String>>,
}
141
/// Reason a subscription was refused.
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
pub struct SubscribeError {
    /// Category of the refusal.
    pub code: SubscribeErrorCode,
    /// Text accompanying `code`; for [`SubscribeErrorCode::Internal`] it
    /// carries the details of the failure.
    pub message: String,
}
148
/// Category of a refused subscription, carried in [`SubscribeError::code`].
///
/// Variants are serialized under their snake_case names (for example
/// `unknown_feed`). The enum is `#[non_exhaustive]`, so matches outside this
/// crate need a wildcard arm.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
#[non_exhaustive]
pub enum SubscribeErrorCode {
    /// No feed with the requested name.
    UnknownFeed,
    /// Server cannot satisfy the transport preference.
    UnsupportedTransport,
    /// Server cannot serialise in the requested format.
    UnsupportedFormat,
    /// Server cannot honour the requested capability, e.g. a non-empty filter.
    UnsupportedCapability,
    /// Generic server-side failure with details in the message.
    Internal,
}