use std::sync::Arc;
use turul_a2a_types::Task;
use url::Url;
use crate::push::claim::ReclaimableClaim;
use crate::push::delivery::{DeliveryReport, PushDeliveryWorker, PushTarget};
use crate::push::secret::Secret;
use crate::storage::{A2aPushNotificationStorage, A2aTaskStorage};
use crate::streaming::StreamEvent;
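/// Fans push notifications for terminal task events out to every
/// registered webhook config, and re-drives deliveries that a crash
/// or transient store failure left stranded.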
#[derive(Clone)]
pub struct PushDispatcher {
worker: Arc<PushDeliveryWorker>,
push_storage: Arc<dyn A2aPushNotificationStorage>,
task_storage: Arc<dyn A2aTaskStorage>,
}
impl PushDispatcher {
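/// Builds a dispatcher over the shared delivery worker and the push
/// and task storage handles.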
pub fn new(
worker: Arc<PushDeliveryWorker>,
push_storage: Arc<dyn A2aPushNotificationStorage>,
task_storage: Arc<dyn A2aTaskStorage>,
) -> Self {
Self {
worker,
push_storage,
task_storage,
}
}
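/// Fire-and-forget entry point. Filters `events` down to terminal
/// status updates and spawns a background fan-out for the surviving
/// sequence numbers; returns immediately, and returns early without
/// spawning when no event qualifies or the task id is empty.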
pub fn dispatch(
&self,
tenant: String,
owner: String,
task: Task,
events: Vec<(u64, StreamEvent)>,
) {
let terminal_seqs: Vec<u64> = events
.into_iter()
.filter(|(_, e)| dispatch_eligible(e))
.map(|(seq, _)| seq)
.collect();
if terminal_seqs.is_empty() {
return;
}
let task_id = match task.as_proto().id.as_str() {
s if !s.is_empty() => s.to_string(),
_ => return,
};
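// Run the fan-out off the caller's path. `run_fanout` logs its own
// failures, so the spawned task's Result is intentionally dropped.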
let self_clone = self.clone();
tokio::spawn(async move {
let _ = self_clone
.run_fanout(tenant, owner, task_id, task, terminal_seqs)
.await;
});
}
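/// Serializes the task once, then for each terminal event sequence
/// pages through the eligible push configs and delivers to all of
/// them concurrently, clearing the pending-dispatch marker once the
/// deliveries settle. Returns the serialization error or the first
/// config-listing error so recovery callers can surface it.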
async fn run_fanout(
&self,
tenant: String,
owner: String,
task_id: String,
task: Task,
terminal_seqs: Vec<u64>,
) -> Result<(), crate::storage::A2aStorageError> {
let payload = match serde_json::to_vec(&task) {
Ok(p) => p,
Err(e) => {
return Err(crate::storage::A2aStorageError::SerializationError(
format!("task payload serialise failed: {e}"),
));
}
};
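// Config listing gets a small bounded retry budget. A page that
// still fails afterwards aborts fan-out for that event sequence
// only; its pending-dispatch marker stays behind for the sweep.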
const LIST_MAX_ATTEMPTS: u32 = 3;
let backoffs = [
std::time::Duration::from_millis(50),
std::time::Duration::from_millis(150),
std::time::Duration::from_millis(500),
];
let mut first_err: Option<crate::storage::A2aStorageError> = None;
for seq in &terminal_seqs {
let mut configs: Vec<turul_a2a_proto::TaskPushNotificationConfig> = Vec::new();
let mut page_token: Option<String> = None;
let fetch_outcome: Result<(), crate::storage::A2aStorageError> = loop {
let mut last_err: Option<crate::storage::A2aStorageError> = None;
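// Label-break block: yields `Some(page)` on the first successful
// fetch, or `None` once the retry budget is exhausted.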
let page = 'retry: {
for attempt in 0..LIST_MAX_ATTEMPTS {
if attempt > 0 {
// Back off before each retry (50ms, then 150ms), indexing the
// table by retry number and clamping to the last entry if the
// attempt budget ever outgrows it.
tokio::time::sleep(
backoffs[((attempt - 1) as usize).min(backoffs.len() - 1)],
)
.await;
}
match self
.push_storage
.list_configs_eligible_at_event(
&tenant,
&task_id,
*seq,
page_token.as_deref(),
None,
)
.await
{
Ok(p) => break 'retry Some(p),
Err(e) => last_err = Some(e),
}
}
None
};
match page {
Some(p) => {
configs.extend(p.configs);
if p.next_page_token.is_empty() {
break Ok(());
}
page_token = Some(p.next_page_token);
}
None => {
break Err(last_err.unwrap_or_else(|| {
crate::storage::A2aStorageError::DatabaseError(
"list_configs_eligible_at_event exhausted retry budget \
without recording an error"
.into(),
)
}));
}
}
};
if let Err(e) = fetch_outcome {
tracing::error!(
target: "turul_a2a::push_dispatch_config_list_failed",
tenant = %tenant,
task_id = %task_id,
event_sequence = *seq,
error = %e,
"push dispatch aborted: list_configs_eligible_at_event failed after \
bounded retry; pending-dispatch marker retained for reclaim sweep"
);
if first_err.is_none() {
first_err = Some(e);
}
continue;
}
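// No config is eligible for this event, so the marker guards
// nothing; deletion is best-effort and the reclaim sweep retries
// it if the write fails.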
if configs.is_empty() {
let _ = self
.push_delivery_store_handle()
.delete_pending_dispatch(&tenant, &task_id, *seq)
.await;
continue;
}
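// Deliver to each config concurrently. The worker records each
// delivery outcome itself; only the stuck-claim case, where the
// POST landed but the terminal claim write failed, is logged here.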
let mut join_handles: Vec<tokio::task::JoinHandle<()>> = Vec::new();
for cfg in &configs {
let Some(target) = PushTarget::from_config(&tenant, &owner, &task_id, *seq, cfg)
else {
continue;
};
let worker = self.worker.clone();
let payload = payload.clone();
let log_tenant = target.tenant.clone();
let log_task = target.task_id.clone();
let log_cfg = target.config_id.clone();
let log_seq = target.event_sequence;
join_handles.push(tokio::spawn(async move {
let report = worker.deliver(&target, &payload).await;
if matches!(report, DeliveryReport::TransientStoreError) {
tracing::warn!(
target: "turul_a2a::push_delivery_stuck",
tenant = %log_tenant,
task_id = %log_task,
config_id = %log_cfg,
event_sequence = log_seq,
"push delivery POST succeeded but terminal claim write \
did not persist; row remains non-terminal"
);
}
}));
}
for h in join_handles {
let _ = h.await;
}
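// All deliveries for this event have settled; drop the marker.
// Best-effort, as a failed delete is re-handled by the sweep.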
let _ = self
.push_delivery_store_handle()
.delete_pending_dispatch(&tenant, &task_id, *seq)
.await;
}
match first_err {
Some(e) => Err(e),
None => Ok(()),
}
}
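/// The delivery store shared with the worker. Pending-dispatch
/// markers live here rather than in the push-config storage.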
fn push_delivery_store_handle(&self) -> &Arc<dyn crate::push::A2aPushDeliveryStore> {
&self.worker.push_delivery_store
}
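/// Re-drives a pending-dispatch marker found by the sweep. Deletes
/// the marker when the task no longer exists, leaves it for the next
/// tick when the task read fails, and otherwise refreshes it before
/// re-running the fan-out. Fan-out errors are logged, not returned.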
pub async fn redispatch_pending(&self, pending: crate::push::claim::PendingDispatch) {
let crate::push::claim::PendingDispatch {
tenant,
owner,
task_id,
event_sequence,
..
} = pending;
let task_result = self
.task_storage
.get_task(&tenant, &task_id, &owner, None)
.await;
let task = match task_result {
Ok(Some(task)) => task,
Ok(None) => {
let _ = self
.push_delivery_store_handle()
.delete_pending_dispatch(&tenant, &task_id, event_sequence)
.await;
return;
}
Err(e) => {
tracing::warn!(
target: "turul_a2a::push_redispatch_pending_read_error",
tenant = %tenant,
task_id = %task_id,
event_sequence = event_sequence,
error = %e,
"pending redispatch skipped: task store read failed; \
marker retained for next sweep tick"
);
return;
}
};
if let Err(e) = self
.push_delivery_store_handle()
.record_pending_dispatch(&tenant, &owner, &task_id, event_sequence)
.await
{
tracing::warn!(
target: "turul_a2a::push_pending_dispatch_refresh_failed",
tenant = %tenant,
task_id = %task_id,
event_sequence = event_sequence,
error = %e,
"redispatch marker refresh failed; fan-out continues but \
scheduler may re-pick the marker before this tick finishes"
);
}
let _ = self
.run_fanout(tenant, owner, task_id, task, vec![event_sequence])
.await;
}
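/// Like [`Self::redispatch_pending`], but propagates task-store and
/// fan-out errors to the caller instead of swallowing them.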
pub async fn try_redispatch_pending(
&self,
pending: crate::push::claim::PendingDispatch,
) -> Result<(), crate::storage::A2aStorageError> {
let crate::push::claim::PendingDispatch {
tenant,
owner,
task_id,
event_sequence,
..
} = pending;
let task = match self
.task_storage
.get_task(&tenant, &task_id, &owner, None)
.await
{
Ok(Some(task)) => task,
Ok(None) => {
self.push_delivery_store_handle()
.delete_pending_dispatch(&tenant, &task_id, event_sequence)
.await?;
return Ok(());
}
Err(e) => return Err(e),
};
if let Err(e) = self
.push_delivery_store_handle()
.record_pending_dispatch(&tenant, &owner, &task_id, event_sequence)
.await
{
tracing::warn!(
target: "turul_a2a::push_pending_dispatch_refresh_failed",
tenant = %tenant,
task_id = %task_id,
event_sequence = event_sequence,
error = %e,
"recovery redispatch marker refresh failed; fan-out continues"
);
}
self.run_fanout(tenant, owner, task_id, task, vec![event_sequence])
.await
}
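/// Re-delivers a single reclaimed claim inline. If either the task
/// or the config has been deleted, the claim is abandoned with the
/// matching reason so it stops cycling through the sweep; store read
/// failures leave the claim for the next sweep instead.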
pub async fn redispatch_one(&self, claim: ReclaimableClaim) {
use crate::push::claim::AbandonedReason;
let ReclaimableClaim {
tenant,
owner,
task_id,
event_sequence,
config_id,
} = claim;
let config_result = self
.push_storage
.get_config(&tenant, &task_id, &config_id)
.await;
let task_result = self
.task_storage
.get_task(&tenant, &task_id, &owner, None)
.await;
if let Err(e) = &config_result {
tracing::warn!(
target: "turul_a2a::push_redispatch_read_error",
tenant = %tenant,
task_id = %task_id,
config_id = %config_id,
event_sequence,
error = %e,
"reclaim redispatch skipped: push-config store read failed; will retry next sweep"
);
return;
}
if let Err(e) = &task_result {
tracing::warn!(
target: "turul_a2a::push_redispatch_read_error",
tenant = %tenant,
task_id = %task_id,
config_id = %config_id,
event_sequence,
error = %e,
"reclaim redispatch skipped: task store read failed; will retry next sweep"
);
return;
}
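// Both reads succeeded: the Err arms above returned early.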
let config = config_result.expect("Err handled above");
let task = task_result.expect("Err handled above");
match (task, config) {
(Some(task), Some(cfg)) => {
let Some(target) =
PushTarget::from_config(&tenant, &owner, &task_id, event_sequence, &cfg)
else {
return;
};
let payload = match serde_json::to_vec(&task) {
Ok(p) => p,
Err(_) => return,
};
let log_tenant = target.tenant.clone();
let log_task = target.task_id.clone();
let log_cfg = target.config_id.clone();
let log_seq = target.event_sequence;
let report = self.worker.deliver(&target, &payload).await;
if matches!(report, DeliveryReport::TransientStoreError) {
tracing::warn!(
target: "turul_a2a::push_redispatch_stuck",
tenant = %log_tenant,
task_id = %log_task,
config_id = %log_cfg,
event_sequence = log_seq,
"reclaim redispatch POST succeeded but terminal \
claim write still did not persist"
);
}
}
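// Task or config is gone: abandon the claim with the matching
// reason, substituting a static sentinel URL when no config
// survives to supply one.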
(task, cfg) => {
let abandon_reason = if task.is_none() {
AbandonedReason::TaskDeleted
} else {
AbandonedReason::ConfigDeleted
};
let url = cfg
.as_ref()
.and_then(|c| Url::parse(&c.url).ok())
.unwrap_or_else(|| {
Url::parse("https://invalid.abandoned.push/").expect("static")
});
let target = PushTarget {
tenant: tenant.clone(),
owner: owner.clone(),
task_id: task_id.clone(),
event_sequence,
config_id: config_id.clone(),
url,
auth_scheme: String::new(),
auth_credentials: Secret::new(String::new()),
token: None,
};
let _ = self.worker.abandon_reclaimed(&target, abandon_reason).await;
}
}
}
}
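/// Only terminal status updates trigger push delivery; artifact
/// updates and non-terminal status changes never reach fan-out.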
fn dispatch_eligible(ev: &StreamEvent) -> bool {
matches!(ev, StreamEvent::StatusUpdate { .. }) && ev.is_terminal()
}
impl PushTarget {
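/// Builds a delivery target from a stored config, wrapping the
/// credentials and token in [`Secret`] so they stay redacted in
/// debug output. Returns `None` when the config URL fails to parse;
/// callers treat that as "skip this config".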
pub fn from_config(
tenant: &str,
owner: &str,
task_id: &str,
event_sequence: u64,
cfg: &turul_a2a_proto::TaskPushNotificationConfig,
) -> Option<Self> {
let url = Url::parse(&cfg.url).ok()?;
let (auth_scheme, auth_credentials) = match cfg.authentication.as_ref() {
Some(a) => (a.scheme.clone(), Secret::new(a.credentials.clone())),
None => (String::new(), Secret::new(String::new())),
};
let token = if cfg.token.is_empty() {
None
} else {
Some(Secret::new(cfg.token.clone()))
};
Some(PushTarget {
tenant: tenant.to_string(),
owner: owner.to_string(),
task_id: task_id.to_string(),
event_sequence,
config_id: cfg.id.clone(),
url,
auth_scheme,
auth_credentials,
token,
})
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn artifact_events_are_not_dispatched() {
let ev = StreamEvent::ArtifactUpdate {
artifact_update: crate::streaming::ArtifactUpdatePayload {
task_id: "t".into(),
context_id: "c".into(),
artifact: serde_json::json!({}),
append: false,
last_chunk: true,
},
};
assert!(!dispatch_eligible(&ev));
}
#[test]
fn non_terminal_status_is_not_dispatched() {
let status = turul_a2a_types::TaskStatus::new(turul_a2a_types::TaskState::Working);
let ev = StreamEvent::StatusUpdate {
status_update: crate::streaming::StatusUpdatePayload {
task_id: "t".into(),
context_id: "c".into(),
status: serde_json::to_value(&status).unwrap(),
},
};
assert!(!dispatch_eligible(&ev));
}
#[test]
fn terminal_status_is_dispatched() {
for state in [
turul_a2a_types::TaskState::Completed,
turul_a2a_types::TaskState::Failed,
turul_a2a_types::TaskState::Canceled,
turul_a2a_types::TaskState::Rejected,
] {
let status = turul_a2a_types::TaskStatus::new(state);
let ev = StreamEvent::StatusUpdate {
status_update: crate::streaming::StatusUpdatePayload {
task_id: "t".into(),
context_id: "c".into(),
status: serde_json::to_value(&status).unwrap(),
},
};
assert!(dispatch_eligible(&ev), "state {state:?} must dispatch");
}
}
#[test]
fn push_target_from_config_wraps_secrets() {
let cfg = turul_a2a_proto::TaskPushNotificationConfig {
tenant: "t".into(),
id: "cfg-A".into(),
task_id: "task-1".into(),
url: "https://example.com/hook".into(),
token: "TOKEN-X".into(),
authentication: Some(turul_a2a_proto::AuthenticationInfo {
scheme: "Bearer".into(),
credentials: "CRED-Y".into(),
}),
};
let t =
PushTarget::from_config("t", "anonymous", "task-1", 42, &cfg).expect("valid target");
assert_eq!(t.config_id, "cfg-A");
assert_eq!(t.event_sequence, 42);
assert_eq!(t.auth_scheme, "Bearer");
assert!(!format!("{:?}", t.auth_credentials).contains("CRED-Y"));
assert!(!format!("{:?}", t.token).contains("TOKEN-X"));
}
#[test]
fn push_target_from_config_rejects_malformed_url() {
let cfg = turul_a2a_proto::TaskPushNotificationConfig {
tenant: "t".into(),
id: "cfg-A".into(),
task_id: "task-1".into(),
url: "not a url".into(),
token: String::new(),
authentication: None,
};
assert!(PushTarget::from_config("t", "anonymous", "task-1", 1, &cfg).is_none());
}
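// A complementary sketch of the fallback arms in `from_config`
// (field layout assumed to mirror the tests above): a config with
// no authentication block and an empty token should produce an
// empty scheme and no token secret.
#[test]
fn push_target_from_config_defaults_missing_auth() {
let cfg = turul_a2a_proto::TaskPushNotificationConfig {
tenant: "t".into(),
id: "cfg-B".into(),
task_id: "task-1".into(),
url: "https://example.com/hook".into(),
token: String::new(),
authentication: None,
};
let t = PushTarget::from_config("t", "anonymous", "task-1", 7, &cfg).expect("valid target");
assert!(t.auth_scheme.is_empty());
assert!(t.token.is_none());
}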
}