use std::sync::Arc;
use std::time::Duration;
use chrono::{DateTime, Utc};
use rustc_hash::FxHashMap;
use varpulis_core::Value;
use super::and_op::AndState;
use super::kleene::KleeneCapture;
use super::negation::NegationConstraint;
use super::types::{SharedEvent, StackEntry};
use crate::clock::Timestamp;
#[derive(Debug, Clone)]
pub struct Run {
pub current_state: usize,
pub stack: Vec<StackEntry>,
pub captured: FxHashMap<String, SharedEvent>,
pub started_at: Timestamp,
pub deadline: Option<Timestamp>,
pub event_time_started_at: Option<DateTime<Utc>>,
pub event_time_deadline: Option<DateTime<Utc>>,
pub partition_key: Option<Value>,
pub invalidated: bool,
pub pending_negations: Vec<NegationConstraint>,
pub and_state: Option<AndState>,
pub kleene_capture: Option<KleeneCapture>,
}
impl Default for Run {
fn default() -> Self {
Self {
current_state: 0,
stack: Vec::new(),
captured: FxHashMap::default(),
started_at: Timestamp::now(),
deadline: None,
event_time_started_at: None,
event_time_deadline: None,
partition_key: None,
invalidated: false,
pending_negations: Vec::new(),
and_state: None,
kleene_capture: None,
}
}
}
impl Run {
pub fn new(start_state: usize) -> Self {
Self {
current_state: start_state,
stack: Vec::new(),
captured: FxHashMap::default(),
started_at: Timestamp::now(),
deadline: None,
event_time_started_at: None,
event_time_deadline: None,
partition_key: None,
invalidated: false,
pending_negations: Vec::new(),
and_state: None,
kleene_capture: None,
}
}
pub fn new_with_event_time(start_state: usize, event_timestamp: DateTime<Utc>) -> Self {
Self {
current_state: start_state,
stack: Vec::new(),
captured: FxHashMap::default(),
started_at: Timestamp::now(),
deadline: None,
event_time_started_at: Some(event_timestamp),
event_time_deadline: None,
partition_key: None,
invalidated: false,
pending_negations: Vec::new(),
and_state: None,
kleene_capture: None,
}
}
pub fn with_partition(mut self, key: Value) -> Self {
self.partition_key = Some(key);
self
}
pub fn with_deadline(mut self, deadline: Timestamp) -> Self {
self.deadline = Some(deadline);
self
}
pub fn with_event_time_deadline(mut self, timeout: Duration) -> Self {
if let Some(started_at) = self.event_time_started_at {
self.event_time_deadline =
Some(started_at + chrono::Duration::from_std(timeout).unwrap_or_default());
}
self
}
pub fn push(&mut self, event: SharedEvent, alias: Option<String>) {
if let Some(ref a) = alias {
self.captured.insert(a.clone(), Arc::clone(&event));
}
self.stack.push(StackEntry {
event,
alias,
timestamp: Timestamp::now(),
});
}
#[inline]
pub fn push_at(&mut self, event: SharedEvent, alias: Option<String>, ts: Timestamp) {
if let Some(ref a) = alias {
self.captured.insert(a.clone(), Arc::clone(&event));
}
self.stack.push(StackEntry {
event,
alias,
timestamp: ts,
});
}
#[inline]
pub fn push_at_kleene(&mut self, event: SharedEvent, alias: &Option<String>, ts: Timestamp) {
if let Some(ref a) = alias {
if let Some(entry) = self.captured.get_mut(a.as_str()) {
*entry = Arc::clone(&event);
} else {
self.captured.insert(a.clone(), Arc::clone(&event));
}
}
self.stack.push(StackEntry {
event,
alias: alias.clone(),
timestamp: ts,
});
}
pub fn is_timed_out(&self) -> bool {
if let Some(deadline) = self.deadline {
Timestamp::now() > deadline
} else {
false
}
}
pub fn is_timed_out_event_time(&self, watermark: DateTime<Utc>) -> bool {
if let Some(deadline) = self.event_time_deadline {
watermark > deadline
} else {
false
}
}
pub fn branch(&self) -> Self {
self.clone()
}
}