Skip to main content

ralph_core/
task_store.rs

1//! Persistent task storage with JSONL format.
2//!
3//! TaskStore provides load/save operations for the .ralph/agent/tasks.jsonl file,
4//! with convenience methods for querying and updating tasks.
5//!
6//! # Multi-loop Safety
7//!
8//! When multiple Ralph loops run concurrently (in worktrees), this store uses
9//! file locking to ensure safe concurrent access:
10//!
11//! - **Shared locks** for reading: Multiple loops can read simultaneously
12//! - **Exclusive locks** for writing: Only one loop can write at a time
13//!
14//! Use `load()` and `save()` for simple single-operation access, or use
15//! `with_exclusive_lock()` for read-modify-write operations that need atomicity.
16
17use crate::file_lock::FileLock;
18use crate::task::{Task, TaskStatus};
19use std::io;
20use std::path::Path;
21use tracing::warn;
22
23/// A store for managing tasks with JSONL persistence and file locking.
24pub struct TaskStore {
25    path: std::path::PathBuf,
26    tasks: Vec<Task>,
27    lock: FileLock,
28}
29
30/// Parses a JSONL line into a Task, logging a warning on failure.
31fn parse_task_line(line: &str) -> Option<Task> {
32    match serde_json::from_str(line) {
33        Ok(task) => Some(task),
34        Err(e) => {
35            warn!(
36                error = %e,
37                line = line.chars().take(200).collect::<String>(),
38                "Skipping malformed task line in JSONL"
39            );
40            None
41        }
42    }
43}
44
45impl TaskStore {
46    /// Loads tasks from the JSONL file at the given path.
47    ///
48    /// If the file doesn't exist, returns an empty store.
49    /// Logs warnings for malformed JSON lines and skips them.
50    ///
51    /// Uses a shared lock to allow concurrent reads from multiple loops.
52    pub fn load(path: &Path) -> io::Result<Self> {
53        let lock = FileLock::new(path)?;
54        let _guard = lock.shared()?;
55
56        let tasks = if path.exists() {
57            let content = std::fs::read_to_string(path)?;
58            content
59                .lines()
60                .filter(|line| !line.trim().is_empty())
61                .filter_map(|line| parse_task_line(line))
62                .collect()
63        } else {
64            Vec::new()
65        };
66
67        Ok(Self {
68            path: path.to_path_buf(),
69            tasks,
70            lock,
71        })
72    }
73
74    /// Saves all tasks to the JSONL file.
75    ///
76    /// Creates parent directories if they don't exist.
77    /// Uses an exclusive lock to prevent concurrent writes.
78    pub fn save(&self) -> io::Result<()> {
79        let _guard = self.lock.exclusive()?;
80
81        if let Some(parent) = self.path.parent() {
82            std::fs::create_dir_all(parent)?;
83        }
84        let content: String = self
85            .tasks
86            .iter()
87            .map(|t| {
88                serde_json::to_string(t).map_err(|e| {
89                    io::Error::new(
90                        io::ErrorKind::InvalidData,
91                        format!("task serialization failed: {e}"),
92                    )
93                })
94            })
95            .collect::<Result<Vec<_>, _>>()?
96            .join("\n");
97        std::fs::write(
98            &self.path,
99            if content.is_empty() {
100                String::new()
101            } else {
102                content + "\n"
103            },
104        )
105    }
106
107    /// Reloads tasks from disk, useful after external modifications.
108    ///
109    /// Logs warnings for malformed JSON lines and skips them.
110    /// Uses a shared lock to allow concurrent reads.
111    pub fn reload(&mut self) -> io::Result<()> {
112        let _guard = self.lock.shared()?;
113
114        self.tasks = if self.path.exists() {
115            let content = std::fs::read_to_string(&self.path)?;
116            content
117                .lines()
118                .filter(|line| !line.trim().is_empty())
119                .filter_map(|line| parse_task_line(line))
120                .collect()
121        } else {
122            Vec::new()
123        };
124
125        Ok(())
126    }
127
128    /// Executes a read-modify-write operation atomically.
129    ///
130    /// Acquires an exclusive lock, reloads from disk, executes the
131    /// provided function, and saves back to disk. This ensures that
132    /// concurrent modifications from other loops are not lost.
133    ///
134    /// # Example
135    ///
136    /// ```ignore
137    /// store.with_exclusive_lock(|store| {
138    ///     let task = Task::new("New task".to_string(), 1);
139    ///     store.add(task);
140    /// })?;
141    /// ```
142    pub fn with_exclusive_lock<F, T>(&mut self, f: F) -> io::Result<T>
143    where
144        F: FnOnce(&mut Self) -> T,
145    {
146        let _guard = self.lock.exclusive()?;
147
148        // Reload to get latest changes from other loops
149        self.tasks = if self.path.exists() {
150            let content = std::fs::read_to_string(&self.path)?;
151            content
152                .lines()
153                .filter(|line| !line.trim().is_empty())
154                .filter_map(|line| parse_task_line(line))
155                .collect()
156        } else {
157            Vec::new()
158        };
159
160        // Execute the user function
161        let result = f(self);
162
163        // Save changes
164        if let Some(parent) = self.path.parent() {
165            std::fs::create_dir_all(parent)?;
166        }
167        let content: String = self
168            .tasks
169            .iter()
170            .map(|t| {
171                serde_json::to_string(t).map_err(|e| {
172                    io::Error::new(
173                        io::ErrorKind::InvalidData,
174                        format!("task serialization failed: {e}"),
175                    )
176                })
177            })
178            .collect::<Result<Vec<_>, _>>()?
179            .join("\n");
180        std::fs::write(
181            &self.path,
182            if content.is_empty() {
183                String::new()
184            } else {
185                content + "\n"
186            },
187        )?;
188
189        Ok(result)
190    }
191
192    /// Adds a new task to the store and returns a reference to it.
193    pub fn add(&mut self, task: Task) -> &Task {
194        self.tasks.push(task);
195        self.tasks.last().unwrap()
196    }
197
198    /// Gets a task by ID (immutable reference).
199    pub fn get(&self, id: &str) -> Option<&Task> {
200        self.tasks.iter().find(|t| t.id == id)
201    }
202
203    /// Gets a task by ID (mutable reference).
204    pub fn get_mut(&mut self, id: &str) -> Option<&mut Task> {
205        self.tasks.iter_mut().find(|t| t.id == id)
206    }
207
208    /// Closes a task by ID and returns a reference to it.
209    pub fn close(&mut self, id: &str) -> Option<&Task> {
210        if let Some(task) = self.get_mut(id) {
211            task.status = TaskStatus::Closed;
212            task.closed = Some(chrono::Utc::now().to_rfc3339());
213            return self.get(id);
214        }
215        None
216    }
217
218    /// Fails a task by ID and returns a reference to it.
219    pub fn fail(&mut self, id: &str) -> Option<&Task> {
220        if let Some(task) = self.get_mut(id) {
221            task.status = TaskStatus::Failed;
222            task.closed = Some(chrono::Utc::now().to_rfc3339());
223            return self.get(id);
224        }
225        None
226    }
227
228    /// Returns all tasks as a slice.
229    pub fn all(&self) -> &[Task] {
230        &self.tasks
231    }
232
233    /// Returns all open tasks (not closed).
234    pub fn open(&self) -> Vec<&Task> {
235        self.tasks
236            .iter()
237            .filter(|t| t.status != TaskStatus::Closed)
238            .collect()
239    }
240
241    /// Returns all ready tasks (open with no pending blockers).
242    pub fn ready(&self) -> Vec<&Task> {
243        self.tasks
244            .iter()
245            .filter(|t| t.is_ready(&self.tasks))
246            .collect()
247    }
248
249    /// Returns true if there are any open tasks.
250    ///
251    /// A task is considered open if it is not Closed. This includes Failed tasks.
252    pub fn has_open_tasks(&self) -> bool {
253        self.tasks.iter().any(|t| t.status != TaskStatus::Closed)
254    }
255
256    /// Returns true if there are any pending (non-terminal) tasks.
257    ///
258    /// A task is pending if its status is not terminal (i.e., not Closed or Failed).
259    /// Use this when you need to check if there's active work remaining.
260    pub fn has_pending_tasks(&self) -> bool {
261        self.tasks.iter().any(|t| !t.status.is_terminal())
262    }
263}
264
#[cfg(test)]
mod tests {
    use super::*;
    use tempfile::TempDir;

