localharness 0.26.0

A Rust-native agent SDK with pluggable LLM backends (Gemini today). Streaming, custom tools, safety policies, background triggers — zero external binaries.
Documentation
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
//! Stateful conversation session.
//!
//! Wraps a `Connection` and provides:
//!
//! * `send()` / `receive_steps()` — low-level streaming.
//! * `chat()` — one-shot: send a prompt, receive a `ChatResponse` whose
//!   stream of `StreamChunk` events terminates at the end of a turn.
//! * `history()` / `last_response()` / `cumulative_usage()` — introspection.
//!
//! `ChatResponse` is a multi-cursor lazy stream. Every call to
//! `ChatResponse::chunks()` returns a fresh cursor that replays from chunk
//! zero, in the same vein as the Python SDK's per-cursor design. The
//! upstream pull happens once; cursors share the buffered chunks.

use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll};

use futures_core::Stream;
use futures_util::stream::StreamExt;
use parking_lot::Mutex;
use tokio::sync::Notify;

use crate::connections::Connection;
use crate::content::Content;
use crate::error::{Error, Result};
use crate::types::{Step, StreamChunk, ToolCall, UsageMetadata};

// =============================================================================
// Conversation
// =============================================================================

/// Stateful conversation session wrapping a [`Connection`].
///
/// Provides `chat()` for turn-level semantics and `send()` / `receive_steps()`
/// for lower-level streaming. Tracks history, usage, and structured output.
pub struct Conversation {
    /// Backend transport that every send and step subscription goes through.
    connection: Arc<dyn Connection>,
    /// Session bookkeeping, shared with the step streams returned by
    /// `receive_steps()` and with the producer tasks behind `ChatResponse`s.
    state: Arc<Mutex<ConversationState>>,
}

/// Mutable session bookkeeping, kept behind `Arc<Mutex<_>>` so that a
/// [`Conversation`], the step streams it hands out, and the producer tasks
/// of its [`ChatResponse`]s can all update it.
#[derive(Default)]
struct ConversationState {
    /// Turns counted by `Conversation::chat()`.
    turn_count: u64,
    /// Every step observed through `receive_steps()`, in arrival order.
    history: Vec<Step>,
    /// Usage summed over every step observed so far.
    cumulative_usage: UsageMetadata,
    /// Usage for the current / most recent turn; reset by `send()`.
    last_turn_usage: Option<UsageMetadata>,
    /// Text of the most recent turn-terminating response.
    last_response: Option<String>,
    /// Most recent structured (JSON) output carried by any step.
    last_structured_output: Option<serde_json::Value>,
}

impl Conversation {
    /// Wrap `connection` in a new session with default-initialized
    /// bookkeeping (empty history, zero recorded turns).
    pub fn new(connection: Arc<dyn Connection>) -> Self {
        Self {
            connection,
            state: Arc::new(Mutex::new(ConversationState::default())),
        }
    }

    /// Clone of the underlying connection handle.
    pub fn connection(&self) -> Arc<dyn Connection> {
        Arc::clone(&self.connection)
    }

    /// The backend-assigned conversation identifier.
    pub fn conversation_id(&self) -> String {
        self.connection.conversation_id().to_string()
    }

    /// Cooperatively cancel the in-flight turn (e.g. a UI stop button).
    /// The backend stops at its next safe boundary and emits a terminal
    /// step. No-op when idle or on backends without cancellation support.
    pub fn cancel_turn(&self) {
        self.connection.cancel_turn();
    }

    /// All steps received so far, in order.
    pub fn history(&self) -> Vec<Step> {
        self.state.lock().history.clone()
    }

    /// Number of user turns successfully dispatched via `chat()`.
    pub fn turn_count(&self) -> u64 {
        self.state.lock().turn_count
    }

    /// Token usage accumulated across all turns.
    pub fn cumulative_usage(&self) -> UsageMetadata {
        self.state.lock().cumulative_usage.clone()
    }

    /// Token usage from the most recent turn only.
    pub fn last_turn_usage(&self) -> Option<UsageMetadata> {
        self.state.lock().last_turn_usage.clone()
    }

    /// The model's last textual response, if any.
    pub fn last_response(&self) -> Option<String> {
        self.state.lock().last_response.clone()
    }

    /// The model's last structured output (JSON), if any.
    pub fn last_structured_output(&self) -> Option<serde_json::Value> {
        self.state.lock().last_structured_output.clone()
    }

    /// Raw send: dispatches the prompt and returns once the bytes are on
    /// the wire. Use `chat()` for higher-level turn semantics.
    ///
    /// Resets the per-turn usage counter before dispatching.
    ///
    /// # Errors
    /// Propagates any error returned by the connection's `send`.
    pub async fn send(&self, content: Content) -> Result<()> {
        {
            // Scoped so the guard is released before the await below.
            let mut state = self.state.lock();
            state.last_turn_usage = Some(UsageMetadata::default());
        }
        self.connection.send(content).await
    }

