1use crate::identifiers::{KindId, PolicyVersion};
6use crate::input::Input;
7use crate::policy::{
8 ApplyMode, ConsumePoint, DrainPolicy, PolicyDecision, QueueMode, RoutingDisposition, WakeMode,
9};
10
/// Policy version stamped into the `policy_version` field of every [`PolicyDecision`]
/// built by this module's `pd` helper.
pub const DEFAULT_POLICY_VERSION: PolicyVersion = PolicyVersion(1);
13
14#[allow(clippy::too_many_arguments)]
16fn pd(
17 apply_mode: ApplyMode,
18 wake_mode: WakeMode,
19 queue_mode: QueueMode,
20 consume_point: ConsumePoint,
21 drain_policy: DrainPolicy,
22 routing_disposition: RoutingDisposition,
23 record_transcript: bool,
24) -> PolicyDecision {
25 PolicyDecision {
26 apply_mode,
27 wake_mode,
28 queue_mode,
29 consume_point,
30 drain_policy,
31 routing_disposition,
32 record_transcript,
33 emit_operator_content: record_transcript,
34 policy_version: DEFAULT_POLICY_VERSION,
35 }
36}
37
38pub struct DefaultPolicyTable;
40
41impl DefaultPolicyTable {
42 pub fn resolve(input: &Input, runtime_idle: bool) -> PolicyDecision {
51 let kind = input.kind_id();
52 let is_response_convention = matches!(kind.0.as_str(), "peer_response_progress");
57 if !is_response_convention && let Some(mode) = input.handling_mode() {
58 return match mode {
59 meerkat_core::types::HandlingMode::Queue => pd(
60 ApplyMode::StageRunStart,
61 if runtime_idle {
62 WakeMode::WakeIfIdle
63 } else {
64 WakeMode::None
65 },
66 QueueMode::Fifo,
67 ConsumePoint::OnRunComplete,
68 DrainPolicy::QueueNextTurn,
69 RoutingDisposition::Queue,
70 !matches!(input, Input::Continuation(_)),
71 ),
72 meerkat_core::types::HandlingMode::Steer => pd(
73 ApplyMode::StageRunBoundary,
74 if runtime_idle {
75 WakeMode::WakeIfIdle
76 } else {
77 WakeMode::None
78 },
79 QueueMode::Fifo,
80 ConsumePoint::OnRunComplete,
81 DrainPolicy::SteerBatch,
82 RoutingDisposition::Steer,
83 !matches!(input, Input::Continuation(_)),
84 ),
85 };
86 }
87
88 Self::resolve_by_kind(&kind, runtime_idle)
90 }
91
92 pub fn resolve_by_kind(kind: &KindId, runtime_idle: bool) -> PolicyDecision {
94 match (kind.0.as_str(), runtime_idle) {
95 ("prompt", true) => pd(
97 ApplyMode::StageRunStart,
98 WakeMode::WakeIfIdle,
99 QueueMode::Fifo,
100 ConsumePoint::OnRunComplete,
101 DrainPolicy::QueueNextTurn,
102 RoutingDisposition::Queue,
103 true,
104 ),
105 ("prompt", false) => pd(
106 ApplyMode::StageRunStart,
107 WakeMode::None,
108 QueueMode::Fifo,
109 ConsumePoint::OnRunComplete,
110 DrainPolicy::QueueNextTurn,
111 RoutingDisposition::Queue,
112 true,
113 ),
114
115 ("peer_message", true) => pd(
117 ApplyMode::StageRunStart,
118 WakeMode::WakeIfIdle,
119 QueueMode::Fifo,
120 ConsumePoint::OnRunComplete,
121 DrainPolicy::QueueNextTurn,
122 RoutingDisposition::Queue,
123 true,
124 ),
125 ("peer_message", false) => pd(
126 ApplyMode::StageRunStart,
127 WakeMode::None,
128 QueueMode::Fifo,
129 ConsumePoint::OnRunComplete,
130 DrainPolicy::QueueNextTurn,
131 RoutingDisposition::Queue,
132 true,
133 ),
134
135 ("peer_request", true) => pd(
137 ApplyMode::StageRunStart,
138 WakeMode::WakeIfIdle,
139 QueueMode::Fifo,
140 ConsumePoint::OnRunComplete,
141 DrainPolicy::QueueNextTurn,
142 RoutingDisposition::Queue,
143 true,
144 ),
145 ("peer_request", false) => pd(
146 ApplyMode::StageRunStart,
147 WakeMode::None,
148 QueueMode::Fifo,
149 ConsumePoint::OnRunComplete,
150 DrainPolicy::QueueNextTurn,
151 RoutingDisposition::Queue,
152 true,
153 ),
154
155 ("peer_response_progress", true) => pd(
157 ApplyMode::StageRunBoundary,
158 WakeMode::None,
159 QueueMode::Coalesce,
160 ConsumePoint::OnRunComplete,
161 DrainPolicy::SteerBatch,
162 RoutingDisposition::Steer,
163 true,
164 ),
165 ("peer_response_progress", false) => pd(
166 ApplyMode::StageRunBoundary,
167 WakeMode::None,
168 QueueMode::Coalesce,
169 ConsumePoint::OnRunComplete,
170 DrainPolicy::SteerBatch,
171 RoutingDisposition::Steer,
172 true,
173 ),
174
175 ("peer_response_terminal", true) => pd(
177 ApplyMode::StageRunStart,
178 WakeMode::WakeIfIdle,
179 QueueMode::Fifo,
180 ConsumePoint::OnRunComplete,
181 DrainPolicy::QueueNextTurn,
182 RoutingDisposition::Queue,
183 true,
184 ),
185 ("peer_response_terminal", false) => pd(
186 ApplyMode::StageRunStart,
187 WakeMode::None,
188 QueueMode::Fifo,
189 ConsumePoint::OnRunComplete,
190 DrainPolicy::QueueNextTurn,
191 RoutingDisposition::Queue,
192 true,
193 ),
194
195 ("flow_step", true) => pd(
197 ApplyMode::StageRunStart,
198 WakeMode::WakeIfIdle,
199 QueueMode::Fifo,
200 ConsumePoint::OnRunComplete,
201 DrainPolicy::QueueNextTurn,
202 RoutingDisposition::Queue,
203 true,
204 ),
205 ("flow_step", false) => pd(
206 ApplyMode::StageRunStart,
207 WakeMode::None,
208 QueueMode::Fifo,
209 ConsumePoint::OnRunComplete,
210 DrainPolicy::QueueNextTurn,
211 RoutingDisposition::Queue,
212 true,
213 ),
214
215 ("external_event", true) => pd(
217 ApplyMode::StageRunStart,
218 WakeMode::WakeIfIdle,
219 QueueMode::Fifo,
220 ConsumePoint::OnRunComplete,
221 DrainPolicy::QueueNextTurn,
222 RoutingDisposition::Queue,
223 true,
224 ),
225 ("external_event", false) => pd(
226 ApplyMode::StageRunStart,
227 WakeMode::None,
228 QueueMode::Fifo,
229 ConsumePoint::OnRunComplete,
230 DrainPolicy::QueueNextTurn,
231 RoutingDisposition::Queue,
232 true,
233 ),
234
235 ("continuation", true) => pd(
237 ApplyMode::StageRunBoundary,
238 WakeMode::WakeIfIdle,
239 QueueMode::Fifo,
240 ConsumePoint::OnRunComplete,
241 DrainPolicy::SteerBatch,
242 RoutingDisposition::Steer,
243 false,
244 ),
245 ("continuation", false) => pd(
246 ApplyMode::StageRunBoundary,
247 WakeMode::None,
248 QueueMode::Fifo,
249 ConsumePoint::OnRunComplete,
250 DrainPolicy::SteerBatch,
251 RoutingDisposition::Steer,
252 false,
253 ),
254
255 ("operation", true | false) => pd(
258 ApplyMode::Ignore,
259 WakeMode::None,
260 QueueMode::Priority,
261 ConsumePoint::OnAccept,
262 DrainPolicy::Ignore,
263 RoutingDisposition::Drop,
264 false,
265 ),
266
267 (_, _) => pd(
269 ApplyMode::StageRunStart,
270 WakeMode::None,
271 QueueMode::Fifo,
272 ConsumePoint::OnRunComplete,
273 DrainPolicy::QueueNextTurn,
274 RoutingDisposition::Queue,
275 true,
276 ),
277 }
278 }
279}
280
#[cfg(test)]
#[allow(clippy::unwrap_used)]
mod tests {
    use super::*;
    use crate::input::{
        InputDurability, InputHeader, InputOrigin, InputVisibility, PeerConvention, PeerInput,
        PromptInput, ResponseProgressPhase, ResponseTerminalStatus,
    };
    use chrono::Utc;
    use meerkat_core::lifecycle::InputId;
    use meerkat_core::lifecycle::run_primitive::RuntimeTurnMetadata;
    use meerkat_core::types::HandlingMode;

