1use crate::error::AppError;
2use crate::presentation::market::HistoricalPrice;
3use chrono::{DateTime, Utc};
4use sqlx::{PgPool, Row};
5use tracing::{info, warn};
6
7pub async fn initialize_historical_prices_table(pool: &PgPool) -> Result<(), sqlx::Error> {
9 info!("Initializing historical_prices table...");
10
11 sqlx::query(
12 r#"
13 CREATE TABLE IF NOT EXISTS historical_prices (
14 id BIGSERIAL PRIMARY KEY,
15 epic VARCHAR(255) NOT NULL,
16 resolution VARCHAR(50) NOT NULL,
17 snapshot_time TIMESTAMPTZ NOT NULL,
18 open_bid DOUBLE PRECISION,
19 open_ask DOUBLE PRECISION,
20 open_last_traded DOUBLE PRECISION,
21 high_bid DOUBLE PRECISION,
22 high_ask DOUBLE PRECISION,
23 high_last_traded DOUBLE PRECISION,
24 low_bid DOUBLE PRECISION,
25 low_ask DOUBLE PRECISION,
26 low_last_traded DOUBLE PRECISION,
27 close_bid DOUBLE PRECISION,
28 close_ask DOUBLE PRECISION,
29 close_last_traded DOUBLE PRECISION,
30 last_traded_volume BIGINT,
31 created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
32 updated_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
33 UNIQUE(epic, resolution, snapshot_time)
34 )
35 "#,
36 )
37 .execute(pool)
38 .await?;
39
40 if let Err(e) = sqlx::query(
42 r#"
43 ALTER TABLE historical_prices
44 ADD COLUMN IF NOT EXISTS resolution VARCHAR(50) NOT NULL DEFAULT 'UNKNOWN'
45 "#,
46 )
47 .execute(pool)
48 .await
49 {
50 info!(
51 "Column 'resolution' check/migration skipped or already present: {}",
52 e
53 );
54 }
55
56 let _ = sqlx::query(
58 r#"
59 ALTER TABLE historical_prices
60 DROP CONSTRAINT IF EXISTS historical_prices_epic_snapshot_time_key;
61
62 ALTER TABLE historical_prices
63 DROP CONSTRAINT IF EXISTS historical_prices_epic_resolution_snapshot_time_key;
64
65 ALTER TABLE historical_prices
66 ADD CONSTRAINT historical_prices_epic_resolution_snapshot_time_key
67 UNIQUE (epic, resolution, snapshot_time);
68 "#,
69 )
70 .execute(pool)
71 .await;
72
73 sqlx::query(
75 r#"
76 CREATE INDEX IF NOT EXISTS idx_historical_prices_epic_res_time
77 ON historical_prices(epic, resolution, snapshot_time DESC)
78 "#,
79 )
80 .execute(pool)
81 .await?;
82
83 sqlx::query(
85 r#"
86 CREATE OR REPLACE FUNCTION update_updated_at_column()
87 RETURNS TRIGGER AS $$
88 BEGIN
89 NEW.updated_at = NOW();
90 RETURN NEW;
91 END;
92 $$ language 'plpgsql'
93 "#,
94 )
95 .execute(pool)
96 .await?;
97
98 sqlx::query(
100 r#"
101 DROP TRIGGER IF EXISTS update_historical_prices_updated_at ON historical_prices
102 "#,
103 )
104 .execute(pool)
105 .await?;
106
107 sqlx::query(
109 r#"
110 CREATE TRIGGER update_historical_prices_updated_at
111 BEFORE UPDATE ON historical_prices
112 FOR EACH ROW
113 EXECUTE FUNCTION update_updated_at_column()
114 "#,
115 )
116 .execute(pool)
117 .await?;
118
119 info!("✅ Historical prices table initialized successfully");
120 Ok(())
121}
122
/// Counters describing the outcome of one `store_historical_prices` call.
///
/// `Clone`, `PartialEq` and `Eq` are derived so that callers can copy a
/// result and compare whole values instead of field by field.
#[derive(Debug, Default, Clone, PartialEq, Eq)]
pub struct StorageStats {
    /// Number of records counted as newly inserted rows.
    pub inserted: usize,
    /// Number of records counted as updates of an already stored row.
    pub updated: usize,
    /// Number of records that were not written (for example because their
    /// timestamp could not be parsed).
    pub skipped: usize,
    /// Number of records examined, whatever their outcome.
    pub total_processed: usize,
}
135
136pub async fn store_historical_prices(
138 pool: &PgPool,
139 epic: &str,
140 resolution: &str,
141 prices: &[HistoricalPrice],
142) -> Result<StorageStats, sqlx::Error> {
143 let mut stats = StorageStats::default();
144 let mut tx = pool.begin().await?;
145
146 info!(
147 "Processing {} price records for epic: {}",
148 prices.len(),
149 epic
150 );
151
152 for (i, price) in prices.iter().enumerate() {
153 stats.total_processed += 1;
154
155 let snapshot_time = match parse_snapshot_time(&price.snapshot_time) {
157 Ok(time) => time,
158 Err(e) => {
159 warn!(
160 "⚠️ Skipping record {}: Invalid timestamp '{}': {}",
161 i + 1,
162 price.snapshot_time,
163 e
164 );
165 stats.skipped += 1;
166 continue;
167 }
168 };
169
170 let result = sqlx::query(
172 r#"
173 INSERT INTO historical_prices (
174 epic, resolution, snapshot_time,
175 open_bid, open_ask, open_last_traded,
176 high_bid, high_ask, high_last_traded,
177 low_bid, low_ask, low_last_traded,
178 close_bid, close_ask, close_last_traded,
179 last_traded_volume
180 ) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14, $15, $16)
181 ON CONFLICT (epic, resolution, snapshot_time)
182 DO UPDATE SET
183 open_bid = EXCLUDED.open_bid,
184 open_ask = EXCLUDED.open_ask,
185 open_last_traded = EXCLUDED.open_last_traded,
186 high_bid = EXCLUDED.high_bid,
187 high_ask = EXCLUDED.high_ask,
188 high_last_traded = EXCLUDED.high_last_traded,
189 low_bid = EXCLUDED.low_bid,
190 low_ask = EXCLUDED.low_ask,
191 low_last_traded = EXCLUDED.low_last_traded,
192 close_bid = EXCLUDED.close_bid,
193 close_ask = EXCLUDED.close_ask,
194 close_last_traded = EXCLUDED.close_last_traded,
195 last_traded_volume = EXCLUDED.last_traded_volume,
196 updated_at = NOW()
197 "#,
198 )
199 .bind(epic)
200 .bind(resolution)
201 .bind(snapshot_time)
202 .bind(price.open_price.bid)
203 .bind(price.open_price.ask)
204 .bind(price.open_price.last_traded)
205 .bind(price.high_price.bid)
206 .bind(price.high_price.ask)
207 .bind(price.high_price.last_traded)
208 .bind(price.low_price.bid)
209 .bind(price.low_price.ask)
210 .bind(price.low_price.last_traded)
211 .bind(price.close_price.bid)
212 .bind(price.close_price.ask)
213 .bind(price.close_price.last_traded)
214 .bind(price.last_traded_volume)
215 .execute(&mut *tx)
216 .await?;
217
218 if result.rows_affected() > 0 {
220 let count: i64 = sqlx::query_scalar(
222 "SELECT COUNT(*) FROM historical_prices WHERE epic = $1 AND resolution = $2 AND snapshot_time = $3 AND created_at = updated_at"
223 )
224 .bind(epic)
225 .bind(resolution)
226 .bind(snapshot_time)
227 .fetch_one(&mut *tx)
228 .await?;
229
230 if count > 0 {
231 stats.inserted += 1;
232 } else {
233 stats.updated += 1;
234 }
235 } else {
236 stats.skipped += 1;
237 }
238
239 if (i + 1) % 100 == 0 {
241 info!(" Processed {}/{} records...", i + 1, prices.len());
242 }
243 }
244
245 tx.commit().await?;
246 info!("✅ Transaction committed successfully");
247
248 Ok(stats)
249}
250
251pub fn parse_snapshot_time(snapshot_time: &str) -> Result<DateTime<Utc>, AppError> {
257 let formats = [
259 "%Y/%m/%d %H:%M:%S",
260 "%Y-%m-%d %H:%M:%S",
261 "%Y/%m/%d %H:%M",
262 "%Y-%m-%d %H:%M",
263 ];
264
265 for format in &formats {
266 if let Ok(naive_dt) = chrono::NaiveDateTime::parse_from_str(snapshot_time, format) {
267 return Ok(DateTime::from_naive_utc_and_offset(naive_dt, Utc));
268 }
269 }
270
271 Err(AppError::Generic(format!(
272 "Unable to parse timestamp: {}",
273 snapshot_time
274 )))
275}
276
/// Aggregate figures over stored `historical_prices` rows, as produced by
/// `get_table_statistics`.
///
/// `Clone` and `PartialEq` are derived so that a result can be copied and
/// compared as a whole (`Eq` is not available because of the `f64` fields).
#[derive(Debug, Clone, PartialEq)]
pub struct TableStats {
    /// Number of rows that matched the filter.
    pub total_records: i64,
    /// Smallest `snapshot_time` among the matched rows, rendered as text by
    /// the database.
    pub earliest_date: String,
    /// Largest `snapshot_time` among the matched rows, rendered as text by
    /// the database.
    pub latest_date: String,
    /// Average of `close_bid`; `0.0` when the database returns no value.
    pub avg_close_price: f64,
    /// Minimum over `LEAST(low_bid, low_ask)`; `0.0` when the database
    /// returns no value.
    pub min_price: f64,
    /// Maximum over `GREATEST(high_bid, high_ask)`; `0.0` when the database
    /// returns no value.
    pub max_price: f64,
}
293
294pub async fn get_table_statistics(
296 pool: &PgPool,
297 epic: &str,
298 resolution: Option<&str>,
299) -> Result<TableStats, sqlx::Error> {
300 let row = if let Some(res) = resolution {
301 sqlx::query(
302 r#"
303 SELECT
304 COUNT(*) as total_records,
305 MIN(snapshot_time)::text as earliest_date,
306 MAX(snapshot_time)::text as latest_date,
307 AVG(close_bid) as avg_close_price,
308 MIN(LEAST(low_bid, low_ask)) as min_price,
309 MAX(GREATEST(high_bid, high_ask)) as max_price
310 FROM historical_prices
311 WHERE epic = $1 AND resolution = $2
312 "#,
313 )
314 .bind(epic)
315 .bind(res)
316 .fetch_one(pool)
317 .await?
318 } else {
319 sqlx::query(
320 r#"
321 SELECT
322 COUNT(*) as total_records,
323 MIN(snapshot_time)::text as earliest_date,
324 MAX(snapshot_time)::text as latest_date,
325 AVG(close_bid) as avg_close_price,
326 MIN(LEAST(low_bid, low_ask)) as min_price,
327 MAX(GREATEST(high_bid, high_ask)) as max_price
328 FROM historical_prices
329 WHERE epic = $1
330 "#,
331 )
332 .bind(epic)
333 .fetch_one(pool)
334 .await?
335 };
336
337 Ok(TableStats {
338 total_records: row.get("total_records"),
339 earliest_date: row.get("earliest_date"),
340 latest_date: row.get("latest_date"),
341 avg_close_price: row.get::<Option<f64>, _>("avg_close_price").unwrap_or(0.0),
342 min_price: row.get::<Option<f64>, _>("min_price").unwrap_or(0.0),
343 max_price: row.get::<Option<f64>, _>("max_price").unwrap_or(0.0),
344 })
345}
346
#[cfg(test)]
mod tests {
    use super::*;

