tqsdk_rs/
series.rs

//! Series API 模块
//!
//! 实现 K线和 Tick 订阅功能
4
5use crate::datamanager::DataManager;
6use crate::errors::{Result, TqError};
7use crate::types::{ChartInfo, SeriesData, UpdateInfo};
8use crate::websocket::TqQuoteWebsocket;
9use async_stream::stream;
10use chrono::{DateTime, Utc};
11use futures::Stream;
12use std::collections::HashMap;
13use std::sync::Arc;
14use std::time::Duration as StdDuration;
15use tokio::sync::RwLock;
16use tracing::{debug, info, trace, warn};
17use uuid::Uuid;
18
19use crate::auth::Authenticator;
20
21/// Series API
22pub struct SeriesAPI {
23    dm: Arc<DataManager>,
24    ws: Arc<TqQuoteWebsocket>,
25    auth: Arc<RwLock<dyn Authenticator>>,
26    subscriptions: Arc<RwLock<HashMap<String, Arc<SeriesSubscription>>>>,
27}
28
29impl SeriesAPI {
30    /// 创建 Series API
31    pub fn new(
32        dm: Arc<DataManager>,
33        ws: Arc<TqQuoteWebsocket>,
34        auth: Arc<RwLock<dyn Authenticator>>,
35    ) -> Self {
36        SeriesAPI {
37            dm,
38            ws,
39            auth,
40            subscriptions: Arc::new(RwLock::new(HashMap::new())),
41        }
42    }
43
44    /// 订阅单合约 K线
45    pub async fn kline(
46        &self,
47        symbol: &str,
48        duration: StdDuration,
49        view_width: usize,
50    ) -> Result<Arc<SeriesSubscription>> {
51        self.subscribe(SeriesOptions {
52            symbols: vec![symbol.to_string()],
53            duration: duration.as_nanos() as i64,
54            view_width,
55            chart_id: String::new(),
56            left_kline_id: None,
57            focus_datetime: None,
58            focus_position: None,
59        })
60        .await
61    }
62
63    /// 订阅多合约 K线(对齐)
64    pub async fn kline_multi(
65        &self,
66        symbols: &[String],
67        duration: StdDuration,
68        view_width: usize,
69    ) -> Result<Arc<SeriesSubscription>> {
70        if symbols.is_empty() {
71            return Err(TqError::InvalidParameter("symbols 为空".to_string()));
72        }
73
74        self.subscribe(SeriesOptions {
75            symbols: symbols.to_vec(),
76            duration: duration.as_nanos() as i64,
77            view_width,
78            chart_id: String::new(),
79            left_kline_id: None,
80            focus_datetime: None,
81            focus_position: None,
82        })
83        .await
84    }
85
86    /// 订阅 Tick 数据
87    pub async fn tick(&self, symbol: &str, view_width: usize) -> Result<Arc<SeriesSubscription>> {
88        self.subscribe(SeriesOptions {
89            symbols: vec![symbol.to_string()],
90            duration: 0, // 0 表示 Tick
91            view_width,
92            chart_id: String::new(),
93            left_kline_id: None,
94            focus_datetime: None,
95            focus_position: None,
96        })
97        .await
98    }
99
100    /// 订阅历史 K线(使用 left_kline_id)
101    pub async fn kline_history(
102        &self,
103        symbol: &str,
104        duration: StdDuration,
105        view_width: usize,
106        left_kline_id: i64,
107    ) -> Result<Arc<SeriesSubscription>> {
108        self.subscribe(SeriesOptions {
109            symbols: vec![symbol.to_string()],
110            duration: duration.as_nanos() as i64,
111            view_width,
112            chart_id: String::new(),
113            left_kline_id: Some(left_kline_id),
114            focus_datetime: None,
115            focus_position: None,
116        })
117        .await
118    }
119
120    /// 订阅历史 K线(使用 focus_datetime)
121    pub async fn kline_history_with_focus(
122        &self,
123        symbol: &str,
124        duration: StdDuration,
125        view_width: usize,
126        focus_time: DateTime<Utc>,
127        focus_position: i32,
128    ) -> Result<Arc<SeriesSubscription>> {
129        self.subscribe(SeriesOptions {
130            symbols: vec![symbol.to_string()],
131            duration: duration.as_nanos() as i64,
132            view_width,
133            chart_id: String::new(),
134            left_kline_id: None,
135            focus_datetime: Some(focus_time),
136            focus_position: Some(focus_position),
137        })
138        .await
139    }
140
141    /// 通用订阅方法
142    async fn subscribe(&self, mut options: SeriesOptions) -> Result<Arc<SeriesSubscription>> {
143        if options.symbols.is_empty() {
144            return Err(TqError::InvalidParameter("symbols 为空".to_string()));
145        }
146        {
147            // 权限校验
148            let auth = self.auth.read().await;
149            let symbol_refs: Vec<&str> = options.symbols.iter().map(|s| s.as_str()).collect();
150            auth.has_md_grants(&symbol_refs)?;
151        }
152
153        // 生成 chart_id
154        if options.chart_id.is_empty() {
155            options.chart_id = generate_chart_id(&options);
156        }
157
158        // 检查是否已存在
159        {
160            let subs = self.subscriptions.read().await;
161            if let Some(sub) = subs.get(&options.chart_id) {
162                return Ok(Arc::clone(sub));
163            }
164        }
165
166        // 创建新订阅
167        let sub = Arc::new(SeriesSubscription::new(
168            Arc::clone(&self.dm),
169            Arc::clone(&self.ws),
170            options,
171        )?);
172
173        // // 先启动监听(注册数据更新回调)
174        // sub.start().await?;
175
176        // // 再发送 set_chart 请求(避免错过初始数据)
177        // sub.send_set_chart().await?;
178
179        // 保存订阅
180        let mut subs = self.subscriptions.write().await;
181        subs.insert(sub.options.chart_id.clone(), Arc::clone(&sub));
182
183        Ok(sub)
184    }
185}
186
187/// Series 订阅选项
188#[derive(Debug, Clone)]
189pub struct SeriesOptions {
190    pub symbols: Vec<String>,
191    pub duration: i64,
192    pub view_width: usize,
193    pub chart_id: String,
194    pub left_kline_id: Option<i64>,
195    pub focus_datetime: Option<DateTime<Utc>>,
196    pub focus_position: Option<i32>,
197}
198
199/// 生成 chart_id
200fn generate_chart_id(options: &SeriesOptions) -> String {
201    let uid = Uuid::new_v4();
202    if options.duration == 0 {
203        format!("TQRS_tick_{}", uid)
204    } else {
205        format!("TQRS_kline_{}", uid)
206    }
207}
208
209/// Series 订阅
210pub struct SeriesSubscription {
211    dm: Arc<DataManager>,
212    ws: Arc<TqQuoteWebsocket>,
213    options: SeriesOptions,
214
215    // 状态跟踪
216    last_ids: Arc<RwLock<HashMap<String, i64>>>,
217    last_left_id: Arc<RwLock<i64>>,
218    last_right_id: Arc<RwLock<i64>>,
219    chart_ready: Arc<RwLock<bool>>,
220    has_chart_sync: Arc<RwLock<bool>>,
221
222    // 回调(使用 Arc 避免数据克隆)
223    on_update: Arc<RwLock<Option<Arc<dyn Fn(Arc<SeriesData>, Arc<UpdateInfo>) + Send + Sync>>>>,
224    on_new_bar: Arc<RwLock<Option<Arc<dyn Fn(Arc<SeriesData>) + Send + Sync>>>>,
225    on_bar_update: Arc<RwLock<Option<Arc<dyn Fn(Arc<SeriesData>) + Send + Sync>>>>,
226    on_error: Arc<RwLock<Option<Arc<dyn Fn(Arc<String>) + Send + Sync>>>>,
227
228    running: Arc<RwLock<bool>>,
229}
230
231impl SeriesSubscription {
232    /// 创建订阅
233    fn new(
234        dm: Arc<DataManager>,
235        ws: Arc<TqQuoteWebsocket>,
236        options: SeriesOptions,
237    ) -> Result<Self> {
238        let mut last_ids = HashMap::new();
239        for symbol in &options.symbols {
240            last_ids.insert(symbol.clone(), -1);
241        }
242
243        Ok(SeriesSubscription {
244            dm,
245            ws,
246            options,
247            last_ids: Arc::new(RwLock::new(last_ids)),
248            last_left_id: Arc::new(RwLock::new(-1)),
249            last_right_id: Arc::new(RwLock::new(-1)),
250            chart_ready: Arc::new(RwLock::new(false)),
251            has_chart_sync: Arc::new(RwLock::new(false)),
252            on_update: Arc::new(RwLock::new(None)),
253            on_new_bar: Arc::new(RwLock::new(None)),
254            on_bar_update: Arc::new(RwLock::new(None)),
255            on_error: Arc::new(RwLock::new(None)),
256            running: Arc::new(RwLock::new(false)),
257        })
258    }
259
260    /// 发送 set_chart 请求
261    async fn send_set_chart(&self) -> Result<()> {
262        let view_width = if self.options.view_width > 10000 {
263            warn!("ViewWidth 超过最大限制,调整为 10000");
264            10000
265        } else {
266            self.options.view_width
267        };
268
269        let mut chart_req = serde_json::json!({
270            "aid": "set_chart",
271            "chart_id": self.options.chart_id,
272            "ins_list": self.options.symbols.join(","),
273            "duration": self.options.duration,
274            "view_width": view_width
275        });
276
277        // 添加历史数据参数
278        if let Some(left_kline_id) = self.options.left_kline_id {
279            chart_req["left_kline_id"] = serde_json::json!(left_kline_id);
280        } else if let (Some(focus_datetime), Some(focus_position)) =
281            (self.options.focus_datetime, self.options.focus_position)
282        {
283            chart_req["focus_datetime"] = serde_json::json!(focus_datetime.timestamp_nanos_opt());
284            chart_req["focus_position"] = serde_json::json!(focus_position);
285        }
286
287        debug!(
288            "发送 set_chart 请求: chart_id={}, symbols={:?}, view_width={}",
289            self.options.chart_id, self.options.symbols, view_width
290        );
291
292        self.ws.send(&chart_req).await?;
293        Ok(())
294    }
295
296    /// 启动监听
297    pub async fn start(&self) -> Result<()> {
298        let mut running = self.running.write().await;
299        if *running {
300            return Ok(());
301        }
302        *running = true;
303        drop(running);
304
305        info!("启动 Series 订阅: {}", self.options.chart_id);
306
307        self.start_watching().await;
308        self.send_set_chart().await?;
309        trace!("send_set_chart done");
310        Ok(())
311    }
312
313    /// 启动监听数据更新
314    async fn start_watching(&self) {
315        let dm_clone = Arc::clone(&self.dm);
316        let options = self.options.clone();
317        let last_ids = Arc::clone(&self.last_ids);
318        let last_left_id = Arc::clone(&self.last_left_id);
319        let last_right_id = Arc::clone(&self.last_right_id);
320        let chart_ready = Arc::clone(&self.chart_ready);
321        let has_chart_sync = Arc::clone(&self.has_chart_sync);
322
323        let on_update = Arc::clone(&self.on_update);
324        let on_new_bar = Arc::clone(&self.on_new_bar);
325        let on_bar_update = Arc::clone(&self.on_bar_update);
326        let on_error = Arc::clone(&self.on_error);
327        let running = Arc::clone(&self.running);
328
329        // 注册数据更新回调
330        let dm_for_callback = Arc::clone(&dm_clone);
331        dm_clone.on_data(move || {
332            let dm = Arc::clone(&dm_for_callback);
333            let options = options.clone();
334            let last_ids = Arc::clone(&last_ids);
335            let last_left_id = Arc::clone(&last_left_id);
336            let last_right_id = Arc::clone(&last_right_id);
337            let chart_ready = Arc::clone(&chart_ready);
338            let has_chart_sync = Arc::clone(&has_chart_sync);
339            let on_update = Arc::clone(&on_update);
340            let on_new_bar = Arc::clone(&on_new_bar);
341            let on_bar_update = Arc::clone(&on_bar_update);
342            let on_error = Arc::clone(&on_error);
343            let running = Arc::clone(&running);
344
345            tokio::spawn(async move {
346                let is_running = *running.read().await;
347                if !is_running {
348                    return;
349                }
350
351                let chart_id = &options.chart_id;
352                if dm.get_by_path(&["charts", chart_id]).is_none() {
353                    return;
354                }
355
356                // 处理更新
357                match process_series_update(
358                    &dm,
359                    &options,
360                    &last_ids,
361                    &last_left_id,
362                    &last_right_id,
363                    &chart_ready,
364                    &has_chart_sync,
365                )
366                .await
367                {
368                    Ok((series_data, update_info)) => {
369                        // 包装为 Arc(零拷贝共享)
370                        let series_data = Arc::new(series_data);
371                        let update_info = Arc::new(update_info);
372
373                        // 调用回调
374                        if update_info.has_chart_sync {
375                            if update_info.chart_ready {
376                                if let Some(callback) = on_update.read().await.as_ref() {
377                                    let cb = Arc::clone(callback);
378                                    let sd = Arc::clone(&series_data);
379                                    let ui = Arc::clone(&update_info);
380                                    tokio::spawn(async move {
381                                        cb(sd, ui);
382                                    });
383                                }
384                            }
385
386                            if update_info.has_new_bar && update_info.chart_ready {
387                                if let Some(callback) = on_new_bar.read().await.as_ref() {
388                                    let cb = Arc::clone(callback);
389                                    let sd = Arc::clone(&series_data);
390                                    tokio::spawn(async move {
391                                        cb(sd);
392                                    });
393                                }
394                            }
395
396                            if update_info.has_bar_update && update_info.chart_ready {
397                                if let Some(callback) = on_bar_update.read().await.as_ref() {
398                                    let cb = Arc::clone(callback);
399                                    let sd = Arc::clone(&series_data);
400                                    tokio::spawn(async move {
401                                        cb(sd);
402                                    });
403                                }
404                            }
405                        }
406                    }
407                    Err(e) => {
408                        warn!("处理 Series 更新失败: {}", e);
409                        if let Some(callback) = on_error.read().await.as_ref() {
410                            let cb = Arc::clone(callback);
411                            let err_msg = Arc::new(e.to_string());
412                            tokio::spawn(async move {
413                                cb(err_msg);
414                            });
415                        }
416                    }
417                }
418            });
419        });
420    }
421
422    /// 注册更新回调
423    pub async fn on_update<F>(&self, handler: F)
424    where
425        F: Fn(Arc<SeriesData>, Arc<UpdateInfo>) + Send + Sync + 'static,
426    {
427        let mut guard = self.on_update.write().await;
428        *guard = Some(Arc::new(handler));
429    }
430
431    /// 注册新 K线回调
432    pub async fn on_new_bar<F>(&self, handler: F)
433    where
434        F: Fn(Arc<SeriesData>) + Send + Sync + 'static,
435    {
436        let mut guard = self.on_new_bar.write().await;
437        *guard = Some(Arc::new(handler));
438    }
439
440    /// 注册 K线更新回调
441    pub async fn on_bar_update<F>(&self, handler: F)
442    where
443        F: Fn(Arc<SeriesData>) + Send + Sync + 'static,
444    {
445        let mut guard = self.on_bar_update.write().await;
446        *guard = Some(Arc::new(handler));
447    }
448
449    /// 注册错误回调
450    pub async fn on_error<F>(&self, handler: F)
451    where
452        F: Fn(Arc<String>) + Send + Sync + 'static,
453    {
454        let mut guard = self.on_error.write().await;
455        *guard = Some(Arc::new(handler));
456    }
457
458    /// 获取数据流
459    pub async fn data_stream(&self) -> impl Stream<Item = Arc<SeriesData>> {
460        let (tx, mut rx) = tokio::sync::mpsc::unbounded_channel();
461
462        // 注册回调
463        self.on_update(move |data, _info| {
464            let _ = tx.send(data);
465        })
466        .await;
467
468        stream! {
469            while let Some(data) = rx.recv().await {
470                yield data;
471            }
472        }
473    }
474
475    /// 关闭订阅
476    pub async fn close(&self) -> Result<()> {
477        let mut running = self.running.write().await;
478        if !*running {
479            return Ok(());
480        }
481        *running = false;
482
483        info!("关闭 Series 订阅: {}", self.options.chart_id);
484
485        // 发送取消请求
486        let cancel_req = serde_json::json!({
487            "aid": "set_chart",
488            "chart_id": self.options.chart_id,
489            "ins_list": "",
490            "duration": self.options.duration,
491            "view_width": 0
492        });
493
494        self.ws.send(&cancel_req).await?;
495        Ok(())
496    }
497}
498
499/// 处理 Series 更新
500async fn process_series_update(
501    dm: &DataManager,
502    options: &SeriesOptions,
503    last_ids: &Arc<RwLock<HashMap<String, i64>>>,
504    last_left_id: &Arc<RwLock<i64>>,
505    last_right_id: &Arc<RwLock<i64>>,
506    chart_ready: &Arc<RwLock<bool>>,
507    has_chart_sync: &Arc<RwLock<bool>>,
508) -> Result<(SeriesData, UpdateInfo)> {
509    let is_multi = options.symbols.len() > 1;
510    let is_tick = options.duration == 0;
511
512    // let has_chart_changed = dm.is_changing(&["charts", &options.chart_id]);
513    // let has_data_changed = e
514    let duration_str = options.duration.to_string();
515    // 构建数据路径和图表路径
516    let (data_path, chart_path): (Vec<&str>, Vec<&str>) = if is_tick {
517        let data_path = vec!["ticks", &options.symbols[0]];
518        let chart_path = vec!["charts", &options.chart_id];
519        (data_path, chart_path)
520    } else {
521        let data_path = vec!["klines", &options.symbols[0], &duration_str];
522        let chart_path = vec!["charts", &options.chart_id];
523        (data_path, chart_path)
524    };
525    let has_chart_changed = dm.is_changing(&chart_path);
526    let has_data_changed = dm.is_changing(&data_path);
527
528    if !has_chart_changed && !has_data_changed {
529        return Err(TqError::Other("数据未更新".to_string()));
530    }
531
532    // 获取数据
533    let series_data: SeriesData = if is_tick {
534        get_tick_data(dm, options).await?
535    } else if is_multi {
536        get_multi_kline_data(dm, options).await?
537    } else {
538        get_single_kline_data(dm, options).await?
539    };
540
541    // 检测更新信息
542    let mut update_info = UpdateInfo {
543        has_new_bar: false,
544        has_bar_update: false,
545        chart_range_changed: false,
546        has_chart_sync: false,
547        chart_ready: false,
548        new_bar_ids: HashMap::new(),
549        old_left_id: 0,
550        old_right_id: 0,
551        new_left_id: 0,
552        new_right_id: 0,
553    };
554    // 检测新 K线
555    detect_new_bars(dm, &series_data, last_ids, &mut update_info).await;
556
557    // 检测 Chart 范围变化
558    detect_chart_range_change(
559        dm,
560        &series_data,
561        last_left_id,
562        last_right_id,
563        chart_ready,
564        has_chart_sync,
565        &mut update_info,
566    )
567    .await;
568
569    Ok((series_data, update_info))
570}
571
572/// 获取单合约 K线数据
573async fn get_single_kline_data(dm: &DataManager, options: &SeriesOptions) -> Result<SeriesData> {
574    let symbol = &options.symbols[0];
575
576    // 获取 Chart 信息 - 直接从 JSON 转换为 ChartInfo
577    let mut right_id = -1i64;
578    let chart_info = dm
579        .get_by_path(&["charts", &options.chart_id])
580        .and_then(|chart_data| dm.convert_to_struct::<ChartInfo>(&chart_data).ok())
581        .map(|mut chart| {
582            right_id = chart.right_id;
583            chart.chart_id = options.chart_id.clone();
584            chart.view_width = options.view_width;
585            chart
586        });
587    let mut kline_data =
588        dm.get_klines_data(symbol, options.duration, options.view_width, right_id)?;
589
590    // 设置 Chart 信息
591    kline_data.chart_id = options.chart_id.clone();
592    kline_data.chart = chart_info;
593
594    Ok(SeriesData {
595        is_multi: false,
596        is_tick: false,
597        symbols: vec![symbol.clone()],
598        single: Some(kline_data),
599        multi: None,
600        tick_data: None,
601    })
602}
603
604/// 获取多合约 K线数据
605async fn get_multi_kline_data(dm: &DataManager, options: &SeriesOptions) -> Result<SeriesData> {
606    let multi_data = dm.get_multi_klines_data(
607        &options.symbols,
608        options.duration,
609        &options.chart_id,
610        options.view_width,
611    )?;
612
613    Ok(SeriesData {
614        is_multi: true,
615        is_tick: false,
616        symbols: options.symbols.clone(),
617        single: None,
618        multi: Some(multi_data),
619        tick_data: None,
620    })
621}
622
623/// 获取 Tick 数据
624async fn get_tick_data(dm: &DataManager, options: &SeriesOptions) -> Result<SeriesData> {
625    let symbol = &options.symbols[0];
626
627    // 获取 Chart 信息 - 直接从 JSON 转换为 ChartInfo
628    let mut right_id = -1i64;
629    let chart_info = dm
630        .get_by_path(&["charts", &options.chart_id])
631        .and_then(|chart_data| dm.convert_to_struct::<ChartInfo>(&chart_data).ok())
632        .map(|mut chart| {
633            right_id = chart.right_id;
634            chart.chart_id = options.chart_id.clone();
635            chart.view_width = options.view_width;
636            chart
637        });
638
639    let mut tick_data = dm.get_ticks_data(symbol, options.view_width, right_id)?;
640
641    // 设置 Chart 信息
642    tick_data.chart_id = options.chart_id.clone();
643    tick_data.chart = chart_info;
644
645    Ok(SeriesData {
646        is_multi: false,
647        is_tick: true,
648        symbols: vec![symbol.clone()],
649        single: None,
650        multi: None,
651        tick_data: Some(tick_data),
652    })
653}
654
655/// 检测新 K线
656async fn detect_new_bars(
657    dm: &DataManager,
658    data: &SeriesData,
659    last_ids: &Arc<RwLock<HashMap<String, i64>>>,
660    info: &mut UpdateInfo,
661) {
662    let mut ids = last_ids.write().await;
663
664    // 获取 duration 字符串(用于后续检查 K线更新)
665    let duration_str = if data.is_tick {
666        String::new()
667    } else if data.is_multi {
668        data.multi
669            .as_ref()
670            .map(|m| m.duration.to_string())
671            .unwrap_or_default()
672    } else {
673        data.single
674            .as_ref()
675            .map(|s| s.duration.to_string())
676            .unwrap_or_default()
677    };
678
679    for symbol in &data.symbols {
680        let current_id = if data.is_tick {
681            data.tick_data.as_ref().map(|t| t.last_id).unwrap_or(-1)
682        } else if data.is_multi {
683            data.multi
684                .as_ref()
685                .and_then(|m| m.metadata.get(symbol))
686                .map(|meta| meta.last_id)
687                .unwrap_or(-1)
688        } else {
689            data.single.as_ref().map(|s| s.last_id).unwrap_or(-1)
690        };
691        let last_id = ids.get(symbol).copied().unwrap_or(-1);
692        trace!("current_id = {}, last_id = {}", current_id, last_id);
693        if current_id > last_id {
694            info.has_new_bar = true;
695            info.has_bar_update = true;
696            info.new_bar_ids.insert(symbol.clone(), current_id);
697        }
698        ids.insert(symbol.clone(), current_id);
699    }
700    if !info.has_new_bar && !data.is_tick {
701        for symbol in &data.symbols {
702            if dm.is_changing(&["klines", symbol, &duration_str]) {
703                info.has_bar_update = true;
704                break;
705            }
706        }
707    }
708}
709
710/// 检测 Chart 范围变化
711async fn detect_chart_range_change(
712    dm: &DataManager,
713    data: &SeriesData,
714    last_left_id: &Arc<RwLock<i64>>,
715    last_right_id: &Arc<RwLock<i64>>,
716    chart_ready: &Arc<RwLock<bool>>,
717    has_chart_sync: &Arc<RwLock<bool>>,
718    info: &mut UpdateInfo,
719) {
720    let chart: Option<&ChartInfo> = if let Some(single) = &data.single {
721        single.chart.as_ref()
722    } else if let Some(tick) = &data.tick_data {
723        tick.chart.as_ref()
724    } else {
725        None
726    };
727
728    let multi_chart = if let Some(multi) = &data.multi {
729        if let Some(chart_data) = dm.get_by_path(&["charts", &multi.chart_id]) {
730            dm.convert_to_struct::<ChartInfo>(&chart_data)
731                .ok()
732                .map(|mut c| {
733                    c.chart_id = multi.chart_id.clone();
734                    c.view_width = multi.view_width;
735                    c
736                })
737        } else {
738            None
739        }
740    } else {
741        None
742    };
743
744    if let Some(chart) = chart.or(multi_chart.as_ref()) {
745        let mut last_left = last_left_id.write().await;
746        let mut last_right = last_right_id.write().await;
747        let mut ready = chart_ready.write().await;
748        let mut has_sync = has_chart_sync.write().await;
749
750        trace!("before compare -> last_left: {}, last_right: {}, chart.left_id: {}, chart.right_id: {}, ready: {}, chart.ready: {}, has_sync: {}",
751            *last_left, *last_right, chart.left_id, chart.right_id, *ready, chart.ready, *has_sync,
752        );
753        if chart.left_id != *last_left || chart.right_id != *last_right {
754            info.chart_range_changed = true;
755            info.old_left_id = *last_left;
756            info.old_right_id = *last_right;
757            info.new_left_id = chart.left_id;
758            info.new_right_id = chart.right_id;
759
760            *last_left = chart.left_id;
761            *last_right = chart.right_id;
762        }
763
764        if chart.ready && !*ready {
765            // 首次完成同步
766            *ready = true;
767            *has_sync = true;
768
769            info.has_chart_sync = true;
770            info.has_bar_update = true;
771            info.has_new_bar = true;
772        }
773
774        if chart.ready && !chart.more_data {
775            info.chart_ready = true;
776        }
777        trace!("after compare -> last_left: {}, last_right: {}, chart.left_id: {}, chart.right_id: {}, ready: {}, chart.ready: {}, has_sync: {}",
778            *last_left, *last_right, chart.left_id, chart.right_id, *ready, chart.ready,  *has_sync,
779        );
780        info.has_chart_sync = *has_sync
781    }
782}