1use std::collections::HashSet;
2
3use serde_json::Value;
4
5use crate::runtime::events::Envelope;
6use crate::runtime::id::{parse_result_thread_id, parse_result_turn_id};
7use crate::runtime::rpc_contract::methods as events;
8
9use std::sync::Arc;
10
/// Accumulates assistant message text from a sequence of runtime envelopes.
///
/// Envelopes are fed in through `push_envelope`; the collected text is read
/// with `text` or taken with `into_text`.
#[derive(Debug, Clone, Default, PartialEq, Eq)]
pub struct AssistantTextCollector {
    /// Ids of items whose item-started event declared an agent-message type.
    assistant_item_ids: HashSet<Arc<str>>,
    /// Ids of items that have already contributed at least one delta; their
    /// item-completed payload is skipped so the text is not appended twice.
    assistant_items_with_delta: HashSet<Arc<str>>,
    /// Text collected so far.
    text: String,
}
19
20impl AssistantTextCollector {
21 pub fn new() -> Self {
24 Self::default()
25 }
26
27 pub fn push_envelope(&mut self, envelope: &Envelope) {
31 track_assistant_item(&mut self.assistant_item_ids, envelope);
32 append_text_from_envelope(
33 &mut self.text,
34 &self.assistant_item_ids,
35 &mut self.assistant_items_with_delta,
36 envelope,
37 );
38 }
39
40 pub fn text(&self) -> &str {
43 &self.text
44 }
45
46 pub fn into_text(self) -> String {
49 self.text
50 }
51}
52
/// Terminal outcome of a turn, as classified from the envelope method by
/// `TurnStreamCollector::push_envelope`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum TurnTerminalEvent {
    /// The envelope method was the turn-completed event.
    Completed,
    /// The envelope method was the turn-failed event.
    Failed,
    /// The envelope method was the turn-interrupted event.
    Interrupted,
    /// The envelope method was the turn-cancelled event.
    Cancelled,
}
61
62#[derive(Clone, Debug)]
65pub struct TurnStreamCollector {
66 thread_id: Arc<str>,
67 turn_id: Arc<str>,
68 matching_turn_events: usize,
69 assistant: AssistantTextCollector,
70}
71
72impl TurnStreamCollector {
73 pub fn new(thread_id: &str, turn_id: &str) -> Self {
75 Self {
76 thread_id: Arc::from(thread_id),
77 turn_id: Arc::from(turn_id),
78 matching_turn_events: 0,
79 assistant: AssistantTextCollector::new(),
80 }
81 }
82
83 pub fn push_envelope(&mut self, envelope: &Envelope) -> Option<TurnTerminalEvent> {
85 if envelope.thread_id.as_deref() != Some(self.thread_id.as_ref())
86 || envelope.turn_id.as_deref() != Some(self.turn_id.as_ref())
87 {
88 return None;
89 }
90
91 self.matching_turn_events = self.matching_turn_events.saturating_add(1);
92 self.assistant.push_envelope(envelope);
93
94 match envelope.method.as_deref() {
95 Some(events::TURN_COMPLETED) => Some(TurnTerminalEvent::Completed),
96 Some(events::TURN_FAILED) => Some(TurnTerminalEvent::Failed),
97 Some(events::TURN_INTERRUPTED) => Some(TurnTerminalEvent::Interrupted),
98 Some(events::TURN_CANCELLED) => Some(TurnTerminalEvent::Cancelled),
99 _ => None,
100 }
101 }
102
103 pub fn is_target_envelope(&self, envelope: &Envelope) -> bool {
105 envelope.thread_id.as_deref() == Some(self.thread_id.as_ref())
106 && envelope.turn_id.as_deref() == Some(self.turn_id.as_ref())
107 }
108
109 pub fn matching_turn_events(&self) -> usize {
111 self.matching_turn_events
112 }
113
114 pub fn assistant_text(&self) -> &str {
116 self.assistant.text()
117 }
118
119 pub fn into_assistant_text(self) -> String {
121 self.assistant.into_text()
122 }
123}
124
125pub fn parse_thread_id(value: &Value) -> Option<String> {
128 parse_result_thread_id(value).map(ToOwned::to_owned)
129}
130
131pub fn parse_turn_id(value: &Value) -> Option<String> {
134 parse_result_turn_id(value).map(ToOwned::to_owned)
135}
136
137fn track_assistant_item(assistant_item_ids: &mut HashSet<Arc<str>>, envelope: &Envelope) {
138 if envelope.method.as_deref() != Some(events::ITEM_STARTED) {
139 return;
140 }
141
142 let params = envelope.json.get("params");
143 let item_type = params
144 .and_then(|p| p.get("itemType"))
145 .and_then(Value::as_str)
146 .unwrap_or("");
147 if item_type != "agentMessage" && item_type != "agent_message" {
148 return;
149 }
150 if let Some(item_id) = envelope.item_id.as_ref() {
151 assistant_item_ids.insert(item_id.clone());
152 }
153}
154
155fn append_text_from_envelope(
156 out: &mut String,
157 assistant_item_ids: &HashSet<Arc<str>>,
158 assistant_items_with_delta: &mut HashSet<Arc<str>>,
159 envelope: &Envelope,
160) {
161 let params = envelope.json.get("params");
162 match envelope.method.as_deref() {
163 Some(events::ITEM_AGENT_MESSAGE_DELTA) => {
164 if let Some(delta) = params.and_then(|p| p.get("delta")).and_then(Value::as_str) {
165 if let Some(item_id) = envelope.item_id.as_ref() {
166 assistant_items_with_delta.insert(item_id.clone());
167 }
168 out.push_str(delta);
169 }
170 }
171 Some(events::ITEM_COMPLETED) => {
172 let is_assistant_item = envelope
173 .item_id
174 .as_ref()
175 .map(|id| assistant_item_ids.contains(id))
176 .unwrap_or(false)
177 || params
178 .and_then(|p| p.get("item"))
179 .and_then(|v| v.get("type"))
180 .and_then(Value::as_str)
181 .map(|t| t == "agent_message" || t == "agentMessage")
182 .unwrap_or(false);
183 if !is_assistant_item {
184 return;
185 }
186 if envelope
187 .item_id
188 .as_ref()
189 .map(|id| assistant_items_with_delta.contains(id))
190 .unwrap_or(false)
191 {
192 return;
193 }
194
195 if let Some(text) = params.and_then(extract_text_from_params) {
196 if !text.is_empty() {
197 if !out.is_empty() {
198 out.push('\n');
199 }
200 out.push_str(&text);
201 }
202 }
203 }
204 Some(events::TURN_COMPLETED) => {
205 if let Some(text) = params.and_then(extract_text_from_params) {
206 merge_turn_completed_text(out, &text);
207 }
208 }
209 _ => {}
210 }
211}
212
/// Merges the text reported by a turn-completed event into `out` without
/// duplicating what was already collected.
///
/// * Empty `text`, or `text` that `out` already ends with (including an exact
///   match), leaves `out` unchanged.
/// * If `text` begins with the current contents of `out` (which is always the
///   case when `out` is empty), `out` becomes exactly `text`.
/// * Otherwise `text` is appended after a newline.
fn merge_turn_completed_text(out: &mut String, text: &str) {
    if text.is_empty() || out.ends_with(text) {
        return;
    }

    // `out` is a prefix of `text`: appending the missing tail yields `text`.
    let missing_tail = text.strip_prefix(out.as_str());
    match missing_tail {
        Some(tail) => out.push_str(tail),
        None => {
            out.push('\n');
            out.push_str(text);
        }
    }
}
237
238fn extract_text_from_params(params: &Value) -> Option<String> {
239 for ptr in ["/item/text", "/text", "/outputText", "/output/text"] {
240 if let Some(text) = params.pointer(ptr).and_then(Value::as_str) {
241 return Some(text.to_owned());
242 }
243 }
244 if let Some(content) = params
245 .get("item")
246 .and_then(|item| item.get("content"))
247 .and_then(Value::as_array)
248 {
249 let mut joined = String::new();
250 for part in content {
251 if let Some(text) = part.get("text").and_then(Value::as_str) {
252 joined.push_str(text);
253 }
254 }
255 if !joined.is_empty() {
256 return Some(joined);
257 }
258 }
259 None
260}
261
/// Unit tests for the assistant text collectors and id parsing helpers.
#[cfg(test)]
mod tests {
    use serde_json::json;

