use crate::authority::EventAuthority;
use crate::context::{CompactionConfig, ContextCompactor};
use crate::hooks::ToolAuditSink;
use crate::llm::StopReason;
use crate::stores::{EventStore, ToolExecutionStore};
use crate::tools::{ToolContext, ToolRegistry};
#[cfg(feature = "otel")]
use crate::types::RunOptions;
use crate::types::{
AgentConfig, AgentContinuation, AgentError, AgentInput, AgentState, ListenExecutionContext,
PendingToolCallInfo, ThreadId, TokenUsage, ToolResult, TurnOptions,
};
use agent_sdk_foundation::audit::AuditProvenance;
use std::sync::Arc;
use std::time::{Duration, Instant};
use time::OffsetDateTime;
use tokio::sync::mpsc;
use tokio_util::sync::CancellationToken;
/// Outcome of one internally executed turn, handed back to the loop driver.
///
/// Only the payloads are defined here; how each variant is acted upon is decided
/// by the consumers of this enum, which are not part of this file.
pub(super) enum InternalTurnResult {
/// Carries the token usage recorded for the turn.
/// NOTE(review): the name suggests another turn follows — confirm at the call site.
Continue { turn_usage: TokenUsage },
/// Payload-free variant; presumably the normal end of the run — confirm.
Done,
/// Payload-free variant; presumably signals a model refusal — confirm.
Refusal,
/// Cancellation, together with the token usage recorded for the turn.
Cancelled { turn_usage: TokenUsage },
/// A tool call is waiting on confirmation. Carries the call's identifiers,
/// its JSON input, a description, and a boxed continuation.
AwaitingConfirmation {
tool_call_id: String,
tool_name: String,
display_name: String,
input: serde_json::Value,
description: String,
// Boxed, so the enum stores only a pointer for this payload.
continuation: Box<AgentContinuation>,
},
/// Tool calls are outstanding: carries the turn's usage, the pending calls,
/// and a boxed continuation.
PendingToolCalls {
turn_usage: TokenUsage,
pending_tool_calls: Vec<PendingToolCallInfo>,
continuation: Box<AgentContinuation>,
},
/// The turn failed with the wrapped [`AgentError`].
Error(AgentError),
}
/// Compaction retry budget: 3.
///
/// NOTE(review): enforcement lives outside this file; presumably compared against
/// `TurnContext::compaction_retries` — confirm at the use site.
pub(super) const MAX_COMPACTION_RETRIES: usize = 3;
/// Bookkeeping shared across turns for one thread.
///
/// Several parameter bundles in this file hold it as `&mut TurnContext`, others as
/// `&TurnContext`; `ConvertTurnResultParams` takes it by value.
pub(super) struct TurnContext {
/// Identifier of the thread this context belongs to.
pub(super) thread_id: ThreadId,
/// Turn counter.
pub(super) turn: usize,
/// Token usage total (presumably accumulated over all turns — confirm).
pub(super) total_usage: TokenUsage,
/// Agent state value carried alongside the counters.
pub(super) state: AgentState,
/// An [`Instant`] captured elsewhere; presumably the run's start — confirm.
pub(super) start_time: Instant,
/// Compaction retry counter (see `MAX_COMPACTION_RETRIES` for the declared budget).
pub(super) compaction_retries: usize,
/// Optional reminder text that has not been consumed yet (assumed — confirm).
pub(super) pending_reminder: Option<String>,
/// Optional response identifier; presumably from the latest LLM response — confirm.
pub(super) response_id: Option<String>,
/// Optional stop reason; presumably from the latest LLM response — confirm.
pub(super) stop_reason: Option<StopReason>,
/// Tool call counter.
pub(super) tool_call_count: usize,
/// Static label for the input kind; only compiled with the `otel` feature.
#[cfg(feature = "otel")]
pub(super) input_kind: &'static str,
}
/// Data needed to resume after a tool-confirmation decision.
pub(super) struct ResumeData {
/// Boxed continuation to resume from.
pub(super) continuation: Box<AgentContinuation>,
/// Identifier of the tool call the decision refers to.
pub(super) tool_call_id: String,
/// Decision flag (presumably `true` = approved, `false` = rejected — confirm).
pub(super) confirmed: bool,
/// Optional free-text reason; presumably only meaningful on rejection — confirm.
pub(super) rejection_reason: Option<String>,
}
/// Initial values produced before the loop starts running turns.
///
/// NOTE(review): the producer is outside this file; field meanings below are
/// taken from their types and names.
pub(super) struct InitializedState {
/// Starting turn number.
pub(super) turn: usize,
/// Starting token usage total.
pub(super) total_usage: TokenUsage,
/// Starting agent state.
pub(super) state: AgentState,
/// Present when the run begins by resuming a pending confirmation (assumed — confirm).
pub(super) resume_data: Option<ResumeData>,
}
/// Result of attempting to execute a single tool call.
pub(super) enum ToolExecutionOutcome {
/// The tool produced a [`ToolResult`] for the given call id.
Completed { tool_id: String, result: ToolResult },
/// The call cannot proceed without confirmation. Carries the call's
/// identifiers, JSON input, a description, and an optional listen context.
RequiresConfirmation {
tool_id: String,
tool_name: String,
display_name: String,
input: serde_json::Value,
description: String,
// Presumably `Some` only for listen-style tools — confirm against the producer.
listen_context: Option<ListenExecutionContext>,
},
/// Execution failed with the wrapped [`AgentError`].
Error(AgentError),
}
/// Listen update cap: 240 (presumably per listen operation — confirm at the use site).
pub(super) const MAX_LISTEN_UPDATES: usize = 240;
/// Listen update timeout: 30 seconds (presumably the wait for a single update — confirm).
pub(super) const LISTEN_UPDATE_TIMEOUT: Duration = Duration::from_secs(30);
/// Listen total timeout: 5 minutes (presumably the overall wait budget — confirm).
pub(super) const LISTEN_TOTAL_TIMEOUT: Duration = Duration::from_mins(5);
/// Payload describing a listen operation that has reached the ready stage.
pub(super) struct ListenReady {
/// Identifier of the operation.
pub(super) operation_id: String,
/// Revision number associated with the snapshot.
pub(super) revision: u64,
/// JSON snapshot; its schema is defined by the producer, not visible here.
pub(super) snapshot: serde_json::Value,
/// Optional expiry timestamp.
pub(super) expires_at: Option<OffsetDateTime>,
}
/// Discriminates the three listen progress stages.
///
/// A plain `Copy` tag; the string label of each stage is given by
/// [`ListenProgressStage::as_str`].
#[derive(Copy, Clone)]
pub(super) enum ListenProgressStage {
    /// Labelled `"listen_update"`.
    Update,
    /// Labelled `"listen_ready"`.
    Ready,
    /// Labelled `"listen_invalidated"`.
    Invalidated,
}

