use meerkat_core::lifecycle::InputId;
use meerkat_core::types::HandlingMode;
use serde::{Deserialize, Serialize};
use std::fmt;
use crate::driver::PostAdmissionSignal;
use crate::input::Input;
use crate::input_state::InputState;
use crate::policy::PolicyDecision;
use crate::policy_table::DefaultPolicyTable;
use crate::runtime_state::RuntimeState;
/// Queue placement chosen for a newly admitted input.
///
/// Built by [`admission_plan_from_policy`] and carried inside
/// [`AdmissionPlan::Queued`].
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum AdmissionQueueAction {
/// No queue placement; selected when the policy's apply mode is `Ignore`
/// and the input is not consumed on accept.
None,
/// Regular enqueue with `target` as the handling mode; selected for every
/// queue mode except `Priority`.
EnqueueTo { target: HandlingMode },
/// Front-of-queue enqueue with `target` as the handling mode; selected
/// when the policy's queue mode is `Priority`.
EnqueueFront { target: HandlingMode },
}
/// Follow-up action aimed at an input that is already queued.
///
/// [`admission_plan_from_policy`] only produces one of these when the caller
/// supplies an existing input id and the policy's queue mode is `Coalesce` or
/// `Supersede`. How the action is carried out is decided outside this file.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum ExistingQueuedAdmissionAction {
/// Selected for `QueueMode::Coalesce`; `existing_id` names the
/// already-queued input.
Coalesce { existing_id: InputId },
/// Selected for `QueueMode::Supersede`; `existing_id` names the
/// already-queued input.
Supersede { existing_id: InputId },
}
/// What should happen to an input once it has been admitted.
///
/// Computed by [`admission_plan_from_policy`].
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum AdmissionPlan {
/// Chosen when the policy's apply mode is `Ignore` and its consume point
/// is `OnAccept`.
ConsumedOnAccept,
/// Every other case.
Queued {
/// `false` only for an ignored input that is not consumed on accept;
/// `true` for all other queued plans.
persist_and_queue: bool,
/// Where (if anywhere) the new input is enqueued.
queue_action: AdmissionQueueAction,
/// Optional action against an already-queued input.
existing_action: Option<ExistingQueuedAdmissionAction>,
},
}
/// Boolean summary of the wake-up behaviour requested by an admission.
///
/// As filled in by [`resolve_admission`], at most one flag is `true`:
/// the two wake flags are forced to `false` whenever
/// `request_immediate_processing` is set.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct CoarseAdmissionFlags {
/// The input itself asked for `HandlingMode::Steer`
/// (see [`requests_immediate_processing`]).
pub request_immediate_processing: bool,
/// The resolved policy's wake mode is `InterruptYielding` and immediate
/// processing was not requested.
pub interrupt_yielding: bool,
/// The resolved policy's wake mode is `WakeIfIdle` and immediate
/// processing was not requested.
pub wake_if_idle: bool,
}
/// Everything [`resolve_admission`] derives for a single input.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ResolvedAdmission {
/// Policy from the default policy table, after the silent-intent override
/// has been applied.
pub policy: PolicyDecision,
/// Handling mode derived from `policy` via [`handling_mode_from_policy`].
pub handling_mode: HandlingMode,
/// Semantics computed from the policy and the input.
pub runtime_semantics: crate::ingress_types::RuntimeInputSemantics,
/// Projection computed from the input alone.
pub primitive_projection: crate::ingress_types::RuntimeInputProjection,
/// Plan derived from `policy` via [`admission_plan_from_policy`].
pub admission_plan: AdmissionPlan,
/// Coarse wake-up flags for the admission.
pub coarse_flags: CoarseAdmissionFlags,
}
/// Reason an input was refused admission.
///
/// Serialized as an internally tagged enum: the variant name, in
/// `snake_case`, is stored under the `reject_type` key next to the variant's
/// fields.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
#[serde(tag = "reject_type", rename_all = "snake_case")]
#[non_exhaustive]
pub enum RejectReason {
/// The runtime was in a state that does not accept input.
NotReady {
/// Runtime state included in the rejection message.
state: RuntimeState,
},
/// NOTE(review): presumably the input's durability setting is not allowed
/// for its kind — confirm against the code that raises this.
DurabilityViolation {
/// Full message; `Display` prints it verbatim.
detail: String,
},
/// NOTE(review): presumably a peer input carried a handling mode it may
/// not have — confirm against the code that raises this.
PeerHandlingModeInvalid {
/// Full message; `Display` prints it verbatim.
detail: String,
},
/// NOTE(review): presumably a terminal peer response failed validation —
/// confirm against the code that raises this.
PeerResponseTerminalInvalid {
/// Full message; `Display` prints it verbatim.
detail: String,
},
}
/// Human-readable rendering of a rejection.
///
/// `NotReady` prints a fixed prefix followed by the runtime state; every
/// other variant prints its `detail` string verbatim.
impl fmt::Display for RejectReason {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        match self {
            Self::NotReady { state } => {
                write!(f, "runtime not accepting input while in state: {state}")
            }
            // All remaining variants carry a ready-made message.
            Self::DurabilityViolation { detail }
            | Self::PeerHandlingModeInvalid { detail }
            | Self::PeerResponseTerminalInvalid { detail } => f.write_str(detail),
        }
    }
}
/// Result of offering an input for admission.
#[derive(Debug, Clone)]
#[non_exhaustive]
// NOTE(review): presumably `Accepted` is much larger than the other variants,
// hence the lint opt-out — confirm.
#[allow(clippy::large_enum_variant)]
pub enum AcceptOutcome {
/// The input was admitted.
Accepted {
/// Id carried by the accepted outcome.
input_id: InputId,
/// Policy decision attached to the acceptance; its `wake_mode` drives
/// [`post_admission_signal_from_accept_outcome`].
policy: PolicyDecision,
/// Input state attached to the acceptance.
state: InputState,
},
/// NOTE(review): presumably the input duplicates one that is already
/// known — confirm against the producer of this variant.
Deduplicated {
/// Id carried for the offered input.
input_id: InputId,
/// Id of the pre-existing input.
existing_id: InputId,
},
/// The input was refused.
Rejected {
/// Why the input was refused.
reason: RejectReason,
},
}
impl AcceptOutcome {
    /// Returns `true` only for [`AcceptOutcome::Accepted`].
    pub fn is_accepted(&self) -> bool {
        match self {
            Self::Accepted { .. } => true,
            Self::Deduplicated { .. } | Self::Rejected { .. } => false,
        }
    }