    use crate::runtime::events::{Direction, MsgKind};

    use super::*;

    /// Builds an inbound notification envelope for the given thread/turn with
    /// `params` nested under the `"params"` key of the JSON payload.
    fn envelope_for_turn(
        method: &str,
        thread_id: &str,
        turn_id: &str,
        item_id: Option<&str>,
        params: Value,
    ) -> Envelope {
        Envelope {
            seq: 1,
            ts_millis: 0,
            direction: Direction::Inbound,
            kind: MsgKind::Notification,
            rpc_id: None,
            method: Some(Arc::from(method)),
            thread_id: Some(Arc::from(thread_id)),
            turn_id: Some(Arc::from(turn_id)),
            item_id: item_id.map(Arc::from),
            json: Arc::new(json!({"method": method, "params": params})),
        }
    }

    /// Same as `envelope_for_turn`, fixed to thread `"thr"` and turn `"turn"`.
    fn envelope(method: &str, item_id: Option<&str>, params: Value) -> Envelope {
        envelope_for_turn(method, "thr", "turn", item_id, params)
    }

    #[test]
    fn collector_prefers_delta_and_ignores_completed_duplicate() {
        let mut collector = AssistantTextCollector::new();
        collector.push_envelope(&envelope(
            "item/started",
            Some("it_1"),
            json!({"itemType":"agentMessage"}),
        ));
        collector.push_envelope(&envelope(
            "item/agentMessage/delta",
            Some("it_1"),
            json!({"delta":"hello"}),
        ));
        collector.push_envelope(&envelope(
            "item/completed",
            Some("it_1"),
            json!({"item":{"type":"agent_message","text":"hello"}}),
        ));
        assert_eq!(collector.text(), "hello");
    }

