1use live_data::{
4 Endpoint, FeedDescriptor, SubscribeAck, SubscribeError, SubscribeErrorCode, SubscribeResponse,
5 SubscriptionDescriptor,
6};
7
8use crate::registry::ManifestRegistry;
9use crate::validate::{ValidationError, negotiate_subscription};
10
11#[derive(Debug, Clone)]
14pub struct LocalPublisher {
15 registry: ManifestRegistry,
16 default_endpoint: Endpoint,
17}
18
19impl LocalPublisher {
20 pub fn new(server_version: impl Into<String>, default_endpoint: Endpoint) -> Self {
21 Self {
22 registry: ManifestRegistry::new(server_version),
23 default_endpoint,
24 }
25 }
26
27 pub fn registry(&self) -> &ManifestRegistry {
28 &self.registry
29 }
30
31 pub fn registry_mut(&mut self) -> &mut ManifestRegistry {
32 &mut self.registry
33 }
34
35 pub fn default_endpoint(&self) -> &Endpoint {
39 &self.default_endpoint
40 }
41
42 pub fn register(&mut self, descriptor: FeedDescriptor) -> Result<(), ValidationError> {
43 self.registry.register(descriptor)
44 }
45
46 pub fn handle_subscribe(&self, req: &SubscriptionDescriptor) -> SubscribeResponse {
52 let descriptor = match self.registry.get(&req.feed) {
53 Some(d) => d,
54 None => {
55 return SubscribeResponse::Err(SubscribeError {
56 code: SubscribeErrorCode::UnknownFeed,
57 message: format!("unknown feed '{}'", req.feed),
58 });
59 }
60 };
61 match negotiate_subscription(descriptor, req) {
62 Ok(neg) => SubscribeResponse::Ack(SubscribeAck {
63 endpoint: Endpoint::new(neg.transport, self.default_endpoint.address.clone()),
64 format: neg.format,
65 negotiated_columns: neg.columns,
66 }),
67 Err(e) => SubscribeResponse::Err(map_validation(e)),
68 }
69 }
70}
71
72fn map_validation(e: ValidationError) -> SubscribeError {
75 let (code, message) = match &e {
76 ValidationError::UnknownFeed(_) => (SubscribeErrorCode::UnknownFeed, format!("{e}")),
77 ValidationError::UnknownColumn { .. }
78 | ValidationError::FilterNotSupported { .. }
79 | ValidationError::SamplingNotSupported { .. } => {
80 (SubscribeErrorCode::UnsupportedCapability, format!("{e}"))
81 }
82 ValidationError::TransportNotSupported { .. } => {
83 (SubscribeErrorCode::UnsupportedTransport, format!("{e}"))
84 }
85 ValidationError::FormatNotSupported { .. } => {
86 (SubscribeErrorCode::UnsupportedFormat, format!("{e}"))
87 }
88 _ => (SubscribeErrorCode::Internal, format!("{e}")),
89 };
90 SubscribeError { code, message }
91}