Skip to main content

harness/
normalize.rs

1use futures::StreamExt;
2
3use crate::event::{Event, MessageEvent, Role, UsageData, UsageDeltaEvent};
4use crate::runner::EventStream;
5
/// Fallback values, taken from the task config, that the normalization layer
/// falls back on when the agent backend does not report them itself.
///
/// Every field is optional; `NormalizeConfig::default()` yields a config with
/// no fallbacks at all, which is convenient with struct-update syntax
/// (`NormalizeConfig { prompt: Some(p), ..Default::default() }`).
#[derive(Debug, Clone, Default)]
pub struct NormalizeConfig {
    /// Working directory copied into a `SessionStart` event whose `cwd` is `None`.
    pub cwd: Option<String>,
    /// Model name copied into a `SessionStart` event whose `model` is `None`.
    pub model: Option<String>,
    /// Prompt text used for the synthetic user message that is emitted when
    /// the backend never produces a user message of its own.
    pub prompt: Option<String>,
}
12
13/// Wraps a raw `EventStream` with stateful enrichment so that all consumers
14/// (headless, TUI, tests, library users) get uniform events regardless of which
15/// agent backend produced them.
16pub fn normalize_stream(stream: EventStream, config: NormalizeConfig) -> EventStream {
17    let state = NormalizeState {
18        session_id: String::new(),
19        start_timestamp_ms: 0,
20        last_assistant_text: String::new(),
21        accumulated_usage: UsageData::default(),
22        has_usage: false,
23        cwd: config.cwd,
24        model: config.model,
25        seen_user_message: false,
26        seen_usage_delta: false,
27        prompt: config.prompt,
28    };
29
30    let normalized = stream
31        .scan(state, |state, item| {
32            let results: Vec<crate::Result<Event>> = match item {
33                Ok(event) => state.enrich(event).into_iter().map(Ok).collect(),
34                err => vec![err],
35            };
36            std::future::ready(Some(futures::stream::iter(results)))
37        })
38        .flatten();
39
40    Box::pin(normalized)
41}
42
43struct NormalizeState {
44    session_id: String,
45    start_timestamp_ms: u64,
46    last_assistant_text: String,
47    accumulated_usage: UsageData,
48    has_usage: bool,
49    cwd: Option<String>,
50    model: Option<String>,
51    seen_user_message: bool,
52    seen_usage_delta: bool,
53    prompt: Option<String>,
54}
55
56impl NormalizeState {
57    fn accumulate_usage(&mut self, usage: &UsageData) {
58        self.has_usage = true;
59        if let Some(v) = usage.input_tokens {
60            *self.accumulated_usage.input_tokens.get_or_insert(0) += v;
61        }
62        if let Some(v) = usage.output_tokens {
63            *self.accumulated_usage.output_tokens.get_or_insert(0) += v;
64        }
65        if let Some(v) = usage.cache_read_tokens {
66            *self.accumulated_usage.cache_read_tokens.get_or_insert(0) += v;
67        }
68        if let Some(v) = usage.cache_creation_tokens {
69            *self.accumulated_usage.cache_creation_tokens.get_or_insert(0) += v;
70        }
71        if let Some(v) = usage.cost_usd {
72            *self.accumulated_usage.cost_usd.get_or_insert(0.0) += v;
73        }
74    }
75
76    /// Synthesize a user message event with the stored prompt.
77    fn make_user_message(&self, timestamp_ms: u64) -> Event {
78        Event::Message(MessageEvent {
79            role: Role::User,
80            text: self.prompt.clone().unwrap_or_default(),
81            usage: None,
82            timestamp_ms,
83        })
84    }
85
86    /// Maybe prepend a synthetic user message before the given event.
87    /// Returns the event(s) to emit.
88    fn maybe_prepend_user_message(&mut self, event: Event, timestamp_ms: u64) -> Vec<Event> {
89        if !self.seen_user_message && self.prompt.is_some() {
90            self.seen_user_message = true;
91            vec![self.make_user_message(timestamp_ms), event]
92        } else {
93            vec![event]
94        }
95    }
96
97    fn enrich(&mut self, event: Event) -> Vec<Event> {
98        match event {
99            Event::SessionStart(mut e) => {
100                self.session_id = e.session_id.clone();
101                self.start_timestamp_ms = e.timestamp_ms;
102
103                if e.model.is_none() {
104                    e.model = self.model.clone();
105                }
106                if e.cwd.is_none() {
107                    e.cwd = self.cwd.clone();
108                }
109
110                // SessionStart itself is never preceded by a user message —
111                // the user message goes after it.
112                vec![Event::SessionStart(e)]
113            }
114            Event::Message(ref e) if e.role == Role::User => {
115                self.seen_user_message = true;
116                vec![event]
117            }
118            Event::Message(ref e) if e.role == Role::Assistant && !e.text.is_empty() => {
119                self.last_assistant_text = e.text.clone();
120                let ts = e.timestamp_ms;
121                self.maybe_prepend_user_message(event, ts)
122            }
123            Event::UsageDelta(ref e) => {
124                self.seen_usage_delta = true;
125                self.accumulate_usage(&e.usage);
126                let ts = e.timestamp_ms;
127                self.maybe_prepend_user_message(event, ts)
128            }
129            Event::Result(mut e) => {
130                // Fill text from last assistant message if empty.
131                if e.text.is_empty() && !self.last_assistant_text.is_empty() {
132                    e.text = self.last_assistant_text.clone();
133                }
134                // Fill session_id if empty.
135                if e.session_id.is_empty() && !self.session_id.is_empty() {
136                    e.session_id = self.session_id.clone();
137                }
138                // Compute duration from timestamps if not set.
139                if e.duration_ms.is_none() && self.start_timestamp_ms > 0 && e.timestamp_ms > 0 {
140                    e.duration_ms = Some(e.timestamp_ms.saturating_sub(self.start_timestamp_ms));
141                }
142                // Fill usage from accumulated deltas if not set.
143                if e.usage.is_none() && self.has_usage {
144                    e.usage = Some(self.accumulated_usage.clone());
145                }
146                // Fill total_cost_usd from accumulated usage cost if not set.
147                if e.total_cost_usd.is_none() {
148                    if let Some(ref usage) = e.usage {
149                        if let Some(cost) = usage.cost_usd {
150                            e.total_cost_usd = Some(cost);
151                        }
152                    }
153                }
154
155                let ts = e.timestamp_ms;
156                let result_event = Event::Result(e);
157
158                // Maybe prepend user message.
159                let mut events = self.maybe_prepend_user_message(result_event, ts);
160
161                // Synthesize UsageDelta before Result if none was seen.
162                if !self.seen_usage_delta {
163                    // Extract usage from the Result event (it's the last in events).
164                    if let Some(Event::Result(ref r)) = events.last() {
165                        if let Some(ref usage) = r.usage {
166                            let synthetic_usage = Event::UsageDelta(UsageDeltaEvent {
167                                usage: usage.clone(),
168                                timestamp_ms: ts,
169                            });
170                            // Insert before the last element (the Result).
171                            let Some(result_ev) = events.pop() else {
172                                return events;
173                            };
174                            events.push(synthetic_usage);
175                            events.push(result_ev);
176                        }
177                    }
178                }
179
180                events
181            }
182            other => {
183                let ts = match &other {
184                    Event::TextDelta(e) => e.timestamp_ms,
185                    Event::ToolStart(e) => e.timestamp_ms,
186                    Event::ToolEnd(e) => e.timestamp_ms,
187                    Event::Error(e) => e.timestamp_ms,
188                    _ => 0,
189                };
190                self.maybe_prepend_user_message(other, ts)
191            }
192        }
193    }
194}
195
#[cfg(test)]
mod tests {
    use super::*;
    use crate::event::*;
    use futures::StreamExt;

