Skip to main content

shuttle_rs/
task.rs

1use crate::core::{Event, EventFilter, EventStore, EventType, NewEvent, Result, ShuttleError};
2use chrono::{DateTime, Utc};
3use serde::{Deserialize, Serialize};
4use serde_json::{json, Value};
5use std::collections::HashMap;
6use uuid::Uuid;
7
/// Tag placed on the event that creates a task (see `new_task`).
pub const TAG_OPEN: &str = "task:open";
/// Tag placed on task claim events (see `new_claim`).
pub const TAG_CLAIMED: &str = "task:claimed";
/// Tag placed on task completion events (see `new_task_done`).
pub const TAG_DONE: &str = "task:done";
/// Tag placed on the event that requests a handoff (see `new_handoff`).
pub const TAG_HANDOFF_PENDING: &str = "handoff:pending";
/// Tag placed on handoff acceptance events (see `new_handoff_accept`).
pub const TAG_HANDOFF_ACCEPTED: &str = "handoff:accepted";
/// Tag placed on handoff completion events (see `new_handoff_done`).
pub const TAG_HANDOFF_DONE: &str = "handoff:done";
14
/// Builds the `claimed_by:<agent>` tag naming the agent that claimed or accepted work.
pub fn claim_tag(agent: &str) -> String {
    ["claimed_by:", agent].concat()
}
18
19pub fn task_ref_tag(id: Uuid) -> String {
20    format!("task_ref:{id}")
21}
22
23pub fn handoff_ref_tag(id: Uuid) -> String {
24    format!("handoff_ref:{id}")
25}
26
27#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
28#[serde(rename_all = "snake_case")]
29pub enum TaskStatus {
30    Open,
31    Claimed,
32    Completed,
33}
34
35impl TaskStatus {
36    pub fn as_str(&self) -> &'static str {
37        match self {
38            Self::Open => "open",
39            Self::Claimed => "claimed",
40            Self::Completed => "completed",
41        }
42    }
43}
44
45#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
46pub struct TaskSummary {
47    pub id: Uuid,
48    pub status: TaskStatus,
49    pub content: String,
50    pub created_by: String,
51    pub claimed_by: Option<String>,
52    pub created_at: DateTime<Utc>,
53    pub updated_at: DateTime<Utc>,
54    pub source_event_ids: Vec<Uuid>,
55}
56
57#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
58#[serde(rename_all = "snake_case")]
59pub enum HandoffStatus {
60    Pending,
61    Accepted,
62    Completed,
63}
64
65impl HandoffStatus {
66    pub fn as_str(&self) -> &'static str {
67        match self {
68            Self::Pending => "pending",
69            Self::Accepted => "accepted",
70            Self::Completed => "completed",
71        }
72    }
73}
74
75#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
76pub struct HandoffSummary {
77    pub id: Uuid,
78    pub status: HandoffStatus,
79    pub content: String,
80    pub from_agent: String,
81    pub to_agent: String,
82    pub accepted_by: Option<String>,
83    pub created_at: DateTime<Utc>,
84    pub updated_at: DateTime<Utc>,
85    pub source_event_ids: Vec<Uuid>,
86}
87
88pub fn new_task(workspace_id: String, agent: String, session_id: String, content: String) -> Event {
89    Event::new(NewEvent {
90        event_type: EventType::Task,
91        workspace_id,
92        repo_id: None,
93        repo_path: None,
94        git_remote: None,
95        bit_repo_id: None,
96        branch: None,
97        commit: None,
98        repo_dirty: None,
99        agent,
100        session_id,
101        title: Some("task".to_owned()),
102        content,
103        tags: vec![TAG_OPEN.to_owned()],
104        metadata_json: json!({ "action": "created", "status": "open" }),
105    })
106}
107
108pub fn new_claim(workspace_id: String, agent: String, session_id: String, task_id: Uuid) -> Event {
109    Event::new(NewEvent {
110        event_type: EventType::Task,
111        workspace_id,
112        repo_id: None,
113        repo_path: None,
114        git_remote: None,
115        bit_repo_id: None,
116        branch: None,
117        commit: None,
118        repo_dirty: None,
119        agent: agent.clone(),
120        session_id,
121        title: Some("task claim".to_owned()),
122        content: format!("claimed task {task_id}"),
123        tags: vec![
124            TAG_CLAIMED.to_owned(),
125            claim_tag(&agent),
126            task_ref_tag(task_id),
127        ],
128        metadata_json: json!({
129            "action": "claimed",
130            "status": "claimed",
131            "task_id": task_id,
132            "claimed_by": agent,
133        }),
134    })
135}
136
137pub fn new_task_update(
138    workspace_id: String,
139    agent: String,
140    session_id: String,
141    task_id: Uuid,
142    content: String,
143) -> Event {
144    Event::new(NewEvent {
145        event_type: EventType::Task,
146        workspace_id,
147        repo_id: None,
148        repo_path: None,
149        git_remote: None,
150        bit_repo_id: None,
151        branch: None,
152        commit: None,
153        repo_dirty: None,
154        agent,
155        session_id,
156        title: Some("task update".to_owned()),
157        content,
158        tags: vec![task_ref_tag(task_id)],
159        metadata_json: json!({ "action": "updated", "task_id": task_id }),
160    })
161}
162
163pub fn new_task_done(
164    workspace_id: String,
165    agent: String,
166    session_id: String,
167    task_id: Uuid,
168) -> Event {
169    Event::new(NewEvent {
170        event_type: EventType::Task,
171        workspace_id,
172        repo_id: None,
173        repo_path: None,
174        git_remote: None,
175        bit_repo_id: None,
176        branch: None,
177        commit: None,
178        repo_dirty: None,
179        agent,
180        session_id,
181        title: Some("task completed".to_owned()),
182        content: format!("completed task {task_id}"),
183        tags: vec![TAG_DONE.to_owned(), task_ref_tag(task_id)],
184        metadata_json: json!({
185            "action": "completed",
186            "status": "completed",
187            "task_id": task_id,
188        }),
189    })
190}
191
192pub fn new_handoff(
193    workspace_id: String,
194    from_agent: String,
195    session_id: String,
196    to_agent: String,
197    content: String,
198) -> Event {
199    Event::new(NewEvent {
200        event_type: EventType::Handoff,
201        workspace_id,
202        repo_id: None,
203        repo_path: None,
204        git_remote: None,
205        bit_repo_id: None,
206        branch: None,
207        commit: None,
208        repo_dirty: None,
209        agent: from_agent.clone(),
210        session_id,
211        title: Some("handoff".to_owned()),
212        content,
213        tags: vec![TAG_HANDOFF_PENDING.to_owned()],
214        metadata_json: json!({
215            "action": "requested",
216            "status": "pending",
217            "from": from_agent,
218            "to": to_agent,
219        }),
220    })
221}
222
223pub fn new_handoff_accept(
224    workspace_id: String,
225    agent: String,
226    session_id: String,
227    handoff_id: Uuid,
228) -> Event {
229    Event::new(NewEvent {
230        event_type: EventType::Handoff,
231        workspace_id,
232        repo_id: None,
233        repo_path: None,
234        git_remote: None,
235        bit_repo_id: None,
236        branch: None,
237        commit: None,
238        repo_dirty: None,
239        agent: agent.clone(),
240        session_id,
241        title: Some("handoff accepted".to_owned()),
242        content: format!("accepted handoff {handoff_id}"),
243        tags: vec![
244            TAG_HANDOFF_ACCEPTED.to_owned(),
245            handoff_ref_tag(handoff_id),
246            claim_tag(&agent),
247        ],
248        metadata_json: json!({
249            "action": "accepted",
250            "status": "accepted",
251            "handoff_id": handoff_id,
252            "accepted_by": agent,
253        }),
254    })
255}
256
257pub fn new_handoff_done(
258    workspace_id: String,
259    agent: String,
260    session_id: String,
261    handoff_id: Uuid,
262) -> Event {
263    Event::new(NewEvent {
264        event_type: EventType::Handoff,
265        workspace_id,
266        repo_id: None,
267        repo_path: None,
268        git_remote: None,
269        bit_repo_id: None,
270        branch: None,
271        commit: None,
272        repo_dirty: None,
273        agent,
274        session_id,
275        title: Some("handoff completed".to_owned()),
276        content: format!("completed handoff {handoff_id}"),
277        tags: vec![TAG_HANDOFF_DONE.to_owned(), handoff_ref_tag(handoff_id)],
278        metadata_json: json!({
279            "action": "completed",
280            "status": "completed",
281            "handoff_id": handoff_id,
282        }),
283    })
284}
285
286pub async fn list(store: &impl EventStore) -> Result<Vec<TaskSummary>> {
287    tasks(store, None, None).await
288}
289
290pub async fn tasks(
291    store: &impl EventStore,
292    workspace_id: Option<&str>,
293    limit: Option<u32>,
294) -> Result<Vec<TaskSummary>> {
295    let events = store
296        .list(EventFilter {
297            event_type: Some(EventType::Task),
298            workspace_id: workspace_id.map(ToOwned::to_owned),
299            limit: Some(u32::MAX),
300            ..EventFilter::default()
301        })
302        .await?;
303    let mut tasks = project_tasks(events);
304    tasks.sort_by(|left, right| {
305        right
306            .updated_at
307            .cmp(&left.updated_at)
308            .then(left.id.cmp(&right.id))
309    });
310    if let Some(limit) = limit {
311        tasks.truncate(limit as usize);
312    }
313    Ok(tasks)
314}
315
316pub async fn open_tasks(
317    store: &impl EventStore,
318    workspace_id: &str,
319    limit: Option<u32>,
320) -> Result<Vec<TaskSummary>> {
321    let mut tasks = tasks(store, Some(workspace_id), None).await?;
322    tasks.retain(|task| task.status == TaskStatus::Open);
323    tasks.truncate(limit.unwrap_or(20) as usize);
324    Ok(tasks)
325}
326
327pub async fn ensure_task_exists(
328    store: &impl EventStore,
329    workspace_id: &str,
330    task_id: Uuid,
331) -> Result<()> {
332    if tasks(store, Some(workspace_id), None)
333        .await?
334        .iter()
335        .any(|task| task.id == task_id)
336    {
337        Ok(())
338    } else {
339        Err(ShuttleError::Store(format!("unknown task id: {task_id}")))
340    }
341}
342
343pub async fn claimed_tasks(
344    store: &impl EventStore,
345    workspace_id: &str,
346    limit: Option<u32>,
347) -> Result<Vec<TaskSummary>> {
348    let mut tasks = tasks(store, Some(workspace_id), None).await?;
349    tasks.retain(|task| task.status == TaskStatus::Claimed);
350    tasks.truncate(limit.unwrap_or(20) as usize);
351    Ok(tasks)
352}
353
354pub async fn handoffs(
355    store: &impl EventStore,
356    workspace_id: Option<&str>,
357    limit: Option<u32>,
358) -> Result<Vec<HandoffSummary>> {
359    let events = store
360        .list(EventFilter {
361            event_type: Some(EventType::Handoff),
362            workspace_id: workspace_id.map(ToOwned::to_owned),
363            limit: Some(u32::MAX),
364            ..EventFilter::default()
365        })
366        .await?;
367    let mut handoffs = project_handoffs(events);
368    handoffs.sort_by(|left, right| {
369        right
370            .updated_at
371            .cmp(&left.updated_at)
372            .then(left.id.cmp(&right.id))
373    });
374    if let Some(limit) = limit {
375        handoffs.truncate(limit as usize);
376    }
377    Ok(handoffs)
378}
379
380pub async fn pending_handoffs(
381    store: &impl EventStore,
382    workspace_id: &str,
383    limit: Option<u32>,
384) -> Result<Vec<HandoffSummary>> {
385    let mut handoffs = handoffs(store, Some(workspace_id), None).await?;
386    handoffs.retain(|handoff| handoff.status == HandoffStatus::Pending);
387    handoffs.truncate(limit.unwrap_or(20) as usize);
388    Ok(handoffs)
389}
390
391pub async fn completed_handoffs(
392    store: &impl EventStore,
393    workspace_id: &str,
394    limit: Option<u32>,
395) -> Result<Vec<HandoffSummary>> {
396    let mut handoffs = handoffs(store, Some(workspace_id), None).await?;
397    handoffs.retain(|handoff| handoff.status == HandoffStatus::Completed);
398    handoffs.truncate(limit.unwrap_or(20) as usize);
399    Ok(handoffs)
400}
401
402pub async fn ensure_handoff_exists(
403    store: &impl EventStore,
404    workspace_id: &str,
405    handoff_id: Uuid,
406) -> Result<()> {
407    if handoffs(store, Some(workspace_id), None)
408        .await?
409        .iter()
410        .any(|handoff| handoff.id == handoff_id)
411    {
412        Ok(())
413    } else {
414        Err(ShuttleError::Store(format!(
415            "unknown handoff id: {handoff_id}"
416        )))
417    }
418}
419
420fn project_tasks(events: Vec<Event>) -> Vec<TaskSummary> {
421    let mut events = events;
422    events.sort_by(|left, right| {
423        left.created_at
424            .cmp(&right.created_at)
425            .then(left.id.cmp(&right.id))
426    });
427    let mut tasks: HashMap<Uuid, TaskSummary> = HashMap::new();
428    for event in events {
429        let action = action(&event);
430        let task_id = referenced_id(&event, "task_id", "task_ref").unwrap_or(event.id);
431        match action.as_deref() {
432            Some("claimed") => {
433                if let Some(task) = tasks.get_mut(&task_id) {
434                    task.status = TaskStatus::Claimed;
435                    task.claimed_by = Some(
436                        string_metadata(&event.metadata_json, "claimed_by")
437                            .unwrap_or_else(|| event.agent.clone()),
438                    );
439                    task.updated_at = event.created_at;
440                    task.source_event_ids.push(event.id);
441                }
442            }
443            Some("updated") => {
444                if let Some(task) = tasks.get_mut(&task_id) {
445                    task.content = event.content.clone();
446                    task.updated_at = event.created_at;
447                    task.source_event_ids.push(event.id);
448                }
449            }
450            Some("completed") => {
451                if let Some(task) = tasks.get_mut(&task_id) {
452                    task.status = TaskStatus::Completed;
453                    task.updated_at = event.created_at;
454                    task.source_event_ids.push(event.id);
455                }
456            }
457            _ if event.tags.iter().any(|tag| tag == TAG_OPEN) => {
458                tasks.entry(task_id).or_insert_with(|| TaskSummary {
459                    id: task_id,
460                    status: TaskStatus::Open,
461                    content: event.content.clone(),
462                    created_by: event.agent.clone(),
463                    claimed_by: None,
464                    created_at: event.created_at,
465                    updated_at: event.created_at,
466                    source_event_ids: vec![event.id],
467                });
468            }
469            _ if event.tags.iter().any(|tag| tag == TAG_CLAIMED) => {
470                if let Some(task) = tasks.get_mut(&task_id) {
471                    task.status = TaskStatus::Claimed;
472                    task.claimed_by = Some(event.agent.clone());
473                    task.updated_at = event.created_at;
474                    task.source_event_ids.push(event.id);
475                }
476            }
477            _ => {}
478        }
479    }
480    tasks.into_values().collect()
481}
482
483fn project_handoffs(events: Vec<Event>) -> Vec<HandoffSummary> {
484    let mut events = events;
485    events.sort_by(|left, right| {
486        left.created_at
487            .cmp(&right.created_at)
488            .then(left.id.cmp(&right.id))
489    });
490    let mut handoffs: HashMap<Uuid, HandoffSummary> = HashMap::new();
491    for event in events {
492        let action = action(&event);
493        let handoff_id = referenced_id(&event, "handoff_id", "handoff_ref").unwrap_or(event.id);
494        match action.as_deref() {
495            Some("accepted") => {
496                if let Some(handoff) = handoffs.get_mut(&handoff_id) {
497                    handoff.status = HandoffStatus::Accepted;
498                    handoff.accepted_by = Some(
499                        string_metadata(&event.metadata_json, "accepted_by")
500                            .unwrap_or_else(|| event.agent.clone()),
501                    );
502                    handoff.updated_at = event.created_at;
503                    handoff.source_event_ids.push(event.id);
504                }
505            }
506            Some("completed") => {
507                if let Some(handoff) = handoffs.get_mut(&handoff_id) {
508                    handoff.status = HandoffStatus::Completed;
509                    handoff.updated_at = event.created_at;
510                    handoff.source_event_ids.push(event.id);
511                }
512            }
513            _ => {
514                let to_agent = string_metadata(&event.metadata_json, "to");
515                if let Some(to_agent) = to_agent {
516                    handoffs
517                        .entry(handoff_id)
518                        .or_insert_with(|| HandoffSummary {
519                            id: handoff_id,
520                            status: HandoffStatus::Pending,
521                            content: event.content.clone(),
522                            from_agent: string_metadata(&event.metadata_json, "from")
523                                .unwrap_or_else(|| event.agent.clone()),
524                            to_agent,
525                            accepted_by: None,
526                            created_at: event.created_at,
527                            updated_at: event.created_at,
528                            source_event_ids: vec![event.id],
529                        });
530                }
531            }
532        }
533    }
534    handoffs.into_values().collect()
535}
536
537fn action(event: &Event) -> Option<String> {
538    string_metadata(&event.metadata_json, "action")
539}
540
541fn string_metadata(metadata: &Value, key: &str) -> Option<String> {
542    metadata
543        .get(key)
544        .and_then(Value::as_str)
545        .map(ToOwned::to_owned)
546}
547
548fn referenced_id(event: &Event, metadata_key: &str, tag_prefix: &str) -> Option<Uuid> {
549    event
550        .metadata_json
551        .get(metadata_key)
552        .and_then(Value::as_str)
553        .and_then(|value| Uuid::parse_str(value).ok())
554        .or_else(|| {
555            event
556                .tags
557                .iter()
558                .filter_map(|tag| tag.strip_prefix(&format!("{tag_prefix}:")))
559                .find_map(|id| Uuid::parse_str(id).ok())
560        })
561}
562
#[cfg(test)]
mod tests {
    use super::*;
    use crate::core::EventStore;
    use crate::store::SqliteEventStore;

