1use std::collections::HashMap;
2
3use serde::{Deserialize, Serialize};
4use serde_json::Value;
5
6use crate::runtime::approvals::PendingServerRequest;
7use crate::runtime::events::Envelope;
8use crate::runtime::rpc_contract::methods as events;
9
10#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq)]
11#[serde(rename_all = "camelCase")]
12pub enum ConnectionState {
13 Starting,
14 Handshaking,
15 Running { generation: u64 },
16 Restarting { generation: u64 },
17 ShuttingDown,
18 Dead,
19}
20
21#[derive(Clone, Debug, Serialize, Deserialize, PartialEq)]
22#[serde(rename_all = "camelCase")]
23pub struct RuntimeState {
24 pub connection: ConnectionState,
25 pub threads: HashMap<String, ThreadState>,
26 pub pending_server_requests: HashMap<String, PendingServerRequest>,
27}
28
29#[derive(Clone, Copy, Debug, Serialize, Deserialize, PartialEq, Eq)]
30#[serde(rename_all = "camelCase")]
31pub struct StateProjectionLimits {
32 pub max_threads: usize,
33 pub max_turns_per_thread: usize,
34 pub max_items_per_turn: usize,
35 pub max_text_bytes_per_item: usize,
36 pub max_stdout_bytes_per_item: usize,
37 pub max_stderr_bytes_per_item: usize,
38}
39
40#[derive(Clone, Debug, Serialize, Deserialize, PartialEq)]
41#[serde(rename_all = "camelCase")]
42pub struct ThreadState {
43 pub id: String,
44 pub active_turn: Option<String>,
45 pub turns: HashMap<String, TurnState>,
46 pub last_diff: Option<String>,
47 pub plan: Option<Value>,
48 pub last_seq: u64,
49}
50
51#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq)]
52#[serde(rename_all = "camelCase")]
53pub enum TurnStatus {
54 InProgress,
55 Completed,
56 Failed,
57 Cancelled,
58 Interrupted,
59}
60
61#[derive(Clone, Debug, Serialize, Deserialize, PartialEq)]
62#[serde(rename_all = "camelCase")]
63pub struct TurnState {
64 pub id: String,
65 pub status: TurnStatus,
66 pub items: HashMap<String, ItemState>,
67 pub error: Option<Value>,
68 pub last_seq: u64,
69}
70
71#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq)]
72#[serde(rename_all = "camelCase")]
73pub struct ItemState {
74 pub id: String,
75 pub item_type: String,
76 pub started: Option<Value>,
77 pub completed: Option<Value>,
78 pub text_accum: String,
79 pub stdout_accum: String,
80 pub stderr_accum: String,
81 pub text_truncated: bool,
82 pub stdout_truncated: bool,
83 pub stderr_truncated: bool,
84 pub last_seq: u64,
85}
86
87impl Default for RuntimeState {
88 fn default() -> Self {
89 Self {
90 connection: ConnectionState::Starting,
91 threads: HashMap::new(),
92 pending_server_requests: HashMap::new(),
93 }
94 }
95}
96
97impl Default for StateProjectionLimits {
98 fn default() -> Self {
99 Self {
100 max_threads: 256,
101 max_turns_per_thread: 256,
102 max_items_per_turn: 256,
103 max_text_bytes_per_item: 256 * 1024,
104 max_stdout_bytes_per_item: 256 * 1024,
105 max_stderr_bytes_per_item: 256 * 1024,
106 }
107 }
108}
109
110pub fn reduce(mut state: RuntimeState, envelope: &Envelope) -> RuntimeState {
113 reduce_in_place_with_limits(&mut state, envelope, &StateProjectionLimits::default());
114 state
115}
116
117pub fn reduce_in_place(state: &mut RuntimeState, envelope: &Envelope) {
120 reduce_in_place_with_limits(state, envelope, &StateProjectionLimits::default());
121}
122
123pub fn reduce_in_place_with_limits(
130 state: &mut RuntimeState,
131 envelope: &Envelope,
132 limits: &StateProjectionLimits,
133) {
134 let Some(method) = envelope.method.as_deref() else {
135 return;
136 };
137 let seq = envelope.seq;
138 let touched_thread_id = envelope.thread_id.as_deref();
139 if is_stale_thread_event(state, touched_thread_id, seq) {
140 return;
141 }
142
143 match method {
144 events::THREAD_STARTED => handle_thread_started(state, envelope, seq),
145 events::TURN_STARTED => handle_turn_started(state, envelope, seq),
146 events::TURN_COMPLETED => {
147 handle_turn_terminal(state, envelope, seq, TurnStatus::Completed, false)
148 }
149 events::TURN_FAILED => handle_turn_terminal(state, envelope, seq, TurnStatus::Failed, true),
150 events::TURN_CANCELLED => {
151 handle_turn_terminal(state, envelope, seq, TurnStatus::Cancelled, false)
152 }
153 events::TURN_INTERRUPTED => {
154 handle_turn_terminal(state, envelope, seq, TurnStatus::Interrupted, false)
155 }
156 events::TURN_DIFF_UPDATED => handle_turn_diff_updated(state, envelope, seq),
157 events::TURN_PLAN_UPDATED => handle_turn_plan_updated(state, envelope, seq),
158 events::ITEM_STARTED => handle_item_started(state, envelope, seq),
159 events::ITEM_AGENT_MESSAGE_DELTA => {
160 handle_item_agent_message_delta(state, envelope, seq, limits)
161 }
162 events::ITEM_COMMAND_EXECUTION_OUTPUT_DELTA => {
163 handle_item_command_output_delta(state, envelope, seq, limits)
164 }
165 events::ITEM_COMPLETED => handle_item_completed(state, envelope, seq),
166 _ => {}
167 }
168
169 prune_state(state, limits, touched_thread_id);
170}
171
172fn is_stale_thread_event(state: &RuntimeState, thread_id: Option<&str>, seq: u64) -> bool {
173 let Some(thread_id) = thread_id else {
174 return false;
175 };
176 state
177 .threads
178 .get(thread_id)
179 .is_some_and(|thread| seq < thread.last_seq)
180}
181
182fn handle_thread_started(state: &mut RuntimeState, envelope: &Envelope, seq: u64) {
183 let Some(thread_id) = envelope.thread_id.as_deref() else {
184 return;
185 };
186 thread_mut(state, thread_id, seq);
187}
188
189fn handle_turn_started(state: &mut RuntimeState, envelope: &Envelope, seq: u64) {
190 let Some((thread_id, turn_id)) = thread_and_turn_ids(envelope) else {
191 return;
192 };
193 let thread = thread_mut(state, thread_id, seq);
194 thread.active_turn = Some(turn_id.to_owned());
195 let turn = turn_mut(thread, turn_id, seq);
196 turn.status = TurnStatus::InProgress;
197}
198
199fn handle_turn_terminal(
200 state: &mut RuntimeState,
201 envelope: &Envelope,
202 seq: u64,
203 status: TurnStatus,
204 with_error: bool,
205) {
206 let Some((thread_id, turn_id)) = thread_and_turn_ids(envelope) else {
207 return;
208 };
209 let thread = thread_mut(state, thread_id, seq);
210 clear_active_turn_if_matching(thread, turn_id);
211 let turn = turn_mut(thread, turn_id, seq);
212 turn.status = status;
213 if with_error {
214 turn.error = envelope
215 .json
216 .get("params")
217 .and_then(|p| p.get("error"))
218 .cloned();
219 }
220}
221
222fn clear_active_turn_if_matching(thread: &mut ThreadState, turn_id: &str) {
223 if thread.active_turn.as_deref() == Some(turn_id) {
224 thread.active_turn = None;
225 }
226}
227
228fn handle_turn_diff_updated(state: &mut RuntimeState, envelope: &Envelope, seq: u64) {
229 let Some(thread_id) = envelope.thread_id.as_deref() else {
230 return;
231 };
232 let thread = thread_mut(state, thread_id, seq);
233 thread.last_diff = envelope
234 .json
235 .get("params")
236 .and_then(|p| p.get("diff"))
237 .and_then(Value::as_str)
238 .map(ToOwned::to_owned);
239}
240
241fn handle_turn_plan_updated(state: &mut RuntimeState, envelope: &Envelope, seq: u64) {
242 let Some(thread_id) = envelope.thread_id.as_deref() else {
243 return;
244 };
245 let thread = thread_mut(state, thread_id, seq);
246 thread.plan = envelope
247 .json
248 .get("params")
249 .and_then(|p| p.get("plan"))
250 .cloned();
251}
252
253fn handle_item_started(state: &mut RuntimeState, envelope: &Envelope, seq: u64) {
254 let Some(item) = item_from_envelope(state, envelope, seq) else {
255 return;
256 };
257 item.started = envelope.json.get("params").cloned();
258 item.item_type = envelope
259 .json
260 .get("params")
261 .and_then(|p| p.get("itemType"))
262 .and_then(Value::as_str)
263 .unwrap_or("unknown")
264 .to_owned();
265}
266
267fn handle_item_agent_message_delta(
268 state: &mut RuntimeState,
269 envelope: &Envelope,
270 seq: u64,
271 limits: &StateProjectionLimits,
272) {
273 let delta = envelope
274 .json
275 .get("params")
276 .and_then(|p| p.get("delta"))
277 .and_then(Value::as_str)
278 .unwrap_or("");
279 let Some(item) = item_from_envelope(state, envelope, seq) else {
280 return;
281 };
282 append_capped(
283 &mut item.text_accum,
284 delta,
285 limits.max_text_bytes_per_item,
286 &mut item.text_truncated,
287 );
288}
289
290fn handle_item_command_output_delta(
291 state: &mut RuntimeState,
292 envelope: &Envelope,
293 seq: u64,
294 limits: &StateProjectionLimits,
295) {
296 let stdout = envelope
297 .json
298 .get("params")
299 .and_then(|p| p.get("stdout"))
300 .and_then(Value::as_str)
301 .unwrap_or("");
302 let stderr = envelope
303 .json
304 .get("params")
305 .and_then(|p| p.get("stderr"))
306 .and_then(Value::as_str)
307 .unwrap_or("");
308
309 let Some(item) = item_from_envelope(state, envelope, seq) else {
310 return;
311 };
312 append_capped(
313 &mut item.stdout_accum,
314 stdout,
315 limits.max_stdout_bytes_per_item,
316 &mut item.stdout_truncated,
317 );
318 append_capped(
319 &mut item.stderr_accum,
320 stderr,
321 limits.max_stderr_bytes_per_item,
322 &mut item.stderr_truncated,
323 );
324}
325
326fn handle_item_completed(state: &mut RuntimeState, envelope: &Envelope, seq: u64) {
327 let Some(item) = item_from_envelope(state, envelope, seq) else {
328 return;
329 };
330 item.completed = envelope.json.get("params").cloned();
331}
332
333fn thread_and_turn_ids(envelope: &Envelope) -> Option<(&str, &str)> {
334 let (Some(thread_id), Some(turn_id)) =
335 (envelope.thread_id.as_deref(), envelope.turn_id.as_deref())
336 else {
337 return None;
338 };
339 Some((thread_id, turn_id))
340}
341
342fn thread_turn_item_ids(envelope: &Envelope) -> Option<(&str, &str, &str)> {
343 let (thread_id, turn_id) = thread_and_turn_ids(envelope)?;
344 let item_id = envelope.item_id.as_deref()?;
345 Some((thread_id, turn_id, item_id))
346}
347
348fn item_from_envelope<'a>(
349 state: &'a mut RuntimeState,
350 envelope: &Envelope,
351 seq: u64,
352) -> Option<&'a mut ItemState> {
353 let (thread_id, turn_id, item_id) = thread_turn_item_ids(envelope)?;
354 let thread = thread_mut(state, thread_id, seq);
355 let turn = turn_mut(thread, turn_id, seq);
356 Some(item_mut(turn, item_id, seq))
357}
358
359fn thread_mut<'a>(state: &'a mut RuntimeState, thread_id: &str, seq: u64) -> &'a mut ThreadState {
360 let thread = state
361 .threads
362 .entry(thread_id.to_owned())
363 .or_insert_with(|| ThreadState {
364 id: thread_id.to_owned(),
365 active_turn: None,
366 turns: HashMap::new(),
367 last_diff: None,
368 plan: None,
369 last_seq: seq,
370 });
371 thread.last_seq = seq;
372 thread
373}
374
375fn turn_mut<'a>(thread: &'a mut ThreadState, turn_id: &str, seq: u64) -> &'a mut TurnState {
376 thread.last_seq = seq;
377 let turn = thread
378 .turns
379 .entry(turn_id.to_owned())
380 .or_insert_with(|| TurnState {
381 id: turn_id.to_owned(),
382 status: TurnStatus::InProgress,
383 items: HashMap::new(),
384 error: None,
385 last_seq: seq,
386 });
387 turn.last_seq = seq;
388 turn
389}
390
391fn item_mut<'a>(turn: &'a mut TurnState, item_id: &str, seq: u64) -> &'a mut ItemState {
392 turn.last_seq = seq;
393 let item = turn
394 .items
395 .entry(item_id.to_owned())
396 .or_insert_with(|| ItemState {
397 id: item_id.to_owned(),
398 item_type: "unknown".to_owned(),
399 started: None,
400 completed: None,
401 text_accum: String::new(),
402 stdout_accum: String::new(),
403 stderr_accum: String::new(),
404 text_truncated: false,
405 stdout_truncated: false,
406 stderr_truncated: false,
407 last_seq: seq,
408 });
409 item.last_seq = seq;
410 item
411}
412
/// Appends `delta` to `out` without letting `out` grow beyond `max_bytes` bytes.
///
/// When the whole delta does not fit, only the longest prefix that fits and ends on
/// a UTF-8 character boundary is appended, and `*truncated` is set to `true`.
/// An empty delta is a no-op and never sets the flag; the flag is never cleared here.
fn append_capped(out: &mut String, delta: &str, max_bytes: usize, truncated: &mut bool) {
    if delta.is_empty() {
        return;
    }

    // Bytes still available in `out`; zero when it is already at or over the cap.
    let budget = max_bytes.saturating_sub(out.len());
    if delta.len() <= budget {
        out.push_str(delta);
        return;
    }

    // Largest boundary not exceeding the budget; index 0 is always a boundary.
    let cut = (0..=budget)
        .rev()
        .find(|&idx| delta.is_char_boundary(idx))
        .unwrap_or(0);
    out.push_str(&delta[..cut]);
    *truncated = true;
}
435
436fn prune_state(
437 state: &mut RuntimeState,
438 limits: &StateProjectionLimits,
439 touched_thread_id: Option<&str>,
440) {
441 if state.threads.len() > limits.max_threads {
442 let remove_count = state.threads.len() - limits.max_threads;
443 let mut by_age: Vec<(String, u64)> = state
444 .threads
445 .iter()
446 .map(|(id, thread)| (id.clone(), thread.last_seq))
447 .collect();
448 if remove_count > 0 {
449 by_age.select_nth_unstable_by_key(remove_count - 1, |(_, seq)| *seq);
450 }
451 for (id, _) in by_age.into_iter().take(remove_count) {
452 state.threads.remove(&id);
453 }
454 }
455
456 let Some(thread_id) = touched_thread_id else {
457 return;
458 };
459 let Some(thread) = state.threads.get_mut(thread_id) else {
460 return;
461 };
462
463 prune_turns(thread, limits.max_turns_per_thread);
464 for turn in thread.turns.values_mut() {
465 prune_items(turn, limits.max_items_per_turn);
466 }
467}
468
469fn prune_turns(thread: &mut ThreadState, max_turns: usize) {
470 if thread.turns.len() <= max_turns {
471 return;
472 }
473
474 let active = thread.active_turn.as_deref();
475 let mut candidates: Vec<(String, u64)> = thread
476 .turns
477 .iter()
478 .filter(|(id, _)| Some(id.as_str()) != active)
479 .map(|(id, turn)| (id.clone(), turn.last_seq))
480 .collect();
481
482 let removable = thread.turns.len().saturating_sub(max_turns);
483 if removable > 0 && !candidates.is_empty() {
484 let partition_idx = std::cmp::min(removable - 1, candidates.len() - 1);
485 candidates.select_nth_unstable_by_key(partition_idx, |(_, seq)| *seq);
486 }
487
488 for (id, _) in candidates.into_iter().take(removable) {
489 thread.turns.remove(&id);
490 }
491}
492
493fn prune_items(turn: &mut TurnState, max_items: usize) {
494 if turn.items.len() <= max_items {
495 return;
496 }
497
498 let remove_count = turn.items.len() - max_items;
499 let mut by_age: Vec<(String, u64)> = turn
500 .items
501 .iter()
502 .map(|(id, item)| (id.clone(), item.last_seq))
503 .collect();
504 if remove_count > 0 {
505 let partition_idx = std::cmp::min(remove_count - 1, by_age.len() - 1);
506 by_age.select_nth_unstable_by_key(partition_idx, |(_, seq)| *seq);
507 }
508 for (id, _) in by_age.into_iter().take(remove_count) {
509 turn.items.remove(&id);
510 }
511}
512
#[cfg(test)]
mod tests {
    use serde_json::json;
    use std::sync::Arc;

