1use meerkat_core::lifecycle::InputId;
4use meerkat_core::types::HandlingMode;
5use serde::{Deserialize, Serialize};
6use std::fmt;
7
8use crate::driver::PostAdmissionSignal;
9use crate::input::Input;
10use crate::input_state::InputState;
11use crate::policy::PolicyDecision;
12use crate::policy_table::DefaultPolicyTable;
13use crate::runtime_state::RuntimeState;
14
15#[derive(Debug, Clone, PartialEq, Eq)]
24pub enum AdmissionQueueAction {
25 None,
26 EnqueueTo { target: HandlingMode },
27 EnqueueFront { target: HandlingMode },
28}
29
30#[derive(Debug, Clone, PartialEq, Eq)]
32pub enum ExistingQueuedAdmissionAction {
33 Coalesce { existing_id: InputId },
34 Supersede { existing_id: InputId },
35}
36
37#[derive(Debug, Clone, PartialEq, Eq)]
39pub enum AdmissionPlan {
40 ConsumedOnAccept,
41 Queued {
42 persist_and_queue: bool,
43 queue_action: AdmissionQueueAction,
44 existing_action: Option<ExistingQueuedAdmissionAction>,
45 },
46}
47
/// Boolean summary of the wake/processing behaviour attached to a resolved
/// admission.
///
/// As populated by `resolve_admission`, `interrupt_yielding` and
/// `wake_if_idle` are both `false` whenever `request_immediate_processing`
/// is `true`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct CoarseAdmissionFlags {
    /// The input itself asked for `HandlingMode::Steer`.
    pub request_immediate_processing: bool,
    /// The resolved policy's wake mode is `InterruptYielding`.
    pub interrupt_yielding: bool,
    /// The resolved policy's wake mode is `WakeIfIdle`.
    pub wake_if_idle: bool,
}
56
57#[derive(Debug, Clone, PartialEq, Eq)]
59pub struct ResolvedAdmission {
60 pub policy: PolicyDecision,
61 pub handling_mode: HandlingMode,
62 pub runtime_semantics: crate::ingress_types::RuntimeInputSemantics,
63 pub primitive_projection: crate::ingress_types::RuntimeInputProjection,
64 pub admission_plan: AdmissionPlan,
65 pub coarse_flags: CoarseAdmissionFlags,
66}
67
68#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
70#[serde(tag = "reject_type", rename_all = "snake_case")]
71#[non_exhaustive]
72pub enum RejectReason {
73 NotReady {
75 state: RuntimeState,
77 },
78 DurabilityViolation {
80 detail: String,
82 },
83 PeerHandlingModeInvalid {
85 detail: String,
87 },
88 PeerResponseTerminalInvalid {
90 detail: String,
92 },
93}
94
95impl fmt::Display for RejectReason {
96 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
97 match self {
98 Self::NotReady { state } => {
99 write!(f, "runtime not accepting input while in state: {state}")
100 }
101 Self::DurabilityViolation { detail } => write!(f, "{detail}"),
102 Self::PeerHandlingModeInvalid { detail } => write!(f, "{detail}"),
103 Self::PeerResponseTerminalInvalid { detail } => write!(f, "{detail}"),
104 }
105 }
106}
107
108#[derive(Debug, Clone)]
115#[non_exhaustive]
116#[allow(clippy::large_enum_variant)]
117pub enum AcceptOutcome {
118 Accepted {
120 input_id: InputId,
122 policy: PolicyDecision,
124 state: InputState,
126 },
127 Deduplicated {
129 input_id: InputId,
131 existing_id: InputId,
133 },
134 Rejected {
136 reason: RejectReason,
138 },
139}
140
141impl AcceptOutcome {
142 pub fn is_accepted(&self) -> bool {
144 matches!(self, Self::Accepted { .. })
145 }
146
147 pub fn is_deduplicated(&self) -> bool {
149 matches!(self, Self::Deduplicated { .. })
150 }
151
152 pub fn is_rejected(&self) -> bool {
154 matches!(self, Self::Rejected { .. })
155 }
156}
157
158pub fn admission_plan_from_policy(
163 policy: &PolicyDecision,
164 handling_mode: HandlingMode,
165 existing_superseded_id: Option<InputId>,
166) -> AdmissionPlan {
167 if policy.apply_mode == crate::policy::ApplyMode::Ignore
168 && policy.consume_point == crate::policy::ConsumePoint::OnAccept
169 {
170 return AdmissionPlan::ConsumedOnAccept;
171 }
172
173 if policy.apply_mode == crate::policy::ApplyMode::Ignore {
174 return AdmissionPlan::Queued {
175 persist_and_queue: false,
176 queue_action: AdmissionQueueAction::None,
177 existing_action: None,
178 };
179 }
180
181 match policy.queue_mode {
182 crate::policy::QueueMode::Coalesce => AdmissionPlan::Queued {
183 persist_and_queue: true,
184 queue_action: AdmissionQueueAction::EnqueueTo {
185 target: handling_mode,
186 },
187 existing_action: existing_superseded_id
188 .map(|existing_id| ExistingQueuedAdmissionAction::Coalesce { existing_id }),
189 },
190 crate::policy::QueueMode::Supersede => AdmissionPlan::Queued {
191 persist_and_queue: true,
192 queue_action: AdmissionQueueAction::EnqueueTo {
193 target: handling_mode,
194 },
195 existing_action: existing_superseded_id
196 .map(|existing_id| ExistingQueuedAdmissionAction::Supersede { existing_id }),
197 },
198 crate::policy::QueueMode::Priority => AdmissionPlan::Queued {
199 persist_and_queue: true,
200 queue_action: AdmissionQueueAction::EnqueueFront {
201 target: handling_mode,
202 },
203 existing_action: None,
204 },
205 crate::policy::QueueMode::Fifo | crate::policy::QueueMode::None => AdmissionPlan::Queued {
206 persist_and_queue: true,
207 queue_action: AdmissionQueueAction::EnqueueTo {
208 target: handling_mode,
209 },
210 existing_action: None,
211 },
212 }
213}
214
215pub fn handling_mode_from_policy(policy: &PolicyDecision) -> HandlingMode {
217 match policy.routing_disposition {
218 crate::policy::RoutingDisposition::Steer | crate::policy::RoutingDisposition::Immediate => {
219 HandlingMode::Steer
226 }
227 _ => HandlingMode::Queue,
228 }
229}
230
231pub fn requests_immediate_processing(input: &Input) -> bool {
237 matches!(input.handling_mode(), Some(HandlingMode::Steer))
238}
239
240pub fn requests_wake_if_idle(input: &Input) -> bool {
250 matches!(
251 DefaultPolicyTable::resolve(input, false).wake_mode,
252 crate::WakeMode::WakeIfIdle,
253 )
254}
255
256pub fn resolve_admission(
263 input: &Input,
264 runtime_idle: bool,
265 silent_intents: &[String],
266 existing_superseded_id: Option<InputId>,
267) -> ResolvedAdmission {
268 let mut policy = DefaultPolicyTable::resolve(input, runtime_idle);
269 crate::silent_intent::apply_silent_intent_override(input, silent_intents, &mut policy);
270 let handling_mode = handling_mode_from_policy(&policy);
271 let runtime_semantics =
272 crate::ingress_types::RuntimeInputSemantics::from_policy_and_kind(&policy, input.kind());
273 let primitive_projection = crate::input::runtime_input_projection(input);
274 let admission_plan = admission_plan_from_policy(&policy, handling_mode, existing_superseded_id);
275 let request_immediate_processing = requests_immediate_processing(input);
276 let interrupt_yielding = !request_immediate_processing
277 && matches!(policy.wake_mode, crate::WakeMode::InterruptYielding);
278 let wake_if_idle =
279 !request_immediate_processing && matches!(policy.wake_mode, crate::WakeMode::WakeIfIdle);
280
281 ResolvedAdmission {
282 policy,
283 handling_mode,
284 runtime_semantics,
285 primitive_projection,
286 admission_plan,
287 coarse_flags: CoarseAdmissionFlags {
288 request_immediate_processing,
289 interrupt_yielding,
290 wake_if_idle,
291 },
292 }
293}
294
295pub fn post_admission_signal_from_accept_outcome(
303 outcome: &AcceptOutcome,
304 request_immediate_processing: bool,
305) -> PostAdmissionSignal {
306 if !matches!(outcome, AcceptOutcome::Accepted { .. }) {
307 return PostAdmissionSignal::None;
308 }
309 if request_immediate_processing {
310 return PostAdmissionSignal::RequestImmediateProcessing;
311 }
312
313 match outcome {
314 AcceptOutcome::Accepted { policy, .. } => match policy.wake_mode {
315 crate::WakeMode::InterruptYielding => PostAdmissionSignal::InterruptYielding,
316 crate::WakeMode::WakeIfIdle => PostAdmissionSignal::WakeLoop,
317 crate::WakeMode::None => PostAdmissionSignal::None,
318 },
319 AcceptOutcome::Deduplicated { .. } | AcceptOutcome::Rejected { .. } => {
320 PostAdmissionSignal::None
321 }
322 }
323}
324
#[cfg(test)]
#[allow(clippy::unwrap_used)]
mod tests {
    use super::*;
    use crate::identifiers::PolicyVersion;
    use crate::policy::{
        ApplyMode, ConsumePoint, DrainPolicy, QueueMode, RoutingDisposition, WakeMode,
    };

