1use super::store::TaskStore;
10use super::types::{
11 NewTask, Task, TaskError, TaskId, TaskStatus, TaskStoreData, TaskStoreMeta, TaskUpdate,
12};
13use async_trait::async_trait;
14use std::path::PathBuf;
15use tokio::fs;
16use tokio::io::AsyncWriteExt;
17use tokio::sync::Mutex;
18
19const MAX_TASKS: usize = 10_000;
20const MAX_COMPLETED_TASKS: usize = 5_000;
21
/// Task store that keeps every task in a single JSON file on disk.
///
/// Each store operation on an instance acquires `lock` before touching the
/// file, so operations issued through the same instance do not interleave.
/// Separate instances pointing at the same path are not coordinated.
#[derive(Debug)]
pub struct FileTaskStore {
    /// Location of the JSON file holding the serialized store.
    path: PathBuf,
    /// Serializes the load/modify/save cycle; it protects no data of its own.
    lock: Mutex<()>,
}
53
54impl FileTaskStore {
55 pub fn new(path: PathBuf) -> Self {
57 Self {
58 path,
59 lock: Mutex::new(()),
60 }
61 }
62
63 pub fn in_project(project_root: &std::path::Path) -> Self {
65 Self::new(project_root.join(".rkat").join("tasks.json"))
66 }
67
68 pub fn path(&self) -> &PathBuf {
70 &self.path
71 }
72
73 async fn load(&self) -> Result<TaskStoreData, TaskError> {
75 let exists = fs::try_exists(&self.path)
76 .await
77 .map_err(|e| TaskError::StorageError(format!("Failed to check file: {e}")))?;
78 if !exists {
79 return Ok(TaskStoreData {
80 meta: TaskStoreMeta {
81 version: 1,
82 project_id: meerkat_core::time_compat::new_uuid_v7().to_string(),
83 created_at: chrono::Utc::now().to_rfc3339(),
84 store_rev: 0,
85 },
86 tasks: Vec::new(),
87 });
88 }
89
90 let content = fs::read_to_string(&self.path)
91 .await
92 .map_err(|e| TaskError::StorageError(format!("Failed to read file: {e}")))?;
93
94 serde_json::from_str(&content)
95 .map_err(|e| TaskError::InvalidData(format!("Failed to parse JSON: {e}")))
96 }
97
98 async fn save(&self, data: &mut TaskStoreData) -> Result<(), TaskError> {
103 data.meta.store_rev += 1;
105
106 if let Some(parent) = self.path.parent() {
108 fs::create_dir_all(parent)
109 .await
110 .map_err(|e| TaskError::StorageError(format!("Failed to create directory: {e}")))?;
111 }
112
113 let temp_path = self.path.with_extension("json.tmp");
115 let content = serde_json::to_string_pretty(data)
116 .map_err(|e| TaskError::StorageError(format!("Failed to serialize: {e}")))?;
117
118 let mut file = fs::File::create(&temp_path)
119 .await
120 .map_err(|e| TaskError::StorageError(format!("Failed to create temp file: {e}")))?;
121
122 file.write_all(content.as_bytes())
123 .await
124 .map_err(|e| TaskError::StorageError(format!("Failed to write: {e}")))?;
125
126 file.sync_all()
127 .await
128 .map_err(|e| TaskError::StorageError(format!("Failed to sync: {e}")))?;
129
130 fs::rename(&temp_path, &self.path)
132 .await
133 .map_err(|e| TaskError::StorageError(format!("Failed to rename: {e}")))?;
134
135 Ok(())
136 }
137
138 fn enforce_retention(tasks: &mut Vec<Task>) -> usize {
139 let total = tasks.len();
140 if total <= MAX_TASKS {
141 let completed = tasks
142 .iter()
143 .filter(|task| task.status == TaskStatus::Completed)
144 .count();
145 if completed <= MAX_COMPLETED_TASKS {
146 return 0;
147 }
148 }
149
150 let mut remove = vec![false; total];
151
152 let mut completed_indices: Vec<usize> = tasks
153 .iter()
154 .enumerate()
155 .filter(|(_, task)| task.status == TaskStatus::Completed)
156 .map(|(idx, _)| idx)
157 .collect();
158 completed_indices.sort_by(|a, b| tasks[*a].updated_at.cmp(&tasks[*b].updated_at));
159
160 let completed_count = completed_indices.len();
161 let mut excess_completed = completed_count.saturating_sub(MAX_COMPLETED_TASKS);
162 for idx in completed_indices {
163 if excess_completed == 0 {
164 break;
165 }
166 remove[idx] = true;
167 excess_completed -= 1;
168 }
169
170 let mut removed = remove.iter().filter(|&&flag| flag).count();
171 let remaining = total.saturating_sub(removed);
172 let mut excess_total = remaining.saturating_sub(MAX_TASKS);
173 if excess_total > 0 {
174 let mut all_indices: Vec<usize> = (0..total).collect();
175 all_indices.sort_by(|a, b| tasks[*a].updated_at.cmp(&tasks[*b].updated_at));
176 for idx in all_indices {
177 if excess_total == 0 {
178 break;
179 }
180 if !remove[idx] {
181 remove[idx] = true;
182 removed += 1;
183 excess_total -= 1;
184 }
185 }
186 }
187
188 if removed > 0 {
189 let mut idx = 0usize;
190 tasks.retain(|_| {
191 let keep = !remove[idx];
192 idx += 1;
193 keep
194 });
195 }
196
197 removed
198 }
199}
200
201#[async_trait]
202impl TaskStore for FileTaskStore {
203 async fn list(&self) -> Result<Vec<Task>, TaskError> {
204 let _guard = self.lock.lock().await;
205 let data = self.load().await?;
206 Ok(data.tasks)
207 }
208
209 async fn get(&self, id: &TaskId) -> Result<Option<Task>, TaskError> {
210 let _guard = self.lock.lock().await;
211 let data = self.load().await?;
212 Ok(data.tasks.into_iter().find(|t| &t.id == id))
213 }
214
215 async fn create(&self, new_task: NewTask, session_id: Option<&str>) -> Result<Task, TaskError> {
216 let _guard = self.lock.lock().await;
217 let mut data = self.load().await?;
218
219 let now = chrono::Utc::now().to_rfc3339();
220 let task = Task {
221 id: TaskId::new(),
222 subject: new_task.subject,
223 description: new_task.description,
224 status: TaskStatus::default(),
225 priority: new_task.priority.unwrap_or_default(),
226 labels: new_task.labels.unwrap_or_default(),
227 blocks: new_task.blocks.unwrap_or_default(),
228 created_at: now.clone(),
229 updated_at: now,
230 created_by_session: session_id.map(String::from),
231 updated_by_session: session_id.map(String::from),
232 owner: new_task.owner,
233 metadata: new_task.metadata.unwrap_or_default(),
234 blocked_by: new_task.blocked_by.unwrap_or_default(),
235 };
236
237 data.tasks.push(task.clone());
238 Self::enforce_retention(&mut data.tasks);
239 self.save(&mut data).await?;
240
241 Ok(task)
242 }
243
244 async fn update(
245 &self,
246 id: &TaskId,
247 update: TaskUpdate,
248 session_id: Option<&str>,
249 ) -> Result<Task, TaskError> {
250 let _guard = self.lock.lock().await;
251 let mut data = self.load().await?;
252
253 let task = data
254 .tasks
255 .iter_mut()
256 .find(|t| &t.id == id)
257 .ok_or_else(|| TaskError::NotFound(id.0.clone()))?;
258
259 if let Some(subject) = update.subject {
260 task.subject = subject;
261 }
262 if let Some(description) = update.description {
263 task.description = description;
264 }
265 if let Some(status) = update.status {
266 task.status = status;
267 }
268 if let Some(priority) = update.priority {
269 task.priority = priority;
270 }
271 if let Some(labels) = update.labels {
272 task.labels = labels;
273 }
274 if let Some(add_blocks) = update.add_blocks {
275 for block_id in add_blocks {
276 if !task.blocks.contains(&block_id) {
277 task.blocks.push(block_id);
278 }
279 }
280 }
281 if let Some(remove_blocks) = update.remove_blocks {
282 task.blocks.retain(|b| !remove_blocks.contains(b));
283 }
284
285 if let Some(owner) = update.owner {
287 task.owner = Some(owner);
288 }
289 if let Some(metadata) = update.metadata {
290 for (key, value) in metadata {
291 if value.is_null() {
292 task.metadata.remove(&key);
294 } else {
295 task.metadata.insert(key, value);
296 }
297 }
298 }
299 if let Some(add_blocked_by) = update.add_blocked_by {
300 for block_id in add_blocked_by {
301 if !task.blocked_by.contains(&block_id) {
302 task.blocked_by.push(block_id);
303 }
304 }
305 }
306 if let Some(remove_blocked_by) = update.remove_blocked_by {
307 task.blocked_by.retain(|b| !remove_blocked_by.contains(b));
308 }
309
310 task.updated_at = chrono::Utc::now().to_rfc3339();
311 task.updated_by_session = session_id.map(String::from);
312
313 let updated_task = task.clone();
314 self.save(&mut data).await?;
315
316 Ok(updated_task)
317 }
318
319 async fn delete(&self, id: &TaskId) -> Result<(), TaskError> {
320 let _guard = self.lock.lock().await;
321 let mut data = self.load().await?;
322
323 let len_before = data.tasks.len();
324 data.tasks.retain(|t| &t.id != id);
325
326 if data.tasks.len() == len_before {
327 return Err(TaskError::NotFound(id.0.clone()));
328 }
329
330 self.save(&mut data).await?;
331 Ok(())
332 }
333}
334
#[cfg(test)]
#[allow(clippy::unwrap_used, clippy::expect_used)]
mod tests {
    use super::*;
    use crate::builtin::types::{TaskPriority, TaskStatus};
    use tempfile::TempDir;

