1use crate::prelude::{MarketData, MarketNode};
2use crate::storage::market_persistence::{MarketHierarchyNode, MarketInstrument};
3use chrono::{DateTime, Utc};
4use sqlx::{Executor, PgPool, Row};
5use std::collections::HashMap;
6use std::future::Future;
7use tracing::info;
8
9pub struct MarketDatabaseService {
11 pool: PgPool,
12 exchange_name: String,
13}
14
15impl MarketDatabaseService {
16 pub fn new(pool: PgPool, exchange_name: String) -> Self {
18 Self {
19 pool,
20 exchange_name,
21 }
22 }
23
24 pub async fn initialize_database(&self) -> Result<(), sqlx::Error> {
26 info!("Initializing market database tables...");
27
28 sqlx::query(
30 r#"
31 CREATE TABLE IF NOT EXISTS market_hierarchy_nodes (
32 id VARCHAR(255) PRIMARY KEY,
33 name VARCHAR(500) NOT NULL,
34 parent_id VARCHAR(255) REFERENCES market_hierarchy_nodes(id),
35 exchange VARCHAR(50) NOT NULL,
36 level INTEGER NOT NULL DEFAULT 0,
37 path TEXT NOT NULL,
38 created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
39 updated_at TIMESTAMPTZ NOT NULL DEFAULT NOW()
40 )
41 "#,
42 )
43 .execute(&self.pool)
44 .await?;
45
46 sqlx::query(
48 r#"
49 CREATE TABLE IF NOT EXISTS market_instruments (
50 epic VARCHAR(255) PRIMARY KEY,
51 instrument_name VARCHAR(500) NOT NULL,
52 instrument_type VARCHAR(100) NOT NULL,
53 node_id VARCHAR(255) NOT NULL REFERENCES market_hierarchy_nodes(id),
54 exchange VARCHAR(50) NOT NULL,
55 expiry VARCHAR(50) NOT NULL DEFAULT '',
56 high_limit_price DOUBLE PRECISION,
57 low_limit_price DOUBLE PRECISION,
58 market_status VARCHAR(50) NOT NULL,
59 net_change DOUBLE PRECISION,
60 percentage_change DOUBLE PRECISION,
61 update_time VARCHAR(50),
62 update_time_utc TIMESTAMPTZ,
63 bid DOUBLE PRECISION,
64 offer DOUBLE PRECISION,
65 created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
66 updated_at TIMESTAMPTZ NOT NULL DEFAULT NOW()
67 )
68 "#,
69 )
70 .execute(&self.pool)
71 .await?;
72
73 let hierarchy_indexes = [
75 "CREATE INDEX IF NOT EXISTS idx_market_hierarchy_parent_id ON market_hierarchy_nodes(parent_id)",
76 "CREATE INDEX IF NOT EXISTS idx_market_hierarchy_exchange ON market_hierarchy_nodes(exchange)",
77 "CREATE INDEX IF NOT EXISTS idx_market_hierarchy_level ON market_hierarchy_nodes(level)",
78 "CREATE INDEX IF NOT EXISTS idx_market_hierarchy_path ON market_hierarchy_nodes USING gin(to_tsvector('english', path))",
79 "CREATE INDEX IF NOT EXISTS idx_market_hierarchy_name ON market_hierarchy_nodes USING gin(to_tsvector('english', name))",
80 ];
81
82 for index_sql in hierarchy_indexes {
83 sqlx::query(index_sql).execute(&self.pool).await?;
84 }
85
86 let instrument_indexes = [
88 "CREATE INDEX IF NOT EXISTS idx_market_instruments_node_id ON market_instruments(node_id)",
89 "CREATE INDEX IF NOT EXISTS idx_market_instruments_exchange ON market_instruments(exchange)",
90 "CREATE INDEX IF NOT EXISTS idx_market_instruments_type ON market_instruments(instrument_type)",
91 "CREATE INDEX IF NOT EXISTS idx_market_instruments_status ON market_instruments(market_status)",
92 "CREATE INDEX IF NOT EXISTS idx_market_instruments_name ON market_instruments USING gin(to_tsvector('english', instrument_name))",
93 "CREATE INDEX IF NOT EXISTS idx_market_instruments_epic ON market_instruments(epic)",
94 "CREATE INDEX IF NOT EXISTS idx_market_instruments_expiry ON market_instruments(expiry)",
95 ];
96
97 for index_sql in instrument_indexes {
98 sqlx::query(index_sql).execute(&self.pool).await?;
99 }
100
101 sqlx::query(
103 r#"
104 CREATE OR REPLACE FUNCTION update_updated_at_column()
105 RETURNS TRIGGER AS $$
106 BEGIN
107 NEW.updated_at = NOW();
108 RETURN NEW;
109 END;
110 $$ language 'plpgsql'
111 "#,
112 )
113 .execute(&self.pool)
114 .await?;
115
116 sqlx::query("DROP TRIGGER IF EXISTS update_market_hierarchy_nodes_updated_at ON market_hierarchy_nodes")
118 .execute(&self.pool)
119 .await?;
120
121 sqlx::query(
122 r#"
123 CREATE TRIGGER update_market_hierarchy_nodes_updated_at
124 BEFORE UPDATE ON market_hierarchy_nodes
125 FOR EACH ROW
126 EXECUTE FUNCTION update_updated_at_column()
127 "#,
128 )
129 .execute(&self.pool)
130 .await?;
131
132 sqlx::query(
133 "DROP TRIGGER IF EXISTS update_market_instruments_updated_at ON market_instruments",
134 )
135 .execute(&self.pool)
136 .await?;
137
138 sqlx::query(
139 r#"
140 CREATE TRIGGER update_market_instruments_updated_at
141 BEFORE UPDATE ON market_instruments
142 FOR EACH ROW
143 EXECUTE FUNCTION update_updated_at_column()
144 "#,
145 )
146 .execute(&self.pool)
147 .await?;
148
149 info!("Market database tables initialized successfully");
150 Ok(())
151 }
152
153 pub async fn store_market_hierarchy(
155 &self,
156 hierarchy: &[MarketNode],
157 ) -> Result<(), sqlx::Error> {
158 info!(
159 "Storing market hierarchy with {} top-level nodes",
160 hierarchy.len()
161 );
162
163 let mut tx = self.pool.begin().await?;
165
166 sqlx::query("DELETE FROM market_instruments WHERE exchange = $1")
168 .bind(&self.exchange_name)
169 .execute(&mut *tx)
170 .await?;
171
172 sqlx::query("DELETE FROM market_hierarchy_nodes WHERE exchange = $1")
173 .bind(&self.exchange_name)
174 .execute(&mut *tx)
175 .await?;
176
177 let mut node_count = 0;
179 let mut instrument_count = 0;
180
181 for node in hierarchy {
182 let (nodes, instruments) = self.process_node_recursive(node, None, 0, "").await?;
183 node_count += nodes.len();
184 instrument_count += instruments.len();
185
186 for node in nodes {
188 self.insert_hierarchy_node(&mut tx, &node).await?;
189 }
190
191 for instrument in instruments {
193 self.insert_market_instrument(&mut tx, &instrument).await?;
194 }
195 }
196
197 tx.commit().await?;
199
200 info!(
201 "Successfully stored {} hierarchy nodes and {} instruments",
202 node_count, instrument_count
203 );
204 Ok(())
205 }
206
207 pub async fn store_filtered_market_nodes(
211 &self,
212 hierarchy: &[MarketNode],
213 symbol_map: &HashMap<&str, &str>,
214 table_name: &str,
215 ) -> Result<(), sqlx::Error> {
216 info!(
217 "Storing filtered market nodes to table '{}' with {} top-level nodes",
218 table_name,
219 hierarchy.len()
220 );
221
222 let mut tx = self.pool.begin().await?;
224
225 let create_table_sql = format!(
227 r#"
228 CREATE TABLE IF NOT EXISTS {} (
229 epic VARCHAR(255) PRIMARY KEY,
230 instrumentName TEXT NOT NULL,
231 instrumentType VARCHAR(50) NOT NULL,
232 expiry VARCHAR(50),
233 lastUpdateUTC TIMESTAMP,
234 symbol VARCHAR(50)
235 )
236 "#,
237 table_name
238 );
239
240 tx.execute(sqlx::query(&create_table_sql)).await?;
241
242 let mut inserted_count = 0;
245
246 for node in hierarchy {
248 inserted_count += self
249 .process_filtered_node_recursive(node, symbol_map, table_name, &mut tx)
250 .await?;
251 }
252
253 tx.commit().await?;
255
256 info!(
257 "Successfully stored {} filtered instruments in table '{}'",
258 inserted_count, table_name
259 );
260 Ok(())
261 }
262
263 fn process_filtered_node_recursive<'a>(
265 &'a self,
266 node: &'a MarketNode,
267 symbol_map: &'a HashMap<&str, &str>,
268 table_name: &'a str,
269 tx: &'a mut sqlx::Transaction<'_, sqlx::Postgres>,
270 ) -> std::pin::Pin<Box<dyn std::future::Future<Output = Result<i32, sqlx::Error>> + 'a>> {
271 Box::pin(async move {
272 let mut count = 0;
273
274 for market in &node.markets {
276 if self.is_valid_epic_format(&market.epic) {
277 let symbol = self.find_symbol_for_market(&market.instrument_name, symbol_map);
278 self.insert_filtered_market(market, &symbol, table_name, tx)
279 .await?;
280 count += 1;
281 }
282 }
283
284 for child in &node.children {
286 count += self
287 .process_filtered_node_recursive(child, symbol_map, table_name, tx)
288 .await?;
289 }
290
291 Ok(count)
292 })
293 }
294
295 pub fn is_valid_epic_format(&self, epic: &str) -> bool {
297 epic.matches('.').count() == 4
298 }
299
300 pub fn find_symbol_for_market(
302 &self,
303 instrument_name: &str,
304 symbol_map: &HashMap<&str, &str>,
305 ) -> String {
306 let name_lower = instrument_name.to_lowercase();
307
308 for (key, value) in symbol_map {
309 if name_lower.contains(&key.to_lowercase()) {
310 return value.to_string();
311 }
312 }
313
314 "UNKNOWN".to_string()
316 }
317
318 pub fn convert_update_time(&self, update_time: &Option<String>) -> Option<DateTime<Utc>> {
320 if let Some(time_str) = update_time
321 && let Ok(timestamp_ms) = time_str.parse::<i64>()
322 {
323 let timestamp_secs = timestamp_ms / 1000;
324 let nanosecs = ((timestamp_ms % 1000) * 1_000_000) as u32;
325
326 return DateTime::from_timestamp(timestamp_secs, nanosecs);
327 }
328 None
329 }
330
331 async fn insert_filtered_market(
333 &self,
334 market: &MarketData,
335 symbol: &str,
336 table_name: &str,
337 tx: &mut sqlx::Transaction<'_, sqlx::Postgres>,
338 ) -> Result<(), sqlx::Error> {
339 let last_update_utc = self.convert_update_time(&market.update_time);
340
341 let insert_sql = format!(
342 r#"
343 INSERT INTO {} (epic, instrumentName, instrumentType, expiry, lastUpdateUTC, symbol)
344 VALUES ($1, $2, $3, $4, $5, $6)
345 ON CONFLICT (epic) DO UPDATE SET
346 instrumentName = EXCLUDED.instrumentName,
347 instrumentType = EXCLUDED.instrumentType,
348 expiry = EXCLUDED.expiry,
349 lastUpdateUTC = EXCLUDED.lastUpdateUTC,
350 symbol = EXCLUDED.symbol
351 "#,
352 table_name
353 );
354
355 tx.execute(
356 sqlx::query(&insert_sql)
357 .bind(&market.epic)
358 .bind(&market.instrument_name)
359 .bind(format!("{:?}", market.instrument_type))
360 .bind(&market.expiry)
361 .bind(last_update_utc)
362 .bind(symbol),
363 )
364 .await?;
365
366 Ok(())
367 }
368
369 #[allow(clippy::type_complexity)]
371 fn process_node_recursive<'a>(
372 &'a self,
373 node: &'a MarketNode,
374 parent_id: Option<&'a str>,
375 level: i32,
376 parent_path: &'a str,
377 ) -> std::pin::Pin<
378 Box<
379 dyn Future<
380 Output = Result<(Vec<MarketHierarchyNode>, Vec<MarketInstrument>), sqlx::Error>,
381 > + 'a,
382 >,
383 > {
384 Box::pin(async move {
385 let mut all_nodes = Vec::new();
386 let mut all_instruments = Vec::new();
387
388 let current_path = MarketHierarchyNode::build_path(
390 if parent_path.is_empty() {
391 None
392 } else {
393 Some(parent_path)
394 },
395 &node.name,
396 );
397
398 let current_node = MarketHierarchyNode::new(
400 node.id.clone(),
401 node.name.clone(),
402 parent_id.map(|s| s.to_string()),
403 self.exchange_name.clone(),
404 level,
405 current_path.clone(),
406 );
407
408 all_nodes.push(current_node);
409
410 for market in &node.markets {
412 let mut instrument = self.convert_market_data_to_instrument(market, &node.id);
413 instrument.parse_update_time_utc();
414 all_instruments.push(instrument);
415 }
416
417 for child in &node.children {
419 let (child_nodes, child_instruments) = self
420 .process_node_recursive(child, Some(&node.id), level + 1, ¤t_path)
421 .await?;
422 all_nodes.extend(child_nodes);
423 all_instruments.extend(child_instruments);
424 }
425
426 Ok((all_nodes, all_instruments))
427 })
428 }
429
430 pub fn convert_market_data_to_instrument(
432 &self,
433 market: &MarketData,
434 node_id: &str,
435 ) -> MarketInstrument {
436 let mut instrument = MarketInstrument::new(
437 market.epic.clone(),
438 market.instrument_name.clone(),
439 format!("{:?}", market.instrument_type).to_uppercase(),
440 node_id.to_string(),
441 self.exchange_name.clone(),
442 );
443
444 instrument.expiry = market.expiry.clone();
445 instrument.high_limit_price = market.high_limit_price;
446 instrument.low_limit_price = market.low_limit_price;
447 instrument.market_status = market.market_status.clone();
448 instrument.net_change = market.net_change;
449 instrument.percentage_change = market.percentage_change;
450 instrument.update_time = market.update_time.clone();
451 instrument.bid = market.bid;
452 instrument.offer = market.offer;
453
454 instrument
455 }
456
457 async fn insert_hierarchy_node(
459 &self,
460 tx: &mut sqlx::Transaction<'_, sqlx::Postgres>,
461 node: &MarketHierarchyNode,
462 ) -> Result<(), sqlx::Error> {
463 tx.execute(
464 sqlx::query(
465 r#"
466 INSERT INTO market_hierarchy_nodes
467 (id, name, parent_id, exchange, level, path, created_at, updated_at)
468 VALUES ($1, $2, $3, $4, $5, $6, $7, $8)
469 ON CONFLICT (id) DO UPDATE SET
470 name = EXCLUDED.name,
471 parent_id = EXCLUDED.parent_id,
472 exchange = EXCLUDED.exchange,
473 level = EXCLUDED.level,
474 path = EXCLUDED.path,
475 updated_at = EXCLUDED.updated_at
476 "#,
477 )
478 .bind(&node.id)
479 .bind(&node.name)
480 .bind(&node.parent_id)
481 .bind(&node.exchange)
482 .bind(node.level)
483 .bind(&node.path)
484 .bind(node.created_at)
485 .bind(node.updated_at),
486 )
487 .await?;
488
489 Ok(())
490 }
491
492 async fn insert_market_instrument(
494 &self,
495 tx: &mut sqlx::Transaction<'_, sqlx::Postgres>,
496 instrument: &MarketInstrument,
497 ) -> Result<(), sqlx::Error> {
498 tx.execute(
499 sqlx::query(
500 r#"
501 INSERT INTO market_instruments
502 (epic, instrument_name, instrument_type, node_id, exchange, expiry,
503 high_limit_price, low_limit_price, market_status, net_change,
504 percentage_change, update_time, update_time_utc, bid, offer,
505 created_at, updated_at)
506 VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14, $15, $16, $17)
507 ON CONFLICT (epic) DO UPDATE SET
508 instrument_name = EXCLUDED.instrument_name,
509 instrument_type = EXCLUDED.instrument_type,
510 node_id = EXCLUDED.node_id,
511 exchange = EXCLUDED.exchange,
512 expiry = EXCLUDED.expiry,
513 high_limit_price = EXCLUDED.high_limit_price,
514 low_limit_price = EXCLUDED.low_limit_price,
515 market_status = EXCLUDED.market_status,
516 net_change = EXCLUDED.net_change,
517 percentage_change = EXCLUDED.percentage_change,
518 update_time = EXCLUDED.update_time,
519 update_time_utc = EXCLUDED.update_time_utc,
520 bid = EXCLUDED.bid,
521 offer = EXCLUDED.offer,
522 updated_at = EXCLUDED.updated_at
523 "#,
524 )
525 .bind(&instrument.epic)
526 .bind(&instrument.instrument_name)
527 .bind(&instrument.instrument_type)
528 .bind(&instrument.node_id)
529 .bind(&instrument.exchange)
530 .bind(&instrument.expiry)
531 .bind(instrument.high_limit_price)
532 .bind(instrument.low_limit_price)
533 .bind(&instrument.market_status)
534 .bind(instrument.net_change)
535 .bind(instrument.percentage_change)
536 .bind(&instrument.update_time)
537 .bind(instrument.update_time_utc)
538 .bind(instrument.bid)
539 .bind(instrument.offer)
540 .bind(instrument.created_at)
541 .bind(instrument.updated_at),
542 )
543 .await?;
544
545 Ok(())
546 }
547
548 pub async fn get_market_hierarchy(&self) -> Result<Vec<MarketHierarchyNode>, sqlx::Error> {
550 let nodes = sqlx::query_as::<_, MarketHierarchyNode>(
551 "SELECT * FROM market_hierarchy_nodes WHERE exchange = $1 ORDER BY level, name",
552 )
553 .bind(&self.exchange_name)
554 .fetch_all(&self.pool)
555 .await?;
556
557 Ok(nodes)
558 }
559
560 pub async fn get_instruments_by_node(
562 &self,
563 node_id: &str,
564 ) -> Result<Vec<MarketInstrument>, sqlx::Error> {
565 let instruments = sqlx::query_as::<_, MarketInstrument>(
566 "SELECT * FROM market_instruments WHERE node_id = $1 AND exchange = $2 ORDER BY instrument_name",
567 )
568 .bind(node_id)
569 .bind(&self.exchange_name)
570 .fetch_all(&self.pool)
571 .await?;
572
573 Ok(instruments)
574 }
575
576 pub async fn search_instruments(
578 &self,
579 search_term: &str,
580 ) -> Result<Vec<MarketInstrument>, sqlx::Error> {
581 let instruments = sqlx::query_as::<_, MarketInstrument>(
582 r#"
583 SELECT * FROM market_instruments
584 WHERE exchange = $1
585 AND (
586 instrument_name ILIKE $2
587 OR epic ILIKE $2
588 OR to_tsvector('english', instrument_name) @@ plainto_tsquery('english', $3)
589 )
590 ORDER BY instrument_name
591 LIMIT 100
592 "#,
593 )
594 .bind(&self.exchange_name)
595 .bind(format!("%{search_term}%"))
596 .bind(search_term)
597 .fetch_all(&self.pool)
598 .await?;
599
600 Ok(instruments)
601 }
602
603 pub async fn get_statistics(&self) -> Result<DatabaseStatistics, sqlx::Error> {
605 let node_count: i64 =
606 sqlx::query_scalar("SELECT COUNT(*) FROM market_hierarchy_nodes WHERE exchange = $1")
607 .bind(&self.exchange_name)
608 .fetch_one(&self.pool)
609 .await?;
610
611 let instrument_count: i64 =
612 sqlx::query_scalar("SELECT COUNT(*) FROM market_instruments WHERE exchange = $1")
613 .bind(&self.exchange_name)
614 .fetch_one(&self.pool)
615 .await?;
616
617 let instrument_types: Vec<(String, i64)> = sqlx::query(
618 "SELECT instrument_type, COUNT(*) as count FROM market_instruments WHERE exchange = $1 GROUP BY instrument_type ORDER BY count DESC",
619 )
620 .bind(&self.exchange_name)
621 .fetch_all(&self.pool)
622 .await?
623 .into_iter()
624 .map(|row| (row.get::<String, _>("instrument_type"), row.get::<i64, _>("count")))
625 .collect();
626
627 let max_depth: i32 = sqlx::query_scalar(
628 "SELECT COALESCE(MAX(level), 0) FROM market_hierarchy_nodes WHERE exchange = $1",
629 )
630 .bind(&self.exchange_name)
631 .fetch_one(&self.pool)
632 .await?;
633
634 Ok(DatabaseStatistics {
635 exchange: self.exchange_name.clone(),
636 node_count,
637 instrument_count,
638 instrument_types,
639 max_hierarchy_depth: max_depth,
640 })
641 }
642}
643
644#[derive(Debug, Clone)]
646pub struct DatabaseStatistics {
647 pub exchange: String,
649 pub node_count: i64,
651 pub instrument_count: i64,
653 pub instrument_types: Vec<(String, i64)>,
655 pub max_hierarchy_depth: i32,
657}
658
659impl DatabaseStatistics {
660 pub fn print_summary(&self) {
662 info!("=== Market Database Statistics for {} ===", self.exchange);
663 info!("Hierarchy nodes: {}", self.node_count);
664 info!("Market instruments: {}", self.instrument_count);
665 info!("Maximum hierarchy depth: {}", self.max_hierarchy_depth);
666 info!("Instrument types:");
667 for (instrument_type, count) in &self.instrument_types {
668 info!(" {}: {}", instrument_type, count);
669 }
670 }
671}
672
673#[cfg(test)]
674mod tests {
675 use super::*;
676 use crate::presentation::instrument::InstrumentType;
677
678 #[tokio::test]
679 #[ignore]
680 async fn test_convert_market_data_to_instrument() {
681 let service = MarketDatabaseService::new(
682 PgPool::connect("postgresql://test")
684 .await
685 .unwrap_or_else(|_| panic!("Test requires a PostgreSQL connection")),
686 "IG".to_string(),
687 );
688
689 let market_data = MarketData {
690 epic: "IX.D.DAX.DAILY.IP".to_string(),
691 instrument_name: "Germany 40".to_string(),
692 instrument_type: InstrumentType::Indices,
693 expiry: "DFB".to_string(),
694 high_limit_price: Some(20000.0),
695 low_limit_price: Some(5000.0),
696 market_status: "TRADEABLE".to_string(),
697 net_change: Some(100.5),
698 percentage_change: Some(0.65),
699 update_time: Some("2023-12-01T10:30:00".to_string()),
700 update_time_utc: Some("2023-12-01T10:30:00Z".to_string()),
701 bid: Some(15450.2),
702 offer: Some(15451.8),
703 };
704
705 let instrument = service.convert_market_data_to_instrument(&market_data, "node_123");
706
707 assert_eq!(instrument.epic, "IX.D.DAX.DAILY.IP");
708 assert_eq!(instrument.instrument_name, "Germany 40");
709 assert_eq!(instrument.instrument_type, "INDICES");
710 assert_eq!(instrument.node_id, "node_123");
711 assert_eq!(instrument.exchange, "IG");
712 assert_eq!(instrument.expiry, "DFB");
713 assert_eq!(instrument.high_limit_price, Some(20000.0));
714 assert_eq!(instrument.bid, Some(15450.2));
715 assert_eq!(instrument.offer, Some(15451.8));
716 }
717}