#[cfg(all(meerkat_internal_generated_authority_bridge, not(test)))]
use std::any::Any;
use std::collections::BTreeSet;
use std::sync::Arc;
use crate::LoopState;
use crate::auth::RefreshFailureObservation;
use crate::comms::InputSource;
use crate::interaction::{
PeerIngressAdmission, PeerIngressDequeueAuthority, PeerIngressDequeueFacts,
PeerIngressEnvelopeFacts, PeerIngressPlainEventFacts, PeerIngressReceiveAuthority,
PeerIngressReceiveFacts,
};
use crate::lifecycle::run_primitive::ModelId;
use crate::lifecycle::{InputId, RunId};
use crate::ops::{AsyncOpRef, OperationId};
use crate::peer_correlation::{
InboundPeerRequestState, InteractionStreamState, OutboundPeerRequestState, PeerCorrelationId,
};
use crate::retry::LlmRetrySchedule;
use crate::tool_scope::{
ExternalToolSurfaceBaseState, ExternalToolSurfaceDeltaOperation, ExternalToolSurfaceDeltaPhase,
ExternalToolSurfaceFailureCause, ExternalToolSurfaceGlobalPhase, ExternalToolSurfacePendingOp,
ExternalToolSurfaceStagedOp,
};
use crate::turn_execution_authority::{
ContentShape, TurnExecutionEffect, TurnExecutionInput, TurnFailureReason, TurnFailureSource,
TurnPhase, TurnPrimitiveKind, TurnTerminalCauseKind, TurnTerminalOutcome,
};
use crate::types::{HandlingMode, SessionId};
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum DrainMode {
Timed,
AttachedSession,
PersistentHost,
}
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum DrainExitReason {
IdleTimeout,
Dismissed,
Failed,
Aborted,
SessionShutdown,
}
pub trait ModelRoutingHandle: Send + Sync {
fn set_baseline(
&self,
baseline_model: ModelId,
realtime_capable: bool,
) -> Result<(), DslTransitionError>;
fn hydrate_llm_capability_surface(
&self,
identity: &crate::SessionLlmIdentity,
profile: Option<&crate::model_profile::ModelProfile>,
capability_base_filter: &crate::ToolFilter,
) -> Result<(), DslTransitionError>;
}
impl DrainExitReason {
pub const fn as_str(self) -> &'static str {
match self {
Self::IdleTimeout => "IdleTimeout",
Self::Dismissed => "Dismissed",
Self::Failed => "Failed",
Self::Aborted => "Aborted",
Self::SessionShutdown => "SessionShutdown",
}
}
}
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum AuthLeasePhase {
Valid,
Expiring,
Expired,
Refreshing,
ReauthRequired,
Released,
}
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum CredentialUseIntent {
UseCredential,
HoldAuthority,
BeginRefresh,
}
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum CredentialUseDisposition {
Authorized,
RefreshRequired,
RefreshDisallowed,
ReauthRequired,
LeaseAbsent,
AlreadyRefreshing,
}
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct OAuthLoginCredentialFacts {
pub credential_present: bool,
pub force_refresh: bool,
pub refresh_allowed: bool,
}
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum DslRejectionKind {
NoMatchingTransition,
GuardRejected,
RecoveredStateInvariantRejected,
}
#[derive(Debug, Clone, PartialEq, Eq, thiserror::Error)]
#[error("DSL transition rejected in {context}: {reason}")]
pub struct DslTransitionError {
pub context: &'static str,
pub kind: DslRejectionKind,
pub reason: String,
}
impl DslTransitionError {
pub fn no_matching(context: &'static str, reason: impl Into<String>) -> Self {
Self {
context,
kind: DslRejectionKind::NoMatchingTransition,
reason: reason.into(),
}
}
pub fn guard_rejected(context: &'static str, reason: impl Into<String>) -> Self {
Self {
context,
kind: DslRejectionKind::GuardRejected,
reason: reason.into(),
}
}
pub fn recovered_state_invariant_rejected(
context: &'static str,
reason: impl Into<String>,
) -> Self {
Self {
context,
kind: DslRejectionKind::RecoveredStateInvariantRejected,
reason: reason.into(),
}
}
pub fn is_guard_rejected(&self) -> bool {
self.kind == DslRejectionKind::GuardRejected
}
}
#[derive(Debug, Clone, Copy, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum PeerResponseProgressProjectionPhase {
Accepted,
InProgress,
PartialResult,
}
impl PeerResponseProgressProjectionPhase {
fn label(self) -> &'static str {
match self {
Self::Accepted => "accepted",
Self::InProgress => "in_progress",
Self::PartialResult => "partial_result",
}
}
}
#[derive(Debug, Clone, Copy, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum PeerResponseTerminalProjectionStatus {
Completed,
Failed,
Cancelled,
}
impl PeerResponseTerminalProjectionStatus {
pub fn label(self) -> &'static str {
match self {
Self::Completed => "completed",
Self::Failed => "failed",
Self::Cancelled => "cancelled",
}
}
}
#[derive(Debug, Clone, PartialEq, Eq, thiserror::Error)]
pub enum PeerResponseTerminalFactError {
#[error("transport identity cannot be empty")]
EmptyTransportIdentity,
#[error("route identity cannot be empty")]
EmptyRouteIdentity,
#[error("route identity must be a canonical peer UUID")]
InvalidRouteIdentity,
#[error("display identity is required")]
MissingDisplayIdentity,
#[error("display identity cannot be empty")]
EmptyDisplayIdentity,
#[error("display identity cannot contain control characters")]
InvalidDisplayIdentity,
#[error("correlation id cannot be empty")]
EmptyCorrelationId,
#[error("correlation id must be a UUID: {input}")]
InvalidCorrelationId { input: String },
}
#[derive(Debug, Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
#[serde(transparent)]
pub struct PeerResponseTerminalTransportIdentity(String);
impl PeerResponseTerminalTransportIdentity {
pub fn parse(raw: impl Into<String>) -> Result<Self, PeerResponseTerminalFactError> {
let raw = raw.into();
if raw.trim().is_empty() {
return Err(PeerResponseTerminalFactError::EmptyTransportIdentity);
}
Ok(Self(raw))
}
pub fn as_str(&self) -> &str {
&self.0
}
}
impl std::fmt::Display for PeerResponseTerminalTransportIdentity {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
self.0.fmt(f)
}
}
#[derive(Debug, Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
#[serde(transparent)]
pub struct PeerResponseTerminalRouteIdentity(crate::comms::PeerId);
impl PeerResponseTerminalRouteIdentity {
pub fn parse(raw: impl Into<String>) -> Result<Self, PeerResponseTerminalFactError> {
let raw = raw.into();
if raw.trim().is_empty() {
return Err(PeerResponseTerminalFactError::EmptyRouteIdentity);
}
if raw.chars().any(char::is_control) {
return Err(PeerResponseTerminalFactError::InvalidRouteIdentity);
}
let peer_id = crate::comms::PeerId::parse(raw.trim())
.map_err(|_| PeerResponseTerminalFactError::InvalidRouteIdentity)?;
Ok(Self(peer_id))
}
pub fn peer_id(&self) -> crate::comms::PeerId {
self.0
}
pub fn as_str(&self) -> String {
self.0.as_str()
}
}
impl std::fmt::Display for PeerResponseTerminalRouteIdentity {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
self.0.fmt(f)
}
}
#[derive(Debug, Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
#[serde(transparent)]
pub struct PeerResponseTerminalDisplayIdentity(String);
impl PeerResponseTerminalDisplayIdentity {
pub fn parse(raw: impl Into<String>) -> Result<Self, PeerResponseTerminalFactError> {
let raw = raw.into();
if raw.trim().is_empty() {
return Err(PeerResponseTerminalFactError::EmptyDisplayIdentity);
}
if raw.chars().any(char::is_control) {
return Err(PeerResponseTerminalFactError::InvalidDisplayIdentity);
}
Ok(Self(raw))
}
pub fn as_str(&self) -> &str {
&self.0
}
}
impl std::fmt::Display for PeerResponseTerminalDisplayIdentity {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
self.0.fmt(f)
}
}
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, serde::Serialize, serde::Deserialize)]
#[serde(transparent)]
pub struct PeerResponseTerminalCorrelationId(PeerCorrelationId);
impl PeerResponseTerminalCorrelationId {
pub fn parse(raw: impl AsRef<str>) -> Result<Self, PeerResponseTerminalFactError> {
let raw = raw.as_ref();
if raw.trim().is_empty() {
return Err(PeerResponseTerminalFactError::EmptyCorrelationId);
}
uuid::Uuid::parse_str(raw)
.map(|uuid| Self(PeerCorrelationId::from_uuid(uuid)))
.map_err(|_| PeerResponseTerminalFactError::InvalidCorrelationId {
input: raw.to_string(),
})
}
pub const fn from_peer_correlation_id(correlation_id: PeerCorrelationId) -> Self {
Self(correlation_id)
}
pub const fn as_peer_correlation_id(self) -> PeerCorrelationId {
self.0
}
}
impl std::fmt::Display for PeerResponseTerminalCorrelationId {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
self.0.fmt(f)
}
}
#[derive(Debug, Clone, PartialEq, serde::Serialize, serde::Deserialize)]
#[serde(transparent)]
pub struct PeerResponseTerminalRenderPayload(Option<serde_json::Value>);
impl PeerResponseTerminalRenderPayload {
pub fn new(payload: Option<serde_json::Value>) -> Self {
Self(payload)
}
pub fn as_ref(&self) -> Option<&serde_json::Value> {
self.0.as_ref()
}
}
impl From<Option<serde_json::Value>> for PeerResponseTerminalRenderPayload {
fn from(payload: Option<serde_json::Value>) -> Self {
Self::new(payload)
}
}
#[derive(Debug, Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
pub struct PeerResponseTerminalSource {
#[serde(default, skip_serializing_if = "Option::is_none")]
pub transport_identity: Option<PeerResponseTerminalTransportIdentity>,
pub route_identity: PeerResponseTerminalRouteIdentity,
pub display_identity: PeerResponseTerminalDisplayIdentity,
}
impl PeerResponseTerminalSource {
pub fn new(
transport_identity: Option<PeerResponseTerminalTransportIdentity>,
route_identity: PeerResponseTerminalRouteIdentity,
display_identity: PeerResponseTerminalDisplayIdentity,
) -> Self {
Self {
transport_identity,
route_identity,
display_identity,
}
}
pub fn parse(
transport_identity: Option<impl Into<String>>,
route_identity: impl Into<String>,
display_identity: impl Into<String>,
) -> Result<Self, PeerResponseTerminalFactError> {
Ok(Self::new(
transport_identity
.map(PeerResponseTerminalTransportIdentity::parse)
.transpose()?,
PeerResponseTerminalRouteIdentity::parse(route_identity)?,
PeerResponseTerminalDisplayIdentity::parse(display_identity)?,
))
}
}
#[derive(Debug, Clone, PartialEq, serde::Serialize, serde::Deserialize)]
pub struct PeerResponseTerminalFact {
pub source: PeerResponseTerminalSource,
pub correlation_id: PeerResponseTerminalCorrelationId,
pub status: PeerResponseTerminalProjectionStatus,
pub render_payload: PeerResponseTerminalRenderPayload,
}
impl PeerResponseTerminalFact {
pub fn new(
source: PeerResponseTerminalSource,
correlation_id: PeerResponseTerminalCorrelationId,
status: PeerResponseTerminalProjectionStatus,
render_payload: PeerResponseTerminalRenderPayload,
) -> Self {
Self {
source,
correlation_id,
status,
render_payload,
}
}
pub fn prompt_text(&self) -> String {
format!(
"Peer terminal response from {}. Request ID: {}. Status: {}. Result: {}.",
self.source.display_identity,
self.correlation_id,
self.status.label(),
format_peer_projection_payload(self.render_payload.as_ref())
)
}
pub fn context_key(&self) -> String {
peer_response_terminal_context_key(&self.source.route_identity, self.correlation_id)
}
pub fn render_payload_value(&self) -> Option<&serde_json::Value> {
self.render_payload.as_ref()
}
}
#[derive(Debug, Clone, PartialEq)]
pub enum PeerConversationProjection {
Message {
peer_id: String,
},
Request {
peer_id: crate::comms::PeerId,
display_name: Option<String>,
request_id: String,
intent: String,
payload: Option<serde_json::Value>,
},
ResponseProgress {
peer_id: String,
request_id: String,
phase: PeerResponseProgressProjectionPhase,
payload: Option<serde_json::Value>,
},
ResponseTerminal {
fact: PeerResponseTerminalFact,
},
}
impl PeerConversationProjection {
pub fn response_terminal(fact: PeerResponseTerminalFact) -> Self {
Self::ResponseTerminal { fact }
}
pub fn block_prefix_text(&self) -> Option<String> {
match self {
Self::Message { peer_id } => Some(format!("Peer message from {peer_id}")),
Self::Request { .. }
| Self::ResponseProgress { .. }
| Self::ResponseTerminal { .. } => None,
}
}
pub fn prompt_text(&self) -> String {
match self {
Self::Message { .. } => String::new(),
Self::Request {
peer_id,
display_name,
request_id,
intent,
payload,
} => {
let display_suffix = display_name
.as_deref()
.map(str::trim)
.filter(|name| !name.is_empty())
.map(|name| format!(" (display_name: {name})"))
.unwrap_or_default();
let response_call = crate::interaction::SendResponseCallProjection::new(
*peer_id,
display_name.as_deref(),
request_id.clone(),
);
format!(
"Peer request from peer_id {peer_id}{display_suffix}. Intent: {intent}. Request ID: {request_id}. Params: {}. This is not a normal user request and not a prompt for direct user-facing output. {} Do not use send_message for this reply.",
format_peer_projection_payload(payload.as_ref()),
response_call.instruction_text()
)
}
Self::ResponseProgress {
peer_id,
request_id,
phase,
payload,
} => format!(
"Peer response progress from {peer_id}. Request ID: {request_id}. Phase: {}. Payload: {}.",
phase.label(),
format_peer_projection_payload(payload.as_ref())
),
Self::ResponseTerminal { fact } => fact.prompt_text(),
}
}
pub fn context_key(&self) -> Option<String> {
match self {
Self::ResponseTerminal { fact } => Some(fact.context_key()),
Self::Message { .. } | Self::Request { .. } | Self::ResponseProgress { .. } => None,
}
}
}
pub fn peer_response_terminal_context_key(
route_identity: &PeerResponseTerminalRouteIdentity,
correlation_id: PeerResponseTerminalCorrelationId,
) -> String {
format!("peer_response_terminal:{route_identity}:{correlation_id}")
}
fn format_peer_projection_payload(payload: Option<&serde_json::Value>) -> String {
serde_json::to_string_pretty(payload.unwrap_or(&serde_json::Value::Null))
.unwrap_or_else(|_| "null".to_string())
}
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct TurnStateSnapshot {
pub active_run_id: Option<RunId>,
pub loop_state: LoopState,
pub turn_phase: TurnPhase,
pub turn_terminal: bool,
pub primitive_kind: Option<TurnPrimitiveKind>,
pub admitted_content_shape: Option<ContentShape>,
pub vision_enabled: bool,
pub image_tool_results_enabled: bool,
pub tool_calls_pending: u64,
pub pending_op_refs: BTreeSet<AsyncOpRef>,
pub barrier_operation_ids: BTreeSet<OperationId>,
pub has_barrier_ops: bool,
pub barrier_satisfied: bool,
pub boundary_count: u64,
pub cancel_after_boundary: bool,
pub terminal_outcome: Option<TurnTerminalOutcome>,
pub terminal_cause_kind: Option<TurnTerminalCauseKind>,
pub extraction_attempts: u64,
pub max_extraction_retries: u64,
pub extraction_active: bool,
pub llm_retry_attempt: u32,
pub llm_retry_max_retries: u32,
pub llm_retry_selected_delay_ms: u64,
}
pub trait TurnStateHandle: Send + Sync {
fn apply_turn_input(
&self,
input: TurnExecutionInput,
) -> Result<Vec<TurnExecutionEffect>, DslTransitionError>;
fn start_conversation_run(
&self,
run_id: RunId,
primitive_kind: TurnPrimitiveKind,
admitted_content_shape: ContentShape,
vision_enabled: bool,
image_tool_results_enabled: bool,
max_extraction_retries: u64,
) -> Result<(), DslTransitionError>;
fn start_immediate_append(&self, run_id: RunId) -> Result<(), DslTransitionError>;
fn start_immediate_context(&self, run_id: RunId) -> Result<(), DslTransitionError>;
fn primitive_applied(&self, run_id: RunId) -> Result<(), DslTransitionError>;
fn llm_returned_tool_calls(
&self,
run_id: RunId,
tool_count: u64,
) -> Result<(), DslTransitionError>;
fn llm_returned_terminal(&self, run_id: RunId) -> Result<(), DslTransitionError>;
fn register_pending_ops(
&self,
run_id: RunId,
op_refs: BTreeSet<AsyncOpRef>,
barrier_operation_ids: BTreeSet<OperationId>,
) -> Result<(), DslTransitionError>;
fn tool_calls_resolved(&self, run_id: RunId) -> Result<(), DslTransitionError>;
fn ops_barrier_satisfied(
&self,
run_id: RunId,
operation_ids: BTreeSet<OperationId>,
) -> Result<(), DslTransitionError>;
fn boundary_continue(&self, run_id: RunId) -> Result<(), DslTransitionError>;
fn boundary_complete(&self, run_id: RunId) -> Result<(), DslTransitionError>;
fn enter_extraction(&self, run_id: RunId, max_retries: u32) -> Result<(), DslTransitionError>;
fn extraction_start(&self, run_id: RunId) -> Result<(), DslTransitionError>;
fn extraction_validation_passed(&self, run_id: RunId) -> Result<(), DslTransitionError>;
fn extraction_validation_failed(
&self,
run_id: RunId,
error: String,
) -> Result<(), DslTransitionError>;
fn extraction_failed(&self, run_id: RunId, error: String) -> Result<(), DslTransitionError>;
fn recoverable_failure(
&self,
run_id: RunId,
retry: LlmRetrySchedule,
) -> Result<(), DslTransitionError>;
fn fatal_failure(
&self,
run_id: RunId,
failure: TurnFailureSource,
) -> Result<(), DslTransitionError>;
fn retry_requested(&self, run_id: RunId, retry_attempt: u32) -> Result<(), DslTransitionError>;
fn cancel_now(&self, run_id: RunId) -> Result<(), DslTransitionError>;
fn request_cancel_after_boundary(&self, run_id: RunId) -> Result<(), DslTransitionError>;
fn cancellation_observed(&self, run_id: RunId) -> Result<(), DslTransitionError>;
fn acknowledge_terminal(&self, run_id: RunId) -> Result<(), DslTransitionError>;
fn turn_limit_reached(&self, run_id: RunId) -> Result<(), DslTransitionError>;
fn budget_exhausted(&self, run_id: RunId) -> Result<(), DslTransitionError>;
fn time_budget_exceeded(&self, run_id: RunId) -> Result<(), DslTransitionError>;
fn force_cancel_no_run(&self) -> Result<(), DslTransitionError>;
fn run_completed(&self, run_id: RunId) -> Result<(), DslTransitionError>;
fn run_failed(
&self,
run_id: RunId,
reason: TurnFailureReason,
) -> Result<(), DslTransitionError>;
fn run_cancelled(&self, run_id: RunId) -> Result<(), DslTransitionError>;
fn snapshot(&self) -> TurnStateSnapshot;
}
pub trait CommsDrainHandle: Send + Sync {
fn ensure_drain_running(&self) -> Result<(), DslTransitionError>;
fn spawn_drain(&self, mode: DrainMode) -> Result<(), DslTransitionError>;
fn stop_drain(&self) -> Result<(), DslTransitionError>;
fn notify_drain_exited(&self, reason: DrainExitReason) -> Result<(), DslTransitionError>;
}
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct SurfaceSnapshot {
pub surface_id: String,
pub base_state: Option<ExternalToolSurfaceBaseState>,
pub pending_op: ExternalToolSurfacePendingOp,
pub staged_op: ExternalToolSurfaceStagedOp,
pub staged_intent_sequence: Option<u64>,
pub pending_task_sequence: Option<u64>,
pub pending_lineage_sequence: Option<u64>,
pub inflight_calls: u64,
pub last_delta_operation: Option<ExternalToolSurfaceDeltaOperation>,
pub last_delta_phase: Option<ExternalToolSurfaceDeltaPhase>,
pub removal_draining_since_ms: Option<u64>,
pub removal_timeout_at_ms: Option<u64>,
pub removal_applied_at_turn: Option<u64>,
}
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct SurfaceDiagnosticSnapshot {
pub surface_phase: ExternalToolSurfaceGlobalPhase,
pub known_surfaces: BTreeSet<String>,
pub visible_surfaces: BTreeSet<String>,
pub snapshot_epoch: u64,
pub snapshot_aligned_epoch: u64,
pub has_pending_or_staged: bool,
pub entries: Vec<SurfaceSnapshot>,
}
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum ExternalToolSurfaceInput {
SetRemovalTimeout {
timeout_ms: u64,
},
StageAdd {
surface_id: String,
now_ms: u64,
},
StageRemove {
surface_id: String,
now_ms: u64,
},
StageReload {
surface_id: String,
now_ms: u64,
},
ApplyBoundary {
surface_id: String,
now_ms: u64,
staged_intent_sequence: u64,
applied_at_turn: u64,
},
MarkPendingSucceeded {
surface_id: String,
pending_task_sequence: u64,
staged_intent_sequence: u64,
},
MarkPendingFailed {
surface_id: String,
pending_task_sequence: u64,
staged_intent_sequence: u64,
cause: ExternalToolSurfaceFailureCause,
},
CallStarted {
surface_id: String,
},
CallFinished {
surface_id: String,
},
FinalizeRemovalClean {
surface_id: String,
},
FinalizeRemovalForced {
surface_id: String,
},
SnapshotAligned {
epoch: u64,
},
Shutdown,
}
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum ExternalToolSurfaceEffect {
ScheduleSurfaceCompletion {
surface_id: String,
operation: ExternalToolSurfaceDeltaOperation,
pending_task_sequence: u64,
staged_intent_sequence: u64,
applied_at_turn: u64,
},
RefreshVisibleSurfaceSet {
snapshot_epoch: u64,
},
EmitExternalToolDelta {
surface_id: String,
operation: ExternalToolSurfaceDeltaOperation,
phase: ExternalToolSurfaceDeltaPhase,
cause: Option<ExternalToolSurfaceFailureCause>,
},
CloseSurfaceConnection {
surface_id: String,
},
RejectSurfaceCall {
surface_id: String,
cause: ExternalToolSurfaceFailureCause,
},
}
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ExternalToolSurfaceTransition {
pub phase: ExternalToolSurfaceGlobalPhase,
pub effects: Vec<ExternalToolSurfaceEffect>,
}
pub trait ExternalToolSurfaceHandle: Send + Sync {
fn apply_surface_input(
&self,
input: ExternalToolSurfaceInput,
) -> Result<ExternalToolSurfaceTransition, DslTransitionError>;
fn register(&self, surface_id: String) -> Result<(), DslTransitionError>;
fn stage_add(&self, surface_id: String, now_ms: u64) -> Result<(), DslTransitionError>;
fn stage_remove(&self, surface_id: String, now_ms: u64) -> Result<(), DslTransitionError>;
fn stage_reload(&self, surface_id: String, now_ms: u64) -> Result<(), DslTransitionError>;
fn apply_boundary(
&self,
surface_id: String,
now_ms: u64,
staged_intent_sequence: u64,
applied_at_turn: u64,
) -> Result<(), DslTransitionError>;
fn mark_pending_succeeded(
&self,
surface_id: String,
pending_task_sequence: u64,
staged_intent_sequence: u64,
) -> Result<(), DslTransitionError>;
fn mark_pending_failed(
&self,
surface_id: String,
pending_task_sequence: u64,
staged_intent_sequence: u64,
cause: ExternalToolSurfaceFailureCause,
) -> Result<(), DslTransitionError>;
fn call_started(&self, surface_id: String) -> Result<(), DslTransitionError>;
fn call_finished(&self, surface_id: String) -> Result<(), DslTransitionError>;
fn finalize_removal_clean(&self, surface_id: String) -> Result<(), DslTransitionError>;
fn finalize_removal_forced(&self, surface_id: String) -> Result<(), DslTransitionError>;
fn snapshot_aligned(&self, epoch: u64) -> Result<(), DslTransitionError>;
fn shutdown_surface(&self) -> Result<(), DslTransitionError>;
fn surface_snapshot(&self, surface_id: &str) -> Option<SurfaceSnapshot>;
fn diagnostic_snapshot(&self) -> SurfaceDiagnosticSnapshot;
fn visible_surfaces(&self) -> BTreeSet<String>;
fn removing_surfaces(&self) -> BTreeSet<String>;
fn pending_surfaces(&self) -> BTreeSet<String>;
fn has_pending_or_staged(&self) -> bool;
fn snapshot_epoch(&self) -> u64;
fn snapshot_aligned_epoch(&self) -> u64;
}
pub trait PeerCommsHandle: Send + Sync {
fn classify_external_envelope(
&self,
facts: PeerIngressEnvelopeFacts,
) -> Result<PeerIngressAdmission, DslTransitionError>;
fn classify_plain_event(
&self,
facts: PeerIngressPlainEventFacts,
) -> Result<PeerIngressAdmission, DslTransitionError>;
fn resolve_peer_ingress_receive(
&self,
facts: PeerIngressReceiveFacts,
) -> Result<PeerIngressReceiveAuthority, DslTransitionError>;
fn resolve_peer_ingress_dequeue(
&self,
facts: PeerIngressDequeueFacts,
) -> Result<PeerIngressDequeueAuthority, DslTransitionError>;
fn set_peer_ingress_context(&self, keep_alive: bool) -> Result<(), DslTransitionError>;
fn install_generated_peer_comms_on_target(
&self,
_expected_owner: &crate::comms::GeneratedPeerCommsOwnerToken,
_target: &(dyn PeerCommsInstallTarget + '_),
) -> Result<(), String> {
Err("peer-comms handle does not expose generated install target authority".to_string())
}
}
#[derive(Clone)]
pub struct GeneratedPeerCommsInstallFactory {
handle: std::sync::Arc<dyn PeerCommsHandle>,
owner_token: crate::comms::GeneratedPeerCommsOwnerToken,
}
impl std::fmt::Debug for GeneratedPeerCommsInstallFactory {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("GeneratedPeerCommsInstallFactory")
.field("handle", &"<dyn PeerCommsHandle>")
.field("owner_token", &self.owner_token)
.finish()
}
}
impl GeneratedPeerCommsInstallFactory {
#[cfg(all(meerkat_internal_generated_authority_bridge, not(test)))]
#[doc(hidden)]
pub fn __from_runtime_generated_authority(
token: &'static (dyn Any + Send + Sync),
handle: std::sync::Arc<dyn PeerCommsHandle>,
owner_token: std::sync::Arc<dyn Any + Send + Sync>,
) -> Result<Self, String> {
validate_peer_comms_install_bridge_token(token)?;
Ok(Self {
handle,
owner_token: crate::comms::GeneratedPeerCommsOwnerToken::from_generated_owner_token(
owner_token,
),
})
}
pub fn peer_comms_handle(&self) -> &std::sync::Arc<dyn PeerCommsHandle> {
&self.handle
}
pub fn install_on_target(
&self,
target: &(dyn PeerCommsInstallTarget + '_),
) -> Result<(), String> {
self.handle
.install_generated_peer_comms_on_target(&self.owner_token, target)
}
}
#[derive(Clone)]
pub struct GeneratedPeerCommsInstall {
handle: std::sync::Arc<dyn PeerCommsHandle>,
owner_token: crate::comms::GeneratedPeerCommsOwnerToken,
target_peer_id: crate::comms::PeerId,
}
impl std::fmt::Debug for GeneratedPeerCommsInstall {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("GeneratedPeerCommsInstall")
.field("handle", &"<dyn PeerCommsHandle>")
.field("owner_token", &self.owner_token)
.field("target_peer_id", &self.target_peer_id)
.finish()
}
}
impl GeneratedPeerCommsInstall {
#[cfg(all(meerkat_internal_generated_authority_bridge, not(test)))]
#[doc(hidden)]
pub fn __from_runtime_generated_authority(
token: &'static (dyn Any + Send + Sync),
handle: std::sync::Arc<dyn PeerCommsHandle>,
owner_token: std::sync::Arc<dyn Any + Send + Sync>,
target_peer_id: crate::comms::PeerId,
) -> Result<Self, String> {
validate_peer_comms_install_bridge_token(token)?;
Ok(Self {
handle,
owner_token: crate::comms::GeneratedPeerCommsOwnerToken::from_generated_owner_token(
owner_token,
),
target_peer_id,
})
}
pub fn peer_comms_handle(&self) -> &std::sync::Arc<dyn PeerCommsHandle> {
&self.handle
}
pub fn owner_token(&self) -> crate::comms::GeneratedPeerCommsOwnerToken {
self.owner_token.clone()
}
pub fn target_peer_id(&self) -> crate::comms::PeerId {
self.target_peer_id
}
}
#[cfg(all(meerkat_internal_generated_authority_bridge, not(test)))]
#[allow(improper_ctypes_definitions, unsafe_code)]
unsafe extern "Rust" {
#[link_name = concat!(
"__meerkat_runtime_generated_authority_bridge_token_is_valid_v1_comms_trust_reconcile_",
env!("MEERKAT_GENERATED_AUTHORITY_BRIDGE_SYMBOL_SUFFIX")
)]
fn runtime_peer_comms_install_generated_authority_bridge_token_is_valid(
token: &(dyn Any + Send + Sync),
) -> bool;
}
#[cfg(all(meerkat_internal_generated_authority_bridge, not(test)))]
fn validate_peer_comms_install_bridge_token(token: &(dyn Any + Send + Sync)) -> Result<(), String> {
#[allow(unsafe_code)]
let valid =
unsafe { runtime_peer_comms_install_generated_authority_bridge_token_is_valid(token) };
if valid {
Ok(())
} else {
Err("generated peer-comms install requires the matching generated runtime protocol bridge token".into())
}
}
pub trait PeerCommsInstallTarget: crate::agent::CommsRuntime {
fn generated_peer_comms_target_endpoint(
&self,
) -> Result<crate::comms::TrustedPeerDescriptor, String> {
let peer_id = self
.peer_id()
.ok_or_else(|| "runtime peer_id unavailable".to_string())?;
let name = self
.comms_name()
.ok_or_else(|| "runtime comms_name unavailable".to_string())?;
let address = self
.advertised_address()
.ok_or_else(|| "runtime advertised_address unavailable".to_string())?;
let pubkey = self
.public_key_bytes()
.ok_or_else(|| "runtime public_key_bytes unavailable".to_string())?;
crate::comms::TrustedPeerDescriptor::unsigned_with_pubkey(
name,
peer_id.to_string(),
pubkey,
address,
)
.map_err(|error| format!("runtime peer-comms install target endpoint invalid: {error}"))
}
fn install_generated_peer_comms_handle(
&self,
install: GeneratedPeerCommsInstall,
) -> Result<(), String>;
}
pub trait SessionAdmissionHandle: Send + Sync {
fn ingest(
&self,
runtime_id: &str,
work_id: &str,
origin: InputSource,
) -> Result<(), DslTransitionError>;
fn accept_with_completion(
&self,
input_id: &InputId,
request_immediate_processing: bool,
interrupt_yielding: bool,
wake_if_idle: bool,
) -> Result<(), DslTransitionError>;
fn accept_without_wake(&self, input_id: &InputId) -> Result<(), DslTransitionError>;
fn prepare(&self, run_id: &RunId) -> Result<(), DslTransitionError>;
}
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub struct LeaseKey {
pub realm: crate::connection::RealmId,
pub binding: crate::connection::BindingId,
pub profile: Option<crate::connection::ProfileId>,
}
impl LeaseKey {
pub fn new(
realm: crate::connection::RealmId,
binding: crate::connection::BindingId,
profile: Option<crate::connection::ProfileId>,
) -> Self {
Self {
realm,
binding,
profile,
}
}
pub fn from_auth_binding(auth_binding: &crate::connection::AuthBindingRef) -> Self {
Self {
realm: auth_binding.realm.clone(),
binding: auth_binding.binding.clone(),
profile: auth_binding.profile.clone(),
}
}
}
impl std::fmt::Display for LeaseKey {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match &self.profile {
Some(profile) => write!(f, "{}:{}:{}", self.realm, self.binding, profile),
None => write!(f, "{}:{}", self.realm, self.binding),
}
}
}
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct AuthLeaseSnapshot {
pub phase: Option<AuthLeasePhase>,
pub expires_at: Option<u64>,
pub credential_present: bool,
pub generation: u64,
pub credential_published_at_millis: Option<u64>,
}
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct AuthLeaseRestoreSnapshot {
lease_key: LeaseKey,
snapshot: AuthLeaseSnapshot,
captured_by: std::any::TypeId,
captured_by_instance: usize,
}
impl AuthLeaseRestoreSnapshot {
fn capture(
lease_key: LeaseKey,
snapshot: AuthLeaseSnapshot,
captured_by: std::any::TypeId,
captured_by_instance: usize,
) -> Self {
Self {
lease_key,
snapshot,
captured_by,
captured_by_instance,
}
}
pub fn lease_key(&self) -> &LeaseKey {
&self.lease_key
}
pub fn snapshot(&self) -> &AuthLeaseSnapshot {
&self.snapshot
}
#[doc(hidden)]
pub fn captured_by_type_id(&self) -> std::any::TypeId {
self.captured_by
}
#[doc(hidden)]
pub fn captured_by_instance_id(&self) -> usize {
self.captured_by_instance
}
}
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct AuthLeaseTransition {
lease_key: LeaseKey,
phase: AuthLeasePhase,
expires_at: u64,
generation: u64,
credential_published_at_millis: Option<u64>,
}
impl AuthLeaseTransition {
pub fn lease_key(&self) -> &LeaseKey {
&self.lease_key
}
pub fn phase(&self) -> AuthLeasePhase {
self.phase
}
pub fn expires_at(&self) -> u64 {
self.expires_at
}
pub fn generation(&self) -> u64 {
self.generation
}
pub fn credential_published_at_millis(&self) -> Option<u64> {
self.credential_published_at_millis
}
#[cfg_attr(
any(not(meerkat_internal_generated_authority_bridge), test),
allow(dead_code)
)]
fn from_generated_auth_lease_publication_parts(
lease_key: LeaseKey,
phase: AuthLeasePhase,
expires_at: u64,
generation: u64,
credential_published_at_millis: Option<u64>,
) -> Self {
Self {
lease_key,
phase,
expires_at,
generation,
credential_published_at_millis,
}
}
}
#[derive(Clone)]
pub struct GeneratedAuthLeaseHandle {
inner: Arc<dyn AuthLeaseHandle>,
}
impl GeneratedAuthLeaseHandle {
pub fn as_handle(&self) -> &dyn AuthLeaseHandle {
self.inner.as_ref()
}
pub fn clone_handle(&self) -> Arc<dyn AuthLeaseHandle> {
Arc::clone(&self.inner)
}
#[cfg_attr(
any(not(meerkat_internal_generated_authority_bridge), test),
allow(dead_code)
)]
fn from_generated_authority(inner: Arc<dyn AuthLeaseHandle>) -> Self {
Self { inner }
}
}
impl std::fmt::Debug for GeneratedAuthLeaseHandle {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("GeneratedAuthLeaseHandle")
.finish_non_exhaustive()
}
}
impl std::ops::Deref for GeneratedAuthLeaseHandle {
type Target = dyn AuthLeaseHandle;
fn deref(&self) -> &Self::Target {
self.inner.as_ref()
}
}
impl AsRef<dyn AuthLeaseHandle> for GeneratedAuthLeaseHandle {
fn as_ref(&self) -> &dyn AuthLeaseHandle {
self.inner.as_ref()
}
}
#[cfg(all(meerkat_internal_generated_authority_bridge, not(test)))]
#[allow(improper_ctypes_definitions, unsafe_code)]
unsafe extern "Rust" {
#[link_name = concat!(
"__meerkat_runtime_generated_authority_bridge_token_is_valid_v1_auth_lease_lifecycle_publication_",
env!("MEERKAT_GENERATED_AUTHORITY_BRIDGE_SYMBOL_SUFFIX")
)]
fn runtime_auth_lease_lifecycle_publication_generated_authority_bridge_token_is_valid(
token: &(dyn std::any::Any + Send + Sync),
) -> bool;
}
#[cfg(all(meerkat_internal_generated_authority_bridge, not(test)))]
#[doc(hidden)]
#[allow(improper_ctypes_definitions, unsafe_code)]
#[unsafe(export_name = concat!(
"__meerkat_core_runtime_generated_auth_lease_transition_build_v1_",
env!("MEERKAT_GENERATED_AUTHORITY_BRIDGE_SYMBOL_SUFFIX")
))]
pub(crate) extern "Rust" fn runtime_generated_auth_lease_transition_build(
token: &'static (dyn std::any::Any + Send + Sync),
lease_key: LeaseKey,
phase: AuthLeasePhase,
expires_at: u64,
generation: u64,
credential_published_at_millis: Option<u64>,
) -> Result<AuthLeaseTransition, String> {
validate_runtime_generated_authority_bridge_token(token)?;
Ok(
AuthLeaseTransition::from_generated_auth_lease_publication_parts(
lease_key,
phase,
expires_at,
generation,
credential_published_at_millis,
),
)
}
#[cfg(all(meerkat_internal_generated_authority_bridge, not(test)))]
#[doc(hidden)]
#[allow(improper_ctypes_definitions, unsafe_code)]
#[unsafe(export_name = concat!(
"__meerkat_core_runtime_generated_auth_lease_handle_build_v1_",
env!("MEERKAT_GENERATED_AUTHORITY_BRIDGE_SYMBOL_SUFFIX")
))]
pub(crate) extern "Rust" fn runtime_generated_auth_lease_handle_build(
token: &'static (dyn std::any::Any + Send + Sync),
handle: Arc<dyn AuthLeaseHandle>,
) -> Result<GeneratedAuthLeaseHandle, String> {
validate_runtime_generated_authority_bridge_token(token)?;
Ok(GeneratedAuthLeaseHandle::from_generated_authority(handle))
}
#[cfg(all(meerkat_internal_generated_authority_bridge, not(test)))]
fn validate_runtime_generated_authority_bridge_token(
token: &(dyn std::any::Any + Send + Sync),
) -> Result<(), String> {
#[allow(unsafe_code)]
let valid = unsafe {
runtime_auth_lease_lifecycle_publication_generated_authority_bridge_token_is_valid(token)
};
if valid {
Ok(())
} else {
Err(
"generated auth lease transition requires the generated AuthMachine protocol bridge token"
.into(),
)
}
}
pub const AUTH_LEASE_TTL_REFRESH_WINDOW_SECS: u64 = 60;
pub trait AuthLeaseHandle: Send + Sync + std::any::Any {
fn acquire_lease(
&self,
lease_key: &LeaseKey,
expires_at: u64,
) -> Result<AuthLeaseTransition, DslTransitionError>;
fn mark_expiring(&self, lease_key: &LeaseKey) -> Result<(), DslTransitionError>;
fn observe_credential_freshness(
&self,
lease_key: &LeaseKey,
now: u64,
refresh_window_secs: u64,
) -> Result<(), DslTransitionError>;
fn begin_refresh(&self, lease_key: &LeaseKey) -> Result<(), DslTransitionError>;
fn complete_refresh(
&self,
lease_key: &LeaseKey,
new_expires_at: u64,
now: u64,
) -> Result<AuthLeaseTransition, DslTransitionError>;
fn refresh_failed(
&self,
lease_key: &LeaseKey,
observation: RefreshFailureObservation,
) -> Result<(), DslTransitionError>;
fn mark_reauth_required(&self, lease_key: &LeaseKey) -> Result<(), DslTransitionError>;
fn release_lease(&self, lease_key: &LeaseKey) -> Result<(), DslTransitionError>;
fn release_credential_lifecycle(&self, lease_key: &LeaseKey) -> Result<(), DslTransitionError> {
self.release_lease(lease_key)
}
fn capture_auth_lifecycle_restore_snapshot(
&self,
lease_key: &LeaseKey,
) -> AuthLeaseRestoreSnapshot {
AuthLeaseRestoreSnapshot::capture(
lease_key.clone(),
self.snapshot(lease_key),
self.type_id(),
self.auth_lifecycle_restore_instance_id(),
)
}
#[doc(hidden)]
fn auth_lifecycle_restore_instance_id(&self) -> usize {
std::ptr::from_ref(self).cast::<()>() as usize
}
fn restore_auth_lifecycle_snapshot(
&self,
snapshot: &AuthLeaseRestoreSnapshot,
) -> Result<Option<AuthLeaseTransition>, DslTransitionError> {
let _ = snapshot;
Err(DslTransitionError::no_matching(
"AuthLeaseHandle::restore_auth_lifecycle_snapshot",
"restoring auth lifecycle snapshots requires generated AuthMachine authority",
))
}
fn restore_published_credential_lifecycle(
&self,
lease_key: &LeaseKey,
publication: &crate::generated::auth_lease_durable_lifecycle_marker::AuthLeaseDurableRestorePublication,
) -> Result<AuthLeaseTransition, DslTransitionError> {
let _ = (lease_key, publication);
Err(DslTransitionError::no_matching(
"AuthLeaseHandle::restore_published_credential_lifecycle",
"restoring durable auth lifecycle publications requires generated AuthMachine authority",
))
}
fn resolve_credential_use_admission(
&self,
lease_key: &LeaseKey,
intent: CredentialUseIntent,
) -> Result<CredentialUseDisposition, DslTransitionError> {
let _ = (lease_key, intent);
Err(DslTransitionError::no_matching(
"AuthLeaseHandle::resolve_credential_use_admission",
"classifying credential-use admission requires generated AuthMachine authority",
))
}
fn resolve_oauth_login_credential_disposition(
&self,
lease_key: &LeaseKey,
facts: OAuthLoginCredentialFacts,
) -> Result<CredentialUseDisposition, DslTransitionError> {
let _ = (lease_key, facts);
Err(DslTransitionError::no_matching(
"AuthLeaseHandle::resolve_oauth_login_credential_disposition",
"classifying OAuth-login credential disposition requires generated AuthMachine authority",
))
}
fn snapshot(&self, lease_key: &LeaseKey) -> AuthLeaseSnapshot;
}
pub trait McpServerLifecycleHandle: Send + Sync {
fn apply_connect_pending(&self, server_id: &str) -> Result<(), DslTransitionError>;
fn apply_connected(&self, server_id: &str) -> Result<(), DslTransitionError>;
fn apply_failed(&self, server_id: &str, error: &str) -> Result<(), DslTransitionError>;
fn apply_disconnected(&self, server_id: &str) -> Result<(), DslTransitionError>;
fn apply_reload(&self, server_id: &str) -> Result<(), DslTransitionError>;
fn pending_server_ids(&self) -> BTreeSet<String>;
}
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
#[non_exhaustive]
pub enum PeerTerminalDisposition {
Completed,
Failed,
}
pub trait PeerInteractionHandle: Send + Sync {
fn request_sent(&self, corr_id: PeerCorrelationId) -> Result<(), DslTransitionError>;
fn response_progress(&self, corr_id: PeerCorrelationId) -> Result<(), DslTransitionError>;
fn response_terminal(
&self,
corr_id: PeerCorrelationId,
disposition: PeerTerminalDisposition,
) -> Result<(), DslTransitionError>;
fn response_rejected(&self, corr_id: PeerCorrelationId) -> Result<(), DslTransitionError>;
fn request_timed_out(&self, corr_id: PeerCorrelationId) -> Result<(), DslTransitionError>;
fn request_send_failed(&self, corr_id: PeerCorrelationId) -> Result<(), DslTransitionError>;
fn request_received(
&self,
corr_id: PeerCorrelationId,
handling_mode: HandlingMode,
) -> Result<(), DslTransitionError>;
fn classify_response_reply(
&self,
status: crate::ResponseStatus,
) -> Result<crate::TerminalityClass, DslTransitionError>;
fn response_replied(&self, corr_id: PeerCorrelationId) -> Result<(), DslTransitionError>;
fn outbound_state(&self, corr_id: PeerCorrelationId) -> Option<OutboundPeerRequestState>;
fn inbound_state(&self, corr_id: PeerCorrelationId) -> Option<InboundPeerRequestState>;
fn inbound_handling_mode(&self, corr_id: PeerCorrelationId) -> Option<HandlingMode>;
fn install_cleanup_observer(&self, observer: Arc<dyn PeerInteractionCleanupObserver>);
}
pub trait PeerInteractionCleanupObserver: Send + Sync {
fn on_peer_interaction_cleanup(&self, corr_id: PeerCorrelationId);
}
pub trait SessionContextHandle: Send + Sync {
fn context_advanced(&self, updated_at_ms: u64) -> Result<bool, DslTransitionError>;
fn current_watermark_ms(&self) -> u64;
fn install_observer(&self, observer: Arc<dyn SessionContextAdvancedObserver>);
fn install_observer_with_baseline(
&self,
observer: Arc<dyn SessionContextAdvancedObserver>,
) -> u64;
}
pub trait SessionContextAdvancedObserver: Send + Sync {
fn on_session_context_advanced(&self, updated_at_ms: u64);
}
#[derive(Debug, Clone, PartialEq, Eq, thiserror::Error)]
pub enum SessionClaimError {
#[error("session identity already claimed: {0}")]
SessionIdentityInUse(SessionId),
}
pub struct SessionClaim {
session_id: SessionId,
handle: Arc<dyn SessionClaimHandle>,
}
impl SessionClaim {
pub fn new(session_id: SessionId, handle: Arc<dyn SessionClaimHandle>) -> Self {
Self { session_id, handle }
}
pub fn session_id(&self) -> &SessionId {
&self.session_id
}
}
impl Drop for SessionClaim {
fn drop(&mut self) {
self.handle.release(&self.session_id);
}
}
impl std::fmt::Debug for SessionClaim {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("SessionClaim")
.field("session_id", &self.session_id)
.finish_non_exhaustive()
}
}
pub trait SessionClaimHandle: Send + Sync {
fn try_acquire(
self: Arc<Self>,
session_id: &SessionId,
) -> Result<SessionClaim, SessionClaimError>;
fn release(&self, session_id: &SessionId);
}
pub struct DefaultSessionClaimRegistry {
claims: std::sync::Mutex<std::collections::HashSet<SessionId>>,
}
impl DefaultSessionClaimRegistry {
pub fn new() -> Self {
Self {
claims: std::sync::Mutex::new(std::collections::HashSet::new()),
}
}
pub fn global() -> Arc<Self> {
use std::sync::OnceLock;
static GLOBAL: OnceLock<Arc<DefaultSessionClaimRegistry>> = OnceLock::new();
Arc::clone(GLOBAL.get_or_init(|| Arc::new(DefaultSessionClaimRegistry::new())))
}
}
impl Default for DefaultSessionClaimRegistry {
fn default() -> Self {
Self::new()
}
}
impl SessionClaimHandle for DefaultSessionClaimRegistry {
fn try_acquire(
self: Arc<Self>,
session_id: &SessionId,
) -> Result<SessionClaim, SessionClaimError> {
let mut claims = self
.claims
.lock()
.unwrap_or_else(std::sync::PoisonError::into_inner);
if !claims.insert(session_id.clone()) {
return Err(SessionClaimError::SessionIdentityInUse(session_id.clone()));
}
drop(claims);
Ok(SessionClaim::new(
session_id.clone(),
self as Arc<dyn SessionClaimHandle>,
))
}
fn release(&self, session_id: &SessionId) {
let mut claims = self
.claims
.lock()
.unwrap_or_else(std::sync::PoisonError::into_inner);
claims.remove(session_id);
}
}
pub trait InteractionStreamHandle: Send + Sync {
fn reserved(&self, corr_id: PeerCorrelationId) -> Result<(), DslTransitionError>;
fn attached(&self, corr_id: PeerCorrelationId) -> Result<(), DslTransitionError>;
fn completed(&self, corr_id: PeerCorrelationId) -> Result<(), DslTransitionError>;
fn expired(&self, corr_id: PeerCorrelationId) -> Result<(), DslTransitionError>;
fn closed_early(&self, corr_id: PeerCorrelationId) -> Result<(), DslTransitionError>;
fn state(&self, corr_id: PeerCorrelationId) -> Option<InteractionStreamState>;
fn install_cleanup_observer(&self, observer: Arc<dyn InteractionStreamCleanupObserver>);
}
pub trait InteractionStreamCleanupObserver: Send + Sync {
fn on_interaction_stream_cleanup(&self, corr_id: PeerCorrelationId);
}
#[cfg(test)]
#[allow(clippy::expect_used, clippy::panic, clippy::unwrap_used)]
mod tests {
use super::{
DslRejectionKind, DslTransitionError, ExternalToolSurfaceEffect,
ExternalToolSurfaceFailureCause, ExternalToolSurfaceInput, PeerConversationProjection,
PeerResponseProgressProjectionPhase, PeerResponseTerminalCorrelationId,
PeerResponseTerminalDisplayIdentity, PeerResponseTerminalFact,
PeerResponseTerminalFactError, PeerResponseTerminalProjectionStatus,
PeerResponseTerminalRenderPayload, PeerResponseTerminalRouteIdentity,
PeerResponseTerminalSource, PeerResponseTerminalTransportIdentity,
peer_response_terminal_context_key,
};
use crate::tool_scope::{ExternalToolSurfaceDeltaOperation, ExternalToolSurfaceDeltaPhase};
#[test]
fn recovered_state_rejection_is_not_guard_noop() {
let err =
DslTransitionError::recovered_state_invariant_rejected("recover", "bad invariant");
assert_eq!(err.kind, DslRejectionKind::RecoveredStateInvariantRejected);
assert!(!err.is_guard_rejected());
}
#[test]
fn external_tool_surface_pending_failure_cause_projects_external_code() {
let input = ExternalToolSurfaceInput::MarkPendingFailed {
surface_id: "alpha".to_owned(),
pending_task_sequence: 7,
staged_intent_sequence: 11,
cause: ExternalToolSurfaceFailureCause::PendingFailed,
};
let ExternalToolSurfaceInput::MarkPendingFailed { cause, .. } = input else {
panic!("constructed MarkPendingFailed input");
};
assert_eq!(cause, ExternalToolSurfaceFailureCause::PendingFailed);
assert_eq!(cause.as_str(), "pending_failed");
assert_eq!(
serde_json::to_value(cause).expect("serialize failure cause"),
serde_json::json!("pending_failed")
);
let effect = ExternalToolSurfaceEffect::EmitExternalToolDelta {
surface_id: "alpha".to_owned(),
operation: ExternalToolSurfaceDeltaOperation::Add,
phase: ExternalToolSurfaceDeltaPhase::Failed,
cause: Some(cause),
};
assert!(matches!(
effect,
ExternalToolSurfaceEffect::EmitExternalToolDelta {
cause: Some(ExternalToolSurfaceFailureCause::PendingFailed),
..
}
));
}
#[test]
fn peer_terminal_projection_owns_prompt_and_context_key() {
let route_id = "550e8400-e29b-41d4-a716-446655440000";
let route_identity =
PeerResponseTerminalRouteIdentity::parse(route_id).expect("route identity");
let correlation_id =
PeerResponseTerminalCorrelationId::parse("018f6f79-7a82-7c4e-a552-a3b86f9630f1")
.expect("correlation id");
let projection = PeerConversationProjection::ResponseTerminal {
fact: PeerResponseTerminalFact::new(
PeerResponseTerminalSource::new(
Some(
PeerResponseTerminalTransportIdentity::parse("transport-runtime-1")
.expect("transport identity"),
),
route_identity,
PeerResponseTerminalDisplayIdentity::parse("Analyst")
.expect("display identity"),
),
correlation_id,
PeerResponseTerminalProjectionStatus::Completed,
PeerResponseTerminalRenderPayload::new(Some(serde_json::json!({
"request_intent": "checksum_token",
"request_subject": "alpha beta gamma",
"token": "birch seventeen"
}))),
),
};
assert_eq!(
projection.context_key().as_deref(),
Some(
"peer_response_terminal:550e8400-e29b-41d4-a716-446655440000:018f6f79-7a82-7c4e-a552-a3b86f9630f1"
)
);
assert_eq!(
projection.prompt_text(),
"Peer terminal response from Analyst. Request ID: 018f6f79-7a82-7c4e-a552-a3b86f9630f1. Status: completed. Result: {\n \"request_intent\": \"checksum_token\",\n \"request_subject\": \"alpha beta gamma\",\n \"token\": \"birch seventeen\"\n}."
);
}
#[test]
fn peer_terminal_fact_is_structural_projection_only() {
let route_id = "550e8400-e29b-41d4-a716-446655440000";
let route_identity =
PeerResponseTerminalRouteIdentity::parse(route_id).expect("route identity");
let correlation_id =
PeerResponseTerminalCorrelationId::parse("018f6f79-7a82-7c4e-a552-a3b86f9630f1")
.expect("correlation id");
let fact = PeerResponseTerminalFact::new(
PeerResponseTerminalSource::new(
None,
route_identity,
PeerResponseTerminalDisplayIdentity::parse("Analyst").expect("display identity"),
),
correlation_id,
PeerResponseTerminalProjectionStatus::Cancelled,
PeerResponseTerminalRenderPayload::new(None),
);
assert_eq!(
fact.status,
PeerResponseTerminalProjectionStatus::Cancelled,
"status support is decided by generated admission authority, not fact construction"
);
}
#[test]
fn peer_progress_projection_formats_phase_from_shared_seam() {
let projection = PeerConversationProjection::ResponseProgress {
peer_id: "operator-rt".into(),
request_id: "req-789".into(),
phase: PeerResponseProgressProjectionPhase::PartialResult,
payload: Some(serde_json::json!({ "chunk": "alpha" })),
};
assert_eq!(projection.context_key(), None);
assert_eq!(
projection.prompt_text(),
"Peer response progress from operator-rt. Request ID: req-789. Phase: partial_result. Payload: {\n \"chunk\": \"alpha\"\n}."
);
}
#[test]
fn peer_terminal_context_key_helper_stays_canonical() {
let route_id = "550e8400-e29b-41d4-a716-446655440000";
let route_identity =
PeerResponseTerminalRouteIdentity::parse(route_id).expect("route identity");
let correlation_id =
PeerResponseTerminalCorrelationId::parse("018f6f79-7a82-7c4e-a552-a3b86f9630f1")
.expect("correlation id");
assert_eq!(
peer_response_terminal_context_key(&route_identity, correlation_id),
"peer_response_terminal:550e8400-e29b-41d4-a716-446655440000:018f6f79-7a82-7c4e-a552-a3b86f9630f1"
);
}
#[test]
fn peer_terminal_route_identity_rejects_display_name_alias() {
assert!(matches!(
PeerResponseTerminalRouteIdentity::parse("analyst-rt"),
Err(PeerResponseTerminalFactError::InvalidRouteIdentity)
));
}
#[test]
fn peer_terminal_fact_round_trips_through_serde() {
let fact = PeerResponseTerminalFact::new(
PeerResponseTerminalSource::parse(
Some("inproc://analyst"),
"550e8400-e29b-41d4-a716-446655440000",
"analyst-rt",
)
.expect("source"),
PeerResponseTerminalCorrelationId::parse("018f6f79-7a82-7c4e-a552-a3b86f9630f1")
.expect("correlation id"),
PeerResponseTerminalProjectionStatus::Completed,
PeerResponseTerminalRenderPayload::new(Some(serde_json::json!({
"request_intent": "checksum_token",
"token": "birch seventeen",
}))),
);
let json = serde_json::to_string(&fact).expect("serialize fact");
let decoded: PeerResponseTerminalFact =
serde_json::from_str(&json).expect("deserialize fact");
assert_eq!(decoded, fact);
assert_eq!(
decoded.context_key(),
"peer_response_terminal:550e8400-e29b-41d4-a716-446655440000:018f6f79-7a82-7c4e-a552-a3b86f9630f1"
);
assert_eq!(
decoded
.render_payload_value()
.and_then(|payload| payload.get("token"))
.and_then(|token| token.as_str()),
Some("birch seventeen")
);
}
}