Skip to main content

meerkat_runtime/
accept.rs

1//! §14 AcceptOutcome — result of accepting an input.
2
3use meerkat_core::lifecycle::InputId;
4use meerkat_core::types::HandlingMode;
5use serde::{Deserialize, Serialize};
6use std::fmt;
7
8use crate::input_state::{InputState, InputStateSeed};
9use crate::meerkat_machine::dsl as mm_dsl;
10use crate::policy::PolicyDecision;
11use crate::runtime_state::RuntimeState;
12
13// `AcceptOutcome` is a domain envelope. The wire shape lives in
14// `meerkat-contracts::wire::runtime::RuntimeAcceptResult` and is materialized
15// by per-surface handlers (see `meerkat-rpc::handlers::runtime`). The envelope
16// therefore carries the live `InputState` shell (no Serialize/Deserialize) and
17// a typed `RejectReason` that retains its own serde derives because rejection
18// payloads are translated into wire-facing strings.
19
20/// Machine-owned queue action for an admitted input.
21#[derive(Debug, Clone, PartialEq, Eq)]
22pub enum AdmissionQueueAction {
23    None,
24    EnqueueTo { target: HandlingMode },
25    EnqueueFront { target: HandlingMode },
26}
27
28/// Machine-owned action against an existing queued input.
29#[derive(Debug, Clone, PartialEq, Eq)]
30pub enum ExistingQueuedAdmissionAction {
31    Coalesce { existing_id: InputId },
32    Supersede { existing_id: InputId },
33}
34
35/// Machine-owned admission plan for an accepted input.
36#[derive(Debug, Clone, PartialEq, Eq)]
37pub enum AdmissionPlan {
38    ConsumedOnAccept,
39    Queued {
40        persist_and_queue: bool,
41        queue_action: AdmissionQueueAction,
42        existing_action: Option<ExistingQueuedAdmissionAction>,
43    },
44}
45
/// Coarse-grained accept flags consumed by the `AcceptWithCompletion`
/// branches of the MeerkatMachine DSL.
///
/// A plain `Copy` bundle of three independent booleans; it carries no
/// invariants of its own.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub struct CoarseAdmissionFlags {
    /// Flag: request immediate processing of the input.
    pub request_immediate_processing: bool,
    /// Flag: interrupt yielding.
    // NOTE(review): exact interrupt semantics live in the machine DSL.
    pub interrupt_yielding: bool,
    /// Flag: wake the runtime if it is idle.
    pub wake_if_idle: bool,
}
54
55/// Typed machine input that authorized a live admission resolution.
56#[derive(Debug, Clone, PartialEq, Eq)]
57pub(crate) struct MachineAdmissionAuthority {
58    input_id: String,
59    input_kind: mm_dsl::AdmissionInputKind,
60    requested_lane: Option<mm_dsl::InputLane>,
61    continuation_kind: mm_dsl::AdmissionContinuationKind,
62    silent_intent_match: bool,
63    existing_superseded_input_id: Option<String>,
64    runtime_running: bool,
65    active_turn_boundary_available: bool,
66    without_wake: bool,
67}
68
69impl MachineAdmissionAuthority {
70    #[allow(clippy::too_many_arguments)]
71    pub(crate) fn new(
72        input_id: String,
73        input_kind: mm_dsl::AdmissionInputKind,
74        requested_lane: Option<mm_dsl::InputLane>,
75        continuation_kind: mm_dsl::AdmissionContinuationKind,
76        silent_intent_match: bool,
77        existing_superseded_input_id: Option<InputId>,
78        runtime_running: bool,
79        active_turn_boundary_available: bool,
80        without_wake: bool,
81    ) -> Self {
82        Self {
83            input_id,
84            input_kind,
85            requested_lane,
86            continuation_kind,
87            silent_intent_match,
88            existing_superseded_input_id: existing_superseded_input_id.map(|id| id.to_string()),
89            runtime_running,
90            active_turn_boundary_available,
91            without_wake,
92        }
93    }
94
95    pub(crate) fn input_id(&self) -> &str {
96        &self.input_id
97    }
98
99    pub(crate) fn without_wake(&self) -> bool {
100        self.without_wake
101    }
102
103    pub(crate) fn active_turn_boundary_available(&self) -> bool {
104        self.active_turn_boundary_available
105    }
106
107    pub(crate) fn to_dsl_input(&self) -> mm_dsl::MeerkatMachineInput {
108        mm_dsl::MeerkatMachineInput::ResolveAdmissionPlan {
109            input_id: self.input_id.clone(),
110            input_kind: self.input_kind,
111            requested_lane: self.requested_lane,
112            continuation_kind: self.continuation_kind,
113            silent_intent_match: self.silent_intent_match,
114            existing_superseded_input_id: self.existing_superseded_input_id.clone(),
115            runtime_running: self.runtime_running,
116            active_turn_boundary_available: self.active_turn_boundary_available,
117            without_wake: self.without_wake,
118        }
119    }
120}
121
122/// Machine-owned resolution of an accepted input's semantic admission path.
123// Cannot derive `Eq`: `RuntimeInputProjection` carries a typed
124// `peer_response_terminal` fact whose render payload is a `serde_json::Value`.
125#[derive(Debug, Clone, PartialEq)]
126pub struct ResolvedAdmission {
127    policy: PolicyDecision,
128    handling_mode: HandlingMode,
129    runtime_semantics: crate::ingress_types::RuntimeInputSemantics,
130    primitive_projection: crate::ingress_types::RuntimeInputProjection,
131    admission_plan: AdmissionPlan,
132    coarse_flags: CoarseAdmissionFlags,
133    requires_active_pre_admission: bool,
134    authority: MachineAdmissionAuthority,
135}
136
137impl ResolvedAdmission {
138    #[allow(clippy::too_many_arguments)]
139    pub(crate) fn from_machine_resolution(
140        policy: PolicyDecision,
141        handling_mode: HandlingMode,
142        runtime_semantics: crate::ingress_types::RuntimeInputSemantics,
143        primitive_projection: crate::ingress_types::RuntimeInputProjection,
144        admission_plan: AdmissionPlan,
145        coarse_flags: CoarseAdmissionFlags,
146        requires_active_pre_admission: bool,
147        authority: MachineAdmissionAuthority,
148    ) -> Self {
149        Self {
150            policy,
151            handling_mode,
152            runtime_semantics,
153            primitive_projection,
154            admission_plan,
155            coarse_flags,
156            requires_active_pre_admission,
157            authority,
158        }
159    }
160
161    pub(crate) fn coarse_flags(&self) -> CoarseAdmissionFlags {
162        self.coarse_flags
163    }
164
165    pub(crate) fn requires_active_runtime_pre_admission(&self) -> bool {
166        self.requires_active_pre_admission
167    }
168
169    pub(crate) fn stages_run_boundary(&self) -> bool {
170        self.policy.apply_mode == crate::policy::ApplyMode::StageRunBoundary
171    }
172
173    pub(crate) fn authority(&self) -> &MachineAdmissionAuthority {
174        &self.authority
175    }
176
177    pub(crate) fn into_parts(
178        self,
179    ) -> (
180        PolicyDecision,
181        HandlingMode,
182        crate::ingress_types::RuntimeInputSemantics,
183        crate::ingress_types::RuntimeInputProjection,
184        AdmissionPlan,
185    ) {
186        (
187            self.policy,
188            self.handling_mode,
189            self.runtime_semantics,
190            self.primitive_projection,
191            self.admission_plan,
192        )
193    }
194}
195
196/// Typed reason why an input was rejected at the accept boundary.
197#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
198#[serde(tag = "reject_type", rename_all = "snake_case")]
199#[non_exhaustive]
200pub enum RejectReason {
201    /// Runtime is not in a state that accepts input (e.g. stopped, destroyed).
202    NotReady {
203        /// The runtime state that caused the rejection.
204        state: RuntimeState,
205    },
206    /// Input failed durability validation.
207    DurabilityViolation {
208        /// Description of the violation.
209        detail: String,
210    },
211    /// Peer input carried a forbidden handling_mode.
212    PeerHandlingModeInvalid {
213        /// Description of the violation.
214        detail: String,
215    },
216    /// Peer response terminal fact failed typed validation.
217    PeerResponseTerminalInvalid {
218        /// Description of the violation.
219        detail: String,
220    },
221}
222
223impl fmt::Display for RejectReason {
224    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
225        match self {
226            Self::NotReady { state } => {
227                write!(f, "runtime not accepting input while in state: {state}")
228            }
229            Self::DurabilityViolation { detail } => write!(f, "{detail}"),
230            Self::PeerHandlingModeInvalid { detail } => write!(f, "{detail}"),
231            Self::PeerResponseTerminalInvalid { detail } => write!(f, "{detail}"),
232        }
233    }
234}
235
236/// Outcome of `RuntimeDriver::accept_input()`.
237///
238/// Domain envelope returned to in-process callers. Surface crates translate it
239/// into the wire shape (`RuntimeAcceptResult` in `meerkat-contracts`) before
240/// emitting it on the network, so this type intentionally has no
241/// `Serialize`/`Deserialize` derives.
242#[derive(Debug, Clone)]
243#[non_exhaustive]
244#[allow(clippy::large_enum_variant)]
245pub enum AcceptOutcome {
246    /// Input was accepted and processing has begun.
247    Accepted {
248        /// The assigned input ID.
249        input_id: InputId,
250        /// The policy decision applied to this input.
251        policy: PolicyDecision,
252        /// Current input state.
253        state: InputState,
254        /// Machine-owned lifecycle seed paired with `state`.
255        seed: InputStateSeed,
256    },
257    /// Input was deduplicated (idempotency key matched an existing input).
258    Deduplicated {
259        /// The new input ID that was deduplicated.
260        input_id: InputId,
261        /// The existing input ID that was matched.
262        existing_id: InputId,
263    },
264    /// Input was rejected (validation failed, durability violation, etc.).
265    Rejected {
266        /// Why the input was rejected.
267        reason: RejectReason,
268    },
269}
270
271impl AcceptOutcome {
272    /// Check if the input was accepted.
273    pub fn is_accepted(&self) -> bool {
274        matches!(self, Self::Accepted { .. })
275    }
276
277    /// Check if the input was deduplicated.
278    pub fn is_deduplicated(&self) -> bool {
279        matches!(self, Self::Deduplicated { .. })
280    }
281
282    /// Check if the input was rejected.
283    pub fn is_rejected(&self) -> bool {
284        matches!(self, Self::Rejected { .. })
285    }
286}
287
288/// Derive the handling mode from a resolved policy decision.
289pub fn handling_mode_from_policy(policy: &PolicyDecision) -> HandlingMode {
290    match policy.routing_disposition {
291        crate::policy::RoutingDisposition::Steer | crate::policy::RoutingDisposition::Immediate => {
292            // Immediate routing must use the steer lane so runtime-owned
293            // semantic facts (for example terminal peer responses) cannot get
294            // stranded behind ordinary queued prompts before their immediate
295            // apply boundary is drained. This preserves the checked-in policy
296            // contract without upgrading WakeIfIdle into an active-turn
297            // interrupt on this branch.
298            HandlingMode::Steer
299        }
300        _ => HandlingMode::Queue,
301    }
302}
303
#[cfg(test)]
// Tests unwrap serde results directly; a failure there should fail the test.
#[allow(clippy::unwrap_used)]
mod tests {
    use super::*;
    use crate::identifiers::PolicyVersion;
    use crate::policy::{
        ApplyMode, ConsumePoint, DrainPolicy, QueueMode, RoutingDisposition, WakeMode,
    };