    const WORKSPACE: &str = "workspace";
    const SESSION: &str = "session";

    /// Opens a fresh SQLite store inside a temporary directory.
    ///
    /// The directory guard is returned as well so it stays alive for the whole test.
    fn temp_store() -> (tempfile::TempDir, SqliteEventStore) {
        let dir = tempfile::tempdir().unwrap();
        let store = SqliteEventStore::open(dir.path().join("shuttle.db")).unwrap();
        (dir, store)
    }

    /// Appends the events to the store in iteration order, panicking on the first failure.
    fn append_all(store: &SqliteEventStore, events: impl IntoIterator<Item = Event>) {
        for event in events {
            futures_executor::block_on(store.append(event)).unwrap();
        }
    }

    /// Task creation event authored by `codex` in the shared test workspace.
    fn task_by_codex(content: &str) -> Event {
        new_task(
            WORKSPACE.into(),
            "codex".into(),
            SESSION.into(),
            content.into(),
        )
    }

    /// Claim event for `task_id` authored by `agent` in the shared test workspace.
    fn claim_as(agent: &str, task_id: Uuid) -> Event {
        new_claim(WORKSPACE.into(), agent.into(), SESSION.into(), task_id)
    }

    /// Open tasks of the shared test workspace with the default limit.
    fn open_in_workspace(store: &SqliteEventStore) -> Vec<TaskSummary> {
        futures_executor::block_on(open_tasks(store, WORKSPACE, None)).unwrap()
    }

