rust_agent/core/
runnable.rs1use std::collections::HashMap;
3use std::pin::Pin;
4use futures::stream::Stream;
5use serde_json::Value;
6use tokio::sync::mpsc;
7use tokio_stream::wrappers::ReceiverStream;
8
9pub trait Runnable<I: Send + 'static, O: Send + 'static>: Send + Sync {
11 fn invoke(&self, input: I) -> Pin<Box<dyn std::future::Future<Output = Result<O, anyhow::Error>> + Send>>;
13
14 fn invoke_with_config(
16 &self,
17 input: I,
18 _config: Option<HashMap<String, Value>>
19 ) -> Pin<Box<dyn std::future::Future<Output = Result<O, anyhow::Error>> + Send>> {
20 self.invoke(input)
21 }
22
23 fn batch(&self, inputs: Vec<I>) -> Pin<Box<dyn std::future::Future<Output = Vec<Result<O, anyhow::Error>>> + Send>> {
25 let self_clone = self.clone_to_owned();
26 Box::pin(async move {
27 futures::future::join_all(inputs.into_iter().map(|input| {
29 let self_clone_inner = self_clone.clone_to_owned();
30 async move {
31 self_clone_inner.invoke(input).await
32 }
33 })).await
34 })
35 }
36
37 fn batch_with_config(
40 &self,
41 inputs: Vec<I>,
42 _config: Option<HashMap<String, Value>>
43 ) -> Pin<Box<dyn std::future::Future<Output = Vec<Result<O, anyhow::Error>>> + Send>> {
44 self.batch(inputs)
46 }
47
48 fn stream(&self, input: I) -> Box<dyn Stream<Item = Result<O, anyhow::Error>> + Send> {
50 let self_clone = self.clone_to_owned();
52 let (tx, rx) = tokio::sync::mpsc::channel::<Result<O, anyhow::Error>>(1);
53
54 tokio::spawn(async move {
56 let result = self_clone.invoke(input).await;
57 let _ = tx.send(result).await;
58 });
59
60 Box::new(tokio_stream::wrappers::ReceiverStream::new(rx))
62 }
63
64 fn astream(
66 &self,
67 _input: I
68 ) -> Pin<Box<dyn std::future::Future<Output = Box<dyn Stream<Item = Result<O, anyhow::Error>> + Send>> + Send>> {
69 let _self_clone = self.clone_to_owned();
70
71 Box::pin(async move {
72 let (_tx, rx) = mpsc::channel(10);
74 let stream: Box<dyn Stream<Item = Result<O, anyhow::Error>> + Send> = Box::new(ReceiverStream::new(rx));
76 stream
77 })
78 }
79
80 fn clone_to_owned(&self) -> Box<dyn Runnable<I, O> + Send + Sync>;
82}
83
84pub trait RunnableExt<I: Send + 'static, O: Send + 'static> {
86 fn pipe<NextO: Send + 'static>(
87 self: Box<Self>,
88 next: impl Runnable<O, NextO> + Send + Sync + 'static
89 ) -> impl Runnable<I, NextO> + Send + Sync
90 where
91 Self: Sized + 'static + Send + Sync;
92}
93
94impl<T: Runnable<I, O> + ?Sized, I: Send + 'static, O: Send + 'static> RunnableExt<I, O> for T {
96 fn pipe<NextO: Send + 'static>(
97 self: Box<Self>,
98 next: impl Runnable<O, NextO> + Send + Sync + 'static
99 ) -> impl Runnable<I, NextO> + Send + Sync
100 where
101 Self: Sized + 'static + Send + Sync,
102 {
103 pipe(*self, next)
105 }
106}
107
108pub fn pipe<I: Send + 'static, O1: Send + 'static, O2: Send + 'static>(
110 first: impl Runnable<I, O1> + Send + Sync + 'static,
111 second: impl Runnable<O1, O2> + Send + Sync + 'static
112) -> Box<dyn Runnable<I, O2> + Send + Sync> {
113 struct PipeImpl<I: Send + 'static, O1: Send + 'static, O2: Send + 'static> {
116 first: Box<dyn Runnable<I, O1> + Send + Sync>,
117 second: Box<dyn Runnable<O1, O2> + Send + Sync>,
118 }
119
120 impl<I: Send + 'static, O1: Send + 'static, O2: Send + 'static> Runnable<I, O2> for PipeImpl<I, O1, O2> {
121 fn invoke(&self, input: I) -> Pin<Box<dyn std::future::Future<Output = Result<O2, anyhow::Error>> + Send>> {
122 let first_clone = self.first.clone_to_owned();
123 let second_clone = self.second.clone_to_owned();
124
125 Box::pin(async move {
126 let intermediate = first_clone.invoke(input).await?;
127 second_clone.invoke(intermediate).await
128 })
129 }
130
131 fn clone_to_owned(&self) -> Box<dyn Runnable<I, O2> + Send + Sync> {
132 Box::new(PipeImpl {
134 first: self.first.clone_to_owned(),
135 second: self.second.clone_to_owned(),
136 })
137 }
138 }
139
140 Box::new(PipeImpl {
143 first: Box::new(first),
144 second: Box::new(second),
145 })
146}
147
148pub struct RunnableSequence<I, O> {
150 inner: Box<dyn Runnable<I, O> + Send + Sync>,
157}
158
159impl<I: Send + 'static, O: Send + 'static> RunnableSequence<I, O> {
161 pub fn new(runnable: impl Runnable<I, O> + Send + Sync + 'static) -> Self {
162 Self {
164 inner: Box::new(runnable),
165 }
166 }
167}
168
169impl<I: 'static + Send, O: 'static + Send> Runnable<I, O> for RunnableSequence<I, O> {
171 fn invoke(&self, input: I) -> Pin<Box<dyn std::future::Future<Output = Result<O, anyhow::Error>> + Send>> {
172 let inner = self.inner.clone_to_owned();
173 inner.invoke(input)
174 }
175
176 fn clone_to_owned(&self) -> Box<dyn Runnable<I, O> + Send + Sync> {
177 Box::new(RunnableSequence {
178 inner: self.inner.clone_to_owned(),
179 })
180 }
181}
182
183impl<I: Send + 'static, O: Send + 'static> Runnable<I, O> for Box<dyn Runnable<I, O> + Send + Sync> {
185 fn invoke(&self, input: I) -> Pin<Box<dyn std::future::Future<Output = Result<O, anyhow::Error>> + Send>> {
186 let self_clone = self.clone_to_owned();
187 Box::pin(async move {
188 self_clone.invoke(input).await
189 })
190 }
191
192 fn clone_to_owned(&self) -> Box<dyn Runnable<I, O> + Send + Sync> {
193 (**self).clone_to_owned()
194 }
195}