1use std::collections::{HashMap, HashSet};
14use std::future::Future;
15use std::sync::Arc;
16
17use meerkat_core::BlobStore;
18use meerkat_core::comms_drain_lifecycle_authority::{
19 CommsDrainLifecycleAuthority, CommsDrainLifecycleEffect, CommsDrainMode, DrainExitReason,
20};
21use meerkat_core::generated::{protocol_comms_drain_abort, protocol_comms_drain_spawn};
22use meerkat_core::lifecycle::core_executor::CoreApplyOutput;
23use meerkat_core::lifecycle::run_control::RunControlCommand;
24use meerkat_core::lifecycle::{InputId, RunId};
25use meerkat_core::types::SessionId;
26
27use crate::accept::AcceptOutcome;
28use crate::driver::ephemeral::EphemeralRuntimeDriver;
29use crate::driver::persistent::PersistentRuntimeDriver;
30use crate::identifiers::LogicalRuntimeId;
31use crate::input::Input;
32use crate::input_lifecycle_authority::InputLifecycleError;
33use crate::input_state::InputState;
34use crate::runtime_state::{RuntimeState, RuntimeStateTransitionError};
35use crate::service_ext::{RuntimeMode, SessionServiceRuntimeExt};
36use crate::store::RuntimeStore;
37use crate::tokio;
38use crate::tokio::sync::{Mutex, RwLock, mpsc};
39use crate::traits::{
40 DestroyReport, RecoveryReport, RecycleReport, ResetReport, RetireReport,
41 RuntimeControlPlaneError, RuntimeDriver, RuntimeDriverError,
42};
43
/// Errors reported when runtime bindings cannot be resolved for a session.
#[derive(Debug, thiserror::Error)]
pub enum RuntimeBindingsError {
    /// No entry for the given session was found in the runtime adapter even
    /// though registration had already been attempted.
    #[error("session {0} not found in runtime adapter after registration")]
    SessionNotFound(SessionId),
}
51
/// A [`DriverEntry`] behind an async mutex, shared between holders via `Arc`.
pub(crate) type SharedDriver = Arc<Mutex<DriverEntry>>;
54
/// The concrete runtime driver backing a session: one of the two driver
/// implementations this module knows how to construct.
pub(crate) enum DriverEntry {
    /// Driver built from a runtime id only (no store attached).
    Ephemeral(EphemeralRuntimeDriver),
    /// Driver built from a runtime id plus a runtime store and a blob store.
    Persistent(PersistentRuntimeDriver),
}
60
61impl DriverEntry {
62 pub(crate) fn as_driver(&self) -> &dyn RuntimeDriver {
63 match self {
64 DriverEntry::Ephemeral(d) => d,
65 DriverEntry::Persistent(d) => d,
66 }
67 }
68
69 pub(crate) fn as_driver_mut(&mut self) -> &mut dyn RuntimeDriver {
70 match self {
71 DriverEntry::Ephemeral(d) => d,
72 DriverEntry::Persistent(d) => d,
73 }
74 }
75
76 pub(crate) fn set_silent_comms_intents(&mut self, intents: Vec<String>) {
78 match self {
79 DriverEntry::Ephemeral(d) => d.set_silent_comms_intents(intents),
80 DriverEntry::Persistent(d) => d.set_silent_comms_intents(intents),
81 }
82 }
83
84 pub(crate) fn is_idle_or_attached(&self) -> bool {
86 match self {
87 DriverEntry::Ephemeral(d) => d.is_idle_or_attached(),
88 DriverEntry::Persistent(d) => d.is_idle_or_attached(),
89 }
90 }
91
92 pub(crate) fn is_quiescent_for_detached_wake(&self) -> bool {
99 self.is_idle_or_attached() && self.as_driver().active_input_ids().is_empty()
100 }
101
102 pub(crate) fn attach(&mut self) -> Result<(), RuntimeStateTransitionError> {
104 match self {
105 DriverEntry::Ephemeral(d) => d.attach(),
106 DriverEntry::Persistent(d) => d.attach(),
107 }
108 }
109
110 pub(crate) fn detach(
112 &mut self,
113 ) -> Result<Option<crate::runtime_state::RuntimeState>, RuntimeStateTransitionError> {
114 match self {
115 DriverEntry::Ephemeral(d) => d.detach(),
116 DriverEntry::Persistent(d) => d.detach(),
117 }
118 }
119
120 pub(crate) fn can_process_queue(&self) -> bool {
122 match self {
123 DriverEntry::Ephemeral(d) => d.control().can_process_queue(),
124 DriverEntry::Persistent(d) => d.inner_ref().control().can_process_queue(),
125 }
126 }
127
128 pub(crate) fn take_post_admission_signal(
130 &mut self,
131 ) -> crate::driver::ephemeral::PostAdmissionSignal {
132 match self {
133 DriverEntry::Ephemeral(d) => d.take_post_admission_signal(),
134 DriverEntry::Persistent(d) => d.take_post_admission_signal(),
135 }
136 }
137
138 pub(crate) fn take_wake_requested(&mut self) -> bool {
140 match self {
141 DriverEntry::Ephemeral(d) => d.take_wake_requested(),
142 DriverEntry::Persistent(d) => d.take_wake_requested(),
143 }
144 }
145
146 pub(crate) fn dequeue_next(&mut self) -> Option<(InputId, Input)> {
148 match self {
149 DriverEntry::Ephemeral(d) => d.dequeue_next(),
150 DriverEntry::Persistent(d) => d.dequeue_next(),
151 }
152 }
153
154 pub(crate) fn dequeue_by_id(&mut self, input_id: &InputId) -> Option<(InputId, Input)> {
156 match self {
157 DriverEntry::Ephemeral(d) => d.dequeue_by_id(input_id),
158 DriverEntry::Persistent(d) => d.dequeue_by_id(input_id),
159 }
160 }
161
162 pub(crate) fn ingress(&self) -> &crate::runtime_ingress_authority::RuntimeIngressAuthority {
164 match self {
165 DriverEntry::Ephemeral(d) => d.ingress(),
166 DriverEntry::Persistent(d) => d.inner_ref().ingress(),
167 }
168 }
169
170 pub(crate) fn has_queued_input_outside(&self, excluded: &[InputId]) -> bool {
171 match self {
172 DriverEntry::Ephemeral(d) => d.has_queued_input_outside(excluded),
173 DriverEntry::Persistent(d) => d.has_queued_input_outside(excluded),
174 }
175 }
176
177 pub(crate) fn start_run(&mut self, run_id: RunId) -> Result<(), RuntimeStateTransitionError> {
179 match self {
180 DriverEntry::Ephemeral(d) => d.start_run(run_id),
181 DriverEntry::Persistent(d) => d.start_run(run_id),
182 }
183 }
184
185 pub(crate) fn complete_run(&mut self) -> Result<RunId, RuntimeStateTransitionError> {
187 match self {
188 DriverEntry::Ephemeral(d) => d.complete_run(),
189 DriverEntry::Persistent(d) => d.complete_run(),
190 }
191 }
192
193 pub(crate) fn stage_input(
195 &mut self,
196 input_id: &InputId,
197 run_id: &RunId,
198 ) -> Result<(), InputLifecycleError> {
199 match self {
200 DriverEntry::Ephemeral(d) => d.stage_input(input_id, run_id),
201 DriverEntry::Persistent(d) => d.stage_input(input_id, run_id),
202 }
203 }
204
205 pub(crate) fn stage_batch(
207 &mut self,
208 input_ids: &[InputId],
209 run_id: &RunId,
210 ) -> Result<(), InputLifecycleError> {
211 match self {
212 DriverEntry::Ephemeral(d) => d.stage_batch(input_ids, run_id),
213 DriverEntry::Persistent(d) => d.stage_batch(input_ids, run_id),
214 }
215 }
216
217 pub(crate) fn rollback_staged(
219 &mut self,
220 input_ids: &[InputId],
221 ) -> Result<(), InputLifecycleError> {
222 match self {
223 DriverEntry::Ephemeral(d) => d.rollback_staged(input_ids),
224 DriverEntry::Persistent(d) => d.rollback_staged(input_ids),
225 }
226 }
227
228 pub(crate) async fn abandon_pending_inputs(
229 &mut self,
230 reason: crate::input_state::InputAbandonReason,
231 ) -> Result<usize, RuntimeDriverError> {
232 match self {
233 DriverEntry::Ephemeral(d) => Ok(d.abandon_pending_inputs(reason)),
234 DriverEntry::Persistent(d) => d.abandon_pending_inputs(reason).await,
235 }
236 }
237}
238
/// A completion registry behind an async mutex, shared between holders via `Arc`.
pub(crate) type SharedCompletionRegistry = Arc<Mutex<crate::completion::CompletionRegistry>>;
241
/// Per-session state tracked by the runtime session adapter.
struct RuntimeSessionEntry {
    /// Shared handle to the session's runtime driver.
    driver: SharedDriver,
    /// Ops lifecycle registry for this session (recovered from the store or fresh).
    ops_lifecycle: Arc<crate::ops_lifecycle::RuntimeOpsLifecycleRegistry>,
    /// Epoch id paired with `ops_lifecycle` (recovered from the store or fresh).
    epoch_id: meerkat_core::RuntimeEpochId,
    /// Cursor state paired with `epoch_id`; shared with the runtime loop and
    /// the ops-lifecycle persistence channel when an executor is attached.
    cursor_state: Arc<meerkat_core::EpochCursorState>,
    /// Registry of completion waiters for this session's inputs.
    completions: SharedCompletionRegistry,
    /// Where this entry is in the executor-attachment lifecycle.
    phase: RegistrationPhase,
    /// Detached-wake state published together with a loop attachment; cleared
    /// when a dead attachment is discarded.
    detached_wake: Option<Arc<crate::detached_wake::DetachedWakeState>>,
    /// Initialised to `false` at registration.
    // NOTE(review): presumably feeds `desired_peer_ingress_mode`; the writer
    // is outside this view — confirm.
    keep_alive: bool,
    /// Comms runtime for the session, if one has been set; `None` at registration.
    comms_runtime: Option<Arc<dyn meerkat_core::agent::CommsRuntime>>,
}
265
/// Handles for a spawned runtime loop that has been published on a session entry.
struct RuntimeLoopAttachment {
    /// Sender used to nudge the loop with a unit wake message.
    wake_tx: mpsc::Sender<()>,
    /// Sender used to deliver [`RunControlCommand`]s to the loop.
    control_tx: mpsc::Sender<RunControlCommand>,
    /// Join handle of the spawned loop task; stored but never read in this view.
    _loop_handle: tokio::task::JoinHandle<()>,
}
275
/// Executor-attachment lifecycle of a session entry.
enum RegistrationPhase {
    /// No runtime loop is attached. This is the phase an entry is created in
    /// and the phase it falls back to when an attachment dies or is reverted.
    Queuing,
    /// A caller has claimed the entry and is currently wiring a runtime loop.
    Attaching,
    /// A runtime loop has been published; liveness is judged by its channels.
    Active(RuntimeLoopAttachment),
}
293
294impl RuntimeSessionEntry {
295 fn attachment_is_live(&self) -> bool {
296 match &self.phase {
297 RegistrationPhase::Active(attachment) => {
298 !attachment.wake_tx.is_closed() && !attachment.control_tx.is_closed()
299 }
300 RegistrationPhase::Queuing | RegistrationPhase::Attaching => false,
301 }
302 }
303
304 fn has_attachment_or_attaching(&self) -> bool {
309 matches!(self.phase, RegistrationPhase::Attaching) || self.attachment_is_live()
310 }
311
312 fn has_live_attachment(&self) -> bool {
316 self.attachment_is_live()
317 }
318
319 fn attach_runtime_loop(
320 &mut self,
321 wake_tx: mpsc::Sender<()>,
322 control_tx: mpsc::Sender<RunControlCommand>,
323 loop_handle: tokio::task::JoinHandle<()>,
324 ) {
325 self.phase = RegistrationPhase::Active(RuntimeLoopAttachment {
326 wake_tx,
327 control_tx,
328 _loop_handle: loop_handle,
329 });
330 }
331
332 fn clear_dead_attachment(&mut self) -> bool {
333 if matches!(self.phase, RegistrationPhase::Active(_)) && !self.attachment_is_live() {
334 self.phase = RegistrationPhase::Queuing;
337 self.detached_wake = None;
340 return true;
341 }
342 false
343 }
344
345 fn wake_sender(&self) -> Option<mpsc::Sender<()>> {
346 match &self.phase {
347 RegistrationPhase::Active(attachment)
348 if !attachment.wake_tx.is_closed() && !attachment.control_tx.is_closed() =>
349 {
350 Some(attachment.wake_tx.clone())
351 }
352 _ => None,
353 }
354 }
355
356 fn control_sender(&self) -> Option<mpsc::Sender<RunControlCommand>> {
357 match &self.phase {
358 RegistrationPhase::Active(attachment)
359 if !attachment.wake_tx.is_closed() && !attachment.control_tx.is_closed() =>
360 {
361 Some(attachment.control_tx.clone())
362 }
363 _ => None,
364 }
365 }
366}
367
368struct CommsDrainSlot {
373 authority: CommsDrainLifecycleAuthority,
374 handle: Option<tokio::task::JoinHandle<()>>,
375}
376
377impl CommsDrainSlot {
378 fn new() -> Self {
379 Self {
380 authority: CommsDrainLifecycleAuthority::new(),
381 handle: None,
382 }
383 }
384}
385
386fn apply_runtime_drain_effects(slot: &mut CommsDrainSlot, effects: &[CommsDrainLifecycleEffect]) {
387 for effect in effects {
388 if let CommsDrainLifecycleEffect::AbortDrainTask = effect
389 && let Some(handle) = slot.handle.take()
390 {
391 handle.abort();
392 }
393 }
394}
395
396fn abort_slot(slot: &mut CommsDrainSlot) {
397 match protocol_comms_drain_abort::execute_stop_requested(&mut slot.authority) {
398 Ok(result) => {
399 apply_runtime_drain_effects(slot, &result.effects);
400 let _ = result.obligation;
403 }
404 Err(_) => {
405 if let Some(handle) = slot.handle.take() {
407 handle.abort();
408 }
409 }
410 }
411}
412
413fn desired_peer_ingress_mode(
414 runtime_state: RuntimeState,
415 comms_enabled: bool,
416 keep_alive: bool,
417) -> Option<CommsDrainMode> {
418 if !comms_enabled {
419 return None;
420 }
421
422 match runtime_state {
423 RuntimeState::Attached | RuntimeState::Running | RuntimeState::Recovering => {
424 Some(CommsDrainMode::AttachedSession)
425 }
426 RuntimeState::Idle if keep_alive => Some(CommsDrainMode::PersistentHost),
427 RuntimeState::Initializing
428 | RuntimeState::Idle
429 | RuntimeState::Retired
430 | RuntimeState::Stopped
431 | RuntimeState::Destroyed => None,
432 }
433}
434
/// Registry of per-session runtime state: drivers, loop attachments,
/// completion waiters and comms drain slots, keyed by [`SessionId`].
pub struct RuntimeSessionAdapter {
    /// Registered sessions, guarded by an async read/write lock.
    sessions: RwLock<HashMap<SessionId, RuntimeSessionEntry>>,
    /// Runtime mode; every constructor visible here sets `RuntimeMode::V9Compliant`.
    mode: RuntimeMode,
    /// Runtime store, when the adapter was built with one.
    store: Option<Arc<dyn RuntimeStore>>,
    /// Blob store; a persistent driver is only created when this and `store`
    /// are both present.
    blob_store: Option<Arc<dyn BlobStore>>,
    /// Comms drain slots per session; a session's slot is removed and aborted
    /// when the session is unregistered.
    comms_drain_slots: RwLock<HashMap<SessionId, CommsDrainSlot>>,
}
453
454impl RuntimeSessionAdapter {
455 pub fn ephemeral() -> Self {
457 Self {
458 sessions: RwLock::new(HashMap::new()),
459 mode: RuntimeMode::V9Compliant,
460 store: None,
461 blob_store: None,
462 comms_drain_slots: RwLock::new(HashMap::new()),
463 }
464 }
465
466 pub fn persistent(store: Arc<dyn RuntimeStore>, blob_store: Arc<dyn BlobStore>) -> Self {
468 Self {
469 sessions: RwLock::new(HashMap::new()),
470 mode: RuntimeMode::V9Compliant,
471 store: Some(store),
472 blob_store: Some(blob_store),
473 comms_drain_slots: RwLock::new(HashMap::new()),
474 }
475 }
476
477 pub fn persistent_without_blobs(store: Arc<dyn RuntimeStore>) -> Self {
484 Self {
485 sessions: RwLock::new(HashMap::new()),
486 mode: RuntimeMode::V9Compliant,
487 store: Some(store),
488 blob_store: None,
489 comms_drain_slots: RwLock::new(HashMap::new()),
490 }
491 }
492
493 fn make_driver(&self, session_id: &SessionId) -> DriverEntry {
495 let runtime_id = LogicalRuntimeId::new(session_id.to_string());
496 match (&self.store, &self.blob_store) {
497 (Some(store), Some(blob_store)) => DriverEntry::Persistent(
498 PersistentRuntimeDriver::new(runtime_id, store.clone(), blob_store.clone()),
499 ),
500 (Some(_store), None) => {
501 tracing::warn!(
502 %session_id,
503 "persistent runtime store present but blob store missing; \
504 falling back to ephemeral driver"
505 );
506 DriverEntry::Ephemeral(EphemeralRuntimeDriver::new(runtime_id))
507 }
508 _ => DriverEntry::Ephemeral(EphemeralRuntimeDriver::new(runtime_id)),
509 }
510 }
511
512 async fn recover_or_create_ops_state(
519 &self,
520 session_id: &SessionId,
521 ) -> (
522 Arc<crate::ops_lifecycle::RuntimeOpsLifecycleRegistry>,
523 meerkat_core::RuntimeEpochId,
524 Arc<meerkat_core::EpochCursorState>,
525 ) {
526 if let Some(ref store) = self.store {
527 let runtime_id = crate::identifiers::LogicalRuntimeId::new(session_id.to_string());
528 match store.load_ops_lifecycle(&runtime_id).await {
529 Ok(Some(snapshot)) => {
530 let recovered_epoch = snapshot.epoch_id.clone();
531 let recovered_cursors = meerkat_core::EpochCursorState::from_recovered(
532 snapshot.cursors.agent_applied_cursor,
533 snapshot.cursors.runtime_observed_seq,
534 snapshot.cursors.runtime_last_injected_seq,
535 );
536 let recovered_ops_count = snapshot.completion_entries.len();
537 let registry =
538 crate::ops_lifecycle::RuntimeOpsLifecycleRegistry::from_recovered(snapshot);
539 tracing::info!(
540 %session_id,
541 epoch_id = %recovered_epoch,
542 recovered_ops = recovered_ops_count,
543 "ops lifecycle recovered from durable store (same epoch)"
544 );
545 (
546 Arc::new(registry),
547 recovered_epoch,
548 Arc::new(recovered_cursors),
549 )
550 }
551 Ok(None) => {
552 tracing::debug!(%session_id, "no persisted ops lifecycle; fresh epoch");
553 (
554 Arc::new(crate::ops_lifecycle::RuntimeOpsLifecycleRegistry::new()),
555 meerkat_core::RuntimeEpochId::new(),
556 Arc::new(meerkat_core::EpochCursorState::new()),
557 )
558 }
559 Err(err) => {
560 tracing::warn!(
561 %session_id,
562 error = %err,
563 "failed to load ops lifecycle; epoch rotated"
564 );
565 (
566 Arc::new(crate::ops_lifecycle::RuntimeOpsLifecycleRegistry::new()),
567 meerkat_core::RuntimeEpochId::new(),
568 Arc::new(meerkat_core::EpochCursorState::new()),
569 )
570 }
571 }
572 } else {
573 (
574 Arc::new(crate::ops_lifecycle::RuntimeOpsLifecycleRegistry::new()),
575 meerkat_core::RuntimeEpochId::new(),
576 Arc::new(meerkat_core::EpochCursorState::new()),
577 )
578 }
579 }
580
581 pub async fn register_session(&self, session_id: SessionId) {
584 {
585 let mut sessions = self.sessions.write().await;
586 if let Some(existing) = sessions.get_mut(&session_id) {
587 existing.clear_dead_attachment();
588 return;
589 }
590 }
591
592 let mut entry = self.make_driver(&session_id);
593 if let Err(err) = entry.as_driver_mut().recover().await {
594 tracing::error!(%session_id, error = %err, "failed to recover runtime driver during registration");
595 return;
596 }
597
598 let (ops_lifecycle, epoch_id, cursor_state) =
599 self.recover_or_create_ops_state(&session_id).await;
600
601 let session_entry = RuntimeSessionEntry {
602 driver: Arc::new(Mutex::new(entry)),
603 ops_lifecycle,
604 epoch_id,
605 cursor_state,
606 completions: Arc::new(Mutex::new(crate::completion::CompletionRegistry::new())),
607 phase: RegistrationPhase::Queuing,
608 detached_wake: None,
609 keep_alive: false,
610 comms_runtime: None,
611 };
612 let mut sessions = self.sessions.write().await;
613 if let Some(existing) = sessions.get_mut(&session_id) {
614 existing.clear_dead_attachment();
615 } else {
616 sessions.insert(session_id, session_entry);
617 }
618 }
619
620 pub async fn set_session_silent_intents(&self, session_id: &SessionId, intents: Vec<String>) {
625 let sessions = self.sessions.read().await;
626 if let Some(entry) = sessions.get(session_id) {
627 let mut driver = entry.driver.lock().await;
628 driver.set_silent_comms_intents(intents);
629 }
630 }
631
632 pub async fn register_session_with_executor(
637 &self,
638 session_id: SessionId,
639 executor: Box<dyn meerkat_core::lifecycle::CoreExecutor>,
640 ) {
641 self.ensure_session_with_executor(session_id, executor)
642 .await;
643 }
644
645 pub async fn ensure_session_with_executor(
651 &self,
652 session_id: SessionId,
653 executor: Box<dyn meerkat_core::lifecycle::CoreExecutor>,
654 ) {
655 let existing = {
656 let mut sessions = self.sessions.write().await;
657 sessions.get_mut(&session_id).map(|entry| {
658 entry.clear_dead_attachment();
659 let occupied = entry.has_attachment_or_attaching();
660 if !occupied {
661 entry.phase = RegistrationPhase::Attaching;
665 }
666 (
667 occupied,
668 entry.driver.clone(),
669 entry.completions.clone(),
670 entry.ops_lifecycle.clone(),
671 )
672 })
673 };
674
675 let (driver, completions, ops_lifecycle) =
676 if let Some((has_attachment, driver, completions, ops_lifecycle)) = existing {
677 if has_attachment {
678 return;
679 }
680 (driver, completions, ops_lifecycle)
681 } else {
682 let mut recovered_entry = self.make_driver(&session_id);
683 if let Err(err) = recovered_entry.as_driver_mut().recover().await {
684 tracing::error!(
685 %session_id,
686 error = %err,
687 "failed to recover runtime driver during registration"
688 );
689 return;
690 }
691
692 let (recovered_ops, recovered_epoch, recovered_cursors) =
695 self.recover_or_create_ops_state(&session_id).await;
696
697 let mut sessions = self.sessions.write().await;
700 if let Some(entry) = sessions.get_mut(&session_id) {
701 entry.clear_dead_attachment();
702 if entry.has_attachment_or_attaching() {
703 return;
704 }
705 entry.phase = RegistrationPhase::Attaching;
706 (
707 entry.driver.clone(),
708 entry.completions.clone(),
709 entry.ops_lifecycle.clone(),
710 )
711 } else {
712 let driver = Arc::new(Mutex::new(recovered_entry));
713 let completions =
714 Arc::new(Mutex::new(crate::completion::CompletionRegistry::new()));
715 sessions.insert(
716 session_id.clone(),
717 RuntimeSessionEntry {
718 driver: driver.clone(),
719 ops_lifecycle: recovered_ops.clone(),
720 epoch_id: recovered_epoch,
721 cursor_state: recovered_cursors,
722 completions: completions.clone(),
723 phase: RegistrationPhase::Queuing,
724 detached_wake: None,
725 keep_alive: false,
726 comms_runtime: None,
727 },
728 );
729 (driver, completions, recovered_ops)
730 }
731 };
732
733 let should_wake = {
734 let mut driver_guard = driver.lock().await;
735 if let Err(error) = driver_guard.attach() {
736 let repaired = if error.from == RuntimeState::Attached
737 && error.to == RuntimeState::Attached
738 {
739 tracing::warn!(
740 %session_id,
741 error = %error,
742 "runtime driver remained attached without a live published loop; detaching and retrying attachment"
743 );
744 match driver_guard.detach() {
745 Ok(_) => match driver_guard.attach() {
746 Ok(()) => true,
747 Err(retry_error) => {
748 tracing::warn!(
749 %session_id,
750 error = %retry_error,
751 "failed to re-attach runtime driver after repairing stale attachment state"
752 );
753 false
754 }
755 },
756 Err(detach_error) => {
757 tracing::warn!(
758 %session_id,
759 error = %detach_error,
760 "failed to detach stale attached runtime driver before retrying attachment"
761 );
762 false
763 }
764 }
765 } else {
766 false
767 };
768 if !repaired {
769 tracing::warn!(
770 %session_id,
771 error = %error,
772 "failed to attach runtime driver before publishing loop attachment"
773 );
774 self.revert_attaching(&session_id).await;
775 return;
776 }
777 }
778 !driver_guard.as_driver().active_input_ids().is_empty()
779 };
780
781 let detached_wake_state = Arc::new(crate::detached_wake::DetachedWakeState::new());
786 ops_lifecycle.set_detached_wake(Arc::clone(&detached_wake_state));
787
788 if let Some(ref store) = self.store {
790 let (persist_tx, mut persist_rx) =
791 crate::tokio::sync::mpsc::channel::<crate::ops_lifecycle::PersistedOpsSnapshot>(16);
792 let entry_epoch_id = {
793 let sessions = self.sessions.read().await;
794 sessions
795 .get(&session_id)
796 .map(|e| e.epoch_id.clone())
797 .unwrap_or_else(meerkat_core::RuntimeEpochId::new)
798 };
799 let entry_cursor = {
800 let sessions = self.sessions.read().await;
801 sessions
802 .get(&session_id)
803 .map(|e| Arc::clone(&e.cursor_state))
804 .unwrap_or_else(|| Arc::new(meerkat_core::EpochCursorState::new()))
805 };
806 ops_lifecycle.set_persistence_channel(persist_tx, entry_epoch_id, entry_cursor);
807
808 let store_clone = Arc::clone(store);
810 let runtime_id = crate::identifiers::LogicalRuntimeId::new(session_id.to_string());
811 crate::tokio::spawn(async move {
812 while let Some(snapshot) = persist_rx.recv().await {
813 if let Err(e) = store_clone
814 .persist_ops_lifecycle(&runtime_id, &snapshot)
815 .await
816 {
817 tracing::warn!(
818 error = %e,
819 "failed to persist ops lifecycle snapshot"
820 );
821 }
822 }
823 });
824 }
825
826 let completion_feed = ops_lifecycle.completion_feed_handle();
828
829 let (wake_tx, wake_rx) = mpsc::channel(16);
830 let (control_tx, control_rx) = mpsc::channel(16);
831 let entry_cursor_state = {
832 let sessions = self.sessions.read().await;
833 sessions
834 .get(&session_id)
835 .map(|e| Arc::clone(&e.cursor_state))
836 };
837 let mut pending_loop_handle =
838 Some(crate::runtime_loop::spawn_runtime_loop_with_completions(
839 driver.clone(),
840 executor,
841 wake_rx,
842 control_rx,
843 Some(completions.clone()),
844 Some(Arc::clone(&detached_wake_state)),
845 Some(completion_feed),
846 entry_cursor_state,
847 ));
848
849 let (published, detach_after_abort) = {
850 let mut sessions = self.sessions.write().await;
851 match sessions.get_mut(&session_id) {
852 None => (false, true),
853 Some(entry) => {
854 entry.clear_dead_attachment();
855 if entry.has_live_attachment() {
856 (false, false)
857 } else if !Arc::ptr_eq(&entry.driver, &driver)
858 || !Arc::ptr_eq(&entry.completions, &completions)
859 {
860 tracing::warn!(
861 %session_id,
862 "runtime session entry changed while wiring executor; aborting stale loop attachment"
863 );
864 (false, true)
865 } else {
866 match pending_loop_handle.take() {
867 Some(loop_handle) => {
868 entry.attach_runtime_loop(wake_tx.clone(), control_tx, loop_handle);
869 entry.detached_wake = Some(Arc::clone(&detached_wake_state));
870 (true, false)
871 }
872 None => {
873 tracing::error!(
874 %session_id,
875 "runtime loop handle missing during attachment publish"
876 );
877 (false, true)
878 }
879 }
880 }
881 }
882 }
883 };
884
885 if !published {
886 if let Some(loop_handle) = pending_loop_handle.take() {
887 loop_handle.abort();
888 }
889 if detach_after_abort {
890 let mut driver_guard = driver.lock().await;
891 let _ = driver_guard.detach();
892 }
893 self.revert_attaching(&session_id).await;
894 return;
895 }
896
897 if should_wake {
898 let _ = wake_tx.try_send(());
899 }
900 }
901
902 async fn revert_attaching(&self, session_id: &SessionId) {
906 let mut sessions = self.sessions.write().await;
907 if let Some(entry) = sessions.get_mut(session_id)
908 && matches!(entry.phase, RegistrationPhase::Attaching)
909 {
910 entry.phase = RegistrationPhase::Queuing;
911 }
912 }
913
914 pub async fn unregister_session(&self, session_id: &SessionId) {
919 let entry = {
920 let mut sessions = self.sessions.write().await;
921 let mut slots = self.comms_drain_slots.write().await;
922 if let Some(mut slot) = slots.remove(session_id) {
925 abort_slot(&mut slot);
926 }
927 sessions.remove(session_id)
928 };
929
930 if let Some(entry) = entry {
931 let mut driver = entry.driver.lock().await;
932 let _ = driver.detach(); drop(driver);
934
935 let mut completions = entry.completions.lock().await;
936 completions.resolve_all_terminated("runtime session unregistered");
937 }
938 }
939
940 pub async fn contains_session(&self, session_id: &SessionId) -> bool {
942 self.sessions.read().await.contains_key(session_id)
943 }
944
945 pub async fn session_has_executor(&self, session_id: &SessionId) -> bool {
949 let sessions = self.sessions.read().await;
950 sessions
951 .get(session_id)
952 .map(RuntimeSessionEntry::has_attachment_or_attaching)
953 .unwrap_or(false)
954 }
955
956 pub async fn session_has_comms(&self, session_id: &SessionId) -> bool {
962 let sessions = self.sessions.read().await;
963 sessions
964 .get(session_id)
965 .map(|entry| entry.comms_runtime.is_some())
966 .unwrap_or(false)
967 }
968
969 pub async fn interrupt_current_run(
971 &self,
972 session_id: &SessionId,
973 ) -> Result<(), RuntimeDriverError> {
974 let (driver, control_tx) = {
975 let sessions = self.sessions.read().await;
976 let entry = sessions
977 .get(session_id)
978 .ok_or(RuntimeDriverError::NotReady {
979 state: RuntimeState::Destroyed,
980 })?;
981 (entry.driver.clone(), entry.control_sender())
982 };
983
984 let Some(control_tx) = control_tx else {
985 let state = {
986 let driver = driver.lock().await;
987 driver.as_driver().runtime_state()
988 };
989 return Err(RuntimeDriverError::NotReady { state });
990 };
991 control_tx
992 .send(RunControlCommand::CancelCurrentRun {
993 reason: "mob interrupt".to_string(),
994 })
995 .await
996 .map_err(|err| RuntimeDriverError::Internal(format!("failed to send interrupt: {err}")))
997 }
998
999 pub async fn stop_runtime_executor(
1003 &self,
1004 session_id: &SessionId,
1005 command: RunControlCommand,
1006 ) -> Result<(), RuntimeDriverError> {
1007 let (driver, completions, control_tx) = {
1008 let sessions = self.sessions.read().await;
1009 let entry = sessions
1010 .get(session_id)
1011 .ok_or(RuntimeDriverError::NotReady {
1012 state: RuntimeState::Destroyed,
1013 })?;
1014 (
1015 entry.driver.clone(),
1016 entry.completions.clone(),
1017 entry.control_sender(),
1018 )
1019 };
1020
1021 if let Some(control_tx) = control_tx
1022 && control_tx.send(command.clone()).await.is_ok()
1023 {
1024 return Ok(());
1025 }
1026
1027 if matches!(command, RunControlCommand::StopRuntimeExecutor { .. }) {
1028 let mut driver = driver.lock().await;
1029 driver
1030 .as_driver_mut()
1031 .on_runtime_control(crate::traits::RuntimeControlCommand::Stop)
1032 .await?;
1033 drop(driver);
1034 let mut completions = completions.lock().await;
1035 completions.resolve_all_terminated("runtime stopped");
1036 drop(completions);
1037
1038 let mut sessions = self.sessions.write().await;
1041 if let Some(entry) = sessions.get_mut(session_id) {
1042 entry.clear_dead_attachment();
1043 }
1044 Ok(())
1045 } else {
1046 Err(RuntimeDriverError::Internal(
1047 "failed to send stop: runtime loop is unavailable".into(),
1048 ))
1049 }
1050 }
1051
    /// Admits `input`, runs `op` for it inline, and records the outcome on the driver.
    ///
    /// The input is accepted, dequeued, a fresh run is started and the input is
    /// staged against it, all under the driver lock. The lock is released
    /// while `op` runs and re-taken to report the result.
    ///
    /// * `op` receives the new run id and the primitive built from the input.
    ///   On success it returns the caller's value together with the
    ///   [`CoreApplyOutput`] whose receipt and session snapshot are committed.
    ///
    /// # Errors
    /// * `NotReady { state: Destroyed }` when the session is not registered.
    /// * `NotReady` when the driver is not idle/attached, already has active
    ///   inputs, or dequeues a different input than the one just accepted.
    /// * `ValidationFailed` when the input is rejected, deduplicated, or
    ///   shares an idempotency key with an already-active input.
    /// * `Internal` when starting the run, staging the input, or committing
    ///   the boundary / completion / failure event fails.
    /// * The error returned by `op`, after the run has been marked failed.
    ///
    /// If recording `RunCompleted` or `RunFailed` fails, the session is
    /// unregistered before the error is returned.
    // NOTE(review): when `stage_input` fails after `start_run` succeeded, this
    // returns without completing or failing the started run — confirm the
    // driver is expected to recover from that on its own.
    pub async fn accept_input_and_run<T, F, Fut>(
        &self,
        session_id: &SessionId,
        input: Input,
        op: F,
    ) -> Result<T, RuntimeDriverError>
    where
        F: FnOnce(RunId, meerkat_core::lifecycle::run_primitive::RunPrimitive) -> Fut,
        Fut: Future<Output = Result<(T, CoreApplyOutput), RuntimeDriverError>>,
    {
        let driver = {
            let sessions = self.sessions.read().await;
            sessions
                .get(session_id)
                .ok_or(RuntimeDriverError::NotReady {
                    state: RuntimeState::Destroyed,
                })?
                .driver
                .clone()
        };

        // Admission: everything in this block happens under one driver lock.
        let (input_id, run_id, primitive) = {
            let mut driver = driver.lock().await;
            if !driver.is_idle_or_attached() {
                return Err(RuntimeDriverError::NotReady {
                    state: driver.as_driver().runtime_state(),
                });
            }

            // Any active input blocks an inline run. A matching idempotency
            // key gets the more specific validation error.
            let active_input_ids = driver.as_driver().active_input_ids();
            if !active_input_ids.is_empty() {
                let duplicate_active_input =
                    input.header().idempotency_key.as_ref().and_then(|key| {
                        active_input_ids.iter().find(|active_id| {
                            driver
                                .as_driver()
                                .input_state(active_id)
                                .and_then(|state| state.idempotency_key.as_ref())
                                == Some(key)
                        })
                    });
                if let Some(existing_id) = duplicate_active_input {
                    return Err(RuntimeDriverError::ValidationFailed {
                        reason: format!(
                            "accept_input_and_run does not support deduplicated admission; existing input {existing_id} already owns execution"
                        ),
                    });
                }
                return Err(RuntimeDriverError::NotReady {
                    state: driver.as_driver().runtime_state(),
                });
            }

            let outcome = driver.as_driver_mut().accept_input(input).await?;
            let input_id = match outcome {
                AcceptOutcome::Accepted { input_id, .. } => input_id,
                AcceptOutcome::Deduplicated { existing_id, .. } => {
                    return Err(RuntimeDriverError::ValidationFailed {
                        reason: format!(
                            "accept_input_and_run does not support deduplicated admission; existing input {existing_id} already owns execution"
                        ),
                    });
                }
                AcceptOutcome::Rejected { reason } => {
                    return Err(RuntimeDriverError::ValidationFailed {
                        reason: reason.to_string(),
                    });
                }
            };

            // Re-check after admission, since accepting may have changed state.
            if !driver.is_idle_or_attached() {
                return Err(RuntimeDriverError::NotReady {
                    state: driver.as_driver().runtime_state(),
                });
            }

            let (dequeued_id, dequeued_input) = driver.dequeue_next().ok_or_else(|| {
                RuntimeDriverError::Internal("accepted input was not queued for execution".into())
            })?;
            // The head of the queue must be the input that was just accepted.
            if dequeued_id != input_id {
                return Err(RuntimeDriverError::NotReady {
                    state: driver.as_driver().runtime_state(),
                });
            }
            let run_id = RunId::new();
            driver.start_run(run_id.clone()).map_err(|err| {
                RuntimeDriverError::Internal(format!("failed to start runtime run: {err}"))
            })?;
            driver.stage_input(&dequeued_id, &run_id).map_err(|err| {
                RuntimeDriverError::Internal(format!("failed to stage accepted input: {err}"))
            })?;
            let primitive = crate::runtime_loop::input_to_primitive(&dequeued_input, dequeued_id);
            (input_id, run_id, primitive)
        };

        // The driver lock is not held while the caller's operation runs.
        match op(run_id.clone(), primitive.clone()).await {
            Ok((result, output)) => {
                let mut driver = driver.lock().await;
                // Commit the boundary first; if that fails, unwind by marking
                // the run failed and surface the commit error.
                if let Err(err) = driver
                    .as_driver_mut()
                    .on_run_event(meerkat_core::lifecycle::RunEvent::BoundaryApplied {
                        run_id: run_id.clone(),
                        receipt: output.receipt,
                        session_snapshot: output.session_snapshot,
                    })
                    .await
                {
                    if let Err(unwind_err) = driver
                        .as_driver_mut()
                        .on_run_event(meerkat_core::lifecycle::RunEvent::RunFailed {
                            run_id,
                            error: format!("boundary commit failed: {err}"),
                            recoverable: true,
                        })
                        .await
                    {
                        return Err(RuntimeDriverError::Internal(format!(
                            "runtime boundary commit failed: {err}; additionally failed to unwind runtime state: {unwind_err}"
                        )));
                    }
                    return Err(RuntimeDriverError::Internal(format!(
                        "runtime boundary commit failed: {err}"
                    )));
                }
                if let Err(err) = driver
                    .as_driver_mut()
                    .on_run_event(meerkat_core::lifecycle::RunEvent::RunCompleted {
                        run_id,
                        consumed_input_ids: vec![input_id],
                    })
                    .await
                {
                    // Release the driver lock first: unregistering locks the driver itself.
                    drop(driver);
                    self.unregister_session(session_id).await;
                    return Err(RuntimeDriverError::Internal(format!(
                        "failed to persist runtime completion snapshot: {err}"
                    )));
                }
                Ok(result)
            }
            Err(err) => {
                let mut driver = driver.lock().await;
                if let Err(run_err) = driver
                    .as_driver_mut()
                    .on_run_event(meerkat_core::lifecycle::RunEvent::RunFailed {
                        run_id,
                        error: err.to_string(),
                        recoverable: true,
                    })
                    .await
                {
                    // Release the driver lock first: unregistering locks the driver itself.
                    drop(driver);
                    self.unregister_session(session_id).await;
                    return Err(RuntimeDriverError::Internal(format!(
                        "failed to persist runtime failure snapshot: {run_err}"
                    )));
                }
                Err(err)
            }
        }
    }
1217
1218 pub async fn accept_input_with_completion(
1228 &self,
1229 session_id: &SessionId,
1230 input: Input,
1231 ) -> Result<(AcceptOutcome, Option<crate::completion::CompletionHandle>), RuntimeDriverError>
1232 {
1233 let (driver, completions, wake_tx, _control_tx) = {
1234 let sessions = self.sessions.read().await;
1235 let entry = sessions
1236 .get(session_id)
1237 .ok_or(RuntimeDriverError::NotReady {
1238 state: RuntimeState::Destroyed,
1239 })?;
1240 (
1241 entry.driver.clone(),
1242 entry.completions.clone(),
1243 entry.wake_sender(),
1244 entry.control_sender(),
1245 )
1246 };
1247
1248 let (outcome, signal, handle) = {
1249 let mut driver = driver.lock().await;
1250 let result = driver.as_driver_mut().accept_input(input).await?;
1251
1252 match &result {
1253 AcceptOutcome::Accepted { input_id, .. } => {
1254 let is_terminal = driver
1255 .as_driver()
1256 .input_state(input_id)
1257 .map(|state| state.current_state().is_terminal())
1258 .unwrap_or(true);
1259 let handle = if is_terminal {
1260 None
1261 } else {
1262 Some({
1263 let mut completions = completions.lock().await;
1264 completions.register(input_id.clone())
1265 })
1266 };
1267 let signal = driver.take_post_admission_signal();
1268 (result, signal, handle)
1269 }
1270 AcceptOutcome::Deduplicated { existing_id, .. } => {
1271 let existing_state = driver.as_driver().input_state(existing_id);
1273 let is_terminal = existing_state
1274 .map(|s| s.current_state().is_terminal())
1275 .unwrap_or(true); if is_terminal {
1278 (
1280 result,
1281 crate::driver::ephemeral::PostAdmissionSignal::None,
1282 None,
1283 )
1284 } else {
1285 let handle = {
1287 let mut completions = completions.lock().await;
1288 completions.register(existing_id.clone())
1289 };
1290 (
1291 result,
1292 crate::driver::ephemeral::PostAdmissionSignal::None,
1293 Some(handle),
1294 )
1295 }
1296 }
1297 AcceptOutcome::Rejected { reason } => {
1298 return Err(RuntimeDriverError::ValidationFailed {
1299 reason: reason.to_string(),
1300 });
1301 }
1302 }
1303 };
1304
1305 if signal.should_wake()
1306 && let Some(ref wake_tx) = wake_tx
1307 {
1308 let _ = wake_tx.try_send(());
1309 }
1310
1311 Ok((outcome, handle))
1312 }
1313
1314 pub async fn accept_input_without_wake(
1320 &self,
1321 session_id: &SessionId,
1322 input: Input,
1323 ) -> Result<AcceptOutcome, RuntimeDriverError> {
1324 let driver = {
1325 let sessions = self.sessions.read().await;
1326 let entry = sessions
1327 .get(session_id)
1328 .ok_or(RuntimeDriverError::NotReady {
1329 state: RuntimeState::Destroyed,
1330 })?;
1331 entry.driver.clone()
1332 };
1333
1334 let outcome = {
1335 let mut driver = driver.lock().await;
1336 let result = driver.as_driver_mut().accept_input(input).await?;
1337 let signal = driver.take_post_admission_signal();
1338 debug_assert!(
1339 !signal.should_process_immediately(),
1340 "queue-only admission unexpectedly requested immediate processing"
1341 );
1342 result
1344 };
1345
1346 Ok(outcome)
1347 }
1348
1349 pub async fn ops_lifecycle_registry(
1351 &self,
1352 session_id: &SessionId,
1353 ) -> Option<Arc<crate::ops_lifecycle::RuntimeOpsLifecycleRegistry>> {
1354 let sessions = self.sessions.read().await;
1355 sessions
1356 .get(session_id)
1357 .map(|e| Arc::clone(&e.ops_lifecycle))
1358 }
1359
1360 pub async fn prepare_bindings(
1370 &self,
1371 session_id: SessionId,
1372 ) -> Result<meerkat_core::SessionRuntimeBindings, RuntimeBindingsError> {
1373 self.register_session(session_id.clone()).await;
1374 let sessions = self.sessions.read().await;
1375 let entry = sessions
1376 .get(&session_id)
1377 .ok_or(RuntimeBindingsError::SessionNotFound(session_id.clone()))?;
1378 Ok(meerkat_core::SessionRuntimeBindings {
1379 session_id,
1380 epoch_id: entry.epoch_id.clone(),
1381 ops_lifecycle: Arc::clone(&entry.ops_lifecycle)
1382 as Arc<dyn meerkat_core::OpsLifecycleRegistry>,
1383 cursor_state: Arc::clone(&entry.cursor_state),
1384 })
1385 }
1386
1387 pub async fn update_peer_ingress_context(
1393 self: &Arc<Self>,
1394 session_id: &SessionId,
1395 keep_alive: bool,
1396 comms_runtime: Option<Arc<dyn meerkat_core::agent::CommsRuntime>>,
1397 ) -> bool {
1398 {
1399 let mut sessions = self.sessions.write().await;
1400 let Some(entry) = sessions.get_mut(session_id) else {
1401 tracing::warn!(
1402 %session_id,
1403 "refusing to configure comms drain for unregistered session"
1404 );
1405 return false;
1406 };
1407 entry.keep_alive = keep_alive;
1408 entry.comms_runtime = comms_runtime;
1409 }
1410 self.reconcile_peer_ingress(session_id).await
1411 }
1412
/// Brings the session's comms drain task in line with the mode derived from
/// its current runtime state, keep-alive flag and comms runtime.
///
/// Returns `true` only when the authority's `EnsureRunning` transition
/// produced an obligation and `TaskSpawned` feedback was submitted for it
/// (even if the authority then rejected that feedback). Every other path —
/// unknown session, no comms runtime, no desired mode, rejected transition,
/// missing obligation — returns `false`.
async fn reconcile_peer_ingress(self: &Arc<Self>, session_id: &SessionId) -> bool {
    // Snapshot the session's drain inputs under the sessions write lock; the
    // lock is released before any other lock is taken.
    let (keep_alive, comms_runtime) = {
        let mut sessions = self.sessions.write().await;
        let Some(entry) = sessions.get_mut(session_id) else {
            return false;
        };
        entry.clear_dead_attachment();
        (entry.keep_alive, entry.comms_runtime.clone())
    };

    // A failed state lookup is treated as `Destroyed`.
    let state = match self.runtime_state(session_id).await {
        Ok(state) => state,
        Err(_) => RuntimeState::Destroyed,
    };
    let desired = desired_peer_ingress_mode(state, comms_runtime.is_some(), keep_alive);

    // Without a comms runtime nothing can be spawned; abort any existing
    // drain when no mode is desired.
    let Some(comms) = comms_runtime else {
        if desired.is_none() {
            self.abort_comms_drain(session_id).await;
        }
        return false;
    };

    // The slots write lock is held for the rest of the function.
    let mut slots = self.comms_drain_slots.write().await;
    let slot = slots
        .entry(session_id.clone())
        .or_insert_with(CommsDrainSlot::new);

    let Some(mode) = desired else {
        abort_slot(slot);
        return false;
    };

    let result =
        match protocol_comms_drain_spawn::execute_ensure_running(&mut slot.authority, mode) {
            Ok(r) => r,
            Err(e) => {
                // Logged at trace level only.
                // NOTE(review): presumably this is the expected outcome when a
                // drain is already running — confirm against the authority.
                tracing::trace!(error = %e, "comms drain authority rejected EnsureRunning");
                return false;
            }
        };

    // Apply the effects requested by the transition to the slot.
    for effect in &result.effects {
        match effect {
            CommsDrainLifecycleEffect::SpawnDrainTask { .. } => {
                let handle = crate::comms_drain::spawn_comms_drain(
                    Arc::clone(self),
                    session_id.clone(),
                    comms.clone(),
                );
                slot.handle = Some(handle);
            }
            CommsDrainLifecycleEffect::AbortDrainTask => {
                if let Some(handle) = slot.handle.take() {
                    handle.abort();
                }
            }
        }
    }

    let Some(obligation) = result.obligation else {
        tracing::warn!(
            %session_id,
            "comms drain spawn transition emitted no obligation"
        );
        return false;
    };

    // Report the spawn back to the authority. Effects returned by this
    // feedback step are not applied here, and a rejection is only traced.
    match protocol_comms_drain_spawn::submit_task_spawned(&mut slot.authority, obligation) {
        Ok(_effects) => {}
        Err(e) => {
            tracing::trace!(error = %e, "comms drain authority rejected TaskSpawned");
        }
    }
    true
}
1492
1493 pub async fn notify_comms_drain_exited(
1499 self: &Arc<Self>,
1500 session_id: &SessionId,
1501 reason: DrainExitReason,
1502 ) {
1503 {
1504 let mut slots = self.comms_drain_slots.write().await;
1505 if let Some(slot) = slots.get_mut(session_id) {
1506 slot.handle.take(); match protocol_comms_drain_spawn::notify_task_exited(&mut slot.authority, reason) {
1508 Ok(_effects) => {}
1509 Err(e) => {
1510 tracing::warn!(error = %e, "comms drain authority rejected TaskExited");
1511 }
1512 }
1513 }
1514 }
1515 let _ = self.reconcile_peer_ingress(session_id).await;
1516 }
1517
1518 pub async fn abort_comms_drains(&self) {
1520 let mut slots = self.comms_drain_slots.write().await;
1521 for (_, slot) in slots.iter_mut() {
1522 abort_slot(slot);
1523 }
1524 }
1525
1526 pub async fn abort_comms_drain(&self, session_id: &SessionId) {
1528 let mut slots = self.comms_drain_slots.write().await;
1529 if let Some(slot) = slots.get_mut(session_id) {
1530 abort_slot(slot);
1531 }
1532 }
1533
1534 pub async fn wait_comms_drain(&self, session_id: &SessionId) {
1541 let handle = {
1542 let mut slots = self.comms_drain_slots.write().await;
1543 slots
1544 .get_mut(session_id)
1545 .and_then(|slot| slot.handle.take())
1546 };
1547 if let Some(handle) = handle {
1548 let _ = handle.await;
1549 }
1550 let mut slots = self.comms_drain_slots.write().await;
1554 if let Some(slot) = slots.get_mut(session_id)
1555 && slot.authority.phase()
1556 == meerkat_core::comms_drain_lifecycle_authority::CommsDrainPhase::Running
1557 {
1558 tracing::warn!(
1559 "comms_drain: task exited without notifying authority (likely panicked), \
1560 submitting Failed safety net"
1561 );
1562 match protocol_comms_drain_spawn::notify_task_exited(
1563 &mut slot.authority,
1564 DrainExitReason::Failed,
1565 ) {
1566 Ok(effects) => {
1567 apply_runtime_drain_effects(slot, &effects);
1568 }
1569 Err(e) => {
1570 tracing::warn!(error = %e, "comms drain authority rejected safety-net TaskExited");
1571 }
1572 }
1573 }
1574 }
1575}
1576
1577#[cfg_attr(not(target_arch = "wasm32"), async_trait::async_trait)]
1578#[cfg_attr(target_arch = "wasm32", async_trait::async_trait(?Send))]
1579impl SessionServiceRuntimeExt for RuntimeSessionAdapter {
1580 fn runtime_mode(&self) -> RuntimeMode {
1581 self.mode
1582 }
1583
1584 async fn accept_input(
1585 &self,
1586 session_id: &SessionId,
1587 input: Input,
1588 ) -> Result<AcceptOutcome, RuntimeDriverError> {
1589 let (driver, wake_tx, _control_tx) = {
1590 let sessions = self.sessions.read().await;
1591 let entry = sessions
1592 .get(session_id)
1593 .ok_or(RuntimeDriverError::NotReady {
1594 state: RuntimeState::Destroyed,
1595 })?;
1596 (
1597 entry.driver.clone(),
1598 entry.wake_sender(),
1599 entry.control_sender(),
1600 )
1601 };
1602
1603 let (outcome, signal) = {
1605 let mut driver = driver.lock().await;
1606 let result = driver.as_driver_mut().accept_input(input).await?;
1607 let signal = driver.take_post_admission_signal();
1608 (result, signal)
1609 };
1610
1611 if signal.should_wake() {
1613 match wake_tx {
1614 Some(ref wake_tx) => {
1615 let _ = wake_tx.try_send(());
1616 }
1617 None => {
1618 tracing::warn!(
1619 %session_id,
1620 "input accepted but runtime loop is not attached — \
1621 wake signal dropped, input will remain queued until \
1622 a loop is re-attached"
1623 );
1624 }
1625 }
1626 }
1627
1628 Ok(outcome)
1629 }
1630
1631 async fn accept_input_with_completion(
1632 &self,
1633 session_id: &SessionId,
1634 input: Input,
1635 ) -> Result<(AcceptOutcome, Option<crate::completion::CompletionHandle>), RuntimeDriverError>
1636 {
1637 RuntimeSessionAdapter::accept_input_with_completion(self, session_id, input).await
1638 }
1639
1640 async fn runtime_state(
1641 &self,
1642 session_id: &SessionId,
1643 ) -> Result<RuntimeState, RuntimeDriverError> {
1644 let driver = {
1645 let sessions = self.sessions.read().await;
1646 let entry = sessions
1647 .get(session_id)
1648 .ok_or(RuntimeDriverError::NotReady {
1649 state: RuntimeState::Destroyed,
1650 })?;
1651 entry.driver.clone()
1652 };
1653 let driver = driver.lock().await;
1654 Ok(driver.as_driver().runtime_state())
1655 }
1656
1657 async fn retire_runtime(
1658 &self,
1659 session_id: &SessionId,
1660 ) -> Result<RetireReport, RuntimeDriverError> {
1661 let (driver_handle, completions, wake_tx) = {
1662 let sessions = self.sessions.read().await;
1663 let entry = sessions
1664 .get(session_id)
1665 .ok_or(RuntimeDriverError::NotReady {
1666 state: RuntimeState::Destroyed,
1667 })?;
1668 (
1669 entry.driver.clone(),
1670 entry.completions.clone(),
1671 entry.wake_sender(),
1672 )
1673 };
1674 let mut driver = driver_handle.lock().await;
1675 let mut report = driver.as_driver_mut().retire().await?;
1676 drop(driver); if report.inputs_pending_drain > 0 {
1679 if let Some(ref wake_tx) = wake_tx
1682 && wake_tx.send(()).await.is_ok()
1683 {
1684 return Ok(report);
1685 }
1686
1687 let mut driver = driver_handle.lock().await;
1691 let abandoned = driver
1692 .abandon_pending_inputs(crate::input_state::InputAbandonReason::Retired)
1693 .await?;
1694 drop(driver);
1695 let mut completions = completions.lock().await;
1696 completions.resolve_all_terminated("retired without runtime loop");
1697 report.inputs_abandoned += abandoned;
1698 report.inputs_pending_drain = 0;
1699 }
1700
1701 Ok(report)
1702 }
1703
1704 async fn reset_runtime(
1705 &self,
1706 session_id: &SessionId,
1707 ) -> Result<ResetReport, RuntimeDriverError> {
1708 let (driver, completions) = {
1709 let sessions = self.sessions.read().await;
1710 let entry = sessions
1711 .get(session_id)
1712 .ok_or(RuntimeDriverError::NotReady {
1713 state: RuntimeState::Destroyed,
1714 })?;
1715 (entry.driver.clone(), entry.completions.clone())
1716 };
1717 let mut driver = driver.lock().await;
1718 if matches!(driver.as_driver().runtime_state(), RuntimeState::Running) {
1719 return Err(RuntimeDriverError::NotReady {
1720 state: RuntimeState::Running,
1721 });
1722 }
1723 let report = driver.as_driver_mut().reset().await?;
1724 drop(driver);
1725
1726 let mut completions = completions.lock().await;
1728 completions.resolve_all_terminated("runtime reset");
1729
1730 Ok(report)
1731 }
1732
1733 async fn input_state(
1734 &self,
1735 session_id: &SessionId,
1736 input_id: &InputId,
1737 ) -> Result<Option<InputState>, RuntimeDriverError> {
1738 let driver = {
1739 let sessions = self.sessions.read().await;
1740 let entry = sessions
1741 .get(session_id)
1742 .ok_or(RuntimeDriverError::NotReady {
1743 state: RuntimeState::Destroyed,
1744 })?;
1745 entry.driver.clone()
1746 };
1747 let driver = driver.lock().await;
1748 Ok(driver.as_driver().input_state(input_id).cloned())
1749 }
1750
1751 async fn list_active_inputs(
1752 &self,
1753 session_id: &SessionId,
1754 ) -> Result<Vec<InputId>, RuntimeDriverError> {
1755 let driver = {
1756 let sessions = self.sessions.read().await;
1757 let entry = sessions
1758 .get(session_id)
1759 .ok_or(RuntimeDriverError::NotReady {
1760 state: RuntimeState::Destroyed,
1761 })?;
1762 entry.driver.clone()
1763 };
1764 let driver = driver.lock().await;
1765 Ok(driver.as_driver().active_input_ids())
1766 }
1767}
1768
1769impl RuntimeSessionAdapter {
1774 fn resolve_session_id(
1780 runtime_id: &LogicalRuntimeId,
1781 ) -> Result<SessionId, RuntimeControlPlaneError> {
1782 runtime_id
1783 .0
1784 .parse::<uuid::Uuid>()
1785 .map(SessionId)
1786 .map_err(|_| RuntimeControlPlaneError::NotFound(runtime_id.clone()))
1787 }
1788
1789 async fn lookup_entry(
1792 &self,
1793 runtime_id: &LogicalRuntimeId,
1794 ) -> Result<
1795 (
1796 SessionId,
1797 SharedDriver,
1798 SharedCompletionRegistry,
1799 Option<mpsc::Sender<()>>,
1800 ),
1801 RuntimeControlPlaneError,
1802 > {
1803 let session_id = Self::resolve_session_id(runtime_id)?;
1804 let sessions = self.sessions.read().await;
1805 let entry = sessions
1806 .get(&session_id)
1807 .ok_or_else(|| RuntimeControlPlaneError::NotFound(runtime_id.clone()))?;
1808 Ok((
1809 session_id,
1810 entry.driver.clone(),
1811 entry.completions.clone(),
1812 entry.wake_sender(),
1813 ))
1814 }
1815}
1816
// Control-plane surface of the adapter: every operation resolves the logical
// runtime id to a registered session via `lookup_entry` and maps driver
// errors to `RuntimeControlPlaneError::Internal` using their string form.
#[cfg_attr(not(target_arch = "wasm32"), async_trait::async_trait)]
#[cfg_attr(target_arch = "wasm32", async_trait::async_trait(?Send))]
impl crate::traits::RuntimeControlPlane for RuntimeSessionAdapter {
    /// Submits `input` to the runtime's driver and wakes the runtime loop
    /// when the post-admission signal asks for it and a wake channel exists.
    async fn ingest(
        &self,
        runtime_id: &LogicalRuntimeId,
        input: Input,
    ) -> Result<AcceptOutcome, RuntimeControlPlaneError> {
        // The control sender is fetched with a second read of the session
        // map; neither it nor the completion registry is used below.
        let (session_id, driver, _completions, wake_tx, _control_tx) = {
            let (sid, d, c, w) = self.lookup_entry(runtime_id).await?;
            let ctrl = {
                let sessions = self.sessions.read().await;
                sessions
                    .get(&sid)
                    .and_then(RuntimeSessionEntry::control_sender)
            };
            (sid, d, c, w, ctrl)
        };
        let _ = session_id;

        // Admit the input and take the signal under one driver lock.
        let (outcome, signal) = {
            let mut drv = driver.lock().await;
            let result = drv
                .as_driver_mut()
                .accept_input(input)
                .await
                .map_err(|e| RuntimeControlPlaneError::Internal(e.to_string()))?;
            let signal = drv.take_post_admission_signal();
            (result, signal)
        };

        // Best-effort wake; a failed `try_send` is ignored.
        if signal.should_wake()
            && let Some(ref tx) = wake_tx
        {
            let _ = tx.try_send(());
        }

        Ok(outcome)
    }

