Skip to main content

roder_api/thread/
projection.rs

1use time::OffsetDateTime;
2
3use crate::events::{EventEnvelope, RoderEvent, ThreadId};
4
5use super::{
6    ThreadItem, ThreadItemDelta, ThreadItemEvent, ThreadItemEventKind, ThreadItemStatus,
7    ThreadItemTurnRecord, TurnRecord,
8};
9
10/// Projects raw runtime event envelopes into per-turn transcript records.
11///
12/// Thread stores that persist `EventEnvelope` logs can use this to populate a
13/// snapshot's `turns` field without duplicating the runtime replay rules.
14pub fn project_turns_from_events(
15    thread_id: &ThreadId,
16    events: &[EventEnvelope],
17) -> Vec<TurnRecord> {
18    let mut turns = Vec::new();
19    for envelope in events {
20        match &envelope.event {
21            RoderEvent::TurnStarted(event) => {
22                ensure_turn_record(&mut turns, thread_id, &event.turn_id, event.timestamp);
23            }
24            RoderEvent::TranscriptItemAppended(event) => {
25                let turn =
26                    ensure_turn_record(&mut turns, thread_id, &event.turn_id, event.timestamp);
27                if let Some(item) = &event.item {
28                    turn.items.push(item.clone());
29                }
30            }
31            RoderEvent::TurnCompleted(event) => {
32                let turn =
33                    ensure_turn_record(&mut turns, thread_id, &event.turn_id, event.timestamp);
34                turn.completed_at = Some(event.timestamp);
35                turn.usage = event.usage.clone();
36                turn.finish_reason = event.finish_reason.clone();
37            }
38            RoderEvent::TurnFailed(event) => {
39                let turn =
40                    ensure_turn_record(&mut turns, thread_id, &event.turn_id, event.timestamp);
41                turn.completed_at = Some(event.timestamp);
42                turn.usage = event.usage.clone();
43            }
44            RoderEvent::TurnInterrupted(event) => {
45                let turn =
46                    ensure_turn_record(&mut turns, thread_id, &event.turn_id, event.timestamp);
47                turn.completed_at = Some(event.timestamp);
48            }
49            _ => continue,
50        }
51    }
52    turns
53}
54
55fn ensure_turn_record<'a>(
56    turns: &'a mut Vec<TurnRecord>,
57    thread_id: &ThreadId,
58    turn_id: &str,
59    created_at: OffsetDateTime,
60) -> &'a mut TurnRecord {
61    if let Some(index) = turns.iter().position(|turn| turn.turn_id == turn_id) {
62        return &mut turns[index];
63    }
64    turns.push(TurnRecord {
65        thread_id: thread_id.clone(),
66        turn_id: turn_id.to_string(),
67        items: Vec::new(),
68        created_at,
69        completed_at: None,
70        usage: None,
71        finish_reason: None,
72    });
73    turns.last_mut().expect("turn was just pushed")
74}
75
76pub fn project_thread_item_events(events: &[ThreadItemEvent]) -> Vec<ThreadItemTurnRecord> {
77    let mut turns = Vec::<ThreadItemTurnRecord>::new();
78    for event in events {
79        let turn_index = turns
80            .iter()
81            .position(|turn| turn.turn_id == event.turn_id)
82            .unwrap_or_else(|| {
83                turns.push(ThreadItemTurnRecord {
84                    thread_id: event.thread_id.clone(),
85                    turn_id: event.turn_id.clone(),
86                    created_at: event.timestamp,
87                    items: Vec::new(),
88                });
89                turns.len() - 1
90            });
91        apply_thread_item_event(&mut turns[turn_index].items, &event.event);
92    }
93    turns
94}
95
96fn apply_thread_item_event(items: &mut Vec<ThreadItem>, event: &ThreadItemEventKind) {
97    match event {
98        ThreadItemEventKind::ItemStarted { item } => {
99            if !upsert_item_if_missing(items, item.clone()) {
100                merge_started_item(items, item);
101            }
102        }
103        ThreadItemEventKind::ItemDelta { item_id, delta } => {
104            let index = item_index(items, item_id).unwrap_or_else(|| {
105                items.push(item_for_delta(item_id, delta));
106                items.len() - 1
107            });
108            apply_thread_item_delta(&mut items[index], delta);
109        }
110        ThreadItemEventKind::ItemCompleted { item } => {
111            if let Some(index) = item_index(items, item.id()) {
112                merge_completed_item(&mut items[index], item.clone());
113            } else {
114                items.push(item.clone());
115            }
116        }
117    }
118}
119
120fn upsert_item_if_missing(items: &mut Vec<ThreadItem>, item: ThreadItem) -> bool {
121    if item_index(items, item.id()).is_some() {
122        return false;
123    }
124    items.push(item);
125    true
126}
127
128fn merge_started_item(items: &mut [ThreadItem], incoming: &ThreadItem) {
129    if let Some(index) = item_index(items, incoming.id()) {
130        merge_completed_item(&mut items[index], incoming.clone());
131    }
132}
133
134fn item_index(items: &[ThreadItem], item_id: &str) -> Option<usize> {
135    items.iter().position(|item| item.id() == item_id)
136}
137
138fn item_for_delta(item_id: &str, delta: &ThreadItemDelta) -> ThreadItem {
139    match delta {
140        ThreadItemDelta::AgentMessageText { phase, .. } => ThreadItem::AgentMessage {
141            id: item_id.to_string(),
142            text: String::new(),
143            phase: phase.clone(),
144            status: Some(ThreadItemStatus::InProgress),
145        },
146        ThreadItemDelta::ReasoningText { content_index, .. } => ThreadItem::Reasoning {
147            id: item_id.to_string(),
148            summary: Vec::new(),
149            content: vec![String::new(); content_index + 1],
150            status: Some(ThreadItemStatus::InProgress),
151        },
152        ThreadItemDelta::ReasoningSummaryPartAdded { summary_index }
153        | ThreadItemDelta::ReasoningSummaryText { summary_index, .. } => ThreadItem::Reasoning {
154            id: item_id.to_string(),
155            summary: vec![String::new(); summary_index + 1],
156            content: Vec::new(),
157            status: Some(ThreadItemStatus::InProgress),
158        },
159    }
160}
161
162fn apply_thread_item_delta(item: &mut ThreadItem, delta: &ThreadItemDelta) {
163    match (item, delta) {
164        (
165            ThreadItem::AgentMessage {
166                text,
167                phase,
168                status,
169                ..
170            },
171            ThreadItemDelta::AgentMessageText {
172                delta,
173                phase: delta_phase,
174            },
175        ) => {
176            text.push_str(delta);
177            if phase.is_none() {
178                *phase = delta_phase.clone();
179            }
180            *status = Some(ThreadItemStatus::InProgress);
181        }
182        (
183            ThreadItem::Reasoning {
184                content, status, ..
185            },
186            ThreadItemDelta::ReasoningText {
187                delta,
188                content_index,
189            },
190        ) => {
191            ensure_vec_slot(content, *content_index);
192            content[*content_index].push_str(delta);
193            *status = Some(ThreadItemStatus::InProgress);
194        }
195        (
196            ThreadItem::Reasoning {
197                summary, status, ..
198            },
199            ThreadItemDelta::ReasoningSummaryPartAdded { summary_index },
200        ) => {
201            ensure_vec_slot(summary, *summary_index);
202            *status = Some(ThreadItemStatus::InProgress);
203        }
204        (
205            ThreadItem::Reasoning {
206                summary, status, ..
207            },
208            ThreadItemDelta::ReasoningSummaryText {
209                delta,
210                summary_index,
211            },
212        ) => {
213            ensure_vec_slot(summary, *summary_index);
214            summary[*summary_index].push_str(delta);
215            *status = Some(ThreadItemStatus::InProgress);
216        }
217        (item, delta) => {
218            *item = item_for_delta(item.id(), delta);
219            apply_thread_item_delta(item, delta);
220        }
221    }
222}
223
/// Grows `values` with empty strings until `values[index]` is a valid slot.
///
/// Existing entries are never modified and the vector is never shortened; when
/// `index` is already in bounds this is a no-op.
fn ensure_vec_slot(values: &mut Vec<String>, index: usize) {
    if index >= values.len() {
        // One resize reserves once instead of pushing element by element.
        // `saturating_add` keeps `usize::MAX` from wrapping to 0, which would
        // truncate the vector instead of growing it.
        values.resize_with(index.saturating_add(1), String::new);
    }
}
229
230fn merge_completed_item(existing: &mut ThreadItem, incoming: ThreadItem) {
231    match (&mut *existing, incoming) {
232        (
233            ThreadItem::Reasoning {
234                summary,
235                content,
236                status,
237                ..
238            },
239            ThreadItem::Reasoning {
240                summary: incoming_summary,
241                content: incoming_content,
242                status: incoming_status,
243                ..
244            },
245        ) => {
246            if !incoming_summary.is_empty() {
247                *summary = incoming_summary;
248            }
249            if !incoming_content.is_empty() {
250                *content = incoming_content;
251            }
252            *status = incoming_status.or(Some(ThreadItemStatus::Completed));
253        }
254        (
255            ThreadItem::ToolExecution {
256                status,
257                input,
258                output,
259                error,
260                ..
261            },
262            ThreadItem::ToolExecution {
263                status: incoming_status,
264                input: incoming_input,
265                output: incoming_output,
266                error: incoming_error,
267                ..
268            },
269        ) => {
270            *status = incoming_status;
271            if incoming_input.is_some() {
272                *input = incoming_input;
273            }
274            if incoming_output.is_some() {
275                *output = incoming_output;
276            }
277            if incoming_error.is_some() {
278                *error = incoming_error;
279            }
280        }
281        (slot, incoming) => *slot = incoming,
282    }
283}