Skip to main content

ralph_core/
task_store.rs

1//! Persistent task storage with JSONL format.
2//!
3//! TaskStore provides load/save operations for the .ralph/agent/tasks.jsonl file,
4//! with convenience methods for querying and updating tasks.
5//!
6//! # Multi-loop Safety
7//!
8//! When multiple Ralph loops run concurrently (in worktrees), this store uses
9//! file locking to ensure safe concurrent access:
10//!
11//! - **Shared locks** for reading: Multiple loops can read simultaneously
12//! - **Exclusive locks** for writing: Only one loop can write at a time
13//!
14//! Use `load()` and `save()` for simple single-operation access, or use
15//! `with_exclusive_lock()` for read-modify-write operations that need atomicity.
16
17use crate::file_lock::FileLock;
18use crate::task::{Task, TaskStatus};
19use std::io;
20use std::path::Path;
21use tracing::warn;
22
/// A store for managing tasks with JSONL persistence and file locking.
///
/// The store keeps an in-memory copy of the tasks; changes made through the
/// mutating methods only reach disk when `save()` or `with_exclusive_lock()`
/// writes them out.
pub struct TaskStore {
    // Location of the JSONL file that backs this store.
    path: std::path::PathBuf,
    // In-memory task list, in the order the tasks were read or added.
    tasks: Vec<Task>,
    // Lock handle created from `path`; used to take shared/exclusive guards.
    lock: FileLock,
}
29
30/// Parses a JSONL line into a Task, logging a warning on failure.
31fn parse_task_line(line: &str) -> Option<Task> {
32    match serde_json::from_str(line) {
33        Ok(task) => Some(task),
34        Err(e) => {
35            warn!(
36                error = %e,
37                line = line.chars().take(200).collect::<String>(),
38                "Skipping malformed task line in JSONL"
39            );
40            None
41        }
42    }
43}
44
45impl TaskStore {
46    /// Loads tasks from the JSONL file at the given path.
47    ///
48    /// If the file doesn't exist, returns an empty store.
49    /// Logs warnings for malformed JSON lines and skips them.
50    ///
51    /// Uses a shared lock to allow concurrent reads from multiple loops.
52    pub fn load(path: &Path) -> io::Result<Self> {
53        let lock = FileLock::new(path)?;
54        let _guard = lock.shared()?;
55
56        let tasks = if path.exists() {
57            let content = std::fs::read_to_string(path)?;
58            content
59                .lines()
60                .filter(|line| !line.trim().is_empty())
61                .filter_map(|line| parse_task_line(line))
62                .collect()
63        } else {
64            Vec::new()
65        };
66
67        Ok(Self {
68            path: path.to_path_buf(),
69            tasks,
70            lock,
71        })
72    }
73
74    /// Saves all tasks to the JSONL file.
75    ///
76    /// Creates parent directories if they don't exist.
77    /// Uses an exclusive lock to prevent concurrent writes.
78    pub fn save(&self) -> io::Result<()> {
79        let _guard = self.lock.exclusive()?;
80
81        if let Some(parent) = self.path.parent() {
82            std::fs::create_dir_all(parent)?;
83        }
84        let content: String = self
85            .tasks
86            .iter()
87            .map(|t| {
88                serde_json::to_string(t).map_err(|e| {
89                    io::Error::new(
90                        io::ErrorKind::InvalidData,
91                        format!("task serialization failed: {e}"),
92                    )
93                })
94            })
95            .collect::<Result<Vec<_>, _>>()?
96            .join("\n");
97        std::fs::write(
98            &self.path,
99            if content.is_empty() {
100                String::new()
101            } else {
102                content + "\n"
103            },
104        )
105    }
106
107    /// Reloads tasks from disk, useful after external modifications.
108    ///
109    /// Logs warnings for malformed JSON lines and skips them.
110    /// Uses a shared lock to allow concurrent reads.
111    pub fn reload(&mut self) -> io::Result<()> {
112        let _guard = self.lock.shared()?;
113
114        self.tasks = if self.path.exists() {
115            let content = std::fs::read_to_string(&self.path)?;
116            content
117                .lines()
118                .filter(|line| !line.trim().is_empty())
119                .filter_map(|line| parse_task_line(line))
120                .collect()
121        } else {
122            Vec::new()
123        };
124
125        Ok(())
126    }
127
128    /// Executes a read-modify-write operation atomically.
129    ///
130    /// Acquires an exclusive lock, reloads from disk, executes the
131    /// provided function, and saves back to disk. This ensures that
132    /// concurrent modifications from other loops are not lost.
133    ///
134    /// # Example
135    ///
136    /// ```ignore
137    /// store.with_exclusive_lock(|store| {
138    ///     let task = Task::new("New task".to_string(), 1);
139    ///     store.add(task);
140    /// })?;
141    /// ```
142    pub fn with_exclusive_lock<F, T>(&mut self, f: F) -> io::Result<T>
143    where
144        F: FnOnce(&mut Self) -> T,
145    {
146        let _guard = self.lock.exclusive()?;
147
148        // Reload to get latest changes from other loops
149        self.tasks = if self.path.exists() {
150            let content = std::fs::read_to_string(&self.path)?;
151            content
152                .lines()
153                .filter(|line| !line.trim().is_empty())
154                .filter_map(|line| parse_task_line(line))
155                .collect()
156        } else {
157            Vec::new()
158        };
159
160        // Execute the user function
161        let result = f(self);
162
163        // Save changes
164        if let Some(parent) = self.path.parent() {
165            std::fs::create_dir_all(parent)?;
166        }
167        let content: String = self
168            .tasks
169            .iter()
170            .map(|t| {
171                serde_json::to_string(t).map_err(|e| {
172                    io::Error::new(
173                        io::ErrorKind::InvalidData,
174                        format!("task serialization failed: {e}"),
175                    )
176                })
177            })
178            .collect::<Result<Vec<_>, _>>()?
179            .join("\n");
180        std::fs::write(
181            &self.path,
182            if content.is_empty() {
183                String::new()
184            } else {
185                content + "\n"
186            },
187        )?;
188
189        Ok(result)
190    }
191
192    /// Adds a new task to the store and returns a reference to it.
193    pub fn add(&mut self, task: Task) -> &Task {
194        self.tasks.push(task);
195        self.tasks.last().unwrap()
196    }
197
198    /// Gets a task by ID (immutable reference).
199    pub fn get(&self, id: &str) -> Option<&Task> {
200        self.tasks.iter().find(|t| t.id == id)
201    }
202
203    /// Gets a task by stable key (immutable reference).
204    pub fn get_by_key(&self, key: &str) -> Option<&Task> {
205        self.tasks.iter().find(|t| t.key.as_deref() == Some(key))
206    }
207
208    /// Gets a task by ID (mutable reference).
209    pub fn get_mut(&mut self, id: &str) -> Option<&mut Task> {
210        self.tasks.iter_mut().find(|t| t.id == id)
211    }
212
213    /// Gets a task by stable key (mutable reference).
214    pub fn get_by_key_mut(&mut self, key: &str) -> Option<&mut Task> {
215        self.tasks
216            .iter_mut()
217            .find(|t| t.key.as_deref() == Some(key))
218    }
219
220    /// Closes a task by ID and returns a reference to it.
221    pub fn close(&mut self, id: &str) -> Option<&Task> {
222        if let Some(task) = self.get_mut(id) {
223            task.status = TaskStatus::Closed;
224            task.closed = Some(chrono::Utc::now().to_rfc3339());
225            return self.get(id);
226        }
227        None
228    }
229
230    /// Starts a task by ID and returns a reference to it.
231    pub fn start(&mut self, id: &str) -> Option<&Task> {
232        if let Some(task) = self.get_mut(id) {
233            task.start();
234            return self.get(id);
235        }
236        None
237    }
238
239    /// Fails a task by ID and returns a reference to it.
240    pub fn fail(&mut self, id: &str) -> Option<&Task> {
241        if let Some(task) = self.get_mut(id) {
242            task.status = TaskStatus::Failed;
243            task.closed = Some(chrono::Utc::now().to_rfc3339());
244            return self.get(id);
245        }
246        None
247    }
248
249    /// Reopens a task by ID and returns a reference to it.
250    pub fn reopen(&mut self, id: &str) -> Option<&Task> {
251        if let Some(task) = self.get_mut(id) {
252            task.reopen();
253            return self.get(id);
254        }
255        None
256    }
257
258    /// Ensures a task exists for a stable key, returning the existing or created task.
259    ///
260    /// If a task with the same key already exists, its non-lifecycle metadata is refreshed and
261    /// the existing task is returned.
262    pub fn ensure(&mut self, task: Task) -> &Task {
263        if let Some(key) = task.key.as_deref()
264            && let Some(existing_idx) = self
265                .tasks
266                .iter()
267                .position(|existing| existing.key.as_deref() == Some(key))
268        {
269            let existing = &mut self.tasks[existing_idx];
270            existing.title = task.title;
271            existing.priority = task.priority;
272            if task.description.is_some() {
273                existing.description = task.description;
274            }
275            if !task.blocked_by.is_empty() {
276                existing.blocked_by = task.blocked_by;
277            }
278            return &self.tasks[existing_idx];
279        }
280
281        self.tasks.push(task);
282        self.tasks.last().unwrap()
283    }
284
285    /// Returns all tasks as a slice.
286    pub fn all(&self) -> &[Task] {
287        &self.tasks
288    }
289
290    /// Returns all open tasks (not closed).
291    pub fn open(&self) -> Vec<&Task> {
292        self.tasks
293            .iter()
294            .filter(|t| t.status != TaskStatus::Closed)
295            .collect()
296    }
297
298    /// Returns all ready tasks (open with no pending blockers).
299    pub fn ready(&self) -> Vec<&Task> {
300        self.tasks
301            .iter()
302            .filter(|t| t.is_ready(&self.tasks))
303            .collect()
304    }
305
306    /// Returns true if there are any open tasks.
307    ///
308    /// A task is considered open if it is not Closed. This includes Failed tasks.
309    pub fn has_open_tasks(&self) -> bool {
310        self.tasks.iter().any(|t| t.status != TaskStatus::Closed)
311    }
312
313    /// Returns true if there are any pending (non-terminal) tasks.
314    ///
315    /// A task is pending if its status is not terminal (i.e., not Closed or Failed).
316    /// Use this when you need to check if there's active work remaining.
317    pub fn has_pending_tasks(&self) -> bool {
318        self.tasks.iter().any(|t| !t.status.is_terminal())
319    }
320}
321
#[cfg(test)]
mod tests {
    use super::*;
    use std::path::PathBuf;
    use tempfile::TempDir;

