use serde::{Serialize, de::DeserializeOwned};
use std::any::Any;
use crate::error::PeError;
/// Object-safe interface for a piece of state that absorbs type-erased
/// updates and can be serialized into a byte snapshot.
///
/// Implementors must be `Send + Sync` and `'static` (via [`Any`]) so they can
/// be stored and shared as `Box<dyn Channel>`.
pub trait Channel: Any + Send + Sync {
    /// Folds `update` into the channel's current state.
    ///
    /// The payload is type-erased; each implementation decides which concrete
    /// types it accepts. The implementations in this file downcast the payload
    /// and silently drop it when the type does not match.
    fn merge(&mut self, update: Box<dyn Any + Send>);

    /// Resets whatever the implementation considers transient.
    ///
    /// In this file, channels that report `is_ephemeral() == true` discard
    /// their contents here, while the others leave their state untouched.
    fn clear(&mut self);

    /// Reports whether the channel's contents are transient.
    ///
    /// Defaults to `false`; implementations override it to opt in.
    fn is_ephemeral(&self) -> bool {
        false
    }

    /// Serializes the current contents into a byte buffer.
    ///
    /// # Errors
    ///
    /// Returns a [`PeError`] when the contents cannot be serialized.
    fn checkpoint(&self) -> Result<Vec<u8>, PeError>;

    /// Returns a short static label identifying the channel kind.
    fn type_name(&self) -> &'static str;

    /// Returns an independent copy of this channel as a boxed trait object.
    fn clone_box(&self) -> Box<dyn Channel>;

    /// Exposes the channel as `&dyn Any` so callers can downcast to the
    /// concrete type.
    fn as_any(&self) -> &dyn Any;