    /// Creates a fresh temp directory and the path of a (not yet existing)
    /// `tasks.jsonl` inside it. The `TempDir` must be kept alive for the
    /// duration of the test, otherwise the directory is removed.
    fn scratch() -> (TempDir, std::path::PathBuf) {
        let tmp = TempDir::new().unwrap();
        let path = tmp.path().join("tasks.jsonl");
        (tmp, path)
    }

    /// Like `scratch`, but also loads an (empty) store from the path.
    fn empty_store() -> (TempDir, TaskStore) {
        let (tmp, path) = scratch();
        let store = TaskStore::load(&path).unwrap();
        (tmp, store)
    }

    #[test]
    fn test_load_nonexistent_file() {
        let (_tmp, store) = empty_store();
        assert_eq!(store.all().len(), 0);
    }

    #[test]
    fn test_add_and_save() {
        let (_tmp, path) = scratch();

        let mut store = TaskStore::load(&path).unwrap();
        store.add(Task::new("Test task".to_string(), 1));
        store.save().unwrap();

        // A second store reading the same file must see the saved task.
        let loaded = TaskStore::load(&path).unwrap();
        assert_eq!(loaded.all().len(), 1);
        assert_eq!(loaded.all()[0].title, "Test task");
    }

    #[test]
    fn test_get_task() {
        let (_tmp, mut store) = empty_store();
        let task = Task::new("Test".to_string(), 1);
        let id = task.id.clone();
        store.add(task);

        assert!(store.get(&id).is_some());
        assert_eq!(store.get(&id).unwrap().title, "Test");
    }