    /// Creates a fresh temporary directory and the path of a task file
    /// inside it (the file itself is not created).
    ///
    /// The returned `TempDir` must stay bound for the whole test: dropping
    /// it deletes the directory.
    fn temp_path() -> (TempDir, PathBuf) {
        let tmp = TempDir::new().unwrap();
        let path = tmp.path().join("tasks.jsonl");
        (tmp, path)
    }

    /// Loads an empty store backed by a file in a fresh temporary directory.
    fn empty_store() -> (TempDir, TaskStore) {
        let (tmp, path) = temp_path();
        let store = TaskStore::load(&path).unwrap();
        (tmp, store)
    }

    #[test]
    fn test_load_nonexistent_file() {
        let (_tmp, store) = empty_store();
        assert_eq!(store.all().len(), 0);
    }

    #[test]
    fn test_add_and_save() {
        let (_tmp, path) = temp_path();

        let mut store = TaskStore::load(&path).unwrap();
        store.add(Task::new("Test task".to_string(), 1));
        store.save().unwrap();

        let loaded = TaskStore::load(&path).unwrap();
        assert_eq!(loaded.all().len(), 1);
        assert_eq!(loaded.all()[0].title, "Test task");
    }

    #[test]
    fn test_get_task() {
        let (_tmp, mut store) = empty_store();
        let id = store.add(Task::new("Test".to_string(), 1)).id.clone();

        assert!(store.get(&id).is_some());
        assert_eq!(store.get(&id).unwrap().title, "Test");
    }

