Skip to main content

ai_lib_rust/client/
types.rs

1use crate::client::signals::SignalsSnapshot;
2use crate::types::events::StreamingEvent;
3use crate::Result;
4use futures::stream;
5use futures::stream::Stream;
6use std::pin::Pin;
7use std::task::{Context, Poll};
8use tokio::sync::{oneshot, OwnedSemaphorePermit};
9
10/// Per-call statistics for observability and model selection.
11#[derive(Debug, Clone, Default)]
12pub struct CallStats {
13    pub model: String,
14    pub operation: String,
15    pub endpoint: String,
16    pub http_status: u16,
17    pub retry_count: u32,
18    pub duration_ms: u128,
19    /// For streaming calls: time from request start to first emitted event (best-effort).
20    pub first_event_ms: Option<u128>,
21    /// For streaming calls: whether any event was emitted to the caller.
22    pub emitted_any: bool,
23    /// Always present; generated by the runtime for linkage.
24    pub client_request_id: String,
25    /// Best-effort request identifier from provider/edge headers (if available).
26    pub upstream_request_id: Option<String>,
27    pub error_class: Option<String>,
28    pub usage: Option<serde_json::Value>,
29    /// Snapshot of runtime signals captured for this call.
30    pub signals: SignalsSnapshot,
31}
32
33/// Streaming response cancel handle.
34pub struct CancelHandle {
35    sender: Option<oneshot::Sender<()>>,
36}
37
38impl CancelHandle {
39    pub fn cancel(mut self) {
40        if let Some(sender) = self.sender.take() {
41            let _ = sender.send(());
42        }
43    }
44}
45
46pub(crate) fn cancel_pair() -> (CancelHandle, oneshot::Receiver<()>) {
47    let (tx, rx) = oneshot::channel();
48    (CancelHandle { sender: Some(tx) }, rx)
49}
50
51pub(crate) struct ControlledStream {
52    pub inner: Pin<Box<dyn Stream<Item = Result<StreamingEvent>> + Send + 'static>>,
53    pub cancel_rx: Option<oneshot::Receiver<()>>,
54    pub permit: Option<OwnedSemaphorePermit>,
55    cancelled: bool,
56}
57
58impl ControlledStream {
59    pub fn new(
60        inner: Pin<Box<dyn Stream<Item = Result<StreamingEvent>> + Send + 'static>>,
61        cancel_rx: Option<oneshot::Receiver<()>>,
62        permit: Option<OwnedSemaphorePermit>,
63    ) -> Self {
64        Self {
65            inner,
66            cancel_rx,
67            permit,
68            cancelled: false,
69        }
70    }
71}
72
73impl Stream for ControlledStream {
74    type Item = Result<StreamingEvent>;
75
76    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
77        // 1) Check cancellation (emit StreamEnd once, then terminate)
78        if !self.cancelled {
79            if let Some(ref mut cancel_rx) = self.cancel_rx {
80                match std::future::Future::poll(Pin::new(cancel_rx), cx) {
81                    Poll::Ready(Ok(())) => {
82                        self.cancel_rx = None;
83                        self.cancelled = true;
84                        // Strong cancellation:
85                        // - Drop the inner stream to stop network reads ASAP.
86                        // - Release backpressure permit immediately.
87                        self.inner = Box::pin(stream::empty());
88                        self.permit = None;
89                        return Poll::Ready(Some(Ok(StreamingEvent::StreamEnd {
90                            finish_reason: Some("cancelled".to_string()),
91                        })));
92                    }
93                    Poll::Ready(Err(_)) => {
94                        // Sender was dropped without sending; ignore it but log trace for debugging.
95                        tracing::trace!("ControlledStream: cancel_handle sender dropped without explicit signal; ignoring.");
96                        self.cancel_rx = None;
97                    }
98                    Poll::Pending => {}
99                }
100            }
101        }
102
103        // 2) Poll inner stream
104        match self.inner.as_mut().poll_next(cx) {
105            Poll::Ready(Some(Ok(StreamingEvent::StreamEnd { finish_reason }))) => {
106                // Stream finished normally; release permit early.
107                self.permit = None;
108                Poll::Ready(Some(Ok(StreamingEvent::StreamEnd { finish_reason })))
109            }
110            Poll::Ready(None) => {
111                // Underlying stream ended; release permit early.
112                self.permit = None;
113                Poll::Ready(None)
114            }
115            other => other,
116        }
117    }
118}