Skip to main content

codex_runtime/runtime/
turn_output.rs

1use std::collections::HashSet;
2
3use serde_json::Value;
4
5use crate::runtime::events::{extract_text_from_params, Envelope};
6use crate::runtime::id::{parse_result_thread_id, parse_result_turn_id};
7use crate::runtime::rpc_contract::methods as events;
8
9use std::sync::Arc;
10
11/// Incremental assistant text collector for one turn stream.
12/// Keeps explicit state to avoid duplicate text from both delta and completed payloads.
13#[derive(Clone, Debug, Default, PartialEq, Eq)]
14pub struct AssistantTextCollector {
15    assistant_item_ids: HashSet<Arc<str>>,
16    assistant_items_with_delta: HashSet<Arc<str>>,
17    text: String,
18}
19
20impl AssistantTextCollector {
21    /// Create empty collector.
22    /// Allocation: none. Complexity: O(1).
23    pub fn new() -> Self {
24        Self::default()
25    }
26
27    /// Consume one envelope and update internal text state.
28    /// Allocation: O(delta) for appended text and newly seen item ids.
29    /// Complexity: O(1).
30    pub fn push_envelope(&mut self, envelope: &Envelope) {
31        track_assistant_item(&mut self.assistant_item_ids, envelope);
32        append_text_from_envelope(
33            &mut self.text,
34            &self.assistant_item_ids,
35            &mut self.assistant_items_with_delta,
36            envelope,
37        );
38    }
39
40    /// Borrow collected raw text.
41    /// Allocation: none. Complexity: O(1).
42    pub fn text(&self) -> &str {
43        &self.text
44    }
45
46    /// Take ownership of collected raw text.
47    /// Allocation: none. Complexity: O(1).
48    pub fn into_text(self) -> String {
49        self.text
50    }
51}
52
/// How a turn ended, as observed on the live event stream.
///
/// Each variant corresponds to one terminal notification method recognised by
/// `TurnStreamCollector::push_envelope`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum TurnTerminalEvent {
    /// The stream delivered the `TURN_COMPLETED` method for the target turn.
    Completed,
    /// The stream delivered the `TURN_FAILED` method for the target turn.
    Failed,
    /// The stream delivered the `TURN_INTERRUPTED` method for the target turn.
    Interrupted,
    /// The stream delivered the `TURN_CANCELLED` method for the target turn.
    Cancelled,
}
61
62/// Shared turn stream collector engine used by runtime prompt and artifact execution flows.
63/// It filters by `(thread_id, turn_id)`, accumulates assistant text, and reports terminal events.
64#[derive(Clone, Debug)]
65pub struct TurnStreamCollector {
66    thread_id: Arc<str>,
67    turn_id: Arc<str>,
68    matching_turn_events: usize,
69    assistant: AssistantTextCollector,
70}
71
72impl TurnStreamCollector {
73    /// Create collector bound to one target turn.
74    pub fn new(thread_id: &str, turn_id: &str) -> Self {
75        Self {
76            thread_id: Arc::from(thread_id),
77            turn_id: Arc::from(turn_id),
78            matching_turn_events: 0,
79            assistant: AssistantTextCollector::new(),
80        }
81    }
82
83    /// Consume one envelope. Returns terminal event when this envelope closes the target turn.
84    pub fn push_envelope(&mut self, envelope: &Envelope) -> Option<TurnTerminalEvent> {
85        if envelope.thread_id.as_deref() != Some(self.thread_id.as_ref())
86            || envelope.turn_id.as_deref() != Some(self.turn_id.as_ref())
87        {
88            return None;
89        }
90
91        self.matching_turn_events = self.matching_turn_events.saturating_add(1);
92        self.assistant.push_envelope(envelope);
93
94        match envelope.method.as_deref() {
95            Some(events::TURN_COMPLETED) => Some(TurnTerminalEvent::Completed),
96            Some(events::TURN_FAILED) => Some(TurnTerminalEvent::Failed),
97            Some(events::TURN_INTERRUPTED) => Some(TurnTerminalEvent::Interrupted),
98            Some(events::TURN_CANCELLED) => Some(TurnTerminalEvent::Cancelled),
99            _ => None,
100        }
101    }
102
103    /// Whether one envelope belongs to this collector turn target.
104    pub fn is_target_envelope(&self, envelope: &Envelope) -> bool {
105        envelope.thread_id.as_deref() == Some(self.thread_id.as_ref())
106            && envelope.turn_id.as_deref() == Some(self.turn_id.as_ref())
107    }
108
109    /// Number of consumed envelopes that matched the target turn.
110    pub fn matching_turn_events(&self) -> usize {
111        self.matching_turn_events
112    }
113
114    /// Borrow current collected assistant text.
115    pub fn assistant_text(&self) -> &str {
116        self.assistant.text()
117    }
118
119    /// Take ownership of collected assistant text.
120    pub fn into_assistant_text(self) -> String {
121        self.assistant.into_text()
122    }
123}
124
125/// Parse thread id from common JSON-RPC result shapes.
126/// Allocation: one String on match. Complexity: O(1).
127pub fn parse_thread_id(value: &Value) -> Option<String> {
128    parse_result_thread_id(value).map(ToOwned::to_owned)
129}
130
131/// Parse turn id from common JSON-RPC result shapes.
132/// Allocation: one String on match. Complexity: O(1).
133pub fn parse_turn_id(value: &Value) -> Option<String> {
134    parse_result_turn_id(value).map(ToOwned::to_owned)
135}
136
137fn track_assistant_item(assistant_item_ids: &mut HashSet<Arc<str>>, envelope: &Envelope) {
138    if envelope.method.as_deref() != Some(events::ITEM_STARTED) {
139        return;
140    }
141
142    let params = envelope.json.get("params");
143    let item_type = params
144        .and_then(|p| p.get("itemType"))
145        .and_then(Value::as_str)
146        .unwrap_or("");
147    if item_type != "agentMessage" && item_type != "agent_message" {
148        return;
149    }
150    if let Some(item_id) = envelope.item_id.as_ref() {
151        assistant_item_ids.insert(item_id.clone());
152    }
153}
154
155fn append_text_from_envelope(
156    out: &mut String,
157    assistant_item_ids: &HashSet<Arc<str>>,
158    assistant_items_with_delta: &mut HashSet<Arc<str>>,
159    envelope: &Envelope,
160) {
161    let params = envelope.json.get("params");
162    match envelope.method.as_deref() {
163        Some(events::ITEM_AGENT_MESSAGE_DELTA) => {
164            if let Some(delta) = params.and_then(|p| p.get("delta")).and_then(Value::as_str) {
165                if let Some(item_id) = envelope.item_id.as_ref() {
166                    assistant_items_with_delta.insert(item_id.clone());
167                }
168                out.push_str(delta);
169            }
170        }
171        Some(events::ITEM_COMPLETED) => {
172            let is_assistant_item = envelope
173                .item_id
174                .as_ref()
175                .map(|id| assistant_item_ids.contains(id))
176                .unwrap_or(false)
177                || params
178                    .and_then(|p| p.get("item"))
179                    .and_then(|v| v.get("type"))
180                    .and_then(Value::as_str)
181                    .map(|t| t == "agent_message" || t == "agentMessage")
182                    .unwrap_or(false);
183            if !is_assistant_item {
184                return;
185            }
186            if envelope
187                .item_id
188                .as_ref()
189                .map(|id| assistant_items_with_delta.contains(id))
190                .unwrap_or(false)
191            {
192                return;
193            }
194
195            if let Some(text) = params.and_then(extract_text_from_params) {
196                if !text.is_empty() {
197                    if !out.is_empty() {
198                        out.push('\n');
199                    }
200                    out.push_str(&text);
201                }
202            }
203        }
204        Some(events::TURN_COMPLETED) => {
205            if let Some(text) = params.and_then(extract_text_from_params) {
206                merge_turn_completed_text(out, &text);
207            }
208        }
209        _ => {}
210    }
211}
212
/// Fold the text carried by a turn-completed payload into the collected text.
///
/// Rules, in order:
/// 1. Empty `text`, or `text` that `out` already ends with (including `out == text`),
///    changes nothing.
/// 2. If `text` begins with everything collected so far (trivially true for an empty
///    `out`), `out` becomes exactly `text`: the payload is the full final message and
///    only a prefix was collected.
/// 3. Otherwise `text` is appended on a new line.
fn merge_turn_completed_text(out: &mut String, text: &str) {
    if text.is_empty() || out.ends_with(text) {
        return;
    }

    // `text` extends what we have: appending the missing tail yields exactly `text`.
    if let Some(missing_tail) = text.strip_prefix(out.as_str()) {
        out.push_str(missing_tail);
        return;
    }

    out.push('\n');
    out.push_str(text);
}
237
#[cfg(test)]
mod tests {
    use serde_json::json;

