1use std::collections::HashSet;
2
3use serde_json::Value;
4
5use crate::runtime::events::{extract_text_from_params, Envelope};
6use crate::runtime::id::{parse_result_thread_id, parse_result_turn_id};
7use crate::runtime::rpc_contract::methods as events;
8
9use std::sync::Arc;
10
/// Accumulates assistant-authored text from a sequence of envelopes.
///
/// Two id sets are kept next to the text buffer so that the streamed
/// (delta) form and the completed form of the same item are not both
/// appended.
#[derive(Debug, Clone, Default, Eq, PartialEq)]
pub struct AssistantTextCollector {
    /// Ids of items recorded as assistant messages when they started.
    assistant_item_ids: HashSet<Arc<str>>,
    /// Ids of items for which at least one text delta was appended.
    assistant_items_with_delta: HashSet<Arc<str>>,
    /// The text collected so far.
    text: String,
}
19
20impl AssistantTextCollector {
21 pub fn new() -> Self {
24 Self::default()
25 }
26
27 pub fn push_envelope(&mut self, envelope: &Envelope) {
31 track_assistant_item(&mut self.assistant_item_ids, envelope);
32 append_text_from_envelope(
33 &mut self.text,
34 &self.assistant_item_ids,
35 &mut self.assistant_items_with_delta,
36 envelope,
37 );
38 }
39
40 pub fn text(&self) -> &str {
43 &self.text
44 }
45
46 pub fn into_text(self) -> String {
49 self.text
50 }
51}
52
/// The way a turn ended, as classified from the method of a terminal event.
#[derive(Debug, Clone, Copy, Eq, PartialEq)]
pub enum TurnTerminalEvent {
    /// The turn ran to completion.
    Completed,
    /// The turn ended with a failure.
    Failed,
    /// The turn was interrupted.
    Interrupted,
    /// The turn was cancelled.
    Cancelled,
}
61
62#[derive(Clone, Debug)]
65pub struct TurnStreamCollector {
66 thread_id: Arc<str>,
67 turn_id: Arc<str>,
68 matching_turn_events: usize,
69 assistant: AssistantTextCollector,
70}
71
72impl TurnStreamCollector {
73 pub fn new(thread_id: &str, turn_id: &str) -> Self {
75 Self {
76 thread_id: Arc::from(thread_id),
77 turn_id: Arc::from(turn_id),
78 matching_turn_events: 0,
79 assistant: AssistantTextCollector::new(),
80 }
81 }
82
83 pub fn push_envelope(&mut self, envelope: &Envelope) -> Option<TurnTerminalEvent> {
85 if envelope.thread_id.as_deref() != Some(self.thread_id.as_ref())
86 || envelope.turn_id.as_deref() != Some(self.turn_id.as_ref())
87 {
88 return None;
89 }
90
91 self.matching_turn_events = self.matching_turn_events.saturating_add(1);
92 self.assistant.push_envelope(envelope);
93
94 match envelope.method.as_deref() {
95 Some(events::TURN_COMPLETED) => Some(TurnTerminalEvent::Completed),
96 Some(events::TURN_FAILED) => Some(TurnTerminalEvent::Failed),
97 Some(events::TURN_INTERRUPTED) => Some(TurnTerminalEvent::Interrupted),
98 Some(events::TURN_CANCELLED) => Some(TurnTerminalEvent::Cancelled),
99 _ => None,
100 }
101 }
102
103 pub fn is_target_envelope(&self, envelope: &Envelope) -> bool {
105 envelope.thread_id.as_deref() == Some(self.thread_id.as_ref())
106 && envelope.turn_id.as_deref() == Some(self.turn_id.as_ref())
107 }
108
109 pub fn matching_turn_events(&self) -> usize {
111 self.matching_turn_events
112 }
113
114 pub fn assistant_text(&self) -> &str {
116 self.assistant.text()
117 }
118
119 pub fn into_assistant_text(self) -> String {
121 self.assistant.into_text()
122 }
123}
124
125pub fn parse_thread_id(value: &Value) -> Option<String> {
128 parse_result_thread_id(value).map(ToOwned::to_owned)
129}
130
131pub fn parse_turn_id(value: &Value) -> Option<String> {
134 parse_result_turn_id(value).map(ToOwned::to_owned)
135}
136
137fn track_assistant_item(assistant_item_ids: &mut HashSet<Arc<str>>, envelope: &Envelope) {
138 if envelope.method.as_deref() != Some(events::ITEM_STARTED) {
139 return;
140 }
141
142 let params = envelope.json.get("params");
143 let item_type = params
144 .and_then(|p| p.get("itemType"))
145 .and_then(Value::as_str)
146 .unwrap_or("");
147 if item_type != "agentMessage" && item_type != "agent_message" {
148 return;
149 }
150 if let Some(item_id) = envelope.item_id.as_ref() {
151 assistant_item_ids.insert(item_id.clone());
152 }
153}
154
155fn append_text_from_envelope(
156 out: &mut String,
157 assistant_item_ids: &HashSet<Arc<str>>,
158 assistant_items_with_delta: &mut HashSet<Arc<str>>,
159 envelope: &Envelope,
160) {
161 let params = envelope.json.get("params");
162 match envelope.method.as_deref() {
163 Some(events::ITEM_AGENT_MESSAGE_DELTA) => {
164 if let Some(delta) = params.and_then(|p| p.get("delta")).and_then(Value::as_str) {
165 if let Some(item_id) = envelope.item_id.as_ref() {
166 assistant_items_with_delta.insert(item_id.clone());
167 }
168 out.push_str(delta);
169 }
170 }
171 Some(events::ITEM_COMPLETED) => {
172 let is_assistant_item = envelope
173 .item_id
174 .as_ref()
175 .map(|id| assistant_item_ids.contains(id))
176 .unwrap_or(false)
177 || params
178 .and_then(|p| p.get("item"))
179 .and_then(|v| v.get("type"))
180 .and_then(Value::as_str)
181 .map(|t| t == "agent_message" || t == "agentMessage")
182 .unwrap_or(false);
183 if !is_assistant_item {
184 return;
185 }
186 if envelope
187 .item_id
188 .as_ref()
189 .map(|id| assistant_items_with_delta.contains(id))
190 .unwrap_or(false)
191 {
192 return;
193 }
194
195 if let Some(text) = params.and_then(extract_text_from_params) {
196 if !text.is_empty() {
197 if !out.is_empty() {
198 out.push('\n');
199 }
200 out.push_str(&text);
201 }
202 }
203 }
204 Some(events::TURN_COMPLETED) => {
205 if let Some(text) = params.and_then(extract_text_from_params) {
206 merge_turn_completed_text(out, &text);
207 }
208 }
209 _ => {}
210 }
211}
212
/// Merges the text reported by a turn-completed event into `out` without
/// duplicating what is already there.
///
/// * Empty `text`, or `text` that `out` already ends with (including the
///   case where both are equal), leaves `out` unchanged.
/// * When `text` starts with the current contents of `out` (which includes
///   an empty `out`), `out` becomes exactly `text`.
/// * Otherwise `text` is appended on a new line.
fn merge_turn_completed_text(out: &mut String, text: &str) {
    if text.is_empty() || out.ends_with(text) {
        return;
    }

    match text.strip_prefix(out.as_str()) {
        // `out` is a prefix of `text`: adding the remainder makes `out == text`.
        Some(remainder) => out.push_str(remainder),
        None => {
            out.push('\n');
            out.push_str(text);
        }
    }
}
237
#[cfg(test)]
mod tests {
    use serde_json::json;

