oxidized_builder/core/
engine.rs

1use crate::common::error::AppError;
2use crate::core::block_listener::BlockListener;
3use crate::core::executor::{BundleSender, SharedBundleSender};
4use crate::core::mempool::MempoolScanner;
5use crate::core::nonce::NonceManager;
6use crate::core::portfolio::PortfolioManager;
7use crate::core::safety::SafetyGuard;
8use crate::core::simulation::Simulator;
9use crate::core::strategy::{StrategyExecutor, StrategyStats};
10use crate::network::mev_share::MevShareClient;
11use crate::data::db::Database;
12use crate::network::gas::GasOracle;
13use crate::network::price_feed::PriceFeed;
14use crate::network::provider::{HttpProvider, WsProvider};
15use alloy::signers::local::PrivateKeySigner;
16use alloy::primitives::Address;
17use std::collections::HashSet;
18use std::sync::Arc;
19use tokio::sync::{broadcast, mpsc};
20
21pub struct Engine {
22    http_provider: HttpProvider,
23    ws_provider: WsProvider,
24    db: Database,
25    nonce_manager: NonceManager,
26    portfolio: Arc<PortfolioManager>,
27    safety_guard: Arc<SafetyGuard>,
28    dry_run: bool,
29    gas_oracle: GasOracle,
30    price_feed: PriceFeed,
31    chain_id: u64,
32    relay_url: String,
33    bundle_signer: PrivateKeySigner,
34    max_gas_price_gwei: u64,
35    simulator: Simulator,
36    metrics_port: u16,
37    strategy_enabled: bool,
38    slippage_bps: u64,
39    router_allowlist: HashSet<Address>,
40    wrapped_native: Address,
41    mev_share_stream_url: String,
42    mev_share_history_limit: u32,
43    mev_share_enabled: bool,
44}
45
46impl Engine {
47    #[allow(clippy::too_many_arguments)]
48    pub fn new(
49        http_provider: HttpProvider,
50        ws_provider: WsProvider,
51        db: Database,
52        nonce_manager: NonceManager,
53        portfolio: Arc<PortfolioManager>,
54        safety_guard: Arc<SafetyGuard>,
55        dry_run: bool,
56        gas_oracle: GasOracle,
57        price_feed: PriceFeed,
58        chain_id: u64,
59        relay_url: String,
60        bundle_signer: PrivateKeySigner,
61        max_gas_price_gwei: u64,
62        simulator: Simulator,
63        metrics_port: u16,
64        strategy_enabled: bool,
65        slippage_bps: u64,
66        router_allowlist: HashSet<Address>,
67        wrapped_native: Address,
68        mev_share_stream_url: String,
69        mev_share_history_limit: u32,
70        mev_share_enabled: bool,
71    ) -> Self {
72        Self {
73            http_provider,
74            ws_provider,
75            db,
76            nonce_manager,
77            portfolio,
78            safety_guard,
79            dry_run,
80            gas_oracle,
81            price_feed,
82            chain_id,
83            relay_url,
84            bundle_signer,
85            max_gas_price_gwei,
86            simulator,
87            metrics_port,
88            strategy_enabled,
89            slippage_bps,
90            router_allowlist,
91            wrapped_native,
92            mev_share_stream_url,
93            mev_share_history_limit,
94            mev_share_enabled,
95        }
96    }
97
98    pub async fn run(self) -> Result<(), AppError> {
99        let (tx_sender, tx_receiver) = mpsc::unbounded_channel();
100        let (block_sender, block_receiver) = broadcast::channel(32);
101
102        let mempool = MempoolScanner::new(self.ws_provider.clone(), tx_sender.clone());
103        let block_listener = BlockListener::new(
104            self.ws_provider.clone(),
105            block_sender.clone(),
106            self.nonce_manager.clone(),
107        );
108        let bundle_sender: SharedBundleSender = Arc::new(BundleSender::new(
109            self.http_provider.clone(),
110            self.dry_run,
111            self.relay_url.clone(),
112            self.bundle_signer.clone(),
113        ));
114        let stats = Arc::new(StrategyStats::default());
115        let _metrics_addr = crate::common::metrics::spawn_metrics_server(
116            self.metrics_port,
117            stats.clone(),
118            self.portfolio.clone(),
119        )
120        .await;
121        if self.strategy_enabled {
122            let strategy = StrategyExecutor::new(
123                tx_receiver,
124                block_receiver,
125                self.safety_guard.clone(),
126                bundle_sender.clone(),
127                self.db.clone(),
128                self.portfolio.clone(),
129                self.gas_oracle.clone(),
130                self.price_feed,
131                self.chain_id,
132                self.max_gas_price_gwei,
133            self.simulator,
134            stats,
135                self.bundle_signer.clone(),
136                self.nonce_manager.clone(),
137                self.slippage_bps,
138                self.http_provider.clone(),
139                self.dry_run,
140                self.router_allowlist.clone(),
141                self.wrapped_native,
142        );
143
144            if self.mev_share_enabled {
145                let mev_share = MevShareClient::new(
146                    self.mev_share_stream_url.clone(),
147                    self.chain_id,
148                    tx_sender.clone(),
149                    self.mev_share_history_limit,
150                );
151                tokio::try_join!(mempool.run(), block_listener.run(), strategy.run(), mev_share.run())
152                    .map(|_| ())
153                    .map_err(|e| AppError::Unknown(e.into()))
154            } else {
155                tokio::try_join!(mempool.run(), block_listener.run(), strategy.run())
156                    .map(|_| ())
157                    .map_err(|e| AppError::Unknown(e.into()))
158            }
159        } else {
160            tokio::try_join!(mempool.run(), block_listener.run())
161                .map(|_| ())
162                .map_err(|e| AppError::Unknown(e.into()))
163        }
164    }
165}