    // --- parse_snapshot_time: accepted layouts ---------------------------

    #[test]
    fn test_parse_snapshot_time_slash_format() {
        let dt = parse_snapshot_time("2024/01/15 14:30:00").expect("should parse");
        assert_eq!(
            dt.format("%Y-%m-%d %H:%M:%S").to_string(),
            "2024-01-15 14:30:00"
        );
    }

    #[test]
    fn test_parse_snapshot_time_dash_format() {
        let dt = parse_snapshot_time("2024-01-15 14:30:00").expect("should parse");
        assert_eq!(
            dt.format("%Y-%m-%d %H:%M:%S").to_string(),
            "2024-01-15 14:30:00"
        );
    }

    #[test]
    fn test_parse_snapshot_time_without_seconds_slash() {
        let dt = parse_snapshot_time("2024/01/15 14:30").expect("should parse");
        assert_eq!(dt.format("%Y-%m-%d %H:%M").to_string(), "2024-01-15 14:30");
    }

    #[test]
    fn test_parse_snapshot_time_without_seconds_dash() {
        let dt = parse_snapshot_time("2024-01-15 14:30").expect("should parse");
        assert_eq!(dt.format("%Y-%m-%d %H:%M").to_string(), "2024-01-15 14:30");
    }

    // --- parse_snapshot_time: rejected input -----------------------------

