pub mod render;
pub use render::{render_system, slots_from_prompt, RenderError};
use crate::core::ctx::Ctx;
use crate::core::engine::Engine;
use crate::types::{CapToken, TaskId, WorkerId};
use crate::worker::adapter::{SpawnError, SpawnerAdapter, WorkerError, WorkerResult};
use crate::worker::output::{ContentRef, OutputEvent};
use crate::worker::{Worker, WorkerJoinHandler};
use async_trait::async_trait;
use std::sync::Arc;
use tokio::sync::oneshot;
use tokio_util::sync::CancellationToken;
/// Worker description handed to an [`Operator`] alongside the prompt.
///
/// The type is cloneable and (de)serializable through serde.
#[derive(Clone, Debug, serde::Deserialize, serde::Serialize)]
pub struct WorkerBinding {
    /// Subagent type label. NOTE(review): the set of accepted values is not
    /// defined in this file — confirm against the operator implementations.
    pub subagent_type: String,
    /// Tool list for the worker; presumably tool names/identifiers — verify
    /// against the consumer.
    pub tools: Vec<String>,
}
/// Executes a single unit of work for a task.
///
/// Implementations must be `Send + Sync`: [`OperatorSpawner`] stores one behind
/// an `Arc` and calls [`Operator::execute`] from a spawned tokio task.
#[async_trait]
pub trait Operator: Send + Sync {
    /// Runs the operator once and reports its outcome.
    ///
    /// # Parameters
    /// * `ctx` - execution context shared with the caller.
    /// * `system` - optional system prompt; `None` when none was configured.
    /// * `prompt` - the task prompt.
    /// * `worker` - optional [`WorkerBinding`] describing the worker.
    /// * `worker_token` - capability token handed to the operator.
    ///
    /// # Errors
    /// Returns a [`WorkerError`] when the operator fails.
    async fn execute(
        &self,
        ctx: &Ctx,
        system: Option<String>,
        prompt: String,
        worker: Option<WorkerBinding>,
        worker_token: CapToken,
    ) -> Result<WorkerResult, WorkerError>;

    /// Whether this operator requires a worker binding; the default is `false`.
    ///
    /// NOTE(review): nothing in this file reads this flag — presumably callers
    /// use it to validate configuration before spawning; confirm.
    fn requires_worker_binding(&self) -> bool {
        false
    }
}
/// [`SpawnerAdapter`] that runs an [`Operator`] on a background tokio task.
pub struct OperatorSpawner {
    /// Operator invoked for every spawned worker.
    operator: Arc<dyn Operator>,
    /// Optional system-prompt template, rendered against the task prompt at
    /// spawn time.
    system_prompt: Option<String>,
    /// Optional binding cloned and forwarded to the operator on each spawn.
    worker_binding: Option<WorkerBinding>,
}
impl OperatorSpawner {
pub fn new(
operator: Arc<dyn Operator>,
system_prompt: Option<String>,
worker_binding: Option<WorkerBinding>,
) -> Self {
Self {
operator,
system_prompt,
worker_binding,
}
}
}
#[async_trait]
impl SpawnerAdapter for OperatorSpawner {
async fn spawn(
&self,
engine: &Engine,
ctx: &Ctx,
task_id: TaskId,
attempt: u32,
token: CapToken,
) -> Result<Box<dyn Worker>, SpawnError> {
let prompt = engine
.fetch_prompt(&token, &task_id)
.await
.map_err(|e| SpawnError::Internal(format!("fetch_prompt: {e}")))?;
let system = match self.system_prompt.as_deref() {
Some(tmpl) => {
let slots = render::slots_from_prompt(&prompt);
let rendered = render::render_system(tmpl, &slots)
.map_err(|e| SpawnError::Internal(format!("render system_prompt: {e}")))?;
Some(rendered)
}
None => None,
};
engine
.bake_worker_system_prompt(&task_id, attempt, system.clone())
.await
.map_err(|e| SpawnError::Internal(format!("bake system_prompt: {e}")))?;
let op = self.operator.clone();
let engine_clone = engine.clone();
let token_clone = token.clone();
let token_for_op = token.clone();
let task_id_clone = task_id.clone();
let ctx_clone = ctx.clone();
let worker_binding = self.worker_binding.clone();
let (tx, rx) = oneshot::channel();
let cancel = CancellationToken::new();
let cancel_inner = cancel.clone();
let worker_id = WorkerId::new();
tokio::spawn(async move {
let result: Result<WorkerResult, WorkerError> = tokio::select! {
r = op.execute(&ctx_clone, system, prompt, worker_binding, token_for_op) => r,
_ = cancel_inner.cancelled() => Err(WorkerError::Cancelled),
};
if let Ok(wr) = &result {
let tail = engine_clone.output_tail(&task_id_clone, attempt).await;
let has_final = tail
.iter()
.any(|ev| matches!(ev, OutputEvent::Final { .. }));
if !has_final {
let ev = OutputEvent::Final {
content: ContentRef::Inline {
value: wr.value.clone(),
},
ok: wr.ok,
};
let _ = engine_clone
.submit_output(&token_clone, &task_id_clone, attempt, ev)
.await;
}
}
let signal: Result<(), WorkerError> = result.map(|_| ());
let _ = tx.send(signal);
});
Ok(Box::new(OperatorWorker {
handler: WorkerJoinHandler {
worker_id,
cancel,
completion: rx,
},
}))
}
}
/// [`Worker`] handle returned by `OperatorSpawner::spawn`.
pub struct OperatorWorker {
    /// Holds the worker id, the cancellation token and the completion
    /// receiver of the spawned task.
    pub handler: WorkerJoinHandler,
}
#[async_trait]
impl Worker for OperatorWorker {
    /// Returns the id stored in the join handler.
    fn id(&self) -> &WorkerId {
        let handler = &self.handler;
        &handler.worker_id
    }

    /// Returns a clone of the handler's cancellation token.
    fn cancel_token(&self) -> CancellationToken {
        let handler = &self.handler;
        handler.cancel.clone()
    }

    /// Consumes the worker and waits on the handler's `await_completion`,
    /// returning whatever it yields.
    async fn join(self: Box<Self>) -> Result<(), WorkerError> {
        let Self { handler } = *self;
        handler.await_completion().await
    }
}