a2a_rs/port/task_manager.rs
1//! Task management port definitions
2
3use async_trait::async_trait;
4
5use crate::{
6 Message,
7 domain::{
8 A2AError, ContextId, ListTasksParams, ListTasksResult, Task, TaskId, TaskIdParams,
9 TaskQueryParams, TaskState, VersionedTask,
10 },
11};
12
13/// Async task lifecycle management: the core CRUD capability over individual tasks.
14///
15/// A handler implements this trait if it can create, read, mutate, and cancel
16/// tasks. Listing/querying across tasks is a separate capability — see
17/// [`AsyncTaskQuery`]. Convenience wrappers that validate request parameters
18/// live on [`AsyncTaskLifecycleExt`], which is blanket-implemented for every
19/// `AsyncTaskLifecycle`.
20#[async_trait]
21pub trait AsyncTaskLifecycle: Send + Sync {
22 /// Create a new task in the given context.
23 async fn create(&self, id: &TaskId, context_id: &ContextId) -> Result<Task, A2AError>;
24
25 /// Get a task by ID with optional history length limit.
26 async fn get(&self, id: &TaskId, history_length: Option<u32>) -> Result<Task, A2AError>;
27
28 /// Update task status, optionally appending a message to history.
29 async fn update_status(
30 &self,
31 id: &TaskId,
32 state: TaskState,
33 message: Option<Message>,
34 ) -> Result<Task, A2AError>;
35
36 /// Cancel a task.
37 async fn cancel(&self, id: &TaskId) -> Result<Task, A2AError>;
38
39 /// Check whether a task exists.
40 async fn exists(&self, id: &TaskId) -> Result<bool, A2AError>;
41}
42
43/// Async task querying: listing tasks with filtering and pagination.
44///
45/// Kept distinct from [`AsyncTaskLifecycle`] so a handler that only stores and
46/// mutates individual tasks is not forced to implement cross-task search.
47#[async_trait]
48pub trait AsyncTaskQuery: Send + Sync {
49 /// List tasks with filtering and pagination (A2A v1.0.0 `tasks/list`).
50 async fn list(&self, params: &ListTasksParams) -> Result<ListTasksResult, A2AError>;
51}
52
53/// Optimistic-concurrency control over task mutations.
54///
55/// A distinct capability from [`AsyncTaskLifecycle`] (hex rule 2 — narrow ports):
56/// a store that needs lost-update protection implements this, while the plain
57/// lifecycle path stays version-free for callers that don't. The version is a
58/// monotonic counter the store bumps on **every** successful mutation, including
59/// the unversioned [`AsyncTaskLifecycle`] writes — so the two views never drift.
60///
61/// The classic read-modify-write loop:
62///
63/// ```
64/// # use a2a_rs::{AsyncTaskVersioning, VersionedTask};
65/// # use a2a_rs::domain::{A2AError, Message, TaskId, TaskState};
66/// # async fn read_modify_write(
67/// # store: &impl AsyncTaskVersioning,
68/// # id: TaskId,
69/// # next_state: TaskState,
70/// # msg: Option<Message>,
71/// # ) -> Result<(), A2AError> {
72/// let VersionedTask { task, version } = store.get_versioned(&id, None).await?;
73/// // … decide the next state from `task` …
74/// let _ = &task;
75/// match store.update_status_checked(&id, version, next_state, msg).await {
76/// Ok(updated) => { /* committed at updated.version */ }
77/// Err(A2AError::VersionConflict { .. }) => { /* re-read and retry */ }
78/// Err(e) => return Err(e),
79/// }
80/// # Ok(())
81/// # }
82/// ```
83#[async_trait]
84pub trait AsyncTaskVersioning: Send + Sync {
85 /// Current stored version of a task. Bumped on every successful mutation.
86 async fn version(&self, id: &TaskId) -> Result<u64, A2AError>;
87
88 /// Fetch a task together with its current version (history-limited as in
89 /// [`AsyncTaskLifecycle::get`]).
90 async fn get_versioned(
91 &self,
92 id: &TaskId,
93 history_length: Option<u32>,
94 ) -> Result<VersionedTask, A2AError>;
95
96 /// Update status only if the stored version equals `expected`.
97 ///
98 /// On success returns the mutated task and its newly bumped version. If the
99 /// stored version has advanced past `expected`, fails with
100 /// [`A2AError::VersionConflict`] and leaves the task untouched.
101 async fn update_status_checked(
102 &self,
103 id: &TaskId,
104 expected: u64,
105 state: TaskState,
106 message: Option<Message>,
107 ) -> Result<VersionedTask, A2AError>;
108}
109
110/// Validation conveniences over [`AsyncTaskLifecycle`].
111///
112/// Blanket-implemented for every `AsyncTaskLifecycle`, so implementors get these
113/// for free and only ever stub the core primitives. Constructing a [`TaskId`]
114/// from request parameters performs the empty-string validation, so these
115/// wrappers parse the wire parameters once at the boundary.
116#[async_trait]
117pub trait AsyncTaskLifecycleExt: AsyncTaskLifecycle {
118 /// Validate query parameters, then fetch the task.
119 async fn get_validated(&self, params: &TaskQueryParams) -> Result<Task, A2AError> {
120 let id: TaskId = params.id.parse()?;
121 if let Some(history_length) = params.history_length {
122 if history_length > 1000 {
123 return Err(A2AError::ValidationError {
124 field: "history_length".to_string(),
125 message: "History length cannot exceed 1000".to_string(),
126 });
127 }
128 }
129 self.get(&id, params.history_length).await
130 }
131
132 /// Validate ID parameters, then cancel the task.
133 async fn cancel_validated(&self, params: &TaskIdParams) -> Result<Task, A2AError> {
134 let id: TaskId = params.id.parse()?;
135 self.cancel(&id).await
136 }
137}
138
139impl<T: AsyncTaskLifecycle + ?Sized> AsyncTaskLifecycleExt for T {}