use live_data::{
Endpoint, FeedDescriptor, SubscribeAck, SubscribeError, SubscribeErrorCode, SubscribeResponse,
SubscriptionDescriptor,
};
use crate::registry::ManifestRegistry;
use crate::validate::{ValidationError, negotiate_subscription};
/// Publisher that pairs a [`ManifestRegistry`] with a default [`Endpoint`] and
/// answers subscribe requests against them.
///
/// Both fields are private; use the accessor methods on `LocalPublisher` to
/// read or mutate them.
#[derive(Debug, Clone)]
pub struct LocalPublisher {
/// Registry that feed descriptors are registered into and looked up from.
registry: ManifestRegistry,
/// Endpoint whose `address` is reused when building subscribe acknowledgements.
default_endpoint: Endpoint,
}
impl LocalPublisher {
    /// Creates a publisher whose registry is built via
    /// `ManifestRegistry::new(server_version)` and which hands out
    /// `default_endpoint`'s address in subscribe acknowledgements.
    pub fn new(server_version: impl Into<String>, default_endpoint: Endpoint) -> Self {
        let registry = ManifestRegistry::new(server_version);
        Self {
            registry,
            default_endpoint,
        }
    }

    /// Shared access to the underlying registry.
    pub fn registry(&self) -> &ManifestRegistry {
        &self.registry
    }

    /// Exclusive access to the underlying registry.
    pub fn registry_mut(&mut self) -> &mut ManifestRegistry {
        &mut self.registry
    }

    /// The endpoint supplied at construction time.
    pub fn default_endpoint(&self) -> &Endpoint {
        &self.default_endpoint
    }

    /// Registers `descriptor` with the registry.
    ///
    /// # Errors
    ///
    /// Forwards whatever [`ValidationError`] the registry's own `register`
    /// call reports.
    pub fn register(&mut self, descriptor: FeedDescriptor) -> Result<(), ValidationError> {
        self.registry.register(descriptor)
    }

    /// Answers a subscribe request.
    ///
    /// The feed named by `req.feed` is looked up in the registry:
    ///
    /// * not found: `SubscribeResponse::Err` with
    ///   `SubscribeErrorCode::UnknownFeed`;
    /// * found but negotiation fails: `SubscribeResponse::Err` with the
    ///   validation error translated by `map_validation`;
    /// * found and negotiated: `SubscribeResponse::Ack` carrying the
    ///   negotiated format and columns, plus an endpoint that combines the
    ///   negotiated transport with the default endpoint's address.
    pub fn handle_subscribe(&self, req: &SubscriptionDescriptor) -> SubscribeResponse {
        // Guard: the requested feed must be present in the registry.
        let Some(descriptor) = self.registry.get(&req.feed) else {
            return SubscribeResponse::Err(SubscribeError {
                code: SubscribeErrorCode::UnknownFeed,
                message: format!("unknown feed '{}'", req.feed),
            });
        };

        // Guard: the request must be compatible with the feed's descriptor.
        let negotiated = match negotiate_subscription(descriptor, req) {
            Ok(outcome) => outcome,
            Err(err) => return SubscribeResponse::Err(map_validation(err)),
        };

        // The address always comes from the default endpoint; only the
        // transport is taken from the negotiation result.
        let endpoint = Endpoint::new(
            negotiated.transport,
            self.default_endpoint.address.clone(),
        );
        SubscribeResponse::Ack(SubscribeAck {
            endpoint,
            format: negotiated.format,
            negotiated_columns: negotiated.columns,
        })
    }
}
/// Translates a [`ValidationError`] into a [`SubscribeError`].
///
/// The error code is selected from the variant; the message is always the
/// error's `Display` rendering. Variants not listed explicitly map to
/// `SubscribeErrorCode::Internal`.
fn map_validation(e: ValidationError) -> SubscribeError {
    // Only the code depends on the variant, so the match yields just that.
    let code = match &e {
        ValidationError::UnknownFeed(_) => SubscribeErrorCode::UnknownFeed,
        ValidationError::UnknownColumn { .. }
        | ValidationError::FilterNotSupported { .. }
        | ValidationError::SamplingNotSupported { .. } => {
            SubscribeErrorCode::UnsupportedCapability
        }
        ValidationError::TransportNotSupported { .. } => SubscribeErrorCode::UnsupportedTransport,
        ValidationError::FormatNotSupported { .. } => SubscribeErrorCode::UnsupportedFormat,
        _ => SubscribeErrorCode::Internal,
    };
    SubscribeError {
        code,
        message: e.to_string(),
    }
}