    // ─── Fixtures ─────────────────────────────────────────────────

    /// Wraps a list of events in an infallible `EventStream`.
    fn make_stream(events: Vec<Event>) -> EventStream {
        Box::pin(futures::stream::iter(events.into_iter().map(Ok)))
    }

    /// Config with no fallbacks and no prompt.
    fn no_fallbacks() -> NormalizeConfig {
        NormalizeConfig { cwd: None, model: None, prompt: None }
    }

    /// Config that only carries a prompt.
    fn prompt_only(prompt: &'static str) -> NormalizeConfig {
        NormalizeConfig { cwd: None, model: None, prompt: Some(prompt.into()) }
    }

    /// Normalizes `events` and collects the output, unwrapping every item.
    async fn run(events: Vec<Event>, config: NormalizeConfig) -> Vec<Event> {
        normalize_stream(make_stream(events), config)
            .map(|item| item.unwrap())
            .collect()
            .await
    }

    /// `SessionStart` for session "s1" with no model and no cwd.
    fn session_start(agent: &'static str, timestamp_ms: u64) -> Event {
        Event::SessionStart(SessionStartEvent {
            session_id: "s1".into(),
            agent: agent.into(),
            model: None,
            cwd: None,
            timestamp_ms,
        })
    }

    /// `Message` with the given role and text, and no usage.
    fn message(role: Role, text: &'static str, timestamp_ms: u64) -> Event {
        Event::Message(MessageEvent {
            role,
            text: text.into(),
            usage: None,
            timestamp_ms,
        })
    }

