1use crate::datamanager::DataManager;
6use crate::errors::{Result, TqError};
7use crate::types::{ChartInfo, SeriesData, UpdateInfo};
8use crate::websocket::TqQuoteWebsocket;
9use async_stream::stream;
10use chrono::{DateTime, Utc};
11use futures::Stream;
12use std::collections::HashMap;
13use std::sync::Arc;
14use std::time::Duration as StdDuration;
15use tokio::sync::RwLock;
16use tracing::{debug, info, trace, warn};
17use uuid::Uuid;
18
19use crate::auth::Authenticator;
20
21pub struct SeriesAPI {
23 dm: Arc<DataManager>,
24 ws: Arc<TqQuoteWebsocket>,
25 auth: Arc<RwLock<dyn Authenticator>>,
26 subscriptions: Arc<RwLock<HashMap<String, Arc<SeriesSubscription>>>>,
27}
28
29impl SeriesAPI {
30 pub fn new(
32 dm: Arc<DataManager>,
33 ws: Arc<TqQuoteWebsocket>,
34 auth: Arc<RwLock<dyn Authenticator>>,
35 ) -> Self {
36 SeriesAPI {
37 dm,
38 ws,
39 auth,
40 subscriptions: Arc::new(RwLock::new(HashMap::new())),
41 }
42 }
43
44 pub async fn kline(
46 &self,
47 symbol: &str,
48 duration: StdDuration,
49 view_width: usize,
50 ) -> Result<Arc<SeriesSubscription>> {
51 self.subscribe(SeriesOptions {
52 symbols: vec![symbol.to_string()],
53 duration: duration.as_nanos() as i64,
54 view_width,
55 chart_id: String::new(),
56 left_kline_id: None,
57 focus_datetime: None,
58 focus_position: None,
59 })
60 .await
61 }
62
63 pub async fn kline_multi(
65 &self,
66 symbols: &[String],
67 duration: StdDuration,
68 view_width: usize,
69 ) -> Result<Arc<SeriesSubscription>> {
70 if symbols.is_empty() {
71 return Err(TqError::InvalidParameter("symbols 为空".to_string()));
72 }
73
74 self.subscribe(SeriesOptions {
75 symbols: symbols.to_vec(),
76 duration: duration.as_nanos() as i64,
77 view_width,
78 chart_id: String::new(),
79 left_kline_id: None,
80 focus_datetime: None,
81 focus_position: None,
82 })
83 .await
84 }
85
86 pub async fn tick(&self, symbol: &str, view_width: usize) -> Result<Arc<SeriesSubscription>> {
88 self.subscribe(SeriesOptions {
89 symbols: vec![symbol.to_string()],
90 duration: 0, view_width,
92 chart_id: String::new(),
93 left_kline_id: None,
94 focus_datetime: None,
95 focus_position: None,
96 })
97 .await
98 }
99
100 pub async fn kline_history(
102 &self,
103 symbol: &str,
104 duration: StdDuration,
105 view_width: usize,
106 left_kline_id: i64,
107 ) -> Result<Arc<SeriesSubscription>> {
108 self.subscribe(SeriesOptions {
109 symbols: vec![symbol.to_string()],
110 duration: duration.as_nanos() as i64,
111 view_width,
112 chart_id: String::new(),
113 left_kline_id: Some(left_kline_id),
114 focus_datetime: None,
115 focus_position: None,
116 })
117 .await
118 }
119
120 pub async fn kline_history_with_focus(
122 &self,
123 symbol: &str,
124 duration: StdDuration,
125 view_width: usize,
126 focus_time: DateTime<Utc>,
127 focus_position: i32,
128 ) -> Result<Arc<SeriesSubscription>> {
129 self.subscribe(SeriesOptions {
130 symbols: vec![symbol.to_string()],
131 duration: duration.as_nanos() as i64,
132 view_width,
133 chart_id: String::new(),
134 left_kline_id: None,
135 focus_datetime: Some(focus_time),
136 focus_position: Some(focus_position),
137 })
138 .await
139 }
140
141 async fn subscribe(&self, mut options: SeriesOptions) -> Result<Arc<SeriesSubscription>> {
143 if options.symbols.is_empty() {
144 return Err(TqError::InvalidParameter("symbols 为空".to_string()));
145 }
146 {
147 let auth = self.auth.read().await;
149 let symbol_refs: Vec<&str> = options.symbols.iter().map(|s| s.as_str()).collect();
150 auth.has_md_grants(&symbol_refs)?;
151 }
152
153 if options.chart_id.is_empty() {
155 options.chart_id = generate_chart_id(&options);
156 }
157
158 {
160 let subs = self.subscriptions.read().await;
161 if let Some(sub) = subs.get(&options.chart_id) {
162 return Ok(Arc::clone(sub));
163 }
164 }
165
166 let sub = Arc::new(SeriesSubscription::new(
168 Arc::clone(&self.dm),
169 Arc::clone(&self.ws),
170 options,
171 )?);
172
173 let mut subs = self.subscriptions.write().await;
181 subs.insert(sub.options.chart_id.clone(), Arc::clone(&sub));
182
183 Ok(sub)
184 }
185}
186
187#[derive(Debug, Clone)]
189pub struct SeriesOptions {
190 pub symbols: Vec<String>,
191 pub duration: i64,
192 pub view_width: usize,
193 pub chart_id: String,
194 pub left_kline_id: Option<i64>,
195 pub focus_datetime: Option<DateTime<Utc>>,
196 pub focus_position: Option<i32>,
197}
198
199fn generate_chart_id(options: &SeriesOptions) -> String {
201 let uid = Uuid::new_v4();
202 if options.duration == 0 {
203 format!("TQRS_tick_{}", uid)
204 } else {
205 format!("TQRS_kline_{}", uid)
206 }
207}
208
209pub struct SeriesSubscription {
211 dm: Arc<DataManager>,
212 ws: Arc<TqQuoteWebsocket>,
213 options: SeriesOptions,
214
215 last_ids: Arc<RwLock<HashMap<String, i64>>>,
217 last_left_id: Arc<RwLock<i64>>,
218 last_right_id: Arc<RwLock<i64>>,
219 chart_ready: Arc<RwLock<bool>>,
220 has_chart_sync: Arc<RwLock<bool>>,
221
222 on_update: Arc<RwLock<Option<Arc<dyn Fn(Arc<SeriesData>, Arc<UpdateInfo>) + Send + Sync>>>>,
224 on_new_bar: Arc<RwLock<Option<Arc<dyn Fn(Arc<SeriesData>) + Send + Sync>>>>,
225 on_bar_update: Arc<RwLock<Option<Arc<dyn Fn(Arc<SeriesData>) + Send + Sync>>>>,
226 on_error: Arc<RwLock<Option<Arc<dyn Fn(Arc<String>) + Send + Sync>>>>,
227
228 running: Arc<RwLock<bool>>,
229}
230
231impl SeriesSubscription {
232 fn new(
234 dm: Arc<DataManager>,
235 ws: Arc<TqQuoteWebsocket>,
236 options: SeriesOptions,
237 ) -> Result<Self> {
238 let mut last_ids = HashMap::new();
239 for symbol in &options.symbols {
240 last_ids.insert(symbol.clone(), -1);
241 }
242
243 Ok(SeriesSubscription {
244 dm,
245 ws,
246 options,
247 last_ids: Arc::new(RwLock::new(last_ids)),
248 last_left_id: Arc::new(RwLock::new(-1)),
249 last_right_id: Arc::new(RwLock::new(-1)),
250 chart_ready: Arc::new(RwLock::new(false)),
251 has_chart_sync: Arc::new(RwLock::new(false)),
252 on_update: Arc::new(RwLock::new(None)),
253 on_new_bar: Arc::new(RwLock::new(None)),
254 on_bar_update: Arc::new(RwLock::new(None)),
255 on_error: Arc::new(RwLock::new(None)),
256 running: Arc::new(RwLock::new(false)),
257 })
258 }
259
260 async fn send_set_chart(&self) -> Result<()> {
262 let view_width = if self.options.view_width > 10000 {
263 warn!("ViewWidth 超过最大限制,调整为 10000");
264 10000
265 } else {
266 self.options.view_width
267 };
268
269 let mut chart_req = serde_json::json!({
270 "aid": "set_chart",
271 "chart_id": self.options.chart_id,
272 "ins_list": self.options.symbols.join(","),
273 "duration": self.options.duration,
274 "view_width": view_width
275 });
276
277 if let Some(left_kline_id) = self.options.left_kline_id {
279 chart_req["left_kline_id"] = serde_json::json!(left_kline_id);
280 } else if let (Some(focus_datetime), Some(focus_position)) =
281 (self.options.focus_datetime, self.options.focus_position)
282 {
283 chart_req["focus_datetime"] = serde_json::json!(focus_datetime.timestamp_nanos_opt());
284 chart_req["focus_position"] = serde_json::json!(focus_position);
285 }
286
287 debug!(
288 "发送 set_chart 请求: chart_id={}, symbols={:?}, view_width={}",
289 self.options.chart_id, self.options.symbols, view_width
290 );
291
292 self.ws.send(&chart_req).await?;
293 Ok(())
294 }
295
296 pub async fn start(&self) -> Result<()> {
298 let mut running = self.running.write().await;
299 if *running {
300 return Ok(());
301 }
302 *running = true;
303 drop(running);
304
305 info!("启动 Series 订阅: {}", self.options.chart_id);
306
307 self.start_watching().await;
308 self.send_set_chart().await?;
309 trace!("send_set_chart done");
310 Ok(())
311 }
312
313 async fn start_watching(&self) {
315 let dm_clone = Arc::clone(&self.dm);
316 let options = self.options.clone();
317 let last_ids = Arc::clone(&self.last_ids);
318 let last_left_id = Arc::clone(&self.last_left_id);
319 let last_right_id = Arc::clone(&self.last_right_id);
320 let chart_ready = Arc::clone(&self.chart_ready);
321 let has_chart_sync = Arc::clone(&self.has_chart_sync);
322
323 let on_update = Arc::clone(&self.on_update);
324 let on_new_bar = Arc::clone(&self.on_new_bar);
325 let on_bar_update = Arc::clone(&self.on_bar_update);
326 let on_error = Arc::clone(&self.on_error);
327 let running = Arc::clone(&self.running);
328
329 let dm_for_callback = Arc::clone(&dm_clone);
331 dm_clone.on_data(move || {
332 let dm = Arc::clone(&dm_for_callback);
333 let options = options.clone();
334 let last_ids = Arc::clone(&last_ids);
335 let last_left_id = Arc::clone(&last_left_id);
336 let last_right_id = Arc::clone(&last_right_id);
337 let chart_ready = Arc::clone(&chart_ready);
338 let has_chart_sync = Arc::clone(&has_chart_sync);
339 let on_update = Arc::clone(&on_update);
340 let on_new_bar = Arc::clone(&on_new_bar);
341 let on_bar_update = Arc::clone(&on_bar_update);
342 let on_error = Arc::clone(&on_error);
343 let running = Arc::clone(&running);
344
345 tokio::spawn(async move {
346 let is_running = *running.read().await;
347 if !is_running {
348 return;
349 }
350
351 let chart_id = &options.chart_id;
352 if dm.get_by_path(&["charts", chart_id]).is_none() {
353 return;
354 }
355
356 match process_series_update(
358 &dm,
359 &options,
360 &last_ids,
361 &last_left_id,
362 &last_right_id,
363 &chart_ready,
364 &has_chart_sync,
365 )
366 .await
367 {
368 Ok((series_data, update_info)) => {
369 let series_data = Arc::new(series_data);
371 let update_info = Arc::new(update_info);
372
373 if update_info.has_chart_sync {
375 if update_info.chart_ready {
376 if let Some(callback) = on_update.read().await.as_ref() {
377 let cb = Arc::clone(callback);
378 let sd = Arc::clone(&series_data);
379 let ui = Arc::clone(&update_info);
380 tokio::spawn(async move {
381 cb(sd, ui);
382 });
383 }
384 }
385
386 if update_info.has_new_bar && update_info.chart_ready {
387 if let Some(callback) = on_new_bar.read().await.as_ref() {
388 let cb = Arc::clone(callback);
389 let sd = Arc::clone(&series_data);
390 tokio::spawn(async move {
391 cb(sd);
392 });
393 }
394 }
395
396 if update_info.has_bar_update && update_info.chart_ready {
397 if let Some(callback) = on_bar_update.read().await.as_ref() {
398 let cb = Arc::clone(callback);
399 let sd = Arc::clone(&series_data);
400 tokio::spawn(async move {
401 cb(sd);
402 });
403 }
404 }
405 }
406 }
407 Err(e) => {
408 warn!("处理 Series 更新失败: {}", e);
409 if let Some(callback) = on_error.read().await.as_ref() {
410 let cb = Arc::clone(callback);
411 let err_msg = Arc::new(e.to_string());
412 tokio::spawn(async move {
413 cb(err_msg);
414 });
415 }
416 }
417 }
418 });
419 });
420 }
421
422 pub async fn on_update<F>(&self, handler: F)
424 where
425 F: Fn(Arc<SeriesData>, Arc<UpdateInfo>) + Send + Sync + 'static,
426 {
427 let mut guard = self.on_update.write().await;
428 *guard = Some(Arc::new(handler));
429 }
430
431 pub async fn on_new_bar<F>(&self, handler: F)
433 where
434 F: Fn(Arc<SeriesData>) + Send + Sync + 'static,
435 {
436 let mut guard = self.on_new_bar.write().await;
437 *guard = Some(Arc::new(handler));
438 }
439
440 pub async fn on_bar_update<F>(&self, handler: F)
442 where
443 F: Fn(Arc<SeriesData>) + Send + Sync + 'static,
444 {
445 let mut guard = self.on_bar_update.write().await;
446 *guard = Some(Arc::new(handler));
447 }
448
449 pub async fn on_error<F>(&self, handler: F)
451 where
452 F: Fn(Arc<String>) + Send + Sync + 'static,
453 {
454 let mut guard = self.on_error.write().await;
455 *guard = Some(Arc::new(handler));
456 }
457
458 pub async fn data_stream(&self) -> impl Stream<Item = Arc<SeriesData>> {
460 let (tx, mut rx) = tokio::sync::mpsc::unbounded_channel();
461
462 self.on_update(move |data, _info| {
464 let _ = tx.send(data);
465 })
466 .await;
467
468 stream! {
469 while let Some(data) = rx.recv().await {
470 yield data;
471 }
472 }
473 }
474
475 pub async fn close(&self) -> Result<()> {
477 let mut running = self.running.write().await;
478 if !*running {
479 return Ok(());
480 }
481 *running = false;
482
483 info!("关闭 Series 订阅: {}", self.options.chart_id);
484
485 let cancel_req = serde_json::json!({
487 "aid": "set_chart",
488 "chart_id": self.options.chart_id,
489 "ins_list": "",
490 "duration": self.options.duration,
491 "view_width": 0
492 });
493
494 self.ws.send(&cancel_req).await?;
495 Ok(())
496 }
497}
498
499async fn process_series_update(
501 dm: &DataManager,
502 options: &SeriesOptions,
503 last_ids: &Arc<RwLock<HashMap<String, i64>>>,
504 last_left_id: &Arc<RwLock<i64>>,
505 last_right_id: &Arc<RwLock<i64>>,
506 chart_ready: &Arc<RwLock<bool>>,
507 has_chart_sync: &Arc<RwLock<bool>>,
508) -> Result<(SeriesData, UpdateInfo)> {
509 let is_multi = options.symbols.len() > 1;
510 let is_tick = options.duration == 0;
511
512 let duration_str = options.duration.to_string();
515 let (data_path, chart_path): (Vec<&str>, Vec<&str>) = if is_tick {
517 let data_path = vec!["ticks", &options.symbols[0]];
518 let chart_path = vec!["charts", &options.chart_id];
519 (data_path, chart_path)
520 } else {
521 let data_path = vec!["klines", &options.symbols[0], &duration_str];
522 let chart_path = vec!["charts", &options.chart_id];
523 (data_path, chart_path)
524 };
525 let has_chart_changed = dm.is_changing(&chart_path);
526 let has_data_changed = dm.is_changing(&data_path);
527
528 if !has_chart_changed && !has_data_changed {
529 return Err(TqError::Other("数据未更新".to_string()));
530 }
531
532 let series_data: SeriesData = if is_tick {
534 get_tick_data(dm, options).await?
535 } else if is_multi {
536 get_multi_kline_data(dm, options).await?
537 } else {
538 get_single_kline_data(dm, options).await?
539 };
540
541 let mut update_info = UpdateInfo {
543 has_new_bar: false,
544 has_bar_update: false,
545 chart_range_changed: false,
546 has_chart_sync: false,
547 chart_ready: false,
548 new_bar_ids: HashMap::new(),
549 old_left_id: 0,
550 old_right_id: 0,
551 new_left_id: 0,
552 new_right_id: 0,
553 };
554 detect_new_bars(dm, &series_data, last_ids, &mut update_info).await;
556
557 detect_chart_range_change(
559 dm,
560 &series_data,
561 last_left_id,
562 last_right_id,
563 chart_ready,
564 has_chart_sync,
565 &mut update_info,
566 )
567 .await;
568
569 Ok((series_data, update_info))
570}
571
572async fn get_single_kline_data(dm: &DataManager, options: &SeriesOptions) -> Result<SeriesData> {
574 let symbol = &options.symbols[0];
575
576 let mut right_id = -1i64;
578 let chart_info = dm
579 .get_by_path(&["charts", &options.chart_id])
580 .and_then(|chart_data| dm.convert_to_struct::<ChartInfo>(&chart_data).ok())
581 .map(|mut chart| {
582 right_id = chart.right_id;
583 chart.chart_id = options.chart_id.clone();
584 chart.view_width = options.view_width;
585 chart
586 });
587 let mut kline_data =
588 dm.get_klines_data(symbol, options.duration, options.view_width, right_id)?;
589
590 kline_data.chart_id = options.chart_id.clone();
592 kline_data.chart = chart_info;
593
594 Ok(SeriesData {
595 is_multi: false,
596 is_tick: false,
597 symbols: vec![symbol.clone()],
598 single: Some(kline_data),
599 multi: None,
600 tick_data: None,
601 })
602}
603
604async fn get_multi_kline_data(dm: &DataManager, options: &SeriesOptions) -> Result<SeriesData> {
606 let multi_data = dm.get_multi_klines_data(
607 &options.symbols,
608 options.duration,
609 &options.chart_id,
610 options.view_width,
611 )?;
612
613 Ok(SeriesData {
614 is_multi: true,
615 is_tick: false,
616 symbols: options.symbols.clone(),
617 single: None,
618 multi: Some(multi_data),
619 tick_data: None,
620 })
621}
622
623async fn get_tick_data(dm: &DataManager, options: &SeriesOptions) -> Result<SeriesData> {
625 let symbol = &options.symbols[0];
626
627 let mut right_id = -1i64;
629 let chart_info = dm
630 .get_by_path(&["charts", &options.chart_id])
631 .and_then(|chart_data| dm.convert_to_struct::<ChartInfo>(&chart_data).ok())
632 .map(|mut chart| {
633 right_id = chart.right_id;
634 chart.chart_id = options.chart_id.clone();
635 chart.view_width = options.view_width;
636 chart
637 });
638
639 let mut tick_data = dm.get_ticks_data(symbol, options.view_width, right_id)?;
640
641 tick_data.chart_id = options.chart_id.clone();
643 tick_data.chart = chart_info;
644
645 Ok(SeriesData {
646 is_multi: false,
647 is_tick: true,
648 symbols: vec![symbol.clone()],
649 single: None,
650 multi: None,
651 tick_data: Some(tick_data),
652 })
653}
654
655async fn detect_new_bars(
657 dm: &DataManager,
658 data: &SeriesData,
659 last_ids: &Arc<RwLock<HashMap<String, i64>>>,
660 info: &mut UpdateInfo,
661) {
662 let mut ids = last_ids.write().await;
663
664 let duration_str = if data.is_tick {
666 String::new()
667 } else if data.is_multi {
668 data.multi
669 .as_ref()
670 .map(|m| m.duration.to_string())
671 .unwrap_or_default()
672 } else {
673 data.single
674 .as_ref()
675 .map(|s| s.duration.to_string())
676 .unwrap_or_default()
677 };
678
679 for symbol in &data.symbols {
680 let current_id = if data.is_tick {
681 data.tick_data.as_ref().map(|t| t.last_id).unwrap_or(-1)
682 } else if data.is_multi {
683 data.multi
684 .as_ref()
685 .and_then(|m| m.metadata.get(symbol))
686 .map(|meta| meta.last_id)
687 .unwrap_or(-1)
688 } else {
689 data.single.as_ref().map(|s| s.last_id).unwrap_or(-1)
690 };
691 let last_id = ids.get(symbol).copied().unwrap_or(-1);
692 trace!("current_id = {}, last_id = {}", current_id, last_id);
693 if current_id > last_id {
694 info.has_new_bar = true;
695 info.has_bar_update = true;
696 info.new_bar_ids.insert(symbol.clone(), current_id);
697 }
698 ids.insert(symbol.clone(), current_id);
699 }
700 if !info.has_new_bar && !data.is_tick {
701 for symbol in &data.symbols {
702 if dm.is_changing(&["klines", symbol, &duration_str]) {
703 info.has_bar_update = true;
704 break;
705 }
706 }
707 }
708}
709
710async fn detect_chart_range_change(
712 dm: &DataManager,
713 data: &SeriesData,
714 last_left_id: &Arc<RwLock<i64>>,
715 last_right_id: &Arc<RwLock<i64>>,
716 chart_ready: &Arc<RwLock<bool>>,
717 has_chart_sync: &Arc<RwLock<bool>>,
718 info: &mut UpdateInfo,
719) {
720 let chart: Option<&ChartInfo> = if let Some(single) = &data.single {
721 single.chart.as_ref()
722 } else if let Some(tick) = &data.tick_data {
723 tick.chart.as_ref()
724 } else {
725 None
726 };
727
728 let multi_chart = if let Some(multi) = &data.multi {
729 if let Some(chart_data) = dm.get_by_path(&["charts", &multi.chart_id]) {
730 dm.convert_to_struct::<ChartInfo>(&chart_data)
731 .ok()
732 .map(|mut c| {
733 c.chart_id = multi.chart_id.clone();
734 c.view_width = multi.view_width;
735 c
736 })
737 } else {
738 None
739 }
740 } else {
741 None
742 };
743
744 if let Some(chart) = chart.or(multi_chart.as_ref()) {
745 let mut last_left = last_left_id.write().await;
746 let mut last_right = last_right_id.write().await;
747 let mut ready = chart_ready.write().await;
748 let mut has_sync = has_chart_sync.write().await;
749
750 trace!("before compare -> last_left: {}, last_right: {}, chart.left_id: {}, chart.right_id: {}, ready: {}, chart.ready: {}, has_sync: {}",
751 *last_left, *last_right, chart.left_id, chart.right_id, *ready, chart.ready, *has_sync,
752 );
753 if chart.left_id != *last_left || chart.right_id != *last_right {
754 info.chart_range_changed = true;
755 info.old_left_id = *last_left;
756 info.old_right_id = *last_right;
757 info.new_left_id = chart.left_id;
758 info.new_right_id = chart.right_id;
759
760 *last_left = chart.left_id;
761 *last_right = chart.right_id;
762 }
763
764 if chart.ready && !*ready {
765 *ready = true;
767 *has_sync = true;
768
769 info.has_chart_sync = true;
770 info.has_bar_update = true;
771 info.has_new_bar = true;
772 }
773
774 if chart.ready && !chart.more_data {
775 info.chart_ready = true;
776 }
777 trace!("after compare -> last_left: {}, last_right: {}, chart.left_id: {}, chart.right_id: {}, ready: {}, chart.ready: {}, has_sync: {}",
778 *last_left, *last_right, chart.left_id, chart.right_id, *ready, chart.ready, *has_sync,
779 );
780 info.has_chart_sync = *has_sync
781 }
782}