Skip to main content

ai_lib_rust/client/
types.rs

1use crate::client::signals::SignalsSnapshot;
2use crate::types::events::StreamingEvent;
3use crate::Result;
4use futures::stream;
5use futures::stream::Stream;
6use std::pin::Pin;
7use std::task::{Context, Poll};
8use tokio::sync::{oneshot, OwnedSemaphorePermit};
9
/// Point-in-time copy of the client's cumulative counters, used for monitoring and routing.
///
/// All counters start at zero via [`Default`]. The type is comparable so that two
/// snapshots can be checked for equality (e.g. to detect "nothing changed").
#[derive(Debug, Clone, Default, PartialEq, Eq)]
pub struct ClientMetrics {
    /// Cumulative request count (per the field name; the counting rule lives outside this file).
    pub total_requests: u64,
    /// Cumulative count of requests considered successful (criteria defined by the caller that
    /// fills this struct).
    pub successful_requests: u64,
    /// Cumulative token count (per the field name; accounting is done outside this file).
    pub total_tokens: u64,
}
17
18/// Per-call statistics for observability and model selection.
19#[derive(Debug, Clone, Default)]
20pub struct CallStats {
21    pub model: String,
22    pub operation: String,
23    pub endpoint: String,
24    pub http_status: u16,
25    pub retry_count: u32,
26    pub duration_ms: u128,
27    /// For streaming calls: time from request start to first emitted event (best-effort).
28    pub first_event_ms: Option<u128>,
29    /// For streaming calls: whether any event was emitted to the caller.
30    pub emitted_any: bool,
31    /// Always present; generated by the runtime for linkage.
32    pub client_request_id: String,
33    /// Best-effort request identifier from provider/edge headers (if available).
34    pub upstream_request_id: Option<String>,
35    pub error_class: Option<String>,
36    pub usage: Option<serde_json::Value>,
37    /// Snapshot of runtime signals captured for this call.
38    pub signals: SignalsSnapshot,
39}
40
41/// Handle to cancel an in-flight streaming request.
42///
43/// Call [`cancel`](Self::cancel) to signal the stream to stop. The stream will emit
44/// a final `StreamEnd { finish_reason: Some("cancelled") }` and then terminate.
45///
46/// # Example
47///
48/// ```ignore
49/// let (stream, cancel_handle) = client.chat()
50///     .messages(msgs)
51///     .stream()
52///     .execute_stream_with_cancel()
53///     .await?;
54///
55/// // Cancel from another task or on user action
56/// tokio::spawn(async move {
57///     tokio::time::sleep(Duration::from_secs(5)).await;
58///     cancel_handle.cancel();
59/// });
60/// ```
61pub struct CancelHandle {
62    sender: Option<oneshot::Sender<()>>,
63}
64
65impl CancelHandle {
66    pub fn cancel(mut self) {
67        if let Some(sender) = self.sender.take() {
68            let _ = sender.send(());
69        }
70    }
71}
72
73pub(crate) fn cancel_pair() -> (CancelHandle, oneshot::Receiver<()>) {
74    let (tx, rx) = oneshot::channel();
75    (CancelHandle { sender: Some(tx) }, rx)
76}
77
78pub(crate) struct ControlledStream {
79    pub inner: Pin<Box<dyn Stream<Item = Result<StreamingEvent>> + Send + 'static>>,
80    pub cancel_rx: Option<oneshot::Receiver<()>>,
81    pub permit: Option<OwnedSemaphorePermit>,
82    cancelled: bool,
83}
84
85impl ControlledStream {
86    pub fn new(
87        inner: Pin<Box<dyn Stream<Item = Result<StreamingEvent>> + Send + 'static>>,
88        cancel_rx: Option<oneshot::Receiver<()>>,
89        permit: Option<OwnedSemaphorePermit>,
90    ) -> Self {
91        Self {
92            inner,
93            cancel_rx,
94            permit,
95            cancelled: false,
96        }
97    }
98}
99
100impl Stream for ControlledStream {
101    type Item = Result<StreamingEvent>;
102
103    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
104        // 1) Check cancellation (emit StreamEnd once, then terminate)
105        if !self.cancelled {
106            if let Some(ref mut cancel_rx) = self.cancel_rx {
107                match std::future::Future::poll(Pin::new(cancel_rx), cx) {
108                    Poll::Ready(Ok(())) => {
109                        self.cancel_rx = None;
110                        self.cancelled = true;
111                        // Strong cancellation:
112                        // - Drop the inner stream to stop network reads ASAP.
113                        // - Release backpressure permit immediately.
114                        self.inner = Box::pin(stream::empty());
115                        self.permit = None;
116                        return Poll::Ready(Some(Ok(StreamingEvent::StreamEnd {
117                            finish_reason: Some("cancelled".to_string()),
118                        })));
119                    }
120                    Poll::Ready(Err(_)) => {
121                        // Sender was dropped without sending; ignore it but log trace for debugging.
122                        tracing::trace!("ControlledStream: cancel_handle sender dropped without explicit signal; ignoring.");
123                        self.cancel_rx = None;
124                    }
125                    Poll::Pending => {}
126                }
127            }
128        }
129
130        // 2) Poll inner stream
131        match self.inner.as_mut().poll_next(cx) {
132            Poll::Ready(Some(Ok(StreamingEvent::StreamEnd { finish_reason }))) => {
133                // Stream finished normally; release permit early.
134                self.permit = None;
135                Poll::Ready(Some(Ok(StreamingEvent::StreamEnd { finish_reason })))
136            }
137            Poll::Ready(None) => {
138                // Underlying stream ended; release permit early.
139                self.permit = None;
140                Poll::Ready(None)
141            }
142            other => other,
143        }
144    }
145}