use core::fmt::Debug;
use std::marker::PhantomData;
use crate::{domain_ids, event::EventId, DomainIdSet, Event, PersistedEvent};
/// A query over an event stream, expressed as a list of [`StreamFilter`]s.
///
/// The filters are alternatives: [`StreamQuery::matches`] accepts an event as
/// soon as one of them accepts it.
#[derive(Debug, Clone)]
pub struct StreamQuery<ID: EventId, E: Event + Clone> {
// Alternative filters; an event only has to satisfy one of them.
filters: Vec<StreamFilter<ID, E>>,
// Zero-sized marker tying the query to the event type `E`.
event_type: PhantomData<E>,
// Zero-sized marker tying the query to the event id type `ID`.
event_id_type: PhantomData<ID>,
}
impl<ID: EventId, E: Event + Clone> StreamQuery<ID, E> {
pub fn filters(&self) -> &[StreamFilter<ID, E>] {
&self.filters
}
pub fn cast<U>(&self) -> StreamQuery<ID, U>
where
E: Event + Into<U>,
U: Event + Clone,
{
StreamQuery {
filters: self.filters.iter().map(|f| f.cast()).collect(),
event_type: PhantomData,
event_id_type: PhantomData,
}
}
pub fn union<U, O>(&self, other: &StreamQuery<ID, O>) -> StreamQuery<ID, U>
where
E: Event + Into<U>,
U: Event + Clone,
O: Event + Into<U> + Clone,
{
let filters = self
.filters
.iter()
.map(|f| f.cast())
.chain(other.filters.iter().map(|f| f.cast()))
.collect();
StreamQuery {
filters,
event_type: PhantomData,
event_id_type: PhantomData,
}
}
pub fn change_origin(self, origin: ID) -> Self {
let filters = self
.filters
.iter()
.map(|f| StreamFilter {
origin,
..f.clone()
})
.collect();
StreamQuery {
filters,
event_type: PhantomData,
event_id_type: PhantomData,
}
}
pub fn exclude_events(self, excluded_events: &'static [&'static str]) -> Self {
let filters = self
.filters
.iter()
.map(|f| StreamFilter {
excluded_events: Some(
excluded_events
.iter()
.filter(|e| f.events.contains(e))
.cloned()
.collect(),
),
..f.clone()
})
.collect();
StreamQuery {
filters,
event_type: PhantomData,
event_id_type: PhantomData,
}
}
pub fn matches(&self, event: &PersistedEvent<ID, E>) -> bool {
self.filters.iter().any(|filter| {
if let Some(excluded_events) = &filter.excluded_events {
if excluded_events.contains(&event.name()) {
return false;
}
}
if !filter.events.contains(&event.name()) {
return false;
}
if filter
.identifiers
.iter()
.any(|(ident, value)| event.domain_ids().get(ident) != Some(value))
{
return false;
}
if event.id() <= filter.origin {
return false;
}
true
})
}
pub fn matches_event(&self, event: &str) -> bool {
self.filters.iter().any(|filter| {
if let Some(excluded_events) = &filter.excluded_events {
if excluded_events.contains(&event) {
return false;
}
}
true
})
}
}
/// Two queries are equal when they hold equal filters in the same order.
impl<ID: EventId, E: Event + Clone + PartialEq> PartialEq for StreamQuery<ID, E> {
    fn eq(&self, other: &Self) -> bool {
        let (mine, theirs) = (self.filters(), other.filters());
        mine == theirs
    }
}
/// Builds a [`StreamQuery`] over the event type `E` holding a single filter.
///
/// * `Some(filter)`: the filter is re-typed to `E` with [`StreamFilter::cast`]
///   and becomes the query's only filter.
/// * `None`: the only filter is a fresh `StreamFilter` for `E` created from an
///   empty `domain_ids!()` set.
pub fn query<ID, E, O>(filter: Option<StreamFilter<ID, O>>) -> StreamQuery<ID, E>
where
    ID: EventId,
    E: Event + Clone,
    O: Event + Clone + Into<E>,
{
    let only_filter = match filter {
        Some(given) => given.cast(),
        None => StreamFilter::new(domain_ids!()),
    };
    StreamQuery {
        filters: vec![only_filter],
        event_type: PhantomData,
        event_id_type: PhantomData,
    }
}
/// Builds a `StreamQuery` for an event type.
///
/// Accepted forms:
///
/// * `query!(EventTy)`: calls `query` with `None`.
/// * `query!(EventTy; <filter tokens>)`: builds a filter with `filter!` from
///   the event type and the tokens after the `;`, and passes it to `query`.
/// * `query!(origin => EventTy; <filter tokens>)`: same as the previous form,
///   then calls `change_origin(origin)` on the resulting query.
#[macro_export]
macro_rules! query {
// Event type only: no filter is supplied to `query`.
($event_ty: ty) => {{
$crate::query::<_, $event_ty, $event_ty>(None)
}};
// Event type plus filter tokens, forwarded untouched to `filter!`.
($event_ty:ty; $($filter:tt)+ ) => {{
$crate::query::<_, $event_ty, _>(Some($crate::filter!($event_ty; $($filter)*)))
}};
// Leading `origin =>`: delegate to the arm above, then set the origin.
($origin:expr => $event_ty:ty; $($filter:tt)+ ) => {{
$crate::query!($event_ty; $($filter)*).change_origin($origin)
}};
}
/// Turns a list of event names into a `&[&str]`, validated at compile time.
///
/// `event_types!(EventTy, [A, B, ...])` stringifies every listed name and
/// checks, inside a `const`, that the names pass `$crate::utils::include`
/// against `<EventTy>::SCHEMA.events`. When the check fails the `panic!` runs
/// during constant evaluation, so the build fails with the message
/// "one or more of the specified events do not exist".
///
/// NOTE(review): `utils::include` is not visible here; it presumably returns
/// `true` only when every name of the second slice is in the first. Confirm.
#[macro_export]
macro_rules! event_types{
($event_ty:ty, [$($events:ty),+]) =>{
{
// Brings the trait providing `SCHEMA` into scope at the call site.
use $crate::Event;
const EVENTS: &[&str] = {
// The requested names, exactly as written in the invocation.
const FILTER_ARG: &[&str] = &[$(stringify!($events),)+];
if !$crate::utils::include(<$event_ty>::SCHEMA.events, FILTER_ARG) {
panic!("one or more of the specified events do not exist");
}
FILTER_ARG
};
EVENTS
}
};
}
/// Builds a `StreamFilter` for an event type from `ident == value` conditions.
///
/// Accepted forms:
///
/// * `filter!(EventTy; a == x, b == y)`: creates the filter with
///   `StreamFilter::new` from `domain_ids!(a: x.clone(), b: y.clone())`.
/// * `filter!(origin => EventTy; a == x, ...)`: same, then calls
///   `change_origin(origin)` on the filter.
///
/// Every identifier is checked at compile time against the ident strings of
/// `<EventTy>::SCHEMA.domain_ids`; when the check fails the `panic!` runs
/// during constant evaluation and reports "Invalid domain filter".
///
/// NOTE(review): `utils::include` and `const_slice_iter!` are not visible
/// here; the former presumably tests that the second slice is contained in
/// the first, the latter presumably maps a const slice element by element.
/// Confirm against their definitions.
#[macro_export]
#[doc(hidden)]
macro_rules! filter {
// Leading `origin =>`: build the filter with the arm below, then set its origin.
($origin:expr => $event_ty:ty; $($ident:ident == $value:expr),*) =>{
$crate::filter!($event_ty; $($ident == $value),*).change_origin($origin)
};
($event_ty:ty; $($ident:ident == $value:expr),*) =>{
{
// The constants below are only referenced by the per-identifier checks, so
// they are unused when the invocation has no conditions.
#[allow(dead_code)]
{
// Brings the trait providing `SCHEMA` into scope at the call site.
use $crate::Event;
const DOMAIN_IDS: &[&$crate::DomainIdInfo] = <$event_ty>::SCHEMA.domain_ids;
// Ident string of each entry in `DOMAIN_IDS`.
const DOMAIN_IDS_INDENTS: &[&str] = &$crate::const_slice_iter!(DOMAIN_IDS, const fn map(item: &$crate::DomainIdInfo) -> &str {
item.ident.into_inner()
});
// One anonymous const per condition; its initializer performs the check.
$(
const _:&[&str] = {
const FILTER_ARG: &[&str] = &[stringify!($ident)];
if !$crate::utils::include(DOMAIN_IDS_INDENTS, FILTER_ARG) {
panic!(concat!("Invalid domain filter: the domain identifier ", stringify!($ident), " does not exist"));
}
FILTER_ARG
};
)*
}
// The event id type is left for the compiler to infer at the call site.
$crate::StreamFilter::<_, $event_ty>::new($crate::domain_ids!($($ident: $value.clone()),*))
}
};
}
/// Combines one or more queries into a single `StreamQuery`.
///
/// Every argument is converted with `Into<StreamQuery<_, _>>` first, so any
/// value with such a conversion is accepted.
///
/// * `union!(q)`: converts `q` and re-types it with `StreamQuery::cast`.
/// * `union!(q1, q2)`: `StreamQuery::union` of the two converted queries.
/// * `union!(q1, q2, q3, ...)`: unions the tail recursively, then unions `q1`
///   with that result.
#[macro_export]
macro_rules! union {
    ($query:expr) => {
        // Name the type through the crate root, as the other arms do; the
        // module path only resolves for callers if the module is public.
        Into::<$crate::StreamQuery<_, _>>::into($query).cast()
    };
    ($query1:expr, $query2:expr) => {
        $crate::StreamQuery::<_, _>::union(
            &Into::<$crate::StreamQuery<_, _>>::into($query1),
            &Into::<$crate::StreamQuery<_, _>>::into($query2),
        )
    };
    ($query:expr, $($queries:expr),*) => {
        {
            // Assigning the outer union back into `result` forces the nested
            // union to produce the same event type as the final one, so the
            // compiler can infer it.
            let mut result = $crate::union!($($queries),*);
            result = $crate::StreamQuery::<_, _>::union(
                &Into::<$crate::StreamQuery<_, _>>::into($query),
                &result,
            );
            result
        }
    };
}
/// A single set of conditions an event must satisfy to be selected.
///
/// `StreamQuery::matches` accepts an event for a filter when its name is in
/// `events` and not in `excluded_events`, its domain ids agree with every
/// entry of `identifiers`, and its id is strictly greater than `origin`.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct StreamFilter<ID: EventId, E: Event + Clone> {
// Names of the events the filter selects; `new` takes them from `E::SCHEMA.events`.
events: &'static [&'static str],
// Domain ids (and values) an event must carry to be selected.
identifiers: DomainIdSet,
// Events whose id is less than or equal to this value are rejected.
origin: ID,
// Event names to reject even though they are listed in `events`, if any.
excluded_events: Option<Vec<&'static str>>,
// Zero-sized marker tying the filter to the event type `E`.
event_type: PhantomData<E>,
}
impl<ID: EventId, E: Event + Clone> StreamFilter<ID, E> {
    /// Creates a filter for the event type `E` constrained by `identifiers`.
    ///
    /// The filter selects every event name in `E::SCHEMA.events`, starts from
    /// the default origin and excludes nothing.
    pub fn new(identifiers: DomainIdSet) -> Self {
        let events = E::SCHEMA.events;
        let origin = Default::default();
        StreamFilter {
            events,
            identifiers,
            origin,
            excluded_events: None,
            event_type: PhantomData,
        }
    }

