//! Storage of IG historical price candles in PostgreSQL.
//!
//! File: ig_client/storage/historical_prices.rs

1use crate::error::AppError;
2use crate::presentation::market::HistoricalPrice;
3use chrono::{DateTime, Utc};
4use sqlx::{PgPool, Row};
5use tracing::{info, warn};
6
7/// Initialize the historical_prices table in PostgreSQL
8pub async fn initialize_historical_prices_table(pool: &PgPool) -> Result<(), sqlx::Error> {
9    info!("Initializing historical_prices table...");
10
11    sqlx::query(
12        r#"
13        CREATE TABLE IF NOT EXISTS historical_prices (
14            id BIGSERIAL PRIMARY KEY,
15            epic VARCHAR(255) NOT NULL,
16            resolution VARCHAR(50) NOT NULL,
17            snapshot_time TIMESTAMPTZ NOT NULL,
18            open_bid DOUBLE PRECISION,
19            open_ask DOUBLE PRECISION,
20            open_last_traded DOUBLE PRECISION,
21            high_bid DOUBLE PRECISION,
22            high_ask DOUBLE PRECISION,
23            high_last_traded DOUBLE PRECISION,
24            low_bid DOUBLE PRECISION,
25            low_ask DOUBLE PRECISION,
26            low_last_traded DOUBLE PRECISION,
27            close_bid DOUBLE PRECISION,
28            close_ask DOUBLE PRECISION,
29            close_last_traded DOUBLE PRECISION,
30            last_traded_volume BIGINT,
31            created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
32            updated_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
33            UNIQUE(epic, resolution, snapshot_time)
34        )
35        "#,
36    )
37    .execute(pool)
38    .await?;
39
40    // Ensure the table schema has 'resolution' for backwards compatibility
41    if let Err(e) = sqlx::query(
42        r#"
43        ALTER TABLE historical_prices 
44        ADD COLUMN IF NOT EXISTS resolution VARCHAR(50) NOT NULL DEFAULT 'UNKNOWN'
45        "#,
46    )
47    .execute(pool)
48    .await
49    {
50        info!(
51            "Column 'resolution' check/migration skipped or already present: {}",
52            e
53        );
54    }
55
56    // Attempt to drop the old unique constraint
57    let _ = sqlx::query(
58        r#"
59        ALTER TABLE historical_prices 
60        DROP CONSTRAINT IF EXISTS historical_prices_epic_snapshot_time_key;
61        
62        ALTER TABLE historical_prices
63        DROP CONSTRAINT IF EXISTS historical_prices_epic_resolution_snapshot_time_key;
64        
65        ALTER TABLE historical_prices 
66        ADD CONSTRAINT historical_prices_epic_resolution_snapshot_time_key 
67        UNIQUE (epic, resolution, snapshot_time);
68        "#,
69    )
70    .execute(pool)
71    .await;
72
73    // Create index for better query performance
74    sqlx::query(
75        r#"
76        CREATE INDEX IF NOT EXISTS idx_historical_prices_epic_res_time 
77        ON historical_prices(epic, resolution, snapshot_time DESC)
78        "#,
79    )
80    .execute(pool)
81    .await?;
82
83    // Create trigger for updating updated_at timestamp
84    sqlx::query(
85        r#"
86        CREATE OR REPLACE FUNCTION update_updated_at_column()
87        RETURNS TRIGGER AS $$
88        BEGIN
89            NEW.updated_at = NOW();
90            RETURN NEW;
91        END;
92        $$ language 'plpgsql'
93        "#,
94    )
95    .execute(pool)
96    .await?;
97
98    // Drop existing trigger if it exists
99    sqlx::query(
100        r#"
101        DROP TRIGGER IF EXISTS update_historical_prices_updated_at ON historical_prices
102        "#,
103    )
104    .execute(pool)
105    .await?;
106
107    // Create the trigger
108    sqlx::query(
109        r#"
110        CREATE TRIGGER update_historical_prices_updated_at
111            BEFORE UPDATE ON historical_prices
112            FOR EACH ROW
113            EXECUTE FUNCTION update_updated_at_column()
114        "#,
115    )
116    .execute(pool)
117    .await?;
118
119    info!("✅ Historical prices table initialized successfully");
120    Ok(())
121}
122
/// Storage statistics for tracking insert/update operations.
///
/// Returned by [`store_historical_prices`] so callers can report how a
/// batch of candles was applied to the database.
#[derive(Debug, Default)]
pub struct StorageStats {
    /// Number of new records inserted into the database
    pub inserted: usize,
    /// Number of existing records updated in the database
    pub updated: usize,
    /// Number of records skipped due to errors or validation issues
    /// (e.g. an unparseable snapshot timestamp)
    pub skipped: usize,
    /// Total number of records processed (inserted + updated + skipped)
    pub total_processed: usize,
}
135
136/// Store historical prices in PostgreSQL with UPSERT logic
137pub async fn store_historical_prices(
138    pool: &PgPool,
139    epic: &str,
140    resolution: &str,
141    prices: &[HistoricalPrice],
142) -> Result<StorageStats, sqlx::Error> {
143    let mut stats = StorageStats::default();
144    let mut tx = pool.begin().await?;
145
146    info!(
147        "Processing {} price records for epic: {}",
148        prices.len(),
149        epic
150    );
151
152    for (i, price) in prices.iter().enumerate() {
153        stats.total_processed += 1;
154
155        // Parse snapshot time
156        let snapshot_time = match parse_snapshot_time(&price.snapshot_time) {
157            Ok(time) => time,
158            Err(e) => {
159                warn!(
160                    "⚠️  Skipping record {}: Invalid timestamp '{}': {}",
161                    i + 1,
162                    price.snapshot_time,
163                    e
164                );
165                stats.skipped += 1;
166                continue;
167            }
168        };
169
170        // Use UPSERT (INSERT ... ON CONFLICT ... DO UPDATE)
171        let result = sqlx::query(
172            r#"
173            INSERT INTO historical_prices (
174                epic, resolution, snapshot_time,
175                open_bid, open_ask, open_last_traded,
176                high_bid, high_ask, high_last_traded,
177                low_bid, low_ask, low_last_traded,
178                close_bid, close_ask, close_last_traded,
179                last_traded_volume
180            ) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14, $15, $16)
181            ON CONFLICT (epic, resolution, snapshot_time) 
182            DO UPDATE SET
183                open_bid = EXCLUDED.open_bid,
184                open_ask = EXCLUDED.open_ask,
185                open_last_traded = EXCLUDED.open_last_traded,
186                high_bid = EXCLUDED.high_bid,
187                high_ask = EXCLUDED.high_ask,
188                high_last_traded = EXCLUDED.high_last_traded,
189                low_bid = EXCLUDED.low_bid,
190                low_ask = EXCLUDED.low_ask,
191                low_last_traded = EXCLUDED.low_last_traded,
192                close_bid = EXCLUDED.close_bid,
193                close_ask = EXCLUDED.close_ask,
194                close_last_traded = EXCLUDED.close_last_traded,
195                last_traded_volume = EXCLUDED.last_traded_volume,
196                updated_at = NOW()
197            "#,
198        )
199        .bind(epic)
200        .bind(resolution)
201        .bind(snapshot_time)
202        .bind(price.open_price.bid)
203        .bind(price.open_price.ask)
204        .bind(price.open_price.last_traded)
205        .bind(price.high_price.bid)
206        .bind(price.high_price.ask)
207        .bind(price.high_price.last_traded)
208        .bind(price.low_price.bid)
209        .bind(price.low_price.ask)
210        .bind(price.low_price.last_traded)
211        .bind(price.close_price.bid)
212        .bind(price.close_price.ask)
213        .bind(price.close_price.last_traded)
214        .bind(price.last_traded_volume)
215        .execute(&mut *tx)
216        .await?;
217
218        // Check if it was an insert or update
219        if result.rows_affected() > 0 {
220            // Query to check if this was an insert or update
221            let count: i64 = sqlx::query_scalar(
222                "SELECT COUNT(*) FROM historical_prices WHERE epic = $1 AND resolution = $2 AND snapshot_time = $3 AND created_at = updated_at"
223            )
224                .bind(epic)
225                .bind(resolution)
226                .bind(snapshot_time)
227                .fetch_one(&mut *tx)
228                .await?;
229
230            if count > 0 {
231                stats.inserted += 1;
232            } else {
233                stats.updated += 1;
234            }
235        } else {
236            stats.skipped += 1;
237        }
238
239        // Log progress every 100 records
240        if (i + 1) % 100 == 0 {
241            info!("  Processed {}/{} records...", i + 1, prices.len());
242        }
243    }
244
245    tx.commit().await?;
246    info!("✅ Transaction committed successfully");
247
248    Ok(stats)
249}
250
251/// Parse snapshot time from IG format to `DateTime<Utc>`
252///
253/// # Errors
254///
255/// Returns `AppError::Generic` if the timestamp cannot be parsed with any supported format.
256pub fn parse_snapshot_time(snapshot_time: &str) -> Result<DateTime<Utc>, AppError> {
257    // IG format: "yyyy/MM/dd hh:mm:ss" or "yyyy-MM-dd hh:mm:ss"
258    let formats = [
259        "%Y/%m/%d %H:%M:%S",
260        "%Y-%m-%d %H:%M:%S",
261        "%Y/%m/%d %H:%M",
262        "%Y-%m-%d %H:%M",
263    ];
264
265    for format in &formats {
266        if let Ok(naive_dt) = chrono::NaiveDateTime::parse_from_str(snapshot_time, format) {
267            return Ok(DateTime::from_naive_utc_and_offset(naive_dt, Utc));
268        }
269    }
270
271    Err(AppError::Generic(format!(
272        "Unable to parse timestamp: {}",
273        snapshot_time
274    )))
275}
276
/// Database statistics for a specific epic.
///
/// Produced by [`get_table_statistics`]. Price aggregates are computed
/// from the stored bid/ask columns; the date bounds are the raw
/// `snapshot_time` extremes rendered to text by PostgreSQL.
#[derive(Debug)]
pub struct TableStats {
    /// Total number of records in the database for this epic
    pub total_records: i64,
    /// Earliest date in the dataset (formatted as string)
    pub earliest_date: String,
    /// Latest date in the dataset (formatted as string)
    pub latest_date: String,
    /// Average closing price across all records (average of `close_bid`;
    /// 0.0 when no rows match)
    pub avg_close_price: f64,
    /// Minimum price (lowest of all low bid/ask prices) in the dataset
    pub min_price: f64,
    /// Maximum price (highest of all high bid/ask prices) in the dataset
    pub max_price: f64,
}
293
294/// Get statistics for the historical_prices table
295pub async fn get_table_statistics(
296    pool: &PgPool,
297    epic: &str,
298    resolution: Option<&str>,
299) -> Result<TableStats, sqlx::Error> {
300    let row = if let Some(res) = resolution {
301        sqlx::query(
302            r#"
303            SELECT 
304                COUNT(*) as total_records,
305                MIN(snapshot_time)::text as earliest_date,
306                MAX(snapshot_time)::text as latest_date,
307                AVG(close_bid) as avg_close_price,
308                MIN(LEAST(low_bid, low_ask)) as min_price,
309                MAX(GREATEST(high_bid, high_ask)) as max_price
310            FROM historical_prices 
311            WHERE epic = $1 AND resolution = $2
312            "#,
313        )
314        .bind(epic)
315        .bind(res)
316        .fetch_one(pool)
317        .await?
318    } else {
319        sqlx::query(
320            r#"
321            SELECT 
322                COUNT(*) as total_records,
323                MIN(snapshot_time)::text as earliest_date,
324                MAX(snapshot_time)::text as latest_date,
325                AVG(close_bid) as avg_close_price,
326                MIN(LEAST(low_bid, low_ask)) as min_price,
327                MAX(GREATEST(high_bid, high_ask)) as max_price
328            FROM historical_prices 
329            WHERE epic = $1
330            "#,
331        )
332        .bind(epic)
333        .fetch_one(pool)
334        .await?
335    };
336
337    Ok(TableStats {
338        total_records: row.get("total_records"),
339        earliest_date: row.get("earliest_date"),
340        latest_date: row.get("latest_date"),
341        avg_close_price: row.get::<Option<f64>, _>("avg_close_price").unwrap_or(0.0),
342        min_price: row.get::<Option<f64>, _>("min_price").unwrap_or(0.0),
343        max_price: row.get::<Option<f64>, _>("max_price").unwrap_or(0.0),
344    })
345}
346
#[cfg(test)]
mod tests {
    use super::*;

