1use meerkat_core::lifecycle::InputId;
4use meerkat_core::types::HandlingMode;
5use serde::{Deserialize, Serialize};
6use std::fmt;
7
8use crate::input_state::{InputState, InputStateSeed};
9use crate::meerkat_machine::dsl as mm_dsl;
10use crate::policy::PolicyDecision;
11use crate::runtime_state::RuntimeState;
12
13#[derive(Debug, Clone, PartialEq, Eq)]
22pub enum AdmissionQueueAction {
23 None,
24 EnqueueTo { target: HandlingMode },
25 EnqueueFront { target: HandlingMode },
26}
27
28#[derive(Debug, Clone, PartialEq, Eq)]
30pub enum ExistingQueuedAdmissionAction {
31 Coalesce { existing_id: InputId },
32 Supersede { existing_id: InputId },
33}
34
35#[derive(Debug, Clone, PartialEq, Eq)]
37pub enum AdmissionPlan {
38 ConsumedOnAccept,
39 Queued {
40 persist_and_queue: bool,
41 queue_action: AdmissionQueueAction,
42 existing_action: Option<ExistingQueuedAdmissionAction>,
43 },
44}
45
46#[derive(Debug, Clone, Copy, PartialEq, Eq)]
49pub struct CoarseAdmissionFlags {
50 pub request_immediate_processing: bool,
51 pub interrupt_yielding: bool,
52 pub wake_if_idle: bool,
53}
54
55#[derive(Debug, Clone, PartialEq, Eq)]
57pub(crate) struct MachineAdmissionAuthority {
58 input_id: String,
59 input_kind: mm_dsl::AdmissionInputKind,
60 requested_lane: Option<mm_dsl::InputLane>,
61 continuation_kind: mm_dsl::AdmissionContinuationKind,
62 silent_intent_match: bool,
63 existing_superseded_input_id: Option<String>,
64 runtime_running: bool,
65 active_turn_boundary_available: bool,
66 without_wake: bool,
67}
68
69impl MachineAdmissionAuthority {
70 #[allow(clippy::too_many_arguments)]
71 pub(crate) fn new(
72 input_id: String,
73 input_kind: mm_dsl::AdmissionInputKind,
74 requested_lane: Option<mm_dsl::InputLane>,
75 continuation_kind: mm_dsl::AdmissionContinuationKind,
76 silent_intent_match: bool,
77 existing_superseded_input_id: Option<InputId>,
78 runtime_running: bool,
79 active_turn_boundary_available: bool,
80 without_wake: bool,
81 ) -> Self {
82 Self {
83 input_id,
84 input_kind,
85 requested_lane,
86 continuation_kind,
87 silent_intent_match,
88 existing_superseded_input_id: existing_superseded_input_id.map(|id| id.to_string()),
89 runtime_running,
90 active_turn_boundary_available,
91 without_wake,
92 }
93 }
94
95 pub(crate) fn input_id(&self) -> &str {
96 &self.input_id
97 }
98
99 pub(crate) fn without_wake(&self) -> bool {
100 self.without_wake
101 }
102
103 pub(crate) fn active_turn_boundary_available(&self) -> bool {
104 self.active_turn_boundary_available
105 }
106
107 pub(crate) fn to_dsl_input(&self) -> mm_dsl::MeerkatMachineInput {
108 mm_dsl::MeerkatMachineInput::ResolveAdmissionPlan {
109 input_id: self.input_id.clone(),
110 input_kind: self.input_kind,
111 requested_lane: self.requested_lane,
112 continuation_kind: self.continuation_kind,
113 silent_intent_match: self.silent_intent_match,
114 existing_superseded_input_id: self.existing_superseded_input_id.clone(),
115 runtime_running: self.runtime_running,
116 active_turn_boundary_available: self.active_turn_boundary_available,
117 without_wake: self.without_wake,
118 }
119 }
120}
121
122#[derive(Debug, Clone, PartialEq)]
126pub struct ResolvedAdmission {
127 policy: PolicyDecision,
128 handling_mode: HandlingMode,
129 runtime_semantics: crate::ingress_types::RuntimeInputSemantics,
130 primitive_projection: crate::ingress_types::RuntimeInputProjection,
131 admission_plan: AdmissionPlan,
132 coarse_flags: CoarseAdmissionFlags,
133 requires_active_pre_admission: bool,
134 authority: MachineAdmissionAuthority,
135}
136
137impl ResolvedAdmission {
138 #[allow(clippy::too_many_arguments)]
139 pub(crate) fn from_machine_resolution(
140 policy: PolicyDecision,
141 handling_mode: HandlingMode,
142 runtime_semantics: crate::ingress_types::RuntimeInputSemantics,
143 primitive_projection: crate::ingress_types::RuntimeInputProjection,
144 admission_plan: AdmissionPlan,
145 coarse_flags: CoarseAdmissionFlags,
146 requires_active_pre_admission: bool,
147 authority: MachineAdmissionAuthority,
148 ) -> Self {
149 Self {
150 policy,
151 handling_mode,
152 runtime_semantics,
153 primitive_projection,
154 admission_plan,
155 coarse_flags,
156 requires_active_pre_admission,
157 authority,
158 }
159 }
160
161 pub(crate) fn coarse_flags(&self) -> CoarseAdmissionFlags {
162 self.coarse_flags
163 }
164
165 pub(crate) fn requires_active_runtime_pre_admission(&self) -> bool {
166 self.requires_active_pre_admission
167 }
168
169 pub(crate) fn stages_run_boundary(&self) -> bool {
170 self.policy.apply_mode == crate::policy::ApplyMode::StageRunBoundary
171 }
172
173 pub(crate) fn authority(&self) -> &MachineAdmissionAuthority {
174 &self.authority
175 }
176
177 pub(crate) fn into_parts(
178 self,
179 ) -> (
180 PolicyDecision,
181 HandlingMode,
182 crate::ingress_types::RuntimeInputSemantics,
183 crate::ingress_types::RuntimeInputProjection,
184 AdmissionPlan,
185 ) {
186 (
187 self.policy,
188 self.handling_mode,
189 self.runtime_semantics,
190 self.primitive_projection,
191 self.admission_plan,
192 )
193 }
194}
195
196#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
198#[serde(tag = "reject_type", rename_all = "snake_case")]
199#[non_exhaustive]
200pub enum RejectReason {
201 NotReady {
203 state: RuntimeState,
205 },
206 DurabilityViolation {
208 detail: String,
210 },
211 PeerHandlingModeInvalid {
213 detail: String,
215 },
216 PeerResponseTerminalInvalid {
218 detail: String,
220 },
221}
222
223impl fmt::Display for RejectReason {
224 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
225 match self {
226 Self::NotReady { state } => {
227 write!(f, "runtime not accepting input while in state: {state}")
228 }
229 Self::DurabilityViolation { detail } => write!(f, "{detail}"),
230 Self::PeerHandlingModeInvalid { detail } => write!(f, "{detail}"),
231 Self::PeerResponseTerminalInvalid { detail } => write!(f, "{detail}"),
232 }
233 }
234}
235
236#[derive(Debug, Clone)]
243#[non_exhaustive]
244#[allow(clippy::large_enum_variant)]
245pub enum AcceptOutcome {
246 Accepted {
248 input_id: InputId,
250 policy: PolicyDecision,
252 state: InputState,
254 seed: InputStateSeed,
256 },
257 Deduplicated {
259 input_id: InputId,
261 existing_id: InputId,
263 },
264 Rejected {
266 reason: RejectReason,
268 },
269}
270
271impl AcceptOutcome {
272 pub fn is_accepted(&self) -> bool {
274 matches!(self, Self::Accepted { .. })
275 }
276
277 pub fn is_deduplicated(&self) -> bool {
279 matches!(self, Self::Deduplicated { .. })
280 }
281
282 pub fn is_rejected(&self) -> bool {
284 matches!(self, Self::Rejected { .. })
285 }
286}
287
288pub fn handling_mode_from_policy(policy: &PolicyDecision) -> HandlingMode {
290 match policy.routing_disposition {
291 crate::policy::RoutingDisposition::Steer | crate::policy::RoutingDisposition::Immediate => {
292 HandlingMode::Steer
299 }
300 _ => HandlingMode::Queue,
301 }
302}
303
304#[cfg(test)]
305#[allow(clippy::unwrap_used)]
306mod tests {
307 use super::*;
308 use crate::identifiers::PolicyVersion;
309 use crate::policy::{
310 ApplyMode, ConsumePoint, DrainPolicy, QueueMode, RoutingDisposition, WakeMode,
311 };
312
313 #[test]
314 fn accepted_classifier() {
315 let outcome = AcceptOutcome::Accepted {
316 input_id: InputId::new(),
317 policy: PolicyDecision {
318 apply_mode: ApplyMode::StageRunStart,
319 wake_mode: WakeMode::WakeIfIdle,
320 queue_mode: QueueMode::Fifo,
321 consume_point: ConsumePoint::OnRunComplete,
322 drain_policy: DrainPolicy::QueueNextTurn,
323 routing_disposition: RoutingDisposition::Queue,
324 record_transcript: true,
325 emit_operator_content: true,
326 policy_version: PolicyVersion(1),
327 },
328 state: InputState::new_accepted(InputId::new()),
329 seed: InputStateSeed::new_accepted(),
330 };
331 assert!(outcome.is_accepted());
332 assert!(!outcome.is_deduplicated());
333 assert!(!outcome.is_rejected());
334 }
335
336 #[test]
337 fn deduplicated_classifier() {
338 let outcome = AcceptOutcome::Deduplicated {
339 input_id: InputId::new(),
340 existing_id: InputId::new(),
341 };
342 assert!(!outcome.is_accepted());
343 assert!(outcome.is_deduplicated());
344 assert!(!outcome.is_rejected());
345 }
346
347 #[test]
348 fn rejected_classifier() {
349 let outcome = AcceptOutcome::Rejected {
350 reason: RejectReason::DurabilityViolation {
351 detail: "durability violation".into(),
352 },
353 };
354 assert!(!outcome.is_accepted());
355 assert!(!outcome.is_deduplicated());
356 assert!(outcome.is_rejected());
357 }
358
359 #[test]
360 fn reject_reason_display() {
361 let not_ready = RejectReason::NotReady {
362 state: RuntimeState::Stopped,
363 };
364 assert_eq!(
365 not_ready.to_string(),
366 "runtime not accepting input while in state: stopped"
367 );
368
369 let durability = RejectReason::DurabilityViolation {
370 detail: "Derived durability forbidden for prompt".into(),
371 };
372 assert_eq!(
373 durability.to_string(),
374 "Derived durability forbidden for prompt"
375 );
376
377 let peer = RejectReason::PeerHandlingModeInvalid {
378 detail: "handling_mode is forbidden on ResponseProgress peer inputs".into(),
379 };
380 assert_eq!(
381 peer.to_string(),
382 "handling_mode is forbidden on ResponseProgress peer inputs"
383 );
384
385 let terminal = RejectReason::PeerResponseTerminalInvalid {
386 detail: "correlation id cannot be empty".into(),
387 };
388 assert_eq!(terminal.to_string(), "correlation id cannot be empty");
389 }
390
391 #[test]
392 fn reject_reason_serde_round_trip() {
393 let reasons = vec![
394 RejectReason::NotReady {
395 state: RuntimeState::Destroyed,
396 },
397 RejectReason::DurabilityViolation {
398 detail: "external derived".into(),
399 },
400 RejectReason::PeerHandlingModeInvalid {
401 detail: "forbidden".into(),
402 },
403 RejectReason::PeerResponseTerminalInvalid {
404 detail: "bad terminal".into(),
405 },
406 ];
407 for reason in reasons {
408 let json = serde_json::to_value(&reason).unwrap();
409 let parsed: RejectReason = serde_json::from_value(json).unwrap();
410 assert_eq!(parsed, reason);
411 }
412 }
413
414 #[test]
415 fn immediate_routing_uses_steer_handling_mode() {
416 let policy = PolicyDecision {
417 apply_mode: ApplyMode::InjectNow,
418 wake_mode: WakeMode::WakeIfIdle,
419 queue_mode: QueueMode::None,
420 consume_point: ConsumePoint::OnApply,
421 drain_policy: DrainPolicy::Immediate,
422 routing_disposition: RoutingDisposition::Immediate,
423 record_transcript: true,
424 emit_operator_content: true,
425 policy_version: PolicyVersion(1),
426 };
427
428 assert_eq!(handling_mode_from_policy(&policy), HandlingMode::Steer);
429 }
430}