use std::collections::BTreeSet;
use std::sync::Arc;
use crate::LoopState;
use crate::comms::InputSource;
use crate::interaction::{
PeerIngressAdmission, PeerIngressEnvelopeFacts, PeerIngressPlainEventFacts,
};
use crate::lifecycle::run_primitive::ModelId;
use crate::lifecycle::{InputId, RunId};
use crate::ops::{AsyncOpRef, OperationId};
use crate::peer_correlation::{
InboundPeerRequestState, InteractionStreamState, OutboundPeerRequestState, PeerCorrelationId,
};
use crate::retry::LlmRetrySchedule;
use crate::tool_scope::{
ExternalToolSurfaceBaseState, ExternalToolSurfaceDeltaOperation, ExternalToolSurfaceDeltaPhase,
ExternalToolSurfaceFailureCause, ExternalToolSurfaceGlobalPhase, ExternalToolSurfacePendingOp,
ExternalToolSurfaceStagedOp,
};
use crate::turn_execution_authority::{
ContentShape, TurnFailureReason, TurnPhase, TurnPrimitiveKind, TurnTerminalCauseKind,
TurnTerminalOutcome,
};
use crate::types::SessionId;
/// Mode selector passed to [`CommsDrainHandle::spawn_drain`].
///
/// NOTE(review): the per-variant semantics are not visible in this file; the
/// names suggest how long the spawned drain is expected to live — confirm
/// against the drain implementation.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum DrainMode {
Timed,
AttachedSession,
PersistentHost,
}
/// Reason reported through [`CommsDrainHandle::notify_drain_exited`] when a
/// drain stops.
///
/// [`DrainExitReason::as_str`] renders each variant as its own name.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum DrainExitReason {
IdleTimeout,
Dismissed,
Failed,
Aborted,
SessionShutdown,
}
/// Thread-safe handle for recording the baseline model used by model routing.
pub trait ModelRoutingHandle: Send + Sync {
/// Records `baseline_model` together with a flag saying whether it is
/// realtime capable.
///
/// # Errors
/// Returns a [`DslTransitionError`] when the implementation rejects the update.
fn set_baseline(
&self,
baseline_model: ModelId,
realtime_capable: bool,
) -> Result<(), DslTransitionError>;
}
impl DrainExitReason {
pub const fn as_str(self) -> &'static str {
match self {
Self::IdleTimeout => "IdleTimeout",
Self::Dismissed => "Dismissed",
Self::Failed => "Failed",
Self::Aborted => "Aborted",
Self::SessionShutdown => "SessionShutdown",
}
}
}
/// Lifecycle phase of an auth lease, as recorded in [`AuthLeaseSnapshot::phase`].
///
/// `AuthLeaseHandle::restore_auth_lifecycle_snapshot` treats `Released` as
/// "nothing to restore" and replays every other phase onto a freshly
/// acquired lease.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum AuthLeasePhase {
Valid,
Expiring,
Refreshing,
ReauthRequired,
Released,
}
/// Classification of a rejected DSL transition, stored in
/// [`DslTransitionError::kind`].
///
/// `NoMatchingTransition` is what [`DslTransitionError::new`] and
/// [`DslTransitionError::no_matching`] record; `GuardRejected` is recorded by
/// [`DslTransitionError::guard_rejected`].
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum DslRejectionKind {
NoMatchingTransition,
GuardRejected,
}
/// Error returned by the handle traits in this file when a transition is
/// rejected.
///
/// Displays as `DSL transition rejected in {context}: {reason}`; `kind` is not
/// part of the rendered message.
#[derive(Debug, Clone, PartialEq, Eq, thiserror::Error)]
#[error("DSL transition rejected in {context}: {reason}")]
pub struct DslTransitionError {
/// Static label naming where the rejection happened.
pub context: &'static str,
/// Whether no transition matched or a guard rejected it.
pub kind: DslRejectionKind,
/// Free-form explanation included in the display message.
pub reason: String,
}
impl DslTransitionError {
pub fn new(context: &'static str, reason: impl Into<String>) -> Self {
Self::no_matching(context, reason)
}
pub fn no_matching(context: &'static str, reason: impl Into<String>) -> Self {
Self {
context,
kind: DslRejectionKind::NoMatchingTransition,
reason: reason.into(),
}
}
pub fn guard_rejected(context: &'static str, reason: impl Into<String>) -> Self {
Self {
context,
kind: DslRejectionKind::GuardRejected,
reason: reason.into(),
}
}
pub fn is_guard_rejected(&self) -> bool {
self.kind == DslRejectionKind::GuardRejected
}
}
/// Progress phase carried by [`PeerConversationProjection::ResponseProgress`].
///
/// Serialized by serde using snake_case variant names.
#[derive(Debug, Clone, Copy, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum PeerResponseProgressProjectionPhase {
Accepted,
InProgress,
PartialResult,
}
impl PeerResponseProgressProjectionPhase {
fn label(self) -> &'static str {
match self {
Self::Accepted => "accepted",
Self::InProgress => "in_progress",
Self::PartialResult => "partial_result",
}
}
}
/// Terminal status carried by a [`PeerResponseTerminalFact`].
///
/// Serialized by serde using snake_case variant names.
#[derive(Debug, Clone, Copy, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum PeerResponseTerminalProjectionStatus {
Completed,
Failed,
Cancelled,
}
impl PeerResponseTerminalProjectionStatus {
fn label(self) -> &'static str {
match self {
Self::Completed => "completed",
Self::Failed => "failed",
Self::Cancelled => "cancelled",
}
}
}
/// Validation failures raised while parsing the parts of a terminal peer
/// response fact (identities and correlation id).
///
/// NOTE(review): `MissingDisplayIdentity` is not constructed by any code
/// visible in this file; presumably a caller raises it — confirm.
#[derive(Debug, Clone, PartialEq, Eq, thiserror::Error)]
pub enum PeerResponseTerminalFactError {
#[error("transport identity cannot be empty")]
EmptyTransportIdentity,
#[error("route identity cannot be empty")]
EmptyRouteIdentity,
#[error("route identity cannot contain control characters")]
InvalidRouteIdentity,
#[error("display identity is required")]
MissingDisplayIdentity,
#[error("display identity cannot be empty")]
EmptyDisplayIdentity,
#[error("display identity cannot contain control characters")]
InvalidDisplayIdentity,
#[error("correlation id cannot be empty")]
EmptyCorrelationId,
/// Carries the rejected input verbatim.
#[error("correlation id must be a UUID: {input}")]
InvalidCorrelationId { input: String },
}
/// Transport identity of the peer that produced a terminal response.
///
/// The wrapped string is guaranteed to contain at least one non-whitespace
/// character; it is stored exactly as supplied (no trimming).
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct PeerResponseTerminalTransportIdentity(String);

impl PeerResponseTerminalTransportIdentity {
    /// Validates `raw` and wraps it.
    ///
    /// # Errors
    /// [`PeerResponseTerminalFactError::EmptyTransportIdentity`] when `raw` is
    /// empty or whitespace-only.
    pub fn parse(raw: impl Into<String>) -> Result<Self, PeerResponseTerminalFactError> {
        let candidate: String = raw.into();
        let is_blank = candidate.trim().is_empty();
        if is_blank {
            Err(PeerResponseTerminalFactError::EmptyTransportIdentity)
        } else {
            Ok(Self(candidate))
        }
    }

    /// Borrows the identity as a string slice.
    pub fn as_str(&self) -> &str {
        self.0.as_str()
    }
}

impl std::fmt::Display for PeerResponseTerminalTransportIdentity {
    /// Delegates to the inner string so width/padding flags are honoured.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        std::fmt::Display::fmt(&self.0, f)
    }
}
/// Route identity of the peer that produced a terminal response.
///
/// The wrapped string is non-blank and free of control characters; it is
/// stored exactly as supplied (no trimming).
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct PeerResponseTerminalRouteIdentity(String);