    /// Returns a store rooted in a fresh temporary directory together with
    /// the guard that keeps that directory alive for the test's duration.
    fn create_temp_store() -> (TempDir, FileTaskStore) {
        let dir = TempDir::new().unwrap();
        let store = FileTaskStore::new(dir.path().join(".rkat").join("tasks.json"));
        (dir, store)
    }

    /// Builds a `NewTask` with only subject and description set.
    fn draft(subject: &str, description: &str) -> NewTask {
        NewTask {
            subject: subject.to_string(),
            description: description.to_string(),
            priority: None,
            labels: None,
            blocks: None,
            ..Default::default()
        }
    }

    /// Reads the store file straight from disk, bypassing `FileTaskStore`.
    async fn read_store_file(path: &std::path::Path) -> TaskStoreData {
        let raw = fs::read_to_string(path).await.unwrap();
        serde_json::from_str(&raw).unwrap()
    }

    #[tokio::test]
    async fn test_file_store_create_and_get() {
        let (_temp_dir, store) = create_temp_store();
        let labels = vec!["test".to_string(), "important".to_string()];

        let request = NewTask {
            priority: Some(TaskPriority::High),
            labels: Some(labels.clone()),
            ..draft("Test task", "Test description")
        };
        let created = store.create(request, Some("session-1")).await.unwrap();

        assert_eq!(created.subject, "Test task");
        assert_eq!(created.description, "Test description");
        assert_eq!(created.priority, TaskPriority::High);
        assert_eq!(created.labels, labels);
        assert_eq!(created.status, TaskStatus::Pending);
        assert_eq!(created.created_by_session.as_deref(), Some("session-1"));
        assert_eq!(created.updated_by_session.as_deref(), Some("session-1"));
        assert!(!created.created_at.is_empty());
        assert!(!created.updated_at.is_empty());
        // Ids are expected to be 36 characters long (hyphenated UUID text).
        assert_eq!(created.id.0.len(), 36);

        let fetched = store.get(&created.id).await.unwrap().unwrap();
        assert_eq!(fetched.id, created.id);
        assert_eq!(fetched.subject, created.subject);
        assert_eq!(fetched.description, created.description);
    }