    /// Policy fixture for an ordinary queued input (`RoutingDisposition::Queue`).
    fn queued_policy() -> PolicyDecision {
        PolicyDecision {
            apply_mode: ApplyMode::StageRunStart,
            wake_mode: WakeMode::WakeIfIdle,
            queue_mode: QueueMode::Fifo,
            consume_point: ConsumePoint::OnRunComplete,
            drain_policy: DrainPolicy::QueueNextTurn,
            routing_disposition: RoutingDisposition::Queue,
            record_transcript: true,
            emit_operator_content: true,
            policy_version: PolicyVersion(1),
        }
    }

    /// Policy fixture for an immediately applied input
    /// (`RoutingDisposition::Immediate`).
    fn immediate_policy() -> PolicyDecision {
        PolicyDecision {
            apply_mode: ApplyMode::InjectNow,
            wake_mode: WakeMode::WakeIfIdle,
            queue_mode: QueueMode::None,
            consume_point: ConsumePoint::OnApply,
            drain_policy: DrainPolicy::Immediate,
            routing_disposition: RoutingDisposition::Immediate,
            record_transcript: true,
            emit_operator_content: true,
            policy_version: PolicyVersion(1),
        }
    }

    #[test]
    fn accepted_classifier() {
        // Use one id for both the outcome and its state so the fixture
        // describes a single input rather than two unrelated ones.
        let input_id = InputId::new();
        let outcome = AcceptOutcome::Accepted {
            input_id: input_id.clone(),
            policy: queued_policy(),
            state: InputState::new_accepted(input_id),
            seed: InputStateSeed::new_accepted(),
        };
        assert!(outcome.is_accepted());
        assert!(!outcome.is_deduplicated());
        assert!(!outcome.is_rejected());
    }