    /// Drains steps from the connection, accumulating into history and
    /// usage as they arrive. The stream terminates when the connection
    /// closes — callers wanting per-turn termination should use `chat()`.
    ///
    /// NOTE(review): each returned stream records into the shared history
    /// on its own. If `subscribe_steps` fans the same steps out to several
    /// live subscribers, those steps are recorded once per subscriber. Confirm
    /// against the connection implementations.
    pub fn receive_steps(&self) -> crate::connections::StepStream {
        let upstream = self.connection.subscribe_steps();
        let state = Arc::clone(&self.state);
        let mapped = upstream.map(move |res| {
            if let Ok(step) = &res {
                let mut s = state.lock();
                s.history.push(step.clone());
                if let Some(u) = &step.usage_metadata {
                    s.cumulative_usage.accumulate(u);
                    // Starts a fresh per-turn tally when `send()` was never
                    // called (e.g. steps arriving without a local prompt).
                    s.last_turn_usage
                        .get_or_insert_with(UsageMetadata::default)
                        .accumulate(u);
                }
                if step.is_terminal_response() {
                    s.last_response = Some(step.content.clone());
                }
                if let Some(out) = &step.structured_output {
                    s.last_structured_output = Some(out.clone());
                }
            }
            res
        });
        // BoxStream requires Send; wasm fetch streams aren't.
        #[cfg(not(target_arch = "wasm32"))]
        {
            mapped.boxed()
        }
        #[cfg(target_arch = "wasm32")]
        {
            mapped.boxed_local()
        }
    }

    /// Sends a prompt and returns the response stream. The returned
    /// `ChatResponse` produces `StreamChunk` events until the turn ends.
    ///
    /// # Errors
    /// Returns the connection's send error. In that case the turn is not
    /// counted in [`Conversation::turn_count`].
    pub async fn chat(&self, content: impl Into<Content>) -> Result<ChatResponse> {
        // Subscribe BEFORE sending so the producer doesn't miss the first
        // step in the rare case the harness responds before we register.
        let steps = self.receive_steps();
        {
            let mut s = self.state.lock();
            s.turn_count = s.turn_count.saturating_add(1);
        }
        if let Err(err) = self.send(content.into()).await {
            // The prompt never made it out, so undo the optimistic count.
            let mut s = self.state.lock();
            s.turn_count = s.turn_count.saturating_sub(1);
            return Err(err);
        }
        Ok(ChatResponse::new(steps, Arc::clone(&self.state)))
    }
}

// =============================================================================
// ChatResponse
// =============================================================================

/// A streaming response from a single chat turn.
///
/// Multi-cursor: each call to [`ChatResponse::chunks`] returns an independent
/// cursor that replays from chunk zero. The upstream pull happens once.
pub struct ChatResponse {
    /// Chunk buffer and wakeup channel, shared with the producer task spawned
    /// in `ChatResponse::new` and with every cursor handed out by `chunks()`.
    inner: Arc<ChatInner>,
}

/// State shared between one turn's producer task and all of its cursors.
struct ChatInner {
    /// Fired with `notify_waiters()` whenever `state` changes, so parked
    /// cursors re-check the buffer.
    notify: Notify,
    /// Buffered chunks plus the end-of-turn status.
    state: Mutex<ChatBuf>,
}

/// Append-only chunk log for one turn, plus its completion status.
struct ChatBuf {
    /// Set once the producer will append nothing more.
    done: bool,
    /// Upstream error message when the turn ended in failure; consulted by
    /// cursors only after `done` is set.
    error: Option<String>,
    /// Every chunk produced so far, in order. Cursors index into this.
    chunks: Vec<StreamChunk>,
}

impl ChatResponse {
    fn new(
        mut step_stream: crate::connections::StepStream,
        conv_state: Arc<Mutex<ConversationState>>,
    ) -> Self {
        let inner = Arc::new(ChatInner {
            state: Mutex::new(ChatBuf {
                chunks: Vec::new(),
                done: false,
                error: None,
            }),
            notify: Notify::new(),
        });
        let inner_clone = inner.clone();
        crate::runtime::spawn(async move {
            let mut emitted_text = String::new();
            while let Some(step) = step_stream.next().await {
                match step {
                    Ok(step) => {
                        let mut new_chunks = step_to_chunks(&step, emitted_text.len());
                        for chunk in &new_chunks {
                            if let StreamChunk::Text { text, .. } = chunk {
                                emitted_text.push_str(text);
                            }
                        }
                        if !new_chunks.is_empty() {
                            let mut buf = inner_clone.state.lock();
                            buf.chunks.append(&mut new_chunks);
                            drop(buf);
                            inner_clone.notify.notify_waiters();
                        }
                        if step.is_terminal_response() {
                            let mut s = conv_state.lock();
                            let final_text = if !step.content.is_empty() {
                                step.content.clone()
                            } else {
                                emitted_text.clone()
                            };
                            if !final_text.is_empty() {
                                s.last_response = Some(final_text);
                            }
                            break;
                        }
                    }
                    Err(e) => {
                        let mut buf = inner_clone.state.lock();
                        buf.error = Some(e.to_string());
                        buf.done = true;
                        drop(buf);
                        inner_clone.notify.notify_waiters();
                        return;
                    }
                }
            }
            let mut buf = inner_clone.state.lock();
            buf.done = true;
            drop(buf);
            inner_clone.notify.notify_waiters();
        });

        Self { inner }
    }