    // ---------------------------------------------------------------------------------------
    // Helpers
    // ---------------------------------------------------------------------------------------

    /// Checks one (kind, idle) cell of the per-kind table against the expected dimensions.
    fn assert_cell(
        kind: &str,
        idle: bool,
        expected_apply: ApplyMode,
        expected_wake: WakeMode,
        expected_queue: QueueMode,
        expected_consume: ConsumePoint,
        expected_transcript: bool,
    ) {
        let actual = by_kind(kind, idle);
        assert_eq!(actual.apply_mode, expected_apply, "kind={kind}, idle={idle}: apply_mode");
        assert_eq!(actual.wake_mode, expected_wake, "kind={kind}, idle={idle}: wake_mode");
        assert_eq!(actual.queue_mode, expected_queue, "kind={kind}, idle={idle}: queue_mode");
        assert_eq!(
            actual.consume_point, expected_consume,
            "kind={kind}, idle={idle}: consume_point"
        );
        assert_eq!(
            actual.record_transcript, expected_transcript,
            "kind={kind}, idle={idle}: record_transcript"
        );
    }

    /// Shorthand for a per-kind lookup from a string kind name.
    fn by_kind(kind: &str, idle: bool) -> PolicyDecision {
        DefaultPolicyTable::resolve_by_kind(&KindId::new(kind), idle)
    }

