use core::fmt::Debug;
use std::marker::PhantomData;
use crate::identifier::Identifier;
#[derive(Debug, Clone)]
pub struct StreamQuery<E: Clone> {
filter: Option<StreamFilter>,
origin: i64,
event_type: PhantomData<E>,
}
impl<E: Clone> StreamQuery<E> {
pub fn filter(&self) -> Option<&StreamFilter> {
self.filter.as_ref()
}
pub fn origin(&self) -> i64 {
self.origin
}
pub fn change_origin(mut self, origin: i64) -> Self {
self.origin = origin;
self
}
}
pub fn query<E: Clone>(filter: Option<StreamFilter>) -> StreamQuery<E> {
StreamQuery {
filter,
origin: 0,
event_type: PhantomData,
}
}
pub fn events(names: &'static [&'static str]) -> StreamFilter {
StreamFilter::Events { names }
}
pub fn eq(ident: Identifier, value: impl ToString) -> StreamFilter {
StreamFilter::Eq {
ident,
value: value.to_string(),
}
}
pub fn and(l: StreamFilter, r: StreamFilter) -> StreamFilter {
StreamFilter::And {
l: Box::new(l),
r: Box::new(r),
}
}
pub fn or(l: StreamFilter, r: StreamFilter) -> StreamFilter {
StreamFilter::Or {
l: Box::new(l),
r: Box::new(r),
}
}
#[macro_export]
macro_rules! query {
($event_ty: ty) => {{
$crate::query!(0; $event_ty)
}};
($event_ty:ty, $($filter:tt)+ ) => {{
$crate::query!(0; $event_ty, $($filter)*)
}};
($origin:expr; $event_ty:ty) => {{
$crate::stream_query::query::<$event_ty>(None).change_origin($origin)
}};
($origin:expr; $event_ty:ty, $($filter:tt)+ ) => {{
$crate::stream_query::query::<$event_ty>(Some($crate::filter!($event_ty, $($filter)*))).change_origin($origin)
}};
}
#[macro_export]
#[doc(hidden)]
macro_rules! filter {
($event_ty:ty, events[$($events:ty),+]) =>{
{
use $crate::Event;
const TYPES: &[&str] = {
const FILTER_ARG: &[&str] = &[$(stringify!($events),)+];
if !$crate::utils::include(<$event_ty>::SCHEMA.types, FILTER_ARG) {
panic!("Invalid events filter: specified events not found");
}
FILTER_ARG
};
$crate::stream_query::events(TYPES)
}
};
($event_ty:ty, $ident:ident == $value:expr) => {
{
use $crate::Event;
const _: &[&str] = {
const FILTER_ARG: &[&str] = &[stringify!($ident)];
if !$crate::utils::include(<$event_ty>::SCHEMA.domain_identifiers, FILTER_ARG) {
panic!("Invalid eq filter: specified domain identifier does not exist");
}
FILTER_ARG
};
$crate::stream_query::eq($crate::ident!(#$ident), &$value)
}
};
($event_ty:ty, ($($h:tt)+) and ($($t:tt)+)) => {
$crate::stream_query::and($crate::filter!($event_ty, $($h)+), $crate::filter!($event_ty, $($t)+))
};
($event_ty:ty, ($($h:tt)+) or ($($t:tt)+)) => {
$crate::stream_query::or($crate::filter!($event_ty, $($h)+), $crate::filter!($event_ty, $($t)+))
};
}
#[derive(Clone, Debug, PartialEq, Eq)]
pub enum StreamFilter {
Events {
names: &'static [&'static str],
},
Eq {
ident: Identifier,
value: String,
},
And {
l: Box<StreamFilter>,
r: Box<StreamFilter>,
},
Or {
l: Box<StreamFilter>,
r: Box<StreamFilter>,
},
}
pub trait FilterEvaluator {
type Result;
fn eval(&mut self, filter: &StreamFilter) -> Self::Result;
}
#[cfg(test)]
mod tests {
use super::*;
use crate::{domain_identifiers, ident, DomainIdentifierSet, Event, EventSchema};
#[derive(Clone)]
#[allow(dead_code)]
enum ShoppingCartEvent {
Added {
product_id: String,
cart_id: String,
quantity: i64,
},
Removed {
product_id: String,
cart_id: String,
quantity: i64,
},
}
impl Event for ShoppingCartEvent {
const SCHEMA: EventSchema = EventSchema {
types: &["Added", "Removed"],
domain_identifiers: &["cart_id", "product_id"],
};
fn name(&self) -> &'static str {
match self {
ShoppingCartEvent::Added { .. } => "ShoppingCartAdded",
ShoppingCartEvent::Removed { .. } => "ShoppingCartRemoved",
}
}
fn domain_identifiers(&self) -> DomainIdentifierSet {
match self {
ShoppingCartEvent::Added {
product_id,
cart_id,
..
} => domain_identifiers! {product_id: product_id, cart_id: cart_id},
ShoppingCartEvent::Removed {
product_id,
cart_id,
..
} => domain_identifiers! {product_id: product_id, cart_id: cart_id},
}
}
}
#[test]
fn it_can_create_stream_query_with_filter() {
let filter = eq(ident!(#id), "123");
let query_no_filter: StreamQuery<()> = query(None::<StreamFilter>);
assert_eq!(query_no_filter.filter(), None);
let query_with_filter: StreamQuery<()> = query(Some(filter.clone()));
assert_eq!(query_with_filter.filter(), Some(&filter));
}
#[test]
fn it_can_create_stream_query_macros() {
let query_no_filter: StreamQuery<ShoppingCartEvent> = query!(ShoppingCartEvent);
assert_eq!(query_no_filter.filter(), None);
let query_with_filter: StreamQuery<ShoppingCartEvent> =
query!(ShoppingCartEvent, cart_id == "123");
assert_eq!(
query_with_filter.filter(),
Some(&eq(ident!(#cart_id), "123"))
);
let query_with_origin: StreamQuery<ShoppingCartEvent> =
query!(42; ShoppingCartEvent, cart_id == "123");
assert_eq!(query_with_origin.origin(), 42);
assert_eq!(
query_with_origin.filter(),
Some(&eq(ident!(#cart_id), "123"))
);
let query_with_origin_no_filter: StreamQuery<ShoppingCartEvent> =
query!(42; ShoppingCartEvent);
assert_eq!(query_with_origin_no_filter.origin(), 42);
assert_eq!(query_with_origin_no_filter.filter(), None);
}
#[test]
fn it_can_create_filter_macros() {
let filter = filter!(ShoppingCartEvent, cart_id == "123");
assert_eq!(filter, eq(ident!(#cart_id), "123"));
let filter = filter!(ShoppingCartEvent, (cart_id == "123") and (product_id == "345"));
assert_eq!(
filter,
and(eq(ident!(#cart_id), "123"), eq(ident!(#product_id), "345"))
);
let filter = filter!(ShoppingCartEvent, (cart_id == "123") or (product_id == "345"));
assert_eq!(
filter,
or(eq(ident!(#cart_id), "123"), eq(ident!(#product_id), "345"))
);
let filter = filter!(ShoppingCartEvent, ((cart_id == "123") and (product_id == "345")) or
((cart_id == "678") and (product_id == "901")));
assert_eq!(
filter,
or(
and(eq(ident!(#cart_id), "123"), eq(ident!(#product_id), "345")),
and(eq(ident!(#cart_id), "678"), eq(ident!(#product_id), "901"))
)
);
let filter = filter!(ShoppingCartEvent, (cart_id == "123") and ((product_id == "345") and (events[Added, Removed])));
assert_eq!(
filter,
and(
eq(ident!(#cart_id), "123"),
and(
eq(ident!(#product_id), "345"),
events(&["Added", "Removed"])
)
)
);
}
}