meerkat_core/handles.rs
1//! Cross-crate DSL handle traits.
2//!
3//! Downstream crates (`meerkat-mcp`, `meerkat-comms`, `meerkat-session`) drive
4//! DSL transitions through these trait objects without importing
5//! `meerkat-runtime`. Concrete impls live in `meerkat-runtime`, where the DSL
6//! authority lives.
7//!
8//! The mob side (`meerkat-mob`) already depends on `meerkat-runtime` and owns
9//! its MobMachine DSL authority in-crate, so it drives DSL transitions via
10//! direct `dsl_authority.apply(...)` calls — no cross-crate trait required.
11//!
12//! Trait methods are named per-DSL input, not per-authority input.
13//! DSL-owned discriminants (turn phase, drain mode, surface phase, surface
14//! pending/staged op, auth lease phase) flow as typed enums defined here in
15//! `meerkat-core` — each maps 1-to-1 with the typed DSL state that
//! `meerkat_runtime::meerkat_machine` owns. Free-form `String` values are
17//! reserved for opaque identifiers (surface ids, binding keys, error
18//! messages).
19//!
20//! Return type is `Result<(), DslTransitionError>`. The DSL decides legality;
21//! phase/field reads happen elsewhere (direct DSL state accessors, not via
22//! these traits).
23
24use std::collections::BTreeSet;
25use std::sync::Arc;
26
27use crate::LoopState;
28use crate::comms::InputSource;
29use crate::interaction::{
30 PeerIngressAdmission, PeerIngressEnvelopeFacts, PeerIngressPlainEventFacts,
31};
32use crate::lifecycle::run_primitive::ModelId;
33use crate::lifecycle::{InputId, RunId};
34use crate::ops::{AsyncOpRef, OperationId};
35use crate::peer_correlation::{
36 InboundPeerRequestState, InteractionStreamState, OutboundPeerRequestState, PeerCorrelationId,
37};
38use crate::retry::LlmRetrySchedule;
39use crate::tool_scope::{
40 ExternalToolSurfaceBaseState, ExternalToolSurfaceDeltaOperation, ExternalToolSurfaceDeltaPhase,
41 ExternalToolSurfaceFailureCause, ExternalToolSurfaceGlobalPhase, ExternalToolSurfacePendingOp,
42 ExternalToolSurfaceStagedOp,
43};
44use crate::turn_execution_authority::{
45 ContentShape, TurnFailureReason, TurnPhase, TurnPrimitiveKind, TurnTerminalCauseKind,
46 TurnTerminalOutcome,
47};
48use crate::types::SessionId;
49
50// ---------------------------------------------------------------------------
51// Typed cross-crate enums for DSL-owned discriminants.
52//
53// Each maps 1-to-1 with the typed DSL state that meerkat-runtime's
54// MeerkatMachine / AuthMachine own. The runtime handle impls do a single
55// exhaustive `match` from DSL-typed to handle-typed — no string parsing,
56// no `_ => default` arms, no parallel adapters.
57// ---------------------------------------------------------------------------
58
59/// Mode for a comms drain task.
60#[derive(Debug, Clone, Copy, PartialEq, Eq)]
61pub enum DrainMode {
62 /// Legacy timed drain with idle timeout.
63 Timed,
64 /// Live session ingress while a runtime-backed session is attached.
65 AttachedSession,
66 /// Long-lived host drain (no idle timeout, respawnable on failure).
67 PersistentHost,
68}
69
70/// Reason a drain task exited.
71#[derive(Debug, Clone, Copy, PartialEq, Eq)]
72pub enum DrainExitReason {
73 IdleTimeout,
74 Dismissed,
75 Failed,
76 Aborted,
77 SessionShutdown,
78}
79
80/// Session model-routing baseline handle.
81///
82/// Runtime-backed surfaces create sessions before the factory has resolved the
83/// final LLM identity. The factory uses this handle after resolution so the DSL
84/// owns the canonical baseline model before tools such as `generate_image`
85/// resolve `Auto` provider targets.
86pub trait ModelRoutingHandle: Send + Sync {
87 /// Set the session's canonical model-routing baseline.
88 fn set_baseline(
89 &self,
90 baseline_model: ModelId,
91 realtime_capable: bool,
92 ) -> Result<(), DslTransitionError>;
93}
94
95impl DrainExitReason {
96 /// Stable discriminant for wire logging (drain exit reason is not yet a
97 /// typed DSL field; the handle passes the discriminant through).
98 pub const fn as_str(self) -> &'static str {
99 match self {
100 Self::IdleTimeout => "IdleTimeout",
101 Self::Dismissed => "Dismissed",
102 Self::Failed => "Failed",
103 Self::Aborted => "Aborted",
104 Self::SessionShutdown => "SessionShutdown",
105 }
106 }
107}
108
109/// Auth lease lifecycle phase, projected from the per-binding AuthMachine.
110#[derive(Debug, Clone, Copy, PartialEq, Eq)]
111pub enum AuthLeasePhase {
112 Valid,
113 Expiring,
114 Refreshing,
115 ReauthRequired,
116 Released,
117}
118
119/// Typed classification of why a DSL transition was rejected.
120///
121/// Emitted by the generated kernel's `apply` / `apply_signal` methods and
122/// bridged into [`DslTransitionError::kind`]. Callers that fire
123/// idempotently (realtime dispatchers, monotonic watermark advances,
124/// etc.) inspect this to distinguish "input was out of scope for this
125/// phase" (a real error) from "input was recognised but the guard dropped
126/// it" (a successful no-op).
127#[derive(Debug, Clone, Copy, PartialEq, Eq)]
128pub enum DslRejectionKind {
129 /// No transition is declared for this `(phase, trigger)` pair — the
130 /// shell fired an input that is semantically out of scope for the
131 /// current phase. This is a programming mistake on the shell side.
132 NoMatchingTransition,
133 /// A transition is declared for this `(phase, trigger)` pair but
134 /// every candidate transition's guard evaluated false. Callers
135 /// firing idempotently treat this as a no-op; callers firing
136 /// unconditionally treat it as a user-visible error.
137 GuardRejected,
138}
139
140/// Error surfaced when a DSL transition is rejected.
141///
142/// Wraps the generated kernel's typed rejection. Trait impls populate
143/// `context` from the trait method name so callers can tell which handle
144/// rejected; `kind` lets callers distinguish guard rejection from
145/// out-of-scope input without substring-matching the rendered message.
146#[derive(Debug, Clone, PartialEq, Eq, thiserror::Error)]
147#[error("DSL transition rejected in {context}: {reason}")]
148pub struct DslTransitionError {
149 /// Name of the trait method / DSL variant whose transition was rejected.
150 pub context: &'static str,
151 /// Typed classification of the rejection — see [`DslRejectionKind`].
152 pub kind: DslRejectionKind,
153 /// Underlying rejection reason (typically the generated
154 /// `NoMatchingTransition`/`GuardRejected` formatted).
155 pub reason: String,
156}
157
158impl DslTransitionError {
159 /// Construct a `NoMatchingTransition` error with the given context
160 /// and reason. Back-compat constructor used by legacy callers that
161 /// haven't adopted the typed `kind` field; new code paths should
162 /// prefer [`DslTransitionError::no_matching`] or
163 /// [`DslTransitionError::guard_rejected`] for clarity.
164 pub fn new(context: &'static str, reason: impl Into<String>) -> Self {
165 Self::no_matching(context, reason)
166 }
167
168 /// Construct an error with `kind = NoMatchingTransition`.
169 pub fn no_matching(context: &'static str, reason: impl Into<String>) -> Self {
170 Self {
171 context,
172 kind: DslRejectionKind::NoMatchingTransition,
173 reason: reason.into(),
174 }
175 }
176
177 /// Construct an error with `kind = GuardRejected`.
178 pub fn guard_rejected(context: &'static str, reason: impl Into<String>) -> Self {
179 Self {
180 context,
181 kind: DslRejectionKind::GuardRejected,
182 reason: reason.into(),
183 }
184 }
185
186 /// True iff this rejection came from a guard evaluating false.
187 pub fn is_guard_rejected(&self) -> bool {
188 self.kind == DslRejectionKind::GuardRejected
189 }
190}
191
192// ---------------------------------------------------------------------------
193// Cross-crate peer prompt/context projection seam
194// ---------------------------------------------------------------------------
195
196#[derive(Debug, Clone, Copy, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
197#[serde(rename_all = "snake_case")]
198pub enum PeerResponseProgressProjectionPhase {
199 Accepted,
200 InProgress,
201 PartialResult,
202}
203
204impl PeerResponseProgressProjectionPhase {
205 fn label(self) -> &'static str {
206 match self {
207 Self::Accepted => "accepted",
208 Self::InProgress => "in_progress",
209 Self::PartialResult => "partial_result",
210 }
211 }
212}
213
214#[derive(Debug, Clone, Copy, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
215#[serde(rename_all = "snake_case")]
216pub enum PeerResponseTerminalProjectionStatus {
217 Completed,
218 Failed,
219 Cancelled,
220}
221
222impl PeerResponseTerminalProjectionStatus {
223 fn label(self) -> &'static str {
224 match self {
225 Self::Completed => "completed",
226 Self::Failed => "failed",
227 Self::Cancelled => "cancelled",
228 }
229 }
230}
231
232#[derive(Debug, Clone, PartialEq, Eq, thiserror::Error)]
233pub enum PeerResponseTerminalFactError {
234 #[error("transport identity cannot be empty")]
235 EmptyTransportIdentity,
236 #[error("route identity cannot be empty")]
237 EmptyRouteIdentity,
238 #[error("route identity must be a canonical peer UUID")]
239 InvalidRouteIdentity,
240 #[error("display identity is required")]
241 MissingDisplayIdentity,
242 #[error("display identity cannot be empty")]
243 EmptyDisplayIdentity,
244 #[error("display identity cannot contain control characters")]
245 InvalidDisplayIdentity,
246 #[error("correlation id cannot be empty")]
247 EmptyCorrelationId,
248 #[error("correlation id must be a UUID: {input}")]
249 InvalidCorrelationId { input: String },
250}
251
252#[derive(Debug, Clone, PartialEq, Eq)]
253pub struct PeerResponseTerminalTransportIdentity(String);
254
255impl PeerResponseTerminalTransportIdentity {
256 pub fn parse(raw: impl Into<String>) -> Result<Self, PeerResponseTerminalFactError> {
257 let raw = raw.into();
258 if raw.trim().is_empty() {
259 return Err(PeerResponseTerminalFactError::EmptyTransportIdentity);
260 }
261 Ok(Self(raw))
262 }
263
264 pub fn as_str(&self) -> &str {
265 &self.0
266 }
267}
268
269impl std::fmt::Display for PeerResponseTerminalTransportIdentity {
270 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
271 self.0.fmt(f)
272 }
273}
274
275#[derive(Debug, Clone, PartialEq, Eq)]
276pub struct PeerResponseTerminalRouteIdentity(String);
277
278impl PeerResponseTerminalRouteIdentity {
279 pub fn parse(raw: impl Into<String>) -> Result<Self, PeerResponseTerminalFactError> {
280 let raw = raw.into();
281 if raw.trim().is_empty() {
282 return Err(PeerResponseTerminalFactError::EmptyRouteIdentity);
283 }
284 if raw.chars().any(char::is_control) {
285 return Err(PeerResponseTerminalFactError::InvalidRouteIdentity);
286 }
287 let peer_id = crate::comms::PeerId::parse(raw.trim())
288 .map_err(|_| PeerResponseTerminalFactError::InvalidRouteIdentity)?;
289 Ok(Self(peer_id.to_string()))
290 }
291
292 pub fn as_str(&self) -> &str {
293 &self.0
294 }
295}
296
297impl std::fmt::Display for PeerResponseTerminalRouteIdentity {
298 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
299 self.0.fmt(f)
300 }
301}
302
303#[derive(Debug, Clone, PartialEq, Eq)]
304pub struct PeerResponseTerminalDisplayIdentity(String);
305
306impl PeerResponseTerminalDisplayIdentity {
307 pub fn parse(raw: impl Into<String>) -> Result<Self, PeerResponseTerminalFactError> {
308 let raw = raw.into();
309 if raw.trim().is_empty() {
310 return Err(PeerResponseTerminalFactError::EmptyDisplayIdentity);
311 }
312 if raw.chars().any(char::is_control) {
313 return Err(PeerResponseTerminalFactError::InvalidDisplayIdentity);
314 }
315 Ok(Self(raw))
316 }
317
318 pub fn as_str(&self) -> &str {
319 &self.0
320 }
321}
322
323impl std::fmt::Display for PeerResponseTerminalDisplayIdentity {
324 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
325 self.0.fmt(f)
326 }
327}
328
329#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
330pub struct PeerResponseTerminalCorrelationId(PeerCorrelationId);
331
332impl PeerResponseTerminalCorrelationId {
333 pub fn parse(raw: impl AsRef<str>) -> Result<Self, PeerResponseTerminalFactError> {
334 let raw = raw.as_ref();
335 if raw.trim().is_empty() {
336 return Err(PeerResponseTerminalFactError::EmptyCorrelationId);
337 }
338 uuid::Uuid::parse_str(raw)
339 .map(|uuid| Self(PeerCorrelationId::from_uuid(uuid)))
340 .map_err(|_| PeerResponseTerminalFactError::InvalidCorrelationId {
341 input: raw.to_string(),
342 })
343 }
344
345 pub const fn from_peer_correlation_id(correlation_id: PeerCorrelationId) -> Self {
346 Self(correlation_id)
347 }
348
349 pub const fn as_peer_correlation_id(self) -> PeerCorrelationId {
350 self.0
351 }
352}
353
354impl std::fmt::Display for PeerResponseTerminalCorrelationId {
355 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
356 self.0.fmt(f)
357 }
358}
359
360#[derive(Debug, Clone, PartialEq)]
361pub struct PeerResponseTerminalRenderPayload(Option<serde_json::Value>);
362
363impl PeerResponseTerminalRenderPayload {
364 pub fn new(payload: Option<serde_json::Value>) -> Self {
365 Self(payload)
366 }
367
368 pub fn as_ref(&self) -> Option<&serde_json::Value> {
369 self.0.as_ref()
370 }
371}
372
373impl From<Option<serde_json::Value>> for PeerResponseTerminalRenderPayload {
374 fn from(payload: Option<serde_json::Value>) -> Self {
375 Self::new(payload)
376 }
377}
378
379#[derive(Debug, Clone, PartialEq, Eq)]
380pub struct PeerResponseTerminalSource {
381 pub transport_identity: Option<PeerResponseTerminalTransportIdentity>,
382 pub route_identity: PeerResponseTerminalRouteIdentity,
383 pub display_identity: PeerResponseTerminalDisplayIdentity,
384}
385
386impl PeerResponseTerminalSource {
387 pub fn new(
388 transport_identity: Option<PeerResponseTerminalTransportIdentity>,
389 route_identity: PeerResponseTerminalRouteIdentity,
390 display_identity: PeerResponseTerminalDisplayIdentity,
391 ) -> Self {
392 Self {
393 transport_identity,
394 route_identity,
395 display_identity,
396 }
397 }
398
399 pub fn parse(
400 transport_identity: Option<impl Into<String>>,
401 route_identity: impl Into<String>,
402 display_identity: impl Into<String>,
403 ) -> Result<Self, PeerResponseTerminalFactError> {
404 Ok(Self::new(
405 transport_identity
406 .map(PeerResponseTerminalTransportIdentity::parse)
407 .transpose()?,
408 PeerResponseTerminalRouteIdentity::parse(route_identity)?,
409 PeerResponseTerminalDisplayIdentity::parse(display_identity)?,
410 ))
411 }
412}
413
414#[derive(Debug, Clone, PartialEq)]
415pub struct PeerResponseTerminalFact {
416 pub source: PeerResponseTerminalSource,
417 pub correlation_id: PeerResponseTerminalCorrelationId,
418 pub status: PeerResponseTerminalProjectionStatus,
419 pub render_payload: PeerResponseTerminalRenderPayload,
420}
421
422impl PeerResponseTerminalFact {
423 pub fn new(
424 source: PeerResponseTerminalSource,
425 correlation_id: PeerResponseTerminalCorrelationId,
426 status: PeerResponseTerminalProjectionStatus,
427 render_payload: PeerResponseTerminalRenderPayload,
428 ) -> Self {
429 Self {
430 source,
431 correlation_id,
432 status,
433 render_payload,
434 }
435 }
436
437 pub fn prompt_text(&self) -> String {
438 format!(
439 "[SYSTEM NOTICE][PEER_RESPONSE_TERMINAL] Correlated peer response from {}. Request ID: {}. Status: {}. Result: {}.",
440 self.source.display_identity,
441 self.correlation_id,
442 self.status.label(),
443 format_peer_projection_payload(self.render_payload.as_ref())
444 )
445 }
446
447 pub fn context_key(&self) -> String {
448 peer_response_terminal_context_key(&self.source.route_identity, self.correlation_id)
449 }
450}
451
452#[derive(Debug, Clone, PartialEq)]
453pub enum PeerConversationProjection {
454 Message {
455 peer_id: String,
456 },
457 Request {
458 peer_id: crate::comms::PeerId,
459 display_name: Option<String>,
460 request_id: String,
461 intent: String,
462 payload: Option<serde_json::Value>,
463 },
464 ResponseProgress {
465 peer_id: String,
466 request_id: String,
467 phase: PeerResponseProgressProjectionPhase,
468 payload: Option<serde_json::Value>,
469 },
470 ResponseTerminal {
471 fact: PeerResponseTerminalFact,
472 },
473}
474
475impl PeerConversationProjection {
476 pub fn response_terminal(fact: PeerResponseTerminalFact) -> Self {
477 Self::ResponseTerminal { fact }
478 }
479
480 pub fn block_prefix_text(&self) -> Option<String> {
481 match self {
482 Self::Message { peer_id } => Some(format!("[COMMS MESSAGE from {peer_id}]")),
483 Self::Request { .. }
484 | Self::ResponseProgress { .. }
485 | Self::ResponseTerminal { .. } => None,
486 }
487 }
488
489 pub fn prompt_text(&self) -> String {
490 match self {
491 Self::Message { .. } => String::new(),
492 Self::Request {
493 peer_id,
494 display_name,
495 request_id,
496 intent,
497 payload,
498 } => {
499 let display_suffix = display_name
500 .as_deref()
501 .map(str::trim)
502 .filter(|name| !name.is_empty())
503 .map(|name| format!(" (display_name: {name})"))
504 .unwrap_or_default();
505 let response_call = crate::interaction::SendResponseCallProjection::new(
506 *peer_id,
507 display_name.as_deref(),
508 request_id.clone(),
509 );
510 format!(
511 "[SYSTEM NOTICE][PEER_REQUEST] Correlated peer request from peer_id {peer_id}{display_suffix}. Intent: {intent}. Request ID: {request_id}. Params: {}. This is not a normal user request and not a prompt for direct user-facing output. {} Do not use send_message for this reply.",
512 format_peer_projection_payload(payload.as_ref()),
513 response_call.instruction_text()
514 )
515 }
516 Self::ResponseProgress {
517 peer_id,
518 request_id,
519 phase,
520 payload,
521 } => format!(
522 "[SYSTEM NOTICE][PEER_RESPONSE_PROGRESS] Correlated peer response progress from {peer_id}. Request ID: {request_id}. Phase: {}. Payload: {}.",
523 phase.label(),
524 format_peer_projection_payload(payload.as_ref())
525 ),
526 Self::ResponseTerminal { fact } => fact.prompt_text(),
527 }
528 }
529
530 pub fn context_key(&self) -> Option<String> {
531 match self {
532 Self::ResponseTerminal { fact } => Some(fact.context_key()),
533 Self::Message { .. } | Self::Request { .. } | Self::ResponseProgress { .. } => None,
534 }
535 }
536}
537
538pub fn peer_response_terminal_context_key(
539 route_identity: &PeerResponseTerminalRouteIdentity,
540 correlation_id: PeerResponseTerminalCorrelationId,
541) -> String {
542 format!("peer_response_terminal:{route_identity}:{correlation_id}")
543}
544
545fn format_peer_projection_payload(payload: Option<&serde_json::Value>) -> String {
546 serde_json::to_string_pretty(payload.unwrap_or(&serde_json::Value::Null))
547 .unwrap_or_else(|_| "null".to_string())
548}
549
550// ---------------------------------------------------------------------------
551// TurnStateHandle
552// ---------------------------------------------------------------------------
553
554#[derive(Debug, Clone, PartialEq, Eq)]
555pub struct TurnStateSnapshot {
556 pub active_run_id: Option<RunId>,
557 /// Observable loop-state projection supplied by the turn-state owner.
558 ///
559 /// Consumers should not reclassify [`TurnPhase`] locally. Runtime-backed
560 /// handles derive this from the same DSL snapshot as `turn_phase`; test
561 /// handles do the same from their in-core test state.
562 pub loop_state: LoopState,
563 pub turn_phase: TurnPhase,
564 /// Typed primitive kind recorded by the DSL (dogma #5, #19 — no stringly
565 /// discriminants). `None` means no primitive is currently in flight.
566 pub primitive_kind: Option<TurnPrimitiveKind>,
567 pub admitted_content_shape: Option<ContentShape>,
568 pub vision_enabled: bool,
569 pub image_tool_results_enabled: bool,
570 pub tool_calls_pending: u64,
571 pub pending_op_refs: BTreeSet<AsyncOpRef>,
572 pub barrier_operation_ids: BTreeSet<OperationId>,
573 pub has_barrier_ops: bool,
574 pub barrier_satisfied: bool,
575 pub boundary_count: u64,
576 pub cancel_after_boundary: bool,
577 /// Typed terminal outcome recorded by the DSL (dogma #5, #19 — no stringly
578 /// discriminants). `None` means the turn has not reached a terminal phase.
579 pub terminal_outcome: Option<TurnTerminalOutcome>,
580 /// Typed terminal cause recorded by the DSL. `None` means no failure cause
581 /// has been selected for the current turn.
582 pub terminal_cause_kind: Option<TurnTerminalCauseKind>,
583 pub extraction_attempts: u64,
584 pub max_extraction_retries: u64,
585 pub llm_retry_attempt: u32,
586 pub llm_retry_max_retries: u32,
587 pub llm_retry_selected_delay_ms: u64,
588}
589
590/// Turn-execution DSL handle.
591pub trait TurnStateHandle: Send + Sync {
592 fn start_conversation_run(
593 &self,
594 run_id: RunId,
595 primitive_kind: TurnPrimitiveKind,
596 admitted_content_shape: ContentShape,
597 vision_enabled: bool,
598 image_tool_results_enabled: bool,
599 max_extraction_retries: u64,
600 ) -> Result<(), DslTransitionError>;
601
602 fn start_immediate_append(&self, run_id: RunId) -> Result<(), DslTransitionError>;
603
604 fn start_immediate_context(&self, run_id: RunId) -> Result<(), DslTransitionError>;
605
606 fn primitive_applied(&self) -> Result<(), DslTransitionError>;
607
608 fn llm_returned_tool_calls(&self, tool_count: u64) -> Result<(), DslTransitionError>;
609
610 fn llm_returned_terminal(&self) -> Result<(), DslTransitionError>;
611
612 fn register_pending_ops(
613 &self,
614 op_refs: BTreeSet<AsyncOpRef>,
615 barrier_operation_ids: BTreeSet<OperationId>,
616 ) -> Result<(), DslTransitionError>;
617
618 fn tool_calls_resolved(&self) -> Result<(), DslTransitionError>;
619
620 fn ops_barrier_satisfied(
621 &self,
622 operation_ids: BTreeSet<OperationId>,
623 ) -> Result<(), DslTransitionError>;
624
625 fn boundary_continue(&self) -> Result<(), DslTransitionError>;
626
627 fn boundary_complete(&self) -> Result<(), DslTransitionError>;
628
629 fn enter_extraction(&self, max_retries: u32) -> Result<(), DslTransitionError>;
630
631 fn extraction_start(&self) -> Result<(), DslTransitionError>;
632
633 fn extraction_validation_passed(&self) -> Result<(), DslTransitionError>;
634
635 fn extraction_validation_failed(&self, error: String) -> Result<(), DslTransitionError>;
636
637 fn extraction_failed(&self, error: String) -> Result<(), DslTransitionError>;
638
639 fn recoverable_failure(&self, retry: LlmRetrySchedule) -> Result<(), DslTransitionError>;
640
641 fn fatal_failure(&self, reason: TurnFailureReason) -> Result<(), DslTransitionError>;
642
643 fn retry_requested(&self, retry_attempt: u32) -> Result<(), DslTransitionError>;
644
645 fn cancel_now(&self) -> Result<(), DslTransitionError>;
646
647 fn request_cancel_after_boundary(&self) -> Result<(), DslTransitionError>;
648
649 fn cancellation_observed(&self) -> Result<(), DslTransitionError>;
650
651 fn acknowledge_terminal(&self, outcome: TurnTerminalOutcome) -> Result<(), DslTransitionError>;
652
653 fn turn_limit_reached(&self) -> Result<(), DslTransitionError>;
654
655 fn budget_exhausted(&self) -> Result<(), DslTransitionError>;
656
657 fn time_budget_exceeded(&self) -> Result<(), DslTransitionError>;
658
659 fn force_cancel_no_run(&self) -> Result<(), DslTransitionError>;
660
661 fn run_completed(&self, run_id: RunId) -> Result<(), DslTransitionError>;
662
663 fn run_failed(
664 &self,
665 run_id: RunId,
666 reason: TurnFailureReason,
667 ) -> Result<(), DslTransitionError>;
668
669 fn run_cancelled(&self, run_id: RunId) -> Result<(), DslTransitionError>;
670
671 fn snapshot(&self) -> TurnStateSnapshot;
672}
673
674// ---------------------------------------------------------------------------
675// CommsDrainHandle
676// ---------------------------------------------------------------------------
677
678/// Comms drain lifecycle DSL handle.
679///
680/// Covers the `drain_phase`/`drain_mode` DSL substate: ensure/spawn/stop the
681/// comms drain task and observe clean vs respawnable exits.
682pub trait CommsDrainHandle: Send + Sync {
683 /// Fire the `EnsureDrainRunning` signal — lazy spawn path.
684 fn ensure_drain_running(&self) -> Result<(), DslTransitionError>;
685
686 /// Fire the `SpawnDrain { mode }` input — explicit spawn with typed mode.
687 fn spawn_drain(&self, mode: DrainMode) -> Result<(), DslTransitionError>;
688
689 /// Fire the `StopDrain` input.
690 fn stop_drain(&self) -> Result<(), DslTransitionError>;
691
692 /// Fire the `DrainExitedClean` input (drain stopped without failure).
693 fn drain_exited_clean(&self) -> Result<(), DslTransitionError>;
694
695 /// Fire the `DrainExitedRespawnable` input (drain exited and can be respawned).
696 fn drain_exited_respawnable(&self) -> Result<(), DslTransitionError>;
697
698 /// Fire the `NotifyDrainExited { reason }` input with a typed reason.
699 fn notify_drain_exited(&self, reason: DrainExitReason) -> Result<(), DslTransitionError>;
700}
701
702// ---------------------------------------------------------------------------
703// ExternalToolSurfaceHandle
704// ---------------------------------------------------------------------------
705
706#[derive(Debug, Clone, PartialEq, Eq)]
707pub struct SurfaceSnapshot {
708 pub surface_id: String,
709 /// Typed base lifecycle state (dogma #5, #17 — no stringly discriminants
710 /// across the cross-crate handle boundary).
711 pub base_state: Option<ExternalToolSurfaceBaseState>,
712 pub pending_op: ExternalToolSurfacePendingOp,
713 pub staged_op: ExternalToolSurfaceStagedOp,
714 pub staged_intent_sequence: Option<u64>,
715 pub pending_task_sequence: Option<u64>,
716 pub pending_lineage_sequence: Option<u64>,
717 pub inflight_calls: u64,
718 /// Typed last-emitted delta operation (dogma #5, #17).
719 pub last_delta_operation: Option<ExternalToolSurfaceDeltaOperation>,
720 /// Typed last-emitted delta phase (dogma #5, #17).
721 pub last_delta_phase: Option<ExternalToolSurfaceDeltaPhase>,
722 pub removal_draining_since_ms: Option<u64>,
723 pub removal_timeout_at_ms: Option<u64>,
724 pub removal_applied_at_turn: Option<u64>,
725}
726
727#[derive(Debug, Clone, PartialEq, Eq)]
728pub struct SurfaceDiagnosticSnapshot {
729 pub surface_phase: ExternalToolSurfaceGlobalPhase,
730 pub known_surfaces: BTreeSet<String>,
731 pub visible_surfaces: BTreeSet<String>,
732 pub snapshot_epoch: u64,
733 pub snapshot_aligned_epoch: u64,
734 pub has_pending_or_staged: bool,
735 pub entries: Vec<SurfaceSnapshot>,
736}
737
738#[derive(Debug, Clone, PartialEq, Eq)]
739pub enum ExternalToolSurfaceInput {
740 StageAdd {
741 surface_id: String,
742 now_ms: u64,
743 },
744 StageRemove {
745 surface_id: String,
746 now_ms: u64,
747 },
748 StageReload {
749 surface_id: String,
750 now_ms: u64,
751 },
752 ApplyBoundary {
753 surface_id: String,
754 now_ms: u64,
755 staged_intent_sequence: u64,
756 applied_at_turn: u64,
757 },
758 MarkPendingSucceeded {
759 surface_id: String,
760 pending_task_sequence: u64,
761 staged_intent_sequence: u64,
762 },
763 MarkPendingFailed {
764 surface_id: String,
765 pending_task_sequence: u64,
766 staged_intent_sequence: u64,
767 cause: ExternalToolSurfaceFailureCause,
768 },
769 CallStarted {
770 surface_id: String,
771 },
772 CallFinished {
773 surface_id: String,
774 },
775 FinalizeRemovalClean {
776 surface_id: String,
777 },
778 FinalizeRemovalForced {
779 surface_id: String,
780 },
781 SnapshotAligned {
782 epoch: u64,
783 },
784 Shutdown,
785}
786
787#[derive(Debug, Clone, PartialEq, Eq)]
788pub enum ExternalToolSurfaceEffect {
789 ScheduleSurfaceCompletion {
790 surface_id: String,
791 operation: ExternalToolSurfaceDeltaOperation,
792 pending_task_sequence: u64,
793 staged_intent_sequence: u64,
794 applied_at_turn: u64,
795 },
796 RefreshVisibleSurfaceSet {
797 snapshot_epoch: u64,
798 },
799 EmitExternalToolDelta {
800 surface_id: String,
801 operation: ExternalToolSurfaceDeltaOperation,
802 phase: ExternalToolSurfaceDeltaPhase,
803 cause: Option<ExternalToolSurfaceFailureCause>,
804 },
805 CloseSurfaceConnection {
806 surface_id: String,
807 },
808 RejectSurfaceCall {
809 surface_id: String,
810 cause: ExternalToolSurfaceFailureCause,
811 },
812}
813
814#[derive(Debug, Clone, PartialEq, Eq)]
815pub struct ExternalToolSurfaceTransition {
816 pub phase: ExternalToolSurfaceGlobalPhase,
817 pub effects: Vec<ExternalToolSurfaceEffect>,
818}
819
820/// External tool surface lifecycle DSL handle.
821pub trait ExternalToolSurfaceHandle: Send + Sync {
822 fn apply_surface_input(
823 &self,
824 input: ExternalToolSurfaceInput,
825 ) -> Result<ExternalToolSurfaceTransition, DslTransitionError>;
826
827 fn register(&self, surface_id: String) -> Result<(), DslTransitionError>;
828
829 fn stage_add(&self, surface_id: String, now_ms: u64) -> Result<(), DslTransitionError>;
830
831 fn stage_remove(&self, surface_id: String, now_ms: u64) -> Result<(), DslTransitionError>;
832
833 fn stage_reload(&self, surface_id: String, now_ms: u64) -> Result<(), DslTransitionError>;
834
835 fn apply_boundary(
836 &self,
837 surface_id: String,
838 now_ms: u64,
839 staged_intent_sequence: u64,
840 applied_at_turn: u64,
841 ) -> Result<(), DslTransitionError>;
842
843 fn mark_pending_succeeded(
844 &self,
845 surface_id: String,
846 pending_task_sequence: u64,
847 staged_intent_sequence: u64,
848 ) -> Result<(), DslTransitionError>;
849
850 fn mark_pending_failed(
851 &self,
852 surface_id: String,
853 pending_task_sequence: u64,
854 staged_intent_sequence: u64,
855 cause: ExternalToolSurfaceFailureCause,
856 ) -> Result<(), DslTransitionError>;
857
858 fn call_started(&self, surface_id: String) -> Result<(), DslTransitionError>;
859
860 fn call_finished(&self, surface_id: String) -> Result<(), DslTransitionError>;
861
862 fn finalize_removal_clean(&self, surface_id: String) -> Result<(), DslTransitionError>;
863
864 fn finalize_removal_forced(&self, surface_id: String) -> Result<(), DslTransitionError>;
865
866 fn snapshot_aligned(&self, epoch: u64) -> Result<(), DslTransitionError>;
867
868 fn shutdown_surface(&self) -> Result<(), DslTransitionError>;
869
870 fn surface_snapshot(&self, surface_id: &str) -> Option<SurfaceSnapshot>;
871
872 fn diagnostic_snapshot(&self) -> SurfaceDiagnosticSnapshot;
873
874 fn visible_surfaces(&self) -> BTreeSet<String>;
875
876 fn removing_surfaces(&self) -> BTreeSet<String>;
877
878 fn pending_surfaces(&self) -> BTreeSet<String>;
879
880 fn has_pending_or_staged(&self) -> bool;
881
882 fn snapshot_epoch(&self) -> u64;
883
884 fn snapshot_aligned_epoch(&self) -> u64;
885}
886
887// ---------------------------------------------------------------------------
888// PeerCommsHandle
889// ---------------------------------------------------------------------------
890
891/// Peer comms ingress classification DSL handle.
892///
893/// Covers the peer-envelope classification signals on the MeerkatMachine DSL.
894/// Runtime-backed comms ingress hands parsed transport facts to this handle
895/// and receives the complete typed admission/classification facts back. A
896/// rejection is authoritative and callers fail closed. Standalone comms
897/// runtimes may have no session DSL handle; those retain a local
898/// `PeerIngressMachinePolicy` adapter for wire-compatible operation without a
899/// session authority.
900pub trait PeerCommsHandle: Send + Sync {
901 /// Fire the `ClassifyExternalEnvelope` signal and return machine-owned
902 /// admission facts for the parsed envelope.
903 fn classify_external_envelope(
904 &self,
905 facts: PeerIngressEnvelopeFacts,
906 ) -> Result<PeerIngressAdmission, DslTransitionError>;
907
908 /// Fire the `ClassifyPlainEvent` signal and return machine-owned
909 /// admission facts for the parsed plain event.
910 fn classify_plain_event(
911 &self,
912 facts: PeerIngressPlainEventFacts,
913 ) -> Result<PeerIngressAdmission, DslTransitionError>;
914
915 /// Fire the `SetPeerIngressContext { keep_alive }` input.
916 fn set_peer_ingress_context(&self, keep_alive: bool) -> Result<(), DslTransitionError>;
917}
918
919// ---------------------------------------------------------------------------
920// SessionAdmissionHandle
921// ---------------------------------------------------------------------------
922
/// Session turn admission DSL handle.
///
/// Covers the admission-adjacent inputs on the MeerkatMachine DSL: ingest an
/// input into the session, accept it (with or without wake), prepare a run,
/// and commit the run. Failed run return is owned by the runtime turn-state
/// path after a typed terminal cause is recorded. These inputs manage the
/// input-lifecycle substate maps (`input_phases`, `input_run_associations`,
/// etc.) and the top-level `current_run_id` / `pre_run_phase` fields.
pub trait SessionAdmissionHandle: Send + Sync {
    /// Fire the `Ingest { runtime_id, work_id, origin }` input.
    ///
    /// `runtime_id` is the stringified logical runtime id; `work_id` the
    /// stringified work identifier (typically the same domain as `InputId`).
    /// `origin` is the typed transport source that admitted the input
    /// (dogma #5, #17 — no stringly discriminants across the handle boundary).
    fn ingest(
        &self,
        runtime_id: &str,
        work_id: &str,
        origin: InputSource,
    ) -> Result<(), DslTransitionError>;

