use crate::error::MobError;
use crate::runtime::bridge::MobBoundMemberRuntimeBridge;
use crate::runtime::bridge_protocol::{
BridgeAck, BridgeDeliveryOutcome, BridgeDeliveryRejectionCause, BridgeDeliveryResponse,
BridgeDestroyResponse, BridgeMemberRuntimeState, BridgeObservationResponse,
BridgePeerConnectivity, BridgePeerSpec, BridgeRetireResponse,
};
use async_trait::async_trait;
use meerkat_core::types::{ContentInput, HandlingMode, SessionId};
use meerkat_runtime::MeerkatMachine;
use meerkat_runtime::identifiers::LogicalRuntimeId;
#[allow(unused_imports)]
use meerkat_runtime::service_ext::SessionServiceRuntimeExt as _;
use std::sync::Arc;
pub struct LocalMobRuntimeBridge {
machine: Arc<MeerkatMachine>,
session_id: SessionId,
}
impl LocalMobRuntimeBridge {
pub fn new(machine: Arc<MeerkatMachine>, session_id: SessionId) -> Self {
Self {
machine,
session_id,
}
}
}
fn runtime_state_to_bridge(
state: meerkat_runtime::RuntimeState,
) -> Result<BridgeMemberRuntimeState, MobError> {
let state = match state {
meerkat_runtime::RuntimeState::Initializing => BridgeMemberRuntimeState::Initializing,
meerkat_runtime::RuntimeState::Idle => BridgeMemberRuntimeState::Idle,
meerkat_runtime::RuntimeState::Attached => BridgeMemberRuntimeState::Attached,
meerkat_runtime::RuntimeState::Running => BridgeMemberRuntimeState::Running,
meerkat_runtime::RuntimeState::Retired => BridgeMemberRuntimeState::Retired,
meerkat_runtime::RuntimeState::Stopped => BridgeMemberRuntimeState::Stopped,
meerkat_runtime::RuntimeState::Destroyed => BridgeMemberRuntimeState::Destroyed,
_ => return Err(MobError::Internal(
"unknown RuntimeState observed over LocalMobRuntimeBridge; bridge state mapping must be extended before it can classify terminality".to_string(),
)),
};
Ok(state)
}
fn bridge_delivery_rejection_cause(
reason: &meerkat_runtime::RejectReason,
) -> BridgeDeliveryRejectionCause {
match reason {
meerkat_runtime::RejectReason::NotReady { state } => {
BridgeDeliveryRejectionCause::NotReady {
state: match runtime_state_to_bridge(*state) {
Ok(state) => state,
Err(err) => {
return BridgeDeliveryRejectionCause::Internal {
detail: err.to_string(),
};
}
},
}
}
meerkat_runtime::RejectReason::DurabilityViolation { detail } => {
BridgeDeliveryRejectionCause::DurabilityViolation {
detail: detail.clone(),
}
}
meerkat_runtime::RejectReason::PeerHandlingModeInvalid { detail } => {
BridgeDeliveryRejectionCause::PeerHandlingModeInvalid {
detail: detail.clone(),
}
}
_ => BridgeDeliveryRejectionCause::Internal {
detail: reason.to_string(),
},
}
}
#[cfg_attr(target_arch = "wasm32", async_trait(?Send))]
#[cfg_attr(not(target_arch = "wasm32"), async_trait)]
impl MobBoundMemberRuntimeBridge for LocalMobRuntimeBridge {
async fn authorize_supervisor(&self) -> Result<BridgeAck, MobError> {
Ok(BridgeAck { ok: true })
}
async fn revoke_supervisor(&self) -> Result<BridgeAck, MobError> {
Ok(BridgeAck { ok: true })
}
async fn deliver_member_input(
&self,
input_id: &str,
content: ContentInput,
handling_mode: HandlingMode,
) -> Result<BridgeDeliveryResponse, MobError> {
use meerkat_runtime::input::{
Input, InputDurability, InputHeader, InputOrigin, InputVisibility, PeerConvention,
PeerInput,
};
let (body, blocks) = match content {
ContentInput::Text(body) => (body, None),
ContentInput::Blocks(blocks) => {
let body = meerkat_core::types::text_content(&blocks);
(body, Some(blocks))
}
};
let input = Input::Peer(PeerInput {
header: InputHeader {
id: meerkat_core::lifecycle::InputId::new(),
timestamp: chrono::Utc::now(),
source: InputOrigin::Peer {
peer_id: format!("local-bridge:{}", self.session_id),
display_identity: Some(format!("local-bridge:{}", self.session_id)),
runtime_id: Some(LogicalRuntimeId::for_session(&self.session_id)),
},
durability: InputDurability::Durable,
visibility: InputVisibility::default(),
idempotency_key: Some(meerkat_runtime::identifiers::IdempotencyKey::new(
input_id.to_string(),
)),
supersession_key: None,
correlation_id: None,
},
convention: Some(PeerConvention::Message),
body,
payload: None,
blocks,
handling_mode: match handling_mode {
HandlingMode::Queue => None,
mode => Some(mode),
},
});
match self
.machine
.accept_input_without_wake(&self.session_id, input)
.await
{
Ok(outcome) => {
let response = match outcome {
meerkat_runtime::AcceptOutcome::Accepted { input_id: id, .. } => {
BridgeDeliveryResponse {
input_id: input_id.to_string(),
canonical_input_id: Some(id.to_string()),
outcome: BridgeDeliveryOutcome::Accepted,
}
}
meerkat_runtime::AcceptOutcome::Deduplicated { existing_id, .. } => {
let existing_id = existing_id.to_string();
BridgeDeliveryResponse {
input_id: input_id.to_string(),
canonical_input_id: Some(existing_id.clone()),
outcome: BridgeDeliveryOutcome::Deduplicated {
existing_input_id: existing_id,
},
}
}
meerkat_runtime::AcceptOutcome::Rejected { reason } => {
let cause = bridge_delivery_rejection_cause(&reason);
BridgeDeliveryResponse {
input_id: input_id.to_string(),
canonical_input_id: None,
outcome: BridgeDeliveryOutcome::Rejected {
cause,
reason: reason.to_string(),
},
}
}
_ => BridgeDeliveryResponse {
input_id: input_id.to_string(),
canonical_input_id: None,
outcome: BridgeDeliveryOutcome::Rejected {
cause: BridgeDeliveryRejectionCause::Internal {
detail: "unexpected accept outcome".to_string(),
},
reason: "unexpected accept outcome".to_string(),
},
},
};
Ok(response)
}
Err(error) => Err(MobError::Internal(format!(
"local deliver_member_input failed: {error}"
))),
}
}
async fn observe_member(&self) -> Result<BridgeObservationResponse, MobError> {
use meerkat_runtime::service_ext::SessionServiceRuntimeExt as _;
let state = self
.machine
.runtime_state(&self.session_id)
.await
.map_err(|error| MobError::Internal(format!("observe_member failed: {error}")))?;
let current_run_id = self
.machine
.meerkat_machine_spine_snapshot(&self.session_id)
.await
.and_then(|snapshot| {
snapshot
.control
.current_run_id
.map(|run_id| run_id.to_string())
});
Ok(BridgeObservationResponse::new(
runtime_state_to_bridge(state)?,
Some(state.can_accept_input()),
current_run_id,
Some(BridgePeerConnectivity::Reachable),
None,
chrono::Utc::now().to_rfc3339(),
))
}
async fn interrupt_member(&self) -> Result<BridgeAck, MobError> {
match self.machine.cancel_after_boundary(&self.session_id).await {
Ok(()) => {}
Err(meerkat_runtime::RuntimeDriverError::NotReady {
state: meerkat_runtime::RuntimeState::Retired,
}) => {
}
Err(error) => {
return Err(MobError::Internal(format!(
"local interrupt_member failed: {error}"
)));
}
}
Ok(BridgeAck { ok: true })
}
async fn retire_member(&self) -> Result<BridgeRetireResponse, MobError> {
match self.machine.retire_runtime(&self.session_id).await {
Ok(report) => Ok(BridgeRetireResponse {
inputs_abandoned: report.inputs_abandoned,
inputs_pending_drain: report.inputs_pending_drain,
}),
Err(error) => Err(MobError::Internal(format!(
"local retire_member failed: {error}"
))),
}
}
async fn destroy_member(&self) -> Result<BridgeDestroyResponse, MobError> {
use meerkat_runtime::traits::RuntimeControlPlane;
let runtime_id = LogicalRuntimeId::for_session(&self.session_id);
let report = RuntimeControlPlane::destroy(self.machine.as_ref(), &runtime_id)
.await
.map_err(|error| MobError::Internal(format!("local destroy_member failed: {error}")))?;
Ok(BridgeDestroyResponse {
inputs_abandoned: report.inputs_abandoned,
})
}
async fn wire_member(&self, _peer_spec: BridgePeerSpec) -> Result<BridgeAck, MobError> {
Err(MobError::Internal(
"local bridge wire_member called — callers must branch on MemberRef".to_string(),
))
}
async fn unwire_member(&self, _peer_spec: BridgePeerSpec) -> Result<BridgeAck, MobError> {
Err(MobError::Internal(
"local bridge unwire_member called — callers must branch on MemberRef".to_string(),
))
}
}
#[cfg(test)]
#[allow(clippy::unwrap_used)]
mod tests {
use super::*;
use meerkat_core::lifecycle::core_executor::{
CoreApplyOutput, CoreExecutor, CoreExecutorBoundaryHandle, CoreExecutorError,
CoreExecutorInterruptHandle,
};
use meerkat_core::lifecycle::run_primitive::RunPrimitive;
use meerkat_core::lifecycle::{RunApplyBoundary, RunBoundaryReceipt, RunId};
use std::sync::atomic::{AtomicUsize, Ordering};
use tokio::sync::Notify;
#[tokio::test]
async fn local_bridge_observe_returns_idle_for_registered_session() {
let machine = Arc::new(MeerkatMachine::ephemeral());
let session_id = SessionId::new();
machine.register_session(session_id.clone()).await;
let bridge = LocalMobRuntimeBridge::new(machine, session_id);
let observation = bridge.observe_member().await.unwrap();
assert_eq!(observation.state, BridgeMemberRuntimeState::Idle);
assert!(observation.current_run_id.is_none());
}
#[tokio::test]
async fn local_bridge_retire_returns_report() {
let machine = Arc::new(MeerkatMachine::ephemeral());
let session_id = SessionId::new();
machine.register_session(session_id.clone()).await;
let bridge = LocalMobRuntimeBridge::new(machine, session_id);
let report = bridge.retire_member().await.unwrap();
assert_eq!(report.inputs_abandoned, 0);
assert_eq!(report.inputs_pending_drain, 0);
}
#[tokio::test]
async fn local_bridge_interrupt_retired_runtime_is_terminal_noop() {
let machine = Arc::new(MeerkatMachine::ephemeral());
let session_id = SessionId::new();
machine.register_session(session_id.clone()).await;
let bridge = LocalMobRuntimeBridge::new(machine, session_id);
bridge.retire_member().await.unwrap();
let ack = bridge.interrupt_member().await.unwrap();
assert!(ack.ok);
}
#[tokio::test]
async fn local_bridge_interrupt_member_uses_boundary_cancel_not_hard_cancel() {
struct BoundaryHandle {
calls: Arc<AtomicUsize>,
}
#[async_trait::async_trait]
impl CoreExecutorBoundaryHandle for BoundaryHandle {
async fn cancel_after_boundary(
&self,
_reason: String,
) -> Result<(), CoreExecutorError> {
self.calls.fetch_add(1, Ordering::SeqCst);
Ok(())
}
}
struct InterruptHandle {
calls: Arc<AtomicUsize>,
}
#[async_trait::async_trait]
impl CoreExecutorInterruptHandle for InterruptHandle {
async fn hard_cancel_current_run(
&self,
_reason: String,
) -> Result<(), CoreExecutorError> {
self.calls.fetch_add(1, Ordering::SeqCst);
Ok(())
}
}
struct BlockingExecutor {
boundary_calls: Arc<AtomicUsize>,
interrupt_calls: Arc<AtomicUsize>,
apply_started: Arc<Notify>,
apply_finished: Arc<Notify>,
allow_finish: Arc<Notify>,
}
#[async_trait::async_trait]
impl CoreExecutor for BlockingExecutor {
fn boundary_handle(&self) -> Option<Arc<dyn CoreExecutorBoundaryHandle>> {
Some(Arc::new(BoundaryHandle {
calls: Arc::clone(&self.boundary_calls),
}))
}
fn interrupt_handle(&self) -> Option<Arc<dyn CoreExecutorInterruptHandle>> {
Some(Arc::new(InterruptHandle {
calls: Arc::clone(&self.interrupt_calls),
}))
}
async fn apply(
&mut self,
run_id: RunId,
primitive: RunPrimitive,
) -> Result<CoreApplyOutput, CoreExecutorError> {
self.apply_started.notify_waiters();
self.allow_finish.notified().await;
self.apply_finished.notify_waiters();
Ok(CoreApplyOutput {
receipt: RunBoundaryReceipt {
run_id,
boundary: RunApplyBoundary::RunStart,
contributing_input_ids: primitive.contributing_input_ids().to_vec(),
conversation_digest: None,
message_count: 0,
sequence: 0,
},
session_snapshot: None,
terminal: None,
})
}
async fn cancel_after_boundary(
&mut self,
_reason: String,
) -> Result<(), CoreExecutorError> {
Ok(())
}
async fn stop_runtime_executor(
&mut self,
_reason: String,
) -> Result<(), CoreExecutorError> {
Ok(())
}
}
let machine = Arc::new(MeerkatMachine::ephemeral());
let session_id = SessionId::new();
let boundary_calls = Arc::new(AtomicUsize::new(0));
let interrupt_calls = Arc::new(AtomicUsize::new(0));
let apply_started = Arc::new(Notify::new());
let apply_finished = Arc::new(Notify::new());
let allow_finish = Arc::new(Notify::new());
machine
.register_session_with_executor(
session_id.clone(),
Box::new(BlockingExecutor {
boundary_calls: Arc::clone(&boundary_calls),
interrupt_calls: Arc::clone(&interrupt_calls),
apply_started: Arc::clone(&apply_started),
apply_finished: Arc::clone(&apply_finished),
allow_finish: Arc::clone(&allow_finish),
}),
)
.await;
let input =
meerkat_runtime::input::Input::Prompt(meerkat_runtime::input::PromptInput::new(
"local bridge running turn",
Some(
meerkat_core::lifecycle::run_primitive::RuntimeTurnMetadata {
handling_mode: Some(HandlingMode::Steer),
..Default::default()
},
),
));
let (outcome, _completion) = machine
.accept_input_with_completion(&session_id, input)
.await
.expect("attached prompt should be accepted");
assert!(outcome.is_accepted());
tokio::time::timeout(std::time::Duration::from_secs(1), apply_started.notified())
.await
.expect("attached prompt should start running");
let bridge = LocalMobRuntimeBridge::new(Arc::clone(&machine), session_id);
let ack = bridge.interrupt_member().await.unwrap();
assert!(ack.ok);
assert_eq!(
boundary_calls.load(Ordering::SeqCst),
1,
"local bridge interrupt must use cooperative boundary authority"
);
assert_eq!(
interrupt_calls.load(Ordering::SeqCst),
0,
"local bridge interrupt must not mint user hard-cancel authority"
);
allow_finish.notify_waiters();
tokio::time::timeout(std::time::Duration::from_secs(1), apply_finished.notified())
.await
.expect("attached prompt should finish after release");
}
#[tokio::test]
async fn local_bridge_authorize_is_noop() {
let machine = Arc::new(MeerkatMachine::ephemeral());
let session_id = SessionId::new();
let bridge = LocalMobRuntimeBridge::new(machine, session_id);
let ack = bridge.authorize_supervisor().await.unwrap();
assert!(ack.ok);
}
fn sample_peer_spec() -> BridgePeerSpec {
BridgePeerSpec {
name: "peer-a".to_string(),
peer_id: "peer-a-id".to_string(),
address: "inproc://peer-a".to_string(),
pubkey: [0u8; 32],
}
}
#[tokio::test]
async fn local_bridge_wire_is_programming_error() {
let machine = Arc::new(MeerkatMachine::ephemeral());
let session_id = SessionId::new();
let bridge = LocalMobRuntimeBridge::new(machine, session_id);
let err = bridge.wire_member(sample_peer_spec()).await.unwrap_err();
match err {
MobError::Internal(reason) => {
assert_eq!(
reason,
"local bridge wire_member called — callers must branch on MemberRef",
);
}
other => panic!("expected MobError::Internal, got {other:?}"),
}
}
#[tokio::test]
async fn local_bridge_unwire_is_programming_error() {
let machine = Arc::new(MeerkatMachine::ephemeral());
let session_id = SessionId::new();
let bridge = LocalMobRuntimeBridge::new(machine, session_id);
let err = bridge.unwire_member(sample_peer_spec()).await.unwrap_err();
match err {
MobError::Internal(reason) => {
assert_eq!(
reason,
"local bridge unwire_member called — callers must branch on MemberRef",
);
}
other => panic!("expected MobError::Internal, got {other:?}"),
}
}
#[tokio::test]
async fn local_bridge_destroy_returns_report() {
let machine = Arc::new(MeerkatMachine::ephemeral());
let session_id = SessionId::new();
machine.register_session(session_id.clone()).await;
let bridge = LocalMobRuntimeBridge::new(machine, session_id);
let report = bridge.destroy_member().await.unwrap();
assert_eq!(report.inputs_abandoned, 0);
}
#[tokio::test]
async fn local_bridge_observe_with_unregistered_session_surfaces_internal_error() {
let machine = Arc::new(MeerkatMachine::ephemeral());
let bridge = LocalMobRuntimeBridge::new(machine, SessionId::new());
let err = bridge
.observe_member()
.await
.expect_err("observe on unregistered session must return MobError, not succeed");
match err {
MobError::Internal(reason) => {
assert!(
reason.starts_with("observe_member failed:"),
"error should identify the observe path, got: {reason}"
);
}
other => panic!("expected MobError::Internal, got {other:?}"),
}
}
}