Skip to main content

codex_runtime/runtime/
state.rs

1use std::collections::HashMap;
2
3use serde::{Deserialize, Serialize};
4use serde_json::Value;
5
6use crate::runtime::approvals::PendingServerRequest;
7use crate::runtime::events::Envelope;
8use crate::runtime::rpc_contract::methods as events;
9
10#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq)]
11#[serde(rename_all = "camelCase")]
12pub enum ConnectionState {
13    Starting,
14    Handshaking,
15    Running { generation: u64 },
16    Restarting { generation: u64 },
17    ShuttingDown,
18    Dead,
19}
20
21#[derive(Clone, Debug, Serialize, Deserialize, PartialEq)]
22#[serde(rename_all = "camelCase")]
23pub struct RuntimeState {
24    pub connection: ConnectionState,
25    pub threads: HashMap<String, ThreadState>,
26    pub pending_server_requests: HashMap<String, PendingServerRequest>,
27}
28
29#[derive(Clone, Copy, Debug, Serialize, Deserialize, PartialEq, Eq)]
30#[serde(rename_all = "camelCase")]
31pub struct StateProjectionLimits {
32    pub max_threads: usize,
33    pub max_turns_per_thread: usize,
34    pub max_items_per_turn: usize,
35    pub max_text_bytes_per_item: usize,
36    pub max_stdout_bytes_per_item: usize,
37    pub max_stderr_bytes_per_item: usize,
38}
39
40#[derive(Clone, Debug, Serialize, Deserialize, PartialEq)]
41#[serde(rename_all = "camelCase")]
42pub struct ThreadState {
43    pub id: String,
44    pub active_turn: Option<String>,
45    pub turns: HashMap<String, TurnState>,
46    pub last_diff: Option<String>,
47    pub plan: Option<Value>,
48    pub last_seq: u64,
49}
50
51#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq)]
52#[serde(rename_all = "camelCase")]
53pub enum TurnStatus {
54    InProgress,
55    Completed,
56    Failed,
57    Cancelled,
58    Interrupted,
59}
60
61#[derive(Clone, Debug, Serialize, Deserialize, PartialEq)]
62#[serde(rename_all = "camelCase")]
63pub struct TurnState {
64    pub id: String,
65    pub status: TurnStatus,
66    pub items: HashMap<String, ItemState>,
67    pub error: Option<Value>,
68    pub last_seq: u64,
69}
70
71#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq)]
72#[serde(rename_all = "camelCase")]
73pub struct ItemState {
74    pub id: String,
75    pub item_type: String,
76    pub started: Option<Value>,
77    pub completed: Option<Value>,
78    pub text_accum: String,
79    pub stdout_accum: String,
80    pub stderr_accum: String,
81    pub text_truncated: bool,
82    pub stdout_truncated: bool,
83    pub stderr_truncated: bool,
84    pub last_seq: u64,
85}
86
87impl Default for RuntimeState {
88    fn default() -> Self {
89        Self {
90            connection: ConnectionState::Starting,
91            threads: HashMap::new(),
92            pending_server_requests: HashMap::new(),
93        }
94    }
95}
96
97impl Default for StateProjectionLimits {
98    fn default() -> Self {
99        Self {
100            max_threads: 256,
101            max_turns_per_thread: 256,
102            max_items_per_turn: 256,
103            max_text_bytes_per_item: 256 * 1024,
104            max_stdout_bytes_per_item: 256 * 1024,
105            max_stderr_bytes_per_item: 256 * 1024,
106        }
107    }
108}
109
110/// Pure reducer: consumes old state + envelope and returns next state.
111/// Delegates to `reduce_in_place_with_limits` with default retention limits.
112pub fn reduce(mut state: RuntimeState, envelope: &Envelope) -> RuntimeState {
113    reduce_in_place_with_limits(&mut state, envelope, &StateProjectionLimits::default());
114    state
115}
116
117/// In-place reducer used by runtime projection.
118/// Delegates to `reduce_in_place_with_limits` with default retention limits.
119pub fn reduce_in_place(state: &mut RuntimeState, envelope: &Envelope) {
120    reduce_in_place_with_limits(state, envelope, &StateProjectionLimits::default());
121}
122
123/// In-place reducer with explicit retention bounds for long-running runtimes.
124/// Allocation: new map entries + appended deltas; prune candidate vectors are allocated only
125/// when a cap is exceeded.
126/// Complexity: O(1) average map work per event, plus O(t) touched-thread item-cap checks
127/// (t <= max_turns_per_thread after pruning), and O(n log n) only when sorting eviction
128/// candidates for thread/turn/item pruning.
129pub fn reduce_in_place_with_limits(
130    state: &mut RuntimeState,
131    envelope: &Envelope,
132    limits: &StateProjectionLimits,
133) {
134    let Some(method) = envelope.method.as_deref() else {
135        return;
136    };
137    let seq = envelope.seq;
138    let touched_thread_id = envelope.thread_id.as_deref();
139    if is_stale_thread_event(state, touched_thread_id, seq) {
140        return;
141    }
142
143    match method {
144        events::THREAD_STARTED => handle_thread_started(state, envelope, seq),
145        events::TURN_STARTED => handle_turn_started(state, envelope, seq),
146        events::TURN_COMPLETED => {
147            handle_turn_terminal(state, envelope, seq, TurnStatus::Completed, false)
148        }
149        events::TURN_FAILED => handle_turn_terminal(state, envelope, seq, TurnStatus::Failed, true),
150        events::TURN_CANCELLED => {
151            handle_turn_terminal(state, envelope, seq, TurnStatus::Cancelled, false)
152        }
153        events::TURN_INTERRUPTED => {
154            handle_turn_terminal(state, envelope, seq, TurnStatus::Interrupted, false)
155        }
156        events::TURN_DIFF_UPDATED => handle_turn_diff_updated(state, envelope, seq),
157        events::TURN_PLAN_UPDATED => handle_turn_plan_updated(state, envelope, seq),
158        events::ITEM_STARTED => handle_item_started(state, envelope, seq),
159        events::ITEM_AGENT_MESSAGE_DELTA => {
160            handle_item_agent_message_delta(state, envelope, seq, limits)
161        }
162        events::ITEM_COMMAND_EXECUTION_OUTPUT_DELTA => {
163            handle_item_command_output_delta(state, envelope, seq, limits)
164        }
165        events::ITEM_COMPLETED => handle_item_completed(state, envelope, seq),
166        _ => {}
167    }
168
169    prune_state(state, limits, touched_thread_id);
170}
171
172fn is_stale_thread_event(state: &RuntimeState, thread_id: Option<&str>, seq: u64) -> bool {
173    let Some(thread_id) = thread_id else {
174        return false;
175    };
176    state
177        .threads
178        .get(thread_id)
179        .is_some_and(|thread| seq < thread.last_seq)
180}
181
182fn handle_thread_started(state: &mut RuntimeState, envelope: &Envelope, seq: u64) {
183    let Some(thread_id) = envelope.thread_id.as_deref() else {
184        return;
185    };
186    thread_mut(state, thread_id, seq);
187}
188
189fn handle_turn_started(state: &mut RuntimeState, envelope: &Envelope, seq: u64) {
190    let Some((thread_id, turn_id)) = thread_and_turn_ids(envelope) else {
191        return;
192    };
193    let thread = thread_mut(state, thread_id, seq);
194    thread.active_turn = Some(turn_id.to_owned());
195    let turn = turn_mut(thread, turn_id, seq);
196    turn.status = TurnStatus::InProgress;
197}
198
199fn handle_turn_terminal(
200    state: &mut RuntimeState,
201    envelope: &Envelope,
202    seq: u64,
203    status: TurnStatus,
204    with_error: bool,
205) {
206    let Some((thread_id, turn_id)) = thread_and_turn_ids(envelope) else {
207        return;
208    };
209    let thread = thread_mut(state, thread_id, seq);
210    clear_active_turn_if_matching(thread, turn_id);
211    let turn = turn_mut(thread, turn_id, seq);
212    turn.status = status;
213    if with_error {
214        turn.error = envelope
215            .json
216            .get("params")
217            .and_then(|p| p.get("error"))
218            .cloned();
219    }
220}
221
222fn clear_active_turn_if_matching(thread: &mut ThreadState, turn_id: &str) {
223    if thread.active_turn.as_deref() == Some(turn_id) {
224        thread.active_turn = None;
225    }
226}
227
228fn handle_turn_diff_updated(state: &mut RuntimeState, envelope: &Envelope, seq: u64) {
229    let Some(thread_id) = envelope.thread_id.as_deref() else {
230        return;
231    };
232    let thread = thread_mut(state, thread_id, seq);
233    thread.last_diff = envelope
234        .json
235        .get("params")
236        .and_then(|p| p.get("diff"))
237        .and_then(Value::as_str)
238        .map(ToOwned::to_owned);
239}
240
241fn handle_turn_plan_updated(state: &mut RuntimeState, envelope: &Envelope, seq: u64) {
242    let Some(thread_id) = envelope.thread_id.as_deref() else {
243        return;
244    };
245    let thread = thread_mut(state, thread_id, seq);
246    thread.plan = envelope
247        .json
248        .get("params")
249        .and_then(|p| p.get("plan"))
250        .cloned();
251}
252
253fn handle_item_started(state: &mut RuntimeState, envelope: &Envelope, seq: u64) {
254    let Some(item) = item_from_envelope(state, envelope, seq) else {
255        return;
256    };
257    item.started = envelope.json.get("params").cloned();
258    item.item_type = envelope
259        .json
260        .get("params")
261        .and_then(|p| p.get("itemType"))
262        .and_then(Value::as_str)
263        .unwrap_or("unknown")
264        .to_owned();
265}
266
267fn handle_item_agent_message_delta(
268    state: &mut RuntimeState,
269    envelope: &Envelope,
270    seq: u64,
271    limits: &StateProjectionLimits,
272) {
273    let delta = envelope
274        .json
275        .get("params")
276        .and_then(|p| p.get("delta"))
277        .and_then(Value::as_str)
278        .unwrap_or("");
279    let Some(item) = item_from_envelope(state, envelope, seq) else {
280        return;
281    };
282    append_capped(
283        &mut item.text_accum,
284        delta,
285        limits.max_text_bytes_per_item,
286        &mut item.text_truncated,
287    );
288}
289
290fn handle_item_command_output_delta(
291    state: &mut RuntimeState,
292    envelope: &Envelope,
293    seq: u64,
294    limits: &StateProjectionLimits,
295) {
296    let stdout = envelope
297        .json
298        .get("params")
299        .and_then(|p| p.get("stdout"))
300        .and_then(Value::as_str)
301        .unwrap_or("");
302    let stderr = envelope
303        .json
304        .get("params")
305        .and_then(|p| p.get("stderr"))
306        .and_then(Value::as_str)
307        .unwrap_or("");
308
309    let Some(item) = item_from_envelope(state, envelope, seq) else {
310        return;
311    };
312    append_capped(
313        &mut item.stdout_accum,
314        stdout,
315        limits.max_stdout_bytes_per_item,
316        &mut item.stdout_truncated,
317    );
318    append_capped(
319        &mut item.stderr_accum,
320        stderr,
321        limits.max_stderr_bytes_per_item,
322        &mut item.stderr_truncated,
323    );
324}
325
326fn handle_item_completed(state: &mut RuntimeState, envelope: &Envelope, seq: u64) {
327    let Some(item) = item_from_envelope(state, envelope, seq) else {
328        return;
329    };
330    item.completed = envelope.json.get("params").cloned();
331}
332
333fn thread_and_turn_ids(envelope: &Envelope) -> Option<(&str, &str)> {
334    let (Some(thread_id), Some(turn_id)) =
335        (envelope.thread_id.as_deref(), envelope.turn_id.as_deref())
336    else {
337        return None;
338    };
339    Some((thread_id, turn_id))
340}
341
342fn thread_turn_item_ids(envelope: &Envelope) -> Option<(&str, &str, &str)> {
343    let (thread_id, turn_id) = thread_and_turn_ids(envelope)?;
344    let item_id = envelope.item_id.as_deref()?;
345    Some((thread_id, turn_id, item_id))
346}
347
348fn item_from_envelope<'a>(
349    state: &'a mut RuntimeState,
350    envelope: &Envelope,
351    seq: u64,
352) -> Option<&'a mut ItemState> {
353    let (thread_id, turn_id, item_id) = thread_turn_item_ids(envelope)?;
354    let thread = thread_mut(state, thread_id, seq);
355    let turn = turn_mut(thread, turn_id, seq);
356    Some(item_mut(turn, item_id, seq))
357}
358
359fn thread_mut<'a>(state: &'a mut RuntimeState, thread_id: &str, seq: u64) -> &'a mut ThreadState {
360    let thread = state
361        .threads
362        .entry(thread_id.to_owned())
363        .or_insert_with(|| ThreadState {
364            id: thread_id.to_owned(),
365            active_turn: None,
366            turns: HashMap::new(),
367            last_diff: None,
368            plan: None,
369            last_seq: seq,
370        });
371    thread.last_seq = seq;
372    thread
373}
374
375fn turn_mut<'a>(thread: &'a mut ThreadState, turn_id: &str, seq: u64) -> &'a mut TurnState {
376    thread.last_seq = seq;
377    let turn = thread
378        .turns
379        .entry(turn_id.to_owned())
380        .or_insert_with(|| TurnState {
381            id: turn_id.to_owned(),
382            status: TurnStatus::InProgress,
383            items: HashMap::new(),
384            error: None,
385            last_seq: seq,
386        });
387    turn.last_seq = seq;
388    turn
389}
390
391fn item_mut<'a>(turn: &'a mut TurnState, item_id: &str, seq: u64) -> &'a mut ItemState {
392    turn.last_seq = seq;
393    let item = turn
394        .items
395        .entry(item_id.to_owned())
396        .or_insert_with(|| ItemState {
397            id: item_id.to_owned(),
398            item_type: "unknown".to_owned(),
399            started: None,
400            completed: None,
401            text_accum: String::new(),
402            stdout_accum: String::new(),
403            stderr_accum: String::new(),
404            text_truncated: false,
405            stdout_truncated: false,
406            stderr_truncated: false,
407            last_seq: seq,
408        });
409    item.last_seq = seq;
410    item
411}
412
/// Appends `delta` to `out` without letting `out` grow beyond `max_bytes`.
///
/// * An empty `delta` is a no-op and never sets `truncated`.
/// * If the whole `delta` fits, it is appended and `truncated` is left as is.
/// * Otherwise the longest prefix of `delta` that fits and ends on a UTF-8
///   character boundary is appended (possibly nothing) and `truncated` is set.
///
/// `truncated` is only ever set to true here, never cleared.
fn append_capped(out: &mut String, delta: &str, max_bytes: usize, truncated: &mut bool) {
    if delta.is_empty() {
        return;
    }

    // Zero when `out` is already at (or past) the cap.
    let room = max_bytes.saturating_sub(out.len());
    if delta.len() <= room {
        out.push_str(delta);
        return;
    }

    // `delta` overflows: walk back from `room` to the nearest char boundary.
    // Index 0 is always a boundary, so the search cannot fail.
    let keep = (0..=room)
        .rev()
        .find(|&idx| delta.is_char_boundary(idx))
        .unwrap_or(0);
    out.push_str(&delta[..keep]);
    *truncated = true;
}
435
436fn prune_state(
437    state: &mut RuntimeState,
438    limits: &StateProjectionLimits,
439    touched_thread_id: Option<&str>,
440) {
441    if state.threads.len() > limits.max_threads {
442        let remove_count = state.threads.len() - limits.max_threads;
443        let mut by_age: Vec<(String, u64)> = state
444            .threads
445            .iter()
446            .map(|(id, thread)| (id.clone(), thread.last_seq))
447            .collect();
448        if remove_count > 0 {
449            by_age.select_nth_unstable_by_key(remove_count - 1, |(_, seq)| *seq);
450        }
451        for (id, _) in by_age.into_iter().take(remove_count) {
452            state.threads.remove(&id);
453        }
454    }
455
456    let Some(thread_id) = touched_thread_id else {
457        return;
458    };
459    let Some(thread) = state.threads.get_mut(thread_id) else {
460        return;
461    };
462
463    prune_turns(thread, limits.max_turns_per_thread);
464    for turn in thread.turns.values_mut() {
465        prune_items(turn, limits.max_items_per_turn);
466    }
467}
468
469fn prune_turns(thread: &mut ThreadState, max_turns: usize) {
470    if thread.turns.len() <= max_turns {
471        return;
472    }
473
474    let active = thread.active_turn.as_deref();
475    let mut candidates: Vec<(String, u64)> = thread
476        .turns
477        .iter()
478        .filter(|(id, _)| Some(id.as_str()) != active)
479        .map(|(id, turn)| (id.clone(), turn.last_seq))
480        .collect();
481
482    let removable = thread.turns.len().saturating_sub(max_turns);
483    if removable > 0 && !candidates.is_empty() {
484        let partition_idx = std::cmp::min(removable - 1, candidates.len() - 1);
485        candidates.select_nth_unstable_by_key(partition_idx, |(_, seq)| *seq);
486    }
487
488    for (id, _) in candidates.into_iter().take(removable) {
489        thread.turns.remove(&id);
490    }
491}
492
493fn prune_items(turn: &mut TurnState, max_items: usize) {
494    if turn.items.len() <= max_items {
495        return;
496    }
497
498    let remove_count = turn.items.len() - max_items;
499    let mut by_age: Vec<(String, u64)> = turn
500        .items
501        .iter()
502        .map(|(id, item)| (id.clone(), item.last_seq))
503        .collect();
504    if remove_count > 0 {
505        let partition_idx = std::cmp::min(remove_count - 1, by_age.len() - 1);
506        by_age.select_nth_unstable_by_key(partition_idx, |(_, seq)| *seq);
507    }
508    for (id, _) in by_age.into_iter().take(remove_count) {
509        turn.items.remove(&id);
510    }
511}
512
#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use serde_json::json;

    use crate::runtime::events::{Direction, Envelope, MsgKind};

    use super::*;

    /// Builds an inbound notification envelope with an explicit sequence number.
    fn envelope_with_seq(
        seq: u64,
        method: &str,
        thread: &str,
        turn: &str,
        item: Option<&str>,
        params: Value,
    ) -> Envelope {
        let payload = json!({"method": method, "params": params});
        Envelope {
            seq,
            ts_millis: 0,
            direction: Direction::Inbound,
            kind: MsgKind::Notification,
            rpc_id: None,
            method: Some(Arc::from(method)),
            thread_id: Some(Arc::from(thread)),
            turn_id: Some(Arc::from(turn)),
            item_id: item.map(Arc::from),
            json: Arc::new(payload),
        }
    }

    /// Same as `envelope_with_seq` with the sequence number fixed to 1.
    fn envelope(
        method: &str,
        thread: &str,
        turn: &str,
        item: Option<&str>,
        params: Value,
    ) -> Envelope {
        envelope_with_seq(1, method, thread, turn, item, params)
    }

    #[test]
    fn reduce_turn_lifecycle() {
        let started = reduce(
            RuntimeState::default(),
            &envelope("turn/started", "thr", "turn", None, json!({})),
        );
        let thread = &started.threads["thr"];
        assert_eq!(thread.active_turn.as_deref(), Some("turn"));
        assert_eq!(thread.turns["turn"].status, TurnStatus::InProgress);

        let completed = reduce(
            started,
            &envelope("turn/completed", "thr", "turn", None, json!({})),
        );
        let thread = &completed.threads["thr"];
        assert_eq!(thread.active_turn, None);
        assert_eq!(thread.turns["turn"].status, TurnStatus::Completed);
    }

    #[test]
    fn reduce_turn_cancelled_marks_cancelled_and_clears_active_turn() {
        let started = reduce(
            RuntimeState::default(),
            &envelope("turn/started", "thr", "turn", None, json!({})),
        );
        assert_eq!(started.threads["thr"].active_turn.as_deref(), Some("turn"));

        let cancelled = reduce(
            started,
            &envelope("turn/cancelled", "thr", "turn", None, json!({})),
        );
        let thread = &cancelled.threads["thr"];
        assert_eq!(thread.active_turn, None);
        assert_eq!(thread.turns["turn"].status, TurnStatus::Cancelled);
    }

    #[test]
    fn reduce_delta_and_output() {
        let script = [
            envelope("turn/started", "thr", "turn", None, json!({})),
            envelope(
                "item/started",
                "thr",
                "turn",
                Some("item"),
                json!({"itemType":"agentMessage"}),
            ),
            envelope(
                "item/agentMessage/delta",
                "thr",
                "turn",
                Some("item"),
                json!({"delta":"hello"}),
            ),
            envelope(
                "item/commandExecution/outputDelta",
                "thr",
                "turn",
                Some("item"),
                json!({"stdout":"out","stderr":"err"}),
            ),
        ];

        let state = script
            .iter()
            .fold(RuntimeState::default(), |acc, event| reduce(acc, event));

        let item = &state.threads["thr"].turns["turn"].items["item"];
        assert_eq!(item.text_accum, "hello");
        assert_eq!(item.stdout_accum, "out");
        assert_eq!(item.stderr_accum, "err");
    }

    #[test]
    fn reduce_applies_text_caps_and_marks_truncated() {
        let limits = StateProjectionLimits {
            max_threads: 8,
            max_turns_per_thread: 8,
            max_items_per_turn: 8,
            max_text_bytes_per_item: 4,
            max_stdout_bytes_per_item: 3,
            max_stderr_bytes_per_item: 2,
        };
        let script = [
            envelope_with_seq(
                1,
                "item/started",
                "thr",
                "turn",
                Some("item"),
                json!({"itemType":"agentMessage"}),
            ),
            envelope_with_seq(
                2,
                "item/agentMessage/delta",
                "thr",
                "turn",
                Some("item"),
                json!({"delta":"hello"}),
            ),
            envelope_with_seq(
                3,
                "item/commandExecution/outputDelta",
                "thr",
                "turn",
                Some("item"),
                json!({"stdout":"abcd","stderr":"xyz"}),
            ),
        ];

        let mut state = RuntimeState::default();
        for event in &script {
            reduce_in_place_with_limits(&mut state, event, &limits);
        }

        let item = &state.threads["thr"].turns["turn"].items["item"];
        assert_eq!(item.text_accum, "hell");
        assert!(item.text_truncated);
        assert_eq!(item.stdout_accum, "abc");
        assert!(item.stdout_truncated);
        assert_eq!(item.stderr_accum, "xy");
        assert!(item.stderr_truncated);
    }

    #[test]
    fn reduce_prunes_old_threads_turns_and_items() {
        let limits = StateProjectionLimits {
            max_threads: 2,
            max_turns_per_thread: 2,
            max_items_per_turn: 2,
            max_text_bytes_per_item: 1024,
            max_stdout_bytes_per_item: 1024,
            max_stderr_bytes_per_item: 1024,
        };
        let mut state = RuntimeState::default();

        // Three threads against a cap of two: the oldest one must go.
        for (seq, thread) in [(1, "thr_1"), (2, "thr_2"), (3, "thr_3")] {
            reduce_in_place_with_limits(
                &mut state,
                &envelope_with_seq(seq, "thread/started", thread, "turn_a", None, json!({})),
                &limits,
            );
        }
        assert!(!state.threads.contains_key("thr_1"));
        assert!(state.threads.contains_key("thr_2"));
        assert!(state.threads.contains_key("thr_3"));

        // Three turns against a cap of two.
        for seq in 10..=12 {
            let turn = format!("turn_{seq}");
            reduce_in_place_with_limits(
                &mut state,
                &envelope_with_seq(
                    seq,
                    "turn/started",
                    "thr_3",
                    &turn,
                    None,
                    json!({ "threadId":"thr_3", "turnId": turn }),
                ),
                &limits,
            );
        }
        let thr = state.threads.get("thr_3").expect("thread");
        assert!(thr.turns.len() <= 2);

        // Three items against a cap of two, all on the active turn.
        let turn_id = thr.active_turn.clone().expect("active turn");
        for seq in 20..=22 {
            let item = format!("item_{seq}");
            reduce_in_place_with_limits(
                &mut state,
                &envelope_with_seq(
                    seq,
                    "item/started",
                    "thr_3",
                    &turn_id,
                    Some(&item),
                    json!({"itemType":"agentMessage"}),
                ),
                &limits,
            );
        }

        let thr = state.threads.get("thr_3").expect("thread");
        let turn = thr.turns.get(&turn_id).expect("turn");
        assert!(turn.items.len() <= 2);
    }

    #[test]
    fn reduce_drops_stale_turn_event_by_sequence() {
        let script = [
            envelope_with_seq(10, "turn/started", "thr", "turn", None, json!({})),
            envelope_with_seq(11, "turn/completed", "thr", "turn", None, json!({})),
            // Older than the completion above, so it must be ignored.
            envelope_with_seq(
                9,
                "turn/failed",
                "thr",
                "turn",
                None,
                json!({"error":{"message":"stale"}}),
            ),
        ];

        let mut state = RuntimeState::default();
        for event in &script {
            reduce_in_place(&mut state, event);
        }

        let turn = &state.threads["thr"].turns["turn"];
        assert_eq!(turn.status, TurnStatus::Completed);
        assert_eq!(turn.error, None);
        assert_eq!(turn.last_seq, 11);
        assert_eq!(state.threads["thr"].last_seq, 11);
    }

    #[test]
    fn reduce_drops_stale_item_delta_by_sequence() {
        let script = [
            envelope_with_seq(
                1,
                "item/started",
                "thr",
                "turn",
                Some("item"),
                json!({"itemType":"agentMessage"}),
            ),
            envelope_with_seq(
                3,
                "item/agentMessage/delta",
                "thr",
                "turn",
                Some("item"),
                json!({"delta":"new"}),
            ),
            // Arrives after seq 3 but carries seq 2: must be dropped.
            envelope_with_seq(
                2,
                "item/agentMessage/delta",
                "thr",
                "turn",
                Some("item"),
                json!({"delta":"old"}),
            ),
        ];

        let mut state = RuntimeState::default();
        for event in &script {
            reduce_in_place(&mut state, event);
        }

        let item = &state.threads["thr"].turns["turn"].items["item"];
        assert_eq!(item.text_accum, "new");
        assert_eq!(item.last_seq, 3);
        assert_eq!(state.threads["thr"].last_seq, 3);
    }
}