    /// Fire the `AcceptWithCompletion { input_id, request_immediate_processing,
    /// interrupt_yielding, wake_if_idle, run_id }` input.
    ///
    /// `wake_if_idle` carries the policy-level "this input must wake the
    /// runtime loop once the session reaches idle" intent (e.g.
    /// `peer_response_terminal` staged while running): the DSL's
    /// Running+Queued transition splits on it and emits a
    /// `PostAdmissionSignal::WakeLoop` so the pending wake lands on the
    /// next idle reach. Idle/Attached queued arms already wake
    /// unconditionally, so the flag is ignored in those guards.
    ///
    /// NOTE(review): the DSL input lists `run_id`, but this method takes no
    /// run id. Presumably the implementation supplies it (e.g. the current
    /// run) — confirm.
    fn accept_with_completion(
        &self,
        input_id: &InputId,
        request_immediate_processing: bool,
        interrupt_yielding: bool,
        wake_if_idle: bool,
    ) -> Result<(), DslTransitionError>;

    /// Fire the `AcceptWithoutWake { input_id }` input.
    fn accept_without_wake(&self, input_id: &InputId) -> Result<(), DslTransitionError>;

    /// Fire the `Prepare { session_id, run_id }` input.
    ///
    /// `session_id` is not a parameter. It is taken from the session this
    /// handle was prepared for.
    fn prepare(&self, run_id: &RunId) -> Result<(), DslTransitionError>;

