1use crate::db;
2use crate::types::*;
3use chrono::{DateTime, Duration, Utc};
4use rusqlite::Connection;
5use std::path::Path;
6use uuid::Uuid;
7
8pub struct Engine {
10 conn: Connection,
11}
12
13impl Engine {
14 pub fn new(db_path: &Path) -> Result<Self, TMinusError> {
16 let conn = db::init_db(db_path)?;
17 Ok(Engine { conn })
18 }
19
20 pub fn in_memory() -> Result<Self, TMinusError> {
22 let conn = db::init_db(Path::new(":memory:"))?;
23 Ok(Engine { conn })
24 }
25
26 pub fn schedule_event(
30 &mut self,
31 kind: EventKind,
32 scheduled_at: DateTime<Utc>,
33 t_minus: Duration,
34 organizer: AgentId,
35 attendees: Vec<AgentId>,
36 quorum: usize,
37 payload: serde_json::Value,
38 ) -> Result<TMinusEvent, TMinusError> {
39 let event = TMinusEvent {
40 id: Uuid::new_v4(),
41 kind,
42 scheduled_at,
43 t_minus,
44 organizer: organizer.clone(),
45 attendees: attendees.into_iter().map(|a| (a, ResponseStatus::Pending)).collect(),
46 quorum,
47 payload,
48 };
49 db::insert_event(&self.conn, &event)?;
50 Ok(event)
51 }
52
53 pub fn confirm(&mut self, event_id: Uuid, agent_id: &AgentId) -> Result<TMinusEvent, TMinusError> {
55 let mut events = db::load_events(&self.conn)?;
56 let event = events.iter_mut().find(|e| e.id == event_id)
57 .ok_or(TMinusError::EventNotFound(event_id))?;
58
59 let attendee = event.attendees.iter_mut().find(|(a, _)| a == agent_id)
60 .ok_or(TMinusError::NotAttendee(agent_id.clone(), event_id))?;
61
62 attendee.1 = ResponseStatus::Confirmed;
63 db::insert_event(&self.conn, event)?;
64 Ok(event.clone())
65 }
66
67 pub fn defer(&mut self, event_id: Uuid, agent_id: &AgentId, duration: Duration) -> Result<TMinusEvent, TMinusError> {
69 let mut events = db::load_events(&self.conn)?;
70 let event = events.iter_mut().find(|e| e.id == event_id)
71 .ok_or(TMinusError::EventNotFound(event_id))?;
72
73 let attendee = event.attendees.iter_mut().find(|(a, _)| a == agent_id)
74 .ok_or(TMinusError::NotAttendee(agent_id.clone(), event_id))?;
75
76 attendee.1 = ResponseStatus::Deferred(duration);
77 db::insert_event(&self.conn, event)?;
78 Ok(event.clone())
79 }
80
81 pub fn mark_missed(&mut self, event_id: Uuid) -> Result<TMinusEvent, TMinusError> {
83 let mut events = db::load_events(&self.conn)?;
84 let event = events.iter_mut().find(|e| e.id == event_id)
85 .ok_or(TMinusError::EventNotFound(event_id))?;
86
87 for (_, status) in event.attendees.iter_mut() {
88 if matches!(status, ResponseStatus::Pending) {
89 *status = ResponseStatus::Missed;
90 }
91 }
92 db::insert_event(&self.conn, event)?;
93 Ok(event.clone())
94 }
95
96 pub fn list_events(&self) -> Result<Vec<TMinusEvent>, TMinusError> {
98 db::load_events(&self.conn)
99 }
100
101 pub fn get_event(&self, id: Uuid) -> Result<Option<TMinusEvent>, TMinusError> {
103 let events = db::load_events(&self.conn)?;
104 Ok(events.into_iter().find(|e| e.id == id))
105 }
106
107 pub fn remove_event(&mut self, id: Uuid) -> Result<bool, TMinusError> {
109 db::delete_event(&self.conn, id)
110 }
111
112 pub fn create_campaign(&mut self, name: String) -> Result<Campaign, TMinusError> {
116 let campaign = Campaign::new(name);
117 db::insert_campaign(&self.conn, &campaign)?;
118 Ok(campaign)
119 }
120
121 pub fn campaign_add_event(&mut self, campaign_id: Uuid, event_id: Uuid) -> Result<Campaign, TMinusError> {
123 let mut campaign = db::load_campaign(&self.conn, campaign_id)?
124 .ok_or(TMinusError::CampaignNotFound(campaign_id))?;
125
126 if !campaign.events.contains(&event_id) {
127 campaign.events.push(event_id);
128 }
129 db::insert_campaign(&self.conn, &campaign)?;
130 Ok(campaign)
131 }
132
133 pub fn campaign_link(&mut self, campaign_id: Uuid, from: Uuid, to: Uuid) -> Result<Campaign, TMinusError> {
135 let mut campaign = db::load_campaign(&self.conn, campaign_id)?
136 .ok_or(TMinusError::CampaignNotFound(campaign_id))?;
137
138 if !campaign.events.contains(&from) {
139 return Err(TMinusError::InvalidInput(format!("event {from} not in campaign")));
140 }
141 if !campaign.events.contains(&to) {
142 return Err(TMinusError::InvalidInput(format!("event {to} not in campaign")));
143 }
144
145 if !campaign.dependencies.contains(&(from, to)) {
146 campaign.dependencies.push((from, to));
147 }
148 db::insert_campaign(&self.conn, &campaign)?;
149
150 if campaign.execution_order().is_err() {
152 campaign.dependencies.pop();
154 db::insert_campaign(&self.conn, &campaign)?;
155 return Err(TMinusError::DependencyCycle(vec![from, to]));
156 }
157
158 Ok(campaign)
159 }
160
161 pub fn campaign_execution_order(&self, campaign_id: Uuid) -> Result<Vec<Uuid>, TMinusError> {
163 let campaign = db::load_campaign(&self.conn, campaign_id)?
164 .ok_or(TMinusError::CampaignNotFound(campaign_id))?;
165 campaign.execution_order().map_err(TMinusError::DependencyCycle)
166 }
167
168 pub fn list_campaigns(&self) -> Result<Vec<Campaign>, TMinusError> {
170 db::load_campaigns(&self.conn)
171 }
172
173 pub fn get_campaign(&self, id: Uuid) -> Result<Option<Campaign>, TMinusError> {
175 db::load_campaign(&self.conn, id)
176 }
177
178 pub fn tick(&mut self, now: DateTime<Utc>) -> Result<TickResult, TMinusError> {
182 let mut events = db::load_events(&self.conn)?;
183 let mut tick = TickResult {
184 fired: Vec::new(),
185 missed: Vec::new(),
186 };
187
188 for event in events.iter_mut() {
189 if now >= event.fire_time() {
190 if event.has_quorum() {
191 tick.fired.push(event.id);
192 } else {
193 let any_deferred = event.attendees.iter()
195 .any(|(_, s)| matches!(s, ResponseStatus::Deferred(_)));
196
197 if !any_deferred {
198 for (_, status) in event.attendees.iter_mut() {
200 if matches!(status, ResponseStatus::Pending) {
201 *status = ResponseStatus::Missed;
202 }
203 }
204 tick.missed.push(event.id);
205 }
206 }
208 db::insert_event(&self.conn, event)?;
209 }
210 }
211
212 for id in &tick.fired {
214 db::delete_event(&self.conn, *id)?;
215 }
216
217 Ok(tick)
218 }
219
220 pub fn apply_deferral_cascade(
223 &mut self,
224 event_id: Uuid,
225 ) -> Result<Option<TMinusEvent>, TMinusError> {
226 let mut events = db::load_events(&self.conn)?;
227 let event = events.iter_mut().find(|e| e.id == event_id)
228 .ok_or(TMinusError::EventNotFound(event_id))?;
229
230 let max_deferred = event.attendees.iter()
231 .filter_map(|(_, s)| match s {
232 ResponseStatus::Deferred(d) => Some(*d),
233 _ => None,
234 })
235 .max()
236 .unwrap_or(Duration::zero());
237
238 if max_deferred > Duration::zero() {
239 event.t_minus = event.t_minus + max_deferred;
241 for (_, status) in event.attendees.iter_mut() {
243 if matches!(status, ResponseStatus::Deferred(_)) {
244 *status = ResponseStatus::Pending;
245 }
246 }
247 db::insert_event(&self.conn, event)?;
248 Ok(Some(event.clone()))
249 } else {
250 Ok(None)
251 }
252 }
253}