impl PeerResponseTerminalRouteIdentity {
    /// Validates `raw` and wraps it.
    ///
    /// # Errors
    /// * [`PeerResponseTerminalFactError::EmptyRouteIdentity`] when `raw` is
    ///   empty or whitespace-only (checked first).
    /// * [`PeerResponseTerminalFactError::InvalidRouteIdentity`] when `raw`
    ///   contains a control character.
    pub fn parse(raw: impl Into<String>) -> Result<Self, PeerResponseTerminalFactError> {
        let candidate: String = raw.into();
        let is_blank = candidate.trim().is_empty();
        let has_control = candidate.chars().any(|ch| ch.is_control());
        // The blank check wins over the control-character check.
        match (is_blank, has_control) {
            (true, _) => Err(PeerResponseTerminalFactError::EmptyRouteIdentity),
            (false, true) => Err(PeerResponseTerminalFactError::InvalidRouteIdentity),
            (false, false) => Ok(Self(candidate)),
        }
    }

    /// Borrows the identity as a string slice.
    pub fn as_str(&self) -> &str {
        self.0.as_str()
    }
}

impl std::fmt::Display for PeerResponseTerminalRouteIdentity {
    /// Delegates to the inner string so width/padding flags are honoured.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        std::fmt::Display::fmt(&self.0, f)
    }
}
/// Human-readable identity of the peer that produced a terminal response;
/// this is the name rendered by [`PeerResponseTerminalFact::prompt_text`].
///
/// The wrapped string is non-blank and free of control characters; it is
/// stored exactly as supplied (no trimming).
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct PeerResponseTerminalDisplayIdentity(String);

impl PeerResponseTerminalDisplayIdentity {
    /// Validates `raw` and wraps it.
    ///
    /// # Errors
    /// * [`PeerResponseTerminalFactError::EmptyDisplayIdentity`] when `raw` is
    ///   empty or whitespace-only (checked first).
    /// * [`PeerResponseTerminalFactError::InvalidDisplayIdentity`] when `raw`
    ///   contains a control character.
    pub fn parse(raw: impl Into<String>) -> Result<Self, PeerResponseTerminalFactError> {
        let candidate: String = raw.into();
        let is_blank = candidate.trim().is_empty();
        let has_control = candidate.chars().any(|ch| ch.is_control());
        // The blank check wins over the control-character check.
        match (is_blank, has_control) {
            (true, _) => Err(PeerResponseTerminalFactError::EmptyDisplayIdentity),
            (false, true) => Err(PeerResponseTerminalFactError::InvalidDisplayIdentity),
            (false, false) => Ok(Self(candidate)),
        }
    }

    /// Borrows the identity as a string slice.
    pub fn as_str(&self) -> &str {
        self.0.as_str()
    }
}

impl std::fmt::Display for PeerResponseTerminalDisplayIdentity {
    /// Delegates to the inner string so width/padding flags are honoured.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        std::fmt::Display::fmt(&self.0, f)
    }
}
/// Correlation id of a terminal peer response; a thin wrapper around
/// [`PeerCorrelationId`].
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub struct PeerResponseTerminalCorrelationId(PeerCorrelationId);

impl PeerResponseTerminalCorrelationId {
    /// Parses a UUID string into a correlation id.
    ///
    /// The input is handed to the UUID parser untrimmed; trimming is only
    /// used to detect blank input.
    ///
    /// # Errors
    /// * [`PeerResponseTerminalFactError::EmptyCorrelationId`] when `raw` is
    ///   empty or whitespace-only.
    /// * [`PeerResponseTerminalFactError::InvalidCorrelationId`] (carrying the
    ///   input) when `raw` is not a valid UUID.
    pub fn parse(raw: impl AsRef<str>) -> Result<Self, PeerResponseTerminalFactError> {
        let text = raw.as_ref();
        if text.trim().is_empty() {
            return Err(PeerResponseTerminalFactError::EmptyCorrelationId);
        }
        match uuid::Uuid::parse_str(text) {
            Ok(parsed) => Ok(Self(PeerCorrelationId::from_uuid(parsed))),
            Err(_) => Err(PeerResponseTerminalFactError::InvalidCorrelationId {
                input: text.to_string(),
            }),
        }
    }

    /// Wraps an existing [`PeerCorrelationId`] without validation.
    pub const fn from_peer_correlation_id(correlation_id: PeerCorrelationId) -> Self {
        PeerResponseTerminalCorrelationId(correlation_id)
    }

    /// Returns the wrapped [`PeerCorrelationId`].
    pub const fn as_peer_correlation_id(self) -> PeerCorrelationId {
        let PeerResponseTerminalCorrelationId(inner) = self;
        inner
    }
}

impl std::fmt::Display for PeerResponseTerminalCorrelationId {
    /// Renders exactly as the wrapped [`PeerCorrelationId`] does.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        std::fmt::Display::fmt(&self.0, f)
    }
}
/// Optional JSON payload rendered as the "Result" of a terminal peer response.
#[derive(Debug, Clone, PartialEq)]
pub struct PeerResponseTerminalRenderPayload(Option<serde_json::Value>);

impl PeerResponseTerminalRenderPayload {
    /// Wraps an optional JSON value as-is.
    pub fn new(payload: Option<serde_json::Value>) -> Self {
        PeerResponseTerminalRenderPayload(payload)
    }

    /// Borrows the wrapped JSON value, if any.
    pub fn as_ref(&self) -> Option<&serde_json::Value> {
        Option::as_ref(&self.0)
    }
}

impl From<Option<serde_json::Value>> for PeerResponseTerminalRenderPayload {
    /// Same as [`PeerResponseTerminalRenderPayload::new`].
    fn from(payload: Option<serde_json::Value>) -> Self {
        PeerResponseTerminalRenderPayload(payload)
    }
}
/// Identities describing where a terminal peer response came from.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct PeerResponseTerminalSource {
    /// Transport-level identity; optional.
    pub transport_identity: Option<PeerResponseTerminalTransportIdentity>,
    /// Route identity; used by [`PeerResponseTerminalFact::context_key`].
    pub route_identity: PeerResponseTerminalRouteIdentity,
    /// Display identity; used by [`PeerResponseTerminalFact::prompt_text`].
    pub display_identity: PeerResponseTerminalDisplayIdentity,
}

impl PeerResponseTerminalSource {
    /// Assembles a source from already-validated identities.
    pub fn new(
        transport_identity: Option<PeerResponseTerminalTransportIdentity>,
        route_identity: PeerResponseTerminalRouteIdentity,
        display_identity: PeerResponseTerminalDisplayIdentity,
    ) -> Self {
        PeerResponseTerminalSource {
            transport_identity,
            route_identity,
            display_identity,
        }
    }

    /// Validates raw strings and assembles a source.
    ///
    /// Validation runs in order — transport (when present), route, display —
    /// and stops at the first failure.
    ///
    /// # Errors
    /// Whatever the individual identity `parse` functions return.
    pub fn parse(
        transport_identity: Option<impl Into<String>>,
        route_identity: impl Into<String>,
        display_identity: impl Into<String>,
    ) -> Result<Self, PeerResponseTerminalFactError> {
        let transport_identity = match transport_identity {
            Some(raw) => Some(PeerResponseTerminalTransportIdentity::parse(raw)?),
            None => None,
        };
        let route_identity = PeerResponseTerminalRouteIdentity::parse(route_identity)?;
        let display_identity = PeerResponseTerminalDisplayIdentity::parse(display_identity)?;
        Ok(Self::new(transport_identity, route_identity, display_identity))
    }
}
/// A validated terminal peer response: who sent it, which request it answers,
/// how it ended, and the payload to render.
#[derive(Debug, Clone, PartialEq)]
pub struct PeerResponseTerminalFact {
    pub source: PeerResponseTerminalSource,
    pub correlation_id: PeerResponseTerminalCorrelationId,
    pub status: PeerResponseTerminalProjectionStatus,
    pub render_payload: PeerResponseTerminalRenderPayload,
}

