Skip to main content

meerkat_llm_core/
realtime_session.rs

1//! Provider-neutral realtime session traits for product-layer channel clients.
2//!
3//! These types live in `meerkat-client` because they describe provider transport
4//! capabilities and normalized event mapping, not runtime lifecycle truth.
5
6use async_trait::async_trait;
7use meerkat_contracts::{
8    RealtimeAudioChunk, RealtimeCapabilities, RealtimeEvent, RealtimeInputChunk,
9    RealtimeTurningMode, RealtimeVideoChunk,
10};
11use meerkat_core::{PendingSystemContextAppend, RealtimeTranscriptEvent, ToolResult};
12use meerkat_core::{SessionLlmIdentity, StopReason, ToolDef, types::Message, types::Usage};
13use serde_json::Value;
14
15use crate::LlmError;
16
17/// Advanced/internal target for attaching to an existing provider session.
18#[derive(Debug, Clone, PartialEq, Eq)]
19pub struct RealtimeExternalSessionTarget {
20    pub provider_session_id: String,
21}
22
23impl RealtimeExternalSessionTarget {
24    /// Construct a provider session target, rejecting blank identifiers.
25    pub fn new(provider_session_id: impl Into<String>) -> Result<Self, LlmError> {
26        let provider_session_id = provider_session_id.into();
27        if provider_session_id.trim().is_empty() {
28            return Err(LlmError::InvalidRequest {
29                message: "provider realtime session id must not be empty".to_string(),
30            });
31        }
32        Ok(Self {
33            provider_session_id,
34        })
35    }
36}
37
38/// Provider-neutral realtime event stream.
39#[derive(Debug, Clone, PartialEq)]
40pub enum RealtimeSessionEvent {
41    InputTranscriptPartial {
42        text: String,
43    },
44    InputTranscriptFinal {
45        text: String,
46    },
47    InputTranscriptFinalForItem {
48        item_id: String,
49        previous_item_id: Option<String>,
50        content_index: u32,
51        text: String,
52    },
53    TurnStarted,
54    TurnCommitted,
55    TurnCompleted {
56        response_id: String,
57        stop_reason: StopReason,
58        usage: Usage,
59    },
60    OutputTextDelta {
61        delta: String,
62    },
63    OutputTextDeltaForItem {
64        response_id: String,
65        delta_id: String,
66        item_id: String,
67        previous_item_id: Option<String>,
68        content_index: u32,
69        delta: String,
70    },
71    OutputAudioChunk {
72        chunk: RealtimeAudioChunk,
73    },
74    OutputVideoChunk {
75        chunk: RealtimeVideoChunk,
76    },
77    Interrupted {
78        response_id: Option<String>,
79    },
80    ToolCallRequested {
81        call_id: String,
82        tool_name: String,
83        arguments: Value,
84    },
85    /// The assistant output identified by `item_id` was truncated at
86    /// `audio_played_ms` because the user barged in. `truncated_text` is the
87    /// heard prefix, or `None` if the provider has not yet re-projected it.
88    AssistantTranscriptTruncated {
89        response_id: Option<String>,
90        item_id: String,
91        audio_played_ms: u64,
92        truncated_text: Option<String>,
93    },
94    /// Identity-bearing transcript event for providers that need to expose an
95    /// ordering/append fact without an otherwise public channel event.
96    RealtimeTranscript {
97        event: RealtimeTranscriptEvent,
98    },
99}
100
101impl RealtimeSessionEvent {
102    /// Project an internal provider event into the public channel event shape.
103    #[must_use]
104    pub fn to_public_event(&self) -> Option<RealtimeEvent> {
105        Some(match self {
106            Self::InputTranscriptPartial { text } => {
107                RealtimeEvent::InputTranscriptPartial { text: text.clone() }
108            }
109            Self::InputTranscriptFinal { text } => RealtimeEvent::InputTranscriptFinal {
110                text: text.clone(),
111                // Provider-layer prosody annotations are not surfaced
112                // through the internal adapter today; when a provider
113                // starts exposing structured prosody, the adapter fills
114                // this field before emitting the public event.
115                prosody_hint: None,
116            },
117            Self::InputTranscriptFinalForItem { text, .. } => RealtimeEvent::InputTranscriptFinal {
118                text: text.clone(),
119                prosody_hint: None,
120            },
121            Self::TurnStarted => RealtimeEvent::TurnStarted,
122            Self::TurnCommitted => RealtimeEvent::TurnCommitted,
123            Self::TurnCompleted { .. } => RealtimeEvent::TurnCompleted,
124            Self::OutputTextDelta { delta } => RealtimeEvent::OutputTextDelta {
125                delta: delta.clone(),
126            },
127            Self::OutputTextDeltaForItem { delta, .. } => RealtimeEvent::OutputTextDelta {
128                delta: delta.clone(),
129            },
130            Self::OutputAudioChunk { chunk } => RealtimeEvent::OutputAudioChunk {
131                chunk: chunk.clone(),
132            },
133            Self::OutputVideoChunk { chunk } => RealtimeEvent::OutputVideoChunk {
134                chunk: chunk.clone(),
135            },
136            Self::Interrupted { .. } => RealtimeEvent::Interrupted,
137            Self::ToolCallRequested {
138                call_id, tool_name, ..
139            } => RealtimeEvent::ToolCallRequested {
140                call_id: call_id.clone(),
141                tool_name: tool_name.clone(),
142            },
143            Self::AssistantTranscriptTruncated {
144                response_id: _,
145                item_id,
146                audio_played_ms,
147                truncated_text,
148            } => RealtimeEvent::AssistantTranscriptTruncated {
149                item_id: item_id.clone(),
150                audio_played_ms: *audio_played_ms,
151                truncated_text: truncated_text.clone(),
152            },
153            Self::RealtimeTranscript { .. } => return None,
154        })
155    }
156}
157
158/// Provider-neutral realtime session surface.
159#[cfg_attr(target_arch = "wasm32", async_trait(?Send))]
160#[cfg_attr(not(target_arch = "wasm32"), async_trait)]
161pub trait RealtimeSession: Send {
162    /// Report the product-facing capability set the provider can honor.
163    fn capabilities(&self) -> &RealtimeCapabilities;
164
165    /// Report the turning mode selected when the session was opened.
166    fn turning_mode(&self) -> RealtimeTurningMode;
167
168    /// Refresh the provider's projection of canonical Meerkat session state.
169    ///
170    /// This is projection-only: canonical Meerkat history, visible tools, and
171    /// related policy remain the semantic owner. Providers update their local
172    /// session view from the latest canonical open config before the next user
173    /// turn, rather than becoming a second owner of conversation truth.
174    async fn refresh_projection(
175        &mut self,
176        open_config: &RealtimeSessionOpenConfig,
177    ) -> Result<(), LlmError>;
178
179    /// Stream one modality-neutral input chunk into the provider session.
180    async fn send_input(&mut self, chunk: RealtimeInputChunk) -> Result<(), LlmError>;
181
182    /// Commit the staged turn when the session is using explicit commit.
183    async fn commit_turn(&mut self) -> Result<(), LlmError>;
184
185    /// Interrupt the currently active provider response, if any.
186    async fn interrupt(&mut self) -> Result<(), LlmError>;
187
188    /// Truncate the assistant output for `item_id` to `audio_played_ms` so the
189    /// canonical session transcript reflects what the user actually heard
190    /// before barging in. The adapter is expected to eventually emit
191    /// [`RealtimeSessionEvent::AssistantTranscriptTruncated`] with the
192    /// re-projected prefix (or a best-effort approximation if the provider
193    /// cannot supply exact text).
194    async fn truncate_assistant_output(
195        &mut self,
196        item_id: String,
197        content_index: u32,
198        audio_played_ms: u64,
199    ) -> Result<(), LlmError>;
200
201    /// Submit a completed tool result back into the provider session so its
202    /// response can continue.
203    async fn submit_tool_result(&mut self, result: ToolResult) -> Result<(), LlmError>;
204
205    /// Submit a tool-dispatch error back into the provider session.
206    async fn submit_tool_error(&mut self, call_id: String, error: String) -> Result<(), LlmError>;
207
208    /// Read the next normalized realtime session event.
209    async fn next_event(&mut self) -> Result<Option<RealtimeSessionEvent>, LlmError>;
210
211    /// Close the provider session and release any local transport state.
212    async fn close(&mut self) -> Result<(), LlmError>;
213}
214
215/// Canonical live session projection used to open a provider-backed realtime session.
216///
217/// This is the product-session equivalent of a build seam: the provider session
218/// must be opened from the currently-owned Meerkat session identity, visible
219/// tools, and committed transcript instead of inventing a parallel provider-only
220/// conversation.
221#[derive(Debug, Clone)]
222pub struct RealtimeSessionOpenConfig {
223    pub turning_mode: RealtimeTurningMode,
224    pub llm_identity: SessionLlmIdentity,
225    pub visible_tools: Vec<ToolDef>,
226    pub seed_messages: Vec<Message>,
227    /// Runtime-authored system context carried as typed provenance.
228    ///
229    /// Provider adapters must treat this as the only authoritative realtime
230    /// reconstruction source for runtime context. Rendered transcript markers
231    /// are projections only and must not be parsed back into authority.
232    pub runtime_system_context: Vec<PendingSystemContextAppend>,
233    /// Per-channel override for the "nudge the provider" timeout the OpenAI
234    /// adapter uses while waiting for the first real delta after a turn is
235    /// admitted. `None` inherits the adapter's compile-time default.
236    pub response_nudge_timeout_ms: Option<u64>,
237    /// Per-channel override for the maximum number of nudge attempts before
238    /// the adapter gives up. `None` inherits the adapter default.
239    pub response_nudge_max_attempts: Option<u8>,
240}
241
242impl RealtimeSessionOpenConfig {
243    #[must_use]
244    pub fn new(
245        turning_mode: RealtimeTurningMode,
246        llm_identity: SessionLlmIdentity,
247        visible_tools: Vec<ToolDef>,
248        seed_messages: Vec<Message>,
249    ) -> Self {
250        Self {
251            turning_mode,
252            llm_identity,
253            visible_tools,
254            seed_messages,
255            runtime_system_context: Vec::new(),
256            response_nudge_timeout_ms: None,
257            response_nudge_max_attempts: None,
258        }
259    }
260
261    /// Builder-style typed runtime context for provider reconstruction.
262    #[must_use]
263    pub fn with_runtime_system_context(
264        mut self,
265        runtime_system_context: Vec<PendingSystemContextAppend>,
266    ) -> Self {
267        self.runtime_system_context = runtime_system_context;
268        self
269    }
270
271    /// Builder-style override for the per-channel nudge timeout.
272    #[must_use]
273    pub fn with_response_nudge_timeout_ms(mut self, timeout_ms: Option<u64>) -> Self {
274        self.response_nudge_timeout_ms = timeout_ms;
275        self
276    }
277
278    /// Builder-style override for the per-channel nudge max attempts.
279    #[must_use]
280    pub fn with_response_nudge_max_attempts(mut self, max_attempts: Option<u8>) -> Self {
281        self.response_nudge_max_attempts = max_attempts;
282        self
283    }
284}
285
286/// Factory for provider-neutral realtime sessions.
287#[cfg_attr(target_arch = "wasm32", async_trait(?Send))]
288#[cfg_attr(not(target_arch = "wasm32"), async_trait)]
289pub trait RealtimeSessionFactory: Send + Sync {
290    /// Report the provider/product capability set exposed by this factory.
291    fn capabilities(&self) -> RealtimeCapabilities;
292
293    /// Open a provider-created realtime session using the selected turning mode.
294    async fn open_session(
295        &self,
296        open_config: &RealtimeSessionOpenConfig,
297    ) -> Result<Box<dyn RealtimeSession>, LlmError>;
298
299    /// Attach to an existing provider-managed realtime session.
300    async fn attach_external_session(
301        &self,
302        target: &RealtimeExternalSessionTarget,
303        turning_mode: RealtimeTurningMode,
304    ) -> Result<Box<dyn RealtimeSession>, LlmError>;
305}
306
#[cfg(test)]
#[allow(clippy::unwrap_used, clippy::expect_used, clippy::panic)]
mod tests {
    //! Unit tests for target validation and the internal → public event
    //! projection. The projection is lossy by design, so each test pins which
    //! fields survive it.