    #[test]
    fn test_get_task_by_key() {
        let (_tmp, mut store) = empty_store();
        let task = Task::new("Test".to_string(), 1).with_key(Some("phase:design".to_string()));
        store.add(task);

        assert!(store.get_by_key("phase:design").is_some());
        assert_eq!(store.get_by_key("phase:design").unwrap().title, "Test");
    }

    #[test]
    fn test_close_task() {
        let (_tmp, mut store) = empty_store();
        let id = store.add(Task::new("Test".to_string(), 1)).id.clone();

        let closed = store.close(&id).unwrap();
        assert_eq!(closed.status, TaskStatus::Closed);
        assert!(closed.closed.is_some());
    }

    #[test]
    fn test_start_task() {
        let (_tmp, mut store) = empty_store();
        let id = store.add(Task::new("Test".to_string(), 1)).id.clone();

        let started = store.start(&id).unwrap();
        assert_eq!(started.status, TaskStatus::InProgress);
        assert!(started.started.is_some());
    }

    #[test]
    fn test_fail_task() {
        let (_tmp, mut store) = empty_store();
        let id = store.add(Task::new("Test".to_string(), 1)).id.clone();

        let failed = store.fail(&id).unwrap();
        assert_eq!(failed.status, TaskStatus::Failed);
        assert!(failed.closed.is_some());
    }

