Skip to main content

a2a_rs/port/
task_manager.rs

1//! Task management port definitions
2
3use async_trait::async_trait;
4
5use crate::{
6    Message,
7    domain::{
8        A2AError, ContextId, ListTasksParams, ListTasksResult, Task, TaskId, TaskIdParams,
9        TaskQueryParams, TaskState, VersionedTask,
10    },
11};
12
13/// Async task lifecycle management: the core CRUD capability over individual tasks.
14///
15/// A handler implements this trait if it can create, read, mutate, and cancel
16/// tasks. Listing/querying across tasks is a separate capability — see
17/// [`AsyncTaskQuery`]. Convenience wrappers that validate request parameters
18/// live on [`AsyncTaskLifecycleExt`], which is blanket-implemented for every
19/// `AsyncTaskLifecycle`.
20#[async_trait]
21pub trait AsyncTaskLifecycle: Send + Sync {
22    /// Create a new task in the given context.
23    async fn create(&self, id: &TaskId, context_id: &ContextId) -> Result<Task, A2AError>;
24
25    /// Get a task by ID with optional history length limit.
26    async fn get(&self, id: &TaskId, history_length: Option<u32>) -> Result<Task, A2AError>;
27
28    /// Update task status, optionally appending a message to history.
29    async fn update_status(
30        &self,
31        id: &TaskId,
32        state: TaskState,
33        message: Option<Message>,
34    ) -> Result<Task, A2AError>;
35
36    /// Cancel a task.
37    async fn cancel(&self, id: &TaskId) -> Result<Task, A2AError>;
38
39    /// Check whether a task exists.
40    async fn exists(&self, id: &TaskId) -> Result<bool, A2AError>;
41}
42
43/// Async task querying: listing tasks with filtering and pagination.
44///
45/// Kept distinct from [`AsyncTaskLifecycle`] so a handler that only stores and
46/// mutates individual tasks is not forced to implement cross-task search.
47#[async_trait]
48pub trait AsyncTaskQuery: Send + Sync {
49    /// List tasks with filtering and pagination (A2A v1.0.0 `tasks/list`).
50    async fn list(&self, params: &ListTasksParams) -> Result<ListTasksResult, A2AError>;
51}
52
53/// Optimistic-concurrency control over task mutations.
54///
55/// A distinct capability from [`AsyncTaskLifecycle`] (hex rule 2 — narrow ports):
56/// a store that needs lost-update protection implements this, while the plain
57/// lifecycle path stays version-free for callers that don't. The version is a
58/// monotonic counter the store bumps on **every** successful mutation, including
59/// the unversioned [`AsyncTaskLifecycle`] writes — so the two views never drift.
60///
61/// The classic read-modify-write loop:
62///
63/// ```
64/// # use a2a_rs::{AsyncTaskVersioning, VersionedTask};
65/// # use a2a_rs::domain::{A2AError, Message, TaskId, TaskState};
66/// # async fn read_modify_write(
67/// #     store: &impl AsyncTaskVersioning,
68/// #     id: TaskId,
69/// #     next_state: TaskState,
70/// #     msg: Option<Message>,
71/// # ) -> Result<(), A2AError> {
72/// let VersionedTask { task, version } = store.get_versioned(&id, None).await?;
73/// // … decide the next state from `task` …
74/// let _ = &task;
75/// match store.update_status_checked(&id, version, next_state, msg).await {
76///     Ok(updated) => { /* committed at updated.version */ }
77///     Err(A2AError::VersionConflict { .. }) => { /* re-read and retry */ }
78///     Err(e) => return Err(e),
79/// }
80/// # Ok(())
81/// # }
82/// ```
83#[async_trait]
84pub trait AsyncTaskVersioning: Send + Sync {
85    /// Current stored version of a task. Bumped on every successful mutation.
86    async fn version(&self, id: &TaskId) -> Result<u64, A2AError>;
87
88    /// Fetch a task together with its current version (history-limited as in
89    /// [`AsyncTaskLifecycle::get`]).
90    async fn get_versioned(
91        &self,
92        id: &TaskId,
93        history_length: Option<u32>,
94    ) -> Result<VersionedTask, A2AError>;
95
96    /// Update status only if the stored version equals `expected`.
97    ///
98    /// On success returns the mutated task and its newly bumped version. If the
99    /// stored version has advanced past `expected`, fails with
100    /// [`A2AError::VersionConflict`] and leaves the task untouched.
101    async fn update_status_checked(
102        &self,
103        id: &TaskId,
104        expected: u64,
105        state: TaskState,
106        message: Option<Message>,
107    ) -> Result<VersionedTask, A2AError>;
108}
109
110/// Validation conveniences over [`AsyncTaskLifecycle`].
111///
112/// Blanket-implemented for every `AsyncTaskLifecycle`, so implementors get these
113/// for free and only ever stub the core primitives. Constructing a [`TaskId`]
114/// from request parameters performs the empty-string validation, so these
115/// wrappers parse the wire parameters once at the boundary.
116#[async_trait]
117pub trait AsyncTaskLifecycleExt: AsyncTaskLifecycle {
118    /// Validate query parameters, then fetch the task.
119    async fn get_validated(&self, params: &TaskQueryParams) -> Result<Task, A2AError> {
120        let id: TaskId = params.id.parse()?;
121        if let Some(history_length) = params.history_length {
122            if history_length > 1000 {
123                return Err(A2AError::ValidationError {
124                    field: "history_length".to_string(),
125                    message: "History length cannot exceed 1000".to_string(),
126                });
127            }
128        }
129        self.get(&id, params.history_length).await
130    }
131
132    /// Validate ID parameters, then cancel the task.
133    async fn cancel_validated(&self, params: &TaskIdParams) -> Result<Task, A2AError> {
134        let id: TaskId = params.id.parse()?;
135        self.cancel(&id).await
136    }
137}
138
139impl<T: AsyncTaskLifecycle + ?Sized> AsyncTaskLifecycleExt for T {}