ig_client/storage/
market_database.rs

1use crate::prelude::{MarketData, MarketNode};
2use crate::storage::market_persistence::{MarketHierarchyNode, MarketInstrument};
3use chrono::{DateTime, Utc};
4use sqlx::{Executor, PgPool, Row};
5use std::collections::HashMap;
6use std::future::Future;
7use tracing::info;
8
9/// Service for managing market data persistence in PostgreSQL
10pub struct MarketDatabaseService {
11    pool: PgPool,
12    exchange_name: String,
13}
14
15impl MarketDatabaseService {
16    /// Creates a new MarketDatabaseService
17    pub fn new(pool: PgPool, exchange_name: String) -> Self {
18        Self {
19            pool,
20            exchange_name,
21        }
22    }
23
24    /// Initializes the database tables and triggers
25    pub async fn initialize_database(&self) -> Result<(), sqlx::Error> {
26        info!("Initializing market database tables...");
27
28        // Create market_hierarchy_nodes table
29        sqlx::query(
30            r#"
31            CREATE TABLE IF NOT EXISTS market_hierarchy_nodes (
32                id VARCHAR(255) PRIMARY KEY,
33                name VARCHAR(500) NOT NULL,
34                parent_id VARCHAR(255) REFERENCES market_hierarchy_nodes(id),
35                exchange VARCHAR(50) NOT NULL,
36                level INTEGER NOT NULL DEFAULT 0,
37                path TEXT NOT NULL,
38                created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
39                updated_at TIMESTAMPTZ NOT NULL DEFAULT NOW()
40            )
41            "#,
42        )
43        .execute(&self.pool)
44        .await?;
45
46        // Create market_instruments table
47        sqlx::query(
48            r#"
49            CREATE TABLE IF NOT EXISTS market_instruments (
50                epic VARCHAR(255) PRIMARY KEY,
51                instrument_name VARCHAR(500) NOT NULL,
52                instrument_type VARCHAR(100) NOT NULL,
53                node_id VARCHAR(255) NOT NULL REFERENCES market_hierarchy_nodes(id),
54                exchange VARCHAR(50) NOT NULL,
55                expiry VARCHAR(50) NOT NULL DEFAULT '',
56                high_limit_price DOUBLE PRECISION,
57                low_limit_price DOUBLE PRECISION,
58                market_status VARCHAR(50) NOT NULL,
59                net_change DOUBLE PRECISION,
60                percentage_change DOUBLE PRECISION,
61                update_time VARCHAR(50),
62                update_time_utc TIMESTAMPTZ,
63                bid DOUBLE PRECISION,
64                offer DOUBLE PRECISION,
65                created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
66                updated_at TIMESTAMPTZ NOT NULL DEFAULT NOW()
67            )
68            "#,
69        )
70        .execute(&self.pool)
71        .await?;
72
73        // Create indexes for market_hierarchy_nodes
74        let hierarchy_indexes = [
75            "CREATE INDEX IF NOT EXISTS idx_market_hierarchy_parent_id ON market_hierarchy_nodes(parent_id)",
76            "CREATE INDEX IF NOT EXISTS idx_market_hierarchy_exchange ON market_hierarchy_nodes(exchange)",
77            "CREATE INDEX IF NOT EXISTS idx_market_hierarchy_level ON market_hierarchy_nodes(level)",
78            "CREATE INDEX IF NOT EXISTS idx_market_hierarchy_path ON market_hierarchy_nodes USING gin(to_tsvector('english', path))",
79            "CREATE INDEX IF NOT EXISTS idx_market_hierarchy_name ON market_hierarchy_nodes USING gin(to_tsvector('english', name))",
80        ];
81
82        for index_sql in hierarchy_indexes {
83            sqlx::query(index_sql).execute(&self.pool).await?;
84        }
85
86        // Create indexes for market_instruments
87        let instrument_indexes = [
88            "CREATE INDEX IF NOT EXISTS idx_market_instruments_node_id ON market_instruments(node_id)",
89            "CREATE INDEX IF NOT EXISTS idx_market_instruments_exchange ON market_instruments(exchange)",
90            "CREATE INDEX IF NOT EXISTS idx_market_instruments_type ON market_instruments(instrument_type)",
91            "CREATE INDEX IF NOT EXISTS idx_market_instruments_status ON market_instruments(market_status)",
92            "CREATE INDEX IF NOT EXISTS idx_market_instruments_name ON market_instruments USING gin(to_tsvector('english', instrument_name))",
93            "CREATE INDEX IF NOT EXISTS idx_market_instruments_epic ON market_instruments(epic)",
94            "CREATE INDEX IF NOT EXISTS idx_market_instruments_expiry ON market_instruments(expiry)",
95        ];
96
97        for index_sql in instrument_indexes {
98            sqlx::query(index_sql).execute(&self.pool).await?;
99        }
100
101        // Create update timestamp function
102        sqlx::query(
103            r#"
104            CREATE OR REPLACE FUNCTION update_updated_at_column()
105            RETURNS TRIGGER AS $$
106            BEGIN
107                NEW.updated_at = NOW();
108                RETURN NEW;
109            END;
110            $$ language 'plpgsql'
111            "#,
112        )
113        .execute(&self.pool)
114        .await?;
115
116        // Create triggers
117        sqlx::query("DROP TRIGGER IF EXISTS update_market_hierarchy_nodes_updated_at ON market_hierarchy_nodes")
118            .execute(&self.pool)
119            .await?;
120
121        sqlx::query(
122            r#"
123            CREATE TRIGGER update_market_hierarchy_nodes_updated_at
124                BEFORE UPDATE ON market_hierarchy_nodes
125                FOR EACH ROW
126                EXECUTE FUNCTION update_updated_at_column()
127            "#,
128        )
129        .execute(&self.pool)
130        .await?;
131
132        sqlx::query(
133            "DROP TRIGGER IF EXISTS update_market_instruments_updated_at ON market_instruments",
134        )
135        .execute(&self.pool)
136        .await?;
137
138        sqlx::query(
139            r#"
140            CREATE TRIGGER update_market_instruments_updated_at
141                BEFORE UPDATE ON market_instruments
142                FOR EACH ROW
143                EXECUTE FUNCTION update_updated_at_column()
144            "#,
145        )
146        .execute(&self.pool)
147        .await?;
148
149        info!("Market database tables initialized successfully");
150        Ok(())
151    }
152
153    /// Stores the complete market hierarchy in the database
154    pub async fn store_market_hierarchy(
155        &self,
156        hierarchy: &[MarketNode],
157    ) -> Result<(), sqlx::Error> {
158        info!(
159            "Storing market hierarchy with {} top-level nodes",
160            hierarchy.len()
161        );
162
163        // Start a transaction
164        let mut tx = self.pool.begin().await?;
165
166        // Clear existing data for this exchange
167        sqlx::query("DELETE FROM market_instruments WHERE exchange = $1")
168            .bind(&self.exchange_name)
169            .execute(&mut *tx)
170            .await?;
171
172        sqlx::query("DELETE FROM market_hierarchy_nodes WHERE exchange = $1")
173            .bind(&self.exchange_name)
174            .execute(&mut *tx)
175            .await?;
176
177        // Store hierarchy nodes and instruments
178        let mut node_count = 0;
179        let mut instrument_count = 0;
180
181        for node in hierarchy {
182            let (nodes, instruments) = self.process_node_recursive(node, None, 0, "").await?;
183            node_count += nodes.len();
184            instrument_count += instruments.len();
185
186            // Insert nodes
187            for node in nodes {
188                self.insert_hierarchy_node(&mut tx, &node).await?;
189            }
190
191            // Insert instruments
192            for instrument in instruments {
193                self.insert_market_instrument(&mut tx, &instrument).await?;
194            }
195        }
196
197        // Commit transaction
198        tx.commit().await?;
199
200        info!(
201            "Successfully stored {} hierarchy nodes and {} instruments",
202            node_count, instrument_count
203        );
204        Ok(())
205    }
206
207    /// Stores filtered market nodes with specific epic format in a custom table
208    /// Only processes MarketNode.children where epic has format "XX.X.XXXXXXX.XX.XX" (4 dots)
209    /// Adds a symbol field based on the provided HashMap mapping
210    pub async fn store_filtered_market_nodes(
211        &self,
212        hierarchy: &[MarketNode],
213        symbol_map: &HashMap<&str, &str>,
214        table_name: &str,
215    ) -> Result<(), sqlx::Error> {
216        info!(
217            "Storing filtered market nodes to table '{}' with {} top-level nodes",
218            table_name,
219            hierarchy.len()
220        );
221
222        // Start a transaction
223        let mut tx = self.pool.begin().await?;
224
225        // Create table if it doesn't exist
226        let create_table_sql = format!(
227            r#"
228            CREATE TABLE IF NOT EXISTS {} (
229                epic VARCHAR(255) PRIMARY KEY,
230                instrumentName TEXT NOT NULL,
231                instrumentType VARCHAR(50) NOT NULL,
232                expiry VARCHAR(50),
233                lastUpdateUTC TIMESTAMP,
234                symbol VARCHAR(50)
235            )
236            "#,
237            table_name
238        );
239
240        tx.execute(sqlx::query(&create_table_sql)).await?;
241
242        // Note: No DELETE operation - using UPSERT to update existing records
243
244        let mut inserted_count = 0;
245
246        // Process all nodes recursively to find filtered markets
247        for node in hierarchy {
248            inserted_count += self
249                .process_filtered_node_recursive(node, symbol_map, table_name, &mut tx)
250                .await?;
251        }
252
253        // Commit transaction
254        tx.commit().await?;
255
256        info!(
257            "Successfully stored {} filtered instruments in table '{}'",
258            inserted_count, table_name
259        );
260        Ok(())
261    }
262
263    /// Recursively processes nodes to find and insert filtered markets
264    fn process_filtered_node_recursive<'a>(
265        &'a self,
266        node: &'a MarketNode,
267        symbol_map: &'a HashMap<&str, &str>,
268        table_name: &'a str,
269        tx: &'a mut sqlx::Transaction<'_, sqlx::Postgres>,
270    ) -> std::pin::Pin<Box<dyn std::future::Future<Output = Result<i32, sqlx::Error>> + 'a>> {
271        Box::pin(async move {
272            let mut count = 0;
273
274            // Process markets in current node
275            for market in &node.markets {
276                if self.is_valid_epic_format(&market.epic) {
277                    let symbol = self.find_symbol_for_market(&market.instrument_name, symbol_map);
278                    self.insert_filtered_market(market, &symbol, table_name, tx)
279                        .await?;
280                    count += 1;
281                }
282            }
283
284            // Process children recursively
285            for child in &node.children {
286                count += self
287                    .process_filtered_node_recursive(child, symbol_map, table_name, tx)
288                    .await?;
289            }
290
291            Ok(count)
292        })
293    }
294
295    /// Checks if epic has the required format: "XX.X.XXXXXXX.XX.XX" (exactly 4 dots)
296    pub fn is_valid_epic_format(&self, epic: &str) -> bool {
297        epic.matches('.').count() == 4
298    }
299
300    /// Finds the appropriate symbol for a market based on its name
301    pub fn find_symbol_for_market(
302        &self,
303        instrument_name: &str,
304        symbol_map: &HashMap<&str, &str>,
305    ) -> String {
306        let name_lower = instrument_name.to_lowercase();
307
308        for (key, value) in symbol_map {
309            if name_lower.contains(&key.to_lowercase()) {
310                return value.to_string();
311            }
312        }
313
314        // Default symbol if no match found
315        "UNKNOWN".to_string()
316    }
317
318    /// Converts updateTime from milliseconds to formatted timestamp
319    pub fn convert_update_time(&self, update_time: &Option<String>) -> Option<DateTime<Utc>> {
320        if let Some(time_str) = update_time
321            && let Ok(timestamp_ms) = time_str.parse::<i64>()
322        {
323            let timestamp_secs = timestamp_ms / 1000;
324            let nanosecs = ((timestamp_ms % 1000) * 1_000_000) as u32;
325
326            return DateTime::from_timestamp(timestamp_secs, nanosecs);
327        }
328        None
329    }
330
331    /// Inserts a filtered market into the custom table
332    async fn insert_filtered_market(
333        &self,
334        market: &MarketData,
335        symbol: &str,
336        table_name: &str,
337        tx: &mut sqlx::Transaction<'_, sqlx::Postgres>,
338    ) -> Result<(), sqlx::Error> {
339        let last_update_utc = self.convert_update_time(&market.update_time);
340
341        let insert_sql = format!(
342            r#"
343            INSERT INTO {} (epic, instrumentName, instrumentType, expiry, lastUpdateUTC, symbol)
344            VALUES ($1, $2, $3, $4, $5, $6)
345            ON CONFLICT (epic) DO UPDATE SET
346                instrumentName = EXCLUDED.instrumentName,
347                instrumentType = EXCLUDED.instrumentType,
348                expiry = EXCLUDED.expiry,
349                lastUpdateUTC = EXCLUDED.lastUpdateUTC,
350                symbol = EXCLUDED.symbol
351            "#,
352            table_name
353        );
354
355        tx.execute(
356            sqlx::query(&insert_sql)
357                .bind(&market.epic)
358                .bind(&market.instrument_name)
359                .bind(format!("{:?}", market.instrument_type))
360                .bind(&market.expiry)
361                .bind(last_update_utc)
362                .bind(symbol),
363        )
364        .await?;
365
366        Ok(())
367    }
368
369    /// Processes a node recursively to extract all nodes and instruments
370    #[allow(clippy::type_complexity)]
371    fn process_node_recursive<'a>(
372        &'a self,
373        node: &'a MarketNode,
374        parent_id: Option<&'a str>,
375        level: i32,
376        parent_path: &'a str,
377    ) -> std::pin::Pin<
378        Box<
379            dyn Future<
380                    Output = Result<(Vec<MarketHierarchyNode>, Vec<MarketInstrument>), sqlx::Error>,
381                > + 'a,
382        >,
383    > {
384        Box::pin(async move {
385            let mut all_nodes = Vec::new();
386            let mut all_instruments = Vec::new();
387
388            // Build path for the current node
389            let current_path = MarketHierarchyNode::build_path(
390                if parent_path.is_empty() {
391                    None
392                } else {
393                    Some(parent_path)
394                },
395                &node.name,
396            );
397
398            // Create current node
399            let current_node = MarketHierarchyNode::new(
400                node.id.clone(),
401                node.name.clone(),
402                parent_id.map(|s| s.to_string()),
403                self.exchange_name.clone(),
404                level,
405                current_path.clone(),
406            );
407
408            all_nodes.push(current_node);
409
410            // Process markets in this node
411            for market in &node.markets {
412                let mut instrument = self.convert_market_data_to_instrument(market, &node.id);
413                instrument.parse_update_time_utc();
414                all_instruments.push(instrument);
415            }
416
417            // Process child nodes recursively
418            for child in &node.children {
419                let (child_nodes, child_instruments) = self
420                    .process_node_recursive(child, Some(&node.id), level + 1, &current_path)
421                    .await?;
422                all_nodes.extend(child_nodes);
423                all_instruments.extend(child_instruments);
424            }
425
426            Ok((all_nodes, all_instruments))
427        })
428    }
429
430    /// Converts MarketData to MarketInstrument
431    pub fn convert_market_data_to_instrument(
432        &self,
433        market: &MarketData,
434        node_id: &str,
435    ) -> MarketInstrument {
436        let mut instrument = MarketInstrument::new(
437            market.epic.clone(),
438            market.instrument_name.clone(),
439            format!("{:?}", market.instrument_type).to_uppercase(),
440            node_id.to_string(),
441            self.exchange_name.clone(),
442        );
443
444        instrument.expiry = market.expiry.clone();
445        instrument.high_limit_price = market.high_limit_price;
446        instrument.low_limit_price = market.low_limit_price;
447        instrument.market_status = market.market_status.clone();
448        instrument.net_change = market.net_change;
449        instrument.percentage_change = market.percentage_change;
450        instrument.update_time = market.update_time.clone();
451        instrument.bid = market.bid;
452        instrument.offer = market.offer;
453
454        instrument
455    }
456
457    /// Inserts a hierarchy node into the database
458    async fn insert_hierarchy_node(
459        &self,
460        tx: &mut sqlx::Transaction<'_, sqlx::Postgres>,
461        node: &MarketHierarchyNode,
462    ) -> Result<(), sqlx::Error> {
463        tx.execute(
464            sqlx::query(
465                r#"
466                INSERT INTO market_hierarchy_nodes 
467                (id, name, parent_id, exchange, level, path, created_at, updated_at)
468                VALUES ($1, $2, $3, $4, $5, $6, $7, $8)
469                ON CONFLICT (id) DO UPDATE SET
470                    name = EXCLUDED.name,
471                    parent_id = EXCLUDED.parent_id,
472                    exchange = EXCLUDED.exchange,
473                    level = EXCLUDED.level,
474                    path = EXCLUDED.path,
475                    updated_at = EXCLUDED.updated_at
476                "#,
477            )
478            .bind(&node.id)
479            .bind(&node.name)
480            .bind(&node.parent_id)
481            .bind(&node.exchange)
482            .bind(node.level)
483            .bind(&node.path)
484            .bind(node.created_at)
485            .bind(node.updated_at),
486        )
487        .await?;
488
489        Ok(())
490    }
491
492    /// Inserts a market instrument into the database
493    async fn insert_market_instrument(
494        &self,
495        tx: &mut sqlx::Transaction<'_, sqlx::Postgres>,
496        instrument: &MarketInstrument,
497    ) -> Result<(), sqlx::Error> {
498        tx.execute(
499            sqlx::query(
500                r#"
501                INSERT INTO market_instruments 
502                (epic, instrument_name, instrument_type, node_id, exchange, expiry,
503                 high_limit_price, low_limit_price, market_status, net_change, 
504                 percentage_change, update_time, update_time_utc, bid, offer, 
505                 created_at, updated_at)
506                VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14, $15, $16, $17)
507                ON CONFLICT (epic) DO UPDATE SET
508                    instrument_name = EXCLUDED.instrument_name,
509                    instrument_type = EXCLUDED.instrument_type,
510                    node_id = EXCLUDED.node_id,
511                    exchange = EXCLUDED.exchange,
512                    expiry = EXCLUDED.expiry,
513                    high_limit_price = EXCLUDED.high_limit_price,
514                    low_limit_price = EXCLUDED.low_limit_price,
515                    market_status = EXCLUDED.market_status,
516                    net_change = EXCLUDED.net_change,
517                    percentage_change = EXCLUDED.percentage_change,
518                    update_time = EXCLUDED.update_time,
519                    update_time_utc = EXCLUDED.update_time_utc,
520                    bid = EXCLUDED.bid,
521                    offer = EXCLUDED.offer,
522                    updated_at = EXCLUDED.updated_at
523                "#,
524            )
525            .bind(&instrument.epic)
526            .bind(&instrument.instrument_name)
527            .bind(&instrument.instrument_type)
528            .bind(&instrument.node_id)
529            .bind(&instrument.exchange)
530            .bind(&instrument.expiry)
531            .bind(instrument.high_limit_price)
532            .bind(instrument.low_limit_price)
533            .bind(&instrument.market_status)
534            .bind(instrument.net_change)
535            .bind(instrument.percentage_change)
536            .bind(&instrument.update_time)
537            .bind(instrument.update_time_utc)
538            .bind(instrument.bid)
539            .bind(instrument.offer)
540            .bind(instrument.created_at)
541            .bind(instrument.updated_at),
542        )
543        .await?;
544
545        Ok(())
546    }
547
548    /// Retrieves market hierarchy from the database
549    pub async fn get_market_hierarchy(&self) -> Result<Vec<MarketHierarchyNode>, sqlx::Error> {
550        let nodes = sqlx::query_as::<_, MarketHierarchyNode>(
551            "SELECT * FROM market_hierarchy_nodes WHERE exchange = $1 ORDER BY level, name",
552        )
553        .bind(&self.exchange_name)
554        .fetch_all(&self.pool)
555        .await?;
556
557        Ok(nodes)
558    }
559
560    /// Retrieves market instruments for a specific node
561    pub async fn get_instruments_by_node(
562        &self,
563        node_id: &str,
564    ) -> Result<Vec<MarketInstrument>, sqlx::Error> {
565        let instruments = sqlx::query_as::<_, MarketInstrument>(
566            "SELECT * FROM market_instruments WHERE node_id = $1 AND exchange = $2 ORDER BY instrument_name",
567        )
568        .bind(node_id)
569        .bind(&self.exchange_name)
570        .fetch_all(&self.pool)
571        .await?;
572
573        Ok(instruments)
574    }
575
576    /// Searches for instruments by name or epic
577    pub async fn search_instruments(
578        &self,
579        search_term: &str,
580    ) -> Result<Vec<MarketInstrument>, sqlx::Error> {
581        let instruments = sqlx::query_as::<_, MarketInstrument>(
582            r#"
583            SELECT * FROM market_instruments 
584            WHERE exchange = $1 
585            AND (
586                instrument_name ILIKE $2 
587                OR epic ILIKE $2
588                OR to_tsvector('english', instrument_name) @@ plainto_tsquery('english', $3)
589            )
590            ORDER BY instrument_name
591            LIMIT 100
592            "#,
593        )
594        .bind(&self.exchange_name)
595        .bind(format!("%{search_term}%"))
596        .bind(search_term)
597        .fetch_all(&self.pool)
598        .await?;
599
600        Ok(instruments)
601    }
602
603    /// Gets statistics about the stored data
604    pub async fn get_statistics(&self) -> Result<DatabaseStatistics, sqlx::Error> {
605        let node_count: i64 =
606            sqlx::query_scalar("SELECT COUNT(*) FROM market_hierarchy_nodes WHERE exchange = $1")
607                .bind(&self.exchange_name)
608                .fetch_one(&self.pool)
609                .await?;
610
611        let instrument_count: i64 =
612            sqlx::query_scalar("SELECT COUNT(*) FROM market_instruments WHERE exchange = $1")
613                .bind(&self.exchange_name)
614                .fetch_one(&self.pool)
615                .await?;
616
617        let instrument_types: Vec<(String, i64)> = sqlx::query(
618            "SELECT instrument_type, COUNT(*) as count FROM market_instruments WHERE exchange = $1 GROUP BY instrument_type ORDER BY count DESC",
619        )
620        .bind(&self.exchange_name)
621        .fetch_all(&self.pool)
622        .await?
623        .into_iter()
624        .map(|row| (row.get::<String, _>("instrument_type"), row.get::<i64, _>("count")))
625        .collect();
626
627        let max_depth: i32 = sqlx::query_scalar(
628            "SELECT COALESCE(MAX(level), 0) FROM market_hierarchy_nodes WHERE exchange = $1",
629        )
630        .bind(&self.exchange_name)
631        .fetch_one(&self.pool)
632        .await?;
633
634        Ok(DatabaseStatistics {
635            exchange: self.exchange_name.clone(),
636            node_count,
637            instrument_count,
638            instrument_types,
639            max_hierarchy_depth: max_depth,
640        })
641    }
642}
643
/// Statistics about the stored market data of one exchange.
///
/// Derives `PartialEq`/`Eq` in addition to `Debug`/`Clone` so that two
/// snapshots can be compared directly, e.g. in tests or change detection.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct DatabaseStatistics {
    /// Name of the exchange for which statistics are collected
    pub exchange: String,
    /// Total number of hierarchy nodes in the database
    pub node_count: i64,
    /// Total number of market instruments stored
    pub instrument_count: i64,
    /// List of instrument types with their respective counts (type_name, count)
    pub instrument_types: Vec<(String, i64)>,
    /// Maximum depth level found in the market hierarchy tree
    pub max_hierarchy_depth: i32,
}
658
659impl DatabaseStatistics {
660    /// Prints a formatted summary of the statistics
661    pub fn print_summary(&self) {
662        info!("=== Market Database Statistics for {} ===", self.exchange);
663        info!("Hierarchy nodes: {}", self.node_count);
664        info!("Market instruments: {}", self.instrument_count);
665        info!("Maximum hierarchy depth: {}", self.max_hierarchy_depth);
666        info!("Instrument types:");
667        for (instrument_type, count) in &self.instrument_types {
668            info!("  {}: {}", instrument_type, count);
669        }
670    }
671}
672
#[cfg(test)]
mod tests {
    use super::*;
    use crate::presentation::instrument::InstrumentType;

