use std::sync::Arc;
use std::sync::atomic::{AtomicBool, Ordering};
use tokio::sync::Mutex as AsyncMutex;
use turul_a2a_types::{Artifact, Message, TaskState, TaskStatus};
use crate::error::A2aError;
use crate::server::in_flight::InFlightHandle;
use crate::storage::{A2aAtomicStore, A2aStorageError, A2aTaskStorage};
use crate::streaming::{ArtifactUpdatePayload, StatusUpdatePayload, StreamEvent, TaskEventBroker};
/// Handle through which task handlers emit status/artifact updates.
///
/// Cheap to clone (shared `Arc` interior). A sink is either *live* (bound to
/// a task and the framework runtime) or *detached* (`inner == None`), in
/// which case it reports closed and rejects every emit.
#[derive(Clone)]
pub struct EventSink {
    // `None` => detached sink built via `EventSink::detached()`.
    inner: Option<Arc<EventSinkInner>>,
}
/// Shared state behind a live [`EventSink`]; one instance per task.
struct EventSinkInner {
    tenant: String,
    task_id: String,
    context_id: String,
    owner: String,
    // Atomic commit path: persists status/artifact plus stream events in one step.
    atomic_store: Arc<dyn A2aAtomicStore>,
    // Read path used by artifact commits to load-and-merge the current task.
    task_storage: Arc<dyn A2aTaskStorage>,
    // Notified after every successful commit so stream subscribers wake up.
    event_broker: TaskEventBroker,
    // In-flight handle; `fire_yielded` is called on terminal/interrupt commits.
    handle: Arc<InFlightHandle>,
    // Set once a terminal state is committed here, or a terminal CAS is lost.
    is_closed: AtomicBool,
    // Serializes commits from concurrent clones of the same sink.
    commit_lock: AsyncMutex<()>,
    // Optional webhook push fan-out for status updates.
    push_dispatcher: Option<Arc<crate::push::PushDispatcher>>,
}
impl EventSink {
#[allow(clippy::too_many_arguments)]
pub(crate) fn new(
tenant: String,
task_id: String,
context_id: String,
owner: String,
atomic_store: Arc<dyn A2aAtomicStore>,
task_storage: Arc<dyn A2aTaskStorage>,
event_broker: TaskEventBroker,
handle: Arc<InFlightHandle>,
push_dispatcher: Option<Arc<crate::push::PushDispatcher>>,
) -> Self {
Self {
inner: Some(Arc::new(EventSinkInner {
tenant,
task_id,
context_id,
owner,
atomic_store,
task_storage,
event_broker,
handle,
is_closed: AtomicBool::new(false),
commit_lock: AsyncMutex::new(()),
push_dispatcher,
})),
}
}
pub fn detached() -> Self {
Self { inner: None }
}
pub fn is_closed(&self) -> bool {
match &self.inner {
Some(inner) => inner.is_closed.load(Ordering::Acquire),
None => true,
}
}
pub async fn set_status(
&self,
state: TaskState,
message: Option<Message>,
) -> Result<u64, A2aError> {
if turul_a2a_types::state_machine::is_terminal(state) {
return Err(A2aError::InvalidRequest {
message: format!(
"set_status does not accept terminal state {state:?}; use complete/fail/cancelled/reject"
),
});
}
self.inner()?.commit_status(state, message).await
}
pub async fn emit_artifact(
&self,
artifact: Artifact,
append: bool,
last_chunk: bool,
) -> Result<u64, A2aError> {
self.inner()?
.commit_artifact(artifact, append, last_chunk)
.await
}
pub async fn complete(&self, final_message: Option<Message>) -> Result<u64, A2aError> {
self.inner()?
.commit_status(TaskState::Completed, final_message)
.await
}
pub async fn fail(&self, reason: Option<String>) -> Result<u64, A2aError> {
let message = reason.map(text_message_from_agent);
self.inner()?
.commit_status(TaskState::Failed, message)
.await
}
pub async fn cancelled(&self, reason: Option<String>) -> Result<u64, A2aError> {
let message = reason.map(text_message_from_agent);
self.inner()?
.commit_status(TaskState::Canceled, message)
.await
}
pub async fn reject(&self, reason: Option<String>) -> Result<u64, A2aError> {
let message = reason.map(text_message_from_agent);
self.inner()?
.commit_status(TaskState::Rejected, message)
.await
}
pub async fn require_input(&self, prompt: Option<Message>) -> Result<u64, A2aError> {
self.inner()?
.commit_status(TaskState::InputRequired, prompt)
.await
}
pub async fn require_auth(&self, challenge: Option<Message>) -> Result<u64, A2aError> {
self.inner()?
.commit_status(TaskState::AuthRequired, challenge)
.await
}
pub(crate) async fn commit_state_internal(
&self,
state: TaskState,
message: Option<Message>,
) -> Result<u64, A2aError> {
self.inner()?.commit_status(state, message).await
}
fn inner(&self) -> Result<&Arc<EventSinkInner>, A2aError> {
self.inner.as_ref().ok_or_else(|| {
A2aError::Internal(
"EventSink is detached (no framework runtime); \
construct via A2aServer or LambdaA2aHandler"
.into(),
)
})
}
}
impl std::fmt::Debug for EventSink {
    /// Compact debug view: a detached sink prints a fixed marker, a live
    /// sink prints its task identity and closed flag (Relaxed is enough for
    /// a diagnostic snapshot).
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let Some(inner) = self.inner.as_ref() else {
            return f.write_str("EventSink(detached)");
        };
        f.debug_struct("EventSink")
            .field("tenant", &inner.tenant)
            .field("task_id", &inner.task_id)
            .field("is_closed", &inner.is_closed.load(Ordering::Relaxed))
            .finish()
    }
}
impl EventSinkInner {
    /// Atomically persists a status transition plus its stream event, then
    /// runs the post-commit hooks (push dispatch, sink close, yielded
    /// notification, broker wake-up). Returns the event sequence number.
    async fn commit_status(
        &self,
        state: TaskState,
        message: Option<Message>,
    ) -> Result<u64, A2aError> {
        // Serialize commits across clones; the closed check below must stay
        // under this lock so no emit can slip in after a terminal commit.
        let _guard = self.commit_lock.lock().await;
        if self.is_closed.load(Ordering::Acquire) {
            return Err(A2aError::InvalidRequest {
                message: "EventSink is closed".into(),
            });
        }
        let mut status = TaskStatus::new(state);
        if let Some(m) = message {
            status = status.with_message(m);
        }
        let event = StreamEvent::StatusUpdate {
            status_update: StatusUpdatePayload {
                task_id: self.task_id.clone(),
                context_id: self.context_id.clone(),
                status: serde_json::to_value(&status).unwrap_or_default(),
            },
        };
        // Status + event are committed in one atomic store operation; the
        // store enforces terminal-state preservation (CAS) for us.
        let result = self
            .atomic_store
            .update_task_status_with_events(
                &self.tenant,
                &self.task_id,
                &self.owner,
                status,
                vec![event],
            )
            .await;
        match result {
            Ok((task, seqs)) => {
                let seq = seqs.first().copied().unwrap_or(0);
                let is_terminal = turul_a2a_types::state_machine::is_terminal(state);
                let is_interrupted =
                    matches!(state, TaskState::InputRequired | TaskState::AuthRequired);
                if let Some(dispatcher) = &self.push_dispatcher {
                    // NOTE(review): the push payload is rebuilt from the bare
                    // state (the committed `status` was moved into the store
                    // call), so the optional message is NOT included here even
                    // though the persisted stream event carries it — confirm
                    // this omission is intentional.
                    let ev = StreamEvent::StatusUpdate {
                        status_update: StatusUpdatePayload {
                            task_id: self.task_id.clone(),
                            context_id: self.context_id.clone(),
                            status: serde_json::to_value(turul_a2a_types::TaskStatus::new(state))
                                .unwrap_or_default(),
                        },
                    };
                    dispatcher.dispatch(
                        self.tenant.clone(),
                        self.owner.clone(),
                        task.clone(),
                        vec![(seq, ev)],
                    );
                }
                if is_terminal {
                    // Close before notifying so later emits on this sink fail fast.
                    self.is_closed.store(true, Ordering::Release);
                }
                if is_terminal || is_interrupted {
                    // Wake whoever awaits the task's yield point with the
                    // freshly persisted task.
                    self.handle.fire_yielded(task);
                }
                self.event_broker.notify(&self.task_id).await;
                Ok(seq)
            }
            Err(A2aStorageError::TerminalStateAlreadySet { current_state, .. }) => {
                // Lost the terminal CAS to another writer: close this sink but
                // do NOT fire yielded — the winning writer's commit hook does.
                self.is_closed.store(true, Ordering::Release);
                Err(A2aError::InvalidRequest {
                    message: format!("EventSink is closed: terminal already set ({current_state})"),
                })
            }
            Err(other) => Err(A2aError::from(other)),
        }
    }