    use crate::runtime::events::{Direction, Envelope, MsgKind};

    use super::*;

    /// Builds an inbound notification envelope with an explicit sequence number.
    fn envelope_with_seq(
        seq: u64,
        method: &str,
        thread: &str,
        turn: &str,
        item: Option<&str>,
        params: Value,
    ) -> Envelope {
        let payload = json!({"method": method, "params": params});
        Envelope {
            seq,
            ts_millis: 0,
            direction: Direction::Inbound,
            kind: MsgKind::Notification,
            rpc_id: None,
            method: Some(Arc::from(method)),
            thread_id: Some(Arc::from(thread)),
            turn_id: Some(Arc::from(turn)),
            item_id: item.map(Arc::from),
            json: Arc::new(payload),
        }
    }

    /// Same as `envelope_with_seq`, with the sequence number fixed at 1.
    fn envelope(
        method: &str,
        thread: &str,
        turn: &str,
        item: Option<&str>,
        params: Value,
    ) -> Envelope {
        envelope_with_seq(1, method, thread, turn, item, params)
    }

    #[test]
    fn reduce_turn_lifecycle() {
        let started = reduce(
            RuntimeState::default(),
            &envelope("turn/started", "thr", "turn", None, json!({})),
        );
        let thread = &started.threads["thr"];
        assert_eq!(thread.active_turn.as_deref(), Some("turn"));
        assert_eq!(thread.turns["turn"].status, TurnStatus::InProgress);

        let completed = reduce(
            started,
            &envelope("turn/completed", "thr", "turn", None, json!({})),
        );
        let thread = &completed.threads["thr"];
        assert_eq!(thread.active_turn, None);
        assert_eq!(thread.turns["turn"].status, TurnStatus::Completed);
    }