    use crate::runtime::events::{Direction, MsgKind};

    use super::*;

    /// Builds an inbound notification envelope bound to an explicit
    /// thread/turn pair, with `params` nested under the `"params"` key.
    fn envelope_for_turn(
        method: &str,
        thread_id: &str,
        turn_id: &str,
        item_id: Option<&str>,
        params: Value,
    ) -> Envelope {
        let payload = json!({"method": method, "params": params});
        Envelope {
            seq: 1,
            ts_millis: 0,
            direction: Direction::Inbound,
            kind: MsgKind::Notification,
            rpc_id: None,
            method: Some(Arc::from(method)),
            thread_id: Some(Arc::from(thread_id)),
            turn_id: Some(Arc::from(turn_id)),
            item_id: item_id.map(Arc::from),
            json: Arc::new(payload),
        }
    }

    /// Same as `envelope_for_turn`, using the default ids `"thr"` / `"turn"`.
    fn envelope(method: &str, item_id: Option<&str>, params: Value) -> Envelope {
        envelope_for_turn(method, "thr", "turn", item_id, params)
    }

    /// Pushes every envelope, in order, into a fresh assistant collector.
    fn collect(envelopes: &[Envelope]) -> AssistantTextCollector {
        let mut collector = AssistantTextCollector::new();
        for env in envelopes {
            collector.push_envelope(env);
        }
        collector
    }

