use std::sync::Arc;
use prost_types::Any;
use crate::proto::{event_page, EventPage};
#[cfg(test)]
use crate::proto::{page_header::SequenceType, PageHeader};
/// Plain function pointer that takes a reference to an event and returns an event.
pub type UpcasterHandler = fn(&Any) -> Any;
/// Boxed `Send + Sync` closure with the same call signature as [`UpcasterHandler`].
pub type BoxedUpcasterHandler = Box<dyn Fn(&Any) -> Any + Send + Sync>;
/// Reference-counted `Send + Sync` factory that returns a [`BoxedUpcasterHandler`]
/// each time it is called.
pub type UpcasterHandlerHOF = Arc<dyn Fn() -> BoxedUpcasterHandler + Send + Sync>;
/// How a registered handler is stored: either ready to call, or produced on
/// demand by a factory.
enum UpcasterHandlerEntry {
/// A handler that is called directly for each matching event.
Static(BoxedUpcasterHandler),
/// A factory that is called for each matching event to obtain the handler
/// that is then applied to that event.
Factory(UpcasterHandlerHOF),
}
/// One registration: a type-URL suffix paired with the handler to run for
/// events whose `type_url` ends with that suffix.
struct UpcasterEntry {
/// Suffix compared against the end of an event's `type_url`.
suffix: String,
/// Handler (or handler factory) to run when the suffix matches.
handler: UpcasterHandlerEntry,
}
/// Dispatches events to registered upcast handlers by matching the end of
/// each event's `type_url` against registered suffixes.
pub struct UpcasterRouter {
/// Domain name supplied at construction and returned by `domain()`.
domain: String,
/// Registered entries in registration order; lookups stop at the first
/// entry whose suffix matches.
handlers: Vec<UpcasterEntry>,
}
impl UpcasterRouter {
    /// Creates a router for `domain` with no handlers registered.
    pub fn new(domain: impl Into<String>) -> Self {
        Self {
            domain: domain.into(),
            handlers: Vec::new(),
        }
    }

    /// Registers `handler` for events whose `type_url` ends with `suffix`.
    ///
    /// Consumes the router and returns it so registrations can be chained.
    /// Entries are tried in registration order and the first match wins, so
    /// register more specific suffixes before less specific ones. Matching is
    /// a plain `ends_with` test: an empty suffix matches every type URL.
    #[must_use = "builder methods return the updated router"]
    pub fn on<F>(mut self, suffix: impl Into<String>, handler: F) -> Self
    where
        F: Fn(&Any) -> Any + Send + Sync + 'static,
    {
        self.handlers.push(UpcasterEntry {
            suffix: suffix.into(),
            handler: UpcasterHandlerEntry::Static(Box::new(handler)),
        });
        self
    }

    /// Same as [`UpcasterRouter::on`], but takes a plain function pointer.
    #[must_use = "builder methods return the updated router"]
    pub fn on_fn(self, suffix: impl Into<String>, handler: UpcasterHandler) -> Self {
        self.on(suffix, handler)
    }

    /// Registers a handler *factory* for events whose `type_url` ends with
    /// `suffix`.
    ///
    /// The factory is called once for every matching event that is upcast,
    /// and the handler it returns is applied to that event only.
    #[must_use = "builder methods return the updated router"]
    pub fn on_with<F>(mut self, suffix: impl Into<String>, factory: F) -> Self
    where
        F: Fn() -> BoxedUpcasterHandler + Send + Sync + 'static,
    {
        self.handlers.push(UpcasterEntry {
            suffix: suffix.into(),
            handler: UpcasterHandlerEntry::Factory(Arc::new(factory)),
        });
        self
    }

    /// Returns the domain name this router was created with.
    pub fn domain(&self) -> &str {
        &self.domain
    }

    /// Returns a copy of every registered suffix, in registration order.
    pub fn event_types(&self) -> Vec<String> {
        self.handlers
            .iter()
            .map(|entry| entry.suffix.clone())
            .collect()
    }

    /// Upcasts a single event.
    ///
    /// Returns the output of the first registered handler whose suffix matches
    /// the end of `event.type_url`, or a clone of `event` when none matches.
    pub fn upcast_event(&self, event: &Any) -> Any {
        self.apply_first_match(event)
            .unwrap_or_else(|| event.clone())
    }

    /// Upcasts every page in `events`, returning a new vector of the same
    /// length and order.
    ///
    /// A page carrying an event that matches a registered suffix is rebuilt
    /// with the handler's output as its payload, keeping the original header
    /// and `created_at`. The handler's output is always used, including when
    /// it keeps the same `type_url` and only changes the value, so this agrees
    /// with [`UpcasterRouter::upcast_event`]. All other pages are cloned as-is.
    pub fn upcast(&self, events: &[EventPage]) -> Vec<EventPage> {
        events
            .iter()
            .map(|page| match &page.payload {
                Some(event_page::Payload::Event(event)) => match self.apply_first_match(event) {
                    Some(upcasted) => EventPage {
                        payload: Some(event_page::Payload::Event(upcasted)),
                        header: page.header.clone(),
                        created_at: page.created_at,
                    },
                    // No handler registered for this event: pass it through.
                    None => page.clone(),
                },
                // Pages without an event payload are passed through.
                _ => page.clone(),
            })
            .collect()
    }

