1use crate::application::models::market::{MarketData, MarketNode};
2use crate::storage::market_persistence::{MarketHierarchyNode, MarketInstrument};
3use sqlx::{Executor, PgPool, Row};
4use tracing::info;
5
6pub struct MarketDatabaseService {
8 pool: PgPool,
9 exchange_name: String,
10}
11
12impl MarketDatabaseService {
13 pub fn new(pool: PgPool, exchange_name: String) -> Self {
15 Self {
16 pool,
17 exchange_name,
18 }
19 }
20
21 pub async fn initialize_database(&self) -> Result<(), sqlx::Error> {
23 info!("Initializing market database tables...");
24
25 sqlx::query(
27 r#"
28 CREATE TABLE IF NOT EXISTS market_hierarchy_nodes (
29 id VARCHAR(255) PRIMARY KEY,
30 name VARCHAR(500) NOT NULL,
31 parent_id VARCHAR(255) REFERENCES market_hierarchy_nodes(id),
32 exchange VARCHAR(50) NOT NULL,
33 level INTEGER NOT NULL DEFAULT 0,
34 path TEXT NOT NULL,
35 created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
36 updated_at TIMESTAMPTZ NOT NULL DEFAULT NOW()
37 )
38 "#,
39 )
40 .execute(&self.pool)
41 .await?;
42
43 sqlx::query(
45 r#"
46 CREATE TABLE IF NOT EXISTS market_instruments (
47 epic VARCHAR(255) PRIMARY KEY,
48 instrument_name VARCHAR(500) NOT NULL,
49 instrument_type VARCHAR(100) NOT NULL,
50 node_id VARCHAR(255) NOT NULL REFERENCES market_hierarchy_nodes(id),
51 exchange VARCHAR(50) NOT NULL,
52 expiry VARCHAR(50) NOT NULL DEFAULT '',
53 high_limit_price DOUBLE PRECISION,
54 low_limit_price DOUBLE PRECISION,
55 market_status VARCHAR(50) NOT NULL,
56 net_change DOUBLE PRECISION,
57 percentage_change DOUBLE PRECISION,
58 update_time VARCHAR(50),
59 update_time_utc TIMESTAMPTZ,
60 bid DOUBLE PRECISION,
61 offer DOUBLE PRECISION,
62 created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
63 updated_at TIMESTAMPTZ NOT NULL DEFAULT NOW()
64 )
65 "#,
66 )
67 .execute(&self.pool)
68 .await?;
69
70 let hierarchy_indexes = [
72 "CREATE INDEX IF NOT EXISTS idx_market_hierarchy_parent_id ON market_hierarchy_nodes(parent_id)",
73 "CREATE INDEX IF NOT EXISTS idx_market_hierarchy_exchange ON market_hierarchy_nodes(exchange)",
74 "CREATE INDEX IF NOT EXISTS idx_market_hierarchy_level ON market_hierarchy_nodes(level)",
75 "CREATE INDEX IF NOT EXISTS idx_market_hierarchy_path ON market_hierarchy_nodes USING gin(to_tsvector('english', path))",
76 "CREATE INDEX IF NOT EXISTS idx_market_hierarchy_name ON market_hierarchy_nodes USING gin(to_tsvector('english', name))",
77 ];
78
79 for index_sql in hierarchy_indexes {
80 sqlx::query(index_sql).execute(&self.pool).await?;
81 }
82
83 let instrument_indexes = [
85 "CREATE INDEX IF NOT EXISTS idx_market_instruments_node_id ON market_instruments(node_id)",
86 "CREATE INDEX IF NOT EXISTS idx_market_instruments_exchange ON market_instruments(exchange)",
87 "CREATE INDEX IF NOT EXISTS idx_market_instruments_type ON market_instruments(instrument_type)",
88 "CREATE INDEX IF NOT EXISTS idx_market_instruments_status ON market_instruments(market_status)",
89 "CREATE INDEX IF NOT EXISTS idx_market_instruments_name ON market_instruments USING gin(to_tsvector('english', instrument_name))",
90 "CREATE INDEX IF NOT EXISTS idx_market_instruments_epic ON market_instruments(epic)",
91 "CREATE INDEX IF NOT EXISTS idx_market_instruments_expiry ON market_instruments(expiry)",
92 ];
93
94 for index_sql in instrument_indexes {
95 sqlx::query(index_sql).execute(&self.pool).await?;
96 }
97
98 sqlx::query(
100 r#"
101 CREATE OR REPLACE FUNCTION update_updated_at_column()
102 RETURNS TRIGGER AS $$
103 BEGIN
104 NEW.updated_at = NOW();
105 RETURN NEW;
106 END;
107 $$ language 'plpgsql'
108 "#,
109 )
110 .execute(&self.pool)
111 .await?;
112
113 sqlx::query("DROP TRIGGER IF EXISTS update_market_hierarchy_nodes_updated_at ON market_hierarchy_nodes")
115 .execute(&self.pool)
116 .await?;
117
118 sqlx::query(
119 r#"
120 CREATE TRIGGER update_market_hierarchy_nodes_updated_at
121 BEFORE UPDATE ON market_hierarchy_nodes
122 FOR EACH ROW
123 EXECUTE FUNCTION update_updated_at_column()
124 "#,
125 )
126 .execute(&self.pool)
127 .await?;
128
129 sqlx::query(
130 "DROP TRIGGER IF EXISTS update_market_instruments_updated_at ON market_instruments",
131 )
132 .execute(&self.pool)
133 .await?;
134
135 sqlx::query(
136 r#"
137 CREATE TRIGGER update_market_instruments_updated_at
138 BEFORE UPDATE ON market_instruments
139 FOR EACH ROW
140 EXECUTE FUNCTION update_updated_at_column()
141 "#,
142 )
143 .execute(&self.pool)
144 .await?;
145
146 info!("Market database tables initialized successfully");
147 Ok(())
148 }
149
150 pub async fn store_market_hierarchy(
152 &self,
153 hierarchy: &[MarketNode],
154 ) -> Result<(), sqlx::Error> {
155 info!(
156 "Storing market hierarchy with {} top-level nodes",
157 hierarchy.len()
158 );
159
160 let mut tx = self.pool.begin().await?;
162
163 sqlx::query("DELETE FROM market_instruments WHERE exchange = $1")
165 .bind(&self.exchange_name)
166 .execute(&mut *tx)
167 .await?;
168
169 sqlx::query("DELETE FROM market_hierarchy_nodes WHERE exchange = $1")
170 .bind(&self.exchange_name)
171 .execute(&mut *tx)
172 .await?;
173
174 let mut node_count = 0;
176 let mut instrument_count = 0;
177
178 for node in hierarchy {
179 let (nodes, instruments) = self.process_node_recursive(node, None, 0, "").await?;
180 node_count += nodes.len();
181 instrument_count += instruments.len();
182
183 for node in nodes {
185 self.insert_hierarchy_node(&mut tx, &node).await?;
186 }
187
188 for instrument in instruments {
190 self.insert_market_instrument(&mut tx, &instrument).await?;
191 }
192 }
193
194 tx.commit().await?;
196
197 info!(
198 "Successfully stored {} hierarchy nodes and {} instruments",
199 node_count, instrument_count
200 );
201 Ok(())
202 }
203
204 #[allow(clippy::type_complexity)]
206 fn process_node_recursive<'a>(
207 &'a self,
208 node: &'a MarketNode,
209 parent_id: Option<&'a str>,
210 level: i32,
211 parent_path: &'a str,
212 ) -> std::pin::Pin<
213 Box<
214 dyn Future<
215 Output = Result<(Vec<MarketHierarchyNode>, Vec<MarketInstrument>), sqlx::Error>,
216 > + 'a,
217 >,
218 > {
219 Box::pin(async move {
220 let mut all_nodes = Vec::new();
221 let mut all_instruments = Vec::new();
222
223 let current_path = MarketHierarchyNode::build_path(
225 if parent_path.is_empty() {
226 None
227 } else {
228 Some(parent_path)
229 },
230 &node.name,
231 );
232
233 let current_node = MarketHierarchyNode::new(
235 node.id.clone(),
236 node.name.clone(),
237 parent_id.map(|s| s.to_string()),
238 self.exchange_name.clone(),
239 level,
240 current_path.clone(),
241 );
242
243 all_nodes.push(current_node);
244
245 for market in &node.markets {
247 let mut instrument = self.convert_market_data_to_instrument(market, &node.id);
248 instrument.parse_update_time_utc();
249 all_instruments.push(instrument);
250 }
251
252 for child in &node.children {
254 let (child_nodes, child_instruments) = self
255 .process_node_recursive(child, Some(&node.id), level + 1, ¤t_path)
256 .await?;
257 all_nodes.extend(child_nodes);
258 all_instruments.extend(child_instruments);
259 }
260
261 Ok((all_nodes, all_instruments))
262 })
263 }
264
265 fn convert_market_data_to_instrument(
267 &self,
268 market: &MarketData,
269 node_id: &str,
270 ) -> MarketInstrument {
271 let mut instrument = MarketInstrument::new(
272 market.epic.clone(),
273 market.instrument_name.clone(),
274 format!("{:?}", market.instrument_type),
275 node_id.to_string(),
276 self.exchange_name.clone(),
277 );
278
279 instrument.expiry = market.expiry.clone();
280 instrument.high_limit_price = market.high_limit_price;
281 instrument.low_limit_price = market.low_limit_price;
282 instrument.market_status = market.market_status.clone();
283 instrument.net_change = market.net_change;
284 instrument.percentage_change = market.percentage_change;
285 instrument.update_time = market.update_time.clone();
286 instrument.bid = market.bid;
287 instrument.offer = market.offer;
288
289 instrument
290 }
291
292 async fn insert_hierarchy_node(
294 &self,
295 tx: &mut sqlx::Transaction<'_, sqlx::Postgres>,
296 node: &MarketHierarchyNode,
297 ) -> Result<(), sqlx::Error> {
298 tx.execute(
299 sqlx::query(
300 r#"
301 INSERT INTO market_hierarchy_nodes
302 (id, name, parent_id, exchange, level, path, created_at, updated_at)
303 VALUES ($1, $2, $3, $4, $5, $6, $7, $8)
304 ON CONFLICT (id) DO UPDATE SET
305 name = EXCLUDED.name,
306 parent_id = EXCLUDED.parent_id,
307 exchange = EXCLUDED.exchange,
308 level = EXCLUDED.level,
309 path = EXCLUDED.path,
310 updated_at = EXCLUDED.updated_at
311 "#,
312 )
313 .bind(&node.id)
314 .bind(&node.name)
315 .bind(&node.parent_id)
316 .bind(&node.exchange)
317 .bind(node.level)
318 .bind(&node.path)
319 .bind(node.created_at)
320 .bind(node.updated_at),
321 )
322 .await?;
323
324 Ok(())
325 }
326
327 async fn insert_market_instrument(
329 &self,
330 tx: &mut sqlx::Transaction<'_, sqlx::Postgres>,
331 instrument: &MarketInstrument,
332 ) -> Result<(), sqlx::Error> {
333 tx.execute(
334 sqlx::query(
335 r#"
336 INSERT INTO market_instruments
337 (epic, instrument_name, instrument_type, node_id, exchange, expiry,
338 high_limit_price, low_limit_price, market_status, net_change,
339 percentage_change, update_time, update_time_utc, bid, offer,
340 created_at, updated_at)
341 VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14, $15, $16, $17)
342 ON CONFLICT (epic) DO UPDATE SET
343 instrument_name = EXCLUDED.instrument_name,
344 instrument_type = EXCLUDED.instrument_type,
345 node_id = EXCLUDED.node_id,
346 exchange = EXCLUDED.exchange,
347 expiry = EXCLUDED.expiry,
348 high_limit_price = EXCLUDED.high_limit_price,
349 low_limit_price = EXCLUDED.low_limit_price,
350 market_status = EXCLUDED.market_status,
351 net_change = EXCLUDED.net_change,
352 percentage_change = EXCLUDED.percentage_change,
353 update_time = EXCLUDED.update_time,
354 update_time_utc = EXCLUDED.update_time_utc,
355 bid = EXCLUDED.bid,
356 offer = EXCLUDED.offer,
357 updated_at = EXCLUDED.updated_at
358 "#,
359 )
360 .bind(&instrument.epic)
361 .bind(&instrument.instrument_name)
362 .bind(&instrument.instrument_type)
363 .bind(&instrument.node_id)
364 .bind(&instrument.exchange)
365 .bind(&instrument.expiry)
366 .bind(instrument.high_limit_price)
367 .bind(instrument.low_limit_price)
368 .bind(&instrument.market_status)
369 .bind(instrument.net_change)
370 .bind(instrument.percentage_change)
371 .bind(&instrument.update_time)
372 .bind(instrument.update_time_utc)
373 .bind(instrument.bid)
374 .bind(instrument.offer)
375 .bind(instrument.created_at)
376 .bind(instrument.updated_at),
377 )
378 .await?;
379
380 Ok(())
381 }
382
383 pub async fn get_market_hierarchy(&self) -> Result<Vec<MarketHierarchyNode>, sqlx::Error> {
385 let nodes = sqlx::query_as::<_, MarketHierarchyNode>(
386 "SELECT * FROM market_hierarchy_nodes WHERE exchange = $1 ORDER BY level, name",
387 )
388 .bind(&self.exchange_name)
389 .fetch_all(&self.pool)
390 .await?;
391
392 Ok(nodes)
393 }
394
395 pub async fn get_instruments_by_node(
397 &self,
398 node_id: &str,
399 ) -> Result<Vec<MarketInstrument>, sqlx::Error> {
400 let instruments = sqlx::query_as::<_, MarketInstrument>(
401 "SELECT * FROM market_instruments WHERE node_id = $1 AND exchange = $2 ORDER BY instrument_name",
402 )
403 .bind(node_id)
404 .bind(&self.exchange_name)
405 .fetch_all(&self.pool)
406 .await?;
407
408 Ok(instruments)
409 }
410
411 pub async fn search_instruments(
413 &self,
414 search_term: &str,
415 ) -> Result<Vec<MarketInstrument>, sqlx::Error> {
416 let instruments = sqlx::query_as::<_, MarketInstrument>(
417 r#"
418 SELECT * FROM market_instruments
419 WHERE exchange = $1
420 AND (
421 instrument_name ILIKE $2
422 OR epic ILIKE $2
423 OR to_tsvector('english', instrument_name) @@ plainto_tsquery('english', $3)
424 )
425 ORDER BY instrument_name
426 LIMIT 100
427 "#,
428 )
429 .bind(&self.exchange_name)
430 .bind(format!("%{search_term}%"))
431 .bind(search_term)
432 .fetch_all(&self.pool)
433 .await?;
434
435 Ok(instruments)
436 }
437
438 pub async fn get_statistics(&self) -> Result<DatabaseStatistics, sqlx::Error> {
440 let node_count: i64 =
441 sqlx::query_scalar("SELECT COUNT(*) FROM market_hierarchy_nodes WHERE exchange = $1")
442 .bind(&self.exchange_name)
443 .fetch_one(&self.pool)
444 .await?;
445
446 let instrument_count: i64 =
447 sqlx::query_scalar("SELECT COUNT(*) FROM market_instruments WHERE exchange = $1")
448 .bind(&self.exchange_name)
449 .fetch_one(&self.pool)
450 .await?;
451
452 let instrument_types: Vec<(String, i64)> = sqlx::query(
453 "SELECT instrument_type, COUNT(*) as count FROM market_instruments WHERE exchange = $1 GROUP BY instrument_type ORDER BY count DESC",
454 )
455 .bind(&self.exchange_name)
456 .fetch_all(&self.pool)
457 .await?
458 .into_iter()
459 .map(|row| (row.get::<String, _>("instrument_type"), row.get::<i64, _>("count")))
460 .collect();
461
462 let max_depth: i32 = sqlx::query_scalar(
463 "SELECT COALESCE(MAX(level), 0) FROM market_hierarchy_nodes WHERE exchange = $1",
464 )
465 .bind(&self.exchange_name)
466 .fetch_one(&self.pool)
467 .await?;
468
469 Ok(DatabaseStatistics {
470 exchange: self.exchange_name.clone(),
471 node_count,
472 instrument_count,
473 instrument_types,
474 max_hierarchy_depth: max_depth,
475 })
476 }
477}
478
479#[derive(Debug, Clone)]
481pub struct DatabaseStatistics {
482 pub exchange: String,
484 pub node_count: i64,
486 pub instrument_count: i64,
488 pub instrument_types: Vec<(String, i64)>,
490 pub max_hierarchy_depth: i32,
492}
493
494impl DatabaseStatistics {
495 pub fn print_summary(&self) {
497 info!("=== Market Database Statistics for {} ===", self.exchange);
498 info!("Hierarchy nodes: {}", self.node_count);
499 info!("Market instruments: {}", self.instrument_count);
500 info!("Maximum hierarchy depth: {}", self.max_hierarchy_depth);
501 info!("Instrument types:");
502 for (instrument_type, count) in &self.instrument_types {
503 info!(" {}: {}", instrument_type, count);
504 }
505 }
506}
507
508#[cfg(test)]
509mod tests {
510 use super::*;
511 use crate::application::models::market::InstrumentType;
512
513 #[tokio::test]
514 #[ignore]
515 async fn test_convert_market_data_to_instrument() {
516 let service = MarketDatabaseService::new(
517 PgPool::connect("postgresql://test")
519 .await
520 .unwrap_or_else(|_| panic!("Test requires a PostgreSQL connection")),
521 "IG".to_string(),
522 );
523
524 let market_data = MarketData {
525 epic: "IX.D.DAX.DAILY.IP".to_string(),
526 instrument_name: "Germany 40".to_string(),
527 instrument_type: InstrumentType::Indices,
528 expiry: "DFB".to_string(),
529 high_limit_price: Some(20000.0),
530 low_limit_price: Some(5000.0),
531 market_status: "TRADEABLE".to_string(),
532 net_change: Some(100.5),
533 percentage_change: Some(0.65),
534 update_time: Some("2023-12-01T10:30:00".to_string()),
535 update_time_utc: Some("2023-12-01T10:30:00Z".to_string()),
536 bid: Some(15450.2),
537 offer: Some(15451.8),
538 };
539
540 let instrument = service.convert_market_data_to_instrument(&market_data, "node_123");
541
542 assert_eq!(instrument.epic, "IX.D.DAX.DAILY.IP");
543 assert_eq!(instrument.instrument_name, "Germany 40");
544 assert_eq!(instrument.instrument_type, "INDICES");
545 assert_eq!(instrument.node_id, "node_123");
546 assert_eq!(instrument.exchange, "IG");
547 assert_eq!(instrument.expiry, "DFB");
548 assert_eq!(instrument.high_limit_price, Some(20000.0));
549 assert_eq!(instrument.bid, Some(15450.2));
550 assert_eq!(instrument.offer, Some(15451.8));
551 }
552}