1use std::collections::BTreeSet;
25use std::sync::Arc;
26
27use crate::LoopState;
28use crate::comms::InputSource;
29use crate::interaction::{
30 PeerIngressAdmission, PeerIngressEnvelopeFacts, PeerIngressPlainEventFacts,
31};
32use crate::lifecycle::run_primitive::ModelId;
33use crate::lifecycle::{InputId, RunId};
34use crate::ops::{AsyncOpRef, OperationId};
35use crate::peer_correlation::{
36 InboundPeerRequestState, InteractionStreamState, OutboundPeerRequestState, PeerCorrelationId,
37};
38use crate::retry::LlmRetrySchedule;
39use crate::tool_scope::{
40 ExternalToolSurfaceBaseState, ExternalToolSurfaceDeltaOperation, ExternalToolSurfaceDeltaPhase,
41 ExternalToolSurfaceFailureCause, ExternalToolSurfaceGlobalPhase, ExternalToolSurfacePendingOp,
42 ExternalToolSurfaceStagedOp,
43};
44use crate::turn_execution_authority::{
45 ContentShape, TurnFailureReason, TurnPhase, TurnPrimitiveKind, TurnTerminalCauseKind,
46 TurnTerminalOutcome,
47};
48use crate::types::SessionId;
49
50#[derive(Debug, Clone, Copy, PartialEq, Eq)]
61pub enum DrainMode {
62 Timed,
64 AttachedSession,
66 PersistentHost,
68}
69
70#[derive(Debug, Clone, Copy, PartialEq, Eq)]
72pub enum DrainExitReason {
73 IdleTimeout,
74 Dismissed,
75 Failed,
76 Aborted,
77 SessionShutdown,
78}
79
80pub trait ModelRoutingHandle: Send + Sync {
87 fn set_baseline(
89 &self,
90 baseline_model: ModelId,
91 realtime_capable: bool,
92 ) -> Result<(), DslTransitionError>;
93}
94
95impl DrainExitReason {
96 pub const fn as_str(self) -> &'static str {
99 match self {
100 Self::IdleTimeout => "IdleTimeout",
101 Self::Dismissed => "Dismissed",
102 Self::Failed => "Failed",
103 Self::Aborted => "Aborted",
104 Self::SessionShutdown => "SessionShutdown",
105 }
106 }
107}
108
109#[derive(Debug, Clone, Copy, PartialEq, Eq)]
111pub enum AuthLeasePhase {
112 Valid,
113 Expiring,
114 Refreshing,
115 ReauthRequired,
116 Released,
117}
118
119#[derive(Debug, Clone, Copy, PartialEq, Eq)]
128pub enum DslRejectionKind {
129 NoMatchingTransition,
133 GuardRejected,
138}
139
140#[derive(Debug, Clone, PartialEq, Eq, thiserror::Error)]
147#[error("DSL transition rejected in {context}: {reason}")]
148pub struct DslTransitionError {
149 pub context: &'static str,
151 pub kind: DslRejectionKind,
153 pub reason: String,
156}
157
158impl DslTransitionError {
159 pub fn new(context: &'static str, reason: impl Into<String>) -> Self {
165 Self::no_matching(context, reason)
166 }
167
168 pub fn no_matching(context: &'static str, reason: impl Into<String>) -> Self {
170 Self {
171 context,
172 kind: DslRejectionKind::NoMatchingTransition,
173 reason: reason.into(),
174 }
175 }
176
177 pub fn guard_rejected(context: &'static str, reason: impl Into<String>) -> Self {
179 Self {
180 context,
181 kind: DslRejectionKind::GuardRejected,
182 reason: reason.into(),
183 }
184 }
185
186 pub fn is_guard_rejected(&self) -> bool {
188 self.kind == DslRejectionKind::GuardRejected
189 }
190}
191
192#[derive(Debug, Clone, Copy, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
197#[serde(rename_all = "snake_case")]
198pub enum PeerResponseProgressProjectionPhase {
199 Accepted,
200 InProgress,
201 PartialResult,
202}
203
204impl PeerResponseProgressProjectionPhase {
205 fn label(self) -> &'static str {
206 match self {
207 Self::Accepted => "accepted",
208 Self::InProgress => "in_progress",
209 Self::PartialResult => "partial_result",
210 }
211 }
212}
213
214#[derive(Debug, Clone, Copy, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
215#[serde(rename_all = "snake_case")]
216pub enum PeerResponseTerminalProjectionStatus {
217 Completed,
218 Failed,
219 Cancelled,
220}
221
222impl PeerResponseTerminalProjectionStatus {
223 fn label(self) -> &'static str {
224 match self {
225 Self::Completed => "completed",
226 Self::Failed => "failed",
227 Self::Cancelled => "cancelled",
228 }
229 }
230}
231
232#[derive(Debug, Clone, PartialEq, Eq, thiserror::Error)]
233pub enum PeerResponseTerminalFactError {
234 #[error("transport identity cannot be empty")]
235 EmptyTransportIdentity,
236 #[error("route identity cannot be empty")]
237 EmptyRouteIdentity,
238 #[error("route identity must be a canonical peer UUID")]
239 InvalidRouteIdentity,
240 #[error("display identity is required")]
241 MissingDisplayIdentity,
242 #[error("display identity cannot be empty")]
243 EmptyDisplayIdentity,
244 #[error("display identity cannot contain control characters")]
245 InvalidDisplayIdentity,
246 #[error("correlation id cannot be empty")]
247 EmptyCorrelationId,
248 #[error("correlation id must be a UUID: {input}")]
249 InvalidCorrelationId { input: String },
250}
251
252#[derive(Debug, Clone, PartialEq, Eq)]
253pub struct PeerResponseTerminalTransportIdentity(String);
254
255impl PeerResponseTerminalTransportIdentity {
256 pub fn parse(raw: impl Into<String>) -> Result<Self, PeerResponseTerminalFactError> {
257 let raw = raw.into();
258 if raw.trim().is_empty() {
259 return Err(PeerResponseTerminalFactError::EmptyTransportIdentity);
260 }
261 Ok(Self(raw))
262 }
263
264 pub fn as_str(&self) -> &str {
265 &self.0
266 }
267}
268
269impl std::fmt::Display for PeerResponseTerminalTransportIdentity {
270 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
271 self.0.fmt(f)
272 }
273}
274
275#[derive(Debug, Clone, PartialEq, Eq)]
276pub struct PeerResponseTerminalRouteIdentity(String);
277
278impl PeerResponseTerminalRouteIdentity {
279 pub fn parse(raw: impl Into<String>) -> Result<Self, PeerResponseTerminalFactError> {
280 let raw = raw.into();
281 if raw.trim().is_empty() {
282 return Err(PeerResponseTerminalFactError::EmptyRouteIdentity);
283 }
284 if raw.chars().any(char::is_control) {
285 return Err(PeerResponseTerminalFactError::InvalidRouteIdentity);
286 }
287 let peer_id = crate::comms::PeerId::parse(raw.trim())
288 .map_err(|_| PeerResponseTerminalFactError::InvalidRouteIdentity)?;
289 Ok(Self(peer_id.to_string()))
290 }
291
292 pub fn as_str(&self) -> &str {
293 &self.0
294 }
295}
296
297impl std::fmt::Display for PeerResponseTerminalRouteIdentity {
298 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
299 self.0.fmt(f)
300 }
301}
302
303#[derive(Debug, Clone, PartialEq, Eq)]
304pub struct PeerResponseTerminalDisplayIdentity(String);
305
306impl PeerResponseTerminalDisplayIdentity {
307 pub fn parse(raw: impl Into<String>) -> Result<Self, PeerResponseTerminalFactError> {
308 let raw = raw.into();
309 if raw.trim().is_empty() {
310 return Err(PeerResponseTerminalFactError::EmptyDisplayIdentity);
311 }
312 if raw.chars().any(char::is_control) {
313 return Err(PeerResponseTerminalFactError::InvalidDisplayIdentity);
314 }
315 Ok(Self(raw))
316 }
317
318 pub fn as_str(&self) -> &str {
319 &self.0
320 }
321}
322
323impl std::fmt::Display for PeerResponseTerminalDisplayIdentity {
324 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
325 self.0.fmt(f)
326 }
327}
328
329#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
330pub struct PeerResponseTerminalCorrelationId(PeerCorrelationId);
331
332impl PeerResponseTerminalCorrelationId {
333 pub fn parse(raw: impl AsRef<str>) -> Result<Self, PeerResponseTerminalFactError> {
334 let raw = raw.as_ref();
335 if raw.trim().is_empty() {
336 return Err(PeerResponseTerminalFactError::EmptyCorrelationId);
337 }
338 uuid::Uuid::parse_str(raw)
339 .map(|uuid| Self(PeerCorrelationId::from_uuid(uuid)))
340 .map_err(|_| PeerResponseTerminalFactError::InvalidCorrelationId {
341 input: raw.to_string(),
342 })
343 }
344
345 pub const fn from_peer_correlation_id(correlation_id: PeerCorrelationId) -> Self {
346 Self(correlation_id)
347 }
348
349 pub const fn as_peer_correlation_id(self) -> PeerCorrelationId {
350 self.0
351 }
352}
353
354impl std::fmt::Display for PeerResponseTerminalCorrelationId {
355 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
356 self.0.fmt(f)
357 }
358}
359
360#[derive(Debug, Clone, PartialEq)]
361pub struct PeerResponseTerminalRenderPayload(Option<serde_json::Value>);
362
363impl PeerResponseTerminalRenderPayload {
364 pub fn new(payload: Option<serde_json::Value>) -> Self {
365 Self(payload)
366 }
367
368 pub fn as_ref(&self) -> Option<&serde_json::Value> {
369 self.0.as_ref()
370 }
371}
372
373impl From<Option<serde_json::Value>> for PeerResponseTerminalRenderPayload {
374 fn from(payload: Option<serde_json::Value>) -> Self {
375 Self::new(payload)
376 }
377}
378
379#[derive(Debug, Clone, PartialEq, Eq)]
380pub struct PeerResponseTerminalSource {
381 pub transport_identity: Option<PeerResponseTerminalTransportIdentity>,
382 pub route_identity: PeerResponseTerminalRouteIdentity,
383 pub display_identity: PeerResponseTerminalDisplayIdentity,
384}
385
386impl PeerResponseTerminalSource {
387 pub fn new(
388 transport_identity: Option<PeerResponseTerminalTransportIdentity>,
389 route_identity: PeerResponseTerminalRouteIdentity,
390 display_identity: PeerResponseTerminalDisplayIdentity,
391 ) -> Self {
392 Self {
393 transport_identity,
394 route_identity,
395 display_identity,
396 }
397 }
398
399 pub fn parse(
400 transport_identity: Option<impl Into<String>>,
401 route_identity: impl Into<String>,
402 display_identity: impl Into<String>,
403 ) -> Result<Self, PeerResponseTerminalFactError> {
404 Ok(Self::new(
405 transport_identity
406 .map(PeerResponseTerminalTransportIdentity::parse)
407 .transpose()?,
408 PeerResponseTerminalRouteIdentity::parse(route_identity)?,
409 PeerResponseTerminalDisplayIdentity::parse(display_identity)?,
410 ))
411 }
412}
413
414#[derive(Debug, Clone, PartialEq)]
415pub struct PeerResponseTerminalFact {
416 pub source: PeerResponseTerminalSource,
417 pub correlation_id: PeerResponseTerminalCorrelationId,
418 pub status: PeerResponseTerminalProjectionStatus,
419 pub render_payload: PeerResponseTerminalRenderPayload,
420}
421
422impl PeerResponseTerminalFact {
423 pub fn new(
424 source: PeerResponseTerminalSource,
425 correlation_id: PeerResponseTerminalCorrelationId,
426 status: PeerResponseTerminalProjectionStatus,
427 render_payload: PeerResponseTerminalRenderPayload,
428 ) -> Self {
429 Self {
430 source,
431 correlation_id,
432 status,
433 render_payload,
434 }
435 }
436
437 pub fn prompt_text(&self) -> String {
438 format!(
439 "Peer terminal response from {}. Request ID: {}. Status: {}. Result: {}.",
440 self.source.display_identity,
441 self.correlation_id,
442 self.status.label(),
443 format_peer_projection_payload(self.render_payload.as_ref())
444 )
445 }
446
447 pub fn context_key(&self) -> String {
448 peer_response_terminal_context_key(&self.source.route_identity, self.correlation_id)
449 }
450}
451
452#[derive(Debug, Clone, PartialEq)]
453pub enum PeerConversationProjection {
454 Message {
455 peer_id: String,
456 },
457 Request {
458 peer_id: crate::comms::PeerId,
459 display_name: Option<String>,
460 request_id: String,
461 intent: String,
462 payload: Option<serde_json::Value>,
463 },
464 ResponseProgress {
465 peer_id: String,
466 request_id: String,
467 phase: PeerResponseProgressProjectionPhase,
468 payload: Option<serde_json::Value>,
469 },
470 ResponseTerminal {
471 fact: PeerResponseTerminalFact,
472 },
473}
474
475impl PeerConversationProjection {
476 pub fn response_terminal(fact: PeerResponseTerminalFact) -> Self {
477 Self::ResponseTerminal { fact }
478 }
479
480 pub fn block_prefix_text(&self) -> Option<String> {
481 match self {
482 Self::Message { peer_id } => Some(format!("Peer message from {peer_id}")),
483 Self::Request { .. }
484 | Self::ResponseProgress { .. }
485 | Self::ResponseTerminal { .. } => None,
486 }
487 }
488
489 pub fn prompt_text(&self) -> String {
490 match self {
491 Self::Message { .. } => String::new(),
492 Self::Request {
493 peer_id,
494 display_name,
495 request_id,
496 intent,
497 payload,
498 } => {
499 let display_suffix = display_name
500 .as_deref()
501 .map(str::trim)
502 .filter(|name| !name.is_empty())
503 .map(|name| format!(" (display_name: {name})"))
504 .unwrap_or_default();
505 let response_call = crate::interaction::SendResponseCallProjection::new(
506 *peer_id,
507 display_name.as_deref(),
508 request_id.clone(),
509 );
510 format!(
511 "Peer request from peer_id {peer_id}{display_suffix}. Intent: {intent}. Request ID: {request_id}. Params: {}. This is not a normal user request and not a prompt for direct user-facing output. {} Do not use send_message for this reply.",
512 format_peer_projection_payload(payload.as_ref()),
513 response_call.instruction_text()
514 )
515 }
516 Self::ResponseProgress {
517 peer_id,
518 request_id,
519 phase,
520 payload,
521 } => format!(
522 "Peer response progress from {peer_id}. Request ID: {request_id}. Phase: {}. Payload: {}.",
523 phase.label(),
524 format_peer_projection_payload(payload.as_ref())
525 ),
526 Self::ResponseTerminal { fact } => fact.prompt_text(),
527 }
528 }
529
530 pub fn context_key(&self) -> Option<String> {
531 match self {
532 Self::ResponseTerminal { fact } => Some(fact.context_key()),
533 Self::Message { .. } | Self::Request { .. } | Self::ResponseProgress { .. } => None,
534 }
535 }
536}
537
538pub fn peer_response_terminal_context_key(
539 route_identity: &PeerResponseTerminalRouteIdentity,
540 correlation_id: PeerResponseTerminalCorrelationId,
541) -> String {
542 format!("peer_response_terminal:{route_identity}:{correlation_id}")
543}
544
545fn format_peer_projection_payload(payload: Option<&serde_json::Value>) -> String {
546 serde_json::to_string_pretty(payload.unwrap_or(&serde_json::Value::Null))
547 .unwrap_or_else(|_| "null".to_string())
548}
549
550#[derive(Debug, Clone, PartialEq, Eq)]
555pub struct TurnStateSnapshot {
556 pub active_run_id: Option<RunId>,
557 pub loop_state: LoopState,
563 pub turn_phase: TurnPhase,
564 pub primitive_kind: Option<TurnPrimitiveKind>,
567 pub admitted_content_shape: Option<ContentShape>,
568 pub vision_enabled: bool,
569 pub image_tool_results_enabled: bool,
570 pub tool_calls_pending: u64,
571 pub pending_op_refs: BTreeSet<AsyncOpRef>,
572 pub barrier_operation_ids: BTreeSet<OperationId>,
573 pub has_barrier_ops: bool,
574 pub barrier_satisfied: bool,
575 pub boundary_count: u64,
576 pub cancel_after_boundary: bool,
577 pub terminal_outcome: Option<TurnTerminalOutcome>,
580 pub terminal_cause_kind: Option<TurnTerminalCauseKind>,
583 pub extraction_attempts: u64,
584 pub max_extraction_retries: u64,
585 pub llm_retry_attempt: u32,
586 pub llm_retry_max_retries: u32,
587 pub llm_retry_selected_delay_ms: u64,
588}
589
590pub trait TurnStateHandle: Send + Sync {
592 fn start_conversation_run(
593 &self,
594 run_id: RunId,
595 primitive_kind: TurnPrimitiveKind,
596 admitted_content_shape: ContentShape,
597 vision_enabled: bool,
598 image_tool_results_enabled: bool,
599 max_extraction_retries: u64,
600 ) -> Result<(), DslTransitionError>;
601
602 fn start_immediate_append(&self, run_id: RunId) -> Result<(), DslTransitionError>;
603
604 fn start_immediate_context(&self, run_id: RunId) -> Result<(), DslTransitionError>;
605
606 fn primitive_applied(&self) -> Result<(), DslTransitionError>;
607
608 fn llm_returned_tool_calls(&self, tool_count: u64) -> Result<(), DslTransitionError>;
609
610 fn llm_returned_terminal(&self) -> Result<(), DslTransitionError>;
611
612 fn register_pending_ops(
613 &self,
614 op_refs: BTreeSet<AsyncOpRef>,
615 barrier_operation_ids: BTreeSet<OperationId>,
616 ) -> Result<(), DslTransitionError>;
617
618 fn tool_calls_resolved(&self) -> Result<(), DslTransitionError>;
619
620 fn ops_barrier_satisfied(
621 &self,
622 operation_ids: BTreeSet<OperationId>,
623 ) -> Result<(), DslTransitionError>;
624
625 fn boundary_continue(&self) -> Result<(), DslTransitionError>;
626
627 fn boundary_complete(&self) -> Result<(), DslTransitionError>;
628
629 fn enter_extraction(&self, max_retries: u32) -> Result<(), DslTransitionError>;
630
631 fn extraction_start(&self) -> Result<(), DslTransitionError>;
632
633 fn extraction_validation_passed(&self) -> Result<(), DslTransitionError>;
634
635 fn extraction_validation_failed(&self, error: String) -> Result<(), DslTransitionError>;
636
637 fn extraction_failed(&self, error: String) -> Result<(), DslTransitionError>;
638
639 fn recoverable_failure(&self, retry: LlmRetrySchedule) -> Result<(), DslTransitionError>;
640
641 fn fatal_failure(&self, reason: TurnFailureReason) -> Result<(), DslTransitionError>;
642
643 fn retry_requested(&self, retry_attempt: u32) -> Result<(), DslTransitionError>;
644
645 fn cancel_now(&self) -> Result<(), DslTransitionError>;
646
647 fn request_cancel_after_boundary(&self) -> Result<(), DslTransitionError>;
648
649 fn cancellation_observed(&self) -> Result<(), DslTransitionError>;
650
651 fn acknowledge_terminal(&self, outcome: TurnTerminalOutcome) -> Result<(), DslTransitionError>;
652
653 fn turn_limit_reached(&self) -> Result<(), DslTransitionError>;
654
655 fn budget_exhausted(&self) -> Result<(), DslTransitionError>;
656
657 fn time_budget_exceeded(&self) -> Result<(), DslTransitionError>;
658
659 fn force_cancel_no_run(&self) -> Result<(), DslTransitionError>;
660
661 fn run_completed(&self, run_id: RunId) -> Result<(), DslTransitionError>;
662
663 fn run_failed(
664 &self,
665 run_id: RunId,
666 reason: TurnFailureReason,
667 ) -> Result<(), DslTransitionError>;
668
669 fn run_cancelled(&self, run_id: RunId) -> Result<(), DslTransitionError>;
670
671 fn snapshot(&self) -> TurnStateSnapshot;
672}
673
674pub trait CommsDrainHandle: Send + Sync {
683 fn ensure_drain_running(&self) -> Result<(), DslTransitionError>;
685
686 fn spawn_drain(&self, mode: DrainMode) -> Result<(), DslTransitionError>;
688
689 fn stop_drain(&self) -> Result<(), DslTransitionError>;
691
692 fn drain_exited_clean(&self) -> Result<(), DslTransitionError>;
694
695 fn drain_exited_respawnable(&self) -> Result<(), DslTransitionError>;
697
698 fn notify_drain_exited(&self, reason: DrainExitReason) -> Result<(), DslTransitionError>;
700}
701
702#[derive(Debug, Clone, PartialEq, Eq)]
707pub struct SurfaceSnapshot {
708 pub surface_id: String,
709 pub base_state: Option<ExternalToolSurfaceBaseState>,
712 pub pending_op: ExternalToolSurfacePendingOp,
713 pub staged_op: ExternalToolSurfaceStagedOp,
714 pub staged_intent_sequence: Option<u64>,
715 pub pending_task_sequence: Option<u64>,
716 pub pending_lineage_sequence: Option<u64>,
717 pub inflight_calls: u64,
718 pub last_delta_operation: Option<ExternalToolSurfaceDeltaOperation>,
720 pub last_delta_phase: Option<ExternalToolSurfaceDeltaPhase>,
722 pub removal_draining_since_ms: Option<u64>,
723 pub removal_timeout_at_ms: Option<u64>,
724 pub removal_applied_at_turn: Option<u64>,
725}
726
727#[derive(Debug, Clone, PartialEq, Eq)]
728pub struct SurfaceDiagnosticSnapshot {
729 pub surface_phase: ExternalToolSurfaceGlobalPhase,
730 pub known_surfaces: BTreeSet<String>,
731 pub visible_surfaces: BTreeSet<String>,
732 pub snapshot_epoch: u64,
733 pub snapshot_aligned_epoch: u64,
734 pub has_pending_or_staged: bool,
735 pub entries: Vec<SurfaceSnapshot>,
736}
737
738#[derive(Debug, Clone, PartialEq, Eq)]
739pub enum ExternalToolSurfaceInput {
740 StageAdd {
741 surface_id: String,
742 now_ms: u64,
743 },
744 StageRemove {
745 surface_id: String,
746 now_ms: u64,
747 },
748 StageReload {
749 surface_id: String,
750 now_ms: u64,
751 },
752 ApplyBoundary {
753 surface_id: String,
754 now_ms: u64,
755 staged_intent_sequence: u64,
756 applied_at_turn: u64,
757 },
758 MarkPendingSucceeded {
759 surface_id: String,
760 pending_task_sequence: u64,
761 staged_intent_sequence: u64,
762 },
763 MarkPendingFailed {
764 surface_id: String,
765 pending_task_sequence: u64,
766 staged_intent_sequence: u64,
767 cause: ExternalToolSurfaceFailureCause,
768 },
769 CallStarted {
770 surface_id: String,
771 },
772 CallFinished {
773 surface_id: String,
774 },
775 FinalizeRemovalClean {
776 surface_id: String,
777 },
778 FinalizeRemovalForced {
779 surface_id: String,
780 },
781 SnapshotAligned {
782 epoch: u64,
783 },
784 Shutdown,
785}
786
787#[derive(Debug, Clone, PartialEq, Eq)]
788pub enum ExternalToolSurfaceEffect {
789 ScheduleSurfaceCompletion {
790 surface_id: String,
791 operation: ExternalToolSurfaceDeltaOperation,
792 pending_task_sequence: u64,
793 staged_intent_sequence: u64,
794 applied_at_turn: u64,
795 },
796 RefreshVisibleSurfaceSet {
797 snapshot_epoch: u64,
798 },
799 EmitExternalToolDelta {
800 surface_id: String,
801 operation: ExternalToolSurfaceDeltaOperation,
802 phase: ExternalToolSurfaceDeltaPhase,
803 cause: Option<ExternalToolSurfaceFailureCause>,
804 },
805 CloseSurfaceConnection {
806 surface_id: String,
807 },
808 RejectSurfaceCall {
809 surface_id: String,
810 cause: ExternalToolSurfaceFailureCause,
811 },
812}
813
814#[derive(Debug, Clone, PartialEq, Eq)]
815pub struct ExternalToolSurfaceTransition {
816 pub phase: ExternalToolSurfaceGlobalPhase,
817 pub effects: Vec<ExternalToolSurfaceEffect>,
818}
819
820pub trait ExternalToolSurfaceHandle: Send + Sync {
822 fn apply_surface_input(
823 &self,
824 input: ExternalToolSurfaceInput,
825 ) -> Result<ExternalToolSurfaceTransition, DslTransitionError>;
826
827 fn register(&self, surface_id: String) -> Result<(), DslTransitionError>;
828
829 fn stage_add(&self, surface_id: String, now_ms: u64) -> Result<(), DslTransitionError>;
830
831 fn stage_remove(&self, surface_id: String, now_ms: u64) -> Result<(), DslTransitionError>;
832
833 fn stage_reload(&self, surface_id: String, now_ms: u64) -> Result<(), DslTransitionError>;
834
835 fn apply_boundary(
836 &self,
837 surface_id: String,
838 now_ms: u64,
839 staged_intent_sequence: u64,
840 applied_at_turn: u64,
841 ) -> Result<(), DslTransitionError>;
842
843 fn mark_pending_succeeded(
844 &self,
845 surface_id: String,
846 pending_task_sequence: u64,
847 staged_intent_sequence: u64,
848 ) -> Result<(), DslTransitionError>;
849
850 fn mark_pending_failed(
851 &self,
852 surface_id: String,
853 pending_task_sequence: u64,
854 staged_intent_sequence: u64,
855 cause: ExternalToolSurfaceFailureCause,
856 ) -> Result<(), DslTransitionError>;
857
858 fn call_started(&self, surface_id: String) -> Result<(), DslTransitionError>;
859
860 fn call_finished(&self, surface_id: String) -> Result<(), DslTransitionError>;
861
862 fn finalize_removal_clean(&self, surface_id: String) -> Result<(), DslTransitionError>;
863
864 fn finalize_removal_forced(&self, surface_id: String) -> Result<(), DslTransitionError>;
865
866 fn snapshot_aligned(&self, epoch: u64) -> Result<(), DslTransitionError>;
867
868 fn shutdown_surface(&self) -> Result<(), DslTransitionError>;
869
870 fn surface_snapshot(&self, surface_id: &str) -> Option<SurfaceSnapshot>;
871
872 fn diagnostic_snapshot(&self) -> SurfaceDiagnosticSnapshot;
873
874 fn visible_surfaces(&self) -> BTreeSet<String>;
875
876 fn removing_surfaces(&self) -> BTreeSet<String>;
877
878 fn pending_surfaces(&self) -> BTreeSet<String>;
879
880 fn has_pending_or_staged(&self) -> bool;
881
882 fn snapshot_epoch(&self) -> u64;
883
884 fn snapshot_aligned_epoch(&self) -> u64;
885}
886
887pub trait PeerCommsHandle: Send + Sync {
901 fn classify_external_envelope(
904 &self,
905 facts: PeerIngressEnvelopeFacts,
906 ) -> Result<PeerIngressAdmission, DslTransitionError>;
907
908 fn classify_plain_event(
911 &self,
912 facts: PeerIngressPlainEventFacts,
913 ) -> Result<PeerIngressAdmission, DslTransitionError>;
914
915 fn set_peer_ingress_context(&self, keep_alive: bool) -> Result<(), DslTransitionError>;
917}
918
919pub trait SessionAdmissionHandle: Send + Sync {
932 fn ingest(
939 &self,
940 runtime_id: &str,
941 work_id: &str,
942 origin: InputSource,
943 ) -> Result<(), DslTransitionError>;
944
945 fn accept_with_completion(
956 &self,
957 input_id: &InputId,
958 request_immediate_processing: bool,
959 interrupt_yielding: bool,
960 wake_if_idle: bool,
961 ) -> Result<(), DslTransitionError>;
962
963 fn accept_without_wake(&self, input_id: &InputId) -> Result<(), DslTransitionError>;
965
966 fn prepare(&self, run_id: &RunId) -> Result<(), DslTransitionError>;
968
969 fn commit(&self, input_id: &InputId, run_id: &RunId) -> Result<(), DslTransitionError>;
972}
973
974#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
980pub struct LeaseKey {
981 pub realm: crate::connection::RealmId,
982 pub binding: crate::connection::BindingId,
983 pub profile: Option<crate::connection::ProfileId>,
984}
985
986impl LeaseKey {
987 pub fn new(
988 realm: crate::connection::RealmId,
989 binding: crate::connection::BindingId,
990 profile: Option<crate::connection::ProfileId>,
991 ) -> Self {
992 Self {
993 realm,
994 binding,
995 profile,
996 }
997 }
998
999 pub fn from_auth_binding(auth_binding: &crate::connection::AuthBindingRef) -> Self {
1000 Self {
1001 realm: auth_binding.realm.clone(),
1002 binding: auth_binding.binding.clone(),
1003 profile: auth_binding.profile.clone(),
1004 }
1005 }
1006}
1007
1008impl std::fmt::Display for LeaseKey {
1009 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1010 match &self.profile {
1011 Some(profile) => write!(f, "{}:{}:{}", self.realm, self.binding, profile),
1012 None => write!(f, "{}:{}", self.realm, self.binding),
1013 }
1014 }
1015}
1016
1017#[derive(Debug, Clone, PartialEq, Eq)]
1034pub struct AuthLeaseSnapshot {
1035 pub phase: Option<AuthLeasePhase>,
1036 pub expires_at: Option<u64>,
1037 pub credential_present: bool,
1038 pub generation: u64,
1039 pub credential_published_at_millis: Option<u64>,
1040}
1041
1042#[derive(Debug, Clone, Copy, PartialEq, Eq)]
1050pub struct AuthLeaseTransition {
1051 pub generation: u64,
1052 pub credential_published_at_millis: Option<u64>,
1053}
1054
1055impl AuthLeaseTransition {
1056 pub fn new(generation: u64, credential_published_at_millis: Option<u64>) -> Self {
1057 Self {
1058 generation,
1059 credential_published_at_millis,
1060 }
1061 }
1062}
1063
1064pub const AUTH_LEASE_TTL_REFRESH_WINDOW_SECS: u64 = 60;
1076
1077pub trait AuthLeaseHandle: Send + Sync + std::any::Any {
1079 fn acquire_lease(
1084 &self,
1085 lease_key: &LeaseKey,
1086 expires_at: u64,
1087 ) -> Result<AuthLeaseTransition, DslTransitionError>;
1088
1089 fn mark_expiring(&self, lease_key: &LeaseKey) -> Result<(), DslTransitionError>;
1091
1092 fn begin_refresh(&self, lease_key: &LeaseKey) -> Result<(), DslTransitionError>;
1100
1101 fn complete_refresh(
1105 &self,
1106 lease_key: &LeaseKey,
1107 new_expires_at: u64,
1108 now: u64,
1109 ) -> Result<AuthLeaseTransition, DslTransitionError>;
1110
1111 fn refresh_failed(
1115 &self,
1116 lease_key: &LeaseKey,
1117 permanent: bool,
1118 ) -> Result<(), DslTransitionError>;
1119
1120 fn mark_reauth_required(&self, lease_key: &LeaseKey) -> Result<(), DslTransitionError>;
1122
1123 fn release_lease(&self, lease_key: &LeaseKey) -> Result<(), DslTransitionError>;
1126
1127 fn release_credential_lifecycle(&self, lease_key: &LeaseKey) -> Result<(), DslTransitionError> {
1134 self.release_lease(lease_key)
1135 }
1136
1137 fn restore_auth_lifecycle_snapshot(
1144 &self,
1145 lease_key: &LeaseKey,
1146 snapshot: &AuthLeaseSnapshot,
1147 expires_at: Option<u64>,
1148 ) -> Result<(), DslTransitionError> {
1149 if !snapshot.credential_present {
1150 return Ok(());
1151 }
1152 let Some(phase) = snapshot.phase else {
1153 return Ok(());
1154 };
1155 if phase == AuthLeasePhase::Released {
1156 return Ok(());
1157 }
1158 let Some(expires_at) = expires_at else {
1159 return Ok(());
1160 };
1161 self.acquire_lease(lease_key, expires_at)?;
1162 match phase {
1163 AuthLeasePhase::Valid => Ok(()),
1164 AuthLeasePhase::Expiring => self.mark_expiring(lease_key),
1165 AuthLeasePhase::Refreshing => self.begin_refresh(lease_key),
1166 AuthLeasePhase::ReauthRequired => self.mark_reauth_required(lease_key),
1167 AuthLeasePhase::Released => Ok(()),
1168 }
1169 }
1170
1171 fn snapshot(&self, lease_key: &LeaseKey) -> AuthLeaseSnapshot;
1173}
1174
1175pub trait McpServerLifecycleHandle: Send + Sync {
1196 fn apply_connect_pending(&self, server_id: &str) -> Result<(), DslTransitionError>;
1199
1200 fn apply_connected(&self, server_id: &str) -> Result<(), DslTransitionError>;
1202
1203 fn apply_failed(&self, server_id: &str, error: &str) -> Result<(), DslTransitionError>;
1205
1206 fn apply_disconnected(&self, server_id: &str) -> Result<(), DslTransitionError>;
1208
1209 fn apply_reload(&self, server_id: &str) -> Result<(), DslTransitionError>;
1212
1213 fn pending_server_ids(&self) -> BTreeSet<String>;
1218}
1219
1220#[derive(Debug, Clone, Copy, PartialEq, Eq)]
1229#[non_exhaustive]
1230pub enum PeerTerminalDisposition {
1231 Completed,
1233 Failed,
1235}
1236
1237pub trait PeerInteractionHandle: Send + Sync {
1251 fn request_sent(
1255 &self,
1256 corr_id: PeerCorrelationId,
1257 to: String,
1258 ) -> Result<(), DslTransitionError>;
1259
1260 fn response_progress(&self, corr_id: PeerCorrelationId) -> Result<(), DslTransitionError>;
1266
1267 fn response_terminal(
1274 &self,
1275 corr_id: PeerCorrelationId,
1276 disposition: PeerTerminalDisposition,
1277 ) -> Result<(), DslTransitionError>;
1278
1279 fn request_timed_out(&self, corr_id: PeerCorrelationId) -> Result<(), DslTransitionError>;
1285
1286 fn request_received(&self, corr_id: PeerCorrelationId) -> Result<(), DslTransitionError>;
1290
1291 fn response_replied(&self, corr_id: PeerCorrelationId) -> Result<(), DslTransitionError>;
1295
1296 fn outbound_state(&self, corr_id: PeerCorrelationId) -> Option<OutboundPeerRequestState>;
1300
1301 fn inbound_state(&self, corr_id: PeerCorrelationId) -> Option<InboundPeerRequestState>;
1303
1304 fn install_cleanup_observer(&self, observer: Arc<dyn PeerInteractionCleanupObserver>);
1313}
1314
1315pub trait PeerInteractionCleanupObserver: Send + Sync {
1325 fn on_peer_interaction_cleanup(&self, corr_id: PeerCorrelationId);
1332}
1333
1334pub trait SessionContextHandle: Send + Sync {
1349 fn context_advanced(&self, updated_at_ms: u64) -> Result<bool, DslTransitionError>;
1357
1358 fn current_watermark_ms(&self) -> u64;
1366
1367 fn install_observer(&self, observer: Arc<dyn SessionContextAdvancedObserver>);
1371
1372 fn install_observer_with_baseline(
1386 &self,
1387 observer: Arc<dyn SessionContextAdvancedObserver>,
1388 ) -> u64 {
1389 self.install_observer(observer);
1393 self.current_watermark_ms()
1394 }
1395}
1396
1397pub trait SessionContextAdvancedObserver: Send + Sync {
1407 fn on_session_context_advanced(&self, updated_at_ms: u64);
1411}
1412
1413#[derive(Debug, Clone, PartialEq, Eq, thiserror::Error)]
1419pub enum SessionClaimError {
1420 #[error("session identity already claimed: {0}")]
1422 SessionIdentityInUse(SessionId),
1423}
1424
1425pub struct SessionClaim {
1431 session_id: SessionId,
1432 handle: Arc<dyn SessionClaimHandle>,
1433}
1434
1435impl SessionClaim {
1436 pub fn new(session_id: SessionId, handle: Arc<dyn SessionClaimHandle>) -> Self {
1440 Self { session_id, handle }
1441 }
1442
1443 pub fn session_id(&self) -> &SessionId {
1445 &self.session_id
1446 }
1447}
1448
1449impl Drop for SessionClaim {
1450 fn drop(&mut self) {
1451 self.handle.release(&self.session_id);
1452 }
1453}
1454
1455impl std::fmt::Debug for SessionClaim {
1456 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1457 f.debug_struct("SessionClaim")
1458 .field("session_id", &self.session_id)
1459 .finish_non_exhaustive()
1460 }
1461}
1462
1463pub trait SessionClaimHandle: Send + Sync {
1472 fn try_acquire(
1480 self: Arc<Self>,
1481 session_id: &SessionId,
1482 ) -> Result<SessionClaim, SessionClaimError>;
1483
1484 fn release(&self, session_id: &SessionId);
1490}
1491
1492pub struct DefaultSessionClaimRegistry {
1498 claims: std::sync::Mutex<std::collections::HashSet<SessionId>>,
1499}
1500
1501impl DefaultSessionClaimRegistry {
1502 pub fn new() -> Self {
1504 Self {
1505 claims: std::sync::Mutex::new(std::collections::HashSet::new()),
1506 }
1507 }
1508
1509 pub fn global() -> Arc<Self> {
1511 use std::sync::OnceLock;
1512 static GLOBAL: OnceLock<Arc<DefaultSessionClaimRegistry>> = OnceLock::new();
1513 Arc::clone(GLOBAL.get_or_init(|| Arc::new(DefaultSessionClaimRegistry::new())))
1514 }
1515}
1516
1517impl Default for DefaultSessionClaimRegistry {
1518 fn default() -> Self {
1519 Self::new()
1520 }
1521}
1522
1523impl SessionClaimHandle for DefaultSessionClaimRegistry {
1524 fn try_acquire(
1525 self: Arc<Self>,
1526 session_id: &SessionId,
1527 ) -> Result<SessionClaim, SessionClaimError> {
1528 let mut claims = self
1529 .claims
1530 .lock()
1531 .unwrap_or_else(std::sync::PoisonError::into_inner);
1532 if !claims.insert(session_id.clone()) {
1533 return Err(SessionClaimError::SessionIdentityInUse(session_id.clone()));
1534 }
1535 drop(claims);
1536 Ok(SessionClaim::new(
1537 session_id.clone(),
1538 self as Arc<dyn SessionClaimHandle>,
1539 ))
1540 }
1541
1542 fn release(&self, session_id: &SessionId) {
1543 let mut claims = self
1544 .claims
1545 .lock()
1546 .unwrap_or_else(std::sync::PoisonError::into_inner);
1547 claims.remove(session_id);
1548 }
1549}
1550
1551pub trait InteractionStreamHandle: Send + Sync {
1569 fn reserved(&self, corr_id: PeerCorrelationId) -> Result<(), DslTransitionError>;
1575
1576 fn attached(&self, corr_id: PeerCorrelationId) -> Result<(), DslTransitionError>;
1581
1582 fn completed(&self, corr_id: PeerCorrelationId) -> Result<(), DslTransitionError>;
1586
1587 fn expired(&self, corr_id: PeerCorrelationId) -> Result<(), DslTransitionError>;
1591
1592 fn closed_early(&self, corr_id: PeerCorrelationId) -> Result<(), DslTransitionError>;
1596
1597 fn state(&self, corr_id: PeerCorrelationId) -> Option<InteractionStreamState>;
1604
1605 fn install_cleanup_observer(&self, observer: Arc<dyn InteractionStreamCleanupObserver>);
1610}
1611
1612pub trait InteractionStreamCleanupObserver: Send + Sync {
1621 fn on_interaction_stream_cleanup(&self, corr_id: PeerCorrelationId);
1627}
1628
1629#[cfg(test)]
1630#[allow(clippy::expect_used, clippy::panic, clippy::unwrap_used)]
1631mod tests {
1632 use super::{
1633 ExternalToolSurfaceEffect, ExternalToolSurfaceFailureCause, ExternalToolSurfaceInput,
1634 PeerConversationProjection, PeerResponseProgressProjectionPhase,
1635 PeerResponseTerminalCorrelationId, PeerResponseTerminalDisplayIdentity,
1636 PeerResponseTerminalFact, PeerResponseTerminalFactError,
1637 PeerResponseTerminalProjectionStatus, PeerResponseTerminalRenderPayload,
1638 PeerResponseTerminalRouteIdentity, PeerResponseTerminalSource,
1639 PeerResponseTerminalTransportIdentity, peer_response_terminal_context_key,
1640 };
1641 use crate::tool_scope::{ExternalToolSurfaceDeltaOperation, ExternalToolSurfaceDeltaPhase};
1642
1643 #[test]
1644 fn external_tool_surface_pending_failure_cause_projects_external_code() {
1645 let input = ExternalToolSurfaceInput::MarkPendingFailed {
1646 surface_id: "alpha".to_owned(),
1647 pending_task_sequence: 7,
1648 staged_intent_sequence: 11,
1649 cause: ExternalToolSurfaceFailureCause::PendingFailed,
1650 };
1651
1652 let ExternalToolSurfaceInput::MarkPendingFailed { cause, .. } = input else {
1653 panic!("constructed MarkPendingFailed input");
1654 };
1655 assert_eq!(cause, ExternalToolSurfaceFailureCause::PendingFailed);
1656 assert_eq!(cause.as_str(), "pending_failed");
1657 assert_eq!(
1658 serde_json::to_value(cause).expect("serialize failure cause"),
1659 serde_json::json!("pending_failed")
1660 );
1661
1662 let effect = ExternalToolSurfaceEffect::EmitExternalToolDelta {
1663 surface_id: "alpha".to_owned(),
1664 operation: ExternalToolSurfaceDeltaOperation::Add,
1665 phase: ExternalToolSurfaceDeltaPhase::Failed,
1666 cause: Some(cause),
1667 };
1668 assert!(matches!(
1669 effect,
1670 ExternalToolSurfaceEffect::EmitExternalToolDelta {
1671 cause: Some(ExternalToolSurfaceFailureCause::PendingFailed),
1672 ..
1673 }
1674 ));
1675 }
1676
1677 #[test]
1678 fn peer_terminal_projection_owns_prompt_and_context_key() {
1679 let route_id = "550e8400-e29b-41d4-a716-446655440000";
1680 let route_identity =
1681 PeerResponseTerminalRouteIdentity::parse(route_id).expect("route identity");
1682 let correlation_id =
1683 PeerResponseTerminalCorrelationId::parse("018f6f79-7a82-7c4e-a552-a3b86f9630f1")
1684 .expect("correlation id");
1685 let projection = PeerConversationProjection::ResponseTerminal {
1686 fact: PeerResponseTerminalFact::new(
1687 PeerResponseTerminalSource::new(
1688 Some(
1689 PeerResponseTerminalTransportIdentity::parse("transport-runtime-1")
1690 .expect("transport identity"),
1691 ),
1692 route_identity,
1693 PeerResponseTerminalDisplayIdentity::parse("Analyst")
1694 .expect("display identity"),
1695 ),
1696 correlation_id,
1697 PeerResponseTerminalProjectionStatus::Completed,
1698 PeerResponseTerminalRenderPayload::new(Some(serde_json::json!({
1699 "request_intent": "checksum_token",
1700 "request_subject": "alpha beta gamma",
1701 "token": "birch seventeen"
1702 }))),
1703 ),
1704 };
1705
1706 assert_eq!(
1707 projection.context_key().as_deref(),
1708 Some(
1709 "peer_response_terminal:550e8400-e29b-41d4-a716-446655440000:018f6f79-7a82-7c4e-a552-a3b86f9630f1"
1710 )
1711 );
1712 assert_eq!(
1713 projection.prompt_text(),
1714 "Peer terminal response from Analyst. Request ID: 018f6f79-7a82-7c4e-a552-a3b86f9630f1. Status: completed. Result: {\n \"request_intent\": \"checksum_token\",\n \"request_subject\": \"alpha beta gamma\",\n \"token\": \"birch seventeen\"\n}."
1715 );
1716 }
1717
1718 #[test]
1719 fn peer_progress_projection_formats_phase_from_shared_seam() {
1720 let projection = PeerConversationProjection::ResponseProgress {
1721 peer_id: "operator-rt".into(),
1722 request_id: "req-789".into(),
1723 phase: PeerResponseProgressProjectionPhase::PartialResult,
1724 payload: Some(serde_json::json!({ "chunk": "alpha" })),
1725 };
1726
1727 assert_eq!(projection.context_key(), None);
1728 assert_eq!(
1729 projection.prompt_text(),
1730 "Peer response progress from operator-rt. Request ID: req-789. Phase: partial_result. Payload: {\n \"chunk\": \"alpha\"\n}."
1731 );
1732 }
1733
1734 #[test]
1735 fn peer_terminal_context_key_helper_stays_canonical() {
1736 let route_id = "550e8400-e29b-41d4-a716-446655440000";
1737 let route_identity =
1738 PeerResponseTerminalRouteIdentity::parse(route_id).expect("route identity");
1739 let correlation_id =
1740 PeerResponseTerminalCorrelationId::parse("018f6f79-7a82-7c4e-a552-a3b86f9630f1")
1741 .expect("correlation id");
1742 assert_eq!(
1743 peer_response_terminal_context_key(&route_identity, correlation_id),
1744 "peer_response_terminal:550e8400-e29b-41d4-a716-446655440000:018f6f79-7a82-7c4e-a552-a3b86f9630f1"
1745 );
1746 }
1747
1748 #[test]
1749 fn peer_terminal_route_identity_rejects_display_name_alias() {
1750 assert!(matches!(
1751 PeerResponseTerminalRouteIdentity::parse("analyst-rt"),
1752 Err(PeerResponseTerminalFactError::InvalidRouteIdentity)
1753 ));
1754 }
1755}