Skip to main content

t_minus/
engine.rs

1use crate::db;
2use crate::types::*;
3use chrono::{DateTime, Duration, Utc};
4use rusqlite::Connection;
5use std::path::Path;
6use uuid::Uuid;
7
8/// The main engine driving T-minus coordination.
9pub struct Engine {
10    conn: Connection,
11}
12
13impl Engine {
14    /// Create a new engine with SQLite storage at the given path.
15    pub fn new(db_path: &Path) -> Result<Self, TMinusError> {
16        let conn = db::init_db(db_path)?;
17        Ok(Engine { conn })
18        }
19
20    /// Create an in-memory engine (for testing).
21    pub fn in_memory() -> Result<Self, TMinusError> {
22        let conn = db::init_db(Path::new(":memory:"))?;
23        Ok(Engine { conn })
24    }
25
26    // ── Events ──────────────────────────────────────────────
27
28    /// Schedule a new event.
29    pub fn schedule_event(
30        &mut self,
31        kind: EventKind,
32        scheduled_at: DateTime<Utc>,
33        t_minus: Duration,
34        organizer: AgentId,
35        attendees: Vec<AgentId>,
36        quorum: usize,
37        payload: serde_json::Value,
38    ) -> Result<TMinusEvent, TMinusError> {
39        let event = TMinusEvent {
40            id: Uuid::new_v4(),
41            kind,
42            scheduled_at,
43            t_minus,
44            organizer: organizer.clone(),
45            attendees: attendees.into_iter().map(|a| (a, ResponseStatus::Pending)).collect(),
46            quorum,
47            payload,
48        };
49        db::insert_event(&self.conn, &event)?;
50        Ok(event)
51    }
52
53    /// Confirm an agent's attendance.
54    pub fn confirm(&mut self, event_id: Uuid, agent_id: &AgentId) -> Result<TMinusEvent, TMinusError> {
55        let mut events = db::load_events(&self.conn)?;
56        let event = events.iter_mut().find(|e| e.id == event_id)
57            .ok_or(TMinusError::EventNotFound(event_id))?;
58
59        let attendee = event.attendees.iter_mut().find(|(a, _)| a == agent_id)
60            .ok_or(TMinusError::NotAttendee(agent_id.clone(), event_id))?;
61
62        attendee.1 = ResponseStatus::Confirmed;
63        db::insert_event(&self.conn, event)?;
64        Ok(event.clone())
65    }
66
67    /// Defer an agent's response with a requested delay.
68    pub fn defer(&mut self, event_id: Uuid, agent_id: &AgentId, duration: Duration) -> Result<TMinusEvent, TMinusError> {
69        let mut events = db::load_events(&self.conn)?;
70        let event = events.iter_mut().find(|e| e.id == event_id)
71            .ok_or(TMinusError::EventNotFound(event_id))?;
72
73        let attendee = event.attendees.iter_mut().find(|(a, _)| a == agent_id)
74            .ok_or(TMinusError::NotAttendee(agent_id.clone(), event_id))?;
75
76        attendee.1 = ResponseStatus::Deferred(duration);
77        db::insert_event(&self.conn, event)?;
78        Ok(event.clone())
79    }
80
81    /// Mark all pending attendees of an event as missed.
82    pub fn mark_missed(&mut self, event_id: Uuid) -> Result<TMinusEvent, TMinusError> {
83        let mut events = db::load_events(&self.conn)?;
84        let event = events.iter_mut().find(|e| e.id == event_id)
85            .ok_or(TMinusError::EventNotFound(event_id))?;
86
87        for (_, status) in event.attendees.iter_mut() {
88            if matches!(status, ResponseStatus::Pending) {
89                *status = ResponseStatus::Missed;
90            }
91        }
92        db::insert_event(&self.conn, event)?;
93        Ok(event.clone())
94    }
95
96    /// Get all events.
97    pub fn list_events(&self) -> Result<Vec<TMinusEvent>, TMinusError> {
98        db::load_events(&self.conn)
99    }
100
101    /// Get a specific event.
102    pub fn get_event(&self, id: Uuid) -> Result<Option<TMinusEvent>, TMinusError> {
103        let events = db::load_events(&self.conn)?;
104        Ok(events.into_iter().find(|e| e.id == id))
105    }
106
107    /// Remove a fired/missed event.
108    pub fn remove_event(&mut self, id: Uuid) -> Result<bool, TMinusError> {
109        db::delete_event(&self.conn, id)
110    }
111
112    // ── Campaigns ───────────────────────────────────────────
113
114    /// Create a new campaign.
115    pub fn create_campaign(&mut self, name: String) -> Result<Campaign, TMinusError> {
116        let campaign = Campaign::new(name);
117        db::insert_campaign(&self.conn, &campaign)?;
118        Ok(campaign)
119    }
120
121    /// Add an event to a campaign.
122    pub fn campaign_add_event(&mut self, campaign_id: Uuid, event_id: Uuid) -> Result<Campaign, TMinusError> {
123        let mut campaign = db::load_campaign(&self.conn, campaign_id)?
124            .ok_or(TMinusError::CampaignNotFound(campaign_id))?;
125
126        if !campaign.events.contains(&event_id) {
127            campaign.events.push(event_id);
128        }
129        db::insert_campaign(&self.conn, &campaign)?;
130        Ok(campaign)
131    }
132
133    /// Add a dependency edge between two events in a campaign.
134    pub fn campaign_link(&mut self, campaign_id: Uuid, from: Uuid, to: Uuid) -> Result<Campaign, TMinusError> {
135        let mut campaign = db::load_campaign(&self.conn, campaign_id)?
136            .ok_or(TMinusError::CampaignNotFound(campaign_id))?;
137
138        if !campaign.events.contains(&from) {
139            return Err(TMinusError::InvalidInput(format!("event {from} not in campaign")));
140        }
141        if !campaign.events.contains(&to) {
142            return Err(TMinusError::InvalidInput(format!("event {to} not in campaign")));
143        }
144
145        if !campaign.dependencies.contains(&(from, to)) {
146            campaign.dependencies.push((from, to));
147        }
148        db::insert_campaign(&self.conn, &campaign)?;
149
150        // Validate no cycle
151        if campaign.execution_order().is_err() {
152            // Revert
153            campaign.dependencies.pop();
154            db::insert_campaign(&self.conn, &campaign)?;
155            return Err(TMinusError::DependencyCycle(vec![from, to]));
156        }
157
158        Ok(campaign)
159    }
160
161    /// Get campaign execution order (topological sort).
162    pub fn campaign_execution_order(&self, campaign_id: Uuid) -> Result<Vec<Uuid>, TMinusError> {
163        let campaign = db::load_campaign(&self.conn, campaign_id)?
164            .ok_or(TMinusError::CampaignNotFound(campaign_id))?;
165        campaign.execution_order().map_err(TMinusError::DependencyCycle)
166    }
167
168    /// List all campaigns.
169    pub fn list_campaigns(&self) -> Result<Vec<Campaign>, TMinusError> {
170        db::load_campaigns(&self.conn)
171    }
172
173    /// Get a specific campaign.
174    pub fn get_campaign(&self, id: Uuid) -> Result<Option<Campaign>, TMinusError> {
175        db::load_campaign(&self.conn, id)
176    }
177
178    // ── Tick ────────────────────────────────────────────────
179
180    /// Process the current moment: fire events that reached quorum, mark missed, handle deferrals.
181    pub fn tick(&mut self, now: DateTime<Utc>) -> Result<TickResult, TMinusError> {
182        let mut events = db::load_events(&self.conn)?;
183        let mut tick = TickResult {
184            fired: Vec::new(),
185            missed: Vec::new(),
186        };
187
188        for event in events.iter_mut() {
189            if now >= event.fire_time() {
190                if event.has_quorum() {
191                    tick.fired.push(event.id);
192                } else {
193                    // Check if any deferred attendees can still make it
194                    let any_deferred = event.attendees.iter()
195                        .any(|(_, s)| matches!(s, ResponseStatus::Deferred(_)));
196
197                    if !any_deferred {
198                        // Mark all pending as missed
199                        for (_, status) in event.attendees.iter_mut() {
200                            if matches!(status, ResponseStatus::Pending) {
201                                *status = ResponseStatus::Missed;
202                            }
203                        }
204                        tick.missed.push(event.id);
205                    }
206                    // If there are deferred attendees, give them more time (don't mark missed yet)
207                }
208                db::insert_event(&self.conn, event)?;
209            }
210        }
211
212        // Remove fired events
213        for id in &tick.fired {
214            db::delete_event(&self.conn, *id)?;
215        }
216
217        Ok(tick)
218    }
219
220    /// Process a deferral cascade: when an agent defers, push the event's t_minus forward.
221    /// Returns the updated event if the max deferral is applied.
222    pub fn apply_deferral_cascade(
223        &mut self,
224        event_id: Uuid,
225    ) -> Result<Option<TMinusEvent>, TMinusError> {
226        let mut events = db::load_events(&self.conn)?;
227        let event = events.iter_mut().find(|e| e.id == event_id)
228            .ok_or(TMinusError::EventNotFound(event_id))?;
229
230        let max_deferred = event.attendees.iter()
231            .filter_map(|(_, s)| match s {
232                ResponseStatus::Deferred(d) => Some(*d),
233                _ => None,
234            })
235            .max()
236            .unwrap_or(Duration::zero());
237
238        if max_deferred > Duration::zero() {
239            // Extend the fire time by the max deferral
240            event.t_minus = event.t_minus + max_deferred;
241            // Reset deferred attendees to pending
242            for (_, status) in event.attendees.iter_mut() {
243                if matches!(status, ResponseStatus::Deferred(_)) {
244                    *status = ResponseStatus::Pending;
245                }
246            }
247            db::insert_event(&self.conn, event)?;
248            Ok(Some(event.clone()))
249        } else {
250            Ok(None)
251        }
252    }
253}