    /// Checks that `convert_market_data_to_instrument` copies the market
    /// fields and upper-cases the instrument type. Ignored by default because
    /// building the service needs a reachable PostgreSQL instance.
    #[tokio::test]
    #[ignore]
    async fn test_convert_market_data_to_instrument() {
        // This would be a real pool in actual tests
        let Ok(pool) = PgPool::connect("postgresql://test").await else {
            panic!("Test requires a PostgreSQL connection");
        };
        let service = MarketDatabaseService::new(pool, "IG".to_string());

        let sample_market = MarketData {
            epic: "IX.D.DAX.DAILY.IP".to_string(),
            instrument_name: "Germany 40".to_string(),
            instrument_type: InstrumentType::Indices,
            expiry: "DFB".to_string(),
            high_limit_price: Some(20000.0),
            low_limit_price: Some(5000.0),
            market_status: "TRADEABLE".to_string(),
            net_change: Some(100.5),
            percentage_change: Some(0.65),
            update_time: Some("2023-12-01T10:30:00".to_string()),
            update_time_utc: Some("2023-12-01T10:30:00Z".to_string()),
            bid: Some(15450.2),
            offer: Some(15451.8),
        };

        let converted = service.convert_market_data_to_instrument(&sample_market, "node_123");

        // Identity and placement
        assert_eq!(converted.epic, "IX.D.DAX.DAILY.IP");
        assert_eq!(converted.instrument_name, "Germany 40");
        assert_eq!(converted.instrument_type, "INDICES");
        assert_eq!(converted.node_id, "node_123");
        assert_eq!(converted.exchange, "IG");
        assert_eq!(converted.expiry, "DFB");

        // Prices
        assert_eq!(converted.high_limit_price, Some(20000.0));
        assert_eq!(converted.bid, Some(15450.2));
        assert_eq!(converted.offer, Some(15451.8));
    }
}
717}