use std::{
pin::Pin,
task::{Context, Poll},
};
use futures::Stream;
use pin_project_lite::pin_project;
use crate::event::{MEvent, MEventType};
/// Extension methods for any [`Stream`] whose items are [`MEvent`]s.
///
/// Every method consumes the stream and wraps it in one of the adapter types
/// defined in this module.
pub trait SagaStreamExt: Stream<Item = MEvent> + Sized {
    /// Wraps the stream in [`OfItemType`], which forwards only the events whose
    /// `item_type()` equals `item_type`.
    fn of_item_type(self, item_type: &'static str) -> OfItemType<Self> {
        let stream = self;
        OfItemType { stream, item_type }
    }

    /// Wraps the stream in [`OfChangeType`], which forwards only the events whose
    /// `change_type()` equals `change_type`.
    fn of_change_type(self, change_type: MEventType) -> OfChangeType<Self> {
        let stream = self;
        OfChangeType {
            stream,
            change_type,
        }
    }

    /// Wraps the stream in [`Pairwise`], which emits `(previous, current)` tuples
    /// of consecutive events. The adapter starts with no remembered event.
    fn pairwise(self) -> Pairwise<Self> {
        let stream = self;
        Pairwise {
            stream,
            previous: None,
        }
    }

    /// Wraps the stream in [`Scan`], seeded with `initial`.
    ///
    /// For every event, `f` is called with a mutable reference to the current
    /// state and the event; the value it returns becomes the new state and is
    /// also what the resulting stream yields.
    fn accumulate<S, F>(self, initial: S, f: F) -> Scan<Self, S, F>
    where
        S: Clone,
        F: FnMut(&mut S, MEvent) -> S,
    {
        let (stream, state) = (self, initial);
        Scan { stream, state, f }
    }
}
/// Blanket implementation: every sized stream of [`MEvent`]s gets the extension methods.
impl<T> SagaStreamExt for T where T: Stream<Item = MEvent> + Sized {}
pin_project! {
    /// Stream adapter that forwards only the events whose item type matches a
    /// fixed string.
    ///
    /// Created by [`SagaStreamExt::of_item_type`].
    #[must_use = "streams do nothing unless polled"]
    pub struct OfItemType<S> {
        // Upstream event source; structurally pinned because it is polled in place.
        #[pin]
        stream: S,
        // Value an event's `item_type()` must equal for the event to be forwarded.
        item_type: &'static str,
    }
}
impl<S: Stream<Item = MEvent>> Stream for OfItemType<S> {
    type Item = MEvent;

    /// Polls the inner stream until it yields an event with the wanted item
    /// type, ends, or reports `Pending`. Non-matching events are dropped.
    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        let mut this = self.project();
        loop {
            match this.stream.as_mut().poll_next(cx) {
                Poll::Ready(Some(event)) => {
                    if event.item_type() == *this.item_type {
                        return Poll::Ready(Some(event));
                    }
                    // Not a match: discard the event and poll the inner stream again.
                }
                Poll::Ready(None) => return Poll::Ready(None),
                Poll::Pending => return Poll::Pending,
            }
        }
    }

    /// Any number of upstream events may be filtered out, so only the upper
    /// bound of the inner stream carries over.
    fn size_hint(&self) -> (usize, Option<usize>) {
        let (_, upper) = self.stream.size_hint();
        (0, upper)
    }
}
pin_project! {
    /// Stream adapter that forwards only the events whose change type matches a
    /// fixed [`MEventType`].
    ///
    /// Created by [`SagaStreamExt::of_change_type`].
    #[must_use = "streams do nothing unless polled"]
    pub struct OfChangeType<S> {
        // Upstream event source; structurally pinned because it is polled in place.
        #[pin]
        stream: S,
        // Value an event's `change_type()` must equal for the event to be forwarded.
        change_type: MEventType,
    }
}
impl<S: Stream<Item = MEvent>> Stream for OfChangeType<S> {
    type Item = MEvent;

