1use std::sync::Arc;
4
5use serde::{Deserialize, Serialize};
6use tokio::sync::mpsc;
7
8use crate::error::Result;
9use crate::trading::pipeline::{self, Step, Context};
10use rust_decimal::Decimal;
11
12#[derive(Debug, Clone, Serialize, Deserialize)]
14#[serde(tag = "type", rename_all = "snake_case")]
15pub enum Condition {
16 PriceAbove {
18 token: String,
19 threshold: Decimal,
20 },
21 PriceBelow {
23 token: String,
24 threshold: Decimal,
25 },
26 PriceChange {
28 token: String,
29 percent: Decimal,
30 direction: PriceDirection,
31 },
32 Schedule {
34 cron: String,
35 },
36 Manual,
38 And(Vec<Condition>),
40 Or(Vec<Condition>),
42}
43
/// Direction qualifier carried by [`Condition::PriceChange`].
///
/// The variants have no fields, so with `rename_all = "snake_case"` they
/// serialize as the plain strings `"up"`, `"down"` and `"any"`.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum PriceDirection {
    // NOTE(review): presumably "price moved upward" — confirm against the evaluator.
    Up,
    // NOTE(review): presumably "price moved downward" — confirm against the evaluator.
    Down,
    // NOTE(review): presumably "either direction" — confirm against the evaluator.
    Any,
}
52
/// One entry in a strategy's ordered action list.
///
/// Uses serde's internally tagged representation: the `"type"` entry holds
/// the snake_case variant name (`"swap"`, `"notify"`, `"wait"`, `"cancel"`)
/// next to the variant's fields. How an action is carried out is up to the
/// [`ActionExecutor`] implementation.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(tag = "type", rename_all = "snake_case")]
pub enum Action {
    /// Source token, destination token and an amount.
    Swap {
        from_token: String,
        to_token: String,
        // Kept as a string; NOTE(review): accepted formats are not defined in this file.
        amount: String,
    },
    /// A message addressed to a notification channel.
    Notify {
        channel: NotifyChannel,
        message: String,
    },
    /// A delay expressed in whole seconds.
    Wait {
        seconds: u64,
    },
    /// A cancellation with a human-readable reason.
    Cancel {
        reason: String,
    },
}
77
78use crate::infra::notification::NotifyChannel;
79
/// A stored strategy: one trigger condition plus the actions to run.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct Strategy {
    /// Identifier; the file-backed store matches on it when saving (replace
    /// vs. append) and when deleting.
    pub id: String,
    /// Owner of the strategy; copied into the pipeline context and result.
    pub user_id: String,
    /// Display name; also used as the name of the execution pipeline.
    pub name: String,
    /// Optional free-form description.
    pub description: Option<String>,
    /// Trigger condition for this strategy.
    pub condition: Condition,
    /// Actions, turned into pipeline steps in this order.
    pub actions: Vec<Action>,
    /// Only strategies with `active == true` are returned by
    /// `StrategyEngine::load_active_strategies`.
    pub active: bool,
    // NOTE(review): presumably a Unix timestamp in seconds, like the pipeline
    // timestamps — nothing in this file sets it, so confirm with the caller.
    pub created_at: i64,
}
100
/// Lifecycle state recorded on a [`Pipeline`].
///
/// Serialized with serde's default (externally tagged) representation and
/// snake_case variant names.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum PipelineStatus {
    /// Not produced by the code in this part of the file.
    Pending,
    /// Not produced by the code in this part of the file.
    Running,
    /// Set by `StrategyEngine::execute_pipeline` when the run was not aborted.
    Completed,
    /// Carries an error description. `execute_pipeline` currently reports
    /// failures as `Err` instead of producing this variant.
    Failed { error: String },
    /// Set by `StrategyEngine::execute_pipeline` (with reason `"Aborted"`)
    /// when the finished context is flagged as aborted.
    Cancelled { reason: String },
}
116
/// Record describing one execution of a strategy's actions.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct Pipeline {
    /// Identifier supplied by the caller of `execute_pipeline`.
    pub id: String,
    /// `id` of the strategy that was executed.
    pub strategy_id: String,
    /// `user_id` of the strategy that was executed.
    pub user_id: String,
    /// Outcome of the run.
    pub status: PipelineStatus,
    /// `execute_pipeline` sets this to the number of actions in the strategy.
    pub current_step: usize,
    /// Per-step results; `execute_pipeline` currently leaves this empty.
    pub step_results: Vec<StepResult>,
    /// Start time as a UTC Unix timestamp in seconds.
    pub started_at: i64,
    /// Completion time as a UTC Unix timestamp in seconds, if finished.
    pub completed_at: Option<i64>,
}
137
/// Outcome of a single pipeline step.
///
/// Nothing in this part of the file constructs this type; the field notes
/// below are read from the declarations only.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct StepResult {
    // NOTE(review): presumably the position of the action within the strategy — confirm.
    pub index: usize,
    /// The action this result belongs to.
    pub action: Action,
    /// Whether the step succeeded.
    pub success: bool,
    /// Free-form message attached to the result.
    pub message: String,
    // NOTE(review): presumably a Unix timestamp in seconds — confirm with the producer.
    pub timestamp: i64,
}
152
/// Decides whether a [`Condition`] currently holds.
///
/// Implementations must be `Send + Sync` so they can be shared behind an
/// `Arc` (see [`StrategyEngine`]).
#[async_trait::async_trait]
pub trait ConditionEvaluator: Send + Sync {
    /// Evaluates `condition`, returning `Ok(true)` when it is satisfied.
    /// Errors are reported through the crate's `Result` type.
    async fn evaluate(&self, condition: &Condition) -> Result<bool>;
}
159
/// Carries out a single [`Action`].
///
/// Implementations must be `Send + Sync` so they can be shared behind an
/// `Arc` by every pipeline step.
#[async_trait::async_trait]
pub trait ActionExecutor: Send + Sync {
    /// Executes `action` with read access to the pipeline `context`.
    ///
    /// The returned string is a result message; `ActionStep` writes it to the
    /// context log. Errors are reported through the crate's `Result` type.
    async fn execute(&self, action: &Action, context: &pipeline::Context) -> Result<String>;
}
165
/// Pipeline step that wraps one [`Action`] together with the executor that
/// will run it.
pub struct ActionStep {
    /// The action to execute when the step runs.
    action: Action,
    /// Shared executor the action is delegated to.
    executor: Arc<dyn ActionExecutor>,
}
171
172impl ActionStep {
173 pub fn new(action: Action, executor: Arc<dyn ActionExecutor>) -> Self {
174 Self { action, executor }
175 }
176}
177
178#[async_trait::async_trait]
179impl Step for ActionStep {
180 async fn execute(&self, ctx: &mut Context) -> anyhow::Result<()> {
181 let res = self.executor.execute(&self.action, ctx).await?;
182 ctx.log(format!("Action '{}' result: {}", self.name(), res));
183 Ok(())
184 }
185
186 fn name(&self) -> &str {
187 match &self.action {
188 Action::Swap { .. } => "swap",
189 Action::Notify { .. } => "notify",
190 Action::Wait { .. } => "wait",
191 Action::Cancel { .. } => "cancel",
192 }
193 }
194}
195
196
197
/// Persistence interface for [`Strategy`] values.
///
/// Implementations must be `Send + Sync` so a store can be shared behind an
/// `Arc` (see [`StrategyEngine`]).
#[async_trait::async_trait]
pub trait StrategyStore: Send + Sync {
    /// Returns every stored strategy, active or not.
    async fn load(&self) -> Result<Vec<Strategy>>;
    /// Stores `strategy`. The file-backed implementation replaces an existing
    /// entry with the same `id` and appends otherwise.
    async fn save(&self, strategy: &Strategy) -> Result<()>;
    /// Removes the strategy with the given `id`. The file-backed
    /// implementation treats an unknown id as success.
    async fn delete(&self, id: &str) -> Result<()>;
}
208
/// [`StrategyStore`] backed by a JSON file.
///
/// The handle itself only holds the sending half of a channel; all file
/// access happens in a background actor task that receives
/// `StrategyCommand`s one at a time.
pub struct FileStrategyStore {
    /// Command channel to the background actor.
    sender: tokio::sync::mpsc::Sender<StrategyCommand>,
}
213
/// Requests sent from [`FileStrategyStore`] to its actor task.
///
/// Every variant carries a oneshot `reply` sender on which the actor returns
/// the result of the operation.
enum StrategyCommand {
    /// Read all strategies from the file.
    Load { reply: tokio::sync::oneshot::Sender<Result<Vec<Strategy>>> },
    /// Insert or replace `strategy` (matched by id) and rewrite the file.
    Save { strategy: Strategy, reply: tokio::sync::oneshot::Sender<Result<()>> },
    /// Remove the strategy with this `id`, if present, and rewrite the file.
    Delete { id: String, reply: tokio::sync::oneshot::Sender<Result<()>> },
}
219
/// Background worker that owns the strategy file and serves
/// `StrategyCommand`s received over a channel.
struct StrategyActor {
    /// Location of the JSON file holding the strategy list.
    path: std::path::PathBuf,
    /// Incoming commands; the actor stops once every sender is dropped.
    receiver: tokio::sync::mpsc::Receiver<StrategyCommand>,
}
224
225impl StrategyActor {
226 fn read_strategies(&self) -> Result<Vec<Strategy>> {
227 if !self.path.exists() {
228 return Ok(Vec::new());
229 }
230
231 let file = std::fs::File::open(&self.path)
232 .map_err(|e| crate::error::Error::Internal(format!("Open error: {}", e)))?;
233
234 file.lock_shared()
237 .map_err(|e| crate::error::Error::Internal(format!("Lock error: {}", e)))?;
238
239 let buf_reader = std::io::BufReader::new(&file);
242 let strategies: Vec<Strategy> = match serde_json::from_reader(buf_reader) {
243 Ok(s) => s,
244 Err(e) => {
245 if file.metadata().map(|m| m.len()).unwrap_or(0) == 0 {
249 Vec::new()
250 } else {
251 return Err(crate::error::Error::Internal(format!("Parse error: {}", e)));
252 }
253 }
254 };
255
256 file.unlock().ok(); Ok(strategies)
259 }
260
261 fn write_strategies(&self, strategies: &[Strategy]) -> Result<()> {
262 if let Some(parent) = self.path.parent() {
264 std::fs::create_dir_all(parent)
265 .map_err(|e| crate::error::Error::Internal(format!("Dir creation error: {}", e)))?;
266 }
267
268 let file = std::fs::OpenOptions::new()
270 .read(true)
271 .write(true)
272 .create(true)
273 .truncate(false) .open(&self.path)
275 .map_err(|e| crate::error::Error::Internal(format!("File open error: {}", e)))?;
276
277 use fs2::FileExt;
279 file.lock_exclusive()
280 .map_err(|e| crate::error::Error::Internal(format!("Lock error: {}", e)))?;
281
282 file.set_len(0)
284 .map_err(|e| crate::error::Error::Internal(format!("Truncate error: {}", e)))?;
285
286 use std::io::Seek;
290 let mut file = file; file.seek(std::io::SeekFrom::Start(0))
292 .map_err(|e| crate::error::Error::Internal(format!("Seek error: {}", e)))?;
293
294 serde_json::to_writer_pretty(&file, strategies)
295 .map_err(|e| crate::error::Error::Internal(format!("Serialization error: {}", e)))?;
296
297 file.sync_all()
298 .map_err(|e| crate::error::Error::Internal(format!("Sync error: {}", e)))?;
299
300 file.unlock().ok();
301
302 Ok(())
303 }
304
305 fn handle_load(&self) -> Result<Vec<Strategy>> {
306 self.read_strategies()
307 }
308
309 fn handle_save(&self, strategy: Strategy) -> Result<()> {
310 let mut strategies = self.read_strategies()?;
311
312 if let Some(pos) = strategies.iter().position(|s| s.id == strategy.id) {
314 strategies[pos] = strategy;
315 } else {
316 strategies.push(strategy);
317 }
318
319 self.write_strategies(&strategies)
320 }
321
322 fn handle_delete(&self, id: &str) -> Result<()> {
323 let mut strategies = self.read_strategies()?;
324
325 if let Some(pos) = strategies.iter().position(|s| s.id == id) {
326 strategies.remove(pos);
327 self.write_strategies(&strategies)?;
328 }
329
330 Ok(())
331 }
332
333 async fn run(mut self) {
334 let path = self.path.clone();
335
336 loop {
337 let rx = &mut self.receiver;
338
339 match rx.recv().await {
340 Some(cmd) => {
341 let path = path.clone();
342
343 match cmd {
345 StrategyCommand::Load { reply } => {
346 let path_clone = path.clone();
347 let result = tokio::task::spawn_blocking(move || {
348 let actor = StrategyActor {
349 path: path_clone,
350 receiver: tokio::sync::mpsc::channel(1).1, };
352 actor.handle_load()
353 }).await;
354
355 let res = match result {
356 Ok(r) => r,
357 Err(e) => Err(crate::error::Error::Internal(format!("Task error: {}", e))),
358 };
359
360 let _ = reply.send(res);
361 }
362 StrategyCommand::Save { strategy, reply } => {
363 let path_clone = path.clone();
364 let result = tokio::task::spawn_blocking(move || {
365 let actor = StrategyActor {
366 path: path_clone,
367 receiver: tokio::sync::mpsc::channel(1).1,
368 };
369 actor.handle_save(strategy)
370 }).await;
371
372 let res = match result {
373 Ok(r) => r,
374 Err(e) => Err(crate::error::Error::Internal(format!("Task error: {}", e))),
375 };
376
377 let _ = reply.send(res);
378 }
379 StrategyCommand::Delete { id, reply } => {
380 let path_clone = path.clone();
381 let result = tokio::task::spawn_blocking(move || {
382 let actor = StrategyActor {
383 path: path_clone,
384 receiver: tokio::sync::mpsc::channel(1).1,
385 };
386 actor.handle_delete(&id)
387 }).await;
388
389 let res = match result {
390 Ok(r) => r,
391 Err(e) => Err(crate::error::Error::Internal(format!("Task error: {}", e))),
392 };
393
394 let _ = reply.send(res);
395 }
396 }
397 }
398 None => break, }
400 }
401 }
402}
403
404impl FileStrategyStore {
405 pub fn new(path: impl Into<std::path::PathBuf>) -> Self {
406 let (tx, rx) = tokio::sync::mpsc::channel(100);
407
408 let actor = StrategyActor {
409 path: path.into(),
410 receiver: rx,
411 };
412
413 tokio::spawn(actor.run());
414
415 Self { sender: tx }
416 }
417}
418
419#[async_trait::async_trait]
420impl StrategyStore for FileStrategyStore {
421 async fn load(&self) -> Result<Vec<Strategy>> {
422 let (tx, rx) = tokio::sync::oneshot::channel();
423 self.sender.send(StrategyCommand::Load { reply: tx })
424 .await
425 .map_err(|_| crate::error::Error::Internal("Strategy actor closed".to_string()))?;
426
427 rx.await
428 .map_err(|_| crate::error::Error::Internal("Strategy actor dropped reply".to_string()))?
429 }
430
431 async fn save(&self, strategy: &Strategy) -> Result<()> {
432 let (tx, rx) = tokio::sync::oneshot::channel();
433 self.sender.send(StrategyCommand::Save {
434 strategy: strategy.clone(),
435 reply: tx
436 })
437 .await
438 .map_err(|_| crate::error::Error::Internal("Strategy actor closed".to_string()))?;
439
440 rx.await
441 .map_err(|_| crate::error::Error::Internal("Strategy actor dropped reply".to_string()))?
442 }
443
444 async fn delete(&self, id: &str) -> Result<()> {
445 let (tx, rx) = tokio::sync::oneshot::channel();
446 self.sender.send(StrategyCommand::Delete {
447 id: id.to_string(),
448 reply: tx
449 })
450 .await
451 .map_err(|_| crate::error::Error::Internal("Strategy actor closed".to_string()))?;
452
453 rx.await
454 .map_err(|_| crate::error::Error::Internal("Strategy actor dropped reply".to_string()))?
455 }
456}
457
458
/// Placeholder [`StrategyStore`] with no backing storage.
///
/// Despite the name, nothing is retained: its `load` always returns an empty
/// list and `save` / `delete` succeed without doing anything.
pub struct InMemoryStrategyStore;
461
462#[async_trait::async_trait]
463impl StrategyStore for InMemoryStrategyStore {
464 async fn load(&self) -> Result<Vec<Strategy>> { Ok(Vec::new()) }
465 async fn save(&self, _strategy: &Strategy) -> Result<()> { Ok(()) }
466 async fn delete(&self, _id: &str) -> Result<()> { Ok(()) }
467}
468
/// Ties together condition evaluation, action execution and strategy
/// persistence.
pub struct StrategyEngine {
    /// Condition evaluator supplied at construction. No method in this part
    /// of the file calls it.
    evaluator: Arc<dyn ConditionEvaluator>,
    /// Executor shared with every pipeline step built by `execute_pipeline`.
    executor: Arc<dyn ActionExecutor>,
    /// Backing store used by the load / register / remove methods.
    store: Arc<dyn StrategyStore>,
    /// Optional shutdown signal installed by `with_shutdown`. No method in
    /// this part of the file reads it.
    shutdown_rx: Option<mpsc::Receiver<()>>,
}
480
481impl StrategyEngine {
482 pub fn new(
484 evaluator: Arc<dyn ConditionEvaluator>,
485 executor: Arc<dyn ActionExecutor>,
486 store: Arc<dyn StrategyStore>,
487 ) -> Self {
488 Self {
489 evaluator,
490 executor,
491 store,
492 shutdown_rx: None,
493 }
494 }
495
496 pub fn simple(
498 evaluator: Arc<dyn ConditionEvaluator>,
499 executor: Arc<dyn ActionExecutor>,
500 ) -> Self {
501 Self::new(evaluator, executor, Arc::new(InMemoryStrategyStore))
502 }
503
504 pub fn with_shutdown(mut self, rx: mpsc::Receiver<()>) -> Self {
506 self.shutdown_rx = Some(rx);
507 self
508 }
509
510 pub async fn load_active_strategies(&self) -> Result<Vec<Strategy>> {
512 let strategies = self.store.load().await?;
513 Ok(strategies.into_iter().filter(|s| s.active).collect())
514 }
515
516 pub async fn register_strategy(&self, strategy: Strategy) -> Result<()> {
518 self.store.save(&strategy).await
519 }
520
521 pub async fn remove_strategy(&self, id: &str) -> Result<()> {
523 self.store.delete(id).await
524 }
525
526 pub async fn execute_pipeline(
528 &self,
529 strategy: &Strategy,
530 pipeline_id: String,
531 ) -> Result<Pipeline> {
532 let now = chrono::Utc::now().timestamp();
533
534 let mut generic_pipeline = pipeline::Pipeline::new(&strategy.name);
536
537 for action in &strategy.actions {
538 let step = ActionStep::new(action.clone(), self.executor.clone());
539 generic_pipeline = generic_pipeline.add_step(step);
540 }
541
542 let mut ctx = Context::new(format!("Strategy execution: {}", strategy.name));
544 ctx.set("user_id", strategy.user_id.clone());
545 ctx.set("strategy_id", strategy.id.clone());
546 ctx.set("pipeline_id", pipeline_id.clone());
547
548 let result_ctx = generic_pipeline.run_with_context(ctx).await
550 .map_err(|e| crate::error::Error::Internal(format!("Pipeline execution failed: {}", e)))?;
551
552 let pipeline = Pipeline {
554 id: pipeline_id,
555 strategy_id: strategy.id.clone(),
556 user_id: strategy.user_id.clone(),
557 status: if result_ctx.aborted {
558 PipelineStatus::Cancelled { reason: "Aborted".to_string() }
559 } else {
560 PipelineStatus::Completed
561 },
562 current_step: strategy.actions.len(), step_results: Vec::new(), started_at: now,
565 completed_at: Some(chrono::Utc::now().timestamp()),
566 };
567
568 Ok(pipeline)
569 }
570}