ig_client/storage/
historical_prices.rs

1use crate::presentation::market::HistoricalPrice;
2use chrono::{DateTime, Utc};
3use sqlx::{PgPool, Row};
4use tracing::{info, warn};
5
6/// Initialize the historical_prices table in PostgreSQL
7pub async fn initialize_historical_prices_table(pool: &PgPool) -> Result<(), sqlx::Error> {
8    info!("Initializing historical_prices table...");
9
10    sqlx::query(
11        r#"
12        CREATE TABLE IF NOT EXISTS historical_prices (
13            id BIGSERIAL PRIMARY KEY,
14            epic VARCHAR(255) NOT NULL,
15            snapshot_time TIMESTAMPTZ NOT NULL,
16            open_bid DOUBLE PRECISION,
17            open_ask DOUBLE PRECISION,
18            open_last_traded DOUBLE PRECISION,
19            high_bid DOUBLE PRECISION,
20            high_ask DOUBLE PRECISION,
21            high_last_traded DOUBLE PRECISION,
22            low_bid DOUBLE PRECISION,
23            low_ask DOUBLE PRECISION,
24            low_last_traded DOUBLE PRECISION,
25            close_bid DOUBLE PRECISION,
26            close_ask DOUBLE PRECISION,
27            close_last_traded DOUBLE PRECISION,
28            last_traded_volume BIGINT,
29            created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
30            updated_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
31            UNIQUE(epic, snapshot_time)
32        )
33        "#,
34    )
35    .execute(pool)
36    .await?;
37
38    // Create index for better query performance
39    sqlx::query(
40        r#"
41        CREATE INDEX IF NOT EXISTS idx_historical_prices_epic_time 
42        ON historical_prices(epic, snapshot_time DESC)
43        "#,
44    )
45    .execute(pool)
46    .await?;
47
48    // Create trigger for updating updated_at timestamp
49    sqlx::query(
50        r#"
51        CREATE OR REPLACE FUNCTION update_updated_at_column()
52        RETURNS TRIGGER AS $$
53        BEGIN
54            NEW.updated_at = NOW();
55            RETURN NEW;
56        END;
57        $$ language 'plpgsql'
58        "#,
59    )
60    .execute(pool)
61    .await?;
62
63    // Drop existing trigger if it exists
64    sqlx::query(
65        r#"
66        DROP TRIGGER IF EXISTS update_historical_prices_updated_at ON historical_prices
67        "#,
68    )
69    .execute(pool)
70    .await?;
71
72    // Create the trigger
73    sqlx::query(
74        r#"
75        CREATE TRIGGER update_historical_prices_updated_at
76            BEFORE UPDATE ON historical_prices
77            FOR EACH ROW
78            EXECUTE FUNCTION update_updated_at_column()
79        "#,
80    )
81    .execute(pool)
82    .await?;
83
84    info!("✅ Historical prices table initialized successfully");
85    Ok(())
86}
87
88/// Storage statistics for tracking insert/update operations
89#[derive(Debug, Default)]
90pub struct StorageStats {
91    /// Number of new records inserted into the database
92    pub inserted: usize,
93    /// Number of existing records updated in the database
94    pub updated: usize,
95    /// Number of records skipped due to errors or validation issues
96    pub skipped: usize,
97    /// Total number of records processed (inserted + updated + skipped)
98    pub total_processed: usize,
99}
100
101/// Store historical prices in PostgreSQL with UPSERT logic
102pub async fn store_historical_prices(
103    pool: &PgPool,
104    epic: &str,
105    prices: &[HistoricalPrice],
106) -> Result<StorageStats, sqlx::Error> {
107    let mut stats = StorageStats::default();
108    let mut tx = pool.begin().await?;
109
110    info!(
111        "Processing {} price records for epic: {}",
112        prices.len(),
113        epic
114    );
115
116    for (i, price) in prices.iter().enumerate() {
117        stats.total_processed += 1;
118
119        // Parse snapshot time
120        let snapshot_time = match parse_snapshot_time(&price.snapshot_time) {
121            Ok(time) => time,
122            Err(e) => {
123                warn!(
124                    "⚠️  Skipping record {}: Invalid timestamp '{}': {}",
125                    i + 1,
126                    price.snapshot_time,
127                    e
128                );
129                stats.skipped += 1;
130                continue;
131            }
132        };
133
134        // Use UPSERT (INSERT ... ON CONFLICT ... DO UPDATE)
135        let result = sqlx::query(
136            r#"
137            INSERT INTO historical_prices (
138                epic, snapshot_time,
139                open_bid, open_ask, open_last_traded,
140                high_bid, high_ask, high_last_traded,
141                low_bid, low_ask, low_last_traded,
142                close_bid, close_ask, close_last_traded,
143                last_traded_volume
144            ) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14, $15)
145            ON CONFLICT (epic, snapshot_time) 
146            DO UPDATE SET
147                open_bid = EXCLUDED.open_bid,
148                open_ask = EXCLUDED.open_ask,
149                open_last_traded = EXCLUDED.open_last_traded,
150                high_bid = EXCLUDED.high_bid,
151                high_ask = EXCLUDED.high_ask,
152                high_last_traded = EXCLUDED.high_last_traded,
153                low_bid = EXCLUDED.low_bid,
154                low_ask = EXCLUDED.low_ask,
155                low_last_traded = EXCLUDED.low_last_traded,
156                close_bid = EXCLUDED.close_bid,
157                close_ask = EXCLUDED.close_ask,
158                close_last_traded = EXCLUDED.close_last_traded,
159                last_traded_volume = EXCLUDED.last_traded_volume,
160                updated_at = NOW()
161            "#,
162        )
163        .bind(epic)
164        .bind(snapshot_time)
165        .bind(price.open_price.bid)
166        .bind(price.open_price.ask)
167        .bind(price.open_price.last_traded)
168        .bind(price.high_price.bid)
169        .bind(price.high_price.ask)
170        .bind(price.high_price.last_traded)
171        .bind(price.low_price.bid)
172        .bind(price.low_price.ask)
173        .bind(price.low_price.last_traded)
174        .bind(price.close_price.bid)
175        .bind(price.close_price.ask)
176        .bind(price.close_price.last_traded)
177        .bind(price.last_traded_volume)
178        .execute(&mut *tx)
179        .await?;
180
181        // Check if it was an insert or update
182        if result.rows_affected() > 0 {
183            // Query to check if this was an insert or update
184            let count: i64 = sqlx::query_scalar(
185                "SELECT COUNT(*) FROM historical_prices WHERE epic = $1 AND snapshot_time = $2 AND created_at = updated_at"
186            )
187                .bind(epic)
188                .bind(snapshot_time)
189                .fetch_one(&mut *tx)
190                .await?;
191
192            if count > 0 {
193                stats.inserted += 1;
194            } else {
195                stats.updated += 1;
196            }
197        } else {
198            stats.skipped += 1;
199        }
200
201        // Log progress every 100 records
202        if (i + 1) % 100 == 0 {
203            info!("  Processed {}/{} records...", i + 1, prices.len());
204        }
205    }
206
207    tx.commit().await?;
208    info!("✅ Transaction committed successfully");
209
210    Ok(stats)
211}
212
213/// Parse snapshot time from IG format to `DateTime<Utc>`
214pub fn parse_snapshot_time(
215    snapshot_time: &str,
216) -> Result<DateTime<Utc>, Box<dyn std::error::Error>> {
217    // IG format: "yyyy/MM/dd hh:mm:ss" or "yyyy-MM-dd hh:mm:ss"
218    let formats = [
219        "%Y/%m/%d %H:%M:%S",
220        "%Y-%m-%d %H:%M:%S",
221        "%Y/%m/%d %H:%M",
222        "%Y-%m-%d %H:%M",
223    ];
224
225    for format in &formats {
226        if let Ok(naive_dt) = chrono::NaiveDateTime::parse_from_str(snapshot_time, format) {
227            return Ok(DateTime::from_naive_utc_and_offset(naive_dt, Utc));
228        }
229    }
230
231    Err(format!("Unable to parse timestamp: {}", snapshot_time).into())
232}
233
234/// Database statistics for a specific epic
235#[derive(Debug)]
236pub struct TableStats {
237    /// Total number of records in the database for this epic
238    pub total_records: i64,
239    /// Earliest date in the dataset (formatted as string)
240    pub earliest_date: String,
241    /// Latest date in the dataset (formatted as string)
242    pub latest_date: String,
243    /// Average closing price across all records
244    pub avg_close_price: f64,
245    /// Minimum price (lowest of all low prices) in the dataset
246    pub min_price: f64,
247    /// Maximum price (highest of all high prices) in the dataset
248    pub max_price: f64,
249}
250
251/// Get statistics for the historical_prices table
252pub async fn get_table_statistics(pool: &PgPool, epic: &str) -> Result<TableStats, sqlx::Error> {
253    let row = sqlx::query(
254        r#"
255        SELECT 
256            COUNT(*) as total_records,
257            MIN(snapshot_time)::text as earliest_date,
258            MAX(snapshot_time)::text as latest_date,
259            AVG(close_bid) as avg_close_price,
260            MIN(LEAST(low_bid, low_ask)) as min_price,
261            MAX(GREATEST(high_bid, high_ask)) as max_price
262        FROM historical_prices 
263        WHERE epic = $1
264        "#,
265    )
266    .bind(epic)
267    .fetch_one(pool)
268    .await?;
269
270    Ok(TableStats {
271        total_records: row.get("total_records"),
272        earliest_date: row.get("earliest_date"),
273        latest_date: row.get("latest_date"),
274        avg_close_price: row.get::<Option<f64>, _>("avg_close_price").unwrap_or(0.0),
275        min_price: row.get::<Option<f64>, _>("min_price").unwrap_or(0.0),
276        max_price: row.get::<Option<f64>, _>("max_price").unwrap_or(0.0),
277    })
278}