    /// Mutable counterpart of [`Channel::as_any`].
    fn as_any_mut(&mut self) -> &mut dyn Any;
}
/// Channel state that holds exactly one value of type `T`.
///
/// The field is private; read it through [`LastValue::get`].
#[derive(Debug, Clone)]
pub struct LastValue<T>
where
    T: Clone + Send + Sync + Serialize + DeserializeOwned + 'static,
{
    /// The value currently held by the channel.
    value: T,
}
impl<T: Clone + Send + Sync + Serialize + DeserializeOwned + 'static> LastValue<T> {
pub fn new(initial: T) -> Self {
Self { value: initial }
}
pub fn get(&self) -> &T {
&self.value
}
}
impl<T> Channel for LastValue<T>
where
    T: Clone + Send + Sync + Serialize + DeserializeOwned + 'static,
{
    /// Overwrites the stored value when `update` carries a `T`.
    ///
    /// A payload of any other type is dropped without changing the state.
    fn merge(&mut self, update: Box<dyn Any + Send>) {
        match update.downcast::<T>() {
            Ok(replacement) => self.value = *replacement,
            // Not a `T`: the payload is discarded and the old value is kept.
            Err(_) => {}
        }
    }

    /// Returns a boxed clone of this channel.
    fn clone_box(&self) -> Box<dyn Channel> {
        let copy = self.clone();
        Box::new(copy)
    }

    /// Does nothing: the stored value is left in place.
    fn clear(&mut self) {}

    /// Serializes the stored value with `bincode`.
    ///
    /// # Errors
    ///
    /// A serialization failure is reported as `PeError::Storage`, with the
    /// underlying error rendered into `details`.
    fn checkpoint(&self) -> Result<Vec<u8>, PeError> {
        match bincode::serialize(&self.value) {
            Ok(bytes) => Ok(bytes),
            Err(e) => Err(PeError::Storage {
                details: format!("LastValue checkpoint failed: {e}"),
            }),
        }
    }

    /// Always `"LastValue"`.
    fn type_name(&self) -> &'static str {
        "LastValue"
    }

    /// Exposes `self` as `&dyn Any` for downcasting.
    fn as_any(&self) -> &dyn Any {
        self
    }

    /// Exposes `self` as `&mut dyn Any` for downcasting.
    fn as_any_mut(&mut self) -> &mut dyn Any {
        self
    }
}
/// Channel state that collects values of type `T` in a growing list.
///
/// The field is private; read it through [`Appender::get`].
#[derive(Debug, Clone)]
pub struct Appender<T>
where
    T: Clone + Send + Sync + Serialize + DeserializeOwned + 'static,
{
    /// Collected items, in the order they were appended.
    values: Vec<T>,
}
impl<T: Clone + Send + Sync + Serialize + DeserializeOwned + 'static> Default for Appender<T> {
fn default() -> Self {
Self { values: vec![] }
}
}
impl<T: Clone + Send + Sync + Serialize + DeserializeOwned + 'static> Appender<T> {
pub fn new() -> Self {
Self::default()
}
pub fn with_initial(values: Vec<T>) -> Self {
Self { values }
}
pub fn get(&self) -> &[T] {
&self.values
}
}
impl<T> Channel for Appender<T>
where
    T: Clone + Send + Sync + Serialize + DeserializeOwned + 'static,
{
    /// Appends the payload to the end of the collected items.
    ///
    /// The payload is tried as a `Vec<T>` first (all elements are appended),
    /// then as a single `T` (one element is appended). Anything else is
    /// dropped without changing the state.
    fn merge(&mut self, update: Box<dyn Any + Send>) {
        // A failed downcast hands the original box back, so it can be retried
        // as a different type.
        let remainder = match update.downcast::<Vec<T>>() {
            Ok(batch) => {
                self.values.extend(*batch);
                return;
            }
            Err(not_a_batch) => not_a_batch,
        };

        if let Ok(single) = remainder.downcast::<T>() {
            self.values.push(*single);
        }
    }

    /// Returns a boxed clone of this channel.
    fn clone_box(&self) -> Box<dyn Channel> {
        let copy = self.clone();
        Box::new(copy)
    }

    /// Does nothing: collected items are left in place.
    fn clear(&mut self) {}

    /// Serializes the collected items with `bincode`.
    ///
    /// # Errors
    ///
    /// A serialization failure is reported as `PeError::Storage`, with the
    /// underlying error rendered into `details`.
    fn checkpoint(&self) -> Result<Vec<u8>, PeError> {
        match bincode::serialize(&self.values) {
            Ok(bytes) => Ok(bytes),
            Err(e) => Err(PeError::Storage {
                details: format!("Appender checkpoint failed: {e}"),
            }),
        }
    }

    /// Always `"Appender"`.
    fn type_name(&self) -> &'static str {
        "Appender"
    }

    /// Exposes `self` as `&dyn Any` for downcasting.
    fn as_any(&self) -> &dyn Any {
        self
    }

    /// Exposes `self` as `&mut dyn Any` for downcasting.
    fn as_any_mut(&mut self) -> &mut dyn Any {
        self
    }
}
/// Channel state that holds at most one value of type `T`.
///
/// The field is private; read it through [`EphemeralValue::get`].
#[derive(Debug, Clone)]
pub struct EphemeralValue<T>
where
    T: Clone + Send + Sync + Serialize + DeserializeOwned + 'static,
{
    /// The held value, or `None` when the channel is empty.
    value: Option<T>,
}
impl<T: Clone + Send + Sync + Serialize + DeserializeOwned + 'static> Default
for EphemeralValue<T>
{
fn default() -> Self {
Self { value: None }
}
}
impl<T> EphemeralValue<T>
where
    T: Clone + Send + Sync + Serialize + DeserializeOwned + 'static,
{
    /// Creates an empty channel; equivalent to `EphemeralValue::default()`.
    pub fn new() -> Self {
        <Self as Default>::default()
    }

    /// Borrows the held value, or returns `None` when the channel is empty.
    pub fn get(&self) -> Option<&T> {
        self.value.as_ref()
    }
}
impl<T> Channel for EphemeralValue<T>
where
    T: Clone + Send + Sync + Serialize + DeserializeOwned + 'static,
{
    /// Stores the payload when `update` carries a `T`, replacing any value
    /// already held.
    ///
    /// A payload of any other type is dropped without changing the state.
    fn merge(&mut self, update: Box<dyn Any + Send>) {
        match update.downcast::<T>() {
            Ok(incoming) => self.value = Some(*incoming),
            // Not a `T`: the payload is discarded and the state is kept.
            Err(_) => {}
        }
    }

    /// Returns a boxed clone of this channel.
    fn clone_box(&self) -> Box<dyn Channel> {
        let copy = self.clone();
        Box::new(copy)
    }

    /// Drops the held value, leaving the channel empty.
    fn clear(&mut self) {
        self.value = None;
    }

    /// Always `true`.
    fn is_ephemeral(&self) -> bool {
        true
    }

    /// Serializes the `Option<T>` (including the empty case) with `bincode`.
    ///
    /// # Errors
    ///
    /// A serialization failure is reported as `PeError::Storage`, with the
    /// underlying error rendered into `details`.
    fn checkpoint(&self) -> Result<Vec<u8>, PeError> {
        match bincode::serialize(&self.value) {
            Ok(bytes) => Ok(bytes),
            Err(e) => Err(PeError::Storage {
                details: format!("EphemeralValue checkpoint failed: {e}"),
            }),
        }
    }

    /// Always `"EphemeralValue"`.
    fn type_name(&self) -> &'static str {
        "EphemeralValue"
    }

    /// Exposes `self` as `&dyn Any` for downcasting.
    fn as_any(&self) -> &dyn Any {
        self
    }

    /// Exposes `self` as `&mut dyn Any` for downcasting.
    fn as_any_mut(&mut self) -> &mut dyn Any {
        self
    }
}
/// Channel state that collects values of type `T` until it is cleared.
///
/// The field is private; read it through [`Topic::get`].
#[derive(Debug, Clone)]
pub struct Topic<T>
where
    T: Clone + Send + Sync + Serialize + DeserializeOwned + 'static,
{
    /// Collected items, in the order they were appended.
    values: Vec<T>,
}
impl<T: Clone + Send + Sync + Serialize + DeserializeOwned + 'static> Default for Topic<T> {
fn default() -> Self {
Self { values: vec![] }
}
}
impl<T> Topic<T>
where
    T: Clone + Send + Sync + Serialize + DeserializeOwned + 'static,
{
    /// Creates an empty topic; equivalent to `Topic::default()`.
    pub fn new() -> Self {
        <Self as Default>::default()
    }

    /// Borrows the collected items as a slice.
    pub fn get(&self) -> &[T] {
        self.values.as_slice()
    }
}
impl<T> Channel for Topic<T>
where
    T: Clone + Send + Sync + Serialize + DeserializeOwned + 'static,
{
    /// Appends the payload to the end of the collected items.
    ///
    /// The payload is tried as a `Vec<T>` first (all elements are appended),
    /// then as a single `T` (one element is appended). Anything else is
    /// dropped without changing the state.
    fn merge(&mut self, update: Box<dyn Any + Send>) {
        // A failed downcast hands the original box back, so it can be retried
        // as a different type.
        let remainder = match update.downcast::<Vec<T>>() {
            Ok(batch) => {
                self.values.extend(*batch);
                return;
            }
            Err(not_a_batch) => not_a_batch,
        };

        if let Ok(single) = remainder.downcast::<T>() {
            self.values.push(*single);
        }
    }

    /// Returns a boxed clone of this channel.
    fn clone_box(&self) -> Box<dyn Channel> {
        let copy = self.clone();
        Box::new(copy)
    }

    /// Removes every collected item.
    fn clear(&mut self) {
        self.values.clear();
    }

    /// Always `true`.
    fn is_ephemeral(&self) -> bool {
        true
    }

    /// Serializes the collected items with `bincode`.
    ///
    /// # Errors
    ///
    /// A serialization failure is reported as `PeError::Storage`, with the
    /// underlying error rendered into `details`.
    fn checkpoint(&self) -> Result<Vec<u8>, PeError> {
        match bincode::serialize(&self.values) {
            Ok(bytes) => Ok(bytes),
            Err(e) => Err(PeError::Storage {
                details: format!("Topic checkpoint failed: {e}"),
            }),
        }
    }

    /// Always `"Topic"`.
    fn type_name(&self) -> &'static str {
        "Topic"
    }

    /// Exposes `self` as `&dyn Any` for downcasting.
    fn as_any(&self) -> &dyn Any {
        self
    }

    /// Exposes `self` as `&mut dyn Any` for downcasting.
    fn as_any_mut(&mut self) -> &mut dyn Any {
        self
    }
}