Skip to main content

meerkat_runtime/
accept.rs

1//! §14 AcceptOutcome — result of accepting an input.
2
3use meerkat_core::lifecycle::InputId;
4use meerkat_core::types::HandlingMode;
5use serde::{Deserialize, Serialize};
6use std::fmt;
7
8use crate::driver::PostAdmissionSignal;
9use crate::input::Input;
10use crate::input_state::InputState;
11use crate::policy::PolicyDecision;
12use crate::policy_table::DefaultPolicyTable;
13use crate::runtime_state::RuntimeState;
14
15// `AcceptOutcome` is a domain envelope. The wire shape lives in
16// `meerkat-contracts::wire::runtime::RuntimeAcceptResult` and is materialized
17// by per-surface handlers (see `meerkat-rpc::handlers::runtime`). The envelope
18// therefore carries the live `InputState` shell (no Serialize/Deserialize) and
19// a typed `RejectReason` that retains its own serde derives because rejection
20// payloads are translated into wire-facing strings.
21
22/// Machine-owned queue action for an admitted input.
23#[derive(Debug, Clone, PartialEq, Eq)]
24pub enum AdmissionQueueAction {
25    None,
26    EnqueueTo { target: HandlingMode },
27    EnqueueFront { target: HandlingMode },
28}
29
30/// Machine-owned action against an existing queued input.
31#[derive(Debug, Clone, PartialEq, Eq)]
32pub enum ExistingQueuedAdmissionAction {
33    Coalesce { existing_id: InputId },
34    Supersede { existing_id: InputId },
35}
36
37/// Machine-owned admission plan for an accepted input.
38#[derive(Debug, Clone, PartialEq, Eq)]
39pub enum AdmissionPlan {
40    ConsumedOnAccept,
41    Queued {
42        persist_and_queue: bool,
43        queue_action: AdmissionQueueAction,
44        existing_action: Option<ExistingQueuedAdmissionAction>,
45    },
46}
47
48/// Coarse accept flags used by the MeerkatMachine DSL's
49/// `AcceptWithCompletion` branches.
50#[derive(Debug, Clone, Copy, PartialEq, Eq)]
51pub struct CoarseAdmissionFlags {
52    pub request_immediate_processing: bool,
53    pub interrupt_yielding: bool,
54    pub wake_if_idle: bool,
55}
56
57/// Machine-owned resolution of an accepted input's semantic admission path.
58#[derive(Debug, Clone, PartialEq, Eq)]
59pub struct ResolvedAdmission {
60    pub policy: PolicyDecision,
61    pub handling_mode: HandlingMode,
62    pub runtime_semantics: crate::ingress_types::RuntimeInputSemantics,
63    pub primitive_projection: crate::ingress_types::RuntimeInputProjection,
64    pub admission_plan: AdmissionPlan,
65    pub coarse_flags: CoarseAdmissionFlags,
66}
67
68/// Typed reason why an input was rejected at the accept boundary.
69#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
70#[serde(tag = "reject_type", rename_all = "snake_case")]
71#[non_exhaustive]
72pub enum RejectReason {
73    /// Runtime is not in a state that accepts input (e.g. stopped, destroyed).
74    NotReady {
75        /// The runtime state that caused the rejection.
76        state: RuntimeState,
77    },
78    /// Input failed durability validation.
79    DurabilityViolation {
80        /// Description of the violation.
81        detail: String,
82    },
83    /// Peer input carried a forbidden handling_mode.
84    PeerHandlingModeInvalid {
85        /// Description of the violation.
86        detail: String,
87    },
88    /// Peer response terminal fact failed typed validation.
89    PeerResponseTerminalInvalid {
90        /// Description of the violation.
91        detail: String,
92    },
93}
94
95impl fmt::Display for RejectReason {
96    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
97        match self {
98            Self::NotReady { state } => {
99                write!(f, "runtime not accepting input while in state: {state}")
100            }
101            Self::DurabilityViolation { detail } => write!(f, "{detail}"),
102            Self::PeerHandlingModeInvalid { detail } => write!(f, "{detail}"),
103            Self::PeerResponseTerminalInvalid { detail } => write!(f, "{detail}"),
104        }
105    }
106}
107
108/// Outcome of `RuntimeDriver::accept_input()`.
109///
110/// Domain envelope returned to in-process callers. Surface crates translate it
111/// into the wire shape (`RuntimeAcceptResult` in `meerkat-contracts`) before
112/// emitting it on the network, so this type intentionally has no
113/// `Serialize`/`Deserialize` derives.
114#[derive(Debug, Clone)]
115#[non_exhaustive]
116#[allow(clippy::large_enum_variant)]
117pub enum AcceptOutcome {
118    /// Input was accepted and processing has begun.
119    Accepted {
120        /// The assigned input ID.
121        input_id: InputId,
122        /// The policy decision applied to this input.
123        policy: PolicyDecision,
124        /// Current input state.
125        state: InputState,
126    },
127    /// Input was deduplicated (idempotency key matched an existing input).
128    Deduplicated {
129        /// The new input ID that was deduplicated.
130        input_id: InputId,
131        /// The existing input ID that was matched.
132        existing_id: InputId,
133    },
134    /// Input was rejected (validation failed, durability violation, etc.).
135    Rejected {
136        /// Why the input was rejected.
137        reason: RejectReason,
138    },
139}
140
141impl AcceptOutcome {
142    /// Check if the input was accepted.
143    pub fn is_accepted(&self) -> bool {
144        matches!(self, Self::Accepted { .. })
145    }
146
147    /// Check if the input was deduplicated.
148    pub fn is_deduplicated(&self) -> bool {
149        matches!(self, Self::Deduplicated { .. })
150    }
151
152    /// Check if the input was rejected.
153    pub fn is_rejected(&self) -> bool {
154        matches!(self, Self::Rejected { .. })
155    }
156}
157
158/// Classify the machine-owned admission disposition for an accepted input.
159///
160/// This is the semantic answer to “what happens to an accepted input?” Helpers
161/// should only apply the already-decided queue/lifecycle mutations.
162pub fn admission_plan_from_policy(
163    policy: &PolicyDecision,
164    handling_mode: HandlingMode,
165    existing_superseded_id: Option<InputId>,
166) -> AdmissionPlan {
167    if policy.apply_mode == crate::policy::ApplyMode::Ignore
168        && policy.consume_point == crate::policy::ConsumePoint::OnAccept
169    {
170        return AdmissionPlan::ConsumedOnAccept;
171    }
172
173    if policy.apply_mode == crate::policy::ApplyMode::Ignore {
174        return AdmissionPlan::Queued {
175            persist_and_queue: false,
176            queue_action: AdmissionQueueAction::None,
177            existing_action: None,
178        };
179    }
180
181    match policy.queue_mode {
182        crate::policy::QueueMode::Coalesce => AdmissionPlan::Queued {
183            persist_and_queue: true,
184            queue_action: AdmissionQueueAction::EnqueueTo {
185                target: handling_mode,
186            },
187            existing_action: existing_superseded_id
188                .map(|existing_id| ExistingQueuedAdmissionAction::Coalesce { existing_id }),
189        },
190        crate::policy::QueueMode::Supersede => AdmissionPlan::Queued {
191            persist_and_queue: true,
192            queue_action: AdmissionQueueAction::EnqueueTo {
193                target: handling_mode,
194            },
195            existing_action: existing_superseded_id
196                .map(|existing_id| ExistingQueuedAdmissionAction::Supersede { existing_id }),
197        },
198        crate::policy::QueueMode::Priority => AdmissionPlan::Queued {
199            persist_and_queue: true,
200            queue_action: AdmissionQueueAction::EnqueueFront {
201                target: handling_mode,
202            },
203            existing_action: None,
204        },
205        crate::policy::QueueMode::Fifo | crate::policy::QueueMode::None => AdmissionPlan::Queued {
206            persist_and_queue: true,
207            queue_action: AdmissionQueueAction::EnqueueTo {
208                target: handling_mode,
209            },
210            existing_action: None,
211        },
212    }
213}
214
215/// Derive the handling mode from a resolved policy decision.
216pub fn handling_mode_from_policy(policy: &PolicyDecision) -> HandlingMode {
217    match policy.routing_disposition {
218        crate::policy::RoutingDisposition::Steer | crate::policy::RoutingDisposition::Immediate => {
219            // Immediate routing must use the steer lane so runtime-owned
220            // semantic facts (for example terminal peer responses) cannot get
221            // stranded behind ordinary queued prompts before their immediate
222            // apply boundary is drained. This preserves the checked-in policy
223            // contract without upgrading WakeIfIdle into an active-turn
224            // interrupt on this branch.
225            HandlingMode::Steer
226        }
227        _ => HandlingMode::Queue,
228    }
229}
230
231/// Whether this input requests immediate processing after admission.
232///
233/// This remains narrower than "routes through the steer lane". Some inputs
234/// route through checkpoint/steer paths for batching, but only explicit steer
235/// intent should request in-turn processing.
236pub fn requests_immediate_processing(input: &Input) -> bool {
237    matches!(input.handling_mode(), Some(HandlingMode::Steer))
238}
239
240/// Whether this input carries the "wake the runtime loop once the session
241/// reaches idle" intent.
242///
243/// Derived from the kind-level policy with `runtime_idle = false` (the
244/// Running-phase arm is where this flag actually matters; the Idle /
245/// Attached queued arms already emit `WakeLoop` unconditionally). Drives
246/// the DSL `wake_if_idle` field on `AcceptWithCompletion` so the machine
247/// emits `PostAdmissionSignal::WakeLoop` for late peer-response terminals
248/// and other queue-bound wake-if-idle inputs that arrive mid-turn.
249pub fn requests_wake_if_idle(input: &Input) -> bool {
250    matches!(
251        DefaultPolicyTable::resolve(input, false).wake_mode,
252        crate::WakeMode::WakeIfIdle,
253    )
254}
255
256/// Resolve the machine-owned semantic admission path for an accepted input.
257///
258/// Runtime helpers may still perform mechanical queue lookups (for example,
259/// determining which existing queued input would be superseded), but the
260/// semantic decision about policy, routing, and admission disposition is owned
261/// here rather than inside the driver helper.
262pub fn resolve_admission(
263    input: &Input,
264    runtime_idle: bool,
265    silent_intents: &[String],
266    existing_superseded_id: Option<InputId>,
267) -> ResolvedAdmission {
268    let mut policy = DefaultPolicyTable::resolve(input, runtime_idle);
269    crate::silent_intent::apply_silent_intent_override(input, silent_intents, &mut policy);
270    let handling_mode = handling_mode_from_policy(&policy);
271    let runtime_semantics =
272        crate::ingress_types::RuntimeInputSemantics::from_policy_and_kind(&policy, input.kind());
273    let primitive_projection = crate::input::runtime_input_projection(input);
274    let admission_plan = admission_plan_from_policy(&policy, handling_mode, existing_superseded_id);
275    let request_immediate_processing = requests_immediate_processing(input);
276    let interrupt_yielding = !request_immediate_processing
277        && matches!(policy.wake_mode, crate::WakeMode::InterruptYielding);
278    let wake_if_idle =
279        !request_immediate_processing && matches!(policy.wake_mode, crate::WakeMode::WakeIfIdle);
280
281    ResolvedAdmission {
282        policy,
283        handling_mode,
284        runtime_semantics,
285        primitive_projection,
286        admission_plan,
287        coarse_flags: CoarseAdmissionFlags {
288            request_immediate_processing,
289            interrupt_yielding,
290            wake_if_idle,
291        },
292    }
293}
294
295/// Classify the machine-owned post-admission control signal for a resolved
296/// accept outcome.
297///
298/// Admission-time wake / interrupt / immediate-processing semantics are owned
299/// by the checked-in Meerkat machine. The runtime helper may still carry other
300/// wake signals for non-admission bookkeeping, but plain accept classification
301/// should flow through this function.
302pub fn post_admission_signal_from_accept_outcome(
303    outcome: &AcceptOutcome,
304    request_immediate_processing: bool,
305) -> PostAdmissionSignal {
306    if !matches!(outcome, AcceptOutcome::Accepted { .. }) {
307        return PostAdmissionSignal::None;
308    }
309    if request_immediate_processing {
310        return PostAdmissionSignal::RequestImmediateProcessing;
311    }
312
313    match outcome {
314        AcceptOutcome::Accepted { policy, .. } => match policy.wake_mode {
315            crate::WakeMode::InterruptYielding => PostAdmissionSignal::InterruptYielding,
316            crate::WakeMode::WakeIfIdle => PostAdmissionSignal::WakeLoop,
317            crate::WakeMode::None => PostAdmissionSignal::None,
318        },
319        AcceptOutcome::Deduplicated { .. } | AcceptOutcome::Rejected { .. } => {
320            PostAdmissionSignal::None
321        }
322    }
323}
324
325#[cfg(test)]
326#[allow(clippy::unwrap_used)]
327mod tests {
328    use super::*;
329    use crate::identifiers::PolicyVersion;
330    use crate::policy::{
331        ApplyMode, ConsumePoint, DrainPolicy, QueueMode, RoutingDisposition, WakeMode,
332    };
333
334    #[test]
335    fn accepted_classifier() {
336        let outcome = AcceptOutcome::Accepted {
337            input_id: InputId::new(),
338            policy: PolicyDecision {
339                apply_mode: ApplyMode::StageRunStart,
340                wake_mode: WakeMode::WakeIfIdle,
341                queue_mode: QueueMode::Fifo,
342                consume_point: ConsumePoint::OnRunComplete,
343                drain_policy: DrainPolicy::QueueNextTurn,
344                routing_disposition: RoutingDisposition::Queue,
345                record_transcript: true,
346                emit_operator_content: true,
347                policy_version: PolicyVersion(1),
348            },
349            state: InputState::new_accepted(InputId::new()),
350        };
351        assert!(outcome.is_accepted());
352        assert!(!outcome.is_deduplicated());
353        assert!(!outcome.is_rejected());
354    }
355
356    #[test]
357    fn deduplicated_classifier() {
358        let outcome = AcceptOutcome::Deduplicated {
359            input_id: InputId::new(),
360            existing_id: InputId::new(),
361        };
362        assert!(!outcome.is_accepted());
363        assert!(outcome.is_deduplicated());
364        assert!(!outcome.is_rejected());
365    }
366
367    #[test]
368    fn rejected_classifier() {
369        let outcome = AcceptOutcome::Rejected {
370            reason: RejectReason::DurabilityViolation {
371                detail: "durability violation".into(),
372            },
373        };
374        assert!(!outcome.is_accepted());
375        assert!(!outcome.is_deduplicated());
376        assert!(outcome.is_rejected());
377    }
378
379    #[test]
380    fn reject_reason_display() {
381        let not_ready = RejectReason::NotReady {
382            state: RuntimeState::Stopped,
383        };
384        assert_eq!(
385            not_ready.to_string(),
386            "runtime not accepting input while in state: stopped"
387        );
388
389        let durability = RejectReason::DurabilityViolation {
390            detail: "Derived durability forbidden for prompt".into(),
391        };
392        assert_eq!(
393            durability.to_string(),
394            "Derived durability forbidden for prompt"
395        );
396
397        let peer = RejectReason::PeerHandlingModeInvalid {
398            detail: "handling_mode is forbidden on ResponseProgress peer inputs".into(),
399        };
400        assert_eq!(
401            peer.to_string(),
402            "handling_mode is forbidden on ResponseProgress peer inputs"
403        );
404
405        let terminal = RejectReason::PeerResponseTerminalInvalid {
406            detail: "correlation id cannot be empty".into(),
407        };
408        assert_eq!(terminal.to_string(), "correlation id cannot be empty");
409    }
410
411    #[test]
412    fn reject_reason_serde_round_trip() {
413        let reasons = vec![
414            RejectReason::NotReady {
415                state: RuntimeState::Destroyed,
416            },
417            RejectReason::DurabilityViolation {
418                detail: "external derived".into(),
419            },
420            RejectReason::PeerHandlingModeInvalid {
421                detail: "forbidden".into(),
422            },
423            RejectReason::PeerResponseTerminalInvalid {
424                detail: "bad terminal".into(),
425            },
426        ];
427        for reason in reasons {
428            let json = serde_json::to_value(&reason).unwrap();
429            let parsed: RejectReason = serde_json::from_value(json).unwrap();
430            assert_eq!(parsed, reason);
431        }
432    }
433
434    #[test]
435    fn immediate_routing_uses_steer_handling_mode() {
436        let policy = PolicyDecision {
437            apply_mode: ApplyMode::InjectNow,
438            wake_mode: WakeMode::WakeIfIdle,
439            queue_mode: QueueMode::None,
440            consume_point: ConsumePoint::OnApply,
441            drain_policy: DrainPolicy::Immediate,
442            routing_disposition: RoutingDisposition::Immediate,
443            record_transcript: true,
444            emit_operator_content: true,
445            policy_version: PolicyVersion(1),
446        };
447
448        assert_eq!(handling_mode_from_policy(&policy), HandlingMode::Steer);
449    }
450}