1use futures::StreamExt;
2
3use crate::event::{Event, MessageEvent, Role, UsageData, UsageDeltaEvent};
4use crate::runner::EventStream;
5
/// Caller-supplied fallbacks used by [`normalize_stream`] to fill gaps in an event stream.
///
/// Every field is optional; `NormalizeConfig::default()` supplies no fallbacks at all.
#[derive(Debug, Clone, Default)]
pub struct NormalizeConfig {
    /// Working directory copied into a `SessionStart` event whose own `cwd` is `None`.
    pub cwd: Option<String>,
    /// Model name copied into a `SessionStart` event whose own `model` is `None`.
    pub model: Option<String>,
    /// Prompt text used for the synthetic user message that is emitted when the stream
    /// itself has not produced a user message. `None` disables the synthetic message.
    pub prompt: Option<String>,
}
12
13pub fn normalize_stream(stream: EventStream, config: NormalizeConfig) -> EventStream {
17 let state = NormalizeState {
18 session_id: String::new(),
19 start_timestamp_ms: 0,
20 last_assistant_text: String::new(),
21 accumulated_usage: UsageData::default(),
22 has_usage: false,
23 cwd: config.cwd,
24 model: config.model,
25 seen_user_message: false,
26 seen_usage_delta: false,
27 prompt: config.prompt,
28 };
29
30 let normalized = stream
31 .scan(state, |state, item| {
32 let results: Vec<crate::Result<Event>> = match item {
33 Ok(event) => state.enrich(event).into_iter().map(Ok).collect(),
34 err => vec![err],
35 };
36 std::future::ready(Some(futures::stream::iter(results)))
37 })
38 .flatten();
39
40 Box::pin(normalized)
41}
42
43struct NormalizeState {
44 session_id: String,
45 start_timestamp_ms: u64,
46 last_assistant_text: String,
47 accumulated_usage: UsageData,
48 has_usage: bool,
49 cwd: Option<String>,
50 model: Option<String>,
51 seen_user_message: bool,
52 seen_usage_delta: bool,
53 prompt: Option<String>,
54}
55
56impl NormalizeState {
57 fn accumulate_usage(&mut self, usage: &UsageData) {
58 self.has_usage = true;
59 if let Some(v) = usage.input_tokens {
60 *self.accumulated_usage.input_tokens.get_or_insert(0) += v;
61 }
62 if let Some(v) = usage.output_tokens {
63 *self.accumulated_usage.output_tokens.get_or_insert(0) += v;
64 }
65 if let Some(v) = usage.cache_read_tokens {
66 *self.accumulated_usage.cache_read_tokens.get_or_insert(0) += v;
67 }
68 if let Some(v) = usage.cache_creation_tokens {
69 *self.accumulated_usage.cache_creation_tokens.get_or_insert(0) += v;
70 }
71 if let Some(v) = usage.cost_usd {
72 *self.accumulated_usage.cost_usd.get_or_insert(0.0) += v;
73 }
74 }
75
76 fn make_user_message(&self, timestamp_ms: u64) -> Event {
78 Event::Message(MessageEvent {
79 role: Role::User,
80 text: self.prompt.clone().unwrap_or_default(),
81 usage: None,
82 timestamp_ms,
83 })
84 }
85
86 fn maybe_prepend_user_message(&mut self, event: Event, timestamp_ms: u64) -> Vec<Event> {
89 if !self.seen_user_message && self.prompt.is_some() {
90 self.seen_user_message = true;
91 vec![self.make_user_message(timestamp_ms), event]
92 } else {
93 vec![event]
94 }
95 }
96
97 fn enrich(&mut self, event: Event) -> Vec<Event> {
98 match event {
99 Event::SessionStart(mut e) => {
100 self.session_id = e.session_id.clone();
101 self.start_timestamp_ms = e.timestamp_ms;
102
103 if e.model.is_none() {
104 e.model = self.model.clone();
105 }
106 if e.cwd.is_none() {
107 e.cwd = self.cwd.clone();
108 }
109
110 vec![Event::SessionStart(e)]
113 }
114 Event::Message(ref e) if e.role == Role::User => {
115 self.seen_user_message = true;
116 vec![event]
117 }
118 Event::Message(ref e) if e.role == Role::Assistant && !e.text.is_empty() => {
119 self.last_assistant_text = e.text.clone();
120 let ts = e.timestamp_ms;
121 self.maybe_prepend_user_message(event, ts)
122 }
123 Event::UsageDelta(ref e) => {
124 self.seen_usage_delta = true;
125 self.accumulate_usage(&e.usage);
126 let ts = e.timestamp_ms;
127 self.maybe_prepend_user_message(event, ts)
128 }
129 Event::Result(mut e) => {
130 if e.text.is_empty() && !self.last_assistant_text.is_empty() {
132 e.text = self.last_assistant_text.clone();
133 }
134 if e.session_id.is_empty() && !self.session_id.is_empty() {
136 e.session_id = self.session_id.clone();
137 }
138 if e.duration_ms.is_none() && self.start_timestamp_ms > 0 && e.timestamp_ms > 0 {
140 e.duration_ms = Some(e.timestamp_ms.saturating_sub(self.start_timestamp_ms));
141 }
142 if e.usage.is_none() && self.has_usage {
144 e.usage = Some(self.accumulated_usage.clone());
145 }
146 if e.total_cost_usd.is_none() {
148 if let Some(ref usage) = e.usage {
149 if let Some(cost) = usage.cost_usd {
150 e.total_cost_usd = Some(cost);
151 }
152 }
153 }
154
155 let ts = e.timestamp_ms;
156 let result_event = Event::Result(e);
157
158 let mut events = self.maybe_prepend_user_message(result_event, ts);
160
161 if !self.seen_usage_delta {
163 if let Some(Event::Result(ref r)) = events.last() {
165 if let Some(ref usage) = r.usage {
166 let synthetic_usage = Event::UsageDelta(UsageDeltaEvent {
167 usage: usage.clone(),
168 timestamp_ms: ts,
169 });
170 let Some(result_ev) = events.pop() else {
172 return events;
173 };
174 events.push(synthetic_usage);
175 events.push(result_ev);
176 }
177 }
178 }
179
180 events
181 }
182 other => {
183 let ts = match &other {
184 Event::TextDelta(e) => e.timestamp_ms,
185 Event::ToolStart(e) => e.timestamp_ms,
186 Event::ToolEnd(e) => e.timestamp_ms,
187 Event::Error(e) => e.timestamp_ms,
188 _ => 0,
189 };
190 self.maybe_prepend_user_message(other, ts)
191 }
192 }
193 }
194}
195
#[cfg(test)]
mod tests {
    use super::*;
    use crate::event::*;
    use futures::StreamExt;