    /// A fresh cursor that replays every chunk from the start. Multiple
    /// cursors can be live at once and advance independently.
    pub fn chunks(&self) -> ChatCursor {
        ChatCursor {
            inner: self.inner.clone(),
            pos: 0,
            notify: None,
        }
    }

    /// Filtered cursor that yields only conversational text deltas.
    pub fn text_stream(&self) -> futures_util::stream::BoxStream<'static, Result<String>> {
        self.chunks()
            .filter_map(|res| async move {
                match res {
                    Ok(StreamChunk::Text { text, .. }) => Some(Ok(text)),
                    Ok(_) => None,
                    Err(e) => Some(Err(e)),
                }
            })
            .boxed()
    }

    /// Filtered cursor that yields only thought (reasoning) deltas.
    pub fn thoughts(&self) -> futures_util::stream::BoxStream<'static, Result<String>> {
        self.chunks()
            .filter_map(|res| async move {
                match res {
                    Ok(StreamChunk::Thought { text, .. }) => Some(Ok(text)),
                    Ok(_) => None,
                    Err(e) => Some(Err(e)),
                }
            })
            .boxed()
    }

    /// Filtered cursor that yields strongly-typed `ToolCall`s as the model
    /// dispatches them.
    pub fn tool_calls(&self) -> futures_util::stream::BoxStream<'static, Result<ToolCall>> {
        self.chunks()
            .filter_map(|res| async move {
                match res {
                    Ok(StreamChunk::ToolCall(t)) => Some(Ok(t)),
                    Ok(_) => None,
                    Err(e) => Some(Err(e)),
                }
            })
            .boxed()
    }

    /// Drain the stream and return the full concatenated text response.
    pub async fn text(&self) -> Result<String> {
        let mut out = String::new();
        let mut cursor = self.chunks();
        while let Some(res) = cursor.next().await {
            if let StreamChunk::Text { text, .. } = res? {
                out.push_str(&text);
            }
        }
        Ok(out)
    }

    /// Drain the stream and return every chunk in order.
    pub async fn resolve(&self) -> Result<Vec<StreamChunk>> {
        let mut cursor = self.chunks();
        let mut out = Vec::new();
        while let Some(res) = cursor.next().await {
            out.push(res?);
        }
        Ok(out)
    }
}

// =============================================================================
// Cursor
// =============================================================================

/// An independent cursor over a [`ChatResponse`]'s chunk buffer.
///
/// Implements [`Stream`] of `Result<StreamChunk>`. Multiple cursors
/// can be live concurrently and advance at different rates.
pub struct ChatCursor {
    inner: Arc<ChatInner>,
    pos: usize,
    notify: Option<Pin<Box<dyn std::future::Future<Output = ()> + Send>>>,
}

enum PollDecision {
    Yield(StreamChunk),
    Done,
    Error(String),
    Park,
}

impl Stream for ChatCursor {
    type Item = Result<StreamChunk>;

    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        loop {
            // Register a waiter BEFORE inspecting the buffer. tokio `Notify`
            // only wakes waiters that already exist when `notify_waiters()`
            // fires, so the previous code — which created the `notified()`
            // future AFTER the buffer check — had a lost-wakeup window: a
            // producer append+notify landing between our check and parking could
            // be missed, hanging the cursor at the tail. Creating + polling the
            // waiter first closes that window (tokio's canonical
            // register-then-check pattern). The future is built from a
            // 'static Arc clone so it satisfies Send + 'static.
            if self.notify.is_none() {
                let inner = self.inner.clone();
                self.notify = Some(Box::pin(async move {
                    inner.notify.notified().await;
                }));
            }
            let woke = matches!(
                self.notify.as_mut().unwrap().as_mut().poll(cx),
                Poll::Ready(())
            );
            if woke {
                // Wake consumed — drop it so the next iteration registers a
                // fresh waiter before re-checking.
                self.notify = None;
            }

            let snapshot = {
                let buf = self.inner.state.lock();
                if self.pos < buf.chunks.len() {
                    PollDecision::Yield(buf.chunks[self.pos].clone())
                } else if buf.done {
                    match &buf.error {
                        Some(e) => PollDecision::Error(e.clone()),
                        None => PollDecision::Done,
                    }
                } else {
                    PollDecision::Park
                }
            };
            match snapshot {
                PollDecision::Yield(chunk) => {
                    self.pos += 1;
                    return Poll::Ready(Some(Ok(chunk)));
                }
                PollDecision::Done => return Poll::Ready(None),
                PollDecision::Error(msg) => return Poll::Ready(Some(Err(Error::other(msg)))),
                // Returning Pending only here, where the poll above left a waiter
                // registered (Pending) — so a later notify always wakes us. If we
                // just consumed a wake but found nothing new, loop to re-register.
                PollDecision::Park => {
                    if woke {
                        continue;
                    }
                    return Poll::Pending;
                }
            }
        }
    }
}