    /// Forwards a runtime event envelope to the driver of the runtime named
    /// in the envelope.
    async fn publish_event(
        &self,
        event: crate::runtime_event::RuntimeEventEnvelope,
    ) -> Result<(), RuntimeControlPlaneError> {
        let runtime_id = event.runtime_id.clone();
        let (_session_id, driver, _completions, _wake_tx) = self.lookup_entry(&runtime_id).await?;

        let mut drv = driver.lock().await;
        drv.as_driver_mut()
            .on_runtime_event(event)
            .await
            .map_err(|e| RuntimeControlPlaneError::Internal(e.to_string()))
    }

    /// Retires the runtime's driver.
    ///
    /// When inputs are pending drain and the wake send succeeds, the report
    /// is returned as-is. Otherwise the pending inputs are abandoned with
    /// reason `Retired`, all completions are resolved as terminated, and the
    /// report's counters are adjusted accordingly.
    async fn retire(
        &self,
        runtime_id: &LogicalRuntimeId,
    ) -> Result<RetireReport, RuntimeControlPlaneError> {
        let (session_id, driver, completions, wake_tx) = self.lookup_entry(runtime_id).await?;
        let _ = session_id;

        let mut drv = driver.lock().await;
        let mut report = drv
            .as_driver_mut()
            .retire()
            .await
            .map_err(|e| RuntimeControlPlaneError::Internal(e.to_string()))?;
        // Release the driver before awaiting the wake channel.
        drop(drv);

        if report.inputs_pending_drain > 0 {
            if let Some(ref tx) = wake_tx
                && tx.send(()).await.is_ok()
            {
                return Ok(report);
            }

            // No wake channel, or the send failed: abandon the queued inputs.
            let mut drv = driver.lock().await;
            let abandoned = drv
                .abandon_pending_inputs(crate::input_state::InputAbandonReason::Retired)
                .await
                .map_err(|e| RuntimeControlPlaneError::Internal(e.to_string()))?;
            drop(drv);
            let mut comp = completions.lock().await;
            comp.resolve_all_terminated("retired without runtime loop");
            report.inputs_abandoned += abandoned;
            report.inputs_pending_drain = 0;
        }

        Ok(report)
    }

