tqsdk_rs/
datamanager.rs

1//! 数据管理器
2//!
3//! 实现 DIFF 协议的数据合并和管理,包括:
4//! - DIFF 数据递归合并
5//! - 路径访问和版本追踪
6//! - Watch/UnWatch 路径监听
7//! - 数据类型转换
8
9use crate::errors::{Result, TqError};
10use crate::types::*;
11use crate::utils::{nanos_to_datetime, value_to_i64};
12use chrono::Utc;
13use serde_json::Value;
14use std::collections::HashMap;
15use std::sync::atomic::{AtomicI64, Ordering};
16use std::sync::{Arc, RwLock};
17use async_channel::{Sender, Receiver, unbounded};
18use tracing::{debug, error};
19
/// Configuration for the data manager.
#[derive(Debug, Clone)]
pub struct DataManagerConfig {
    /// View width applied when a series query passes `0` as its width.
    pub default_view_width: usize,
    /// Flag for enabling automatic cleanup.
    pub enable_auto_cleanup: bool,
}

impl Default for DataManagerConfig {
    /// Returns the stock configuration: a view width of 10000 with
    /// automatic cleanup switched on.
    fn default() -> Self {
        Self {
            enable_auto_cleanup: true,
            default_view_width: 10_000,
        }
    }
}
37
/// A registered listener for one data path.
struct PathWatcher {
    // Path segments of the watched node, in root-to-leaf order.
    path: Vec<String>,
    // Sending half of the channel on which snapshots of the watched node are pushed.
    tx: Sender<Value>,
}
43
/// Data manager.
///
/// Holds all DIFF-protocol data and supports:
/// - recursive merging
/// - version (epoch) tracking
/// - path watching
pub struct DataManager {
    /// Data store: top-level keys mapped to JSON values.
    data: Arc<RwLock<HashMap<String, Value>>>,
    /// Version number (an atomic, so it can be read without taking the data lock).
    epoch: AtomicI64,
    /// Configuration.
    config: DataManagerConfig,
    /// Path watchers, keyed by the watched path joined with `.`.
    watchers: Arc<RwLock<HashMap<String, PathWatcher>>>,
    /// Data-update callbacks (held in `Arc` so each can be moved into a spawned task).
    on_data_callbacks: Arc<RwLock<Vec<Arc<dyn Fn() + Send + Sync>>>>,
}
62
63impl DataManager {
64    /// 创建新的数据管理器
65    pub fn new(initial_data: HashMap<String, Value>, config: DataManagerConfig) -> Self {
66        DataManager {
67            data: Arc::new(RwLock::new(initial_data)),
68            epoch: AtomicI64::new(0),
69            config,
70            watchers: Arc::new(RwLock::new(HashMap::new())),
71            on_data_callbacks: Arc::new(RwLock::new(Vec::new())),
72        }
73    }
74
75    /// 注册数据更新回调
76    pub fn on_data<F>(&self, callback: F)
77    where
78        F: Fn() + Send + Sync + 'static,
79    {
80        let mut callbacks = self.on_data_callbacks.write().unwrap();
81        callbacks.push(Arc::new(callback));
82    }
83
84    /// 合并数据(DIFF 协议核心)
85    ///
86    /// # 参数
87    ///
88    /// * `source` - 源数据(可以是单个对象或数组)
89    /// * `epoch_increase` - 是否增加版本号
90    /// * `delete_null` - 是否删除 null 对象
91    pub fn merge_data(&self, source: Value, epoch_increase: bool, delete_null: bool) {
92        let should_notify_watchers = if epoch_increase {
93            // 增加版本号(使用原子操作)
94            let current_epoch = self.epoch.fetch_add(1, Ordering::SeqCst) + 1;
95
96            // 转换为数组
97            let source_arr = match source {
98                Value::Array(arr) => arr,
99                Value::Object(_) => vec![source],
100                _ => {
101                    debug!("merge_data: 无效的源数据类型");
102                    return;
103                }
104            };
105
106            // 合并数据
107            let mut data = self.data.write().unwrap();
108            for item in source_arr.iter() {
109                if let Value::Object(obj) = item {
110                    if !obj.is_empty() {
111                        self.merge_object(&mut data, obj, current_epoch, delete_null);
112                    }
113                }
114            }
115            drop(data);
116
117            // 异步触发回调(不阻塞数据合并)
118            let callbacks = self.on_data_callbacks.read().unwrap();
119            for callback in callbacks.iter() {
120                let cb = Arc::clone(callback);
121                tokio::spawn(async move {
122                    cb();
123                });
124            }
125            drop(callbacks);
126
127            true
128        } else {
129            // 不增加版本号,只合并(使用原子操作读取)
130            let current_epoch = self.epoch.load(Ordering::SeqCst);
131
132            let source_arr = match source {
133                Value::Array(arr) => arr,
134                Value::Object(_) => vec![source],
135                _ => return,
136            };
137
138            let mut data = self.data.write().unwrap();
139            for item in source_arr.iter() {
140                if let Value::Object(obj) = item {
141                    if !obj.is_empty() {
142                        self.merge_object(&mut data, obj, current_epoch, delete_null);
143                    }
144                }
145            }
146
147            false
148        };
149
150        // 通知 watchers
151        if should_notify_watchers {
152            self.notify_watchers();
153        }
154    }
155
156    /// 递归合并对象
157    fn merge_object(
158        &self,
159        target: &mut HashMap<String, Value>,
160        source: &serde_json::Map<String, Value>,
161        epoch: i64,
162        delete_null: bool,
163    ) {
164        for (property, value) in source.iter() {
165            if value.is_null() {
166                if delete_null {
167                    target.remove(property);
168                }
169                continue;
170            }
171
172            match value {
173                Value::String(s) if s == "NaN" || s == "-" => {
174                    // NaN 字符串处理
175                    target.insert(property.clone(), Value::Null);
176                }
177                Value::Object(obj) => {
178                    // 递归合并对象
179                    if property == "quotes" {
180                        // quotes 特殊处理
181                        self.merge_quotes(target, obj, epoch, delete_null);
182                    } else {
183                        // 确保目标存在
184                        let target_obj = target
185                            .entry(property.clone())
186                            .or_insert_with(|| Value::Object(serde_json::Map::new()));
187
188                        if let Value::Object(target_map) = target_obj {
189                            let mut target_hashmap: HashMap<String, Value> = target_map
190                                .iter()
191                                .map(|(k, v)| (k.clone(), v.clone()))
192                                .collect();
193
194                            self.merge_object(&mut target_hashmap, obj, epoch, delete_null);
195
196                            *target_map = target_hashmap
197                                .into_iter()
198                                .collect::<serde_json::Map<String, Value>>();
199                        }
200                    }
201                }
202                _ => {
203                    // 基本类型和数组直接赋值
204                    target.insert(property.clone(), value.clone());
205                }
206            }
207        }
208
209        // 设置 epoch
210        target.insert("_epoch".to_string(), Value::Number(epoch.into()));
211    }
212
213    /// 特殊处理 quotes 对象
214    fn merge_quotes(
215        &self,
216        target: &mut HashMap<String, Value>,
217        quotes: &serde_json::Map<String, Value>,
218        epoch: i64,
219        delete_null: bool,
220    ) {
221        let quotes_obj = target
222            .entry("quotes".to_string())
223            .or_insert_with(|| Value::Object(serde_json::Map::new()));
224
225        if let Value::Object(quotes_map) = quotes_obj {
226            let mut quotes_hashmap: HashMap<String, Value> = quotes_map
227                .iter()
228                .map(|(k, v)| (k.clone(), v.clone()))
229                .collect();
230
231            for (symbol, quote_data) in quotes.iter() {
232                if quote_data.is_null() {
233                    if delete_null {
234                        quotes_hashmap.remove(symbol);
235                    }
236                    continue;
237                }
238
239                if let Value::Object(quote_obj) = quote_data {
240                    // 确保目标存在
241                    let target_quote = quotes_hashmap
242                        .entry(symbol.clone())
243                        .or_insert_with(|| Value::Object(serde_json::Map::new()));
244
245                    if let Value::Object(target_quote_map) = target_quote {
246                        let mut target_quote_hashmap: HashMap<String, Value> = target_quote_map
247                            .iter()
248                            .map(|(k, v)| (k.clone(), v.clone()))
249                            .collect();
250
251                        self.merge_object(&mut target_quote_hashmap, quote_obj, epoch, delete_null);
252
253                        *target_quote_map = target_quote_hashmap
254                            .into_iter()
255                            .collect::<serde_json::Map<String, Value>>();
256                    }
257                }
258            }
259
260            *quotes_map = quotes_hashmap
261                .into_iter()
262                .collect::<serde_json::Map<String, Value>>();
263        }
264    }
265
266    /// 根据路径获取数据
267    pub fn get_by_path(&self, path: &[&str]) -> Option<Value> {
268        let data = self.data.read().unwrap();
269        let mut current: &Value = &Value::Object(
270            data.iter()
271                .map(|(k, v)| (k.clone(), v.clone()))
272                .collect::<serde_json::Map<String, Value>>(),
273        );
274
275        for &key in path.iter() {
276            match current {
277                Value::Object(map) => {
278                    if let Some(val) = map.get(key) {
279                        current = val;
280                    } else {
281                        return None;
282                    }
283                }
284                _ => return None,
285            }
286        }
287        Some(current.clone())
288    }
289
290    /// 判断指定路径的数据是否在最近一次更新中发生了变化
291    pub fn is_changing(&self, path: &[&str]) -> bool {
292        let current_epoch = self.epoch.load(Ordering::SeqCst);
293        let data = self.data.read().unwrap();
294
295        let mut current_value = Some(&Value::Object(
296            data.iter()
297                .map(|(k, v)| (k.clone(), v.clone()))
298                .collect::<serde_json::Map<String, Value>>(),
299        ));
300
301        for (i, &key) in path.iter().enumerate() {
302            if let Some(Value::Object(map)) = current_value {
303                if let Some(val) = map.get(key) {
304                    // 只在最后一层检查 epoch(避免父节点更新导致误判)
305                    if i == path.len() - 1 {
306                        if let Some(Value::Object(obj)) = Some(val) {
307                            if let Some(Value::Number(e)) = obj.get("_epoch") {
308                                if let Some(e_val) = e.as_i64() {
309                                    if e_val == current_epoch {
310                                        return true;
311                                    }
312                                }
313                            }
314                        }
315                    }
316
317                    // 继续往下查找
318                    if i < path.len() - 1 {
319                        current_value = Some(val);
320                    }
321                } else {
322                    return false;
323                }
324            } else {
325                return false;
326            }
327        }
328
329        false
330    }
331
332    /// 获取当前版本号
333    pub fn get_epoch(&self) -> i64 {
334        self.epoch.load(Ordering::SeqCst)
335    }
336
337    /// 设置默认值
338    pub fn set_default(&self, path: &[&str], default_value: Value) -> Option<Value> {
339        let mut data = self.data.write().unwrap();
340        let current = &mut *data;
341
342        for (i, &key) in path.iter().enumerate() {
343            if i == path.len() - 1 {
344                // 最后一个 key
345                if !current.contains_key(key) {
346                    current.insert(key.to_string(), default_value.clone());
347                }
348                return current.get(key).cloned();
349            }
350
351            // 中间节点
352            let entry = current
353                .entry(key.to_string())
354                .or_insert_with(|| Value::Object(serde_json::Map::new()));
355
356            // 无法继续(类型不匹配)
357            if !entry.is_object() {
358                return None;
359            }
360        }
361
362        None
363    }
364
365    /// 监听指定路径的数据变化
366    ///
367    /// 返回一个 receiver,数据变化时会推送到这个 channel
368    pub fn watch(&self, path: Vec<String>) -> Receiver<Value> {
369        let path_key = path.join(".");
370        let (tx, rx) = unbounded();
371
372        let watcher = PathWatcher {
373            path: path.clone(),
374            tx,
375        };
376
377        let mut watchers = self.watchers.write().unwrap();
378        watchers.insert(path_key, watcher);
379
380        rx
381    }
382
383    /// 取消路径监听
384    pub fn unwatch(&self, path: &[String]) -> Result<()> {
385        let path_key = path.join(".");
386        let mut watchers = self.watchers.write().unwrap();
387
388        if watchers.remove(&path_key).is_none() {
389            return Err(TqError::DataNotFound(format!("路径未监听: {}", path_key)));
390        }
391
392        Ok(())
393    }
394
395    /// 通知所有 watchers
396    fn notify_watchers(&self) {
397        let watchers = self.watchers.read().unwrap();
398        for (_, watcher) in watchers.iter() {
399            let path_refs: Vec<&str> = watcher.path.iter().map(|s| s.as_str()).collect();
400            if self.is_changing(&path_refs) {
401                if let Some(data) = self.get_by_path(&path_refs) {
402                    let tx = watcher.tx.clone();
403                    tokio::spawn(async move {
404                        let _ = tx.send(data).await;
405                    });
406                }
407            }
408        }
409    }
410
411    /// 转换为结构体
412    pub fn convert_to_struct<T: serde::de::DeserializeOwned>(&self, data: &Value) -> Result<T> {
413        serde_json::from_value(data.clone())
414            .map_err(|e| TqError::ParseError(format!("转换失败: {}", e)))
415    }
416
417    /// 获取 Quote 数据
418    pub fn get_quote_data(&self, symbol: &str) -> Result<Quote> {
419        let data = self
420            .get_by_path(&["quotes", symbol])
421            .ok_or_else(|| TqError::DataNotFound(format!("Quote 未找到: {}", symbol)))?;
422
423        self.convert_to_struct(&data)
424    }
425
426    /// 获取 K线数据
427    pub fn get_klines_data(
428        &self,
429        symbol: &str,
430        duration: i64,
431        view_width: usize,
432        right_id: i64,
433    ) -> Result<KlineSeriesData> {
434        let duration_str = duration.to_string();
435        let data = self
436            .get_by_path(&["klines", symbol, &duration_str])
437            .ok_or_else(|| TqError::DataNotFound(format!("K线未找到: {}/{}", symbol, duration)))?;
438
439        if let Value::Object(data_map) = data {
440            let mut kline_series = KlineSeriesData {
441                symbol: symbol.to_string(),
442                duration,
443                chart_id: String::new(),
444                chart: None,
445                last_id: value_to_i64(data_map.get("last_id").unwrap_or(&Value::Null)),
446                trading_day_start_id: value_to_i64(
447                    data_map.get("trading_day_start_id").unwrap_or(&Value::Null),
448                ),
449                trading_day_end_id: value_to_i64(
450                    data_map.get("trading_day_end_id").unwrap_or(&Value::Null),
451                ),
452                data: Vec::new(),
453                has_new_bar: false,
454            };
455
456            // 转换 data map 为数组
457            if let Some(Value::Object(kline_map)) = data_map.get("data") {
458                let mut all_klines: Vec<(i64, Kline)> = Vec::new();
459                
460                for (id_str, kline_data) in kline_map.iter() {
461                    if let Ok(id) = id_str.parse::<i64>() {
462                        match self.convert_to_struct::<Kline>(kline_data) {
463                            Ok(mut kline) => {
464                                kline.id = id;
465                                all_klines.push((id, kline));
466                            }
467                            Err(e) => {
468                                error!("{}", TqError::ParseError(format!("K线数据格式错误: {}", e)));
469                            }
470                        }
471                    }
472                }
473                // 按 ID 排序
474                all_klines.sort_by_key(|(id, _)| *id);
475
476                // 过滤超出 right_id 的数据
477                if right_id > 0 {
478                    all_klines.retain(|(id, _)| *id <= right_id);
479                }
480
481                // 应用 ViewWidth 限制
482                let vw = if view_width > 0 {
483                    view_width
484                } else {
485                    self.config.default_view_width
486                };
487
488                let klines: Vec<Kline> = all_klines
489                    .into_iter()
490                    .map(|(_, k)| k)
491                    .rev()
492                    .take(vw)
493                    .collect::<Vec<_>>()
494                    .into_iter()
495                    .rev()
496                    .collect();
497                kline_series.data = klines;
498            }
499
500            Ok(kline_series)
501        } else {
502            Err(TqError::ParseError("K线数据格式错误".to_string()))
503        }
504    }
505
506    /// 获取多合约对齐的 K线数据
507    pub fn get_multi_klines_data(
508        &self,
509        symbols: &[String],
510        duration: i64,
511        chart_id: &str,
512        view_width: usize,
513    ) -> Result<MultiKlineSeriesData> {
514        if symbols.is_empty() {
515            return Err(TqError::InvalidParameter("symbols 为空".to_string()));
516        }
517
518        let main_symbol = &symbols[0];
519        let duration_str = duration.to_string();
520
521        // 获取 Chart 信息
522        let (left_id, right_id) = if let Some(chart_data) = self.get_by_path(&["charts", chart_id])
523        {
524            if let Value::Object(chart_map) = chart_data {
525                let left = value_to_i64(chart_map.get("left_id").unwrap_or(&Value::Null));
526                let right = value_to_i64(chart_map.get("right_id").unwrap_or(&Value::Null));
527                (left, right)
528            } else {
529                (-1, -1)
530            }
531        } else {
532            (-1, -1)
533        };
534
535        let mut result = MultiKlineSeriesData {
536            chart_id: chart_id.to_string(),
537            duration,
538            main_symbol: main_symbol.clone(),
539            symbols: symbols.to_vec(),
540            left_id,
541            right_id,
542            view_width,
543            data: Vec::new(),
544            has_new_bar: false,
545            metadata: HashMap::new(),
546        };
547
548        // 获取每个合约的元数据
549        for symbol in symbols.iter() {
550            if let Some(kline_data) = self.get_by_path(&["klines", symbol, &duration_str]) {
551                if let Value::Object(kline_map) = kline_data {
552                    let metadata = KlineMetadata {
553                        symbol: symbol.clone(),
554                        last_id: value_to_i64(kline_map.get("last_id").unwrap_or(&Value::Null)),
555                        trading_day_start_id: value_to_i64(
556                            kline_map
557                                .get("trading_day_start_id")
558                                .unwrap_or(&Value::Null),
559                        ),
560                        trading_day_end_id: value_to_i64(
561                            kline_map.get("trading_day_end_id").unwrap_or(&Value::Null),
562                        ),
563                    };
564                    result.metadata.insert(symbol.clone(), metadata);
565                }
566            }
567        }
568
569        // 获取主合约的 K线数据
570        if let Some(main_kline_data) = self.get_by_path(&["klines", main_symbol, &duration_str]) {
571            if let Value::Object(main_kline_map) = main_kline_data {
572                // 获取 binding 信息
573                let mut bindings: HashMap<String, HashMap<i64, i64>> = HashMap::new();
574                if let Some(Value::Object(binding_map)) = main_kline_map.get("binding") {
575                    for (symbol, binding_info) in binding_map.iter() {
576                        if let Value::Object(binding_id_map) = binding_info {
577                            let mut id_map: HashMap<i64, i64> = HashMap::new();
578                            for (main_id_str, other_id) in binding_id_map.iter() {
579                                if let Ok(main_id) = main_id_str.parse::<i64>() {
580                                    id_map.insert(main_id, value_to_i64(other_id));
581                                }
582                            }
583                            bindings.insert(symbol.clone(), id_map);
584                        }
585                    }
586                }
587
588                // 获取主合约的 K线 map
589                if let Some(Value::Object(main_data_map)) = main_kline_map.get("data") {
590                    let mut main_ids: Vec<i64> = main_data_map
591                        .keys()
592                        .filter_map(|k| k.parse::<i64>().ok())
593                        .collect();
594                    main_ids.sort();
595
596                    // 过滤超出 right_id 的数据
597                    if right_id > 0 {
598                        main_ids.retain(|&id| id <= right_id);
599                    }
600
601                    // 应用 ViewWidth 限制
602                    if view_width > 0 && main_ids.len() > view_width {
603                        main_ids = main_ids
604                            .into_iter()
605                            .rev()
606                            .take(view_width)
607                            .collect::<Vec<_>>()
608                            .into_iter()
609                            .rev()
610                            .collect();
611                        result.left_id = main_ids[0];
612                        result.right_id = main_ids[main_ids.len() - 1];
613                    }
614
615                    // 对齐所有合约的 K线
616                    for main_id in main_ids {
617                        let main_id_str = main_id.to_string();
618                        let mut set = AlignedKlineSet {
619                            main_id,
620                            timestamp: Utc::now(),
621                            klines: HashMap::new(),
622                        };
623
624                        // 添加主合约 K线
625                        if let Some(kline_data) = main_data_map.get(&main_id_str) {
626                            if let Ok(mut kline) = self.convert_to_struct::<Kline>(kline_data) {
627                                kline.id = main_id;
628                                set.timestamp = nanos_to_datetime(kline.datetime);
629                                set.klines.insert(main_symbol.clone(), kline);
630                            }
631                        }
632
633                        // 添加其他合约的对齐 K线
634                        for symbol in symbols.iter().skip(1) {
635                            if let Some(binding) = bindings.get(symbol) {
636                                if let Some(&mapped_id) = binding.get(&main_id) {
637                                    if let Some(other_kline_data) = self.get_by_path(&[
638                                        "klines",
639                                        symbol,
640                                        &duration_str,
641                                        "data",
642                                        &mapped_id.to_string(),
643                                    ]) {
644                                        if let Ok(mut kline) =
645                                            self.convert_to_struct::<Kline>(&other_kline_data)
646                                        {
647                                            kline.id = mapped_id;
648                                            set.klines.insert(symbol.clone(), kline);
649                                        }
650                                    }
651                                }
652                            }
653                        }
654
655                        result.data.push(set);
656                    }
657                }
658            }
659        }
660
661        Ok(result)
662    }
663
664    /// 获取 Tick 数据
665    pub fn get_ticks_data(
666        &self,
667        symbol: &str,
668        view_width: usize,
669        right_id: i64,
670    ) -> Result<TickSeriesData> {
671        let data = self
672            .get_by_path(&["ticks", symbol])
673            .ok_or_else(|| TqError::DataNotFound(format!("Tick 未找到: {}", symbol)))?;
674
675        if let Value::Object(data_map) = data {
676            let mut tick_series = TickSeriesData {
677                symbol: symbol.to_string(),
678                chart_id: String::new(),
679                chart: None,
680                last_id: value_to_i64(data_map.get("last_id").unwrap_or(&Value::Null)),
681                data: Vec::new(),
682                has_new_bar: false,
683            };
684
685            // 转换 data map 为数组
686            if let Some(Value::Object(tick_map)) = data_map.get("data") {
687                let mut all_ticks: Vec<(i64, Tick)> = Vec::new();
688
689                for (id_str, tick_data) in tick_map.iter() {
690                    if let Ok(id) = id_str.parse::<i64>() {
691                        if let Ok(mut tick) = self.convert_to_struct::<Tick>(tick_data) {
692                            tick.id = id;
693                            all_ticks.push((id, tick));
694                        }
695                    }
696                }
697
698                // 按 ID 排序
699                all_ticks.sort_by_key(|(id, _)| *id);
700
701                // 过滤超出 right_id 的数据
702                if right_id > 0 {
703                    all_ticks.retain(|(id, _)| *id <= right_id);
704                }
705
706                // 应用 ViewWidth 限制
707                let vw = if view_width > 0 {
708                    view_width
709                } else {
710                    self.config.default_view_width
711                };
712
713                let ticks: Vec<Tick> = all_ticks
714                    .into_iter()
715                    .map(|(_, t)| t)
716                    .rev()
717                    .take(vw)
718                    .collect::<Vec<_>>()
719                    .into_iter()
720                    .rev()
721                    .collect();
722
723                tick_series.data = ticks;
724            }
725
726            Ok(tick_series)
727        } else {
728            Err(TqError::ParseError("Tick数据格式错误".to_string()))
729        }
730    }
731
732    /// 获取账户数据
733    pub fn get_account_data(&self, user_id: &str, currency: &str) -> Result<Account> {
734        let data = self
735            .get_by_path(&["trade", user_id, "accounts", currency])
736            .ok_or_else(|| {
737                TqError::DataNotFound(format!("账户未找到: {}/{}", user_id, currency))
738            })?;
739
740        self.convert_to_struct(&data)
741    }
742
743    /// 获取持仓数据
744    pub fn get_position_data(&self, user_id: &str, symbol: &str) -> Result<Position> {
745        let data = self
746            .get_by_path(&["trade", user_id, "positions", symbol])
747            .ok_or_else(|| TqError::DataNotFound(format!("持仓未找到: {}/{}", user_id, symbol)))?;
748
749        self.convert_to_struct(&data)
750    }
751
752    /// 获取委托单数据
753    pub fn get_order_data(&self, user_id: &str, order_id: &str) -> Result<Order> {
754        let data = self
755            .get_by_path(&["trade", user_id, "orders", order_id])
756            .ok_or_else(|| {
757                TqError::DataNotFound(format!("委托单未找到: {}/{}", user_id, order_id))
758            })?;
759
760        self.convert_to_struct(&data)
761    }
762
763    /// 获取成交数据
764    pub fn get_trade_data(&self, user_id: &str, trade_id: &str) -> Result<Trade> {
765        let data = self
766            .get_by_path(&["trade", user_id, "trades", trade_id])
767            .ok_or_else(|| {
768                TqError::DataNotFound(format!("成交未找到: {}/{}", user_id, trade_id))
769            })?;
770
771        self.convert_to_struct(&data)
772    }
773}
774
#[cfg(test)]
mod tests {
    use super::*;
    use serde_json::json;