    /// Successful result for session "s1" with text "done" and every optional
    /// field unset; tests override individual fields via struct-update syntax.
    fn result_template(timestamp_ms: u64) -> ResultEvent {
        ResultEvent {
            success: true,
            text: "done".into(),
            session_id: "s1".into(),
            duration_ms: None,
            total_cost_usd: None,
            usage: None,
            timestamp_ms,
        }
    }

    /// Usage record with every counter unset.
    fn empty_usage() -> UsageData {
        UsageData {
            input_tokens: None,
            output_tokens: None,
            cache_read_tokens: None,
            cache_creation_tokens: None,
            cost_usd: None,
        }
    }

    /// `UsageDelta` event wrapping `usage`.
    fn usage_delta(usage: UsageData, timestamp_ms: u64) -> Event {
        Event::UsageDelta(UsageDeltaEvent { usage, timestamp_ms })
    }

    // ─── SessionStart enrichment ──────────────────────────────────

    #[tokio::test]
    async fn session_start_fills_missing_model_and_cwd() {
        let config = NormalizeConfig {
            cwd: Some("/home/user".into()),
            model: Some("gpt-5-codex".into()),
            prompt: None,
        };

        let mut stream = normalize_stream(make_stream(vec![session_start("codex", 1000)]), config);

        match stream.next().await.unwrap().unwrap() {
            Event::SessionStart(e) => {
                assert_eq!(e.model, Some("gpt-5-codex".into()));
                assert_eq!(e.cwd, Some("/home/user".into()));
            }
            _ => panic!("expected SessionStart"),
        }
    }

    #[tokio::test]
    async fn session_start_preserves_existing_model_and_cwd() {
        let input = vec![Event::SessionStart(SessionStartEvent {
            session_id: "s1".into(),
            agent: "claude".into(),
            model: Some("claude-opus-4-6".into()),
            cwd: Some("/original".into()),
            timestamp_ms: 1000,
        })];
        let config = NormalizeConfig {
            cwd: Some("/fallback".into()),
            model: Some("fallback-model".into()),
            prompt: None,
        };

        let mut stream = normalize_stream(make_stream(input), config);

        match stream.next().await.unwrap().unwrap() {
            Event::SessionStart(e) => {
                assert_eq!(e.model, Some("claude-opus-4-6".into()));
                assert_eq!(e.cwd, Some("/original".into()));
            }
            _ => panic!("expected SessionStart"),
        }
    }

    // ─── Result enrichment ────────────────────────────────────────

    #[tokio::test]
    async fn result_text_filled_from_last_assistant_message() {
        let input = vec![
            session_start("codex", 1000),
            message(Role::Assistant, "Hello from codex!", 1500),
            Event::Result(ResultEvent {
                text: String::new(),
                session_id: String::new(),
                ..result_template(2000)
            }),
        ];

        // No prompt → no synthetic user message, indices unchanged.
        let collected = run(input, no_fallbacks()).await;

        let Event::Result(r) = &collected[2] else {
            panic!("expected Result");
        };
        assert_eq!(r.text, "Hello from codex!");
        assert_eq!(r.session_id, "s1");
        assert_eq!(r.duration_ms, Some(1000));
    }

    #[tokio::test]
    async fn result_duration_computed_from_timestamps() {
        let input = vec![
            session_start("opencode", 5000),
            Event::Result(result_template(8000)),
        ];

        let collected = run(input, no_fallbacks()).await;

        let Event::Result(r) = &collected[1] else {
            panic!("expected Result");
        };
        assert_eq!(r.duration_ms, Some(3000));
    }