    /// Recycles the runtime's driver via `recycle_preserving_work`.
    ///
    /// Rejected with `InvalidState` while the driver is running. Reports the
    /// value returned by the driver's recycle as `inputs_transferred`.
    async fn recycle(
        &self,
        runtime_id: &LogicalRuntimeId,
    ) -> Result<RecycleReport, RuntimeControlPlaneError> {
        let (_session_id, driver, completions, wake_tx) = self.lookup_entry(runtime_id).await?;

        let (transferred, active_after_recycle) = {
            let mut drv = driver.lock().await;
            let state = drv.as_driver().runtime_state();
            if matches!(state, RuntimeState::Running) {
                return Err(RuntimeControlPlaneError::InvalidState { state });
            }
            // Remember whether the driver was attached before recycling.
            let should_restore_attached = matches!(state, RuntimeState::Attached);

            // The ephemeral driver's recycle is synchronous; the persistent
            // one is awaited.
            let transferred = match &mut *drv {
                DriverEntry::Ephemeral(driver) => driver
                    .recycle_preserving_work()
                    .map_err(|e| RuntimeControlPlaneError::Internal(e.to_string()))?,
                DriverEntry::Persistent(driver) => driver
                    .recycle_preserving_work()
                    .await
                    .map_err(|e| RuntimeControlPlaneError::Internal(e.to_string()))?,
            };

            // Re-attach only if the driver was attached and recycling left
            // it idle.
            if should_restore_attached
                && matches!(drv.as_driver().runtime_state(), RuntimeState::Idle)
            {
                drv.attach()
                    .map_err(|e| RuntimeControlPlaneError::Internal(e.to_string()))?;
            }

            let active_after_recycle = drv.as_driver().active_input_ids();
            (transferred, active_after_recycle)
        };

        {
            // NOTE(review): presumably resolves waiters whose input is no
            // longer among the active ids — confirm against
            // `resolve_not_pending`.
            let pending_after: HashSet<InputId> = active_after_recycle.into_iter().collect();
            let mut comp = completions.lock().await;
            comp.resolve_not_pending(
                |input_id| pending_after.contains(input_id),
                "recycled input no longer pending",
            );
        }

        // Best-effort wake whenever a wake channel exists.
        if let Some(ref tx) = wake_tx {
            let _ = tx.try_send(());
        }

        Ok(RecycleReport {
            inputs_transferred: transferred,
        })
    }