    /// Baseline queue-routed policy; tests override only the fields they need.
    fn queued_policy() -> PolicyDecision {
        PolicyDecision {
            apply_mode: ApplyMode::StageRunStart,
            wake_mode: WakeMode::WakeIfIdle,
            queue_mode: QueueMode::Fifo,
            consume_point: ConsumePoint::OnRunComplete,
            drain_policy: DrainPolicy::QueueNextTurn,
            routing_disposition: RoutingDisposition::Queue,
            record_transcript: true,
            emit_operator_content: true,
            policy_version: PolicyVersion(1),
        }
    }

    #[test]
    fn accepted_classifier() {
        let accepted = AcceptOutcome::Accepted {
            input_id: InputId::new(),
            policy: queued_policy(),
            state: InputState::new_accepted(InputId::new()),
        };
        assert!(accepted.is_accepted());
        assert!(!accepted.is_deduplicated());
        assert!(!accepted.is_rejected());
    }

    #[test]
    fn deduplicated_classifier() {
        let deduplicated = AcceptOutcome::Deduplicated {
            input_id: InputId::new(),
            existing_id: InputId::new(),
        };
        assert!(!deduplicated.is_accepted());
        assert!(deduplicated.is_deduplicated());
        assert!(!deduplicated.is_rejected());
    }