    /// Loads the task, merges `artifact` into it (by artifact id, honoring
    /// `append`/`last_chunk`), and atomically persists the merged task plus
    /// the artifact stream event. Returns the event sequence number.
    async fn commit_artifact(
        &self,
        artifact: Artifact,
        append: bool,
        last_chunk: bool,
    ) -> Result<u64, A2aError> {
        // Same lock as commit_status: the read-merge-write below is only safe
        // against sibling sink clones because commits are serialized here.
        let _guard = self.commit_lock.lock().await;
        if self.is_closed.load(Ordering::Acquire) {
            return Err(A2aError::InvalidRequest {
                message: "EventSink is closed".into(),
            });
        }
        let mut task = self
            .task_storage
            .get_task(&self.tenant, &self.task_id, &self.owner, None)
            .await
            .map_err(A2aError::from)?
            .ok_or_else(|| A2aError::TaskNotFound {
                task_id: self.task_id.clone(),
            })?;
        task.merge_artifact(artifact.clone(), append, last_chunk);
        let event = StreamEvent::ArtifactUpdate {
            artifact_update: ArtifactUpdatePayload {
                task_id: self.task_id.clone(),
                context_id: self.context_id.clone(),
                artifact: serde_json::to_value(&artifact).unwrap_or_default(),
                append,
                last_chunk,
            },
        };
        let result = self
            .atomic_store
            .update_task_with_events(&self.tenant, &self.owner, task, vec![event])
            .await;
        match result {
            Ok(seqs) => {
                let seq = seqs.first().copied().unwrap_or(0);
                self.event_broker.notify(&self.task_id).await;
                Ok(seq)
            }
            Err(A2aStorageError::TerminalStateAlreadySet { current_state, .. }) => {
                // The task reached a terminal state underneath us (e.g. an
                // external cancel); close the sink and surface it as a
                // sink-closed error rather than a raw storage error.
                self.is_closed.store(true, Ordering::Release);
                Err(A2aError::InvalidRequest {
                    message: format!("EventSink is closed: terminal already set ({current_state})"),
                })
            }
            Err(other) => Err(A2aError::from(other)),
        }
    }
}
/// Wraps `reason` as an agent-role [`Message`] with a single text part and a
/// fresh UUIDv7 message id.
fn text_message_from_agent(reason: String) -> Message {
    let message_id = uuid::Uuid::now_v7().to_string();
    let parts = vec![turul_a2a_types::Part::text(reason)];
    Message::new(message_id, turul_a2a_types::Role::Agent, parts)
}
#[cfg(test)]
mod tests {
    use super::*;

