langfuse/tracing/stream_wrapper.rs
1//! Stream and iterator wrappers that collect items and finalize the span on completion.
2//!
3//! [`ObservingStream`] wraps a [`futures::Stream`] and [`ObservingIterator`] wraps
4//! a standard [`Iterator`]. Both serialize each yielded item to JSON, collect the
5//! serialized representations, and when the inner stream/iterator is exhausted,
6//! set the collected output on the associated [`LangfuseSpan`] and end it.
7
8use std::pin::Pin;
9use std::task::{Context, Poll};
10
11use futures::Stream;
12use pin_project_lite::pin_project;
13use serde::Serialize;
14
15use super::span::LangfuseSpan;
16
/// Boxed, thread-safe reducer that turns the collected items into the span output.
///
/// The slice it receives holds one JSON string per successfully serialized
/// item, in the order the items were yielded. Whatever string it returns is
/// recorded as the span output in place of the default JSON array.
type TransformFn = Box<dyn Fn(&[String]) -> String + Send + Sync>;
19pin_project! {
20 /// A [`Stream`] wrapper that observes each yielded item and finalizes the
21 /// associated [`LangfuseSpan`] when the stream completes.
22 ///
23 /// Each item is serialized to JSON and collected. When the inner stream
24 /// yields `None`, the collected items are set as the span output (as a JSON
25 /// array of strings) and the span is ended.
26 ///
27 /// ```ignore
28 /// use langfuse::{LangfuseSpan, ObservingStream};
29 /// use futures::StreamExt;
30 ///
31 /// let span = LangfuseSpan::start("stream-op");
32 /// let stream = ObservingStream::new(span, my_stream);
33 /// while let Some(item) = stream.next().await { /* ... */ }
34 /// ```
35 ///
36 /// # Type Parameters
37 ///
38 /// - `S`: The inner stream type. Its `Item` must implement [`Serialize`].
39 pub struct ObservingStream<S: Stream> {
40 #[pin]
41 inner: S,
42 span: Option<LangfuseSpan>,
43 collected: Vec<String>,
44 transform: Option<TransformFn>,
45 }
46}
47
48impl<S: Stream> ObservingStream<S> {
49 /// Create a new `ObservingStream` wrapping the given stream and span.
50 ///
51 /// Items yielded by the inner stream are serialized to JSON and collected.
52 /// When the stream completes, the collected items are set as the span output.
53 #[must_use]
54 pub fn new(span: LangfuseSpan, inner: S) -> Self {
55 Self {
56 inner,
57 span: Some(span),
58 collected: Vec::new(),
59 transform: None,
60 }
61 }
62
63 /// Create a new `ObservingStream` with a custom transform function.
64 ///
65 /// Instead of setting the collected JSON strings directly as output, the
66 /// transform function is called with the collected strings and its return
67 /// value is used as the span output.
68 #[must_use]
69 pub fn with_transform(
70 span: LangfuseSpan,
71 inner: S,
72 transform: impl Fn(&[String]) -> String + Send + Sync + 'static,
73 ) -> Self {
74 Self {
75 inner,
76 span: Some(span),
77 collected: Vec::new(),
78 transform: Some(Box::new(transform)),
79 }
80 }
81}
82
83impl<S: Stream> std::fmt::Debug for ObservingStream<S> {
84 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
85 f.debug_struct("ObservingStream")
86 .field("collected_count", &self.collected.len())
87 .finish()
88 }
89}
90
91impl<S> Stream for ObservingStream<S>
92where
93 S: Stream,
94 S::Item: Serialize,
95{
96 type Item = S::Item;
97
98 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
99 let this = self.project();
100
101 match this.inner.poll_next(cx) {
102 Poll::Ready(Some(item)) => {
103 if let Ok(json) = serde_json::to_string(&item) {
104 this.collected.push(json);
105 }
106 Poll::Ready(Some(item))
107 }
108 Poll::Ready(None) => {
109 if let Some(span) = this.span.take() {
110 if let Some(transform) = this.transform.as_ref() {
111 let output = transform(this.collected);
112 span.set_output(&output);
113 } else {
114 let output = serde_json::json!(this.collected);
115 span.set_output(&output);
116 }
117 span.end();
118 }
119 Poll::Ready(None)
120 }
121 Poll::Pending => Poll::Pending,
122 }
123 }
124
125 fn size_hint(&self) -> (usize, Option<usize>) {
126 self.inner.size_hint()
127 }
128}
129
130// ---------------------------------------------------------------------------
131// ObservingIterator
132// ---------------------------------------------------------------------------
133
134/// An [`Iterator`] wrapper that observes each yielded item and finalizes the
135/// associated [`LangfuseSpan`] when the iterator is exhausted.
136///
137/// Each item is serialized to JSON and collected. When the inner iterator
138/// returns `None`, the collected items are set as the span output (as a JSON
139/// array of strings) and the span is ended.
140///
141/// ```ignore
142/// let span = LangfuseSpan::start("iter-op");
143/// let iter = ObservingIterator::new(span, my_iter);
144/// for item in iter { /* ... */ }
145/// ```
146///
147/// # Type Parameters
148///
149/// - `I`: The inner iterator type. Its `Item` must implement [`Serialize`].
150pub struct ObservingIterator<I: Iterator> {
151 inner: I,
152 span: Option<LangfuseSpan>,
153 collected: Vec<String>,
154 transform: Option<TransformFn>,
155}
156
157impl<I: Iterator> ObservingIterator<I> {
158 /// Create a new `ObservingIterator` wrapping the given iterator and span.
159 #[must_use]
160 pub fn new(span: LangfuseSpan, inner: I) -> Self {
161 Self {
162 inner,
163 span: Some(span),
164 collected: Vec::new(),
165 transform: None,
166 }
167 }
168
169 /// Create a new `ObservingIterator` with a custom transform function.
170 #[must_use]
171 pub fn with_transform(
172 span: LangfuseSpan,
173 inner: I,
174 transform: impl Fn(&[String]) -> String + Send + Sync + 'static,
175 ) -> Self {
176 Self {
177 inner,
178 span: Some(span),
179 collected: Vec::new(),
180 transform: Some(Box::new(transform)),
181 }
182 }
183}
184
185impl<I: Iterator> std::fmt::Debug for ObservingIterator<I> {
186 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
187 f.debug_struct("ObservingIterator")
188 .field("collected_count", &self.collected.len())
189 .finish()
190 }
191}
192
193impl<I> Iterator for ObservingIterator<I>
194where
195 I: Iterator,
196 I::Item: Serialize,
197{
198 type Item = I::Item;
199
200 fn next(&mut self) -> Option<Self::Item> {
201 match self.inner.next() {
202 Some(item) => {
203 if let Ok(json) = serde_json::to_string(&item) {
204 self.collected.push(json);
205 }
206 Some(item)
207 }
208 None => {
209 if let Some(span) = self.span.take() {
210 if let Some(transform) = self.transform.as_ref() {
211 let output = transform(&self.collected);
212 span.set_output(&output);
213 } else {
214 let output = serde_json::json!(self.collected);
215 span.set_output(&output);
216 }
217 span.end();
218 }
219 None
220 }
221 }
222 }
223
224 fn size_hint(&self) -> (usize, Option<usize>) {
225 self.inner.size_hint()
226 }
227}