    /// Polls the inner stream until it yields an event with the wanted change
    /// type, ends, or reports `Pending`. Non-matching events are dropped.
    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        let mut this = self.project();
        loop {
            match this.stream.as_mut().poll_next(cx) {
                Poll::Ready(Some(event)) => {
                    if event.change_type() == *this.change_type {
                        return Poll::Ready(Some(event));
                    }
                    // Not a match: discard the event and poll the inner stream again.
                }
                Poll::Ready(None) => return Poll::Ready(None),
                Poll::Pending => return Poll::Pending,
            }
        }
    }

    /// Any number of upstream events may be filtered out, so only the upper
    /// bound of the inner stream carries over.
    fn size_hint(&self) -> (usize, Option<usize>) {
        let (_, upper) = self.stream.size_hint();
        (0, upper)
    }
}
pin_project! {
    /// Stream adapter that emits `(previous, current)` tuples of consecutive
    /// events.
    ///
    /// Created by [`SagaStreamExt::pairwise`]. The `Stream` bound lives on the
    /// `Stream` impl rather than on the struct, matching the other adapters in
    /// this module.
    #[must_use = "streams do nothing unless polled"]
    pub struct Pairwise<S> {
        // Upstream event source; structurally pinned because it is polled in place.
        #[pin]
        stream: S,
        // Most recently received event, kept to form the next pair; `None` until
        // the first event arrives.
        previous: Option<MEvent>,
    }
}
impl<S: Stream<Item = MEvent>> Stream for Pairwise<S> {
    type Item = (MEvent, MEvent);

    /// Yields `(previous, current)` for every upstream event after the first.
    ///
    /// The first event is only buffered, so the inner stream is polled again
    /// immediately instead of returning to the caller empty-handed.
    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        let mut this = self.project();
        loop {
            match this.stream.as_mut().poll_next(cx) {
                Poll::Ready(Some(event)) => {
                    if let Some(prev) = this.previous.take() {
                        // Keep a copy of the current event as the left half of the next pair.
                        *this.previous = Some(event.clone());
                        return Poll::Ready(Some((prev, event)));
                    } else {
                        *this.previous = Some(event);
                    }
                }
                Poll::Ready(None) => return Poll::Ready(None),
                Poll::Pending => return Poll::Pending,
            }
        }
    }

    /// Once an event is buffered every further upstream event produces exactly
    /// one pair; before that, the first upstream event produces none.
    fn size_hint(&self) -> (usize, Option<usize>) {
        let (lower, upper) = self.stream.size_hint();
        if self.previous.is_some() {
            (lower, upper)
        } else {
            (lower.saturating_sub(1), upper.map(|n| n.saturating_sub(1)))
        }
    }
}
pin_project! {
    /// Stream adapter that threads a state value through a reducer and yields
    /// the state produced for each event.
    ///
    /// Created by [`SagaStreamExt::accumulate`].
    #[must_use = "streams do nothing unless polled"]
    pub struct Scan<S, State, F> {
        // Upstream event source; structurally pinned because it is polled in place.
        #[pin]
        stream: S,
        // Current accumulated state; overwritten with the reducer's result after each event.
        state: State,
        // Reducer invoked with the current state and each incoming event.
        f: F,
    }
}
impl<S, State, F> Stream for Scan<S, State, F>
where
    S: Stream<Item = MEvent>,
    State: Clone,
    F: FnMut(&mut State, MEvent) -> State,
{
    type Item = State;

    /// Feeds the next upstream event to the reducer, stores the returned value
    /// as the new state and yields it.
    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        let this = self.project();
        match this.stream.poll_next(cx) {
            Poll::Ready(Some(event)) => {
                let new_state = (this.f)(this.state, event);
                // The stored state is replaced by the reducer's return value; the
                // clone lets the same value be handed to the consumer as well.
                *this.state = new_state.clone();
                Poll::Ready(Some(new_state))
            }
            Poll::Ready(None) => Poll::Ready(None),
            Poll::Pending => Poll::Pending,
        }
    }

    /// Exactly one state is emitted per upstream event, so the inner bounds
    /// carry over unchanged.
    fn size_hint(&self) -> (usize, Option<usize>) {
        self.stream.size_hint()
    }
}
pub fn is_item_type(item_type: &'static str) -> impl Fn(&MEvent) -> bool {
move |event| event.item_type() == item_type
}
/// Builds a predicate that returns `true` for events whose `change_type()`
/// equals `change_type`.
pub fn is_change_type(change_type: MEventType) -> impl Fn(&MEvent) -> bool {
    let wanted = change_type;
    move |candidate: &MEvent| candidate.change_type() == wanted
}
#[cfg(test)]
mod tests {
    use futures::{StreamExt, executor::block_on, stream};
    use serde_json::json;

