1use chrono::{DateTime, Utc};
2use sqlx::{PgPool, Row};
3use tracing::{info, warn};
4
5pub async fn initialize_historical_prices_table(pool: &PgPool) -> Result<(), sqlx::Error> {
7 info!("Initializing historical_prices table...");
8
9 sqlx::query(
10 r#"
11 CREATE TABLE IF NOT EXISTS historical_prices (
12 id BIGSERIAL PRIMARY KEY,
13 epic VARCHAR(255) NOT NULL,
14 snapshot_time TIMESTAMPTZ NOT NULL,
15 open_bid DOUBLE PRECISION,
16 open_ask DOUBLE PRECISION,
17 open_last_traded DOUBLE PRECISION,
18 high_bid DOUBLE PRECISION,
19 high_ask DOUBLE PRECISION,
20 high_last_traded DOUBLE PRECISION,
21 low_bid DOUBLE PRECISION,
22 low_ask DOUBLE PRECISION,
23 low_last_traded DOUBLE PRECISION,
24 close_bid DOUBLE PRECISION,
25 close_ask DOUBLE PRECISION,
26 close_last_traded DOUBLE PRECISION,
27 last_traded_volume BIGINT,
28 created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
29 updated_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
30 UNIQUE(epic, snapshot_time)
31 )
32 "#,
33 )
34 .execute(pool)
35 .await?;
36
37 sqlx::query(
39 r#"
40 CREATE INDEX IF NOT EXISTS idx_historical_prices_epic_time
41 ON historical_prices(epic, snapshot_time DESC)
42 "#,
43 )
44 .execute(pool)
45 .await?;
46
47 sqlx::query(
49 r#"
50 CREATE OR REPLACE FUNCTION update_updated_at_column()
51 RETURNS TRIGGER AS $$
52 BEGIN
53 NEW.updated_at = NOW();
54 RETURN NEW;
55 END;
56 $$ language 'plpgsql'
57 "#,
58 )
59 .execute(pool)
60 .await?;
61
62 sqlx::query(
64 r#"
65 DROP TRIGGER IF EXISTS update_historical_prices_updated_at ON historical_prices
66 "#,
67 )
68 .execute(pool)
69 .await?;
70
71 sqlx::query(
73 r#"
74 CREATE TRIGGER update_historical_prices_updated_at
75 BEFORE UPDATE ON historical_prices
76 FOR EACH ROW
77 EXECUTE FUNCTION update_updated_at_column()
78 "#,
79 )
80 .execute(pool)
81 .await?;
82
83 info!("✅ Historical prices table initialized successfully");
84 Ok(())
85}
86
/// Per-call counters describing what happened to a batch of price records.
///
/// Every examined record increments `total_processed` and exactly one of
/// `inserted`, `updated` or `skipped`.
#[derive(Debug, Default, Clone, Copy, PartialEq, Eq)]
pub struct StorageStats {
    /// Number of records that created a new row.
    pub inserted: usize,
    /// Number of records that overwrote an already existing row.
    pub updated: usize,
    /// Number of records that were not written (e.g. unparsable timestamp).
    pub skipped: usize,
    /// Total number of records examined, whatever their outcome.
    pub total_processed: usize,
}
99
100pub async fn store_historical_prices(
102 pool: &PgPool,
103 epic: &str,
104 prices: &[crate::application::models::market::HistoricalPrice],
105) -> Result<StorageStats, sqlx::Error> {
106 let mut stats = StorageStats::default();
107 let mut tx = pool.begin().await?;
108
109 info!(
110 "Processing {} price records for epic: {}",
111 prices.len(),
112 epic
113 );
114
115 for (i, price) in prices.iter().enumerate() {
116 stats.total_processed += 1;
117
118 let snapshot_time = match parse_snapshot_time(&price.snapshot_time) {
120 Ok(time) => time,
121 Err(e) => {
122 warn!(
123 "⚠️ Skipping record {}: Invalid timestamp '{}': {}",
124 i + 1,
125 price.snapshot_time,
126 e
127 );
128 stats.skipped += 1;
129 continue;
130 }
131 };
132
133 let result = sqlx::query(
135 r#"
136 INSERT INTO historical_prices (
137 epic, snapshot_time,
138 open_bid, open_ask, open_last_traded,
139 high_bid, high_ask, high_last_traded,
140 low_bid, low_ask, low_last_traded,
141 close_bid, close_ask, close_last_traded,
142 last_traded_volume
143 ) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14, $15)
144 ON CONFLICT (epic, snapshot_time)
145 DO UPDATE SET
146 open_bid = EXCLUDED.open_bid,
147 open_ask = EXCLUDED.open_ask,
148 open_last_traded = EXCLUDED.open_last_traded,
149 high_bid = EXCLUDED.high_bid,
150 high_ask = EXCLUDED.high_ask,
151 high_last_traded = EXCLUDED.high_last_traded,
152 low_bid = EXCLUDED.low_bid,
153 low_ask = EXCLUDED.low_ask,
154 low_last_traded = EXCLUDED.low_last_traded,
155 close_bid = EXCLUDED.close_bid,
156 close_ask = EXCLUDED.close_ask,
157 close_last_traded = EXCLUDED.close_last_traded,
158 last_traded_volume = EXCLUDED.last_traded_volume,
159 updated_at = NOW()
160 "#,
161 )
162 .bind(epic)
163 .bind(snapshot_time)
164 .bind(price.open_price.bid)
165 .bind(price.open_price.ask)
166 .bind(price.open_price.last_traded)
167 .bind(price.high_price.bid)
168 .bind(price.high_price.ask)
169 .bind(price.high_price.last_traded)
170 .bind(price.low_price.bid)
171 .bind(price.low_price.ask)
172 .bind(price.low_price.last_traded)
173 .bind(price.close_price.bid)
174 .bind(price.close_price.ask)
175 .bind(price.close_price.last_traded)
176 .bind(price.last_traded_volume)
177 .execute(&mut *tx)
178 .await?;
179
180 if result.rows_affected() > 0 {
182 let count: i64 = sqlx::query_scalar(
184 "SELECT COUNT(*) FROM historical_prices WHERE epic = $1 AND snapshot_time = $2 AND created_at = updated_at"
185 )
186 .bind(epic)
187 .bind(snapshot_time)
188 .fetch_one(&mut *tx)
189 .await?;
190
191 if count > 0 {
192 stats.inserted += 1;
193 } else {
194 stats.updated += 1;
195 }
196 } else {
197 stats.skipped += 1;
198 }
199
200 if (i + 1) % 100 == 0 {
202 info!(" Processed {}/{} records...", i + 1, prices.len());
203 }
204 }
205
206 tx.commit().await?;
207 info!("✅ Transaction committed successfully");
208
209 Ok(stats)
210}
211
212fn parse_snapshot_time(snapshot_time: &str) -> Result<DateTime<Utc>, Box<dyn std::error::Error>> {
214 let formats = [
216 "%Y/%m/%d %H:%M:%S",
217 "%Y-%m-%d %H:%M:%S",
218 "%Y/%m/%d %H:%M",
219 "%Y-%m-%d %H:%M",
220 ];
221
222 for format in &formats {
223 if let Ok(naive_dt) = chrono::NaiveDateTime::parse_from_str(snapshot_time, format) {
224 return Ok(DateTime::from_naive_utc_and_offset(naive_dt, Utc));
225 }
226 }
227
228 Err(format!("Unable to parse timestamp: {}", snapshot_time).into())
229}
230
/// Aggregate figures over the `historical_prices` rows of a single epic.
#[derive(Debug, Clone, PartialEq)]
pub struct TableStats {
    /// Number of stored rows (`COUNT(*)`).
    pub total_records: i64,
    /// Smallest `snapshot_time`, rendered as text by the database.
    pub earliest_date: String,
    /// Largest `snapshot_time`, rendered as text by the database.
    pub latest_date: String,
    /// Average of `close_bid`; `0.0` when the database yields no value.
    pub avg_close_price: f64,
    /// Minimum over rows of the lower of `low_bid` / `low_ask`; `0.0` when absent.
    pub min_price: f64,
    /// Maximum over rows of the higher of `high_bid` / `high_ask`; `0.0` when absent.
    pub max_price: f64,
}
247
248pub async fn get_table_statistics(pool: &PgPool, epic: &str) -> Result<TableStats, sqlx::Error> {
250 let row = sqlx::query(
251 r#"
252 SELECT
253 COUNT(*) as total_records,
254 MIN(snapshot_time)::text as earliest_date,
255 MAX(snapshot_time)::text as latest_date,
256 AVG(close_bid) as avg_close_price,
257 MIN(LEAST(low_bid, low_ask)) as min_price,
258 MAX(GREATEST(high_bid, high_ask)) as max_price
259 FROM historical_prices
260 WHERE epic = $1
261 "#,
262 )
263 .bind(epic)
264 .fetch_one(pool)
265 .await?;
266
267 Ok(TableStats {
268 total_records: row.get("total_records"),
269 earliest_date: row.get("earliest_date"),
270 latest_date: row.get("latest_date"),
271 avg_close_price: row.get::<Option<f64>, _>("avg_close_price").unwrap_or(0.0),
272 min_price: row.get::<Option<f64>, _>("min_price").unwrap_or(0.0),
273 max_price: row.get::<Option<f64>, _>("max_price").unwrap_or(0.0),
274 })
275}