impl PeerResponseTerminalFact {
    /// Assembles a fact from already-validated parts.
    pub fn new(
        source: PeerResponseTerminalSource,
        correlation_id: PeerResponseTerminalCorrelationId,
        status: PeerResponseTerminalProjectionStatus,
        render_payload: PeerResponseTerminalRenderPayload,
    ) -> Self {
        PeerResponseTerminalFact {
            source,
            correlation_id,
            status,
            render_payload,
        }
    }

    /// Renders the `[SYSTEM NOTICE][PEER_RESPONSE_TERMINAL]` prompt line from
    /// the display identity, correlation id, status label and pretty-printed
    /// payload (`null` when there is no payload).
    pub fn prompt_text(&self) -> String {
        let sender = &self.source.display_identity;
        let request_id = self.correlation_id;
        let status_label = self.status.label();
        let rendered_result = format_peer_projection_payload(self.render_payload.as_ref());
        format!(
            "[SYSTEM NOTICE][PEER_RESPONSE_TERMINAL] Correlated peer response from {}. Request ID: {}. Status: {}. Result: {}.",
            sender, request_id, status_label, rendered_result
        )
    }

    /// Context key for this fact, derived from the route identity and the
    /// correlation id via [`peer_response_terminal_context_key`].
    pub fn context_key(&self) -> String {
        let route = &self.source.route_identity;
        peer_response_terminal_context_key(route, self.correlation_id)
    }
}
/// Projection of a peer conversation event into text that is placed in the
/// prompt (see `prompt_text`, `block_prefix_text` and `context_key`).
#[derive(Debug, Clone, PartialEq)]
pub enum PeerConversationProjection {
/// A plain peer message; it contributes only a block prefix, no prompt text.
Message {
peer_id: String,
},
/// A correlated request from a peer that expects a response.
Request {
peer_id: crate::comms::PeerId,
/// Optional display name; blank names are omitted from the prompt suffix.
display_name: Option<String>,
request_id: String,
intent: String,
/// Rendered as pretty-printed JSON; `None` renders as `null`.
payload: Option<serde_json::Value>,
},
/// A non-terminal progress update for an earlier request.
ResponseProgress {
peer_id: String,
request_id: String,
phase: PeerResponseProgressProjectionPhase,
/// Rendered as pretty-printed JSON; `None` renders as `null`.
payload: Option<serde_json::Value>,
},
/// The terminal response for an earlier request; the only variant with a
/// context key.
ResponseTerminal {
fact: PeerResponseTerminalFact,
},
}
impl PeerConversationProjection {
pub fn response_terminal(fact: PeerResponseTerminalFact) -> Self {
Self::ResponseTerminal { fact }
}
pub fn block_prefix_text(&self) -> Option<String> {
match self {
Self::Message { peer_id } => Some(format!("[COMMS MESSAGE from {peer_id}]")),
Self::Request { .. }
| Self::ResponseProgress { .. }
| Self::ResponseTerminal { .. } => None,
}
}
pub fn prompt_text(&self) -> String {
match self {
Self::Message { .. } => String::new(),
Self::Request {
peer_id,
display_name,
request_id,
intent,
payload,
} => {
let display_suffix = display_name
.as_deref()
.map(str::trim)
.filter(|name| !name.is_empty())
.map(|name| format!(" (display_name: {name})"))
.unwrap_or_default();
let response_call = crate::interaction::SendResponseCallProjection::new(
*peer_id,
display_name.as_deref(),
request_id.clone(),
);
format!(
"[SYSTEM NOTICE][PEER_REQUEST] Correlated peer request from peer_id {peer_id}{display_suffix}. Intent: {intent}. Request ID: {request_id}. Params: {}. This is not a normal user request and not a prompt for direct user-facing output. {} Do not use send_message for this reply.",
format_peer_projection_payload(payload.as_ref()),
response_call.instruction_text()
)
}
Self::ResponseProgress {
peer_id,
request_id,
phase,
payload,
} => format!(
"[SYSTEM NOTICE][PEER_RESPONSE_PROGRESS] Correlated peer response progress from {peer_id}. Request ID: {request_id}. Phase: {}. Payload: {}.",
phase.label(),
format_peer_projection_payload(payload.as_ref())
),
Self::ResponseTerminal { fact } => fact.prompt_text(),
}
}
pub fn context_key(&self) -> Option<String> {
match self {
Self::ResponseTerminal { fact } => Some(fact.context_key()),
Self::Message { .. } | Self::Request { .. } | Self::ResponseProgress { .. } => None,
}
}
}
/// Builds the context key for a terminal peer response:
/// `peer_response_terminal:<route_identity>:<correlation_id>`.
pub fn peer_response_terminal_context_key(
    route_identity: &PeerResponseTerminalRouteIdentity,
    correlation_id: PeerResponseTerminalCorrelationId,
) -> String {
    let key = format!("peer_response_terminal:{route_identity}:{correlation_id}");
    key
}
/// Pretty-prints an optional JSON payload for inclusion in prompt text.
///
/// A missing payload is rendered as JSON `null`; should serialization fail,
/// the literal string `"null"` is returned instead of an error.
fn format_peer_projection_payload(payload: Option<&serde_json::Value>) -> String {
    let null = serde_json::Value::Null;
    let value = payload.unwrap_or(&null);
    match serde_json::to_string_pretty(value) {
        Ok(rendered) => rendered,
        Err(_) => "null".to_string(),
    }
}
/// Point-in-time view of turn-execution state, as returned by
/// [`TurnStateHandle::snapshot`].
///
/// NOTE(review): field meanings below are read off the field names and the
/// matching `TurnStateHandle` parameters; confirm against the state machine
/// before relying on them.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct TurnStateSnapshot {
/// Run currently being executed, if any.
pub active_run_id: Option<RunId>,
pub loop_state: LoopState,
pub turn_phase: TurnPhase,
pub primitive_kind: Option<TurnPrimitiveKind>,
pub admitted_content_shape: Option<ContentShape>,
pub vision_enabled: bool,
pub image_tool_results_enabled: bool,
pub tool_calls_pending: u64,
pub pending_op_refs: BTreeSet<AsyncOpRef>,
pub barrier_operation_ids: BTreeSet<OperationId>,
pub has_barrier_ops: bool,
pub barrier_satisfied: bool,
pub boundary_count: u64,
pub cancel_after_boundary: bool,
/// Set once the turn has reached a terminal outcome.
pub terminal_outcome: Option<TurnTerminalOutcome>,
pub terminal_cause_kind: Option<TurnTerminalCauseKind>,
pub extraction_attempts: u64,
pub max_extraction_retries: u64,
pub llm_retry_attempt: u32,
pub llm_retry_max_retries: u32,
/// Selected retry delay, in milliseconds (per the `_ms` suffix).
pub llm_retry_selected_delay_ms: u64,
}
/// Thread-safe handle through which callers drive turn-execution state
/// transitions.
///
/// Every transition method reports a rejected transition as
/// `Err(DslTransitionError)`; [`TurnStateHandle::snapshot`] reads the current
/// state as a [`TurnStateSnapshot`].
///
/// NOTE(review): the legal ordering of these transitions is defined by the
/// implementation, not by this trait — consult it before adding call sites.
pub trait TurnStateHandle: Send + Sync {
/// Starts a conversation run with the admitted content shape, capability
/// flags and extraction retry limit.
fn start_conversation_run(
&self,
run_id: RunId,
primitive_kind: TurnPrimitiveKind,
admitted_content_shape: ContentShape,
vision_enabled: bool,
image_tool_results_enabled: bool,
max_extraction_retries: u64,
) -> Result<(), DslTransitionError>;
fn start_immediate_append(&self, run_id: RunId) -> Result<(), DslTransitionError>;
fn start_immediate_context(&self, run_id: RunId) -> Result<(), DslTransitionError>;
fn primitive_applied(&self) -> Result<(), DslTransitionError>;
fn llm_returned_tool_calls(&self, tool_count: u64) -> Result<(), DslTransitionError>;
fn llm_returned_terminal(&self) -> Result<(), DslTransitionError>;
/// Registers pending async operations and the operation ids that form a
/// barrier.
fn register_pending_ops(
&self,
op_refs: BTreeSet<AsyncOpRef>,
barrier_operation_ids: BTreeSet<OperationId>,
) -> Result<(), DslTransitionError>;
fn tool_calls_resolved(&self) -> Result<(), DslTransitionError>;
fn ops_barrier_satisfied(
&self,
operation_ids: BTreeSet<OperationId>,
) -> Result<(), DslTransitionError>;
fn boundary_continue(&self) -> Result<(), DslTransitionError>;
fn boundary_complete(&self) -> Result<(), DslTransitionError>;
// NOTE(review): `max_retries` is `u32` here while
// `start_conversation_run::max_extraction_retries` is `u64`.
fn enter_extraction(&self, max_retries: u32) -> Result<(), DslTransitionError>;
fn extraction_start(&self) -> Result<(), DslTransitionError>;
fn extraction_validation_passed(&self) -> Result<(), DslTransitionError>;
fn extraction_validation_failed(&self, error: String) -> Result<(), DslTransitionError>;
fn extraction_failed(&self, error: String) -> Result<(), DslTransitionError>;
fn recoverable_failure(&self, retry: LlmRetrySchedule) -> Result<(), DslTransitionError>;
fn fatal_failure(&self, reason: TurnFailureReason) -> Result<(), DslTransitionError>;
fn retry_requested(&self, retry_attempt: u32) -> Result<(), DslTransitionError>;
fn cancel_now(&self) -> Result<(), DslTransitionError>;
fn request_cancel_after_boundary(&self) -> Result<(), DslTransitionError>;
fn cancellation_observed(&self) -> Result<(), DslTransitionError>;
fn acknowledge_terminal(&self, outcome: TurnTerminalOutcome) -> Result<(), DslTransitionError>;
fn turn_limit_reached(&self) -> Result<(), DslTransitionError>;
fn budget_exhausted(&self) -> Result<(), DslTransitionError>;
fn time_budget_exceeded(&self) -> Result<(), DslTransitionError>;
fn force_cancel_no_run(&self) -> Result<(), DslTransitionError>;
fn run_completed(&self, run_id: RunId) -> Result<(), DslTransitionError>;
fn run_failed(
&self,
run_id: RunId,
reason: TurnFailureReason,
) -> Result<(), DslTransitionError>;
fn run_cancelled(&self, run_id: RunId) -> Result<(), DslTransitionError>;
/// Returns the current state; the only method here that cannot fail.
fn snapshot(&self) -> TurnStateSnapshot;
}
/// Thread-safe handle for the comms drain lifecycle.
///
/// Every method reports a rejected transition as `Err(DslTransitionError)`.
pub trait CommsDrainHandle: Send + Sync {
fn ensure_drain_running(&self) -> Result<(), DslTransitionError>;
/// Spawns a drain in the given [`DrainMode`].
fn spawn_drain(&self, mode: DrainMode) -> Result<(), DslTransitionError>;
fn stop_drain(&self) -> Result<(), DslTransitionError>;
fn drain_exited_clean(&self) -> Result<(), DslTransitionError>;
fn drain_exited_respawnable(&self) -> Result<(), DslTransitionError>;
/// Reports that the drain exited for the given [`DrainExitReason`].
fn notify_drain_exited(&self, reason: DrainExitReason) -> Result<(), DslTransitionError>;
}
/// Per-surface state as returned by
/// [`ExternalToolSurfaceHandle::surface_snapshot`] and listed in
/// [`SurfaceDiagnosticSnapshot::entries`].
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct SurfaceSnapshot {
pub surface_id: String,
pub base_state: Option<ExternalToolSurfaceBaseState>,
pub pending_op: ExternalToolSurfacePendingOp,
pub staged_op: ExternalToolSurfaceStagedOp,
pub staged_intent_sequence: Option<u64>,
pub pending_task_sequence: Option<u64>,
pub pending_lineage_sequence: Option<u64>,
pub inflight_calls: u64,
pub last_delta_operation: Option<ExternalToolSurfaceDeltaOperation>,
pub last_delta_phase: Option<ExternalToolSurfaceDeltaPhase>,
// The `_ms` fields are millisecond timestamps per their suffix.
// NOTE(review): the clock/epoch they are measured against is not visible here.
pub removal_draining_since_ms: Option<u64>,
pub removal_timeout_at_ms: Option<u64>,
pub removal_applied_at_turn: Option<u64>,
}
/// Whole-registry diagnostic view returned by
/// [`ExternalToolSurfaceHandle::diagnostic_snapshot`].
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct SurfaceDiagnosticSnapshot {
pub surface_phase: ExternalToolSurfaceGlobalPhase,
pub known_surfaces: BTreeSet<String>,
pub visible_surfaces: BTreeSet<String>,
pub snapshot_epoch: u64,
pub snapshot_aligned_epoch: u64,
pub has_pending_or_staged: bool,
/// One [`SurfaceSnapshot`] per reported surface.
pub entries: Vec<SurfaceSnapshot>,
}
/// Input accepted by [`ExternalToolSurfaceHandle::apply_surface_input`].
///
/// Each variant carries the same arguments as the like-named convenience
/// method on [`ExternalToolSurfaceHandle`] (for example `StageAdd` and
/// `stage_add`).
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum ExternalToolSurfaceInput {
StageAdd {
surface_id: String,
now_ms: u64,
},
StageRemove {
surface_id: String,
now_ms: u64,
},
StageReload {
surface_id: String,
now_ms: u64,
},
ApplyBoundary {
surface_id: String,
now_ms: u64,
staged_intent_sequence: u64,
applied_at_turn: u64,
},
MarkPendingSucceeded {
surface_id: String,
pending_task_sequence: u64,
staged_intent_sequence: u64,
},
MarkPendingFailed {
surface_id: String,
pending_task_sequence: u64,
staged_intent_sequence: u64,
/// Why the pending operation failed.
cause: ExternalToolSurfaceFailureCause,
},
CallStarted {
surface_id: String,
},
CallFinished {
surface_id: String,
},
FinalizeRemovalClean {
surface_id: String,
},
FinalizeRemovalForced {
surface_id: String,
},
SnapshotAligned {
epoch: u64,
},
Shutdown,
}
/// Effect listed in [`ExternalToolSurfaceTransition::effects`].
///
/// NOTE(review): the variant names read as instructions for the caller to
/// carry out; who executes them is not visible in this file.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum ExternalToolSurfaceEffect {
ScheduleSurfaceCompletion {
surface_id: String,
operation: ExternalToolSurfaceDeltaOperation,
pending_task_sequence: u64,
staged_intent_sequence: u64,
applied_at_turn: u64,
},
RefreshVisibleSurfaceSet {
snapshot_epoch: u64,
},
EmitExternalToolDelta {
surface_id: String,
operation: ExternalToolSurfaceDeltaOperation,
phase: ExternalToolSurfaceDeltaPhase,
/// Failure cause, when the delta reports one.
cause: Option<ExternalToolSurfaceFailureCause>,
},
CloseSurfaceConnection {
surface_id: String,
},
RejectSurfaceCall {
surface_id: String,
cause: ExternalToolSurfaceFailureCause,
},
}
/// Successful result of [`ExternalToolSurfaceHandle::apply_surface_input`]:
/// a global phase plus the effects produced by the input.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ExternalToolSurfaceTransition {
pub phase: ExternalToolSurfaceGlobalPhase,
pub effects: Vec<ExternalToolSurfaceEffect>,
}
/// Thread-safe handle for external tool surface lifecycle transitions and
/// read-only queries.
///
/// Mutating methods report a rejected transition as
/// `Err(DslTransitionError)`. Apart from `register`, the named mutators take
/// the same arguments as the like-named [`ExternalToolSurfaceInput`] variants.
///
/// NOTE(review): whether the named mutators are implemented on top of
/// `apply_surface_input` is up to the implementation — this trait does not
/// say.
pub trait ExternalToolSurfaceHandle: Send + Sync {
/// Applies a single input and returns the resulting phase and effects.
fn apply_surface_input(
&self,
input: ExternalToolSurfaceInput,
) -> Result<ExternalToolSurfaceTransition, DslTransitionError>;
fn register(&self, surface_id: String) -> Result<(), DslTransitionError>;
fn stage_add(&self, surface_id: String, now_ms: u64) -> Result<(), DslTransitionError>;
fn stage_remove(&self, surface_id: String, now_ms: u64) -> Result<(), DslTransitionError>;
fn stage_reload(&self, surface_id: String, now_ms: u64) -> Result<(), DslTransitionError>;
fn apply_boundary(
&self,
surface_id: String,
now_ms: u64,
staged_intent_sequence: u64,
applied_at_turn: u64,
) -> Result<(), DslTransitionError>;
fn mark_pending_succeeded(
&self,
surface_id: String,
pending_task_sequence: u64,
staged_intent_sequence: u64,
) -> Result<(), DslTransitionError>;
fn mark_pending_failed(
&self,
surface_id: String,
pending_task_sequence: u64,
staged_intent_sequence: u64,
cause: ExternalToolSurfaceFailureCause,
) -> Result<(), DslTransitionError>;
fn call_started(&self, surface_id: String) -> Result<(), DslTransitionError>;
fn call_finished(&self, surface_id: String) -> Result<(), DslTransitionError>;
fn finalize_removal_clean(&self, surface_id: String) -> Result<(), DslTransitionError>;
fn finalize_removal_forced(&self, surface_id: String) -> Result<(), DslTransitionError>;
fn snapshot_aligned(&self, epoch: u64) -> Result<(), DslTransitionError>;
fn shutdown_surface(&self) -> Result<(), DslTransitionError>;
// ----- Read-only queries -----
/// Returns the snapshot for one surface, or `None` when there is none for
/// `surface_id`.
fn surface_snapshot(&self, surface_id: &str) -> Option<SurfaceSnapshot>;
fn diagnostic_snapshot(&self) -> SurfaceDiagnosticSnapshot;
fn visible_surfaces(&self) -> BTreeSet<String>;
fn removing_surfaces(&self) -> BTreeSet<String>;
fn pending_surfaces(&self) -> BTreeSet<String>;
fn has_pending_or_staged(&self) -> bool;
fn snapshot_epoch(&self) -> u64;
fn snapshot_aligned_epoch(&self) -> u64;
}
/// Thread-safe handle that classifies incoming peer traffic into a
/// [`PeerIngressAdmission`].
pub trait PeerCommsHandle: Send + Sync {
/// Classifies an external envelope described by `facts`.
fn classify_external_envelope(
&self,
facts: PeerIngressEnvelopeFacts,
) -> Result<PeerIngressAdmission, DslTransitionError>;
/// Classifies a plain event described by `facts`.
fn classify_plain_event(
&self,
facts: PeerIngressPlainEventFacts,
) -> Result<PeerIngressAdmission, DslTransitionError>;
/// Updates the ingress context with the `keep_alive` flag.
/// NOTE(review): how `keep_alive` affects classification is not visible here.
fn set_peer_ingress_context(&self, keep_alive: bool) -> Result<(), DslTransitionError>;
}
/// Thread-safe handle for session input admission transitions.
///
/// Every method reports a rejected transition as `Err(DslTransitionError)`.
///
/// NOTE(review): the method names suggest an ingest → accept → prepare →
/// commit flow, but the required ordering is not visible in this file.
pub trait SessionAdmissionHandle: Send + Sync {
fn ingest(
&self,
runtime_id: &str,
work_id: &str,
origin: InputSource,
) -> Result<(), DslTransitionError>;
/// Accepts `input_id` with the given processing, interrupt and wake flags.
fn accept_with_completion(
&self,
input_id: &InputId,
request_immediate_processing: bool,
interrupt_yielding: bool,
wake_if_idle: bool,
) -> Result<(), DslTransitionError>;
fn accept_without_wake(&self, input_id: &InputId) -> Result<(), DslTransitionError>;
fn prepare(&self, run_id: &RunId) -> Result<(), DslTransitionError>;
fn commit(&self, input_id: &InputId, run_id: &RunId) -> Result<(), DslTransitionError>;
fn recycle(&self) -> Result<(), DslTransitionError>;
}
/// Key identifying an auth lease: realm, binding and an optional profile.
///
/// Displays as `realm:binding` or, when a profile is present,
/// `realm:binding:profile`.
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub struct LeaseKey {
    pub realm: crate::connection::RealmId,
    pub binding: crate::connection::BindingId,
    pub profile: Option<crate::connection::ProfileId>,
}

