Skip to main content

codex_runtime/runtime/
turn_output.rs

1use std::collections::HashSet;
2
3use serde_json::Value;
4
5use crate::runtime::events::Envelope;
6use crate::runtime::id::{parse_result_thread_id, parse_result_turn_id};
7use crate::runtime::rpc_contract::methods as events;
8
9use std::sync::Arc;
10
11/// Incremental assistant text collector for one turn stream.
12/// Keeps explicit state to avoid duplicate text from both delta and completed payloads.
13#[derive(Clone, Debug, Default, PartialEq, Eq)]
14pub struct AssistantTextCollector {
15    assistant_item_ids: HashSet<Arc<str>>,
16    assistant_items_with_delta: HashSet<Arc<str>>,
17    text: String,
18}
19
20impl AssistantTextCollector {
21    /// Create empty collector.
22    /// Allocation: none. Complexity: O(1).
23    pub fn new() -> Self {
24        Self::default()
25    }
26
27    /// Consume one envelope and update internal text state.
28    /// Allocation: O(delta) for appended text and newly seen item ids.
29    /// Complexity: O(1).
30    pub fn push_envelope(&mut self, envelope: &Envelope) {
31        track_assistant_item(&mut self.assistant_item_ids, envelope);
32        append_text_from_envelope(
33            &mut self.text,
34            &self.assistant_item_ids,
35            &mut self.assistant_items_with_delta,
36            envelope,
37        );
38    }
39
40    /// Borrow collected raw text.
41    /// Allocation: none. Complexity: O(1).
42    pub fn text(&self) -> &str {
43        &self.text
44    }
45
46    /// Take ownership of collected raw text.
47    /// Allocation: none. Complexity: O(1).
48    pub fn into_text(self) -> String {
49        self.text
50    }
51}
52
/// How a turn ended, as observed on the live event stream.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum TurnTerminalEvent {
    /// The turn finished normally.
    Completed,
    /// The turn ended with a failure.
    Failed,
    /// The turn was interrupted before finishing.
    Interrupted,
    /// The turn was cancelled.
    Cancelled,
}
61
62/// Shared turn stream collector engine used by runtime prompt and artifact execution flows.
63/// It filters by `(thread_id, turn_id)`, accumulates assistant text, and reports terminal events.
64#[derive(Clone, Debug)]
65pub struct TurnStreamCollector {
66    thread_id: Arc<str>,
67    turn_id: Arc<str>,
68    matching_turn_events: usize,
69    assistant: AssistantTextCollector,
70}
71
72impl TurnStreamCollector {
73    /// Create collector bound to one target turn.
74    pub fn new(thread_id: &str, turn_id: &str) -> Self {
75        Self {
76            thread_id: Arc::from(thread_id),
77            turn_id: Arc::from(turn_id),
78            matching_turn_events: 0,
79            assistant: AssistantTextCollector::new(),
80        }
81    }
82
83    /// Consume one envelope. Returns terminal event when this envelope closes the target turn.
84    pub fn push_envelope(&mut self, envelope: &Envelope) -> Option<TurnTerminalEvent> {
85        if envelope.thread_id.as_deref() != Some(self.thread_id.as_ref())
86            || envelope.turn_id.as_deref() != Some(self.turn_id.as_ref())
87        {
88            return None;
89        }
90
91        self.matching_turn_events = self.matching_turn_events.saturating_add(1);
92        self.assistant.push_envelope(envelope);
93
94        match envelope.method.as_deref() {
95            Some(events::TURN_COMPLETED) => Some(TurnTerminalEvent::Completed),
96            Some(events::TURN_FAILED) => Some(TurnTerminalEvent::Failed),
97            Some(events::TURN_INTERRUPTED) => Some(TurnTerminalEvent::Interrupted),
98            Some(events::TURN_CANCELLED) => Some(TurnTerminalEvent::Cancelled),
99            _ => None,
100        }
101    }
102
103    /// Whether one envelope belongs to this collector turn target.
104    pub fn is_target_envelope(&self, envelope: &Envelope) -> bool {
105        envelope.thread_id.as_deref() == Some(self.thread_id.as_ref())
106            && envelope.turn_id.as_deref() == Some(self.turn_id.as_ref())
107    }
108
109    /// Number of consumed envelopes that matched the target turn.
110    pub fn matching_turn_events(&self) -> usize {
111        self.matching_turn_events
112    }
113
114    /// Borrow current collected assistant text.
115    pub fn assistant_text(&self) -> &str {
116        self.assistant.text()
117    }
118
119    /// Take ownership of collected assistant text.
120    pub fn into_assistant_text(self) -> String {
121        self.assistant.into_text()
122    }
123}
124
125/// Parse thread id from common JSON-RPC result shapes.
126/// Allocation: one String on match. Complexity: O(1).
127pub fn parse_thread_id(value: &Value) -> Option<String> {
128    parse_result_thread_id(value).map(ToOwned::to_owned)
129}
130
131/// Parse turn id from common JSON-RPC result shapes.
132/// Allocation: one String on match. Complexity: O(1).
133pub fn parse_turn_id(value: &Value) -> Option<String> {
134    parse_result_turn_id(value).map(ToOwned::to_owned)
135}
136
137fn track_assistant_item(assistant_item_ids: &mut HashSet<Arc<str>>, envelope: &Envelope) {
138    if envelope.method.as_deref() != Some(events::ITEM_STARTED) {
139        return;
140    }
141
142    let params = envelope.json.get("params");
143    let item_type = params
144        .and_then(|p| p.get("itemType"))
145        .and_then(Value::as_str)
146        .unwrap_or("");
147    if item_type != "agentMessage" && item_type != "agent_message" {
148        return;
149    }
150    if let Some(item_id) = envelope.item_id.as_ref() {
151        assistant_item_ids.insert(item_id.clone());
152    }
153}
154
155fn append_text_from_envelope(
156    out: &mut String,
157    assistant_item_ids: &HashSet<Arc<str>>,
158    assistant_items_with_delta: &mut HashSet<Arc<str>>,
159    envelope: &Envelope,
160) {
161    let params = envelope.json.get("params");
162    match envelope.method.as_deref() {
163        Some(events::ITEM_AGENT_MESSAGE_DELTA) => {
164            if let Some(delta) = params.and_then(|p| p.get("delta")).and_then(Value::as_str) {
165                if let Some(item_id) = envelope.item_id.as_ref() {
166                    assistant_items_with_delta.insert(item_id.clone());
167                }
168                out.push_str(delta);
169            }
170        }
171        Some(events::ITEM_COMPLETED) => {
172            let is_assistant_item = envelope
173                .item_id
174                .as_ref()
175                .map(|id| assistant_item_ids.contains(id))
176                .unwrap_or(false)
177                || params
178                    .and_then(|p| p.get("item"))
179                    .and_then(|v| v.get("type"))
180                    .and_then(Value::as_str)
181                    .map(|t| t == "agent_message" || t == "agentMessage")
182                    .unwrap_or(false);
183            if !is_assistant_item {
184                return;
185            }
186            if envelope
187                .item_id
188                .as_ref()
189                .map(|id| assistant_items_with_delta.contains(id))
190                .unwrap_or(false)
191            {
192                return;
193            }
194
195            if let Some(text) = params.and_then(extract_text_from_params) {
196                if !text.is_empty() {
197                    if !out.is_empty() {
198                        out.push('\n');
199                    }
200                    out.push_str(&text);
201                }
202            }
203        }
204        Some(events::TURN_COMPLETED) => {
205            if let Some(text) = params.and_then(extract_text_from_params) {
206                merge_turn_completed_text(out, &text);
207            }
208        }
209        _ => {}
210    }
211}
212
/// Merge the text carried by a turn-completed payload into the collected buffer.
///
/// - Empty `text` changes nothing.
/// - If `out` already equals `text` or ends with it, nothing is added.
/// - If `text` starts with the current content of `out` (including the case where `out` is
///   empty), `out` becomes exactly `text`.
/// - Otherwise `text` is appended after a newline.
fn merge_turn_completed_text(out: &mut String, text: &str) {
    // `ends_with` also covers the "identical" case.
    if text.is_empty() || out.ends_with(text) {
        return;
    }

    // The payload extends what was collected (typically a prefix gathered from deltas):
    // appending only the missing suffix leaves `out` equal to the full payload.
    if let Some(missing_suffix) = text.strip_prefix(out.as_str()) {
        out.push_str(missing_suffix);
        return;
    }

    out.push('\n');
    out.push_str(text);
}
237
238fn extract_text_from_params(params: &Value) -> Option<String> {
239    for ptr in ["/item/text", "/text", "/outputText", "/output/text"] {
240        if let Some(text) = params.pointer(ptr).and_then(Value::as_str) {
241            return Some(text.to_owned());
242        }
243    }
244    if let Some(content) = params
245        .get("item")
246        .and_then(|item| item.get("content"))
247        .and_then(Value::as_array)
248    {
249        let mut joined = String::new();
250        for part in content {
251            if let Some(text) = part.get("text").and_then(Value::as_str) {
252                joined.push_str(text);
253            }
254        }
255        if !joined.is_empty() {
256            return Some(joined);
257        }
258    }
259    None
260}
261
#[cfg(test)]
mod tests {
    use serde_json::json;