    #[tokio::test]
    async fn result_preserves_existing_duration() {
        let input = vec![
            session_start("claude", 1000),
            Event::Result(ResultEvent {
                duration_ms: Some(999),
                ..result_template(5000)
            }),
        ];

        let collected = run(input, no_fallbacks()).await;

        let Event::Result(r) = &collected[1] else {
            panic!("expected Result");
        };
        assert_eq!(r.duration_ms, Some(999));
    }

    #[tokio::test]
    async fn result_usage_filled_from_accumulated_deltas() {
        let input = vec![
            session_start("codex", 1000),
            usage_delta(
                UsageData {
                    input_tokens: Some(100),
                    output_tokens: Some(50),
                    cost_usd: Some(0.01),
                    ..empty_usage()
                },
                1500,
            ),
            usage_delta(
                UsageData {
                    input_tokens: Some(200),
                    output_tokens: Some(75),
                    cost_usd: Some(0.02),
                    ..empty_usage()
                },
                1800,
            ),
            Event::Result(result_template(2000)),
        ];

        let collected = run(input, no_fallbacks()).await;

        let Event::Result(r) = &collected[3] else {
            panic!("expected Result");
        };
        let usage = r.usage.as_ref().unwrap();
        assert_eq!(usage.input_tokens, Some(300));
        assert_eq!(usage.output_tokens, Some(125));
        assert!((usage.cost_usd.unwrap() - 0.03).abs() < 1e-10);
        // total_cost_usd should be filled from accumulated cost.
        assert!((r.total_cost_usd.unwrap() - 0.03).abs() < 1e-10);
    }

    #[tokio::test]
    async fn result_preserves_existing_usage() {
        let existing_usage = UsageData {
            input_tokens: Some(999),
            output_tokens: Some(888),
            cost_usd: Some(0.99),
            ..empty_usage()
        };

        let input = vec![
            usage_delta(
                UsageData {
                    input_tokens: Some(100),
                    output_tokens: Some(50),
                    ..empty_usage()
                },
                1500,
            ),
            Event::Result(ResultEvent {
                duration_ms: Some(500),
                usage: Some(existing_usage.clone()),
                ..result_template(2000)
            }),
        ];

        let collected = run(input, no_fallbacks()).await;

        let Event::Result(r) = &collected[1] else {
            panic!("expected Result");
        };
        assert_eq!(r.usage, Some(existing_usage));
    }

    // ─── Pass-through behaviour ───────────────────────────────────

    #[tokio::test]
    async fn passthrough_events_unchanged() {
        let input = vec![
            Event::TextDelta(TextDeltaEvent {
                text: "hello".into(),
                timestamp_ms: 1000,
            }),
            Event::ToolStart(ToolStartEvent {
                call_id: "c1".into(),
                tool_name: "read".into(),
                input: None,
                timestamp_ms: 1100,
            }),
            Event::ToolEnd(ToolEndEvent {
                call_id: "c1".into(),
                tool_name: "read".into(),
                success: true,
                output: Some("content".into()),
                usage: None,
                timestamp_ms: 1200,
            }),
            Event::Error(ErrorEvent {
                message: "oops".into(),
                code: None,
                timestamp_ms: 1300,
            }),
        ];
        let expected = input.clone();

        let collected = run(input, no_fallbacks()).await;

        assert_eq!(collected, expected);
    }

    #[tokio::test]
    async fn errors_pass_through_stream() {
        let items: Vec<crate::Result<Event>> = vec![
            Ok(Event::TextDelta(TextDeltaEvent {
                text: "hi".into(),
                timestamp_ms: 1000,
            })),
            Err(crate::Error::Other("test error".into())),
        ];
        let raw: EventStream = Box::pin(futures::stream::iter(items));

        let mut stream = normalize_stream(raw, no_fallbacks());

        let first = stream.next().await.unwrap();
        assert!(first.is_ok());

        let second = stream.next().await.unwrap();
        assert!(second.is_err());
    }

    // ─── New round-2 tests ────────────────────────────────────────

    #[tokio::test]
    async fn user_message_synthesized_after_session_start() {
        let input = vec![
            session_start("codex", 1000),
            message(Role::Assistant, "Hello!", 1500),
            Event::Result(ResultEvent {
                text: "Hello!".into(),
                duration_ms: Some(500),
                ..result_template(2000)
            }),
        ];

        let collected = run(input, prompt_only("say hello")).await;

        // SessionStart, Message(user), Message(assistant), Result
        assert_eq!(collected.len(), 4, "events: {collected:?}");
        assert!(matches!(&collected[0], Event::SessionStart(_)));
        let Event::Message(m) = &collected[1] else {
            panic!("expected synthetic user Message at [1], got {:?}", collected[1]);
        };
        assert_eq!(m.role, Role::User);
        assert_eq!(m.text, "say hello");
        assert_eq!(m.timestamp_ms, 1500);
        assert!(matches!(&collected[2], Event::Message(m) if m.role == Role::Assistant));
        assert!(matches!(&collected[3], Event::Result(_)));
    }