impl LeaseKey {
    /// Builds a key from its three parts.
    pub fn new(
        realm: crate::connection::RealmId,
        binding: crate::connection::BindingId,
        profile: Option<crate::connection::ProfileId>,
    ) -> Self {
        LeaseKey {
            realm,
            binding,
            profile,
        }
    }

    /// Builds a key by cloning the realm, binding and profile of an auth
    /// binding reference.
    pub fn from_auth_binding(auth_binding: &crate::connection::AuthBindingRef) -> Self {
        let realm = auth_binding.realm.clone();
        let binding = auth_binding.binding.clone();
        let profile = auth_binding.profile.clone();
        LeaseKey {
            realm,
            binding,
            profile,
        }
    }
}

impl std::fmt::Display for LeaseKey {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        if let Some(profile) = self.profile.as_ref() {
            write!(f, "{}:{}:{}", self.realm, self.binding, profile)
        } else {
            write!(f, "{}:{}", self.realm, self.binding)
        }
    }
}
/// Point-in-time view of one auth lease, as returned by
/// [`AuthLeaseHandle::snapshot`].
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct AuthLeaseSnapshot {
/// Current lifecycle phase, if one has been recorded.
pub phase: Option<AuthLeasePhase>,
// NOTE(review): the unit/epoch of `expires_at` is not visible in this file.
pub expires_at: Option<u64>,
/// `restore_auth_lifecycle_snapshot` restores nothing when this is `false`.
pub credential_present: bool,
pub generation: u64,
/// Publication time in milliseconds (per the `_millis` suffix), if any.
pub credential_published_at_millis: Option<u64>,
}
/// Result of a successful lease acquisition or refresh
/// (see [`AuthLeaseHandle::acquire_lease`] and
/// [`AuthLeaseHandle::complete_refresh`]).
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct AuthLeaseTransition {
    pub generation: u64,
    /// Publication time in milliseconds (per the `_millis` suffix), if any.
    pub credential_published_at_millis: Option<u64>,
}