    #[test]
    fn task_create_and_claim_use_event_tags() {
        let task = task_by_codex("ship mvp");
        assert_eq!(task.event_type, EventType::Task);
        assert_eq!(task.tags, vec![TAG_OPEN]);

        let claim = claim_as("codex", task.id);
        let expected_tags = [
            TAG_CLAIMED.to_owned(),
            "claimed_by:codex".to_owned(),
            format!("task_ref:{}", task.id),
        ];
        for expected in &expected_tags {
            assert!(claim.tags.contains(expected), "missing tag {expected}");
        }
    }

    #[test]
    fn open_tasks_excludes_claimed_tasks() {
        let (_dir, store) = temp_store();
        let first = task_by_codex("ship first");
        let second = task_by_codex("ship second");
        let claim = claim_as("claude", first.id);
        append_all(&store, [first, second, claim]);

        let open = open_in_workspace(&store);

        assert_eq!(open.len(), 1);
        assert_eq!(open[0].content, "ship second");
    }

    #[test]
    fn open_tasks_considers_claims_beyond_default_projection_window() {
        let (_dir, store) = temp_store();
        let claimed_task = task_by_codex("claimed");
        let open_task = task_by_codex("still open");
        let old_claim = claim_as("claude", claimed_task.id);
        append_all(&store, [claimed_task, open_task, old_claim]);

        // Bury the early claim under many newer events.
        for _ in 0..500 {
            let noise = task_by_codex("noise");
            let noise_claim = claim_as("claude", noise.id);
            append_all(&store, [noise, noise_claim]);
        }

        let open = open_in_workspace(&store);

        assert_eq!(open.len(), 1);
        assert_eq!(open[0].content, "still open");
    }