    /// Observe a commit request. Runtime-backed implementations keep commit
    /// terminalization on the machine-owned durable path.
    fn commit(&self, input_id: &InputId, run_id: &RunId) -> Result<(), DslTransitionError>;
}
973
974// ---------------------------------------------------------------------------
975// AuthLeaseHandle (Phase 1.5-rev)
976// ---------------------------------------------------------------------------
977
978/// Typed key for one auth lease machine.
979#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
980pub struct LeaseKey {
981 pub realm: crate::connection::RealmId,
982 pub binding: crate::connection::BindingId,
983 pub profile: Option<crate::connection::ProfileId>,
984}
985
986impl LeaseKey {
987 pub fn new(
988 realm: crate::connection::RealmId,
989 binding: crate::connection::BindingId,
990 profile: Option<crate::connection::ProfileId>,
991 ) -> Self {
992 Self {
993 realm,
994 binding,
995 profile,
996 }
997 }
998
999 pub fn from_auth_binding(auth_binding: &crate::connection::AuthBindingRef) -> Self {
1000 Self {
1001 realm: auth_binding.realm.clone(),
1002 binding: auth_binding.binding.clone(),
1003 profile: auth_binding.profile.clone(),
1004 }
1005 }
1006}
1007
1008impl std::fmt::Display for LeaseKey {
1009 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1010 match &self.profile {
1011 Some(profile) => write!(f, "{}:{}:{}", self.realm, self.binding, profile),
1012 None => write!(f, "{}:{}", self.realm, self.binding),
1013 }
1014 }
1015}
1016
/// Observable snapshot of an auth lease's DSL state for a given [`LeaseKey`].
///
/// Returned by [`AuthLeaseHandle::snapshot`]. If the binding is not tracked
/// at all, `phase` is `None` and `expires_at` is `None`.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct AuthLeaseSnapshot {
    /// Current lease phase, or `None` when the binding is not tracked.
    pub phase: Option<AuthLeasePhase>,
    /// Recorded lease expiry, or `None` when the binding is not tracked.
    /// NOTE(review): presumably seconds (the refresh window constant is
    /// expressed in seconds relative to `expires_at`) — confirm the unit.
    pub expires_at: Option<u64>,
    /// Whether credential lifecycle authority is held. Distinguishes that
    /// authority from OAuth login-flow membership, which may keep an
    /// AuthMachine instance alive after credential rollback.
    pub credential_present: bool,
    /// Credential marker generation.
    ///
    /// Advances only when credential material is published. Non-publishing
    /// lifecycle transitions (marking a lease expiring, a transient refresh
    /// failure) and OAuth login-flow membership transitions leave it
    /// unchanged. This lets consumers tell a stale projection from a freshly
    /// reacquired lease even when the expiry timestamp is unchanged, without
    /// invalidating retryable stored credentials after state-only
    /// transitions.
    pub generation: u64,
    /// Credential publication timestamp. Advances only when credential
    /// material is acquired or refreshed.
    pub credential_published_at_millis: Option<u64>,
}
1041
/// Result of an accepted auth lease lifecycle transition.
///
/// Returned by [`AuthLeaseHandle::acquire_lease`] and
/// [`AuthLeaseHandle::complete_refresh`]. `generation` is the projection
/// version assigned while the transition is accepted. Consumers can
/// therefore bind derived material to the exact lease state that published
/// it without taking a later snapshot.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct AuthLeaseTransition {
    /// Projection version assigned by the accepted transition.
    pub generation: u64,
    /// Durable credential publication timestamp attached to the
    /// acquired/refreshed credential material.
    pub credential_published_at_millis: Option<u64>,
}