    /// Returns the filter with its origin replaced by `origin`.
    pub fn change_origin(mut self, origin: ID) -> Self {
        self.origin = origin;
        self
    }

    /// Returns the filter with its exclusion list replaced by `excluded_events`.
    ///
    /// The names are stored as given; a previous exclusion list is discarded.
    pub fn exclude_events(mut self, excluded_events: &'static [&'static str]) -> Self {
        self.excluded_events = Some(excluded_events.to_vec());
        self
    }

    /// Produces a copy of the filter typed for the event type `O`.
    ///
    /// Only the type marker changes; events, identifiers, origin and
    /// exclusions are carried over as they are.
    pub fn cast<O>(&self) -> StreamFilter<ID, O>
    where
        E: Event + Into<O>,
        O: Event + Clone,
    {
        let Self {
            events,
            identifiers,
            origin,
            excluded_events,
            ..
        } = self;
        StreamFilter {
            events: *events,
            identifiers: identifiers.clone(),
            origin: *origin,
            excluded_events: excluded_events.clone(),
            event_type: PhantomData,
        }
    }

    /// Names of the events the filter selects.
    pub fn events(&self) -> &'static [&'static str] {
        self.events
    }

    /// Domain ids an event must carry to be selected.
    pub fn identifiers(&self) -> &DomainIdSet {
        &self.identifiers
    }

    /// Origin of the filter.
    pub fn origin(&self) -> ID {
        self.origin
    }

    /// Event names the filter rejects, when an exclusion list was set.
    pub fn excluded_events(&self) -> Option<&Vec<&'static str>> {
        self.excluded_events.as_ref()
    }
}
#[cfg(test)]
mod tests {
    use crate::ident;
    use crate::stream_query::StreamFilter;
    use crate::utils::tests::*;
    use crate::IdentifierValue;

    /// A filter built without an origin keeps the single condition it was given.
    #[test]
    fn test_filter_with_no_origin_and_no_exclude_events() {
        let cart_filter: StreamFilter<i64, _> = filter! {
            ShoppingCartEvent;
            cart_id == 42
        };

        let identifiers = cart_filter.identifiers();
        assert_eq!(identifiers.len(), 1);
        assert_eq!(
            identifiers[&ident!(#cart_id)],
            IdentifierValue::i64(42)
        );
    }

    /// The `origin =>` prefix sets the origin of the filter.
    #[test]
    fn test_filter_with_origin() {
        let cart_filter = filter! {
            10 =>
            ShoppingCartEvent;
            cart_id == 42
        };

        assert_eq!(cart_filter.origin(), 10);
    }

    /// Origin and conditions can be given together.
    #[test]
    fn test_filter_with_all_parameters() {
        let cart_filter = filter! {
            10 =>
            ShoppingCartEvent;
            cart_id == 42
        };

        assert_eq!(cart_filter.origin(), 10);

        let identifiers = cart_filter.identifiers();
        assert_eq!(identifiers.len(), 1);
        assert_eq!(
            identifiers[&ident!(#cart_id)],
            IdentifierValue::i64(42)
        );
    }
}