ai_lib_rust/client/
types.rs1use crate::client::signals::SignalsSnapshot;
2use crate::types::events::StreamingEvent;
3use crate::Result;
4use futures::stream;
5use futures::stream::Stream;
6use std::pin::Pin;
7use std::task::{Context, Poll};
8use tokio::sync::{oneshot, OwnedSemaphorePermit};
9
/// Plain record of client-level counters.
///
/// Every field is a `u64`; the derived [`Default`] starts all of them at zero.
///
/// NOTE(review): the code that increments these counters is not visible in this
/// part of the file, so the exact counting rules (e.g. whether retries count as
/// separate requests) should be confirmed at the update site.
#[derive(Clone, Debug, Default)]
pub struct ClientMetrics {
    /// Count of requests (presumably all attempted requests — confirm).
    pub total_requests: u64,
    /// Count of requests regarded as successful (criteria not visible here).
    pub successful_requests: u64,
    /// Count of tokens (presumably summed over requests — confirm).
    pub total_tokens: u64,
}
17
18#[derive(Debug, Clone, Default)]
20pub struct CallStats {
21 pub model: String,
22 pub operation: String,
23 pub endpoint: String,
24 pub http_status: u16,
25 pub retry_count: u32,
26 pub duration_ms: u128,
27 pub first_event_ms: Option<u128>,
29 pub emitted_any: bool,
31 pub client_request_id: String,
33 pub upstream_request_id: Option<String>,
35 pub error_class: Option<String>,
36 pub usage: Option<serde_json::Value>,
37 pub signals: SignalsSnapshot,
39}
40
41pub struct CancelHandle {
62 sender: Option<oneshot::Sender<()>>,
63}
64
65impl CancelHandle {
66 pub fn cancel(mut self) {
67 if let Some(sender) = self.sender.take() {
68 let _ = sender.send(());
69 }
70 }
71}
72
73pub(crate) fn cancel_pair() -> (CancelHandle, oneshot::Receiver<()>) {
74 let (tx, rx) = oneshot::channel();
75 (CancelHandle { sender: Some(tx) }, rx)
76}
77
78pub(crate) struct ControlledStream {
79 pub inner: Pin<Box<dyn Stream<Item = Result<StreamingEvent>> + Send + 'static>>,
80 pub cancel_rx: Option<oneshot::Receiver<()>>,
81 pub permit: Option<OwnedSemaphorePermit>,
82 cancelled: bool,
83}
84
85impl ControlledStream {
86 pub fn new(
87 inner: Pin<Box<dyn Stream<Item = Result<StreamingEvent>> + Send + 'static>>,
88 cancel_rx: Option<oneshot::Receiver<()>>,
89 permit: Option<OwnedSemaphorePermit>,
90 ) -> Self {
91 Self {
92 inner,
93 cancel_rx,
94 permit,
95 cancelled: false,
96 }
97 }
98}
99
100impl Stream for ControlledStream {
101 type Item = Result<StreamingEvent>;
102
103 fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
104 if !self.cancelled {
106 if let Some(ref mut cancel_rx) = self.cancel_rx {
107 match std::future::Future::poll(Pin::new(cancel_rx), cx) {
108 Poll::Ready(Ok(())) => {
109 self.cancel_rx = None;
110 self.cancelled = true;
111 self.inner = Box::pin(stream::empty());
115 self.permit = None;
116 return Poll::Ready(Some(Ok(StreamingEvent::StreamEnd {
117 finish_reason: Some("cancelled".to_string()),
118 })));
119 }
120 Poll::Ready(Err(_)) => {
121 tracing::trace!("ControlledStream: cancel_handle sender dropped without explicit signal; ignoring.");
123 self.cancel_rx = None;
124 }
125 Poll::Pending => {}
126 }
127 }
128 }
129
130 match self.inner.as_mut().poll_next(cx) {
132 Poll::Ready(Some(Ok(StreamingEvent::StreamEnd { finish_reason }))) => {
133 self.permit = None;
135 Poll::Ready(Some(Ok(StreamingEvent::StreamEnd { finish_reason })))
136 }
137 Poll::Ready(None) => {
138 self.permit = None;
140 Poll::Ready(None)
141 }
142 other => other,
143 }
144 }
145}