1use std::collections::{HashMap, HashSet};
14use std::future::Future;
15use std::sync::Arc;
16
17use meerkat_core::BlobStore;
18use meerkat_core::comms_drain_lifecycle_authority::{
19 CommsDrainLifecycleAuthority, CommsDrainLifecycleEffect, CommsDrainMode, DrainExitReason,
20};
21use meerkat_core::generated::{protocol_comms_drain_abort, protocol_comms_drain_spawn};
22use meerkat_core::lifecycle::core_executor::CoreApplyOutput;
23use meerkat_core::lifecycle::run_control::RunControlCommand;
24use meerkat_core::lifecycle::{InputId, RunId};
25use meerkat_core::types::SessionId;
26
27use crate::accept::AcceptOutcome;
28use crate::driver::ephemeral::EphemeralRuntimeDriver;
29use crate::driver::persistent::PersistentRuntimeDriver;
30use crate::identifiers::LogicalRuntimeId;
31use crate::input::Input;
32use crate::input_lifecycle_authority::InputLifecycleError;
33use crate::input_state::InputState;
34use crate::runtime_state::{RuntimeState, RuntimeStateTransitionError};
35use crate::service_ext::{RuntimeMode, SessionServiceRuntimeExt};
36use crate::store::RuntimeStore;
37use crate::tokio;
38use crate::tokio::sync::{Mutex, RwLock, mpsc};
39use crate::traits::{
40 DestroyReport, RecoveryReport, RecycleReport, ResetReport, RetireReport,
41 RuntimeControlPlaneError, RuntimeDriver, RuntimeDriverError,
42};
43
44pub(crate) type SharedDriver = Arc<Mutex<DriverEntry>>;
46
47pub(crate) enum DriverEntry {
49 Ephemeral(EphemeralRuntimeDriver),
50 Persistent(PersistentRuntimeDriver),
51}
52
53impl DriverEntry {
54 pub(crate) fn as_driver(&self) -> &dyn RuntimeDriver {
55 match self {
56 DriverEntry::Ephemeral(d) => d,
57 DriverEntry::Persistent(d) => d,
58 }
59 }
60
61 pub(crate) fn as_driver_mut(&mut self) -> &mut dyn RuntimeDriver {
62 match self {
63 DriverEntry::Ephemeral(d) => d,
64 DriverEntry::Persistent(d) => d,
65 }
66 }
67
68 pub(crate) fn set_silent_comms_intents(&mut self, intents: Vec<String>) {
70 match self {
71 DriverEntry::Ephemeral(d) => d.set_silent_comms_intents(intents),
72 DriverEntry::Persistent(d) => d.set_silent_comms_intents(intents),
73 }
74 }
75
76 pub(crate) fn is_idle_or_attached(&self) -> bool {
78 match self {
79 DriverEntry::Ephemeral(d) => d.is_idle_or_attached(),
80 DriverEntry::Persistent(d) => d.is_idle_or_attached(),
81 }
82 }
83
84 pub(crate) fn attach(&mut self) -> Result<(), RuntimeStateTransitionError> {
86 match self {
87 DriverEntry::Ephemeral(d) => d.attach(),
88 DriverEntry::Persistent(d) => d.attach(),
89 }
90 }
91
92 pub(crate) fn detach(
94 &mut self,
95 ) -> Result<Option<crate::runtime_state::RuntimeState>, RuntimeStateTransitionError> {
96 match self {
97 DriverEntry::Ephemeral(d) => d.detach(),
98 DriverEntry::Persistent(d) => d.detach(),
99 }
100 }
101
102 pub(crate) fn can_process_queue(&self) -> bool {
104 match self {
105 DriverEntry::Ephemeral(d) => d.control().can_process_queue(),
106 DriverEntry::Persistent(d) => d.inner_ref().control().can_process_queue(),
107 }
108 }
109
110 pub(crate) fn take_wake_requested(&mut self) -> bool {
112 match self {
113 DriverEntry::Ephemeral(d) => d.take_wake_requested(),
114 DriverEntry::Persistent(d) => d.take_wake_requested(),
115 }
116 }
117
118 pub(crate) fn take_process_requested(&mut self) -> bool {
120 match self {
121 DriverEntry::Ephemeral(d) => d.take_process_requested(),
122 DriverEntry::Persistent(d) => d.take_process_requested(),
123 }
124 }
125
126 pub(crate) fn dequeue_next(&mut self) -> Option<(InputId, Input)> {
128 match self {
129 DriverEntry::Ephemeral(d) => d.dequeue_next(),
130 DriverEntry::Persistent(d) => d.dequeue_next(),
131 }
132 }
133
134 pub(crate) fn dequeue_by_id(&mut self, input_id: &InputId) -> Option<(InputId, Input)> {
136 match self {
137 DriverEntry::Ephemeral(d) => d.dequeue_by_id(input_id),
138 DriverEntry::Persistent(d) => d.dequeue_by_id(input_id),
139 }
140 }
141
142 pub(crate) fn ingress(&self) -> &crate::runtime_ingress_authority::RuntimeIngressAuthority {
144 match self {
145 DriverEntry::Ephemeral(d) => d.ingress(),
146 DriverEntry::Persistent(d) => d.inner_ref().ingress(),
147 }
148 }
149
150 pub(crate) fn has_queued_input_outside(&self, excluded: &[InputId]) -> bool {
151 match self {
152 DriverEntry::Ephemeral(d) => d.has_queued_input_outside(excluded),
153 DriverEntry::Persistent(d) => d.has_queued_input_outside(excluded),
154 }
155 }
156
157 pub(crate) fn start_run(&mut self, run_id: RunId) -> Result<(), RuntimeStateTransitionError> {
159 match self {
160 DriverEntry::Ephemeral(d) => d.start_run(run_id),
161 DriverEntry::Persistent(d) => d.start_run(run_id),
162 }
163 }
164
165 pub(crate) fn complete_run(&mut self) -> Result<RunId, RuntimeStateTransitionError> {
167 match self {
168 DriverEntry::Ephemeral(d) => d.complete_run(),
169 DriverEntry::Persistent(d) => d.complete_run(),
170 }
171 }
172
173 pub(crate) fn stage_input(
175 &mut self,
176 input_id: &InputId,
177 run_id: &RunId,
178 ) -> Result<(), InputLifecycleError> {
179 match self {
180 DriverEntry::Ephemeral(d) => d.stage_input(input_id, run_id),
181 DriverEntry::Persistent(d) => d.stage_input(input_id, run_id),
182 }
183 }
184
185 pub(crate) fn stage_batch(
187 &mut self,
188 input_ids: &[InputId],
189 run_id: &RunId,
190 ) -> Result<(), InputLifecycleError> {
191 match self {
192 DriverEntry::Ephemeral(d) => d.stage_batch(input_ids, run_id),
193 DriverEntry::Persistent(d) => d.stage_batch(input_ids, run_id),
194 }
195 }
196
197 pub(crate) fn rollback_staged(
199 &mut self,
200 input_ids: &[InputId],
201 ) -> Result<(), InputLifecycleError> {
202 match self {
203 DriverEntry::Ephemeral(d) => d.rollback_staged(input_ids),
204 DriverEntry::Persistent(d) => d.rollback_staged(input_ids),
205 }
206 }
207
208 pub(crate) async fn abandon_pending_inputs(
209 &mut self,
210 reason: crate::input_state::InputAbandonReason,
211 ) -> Result<usize, RuntimeDriverError> {
212 match self {
213 DriverEntry::Ephemeral(d) => Ok(d.abandon_pending_inputs(reason)),
214 DriverEntry::Persistent(d) => d.abandon_pending_inputs(reason).await,
215 }
216 }
217}
218
219pub(crate) type SharedCompletionRegistry = Arc<Mutex<crate::completion::CompletionRegistry>>;
221
222struct RuntimeSessionEntry {
224 driver: SharedDriver,
226 ops_lifecycle: Arc<crate::ops_lifecycle::RuntimeOpsLifecycleRegistry>,
228 completions: SharedCompletionRegistry,
230 attachment: Option<RuntimeLoopAttachment>,
232}
233
234struct RuntimeLoopAttachment {
239 wake_tx: mpsc::Sender<()>,
240 control_tx: mpsc::Sender<RunControlCommand>,
241 _loop_handle: tokio::task::JoinHandle<()>,
242}
243
244impl RuntimeSessionEntry {
245 fn attachment_is_live(&self) -> bool {
246 self.attachment
247 .as_ref()
248 .map(|attachment| !attachment.wake_tx.is_closed() && !attachment.control_tx.is_closed())
249 .unwrap_or(false)
250 }
251
252 fn has_attachment(&self) -> bool {
253 self.attachment_is_live()
254 }
255
256 fn attach_runtime_loop(
257 &mut self,
258 wake_tx: mpsc::Sender<()>,
259 control_tx: mpsc::Sender<RunControlCommand>,
260 loop_handle: tokio::task::JoinHandle<()>,
261 ) {
262 self.attachment = Some(RuntimeLoopAttachment {
263 wake_tx,
264 control_tx,
265 _loop_handle: loop_handle,
266 });
267 }
268
269 fn clear_dead_attachment(&mut self) -> bool {
270 if self.attachment.is_some() && !self.attachment_is_live() {
271 self.attachment = None;
272 return true;
273 }
274 false
275 }
276
277 fn wake_sender(&self) -> Option<mpsc::Sender<()>> {
278 if !self.attachment_is_live() {
279 return None;
280 }
281 self.attachment
282 .as_ref()
283 .map(|attachment| attachment.wake_tx.clone())
284 }
285
286 fn control_sender(&self) -> Option<mpsc::Sender<RunControlCommand>> {
287 if !self.attachment_is_live() {
288 return None;
289 }
290 self.attachment
291 .as_ref()
292 .map(|attachment| attachment.control_tx.clone())
293 }
294}
295
296struct CommsDrainSlot {
301 authority: CommsDrainLifecycleAuthority,
302 handle: Option<tokio::task::JoinHandle<()>>,
303}
304
305impl CommsDrainSlot {
306 fn new() -> Self {
307 Self {
308 authority: CommsDrainLifecycleAuthority::new(),
309 handle: None,
310 }
311 }
312}
313
314fn apply_runtime_drain_effects(slot: &mut CommsDrainSlot, effects: &[CommsDrainLifecycleEffect]) {
315 for effect in effects {
316 if let CommsDrainLifecycleEffect::AbortDrainTask = effect
317 && let Some(handle) = slot.handle.take()
318 {
319 handle.abort();
320 }
321 }
322}
323
324fn abort_slot(slot: &mut CommsDrainSlot) {
325 match protocol_comms_drain_abort::execute_stop_requested(&mut slot.authority) {
326 Ok(result) => {
327 apply_runtime_drain_effects(slot, &result.effects);
328 let _ = result.obligation;
331 }
332 Err(_) => {
333 if let Some(handle) = slot.handle.take() {
335 handle.abort();
336 }
337 }
338 }
339}
340
341pub struct RuntimeSessionAdapter {
348 sessions: RwLock<HashMap<SessionId, RuntimeSessionEntry>>,
350 mode: RuntimeMode,
352 store: Option<Arc<dyn RuntimeStore>>,
354 blob_store: Option<Arc<dyn BlobStore>>,
356 comms_drain_slots: RwLock<HashMap<SessionId, CommsDrainSlot>>,
358}
359
360impl RuntimeSessionAdapter {
361 pub fn ephemeral() -> Self {
363 Self {
364 sessions: RwLock::new(HashMap::new()),
365 mode: RuntimeMode::V9Compliant,
366 store: None,
367 blob_store: None,
368 comms_drain_slots: RwLock::new(HashMap::new()),
369 }
370 }
371
372 pub fn persistent(store: Arc<dyn RuntimeStore>, blob_store: Arc<dyn BlobStore>) -> Self {
374 Self {
375 sessions: RwLock::new(HashMap::new()),
376 mode: RuntimeMode::V9Compliant,
377 store: Some(store),
378 blob_store: Some(blob_store),
379 comms_drain_slots: RwLock::new(HashMap::new()),
380 }
381 }
382
383 fn make_driver(&self, session_id: &SessionId) -> DriverEntry {
385 let runtime_id = LogicalRuntimeId::new(session_id.to_string());
386 match (&self.store, &self.blob_store) {
387 (Some(store), Some(blob_store)) => DriverEntry::Persistent(
388 PersistentRuntimeDriver::new(runtime_id, store.clone(), blob_store.clone()),
389 ),
390 (Some(_store), None) => {
391 tracing::warn!(
392 %session_id,
393 "persistent runtime store present but blob store missing; \
394 falling back to ephemeral driver"
395 );
396 DriverEntry::Ephemeral(EphemeralRuntimeDriver::new(runtime_id))
397 }
398 _ => DriverEntry::Ephemeral(EphemeralRuntimeDriver::new(runtime_id)),
399 }
400 }
401
402 pub async fn register_session(&self, session_id: SessionId) {
405 {
406 let mut sessions = self.sessions.write().await;
407 if let Some(existing) = sessions.get_mut(&session_id) {
408 existing.clear_dead_attachment();
409 return;
410 }
411 }
412
413 let mut entry = self.make_driver(&session_id);
414 if let Err(err) = entry.as_driver_mut().recover().await {
415 tracing::error!(%session_id, error = %err, "failed to recover runtime driver during registration");
416 return;
417 }
418 let session_entry = RuntimeSessionEntry {
419 driver: Arc::new(Mutex::new(entry)),
420 ops_lifecycle: Arc::new(crate::ops_lifecycle::RuntimeOpsLifecycleRegistry::new()),
421 completions: Arc::new(Mutex::new(crate::completion::CompletionRegistry::new())),
422 attachment: None,
423 };
424 let mut sessions = self.sessions.write().await;
425 if let Some(existing) = sessions.get_mut(&session_id) {
426 existing.clear_dead_attachment();
427 } else {
428 sessions.insert(session_id, session_entry);
429 }
430 }
431
432 pub async fn set_session_silent_intents(&self, session_id: &SessionId, intents: Vec<String>) {
437 let sessions = self.sessions.read().await;
438 if let Some(entry) = sessions.get(session_id) {
439 let mut driver = entry.driver.lock().await;
440 driver.set_silent_comms_intents(intents);
441 }
442 }
443
444 pub async fn register_session_with_executor(
449 &self,
450 session_id: SessionId,
451 executor: Box<dyn meerkat_core::lifecycle::CoreExecutor>,
452 ) {
453 self.ensure_session_with_executor(session_id, executor)
454 .await;
455 }
456
/// Ensure `session_id` is registered and has a live runtime loop driven by `executor`.
///
/// Steps:
/// 1. Reuse the existing entry, or recover a new driver outside any adapter lock
///    and insert it. The map is re-checked after recovery in case another task
///    registered the session meanwhile.
/// 2. Transition the driver to attached. This repairs the case where the driver
///    reports Attached -> Attached even though no live loop is published.
/// 3. Spawn the runtime loop, then publish its attachment only if the map entry
///    is still the same one (`Arc::ptr_eq`) and no other live loop won the race.
/// 4. If publishing fails, abort the spawned loop. The driver is detached unless
///    another caller's loop is now using it.
///
/// Returns early (doing nothing) when a live loop is already attached. Recovery
/// or attach failures are logged, not returned.
pub async fn ensure_session_with_executor(
    &self,
    session_id: SessionId,
    executor: Box<dyn meerkat_core::lifecycle::CoreExecutor>,
) {
    // Snapshot the existing entry, if any, after pruning a dead attachment.
    let existing = {
        let mut sessions = self.sessions.write().await;
        sessions.get_mut(&session_id).map(|entry| {
            entry.clear_dead_attachment();
            (
                entry.has_attachment(),
                entry.driver.clone(),
                entry.completions.clone(),
            )
        })
    };

    let (driver, completions) = if let Some((has_attachment, driver, completions)) = existing {
        // A live loop is already published; nothing to do.
        if has_attachment {
            return;
        }
        (driver, completions)
    } else {
        // Recovery runs without holding the sessions lock.
        let mut recovered_entry = self.make_driver(&session_id);
        if let Err(err) = recovered_entry.as_driver_mut().recover().await {
            tracing::error!(
                %session_id,
                error = %err,
                "failed to recover runtime driver during registration"
            );
            return;
        }

        // Re-check: another task may have registered the session while we recovered.
        let mut sessions = self.sessions.write().await;
        if let Some(entry) = sessions.get_mut(&session_id) {
            entry.clear_dead_attachment();
            if entry.has_attachment() {
                return;
            }
            (entry.driver.clone(), entry.completions.clone())
        } else {
            let driver = Arc::new(Mutex::new(recovered_entry));
            let completions =
                Arc::new(Mutex::new(crate::completion::CompletionRegistry::new()));
            sessions.insert(
                session_id.clone(),
                RuntimeSessionEntry {
                    driver: driver.clone(),
                    ops_lifecycle: Arc::new(
                        crate::ops_lifecycle::RuntimeOpsLifecycleRegistry::new(),
                    ),
                    completions: completions.clone(),
                    attachment: None,
                },
            );
            (driver, completions)
        }
    };

    // Attach the driver before spawning the loop; also decide whether the new loop
    // needs an initial wake because the driver already has active inputs.
    let should_wake = {
        let mut driver_guard = driver.lock().await;
        if let Err(error) = driver_guard.attach() {
            // Attached -> Attached means the driver thinks it is attached even though
            // no live loop is published (we only get here when has_attachment() was
            // false). Repair it with detach + attach.
            let repaired = if error.from == RuntimeState::Attached
                && error.to == RuntimeState::Attached
            {
                tracing::warn!(
                    %session_id,
                    error = %error,
                    "runtime driver remained attached without a live published loop; detaching and retrying attachment"
                );
                match driver_guard.detach() {
                    Ok(_) => match driver_guard.attach() {
                        Ok(()) => true,
                        Err(retry_error) => {
                            tracing::warn!(
                                %session_id,
                                error = %retry_error,
                                "failed to re-attach runtime driver after repairing stale attachment state"
                            );
                            false
                        }
                    },
                    Err(detach_error) => {
                        tracing::warn!(
                            %session_id,
                            error = %detach_error,
                            "failed to detach stale attached runtime driver before retrying attachment"
                        );
                        false
                    }
                }
            } else {
                false
            };
            if !repaired {
                tracing::warn!(
                    %session_id,
                    error = %error,
                    "failed to attach runtime driver before publishing loop attachment"
                );
                return;
            }
        }
        !driver_guard.as_driver().active_input_ids().is_empty()
    };

    // Bounded (capacity 16) wake and control channels for the new loop.
    let (wake_tx, wake_rx) = mpsc::channel(16);
    let (control_tx, control_rx) = mpsc::channel(16);
    // Wrapped in Option so the handle can be moved into the entry on publish, or
    // aborted below if it was not published.
    let mut pending_loop_handle =
        Some(crate::runtime_loop::spawn_runtime_loop_with_completions(
            driver.clone(),
            executor,
            wake_rx,
            control_rx,
            Some(completions.clone()),
        ));

    // (published, detach_after_abort):
    // - (true, false): our loop is now the session's attachment.
    // - (false, false): another caller published a live loop first; keep the driver attached for it.
    // - (false, true): the session vanished or was replaced; undo our attach.
    let (published, detach_after_abort) = {
        let mut sessions = self.sessions.write().await;
        match sessions.get_mut(&session_id) {
            None => (false, true),
            Some(entry) => {
                entry.clear_dead_attachment();
                if entry.has_attachment() {
                    (false, false)
                } else if !Arc::ptr_eq(&entry.driver, &driver)
                    || !Arc::ptr_eq(&entry.completions, &completions)
                {
                    tracing::warn!(
                        %session_id,
                        "runtime session entry changed while wiring executor; aborting stale loop attachment"
                    );
                    (false, true)
                } else {
                    match pending_loop_handle.take() {
                        Some(loop_handle) => {
                            entry.attach_runtime_loop(wake_tx.clone(), control_tx, loop_handle);
                            (true, false)
                        }
                        None => {
                            tracing::error!(
                                %session_id,
                                "runtime loop handle missing during attachment publish"
                            );
                            (false, true)
                        }
                    }
                }
            }
        }
    };

    if !published {
        if let Some(loop_handle) = pending_loop_handle.take() {
            loop_handle.abort();
        }
        if detach_after_abort {
            let mut driver_guard = driver.lock().await;
            // Best effort: the session is being abandoned by this call anyway.
            let _ = driver_guard.detach();
        }
        return;
    }

    // Kick the new loop if inputs were already active when we attached.
    if should_wake {
        let _ = wake_tx.try_send(());
    }
}
629
630 pub async fn unregister_session(&self, session_id: &SessionId) {
635 let entry = {
636 let mut sessions = self.sessions.write().await;
637 let mut slots = self.comms_drain_slots.write().await;
638 if let Some(mut slot) = slots.remove(session_id) {
641 abort_slot(&mut slot);
642 }
643 sessions.remove(session_id)
644 };
645
646 if let Some(entry) = entry {
647 let mut driver = entry.driver.lock().await;
648 let _ = driver.detach(); drop(driver);
650
651 let mut completions = entry.completions.lock().await;
652 completions.resolve_all_terminated("runtime session unregistered");
653 }
654 }
655
656 pub async fn contains_session(&self, session_id: &SessionId) -> bool {
658 self.sessions.read().await.contains_key(session_id)
659 }
660
661 pub async fn interrupt_current_run(
663 &self,
664 session_id: &SessionId,
665 ) -> Result<(), RuntimeDriverError> {
666 let (driver, control_tx) = {
667 let sessions = self.sessions.read().await;
668 let entry = sessions
669 .get(session_id)
670 .ok_or(RuntimeDriverError::NotReady {
671 state: RuntimeState::Destroyed,
672 })?;
673 (entry.driver.clone(), entry.control_sender())
674 };
675
676 let Some(control_tx) = control_tx else {
677 let state = {
678 let driver = driver.lock().await;
679 driver.as_driver().runtime_state()
680 };
681 return Err(RuntimeDriverError::NotReady { state });
682 };
683 control_tx
684 .send(RunControlCommand::CancelCurrentRun {
685 reason: "mob interrupt".to_string(),
686 })
687 .await
688 .map_err(|err| RuntimeDriverError::Internal(format!("failed to send interrupt: {err}")))
689 }
690
691 pub async fn stop_runtime_executor(
695 &self,
696 session_id: &SessionId,
697 command: RunControlCommand,
698 ) -> Result<(), RuntimeDriverError> {
699 let (driver, completions, control_tx) = {
700 let sessions = self.sessions.read().await;
701 let entry = sessions
702 .get(session_id)
703 .ok_or(RuntimeDriverError::NotReady {
704 state: RuntimeState::Destroyed,
705 })?;
706 (
707 entry.driver.clone(),
708 entry.completions.clone(),
709 entry.control_sender(),
710 )
711 };
712
713 if let Some(control_tx) = control_tx
714 && control_tx.send(command.clone()).await.is_ok()
715 {
716 return Ok(());
717 }
718
719 if matches!(command, RunControlCommand::StopRuntimeExecutor { .. }) {
720 let mut driver = driver.lock().await;
721 driver
722 .as_driver_mut()
723 .on_runtime_control(crate::traits::RuntimeControlCommand::Stop)
724 .await?;
725 drop(driver);
726 let mut completions = completions.lock().await;
727 completions.resolve_all_terminated("runtime stopped");
728 drop(completions);
729
730 let mut sessions = self.sessions.write().await;
733 if let Some(entry) = sessions.get_mut(session_id) {
734 entry.clear_dead_attachment();
735 }
736 Ok(())
737 } else {
738 Err(RuntimeDriverError::Internal(
739 "failed to send stop: runtime loop is unavailable".into(),
740 ))
741 }
742 }
743
744 pub async fn accept_input_and_run<T, F, Fut>(
749 &self,
750 session_id: &SessionId,
751 input: Input,
752 op: F,
753 ) -> Result<T, RuntimeDriverError>
754 where
755 F: FnOnce(RunId, meerkat_core::lifecycle::run_primitive::RunPrimitive) -> Fut,
756 Fut: Future<Output = Result<(T, CoreApplyOutput), RuntimeDriverError>>,
757 {
758 let driver = {
759 let sessions = self.sessions.read().await;
760 sessions
761 .get(session_id)
762 .ok_or(RuntimeDriverError::NotReady {
763 state: RuntimeState::Destroyed,
764 })?
765 .driver
766 .clone()
767 };
768
769 let (input_id, run_id, primitive) = {
770 let mut driver = driver.lock().await;
771 if !driver.is_idle_or_attached() {
772 return Err(RuntimeDriverError::NotReady {
773 state: driver.as_driver().runtime_state(),
774 });
775 }
776
777 let active_input_ids = driver.as_driver().active_input_ids();
778 if !active_input_ids.is_empty() {
779 let duplicate_active_input =
780 input.header().idempotency_key.as_ref().and_then(|key| {
781 active_input_ids.iter().find(|active_id| {
782 driver
783 .as_driver()
784 .input_state(active_id)
785 .and_then(|state| state.idempotency_key.as_ref())
786 == Some(key)
787 })
788 });
789 if let Some(existing_id) = duplicate_active_input {
790 return Err(RuntimeDriverError::ValidationFailed {
791 reason: format!(
792 "accept_input_and_run does not support deduplicated admission; existing input {existing_id} already owns execution"
793 ),
794 });
795 }
796 return Err(RuntimeDriverError::NotReady {
797 state: driver.as_driver().runtime_state(),
798 });
799 }
800
801 let outcome = driver.as_driver_mut().accept_input(input).await?;
802 let input_id = match outcome {
803 AcceptOutcome::Accepted { input_id, .. } => input_id,
804 AcceptOutcome::Deduplicated { existing_id, .. } => {
805 return Err(RuntimeDriverError::ValidationFailed {
806 reason: format!(
807 "accept_input_and_run does not support deduplicated admission; existing input {existing_id} already owns execution"
808 ),
809 });
810 }
811 AcceptOutcome::Rejected { reason } => {
812 return Err(RuntimeDriverError::ValidationFailed { reason });
813 }
814 };
815
816 if !driver.is_idle_or_attached() {
817 return Err(RuntimeDriverError::NotReady {
818 state: driver.as_driver().runtime_state(),
819 });
820 }
821
822 let (dequeued_id, dequeued_input) = driver.dequeue_next().ok_or_else(|| {
823 RuntimeDriverError::Internal("accepted input was not queued for execution".into())
824 })?;
825 if dequeued_id != input_id {
826 return Err(RuntimeDriverError::NotReady {
827 state: driver.as_driver().runtime_state(),
828 });
829 }
830 let run_id = RunId::new();
831 driver.start_run(run_id.clone()).map_err(|err| {
832 RuntimeDriverError::Internal(format!("failed to start runtime run: {err}"))
833 })?;
834 driver.stage_input(&dequeued_id, &run_id).map_err(|err| {
835 RuntimeDriverError::Internal(format!("failed to stage accepted input: {err}"))
836 })?;
837 let primitive = crate::runtime_loop::input_to_primitive(&dequeued_input, dequeued_id);
838 (input_id, run_id, primitive)
839 };
840
841 match op(run_id.clone(), primitive.clone()).await {
842 Ok((result, output)) => {
843 let mut driver = driver.lock().await;
844 if let Err(err) = driver
845 .as_driver_mut()
846 .on_run_event(meerkat_core::lifecycle::RunEvent::BoundaryApplied {
847 run_id: run_id.clone(),
848 receipt: output.receipt,
849 session_snapshot: output.session_snapshot,
850 })
851 .await
852 {
853 if let Err(unwind_err) = driver
854 .as_driver_mut()
855 .on_run_event(meerkat_core::lifecycle::RunEvent::RunFailed {
856 run_id,
857 error: format!("boundary commit failed: {err}"),
858 recoverable: true,
859 })
860 .await
861 {
862 return Err(RuntimeDriverError::Internal(format!(
863 "runtime boundary commit failed: {err}; additionally failed to unwind runtime state: {unwind_err}"
864 )));
865 }
866 return Err(RuntimeDriverError::Internal(format!(
867 "runtime boundary commit failed: {err}"
868 )));
869 }
870 if let Err(err) = driver
871 .as_driver_mut()
872 .on_run_event(meerkat_core::lifecycle::RunEvent::RunCompleted {
873 run_id,
874 consumed_input_ids: vec![input_id],
875 })
876 .await
877 {
878 drop(driver);
879 self.unregister_session(session_id).await;
880 return Err(RuntimeDriverError::Internal(format!(
881 "failed to persist runtime completion snapshot: {err}"
882 )));
883 }
884 Ok(result)
885 }
886 Err(err) => {
887 let mut driver = driver.lock().await;
888 if let Err(run_err) = driver
889 .as_driver_mut()
890 .on_run_event(meerkat_core::lifecycle::RunEvent::RunFailed {
891 run_id,
892 error: err.to_string(),
893 recoverable: true,
894 })
895 .await
896 {
897 drop(driver);
898 self.unregister_session(session_id).await;
899 return Err(RuntimeDriverError::Internal(format!(
900 "failed to persist runtime failure snapshot: {run_err}"
901 )));
902 }
903 Err(err)
904 }
905 }
906 }
907
908 pub async fn accept_input_with_completion(
918 &self,
919 session_id: &SessionId,
920 input: Input,
921 ) -> Result<(AcceptOutcome, Option<crate::completion::CompletionHandle>), RuntimeDriverError>
922 {
923 let (driver, completions, wake_tx) = {
924 let sessions = self.sessions.read().await;
925 let entry = sessions
926 .get(session_id)
927 .ok_or(RuntimeDriverError::NotReady {
928 state: RuntimeState::Destroyed,
929 })?;
930 (
931 entry.driver.clone(),
932 entry.completions.clone(),
933 entry.wake_sender(),
934 )
935 };
936
937 let (outcome, should_wake, should_process, handle) = {
938 let mut driver = driver.lock().await;
939 let result = driver.as_driver_mut().accept_input(input).await?;
940
941 match &result {
942 AcceptOutcome::Accepted { input_id, .. } => {
943 let is_terminal = driver
944 .as_driver()
945 .input_state(input_id)
946 .map(|state| state.current_state().is_terminal())
947 .unwrap_or(true);
948 let handle = if is_terminal {
949 None
950 } else {
951 Some({
952 let mut completions = completions.lock().await;
953 completions.register(input_id.clone())
954 })
955 };
956 let wake = driver.take_wake_requested();
957 let process_now = driver.take_process_requested();
958 (result, wake, process_now, handle)
959 }
960 AcceptOutcome::Deduplicated { existing_id, .. } => {
961 let existing_state = driver.as_driver().input_state(existing_id);
963 let is_terminal = existing_state
964 .map(|s| s.current_state().is_terminal())
965 .unwrap_or(true); if is_terminal {
968 (result, false, false, None)
970 } else {
971 let handle = {
973 let mut completions = completions.lock().await;
974 completions.register(existing_id.clone())
975 };
976 (result, false, false, Some(handle))
977 }
978 }
979 AcceptOutcome::Rejected { reason } => {
980 return Err(RuntimeDriverError::ValidationFailed {
981 reason: reason.clone(),
982 });
983 }
984 }
985 };
986
987 if (should_wake || should_process)
988 && let Some(ref wake_tx) = wake_tx
989 {
990 let _ = wake_tx.try_send(());
991 }
992
993 Ok((outcome, handle))
994 }
995
996 pub async fn accept_input_without_wake(
1002 &self,
1003 session_id: &SessionId,
1004 input: Input,
1005 ) -> Result<AcceptOutcome, RuntimeDriverError> {
1006 let driver = {
1007 let sessions = self.sessions.read().await;
1008 let entry = sessions
1009 .get(session_id)
1010 .ok_or(RuntimeDriverError::NotReady {
1011 state: RuntimeState::Destroyed,
1012 })?;
1013 entry.driver.clone()
1014 };
1015
1016 let outcome = {
1017 let mut driver = driver.lock().await;
1018 let result = driver.as_driver_mut().accept_input(input).await?;
1019 let _ = driver.take_wake_requested();
1020 let process_requested = driver.take_process_requested();
1021 debug_assert!(
1022 !process_requested,
1023 "queue-only admission unexpectedly requested immediate processing"
1024 );
1025 result
1026 };
1027
1028 Ok(outcome)
1029 }
1030
1031 pub async fn ops_lifecycle_registry(
1033 &self,
1034 session_id: &SessionId,
1035 ) -> Option<Arc<crate::ops_lifecycle::RuntimeOpsLifecycleRegistry>> {
1036 let sessions = self.sessions.read().await;
1037 sessions
1038 .get(session_id)
1039 .map(|e| Arc::clone(&e.ops_lifecycle))
1040 }
1041
1042 pub async fn maybe_spawn_comms_drain(
1050 self: &Arc<Self>,
1051 session_id: &SessionId,
1052 keep_alive: bool,
1053 comms_runtime: Option<Arc<dyn meerkat_core::agent::CommsRuntime>>,
1054 ) -> bool {
1055 if !keep_alive {
1056 self.abort_comms_drain(session_id).await;
1058 return false;
1059 }
1060
1061 let mode = CommsDrainMode::PersistentHost;
1062
1063 let comms = match comms_runtime {
1064 Some(c) => c,
1065 None => return false,
1066 };
1067
1068 let sessions = self.sessions.read().await;
1069 if !sessions.contains_key(session_id) {
1070 tracing::warn!(
1071 %session_id,
1072 "refusing to spawn comms drain for unregistered session"
1073 );
1074 return false;
1075 }
1076 let mut slots = self.comms_drain_slots.write().await;
1079 let slot = slots
1080 .entry(session_id.clone())
1081 .or_insert_with(CommsDrainSlot::new);
1082
1083 let result =
1084 match protocol_comms_drain_spawn::execute_ensure_running(&mut slot.authority, mode) {
1085 Ok(r) => r,
1086 Err(e) => {
1087 tracing::trace!(error = %e, "comms drain authority rejected EnsureRunning");
1088 return false;
1089 }
1090 };
1091
1092 for effect in &result.effects {
1094 match effect {
1095 CommsDrainLifecycleEffect::SpawnDrainTask { mode: spawn_mode } => {
1096 let idle_timeout = match spawn_mode {
1097 CommsDrainMode::PersistentHost => Some(std::time::Duration::MAX),
1098 CommsDrainMode::Timed => None,
1099 };
1100 let handle = crate::comms_drain::spawn_comms_drain(
1101 Arc::clone(self),
1102 session_id.clone(),
1103 comms.clone(),
1104 idle_timeout,
1105 );
1106 slot.handle = Some(handle);
1107 }
1108 CommsDrainLifecycleEffect::AbortDrainTask => {
1109 if let Some(handle) = slot.handle.take() {
1110 handle.abort();
1111 }
1112 }
1113 }
1114 }
1115
1116 let Some(obligation) = result.obligation else {
1117 tracing::warn!(
1118 %session_id,
1119 "comms drain spawn transition emitted no obligation"
1120 );
1121 return false;
1122 };
1123
1124 match protocol_comms_drain_spawn::submit_task_spawned(&mut slot.authority, obligation) {
1127 Ok(_effects) => {}
1128 Err(e) => {
1129 tracing::trace!(error = %e, "comms drain authority rejected TaskSpawned");
1130 }
1131 }
1132 true
1133 }
1134
1135 pub async fn notify_comms_drain_exited(&self, session_id: &SessionId, reason: DrainExitReason) {
1141 let mut slots = self.comms_drain_slots.write().await;
1142 if let Some(slot) = slots.get_mut(session_id) {
1143 slot.handle.take(); match protocol_comms_drain_spawn::notify_task_exited(&mut slot.authority, reason) {
1145 Ok(_effects) => {}
1146 Err(e) => {
1147 tracing::warn!(error = %e, "comms drain authority rejected TaskExited");
1148 }
1149 }
1150 }
1151 }
1152
1153 pub async fn abort_comms_drains(&self) {
1155 let mut slots = self.comms_drain_slots.write().await;
1156 for (_, slot) in slots.iter_mut() {
1157 abort_slot(slot);
1158 }
1159 }
1160
1161 pub async fn abort_comms_drain(&self, session_id: &SessionId) {
1163 let mut slots = self.comms_drain_slots.write().await;
1164 if let Some(slot) = slots.get_mut(session_id) {
1165 abort_slot(slot);
1166 }
1167 }
1168
1169 pub async fn wait_comms_drain(&self, session_id: &SessionId) {
1176 let handle = {
1177 let mut slots = self.comms_drain_slots.write().await;
1178 slots
1179 .get_mut(session_id)
1180 .and_then(|slot| slot.handle.take())
1181 };
1182 if let Some(handle) = handle {
1183 let _ = handle.await;
1184 }
1185 let mut slots = self.comms_drain_slots.write().await;
1189 if let Some(slot) = slots.get_mut(session_id)
1190 && slot.authority.phase()
1191 == meerkat_core::comms_drain_lifecycle_authority::CommsDrainPhase::Running
1192 {
1193 tracing::warn!(
1194 "comms_drain: task exited without notifying authority (likely panicked), \
1195 submitting Failed safety net"
1196 );
1197 match protocol_comms_drain_spawn::notify_task_exited(
1198 &mut slot.authority,
1199 DrainExitReason::Failed,
1200 ) {
1201 Ok(effects) => {
1202 apply_runtime_drain_effects(slot, &effects);
1203 }
1204 Err(e) => {
1205 tracing::warn!(error = %e, "comms drain authority rejected safety-net TaskExited");
1206 }
1207 }
1208 }
1209 }
1210}
1211
1212#[cfg_attr(not(target_arch = "wasm32"), async_trait::async_trait)]
1213#[cfg_attr(target_arch = "wasm32", async_trait::async_trait(?Send))]
1214impl SessionServiceRuntimeExt for RuntimeSessionAdapter {
1215 fn runtime_mode(&self) -> RuntimeMode {
1216 self.mode
1217 }
1218
1219 async fn accept_input(
1220 &self,
1221 session_id: &SessionId,
1222 input: Input,
1223 ) -> Result<AcceptOutcome, RuntimeDriverError> {
1224 let (driver, wake_tx) = {
1225 let sessions = self.sessions.read().await;
1226 let entry = sessions
1227 .get(session_id)
1228 .ok_or(RuntimeDriverError::NotReady {
1229 state: RuntimeState::Destroyed,
1230 })?;
1231 (entry.driver.clone(), entry.wake_sender())
1232 };
1233
1234 let (outcome, should_wake, should_process) = {
1236 let mut driver = driver.lock().await;
1237 let result = driver.as_driver_mut().accept_input(input).await?;
1238 let wake = driver.take_wake_requested();
1239 let process_now = driver.take_process_requested();
1240 (result, wake, process_now)
1241 };
1242
1243 if (should_wake || should_process)
1245 && let Some(ref wake_tx) = wake_tx
1246 {
1247 let _ = wake_tx.try_send(());
1249 }
1250
1251 Ok(outcome)
1252 }
1253
1254 async fn accept_input_with_completion(
1255 &self,
1256 session_id: &SessionId,
1257 input: Input,
1258 ) -> Result<(AcceptOutcome, Option<crate::completion::CompletionHandle>), RuntimeDriverError>
1259 {
1260 RuntimeSessionAdapter::accept_input_with_completion(self, session_id, input).await
1261 }
1262
1263 async fn runtime_state(
1264 &self,
1265 session_id: &SessionId,
1266 ) -> Result<RuntimeState, RuntimeDriverError> {
1267 let driver = {
1268 let sessions = self.sessions.read().await;
1269 let entry = sessions
1270 .get(session_id)
1271 .ok_or(RuntimeDriverError::NotReady {
1272 state: RuntimeState::Destroyed,
1273 })?;
1274 entry.driver.clone()
1275 };
1276 let driver = driver.lock().await;
1277 Ok(driver.as_driver().runtime_state())
1278 }
1279
1280 async fn retire_runtime(
1281 &self,
1282 session_id: &SessionId,
1283 ) -> Result<RetireReport, RuntimeDriverError> {
1284 let (driver_handle, completions, wake_tx) = {
1285 let sessions = self.sessions.read().await;
1286 let entry = sessions
1287 .get(session_id)
1288 .ok_or(RuntimeDriverError::NotReady {
1289 state: RuntimeState::Destroyed,
1290 })?;
1291 (
1292 entry.driver.clone(),
1293 entry.completions.clone(),
1294 entry.wake_sender(),
1295 )
1296 };
1297 let mut driver = driver_handle.lock().await;
1298 let mut report = driver.as_driver_mut().retire().await?;
1299 drop(driver); if report.inputs_pending_drain > 0 {
1302 if let Some(ref wake_tx) = wake_tx
1305 && wake_tx.send(()).await.is_ok()
1306 {
1307 return Ok(report);
1308 }
1309
1310 let mut driver = driver_handle.lock().await;
1314 let abandoned = driver
1315 .abandon_pending_inputs(crate::input_state::InputAbandonReason::Retired)
1316 .await?;
1317 drop(driver);
1318 let mut completions = completions.lock().await;
1319 completions.resolve_all_terminated("retired without runtime loop");
1320 report.inputs_abandoned += abandoned;
1321 report.inputs_pending_drain = 0;
1322 }
1323
1324 Ok(report)
1325 }
1326
1327 async fn reset_runtime(
1328 &self,
1329 session_id: &SessionId,
1330 ) -> Result<ResetReport, RuntimeDriverError> {
1331 let (driver, completions) = {
1332 let sessions = self.sessions.read().await;
1333 let entry = sessions
1334 .get(session_id)
1335 .ok_or(RuntimeDriverError::NotReady {
1336 state: RuntimeState::Destroyed,
1337 })?;
1338 (entry.driver.clone(), entry.completions.clone())
1339 };
1340 let mut driver = driver.lock().await;
1341 if matches!(driver.as_driver().runtime_state(), RuntimeState::Running) {
1342 return Err(RuntimeDriverError::NotReady {
1343 state: RuntimeState::Running,
1344 });
1345 }
1346 let report = driver.as_driver_mut().reset().await?;
1347 drop(driver);
1348
1349 let mut completions = completions.lock().await;
1351 completions.resolve_all_terminated("runtime reset");
1352
1353 Ok(report)
1354 }
1355
1356 async fn input_state(
1357 &self,
1358 session_id: &SessionId,
1359 input_id: &InputId,
1360 ) -> Result<Option<InputState>, RuntimeDriverError> {
1361 let driver = {
1362 let sessions = self.sessions.read().await;
1363 let entry = sessions
1364 .get(session_id)
1365 .ok_or(RuntimeDriverError::NotReady {
1366 state: RuntimeState::Destroyed,
1367 })?;
1368 entry.driver.clone()
1369 };
1370 let driver = driver.lock().await;
1371 Ok(driver.as_driver().input_state(input_id).cloned())
1372 }
1373
1374 async fn list_active_inputs(
1375 &self,
1376 session_id: &SessionId,
1377 ) -> Result<Vec<InputId>, RuntimeDriverError> {
1378 let driver = {
1379 let sessions = self.sessions.read().await;
1380 let entry = sessions
1381 .get(session_id)
1382 .ok_or(RuntimeDriverError::NotReady {
1383 state: RuntimeState::Destroyed,
1384 })?;
1385 entry.driver.clone()
1386 };
1387 let driver = driver.lock().await;
1388 Ok(driver.as_driver().active_input_ids())
1389 }
1390}
1391
1392impl RuntimeSessionAdapter {
1397 fn resolve_session_id(
1403 runtime_id: &LogicalRuntimeId,
1404 ) -> Result<SessionId, RuntimeControlPlaneError> {
1405 runtime_id
1406 .0
1407 .parse::<uuid::Uuid>()
1408 .map(SessionId)
1409 .map_err(|_| RuntimeControlPlaneError::NotFound(runtime_id.clone()))
1410 }
1411
1412 async fn lookup_entry(
1415 &self,
1416 runtime_id: &LogicalRuntimeId,
1417 ) -> Result<
1418 (
1419 SessionId,
1420 SharedDriver,
1421 SharedCompletionRegistry,
1422 Option<mpsc::Sender<()>>,
1423 ),
1424 RuntimeControlPlaneError,
1425 > {
1426 let session_id = Self::resolve_session_id(runtime_id)?;
1427 let sessions = self.sessions.read().await;
1428 let entry = sessions
1429 .get(&session_id)
1430 .ok_or_else(|| RuntimeControlPlaneError::NotFound(runtime_id.clone()))?;
1431 Ok((
1432 session_id,
1433 entry.driver.clone(),
1434 entry.completions.clone(),
1435 entry.wake_sender(),
1436 ))
1437 }
1438}
1439
1440#[cfg_attr(not(target_arch = "wasm32"), async_trait::async_trait)]
1441#[cfg_attr(target_arch = "wasm32", async_trait::async_trait(?Send))]
1442impl crate::traits::RuntimeControlPlane for RuntimeSessionAdapter {
1443 async fn ingest(
1444 &self,
1445 runtime_id: &LogicalRuntimeId,
1446 input: Input,
1447 ) -> Result<AcceptOutcome, RuntimeControlPlaneError> {
1448 let (session_id, driver, _completions, wake_tx) = self.lookup_entry(runtime_id).await?;
1449 let _ = session_id;
1450
1451 let (outcome, should_wake, should_process) = {
1452 let mut drv = driver.lock().await;
1453 let result = drv
1454 .as_driver_mut()
1455 .accept_input(input)
1456 .await
1457 .map_err(|e| RuntimeControlPlaneError::Internal(e.to_string()))?;
1458 let wake = drv.take_wake_requested();
1459 let process_now = drv.take_process_requested();
1460 (result, wake, process_now)
1461 };
1462
1463 if (should_wake || should_process)
1464 && let Some(ref tx) = wake_tx
1465 {
1466 let _ = tx.try_send(());
1467 }
1468
1469 Ok(outcome)
1470 }
1471
1472 async fn publish_event(
1473 &self,
1474 event: crate::runtime_event::RuntimeEventEnvelope,
1475 ) -> Result<(), RuntimeControlPlaneError> {
1476 let runtime_id = event.runtime_id.clone();
1477 let (_session_id, driver, _completions, _wake_tx) = self.lookup_entry(&runtime_id).await?;
1478
1479 let mut drv = driver.lock().await;
1480 drv.as_driver_mut()
1481 .on_runtime_event(event)
1482 .await
1483 .map_err(|e| RuntimeControlPlaneError::Internal(e.to_string()))
1484 }
1485
1486 async fn retire(
1487 &self,
1488 runtime_id: &LogicalRuntimeId,
1489 ) -> Result<RetireReport, RuntimeControlPlaneError> {
1490 let (session_id, driver, completions, wake_tx) = self.lookup_entry(runtime_id).await?;
1491 let _ = session_id;
1492
1493 let mut drv = driver.lock().await;
1494 let mut report = drv
1495 .as_driver_mut()
1496 .retire()
1497 .await
1498 .map_err(|e| RuntimeControlPlaneError::Internal(e.to_string()))?;
1499 drop(drv);
1500
1501 if report.inputs_pending_drain > 0 {
1502 if let Some(ref tx) = wake_tx
1503 && tx.send(()).await.is_ok()
1504 {
1505 return Ok(report);
1506 }
1507
1508 let mut drv = driver.lock().await;
1510 let abandoned = drv
1511 .abandon_pending_inputs(crate::input_state::InputAbandonReason::Retired)
1512 .await
1513 .map_err(|e| RuntimeControlPlaneError::Internal(e.to_string()))?;
1514 drop(drv);
1515 let mut comp = completions.lock().await;
1516 comp.resolve_all_terminated("retired without runtime loop");
1517 report.inputs_abandoned += abandoned;
1518 report.inputs_pending_drain = 0;
1519 }
1520
1521 Ok(report)
1522 }
1523
1524 async fn recycle(
1525 &self,
1526 runtime_id: &LogicalRuntimeId,
1527 ) -> Result<RecycleReport, RuntimeControlPlaneError> {
1528 let (_session_id, driver, completions, wake_tx) = self.lookup_entry(runtime_id).await?;
1529
1530 let (transferred, active_after_recycle) = {
1531 let mut drv = driver.lock().await;
1532 let state = drv.as_driver().runtime_state();
1533 if matches!(state, RuntimeState::Running) {
1534 return Err(RuntimeControlPlaneError::InvalidState { state });
1535 }
1536 let should_restore_attached = matches!(state, RuntimeState::Attached);
1537
1538 let transferred = match &mut *drv {
1539 DriverEntry::Ephemeral(driver) => driver
1540 .recycle_preserving_work()
1541 .map_err(|e| RuntimeControlPlaneError::Internal(e.to_string()))?,
1542 DriverEntry::Persistent(driver) => driver
1543 .recycle_preserving_work()
1544 .await
1545 .map_err(|e| RuntimeControlPlaneError::Internal(e.to_string()))?,
1546 };
1547
1548 if should_restore_attached
1549 && matches!(drv.as_driver().runtime_state(), RuntimeState::Idle)
1550 {
1551 drv.attach()
1552 .map_err(|e| RuntimeControlPlaneError::Internal(e.to_string()))?;
1553 }
1554
1555 let active_after_recycle = drv.as_driver().active_input_ids();
1556 (transferred, active_after_recycle)
1557 };
1558
1559 {
1562 let pending_after: HashSet<InputId> = active_after_recycle.into_iter().collect();
1563 let mut comp = completions.lock().await;
1564 comp.resolve_not_pending(
1565 |input_id| pending_after.contains(input_id),
1566 "recycled input no longer pending",
1567 );
1568 }
1569
1570 if let Some(ref tx) = wake_tx {
1572 let _ = tx.try_send(());
1573 }
1574
1575 Ok(RecycleReport {
1576 inputs_transferred: transferred,
1577 })
1578 }
1579
1580 async fn reset(
1581 &self,
1582 runtime_id: &LogicalRuntimeId,
1583 ) -> Result<crate::traits::ResetReport, RuntimeControlPlaneError> {
1584 let (_session_id, driver, completions, _wake_tx) = self.lookup_entry(runtime_id).await?;
1585
1586 let mut drv = driver.lock().await;
1587 if matches!(drv.as_driver().runtime_state(), RuntimeState::Running) {
1588 return Err(RuntimeControlPlaneError::InvalidState {
1589 state: RuntimeState::Running,
1590 });
1591 }
1592 let report = drv
1593 .as_driver_mut()
1594 .reset()
1595 .await
1596 .map_err(|e| RuntimeControlPlaneError::Internal(e.to_string()))?;
1597 drop(drv);
1598
1599 let mut comp = completions.lock().await;
1600 comp.resolve_all_terminated("runtime reset");
1601
1602 Ok(report)
1603 }
1604
1605 async fn recover(
1606 &self,
1607 runtime_id: &LogicalRuntimeId,
1608 ) -> Result<RecoveryReport, RuntimeControlPlaneError> {
1609 let (_session_id, driver, _completions, wake_tx) = self.lookup_entry(runtime_id).await?;
1610
1611 let mut drv = driver.lock().await;
1612 let report = drv
1613 .as_driver_mut()
1614 .recover()
1615 .await
1616 .map_err(|e| RuntimeControlPlaneError::Internal(e.to_string()))?;
1617 drop(drv);
1618
1619 if let Some(ref tx) = wake_tx {
1620 let _ = tx.try_send(());
1621 }
1622
1623 Ok(report)
1624 }
1625
1626 async fn destroy(
1627 &self,
1628 runtime_id: &LogicalRuntimeId,
1629 ) -> Result<DestroyReport, RuntimeControlPlaneError> {
1630 let (_session_id, driver, completions, _wake_tx) = self.lookup_entry(runtime_id).await?;
1631
1632 let mut drv = driver.lock().await;
1633 let report = drv
1634 .as_driver_mut()
1635 .destroy()
1636 .await
1637 .map_err(|e| RuntimeControlPlaneError::Internal(e.to_string()))?;
1638 drop(drv);
1639
1640 let mut comp = completions.lock().await;
1641 comp.resolve_all_terminated("runtime destroyed");
1642
1643 Ok(report)
1644 }
1645
1646 async fn runtime_state(
1647 &self,
1648 runtime_id: &LogicalRuntimeId,
1649 ) -> Result<RuntimeState, RuntimeControlPlaneError> {
1650 let (_session_id, driver, _completions, _wake_tx) = self.lookup_entry(runtime_id).await?;
1651
1652 let drv = driver.lock().await;
1653 Ok(drv.as_driver().runtime_state())
1654 }
1655
1656 async fn load_boundary_receipt(
1657 &self,
1658 runtime_id: &LogicalRuntimeId,
1659 run_id: &RunId,
1660 sequence: u64,
1661 ) -> Result<Option<meerkat_core::lifecycle::RunBoundaryReceipt>, RuntimeControlPlaneError> {
1662 match &self.store {
1663 Some(store) => store
1664 .load_boundary_receipt(runtime_id, run_id, sequence)
1665 .await
1666 .map_err(|e| RuntimeControlPlaneError::StoreError(e.to_string())),
1667 None => {
1668 Ok(None)
1670 }
1671 }
1672 }
1673}
1674
#[cfg(test)]
#[allow(clippy::expect_used, clippy::panic, clippy::unwrap_used)]
mod tests {
    use super::*;
    use std::sync::atomic::{AtomicBool, Ordering};
    use std::time::Duration;