// =============================================================================
// Helpers
// =============================================================================

/// Translate a single [`Step`] into the `StreamChunk`s it contributes, in
/// this order: thought delta, text, then tool calls.
///
/// `text_emitted` is the number of text BYTES this turn has already yielded
/// (the producer passes `emitted_text.len()`). It is only consulted for a
/// turn-terminating step with an empty `content_delta`. In that case the
/// not-yet-emitted tail of `step.content` becomes the text chunk, so
/// harnesses that deliver the final text only in `content` still surface it.
fn step_to_chunks(step: &Step, text_emitted: usize) -> Vec<StreamChunk> {
    let step_index = step.step_index;
    let mut chunks = Vec::new();

    if !step.thinking_delta.is_empty() {
        chunks.push(StreamChunk::Thought {
            step_index,
            text: step.thinking_delta.clone(),
        });
    }

    let text = if !step.content_delta.is_empty() {
        Some(step.content_delta.clone())
    } else if step.is_terminal_response() {
        // Byte offset: `str::get` yields `None` (emit nothing) instead of
        // panicking when the offset is past the end or splits a multibyte
        // character.
        step.content
            .get(text_emitted..)
            .filter(|tail| !tail.is_empty())
            .map(str::to_owned)
    } else {
        None
    };
    if let Some(text) = text {
        chunks.push(StreamChunk::Text { step_index, text });
    }

    chunks.extend(step.tool_calls.iter().cloned().map(StreamChunk::ToolCall));
    chunks
}

#[cfg(test)]
mod tests {
    use super::*;

    /// A turn-terminating step carrying `content` and no delta — the path that
    /// triggers tail recovery (`content.get(text_emitted..)`).
    fn terminal_step(content: &str) -> Step {
        serde_json::from_value(serde_json::json!({
            "content": content,
            "is_complete_response": true,
        }))
        .expect("valid Step json")
    }

    /// Concatenate the text of every `StreamChunk::Text` in `chunks`, ignoring
    /// all other chunk kinds (thoughts, tool calls).
    fn recovered_text(chunks: &[StreamChunk]) -> String {
        chunks
            .iter()
            .filter_map(|c| match c {
                StreamChunk::Text { text, .. } => Some(text.as_str()),
                _ => None,
            })
            .collect()
    }

    #[test]
    fn recovery_does_not_panic_on_non_char_boundary_offset() {
        // "héllo": 'é' is two bytes, so byte offset 2 lands MID-char. The old
        // byte slice `content[2..]` panicked; recovery must now degrade safely
        // (no panic, and no corrupt partial-char suffix).
        let step = terminal_step("héllo");
        let chunks = step_to_chunks(&step, 2);
        assert_eq!(
            recovered_text(&chunks),
            "",
            "a non-char-boundary offset must emit no (corrupt) text suffix",
        );
    }

    #[test]
    fn recovery_emits_suffix_on_valid_boundary() {
        // Byte offset 1 is a valid boundary (after 'h'); suffix is "éllo".
        let step = terminal_step("héllo");
        assert_eq!(recovered_text(&step_to_chunks(&step, 1)), "éllo");
    }

    #[test]
    fn recovery_is_noop_when_everything_emitted() {
        let step = terminal_step("hi");
        assert_eq!(recovered_text(&step_to_chunks(&step, 2)), "");
    }

    // =========================================================================
    // Multi-cursor streaming concurrency (ChatResponse + ChatCursor)
    //
    // These drive `ChatResponse::new` directly with a hand-controlled step
    // stream (no live backend). We mirror the codebase's own mock pattern:
    // a tokio mpsc whose receiver is wrapped as a `StepStream` (the backends
    // wrap a broadcast the same way via `tokio_stream::wrappers`). Sending /
    // closing the channel deterministically advances the producer; ordering is
    // driven by single explicit polls or `yield_now` (cooperative, NOT timed)
    // so nothing is sleep-flaky.
    // =========================================================================

    use std::sync::Arc;
    use tokio::sync::mpsc;
    use tokio_stream::wrappers::UnboundedReceiverStream;

    /// Upper bound on any single await in these tests. This is a HANG guard,
    /// not a synchronization primitive: a healthy cursor / turn finishes far
    /// sooner, so only a lost-wakeup (or similar) bug makes it expire — and it
    /// then fails the test instead of hanging the suite.
    const HANG_GUARD: std::time::Duration = std::time::Duration::from_secs(5);