    #[tokio::test]
    async fn test_file_store_list() {
        let (_temp_dir, store) = create_temp_store();

        // A store that was never written lists as empty.
        assert!(store.list().await.unwrap().is_empty());

        let drafts = vec![
            NewTask {
                priority: Some(TaskPriority::Low),
                ..draft("Task 1", "First task")
            },
            NewTask {
                priority: Some(TaskPriority::High),
                ..draft("Task 2", "Second task")
            },
            NewTask {
                labels: Some(vec!["urgent".to_string()]),
                ..draft("Task 3", "Third task")
            },
        ];

        let mut created_ids = Vec::new();
        for request in drafts {
            created_ids.push(store.create(request, None).await.unwrap().id);
        }

        let listed = store.list().await.unwrap();
        assert_eq!(listed.len(), 3);
        for id in &created_ids {
            assert!(listed.iter().any(|task| &task.id == id));
        }
    }

    #[tokio::test]
    async fn test_file_store_update() {
        let (_temp_dir, store) = create_temp_store();

        let request = NewTask {
            priority: Some(TaskPriority::Low),
            labels: Some(vec!["initial".to_string()]),
            ..draft("Original subject", "Original description")
        };
        let created = store.create(request, Some("session-1")).await.unwrap();
        let created_at = created.created_at.clone();

        // Let the clock advance so the update gets a distinct timestamp.
        tokio::time::sleep(tokio::time::Duration::from_millis(10)).await;

        let new_labels = vec!["updated".to_string(), "reviewed".to_string()];
        let changes = TaskUpdate {
            subject: Some("Updated subject".to_string()),
            description: Some("Updated description".to_string()),
            status: Some(TaskStatus::InProgress),
            priority: Some(TaskPriority::High),
            labels: Some(new_labels.clone()),
            add_blocks: None,
            remove_blocks: None,
            owner: None,
            metadata: None,
            add_blocked_by: None,
            remove_blocked_by: None,
        };

        let updated = store
            .update(&created.id, changes, Some("session-2"))
            .await
            .unwrap();

        assert_eq!(updated.id, created.id);
        assert_eq!(updated.subject, "Updated subject");
        assert_eq!(updated.description, "Updated description");
        assert_eq!(updated.status, TaskStatus::InProgress);
        assert_eq!(updated.priority, TaskPriority::High);
        assert_eq!(updated.labels, new_labels);
        // Creation details survive; update details reflect the second session.
        assert_eq!(updated.created_at, created_at);
        assert_eq!(updated.created_by_session.as_deref(), Some("session-1"));
        assert_eq!(updated.updated_by_session.as_deref(), Some("session-2"));
        assert_ne!(updated.updated_at, created_at);
    }