    /// Builds an empty manager with the default configuration.
    fn empty_manager() -> DataManager {
        DataManager::new(HashMap::new(), DataManagerConfig::default())
    }

    /// A merged quote can be read back with its fields intact.
    #[test]
    fn test_merge_data() {
        let dm = empty_manager();

        dm.merge_data(
            json!({
                "quotes": {
                    "SHFE.au2602": {
                        "last_price": 500.0,
                        "volume": 1000
                    }
                }
            }),
            true,
            false,
        );

        let quote = dm.get_by_path(&["quotes", "SHFE.au2602"]);
        assert!(quote.is_some());

        if let Some(Value::Object(fields)) = quote {
            assert_eq!(fields.get("last_price"), Some(&json!(500.0)));
            assert_eq!(fields.get("volume"), Some(&json!(1000)));
        }
    }

    /// Only the quote touched by the latest merge reports as changing.
    #[test]
    fn test_is_changing() {
        let dm = empty_manager();

        dm.merge_data(
            json!({
                "quotes": {
                    "SHFE.au2602": {
                        "last_price": 500.0
                    }
                }
            }),
            true,
            false,
        );

        let touched = dm.is_changing(&["quotes", "SHFE.au2602"]);
        let untouched = dm.is_changing(&["quotes", "SHFE.ag2512"]);
        assert!(touched);
        assert!(!untouched);
    }
}
825}