1use crate::errors::{Result, TqError};
10use crate::types::*;
11use crate::utils::{nanos_to_datetime, value_to_i64};
12use chrono::Utc;
13use serde_json::Value;
14use std::collections::HashMap;
15use std::sync::atomic::{AtomicI64, Ordering};
16use std::sync::{Arc, RwLock};
17use async_channel::{Sender, Receiver, unbounded};
18use tracing::{debug, error};
19
/// Tunable settings for a `DataManager`.
#[derive(Debug, Clone)]
pub struct DataManagerConfig {
    /// Number of most recent items the K-line and tick series getters return
    /// when the caller passes a `view_width` of `0`.
    pub default_view_width: usize,
    /// Auto-cleanup switch.
    /// NOTE(review): this flag is not read anywhere in this file; confirm
    /// where (or whether) it is consumed.
    pub enable_auto_cleanup: bool,
}

impl Default for DataManagerConfig {
    /// Default settings: a view width of 10000 items, auto-cleanup enabled.
    fn default() -> Self {
        Self {
            default_view_width: 10_000,
            enable_auto_cleanup: true,
        }
    }
}
37
/// A registered subscription: the data path being observed together with the
/// channel that snapshots of that path are pushed into.
struct PathWatcher {
    /// Path segments, from the store root down to the watched node.
    path: Vec<String>,
    /// Sending half of the channel whose receiver was handed out by `watch`.
    tx: Sender<Value>,
}
43
/// In-memory JSON data store that merges incoming update packets, stamps the
/// merged objects with an epoch counter, and notifies callbacks and path
/// watchers.
pub struct DataManager {
    /// Root of the stored data tree, keyed by top-level section name.
    data: Arc<RwLock<HashMap<String, Value>>>,
    /// Version counter; `merge_data` increments it when called with
    /// `epoch_increase == true`.
    epoch: AtomicI64,
    /// Settings supplied at construction time.
    config: DataManagerConfig,
    /// Active watchers, keyed by their path segments joined with ".".
    watchers: Arc<RwLock<HashMap<String, PathWatcher>>>,
    /// Callbacks registered through `on_data`.
    on_data_callbacks: Arc<RwLock<Vec<Arc<dyn Fn() + Send + Sync>>>>,
}
62
63impl DataManager {
64 pub fn new(initial_data: HashMap<String, Value>, config: DataManagerConfig) -> Self {
66 DataManager {
67 data: Arc::new(RwLock::new(initial_data)),
68 epoch: AtomicI64::new(0),
69 config,
70 watchers: Arc::new(RwLock::new(HashMap::new())),
71 on_data_callbacks: Arc::new(RwLock::new(Vec::new())),
72 }
73 }
74
75 pub fn on_data<F>(&self, callback: F)
77 where
78 F: Fn() + Send + Sync + 'static,
79 {
80 let mut callbacks = self.on_data_callbacks.write().unwrap();
81 callbacks.push(Arc::new(callback));
82 }
83
84 pub fn merge_data(&self, source: Value, epoch_increase: bool, delete_null: bool) {
92 let should_notify_watchers = if epoch_increase {
93 let current_epoch = self.epoch.fetch_add(1, Ordering::SeqCst) + 1;
95
96 let source_arr = match source {
98 Value::Array(arr) => arr,
99 Value::Object(_) => vec![source],
100 _ => {
101 debug!("merge_data: 无效的源数据类型");
102 return;
103 }
104 };
105
106 let mut data = self.data.write().unwrap();
108 for item in source_arr.iter() {
109 if let Value::Object(obj) = item {
110 if !obj.is_empty() {
111 self.merge_object(&mut data, obj, current_epoch, delete_null);
112 }
113 }
114 }
115 drop(data);
116
117 let callbacks = self.on_data_callbacks.read().unwrap();
119 for callback in callbacks.iter() {
120 let cb = Arc::clone(callback);
121 tokio::spawn(async move {
122 cb();
123 });
124 }
125 drop(callbacks);
126
127 true
128 } else {
129 let current_epoch = self.epoch.load(Ordering::SeqCst);
131
132 let source_arr = match source {
133 Value::Array(arr) => arr,
134 Value::Object(_) => vec![source],
135 _ => return,
136 };
137
138 let mut data = self.data.write().unwrap();
139 for item in source_arr.iter() {
140 if let Value::Object(obj) = item {
141 if !obj.is_empty() {
142 self.merge_object(&mut data, obj, current_epoch, delete_null);
143 }
144 }
145 }
146
147 false
148 };
149
150 if should_notify_watchers {
152 self.notify_watchers();
153 }
154 }
155
156 fn merge_object(
158 &self,
159 target: &mut HashMap<String, Value>,
160 source: &serde_json::Map<String, Value>,
161 epoch: i64,
162 delete_null: bool,
163 ) {
164 for (property, value) in source.iter() {
165 if value.is_null() {
166 if delete_null {
167 target.remove(property);
168 }
169 continue;
170 }
171
172 match value {
173 Value::String(s) if s == "NaN" || s == "-" => {
174 target.insert(property.clone(), Value::Null);
176 }
177 Value::Object(obj) => {
178 if property == "quotes" {
180 self.merge_quotes(target, obj, epoch, delete_null);
182 } else {
183 let target_obj = target
185 .entry(property.clone())
186 .or_insert_with(|| Value::Object(serde_json::Map::new()));
187
188 if let Value::Object(target_map) = target_obj {
189 let mut target_hashmap: HashMap<String, Value> = target_map
190 .iter()
191 .map(|(k, v)| (k.clone(), v.clone()))
192 .collect();
193
194 self.merge_object(&mut target_hashmap, obj, epoch, delete_null);
195
196 *target_map = target_hashmap
197 .into_iter()
198 .collect::<serde_json::Map<String, Value>>();
199 }
200 }
201 }
202 _ => {
203 target.insert(property.clone(), value.clone());
205 }
206 }
207 }
208
209 target.insert("_epoch".to_string(), Value::Number(epoch.into()));
211 }
212
213 fn merge_quotes(
215 &self,
216 target: &mut HashMap<String, Value>,
217 quotes: &serde_json::Map<String, Value>,
218 epoch: i64,
219 delete_null: bool,
220 ) {
221 let quotes_obj = target
222 .entry("quotes".to_string())
223 .or_insert_with(|| Value::Object(serde_json::Map::new()));
224
225 if let Value::Object(quotes_map) = quotes_obj {
226 let mut quotes_hashmap: HashMap<String, Value> = quotes_map
227 .iter()
228 .map(|(k, v)| (k.clone(), v.clone()))
229 .collect();
230
231 for (symbol, quote_data) in quotes.iter() {
232 if quote_data.is_null() {
233 if delete_null {
234 quotes_hashmap.remove(symbol);
235 }
236 continue;
237 }
238
239 if let Value::Object(quote_obj) = quote_data {
240 let target_quote = quotes_hashmap
242 .entry(symbol.clone())
243 .or_insert_with(|| Value::Object(serde_json::Map::new()));
244
245 if let Value::Object(target_quote_map) = target_quote {
246 let mut target_quote_hashmap: HashMap<String, Value> = target_quote_map
247 .iter()
248 .map(|(k, v)| (k.clone(), v.clone()))
249 .collect();
250
251 self.merge_object(&mut target_quote_hashmap, quote_obj, epoch, delete_null);
252
253 *target_quote_map = target_quote_hashmap
254 .into_iter()
255 .collect::<serde_json::Map<String, Value>>();
256 }
257 }
258 }
259
260 *quotes_map = quotes_hashmap
261 .into_iter()
262 .collect::<serde_json::Map<String, Value>>();
263 }
264 }
265
266 pub fn get_by_path(&self, path: &[&str]) -> Option<Value> {
268 let data = self.data.read().unwrap();
269 let mut current: &Value = &Value::Object(
270 data.iter()
271 .map(|(k, v)| (k.clone(), v.clone()))
272 .collect::<serde_json::Map<String, Value>>(),
273 );
274
275 for &key in path.iter() {
276 match current {
277 Value::Object(map) => {
278 if let Some(val) = map.get(key) {
279 current = val;
280 } else {
281 return None;
282 }
283 }
284 _ => return None,
285 }
286 }
287 Some(current.clone())
288 }
289
290 pub fn is_changing(&self, path: &[&str]) -> bool {
292 let current_epoch = self.epoch.load(Ordering::SeqCst);
293 let data = self.data.read().unwrap();
294
295 let mut current_value = Some(&Value::Object(
296 data.iter()
297 .map(|(k, v)| (k.clone(), v.clone()))
298 .collect::<serde_json::Map<String, Value>>(),
299 ));
300
301 for (i, &key) in path.iter().enumerate() {
302 if let Some(Value::Object(map)) = current_value {
303 if let Some(val) = map.get(key) {
304 if i == path.len() - 1 {
306 if let Some(Value::Object(obj)) = Some(val) {
307 if let Some(Value::Number(e)) = obj.get("_epoch") {
308 if let Some(e_val) = e.as_i64() {
309 if e_val == current_epoch {
310 return true;
311 }
312 }
313 }
314 }
315 }
316
317 if i < path.len() - 1 {
319 current_value = Some(val);
320 }
321 } else {
322 return false;
323 }
324 } else {
325 return false;
326 }
327 }
328
329 false
330 }
331
    /// Returns the current value of the epoch counter.
    pub fn get_epoch(&self) -> i64 {
        self.epoch.load(Ordering::SeqCst)
    }
336
337 pub fn set_default(&self, path: &[&str], default_value: Value) -> Option<Value> {
339 let mut data = self.data.write().unwrap();
340 let current = &mut *data;
341
342 for (i, &key) in path.iter().enumerate() {
343 if i == path.len() - 1 {
344 if !current.contains_key(key) {
346 current.insert(key.to_string(), default_value.clone());
347 }
348 return current.get(key).cloned();
349 }
350
351 let entry = current
353 .entry(key.to_string())
354 .or_insert_with(|| Value::Object(serde_json::Map::new()));
355
356 if !entry.is_object() {
358 return None;
359 }
360 }
361
362 None
363 }
364
365 pub fn watch(&self, path: Vec<String>) -> Receiver<Value> {
369 let path_key = path.join(".");
370 let (tx, rx) = unbounded();
371
372 let watcher = PathWatcher {
373 path: path.clone(),
374 tx,
375 };
376
377 let mut watchers = self.watchers.write().unwrap();
378 watchers.insert(path_key, watcher);
379
380 rx
381 }
382
383 pub fn unwatch(&self, path: &[String]) -> Result<()> {
385 let path_key = path.join(".");
386 let mut watchers = self.watchers.write().unwrap();
387
388 if watchers.remove(&path_key).is_none() {
389 return Err(TqError::DataNotFound(format!("路径未监听: {}", path_key)));
390 }
391
392 Ok(())
393 }
394
395 fn notify_watchers(&self) {
397 let watchers = self.watchers.read().unwrap();
398 for (_, watcher) in watchers.iter() {
399 let path_refs: Vec<&str> = watcher.path.iter().map(|s| s.as_str()).collect();
400 if self.is_changing(&path_refs) {
401 if let Some(data) = self.get_by_path(&path_refs) {
402 let tx = watcher.tx.clone();
403 tokio::spawn(async move {
404 let _ = tx.send(data).await;
405 });
406 }
407 }
408 }
409 }
410
411 pub fn convert_to_struct<T: serde::de::DeserializeOwned>(&self, data: &Value) -> Result<T> {
413 serde_json::from_value(data.clone())
414 .map_err(|e| TqError::ParseError(format!("转换失败: {}", e)))
415 }
416
417 pub fn get_quote_data(&self, symbol: &str) -> Result<Quote> {
419 let data = self
420 .get_by_path(&["quotes", symbol])
421 .ok_or_else(|| TqError::DataNotFound(format!("Quote 未找到: {}", symbol)))?;
422
423 self.convert_to_struct(&data)
424 }
425
426 pub fn get_klines_data(
428 &self,
429 symbol: &str,
430 duration: i64,
431 view_width: usize,
432 right_id: i64,
433 ) -> Result<KlineSeriesData> {
434 let duration_str = duration.to_string();
435 let data = self
436 .get_by_path(&["klines", symbol, &duration_str])
437 .ok_or_else(|| TqError::DataNotFound(format!("K线未找到: {}/{}", symbol, duration)))?;
438
439 if let Value::Object(data_map) = data {
440 let mut kline_series = KlineSeriesData {
441 symbol: symbol.to_string(),
442 duration,
443 chart_id: String::new(),
444 chart: None,
445 last_id: value_to_i64(data_map.get("last_id").unwrap_or(&Value::Null)),
446 trading_day_start_id: value_to_i64(
447 data_map.get("trading_day_start_id").unwrap_or(&Value::Null),
448 ),
449 trading_day_end_id: value_to_i64(
450 data_map.get("trading_day_end_id").unwrap_or(&Value::Null),
451 ),
452 data: Vec::new(),
453 has_new_bar: false,
454 };
455
456 if let Some(Value::Object(kline_map)) = data_map.get("data") {
458 let mut all_klines: Vec<(i64, Kline)> = Vec::new();
459
460 for (id_str, kline_data) in kline_map.iter() {
461 if let Ok(id) = id_str.parse::<i64>() {
462 match self.convert_to_struct::<Kline>(kline_data) {
463 Ok(mut kline) => {
464 kline.id = id;
465 all_klines.push((id, kline));
466 }
467 Err(e) => {
468 error!("{}", TqError::ParseError(format!("K线数据格式错误: {}", e)));
469 }
470 }
471 }
472 }
473 all_klines.sort_by_key(|(id, _)| *id);
475
476 if right_id > 0 {
478 all_klines.retain(|(id, _)| *id <= right_id);
479 }
480
481 let vw = if view_width > 0 {
483 view_width
484 } else {
485 self.config.default_view_width
486 };
487
488 let klines: Vec<Kline> = all_klines
489 .into_iter()
490 .map(|(_, k)| k)
491 .rev()
492 .take(vw)
493 .collect::<Vec<_>>()
494 .into_iter()
495 .rev()
496 .collect();
497 kline_series.data = klines;
498 }
499
500 Ok(kline_series)
501 } else {
502 Err(TqError::ParseError("K线数据格式错误".to_string()))
503 }
504 }
505
506 pub fn get_multi_klines_data(
508 &self,
509 symbols: &[String],
510 duration: i64,
511 chart_id: &str,
512 view_width: usize,
513 ) -> Result<MultiKlineSeriesData> {
514 if symbols.is_empty() {
515 return Err(TqError::InvalidParameter("symbols 为空".to_string()));
516 }
517
518 let main_symbol = &symbols[0];
519 let duration_str = duration.to_string();
520
521 let (left_id, right_id) = if let Some(chart_data) = self.get_by_path(&["charts", chart_id])
523 {
524 if let Value::Object(chart_map) = chart_data {
525 let left = value_to_i64(chart_map.get("left_id").unwrap_or(&Value::Null));
526 let right = value_to_i64(chart_map.get("right_id").unwrap_or(&Value::Null));
527 (left, right)
528 } else {
529 (-1, -1)
530 }
531 } else {
532 (-1, -1)
533 };
534
535 let mut result = MultiKlineSeriesData {
536 chart_id: chart_id.to_string(),
537 duration,
538 main_symbol: main_symbol.clone(),
539 symbols: symbols.to_vec(),
540 left_id,
541 right_id,
542 view_width,
543 data: Vec::new(),
544 has_new_bar: false,
545 metadata: HashMap::new(),
546 };
547
548 for symbol in symbols.iter() {
550 if let Some(kline_data) = self.get_by_path(&["klines", symbol, &duration_str]) {
551 if let Value::Object(kline_map) = kline_data {
552 let metadata = KlineMetadata {
553 symbol: symbol.clone(),
554 last_id: value_to_i64(kline_map.get("last_id").unwrap_or(&Value::Null)),
555 trading_day_start_id: value_to_i64(
556 kline_map
557 .get("trading_day_start_id")
558 .unwrap_or(&Value::Null),
559 ),
560 trading_day_end_id: value_to_i64(
561 kline_map.get("trading_day_end_id").unwrap_or(&Value::Null),
562 ),
563 };
564 result.metadata.insert(symbol.clone(), metadata);
565 }
566 }
567 }
568
569 if let Some(main_kline_data) = self.get_by_path(&["klines", main_symbol, &duration_str]) {
571 if let Value::Object(main_kline_map) = main_kline_data {
572 let mut bindings: HashMap<String, HashMap<i64, i64>> = HashMap::new();
574 if let Some(Value::Object(binding_map)) = main_kline_map.get("binding") {
575 for (symbol, binding_info) in binding_map.iter() {
576 if let Value::Object(binding_id_map) = binding_info {
577 let mut id_map: HashMap<i64, i64> = HashMap::new();
578 for (main_id_str, other_id) in binding_id_map.iter() {
579 if let Ok(main_id) = main_id_str.parse::<i64>() {
580 id_map.insert(main_id, value_to_i64(other_id));
581 }
582 }
583 bindings.insert(symbol.clone(), id_map);
584 }
585 }
586 }
587
588 if let Some(Value::Object(main_data_map)) = main_kline_map.get("data") {
590 let mut main_ids: Vec<i64> = main_data_map
591 .keys()
592 .filter_map(|k| k.parse::<i64>().ok())
593 .collect();
594 main_ids.sort();
595
596 if right_id > 0 {
598 main_ids.retain(|&id| id <= right_id);
599 }
600
601 if view_width > 0 && main_ids.len() > view_width {
603 main_ids = main_ids
604 .into_iter()
605 .rev()
606 .take(view_width)
607 .collect::<Vec<_>>()
608 .into_iter()
609 .rev()
610 .collect();
611 result.left_id = main_ids[0];
612 result.right_id = main_ids[main_ids.len() - 1];
613 }
614
615 for main_id in main_ids {
617 let main_id_str = main_id.to_string();
618 let mut set = AlignedKlineSet {
619 main_id,
620 timestamp: Utc::now(),
621 klines: HashMap::new(),
622 };
623
624 if let Some(kline_data) = main_data_map.get(&main_id_str) {
626 if let Ok(mut kline) = self.convert_to_struct::<Kline>(kline_data) {
627 kline.id = main_id;
628 set.timestamp = nanos_to_datetime(kline.datetime);
629 set.klines.insert(main_symbol.clone(), kline);
630 }
631 }
632
633 for symbol in symbols.iter().skip(1) {
635 if let Some(binding) = bindings.get(symbol) {
636 if let Some(&mapped_id) = binding.get(&main_id) {
637 if let Some(other_kline_data) = self.get_by_path(&[
638 "klines",
639 symbol,
640 &duration_str,
641 "data",
642 &mapped_id.to_string(),
643 ]) {
644 if let Ok(mut kline) =
645 self.convert_to_struct::<Kline>(&other_kline_data)
646 {
647 kline.id = mapped_id;
648 set.klines.insert(symbol.clone(), kline);
649 }
650 }
651 }
652 }
653 }
654
655 result.data.push(set);
656 }
657 }
658 }
659 }
660
661 Ok(result)
662 }
663
664 pub fn get_ticks_data(
666 &self,
667 symbol: &str,
668 view_width: usize,
669 right_id: i64,
670 ) -> Result<TickSeriesData> {
671 let data = self
672 .get_by_path(&["ticks", symbol])
673 .ok_or_else(|| TqError::DataNotFound(format!("Tick 未找到: {}", symbol)))?;
674
675 if let Value::Object(data_map) = data {
676 let mut tick_series = TickSeriesData {
677 symbol: symbol.to_string(),
678 chart_id: String::new(),
679 chart: None,
680 last_id: value_to_i64(data_map.get("last_id").unwrap_or(&Value::Null)),
681 data: Vec::new(),
682 has_new_bar: false,
683 };
684
685 if let Some(Value::Object(tick_map)) = data_map.get("data") {
687 let mut all_ticks: Vec<(i64, Tick)> = Vec::new();
688
689 for (id_str, tick_data) in tick_map.iter() {
690 if let Ok(id) = id_str.parse::<i64>() {
691 if let Ok(mut tick) = self.convert_to_struct::<Tick>(tick_data) {
692 tick.id = id;
693 all_ticks.push((id, tick));
694 }
695 }
696 }
697
698 all_ticks.sort_by_key(|(id, _)| *id);
700
701 if right_id > 0 {
703 all_ticks.retain(|(id, _)| *id <= right_id);
704 }
705
706 let vw = if view_width > 0 {
708 view_width
709 } else {
710 self.config.default_view_width
711 };
712
713 let ticks: Vec<Tick> = all_ticks
714 .into_iter()
715 .map(|(_, t)| t)
716 .rev()
717 .take(vw)
718 .collect::<Vec<_>>()
719 .into_iter()
720 .rev()
721 .collect();
722
723 tick_series.data = ticks;
724 }
725
726 Ok(tick_series)
727 } else {
728 Err(TqError::ParseError("Tick数据格式错误".to_string()))
729 }
730 }
731
732 pub fn get_account_data(&self, user_id: &str, currency: &str) -> Result<Account> {
734 let data = self
735 .get_by_path(&["trade", user_id, "accounts", currency])
736 .ok_or_else(|| {
737 TqError::DataNotFound(format!("账户未找到: {}/{}", user_id, currency))
738 })?;
739
740 self.convert_to_struct(&data)
741 }
742
743 pub fn get_position_data(&self, user_id: &str, symbol: &str) -> Result<Position> {
745 let data = self
746 .get_by_path(&["trade", user_id, "positions", symbol])
747 .ok_or_else(|| TqError::DataNotFound(format!("持仓未找到: {}/{}", user_id, symbol)))?;
748
749 self.convert_to_struct(&data)
750 }
751
752 pub fn get_order_data(&self, user_id: &str, order_id: &str) -> Result<Order> {
754 let data = self
755 .get_by_path(&["trade", user_id, "orders", order_id])
756 .ok_or_else(|| {
757 TqError::DataNotFound(format!("委托单未找到: {}/{}", user_id, order_id))
758 })?;
759
760 self.convert_to_struct(&data)
761 }
762
763 pub fn get_trade_data(&self, user_id: &str, trade_id: &str) -> Result<Trade> {
765 let data = self
766 .get_by_path(&["trade", user_id, "trades", trade_id])
767 .ok_or_else(|| {
768 TqError::DataNotFound(format!("成交未找到: {}/{}", user_id, trade_id))
769 })?;
770
771 self.convert_to_struct(&data)
772 }
773}
774
#[cfg(test)]
mod tests {
    use super::*;
    use serde_json::json;

