ig_client/storage/
market_database.rs

1use crate::application::models::market::{MarketData, MarketNode};
2use crate::storage::market_persistence::{MarketHierarchyNode, MarketInstrument};
3use sqlx::{Executor, PgPool, Row};
4use tracing::info;
5
6/// Service for managing market data persistence in PostgreSQL
7pub struct MarketDatabaseService {
8    pool: PgPool,
9    exchange_name: String,
10}
11
12impl MarketDatabaseService {
13    /// Creates a new MarketDatabaseService
14    pub fn new(pool: PgPool, exchange_name: String) -> Self {
15        Self {
16            pool,
17            exchange_name,
18        }
19    }
20
21    /// Initializes the database tables and triggers
22    pub async fn initialize_database(&self) -> Result<(), sqlx::Error> {
23        info!("Initializing market database tables...");
24
25        // Create market_hierarchy_nodes table
26        sqlx::query(
27            r#"
28            CREATE TABLE IF NOT EXISTS market_hierarchy_nodes (
29                id VARCHAR(255) PRIMARY KEY,
30                name VARCHAR(500) NOT NULL,
31                parent_id VARCHAR(255) REFERENCES market_hierarchy_nodes(id),
32                exchange VARCHAR(50) NOT NULL,
33                level INTEGER NOT NULL DEFAULT 0,
34                path TEXT NOT NULL,
35                created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
36                updated_at TIMESTAMPTZ NOT NULL DEFAULT NOW()
37            )
38            "#,
39        )
40        .execute(&self.pool)
41        .await?;
42
43        // Create market_instruments table
44        sqlx::query(
45            r#"
46            CREATE TABLE IF NOT EXISTS market_instruments (
47                epic VARCHAR(255) PRIMARY KEY,
48                instrument_name VARCHAR(500) NOT NULL,
49                instrument_type VARCHAR(100) NOT NULL,
50                node_id VARCHAR(255) NOT NULL REFERENCES market_hierarchy_nodes(id),
51                exchange VARCHAR(50) NOT NULL,
52                expiry VARCHAR(50) NOT NULL DEFAULT '',
53                high_limit_price DOUBLE PRECISION,
54                low_limit_price DOUBLE PRECISION,
55                market_status VARCHAR(50) NOT NULL,
56                net_change DOUBLE PRECISION,
57                percentage_change DOUBLE PRECISION,
58                update_time VARCHAR(50),
59                update_time_utc TIMESTAMPTZ,
60                bid DOUBLE PRECISION,
61                offer DOUBLE PRECISION,
62                created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
63                updated_at TIMESTAMPTZ NOT NULL DEFAULT NOW()
64            )
65            "#,
66        )
67        .execute(&self.pool)
68        .await?;
69
70        // Create indexes for market_hierarchy_nodes
71        let hierarchy_indexes = [
72            "CREATE INDEX IF NOT EXISTS idx_market_hierarchy_parent_id ON market_hierarchy_nodes(parent_id)",
73            "CREATE INDEX IF NOT EXISTS idx_market_hierarchy_exchange ON market_hierarchy_nodes(exchange)",
74            "CREATE INDEX IF NOT EXISTS idx_market_hierarchy_level ON market_hierarchy_nodes(level)",
75            "CREATE INDEX IF NOT EXISTS idx_market_hierarchy_path ON market_hierarchy_nodes USING gin(to_tsvector('english', path))",
76            "CREATE INDEX IF NOT EXISTS idx_market_hierarchy_name ON market_hierarchy_nodes USING gin(to_tsvector('english', name))",
77        ];
78
79        for index_sql in hierarchy_indexes {
80            sqlx::query(index_sql).execute(&self.pool).await?;
81        }
82
83        // Create indexes for market_instruments
84        let instrument_indexes = [
85            "CREATE INDEX IF NOT EXISTS idx_market_instruments_node_id ON market_instruments(node_id)",
86            "CREATE INDEX IF NOT EXISTS idx_market_instruments_exchange ON market_instruments(exchange)",
87            "CREATE INDEX IF NOT EXISTS idx_market_instruments_type ON market_instruments(instrument_type)",
88            "CREATE INDEX IF NOT EXISTS idx_market_instruments_status ON market_instruments(market_status)",
89            "CREATE INDEX IF NOT EXISTS idx_market_instruments_name ON market_instruments USING gin(to_tsvector('english', instrument_name))",
90            "CREATE INDEX IF NOT EXISTS idx_market_instruments_epic ON market_instruments(epic)",
91            "CREATE INDEX IF NOT EXISTS idx_market_instruments_expiry ON market_instruments(expiry)",
92        ];
93
94        for index_sql in instrument_indexes {
95            sqlx::query(index_sql).execute(&self.pool).await?;
96        }
97
98        // Create update timestamp function
99        sqlx::query(
100            r#"
101            CREATE OR REPLACE FUNCTION update_updated_at_column()
102            RETURNS TRIGGER AS $$
103            BEGIN
104                NEW.updated_at = NOW();
105                RETURN NEW;
106            END;
107            $$ language 'plpgsql'
108            "#,
109        )
110        .execute(&self.pool)
111        .await?;
112
113        // Create triggers
114        sqlx::query("DROP TRIGGER IF EXISTS update_market_hierarchy_nodes_updated_at ON market_hierarchy_nodes")
115            .execute(&self.pool)
116            .await?;
117
118        sqlx::query(
119            r#"
120            CREATE TRIGGER update_market_hierarchy_nodes_updated_at
121                BEFORE UPDATE ON market_hierarchy_nodes
122                FOR EACH ROW
123                EXECUTE FUNCTION update_updated_at_column()
124            "#,
125        )
126        .execute(&self.pool)
127        .await?;
128
129        sqlx::query(
130            "DROP TRIGGER IF EXISTS update_market_instruments_updated_at ON market_instruments",
131        )
132        .execute(&self.pool)
133        .await?;
134
135        sqlx::query(
136            r#"
137            CREATE TRIGGER update_market_instruments_updated_at
138                BEFORE UPDATE ON market_instruments
139                FOR EACH ROW
140                EXECUTE FUNCTION update_updated_at_column()
141            "#,
142        )
143        .execute(&self.pool)
144        .await?;
145
146        info!("Market database tables initialized successfully");
147        Ok(())
148    }
149
150    /// Stores the complete market hierarchy in the database
151    pub async fn store_market_hierarchy(
152        &self,
153        hierarchy: &[MarketNode],
154    ) -> Result<(), sqlx::Error> {
155        info!(
156            "Storing market hierarchy with {} top-level nodes",
157            hierarchy.len()
158        );
159
160        // Start a transaction
161        let mut tx = self.pool.begin().await?;
162
163        // Clear existing data for this exchange
164        sqlx::query("DELETE FROM market_instruments WHERE exchange = $1")
165            .bind(&self.exchange_name)
166            .execute(&mut *tx)
167            .await?;
168
169        sqlx::query("DELETE FROM market_hierarchy_nodes WHERE exchange = $1")
170            .bind(&self.exchange_name)
171            .execute(&mut *tx)
172            .await?;
173
174        // Store hierarchy nodes and instruments
175        let mut node_count = 0;
176        let mut instrument_count = 0;
177
178        for node in hierarchy {
179            let (nodes, instruments) = self.process_node_recursive(node, None, 0, "").await?;
180            node_count += nodes.len();
181            instrument_count += instruments.len();
182
183            // Insert nodes
184            for node in nodes {
185                self.insert_hierarchy_node(&mut tx, &node).await?;
186            }
187
188            // Insert instruments
189            for instrument in instruments {
190                self.insert_market_instrument(&mut tx, &instrument).await?;
191            }
192        }
193
194        // Commit transaction
195        tx.commit().await?;
196
197        info!(
198            "Successfully stored {} hierarchy nodes and {} instruments",
199            node_count, instrument_count
200        );
201        Ok(())
202    }
203
204    /// Processes a node recursively to extract all nodes and instruments
205    #[allow(clippy::type_complexity)]
206    fn process_node_recursive<'a>(
207        &'a self,
208        node: &'a MarketNode,
209        parent_id: Option<&'a str>,
210        level: i32,
211        parent_path: &'a str,
212    ) -> std::pin::Pin<
213        Box<
214            dyn Future<
215                    Output = Result<(Vec<MarketHierarchyNode>, Vec<MarketInstrument>), sqlx::Error>,
216                > + 'a,
217        >,
218    > {
219        Box::pin(async move {
220            let mut all_nodes = Vec::new();
221            let mut all_instruments = Vec::new();
222
223            // Build path for the current node
224            let current_path = MarketHierarchyNode::build_path(
225                if parent_path.is_empty() {
226                    None
227                } else {
228                    Some(parent_path)
229                },
230                &node.name,
231            );
232
233            // Create current node
234            let current_node = MarketHierarchyNode::new(
235                node.id.clone(),
236                node.name.clone(),
237                parent_id.map(|s| s.to_string()),
238                self.exchange_name.clone(),
239                level,
240                current_path.clone(),
241            );
242
243            all_nodes.push(current_node);
244
245            // Process markets in this node
246            for market in &node.markets {
247                let mut instrument = self.convert_market_data_to_instrument(market, &node.id);
248                instrument.parse_update_time_utc();
249                all_instruments.push(instrument);
250            }
251
252            // Process child nodes recursively
253            for child in &node.children {
254                let (child_nodes, child_instruments) = self
255                    .process_node_recursive(child, Some(&node.id), level + 1, &current_path)
256                    .await?;
257                all_nodes.extend(child_nodes);
258                all_instruments.extend(child_instruments);
259            }
260
261            Ok((all_nodes, all_instruments))
262        })
263    }
264
265    /// Converts MarketData to MarketInstrument
266    fn convert_market_data_to_instrument(
267        &self,
268        market: &MarketData,
269        node_id: &str,
270    ) -> MarketInstrument {
271        let mut instrument = MarketInstrument::new(
272            market.epic.clone(),
273            market.instrument_name.clone(),
274            format!("{:?}", market.instrument_type),
275            node_id.to_string(),
276            self.exchange_name.clone(),
277        );
278
279        instrument.expiry = market.expiry.clone();
280        instrument.high_limit_price = market.high_limit_price;
281        instrument.low_limit_price = market.low_limit_price;
282        instrument.market_status = market.market_status.clone();
283        instrument.net_change = market.net_change;
284        instrument.percentage_change = market.percentage_change;
285        instrument.update_time = market.update_time.clone();
286        instrument.bid = market.bid;
287        instrument.offer = market.offer;
288
289        instrument
290    }
291
292    /// Inserts a hierarchy node into the database
293    async fn insert_hierarchy_node(
294        &self,
295        tx: &mut sqlx::Transaction<'_, sqlx::Postgres>,
296        node: &MarketHierarchyNode,
297    ) -> Result<(), sqlx::Error> {
298        tx.execute(
299            sqlx::query(
300                r#"
301                INSERT INTO market_hierarchy_nodes 
302                (id, name, parent_id, exchange, level, path, created_at, updated_at)
303                VALUES ($1, $2, $3, $4, $5, $6, $7, $8)
304                ON CONFLICT (id) DO UPDATE SET
305                    name = EXCLUDED.name,
306                    parent_id = EXCLUDED.parent_id,
307                    exchange = EXCLUDED.exchange,
308                    level = EXCLUDED.level,
309                    path = EXCLUDED.path,
310                    updated_at = EXCLUDED.updated_at
311                "#,
312            )
313            .bind(&node.id)
314            .bind(&node.name)
315            .bind(&node.parent_id)
316            .bind(&node.exchange)
317            .bind(node.level)
318            .bind(&node.path)
319            .bind(node.created_at)
320            .bind(node.updated_at),
321        )
322        .await?;
323
324        Ok(())
325    }
326
327    /// Inserts a market instrument into the database
328    async fn insert_market_instrument(
329        &self,
330        tx: &mut sqlx::Transaction<'_, sqlx::Postgres>,
331        instrument: &MarketInstrument,
332    ) -> Result<(), sqlx::Error> {
333        tx.execute(
334            sqlx::query(
335                r#"
336                INSERT INTO market_instruments 
337                (epic, instrument_name, instrument_type, node_id, exchange, expiry,
338                 high_limit_price, low_limit_price, market_status, net_change, 
339                 percentage_change, update_time, update_time_utc, bid, offer, 
340                 created_at, updated_at)
341                VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14, $15, $16, $17)
342                ON CONFLICT (epic) DO UPDATE SET
343                    instrument_name = EXCLUDED.instrument_name,
344                    instrument_type = EXCLUDED.instrument_type,
345                    node_id = EXCLUDED.node_id,
346                    exchange = EXCLUDED.exchange,
347                    expiry = EXCLUDED.expiry,
348                    high_limit_price = EXCLUDED.high_limit_price,
349                    low_limit_price = EXCLUDED.low_limit_price,
350                    market_status = EXCLUDED.market_status,
351                    net_change = EXCLUDED.net_change,
352                    percentage_change = EXCLUDED.percentage_change,
353                    update_time = EXCLUDED.update_time,
354                    update_time_utc = EXCLUDED.update_time_utc,
355                    bid = EXCLUDED.bid,
356                    offer = EXCLUDED.offer,
357                    updated_at = EXCLUDED.updated_at
358                "#,
359            )
360            .bind(&instrument.epic)
361            .bind(&instrument.instrument_name)
362            .bind(&instrument.instrument_type)
363            .bind(&instrument.node_id)
364            .bind(&instrument.exchange)
365            .bind(&instrument.expiry)
366            .bind(instrument.high_limit_price)
367            .bind(instrument.low_limit_price)
368            .bind(&instrument.market_status)
369            .bind(instrument.net_change)
370            .bind(instrument.percentage_change)
371            .bind(&instrument.update_time)
372            .bind(instrument.update_time_utc)
373            .bind(instrument.bid)
374            .bind(instrument.offer)
375            .bind(instrument.created_at)
376            .bind(instrument.updated_at),
377        )
378        .await?;
379
380        Ok(())
381    }
382
383    /// Retrieves market hierarchy from the database
384    pub async fn get_market_hierarchy(&self) -> Result<Vec<MarketHierarchyNode>, sqlx::Error> {
385        let nodes = sqlx::query_as::<_, MarketHierarchyNode>(
386            "SELECT * FROM market_hierarchy_nodes WHERE exchange = $1 ORDER BY level, name",
387        )
388        .bind(&self.exchange_name)
389        .fetch_all(&self.pool)
390        .await?;
391
392        Ok(nodes)
393    }
394
395    /// Retrieves market instruments for a specific node
396    pub async fn get_instruments_by_node(
397        &self,
398        node_id: &str,
399    ) -> Result<Vec<MarketInstrument>, sqlx::Error> {
400        let instruments = sqlx::query_as::<_, MarketInstrument>(
401            "SELECT * FROM market_instruments WHERE node_id = $1 AND exchange = $2 ORDER BY instrument_name",
402        )
403        .bind(node_id)
404        .bind(&self.exchange_name)
405        .fetch_all(&self.pool)
406        .await?;
407
408        Ok(instruments)
409    }
410
411    /// Searches for instruments by name or epic
412    pub async fn search_instruments(
413        &self,
414        search_term: &str,
415    ) -> Result<Vec<MarketInstrument>, sqlx::Error> {
416        let instruments = sqlx::query_as::<_, MarketInstrument>(
417            r#"
418            SELECT * FROM market_instruments 
419            WHERE exchange = $1 
420            AND (
421                instrument_name ILIKE $2 
422                OR epic ILIKE $2
423                OR to_tsvector('english', instrument_name) @@ plainto_tsquery('english', $3)
424            )
425            ORDER BY instrument_name
426            LIMIT 100
427            "#,
428        )
429        .bind(&self.exchange_name)
430        .bind(format!("%{search_term}%"))
431        .bind(search_term)
432        .fetch_all(&self.pool)
433        .await?;
434
435        Ok(instruments)
436    }
437
438    /// Gets statistics about the stored data
439    pub async fn get_statistics(&self) -> Result<DatabaseStatistics, sqlx::Error> {
440        let node_count: i64 =
441            sqlx::query_scalar("SELECT COUNT(*) FROM market_hierarchy_nodes WHERE exchange = $1")
442                .bind(&self.exchange_name)
443                .fetch_one(&self.pool)
444                .await?;
445
446        let instrument_count: i64 =
447            sqlx::query_scalar("SELECT COUNT(*) FROM market_instruments WHERE exchange = $1")
448                .bind(&self.exchange_name)
449                .fetch_one(&self.pool)
450                .await?;
451
452        let instrument_types: Vec<(String, i64)> = sqlx::query(
453            "SELECT instrument_type, COUNT(*) as count FROM market_instruments WHERE exchange = $1 GROUP BY instrument_type ORDER BY count DESC",
454        )
455        .bind(&self.exchange_name)
456        .fetch_all(&self.pool)
457        .await?
458        .into_iter()
459        .map(|row| (row.get::<String, _>("instrument_type"), row.get::<i64, _>("count")))
460        .collect();
461
462        let max_depth: i32 = sqlx::query_scalar(
463            "SELECT COALESCE(MAX(level), 0) FROM market_hierarchy_nodes WHERE exchange = $1",
464        )
465        .bind(&self.exchange_name)
466        .fetch_one(&self.pool)
467        .await?;
468
469        Ok(DatabaseStatistics {
470            exchange: self.exchange_name.clone(),
471            node_count,
472            instrument_count,
473            instrument_types,
474            max_hierarchy_depth: max_depth,
475        })
476    }
477}
478
479/// Statistics about the stored market data
480#[derive(Debug, Clone)]
481pub struct DatabaseStatistics {
482    /// Name of the exchange for which statistics are collected
483    pub exchange: String,
484    /// Total number of hierarchy nodes in the database
485    pub node_count: i64,
486    /// Total number of market instruments stored
487    pub instrument_count: i64,
488    /// List of instrument types with their respective counts (type_name, count)
489    pub instrument_types: Vec<(String, i64)>,
490    /// Maximum depth level found in the market hierarchy tree
491    pub max_hierarchy_depth: i32,
492}
493
494impl DatabaseStatistics {
495    /// Prints a formatted summary of the statistics
496    pub fn print_summary(&self) {
497        info!("=== Market Database Statistics for {} ===", self.exchange);
498        info!("Hierarchy nodes: {}", self.node_count);
499        info!("Market instruments: {}", self.instrument_count);
500        info!("Maximum hierarchy depth: {}", self.max_hierarchy_depth);
501        info!("Instrument types:");
502        for (instrument_type, count) in &self.instrument_types {
503            info!("  {}: {}", instrument_type, count);
504        }
505    }
506}
507
508#[cfg(test)]
509mod tests {
510    use super::*;
511    use crate::application::models::market::InstrumentType;
512
513    #[tokio::test]
514    #[ignore]
515    async fn test_convert_market_data_to_instrument() {
516        let service = MarketDatabaseService::new(
517            // This would be a real pool in actual tests
518            PgPool::connect("postgresql://test")
519                .await
520                .unwrap_or_else(|_| panic!("Test requires a PostgreSQL connection")),
521            "IG".to_string(),
522        );
523
524        let market_data = MarketData {
525            epic: "IX.D.DAX.DAILY.IP".to_string(),
526            instrument_name: "Germany 40".to_string(),
527            instrument_type: InstrumentType::Indices,
528            expiry: "DFB".to_string(),
529            high_limit_price: Some(20000.0),
530            low_limit_price: Some(5000.0),
531            market_status: "TRADEABLE".to_string(),
532            net_change: Some(100.5),
533            percentage_change: Some(0.65),
534            update_time: Some("2023-12-01T10:30:00".to_string()),
535            update_time_utc: Some("2023-12-01T10:30:00Z".to_string()),
536            bid: Some(15450.2),
537            offer: Some(15451.8),
538        };
539
540        let instrument = service.convert_market_data_to_instrument(&market_data, "node_123");
541
542        assert_eq!(instrument.epic, "IX.D.DAX.DAILY.IP");
543        assert_eq!(instrument.instrument_name, "Germany 40");
544        assert_eq!(instrument.instrument_type, "INDICES");
545        assert_eq!(instrument.node_id, "node_123");
546        assert_eq!(instrument.exchange, "IG");
547        assert_eq!(instrument.expiry, "DFB");
548        assert_eq!(instrument.high_limit_price, Some(20000.0));
549        assert_eq!(instrument.bid, Some(15450.2));
550        assert_eq!(instrument.offer, Some(15451.8));
551    }
552}