    #[test]
    fn deduplicated_classifier() {
        let outcome = AcceptOutcome::Deduplicated {
            input_id: InputId::new(),
            existing_id: InputId::new(),
        };
        assert!(!outcome.is_accepted());
        assert!(outcome.is_deduplicated());
        assert!(!outcome.is_rejected());
    }

    #[test]
    fn rejected_classifier() {
        let outcome = AcceptOutcome::Rejected {
            reason: RejectReason::DurabilityViolation {
                detail: "durability violation".into(),
            },
        };
        assert!(!outcome.is_accepted());
        assert!(!outcome.is_deduplicated());
        assert!(outcome.is_rejected());
    }

    #[test]
    fn reject_reason_display() {
        let not_ready = RejectReason::NotReady {
            state: RuntimeState::Stopped,
        };
        assert_eq!(
            not_ready.to_string(),
            "runtime not accepting input while in state: stopped"
        );

        let durability = RejectReason::DurabilityViolation {
            detail: "Derived durability forbidden for prompt".into(),
        };
        assert_eq!(
            durability.to_string(),
            "Derived durability forbidden for prompt"
        );

        let peer = RejectReason::PeerHandlingModeInvalid {
            detail: "handling_mode is forbidden on ResponseProgress peer inputs".into(),
        };
        assert_eq!(
            peer.to_string(),
            "handling_mode is forbidden on ResponseProgress peer inputs"
        );

        let terminal = RejectReason::PeerResponseTerminalInvalid {
            detail: "correlation id cannot be empty".into(),
        };
        assert_eq!(terminal.to_string(), "correlation id cannot be empty");
    }