    /// Durable header with default visibility, no keys, and the given origin.
    fn make_header(source: InputOrigin) -> InputHeader {
        InputHeader {
            id: InputId::new(),
            timestamp: Utc::now(),
            source,
            durability: InputDurability::Durable,
            visibility: InputVisibility::default(),
            idempotency_key: None,
            supersession_key: None,
            correlation_id: None,
        }
    }

    /// Operator prompt "hello" with the given turn metadata.
    fn make_prompt_input(turn_metadata: Option<RuntimeTurnMetadata>) -> Input {
        Input::Prompt(PromptInput {
            header: make_header(InputOrigin::Operator),
            text: "hello".into(),
            blocks: None,
            turn_metadata,
        })
    }

    /// Peer input from peer "p" with the given convention and handling mode.
    fn make_peer_input(
        convention: Option<PeerConvention>,
        handling_mode: Option<HandlingMode>,
    ) -> Input {
        let source = InputOrigin::Peer {
            peer_id: "p".into(),
            runtime_id: None,
        };
        Input::Peer(PeerInput {
            header: make_header(source),
            convention,
            body: "test".into(),
            blocks: None,
            handling_mode,
        })
    }

    /// Terminal response convention for request "r" with `Completed` status.
    fn completed_terminal() -> PeerConvention {
        PeerConvention::ResponseTerminal {
            request_id: "r".into(),
            status: ResponseTerminalStatus::Completed,
        }
    }

    // ---------------------------------------------------------------------------------------
    // Per-kind table: one generated test per (kind, idle) cell
    // ---------------------------------------------------------------------------------------

    /// Expands each table row into a `#[test]` function that calls `assert_cell`.
    macro_rules! cell_tests {
        ($(
            $name:ident($kind:literal, $idle:literal) =>
                $apply:expr, $wake:expr, $queue:expr, $consume:expr, $transcript:literal;
        )+) => {
            $(
                #[test]
                fn $name() {
                    assert_cell($kind, $idle, $apply, $wake, $queue, $consume, $transcript);
                }
            )+
        };
    }