    /// Resets the runtime's driver and resolves all completions as
    /// terminated. Rejected with `InvalidState { Running }` while running.
    async fn reset(
        &self,
        runtime_id: &LogicalRuntimeId,
    ) -> Result<crate::traits::ResetReport, RuntimeControlPlaneError> {
        let (_session_id, driver, completions, _wake_tx) = self.lookup_entry(runtime_id).await?;

        let mut drv = driver.lock().await;
        if matches!(drv.as_driver().runtime_state(), RuntimeState::Running) {
            return Err(RuntimeControlPlaneError::InvalidState {
                state: RuntimeState::Running,
            });
        }
        let report = drv
            .as_driver_mut()
            .reset()
            .await
            .map_err(|e| RuntimeControlPlaneError::Internal(e.to_string()))?;
        // The driver lock is released before the completion registry is locked.
        drop(drv);

        let mut comp = completions.lock().await;
        comp.resolve_all_terminated("runtime reset");

        Ok(report)
    }

    /// Runs the driver's recovery and then sends a best-effort wake when a
    /// wake channel exists.
    async fn recover(
        &self,
        runtime_id: &LogicalRuntimeId,
    ) -> Result<RecoveryReport, RuntimeControlPlaneError> {
        let (_session_id, driver, _completions, wake_tx) = self.lookup_entry(runtime_id).await?;

        let mut drv = driver.lock().await;
        let report = drv
            .as_driver_mut()
            .recover()
            .await
            .map_err(|e| RuntimeControlPlaneError::Internal(e.to_string()))?;
        drop(drv);

        if let Some(ref tx) = wake_tx {
            let _ = tx.try_send(());
        }

        Ok(report)
    }