    /// Parses `input`, panicking if the parser rejects it.
    fn parse_ok(input: &str) -> DateTime<Utc> {
        parse_snapshot_time(input).expect("should parse")
    }

    #[test]
    fn test_parse_snapshot_time_slash_format() {
        let dt = parse_ok("2024/01/15 14:30:00");
        assert_eq!(
            dt.format("%Y-%m-%d %H:%M:%S").to_string(),
            "2024-01-15 14:30:00"
        );
    }

    #[test]
    fn test_parse_snapshot_time_dash_format() {
        let dt = parse_ok("2024-01-15 14:30:00");
        assert_eq!(
            dt.format("%Y-%m-%d %H:%M:%S").to_string(),
            "2024-01-15 14:30:00"
        );
    }

    #[test]
    fn test_parse_snapshot_time_without_seconds_slash() {
        let dt = parse_ok("2024/01/15 14:30");
        assert_eq!(dt.format("%Y-%m-%d %H:%M").to_string(), "2024-01-15 14:30");
    }

    #[test]
    fn test_parse_snapshot_time_without_seconds_dash() {
        let dt = parse_ok("2024-01-15 14:30");
        assert_eq!(dt.format("%Y-%m-%d %H:%M").to_string(), "2024-01-15 14:30");
    }

    #[test]
    fn test_parse_snapshot_time_invalid_format() {
        assert!(parse_snapshot_time("invalid-date").is_err());
    }