    /// Builds an empty manager with the default configuration.
    fn empty_manager() -> DataManager {
        DataManager::new(HashMap::new(), DataManagerConfig::default())
    }

    /// A merged quote must be retrievable with its fields intact, and an
    /// epoch-increasing merge must advance the epoch by one.
    #[test]
    fn test_merge_data() {
        let dm = empty_manager();

        let source = json!({
            "quotes": {
                "SHFE.au2602": {
                    "last_price": 500.0,
                    "volume": 1000
                }
            }
        });

        dm.merge_data(source, true, false);
        assert_eq!(dm.get_epoch(), 1);

        // Fail loudly instead of skipping the assertions when the stored
        // value is missing or is not an object.
        let quote = dm
            .get_by_path(&["quotes", "SHFE.au2602"])
            .expect("merged quote should be retrievable");
        let quote_map = quote
            .as_object()
            .expect("quote should be stored as a JSON object");

        assert_eq!(quote_map.get("last_price"), Some(&json!(500.0)));
        assert_eq!(quote_map.get("volume"), Some(&json!(1000)));
    }

    /// Only objects touched by the latest epoch report as changing.
    #[test]
    fn test_is_changing() {
        let dm = empty_manager();

        dm.merge_data(
            json!({
                "quotes": {
                    "SHFE.au2602": {
                        "last_price": 500.0
                    }
                }
            }),
            true,
            false,
        );

        assert!(dm.is_changing(&["quotes", "SHFE.au2602"]));
        assert!(!dm.is_changing(&["quotes", "SHFE.ag2512"]));

        // A later epoch that touches a different symbol flips both answers.
        dm.merge_data(
            json!({
                "quotes": {
                    "SHFE.ag2512": {
                        "last_price": 6000.0
                    }
                }
            }),
            true,
            false,
        );

        assert!(!dm.is_changing(&["quotes", "SHFE.au2602"]));
        assert!(dm.is_changing(&["quotes", "SHFE.ag2512"]));
    }
}