    /// Returns `true` only for [`AcceptOutcome::Deduplicated`].
    pub fn is_deduplicated(&self) -> bool {
        match self {
            Self::Deduplicated { .. } => true,
            Self::Accepted { .. } | Self::Rejected { .. } => false,
        }
    }

    /// Returns `true` only for [`AcceptOutcome::Rejected`].
    pub fn is_rejected(&self) -> bool {
        match self {
            Self::Rejected { .. } => true,
            Self::Accepted { .. } | Self::Deduplicated { .. } => false,
        }
    }
}
/// Translates a policy decision into an [`AdmissionPlan`].
///
/// * `policy` – the resolved policy for the input.
/// * `handling_mode` – used as the `target` of whichever enqueue action is chosen.
/// * `existing_superseded_id` – id of an already-queued input; only consulted
///   for the `Coalesce` and `Supersede` queue modes.
///
/// An ignored input yields [`AdmissionPlan::ConsumedOnAccept`] when its
/// consume point is `OnAccept`, and otherwise a `Queued` plan with nothing
/// persisted or enqueued. Every other input yields a persisted `Queued` plan
/// whose actions depend on the policy's queue mode.
pub fn admission_plan_from_policy(
    policy: &PolicyDecision,
    handling_mode: HandlingMode,
    existing_superseded_id: Option<InputId>,
) -> AdmissionPlan {
    if policy.apply_mode == crate::policy::ApplyMode::Ignore {
        return if policy.consume_point == crate::policy::ConsumePoint::OnAccept {
            AdmissionPlan::ConsumedOnAccept
        } else {
            AdmissionPlan::Queued {
                persist_and_queue: false,
                queue_action: AdmissionQueueAction::None,
                existing_action: None,
            }
        };
    }

    // Only the two actions vary between queue modes; decide them as a pair.
    let (queue_action, existing_action) = match policy.queue_mode {
        crate::policy::QueueMode::Coalesce => (
            AdmissionQueueAction::EnqueueTo {
                target: handling_mode,
            },
            existing_superseded_id
                .map(|existing_id| ExistingQueuedAdmissionAction::Coalesce { existing_id }),
        ),
        crate::policy::QueueMode::Supersede => (
            AdmissionQueueAction::EnqueueTo {
                target: handling_mode,
            },
            existing_superseded_id
                .map(|existing_id| ExistingQueuedAdmissionAction::Supersede { existing_id }),
        ),
        crate::policy::QueueMode::Priority => (
            AdmissionQueueAction::EnqueueFront {
                target: handling_mode,
            },
            None,
        ),
        crate::policy::QueueMode::Fifo | crate::policy::QueueMode::None => (
            AdmissionQueueAction::EnqueueTo {
                target: handling_mode,
            },
            None,
        ),
    };

    AdmissionPlan::Queued {
        persist_and_queue: true,
        queue_action,
        existing_action,
    }
}
/// Derives the handling mode from a policy's routing disposition.
///
/// The `Steer` and `Immediate` dispositions both map to
/// [`HandlingMode::Steer`]; every other disposition maps to
/// [`HandlingMode::Queue`].
pub fn handling_mode_from_policy(policy: &PolicyDecision) -> HandlingMode {
    let steers = matches!(
        policy.routing_disposition,
        crate::policy::RoutingDisposition::Steer | crate::policy::RoutingDisposition::Immediate
    );
    if steers {
        HandlingMode::Steer
    } else {
        HandlingMode::Queue
    }
}
/// Reports whether the input itself asks for `HandlingMode::Steer`.
///
/// Inputs with no handling mode, or with any other mode, return `false`.
pub fn requests_immediate_processing(input: &Input) -> bool {
    let requested_mode = input.handling_mode();
    matches!(requested_mode, Some(HandlingMode::Steer))
}
/// Reports whether the default policy for `input` has wake mode `WakeIfIdle`.
///
/// The policy is resolved with the runtime treated as not idle, and without
/// any silent-intent override.
pub fn requests_wake_if_idle(input: &Input) -> bool {
    let non_idle_policy = DefaultPolicyTable::resolve(input, false);
    matches!(non_idle_policy.wake_mode, crate::WakeMode::WakeIfIdle)
}
/// Resolves everything needed to admit `input`.
///
/// * `input` – the input being admitted.
/// * `runtime_idle` – forwarded to the default policy table.
/// * `silent_intents` – forwarded to the silent-intent override, which may
///   adjust the resolved policy in place.
/// * `existing_superseded_id` – forwarded to [`admission_plan_from_policy`].
///
/// The wake flags in the result are suppressed whenever the input itself
/// requests immediate processing.
pub fn resolve_admission(
    input: &Input,
    runtime_idle: bool,
    silent_intents: &[String],
    existing_superseded_id: Option<InputId>,
) -> ResolvedAdmission {
    // Policy first: every later step reads the (possibly overridden) policy.
    let mut policy = DefaultPolicyTable::resolve(input, runtime_idle);
    crate::silent_intent::apply_silent_intent_override(input, silent_intents, &mut policy);

    let handling_mode = handling_mode_from_policy(&policy);
    let runtime_semantics =
        crate::ingress_types::RuntimeInputSemantics::from_policy_and_input(&policy, input);
    let primitive_projection = crate::input::runtime_input_projection(input);
    let admission_plan = admission_plan_from_policy(&policy, handling_mode, existing_superseded_id);

    let request_immediate_processing = requests_immediate_processing(input);
    // An immediate-processing request takes precedence over both wake modes.
    let (interrupt_yielding, wake_if_idle) = if request_immediate_processing {
        (false, false)
    } else {
        (
            matches!(policy.wake_mode, crate::WakeMode::InterruptYielding),
            matches!(policy.wake_mode, crate::WakeMode::WakeIfIdle),
        )
    };
    let coarse_flags = CoarseAdmissionFlags {
        request_immediate_processing,
        interrupt_yielding,
        wake_if_idle,
    };

    ResolvedAdmission {
        policy,
        handling_mode,
        runtime_semantics,
        primitive_projection,
        admission_plan,
        coarse_flags,
    }
}
/// Chooses the signal to raise after an admission attempt.
///
/// * Outcomes other than `Accepted` never signal.
/// * An accepted input with `request_immediate_processing` set yields
///   `RequestImmediateProcessing`, regardless of the policy's wake mode.
/// * Otherwise the accepted policy's wake mode decides: `InterruptYielding`
///   maps to `InterruptYielding`, `WakeIfIdle` to `WakeLoop`, and `None` to
///   no signal.
pub fn post_admission_signal_from_accept_outcome(
    outcome: &AcceptOutcome,
    request_immediate_processing: bool,
) -> PostAdmissionSignal {
    let accepted_policy = match outcome {
        AcceptOutcome::Accepted { policy, .. } => policy,
        AcceptOutcome::Deduplicated { .. } | AcceptOutcome::Rejected { .. } => {
            return PostAdmissionSignal::None;
        }
    };

    if request_immediate_processing {
        return PostAdmissionSignal::RequestImmediateProcessing;
    }

    match accepted_policy.wake_mode {
        crate::WakeMode::InterruptYielding => PostAdmissionSignal::InterruptYielding,
        crate::WakeMode::WakeIfIdle => PostAdmissionSignal::WakeLoop,
        crate::WakeMode::None => PostAdmissionSignal::None,
    }
}
#[cfg(test)]
// Tests unwrap freely: a panic is the intended failure mode here.
#[allow(clippy::unwrap_used)]
mod tests {
    use super::*;
    use crate::identifiers::PolicyVersion;
    use crate::policy::{
        ApplyMode, ConsumePoint, DrainPolicy, QueueMode, RoutingDisposition, WakeMode,
    };