    #[test]
    fn reduce_turn_cancelled_marks_cancelled_and_clears_active_turn() {
        let started = reduce(
            RuntimeState::default(),
            &envelope("turn/started", "thr", "turn", None, json!({})),
        );
        assert_eq!(started.threads["thr"].active_turn.as_deref(), Some("turn"));

        let cancelled = reduce(
            started,
            &envelope("turn/cancelled", "thr", "turn", None, json!({})),
        );
        let thread = &cancelled.threads["thr"];
        assert_eq!(thread.active_turn, None);
        assert_eq!(thread.turns["turn"].status, TurnStatus::Cancelled);
    }

    #[test]
    fn reduce_delta_and_output() {
        let events = [
            envelope("turn/started", "thr", "turn", None, json!({})),
            envelope(
                "item/started",
                "thr",
                "turn",
                Some("item"),
                json!({"itemType":"agentMessage"}),
            ),
            envelope(
                "item/agentMessage/delta",
                "thr",
                "turn",
                Some("item"),
                json!({"delta":"hello"}),
            ),
            envelope(
                "item/commandExecution/outputDelta",
                "thr",
                "turn",
                Some("item"),
                json!({"stdout":"out","stderr":"err"}),
            ),
        ];

        let state = events
            .iter()
            .fold(RuntimeState::default(), |state, event| reduce(state, event));

        let item = &state.threads["thr"].turns["turn"].items["item"];
        assert_eq!(item.text_accum, "hello");
        assert_eq!(item.stdout_accum, "out");
        assert_eq!(item.stderr_accum, "err");
    }

