Skip to main content

meerkat_mob/
tasks.rs

//! Shared task board for mob coordination.
//!
//! The `TaskBoard` is a projection built from `TaskCreated`, `TaskUpdated`,
//! and `MobReset` events. It provides the current view of all tasks in a mob.
5
6use crate::MobError;
7use crate::event::NewMobEvent;
8use crate::event::{MobEvent, MobEventKind};
9use crate::ids::{MeerkatId, MobId, TaskId};
10use crate::store::MobEventStore;
11#[cfg(target_arch = "wasm32")]
12use crate::tokio;
13use chrono::{DateTime, Utc};
14use serde::{Deserialize, Serialize};
15use std::collections::BTreeMap;
16use std::sync::Arc;
17use tokio::sync::RwLock;
18
19/// Task lifecycle status.
20#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
21#[serde(rename_all = "snake_case")]
22pub enum TaskStatus {
23    /// Task is open and available for claiming.
24    Open,
25    /// Task is currently being worked on.
26    InProgress,
27    /// Task has been completed successfully.
28    Completed,
29    /// Task has been cancelled.
30    Cancelled,
31}
32
33/// A task on the shared task board.
34#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
35pub struct MobTask {
36    /// Unique task identifier.
37    pub id: TaskId,
38    /// Short subject line.
39    pub subject: String,
40    /// Detailed description.
41    pub description: String,
42    /// Current status.
43    pub status: TaskStatus,
44    /// Assigned owner (meerkat ID), if any.
45    pub owner: Option<MeerkatId>,
46    /// Task IDs that block this task.
47    pub blocked_by: Vec<TaskId>,
48    /// When the task was created.
49    pub created_at: DateTime<Utc>,
50    /// When the task was last updated.
51    pub updated_at: DateTime<Utc>,
52}
53
54/// Projected view of all tasks in a mob, built from events.
55#[derive(Debug, Clone, Default)]
56pub struct TaskBoard {
57    tasks: BTreeMap<TaskId, MobTask>,
58}
59
60/// Explicit task-board owner that validates updates and maintains the projection.
61#[derive(Clone)]
62pub struct MobTaskBoardService {
63    mob_id: MobId,
64    board: Arc<RwLock<TaskBoard>>,
65    events: Arc<dyn MobEventStore>,
66}
67
68impl MobTaskBoardService {
69    pub fn new(
70        mob_id: MobId,
71        board: Arc<RwLock<TaskBoard>>,
72        events: Arc<dyn MobEventStore>,
73    ) -> Self {
74        Self {
75            mob_id,
76            board,
77            events,
78        }
79    }
80
81    pub async fn create_task(
82        &self,
83        subject: String,
84        description: String,
85        blocked_by: Vec<TaskId>,
86    ) -> Result<TaskId, MobError> {
87        if subject.trim().is_empty() {
88            return Err(MobError::Internal(
89                "task subject cannot be empty".to_string(),
90            ));
91        }
92
93        let task_id = TaskId::from(uuid::Uuid::new_v4().to_string());
94        let appended = self
95            .events
96            .append(NewMobEvent {
97                mob_id: self.mob_id.clone(),
98                timestamp: None,
99                kind: MobEventKind::TaskCreated {
100                    task_id: task_id.clone(),
101                    subject,
102                    description,
103                    blocked_by,
104                },
105            })
106            .await?;
107        self.board.write().await.apply(&appended);
108        Ok(task_id)
109    }
110
111    pub async fn update_task(
112        &self,
113        task_id: TaskId,
114        status: TaskStatus,
115        owner: Option<MeerkatId>,
116    ) -> Result<(), MobError> {
117        // We treat `owner` as an *optional* claim/mutation field.
118        //
119        // Contract:
120        // - Owner changes are only applied when `status == in_progress`.
121        // - For other status transitions (open/completed/cancelled), any provided
122        //   `owner` value is ignored and the current owner is preserved.
123        //
124        // Rationale: some tool-schema layers may erroneously require sending
125        // `owner` even when completing/cancelling a task. Ignoring `owner` for
126        // non-in_progress transitions avoids spurious failures while still
127        // preventing owner changes outside of in_progress.
128        let effective_owner = {
129            let board = self.board.read().await;
130            let task = board
131                .get(&task_id)
132                .ok_or_else(|| MobError::Internal(format!("task '{task_id}' not found")))?;
133            let current_owner = task.owner.clone();
134
135            if matches!(status, TaskStatus::InProgress) {
136                if let Some(new_owner) = owner {
137                    let blocked = task.blocked_by.iter().any(|dependency| {
138                        board.get(dependency).map(|t| t.status) != Some(TaskStatus::Completed)
139                    });
140                    if blocked {
141                        return Err(MobError::Internal(format!(
142                            "task '{task_id}' is blocked by incomplete dependencies"
143                        )));
144                    }
145                    Some(new_owner)
146                } else {
147                    // No owner supplied: preserve current owner (if any).
148                    current_owner
149                }
150            } else {
151                // Owner is not mutable for non-in_progress statuses.
152                current_owner
153            }
154        };
155
156        let appended = self
157            .events
158            .append(NewMobEvent {
159                mob_id: self.mob_id.clone(),
160                timestamp: None,
161                kind: MobEventKind::TaskUpdated {
162                    task_id,
163                    status,
164                    owner: effective_owner,
165                },
166            })
167            .await?;
168        self.board.write().await.apply(&appended);
169        Ok(())
170    }
171
172    pub async fn clear(&self) {
173        self.board.write().await.clear();
174    }
175}
176
177impl TaskBoard {
178    /// Build a `TaskBoard` from a sequence of mob events.
179    ///
180    /// Only `TaskCreated` and `TaskUpdated` events are considered.
181    pub fn project(events: &[MobEvent]) -> Self {
182        let mut board = Self::default();
183        for event in events {
184            board.apply(event);
185        }
186        board
187    }
188
189    /// Apply a single event to update the task board state.
190    pub fn apply(&mut self, event: &MobEvent) {
191        match &event.kind {
192            MobEventKind::TaskCreated {
193                task_id,
194                subject,
195                description,
196                blocked_by,
197            } => {
198                self.tasks.insert(
199                    task_id.clone(),
200                    MobTask {
201                        id: task_id.clone(),
202                        subject: subject.clone(),
203                        description: description.clone(),
204                        status: TaskStatus::Open,
205                        owner: None,
206                        blocked_by: blocked_by.clone(),
207                        created_at: event.timestamp,
208                        updated_at: event.timestamp,
209                    },
210                );
211            }
212            MobEventKind::TaskUpdated {
213                task_id,
214                status,
215                owner,
216            } => {
217                if let Some(task) = self.tasks.get_mut(task_id) {
218                    task.status = *status;
219                    task.owner = owner.clone();
220                    task.updated_at = event.timestamp;
221                } else {
222                    tracing::warn!(
223                        task_id = %task_id,
224                        cursor = event.cursor,
225                        "task update ignored for unknown task id"
226                    );
227                }
228            }
229            MobEventKind::MobReset => {
230                self.tasks.clear();
231            }
232            _ => {}
233        }
234    }
235
236    /// Get a task by ID.
237    pub fn get(&self, task_id: &TaskId) -> Option<&MobTask> {
238        self.tasks.get(task_id)
239    }
240
241    /// List all tasks.
242    pub fn list(&self) -> impl Iterator<Item = &MobTask> {
243        self.tasks.values()
244    }
245
246    /// Number of tasks on the board.
247    pub fn len(&self) -> usize {
248        self.tasks.len()
249    }
250
251    /// Whether the board is empty.
252    pub fn is_empty(&self) -> bool {
253        self.tasks.is_empty()
254    }
255
256    /// Remove all tasks from the board.
257    pub fn clear(&mut self) {
258        self.tasks.clear();
259    }
260}
261
#[cfg(test)]
mod tests {
    use super::*;
    use crate::ids::MobId;
    use crate::store::InMemoryMobEventStore;
    use std::sync::Arc;
    use tokio::sync::RwLock;

