use crate::Signature;
use serde::{Deserialize, Serialize};
use uuid::Uuid;
/// Configuration for a map/reduce job: a `map_task` and a `reduce_task`
/// signature plus chunking, buffering and backpressure settings.
///
/// This type only stores the settings; nothing visible here executes the
/// tasks or interprets the numeric values.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct StreamingMapReduce {
/// Signature of the map task.
pub map_task: Signature,
/// Signature of the reduce task.
pub reduce_task: Signature,
/// Chunk size setting; `new` initializes it to 100.
/// NOTE(review): presumably the number of items per chunk — confirm with the consumer.
pub chunk_size: usize,
/// Buffer size setting; `new` initializes it to 1000.
/// NOTE(review): unit (items vs. bytes) is not established here — confirm.
pub buffer_size: usize,
/// Backpressure flag; `new` initializes it to `true`.
pub backpressure: bool,
}
impl StreamingMapReduce {
    /// Builds a configuration from the two task signatures using the default
    /// settings: `chunk_size` 100, `buffer_size` 1000, backpressure enabled.
    pub fn new(map_task: Signature, reduce_task: Signature) -> Self {
        let (chunk_size, buffer_size, backpressure) = (100, 1000, true);
        Self {
            map_task,
            reduce_task,
            chunk_size,
            buffer_size,
            backpressure,
        }
    }

    /// Consumes the configuration and returns it with `chunk_size` set to `size`.
    pub fn with_chunk_size(self, size: usize) -> Self {
        Self {
            chunk_size: size,
            ..self
        }
    }

    /// Consumes the configuration and returns it with `buffer_size` set to `size`.
    pub fn with_buffer_size(self, size: usize) -> Self {
        Self {
            buffer_size: size,
            ..self
        }
    }

    /// Consumes the configuration and returns it with `backpressure` set to `enabled`.
    pub fn with_backpressure(self, enabled: bool) -> Self {
        Self {
            backpressure: enabled,
            ..self
        }
    }
}
impl std::fmt::Display for StreamingMapReduce {
    /// Writes a one-line summary containing the `task` field of both
    /// signatures followed by the chunk and buffer sizes. The backpressure
    /// flag is not part of the output.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let Self {
            map_task,
            reduce_task,
            chunk_size,
            buffer_size,
            ..
        } = self;
        write!(
            f,
            "StreamingMapReduce[map={}, reduce={}, chunk_size={}, buffer_size={}]",
            map_task.task, reduce_task.task, chunk_size, buffer_size
        )
    }
}
/// A value bundled with a list of subscriber ids and a log of the values it
/// held previously.
///
/// `subscribers` carries `#[serde(skip)]`, so serde leaves it out when
/// serializing and does not read it when deserializing.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct Observable<T> {
/// The current value.
pub value: T,
/// Ids registered through `subscribe`; skipped by serde.
#[serde(skip)]
pub subscribers: Vec<Uuid>,
/// Earlier values, each paired with the Unix time in whole seconds at
/// which `set` replaced it. Oldest entries come first.
pub history: Vec<(T, u64)>, }
impl<T: Clone> Observable<T> {
    /// Creates an observable holding `value`, with no subscribers and an
    /// empty history.
    pub fn new(value: T) -> Self {
        Self {
            value,
            subscribers: Vec::new(),
            history: Vec::new(),
        }
    }

    /// Registers `workflow_id` as a subscriber. Does nothing when the id is
    /// already registered, so each id appears at most once.
    pub fn subscribe(&mut self, workflow_id: Uuid) {
        if !self.subscribers.contains(&workflow_id) {
            self.subscribers.push(workflow_id);
        }
    }

    /// Removes `workflow_id` from the subscriber list; a no-op when the id
    /// is not registered.
    pub fn unsubscribe(&mut self, workflow_id: &Uuid) {
        self.subscribers.retain(|id| id != workflow_id);
    }

