use crate::file_lock::FileLock;
use crate::task::{Task, TaskStatus};
use std::io;
use std::path::Path;
use tracing::warn;
pub struct TaskStore {
path: std::path::PathBuf,
tasks: Vec<Task>,
lock: FileLock,
}
fn parse_task_line(line: &str) -> Option<Task> {
match serde_json::from_str(line) {
Ok(task) => Some(task),
Err(e) => {
warn!(
error = %e,
line = line.chars().take(200).collect::<String>(),
"Skipping malformed task line in JSONL"
);
None
}
}
}
impl TaskStore {
pub fn load(path: &Path) -> io::Result<Self> {
let lock = FileLock::new(path)?;
let _guard = lock.shared()?;
let tasks = if path.exists() {
let content = std::fs::read_to_string(path)?;
content
.lines()
.filter(|line| !line.trim().is_empty())
.filter_map(|line| parse_task_line(line))
.collect()
} else {
Vec::new()
};
Ok(Self {
path: path.to_path_buf(),
tasks,
lock,
})
}
pub fn save(&self) -> io::Result<()> {
let _guard = self.lock.exclusive()?;
if let Some(parent) = self.path.parent() {
std::fs::create_dir_all(parent)?;
}
let content: String = self
.tasks
.iter()
.map(|t| {
serde_json::to_string(t).map_err(|e| {
io::Error::new(
io::ErrorKind::InvalidData,
format!("task serialization failed: {e}"),
)
})
})
.collect::<Result<Vec<_>, _>>()?
.join("\n");
std::fs::write(
&self.path,
if content.is_empty() {
String::new()
} else {
content + "\n"
},
)
}
pub fn reload(&mut self) -> io::Result<()> {
let _guard = self.lock.shared()?;
self.tasks = if self.path.exists() {
let content = std::fs::read_to_string(&self.path)?;
content
.lines()
.filter(|line| !line.trim().is_empty())
.filter_map(|line| parse_task_line(line))
.collect()
} else {
Vec::new()
};
Ok(())
}
pub fn with_exclusive_lock<F, T>(&mut self, f: F) -> io::Result<T>
where
F: FnOnce(&mut Self) -> T,
{
let _guard = self.lock.exclusive()?;
self.tasks = if self.path.exists() {
let content = std::fs::read_to_string(&self.path)?;
content
.lines()
.filter(|line| !line.trim().is_empty())
.filter_map(|line| parse_task_line(line))
.collect()
} else {
Vec::new()
};
let result = f(self);
if let Some(parent) = self.path.parent() {
std::fs::create_dir_all(parent)?;
}
let content: String = self
.tasks
.iter()
.map(|t| {
serde_json::to_string(t).map_err(|e| {
io::Error::new(
io::ErrorKind::InvalidData,
format!("task serialization failed: {e}"),
)
})
})
.collect::<Result<Vec<_>, _>>()?
.join("\n");
std::fs::write(
&self.path,
if content.is_empty() {
String::new()
} else {
content + "\n"
},
)?;
Ok(result)
}
pub fn add(&mut self, task: Task) -> &Task {
self.tasks.push(task);
self.tasks.last().unwrap()
}
pub fn get(&self, id: &str) -> Option<&Task> {
self.tasks.iter().find(|t| t.id == id)
}
pub fn get_by_key(&self, key: &str) -> Option<&Task> {
self.tasks.iter().find(|t| t.key.as_deref() == Some(key))
}
pub fn get_mut(&mut self, id: &str) -> Option<&mut Task> {
self.tasks.iter_mut().find(|t| t.id == id)
}
pub fn get_by_key_mut(&mut self, key: &str) -> Option<&mut Task> {
self.tasks
.iter_mut()
.find(|t| t.key.as_deref() == Some(key))
}
pub fn close(&mut self, id: &str) -> Option<&Task> {
if let Some(task) = self.get_mut(id) {
task.status = TaskStatus::Closed;
task.closed = Some(chrono::Utc::now().to_rfc3339());
return self.get(id);
}
None
}
pub fn start(&mut self, id: &str) -> Option<&Task> {
if let Some(task) = self.get_mut(id) {
task.start();
return self.get(id);
}
None
}
pub fn fail(&mut self, id: &str) -> Option<&Task> {
if let Some(task) = self.get_mut(id) {
task.status = TaskStatus::Failed;
task.closed = Some(chrono::Utc::now().to_rfc3339());
return self.get(id);
}
None
}
pub fn reopen(&mut self, id: &str) -> Option<&Task> {
if let Some(task) = self.get_mut(id) {
task.reopen();
return self.get(id);
}
None
}
pub fn ensure(&mut self, task: Task) -> &Task {
if let Some(key) = task.key.as_deref()
&& let Some(existing_idx) = self
.tasks
.iter()
.position(|existing| existing.key.as_deref() == Some(key))
{
let existing = &mut self.tasks[existing_idx];
existing.title = task.title;
existing.priority = task.priority;
if task.description.is_some() {
existing.description = task.description;
}
if !task.blocked_by.is_empty() {
existing.blocked_by = task.blocked_by;
}
return &self.tasks[existing_idx];
}
self.tasks.push(task);
self.tasks.last().unwrap()
}
pub fn all(&self) -> &[Task] {
&self.tasks
}
pub fn open(&self) -> Vec<&Task> {
self.tasks
.iter()
.filter(|t| !t.status.is_terminal())
.collect()
}
pub fn ready(&self) -> Vec<&Task> {
self.tasks
.iter()
.filter(|t| t.is_ready(&self.tasks))
.collect()
}
pub fn has_open_tasks(&self) -> bool {
self.tasks.iter().any(|t| t.status != TaskStatus::Closed)
}
pub fn has_pending_tasks(&self) -> bool {
self.tasks.iter().any(|t| !t.status.is_terminal())
}
}
#[cfg(test)]
mod tests {
use super::*;
use tempfile::TempDir;
#[test]
fn test_load_nonexistent_file() {
let tmp = TempDir::new().unwrap();
let path = tmp.path().join("tasks.jsonl");
let store = TaskStore::load(&path).unwrap();
assert_eq!(store.all().len(), 0);
}
#[test]
fn test_add_and_save() {
let tmp = TempDir::new().unwrap();
let path = tmp.path().join("tasks.jsonl");
let mut store = TaskStore::load(&path).unwrap();
let task = Task::new("Test task".to_string(), 1);
store.add(task);
store.save().unwrap();
let loaded = TaskStore::load(&path).unwrap();
assert_eq!(loaded.all().len(), 1);
assert_eq!(loaded.all()[0].title, "Test task");
}
#[test]
fn test_get_task() {
let tmp = TempDir::new().unwrap();
let path = tmp.path().join("tasks.jsonl");
let mut store = TaskStore::load(&path).unwrap();
let task = Task::new("Test".to_string(), 1);
let id = task.id.clone();
store.add(task);
assert!(store.get(&id).is_some());
assert_eq!(store.get(&id).unwrap().title, "Test");
}
#[test]
fn test_get_task_by_key() {
let tmp = TempDir::new().unwrap();
let path = tmp.path().join("tasks.jsonl");
let mut store = TaskStore::load(&path).unwrap();
let task = Task::new("Test".to_string(), 1).with_key(Some("phase:design".to_string()));
store.add(task);
assert!(store.get_by_key("phase:design").is_some());
assert_eq!(store.get_by_key("phase:design").unwrap().title, "Test");
}
#[test]
fn test_close_task() {
let tmp = TempDir::new().unwrap();
let path = tmp.path().join("tasks.jsonl");
let mut store = TaskStore::load(&path).unwrap();
let task = Task::new("Test".to_string(), 1);
let id = task.id.clone();
store.add(task);
let closed = store.close(&id).unwrap();
assert_eq!(closed.status, TaskStatus::Closed);
assert!(closed.closed.is_some());
}
#[test]
fn test_start_task() {
let tmp = TempDir::new().unwrap();
let path = tmp.path().join("tasks.jsonl");
let mut store = TaskStore::load(&path).unwrap();
let task = Task::new("Test".to_string(), 1);
let id = task.id.clone();
store.add(task);
let started = store.start(&id).unwrap();
assert_eq!(started.status, TaskStatus::InProgress);
assert!(started.started.is_some());
}
#[test]
fn test_reopen_task() {
let tmp = TempDir::new().unwrap();
let path = tmp.path().join("tasks.jsonl");
let mut store = TaskStore::load(&path).unwrap();
let task = Task::new("Test".to_string(), 1);
let id = task.id.clone();
store.add(task);
store.close(&id);
let reopened = store.reopen(&id).unwrap();
assert_eq!(reopened.status, TaskStatus::Open);
assert!(reopened.closed.is_none());
}
#[test]
fn test_open_tasks() {
let tmp = TempDir::new().unwrap();
let path = tmp.path().join("tasks.jsonl");
let mut store = TaskStore::load(&path).unwrap();
let task1 = Task::new("Open 1".to_string(), 1);
store.add(task1);
let mut task2 = Task::new("Closed".to_string(), 1);
task2.status = TaskStatus::Closed;
store.add(task2);
assert_eq!(store.open().len(), 1);
}
#[test]
fn test_ready_tasks() {
let tmp = TempDir::new().unwrap();
let path = tmp.path().join("tasks.jsonl");
let mut store = TaskStore::load(&path).unwrap();
let task1 = Task::new("Ready".to_string(), 1);
let id1 = task1.id.clone();
store.add(task1);
let mut task2 = Task::new("Blocked".to_string(), 1);
task2.blocked_by.push(id1);
store.add(task2);
let ready = store.ready();
assert_eq!(ready.len(), 1);
assert_eq!(ready[0].title, "Ready");
}
#[test]
fn test_ensure_deduplicates_by_key() {
let tmp = TempDir::new().unwrap();
let path = tmp.path().join("tasks.jsonl");
let mut store = TaskStore::load(&path).unwrap();
let first = Task::new("First".to_string(), 1).with_key(Some("impl:task-01".to_string()));
let second = Task::new("Second".to_string(), 3).with_key(Some("impl:task-01".to_string()));
let id = store.ensure(first).id.clone();
let deduped_id = store.ensure(second).id.clone();
let deduped = store
.get_by_key("impl:task-01")
.expect("deduped task should exist");
assert_eq!(store.all().len(), 1);
assert_eq!(deduped_id, id);
assert_eq!(deduped.title, "Second");
assert_eq!(deduped.priority, 3);
}
#[test]
fn test_has_open_tasks() {
let tmp = TempDir::new().unwrap();
let path = tmp.path().join("tasks.jsonl");
let mut store = TaskStore::load(&path).unwrap();
assert!(!store.has_open_tasks());
let task = Task::new("Test".to_string(), 1);
store.add(task);
assert!(store.has_open_tasks());
}
#[test]
fn test_has_pending_tasks_excludes_failed() {
let tmp = TempDir::new().unwrap();
let path = tmp.path().join("tasks.jsonl");
let mut store = TaskStore::load(&path).unwrap();
assert!(!store.has_pending_tasks());
let task1 = Task::new("Open task".to_string(), 1);
store.add(task1);
assert!(store.has_pending_tasks());
let id = store.all()[0].id.clone();
store.close(&id);
assert!(!store.has_pending_tasks());
}
#[test]
fn test_has_pending_tasks_failed_is_terminal() {
let tmp = TempDir::new().unwrap();
let path = tmp.path().join("tasks.jsonl");
let mut store = TaskStore::load(&path).unwrap();
let task = Task::new("Failed task".to_string(), 1);
store.add(task);
let id = store.all()[0].id.clone();
store.fail(&id);
assert!(!store.has_pending_tasks());
assert!(store.has_open_tasks());
}
#[test]
fn test_reload() {
let tmp = TempDir::new().unwrap();
let path = tmp.path().join("tasks.jsonl");
let mut store1 = TaskStore::load(&path).unwrap();
store1.add(Task::new("Task 1".to_string(), 1));
store1.save().unwrap();
let mut store2 = TaskStore::load(&path).unwrap();
store2.add(Task::new("Task 2".to_string(), 1));
store2.save().unwrap();
store1.reload().unwrap();
assert_eq!(store1.all().len(), 2);
}
#[test]
fn test_with_exclusive_lock() {
let tmp = TempDir::new().unwrap();
let path = tmp.path().join("tasks.jsonl");
let mut store = TaskStore::load(&path).unwrap();
store
.with_exclusive_lock(|s| {
s.add(Task::new("Atomic task".to_string(), 1));
})
.unwrap();
let loaded = TaskStore::load(&path).unwrap();
assert_eq!(loaded.all().len(), 1);
assert_eq!(loaded.all()[0].title, "Atomic task");
}
#[test]
fn test_concurrent_writes_with_lock() {
use std::sync::{Arc, Barrier};
use std::thread;
let tmp = TempDir::new().unwrap();
let path = tmp.path().join("tasks.jsonl");
let path_clone = path.clone();
let barrier = Arc::new(Barrier::new(2));
let barrier_clone = barrier.clone();
let handle1 = thread::spawn(move || {
let mut store = TaskStore::load(&path).unwrap();
barrier.wait();
store
.with_exclusive_lock(|s| {
s.add(Task::new("Task from thread 1".to_string(), 1));
})
.unwrap();
});
let handle2 = thread::spawn(move || {
let mut store = TaskStore::load(&path_clone).unwrap();
barrier_clone.wait();
store
.with_exclusive_lock(|s| {
s.add(Task::new("Task from thread 2".to_string(), 1));
})
.unwrap();
});
handle1.join().unwrap();
handle2.join().unwrap();
let final_store = TaskStore::load(tmp.path().join("tasks.jsonl").as_ref()).unwrap();
assert_eq!(final_store.all().len(), 2);
}
#[test]
fn test_load_skips_malformed_lines() {
let tmp = TempDir::new().unwrap();
let path = tmp.path().join("tasks.jsonl");
let mut store = TaskStore::load(&path).unwrap();
let task = Task::new("Valid task".to_string(), 1);
store.add(task);
store.save().unwrap();
let mut content = std::fs::read_to_string(&path).unwrap();
content.push_str("this is not json\n");
content.push_str("{\"broken\": true}\n");
std::fs::write(&path, content).unwrap();
let loaded = TaskStore::load(&path).unwrap();
assert_eq!(loaded.all().len(), 1);
assert_eq!(loaded.all()[0].title, "Valid task");
}
}