    /// A non-terminal text delta step. `step_to_chunks` turns this into a
    /// single `StreamChunk::Text` (the streaming-delta path).
    fn delta_step(idx: u32, delta: &str) -> Step {
        serde_json::from_value(serde_json::json!({
            "step_index": idx,
            "content_delta": delta,
        }))
        .expect("valid Step json")
    }

    /// Build a `ChatResponse` fed by an mpsc sender the test controls. Steps
    /// pushed onto `tx` flow through the producer task into the shared buffer;
    /// dropping `tx` (or sending `Err`) terminates the stream. Returns the
    /// response plus the sender so the test drives the producer step-by-step.
    fn controlled_response() -> (ChatResponse, mpsc::UnboundedSender<Result<Step>>) {
        let (tx, rx) = mpsc::unbounded_channel::<Result<Step>>();
        let stream: crate::connections::StepStream =
            Box::pin(UnboundedReceiverStream::new(rx));
        let conv_state = Arc::new(Mutex::new(ConversationState::default()));
        (ChatResponse::new(stream, conv_state), tx)
    }

    /// The text payload of a `StreamChunk::Text`, or `None` for any other kind.
    fn text_of(chunk: &StreamChunk) -> Option<&str> {
        match chunk {
            StreamChunk::Text { text, .. } => Some(text.as_str()),
            _ => None,
        }
    }

    /// Poll `fut` exactly once with the current task's waker and hand back the
    /// raw `Poll` without waiting. Used to drive a cursor into its parked state
    /// deterministically (no timers, no yields). A cursor keeps its waiter in
    /// its own `notify` field rather than in the `next()` future, so the cursor
    /// stays parked after the `next()` future passed here is dropped.
    async fn poll_once<F: std::future::Future>(fut: F) -> std::task::Poll<F::Output> {
        let mut fut = Box::pin(fut);
        std::future::poll_fn(|cx| {
            std::task::Poll::Ready(std::future::Future::poll(fut.as_mut(), cx))
        })
        .await
    }

    /// Drain a cursor to completion under [`HANG_GUARD`]. The channel is already
    /// closed before we call this, so a healthy cursor returns immediately;
    /// only a lost-wakeup bug would make the guard expire.
    async fn drain(cursor: &mut ChatCursor) -> Vec<StreamChunk> {
        let mut out = Vec::new();
        loop {
            let next = tokio::time::timeout(HANG_GUARD, cursor.next())
                .await
                .expect("cursor must not hang draining a closed stream");
            match next {
                Some(Ok(chunk)) => out.push(chunk),
                Some(Err(e)) => panic!("unexpected stream error: {e}"),
                None => break,
            }
        }
        out
    }

    /// CONTRACT: every cursor observes ALL chunks, in order — and a cursor
    /// created AFTER the response fully completed still replays from chunk zero.
    #[tokio::test]
    async fn late_cursor_replays_all_chunks_in_order() {
        let (resp, tx) = controlled_response();

        // Drive the whole turn, then close the stream so the producer finishes.
        tx.send(Ok(delta_step(0, "Hello"))).unwrap();
        tx.send(Ok(delta_step(1, ", "))).unwrap();
        tx.send(Ok(delta_step(2, "world"))).unwrap();
        tx.send(Ok(terminal_step("Hello, world!"))).unwrap();
        drop(tx);

        // First cursor drains everything (this also forces the producer to run
        // to completion since the cursor parks until `done`).
        let mut early = resp.chunks();
        let early_chunks = drain(&mut early).await;
        let early_text: String = early_chunks.iter().filter_map(text_of).collect();
        assert_eq!(
            early_text, "Hello, world!",
            "the deltas + recovered terminal tail must concatenate in order"
        );

        // A cursor born long after the turn ended must replay the SAME chunks
        // from the start — no shifted offset, no missed head.
        let mut late = resp.chunks();
        let late_chunks = drain(&mut late).await;
        assert_eq!(
            late_chunks, early_chunks,
            "a late cursor must replay the identical chunk sequence from zero"
        );
    }

