1use crate::common::error::AppError;
2use crate::core::block_listener::BlockListener;
3use crate::core::executor::{BundleSender, SharedBundleSender};
4use crate::core::mempool::MempoolScanner;
5use crate::core::nonce::NonceManager;
6use crate::core::portfolio::PortfolioManager;
7use crate::core::safety::SafetyGuard;
8use crate::core::simulation::Simulator;
9use crate::core::strategy::{StrategyExecutor, StrategyStats};
10use crate::network::mev_share::MevShareClient;
11use crate::data::db::Database;
12use crate::network::gas::GasOracle;
13use crate::network::price_feed::PriceFeed;
14use crate::network::provider::{HttpProvider, WsProvider};
15use alloy::signers::local::PrivateKeySigner;
16use alloy::primitives::Address;
17use std::collections::HashSet;
18use std::sync::Arc;
19use tokio::sync::{broadcast, mpsc};
20
/// Top-level container for the long-lived components and settings of the bot.
///
/// `Engine::new` stores the values verbatim; `Engine::run` consumes the engine
/// and wires them into the mempool scanner, block listener, bundle sender,
/// metrics server and (optionally) the strategy executor and MEV-Share client.
pub struct Engine {
    /// HTTP provider; cloned into the bundle sender and the strategy executor.
    http_provider: HttpProvider,
    /// WebSocket provider; cloned into the mempool scanner and the block listener.
    ws_provider: WsProvider,
    /// Database handle; cloned into the strategy executor.
    db: Database,
    /// Nonce manager; cloned into the block listener and the strategy executor.
    nonce_manager: NonceManager,
    /// Shared portfolio state; handed to the metrics server and the strategy executor.
    portfolio: Arc<PortfolioManager>,
    /// Shared safety guard; handed to the strategy executor.
    safety_guard: Arc<SafetyGuard>,
    /// Forwarded to both the bundle sender and the strategy executor.
    /// NOTE(review): presumably suppresses real submissions — confirm in those types.
    dry_run: bool,
    /// Gas oracle; cloned into the strategy executor.
    gas_oracle: GasOracle,
    /// Price feed; moved into the strategy executor.
    price_feed: PriceFeed,
    /// Chain id passed to the strategy executor and the MEV-Share client.
    chain_id: u64,
    /// Relay URL passed to the bundle sender.
    relay_url: String,
    /// Signer cloned into the bundle sender and the strategy executor.
    bundle_signer: PrivateKeySigner,
    /// Gas price ceiling forwarded to the strategy executor.
    /// NOTE(review): unit is gwei going by the name — confirm at the use site.
    max_gas_price_gwei: u64,
    /// Simulator; moved into the strategy executor.
    simulator: Simulator,
    /// Port handed to `spawn_metrics_server`.
    metrics_port: u16,
    /// When `false`, `run` starts only the mempool scanner and the block listener.
    strategy_enabled: bool,
    /// Slippage setting forwarded to the strategy executor.
    /// NOTE(review): presumably basis points — confirm at the use site.
    slippage_bps: u64,
    /// Set of router addresses; cloned into the strategy executor.
    router_allowlist: HashSet<Address>,
    /// Address forwarded to the strategy executor.
    /// NOTE(review): presumably the wrapped native token contract — confirm.
    wrapped_native: Address,
    /// Stream URL passed to the MEV-Share client.
    mev_share_stream_url: String,
    /// History limit passed to the MEV-Share client.
    mev_share_history_limit: u32,
    /// Starts the MEV-Share client, but only when `strategy_enabled` is also `true`.
    mev_share_enabled: bool,
}
45
46impl Engine {
47 #[allow(clippy::too_many_arguments)]
48 pub fn new(
49 http_provider: HttpProvider,
50 ws_provider: WsProvider,
51 db: Database,
52 nonce_manager: NonceManager,
53 portfolio: Arc<PortfolioManager>,
54 safety_guard: Arc<SafetyGuard>,
55 dry_run: bool,
56 gas_oracle: GasOracle,
57 price_feed: PriceFeed,
58 chain_id: u64,
59 relay_url: String,
60 bundle_signer: PrivateKeySigner,
61 max_gas_price_gwei: u64,
62 simulator: Simulator,
63 metrics_port: u16,
64 strategy_enabled: bool,
65 slippage_bps: u64,
66 router_allowlist: HashSet<Address>,
67 wrapped_native: Address,
68 mev_share_stream_url: String,
69 mev_share_history_limit: u32,
70 mev_share_enabled: bool,
71 ) -> Self {
72 Self {
73 http_provider,
74 ws_provider,
75 db,
76 nonce_manager,
77 portfolio,
78 safety_guard,
79 dry_run,
80 gas_oracle,
81 price_feed,
82 chain_id,
83 relay_url,
84 bundle_signer,
85 max_gas_price_gwei,
86 simulator,
87 metrics_port,
88 strategy_enabled,
89 slippage_bps,
90 router_allowlist,
91 wrapped_native,
92 mev_share_stream_url,
93 mev_share_history_limit,
94 mev_share_enabled,
95 }
96 }
97
98 pub async fn run(self) -> Result<(), AppError> {
99 let (tx_sender, tx_receiver) = mpsc::unbounded_channel();
100 let (block_sender, block_receiver) = broadcast::channel(32);
101
102 let mempool = MempoolScanner::new(self.ws_provider.clone(), tx_sender.clone());
103 let block_listener = BlockListener::new(
104 self.ws_provider.clone(),
105 block_sender.clone(),
106 self.nonce_manager.clone(),
107 );
108 let bundle_sender: SharedBundleSender = Arc::new(BundleSender::new(
109 self.http_provider.clone(),
110 self.dry_run,
111 self.relay_url.clone(),
112 self.bundle_signer.clone(),
113 ));
114 let stats = Arc::new(StrategyStats::default());
115 let _metrics_addr = crate::common::metrics::spawn_metrics_server(
116 self.metrics_port,
117 stats.clone(),
118 self.portfolio.clone(),
119 )
120 .await;
121 if self.strategy_enabled {
122 let strategy = StrategyExecutor::new(
123 tx_receiver,
124 block_receiver,
125 self.safety_guard.clone(),
126 bundle_sender.clone(),
127 self.db.clone(),
128 self.portfolio.clone(),
129 self.gas_oracle.clone(),
130 self.price_feed,
131 self.chain_id,
132 self.max_gas_price_gwei,
133 self.simulator,
134 stats,
135 self.bundle_signer.clone(),
136 self.nonce_manager.clone(),
137 self.slippage_bps,
138 self.http_provider.clone(),
139 self.dry_run,
140 self.router_allowlist.clone(),
141 self.wrapped_native,
142 );
143
144 if self.mev_share_enabled {
145 let mev_share = MevShareClient::new(
146 self.mev_share_stream_url.clone(),
147 self.chain_id,
148 tx_sender.clone(),
149 self.mev_share_history_limit,
150 );
151 tokio::try_join!(mempool.run(), block_listener.run(), strategy.run(), mev_share.run())
152 .map(|_| ())
153 .map_err(|e| AppError::Unknown(e.into()))
154 } else {
155 tokio::try_join!(mempool.run(), block_listener.run(), strategy.run())
156 .map(|_| ())
157 .map_err(|e| AppError::Unknown(e.into()))
158 }
159 } else {
160 tokio::try_join!(mempool.run(), block_listener.run())
161 .map(|_| ())
162 .map_err(|e| AppError::Unknown(e.into()))
163 }
164 }
165}