1use crate::datamanager::DataManager;
6use crate::errors::{Result, TqError};
7use crate::types::{ChartInfo, SeriesData, UpdateInfo};
8use crate::websocket::TqQuoteWebsocket;
9use async_stream::stream;
10use chrono::{DateTime, Utc};
11use futures::Stream;
12use std::collections::HashMap;
13use std::sync::Arc;
14use std::time::Duration as StdDuration;
15use tokio::sync::RwLock;
16use tracing::{debug, info, trace, warn};
17use uuid::Uuid;
18
19use crate::auth::Authenticator;
20
21pub struct SeriesAPI {
23 dm: Arc<DataManager>,
24 ws: Arc<TqQuoteWebsocket>,
25 auth: Arc<RwLock<dyn Authenticator>>,
26 subscriptions: Arc<RwLock<HashMap<String, Arc<SeriesSubscription>>>>,
27}
28
29impl SeriesAPI {
30 pub fn new(
32 dm: Arc<DataManager>,
33 ws: Arc<TqQuoteWebsocket>,
34 auth: Arc<RwLock<dyn Authenticator>>,
35 ) -> Self {
36 SeriesAPI {
37 dm,
38 ws,
39 auth,
40 subscriptions: Arc::new(RwLock::new(HashMap::new())),
41 }
42 }
43
44 pub async fn kline(
46 &self,
47 symbol: &str,
48 duration: StdDuration,
49 view_width: usize,
50 ) -> Result<Arc<SeriesSubscription>> {
51 self.subscribe(SeriesOptions {
52 symbols: vec![symbol.to_string()],
53 duration: duration.as_nanos() as i64,
54 view_width,
55 chart_id: String::new(),
56 left_kline_id: None,
57 focus_datetime: None,
58 focus_position: None,
59 })
60 .await
61 }
62
63 pub async fn kline_multi(
65 &self,
66 symbols: &[String],
67 duration: StdDuration,
68 view_width: usize,
69 ) -> Result<Arc<SeriesSubscription>> {
70 if symbols.is_empty() {
71 return Err(TqError::InvalidParameter("symbols 为空".to_string()));
72 }
73
74 self.subscribe(SeriesOptions {
75 symbols: symbols.to_vec(),
76 duration: duration.as_nanos() as i64,
77 view_width,
78 chart_id: String::new(),
79 left_kline_id: None,
80 focus_datetime: None,
81 focus_position: None,
82 })
83 .await
84 }
85
86 pub async fn tick(&self, symbol: &str, view_width: usize) -> Result<Arc<SeriesSubscription>> {
88 self.subscribe(SeriesOptions {
89 symbols: vec![symbol.to_string()],
90 duration: 0, view_width,
92 chart_id: String::new(),
93 left_kline_id: None,
94 focus_datetime: None,
95 focus_position: None,
96 })
97 .await
98 }
99
100 pub async fn kline_history(
102 &self,
103 symbol: &str,
104 duration: StdDuration,
105 view_width: usize,
106 left_kline_id: i64,
107 ) -> Result<Arc<SeriesSubscription>> {
108 self.subscribe(SeriesOptions {
109 symbols: vec![symbol.to_string()],
110 duration: duration.as_nanos() as i64,
111 view_width,
112 chart_id: String::new(),
113 left_kline_id: Some(left_kline_id),
114 focus_datetime: None,
115 focus_position: None,
116 })
117 .await
118 }
119
120 pub async fn kline_history_with_focus(
122 &self,
123 symbol: &str,
124 duration: StdDuration,
125 view_width: usize,
126 focus_time: DateTime<Utc>,
127 focus_position: i32,
128 ) -> Result<Arc<SeriesSubscription>> {
129 self.subscribe(SeriesOptions {
130 symbols: vec![symbol.to_string()],
131 duration: duration.as_nanos() as i64,
132 view_width,
133 chart_id: String::new(),
134 left_kline_id: None,
135 focus_datetime: Some(focus_time),
136 focus_position: Some(focus_position),
137 })
138 .await
139 }
140
141 async fn subscribe(&self, mut options: SeriesOptions) -> Result<Arc<SeriesSubscription>> {
143 if options.symbols.is_empty() {
144 return Err(TqError::InvalidParameter("symbols 为空".to_string()));
145 }
146 {
147 let auth = self.auth.read().await;
149 let symbol_refs: Vec<&str> = options.symbols.iter().map(|s| s.as_str()).collect();
150 auth.has_md_grants(&symbol_refs)?;
151 }
152
153 if options.chart_id.is_empty() {
155 options.chart_id = generate_chart_id(&options);
156 }
157
158 {
160 let subs = self.subscriptions.read().await;
161 if let Some(sub) = subs.get(&options.chart_id) {
162 return Ok(Arc::clone(sub));
163 }
164 }
165
166 let sub = Arc::new(SeriesSubscription::new(
168 Arc::clone(&self.dm),
169 Arc::clone(&self.ws),
170 options,
171 )?);
172
173 let mut subs = self.subscriptions.write().await;
181 subs.insert(sub.options.chart_id.clone(), Arc::clone(&sub));
182
183 Ok(sub)
184 }
185}
186
187#[derive(Debug, Clone)]
189pub struct SeriesOptions {
190 pub symbols: Vec<String>,
191 pub duration: i64,
192 pub view_width: usize,
193 pub chart_id: String,
194 pub left_kline_id: Option<i64>,
195 pub focus_datetime: Option<DateTime<Utc>>,
196 pub focus_position: Option<i32>,
197}
198
199fn generate_chart_id(options: &SeriesOptions) -> String {
201 let uid = Uuid::new_v4();
202 if options.duration == 0 {
203 format!("TQRS_tick_{}", uid)
204 } else {
205 format!("TQRS_kline_{}", uid)
206 }
207}
208
209pub struct SeriesSubscription {
211 dm: Arc<DataManager>,
212 ws: Arc<TqQuoteWebsocket>,
213 options: SeriesOptions,
214
215 last_ids: Arc<RwLock<HashMap<String, i64>>>,
217 last_left_id: Arc<RwLock<i64>>,
218 last_right_id: Arc<RwLock<i64>>,
219 chart_ready: Arc<RwLock<bool>>,
220
221 on_update: Arc<RwLock<Option<Arc<dyn Fn(Arc<SeriesData>, Arc<UpdateInfo>) + Send + Sync>>>>,
223 on_new_bar: Arc<RwLock<Option<Arc<dyn Fn(Arc<SeriesData>) + Send + Sync>>>>,
224 on_bar_update: Arc<RwLock<Option<Arc<dyn Fn(Arc<SeriesData>) + Send + Sync>>>>,
225 on_error: Arc<RwLock<Option<Arc<dyn Fn(Arc<String>) + Send + Sync>>>>,
226
227 running: Arc<RwLock<bool>>,
228}
229
230impl SeriesSubscription {
231 fn new(
233 dm: Arc<DataManager>,
234 ws: Arc<TqQuoteWebsocket>,
235 options: SeriesOptions,
236 ) -> Result<Self> {
237 let mut last_ids = HashMap::new();
238 for symbol in &options.symbols {
239 last_ids.insert(symbol.clone(), -1);
240 }
241
242 Ok(SeriesSubscription {
243 dm,
244 ws,
245 options,
246 last_ids: Arc::new(RwLock::new(last_ids)),
247 last_left_id: Arc::new(RwLock::new(-1)),
248 last_right_id: Arc::new(RwLock::new(-1)),
249 chart_ready: Arc::new(RwLock::new(false)),
250 on_update: Arc::new(RwLock::new(None)),
251 on_new_bar: Arc::new(RwLock::new(None)),
252 on_bar_update: Arc::new(RwLock::new(None)),
253 on_error: Arc::new(RwLock::new(None)),
254 running: Arc::new(RwLock::new(false)),
255 })
256 }
257
258 async fn send_set_chart(&self) -> Result<()> {
260 let view_width = if self.options.view_width > 10000 {
261 warn!("ViewWidth 超过最大限制,调整为 10000");
262 10000
263 } else {
264 self.options.view_width
265 };
266
267 let mut chart_req = serde_json::json!({
268 "aid": "set_chart",
269 "chart_id": self.options.chart_id,
270 "ins_list": self.options.symbols.join(","),
271 "duration": self.options.duration,
272 "view_width": view_width
273 });
274
275 if let Some(left_kline_id) = self.options.left_kline_id {
277 chart_req["left_kline_id"] = serde_json::json!(left_kline_id);
278 } else if let (Some(focus_datetime), Some(focus_position)) =
279 (self.options.focus_datetime, self.options.focus_position)
280 {
281 chart_req["focus_datetime"] = serde_json::json!(focus_datetime.timestamp_nanos_opt());
282 chart_req["focus_position"] = serde_json::json!(focus_position);
283 }
284
285 debug!(
286 "发送 set_chart 请求: chart_id={}, symbols={:?}, view_width={}",
287 self.options.chart_id, self.options.symbols, view_width
288 );
289
290 self.ws.send(&chart_req).await?;
291 Ok(())
292 }
293
294 pub async fn start(&self) -> Result<()> {
296 let mut running = self.running.write().await;
297 if *running {
298 return Ok(());
299 }
300 *running = true;
301 drop(running);
302
303 info!("启动 Series 订阅: {}", self.options.chart_id);
304
305 self.start_watching().await;
306 self.send_set_chart().await?;
307 trace!("send_set_chart done");
308 Ok(())
309 }
310
311 async fn start_watching(&self) {
313 let dm_clone = Arc::clone(&self.dm);
314 let options = self.options.clone();
315 let last_ids = Arc::clone(&self.last_ids);
316 let last_left_id = Arc::clone(&self.last_left_id);
317 let last_right_id = Arc::clone(&self.last_right_id);
318 let chart_ready = Arc::clone(&self.chart_ready);
319 let on_update = Arc::clone(&self.on_update);
320 let on_new_bar = Arc::clone(&self.on_new_bar);
321 let on_bar_update = Arc::clone(&self.on_bar_update);
322 let on_error = Arc::clone(&self.on_error);
323 let running = Arc::clone(&self.running);
324
325 let dm_for_callback = Arc::clone(&dm_clone);
327 dm_clone.on_data(move || {
328 let dm = Arc::clone(&dm_for_callback);
329 let options = options.clone();
330 let last_ids = Arc::clone(&last_ids);
331 let last_left_id = Arc::clone(&last_left_id);
332 let last_right_id = Arc::clone(&last_right_id);
333 let chart_ready = Arc::clone(&chart_ready);
334 let on_update = Arc::clone(&on_update);
335 let on_new_bar = Arc::clone(&on_new_bar);
336 let on_bar_update = Arc::clone(&on_bar_update);
337 let on_error = Arc::clone(&on_error);
338 let running = Arc::clone(&running);
339
340
341 tokio::spawn(async move {
342 let is_running = *running.read().await;
343 if !is_running {
344 return;
345 }
346
347 let chart_id = &options.chart_id;
348 if dm.get_by_path(&["charts", chart_id]).is_none() {
349 return;
350 }
351
352 match process_series_update(
354 &dm,
355 &options,
356 &last_ids,
357 &last_left_id,
358 &last_right_id,
359 &chart_ready,
360 )
361 .await
362 {
363 Ok((series_data, update_info)) => {
364 let series_data = Arc::new(series_data);
366 let update_info = Arc::new(update_info);
367
368 if let Some(callback) = on_update.read().await.as_ref() {
370 let cb = Arc::clone(callback);
371 let sd = Arc::clone(&series_data);
372 let ui = Arc::clone(&update_info);
373 tokio::spawn(async move {
374 cb(sd, ui);
375 });
376 }
377
378 if update_info.has_new_bar {
379 if let Some(callback) = on_new_bar.read().await.as_ref() {
380 let cb = Arc::clone(callback);
381 let sd = Arc::clone(&series_data);
382 tokio::spawn(async move {
383 cb(sd);
384 });
385 }
386 }
387
388 if update_info.has_bar_update {
389 if let Some(callback) = on_bar_update.read().await.as_ref() {
390 let cb = Arc::clone(callback);
391 let sd = Arc::clone(&series_data);
392 tokio::spawn(async move {
393 cb(sd);
394 });
395 }
396 }
397 }
398 Err(e) => {
399 warn!("处理 Series 更新失败: {}", e);
400 if let Some(callback) = on_error.read().await.as_ref() {
401 let cb = Arc::clone(callback);
402 let err_msg = Arc::new(e.to_string());
403 tokio::spawn(async move {
404 cb(err_msg);
405 });
406 }
407 }
408 }
409
410 });
411 });
412 }
413
414 pub async fn on_update<F>(&self, handler: F)
416 where
417 F: Fn(Arc<SeriesData>, Arc<UpdateInfo>) + Send + Sync + 'static,
418 {
419 let mut guard = self.on_update.write().await;
420 *guard = Some(Arc::new(handler));
421 }
422
423 pub async fn on_new_bar<F>(&self, handler: F)
425 where
426 F: Fn(Arc<SeriesData>) + Send + Sync + 'static,
427 {
428 let mut guard = self.on_new_bar.write().await;
429 *guard = Some(Arc::new(handler));
430 }
431
432 pub async fn on_bar_update<F>(&self, handler: F)
434 where
435 F: Fn(Arc<SeriesData>) + Send + Sync + 'static,
436 {
437 let mut guard = self.on_bar_update.write().await;
438 *guard = Some(Arc::new(handler));
439 }
440
441 pub async fn on_error<F>(&self, handler: F)
443 where
444 F: Fn(Arc<String>) + Send + Sync + 'static,
445 {
446 let mut guard = self.on_error.write().await;
447 *guard = Some(Arc::new(handler));
448 }
449
450 pub async fn data_stream(&self) -> impl Stream<Item = Arc<SeriesData>> {
452 let (tx, mut rx) = tokio::sync::mpsc::unbounded_channel();
453
454 self.on_update(move |data, _info| {
456 let _ = tx.send(data);
457 })
458 .await;
459
460 stream! {
461 while let Some(data) = rx.recv().await {
462 yield data;
463 }
464 }
465 }
466
467 pub async fn close(&self) -> Result<()> {
469 let mut running = self.running.write().await;
470 if !*running {
471 return Ok(());
472 }
473 *running = false;
474
475 info!("关闭 Series 订阅: {}", self.options.chart_id);
476
477 let cancel_req = serde_json::json!({
479 "aid": "set_chart",
480 "chart_id": self.options.chart_id,
481 "ins_list": "",
482 "duration": self.options.duration,
483 "view_width": 0
484 });
485
486 self.ws.send(&cancel_req).await?;
487 Ok(())
488 }
489}
490
491async fn process_series_update(
493 dm: &DataManager,
494 options: &SeriesOptions,
495 last_ids: &Arc<RwLock<HashMap<String, i64>>>,
496 last_left_id: &Arc<RwLock<i64>>,
497 last_right_id: &Arc<RwLock<i64>>,
498 chart_ready: &Arc<RwLock<bool>>,
499) -> Result<(SeriesData, UpdateInfo)> {
500
501 let is_multi = options.symbols.len() > 1;
502 let is_tick = options.duration == 0;
503
504 let duration_str = options.duration.to_string();
507 let (data_path, chart_path): (Vec<&str>, Vec<&str>) = if is_tick {
509 let data_path = vec!["ticks", &options.symbols[0]];
510 let chart_path = vec!["charts", &options.chart_id];
511 (data_path, chart_path)
512 } else {
513 let data_path = vec!["klines", &options.symbols[0], &duration_str];
514 let chart_path = vec!["charts", &options.chart_id];
515 (data_path, chart_path)
516 };
517 let has_chart_changed = dm.is_changing(&chart_path);
518 let has_data_changed = dm.is_changing(&data_path);
519
520 if !has_chart_changed && !has_data_changed {
521 return Err(TqError::Other("数据未更新".to_string()));
522 }
523
524 let series_data: SeriesData = if is_tick {
526 get_tick_data(dm, options).await?
527 } else if is_multi {
528 get_multi_kline_data(dm, options).await?
529 } else {
530 get_single_kline_data(dm, options).await?
531 };
532
533 let mut update_info = UpdateInfo {
535 has_new_bar: false,
536 has_bar_update: false,
537 chart_range_changed: false,
538 has_chart_sync: false,
539 chart_ready: false,
540 new_bar_ids: HashMap::new(),
541 old_left_id: 0,
542 old_right_id: 0,
543 new_left_id: 0,
544 new_right_id: 0,
545 };
546 detect_new_bars(dm, &series_data, last_ids, &mut update_info).await;
548
549
550 detect_chart_range_change(
552 dm,
553 &series_data,
554 last_left_id,
555 last_right_id,
556 chart_ready,
557 &mut update_info,
558 )
559 .await;
560
561 Ok((series_data, update_info))
562}
563
564async fn get_single_kline_data(dm: &DataManager, options: &SeriesOptions) -> Result<SeriesData> {
566 let symbol = &options.symbols[0];
567
568 let mut right_id = -1i64;
570 let chart_info = dm
571 .get_by_path(&["charts", &options.chart_id])
572 .and_then(|chart_data| dm.convert_to_struct::<ChartInfo>(&chart_data).ok())
573 .map(|mut chart| {
574 right_id = chart.right_id;
575 chart.chart_id = options.chart_id.clone();
576 chart.view_width = options.view_width;
577 chart
578 });
579 let mut kline_data = dm.get_klines_data(symbol, options.duration, options.view_width, right_id)?;
580
581 kline_data.chart_id = options.chart_id.clone();
583 kline_data.chart = chart_info;
584
585 Ok(SeriesData {
586 is_multi: false,
587 is_tick: false,
588 symbols: vec![symbol.clone()],
589 single: Some(kline_data),
590 multi: None,
591 tick_data: None,
592 })
593}
594
595async fn get_multi_kline_data(dm: &DataManager, options: &SeriesOptions) -> Result<SeriesData> {
597 let multi_data = dm.get_multi_klines_data(
598 &options.symbols,
599 options.duration,
600 &options.chart_id,
601 options.view_width,
602 )?;
603
604 Ok(SeriesData {
605 is_multi: true,
606 is_tick: false,
607 symbols: options.symbols.clone(),
608 single: None,
609 multi: Some(multi_data),
610 tick_data: None,
611 })
612}
613
614async fn get_tick_data(dm: &DataManager, options: &SeriesOptions) -> Result<SeriesData> {
616 let symbol = &options.symbols[0];
617
618 let mut right_id = -1i64;
620 let chart_info = dm
621 .get_by_path(&["charts", &options.chart_id])
622 .and_then(|chart_data| dm.convert_to_struct::<ChartInfo>(&chart_data).ok())
623 .map(|mut chart| {
624 right_id = chart.right_id;
625 chart.chart_id = options.chart_id.clone();
626 chart.view_width = options.view_width;
627 chart
628 });
629
630 let mut tick_data = dm.get_ticks_data(symbol, options.view_width, right_id)?;
631
632 tick_data.chart_id = options.chart_id.clone();
634 tick_data.chart = chart_info;
635
636 Ok(SeriesData {
637 is_multi: false,
638 is_tick: true,
639 symbols: vec![symbol.clone()],
640 single: None,
641 multi: None,
642 tick_data: Some(tick_data),
643 })
644}
645
646async fn detect_new_bars(
648 dm: &DataManager,
649 data: &SeriesData,
650 last_ids: &Arc<RwLock<HashMap<String, i64>>>,
651 info: &mut UpdateInfo,
652) {
653 let mut ids = last_ids.write().await;
654
655 let duration_str = if data.is_tick {
657 String::new()
658 } else if data.is_multi {
659 data.multi.as_ref().map(|m| m.duration.to_string()).unwrap_or_default()
660 } else {
661 data.single.as_ref().map(|s| s.duration.to_string()).unwrap_or_default()
662 };
663
664 for symbol in &data.symbols {
665 let current_id = if data.is_tick {
666 data.tick_data.as_ref().map(|t| t.last_id).unwrap_or(-1)
667 } else if data.is_multi {
668 data.multi
669 .as_ref()
670 .and_then(|m| m.metadata.get(symbol))
671 .map(|meta| meta.last_id)
672 .unwrap_or(-1)
673 } else {
674 data.single.as_ref().map(|s| s.last_id).unwrap_or(-1)
675 };
676
677 let last_id = ids.get(symbol).copied().unwrap_or(-1);
678 if current_id > last_id {
679 info.has_new_bar = true;
680 info.has_bar_update = true;
681 info.new_bar_ids.insert(symbol.clone(), current_id);
682 }
683
684 ids.insert(symbol.clone(), current_id);
685 }
686 if !info.has_new_bar && !data.is_tick {
687 for symbol in &data.symbols {
688 if dm.is_changing(&["klines", symbol, &duration_str]) {
689 info.has_bar_update = true;
690 break;
691 }
692 }
693 }
694}
695
696async fn detect_chart_range_change(
698 dm: &DataManager,
699 data: &SeriesData,
700 last_left_id: &Arc<RwLock<i64>>,
701 last_right_id: &Arc<RwLock<i64>>,
702 chart_ready: &Arc<RwLock<bool>>,
703 info: &mut UpdateInfo,
704) {
705 let chart: Option<&ChartInfo> = if let Some(single) = &data.single {
706 single.chart.as_ref()
707 } else if let Some(tick) = &data.tick_data {
708 tick.chart.as_ref()
709 } else {
710 None
711 };
712
713 let multi_chart= if let Some(multi) = &data.multi {
714 if let Some(chart_data) = dm.get_by_path(&["charts", &multi.chart_id]) {
715 dm.convert_to_struct::<ChartInfo>(&chart_data).ok().map(|mut c| {
716 c.chart_id = multi.chart_id.clone();
717 c.view_width = multi.view_width;
718 c
719 })
720 } else {
721 None
722 }
723 } else {
724 None
725 };
726
727 if let Some(chart) = &chart.or(multi_chart.as_ref()) {
728 let mut last_left = last_left_id.write().await;
729 let mut last_right = last_right_id.write().await;
730 let mut ready = chart_ready.write().await;
731
732 if chart.left_id != *last_left || chart.right_id != *last_right {
733 if *last_left != -1 || *last_right != -1 {
734 info.chart_range_changed = true;
735 info.old_left_id = *last_left;
736 info.old_right_id = *last_right;
737 info.new_left_id = chart.left_id;
738 info.new_right_id = chart.right_id;
739 }
740 *last_left = chart.left_id;
741 *last_right = chart.right_id;
742 }
743
744 if chart.ready && !*ready {
745 info.has_chart_sync = true;
746 *ready = true;
747 }
748
749 if chart.ready && !chart.more_data {
750 info.chart_ready = true;
751 }
752 }
753}