impl AuthLeaseTransition {
    /// Builds a transition from its two fields.
    pub fn new(generation: u64, credential_published_at_millis: Option<u64>) -> Self {
        AuthLeaseTransition {
            credential_published_at_millis,
            generation,
        }
    }
}
/// Refresh window for auth lease TTLs, in seconds (per the `_SECS` suffix).
///
/// NOTE(review): presumably leases within this many seconds of expiry are
/// treated as due for refresh; the consumer is not visible in this file —
/// confirm.
pub const AUTH_LEASE_TTL_REFRESH_WINDOW_SECS: u64 = 60;
/// Thread-safe handle for auth lease lifecycle transitions, keyed by
/// [`LeaseKey`].
///
/// Mutating methods report a rejected transition as
/// `Err(DslTransitionError)`.
pub trait AuthLeaseHandle: Send + Sync + std::any::Any {
    /// Acquires the lease for `lease_key` with the given expiry.
    fn acquire_lease(
        &self,
        lease_key: &LeaseKey,
        expires_at: u64,
    ) -> Result<AuthLeaseTransition, DslTransitionError>;

    fn mark_expiring(&self, lease_key: &LeaseKey) -> Result<(), DslTransitionError>;

    fn begin_refresh(&self, lease_key: &LeaseKey) -> Result<(), DslTransitionError>;