    #[test]
    fn test_parse_snapshot_time_empty_string() {
        assert!(parse_snapshot_time("").is_err());
    }

    #[test]
    fn test_parse_snapshot_time_partial_date() {
        // A bare date without a time component is not a supported format.
        assert!(parse_snapshot_time("2024-01-15").is_err());
    }

    #[test]
    fn test_parse_snapshot_time_midnight() {
        let dt = parse_ok("2024/12/31 00:00:00");
        assert_eq!(dt.format("%H:%M:%S").to_string(), "00:00:00");
    }

    #[test]
    fn test_parse_snapshot_time_end_of_day() {
        let dt = parse_ok("2024/12/31 23:59:59");
        assert_eq!(dt.format("%H:%M:%S").to_string(), "23:59:59");
    }

    #[test]
    fn test_storage_stats_default() {
        let s = StorageStats::default();
        assert_eq!(
            (s.inserted, s.updated, s.skipped, s.total_processed),
            (0, 0, 0, 0)
        );
    }

    #[test]
    fn test_storage_stats_creation() {
        let s = StorageStats {
            inserted: 10,
            updated: 5,
            skipped: 2,
            total_processed: 17,
        };
        assert_eq!(
            (s.inserted, s.updated, s.skipped, s.total_processed),
            (10, 5, 2, 17)
        );
    }