    use meerkat_core::agent::{CommsCapabilityError, CommsRuntime};
    use meerkat_core::comms_drain_lifecycle_authority::{CommsDrainMode, CommsDrainPhase};
    use tokio::sync::Notify;

    /// Minimal `CommsRuntime` double.
    ///
    /// It never yields messages. It reports `dismiss_received()` according to
    /// the flag chosen at construction.
    struct FakeDrainRuntime {
        notify: Arc<Notify>,
        dismiss: AtomicBool,
    }

    impl FakeDrainRuntime {
        /// Runtime whose `dismiss_received()` returns `true`.
        fn dismissing() -> Self {
            Self {
                notify: Arc::new(Notify::new()),
                dismiss: AtomicBool::new(true),
            }
        }

        /// Runtime whose `dismiss_received()` returns `false`.
        fn idle() -> Self {
            Self {
                notify: Arc::new(Notify::new()),
                dismiss: AtomicBool::new(false),
            }
        }
    }

    #[async_trait::async_trait]
    impl CommsRuntime for FakeDrainRuntime {
        async fn drain_messages(&self) -> Vec<String> {
            Vec::new()
        }

        fn inbox_notify(&self) -> Arc<Notify> {
            Arc::clone(&self.notify)
        }

        fn dismiss_received(&self) -> bool {
            self.dismiss.load(Ordering::Acquire)
        }

