botcore/types.rs
1use crate::error::Result;
2use async_trait::async_trait;
3use std::pin::Pin;
4use tokio_stream::Stream;
5use tokio_stream::StreamExt;
6
7/// A stream of events emitted by a [Collector].
8///
9/// This type alias represents an asynchronous stream of events that can be
10/// consumed by the engine. The stream is boxed and pinned to allow for
11/// dynamic dispatch and async iteration.
12pub type CollectorStream<'a, E> = Pin<Box<dyn Stream<Item = E> + Send + 'a>>;
13
14/// A source of events that can be processed by the engine.
15///
16/// Collectors are responsible for producing events from external sources like:
17/// - Blockchain events
18/// - API webhooks
19/// - Database changes
20/// - File system events
21/// - etc.
22///
23/// # Example
24///
25/// ```rust
26/// use botcore::types::{Collector, CollectorStream};
27/// use botcore::error::Result;
28/// use async_trait::async_trait;
29///
30/// struct BlockEvent {
31/// block_number: u64,
32/// }
33///
34/// struct BlockCollector {
35/// ws_endpoint: String,
36/// }
37///
38/// #[async_trait]
39/// impl Collector<BlockEvent> for BlockCollector {
40/// async fn get_event_stream(&self) -> Result<CollectorStream<'_, BlockEvent>> {
41/// // Implementation to stream block events
42/// # todo!()
43/// }
44/// }
45/// ```
46#[async_trait]
47pub trait Collector<E>: Send + Sync {
48 /// Returns the core event stream for the collector.
49 ///
50 /// This method should establish any necessary connections and return a stream
51 /// that will emit events of type `E`. The stream should be infinite unless
52 /// an error occurs.
53 ///
54 /// # Errors
55 ///
56 /// This method should return an error if:
57 /// - The connection to the event source fails
58 /// - Required resources are unavailable
59 /// - The stream cannot be established for any other reason
60 async fn get_event_stream(&self) -> Result<CollectorStream<'_, E>>;
61}
62
63/// A component that processes events and generates actions.
64///
65/// Strategies contain the core business logic of the bot. They receive events
66/// from collectors, process them according to some rules or algorithms, and
67/// optionally generate actions to be executed.
68///
69/// # Type Parameters
70///
71/// * `E` - The type of events this strategy can process
72/// * `A` - The type of actions this strategy can generate
73///
74/// # Example
75///
76/// ```rust
77/// use botcore::types::Strategy;
78/// use botcore::error::Result;
79/// use async_trait::async_trait;
80///
81/// struct BlockEvent {
82/// block_number: u64,
83/// }
84///
85/// struct TradeAction {
86/// amount: u64,
87/// }
88///
89/// struct TradingStrategy {
90/// min_block_interval: u64,
91/// last_trade_block: u64,
92/// }
93///
94/// #[async_trait]
95/// impl Strategy<BlockEvent, TradeAction> for TradingStrategy {
96/// async fn sync_state(&mut self) -> Result<()> {
97/// // Sync last trade block from chain
98/// # Ok(())
99/// }
100///
101/// async fn process_event(&mut self, event: BlockEvent) -> Vec<TradeAction> {
102/// if event.block_number >= self.last_trade_block + self.min_block_interval {
103/// vec![TradeAction { amount: 100 }]
104/// } else {
105/// vec![]
106/// }
107/// }
108/// }
109/// ```
110#[async_trait]
111pub trait Strategy<E, A>: Send + Sync {
112 /// Synchronizes the initial state of the strategy.
113 ///
114 /// This method is called once when the strategy is started. It should:
115 /// - Load any necessary state from storage
116 /// - Initialize connections to required services
117 /// - Perform any other one-time setup
118 ///
119 /// # Errors
120 ///
121 /// This method should return an error if the state cannot be synchronized.
122 async fn sync_state(&mut self) -> Result<()>;
123
124 /// Processes an event and optionally generates actions.
125 ///
126 /// This is the core method where the strategy's business logic lives.
127 /// It receives events one at a time and can generate zero or more actions
128 /// in response.
129 ///
130 /// # Arguments
131 ///
132 /// * `event` - The event to process
133 ///
134 /// # Returns
135 ///
136 /// A vector of actions to be executed. An empty vector means no actions
137 /// should be taken for this event.
138 async fn process_event(&mut self, event: E) -> Vec<A>;
139}
140
141/// A component that executes actions generated by strategies.
142///
143/// Executors are responsible for carrying out the actions decided upon by
144/// strategies. This might involve:
145/// - Submitting transactions
146/// - Making API calls
147/// - Writing to databases
148/// - Sending notifications
149/// - etc.
150///
151/// # Example
152///
153/// ```rust
154/// use botcore::types::Executor;
155/// use botcore::error::Result;
156/// use async_trait::async_trait;
157///
158/// struct TradeAction {
159/// amount: u64,
160/// }
161///
162/// struct TradeExecutor {
163/// api_key: String,
164/// }
165///
166/// #[async_trait]
167/// impl Executor<TradeAction> for TradeExecutor {
168/// async fn execute(&self, action: TradeAction) -> Result<()> {
169/// // Execute trade via API
170/// # Ok(())
171/// }
172/// }
173/// ```
174#[async_trait]
175pub trait Executor<A>: Send + Sync {
176 /// Executes a single action.
177 ///
178 /// This method should handle all the logic needed to execute the action,
179 /// including any retries or error handling.
180 ///
181 /// # Arguments
182 ///
183 /// * `action` - The action to execute
184 ///
185 /// # Errors
186 ///
187 /// This method should return an error if the action cannot be executed
188 /// successfully after all retries are exhausted.
189 async fn execute(&self, action: A) -> Result<()>;
190}
191
192/// A wrapper around a [Collector] that maps outgoing events to a different type.
193///
194/// This adapter allows you to transform events from one type to another as they
195/// flow through the collector. This is useful when you need to:
196/// - Convert between different event representations
197/// - Filter out unwanted events
198/// - Enrich events with additional data
199///
200/// # Type Parameters
201///
202/// * `E` - The original event type
203/// * `F` - The function type that maps events
204pub struct CollectorMap<E, F> {
205 collector: Box<dyn Collector<E>>,
206 f: F,
207}
208
209impl<E, F> CollectorMap<E, F> {
210 /// Creates a new collector map with the given collector and mapping function.
211 pub fn new(collector: Box<dyn Collector<E>>, f: F) -> Self {
212 Self { collector, f }
213 }
214}
215
216#[async_trait]
217impl<E1, E2, F> Collector<E2> for CollectorMap<E1, F>
218where
219 E1: Send + Sync + 'static,
220 E2: Send + Sync + 'static,
221 F: Fn(E1) -> E2 + Send + Sync + Clone + 'static,
222{
223 async fn get_event_stream(&self) -> Result<CollectorStream<'_, E2>> {
224 let stream = self.collector.get_event_stream().await?;
225 let f = self.f.clone();
226 let stream = stream.map(f);
227 Ok(Box::pin(stream))
228 }
229}
230
231/// A wrapper around an [Executor] that maps incoming actions to a different type.
232///
233/// This adapter allows you to transform actions from one type to another before
234/// they are executed. This is useful when you need to:
235/// - Convert between different action representations
236/// - Filter out unwanted actions
237/// - Modify actions before execution
238///
239/// # Type Parameters
240///
241/// * `A` - The original action type
242/// * `F` - The function type that maps actions
243pub struct ExecutorMap<A, F> {
244 executor: Box<dyn Executor<A>>,
245 f: F,
246}
247
248impl<A, F> ExecutorMap<A, F> {
249 /// Creates a new executor map with the given executor and mapping function.
250 pub fn new(executor: Box<dyn Executor<A>>, f: F) -> Self {
251 Self { executor, f }
252 }
253}
254
255#[async_trait]
256impl<A1, A2, F> Executor<A1> for ExecutorMap<A2, F>
257where
258 A1: Send + Sync + 'static,
259 A2: Send + Sync + 'static,
260 F: Fn(A1) -> Option<A2> + Send + Sync + Clone + 'static,
261{
262 async fn execute(&self, action: A1) -> Result<()> {
263 let action = (self.f)(action);
264 match action {
265 Some(action) => self.executor.execute(action).await,
266 None => Ok(()),
267 }
268 }
269}
270
271#[cfg(test)]
272mod tests {
273 use super::*;
274 use std::sync::Arc;
275 use tokio::sync::Mutex;
276 use tokio_stream::StreamExt;
277
278 // Test event and action types
279 #[derive(Debug, Clone, PartialEq)]
280 struct TestEvent(u64);
281
282 #[derive(Debug, Clone, PartialEq)]
283 struct TestAction(String);
284
285 // Mock collector implementation
286 struct MockCollector {
287 events: Vec<TestEvent>,
288 }
289
290 #[async_trait]
291 impl Collector<TestEvent> for MockCollector {
292 async fn get_event_stream(&self) -> Result<CollectorStream<'_, TestEvent>> {
293 let events = self.events.clone();
294 Ok(Box::pin(tokio_stream::iter(events)))
295 }
296 }
297
298 // Mock strategy implementation
299 struct MockStrategy {
300 state: Arc<Mutex<u64>>,
301 }
302
303 #[async_trait]
304 impl Strategy<TestEvent, TestAction> for MockStrategy {
305 async fn sync_state(&mut self) -> Result<()> {
306 let mut state = self.state.lock().await;
307 *state = 0;
308 Ok(())
309 }
310
311 async fn process_event(&mut self, event: TestEvent) -> Vec<TestAction> {
312 let mut state = self.state.lock().await;
313 *state += event.0;
314 vec![TestAction(format!("processed_{}", event.0))]
315 }
316 }
317
318 // Mock executor implementation
319 struct MockExecutor {
320 executed_actions: Arc<Mutex<Vec<TestAction>>>,
321 }
322
323 #[async_trait]
324 impl Executor<TestAction> for MockExecutor {
325 async fn execute(&self, action: TestAction) -> Result<()> {
326 let mut actions = self.executed_actions.lock().await;
327 actions.push(action);
328 Ok(())
329 }
330 }
331
332 #[tokio::test]
333 async fn test_collector() {
334 let collector = MockCollector {
335 events: vec![TestEvent(1), TestEvent(2), TestEvent(3)],
336 };
337
338 let mut stream = collector.get_event_stream().await.unwrap();
339 let mut events = Vec::new();
340 while let Some(event) = stream.next().await {
341 events.push(event);
342 }
343
344 assert_eq!(events, vec![TestEvent(1), TestEvent(2), TestEvent(3)]);
345 }
346
347 #[tokio::test]
348 async fn test_strategy() {
349 let state = Arc::new(Mutex::new(0));
350 let mut strategy = MockStrategy {
351 state: Arc::clone(&state),
352 };
353
354 // Test state synchronization
355 strategy.sync_state().await.unwrap();
356 assert_eq!(*state.lock().await, 0);
357
358 // Test event processing
359 let actions = strategy.process_event(TestEvent(5)).await;
360 assert_eq!(actions, vec![TestAction("processed_5".to_string())]);
361 assert_eq!(*state.lock().await, 5);
362 }
363
364 #[tokio::test]
365 async fn test_executor() {
366 let executed_actions = Arc::new(Mutex::new(Vec::new()));
367 let executor = MockExecutor {
368 executed_actions: Arc::clone(&executed_actions),
369 };
370
371 let action = TestAction("test_action".to_string());
372 executor.execute(action.clone()).await.unwrap();
373
374 let actions = executed_actions.lock().await;
375 assert_eq!(*actions, vec![action]);
376 }
377
378 #[tokio::test]
379 async fn test_collector_map() {
380 let collector = MockCollector {
381 events: vec![TestEvent(1), TestEvent(2)],
382 };
383
384 let mapped_collector = CollectorMap::new(Box::new(collector), |event: TestEvent| {
385 TestEvent(event.0 * 2)
386 });
387
388 let mut stream = mapped_collector.get_event_stream().await.unwrap();
389 let mut events = Vec::new();
390 while let Some(event) = stream.next().await {
391 events.push(event);
392 }
393
394 assert_eq!(events, vec![TestEvent(2), TestEvent(4)]);
395 }
396
397 #[tokio::test]
398 async fn test_executor_map() {
399 let executed_actions = Arc::new(Mutex::new(Vec::new()));
400 let executor = MockExecutor {
401 executed_actions: Arc::clone(&executed_actions),
402 };
403
404 let mapped_executor = ExecutorMap::new(Box::new(executor), |action: TestAction| {
405 Some(TestAction(format!("mapped_{}", action.0)))
406 });
407
408 let action = TestAction("test".to_string());
409 mapped_executor.execute(action).await.unwrap();
410
411 let actions = executed_actions.lock().await;
412 assert_eq!(*actions, vec![TestAction("mapped_test".to_string())]);
413 }
414}