    /// CONTRACT: cursors advance INDEPENDENTLY. A cursor parked at the tail
    /// while another races ahead must, once polled again, still see every chunk
    /// from its own position — none dropped, none duplicated, order preserved.
    /// Cursor B is explicitly parked on the empty buffer before anything is
    /// sent, so this exercises B's park-on-`Notify` → wake path (not just a
    /// fresh cursor reading an already-populated buffer).
    #[tokio::test]
    async fn cursors_advance_independently_without_dropping_chunks() {
        let (resp, tx) = controlled_response();

        let mut a = resp.chunks();
        let mut b = resp.chunks();

        // Park B: nothing has been sent, so the buffer is empty and not done —
        // a single poll MUST come back Pending with B's waiter registered.
        assert!(
            poll_once(b.next()).await.is_pending(),
            "b must park on an empty, unfinished buffer"
        );

        // Feed one chunk and let the producer run; cursor A drains exactly that
        // chunk while B stays parked.
        tx.send(Ok(delta_step(0, "one"))).unwrap();
        let a0 = a.next().await.expect("a yields").expect("ok");
        assert_eq!(text_of(&a0), Some("one"));

        // Feed a second chunk. A reads it; B — parked since before EITHER
        // send — must now replay from its own position 0, losing nothing.
        tx.send(Ok(delta_step(1, "two"))).unwrap();
        let a1 = a.next().await.expect("a yields").expect("ok");
        assert_eq!(text_of(&a1), Some("two"));

        let b0 = b.next().await.expect("b yields").expect("ok");
        let b1 = b.next().await.expect("b yields").expect("ok");
        assert_eq!(text_of(&b0), Some("one"), "parked cursor kept chunk 0");
        assert_eq!(text_of(&b1), Some("two"), "parked cursor kept chunk 1");

        // Close out: both terminate cleanly with no extra chunks.
        drop(tx);
        assert!(drain(&mut a).await.is_empty(), "a saw the full tail already");
        assert!(drain(&mut b).await.is_empty(), "b saw the full tail already");
    }

    /// CONTRACT: a mid-stream error terminates the buffer and propagates to
    /// EVERY cursor — both an in-flight one and one created after the fact.
    #[tokio::test]
    async fn error_propagates_to_every_cursor() {
        let (resp, tx) = controlled_response();

        tx.send(Ok(delta_step(0, "partial"))).unwrap();
        tx.send(Err(Error::other("upstream exploded"))).unwrap();
        drop(tx);

        // Cursor created BEFORE we read: sees the one good chunk, then the error.
        let mut a = resp.chunks();
        let first = a.next().await.expect("a yields").expect("ok");
        assert_eq!(text_of(&first), Some("partial"));
        let err = a
            .next()
            .await
            .expect("a yields again")
            .expect_err("must surface the error");
        assert!(
            err.to_string().contains("upstream exploded"),
            "the upstream message must survive, got: {err}"
        );

        // Cursor created AFTER the error landed: replays the good chunk, then
        // the SAME error. Errors aren't swallowed by buffering.
        let mut b = resp.chunks();
        let b0 = b.next().await.expect("b yields").expect("ok");
        assert_eq!(text_of(&b0), Some("partial"));
        let b_err = b
            .next()
            .await
            .expect("b yields again")
            .expect_err("late cursor must also see the error");
        assert!(b_err.to_string().contains("upstream exploded"));
    }

    /// CONTRACT: terminal completion. After the stream ends, a cursor returns
    /// the remaining buffered chunks and THEN completes (`None`) — no hang, no
    /// missed tail. Reading past completion keeps returning `None`.
    #[tokio::test]
    async fn cursor_completes_after_tail_with_no_hang() {
        let (resp, tx) = controlled_response();

        // Two deltas stream "Hi" incrementally. The terminal step carries the
        // CUMULATIVE turn content "Hi there" with no delta of its own, so the
        // producer's tail-recovery emits exactly the un-streamed suffix
        // (" there"). This mirrors a harness that sends the final whole content
        // after the streamed deltas — the very case `text_emitted` exists for.
        tx.send(Ok(delta_step(0, "Hi"))).unwrap();
        tx.send(Ok(terminal_step("Hi there"))).unwrap();
        drop(tx);

        let mut cursor = resp.chunks();
        let chunks = drain(&mut cursor).await;
        let text: String = chunks.iter().filter_map(text_of).collect();
        assert_eq!(text, "Hi there", "streamed delta then the recovered tail");

        // Past completion the cursor is permanently fused to `None`.
        let after = tokio::time::timeout(HANG_GUARD, cursor.next())
            .await
            .expect("polling a completed cursor must not hang");
        assert!(after.is_none(), "a completed cursor stays completed");
    }

    /// CONTRACT (concurrency stress): a cursor that is `.await`-parked at the
    /// live tail in one task must wake and receive a chunk pushed concurrently
    /// from another task. This is the lost-wakeup window — cursor decides to
    /// park, releases the lock, THEN builds its `Notify::notified()` future; if
    /// the producer fires `notify_waiters()` inside that window the cursor can
    /// miss the wake. We run on a multi-thread runtime so the cursor's park
    /// window and the producer's `notify_waiters` can genuinely interleave, and
    /// guard with a timeout so a regression shows as a failure, not a hung suite.
    #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
    async fn parked_cursor_wakes_on_concurrent_push() {
        let (resp, tx) = controlled_response();
        let mut cursor = resp.chunks();

        // Park the cursor on the empty buffer in a spawned task. Awaiting
        // `next()` polls it to Pending (registered as a waiter).
        let handle = tokio::spawn(async move {
            let next = tokio::time::timeout(HANG_GUARD, cursor.next())
                .await
                .expect("a parked cursor must wake on a concurrent push");
            next.expect("yields").expect("ok")
        });

        // Best-effort: yield so the spawned task gets a chance to reach its
        // parked Pending state before we push. On a multi-thread runtime this
        // is NOT a guarantee — if the push wins the race the cursor simply
        // finds the chunk already buffered, which is an equally valid
        // interleaving that must also complete without hanging.
        tokio::task::yield_now().await;
        tokio::task::yield_now().await;

        tx.send(Ok(delta_step(0, "woke"))).unwrap();

        let chunk = handle.await.expect("task joins");
        assert_eq!(text_of(&chunk), Some("woke"));
    }