    #[test]
    fn reject_reason_serde_round_trip() {
        let reasons = vec![
            RejectReason::NotReady {
                state: RuntimeState::Destroyed,
            },
            RejectReason::DurabilityViolation {
                detail: "external derived".into(),
            },
            RejectReason::PeerHandlingModeInvalid {
                detail: "forbidden".into(),
            },
            RejectReason::PeerResponseTerminalInvalid {
                detail: "bad terminal".into(),
            },
        ];
        for reason in reasons {
            let json = serde_json::to_value(&reason).unwrap();
            let parsed: RejectReason = serde_json::from_value(json).unwrap();
            assert_eq!(parsed, reason);
        }
    }

    /// Pins the serde tag configured on `RejectReason`, which a round trip
    /// alone would not catch if the tag or the renaming rule changed.
    #[test]
    fn reject_reason_uses_snake_case_reject_type_tag() {
        let cases = [
            (
                RejectReason::DurabilityViolation {
                    detail: "d".into(),
                },
                "durability_violation",
            ),
            (
                RejectReason::PeerHandlingModeInvalid {
                    detail: "d".into(),
                },
                "peer_handling_mode_invalid",
            ),
            (
                RejectReason::PeerResponseTerminalInvalid {
                    detail: "d".into(),
                },
                "peer_response_terminal_invalid",
            ),
        ];
        for (reason, expected_tag) in cases {
            let json = serde_json::to_value(&reason).unwrap();
            assert_eq!(json["reject_type"], expected_tag);
            assert_eq!(json["detail"], "d");
        }
    }

    #[test]
    fn immediate_routing_uses_steer_handling_mode() {
        assert_eq!(
            handling_mode_from_policy(&immediate_policy()),
            HandlingMode::Steer
        );
    }

    #[test]
    fn steer_routing_uses_steer_handling_mode() {
        let policy = PolicyDecision {
            routing_disposition: RoutingDisposition::Steer,
            ..immediate_policy()
        };
        assert_eq!(handling_mode_from_policy(&policy), HandlingMode::Steer);
    }

    #[test]
    fn queue_routing_uses_queue_handling_mode() {
        assert_eq!(
            handling_mode_from_policy(&queued_policy()),
            HandlingMode::Queue
        );
    }
}