    // Detached sinks: always closed, never accept emits.
    #[tokio::test]
    async fn detached_sink_is_closed_by_default() {
        let sink = EventSink::detached();
        assert!(sink.is_closed(), "detached sink reports closed");
    }

    // Any emit on a detached sink is an Internal error (framework misuse).
    #[tokio::test]
    async fn detached_sink_status_emit_returns_internal_error() {
        let sink = EventSink::detached();
        let err = sink
            .set_status(TaskState::Working, None)
            .await
            .expect_err("detached sink must not accept emits");
        assert!(matches!(err, A2aError::Internal(_)), "got {err:?}");
    }

    // Every terminal helper is equally rejected on a detached sink.
    #[tokio::test]
    async fn detached_sink_terminal_emit_returns_internal_error() {
        let sink = EventSink::detached();
        assert!(matches!(
            sink.complete(None).await,
            Err(A2aError::Internal(_))
        ));
        assert!(matches!(
            sink.fail(Some("boom".into())).await,
            Err(A2aError::Internal(_))
        ));
        assert!(matches!(
            sink.cancelled(None).await,
            Err(A2aError::Internal(_))
        ));
        assert!(matches!(
            sink.reject(Some("policy".into())).await,
            Err(A2aError::Internal(_))
        ));
    }

    // The terminal guard in set_status fires before the detached check, so a
    // detached sink is enough to exercise it for all four terminal states.
    #[tokio::test]
    async fn set_status_rejects_terminal_states() {
        let sink = EventSink::detached();
        for terminal in [
            TaskState::Completed,
            TaskState::Failed,
            TaskState::Canceled,
            TaskState::Rejected,
        ] {
            let err = sink
                .set_status(terminal, None)
                .await
                .expect_err("terminal via set_status must be rejected");
            match err {
                A2aError::InvalidRequest { message } => {
                    assert!(
                        message.contains("does not accept terminal"),
                        "msg: {message}"
                    );
                }
                other => panic!("expected InvalidRequest, got {other:?}"),
            }
        }
    }