    #[tokio::test]
    async fn user_message_not_duplicated_when_adapter_sends_one() {
        let input = vec![
            session_start("cursor", 1000),
            message(Role::User, "say hello", 1200),
            message(Role::Assistant, "Hello!", 1500),
            Event::Result(ResultEvent {
                text: "Hello!".into(),
                duration_ms: Some(500),
                ..result_template(2000)
            }),
        ];

        let collected = run(input, prompt_only("say hello")).await;

        // Should NOT inject a second user message.
        let user_messages: Vec<_> = collected
            .iter()
            .filter(|e| matches!(e, Event::Message(m) if m.role == Role::User))
            .collect();
        assert_eq!(user_messages.len(), 1, "expected exactly 1 user message, got {user_messages:?}");
    }

    #[tokio::test]
    async fn user_message_not_injected_without_prompt() {
        let input = vec![
            session_start("codex", 1000),
            message(Role::Assistant, "Hello!", 1500),
        ];

        let collected = run(input, no_fallbacks()).await;

        // No prompt → no user message injected.
        assert_eq!(collected.len(), 2);
        assert!(matches!(&collected[0], Event::SessionStart(_)));
        assert!(matches!(&collected[1], Event::Message(m) if m.role == Role::Assistant));
    }

    #[tokio::test]
    async fn total_cost_filled_from_accumulated_usage() {
        let input = vec![
            usage_delta(
                UsageData {
                    input_tokens: Some(100),
                    output_tokens: Some(50),
                    cost_usd: Some(0.05),
                    ..empty_usage()
                },
                1000,
            ),
            Event::Result(ResultEvent {
                duration_ms: Some(500),
                ..result_template(2000)
            }),
        ];

        let collected = run(input, no_fallbacks()).await;

        let Event::Result(r) = &collected[1] else {
            panic!("expected Result");
        };
        assert!((r.total_cost_usd.unwrap() - 0.05).abs() < 1e-10);
    }

    #[tokio::test]
    async fn usage_delta_synthesized_before_result() {
        // Stream has no UsageDelta events, but Result has usage data.
        let input = vec![
            session_start("claude", 1000),
            Event::Result(ResultEvent {
                duration_ms: Some(1000),
                total_cost_usd: Some(0.01),
                usage: Some(UsageData {
                    input_tokens: Some(200),
                    output_tokens: Some(100),
                    cost_usd: Some(0.01),
                    ..empty_usage()
                }),
                ..result_template(2000)
            }),
        ];

        let collected = run(input, no_fallbacks()).await;

        // SessionStart, synthetic UsageDelta, Result
        assert_eq!(collected.len(), 3, "events: {collected:?}");
        assert!(matches!(&collected[0], Event::SessionStart(_)));
        let Event::UsageDelta(u) = &collected[1] else {
            panic!("expected synthetic UsageDelta at [1], got {:?}", collected[1]);
        };
        assert_eq!(u.usage.input_tokens, Some(200));
        assert_eq!(u.usage.output_tokens, Some(100));
        assert!(matches!(&collected[2], Event::Result(_)));
    }

    #[tokio::test]
    async fn no_synthetic_usage_delta_when_already_present() {
        let input = vec![
            session_start("codex", 1000),
            usage_delta(
                UsageData {
                    input_tokens: Some(100),
                    output_tokens: Some(50),
                    ..empty_usage()
                },
                1500,
            ),
            Event::Result(ResultEvent {
                duration_ms: Some(1000),
                ..result_template(2000)
            }),
        ];

        let collected = run(input, no_fallbacks()).await;

        // Should be exactly: SessionStart, UsageDelta, Result — no extra UsageDelta.
        let usage_deltas: Vec<_> = collected
            .iter()
            .filter(|e| matches!(e, Event::UsageDelta(_)))
            .collect();
        assert_eq!(usage_deltas.len(), 1, "expected exactly 1 UsageDelta, got {usage_deltas:?}");
    }
}