    /// Checks all three `is_*` predicates of `outcome` against the expected values.
    fn assert_classification(
        outcome: &AcceptOutcome,
        accepted: bool,
        deduplicated: bool,
        rejected: bool,
    ) {
        assert_eq!(outcome.is_accepted(), accepted);
        assert_eq!(outcome.is_deduplicated(), deduplicated);
        assert_eq!(outcome.is_rejected(), rejected);
    }

    #[test]
    fn accepted_classifier() {
        let queued_policy = PolicyDecision {
            apply_mode: ApplyMode::StageRunStart,
            wake_mode: WakeMode::WakeIfIdle,
            queue_mode: QueueMode::Fifo,
            consume_point: ConsumePoint::OnRunComplete,
            drain_policy: DrainPolicy::QueueNextTurn,
            routing_disposition: RoutingDisposition::Queue,
            record_transcript: true,
            emit_operator_content: true,
            policy_version: PolicyVersion(1),
        };
        let outcome = AcceptOutcome::Accepted {
            input_id: InputId::new(),
            policy: queued_policy,
            state: InputState::new_accepted(InputId::new()),
        };
        assert_classification(&outcome, true, false, false);
    }

    #[test]
    fn deduplicated_classifier() {
        let outcome = AcceptOutcome::Deduplicated {
            input_id: InputId::new(),
            existing_id: InputId::new(),
        };
        assert_classification(&outcome, false, true, false);
    }