    /// Returns `true` when at least one registered suffix matches the end of
    /// `type_url`.
    pub fn handles(&self, type_url: &str) -> bool {
        self.handlers
            .iter()
            .any(|entry| type_url.ends_with(entry.suffix.as_str()))
    }

    /// Runs the first handler whose suffix matches `event.type_url`.
    ///
    /// Returns `None` when no registered suffix matches, letting callers pick
    /// their own pass-through strategy without an intermediate clone.
    fn apply_first_match(&self, event: &Any) -> Option<Any> {
        let entry = self
            .handlers
            .iter()
            .find(|entry| event.type_url.ends_with(entry.suffix.as_str()))?;
        let upcasted = match &entry.handler {
            UpcasterHandlerEntry::Static(handler) => handler(event),
            UpcasterHandlerEntry::Factory(factory) => {
                // A fresh handler is built for every matching event.
                let handler = factory();
                handler(event)
            }
        };
        Some(upcasted)
    }
}
/// Unit marker type for upcaster mode.
// NOTE(review): nothing in the visible part of this file uses `UpcasterMode`;
// presumably it is consumed elsewhere as a mode selector — confirm.
pub struct UpcasterMode;
#[cfg(test)]
mod tests {
    use super::*;

    // Type URLs shared by the tests below.
    const CREATED_V1: &str = "type.googleapis.com/examples.OrderCreatedV1";
    const CREATED: &str = "type.googleapis.com/examples.OrderCreated";
    const SHIPPED_V1: &str = "type.googleapis.com/examples.OrderShippedV1";
    const COMPLETED: &str = "type.googleapis.com/examples.OrderCompleted";

    /// Builds an `Any` with the given type URL and raw value bytes.
    fn make_any(type_url: &str, value: Vec<u8>) -> Any {
        Any {
            type_url: type_url.to_string(),
            value,
        }
    }

    /// Builds an event page with an empty-valued event of `type_url`, the
    /// given sequence header, and no creation timestamp.
    fn page_with(type_url: &str, sequence_type: SequenceType) -> EventPage {
        EventPage {
            payload: Some(event_page::Payload::Event(make_any(type_url, vec![]))),
            header: Some(PageHeader {
                sequence_type: Some(sequence_type),
            }),
            created_at: None,
        }
    }

    /// Returns the type URL of the page's event payload, panicking when the
    /// page does not carry an event.
    fn event_type_url(page: &EventPage) -> &str {
        match &page.payload {
            Some(event_page::Payload::Event(event)) => &event.type_url,
            _ => panic!("Expected event payload"),
        }
    }

    #[test]
    fn upcaster_router_creation() {
        let router = UpcasterRouter::new("order");

        assert_eq!(router.domain(), "order");
        assert!(router.event_types().is_empty());
    }

    #[test]
    fn upcaster_router_registration() {
        let router = UpcasterRouter::new("order")
            .on("OrderCreatedV1", Any::clone)
            .on("OrderShippedV1", Any::clone);

        let registered = router.event_types();
        assert_eq!(registered.len(), 2);
        assert!(router.handles(CREATED_V1));
        assert!(router.handles(SHIPPED_V1));
        assert!(!router.handles(COMPLETED));
    }

    #[test]
    fn upcaster_passthrough_no_match() {
        let original = make_any(COMPLETED, vec![1, 2, 3]);

        let upcasted = UpcasterRouter::new("order")
            .on("OrderCreatedV1", Any::clone)
            .upcast_event(&original);

        assert_eq!(upcasted.type_url, original.type_url);
        assert_eq!(upcasted.value, original.value);
    }

    #[test]
    fn upcaster_transforms_matching() {
        let router =
            UpcasterRouter::new("order").on("OrderCreatedV1", |_| make_any(CREATED, vec![4, 5, 6]));
        let original = make_any(CREATED_V1, vec![1, 2, 3]);

        let upcasted = router.upcast_event(&original);

        assert_eq!(upcasted.type_url, CREATED);
        assert_eq!(upcasted.value, vec![4, 5, 6]);
    }

    #[test]
    fn upcaster_batch_processing() {
        let router =
            UpcasterRouter::new("order").on("OrderCreatedV1", |_| make_any(CREATED, vec![]));
        let pages = vec![
            page_with(CREATED_V1, SequenceType::Sequence(0)),
            page_with(COMPLETED, SequenceType::Sequence(1)),
        ];

        let upcasted = router.upcast(&pages);

        assert_eq!(upcasted.len(), 2);
        // First page matched the V1 suffix and was rewritten; second is untouched.
        assert_eq!(event_type_url(&upcasted[0]), CREATED);
        assert_eq!(event_type_url(&upcasted[1]), COMPLETED);
    }
}