use std::sync::Arc;
use tokio::sync::oneshot;
use tokio_util::sync::CancellationToken;
use turul_a2a_types::{Artifact, Message, Task, TaskState};
use crate::error::A2aError;
use crate::event_sink::EventSink;
use crate::executor::{AgentExecutor, ExecutionContext};
use crate::server::in_flight::{InFlightHandle, InFlightKey, InFlightRegistry, SupervisorSentinel};
use crate::storage::{A2aAtomicStore, A2aTaskStorage};
use crate::streaming::TaskEventBroker;
/// Shared services required to run an agent executor for a task.
///
/// Every field is a cheaply clonable handle, so the whole struct derives
/// `Clone`.
#[derive(Clone)]
pub struct SpawnDeps {
/// Executor whose `execute` method is invoked once the task has been loaded.
pub executor: Arc<dyn AgentExecutor>,
/// Storage used to load the task by tenant, task id and owner; also handed
/// to the `EventSink` built for the run.
pub task_storage: Arc<dyn A2aTaskStorage>,
/// Atomic store handed to the `EventSink` built for the run.
pub atomic_store: Arc<dyn A2aAtomicStore>,
/// Event broker handed to the `EventSink` built for the run.
pub event_broker: TaskEventBroker,
/// Registry in which `spawn_tracked_executor` records the handle of a
/// running executor, keyed by `(tenant, task_id)`.
pub in_flight: Arc<InFlightRegistry>,
/// Optional push dispatcher, forwarded to the `EventSink` unchanged.
pub push_dispatcher: Option<Arc<crate::push::PushDispatcher>>,
}
/// Identifies the task an executor run belongs to and carries the inputs
/// that are forwarded to the executor.
pub struct SpawnScope {
/// Tenant identifier. An empty string is surfaced to the executor as
/// `None` in its `ExecutionContext`.
pub tenant: String,
/// Owner used for the task lookup and copied into the execution context.
pub owner: String,
/// Identifier of the task to run; together with `tenant` it forms the
/// in-flight registry key.
pub task_id: String,
/// Context identifier; always forwarded to the executor as `Some(..)`.
pub context_id: String,
/// Message passed to the executor's `execute` call.
pub message: Message,
/// Optional JSON claims forwarded unchanged into the execution context.
/// NOTE(review): presumably authentication claims — confirm with callers.
pub claims: Option<serde_json::Value>,
}
/// What `spawn_tracked_executor` hands back to its caller.
pub(crate) struct SpawnResult {
/// Handle that was inserted into the in-flight registry for this run.
pub handle: Arc<InFlightHandle>,
/// Receiving half of the one-shot `Task` channel whose sender is stored
/// inside `handle`.
pub yielded_rx: oneshot::Receiver<Task>,
/// Cancellation token shared with `handle` and with the executor's
/// execution context.
pub cancellation: CancellationToken,
}
/// Registers an in-flight entry for the task described by `scope` and spawns
/// its executor on the Tokio runtime.
///
/// Two tasks are spawned:
///
/// * the executor task, which runs [`run_executor_for_existing_task`];
/// * a supervisor task, which holds a [`SupervisorSentinel`] for the
///   registry entry while it awaits the executor's join handle.
///   NOTE(review): the sentinel presumably removes the registry entry when
///   dropped — confirm in `server::in_flight`.
///
/// The returned [`SpawnResult`] exposes the registered handle, the receiver
/// of the one-shot `Task` channel and the cancellation token shared with the
/// executor.
///
/// # Errors
///
/// Returns [`A2aError::Internal`] when the registry rejects the insert for
/// `(tenant, task_id)`; nothing is spawned besides the no-op placeholder in
/// that case.
pub(crate) fn spawn_tracked_executor(
    deps: SpawnDeps,
    scope: SpawnScope,
) -> Result<SpawnResult, A2aError> {
    let key: InFlightKey = (scope.tenant.clone(), scope.task_id.clone());
    let cancellation = CancellationToken::new();
    let (yielded_tx, yielded_rx) = oneshot::channel::<Task>();

    // `InFlightHandle::new` needs a join handle up front; a no-op task
    // supplies one and the executor's real handle is attached further down
    // through `set_spawned`.
    let placeholder_jh = tokio::spawn(async {});
    let handle = Arc::new(InFlightHandle::new(
        cancellation.clone(),
        yielded_tx,
        placeholder_jh,
    ));

    // Register before spawning the executor so a rejected insert aborts
    // without running any executor code.
    deps.in_flight
        .try_insert(key.clone(), handle.clone())
        .map_err(|collision| {
            A2aError::Internal(format!("double-spawn for in-flight task: {collision}"))
        })?;

    let sink = EventSink::new(
        scope.tenant.clone(),
        scope.task_id.clone(),
        scope.context_id.clone(),
        scope.owner.clone(),
        deps.atomic_store.clone(),
        deps.task_storage.clone(),
        deps.event_broker.clone(),
        handle.clone(),
        deps.push_dispatcher.clone(),
    );

    // The supervisor only needs the registry, so grab that `Arc` now and move
    // `deps`, `scope` and `sink` into the executor task instead of cloning
    // them (the scope carries the full message and claims payload).
    let sup_registry = deps.in_flight.clone();
    let exec_cancellation = cancellation.clone();
    let executor_jh = tokio::spawn(async move {
        run_executor_for_existing_task(deps, scope, sink, exec_cancellation).await;
    });
    handle.set_spawned(executor_jh);

    let sup_handle = handle.clone();
    tokio::spawn(async move {
        // Held for the whole supervisor lifetime, i.e. until the executor's
        // join handle has resolved (or was not available to take).
        let _sentinel = SupervisorSentinel::new(sup_registry, key, sup_handle.clone());
        if let Some(jh) = sup_handle.take_spawned() {
            // A join error (panic/abort of the executor task) is ignored here.
            let _ = jh.await;
        }
    });

    Ok(SpawnResult {
        handle,
        yielded_rx,
        cancellation,
    })
}
/// Runs the executor body for `scope`'s task directly on the calling task.
///
/// A fresh cancellation token, one-shot channel and `InFlightHandle` are
/// created only to satisfy `EventSink::new`; this function does not insert
/// the handle into `deps.in_flight` and spawns no supervisor.
pub async fn run_queued_executor_job(deps: SpawnDeps, scope: SpawnScope) {
    let token = CancellationToken::new();

    // The receiver is bound to a named (underscore-prefixed) variable so it
    // stays alive until this function returns.
    let (task_tx, _task_rx) = oneshot::channel::<Task>();

    // No-op task: only provides the join handle `InFlightHandle::new` expects.
    let noop_jh = tokio::spawn(async {});
    let local_handle = Arc::new(InFlightHandle::new(token.clone(), task_tx, noop_jh));

    let events = EventSink::new(
        scope.tenant.clone(),
        scope.task_id.clone(),
        scope.context_id.clone(),
        scope.owner.clone(),
        deps.atomic_store.clone(),
        deps.task_storage.clone(),
        deps.event_broker.clone(),
        local_handle,
        deps.push_dispatcher.clone(),
    );

    run_executor_for_existing_task(deps, scope, events, token).await;
}
/// Loads the task named by `scope`, runs the executor on it and commits the
/// outcome through `sink`.
///
/// If the task cannot be loaded (missing, or storage returned an error) a
/// `Failed` state with an explanatory agent message is committed and the
/// executor is never called. Otherwise the executor's result is handed to
/// [`commit_post_execute`] together with a snapshot of the artifacts that
/// existed before execution.
pub(crate) async fn run_executor_for_existing_task(
    deps: SpawnDeps,
    scope: SpawnScope,
    sink: EventSink,
    cancellation: CancellationToken,
) {
    let SpawnScope {
        tenant,
        owner,
        task_id,
        context_id,
        message,
        claims,
    } = scope;

    // Resolve the lookup into either the task or the failure text to report.
    let lookup = deps
        .task_storage
        .get_task(&tenant, &task_id, &owner, None)
        .await;
    let loaded = match lookup {
        Ok(Some(found)) => Ok(found),
        Ok(None) => Err(String::from(
            "framework: task not found when executor body started",
        )),
        Err(e) => Err(format!(
            "framework: failed to load task in executor body: {e}"
        )),
    };
    let mut task = match loaded {
        Ok(found) => found,
        Err(reason) => {
            let _ = sink
                .commit_state_internal(TaskState::Failed, Some(agent_text(&reason)))
                .await;
            return;
        }
    };

    // Snapshot (artifact id, part count) per artifact so that changes made by
    // the executor can be detected after it returns.
    let mut start_artifact_sigs: Vec<(String, usize)> = Vec::new();
    for existing in task.artifacts().iter() {
        start_artifact_sigs.push((existing.artifact_id.clone(), existing.parts.len()));
    }

    let ctx = ExecutionContext {
        owner,
        // An empty tenant string is presented to the executor as "no tenant".
        tenant: Some(tenant).filter(|t| !t.is_empty()),
        task_id,
        context_id: Some(context_id),
        claims,
        cancellation,
        events: sink.clone(),
    };

    let outcome = deps.executor.execute(&mut task, &message, &ctx).await;
    commit_post_execute(&sink, &task, &start_artifact_sigs, outcome).await;
}
/// Publishes what the executor left on `task` once `execute` has returned.
///
/// Nothing happens when `sink` is already closed. Otherwise:
///
/// 1. Artifacts are compared by index against `start_artifact_sigs`
///    (`(artifact_id, part_count)` captured before execution). An artifact at
///    a new index is emitted whole; an artifact with the same id and more
///    parts has only its new parts emitted with `append = true`. Any other
///    artifact is skipped. The sink is re-checked after each artifact and the
///    function returns early once it is closed.
/// 2. Exactly one state commit follows: `Failed` for an executor error, the
///    task's own state when it is terminal, `InputRequired` or
///    `AuthRequired`, and `Failed` for every other case.
///
/// Errors returned by the sink are ignored throughout.
pub(crate) async fn commit_post_execute(
    sink: &EventSink,
    task: &Task,
    start_artifact_sigs: &[(String, usize)],
    execute_result: Result<(), A2aError>,
) {
    if sink.is_closed() {
        return;
    }

    for (index, artifact) in task.artifacts().iter().enumerate() {
        // Decide what (if anything) to emit: the payload plus the append flag.
        let pending = match start_artifact_sigs.get(index) {
            // Index did not exist before execution: emit the whole artifact.
            None => {
                let whole: Artifact = artifact.clone().into();
                Some((whole, false))
            }
            // Same artifact, more parts: emit only the newly added tail.
            Some((known_id, known_len))
                if *known_id == artifact.artifact_id && artifact.parts.len() > *known_len =>
            {
                let mut grown = artifact.clone();
                grown.parts = grown.parts.split_off(*known_len);
                let tail: Artifact = grown.into();
                Some((tail, true))
            }
            // Unchanged, shrunk, or a different id at this index: no emission.
            Some(_) => None,
        };

        if let Some((chunk, append)) = pending {
            let _ = sink.emit_artifact(chunk, append, true).await;
        }
        if sink.is_closed() {
            return;
        }
    }

    let (final_state, final_message) = match execute_result {
        Err(e) => (
            TaskState::Failed,
            Some(agent_text(&format!("executor error: {e}"))),
        ),
        Ok(()) => {
            let reported = task.status().and_then(|st| st.state().ok());
            let carried = task
                .status()
                .and_then(|st| st.as_proto().message.clone())
                .and_then(|raw| Message::try_from(raw).ok());

            match reported {
                // Terminal and interrupted states are committed as reported,
                // together with the status message (when it converts).
                Some(s)
                    if turul_a2a_types::state_machine::is_terminal(s)
                        || matches!(s, TaskState::InputRequired | TaskState::AuthRequired) =>
                {
                    (s, carried)
                }
                Some(TaskState::Submitted | TaskState::Working) | None => (
                    TaskState::Failed,
                    Some(agent_text(
                        "executor returned without reaching terminal \
                         state and without emitting events",
                    )),
                ),
                Some(_) => (
                    TaskState::Failed,
                    Some(agent_text(
                        "executor returned with an unrecognised task \
                         state; framework forced FAILED",
                    )),
                ),
            }
        }
    };

    let _ = sink
        .commit_state_internal(final_state, final_message)
        .await;
}
/// Builds an agent-role [`Message`] holding `text` as its only part, with a
/// freshly generated UUIDv7 string as the message id.
fn agent_text(text: &str) -> Message {
    let message_id = uuid::Uuid::now_v7().to_string();
    let body = turul_a2a_types::Part::text(text.to_owned());
    Message::new(message_id, turul_a2a_types::Role::Agent, vec![body])
}