Skip to main content

ralph_core/
task_store.rs

1//! Persistent task storage with JSONL format.
2//!
3//! TaskStore provides load/save operations for the .ralph/agent/tasks.jsonl file,
4//! with convenience methods for querying and updating tasks.
5//!
6//! # Multi-loop Safety
7//!
8//! When multiple Ralph loops run concurrently (in worktrees), this store uses
9//! file locking to ensure safe concurrent access:
10//!
11//! - **Shared locks** for reading: Multiple loops can read simultaneously
12//! - **Exclusive locks** for writing: Only one loop can write at a time
13//!
14//! Use `load()` and `save()` for simple single-operation access, or use
15//! `with_exclusive_lock()` for read-modify-write operations that need atomicity.
16
17use crate::file_lock::FileLock;
18use crate::task::{Task, TaskStatus};
19use std::io;
20use std::path::Path;
21
22/// A store for managing tasks with JSONL persistence and file locking.
23pub struct TaskStore {
24    path: std::path::PathBuf,
25    tasks: Vec<Task>,
26    lock: FileLock,
27}
28
29impl TaskStore {
30    /// Loads tasks from the JSONL file at the given path.
31    ///
32    /// If the file doesn't exist, returns an empty store.
33    /// Silently skips malformed JSON lines.
34    ///
35    /// Uses a shared lock to allow concurrent reads from multiple loops.
36    pub fn load(path: &Path) -> io::Result<Self> {
37        let lock = FileLock::new(path)?;
38        let _guard = lock.shared()?;
39
40        let tasks = if path.exists() {
41            let content = std::fs::read_to_string(path)?;
42            content
43                .lines()
44                .filter(|line| !line.trim().is_empty())
45                .filter_map(|line| serde_json::from_str(line).ok())
46                .collect()
47        } else {
48            Vec::new()
49        };
50
51        Ok(Self {
52            path: path.to_path_buf(),
53            tasks,
54            lock,
55        })
56    }
57
58    /// Saves all tasks to the JSONL file.
59    ///
60    /// Creates parent directories if they don't exist.
61    /// Uses an exclusive lock to prevent concurrent writes.
62    pub fn save(&self) -> io::Result<()> {
63        let _guard = self.lock.exclusive()?;
64
65        if let Some(parent) = self.path.parent() {
66            std::fs::create_dir_all(parent)?;
67        }
68        let content: String = self
69            .tasks
70            .iter()
71            .map(|t| serde_json::to_string(t).unwrap())
72            .collect::<Vec<_>>()
73            .join("\n");
74        std::fs::write(
75            &self.path,
76            if content.is_empty() {
77                String::new()
78            } else {
79                content + "\n"
80            },
81        )
82    }
83
84    /// Reloads tasks from disk, useful after external modifications.
85    ///
86    /// Uses a shared lock to allow concurrent reads.
87    pub fn reload(&mut self) -> io::Result<()> {
88        let _guard = self.lock.shared()?;
89
90        self.tasks = if self.path.exists() {
91            let content = std::fs::read_to_string(&self.path)?;
92            content
93                .lines()
94                .filter(|line| !line.trim().is_empty())
95                .filter_map(|line| serde_json::from_str(line).ok())
96                .collect()
97        } else {
98            Vec::new()
99        };
100
101        Ok(())
102    }
103
104    /// Executes a read-modify-write operation atomically.
105    ///
106    /// Acquires an exclusive lock, reloads from disk, executes the
107    /// provided function, and saves back to disk. This ensures that
108    /// concurrent modifications from other loops are not lost.
109    ///
110    /// # Example
111    ///
112    /// ```ignore
113    /// store.with_exclusive_lock(|store| {
114    ///     let task = Task::new("New task".to_string(), 1);
115    ///     store.add(task);
116    /// })?;
117    /// ```
118    pub fn with_exclusive_lock<F, T>(&mut self, f: F) -> io::Result<T>
119    where
120        F: FnOnce(&mut Self) -> T,
121    {
122        let _guard = self.lock.exclusive()?;
123
124        // Reload to get latest changes from other loops
125        self.tasks = if self.path.exists() {
126            let content = std::fs::read_to_string(&self.path)?;
127            content
128                .lines()
129                .filter(|line| !line.trim().is_empty())
130                .filter_map(|line| serde_json::from_str(line).ok())
131                .collect()
132        } else {
133            Vec::new()
134        };
135
136        // Execute the user function
137        let result = f(self);
138
139        // Save changes
140        if let Some(parent) = self.path.parent() {
141            std::fs::create_dir_all(parent)?;
142        }
143        let content: String = self
144            .tasks
145            .iter()
146            .map(|t| serde_json::to_string(t).unwrap())
147            .collect::<Vec<_>>()
148            .join("\n");
149        std::fs::write(
150            &self.path,
151            if content.is_empty() {
152                String::new()
153            } else {
154                content + "\n"
155            },
156        )?;
157
158        Ok(result)
159    }
160
161    /// Adds a new task to the store and returns a reference to it.
162    pub fn add(&mut self, task: Task) -> &Task {
163        self.tasks.push(task);
164        self.tasks.last().unwrap()
165    }
166
167    /// Gets a task by ID (immutable reference).
168    pub fn get(&self, id: &str) -> Option<&Task> {
169        self.tasks.iter().find(|t| t.id == id)
170    }
171
172    /// Gets a task by ID (mutable reference).
173    pub fn get_mut(&mut self, id: &str) -> Option<&mut Task> {
174        self.tasks.iter_mut().find(|t| t.id == id)
175    }
176
177    /// Closes a task by ID and returns a reference to it.
178    pub fn close(&mut self, id: &str) -> Option<&Task> {
179        if let Some(task) = self.get_mut(id) {
180            task.status = TaskStatus::Closed;
181            task.closed = Some(chrono::Utc::now().to_rfc3339());
182            return self.get(id);
183        }
184        None
185    }
186
187    /// Fails a task by ID and returns a reference to it.
188    pub fn fail(&mut self, id: &str) -> Option<&Task> {
189        if let Some(task) = self.get_mut(id) {
190            task.status = TaskStatus::Failed;
191            task.closed = Some(chrono::Utc::now().to_rfc3339());
192            return self.get(id);
193        }
194        None
195    }
196
197    /// Returns all tasks as a slice.
198    pub fn all(&self) -> &[Task] {
199        &self.tasks
200    }
201
202    /// Returns all open tasks (not closed).
203    pub fn open(&self) -> Vec<&Task> {
204        self.tasks
205            .iter()
206            .filter(|t| t.status != TaskStatus::Closed)
207            .collect()
208    }
209
210    /// Returns all ready tasks (open with no pending blockers).
211    pub fn ready(&self) -> Vec<&Task> {
212        self.tasks
213            .iter()
214            .filter(|t| t.is_ready(&self.tasks))
215            .collect()
216    }
217
218    /// Returns true if there are any open tasks.
219    ///
220    /// A task is considered open if it is not Closed. This includes Failed tasks.
221    pub fn has_open_tasks(&self) -> bool {
222        self.tasks.iter().any(|t| t.status != TaskStatus::Closed)
223    }
224
225    /// Returns true if there are any pending (non-terminal) tasks.
226    ///
227    /// A task is pending if its status is not terminal (i.e., not Closed or Failed).
228    /// Use this when you need to check if there's active work remaining.
229    pub fn has_pending_tasks(&self) -> bool {
230        self.tasks.iter().any(|t| !t.status.is_terminal())
231    }
232}
233
234#[cfg(test)]
235mod tests {
236    use super::*;
237    use tempfile::TempDir;
238
239    #[test]
240    fn test_load_nonexistent_file() {
241        let tmp = TempDir::new().unwrap();
242        let path = tmp.path().join("tasks.jsonl");
243        let store = TaskStore::load(&path).unwrap();
244        assert_eq!(store.all().len(), 0);
245    }
246
247    #[test]
248    fn test_add_and_save() {
249        let tmp = TempDir::new().unwrap();
250        let path = tmp.path().join("tasks.jsonl");
251
252        let mut store = TaskStore::load(&path).unwrap();
253        let task = Task::new("Test task".to_string(), 1);
254        store.add(task);
255        store.save().unwrap();
256
257        let loaded = TaskStore::load(&path).unwrap();
258        assert_eq!(loaded.all().len(), 1);
259        assert_eq!(loaded.all()[0].title, "Test task");
260    }
261
262    #[test]
263    fn test_get_task() {
264        let tmp = TempDir::new().unwrap();
265        let path = tmp.path().join("tasks.jsonl");
266        let mut store = TaskStore::load(&path).unwrap();
267        let task = Task::new("Test".to_string(), 1);
268        let id = task.id.clone();
269        store.add(task);
270
271        assert!(store.get(&id).is_some());
272        assert_eq!(store.get(&id).unwrap().title, "Test");
273    }
274
275    #[test]
276    fn test_close_task() {
277        let tmp = TempDir::new().unwrap();
278        let path = tmp.path().join("tasks.jsonl");
279        let mut store = TaskStore::load(&path).unwrap();
280        let task = Task::new("Test".to_string(), 1);
281        let id = task.id.clone();
282        store.add(task);
283
284        let closed = store.close(&id).unwrap();
285        assert_eq!(closed.status, TaskStatus::Closed);
286        assert!(closed.closed.is_some());
287    }
288
289    #[test]
290    fn test_open_tasks() {
291        let tmp = TempDir::new().unwrap();
292        let path = tmp.path().join("tasks.jsonl");
293        let mut store = TaskStore::load(&path).unwrap();
294
295        let task1 = Task::new("Open 1".to_string(), 1);
296        store.add(task1);
297
298        let mut task2 = Task::new("Closed".to_string(), 1);
299        task2.status = TaskStatus::Closed;
300        store.add(task2);
301
302        assert_eq!(store.open().len(), 1);
303    }
304
305    #[test]
306    fn test_ready_tasks() {
307        let tmp = TempDir::new().unwrap();
308        let path = tmp.path().join("tasks.jsonl");
309        let mut store = TaskStore::load(&path).unwrap();
310
311        let task1 = Task::new("Ready".to_string(), 1);
312        let id1 = task1.id.clone();
313        store.add(task1);
314
315        let mut task2 = Task::new("Blocked".to_string(), 1);
316        task2.blocked_by.push(id1);
317        store.add(task2);
318
319        let ready = store.ready();
320        assert_eq!(ready.len(), 1);
321        assert_eq!(ready[0].title, "Ready");
322    }
323
324    #[test]
325    fn test_has_open_tasks() {
326        let tmp = TempDir::new().unwrap();
327        let path = tmp.path().join("tasks.jsonl");
328        let mut store = TaskStore::load(&path).unwrap();
329
330        assert!(!store.has_open_tasks());
331
332        let task = Task::new("Test".to_string(), 1);
333        store.add(task);
334
335        assert!(store.has_open_tasks());
336    }
337
338    #[test]
339    fn test_has_pending_tasks_excludes_failed() {
340        let tmp = TempDir::new().unwrap();
341        let path = tmp.path().join("tasks.jsonl");
342        let mut store = TaskStore::load(&path).unwrap();
343
344        // Empty store has no pending tasks
345        assert!(!store.has_pending_tasks());
346
347        // Add an open task - should have pending
348        let task1 = Task::new("Open task".to_string(), 1);
349        store.add(task1);
350        assert!(store.has_pending_tasks());
351
352        // Close the task - should have no pending
353        let id = store.all()[0].id.clone();
354        store.close(&id);
355        assert!(!store.has_pending_tasks());
356    }
357
358    #[test]
359    fn test_has_pending_tasks_failed_is_terminal() {
360        let tmp = TempDir::new().unwrap();
361        let path = tmp.path().join("tasks.jsonl");
362        let mut store = TaskStore::load(&path).unwrap();
363
364        // Add a task and fail it
365        let task = Task::new("Failed task".to_string(), 1);
366        store.add(task);
367        let id = store.all()[0].id.clone();
368        store.fail(&id);
369
370        // Failed tasks are terminal, so no pending tasks
371        assert!(!store.has_pending_tasks());
372
373        // But has_open_tasks returns true (Failed != Closed)
374        assert!(store.has_open_tasks());
375    }
376
377    #[test]
378    fn test_reload() {
379        let tmp = TempDir::new().unwrap();
380        let path = tmp.path().join("tasks.jsonl");
381
382        // Create and save initial store
383        let mut store1 = TaskStore::load(&path).unwrap();
384        store1.add(Task::new("Task 1".to_string(), 1));
385        store1.save().unwrap();
386
387        // Create second store that reads the same file
388        let mut store2 = TaskStore::load(&path).unwrap();
389        store2.add(Task::new("Task 2".to_string(), 1));
390        store2.save().unwrap();
391
392        // Reload first store to see changes
393        store1.reload().unwrap();
394        assert_eq!(store1.all().len(), 2);
395    }
396
397    #[test]
398    fn test_with_exclusive_lock() {
399        let tmp = TempDir::new().unwrap();
400        let path = tmp.path().join("tasks.jsonl");
401
402        let mut store = TaskStore::load(&path).unwrap();
403
404        // Use with_exclusive_lock for atomic operation
405        store
406            .with_exclusive_lock(|s| {
407                s.add(Task::new("Atomic task".to_string(), 1));
408            })
409            .unwrap();
410
411        // Verify the task was saved
412        let loaded = TaskStore::load(&path).unwrap();
413        assert_eq!(loaded.all().len(), 1);
414        assert_eq!(loaded.all()[0].title, "Atomic task");
415    }
416
417    #[test]
418    fn test_concurrent_writes_with_lock() {
419        use std::sync::{Arc, Barrier};
420        use std::thread;
421
422        let tmp = TempDir::new().unwrap();
423        let path = tmp.path().join("tasks.jsonl");
424        let path_clone = path.clone();
425
426        let barrier = Arc::new(Barrier::new(2));
427        let barrier_clone = barrier.clone();
428
429        // Thread 1: Add task 1
430        let handle1 = thread::spawn(move || {
431            let mut store = TaskStore::load(&path).unwrap();
432            barrier.wait();
433
434            store
435                .with_exclusive_lock(|s| {
436                    s.add(Task::new("Task from thread 1".to_string(), 1));
437                })
438                .unwrap();
439        });
440
441        // Thread 2: Add task 2
442        let handle2 = thread::spawn(move || {
443            let mut store = TaskStore::load(&path_clone).unwrap();
444            barrier_clone.wait();
445
446            store
447                .with_exclusive_lock(|s| {
448                    s.add(Task::new("Task from thread 2".to_string(), 1));
449                })
450                .unwrap();
451        });
452
453        handle1.join().unwrap();
454        handle2.join().unwrap();
455
456        // Both tasks should be present
457        let final_store = TaskStore::load(tmp.path().join("tasks.jsonl").as_ref()).unwrap();
458        assert_eq!(final_store.all().len(), 2);
459    }
460}