Skip to main content

langfuse/tracing/
stream_wrapper.rs

1//! Stream and iterator wrappers that collect items and finalize the span on completion.
2//!
3//! [`ObservingStream`] wraps a [`futures::Stream`] and [`ObservingIterator`] wraps
4//! a standard [`Iterator`]. Both serialize each yielded item to JSON, collect the
5//! serialized representations, and when the inner stream/iterator is exhausted,
6//! set the collected output on the associated [`LangfuseSpan`] and end it.
7
8use std::pin::Pin;
9use std::task::{Context, Poll};
10
11use futures::Stream;
12use pin_project_lite::pin_project;
13use serde::Serialize;
14
15use super::span::LangfuseSpan;
16
/// Boxed, thread-safe callback that folds the per-item JSON strings collected
/// by a wrapper into the single string recorded as the span output.
///
/// The slice holds one already-serialized JSON document per observed item, in
/// the order the items were yielded.
type TransformFn = Box<dyn (Fn(&[String]) -> String) + Send + Sync>;
19pin_project! {
20    /// A [`Stream`] wrapper that observes each yielded item and finalizes the
21    /// associated [`LangfuseSpan`] when the stream completes.
22    ///
23    /// Each item is serialized to JSON and collected. When the inner stream
24    /// yields `None`, the collected items are set as the span output (as a JSON
25    /// array of strings) and the span is ended.
26    ///
27    /// ```ignore
28    /// use langfuse::{LangfuseSpan, ObservingStream};
29    /// use futures::StreamExt;
30    ///
31    /// let span = LangfuseSpan::start("stream-op");
32    /// let stream = ObservingStream::new(span, my_stream);
33    /// while let Some(item) = stream.next().await { /* ... */ }
34    /// ```
35    ///
36    /// # Type Parameters
37    ///
38    /// - `S`: The inner stream type. Its `Item` must implement [`Serialize`].
39    pub struct ObservingStream<S: Stream> {
40        #[pin]
41        inner: S,
42        span: Option<LangfuseSpan>,
43        collected: Vec<String>,
44        transform: Option<TransformFn>,
45    }
46}
47
48impl<S: Stream> ObservingStream<S> {
49    /// Create a new `ObservingStream` wrapping the given stream and span.
50    ///
51    /// Items yielded by the inner stream are serialized to JSON and collected.
52    /// When the stream completes, the collected items are set as the span output.
53    #[must_use]
54    pub fn new(span: LangfuseSpan, inner: S) -> Self {
55        Self {
56            inner,
57            span: Some(span),
58            collected: Vec::new(),
59            transform: None,
60        }
61    }
62
63    /// Create a new `ObservingStream` with a custom transform function.
64    ///
65    /// Instead of setting the collected JSON strings directly as output, the
66    /// transform function is called with the collected strings and its return
67    /// value is used as the span output.
68    #[must_use]
69    pub fn with_transform(
70        span: LangfuseSpan,
71        inner: S,
72        transform: impl Fn(&[String]) -> String + Send + Sync + 'static,
73    ) -> Self {
74        Self {
75            inner,
76            span: Some(span),
77            collected: Vec::new(),
78            transform: Some(Box::new(transform)),
79        }
80    }
81}
82
83impl<S: Stream> std::fmt::Debug for ObservingStream<S> {
84    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
85        f.debug_struct("ObservingStream")
86            .field("collected_count", &self.collected.len())
87            .finish()
88    }
89}
90
91impl<S> Stream for ObservingStream<S>
92where
93    S: Stream,
94    S::Item: Serialize,
95{
96    type Item = S::Item;
97
98    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
99        let this = self.project();
100
101        match this.inner.poll_next(cx) {
102            Poll::Ready(Some(item)) => {
103                if let Ok(json) = serde_json::to_string(&item) {
104                    this.collected.push(json);
105                }
106                Poll::Ready(Some(item))
107            }
108            Poll::Ready(None) => {
109                if let Some(span) = this.span.take() {
110                    if let Some(transform) = this.transform.as_ref() {
111                        let output = transform(this.collected);
112                        span.set_output(&output);
113                    } else {
114                        let output = serde_json::json!(this.collected);
115                        span.set_output(&output);
116                    }
117                    span.end();
118                }
119                Poll::Ready(None)
120            }
121            Poll::Pending => Poll::Pending,
122        }
123    }
124
125    fn size_hint(&self) -> (usize, Option<usize>) {
126        self.inner.size_hint()
127    }
128}
129
130// ---------------------------------------------------------------------------
131// ObservingIterator
132// ---------------------------------------------------------------------------
133
134/// An [`Iterator`] wrapper that observes each yielded item and finalizes the
135/// associated [`LangfuseSpan`] when the iterator is exhausted.
136///
137/// Each item is serialized to JSON and collected. When the inner iterator
138/// returns `None`, the collected items are set as the span output (as a JSON
139/// array of strings) and the span is ended.
140///
141/// ```ignore
142/// let span = LangfuseSpan::start("iter-op");
143/// let iter = ObservingIterator::new(span, my_iter);
144/// for item in iter { /* ... */ }
145/// ```
146///
147/// # Type Parameters
148///
149/// - `I`: The inner iterator type. Its `Item` must implement [`Serialize`].
150pub struct ObservingIterator<I: Iterator> {
151    inner: I,
152    span: Option<LangfuseSpan>,
153    collected: Vec<String>,
154    transform: Option<TransformFn>,
155}
156
157impl<I: Iterator> ObservingIterator<I> {
158    /// Create a new `ObservingIterator` wrapping the given iterator and span.
159    #[must_use]
160    pub fn new(span: LangfuseSpan, inner: I) -> Self {
161        Self {
162            inner,
163            span: Some(span),
164            collected: Vec::new(),
165            transform: None,
166        }
167    }
168
169    /// Create a new `ObservingIterator` with a custom transform function.
170    #[must_use]
171    pub fn with_transform(
172        span: LangfuseSpan,
173        inner: I,
174        transform: impl Fn(&[String]) -> String + Send + Sync + 'static,
175    ) -> Self {
176        Self {
177            inner,
178            span: Some(span),
179            collected: Vec::new(),
180            transform: Some(Box::new(transform)),
181        }
182    }
183}
184
185impl<I: Iterator> std::fmt::Debug for ObservingIterator<I> {
186    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
187        f.debug_struct("ObservingIterator")
188            .field("collected_count", &self.collected.len())
189            .finish()
190    }
191}
192
193impl<I> Iterator for ObservingIterator<I>
194where
195    I: Iterator,
196    I::Item: Serialize,
197{
198    type Item = I::Item;
199
200    fn next(&mut self) -> Option<Self::Item> {
201        match self.inner.next() {
202            Some(item) => {
203                if let Ok(json) = serde_json::to_string(&item) {
204                    self.collected.push(json);
205                }
206                Some(item)
207            }
208            None => {
209                if let Some(span) = self.span.take() {
210                    if let Some(transform) = self.transform.as_ref() {
211                        let output = transform(&self.collected);
212                        span.set_output(&output);
213                    } else {
214                        let output = serde_json::json!(self.collected);
215                        span.set_output(&output);
216                    }
217                    span.end();
218                }
219                None
220            }
221        }
222    }
223
224    fn size_hint(&self) -> (usize, Option<usize>) {
225        self.inner.size_hint()
226    }
227}