impl AuthLeaseTransition {
    /// Build a transition result from its generation and optional credential
    /// publication timestamp.
    ///
    /// Declared `const` so fixed transition values can be written in
    /// `const`/`static` contexts, such as fixtures or defaults.
    #[must_use]
    pub const fn new(generation: u64, credential_published_at_millis: Option<u64>) -> Self {
        Self {
            generation,
            credential_published_at_millis,
        }
    }
}
1063
/// Window, in seconds, before `expires_at` at which a `valid` lease becomes
/// eligible to transition into `expiring` at the next CallingLlm boundary.
///
/// Owned here, in the handle trait module, rather than in shell code, per
/// dogma §9 ("policy composes at the facade/factory seam, not in random
/// helpers") and §20 ("every important behavior reduces to one clear owner").
///
/// The actual state transition is gated by the AuthMachine DSL's
/// `MarkAuthExpiring` input, which enforces the `valid → expiring`
/// legality. This constant only controls *when* the runner fires that
/// input, not whether the transition is legal.
pub const AUTH_LEASE_TTL_REFRESH_WINDOW_SECS: u64 = 60;
1076
1077/// Auth lease lifecycle DSL handle.
1078pub trait AuthLeaseHandle: Send + Sync + std::any::Any {
1079 /// Fire `AcquireAuthLease { lease_key, expires_at }` — unconditional.
1080 ///
1081 /// Moves the binding into `auth_valid_leases` and records its expiry.
1082 /// Returns the generation assigned by the accepted transition.
1083 fn acquire_lease(
1084 &self,
1085 lease_key: &LeaseKey,
1086 expires_at: u64,
1087 ) -> Result<AuthLeaseTransition, DslTransitionError>;
1088
1089 /// Fire `MarkAuthExpiring { lease_key }` — only legal from `valid`.
1090 fn mark_expiring(&self, lease_key: &LeaseKey) -> Result<(), DslTransitionError>;
1091
1092 /// Fire `BeginAuthRefresh { lease_key }` — legal from `valid` or
1093 /// `expiring`.
1094 ///
1095 /// Provides the DSL-level refresh dedup: once the binding is in
1096 /// `auth_refreshing_leases`, no concurrent `BeginAuthRefresh` is
1097 /// permitted until `CompleteAuthRefresh` or `AuthRefreshFailed` moves
1098 /// it back out.
1099 fn begin_refresh(&self, lease_key: &LeaseKey) -> Result<(), DslTransitionError>;
1100
1101 /// Fire `CompleteAuthRefresh { lease_key, new_expires_at, now }` — only
1102 /// legal from `refreshing`. Returns the generation assigned by the accepted
1103 /// transition.
1104 fn complete_refresh(
1105 &self,
1106 lease_key: &LeaseKey,
1107 new_expires_at: u64,
1108 now: u64,
1109 ) -> Result<AuthLeaseTransition, DslTransitionError>;
1110
1111 /// Fire `AuthRefreshFailed { lease_key, permanent }` — only legal from
1112 /// `refreshing`. `permanent=true` routes to `reauth_required` and emits a
1113 /// reauth notice; `permanent=false` routes back to `expiring`.
1114 fn refresh_failed(
1115 &self,
1116 lease_key: &LeaseKey,
1117 permanent: bool,
1118 ) -> Result<(), DslTransitionError>;
1119
1120 /// Fire `MarkReauthRequired { lease_key }` — any known state → reauth.
1121 fn mark_reauth_required(&self, lease_key: &LeaseKey) -> Result<(), DslTransitionError>;
1122
1123 /// Fire `ReleaseAuthLease { lease_key }` — removes the binding from all
1124 /// sets and the expiry map.
1125 fn release_lease(&self, lease_key: &LeaseKey) -> Result<(), DslTransitionError>;
1126
1127 /// Clear credential lifecycle authority without treating persisted token
1128 /// bytes as a new lease source.
1129 ///
1130 /// Handles that co-locate short-lived OAuth flow membership with credential
1131 /// lifecycle state should preserve those flow memberships when clearing
1132 /// only the credential side after a failed login commit.
1133 fn release_credential_lifecycle(&self, lease_key: &LeaseKey) -> Result<(), DslTransitionError> {
1134 self.release_lease(lease_key)
1135 }
1136
1137 /// Restore a captured lifecycle snapshot after a later durable write failed.
1138 ///
1139 /// The default implementation can reconstruct credential-present snapshots
1140 /// through public lease transitions. Runtime-backed handles override this to
1141 /// preserve machine-owned metadata such as credential publication time and to
1142 /// restore empty snapshots without leaving an unretryable generation behind.
1143 fn restore_auth_lifecycle_snapshot(
1144 &self,
1145 lease_key: &LeaseKey,
1146 snapshot: &AuthLeaseSnapshot,
1147 expires_at: Option<u64>,
1148 ) -> Result<(), DslTransitionError> {
1149 if !snapshot.credential_present {
1150 return Ok(());
1151 }
1152 let Some(phase) = snapshot.phase else {
1153 return Ok(());
1154 };
1155 if phase == AuthLeasePhase::Released {
1156 return Ok(());
1157 }
1158 let Some(expires_at) = expires_at else {
1159 return Ok(());
1160 };
1161 self.acquire_lease(lease_key, expires_at)?;
1162 match phase {
1163 AuthLeasePhase::Valid => Ok(()),
1164 AuthLeasePhase::Expiring => self.mark_expiring(lease_key),
1165 AuthLeasePhase::Refreshing => self.begin_refresh(lease_key),
1166 AuthLeasePhase::ReauthRequired => self.mark_reauth_required(lease_key),
1167 AuthLeasePhase::Released => Ok(()),
1168 }
1169 }
1170
1171 /// Observe the current DSL-level state of a binding.
1172 fn snapshot(&self, lease_key: &LeaseKey) -> AuthLeaseSnapshot;
1173}
1174
1175// ---------------------------------------------------------------------------
1176// McpServerLifecycleHandle (Phase 5G / T5g)
1177// ---------------------------------------------------------------------------
1178
/// MCP client handshake lifecycle DSL handle (session-scoped).
///
/// Routes each per-server MCP handshake event into the MeerkatMachine DSL's
/// `mcp_server_states` substate. This is distinct from the external-tool
/// surface lifecycle, which tracks staged/pending *tool surface* intents.
/// This handle tracks the per-server *connection* lifecycle
/// (PendingConnect → Connected | Failed | Disconnected), keyed by the
/// configured MCP server name.
///
/// The read side (`pending_server_ids`) is the authoritative source for the
/// `[MCP_PENDING]` system-notice toggle. If any server is in
/// `PendingConnect`, the notice is emitted; otherwise it is suppressed.
///
/// Concrete impls live in `meerkat-runtime`. Standalone callers (tests,
/// fixtures) pass `None` for the handle, and the router's shell-level
/// behavior stays identical. DSL record-keeping is skipped, which is fine
/// because there is no session DSL to mirror into.
pub trait McpServerLifecycleHandle: Send + Sync {
    /// Fire `McpServerConnectPending { server_id }`: the server is staged for
    /// background connect.
    fn apply_connect_pending(&self, server_id: &str) -> Result<(), DslTransitionError>;