    // The reason string survives the wrap and the role is Agent on the wire.
    #[test]
    fn text_message_from_agent_carries_reason() {
        let msg = text_message_from_agent("scheduled retry".into());
        assert_eq!(msg.text_parts(), vec!["scheduled retry"]);
        assert_eq!(msg.as_proto().role, i32::from(turul_a2a_proto::Role::Agent));
    }

    use crate::server::in_flight::InFlightHandle;
    use crate::storage::InMemoryA2aStorage;
    use turul_a2a_types::{Artifact, Part, Task};

    // Live-sink harness: seeds an in-memory task at WORKING ("t-1" /
    // "task-sink-1" / "owner-1") and returns the sink plus the handles the
    // tests observe (storage, in-flight handle, yielded receiver, broker).
    async fn harness() -> (
        EventSink,
        Arc<InMemoryA2aStorage>,
        Arc<InFlightHandle>,
        tokio::sync::oneshot::Receiver<Task>,
        TaskEventBroker,
    ) {
        let storage = Arc::new(InMemoryA2aStorage::new());
        let broker = TaskEventBroker::new();
        let task = Task::new("task-sink-1", TaskStatus::new(TaskState::Submitted))
            .with_context_id("ctx-sink-1");
        storage
            .create_task_with_events("t-1", "owner-1", task, vec![])
            .await
            .expect("seed task");
        storage
            .update_task_status_with_events(
                "t-1",
                "task-sink-1",
                "owner-1",
                TaskStatus::new(TaskState::Working),
                vec![],
            )
            .await
            .expect("advance to WORKING");
        let (yielded_tx, yielded_rx) = tokio::sync::oneshot::channel::<Task>();
        // The handle needs a join handle; a no-op task stands in for the runner.
        let noop_join = tokio::spawn(async {});
        let handle = Arc::new(InFlightHandle::new(
            tokio_util::sync::CancellationToken::new(),
            yielded_tx,
            noop_join,
        ));
        let task_storage: Arc<dyn crate::storage::A2aTaskStorage> = storage.clone();
        let atomic_store: Arc<dyn A2aAtomicStore> = storage.clone();
        let sink = EventSink::new(
            "t-1".into(),
            "task-sink-1".into(),
            "ctx-sink-1".into(),
            "owner-1".into(),
            atomic_store,
            task_storage,
            broker.clone(),
            handle.clone(),
            None,
        );
        (sink, storage, handle, yielded_rx, broker)
    }

