oxidized_builder/data/
db.rs

1use crate::common::error::AppError;
2use crate::data::schema::TransactionRecord;
3use sqlx::{sqlite::SqlitePoolOptions, Pool, Row, Sqlite};
4
5#[derive(Clone)]
6pub struct Database {
7    pool: Pool<Sqlite>,
8}
9
10impl Database {
11    pub async fn new(database_url: &str) -> Result<Self, AppError> {
12        let pool = SqlitePoolOptions::new()
13            .max_connections(5)
14            .connect(database_url)
15            .await
16            .map_err(|e| AppError::Initialization(format!("DB Connect failed: {}", e)))?;
17
18        sqlx::migrate!("./migrations")
19            .run(&pool)
20            .await
21            .map_err(|e| AppError::Initialization(format!("DB Migration failed: {}", e)))?;
22
23        Ok(Self { pool })
24    }
25
26    pub async fn save_transaction(
27        &self,
28        tx_hash: &str,
29        chain_id: u64,
30        from: &str,
31        to: Option<&str>,
32        value: &str,
33        strategy: Option<&str>,
34    ) -> Result<i64, AppError> {
35        let chain_id_i64 = chain_id as i64;
36
37        let row = sqlx::query(
38            r#"
39            INSERT INTO transactions (tx_hash, chain_id, from_address, to_address, value_wei, strategy)
40            VALUES (?, ?, ?, ?, ?, ?)
41            RETURNING id
42            "#,
43        )
44        .bind(tx_hash)
45        .bind(chain_id_i64)
46        .bind(from)
47        .bind(to)
48        .bind(value)
49        .bind(strategy)
50        .fetch_one(&self.pool)
51        .await
52        .map_err(|e| AppError::Transaction {
53            hash: tx_hash.to_string(),
54            reason: e.to_string(),
55        })?;
56        let id: i64 = row.get("id");
57
58        Ok(id)
59    }
60
61    pub async fn get_recent_txs(&self, limit: i64) -> Result<Vec<TransactionRecord>, AppError> {
62        let recs = sqlx::query_as::<_, TransactionRecord>(
63            "SELECT * FROM transactions ORDER BY timestamp DESC LIMIT ?",
64        )
65        .bind(limit)
66        .fetch_all(&self.pool)
67        .await
68        .map_err(|e| AppError::Initialization(format!("Query failed: {}", e)))?;
69
70        Ok(recs)
71    }
72
73    pub async fn save_profit_record(
74        &self,
75        tx_hash: &str,
76        chain_id: u64,
77        strategy: &str,
78        profit_eth: f64,
79        gas_cost_eth: f64,
80        net_profit_eth: f64,
81    ) -> Result<i64, AppError> {
82        let chain_id_i64 = chain_id as i64;
83        let row = sqlx::query(
84            r#"
85            INSERT INTO profit_records (tx_hash, chain_id, strategy, profit_eth, gas_cost_eth, net_profit_eth)
86            VALUES (?, ?, ?, ?, ?, ?)
87            RETURNING id
88            "#,
89        )
90        .bind(tx_hash)
91        .bind(chain_id_i64)
92        .bind(strategy)
93        .bind(profit_eth)
94        .bind(gas_cost_eth)
95        .bind(net_profit_eth)
96        .fetch_one(&self.pool)
97        .await
98        .map_err(|e| AppError::Initialization(format!("Profit insert failed: {}", e)))?;
99        let id: i64 = row.get("id");
100
101        Ok(id)
102    }
103
104    pub async fn update_status(
105        &self,
106        tx_hash: &str,
107        block_number: Option<i64>,
108        status: Option<bool>,
109    ) -> Result<(), AppError> {
110        sqlx::query(
111            r#"
112            UPDATE transactions
113            SET block_number = COALESCE(?, block_number),
114                status = COALESCE(?, status)
115            WHERE tx_hash = ?
116            "#
117        )
118        .bind(block_number)
119        .bind(status)
120        .bind(tx_hash)
121        .execute(&self.pool)
122        .await
123        .map_err(|e| AppError::Initialization(format!("Status update failed: {}", e)))?;
124
125        Ok(())
126    }
127
128    pub async fn save_market_price(
129        &self,
130        chain_id: u64,
131        symbol: &str,
132        price_usd: f64,
133        source: &str,
134    ) -> Result<i64, AppError> {
135        let chain_id_i64 = chain_id as i64;
136        let row = sqlx::query(
137            r#"
138            INSERT INTO market_prices (chain_id, symbol, price_usd, source)
139            VALUES (?, ?, ?, ?)
140            RETURNING id
141            "#,
142        )
143        .bind(chain_id_i64)
144        .bind(symbol)
145        .bind(price_usd)
146        .bind(source)
147        .fetch_one(&self.pool)
148        .await
149        .map_err(|e| AppError::Initialization(format!("Market price insert failed: {}", e)))?;
150        let id: i64 = row.get("id");
151
152        Ok(id)
153    }
154}
155
#[cfg(test)]
mod tests {
    use super::*;

    /// Smoke test against an in-memory SQLite database: inserting a profit
    /// record and a market price must each succeed and yield a positive id.
    #[tokio::test]
    async fn profit_and_price_inserts() {
        let database = Database::new("sqlite::memory:").await.expect("db");

        let inserted_profit = database
            .save_profit_record("0xabc", 1, "test", 0.2, 0.05, 0.15)
            .await;
        assert!(inserted_profit.unwrap() > 0);

        let inserted_price = database
            .save_market_price(1, "ETHUSD", 3200.0, "test")
            .await;
        assert!(inserted_price.unwrap() > 0);
    }
}