    use crate::runtime::events::{Direction, MsgKind};

    use super::*;

    /// Build an inbound notification envelope addressed to the given thread and turn.
    fn envelope_for_turn(
        method: &str,
        thread_id: &str,
        turn_id: &str,
        item_id: Option<&str>,
        params: Value,
    ) -> Envelope {
        let payload = json!({"method": method, "params": params});
        Envelope {
            seq: 1,
            ts_millis: 0,
            direction: Direction::Inbound,
            kind: MsgKind::Notification,
            rpc_id: None,
            method: Some(Arc::from(method)),
            thread_id: Some(Arc::from(thread_id)),
            turn_id: Some(Arc::from(turn_id)),
            item_id: item_id.map(Arc::from),
            json: Arc::new(payload),
        }
    }

    /// Same as `envelope_for_turn`, addressed to the default `thr` / `turn` pair.
    fn envelope(method: &str, item_id: Option<&str>, params: Value) -> Envelope {
        envelope_for_turn(method, "thr", "turn", item_id, params)
    }

    /// Feed the envelopes, in order, into a fresh text collector and return it.
    fn collector_fed_with(envelopes: Vec<Envelope>) -> AssistantTextCollector {
        let mut collector = AssistantTextCollector::new();
        for env in &envelopes {
            collector.push_envelope(env);
        }
        collector
    }