    #[test]
    fn reduce_applies_text_caps_and_marks_truncated() {
        let limits = StateProjectionLimits {
            max_threads: 8,
            max_turns_per_thread: 8,
            max_items_per_turn: 8,
            max_text_bytes_per_item: 4,
            max_stdout_bytes_per_item: 3,
            max_stderr_bytes_per_item: 2,
        };
        let steps = [
            (1, "item/started", json!({"itemType":"agentMessage"})),
            (2, "item/agentMessage/delta", json!({"delta":"hello"})),
            (
                3,
                "item/commandExecution/outputDelta",
                json!({"stdout":"abcd","stderr":"xyz"}),
            ),
        ];

        let mut state = RuntimeState::default();
        for (seq, method, params) in steps {
            reduce_in_place_with_limits(
                &mut state,
                &envelope_with_seq(seq, method, "thr", "turn", Some("item"), params),
                &limits,
            );
        }

        let item = &state.threads["thr"].turns["turn"].items["item"];
        assert_eq!(item.text_accum, "hell");
        assert!(item.text_truncated);
        assert_eq!(item.stdout_accum, "abc");
        assert!(item.stdout_truncated);
        assert_eq!(item.stderr_accum, "xy");
        assert!(item.stderr_truncated);
    }

    #[test]
    fn reduce_prunes_old_threads_turns_and_items() {
        let limits = StateProjectionLimits {
            max_threads: 2,
            max_turns_per_thread: 2,
            max_items_per_turn: 2,
            max_text_bytes_per_item: 1024,
            max_stdout_bytes_per_item: 1024,
            max_stderr_bytes_per_item: 1024,
        };
        let mut state = RuntimeState::default();

        // Three threads against a limit of two: the oldest one must be evicted.
        for (seq, thread) in [(1, "thr_1"), (2, "thr_2"), (3, "thr_3")] {
            reduce_in_place_with_limits(
                &mut state,
                &envelope_with_seq(seq, "thread/started", thread, "turn_a", None, json!({})),
                &limits,
            );
        }
        assert!(!state.threads.contains_key("thr_1"));
        assert!(state.threads.contains_key("thr_2"));
        assert!(state.threads.contains_key("thr_3"));

        // Three turns against a limit of two.
        for seq in 10..=12 {
            let turn = format!("turn_{seq}");
            reduce_in_place_with_limits(
                &mut state,
                &envelope_with_seq(
                    seq,
                    "turn/started",
                    "thr_3",
                    &turn,
                    None,
                    json!({ "threadId":"thr_3", "turnId": turn }),
                ),
                &limits,
            );
        }
        let thr = state.threads.get("thr_3").expect("thread");
        assert!(thr.turns.len() <= 2);

        // Three items in the active turn against a limit of two.
        let turn_id = thr.active_turn.clone().expect("active turn");
        for seq in 20..=22 {
            let item = format!("item_{seq}");
            reduce_in_place_with_limits(
                &mut state,
                &envelope_with_seq(
                    seq,
                    "item/started",
                    "thr_3",
                    &turn_id,
                    Some(&item),
                    json!({"itemType":"agentMessage"}),
                ),
                &limits,
            );
        }

        let thr = state.threads.get("thr_3").expect("thread");
        let turn = thr.turns.get(&turn_id).expect("turn");
        assert!(turn.items.len() <= 2);
    }

