use std::collections::{BTreeMap, BTreeSet};
use crate::generated::session_document::{
RealtimeTranscriptLaneKind, RealtimeTranscriptMaterializeDecision, RealtimeTranscriptRoleKind,
RealtimeTranscriptStopReasonKind, SessionDocumentEffect, SessionDocumentError,
SessionDocumentMachineAuthority,
};
use crate::realtime_transcript::{
RealtimeTranscriptApplyOutcome, RealtimeTranscriptEvent, RealtimeTranscriptMaterializedMessage,
RealtimeTranscriptRole, TranscriptLane,
};
use crate::types::{
AssistantBlock, BlockAssistantMessage, ContentInput, Message, StopReason, TranscriptSource,
Usage, UserMessage,
};
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct RealtimeTranscriptShellError {
op: &'static str,
}
impl std::fmt::Display for RealtimeTranscriptShellError {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(
f,
"session document authority rejected realtime-transcript {}",
self.op
)
}
}
impl std::error::Error for RealtimeTranscriptShellError {}
impl From<SessionDocumentError> for RealtimeTranscriptShellError {
fn from(err: SessionDocumentError) -> Self {
let _ = err;
Self {
op: "decision_rejected",
}
}
}
#[derive(Debug, Clone, Copy)]
struct RealtimeTranscriptEventDecision {
observe_item: bool,
observe_skipped: bool,
write_user_segment: bool,
append_assistant_segment: bool,
replace_assistant_segment: bool,
promote_lane: bool,
mark_item_ready: bool,
record_delta_id: bool,
remove_completion: bool,
record_completion: bool,
discard_response: bool,
discard_response_by_lane: bool,
mark_response_ready: bool,
materialize_ready_items: bool,
}
#[derive(Debug, Clone, Copy)]
struct MaterializeCandidateDecision {
decision: RealtimeTranscriptMaterializeDecision,
consume_usage: bool,
}
fn document_authority() -> SessionDocumentMachineAuthority {
SessionDocumentMachineAuthority::new()
}
fn resolve_realtime_event(
apply: impl FnOnce(
&mut SessionDocumentMachineAuthority,
) -> Result<Vec<SessionDocumentEffect>, SessionDocumentError>,
) -> Result<RealtimeTranscriptEventDecision, RealtimeTranscriptShellError> {
let mut authority = document_authority();
let effects = apply(&mut authority)?;
effects
.into_iter()
.find_map(|effect| match effect {
SessionDocumentEffect::RealtimeTranscriptEventResolved {
observe_item,
observe_skipped,
write_user_segment,
append_assistant_segment,
replace_assistant_segment,
promote_lane,
mark_item_ready,
record_delta_id,
remove_completion,
record_completion,
discard_response,
discard_response_by_lane,
mark_response_ready,
materialize_ready_items,
} => Some(RealtimeTranscriptEventDecision {
observe_item,
observe_skipped,
write_user_segment,
append_assistant_segment,
replace_assistant_segment,
promote_lane,
mark_item_ready,
record_delta_id,
remove_completion,
record_completion,
discard_response,
discard_response_by_lane,
mark_response_ready,
materialize_ready_items,
}),
_ => None,
})
.ok_or(RealtimeTranscriptShellError {
op: "event_resolved",
})
}
fn resolve_materialize_candidate(
state: &SessionRealtimeTranscriptState,
item: &RealtimeTranscriptItemState,
text_present: bool,
completion: Option<&RealtimeAssistantCompletion>,
) -> Result<MaterializeCandidateDecision, RealtimeTranscriptShellError> {
let mut authority = document_authority();
let effects = authority.resolve_realtime_materialize_candidate(
item.materialized,
realtime_predecessor_materialized(state, item.previous_item_id.as_deref()),
item.skipped,
item.ready,
text_present,
role_kind(item.role),
item.response_id.is_some(),
completion.is_some(),
completion.is_some_and(|completion| completion.usage_consumed),
)?;
effects
.into_iter()
.find_map(|effect| match effect {
SessionDocumentEffect::RealtimeMaterializeCandidateResolved {
decision,
consume_usage,
} => Some(MaterializeCandidateDecision {
decision,
consume_usage,
}),
_ => None,
})
.ok_or(RealtimeTranscriptShellError {
op: "materialize_candidate_resolved",
})
}
#[derive(Debug, Clone, Default, serde::Serialize, serde::Deserialize)]
#[serde(rename_all = "snake_case")]
pub struct SessionRealtimeTranscriptState {
#[serde(default)]
items: BTreeMap<String, RealtimeTranscriptItemState>,
#[serde(default)]
first_seen_order: Vec<String>,
#[serde(default)]
seen_delta_ids: BTreeSet<String>,
#[serde(default)]
assistant_completions: BTreeMap<String, RealtimeAssistantCompletion>,
#[serde(default, skip_serializing_if = "BTreeSet::is_empty")]
discarded_assistant_response_ids: BTreeSet<String>,
}
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
#[serde(rename_all = "snake_case")]
struct RealtimeTranscriptItemState {
role: RealtimeTranscriptRole,
#[serde(default)]
previous_item_id: Option<String>,
#[serde(default)]
response_id: Option<String>,
#[serde(default)]
content_segments: BTreeMap<u32, String>,
#[serde(default)]
skipped: bool,
#[serde(default)]
ready: bool,
#[serde(default)]
materialized: bool,
#[serde(default)]
lane: TranscriptLane,
}
impl RealtimeTranscriptItemState {
fn new(
role: RealtimeTranscriptRole,
previous_item_id: Option<String>,
response_id: Option<String>,
) -> Self {
Self {
role,
previous_item_id,
response_id,
content_segments: BTreeMap::new(),
skipped: false,
ready: false,
materialized: false,
lane: TranscriptLane::Display,
}
}
fn skipped(previous_item_id: Option<String>) -> Self {
Self {
role: RealtimeTranscriptRole::Assistant,
previous_item_id,
response_id: None,
content_segments: BTreeMap::new(),
skipped: true,
ready: true,
materialized: false,
lane: TranscriptLane::Display,
}
}
fn text(&self) -> String {
self.content_segments.values().cloned().collect()
}
}
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
#[serde(rename_all = "snake_case")]
struct RealtimeAssistantCompletion {
stop_reason: StopReason,
usage: Usage,
usage_consumed: bool,
}
#[derive(Debug, Clone, Default)]
pub struct RealtimeTranscriptApplyCommit {
pub outcome: RealtimeTranscriptApplyOutcome,
pub messages: Vec<Message>,
pub usage: Usage,
}
pub fn restore_realtime_transcript_state(
state: SessionRealtimeTranscriptState,
) -> Result<SessionRealtimeTranscriptState, RealtimeTranscriptShellError> {
let first_seen_unique_count = state
.first_seen_order
.iter()
.cloned()
.collect::<BTreeSet<_>>()
.len();
let mut authority = document_authority();
authority.restore_realtime_transcript_state(
usize_to_u64(state.items.len()),
usize_to_u64(state.first_seen_order.len()),
usize_to_u64(first_seen_unique_count),
state
.items
.keys()
.all(|item_id| state.first_seen_order.iter().any(|seen| seen == item_id)),
state
.first_seen_order
.iter()
.all(|item_id| state.items.contains_key(item_id)),
realtime_transcript_state_identity_fields_valid(&state),
realtime_transcript_delta_ids_valid(&state),
realtime_transcript_completion_ids_valid(&state),
realtime_transcript_discarded_ids_valid(&state),
realtime_transcript_materialized_items_were_ready_or_skipped(&state),
realtime_transcript_assistant_items_have_response_unless_skipped(&state),
realtime_transcript_ready_assistant_items_have_completion_or_are_skipped(&state),
realtime_transcript_materialized_assistant_completions_consumed(&state),
realtime_transcript_completed_assistant_text_items_are_ready_or_materialized_or_skipped(
&state,
),
realtime_transcript_discarded_assistant_items_are_skipped_or_materialized(&state),
)?;
Ok(state)
}
pub fn apply_realtime_transcript_event(
state: &mut SessionRealtimeTranscriptState,
event: RealtimeTranscriptEvent,
) -> Result<RealtimeTranscriptApplyCommit, RealtimeTranscriptShellError> {
let commit = match event {
RealtimeTranscriptEvent::ItemObserved {
item_id,
previous_item_id,
role,
response_id,
} => {
let response_id = normalize_realtime_optional_response_id(response_id);
let response_discarded = role == RealtimeTranscriptRole::Assistant
&& response_id
.as_ref()
.is_some_and(|id| state.discarded_assistant_response_ids.contains(id));
let decision = resolve_realtime_event(|authority| {
authority.resolve_realtime_item_observed(role_kind(role), response_discarded)
})?;
apply_item_observation_decision(
state,
decision,
item_id,
previous_item_id,
role,
response_id,
)?
}
RealtimeTranscriptEvent::ItemSkipped {
item_id,
previous_item_id,
} => {
let decision =
resolve_realtime_event(|authority| authority.resolve_realtime_item_skipped())?;
apply_item_observation_decision(
state,
decision,
item_id,
previous_item_id,
RealtimeTranscriptRole::Assistant,
None,
)?
}
RealtimeTranscriptEvent::UserTranscriptFinal {
item_id,
previous_item_id,
content_index,
text,
} => apply_user_transcript_final(state, item_id, previous_item_id, content_index, text)?,
RealtimeTranscriptEvent::AssistantTextDelta {
response_id,
delta_id,
item_id,
previous_item_id,
content_index,
delta,
} => apply_assistant_delta(
state,
response_id,
delta_id,
item_id,
previous_item_id,
content_index,
delta,
TranscriptLane::Display,
)?,
RealtimeTranscriptEvent::AssistantTranscriptDelta {
response_id,
delta_id,
item_id,
previous_item_id,
content_index,
delta,
} => apply_assistant_delta(
state,
response_id,
delta_id,
item_id,
previous_item_id,
content_index,
delta,
TranscriptLane::Spoken,
)?,
RealtimeTranscriptEvent::AssistantTranscriptTruncated {
response_id,
item_id,
content_index,
text,
} => apply_assistant_text_replacement(
state,
response_id,
item_id,
content_index,
text,
"AssistantTranscriptTruncated",
)?,
RealtimeTranscriptEvent::AssistantTranscriptFinalText {
response_id,
item_id,
content_index,
text,
} => apply_assistant_text_replacement(
state,
response_id,
item_id,
content_index,
text,
"AssistantTranscriptFinalText",
)?,
RealtimeTranscriptEvent::AssistantTurnCompleted {
response_id,
stop_reason,
usage,
} => apply_assistant_turn_completed(state, response_id, stop_reason, usage)?,
RealtimeTranscriptEvent::AssistantTurnInterrupted { response_id } => {
apply_assistant_turn_interrupted(state, response_id)?
}
};
Ok(commit)
}
fn apply_item_observation_decision(
state: &mut SessionRealtimeTranscriptState,
decision: RealtimeTranscriptEventDecision,
item_id: String,
previous_item_id: Option<String>,
role: RealtimeTranscriptRole,
response_id: Option<String>,
) -> Result<RealtimeTranscriptApplyCommit, RealtimeTranscriptShellError> {
if decision.observe_skipped {
observe_realtime_skipped_item(state, item_id, previous_item_id);
} else if decision.observe_item {
observe_realtime_item(state, item_id, previous_item_id, role, response_id);
}
finish_realtime_event(state, decision)
}
fn apply_user_transcript_final(
state: &mut SessionRealtimeTranscriptState,
item_id: String,
previous_item_id: Option<String>,
content_index: u32,
text: String,
) -> Result<RealtimeTranscriptApplyCommit, RealtimeTranscriptShellError> {
let existing_segment = state
.items
.get(&item_id)
.and_then(|item| item.content_segments.get(&content_index));
let segment_empty = existing_segment.is_none_or(String::is_empty);
let segment_matches = existing_segment.is_some_and(|segment| segment == &text);
let decision = resolve_realtime_event(|authority| {
authority.resolve_realtime_user_transcript_final(
!text.is_empty(),
segment_empty,
segment_matches,
)
})?;
if let Some(item) = if decision.observe_item {
observe_realtime_item(
state,
item_id,
previous_item_id,
RealtimeTranscriptRole::User,
None,
)
} else {
None
} {
if decision.write_user_segment {
item.content_segments.insert(content_index, text);
} else if !text.is_empty() && !segment_empty && !segment_matches {
tracing::warn!(
content_index,
"ignoring conflicting realtime user transcript segment replay"
);
}
if decision.mark_item_ready {
item.ready = true;
}
}
finish_realtime_event(state, decision)
}
#[allow(clippy::too_many_arguments)]
fn apply_assistant_delta(
state: &mut SessionRealtimeTranscriptState,
response_id: String,
delta_id: String,
item_id: String,
previous_item_id: Option<String>,
content_index: u32,
delta: String,
requested_lane: TranscriptLane,
) -> Result<RealtimeTranscriptApplyCommit, RealtimeTranscriptShellError> {
let response_id = normalize_realtime_response_id(response_id);
let delta_id_present = !delta_id.trim().is_empty();
let delta_id_seen = delta_id_present && state.seen_delta_ids.contains(&delta_id);
let response_discarded = response_id
.as_ref()
.is_some_and(|id| state.discarded_assistant_response_ids.contains(id));
let item_has_text = state
.items
.get(&item_id)
.is_some_and(|item| !item.text().is_empty());
let current_lane = state
.items
.get(&item_id)
.map(|item| item.lane)
.unwrap_or_default();
let text_after_write_present = item_has_text || !delta.is_empty();
let response_completed = response_id
.as_ref()
.is_some_and(|id| state.assistant_completions.contains_key(id));
let decision = resolve_realtime_event(|authority| {
authority.resolve_realtime_assistant_delta(
response_id.is_some(),
response_discarded,
delta_id_present,
delta_id_seen,
item_has_text,
lane_kind(current_lane),
lane_kind(requested_lane),
response_completed,
text_after_write_present,
)
})?;
if decision.observe_skipped {
observe_realtime_skipped_item(state, item_id, previous_item_id);
} else if let Some(response_id) = response_id {
if decision.record_delta_id {
state.seen_delta_ids.insert(delta_id);
}
let item = if decision.observe_item {
observe_realtime_item(
state,
item_id,
previous_item_id,
RealtimeTranscriptRole::Assistant,
Some(response_id),
)
} else {
None
};
if let Some(item) = item {
if decision.promote_lane {
item.lane = requested_lane;
}
if decision.append_assistant_segment {
item.content_segments
.entry(content_index)
.or_default()
.push_str(&delta);
} else if decision.observe_item && current_lane != requested_lane && item_has_text {
tracing::warn!(
"realtime assistant delta routed to an incompatible transcript lane; dropping delta to preserve generated lane decision"
);
}
if decision.mark_item_ready {
item.ready = true;
}
}
}
finish_realtime_event(state, decision)
}
fn apply_assistant_text_replacement(
state: &mut SessionRealtimeTranscriptState,
response_id: String,
item_id: String,
content_index: u32,
text: String,
op: &'static str,
) -> Result<RealtimeTranscriptApplyCommit, RealtimeTranscriptShellError> {
let response_id = normalize_realtime_response_id(response_id);
let response_discarded = response_id
.as_ref()
.is_some_and(|id| state.discarded_assistant_response_ids.contains(id));
let item_has_text = state
.items
.get(&item_id)
.is_some_and(|item| !item.text().is_empty());
let current_lane = state
.items
.get(&item_id)
.map(|item| item.lane)
.unwrap_or_default();
let item_materialized = state
.items
.get(&item_id)
.is_some_and(|item| item.materialized);
let text_after_replace_present =
text_after_replacing_segment_present(state.items.get(&item_id), content_index, &text);
let response_completed = response_id
.as_ref()
.is_some_and(|id| state.assistant_completions.contains_key(id));
let decision = resolve_realtime_event(|authority| {
authority.resolve_realtime_assistant_text_replacement(
response_id.is_some(),
response_discarded,
item_materialized,
item_has_text,
lane_kind(current_lane),
lane_kind(TranscriptLane::Spoken),
response_completed,
text_after_replace_present,
)
})?;
if decision.observe_skipped {
observe_realtime_skipped_item(state, item_id, None);
} else if let Some(response_id) = response_id {
let item_id_for_log = item_id.clone();
let item = if decision.observe_item {
observe_realtime_item(
state,
item_id,
None,
RealtimeTranscriptRole::Assistant,
Some(response_id.clone()),
)
} else {
None
};
if let Some(item) = item {
if item_materialized {
tracing::warn!(
target: "meerkat::session",
item_id = %item_id_for_log,
response_id = %response_id,
"{op} arrived after item already materialized; canonical message is locked, late replacement dropped",
);
} else if decision.promote_lane {
item.lane = TranscriptLane::Spoken;
} else if decision.observe_item
&& current_lane != TranscriptLane::Spoken
&& item_has_text
{
tracing::warn!(
"{op} routed to a Display-lane item; dropping replacement to preserve generated lane decision"
);
}
if decision.replace_assistant_segment {
item.content_segments.insert(content_index, text);
}
if decision.mark_item_ready {
item.ready = true;
}
}
}
finish_realtime_event(state, decision)
}
fn apply_assistant_turn_completed(
state: &mut SessionRealtimeTranscriptState,
response_id: String,
stop_reason: StopReason,
usage: Usage,
) -> Result<RealtimeTranscriptApplyCommit, RealtimeTranscriptShellError> {
let response_id = normalize_realtime_response_id(response_id);
let response_discarded = response_id
.as_ref()
.is_some_and(|id| state.discarded_assistant_response_ids.contains(id));
let decision = resolve_realtime_event(|authority| {
authority.resolve_realtime_assistant_turn_completed(
response_id.is_some(),
response_discarded,
stop_reason_kind(stop_reason),
)
})?;
if let Some(response_id) = response_id {
if decision.discard_response {
discard_realtime_assistant_response(state, &response_id);
}
if decision.remove_completion {
state.assistant_completions.remove(&response_id);
}
if decision.record_completion {
state
.assistant_completions
.entry(response_id.clone())
.or_insert(RealtimeAssistantCompletion {
stop_reason,
usage,
usage_consumed: false,
});
}
if decision.mark_response_ready {
mark_realtime_assistant_response_ready(state, &response_id);
}
}
finish_realtime_event(state, decision)
}
fn apply_assistant_turn_interrupted(
state: &mut SessionRealtimeTranscriptState,
response_id: String,
) -> Result<RealtimeTranscriptApplyCommit, RealtimeTranscriptShellError> {
let response_id = normalize_realtime_response_id(response_id);
let decision = resolve_realtime_event(|authority| {
authority.resolve_realtime_assistant_turn_interrupted(response_id.is_some())
})?;
if let Some(response_id) = response_id {
if decision.discard_response_by_lane {
discard_realtime_assistant_response_by_lane(state, &response_id);
}
if decision.record_completion {
state
.assistant_completions
.entry(response_id.clone())
.or_insert(RealtimeAssistantCompletion {
stop_reason: StopReason::Cancelled,
usage: Usage::default(),
usage_consumed: false,
});
}
if decision.mark_response_ready {
mark_realtime_assistant_response_ready(state, &response_id);
}
}
finish_realtime_event(state, decision)
}
fn finish_realtime_event(
state: &mut SessionRealtimeTranscriptState,
decision: RealtimeTranscriptEventDecision,
) -> Result<RealtimeTranscriptApplyCommit, RealtimeTranscriptShellError> {
if decision.materialize_ready_items {
materialize_realtime_transcript_ready_items(state)
} else {
Ok(RealtimeTranscriptApplyCommit::default())
}
}
#[must_use]
pub fn in_flight_realtime_assistant_response_ids(
state: &SessionRealtimeTranscriptState,
) -> Vec<String> {
let mut seen: BTreeSet<String> = BTreeSet::new();
let mut out: Vec<String> = Vec::new();
for item_id in &state.first_seen_order {
let Some(item) = state.items.get(item_id) else {
continue;
};
if item.role != RealtimeTranscriptRole::Assistant {
continue;
}
if item.materialized || item.skipped {
continue;
}
let Some(response_id) = item.response_id.as_ref() else {
continue;
};
if state.discarded_assistant_response_ids.contains(response_id) {
continue;
}
if seen.insert(response_id.clone()) {
out.push(response_id.clone());
}
}
out
}
fn materialize_realtime_transcript_ready_items(
state: &mut SessionRealtimeTranscriptState,
) -> Result<RealtimeTranscriptApplyCommit, RealtimeTranscriptShellError> {
let mut materialized = Vec::new();
let mut messages = Vec::new();
let mut committed_usage = Usage::default();
let mut pending_blocks: Vec<AssistantBlock> = Vec::new();
let mut pending_response_id: Option<String> = None;
let mut pending_stop_reason: StopReason = StopReason::EndTurn;
let mut pending_usage: Usage = Usage::default();
loop {
let order = realtime_transcript_order(state);
let mut skipped_batch = Vec::new();
let mut batch = Vec::new();
for item_id in order {
let Some(item) = state.items.get(&item_id) else {
continue;
};
let text = item.text();
let completion = item
.response_id
.as_ref()
.and_then(|response_id| state.assistant_completions.get(response_id));
let decision =
resolve_materialize_candidate(state, item, !text.is_empty(), completion)?;
match decision.decision {
RealtimeTranscriptMaterializeDecision::Wait => {}
RealtimeTranscriptMaterializeDecision::MarkSkipped => {
skipped_batch.push(item_id.clone());
}
RealtimeTranscriptMaterializeDecision::MaterializeUser => {
batch.push(ResolvedMaterialization::User {
item_id: item_id.clone(),
text,
});
}
RealtimeTranscriptMaterializeDecision::MaterializeAssistant => {
let Some(response_id) = item.response_id.as_ref() else {
continue;
};
let Some(completion) = completion else {
continue;
};
let usage = if decision.consume_usage {
completion.usage.clone()
} else {
Usage::default()
};
batch.push(ResolvedMaterialization::Assistant {
item_id: item_id.clone(),
response_id: response_id.clone(),
text,
stop_reason: completion.stop_reason,
usage,
lane: item.lane,
consume_usage: decision.consume_usage,
});
}
}
}
if skipped_batch.is_empty() && batch.is_empty() {
break;
}
for item_id in skipped_batch {
if let Some(item) = state.items.get_mut(&item_id) {
item.materialized = true;
}
}
for resolved in batch {
match resolved {
ResolvedMaterialization::User { item_id, text } => {
flush_pending_assistant_blocks(
&mut messages,
&mut committed_usage,
&mut pending_blocks,
pending_stop_reason,
&mut pending_usage,
);
pending_response_id = None;
if let Some(item) = state.items.get_mut(&item_id) {
item.materialized = true;
}
messages.push(Message::User(UserMessage::with_blocks(
ContentInput::Text(text.clone()).into_blocks(),
)));
materialized
.push(RealtimeTranscriptMaterializedMessage::User { item_id, text });
}
ResolvedMaterialization::Assistant {
item_id,
response_id,
text,
stop_reason,
usage,
lane,
consume_usage,
} => {
if pending_response_id
.as_ref()
.is_some_and(|existing| existing != &response_id)
&& !pending_blocks.is_empty()
{
flush_pending_assistant_blocks(
&mut messages,
&mut committed_usage,
&mut pending_blocks,
pending_stop_reason,
&mut pending_usage,
);
pending_response_id = None;
}
if let Some(item) = state.items.get_mut(&item_id) {
item.materialized = true;
}
if consume_usage
&& let Some(completion) = state.assistant_completions.get_mut(&response_id)
{
completion.usage_consumed = true;
}
let block = match lane {
TranscriptLane::Display => AssistantBlock::Text {
text: text.clone(),
meta: None,
},
TranscriptLane::Spoken => AssistantBlock::Transcript {
text: text.clone(),
source: TranscriptSource::Spoken,
meta: None,
},
};
if pending_response_id.is_none() {
pending_response_id = Some(response_id.clone());
pending_stop_reason = stop_reason;
pending_usage = usage.clone();
}
pending_blocks.push(block);
materialized.push(RealtimeTranscriptMaterializedMessage::Assistant {
item_id,
response_id,
text,
stop_reason,
usage,
lane,
});
}
}
}
}
flush_pending_assistant_blocks(
&mut messages,
&mut committed_usage,
&mut pending_blocks,
pending_stop_reason,
&mut pending_usage,
);
Ok(RealtimeTranscriptApplyCommit {
outcome: RealtimeTranscriptApplyOutcome {
materialized_messages: materialized,
},
messages,
usage: committed_usage,
})
}
enum ResolvedMaterialization {
User {
item_id: String,
text: String,
},
Assistant {
item_id: String,
response_id: String,
text: String,
stop_reason: StopReason,
usage: Usage,
lane: TranscriptLane,
consume_usage: bool,
},
}
fn flush_pending_assistant_blocks(
messages: &mut Vec<Message>,
committed_usage: &mut Usage,
pending_blocks: &mut Vec<AssistantBlock>,
pending_stop_reason: StopReason,
pending_usage: &mut Usage,
) {
if pending_blocks.is_empty() {
*pending_usage = Usage::default();
return;
}
let blocks = std::mem::take(pending_blocks);
messages.push(Message::BlockAssistant(BlockAssistantMessage::new(
blocks,
pending_stop_reason,
)));
if *pending_usage != Usage::default() {
committed_usage.add(pending_usage);
*pending_usage = Usage::default();
}
}
fn observe_realtime_item(
state: &mut SessionRealtimeTranscriptState,
item_id: String,
previous_item_id: Option<String>,
role: RealtimeTranscriptRole,
response_id: Option<String>,
) -> Option<&mut RealtimeTranscriptItemState> {
if item_id.trim().is_empty() {
return None;
}
if !state.items.contains_key(&item_id) {
state.first_seen_order.push(item_id.clone());
state.items.insert(
item_id.clone(),
RealtimeTranscriptItemState::new(role, previous_item_id, response_id),
);
} else if let Some(item) = state.items.get_mut(&item_id) {
if item.previous_item_id.is_none() {
item.previous_item_id = previous_item_id;
}
if item.response_id.is_none() {
item.response_id = response_id;
}
if item.role != role {
tracing::warn!("ignoring conflicting realtime item role replay");
}
}
state.items.get_mut(&item_id)
}
fn observe_realtime_skipped_item(
state: &mut SessionRealtimeTranscriptState,
item_id: String,
previous_item_id: Option<String>,
) {
if item_id.trim().is_empty() {
return;
}
if !state.items.contains_key(&item_id) {
state.first_seen_order.push(item_id.clone());
state.items.insert(
item_id,
RealtimeTranscriptItemState::skipped(previous_item_id),
);
} else if let Some(item) = state.items.get_mut(&item_id) {
if item.previous_item_id.is_none() {
item.previous_item_id = previous_item_id;
}
item.skipped = true;
item.ready = true;
}
}
fn mark_realtime_assistant_response_ready(
state: &mut SessionRealtimeTranscriptState,
response_id: &str,
) {
for item in state.items.values_mut() {
if item.role == RealtimeTranscriptRole::Assistant
&& item.response_id.as_deref() == Some(response_id)
&& !item.text().is_empty()
{
item.ready = true;
}
}
}
fn discard_realtime_assistant_response(
state: &mut SessionRealtimeTranscriptState,
response_id: &str,
) {
state
.discarded_assistant_response_ids
.insert(response_id.to_string());
for item in state.items.values_mut() {
if item.role == RealtimeTranscriptRole::Assistant
&& item.response_id.as_deref() == Some(response_id)
&& !item.materialized
{
item.skipped = true;
item.ready = true;
}
}
}
fn discard_realtime_assistant_response_by_lane(
state: &mut SessionRealtimeTranscriptState,
response_id: &str,
) {
state
.discarded_assistant_response_ids
.insert(response_id.to_string());
for item in state.items.values_mut() {
if item.role != RealtimeTranscriptRole::Assistant
|| item.response_id.as_deref() != Some(response_id)
|| item.materialized
{
continue;
}
let has_content = item
.content_segments
.values()
.any(|segment| !segment.is_empty());
if item.lane == TranscriptLane::Display && has_content {
continue;
}
item.content_segments.clear();
item.skipped = true;
item.ready = true;
}
}
fn realtime_transcript_order(state: &SessionRealtimeTranscriptState) -> Vec<String> {
let mut out = Vec::new();
let mut emitted = BTreeSet::new();
loop {
let mut progressed = false;
for item_id in &state.first_seen_order {
if emitted.contains(item_id) {
continue;
}
let Some(item) = state.items.get(item_id) else {
continue;
};
if item
.previous_item_id
.as_ref()
.is_some_and(|prev| state.items.contains_key(prev) && !emitted.contains(prev))
{
continue;
}
emitted.insert(item_id.clone());
out.push(item_id.clone());
progressed = true;
}
if !progressed {
for item_id in &state.first_seen_order {
if emitted.insert(item_id.clone()) {
out.push(item_id.clone());
}
}
break;
}
if emitted.len() >= state.items.len() {
break;
}
}
out
}
fn realtime_predecessor_materialized(
state: &SessionRealtimeTranscriptState,
previous_item_id: Option<&str>,
) -> bool {
match previous_item_id {
None => true,
Some(previous_item_id) => state
.items
.get(previous_item_id)
.is_some_and(|item| item.materialized),
}
}
fn text_after_replacing_segment_present(
item: Option<&RealtimeTranscriptItemState>,
content_index: u32,
text: &str,
) -> bool {
if !text.is_empty() {
return true;
}
item.is_some_and(|item| {
item.content_segments
.iter()
.any(|(index, segment)| *index != content_index && !segment.is_empty())
})
}
fn normalize_realtime_response_id(response_id: String) -> Option<String> {
let trimmed = response_id.trim();
if trimmed.is_empty() {
None
} else {
Some(trimmed.to_string())
}
}
fn normalize_realtime_optional_response_id(response_id: Option<String>) -> Option<String> {
response_id.and_then(normalize_realtime_response_id)
}
fn role_kind(role: RealtimeTranscriptRole) -> RealtimeTranscriptRoleKind {
match role {
RealtimeTranscriptRole::User => RealtimeTranscriptRoleKind::User,
RealtimeTranscriptRole::Assistant => RealtimeTranscriptRoleKind::Assistant,
}
}
fn lane_kind(lane: TranscriptLane) -> RealtimeTranscriptLaneKind {
match lane {
TranscriptLane::Display => RealtimeTranscriptLaneKind::Display,
TranscriptLane::Spoken => RealtimeTranscriptLaneKind::Spoken,
}
}
fn stop_reason_kind(stop_reason: StopReason) -> RealtimeTranscriptStopReasonKind {
match stop_reason {
StopReason::Cancelled => RealtimeTranscriptStopReasonKind::Cancelled,
StopReason::ToolUse => RealtimeTranscriptStopReasonKind::ToolUse,
_ => RealtimeTranscriptStopReasonKind::Other,
}
}
fn realtime_transcript_state_identity_fields_valid(state: &SessionRealtimeTranscriptState) -> bool {
state.items.iter().all(|(item_id, item)| {
!item_id.trim().is_empty()
&& item
.previous_item_id
.as_ref()
.is_none_or(|previous| !previous.trim().is_empty())
&& item
.response_id
.as_ref()
.is_none_or(|response| !response.trim().is_empty())
})
}
fn realtime_transcript_delta_ids_valid(state: &SessionRealtimeTranscriptState) -> bool {
state
.seen_delta_ids
.iter()
.all(|delta_id| !delta_id.trim().is_empty())
}
fn realtime_transcript_completion_ids_valid(state: &SessionRealtimeTranscriptState) -> bool {
state
.assistant_completions
.keys()
.all(|response_id| !response_id.trim().is_empty())
}
fn realtime_transcript_discarded_ids_valid(state: &SessionRealtimeTranscriptState) -> bool {
state
.discarded_assistant_response_ids
.iter()
.all(|response_id| !response_id.trim().is_empty())
}
fn realtime_transcript_materialized_items_were_ready_or_skipped(
state: &SessionRealtimeTranscriptState,
) -> bool {
state
.items
.values()
.all(|item| !item.materialized || item.ready || item.skipped)
}
fn realtime_transcript_assistant_items_have_response_unless_skipped(
state: &SessionRealtimeTranscriptState,
) -> bool {
state.items.values().all(|item| {
item.role != RealtimeTranscriptRole::Assistant
|| item.skipped
|| item.response_id.is_some()
|| (!item.ready && !item.materialized && item.text().is_empty())
})
}
fn realtime_transcript_ready_assistant_items_have_completion_or_are_skipped(
state: &SessionRealtimeTranscriptState,
) -> bool {
state.items.values().all(|item| {
if item.role != RealtimeTranscriptRole::Assistant || item.skipped || !item.ready {
return true;
}
item.response_id
.as_ref()
.is_some_and(|response_id| state.assistant_completions.contains_key(response_id))
})
}
fn realtime_transcript_materialized_assistant_completions_consumed(
state: &SessionRealtimeTranscriptState,
) -> bool {
state.items.values().all(|item| {
if item.role != RealtimeTranscriptRole::Assistant || !item.materialized || item.skipped {
return true;
}
item.response_id.as_ref().is_some_and(|response_id| {
state
.assistant_completions
.get(response_id)
.is_some_and(|completion| completion.usage_consumed)
})
})
}
fn realtime_transcript_completed_assistant_text_items_are_ready_or_materialized_or_skipped(
state: &SessionRealtimeTranscriptState,
) -> bool {
state.items.values().all(|item| {
if item.role != RealtimeTranscriptRole::Assistant || item.text().is_empty() {
return true;
}
let Some(response_id) = item.response_id.as_ref() else {
return false;
};
if !state.assistant_completions.contains_key(response_id) {
return true;
}
item.ready || item.materialized || item.skipped
})
}
fn realtime_transcript_discarded_assistant_items_are_skipped_or_materialized(
state: &SessionRealtimeTranscriptState,
) -> bool {
state.items.values().all(|item| {
if item.role != RealtimeTranscriptRole::Assistant {
return true;
}
let Some(response_id) = item.response_id.as_ref() else {
return true;
};
if !state.discarded_assistant_response_ids.contains(response_id) {
return true;
}
item.skipped || item.materialized || item.lane == TranscriptLane::Display
})
}
fn usize_to_u64(value: usize) -> u64 {
u64::try_from(value).unwrap_or(u64::MAX)
}