    // Builds a `UsageData` from input tokens, output tokens and an optional cost; both
    // cache counters are `None`.
    macro_rules! usage {
        ($input:expr, $output:expr, $cost:expr) => {
            UsageData {
                input_tokens: Some($input),
                output_tokens: Some($output),
                cache_read_tokens: None,
                cache_creation_tokens: None,
                cost_usd: $cost,
            }
        };
    }

    /// Wraps a list of events in an always-`Ok` stream.
    fn make_stream(events: Vec<Event>) -> EventStream {
        Box::pin(futures::stream::iter(events.into_iter().map(Ok)))
    }

    /// Config with no fallbacks and no prompt.
    fn plain_config() -> NormalizeConfig {
        NormalizeConfig {
            cwd: None,
            model: None,
            prompt: None,
        }
    }

    /// Config that only carries a prompt.
    fn config_with_prompt(prompt: &str) -> NormalizeConfig {
        NormalizeConfig {
            cwd: None,
            model: None,
            prompt: Some(prompt.into()),
        }
    }

    /// Normalizes `events` and collects the output, unwrapping every item.
    async fn normalize_all(events: Vec<Event>, config: NormalizeConfig) -> Vec<Event> {
        normalize_stream(make_stream(events), config)
            .map(|item| item.unwrap())
            .collect()
            .await
    }

    /// `SessionStart` for session "s1" with no model and no cwd.
    fn session_start(agent: &str, timestamp_ms: u64) -> Event {
        Event::SessionStart(SessionStartEvent {
            session_id: "s1".into(),
            agent: agent.into(),
            model: None,
            cwd: None,
            timestamp_ms,
        })
    }

    /// Message event without usage.
    fn message(role: Role, text: &str, timestamp_ms: u64) -> Event {
        Event::Message(MessageEvent {
            role,
            text: text.into(),
            usage: None,
            timestamp_ms,
        })
    }

    fn usage_delta(usage: UsageData, timestamp_ms: u64) -> Event {
        Event::UsageDelta(UsageDeltaEvent {
            usage,
            timestamp_ms,
        })
    }

    /// Successful result for session "s1" with text "done", no cost and no usage.
    fn done_result(duration_ms: Option<u64>, timestamp_ms: u64) -> ResultEvent {
        ResultEvent {
            success: true,
            text: "done".into(),
            session_id: "s1".into(),
            duration_ms,
            total_cost_usd: None,
            usage: None,
            timestamp_ms,
        }
    }

    #[tokio::test]
    async fn session_start_fills_missing_model_and_cwd() {
        let config = NormalizeConfig {
            cwd: Some("/home/user".into()),
            model: Some("gpt-5-codex".into()),
            prompt: None,
        };
        let input = vec![session_start("codex", 1000)];

        let mut stream = normalize_stream(make_stream(input), config);
        let first = stream.next().await.unwrap().unwrap();

        let Event::SessionStart(e) = first else {
            panic!("expected SessionStart");
        };
        assert_eq!(e.model, Some("gpt-5-codex".into()));
        assert_eq!(e.cwd, Some("/home/user".into()));
    }