    #[tokio::test]
    async fn test_file_store_delete() {
        let (_temp_dir, store) = create_temp_store();

        let created = store
            .create(draft("To delete", "Will be deleted"), None)
            .await
            .unwrap();

        let before = store.get(&created.id).await.unwrap();
        assert!(before.is_some());

        store.delete(&created.id).await.unwrap();

        let after = store.get(&created.id).await.unwrap();
        assert!(after.is_none());
    }

    #[tokio::test]
    async fn test_file_store_delete_not_found() {
        let (_temp_dir, store) = create_temp_store();

        let missing = TaskId::from_string("nonexistent");
        let outcome = store.delete(&missing).await;
        assert!(matches!(outcome, Err(TaskError::NotFound(_))));
    }

    #[tokio::test]
    async fn test_file_store_update_not_found() {
        let (_temp_dir, store) = create_temp_store();

        let changes = TaskUpdate {
            subject: Some("Updated".to_string()),
            ..Default::default()
        };

        let missing = TaskId::from_string("nonexistent");
        let outcome = store.update(&missing, changes, None).await;
        assert!(matches!(outcome, Err(TaskError::NotFound(_))));
    }

    #[tokio::test]
    async fn test_file_store_persistence() {
        let temp_dir = TempDir::new().unwrap();
        let store_path = temp_dir.path().join(".rkat").join("tasks.json");

        // First instance writes the task and is dropped.
        let task_id = {
            let writer = FileTaskStore::new(store_path.clone());
            let request = NewTask {
                priority: Some(TaskPriority::High),
                labels: Some(vec!["persistent".to_string()]),
                ..draft("Persisted task", "Should survive reload")
            };
            writer.create(request, Some("session-1")).await.unwrap().id
        };

        // A second instance over the same path must see it.
        let reader = FileTaskStore::new(store_path.clone());
        let task = reader.get(&task_id).await.unwrap().unwrap();
        assert_eq!(task.subject, "Persisted task");
        assert_eq!(task.description, "Should survive reload");
        assert_eq!(task.priority, TaskPriority::High);
        assert_eq!(task.labels, vec!["persistent".to_string()]);
        assert_eq!(task.created_by_session.as_deref(), Some("session-1"));
    }