    /// Fire `McpServerConnected { server_id }`: the handshake succeeded.
    fn apply_connected(&self, server_id: &str) -> Result<(), DslTransitionError>;

    /// Fire `McpServerFailed { server_id, error }`: the handshake failed.
    /// `error` is the free-form failure message.
    fn apply_failed(&self, server_id: &str, error: &str) -> Result<(), DslTransitionError>;

    /// Fire `McpServerDisconnected { server_id }`: the connection closed.
    fn apply_disconnected(&self, server_id: &str) -> Result<(), DslTransitionError>;

    /// Fire `McpServerReload { server_id }`: a reload was requested. The
    /// server returns to `PendingConnect` while the shell tears down and
    /// redials.
    fn apply_reload(&self, server_id: &str) -> Result<(), DslTransitionError>;

    /// Observe the set of server ids currently in `PendingConnect`.
    ///
    /// Used by the agent loop to drive the `[MCP_PENDING]` system-notice
    /// lifecycle: non-empty means emit the notice; empty means strip it.
    fn pending_server_ids(&self) -> BTreeSet<String>;
}
1219
1220// ---------------------------------------------------------------------------
1221// PeerInteractionHandle (W1-A / issue #264)
1222// ---------------------------------------------------------------------------
1223
/// Terminal disposition companion for [`PeerInteractionHandle::response_terminal`].
///
/// Carried as a typed wire value so the DSL can route `Completed` / `Failed`
/// terminal transitions without the shell re-interpreting `ResponseStatus`.
/// The enum is `#[non_exhaustive]`, so `match` expressions outside this
/// crate must include a wildcard arm.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
#[non_exhaustive]
pub enum PeerTerminalDisposition {
    /// Terminal response with `Completed` status.
    Completed,
    /// Terminal response with `Failed` status.
    Failed,
}
1236
/// Peer request / response lifecycle DSL handle (W1-A).
///
/// Routes the full peer-interaction lifecycle into the MeerkatMachine DSL's
/// `pending_peer_requests` / `inbound_peer_requests` substate maps. That
/// covers outbound `Sent`, progress / terminal response arrival, timeouts,
/// and inbound `Received` / `Replied`.
///
/// Terminal transitions emit a DSL-owned cleanup effect. The shell observes
/// it to drop any subscriber / stream channel associated with the
/// correlation id. The channels themselves live in shell-owned maps, because
/// they hold `mpsc::Sender` values that cannot live in DSL state. Those maps
/// are strict projections of DSL state, with the invariant "channel live
/// iff `corr_id ∈ pending ∧ state ≠ terminal`" enforced by the effect.
pub trait PeerInteractionHandle: Send + Sync {
    /// Fire `PeerRequestSent { corr_id, to }`.
    ///
    /// Guard: `corr_id` is not already in `pending_peer_requests`.
    /// NOTE(review): `to` is presumably the destination peer identifier —
    /// confirm its format.
    fn request_sent(
        &self,
        corr_id: PeerCorrelationId,
        to: String,
    ) -> Result<(), DslTransitionError>;

    /// Fire `PeerResponseProgressArrived { corr_id }`.
    ///
    /// Guard: `corr_id` is in `pending_peer_requests`. Progress after
    /// progress is admitted as a self-loop (the DSL overwrites the state
    /// slot). Rejects on unknown corr_id.
    fn response_progress(&self, corr_id: PeerCorrelationId) -> Result<(), DslTransitionError>;

    /// Fire `PeerResponseTerminalArrived { corr_id, disposition }`.
    ///
    /// Guard: `corr_id` is in `pending_peer_requests`. Terminal transitions
    /// remove the map entry and emit the `PeerInteractionCleanup` effect.
    /// A second terminal on the same corr_id is therefore rejected at the
    /// `pending_exists` guard by construction.
    fn response_terminal(
        &self,
        corr_id: PeerCorrelationId,
        disposition: PeerTerminalDisposition,
    ) -> Result<(), DslTransitionError>;

    /// Fire `PeerRequestTimedOut { corr_id }`.
    ///
    /// Guard: `corr_id` is in `pending_peer_requests`. Like `response_terminal`,
    /// a successful fire removes the map entry and emits the
    /// `PeerInteractionCleanup` effect, so subsequent fires fail the guard.
    fn request_timed_out(&self, corr_id: PeerCorrelationId) -> Result<(), DslTransitionError>;

    /// Fire `PeerRequestReceived { corr_id }` (inbound).
    ///
    /// Guard: `corr_id` is not already in `inbound_peer_requests`.
    fn request_received(&self, corr_id: PeerCorrelationId) -> Result<(), DslTransitionError>;

    /// Fire `PeerResponseReplied { corr_id }` (inbound reply sent).
    ///
    /// Guard: `corr_id` is in `inbound_peer_requests` with state `Received`.
    fn response_replied(&self, corr_id: PeerCorrelationId) -> Result<(), DslTransitionError>;

    /// Observe the DSL-owned state of an outbound peer request.
    ///
    /// Returns `None` if the correlation id is not in `pending_peer_requests`.
    fn outbound_state(&self, corr_id: PeerCorrelationId) -> Option<OutboundPeerRequestState>;

    /// Observe the DSL-owned state of an inbound peer request.
    ///
    /// NOTE(review): presumably returns `None` when the correlation id is not
    /// in `inbound_peer_requests`, mirroring `outbound_state` — confirm.
    fn inbound_state(&self, corr_id: PeerCorrelationId) -> Option<InboundPeerRequestState>;

    /// Install a projection-cleanup observer for the peer-interaction
    /// lifecycle. The runtime handle invokes the observer whenever a DSL
    /// transition emits `PeerInteractionCleanup`, closing the loop
    /// "terminal transition → effect → shell projection cleanup".
    ///
    /// Implementations with no observer simply drop any emitted cleanup
    /// notifications. Standalone / WASM paths leave this unset.
    fn install_cleanup_observer(&self, observer: Arc<dyn PeerInteractionCleanupObserver>);
}
1314
/// Observer invoked by [`PeerInteractionHandle`] when a DSL
/// `PeerInteractionCleanup` effect is emitted.
///
/// Shell-owned projection consumers (the comms runtime's subscriber /
/// stream registries) implement this to drop channel entries keyed on the
/// terminated correlation id. The observer is invoked under the same
/// authority lock as the transition that emitted the effect. The
/// "terminal transition → effect → cleanup" chain is therefore causal, not
/// merely lexically adjacent.
///
/// NOTE(review): the sibling observers (`SessionContextAdvancedObserver`,
/// `InteractionStreamCleanupObserver`) are documented as sampled under the
/// lock but dispatched *after* it is released. Confirm which holds here. If
/// this observer really runs under the authority lock, an implementation
/// that calls back into the same authority may deadlock unless that lock is
/// re-entrant.
pub trait PeerInteractionCleanupObserver: Send + Sync {
    /// Called once per emitted `PeerInteractionCleanup { corr_id }` effect.
    ///
    /// Implementations should be idempotent. A well-formed DSL run emits
    /// exactly one cleanup per correlation id, because terminal transitions
    /// remove the map entry and subsequent attempts are rejected at the
    /// `pending_exists` guard. Observers should still tolerate a redundant
    /// call defensively.
    fn on_peer_interaction_cleanup(&self, corr_id: PeerCorrelationId);
}
1333
/// Session-context advancement DSL handle (W2-E / issue #264).
///
/// Shell callers fire `context_advanced(updated_at_ms)` at every site that
/// mutates canonical session truth. That includes prompt append, external
/// content injection, tool-result append, external assistant output,
/// runtime-system-context append, and any `summary_tx.send_replace`. The
/// transition is monotonic: the DSL guard drops ticks whose `updated_at_ms`
/// isn't strictly greater than the last recorded watermark. Callers can
/// therefore fire unconditionally post-mutation.
///
/// Every successful transition emits `SessionContextAdvanced`, which is
/// dispatched to the installed [`SessionContextAdvancedObserver`]. The
/// realtime projection consumer uses the observer to drive a typed
/// `ProjectionFreshness` state instead of polling a watch channel.
pub trait SessionContextHandle: Send + Sync {
    /// Fire `AdvanceSessionContext { updated_at_ms }`.
    ///
    /// Guard: `updated_at_ms` is strictly greater than the last recorded
    /// watermark.
    ///
    /// Returns `Ok(false)` when the guard rejects the tick as non-advancing
    /// (duplicate or out-of-order), and `Ok(true)` when the transition lands
    /// and the effect is emitted. Transition errors (lock poisoning,
    /// unexpected DSL state) surface as `Err`.
    fn context_advanced(&self, updated_at_ms: u64) -> Result<bool, DslTransitionError>;