    /// Replaces the current value with `value` and appends the previous
    /// value to `history`, stamped with the current Unix time in whole
    /// seconds.
    ///
    /// If the system clock reports a time before the Unix epoch the entry is
    /// stamped with `0` instead of panicking.
    pub fn set(&mut self, value: T) {
        // `duration_since` returns `Err` when the clock is earlier than the
        // epoch (e.g. a misconfigured host); a value update must not panic
        // because of that, so fall back to 0.
        let timestamp = std::time::SystemTime::now()
            .duration_since(std::time::UNIX_EPOCH)
            .map_or(0, |elapsed| elapsed.as_secs());
        // Move the old value into the history rather than cloning it and
        // then dropping the original.
        let previous = std::mem::replace(&mut self.value, value);
        self.history.push((previous, timestamp));
    }

    /// Returns a reference to the current value.
    pub fn get(&self) -> &T {
        &self.value
    }

    /// Returns the number of registered subscribers.
    pub fn subscriber_count(&self) -> usize {
        self.subscribers.len()
    }
}
/// Description of a workflow that watches a set of observables and carries a
/// reaction task plus optional timing and filter settings.
///
/// This type only stores the description; how the settings are applied is
/// not visible here.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ReactiveWorkflow {
/// Identifier of this workflow; `new` assigns a fresh v4 UUID.
pub workflow_id: Uuid,
/// Identifiers of the observables being watched, in the order they were
/// added through `watch`.
pub watched_observables: Vec<String>,
/// Signature of the reaction task.
/// NOTE(review): presumably run when a watched observable changes — confirm.
pub reaction_task: Signature,
/// Optional debounce interval in milliseconds; `None` unless set.
pub debounce_ms: Option<u64>,
/// Optional throttle interval in milliseconds; `None` unless set.
pub throttle_ms: Option<u64>,
/// Optional filter condition stored as a string; its syntax is not
/// defined here.
pub filter: Option<String>,
}
impl ReactiveWorkflow {
    /// Creates a workflow around `reaction_task` with a freshly generated
    /// v4 UUID, no watched observables, and no debounce, throttle or filter.
    pub fn new(reaction_task: Signature) -> Self {
        let workflow_id = Uuid::new_v4();
        Self {
            workflow_id,
            watched_observables: Vec::new(),
            reaction_task,
            debounce_ms: None,
            throttle_ms: None,
            filter: None,
        }
    }

    /// Appends `observable_id` to the watched list and returns the workflow.
    /// Duplicate ids are not filtered out.
    pub fn watch(mut self, observable_id: impl Into<String>) -> Self {
        let id: String = observable_id.into();
        self.watched_observables.push(id);
        self
    }

    /// Returns the workflow with `debounce_ms` set to `Some(milliseconds)`.
    pub fn with_debounce(self, milliseconds: u64) -> Self {
        Self {
            debounce_ms: Some(milliseconds),
            ..self
        }
    }

    /// Returns the workflow with `throttle_ms` set to `Some(milliseconds)`.
    pub fn with_throttle(self, milliseconds: u64) -> Self {
        Self {
            throttle_ms: Some(milliseconds),
            ..self
        }
    }