    #[tokio::test]
    async fn test_file_store_creates_parent_dirs() {
        let temp_dir = TempDir::new().unwrap();
        let nested_file = temp_dir
            .path()
            .join("a")
            .join("b")
            .join("c")
            .join("tasks.json");
        let nested_dir = nested_file.parent().unwrap().to_path_buf();

        assert!(!nested_dir.exists());

        let store = FileTaskStore::new(nested_file.clone());
        let created = store.create(draft("Nested task", ""), None).await.unwrap();

        assert!(nested_dir.exists());
        assert!(nested_file.exists());

        let fetched = store.get(&created.id).await.unwrap();
        assert!(fetched.is_some());
    }

    #[tokio::test]
    async fn test_file_store_atomic_write() {
        let temp_dir = TempDir::new().unwrap();
        let store_path = temp_dir.path().join(".rkat").join("tasks.json");
        let temp_path = store_path.with_extension("json.tmp");

        let store = FileTaskStore::new(store_path.clone());
        store.create(draft("Atomic test", ""), None).await.unwrap();

        // The temp file must be gone and the real file must hold the data.
        assert!(!temp_path.exists());
        assert!(store_path.exists());

        let on_disk = read_store_file(&store_path).await;
        assert_eq!(on_disk.tasks.len(), 1);
        assert_eq!(on_disk.meta.store_rev, 1);
    }

    #[tokio::test]
    async fn test_file_store_store_rev_increments() {
        let (_temp_dir, store) = create_temp_store();

        let first = store.create(draft("Task 1", ""), None).await.unwrap();
        assert_eq!(read_store_file(store.path()).await.meta.store_rev, 1);

        store.create(draft("Task 2", ""), None).await.unwrap();
        assert_eq!(read_store_file(store.path()).await.meta.store_rev, 2);

        let rename = TaskUpdate {
            subject: Some("Updated".to_string()),
            ..Default::default()
        };
        store.update(&first.id, rename, None).await.unwrap();
        assert_eq!(read_store_file(store.path()).await.meta.store_rev, 3);

        store.delete(&first.id).await.unwrap();
        assert_eq!(read_store_file(store.path()).await.meta.store_rev, 4);
    }

    #[tokio::test]
    async fn test_file_store_add_and_remove_blocks() {
        let (_temp_dir, store) = create_temp_store();

        let blocker = store.create(draft("Task 1", ""), None).await.unwrap();
        let blocked = store.create(draft("Task 2", ""), None).await.unwrap();

        let link = TaskUpdate {
            add_blocks: Some(vec![blocked.id.clone()]),
            ..Default::default()
        };
        let after_add = store.update(&blocker.id, link, None).await.unwrap();

        assert_eq!(after_add.blocks.len(), 1);
        assert!(after_add.blocks.contains(&blocked.id));

        // The link must also be visible on a fresh read.
        let reloaded = store.get(&blocker.id).await.unwrap().unwrap();
        assert_eq!(reloaded.blocks.len(), 1);

        let unlink = TaskUpdate {
            remove_blocks: Some(vec![blocked.id.clone()]),
            ..Default::default()
        };
        let after_remove = store.update(&blocker.id, unlink, None).await.unwrap();

        assert!(after_remove.blocks.is_empty());
    }

    #[tokio::test]
    async fn test_file_store_in_project() {
        let temp_dir = TempDir::new().unwrap();
        let store = FileTaskStore::in_project(temp_dir.path());

        let expected = temp_dir.path().join(".rkat").join("tasks.json");
        assert_eq!(store.path(), &expected);
    }

    #[tokio::test]
    async fn test_file_store_get_nonexistent() {
        let (_temp_dir, store) = create_temp_store();

        let missing = TaskId::from_string("nonexistent");
        let outcome = store.get(&missing).await.unwrap();
        assert!(outcome.is_none());
    }

    #[tokio::test]
    async fn test_file_store_create_with_defaults() {
        let (_temp_dir, store) = create_temp_store();

        let created = store
            .create(draft("Simple task", "No optional fields"), None)
            .await
            .unwrap();

        assert_eq!(created.priority, TaskPriority::Medium);
        assert!(created.labels.is_empty());
        assert!(created.blocks.is_empty());
        assert!(created.created_by_session.is_none());
        assert!(created.updated_by_session.is_none());
    }
}