1use time::OffsetDateTime;
2
3use crate::events::{EventEnvelope, RoderEvent, ThreadId};
4
5use super::{
6 ThreadItem, ThreadItemDelta, ThreadItemEvent, ThreadItemEventKind, ThreadItemStatus,
7 ThreadItemTurnRecord, TurnRecord,
8};
9
10pub fn project_turns_from_events(
15 thread_id: &ThreadId,
16 events: &[EventEnvelope],
17) -> Vec<TurnRecord> {
18 let mut turns = Vec::new();
19 for envelope in events {
20 match &envelope.event {
21 RoderEvent::TurnStarted(event) => {
22 ensure_turn_record(&mut turns, thread_id, &event.turn_id, event.timestamp);
23 }
24 RoderEvent::TranscriptItemAppended(event) => {
25 let turn =
26 ensure_turn_record(&mut turns, thread_id, &event.turn_id, event.timestamp);
27 if let Some(item) = &event.item {
28 turn.items.push(item.clone());
29 }
30 }
31 RoderEvent::TurnCompleted(event) => {
32 let turn =
33 ensure_turn_record(&mut turns, thread_id, &event.turn_id, event.timestamp);
34 turn.completed_at = Some(event.timestamp);
35 turn.usage = event.usage.clone();
36 turn.finish_reason = event.finish_reason.clone();
37 }
38 RoderEvent::TurnFailed(event) => {
39 let turn =
40 ensure_turn_record(&mut turns, thread_id, &event.turn_id, event.timestamp);
41 turn.completed_at = Some(event.timestamp);
42 turn.usage = event.usage.clone();
43 }
44 RoderEvent::TurnInterrupted(event) => {
45 let turn =
46 ensure_turn_record(&mut turns, thread_id, &event.turn_id, event.timestamp);
47 turn.completed_at = Some(event.timestamp);
48 }
49 _ => continue,
50 }
51 }
52 turns
53}
54
55fn ensure_turn_record<'a>(
56 turns: &'a mut Vec<TurnRecord>,
57 thread_id: &ThreadId,
58 turn_id: &str,
59 created_at: OffsetDateTime,
60) -> &'a mut TurnRecord {
61 if let Some(index) = turns.iter().position(|turn| turn.turn_id == turn_id) {
62 return &mut turns[index];
63 }
64 turns.push(TurnRecord {
65 thread_id: thread_id.clone(),
66 turn_id: turn_id.to_string(),
67 items: Vec::new(),
68 created_at,
69 completed_at: None,
70 usage: None,
71 finish_reason: None,
72 });
73 turns.last_mut().expect("turn was just pushed")
74}
75
76pub fn project_thread_item_events(events: &[ThreadItemEvent]) -> Vec<ThreadItemTurnRecord> {
77 let mut turns = Vec::<ThreadItemTurnRecord>::new();
78 for event in events {
79 let turn_index = turns
80 .iter()
81 .position(|turn| turn.turn_id == event.turn_id)
82 .unwrap_or_else(|| {
83 turns.push(ThreadItemTurnRecord {
84 thread_id: event.thread_id.clone(),
85 turn_id: event.turn_id.clone(),
86 created_at: event.timestamp,
87 items: Vec::new(),
88 });
89 turns.len() - 1
90 });
91 apply_thread_item_event(&mut turns[turn_index].items, &event.event);
92 }
93 turns
94}
95
96fn apply_thread_item_event(items: &mut Vec<ThreadItem>, event: &ThreadItemEventKind) {
97 match event {
98 ThreadItemEventKind::ItemStarted { item } => {
99 if !upsert_item_if_missing(items, item.clone()) {
100 merge_started_item(items, item);
101 }
102 }
103 ThreadItemEventKind::ItemDelta { item_id, delta } => {
104 let index = item_index(items, item_id).unwrap_or_else(|| {
105 items.push(item_for_delta(item_id, delta));
106 items.len() - 1
107 });
108 apply_thread_item_delta(&mut items[index], delta);
109 }
110 ThreadItemEventKind::ItemCompleted { item } => {
111 if let Some(index) = item_index(items, item.id()) {
112 merge_completed_item(&mut items[index], item.clone());
113 } else {
114 items.push(item.clone());
115 }
116 }
117 }
118}
119
120fn upsert_item_if_missing(items: &mut Vec<ThreadItem>, item: ThreadItem) -> bool {
121 if item_index(items, item.id()).is_some() {
122 return false;
123 }
124 items.push(item);
125 true
126}
127
128fn merge_started_item(items: &mut [ThreadItem], incoming: &ThreadItem) {
129 if let Some(index) = item_index(items, incoming.id()) {
130 merge_completed_item(&mut items[index], incoming.clone());
131 }
132}
133
134fn item_index(items: &[ThreadItem], item_id: &str) -> Option<usize> {
135 items.iter().position(|item| item.id() == item_id)
136}
137
138fn item_for_delta(item_id: &str, delta: &ThreadItemDelta) -> ThreadItem {
139 match delta {
140 ThreadItemDelta::AgentMessageText { phase, .. } => ThreadItem::AgentMessage {
141 id: item_id.to_string(),
142 text: String::new(),
143 phase: phase.clone(),
144 status: Some(ThreadItemStatus::InProgress),
145 },
146 ThreadItemDelta::ReasoningText { content_index, .. } => ThreadItem::Reasoning {
147 id: item_id.to_string(),
148 summary: Vec::new(),
149 content: vec![String::new(); content_index + 1],
150 status: Some(ThreadItemStatus::InProgress),
151 },
152 ThreadItemDelta::ReasoningSummaryPartAdded { summary_index }
153 | ThreadItemDelta::ReasoningSummaryText { summary_index, .. } => ThreadItem::Reasoning {
154 id: item_id.to_string(),
155 summary: vec![String::new(); summary_index + 1],
156 content: Vec::new(),
157 status: Some(ThreadItemStatus::InProgress),
158 },
159 }
160}
161
162fn apply_thread_item_delta(item: &mut ThreadItem, delta: &ThreadItemDelta) {
163 match (item, delta) {
164 (
165 ThreadItem::AgentMessage {
166 text,
167 phase,
168 status,
169 ..
170 },
171 ThreadItemDelta::AgentMessageText {
172 delta,
173 phase: delta_phase,
174 },
175 ) => {
176 text.push_str(delta);
177 if phase.is_none() {
178 *phase = delta_phase.clone();
179 }
180 *status = Some(ThreadItemStatus::InProgress);
181 }
182 (
183 ThreadItem::Reasoning {
184 content, status, ..
185 },
186 ThreadItemDelta::ReasoningText {
187 delta,
188 content_index,
189 },
190 ) => {
191 ensure_vec_slot(content, *content_index);
192 content[*content_index].push_str(delta);
193 *status = Some(ThreadItemStatus::InProgress);
194 }
195 (
196 ThreadItem::Reasoning {
197 summary, status, ..
198 },
199 ThreadItemDelta::ReasoningSummaryPartAdded { summary_index },
200 ) => {
201 ensure_vec_slot(summary, *summary_index);
202 *status = Some(ThreadItemStatus::InProgress);
203 }
204 (
205 ThreadItem::Reasoning {
206 summary, status, ..
207 },
208 ThreadItemDelta::ReasoningSummaryText {
209 delta,
210 summary_index,
211 },
212 ) => {
213 ensure_vec_slot(summary, *summary_index);
214 summary[*summary_index].push_str(delta);
215 *status = Some(ThreadItemStatus::InProgress);
216 }
217 (item, delta) => {
218 *item = item_for_delta(item.id(), delta);
219 apply_thread_item_delta(item, delta);
220 }
221 }
222}
223
224fn ensure_vec_slot(values: &mut Vec<String>, index: usize) {
225 while values.len() <= index {
226 values.push(String::new());
227 }
228}
229
230fn merge_completed_item(existing: &mut ThreadItem, incoming: ThreadItem) {
231 match (&mut *existing, incoming) {
232 (
233 ThreadItem::Reasoning {
234 summary,
235 content,
236 status,
237 ..
238 },
239 ThreadItem::Reasoning {
240 summary: incoming_summary,
241 content: incoming_content,
242 status: incoming_status,
243 ..
244 },
245 ) => {
246 if !incoming_summary.is_empty() {
247 *summary = incoming_summary;
248 }
249 if !incoming_content.is_empty() {
250 *content = incoming_content;
251 }
252 *status = incoming_status.or(Some(ThreadItemStatus::Completed));
253 }
254 (
255 ThreadItem::ToolExecution {
256 status,
257 input,
258 output,
259 error,
260 ..
261 },
262 ThreadItem::ToolExecution {
263 status: incoming_status,
264 input: incoming_input,
265 output: incoming_output,
266 error: incoming_error,
267 ..
268 },
269 ) => {
270 *status = incoming_status;
271 if incoming_input.is_some() {
272 *input = incoming_input;
273 }
274 if incoming_output.is_some() {
275 *output = incoming_output;
276 }
277 if incoming_error.is_some() {
278 *error = incoming_error;
279 }
280 }
281 (slot, incoming) => *slot = incoming,
282 }
283}