    cell_tests! {
        prompt_idle("prompt", true) =>
            ApplyMode::StageRunStart, WakeMode::WakeIfIdle, QueueMode::Fifo,
            ConsumePoint::OnRunComplete, true;
        prompt_running("prompt", false) =>
            ApplyMode::StageRunStart, WakeMode::None, QueueMode::Fifo,
            ConsumePoint::OnRunComplete, true;

        peer_message_idle("peer_message", true) =>
            ApplyMode::StageRunStart, WakeMode::WakeIfIdle, QueueMode::Fifo,
            ConsumePoint::OnRunComplete, true;
        peer_message_running("peer_message", false) =>
            ApplyMode::StageRunStart, WakeMode::None, QueueMode::Fifo,
            ConsumePoint::OnRunComplete, true;

        peer_request_idle("peer_request", true) =>
            ApplyMode::StageRunStart, WakeMode::WakeIfIdle, QueueMode::Fifo,
            ConsumePoint::OnRunComplete, true;
        peer_request_running("peer_request", false) =>
            ApplyMode::StageRunStart, WakeMode::None, QueueMode::Fifo,
            ConsumePoint::OnRunComplete, true;

        peer_response_progress_idle("peer_response_progress", true) =>
            ApplyMode::StageRunBoundary, WakeMode::None, QueueMode::Coalesce,
            ConsumePoint::OnRunComplete, true;
        peer_response_progress_running("peer_response_progress", false) =>
            ApplyMode::StageRunBoundary, WakeMode::None, QueueMode::Coalesce,
            ConsumePoint::OnRunComplete, true;

        peer_response_terminal_idle("peer_response_terminal", true) =>
            ApplyMode::StageRunStart, WakeMode::WakeIfIdle, QueueMode::Fifo,
            ConsumePoint::OnRunComplete, true;
        peer_response_terminal_running("peer_response_terminal", false) =>
            ApplyMode::StageRunStart, WakeMode::None, QueueMode::Fifo,
            ConsumePoint::OnRunComplete, true;

        flow_step_idle("flow_step", true) =>
            ApplyMode::StageRunStart, WakeMode::WakeIfIdle, QueueMode::Fifo,
            ConsumePoint::OnRunComplete, true;
        flow_step_running("flow_step", false) =>
            ApplyMode::StageRunStart, WakeMode::None, QueueMode::Fifo,
            ConsumePoint::OnRunComplete, true;

        external_event_idle("external_event", true) =>
            ApplyMode::StageRunStart, WakeMode::WakeIfIdle, QueueMode::Fifo,
            ConsumePoint::OnRunComplete, true;
        external_event_running("external_event", false) =>
            ApplyMode::StageRunStart, WakeMode::None, QueueMode::Fifo,
            ConsumePoint::OnRunComplete, true;

        continuation_idle("continuation", true) =>
            ApplyMode::StageRunBoundary, WakeMode::WakeIfIdle, QueueMode::Fifo,
            ConsumePoint::OnRunComplete, false;
        continuation_running("continuation", false) =>
            ApplyMode::StageRunBoundary, WakeMode::None, QueueMode::Fifo,
            ConsumePoint::OnRunComplete, false;

        operation_idle("operation", true) =>
            ApplyMode::Ignore, WakeMode::None, QueueMode::Priority,
            ConsumePoint::OnAccept, false;
        operation_running("operation", false) =>
            ApplyMode::Ignore, WakeMode::None, QueueMode::Priority,
            ConsumePoint::OnAccept, false;
    }

    // ---------------------------------------------------------------------------------------
    // Resolution through a full `Input`
    // ---------------------------------------------------------------------------------------

    #[test]
    fn resolve_via_input_object() {
        let prompt = make_prompt_input(None);
        let decision = DefaultPolicyTable::resolve(&prompt, true);
        assert_eq!(decision.apply_mode, ApplyMode::StageRunStart);
        assert_eq!(decision.wake_mode, WakeMode::WakeIfIdle);
    }

    #[test]
    fn explicit_steer_metadata_maps_to_checkpoint_policy() {
        let metadata = RuntimeTurnMetadata {
            handling_mode: Some(HandlingMode::Steer),
            ..Default::default()
        };
        let prompt = make_prompt_input(Some(metadata));
        let decision = DefaultPolicyTable::resolve(&prompt, true);
        assert_eq!(decision.apply_mode, ApplyMode::StageRunBoundary);
        assert_eq!(decision.drain_policy, DrainPolicy::SteerBatch);
        assert_eq!(decision.routing_disposition, RoutingDisposition::Steer);
    }

    // ---------------------------------------------------------------------------------------
    // Wake behaviour of peer messages and requests
    // ---------------------------------------------------------------------------------------

    #[test]
    fn peer_message_running_stays_queued_without_wake() {
        let decision = by_kind("peer_message", false);
        assert_eq!(
            decision.wake_mode,
            WakeMode::None,
            "peer_message while running must not request a wake"
        );
        assert_ne!(
            decision.wake_mode,
            WakeMode::WakeIfIdle,
            "peer_message while running must not use WakeIfIdle"
        );
    }

    #[test]
    fn peer_request_running_stays_queued_without_wake() {
        let decision = by_kind("peer_request", false);
        assert_eq!(
            decision.wake_mode,
            WakeMode::None,
            "peer_request while running must not request a wake"
        );
    }

    #[test]
    fn peer_message_idle_still_wakes() {
        let decision = by_kind("peer_message", true);
        assert_eq!(
            decision.wake_mode,
            WakeMode::WakeIfIdle,
            "peer_message while idle must use WakeIfIdle"
        );
    }