    #[test]
    fn test_reopen_task() {
        let (_tmp, mut store) = empty_store();
        let id = store.add(Task::new("Test".to_string(), 1)).id.clone();
        store.close(&id);

        let reopened = store.reopen(&id).unwrap();
        assert_eq!(reopened.status, TaskStatus::Open);
        assert!(reopened.closed.is_none());
    }

    #[test]
    fn test_open_tasks() {
        let (_tmp, mut store) = empty_store();

        store.add(Task::new("Open 1".to_string(), 1));

        let mut closed = Task::new("Closed".to_string(), 1);
        closed.status = TaskStatus::Closed;
        store.add(closed);

        assert_eq!(store.open().len(), 1);
    }

    #[test]
    fn test_ready_tasks() {
        let (_tmp, mut store) = empty_store();

        let blocker_id = store.add(Task::new("Ready".to_string(), 1)).id.clone();

        let mut blocked = Task::new("Blocked".to_string(), 1);
        blocked.blocked_by.push(blocker_id);
        store.add(blocked);

        let ready = store.ready();
        assert_eq!(ready.len(), 1);
        assert_eq!(ready[0].title, "Ready");
    }

    #[test]
    fn test_ensure_deduplicates_by_key() {
        let (_tmp, mut store) = empty_store();

        let first = Task::new("First".to_string(), 1).with_key(Some("impl:task-01".to_string()));
        let second = Task::new("Second".to_string(), 3).with_key(Some("impl:task-01".to_string()));

        let id = store.ensure(first).id.clone();
        let deduped_id = store.ensure(second).id.clone();
        let deduped = store
            .get_by_key("impl:task-01")
            .expect("deduped task should exist");

        assert_eq!(store.all().len(), 1);
        assert_eq!(deduped_id, id);
        assert_eq!(deduped.title, "Second");
        assert_eq!(deduped.priority, 3);
    }

    #[test]
    fn test_has_open_tasks() {
        let (_tmp, mut store) = empty_store();

        assert!(!store.has_open_tasks());

        store.add(Task::new("Test".to_string(), 1));

        assert!(store.has_open_tasks());
    }