    /// Returns the workflow with `filter` set to the given condition string,
    /// replacing any previous one.
    pub fn with_filter(self, condition: impl Into<String>) -> Self {
        Self {
            filter: Some(condition.into()),
            ..self
        }
    }
}
impl std::fmt::Display for ReactiveWorkflow {
    /// Writes a one-line summary: the workflow id, how many observables are
    /// watched, and the `task` field of the reaction signature.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let Self {
            workflow_id,
            watched_observables,
            reaction_task,
            ..
        } = self;
        let watched_count = watched_observables.len();
        write!(
            f,
            "ReactiveWorkflow[id={}, watching={}, reaction={}]",
            workflow_id, watched_count, reaction_task.task
        )
    }
}
/// Kinds of operator that can appear in a `ReactiveStream`'s operator list.
///
/// Each entry in that list pairs a variant with a `serde_json::Value`
/// argument; how an operator is evaluated is not visible here.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
pub enum StreamOperator {
/// Paired with a caller-supplied transform value by `ReactiveStream::map`.
Map,
/// Paired with a caller-supplied condition value by `ReactiveStream::filter`.
Filter,
/// NOTE(review): no builder method for this variant is visible here.
Reduce,
/// NOTE(review): no builder method for this variant is visible here.
Scan,
/// Paired with an item count by `ReactiveStream::take`.
Take,
/// Paired with an item count by `ReactiveStream::skip`.
Skip,
/// Paired with a millisecond value by `ReactiveStream::debounce`.
Debounce,
/// Paired with a millisecond value by `ReactiveStream::throttle`.
Throttle,
}
impl std::fmt::Display for StreamOperator {
    /// Writes the variant's name exactly as it is spelled in the enum.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        // Exhaustive on purpose: adding a variant must fail to compile here.
        let label = match *self {
            Self::Map => "Map",
            Self::Filter => "Filter",
            Self::Reduce => "Reduce",
            Self::Scan => "Scan",
            Self::Take => "Take",
            Self::Skip => "Skip",
            Self::Debounce => "Debounce",
            Self::Throttle => "Throttle",
        };
        f.write_str(label)
    }
}
/// Description of a stream: a source identifier, an ordered list of
/// operators with their arguments, and a list of subscriber ids.
///
/// `subscribers` carries `#[serde(skip)]`, so serde leaves it out when
/// serializing and does not read it when deserializing.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ReactiveStream {
/// Identifier of this stream; `new` assigns a fresh v4 UUID.
pub stream_id: Uuid,
/// Identifier of the stream's source, as supplied to `new`.
pub source_id: String,
/// Operators in the order they were added, each with its JSON argument.
pub operators: Vec<(StreamOperator, serde_json::Value)>,
/// Ids registered through `subscribe`; skipped by serde.
#[serde(skip)]
pub subscribers: Vec<Uuid>,
}
impl ReactiveStream {
pub fn new(source_id: impl Into<String>) -> Self {
Self {
stream_id: Uuid::new_v4(),
source_id: source_id.into(),
operators: Vec::new(),
subscribers: Vec::new(),
}
}
pub fn map(mut self, transform: serde_json::Value) -> Self {
self.operators.push((StreamOperator::Map, transform));
self
}
pub fn filter(mut self, condition: serde_json::Value) -> Self {
self.operators.push((StreamOperator::Filter, condition));
self
}
pub fn take(mut self, count: usize) -> Self {
self.operators
.push((StreamOperator::Take, serde_json::json!(count)));
self
}
pub fn skip(mut self, count: usize) -> Self {
self.operators
.push((StreamOperator::Skip, serde_json::json!(count)));
self
}
pub fn debounce(mut self, milliseconds: u64) -> Self {
self.operators
.push((StreamOperator::Debounce, serde_json::json!(milliseconds)));
self
}
pub fn throttle(mut self, milliseconds: u64) -> Self {
self.operators
.push((StreamOperator::Throttle, serde_json::json!(milliseconds)));
self
}
pub fn subscribe(&mut self, workflow_id: Uuid) {
if !self.subscribers.contains(&workflow_id) {
self.subscribers.push(workflow_id);
}
}
}
impl std::fmt::Display for ReactiveStream {
    /// Writes a one-line summary: the stream id, the source id, and the
    /// number of operators. Subscribers are not part of the output.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let Self {
            stream_id,
            source_id,
            operators,
            ..
        } = self;
        let operator_count = operators.len();
        write!(
            f,
            "ReactiveStream[id={}, source={}, operators={}]",
            stream_id, source_id, operator_count
        )
    }
}