    #[test]
    fn peer_request_idle_still_wakes() {
        let decision = by_kind("peer_request", true);
        assert_eq!(
            decision.wake_mode,
            WakeMode::WakeIfIdle,
            "peer_request while idle must use WakeIfIdle"
        );
    }

    // ---------------------------------------------------------------------------------------
    // Explicit handling modes on peer inputs
    // ---------------------------------------------------------------------------------------

    #[test]
    fn peer_message_with_explicit_queue_resolves_queue_semantics() {
        let peer = make_peer_input(Some(PeerConvention::Message), Some(HandlingMode::Queue));
        let decision = DefaultPolicyTable::resolve(&peer, true);
        assert_eq!(decision.routing_disposition, RoutingDisposition::Queue);
        assert_eq!(decision.apply_mode, ApplyMode::StageRunStart);
    }

    #[test]
    fn peer_message_with_explicit_steer_resolves_steer_semantics() {
        let peer = make_peer_input(Some(PeerConvention::Message), Some(HandlingMode::Steer));
        let decision = DefaultPolicyTable::resolve(&peer, true);
        assert_eq!(decision.routing_disposition, RoutingDisposition::Steer);
        assert_eq!(decision.apply_mode, ApplyMode::StageRunBoundary);
    }

    #[test]
    fn peer_request_with_explicit_steer_resolves_steer_semantics() {
        let request = PeerConvention::Request {
            request_id: "r".into(),
            intent: "i".into(),
        };
        let peer = make_peer_input(Some(request), Some(HandlingMode::Steer));
        let decision = DefaultPolicyTable::resolve(&peer, false);
        assert_eq!(decision.routing_disposition, RoutingDisposition::Steer);
    }

    #[test]
    fn peer_no_convention_with_explicit_steer_resolves_steer_semantics() {
        let peer = make_peer_input(None, Some(HandlingMode::Steer));
        let decision = DefaultPolicyTable::resolve(&peer, true);
        assert_eq!(decision.routing_disposition, RoutingDisposition::Steer);
    }

    #[test]
    fn peer_message_without_override_preserves_kind_default() {
        let peer = make_peer_input(Some(PeerConvention::Message), None);
        let decision = DefaultPolicyTable::resolve(&peer, true);
        assert_eq!(decision.routing_disposition, RoutingDisposition::Queue);
        assert_eq!(decision.apply_mode, ApplyMode::StageRunStart);
        assert_eq!(decision.wake_mode, WakeMode::WakeIfIdle);
    }

    // ---------------------------------------------------------------------------------------
    // Response conventions
    // ---------------------------------------------------------------------------------------

    #[test]
    fn response_progress_with_handling_mode_falls_through_to_kind_default() {
        let progress = PeerConvention::ResponseProgress {
            request_id: "r".into(),
            phase: ResponseProgressPhase::InProgress,
        };
        let peer = make_peer_input(Some(progress), Some(HandlingMode::Steer));
        let decision = DefaultPolicyTable::resolve(&peer, true);
        assert_eq!(decision.queue_mode, QueueMode::Coalesce);
        assert_eq!(decision.apply_mode, ApplyMode::StageRunBoundary);
        assert_eq!(decision.wake_mode, WakeMode::None);
    }

    #[test]
    fn response_terminal_with_steer_gets_steer_semantics() {
        let peer = make_peer_input(Some(completed_terminal()), Some(HandlingMode::Steer));
        let decision = DefaultPolicyTable::resolve(&peer, true);
        assert_eq!(decision.routing_disposition, RoutingDisposition::Steer);
        assert_eq!(decision.apply_mode, ApplyMode::StageRunBoundary);
        assert!(decision.record_transcript);
    }

    #[test]
    fn response_terminal_with_queue_handling_mode_gets_queue_semantics() {
        let peer = make_peer_input(Some(completed_terminal()), Some(HandlingMode::Queue));
        let decision = DefaultPolicyTable::resolve(&peer, true);
        assert_eq!(decision.routing_disposition, RoutingDisposition::Queue);
        assert_eq!(decision.apply_mode, ApplyMode::StageRunStart);
        assert_eq!(decision.wake_mode, WakeMode::WakeIfIdle);
    }

    #[test]
    fn response_terminal_without_handling_mode_keeps_kind_default() {
        let peer = make_peer_input(Some(completed_terminal()), None);
        let decision = DefaultPolicyTable::resolve(&peer, true);
        assert_eq!(decision.routing_disposition, RoutingDisposition::Queue);
        assert_eq!(decision.apply_mode, ApplyMode::StageRunStart);
        assert_eq!(decision.wake_mode, WakeMode::WakeIfIdle);
    }
}