1use std::path::Path;
13use std::sync::Arc;
14use rusqlite::{Connection, params};
15use tokio::sync::Mutex;
16use serde::{Serialize, Deserialize};
17use chrono::{DateTime, Utc};
18
19use crate::task::{Task, TaskStatus};
20use crate::error::SchedulerError;
21
22#[derive(Debug, Serialize, Deserialize, Clone)]
38pub struct PersistableTask {
39 pub id: String,
40 pub name: String,
41 pub status: TaskStatus,
42 pub created_at: DateTime<Utc>,
43 pub last_executed: Option<DateTime<Utc>>,
44 pub next_execution: Option<DateTime<Utc>>,
45 pub interval_seconds: Option<u64>,
46 pub daily_time: Option<String>,
47}
48
49pub struct TaskPersistenceManager {
55 conn: Arc<Mutex<Connection>>,
56}
57
58impl TaskPersistenceManager {
59 pub async fn new<P: AsRef<Path>>(database_path: P) -> Result<Self, SchedulerError> {
73 let conn = Connection::open(database_path)
74 .map_err(|e| SchedulerError::PersistenceError(e.to_string()))?;
75
76 conn.execute(
78 r#"
79 CREATE TABLE IF NOT EXISTS tasks (
80 id TEXT PRIMARY KEY,
81 name TEXT NOT NULL,
82 status TEXT NOT NULL,
83 created_at TEXT NOT NULL,
84 last_executed TEXT,
85 next_execution TEXT,
86 interval_seconds INTEGER,
87 daily_time TEXT
88 )
89 "#,
90 [],
91 )
92 .map_err(|e| SchedulerError::PersistenceError(e.to_string()))?;
93
94 Ok(Self {
95 conn: Arc::new(Mutex::new(conn)),
96 })
97 }
98
99 pub async fn save_task(&self, task: &Task) -> Result<(), SchedulerError> {
113 let persistable_task = PersistableTask {
114 id: task.id().to_string(),
115 name: task.name().to_string(),
116 status: task.get_status().await,
117 created_at: task.created_at,
118 last_executed: task.last_run,
119 next_execution: task.next_run,
120 interval_seconds: task.interval.map(|d| d.num_seconds() as u64),
121 daily_time: None,
122 };
123
124 let conn = self.conn.lock().await;
125
126 conn.execute(
127 r#"
128 INSERT OR REPLACE INTO tasks
129 (id, name, status, created_at, last_executed, next_execution, interval_seconds, daily_time)
130 VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8)
131 "#,
132 params![
133 &persistable_task.id,
134 &persistable_task.name,
135 persistable_task.status.to_string(),
136 persistable_task.created_at.to_rfc3339(),
137 persistable_task.last_executed.map(|dt| dt.to_rfc3339()),
138 persistable_task.next_execution.map(|dt| dt.to_rfc3339()),
139 persistable_task.interval_seconds,
140 persistable_task.daily_time,
141 ],
142 )
143 .map_err(|e| SchedulerError::PersistenceError(e.to_string()))?;
144
145 Ok(())
146 }
147
148 pub async fn get_task(&self, task_id: &str) -> Result<Option<PersistableTask>, SchedulerError> {
160 let conn = self.conn.lock().await;
161
162 let mut stmt = conn.prepare(
163 r#"
164 SELECT
165 id, name, status, created_at, last_executed,
166 next_execution, interval_seconds, daily_time
167 FROM tasks
168 WHERE id = ?1
169 "#,
170 )
171 .map_err(|e| SchedulerError::PersistenceError(e.to_string()))?;
172
173 let task = stmt.query_row(
174 params![task_id],
175 |row| {
176 Ok(PersistableTask {
177 id: row.get(0)?,
178 name: row.get(1)?,
179 status: TaskStatus::from(row.get::<_, String>(2)?),
180 created_at: DateTime::parse_from_rfc3339(&row.get::<_, String>(3)?)
181 .map_err(|e| rusqlite::Error::FromSqlConversionFailure(
182 0,
183 rusqlite::types::Type::Text,
184 Box::new(e),
185 ))?.with_timezone(&Utc),
186 last_executed: row.get::<_, Option<String>>(4)?
187 .and_then(|s| DateTime::parse_from_rfc3339(&s)
188 .map(|dt| dt.with_timezone(&Utc))
189 .ok()),
190 next_execution: row.get::<_, Option<String>>(5)?
191 .and_then(|s| DateTime::parse_from_rfc3339(&s)
192 .map(|dt| dt.with_timezone(&Utc))
193 .ok()),
194 interval_seconds: row.get(6)?,
195 daily_time: row.get(7)?,
196 })
197 },
198 );
199
200 match task {
201 Ok(task) => Ok(Some(task)),
202 Err(rusqlite::Error::QueryReturnedNoRows) => Ok(None),
203 Err(e) => Err(SchedulerError::PersistenceError(e.to_string())),
204 }
205 }
206
207 pub async fn list_tasks(&self) -> Result<Vec<PersistableTask>, SchedulerError> {
216 let conn = self.conn.lock().await;
217
218 let mut stmt = conn.prepare(
219 r#"
220 SELECT
221 id, name, status, created_at, last_executed,
222 next_execution, interval_seconds, daily_time
223 FROM tasks
224 "#,
225 )
226 .map_err(|e| SchedulerError::PersistenceError(e.to_string()))?;
227
228 let tasks = stmt.query_map([], |row| {
229 Ok(PersistableTask {
230 id: row.get(0)?,
231 name: row.get(1)?,
232 status: TaskStatus::from(row.get::<_, String>(2)?),
233 created_at: DateTime::parse_from_rfc3339(&row.get::<_, String>(3)?)
234 .map_err(|e| rusqlite::Error::FromSqlConversionFailure(
235 0,
236 rusqlite::types::Type::Text,
237 Box::new(e),
238 ))?.with_timezone(&Utc),
239 last_executed: row.get::<_, Option<String>>(4)?
240 .and_then(|s| DateTime::parse_from_rfc3339(&s)
241 .map(|dt| dt.with_timezone(&Utc))
242 .ok()),
243 next_execution: row.get::<_, Option<String>>(5)?
244 .and_then(|s| DateTime::parse_from_rfc3339(&s)
245 .map(|dt| dt.with_timezone(&Utc))
246 .ok()),
247 interval_seconds: row.get(6)?,
248 daily_time: row.get(7)?,
249 })
250 })
251 .map_err(|e| SchedulerError::PersistenceError(e.to_string()))?;
252
253 let mut result = Vec::new();
254 for task in tasks {
255 result.push(task.map_err(|e| SchedulerError::PersistenceError(e.to_string()))?);
256 }
257
258 Ok(result)
259 }
260
261 pub async fn delete_task(&self, task_id: &str) -> Result<(), SchedulerError> {
272 let conn = self.conn.lock().await;
273
274 conn.execute("DELETE FROM tasks WHERE id = ?1", params![task_id])
275 .map_err(|e| SchedulerError::PersistenceError(e.to_string()))?;
276
277 Ok(())
278 }
279
280 pub async fn clear_tasks(&self) -> Result<(), SchedulerError> {
290 let conn = self.conn.lock().await;
291
292 conn.execute("DELETE FROM tasks", [])
293 .map_err(|e| SchedulerError::PersistenceError(e.to_string()))?;
294
295 Ok(())
296 }
297}
298
#[cfg(test)]
mod tests {
    use super::*;
    use crate::task::TaskBuilder;
    use tempfile::NamedTempFile;

    /// Round-trips a single task through save, lookup, listing and deletion
    /// against a database backed by a temporary file.
    #[tokio::test]
    async fn test_task_persistence() -> Result<(), Box<dyn std::error::Error>> {
        let db_file = NamedTempFile::new()?;
        let manager = TaskPersistenceManager::new(db_file.path()).await?;

        let task = TaskBuilder::new("test_task", || Ok(()))
            .every_seconds(10)
            .build();
        let task_id = task.id().to_string();

        manager.save_task(&task).await?;

        // The saved task can be fetched back by id.
        let stored = manager.get_task(&task_id).await?;
        assert!(stored.is_some());
        assert_eq!(stored.unwrap().name, "test_task");

        // It is the only row in the table.
        let listed = manager.list_tasks().await?;
        assert_eq!(listed.len(), 1);

        // After deletion the table is empty again.
        manager.delete_task(&task_id).await?;
        let remaining = manager.list_tasks().await?;
        assert_eq!(remaining.len(), 0);

        Ok(())
    }
}