    use super::*;

    #[test]
    fn external_session_target_rejects_blank_provider_id() {
        for blank in ["", "   ", "\t\n"] {
            let error = match RealtimeExternalSessionTarget::new(blank) {
                Ok(_) => panic!("blank provider id must fail"),
                Err(error) => error,
            };
            assert!(matches!(error, LlmError::InvalidRequest { .. }));
        }
    }

    #[test]
    fn external_session_target_keeps_non_blank_provider_id() {
        let target = match RealtimeExternalSessionTarget::new("sess_123") {
            Ok(target) => target,
            Err(_) => panic!("non-blank provider id must be accepted"),
        };
        assert_eq!(target.provider_session_id, "sess_123");
    }

    #[test]
    fn tool_call_projection_strips_internal_arguments_for_public_event() {
        let public = RealtimeSessionEvent::ToolCallRequested {
            call_id: "call_1".to_string(),
            tool_name: "lookup".to_string(),
            arguments: serde_json::json!({ "q": "otter" }),
        }
        .to_public_event();

        assert_eq!(
            public,
            Some(RealtimeEvent::ToolCallRequested {
                call_id: "call_1".to_string(),
                tool_name: "lookup".to_string(),
            })
        );
    }

    #[test]
    fn per_item_final_transcript_projects_to_plain_final_transcript() {
        let public = RealtimeSessionEvent::InputTranscriptFinalForItem {
            item_id: "item_1".to_string(),
            previous_item_id: None,
            content_index: 0,
            text: "hello".to_string(),
        }
        .to_public_event();

        assert_eq!(
            public,
            Some(RealtimeEvent::InputTranscriptFinal {
                text: "hello".to_string(),
                prosody_hint: None,
            })
        );
    }

