1use crate::identifiers::{InputKind, KindId, PolicyVersion};
6use crate::input::Input;
7use crate::policy::{
8 ApplyMode, ConsumePoint, DrainPolicy, PolicyDecision, QueueMode, RoutingDisposition, WakeMode,
9};
10
11pub const DEFAULT_POLICY_VERSION: PolicyVersion = PolicyVersion(1);
13
14#[allow(clippy::too_many_arguments)]
16fn pd(
17 apply_mode: ApplyMode,
18 wake_mode: WakeMode,
19 queue_mode: QueueMode,
20 consume_point: ConsumePoint,
21 drain_policy: DrainPolicy,
22 routing_disposition: RoutingDisposition,
23 record_transcript: bool,
24) -> PolicyDecision {
25 PolicyDecision {
26 apply_mode,
27 wake_mode,
28 queue_mode,
29 consume_point,
30 drain_policy,
31 routing_disposition,
32 record_transcript,
33 emit_operator_content: record_transcript,
34 policy_version: DEFAULT_POLICY_VERSION,
35 }
36}
37
38pub struct DefaultPolicyTable;
40
41impl DefaultPolicyTable {
42 pub fn resolve(input: &Input, runtime_idle: bool) -> PolicyDecision {
51 let kind = input.kind();
52 let is_response_progress = matches!(kind, InputKind::PeerResponseProgress);
57 if matches!(kind, InputKind::PeerResponseTerminal)
58 && let Some(mode) = input.handling_mode()
59 {
60 let (wake_mode, drain_policy, routing_disposition) = match mode {
61 meerkat_core::types::HandlingMode::Queue => (
62 if runtime_idle {
63 WakeMode::WakeIfIdle
64 } else {
65 WakeMode::None
66 },
67 DrainPolicy::QueueNextTurn,
68 RoutingDisposition::Queue,
69 ),
70 meerkat_core::types::HandlingMode::Steer => (
71 if runtime_idle {
72 WakeMode::WakeIfIdle
73 } else {
74 WakeMode::InterruptYielding
75 },
76 DrainPolicy::SteerBatch,
77 RoutingDisposition::Steer,
78 ),
79 };
80 return pd(
81 ApplyMode::StageRunStart,
82 wake_mode,
83 QueueMode::Fifo,
84 ConsumePoint::OnRunComplete,
85 drain_policy,
86 routing_disposition,
87 true,
88 );
89 }
90 if !is_response_progress && let Some(mode) = input.handling_mode() {
91 return match mode {
92 meerkat_core::types::HandlingMode::Queue => pd(
93 ApplyMode::StageRunStart,
94 if runtime_idle {
95 WakeMode::WakeIfIdle
96 } else {
97 WakeMode::None
98 },
99 QueueMode::Fifo,
100 ConsumePoint::OnRunComplete,
101 DrainPolicy::QueueNextTurn,
102 RoutingDisposition::Queue,
103 !matches!(input, Input::Continuation(_)),
104 ),
105 meerkat_core::types::HandlingMode::Steer => pd(
106 ApplyMode::StageRunBoundary,
107 if runtime_idle {
108 WakeMode::WakeIfIdle
109 } else {
110 WakeMode::InterruptYielding
111 },
112 QueueMode::Fifo,
113 ConsumePoint::OnRunComplete,
114 DrainPolicy::SteerBatch,
115 RoutingDisposition::Steer,
116 !matches!(input, Input::Continuation(_)),
117 ),
118 };
119 }
120
121 Self::resolve_by_kind(KindId::new(kind), runtime_idle)
122 }
123
124 pub fn resolve_by_kind(kind: KindId, runtime_idle: bool) -> PolicyDecision {
126 match (kind.kind(), runtime_idle) {
127 (InputKind::Prompt, true) => pd(
129 ApplyMode::StageRunStart,
130 WakeMode::WakeIfIdle,
131 QueueMode::Fifo,
132 ConsumePoint::OnRunComplete,
133 DrainPolicy::QueueNextTurn,
134 RoutingDisposition::Queue,
135 true,
136 ),
137 (InputKind::Prompt, false) => pd(
138 ApplyMode::StageRunStart,
139 WakeMode::None,
140 QueueMode::Fifo,
141 ConsumePoint::OnRunComplete,
142 DrainPolicy::QueueNextTurn,
143 RoutingDisposition::Queue,
144 true,
145 ),
146
147 (InputKind::PeerMessage, true) => pd(
150 ApplyMode::StageRunStart,
151 WakeMode::WakeIfIdle,
152 QueueMode::Fifo,
153 ConsumePoint::OnRunComplete,
154 DrainPolicy::QueueNextTurn,
155 RoutingDisposition::Queue,
156 true,
157 ),
158 (InputKind::PeerMessage, false) => pd(
159 ApplyMode::StageRunStart,
160 WakeMode::InterruptYielding,
161 QueueMode::Fifo,
162 ConsumePoint::OnRunComplete,
163 DrainPolicy::QueueNextTurn,
164 RoutingDisposition::Queue,
165 true,
166 ),
167
168 (InputKind::PeerRequest, true) => pd(
170 ApplyMode::StageRunStart,
171 WakeMode::WakeIfIdle,
172 QueueMode::Fifo,
173 ConsumePoint::OnRunComplete,
174 DrainPolicy::QueueNextTurn,
175 RoutingDisposition::Queue,
176 true,
177 ),
178 (InputKind::PeerRequest, false) => pd(
179 ApplyMode::StageRunStart,
180 WakeMode::InterruptYielding,
181 QueueMode::Fifo,
182 ConsumePoint::OnRunComplete,
183 DrainPolicy::QueueNextTurn,
184 RoutingDisposition::Queue,
185 true,
186 ),
187
188 (InputKind::PeerResponseProgress, _) => pd(
190 ApplyMode::StageRunBoundary,
191 WakeMode::None,
192 QueueMode::Coalesce,
193 ConsumePoint::OnRunComplete,
194 DrainPolicy::SteerBatch,
195 RoutingDisposition::Steer,
196 true,
197 ),
198
199 (InputKind::PeerResponseTerminal, _) => pd(
221 ApplyMode::StageRunStart,
222 WakeMode::WakeIfIdle,
223 QueueMode::Fifo,
224 ConsumePoint::OnRunComplete,
225 DrainPolicy::QueueNextTurn,
226 RoutingDisposition::Queue,
227 true,
228 ),
229
230 (InputKind::FlowStep, true) => pd(
232 ApplyMode::StageRunStart,
233 WakeMode::WakeIfIdle,
234 QueueMode::Fifo,
235 ConsumePoint::OnRunComplete,
236 DrainPolicy::QueueNextTurn,
237 RoutingDisposition::Queue,
238 true,
239 ),
240 (InputKind::FlowStep, false) => pd(
241 ApplyMode::StageRunStart,
242 WakeMode::None,
243 QueueMode::Fifo,
244 ConsumePoint::OnRunComplete,
245 DrainPolicy::QueueNextTurn,
246 RoutingDisposition::Queue,
247 true,
248 ),
249
250 (InputKind::ExternalEvent, true) => pd(
252 ApplyMode::StageRunStart,
253 WakeMode::WakeIfIdle,
254 QueueMode::Fifo,
255 ConsumePoint::OnRunComplete,
256 DrainPolicy::QueueNextTurn,
257 RoutingDisposition::Queue,
258 true,
259 ),
260 (InputKind::ExternalEvent, false) => pd(
261 ApplyMode::StageRunStart,
262 WakeMode::None,
263 QueueMode::Fifo,
264 ConsumePoint::OnRunComplete,
265 DrainPolicy::QueueNextTurn,
266 RoutingDisposition::Queue,
267 true,
268 ),
269
270 (InputKind::Continuation, true) => pd(
272 ApplyMode::StageRunBoundary,
273 WakeMode::WakeIfIdle,
274 QueueMode::Fifo,
275 ConsumePoint::OnRunComplete,
276 DrainPolicy::SteerBatch,
277 RoutingDisposition::Steer,
278 false,
279 ),
280 (InputKind::Continuation, false) => pd(
281 ApplyMode::StageRunBoundary,
282 WakeMode::InterruptYielding,
283 QueueMode::Fifo,
284 ConsumePoint::OnRunComplete,
285 DrainPolicy::SteerBatch,
286 RoutingDisposition::Steer,
287 false,
288 ),
289
290 (InputKind::Operation, _) => pd(
293 ApplyMode::Ignore,
294 WakeMode::None,
295 QueueMode::Priority,
296 ConsumePoint::OnAccept,
297 DrainPolicy::Ignore,
298 RoutingDisposition::Drop,
299 false,
300 ),
301 }
302 }
303}
304
305#[cfg(test)]
306#[allow(clippy::unwrap_used)]
307mod tests {
308 use super::*;
309
310 fn assert_cell(
311 kind: InputKind,
312 idle: bool,
313 expected_apply: ApplyMode,
314 expected_wake: WakeMode,
315 expected_queue: QueueMode,
316 expected_consume: ConsumePoint,
317 expected_transcript: bool,
318 ) {
319 let decision = DefaultPolicyTable::resolve_by_kind(KindId::new(kind), idle);
320 assert_eq!(
321 decision.apply_mode, expected_apply,
322 "kind={kind:?}, idle={idle}: apply_mode"
323 );
324 assert_eq!(
325 decision.wake_mode, expected_wake,
326 "kind={kind:?}, idle={idle}: wake_mode"
327 );
328 assert_eq!(
329 decision.queue_mode, expected_queue,
330 "kind={kind:?}, idle={idle}: queue_mode"
331 );
332 assert_eq!(
333 decision.consume_point, expected_consume,
334 "kind={kind:?}, idle={idle}: consume_point"
335 );
336 assert_eq!(
337 decision.record_transcript, expected_transcript,
338 "kind={kind:?}, idle={idle}: record_transcript"
339 );
340 }
341
342 #[test]
343 fn prompt_idle() {
344 assert_cell(
345 InputKind::Prompt,
346 true,
347 ApplyMode::StageRunStart,
348 WakeMode::WakeIfIdle,
349 QueueMode::Fifo,
350 ConsumePoint::OnRunComplete,
351 true,
352 );
353 }
354 #[test]
355 fn prompt_running() {
356 assert_cell(
357 InputKind::Prompt,
358 false,
359 ApplyMode::StageRunStart,
360 WakeMode::None,
361 QueueMode::Fifo,
362 ConsumePoint::OnRunComplete,
363 true,
364 );
365 }
366 #[test]
367 fn peer_message_idle() {
368 assert_cell(
369 InputKind::PeerMessage,
370 true,
371 ApplyMode::StageRunStart,
372 WakeMode::WakeIfIdle,
373 QueueMode::Fifo,
374 ConsumePoint::OnRunComplete,
375 true,
376 );
377 }
378 #[test]
379 fn peer_message_running() {
380 assert_cell(
381 InputKind::PeerMessage,
382 false,
383 ApplyMode::StageRunStart,
384 WakeMode::InterruptYielding,
385 QueueMode::Fifo,
386 ConsumePoint::OnRunComplete,
387 true,
388 );
389 }
390 #[test]
391 fn peer_request_idle() {
392 assert_cell(
393 InputKind::PeerRequest,
394 true,
395 ApplyMode::StageRunStart,
396 WakeMode::WakeIfIdle,
397 QueueMode::Fifo,
398 ConsumePoint::OnRunComplete,
399 true,
400 );
401 }
402 #[test]
403 fn peer_request_running() {
404 assert_cell(
405 InputKind::PeerRequest,
406 false,
407 ApplyMode::StageRunStart,
408 WakeMode::InterruptYielding,
409 QueueMode::Fifo,
410 ConsumePoint::OnRunComplete,
411 true,
412 );
413 }
414 #[test]
415 fn peer_response_progress_idle() {
416 assert_cell(
417 InputKind::PeerResponseProgress,
418 true,
419 ApplyMode::StageRunBoundary,
420 WakeMode::None,
421 QueueMode::Coalesce,
422 ConsumePoint::OnRunComplete,
423 true,
424 );
425 }
426 #[test]
427 fn peer_response_progress_running() {
428 assert_cell(
429 InputKind::PeerResponseProgress,
430 false,
431 ApplyMode::StageRunBoundary,
432 WakeMode::None,
433 QueueMode::Coalesce,
434 ConsumePoint::OnRunComplete,
435 true,
436 );
437 }
438 #[test]
439 fn peer_response_terminal_idle() {
440 assert_cell(
441 InputKind::PeerResponseTerminal,
442 true,
443 ApplyMode::StageRunStart,
444 WakeMode::WakeIfIdle,
445 QueueMode::Fifo,
446 ConsumePoint::OnRunComplete,
447 true,
448 );
449 }
450 #[test]
451 fn peer_response_terminal_running() {
452 assert_cell(
453 InputKind::PeerResponseTerminal,
454 false,
455 ApplyMode::StageRunStart,
456 WakeMode::WakeIfIdle,
457 QueueMode::Fifo,
458 ConsumePoint::OnRunComplete,
459 true,
460 );
461 }
462 #[test]
463 fn flow_step_idle() {
464 assert_cell(
465 InputKind::FlowStep,
466 true,
467 ApplyMode::StageRunStart,
468 WakeMode::WakeIfIdle,
469 QueueMode::Fifo,
470 ConsumePoint::OnRunComplete,
471 true,
472 );
473 }
474 #[test]
475 fn flow_step_running() {
476 assert_cell(
477 InputKind::FlowStep,
478 false,
479 ApplyMode::StageRunStart,
480 WakeMode::None,
481 QueueMode::Fifo,
482 ConsumePoint::OnRunComplete,
483 true,
484 );
485 }
486 #[test]
487 fn external_event_idle() {
488 assert_cell(
489 InputKind::ExternalEvent,
490 true,
491 ApplyMode::StageRunStart,
492 WakeMode::WakeIfIdle,
493 QueueMode::Fifo,
494 ConsumePoint::OnRunComplete,
495 true,
496 );
497 }
498 #[test]
499 fn external_event_running() {
500 assert_cell(
501 InputKind::ExternalEvent,
502 false,
503 ApplyMode::StageRunStart,
504 WakeMode::None,
505 QueueMode::Fifo,
506 ConsumePoint::OnRunComplete,
507 true,
508 );
509 }
510 #[test]
511 fn continuation_idle() {
512 assert_cell(
513 InputKind::Continuation,
514 true,
515 ApplyMode::StageRunBoundary,
516 WakeMode::WakeIfIdle,
517 QueueMode::Fifo,
518 ConsumePoint::OnRunComplete,
519 false,
520 );
521 }
522 #[test]
523 fn continuation_running() {
524 assert_cell(
525 InputKind::Continuation,
526 false,
527 ApplyMode::StageRunBoundary,
528 WakeMode::InterruptYielding,
529 QueueMode::Fifo,
530 ConsumePoint::OnRunComplete,
531 false,
532 );
533 }
534 #[test]
535 fn operation_idle() {
536 assert_cell(
537 InputKind::Operation,
538 true,
539 ApplyMode::Ignore,
540 WakeMode::None,
541 QueueMode::Priority,
542 ConsumePoint::OnAccept,
543 false,
544 );
545 }
546 #[test]
547 fn operation_running() {
548 assert_cell(
549 InputKind::Operation,
550 false,
551 ApplyMode::Ignore,
552 WakeMode::None,
553 QueueMode::Priority,
554 ConsumePoint::OnAccept,
555 false,
556 );
557 }
558
559 #[test]
560 fn resolve_via_input_object() {
561 use crate::input::*;
562 use chrono::Utc;
563 use meerkat_core::lifecycle::InputId;
564
565 let header = InputHeader {
566 id: InputId::new(),
567 timestamp: Utc::now(),
568 source: InputOrigin::Operator,
569 durability: InputDurability::Durable,
570 visibility: InputVisibility::default(),
571 idempotency_key: None,
572 supersession_key: None,
573 correlation_id: None,
574 };
575 let input = Input::Prompt(PromptInput {
576 header,
577 text: "hello".into(),
578 blocks: None,
579 turn_metadata: None,
580 });
581 let decision = DefaultPolicyTable::resolve(&input, true);
582 assert_eq!(decision.apply_mode, ApplyMode::StageRunStart);
583 assert_eq!(decision.wake_mode, WakeMode::WakeIfIdle);
584 }
585
586 #[test]
587 fn explicit_steer_metadata_maps_to_checkpoint_policy() {
588 use crate::input::*;
589 use chrono::Utc;
590 use meerkat_core::lifecycle::InputId;
591 use meerkat_core::lifecycle::run_primitive::RuntimeTurnMetadata;
592
593 let input = Input::Prompt(PromptInput {
594 header: InputHeader {
595 id: InputId::new(),
596 timestamp: Utc::now(),
597 source: InputOrigin::Operator,
598 durability: InputDurability::Durable,
599 visibility: InputVisibility::default(),
600 idempotency_key: None,
601 supersession_key: None,
602 correlation_id: None,
603 },
604 text: "hello".into(),
605 blocks: None,
606 turn_metadata: Some(RuntimeTurnMetadata {
607 handling_mode: Some(meerkat_core::types::HandlingMode::Steer),
608 ..Default::default()
609 }),
610 });
611 let decision = DefaultPolicyTable::resolve(&input, true);
612 assert_eq!(decision.apply_mode, ApplyMode::StageRunBoundary);
613 assert_eq!(decision.drain_policy, DrainPolicy::SteerBatch);
614 assert_eq!(decision.routing_disposition, RoutingDisposition::Steer);
615 }
616
617 #[test]
618 fn peer_message_running_stays_queued_without_wake() {
619 let decision =
620 DefaultPolicyTable::resolve_by_kind(KindId::new(InputKind::PeerMessage), false);
621 assert_eq!(
622 decision.wake_mode,
623 WakeMode::InterruptYielding,
624 "peer_message while running must interrupt cooperative yielding"
625 );
626 }
627
628 #[test]
629 fn peer_request_running_interrupts_yielding() {
630 let decision =
631 DefaultPolicyTable::resolve_by_kind(KindId::new(InputKind::PeerRequest), false);
632 assert_eq!(
633 decision.wake_mode,
634 WakeMode::InterruptYielding,
635 "peer_request while running must interrupt cooperative yielding"
636 );
637 }
638
639 #[test]
640 fn peer_message_idle_still_wakes() {
641 let decision =
643 DefaultPolicyTable::resolve_by_kind(KindId::new(InputKind::PeerMessage), true);
644 assert_eq!(
645 decision.wake_mode,
646 WakeMode::WakeIfIdle,
647 "peer_message while idle must use WakeIfIdle"
648 );
649 }
650
651 #[test]
652 fn peer_request_idle_still_wakes() {
653 let decision =
655 DefaultPolicyTable::resolve_by_kind(KindId::new(InputKind::PeerRequest), true);
656 assert_eq!(
657 decision.wake_mode,
658 WakeMode::WakeIfIdle,
659 "peer_request while idle must use WakeIfIdle"
660 );
661 }
662
663 use crate::input::{
668 InputDurability, InputHeader, InputOrigin, InputVisibility, PeerConvention, PeerInput,
669 };
670 use chrono::Utc;
671 use meerkat_core::lifecycle::InputId;
672 use meerkat_core::types::HandlingMode;
673
674 fn make_peer_input(
675 convention: Option<PeerConvention>,
676 handling_mode: Option<HandlingMode>,
677 ) -> Input {
678 Input::Peer(PeerInput {
679 header: InputHeader {
680 id: InputId::new(),
681 timestamp: Utc::now(),
682 source: InputOrigin::Peer {
683 peer_id: "p".into(),
684 display_identity: None,
685 runtime_id: None,
686 },
687 durability: InputDurability::Durable,
688 visibility: InputVisibility::default(),
689 idempotency_key: None,
690 supersession_key: None,
691 correlation_id: None,
692 },
693 convention,
694 body: "test".into(),
695 payload: None,
696 blocks: None,
697 handling_mode,
698 })
699 }
700
701 #[test]
702 fn peer_message_with_explicit_queue_resolves_queue_semantics() {
703 let input = make_peer_input(Some(PeerConvention::Message), Some(HandlingMode::Queue));
704 let decision = DefaultPolicyTable::resolve(&input, true);
705 assert_eq!(decision.routing_disposition, RoutingDisposition::Queue);
706 assert_eq!(decision.apply_mode, ApplyMode::StageRunStart);
707 }
708
709 #[test]
710 fn peer_message_with_explicit_steer_resolves_steer_semantics() {
711 let input = make_peer_input(Some(PeerConvention::Message), Some(HandlingMode::Steer));
712 let decision = DefaultPolicyTable::resolve(&input, true);
713 assert_eq!(decision.routing_disposition, RoutingDisposition::Steer);
714 assert_eq!(decision.apply_mode, ApplyMode::StageRunBoundary);
715 }
716
717 #[test]
718 fn peer_request_with_explicit_steer_resolves_steer_semantics() {
719 let input = make_peer_input(
720 Some(PeerConvention::Request {
721 request_id: "r".into(),
722 intent: "i".into(),
723 }),
724 Some(HandlingMode::Steer),
725 );
726 let decision = DefaultPolicyTable::resolve(&input, false);
727 assert_eq!(decision.routing_disposition, RoutingDisposition::Steer);
728 }
729
730 #[test]
731 fn peer_no_convention_with_explicit_steer_resolves_steer_semantics() {
732 let input = make_peer_input(None, Some(HandlingMode::Steer));
733 let decision = DefaultPolicyTable::resolve(&input, true);
734 assert_eq!(decision.routing_disposition, RoutingDisposition::Steer);
735 }
736
737 #[test]
738 fn peer_message_without_override_preserves_kind_default() {
739 let input = make_peer_input(Some(PeerConvention::Message), None);
740 let decision = DefaultPolicyTable::resolve(&input, true);
741 assert_eq!(decision.routing_disposition, RoutingDisposition::Queue);
743 assert_eq!(decision.apply_mode, ApplyMode::StageRunStart);
744 assert_eq!(decision.wake_mode, WakeMode::WakeIfIdle);
745 }
746
747 #[test]
752 fn response_progress_with_handling_mode_falls_through_to_kind_default() {
753 let input = make_peer_input(
756 Some(PeerConvention::ResponseProgress {
757 request_id: "r".into(),
758 phase: crate::input::ResponseProgressPhase::InProgress,
759 }),
760 Some(HandlingMode::Steer),
761 );
762 let decision = DefaultPolicyTable::resolve(&input, true);
763 assert_eq!(decision.queue_mode, QueueMode::Coalesce);
766 assert_eq!(decision.apply_mode, ApplyMode::StageRunBoundary);
767 assert_eq!(decision.wake_mode, WakeMode::None);
768 }
769
770 #[test]
771 fn response_terminal_with_steer_gets_steer_semantics() {
772 let input = make_peer_input(
773 Some(PeerConvention::ResponseTerminal {
774 request_id: "r".into(),
775 status: crate::input::ResponseTerminalStatus::Completed,
776 }),
777 Some(HandlingMode::Steer),
778 );
779 let decision = DefaultPolicyTable::resolve(&input, true);
780 assert_eq!(decision.routing_disposition, RoutingDisposition::Steer);
781 assert_eq!(
782 decision.apply_mode,
783 ApplyMode::StageRunStart,
784 "terminal peer-response apply intent owns the context+reaction boundary; steer only changes urgency/lane"
785 );
786 assert_eq!(decision.drain_policy, DrainPolicy::SteerBatch);
787 assert_eq!(decision.wake_mode, WakeMode::WakeIfIdle);
788 assert!(decision.record_transcript);
789 }
790
791 #[test]
792 fn response_terminal_with_queue_handling_mode_gets_queue_semantics() {
793 let input = make_peer_input(
794 Some(PeerConvention::ResponseTerminal {
795 request_id: "r".into(),
796 status: crate::input::ResponseTerminalStatus::Completed,
797 }),
798 Some(HandlingMode::Queue),
799 );
800 let decision = DefaultPolicyTable::resolve(&input, true);
801 assert_eq!(decision.routing_disposition, RoutingDisposition::Queue);
802 assert_eq!(decision.apply_mode, ApplyMode::StageRunStart);
803 assert_eq!(decision.wake_mode, WakeMode::WakeIfIdle);
804 }
805
806 #[test]
807 fn response_terminal_without_handling_mode_keeps_kind_default() {
808 let input = make_peer_input(
809 Some(PeerConvention::ResponseTerminal {
810 request_id: "r".into(),
811 status: crate::input::ResponseTerminalStatus::Completed,
812 }),
813 None,
814 );
815 let decision = DefaultPolicyTable::resolve(&input, true);
816 assert_eq!(decision.routing_disposition, RoutingDisposition::Queue);
823 assert_eq!(decision.apply_mode, ApplyMode::StageRunStart);
824 assert_eq!(decision.wake_mode, WakeMode::WakeIfIdle);
825 }
826}