use std::collections::HashMap;
use std::sync::Arc;
use prost::Name;
use prost_types::Any;
use crate::proto::{event_page, CloudEvent, CloudEventsResponse, EventBook};
pub type CloudEventsHandler<T> = fn(&T) -> Option<CloudEvent>;
type BoxedHandler = Arc<dyn Fn(&Any) -> Option<CloudEvent> + Send + Sync>;
type BoxedHandlerFactory = Arc<dyn Fn() -> BoxedHandler + Send + Sync>;
enum HandlerEntry {
Static(BoxedHandler),
Factory(BoxedHandlerFactory),
}
pub trait CloudEventsProjector: Send + Sync {
fn name(&self) -> &str;
fn domain(&self) -> &str;
}
pub struct CloudEventsRouter {
name: String,
domain: String,
handlers: HashMap<String, HandlerEntry>,
}
impl CloudEventsRouter {
pub fn new(name: impl Into<String>, domain: impl Into<String>) -> Self {
Self {
name: name.into(),
domain: domain.into(),
handlers: HashMap::new(),
}
}
pub fn on<T>(mut self, handler: CloudEventsHandler<T>) -> Self
where
T: prost::Message + Name + Default + 'static,
{
let suffix = T::NAME;
let boxed: BoxedHandler = Arc::new(move |any: &Any| match any.to_msg::<T>() {
Ok(event) => handler(&event),
Err(_) => None,
});
self.handlers.insert(suffix.to_string(), HandlerEntry::Static(boxed));
self
}
pub fn on_with<T, F>(mut self, factory: F) -> Self
where
T: prost::Message + Name + Default + 'static,
F: Fn() -> Arc<dyn Fn(&T) -> Option<CloudEvent> + Send + Sync> + Send + Sync + 'static,
{
let suffix = T::NAME;
let factory_arc: BoxedHandlerFactory = Arc::new(move || {
let inner = factory();
Arc::new(move |any: &Any| match any.to_msg::<T>() {
Ok(event) => inner(&event),
Err(_) => None,
})
});
self.handlers.insert(suffix.to_string(), HandlerEntry::Factory(factory_arc));
self
}
pub fn name(&self) -> &str {
&self.name
}
pub fn domain(&self) -> &str {
&self.domain
}
pub fn event_types(&self) -> Vec<String> {
self.handlers.keys().cloned().collect()
}
pub fn project(&self, source: &EventBook) -> CloudEventsResponse {
let mut events = Vec::new();
for page in &source.pages {
let event_any = match &page.payload {
Some(event_page::Payload::Event(e)) => e,
_ => continue,
};
let type_url = &event_any.type_url;
let suffix = type_url
.rsplit('/')
.next()
.and_then(|full_name: &str| full_name.rsplit('.').next())
.unwrap_or("");
if let Some(entry) = self.handlers.get(suffix) {
let cloud_event = match entry {
HandlerEntry::Static(handler) => handler(event_any),
HandlerEntry::Factory(factory) => {
let handler = factory();
handler(event_any)
}
};
if let Some(ce) = cloud_event {
events.push(ce);
}
}
}
CloudEventsResponse { events }
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_router_creation() {
let router = CloudEventsRouter::new("test-projector", "test-domain");
assert_eq!(router.name(), "test-projector");
assert_eq!(router.domain(), "test-domain");
}
}