1use std::{
2 sync::{Arc, Mutex},
3 time::Duration,
4};
5
6use serde_json;
7use thiserror::Error;
8use titan_types_api::TcpSubscriptionRequest;
9use titan_types_core::Event;
10use tokio::{
11 io::{AsyncBufReadExt, AsyncWriteExt, BufReader},
12 net::TcpStream,
13 sync::{mpsc, watch},
14 task::JoinHandle,
15};
16use tracing::{debug, error, info, warn};
17
18use crate::tcp::{
19 connection_status::ConnectionStatus,
20 reconnection::{self, ReconnectionManager},
21};
22
23use super::connection_status::ConnectionStatusTracker;
24
25#[derive(Debug, Error)]
26pub enum TcpClientError {
27 #[error("io error: {0}")]
28 IOError(#[from] std::io::Error),
29 #[error("serde error: {0}")]
30 SerdeError(#[from] serde_json::Error),
31 #[error("join error: task panicked")]
32 JoinError,
33 #[error("lock error: failed to acquire lock")]
34 LockError,
35}
36
37#[derive(Debug, Clone)]
39pub struct Config {
40 pub max_retries: Option<u32>,
42 pub retry_delay: Duration,
44 pub read_buffer_capacity: usize,
46 pub max_buffer_size: usize,
48 pub ping_interval: Duration,
50 pub pong_timeout: Duration,
52}
53
54impl Default for Config {
55 fn default() -> Self {
56 Self {
57 max_retries: None,
58 retry_delay: Duration::from_secs(1), read_buffer_capacity: 4096, max_buffer_size: 10 * 1024 * 1024, ping_interval: Duration::from_secs(30), pong_timeout: Duration::from_secs(10), }
64 }
65}
66
67struct ShutdownChannel {
69 sender: watch::Sender<()>,
70 receiver: watch::Receiver<()>,
71}
72
73impl ShutdownChannel {
74 fn new() -> Self {
75 let (sender, receiver) = watch::channel(());
76 Self { sender, receiver }
77 }
78
79 fn get_receiver(&self) -> watch::Receiver<()> {
80 self.receiver.clone()
81 }
82
83 fn send(&self) -> Result<(), watch::error::SendError<()>> {
84 self.sender.send(())
85 }
86}
87
88pub struct AsyncTcpClient {
90 shutdown_channel: Arc<Mutex<ShutdownChannel>>,
91 config: Config,
92 status_tracker: ConnectionStatusTracker,
93 worker_task: Mutex<Option<JoinHandle<()>>>,
94}
95
96impl AsyncTcpClient {
97 pub fn new_with_config(config: Config) -> Self {
99 Self {
100 shutdown_channel: Arc::new(Mutex::new(ShutdownChannel::new())),
101 config,
102 status_tracker: ConnectionStatusTracker::new(),
103 worker_task: Mutex::new(None),
104 }
105 }
106
107 pub fn new() -> Self {
110 Self::new_with_config(Config::default())
111 }
112
113 pub fn get_status(&self) -> ConnectionStatus {
115 self.status_tracker.get_status()
116 }
117
118 pub fn create_status_subscriber(&self) -> mpsc::Receiver<ConnectionStatus> {
120 let (tx, rx) = mpsc::channel(100);
121 self.status_tracker.register_listener(tx);
122 rx
123 }
124
125 pub fn has_active_task(&self) -> Result<bool, TcpClientError> {
129 match self.worker_task.lock() {
130 Ok(lock) => Ok(lock.is_some()),
131 Err(_) => {
132 error!("Failed to acquire worker task lock");
133 Err(TcpClientError::LockError)
134 }
135 }
136 }
137
138 pub async fn subscribe(
146 &self,
147 addr: &str,
148 subscription_request: TcpSubscriptionRequest,
149 ) -> Result<mpsc::Receiver<Event>, TcpClientError> {
150 info!("Subscribing to {}", addr);
151
152 let mut worker_lock = self
154 .worker_task
155 .lock()
156 .map_err(|_| TcpClientError::LockError)?;
157
158 if let Some(handle) = worker_lock.take() {
160 info!("Shutting down existing subscription task before starting a new one");
161 self.send_shutdown_signal()?;
163
164 let mut shutdown_guard = self
166 .shutdown_channel
167 .lock()
168 .map_err(|_| TcpClientError::LockError)?;
169 *shutdown_guard = ShutdownChannel::new();
170 drop(shutdown_guard); match tokio::time::timeout(Duration::from_secs(5), handle).await {
174 Ok(join_result) => match join_result {
175 Ok(_) => info!("Successfully joined existing task"),
176 Err(_) => error!("Error joining existing task: it panicked"),
177 },
178 Err(_) => {
179 error!("Timed out waiting for existing task to complete, proceeding with new task anyway");
181 }
182 }
183 }
184
185 let (tx, rx) = mpsc::channel::<Event>(100);
187
188 let reconnect_settings = self.config.clone();
190 let shutdown_receiver = {
191 let guard = self
192 .shutdown_channel
193 .lock()
194 .map_err(|_| TcpClientError::LockError)?;
195 guard.get_receiver()
196 };
197 let addr = addr.to_owned();
198 let subscription_request = subscription_request;
199 let status_tracker = self.status_tracker.clone();
200
201 status_tracker.update_status(ConnectionStatus::Connecting);
203
204 let reconnection_config = reconnection::from_async_reconnect_settings(&reconnect_settings);
206
207 info!("Creating reconnection manager");
208
209 let handle = tokio::spawn(async move {
211 let update_status = |new_status| status_tracker.update_status(new_status);
213
214 let mut reconnection_manager = ReconnectionManager::new(reconnection_config);
216
217 let mut shutdown_rx = shutdown_receiver;
219
220 let ping_interval = reconnect_settings.ping_interval;
222 let pong_timeout = reconnect_settings.pong_timeout;
223 let mut last_pong_time = std::time::Instant::now();
224 let mut ping_timer = tokio::time::interval(ping_interval);
225 ping_timer.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay);
226 let mut awaiting_pong = false;
227
228 loop {
229 if shutdown_rx.has_changed().unwrap_or(false) {
231 info!("Shutdown signal received. Exiting subscription task.");
232 update_status(ConnectionStatus::Disconnected);
233 break;
234 }
235 info!("Attempting to connect to {}", addr);
236 update_status(ConnectionStatus::Connecting);
237
238 match TcpStream::connect(&addr).await {
239 Ok(stream) => {
240 info!("Connected to {}.", addr);
241 update_status(ConnectionStatus::Connected);
242
243 reconnection_manager.reset();
245
246 let (reader, mut writer) = stream.into_split();
247 let mut reader = BufReader::new(reader);
248
249 let ping_interval = reconnect_settings.ping_interval;
251 let pong_timeout = reconnect_settings.pong_timeout;
252 let mut last_pong_time = std::time::Instant::now();
253 let mut ping_timer = tokio::time::interval(ping_interval);
254 ping_timer.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay);
255 let mut awaiting_pong = false;
256
257 match serde_json::to_string(&subscription_request) {
259 Ok(req_json) => {
260 if let Err(e) = writer.write_all(req_json.as_bytes()).await {
261 error!("Error sending subscription request: {}", e);
262 continue;
264 }
265 if let Err(e) = writer.write_all(b"\n").await {
266 error!("Error writing newline: {}", e);
267 continue;
268 }
269 if let Err(e) = writer.flush().await {
270 error!("Error flushing writer: {}", e);
271 continue;
272 }
273 }
274 Err(e) => {
275 error!("Error serializing subscription request: {}", e);
276 update_status(ConnectionStatus::Disconnected);
277 break;
278 }
279 }
280
281 let mut byte_buf =
283 Vec::with_capacity(reconnect_settings.read_buffer_capacity);
284 loop {
286 if byte_buf.capacity() > reconnect_settings.max_buffer_size {
288 error!(
289 "Buffer capacity exceeded maximum allowed size ({}), resetting connection.",
290 reconnect_settings.max_buffer_size
291 );
292 break; }
294
295 tokio::select! {
296 result = reader.read_until(b'\n', &mut byte_buf) => {
297 match result {
298 Ok(0) => {
299 warn!("TCP connection closed by server.");
301 update_status(ConnectionStatus::Reconnecting);
302 break; }
304 Ok(n) if n > 0 => {
305 let message_bytes = byte_buf.trim_ascii_end(); if !message_bytes.is_empty() {
312 if message_bytes == b"PONG" {
314 if awaiting_pong {
315 awaiting_pong = false;
316 last_pong_time = std::time::Instant::now();
317 debug!("Received PONG");
318 }
319 } else {
320 if message_bytes.len() > reconnect_settings.max_buffer_size {
322 error!(
323 "Received message exceeds maximum allowed size ({}), skipping. Message starts with: {:?}",
324 reconnect_settings.max_buffer_size,
325 String::from_utf8_lossy(&message_bytes[..std::cmp::min(message_bytes.len(), 50)]) );
327 } else {
329 match serde_json::from_slice::<Event>(message_bytes) {
331 Ok(event) => {
332 last_pong_time = std::time::Instant::now();
334 awaiting_pong = false; if let Err(e) = tx.send(event).await {
336 error!("Failed to send event to channel: {}", e);
337 update_status(ConnectionStatus::Disconnected); return; }
340 }
341 Err(e) => {
342 error!(
343 "Failed to parse event: {}. Raw data (first 100 bytes): {:?}",
344 e,
345 String::from_utf8_lossy(&message_bytes[..std::cmp::min(message_bytes.len(), 100)])
346 );
347 update_status(ConnectionStatus::Reconnecting);
348 break;
349 }
350 }
351 }
352 }
353 }
354 byte_buf.clear();
356 }
357 Ok(_) => {
358 byte_buf.clear();
360 }
361 Err(e) => {
362 error!("Error reading from TCP socket using read_until: {}", e);
363 update_status(ConnectionStatus::Reconnecting);
364 break; }
366 }
367 }
368 _ = ping_timer.tick() => {
369 if awaiting_pong {
371 let elapsed = last_pong_time.elapsed();
373 if elapsed > pong_timeout {
374 warn!("Pong response timed out after {:?}, considering connection dead", elapsed);
375 update_status(ConnectionStatus::Reconnecting);
376 break;
377 }
378 } else {
379 match writer.write_all(b"PING\n").await {
381 Ok(_) => {
382 if let Err(e) = writer.flush().await {
383 error!("Failed to flush ping: {}", e);
384 update_status(ConnectionStatus::Reconnecting);
385 break;
386 }
387 awaiting_pong = true;
388 }
389 Err(e) => {
390 error!("Failed to send ping: {}", e);
391 update_status(ConnectionStatus::Reconnecting);
392 break;
393 }
394 }
395 }
396 }
397 _ = shutdown_rx.changed() => {
398 info!("Shutdown signal received. Exiting TCP subscription task.");
399 update_status(ConnectionStatus::Disconnected);
400 return;
401 }
402 }
403 }
404 info!("Lost connection. Preparing to reconnect...");
406 update_status(ConnectionStatus::Reconnecting);
407 }
408 Err(e) => {
409 error!("Failed to connect to {}: {}. Will retry...", addr, e);
410 update_status(ConnectionStatus::Reconnecting);
411 }
412 }
413
414 match reconnection_manager.next_delay() {
416 Some(wait_time) => {
417 info!(
418 "Reconnecting in {:?}... (attempt {}/{:?})",
419 wait_time,
420 reconnection_manager.current_attempt(),
421 reconnection_manager.config().max_attempts
422 );
423 tokio::time::sleep(wait_time).await;
424 }
425 None => {
426 error!(
427 "Reached maximum reconnection attempts ({}). Exiting subscription task.",
428 reconnection_manager.config().max_attempts.unwrap_or(0)
429 );
430 update_status(ConnectionStatus::Disconnected);
431 break;
432 }
433 }
434 }
435
436 info!("Exiting TCP subscription task.");
437 update_status(ConnectionStatus::Disconnected);
438 });
439
440 *worker_lock = Some(handle);
442
443 Ok(rx)
444 }
445
446 fn send_shutdown_signal(&self) -> Result<(), TcpClientError> {
448 let channel = match self.shutdown_channel.lock() {
449 Ok(channel) => channel,
450 Err(_) => {
451 error!("Failed to acquire shutdown channel lock");
452 return Err(TcpClientError::LockError);
453 }
454 };
455
456 if let Err(e) = channel.send() {
457 error!("Failed to send shutdown signal: {:?}", e);
458 return Err(TcpClientError::IOError(std::io::Error::new(
459 std::io::ErrorKind::Other,
460 "Failed to send shutdown signal",
461 )));
462 }
463
464 Ok(())
465 }
466
467 pub fn shutdown(&self) {
470 self.status_tracker
472 .update_status(ConnectionStatus::Disconnected);
473
474 if let Err(e) = self.send_shutdown_signal() {
476 error!("Error in shutdown: {:?}", e);
477 }
478 }
479
480 pub async fn shutdown_and_join(&self) -> Result<(), TcpClientError> {
483 self.shutdown();
485
486 self.join().await
488 }
489
490 pub async fn join(&self) -> Result<(), TcpClientError> {
493 let mut worker_lock = self
495 .worker_task
496 .lock()
497 .map_err(|_| TcpClientError::LockError)?;
498
499 if let Some(handle) = worker_lock.take() {
501 match handle.await {
502 Ok(_) => {
503 info!("Successfully joined worker task");
504 Ok(())
505 }
506 Err(_) => {
507 error!("Failed to join worker task due to panic");
508 Err(TcpClientError::JoinError)
509 }
510 }
511 } else {
512 info!("No worker task to join");
514 Ok(())
515 }
516 }
517}
518
519impl Drop for AsyncTcpClient {
520 fn drop(&mut self) {
521 self.shutdown();
523
524 info!("AsyncTcpClient dropped, task will continue running until shutdown completes");
526 }
527}
528
529#[cfg(test)]
530mod tests {
531 use super::*;
532 use std::net::{SocketAddr, TcpListener};
533 use std::sync::Arc;
534 use std::sync::Once;
535 use titan_types_core::EventType;
536 use tokio::io::{AsyncReadExt, AsyncWriteExt};
537 use tokio::net::TcpListener as TokioTcpListener;
538 use tokio::select;
539 use tokio::signal::unix::{signal, SignalKind};
540 use tokio::sync::oneshot;
541 use tokio::task::JoinHandle;
542 use tokio::time::sleep;
543 use tracing_subscriber::{self, EnvFilter};
544
545 static INIT: Once = Once::new();
547
548 fn init_test_logger() {
549 INIT.call_once(|| {
550 let filter =
552 EnvFilter::from_default_env().add_directive("titan_client=trace".parse().unwrap());
553
554 tracing_subscriber::fmt()
555 .with_env_filter(filter)
556 .with_test_writer() .init();
558
559 println!("Test logger initialized at TRACE level");
560 });
561 }
562
563 async fn start_async_test_server() -> (JoinHandle<()>, SocketAddr, oneshot::Sender<()>) {
565 init_test_logger();
567
568 let (shutdown_tx, mut shutdown_rx) = oneshot::channel::<()>();
570
571 let listener = TokioTcpListener::bind("127.0.0.1:0").await.unwrap();
573 let addr = listener.local_addr().unwrap();
574
575 info!("Test server starting on {}", addr);
577
578 let handle = tokio::spawn(async move {
580 loop {
581 tokio::select! {
582 accept_result = listener.accept() => {
583 match accept_result {
584 Ok((mut stream, client_addr)) => {
585 info!("Test server accepted connection from {}", client_addr);
586
587 tokio::spawn(async move {
589 let (reader, mut writer) = stream.split();
590 let mut reader = BufReader::new(reader);
591 let mut request_buf = Vec::new();
592
593 match reader.read_until(b'\n', &mut request_buf).await {
595 Ok(n) if n > 0 => {
596 let request_bytes = request_buf.trim_ascii_end();
597 info!("Test server received request: {}", String::from_utf8_lossy(request_bytes));
598
599 sleep(Duration::from_millis(10)).await;
601
602 let event_json = r#"{"type":"TransactionsAdded","data": { "txids":["2222222222222222222222222222222222222222222222222222222222222222"]}}"#;
603 if let Err(e) = writer.write_all(event_json.as_bytes()).await {
604 error!("Test server failed to write event: {}", e); return;
605 }
606 if let Err(e) = writer.write_all(b"\n").await {
607 error!("Test server failed to write newline: {}", e); return;
608 }
609 if let Err(e) = writer.flush().await {
610 error!("Test server failed to flush: {}", e); return;
611 }
612 info!("Test server sent event and newline");
613
614 let mut line_buf = Vec::new();
616 loop {
617 line_buf.clear();
619 tokio::select! {
620 read_res = reader.read_until(b'\n', &mut line_buf) => {
621 match read_res {
622 Ok(0) => {
623 info!("Test server: Client disconnected");
624 break; }
626 Ok(m) if m > 0 => {
627 let trimmed_line = line_buf.trim_ascii_end();
628 if trimmed_line == b"PING" {
629 info!("Test server received PING");
630 if let Err(e) = writer.write_all(b"PONG\n").await {
631 error!("Test server failed to send PONG: {}", e);
632 break; }
634 if let Err(e) = writer.flush().await {
635 error!("Test server failed to flush PONG: {}", e);
636 break; }
638 info!("Test server sent PONG");
639 } else if !trimmed_line.is_empty() {
640 info!("Test server received unexpected data: {}", String::from_utf8_lossy(trimmed_line));
642 }
643 }
644 Ok(_) => {
645 }
647 Err(e) => {
648 error!("Test server read error in PING loop: {}", e);
649 break; }
651 }
652 }
653 _ = tokio::time::sleep(Duration::from_secs(1)) => {
655 }
657 }
658 }
659 },
660 Ok(0) => {
661 info!("Test server received empty read, client closed connection early");
662 },
663 Ok(n) => {
664 info!("Test server received {} bytes, but not processing as initial request", n);
665 },
666 Err(e) => error!("Test server initial read error: {}", e),
667 }
668 });
669 },
670 Err(e) => {
671 error!("Test server accept error: {}", e);
672 sleep(Duration::from_millis(10)).await;
674 }
675 }
676 },
677 _ = &mut shutdown_rx => {
678 info!("Test server received shutdown signal");
679 break;
680 }
681 }
682 }
683 info!("Test server shutting down");
684 });
685
686 sleep(Duration::from_millis(10)).await;
688 info!("Test server setup complete, returning handle");
689
690 (handle, addr, shutdown_tx)
691 }
692
693 #[tokio::test]
694 async fn test_shutdown_and_join() {
695 init_test_logger();
697 info!("Starting test_shutdown_and_join");
698
699 let client = AsyncTcpClient::new_with_config(Config {
701 max_retries: Some(2), retry_delay: Duration::from_millis(100), read_buffer_capacity: 4096,
704 max_buffer_size: 10 * 1024 * 1024,
705 ping_interval: Duration::from_secs(30),
706 pong_timeout: Duration::from_secs(10),
707 });
708
709 let subscription_request = TcpSubscriptionRequest { subscribe: vec![] };
711 info!("Subscribing to non-existent server to test shutdown");
712
713 let result = client.subscribe("127.0.0.1:1", subscription_request).await;
715 assert!(result.is_ok());
716 info!("Subscription started, checking active task");
717
718 assert!(client.has_active_task().unwrap());
720
721 info!("Waiting for reconnection attempts to begin");
723 sleep(Duration::from_millis(300)).await;
724
725 info!("Shutting down client");
727 let result = client.shutdown_and_join().await;
728
729 assert!(result.is_ok());
731 info!("Client shutdown successfully");
732
733 assert!(!client.has_active_task().unwrap());
735 info!("Test completed successfully");
736 }
737
738 #[tokio::test]
739 async fn test_multiple_subscribes() {
740 init_test_logger();
742 info!("Starting test_multiple_subscribes");
743
744 let client = AsyncTcpClient::new_with_config(Config {
746 max_retries: Some(1), retry_delay: Duration::from_millis(100), read_buffer_capacity: 4096,
749 max_buffer_size: 10 * 1024 * 1024,
750 ping_interval: Duration::from_secs(30),
751 pong_timeout: Duration::from_secs(10),
752 });
753
754 let subscription_request1 = TcpSubscriptionRequest { subscribe: vec![] };
756 info!("Creating first subscription");
757 let result1 = client.subscribe("127.0.0.1:1", subscription_request1).await;
758 assert!(result1.is_ok());
759
760 assert!(client.has_active_task().unwrap());
762 info!("First subscription active");
763
764 sleep(Duration::from_millis(200)).await;
766
767 let subscription_request2 = TcpSubscriptionRequest { subscribe: vec![] };
769 info!("Creating second subscription (should replace the first)");
770 let result2 = client.subscribe("127.0.0.1:2", subscription_request2).await;
771 assert!(result2.is_ok());
772
773 assert!(client.has_active_task().unwrap());
775 info!("Second subscription active");
776
777 info!("Shutting down client");
779 let join_result = client.shutdown_and_join().await;
780 assert!(join_result.is_ok());
781
782 assert!(!client.has_active_task().unwrap());
784 info!("Test completed successfully");
785 }
786
787 #[tokio::test]
788 async fn test_async_connection_status_transitions() {
789 init_test_logger();
791 info!("Starting test_async_connection_status_transitions");
792
793 info!("Starting test server");
795 let (server_handle, server_addr, shutdown_tx) = start_async_test_server().await;
796
797 let client = AsyncTcpClient::new_with_config(Config {
799 max_retries: Some(2),
800 retry_delay: Duration::from_millis(100),
801 read_buffer_capacity: 4096,
802 max_buffer_size: 10 * 1024 * 1024,
803 ping_interval: Duration::from_secs(30),
804 pong_timeout: Duration::from_secs(10),
805 });
806
807 assert_eq!(client.get_status(), ConnectionStatus::Disconnected);
809 info!("Initial status: {:?}", client.get_status());
810
811 let subscription_request = TcpSubscriptionRequest {
813 subscribe: vec![EventType::TransactionsAdded],
814 };
815
816 info!("Subscribing to test server at {}", server_addr);
817 let rx = client
818 .subscribe(
819 &format!("127.0.0.1:{}", server_addr.port()),
820 subscription_request,
821 )
822 .await
823 .unwrap();
824
825 info!("Waiting for connection to establish");
827 sleep(Duration::from_millis(100)).await;
828
829 let status = client.get_status();
831 info!("Status after connection attempt: {:?}", status);
832 assert_eq!(status, ConnectionStatus::Connected);
833
834 info!("Shutting down client");
836 client.shutdown_and_join().await.unwrap();
837
838 let final_status = client.get_status();
840 info!("Final status: {:?}", final_status);
841 assert_eq!(final_status, ConnectionStatus::Disconnected);
842
843 info!("Shutting down test server");
845 let _ = shutdown_tx.send(());
846 let _ = server_handle.await;
847 info!("Test completed successfully");
848 }
849
850 #[tokio::test]
851 async fn test_async_receive_events() {
852 init_test_logger();
854 info!("Starting test_async_receive_events");
855
856 info!("Starting test server");
858 let (server_handle, server_addr, shutdown_tx) = start_async_test_server().await;
859 info!("Test server started at {}", server_addr);
860
861 let client = AsyncTcpClient::new_with_config(Config {
863 max_retries: Some(2),
864 retry_delay: Duration::from_millis(100),
865 read_buffer_capacity: 4096,
866 max_buffer_size: 10 * 1024 * 1024,
867 ping_interval: Duration::from_secs(30),
868 pong_timeout: Duration::from_secs(10),
869 });
870
871 let subscription_request = TcpSubscriptionRequest {
873 subscribe: vec![EventType::TransactionsAdded],
874 };
875
876 info!("Subscribing to test server at {}", server_addr);
877 let mut rx = client
878 .subscribe(
879 &format!("127.0.0.1:{}", server_addr.port()),
880 subscription_request,
881 )
882 .await
883 .unwrap();
884
885 info!("Waiting for connection to establish");
887 sleep(Duration::from_millis(100)).await;
888
889 info!("Checking connection status: {:?}", client.get_status());
890 assert_eq!(
891 client.get_status(),
892 ConnectionStatus::Connected,
893 "Expected client to be connected, but status is {:?}",
894 client.get_status()
895 );
896
897 info!("Waiting to receive an event");
899 let event = tokio::time::timeout(Duration::from_secs(5), rx.recv()).await;
900
901 if let Err(e) = &event {
902 error!("Timeout waiting for event: {}", e);
903 info!("Current client status: {:?}", client.get_status());
905 assert!(false, "Failed to receive event within timeout");
906 }
907
908 let event = event.unwrap();
909
910 if let None = &event {
911 error!("Received None from channel - sender was likely dropped");
912 info!("Current client status: {:?}", client.get_status());
913 assert!(false, "Expected Some(event) but got None");
914 }
915
916 match event.unwrap() {
917 Event::TransactionsAdded { txids } => {
918 info!("Received transaction event with {} txids", txids.len());
919 assert_eq!(txids.len(), 1, "Expected 1 txid in event");
920 assert_eq!(
921 txids[0].to_string(),
922 "2222222222222222222222222222222222222222222222222222222222222222"
923 );
924 }
925 other => {
926 error!("Received unexpected event type: {:?}", other);
927 panic!("Received unexpected event type: {:?}", other);
928 }
929 }
930
931 info!("Shutting down client");
933 client.shutdown_and_join().await.unwrap();
934
935 info!("Shutting down test server");
937 let _ = shutdown_tx.send(());
938 let _ = server_handle.await;
939 info!("Test completed successfully");
940 }
941
942 #[tokio::test]
943 async fn test_async_connection_error_handling() {
944 init_test_logger();
946 info!("Starting test_async_connection_error_handling");
947
948 let client = AsyncTcpClient::new_with_config(Config {
950 max_retries: Some(2),
951 retry_delay: Duration::from_millis(100),
952 read_buffer_capacity: 4096,
953 max_buffer_size: 10 * 1024 * 1024,
954 ping_interval: Duration::from_secs(30),
955 pong_timeout: Duration::from_secs(10),
956 });
957
958 assert_eq!(client.get_status(), ConnectionStatus::Disconnected);
960 info!("Initial status: {:?}", client.get_status());
961
962 let subscription_request = TcpSubscriptionRequest {
964 subscribe: vec![EventType::TransactionsAdded],
965 };
966
967 info!("Subscribing to non-existent server to test error handling");
968 let rx = client
969 .subscribe("127.0.0.1:1", subscription_request)
970 .await
971 .unwrap();
972
973 info!("Waiting for connection attempts");
975 sleep(Duration::from_millis(500)).await;
976
977 let status = client.get_status();
979 info!("Status after connection attempts: {:?}", status);
980 assert!(
981 status == ConnectionStatus::Reconnecting || status == ConnectionStatus::Disconnected,
982 "Expected Reconnecting or Disconnected state, got {:?}",
983 status
984 );
985
986 info!("Shutting down client");
988 client.shutdown_and_join().await.unwrap();
989
990 let final_status = client.get_status();
992 info!("Final status: {:?}", final_status);
993 assert_eq!(final_status, ConnectionStatus::Disconnected);
994 info!("Test completed successfully");
995 }
996
997 #[tokio::test]
998 async fn test_async_shutdown_during_reconnect() {
999 init_test_logger();
1001 info!("Starting test_async_shutdown_during_reconnect");
1002
1003 let client = AsyncTcpClient::new_with_config(Config {
1006 max_retries: Some(100), retry_delay: Duration::from_millis(500), read_buffer_capacity: 4096,
1009 max_buffer_size: 10 * 1024 * 1024,
1010 ping_interval: Duration::from_secs(30),
1011 pong_timeout: Duration::from_secs(10),
1012 });
1013
1014 let subscription_request = TcpSubscriptionRequest {
1016 subscribe: vec![EventType::TransactionsAdded],
1017 };
1018
1019 info!("Subscribing to non-existent server to trigger reconnection");
1020 let rx = client
1021 .subscribe("127.0.0.1:1", subscription_request)
1022 .await
1023 .unwrap();
1024
1025 info!("Waiting for client to enter reconnection state");
1028 sleep(Duration::from_millis(50)).await;
1029
1030 let status_before_shutdown = client.get_status();
1031 info!("Status before shutdown: {:?}", status_before_shutdown);
1032
1033 if status_before_shutdown != ConnectionStatus::Reconnecting {
1035 info!("Not in reconnecting state yet, waiting longer");
1036 sleep(Duration::from_millis(50)).await;
1037 let status_before_shutdown = client.get_status();
1038 info!("Status after additional wait: {:?}", status_before_shutdown);
1039 }
1040
1041 assert_eq!(client.get_status(), ConnectionStatus::Reconnecting);
1044
1045 info!("Shutting down client during reconnection");
1047 client.shutdown();
1048
1049 let status_after_shutdown = client.get_status();
1051 info!(
1052 "Status immediately after shutdown: {:?}",
1053 status_after_shutdown
1054 );
1055 assert_eq!(status_after_shutdown, ConnectionStatus::Disconnected);
1056
1057 info!("Joining worker task");
1059 sleep(Duration::from_millis(50)).await;
1060 let result = client.join().await;
1061 info!("Join result: {:?}", result);
1062 assert!(
1063 result.is_ok(),
1064 "Failed to join task during reconnection: {:?}",
1065 result
1066 );
1067
1068 let final_status = client.get_status();
1070 info!("Final status: {:?}", final_status);
1071 assert_eq!(final_status, ConnectionStatus::Disconnected);
1072 info!("Test completed successfully");
1073 }
1074
1075 #[tokio::test]
1076 async fn test_async_buffer_size_limit() {
1077 init_test_logger();
1079 info!("Starting test_async_buffer_size_limit");
1080
1081 info!("Starting oversized data test server");
1083 let listener = TokioTcpListener::bind("127.0.0.1:0").await.unwrap();
1084 let server_addr = listener.local_addr().unwrap();
1085 info!("Test server bound to {}", server_addr);
1086
1087 let server_handle = tokio::spawn(async move {
1089 info!("Test server waiting for connection");
1090 if let Ok((mut stream, client_addr)) = listener.accept().await {
1091 info!("Test server accepted connection from {}", client_addr);
1092 let mut buf = [0; 1024];
1094 match stream.read(&mut buf).await {
1095 Ok(n) => {
1096 let request = String::from_utf8_lossy(&buf[..n]);
1097 info!("Test server received request: {}", request);
1098
1099 info!("Sending 100KB payload to test buffer limits");
1101
1102 let prefix = r#"{"type":"TransactionsAdded", "data": {"txids":["#;
1104 stream.write_all(prefix.as_bytes()).await.unwrap();
1105
1106 let large_payload = "x".repeat(100_000); stream.write_all(large_payload.as_bytes()).await.unwrap();
1109
1110 let suffix = r#"}]}}"#;
1112 stream.write_all(suffix.as_bytes()).await.unwrap();
1113 stream.write_all(b"\n").await.unwrap();
1114 info!("Large payload sent");
1115
1116 sleep(Duration::from_millis(100)).await;
1118 }
1119 Err(e) => {
1120 error!("Test server read error: {}", e);
1121 }
1122 }
1123 }
1124 info!("Test server shutting down");
1125 });
1126
1127 info!("Creating client with small buffer size");
1129 let client = AsyncTcpClient::new_with_config(Config {
1130 max_retries: Some(1),
1131 retry_delay: Duration::from_millis(100),
1132 read_buffer_capacity: 1024, max_buffer_size: 10 * 1024, ping_interval: Duration::from_secs(30),
1135 pong_timeout: Duration::from_secs(10),
1136 });
1137
1138 let subscription_request = TcpSubscriptionRequest {
1140 subscribe: vec![EventType::TransactionsAdded],
1141 };
1142
1143 info!("Subscribing to server with buffer size limit test");
1144 let _rx = client
1145 .subscribe(
1146 &format!("127.0.0.1:{}", server_addr.port()),
1147 subscription_request,
1148 )
1149 .await
1150 .unwrap();
1151
1152 info!("Waiting for buffer overflow to occur");
1154 sleep(Duration::from_millis(300)).await;
1155
1156 let status = client.get_status();
1158 info!("Client status after buffer overflow test: {:?}", status);
1159 assert!(
1160 status == ConnectionStatus::Reconnecting || status == ConnectionStatus::Disconnected,
1161 "Expected client to disconnect due to buffer overflow, but status is {:?}",
1162 status
1163 );
1164
1165 info!("Cleaning up");
1167 client.shutdown_and_join().await.unwrap();
1168 let _ = server_handle.await;
1169 info!("Test completed successfully");
1170 }
1171}