1use crate::identifiers::{KindId, PolicyVersion};
6use crate::input::Input;
7use crate::policy::{
8 ApplyMode, ConsumePoint, DrainPolicy, InterruptPolicy, PolicyDecision, QueueMode,
9 RoutingDisposition, WakeMode,
10};
11
12pub const DEFAULT_POLICY_VERSION: PolicyVersion = PolicyVersion(1);
14
15#[allow(clippy::too_many_arguments)]
17fn pd(
18 apply_mode: ApplyMode,
19 wake_mode: WakeMode,
20 queue_mode: QueueMode,
21 consume_point: ConsumePoint,
22 interrupt_policy: InterruptPolicy,
23 drain_policy: DrainPolicy,
24 routing_disposition: RoutingDisposition,
25 record_transcript: bool,
26) -> PolicyDecision {
27 PolicyDecision {
28 apply_mode,
29 wake_mode,
30 queue_mode,
31 consume_point,
32 interrupt_policy,
33 drain_policy,
34 routing_disposition,
35 record_transcript,
36 emit_operator_content: record_transcript,
37 policy_version: DEFAULT_POLICY_VERSION,
38 }
39}
40
41pub struct DefaultPolicyTable;
43
44impl DefaultPolicyTable {
45 pub fn resolve(input: &Input, runtime_idle: bool) -> PolicyDecision {
47 if let Some(mode) = input.handling_mode() {
48 return match mode {
49 meerkat_core::types::HandlingMode::Queue => pd(
50 ApplyMode::StageRunStart,
51 if runtime_idle {
52 WakeMode::WakeIfIdle
53 } else {
54 WakeMode::None
55 },
56 QueueMode::Fifo,
57 ConsumePoint::OnRunComplete,
58 InterruptPolicy::None,
59 DrainPolicy::QueueNextTurn,
60 RoutingDisposition::Queue,
61 !matches!(input, Input::Continuation(_)),
62 ),
63 meerkat_core::types::HandlingMode::Steer => pd(
64 ApplyMode::StageRunBoundary,
65 if runtime_idle {
66 WakeMode::WakeIfIdle
67 } else {
68 WakeMode::InterruptYielding
69 },
70 QueueMode::Fifo,
71 ConsumePoint::OnRunComplete,
72 InterruptPolicy::InterruptYielding,
73 DrainPolicy::SteerBatch,
74 RoutingDisposition::Steer,
75 !matches!(input, Input::Continuation(_)),
76 ),
77 };
78 }
79
80 let kind = input.kind_id();
81 Self::resolve_by_kind(&kind, runtime_idle)
82 }
83
84 pub fn resolve_by_kind(kind: &KindId, runtime_idle: bool) -> PolicyDecision {
86 match (kind.0.as_str(), runtime_idle) {
87 ("prompt", true) => pd(
89 ApplyMode::StageRunStart,
90 WakeMode::WakeIfIdle,
91 QueueMode::Fifo,
92 ConsumePoint::OnRunComplete,
93 InterruptPolicy::None,
94 DrainPolicy::QueueNextTurn,
95 RoutingDisposition::Queue,
96 true,
97 ),
98 ("prompt", false) => pd(
99 ApplyMode::StageRunStart,
100 WakeMode::None,
101 QueueMode::Fifo,
102 ConsumePoint::OnRunComplete,
103 InterruptPolicy::None,
104 DrainPolicy::QueueNextTurn,
105 RoutingDisposition::Queue,
106 true,
107 ),
108
109 ("peer_message", true) => pd(
113 ApplyMode::StageRunStart,
114 WakeMode::WakeIfIdle,
115 QueueMode::Fifo,
116 ConsumePoint::OnRunComplete,
117 InterruptPolicy::None,
118 DrainPolicy::QueueNextTurn,
119 RoutingDisposition::Queue,
120 true,
121 ),
122 ("peer_message", false) => pd(
123 ApplyMode::StageRunStart,
124 WakeMode::InterruptYielding,
125 QueueMode::Fifo,
126 ConsumePoint::OnRunComplete,
127 InterruptPolicy::InterruptYielding,
128 DrainPolicy::QueueNextTurn,
129 RoutingDisposition::Queue,
130 true,
131 ),
132
133 ("peer_request", true) => pd(
135 ApplyMode::StageRunStart,
136 WakeMode::WakeIfIdle,
137 QueueMode::Fifo,
138 ConsumePoint::OnRunComplete,
139 InterruptPolicy::None,
140 DrainPolicy::QueueNextTurn,
141 RoutingDisposition::Queue,
142 true,
143 ),
144 ("peer_request", false) => pd(
145 ApplyMode::StageRunStart,
146 WakeMode::InterruptYielding,
147 QueueMode::Fifo,
148 ConsumePoint::OnRunComplete,
149 InterruptPolicy::InterruptYielding,
150 DrainPolicy::QueueNextTurn,
151 RoutingDisposition::Queue,
152 true,
153 ),
154
155 ("peer_response_progress", true) => pd(
157 ApplyMode::StageRunBoundary,
158 WakeMode::None,
159 QueueMode::Coalesce,
160 ConsumePoint::OnRunComplete,
161 InterruptPolicy::None,
162 DrainPolicy::SteerBatch,
163 RoutingDisposition::Steer,
164 true,
165 ),
166 ("peer_response_progress", false) => pd(
167 ApplyMode::StageRunBoundary,
168 WakeMode::None,
169 QueueMode::Coalesce,
170 ConsumePoint::OnRunComplete,
171 InterruptPolicy::None,
172 DrainPolicy::SteerBatch,
173 RoutingDisposition::Steer,
174 true,
175 ),
176
177 ("peer_response_terminal", true) => pd(
179 ApplyMode::StageRunStart,
180 WakeMode::WakeIfIdle,
181 QueueMode::Fifo,
182 ConsumePoint::OnRunComplete,
183 InterruptPolicy::None,
184 DrainPolicy::QueueNextTurn,
185 RoutingDisposition::Queue,
186 true,
187 ),
188 ("peer_response_terminal", false) => pd(
189 ApplyMode::StageRunStart,
190 WakeMode::None,
191 QueueMode::Fifo,
192 ConsumePoint::OnRunComplete,
193 InterruptPolicy::None,
194 DrainPolicy::QueueNextTurn,
195 RoutingDisposition::Queue,
196 true,
197 ),
198
199 ("flow_step", true) => pd(
201 ApplyMode::StageRunStart,
202 WakeMode::WakeIfIdle,
203 QueueMode::Fifo,
204 ConsumePoint::OnRunComplete,
205 InterruptPolicy::None,
206 DrainPolicy::QueueNextTurn,
207 RoutingDisposition::Queue,
208 true,
209 ),
210 ("flow_step", false) => pd(
211 ApplyMode::StageRunStart,
212 WakeMode::None,
213 QueueMode::Fifo,
214 ConsumePoint::OnRunComplete,
215 InterruptPolicy::None,
216 DrainPolicy::QueueNextTurn,
217 RoutingDisposition::Queue,
218 true,
219 ),
220
221 ("external_event", true) => pd(
223 ApplyMode::StageRunStart,
224 WakeMode::WakeIfIdle,
225 QueueMode::Fifo,
226 ConsumePoint::OnRunComplete,
227 InterruptPolicy::None,
228 DrainPolicy::QueueNextTurn,
229 RoutingDisposition::Queue,
230 true,
231 ),
232 ("external_event", false) => pd(
233 ApplyMode::StageRunStart,
234 WakeMode::None,
235 QueueMode::Fifo,
236 ConsumePoint::OnRunComplete,
237 InterruptPolicy::None,
238 DrainPolicy::QueueNextTurn,
239 RoutingDisposition::Queue,
240 true,
241 ),
242
243 ("continuation", true) => pd(
245 ApplyMode::StageRunBoundary,
246 WakeMode::WakeIfIdle,
247 QueueMode::Fifo,
248 ConsumePoint::OnRunComplete,
249 InterruptPolicy::InterruptYielding,
250 DrainPolicy::SteerBatch,
251 RoutingDisposition::Steer,
252 false,
253 ),
254 ("continuation", false) => pd(
255 ApplyMode::StageRunBoundary,
256 WakeMode::InterruptYielding,
257 QueueMode::Fifo,
258 ConsumePoint::OnRunComplete,
259 InterruptPolicy::InterruptYielding,
260 DrainPolicy::SteerBatch,
261 RoutingDisposition::Steer,
262 false,
263 ),
264
265 ("operation", true | false) => pd(
268 ApplyMode::Ignore,
269 WakeMode::None,
270 QueueMode::Priority,
271 ConsumePoint::OnAccept,
272 InterruptPolicy::None,
273 DrainPolicy::Ignore,
274 RoutingDisposition::Drop,
275 false,
276 ),
277
278 (_, _) => pd(
280 ApplyMode::StageRunStart,
281 WakeMode::None,
282 QueueMode::Fifo,
283 ConsumePoint::OnRunComplete,
284 InterruptPolicy::None,
285 DrainPolicy::QueueNextTurn,
286 RoutingDisposition::Queue,
287 true,
288 ),
289 }
290 }
291}
292
293#[cfg(test)]
294#[allow(clippy::unwrap_used)]
295mod tests {
296 use super::*;
297
298 fn assert_cell(
299 kind: &str,
300 idle: bool,
301 expected_apply: ApplyMode,
302 expected_wake: WakeMode,
303 expected_queue: QueueMode,
304 expected_consume: ConsumePoint,
305 expected_transcript: bool,
306 ) {
307 let decision = DefaultPolicyTable::resolve_by_kind(&KindId::new(kind), idle);
308 assert_eq!(
309 decision.apply_mode, expected_apply,
310 "kind={kind}, idle={idle}: apply_mode"
311 );
312 assert_eq!(
313 decision.wake_mode, expected_wake,
314 "kind={kind}, idle={idle}: wake_mode"
315 );
316 assert_eq!(
317 decision.queue_mode, expected_queue,
318 "kind={kind}, idle={idle}: queue_mode"
319 );
320 assert_eq!(
321 decision.consume_point, expected_consume,
322 "kind={kind}, idle={idle}: consume_point"
323 );
324 assert_eq!(
325 decision.record_transcript, expected_transcript,
326 "kind={kind}, idle={idle}: record_transcript"
327 );
328 }
329
330 #[test]
331 fn prompt_idle() {
332 assert_cell(
333 "prompt",
334 true,
335 ApplyMode::StageRunStart,
336 WakeMode::WakeIfIdle,
337 QueueMode::Fifo,
338 ConsumePoint::OnRunComplete,
339 true,
340 );
341 }
342 #[test]
343 fn prompt_running() {
344 assert_cell(
345 "prompt",
346 false,
347 ApplyMode::StageRunStart,
348 WakeMode::None,
349 QueueMode::Fifo,
350 ConsumePoint::OnRunComplete,
351 true,
352 );
353 }
354 #[test]
355 fn peer_message_idle() {
356 assert_cell(
357 "peer_message",
358 true,
359 ApplyMode::StageRunStart,
360 WakeMode::WakeIfIdle,
361 QueueMode::Fifo,
362 ConsumePoint::OnRunComplete,
363 true,
364 );
365 }
366 #[test]
367 fn peer_message_running() {
368 assert_cell(
369 "peer_message",
370 false,
371 ApplyMode::StageRunStart,
372 WakeMode::InterruptYielding,
373 QueueMode::Fifo,
374 ConsumePoint::OnRunComplete,
375 true,
376 );
377 }
378 #[test]
379 fn peer_request_idle() {
380 assert_cell(
381 "peer_request",
382 true,
383 ApplyMode::StageRunStart,
384 WakeMode::WakeIfIdle,
385 QueueMode::Fifo,
386 ConsumePoint::OnRunComplete,
387 true,
388 );
389 }
390 #[test]
391 fn peer_request_running() {
392 assert_cell(
393 "peer_request",
394 false,
395 ApplyMode::StageRunStart,
396 WakeMode::InterruptYielding,
397 QueueMode::Fifo,
398 ConsumePoint::OnRunComplete,
399 true,
400 );
401 }
402 #[test]
403 fn peer_response_progress_idle() {
404 assert_cell(
405 "peer_response_progress",
406 true,
407 ApplyMode::StageRunBoundary,
408 WakeMode::None,
409 QueueMode::Coalesce,
410 ConsumePoint::OnRunComplete,
411 true,
412 );
413 }
414 #[test]
415 fn peer_response_progress_running() {
416 assert_cell(
417 "peer_response_progress",
418 false,
419 ApplyMode::StageRunBoundary,
420 WakeMode::None,
421 QueueMode::Coalesce,
422 ConsumePoint::OnRunComplete,
423 true,
424 );
425 }
426 #[test]
427 fn peer_response_terminal_idle() {
428 assert_cell(
429 "peer_response_terminal",
430 true,
431 ApplyMode::StageRunStart,
432 WakeMode::WakeIfIdle,
433 QueueMode::Fifo,
434 ConsumePoint::OnRunComplete,
435 true,
436 );
437 }
438 #[test]
439 fn peer_response_terminal_running() {
440 assert_cell(
441 "peer_response_terminal",
442 false,
443 ApplyMode::StageRunStart,
444 WakeMode::None,
445 QueueMode::Fifo,
446 ConsumePoint::OnRunComplete,
447 true,
448 );
449 }
450 #[test]
451 fn flow_step_idle() {
452 assert_cell(
453 "flow_step",
454 true,
455 ApplyMode::StageRunStart,
456 WakeMode::WakeIfIdle,
457 QueueMode::Fifo,
458 ConsumePoint::OnRunComplete,
459 true,
460 );
461 }
462 #[test]
463 fn flow_step_running() {
464 assert_cell(
465 "flow_step",
466 false,
467 ApplyMode::StageRunStart,
468 WakeMode::None,
469 QueueMode::Fifo,
470 ConsumePoint::OnRunComplete,
471 true,
472 );
473 }
474 #[test]
475 fn external_event_idle() {
476 assert_cell(
477 "external_event",
478 true,
479 ApplyMode::StageRunStart,
480 WakeMode::WakeIfIdle,
481 QueueMode::Fifo,
482 ConsumePoint::OnRunComplete,
483 true,
484 );
485 }
486 #[test]
487 fn external_event_running() {
488 assert_cell(
489 "external_event",
490 false,
491 ApplyMode::StageRunStart,
492 WakeMode::None,
493 QueueMode::Fifo,
494 ConsumePoint::OnRunComplete,
495 true,
496 );
497 }
498 #[test]
499 fn continuation_idle() {
500 assert_cell(
501 "continuation",
502 true,
503 ApplyMode::StageRunBoundary,
504 WakeMode::WakeIfIdle,
505 QueueMode::Fifo,
506 ConsumePoint::OnRunComplete,
507 false,
508 );
509 }
510 #[test]
511 fn continuation_running() {
512 assert_cell(
513 "continuation",
514 false,
515 ApplyMode::StageRunBoundary,
516 WakeMode::InterruptYielding,
517 QueueMode::Fifo,
518 ConsumePoint::OnRunComplete,
519 false,
520 );
521 }
522 #[test]
523 fn operation_idle() {
524 assert_cell(
525 "operation",
526 true,
527 ApplyMode::Ignore,
528 WakeMode::None,
529 QueueMode::Priority,
530 ConsumePoint::OnAccept,
531 false,
532 );
533 }
534 #[test]
535 fn operation_running() {
536 assert_cell(
537 "operation",
538 false,
539 ApplyMode::Ignore,
540 WakeMode::None,
541 QueueMode::Priority,
542 ConsumePoint::OnAccept,
543 false,
544 );
545 }
546
547 #[test]
548 fn resolve_via_input_object() {
549 use crate::input::*;
550 use chrono::Utc;
551 use meerkat_core::lifecycle::InputId;
552
553 let header = InputHeader {
554 id: InputId::new(),
555 timestamp: Utc::now(),
556 source: InputOrigin::Operator,
557 durability: InputDurability::Durable,
558 visibility: InputVisibility::default(),
559 idempotency_key: None,
560 supersession_key: None,
561 correlation_id: None,
562 };
563 let input = Input::Prompt(PromptInput {
564 header,
565 text: "hello".into(),
566 blocks: None,
567 turn_metadata: None,
568 });
569 let decision = DefaultPolicyTable::resolve(&input, true);
570 assert_eq!(decision.apply_mode, ApplyMode::StageRunStart);
571 assert_eq!(decision.wake_mode, WakeMode::WakeIfIdle);
572 }
573
574 #[test]
575 fn explicit_steer_metadata_maps_to_checkpoint_policy() {
576 use crate::input::*;
577 use chrono::Utc;
578 use meerkat_core::lifecycle::InputId;
579 use meerkat_core::lifecycle::run_primitive::RuntimeTurnMetadata;
580
581 let input = Input::Prompt(PromptInput {
582 header: InputHeader {
583 id: InputId::new(),
584 timestamp: Utc::now(),
585 source: InputOrigin::Operator,
586 durability: InputDurability::Durable,
587 visibility: InputVisibility::default(),
588 idempotency_key: None,
589 supersession_key: None,
590 correlation_id: None,
591 },
592 text: "hello".into(),
593 blocks: None,
594 turn_metadata: Some(RuntimeTurnMetadata {
595 handling_mode: Some(meerkat_core::types::HandlingMode::Steer),
596 ..Default::default()
597 }),
598 });
599 let decision = DefaultPolicyTable::resolve(&input, true);
600 assert_eq!(decision.apply_mode, ApplyMode::StageRunBoundary);
601 assert_eq!(decision.drain_policy, DrainPolicy::SteerBatch);
602 assert_eq!(decision.routing_disposition, RoutingDisposition::Steer);
603 }
604
605 #[test]
606 fn peer_message_running_interrupts_yielding() {
607 let decision = DefaultPolicyTable::resolve_by_kind(&KindId::new("peer_message"), false);
610 assert_eq!(
611 decision.wake_mode,
612 WakeMode::InterruptYielding,
613 "peer_message while running must use InterruptYielding"
614 );
615 assert_ne!(
617 decision.wake_mode,
618 WakeMode::WakeIfIdle,
619 "peer_message while running must not use WakeIfIdle"
620 );
621 }
622
623 #[test]
624 fn peer_request_running_interrupts_yielding() {
625 let decision = DefaultPolicyTable::resolve_by_kind(&KindId::new("peer_request"), false);
627 assert_eq!(
628 decision.wake_mode,
629 WakeMode::InterruptYielding,
630 "peer_request while running must use InterruptYielding"
631 );
632 }
633
634 #[test]
635 fn peer_message_idle_still_wakes() {
636 let decision = DefaultPolicyTable::resolve_by_kind(&KindId::new("peer_message"), true);
638 assert_eq!(
639 decision.wake_mode,
640 WakeMode::WakeIfIdle,
641 "peer_message while idle must use WakeIfIdle"
642 );
643 }
644
645 #[test]
646 fn peer_request_idle_still_wakes() {
647 let decision = DefaultPolicyTable::resolve_by_kind(&KindId::new("peer_request"), true);
649 assert_eq!(
650 decision.wake_mode,
651 WakeMode::WakeIfIdle,
652 "peer_request while idle must use WakeIfIdle"
653 );
654 }
655}