    /// Destroys the runtime's driver and resolves all completions as
    /// terminated. The session entry itself is not removed here.
    async fn destroy(
        &self,
        runtime_id: &LogicalRuntimeId,
    ) -> Result<DestroyReport, RuntimeControlPlaneError> {
        let (_session_id, driver, completions, _wake_tx) = self.lookup_entry(runtime_id).await?;

        let mut drv = driver.lock().await;
        let report = drv
            .as_driver_mut()
            .destroy()
            .await
            .map_err(|e| RuntimeControlPlaneError::Internal(e.to_string()))?;
        drop(drv);

        let mut comp = completions.lock().await;
        comp.resolve_all_terminated("runtime destroyed");

        Ok(report)
    }

    /// Returns the driver's current runtime state.
    async fn runtime_state(
        &self,
        runtime_id: &LogicalRuntimeId,
    ) -> Result<RuntimeState, RuntimeControlPlaneError> {
        let (_session_id, driver, _completions, _wake_tx) = self.lookup_entry(runtime_id).await?;

        let drv = driver.lock().await;
        Ok(drv.as_driver().runtime_state())
    }

    /// Loads a run boundary receipt from the adapter's store.
    ///
    /// Store failures are reported as `StoreError`. This method does not
    /// check that the runtime is registered.
    async fn load_boundary_receipt(
        &self,
        runtime_id: &LogicalRuntimeId,
        run_id: &RunId,
        sequence: u64,
    ) -> Result<Option<meerkat_core::lifecycle::RunBoundaryReceipt>, RuntimeControlPlaneError> {
        match &self.store {
            Some(store) => store
                .load_boundary_receipt(runtime_id, run_id, sequence)
                .await
                .map_err(|e| RuntimeControlPlaneError::StoreError(e.to_string())),
            None => {
                // No store is configured on this adapter, so no receipt is
                // reported.
                Ok(None)
            }
        }
    }
}
2059
#[cfg(test)]
#[allow(clippy::expect_used, clippy::panic, clippy::unwrap_used)]
mod tests {
    use super::*;
    use std::sync::atomic::{AtomicBool, Ordering};
    use std::time::Duration;