    #[test]
    fn test_parse_snapshot_time_invalid_format() {
        assert!(parse_snapshot_time("invalid-date").is_err());
    }

    #[test]
    fn test_parse_snapshot_time_empty_string() {
        assert!(parse_snapshot_time("").is_err());
    }

    #[test]
    fn test_parse_snapshot_time_partial_date() {
        // A date without a time component is not one of the accepted layouts.
        assert!(parse_snapshot_time("2024-01-15").is_err());
    }

    // --- parse_snapshot_time: boundaries ---------------------------------

    #[test]
    fn test_parse_snapshot_time_midnight() {
        let dt = parse_snapshot_time("2024/12/31 00:00:00").expect("should parse");
        assert_eq!(dt.format("%H:%M:%S").to_string(), "00:00:00");
    }

    #[test]
    fn test_parse_snapshot_time_end_of_day() {
        let dt = parse_snapshot_time("2024/12/31 23:59:59").expect("should parse");
        assert_eq!(dt.format("%H:%M:%S").to_string(), "23:59:59");
    }

    #[test]
    fn test_parse_snapshot_time_different_years() {
        let years = ["2020", "2021", "2022", "2023", "2024", "2025"];
        for year in years {
            let timestamp = format!("{}/06/15 12:00:00", year);
            let result = parse_snapshot_time(&timestamp);
            assert!(result.is_ok(), "Failed for year: {}", year);
        }
    }