    // Happy-path terminal commit: persists COMPLETED, closes the sink, fires
    // yielded with the persisted task, and wakes broker subscribers.
    #[tokio::test]
    async fn live_sink_complete_closes_sink_fires_yielded_and_notifies_broker() {
        let (sink, storage, handle, yielded_rx, broker) = harness().await;
        let mut wake_rx = broker.subscribe("task-sink-1").await;
        assert!(!sink.is_closed(), "pre-emit sink is open");
        assert!(!handle.yielded_fired(), "yielded not fired yet");
        let seq = sink
            .complete(Some(Message::new(
                "m-done",
                turul_a2a_types::Role::Agent,
                vec![Part::text("ok")],
            )))
            .await
            .expect("complete must succeed against fresh WORKING task");
        assert!(seq > 0, "terminal commit returned a sequence number");
        assert!(sink.is_closed(), "complete closes the sink");
        assert!(handle.yielded_fired(), "yielded fires on terminal commit");
        let yielded_task = tokio::time::timeout(std::time::Duration::from_millis(200), yielded_rx)
            .await
            .expect("yielded fires promptly")
            .expect("yielded sender not dropped");
        assert_eq!(
            yielded_task.status().unwrap().state().unwrap(),
            TaskState::Completed,
            "yielded carries the persisted terminal"
        );
        tokio::time::timeout(std::time::Duration::from_millis(200), wake_rx.recv())
            .await
            .expect("broker notify should arrive after commit")
            .expect("broker channel not closed");
        let persisted = storage
            .get_task("t-1", "task-sink-1", "owner-1", None)
            .await
            .unwrap()
            .unwrap();
        assert_eq!(
            persisted.status().unwrap().state().unwrap(),
            TaskState::Completed
        );
    }

    // After this sink commits a terminal, both artifact emits and status
    // emits are rejected by the sink's own closed flag.
    #[tokio::test]
    async fn live_sink_emit_after_terminal_is_rejected_in_memory() {
        let (sink, _storage, _handle, _yielded_rx, _broker) = harness().await;
        sink.complete(None).await.expect("first terminal succeeds");
        let err = sink
            .emit_artifact(Artifact::new("a-1", vec![Part::text("late")]), false, true)
            .await
            .expect_err("emits after terminal must fail");
        match err {
            A2aError::InvalidRequest { message } => {
                assert!(message.contains("EventSink is closed"), "msg: {message}");
            }
            other => panic!("expected InvalidRequest, got {other:?}"),
        }
        let err = sink
            .set_status(TaskState::Working, None)
            .await
            .expect_err("set_status after terminal must fail");
        assert!(matches!(err, A2aError::InvalidRequest { .. }));
    }

    // When another writer wins the terminal CAS (external cancel), the sink
    // closes, reports the persisted terminal in its error, and must NOT fire
    // yielded — that's the winning writer's job.
    #[tokio::test]
    async fn live_sink_terminal_cas_loss_closes_sink_without_firing_yielded() {
        let (sink, storage, handle, mut yielded_rx, _broker) = harness().await;
        storage
            .update_task_status_with_events(
                "t-1",
                "task-sink-1",
                "owner-1",
                TaskStatus::new(TaskState::Canceled),
                vec![],
            )
            .await
            .expect("framework cancel commit wins");
        assert!(!handle.yielded_fired(), "pre-sink yielded not fired");
        let err = sink
            .complete(None)
            .await
            .expect_err("losing-CAS terminal must not Ok()");
        match err {
            A2aError::InvalidRequest { message } => {
                assert!(message.contains("terminal already set"), "msg: {message}");
                assert!(
                    message.contains("TASK_STATE_CANCELED"),
                    "error reports persisted terminal: {message}"
                );
            }
            other => panic!("expected InvalidRequest, got {other:?}"),
        }
        assert!(sink.is_closed(), "CAS loss closes the sink");
        assert!(
            !handle.yielded_fired(),
            "yielded must NOT fire on CAS loss — the awaiter is driven by \
the winning writer's own commit hook"
        );
        match yielded_rx.try_recv() {
            Err(tokio::sync::oneshot::error::TryRecvError::Empty) => {}
            other => panic!("yielded must remain un-fired on CAS loss, got {other:?}"),
        }
    }

    // Two chunks with the same artifact id and append=true merge into one
    // artifact with both parts, not two artifacts.
    #[tokio::test]
    async fn live_sink_emit_artifact_append_merges_by_id() {
        let (sink, storage, _handle, _yielded_rx, _broker) = harness().await;
        sink.emit_artifact(
            Artifact::new("a-1", vec![Part::text("chunk-1 ")]),
            false,
            false,
        )
        .await
        .expect("first chunk");
        sink.emit_artifact(
            Artifact::new("a-1", vec![Part::text("chunk-2")]),
            true,
            true,
        )
        .await
        .expect("second chunk");
        let persisted = storage
            .get_task("t-1", "task-sink-1", "owner-1", None)
            .await
            .unwrap()
            .unwrap();
        let artifacts = persisted.artifacts();
        assert_eq!(artifacts.len(), 1, "append=true must not duplicate by id");
        assert_eq!(
            artifacts[0].parts.len(),
            2,
            "append=true extends parts on same-id artifact"
        );
    }