    #[test]
    fn reduce_drops_stale_turn_event_by_sequence() {
        let events = [
            envelope_with_seq(10, "turn/started", "thr", "turn", None, json!({})),
            envelope_with_seq(11, "turn/completed", "thr", "turn", None, json!({})),
            // Arrives late with a lower sequence number and must be ignored.
            envelope_with_seq(
                9,
                "turn/failed",
                "thr",
                "turn",
                None,
                json!({"error":{"message":"stale"}}),
            ),
        ];

        let mut state = RuntimeState::default();
        for event in &events {
            reduce_in_place(&mut state, event);
        }

        let turn = &state.threads["thr"].turns["turn"];
        assert_eq!(turn.status, TurnStatus::Completed);
        assert_eq!(turn.error, None);
        assert_eq!(turn.last_seq, 11);
        assert_eq!(state.threads["thr"].last_seq, 11);
    }

    #[test]
    fn reduce_drops_stale_item_delta_by_sequence() {
        let events = [
            envelope_with_seq(
                1,
                "item/started",
                "thr",
                "turn",
                Some("item"),
                json!({"itemType":"agentMessage"}),
            ),
            envelope_with_seq(
                3,
                "item/agentMessage/delta",
                "thr",
                "turn",
                Some("item"),
                json!({"delta":"new"}),
            ),
            // Arrives late with a lower sequence number and must be ignored.
            envelope_with_seq(
                2,
                "item/agentMessage/delta",
                "thr",
                "turn",
                Some("item"),
                json!({"delta":"old"}),
            ),
        ];

        let mut state = RuntimeState::default();
        for event in &events {
            reduce_in_place(&mut state, event);
        }

        let item = &state.threads["thr"].turns["turn"].items["item"];
        assert_eq!(item.text_accum, "new");
        assert_eq!(item.last_seq, 3);
        assert_eq!(state.threads["thr"].last_seq, 3);
    }
}