    #[test]
    fn collector_prefers_delta_and_ignores_completed_duplicate() {
        let collector = collect(&[
            envelope(
                "item/started",
                Some("it_1"),
                json!({"itemType":"agentMessage"}),
            ),
            envelope(
                "item/agentMessage/delta",
                Some("it_1"),
                json!({"delta":"hello"}),
            ),
            envelope(
                "item/completed",
                Some("it_1"),
                json!({"item":{"type":"agent_message","text":"hello"}}),
            ),
        ]);

        assert_eq!(collector.text(), "hello");
    }

    #[test]
    fn collector_reads_completed_text_without_delta() {
        let collector = collect(&[
            envelope(
                "item/started",
                Some("it_2"),
                json!({"itemType":"agent_message"}),
            ),
            envelope(
                "item/completed",
                Some("it_2"),
                json!({"item":{"type":"agent_message","text":"world"}}),
            ),
        ]);

        assert_eq!(collector.text(), "world");
    }

    #[test]
    fn collector_dedups_turn_completed_text_after_item_completed() {
        let collector = collect(&[
            envelope(
                "item/started",
                Some("it_3"),
                json!({"itemType":"agent_message"}),
            ),
            envelope(
                "item/completed",
                Some("it_3"),
                json!({"item":{"type":"agent_message","text":"final answer"}}),
            ),
            envelope("turn/completed", None, json!({"text":"final answer"})),
        ]);

        assert_eq!(collector.text(), "final answer");
    }

    #[test]
    fn parse_ids_from_result_shapes() {
        let result = json!({"thread":{"id":"thr_1"},"turn":{"id":"turn_1"}});

        assert_eq!(parse_thread_id(&result).as_deref(), Some("thr_1"));
        assert_eq!(parse_turn_id(&result).as_deref(), Some("turn_1"));
    }

    #[test]
    fn parse_ids_reject_loose_id_fallback_and_empty_values() {
        assert_eq!(parse_thread_id(&json!({"id":"thr_loose"})), None);
        assert_eq!(parse_turn_id(&json!("turn_loose")), None);
        assert_eq!(parse_thread_id(&json!({"threadId":""})), None);
        assert_eq!(parse_turn_id(&json!({"turn":{"id":" "}})), None);
    }

    #[test]
    fn turn_stream_collector_ignores_other_turn_and_tracks_target_terminal() {
        let mut stream = TurnStreamCollector::new("thr_target", "turn_target");

        // Carries the default "thr"/"turn" ids, so it is not the target turn.
        let foreign = envelope(
            "turn/completed",
            None,
            json!({"threadId":"thr_other","turnId":"turn_other"}),
        );
        assert_eq!(stream.push_envelope(&foreign), None);
        assert_eq!(stream.matching_turn_events(), 0);

        let target = envelope_for_turn(
            "turn/completed",
            "thr_target",
            "turn_target",
            None,
            json!({"threadId":"thr_target","turnId":"turn_target"}),
        );
        assert_eq!(
            stream.push_envelope(&target),
            Some(TurnTerminalEvent::Completed)
        );
        assert_eq!(stream.matching_turn_events(), 1);
    }

    #[test]
    fn turn_stream_collector_classifies_cancelled_terminal() {
        let mut stream = TurnStreamCollector::new("thr", "turn");
        let cancelled = envelope_for_turn(
            "turn/cancelled",
            "thr",
            "turn",
            None,
            json!({"threadId":"thr","turnId":"turn"}),
        );

        let terminal = stream.push_envelope(&cancelled);

        assert_eq!(terminal, Some(TurnTerminalEvent::Cancelled));
    }
}