use crate::effects::Effect;
use std::collections::BTreeMap;
use std::sync::{Arc, RwLock};
use std::sync::atomic::{AtomicU64, Ordering};
use std::time::Duration;
use super::pending_effect::PendingEffect;
use super::ordering_constraint::{OrderingConstraint, ConstraintType};
#[derive(Debug)]
pub struct EffectOrderingManager {
sequence_counter: AtomicU64,
pending_effects: Arc<RwLock<BTreeMap<u64, PendingEffect>>>,
ordering_constraints: Arc<RwLock<Vec<OrderingConstraint>>>,
}
impl EffectOrderingManager {
pub fn new() -> Self {
Self {
sequence_counter: AtomicU64::new(0),
pending_effects: Arc::new(RwLock::new(BTreeMap::new())),
ordering_constraints: Arc::new(RwLock::new(Vec::new())),
}
}
pub fn next_sequence(&self) -> u64 {
self.sequence_counter.fetch_add(1, Ordering::SeqCst) + 1
}
pub fn compute_dependencies(&self, effect: &Effect, sequence: u64) -> Result<Vec<u64>, String> {
let constraints = self.ordering_constraints.read().unwrap();
let pending = self.pending_effects.read().unwrap();
let mut dependencies = Vec::new();
for constraint in constraints.iter() {
if constraint.effect_type == *effect {
match constraint.constraint_type {
ConstraintType::Serialized => {
for (seq, pending_effect) in pending.iter() {
if *seq < sequence && pending_effect.effect == *effect {
dependencies.push(*seq);
}
}
}
ConstraintType::OrderedParallel => {
for (seq, pending_effect) in pending.iter() {
if *seq < sequence && pending_effect.effect == *effect {
dependencies.push(*seq);
break; }
}
}
ConstraintType::Isolated => {
}
}
}
}
Ok(dependencies)
}
pub fn wait_for_dependencies(&self, dependencies: &[u64], timeout: Duration) -> Result<(), String> {
let start_time = std::time::SystemTime::now();
for &dep_sequence in dependencies {
loop {
{
let pending = self.pending_effects.read().unwrap();
if !pending.contains_key(&dep_sequence) {
break; }
}
if start_time.elapsed().unwrap_or(Duration::from_secs(0)) > timeout {
return Err(format!("Timeout waiting for dependency {dep_sequence}"));
}
std::thread::sleep(Duration::from_millis(1));
}
}
Ok(())
}
pub fn add_pending_effect(&self, effect: PendingEffect) -> Result<(), String> {
let mut pending = self.pending_effects.write().unwrap();
pending.insert(effect.sequence, effect);
Ok(())
}
pub fn complete_effect(&self, sequence: u64) -> Result<(), String> {
let mut pending = self.pending_effects.write().unwrap();
pending.remove(&sequence);
Ok(())
}
pub fn notify_effect_completion(&self, _sequence: u64) {
}
pub fn get_pending_effect(&self, sequence: u64) -> Option<PendingEffect> {
let pending = self.pending_effects.read().unwrap();
pending.get(&sequence).cloned()
}
#[allow(dead_code)] pub fn add_ordering_constraint(&self, constraint: OrderingConstraint) {
let mut constraints = self.ordering_constraints.write().unwrap();
constraints.push(constraint);
constraints.sort_by_key(|c| c.priority);
}
}
impl Default for EffectOrderingManager {
fn default() -> Self {
Self::new()
}
}