1use async_trait::async_trait;
15use chrono::{DateTime, Utc};
16use everruns_core::error::{AgentLoopError, Result};
17use everruns_core::session_schedule::SessionSchedule;
18use everruns_core::traits::SessionScheduleStore;
19use everruns_core::typed_id::{PrincipalId, ScheduleId, SessionId};
20use rusqlite::OptionalExtension;
21use serde_json::Value;
22
23use crate::db::SqliteDb;
24use crate::error::LocalError;
25
26#[derive(Clone)]
28pub struct LocalScheduleStore {
29 db: SqliteDb,
30 org_id: i64,
32 owner_principal_id: PrincipalId,
34}
35
36impl LocalScheduleStore {
37 pub fn new(db: SqliteDb, org_id: i64, owner_principal_id: PrincipalId) -> Result<Self> {
39 Self::ensure_schema(&db)?;
40 Ok(Self::scoped(db, org_id, owner_principal_id))
41 }
42
43 fn ensure_schema(db: &SqliteDb) -> Result<()> {
47 db.with_conn(|conn| {
48 conn.execute_batch(
49 "CREATE TABLE IF NOT EXISTS local_schedules (
50 id TEXT PRIMARY KEY,
51 org_id INTEGER NOT NULL,
52 session_id TEXT NOT NULL,
53 enabled INTEGER NOT NULL,
54 snapshot TEXT NOT NULL,
55 metadata TEXT NOT NULL DEFAULT '{}'
56 );
57 CREATE INDEX IF NOT EXISTS idx_local_schedules_session
58 ON local_schedules(org_id, session_id);",
59 )
60 })
61 .map_err(AgentLoopError::from)?;
62 Ok(())
63 }
64
65 pub(crate) fn scoped(db: SqliteDb, org_id: i64, owner_principal_id: PrincipalId) -> Self {
69 Self {
70 db,
71 org_id,
72 owner_principal_id,
73 }
74 }
75
76 fn build_schedule(
77 &self,
78 session_id: SessionId,
79 description: String,
80 cron_expression: Option<String>,
81 scheduled_at: Option<DateTime<Utc>>,
82 timezone: String,
83 ) -> SessionSchedule {
84 let now = Utc::now();
85 SessionSchedule {
86 id: ScheduleId::new(),
87 session_id,
88 owner_principal_id: self.owner_principal_id,
89 resolved_owner_user_id: None,
90 owner: None,
91 effective_owner: None,
92 description,
93 cron_expression: cron_expression.clone(),
94 scheduled_at,
95 timezone,
96 enabled: true,
97 schedule_type: SessionSchedule::derive_type(&cron_expression),
98 next_trigger_at: scheduled_at,
99 last_triggered_at: None,
100 trigger_count: 0,
101 created_at: now,
102 updated_at: now,
103 }
104 }
105
106 fn insert(&self, schedule: &SessionSchedule, metadata: &Value) -> Result<()> {
107 let snapshot = serde_json::to_string(schedule)
108 .map_err(|e| AgentLoopError::from(LocalError::from(e)))?;
109 let metadata_json = serde_json::to_string(metadata)
110 .map_err(|e| AgentLoopError::from(LocalError::from(e)))?;
111 let id = schedule.id.to_string();
112 let session = schedule.session_id.to_string();
113 let enabled = schedule.enabled as i64;
114 let org_id = self.org_id;
115 self.db
116 .with_conn(|conn| {
117 conn.execute(
118 "INSERT INTO local_schedules (id, org_id, session_id, enabled, snapshot, metadata)
119 VALUES (?1, ?2, ?3, ?4, ?5, ?6)
120 ON CONFLICT(id) DO UPDATE SET
121 enabled = excluded.enabled,
122 snapshot = excluded.snapshot,
123 metadata = excluded.metadata",
124 rusqlite::params![id, org_id, session, enabled, snapshot, metadata_json],
125 )
126 })
127 .map_err(AgentLoopError::from)?;
128 Ok(())
129 }
130
131 fn load(&self, schedule_id: ScheduleId) -> Result<Option<SessionSchedule>> {
132 let id = schedule_id.to_string();
133 let org_id = self.org_id;
134 let snapshot: Option<String> = self
135 .db
136 .with_conn(|conn| {
137 conn.query_row(
138 "SELECT snapshot FROM local_schedules WHERE id = ?1 AND org_id = ?2",
139 rusqlite::params![id, org_id],
140 |row| row.get(0),
141 )
142 .optional()
143 })
144 .map_err(AgentLoopError::from)?;
145 match snapshot {
146 Some(json) => Ok(Some(
147 serde_json::from_str(&json)
148 .map_err(|e| AgentLoopError::from(LocalError::from(e)))?,
149 )),
150 None => Ok(None),
151 }
152 }
153
154 pub async fn create_schedule_with_metadata(
158 &self,
159 session_id: SessionId,
160 description: String,
161 cron_expression: Option<String>,
162 scheduled_at: Option<DateTime<Utc>>,
163 timezone: String,
164 metadata: Value,
165 ) -> Result<SessionSchedule> {
166 let schedule = self.build_schedule(
167 session_id,
168 description,
169 cron_expression,
170 scheduled_at,
171 timezone,
172 );
173 self.insert(&schedule, &metadata)?;
174 Ok(schedule)
175 }
176
177 pub async fn get_metadata(&self, schedule_id: ScheduleId) -> Result<Option<Value>> {
180 let id = schedule_id.to_string();
181 let org_id = self.org_id;
182 let metadata: Option<String> = self
183 .db
184 .with_conn(|conn| {
185 conn.query_row(
186 "SELECT metadata FROM local_schedules WHERE id = ?1 AND org_id = ?2",
187 rusqlite::params![id, org_id],
188 |row| row.get(0),
189 )
190 .optional()
191 })
192 .map_err(AgentLoopError::from)?;
193 match metadata {
194 Some(json) => Ok(Some(
195 serde_json::from_str(&json)
196 .map_err(|e| AgentLoopError::from(LocalError::from(e)))?,
197 )),
198 None => Ok(None),
199 }
200 }
201}
202
203#[async_trait]
204impl SessionScheduleStore for LocalScheduleStore {
205 async fn create_schedule(
206 &self,
207 session_id: SessionId,
208 description: String,
209 cron_expression: Option<String>,
210 scheduled_at: Option<DateTime<Utc>>,
211 timezone: String,
212 ) -> Result<SessionSchedule> {
213 self.create_schedule_with_metadata(
215 session_id,
216 description,
217 cron_expression,
218 scheduled_at,
219 timezone,
220 Value::Object(Default::default()),
221 )
222 .await
223 }
224
225 async fn cancel_schedule(
226 &self,
227 _session_id: SessionId,
228 schedule_id: ScheduleId,
229 ) -> Result<SessionSchedule> {
230 let mut schedule = self
231 .load(schedule_id)?
232 .ok_or_else(|| AgentLoopError::tool("schedule not found".to_string()))?;
233 schedule.enabled = false;
234 schedule.updated_at = Utc::now();
235 let metadata = self
237 .get_metadata(schedule_id)
238 .await?
239 .unwrap_or_else(|| Value::Object(Default::default()));
240 self.insert(&schedule, &metadata)?;
241 Ok(schedule)
242 }
243
244 async fn list_schedules(&self, session_id: SessionId) -> Result<Vec<SessionSchedule>> {
245 let session = session_id.to_string();
246 let org_id = self.org_id;
247 let snapshots: Vec<String> = self
248 .db
249 .with_conn(|conn| {
250 let mut stmt = conn.prepare(
251 "SELECT snapshot FROM local_schedules
252 WHERE org_id = ?1 AND session_id = ?2 ORDER BY rowid ASC",
253 )?;
254 stmt.query_map(rusqlite::params![org_id, session], |row| row.get(0))?
255 .collect::<rusqlite::Result<Vec<String>>>()
256 })
257 .map_err(AgentLoopError::from)?;
258 snapshots
259 .into_iter()
260 .map(|json| {
261 serde_json::from_str(&json).map_err(|e| AgentLoopError::from(LocalError::from(e)))
262 })
263 .collect()
264 }
265
266 async fn count_active_schedules(&self, session_id: SessionId) -> Result<u32> {
267 let session = session_id.to_string();
268 let org_id = self.org_id;
269 let count: i64 = self
270 .db
271 .with_conn(|conn| {
272 conn.query_row(
273 "SELECT COUNT(*) FROM local_schedules
274 WHERE org_id = ?1 AND session_id = ?2 AND enabled = 1",
275 rusqlite::params![org_id, session],
276 |row| row.get(0),
277 )
278 })
279 .map_err(AgentLoopError::from)?;
280 Ok(count as u32)
281 }
282
283 async fn count_active_org_schedules(&self) -> Result<u32> {
284 let org_id = self.org_id;
285 let count: i64 = self
286 .db
287 .with_conn(|conn| {
288 conn.query_row(
289 "SELECT COUNT(*) FROM local_schedules
290 WHERE org_id = ?1 AND enabled = 1",
291 rusqlite::params![org_id],
292 |row| row.get(0),
293 )
294 })
295 .map_err(AgentLoopError::from)?;
296 Ok(count as u32)
297 }
298}