use crate::db;
use crate::types::*;
use chrono::{DateTime, Duration, Utc};
use rusqlite::Connection;
use std::path::Path;
use uuid::Uuid;
pub struct Engine {
conn: Connection,
}
impl Engine {
pub fn new(db_path: &Path) -> Result<Self, TMinusError> {
let conn = db::init_db(db_path)?;
Ok(Engine { conn })
}
pub fn in_memory() -> Result<Self, TMinusError> {
let conn = db::init_db(Path::new(":memory:"))?;
Ok(Engine { conn })
}
pub fn schedule_event(
&mut self,
kind: EventKind,
scheduled_at: DateTime<Utc>,
t_minus: Duration,
organizer: AgentId,
attendees: Vec<AgentId>,
quorum: usize,
payload: serde_json::Value,
) -> Result<TMinusEvent, TMinusError> {
let event = TMinusEvent {
id: Uuid::new_v4(),
kind,
scheduled_at,
t_minus,
organizer: organizer.clone(),
attendees: attendees.into_iter().map(|a| (a, ResponseStatus::Pending)).collect(),
quorum,
payload,
};
db::insert_event(&self.conn, &event)?;
Ok(event)
}
pub fn confirm(&mut self, event_id: Uuid, agent_id: &AgentId) -> Result<TMinusEvent, TMinusError> {
let mut events = db::load_events(&self.conn)?;
let event = events.iter_mut().find(|e| e.id == event_id)
.ok_or(TMinusError::EventNotFound(event_id))?;
let attendee = event.attendees.iter_mut().find(|(a, _)| a == agent_id)
.ok_or(TMinusError::NotAttendee(agent_id.clone(), event_id))?;
attendee.1 = ResponseStatus::Confirmed;
db::insert_event(&self.conn, event)?;
Ok(event.clone())
}
pub fn defer(&mut self, event_id: Uuid, agent_id: &AgentId, duration: Duration) -> Result<TMinusEvent, TMinusError> {
let mut events = db::load_events(&self.conn)?;
let event = events.iter_mut().find(|e| e.id == event_id)
.ok_or(TMinusError::EventNotFound(event_id))?;
let attendee = event.attendees.iter_mut().find(|(a, _)| a == agent_id)
.ok_or(TMinusError::NotAttendee(agent_id.clone(), event_id))?;
attendee.1 = ResponseStatus::Deferred(duration);
db::insert_event(&self.conn, event)?;
Ok(event.clone())
}
pub fn mark_missed(&mut self, event_id: Uuid) -> Result<TMinusEvent, TMinusError> {
let mut events = db::load_events(&self.conn)?;
let event = events.iter_mut().find(|e| e.id == event_id)
.ok_or(TMinusError::EventNotFound(event_id))?;
for (_, status) in event.attendees.iter_mut() {
if matches!(status, ResponseStatus::Pending) {
*status = ResponseStatus::Missed;
}
}
db::insert_event(&self.conn, event)?;
Ok(event.clone())
}
pub fn list_events(&self) -> Result<Vec<TMinusEvent>, TMinusError> {
db::load_events(&self.conn)
}
pub fn get_event(&self, id: Uuid) -> Result<Option<TMinusEvent>, TMinusError> {
let events = db::load_events(&self.conn)?;
Ok(events.into_iter().find(|e| e.id == id))
}
pub fn remove_event(&mut self, id: Uuid) -> Result<bool, TMinusError> {
db::delete_event(&self.conn, id)
}
pub fn create_campaign(&mut self, name: String) -> Result<Campaign, TMinusError> {
let campaign = Campaign::new(name);
db::insert_campaign(&self.conn, &campaign)?;
Ok(campaign)
}
pub fn campaign_add_event(&mut self, campaign_id: Uuid, event_id: Uuid) -> Result<Campaign, TMinusError> {
let mut campaign = db::load_campaign(&self.conn, campaign_id)?
.ok_or(TMinusError::CampaignNotFound(campaign_id))?;
if !campaign.events.contains(&event_id) {
campaign.events.push(event_id);
}
db::insert_campaign(&self.conn, &campaign)?;
Ok(campaign)
}
pub fn campaign_link(&mut self, campaign_id: Uuid, from: Uuid, to: Uuid) -> Result<Campaign, TMinusError> {
let mut campaign = db::load_campaign(&self.conn, campaign_id)?
.ok_or(TMinusError::CampaignNotFound(campaign_id))?;
if !campaign.events.contains(&from) {
return Err(TMinusError::InvalidInput(format!("event {from} not in campaign")));
}
if !campaign.events.contains(&to) {
return Err(TMinusError::InvalidInput(format!("event {to} not in campaign")));
}
if !campaign.dependencies.contains(&(from, to)) {
campaign.dependencies.push((from, to));
}
db::insert_campaign(&self.conn, &campaign)?;
if campaign.execution_order().is_err() {
campaign.dependencies.pop();
db::insert_campaign(&self.conn, &campaign)?;
return Err(TMinusError::DependencyCycle(vec![from, to]));
}
Ok(campaign)
}
pub fn campaign_execution_order(&self, campaign_id: Uuid) -> Result<Vec<Uuid>, TMinusError> {
let campaign = db::load_campaign(&self.conn, campaign_id)?
.ok_or(TMinusError::CampaignNotFound(campaign_id))?;
campaign.execution_order().map_err(TMinusError::DependencyCycle)
}
pub fn list_campaigns(&self) -> Result<Vec<Campaign>, TMinusError> {
db::load_campaigns(&self.conn)
}
pub fn get_campaign(&self, id: Uuid) -> Result<Option<Campaign>, TMinusError> {
db::load_campaign(&self.conn, id)
}
pub fn tick(&mut self, now: DateTime<Utc>) -> Result<TickResult, TMinusError> {
let mut events = db::load_events(&self.conn)?;
let mut tick = TickResult {
fired: Vec::new(),
missed: Vec::new(),
};
for event in events.iter_mut() {
if now >= event.fire_time() {
if event.has_quorum() {
tick.fired.push(event.id);
} else {
let any_deferred = event.attendees.iter()
.any(|(_, s)| matches!(s, ResponseStatus::Deferred(_)));
if !any_deferred {
for (_, status) in event.attendees.iter_mut() {
if matches!(status, ResponseStatus::Pending) {
*status = ResponseStatus::Missed;
}
}
tick.missed.push(event.id);
}
}
db::insert_event(&self.conn, event)?;
}
}
for id in &tick.fired {
db::delete_event(&self.conn, *id)?;
}
Ok(tick)
}
pub fn apply_deferral_cascade(
&mut self,
event_id: Uuid,
) -> Result<Option<TMinusEvent>, TMinusError> {
let mut events = db::load_events(&self.conn)?;
let event = events.iter_mut().find(|e| e.id == event_id)
.ok_or(TMinusError::EventNotFound(event_id))?;
let max_deferred = event.attendees.iter()
.filter_map(|(_, s)| match s {
ResponseStatus::Deferred(d) => Some(*d),
_ => None,
})
.max()
.unwrap_or(Duration::zero());
if max_deferred > Duration::zero() {
event.t_minus = event.t_minus + max_deferred;
for (_, status) in event.attendees.iter_mut() {
if matches!(status, ResponseStatus::Deferred(_)) {
*status = ResponseStatus::Pending;
}
}
db::insert_event(&self.conn, event)?;
Ok(Some(event.clone()))
} else {
Ok(None)
}
}
}