    use super::*;

    /// Builds an event with the given item type and change type; every other
    /// field is filled with fixed placeholder data (the timestamp is "now").
    fn make_event(item_type: &str, change_type: MEventType) -> MEvent {
        let item = json!({"id": "test-id", "hash": "test-hash"});
        let created_at = chrono::Utc::now().to_rfc3339();
        MEvent {
            tx: "test-tx".to_owned(),
            item_type: item_type.to_owned(),
            item,
            change_type,
            created_at,
            source_id: None,
            options: None,
        }
    }

    /// Only the two "Target" events survive the item-type filter.
    #[test]
    fn test_of_item_type() {
        let source = stream::iter(vec![
            make_event("Target", MEventType::SET),
            make_event("Scene", MEventType::SET),
            make_event("Target", MEventType::DEL),
        ]);

        let targets: Vec<MEvent> = block_on(source.of_item_type("Target").collect());

        assert_eq!(targets.len(), 2);
        for event in &targets {
            assert!(event.item_type() == "Target");
        }
    }

    /// Only the two SET events survive the change-type filter.
    #[test]
    fn test_of_change_type() {
        let source = stream::iter(vec![
            make_event("Target", MEventType::SET),
            make_event("Target", MEventType::DEL),
            make_event("Target", MEventType::SET),
        ]);

        let sets: Vec<MEvent> = block_on(source.of_change_type(MEventType::SET).collect());

        assert_eq!(sets.len(), 2);
        for event in &sets {
            assert!(event.change_type() == MEventType::SET);
        }
    }

    /// Three events produce two consecutive pairs.
    #[test]
    fn test_pairwise() {
        let source = stream::iter(vec![
            make_event("Target", MEventType::SET),
            make_event("Target", MEventType::SET),
            make_event("Target", MEventType::SET),
        ]);

        let pairs: Vec<(MEvent, MEvent)> = block_on(source.pairwise().collect());

        assert_eq!(pairs.len(), 2);
    }

    /// A counting reducer yields the running totals 1, 2, 3.
    #[test]
    fn test_scan() {
        let source = stream::iter(vec![
            make_event("Target", MEventType::SET),
            make_event("Target", MEventType::SET),
            make_event("Target", MEventType::SET),
        ]);

        let running = source.accumulate(0, |count, _| *count + 1);
        let counts: Vec<_> = block_on(running.collect());

        assert_eq!(counts, vec![1, 2, 3]);
    }

    /// Filters compose: only "Target" events that are also SET remain.
    #[test]
    fn test_chained_operators() {
        let source = stream::iter(vec![
            make_event("Target", MEventType::SET),
            make_event("Scene", MEventType::SET),
            make_event("Target", MEventType::DEL),
            make_event("Target", MEventType::SET),
        ]);

        let pipeline = source
            .of_item_type("Target")
            .of_change_type(MEventType::SET);
        let target_sets: Vec<MEvent> = block_on(pipeline.collect());

        assert_eq!(target_sets.len(), 2);
    }
}