    use meerkat_core::agent::CommsRuntime;
    use meerkat_core::comms_drain_lifecycle_authority::{CommsDrainMode, CommsDrainPhase};
    use tokio::sync::Notify;

    /// Minimal comms runtime for drain tests: it never yields messages or
    /// peer input candidates and reports a fixed dismiss flag.
    struct FakeDrainRuntime {
        notify: Arc<Notify>,
        dismiss: AtomicBool,
    }

    impl FakeDrainRuntime {
        /// Builds a fake whose `dismiss_received` reports `dismiss`.
        fn with_dismiss(dismiss: bool) -> Self {
            Self {
                notify: Arc::new(Notify::new()),
                dismiss: AtomicBool::new(dismiss),
            }
        }

        /// A fake that reports a dismiss as already received.
        fn dismissing() -> Self {
            Self::with_dismiss(true)
        }

        /// A fake that never reports a dismiss.
        fn idle() -> Self {
            Self::with_dismiss(false)
        }
    }

    #[async_trait::async_trait]
    impl CommsRuntime for FakeDrainRuntime {
        async fn drain_messages(&self) -> Vec<String> {
            vec![]
        }

        fn inbox_notify(&self) -> Arc<Notify> {
            self.notify.clone()
        }

        fn dismiss_received(&self) -> bool {
            self.dismiss.load(Ordering::Acquire)
        }

