use std::future::Future;
use std::pin::Pin;
use std::sync::Arc;
use super::event::EventQueue;
use super::middleware::User;
use super::task_store::TaskStore;
use crate::error::Result;
use crate::types::{Message, Task};
pub trait AgentExecutor: Send + Sync {
fn execute<'a>(
&'a self,
ctx: &'a RequestContext,
queue: &'a EventQueue,
) -> Pin<Box<dyn Future<Output = Result<()>> + Send + 'a>>;
fn cancel<'a>(
&'a self,
ctx: &'a RequestContext,
queue: &'a EventQueue,
) -> Pin<Box<dyn Future<Output = Result<()>> + Send + 'a>>;
}
#[derive(Debug, Clone)]
pub struct RequestContext {
pub task_id: String,
pub context_id: String,
pub message: Option<Message>,
pub stored_task: Option<Task>,
pub related_tasks: Vec<Task>,
pub metadata: crate::types::Metadata,
pub user: Arc<dyn User>,
pub tenant: Option<String>,
pub service_params: std::collections::HashMap<String, Vec<String>>,
}
impl RequestContext {
pub fn new(task_id: impl Into<String>, context_id: impl Into<String>) -> Self {
Self {
task_id: task_id.into(),
context_id: context_id.into(),
message: None,
stored_task: None,
related_tasks: Vec::new(),
metadata: std::collections::HashMap::new(),
user: Arc::new(super::middleware::UnauthenticatedUser),
tenant: None,
service_params: std::collections::HashMap::new(),
}
}
#[must_use]
pub fn create() -> Self {
Self::new(
uuid::Uuid::new_v4().to_string(),
uuid::Uuid::new_v4().to_string(),
)
}
}
pub trait RequestContextInterceptor: Send + Sync {
fn intercept<'a>(
&'a self,
ctx: &'a mut RequestContext,
) -> Pin<Box<dyn Future<Output = Result<()>> + Send + 'a>>;
}
pub struct ReferencedTasksLoader {
store: Arc<dyn TaskStore>,
}
impl std::fmt::Debug for ReferencedTasksLoader {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("ReferencedTasksLoader")
.finish_non_exhaustive()
}
}
impl ReferencedTasksLoader {
pub fn new(store: Arc<dyn TaskStore>) -> Self {
Self { store }
}
}
impl RequestContextInterceptor for ReferencedTasksLoader {
fn intercept<'a>(
&'a self,
ctx: &'a mut RequestContext,
) -> Pin<Box<dyn Future<Output = Result<()>> + Send + 'a>> {
Box::pin(self.load_referenced_tasks(ctx))
}
}
impl ReferencedTasksLoader {
async fn load_referenced_tasks(&self, ctx: &mut RequestContext) -> Result<()> {
let reference_ids = match ctx.message.as_ref() {
Some(m) if !m.reference_task_ids.is_empty() => m.reference_task_ids.clone(),
_ => return Ok(()),
};
let mut tasks = Vec::new();
for task_id in &reference_ids {
match self.store.get(task_id).await {
Ok(Some((t, _version))) => tasks.push(t),
Ok(None) => {
tracing::info!(referenced_task_id = %task_id, "Referenced task not found");
}
Err(e) => {
tracing::info!(error = %e, referenced_task_id = %task_id, "Failed to load referenced task");
}
}
}
if !tasks.is_empty() {
ctx.related_tasks = tasks;
}
Ok(())
}
}