agent_chain_core/tracers/
streaming.rs

1//! Internal tracers used for stream_log and astream events implementations.
2//!
3//! This module provides the streaming callback handler trait used for
4//! astream_events and astream_log implementations.
5//! Mirrors `langchain_core.tracers._streaming`.
6
7use std::pin::Pin;
8use uuid::Uuid;
9
10use futures::Stream;
11
12/// A trait for streaming callback handlers.
13///
14/// This is a common mixin that the callback handlers for both astream events
15/// and astream log inherit from.
16///
17/// The `tap_output_aiter` method is invoked in some contexts to produce
18/// callbacks for intermediate results.
19pub trait StreamingCallbackHandler<T>: Send + Sync {
20    /// Used for internal astream_log and astream events implementations.
21    ///
22    /// Tap the output async iterator to stream its values.
23    ///
24    /// # Arguments
25    ///
26    /// * `run_id` - The ID of the run.
27    /// * `output` - The output async iterator to tap.
28    ///
29    /// # Returns
30    ///
31    /// An async iterator that yields the same values as the input.
32    fn tap_output_aiter(
33        &self,
34        run_id: Uuid,
35        output: Pin<Box<dyn Stream<Item = T> + Send>>,
36    ) -> Pin<Box<dyn Stream<Item = T> + Send>>;
37
38    /// Used for internal astream_log and astream events implementations.
39    ///
40    /// Tap the output iterator to stream its values.
41    ///
42    /// # Arguments
43    ///
44    /// * `run_id` - The ID of the run.
45    /// * `output` - The output iterator to tap.
46    ///
47    /// # Returns
48    ///
49    /// An iterator that yields the same values as the input.
50    fn tap_output_iter(
51        &self,
52        run_id: Uuid,
53        output: Box<dyn Iterator<Item = T> + Send>,
54    ) -> Box<dyn Iterator<Item = T> + Send>;
55}
56
57/// Default implementation that passes through without modification.
58pub struct PassthroughStreamingHandler;
59
60impl<T: Send + 'static> StreamingCallbackHandler<T> for PassthroughStreamingHandler {
61    fn tap_output_aiter(
62        &self,
63        _run_id: Uuid,
64        output: Pin<Box<dyn Stream<Item = T> + Send>>,
65    ) -> Pin<Box<dyn Stream<Item = T> + Send>> {
66        output
67    }
68
69    fn tap_output_iter(
70        &self,
71        _run_id: Uuid,
72        output: Box<dyn Iterator<Item = T> + Send>,
73    ) -> Box<dyn Iterator<Item = T> + Send> {
74        output
75    }
76}
77
78#[cfg(test)]
79mod tests {
80    use super::*;
81    use futures::StreamExt;
82    use futures::stream;
83
84    #[tokio::test]
85    async fn test_passthrough_streaming_handler() {
86        let handler = PassthroughStreamingHandler;
87        let run_id = Uuid::new_v4();
88
89        let input_stream = stream::iter(vec![1, 2, 3]);
90        let boxed_stream: Pin<Box<dyn Stream<Item = i32> + Send>> = Box::pin(input_stream);
91
92        let output_stream = handler.tap_output_aiter(run_id, boxed_stream);
93        let result: Vec<i32> = output_stream.collect().await;
94
95        assert_eq!(result, vec![1, 2, 3]);
96    }
97
98    #[test]
99    fn test_passthrough_iter_handler() {
100        let handler = PassthroughStreamingHandler;
101        let run_id = Uuid::new_v4();
102
103        let input_iter = vec![1, 2, 3].into_iter();
104        let boxed_iter: Box<dyn Iterator<Item = i32> + Send> = Box::new(input_iter);
105
106        let output_iter = handler.tap_output_iter(run_id, boxed_iter);
107        let result: Vec<i32> = output_iter.collect();
108
109        assert_eq!(result, vec![1, 2, 3]);
110    }
111}