    use crate::runtime::events::{Direction, MsgKind};

    use super::*;

    /// Build an inbound notification envelope addressed to an explicit thread and turn.
    /// The JSON body mirrors the wire shape: `{"method": ..., "params": ...}`.
    fn envelope_for_turn(
        method: &str,
        thread_id: &str,
        turn_id: &str,
        item_id: Option<&str>,
        params: Value,
    ) -> Envelope {
        let body = json!({"method": method, "params": params});
        Envelope {
            seq: 1,
            ts_millis: 0,
            direction: Direction::Inbound,
            kind: MsgKind::Notification,
            rpc_id: None,
            method: Some(Arc::from(method)),
            thread_id: Some(Arc::from(thread_id)),
            turn_id: Some(Arc::from(turn_id)),
            item_id: item_id.map(Arc::from),
            json: Arc::new(body),
        }
    }

    /// Envelope on the default `("thr", "turn")` pair used by most tests.
    fn envelope(method: &str, item_id: Option<&str>, params: Value) -> Envelope {
        envelope_for_turn(method, "thr", "turn", item_id, params)
    }

    /// Feed the envelopes, in order, into a fresh text collector and return it.
    fn collect(envelopes: &[Envelope]) -> AssistantTextCollector {
        let mut collector = AssistantTextCollector::new();
        for item in envelopes {
            collector.push_envelope(item);
        }
        collector
    }

