// ai_lib_rust/client/types.rs
use crate::client::signals::SignalsSnapshot;
use crate::types::events::StreamingEvent;
use crate::Result;
use futures::stream;
use futures::stream::Stream;
use std::pin::Pin;
use std::task::{Context, Poll};
use tokio::sync::{oneshot, OwnedSemaphorePermit};

10#[derive(Debug, Clone, Default)]
12pub struct CallStats {
13 pub model: String,
14 pub operation: String,
15 pub endpoint: String,
16 pub http_status: u16,
17 pub retry_count: u32,
18 pub duration_ms: u128,
19 pub first_event_ms: Option<u128>,
21 pub emitted_any: bool,
23 pub client_request_id: String,
25 pub upstream_request_id: Option<String>,
27 pub error_class: Option<String>,
28 pub usage: Option<serde_json::Value>,
29 pub signals: SignalsSnapshot,
31}
32
33pub struct CancelHandle {
35 sender: Option<oneshot::Sender<()>>,
36}
37
38impl CancelHandle {
39 pub fn cancel(mut self) {
40 if let Some(sender) = self.sender.take() {
41 let _ = sender.send(());
42 }
43 }
44}
45
46pub(crate) fn cancel_pair() -> (CancelHandle, oneshot::Receiver<()>) {
47 let (tx, rx) = oneshot::channel();
48 (CancelHandle { sender: Some(tx) }, rx)
49}
50
51pub(crate) struct ControlledStream {
52 pub inner: Pin<Box<dyn Stream<Item = Result<StreamingEvent>> + Send + 'static>>,
53 pub cancel_rx: Option<oneshot::Receiver<()>>,
54 pub permit: Option<OwnedSemaphorePermit>,
55 cancelled: bool,
56}
57
58impl ControlledStream {
59 pub fn new(
60 inner: Pin<Box<dyn Stream<Item = Result<StreamingEvent>> + Send + 'static>>,
61 cancel_rx: Option<oneshot::Receiver<()>>,
62 permit: Option<OwnedSemaphorePermit>,
63 ) -> Self {
64 Self {
65 inner,
66 cancel_rx,
67 permit,
68 cancelled: false,
69 }
70 }
71}
72
73impl Stream for ControlledStream {
74 type Item = Result<StreamingEvent>;
75
76 fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
77 if !self.cancelled {
79 if let Some(ref mut cancel_rx) = self.cancel_rx {
80 match std::future::Future::poll(Pin::new(cancel_rx), cx) {
81 Poll::Ready(Ok(())) => {
82 self.cancel_rx = None;
83 self.cancelled = true;
84 self.inner = Box::pin(stream::empty());
88 self.permit = None;
89 return Poll::Ready(Some(Ok(StreamingEvent::StreamEnd {
90 finish_reason: Some("cancelled".to_string()),
91 })));
92 }
93 Poll::Ready(Err(_)) => {
94 tracing::trace!("ControlledStream: cancel_handle sender dropped without explicit signal; ignoring.");
96 self.cancel_rx = None;
97 }
98 Poll::Pending => {}
99 }
100 }
101 }
102
103 match self.inner.as_mut().poll_next(cx) {
105 Poll::Ready(Some(Ok(StreamingEvent::StreamEnd { finish_reason }))) => {
106 self.permit = None;
108 Poll::Ready(Some(Ok(StreamingEvent::StreamEnd { finish_reason })))
109 }
110 Poll::Ready(None) => {
111 self.permit = None;
113 Poll::Ready(None)
114 }
115 other => other,
116 }
117 }
118}