Skip to main content

aagt_core/trading/
strategy.rs

1//! Strategy and pipeline system for automated trading
2
3use std::sync::Arc;
4
5use serde::{Deserialize, Serialize};
6use tokio::sync::mpsc;
7
8use crate::error::Result;
9use crate::trading::pipeline::{self, Step, Context};
10use rust_decimal::Decimal;
11
12/// A condition that can trigger a strategy
13#[derive(Debug, Clone, Serialize, Deserialize)]
14#[serde(tag = "type", rename_all = "snake_case")]
15pub enum Condition {
16    /// Price crosses above threshold
17    PriceAbove {
18        token: String,
19        threshold: Decimal,
20    },
21    /// Price crosses below threshold
22    PriceBelow {
23        token: String,
24        threshold: Decimal,
25    },
26    /// Price changes by percentage
27    PriceChange {
28        token: String,
29        percent: Decimal,
30        direction: PriceDirection,
31    },
32    /// Time-based trigger
33    Schedule {
34        cron: String,
35    },
36    /// Manual trigger
37    Manual,
38    /// All conditions must be true
39    And(Vec<Condition>),
40    /// Any condition must be true
41    Or(Vec<Condition>),
42}
43
/// Direction of price change, used by [`Condition::PriceChange`].
///
/// Serialized as a snake_case string (`"up"`, `"down"`, `"any"`).
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum PriceDirection {
    /// Upward direction
    Up,
    /// Downward direction
    Down,
    /// Either direction
    Any,
}
52
/// An action to execute as one step of a strategy.
///
/// Serialized as an internally tagged enum: the snake_case variant name is
/// stored in a `"type"` field alongside the variant's fields.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(tag = "type", rename_all = "snake_case")]
pub enum Action {
    /// Swap tokens
    Swap {
        from_token: String,
        to_token: String,
        // Free-form amount; this file never parses it, so the accepted
        // spellings are defined by whichever `ActionExecutor` is plugged in.
        amount: String, // Can be "100" or "50%" or "max"
    },
    /// Send notification
    Notify {
        channel: NotifyChannel,
        message: String,
    },
    /// Wait for duration
    Wait {
        seconds: u64,
    },
    /// Cancel pipeline
    Cancel {
        reason: String,
    },
}
77
78use crate::infra::notification::NotifyChannel;
79
/// A trading strategy: a trigger [`Condition`] plus the ordered list of
/// [`Action`]s to run when it fires.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct Strategy {
    /// Unique ID; `FileStrategyStore` uses it as the key for update and delete
    pub id: String,
    /// User who owns this strategy
    pub user_id: String,
    /// Name of the strategy; also used to name the generic pipeline built
    /// by `StrategyEngine::execute_pipeline`
    pub name: String,
    /// Description
    pub description: Option<String>,
    /// Trigger condition
    pub condition: Condition,
    /// Actions to execute, in order
    pub actions: Vec<Action>,
    /// Is strategy active; `StrategyEngine::load_active_strategies` drops
    /// strategies where this is `false`
    pub active: bool,
    /// Created timestamp (unit not established in this file — presumably
    /// Unix seconds like the pipeline timestamps; confirm with the producer)
    pub created_at: i64,
}
100
/// Status of a pipeline execution.
///
/// Serialized with snake_case variant names. Within this file,
/// `StrategyEngine::execute_pipeline` only ever produces `Completed` or
/// `Cancelled`; a failing run is reported as an `Err` instead of `Failed`.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum PipelineStatus {
    /// Waiting for trigger
    Pending,
    /// Currently running
    Running,
    /// Completed successfully
    Completed,
    /// Failed with error
    Failed { error: String },
    /// Cancelled
    Cancelled { reason: String },
}
116
/// A pipeline execution record, as returned by
/// `StrategyEngine::execute_pipeline`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct Pipeline {
    /// Execution ID (supplied by the caller of `execute_pipeline`)
    pub id: String,
    /// Strategy ID
    pub strategy_id: String,
    /// User ID
    pub user_id: String,
    /// Current status
    pub status: PipelineStatus,
    /// Current step index; `execute_pipeline` sets it to the number of
    /// actions in the strategy once the run returns
    pub current_step: usize,
    /// Results from each step; `execute_pipeline` currently leaves this empty
    pub step_results: Vec<StepResult>,
    /// Started at (`chrono::Utc::now().timestamp()` when produced by
    /// `execute_pipeline`)
    pub started_at: i64,
    /// Completed at (same clock as `started_at`); `None` while unfinished
    pub completed_at: Option<i64>,
}
137
/// Result of a single pipeline step.
///
/// Nothing in this file constructs a `StepResult`; it only appears as the
/// element type of `Pipeline::step_results`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct StepResult {
    /// Step index
    pub index: usize,
    /// Action that was executed
    pub action: Action,
    /// Success or failure
    pub success: bool,
    /// Result message
    pub message: String,
    /// Timestamp (unit not established here — presumably Unix seconds; confirm)
    pub timestamp: i64,
}
152
/// Trait for condition evaluators.
///
/// Implementations decide whether a [`Condition`] currently holds. The trait
/// is `Send + Sync` so an evaluator can be shared behind an `Arc`, as
/// `StrategyEngine` does.
#[async_trait::async_trait]
pub trait ConditionEvaluator: Send + Sync {
    /// Evaluate if condition is met.
    ///
    /// Returns `Ok(true)` when the condition holds, `Ok(false)` when it does
    /// not, and an error when it cannot be evaluated.
    async fn evaluate(&self, condition: &Condition) -> Result<bool>;
}
159
/// Trait for action executors.
///
/// Implementations carry out a single [`Action`]. The trait is `Send + Sync`
/// so one executor can be shared (behind an `Arc`) by every `ActionStep`.
#[async_trait::async_trait]
pub trait ActionExecutor: Send + Sync {
    /// Execute an action.
    ///
    /// `context` is the read-only pipeline context of the current run. On
    /// success the returned string is a human-readable result; `ActionStep`
    /// writes it to the pipeline log.
    async fn execute(&self, action: &Action, context: &pipeline::Context) -> Result<String>;
}
165
/// Adapter to run a strategy Action as a pipeline Step
pub struct ActionStep {
    /// The action this step performs
    action: Action,
    /// Shared executor that actually carries out `action`
    executor: Arc<dyn ActionExecutor>,
}
171
172impl ActionStep {
173    pub fn new(action: Action, executor: Arc<dyn ActionExecutor>) -> Self {
174        Self { action, executor }
175    }
176}
177
178#[async_trait::async_trait]
179impl Step for ActionStep {
180    async fn execute(&self, ctx: &mut Context) -> anyhow::Result<()> {
181        let res = self.executor.execute(&self.action, ctx).await?;
182        ctx.log(format!("Action '{}' result: {}", self.name(), res));
183        Ok(())
184    }
185
186    fn name(&self) -> &str {
187        match &self.action {
188            Action::Swap { .. } => "swap",
189            Action::Notify { .. } => "notify",
190            Action::Wait { .. } => "wait",
191            Action::Cancel { .. } => "cancel",
192        }
193    }
194}
195
196
197
/// Persistence for strategies
#[async_trait::async_trait]
pub trait StrategyStore: Send + Sync {
    /// Load the stored strategies.
    ///
    /// Note: `FileStrategyStore` returns every stored strategy regardless of
    /// its `active` flag; filtering for active ones is done by
    /// `StrategyEngine::load_active_strategies`.
    async fn load(&self) -> Result<Vec<Strategy>>;
    /// Save a strategy (create or update)
    async fn save(&self, strategy: &Strategy) -> Result<()>;
    /// Delete a strategy by ID
    async fn delete(&self, id: &str) -> Result<()>;
}
208
/// Simple JSON file store for strategies (Actor-based).
///
/// The handle only holds the sending side of a command channel; all file
/// access happens in the `StrategyActor` task spawned by
/// `FileStrategyStore::new`.
pub struct FileStrategyStore {
    /// Command channel to the background actor
    sender: tokio::sync::mpsc::Sender<StrategyCommand>,
}
213
/// Requests sent from `FileStrategyStore` to its `StrategyActor`.
///
/// Every variant carries a oneshot `reply` sender on which the actor returns
/// the outcome of the operation.
enum StrategyCommand {
    /// Read all stored strategies
    Load { reply: tokio::sync::oneshot::Sender<Result<Vec<Strategy>>> },
    /// Insert `strategy`, or replace the stored one with the same ID
    Save { strategy: Strategy, reply: tokio::sync::oneshot::Sender<Result<()>> },
    /// Remove the strategy with the given ID, if present
    Delete { id: String, reply: tokio::sync::oneshot::Sender<Result<()>> },
}
219
/// Background worker that owns the strategy file and serves
/// `StrategyCommand`s one at a time.
struct StrategyActor {
    /// Location of the JSON file holding the strategies
    path: std::path::PathBuf,
    /// Incoming commands from `FileStrategyStore` handles
    receiver: tokio::sync::mpsc::Receiver<StrategyCommand>,
}
224
225impl StrategyActor {
226    fn read_strategies(&self) -> Result<Vec<Strategy>> {
227        if !self.path.exists() {
228            return Ok(Vec::new());
229        }
230        
231        let file = std::fs::File::open(&self.path)
232            .map_err(|e| crate::error::Error::Internal(format!("Open error: {}", e)))?;
233            
234        // Fix: Shared Lock for Reading
235
236        file.lock_shared()
237            .map_err(|e| crate::error::Error::Internal(format!("Lock error: {}", e)))?;
238            
239        // Use a scope guard or explicit unlock? Flocking releases on close automatically.
240        // We read content while locked.
241        let buf_reader = std::io::BufReader::new(&file);
242        let strategies: Vec<Strategy> = match serde_json::from_reader(buf_reader) {
243            Ok(s) => s,
244            Err(e) => {
245                // If empty or corrupt, return empty or error?
246                // For robustness, if file is empty JSON might fail.
247                // Check metadata size?
248                if file.metadata().map(|m| m.len()).unwrap_or(0) == 0 {
249                    Vec::new()
250                } else {
251                    return Err(crate::error::Error::Internal(format!("Parse error: {}", e)));
252                }
253            }
254        };
255        
256        file.unlock().ok(); // Best effort unlock
257        
258        Ok(strategies)
259    }
260    
261    fn write_strategies(&self, strategies: &[Strategy]) -> Result<()> {
262        // Ensure parent dir exists
263        if let Some(parent) = self.path.parent() {
264            std::fs::create_dir_all(parent)
265                .map_err(|e| crate::error::Error::Internal(format!("Dir creation error: {}", e)))?;
266        }
267        
268        // Open with write access and create if missing
269        let file = std::fs::OpenOptions::new()
270            .read(true)
271            .write(true)
272            .create(true)
273            .truncate(false) // Don't truncate yet, wait for lock
274            .open(&self.path)
275            .map_err(|e| crate::error::Error::Internal(format!("File open error: {}", e)))?;
276            
277        // Fix: Exclusive Lock for Writing
278        use fs2::FileExt;
279        file.lock_exclusive()
280            .map_err(|e| crate::error::Error::Internal(format!("Lock error: {}", e)))?;
281            
282        // Truncate now that we own the lock
283        file.set_len(0)
284             .map_err(|e| crate::error::Error::Internal(format!("Truncate error: {}", e)))?;
285             
286        // Re-open/Seek to start? Or just write to this handle.
287        // File position might not be 0 after open? open options doc says it is.
288        // But better safe.
289        use std::io::Seek;
290        let mut file = file; // shadowing to mut
291        file.seek(std::io::SeekFrom::Start(0))
292            .map_err(|e| crate::error::Error::Internal(format!("Seek error: {}", e)))?;
293            
294        serde_json::to_writer_pretty(&file, strategies)
295            .map_err(|e| crate::error::Error::Internal(format!("Serialization error: {}", e)))?;
296            
297        file.sync_all()
298            .map_err(|e| crate::error::Error::Internal(format!("Sync error: {}", e)))?;
299            
300        file.unlock().ok();
301        
302        Ok(())
303    }
304    
305    fn handle_load(&self) -> Result<Vec<Strategy>> {
306        self.read_strategies()
307    }
308    
309    fn handle_save(&self, strategy: Strategy) -> Result<()> {
310        let mut strategies = self.read_strategies()?;
311        
312        // Update or insert
313        if let Some(pos) = strategies.iter().position(|s| s.id == strategy.id) {
314            strategies[pos] = strategy;
315        } else {
316            strategies.push(strategy);
317        }
318        
319        self.write_strategies(&strategies)
320    }
321    
322    fn handle_delete(&self, id: &str) -> Result<()> {
323        let mut strategies = self.read_strategies()?;
324        
325        if let Some(pos) = strategies.iter().position(|s| s.id == id) {
326            strategies.remove(pos);
327            self.write_strategies(&strategies)?;
328        }
329        
330        Ok(())
331    }
332    
333    async fn run(mut self) {
334        let path = self.path.clone();
335        
336        loop {
337            let rx = &mut self.receiver;
338            
339            match rx.recv().await {
340                Some(cmd) => {
341                    let path = path.clone();
342                    
343                    // Offload blocking I/O to blocking thread
344                    match cmd {
345                        StrategyCommand::Load { reply } => {
346                            let path_clone = path.clone();
347                            let result = tokio::task::spawn_blocking(move || {
348                                let actor = StrategyActor {
349                                    path: path_clone,
350                                    receiver: tokio::sync::mpsc::channel(1).1, // Dummy receiver
351                                };
352                                actor.handle_load()
353                            }).await;
354                            
355                            let res = match result {
356                                Ok(r) => r,
357                                Err(e) => Err(crate::error::Error::Internal(format!("Task error: {}", e))),
358                            };
359                            
360                            let _ = reply.send(res);
361                        }
362                        StrategyCommand::Save { strategy, reply } => {
363                            let path_clone = path.clone();
364                            let result = tokio::task::spawn_blocking(move || {
365                                let actor = StrategyActor {
366                                    path: path_clone,
367                                    receiver: tokio::sync::mpsc::channel(1).1,
368                                };
369                                actor.handle_save(strategy)
370                            }).await;
371                            
372                            let res = match result {
373                                Ok(r) => r,
374                                Err(e) => Err(crate::error::Error::Internal(format!("Task error: {}", e))),
375                            };
376                            
377                            let _ = reply.send(res);
378                        }
379                        StrategyCommand::Delete { id, reply } => {
380                            let path_clone = path.clone();
381                            let result = tokio::task::spawn_blocking(move || {
382                                let actor = StrategyActor {
383                                    path: path_clone,
384                                    receiver: tokio::sync::mpsc::channel(1).1,
385                                };
386                                actor.handle_delete(&id)
387                            }).await;
388                            
389                            let res = match result {
390                                Ok(r) => r,
391                                Err(e) => Err(crate::error::Error::Internal(format!("Task error: {}", e))),
392                            };
393                            
394                            let _ = reply.send(res);
395                        }
396                    }
397                }
398                None => break, // Channel closed
399            }
400        }
401    }
402}
403
404impl FileStrategyStore {
405    pub fn new(path: impl Into<std::path::PathBuf>) -> Self {
406        let (tx, rx) = tokio::sync::mpsc::channel(100);
407        
408        let actor = StrategyActor {
409            path: path.into(),
410            receiver: rx,
411        };
412        
413        tokio::spawn(actor.run());
414        
415        Self { sender: tx }
416    }
417}
418
419#[async_trait::async_trait]
420impl StrategyStore for FileStrategyStore {
421    async fn load(&self) -> Result<Vec<Strategy>> {
422        let (tx, rx) = tokio::sync::oneshot::channel();
423        self.sender.send(StrategyCommand::Load { reply: tx })
424            .await
425            .map_err(|_| crate::error::Error::Internal("Strategy actor closed".to_string()))?;
426        
427        rx.await
428            .map_err(|_| crate::error::Error::Internal("Strategy actor dropped reply".to_string()))?
429    }
430    
431    async fn save(&self, strategy: &Strategy) -> Result<()> {
432        let (tx, rx) = tokio::sync::oneshot::channel();
433        self.sender.send(StrategyCommand::Save { 
434            strategy: strategy.clone(), 
435            reply: tx 
436        })
437            .await
438            .map_err(|_| crate::error::Error::Internal("Strategy actor closed".to_string()))?;
439        
440        rx.await
441            .map_err(|_| crate::error::Error::Internal("Strategy actor dropped reply".to_string()))?
442    }
443    
444    async fn delete(&self, id: &str) -> Result<()> {
445        let (tx, rx) = tokio::sync::oneshot::channel();
446        self.sender.send(StrategyCommand::Delete { 
447            id: id.to_string(), 
448            reply: tx 
449        })
450            .await
451            .map_err(|_| crate::error::Error::Internal("Strategy actor closed".to_string()))?;
452        
453        rx.await
454            .map_err(|_| crate::error::Error::Internal("Strategy actor dropped reply".to_string()))?
455    }
456}
457
458
/// In-memory no-op store.
///
/// Despite the name it keeps nothing: saves and deletes are accepted and
/// discarded, and loading always yields an empty list.
pub struct InMemoryStrategyStore;
461
462#[async_trait::async_trait]
463impl StrategyStore for InMemoryStrategyStore {
464    async fn load(&self) -> Result<Vec<Strategy>> { Ok(Vec::new()) }
465    async fn save(&self, _strategy: &Strategy) -> Result<()> { Ok(()) }
466    async fn delete(&self, _id: &str) -> Result<()> { Ok(()) }
467}
468
/// Strategy engine for managing and executing strategies
pub struct StrategyEngine {
    /// Condition evaluator (stored, but not consulted by any method visible
    /// in this file)
    evaluator: Arc<dyn ConditionEvaluator>,
    /// Action executor, shared with every `ActionStep` built for a run
    executor: Arc<dyn ActionExecutor>,
    /// Strategy persistence
    store: Arc<dyn StrategyStore>,
    /// Shutdown signal receiver; set via `with_shutdown`, and not read by
    /// any method visible in this file
    shutdown_rx: Option<mpsc::Receiver<()>>,
}
480
481impl StrategyEngine {
482    /// Create a new strategy engine
483    pub fn new(
484        evaluator: Arc<dyn ConditionEvaluator>,
485        executor: Arc<dyn ActionExecutor>,
486        store: Arc<dyn StrategyStore>,
487    ) -> Self {
488        Self {
489            evaluator,
490            executor,
491            store,
492            shutdown_rx: None,
493        }
494    }
495    
496    /// Create with default in-memory store (backward compatibility helpers)
497    pub fn simple(
498        evaluator: Arc<dyn ConditionEvaluator>,
499        executor: Arc<dyn ActionExecutor>,
500    ) -> Self {
501        Self::new(evaluator, executor, Arc::new(InMemoryStrategyStore))
502    }
503
504    /// Set shutdown signal channel
505    pub fn with_shutdown(mut self, rx: mpsc::Receiver<()>) -> Self {
506        self.shutdown_rx = Some(rx);
507        self
508    }
509    
510    /// Load all active strategies from store
511    pub async fn load_active_strategies(&self) -> Result<Vec<Strategy>> {
512        let strategies = self.store.load().await?;
513        Ok(strategies.into_iter().filter(|s| s.active).collect())
514    }
515    
516    /// Save/Register a strategy
517    pub async fn register_strategy(&self, strategy: Strategy) -> Result<()> {
518        self.store.save(&strategy).await
519    }
520    
521    /// Delete a strategy
522    pub async fn remove_strategy(&self, id: &str) -> Result<()> {
523        self.store.delete(id).await
524    }
525
526    /// Execute a pipeline with timeout and graceful shutdown
527    pub async fn execute_pipeline(
528        &self,
529        strategy: &Strategy,
530        pipeline_id: String,
531    ) -> Result<Pipeline> {
532        let now = chrono::Utc::now().timestamp();
533        
534        // 1. Build the generic pipeline
535        let mut generic_pipeline = pipeline::Pipeline::new(&strategy.name);
536        
537        for action in &strategy.actions {
538            let step = ActionStep::new(action.clone(), self.executor.clone());
539            generic_pipeline = generic_pipeline.add_step(step);
540        }
541
542        // 2. Prepare Context
543        let mut ctx = Context::new(format!("Strategy execution: {}", strategy.name));
544        ctx.set("user_id", strategy.user_id.clone());
545        ctx.set("strategy_id", strategy.id.clone());
546        ctx.set("pipeline_id", pipeline_id.clone());
547
548        // 3. Run (using shared logic from pipeline.rs)
549        let result_ctx = generic_pipeline.run_with_context(ctx).await
550            .map_err(|e| crate::error::Error::Internal(format!("Pipeline execution failed: {}", e)))?;
551
552        // 4. Map back to Strategy-specific Pipeline record for compatibility
553        let pipeline = Pipeline {
554            id: pipeline_id,
555            strategy_id: strategy.id.clone(),
556            user_id: strategy.user_id.clone(),
557            status: if result_ctx.aborted { 
558                PipelineStatus::Cancelled { reason: "Aborted".to_string() } 
559            } else { 
560                PipelineStatus::Completed 
561            },
562            current_step: strategy.actions.len(), // Assume finished if generic pipeline finished
563            step_results: Vec::new(), // We could populate this from result_ctx.trace if needed
564            started_at: now,
565            completed_at: Some(chrono::Utc::now().timestamp()),
566        };
567
568        Ok(pipeline)
569    }
570}