    /// The monotonic watermark, in milliseconds, of the last successful
    /// `AdvanceSessionContext` transition recorded on this handle.
    ///
    /// Returns `0` before any advance has been recorded. The realtime
    /// projection consumer reads this once at install time to seed its
    /// `ProjectionFreshness` baseline. The consumer and the DSL therefore
    /// agree on the initial frontier by construction (no two-read race).
    fn current_watermark_ms(&self) -> u64;

    /// Install a typed observer for `SessionContextAdvanced` effect
    /// emission. Implementations without an installed observer drop the
    /// effect (standalone / WASM paths).
    fn install_observer(&self, observer: Arc<dyn SessionContextAdvancedObserver>);

    /// Atomically install a typed observer and return the current watermark
    /// as a single critical section. Implementations MUST hold the same
    /// authority lock that `context_advanced` uses for both the watermark
    /// read and the observer installation. That way no `SessionContextAdvanced`
    /// effect can slip between "sampled baseline" and "observer visible".
    ///
    /// Callers use the returned `u64` as their `ProjectionFreshness`
    /// baseline. Any subsequent `context_advanced` tick is guaranteed to
    /// either (a) have already been included in the returned watermark, or
    /// (b) be visible to the observer.
    ///
    /// Calling `current_watermark_ms` and then `install_observer` is NOT a
    /// substitute. A transition can land between those two non-atomic steps
    /// and be lost to both the baseline and the observer.
    fn install_observer_with_baseline(
        &self,
        observer: Arc<dyn SessionContextAdvancedObserver>,
    ) -> u64 {
        // Default combines the two primitives for backwards compatibility
        // with any external impls that do not yet override this method.
        // Runtime impls override to provide the atomic guarantee.
        //
        // Order matters: the observer is installed *before* the watermark is
        // sampled. A tick that races this call can then at worst be both
        // folded into the returned baseline and delivered to the observer,
        // which a strictly-increasing consumer presumably ignores. It cannot
        // be lost to both, assuming `install_observer` takes effect before it
        // returns. Do not swap these two calls.
        self.install_observer(observer);
        self.current_watermark_ms()
    }
}
1396
/// Observer invoked by [`SessionContextHandle`] when a DSL
/// `SessionContextAdvanced` effect is emitted (W2-E / issue #264).
///
/// The realtime projection consumer implements this to advance its typed
/// `ProjectionFreshness` state. Runtime handles sample the installed
/// observer under the same authority lock as the transition that emitted
/// the effect. They dispatch the callback immediately after releasing the
/// lock, so re-entrant observer implementations can safely route back
/// through the same DSL authority.
pub trait SessionContextAdvancedObserver: Send + Sync {
    /// Called once per emitted `SessionContextAdvanced { updated_at_ms }`
    /// effect. `updated_at_ms` is the monotonic millisecond watermark of
    /// the canonical session-context mutation that produced this tick.
    fn on_session_context_advanced(&self, updated_at_ms: u64);
}
1412
1413// ---------------------------------------------------------------------------
1414// SessionClaimHandle (dogma #2 — canonical session-identity owner)
1415// ---------------------------------------------------------------------------
1416
/// Error surfaced by [`SessionClaimHandle::try_acquire`].
#[derive(Debug, Clone, PartialEq, Eq, thiserror::Error)]
pub enum SessionClaimError {
    /// Another live claim already exists for this session id.
    ///
    /// Carries the session id whose reservation was refused.
    #[error("session identity already claimed: {0}")]
    SessionIdentityInUse(SessionId),
}
1424
1425/// RAII token returned by [`SessionClaimHandle::try_acquire`].
1426///
1427/// While alive, the underlying registry guarantees no other caller can
1428/// acquire a claim for the same `session_id`. Drop releases the claim back
1429/// through the owning handle.
1430pub struct SessionClaim {
1431 session_id: SessionId,
1432 handle: Arc<dyn SessionClaimHandle>,
1433}
1434
impl SessionClaim {
    /// Construct a new claim. Only [`SessionClaimHandle`] impls should call
    /// this, immediately after they have inserted `session_id` into their
    /// canonical registry under a single critical section.
    ///
    /// The claim calls `handle.release(&session_id)` when dropped. A claim
    /// built for an id that `handle` did not actually reserve for this
    /// caller would therefore release another holder's reservation on drop.
    pub fn new(session_id: SessionId, handle: Arc<dyn SessionClaimHandle>) -> Self {
        Self { session_id, handle }
    }

    /// The session id this claim covers.
    pub fn session_id(&self) -> &SessionId {
        &self.session_id
    }
}
1448
1449impl Drop for SessionClaim {
1450 fn drop(&mut self) {
1451 self.handle.release(&self.session_id);
1452 }
1453}
1454
1455impl std::fmt::Debug for SessionClaim {
1456 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1457 f.debug_struct("SessionClaim")
1458 .field("session_id", &self.session_id)
1459 .finish_non_exhaustive()
1460 }
1461}
1462
/// Process-scope canonical owner of "this session id is currently active."
///
/// There is one canonical owner per process. `MeerkatMachine` exposes its
/// registry when a runtime is wired, so every live runtime-registered
/// session also owns its identity claim. A default in-process registry
/// covers bare `AgentFactory` callers without a runtime. Either way, "this
/// session id is in use" lives in a typed owner, never in process-global
/// shell bookkeeping.
pub trait SessionClaimHandle: Send + Sync {
    /// Atomically reserve `session_id`. Returns a [`SessionClaim`] whose
    /// `Drop` releases the slot. Returns
    /// [`SessionClaimError::SessionIdentityInUse`] if another live claim
    /// already covers this session.
    ///
    /// The receiver is `Arc<Self>` so the returned claim can keep the handle
    /// alive and call back into it on drop.
    ///
    /// Implementations MUST insert under a single critical section so two
    /// concurrent callers cannot both succeed.
    fn try_acquire(
        self: Arc<Self>,
        session_id: &SessionId,
    ) -> Result<SessionClaim, SessionClaimError>;

    /// Release a claim previously created by [`Self::try_acquire`].
    ///
    /// Called from [`SessionClaim`]'s `Drop`. Idempotent: releasing an
    /// unknown id is a no-op (the registry was already cleared, e.g. via
    /// runtime teardown). Calling this directly while the matching
    /// `SessionClaim` is still alive frees the slot early.
    fn release(&self, session_id: &SessionId);
}
1491
/// In-process default [`SessionClaimHandle`] for bare-usage paths that have
/// no `MeerkatMachine` available (standalone `AgentFactory` callers, doc
/// examples, simple SDK consumers).
///
/// The process-global instance returned by `global()` keeps the "one active
/// claim per session id" invariant intact even when no runtime is wired.
/// Registries built with `new()` / `default()` each track their own,
/// independent set of claims.
pub struct DefaultSessionClaimRegistry {
    /// Session ids that currently have a live claim.
    claims: std::sync::Mutex<std::collections::HashSet<SessionId>>,
}
1500
1501impl DefaultSessionClaimRegistry {
1502 /// Construct an empty registry.
1503 pub fn new() -> Self {
1504 Self {
1505 claims: std::sync::Mutex::new(std::collections::HashSet::new()),
1506 }
1507 }
1508
1509 /// Process-global instance — used by bare-usage facade builders.
1510 pub fn global() -> Arc<Self> {
1511 use std::sync::OnceLock;
1512 static GLOBAL: OnceLock<Arc<DefaultSessionClaimRegistry>> = OnceLock::new();
1513 Arc::clone(GLOBAL.get_or_init(|| Arc::new(DefaultSessionClaimRegistry::new())))
1514 }
1515}
1516
/// Same as `DefaultSessionClaimRegistry::new()`: an empty registry that does
/// not share claims with the process-global instance.
impl Default for DefaultSessionClaimRegistry {
    fn default() -> Self {
        Self::new()
    }
}
1522
1523impl SessionClaimHandle for DefaultSessionClaimRegistry {
1524 fn try_acquire(
1525 self: Arc<Self>,
1526 session_id: &SessionId,
1527 ) -> Result<SessionClaim, SessionClaimError> {
1528 let mut claims = self
1529 .claims
1530 .lock()
1531 .unwrap_or_else(std::sync::PoisonError::into_inner);
1532 if !claims.insert(session_id.clone()) {
1533 return Err(SessionClaimError::SessionIdentityInUse(session_id.clone()));
1534 }
1535 drop(claims);
1536 Ok(SessionClaim::new(
1537 session_id.clone(),
1538 self as Arc<dyn SessionClaimHandle>,
1539 ))
1540 }
1541
1542 fn release(&self, session_id: &SessionId) {
1543 let mut claims = self
1544 .claims
1545 .lock()
1546 .unwrap_or_else(std::sync::PoisonError::into_inner);
1547 claims.remove(session_id);
1548 }
1549}
1550
1551// ---------------------------------------------------------------------------
1552// InteractionStreamHandle (U6 / dogma #5)
1553// ---------------------------------------------------------------------------
1554
/// Interaction stream lifecycle DSL handle.
///
/// Routes the reservation/attach/completion/expire/close-early lifecycle of
/// a streamed interaction into the MeerkatMachine DSL's `interaction_streams`
/// substate map. The shell-side `interaction_stream_registry` projects
/// sender/receiver channels off this map. Terminal transitions emit
/// [`InteractionStreamCleanupObserver::on_interaction_stream_cleanup`], which
/// the comms runtime uses to drop the channel projection.
///
/// Reservation TTL is shell-owned mechanics: the runtime holds the timestamp
/// and decides when to fire `expired`. Every state-meaning decision lives in
/// the DSL: is the reservation still claimable, has the consumer attached,
/// did a terminal event win the race.
pub trait InteractionStreamHandle: Send + Sync {
    /// Fire `InteractionStreamReserved { corr_id }`.
    ///
    /// Guard: `corr_id` is not already in `interaction_streams`. Rejected
    /// duplicates surface as [`DslTransitionError`], so the shell can refuse
    /// to register two channels under the same key.
    fn reserved(&self, corr_id: PeerCorrelationId) -> Result<(), DslTransitionError>;

    /// Fire `InteractionStreamAttached { corr_id }`.
    ///
    /// Guard: state is `Reserved`. Rejected if the reservation already
    /// expired, the consumer already attached, or the entry never existed.
    fn attached(&self, corr_id: PeerCorrelationId) -> Result<(), DslTransitionError>;

    /// Fire `InteractionStreamCompleted { corr_id }`.
    ///
    /// Guard: state is `Attached`. Terminal; emits the cleanup effect.
    fn completed(&self, corr_id: PeerCorrelationId) -> Result<(), DslTransitionError>;

    /// Fire `InteractionStreamExpired { corr_id }`.
    ///
    /// Guard: state is `Reserved`. Terminal; emits the cleanup effect.
    fn expired(&self, corr_id: PeerCorrelationId) -> Result<(), DslTransitionError>;

