use crate::errors::Result;
use serde::{Deserialize, Serialize};
use std::fmt::Debug;
pub mod last_value;
pub mod topic;
pub mod binop;
pub mod ephemeral;
pub use last_value::LastValue;
pub use topic::Topic;
pub use binop::BinaryOperatorAggregate;
pub use ephemeral::EphemeralValue;
pub trait BaseChannel: Send + Sync + Debug {
fn get(&self) -> Result<Option<serde_json::Value>>;
fn update(&mut self, values: Vec<serde_json::Value>) -> Result<()>;
fn checkpoint(&self) -> Result<serde_json::Value>;
fn from_checkpoint(data: serde_json::Value) -> Result<Box<dyn BaseChannel>>
where
Self: Sized;
fn type_name(&self) -> &'static str;
fn is_empty(&self) -> bool {
self.get().ok().flatten().is_none()
}
}
pub type ChannelBox = Box<dyn BaseChannel>;
pub fn last_value<T>() -> LastValue<T>
where
T: Serialize + for<'de> Deserialize<'de> + Clone + Send + Sync + Debug + 'static,
{
LastValue::new()
}
pub fn topic<T>() -> Topic<T>
where
T: Serialize + for<'de> Deserialize<'de> + Clone + Send + Sync + Debug + 'static,
{
Topic::new()
}
pub fn binop<T, F>(initial: T, reducer: F) -> BinaryOperatorAggregate<T, F>
where
T: Serialize + for<'de> Deserialize<'de> + Clone + Send + Sync + Debug + 'static,
F: Fn(T, T) -> T + Send + Sync + 'static,
{
BinaryOperatorAggregate::new(initial, reducer)
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_channel_helpers() {
let mut lv = last_value::<i32>();
lv.update(vec![serde_json::json!(42)]).unwrap();
assert_eq!(lv.get().unwrap(), Some(serde_json::json!(42)));
let mut topic = topic::<String>();
topic.update(vec![serde_json::json!("hello")]).unwrap();
topic.update(vec![serde_json::json!("world")]).unwrap();
let values: Vec<String> = serde_json::from_value(topic.get().unwrap().unwrap()).unwrap();
assert_eq!(values.len(), 2);
}
}