    #[tokio::test]
    async fn session_start_preserves_existing_model_and_cwd() {
        let config = NormalizeConfig {
            cwd: Some("/fallback".into()),
            model: Some("fallback-model".into()),
            prompt: None,
        };
        let input = vec![Event::SessionStart(SessionStartEvent {
            session_id: "s1".into(),
            agent: "claude".into(),
            model: Some("claude-opus-4-6".into()),
            cwd: Some("/original".into()),
            timestamp_ms: 1000,
        })];

        let mut stream = normalize_stream(make_stream(input), config);
        let first = stream.next().await.unwrap().unwrap();

        let Event::SessionStart(e) = first else {
            panic!("expected SessionStart");
        };
        assert_eq!(e.model, Some("claude-opus-4-6".into()));
        assert_eq!(e.cwd, Some("/original".into()));
    }

    #[tokio::test]
    async fn result_text_filled_from_last_assistant_message() {
        let input = vec![
            session_start("codex", 1000),
            message(Role::Assistant, "Hello from codex!", 1500),
            Event::Result(ResultEvent {
                text: String::new(),
                session_id: String::new(),
                ..done_result(None, 2000)
            }),
        ];

        let collected = normalize_all(input, plain_config()).await;

        let Event::Result(r) = &collected[2] else {
            panic!("expected Result");
        };
        assert_eq!(r.text, "Hello from codex!");
        assert_eq!(r.session_id, "s1");
        assert_eq!(r.duration_ms, Some(1000));
    }

    #[tokio::test]
    async fn result_duration_computed_from_timestamps() {
        let input = vec![
            session_start("opencode", 5000),
            Event::Result(done_result(None, 8000)),
        ];

        let collected = normalize_all(input, plain_config()).await;

        let Event::Result(r) = &collected[1] else {
            panic!("expected Result");
        };
        assert_eq!(r.duration_ms, Some(3000));
    }

    #[tokio::test]
    async fn result_preserves_existing_duration() {
        let input = vec![
            session_start("claude", 1000),
            Event::Result(done_result(Some(999), 5000)),
        ];

        let collected = normalize_all(input, plain_config()).await;

        let Event::Result(r) = &collected[1] else {
            panic!("expected Result");
        };
        assert_eq!(r.duration_ms, Some(999));
    }

    #[tokio::test]
    async fn result_usage_filled_from_accumulated_deltas() {
        let input = vec![
            session_start("codex", 1000),
            usage_delta(usage!(100, 50, Some(0.01)), 1500),
            usage_delta(usage!(200, 75, Some(0.02)), 1800),
            Event::Result(done_result(None, 2000)),
        ];

        let collected = normalize_all(input, plain_config()).await;

        let Event::Result(r) = &collected[3] else {
            panic!("expected Result");
        };
        let usage = r.usage.as_ref().unwrap();
        assert_eq!(usage.input_tokens, Some(300));
        assert_eq!(usage.output_tokens, Some(125));
        assert!((usage.cost_usd.unwrap() - 0.03).abs() < 1e-10);
        assert!((r.total_cost_usd.unwrap() - 0.03).abs() < 1e-10);
    }

    #[tokio::test]
    async fn result_preserves_existing_usage() {
        let existing_usage = usage!(999, 888, Some(0.99));

        let input = vec![
            usage_delta(usage!(100, 50, None), 1500),
            Event::Result(ResultEvent {
                usage: Some(existing_usage.clone()),
                ..done_result(Some(500), 2000)
            }),
        ];

        let collected = normalize_all(input, plain_config()).await;

        let Event::Result(r) = &collected[1] else {
            panic!("expected Result");
        };
        assert_eq!(r.usage, Some(existing_usage));
    }

    #[tokio::test]
    async fn passthrough_events_unchanged() {
        let input = vec![
            Event::TextDelta(TextDeltaEvent {
                text: "hello".into(),
                timestamp_ms: 1000,
            }),
            Event::ToolStart(ToolStartEvent {
                call_id: "c1".into(),
                tool_name: "read".into(),
                input: None,
                timestamp_ms: 1100,
            }),
            Event::ToolEnd(ToolEndEvent {
                call_id: "c1".into(),
                tool_name: "read".into(),
                success: true,
                output: Some("content".into()),
                usage: None,
                timestamp_ms: 1200,
            }),
            Event::Error(ErrorEvent {
                message: "oops".into(),
                code: None,
                timestamp_ms: 1300,
            }),
        ];
        let expected = input.clone();

        let collected = normalize_all(input, plain_config()).await;

        assert_eq!(collected, expected);
    }

