1use std::collections::HashMap;
14use std::future::Future;
15use std::sync::Arc;
16
17use meerkat_core::lifecycle::core_executor::CoreApplyOutput;
18use meerkat_core::lifecycle::run_control::RunControlCommand;
19use meerkat_core::lifecycle::{InputId, RunId};
20use meerkat_core::types::SessionId;
21
22use crate::accept::AcceptOutcome;
23use crate::driver::ephemeral::EphemeralRuntimeDriver;
24use crate::driver::persistent::PersistentRuntimeDriver;
25use crate::identifiers::LogicalRuntimeId;
26use crate::input::Input;
27use crate::input_machine::InputStateMachineError;
28use crate::input_state::InputState;
29use crate::runtime_state::{RuntimeState, RuntimeStateTransitionError};
30use crate::service_ext::{RuntimeMode, SessionServiceRuntimeExt};
31use crate::store::RuntimeStore;
32use crate::tokio;
33use crate::tokio::sync::{Mutex, RwLock, mpsc};
34use crate::traits::{ResetReport, RetireReport, RuntimeDriver, RuntimeDriverError};
35
/// Handle to a session's [`DriverEntry`], shared between the adapter and the
/// session's runtime loop (when one is spawned). The async mutex is held across
/// `.await` points, e.g. while `accept_input` or `on_run_event` runs.
pub(crate) type SharedDriver = Arc<Mutex<DriverEntry>>;
38
/// The concrete runtime driver backing one session.
///
/// `RuntimeSessionAdapter::make_driver` picks `Persistent` when the adapter was
/// built with a [`RuntimeStore`] and `Ephemeral` otherwise.
pub(crate) enum DriverEntry {
    /// Driver constructed without a store.
    Ephemeral(EphemeralRuntimeDriver),
    /// Driver constructed with a shared [`RuntimeStore`].
    Persistent(PersistentRuntimeDriver),
}
44
45impl DriverEntry {
46 pub(crate) fn as_driver(&self) -> &dyn RuntimeDriver {
47 match self {
48 DriverEntry::Ephemeral(d) => d,
49 DriverEntry::Persistent(d) => d,
50 }
51 }
52
53 pub(crate) fn as_driver_mut(&mut self) -> &mut dyn RuntimeDriver {
54 match self {
55 DriverEntry::Ephemeral(d) => d,
56 DriverEntry::Persistent(d) => d,
57 }
58 }
59
60 pub(crate) fn is_idle(&self) -> bool {
62 match self {
63 DriverEntry::Ephemeral(d) => d.is_idle(),
64 DriverEntry::Persistent(d) => d.is_idle(),
65 }
66 }
67
68 pub(crate) fn can_process_queue(&self) -> bool {
70 match self {
71 DriverEntry::Ephemeral(d) => d.state_machine_ref().can_process_queue(),
72 DriverEntry::Persistent(d) => d.inner_ref().state_machine_ref().can_process_queue(),
73 }
74 }
75
76 pub(crate) fn take_wake_requested(&mut self) -> bool {
78 match self {
79 DriverEntry::Ephemeral(d) => d.take_wake_requested(),
80 DriverEntry::Persistent(d) => d.take_wake_requested(),
81 }
82 }
83
84 pub(crate) fn take_process_requested(&mut self) -> bool {
86 match self {
87 DriverEntry::Ephemeral(d) => d.take_process_requested(),
88 DriverEntry::Persistent(d) => d.take_process_requested(),
89 }
90 }
91
92 pub(crate) fn dequeue_next(&mut self) -> Option<(InputId, Input)> {
94 match self {
95 DriverEntry::Ephemeral(d) => d.dequeue_next(),
96 DriverEntry::Persistent(d) => d.dequeue_next(),
97 }
98 }
99
100 pub(crate) fn start_run(&mut self, run_id: RunId) -> Result<(), RuntimeStateTransitionError> {
102 match self {
103 DriverEntry::Ephemeral(d) => d.start_run(run_id),
104 DriverEntry::Persistent(d) => d.start_run(run_id),
105 }
106 }
107
108 pub(crate) fn complete_run(&mut self) -> Result<RunId, RuntimeStateTransitionError> {
110 match self {
111 DriverEntry::Ephemeral(d) => d.complete_run(),
112 DriverEntry::Persistent(d) => d.complete_run(),
113 }
114 }
115
116 pub(crate) fn stage_input(
118 &mut self,
119 input_id: &InputId,
120 run_id: &RunId,
121 ) -> Result<(), InputStateMachineError> {
122 match self {
123 DriverEntry::Ephemeral(d) => d.stage_input(input_id, run_id),
124 DriverEntry::Persistent(d) => d.stage_input(input_id, run_id),
125 }
126 }
127
128 #[allow(dead_code)]
130 pub(crate) fn apply_input(
131 &mut self,
132 input_id: &InputId,
133 run_id: &RunId,
134 ) -> Result<(), InputStateMachineError> {
135 match self {
136 DriverEntry::Ephemeral(d) => d.apply_input(input_id, run_id),
137 DriverEntry::Persistent(d) => d.apply_input(input_id, run_id),
138 }
139 }
140
141 #[allow(dead_code)]
143 pub(crate) fn consume_inputs(
144 &mut self,
145 input_ids: &[InputId],
146 run_id: &RunId,
147 ) -> Result<(), InputStateMachineError> {
148 match self {
149 DriverEntry::Ephemeral(d) => d.consume_inputs(input_ids, run_id),
150 DriverEntry::Persistent(d) => d.consume_inputs(input_ids, run_id),
151 }
152 }
153
154 #[allow(dead_code)]
156 pub(crate) fn rollback_staged(
157 &mut self,
158 input_ids: &[InputId],
159 ) -> Result<(), InputStateMachineError> {
160 match self {
161 DriverEntry::Ephemeral(d) => d.rollback_staged(input_ids),
162 DriverEntry::Persistent(d) => d.rollback_staged(input_ids),
163 }
164 }
165}
166
/// Per-session registry of completion handles. It is shared with the session's
/// runtime loop (passed to `spawn_runtime_loop_with_completions`) and locked by
/// the adapter when registering or bulk-resolving completions.
pub(crate) type SharedCompletionRegistry = Arc<Mutex<crate::completion::CompletionRegistry>>;
169
/// Adapter-side state for one registered session.
struct RuntimeSessionEntry {
    /// The session's driver; also handed to the runtime loop when one is spawned.
    driver: SharedDriver,
    /// Completion handles for accepted inputs; shared with the runtime loop.
    completions: SharedCompletionRegistry,
    /// Wakes the runtime loop; `Some` only once a loop has been spawned.
    wake_tx: Option<mpsc::Sender<()>>,
    /// Sends run-control commands (e.g. cancellation) to the runtime loop;
    /// `Some` only once a loop has been spawned.
    control_tx: Option<mpsc::Sender<RunControlCommand>>,
    /// Join handle of the spawned loop. It is stored but never awaited here.
    /// NOTE(review): with tokio, dropping a JoinHandle detaches rather than
    /// aborts the task — confirm for the `crate::tokio` shim.
    _loop_handle: Option<tokio::task::JoinHandle<()>>,
}
183
/// Maps sessions to runtime drivers, optionally with a background runtime loop
/// per session, and exposes them through [`SessionServiceRuntimeExt`].
pub struct RuntimeSessionAdapter {
    /// Registered sessions keyed by id.
    sessions: RwLock<HashMap<SessionId, RuntimeSessionEntry>>,
    /// Reported by `runtime_mode`; both constructors set `RuntimeMode::V9Compliant`.
    mode: RuntimeMode,
    /// When `Some`, new sessions get persistent drivers backed by this store;
    /// otherwise they get ephemeral drivers.
    store: Option<Arc<dyn RuntimeStore>>,
}
198
199impl RuntimeSessionAdapter {
200 pub fn ephemeral() -> Self {
202 Self {
203 sessions: RwLock::new(HashMap::new()),
204 mode: RuntimeMode::V9Compliant,
205 store: None,
206 }
207 }
208
209 pub fn persistent(store: Arc<dyn RuntimeStore>) -> Self {
211 Self {
212 sessions: RwLock::new(HashMap::new()),
213 mode: RuntimeMode::V9Compliant,
214 store: Some(store),
215 }
216 }
217
218 fn make_driver(&self, session_id: &SessionId) -> DriverEntry {
220 let runtime_id = LogicalRuntimeId::new(session_id.to_string());
221 match &self.store {
222 Some(store) => {
223 DriverEntry::Persistent(PersistentRuntimeDriver::new(runtime_id, store.clone()))
224 }
225 None => DriverEntry::Ephemeral(EphemeralRuntimeDriver::new(runtime_id)),
226 }
227 }
228
229 pub async fn register_session(&self, session_id: SessionId) {
232 if self.contains_session(&session_id).await {
233 return;
234 }
235 let mut entry = self.make_driver(&session_id);
236 if let Err(err) = entry.as_driver_mut().recover().await {
237 tracing::error!(%session_id, error = %err, "failed to recover runtime driver during registration");
238 return;
239 }
240 let session_entry = RuntimeSessionEntry {
241 driver: Arc::new(Mutex::new(entry)),
242 completions: Arc::new(Mutex::new(crate::completion::CompletionRegistry::new())),
243 wake_tx: None,
244 control_tx: None,
245 _loop_handle: None,
246 };
247 let mut sessions = self.sessions.write().await;
248 sessions.entry(session_id).or_insert(session_entry);
249 }
250
251 pub async fn register_session_with_executor(
256 &self,
257 session_id: SessionId,
258 executor: Box<dyn meerkat_core::lifecycle::CoreExecutor>,
259 ) {
260 self.ensure_session_with_executor(session_id, executor)
261 .await;
262 }
263
264 pub async fn ensure_session_with_executor(
270 &self,
271 session_id: SessionId,
272 executor: Box<dyn meerkat_core::lifecycle::CoreExecutor>,
273 ) {
274 let mut executor = Some(executor);
275 let upgrade = {
276 let mut sessions = self.sessions.write().await;
277 if let Some(entry) = sessions.get_mut(&session_id) {
278 if entry.wake_tx.is_some() && entry.control_tx.is_some() {
279 return;
280 }
281
282 let driver = Arc::clone(&entry.driver);
283 let (wake_tx, wake_rx) = mpsc::channel(16);
284 let (control_tx, control_rx) = mpsc::channel(16);
285 let Some(executor) = executor.take() else {
286 tracing::error!(%session_id, "executor missing while upgrading existing runtime session");
287 return;
288 };
289 let handle = crate::runtime_loop::spawn_runtime_loop_with_completions(
290 driver.clone(),
291 executor,
292 wake_rx,
293 control_rx,
294 Some(entry.completions.clone()),
295 );
296
297 entry.wake_tx = Some(wake_tx.clone());
298 entry.control_tx = Some(control_tx);
299 entry._loop_handle = Some(handle);
300 Some((driver, wake_tx))
301 } else {
302 None
303 }
304 };
305
306 if let Some((driver, wake_tx)) = upgrade {
307 let should_wake = {
308 let driver = driver.lock().await;
309 !driver.as_driver().active_input_ids().is_empty()
310 };
311 if should_wake {
312 let _ = wake_tx.try_send(());
313 }
314 return;
315 }
316
317 let mut recovered_entry = self.make_driver(&session_id);
318 if let Err(err) = recovered_entry.as_driver_mut().recover().await {
319 tracing::error!(%session_id, error = %err, "failed to recover runtime driver during registration");
320 return;
321 }
322
323 let driver = {
324 let mut sessions = self.sessions.write().await;
325 if let Some(entry) = sessions.get(&session_id) {
326 if entry.wake_tx.is_some() && entry.control_tx.is_some() {
327 return;
328 }
329 entry.driver.clone()
330 } else {
331 let driver = Arc::new(Mutex::new(recovered_entry));
332 sessions.insert(
333 session_id.clone(),
334 RuntimeSessionEntry {
335 driver: driver.clone(),
336 completions: Arc::new(Mutex::new(
337 crate::completion::CompletionRegistry::new(),
338 )),
339 wake_tx: None,
340 control_tx: None,
341 _loop_handle: None,
342 },
343 );
344 driver
345 }
346 };
347
348 let (wake_tx, wake_rx) = mpsc::channel(16);
349 let (control_tx, control_rx) = mpsc::channel(16);
350 let Some(executor) = executor.take() else {
351 tracing::error!(%session_id, "executor missing while registering runtime session");
352 return;
353 };
354
355 let completions = {
357 let sessions = self.sessions.read().await;
358 sessions.get(&session_id).map(|e| e.completions.clone())
359 };
360
361 let handle = crate::runtime_loop::spawn_runtime_loop_with_completions(
362 driver.clone(),
363 executor,
364 wake_rx,
365 control_rx,
366 completions,
367 );
368
369 let mut sessions = self.sessions.write().await;
370 let Some(entry) = sessions.get_mut(&session_id) else {
371 return;
372 };
373 if entry.wake_tx.is_some() && entry.control_tx.is_some() {
374 return;
375 }
376 entry.wake_tx = Some(wake_tx.clone());
377 entry.control_tx = Some(control_tx);
378 entry._loop_handle = Some(handle);
379 drop(sessions);
380
381 let should_wake = {
382 let driver = driver.lock().await;
383 !driver.as_driver().active_input_ids().is_empty()
384 };
385 if should_wake {
386 let _ = wake_tx.try_send(());
387 }
388 }
389
390 pub async fn unregister_session(&self, session_id: &SessionId) {
394 self.sessions.write().await.remove(session_id);
395 }
396
397 pub async fn contains_session(&self, session_id: &SessionId) -> bool {
399 self.sessions.read().await.contains_key(session_id)
400 }
401
402 pub async fn interrupt_current_run(
404 &self,
405 session_id: &SessionId,
406 ) -> Result<(), RuntimeDriverError> {
407 let sessions = self.sessions.read().await;
408 let entry = sessions
409 .get(session_id)
410 .ok_or(RuntimeDriverError::NotReady {
411 state: RuntimeState::Destroyed,
412 })?;
413 let Some(control_tx) = &entry.control_tx else {
414 return Err(RuntimeDriverError::NotReady {
415 state: RuntimeState::Destroyed,
416 });
417 };
418 control_tx
419 .send(RunControlCommand::CancelCurrentRun {
420 reason: "mob interrupt".to_string(),
421 })
422 .await
423 .map_err(|err| RuntimeDriverError::Internal(format!("failed to send interrupt: {err}")))
424 }
425
426 pub async fn accept_input_and_run<T, F, Fut>(
431 &self,
432 session_id: &SessionId,
433 input: Input,
434 op: F,
435 ) -> Result<T, RuntimeDriverError>
436 where
437 F: FnOnce(RunId, meerkat_core::lifecycle::run_primitive::RunPrimitive) -> Fut,
438 Fut: Future<Output = Result<(T, CoreApplyOutput), RuntimeDriverError>>,
439 {
440 let driver = {
441 let sessions = self.sessions.read().await;
442 sessions
443 .get(session_id)
444 .ok_or(RuntimeDriverError::NotReady {
445 state: RuntimeState::Destroyed,
446 })?
447 .driver
448 .clone()
449 };
450
451 let (input_id, run_id, primitive) = {
452 let mut driver = driver.lock().await;
453 if !driver.is_idle() || !driver.as_driver().active_input_ids().is_empty() {
454 return Err(RuntimeDriverError::NotReady {
455 state: driver.as_driver().runtime_state(),
456 });
457 }
458 let outcome = driver.as_driver_mut().accept_input(input).await?;
459 let input_id = match outcome {
460 AcceptOutcome::Accepted { input_id, .. } => input_id,
461 AcceptOutcome::Deduplicated { existing_id, .. } => existing_id,
462 AcceptOutcome::Rejected { reason } => {
463 return Err(RuntimeDriverError::ValidationFailed { reason });
464 }
465 };
466
467 if !driver.is_idle() {
468 return Err(RuntimeDriverError::NotReady {
469 state: driver.as_driver().runtime_state(),
470 });
471 }
472
473 let (dequeued_id, dequeued_input) = driver.dequeue_next().ok_or_else(|| {
474 RuntimeDriverError::Internal("accepted input was not queued for execution".into())
475 })?;
476 if dequeued_id != input_id {
477 return Err(RuntimeDriverError::NotReady {
478 state: driver.as_driver().runtime_state(),
479 });
480 }
481 let run_id = RunId::new();
482 driver.start_run(run_id.clone()).map_err(|err| {
483 RuntimeDriverError::Internal(format!("failed to start runtime run: {err}"))
484 })?;
485 driver.stage_input(&dequeued_id, &run_id).map_err(|err| {
486 RuntimeDriverError::Internal(format!("failed to stage accepted input: {err}"))
487 })?;
488 let primitive = crate::runtime_loop::input_to_primitive(&dequeued_input, dequeued_id);
489 (input_id, run_id, primitive)
490 };
491
492 match op(run_id.clone(), primitive.clone()).await {
493 Ok((result, output)) => {
494 let mut driver = driver.lock().await;
495 if let Err(err) = driver
496 .as_driver_mut()
497 .on_run_event(meerkat_core::lifecycle::RunEvent::BoundaryApplied {
498 run_id: run_id.clone(),
499 receipt: output.receipt,
500 session_snapshot: output.session_snapshot,
501 })
502 .await
503 {
504 if let Err(unwind_err) = driver
505 .as_driver_mut()
506 .on_run_event(meerkat_core::lifecycle::RunEvent::RunFailed {
507 run_id,
508 error: format!("boundary commit failed: {err}"),
509 recoverable: true,
510 })
511 .await
512 {
513 return Err(RuntimeDriverError::Internal(format!(
514 "runtime boundary commit failed: {err}; additionally failed to unwind runtime state: {unwind_err}"
515 )));
516 }
517 return Err(RuntimeDriverError::Internal(format!(
518 "runtime boundary commit failed: {err}"
519 )));
520 }
521 if let Err(err) = driver
522 .as_driver_mut()
523 .on_run_event(meerkat_core::lifecycle::RunEvent::RunCompleted {
524 run_id,
525 consumed_input_ids: vec![input_id],
526 })
527 .await
528 {
529 drop(driver);
530 self.unregister_session(session_id).await;
531 return Err(RuntimeDriverError::Internal(format!(
532 "failed to persist runtime completion snapshot: {err}"
533 )));
534 }
535 Ok(result)
536 }
537 Err(err) => {
538 let mut driver = driver.lock().await;
539 if let Err(run_err) = driver
540 .as_driver_mut()
541 .on_run_event(meerkat_core::lifecycle::RunEvent::RunFailed {
542 run_id,
543 error: err.to_string(),
544 recoverable: true,
545 })
546 .await
547 {
548 drop(driver);
549 self.unregister_session(session_id).await;
550 return Err(RuntimeDriverError::Internal(format!(
551 "failed to persist runtime failure snapshot: {run_err}"
552 )));
553 }
554 Err(err)
555 }
556 }
557 }
558
559 pub async fn accept_input_with_completion(
568 &self,
569 session_id: &SessionId,
570 input: Input,
571 ) -> Result<(AcceptOutcome, Option<crate::completion::CompletionHandle>), RuntimeDriverError>
572 {
573 let sessions = self.sessions.read().await;
574 let entry = sessions
575 .get(session_id)
576 .ok_or(RuntimeDriverError::NotReady {
577 state: RuntimeState::Destroyed,
578 })?;
579
580 let (outcome, should_wake, should_process, handle) = {
581 let mut driver = entry.driver.lock().await;
582 let result = driver.as_driver_mut().accept_input(input).await?;
583
584 match &result {
585 AcceptOutcome::Accepted { input_id, .. } => {
586 let handle = {
587 let mut completions = entry.completions.lock().await;
588 completions.register(input_id.clone())
589 };
590 let wake = driver.take_wake_requested();
591 let process_now = driver.take_process_requested();
592 (result, wake, process_now, Some(handle))
593 }
594 AcceptOutcome::Deduplicated { existing_id, .. } => {
595 let existing_state = driver.as_driver().input_state(existing_id);
597 let is_terminal = existing_state
598 .map(|s| s.current_state.is_terminal())
599 .unwrap_or(true); if is_terminal {
602 (result, false, false, None)
604 } else {
605 let handle = {
607 let mut completions = entry.completions.lock().await;
608 completions.register(existing_id.clone())
609 };
610 (result, false, false, Some(handle))
611 }
612 }
613 AcceptOutcome::Rejected { reason } => {
614 return Err(RuntimeDriverError::ValidationFailed {
615 reason: reason.clone(),
616 });
617 }
618 }
619 };
620
621 if (should_wake || should_process)
622 && let Some(ref wake_tx) = entry.wake_tx
623 {
624 let _ = wake_tx.try_send(());
625 }
626
627 Ok((outcome, handle))
628 }
629
630 #[allow(dead_code)]
634 pub(crate) async fn completion_registry(
635 &self,
636 session_id: &SessionId,
637 ) -> Option<SharedCompletionRegistry> {
638 let sessions = self.sessions.read().await;
639 sessions.get(session_id).map(|e| e.completions.clone())
640 }
641}
642
643#[cfg_attr(not(target_arch = "wasm32"), async_trait::async_trait)]
644#[cfg_attr(target_arch = "wasm32", async_trait::async_trait(?Send))]
645impl SessionServiceRuntimeExt for RuntimeSessionAdapter {
646 fn runtime_mode(&self) -> RuntimeMode {
647 self.mode
648 }
649
650 async fn accept_input(
651 &self,
652 session_id: &SessionId,
653 input: Input,
654 ) -> Result<AcceptOutcome, RuntimeDriverError> {
655 let sessions = self.sessions.read().await;
656 let entry = sessions
657 .get(session_id)
658 .ok_or(RuntimeDriverError::NotReady {
659 state: RuntimeState::Destroyed,
660 })?;
661
662 let (outcome, should_wake, should_process) = {
664 let mut driver = entry.driver.lock().await;
665 let result = driver.as_driver_mut().accept_input(input).await?;
666 let wake = driver.take_wake_requested();
667 let process_now = driver.take_process_requested();
668 (result, wake, process_now)
669 };
670
671 if (should_wake || should_process)
673 && let Some(ref wake_tx) = entry.wake_tx
674 {
675 let _ = wake_tx.try_send(());
677 }
678
679 Ok(outcome)
680 }
681
682 async fn runtime_state(
683 &self,
684 session_id: &SessionId,
685 ) -> Result<RuntimeState, RuntimeDriverError> {
686 let sessions = self.sessions.read().await;
687 let entry = sessions
688 .get(session_id)
689 .ok_or(RuntimeDriverError::NotReady {
690 state: RuntimeState::Destroyed,
691 })?;
692 let driver = entry.driver.lock().await;
693 Ok(driver.as_driver().runtime_state())
694 }
695
696 async fn retire_runtime(
697 &self,
698 session_id: &SessionId,
699 ) -> Result<RetireReport, RuntimeDriverError> {
700 let sessions = self.sessions.read().await;
701 let entry = sessions
702 .get(session_id)
703 .ok_or(RuntimeDriverError::NotReady {
704 state: RuntimeState::Destroyed,
705 })?;
706 let mut driver = entry.driver.lock().await;
707 let report = driver.as_driver_mut().retire().await?;
708 drop(driver); if report.inputs_pending_drain > 0 {
711 if let Some(ref wake_tx) = entry.wake_tx {
714 let _ = wake_tx.send(()).await;
715 }
716 }
717
718 if entry.wake_tx.is_none() {
720 let mut completions = entry.completions.lock().await;
721 completions.resolve_all_terminated("retired without runtime loop");
722 }
723
724 Ok(report)
725 }
726
727 async fn reset_runtime(
728 &self,
729 session_id: &SessionId,
730 ) -> Result<ResetReport, RuntimeDriverError> {
731 let sessions = self.sessions.read().await;
732 let entry = sessions
733 .get(session_id)
734 .ok_or(RuntimeDriverError::NotReady {
735 state: RuntimeState::Destroyed,
736 })?;
737 let mut driver = entry.driver.lock().await;
738 if matches!(driver.as_driver().runtime_state(), RuntimeState::Running) {
739 return Err(RuntimeDriverError::NotReady {
740 state: RuntimeState::Running,
741 });
742 }
743 let report = driver.as_driver_mut().reset().await?;
744
745 let mut completions = entry.completions.lock().await;
747 completions.resolve_all_terminated("runtime reset");
748
749 Ok(report)
750 }
751
752 async fn input_state(
753 &self,
754 session_id: &SessionId,
755 input_id: &InputId,
756 ) -> Result<Option<InputState>, RuntimeDriverError> {
757 let sessions = self.sessions.read().await;
758 let entry = sessions
759 .get(session_id)
760 .ok_or(RuntimeDriverError::NotReady {
761 state: RuntimeState::Destroyed,
762 })?;
763 let driver = entry.driver.lock().await;
764 Ok(driver.as_driver().input_state(input_id).cloned())
765 }
766
767 async fn list_active_inputs(
768 &self,
769 session_id: &SessionId,
770 ) -> Result<Vec<InputId>, RuntimeDriverError> {
771 let sessions = self.sessions.read().await;
772 let entry = sessions
773 .get(session_id)
774 .ok_or(RuntimeDriverError::NotReady {
775 state: RuntimeState::Destroyed,
776 })?;
777 let driver = entry.driver.lock().await;
778 Ok(driver.as_driver().active_input_ids())
779 }
780}
781
782#[cfg(test)]
783#[allow(clippy::unwrap_used)]
784mod tests {
785 use super::*;
786 use crate::input::*;
787 use crate::input_state::InputState;
788 use crate::runtime_state::RuntimeState;
789 use crate::store::{RuntimeStore, RuntimeStoreError, SessionDelta};
790 use chrono::Utc;
791 use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
792 use std::time::Duration;
793
794 fn make_prompt(text: &str) -> Input {
795 Input::Prompt(PromptInput {
796 header: InputHeader {
797 id: InputId::new(),
798 timestamp: Utc::now(),
799 source: InputOrigin::Operator,
800 durability: InputDurability::Durable,
801 visibility: InputVisibility::default(),
802 idempotency_key: None,
803 supersession_key: None,
804 correlation_id: None,
805 },
806 text: text.into(),
807 blocks: None,
808 turn_metadata: None,
809 })
810 }
811
812 struct HarnessRuntimeStore {
813 inner: crate::store::InMemoryRuntimeStore,
814 fail_atomic_apply: bool,
815 fail_atomic_lifecycle_commit_after: Option<usize>,
817 atomic_lifecycle_commit_calls: AtomicUsize,
818 load_input_states_delay: Duration,
819 fail_persist_input_state_after: Option<usize>,
820 persist_input_state_calls: AtomicUsize,
821 }
822
823 impl HarnessRuntimeStore {
824 fn failing_atomic_apply() -> Self {
825 Self {
826 inner: crate::store::InMemoryRuntimeStore::new(),
827 fail_atomic_apply: true,
828 fail_atomic_lifecycle_commit_after: None,
829 atomic_lifecycle_commit_calls: AtomicUsize::new(0),
830 load_input_states_delay: Duration::ZERO,
831 fail_persist_input_state_after: None,
832 persist_input_state_calls: AtomicUsize::new(0),
833 }
834 }
835
836 fn delayed_recover(delay: Duration) -> Self {
837 Self {
838 inner: crate::store::InMemoryRuntimeStore::new(),
839 fail_atomic_apply: false,
840 fail_atomic_lifecycle_commit_after: None,
841 atomic_lifecycle_commit_calls: AtomicUsize::new(0),
842 load_input_states_delay: delay,
843 fail_persist_input_state_after: None,
844 persist_input_state_calls: AtomicUsize::new(0),
845 }
846 }
847
848 fn failing_terminal_snapshot() -> Self {
849 Self {
850 inner: crate::store::InMemoryRuntimeStore::new(),
851 fail_atomic_apply: false,
852 fail_atomic_lifecycle_commit_after: Some(1),
855 atomic_lifecycle_commit_calls: AtomicUsize::new(0),
856 load_input_states_delay: Duration::ZERO,
857 fail_persist_input_state_after: None,
858 persist_input_state_calls: AtomicUsize::new(0),
859 }
860 }
861 }
862
863 #[async_trait::async_trait]
864 impl RuntimeStore for HarnessRuntimeStore {
865 async fn commit_session_boundary(
866 &self,
867 runtime_id: &crate::identifiers::LogicalRuntimeId,
868 session_delta: SessionDelta,
869 run_id: RunId,
870 boundary: meerkat_core::lifecycle::run_primitive::RunApplyBoundary,
871 contributing_input_ids: Vec<InputId>,
872 input_updates: Vec<InputState>,
873 ) -> Result<meerkat_core::lifecycle::RunBoundaryReceipt, RuntimeStoreError> {
874 self.inner
875 .commit_session_boundary(
876 runtime_id,
877 session_delta,
878 run_id,
879 boundary,
880 contributing_input_ids,
881 input_updates,
882 )
883 .await
884 }
885
886 async fn atomic_apply(
887 &self,
888 runtime_id: &crate::identifiers::LogicalRuntimeId,
889 session_delta: Option<SessionDelta>,
890 receipt: meerkat_core::lifecycle::RunBoundaryReceipt,
891 input_updates: Vec<InputState>,
892 session_store_key: Option<meerkat_core::types::SessionId>,
893 ) -> Result<(), RuntimeStoreError> {
894 if self.fail_atomic_apply {
895 return Err(RuntimeStoreError::WriteFailed(
896 "synthetic atomic_apply failure".to_string(),
897 ));
898 }
899 self.inner
900 .atomic_apply(
901 runtime_id,
902 session_delta,
903 receipt,
904 input_updates,
905 session_store_key,
906 )
907 .await
908 }
909
910 async fn load_input_states(
911 &self,
912 runtime_id: &crate::identifiers::LogicalRuntimeId,
913 ) -> Result<Vec<InputState>, RuntimeStoreError> {
914 if !self.load_input_states_delay.is_zero() {
915 tokio::time::sleep(self.load_input_states_delay).await;
916 }
917 self.inner.load_input_states(runtime_id).await
918 }
919
920 async fn load_boundary_receipt(
921 &self,
922 runtime_id: &crate::identifiers::LogicalRuntimeId,
923 run_id: &RunId,
924 sequence: u64,
925 ) -> Result<Option<meerkat_core::lifecycle::RunBoundaryReceipt>, RuntimeStoreError>
926 {
927 self.inner
928 .load_boundary_receipt(runtime_id, run_id, sequence)
929 .await
930 }
931
932 async fn load_session_snapshot(
933 &self,
934 runtime_id: &crate::identifiers::LogicalRuntimeId,
935 ) -> Result<Option<Vec<u8>>, RuntimeStoreError> {
936 self.inner.load_session_snapshot(runtime_id).await
937 }
938
939 async fn persist_input_state(
940 &self,
941 runtime_id: &crate::identifiers::LogicalRuntimeId,
942 state: &InputState,
943 ) -> Result<(), RuntimeStoreError> {
944 let call_index = self
945 .persist_input_state_calls
946 .fetch_add(1, Ordering::SeqCst);
947 if self
948 .fail_persist_input_state_after
949 .is_some_and(|fail_after| call_index >= fail_after)
950 {
951 return Err(RuntimeStoreError::WriteFailed(
952 "synthetic persist_input_state failure".to_string(),
953 ));
954 }
955 self.inner.persist_input_state(runtime_id, state).await
956 }
957
958 async fn load_input_state(
959 &self,
960 runtime_id: &crate::identifiers::LogicalRuntimeId,
961 input_id: &InputId,
962 ) -> Result<Option<InputState>, RuntimeStoreError> {
963 self.inner.load_input_state(runtime_id, input_id).await
964 }
965
966 async fn persist_runtime_state(
967 &self,
968 runtime_id: &crate::identifiers::LogicalRuntimeId,
969 state: RuntimeState,
970 ) -> Result<(), RuntimeStoreError> {
971 self.inner.persist_runtime_state(runtime_id, state).await
972 }
973
974 async fn load_runtime_state(
975 &self,
976 runtime_id: &crate::identifiers::LogicalRuntimeId,
977 ) -> Result<Option<RuntimeState>, RuntimeStoreError> {
978 self.inner.load_runtime_state(runtime_id).await
979 }
980
981 async fn atomic_lifecycle_commit(
982 &self,
983 runtime_id: &crate::identifiers::LogicalRuntimeId,
984 runtime_state: RuntimeState,
985 input_states: &[InputState],
986 ) -> Result<(), RuntimeStoreError> {
987 let call_index = self
988 .atomic_lifecycle_commit_calls
989 .fetch_add(1, Ordering::SeqCst);
990 if self
991 .fail_atomic_lifecycle_commit_after
992 .is_some_and(|fail_after| call_index >= fail_after)
993 {
994 return Err(RuntimeStoreError::WriteFailed(
995 "synthetic atomic_lifecycle_commit failure".to_string(),
996 ));
997 }
998 self.inner
999 .atomic_lifecycle_commit(runtime_id, runtime_state, input_states)
1000 .await
1001 }
1002 }
1003
1004 #[tokio::test]
1005 async fn ephemeral_adapter_accept_and_query() {
1006 let adapter = RuntimeSessionAdapter::ephemeral();
1007 let sid = SessionId::new();
1008 adapter.register_session(sid.clone()).await;
1009
1010 let input = make_prompt("hello");
1011 let outcome = adapter.accept_input(&sid, input).await.unwrap();
1012 assert!(outcome.is_accepted());
1013
1014 let state = adapter.runtime_state(&sid).await.unwrap();
1015 assert_eq!(state, RuntimeState::Idle);
1016
1017 let active = adapter.list_active_inputs(&sid).await.unwrap();
1018 assert_eq!(active.len(), 1);
1019 }
1020
1021 #[tokio::test]
1022 async fn persistent_adapter_accept() {
1023 let store = Arc::new(crate::store::InMemoryRuntimeStore::new());
1024 let adapter = RuntimeSessionAdapter::persistent(store);
1025 let sid = SessionId::new();
1026 adapter.register_session(sid.clone()).await;
1027
1028 let input = make_prompt("hello");
1029 let outcome = adapter.accept_input(&sid, input).await.unwrap();
1030 assert!(outcome.is_accepted());
1031 }
1032
1033 #[tokio::test]
1034 async fn unregistered_session_errors() {
1035 let adapter = RuntimeSessionAdapter::ephemeral();
1036 let sid = SessionId::new();
1037 let result = adapter.accept_input(&sid, make_prompt("hi")).await;
1038 assert!(result.is_err());
1039 }
1040
1041 #[tokio::test]
1042 async fn unregister_removes_driver() {
1043 let adapter = RuntimeSessionAdapter::ephemeral();
1044 let sid = SessionId::new();
1045 adapter.register_session(sid.clone()).await;
1046 adapter.unregister_session(&sid).await;
1047
1048 let result = adapter.runtime_state(&sid).await;
1049 assert!(result.is_err());
1050 }
1051
1052 #[tokio::test]
1054 async fn accept_with_executor_triggers_loop() {
1055 use meerkat_core::lifecycle::RunId;
1056 use meerkat_core::lifecycle::core_executor::{
1057 CoreApplyOutput, CoreExecutor, CoreExecutorError,
1058 };
1059 use meerkat_core::lifecycle::run_control::RunControlCommand;
1060 use meerkat_core::lifecycle::run_primitive::{RunApplyBoundary, RunPrimitive};
1061 use meerkat_core::lifecycle::run_receipt::RunBoundaryReceipt;
1062 use std::sync::atomic::{AtomicBool, Ordering};
1063
1064 let apply_called = Arc::new(AtomicBool::new(false));
1066 let apply_called_clone = apply_called.clone();
1067
1068 struct TestExecutor {
1069 called: Arc<AtomicBool>,
1070 }
1071
1072 #[async_trait::async_trait]
1073 impl CoreExecutor for TestExecutor {
1074 async fn apply(
1075 &mut self,
1076 run_id: RunId,
1077 primitive: RunPrimitive,
1078 ) -> Result<CoreApplyOutput, CoreExecutorError> {
1079 self.called.store(true, Ordering::SeqCst);
1080 Ok(CoreApplyOutput {
1081 receipt: RunBoundaryReceipt {
1082 run_id,
1083 boundary: RunApplyBoundary::RunStart,
1084 contributing_input_ids: primitive.contributing_input_ids().to_vec(),
1085 conversation_digest: None,
1086 message_count: 0,
1087 sequence: 0,
1088 },
1089 session_snapshot: None,
1090 run_result: None,
1091 })
1092 }
1093
1094 async fn control(&mut self, _cmd: RunControlCommand) -> Result<(), CoreExecutorError> {
1095 Ok(())
1096 }
1097 }
1098
1099 let adapter = RuntimeSessionAdapter::ephemeral();
1100 let sid = SessionId::new();
1101 let executor = Box::new(TestExecutor {
1102 called: apply_called_clone,
1103 });
1104 adapter
1105 .register_session_with_executor(sid.clone(), executor)
1106 .await;
1107
1108 let input = make_prompt("hello from executor test");
1110 let outcome = adapter.accept_input(&sid, input).await.unwrap();
1111 assert!(outcome.is_accepted());
1112
1113 tokio::time::sleep(std::time::Duration::from_millis(100)).await;
1115
1116 assert!(
1117 apply_called.load(Ordering::SeqCst),
1118 "CoreExecutor::apply() should have been called by the RuntimeLoop"
1119 );
1120
1121 let state = adapter.runtime_state(&sid).await.unwrap();
1123 assert_eq!(state, RuntimeState::Idle);
1124
1125 let active = adapter.list_active_inputs(&sid).await.unwrap();
1127 assert!(active.is_empty(), "All inputs should be consumed");
1128 }
1129
1130 #[tokio::test]
1132 async fn failed_executor_requeues_input() {
1133 use crate::input_state::InputLifecycleState;
1134 use meerkat_core::lifecycle::core_executor::{
1135 CoreApplyOutput, CoreExecutor, CoreExecutorError,
1136 };
1137 use meerkat_core::lifecycle::run_control::RunControlCommand;
1138 use meerkat_core::lifecycle::run_primitive::RunPrimitive;
1139 struct FailingExecutor;
1140
1141 #[async_trait::async_trait]
1142 impl CoreExecutor for FailingExecutor {
1143 async fn apply(
1144 &mut self,
1145 _run_id: RunId,
1146 _primitive: RunPrimitive,
1147 ) -> Result<CoreApplyOutput, CoreExecutorError> {
1148 Err(CoreExecutorError::ApplyFailed {
1149 reason: "LLM error".into(),
1150 })
1151 }
1152
1153 async fn control(&mut self, _cmd: RunControlCommand) -> Result<(), CoreExecutorError> {
1154 Ok(())
1155 }
1156 }
1157
1158 let adapter = RuntimeSessionAdapter::ephemeral();
1159 let sid = SessionId::new();
1160 adapter
1161 .register_session_with_executor(sid.clone(), Box::new(FailingExecutor))
1162 .await;
1163
1164 let input = make_prompt("hello failing");
1165 let input_id = input.id().clone();
1166 adapter.accept_input(&sid, input).await.unwrap();
1167
1168 tokio::time::sleep(std::time::Duration::from_millis(100)).await;
1170
1171 let state = adapter.runtime_state(&sid).await.unwrap();
1173 assert_eq!(state, RuntimeState::Idle);
1174
1175 let is = adapter.input_state(&sid, &input_id).await.unwrap().unwrap();
1177 assert_eq!(
1178 is.current_state,
1179 InputLifecycleState::Queued,
1180 "Failed execution should roll input back to Queued, not strand in AppliedPendingConsumption"
1181 );
1182 }
1183
1184 #[tokio::test]
1185 async fn failed_executor_continues_processing_backlog() {
1186 use crate::input_state::InputLifecycleState;
1187 use meerkat_core::lifecycle::core_executor::{
1188 CoreApplyOutput, CoreExecutor, CoreExecutorError,
1189 };
1190 use meerkat_core::lifecycle::run_control::RunControlCommand;
1191 use meerkat_core::lifecycle::run_primitive::{RunApplyBoundary, RunPrimitive};
1192 use meerkat_core::lifecycle::run_receipt::RunBoundaryReceipt;
1193
1194 struct FailThenSucceedExecutor {
1195 calls: Arc<AtomicUsize>,
1196 }
1197
1198 #[async_trait::async_trait]
1199 impl CoreExecutor for FailThenSucceedExecutor {
1200 async fn apply(
1201 &mut self,
1202 run_id: RunId,
1203 primitive: RunPrimitive,
1204 ) -> Result<CoreApplyOutput, CoreExecutorError> {
1205 let call = self.calls.fetch_add(1, Ordering::SeqCst);
1206 tokio::time::sleep(Duration::from_millis(50)).await;
1207 if call == 0 {
1208 return Err(CoreExecutorError::ApplyFailed {
1209 reason: "first run fails".into(),
1210 });
1211 }
1212 Ok(CoreApplyOutput {
1213 receipt: RunBoundaryReceipt {
1214 run_id,
1215 boundary: RunApplyBoundary::RunStart,
1216 contributing_input_ids: primitive.contributing_input_ids().to_vec(),
1217 conversation_digest: None,
1218 message_count: 0,
1219 sequence: 0,
1220 },
1221 session_snapshot: None,
1222 run_result: None,
1223 })
1224 }
1225
1226 async fn control(&mut self, _cmd: RunControlCommand) -> Result<(), CoreExecutorError> {
1227 Ok(())
1228 }
1229 }
1230
1231 let adapter = RuntimeSessionAdapter::ephemeral();
1232 let sid = SessionId::new();
1233 let calls = Arc::new(AtomicUsize::new(0));
1234 adapter
1235 .register_session_with_executor(
1236 sid.clone(),
1237 Box::new(FailThenSucceedExecutor {
1238 calls: Arc::clone(&calls),
1239 }),
1240 )
1241 .await;
1242
1243 let first = make_prompt("first");
1244 let first_id = first.id().clone();
1245 let second = make_prompt("second");
1246 let second_id = second.id().clone();
1247 adapter.accept_input(&sid, first).await.unwrap();
1248 tokio::time::sleep(Duration::from_millis(10)).await;
1249 adapter.accept_input(&sid, second).await.unwrap();
1250
1251 tokio::time::sleep(Duration::from_millis(220)).await;
1252
1253 let second_state = adapter
1254 .input_state(&sid, &second_id)
1255 .await
1256 .unwrap()
1257 .unwrap();
1258 assert_eq!(second_state.current_state, InputLifecycleState::Consumed);
1259 assert_eq!(
1260 adapter.runtime_state(&sid).await.unwrap(),
1261 RuntimeState::Idle
1262 );
1263 assert!(
1264 calls.load(Ordering::SeqCst) >= 2,
1265 "the runtime loop should keep draining queued backlog after a failed run"
1266 );
1267 let first_state = adapter.input_state(&sid, &first_id).await.unwrap().unwrap();
1268 assert!(
1269 matches!(
1270 first_state.current_state,
1271 InputLifecycleState::Queued | InputLifecycleState::Consumed
1272 ),
1273 "the initially failed input should have been safely rolled back or retried after the backlog drained"
1274 );
1275 }
1276
1277 #[tokio::test]
1278 async fn ensure_session_with_executor_upgrades_registered_session() {
1279 use crate::input_state::InputLifecycleState;
1280 use meerkat_core::lifecycle::RunId;
1281 use meerkat_core::lifecycle::core_executor::{
1282 CoreApplyOutput, CoreExecutor, CoreExecutorError,
1283 };
1284 use meerkat_core::lifecycle::run_control::RunControlCommand;
1285 use meerkat_core::lifecycle::run_primitive::{RunApplyBoundary, RunPrimitive};
1286 use meerkat_core::lifecycle::run_receipt::RunBoundaryReceipt;
1287 use std::sync::atomic::{AtomicBool, Ordering};
1288
1289 struct SuccessExecutor {
1290 called: Arc<AtomicBool>,
1291 }
1292
1293 #[async_trait::async_trait]
1294 impl CoreExecutor for SuccessExecutor {
1295 async fn apply(
1296 &mut self,
1297 run_id: RunId,
1298 primitive: RunPrimitive,
1299 ) -> Result<CoreApplyOutput, CoreExecutorError> {
1300 self.called.store(true, Ordering::SeqCst);
1301 Ok(CoreApplyOutput {
1302 receipt: RunBoundaryReceipt {
1303 run_id,
1304 boundary: RunApplyBoundary::RunStart,
1305 contributing_input_ids: primitive.contributing_input_ids().to_vec(),
1306 conversation_digest: None,
1307 message_count: 0,
1308 sequence: 0,
1309 },
1310 session_snapshot: None,
1311 run_result: None,
1312 })
1313 }
1314
1315 async fn control(&mut self, _cmd: RunControlCommand) -> Result<(), CoreExecutorError> {
1316 Ok(())
1317 }
1318 }
1319
1320 let apply_called = Arc::new(AtomicBool::new(false));
1321 let adapter = RuntimeSessionAdapter::ephemeral();
1322 let sid = SessionId::new();
1323 adapter.register_session(sid.clone()).await;
1324
1325 let input = make_prompt("upgrade me");
1326 let input_id = input.id().clone();
1327 let outcome = adapter.accept_input(&sid, input).await.unwrap();
1328 assert!(outcome.is_accepted());
1329
1330 adapter
1331 .ensure_session_with_executor(
1332 sid.clone(),
1333 Box::new(SuccessExecutor {
1334 called: Arc::clone(&apply_called),
1335 }),
1336 )
1337 .await;
1338
1339 tokio::time::sleep(std::time::Duration::from_millis(100)).await;
1340
1341 assert!(
1342 apply_called.load(Ordering::SeqCst),
1343 "upgrading an already-registered session should attach a live loop"
1344 );
1345
1346 let state = adapter.runtime_state(&sid).await.unwrap();
1347 assert_eq!(state, RuntimeState::Idle);
1348
1349 let active = adapter.list_active_inputs(&sid).await.unwrap();
1350 assert!(active.is_empty(), "queued work should drain after upgrade");
1351
1352 let is = adapter.input_state(&sid, &input_id).await.unwrap().unwrap();
1353 assert_eq!(
1354 is.current_state,
1355 InputLifecycleState::Consumed,
1356 "the pre-upgrade queued input should be processed once the loop is attached"
1357 );
1358 }
1359
1360 #[tokio::test]
1361 async fn ensure_session_with_executor_upgrades_racy_registration() {
1362 use crate::input_state::InputLifecycleState;
1363 use meerkat_core::lifecycle::RunId;
1364 use meerkat_core::lifecycle::core_executor::{
1365 CoreApplyOutput, CoreExecutor, CoreExecutorError,
1366 };
1367 use meerkat_core::lifecycle::run_control::RunControlCommand;
1368 use meerkat_core::lifecycle::run_primitive::{RunApplyBoundary, RunPrimitive};
1369 use meerkat_core::lifecycle::run_receipt::RunBoundaryReceipt;
1370
1371 struct SuccessExecutor {
1372 called: Arc<AtomicBool>,
1373 }
1374
1375 #[async_trait::async_trait]
1376 impl CoreExecutor for SuccessExecutor {
1377 async fn apply(
1378 &mut self,
1379 run_id: RunId,
1380 primitive: RunPrimitive,
1381 ) -> Result<CoreApplyOutput, CoreExecutorError> {
1382 self.called.store(true, Ordering::SeqCst);
1383 Ok(CoreApplyOutput {
1384 receipt: RunBoundaryReceipt {
1385 run_id,
1386 boundary: RunApplyBoundary::RunStart,
1387 contributing_input_ids: primitive.contributing_input_ids().to_vec(),
1388 conversation_digest: None,
1389 message_count: 0,
1390 sequence: 0,
1391 },
1392 session_snapshot: None,
1393 run_result: None,
1394 })
1395 }
1396
1397 async fn control(&mut self, _cmd: RunControlCommand) -> Result<(), CoreExecutorError> {
1398 Ok(())
1399 }
1400 }
1401
1402 let store = Arc::new(HarnessRuntimeStore::delayed_recover(Duration::from_millis(
1403 75,
1404 )));
1405 let adapter = Arc::new(RuntimeSessionAdapter::persistent(store));
1406 let sid = SessionId::new();
1407 let apply_called = Arc::new(AtomicBool::new(false));
1408
1409 let ensure_task = {
1410 let adapter = Arc::clone(&adapter);
1411 let sid = sid.clone();
1412 let apply_called = Arc::clone(&apply_called);
1413 tokio::spawn(async move {
1414 adapter
1415 .ensure_session_with_executor(
1416 sid,
1417 Box::new(SuccessExecutor {
1418 called: apply_called,
1419 }),
1420 )
1421 .await;
1422 })
1423 };
1424
1425 tokio::time::sleep(Duration::from_millis(10)).await;
1426 adapter.register_session(sid.clone()).await;
1427 ensure_task.await.unwrap();
1428
1429 let input = make_prompt("race upgrade");
1430 let input_id = input.id().clone();
1431 adapter.accept_input(&sid, input).await.unwrap();
1432 tokio::time::sleep(Duration::from_millis(120)).await;
1433
1434 assert!(
1435 apply_called.load(Ordering::SeqCst),
1436 "the racy registration path should still attach a live runtime loop"
1437 );
1438 let state = adapter.input_state(&sid, &input_id).await.unwrap().unwrap();
1439 assert_eq!(state.current_state, InputLifecycleState::Consumed);
1440 }
1441
1442 #[tokio::test]
1443 async fn boundary_commit_failure_unwinds_sync_runtime_state() {
1444 use crate::input_state::InputLifecycleState;
1445 use meerkat_core::lifecycle::core_executor::CoreApplyOutput;
1446 use meerkat_core::lifecycle::run_primitive::RunApplyBoundary;
1447 use meerkat_core::lifecycle::run_receipt::RunBoundaryReceipt;
1448
1449 let store = Arc::new(HarnessRuntimeStore::failing_atomic_apply());
1450 let adapter = RuntimeSessionAdapter::persistent(store);
1451 let sid = SessionId::new();
1452 adapter.register_session(sid.clone()).await;
1453
1454 let input = make_prompt("sync boundary failure");
1455 let input_id = input.id().clone();
1456 let result = adapter
1457 .accept_input_and_run(&sid, input, move |run_id, primitive| async move {
1458 Ok((
1459 (),
1460 CoreApplyOutput {
1461 receipt: RunBoundaryReceipt {
1462 run_id,
1463 boundary: RunApplyBoundary::RunStart,
1464 contributing_input_ids: primitive.contributing_input_ids().to_vec(),
1465 conversation_digest: None,
1466 message_count: 0,
1467 sequence: 0,
1468 },
1469 session_snapshot: None,
1470 run_result: None,
1471 },
1472 ))
1473 })
1474 .await;
1475 assert!(result.is_err(), "boundary commit failure should surface");
1476 let Err(err) = result else {
1477 unreachable!("asserted runtime boundary commit failure above");
1478 };
1479 assert!(
1480 err.to_string().contains("runtime boundary commit failed"),
1481 "unexpected error: {err}"
1482 );
1483 assert_eq!(
1484 adapter.runtime_state(&sid).await.unwrap(),
1485 RuntimeState::Idle
1486 );
1487 let state = adapter.input_state(&sid, &input_id).await.unwrap().unwrap();
1488 assert_eq!(state.current_state, InputLifecycleState::Queued);
1489 }
1490
1491 #[tokio::test]
1492 async fn boundary_commit_failure_unwinds_runtime_loop_state() {
1493 use crate::input_state::InputLifecycleState;
1494 use meerkat_core::lifecycle::core_executor::{
1495 CoreApplyOutput, CoreExecutor, CoreExecutorError,
1496 };
1497 use meerkat_core::lifecycle::run_control::RunControlCommand;
1498 use meerkat_core::lifecycle::run_primitive::{RunApplyBoundary, RunPrimitive};
1499 use meerkat_core::lifecycle::run_receipt::RunBoundaryReceipt;
1500
1501 struct SuccessExecutor {
1502 stop_called: Arc<AtomicBool>,
1503 }
1504
1505 #[async_trait::async_trait]
1506 impl CoreExecutor for SuccessExecutor {
1507 async fn apply(
1508 &mut self,
1509 run_id: RunId,
1510 primitive: RunPrimitive,
1511 ) -> Result<CoreApplyOutput, CoreExecutorError> {
1512 Ok(CoreApplyOutput {
1513 receipt: RunBoundaryReceipt {
1514 run_id,
1515 boundary: RunApplyBoundary::RunStart,
1516 contributing_input_ids: primitive.contributing_input_ids().to_vec(),
1517 conversation_digest: None,
1518 message_count: 0,
1519 sequence: 0,
1520 },
1521 session_snapshot: None,
1522 run_result: None,
1523 })
1524 }
1525
1526 async fn control(&mut self, cmd: RunControlCommand) -> Result<(), CoreExecutorError> {
1527 if matches!(cmd, RunControlCommand::StopRuntimeExecutor { .. }) {
1528 self.stop_called.store(true, Ordering::SeqCst);
1529 }
1530 Ok(())
1531 }
1532 }
1533
1534 let store = Arc::new(HarnessRuntimeStore::failing_atomic_apply());
1535 let adapter = RuntimeSessionAdapter::persistent(store);
1536 let sid = SessionId::new();
1537 let stop_called = Arc::new(AtomicBool::new(false));
1538 adapter
1539 .register_session_with_executor(
1540 sid.clone(),
1541 Box::new(SuccessExecutor {
1542 stop_called: Arc::clone(&stop_called),
1543 }),
1544 )
1545 .await;
1546
1547 let input = make_prompt("loop boundary failure");
1548 let input_id = input.id().clone();
1549 adapter.accept_input(&sid, input).await.unwrap();
1550 tokio::time::sleep(Duration::from_millis(120)).await;
1551
1552 assert!(
1553 stop_called.load(Ordering::SeqCst),
1554 "boundary commit failures should stop the dead executor path"
1555 );
1556 assert_eq!(
1557 adapter.runtime_state(&sid).await.unwrap(),
1558 RuntimeState::Idle
1559 );
1560 let state = adapter.input_state(&sid, &input_id).await.unwrap().unwrap();
1561 assert_eq!(state.current_state, InputLifecycleState::Queued);
1562 }
1563
1564 #[tokio::test]
1565 async fn terminal_snapshot_failure_unregisters_runtime_loop_session() {
1566 use meerkat_core::lifecycle::core_executor::{
1567 CoreApplyOutput, CoreExecutor, CoreExecutorError,
1568 };
1569 use meerkat_core::lifecycle::run_control::RunControlCommand;
1570 use meerkat_core::lifecycle::run_primitive::{RunApplyBoundary, RunPrimitive};
1571 use meerkat_core::lifecycle::run_receipt::RunBoundaryReceipt;
1572
1573 struct SuccessExecutor {
1574 adapter: Arc<RuntimeSessionAdapter>,
1575 session_id: SessionId,
1576 stop_called: Arc<AtomicBool>,
1577 }
1578
1579 #[async_trait::async_trait]
1580 impl CoreExecutor for SuccessExecutor {
1581 async fn apply(
1582 &mut self,
1583 run_id: RunId,
1584 primitive: RunPrimitive,
1585 ) -> Result<CoreApplyOutput, CoreExecutorError> {
1586 Ok(CoreApplyOutput {
1587 receipt: RunBoundaryReceipt {
1588 run_id,
1589 boundary: RunApplyBoundary::RunStart,
1590 contributing_input_ids: primitive.contributing_input_ids().to_vec(),
1591 conversation_digest: None,
1592 message_count: 0,
1593 sequence: 0,
1594 },
1595 session_snapshot: None,
1596 run_result: None,
1597 })
1598 }
1599
1600 async fn control(&mut self, cmd: RunControlCommand) -> Result<(), CoreExecutorError> {
1601 if matches!(cmd, RunControlCommand::StopRuntimeExecutor { .. }) {
1602 self.stop_called.store(true, Ordering::SeqCst);
1603 self.adapter.unregister_session(&self.session_id).await;
1604 }
1605 Ok(())
1606 }
1607 }
1608
1609 let store = Arc::new(HarnessRuntimeStore::failing_terminal_snapshot());
1610 let adapter = Arc::new(RuntimeSessionAdapter::persistent(store));
1611 let sid = SessionId::new();
1612 let stop_called = Arc::new(AtomicBool::new(false));
1613 adapter
1614 .register_session_with_executor(
1615 sid.clone(),
1616 Box::new(SuccessExecutor {
1617 adapter: Arc::clone(&adapter),
1618 session_id: sid.clone(),
1619 stop_called: Arc::clone(&stop_called),
1620 }),
1621 )
1622 .await;
1623
1624 adapter
1625 .accept_input(&sid, make_prompt("terminal snapshot failure"))
1626 .await
1627 .unwrap();
1628 tokio::time::sleep(Duration::from_millis(120)).await;
1629
1630 assert!(
1631 stop_called.load(Ordering::SeqCst),
1632 "terminal snapshot persistence failures should stop the runtime loop"
1633 );
1634 let state_result = adapter.runtime_state(&sid).await;
1635 assert!(
1636 state_result.is_err(),
1637 "stopped runtime sessions should be unregistered"
1638 );
1639 let Err(err) = state_result else {
1640 unreachable!("asserted stopped runtime unregistration above");
1641 };
1642 assert!(matches!(
1643 err,
1644 RuntimeDriverError::NotReady {
1645 state: RuntimeState::Destroyed
1646 }
1647 ));
1648 }
1649
1650 #[tokio::test]
1651 async fn terminal_snapshot_failure_unregisters_sync_runtime_session() {
1652 use meerkat_core::lifecycle::core_executor::CoreApplyOutput;
1653 use meerkat_core::lifecycle::run_primitive::RunApplyBoundary;
1654 use meerkat_core::lifecycle::run_receipt::RunBoundaryReceipt;
1655
1656 let store = Arc::new(HarnessRuntimeStore::failing_terminal_snapshot());
1657 let adapter = RuntimeSessionAdapter::persistent(store);
1658 let sid = SessionId::new();
1659 adapter.register_session(sid.clone()).await;
1660
1661 let result = adapter
1662 .accept_input_and_run(
1663 &sid,
1664 make_prompt("sync terminal snapshot failure"),
1665 move |run_id, primitive| async move {
1666 Ok((
1667 (),
1668 CoreApplyOutput {
1669 receipt: RunBoundaryReceipt {
1670 run_id,
1671 boundary: RunApplyBoundary::RunStart,
1672 contributing_input_ids: primitive.contributing_input_ids().to_vec(),
1673 conversation_digest: None,
1674 message_count: 0,
1675 sequence: 0,
1676 },
1677 session_snapshot: None,
1678 run_result: None,
1679 },
1680 ))
1681 },
1682 )
1683 .await;
1684 assert!(
1685 result.is_err(),
1686 "terminal snapshot persistence failure should surface"
1687 );
1688 let Err(err) = result else {
1689 unreachable!("asserted terminal snapshot failure above");
1690 };
1691
1692 assert!(
1693 err.to_string().contains("terminal event persist failed")
1694 || err
1695 .to_string()
1696 .contains("failed to persist runtime completion snapshot"),
1697 "unexpected error: {err}"
1698 );
1699 let runtime_state = adapter.runtime_state(&sid).await;
1700 assert!(
1701 matches!(
1702 runtime_state,
1703 Err(RuntimeDriverError::NotReady {
1704 state: RuntimeState::Destroyed
1705 })
1706 ),
1707 "sync path should unregister the broken runtime session"
1708 );
1709 }
1710
1711 #[tokio::test]
1715 async fn dedup_terminal_input_returns_none_handle() {
1716 use crate::identifiers::IdempotencyKey;
1717 use meerkat_core::lifecycle::core_executor::{
1718 CoreApplyOutput, CoreExecutor, CoreExecutorError,
1719 };
1720 use meerkat_core::lifecycle::run_control::RunControlCommand;
1721 use meerkat_core::lifecycle::run_primitive::{RunApplyBoundary, RunPrimitive};
1722 use meerkat_core::lifecycle::run_receipt::RunBoundaryReceipt;
1723 use meerkat_core::types::{RunResult, Usage};
1724
1725 struct ResultExecutor;
1726 #[async_trait::async_trait]
1727 impl CoreExecutor for ResultExecutor {
1728 async fn apply(
1729 &mut self,
1730 run_id: RunId,
1731 primitive: RunPrimitive,
1732 ) -> Result<CoreApplyOutput, CoreExecutorError> {
1733 Ok(CoreApplyOutput {
1734 receipt: RunBoundaryReceipt {
1735 run_id,
1736 boundary: RunApplyBoundary::RunStart,
1737 contributing_input_ids: primitive.contributing_input_ids().to_vec(),
1738 conversation_digest: None,
1739 message_count: 0,
1740 sequence: 0,
1741 },
1742 session_snapshot: None,
1743 run_result: Some(RunResult {
1744 text: "done".into(),
1745 session_id: SessionId::new(),
1746 usage: Usage::default(),
1747 turns: 1,
1748 tool_calls: 0,
1749 structured_output: None,
1750 schema_warnings: None,
1751 skill_diagnostics: None,
1752 }),
1753 })
1754 }
1755 async fn control(&mut self, _cmd: RunControlCommand) -> Result<(), CoreExecutorError> {
1756 Ok(())
1757 }
1758 }
1759
1760 let adapter = RuntimeSessionAdapter::ephemeral();
1761 let sid = SessionId::new();
1762 adapter
1763 .register_session_with_executor(sid.clone(), Box::new(ResultExecutor))
1764 .await;
1765
1766 let key = IdempotencyKey::new("gate-a2");
1768 let mut input1 = make_prompt("first");
1769 if let Input::Prompt(ref mut p) = input1 {
1770 p.header.idempotency_key = Some(key.clone());
1771 }
1772 let (outcome1, handle1) = adapter
1773 .accept_input_with_completion(&sid, input1)
1774 .await
1775 .unwrap();
1776 assert!(outcome1.is_accepted());
1777 assert!(handle1.is_some(), "accepted input should have a handle");
1778
1779 let result = handle1.unwrap().wait().await;
1781 assert!(
1782 matches!(result, crate::completion::CompletionOutcome::Completed(_)),
1783 "first input should complete successfully"
1784 );
1785
1786 let mut input2 = make_prompt("duplicate");
1788 if let Input::Prompt(ref mut p) = input2 {
1789 p.header.idempotency_key = Some(key);
1790 }
1791 let (outcome2, handle2) = adapter
1792 .accept_input_with_completion(&sid, input2)
1793 .await
1794 .unwrap();
1795 assert!(
1796 outcome2.is_deduplicated(),
1797 "second input with same key should be deduplicated"
1798 );
1799 assert!(
1800 handle2.is_none(),
1801 "dedup on terminal input should return None handle"
1802 );
1803 }
1804
1805 #[tokio::test]
1808 async fn dedup_inflight_input_returns_handle_that_resolves() {
1809 use crate::identifiers::IdempotencyKey;
1810 use meerkat_core::lifecycle::core_executor::{
1811 CoreApplyOutput, CoreExecutor, CoreExecutorError,
1812 };
1813 use meerkat_core::lifecycle::run_control::RunControlCommand;
1814 use meerkat_core::lifecycle::run_primitive::{RunApplyBoundary, RunPrimitive};
1815 use meerkat_core::lifecycle::run_receipt::RunBoundaryReceipt;
1816 use meerkat_core::types::{RunResult, Usage};
1817
1818 struct SlowExecutor;
1819 #[async_trait::async_trait]
1820 impl CoreExecutor for SlowExecutor {
1821 async fn apply(
1822 &mut self,
1823 run_id: RunId,
1824 primitive: RunPrimitive,
1825 ) -> Result<CoreApplyOutput, CoreExecutorError> {
1826 tokio::time::sleep(Duration::from_millis(200)).await;
1828 Ok(CoreApplyOutput {
1829 receipt: RunBoundaryReceipt {
1830 run_id,
1831 boundary: RunApplyBoundary::RunStart,
1832 contributing_input_ids: primitive.contributing_input_ids().to_vec(),
1833 conversation_digest: None,
1834 message_count: 0,
1835 sequence: 0,
1836 },
1837 session_snapshot: None,
1838 run_result: Some(RunResult {
1839 text: "slow done".into(),
1840 session_id: SessionId::new(),
1841 usage: Usage::default(),
1842 turns: 1,
1843 tool_calls: 0,
1844 structured_output: None,
1845 schema_warnings: None,
1846 skill_diagnostics: None,
1847 }),
1848 })
1849 }
1850 async fn control(&mut self, _cmd: RunControlCommand) -> Result<(), CoreExecutorError> {
1851 Ok(())
1852 }
1853 }
1854
1855 let adapter = RuntimeSessionAdapter::ephemeral();
1856 let sid = SessionId::new();
1857 adapter
1858 .register_session_with_executor(sid.clone(), Box::new(SlowExecutor))
1859 .await;
1860
1861 let key = IdempotencyKey::new("gate-a3");
1863 let mut input1 = make_prompt("original");
1864 if let Input::Prompt(ref mut p) = input1 {
1865 p.header.idempotency_key = Some(key.clone());
1866 }
1867 let (outcome1, handle1) = adapter
1868 .accept_input_with_completion(&sid, input1)
1869 .await
1870 .unwrap();
1871 assert!(outcome1.is_accepted());
1872
1873 tokio::time::sleep(Duration::from_millis(50)).await;
1875
1876 let mut input2 = make_prompt("duplicate");
1878 if let Input::Prompt(ref mut p) = input2 {
1879 p.header.idempotency_key = Some(key);
1880 }
1881 let (outcome2, handle2) = adapter
1882 .accept_input_with_completion(&sid, input2)
1883 .await
1884 .unwrap();
1885 assert!(
1886 outcome2.is_deduplicated(),
1887 "second input should be deduplicated"
1888 );
1889 assert!(
1890 handle2.is_some(),
1891 "dedup on in-flight input should return Some(handle)"
1892 );
1893
1894 let result1 = handle1.unwrap().wait().await;
1896 let result2 = handle2.unwrap().wait().await;
1897 assert!(
1898 matches!(result1, crate::completion::CompletionOutcome::Completed(ref r) if r.text == "slow done"),
1899 "original handle should complete with result"
1900 );
1901 assert!(
1902 matches!(result2, crate::completion::CompletionOutcome::Completed(ref r) if r.text == "slow done"),
1903 "duplicate handle should also complete with same result"
1904 );
1905 }
1906
1907 #[tokio::test]
1910 async fn completion_handle_resolves_without_result() {
1911 use meerkat_core::lifecycle::core_executor::{
1912 CoreApplyOutput, CoreExecutor, CoreExecutorError,
1913 };
1914 use meerkat_core::lifecycle::run_control::RunControlCommand;
1915 use meerkat_core::lifecycle::run_primitive::{RunApplyBoundary, RunPrimitive};
1916 use meerkat_core::lifecycle::run_receipt::RunBoundaryReceipt;
1917
1918 struct NoResultExecutor;
1919 #[async_trait::async_trait]
1920 impl CoreExecutor for NoResultExecutor {
1921 async fn apply(
1922 &mut self,
1923 run_id: RunId,
1924 primitive: RunPrimitive,
1925 ) -> Result<CoreApplyOutput, CoreExecutorError> {
1926 Ok(CoreApplyOutput {
1927 receipt: RunBoundaryReceipt {
1928 run_id,
1929 boundary: RunApplyBoundary::RunStart,
1930 contributing_input_ids: primitive.contributing_input_ids().to_vec(),
1931 conversation_digest: None,
1932 message_count: 0,
1933 sequence: 0,
1934 },
1935 session_snapshot: None,
1936 run_result: None, })
1938 }
1939 async fn control(&mut self, _cmd: RunControlCommand) -> Result<(), CoreExecutorError> {
1940 Ok(())
1941 }
1942 }
1943
1944 let adapter = RuntimeSessionAdapter::ephemeral();
1945 let sid = SessionId::new();
1946 adapter
1947 .register_session_with_executor(sid.clone(), Box::new(NoResultExecutor))
1948 .await;
1949
1950 let input = make_prompt("context append");
1951 let (outcome, handle) = adapter
1952 .accept_input_with_completion(&sid, input)
1953 .await
1954 .unwrap();
1955 assert!(outcome.is_accepted());
1956
1957 let result = handle.unwrap().wait().await;
1958 assert!(
1959 matches!(
1960 result,
1961 crate::completion::CompletionOutcome::CompletedWithoutResult
1962 ),
1963 "executor returning run_result: None should resolve as CompletedWithoutResult, got {result:?}"
1964 );
1965 }
1966
1967 #[tokio::test]
1969 async fn reset_runtime_resolves_pending_waiters() {
1970 let adapter = RuntimeSessionAdapter::ephemeral();
1972 let sid = SessionId::new();
1973 adapter.register_session(sid.clone()).await;
1974
1975 let input = make_prompt("pending");
1976 let (outcome, handle) = adapter
1977 .accept_input_with_completion(&sid, input)
1978 .await
1979 .unwrap();
1980 assert!(outcome.is_accepted());
1981 assert!(handle.is_some());
1982
1983 adapter.reset_runtime(&sid).await.unwrap();
1985
1986 let result = handle.unwrap().wait().await;
1988 assert!(
1989 matches!(
1990 result,
1991 crate::completion::CompletionOutcome::RuntimeTerminated(_)
1992 ),
1993 "reset should resolve pending waiters as terminated, got {result:?}"
1994 );
1995 }
1996
1997 #[tokio::test]
1999 async fn retire_without_loop_resolves_waiters() {
2000 let adapter = RuntimeSessionAdapter::ephemeral();
2002 let sid = SessionId::new();
2003 adapter.register_session(sid.clone()).await;
2004
2005 let input = make_prompt("will be retired");
2006 let (outcome, handle) = adapter
2007 .accept_input_with_completion(&sid, input)
2008 .await
2009 .unwrap();
2010 assert!(outcome.is_accepted());
2011 assert!(handle.is_some());
2012
2013 adapter.retire_runtime(&sid).await.unwrap();
2015
2016 let result = handle.unwrap().wait().await;
2018 assert!(
2019 matches!(
2020 result,
2021 crate::completion::CompletionOutcome::RuntimeTerminated(_)
2022 ),
2023 "retire without loop should resolve pending waiters as terminated, got {result:?}"
2024 );
2025 }
2026
2027 #[tokio::test]
2029 async fn successful_execution_fires_boundary_applied() {
2030 use crate::input_state::InputLifecycleState;
2031 use meerkat_core::lifecycle::RunId;
2032 use meerkat_core::lifecycle::core_executor::{
2033 CoreApplyOutput, CoreExecutor, CoreExecutorError,
2034 };
2035 use meerkat_core::lifecycle::run_control::RunControlCommand;
2036 use meerkat_core::lifecycle::run_primitive::{RunApplyBoundary, RunPrimitive};
2037 use meerkat_core::lifecycle::run_receipt::RunBoundaryReceipt;
2038
2039 struct SuccessExecutor;
2040
2041 #[async_trait::async_trait]
2042 impl CoreExecutor for SuccessExecutor {
2043 async fn apply(
2044 &mut self,
2045 run_id: RunId,
2046 primitive: RunPrimitive,
2047 ) -> Result<CoreApplyOutput, CoreExecutorError> {
2048 Ok(CoreApplyOutput {
2049 receipt: RunBoundaryReceipt {
2050 run_id,
2051 boundary: RunApplyBoundary::RunStart,
2052 contributing_input_ids: primitive.contributing_input_ids().to_vec(),
2053 conversation_digest: None,
2054 message_count: 0,
2055 sequence: 0,
2056 },
2057 session_snapshot: None,
2058 run_result: None,
2059 })
2060 }
2061
2062 async fn control(&mut self, _cmd: RunControlCommand) -> Result<(), CoreExecutorError> {
2063 Ok(())
2064 }
2065 }
2066
2067 let adapter = RuntimeSessionAdapter::ephemeral();
2068 let sid = SessionId::new();
2069 adapter
2070 .register_session_with_executor(sid.clone(), Box::new(SuccessExecutor))
2071 .await;
2072
2073 let input = make_prompt("hello success");
2074 let input_id = input.id().clone();
2075 adapter.accept_input(&sid, input).await.unwrap();
2076
2077 tokio::time::sleep(std::time::Duration::from_millis(100)).await;
2078
2079 let is = adapter.input_state(&sid, &input_id).await.unwrap().unwrap();
2081 assert_eq!(
2082 is.current_state,
2083 InputLifecycleState::Consumed,
2084 "Successful execution should consume the input"
2085 );
2086
2087 let state = adapter.runtime_state(&sid).await.unwrap();
2089 assert_eq!(state, RuntimeState::Idle);
2090 }
2091}