    // =========================================================================
    // Usage accounting across turns (cumulative_usage / last_turn_usage /
    // turn_count). These drive a real `Conversation` over a mock `Connection`
    // whose step stream is a tokio broadcast — exactly the shape the live
    // backends expose (`GeminiConnection::subscribe_steps` wraps a broadcast).
    // The mock mirrors the live backends' invariant that usage is reported ONLY
    // on the turn-terminal step; intermediate deltas carry `usage_metadata:
    // None`. That invariant is what makes `cumulative_usage.accumulate(u)` in
    // `receive_steps` count each turn's usage exactly once.
    // =========================================================================

    use crate::connections::{Connection, StepStream};
    use crate::content::Content;
    use crate::types::ToolResult;
    use async_trait::async_trait;
    use std::sync::atomic::{AtomicBool, Ordering};
    use tokio::sync::broadcast;

    /// Mock connection: holds a broadcast sender; the test pushes `Step`s onto
    /// it directly to simulate a turn. `subscribe_steps` hands out independent
    /// receivers (late subscribers see only steps sent after they subscribe —
    /// same as the live broadcast).
    struct MockConn {
        steps_tx: broadcast::Sender<Step>,
        /// Reported by `is_idle`; initialized to `true` and never changed by
        /// these tests.
        idle: AtomicBool,
    }

    impl MockConn {
        fn new() -> Arc<Self> {
            let (steps_tx, _) = broadcast::channel(64);
            Arc::new(Self {
                steps_tx,
                idle: AtomicBool::new(true),
            })
        }
        /// Push one step onto the broadcast (what a live turn's producer does).
        /// A send with no live subscribers is ignored on purpose.
        fn push(&self, step: Step) {
            let _ = self.steps_tx.send(step);
        }
    }

    #[async_trait]
    impl Connection for MockConn {
        fn is_idle(&self) -> bool {
            self.idle.load(Ordering::Acquire)
        }
        fn conversation_id(&self) -> &str {
            "mock"
        }
        async fn send(&self, _content: Content) -> Result<()> {
            Ok(())
        }
        async fn send_trigger(&self, _content: String) -> Result<()> {
            Ok(())
        }
        async fn send_tool_results(&self, _results: Vec<ToolResult>) -> Result<()> {
            Ok(())
        }
        fn subscribe_steps(&self) -> StepStream {
            let rx = self.steps_tx.subscribe();
            Box::pin(
                tokio_stream::wrappers::BroadcastStream::new(rx)
                    .map(|r| r.map_err(|e| Error::other(format!("lag: {e}")))),
            )
        }
        async fn wait_for_idle(&self) -> Result<()> {
            Ok(())
        }
        async fn shutdown(&self) -> Result<()> {
            Ok(())
        }
    }

    /// Build a terminal step carrying both `content` and a `usage_metadata`
    /// with the given prompt / candidates / total token counts. This is the
    /// ONLY step in a turn that reports usage (mirrors the live backends).
    fn terminal_with_usage(content: &str, prompt: i32, cand: i32, total: i32) -> Step {
        serde_json::from_value(serde_json::json!({
            "content": content,
            "is_complete_response": true,
            "usage_metadata": {
                "prompt_token_count": prompt,
                "candidates_token_count": cand,
                "total_token_count": total,
            }
        }))
        .expect("valid Step json")
    }

    /// Drive one full turn through a Conversation: subscribe via `chat`, push
    /// the turn's steps, drain the response to completion (forces the producer +
    /// the `receive_steps` accumulator to run over every step).
    async fn run_turn(conv: &Conversation, conn: &MockConn, steps: Vec<Step>) {
        let resp = conv.chat("hi").await.expect("chat starts");
        for s in steps {
            conn.push(s);
        }
        // Draining forces the producer task (and thus the `receive_steps`
        // accumulator closure) to observe every step including the terminal.
        let _ = tokio::time::timeout(HANG_GUARD, resp.text())
            .await
            .expect("turn must terminate")
            .expect("no stream error");
    }