    #[tokio::test]
    async fn errors_pass_through_stream() {
        let items: Vec<crate::Result<Event>> = vec![
            Ok(Event::TextDelta(TextDeltaEvent {
                text: "hi".into(),
                timestamp_ms: 1000,
            })),
            Err(crate::Error::Other("test error".into())),
        ];
        let raw: EventStream = Box::pin(futures::stream::iter(items));

        let mut stream = normalize_stream(raw, plain_config());

        let first = stream.next().await.unwrap();
        assert!(first.is_ok());

        let second = stream.next().await.unwrap();
        assert!(second.is_err());
    }

    #[tokio::test]
    async fn user_message_synthesized_after_session_start() {
        let input = vec![
            session_start("codex", 1000),
            message(Role::Assistant, "Hello!", 1500),
            Event::Result(ResultEvent {
                text: "Hello!".into(),
                ..done_result(Some(500), 2000)
            }),
        ];

        let collected = normalize_all(input, config_with_prompt("say hello")).await;

        assert_eq!(collected.len(), 4, "events: {collected:?}");
        assert!(matches!(&collected[0], Event::SessionStart(_)));
        let Event::Message(m) = &collected[1] else {
            panic!("expected synthetic user Message at [1], got {:?}", collected[1]);
        };
        assert_eq!(m.role, Role::User);
        assert_eq!(m.text, "say hello");
        assert_eq!(m.timestamp_ms, 1500);
        assert!(matches!(&collected[2], Event::Message(m) if m.role == Role::Assistant));
        assert!(matches!(&collected[3], Event::Result(_)));
    }

    #[tokio::test]
    async fn user_message_not_duplicated_when_adapter_sends_one() {
        let input = vec![
            session_start("cursor", 1000),
            message(Role::User, "say hello", 1200),
            message(Role::Assistant, "Hello!", 1500),
            Event::Result(ResultEvent {
                text: "Hello!".into(),
                ..done_result(Some(500), 2000)
            }),
        ];

        let collected = normalize_all(input, config_with_prompt("say hello")).await;

        let user_messages: Vec<_> = collected
            .iter()
            .filter(|e| matches!(e, Event::Message(m) if m.role == Role::User))
            .collect();
        assert_eq!(user_messages.len(), 1, "expected exactly 1 user message, got {user_messages:?}");
    }

    #[tokio::test]
    async fn user_message_not_injected_without_prompt() {
        let input = vec![
            session_start("codex", 1000),
            message(Role::Assistant, "Hello!", 1500),
        ];

        let collected = normalize_all(input, plain_config()).await;

        assert_eq!(collected.len(), 2);
        assert!(matches!(&collected[0], Event::SessionStart(_)));
        assert!(matches!(&collected[1], Event::Message(m) if m.role == Role::Assistant));
    }

    #[tokio::test]
    async fn total_cost_filled_from_accumulated_usage() {
        let input = vec![
            usage_delta(usage!(100, 50, Some(0.05)), 1000),
            Event::Result(done_result(Some(500), 2000)),
        ];

        let collected = normalize_all(input, plain_config()).await;

        let Event::Result(r) = &collected[1] else {
            panic!("expected Result");
        };
        assert!((r.total_cost_usd.unwrap() - 0.05).abs() < 1e-10);
    }

    #[tokio::test]
    async fn usage_delta_synthesized_before_result() {
        let input = vec![
            session_start("claude", 1000),
            Event::Result(ResultEvent {
                total_cost_usd: Some(0.01),
                usage: Some(usage!(200, 100, Some(0.01))),
                ..done_result(Some(1000), 2000)
            }),
        ];

        let collected = normalize_all(input, plain_config()).await;

        assert_eq!(collected.len(), 3, "events: {collected:?}");
        assert!(matches!(&collected[0], Event::SessionStart(_)));
        let Event::UsageDelta(u) = &collected[1] else {
            panic!("expected synthetic UsageDelta at [1], got {:?}", collected[1]);
        };
        assert_eq!(u.usage.input_tokens, Some(200));
        assert_eq!(u.usage.output_tokens, Some(100));
        assert!(matches!(&collected[2], Event::Result(_)));
    }

    #[tokio::test]
    async fn no_synthetic_usage_delta_when_already_present() {
        let input = vec![
            session_start("codex", 1000),
            usage_delta(usage!(100, 50, None), 1500),
            Event::Result(done_result(Some(1000), 2000)),
        ];

        let collected = normalize_all(input, plain_config()).await;

        let usage_deltas: Vec<_> = collected
            .iter()
            .filter(|e| matches!(e, Event::UsageDelta(_)))
            .collect();
        assert_eq!(usage_deltas.len(), 1, "expected exactly 1 UsageDelta, got {usage_deltas:?}");
    }
}