tokio_task_scheduler/
persistence.rs

1//! Task persistence and storage
2//! 
3//! This module provides functionality for persisting tasks to a SQLite database,
4//! allowing tasks to survive application restarts and system reboots. It handles:
5//! 
6//! - Task serialization and deserialization
7//! - Database operations (create, read, update, delete)
8//! - Error handling for persistence operations
9//! 
10//! The module uses SQLite as its storage backend for reliability and simplicity.
11
12use std::path::Path;
13use std::sync::Arc;
14use rusqlite::{Connection, params};
15use tokio::sync::Mutex;
16use serde::{Serialize, Deserialize};
17use chrono::{DateTime, Utc};
18
19use crate::task::{Task, TaskStatus};
20use crate::error::SchedulerError;
21
22/// A serializable representation of a task for database storage
23/// 
24/// This struct contains all task data that needs to be persisted,
25/// with appropriate serialization support for database storage.
26/// 
27/// # Fields
28/// 
29/// * `id` - Unique identifier for the task
30/// * `name` - Human-readable name
31/// * `status` - Current execution status
32/// * `created_at` - Creation timestamp
33/// * `last_executed` - Last execution timestamp
34/// * `next_execution` - Next scheduled execution
35/// * `interval_seconds` - Execution interval in seconds
36/// * `daily_time` - Specific time for daily tasks
37#[derive(Debug, Serialize, Deserialize, Clone)]
38pub struct PersistableTask {
39    pub id: String,
40    pub name: String,
41    pub status: TaskStatus,
42    pub created_at: DateTime<Utc>,
43    pub last_executed: Option<DateTime<Utc>>,
44    pub next_execution: Option<DateTime<Utc>>,
45    pub interval_seconds: Option<u64>,
46    pub daily_time: Option<String>,
47}
48
49/// Manages task persistence using a SQLite database
50/// 
51/// This struct provides methods for saving and loading tasks from a
52/// SQLite database, ensuring task data survives between application
53/// restarts.
54pub struct TaskPersistenceManager {
55    conn: Arc<Mutex<Connection>>,
56}
57
58impl TaskPersistenceManager {
59    /// Creates a new persistence manager with the specified database
60    /// 
61    /// This method initializes the database connection and creates the
62    /// necessary tables if they don't exist.
63    /// 
64    /// # Arguments
65    /// 
66    /// * `database_path` - Path to the SQLite database file
67    /// 
68    /// # Returns
69    /// 
70    /// * `Ok(TaskPersistenceManager)` - Successfully created manager
71    /// * `Err(SchedulerError)` - If database initialization fails
72    pub async fn new<P: AsRef<Path>>(database_path: P) -> Result<Self, SchedulerError> {
73        let conn = Connection::open(database_path)
74            .map_err(|e| SchedulerError::PersistenceError(e.to_string()))?;
75        
76        // Create tasks table if not exists
77        conn.execute(
78            r#"
79            CREATE TABLE IF NOT EXISTS tasks (
80                id TEXT PRIMARY KEY,
81                name TEXT NOT NULL,
82                status TEXT NOT NULL,
83                created_at TEXT NOT NULL,
84                last_executed TEXT,
85                next_execution TEXT,
86                interval_seconds INTEGER,
87                daily_time TEXT
88            )
89            "#,
90            [],
91        )
92        .map_err(|e| SchedulerError::PersistenceError(e.to_string()))?;
93        
94        Ok(Self {
95            conn: Arc::new(Mutex::new(conn)),
96        })
97    }
98
99    /// Saves a task to the database
100    /// 
101    /// This method serializes and stores a task in the database. If a task
102    /// with the same ID already exists, it will be updated.
103    /// 
104    /// # Arguments
105    /// 
106    /// * `task` - The task to save
107    /// 
108    /// # Returns
109    /// 
110    /// * `Ok(())` - Task was saved successfully
111    /// * `Err(SchedulerError)` - If the save operation fails
112    pub async fn save_task(&self, task: &Task) -> Result<(), SchedulerError> {
113        let persistable_task = PersistableTask {
114            id: task.id().to_string(),
115            name: task.name().to_string(),
116            status: task.get_status().await,  
117            created_at: task.created_at,
118            last_executed: task.last_run,
119            next_execution: task.next_run,
120            interval_seconds: task.interval.map(|d| d.num_seconds() as u64),
121            daily_time: None,
122        };
123
124        let conn = self.conn.lock().await;
125        
126        conn.execute(
127            r#"
128            INSERT OR REPLACE INTO tasks 
129            (id, name, status, created_at, last_executed, next_execution, interval_seconds, daily_time)
130            VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8)
131            "#,
132            params![
133                &persistable_task.id,
134                &persistable_task.name,
135                persistable_task.status.to_string(),
136                persistable_task.created_at.to_rfc3339(),
137                persistable_task.last_executed.map(|dt| dt.to_rfc3339()),
138                persistable_task.next_execution.map(|dt| dt.to_rfc3339()),
139                persistable_task.interval_seconds,
140                persistable_task.daily_time,
141            ],
142        )
143        .map_err(|e| SchedulerError::PersistenceError(e.to_string()))?;
144
145        Ok(())
146    }
147
148    /// Retrieves a task from the database by its ID
149    /// 
150    /// # Arguments
151    /// 
152    /// * `task_id` - The ID of the task to retrieve
153    /// 
154    /// # Returns
155    /// 
156    /// * `Ok(Some(PersistableTask))` - Task found and retrieved
157    /// * `Ok(None)` - No task found with the given ID
158    /// * `Err(SchedulerError)` - If the retrieval operation fails
159    pub async fn get_task(&self, task_id: &str) -> Result<Option<PersistableTask>, SchedulerError> {
160        let conn = self.conn.lock().await;
161        
162        let mut stmt = conn.prepare(
163            r#"
164            SELECT 
165                id, name, status, created_at, last_executed, 
166                next_execution, interval_seconds, daily_time 
167            FROM tasks 
168            WHERE id = ?1
169            "#,
170        )
171        .map_err(|e| SchedulerError::PersistenceError(e.to_string()))?;
172
173        let task = stmt.query_row(
174            params![task_id],
175            |row| {
176                Ok(PersistableTask {
177                    id: row.get(0)?,
178                    name: row.get(1)?,
179                    status: TaskStatus::from(row.get::<_, String>(2)?),
180                    created_at: DateTime::parse_from_rfc3339(&row.get::<_, String>(3)?)
181                        .map_err(|e| rusqlite::Error::FromSqlConversionFailure(
182                            0,
183                            rusqlite::types::Type::Text,
184                            Box::new(e),
185                        ))?.with_timezone(&Utc),
186                    last_executed: row.get::<_, Option<String>>(4)?
187                        .and_then(|s| DateTime::parse_from_rfc3339(&s)
188                            .map(|dt| dt.with_timezone(&Utc))
189                            .ok()),
190                    next_execution: row.get::<_, Option<String>>(5)?
191                        .and_then(|s| DateTime::parse_from_rfc3339(&s)
192                            .map(|dt| dt.with_timezone(&Utc))
193                            .ok()),
194                    interval_seconds: row.get(6)?,
195                    daily_time: row.get(7)?,
196                })
197            },
198        );
199
200        match task {
201            Ok(task) => Ok(Some(task)),
202            Err(rusqlite::Error::QueryReturnedNoRows) => Ok(None),
203            Err(e) => Err(SchedulerError::PersistenceError(e.to_string())),
204        }
205    }
206
207    /// Lists all tasks stored in the database
208    /// 
209    /// This method retrieves all tasks currently stored in the database.
210    /// 
211    /// # Returns
212    /// 
213    /// * `Ok(Vec<PersistableTask>)` - List of all stored tasks
214    /// * `Err(SchedulerError)` - If the retrieval operation fails
215    pub async fn list_tasks(&self) -> Result<Vec<PersistableTask>, SchedulerError> {
216        let conn = self.conn.lock().await;
217        
218        let mut stmt = conn.prepare(
219            r#"
220            SELECT 
221                id, name, status, created_at, last_executed, 
222                next_execution, interval_seconds, daily_time 
223            FROM tasks
224            "#,
225        )
226        .map_err(|e| SchedulerError::PersistenceError(e.to_string()))?;
227
228        let tasks = stmt.query_map([], |row| {
229            Ok(PersistableTask {
230                id: row.get(0)?,
231                name: row.get(1)?,
232                status: TaskStatus::from(row.get::<_, String>(2)?),
233                created_at: DateTime::parse_from_rfc3339(&row.get::<_, String>(3)?)
234                    .map_err(|e| rusqlite::Error::FromSqlConversionFailure(
235                        0,
236                        rusqlite::types::Type::Text,
237                        Box::new(e),
238                    ))?.with_timezone(&Utc),
239                last_executed: row.get::<_, Option<String>>(4)?
240                    .and_then(|s| DateTime::parse_from_rfc3339(&s)
241                        .map(|dt| dt.with_timezone(&Utc))
242                        .ok()),
243                next_execution: row.get::<_, Option<String>>(5)?
244                    .and_then(|s| DateTime::parse_from_rfc3339(&s)
245                        .map(|dt| dt.with_timezone(&Utc))
246                        .ok()),
247                interval_seconds: row.get(6)?,
248                daily_time: row.get(7)?,
249            })
250        })
251        .map_err(|e| SchedulerError::PersistenceError(e.to_string()))?;
252
253        let mut result = Vec::new();
254        for task in tasks {
255            result.push(task.map_err(|e| SchedulerError::PersistenceError(e.to_string()))?);
256        }
257
258        Ok(result)
259    }
260
261    /// Deletes a task from the database
262    /// 
263    /// # Arguments
264    /// 
265    /// * `task_id` - The ID of the task to delete
266    /// 
267    /// # Returns
268    /// 
269    /// * `Ok(())` - Task was deleted or didn't exist
270    /// * `Err(SchedulerError)` - If the deletion operation fails
271    pub async fn delete_task(&self, task_id: &str) -> Result<(), SchedulerError> {
272        let conn = self.conn.lock().await;
273        
274        conn.execute("DELETE FROM tasks WHERE id = ?1", params![task_id])
275            .map_err(|e| SchedulerError::PersistenceError(e.to_string()))?;
276
277        Ok(())
278    }
279
280    /// Removes all tasks from the database
281    /// 
282    /// This method deletes all stored tasks, effectively resetting
283    /// the persistence store.
284    /// 
285    /// # Returns
286    /// 
287    /// * `Ok(())` - All tasks were cleared successfully
288    /// * `Err(SchedulerError)` - If the clear operation fails
289    pub async fn clear_tasks(&self) -> Result<(), SchedulerError> {
290        let conn = self.conn.lock().await;
291        
292        conn.execute("DELETE FROM tasks", [])
293            .map_err(|e| SchedulerError::PersistenceError(e.to_string()))?;
294
295        Ok(())
296    }
297}
298
299#[cfg(test)]
300mod tests {
301    use super::*;
302    use tempfile::NamedTempFile;
303    use crate::task::TaskBuilder;
304
305    #[tokio::test]
306    async fn test_task_persistence() -> Result<(), Box<dyn std::error::Error>> {
307        // Create a temporary database file
308        let temp_db = NamedTempFile::new()?;
309        let persistence_manager = TaskPersistenceManager::new(temp_db.path()).await?;
310
311        // Create a sample task
312        let task = TaskBuilder::new("test_task", || Ok(()))
313            .every_seconds(10)
314            .build();
315
316        // Save the task
317        persistence_manager.save_task(&task).await?;
318
319        // Retrieve the task
320        let retrieved_task = persistence_manager.get_task(&task.id().to_string()).await?;
321        assert!(retrieved_task.is_some());
322        assert_eq!(retrieved_task.unwrap().name, "test_task");
323
324        // List tasks
325        let tasks = persistence_manager.list_tasks().await?;
326        assert_eq!(tasks.len(), 1);
327
328        // Delete task
329        persistence_manager.delete_task(&task.id().to_string()).await?;
330        let tasks = persistence_manager.list_tasks().await?;
331        assert_eq!(tasks.len(), 0);
332
333        Ok(())
334    }
335}