    #[test]
    fn rejected_classifier() {
        let outcome = AcceptOutcome::Rejected {
            reason: RejectReason::DurabilityViolation {
                detail: "durability violation".into(),
            },
        };
        assert_classification(&outcome, false, false, true);
    }

    #[test]
    fn reject_reason_display() {
        // Each reason is paired with the exact text `Display` must produce.
        let cases = [
            (
                RejectReason::NotReady {
                    state: RuntimeState::Stopped,
                },
                "runtime not accepting input while in state: stopped",
            ),
            (
                RejectReason::DurabilityViolation {
                    detail: "Derived durability forbidden for prompt".into(),
                },
                "Derived durability forbidden for prompt",
            ),
            (
                RejectReason::PeerHandlingModeInvalid {
                    detail: "handling_mode is forbidden on ResponseProgress peer inputs".into(),
                },
                "handling_mode is forbidden on ResponseProgress peer inputs",
            ),
            (
                RejectReason::PeerResponseTerminalInvalid {
                    detail: "correlation id cannot be empty".into(),
                },
                "correlation id cannot be empty",
            ),
        ];
        for (reason, expected) in &cases {
            assert_eq!(reason.to_string(), *expected);
        }
    }

    #[test]
    fn reject_reason_serde_round_trip() {
        let samples = [
            RejectReason::NotReady {
                state: RuntimeState::Destroyed,
            },
            RejectReason::DurabilityViolation {
                detail: "external derived".into(),
            },
            RejectReason::PeerHandlingModeInvalid {
                detail: "forbidden".into(),
            },
            RejectReason::PeerResponseTerminalInvalid {
                detail: "bad terminal".into(),
            },
        ];
        for original in &samples {
            let encoded = serde_json::to_value(original).unwrap();
            let decoded: RejectReason = serde_json::from_value(encoded).unwrap();
            assert_eq!(&decoded, original);
        }
    }

    #[test]
    fn immediate_routing_uses_steer_handling_mode() {
        let immediate_policy = PolicyDecision {
            apply_mode: ApplyMode::InjectNow,
            wake_mode: WakeMode::WakeIfIdle,
            queue_mode: QueueMode::None,
            consume_point: ConsumePoint::OnApply,
            drain_policy: DrainPolicy::Immediate,
            routing_disposition: RoutingDisposition::Immediate,
            record_transcript: true,
            emit_operator_content: true,
            policy_version: PolicyVersion(1),
        };
        let resolved_mode = handling_mode_from_policy(&immediate_policy);
        assert_eq!(resolved_mode, HandlingMode::Steer);
    }
}