ig_client/storage/historical_prices.rs

use chrono::{DateTime, Utc};
use sqlx::{PgPool, Row};
use tracing::{info, warn};

/// Initialize the historical_prices table in PostgreSQL
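///
/// A minimal usage sketch (not compiled as a doctest; the connection string and
/// pool settings are illustrative assumptions, not values this crate requires):
///
/// ```ignore
/// use sqlx::postgres::PgPoolOptions;
///
/// // Hypothetical DATABASE_URL; replace with your own connection string.
/// let pool = PgPoolOptions::new()
///     .max_connections(5)
///     .connect("postgres://user:password@localhost/ig")
///     .await?;
///
/// // Creates the table, its index, and the updated_at trigger if they are missing.
/// initialize_historical_prices_table(&pool).await?;
/// ```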
pub async fn initialize_historical_prices_table(pool: &PgPool) -> Result<(), sqlx::Error> {
    info!("Initializing historical_prices table...");

    sqlx::query(
        r#"
        CREATE TABLE IF NOT EXISTS historical_prices (
            id BIGSERIAL PRIMARY KEY,
            epic VARCHAR(255) NOT NULL,
            snapshot_time TIMESTAMPTZ NOT NULL,
            open_bid DOUBLE PRECISION,
            open_ask DOUBLE PRECISION,
            open_last_traded DOUBLE PRECISION,
            high_bid DOUBLE PRECISION,
            high_ask DOUBLE PRECISION,
            high_last_traded DOUBLE PRECISION,
            low_bid DOUBLE PRECISION,
            low_ask DOUBLE PRECISION,
            low_last_traded DOUBLE PRECISION,
            close_bid DOUBLE PRECISION,
            close_ask DOUBLE PRECISION,
            close_last_traded DOUBLE PRECISION,
            last_traded_volume BIGINT,
            created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
            updated_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
            UNIQUE(epic, snapshot_time)
        )
        "#,
    )
    .execute(pool)
    .await?;

    // Create index for better query performance
    sqlx::query(
        r#"
        CREATE INDEX IF NOT EXISTS idx_historical_prices_epic_time
        ON historical_prices(epic, snapshot_time DESC)
        "#,
    )
    .execute(pool)
    .await?;

    // Create the trigger function that keeps updated_at current
    sqlx::query(
        r#"
        CREATE OR REPLACE FUNCTION update_updated_at_column()
        RETURNS TRIGGER AS $$
        BEGIN
            NEW.updated_at = NOW();
            RETURN NEW;
        END;
        $$ LANGUAGE plpgsql
        "#,
    )
    .execute(pool)
    .await?;

    // Drop existing trigger if it exists
    sqlx::query(
        r#"
        DROP TRIGGER IF EXISTS update_historical_prices_updated_at ON historical_prices
        "#,
    )
    .execute(pool)
    .await?;

    // Create the trigger
    sqlx::query(
        r#"
        CREATE TRIGGER update_historical_prices_updated_at
            BEFORE UPDATE ON historical_prices
            FOR EACH ROW
            EXECUTE FUNCTION update_updated_at_column()
        "#,
    )
    .execute(pool)
    .await?;

    info!("✅ Historical prices table initialized successfully");
    Ok(())
}

/// Storage statistics for tracking insert/update operations
#[derive(Debug, Default)]
pub struct StorageStats {
    /// Number of new records inserted into the database
    pub inserted: usize,
    /// Number of existing records updated in the database
    pub updated: usize,
    /// Number of records skipped due to errors or validation issues
    pub skipped: usize,
    /// Total number of records processed (inserted + updated + skipped)
    pub total_processed: usize,
}

/// Store historical prices in PostgreSQL with UPSERT logic
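///
/// A minimal usage sketch (`pool` and `prices` are assumed to come from your own
/// setup and fetch code; the epic is an illustrative placeholder):
///
/// ```ignore
/// // `prices` is a Vec<HistoricalPrice> previously fetched from the IG API.
/// let stats = store_historical_prices(&pool, "CS.D.EURUSD.MINI.IP", &prices).await?;
/// info!(
///     "inserted {}, updated {}, skipped {} of {} records",
///     stats.inserted, stats.updated, stats.skipped, stats.total_processed
/// );
/// ```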
pub async fn store_historical_prices(
    pool: &PgPool,
    epic: &str,
    prices: &[crate::application::models::market::HistoricalPrice],
) -> Result<StorageStats, sqlx::Error> {
    let mut stats = StorageStats::default();
    let mut tx = pool.begin().await?;

    info!(
        "Processing {} price records for epic: {}",
        prices.len(),
        epic
    );

    for (i, price) in prices.iter().enumerate() {
        stats.total_processed += 1;

        // Parse snapshot time
        let snapshot_time = match parse_snapshot_time(&price.snapshot_time) {
            Ok(time) => time,
            Err(e) => {
                warn!(
                    "⚠️  Skipping record {}: Invalid timestamp '{}': {}",
                    i + 1,
                    price.snapshot_time,
                    e
                );
                stats.skipped += 1;
                continue;
            }
        };

        // Use UPSERT (INSERT ... ON CONFLICT ... DO UPDATE)
        let result = sqlx::query(
            r#"
            INSERT INTO historical_prices (
                epic, snapshot_time,
                open_bid, open_ask, open_last_traded,
                high_bid, high_ask, high_last_traded,
                low_bid, low_ask, low_last_traded,
                close_bid, close_ask, close_last_traded,
                last_traded_volume
            ) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14, $15)
            ON CONFLICT (epic, snapshot_time)
            DO UPDATE SET
                open_bid = EXCLUDED.open_bid,
                open_ask = EXCLUDED.open_ask,
                open_last_traded = EXCLUDED.open_last_traded,
                high_bid = EXCLUDED.high_bid,
                high_ask = EXCLUDED.high_ask,
                high_last_traded = EXCLUDED.high_last_traded,
                low_bid = EXCLUDED.low_bid,
                low_ask = EXCLUDED.low_ask,
                low_last_traded = EXCLUDED.low_last_traded,
                close_bid = EXCLUDED.close_bid,
                close_ask = EXCLUDED.close_ask,
                close_last_traded = EXCLUDED.close_last_traded,
                last_traded_volume = EXCLUDED.last_traded_volume,
                updated_at = NOW()
            "#,
        )
        .bind(epic)
        .bind(snapshot_time)
        .bind(price.open_price.bid)
        .bind(price.open_price.ask)
        .bind(price.open_price.last_traded)
        .bind(price.high_price.bid)
        .bind(price.high_price.ask)
        .bind(price.high_price.last_traded)
        .bind(price.low_price.bid)
        .bind(price.low_price.ask)
        .bind(price.low_price.last_traded)
        .bind(price.close_price.bid)
        .bind(price.close_price.ask)
        .bind(price.close_price.last_traded)
        .bind(price.last_traded_volume)
        .execute(&mut *tx)
        .await?;

        // Determine whether the UPSERT inserted a new row or updated an existing one
        if result.rows_affected() > 0 {
            // Heuristic: NOW() is constant within a transaction, so a row inserted
            // by this transaction still has created_at = updated_at here
            let count: i64 = sqlx::query_scalar(
                "SELECT COUNT(*) FROM historical_prices WHERE epic = $1 AND snapshot_time = $2 AND created_at = updated_at"
            )
                .bind(epic)
                .bind(snapshot_time)
                .fetch_one(&mut *tx)
                .await?;

            if count > 0 {
                stats.inserted += 1;
            } else {
                stats.updated += 1;
            }
        } else {
            stats.skipped += 1;
        }

        // Log progress every 100 records
        if (i + 1) % 100 == 0 {
            info!("  Processed {}/{} records...", i + 1, prices.len());
        }
    }

    tx.commit().await?;
    info!("✅ Transaction committed successfully");

    Ok(stats)
}

/// Parse snapshot time from IG format to `DateTime<Utc>`
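///
/// A short illustration of accepted inputs (values are made up); both the
/// slash- and dash-separated forms parse, with or without seconds:
///
/// ```ignore
/// let with_seconds = parse_snapshot_time("2024/01/15 14:30:00")?;
/// let without_seconds = parse_snapshot_time("2024-01-15 14:30")?;
/// ```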
fn parse_snapshot_time(snapshot_time: &str) -> Result<DateTime<Utc>, Box<dyn std::error::Error>> {
    // IG format: "yyyy/MM/dd hh:mm:ss" or "yyyy-MM-dd hh:mm:ss"
    let formats = [
        "%Y/%m/%d %H:%M:%S",
        "%Y-%m-%d %H:%M:%S",
        "%Y/%m/%d %H:%M",
        "%Y-%m-%d %H:%M",
    ];

    for format in &formats {
        if let Ok(naive_dt) = chrono::NaiveDateTime::parse_from_str(snapshot_time, format) {
            return Ok(DateTime::from_naive_utc_and_offset(naive_dt, Utc));
        }
    }

    Err(format!("Unable to parse timestamp: {}", snapshot_time).into())
}

/// Database statistics for a specific epic
#[derive(Debug)]
pub struct TableStats {
    /// Total number of records in the database for this epic
    pub total_records: i64,
    /// Earliest date in the dataset (formatted as string)
    pub earliest_date: String,
    /// Latest date in the dataset (formatted as string)
    pub latest_date: String,
    /// Average closing price across all records
    pub avg_close_price: f64,
    /// Minimum price (lowest of all low prices) in the dataset
    pub min_price: f64,
    /// Maximum price (highest of all high prices) in the dataset
    pub max_price: f64,
}

/// Get statistics for the historical_prices table
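///
/// A minimal usage sketch (the epic is an illustrative placeholder):
///
/// ```ignore
/// let stats = get_table_statistics(&pool, "CS.D.EURUSD.MINI.IP").await?;
/// println!(
///     "{} records from {} to {} (avg close {:.2})",
///     stats.total_records, stats.earliest_date, stats.latest_date, stats.avg_close_price
/// );
/// ```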
pub async fn get_table_statistics(pool: &PgPool, epic: &str) -> Result<TableStats, sqlx::Error> {
    let row = sqlx::query(
        r#"
        SELECT
            COUNT(*) as total_records,
            MIN(snapshot_time)::text as earliest_date,
            MAX(snapshot_time)::text as latest_date,
            AVG(close_bid) as avg_close_price,
            MIN(LEAST(low_bid, low_ask)) as min_price,
            MAX(GREATEST(high_bid, high_ask)) as max_price
        FROM historical_prices
        WHERE epic = $1
        "#,
    )
    .bind(epic)
    .fetch_one(pool)
    .await?;

    Ok(TableStats {
        total_records: row.get("total_records"),
        // MIN/MAX return NULL when no rows match the epic, so decode the date
        // columns as Option instead of panicking on a NULL text value
        earliest_date: row
            .get::<Option<String>, _>("earliest_date")
            .unwrap_or_default(),
        latest_date: row
            .get::<Option<String>, _>("latest_date")
            .unwrap_or_default(),
        avg_close_price: row.get::<Option<f64>, _>("avg_close_price").unwrap_or(0.0),
        min_price: row.get::<Option<f64>, _>("min_price").unwrap_or(0.0),
        max_price: row.get::<Option<f64>, _>("max_price").unwrap_or(0.0),
    })
}