        async fn drain_peer_input_candidates(
            &self,
        ) -> Vec<meerkat_core::interaction::PeerInputCandidate> {
            vec![]
        }
    }

    /// Registers the session and drives its drain slot through the spawn
    /// protocol by hand: ensure-running, apply effects, spawn the drain task
    /// for each spawn effect, then submit the task-spawned feedback.
    async fn spawn_test_comms_drain(
        adapter: &Arc<RuntimeSessionAdapter>,
        session_id: &SessionId,
        mode: CommsDrainMode,
        comms_runtime: Arc<dyn CommsRuntime>,
    ) {
        adapter.register_session(session_id.clone()).await;

        let mut slots = adapter.comms_drain_slots.write().await;
        let slot = slots
            .entry(session_id.clone())
            .or_insert_with(CommsDrainSlot::new);

        let result = protocol_comms_drain_spawn::execute_ensure_running(&mut slot.authority, mode)
            .expect("ensure running");
        let obligation = result
            .obligation
            .expect("spawn obligation should be present");

        apply_runtime_drain_effects(slot, &result.effects);
        for effect in &result.effects {
            match effect {
                CommsDrainLifecycleEffect::SpawnDrainTask { .. } => {
                    let task = crate::comms_drain::spawn_comms_drain(
                        Arc::clone(adapter),
                        session_id.clone(),
                        Arc::clone(&comms_runtime),
                    );
                    slot.handle = Some(task);
                }
                _ => {}
            }
        }

        let feedback_effects =
            protocol_comms_drain_spawn::submit_task_spawned(&mut slot.authority, obligation)
                .expect("task spawned");
        apply_runtime_drain_effects(slot, &feedback_effects);
    }

