Skip to main content

sol_parser_sdk/grpc/
client.rs

1//! Yellowstone gRPC 客户端 - 超低延迟 DEX 事件订阅
2//!
3//! 支持多种事件输出模式:
4//! - Unordered: 10-20μs 极低延迟
5//! - MicroBatch: 50-200μs 微批次有序
6//! - StreamingOrdered: 0.1-5ms 流式有序
7//! - Ordered: 1-50ms 完全有序
8
9use super::buffers::{MicroBatchBuffer, SlotBuffer};
10use super::subscribe_builder::{
11    build_subscribe_request, build_subscribe_request_with_event_filter,
12};
13use super::transaction_meta::try_yellowstone_signature;
14use super::types::*;
15use crate::core::{now_micros, EventMetadata}; // 导入高性能时钟
16use crate::instr::read_pubkey_fast;
17use crate::logs::timestamp_to_microseconds;
18use crate::DexEvent;
19use crossbeam_queue::ArrayQueue;
20use futures::{SinkExt, StreamExt};
21use log::error;
22use memchr::memmem;
23use once_cell::sync::Lazy;
24use solana_sdk::pubkey::Pubkey;
25use std::collections::HashMap;
26use std::str::FromStr;
27use std::sync::atomic::{AtomicBool, Ordering};
28use std::sync::Arc;
29use tokio::sync::{mpsc, Mutex};
30use tokio::task::JoinHandle;
31use tokio::time::{Duration, Instant};
32// Note: ClientTlsConfig moved to yellowstone_grpc_client in newer versions
33use yellowstone_grpc_client::{ClientTlsConfig, GeyserGrpcClient};
34use yellowstone_grpc_proto::prelude::*;
35
36static PROGRAM_DATA_FINDER: Lazy<memmem::Finder> =
37    Lazy::new(|| memmem::Finder::new(b"Program data: "));
38
39// ==================== YellowstoneGrpc 客户端 ====================
40
41#[derive(Clone)]
42pub struct YellowstoneGrpc {
43    endpoint: String,
44    token: Option<String>,
45    config: ClientConfig,
46    control_tx: Arc<Mutex<Option<mpsc::Sender<SubscribeRequest>>>>,
47    subscription_handle: Arc<Mutex<Option<JoinHandle<()>>>>,
48    subscription_lifecycle: Arc<Mutex<()>>,
49    stop_signal: Arc<Mutex<Option<Arc<AtomicBool>>>>,
50}
51
52impl YellowstoneGrpc {
53    pub fn new(
54        endpoint: String,
55        token: Option<String>,
56    ) -> Result<Self, Box<dyn std::error::Error>> {
57        crate::warmup::warmup_parser();
58        Ok(Self {
59            endpoint,
60            token,
61            config: ClientConfig::default(),
62            control_tx: Arc::new(Mutex::new(None)),
63            subscription_handle: Arc::new(Mutex::new(None)),
64            subscription_lifecycle: Arc::new(Mutex::new(())),
65            stop_signal: Arc::new(Mutex::new(None)),
66        })
67    }
68
69    pub fn new_with_config(
70        endpoint: String,
71        token: Option<String>,
72        config: ClientConfig,
73    ) -> Result<Self, Box<dyn std::error::Error>> {
74        crate::warmup::warmup_parser();
75        Ok(Self {
76            endpoint,
77            token,
78            config,
79            control_tx: Arc::new(Mutex::new(None)),
80            subscription_handle: Arc::new(Mutex::new(None)),
81            subscription_lifecycle: Arc::new(Mutex::new(())),
82            stop_signal: Arc::new(Mutex::new(None)),
83        })
84    }
85
86    /// 订阅 DEX 事件(自动重连)
87    pub async fn subscribe_dex_events(
88        &self,
89        transaction_filters: Vec<TransactionFilter>,
90        account_filters: Vec<AccountFilter>,
91        event_type_filter: Option<EventTypeFilter>,
92    ) -> Result<Arc<ArrayQueue<DexEvent>>, Box<dyn std::error::Error>> {
93        let _lifecycle = self.subscription_lifecycle.lock().await;
94        self.stop_without_lifecycle_lock().await;
95
96        let queue = Arc::new(ArrayQueue::new(self.config.buffer_size.max(1)));
97        let queue_clone = Arc::clone(&queue);
98        let self_clone = self.clone();
99        let stop_signal = Arc::new(AtomicBool::new(false));
100        *self.stop_signal.lock().await = Some(Arc::clone(&stop_signal));
101
102        let handle = tokio::spawn(async move {
103            let mut delay = 1u64;
104            loop {
105                if stop_signal.load(Ordering::SeqCst) {
106                    break;
107                }
108
109                match self_clone
110                    .stream_events(
111                        &transaction_filters,
112                        &account_filters,
113                        &event_type_filter,
114                        &queue_clone,
115                    )
116                    .await
117                {
118                    Ok(_) => delay = 1,
119                    Err(e) => {
120                        if stop_signal.load(Ordering::SeqCst) {
121                            break;
122                        }
123                        error!("Grpc error: {} - retry in {}s", e, delay);
124                    }
125                }
126
127                if stop_signal.load(Ordering::SeqCst) {
128                    break;
129                }
130                tokio::time::sleep(Duration::from_secs(delay)).await;
131                delay = (delay * 2).min(60);
132            }
133        });
134
135        *self.subscription_handle.lock().await = Some(handle);
136        Ok(queue)
137    }
138
139    /// 动态更新订阅过滤器
140    pub async fn update_subscription(
141        &self,
142        transaction_filters: Vec<TransactionFilter>,
143        account_filters: Vec<AccountFilter>,
144    ) -> Result<(), Box<dyn std::error::Error>> {
145        let sender = self.control_tx.lock().await.as_ref().ok_or("No active subscription")?.clone();
146
147        let request = build_subscribe_request(&transaction_filters, &account_filters);
148        sender.send(request).await.map_err(|e| e.to_string())?;
149        Ok(())
150    }
151
152    pub async fn stop(&self) {
153        let _lifecycle = self.subscription_lifecycle.lock().await;
154        self.stop_without_lifecycle_lock().await;
155    }
156
157    async fn stop_without_lifecycle_lock(&self) {
158        if let Some(stop_signal) = self.stop_signal.lock().await.take() {
159            stop_signal.store(true, Ordering::SeqCst);
160        }
161        self.control_tx.lock().await.take();
162        let handle = self.subscription_handle.lock().await.take();
163        if let Some(handle) = handle {
164            handle.abort();
165            let _ = handle.await;
166        }
167    }
168
169    // ==================== 核心事件流处理 ====================
170
171    async fn stream_events(
172        &self,
173        tx_filters: &[TransactionFilter],
174        acc_filters: &[AccountFilter],
175        event_filter: &Option<EventTypeFilter>,
176        queue: &Arc<ArrayQueue<DexEvent>>,
177    ) -> Result<(), String> {
178        let _ = rustls::crypto::ring::default_provider().install_default();
179
180        // 构建客户端
181        let mut builder = GeyserGrpcClient::build_from_shared(self.endpoint.clone())
182            .map_err(|e| e.to_string())?
183            .x_token(self.token.clone())
184            .map_err(|e| e.to_string())?
185            .max_decoding_message_size(1024 * 1024 * 1024);
186
187        if self.config.connection_timeout_ms > 0 {
188            builder =
189                builder.connect_timeout(Duration::from_millis(self.config.connection_timeout_ms));
190        }
191        if self.config.enable_tls {
192            builder = builder
193                .tls_config(ClientTlsConfig::new().with_native_roots())
194                .map_err(|e| e.to_string())?;
195        }
196
197        let mut client = builder.connect().await.map_err(|e| e.to_string())?;
198        let request = build_subscribe_request_with_event_filter(
199            tx_filters,
200            acc_filters,
201            event_filter.as_ref(),
202            CommitmentLevel::Processed,
203        );
204
205        let (subscribe_tx, mut stream) =
206            client.subscribe_with_request(Some(request)).await.map_err(|e| e.to_string())?;
207
208        self.print_mode_info();
209
210        // 设置控制通道
211        let (control_tx, mut control_rx) = mpsc::channel::<SubscribeRequest>(100);
212        *self.control_tx.lock().await = Some(control_tx);
213        let subscribe_tx = Arc::new(Mutex::new(subscribe_tx));
214
215        // 初始化缓冲区
216        let mut slot_buffer = SlotBuffer::new();
217        let mut micro_batch = MicroBatchBuffer::new();
218        let mut last_slot = 0u64;
219
220        let order_mode = self.config.order_mode;
221        let timeout_ms = self.config.order_timeout_ms;
222        let batch_us = self.config.micro_batch_us;
223        let check_interval = Duration::from_millis(timeout_ms / 2);
224        let mut next_check = Instant::now() + check_interval;
225
226        loop {
227            // Periodic timeout check for ordered modes and MicroBatch
228            self.check_timeout(
229                order_mode,
230                &mut slot_buffer,
231                &mut micro_batch,
232                queue,
233                timeout_ms,
234                batch_us,
235                &mut next_check,
236                check_interval,
237            );
238
239            tokio::select! {
240                msg = stream.next() => {
241                    match msg {
242                        Some(Ok(update)) => {
243                            // Geyser 会周期性下发 ping;必须在同一 subscribe 流上回写 SubscribeRequest.ping,否则公共节点 / LB 可能 RST_STREAM。
244                            if matches!(
245                                update.update_oneof.as_ref(),
246                                Some(subscribe_update::UpdateOneof::Ping(_))
247                            ) {
248                                if let Err(e) = subscribe_tx
249                                    .lock()
250                                    .await
251                                    .send(SubscribeRequest {
252                                        ping: Some(SubscribeRequestPing { id: 1 }),
253                                        ..Default::default()
254                                    })
255                                    .await
256                                {
257                                    self.control_tx.lock().await.take();
258                                    return Err(e.to_string());
259                                }
260                                continue;
261                            }
262                            self.handle_update(
263                                update, order_mode, event_filter, queue,
264                                &mut slot_buffer, &mut micro_batch, &mut last_slot, batch_us
265                            );
266                        }
267                        Some(Err(e)) => {
268                            error!("Grpc Stream error: {:?}", e);
269                            self.flush_on_disconnect(order_mode, &mut slot_buffer, queue);
270                            self.control_tx.lock().await.take();
271                            return Err(e.to_string());
272                        }
273                        None => {
274                            self.flush_on_disconnect(order_mode, &mut slot_buffer, queue);
275                            self.control_tx.lock().await.take();
276                            return Ok(());
277                        }
278                    }
279                }
280                Some(req) = control_rx.recv() => {
281                    if let Err(e) = subscribe_tx.lock().await.send(req).await {
282                        self.control_tx.lock().await.take();
283                        return Err(e.to_string());
284                    }
285                }
286            }
287        }
288    }
289
290    fn print_mode_info(&self) {
291        match self.config.order_mode {
292            OrderMode::Unordered => println!("✅ Unordered Mode (10-20μs)"),
293            OrderMode::Ordered => {
294                println!("✅ Ordered Mode (timeout={}ms)", self.config.order_timeout_ms)
295            }
296            OrderMode::StreamingOrdered => {
297                println!("✅ StreamingOrdered Mode (timeout={}ms)", self.config.order_timeout_ms)
298            }
299            OrderMode::MicroBatch => {
300                println!("✅ MicroBatch Mode (window={}μs)", self.config.micro_batch_us)
301            }
302        }
303    }
304
305    #[inline]
306    fn check_timeout(
307        &self,
308        mode: OrderMode,
309        slot_buf: &mut SlotBuffer,
310        micro_buf: &mut MicroBatchBuffer,
311        queue: &Arc<ArrayQueue<DexEvent>>,
312        timeout_ms: u64,
313        batch_us: u64,
314        next_check: &mut Instant,
315        interval: Duration,
316    ) {
317        if Instant::now() < *next_check {
318            return;
319        }
320        *next_check = Instant::now() + interval;
321
322        match mode {
323            OrderMode::Ordered => {
324                if slot_buf.should_timeout(timeout_ms) {
325                    for e in slot_buf.flush_all() {
326                        let _ = queue.push(e);
327                    }
328                }
329            }
330            OrderMode::StreamingOrdered => {
331                if slot_buf.should_timeout(timeout_ms) {
332                    for e in slot_buf.flush_streaming_timeout() {
333                        let _ = queue.push(e);
334                    }
335                }
336            }
337            OrderMode::MicroBatch => {
338                // Periodic flush for MicroBatch mode
339                let now_us = get_timestamp_us();
340                if micro_buf.should_flush(now_us, batch_us) {
341                    for e in micro_buf.flush() {
342                        let _ = queue.push(e);
343                    }
344                }
345            }
346            OrderMode::Unordered => {}
347        }
348    }
349
350    fn flush_on_disconnect(
351        &self,
352        mode: OrderMode,
353        buffer: &mut SlotBuffer,
354        queue: &Arc<ArrayQueue<DexEvent>>,
355    ) {
356        if matches!(mode, OrderMode::Ordered | OrderMode::StreamingOrdered) {
357            let events = match mode {
358                OrderMode::StreamingOrdered => buffer.flush_streaming_timeout(),
359                _ => buffer.flush_all(),
360            };
361            for e in events {
362                let _ = queue.push(e);
363            }
364        }
365    }
366
367    #[inline]
368    fn handle_update(
369        &self,
370        update_msg: SubscribeUpdate,
371        mode: OrderMode,
372        filter: &Option<EventTypeFilter>,
373        queue: &Arc<ArrayQueue<DexEvent>>,
374        slot_buf: &mut SlotBuffer,
375        micro_buf: &mut MicroBatchBuffer,
376        last_slot: &mut u64,
377        batch_us: u64,
378    ) {
379        let created_at = update_msg.created_at.unwrap_or_default();
380        let block_time_us = timestamp_to_microseconds(created_at.seconds, created_at.nanos) as i64;
381        let grpc_recv_us = get_timestamp_us();
382
383        let Some(update) = update_msg.update_oneof else { return };
384
385        match update {
386            subscribe_update::UpdateOneof::Transaction(tx) => {
387                self.handle_transaction(
388                    tx,
389                    mode,
390                    filter,
391                    queue,
392                    slot_buf,
393                    micro_buf,
394                    last_slot,
395                    batch_us,
396                    grpc_recv_us,
397                    block_time_us,
398                );
399            }
400            subscribe_update::UpdateOneof::Account(acc) => {
401                Self::handle_account(acc, filter, queue, grpc_recv_us, block_time_us);
402            }
403            subscribe_update::UpdateOneof::BlockMeta(block_meta) => {
404                Self::handle_block_meta(block_meta, filter, queue, grpc_recv_us, block_time_us);
405            }
406            _ => {}
407        }
408    }
409
410    #[inline]
411    fn handle_transaction(
412        &self,
413        tx: SubscribeUpdateTransaction,
414        mode: OrderMode,
415        filter: &Option<EventTypeFilter>,
416        queue: &Arc<ArrayQueue<DexEvent>>,
417        slot_buf: &mut SlotBuffer,
418        micro_buf: &mut MicroBatchBuffer,
419        last_slot: &mut u64,
420        batch_us: u64,
421        grpc_us: i64,
422        block_us: i64,
423    ) {
424        let slot = tx.slot;
425
426        match mode {
427            OrderMode::Unordered => {
428                for e in crate::grpc::parse_subscribe_update_transaction_low_latency(
429                    &tx,
430                    grpc_us,
431                    Some(block_us),
432                    filter.as_ref(),
433                ) {
434                    let _ = queue.push(e);
435                }
436            }
437            OrderMode::Ordered => {
438                if slot > *last_slot && *last_slot > 0 {
439                    for e in slot_buf.flush_before(slot) {
440                        let _ = queue.push(e);
441                    }
442                }
443                *last_slot = slot;
444                for (idx, e) in
445                    parse_transaction_to_vec(&tx, grpc_us, Some(block_us), filter.as_ref())
446                {
447                    slot_buf.push(slot, idx, e);
448                }
449            }
450            OrderMode::StreamingOrdered => {
451                for (idx, e) in
452                    parse_transaction_to_vec(&tx, grpc_us, Some(block_us), filter.as_ref())
453                {
454                    for evt in slot_buf.push_streaming(slot, idx, e) {
455                        let _ = queue.push(evt);
456                    }
457                }
458            }
459            OrderMode::MicroBatch => {
460                for (idx, e) in
461                    parse_transaction_to_vec(&tx, grpc_us, Some(block_us), filter.as_ref())
462                {
463                    if micro_buf.push(slot, idx, e, grpc_us, batch_us) {
464                        for evt in micro_buf.flush() {
465                            let _ = queue.push(evt);
466                        }
467                    }
468                }
469            }
470        }
471    }
472
473    #[inline]
474    fn handle_account(
475        acc: SubscribeUpdateAccount,
476        filter: &Option<EventTypeFilter>,
477        queue: &Arc<ArrayQueue<DexEvent>>,
478        grpc_us: i64,
479        block_us: i64,
480    ) {
481        let Some(info) = acc.account else { return };
482        let data = crate::accounts::AccountData {
483            pubkey: read_pubkey_fast(&info.pubkey),
484            executable: info.executable,
485            lamports: info.lamports,
486            owner: read_pubkey_fast(&info.owner),
487            rent_epoch: info.rent_epoch,
488            data: info.data,
489        };
490        let meta = EventMetadata {
491            signature: Default::default(),
492            slot: acc.slot,
493            tx_index: 0,
494            block_time_us: block_us,
495            grpc_recv_us: grpc_us,
496            recent_blockhash: None,
497        };
498        if let Some(e) = crate::accounts::parse_account_unified(&data, meta, filter.as_ref()) {
499            let _ = queue.push(e);
500        }
501    }
502
503    #[inline]
504    fn handle_block_meta(
505        block_meta: SubscribeUpdateBlockMeta,
506        filter: &Option<EventTypeFilter>,
507        queue: &Arc<ArrayQueue<DexEvent>>,
508        grpc_us: i64,
509        fallback_block_us: i64,
510    ) {
511        let block_time_us = block_meta
512            .block_time
513            .as_ref()
514            .map(|t| t.timestamp.saturating_mul(1_000_000))
515            .unwrap_or(fallback_block_us);
516        let event = DexEvent::BlockMeta(crate::core::events::BlockMetaEvent {
517            metadata: EventMetadata {
518                signature: Default::default(),
519                slot: block_meta.slot,
520                tx_index: 0,
521                block_time_us,
522                grpc_recv_us: grpc_us,
523                recent_blockhash: (!block_meta.blockhash.is_empty())
524                    .then_some(block_meta.blockhash),
525            },
526        });
527        if filter.as_ref().map(|f| f.should_include_dex_event(&event)).unwrap_or(true) {
528            let _ = queue.push(event);
529        }
530    }
531}
532
533// ==================== 辅助函数 ====================
534
535/// 获取当前时间戳(微秒)
536///
537/// 使用高性能时钟,避免系统调用开销
538///
539/// # 性能优势
540/// - 旧实现:使用 libc::clock_gettime,每次调用约 1-2μs
541/// - 新实现:使用高性能时钟,每次调用约 10-50ns
542/// - 性能提升:20-100 倍
543#[inline(always)]
544fn get_timestamp_us() -> i64 {
545    now_micros()
546}
547
548// ==================== 交易解析 ====================
549
550#[inline]
551fn parse_transaction_to_vec(
552    tx: &SubscribeUpdateTransaction,
553    grpc_us: i64,
554    block_us: Option<i64>,
555    filter: Option<&EventTypeFilter>,
556) -> Vec<(u64, DexEvent)> {
557    let idx = tx.transaction.as_ref().map(|t| t.index).unwrap_or(0);
558    parse_transaction_core(tx, grpc_us, block_us, filter).into_iter().map(|e| (idx, e)).collect()
559}
560
561#[inline]
562fn parse_transaction_core(
563    tx: &SubscribeUpdateTransaction,
564    grpc_us: i64,
565    block_us: Option<i64>,
566    filter: Option<&EventTypeFilter>,
567) -> Vec<DexEvent> {
568    let Some(info) = &tx.transaction else { return Vec::new() };
569    let Some(meta) = &info.meta else { return Vec::new() };
570
571    let sig = extract_signature(&info.signature);
572    let slot = tx.slot;
573    let idx = info.index;
574
575    let log_events = parse_logs(
576        meta,
577        &info.transaction,
578        &meta.log_messages,
579        sig,
580        slot,
581        idx,
582        block_us,
583        grpc_us,
584        filter,
585    );
586    let instr_events =
587        parse_instructions(meta, &info.transaction, sig, slot, idx, block_us, grpc_us, filter);
588
589    let events =
590        crate::grpc::log_instr_dedup::dedupe_log_instruction_events(log_events, instr_events);
591    if let Some(filter) = filter {
592        events.into_iter().map(|e| filter.normalize_dex_event(e)).collect()
593    } else {
594        events
595    }
596}
597
598#[inline(always)]
599fn extract_signature(bytes: &[u8]) -> solana_sdk::signature::Signature {
600    try_yellowstone_signature(bytes).expect("yellowstone signature must be 64 bytes")
601}
602
603#[inline]
604fn parse_logs(
605    meta: &TransactionStatusMeta,
606    transaction: &Option<yellowstone_grpc_proto::prelude::Transaction>,
607    logs: &[String],
608    sig: solana_sdk::signature::Signature,
609    slot: u64,
610    tx_idx: u64,
611    block_us: Option<i64>,
612    grpc_us: i64,
613    filter: Option<&EventTypeFilter>,
614) -> Vec<DexEvent> {
615    let recent_blockhash = transaction.as_ref().and_then(|t| t.message.as_ref()).and_then(|m| {
616        if m.recent_blockhash.is_empty() {
617            None
618        } else {
619            Some(m.recent_blockhash.clone())
620        }
621    });
622
623    let needs_pumpfun = filter.map(|f| f.includes_pumpfun()).unwrap_or(true);
624    let has_create = needs_pumpfun && crate::logs::optimized_matcher::detect_pumpfun_create(logs);
625
626    let mut outer_idx: i32 = -1;
627    let mut inner_idx: i32 = -1;
628    let mut invokes: HashMap<Pubkey, Vec<(i32, i32)>> = HashMap::with_capacity(8);
629    let mut active_program_stack: Vec<Pubkey> = Vec::with_capacity(8);
630    let mut result = Vec::with_capacity(4);
631
632    for log in logs {
633        if let Some((pid, depth)) = crate::logs::optimized_matcher::parse_invoke_info(log) {
634            if depth == 1 {
635                inner_idx = -1;
636                outer_idx += 1;
637            } else {
638                inner_idx += 1;
639            }
640            if let Ok(pk) = Pubkey::from_str(pid) {
641                active_program_stack.truncate(depth.saturating_sub(1));
642                active_program_stack.push(pk);
643                invokes.entry(pk).or_default().push((outer_idx, inner_idx));
644            }
645        }
646
647        if PROGRAM_DATA_FINDER.find(log.as_bytes()).is_some() {
648            let current_program = active_program_stack.last();
649            if let Some(mut e) = crate::logs::parse_log_with_program_id(
650                log,
651                sig,
652                slot,
653                tx_idx,
654                block_us,
655                grpc_us,
656                filter,
657                has_create,
658                recent_blockhash.as_deref(),
659                current_program,
660            ) {
661                crate::core::account_dispatcher::fill_accounts_with_owned_keys(
662                    &mut e,
663                    meta,
664                    transaction,
665                    &invokes,
666                );
667                crate::core::common_filler::fill_data(&mut e, meta, transaction, &invokes);
668                result.push(e);
669            }
670        }
671
672        if let Some(pid) = crate::logs::optimized_matcher::parse_program_complete_info(log) {
673            if let Ok(pk) = Pubkey::from_str(pid) {
674                if let Some(pos) = active_program_stack.iter().rposition(|active| *active == pk) {
675                    active_program_stack.truncate(pos);
676                }
677            }
678        }
679    }
680    result
681}
682
683#[inline]
684fn parse_instructions(
685    meta: &TransactionStatusMeta,
686    transaction: &Option<yellowstone_grpc_proto::prelude::Transaction>,
687    sig: solana_sdk::signature::Signature,
688    slot: u64,
689    tx_idx: u64,
690    block_us: Option<i64>,
691    grpc_us: i64,
692    filter: Option<&EventTypeFilter>,
693) -> Vec<DexEvent> {
694    // 使用增强的 instruction 解析器
695    // 支持:
696    // - 主指令解析(8字节 discriminator)
697    // - Inner instruction 解析(16字节 discriminator)
698    // - 自动事件合并(instruction + inner instruction)
699    crate::grpc::instruction_parser::parse_instructions_enhanced(
700        meta,
701        transaction,
702        sig,
703        slot,
704        tx_idx,
705        block_us,
706        grpc_us,
707        filter,
708    )
709}
710
711#[cfg(test)]
712mod tests {
713    use super::*;
714
715    #[tokio::test]
716    async fn stop_clears_subscription_state_and_aborts_handle() {
717        let grpc = YellowstoneGrpc::new("http://127.0.0.1:1".to_string(), None).unwrap();
718        let (tx, _rx) = mpsc::channel::<SubscribeRequest>(1);
719        let handle = tokio::spawn(async {
720            std::future::pending::<()>().await;
721        });
722
723        let stop_signal = Arc::new(AtomicBool::new(false));
724        *grpc.control_tx.lock().await = Some(tx);
725        *grpc.subscription_handle.lock().await = Some(handle);
726        *grpc.stop_signal.lock().await = Some(Arc::clone(&stop_signal));
727
728        grpc.stop().await;
729
730        assert!(stop_signal.load(Ordering::SeqCst));
731        assert!(grpc.stop_signal.lock().await.is_none());
732        assert!(grpc.control_tx.lock().await.is_none());
733        assert!(grpc.subscription_handle.lock().await.is_none());
734    }
735}