1use std::collections::HashMap;
14use std::future::Future;
15use std::sync::Arc;
16
17use meerkat_core::lifecycle::core_executor::CoreApplyOutput;
18use meerkat_core::lifecycle::run_control::RunControlCommand;
19use meerkat_core::lifecycle::{InputId, RunId};
20use meerkat_core::types::SessionId;
21
22use crate::accept::AcceptOutcome;
23use crate::driver::ephemeral::EphemeralRuntimeDriver;
24use crate::driver::persistent::PersistentRuntimeDriver;
25use crate::identifiers::LogicalRuntimeId;
26use crate::input::Input;
27use crate::input_machine::InputStateMachineError;
28use crate::input_state::InputState;
29use crate::runtime_state::{RuntimeState, RuntimeStateTransitionError};
30use crate::service_ext::{RuntimeMode, SessionServiceRuntimeExt};
31use crate::store::RuntimeStore;
32use crate::tokio;
33use crate::tokio::sync::{Mutex, RwLock, mpsc};
34use crate::traits::{ResetReport, RetireReport, RuntimeDriver, RuntimeDriverError};
35
/// Driver handle shared between the adapter's methods and a session's runtime loop
/// (the loop receives a clone of this `Arc` when it is spawned).
///
/// Access is serialized through the async mutex.
pub(crate) type SharedDriver = Arc<Mutex<DriverEntry>>;
38
/// The concrete runtime driver backing one session.
///
/// `RuntimeSessionAdapter::make_driver` chooses the variant: `Persistent` when the adapter
/// was constructed with a `RuntimeStore`, `Ephemeral` otherwise.
pub(crate) enum DriverEntry {
    /// Driver constructed without a store.
    Ephemeral(EphemeralRuntimeDriver),
    /// Driver constructed with the adapter's `RuntimeStore`.
    Persistent(PersistentRuntimeDriver),
}
44
45impl DriverEntry {
46 pub(crate) fn as_driver(&self) -> &dyn RuntimeDriver {
47 match self {
48 DriverEntry::Ephemeral(d) => d,
49 DriverEntry::Persistent(d) => d,
50 }
51 }
52
53 pub(crate) fn as_driver_mut(&mut self) -> &mut dyn RuntimeDriver {
54 match self {
55 DriverEntry::Ephemeral(d) => d,
56 DriverEntry::Persistent(d) => d,
57 }
58 }
59
60 pub(crate) fn is_idle(&self) -> bool {
62 match self {
63 DriverEntry::Ephemeral(d) => d.is_idle(),
64 DriverEntry::Persistent(d) => d.is_idle(),
65 }
66 }
67
68 pub(crate) fn can_process_queue(&self) -> bool {
70 match self {
71 DriverEntry::Ephemeral(d) => d.state_machine_ref().can_process_queue(),
72 DriverEntry::Persistent(d) => d.inner_ref().state_machine_ref().can_process_queue(),
73 }
74 }
75
76 pub(crate) fn take_wake_requested(&mut self) -> bool {
78 match self {
79 DriverEntry::Ephemeral(d) => d.take_wake_requested(),
80 DriverEntry::Persistent(d) => d.take_wake_requested(),
81 }
82 }
83
84 pub(crate) fn take_process_requested(&mut self) -> bool {
86 match self {
87 DriverEntry::Ephemeral(d) => d.take_process_requested(),
88 DriverEntry::Persistent(d) => d.take_process_requested(),
89 }
90 }
91
92 pub(crate) fn dequeue_next(&mut self) -> Option<(InputId, Input)> {
94 match self {
95 DriverEntry::Ephemeral(d) => d.dequeue_next(),
96 DriverEntry::Persistent(d) => d.dequeue_next(),
97 }
98 }
99
100 pub(crate) fn start_run(&mut self, run_id: RunId) -> Result<(), RuntimeStateTransitionError> {
102 match self {
103 DriverEntry::Ephemeral(d) => d.start_run(run_id),
104 DriverEntry::Persistent(d) => d.start_run(run_id),
105 }
106 }
107
108 pub(crate) fn complete_run(&mut self) -> Result<RunId, RuntimeStateTransitionError> {
110 match self {
111 DriverEntry::Ephemeral(d) => d.complete_run(),
112 DriverEntry::Persistent(d) => d.complete_run(),
113 }
114 }
115
116 pub(crate) fn stage_input(
118 &mut self,
119 input_id: &InputId,
120 run_id: &RunId,
121 ) -> Result<(), InputStateMachineError> {
122 match self {
123 DriverEntry::Ephemeral(d) => d.stage_input(input_id, run_id),
124 DriverEntry::Persistent(d) => d.stage_input(input_id, run_id),
125 }
126 }
127
128 #[allow(dead_code)]
130 pub(crate) fn apply_input(
131 &mut self,
132 input_id: &InputId,
133 run_id: &RunId,
134 ) -> Result<(), InputStateMachineError> {
135 match self {
136 DriverEntry::Ephemeral(d) => d.apply_input(input_id, run_id),
137 DriverEntry::Persistent(d) => d.apply_input(input_id, run_id),
138 }
139 }
140
141 #[allow(dead_code)]
143 pub(crate) fn consume_inputs(
144 &mut self,
145 input_ids: &[InputId],
146 run_id: &RunId,
147 ) -> Result<(), InputStateMachineError> {
148 match self {
149 DriverEntry::Ephemeral(d) => d.consume_inputs(input_ids, run_id),
150 DriverEntry::Persistent(d) => d.consume_inputs(input_ids, run_id),
151 }
152 }
153
154 #[allow(dead_code)]
156 pub(crate) fn rollback_staged(
157 &mut self,
158 input_ids: &[InputId],
159 ) -> Result<(), InputStateMachineError> {
160 match self {
161 DriverEntry::Ephemeral(d) => d.rollback_staged(input_ids),
162 DriverEntry::Persistent(d) => d.rollback_staged(input_ids),
163 }
164 }
165}
166
/// Completion registry shared between the adapter and a session's runtime loop
/// (a clone is passed to the loop when it is spawned).
pub(crate) type SharedCompletionRegistry = Arc<Mutex<crate::completion::CompletionRegistry>>;
169
/// Per-session state held by `RuntimeSessionAdapter`.
///
/// `wake_tx`, `control_tx` and `_loop_handle` stay `None` until a runtime loop is attached
/// by `RuntimeSessionAdapter::ensure_session_with_executor`. Fields drop in declaration
/// order.
struct RuntimeSessionEntry {
    /// Driver for this session, shared with the runtime loop when one is attached.
    driver: SharedDriver,
    /// Completion handles for this session's inputs; also handed to the runtime loop.
    completions: SharedCompletionRegistry,
    /// Sender used to wake the runtime loop; `None` when no loop is attached.
    wake_tx: Option<mpsc::Sender<()>>,
    /// Sender for run-control commands such as cancelling the current run; `None` when no
    /// loop is attached.
    control_tx: Option<mpsc::Sender<RunControlCommand>>,
    /// Handle of the spawned runtime loop task; stored but never awaited here.
    /// NOTE(review): dropping a tokio `JoinHandle` detaches the task rather than aborting it.
    _loop_handle: Option<tokio::task::JoinHandle<()>>,
}
183
/// Session-keyed runtime adapter.
///
/// It owns one runtime driver, and optionally one runtime loop, per registered session.
/// It implements [`SessionServiceRuntimeExt`] on top of them.
pub struct RuntimeSessionAdapter {
    /// Registered sessions. Entries are added by `register_session` /
    /// `ensure_session_with_executor` and removed by `unregister_session`.
    sessions: RwLock<HashMap<SessionId, RuntimeSessionEntry>>,
    /// Mode reported by `runtime_mode`.
    mode: RuntimeMode,
    /// Store backing persistent drivers; `None` means sessions get ephemeral drivers.
    store: Option<Arc<dyn RuntimeStore>>,
}
198
199impl RuntimeSessionAdapter {
200 pub fn ephemeral() -> Self {
202 Self {
203 sessions: RwLock::new(HashMap::new()),
204 mode: RuntimeMode::V9Compliant,
205 store: None,
206 }
207 }
208
209 pub fn persistent(store: Arc<dyn RuntimeStore>) -> Self {
211 Self {
212 sessions: RwLock::new(HashMap::new()),
213 mode: RuntimeMode::V9Compliant,
214 store: Some(store),
215 }
216 }
217
218 fn make_driver(&self, session_id: &SessionId) -> DriverEntry {
220 let runtime_id = LogicalRuntimeId::new(session_id.to_string());
221 match &self.store {
222 Some(store) => {
223 DriverEntry::Persistent(PersistentRuntimeDriver::new(runtime_id, store.clone()))
224 }
225 None => DriverEntry::Ephemeral(EphemeralRuntimeDriver::new(runtime_id)),
226 }
227 }
228
229 pub async fn register_session(&self, session_id: SessionId) {
232 if self.contains_session(&session_id).await {
233 return;
234 }
235 let mut entry = self.make_driver(&session_id);
236 if let Err(err) = entry.as_driver_mut().recover().await {
237 tracing::error!(%session_id, error = %err, "failed to recover runtime driver during registration");
238 return;
239 }
240 let session_entry = RuntimeSessionEntry {
241 driver: Arc::new(Mutex::new(entry)),
242 completions: Arc::new(Mutex::new(crate::completion::CompletionRegistry::new())),
243 wake_tx: None,
244 control_tx: None,
245 _loop_handle: None,
246 };
247 let mut sessions = self.sessions.write().await;
248 sessions.entry(session_id).or_insert(session_entry);
249 }
250
251 pub async fn register_session_with_executor(
256 &self,
257 session_id: SessionId,
258 executor: Box<dyn meerkat_core::lifecycle::CoreExecutor>,
259 ) {
260 self.ensure_session_with_executor(session_id, executor)
261 .await;
262 }
263
264 pub async fn ensure_session_with_executor(
270 &self,
271 session_id: SessionId,
272 executor: Box<dyn meerkat_core::lifecycle::CoreExecutor>,
273 ) {
274 let mut executor = Some(executor);
275 let upgrade = {
276 let mut sessions = self.sessions.write().await;
277 if let Some(entry) = sessions.get_mut(&session_id) {
278 if entry.wake_tx.is_some() && entry.control_tx.is_some() {
279 return;
280 }
281
282 let driver = Arc::clone(&entry.driver);
283 let (wake_tx, wake_rx) = mpsc::channel(16);
284 let (control_tx, control_rx) = mpsc::channel(16);
285 let Some(executor) = executor.take() else {
286 tracing::error!(%session_id, "executor missing while upgrading existing runtime session");
287 return;
288 };
289 let handle = crate::runtime_loop::spawn_runtime_loop_with_completions(
290 driver.clone(),
291 executor,
292 wake_rx,
293 control_rx,
294 Some(entry.completions.clone()),
295 );
296
297 entry.wake_tx = Some(wake_tx.clone());
298 entry.control_tx = Some(control_tx);
299 entry._loop_handle = Some(handle);
300 Some((driver, wake_tx))
301 } else {
302 None
303 }
304 };
305
306 if let Some((driver, wake_tx)) = upgrade {
307 let should_wake = {
308 let driver = driver.lock().await;
309 !driver.as_driver().active_input_ids().is_empty()
310 };
311 if should_wake {
312 let _ = wake_tx.try_send(());
313 }
314 return;
315 }
316
317 let mut recovered_entry = self.make_driver(&session_id);
318 if let Err(err) = recovered_entry.as_driver_mut().recover().await {
319 tracing::error!(%session_id, error = %err, "failed to recover runtime driver during registration");
320 return;
321 }
322
323 let driver = {
324 let mut sessions = self.sessions.write().await;
325 if let Some(entry) = sessions.get(&session_id) {
326 if entry.wake_tx.is_some() && entry.control_tx.is_some() {
327 return;
328 }
329 entry.driver.clone()
330 } else {
331 let driver = Arc::new(Mutex::new(recovered_entry));
332 sessions.insert(
333 session_id.clone(),
334 RuntimeSessionEntry {
335 driver: driver.clone(),
336 completions: Arc::new(Mutex::new(
337 crate::completion::CompletionRegistry::new(),
338 )),
339 wake_tx: None,
340 control_tx: None,
341 _loop_handle: None,
342 },
343 );
344 driver
345 }
346 };
347
348 let (wake_tx, wake_rx) = mpsc::channel(16);
349 let (control_tx, control_rx) = mpsc::channel(16);
350 let Some(executor) = executor.take() else {
351 tracing::error!(%session_id, "executor missing while registering runtime session");
352 return;
353 };
354
355 let completions = {
357 let sessions = self.sessions.read().await;
358 sessions.get(&session_id).map(|e| e.completions.clone())
359 };
360
361 let handle = crate::runtime_loop::spawn_runtime_loop_with_completions(
362 driver.clone(),
363 executor,
364 wake_rx,
365 control_rx,
366 completions,
367 );
368
369 let mut sessions = self.sessions.write().await;
370 let Some(entry) = sessions.get_mut(&session_id) else {
371 return;
372 };
373 if entry.wake_tx.is_some() && entry.control_tx.is_some() {
374 return;
375 }
376 entry.wake_tx = Some(wake_tx.clone());
377 entry.control_tx = Some(control_tx);
378 entry._loop_handle = Some(handle);
379 drop(sessions);
380
381 let should_wake = {
382 let driver = driver.lock().await;
383 !driver.as_driver().active_input_ids().is_empty()
384 };
385 if should_wake {
386 let _ = wake_tx.try_send(());
387 }
388 }
389
390 pub async fn unregister_session(&self, session_id: &SessionId) {
394 self.sessions.write().await.remove(session_id);
395 }
396
397 pub async fn contains_session(&self, session_id: &SessionId) -> bool {
399 self.sessions.read().await.contains_key(session_id)
400 }
401
402 pub async fn interrupt_current_run(
404 &self,
405 session_id: &SessionId,
406 ) -> Result<(), RuntimeDriverError> {
407 let sessions = self.sessions.read().await;
408 let entry = sessions
409 .get(session_id)
410 .ok_or(RuntimeDriverError::NotReady {
411 state: RuntimeState::Destroyed,
412 })?;
413 let Some(control_tx) = &entry.control_tx else {
414 return Err(RuntimeDriverError::NotReady {
415 state: RuntimeState::Destroyed,
416 });
417 };
418 control_tx
419 .send(RunControlCommand::CancelCurrentRun {
420 reason: "mob interrupt".to_string(),
421 })
422 .await
423 .map_err(|err| RuntimeDriverError::Internal(format!("failed to send interrupt: {err}")))
424 }
425
426 pub async fn accept_input_and_run<T, F, Fut>(
431 &self,
432 session_id: &SessionId,
433 input: Input,
434 op: F,
435 ) -> Result<T, RuntimeDriverError>
436 where
437 F: FnOnce(RunId, meerkat_core::lifecycle::run_primitive::RunPrimitive) -> Fut,
438 Fut: Future<Output = Result<(T, CoreApplyOutput), RuntimeDriverError>>,
439 {
440 let driver = {
441 let sessions = self.sessions.read().await;
442 sessions
443 .get(session_id)
444 .ok_or(RuntimeDriverError::NotReady {
445 state: RuntimeState::Destroyed,
446 })?
447 .driver
448 .clone()
449 };
450
451 let (input_id, run_id, primitive) = {
452 let mut driver = driver.lock().await;
453 if !driver.is_idle() || !driver.as_driver().active_input_ids().is_empty() {
454 return Err(RuntimeDriverError::NotReady {
455 state: driver.as_driver().runtime_state(),
456 });
457 }
458 let outcome = driver.as_driver_mut().accept_input(input).await?;
459 let input_id = match outcome {
460 AcceptOutcome::Accepted { input_id, .. } => input_id,
461 AcceptOutcome::Deduplicated { existing_id, .. } => existing_id,
462 AcceptOutcome::Rejected { reason } => {
463 return Err(RuntimeDriverError::ValidationFailed { reason });
464 }
465 };
466
467 if !driver.is_idle() {
468 return Err(RuntimeDriverError::NotReady {
469 state: driver.as_driver().runtime_state(),
470 });
471 }
472
473 let (dequeued_id, dequeued_input) = driver.dequeue_next().ok_or_else(|| {
474 RuntimeDriverError::Internal("accepted input was not queued for execution".into())
475 })?;
476 if dequeued_id != input_id {
477 return Err(RuntimeDriverError::NotReady {
478 state: driver.as_driver().runtime_state(),
479 });
480 }
481 let run_id = RunId::new();
482 driver.start_run(run_id.clone()).map_err(|err| {
483 RuntimeDriverError::Internal(format!("failed to start runtime run: {err}"))
484 })?;
485 driver.stage_input(&dequeued_id, &run_id).map_err(|err| {
486 RuntimeDriverError::Internal(format!("failed to stage accepted input: {err}"))
487 })?;
488 let primitive = crate::runtime_loop::input_to_primitive(&dequeued_input, dequeued_id);
489 (input_id, run_id, primitive)
490 };
491
492 match op(run_id.clone(), primitive.clone()).await {
493 Ok((result, output)) => {
494 let mut driver = driver.lock().await;
495 if let Err(err) = driver
496 .as_driver_mut()
497 .on_run_event(meerkat_core::lifecycle::RunEvent::BoundaryApplied {
498 run_id: run_id.clone(),
499 receipt: output.receipt,
500 session_snapshot: output.session_snapshot,
501 })
502 .await
503 {
504 if let Err(unwind_err) = driver
505 .as_driver_mut()
506 .on_run_event(meerkat_core::lifecycle::RunEvent::RunFailed {
507 run_id,
508 error: format!("boundary commit failed: {err}"),
509 recoverable: true,
510 })
511 .await
512 {
513 return Err(RuntimeDriverError::Internal(format!(
514 "runtime boundary commit failed: {err}; additionally failed to unwind runtime state: {unwind_err}"
515 )));
516 }
517 return Err(RuntimeDriverError::Internal(format!(
518 "runtime boundary commit failed: {err}"
519 )));
520 }
521 if let Err(err) = driver
522 .as_driver_mut()
523 .on_run_event(meerkat_core::lifecycle::RunEvent::RunCompleted {
524 run_id,
525 consumed_input_ids: vec![input_id],
526 })
527 .await
528 {
529 drop(driver);
530 self.unregister_session(session_id).await;
531 return Err(RuntimeDriverError::Internal(format!(
532 "failed to persist runtime completion snapshot: {err}"
533 )));
534 }
535 Ok(result)
536 }
537 Err(err) => {
538 let mut driver = driver.lock().await;
539 if let Err(run_err) = driver
540 .as_driver_mut()
541 .on_run_event(meerkat_core::lifecycle::RunEvent::RunFailed {
542 run_id,
543 error: err.to_string(),
544 recoverable: true,
545 })
546 .await
547 {
548 drop(driver);
549 self.unregister_session(session_id).await;
550 return Err(RuntimeDriverError::Internal(format!(
551 "failed to persist runtime failure snapshot: {run_err}"
552 )));
553 }
554 Err(err)
555 }
556 }
557 }
558
559 pub async fn accept_input_with_completion(
568 &self,
569 session_id: &SessionId,
570 input: Input,
571 ) -> Result<(AcceptOutcome, Option<crate::completion::CompletionHandle>), RuntimeDriverError>
572 {
573 let sessions = self.sessions.read().await;
574 let entry = sessions
575 .get(session_id)
576 .ok_or(RuntimeDriverError::NotReady {
577 state: RuntimeState::Destroyed,
578 })?;
579
580 let (outcome, should_wake, should_process, handle) = {
581 let mut driver = entry.driver.lock().await;
582 let result = driver.as_driver_mut().accept_input(input).await?;
583
584 match &result {
585 AcceptOutcome::Accepted { input_id, .. } => {
586 let handle = {
587 let mut completions = entry.completions.lock().await;
588 completions.register(input_id.clone())
589 };
590 let wake = driver.take_wake_requested();
591 let process_now = driver.take_process_requested();
592 (result, wake, process_now, Some(handle))
593 }
594 AcceptOutcome::Deduplicated { existing_id, .. } => {
595 let existing_state = driver.as_driver().input_state(existing_id);
597 let is_terminal = existing_state
598 .map(|s| s.current_state.is_terminal())
599 .unwrap_or(true); if is_terminal {
602 (result, false, false, None)
604 } else {
605 let handle = {
607 let mut completions = entry.completions.lock().await;
608 completions.register(existing_id.clone())
609 };
610 (result, false, false, Some(handle))
611 }
612 }
613 AcceptOutcome::Rejected { reason } => {
614 return Err(RuntimeDriverError::ValidationFailed {
615 reason: reason.clone(),
616 });
617 }
618 }
619 };
620
621 if (should_wake || should_process)
622 && let Some(ref wake_tx) = entry.wake_tx
623 {
624 let _ = wake_tx.try_send(());
625 }
626
627 Ok((outcome, handle))
628 }
629
630 #[allow(dead_code)]
634 pub(crate) async fn completion_registry(
635 &self,
636 session_id: &SessionId,
637 ) -> Option<SharedCompletionRegistry> {
638 let sessions = self.sessions.read().await;
639 sessions.get(session_id).map(|e| e.completions.clone())
640 }
641}
642
643#[cfg_attr(not(target_arch = "wasm32"), async_trait::async_trait)]
644#[cfg_attr(target_arch = "wasm32", async_trait::async_trait(?Send))]
645impl SessionServiceRuntimeExt for RuntimeSessionAdapter {
646 fn runtime_mode(&self) -> RuntimeMode {
647 self.mode
648 }
649
650 async fn accept_input(
651 &self,
652 session_id: &SessionId,
653 input: Input,
654 ) -> Result<AcceptOutcome, RuntimeDriverError> {
655 let sessions = self.sessions.read().await;
656 let entry = sessions
657 .get(session_id)
658 .ok_or(RuntimeDriverError::NotReady {
659 state: RuntimeState::Destroyed,
660 })?;
661
662 let (outcome, should_wake, should_process) = {
664 let mut driver = entry.driver.lock().await;
665 let result = driver.as_driver_mut().accept_input(input).await?;
666 let wake = driver.take_wake_requested();
667 let process_now = driver.take_process_requested();
668 (result, wake, process_now)
669 };
670
671 if (should_wake || should_process)
673 && let Some(ref wake_tx) = entry.wake_tx
674 {
675 let _ = wake_tx.try_send(());
677 }
678
679 Ok(outcome)
680 }
681
682 async fn runtime_state(
683 &self,
684 session_id: &SessionId,
685 ) -> Result<RuntimeState, RuntimeDriverError> {
686 let sessions = self.sessions.read().await;
687 let entry = sessions
688 .get(session_id)
689 .ok_or(RuntimeDriverError::NotReady {
690 state: RuntimeState::Destroyed,
691 })?;
692 let driver = entry.driver.lock().await;
693 Ok(driver.as_driver().runtime_state())
694 }
695
696 async fn retire_runtime(
697 &self,
698 session_id: &SessionId,
699 ) -> Result<RetireReport, RuntimeDriverError> {
700 let sessions = self.sessions.read().await;
701 let entry = sessions
702 .get(session_id)
703 .ok_or(RuntimeDriverError::NotReady {
704 state: RuntimeState::Destroyed,
705 })?;
706 let mut driver = entry.driver.lock().await;
707 let report = driver.as_driver_mut().retire().await?;
708 drop(driver); if report.inputs_pending_drain > 0 {
711 if let Some(ref wake_tx) = entry.wake_tx {
714 let _ = wake_tx.send(()).await;
715 }
716 }
717
718 if entry.wake_tx.is_none() {
720 let mut completions = entry.completions.lock().await;
721 completions.resolve_all_terminated("retired without runtime loop");
722 }
723
724 Ok(report)
725 }
726
727 async fn reset_runtime(
728 &self,
729 session_id: &SessionId,
730 ) -> Result<ResetReport, RuntimeDriverError> {
731 let sessions = self.sessions.read().await;
732 let entry = sessions
733 .get(session_id)
734 .ok_or(RuntimeDriverError::NotReady {
735 state: RuntimeState::Destroyed,
736 })?;
737 let mut driver = entry.driver.lock().await;
738 if matches!(driver.as_driver().runtime_state(), RuntimeState::Running) {
739 return Err(RuntimeDriverError::NotReady {
740 state: RuntimeState::Running,
741 });
742 }
743 let report = driver.as_driver_mut().reset().await?;
744
745 let mut completions = entry.completions.lock().await;
747 completions.resolve_all_terminated("runtime reset");
748
749 Ok(report)
750 }
751
752 async fn input_state(
753 &self,
754 session_id: &SessionId,
755 input_id: &InputId,
756 ) -> Result<Option<InputState>, RuntimeDriverError> {
757 let sessions = self.sessions.read().await;
758 let entry = sessions
759 .get(session_id)
760 .ok_or(RuntimeDriverError::NotReady {
761 state: RuntimeState::Destroyed,
762 })?;
763 let driver = entry.driver.lock().await;
764 Ok(driver.as_driver().input_state(input_id).cloned())
765 }
766
767 async fn list_active_inputs(
768 &self,
769 session_id: &SessionId,
770 ) -> Result<Vec<InputId>, RuntimeDriverError> {
771 let sessions = self.sessions.read().await;
772 let entry = sessions
773 .get(session_id)
774 .ok_or(RuntimeDriverError::NotReady {
775 state: RuntimeState::Destroyed,
776 })?;
777 let driver = entry.driver.lock().await;
778 Ok(driver.as_driver().active_input_ids())
779 }
780}
781
782#[cfg(test)]
783#[allow(clippy::unwrap_used)]
784mod tests {
785 use super::*;
786 use crate::input::*;
787 use crate::input_state::InputState;
788 use crate::runtime_state::RuntimeState;
789 use crate::store::{RuntimeStore, RuntimeStoreError, SessionDelta};
790 use chrono::Utc;
791 use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
792 use std::time::Duration;
793
794 fn make_prompt(text: &str) -> Input {
795 Input::Prompt(PromptInput {
796 header: InputHeader {
797 id: InputId::new(),
798 timestamp: Utc::now(),
799 source: InputOrigin::Operator,
800 durability: InputDurability::Durable,
801 visibility: InputVisibility::default(),
802 idempotency_key: None,
803 supersession_key: None,
804 correlation_id: None,
805 },
806 text: text.into(),
807 turn_metadata: None,
808 })
809 }
810
    /// Fault-injecting `RuntimeStore` for tests.
    ///
    /// It delegates to an in-memory store, and can be configured to fail or delay selected
    /// operations.
    struct HarnessRuntimeStore {
        /// Store that all non-failing calls are forwarded to.
        inner: crate::store::InMemoryRuntimeStore,
        /// When true, every `atomic_apply` call fails.
        fail_atomic_apply: bool,
        /// `atomic_lifecycle_commit` fails once its zero-based call index reaches this value.
        fail_atomic_lifecycle_commit_after: Option<usize>,
        /// Number of `atomic_lifecycle_commit` calls made so far (including failed ones).
        atomic_lifecycle_commit_calls: AtomicUsize,
        /// Sleep applied before every `load_input_states` call; zero disables it.
        load_input_states_delay: Duration,
        /// `persist_input_state` fails once its zero-based call index reaches this value.
        fail_persist_input_state_after: Option<usize>,
        /// Number of `persist_input_state` calls made so far (including failed ones).
        persist_input_state_calls: AtomicUsize,
    }
821
822 impl HarnessRuntimeStore {
823 fn failing_atomic_apply() -> Self {
824 Self {
825 inner: crate::store::InMemoryRuntimeStore::new(),
826 fail_atomic_apply: true,
827 fail_atomic_lifecycle_commit_after: None,
828 atomic_lifecycle_commit_calls: AtomicUsize::new(0),
829 load_input_states_delay: Duration::ZERO,
830 fail_persist_input_state_after: None,
831 persist_input_state_calls: AtomicUsize::new(0),
832 }
833 }
834
835 fn delayed_recover(delay: Duration) -> Self {
836 Self {
837 inner: crate::store::InMemoryRuntimeStore::new(),
838 fail_atomic_apply: false,
839 fail_atomic_lifecycle_commit_after: None,
840 atomic_lifecycle_commit_calls: AtomicUsize::new(0),
841 load_input_states_delay: delay,
842 fail_persist_input_state_after: None,
843 persist_input_state_calls: AtomicUsize::new(0),
844 }
845 }
846
847 fn failing_terminal_snapshot() -> Self {
848 Self {
849 inner: crate::store::InMemoryRuntimeStore::new(),
850 fail_atomic_apply: false,
851 fail_atomic_lifecycle_commit_after: Some(1),
854 atomic_lifecycle_commit_calls: AtomicUsize::new(0),
855 load_input_states_delay: Duration::ZERO,
856 fail_persist_input_state_after: None,
857 persist_input_state_calls: AtomicUsize::new(0),
858 }
859 }
860 }
861
    // `RuntimeStore` implementation for the test harness. Every method delegates to
    // `self.inner`; a few first apply the failure or delay configured on the harness.
    #[async_trait::async_trait]
    impl RuntimeStore for HarnessRuntimeStore {
        // Pure delegation; no fault injection.
        async fn commit_session_boundary(
            &self,
            runtime_id: &crate::identifiers::LogicalRuntimeId,
            session_delta: SessionDelta,
            run_id: RunId,
            boundary: meerkat_core::lifecycle::run_primitive::RunApplyBoundary,
            contributing_input_ids: Vec<InputId>,
            input_updates: Vec<InputState>,
        ) -> Result<meerkat_core::lifecycle::RunBoundaryReceipt, RuntimeStoreError> {
            self.inner
                .commit_session_boundary(
                    runtime_id,
                    session_delta,
                    run_id,
                    boundary,
                    contributing_input_ids,
                    input_updates,
                )
                .await
        }

        // Fails unconditionally when `fail_atomic_apply` is set; otherwise delegates.
        async fn atomic_apply(
            &self,
            runtime_id: &crate::identifiers::LogicalRuntimeId,
            session_delta: Option<SessionDelta>,
            receipt: meerkat_core::lifecycle::RunBoundaryReceipt,
            input_updates: Vec<InputState>,
            session_store_key: Option<meerkat_core::types::SessionId>,
        ) -> Result<(), RuntimeStoreError> {
            if self.fail_atomic_apply {
                return Err(RuntimeStoreError::WriteFailed(
                    "synthetic atomic_apply failure".to_string(),
                ));
            }
            self.inner
                .atomic_apply(
                    runtime_id,
                    session_delta,
                    receipt,
                    input_updates,
                    session_store_key,
                )
                .await
        }

        // Sleeps for `load_input_states_delay` (if non-zero) before delegating.
        async fn load_input_states(
            &self,
            runtime_id: &crate::identifiers::LogicalRuntimeId,
        ) -> Result<Vec<InputState>, RuntimeStoreError> {
            if !self.load_input_states_delay.is_zero() {
                tokio::time::sleep(self.load_input_states_delay).await;
            }
            self.inner.load_input_states(runtime_id).await
        }

        // Pure delegation.
        async fn load_boundary_receipt(
            &self,
            runtime_id: &crate::identifiers::LogicalRuntimeId,
            run_id: &RunId,
            sequence: u64,
        ) -> Result<Option<meerkat_core::lifecycle::RunBoundaryReceipt>, RuntimeStoreError>
        {
            self.inner
                .load_boundary_receipt(runtime_id, run_id, sequence)
                .await
        }

        // Pure delegation.
        async fn load_session_snapshot(
            &self,
            runtime_id: &crate::identifiers::LogicalRuntimeId,
        ) -> Result<Option<Vec<u8>>, RuntimeStoreError> {
            self.inner.load_session_snapshot(runtime_id).await
        }

        // Counts every call, then fails once the zero-based call index reaches
        // `fail_persist_input_state_after`.
        async fn persist_input_state(
            &self,
            runtime_id: &crate::identifiers::LogicalRuntimeId,
            state: &InputState,
        ) -> Result<(), RuntimeStoreError> {
            // The counter is bumped before the check, so failed calls are counted too.
            let call_index = self
                .persist_input_state_calls
                .fetch_add(1, Ordering::SeqCst);
            if self
                .fail_persist_input_state_after
                .is_some_and(|fail_after| call_index >= fail_after)
            {
                return Err(RuntimeStoreError::WriteFailed(
                    "synthetic persist_input_state failure".to_string(),
                ));
            }
            self.inner.persist_input_state(runtime_id, state).await
        }

        // Pure delegation.
        async fn load_input_state(
            &self,
            runtime_id: &crate::identifiers::LogicalRuntimeId,
            input_id: &InputId,
        ) -> Result<Option<InputState>, RuntimeStoreError> {
            self.inner.load_input_state(runtime_id, input_id).await
        }

        // Pure delegation.
        async fn persist_runtime_state(
            &self,
            runtime_id: &crate::identifiers::LogicalRuntimeId,
            state: RuntimeState,
        ) -> Result<(), RuntimeStoreError> {
            self.inner.persist_runtime_state(runtime_id, state).await
        }

        // Pure delegation.
        async fn load_runtime_state(
            &self,
            runtime_id: &crate::identifiers::LogicalRuntimeId,
        ) -> Result<Option<RuntimeState>, RuntimeStoreError> {
            self.inner.load_runtime_state(runtime_id).await
        }

        // Counts every call, then fails once the zero-based call index reaches
        // `fail_atomic_lifecycle_commit_after` (e.g. `Some(1)` lets only the first succeed).
        async fn atomic_lifecycle_commit(
            &self,
            runtime_id: &crate::identifiers::LogicalRuntimeId,
            runtime_state: RuntimeState,
            input_states: &[InputState],
        ) -> Result<(), RuntimeStoreError> {
            let call_index = self
                .atomic_lifecycle_commit_calls
                .fetch_add(1, Ordering::SeqCst);
            if self
                .fail_atomic_lifecycle_commit_after
                .is_some_and(|fail_after| call_index >= fail_after)
            {
                return Err(RuntimeStoreError::WriteFailed(
                    "synthetic atomic_lifecycle_commit failure".to_string(),
                ));
            }
            self.inner
                .atomic_lifecycle_commit(runtime_id, runtime_state, input_states)
                .await
        }
    }
1002
1003 #[tokio::test]
1004 async fn ephemeral_adapter_accept_and_query() {
1005 let adapter = RuntimeSessionAdapter::ephemeral();
1006 let sid = SessionId::new();
1007 adapter.register_session(sid.clone()).await;
1008
1009 let input = make_prompt("hello");
1010 let outcome = adapter.accept_input(&sid, input).await.unwrap();
1011 assert!(outcome.is_accepted());
1012
1013 let state = adapter.runtime_state(&sid).await.unwrap();
1014 assert_eq!(state, RuntimeState::Idle);
1015
1016 let active = adapter.list_active_inputs(&sid).await.unwrap();
1017 assert_eq!(active.len(), 1);
1018 }
1019
1020 #[tokio::test]
1021 async fn persistent_adapter_accept() {
1022 let store = Arc::new(crate::store::InMemoryRuntimeStore::new());
1023 let adapter = RuntimeSessionAdapter::persistent(store);
1024 let sid = SessionId::new();
1025 adapter.register_session(sid.clone()).await;
1026
1027 let input = make_prompt("hello");
1028 let outcome = adapter.accept_input(&sid, input).await.unwrap();
1029 assert!(outcome.is_accepted());
1030 }
1031
1032 #[tokio::test]
1033 async fn unregistered_session_errors() {
1034 let adapter = RuntimeSessionAdapter::ephemeral();
1035 let sid = SessionId::new();
1036 let result = adapter.accept_input(&sid, make_prompt("hi")).await;
1037 assert!(result.is_err());
1038 }
1039
1040 #[tokio::test]
1041 async fn unregister_removes_driver() {
1042 let adapter = RuntimeSessionAdapter::ephemeral();
1043 let sid = SessionId::new();
1044 adapter.register_session(sid.clone()).await;
1045 adapter.unregister_session(&sid).await;
1046
1047 let result = adapter.runtime_state(&sid).await;
1048 assert!(result.is_err());
1049 }
1050
    /// An input accepted on a session with an executor is picked up by the runtime loop.
    ///
    /// The test checks three things: `CoreExecutor::apply` runs, the runtime returns to
    /// `Idle`, and no inputs remain active.
    #[tokio::test]
    async fn accept_with_executor_triggers_loop() {
        use meerkat_core::lifecycle::RunId;
        use meerkat_core::lifecycle::core_executor::{
            CoreApplyOutput, CoreExecutor, CoreExecutorError,
        };
        use meerkat_core::lifecycle::run_control::RunControlCommand;
        use meerkat_core::lifecycle::run_primitive::{RunApplyBoundary, RunPrimitive};
        use meerkat_core::lifecycle::run_receipt::RunBoundaryReceipt;
        use std::sync::atomic::{AtomicBool, Ordering};

        // Flag set by the executor so the test can observe that `apply` ran.
        let apply_called = Arc::new(AtomicBool::new(false));
        let apply_called_clone = apply_called.clone();

        // Executor that records the call and returns a minimal `RunStart` receipt listing
        // the primitive's contributing inputs.
        struct TestExecutor {
            called: Arc<AtomicBool>,
        }

        #[async_trait::async_trait]
        impl CoreExecutor for TestExecutor {
            async fn apply(
                &mut self,
                run_id: RunId,
                primitive: RunPrimitive,
            ) -> Result<CoreApplyOutput, CoreExecutorError> {
                self.called.store(true, Ordering::SeqCst);
                Ok(CoreApplyOutput {
                    receipt: RunBoundaryReceipt {
                        run_id,
                        boundary: RunApplyBoundary::RunStart,
                        contributing_input_ids: primitive.contributing_input_ids().to_vec(),
                        conversation_digest: None,
                        message_count: 0,
                        sequence: 0,
                    },
                    session_snapshot: None,
                    run_result: None,
                })
            }

            async fn control(&mut self, _cmd: RunControlCommand) -> Result<(), CoreExecutorError> {
                Ok(())
            }
        }

        let adapter = RuntimeSessionAdapter::ephemeral();
        let sid = SessionId::new();
        let executor = Box::new(TestExecutor {
            called: apply_called_clone,
        });
        adapter
            .register_session_with_executor(sid.clone(), executor)
            .await;

        let input = make_prompt("hello from executor test");
        let outcome = adapter.accept_input(&sid, input).await.unwrap();
        assert!(outcome.is_accepted());

        // Give the spawned loop time to run. NOTE(review): a fixed sleep may be flaky on a
        // slow CI host; polling with a timeout would be more robust.
        tokio::time::sleep(std::time::Duration::from_millis(100)).await;

        assert!(
            apply_called.load(Ordering::SeqCst),
            "CoreExecutor::apply() should have been called by the RuntimeLoop"
        );

        let state = adapter.runtime_state(&sid).await.unwrap();
        assert_eq!(state, RuntimeState::Idle);

        let active = adapter.list_active_inputs(&sid).await.unwrap();
        assert!(active.is_empty(), "All inputs should be consumed");
    }
1128
    /// When the executor fails, the runtime returns to `Idle` and the input goes back to
    /// `Queued`.
    #[tokio::test]
    async fn failed_executor_requeues_input() {
        use crate::input_state::InputLifecycleState;
        use meerkat_core::lifecycle::core_executor::{
            CoreApplyOutput, CoreExecutor, CoreExecutorError,
        };
        use meerkat_core::lifecycle::run_control::RunControlCommand;
        use meerkat_core::lifecycle::run_primitive::RunPrimitive;
        // Executor whose `apply` always fails.
        struct FailingExecutor;

        #[async_trait::async_trait]
        impl CoreExecutor for FailingExecutor {
            async fn apply(
                &mut self,
                _run_id: RunId,
                _primitive: RunPrimitive,
            ) -> Result<CoreApplyOutput, CoreExecutorError> {
                Err(CoreExecutorError::ApplyFailed {
                    reason: "LLM error".into(),
                })
            }

            async fn control(&mut self, _cmd: RunControlCommand) -> Result<(), CoreExecutorError> {
                Ok(())
            }
        }

        let adapter = RuntimeSessionAdapter::ephemeral();
        let sid = SessionId::new();
        adapter
            .register_session_with_executor(sid.clone(), Box::new(FailingExecutor))
            .await;

        let input = make_prompt("hello failing");
        // Capture the id before `input` is moved into `accept_input`.
        let input_id = input.id().clone();
        adapter.accept_input(&sid, input).await.unwrap();

        // Give the loop time to attempt (and fail) the run. NOTE(review): fixed sleep; may
        // be flaky on slow hosts.
        tokio::time::sleep(std::time::Duration::from_millis(100)).await;

        let state = adapter.runtime_state(&sid).await.unwrap();
        assert_eq!(state, RuntimeState::Idle);

        let is = adapter.input_state(&sid, &input_id).await.unwrap().unwrap();
        assert_eq!(
            is.current_state,
            InputLifecycleState::Queued,
            "Failed execution should roll input back to Queued, not strand in AppliedPendingConsumption"
        );
    }
1182
1183 #[tokio::test]
1184 async fn failed_executor_continues_processing_backlog() {
1185 use crate::input_state::InputLifecycleState;
1186 use meerkat_core::lifecycle::core_executor::{
1187 CoreApplyOutput, CoreExecutor, CoreExecutorError,
1188 };
1189 use meerkat_core::lifecycle::run_control::RunControlCommand;
1190 use meerkat_core::lifecycle::run_primitive::{RunApplyBoundary, RunPrimitive};
1191 use meerkat_core::lifecycle::run_receipt::RunBoundaryReceipt;
1192
1193 struct FailThenSucceedExecutor {
1194 calls: Arc<AtomicUsize>,
1195 }
1196
1197 #[async_trait::async_trait]
1198 impl CoreExecutor for FailThenSucceedExecutor {
1199 async fn apply(
1200 &mut self,
1201 run_id: RunId,
1202 primitive: RunPrimitive,
1203 ) -> Result<CoreApplyOutput, CoreExecutorError> {
1204 let call = self.calls.fetch_add(1, Ordering::SeqCst);
1205 tokio::time::sleep(Duration::from_millis(50)).await;
1206 if call == 0 {
1207 return Err(CoreExecutorError::ApplyFailed {
1208 reason: "first run fails".into(),
1209 });
1210 }
1211 Ok(CoreApplyOutput {
1212 receipt: RunBoundaryReceipt {
1213 run_id,
1214 boundary: RunApplyBoundary::RunStart,
1215 contributing_input_ids: primitive.contributing_input_ids().to_vec(),
1216 conversation_digest: None,
1217 message_count: 0,
1218 sequence: 0,
1219 },
1220 session_snapshot: None,
1221 run_result: None,
1222 })
1223 }
1224
1225 async fn control(&mut self, _cmd: RunControlCommand) -> Result<(), CoreExecutorError> {
1226 Ok(())
1227 }
1228 }
1229
1230 let adapter = RuntimeSessionAdapter::ephemeral();
1231 let sid = SessionId::new();
1232 let calls = Arc::new(AtomicUsize::new(0));
1233 adapter
1234 .register_session_with_executor(
1235 sid.clone(),
1236 Box::new(FailThenSucceedExecutor {
1237 calls: Arc::clone(&calls),
1238 }),
1239 )
1240 .await;
1241
1242 let first = make_prompt("first");
1243 let first_id = first.id().clone();
1244 let second = make_prompt("second");
1245 let second_id = second.id().clone();
1246 adapter.accept_input(&sid, first).await.unwrap();
1247 tokio::time::sleep(Duration::from_millis(10)).await;
1248 adapter.accept_input(&sid, second).await.unwrap();
1249
1250 tokio::time::sleep(Duration::from_millis(220)).await;
1251
1252 let second_state = adapter
1253 .input_state(&sid, &second_id)
1254 .await
1255 .unwrap()
1256 .unwrap();
1257 assert_eq!(second_state.current_state, InputLifecycleState::Consumed);
1258 assert_eq!(
1259 adapter.runtime_state(&sid).await.unwrap(),
1260 RuntimeState::Idle
1261 );
1262 assert!(
1263 calls.load(Ordering::SeqCst) >= 2,
1264 "the runtime loop should keep draining queued backlog after a failed run"
1265 );
1266 let first_state = adapter.input_state(&sid, &first_id).await.unwrap().unwrap();
1267 assert!(
1268 matches!(
1269 first_state.current_state,
1270 InputLifecycleState::Queued | InputLifecycleState::Consumed
1271 ),
1272 "the initially failed input should have been safely rolled back or retried after the backlog drained"
1273 );
1274 }
1275
1276 #[tokio::test]
1277 async fn ensure_session_with_executor_upgrades_registered_session() {
1278 use crate::input_state::InputLifecycleState;
1279 use meerkat_core::lifecycle::RunId;
1280 use meerkat_core::lifecycle::core_executor::{
1281 CoreApplyOutput, CoreExecutor, CoreExecutorError,
1282 };
1283 use meerkat_core::lifecycle::run_control::RunControlCommand;
1284 use meerkat_core::lifecycle::run_primitive::{RunApplyBoundary, RunPrimitive};
1285 use meerkat_core::lifecycle::run_receipt::RunBoundaryReceipt;
1286 use std::sync::atomic::{AtomicBool, Ordering};
1287
1288 struct SuccessExecutor {
1289 called: Arc<AtomicBool>,
1290 }
1291
1292 #[async_trait::async_trait]
1293 impl CoreExecutor for SuccessExecutor {
1294 async fn apply(
1295 &mut self,
1296 run_id: RunId,
1297 primitive: RunPrimitive,
1298 ) -> Result<CoreApplyOutput, CoreExecutorError> {
1299 self.called.store(true, Ordering::SeqCst);
1300 Ok(CoreApplyOutput {
1301 receipt: RunBoundaryReceipt {
1302 run_id,
1303 boundary: RunApplyBoundary::RunStart,
1304 contributing_input_ids: primitive.contributing_input_ids().to_vec(),
1305 conversation_digest: None,
1306 message_count: 0,
1307 sequence: 0,
1308 },
1309 session_snapshot: None,
1310 run_result: None,
1311 })
1312 }
1313
1314 async fn control(&mut self, _cmd: RunControlCommand) -> Result<(), CoreExecutorError> {
1315 Ok(())
1316 }
1317 }
1318
1319 let apply_called = Arc::new(AtomicBool::new(false));
1320 let adapter = RuntimeSessionAdapter::ephemeral();
1321 let sid = SessionId::new();
1322 adapter.register_session(sid.clone()).await;
1323
1324 let input = make_prompt("upgrade me");
1325 let input_id = input.id().clone();
1326 let outcome = adapter.accept_input(&sid, input).await.unwrap();
1327 assert!(outcome.is_accepted());
1328
1329 adapter
1330 .ensure_session_with_executor(
1331 sid.clone(),
1332 Box::new(SuccessExecutor {
1333 called: Arc::clone(&apply_called),
1334 }),
1335 )
1336 .await;
1337
1338 tokio::time::sleep(std::time::Duration::from_millis(100)).await;
1339
1340 assert!(
1341 apply_called.load(Ordering::SeqCst),
1342 "upgrading an already-registered session should attach a live loop"
1343 );
1344
1345 let state = adapter.runtime_state(&sid).await.unwrap();
1346 assert_eq!(state, RuntimeState::Idle);
1347
1348 let active = adapter.list_active_inputs(&sid).await.unwrap();
1349 assert!(active.is_empty(), "queued work should drain after upgrade");
1350
1351 let is = adapter.input_state(&sid, &input_id).await.unwrap().unwrap();
1352 assert_eq!(
1353 is.current_state,
1354 InputLifecycleState::Consumed,
1355 "the pre-upgrade queued input should be processed once the loop is attached"
1356 );
1357 }
1358
1359 #[tokio::test]
1360 async fn ensure_session_with_executor_upgrades_racy_registration() {
1361 use crate::input_state::InputLifecycleState;
1362 use meerkat_core::lifecycle::RunId;
1363 use meerkat_core::lifecycle::core_executor::{
1364 CoreApplyOutput, CoreExecutor, CoreExecutorError,
1365 };
1366 use meerkat_core::lifecycle::run_control::RunControlCommand;
1367 use meerkat_core::lifecycle::run_primitive::{RunApplyBoundary, RunPrimitive};
1368 use meerkat_core::lifecycle::run_receipt::RunBoundaryReceipt;
1369
1370 struct SuccessExecutor {
1371 called: Arc<AtomicBool>,
1372 }
1373
1374 #[async_trait::async_trait]
1375 impl CoreExecutor for SuccessExecutor {
1376 async fn apply(
1377 &mut self,
1378 run_id: RunId,
1379 primitive: RunPrimitive,
1380 ) -> Result<CoreApplyOutput, CoreExecutorError> {
1381 self.called.store(true, Ordering::SeqCst);
1382 Ok(CoreApplyOutput {
1383 receipt: RunBoundaryReceipt {
1384 run_id,
1385 boundary: RunApplyBoundary::RunStart,
1386 contributing_input_ids: primitive.contributing_input_ids().to_vec(),
1387 conversation_digest: None,
1388 message_count: 0,
1389 sequence: 0,
1390 },
1391 session_snapshot: None,
1392 run_result: None,
1393 })
1394 }
1395
1396 async fn control(&mut self, _cmd: RunControlCommand) -> Result<(), CoreExecutorError> {
1397 Ok(())
1398 }
1399 }
1400
1401 let store = Arc::new(HarnessRuntimeStore::delayed_recover(Duration::from_millis(
1402 75,
1403 )));
1404 let adapter = Arc::new(RuntimeSessionAdapter::persistent(store));
1405 let sid = SessionId::new();
1406 let apply_called = Arc::new(AtomicBool::new(false));
1407
1408 let ensure_task = {
1409 let adapter = Arc::clone(&adapter);
1410 let sid = sid.clone();
1411 let apply_called = Arc::clone(&apply_called);
1412 tokio::spawn(async move {
1413 adapter
1414 .ensure_session_with_executor(
1415 sid,
1416 Box::new(SuccessExecutor {
1417 called: apply_called,
1418 }),
1419 )
1420 .await;
1421 })
1422 };
1423
1424 tokio::time::sleep(Duration::from_millis(10)).await;
1425 adapter.register_session(sid.clone()).await;
1426 ensure_task.await.unwrap();
1427
1428 let input = make_prompt("race upgrade");
1429 let input_id = input.id().clone();
1430 adapter.accept_input(&sid, input).await.unwrap();
1431 tokio::time::sleep(Duration::from_millis(120)).await;
1432
1433 assert!(
1434 apply_called.load(Ordering::SeqCst),
1435 "the racy registration path should still attach a live runtime loop"
1436 );
1437 let state = adapter.input_state(&sid, &input_id).await.unwrap().unwrap();
1438 assert_eq!(state.current_state, InputLifecycleState::Consumed);
1439 }
1440
1441 #[tokio::test]
1442 async fn boundary_commit_failure_unwinds_sync_runtime_state() {
1443 use crate::input_state::InputLifecycleState;
1444 use meerkat_core::lifecycle::core_executor::CoreApplyOutput;
1445 use meerkat_core::lifecycle::run_primitive::RunApplyBoundary;
1446 use meerkat_core::lifecycle::run_receipt::RunBoundaryReceipt;
1447
1448 let store = Arc::new(HarnessRuntimeStore::failing_atomic_apply());
1449 let adapter = RuntimeSessionAdapter::persistent(store);
1450 let sid = SessionId::new();
1451 adapter.register_session(sid.clone()).await;
1452
1453 let input = make_prompt("sync boundary failure");
1454 let input_id = input.id().clone();
1455 let result = adapter
1456 .accept_input_and_run(&sid, input, move |run_id, primitive| async move {
1457 Ok((
1458 (),
1459 CoreApplyOutput {
1460 receipt: RunBoundaryReceipt {
1461 run_id,
1462 boundary: RunApplyBoundary::RunStart,
1463 contributing_input_ids: primitive.contributing_input_ids().to_vec(),
1464 conversation_digest: None,
1465 message_count: 0,
1466 sequence: 0,
1467 },
1468 session_snapshot: None,
1469 run_result: None,
1470 },
1471 ))
1472 })
1473 .await;
1474 assert!(result.is_err(), "boundary commit failure should surface");
1475 let Err(err) = result else {
1476 unreachable!("asserted runtime boundary commit failure above");
1477 };
1478 assert!(
1479 err.to_string().contains("runtime boundary commit failed"),
1480 "unexpected error: {err}"
1481 );
1482 assert_eq!(
1483 adapter.runtime_state(&sid).await.unwrap(),
1484 RuntimeState::Idle
1485 );
1486 let state = adapter.input_state(&sid, &input_id).await.unwrap().unwrap();
1487 assert_eq!(state.current_state, InputLifecycleState::Queued);
1488 }
1489
1490 #[tokio::test]
1491 async fn boundary_commit_failure_unwinds_runtime_loop_state() {
1492 use crate::input_state::InputLifecycleState;
1493 use meerkat_core::lifecycle::core_executor::{
1494 CoreApplyOutput, CoreExecutor, CoreExecutorError,
1495 };
1496 use meerkat_core::lifecycle::run_control::RunControlCommand;
1497 use meerkat_core::lifecycle::run_primitive::{RunApplyBoundary, RunPrimitive};
1498 use meerkat_core::lifecycle::run_receipt::RunBoundaryReceipt;
1499
1500 struct SuccessExecutor {
1501 stop_called: Arc<AtomicBool>,
1502 }
1503
1504 #[async_trait::async_trait]
1505 impl CoreExecutor for SuccessExecutor {
1506 async fn apply(
1507 &mut self,
1508 run_id: RunId,
1509 primitive: RunPrimitive,
1510 ) -> Result<CoreApplyOutput, CoreExecutorError> {
1511 Ok(CoreApplyOutput {
1512 receipt: RunBoundaryReceipt {
1513 run_id,
1514 boundary: RunApplyBoundary::RunStart,
1515 contributing_input_ids: primitive.contributing_input_ids().to_vec(),
1516 conversation_digest: None,
1517 message_count: 0,
1518 sequence: 0,
1519 },
1520 session_snapshot: None,
1521 run_result: None,
1522 })
1523 }
1524
1525 async fn control(&mut self, cmd: RunControlCommand) -> Result<(), CoreExecutorError> {
1526 if matches!(cmd, RunControlCommand::StopRuntimeExecutor { .. }) {
1527 self.stop_called.store(true, Ordering::SeqCst);
1528 }
1529 Ok(())
1530 }
1531 }
1532
1533 let store = Arc::new(HarnessRuntimeStore::failing_atomic_apply());
1534 let adapter = RuntimeSessionAdapter::persistent(store);
1535 let sid = SessionId::new();
1536 let stop_called = Arc::new(AtomicBool::new(false));
1537 adapter
1538 .register_session_with_executor(
1539 sid.clone(),
1540 Box::new(SuccessExecutor {
1541 stop_called: Arc::clone(&stop_called),
1542 }),
1543 )
1544 .await;
1545
1546 let input = make_prompt("loop boundary failure");
1547 let input_id = input.id().clone();
1548 adapter.accept_input(&sid, input).await.unwrap();
1549 tokio::time::sleep(Duration::from_millis(120)).await;
1550
1551 assert!(
1552 stop_called.load(Ordering::SeqCst),
1553 "boundary commit failures should stop the dead executor path"
1554 );
1555 assert_eq!(
1556 adapter.runtime_state(&sid).await.unwrap(),
1557 RuntimeState::Idle
1558 );
1559 let state = adapter.input_state(&sid, &input_id).await.unwrap().unwrap();
1560 assert_eq!(state.current_state, InputLifecycleState::Queued);
1561 }
1562
1563 #[tokio::test]
1564 async fn terminal_snapshot_failure_unregisters_runtime_loop_session() {
1565 use meerkat_core::lifecycle::core_executor::{
1566 CoreApplyOutput, CoreExecutor, CoreExecutorError,
1567 };
1568 use meerkat_core::lifecycle::run_control::RunControlCommand;
1569 use meerkat_core::lifecycle::run_primitive::{RunApplyBoundary, RunPrimitive};
1570 use meerkat_core::lifecycle::run_receipt::RunBoundaryReceipt;
1571
1572 struct SuccessExecutor {
1573 adapter: Arc<RuntimeSessionAdapter>,
1574 session_id: SessionId,
1575 stop_called: Arc<AtomicBool>,
1576 }
1577
1578 #[async_trait::async_trait]
1579 impl CoreExecutor for SuccessExecutor {
1580 async fn apply(
1581 &mut self,
1582 run_id: RunId,
1583 primitive: RunPrimitive,
1584 ) -> Result<CoreApplyOutput, CoreExecutorError> {
1585 Ok(CoreApplyOutput {
1586 receipt: RunBoundaryReceipt {
1587 run_id,
1588 boundary: RunApplyBoundary::RunStart,
1589 contributing_input_ids: primitive.contributing_input_ids().to_vec(),
1590 conversation_digest: None,
1591 message_count: 0,
1592 sequence: 0,
1593 },
1594 session_snapshot: None,
1595 run_result: None,
1596 })
1597 }
1598
1599 async fn control(&mut self, cmd: RunControlCommand) -> Result<(), CoreExecutorError> {
1600 if matches!(cmd, RunControlCommand::StopRuntimeExecutor { .. }) {
1601 self.stop_called.store(true, Ordering::SeqCst);
1602 self.adapter.unregister_session(&self.session_id).await;
1603 }
1604 Ok(())
1605 }
1606 }
1607
1608 let store = Arc::new(HarnessRuntimeStore::failing_terminal_snapshot());
1609 let adapter = Arc::new(RuntimeSessionAdapter::persistent(store));
1610 let sid = SessionId::new();
1611 let stop_called = Arc::new(AtomicBool::new(false));
1612 adapter
1613 .register_session_with_executor(
1614 sid.clone(),
1615 Box::new(SuccessExecutor {
1616 adapter: Arc::clone(&adapter),
1617 session_id: sid.clone(),
1618 stop_called: Arc::clone(&stop_called),
1619 }),
1620 )
1621 .await;
1622
1623 adapter
1624 .accept_input(&sid, make_prompt("terminal snapshot failure"))
1625 .await
1626 .unwrap();
1627 tokio::time::sleep(Duration::from_millis(120)).await;
1628
1629 assert!(
1630 stop_called.load(Ordering::SeqCst),
1631 "terminal snapshot persistence failures should stop the runtime loop"
1632 );
1633 let state_result = adapter.runtime_state(&sid).await;
1634 assert!(
1635 state_result.is_err(),
1636 "stopped runtime sessions should be unregistered"
1637 );
1638 let Err(err) = state_result else {
1639 unreachable!("asserted stopped runtime unregistration above");
1640 };
1641 assert!(matches!(
1642 err,
1643 RuntimeDriverError::NotReady {
1644 state: RuntimeState::Destroyed
1645 }
1646 ));
1647 }
1648
1649 #[tokio::test]
1650 async fn terminal_snapshot_failure_unregisters_sync_runtime_session() {
1651 use meerkat_core::lifecycle::core_executor::CoreApplyOutput;
1652 use meerkat_core::lifecycle::run_primitive::RunApplyBoundary;
1653 use meerkat_core::lifecycle::run_receipt::RunBoundaryReceipt;
1654
1655 let store = Arc::new(HarnessRuntimeStore::failing_terminal_snapshot());
1656 let adapter = RuntimeSessionAdapter::persistent(store);
1657 let sid = SessionId::new();
1658 adapter.register_session(sid.clone()).await;
1659
1660 let result = adapter
1661 .accept_input_and_run(
1662 &sid,
1663 make_prompt("sync terminal snapshot failure"),
1664 move |run_id, primitive| async move {
1665 Ok((
1666 (),
1667 CoreApplyOutput {
1668 receipt: RunBoundaryReceipt {
1669 run_id,
1670 boundary: RunApplyBoundary::RunStart,
1671 contributing_input_ids: primitive.contributing_input_ids().to_vec(),
1672 conversation_digest: None,
1673 message_count: 0,
1674 sequence: 0,
1675 },
1676 session_snapshot: None,
1677 run_result: None,
1678 },
1679 ))
1680 },
1681 )
1682 .await;
1683 assert!(
1684 result.is_err(),
1685 "terminal snapshot persistence failure should surface"
1686 );
1687 let Err(err) = result else {
1688 unreachable!("asserted terminal snapshot failure above");
1689 };
1690
1691 assert!(
1692 err.to_string().contains("terminal event persist failed")
1693 || err
1694 .to_string()
1695 .contains("failed to persist runtime completion snapshot"),
1696 "unexpected error: {err}"
1697 );
1698 let runtime_state = adapter.runtime_state(&sid).await;
1699 assert!(
1700 matches!(
1701 runtime_state,
1702 Err(RuntimeDriverError::NotReady {
1703 state: RuntimeState::Destroyed
1704 })
1705 ),
1706 "sync path should unregister the broken runtime session"
1707 );
1708 }
1709
1710 #[tokio::test]
1714 async fn dedup_terminal_input_returns_none_handle() {
1715 use crate::identifiers::IdempotencyKey;
1716 use meerkat_core::lifecycle::core_executor::{
1717 CoreApplyOutput, CoreExecutor, CoreExecutorError,
1718 };
1719 use meerkat_core::lifecycle::run_control::RunControlCommand;
1720 use meerkat_core::lifecycle::run_primitive::{RunApplyBoundary, RunPrimitive};
1721 use meerkat_core::lifecycle::run_receipt::RunBoundaryReceipt;
1722 use meerkat_core::types::{RunResult, Usage};
1723
1724 struct ResultExecutor;
1725 #[async_trait::async_trait]
1726 impl CoreExecutor for ResultExecutor {
1727 async fn apply(
1728 &mut self,
1729 run_id: RunId,
1730 primitive: RunPrimitive,
1731 ) -> Result<CoreApplyOutput, CoreExecutorError> {
1732 Ok(CoreApplyOutput {
1733 receipt: RunBoundaryReceipt {
1734 run_id,
1735 boundary: RunApplyBoundary::RunStart,
1736 contributing_input_ids: primitive.contributing_input_ids().to_vec(),
1737 conversation_digest: None,
1738 message_count: 0,
1739 sequence: 0,
1740 },
1741 session_snapshot: None,
1742 run_result: Some(RunResult {
1743 text: "done".into(),
1744 session_id: SessionId::new(),
1745 usage: Usage::default(),
1746 turns: 1,
1747 tool_calls: 0,
1748 structured_output: None,
1749 schema_warnings: None,
1750 skill_diagnostics: None,
1751 }),
1752 })
1753 }
1754 async fn control(&mut self, _cmd: RunControlCommand) -> Result<(), CoreExecutorError> {
1755 Ok(())
1756 }
1757 }
1758
1759 let adapter = RuntimeSessionAdapter::ephemeral();
1760 let sid = SessionId::new();
1761 adapter
1762 .register_session_with_executor(sid.clone(), Box::new(ResultExecutor))
1763 .await;
1764
1765 let key = IdempotencyKey::new("gate-a2");
1767 let mut input1 = make_prompt("first");
1768 if let Input::Prompt(ref mut p) = input1 {
1769 p.header.idempotency_key = Some(key.clone());
1770 }
1771 let (outcome1, handle1) = adapter
1772 .accept_input_with_completion(&sid, input1)
1773 .await
1774 .unwrap();
1775 assert!(outcome1.is_accepted());
1776 assert!(handle1.is_some(), "accepted input should have a handle");
1777
1778 let result = handle1.unwrap().wait().await;
1780 assert!(
1781 matches!(result, crate::completion::CompletionOutcome::Completed(_)),
1782 "first input should complete successfully"
1783 );
1784
1785 let mut input2 = make_prompt("duplicate");
1787 if let Input::Prompt(ref mut p) = input2 {
1788 p.header.idempotency_key = Some(key);
1789 }
1790 let (outcome2, handle2) = adapter
1791 .accept_input_with_completion(&sid, input2)
1792 .await
1793 .unwrap();
1794 assert!(
1795 outcome2.is_deduplicated(),
1796 "second input with same key should be deduplicated"
1797 );
1798 assert!(
1799 handle2.is_none(),
1800 "dedup on terminal input should return None handle"
1801 );
1802 }
1803
1804 #[tokio::test]
1807 async fn dedup_inflight_input_returns_handle_that_resolves() {
1808 use crate::identifiers::IdempotencyKey;
1809 use meerkat_core::lifecycle::core_executor::{
1810 CoreApplyOutput, CoreExecutor, CoreExecutorError,
1811 };
1812 use meerkat_core::lifecycle::run_control::RunControlCommand;
1813 use meerkat_core::lifecycle::run_primitive::{RunApplyBoundary, RunPrimitive};
1814 use meerkat_core::lifecycle::run_receipt::RunBoundaryReceipt;
1815 use meerkat_core::types::{RunResult, Usage};
1816
1817 struct SlowExecutor;
1818 #[async_trait::async_trait]
1819 impl CoreExecutor for SlowExecutor {
1820 async fn apply(
1821 &mut self,
1822 run_id: RunId,
1823 primitive: RunPrimitive,
1824 ) -> Result<CoreApplyOutput, CoreExecutorError> {
1825 tokio::time::sleep(Duration::from_millis(200)).await;
1827 Ok(CoreApplyOutput {
1828 receipt: RunBoundaryReceipt {
1829 run_id,
1830 boundary: RunApplyBoundary::RunStart,
1831 contributing_input_ids: primitive.contributing_input_ids().to_vec(),
1832 conversation_digest: None,
1833 message_count: 0,
1834 sequence: 0,
1835 },
1836 session_snapshot: None,
1837 run_result: Some(RunResult {
1838 text: "slow done".into(),
1839 session_id: SessionId::new(),
1840 usage: Usage::default(),
1841 turns: 1,
1842 tool_calls: 0,
1843 structured_output: None,
1844 schema_warnings: None,
1845 skill_diagnostics: None,
1846 }),
1847 })
1848 }
1849 async fn control(&mut self, _cmd: RunControlCommand) -> Result<(), CoreExecutorError> {
1850 Ok(())
1851 }
1852 }
1853
1854 let adapter = RuntimeSessionAdapter::ephemeral();
1855 let sid = SessionId::new();
1856 adapter
1857 .register_session_with_executor(sid.clone(), Box::new(SlowExecutor))
1858 .await;
1859
1860 let key = IdempotencyKey::new("gate-a3");
1862 let mut input1 = make_prompt("original");
1863 if let Input::Prompt(ref mut p) = input1 {
1864 p.header.idempotency_key = Some(key.clone());
1865 }
1866 let (outcome1, handle1) = adapter
1867 .accept_input_with_completion(&sid, input1)
1868 .await
1869 .unwrap();
1870 assert!(outcome1.is_accepted());
1871
1872 tokio::time::sleep(Duration::from_millis(50)).await;
1874
1875 let mut input2 = make_prompt("duplicate");
1877 if let Input::Prompt(ref mut p) = input2 {
1878 p.header.idempotency_key = Some(key);
1879 }
1880 let (outcome2, handle2) = adapter
1881 .accept_input_with_completion(&sid, input2)
1882 .await
1883 .unwrap();
1884 assert!(
1885 outcome2.is_deduplicated(),
1886 "second input should be deduplicated"
1887 );
1888 assert!(
1889 handle2.is_some(),
1890 "dedup on in-flight input should return Some(handle)"
1891 );
1892
1893 let result1 = handle1.unwrap().wait().await;
1895 let result2 = handle2.unwrap().wait().await;
1896 assert!(
1897 matches!(result1, crate::completion::CompletionOutcome::Completed(ref r) if r.text == "slow done"),
1898 "original handle should complete with result"
1899 );
1900 assert!(
1901 matches!(result2, crate::completion::CompletionOutcome::Completed(ref r) if r.text == "slow done"),
1902 "duplicate handle should also complete with same result"
1903 );
1904 }
1905
1906 #[tokio::test]
1909 async fn completion_handle_resolves_without_result() {
1910 use meerkat_core::lifecycle::core_executor::{
1911 CoreApplyOutput, CoreExecutor, CoreExecutorError,
1912 };
1913 use meerkat_core::lifecycle::run_control::RunControlCommand;
1914 use meerkat_core::lifecycle::run_primitive::{RunApplyBoundary, RunPrimitive};
1915 use meerkat_core::lifecycle::run_receipt::RunBoundaryReceipt;
1916
1917 struct NoResultExecutor;
1918 #[async_trait::async_trait]
1919 impl CoreExecutor for NoResultExecutor {
1920 async fn apply(
1921 &mut self,
1922 run_id: RunId,
1923 primitive: RunPrimitive,
1924 ) -> Result<CoreApplyOutput, CoreExecutorError> {
1925 Ok(CoreApplyOutput {
1926 receipt: RunBoundaryReceipt {
1927 run_id,
1928 boundary: RunApplyBoundary::RunStart,
1929 contributing_input_ids: primitive.contributing_input_ids().to_vec(),
1930 conversation_digest: None,
1931 message_count: 0,
1932 sequence: 0,
1933 },
1934 session_snapshot: None,
1935 run_result: None, })
1937 }
1938 async fn control(&mut self, _cmd: RunControlCommand) -> Result<(), CoreExecutorError> {
1939 Ok(())
1940 }
1941 }
1942
1943 let adapter = RuntimeSessionAdapter::ephemeral();
1944 let sid = SessionId::new();
1945 adapter
1946 .register_session_with_executor(sid.clone(), Box::new(NoResultExecutor))
1947 .await;
1948
1949 let input = make_prompt("context append");
1950 let (outcome, handle) = adapter
1951 .accept_input_with_completion(&sid, input)
1952 .await
1953 .unwrap();
1954 assert!(outcome.is_accepted());
1955
1956 let result = handle.unwrap().wait().await;
1957 assert!(
1958 matches!(
1959 result,
1960 crate::completion::CompletionOutcome::CompletedWithoutResult
1961 ),
1962 "executor returning run_result: None should resolve as CompletedWithoutResult, got {result:?}"
1963 );
1964 }
1965
1966 #[tokio::test]
1968 async fn reset_runtime_resolves_pending_waiters() {
1969 let adapter = RuntimeSessionAdapter::ephemeral();
1971 let sid = SessionId::new();
1972 adapter.register_session(sid.clone()).await;
1973
1974 let input = make_prompt("pending");
1975 let (outcome, handle) = adapter
1976 .accept_input_with_completion(&sid, input)
1977 .await
1978 .unwrap();
1979 assert!(outcome.is_accepted());
1980 assert!(handle.is_some());
1981
1982 adapter.reset_runtime(&sid).await.unwrap();
1984
1985 let result = handle.unwrap().wait().await;
1987 assert!(
1988 matches!(
1989 result,
1990 crate::completion::CompletionOutcome::RuntimeTerminated(_)
1991 ),
1992 "reset should resolve pending waiters as terminated, got {result:?}"
1993 );
1994 }
1995
1996 #[tokio::test]
1998 async fn retire_without_loop_resolves_waiters() {
1999 let adapter = RuntimeSessionAdapter::ephemeral();
2001 let sid = SessionId::new();
2002 adapter.register_session(sid.clone()).await;
2003
2004 let input = make_prompt("will be retired");
2005 let (outcome, handle) = adapter
2006 .accept_input_with_completion(&sid, input)
2007 .await
2008 .unwrap();
2009 assert!(outcome.is_accepted());
2010 assert!(handle.is_some());
2011
2012 adapter.retire_runtime(&sid).await.unwrap();
2014
2015 let result = handle.unwrap().wait().await;
2017 assert!(
2018 matches!(
2019 result,
2020 crate::completion::CompletionOutcome::RuntimeTerminated(_)
2021 ),
2022 "retire without loop should resolve pending waiters as terminated, got {result:?}"
2023 );
2024 }
2025
2026 #[tokio::test]
2028 async fn successful_execution_fires_boundary_applied() {
2029 use crate::input_state::InputLifecycleState;
2030 use meerkat_core::lifecycle::RunId;
2031 use meerkat_core::lifecycle::core_executor::{
2032 CoreApplyOutput, CoreExecutor, CoreExecutorError,
2033 };
2034 use meerkat_core::lifecycle::run_control::RunControlCommand;
2035 use meerkat_core::lifecycle::run_primitive::{RunApplyBoundary, RunPrimitive};
2036 use meerkat_core::lifecycle::run_receipt::RunBoundaryReceipt;
2037
2038 struct SuccessExecutor;
2039
2040 #[async_trait::async_trait]
2041 impl CoreExecutor for SuccessExecutor {
2042 async fn apply(
2043 &mut self,
2044 run_id: RunId,
2045 primitive: RunPrimitive,
2046 ) -> Result<CoreApplyOutput, CoreExecutorError> {
2047 Ok(CoreApplyOutput {
2048 receipt: RunBoundaryReceipt {
2049 run_id,
2050 boundary: RunApplyBoundary::RunStart,
2051 contributing_input_ids: primitive.contributing_input_ids().to_vec(),
2052 conversation_digest: None,
2053 message_count: 0,
2054 sequence: 0,
2055 },
2056 session_snapshot: None,
2057 run_result: None,
2058 })
2059 }
2060
2061 async fn control(&mut self, _cmd: RunControlCommand) -> Result<(), CoreExecutorError> {
2062 Ok(())
2063 }
2064 }
2065
2066 let adapter = RuntimeSessionAdapter::ephemeral();
2067 let sid = SessionId::new();
2068 adapter
2069 .register_session_with_executor(sid.clone(), Box::new(SuccessExecutor))
2070 .await;
2071
2072 let input = make_prompt("hello success");
2073 let input_id = input.id().clone();
2074 adapter.accept_input(&sid, input).await.unwrap();
2075
2076 tokio::time::sleep(std::time::Duration::from_millis(100)).await;
2077
2078 let is = adapter.input_state(&sid, &input_id).await.unwrap().unwrap();
2080 assert_eq!(
2081 is.current_state,
2082 InputLifecycleState::Consumed,
2083 "Successful execution should consume the input"
2084 );
2085
2086 let state = adapter.runtime_state(&sid).await.unwrap();
2088 assert_eq!(state, RuntimeState::Idle);
2089 }
2090}