rust_agent/core/
runnable.rs

1// Runnable interface definition - core concept of the framework
2use std::collections::HashMap;
3use std::pin::Pin;
4use futures::stream::Stream;
5use serde_json::Value;
6use tokio::sync::mpsc;
7use tokio_stream::wrappers::ReceiverStream;
8
9// Runnable interface definition
10pub trait Runnable<I: Send + 'static, O: Send + 'static>: Send + Sync {
11    // Core async call method (main entry point)
12    fn invoke(&self, input: I) -> Pin<Box<dyn std::future::Future<Output = Result<O, anyhow::Error>> + Send>>;
13    
14    // Call variant with configuration - optional implementation
15    fn invoke_with_config(
16        &self, 
17        input: I, 
18        _config: Option<HashMap<String, Value>>
19    ) -> Pin<Box<dyn std::future::Future<Output = Result<O, anyhow::Error>> + Send>> {
20        self.invoke(input)
21    }
22    
23    // Async batch processing for multiple inputs
24    fn batch(&self, inputs: Vec<I>) -> Pin<Box<dyn std::future::Future<Output = Vec<Result<O, anyhow::Error>>> + Send>> {
25        let self_clone = self.clone_to_owned();
26        Box::pin(async move {
27            // Provide default implementation, specific components can override for optimization
28            futures::future::join_all(inputs.into_iter().map(|input| {
29                let self_clone_inner = self_clone.clone_to_owned();
30                async move {
31                    self_clone_inner.invoke(input).await
32                }
33            })).await
34        })
35    }
36    
37    // Variant of batch processing - optional implementation
38    // Temporarily simplified implementation to avoid complex async composition issues
39    fn batch_with_config(
40        &self, 
41        inputs: Vec<I>, 
42        _config: Option<HashMap<String, Value>>
43    ) -> Pin<Box<dyn std::future::Future<Output = Vec<Result<O, anyhow::Error>>> + Send>> {
44        // Directly call the batch method
45        self.batch(inputs)
46    }
47    
48    // Stream processing interface - synchronous implementation
49    fn stream(&self, input: I) -> Box<dyn Stream<Item = Result<O, anyhow::Error>> + Send> {
50        // Simple implementation: use futures' stream::once to wrap a single result
51        let self_clone = self.clone_to_owned();
52        let (tx, rx) = tokio::sync::mpsc::channel::<Result<O, anyhow::Error>>(1);
53        
54        // Execute invoke in a separate task and send the result
55        tokio::spawn(async move {
56            let result = self_clone.invoke(input).await;
57            let _ = tx.send(result).await;
58        });
59        
60        // Convert mpsc receiver to Stream
61        Box::new(tokio_stream::wrappers::ReceiverStream::new(rx))
62    }
63    
64    // Async stream processing - optional implementation
65    fn astream(
66        &self, 
67        _input: I
68    ) -> Pin<Box<dyn std::future::Future<Output = Box<dyn Stream<Item = Result<O, anyhow::Error>> + Send>> + Send>> {
69        let _self_clone = self.clone_to_owned();
70        
71        Box::pin(async move {
72            // Simple implementation: return an empty stream
73            let (_tx, rx) = mpsc::channel(10);
74            // Create and return an empty stream, add explicit type conversion
75            let stream: Box<dyn Stream<Item = Result<O, anyhow::Error>> + Send> = Box::new(ReceiverStream::new(rx));
76            stream
77        })
78    }
79    
80    // Helper method for astream default implementation, needs to be provided when implementing
81    fn clone_to_owned(&self) -> Box<dyn Runnable<I, O> + Send + Sync>;
82}
83
84// Runnable extension trait
85pub trait RunnableExt<I: Send + 'static, O: Send + 'static> {
86    fn pipe<NextO: Send + 'static>(
87        self: Box<Self>,
88        next: impl Runnable<O, NextO> + Send + Sync + 'static
89    ) -> impl Runnable<I, NextO> + Send + Sync
90    where
91        Self: Sized + 'static + Send + Sync;
92}
93
94// Provide extension methods for Runnable
95impl<T: Runnable<I, O> + ?Sized, I: Send + 'static, O: Send + 'static> RunnableExt<I, O> for T {
96    fn pipe<NextO: Send + 'static>(
97        self: Box<Self>,
98        next: impl Runnable<O, NextO> + Send + Sync + 'static
99    ) -> impl Runnable<I, NextO> + Send + Sync
100    where
101        Self: Sized + 'static + Send + Sync,
102    {
103        // Call the pipe function to combine two Runnables
104        pipe(*self, next)
105    }
106}
107
108// Utility function: create a pipeline connecting two Runnables
109pub fn pipe<I: Send + 'static, O1: Send + 'static, O2: Send + 'static>(
110    first: impl Runnable<I, O1> + Send + Sync + 'static,
111    second: impl Runnable<O1, O2> + Send + Sync + 'static
112) -> Box<dyn Runnable<I, O2> + Send + Sync> {
113    // Implement composition logic: create a struct that implements Runnable
114    // Wrap two components and execute them in sequence
115    struct PipeImpl<I: Send + 'static, O1: Send + 'static, O2: Send + 'static> {
116        first: Box<dyn Runnable<I, O1> + Send + Sync>,
117        second: Box<dyn Runnable<O1, O2> + Send + Sync>,
118    }
119    
120    impl<I: Send + 'static, O1: Send + 'static, O2: Send + 'static> Runnable<I, O2> for PipeImpl<I, O1, O2> {
121        fn invoke(&self, input: I) -> Pin<Box<dyn std::future::Future<Output = Result<O2, anyhow::Error>> + Send>> {
122            let first_clone = self.first.clone_to_owned();
123            let second_clone = self.second.clone_to_owned();
124            
125            Box::pin(async move {
126                let intermediate = first_clone.invoke(input).await?;
127                second_clone.invoke(intermediate).await
128            })
129        }
130        
131        fn clone_to_owned(&self) -> Box<dyn Runnable<I, O2> + Send + Sync> {
132            // Note: this implementation assumes components can be cloned, actual implementation may need adjustment
133            Box::new(PipeImpl {
134                first: self.first.clone_to_owned(),
135                second: self.second.clone_to_owned(),
136            })
137        }
138    }
139    
140    // Send and Sync will be automatically derived because internal fields are already Send + Sync
141    
142    Box::new(PipeImpl {
143        first: Box::new(first),
144        second: Box::new(second),
145    })
146}
147
148// RunnableSequence struct
149pub struct RunnableSequence<I, O> {
150    // In actual implementation, this needs to store the various components in the chain
151    // For example: for a simple two-component chain
152    // first: Box<dyn Runnable<I, O1> + Send + Sync>,
153    // second: Box<dyn Runnable<O1, O> + Send + Sync>,
154    
155    // Actual implementation may be more complex, depending on the supported chain length
156    inner: Box<dyn Runnable<I, O> + Send + Sync>,
157}
158
159// Helper methods for RunnableSequence
160impl<I: Send + 'static, O: Send + 'static> RunnableSequence<I, O> {
161    pub fn new(runnable: impl Runnable<I, O> + Send + Sync + 'static) -> Self {
162        // In actual implementation, need to store runnable in the struct
163        Self {
164            inner: Box::new(runnable),
165        }
166    }
167}
168
169// Implement Runnable interface for RunnableSequence
170impl<I: 'static + Send, O: 'static + Send> Runnable<I, O> for RunnableSequence<I, O> {
171    fn invoke(&self, input: I) -> Pin<Box<dyn std::future::Future<Output = Result<O, anyhow::Error>> + Send>> {
172        let inner = self.inner.clone_to_owned();
173        inner.invoke(input)
174    }
175    
176    fn clone_to_owned(&self) -> Box<dyn Runnable<I, O> + Send + Sync> {
177        Box::new(RunnableSequence {
178            inner: self.inner.clone_to_owned(),
179        })
180    }
181}
182
183// Example implementation of clone_to_owned method for Box<dyn Runnable>
184impl<I: Send + 'static, O: Send + 'static> Runnable<I, O> for Box<dyn Runnable<I, O> + Send + Sync> {
185    fn invoke(&self, input: I) -> Pin<Box<dyn std::future::Future<Output = Result<O, anyhow::Error>> + Send>> {
186        let self_clone = self.clone_to_owned();
187        Box::pin(async move {
188            self_clone.invoke(input).await
189        })
190    }
191    
192    fn clone_to_owned(&self) -> Box<dyn Runnable<I, O> + Send + Sync> {
193        (**self).clone_to_owned()
194    }
195}