impl ListenProgressStage {
    /// Returns the fixed `snake_case` label of this stage.
    ///
    /// The function is `const`, so the label can also be computed in constant
    /// contexts; the returned slice is `'static`.
    pub(super) const fn as_str(self) -> &'static str {
        match self {
            Self::Invalidated => "listen_invalidated",
            Self::Ready => "listen_ready",
            Self::Update => "listen_update",
        }
    }
}
/// Decision returned after handling one listen update.
pub(super) enum ListenUpdateHandling {
/// No ready payload yet (presumably: keep waiting for further updates — confirm).
Continue,
/// The operation is ready; carries the [`ListenReady`] payload.
Ready(ListenReady),
}
/// Borrowed context used while handling listen updates for one pending tool call.
///
/// Every field is either a borrow with lifetime `'a` or a `Copy` value, so the
/// bundle owns nothing.
pub(super) struct ListenUpdateContext<'a, H> {
/// The pending tool call the updates belong to.
pub(super) pending: &'a PendingToolCallInfo,
/// Shared hooks implementation (generic `H`; bounds are on the consuming code).
pub(super) hooks: &'a Arc<H>,
pub(super) event_store: &'a Arc<dyn EventStore>,
pub(super) thread_id: &'a ThreadId,
/// Turn number the updates are attributed to.
pub(super) turn: usize,
pub(super) authority: &'a Arc<dyn EventAuthority>,
}
/// Parameter bundle for waiting on a listen tool.
///
/// Combines the pending call, the type-erased listen tool, its tool context, and
/// the [`ListenUpdateContext`] used for update handling.
pub(super) struct ListenWaitParams<'a, Ctx, H> {
/// The pending tool call being waited on.
pub(super) pending: &'a PendingToolCallInfo,
/// Type-erased listen tool, parameterised by the tool context type `Ctx`.
pub(super) tool: &'a Arc<dyn crate::tools::ErasedListenTool<Ctx>>,
pub(super) tool_context: &'a ToolContext<Ctx>,
/// Held by value; it contains only borrows and a `usize`.
pub(super) update_ctx: ListenUpdateContext<'a, H>,
}
/// Borrowed dependencies for executing a tool call.
///
/// All fields are borrows with lifetime `'a` except `turn`, which is copied.
pub(super) struct ToolCallExecutionContext<'a, Ctx, H> {
pub(super) tool_context: &'a ToolContext<Ctx>,
pub(super) thread_id: &'a ThreadId,
/// Borrowed registry itself, not the `Arc` around it.
pub(super) tools: &'a ToolRegistry<Ctx>,
pub(super) hooks: &'a Arc<H>,
pub(super) event_store: &'a Arc<dyn EventStore>,
pub(super) turn: usize,
pub(super) authority: &'a Arc<dyn EventAuthority>,
/// Optional: `None` when no tool execution store is configured.
pub(super) execution_store: Option<&'a Arc<dyn ToolExecutionStore>>,
pub(super) audit_sink: &'a Arc<dyn ToolAuditSink>,
pub(super) provenance: &'a AuditProvenance,
}
/// Borrowed dependencies for executing a tool call after confirmation.
///
/// NOTE(review): the field list is identical, name for name and type for type,
/// to `ToolCallExecutionContext` in this file; the two are distinct types only
/// by name.
pub(super) struct ConfirmedToolExecutionContext<'a, Ctx, H> {
pub(super) tool_context: &'a ToolContext<Ctx>,
pub(super) thread_id: &'a ThreadId,
/// Borrowed registry itself, not the `Arc` around it.
pub(super) tools: &'a ToolRegistry<Ctx>,
pub(super) hooks: &'a Arc<H>,
pub(super) event_store: &'a Arc<dyn EventStore>,
pub(super) turn: usize,
pub(super) authority: &'a Arc<dyn EventAuthority>,
/// Optional: `None` when no tool execution store is configured.
pub(super) execution_store: Option<&'a Arc<dyn ToolExecutionStore>>,
pub(super) audit_sink: &'a Arc<dyn ToolAuditSink>,
pub(super) provenance: &'a AuditProvenance,
}
/// Failure modes of an LLM stream.
pub(super) enum StreamError {
/// Carries a message; presumably eligible for retry — confirm at the call site.
Recoverable(String),
/// Carries a message; presumably ends the attempt without retry — confirm.
Fatal(String),
/// The stream was cancelled; no message is attached.
Cancelled,
}
/// Result of one LLM call.
pub(super) enum LlmOutcome {
/// The call produced a chat response.
Response(crate::llm::ChatResponse),
/// The call was cancelled; no payload.
Cancelled,
/// The call failed with the wrapped [`AgentError`].
Error(AgentError),
}
/// Summary values reported alongside a completed resume.
///
/// The three fields have the same names and types as the corresponding fields
/// of `TurnContext`.
pub(super) struct ResumeSummaryMetrics {
pub(super) response_id: Option<String>,
pub(super) stop_reason: Option<StopReason>,
pub(super) tool_call_count: usize,
}
/// Result of processing a resume request.
pub(super) enum ResumeProcessingResult {
/// Resume processing finished; carries the usage and summary metrics.
Completed {
turn_usage: TokenUsage,
metrics: ResumeSummaryMetrics,
},
/// Another tool call needs confirmation. The payload has the same fields as
/// `InternalTurnResult::AwaitingConfirmation`.
AwaitingConfirmation {
tool_call_id: String,
tool_name: String,
display_name: String,
input: serde_json::Value,
description: String,
continuation: Box<AgentContinuation>,
},
}
/// Owned inputs for a run loop.
///
/// Every field is held by value (mostly `Arc`s), so the bundle has no lifetime
/// parameter. Compared with `TurnParameters` in this file it has `input_rx`
/// and no `turn_options`.
pub(super) struct RunLoopParameters<Ctx, P, H, M, S> {
pub(super) event_store: Arc<dyn EventStore>,
pub(super) authority: Arc<dyn EventAuthority>,
pub(super) thread_id: ThreadId,
/// The input that starts the run.
pub(super) input: AgentInput,
pub(super) tool_context: ToolContext<Ctx>,
/// LLM provider (generic `P`).
pub(super) provider: Arc<P>,
pub(super) tools: Arc<ToolRegistry<Ctx>>,
pub(super) hooks: Arc<H>,
pub(super) message_store: Arc<M>,
pub(super) state_store: Arc<S>,
pub(super) config: AgentConfig,
/// Optional compaction settings.
pub(super) compaction_config: Option<CompactionConfig>,
/// Optional compactor implementation.
pub(super) compactor: Option<Arc<dyn ContextCompactor>>,
/// Optional tool execution store.
pub(super) execution_store: Option<Arc<dyn ToolExecutionStore>>,
pub(super) audit_sink: Arc<dyn ToolAuditSink>,
pub(super) cancel_token: CancellationToken,
/// Optional receiver of further [`AgentInput`] values while the loop runs.
pub(super) input_rx: Option<mpsc::Receiver<AgentInput>>,
/// Only compiled with the `otel` feature.
#[cfg(feature = "otel")]
pub(super) run_options: RunOptions,
/// Only compiled with the `otel` feature.
#[cfg(feature = "otel")]
pub(super) observability_store: Option<Arc<dyn crate::observability::ObservabilityStore>>,
}
/// Inputs for processing a resume: the owned [`ResumeData`] plus borrowed
/// loop state and dependencies.
pub(super) struct ResumeProcessingParameters<'a, Ctx, H, M> {
/// Taken by value; the remaining fields are borrows or `Copy`.
pub(super) resume_data: ResumeData,
pub(super) turn: usize,
pub(super) total_usage: &'a TokenUsage,
pub(super) state: &'a AgentState,
pub(super) thread_id: &'a ThreadId,
pub(super) tool_context: &'a ToolContext<Ctx>,
pub(super) tools: &'a Arc<ToolRegistry<Ctx>>,
pub(super) hooks: &'a Arc<H>,
pub(super) event_store: &'a Arc<dyn EventStore>,
pub(super) authority: &'a Arc<dyn EventAuthority>,
pub(super) message_store: &'a Arc<M>,
/// Optional: `None` when no tool execution store is configured.
pub(super) execution_store: Option<&'a Arc<dyn ToolExecutionStore>>,
pub(super) audit_sink: &'a Arc<dyn ToolAuditSink>,
pub(super) provenance: &'a AuditProvenance,
}
/// Inputs for the resume step of a run loop.
///
/// NOTE(review): the field list is identical, name for name and type for type,
/// to `ResumeProcessingParameters` in this file.
pub(super) struct RunLoopResumeParams<'a, Ctx, H, M> {
/// Taken by value; the remaining fields are borrows or `Copy`.
pub(super) resume_data: ResumeData,
pub(super) turn: usize,
pub(super) total_usage: &'a TokenUsage,
pub(super) state: &'a AgentState,
pub(super) thread_id: &'a ThreadId,
pub(super) tool_context: &'a ToolContext<Ctx>,
pub(super) tools: &'a Arc<ToolRegistry<Ctx>>,
pub(super) hooks: &'a Arc<H>,
pub(super) event_store: &'a Arc<dyn EventStore>,
pub(super) authority: &'a Arc<dyn EventAuthority>,
pub(super) message_store: &'a Arc<M>,
/// Optional: `None` when no tool execution store is configured.
pub(super) execution_store: Option<&'a Arc<dyn ToolExecutionStore>>,
pub(super) audit_sink: &'a Arc<dyn ToolAuditSink>,
pub(super) provenance: &'a AuditProvenance,
}
/// Borrowed inputs for the turn-running part of a run loop.
///
/// Holds the [`TurnContext`] and the optional input receiver mutably; everything
/// else is a shared borrow.
pub(super) struct RunLoopTurnsParams<'a, Ctx, P, H, M, S> {
/// Mutable loop bookkeeping.
pub(super) ctx: &'a mut TurnContext,
pub(super) tool_context: &'a ToolContext<Ctx>,
pub(super) provider: &'a Arc<P>,
pub(super) tools: &'a Arc<ToolRegistry<Ctx>>,
pub(super) hooks: &'a Arc<H>,
pub(super) message_store: &'a Arc<M>,
pub(super) state_store: &'a Arc<S>,
pub(super) event_store: &'a Arc<dyn EventStore>,
pub(super) authority: &'a Arc<dyn EventAuthority>,
pub(super) config: &'a AgentConfig,
pub(super) compaction_config: Option<&'a CompactionConfig>,
pub(super) compactor: Option<&'a Arc<dyn ContextCompactor>>,
pub(super) execution_store: Option<&'a Arc<dyn ToolExecutionStore>>,
pub(super) audit_sink: &'a Arc<dyn ToolAuditSink>,
pub(super) provenance: &'a AuditProvenance,
pub(super) cancel_token: &'a CancellationToken,
/// Optional, mutably borrowed receiver of further [`AgentInput`] values.
pub(super) input_rx: Option<&'a mut mpsc::Receiver<AgentInput>>,
pub(super) turn_options: &'a TurnOptions,
/// Only compiled with the `otel` feature.
#[cfg(feature = "otel")]
pub(super) observability_store: Option<&'a Arc<dyn crate::observability::ObservabilityStore>>,
}
/// Borrowed inputs for handling a finished turn when an input receiver exists.
///
/// NOTE(review): the "persistent" naming suggests the loop stays alive waiting on
/// `rx` for more input — confirm against the consuming function.
pub(super) struct PersistentDoneParams<'a, H, M> {
/// Read-only view of the loop bookkeeping.
pub(super) ctx: &'a TurnContext,
/// Receiver of further [`AgentInput`] values; required here, not optional.
pub(super) rx: &'a mut mpsc::Receiver<AgentInput>,
pub(super) message_store: &'a Arc<M>,
pub(super) event_store: &'a Arc<dyn EventStore>,
pub(super) hooks: &'a Arc<H>,
pub(super) authority: &'a Arc<dyn EventAuthority>,
pub(super) current_turn: usize,
pub(super) cancel_token: &'a CancellationToken,
}
/// Inputs for acting on an [`InternalTurnResult`] inside a run loop.
pub(super) struct RunLoopTurnResultParams<'a, H, M, S> {
/// The turn outcome to act on, taken by value.
pub(super) result: InternalTurnResult,
/// Read-only view of the loop bookkeeping.
pub(super) ctx: &'a TurnContext,
/// Optional, mutably borrowed receiver of further [`AgentInput`] values.
pub(super) input_rx: Option<&'a mut mpsc::Receiver<AgentInput>>,
pub(super) message_store: &'a Arc<M>,
pub(super) state_store: &'a Arc<S>,
pub(super) event_store: &'a Arc<dyn EventStore>,
pub(super) hooks: &'a Arc<H>,
pub(super) authority: &'a Arc<dyn EventAuthority>,
pub(super) cancel_token: &'a CancellationToken,
pub(super) current_turn: usize,
}
/// Owned inputs for resuming in single-turn mode.
///
/// Unlike `ResumeProcessingParameters`, every field here is held by value, and
/// the bundle also carries `state_store`, `turn_options`, and `start_time`.
pub(super) struct SingleTurnResumeParams<Ctx, H, M, S> {
pub(super) resume_data: ResumeData,
pub(super) turn: usize,
pub(super) total_usage: TokenUsage,
pub(super) state: AgentState,
pub(super) thread_id: ThreadId,
pub(super) tool_context: ToolContext<Ctx>,
pub(super) tools: Arc<ToolRegistry<Ctx>>,
pub(super) hooks: Arc<H>,
pub(super) event_store: Arc<dyn EventStore>,
pub(super) authority: Arc<dyn EventAuthority>,
pub(super) message_store: Arc<M>,
pub(super) state_store: Arc<S>,
/// Optional tool execution store.
pub(super) execution_store: Option<Arc<dyn ToolExecutionStore>>,
pub(super) audit_sink: Arc<dyn ToolAuditSink>,
pub(super) provenance: AuditProvenance,
pub(super) turn_options: TurnOptions,
/// An [`Instant`] supplied by the caller; presumably when the turn began — confirm.
pub(super) start_time: Instant,
}
/// Owned inputs for running a turn.
///
/// Every field is held by value. Compared with `RunLoopParameters` in this file
/// it has `turn_options` and no `input_rx`.
pub(super) struct TurnParameters<Ctx, P, H, M, S> {
pub(super) event_store: Arc<dyn EventStore>,
pub(super) authority: Arc<dyn EventAuthority>,
pub(super) thread_id: ThreadId,
/// The input that starts the turn.
pub(super) input: AgentInput,
pub(super) tool_context: ToolContext<Ctx>,
/// LLM provider (generic `P`).
pub(super) provider: Arc<P>,
pub(super) tools: Arc<ToolRegistry<Ctx>>,
pub(super) hooks: Arc<H>,
pub(super) message_store: Arc<M>,
pub(super) state_store: Arc<S>,
pub(super) config: AgentConfig,
/// Optional compaction settings.
pub(super) compaction_config: Option<CompactionConfig>,
/// Optional compactor implementation.
pub(super) compactor: Option<Arc<dyn ContextCompactor>>,
/// Optional tool execution store.
pub(super) execution_store: Option<Arc<dyn ToolExecutionStore>>,
pub(super) audit_sink: Arc<dyn ToolAuditSink>,
pub(super) cancel_token: CancellationToken,
pub(super) turn_options: TurnOptions,
/// Only compiled with the `otel` feature.
#[cfg(feature = "otel")]
pub(super) run_options: RunOptions,
/// Only compiled with the `otel` feature.
#[cfg(feature = "otel")]
pub(super) observability_store: Option<Arc<dyn crate::observability::ObservabilityStore>>,
}
/// Borrowed inputs for executing one turn.
///
/// Holds the [`TurnContext`] mutably; every other field is a shared borrow.
pub(super) struct ExecuteTurnParameters<'a, Ctx, P, H, M, S> {
pub(super) event_store: &'a Arc<dyn EventStore>,
pub(super) authority: &'a Arc<dyn EventAuthority>,
/// Mutable loop bookkeeping.
pub(super) ctx: &'a mut TurnContext,
pub(super) tool_context: &'a ToolContext<Ctx>,
pub(super) provider: &'a Arc<P>,
pub(super) tools: &'a Arc<ToolRegistry<Ctx>>,
pub(super) hooks: &'a Arc<H>,
pub(super) message_store: &'a Arc<M>,
pub(super) state_store: &'a Arc<S>,
pub(super) config: &'a AgentConfig,
pub(super) compaction_config: Option<&'a CompactionConfig>,
pub(super) compactor: Option<&'a Arc<dyn ContextCompactor>>,
pub(super) execution_store: Option<&'a Arc<dyn ToolExecutionStore>>,
pub(super) audit_sink: &'a Arc<dyn ToolAuditSink>,
pub(super) provenance: &'a AuditProvenance,
pub(super) turn_options: &'a TurnOptions,
pub(super) cancel_token: &'a CancellationToken,
/// Only compiled with the `otel` feature.
#[cfg(feature = "otel")]
pub(super) observability_store: Option<&'a Arc<dyn crate::observability::ObservabilityStore>>,
}
/// Borrowed inputs for loading a turn's messages.
///
/// NOTE(review): the presence of the compaction fields suggests loading may
/// trigger compaction — confirm against the consuming function.
pub(super) struct TurnMessageLoadParams<'a, P, H, M> {
pub(super) thread_id: &'a ThreadId,
pub(super) turn: usize,
pub(super) provider: &'a Arc<P>,
pub(super) message_store: &'a Arc<M>,
pub(super) compaction_config: Option<&'a CompactionConfig>,
pub(super) compactor: Option<&'a Arc<dyn ContextCompactor>>,
pub(super) event_store: &'a Arc<dyn EventStore>,
pub(super) hooks: &'a Arc<H>,
pub(super) authority: &'a Arc<dyn EventAuthority>,
pub(super) cancel_token: &'a CancellationToken,
}
/// Inputs for one LLM call: the owned request plus borrowed dependencies.
pub(super) struct LlmCallParams<'a, P, H> {
pub(super) provider: &'a Arc<P>,
/// The chat request, taken by value.
pub(super) request: crate::llm::ChatRequest,
pub(super) config: &'a AgentConfig,
pub(super) event_store: &'a Arc<dyn EventStore>,
pub(super) hooks: &'a Arc<H>,
pub(super) authority: &'a Arc<dyn EventAuthority>,
pub(super) thread_id: &'a ThreadId,
pub(super) turn: usize,
/// Identifier string for the message; same pair as in `LlmStreamIds`.
pub(super) message_id: &'a str,
/// Identifier string for the thinking content; same pair as in `LlmStreamIds`.
pub(super) thinking_id: &'a str,
pub(super) cancel_token: &'a CancellationToken,
/// Only compiled with the `otel` feature.
#[cfg(feature = "otel")]
pub(super) observability_store: Option<&'a Arc<dyn crate::observability::ObservabilityStore>>,
}
/// Borrowed context for LLM-related events.
///
/// Its fields are a subset of those in `LlmCallParams` (no provider, request,
/// config, or ids).
pub(super) struct LlmEventContext<'a, H> {
pub(super) event_store: &'a Arc<dyn EventStore>,
pub(super) hooks: &'a Arc<H>,
pub(super) authority: &'a Arc<dyn EventAuthority>,
pub(super) thread_id: &'a ThreadId,
pub(super) turn: usize,
pub(super) cancel_token: &'a CancellationToken,
}
/// Pair of borrowed identifier strings for an LLM stream.
///
/// `Copy`, since it holds only two `&str` slices.
#[derive(Clone, Copy)]
pub(super) struct LlmStreamIds<'a> {
/// Identifier string for the message.
pub(super) message_id: &'a str,
/// Identifier string for the thinking content.
pub(super) thinking_id: &'a str,
}
/// What remains of an LLM response after processing.
pub(super) struct ProcessedTurnResponse {
/// Stop reason, if one is available.
pub(super) stop_reason: Option<StopReason>,
/// Text content, if any.
pub(super) text_content: Option<String>,
/// Tool calls still to be executed; may be empty.
pub(super) pending_tool_calls: Vec<PendingToolCallInfo>,
}
/// Inputs for processing an LLM response: the owned response plus borrowed
/// identifiers and dependencies.
pub(super) struct TurnResponseProcessingParams<'a, Ctx, H, M> {
/// The chat response, taken by value.
pub(super) response: crate::llm::ChatResponse,
pub(super) message_id: &'a str,
pub(super) thinking_id: &'a str,
pub(super) thread_id: &'a ThreadId,
pub(super) turn: usize,
pub(super) tools: &'a Arc<ToolRegistry<Ctx>>,
pub(super) message_store: &'a Arc<M>,
pub(super) event_store: &'a Arc<dyn EventStore>,
pub(super) hooks: &'a Arc<H>,
pub(super) authority: &'a Arc<dyn EventAuthority>,
}
/// Inputs for executing a batch of pending tool calls.
///
/// Owns the list of calls and the response metadata; borrows everything else.
/// `TurnToolPhaseParams` in this file has the same fields plus `message_store`.
pub(super) struct ToolBatchExecutionParams<'a, Ctx, H> {
/// The calls to execute, taken by value.
pub(super) pending_tool_calls: Vec<PendingToolCallInfo>,
pub(super) tool_context: &'a ToolContext<Ctx>,
pub(super) thread_id: &'a ThreadId,
pub(super) tools: &'a Arc<ToolRegistry<Ctx>>,
pub(super) hooks: &'a Arc<H>,
pub(super) event_store: &'a Arc<dyn EventStore>,
pub(super) authority: &'a Arc<dyn EventAuthority>,
/// Optional: `None` when no tool execution store is configured.
pub(super) execution_store: Option<&'a Arc<dyn ToolExecutionStore>>,
pub(super) audit_sink: &'a Arc<dyn ToolAuditSink>,
pub(super) provenance: &'a AuditProvenance,
pub(super) turn: usize,
/// Usage total (presumably across the run — confirm).
pub(super) total_usage: &'a TokenUsage,
/// Usage for the current turn.
pub(super) turn_usage: &'a TokenUsage,
pub(super) state: &'a AgentState,
pub(super) response_id: Option<String>,
pub(super) stop_reason: Option<StopReason>,
}
/// Borrowed inputs for completing a turn once tool results are available.
pub(super) struct TurnCompletionParams<'a, H, M> {
/// Slice of `(String, ToolResult)` pairs; the string is presumably the tool
/// call id — confirm against the producer.
pub(super) tool_results: &'a [(String, ToolResult)],
pub(super) thread_id: &'a ThreadId,
pub(super) turn: usize,
pub(super) turn_usage: &'a TokenUsage,
pub(super) message_store: &'a Arc<M>,
pub(super) event_store: &'a Arc<dyn EventStore>,
pub(super) hooks: &'a Arc<H>,
pub(super) authority: &'a Arc<dyn EventAuthority>,
}
/// Inputs for the tool phase of a turn.
///
/// Same fields as `ToolBatchExecutionParams` in this file, with `message_store`
/// added.
pub(super) struct TurnToolPhaseParams<'a, Ctx, H, M> {
/// The calls to execute, taken by value.
pub(super) pending_tool_calls: Vec<PendingToolCallInfo>,
pub(super) tool_context: &'a ToolContext<Ctx>,
pub(super) thread_id: &'a ThreadId,
pub(super) tools: &'a Arc<ToolRegistry<Ctx>>,
pub(super) hooks: &'a Arc<H>,
pub(super) event_store: &'a Arc<dyn EventStore>,
pub(super) authority: &'a Arc<dyn EventAuthority>,
/// Optional: `None` when no tool execution store is configured.
pub(super) execution_store: Option<&'a Arc<dyn ToolExecutionStore>>,
pub(super) audit_sink: &'a Arc<dyn ToolAuditSink>,
pub(super) provenance: &'a AuditProvenance,
pub(super) turn: usize,
/// Usage total (presumably across the run — confirm).
pub(super) total_usage: &'a TokenUsage,
/// Usage for the current turn.
pub(super) turn_usage: &'a TokenUsage,
pub(super) state: &'a AgentState,
pub(super) message_store: &'a Arc<M>,
pub(super) response_id: Option<String>,
pub(super) stop_reason: Option<StopReason>,
}
/// Inputs for acting on a turn's stop reason.
///
/// Owns the response-derived values (`stop_reason`, `text_content`,
/// `had_tool_calls`, `message_id`, `turn_usage`) and borrows the rest; the
/// [`TurnContext`] is borrowed mutably.
pub(super) struct TurnStopReasonParams<'a, P, H, M> {
pub(super) stop_reason: Option<StopReason>,
pub(super) text_content: Option<String>,
/// Whether the turn had tool calls.
pub(super) had_tool_calls: bool,
pub(super) message_id: String,
pub(super) turn_usage: TokenUsage,
/// Mutable loop bookkeeping.
pub(super) ctx: &'a mut TurnContext,
pub(super) provider: &'a Arc<P>,
pub(super) message_store: &'a Arc<M>,
// NOTE(review): compaction inputs suggest some stop reasons lead to
// compaction — confirm against the consuming function.
pub(super) compaction_config: Option<&'a CompactionConfig>,
pub(super) compactor: Option<&'a Arc<dyn ContextCompactor>>,
pub(super) event_store: &'a Arc<dyn EventStore>,
pub(super) hooks: &'a Arc<H>,
pub(super) authority: &'a Arc<dyn EventAuthority>,
pub(super) cancel_token: &'a CancellationToken,
}
/// Inputs for converting an [`InternalTurnResult`] into its outward form.
///
/// Takes the result, the [`TurnContext`], and the thread id by value; the
/// remaining dependencies are borrowed.
pub(super) struct ConvertTurnResultParams<'a, H, S> {
/// The turn outcome to convert, taken by value.
pub(super) result: InternalTurnResult,
/// Loop bookkeeping, taken by value (consumed by this bundle).
pub(super) ctx: TurnContext,
pub(super) event_store: &'a Arc<dyn EventStore>,
pub(super) hooks: &'a Arc<H>,
pub(super) authority: &'a Arc<dyn EventAuthority>,
pub(super) thread_id: ThreadId,
pub(super) current_turn: usize,
pub(super) state_store: &'a Arc<S>,
pub(super) provenance: &'a AuditProvenance,
pub(super) turn_options: &'a TurnOptions,
}
/// Tuple alias for content pulled out of a response: two optional strings and a
/// list of `(String, String, JSON value)` triples.
///
/// NOTE(review): the slot meanings are not visible in this file. Presumably the
/// two strings are thinking and text content and each triple is a tool call as
/// `(id, name, input)` — confirm the order against the producing function.
pub(super) type ExtractedContent = (
Option<String>,
Option<String>,
Vec<(String, String, serde_json::Value)>,
);