    /// Wrap `kind` in an event for the fixed test mob, stamped with the current time.
    fn make_event(cursor: u64, kind: MobEventKind) -> MobEvent {
        MobEvent {
            cursor,
            timestamp: Utc::now(),
            mob_id: MobId::from("test-mob"),
            kind,
        }
    }

    /// Build a `TaskCreated` event at `cursor`.
    fn task_created(
        cursor: u64,
        task_id: impl Into<TaskId>,
        subject: &str,
        description: &str,
        blocked_by: Vec<TaskId>,
    ) -> MobEvent {
        make_event(
            cursor,
            MobEventKind::TaskCreated {
                task_id: task_id.into(),
                subject: subject.to_string(),
                description: description.to_string(),
                blocked_by,
            },
        )
    }

    /// Build a `TaskUpdated` event at `cursor`.
    fn task_updated(
        cursor: u64,
        task_id: impl Into<TaskId>,
        status: TaskStatus,
        owner: Option<MeerkatId>,
    ) -> MobEvent {
        make_event(
            cursor,
            MobEventKind::TaskUpdated {
                task_id: task_id.into(),
                status,
                owner,
            },
        )
    }

    #[test]
    fn test_task_status_serde_roundtrip() {
        let statuses = [
            TaskStatus::Open,
            TaskStatus::InProgress,
            TaskStatus::Completed,
            TaskStatus::Cancelled,
        ];
        for status in statuses {
            let encoded = serde_json::to_string(&status).unwrap();
            let decoded: TaskStatus = serde_json::from_str(&encoded).unwrap();
            assert_eq!(decoded, status);
        }
    }

