botcore/
types.rs

1use crate::error::Result;
2use async_trait::async_trait;
3use std::pin::Pin;
4use tokio_stream::Stream;
5use tokio_stream::StreamExt;
6
7/// A stream of events emitted by a [Collector].
8///
9/// This type alias represents an asynchronous stream of events that can be
10/// consumed by the engine. The stream is boxed and pinned to allow for
11/// dynamic dispatch and async iteration.
12pub type CollectorStream<'a, E> = Pin<Box<dyn Stream<Item = E> + Send + 'a>>;
13
14/// A source of events that can be processed by the engine.
15///
16/// Collectors are responsible for producing events from external sources like:
17/// - Blockchain events
18/// - API webhooks
19/// - Database changes
20/// - File system events
21/// - etc.
22///
23/// # Example
24///
25/// ```rust
26/// use botcore::types::{Collector, CollectorStream};
27/// use botcore::error::Result;
28/// use async_trait::async_trait;
29///
30/// struct BlockEvent {
31///     block_number: u64,
32/// }
33///
34/// struct BlockCollector {
35///     ws_endpoint: String,
36/// }
37///
38/// #[async_trait]
39/// impl Collector<BlockEvent> for BlockCollector {
40///     async fn get_event_stream(&self) -> Result<CollectorStream<'_, BlockEvent>> {
41///         // Implementation to stream block events
42///         # todo!()
43///     }
44/// }
45/// ```
46#[async_trait]
47pub trait Collector<E>: Send + Sync {
48    /// Returns the core event stream for the collector.
49    ///
50    /// This method should establish any necessary connections and return a stream
51    /// that will emit events of type `E`. The stream should be infinite unless
52    /// an error occurs.
53    ///
54    /// # Errors
55    ///
56    /// This method should return an error if:
57    /// - The connection to the event source fails
58    /// - Required resources are unavailable
59    /// - The stream cannot be established for any other reason
60    async fn get_event_stream(&self) -> Result<CollectorStream<'_, E>>;
61}
62
63/// A component that processes events and generates actions.
64///
65/// Strategies contain the core business logic of the bot. They receive events
66/// from collectors, process them according to some rules or algorithms, and
67/// optionally generate actions to be executed.
68///
69/// # Type Parameters
70///
71/// * `E` - The type of events this strategy can process
72/// * `A` - The type of actions this strategy can generate
73///
74/// # Example
75///
76/// ```rust
77/// use botcore::types::Strategy;
78/// use botcore::error::Result;
79/// use async_trait::async_trait;
80///
81/// struct BlockEvent {
82///     block_number: u64,
83/// }
84///
85/// struct TradeAction {
86///     amount: u64,
87/// }
88///
89/// struct TradingStrategy {
90///     min_block_interval: u64,
91///     last_trade_block: u64,
92/// }
93///
94/// #[async_trait]
95/// impl Strategy<BlockEvent, TradeAction> for TradingStrategy {
96///     async fn sync_state(&mut self) -> Result<()> {
97///         // Sync last trade block from chain
98///         # Ok(())
99///     }
100///
101///     async fn process_event(&mut self, event: BlockEvent) -> Vec<TradeAction> {
102///         if event.block_number >= self.last_trade_block + self.min_block_interval {
103///             vec![TradeAction { amount: 100 }]
104///         } else {
105///             vec![]
106///         }
107///     }
108/// }
109/// ```
110#[async_trait]
111pub trait Strategy<E, A>: Send + Sync {
112    /// Synchronizes the initial state of the strategy.
113    ///
114    /// This method is called once when the strategy is started. It should:
115    /// - Load any necessary state from storage
116    /// - Initialize connections to required services
117    /// - Perform any other one-time setup
118    ///
119    /// # Errors
120    ///
121    /// This method should return an error if the state cannot be synchronized.
122    async fn sync_state(&mut self) -> Result<()>;
123
124    /// Processes an event and optionally generates actions.
125    ///
126    /// This is the core method where the strategy's business logic lives.
127    /// It receives events one at a time and can generate zero or more actions
128    /// in response.
129    ///
130    /// # Arguments
131    ///
132    /// * `event` - The event to process
133    ///
134    /// # Returns
135    ///
136    /// A vector of actions to be executed. An empty vector means no actions
137    /// should be taken for this event.
138    async fn process_event(&mut self, event: E) -> Vec<A>;
139}
140
141/// A component that executes actions generated by strategies.
142///
143/// Executors are responsible for carrying out the actions decided upon by
144/// strategies. This might involve:
145/// - Submitting transactions
146/// - Making API calls
147/// - Writing to databases
148/// - Sending notifications
149/// - etc.
150///
151/// # Example
152///
153/// ```rust
154/// use botcore::types::Executor;
155/// use botcore::error::Result;
156/// use async_trait::async_trait;
157///
158/// struct TradeAction {
159///     amount: u64,
160/// }
161///
162/// struct TradeExecutor {
163///     api_key: String,
164/// }
165///
166/// #[async_trait]
167/// impl Executor<TradeAction> for TradeExecutor {
168///     async fn execute(&self, action: TradeAction) -> Result<()> {
169///         // Execute trade via API
170///         # Ok(())
171///     }
172/// }
173/// ```
174#[async_trait]
175pub trait Executor<A>: Send + Sync {
176    /// Executes a single action.
177    ///
178    /// This method should handle all the logic needed to execute the action,
179    /// including any retries or error handling.
180    ///
181    /// # Arguments
182    ///
183    /// * `action` - The action to execute
184    ///
185    /// # Errors
186    ///
187    /// This method should return an error if the action cannot be executed
188    /// successfully after all retries are exhausted.
189    async fn execute(&self, action: A) -> Result<()>;
190}
191
192/// A wrapper around a [Collector] that maps outgoing events to a different type.
193///
194/// This adapter allows you to transform events from one type to another as they
195/// flow through the collector. This is useful when you need to:
196/// - Convert between different event representations
197/// - Filter out unwanted events
198/// - Enrich events with additional data
199///
200/// # Type Parameters
201///
202/// * `E` - The original event type
203/// * `F` - The function type that maps events
204pub struct CollectorMap<E, F> {
205    collector: Box<dyn Collector<E>>,
206    f: F,
207}
208
209impl<E, F> CollectorMap<E, F> {
210    /// Creates a new collector map with the given collector and mapping function.
211    pub fn new(collector: Box<dyn Collector<E>>, f: F) -> Self {
212        Self { collector, f }
213    }
214}
215
216#[async_trait]
217impl<E1, E2, F> Collector<E2> for CollectorMap<E1, F>
218where
219    E1: Send + Sync + 'static,
220    E2: Send + Sync + 'static,
221    F: Fn(E1) -> E2 + Send + Sync + Clone + 'static,
222{
223    async fn get_event_stream(&self) -> Result<CollectorStream<'_, E2>> {
224        let stream = self.collector.get_event_stream().await?;
225        let f = self.f.clone();
226        let stream = stream.map(f);
227        Ok(Box::pin(stream))
228    }
229}
230
231/// A wrapper around an [Executor] that maps incoming actions to a different type.
232///
233/// This adapter allows you to transform actions from one type to another before
234/// they are executed. This is useful when you need to:
235/// - Convert between different action representations
236/// - Filter out unwanted actions
237/// - Modify actions before execution
238///
239/// # Type Parameters
240///
241/// * `A` - The original action type
242/// * `F` - The function type that maps actions
243pub struct ExecutorMap<A, F> {
244    executor: Box<dyn Executor<A>>,
245    f: F,
246}
247
248impl<A, F> ExecutorMap<A, F> {
249    /// Creates a new executor map with the given executor and mapping function.
250    pub fn new(executor: Box<dyn Executor<A>>, f: F) -> Self {
251        Self { executor, f }
252    }
253}
254
255#[async_trait]
256impl<A1, A2, F> Executor<A1> for ExecutorMap<A2, F>
257where
258    A1: Send + Sync + 'static,
259    A2: Send + Sync + 'static,
260    F: Fn(A1) -> Option<A2> + Send + Sync + Clone + 'static,
261{
262    async fn execute(&self, action: A1) -> Result<()> {
263        let action = (self.f)(action);
264        match action {
265            Some(action) => self.executor.execute(action).await,
266            None => Ok(()),
267        }
268    }
269}
270
271#[cfg(test)]
272mod tests {
273    use super::*;
274    use std::sync::Arc;
275    use tokio::sync::Mutex;
276    use tokio_stream::StreamExt;
277
278    // Test event and action types
279    #[derive(Debug, Clone, PartialEq)]
280    struct TestEvent(u64);
281
282    #[derive(Debug, Clone, PartialEq)]
283    struct TestAction(String);
284
285    // Mock collector implementation
286    struct MockCollector {
287        events: Vec<TestEvent>,
288    }
289
290    #[async_trait]
291    impl Collector<TestEvent> for MockCollector {
292        async fn get_event_stream(&self) -> Result<CollectorStream<'_, TestEvent>> {
293            let events = self.events.clone();
294            Ok(Box::pin(tokio_stream::iter(events)))
295        }
296    }
297
298    // Mock strategy implementation
299    struct MockStrategy {
300        state: Arc<Mutex<u64>>,
301    }
302
303    #[async_trait]
304    impl Strategy<TestEvent, TestAction> for MockStrategy {
305        async fn sync_state(&mut self) -> Result<()> {
306            let mut state = self.state.lock().await;
307            *state = 0;
308            Ok(())
309        }
310
311        async fn process_event(&mut self, event: TestEvent) -> Vec<TestAction> {
312            let mut state = self.state.lock().await;
313            *state += event.0;
314            vec![TestAction(format!("processed_{}", event.0))]
315        }
316    }
317
318    // Mock executor implementation
319    struct MockExecutor {
320        executed_actions: Arc<Mutex<Vec<TestAction>>>,
321    }
322
323    #[async_trait]
324    impl Executor<TestAction> for MockExecutor {
325        async fn execute(&self, action: TestAction) -> Result<()> {
326            let mut actions = self.executed_actions.lock().await;
327            actions.push(action);
328            Ok(())
329        }
330    }
331
332    #[tokio::test]
333    async fn test_collector() {
334        let collector = MockCollector {
335            events: vec![TestEvent(1), TestEvent(2), TestEvent(3)],
336        };
337
338        let mut stream = collector.get_event_stream().await.unwrap();
339        let mut events = Vec::new();
340        while let Some(event) = stream.next().await {
341            events.push(event);
342        }
343
344        assert_eq!(events, vec![TestEvent(1), TestEvent(2), TestEvent(3)]);
345    }
346
347    #[tokio::test]
348    async fn test_strategy() {
349        let state = Arc::new(Mutex::new(0));
350        let mut strategy = MockStrategy {
351            state: Arc::clone(&state),
352        };
353
354        // Test state synchronization
355        strategy.sync_state().await.unwrap();
356        assert_eq!(*state.lock().await, 0);
357
358        // Test event processing
359        let actions = strategy.process_event(TestEvent(5)).await;
360        assert_eq!(actions, vec![TestAction("processed_5".to_string())]);
361        assert_eq!(*state.lock().await, 5);
362    }
363
364    #[tokio::test]
365    async fn test_executor() {
366        let executed_actions = Arc::new(Mutex::new(Vec::new()));
367        let executor = MockExecutor {
368            executed_actions: Arc::clone(&executed_actions),
369        };
370
371        let action = TestAction("test_action".to_string());
372        executor.execute(action.clone()).await.unwrap();
373
374        let actions = executed_actions.lock().await;
375        assert_eq!(*actions, vec![action]);
376    }
377
378    #[tokio::test]
379    async fn test_collector_map() {
380        let collector = MockCollector {
381            events: vec![TestEvent(1), TestEvent(2)],
382        };
383
384        let mapped_collector = CollectorMap::new(Box::new(collector), |event: TestEvent| {
385            TestEvent(event.0 * 2)
386        });
387
388        let mut stream = mapped_collector.get_event_stream().await.unwrap();
389        let mut events = Vec::new();
390        while let Some(event) = stream.next().await {
391            events.push(event);
392        }
393
394        assert_eq!(events, vec![TestEvent(2), TestEvent(4)]);
395    }
396
397    #[tokio::test]
398    async fn test_executor_map() {
399        let executed_actions = Arc::new(Mutex::new(Vec::new()));
400        let executor = MockExecutor {
401            executed_actions: Arc::clone(&executed_actions),
402        };
403
404        let mapped_executor = ExecutorMap::new(Box::new(executor), |action: TestAction| {
405            Some(TestAction(format!("mapped_{}", action.0)))
406        });
407
408        let action = TestAction("test".to_string());
409        mapped_executor.execute(action).await.unwrap();
410
411        let actions = executed_actions.lock().await;
412        assert_eq!(*actions, vec![TestAction("mapped_test".to_string())]);
413    }
414}