    /// Completes a refresh with the new expiry, observed at `now`.
    fn complete_refresh(
        &self,
        lease_key: &LeaseKey,
        new_expires_at: u64,
        now: u64,
    ) -> Result<AuthLeaseTransition, DslTransitionError>;

    /// Reports a failed refresh; `permanent` distinguishes permanent from
    /// non-permanent failures.
    fn refresh_failed(
        &self,
        lease_key: &LeaseKey,
        permanent: bool,
    ) -> Result<(), DslTransitionError>;

    fn mark_reauth_required(&self, lease_key: &LeaseKey) -> Result<(), DslTransitionError>;

    fn release_lease(&self, lease_key: &LeaseKey) -> Result<(), DslTransitionError>;

    /// Default: identical to [`AuthLeaseHandle::release_lease`].
    fn release_credential_lifecycle(&self, lease_key: &LeaseKey) -> Result<(), DslTransitionError> {
        AuthLeaseHandle::release_lease(self, lease_key)
    }

    /// Replays a previously captured snapshot onto this handle.
    ///
    /// Nothing happens (and `Ok(())` is returned) when the snapshot has no
    /// credential, has no phase, is in the `Released` phase, or when the
    /// `expires_at` argument is `None`. Otherwise the lease is acquired with
    /// `expires_at` and then moved to the snapshot's phase.
    ///
    /// Note that the expiry comes from the `expires_at` argument, not from
    /// `snapshot.expires_at`.
    ///
    /// # Errors
    /// Propagates the first error from `acquire_lease` or the phase-specific
    /// transition.
    fn restore_auth_lifecycle_snapshot(
        &self,
        lease_key: &LeaseKey,
        snapshot: &AuthLeaseSnapshot,
        expires_at: Option<u64>,
    ) -> Result<(), DslTransitionError> {
        // All preconditions are side-effect free, so they are checked together.
        let (phase, expires_at) = match (snapshot.credential_present, snapshot.phase, expires_at) {
            (true, Some(phase), Some(expires_at)) if phase != AuthLeasePhase::Released => {
                (phase, expires_at)
            }
            _ => return Ok(()),
        };

        // Acquire first; the phase-specific transition runs on the new lease.
        self.acquire_lease(lease_key, expires_at)?;

        match phase {
            // `Released` is excluded above and listed only for exhaustiveness.
            AuthLeasePhase::Valid | AuthLeasePhase::Released => Ok(()),
            AuthLeasePhase::Expiring => self.mark_expiring(lease_key),
            AuthLeasePhase::Refreshing => self.begin_refresh(lease_key),
            AuthLeasePhase::ReauthRequired => self.mark_reauth_required(lease_key),
        }
    }

    /// Returns the current snapshot for `lease_key`.
    fn snapshot(&self, lease_key: &LeaseKey) -> AuthLeaseSnapshot;
}
/// Thread-safe handle for MCP server connection lifecycle transitions, keyed
/// by server id.
///
/// Every `apply_*` method reports a rejected transition as
/// `Err(DslTransitionError)`.
pub trait McpServerLifecycleHandle: Send + Sync {
fn apply_connect_pending(&self, server_id: &str) -> Result<(), DslTransitionError>;
fn apply_connected(&self, server_id: &str) -> Result<(), DslTransitionError>;
/// Records a failure for `server_id` together with an error description.
fn apply_failed(&self, server_id: &str, error: &str) -> Result<(), DslTransitionError>;
fn apply_disconnected(&self, server_id: &str) -> Result<(), DslTransitionError>;
fn apply_reload(&self, server_id: &str) -> Result<(), DslTransitionError>;
/// Returns the ids of servers that are currently pending.
fn pending_server_ids(&self) -> BTreeSet<String>;
}
/// How a peer request ended, as passed to
/// [`PeerInteractionHandle::response_terminal`].
///
/// Marked `#[non_exhaustive]`, so matches outside this crate need a wildcard
/// arm.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
#[non_exhaustive]
pub enum PeerTerminalDisposition {
Completed,
Failed,
}
/// Thread-safe handle tracking correlated peer requests, keyed by
/// [`PeerCorrelationId`].
///
/// Transition methods report a rejected transition as
/// `Err(DslTransitionError)`.
///
/// NOTE(review): the method names suggest `request_sent`, `response_*` and
/// `request_timed_out` cover outbound requests while `request_received` and
/// `response_replied` cover inbound ones — confirm against the implementation.
pub trait PeerInteractionHandle: Send + Sync {
/// Records that a request with `corr_id` was sent to `to`.
fn request_sent(
&self,
corr_id: PeerCorrelationId,
to: String,
) -> Result<(), DslTransitionError>;
fn response_progress(&self, corr_id: PeerCorrelationId) -> Result<(), DslTransitionError>;
fn response_terminal(
&self,
corr_id: PeerCorrelationId,
disposition: PeerTerminalDisposition,
) -> Result<(), DslTransitionError>;
fn request_timed_out(&self, corr_id: PeerCorrelationId) -> Result<(), DslTransitionError>;
fn request_received(&self, corr_id: PeerCorrelationId) -> Result<(), DslTransitionError>;
fn response_replied(&self, corr_id: PeerCorrelationId) -> Result<(), DslTransitionError>;
/// Outbound state for `corr_id`, or `None` when there is none.
fn outbound_state(&self, corr_id: PeerCorrelationId) -> Option<OutboundPeerRequestState>;
/// Inbound state for `corr_id`, or `None` when there is none.
fn inbound_state(&self, corr_id: PeerCorrelationId) -> Option<InboundPeerRequestState>;
/// Installs an observer for peer interaction cleanup.
fn install_cleanup_observer(&self, observer: Arc<dyn PeerInteractionCleanupObserver>);
}
/// Observer installed via [`PeerInteractionHandle::install_cleanup_observer`].
pub trait PeerInteractionCleanupObserver: Send + Sync {
/// Called with the correlation id of the interaction being cleaned up.
/// NOTE(review): when exactly cleanup fires is decided by the handle
/// implementation.
fn on_peer_interaction_cleanup(&self, corr_id: PeerCorrelationId);
}
/// Thread-safe handle for the session context watermark and its observers.
pub trait SessionContextHandle: Send + Sync {
    /// Reports that the session context advanced at `updated_at_ms`.
    ///
    /// NOTE(review): the meaning of the returned `bool` is defined by the
    /// implementation (presumably whether the watermark moved) — confirm.
    fn context_advanced(&self, updated_at_ms: u64) -> Result<bool, DslTransitionError>;

    /// Returns the current watermark in milliseconds.
    fn current_watermark_ms(&self) -> u64;

    /// Installs an observer for context advances.
    fn install_observer(&self, observer: Arc<dyn SessionContextAdvancedObserver>);

    /// Installs `observer` and returns the watermark read afterwards.
    ///
    /// The default performs the two steps as separate calls: install first,
    /// then read the watermark.
    fn install_observer_with_baseline(
        &self,
        observer: Arc<dyn SessionContextAdvancedObserver>,
    ) -> u64 {
        self.install_observer(observer);
        let baseline_ms = self.current_watermark_ms();
        baseline_ms
    }
}
/// Observer installed via [`SessionContextHandle::install_observer`].
pub trait SessionContextAdvancedObserver: Send + Sync {
/// Called with the millisecond timestamp of the context advance.
fn on_session_context_advanced(&self, updated_at_ms: u64);
}
/// Failure returned by [`SessionClaimHandle::try_acquire`].
#[derive(Debug, Clone, PartialEq, Eq, thiserror::Error)]
pub enum SessionClaimError {
/// The session id is already held by another claim; carries that id.
#[error("session identity already claimed: {0}")]
SessionIdentityInUse(SessionId),
}
/// Guard representing a held claim on a session id.
///
/// Dropping the guard calls [`SessionClaimHandle::release`] on the handle it
/// was created with.
pub struct SessionClaim {
    session_id: SessionId,
    handle: Arc<dyn SessionClaimHandle>,
}