    #[test]
    fn collector_prefers_delta_and_ignores_completed_duplicate() {
        let started = envelope(
            "item/started",
            Some("it_1"),
            json!({"itemType":"agentMessage"}),
        );
        let delta = envelope(
            "item/agentMessage/delta",
            Some("it_1"),
            json!({"delta":"hello"}),
        );
        let completed = envelope(
            "item/completed",
            Some("it_1"),
            json!({"item":{"type":"agent_message","text":"hello"}}),
        );

        let collector = collect(&[started, delta, completed]);

        assert_eq!(collector.text(), "hello");
    }

    #[test]
    fn collector_reads_completed_text_without_delta() {
        let started = envelope(
            "item/started",
            Some("it_2"),
            json!({"itemType":"agent_message"}),
        );
        let completed = envelope(
            "item/completed",
            Some("it_2"),
            json!({"item":{"type":"agent_message","text":"world"}}),
        );

        let collector = collect(&[started, completed]);

        assert_eq!(collector.text(), "world");
    }

    #[test]
    fn collector_dedups_turn_completed_text_after_item_completed() {
        let started = envelope(
            "item/started",
            Some("it_3"),
            json!({"itemType":"agent_message"}),
        );
        let item_done = envelope(
            "item/completed",
            Some("it_3"),
            json!({"item":{"type":"agent_message","text":"final answer"}}),
        );
        let turn_done = envelope("turn/completed", None, json!({"text":"final answer"}));

        let collector = collect(&[started, item_done, turn_done]);

        assert_eq!(collector.text(), "final answer");
    }

    #[test]
    fn parse_ids_from_result_shapes() {
        let result = json!({"thread":{"id":"thr_1"},"turn":{"id":"turn_1"}});

        let thread = parse_thread_id(&result);
        let turn = parse_turn_id(&result);

        assert_eq!(thread.as_deref(), Some("thr_1"));
        assert_eq!(turn.as_deref(), Some("turn_1"));
    }

    #[test]
    fn parse_ids_reject_loose_id_fallback_and_empty_values() {
        // A bare top-level `id` or a bare string is not accepted as an id.
        assert_eq!(parse_thread_id(&json!({"id":"thr_loose"})), None);
        assert_eq!(parse_turn_id(&json!("turn_loose")), None);
        // Empty and whitespace-only ids are rejected too.
        assert_eq!(parse_thread_id(&json!({"threadId":""})), None);
        assert_eq!(parse_turn_id(&json!({"turn":{"id":"  "}})), None);
    }

    #[test]
    fn turn_stream_collector_ignores_other_turn_and_tracks_target_terminal() {
        let mut stream = TurnStreamCollector::new("thr_target", "turn_target");

        // Addressed to the default ("thr", "turn") pair, so it must be ignored.
        let foreign = envelope(
            "turn/completed",
            None,
            json!({"threadId":"thr_other","turnId":"turn_other"}),
        );
        assert_eq!(stream.push_envelope(&foreign), None);
        assert_eq!(stream.matching_turn_events(), 0);

        let target = envelope_for_turn(
            "turn/completed",
            "thr_target",
            "turn_target",
            None,
            json!({"threadId":"thr_target","turnId":"turn_target"}),
        );
        assert_eq!(
            stream.push_envelope(&target),
            Some(TurnTerminalEvent::Completed)
        );
        assert_eq!(stream.matching_turn_events(), 1);
    }

    #[test]
    fn turn_stream_collector_classifies_cancelled_terminal() {
        let mut stream = TurnStreamCollector::new("thr", "turn");
        let cancelled = envelope_for_turn(
            "turn/cancelled",
            "thr",
            "turn",
            None,
            json!({"threadId":"thr","turnId":"turn"}),
        );

        let terminal = stream.push_envelope(&cancelled);

        assert_eq!(terminal, Some(TurnTerminalEvent::Cancelled));
    }
}
385}