use async_trait::async_trait;
use chrono::{DateTime, Utc};
use everruns_core::error::{AgentLoopError, Result};
use everruns_core::session_schedule::SessionSchedule;
use everruns_core::traits::SessionScheduleStore;
use everruns_core::typed_id::{PrincipalId, ScheduleId, SessionId};
use rusqlite::OptionalExtension;
use serde_json::Value;
use crate::db::SqliteDb;
use crate::error::LocalError;
#[derive(Clone)]
pub struct LocalScheduleStore {
db: SqliteDb,
org_id: i64,
owner_principal_id: PrincipalId,
}
impl LocalScheduleStore {
pub fn new(db: SqliteDb, org_id: i64, owner_principal_id: PrincipalId) -> Result<Self> {
Self::ensure_schema(&db)?;
Ok(Self::scoped(db, org_id, owner_principal_id))
}
fn ensure_schema(db: &SqliteDb) -> Result<()> {
db.with_conn(|conn| {
conn.execute_batch(
"CREATE TABLE IF NOT EXISTS local_schedules (
id TEXT PRIMARY KEY,
org_id INTEGER NOT NULL,
session_id TEXT NOT NULL,
enabled INTEGER NOT NULL,
snapshot TEXT NOT NULL,
metadata TEXT NOT NULL DEFAULT '{}'
);
CREATE INDEX IF NOT EXISTS idx_local_schedules_session
ON local_schedules(org_id, session_id);",
)
})
.map_err(AgentLoopError::from)?;
Ok(())
}
pub(crate) fn scoped(db: SqliteDb, org_id: i64, owner_principal_id: PrincipalId) -> Self {
Self {
db,
org_id,
owner_principal_id,
}
}
fn build_schedule(
&self,
session_id: SessionId,
description: String,
cron_expression: Option<String>,
scheduled_at: Option<DateTime<Utc>>,
timezone: String,
) -> SessionSchedule {
let now = Utc::now();
SessionSchedule {
id: ScheduleId::new(),
session_id,
owner_principal_id: self.owner_principal_id,
resolved_owner_user_id: None,
owner: None,
effective_owner: None,
description,
cron_expression: cron_expression.clone(),
scheduled_at,
timezone,
enabled: true,
schedule_type: SessionSchedule::derive_type(&cron_expression),
next_trigger_at: scheduled_at,
last_triggered_at: None,
trigger_count: 0,
created_at: now,
updated_at: now,
}
}
fn insert(&self, schedule: &SessionSchedule, metadata: &Value) -> Result<()> {
let snapshot = serde_json::to_string(schedule)
.map_err(|e| AgentLoopError::from(LocalError::from(e)))?;
let metadata_json = serde_json::to_string(metadata)
.map_err(|e| AgentLoopError::from(LocalError::from(e)))?;
let id = schedule.id.to_string();
let session = schedule.session_id.to_string();
let enabled = schedule.enabled as i64;
let org_id = self.org_id;
self.db
.with_conn(|conn| {
conn.execute(
"INSERT INTO local_schedules (id, org_id, session_id, enabled, snapshot, metadata)
VALUES (?1, ?2, ?3, ?4, ?5, ?6)
ON CONFLICT(id) DO UPDATE SET
enabled = excluded.enabled,
snapshot = excluded.snapshot,
metadata = excluded.metadata",
rusqlite::params![id, org_id, session, enabled, snapshot, metadata_json],
)
})
.map_err(AgentLoopError::from)?;
Ok(())
}
fn load(&self, schedule_id: ScheduleId) -> Result<Option<SessionSchedule>> {
let id = schedule_id.to_string();
let org_id = self.org_id;
let snapshot: Option<String> = self
.db
.with_conn(|conn| {
conn.query_row(
"SELECT snapshot FROM local_schedules WHERE id = ?1 AND org_id = ?2",
rusqlite::params![id, org_id],
|row| row.get(0),
)
.optional()
})
.map_err(AgentLoopError::from)?;
match snapshot {
Some(json) => Ok(Some(
serde_json::from_str(&json)
.map_err(|e| AgentLoopError::from(LocalError::from(e)))?,
)),
None => Ok(None),
}
}
pub async fn create_schedule_with_metadata(
&self,
session_id: SessionId,
description: String,
cron_expression: Option<String>,
scheduled_at: Option<DateTime<Utc>>,
timezone: String,
metadata: Value,
) -> Result<SessionSchedule> {
let schedule = self.build_schedule(
session_id,
description,
cron_expression,
scheduled_at,
timezone,
);
self.insert(&schedule, &metadata)?;
Ok(schedule)
}
pub async fn get_metadata(&self, schedule_id: ScheduleId) -> Result<Option<Value>> {
let id = schedule_id.to_string();
let org_id = self.org_id;
let metadata: Option<String> = self
.db
.with_conn(|conn| {
conn.query_row(
"SELECT metadata FROM local_schedules WHERE id = ?1 AND org_id = ?2",
rusqlite::params![id, org_id],
|row| row.get(0),
)
.optional()
})
.map_err(AgentLoopError::from)?;
match metadata {
Some(json) => Ok(Some(
serde_json::from_str(&json)
.map_err(|e| AgentLoopError::from(LocalError::from(e)))?,
)),
None => Ok(None),
}
}
}
#[async_trait]
impl SessionScheduleStore for LocalScheduleStore {
async fn create_schedule(
&self,
session_id: SessionId,
description: String,
cron_expression: Option<String>,
scheduled_at: Option<DateTime<Utc>>,
timezone: String,
) -> Result<SessionSchedule> {
self.create_schedule_with_metadata(
session_id,
description,
cron_expression,
scheduled_at,
timezone,
Value::Object(Default::default()),
)
.await
}
async fn cancel_schedule(
&self,
_session_id: SessionId,
schedule_id: ScheduleId,
) -> Result<SessionSchedule> {
let mut schedule = self
.load(schedule_id)?
.ok_or_else(|| AgentLoopError::tool("schedule not found".to_string()))?;
schedule.enabled = false;
schedule.updated_at = Utc::now();
let metadata = self
.get_metadata(schedule_id)
.await?
.unwrap_or_else(|| Value::Object(Default::default()));
self.insert(&schedule, &metadata)?;
Ok(schedule)
}
async fn list_schedules(&self, session_id: SessionId) -> Result<Vec<SessionSchedule>> {
let session = session_id.to_string();
let org_id = self.org_id;
let snapshots: Vec<String> = self
.db
.with_conn(|conn| {
let mut stmt = conn.prepare(
"SELECT snapshot FROM local_schedules
WHERE org_id = ?1 AND session_id = ?2 ORDER BY rowid ASC",
)?;
stmt.query_map(rusqlite::params![org_id, session], |row| row.get(0))?
.collect::<rusqlite::Result<Vec<String>>>()
})
.map_err(AgentLoopError::from)?;
snapshots
.into_iter()
.map(|json| {
serde_json::from_str(&json).map_err(|e| AgentLoopError::from(LocalError::from(e)))
})
.collect()
}
async fn count_active_schedules(&self, session_id: SessionId) -> Result<u32> {
let session = session_id.to_string();
let org_id = self.org_id;
let count: i64 = self
.db
.with_conn(|conn| {
conn.query_row(
"SELECT COUNT(*) FROM local_schedules
WHERE org_id = ?1 AND session_id = ?2 AND enabled = 1",
rusqlite::params![org_id, session],
|row| row.get(0),
)
})
.map_err(AgentLoopError::from)?;
Ok(count as u32)
}
async fn count_active_org_schedules(&self) -> Result<u32> {
let org_id = self.org_id;
let count: i64 = self
.db
.with_conn(|conn| {
conn.query_row(
"SELECT COUNT(*) FROM local_schedules
WHERE org_id = ?1 AND enabled = 1",
rusqlite::params![org_id],
|row| row.get(0),
)
})
.map_err(AgentLoopError::from)?;
Ok(count as u32)
}
}