    #[test]
    fn task_projection_tracks_update_claim_and_completion() {
        let (_dir, store) = temp_store();
        let task = task_by_codex("first description");
        let update = new_task_update(
            WORKSPACE.into(),
            "codex".into(),
            SESSION.into(),
            task.id,
            "latest description".into(),
        );
        let claim = claim_as("claude", task.id);
        let done = new_task_done(WORKSPACE.into(), "claude".into(), SESSION.into(), task.id);
        append_all(&store, [task, update, claim, done]);

        let all = futures_executor::block_on(tasks(&store, Some(WORKSPACE), None)).unwrap();
        let open = open_in_workspace(&store);

        assert_eq!(all.len(), 1);
        let only = &all[0];
        assert_eq!(only.status, TaskStatus::Completed);
        assert_eq!(only.content, "latest description");
        assert_eq!(only.claimed_by.as_deref(), Some("claude"));
        assert!(open.is_empty());
    }

    #[test]
    fn handoff_projection_tracks_accept_and_completion() {
        let (_dir, store) = temp_store();
        let handoff = new_handoff(
            WORKSPACE.into(),
            "codex".into(),
            SESSION.into(),
            "claude".into(),
            "continue this branch".into(),
        );
        let accept =
            new_handoff_accept(WORKSPACE.into(), "claude".into(), SESSION.into(), handoff.id);
        let done = new_handoff_done(WORKSPACE.into(), "claude".into(), SESSION.into(), handoff.id);

        append_all(&store, [handoff]);
        let pending = futures_executor::block_on(pending_handoffs(&store, WORKSPACE, None)).unwrap();
        assert_eq!(pending.len(), 1);
        assert_eq!(pending[0].to_agent, "claude");

        append_all(&store, [accept, done]);
        let all = futures_executor::block_on(handoffs(&store, Some(WORKSPACE), None)).unwrap();
        let completed =
            futures_executor::block_on(completed_handoffs(&store, WORKSPACE, None)).unwrap();

        assert_eq!(all.len(), 1);
        assert_eq!(all[0].status, HandoffStatus::Completed);
        assert_eq!(all[0].accepted_by.as_deref(), Some("claude"));
        assert_eq!(completed.len(), 1);
    }
}