    #[test]
    fn rejected_classifier() {
        let rejected = AcceptOutcome::Rejected {
            reason: RejectReason::DurabilityViolation {
                detail: "durability violation".into(),
            },
        };
        assert!(!rejected.is_accepted());
        assert!(!rejected.is_deduplicated());
        assert!(rejected.is_rejected());
    }

    #[test]
    fn reject_reason_display() {
        // Each reason is paired with the exact text `Display` must produce.
        let cases = [
            (
                RejectReason::NotReady {
                    state: RuntimeState::Stopped,
                },
                "runtime not accepting input while in state: stopped",
            ),
            (
                RejectReason::DurabilityViolation {
                    detail: "Derived durability forbidden for prompt".into(),
                },
                "Derived durability forbidden for prompt",
            ),
            (
                RejectReason::PeerHandlingModeInvalid {
                    detail: "handling_mode is forbidden on ResponseProgress peer inputs".into(),
                },
                "handling_mode is forbidden on ResponseProgress peer inputs",
            ),
            (
                RejectReason::PeerResponseTerminalInvalid {
                    detail: "correlation id cannot be empty".into(),
                },
                "correlation id cannot be empty",
            ),
        ];

        for (reason, expected) in cases {
            assert_eq!(reason.to_string(), expected);
        }
    }

    #[test]
    fn reject_reason_serde_round_trip() {
        let reasons = [
            RejectReason::NotReady {
                state: RuntimeState::Destroyed,
            },
            RejectReason::DurabilityViolation {
                detail: "external derived".into(),
            },
            RejectReason::PeerHandlingModeInvalid {
                detail: "forbidden".into(),
            },
            RejectReason::PeerResponseTerminalInvalid {
                detail: "bad terminal".into(),
            },
        ];

        for reason in reasons {
            let encoded = serde_json::to_value(&reason).unwrap();
            let decoded: RejectReason = serde_json::from_value(encoded).unwrap();
            assert_eq!(decoded, reason);
        }
    }

    #[test]
    fn immediate_routing_uses_steer_handling_mode() {
        let immediate_policy = PolicyDecision {
            apply_mode: ApplyMode::InjectNow,
            queue_mode: QueueMode::None,
            consume_point: ConsumePoint::OnApply,
            drain_policy: DrainPolicy::Immediate,
            routing_disposition: RoutingDisposition::Immediate,
            ..queued_policy()
        };

        assert_eq!(
            handling_mode_from_policy(&immediate_policy),
            HandlingMode::Steer
        );
    }
}