    #[test]
    fn test_close_task() {
        let (_tmp, mut store) = empty_store();
        let task = Task::new("Test".to_string(), 1);
        let id = task.id.clone();
        store.add(task);

        let closed = store.close(&id).unwrap();
        assert_eq!(closed.status, TaskStatus::Closed);
        assert!(closed.closed.is_some());
    }

    #[test]
    fn test_open_tasks() {
        let (_tmp, mut store) = empty_store();

        store.add(Task::new("Open 1".to_string(), 1));

        let mut already_closed = Task::new("Closed".to_string(), 1);
        already_closed.status = TaskStatus::Closed;
        store.add(already_closed);

        assert_eq!(store.open().len(), 1);
    }

    #[test]
    fn test_ready_tasks() {
        let (_tmp, mut store) = empty_store();

        let blocker = Task::new("Ready".to_string(), 1);
        let blocker_id = blocker.id.clone();
        store.add(blocker);

        let mut dependent = Task::new("Blocked".to_string(), 1);
        dependent.blocked_by.push(blocker_id);
        store.add(dependent);

        let ready = store.ready();
        assert_eq!(ready.len(), 1);
        assert_eq!(ready[0].title, "Ready");
    }

    #[test]
    fn test_has_open_tasks() {
        let (_tmp, mut store) = empty_store();

        assert!(!store.has_open_tasks());

        store.add(Task::new("Test".to_string(), 1));

        assert!(store.has_open_tasks());
    }