        async fn drain_classified_inbox_interactions(
            &self,
        ) -> Result<Vec<meerkat_core::interaction::ClassifiedInboxInteraction>, CommsCapabilityError>
        {
            Ok(Vec::new())
        }
    }

    /// Register `session_id` and run the drain authority's spawn protocol
    /// for `mode`.
    ///
    /// Steps, all under the slots write lock:
    /// 1. ensure running;
    /// 2. apply effects, spawning the drain task on `SpawnDrainTask`;
    /// 3. submit task-spawned feedback.
    async fn spawn_test_comms_drain(
        adapter: &Arc<RuntimeSessionAdapter>,
        session_id: &SessionId,
        mode: CommsDrainMode,
        comms_runtime: Arc<dyn CommsRuntime>,
        idle_timeout: Duration,
    ) {
        adapter.register_session(session_id.clone()).await;
        let mut slots = adapter.comms_drain_slots.write().await;
        let slot = slots
            .entry(session_id.clone())
            .or_insert_with(CommsDrainSlot::new);
        let result = protocol_comms_drain_spawn::execute_ensure_running(&mut slot.authority, mode)
            .expect("ensure running");
        let obligation = result
            .obligation
            .expect("spawn obligation should be present");

        apply_runtime_drain_effects(slot, &result.effects);
        for effect in &result.effects {
            if let CommsDrainLifecycleEffect::SpawnDrainTask { .. } = effect {
                slot.handle = Some(crate::comms_drain::spawn_comms_drain(
                    Arc::clone(adapter),
                    session_id.clone(),
                    Arc::clone(&comms_runtime),
                    Some(idle_timeout),
                ));
            }
        }

        let feedback_effects =
            protocol_comms_drain_spawn::submit_task_spawned(&mut slot.authority, obligation)
                .expect("task spawned");
        apply_runtime_drain_effects(slot, &feedback_effects);
    }