impl SessionClaim {
    /// Creates a guard for `session_id` that will release through `handle`.
    ///
    /// This does not itself register the claim; it only ties the release to
    /// the guard's lifetime.
    pub fn new(session_id: SessionId, handle: Arc<dyn SessionClaimHandle>) -> Self {
        SessionClaim { handle, session_id }
    }

    /// The claimed session id.
    pub fn session_id(&self) -> &SessionId {
        let SessionClaim { session_id, .. } = self;
        session_id
    }
}

impl Drop for SessionClaim {
    /// Releases the claim through the stored handle.
    fn drop(&mut self) {
        let released_id = &self.session_id;
        self.handle.release(released_id);
    }
}

impl std::fmt::Debug for SessionClaim {
    /// Shows only `session_id`; the handle is elided (`..`).
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let mut builder = f.debug_struct("SessionClaim");
        builder.field("session_id", &self.session_id);
        builder.finish_non_exhaustive()
    }
}
/// Thread-safe registry interface for exclusive session claims.
pub trait SessionClaimHandle: Send + Sync {
/// Tries to claim `session_id`, returning a [`SessionClaim`] guard on
/// success.
///
/// Takes `Arc<Self>` so the returned guard can hold the handle and release
/// through it on drop.
///
/// # Errors
/// Returns a [`SessionClaimError`] when the claim cannot be granted.
fn try_acquire(
self: Arc<Self>,
session_id: &SessionId,
) -> Result<SessionClaim, SessionClaimError>;
/// Releases the claim on `session_id`; called by `SessionClaim`'s `Drop`.
fn release(&self, session_id: &SessionId);
}
/// In-memory [`SessionClaimHandle`] backed by a mutex-protected set of
/// claimed session ids.
pub struct DefaultSessionClaimRegistry {
    claims: std::sync::Mutex<std::collections::HashSet<SessionId>>,
}

impl DefaultSessionClaimRegistry {
    /// Creates an empty registry.
    pub fn new() -> Self {
        let claims = std::sync::Mutex::new(std::collections::HashSet::new());
        DefaultSessionClaimRegistry { claims }
    }

    /// Returns the process-wide registry, creating it on first use.
    ///
    /// Every call returns a clone of the same `Arc`.
    pub fn global() -> Arc<Self> {
        static GLOBAL: std::sync::OnceLock<Arc<DefaultSessionClaimRegistry>> =
            std::sync::OnceLock::new();
        let shared = GLOBAL.get_or_init(|| Arc::new(Self::new()));
        Arc::clone(shared)
    }
}

impl Default for DefaultSessionClaimRegistry {
    /// Same as [`DefaultSessionClaimRegistry::new`].
    fn default() -> Self {
        DefaultSessionClaimRegistry::new()
    }
}

impl SessionClaimHandle for DefaultSessionClaimRegistry {
    /// Claims `session_id` if it is not already in the set.
    ///
    /// A poisoned mutex is recovered rather than propagated, so a panic in
    /// another thread does not block claims.
    ///
    /// # Errors
    /// [`SessionClaimError::SessionIdentityInUse`] when the id is already
    /// claimed.
    fn try_acquire(
        self: Arc<Self>,
        session_id: &SessionId,
    ) -> Result<SessionClaim, SessionClaimError> {
        // Scope the guard so the lock is gone before `self` is moved into
        // the returned claim.
        let newly_claimed = {
            let mut held = match self.claims.lock() {
                Ok(guard) => guard,
                Err(poisoned) => poisoned.into_inner(),
            };
            held.insert(session_id.clone())
        };

        if newly_claimed {
            let handle: Arc<dyn SessionClaimHandle> = self;
            Ok(SessionClaim::new(session_id.clone(), handle))
        } else {
            Err(SessionClaimError::SessionIdentityInUse(session_id.clone()))
        }
    }

    /// Removes `session_id` from the set; releasing an unclaimed id is a
    /// no-op. A poisoned mutex is recovered rather than propagated.
    fn release(&self, session_id: &SessionId) {
        let mut held = match self.claims.lock() {
            Ok(guard) => guard,
            Err(poisoned) => poisoned.into_inner(),
        };
        held.remove(session_id);
    }
}
/// Thread-safe handle tracking interaction stream state, keyed by
/// [`PeerCorrelationId`].
///
/// Transition methods report a rejected transition as
/// `Err(DslTransitionError)`.
pub trait InteractionStreamHandle: Send + Sync {
fn reserved(&self, corr_id: PeerCorrelationId) -> Result<(), DslTransitionError>;
fn attached(&self, corr_id: PeerCorrelationId) -> Result<(), DslTransitionError>;
fn completed(&self, corr_id: PeerCorrelationId) -> Result<(), DslTransitionError>;
fn expired(&self, corr_id: PeerCorrelationId) -> Result<(), DslTransitionError>;
fn closed_early(&self, corr_id: PeerCorrelationId) -> Result<(), DslTransitionError>;
/// Current stream state for `corr_id`, or `None` when there is none.
fn state(&self, corr_id: PeerCorrelationId) -> Option<InteractionStreamState>;
/// Installs an observer for interaction stream cleanup.
fn install_cleanup_observer(&self, observer: Arc<dyn InteractionStreamCleanupObserver>);
}
/// Observer installed via [`InteractionStreamHandle::install_cleanup_observer`].
pub trait InteractionStreamCleanupObserver: Send + Sync {
/// Called with the correlation id of the stream being cleaned up.
/// NOTE(review): when exactly cleanup fires is decided by the handle
/// implementation.
fn on_interaction_stream_cleanup(&self, corr_id: PeerCorrelationId);
}
/// Freshness of the realtime projection, as reported by
/// [`RealtimeProductTurnHandle::projection_freshness`].
///
/// Defaults to `Clean`. The derived ordering follows declaration order:
/// `Clean < StaleDeferred < StaleImmediate`.
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash, Default)]
pub enum RealtimeProjectionFreshness {
#[default]
Clean,
StaleDeferred,
StaleImmediate,
}
/// Policy returned by
/// [`RealtimeProductTurnHandle::reconnect_policy_on_clean_close`].
///
/// Defaults to `CleanExit`.
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash, Default)]
pub enum RealtimeReconnectPolicy {
#[default]
CleanExit,
ReattachAndRecover,
}
/// Observer installed via
/// [`RealtimeProductTurnHandle::install_projection_freshness_observer`].
pub trait RealtimeProjectionFreshnessObserver: Send + Sync {
/// Called with the new freshness and the projection frontier in
/// milliseconds.
fn on_realtime_projection_freshness_changed(
&self,
new_freshness: RealtimeProjectionFreshness,
frontier_ms: u64,
);
}
/// Phase of a realtime product turn, as reported by
/// [`RealtimeProductTurnHandle::current_phase`].
///
/// Defaults to `Idle`. The handle's default helpers treat every phase other
/// than `Idle` as "in flight" and only `Preemptible` as preemptible on input.
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash, Default)]
pub enum RealtimeProductTurnPhase {
#[default]
Idle,
AwaitingProgress,
Committed,
OutputStarted,
Preemptible,
}
/// Thread-safe handle for realtime product turn phase and projection
/// freshness.
///
/// NOTE(review): the `bool` inside each `Result<bool, _>` is defined by the
/// implementation (presumably whether the call changed state) — confirm
/// before relying on it.
pub trait RealtimeProductTurnHandle: Send + Sync {
    fn turn_in_flight(&self) -> Result<bool, DslTransitionError>;

    fn turn_committed(&self) -> Result<bool, DslTransitionError>;

    fn output_started(&self) -> Result<bool, DslTransitionError>;

    fn turn_interrupted(&self) -> Result<bool, DslTransitionError>;

    fn turn_terminal(&self) -> Result<bool, DslTransitionError>;

    /// Returns the current turn phase.
    fn current_phase(&self) -> RealtimeProductTurnPhase;

    /// `true` for every phase except [`RealtimeProductTurnPhase::Idle`].
    fn is_in_flight(&self) -> bool {
        !matches!(self.current_phase(), RealtimeProductTurnPhase::Idle)
    }

