agent_chain_core/tracers/
streaming.rs1use std::pin::Pin;
8use uuid::Uuid;
9
10use futures::Stream;
11
12pub trait StreamingCallbackHandler<T>: Send + Sync {
20 fn tap_output_aiter(
33 &self,
34 run_id: Uuid,
35 output: Pin<Box<dyn Stream<Item = T> + Send>>,
36 ) -> Pin<Box<dyn Stream<Item = T> + Send>>;
37
38 fn tap_output_iter(
51 &self,
52 run_id: Uuid,
53 output: Box<dyn Iterator<Item = T> + Send>,
54 ) -> Box<dyn Iterator<Item = T> + Send>;
55}
56
57pub struct PassthroughStreamingHandler;
59
60impl<T: Send + 'static> StreamingCallbackHandler<T> for PassthroughStreamingHandler {
61 fn tap_output_aiter(
62 &self,
63 _run_id: Uuid,
64 output: Pin<Box<dyn Stream<Item = T> + Send>>,
65 ) -> Pin<Box<dyn Stream<Item = T> + Send>> {
66 output
67 }
68
69 fn tap_output_iter(
70 &self,
71 _run_id: Uuid,
72 output: Box<dyn Iterator<Item = T> + Send>,
73 ) -> Box<dyn Iterator<Item = T> + Send> {
74 output
75 }
76}
77
78#[cfg(test)]
79mod tests {
80 use super::*;
81 use futures::StreamExt;
82 use futures::stream;
83
84 #[tokio::test]
85 async fn test_passthrough_streaming_handler() {
86 let handler = PassthroughStreamingHandler;
87 let run_id = Uuid::new_v4();
88
89 let input_stream = stream::iter(vec![1, 2, 3]);
90 let boxed_stream: Pin<Box<dyn Stream<Item = i32> + Send>> = Box::pin(input_stream);
91
92 let output_stream = handler.tap_output_aiter(run_id, boxed_stream);
93 let result: Vec<i32> = output_stream.collect().await;
94
95 assert_eq!(result, vec![1, 2, 3]);
96 }
97
98 #[test]
99 fn test_passthrough_iter_handler() {
100 let handler = PassthroughStreamingHandler;
101 let run_id = Uuid::new_v4();
102
103 let input_iter = vec![1, 2, 3].into_iter();
104 let boxed_iter: Box<dyn Iterator<Item = i32> + Send> = Box::new(input_iter);
105
106 let output_iter = handler.tap_output_iter(run_id, boxed_iter);
107 let result: Vec<i32> = output_iter.collect();
108
109 assert_eq!(result, vec![1, 2, 3]);
110 }
111}