    /// Authority phase of the session's drain slot, or `None` if there is no slot.
    async fn current_phase(
        adapter: &Arc<RuntimeSessionAdapter>,
        session_id: &SessionId,
    ) -> Option<CommsDrainPhase> {
        let slots = adapter.comms_drain_slots.read().await;
        slots.get(session_id).map(|slot| slot.authority.phase())
    }

    /// Whether the session's drain slot currently stores a task handle.
    async fn handle_present(adapter: &Arc<RuntimeSessionAdapter>, session_id: &SessionId) -> bool {
        let slots = adapter.comms_drain_slots.read().await;
        slots
            .get(session_id)
            .and_then(|slot| slot.handle.as_ref())
            .is_some()
    }

    /// Poll every 5ms until the slot reaches `expected`; panic after 1s.
    async fn wait_for_phase(
        adapter: &Arc<RuntimeSessionAdapter>,
        session_id: &SessionId,
        expected: CommsDrainPhase,
    ) {
        tokio::time::timeout(Duration::from_secs(1), async {
            loop {
                if current_phase(adapter, session_id).await == Some(expected) {
                    break;
                }
                tokio::time::sleep(Duration::from_millis(5)).await;
            }
        })
        .await
        .expect("phase transition");
    }

    #[tokio::test]
    async fn dismiss_exit_updates_authority_before_join() {
        let adapter = Arc::new(RuntimeSessionAdapter::ephemeral());
        let session_id = SessionId::new();
        let comms_runtime: Arc<dyn CommsRuntime> = Arc::new(FakeDrainRuntime::dismissing());

        spawn_test_comms_drain(
            &adapter,
            &session_id,
            CommsDrainMode::PersistentHost,
            comms_runtime,
            Duration::from_millis(25),
        )
        .await;

        wait_for_phase(&adapter, &session_id, CommsDrainPhase::Stopped).await;
        assert!(
            !handle_present(&adapter, &session_id).await,
            "drain task should clear its slot before wait_comms_drain joins"
        );

        adapter.wait_comms_drain(&session_id).await;
        assert_eq!(
            current_phase(&adapter, &session_id).await,
            Some(CommsDrainPhase::Stopped)
        );
    }