    #[test]
    fn test_table_stats_creation() {
        let s = TableStats {
            total_records: 100,
            earliest_date: "2024-01-01".to_string(),
            latest_date: "2024-12-31".to_string(),
            avg_close_price: 150.5,
            min_price: 100.0,
            max_price: 200.0,
        };
        assert_eq!(s.total_records, 100);
        assert_eq!(s.earliest_date, "2024-01-01");
        assert_eq!(s.latest_date, "2024-12-31");
        assert!((s.avg_close_price - 150.5).abs() < f64::EPSILON);
        assert!((s.min_price - 100.0).abs() < f64::EPSILON);
        assert!((s.max_price - 200.0).abs() < f64::EPSILON);
    }

    #[test]
    fn test_parse_snapshot_time_different_years() {
        for year in ["2020", "2021", "2022", "2023", "2024", "2025"] {
            let ts = format!("{}/06/15 12:00:00", year);
            assert!(parse_snapshot_time(&ts).is_ok(), "Failed for year: {}", year);
        }
    }

    #[test]
    fn test_parse_snapshot_time_all_months() {
        for month in 1..=12 {
            let ts = format!("2024/{:02}/15 12:00:00", month);
            assert!(parse_snapshot_time(&ts).is_ok(), "Failed for month: {}", month);
        }
    }
}