1use crate::core::{Event, EventFilter, EventStore, EventType, NewEvent, Result, ShuttleError};
2use chrono::{DateTime, Utc};
3use serde::{Deserialize, Serialize};
4use serde_json::{json, Value};
5use std::collections::HashMap;
6use uuid::Uuid;
7
8pub const TAG_OPEN: &str = "task:open";
9pub const TAG_CLAIMED: &str = "task:claimed";
10pub const TAG_DONE: &str = "task:done";
11pub const TAG_HANDOFF_PENDING: &str = "handoff:pending";
12pub const TAG_HANDOFF_ACCEPTED: &str = "handoff:accepted";
13pub const TAG_HANDOFF_DONE: &str = "handoff:done";
14
15pub fn claim_tag(agent: &str) -> String {
16 format!("claimed_by:{agent}")
17}
18
19pub fn task_ref_tag(id: Uuid) -> String {
20 format!("task_ref:{id}")
21}
22
23pub fn handoff_ref_tag(id: Uuid) -> String {
24 format!("handoff_ref:{id}")
25}
26
27#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
28#[serde(rename_all = "snake_case")]
29pub enum TaskStatus {
30 Open,
31 Claimed,
32 Completed,
33}
34
35impl TaskStatus {
36 pub fn as_str(&self) -> &'static str {
37 match self {
38 Self::Open => "open",
39 Self::Claimed => "claimed",
40 Self::Completed => "completed",
41 }
42 }
43}
44
45#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
46pub struct TaskSummary {
47 pub id: Uuid,
48 pub status: TaskStatus,
49 pub content: String,
50 pub created_by: String,
51 pub claimed_by: Option<String>,
52 pub created_at: DateTime<Utc>,
53 pub updated_at: DateTime<Utc>,
54 pub source_event_ids: Vec<Uuid>,
55}
56
57#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
58#[serde(rename_all = "snake_case")]
59pub enum HandoffStatus {
60 Pending,
61 Accepted,
62 Completed,
63}
64
65impl HandoffStatus {
66 pub fn as_str(&self) -> &'static str {
67 match self {
68 Self::Pending => "pending",
69 Self::Accepted => "accepted",
70 Self::Completed => "completed",
71 }
72 }
73}
74
75#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
76pub struct HandoffSummary {
77 pub id: Uuid,
78 pub status: HandoffStatus,
79 pub content: String,
80 pub from_agent: String,
81 pub to_agent: String,
82 pub accepted_by: Option<String>,
83 pub created_at: DateTime<Utc>,
84 pub updated_at: DateTime<Utc>,
85 pub source_event_ids: Vec<Uuid>,
86}
87
88pub fn new_task(workspace_id: String, agent: String, session_id: String, content: String) -> Event {
89 Event::new(NewEvent {
90 event_type: EventType::Task,
91 workspace_id,
92 repo_id: None,
93 repo_path: None,
94 git_remote: None,
95 bit_repo_id: None,
96 branch: None,
97 commit: None,
98 repo_dirty: None,
99 agent,
100 session_id,
101 title: Some("task".to_owned()),
102 content,
103 tags: vec![TAG_OPEN.to_owned()],
104 metadata_json: json!({ "action": "created", "status": "open" }),
105 })
106}
107
108pub fn new_claim(workspace_id: String, agent: String, session_id: String, task_id: Uuid) -> Event {
109 Event::new(NewEvent {
110 event_type: EventType::Task,
111 workspace_id,
112 repo_id: None,
113 repo_path: None,
114 git_remote: None,
115 bit_repo_id: None,
116 branch: None,
117 commit: None,
118 repo_dirty: None,
119 agent: agent.clone(),
120 session_id,
121 title: Some("task claim".to_owned()),
122 content: format!("claimed task {task_id}"),
123 tags: vec![
124 TAG_CLAIMED.to_owned(),
125 claim_tag(&agent),
126 task_ref_tag(task_id),
127 ],
128 metadata_json: json!({
129 "action": "claimed",
130 "status": "claimed",
131 "task_id": task_id,
132 "claimed_by": agent,
133 }),
134 })
135}
136
137pub fn new_task_update(
138 workspace_id: String,
139 agent: String,
140 session_id: String,
141 task_id: Uuid,
142 content: String,
143) -> Event {
144 Event::new(NewEvent {
145 event_type: EventType::Task,
146 workspace_id,
147 repo_id: None,
148 repo_path: None,
149 git_remote: None,
150 bit_repo_id: None,
151 branch: None,
152 commit: None,
153 repo_dirty: None,
154 agent,
155 session_id,
156 title: Some("task update".to_owned()),
157 content,
158 tags: vec![task_ref_tag(task_id)],
159 metadata_json: json!({ "action": "updated", "task_id": task_id }),
160 })
161}
162
163pub fn new_task_done(
164 workspace_id: String,
165 agent: String,
166 session_id: String,
167 task_id: Uuid,
168) -> Event {
169 Event::new(NewEvent {
170 event_type: EventType::Task,
171 workspace_id,
172 repo_id: None,
173 repo_path: None,
174 git_remote: None,
175 bit_repo_id: None,
176 branch: None,
177 commit: None,
178 repo_dirty: None,
179 agent,
180 session_id,
181 title: Some("task completed".to_owned()),
182 content: format!("completed task {task_id}"),
183 tags: vec![TAG_DONE.to_owned(), task_ref_tag(task_id)],
184 metadata_json: json!({
185 "action": "completed",
186 "status": "completed",
187 "task_id": task_id,
188 }),
189 })
190}
191
192pub fn new_handoff(
193 workspace_id: String,
194 from_agent: String,
195 session_id: String,
196 to_agent: String,
197 content: String,
198) -> Event {
199 Event::new(NewEvent {
200 event_type: EventType::Handoff,
201 workspace_id,
202 repo_id: None,
203 repo_path: None,
204 git_remote: None,
205 bit_repo_id: None,
206 branch: None,
207 commit: None,
208 repo_dirty: None,
209 agent: from_agent.clone(),
210 session_id,
211 title: Some("handoff".to_owned()),
212 content,
213 tags: vec![TAG_HANDOFF_PENDING.to_owned()],
214 metadata_json: json!({
215 "action": "requested",
216 "status": "pending",
217 "from": from_agent,
218 "to": to_agent,
219 }),
220 })
221}
222
223pub fn new_handoff_accept(
224 workspace_id: String,
225 agent: String,
226 session_id: String,
227 handoff_id: Uuid,
228) -> Event {
229 Event::new(NewEvent {
230 event_type: EventType::Handoff,
231 workspace_id,
232 repo_id: None,
233 repo_path: None,
234 git_remote: None,
235 bit_repo_id: None,
236 branch: None,
237 commit: None,
238 repo_dirty: None,
239 agent: agent.clone(),
240 session_id,
241 title: Some("handoff accepted".to_owned()),
242 content: format!("accepted handoff {handoff_id}"),
243 tags: vec![
244 TAG_HANDOFF_ACCEPTED.to_owned(),
245 handoff_ref_tag(handoff_id),
246 claim_tag(&agent),
247 ],
248 metadata_json: json!({
249 "action": "accepted",
250 "status": "accepted",
251 "handoff_id": handoff_id,
252 "accepted_by": agent,
253 }),
254 })
255}
256
257pub fn new_handoff_done(
258 workspace_id: String,
259 agent: String,
260 session_id: String,
261 handoff_id: Uuid,
262) -> Event {
263 Event::new(NewEvent {
264 event_type: EventType::Handoff,
265 workspace_id,
266 repo_id: None,
267 repo_path: None,
268 git_remote: None,
269 bit_repo_id: None,
270 branch: None,
271 commit: None,
272 repo_dirty: None,
273 agent,
274 session_id,
275 title: Some("handoff completed".to_owned()),
276 content: format!("completed handoff {handoff_id}"),
277 tags: vec![TAG_HANDOFF_DONE.to_owned(), handoff_ref_tag(handoff_id)],
278 metadata_json: json!({
279 "action": "completed",
280 "status": "completed",
281 "handoff_id": handoff_id,
282 }),
283 })
284}
285
286pub async fn list(store: &impl EventStore) -> Result<Vec<TaskSummary>> {
287 tasks(store, None, None).await
288}
289
290pub async fn tasks(
291 store: &impl EventStore,
292 workspace_id: Option<&str>,
293 limit: Option<u32>,
294) -> Result<Vec<TaskSummary>> {
295 let events = store
296 .list(EventFilter {
297 event_type: Some(EventType::Task),
298 workspace_id: workspace_id.map(ToOwned::to_owned),
299 limit: Some(u32::MAX),
300 ..EventFilter::default()
301 })
302 .await?;
303 let mut tasks = project_tasks(events);
304 tasks.sort_by(|left, right| {
305 right
306 .updated_at
307 .cmp(&left.updated_at)
308 .then(left.id.cmp(&right.id))
309 });
310 if let Some(limit) = limit {
311 tasks.truncate(limit as usize);
312 }
313 Ok(tasks)
314}
315
316pub async fn open_tasks(
317 store: &impl EventStore,
318 workspace_id: &str,
319 limit: Option<u32>,
320) -> Result<Vec<TaskSummary>> {
321 let mut tasks = tasks(store, Some(workspace_id), None).await?;
322 tasks.retain(|task| task.status == TaskStatus::Open);
323 tasks.truncate(limit.unwrap_or(20) as usize);
324 Ok(tasks)
325}
326
327pub async fn ensure_task_exists(
328 store: &impl EventStore,
329 workspace_id: &str,
330 task_id: Uuid,
331) -> Result<()> {
332 if tasks(store, Some(workspace_id), None)
333 .await?
334 .iter()
335 .any(|task| task.id == task_id)
336 {
337 Ok(())
338 } else {
339 Err(ShuttleError::Store(format!("unknown task id: {task_id}")))
340 }
341}
342
343pub async fn claimed_tasks(
344 store: &impl EventStore,
345 workspace_id: &str,
346 limit: Option<u32>,
347) -> Result<Vec<TaskSummary>> {
348 let mut tasks = tasks(store, Some(workspace_id), None).await?;
349 tasks.retain(|task| task.status == TaskStatus::Claimed);
350 tasks.truncate(limit.unwrap_or(20) as usize);
351 Ok(tasks)
352}
353
354pub async fn handoffs(
355 store: &impl EventStore,
356 workspace_id: Option<&str>,
357 limit: Option<u32>,
358) -> Result<Vec<HandoffSummary>> {
359 let events = store
360 .list(EventFilter {
361 event_type: Some(EventType::Handoff),
362 workspace_id: workspace_id.map(ToOwned::to_owned),
363 limit: Some(u32::MAX),
364 ..EventFilter::default()
365 })
366 .await?;
367 let mut handoffs = project_handoffs(events);
368 handoffs.sort_by(|left, right| {
369 right
370 .updated_at
371 .cmp(&left.updated_at)
372 .then(left.id.cmp(&right.id))
373 });
374 if let Some(limit) = limit {
375 handoffs.truncate(limit as usize);
376 }
377 Ok(handoffs)
378}
379
380pub async fn pending_handoffs(
381 store: &impl EventStore,
382 workspace_id: &str,
383 limit: Option<u32>,
384) -> Result<Vec<HandoffSummary>> {
385 let mut handoffs = handoffs(store, Some(workspace_id), None).await?;
386 handoffs.retain(|handoff| handoff.status == HandoffStatus::Pending);
387 handoffs.truncate(limit.unwrap_or(20) as usize);
388 Ok(handoffs)
389}
390
391pub async fn completed_handoffs(
392 store: &impl EventStore,
393 workspace_id: &str,
394 limit: Option<u32>,
395) -> Result<Vec<HandoffSummary>> {
396 let mut handoffs = handoffs(store, Some(workspace_id), None).await?;
397 handoffs.retain(|handoff| handoff.status == HandoffStatus::Completed);
398 handoffs.truncate(limit.unwrap_or(20) as usize);
399 Ok(handoffs)
400}
401
402pub async fn ensure_handoff_exists(
403 store: &impl EventStore,
404 workspace_id: &str,
405 handoff_id: Uuid,
406) -> Result<()> {
407 if handoffs(store, Some(workspace_id), None)
408 .await?
409 .iter()
410 .any(|handoff| handoff.id == handoff_id)
411 {
412 Ok(())
413 } else {
414 Err(ShuttleError::Store(format!(
415 "unknown handoff id: {handoff_id}"
416 )))
417 }
418}
419
420fn project_tasks(events: Vec<Event>) -> Vec<TaskSummary> {
421 let mut events = events;
422 events.sort_by(|left, right| {
423 left.created_at
424 .cmp(&right.created_at)
425 .then(left.id.cmp(&right.id))
426 });
427 let mut tasks: HashMap<Uuid, TaskSummary> = HashMap::new();
428 for event in events {
429 let action = action(&event);
430 let task_id = referenced_id(&event, "task_id", "task_ref").unwrap_or(event.id);
431 match action.as_deref() {
432 Some("claimed") => {
433 if let Some(task) = tasks.get_mut(&task_id) {
434 task.status = TaskStatus::Claimed;
435 task.claimed_by = Some(
436 string_metadata(&event.metadata_json, "claimed_by")
437 .unwrap_or_else(|| event.agent.clone()),
438 );
439 task.updated_at = event.created_at;
440 task.source_event_ids.push(event.id);
441 }
442 }
443 Some("updated") => {
444 if let Some(task) = tasks.get_mut(&task_id) {
445 task.content = event.content.clone();
446 task.updated_at = event.created_at;
447 task.source_event_ids.push(event.id);
448 }
449 }
450 Some("completed") => {
451 if let Some(task) = tasks.get_mut(&task_id) {
452 task.status = TaskStatus::Completed;
453 task.updated_at = event.created_at;
454 task.source_event_ids.push(event.id);
455 }
456 }
457 _ if event.tags.iter().any(|tag| tag == TAG_OPEN) => {
458 tasks.entry(task_id).or_insert_with(|| TaskSummary {
459 id: task_id,
460 status: TaskStatus::Open,
461 content: event.content.clone(),
462 created_by: event.agent.clone(),
463 claimed_by: None,
464 created_at: event.created_at,
465 updated_at: event.created_at,
466 source_event_ids: vec![event.id],
467 });
468 }
469 _ if event.tags.iter().any(|tag| tag == TAG_CLAIMED) => {
470 if let Some(task) = tasks.get_mut(&task_id) {
471 task.status = TaskStatus::Claimed;
472 task.claimed_by = Some(event.agent.clone());
473 task.updated_at = event.created_at;
474 task.source_event_ids.push(event.id);
475 }
476 }
477 _ => {}
478 }
479 }
480 tasks.into_values().collect()
481}
482
483fn project_handoffs(events: Vec<Event>) -> Vec<HandoffSummary> {
484 let mut events = events;
485 events.sort_by(|left, right| {
486 left.created_at
487 .cmp(&right.created_at)
488 .then(left.id.cmp(&right.id))
489 });
490 let mut handoffs: HashMap<Uuid, HandoffSummary> = HashMap::new();
491 for event in events {
492 let action = action(&event);
493 let handoff_id = referenced_id(&event, "handoff_id", "handoff_ref").unwrap_or(event.id);
494 match action.as_deref() {
495 Some("accepted") => {
496 if let Some(handoff) = handoffs.get_mut(&handoff_id) {
497 handoff.status = HandoffStatus::Accepted;
498 handoff.accepted_by = Some(
499 string_metadata(&event.metadata_json, "accepted_by")
500 .unwrap_or_else(|| event.agent.clone()),
501 );
502 handoff.updated_at = event.created_at;
503 handoff.source_event_ids.push(event.id);
504 }
505 }
506 Some("completed") => {
507 if let Some(handoff) = handoffs.get_mut(&handoff_id) {
508 handoff.status = HandoffStatus::Completed;
509 handoff.updated_at = event.created_at;
510 handoff.source_event_ids.push(event.id);
511 }
512 }
513 _ => {
514 let to_agent = string_metadata(&event.metadata_json, "to");
515 if let Some(to_agent) = to_agent {
516 handoffs
517 .entry(handoff_id)
518 .or_insert_with(|| HandoffSummary {
519 id: handoff_id,
520 status: HandoffStatus::Pending,
521 content: event.content.clone(),
522 from_agent: string_metadata(&event.metadata_json, "from")
523 .unwrap_or_else(|| event.agent.clone()),
524 to_agent,
525 accepted_by: None,
526 created_at: event.created_at,
527 updated_at: event.created_at,
528 source_event_ids: vec![event.id],
529 });
530 }
531 }
532 }
533 }
534 handoffs.into_values().collect()
535}
536
537fn action(event: &Event) -> Option<String> {
538 string_metadata(&event.metadata_json, "action")
539}
540
541fn string_metadata(metadata: &Value, key: &str) -> Option<String> {
542 metadata
543 .get(key)
544 .and_then(Value::as_str)
545 .map(ToOwned::to_owned)
546}
547
548fn referenced_id(event: &Event, metadata_key: &str, tag_prefix: &str) -> Option<Uuid> {
549 event
550 .metadata_json
551 .get(metadata_key)
552 .and_then(Value::as_str)
553 .and_then(|value| Uuid::parse_str(value).ok())
554 .or_else(|| {
555 event
556 .tags
557 .iter()
558 .filter_map(|tag| tag.strip_prefix(&format!("{tag_prefix}:")))
559 .find_map(|id| Uuid::parse_str(id).ok())
560 })
561}
562
563#[cfg(test)]
564mod tests {
565 use super::*;
566 use crate::core::EventStore;
567 use crate::store::SqliteEventStore;
568
569 #[test]
570 fn task_create_and_claim_use_event_tags() {
571 let task = new_task(
572 "workspace".into(),
573 "codex".into(),
574 "session".into(),
575 "ship mvp".into(),
576 );
577 assert_eq!(task.event_type, EventType::Task);
578 assert_eq!(task.tags, vec![TAG_OPEN]);
579
580 let claim = new_claim(
581 "workspace".into(),
582 "codex".into(),
583 "session".into(),
584 task.id,
585 );
586 assert!(claim.tags.contains(&TAG_CLAIMED.to_owned()));
587 assert!(claim.tags.contains(&"claimed_by:codex".to_owned()));
588 assert!(claim.tags.contains(&format!("task_ref:{}", task.id)));
589 }
590
591 #[test]
592 fn open_tasks_excludes_claimed_tasks() {
593 let dir = tempfile::tempdir().unwrap();
594 let store = SqliteEventStore::open(dir.path().join("shuttle.db")).unwrap();
595 let first = new_task(
596 "workspace".into(),
597 "codex".into(),
598 "session".into(),
599 "ship first".into(),
600 );
601 let second = new_task(
602 "workspace".into(),
603 "codex".into(),
604 "session".into(),
605 "ship second".into(),
606 );
607 let claim = new_claim(
608 "workspace".into(),
609 "claude".into(),
610 "session".into(),
611 first.id,
612 );
613
614 futures_executor::block_on(store.append(first)).unwrap();
615 futures_executor::block_on(store.append(second)).unwrap();
616 futures_executor::block_on(store.append(claim)).unwrap();
617
618 let tasks = futures_executor::block_on(open_tasks(&store, "workspace", None)).unwrap();
619
620 assert_eq!(tasks.len(), 1);
621 assert_eq!(tasks[0].content, "ship second");
622 }
623
624 #[test]
625 fn open_tasks_considers_claims_beyond_default_projection_window() {
626 let dir = tempfile::tempdir().unwrap();
627 let store = SqliteEventStore::open(dir.path().join("shuttle.db")).unwrap();
628 let claimed_task = new_task(
629 "workspace".into(),
630 "codex".into(),
631 "session".into(),
632 "claimed".into(),
633 );
634 let open_task = new_task(
635 "workspace".into(),
636 "codex".into(),
637 "session".into(),
638 "still open".into(),
639 );
640 let old_claim = new_claim(
641 "workspace".into(),
642 "claude".into(),
643 "session".into(),
644 claimed_task.id,
645 );
646 futures_executor::block_on(store.append(claimed_task)).unwrap();
647 futures_executor::block_on(store.append(open_task)).unwrap();
648 futures_executor::block_on(store.append(old_claim)).unwrap();
649
650 for _ in 0..500 {
651 let task = new_task(
652 "workspace".into(),
653 "codex".into(),
654 "session".into(),
655 "noise".into(),
656 );
657 let claim = new_claim(
658 "workspace".into(),
659 "claude".into(),
660 "session".into(),
661 task.id,
662 );
663 futures_executor::block_on(store.append(task)).unwrap();
664 futures_executor::block_on(store.append(claim)).unwrap();
665 }
666
667 let tasks = futures_executor::block_on(open_tasks(&store, "workspace", None)).unwrap();
668
669 assert_eq!(tasks.len(), 1);
670 assert_eq!(tasks[0].content, "still open");
671 }
672
673 #[test]
674 fn task_projection_tracks_update_claim_and_completion() {
675 let dir = tempfile::tempdir().unwrap();
676 let store = SqliteEventStore::open(dir.path().join("shuttle.db")).unwrap();
677 let task = new_task(
678 "workspace".into(),
679 "codex".into(),
680 "session".into(),
681 "first description".into(),
682 );
683 let update = new_task_update(
684 "workspace".into(),
685 "codex".into(),
686 "session".into(),
687 task.id,
688 "latest description".into(),
689 );
690 let claim = new_claim(
691 "workspace".into(),
692 "claude".into(),
693 "session".into(),
694 task.id,
695 );
696 let done = new_task_done(
697 "workspace".into(),
698 "claude".into(),
699 "session".into(),
700 task.id,
701 );
702
703 futures_executor::block_on(store.append(task)).unwrap();
704 futures_executor::block_on(store.append(update)).unwrap();
705 futures_executor::block_on(store.append(claim)).unwrap();
706 futures_executor::block_on(store.append(done)).unwrap();
707
708 let tasks = futures_executor::block_on(tasks(&store, Some("workspace"), None)).unwrap();
709 let open = futures_executor::block_on(open_tasks(&store, "workspace", None)).unwrap();
710
711 assert_eq!(tasks.len(), 1);
712 assert_eq!(tasks[0].status, TaskStatus::Completed);
713 assert_eq!(tasks[0].content, "latest description");
714 assert_eq!(tasks[0].claimed_by.as_deref(), Some("claude"));
715 assert!(open.is_empty());
716 }
717
718 #[test]
719 fn handoff_projection_tracks_accept_and_completion() {
720 let dir = tempfile::tempdir().unwrap();
721 let store = SqliteEventStore::open(dir.path().join("shuttle.db")).unwrap();
722 let handoff = new_handoff(
723 "workspace".into(),
724 "codex".into(),
725 "session".into(),
726 "claude".into(),
727 "continue this branch".into(),
728 );
729 let accept = new_handoff_accept(
730 "workspace".into(),
731 "claude".into(),
732 "session".into(),
733 handoff.id,
734 );
735 let done = new_handoff_done(
736 "workspace".into(),
737 "claude".into(),
738 "session".into(),
739 handoff.id,
740 );
741
742 futures_executor::block_on(store.append(handoff)).unwrap();
743 let pending =
744 futures_executor::block_on(pending_handoffs(&store, "workspace", None)).unwrap();
745 assert_eq!(pending.len(), 1);
746 assert_eq!(pending[0].to_agent, "claude");
747
748 futures_executor::block_on(store.append(accept)).unwrap();
749 futures_executor::block_on(store.append(done)).unwrap();
750
751 let handoffs =
752 futures_executor::block_on(handoffs(&store, Some("workspace"), None)).unwrap();
753 let completed =
754 futures_executor::block_on(completed_handoffs(&store, "workspace", None)).unwrap();
755
756 assert_eq!(handoffs.len(), 1);
757 assert_eq!(handoffs[0].status, HandoffStatus::Completed);
758 assert_eq!(handoffs[0].accepted_by.as_deref(), Some("claude"));
759 assert_eq!(completed.len(), 1);
760 }
761}