    #[tokio::test]
    async fn idle_timeout_updates_authority_before_join() {
        let adapter = Arc::new(RuntimeSessionAdapter::ephemeral());
        let session_id = SessionId::new();
        let comms_runtime: Arc<dyn CommsRuntime> = Arc::new(FakeDrainRuntime::idle());

        spawn_test_comms_drain(
            &adapter,
            &session_id,
            CommsDrainMode::Timed,
            comms_runtime,
            Duration::from_millis(25),
        )
        .await;

        wait_for_phase(&adapter, &session_id, CommsDrainPhase::Stopped).await;
        assert!(
            !handle_present(&adapter, &session_id).await,
            "drain task should clear its slot before wait_comms_drain joins"
        );

        adapter.wait_comms_drain(&session_id).await;
        assert_eq!(
            current_phase(&adapter, &session_id).await,
            Some(CommsDrainPhase::Stopped)
        );
    }

    #[tokio::test]
    async fn unregister_session_aborts_and_removes_drain_slot() {
        let adapter = Arc::new(RuntimeSessionAdapter::ephemeral());
        let session_id = SessionId::new();
        let comms_runtime: Arc<dyn CommsRuntime> = Arc::new(FakeDrainRuntime::idle());

        // `spawn_test_comms_drain` registers the session itself.
        spawn_test_comms_drain(
            &adapter,
            &session_id,
            CommsDrainMode::PersistentHost,
            comms_runtime,
            Duration::from_secs(60),
        )
        .await;

        assert_eq!(
            current_phase(&adapter, &session_id).await,
            Some(CommsDrainPhase::Running)
        );
        assert!(handle_present(&adapter, &session_id).await);

        adapter.unregister_session(&session_id).await;

        let slots = adapter.comms_drain_slots.read().await;
        assert!(
            !slots.contains_key(&session_id),
            "unregister must remove the comms drain slot entirely"
        );
    }
}