    /// `true` only in [`RealtimeProductTurnPhase::Preemptible`].
    fn should_preempt_on_input(&self) -> bool {
        matches!(self.current_phase(), RealtimeProductTurnPhase::Preemptible)
    }

    fn projection_advance_observed(&self, advanced_at_ms: u64) -> Result<bool, DslTransitionError>;

    fn projection_refreshed(&self, observed_ms: u64) -> Result<bool, DslTransitionError>;

    fn projection_reset(&self, baseline_ms: u64) -> Result<bool, DslTransitionError>;

    /// Returns the current projection freshness.
    fn projection_freshness(&self) -> RealtimeProjectionFreshness;

    /// Returns the projection frontier in milliseconds.
    fn projection_frontier_ms(&self) -> u64;

    /// `true` only when freshness is
    /// [`RealtimeProjectionFreshness::StaleImmediate`].
    fn is_projection_stale_immediate(&self) -> bool {
        matches!(
            self.projection_freshness(),
            RealtimeProjectionFreshness::StaleImmediate
        )
    }

    /// Installs an observer for projection freshness changes.
    fn install_projection_freshness_observer(
        &self,
        observer: Arc<dyn RealtimeProjectionFreshnessObserver>,
    );

    /// Installs `observer`, then returns the freshness and frontier read
    /// afterwards.
    ///
    /// The default performs these as three separate calls in that order.
    fn install_projection_freshness_observer_with_snapshot(
        &self,
        observer: Arc<dyn RealtimeProjectionFreshnessObserver>,
    ) -> (RealtimeProjectionFreshness, u64) {
        self.install_projection_freshness_observer(observer);
        let freshness = self.projection_freshness();
        let frontier_ms = self.projection_frontier_ms();
        (freshness, frontier_ms)
    }

    fn classify_client_input_submitted(&self) -> Result<bool, DslTransitionError>;

    fn classify_mid_turn_activity(&self) -> Result<bool, DslTransitionError>;

    fn classify_turn_terminated(&self) -> Result<bool, DslTransitionError>;

    /// Returns the reconnect policy to apply on a clean close.
    fn reconnect_policy_on_clean_close(&self) -> RealtimeReconnectPolicy;
}
#[cfg(test)]
#[allow(clippy::expect_used, clippy::panic, clippy::unwrap_used)]
mod tests {
    //! Unit tests for the projection and surface value types in this file.

    use super::{
        ExternalToolSurfaceEffect, ExternalToolSurfaceFailureCause, ExternalToolSurfaceInput,
        PeerConversationProjection, PeerResponseProgressProjectionPhase,
        PeerResponseTerminalCorrelationId, PeerResponseTerminalDisplayIdentity,
        PeerResponseTerminalFact, PeerResponseTerminalProjectionStatus,
        PeerResponseTerminalRenderPayload, PeerResponseTerminalRouteIdentity,
        PeerResponseTerminalSource, PeerResponseTerminalTransportIdentity,
        peer_response_terminal_context_key,
    };
    use crate::tool_scope::{ExternalToolSurfaceDeltaOperation, ExternalToolSurfaceDeltaPhase};

    /// The failure cause keeps its typed identity through an input and an
    /// emitted delta effect, and projects to the external code
    /// `pending_failed`.
    #[test]
    fn external_tool_surface_pending_failure_cause_projects_external_code() {
        let input = ExternalToolSurfaceInput::MarkPendingFailed {
            surface_id: "alpha".to_owned(),
            pending_task_sequence: 7,
            staged_intent_sequence: 11,
            cause: ExternalToolSurfaceFailureCause::PendingFailed,
        };
        let ExternalToolSurfaceInput::MarkPendingFailed { cause, .. } = input else {
            panic!("constructed MarkPendingFailed input");
        };
        assert_eq!(cause, ExternalToolSurfaceFailureCause::PendingFailed);
        assert_eq!(cause.as_str(), "pending_failed");
        assert_eq!(
            serde_json::to_value(cause).expect("serialize failure cause"),
            serde_json::json!("pending_failed")
        );
        let effect = ExternalToolSurfaceEffect::EmitExternalToolDelta {
            surface_id: "alpha".to_owned(),
            operation: ExternalToolSurfaceDeltaOperation::Add,
            phase: ExternalToolSurfaceDeltaPhase::Failed,
            cause: Some(cause),
        };
        assert!(matches!(
            effect,
            ExternalToolSurfaceEffect::EmitExternalToolDelta {
                cause: Some(ExternalToolSurfaceFailureCause::PendingFailed),
                ..
            }
        ));
    }

    /// A terminal projection renders its own prompt text and context key.
    #[test]
    fn peer_terminal_projection_owns_prompt_and_context_key() {
        let route_identity =
            PeerResponseTerminalRouteIdentity::parse("analyst-rt").expect("route identity");
        let correlation_id =
            PeerResponseTerminalCorrelationId::parse("018f6f79-7a82-7c4e-a552-a3b86f9630f1")
                .expect("correlation id");
        let projection = PeerConversationProjection::ResponseTerminal {
            fact: PeerResponseTerminalFact::new(
                PeerResponseTerminalSource::new(
                    Some(
                        PeerResponseTerminalTransportIdentity::parse("transport-runtime-1")
                            .expect("transport identity"),
                    ),
                    route_identity,
                    PeerResponseTerminalDisplayIdentity::parse("Analyst")
                        .expect("display identity"),
                ),
                correlation_id,
                PeerResponseTerminalProjectionStatus::Completed,
                PeerResponseTerminalRenderPayload::new(Some(serde_json::json!({
                    "request_intent": "checksum_token",
                    "request_subject": "alpha beta gamma",
                    "token": "birch seventeen"
                }))),
            ),
        };
        assert_eq!(
            projection.context_key().as_deref(),
            Some("peer_response_terminal:analyst-rt:018f6f79-7a82-7c4e-a552-a3b86f9630f1")
        );
        // `serde_json::to_string_pretty` indents object members by two spaces.
        assert_eq!(
            projection.prompt_text(),
            "[SYSTEM NOTICE][PEER_RESPONSE_TERMINAL] Correlated peer response from Analyst. Request ID: 018f6f79-7a82-7c4e-a552-a3b86f9630f1. Status: completed. Result: {\n  \"request_intent\": \"checksum_token\",\n  \"request_subject\": \"alpha beta gamma\",\n  \"token\": \"birch seventeen\"\n}."
        );
    }

    /// A progress projection has no context key and renders its phase label.
    #[test]
    fn peer_progress_projection_formats_phase_from_shared_seam() {
        let projection = PeerConversationProjection::ResponseProgress {
            peer_id: "operator-rt".into(),
            request_id: "req-789".into(),
            phase: PeerResponseProgressProjectionPhase::PartialResult,
            payload: Some(serde_json::json!({ "chunk": "alpha" })),
        };
        assert_eq!(projection.context_key(), None);
        // `serde_json::to_string_pretty` indents object members by two spaces.
        assert_eq!(
            projection.prompt_text(),
            "[SYSTEM NOTICE][PEER_RESPONSE_PROGRESS] Correlated peer response progress from operator-rt. Request ID: req-789. Phase: partial_result. Payload: {\n  \"chunk\": \"alpha\"\n}."
        );
    }

    /// The free helper produces `peer_response_terminal:<route>:<correlation>`.
    #[test]
    fn peer_terminal_context_key_helper_stays_canonical() {
        let route_identity =
            PeerResponseTerminalRouteIdentity::parse("peer-a").expect("route identity");
        let correlation_id =
            PeerResponseTerminalCorrelationId::parse("018f6f79-7a82-7c4e-a552-a3b86f9630f1")
                .expect("correlation id");
        assert_eq!(
            peer_response_terminal_context_key(&route_identity, correlation_id),
            "peer_response_terminal:peer-a:018f6f79-7a82-7c4e-a552-a3b86f9630f1"
        );
    }
}