use std::collections::{HashMap, VecDeque};
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use serde::{Deserialize, Serialize};
use tokio::sync::{mpsc, watch, RwLock};
use super::types::{GraphResult, InputCache, SourceName};
type SeqQueue = Arc<RwLock<VecDeque<(SourceName, Vec<u8>)>>>;
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "snake_case", tag = "state")]
pub enum ReactorHealth {
Starting,
Warming {
healthy: Vec<String>,
waiting: Vec<String>,
},
Live,
Degraded { disconnected: Vec<String> },
}
impl std::fmt::Display for ReactorHealth {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
Self::Starting => write!(f, "starting"),
Self::Warming { .. } => write!(f, "warming"),
Self::Live => write!(f, "live"),
Self::Degraded { .. } => write!(f, "degraded"),
}
}
}
pub fn reactor_health_channel() -> (watch::Sender<ReactorHealth>, watch::Receiver<ReactorHealth>) {
watch::channel(ReactorHealth::Starting)
}
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum ReactionCriteria {
WhenAny,
WhenAll,
}
impl From<cloacina_computation_graph::ReactionMode> for ReactionCriteria {
fn from(mode: cloacina_computation_graph::ReactionMode) -> Self {
match mode {
cloacina_computation_graph::ReactionMode::WhenAny => ReactionCriteria::WhenAny,
cloacina_computation_graph::ReactionMode::WhenAll => ReactionCriteria::WhenAll,
}
}
}
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum InputStrategy {
Latest,
Sequential,
}
#[derive(Debug, Clone)]
pub struct DirtyFlags {
pub(crate) flags: HashMap<SourceName, bool>,
}
impl DirtyFlags {
pub fn new() -> Self {
Self {
flags: HashMap::new(),
}
}
pub fn with_sources(sources: &[SourceName]) -> Self {
let mut flags = HashMap::new();
for source in sources {
flags.insert(source.clone(), false);
}
Self { flags }
}
pub fn set(&mut self, source: SourceName, dirty: bool) {
self.flags.insert(source, dirty);
}
pub fn any_set(&self) -> bool {
self.flags.values().any(|&v| v)
}
pub fn all_set(&self) -> bool {
!self.flags.is_empty() && self.flags.values().all(|&v| v)
}
pub fn clear_all(&mut self) {
for v in self.flags.values_mut() {
*v = false;
}
}
}
impl Default for DirtyFlags {
fn default() -> Self {
Self::new()
}
}
#[derive(Debug)]
pub enum StrategySignal {
BoundaryReceived,
ForceFire,
}
#[derive(Debug)]
pub enum ManualCommand {
ForceFire,
FireWith(InputCache),
}
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(tag = "command", rename_all = "snake_case")]
pub enum ReactorCommand {
ForceFire,
FireWith { cache: HashMap<String, Vec<u8>> },
GetState,
Pause,
Resume,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(tag = "type", rename_all = "snake_case")]
pub enum ReactorResponse {
Fired,
State { cache: HashMap<String, String> },
Paused,
Resumed,
Error { message: String },
}
#[derive(Clone)]
pub struct ReactorHandle {
pub cache: Arc<RwLock<InputCache>>,
pub paused: Arc<AtomicBool>,
}
impl ReactorHandle {
pub async fn get_state(&self) -> HashMap<String, String> {
let cache = self.cache.read().await;
cache.entries_as_json()
}
pub fn is_paused(&self) -> bool {
self.paused.load(Ordering::SeqCst)
}
pub fn pause(&self) {
self.paused.store(true, Ordering::SeqCst);
}
pub fn resume(&self) {
self.paused.store(false, Ordering::SeqCst);
}
}
pub use cloacina_computation_graph::CompiledGraphFn;
pub struct Reactor {
graph: CompiledGraphFn,
criteria: ReactionCriteria,
input_strategy: InputStrategy,
accumulator_rx: mpsc::Receiver<(SourceName, Vec<u8>)>,
manual_rx: mpsc::Receiver<ManualCommand>,
shutdown: watch::Receiver<bool>,
cache: Arc<RwLock<InputCache>>,
paused: Arc<AtomicBool>,
expected_sources: Vec<SourceName>,
graph_name: String,
dal: Option<crate::dal::unified::DAL>,
health: Option<watch::Sender<ReactorHealth>>,
accumulator_health_rxs: Vec<(
String,
watch::Receiver<super::accumulator::AccumulatorHealth>,
)>,
batch_flush_senders: Vec<mpsc::Sender<()>>,
}
impl Reactor {
pub fn new(
graph: CompiledGraphFn,
criteria: ReactionCriteria,
input_strategy: InputStrategy,
accumulator_rx: mpsc::Receiver<(SourceName, Vec<u8>)>,
manual_rx: mpsc::Receiver<ManualCommand>,
shutdown: watch::Receiver<bool>,
) -> Self {
Self {
graph,
criteria,
input_strategy,
accumulator_rx,
manual_rx,
shutdown,
cache: Arc::new(RwLock::new(InputCache::new())),
paused: Arc::new(AtomicBool::new(false)),
expected_sources: Vec::new(),
graph_name: String::new(),
dal: None,
health: None,
accumulator_health_rxs: Vec::new(),
batch_flush_senders: Vec::new(),
}
}
pub fn with_batch_flush_senders(mut self, senders: Vec<mpsc::Sender<()>>) -> Self {
self.batch_flush_senders = senders;
self
}
pub fn with_graph_name(mut self, name: String) -> Self {
self.graph_name = name;
self
}
pub fn with_dal(mut self, dal: crate::dal::unified::DAL) -> Self {
self.dal = Some(dal);
self
}
pub fn with_health(mut self, health: watch::Sender<ReactorHealth>) -> Self {
self.health = Some(health);
self
}
pub fn with_expected_sources(mut self, sources: Vec<SourceName>) -> Self {
self.expected_sources = sources;
self
}
pub fn with_accumulator_health(
mut self,
rxs: Vec<(
String,
watch::Receiver<super::accumulator::AccumulatorHealth>,
)>,
) -> Self {
self.accumulator_health_rxs = rxs;
self
}
pub fn handle(&self) -> ReactorHandle {
ReactorHandle {
cache: self.cache.clone(),
paused: self.paused.clone(),
}
}
pub async fn run(mut self) {
if let Some(ref health) = self.health {
let _ = health.send(ReactorHealth::Starting);
}
let cache = self.cache.clone();
if let Some(ref dal) = self.dal {
if !self.graph_name.is_empty() {
match dal.checkpoint().load_reactor_state(&self.graph_name).await {
Ok(Some((cache_data, _dirty_data, _seq_queue))) => {
if let Ok(entries) =
serde_json::from_slice::<HashMap<SourceName, Vec<u8>>>(&cache_data)
{
let mut c = cache.write().await;
for (source, bytes) in entries {
c.update(source, bytes);
}
tracing::info!(graph = %self.graph_name, "reactor cache restored from DAL");
}
}
Ok(None) => {
tracing::debug!(graph = %self.graph_name, "no persisted reactor state found");
}
Err(e) => {
tracing::warn!(graph = %self.graph_name, "failed to load reactor state: {}", e);
}
}
}
}
let dirty = if self.expected_sources.is_empty() {
Arc::new(RwLock::new(DirtyFlags::new()))
} else {
Arc::new(RwLock::new(DirtyFlags::with_sources(
&self.expected_sources,
)))
};
if !self.accumulator_health_rxs.is_empty() {
use super::accumulator::AccumulatorHealth;
let all_names: Vec<String> = self
.accumulator_health_rxs
.iter()
.map(|(n, _)| n.clone())
.collect();
let mut healthy_set: std::collections::HashSet<String> =
std::collections::HashSet::new();
if let Some(ref health) = self.health {
let _ = health.send(ReactorHealth::Warming {
healthy: vec![],
waiting: all_names.clone(),
});
}
let mut shutdown_gate = self.shutdown.clone();
'gating: loop {
for (name, rx) in &self.accumulator_health_rxs {
let h = rx.borrow().clone();
match h {
AccumulatorHealth::Live | AccumulatorHealth::SocketOnly => {
healthy_set.insert(name.clone());
}
_ => {}
}
}
if healthy_set.len() >= all_names.len() {
break 'gating;
}
tokio::select! {
_ = shutdown_gate.changed() => {
tracing::debug!(graph = %self.graph_name, "shutdown during startup gating");
return;
}
_ = tokio::time::sleep(std::time::Duration::from_millis(100)) => {
}
}
if let Some(ref health) = self.health {
let waiting: Vec<String> = all_names
.iter()
.filter(|n| !healthy_set.contains(*n))
.cloned()
.collect();
let _ = health.send(ReactorHealth::Warming {
healthy: healthy_set.iter().cloned().collect(),
waiting,
});
}
}
}
if let Some(ref health) = self.health {
let _ = health.send(ReactorHealth::Live);
}
let _degraded_monitor = if let Some(ref health) = self.health {
let health_tx = health.clone();
let acc_rxs = std::mem::take(&mut self.accumulator_health_rxs);
let mut shutdown_mon = self.shutdown.clone();
let graph_name = self.graph_name.clone();
Some(tokio::spawn(async move {
use super::accumulator::AccumulatorHealth;
if acc_rxs.is_empty() {
let _ = shutdown_mon.changed().await;
return;
}
loop {
tokio::select! {
_ = shutdown_mon.changed() => break,
_ = tokio::time::sleep(std::time::Duration::from_secs(1)) => {
let disconnected: Vec<String> = acc_rxs
.iter()
.filter(|(_, rx)| {
matches!(*rx.borrow(), AccumulatorHealth::Disconnected)
})
.map(|(name, _)| name.clone())
.collect();
if disconnected.is_empty() {
if matches!(&*health_tx.borrow(), ReactorHealth::Degraded { .. }) {
tracing::info!(graph = %graph_name, "all accumulators recovered — back to Live");
let _ = health_tx.send(ReactorHealth::Live);
}
} else {
if !matches!(&*health_tx.borrow(), ReactorHealth::Degraded { .. }) {
tracing::warn!(graph = %graph_name, ?disconnected, "accumulator(s) disconnected — entering Degraded mode");
}
let _ = health_tx.send(ReactorHealth::Degraded { disconnected });
}
}
}
}
}))
} else {
None
};
let paused = self.paused.clone();
let input_strategy = self.input_strategy.clone();
let seq_queue: SeqQueue = Arc::new(RwLock::new(VecDeque::new()));
let (strategy_tx, mut strategy_rx) = mpsc::channel::<StrategySignal>(64);
let cache_recv = cache.clone();
let dirty_recv = dirty.clone();
let seq_queue_recv = seq_queue.clone();
let input_strategy_recv = input_strategy.clone();
let mut shutdown_recv = self.shutdown.clone();
let mut accumulator_rx = self.accumulator_rx;
let mut manual_rx = self.manual_rx;
let strategy_tx_recv = strategy_tx.clone();
let receiver_handle = tokio::spawn(async move {
loop {
tokio::select! {
Some((source, bytes)) = accumulator_rx.recv() => {
match input_strategy_recv {
InputStrategy::Latest => {
cache_recv.write().await.update(source.clone(), bytes);
dirty_recv.write().await.set(source, true);
}
InputStrategy::Sequential => {
seq_queue_recv.write().await.push_back((source, bytes));
}
}
let _ = strategy_tx_recv.send(StrategySignal::BoundaryReceived).await;
}
Some(cmd) = manual_rx.recv() => {
match cmd {
ManualCommand::ForceFire => {
let _ = strategy_tx_recv.send(StrategySignal::ForceFire).await;
}
ManualCommand::FireWith(new_cache) => {
cache_recv.write().await.replace_all(new_cache);
let _ = strategy_tx_recv.send(StrategySignal::ForceFire).await;
}
}
}
_ = shutdown_recv.changed() => {
tracing::debug!("reactor receiver shutting down");
break;
}
}
}
});
let cache_exec = cache.clone();
let dirty_exec = dirty.clone();
let seq_queue_exec = seq_queue.clone();
let mut shutdown_exec = self.shutdown.clone();
let graph = self.graph.clone();
let criteria = self.criteria.clone();
let dal_exec = self.dal.clone();
let graph_name_exec = self.graph_name.clone();
let batch_flush = self.batch_flush_senders.clone();
let fire_counter = Arc::new(std::sync::atomic::AtomicU64::new(0));
loop {
tokio::select! {
Some(signal) = strategy_rx.recv() => {
match input_strategy {
InputStrategy::Latest => {
let should_run = match signal {
StrategySignal::BoundaryReceived => {
let d = dirty_exec.read().await;
match &criteria {
ReactionCriteria::WhenAny => d.any_set(),
ReactionCriteria::WhenAll => d.all_set(),
}
}
StrategySignal::ForceFire => true,
};
if should_run && !paused.load(Ordering::SeqCst) {
let snapshot = cache_exec.read().await.snapshot();
dirty_exec.write().await.clear_all();
let result = (graph)(snapshot).await;
match &result {
GraphResult::Completed { .. } => {
let fires = fire_counter.fetch_add(1, std::sync::atomic::Ordering::Relaxed) + 1;
tracing::info!(graph = %graph_name_exec, fires, "graph execution completed");
persist_reactor_state(
&dal_exec, &graph_name_exec, &cache_exec, &dirty_exec, None,
).await;
for sender in &batch_flush {
let _ = sender.try_send(());
}
}
GraphResult::Error(e) => {
tracing::error!(graph = %graph_name_exec, "graph execution failed: {}", e);
}
}
}
}
InputStrategy::Sequential => {
if paused.load(Ordering::SeqCst) {
continue;
}
persist_reactor_state(
&dal_exec, &graph_name_exec, &cache_exec, &dirty_exec,
Some(&seq_queue_exec),
).await;
loop {
let item = seq_queue_exec.write().await.pop_front();
match item {
Some((source, bytes)) => {
cache_exec.write().await.update(source, bytes);
let snapshot = cache_exec.read().await.snapshot();
let result = (graph)(snapshot).await;
match &result {
GraphResult::Completed { .. } => {
let fires = fire_counter.fetch_add(1, std::sync::atomic::Ordering::Relaxed) + 1;
tracing::info!(graph = %graph_name_exec, fires, "graph execution completed");
persist_reactor_state(
&dal_exec, &graph_name_exec, &cache_exec,
&dirty_exec, Some(&seq_queue_exec),
).await;
for sender in &batch_flush {
let _ = sender.try_send(());
}
}
GraphResult::Error(e) => {
tracing::error!("graph execution failed: {}", e);
}
}
}
None => break,
}
}
}
}
}
_ = shutdown_exec.changed() => {
tracing::debug!("reactor executor shutting down");
persist_reactor_state(
&dal_exec, &graph_name_exec, &cache_exec, &dirty_exec,
Some(&seq_queue_exec),
).await;
break;
}
}
}
let _ = receiver_handle.await;
}
}
async fn persist_reactor_state(
dal: &Option<crate::dal::unified::DAL>,
graph_name: &str,
cache: &Arc<RwLock<InputCache>>,
dirty: &Arc<RwLock<DirtyFlags>>,
seq_queue: Option<&SeqQueue>,
) {
let dal = match dal {
Some(d) if !graph_name.is_empty() => d,
_ => return,
};
let cache_snapshot = cache.read().await;
let dirty_snapshot = dirty.read().await;
let cache_bytes = match serde_json::to_vec(&cache_snapshot.entries_raw()) {
Ok(b) => b,
Err(e) => {
tracing::warn!(graph = %graph_name, "cache serialization failed: {}", e);
return;
}
};
let dirty_bytes = match serde_json::to_vec(&dirty_snapshot.flags) {
Ok(b) => b,
Err(e) => {
tracing::warn!(graph = %graph_name, "dirty flags serialization failed: {}", e);
return;
}
};
let seq_bytes = if let Some(q) = seq_queue {
let queue = q.read().await;
if queue.is_empty() {
None
} else {
match serde_json::to_vec(&*queue) {
Ok(b) => Some(b),
Err(e) => {
tracing::warn!(graph = %graph_name, "sequential queue serialization failed: {}", e);
None
}
}
}
} else {
None
};
if let Err(e) = dal
.checkpoint()
.save_reactor_state(graph_name, cache_bytes, dirty_bytes, seq_bytes)
.await
{
tracing::warn!(graph = %graph_name, "reactor state persistence failed: {}", e);
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_dirty_flags_when_any() {
let mut flags = DirtyFlags::new();
assert!(!flags.any_set());
flags.set(SourceName::new("alpha"), true);
assert!(flags.any_set());
flags.set(SourceName::new("beta"), false);
assert!(flags.any_set()); }
#[test]
fn test_dirty_flags_when_all() {
let mut flags = DirtyFlags::new();
flags.set(SourceName::new("alpha"), true);
flags.set(SourceName::new("beta"), false);
assert!(!flags.all_set());
flags.set(SourceName::new("beta"), true);
assert!(flags.all_set());
}
#[test]
fn test_dirty_flags_clear_all() {
let mut flags = DirtyFlags::new();
flags.set(SourceName::new("alpha"), true);
flags.set(SourceName::new("beta"), true);
assert!(flags.all_set());
flags.clear_all();
assert!(!flags.any_set());
}
#[test]
fn test_dirty_flags_empty_all_set() {
let flags = DirtyFlags::new();
assert!(!flags.all_set());
}
#[tokio::test]
async fn test_reactor_fires_on_boundary() {
let (acc_tx, acc_rx) = mpsc::channel(10);
let (_manual_tx, manual_rx) = mpsc::channel(10);
let (shutdown_tx, shutdown_rx) = watch::channel(false);
let fire_count = Arc::new(std::sync::atomic::AtomicU32::new(0));
let fire_count_inner = fire_count.clone();
let graph: CompiledGraphFn = Arc::new(move |_cache: InputCache| {
let fc = fire_count_inner.clone();
Box::pin(async move {
fc.fetch_add(1, std::sync::atomic::Ordering::SeqCst);
GraphResult::completed(vec![])
})
});
let reactor = Reactor::new(
graph,
ReactionCriteria::WhenAny,
InputStrategy::Latest,
acc_rx,
manual_rx,
shutdown_rx,
);
let handle = tokio::spawn(reactor.run());
let bytes = super::super::types::serialize(&42u32).unwrap();
acc_tx
.send((SourceName::new("alpha"), bytes))
.await
.unwrap();
tokio::time::sleep(std::time::Duration::from_millis(50)).await;
assert_eq!(fire_count.load(std::sync::atomic::Ordering::SeqCst), 1);
shutdown_tx.send(true).unwrap();
let _ = tokio::time::timeout(std::time::Duration::from_secs(2), handle).await;
}
#[tokio::test]
async fn test_reactor_manual_force_fire() {
let (_acc_tx, acc_rx) = mpsc::channel(10);
let (manual_tx, manual_rx) = mpsc::channel(10);
let (shutdown_tx, shutdown_rx) = watch::channel(false);
let fire_count = Arc::new(std::sync::atomic::AtomicU32::new(0));
let fire_count_inner = fire_count.clone();
let graph: CompiledGraphFn = Arc::new(move |_cache: InputCache| {
let fc = fire_count_inner.clone();
Box::pin(async move {
fc.fetch_add(1, std::sync::atomic::Ordering::SeqCst);
GraphResult::completed(vec![])
})
});
let reactor = Reactor::new(
graph,
ReactionCriteria::WhenAny,
InputStrategy::Latest,
acc_rx,
manual_rx,
shutdown_rx,
);
let handle = tokio::spawn(reactor.run());
manual_tx.send(ManualCommand::ForceFire).await.unwrap();
tokio::time::sleep(std::time::Duration::from_millis(50)).await;
assert_eq!(fire_count.load(std::sync::atomic::Ordering::SeqCst), 1);
shutdown_tx.send(true).unwrap();
let _ = tokio::time::timeout(std::time::Duration::from_secs(2), handle).await;
}
#[tokio::test]
async fn test_reactor_cache_snapshot_isolation() {
let (acc_tx, acc_rx) = mpsc::channel(10);
let (_manual_tx, manual_rx) = mpsc::channel(10);
let (shutdown_tx, shutdown_rx) = watch::channel(false);
let captured_cache = Arc::new(tokio::sync::Mutex::new(None));
let captured_inner = captured_cache.clone();
let graph: CompiledGraphFn = Arc::new(move |cache: InputCache| {
let ci = captured_inner.clone();
Box::pin(async move {
*ci.lock().await = Some(cache);
GraphResult::completed(vec![])
})
});
let reactor = Reactor::new(
graph,
ReactionCriteria::WhenAny,
InputStrategy::Latest,
acc_rx,
manual_rx,
shutdown_rx,
);
let handle = tokio::spawn(reactor.run());
acc_tx
.send((
SourceName::new("alpha"),
super::super::types::serialize(&1u32).unwrap(),
))
.await
.unwrap();
tokio::time::sleep(std::time::Duration::from_millis(50)).await;
let snapshot = captured_cache.lock().await;
assert!(snapshot.is_some());
let cache = snapshot.as_ref().unwrap();
let val: u32 = cache.get("alpha").unwrap().unwrap();
assert_eq!(val, 1);
shutdown_tx.send(true).unwrap();
let _ = tokio::time::timeout(std::time::Duration::from_secs(2), handle).await;
}
}