    // The commit lock serializes concurrent emits from sink clones, so the
    // read-merge-write in commit_artifact cannot lose an update.
    #[tokio::test]
    async fn live_sink_concurrent_artifact_emits_do_not_lose_updates() {
        let (sink, storage, _handle, _yielded_rx, _broker) = harness().await;
        let sink_a = sink.clone();
        let sink_b = sink.clone();
        let emit_a = tokio::spawn(async move {
            sink_a
                .emit_artifact(
                    Artifact::new("a-A", vec![Part::text("payload-A")]),
                    false,
                    true,
                )
                .await
        });
        let emit_b = tokio::spawn(async move {
            sink_b
                .emit_artifact(
                    Artifact::new("a-B", vec![Part::text("payload-B")]),
                    false,
                    true,
                )
                .await
        });
        let (res_a, res_b) = tokio::try_join!(emit_a, emit_b).expect("join");
        res_a.expect("emit A succeeds");
        res_b.expect("emit B succeeds");
        let persisted = storage
            .get_task("t-1", "task-sink-1", "owner-1", None)
            .await
            .unwrap()
            .unwrap();
        let mut ids: Vec<String> = persisted
            .artifacts()
            .iter()
            .map(|a| a.artifact_id.clone())
            .collect();
        ids.sort();
        assert_eq!(
            ids,
            vec!["a-A".to_string(), "a-B".to_string()],
            "both concurrent emits must persist — no lost update"
        );
    }

    // A FRESH sink (closed flag still false) emitting against an already
    // terminal task must be stopped by the storage-layer CAS, translated to
    // the sink-closed error, and must close the sink without firing yielded.
    #[tokio::test]
    async fn live_sink_emit_artifact_after_terminal_is_rejected_by_storage() {
        let (_seed_sink, storage, _handle, _yielded_rx, broker) = harness().await;
        storage
            .update_task_status_with_events(
                "t-1",
                "task-sink-1",
                "owner-1",
                TaskStatus::new(TaskState::Canceled),
                vec![],
            )
            .await
            .expect("external cancel wins");
        let (yielded_tx2, _yielded_rx2) = tokio::sync::oneshot::channel::<Task>();
        let noop_join2 = tokio::spawn(async {});
        let handle2 = Arc::new(InFlightHandle::new(
            tokio_util::sync::CancellationToken::new(),
            yielded_tx2,
            noop_join2,
        ));
        let task_storage2: Arc<dyn crate::storage::A2aTaskStorage> = storage.clone();
        let atomic_store2: Arc<dyn A2aAtomicStore> = storage.clone();
        let late_sink = EventSink::new(
            "t-1".into(),
            "task-sink-1".into(),
            "ctx-sink-1".into(),
            "owner-1".into(),
            atomic_store2,
            task_storage2,
            broker,
            handle2.clone(),
            None,
        );
        assert!(!late_sink.is_closed());
        let err = late_sink
            .emit_artifact(Artifact::new("a-x", vec![Part::text("late")]), false, true)
            .await
            .expect_err("artifact emit against terminal task must fail");
        match err {
            A2aError::InvalidRequest { message } => {
                assert!(
                    message.contains("EventSink is closed: terminal already set"),
                    "message should be the sink-closed translation: {message}"
                );
                assert!(
                    message.contains("TASK_STATE_CANCELED"),
                    "message should name the persisted terminal in wire form: {message}"
                );
            }
            other => panic!(
                "artifact emit on a terminal task must surface as \
InvalidRequest (not the generic TaskNotCancelable), got {other:?}"
            ),
        }
        assert!(
            late_sink.is_closed(),
            "storage-layer terminal-preservation CAS must close the sink"
        );
        assert!(
            !handle2.yielded_fired(),
            "artifact emit never fires yielded"
        );
    }
}