mod push_delivery;
mod state_machine;
use std::sync::Arc;
use a2a_protocol_types::error::A2aResult;
use a2a_protocol_types::events::StreamResponse;
use a2a_protocol_types::task::{TaskId, TaskState, TaskStatus};
use tokio::sync::mpsc;
use super::super::RequestHandler;
use state_machine::process_event_bg;
impl RequestHandler {
#[allow(clippy::too_many_lines)]
pub(crate) fn spawn_background_event_processor(
&self,
task_id: TaskId,
executor_handle: tokio::task::JoinHandle<()>,
persistence_rx: Option<mpsc::Receiver<A2aResult<StreamResponse>>>,
) {
let task_store = Arc::clone(&self.task_store);
let push_config_store = Arc::clone(&self.push_config_store);
let push_sender = self.push_sender.clone();
let limits = self.limits.clone();
let tenant = crate::store::tenant::TenantContext::current();
tokio::spawn(crate::store::tenant::TenantContext::scope(
tenant,
async move {
let Some(mut persistence_reader) = persistence_rx else {
trace_warn!(
task_id = %task_id,
"background event processor: no persistence channel provided"
);
return;
};
let mut last_task = match task_store.get(&task_id).await {
Ok(Some(task)) => task,
Ok(None) => {
trace_error!(
task_id = %task_id,
"background processor: task not found in store, cannot process events"
);
return;
}
Err(_e) => {
trace_error!(
task_id = %task_id,
"background processor: failed to read task from store"
);
return;
}
};
let mut executor_done = false;
let mut handle_fuse = executor_handle;
loop {
if executor_done {
match persistence_reader.recv().await {
Some(event) => {
process_event_bg(
event,
&task_id,
&mut last_task,
&*task_store,
&*push_config_store,
push_sender.as_deref(),
&limits,
)
.await;
}
None => break,
}
} else {
tokio::select! {
biased;
event = persistence_reader.recv() => {
match event {
Some(event) => {
process_event_bg(
event,
&task_id,
&mut last_task,
&*task_store,
&*push_config_store,
push_sender.as_deref(),
&limits,
)
.await;
}
None => break,
}
}
result = &mut handle_fuse => {
executor_done = true;
if result.is_err() {
trace_error!(
task_id = %task_id,
"executor task panicked (background processor)"
);
if !last_task.status.state.is_terminal() {
last_task.status = TaskStatus::with_timestamp(TaskState::Failed);
if let Err(_e) = task_store.save(&last_task).await {
trace_error!(
task_id = %task_id,
"background processor: task store save failed after executor panic"
);
}
}
}
}
}
}
}
},
));
}
}