    /// Fire `InteractionStreamClosedEarly { corr_id }`.
    ///
    /// Guard: state is `Attached`. Terminal; emits the cleanup effect.
    fn closed_early(&self, corr_id: PeerCorrelationId) -> Result<(), DslTransitionError>;

    /// Read the DSL-owned state for a given correlation id, if any.
    ///
    /// Returns `None` when the entry has already been removed (terminal) or
    /// was never reserved. Active states (`Reserved`, `Attached`) surface as
    /// `Some(..)`. Terminal variants surface only via the
    /// `InteractionStreamStateChanged` effect, never on the active map.
    /// NOTE(review): elsewhere in this trait's docs the terminal effect is
    /// named `InteractionStreamCleanup`; confirm both effects exist.
    fn state(&self, corr_id: PeerCorrelationId) -> Option<InteractionStreamState>;

    /// Install a projection-cleanup observer for the interaction stream
    /// lifecycle. The runtime handle invokes the observer whenever a DSL
    /// transition emits `InteractionStreamCleanup`, closing the loop
    /// "terminal transition → effect → shell projection cleanup".
    fn install_cleanup_observer(&self, observer: Arc<dyn InteractionStreamCleanupObserver>);
}
1611
/// Observer invoked by [`InteractionStreamHandle`] when a DSL
/// `InteractionStreamCleanup` effect is emitted.
///
/// Shell-owned projection consumers (the comms runtime's
/// `interaction_stream_registry`) implement this to drop channel entries
/// keyed on the terminated correlation id. Runtime handles sample the
/// observer under the same authority lock as the transition that emitted
/// the effect, then dispatch after releasing the lock.
pub trait InteractionStreamCleanupObserver: Send + Sync {
    /// Called once per emitted `InteractionStreamCleanup { corr_id }` effect.
    ///
    /// Implementations should be idempotent. In the well-formed case,
    /// terminal transitions remove the map entry so subsequent fires fail
    /// the guard. Observers should still tolerate redundant calls
    /// defensively.
    fn on_interaction_stream_cleanup(&self, corr_id: PeerCorrelationId);
}
1628
1629// ---------------------------------------------------------------------------
1630// RealtimeProductTurnHandle (U9 / dogma #4 — realtime WS lifecycle owner)
1631// ---------------------------------------------------------------------------
1632
/// DSL-owned freshness of the realtime provider-session projection
/// (dogma round 2, U-C / dogma #1, #3, #13, #20).
///
/// This is the typed mirror of the DSL field `realtime_projection_freshness`
/// and supersedes the socket-local `ProjectionFreshness` enum that
/// `meerkat-rpc::realtime_ws` used to own. The realtime-WS dispatcher obtains
/// the value from [`RealtimeProductTurnHandle::projection_freshness`] and
/// reports every observer tick, turn end, and refresh drain back as a typed
/// input. The shell therefore keeps neither freshness state nor an observer
/// queue of its own.
///
/// Paired with a monotonic `frontier_ms` watermark. While `Clean`, the
/// watermark is `baseline_ms`. While `StaleDeferred` or `StaleImmediate`, it
/// is the pending advance's `new_at_ms`.
#[derive(Clone, Copy, Debug, Default, Eq, Hash, Ord, PartialEq, PartialOrd)]
pub enum RealtimeProjectionFreshness {
    /// The provider projection agrees with canonical session state as of the
    /// frontier watermark, so nothing needs refreshing.
    #[default]
    Clean,
    /// Canonical state moved on during a live provider turn. The refresh
    /// waits for that turn to end so barge-in continuity is preserved.
    StaleDeferred,
    /// A refresh is due at the next drain site: an input chunk arriving while
    /// idle, or the end of a turn.
    StaleImmediate,
}
1660
/// How a clean provider-session close should affect reconnects on the
/// realtime channel (dogma round 2, U-C / dogma #1, #3, #18, #20).
///
/// Supersedes the shell-local pair of booleans
/// (`client_has_submitted_input`, `last_turn_terminally_completed`) that
/// `meerkat-rpc::realtime_ws` once combined to decide `needs_reattach`. At the
/// clean-close branch point, the realtime-WS dispatcher fetches this value via
/// [`RealtimeProductTurnHandle::reconnect_policy_on_clean_close`] and branches
/// on it directly.
#[derive(Clone, Copy, Debug, Default, Eq, Hash, Ord, PartialEq, PartialOrd)]
pub enum RealtimeReconnectPolicy {
    /// Nothing is in flight, so a clean close needs no recovery. Either the
    /// client never submitted input, or the most recently observed turn
    /// reached terminal completion.
    #[default]
    CleanExit,
    /// The client issued work that has not yet reached terminal turn
    /// completion. A clean close is therefore a mid-work disconnect, and the
    /// channel should reattach proactively.
    ReattachAndRecover,
}
1683
/// Observer invoked by [`RealtimeProductTurnHandle`] when the DSL emits a
/// `RealtimeProjectionFreshnessChanged` effect (dogma round 2, U-C).
///
/// Shell-owned consumers (the realtime-WS dispatch loop) implement this to
/// wake the socket's `tokio::select!` loop so it can read the new freshness
/// state and drain if necessary. Runtime handles sample the observer under
/// the same authority lock as the transition that emitted the effect, then
/// dispatch after releasing the lock so future observers can safely
/// re-enter the DSL if needed.
///
/// Installed via
/// [`RealtimeProductTurnHandle::install_projection_freshness_observer`] or
/// [`RealtimeProductTurnHandle::install_projection_freshness_observer_with_snapshot`],
/// both of which take it as an `Arc<dyn RealtimeProjectionFreshnessObserver>`.
pub trait RealtimeProjectionFreshnessObserver: Send + Sync {
    /// Called once per emitted `RealtimeProjectionFreshnessChanged`
    /// effect. `new_freshness` is the post-transition discriminant;
    /// `frontier_ms` is the post-transition monotonic watermark.
    ///
    /// The watermark follows the rules documented on
    /// [`RealtimeProjectionFreshness`]: the baseline while `Clean`, the
    /// pending advance's `new_at_ms` while `StaleDeferred` / `StaleImmediate`.
    fn on_realtime_projection_freshness_changed(
        &self,
        new_freshness: RealtimeProjectionFreshness,
        frontier_ms: u64,
    );
}
1703
/// Lifecycle phase of the realtime product turn (U9 / dogma #4).
///
/// Typed mirror of the DSL-owned field `realtime_product_turn_phase`. The
/// realtime-WS shell reads this value through
/// [`RealtimeProductTurnHandle::current_phase`]. It no longer keeps
/// `product_turn_in_flight`, `product_turn_committed`, and
/// `product_output_started` as shell locals.
///
/// `Committed` and `OutputStarted` each record exactly one of the two
/// milestones (`TurnCommitted`, first output); `Preemptible` records both.
#[derive(Clone, Copy, Debug, Default, Eq, Hash, Ord, PartialEq, PartialOrd)]
pub enum RealtimeProductTurnPhase {
    /// The provider session has no turn in flight.
    #[default]
    Idle,
    /// The provider accepted input. Neither `TurnCommitted` nor an output
    /// delta has been seen yet.
    AwaitingProgress,
    /// `TurnCommitted` has been seen, but no output delta or tool call yet.
    Committed,
    /// An output delta or tool call has been seen, but `TurnCommitted` has
    /// not.
    OutputStarted,
    /// Both `TurnCommitted` and an output delta have been seen. Preempting on
    /// a later input chunk is therefore semantically sound.
    Preemptible,
}
1727
1728/// Realtime product-turn lifecycle DSL handle (U9 / dogma #4).
1729///
1730/// Routes every lifecycle observation (input accepted, TurnCommitted,
1731/// output delta / tool call, interrupt, logical turn terminal) into the
1732/// session's MeerkatMachine DSL. Idempotent fires (e.g., a second
1733/// `turn_committed()` call on the same turn) are guard-rejected at the
1734/// DSL layer and surfaced as `Ok(false)` so the shell can fire
1735/// unconditionally.
1736///
1737/// The shell reads [`current_phase`] / [`is_in_flight`] /
1738/// [`should_preempt_on_input`] for routing decisions; no shell-local
1739/// boolean state, no helper-local event matching.
1740pub trait RealtimeProductTurnHandle: Send + Sync {
1741 /// Fire `ProductTurnInFlight`. Returns `Ok(true)` when the turn
1742 /// advanced from `Idle`; `Ok(false)` when already in-flight.
1743 fn turn_in_flight(&self) -> Result<bool, DslTransitionError>;
1744
1745 /// Fire `ProductTurnCommitted`. Returns `Ok(true)` when the phase
1746 /// advanced (`AwaitingProgress` → `Committed`, or `OutputStarted` →
1747 /// `Preemptible`); `Ok(false)` otherwise.
1748 fn turn_committed(&self) -> Result<bool, DslTransitionError>;
1749
1750 /// Fire `ProductOutputStarted`. Returns `Ok(true)` when the phase
1751 /// advanced (`AwaitingProgress` → `OutputStarted`, or `Committed` →
1752 /// `Preemptible`); `Ok(false)` otherwise.
1753 fn output_started(&self) -> Result<bool, DslTransitionError>;
1754
1755 /// Fire `ProductTurnInterrupted`. Returns `Ok(true)` when the
1756 /// output-started milestone was cleared (`Preemptible` →
1757 /// `Committed`, or `OutputStarted` → `AwaitingProgress`); `Ok(false)`
1758 /// otherwise.
1759 fn turn_interrupted(&self) -> Result<bool, DslTransitionError>;
1760
1761 /// Fire `ProductTurnTerminal`. Returns `Ok(true)` when the phase
1762 /// advanced back to `Idle` from any non-`Idle` phase; `Ok(false)`
1763 /// when already `Idle`.
1764 fn turn_terminal(&self) -> Result<bool, DslTransitionError>;
1765
1766 /// Read the current typed phase from the DSL.
1767 fn current_phase(&self) -> RealtimeProductTurnPhase;
1768
1769 /// True iff the provider session has an active turn (any non-`Idle`
1770 /// phase). Used by the realtime-WS projection-refresh gate.
1771 fn is_in_flight(&self) -> bool {
1772 self.current_phase() != RealtimeProductTurnPhase::Idle
1773 }
1774
1775 /// True iff an input chunk arriving now should preempt the current
1776 /// provider-managed turn — i.e. the turn has reached `Preemptible`.
1777 ///
1778 /// Preemption is only sound once the committed turn has visible
1779 /// assistant-side progress. Before that, the provider remains the
1780 /// semantic owner of whether the current input stream still belongs
1781 /// to the same utterance; preempting early would cancel a response
1782 /// before the assistant has had a chance to speak.
1783 fn should_preempt_on_input(&self) -> bool {
1784 self.current_phase() == RealtimeProductTurnPhase::Preemptible
1785 }
1786
1787 // ---- Projection freshness (dogma round 2, U-C / dogma #1, #3, #13, #20) ----
1788
1789 /// Fire `RealtimeProjectionAdvanceObserved { advanced_at_ms }` — the
1790 /// shell received a `SessionContextAdvanced` observer tick. The DSL
1791 /// decides the resulting freshness based on the current product-turn
1792 /// phase (live → `StaleDeferred`; idle → `StaleImmediate`).
1793 ///
1794 /// Returns `Ok(true)` when the advance transitioned state,
1795 /// `Ok(false)` when the DSL monotonic guard rejected the tick as
1796 /// non-advancing (or when the caller fired redundantly below the
1797 /// current frontier).
1798 fn projection_advance_observed(&self, advanced_at_ms: u64) -> Result<bool, DslTransitionError>;
1799
1800 /// Fire `RealtimeProjectionRefreshed { observed_ms }` — the shell
1801 /// completed a provider-session refresh drain. Returns to `Clean`.
1802 fn projection_refreshed(&self, observed_ms: u64) -> Result<bool, DslTransitionError>;
1803
1804 /// Fire `RealtimeProjectionBaselineObserved { observed_ms }` — the shell
1805 /// observed a provider-owned session-context mutation that the currently
1806 /// open provider session already contains. This may advance a clean
1807 /// frontier, but it must not clear `StaleDeferred` / `StaleImmediate`;
1808 /// only a real refresh drain may make stale state clean.
1809 fn projection_baseline_observed(&self, observed_ms: u64) -> Result<bool, DslTransitionError>;
1810
1811 /// Fire `RealtimeProjectionReset { baseline_ms }` — the shell closed
1812 /// or reconnected the product session and wants to re-seed the
1813 /// `Clean` baseline at the current DSL session-context watermark.
1814 fn projection_reset(&self, baseline_ms: u64) -> Result<bool, DslTransitionError>;
1815
1816 /// Read the current typed projection-freshness discriminant from the
1817 /// DSL. The shell reads this at the canonical drain sites (observer
1818 /// tick arrival, input-chunk refresh gate, turn-end drain) to decide
1819 /// whether to rebuild the provider session's projection.
1820 fn projection_freshness(&self) -> RealtimeProjectionFreshness;
1821
1822 /// Read the current monotonic frontier watermark from the DSL.
1823 fn projection_frontier_ms(&self) -> u64;
1824
1825 /// Convenience accessor: `true` iff the current freshness is
1826 /// `StaleImmediate` — the shell uses this at drain sites.
1827 fn is_projection_stale_immediate(&self) -> bool {
1828 self.projection_freshness() == RealtimeProjectionFreshness::StaleImmediate
1829 }
1830
1831 /// Install a typed observer for `RealtimeProjectionFreshnessChanged`
1832 /// effect emission. Implementations without an installed observer
1833 /// drop the effect (standalone / WASM paths).
1834 fn install_projection_freshness_observer(
1835 &self,
1836 observer: Arc<dyn RealtimeProjectionFreshnessObserver>,
1837 );
1838
1839 /// Atomically install a typed observer and return the current
1840 /// freshness snapshot as a single authority read. Implementations
1841 /// MUST hold the same authority lock that projection transitions use
1842 /// for both observer installation and the `(freshness, frontier)`
1843 /// sample, so no `RealtimeProjectionFreshnessChanged` effect can
1844 /// slip between "observer visible" and "socket seeded from the DSL".
1845 ///
1846 /// Callers that only need best-effort notification may use
1847 /// [`Self::install_projection_freshness_observer`]. Realtime socket
1848 /// bindings should prefer this method so wake registration and the
1849 /// typed DSL snapshot share one ordering point.
1850 fn install_projection_freshness_observer_with_snapshot(
1851 &self,
1852 observer: Arc<dyn RealtimeProjectionFreshnessObserver>,
1853 ) -> (RealtimeProjectionFreshness, u64) {
1854 self.install_projection_freshness_observer(observer);
1855 (self.projection_freshness(), self.projection_frontier_ms())
1856 }
1857
1858 // ---- Reconnect policy (dogma round 2, U-C / dogma #1, #3, #18, #20) ----
1859
1860 /// Fire `ClassifyRealtimeClientInputSubmitted` — the client's input
1861 /// chunk was accepted by the provider session. Returns `Ok(false)`
1862 /// when already `ReattachAndRecover`.
1863 fn classify_client_input_submitted(&self) -> Result<bool, DslTransitionError>;
1864
1865 /// Fire `ClassifyRealtimeMidTurnActivity` — the provider session
1866 /// emitted a non-terminal activity (e.g. a tool call) inside a live
1867 /// turn. Returns `Ok(false)` when already `ReattachAndRecover`.
1868 fn classify_mid_turn_activity(&self) -> Result<bool, DslTransitionError>;
1869
1870 /// Fire `ClassifyRealtimeTurnTerminated` — the current turn reached
1871 /// a logical terminal stop reason. Also folds in the pending
1872 /// `StaleDeferred → StaleImmediate` promotion so the turn-end drain
1873 /// site picks up any pending advance.
1874 fn classify_turn_terminated(&self) -> Result<bool, DslTransitionError>;
1875
1876 /// Read the typed reconnect-policy classification. The shell reads
1877 /// this at the clean-close branch to decide whether a clean provider-
1878 /// session close should trigger a proactive reattach
1879 /// (`ReattachAndRecover`) or close the channel cleanly
1880 /// (`CleanExit`).
1881 fn reconnect_policy_on_clean_close(&self) -> RealtimeReconnectPolicy;
1882}
1883
#[cfg(test)]
#[allow(clippy::expect_used, clippy::panic, clippy::unwrap_used)]
mod tests {
    // Unit tests for the typed effect / projection vocabulary defined in the
    // parent module (`super`).
    use super::{
        ExternalToolSurfaceEffect, ExternalToolSurfaceFailureCause, ExternalToolSurfaceInput,
        PeerConversationProjection, PeerResponseProgressProjectionPhase,
        PeerResponseTerminalCorrelationId, PeerResponseTerminalDisplayIdentity,
        PeerResponseTerminalFact, PeerResponseTerminalFactError,
        PeerResponseTerminalProjectionStatus, PeerResponseTerminalRenderPayload,
        PeerResponseTerminalRouteIdentity, PeerResponseTerminalSource,
        PeerResponseTerminalTransportIdentity, peer_response_terminal_context_key,
    };
    use crate::tool_scope::{ExternalToolSurfaceDeltaOperation, ExternalToolSurfaceDeltaPhase};