    // NOTE: despite its name, this test exercises the Closed path; the Failed
    // path is covered by `test_has_pending_tasks_failed_is_terminal` below.
    #[test]
    fn test_has_pending_tasks_excludes_failed() {
        let (_tmp, mut store) = empty_store();

        // Nothing stored yet, so nothing can be pending.
        assert!(!store.has_pending_tasks());

        // A freshly created task counts as pending.
        store.add(Task::new("Open task".to_string(), 1));
        assert!(store.has_pending_tasks());

        // Once it is closed, no pending work remains.
        let id = store.all()[0].id.clone();
        store.close(&id);
        assert!(!store.has_pending_tasks());
    }

    #[test]
    fn test_has_pending_tasks_failed_is_terminal() {
        let (_tmp, mut store) = empty_store();

        // Create a task and mark it failed.
        store.add(Task::new("Failed task".to_string(), 1));
        let id = store.all()[0].id.clone();
        store.fail(&id);

        // Failed is a terminal status: nothing is pending...
        assert!(!store.has_pending_tasks());

        // ...yet the task still counts as open, because Failed != Closed.
        assert!(store.has_open_tasks());
    }

    #[test]
    fn test_reload() {
        let (_tmp, path) = scratch();

        // First store writes one task.
        let mut first = TaskStore::load(&path).unwrap();
        first.add(Task::new("Task 1".to_string(), 1));
        first.save().unwrap();

        // Second store picks that up, appends another, and saves.
        let mut second = TaskStore::load(&path).unwrap();
        second.add(Task::new("Task 2".to_string(), 1));
        second.save().unwrap();

        // After reloading, the first store sees both tasks.
        first.reload().unwrap();
        assert_eq!(first.all().len(), 2);
    }

    #[test]
    fn test_with_exclusive_lock() {
        let (_tmp, path) = scratch();

        let mut store = TaskStore::load(&path).unwrap();

        // Add the task through the atomic read-modify-write helper.
        store
            .with_exclusive_lock(|s| {
                s.add(Task::new("Atomic task".to_string(), 1));
            })
            .unwrap();

        // The helper must have persisted the task to disk.
        let loaded = TaskStore::load(&path).unwrap();
        assert_eq!(loaded.all().len(), 1);
        assert_eq!(loaded.all()[0].title, "Atomic task");
    }

    #[test]
    fn test_concurrent_writes_with_lock() {
        use std::sync::{Arc, Barrier};
        use std::thread;

        let (_tmp, path) = scratch();
        let barrier = Arc::new(Barrier::new(2));

        // Each writer loads its own store, waits until both writers are
        // loaded, then adds one task under the exclusive lock.
        let spawn_writer = |title: &'static str| {
            let path = path.clone();
            let barrier = Arc::clone(&barrier);
            thread::spawn(move || {
                let mut store = TaskStore::load(&path).unwrap();
                barrier.wait();

                store
                    .with_exclusive_lock(|s| {
                        s.add(Task::new(title.to_string(), 1));
                    })
                    .unwrap();
            })
        };

        let first = spawn_writer("Task from thread 1");
        let second = spawn_writer("Task from thread 2");

        first.join().unwrap();
        second.join().unwrap();

        // Neither write may have clobbered the other.
        let final_store = TaskStore::load(&path).unwrap();
        assert_eq!(final_store.all().len(), 2);
    }

    #[test]
    fn test_load_skips_malformed_lines() {
        let (_tmp, path) = scratch();

        // Start from a file containing exactly one valid task line.
        let mut store = TaskStore::load(&path).unwrap();
        store.add(Task::new("Valid task".to_string(), 1));
        store.save().unwrap();

        // Append one line that is not JSON and one that is JSON but not a task.
        let mut content = std::fs::read_to_string(&path).unwrap();
        for bad_line in ["this is not json\n", "{\"broken\": true}\n"] {
            content.push_str(bad_line);
        }
        std::fs::write(&path, content).unwrap();

        // Loading still succeeds and keeps only the valid task.
        let loaded = TaskStore::load(&path).unwrap();
        assert_eq!(loaded.all().len(), 1);
        assert_eq!(loaded.all()[0].title, "Valid task");
    }
}