    #[test]
    fn collector_reads_completed_text_without_delta() {
        let mut collector = AssistantTextCollector::new();
        collector.push_envelope(&envelope(
            "item/started",
            Some("it_2"),
            json!({"itemType":"agent_message"}),
        ));
        collector.push_envelope(&envelope(
            "item/completed",
            Some("it_2"),
            json!({"item":{"type":"agent_message","text":"world"}}),
        ));
        assert_eq!(collector.text(), "world");
    }

    #[test]
    fn collector_dedups_turn_completed_text_after_item_completed() {
        let mut collector = AssistantTextCollector::new();
        collector.push_envelope(&envelope(
            "item/started",
            Some("it_3"),
            json!({"itemType":"agent_message"}),
        ));
        collector.push_envelope(&envelope(
            "item/completed",
            Some("it_3"),
            json!({"item":{"type":"agent_message","text":"final answer"}}),
        ));
        collector.push_envelope(&envelope(
            "turn/completed",
            None,
            json!({"text":"final answer"}),
        ));
        assert_eq!(collector.text(), "final answer");
    }

    #[test]
    fn collector_joins_content_parts_of_untracked_agent_message() {
        // No item-started event was seen; the completed item's own type is
        // what qualifies it, and its text comes from the content array.
        let mut collector = AssistantTextCollector::new();
        collector.push_envelope(&envelope(
            events::ITEM_COMPLETED,
            Some("it_4"),
            json!({"item":{
                "type":"agentMessage",
                "content":[{"text":"foo"},{"kind":"image"},{"text":"bar"}]
            }}),
        ));
        assert_eq!(collector.text(), "foobar");
    }

    #[test]
    fn collector_ignores_completed_non_assistant_item() {
        let mut collector = AssistantTextCollector::new();
        collector.push_envelope(&envelope(
            events::ITEM_COMPLETED,
            Some("it_5"),
            json!({"item":{"type":"reasoning","text":"hidden"}}),
        ));
        assert_eq!(collector.text(), "");
    }

    #[test]
    fn collector_separates_completed_items_with_newline() {
        let mut collector = AssistantTextCollector::new();
        for (item_id, text) in [("it_6", "one"), ("it_7", "two")] {
            collector.push_envelope(&envelope(
                events::ITEM_COMPLETED,
                Some(item_id),
                json!({"item":{"type":"agent_message","text":text}}),
            ));
        }
        assert_eq!(collector.into_text(), "one\ntwo");
    }

