Skip to main content

meerkat_runtime/
comms_sink.rs

1//! Concrete RuntimeInputSink implementation.
2//!
3//! Routes host-mode comms interactions through the RuntimeSessionAdapter
4//! instead of calling Agent::run() directly.
5
6use std::sync::Arc;
7
8use meerkat_core::agent::RuntimeInputSink;
9use meerkat_core::interaction::InboxInteraction;
10use meerkat_core::types::SessionId;
11
12use crate::comms_bridge::interaction_to_peer_input;
13use crate::identifiers::LogicalRuntimeId;
14use crate::input::{
15    Input, InputDurability, InputHeader, InputOrigin, InputVisibility, SystemGeneratedInput,
16};
17use crate::service_ext::SessionServiceRuntimeExt as _;
18use crate::session_adapter::RuntimeSessionAdapter;
19
20/// Routes host-mode comms interactions through the runtime adapter.
21///
22/// Awaits only admission (durable-before-ack), NOT execution completion —
23/// the host loop continues immediately after the input is accepted.
24pub struct RuntimeCommsInputSink {
25    adapter: Arc<RuntimeSessionAdapter>,
26    session_id: SessionId,
27    runtime_id: LogicalRuntimeId,
28}
29
30impl RuntimeCommsInputSink {
31    pub fn new(adapter: Arc<RuntimeSessionAdapter>, session_id: SessionId) -> Self {
32        let runtime_id = LogicalRuntimeId::new(session_id.to_string());
33        Self {
34            adapter,
35            session_id,
36            runtime_id,
37        }
38    }
39}
40
41#[cfg_attr(not(target_arch = "wasm32"), async_trait::async_trait)]
42#[cfg_attr(target_arch = "wasm32", async_trait::async_trait(?Send))]
43impl RuntimeInputSink for RuntimeCommsInputSink {
44    async fn accept_peer_input(&self, interaction: InboxInteraction) -> Result<(), String> {
45        let input = interaction_to_peer_input(&interaction, &self.runtime_id);
46        self.adapter
47            .accept_input(&self.session_id, input)
48            .await
49            .map(|_| ())
50            .map_err(|e| e.to_string())
51    }
52
53    async fn accept_continuation(&self) -> Result<(), String> {
54        let input = Input::SystemGenerated(SystemGeneratedInput {
55            header: InputHeader {
56                id: meerkat_core::lifecycle::InputId::new(),
57                timestamp: chrono::Utc::now(),
58                source: InputOrigin::System,
59                durability: InputDurability::Ephemeral,
60                visibility: InputVisibility::default(),
61                idempotency_key: None,
62                supersession_key: None,
63                correlation_id: None,
64            },
65            generator: "comms_host_continuation".into(),
66            content: "continuation after terminal response injection".into(),
67        });
68        self.adapter
69            .accept_input(&self.session_id, input)
70            .await
71            .map(|_| ())
72            .map_err(|e| e.to_string())
73    }
74}