Skip to main content

live_feed/
local.rs

1//! In-memory publisher subscribe-validation contract.
2
3use live_data::{
4    Endpoint, FeedDescriptor, SubscribeAck, SubscribeError, SubscribeErrorCode, SubscribeResponse,
5    SubscriptionDescriptor,
6};
7
8use crate::registry::ManifestRegistry;
9use crate::validate::{ValidationError, negotiate_subscription};
10
11/// In-memory publisher backed by a [`ManifestRegistry`].
12///
13#[derive(Debug, Clone)]
14pub struct LocalPublisher {
15    registry: ManifestRegistry,
16    default_endpoint: Endpoint,
17}
18
19impl LocalPublisher {
20    pub fn new(server_version: impl Into<String>, default_endpoint: Endpoint) -> Self {
21        Self {
22            registry: ManifestRegistry::new(server_version),
23            default_endpoint,
24        }
25    }
26
27    pub fn registry(&self) -> &ManifestRegistry {
28        &self.registry
29    }
30
31    pub fn registry_mut(&mut self) -> &mut ManifestRegistry {
32        &mut self.registry
33    }
34
35    /// Default endpoint advertised on successful subscriptions. The
36    /// negotiated transport's tag is paired with this endpoint's
37    /// address to form the [`SubscribeAck::endpoint`].
38    pub fn default_endpoint(&self) -> &Endpoint {
39        &self.default_endpoint
40    }
41
42    pub fn register(&mut self, descriptor: FeedDescriptor) -> Result<(), ValidationError> {
43        self.registry.register(descriptor)
44    }
45
46    /// Validate a subscription request and produce the matching
47    /// [`SubscribeResponse`]. Unknown feeds and capability violations
48    /// produce `Err` variants with descriptive codes; successful
49    /// negotiations produce an `Ack` pointing at the publisher's
50    /// default endpoint with the negotiated transport.
51    pub fn handle_subscribe(&self, req: &SubscriptionDescriptor) -> SubscribeResponse {
52        let descriptor = match self.registry.get(&req.feed) {
53            Some(d) => d,
54            None => {
55                return SubscribeResponse::Err(SubscribeError {
56                    code: SubscribeErrorCode::UnknownFeed,
57                    message: format!("unknown feed '{}'", req.feed),
58                });
59            }
60        };
61        match negotiate_subscription(descriptor, req) {
62            Ok(neg) => SubscribeResponse::Ack(SubscribeAck {
63                endpoint: Endpoint::new(neg.transport, self.default_endpoint.address.clone()),
64                format: neg.format,
65                negotiated_columns: neg.columns,
66            }),
67            Err(e) => SubscribeResponse::Err(map_validation(e)),
68        }
69    }
70}
71
72/// Translate a [`ValidationError`] into the wire-shaped
73/// [`SubscribeError`] code that consumers see.
74fn map_validation(e: ValidationError) -> SubscribeError {
75    let (code, message) = match &e {
76        ValidationError::UnknownFeed(_) => (SubscribeErrorCode::UnknownFeed, format!("{e}")),
77        ValidationError::UnknownColumn { .. }
78        | ValidationError::FilterNotSupported { .. }
79        | ValidationError::SamplingNotSupported { .. } => {
80            (SubscribeErrorCode::UnsupportedCapability, format!("{e}"))
81        }
82        ValidationError::TransportNotSupported { .. } => {
83            (SubscribeErrorCode::UnsupportedTransport, format!("{e}"))
84        }
85        ValidationError::FormatNotSupported { .. } => {
86            (SubscribeErrorCode::UnsupportedFormat, format!("{e}"))
87        }
88        _ => (SubscribeErrorCode::Internal, format!("{e}")),
89    };
90    SubscribeError { code, message }
91}