    #[test]
    fn test_parse_snapshot_time_all_months() {
        for month in 1..=12 {
            let timestamp = format!("2024/{:02}/15 12:00:00", month);
            let result = parse_snapshot_time(&timestamp);
            assert!(result.is_ok(), "Failed for month: {}", month);
        }
    }

    // --- plain data structs ----------------------------------------------

    #[test]
    fn test_storage_stats_default() {
        let stats = StorageStats::default();
        assert_eq!(stats.inserted, 0);
        assert_eq!(stats.updated, 0);
        assert_eq!(stats.skipped, 0);
        assert_eq!(stats.total_processed, 0);
    }

    #[test]
    fn test_storage_stats_creation() {
        let stats = StorageStats {
            inserted: 10,
            updated: 5,
            skipped: 2,
            total_processed: 17,
        };
        assert_eq!(stats.inserted, 10);
        assert_eq!(stats.updated, 5);
        assert_eq!(stats.skipped, 2);
        assert_eq!(stats.total_processed, 17);
    }

    #[test]
    fn test_table_stats_creation() {
        let stats = TableStats {
            total_records: 100,
            earliest_date: "2024-01-01".to_string(),
            latest_date: "2024-12-31".to_string(),
            avg_close_price: 150.5,
            min_price: 100.0,
            max_price: 200.0,
        };
        assert_eq!(stats.total_records, 100);
        assert_eq!(stats.earliest_date, "2024-01-01");
        assert_eq!(stats.latest_date, "2024-12-31");
        assert!((stats.avg_close_price - 150.5).abs() < f64::EPSILON);
        assert!((stats.min_price - 100.0).abs() < f64::EPSILON);
        assert!((stats.max_price - 200.0).abs() < f64::EPSILON);
    }
}