Skip to main content

agent_sdk_core/application/
realtime.rs

1//! Application-layer coordination over core primitives. Use these services to lower
2//! helpers, drive runs, validate output, coordinate tools, approvals, delivery,
3//! isolation, telemetry, and feature layers. Methods in this layer may call
4//! configured ports, mutate in-memory stores, append journals, or publish events as
5//! documented. This file contains the realtime portion of that contract.
6//!
7use std::sync::Arc;
8
9use serde::{Deserialize, Serialize};
10
11use crate::{
12    domain::{
13        AgentError, AgentErrorKind, AgentId, DestinationKind, DestinationRef, EntityKind,
14        EntityRef, RetryClassification, RunId, SourceRef,
15    },
16    journal::{
17        JournalCursor, JournalRecord, JournalRecordBase, JournalRecordKind, JournalRecordPayload,
18    },
19    journal_ports::RunJournal,
20    ports::realtime::{RealtimeConnectRequest, RealtimeProviderAdapter},
21    realtime_records::{
22        RealtimeBackpressureState, RealtimeCloseReason, RealtimeConnectionId, RealtimeInputFrame,
23        RealtimeMediaKind, RealtimeResponseId, RealtimeSessionId, RealtimeSessionRecord,
24        RealtimeSessionRecordKind, RealtimeSessionState, RealtimeSessionStatus,
25    },
26    stream_records::{
27        StreamChannel, StreamCursor, StreamCursorPrecision, StreamDirection, safe_id_fragment,
28    },
29};
30
31#[derive(Clone)]
32/// Holds realtime session controller application-layer state or configuration.
33/// Use it with the documented coordinator methods; run, journal, event, provider, or port effects are called out on those methods rather than on construction.
34pub struct RealtimeSessionController {
35    sidecar: crate::package::realtime::RealtimeSessionSidecar,
36    adapter: Arc<dyn RealtimeProviderAdapter>,
37    journal: Arc<dyn RunJournal>,
38    run_id: RunId,
39    agent_id: AgentId,
40    source: SourceRef,
41    runtime_package_fingerprint: String,
42    next_journal_seq: u64,
43    state: Option<RealtimeSessionState>,
44}
45
46impl RealtimeSessionController {
47    /// Creates a new application::realtime value with explicit
48    /// caller-provided inputs. This constructor is data-only and
49    /// performs no I/O or external side effects.
50    pub fn new(
51        sidecar: crate::package::realtime::RealtimeSessionSidecar,
52        adapter: Arc<dyn RealtimeProviderAdapter>,
53        journal: Arc<dyn RunJournal>,
54        run_id: RunId,
55        agent_id: AgentId,
56        source: SourceRef,
57        runtime_package_fingerprint: impl Into<String>,
58    ) -> Self {
59        Self {
60            sidecar,
61            adapter,
62            journal,
63            run_id,
64            agent_id,
65            source,
66            runtime_package_fingerprint: runtime_package_fingerprint.into(),
67            next_journal_seq: 1,
68            state: None,
69        }
70    }
71
72    /// Connect.
73    /// This appends realtime connection intent/result records through the journal path and
74    /// calls the configured realtime adapter to open the session.
75    pub fn connect(&mut self) -> Result<RealtimeSessionRecord, AgentError> {
76        self.sidecar.validate()?;
77        let session_id = RealtimeSessionId::new(format!(
78            "realtime.session.{}",
79            safe_id_fragment(self.run_id.as_str())
80        ));
81        let backpressure_state = RealtimeBackpressureState::bounded(
82            self.sidecar.queue_capacity,
83            self.sidecar.backpressure_policy_ref.clone(),
84        );
85        let requested_state = RealtimeSessionState {
86            session_id: session_id.clone(),
87            connection_id: RealtimeConnectionId::new(format!(
88                "realtime.connection.pending.{}",
89                safe_id_fragment(self.run_id.as_str())
90            )),
91            provider_route_ref: self.sidecar.provider_route_ref.clone(),
92            send_cursor: StreamCursor::chunk(0),
93            receive_cursor: StreamCursor::chunk(0),
94            restart_count: 0,
95            backpressure_state: backpressure_state.clone(),
96            lifecycle_status: RealtimeSessionStatus::Connecting,
97            policy_refs: self.sidecar.policy_refs(),
98        };
99        let requested = self.record(
100            &requested_state,
101            RealtimeSessionRecordKind::ConnectRequested,
102            "realtime connect requested before adapter call",
103        );
104        self.append_realtime_record(requested)?;
105
106        let response = self.adapter.connect(RealtimeConnectRequest {
107            session_id: session_id.clone(),
108            provider_route_ref: self.sidecar.provider_route_ref.clone(),
109            realtime_capability_ref: self.sidecar.realtime_capability_ref.clone(),
110        })?;
111        let state = RealtimeSessionState {
112            session_id,
113            connection_id: response.connection_id,
114            provider_route_ref: self.sidecar.provider_route_ref.clone(),
115            send_cursor: StreamCursor::chunk(0),
116            receive_cursor: StreamCursor::chunk(0),
117            restart_count: 0,
118            backpressure_state,
119            lifecycle_status: RealtimeSessionStatus::Connected,
120            policy_refs: self.sidecar.policy_refs(),
121        };
122        let record = self.record(
123            &state,
124            RealtimeSessionRecordKind::Connected,
125            "realtime session connected",
126        );
127        self.append_realtime_record(record.clone())?;
128        self.state = Some(state);
129        Ok(record)
130    }
131
132    /// Send.
133    /// This journals the realtime send path and forwards one frame to the configured realtime
134    /// adapter.
135    pub fn send(&mut self, frame: RealtimeInputFrame) -> Result<RealtimeSessionRecord, AgentError> {
136        if self
137            .state
138            .as_ref()
139            .is_some_and(|state| state.lifecycle_status == RealtimeSessionStatus::RestartStarted)
140        {
141            return self.apply_backpressure(frame);
142        }
143
144        let state = self.connected_state()?.clone();
145        let mut requested = self.record(
146            &state,
147            RealtimeSessionRecordKind::InputSendRequested,
148            frame.redacted_summary.clone(),
149        );
150        requested.channel = StreamChannel::RealtimeMedia;
151        requested.direction = Some(StreamDirection::InputToProvider);
152        requested.media_kind = frame.media_kind;
153        requested.content_refs = frame.content_refs.clone();
154        requested.privacy = frame.privacy;
155        requested.retention = frame.retention;
156        self.append_realtime_record(requested)?;
157
158        self.adapter.send(&state.session_id, frame.clone())?;
159        let mut state = state;
160        state.send_cursor = StreamCursor {
161            chunk_sequence: state.send_cursor.chunk_sequence + 1,
162            byte_offset: 0,
163            precision: StreamCursorPrecision::ChunkSequenceOnly,
164            label: Some("send".to_string()),
165        };
166        state.lifecycle_status = RealtimeSessionStatus::InputSent;
167        let mut record = self.record(
168            &state,
169            RealtimeSessionRecordKind::InputSent,
170            frame.redacted_summary.clone(),
171        );
172        record.channel = StreamChannel::RealtimeMedia;
173        record.direction = Some(StreamDirection::InputToProvider);
174        record.media_kind = frame.media_kind;
175        record.content_refs = frame.content_refs;
176        record.privacy = frame.privacy;
177        record.retention = frame.retention;
178        self.append_realtime_record(record.clone())?;
179        self.state = Some(state);
180        Ok(record)
181    }
182
183    /// Receives one realtime output frame through the configured adapter.
184    /// This records a receive request, calls the adapter, appends a received record when output is
185    /// available, and updates the in-memory session cursor.
186    pub fn receive(&mut self) -> Result<Option<RealtimeSessionRecord>, AgentError> {
187        let state = self.connected_state()?.clone();
188        let requested = self.record(
189            &state,
190            RealtimeSessionRecordKind::OutputReceiveRequested,
191            "realtime receive requested before adapter call",
192        );
193        self.append_realtime_record(requested)?;
194        let Some(frame) = self.adapter.receive(&state.session_id)? else {
195            return Ok(None);
196        };
197        let mut state = state;
198        state.receive_cursor = StreamCursor {
199            chunk_sequence: state.receive_cursor.chunk_sequence + 1,
200            byte_offset: 0,
201            precision: StreamCursorPrecision::ChunkSequenceOnly,
202            label: Some("receive".to_string()),
203        };
204        state.lifecycle_status = RealtimeSessionStatus::OutputReceived;
205        let mut record = self.record(
206            &state,
207            RealtimeSessionRecordKind::OutputReceived,
208            frame.redacted_summary,
209        );
210        record.channel = StreamChannel::RealtimeTranscript;
211        record.direction = Some(StreamDirection::OutputFromProvider);
212        record.media_kind = frame.media_kind;
213        record.response_id = Some(frame.response_id);
214        record.content_refs = frame.content_refs;
215        record.privacy = frame.privacy;
216        record.retention = frame.retention;
217        self.append_realtime_record(record.clone())?;
218        self.state = Some(state);
219        Ok(Some(record))
220    }
221
222    /// Interrupt.
223    /// This records the interrupt path and sends the configured realtime interruption frame to
224    /// the adapter session.
225    pub fn interrupt(
226        &mut self,
227        response_id: impl Into<String>,
228    ) -> Result<RealtimeSessionRecord, AgentError> {
229        let response_id = RealtimeResponseId::new(response_id);
230        let state = self.connected_state()?.clone();
231        let mut requested = self.record(
232            &state,
233            RealtimeSessionRecordKind::InterruptRequested,
234            "realtime interrupt requested before adapter call",
235        );
236        requested.response_id = Some(response_id.clone());
237        self.append_realtime_record(requested)?;
238        let mut record = self.record(
239            &state,
240            RealtimeSessionRecordKind::Interrupted,
241            "realtime interruption acknowledged by adapter",
242        );
243        record.status = RealtimeSessionStatus::Interrupted;
244        record.response_id = Some(response_id.clone());
245        self.adapter.interrupt(&state.session_id, &response_id)?;
246        let mut next = state.clone();
247        next.lifecycle_status = RealtimeSessionStatus::Interrupted;
248        self.append_realtime_record(record.clone())?;
249        self.state = Some(next);
250        Ok(record)
251    }
252
253    /// Marks the active realtime session as beginning a restart.
254    /// This appends restart-requested and restart-started records and updates session state; the
255    /// adapter restart call happens in `complete_restart`.
256    pub fn begin_restart(&mut self) -> Result<Vec<RealtimeSessionRecord>, AgentError> {
257        let state = self.connected_state()?.clone();
258        let mut requested = self.record(
259            &state,
260            RealtimeSessionRecordKind::RestartRequested,
261            "realtime restart requested",
262        );
263        requested.status = RealtimeSessionStatus::RestartRequested;
264
265        let mut started_state = state;
266        started_state.lifecycle_status = RealtimeSessionStatus::RestartStarted;
267        let mut started = self.record(
268            &started_state,
269            RealtimeSessionRecordKind::RestartStarted,
270            "realtime restart started; outbound frames gated",
271        );
272        started.status = RealtimeSessionStatus::RestartStarted;
273        self.append_realtime_record(requested.clone())?;
274        self.append_realtime_record(started.clone())?;
275        self.state = Some(started_state);
276        Ok(vec![requested, started])
277    }
278
279    /// Complete restart.
280    /// This records restart completion and updates session state after the adapter reports
281    /// success.
282    pub fn complete_restart(&mut self) -> Result<Vec<RealtimeSessionRecord>, AgentError> {
283        let state = self.connected_state()?.clone();
284        let response = match self
285            .adapter
286            .restart(&state.session_id, &state.connection_id)
287        {
288            Ok(response) => response,
289            Err(error) => {
290                let mut failed_state = state.clone();
291                failed_state.lifecycle_status = RealtimeSessionStatus::RestartFailed;
292                let mut failed = self.record(
293                    &failed_state,
294                    RealtimeSessionRecordKind::RestartFailed,
295                    error.context().message,
296                );
297                failed.status = RealtimeSessionStatus::RestartFailed;
298                self.append_realtime_record(failed.clone())?;
299                self.state = Some(failed_state);
300                return Ok(vec![failed]);
301            }
302        };
303
304        let mut completed_state = state;
305        completed_state.connection_id = response.connection_id;
306        completed_state.restart_count += 1;
307        completed_state.lifecycle_status = RealtimeSessionStatus::RestartCompleted;
308        let mut completed = self.record(
309            &completed_state,
310            RealtimeSessionRecordKind::RestartCompleted,
311            "realtime restart completed",
312        );
313        completed.status = RealtimeSessionStatus::RestartCompleted;
314        self.append_realtime_record(completed.clone())?;
315        self.state = Some(completed_state);
316        Ok(vec![completed])
317    }
318
319    /// Close.
320    /// This journals close intent/result and calls the realtime adapter to close the active
321    /// session.
322    pub fn close(
323        &mut self,
324        reason: RealtimeCloseReason,
325    ) -> Result<RealtimeSessionRecord, AgentError> {
326        let state = self.connected_state()?.clone();
327        let mut requested = self.record(
328            &state,
329            RealtimeSessionRecordKind::CloseRequested,
330            "realtime close requested before adapter call",
331        );
332        requested.close_reason = Some(reason);
333        self.append_realtime_record(requested)?;
334        self.adapter.close(&state.session_id, reason)?;
335        let mut closed_state = state;
336        closed_state.lifecycle_status = RealtimeSessionStatus::Closed;
337        let mut record = self.record(
338            &closed_state,
339            RealtimeSessionRecordKind::Closed,
340            "realtime session closed",
341        );
342        record.status = RealtimeSessionStatus::Closed;
343        record.close_reason = Some(reason);
344        self.append_realtime_record(record.clone())?;
345        self.state = Some(closed_state);
346        Ok(record)
347    }
348
349    fn apply_backpressure(
350        &mut self,
351        frame: RealtimeInputFrame,
352    ) -> Result<RealtimeSessionRecord, AgentError> {
353        let state = self.connected_state()?.clone();
354        let mut gated_state = state;
355        gated_state.backpressure_state = gated_state.backpressure_state.clone().gate();
356        gated_state.lifecycle_status = RealtimeSessionStatus::BackpressureApplied;
357        let mut record = self.record(
358            &gated_state,
359            RealtimeSessionRecordKind::BackpressureApplied,
360            "outbound realtime frame gated during restart",
361        );
362        record.channel = StreamChannel::RealtimeMedia;
363        record.direction = Some(StreamDirection::InputToProvider);
364        record.media_kind = frame.media_kind;
365        record.content_refs = frame.content_refs;
366        record.privacy = frame.privacy;
367        record.retention = frame.retention;
368        self.append_realtime_record(record.clone())?;
369        self.state = Some(gated_state);
370        Ok(record)
371    }
372
373    fn connected_state(&self) -> Result<&RealtimeSessionState, AgentError> {
374        self.state.as_ref().ok_or_else(|| {
375            AgentError::contract_violation("realtime session must connect before use")
376        })
377    }
378
379    fn record(
380        &self,
381        state: &RealtimeSessionState,
382        kind: RealtimeSessionRecordKind,
383        redacted_summary: impl Into<String>,
384    ) -> RealtimeSessionRecord {
385        let _ = (&self.source, &self.runtime_package_fingerprint);
386        RealtimeSessionRecord {
387            kind,
388            session_id: state.session_id.clone(),
389            connection_id: Some(state.connection_id.clone()),
390            response_id: None,
391            run_id: self.run_id.clone(),
392            agent_id: self.agent_id.clone(),
393            provider_route_ref: state.provider_route_ref.clone(),
394            send_cursor: state.send_cursor.clone(),
395            receive_cursor: state.receive_cursor.clone(),
396            restart_count: state.restart_count,
397            backpressure_state: state.backpressure_state.clone(),
398            status: state.lifecycle_status,
399            close_reason: None,
400            channel: StreamChannel::RealtimeTranscript,
401            direction: None,
402            media_kind: RealtimeMediaKind::Transcript,
403            content_refs: Vec::new(),
404            policy_refs: state.policy_refs.clone(),
405            privacy: crate::domain::PrivacyClass::ContentRefsOnly,
406            retention: crate::domain::RetentionClass::RunScoped,
407            redacted_summary: redacted_summary.into(),
408            effect_intent_ref: None,
409            effect_result_ref: None,
410            effect_intent: None,
411            effect_result: None,
412        }
413    }
414
415    fn append_realtime_record(
416        &mut self,
417        record: RealtimeSessionRecord,
418    ) -> Result<JournalCursor, AgentError> {
419        let mut base = JournalRecordBase::new(
420            self.next_journal_seq,
421            format!(
422                "journal.record.{}",
423                record.event_kind_name().replace('_', ".")
424            ),
425            self.run_id.clone(),
426            self.agent_id.clone(),
427            self.source.clone(),
428        );
429        self.next_journal_seq += 1;
430        base.destination = Some(DestinationRef::with_kind(
431            DestinationKind::Provider,
432            record.provider_route_ref.clone(),
433        ));
434        base.runtime_package_fingerprint = self.runtime_package_fingerprint.clone();
435        base.privacy = record.privacy;
436        base.redaction_policy_id = record
437            .policy_refs
438            .first()
439            .map(|policy| policy.as_str().to_string())
440            .unwrap_or_else(|| "policy.redaction.realtime.default".to_string());
441        base.tags = vec!["feature:realtime".to_string()];
442        let subject_ref = EntityRef::new(EntityKind::RealtimeSession, record.session_id.as_str());
443        self.journal
444            .append(JournalRecord::feature_record(
445                base,
446                JournalRecordKind::RealtimeSession,
447                "realtime",
448                record.event_kind_name(),
449                subject_ref,
450                Vec::new(),
451                record.content_refs.clone(),
452                JournalRecordPayload::RealtimeSession(record),
453            ))
454            .map_err(journal_failure)
455    }
456}
457
458fn journal_failure(error: AgentError) -> AgentError {
459    AgentError::new(
460        AgentErrorKind::JournalFailure,
461        RetryClassification::RepairNeeded,
462        error.context().message,
463    )
464}
465
466#[derive(Clone, Debug, Default, Deserialize, Eq, PartialEq, Serialize)]
467/// Holds realtime completion gate application-layer state or configuration.
468/// Use it with the documented coordinator methods; run, journal, event, provider, or port effects are called out on those methods rather than on construction.
469pub struct RealtimeCompletionGate {
470    /// Whether final visible output seen is enabled.
471    /// Policy, validation, or routing code uses this flag to choose the explicit behavior.
472    pub final_visible_output_seen: bool,
473    /// Whether terminal event replayable is enabled.
474    /// Policy, validation, or routing code uses this flag to choose the explicit behavior.
475    pub terminal_event_replayable: bool,
476    /// Whether stream-intervention processing has reached its terminal completion gate.
477    /// Run completion should wait for this when stream rules can mask, abort, or retry output.
478    pub stream_interventions_terminal: bool,
479    /// Whether realtime sessions terminal is enabled.
480    /// Policy, validation, or routing code uses this flag to choose the explicit behavior.
481    pub realtime_sessions_terminal: bool,
482    /// Output delivery setting or policy.
483    /// Delivery coordinators use it to decide sink mode, dedupe, and required evidence.
484    pub output_delivery_terminal: bool,
485    /// Whether approvals terminal is enabled.
486    /// Policy, validation, or routing code uses this flag to choose the explicit behavior.
487    pub approvals_terminal: bool,
488    /// Whether journal terminal is enabled.
489    /// Policy, validation, or routing code uses this flag to choose the explicit behavior.
490    pub journal_terminal: bool,
491}
492
493impl RealtimeCompletionGate {
494    /// Mark final visible output.
495    /// This marks the in-memory completion gate for final visible output and does not publish
496    /// events.
497    pub fn mark_final_visible_output(&mut self) {
498        self.final_visible_output_seen = true;
499    }
500
501    /// Mark terminal event replayable.
502    /// This flips the in-memory completion gate for replayable terminal events.
503    pub fn mark_terminal_event_replayable(&mut self) {
504        self.terminal_event_replayable = true;
505    }
506
507    /// Mark stream interventions terminal.
508    /// This operates on realtime session or completion-gate state only.
509    pub fn mark_stream_interventions_terminal(&mut self) {
510        self.stream_interventions_terminal = true;
511    }
512
513    /// Mark realtime sessions terminal.
514    /// This flips the in-memory completion gate for terminal realtime sessions.
515    pub fn mark_realtime_sessions_terminal(&mut self) {
516        self.realtime_sessions_terminal = true;
517    }
518
519    /// Mark output delivery terminal.
520    /// This flips the in-memory completion gate for terminal output delivery.
521    pub fn mark_output_delivery_terminal(&mut self) {
522        self.output_delivery_terminal = true;
523    }
524
525    /// Mark approvals terminal.
526    /// This marks the in-memory completion gate for terminal approvals and does not publish
527    /// events.
528    pub fn mark_approvals_terminal(&mut self) {
529        self.approvals_terminal = true;
530    }
531
532    /// Mark journal terminal.
533    /// This marks the in-memory completion gate for terminal journal state and does not append
534    /// a record.
535    pub fn mark_journal_terminal(&mut self) {
536        self.journal_terminal = true;
537    }
538
539    /// Returns whether can complete run applies for this contract.
540    /// This reads the realtime completion gates and does not mutate state.
541    pub fn can_complete_run(&self) -> bool {
542        self.final_visible_output_seen
543            && self.terminal_event_replayable
544            && self.stream_interventions_terminal
545            && self.realtime_sessions_terminal
546            && self.output_delivery_terminal
547            && self.approvals_terminal
548            && self.journal_terminal
549    }
550}