    // Pins the `PendingFailed` failure cause end to end: it round-trips
    // through a `MarkPendingFailed` input, exposes the external code
    // "pending_failed" via both `as_str()` and serde, and is carried
    // unchanged inside an `EmitExternalToolDelta` effect.
    #[test]
    fn external_tool_surface_pending_failure_cause_projects_external_code() {
        let input = ExternalToolSurfaceInput::MarkPendingFailed {
            surface_id: "alpha".to_owned(),
            pending_task_sequence: 7,
            staged_intent_sequence: 11,
            cause: ExternalToolSurfaceFailureCause::PendingFailed,
        };

        let ExternalToolSurfaceInput::MarkPendingFailed { cause, .. } = input else {
            panic!("constructed MarkPendingFailed input");
        };
        assert_eq!(cause, ExternalToolSurfaceFailureCause::PendingFailed);
        assert_eq!(cause.as_str(), "pending_failed");
        assert_eq!(
            serde_json::to_value(cause).expect("serialize failure cause"),
            serde_json::json!("pending_failed")
        );

        let effect = ExternalToolSurfaceEffect::EmitExternalToolDelta {
            surface_id: "alpha".to_owned(),
            operation: ExternalToolSurfaceDeltaOperation::Add,
            phase: ExternalToolSurfaceDeltaPhase::Failed,
            cause: Some(cause),
        };
        assert!(matches!(
            effect,
            ExternalToolSurfaceEffect::EmitExternalToolDelta {
                cause: Some(ExternalToolSurfaceFailureCause::PendingFailed),
                ..
            }
        ));
    }

    // Pins the canonical `peer_response_terminal:<route>:<correlation>`
    // context key and the exact system-notice prompt rendered for a
    // completed terminal response (payload shown as pretty-printed JSON).
    #[test]
    fn peer_terminal_projection_owns_prompt_and_context_key() {
        let route_id = "550e8400-e29b-41d4-a716-446655440000";
        let route_identity =
            PeerResponseTerminalRouteIdentity::parse(route_id).expect("route identity");
        let correlation_id =
            PeerResponseTerminalCorrelationId::parse("018f6f79-7a82-7c4e-a552-a3b86f9630f1")
                .expect("correlation id");
        let projection = PeerConversationProjection::ResponseTerminal {
            fact: PeerResponseTerminalFact::new(
                PeerResponseTerminalSource::new(
                    Some(
                        PeerResponseTerminalTransportIdentity::parse("transport-runtime-1")
                            .expect("transport identity"),
                    ),
                    route_identity,
                    PeerResponseTerminalDisplayIdentity::parse("Analyst")
                        .expect("display identity"),
                ),
                correlation_id,
                PeerResponseTerminalProjectionStatus::Completed,
                PeerResponseTerminalRenderPayload::new(Some(serde_json::json!({
                    "request_intent": "checksum_token",
                    "request_subject": "alpha beta gamma",
                    "token": "birch seventeen"
                }))),
            ),
        };

        assert_eq!(
            projection.context_key().as_deref(),
            Some(
                "peer_response_terminal:550e8400-e29b-41d4-a716-446655440000:018f6f79-7a82-7c4e-a552-a3b86f9630f1"
            )
        );
        assert_eq!(
            projection.prompt_text(),
            "[SYSTEM NOTICE][PEER_RESPONSE_TERMINAL] Correlated peer response from Analyst. Request ID: 018f6f79-7a82-7c4e-a552-a3b86f9630f1. Status: completed. Result: {\n  \"request_intent\": \"checksum_token\",\n  \"request_subject\": \"alpha beta gamma\",\n  \"token\": \"birch seventeen\"\n}."
        );
    }

    // Progress projections carry no context key; pins the rendered prompt,
    // including the snake_case phase name (`partial_result`).
    #[test]
    fn peer_progress_projection_formats_phase_from_shared_seam() {
        let projection = PeerConversationProjection::ResponseProgress {
            peer_id: "operator-rt".into(),
            request_id: "req-789".into(),
            phase: PeerResponseProgressProjectionPhase::PartialResult,
            payload: Some(serde_json::json!({ "chunk": "alpha" })),
        };

        assert_eq!(projection.context_key(), None);
        assert_eq!(
            projection.prompt_text(),
            "[SYSTEM NOTICE][PEER_RESPONSE_PROGRESS] Correlated peer response progress from operator-rt. Request ID: req-789. Phase: partial_result. Payload: {\n  \"chunk\": \"alpha\"\n}."
        );
    }

    // The free helper must produce the same key string that the terminal
    // projection's `context_key()` is pinned to above.
    #[test]
    fn peer_terminal_context_key_helper_stays_canonical() {
        let route_id = "550e8400-e29b-41d4-a716-446655440000";
        let route_identity =
            PeerResponseTerminalRouteIdentity::parse(route_id).expect("route identity");
        let correlation_id =
            PeerResponseTerminalCorrelationId::parse("018f6f79-7a82-7c4e-a552-a3b86f9630f1")
                .expect("correlation id");
        assert_eq!(
            peer_response_terminal_context_key(&route_identity, correlation_id),
            "peer_response_terminal:550e8400-e29b-41d4-a716-446655440000:018f6f79-7a82-7c4e-a552-a3b86f9630f1"
        );
    }

    // A display-name-style alias is rejected as a route identity (the
    // accepted examples above are UUID-shaped).
    #[test]
    fn peer_terminal_route_identity_rejects_display_name_alias() {
        assert!(matches!(
            PeerResponseTerminalRouteIdentity::parse("analyst-rt"),
            Err(PeerResponseTerminalFactError::InvalidRouteIdentity)
        ));
    }
}