    #[test]
    fn test_mob_task_serde_roundtrip() {
        let original = MobTask {
            id: TaskId::from("task-001"),
            subject: "Build widget".to_string(),
            description: "A detailed description".to_string(),
            status: TaskStatus::InProgress,
            owner: Some(MeerkatId::from("agent-1")),
            blocked_by: vec![TaskId::from("task-000")],
            created_at: Utc::now(),
            updated_at: Utc::now(),
        };
        let encoded = serde_json::to_string(&original).unwrap();
        let decoded: MobTask = serde_json::from_str(&encoded).unwrap();
        assert_eq!(decoded.id, original.id);
        assert_eq!(decoded.status, TaskStatus::InProgress);
        assert_eq!(decoded.owner, Some(MeerkatId::from("agent-1")));
    }

    #[test]
    fn test_task_board_project_empty() {
        let board = TaskBoard::project(&[]);
        assert!(board.is_empty());
        assert_eq!(board.len(), 0);
    }

    #[test]
    fn test_task_board_project_create() {
        let events = vec![task_created(1, "t1", "Task 1", "Do something", vec![])];
        let board = TaskBoard::project(&events);
        assert_eq!(board.len(), 1);
        let key = TaskId::from("t1");
        let task = board.get(&key).unwrap();
        assert_eq!(task.subject, "Task 1");
        assert_eq!(task.status, TaskStatus::Open);
        assert!(task.owner.is_none());
    }

    #[test]
    fn test_task_board_project_create_and_update() {
        let events = vec![
            task_created(
                1,
                "t1",
                "Task 1",
                "Do something",
                vec![TaskId::from("t0")],
            ),
            task_updated(
                2,
                "t1",
                TaskStatus::InProgress,
                Some(MeerkatId::from("agent-1")),
            ),
            task_updated(
                3,
                "t1",
                TaskStatus::Completed,
                Some(MeerkatId::from("agent-1")),
            ),
        ];
        let board = TaskBoard::project(&events);
        let key = TaskId::from("t1");
        let task = board.get(&key).unwrap();
        assert_eq!(task.status, TaskStatus::Completed);
        assert_eq!(task.owner, Some(MeerkatId::from("agent-1")));
        assert_eq!(task.blocked_by, vec![TaskId::from("t0")]);
    }

    #[test]
    fn test_task_board_ignores_non_task_events() {
        let peers_wired = MobEventKind::PeersWired {
            a: MeerkatId::from("a"),
            b: MeerkatId::from("b"),
        };
        let events = vec![
            make_event(1, MobEventKind::MobCompleted),
            make_event(2, peers_wired),
        ];
        let board = TaskBoard::project(&events);
        assert!(board.is_empty());
    }

    #[test]
    fn test_task_board_update_nonexistent_task_is_noop() {
        let events = vec![task_updated(
            1,
            "nonexistent",
            TaskStatus::Completed,
            None,
        )];
        let board = TaskBoard::project(&events);
        assert!(board.is_empty());
    }

    #[test]
    fn test_task_board_multiple_tasks() {
        let events = vec![
            task_created(1, "t1", "Task 1", "First", vec![]),
            task_created(2, "t2", "Task 2", "Second", vec![TaskId::from("t1")]),
        ];
        let board = TaskBoard::project(&events);
        assert_eq!(board.len(), 2);
        let listed: Vec<_> = board.list().collect();
        assert_eq!(listed.len(), 2);
    }

    #[test]
    fn test_task_board_idempotent_replay() {
        let events = vec![
            task_created(1, "t1", "Task 1", "First", vec![]),
            task_updated(2, "t1", TaskStatus::Completed, None),
        ];
        let first = TaskBoard::project(&events);
        let second = TaskBoard::project(&events);
        let key = TaskId::from("t1");
        assert_eq!(
            first.get(&key).unwrap().status,
            second.get(&key).unwrap().status
        );
    }

    #[tokio::test]
    async fn task_board_service_validates_dependency_gated_claims() {
        let board = Arc::new(RwLock::new(TaskBoard::default()));
        let service = MobTaskBoardService::new(
            MobId::from("service-mob"),
            board.clone(),
            Arc::new(InMemoryMobEventStore::new()),
        );

        let blocker = service
            .create_task("Blocker".into(), "done first".into(), vec![])
            .await
            .expect("create blocker");
        let blocked = service
            .create_task(
                "Blocked".into(),
                "done second".into(),
                vec![blocker.clone()],
            )
            .await
            .expect("create blocked task");

        // Claiming while the blocker is still open must be rejected.
        let rejection = service
            .update_task(
                blocked.clone(),
                TaskStatus::InProgress,
                Some(MeerkatId::from("worker-1")),
            )
            .await
            .expect_err("blocked task claim should be rejected");
        let message = rejection.to_string();
        assert!(message.contains("blocked by incomplete dependencies"));

        // Once the blocker is completed the same claim goes through.
        service
            .update_task(blocker, TaskStatus::Completed, None)
            .await
            .expect("complete blocker");
        service
            .update_task(
                blocked.clone(),
                TaskStatus::InProgress,
                Some(MeerkatId::from("worker-1")),
            )
            .await
            .expect("claim unblocked task");

        let snapshot = board.read().await;
        assert_eq!(
            snapshot.get(&blocked).expect("blocked task snapshot").owner,
            Some(MeerkatId::from("worker-1"))
        );
    }
}