    /// Current authority phase of the session's drain slot, if a slot exists.
    async fn current_phase(
        adapter: &Arc<RuntimeSessionAdapter>,
        session_id: &SessionId,
    ) -> Option<CommsDrainPhase> {
        let slots = adapter.comms_drain_slots.read().await;
        let slot = slots.get(session_id)?;
        Some(slot.authority.phase())
    }

    /// Whether the session's drain slot currently stores a task handle.
    async fn handle_present(adapter: &Arc<RuntimeSessionAdapter>, session_id: &SessionId) -> bool {
        let slots = adapter.comms_drain_slots.read().await;
        slots
            .get(session_id)
            .is_some_and(|slot| slot.handle.is_some())
    }

    /// Polls every 5 ms until the slot reaches `expected`, panicking if that
    /// does not happen within one second.
    async fn wait_for_phase(
        adapter: &Arc<RuntimeSessionAdapter>,
        session_id: &SessionId,
        expected: CommsDrainPhase,
    ) {
        let poll = async {
            while current_phase(adapter, session_id).await != Some(expected) {
                tokio::time::sleep(Duration::from_millis(5)).await;
            }
        };
        tokio::time::timeout(Duration::from_secs(1), poll)
            .await
            .expect("phase transition");
    }

    #[tokio::test]
    async fn dismiss_exit_updates_authority_before_join() {
        let adapter = Arc::new(RuntimeSessionAdapter::ephemeral());
        let session_id = SessionId::new();
        let comms_runtime: Arc<dyn CommsRuntime> = Arc::new(FakeDrainRuntime::dismissing());

        spawn_test_comms_drain(
            &adapter,
            &session_id,
            CommsDrainMode::PersistentHost,
            comms_runtime,
        )
        .await;

        wait_for_phase(&adapter, &session_id, CommsDrainPhase::Stopped).await;
        let still_has_handle = handle_present(&adapter, &session_id).await;
        assert!(
            !still_has_handle,
            "drain task should clear its slot before wait_comms_drain joins"
        );

        adapter.wait_comms_drain(&session_id).await;
        let phase_after_join = current_phase(&adapter, &session_id).await;
        assert_eq!(phase_after_join, Some(CommsDrainPhase::Stopped));
    }

    #[test]
    fn desired_peer_ingress_mode_tracks_live_attachment_and_keep_alive() {
        let attached = Some(CommsDrainMode::AttachedSession);
        let persistent = Some(CommsDrainMode::PersistentHost);

        // Attached, running and recovering runtimes with a comms runtime
        // drain as an attached session.
        assert_eq!(
            desired_peer_ingress_mode(RuntimeState::Attached, true, false),
            attached
        );
        assert_eq!(
            desired_peer_ingress_mode(RuntimeState::Running, true, false),
            attached
        );
        assert_eq!(
            desired_peer_ingress_mode(RuntimeState::Recovering, true, false),
            attached
        );

        // An idle runtime only drains when keep-alive is set.
        assert_eq!(
            desired_peer_ingress_mode(RuntimeState::Idle, true, true),
            persistent
        );
        assert_eq!(
            desired_peer_ingress_mode(RuntimeState::Idle, true, false),
            None
        );

        // No comms runtime means no drain.
        assert_eq!(
            desired_peer_ingress_mode(RuntimeState::Attached, false, true),
            None
        );

        // These states never drain, even with comms and keep-alive.
        assert_eq!(
            desired_peer_ingress_mode(RuntimeState::Initializing, true, true),
            None
        );
        assert_eq!(
            desired_peer_ingress_mode(RuntimeState::Retired, true, true),
            None
        );
        assert_eq!(
            desired_peer_ingress_mode(RuntimeState::Stopped, true, true),
            None
        );
        assert_eq!(
            desired_peer_ingress_mode(RuntimeState::Destroyed, true, true),
            None
        );
    }

    #[tokio::test]
    async fn unregister_session_aborts_and_removes_drain_slot() {
        let adapter = Arc::new(RuntimeSessionAdapter::ephemeral());
        let session_id = SessionId::new();
        let comms_runtime: Arc<dyn CommsRuntime> = Arc::new(FakeDrainRuntime::idle());

        adapter.register_session(session_id.clone()).await;
        spawn_test_comms_drain(
            &adapter,
            &session_id,
            CommsDrainMode::PersistentHost,
            comms_runtime,
        )
        .await;

        let phase_before = current_phase(&adapter, &session_id).await;
        assert_eq!(phase_before, Some(CommsDrainPhase::Running));
        assert!(handle_present(&adapter, &session_id).await);

        adapter.unregister_session(&session_id).await;

        let slots = adapter.comms_drain_slots.read().await;
        let slot_remains = slots.contains_key(&session_id);
        assert!(
            !slot_remains,
            "unregister must remove the comms drain slot entirely"
        );
    }
}