    /// CONTRACT: `turn_count` increments exactly once per `chat`, and
    /// `cumulative_usage` is the SUM across turns while `last_turn_usage`
    /// reflects ONLY the most recent turn (reset at each `send`). This is the
    /// core anti-double-count / anti-leak guarantee.
    #[tokio::test]
    async fn cumulative_sums_while_last_turn_resets() {
        let conn = MockConn::new();
        let conv = Conversation::new(conn.clone());

        // Turn 1: prompt 10, candidates 5, total 15.
        run_turn(
            &conv,
            &conn,
            vec![
                delta_step(0, "ans1"),
                terminal_with_usage("ans1", 10, 5, 15),
            ],
        )
        .await;
        assert_eq!(conv.turn_count(), 1);
        let cum1 = conv.cumulative_usage();
        assert_eq!(cum1.total_token_count, Some(15));
        let last1 = conv.last_turn_usage().expect("turn 1 usage");
        assert_eq!(last1.total_token_count, Some(15));

        // Turn 2: prompt 20, candidates 8, total 28.
        run_turn(
            &conv,
            &conn,
            vec![
                delta_step(0, "ans2"),
                terminal_with_usage("ans2", 20, 8, 28),
            ],
        )
        .await;
        assert_eq!(conv.turn_count(), 2, "turn_count increments once per chat");

        // Cumulative is the SUM of both turns — not the last, not double-counted.
        let cum2 = conv.cumulative_usage();
        assert_eq!(cum2.prompt_token_count, Some(30), "10 + 20");
        assert_eq!(cum2.candidates_token_count, Some(13), "5 + 8");
        assert_eq!(cum2.total_token_count, Some(43), "15 + 28");

        // last_turn_usage was RESET at turn 2's send → it is ONLY turn 2,
        // never turn 1 + turn 2 (the leak we guard against).
        let last2 = conv.last_turn_usage().expect("turn 2 usage");
        assert_eq!(last2.total_token_count, Some(28), "only turn 2, not 43");
        assert_eq!(last2.prompt_token_count, Some(20));
    }

    /// CONTRACT: the turn-terminal usage is counted EXACTLY ONCE even though a
    /// turn emits many steps (deltas + tool-call steps). A regression that
    /// accumulated usage on every step would inflate the total. We model a turn
    /// with several no-usage intermediate steps and a single usage-bearing
    /// terminal — the live backends' exact shape.
    #[tokio::test]
    async fn usage_counted_once_despite_many_steps() {
        let conn = MockConn::new();
        let conv = Conversation::new(conn.clone());
        run_turn(
            &conv,
            &conn,
            vec![
                delta_step(0, "a"),
                delta_step(1, "b"),
                delta_step(2, "c"),
                terminal_with_usage("abc", 100, 50, 150),
            ],
        )
        .await;
        assert_eq!(conv.cumulative_usage().total_token_count, Some(150));
        assert_eq!(
            conv.last_turn_usage().unwrap().total_token_count,
            Some(150),
            "the single terminal usage, counted once"
        );
    }

    /// CONTRACT: `send()` resets `last_turn_usage` to an empty (non-None) usage
    /// BEFORE the turn's steps arrive — so a turn that reports NO usage leaves
    /// `last_turn_usage` as Some(default), distinguishable from "never sent"
    /// (None), while cumulative stays untouched.
    #[tokio::test]
    async fn send_resets_last_turn_even_when_turn_reports_no_usage() {
        let conn = MockConn::new();
        let conv = Conversation::new(conn.clone());

        // A turn WITH usage, then a turn with NONE.
        run_turn(
            &conv,
            &conn,
            vec![terminal_with_usage("first", 10, 5, 15)],
        )
        .await;
        assert_eq!(conv.last_turn_usage().unwrap().total_token_count, Some(15));

        // Turn 2 reports no usage at all (terminal step without usage_metadata).
        run_turn(
            &conv,
            &conn,
            vec![terminal_step("second")],
        )
        .await;

        // last_turn_usage was reset at turn 2's send → empty, NOT turn 1's 15.
        let last = conv.last_turn_usage().expect("send set it to Some(default)");
        assert_eq!(
            last.total_token_count, None,
            "turn 2 reported nothing; last_turn must not leak turn 1's tokens"
        );
        // Cumulative is unchanged from turn 1 (turn 2 added nothing).
        assert_eq!(conv.cumulative_usage().total_token_count, Some(15));
    }

    /// CONTRACT: a raw `send()` (no `chat`) resets `last_turn_usage` to
    /// Some(default) and does NOT touch turn_count (only `chat` bumps it).
    #[tokio::test]
    async fn raw_send_resets_usage_but_not_turn_count() {
        let conn = MockConn::new();
        let conv = Conversation::new(conn.clone());
        assert_eq!(conv.turn_count(), 0);
        assert!(conv.last_turn_usage().is_none());

        conv.send(Content::text("x")).await.expect("send ok");
        assert_eq!(conv.turn_count(), 0, "raw send does not count a turn");
        assert!(
            conv.last_turn_usage().is_some(),
            "send primes last_turn_usage to Some(default)"
        );
        assert_eq!(conv.last_turn_usage().unwrap(), UsageMetadata::default());
    }
}