    #[test]
    fn per_item_text_delta_projects_to_plain_text_delta() {
        let public = RealtimeSessionEvent::OutputTextDeltaForItem {
            response_id: "resp_1".to_string(),
            delta_id: "delta_1".to_string(),
            item_id: "item_1".to_string(),
            previous_item_id: Some("item_0".to_string()),
            content_index: 0,
            delta: "hi".to_string(),
        }
        .to_public_event();

        assert_eq!(
            public,
            Some(RealtimeEvent::OutputTextDelta {
                delta: "hi".to_string(),
            })
        );
    }

    #[test]
    fn truncation_projection_drops_response_id() {
        let public = RealtimeSessionEvent::AssistantTranscriptTruncated {
            response_id: Some("resp_1".to_string()),
            item_id: "item_1".to_string(),
            audio_played_ms: 1_250,
            truncated_text: Some("Hel".to_string()),
        }
        .to_public_event();

        assert_eq!(
            public,
            Some(RealtimeEvent::AssistantTranscriptTruncated {
                item_id: "item_1".to_string(),
                audio_played_ms: 1_250,
                truncated_text: Some("Hel".to_string()),
            })
        );
    }

    #[test]
    fn unit_lifecycle_events_project_one_to_one() {
        assert_eq!(
            RealtimeSessionEvent::TurnStarted.to_public_event(),
            Some(RealtimeEvent::TurnStarted)
        );
        assert_eq!(
            RealtimeSessionEvent::TurnCommitted.to_public_event(),
            Some(RealtimeEvent::TurnCommitted)
        );
        assert_eq!(
            RealtimeSessionEvent::Interrupted { response_id: None }.to_public_event(),
            Some(RealtimeEvent::Interrupted)
        );
    }
}