    #[test]
    fn merge_turn_completed_text_covers_every_branch() {
        let mut out = String::new();

        merge_turn_completed_text(&mut out, "");
        assert_eq!(out, "");

        merge_turn_completed_text(&mut out, "hello");
        assert_eq!(out, "hello");

        // Identical text is not duplicated.
        merge_turn_completed_text(&mut out, "hello");
        assert_eq!(out, "hello");

        // Text extending the collected prefix replaces it.
        merge_turn_completed_text(&mut out, "hello world");
        assert_eq!(out, "hello world");

        // Text already present as a suffix is dropped.
        merge_turn_completed_text(&mut out, "world");
        assert_eq!(out, "hello world");

        // Unrelated text is appended on a new line.
        merge_turn_completed_text(&mut out, "bye");
        assert_eq!(out, "hello world\nbye");
    }

    #[test]
    fn parse_ids_from_result_shapes() {
        let v = json!({"thread":{"id":"thr_1"},"turn":{"id":"turn_1"}});
        assert_eq!(parse_thread_id(&v).as_deref(), Some("thr_1"));
        assert_eq!(parse_turn_id(&v).as_deref(), Some("turn_1"));
    }

    #[test]
    fn parse_ids_reject_loose_id_fallback_and_empty_values() {
        assert_eq!(parse_thread_id(&json!({"id":"thr_loose"})), None);
        assert_eq!(parse_turn_id(&json!("turn_loose")), None);
        assert_eq!(parse_thread_id(&json!({"threadId":""})), None);
        assert_eq!(parse_turn_id(&json!({"turn":{"id":" "}})), None);
    }

    #[test]
    fn turn_stream_collector_ignores_other_turn_and_tracks_target_terminal() {
        let mut stream = TurnStreamCollector::new("thr_target", "turn_target");

        assert_eq!(
            stream.push_envelope(&envelope(
                "turn/completed",
                None,
                json!({"threadId":"thr_other","turnId":"turn_other"}),
            )),
            None
        );
        assert_eq!(stream.matching_turn_events(), 0);

        assert_eq!(
            stream.push_envelope(&envelope_for_turn(
                "turn/completed",
                "thr_target",
                "turn_target",
                None,
                json!({"threadId":"thr_target","turnId":"turn_target"}),
            )),
            Some(TurnTerminalEvent::Completed)
        );
        assert_eq!(stream.matching_turn_events(), 1);
    }

    #[test]
    fn turn_stream_collector_classifies_cancelled_terminal() {
        let mut stream = TurnStreamCollector::new("thr", "turn");

        let terminal = stream.push_envelope(&envelope_for_turn(
            "turn/cancelled",
            "thr",
            "turn",
            None,
            json!({"threadId":"thr","turnId":"turn"}),
        ));

        assert_eq!(terminal, Some(TurnTerminalEvent::Cancelled));
    }

    #[test]
    fn turn_stream_collector_classifies_failed_and_interrupted_terminals() {
        let cases = [
            (events::TURN_FAILED, TurnTerminalEvent::Failed),
            (events::TURN_INTERRUPTED, TurnTerminalEvent::Interrupted),
        ];
        for (method, expected) in cases {
            let mut stream = TurnStreamCollector::new("thr", "turn");
            let terminal = stream.push_envelope(&envelope(method, None, json!({})));
            assert_eq!(terminal, Some(expected));
            assert_eq!(stream.matching_turn_events(), 1);
        }
    }

    #[test]
    fn turn_stream_collector_collects_text_for_target_turn_only() {
        let mut stream = TurnStreamCollector::new("thr", "turn");

        let foreign = envelope_for_turn(
            events::ITEM_AGENT_MESSAGE_DELTA,
            "thr",
            "other_turn",
            Some("it"),
            json!({"delta":"nope"}),
        );
        assert!(!stream.is_target_envelope(&foreign));
        assert_eq!(stream.push_envelope(&foreign), None);

        let own = envelope(
            events::ITEM_AGENT_MESSAGE_DELTA,
            Some("it"),
            json!({"delta":"yes"}),
        );
        assert!(stream.is_target_envelope(&own));
        assert_eq!(stream.push_envelope(&own), None);

        assert_eq!(stream.matching_turn_events(), 1);
        assert_eq!(stream.assistant_text(), "yes");
        assert_eq!(stream.into_assistant_text(), "yes");
    }
}