1use crate::presentation::market::HistoricalPrice;
2use chrono::{DateTime, Utc};
3use sqlx::{PgPool, Row};
4use tracing::{info, warn};
5
6pub async fn initialize_historical_prices_table(pool: &PgPool) -> Result<(), sqlx::Error> {
8 info!("Initializing historical_prices table...");
9
10 sqlx::query(
11 r#"
12 CREATE TABLE IF NOT EXISTS historical_prices (
13 id BIGSERIAL PRIMARY KEY,
14 epic VARCHAR(255) NOT NULL,
15 snapshot_time TIMESTAMPTZ NOT NULL,
16 open_bid DOUBLE PRECISION,
17 open_ask DOUBLE PRECISION,
18 open_last_traded DOUBLE PRECISION,
19 high_bid DOUBLE PRECISION,
20 high_ask DOUBLE PRECISION,
21 high_last_traded DOUBLE PRECISION,
22 low_bid DOUBLE PRECISION,
23 low_ask DOUBLE PRECISION,
24 low_last_traded DOUBLE PRECISION,
25 close_bid DOUBLE PRECISION,
26 close_ask DOUBLE PRECISION,
27 close_last_traded DOUBLE PRECISION,
28 last_traded_volume BIGINT,
29 created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
30 updated_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
31 UNIQUE(epic, snapshot_time)
32 )
33 "#,
34 )
35 .execute(pool)
36 .await?;
37
38 sqlx::query(
40 r#"
41 CREATE INDEX IF NOT EXISTS idx_historical_prices_epic_time
42 ON historical_prices(epic, snapshot_time DESC)
43 "#,
44 )
45 .execute(pool)
46 .await?;
47
48 sqlx::query(
50 r#"
51 CREATE OR REPLACE FUNCTION update_updated_at_column()
52 RETURNS TRIGGER AS $$
53 BEGIN
54 NEW.updated_at = NOW();
55 RETURN NEW;
56 END;
57 $$ language 'plpgsql'
58 "#,
59 )
60 .execute(pool)
61 .await?;
62
63 sqlx::query(
65 r#"
66 DROP TRIGGER IF EXISTS update_historical_prices_updated_at ON historical_prices
67 "#,
68 )
69 .execute(pool)
70 .await?;
71
72 sqlx::query(
74 r#"
75 CREATE TRIGGER update_historical_prices_updated_at
76 BEFORE UPDATE ON historical_prices
77 FOR EACH ROW
78 EXECUTE FUNCTION update_updated_at_column()
79 "#,
80 )
81 .execute(pool)
82 .await?;
83
84 info!("✅ Historical prices table initialized successfully");
85 Ok(())
86}
87
/// Per-batch counters reported by `store_historical_prices`.
///
/// All counters start at zero via `Default`. The type is cheap to clone and can be
/// compared with `==`, which makes results easy to assert on.
#[derive(Debug, Default, Clone, PartialEq, Eq)]
pub struct StorageStats {
    /// Records classified as newly inserted rows.
    pub inserted: usize,
    /// Records classified as updates of an already stored row.
    pub updated: usize,
    /// Records that were not stored (e.g. the timestamp could not be parsed, or the
    /// write affected no row).
    pub skipped: usize,
    /// Every record looked at, regardless of outcome.
    pub total_processed: usize,
}
100
101pub async fn store_historical_prices(
103 pool: &PgPool,
104 epic: &str,
105 prices: &[HistoricalPrice],
106) -> Result<StorageStats, sqlx::Error> {
107 let mut stats = StorageStats::default();
108 let mut tx = pool.begin().await?;
109
110 info!(
111 "Processing {} price records for epic: {}",
112 prices.len(),
113 epic
114 );
115
116 for (i, price) in prices.iter().enumerate() {
117 stats.total_processed += 1;
118
119 let snapshot_time = match parse_snapshot_time(&price.snapshot_time) {
121 Ok(time) => time,
122 Err(e) => {
123 warn!(
124 "⚠️ Skipping record {}: Invalid timestamp '{}': {}",
125 i + 1,
126 price.snapshot_time,
127 e
128 );
129 stats.skipped += 1;
130 continue;
131 }
132 };
133
134 let result = sqlx::query(
136 r#"
137 INSERT INTO historical_prices (
138 epic, snapshot_time,
139 open_bid, open_ask, open_last_traded,
140 high_bid, high_ask, high_last_traded,
141 low_bid, low_ask, low_last_traded,
142 close_bid, close_ask, close_last_traded,
143 last_traded_volume
144 ) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14, $15)
145 ON CONFLICT (epic, snapshot_time)
146 DO UPDATE SET
147 open_bid = EXCLUDED.open_bid,
148 open_ask = EXCLUDED.open_ask,
149 open_last_traded = EXCLUDED.open_last_traded,
150 high_bid = EXCLUDED.high_bid,
151 high_ask = EXCLUDED.high_ask,
152 high_last_traded = EXCLUDED.high_last_traded,
153 low_bid = EXCLUDED.low_bid,
154 low_ask = EXCLUDED.low_ask,
155 low_last_traded = EXCLUDED.low_last_traded,
156 close_bid = EXCLUDED.close_bid,
157 close_ask = EXCLUDED.close_ask,
158 close_last_traded = EXCLUDED.close_last_traded,
159 last_traded_volume = EXCLUDED.last_traded_volume,
160 updated_at = NOW()
161 "#,
162 )
163 .bind(epic)
164 .bind(snapshot_time)
165 .bind(price.open_price.bid)
166 .bind(price.open_price.ask)
167 .bind(price.open_price.last_traded)
168 .bind(price.high_price.bid)
169 .bind(price.high_price.ask)
170 .bind(price.high_price.last_traded)
171 .bind(price.low_price.bid)
172 .bind(price.low_price.ask)
173 .bind(price.low_price.last_traded)
174 .bind(price.close_price.bid)
175 .bind(price.close_price.ask)
176 .bind(price.close_price.last_traded)
177 .bind(price.last_traded_volume)
178 .execute(&mut *tx)
179 .await?;
180
181 if result.rows_affected() > 0 {
183 let count: i64 = sqlx::query_scalar(
185 "SELECT COUNT(*) FROM historical_prices WHERE epic = $1 AND snapshot_time = $2 AND created_at = updated_at"
186 )
187 .bind(epic)
188 .bind(snapshot_time)
189 .fetch_one(&mut *tx)
190 .await?;
191
192 if count > 0 {
193 stats.inserted += 1;
194 } else {
195 stats.updated += 1;
196 }
197 } else {
198 stats.skipped += 1;
199 }
200
201 if (i + 1) % 100 == 0 {
203 info!(" Processed {}/{} records...", i + 1, prices.len());
204 }
205 }
206
207 tx.commit().await?;
208 info!("✅ Transaction committed successfully");
209
210 Ok(stats)
211}
212
213pub fn parse_snapshot_time(
215 snapshot_time: &str,
216) -> Result<DateTime<Utc>, Box<dyn std::error::Error>> {
217 let formats = [
219 "%Y/%m/%d %H:%M:%S",
220 "%Y-%m-%d %H:%M:%S",
221 "%Y/%m/%d %H:%M",
222 "%Y-%m-%d %H:%M",
223 ];
224
225 for format in &formats {
226 if let Ok(naive_dt) = chrono::NaiveDateTime::parse_from_str(snapshot_time, format) {
227 return Ok(DateTime::from_naive_utc_and_offset(naive_dt, Utc));
228 }
229 }
230
231 Err(format!("Unable to parse timestamp: {}", snapshot_time).into())
232}
233
/// Aggregate figures over the stored `historical_prices` rows of one epic, as produced
/// by `get_table_statistics`.
///
/// The type can be cloned and compared with `==` (only `PartialEq`, since it holds
/// floating-point fields).
#[derive(Debug, Clone, PartialEq)]
pub struct TableStats {
    /// Number of rows counted for the epic.
    pub total_records: i64,
    /// Smallest `snapshot_time`, rendered as text by the database.
    pub earliest_date: String,
    /// Largest `snapshot_time`, rendered as text by the database.
    pub latest_date: String,
    /// Average of `close_bid`.
    pub avg_close_price: f64,
    /// Minimum over the rows of the lower of `low_bid` and `low_ask`.
    pub min_price: f64,
    /// Maximum over the rows of the higher of `high_bid` and `high_ask`.
    pub max_price: f64,
}
250
251pub async fn get_table_statistics(pool: &PgPool, epic: &str) -> Result<TableStats, sqlx::Error> {
253 let row = sqlx::query(
254 r#"
255 SELECT
256 COUNT(*) as total_records,
257 MIN(snapshot_time)::text as earliest_date,
258 MAX(snapshot_time)::text as latest_date,
259 AVG(close_bid) as avg_close_price,
260 MIN(LEAST(low_bid, low_ask)) as min_price,
261 MAX(GREATEST(high_bid, high_ask)) as max_price
262 FROM historical_prices
263 WHERE epic = $1
264 "#,
265 )
266 .bind(epic)
267 .fetch_one(pool)
268 .await?;
269
270 Ok(TableStats {
271 total_records: row.get("total_records"),
272 earliest_date: row.get("earliest_date"),
273 latest_date: row.get("latest_date"),
274 avg_close_price: row.get::<Option<f64>, _>("avg_close_price").unwrap_or(0.0),
275 min_price: row.get::<Option<f64>, _>("min_price").unwrap_or(0.0),
276 max_price: row.get::<Option<f64>, _>("max_price").unwrap_or(0.0),
277 })
278}