    #[test]
    fn collector_prefers_delta_and_ignores_completed_duplicate() {
        let collector = collector_fed_with(vec![
            envelope(
                "item/started",
                Some("it_1"),
                json!({"itemType":"agentMessage"}),
            ),
            envelope(
                "item/agentMessage/delta",
                Some("it_1"),
                json!({"delta":"hello"}),
            ),
            envelope(
                "item/completed",
                Some("it_1"),
                json!({"item":{"type":"agent_message","text":"hello"}}),
            ),
        ]);

        assert_eq!(collector.text(), "hello");
    }

    #[test]
    fn collector_reads_completed_text_without_delta() {
        let collector = collector_fed_with(vec![
            envelope(
                "item/started",
                Some("it_2"),
                json!({"itemType":"agent_message"}),
            ),
            envelope(
                "item/completed",
                Some("it_2"),
                json!({"item":{"type":"agent_message","text":"world"}}),
            ),
        ]);

        assert_eq!(collector.text(), "world");
    }

    #[test]
    fn collector_dedups_turn_completed_text_after_item_completed() {
        let collector = collector_fed_with(vec![
            envelope(
                "item/started",
                Some("it_3"),
                json!({"itemType":"agent_message"}),
            ),
            envelope(
                "item/completed",
                Some("it_3"),
                json!({"item":{"type":"agent_message","text":"final answer"}}),
            ),
            envelope("turn/completed", None, json!({"text":"final answer"})),
        ]);

        assert_eq!(collector.text(), "final answer");
    }

    #[test]
    fn parse_ids_from_result_shapes() {
        let result = json!({"thread":{"id":"thr_1"},"turn":{"id":"turn_1"}});

        assert_eq!(parse_thread_id(&result).as_deref(), Some("thr_1"));
        assert_eq!(parse_turn_id(&result).as_deref(), Some("turn_1"));
    }

    #[test]
    fn parse_ids_reject_loose_id_fallback_and_empty_values() {
        assert_eq!(parse_thread_id(&json!({"id":"thr_loose"})), None);
        assert_eq!(parse_turn_id(&json!("turn_loose")), None);
        assert_eq!(parse_thread_id(&json!({"threadId":""})), None);
        assert_eq!(parse_turn_id(&json!({"turn":{"id":"  "}})), None);
    }

    #[test]
    fn turn_stream_collector_ignores_other_turn_and_tracks_target_terminal() {
        let mut stream = TurnStreamCollector::new("thr_target", "turn_target");

        // Addressed to the default `thr` / `turn` pair, so it must be filtered out.
        let foreign = envelope(
            "turn/completed",
            None,
            json!({"threadId":"thr_other","turnId":"turn_other"}),
        );
        assert_eq!(stream.push_envelope(&foreign), None);
        assert_eq!(stream.matching_turn_events(), 0);

        let target = envelope_for_turn(
            "turn/completed",
            "thr_target",
            "turn_target",
            None,
            json!({"threadId":"thr_target","turnId":"turn_target"}),
        );
        assert_eq!(
            stream.push_envelope(&target),
            Some(TurnTerminalEvent::Completed)
        );
        assert_eq!(stream.matching_turn_events(), 1);
    }

    #[test]
    fn turn_stream_collector_classifies_cancelled_terminal() {
        let mut stream = TurnStreamCollector::new("thr", "turn");
        let cancelled = envelope_for_turn(
            "turn/cancelled",
            "thr",
            "turn",
            None,
            json!({"threadId":"thr","turnId":"turn"}),
        );

        let terminal = stream.push_envelope(&cancelled);

        assert_eq!(terminal, Some(TurnTerminalEvent::Cancelled));
    }
}
409}