1use std::sync::Arc;
8
9use serde::{Deserialize, Serialize};
10
11use crate::{
12 domain::{
13 AgentError, AgentErrorKind, AgentId, DestinationKind, DestinationRef, EntityKind,
14 EntityRef, RetryClassification, RunId, SourceRef,
15 },
16 journal::{
17 JournalCursor, JournalRecord, JournalRecordBase, JournalRecordKind, JournalRecordPayload,
18 },
19 journal_ports::RunJournal,
20 ports::realtime::{RealtimeConnectRequest, RealtimeProviderAdapter},
21 realtime_records::{
22 RealtimeBackpressureState, RealtimeCloseReason, RealtimeConnectionId, RealtimeInputFrame,
23 RealtimeMediaKind, RealtimeResponseId, RealtimeSessionId, RealtimeSessionRecord,
24 RealtimeSessionRecordKind, RealtimeSessionState, RealtimeSessionStatus,
25 },
26 stream_records::{
27 StreamChannel, StreamCursor, StreamCursorPrecision, StreamDirection, safe_id_fragment,
28 },
29};
30
31#[derive(Clone)]
32pub struct RealtimeSessionController {
35 sidecar: crate::package::realtime::RealtimeSessionSidecar,
36 adapter: Arc<dyn RealtimeProviderAdapter>,
37 journal: Arc<dyn RunJournal>,
38 run_id: RunId,
39 agent_id: AgentId,
40 source: SourceRef,
41 runtime_package_fingerprint: String,
42 next_journal_seq: u64,
43 state: Option<RealtimeSessionState>,
44}
45
46impl RealtimeSessionController {
47 pub fn new(
51 sidecar: crate::package::realtime::RealtimeSessionSidecar,
52 adapter: Arc<dyn RealtimeProviderAdapter>,
53 journal: Arc<dyn RunJournal>,
54 run_id: RunId,
55 agent_id: AgentId,
56 source: SourceRef,
57 runtime_package_fingerprint: impl Into<String>,
58 ) -> Self {
59 Self {
60 sidecar,
61 adapter,
62 journal,
63 run_id,
64 agent_id,
65 source,
66 runtime_package_fingerprint: runtime_package_fingerprint.into(),
67 next_journal_seq: 1,
68 state: None,
69 }
70 }
71
72 pub fn connect(&mut self) -> Result<RealtimeSessionRecord, AgentError> {
76 self.sidecar.validate()?;
77 let session_id = RealtimeSessionId::new(format!(
78 "realtime.session.{}",
79 safe_id_fragment(self.run_id.as_str())
80 ));
81 let backpressure_state = RealtimeBackpressureState::bounded(
82 self.sidecar.queue_capacity,
83 self.sidecar.backpressure_policy_ref.clone(),
84 );
85 let requested_state = RealtimeSessionState {
86 session_id: session_id.clone(),
87 connection_id: RealtimeConnectionId::new(format!(
88 "realtime.connection.pending.{}",
89 safe_id_fragment(self.run_id.as_str())
90 )),
91 provider_route_ref: self.sidecar.provider_route_ref.clone(),
92 send_cursor: StreamCursor::chunk(0),
93 receive_cursor: StreamCursor::chunk(0),
94 restart_count: 0,
95 backpressure_state: backpressure_state.clone(),
96 lifecycle_status: RealtimeSessionStatus::Connecting,
97 policy_refs: self.sidecar.policy_refs(),
98 };
99 let requested = self.record(
100 &requested_state,
101 RealtimeSessionRecordKind::ConnectRequested,
102 "realtime connect requested before adapter call",
103 );
104 self.append_realtime_record(requested)?;
105
106 let response = self.adapter.connect(RealtimeConnectRequest {
107 session_id: session_id.clone(),
108 provider_route_ref: self.sidecar.provider_route_ref.clone(),
109 realtime_capability_ref: self.sidecar.realtime_capability_ref.clone(),
110 })?;
111 let state = RealtimeSessionState {
112 session_id,
113 connection_id: response.connection_id,
114 provider_route_ref: self.sidecar.provider_route_ref.clone(),
115 send_cursor: StreamCursor::chunk(0),
116 receive_cursor: StreamCursor::chunk(0),
117 restart_count: 0,
118 backpressure_state,
119 lifecycle_status: RealtimeSessionStatus::Connected,
120 policy_refs: self.sidecar.policy_refs(),
121 };
122 let record = self.record(
123 &state,
124 RealtimeSessionRecordKind::Connected,
125 "realtime session connected",
126 );
127 self.append_realtime_record(record.clone())?;
128 self.state = Some(state);
129 Ok(record)
130 }
131
132 pub fn send(&mut self, frame: RealtimeInputFrame) -> Result<RealtimeSessionRecord, AgentError> {
136 if self
137 .state
138 .as_ref()
139 .is_some_and(|state| state.lifecycle_status == RealtimeSessionStatus::RestartStarted)
140 {
141 return self.apply_backpressure(frame);
142 }
143
144 let state = self.connected_state()?.clone();
145 let mut requested = self.record(
146 &state,
147 RealtimeSessionRecordKind::InputSendRequested,
148 frame.redacted_summary.clone(),
149 );
150 requested.channel = StreamChannel::RealtimeMedia;
151 requested.direction = Some(StreamDirection::InputToProvider);
152 requested.media_kind = frame.media_kind;
153 requested.content_refs = frame.content_refs.clone();
154 requested.privacy = frame.privacy;
155 requested.retention = frame.retention;
156 self.append_realtime_record(requested)?;
157
158 self.adapter.send(&state.session_id, frame.clone())?;
159 let mut state = state;
160 state.send_cursor = StreamCursor {
161 chunk_sequence: state.send_cursor.chunk_sequence + 1,
162 byte_offset: 0,
163 precision: StreamCursorPrecision::ChunkSequenceOnly,
164 label: Some("send".to_string()),
165 };
166 state.lifecycle_status = RealtimeSessionStatus::InputSent;
167 let mut record = self.record(
168 &state,
169 RealtimeSessionRecordKind::InputSent,
170 frame.redacted_summary.clone(),
171 );
172 record.channel = StreamChannel::RealtimeMedia;
173 record.direction = Some(StreamDirection::InputToProvider);
174 record.media_kind = frame.media_kind;
175 record.content_refs = frame.content_refs;
176 record.privacy = frame.privacy;
177 record.retention = frame.retention;
178 self.append_realtime_record(record.clone())?;
179 self.state = Some(state);
180 Ok(record)
181 }
182
183 pub fn receive(&mut self) -> Result<Option<RealtimeSessionRecord>, AgentError> {
187 let state = self.connected_state()?.clone();
188 let requested = self.record(
189 &state,
190 RealtimeSessionRecordKind::OutputReceiveRequested,
191 "realtime receive requested before adapter call",
192 );
193 self.append_realtime_record(requested)?;
194 let Some(frame) = self.adapter.receive(&state.session_id)? else {
195 return Ok(None);
196 };
197 let mut state = state;
198 state.receive_cursor = StreamCursor {
199 chunk_sequence: state.receive_cursor.chunk_sequence + 1,
200 byte_offset: 0,
201 precision: StreamCursorPrecision::ChunkSequenceOnly,
202 label: Some("receive".to_string()),
203 };
204 state.lifecycle_status = RealtimeSessionStatus::OutputReceived;
205 let mut record = self.record(
206 &state,
207 RealtimeSessionRecordKind::OutputReceived,
208 frame.redacted_summary,
209 );
210 record.channel = StreamChannel::RealtimeTranscript;
211 record.direction = Some(StreamDirection::OutputFromProvider);
212 record.media_kind = frame.media_kind;
213 record.response_id = Some(frame.response_id);
214 record.content_refs = frame.content_refs;
215 record.privacy = frame.privacy;
216 record.retention = frame.retention;
217 self.append_realtime_record(record.clone())?;
218 self.state = Some(state);
219 Ok(Some(record))
220 }
221
222 pub fn interrupt(
226 &mut self,
227 response_id: impl Into<String>,
228 ) -> Result<RealtimeSessionRecord, AgentError> {
229 let response_id = RealtimeResponseId::new(response_id);
230 let state = self.connected_state()?.clone();
231 let mut requested = self.record(
232 &state,
233 RealtimeSessionRecordKind::InterruptRequested,
234 "realtime interrupt requested before adapter call",
235 );
236 requested.response_id = Some(response_id.clone());
237 self.append_realtime_record(requested)?;
238 let mut record = self.record(
239 &state,
240 RealtimeSessionRecordKind::Interrupted,
241 "realtime interruption acknowledged by adapter",
242 );
243 record.status = RealtimeSessionStatus::Interrupted;
244 record.response_id = Some(response_id.clone());
245 self.adapter.interrupt(&state.session_id, &response_id)?;
246 let mut next = state.clone();
247 next.lifecycle_status = RealtimeSessionStatus::Interrupted;
248 self.append_realtime_record(record.clone())?;
249 self.state = Some(next);
250 Ok(record)
251 }
252
253 pub fn begin_restart(&mut self) -> Result<Vec<RealtimeSessionRecord>, AgentError> {
257 let state = self.connected_state()?.clone();
258 let mut requested = self.record(
259 &state,
260 RealtimeSessionRecordKind::RestartRequested,
261 "realtime restart requested",
262 );
263 requested.status = RealtimeSessionStatus::RestartRequested;
264
265 let mut started_state = state;
266 started_state.lifecycle_status = RealtimeSessionStatus::RestartStarted;
267 let mut started = self.record(
268 &started_state,
269 RealtimeSessionRecordKind::RestartStarted,
270 "realtime restart started; outbound frames gated",
271 );
272 started.status = RealtimeSessionStatus::RestartStarted;
273 self.append_realtime_record(requested.clone())?;
274 self.append_realtime_record(started.clone())?;
275 self.state = Some(started_state);
276 Ok(vec![requested, started])
277 }
278
279 pub fn complete_restart(&mut self) -> Result<Vec<RealtimeSessionRecord>, AgentError> {
283 let state = self.connected_state()?.clone();
284 let response = match self
285 .adapter
286 .restart(&state.session_id, &state.connection_id)
287 {
288 Ok(response) => response,
289 Err(error) => {
290 let mut failed_state = state.clone();
291 failed_state.lifecycle_status = RealtimeSessionStatus::RestartFailed;
292 let mut failed = self.record(
293 &failed_state,
294 RealtimeSessionRecordKind::RestartFailed,
295 error.context().message,
296 );
297 failed.status = RealtimeSessionStatus::RestartFailed;
298 self.append_realtime_record(failed.clone())?;
299 self.state = Some(failed_state);
300 return Ok(vec![failed]);
301 }
302 };
303
304 let mut completed_state = state;
305 completed_state.connection_id = response.connection_id;
306 completed_state.restart_count += 1;
307 completed_state.lifecycle_status = RealtimeSessionStatus::RestartCompleted;
308 let mut completed = self.record(
309 &completed_state,
310 RealtimeSessionRecordKind::RestartCompleted,
311 "realtime restart completed",
312 );
313 completed.status = RealtimeSessionStatus::RestartCompleted;
314 self.append_realtime_record(completed.clone())?;
315 self.state = Some(completed_state);
316 Ok(vec![completed])
317 }
318
319 pub fn close(
323 &mut self,
324 reason: RealtimeCloseReason,
325 ) -> Result<RealtimeSessionRecord, AgentError> {
326 let state = self.connected_state()?.clone();
327 let mut requested = self.record(
328 &state,
329 RealtimeSessionRecordKind::CloseRequested,
330 "realtime close requested before adapter call",
331 );
332 requested.close_reason = Some(reason);
333 self.append_realtime_record(requested)?;
334 self.adapter.close(&state.session_id, reason)?;
335 let mut closed_state = state;
336 closed_state.lifecycle_status = RealtimeSessionStatus::Closed;
337 let mut record = self.record(
338 &closed_state,
339 RealtimeSessionRecordKind::Closed,
340 "realtime session closed",
341 );
342 record.status = RealtimeSessionStatus::Closed;
343 record.close_reason = Some(reason);
344 self.append_realtime_record(record.clone())?;
345 self.state = Some(closed_state);
346 Ok(record)
347 }
348
349 fn apply_backpressure(
350 &mut self,
351 frame: RealtimeInputFrame,
352 ) -> Result<RealtimeSessionRecord, AgentError> {
353 let state = self.connected_state()?.clone();
354 let mut gated_state = state;
355 gated_state.backpressure_state = gated_state.backpressure_state.clone().gate();
356 gated_state.lifecycle_status = RealtimeSessionStatus::BackpressureApplied;
357 let mut record = self.record(
358 &gated_state,
359 RealtimeSessionRecordKind::BackpressureApplied,
360 "outbound realtime frame gated during restart",
361 );
362 record.channel = StreamChannel::RealtimeMedia;
363 record.direction = Some(StreamDirection::InputToProvider);
364 record.media_kind = frame.media_kind;
365 record.content_refs = frame.content_refs;
366 record.privacy = frame.privacy;
367 record.retention = frame.retention;
368 self.append_realtime_record(record.clone())?;
369 self.state = Some(gated_state);
370 Ok(record)
371 }
372
373 fn connected_state(&self) -> Result<&RealtimeSessionState, AgentError> {
374 self.state.as_ref().ok_or_else(|| {
375 AgentError::contract_violation("realtime session must connect before use")
376 })
377 }
378
379 fn record(
380 &self,
381 state: &RealtimeSessionState,
382 kind: RealtimeSessionRecordKind,
383 redacted_summary: impl Into<String>,
384 ) -> RealtimeSessionRecord {
385 let _ = (&self.source, &self.runtime_package_fingerprint);
386 RealtimeSessionRecord {
387 kind,
388 session_id: state.session_id.clone(),
389 connection_id: Some(state.connection_id.clone()),
390 response_id: None,
391 run_id: self.run_id.clone(),
392 agent_id: self.agent_id.clone(),
393 provider_route_ref: state.provider_route_ref.clone(),
394 send_cursor: state.send_cursor.clone(),
395 receive_cursor: state.receive_cursor.clone(),
396 restart_count: state.restart_count,
397 backpressure_state: state.backpressure_state.clone(),
398 status: state.lifecycle_status,
399 close_reason: None,
400 channel: StreamChannel::RealtimeTranscript,
401 direction: None,
402 media_kind: RealtimeMediaKind::Transcript,
403 content_refs: Vec::new(),
404 policy_refs: state.policy_refs.clone(),
405 privacy: crate::domain::PrivacyClass::ContentRefsOnly,
406 retention: crate::domain::RetentionClass::RunScoped,
407 redacted_summary: redacted_summary.into(),
408 effect_intent_ref: None,
409 effect_result_ref: None,
410 effect_intent: None,
411 effect_result: None,
412 }
413 }
414
415 fn append_realtime_record(
416 &mut self,
417 record: RealtimeSessionRecord,
418 ) -> Result<JournalCursor, AgentError> {
419 let mut base = JournalRecordBase::new(
420 self.next_journal_seq,
421 format!(
422 "journal.record.{}",
423 record.event_kind_name().replace('_', ".")
424 ),
425 self.run_id.clone(),
426 self.agent_id.clone(),
427 self.source.clone(),
428 );
429 self.next_journal_seq += 1;
430 base.destination = Some(DestinationRef::with_kind(
431 DestinationKind::Provider,
432 record.provider_route_ref.clone(),
433 ));
434 base.runtime_package_fingerprint = self.runtime_package_fingerprint.clone();
435 base.privacy = record.privacy;
436 base.redaction_policy_id = record
437 .policy_refs
438 .first()
439 .map(|policy| policy.as_str().to_string())
440 .unwrap_or_else(|| "policy.redaction.realtime.default".to_string());
441 base.tags = vec!["feature:realtime".to_string()];
442 let subject_ref = EntityRef::new(EntityKind::RealtimeSession, record.session_id.as_str());
443 self.journal
444 .append(JournalRecord::feature_record(
445 base,
446 JournalRecordKind::RealtimeSession,
447 "realtime",
448 record.event_kind_name(),
449 subject_ref,
450 Vec::new(),
451 record.content_refs.clone(),
452 JournalRecordPayload::RealtimeSession(record),
453 ))
454 .map_err(journal_failure)
455 }
456}
457
458fn journal_failure(error: AgentError) -> AgentError {
459 AgentError::new(
460 AgentErrorKind::JournalFailure,
461 RetryClassification::RepairNeeded,
462 error.context().message,
463 )
464}
465
466#[derive(Clone, Debug, Default, Deserialize, Eq, PartialEq, Serialize)]
467pub struct RealtimeCompletionGate {
470 pub final_visible_output_seen: bool,
473 pub terminal_event_replayable: bool,
476 pub stream_interventions_terminal: bool,
479 pub realtime_sessions_terminal: bool,
482 pub output_delivery_terminal: bool,
485 pub approvals_terminal: bool,
488 pub journal_terminal: bool,
491}
492
493impl RealtimeCompletionGate {
494 pub fn mark_final_visible_output(&mut self) {
498 self.final_visible_output_seen = true;
499 }
500
501 pub fn mark_terminal_event_replayable(&mut self) {
504 self.terminal_event_replayable = true;
505 }
506
507 pub fn mark_stream_interventions_terminal(&mut self) {
510 self.stream_interventions_terminal = true;
511 }
512
513 pub fn mark_realtime_sessions_terminal(&mut self) {
516 self.realtime_sessions_terminal = true;
517 }
518
519 pub fn mark_output_delivery_terminal(&mut self) {
522 self.output_delivery_terminal = true;
523 }
524
525 pub fn mark_approvals_terminal(&mut self) {
529 self.approvals_terminal = true;
530 }
531
532 pub fn mark_journal_terminal(&mut self) {
536 self.journal_terminal = true;
537 }
538
539 pub fn can_complete_run(&self) -> bool {
542 self.final_visible_output_seen
543 && self.terminal_event_replayable
544 && self.stream_interventions_terminal
545 && self.realtime_sessions_terminal
546 && self.output_delivery_terminal
547 && self.approvals_terminal
548 && self.journal_terminal
549 }
550}