Skip to main content

everruns_local/
schedule_store.rs

1// SQLite-backed SessionScheduleStore, org + session scoped.
2//
3// Metadata round-trip decision (EVE-594 acceptance criterion):
4// The shared core primitive `everruns_core::session_schedule::SessionSchedule`
5// intentionally has NO open metadata bag, and we do not add one — touching the
6// shared data model is out of scope. Instead, the local store carries a JSON
7// `metadata` column in its OWN schema. An embedder that needs to preserve extra
8// fields (name/color/kind/command/model/isolated, etc.) calls the additive
9// `create_schedule_with_metadata` / `get_metadata` methods on this concrete
10// type. The trait-level `SessionScheduleStore` surface is unchanged, so the
11// runtime act path sees the standard schedule store while embedders keep their
12// extensible bag locally.
13
14use async_trait::async_trait;
15use chrono::{DateTime, Utc};
16use everruns_core::error::{AgentLoopError, Result};
17use everruns_core::session_schedule::SessionSchedule;
18use everruns_core::traits::SessionScheduleStore;
19use everruns_core::typed_id::{PrincipalId, ScheduleId, SessionId};
20use rusqlite::OptionalExtension;
21use serde_json::Value;
22
23use crate::db::SqliteDb;
24use crate::error::LocalError;
25
26/// SQLite-backed schedule store for local embedded hosts.
27#[derive(Clone)]
28pub struct LocalScheduleStore {
29    db: SqliteDb,
30    /// Internal org id this store instance is scoped to.
31    org_id: i64,
32    /// Principal stamped on created schedules.
33    owner_principal_id: PrincipalId,
34}
35
36impl LocalScheduleStore {
37    /// Open (and migrate) a schedule store over `db`, scoped to `org_id`.
38    pub fn new(db: SqliteDb, org_id: i64, owner_principal_id: PrincipalId) -> Result<Self> {
39        Self::ensure_schema(&db)?;
40        Ok(Self::scoped(db, org_id, owner_principal_id))
41    }
42
43    /// Create the `local_schedules` schema if it does not yet exist. Idempotent.
44    /// The schema is org-agnostic (org scoping is a column), so this only needs
45    /// to run once per database file.
46    fn ensure_schema(db: &SqliteDb) -> Result<()> {
47        db.with_conn(|conn| {
48            conn.execute_batch(
49                "CREATE TABLE IF NOT EXISTS local_schedules (
50                    id          TEXT PRIMARY KEY,
51                    org_id      INTEGER NOT NULL,
52                    session_id  TEXT NOT NULL,
53                    enabled     INTEGER NOT NULL,
54                    snapshot    TEXT NOT NULL,
55                    metadata    TEXT NOT NULL DEFAULT '{}'
56                 );
57                 CREATE INDEX IF NOT EXISTS idx_local_schedules_session
58                    ON local_schedules(org_id, session_id);",
59            )
60        })
61        .map_err(AgentLoopError::from)?;
62        Ok(())
63    }
64
65    /// Construct a store scoped to `org_id` without touching the database.
66    /// Callers must have already ensured the schema exists (via [`Self::new`]);
67    /// this keeps the per-(org) factory on the act path cheap and infallible.
68    pub(crate) fn scoped(db: SqliteDb, org_id: i64, owner_principal_id: PrincipalId) -> Self {
69        Self {
70            db,
71            org_id,
72            owner_principal_id,
73        }
74    }
75
76    fn build_schedule(
77        &self,
78        session_id: SessionId,
79        description: String,
80        cron_expression: Option<String>,
81        scheduled_at: Option<DateTime<Utc>>,
82        timezone: String,
83    ) -> SessionSchedule {
84        let now = Utc::now();
85        SessionSchedule {
86            id: ScheduleId::new(),
87            session_id,
88            owner_principal_id: self.owner_principal_id,
89            resolved_owner_user_id: None,
90            owner: None,
91            effective_owner: None,
92            description,
93            cron_expression: cron_expression.clone(),
94            scheduled_at,
95            timezone,
96            enabled: true,
97            schedule_type: SessionSchedule::derive_type(&cron_expression),
98            next_trigger_at: scheduled_at,
99            last_triggered_at: None,
100            trigger_count: 0,
101            created_at: now,
102            updated_at: now,
103        }
104    }
105
106    fn insert(&self, schedule: &SessionSchedule, metadata: &Value) -> Result<()> {
107        let snapshot = serde_json::to_string(schedule)
108            .map_err(|e| AgentLoopError::from(LocalError::from(e)))?;
109        let metadata_json = serde_json::to_string(metadata)
110            .map_err(|e| AgentLoopError::from(LocalError::from(e)))?;
111        let id = schedule.id.to_string();
112        let session = schedule.session_id.to_string();
113        let enabled = schedule.enabled as i64;
114        let org_id = self.org_id;
115        self.db
116            .with_conn(|conn| {
117                conn.execute(
118                    "INSERT INTO local_schedules (id, org_id, session_id, enabled, snapshot, metadata)
119                     VALUES (?1, ?2, ?3, ?4, ?5, ?6)
120                     ON CONFLICT(id) DO UPDATE SET
121                        enabled = excluded.enabled,
122                        snapshot = excluded.snapshot,
123                        metadata = excluded.metadata",
124                    rusqlite::params![id, org_id, session, enabled, snapshot, metadata_json],
125                )
126            })
127            .map_err(AgentLoopError::from)?;
128        Ok(())
129    }
130
131    fn load(&self, schedule_id: ScheduleId) -> Result<Option<SessionSchedule>> {
132        let id = schedule_id.to_string();
133        let org_id = self.org_id;
134        let snapshot: Option<String> = self
135            .db
136            .with_conn(|conn| {
137                conn.query_row(
138                    "SELECT snapshot FROM local_schedules WHERE id = ?1 AND org_id = ?2",
139                    rusqlite::params![id, org_id],
140                    |row| row.get(0),
141                )
142                .optional()
143            })
144            .map_err(AgentLoopError::from)?;
145        match snapshot {
146            Some(json) => Ok(Some(
147                serde_json::from_str(&json)
148                    .map_err(|e| AgentLoopError::from(LocalError::from(e)))?,
149            )),
150            None => Ok(None),
151        }
152    }
153
154    /// Create a schedule and persist caller-supplied extra fields in the local
155    /// `metadata` column. This is the additive seam that satisfies the
156    /// "extensible metadata bag" criterion without changing the core primitive.
157    pub async fn create_schedule_with_metadata(
158        &self,
159        session_id: SessionId,
160        description: String,
161        cron_expression: Option<String>,
162        scheduled_at: Option<DateTime<Utc>>,
163        timezone: String,
164        metadata: Value,
165    ) -> Result<SessionSchedule> {
166        let schedule = self.build_schedule(
167            session_id,
168            description,
169            cron_expression,
170            scheduled_at,
171            timezone,
172        );
173        self.insert(&schedule, &metadata)?;
174        Ok(schedule)
175    }
176
177    /// Read back the metadata bag previously stored for a schedule. Returns
178    /// `None` when the schedule does not exist in this org scope.
179    pub async fn get_metadata(&self, schedule_id: ScheduleId) -> Result<Option<Value>> {
180        let id = schedule_id.to_string();
181        let org_id = self.org_id;
182        let metadata: Option<String> = self
183            .db
184            .with_conn(|conn| {
185                conn.query_row(
186                    "SELECT metadata FROM local_schedules WHERE id = ?1 AND org_id = ?2",
187                    rusqlite::params![id, org_id],
188                    |row| row.get(0),
189                )
190                .optional()
191            })
192            .map_err(AgentLoopError::from)?;
193        match metadata {
194            Some(json) => Ok(Some(
195                serde_json::from_str(&json)
196                    .map_err(|e| AgentLoopError::from(LocalError::from(e)))?,
197            )),
198            None => Ok(None),
199        }
200    }
201}
202
203#[async_trait]
204impl SessionScheduleStore for LocalScheduleStore {
205    async fn create_schedule(
206        &self,
207        session_id: SessionId,
208        description: String,
209        cron_expression: Option<String>,
210        scheduled_at: Option<DateTime<Utc>>,
211        timezone: String,
212    ) -> Result<SessionSchedule> {
213        // Trait path: empty metadata bag.
214        self.create_schedule_with_metadata(
215            session_id,
216            description,
217            cron_expression,
218            scheduled_at,
219            timezone,
220            Value::Object(Default::default()),
221        )
222        .await
223    }
224
225    async fn cancel_schedule(
226        &self,
227        _session_id: SessionId,
228        schedule_id: ScheduleId,
229    ) -> Result<SessionSchedule> {
230        let mut schedule = self
231            .load(schedule_id)?
232            .ok_or_else(|| AgentLoopError::tool("schedule not found".to_string()))?;
233        schedule.enabled = false;
234        schedule.updated_at = Utc::now();
235        // Preserve existing metadata across the snapshot rewrite.
236        let metadata = self
237            .get_metadata(schedule_id)
238            .await?
239            .unwrap_or_else(|| Value::Object(Default::default()));
240        self.insert(&schedule, &metadata)?;
241        Ok(schedule)
242    }
243
244    async fn list_schedules(&self, session_id: SessionId) -> Result<Vec<SessionSchedule>> {
245        let session = session_id.to_string();
246        let org_id = self.org_id;
247        let snapshots: Vec<String> = self
248            .db
249            .with_conn(|conn| {
250                let mut stmt = conn.prepare(
251                    "SELECT snapshot FROM local_schedules
252                     WHERE org_id = ?1 AND session_id = ?2 ORDER BY rowid ASC",
253                )?;
254                stmt.query_map(rusqlite::params![org_id, session], |row| row.get(0))?
255                    .collect::<rusqlite::Result<Vec<String>>>()
256            })
257            .map_err(AgentLoopError::from)?;
258        snapshots
259            .into_iter()
260            .map(|json| {
261                serde_json::from_str(&json).map_err(|e| AgentLoopError::from(LocalError::from(e)))
262            })
263            .collect()
264    }
265
266    async fn count_active_schedules(&self, session_id: SessionId) -> Result<u32> {
267        let session = session_id.to_string();
268        let org_id = self.org_id;
269        let count: i64 = self
270            .db
271            .with_conn(|conn| {
272                conn.query_row(
273                    "SELECT COUNT(*) FROM local_schedules
274                     WHERE org_id = ?1 AND session_id = ?2 AND enabled = 1",
275                    rusqlite::params![org_id, session],
276                    |row| row.get(0),
277                )
278            })
279            .map_err(AgentLoopError::from)?;
280        Ok(count as u32)
281    }
282
283    async fn count_active_org_schedules(&self) -> Result<u32> {
284        let org_id = self.org_id;
285        let count: i64 = self
286            .db
287            .with_conn(|conn| {
288                conn.query_row(
289                    "SELECT COUNT(*) FROM local_schedules
290                     WHERE org_id = ?1 AND enabled = 1",
291                    rusqlite::params![org_id],
292                    |row| row.get(0),
293                )
294            })
295            .map_err(AgentLoopError::from)?;
296        Ok(count as u32)
297    }
298}