    #[test]
    fn test_has_pending_tasks_excludes_closed() {
        let (_tmp, mut store) = empty_store();

        // Empty store has no pending tasks
        assert!(!store.has_pending_tasks());

        // Add an open task - should have pending
        let id = store.add(Task::new("Open task".to_string(), 1)).id.clone();
        assert!(store.has_pending_tasks());

        // Close the task - should have no pending
        store.close(&id);
        assert!(!store.has_pending_tasks());
    }

    #[test]
    fn test_has_pending_tasks_failed_is_terminal() {
        let (_tmp, mut store) = empty_store();

        // Add a task and fail it
        let id = store.add(Task::new("Failed task".to_string(), 1)).id.clone();
        store.fail(&id);

        // Failed tasks are terminal, so no pending tasks
        assert!(!store.has_pending_tasks());

        // But has_open_tasks returns true (Failed != Closed)
        assert!(store.has_open_tasks());
    }

    #[test]
    fn test_reload() {
        let (_tmp, path) = temp_path();

        // Create and save initial store
        let mut store1 = TaskStore::load(&path).unwrap();
        store1.add(Task::new("Task 1".to_string(), 1));
        store1.save().unwrap();

        // Create second store that reads the same file
        let mut store2 = TaskStore::load(&path).unwrap();
        store2.add(Task::new("Task 2".to_string(), 1));
        store2.save().unwrap();

        // Reload first store to see changes
        store1.reload().unwrap();
        assert_eq!(store1.all().len(), 2);
    }

    #[test]
    fn test_with_exclusive_lock() {
        let (_tmp, path) = temp_path();

        let mut store = TaskStore::load(&path).unwrap();

        // Use with_exclusive_lock for atomic operation
        store
            .with_exclusive_lock(|s| {
                s.add(Task::new("Atomic task".to_string(), 1));
            })
            .unwrap();

        // Verify the task was saved
        let loaded = TaskStore::load(&path).unwrap();
        assert_eq!(loaded.all().len(), 1);
        assert_eq!(loaded.all()[0].title, "Atomic task");
    }

    #[test]
    fn test_concurrent_writes_with_lock() {
        use std::sync::{Arc, Barrier};
        use std::thread;

        let (_tmp, path) = temp_path();
        let barrier = Arc::new(Barrier::new(2));

        // Each thread loads its own (empty) view first, then both race to
        // append one task through the read-modify-write helper.
        let handles: Vec<_> = ["Task from thread 1", "Task from thread 2"]
            .into_iter()
            .map(|title| {
                let path = path.clone();
                let barrier = Arc::clone(&barrier);
                thread::spawn(move || {
                    let mut store = TaskStore::load(&path).unwrap();
                    barrier.wait();

                    store
                        .with_exclusive_lock(|s| {
                            s.add(Task::new(title.to_string(), 1));
                        })
                        .unwrap();
                })
            })
            .collect();

        for handle in handles {
            handle.join().unwrap();
        }

        // Both tasks should be present
        let final_store = TaskStore::load(&path).unwrap();
        assert_eq!(final_store.all().len(), 2);
        for title in ["Task from thread 1", "Task from thread 2"] {
            assert!(final_store.all().iter().any(|t| t.title == title));
        }
    }

    #[test]
    fn test_load_skips_malformed_lines() {
        let (_tmp, path) = temp_path();

        // Write a file with one valid task line and some malformed lines
        let mut store = TaskStore::load(&path).unwrap();
        store.add(Task::new("Valid task".to_string(), 1));
        store.save().unwrap();

        // Append malformed lines to the file
        let mut content = std::fs::read_to_string(&path).unwrap();
        content.push_str("this is not json\n");
        content.push_str("{\"broken\": true}\n");
        std::fs::write(&path, content).unwrap();

        // Load should succeed with only the valid task
        let loaded = TaskStore::load(&path).unwrap();
        assert_eq!(loaded.all().len(), 1);
        assert_eq!(loaded.all()[0].title, "Valid task");
    }
}
633}