1use std::{
2 sync::{Arc, Mutex},
3 time::Duration,
4};
5
6use serde_json;
7use thiserror::Error;
8use titan_types_api::TcpSubscriptionRequest;
9use titan_types_core::Event;
10use tokio::{
11 io::{AsyncBufReadExt, AsyncWriteExt, BufReader},
12 net::TcpStream,
13 sync::{mpsc, watch},
14 task::JoinHandle,
15};
16use tracing::{debug, error, info, warn};
17
18use crate::tcp::{
19 connection_status::ConnectionStatus,
20 reconnection::{self, ReconnectionManager},
21};
22
23use super::connection_status::ConnectionStatusTracker;
24
/// Errors surfaced by [`AsyncTcpClient`] operations.
#[derive(Debug, Error)]
pub enum TcpClientError {
    /// An I/O operation failed. Converted automatically from [`std::io::Error`];
    /// also produced when the shutdown signal cannot be sent.
    #[error("io error: {0}")]
    IOError(#[from] std::io::Error),
    /// A JSON operation failed. Converted automatically from [`serde_json::Error`].
    #[error("serde error: {0}")]
    SerdeError(#[from] serde_json::Error),
    /// Awaiting the background worker task returned an error.
    #[error("join error: task panicked")]
    JoinError,
    /// One of the client's internal mutexes could not be locked.
    #[error("lock error: failed to acquire lock")]
    LockError,
}
36
/// Tunable settings for [`AsyncTcpClient`].
///
/// The whole value is also handed to `reconnection::from_async_reconnect_settings`
/// to derive the reconnection policy.
#[derive(Debug, Clone)]
pub struct Config {
    /// Presumably the maximum number of reconnection attempts, with `None`
    /// meaning unlimited — TODO confirm against the reconnection module.
    pub max_retries: Option<u32>,
    /// Presumably the base delay between reconnection attempts — TODO confirm
    /// against the reconnection module.
    pub retry_delay: Duration,
    /// Initial capacity, in bytes, of the buffer that socket lines are read into.
    pub read_buffer_capacity: usize,
    /// Size limit in bytes: a message longer than this is not parsed, and a read
    /// buffer whose capacity grows past it causes the connection to be reset.
    pub max_buffer_size: usize,
    /// Period of the timer that drives `PING` messages to the server.
    pub ping_interval: Duration,
    /// Maximum time since the last `PONG` (or event) before an outstanding ping
    /// is treated as lost and the connection as dead.
    pub pong_timeout: Duration,
}
53
54impl Default for Config {
55 fn default() -> Self {
56 Self {
57 max_retries: None,
58 retry_delay: Duration::from_secs(1), read_buffer_capacity: 4096, max_buffer_size: 10 * 1024 * 1024, ping_interval: Duration::from_secs(30), pong_timeout: Duration::from_secs(10), }
64 }
65}
66
/// Both halves of the `watch` channel used to tell the worker task to stop.
struct ShutdownChannel {
    /// Sending half; `send` publishes the shutdown signal through it.
    sender: watch::Sender<()>,
    /// Prototype receiver that `get_receiver` clones for each consumer.
    receiver: watch::Receiver<()>,
}
72
73impl ShutdownChannel {
74 fn new() -> Self {
75 let (sender, receiver) = watch::channel(());
76 Self { sender, receiver }
77 }
78
79 fn get_receiver(&self) -> watch::Receiver<()> {
80 self.receiver.clone()
81 }
82
83 fn send(&self) -> Result<(), watch::error::SendError<()>> {
84 self.sender.send(())
85 }
86}
87
/// Asynchronous TCP subscription client that runs its connection logic in a
/// background tokio task.
pub struct AsyncTcpClient {
    /// Channel used to signal the worker task to stop; replaced with a fresh
    /// channel when an existing subscription is superseded.
    shutdown_channel: Arc<Mutex<ShutdownChannel>>,
    /// Settings cloned into each worker task.
    config: Config,
    /// Holds the current connection status and the registered status listeners.
    status_tracker: ConnectionStatusTracker,
    /// Join handle of the worker task, or `None` when no task is stored.
    worker_task: Mutex<Option<JoinHandle<()>>>,
}
95
96impl AsyncTcpClient {
97 pub fn new_with_config(config: Config) -> Self {
99 Self {
100 shutdown_channel: Arc::new(Mutex::new(ShutdownChannel::new())),
101 config,
102 status_tracker: ConnectionStatusTracker::new(),
103 worker_task: Mutex::new(None),
104 }
105 }
106
107 pub fn new() -> Self {
110 Self::new_with_config(Config::default())
111 }
112
113 pub fn get_status(&self) -> ConnectionStatus {
115 self.status_tracker.get_status()
116 }
117
118 pub fn create_status_subscriber(&self) -> mpsc::Receiver<ConnectionStatus> {
120 let (tx, rx) = mpsc::channel(100);
121 self.status_tracker.register_listener(tx);
122 rx
123 }
124
125 pub fn has_active_task(&self) -> Result<bool, TcpClientError> {
129 match self.worker_task.lock() {
130 Ok(lock) => Ok(lock.is_some()),
131 Err(_) => {
132 error!("Failed to acquire worker task lock");
133 Err(TcpClientError::LockError)
134 }
135 }
136 }
137
138 pub async fn subscribe(
146 &self,
147 addr: &str,
148 subscription_request: TcpSubscriptionRequest,
149 ) -> Result<mpsc::Receiver<Event>, TcpClientError> {
150 info!("Subscribing to {}", addr);
151
152 let mut worker_lock = self
154 .worker_task
155 .lock()
156 .map_err(|_| TcpClientError::LockError)?;
157
158 if let Some(handle) = worker_lock.take() {
160 info!("Shutting down existing subscription task before starting a new one");
161 self.send_shutdown_signal()?;
163
164 let mut shutdown_guard = self
166 .shutdown_channel
167 .lock()
168 .map_err(|_| TcpClientError::LockError)?;
169 *shutdown_guard = ShutdownChannel::new();
170 drop(shutdown_guard); match tokio::time::timeout(Duration::from_secs(5), handle).await {
174 Ok(join_result) => match join_result {
175 Ok(_) => info!("Successfully joined existing task"),
176 Err(_) => error!("Error joining existing task: it panicked"),
177 },
178 Err(_) => {
179 error!("Timed out waiting for existing task to complete, proceeding with new task anyway");
181 }
182 }
183 }
184
185 let (tx, rx) = mpsc::channel::<Event>(100);
187
188 let reconnect_settings = self.config.clone();
190 let shutdown_receiver = {
191 let guard = self
192 .shutdown_channel
193 .lock()
194 .map_err(|_| TcpClientError::LockError)?;
195 guard.get_receiver()
196 };
197 let addr = addr.to_owned();
198 let subscription_request = subscription_request;
199 let status_tracker = self.status_tracker.clone();
200
201 status_tracker.update_status(ConnectionStatus::Connecting);
203
204 let reconnection_config = reconnection::from_async_reconnect_settings(&reconnect_settings);
206
207 info!("Creating reconnection manager");
208
209 let handle = tokio::spawn(async move {
211 let update_status = |new_status| status_tracker.update_status(new_status);
213
214 let mut reconnection_manager = ReconnectionManager::new(reconnection_config);
216
217 let mut shutdown_rx = shutdown_receiver;
219
220 let ping_interval = reconnect_settings.ping_interval;
222 let pong_timeout = reconnect_settings.pong_timeout;
223 let mut last_pong_time = std::time::Instant::now();
224 let mut ping_timer = tokio::time::interval(ping_interval);
225 ping_timer.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay);
226 let mut awaiting_pong = false;
227
228 loop {
229 if shutdown_rx.has_changed().unwrap_or(false) {
231 info!("Shutdown signal received. Exiting subscription task.");
232 update_status(ConnectionStatus::Disconnected);
233 break;
234 }
235 info!("Attempting to connect to {}", addr);
236 update_status(ConnectionStatus::Connecting);
237
238 match TcpStream::connect(&addr).await {
239 Ok(stream) => {
240 info!("Connected to {}.", addr);
241 update_status(ConnectionStatus::Connected);
242
243 reconnection_manager.reset();
245
246 let (reader, mut writer) = stream.into_split();
247 let mut reader = BufReader::new(reader);
248
249 let ping_interval = reconnect_settings.ping_interval;
251 let pong_timeout = reconnect_settings.pong_timeout;
252 let mut last_pong_time = std::time::Instant::now();
253 let mut ping_timer = tokio::time::interval(ping_interval);
254 ping_timer.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay);
255 let mut awaiting_pong = false;
256
257 match serde_json::to_string(&subscription_request) {
259 Ok(req_json) => {
260 if let Err(e) = writer.write_all(req_json.as_bytes()).await {
261 error!("Error sending subscription request: {}", e);
262 continue;
264 }
265 if let Err(e) = writer.write_all(b"\n").await {
266 error!("Error writing newline: {}", e);
267 continue;
268 }
269 if let Err(e) = writer.flush().await {
270 error!("Error flushing writer: {}", e);
271 continue;
272 }
273 }
274 Err(e) => {
275 error!("Error serializing subscription request: {}", e);
276 update_status(ConnectionStatus::Disconnected);
277 break;
278 }
279 }
280
281 let mut byte_buf =
283 Vec::with_capacity(reconnect_settings.read_buffer_capacity);
284 loop {
286 if byte_buf.capacity() > reconnect_settings.max_buffer_size {
288 error!(
289 "Buffer capacity exceeded maximum allowed size ({}), resetting connection.",
290 reconnect_settings.max_buffer_size
291 );
292 break; }
294
295 tokio::select! {
296 result = reader.read_until(b'\n', &mut byte_buf) => {
297 match result {
298 Ok(0) => {
299 warn!("TCP connection closed by server.");
301 update_status(ConnectionStatus::Reconnecting);
302 break; }
304 Ok(n) if n > 0 => {
305 let message_bytes = byte_buf.trim_ascii_end(); if !message_bytes.is_empty() {
312 if message_bytes == b"PONG" {
314 if awaiting_pong {
315 awaiting_pong = false;
316 last_pong_time = std::time::Instant::now();
317 debug!("Received PONG");
318 } else {
319 warn!("Received unexpected PONG");
320 }
321 } else {
322 if message_bytes.len() > reconnect_settings.max_buffer_size {
324 error!(
325 "Received message exceeds maximum allowed size ({}), skipping. Message starts with: {:?}",
326 reconnect_settings.max_buffer_size,
327 String::from_utf8_lossy(&message_bytes[..std::cmp::min(message_bytes.len(), 50)]) );
329 } else {
331 match serde_json::from_slice::<Event>(message_bytes) {
333 Ok(event) => {
334 last_pong_time = std::time::Instant::now();
336 awaiting_pong = false; if let Err(e) = tx.send(event).await {
338 error!("Failed to send event to channel: {}", e);
339 update_status(ConnectionStatus::Disconnected); return; }
342 }
343 Err(e) => {
344 error!(
345 "Failed to parse event: {}. Raw data (first 100 bytes): {:?}",
346 e,
347 String::from_utf8_lossy(&message_bytes[..std::cmp::min(message_bytes.len(), 100)])
348 );
349 update_status(ConnectionStatus::Reconnecting);
350 break;
351 }
352 }
353 }
354 }
355 }
356 byte_buf.clear();
358 }
359 Ok(_) => {
360 byte_buf.clear();
362 }
363 Err(e) => {
364 error!("Error reading from TCP socket using read_until: {}", e);
365 update_status(ConnectionStatus::Reconnecting);
366 break; }
368 }
369 }
370 _ = ping_timer.tick() => {
371 if awaiting_pong {
373 let elapsed = last_pong_time.elapsed();
375 if elapsed > pong_timeout {
376 warn!("Pong response timed out after {:?}, considering connection dead", elapsed);
377 update_status(ConnectionStatus::Reconnecting);
378 break;
379 }
380 } else {
381 match writer.write_all(b"PING\n").await {
383 Ok(_) => {
384 if let Err(e) = writer.flush().await {
385 error!("Failed to flush ping: {}", e);
386 update_status(ConnectionStatus::Reconnecting);
387 break;
388 }
389 awaiting_pong = true;
390 }
391 Err(e) => {
392 error!("Failed to send ping: {}", e);
393 update_status(ConnectionStatus::Reconnecting);
394 break;
395 }
396 }
397 }
398 }
399 _ = shutdown_rx.changed() => {
400 info!("Shutdown signal received. Exiting TCP subscription task.");
401 update_status(ConnectionStatus::Disconnected);
402 return;
403 }
404 }
405 }
406 info!("Lost connection. Preparing to reconnect...");
408 update_status(ConnectionStatus::Reconnecting);
409 }
410 Err(e) => {
411 error!("Failed to connect to {}: {}. Will retry...", addr, e);
412 update_status(ConnectionStatus::Reconnecting);
413 }
414 }
415
416 match reconnection_manager.next_delay() {
418 Some(wait_time) => {
419 info!(
420 "Reconnecting in {:?}... (attempt {}/{:?})",
421 wait_time,
422 reconnection_manager.current_attempt(),
423 reconnection_manager.config().max_attempts
424 );
425 tokio::time::sleep(wait_time).await;
426 }
427 None => {
428 error!(
429 "Reached maximum reconnection attempts ({}). Exiting subscription task.",
430 reconnection_manager.config().max_attempts.unwrap_or(0)
431 );
432 update_status(ConnectionStatus::Disconnected);
433 break;
434 }
435 }
436 }
437
438 info!("Exiting TCP subscription task.");
439 update_status(ConnectionStatus::Disconnected);
440 });
441
442 *worker_lock = Some(handle);
444
445 Ok(rx)
446 }
447
448 fn send_shutdown_signal(&self) -> Result<(), TcpClientError> {
450 let channel = match self.shutdown_channel.lock() {
451 Ok(channel) => channel,
452 Err(_) => {
453 error!("Failed to acquire shutdown channel lock");
454 return Err(TcpClientError::LockError);
455 }
456 };
457
458 if let Err(e) = channel.send() {
459 error!("Failed to send shutdown signal: {:?}", e);
460 return Err(TcpClientError::IOError(std::io::Error::new(
461 std::io::ErrorKind::Other,
462 "Failed to send shutdown signal",
463 )));
464 }
465
466 Ok(())
467 }
468
469 pub fn shutdown(&self) {
472 self.status_tracker
474 .update_status(ConnectionStatus::Disconnected);
475
476 if let Err(e) = self.send_shutdown_signal() {
478 error!("Error in shutdown: {:?}", e);
479 }
480 }
481
482 pub async fn shutdown_and_join(&self) -> Result<(), TcpClientError> {
485 self.shutdown();
487
488 self.join().await
490 }
491
492 pub async fn join(&self) -> Result<(), TcpClientError> {
495 let mut worker_lock = self
497 .worker_task
498 .lock()
499 .map_err(|_| TcpClientError::LockError)?;
500
501 if let Some(handle) = worker_lock.take() {
503 match handle.await {
504 Ok(_) => {
505 info!("Successfully joined worker task");
506 Ok(())
507 }
508 Err(_) => {
509 error!("Failed to join worker task due to panic");
510 Err(TcpClientError::JoinError)
511 }
512 }
513 } else {
514 info!("No worker task to join");
516 Ok(())
517 }
518 }
519}
520
521impl Drop for AsyncTcpClient {
522 fn drop(&mut self) {
523 self.shutdown();
525
526 info!("AsyncTcpClient dropped, task will continue running until shutdown completes");
528 }
529}
530
531#[cfg(test)]
532mod tests {
533 use super::*;
534 use std::net::{SocketAddr, TcpListener};
535 use std::sync::Arc;
536 use std::sync::Once;
537 use titan_types_core::EventType;
538 use tokio::io::{AsyncReadExt, AsyncWriteExt};
539 use tokio::net::TcpListener as TokioTcpListener;
540 use tokio::select;
541 use tokio::signal::unix::{signal, SignalKind};
542 use tokio::sync::oneshot;
543 use tokio::task::JoinHandle;
544 use tokio::time::sleep;
545 use tracing_subscriber::{self, EnvFilter};
546
547 static INIT: Once = Once::new();
549
550 fn init_test_logger() {
551 INIT.call_once(|| {
552 let filter =
554 EnvFilter::from_default_env().add_directive("titan_client=trace".parse().unwrap());
555
556 tracing_subscriber::fmt()
557 .with_env_filter(filter)
558 .with_test_writer() .init();
560
561 println!("Test logger initialized at TRACE level");
562 });
563 }
564
    /// Spawns a local TCP test server on an ephemeral port.
    ///
    /// For every accepted connection the server reads one request line, replies
    /// with a single `TransactionsAdded` event line, and then answers each `PING`
    /// line with `PONG` until the client disconnects.
    ///
    /// Returns the server task handle, the bound address, and a oneshot sender
    /// that stops the accept loop when fired.
    async fn start_async_test_server() -> (JoinHandle<()>, SocketAddr, oneshot::Sender<()>) {
        init_test_logger();

        let (shutdown_tx, mut shutdown_rx) = oneshot::channel::<()>();

        // Port 0 lets the OS pick a free port; the real address is read back below.
        let listener = TokioTcpListener::bind("127.0.0.1:0").await.unwrap();
        let addr = listener.local_addr().unwrap();

        info!("Test server starting on {}", addr);

        let handle = tokio::spawn(async move {
            // Accept loop: runs until the shutdown oneshot fires.
            loop {
                tokio::select! {
                    accept_result = listener.accept() => {
                        match accept_result {
                            Ok((mut stream, client_addr)) => {
                                info!("Test server accepted connection from {}", client_addr);

                                // Each connection is served by its own task so the
                                // accept loop is never blocked.
                                tokio::spawn(async move {
                                    let (reader, mut writer) = stream.split();
                                    let mut reader = BufReader::new(reader);
                                    let mut request_buf = Vec::new();

                                    // The first line from the client is its subscription request.
                                    match reader.read_until(b'\n', &mut request_buf).await {
                                        Ok(n) if n > 0 => {
                                            let request_bytes = request_buf.trim_ascii_end();
                                            info!("Test server received request: {}", String::from_utf8_lossy(request_bytes));

                                            sleep(Duration::from_millis(10)).await;

                                            // Reply with one newline-terminated event.
                                            let event_json = r#"{"type":"TransactionsAdded","data": { "txids":["2222222222222222222222222222222222222222222222222222222222222222"]}}"#;
                                            if let Err(e) = writer.write_all(event_json.as_bytes()).await {
                                                error!("Test server failed to write event: {}", e); return;
                                            }
                                            if let Err(e) = writer.write_all(b"\n").await {
                                                error!("Test server failed to write newline: {}", e); return;
                                            }
                                            if let Err(e) = writer.flush().await {
                                                error!("Test server failed to flush: {}", e); return;
                                            }
                                            info!("Test server sent event and newline");

                                            // Keep the connection open and answer PINGs.
                                            let mut line_buf = Vec::new();
                                            loop {
                                                line_buf.clear();
                                                tokio::select! {
                                                    read_res = reader.read_until(b'\n', &mut line_buf) => {
                                                        match read_res {
                                                            Ok(0) => {
                                                                info!("Test server: Client disconnected");
                                                                break; }
                                                            Ok(m) if m > 0 => {
                                                                let trimmed_line = line_buf.trim_ascii_end();
                                                                if trimmed_line == b"PING" {
                                                                    info!("Test server received PING");
                                                                    if let Err(e) = writer.write_all(b"PONG\n").await {
                                                                        error!("Test server failed to send PONG: {}", e);
                                                                        break; }
                                                                    if let Err(e) = writer.flush().await {
                                                                        error!("Test server failed to flush PONG: {}", e);
                                                                        break; }
                                                                    info!("Test server sent PONG");
                                                                } else if !trimmed_line.is_empty() {
                                                                    info!("Test server received unexpected data: {}", String::from_utf8_lossy(trimmed_line));
                                                                }
                                                            }
                                                            // Remaining `Ok` case required for exhaustiveness; nothing to do.
                                                            Ok(_) => {
                                                            }
                                                            Err(e) => {
                                                                error!("Test server read error in PING loop: {}", e);
                                                                break; }
                                                        }
                                                    }
                                                    // One-second idle tick: simply go around the loop again.
                                                    _ = tokio::time::sleep(Duration::from_secs(1)) => {
                                                    }
                                                }
                                            }
                                        },
                                        Ok(0) => {
                                            info!("Test server received empty read, client closed connection early");
                                        },
                                        Ok(n) => {
                                            info!("Test server received {} bytes, but not processing as initial request", n);
                                        },
                                        Err(e) => error!("Test server initial read error: {}", e),
                                    }
                                });
                            },
                            Err(e) => {
                                error!("Test server accept error: {}", e);
                                // Brief pause before accepting again.
                                sleep(Duration::from_millis(10)).await;
                            }
                        }
                    },
                    _ = &mut shutdown_rx => {
                        info!("Test server received shutdown signal");
                        break;
                    }
                }
            }
            info!("Test server shutting down");
        });

        // Short pause before handing the server to the caller.
        sleep(Duration::from_millis(10)).await;
        info!("Test server setup complete, returning handle");

        (handle, addr, shutdown_tx)
    }
694
695 #[tokio::test]
696 async fn test_shutdown_and_join() {
697 init_test_logger();
699 info!("Starting test_shutdown_and_join");
700
701 let client = AsyncTcpClient::new_with_config(Config {
703 max_retries: Some(2), retry_delay: Duration::from_millis(100), read_buffer_capacity: 4096,
706 max_buffer_size: 10 * 1024 * 1024,
707 ping_interval: Duration::from_secs(30),
708 pong_timeout: Duration::from_secs(10),
709 });
710
711 let subscription_request = TcpSubscriptionRequest { subscribe: vec![] };
713 info!("Subscribing to non-existent server to test shutdown");
714
715 let result = client.subscribe("127.0.0.1:1", subscription_request).await;
717 assert!(result.is_ok());
718 info!("Subscription started, checking active task");
719
720 assert!(client.has_active_task().unwrap());
722
723 info!("Waiting for reconnection attempts to begin");
725 sleep(Duration::from_millis(300)).await;
726
727 info!("Shutting down client");
729 let result = client.shutdown_and_join().await;
730
731 assert!(result.is_ok());
733 info!("Client shutdown successfully");
734
735 assert!(!client.has_active_task().unwrap());
737 info!("Test completed successfully");
738 }
739
740 #[tokio::test]
741 async fn test_multiple_subscribes() {
742 init_test_logger();
744 info!("Starting test_multiple_subscribes");
745
746 let client = AsyncTcpClient::new_with_config(Config {
748 max_retries: Some(1), retry_delay: Duration::from_millis(100), read_buffer_capacity: 4096,
751 max_buffer_size: 10 * 1024 * 1024,
752 ping_interval: Duration::from_secs(30),
753 pong_timeout: Duration::from_secs(10),
754 });
755
756 let subscription_request1 = TcpSubscriptionRequest { subscribe: vec![] };
758 info!("Creating first subscription");
759 let result1 = client.subscribe("127.0.0.1:1", subscription_request1).await;
760 assert!(result1.is_ok());
761
762 assert!(client.has_active_task().unwrap());
764 info!("First subscription active");
765
766 sleep(Duration::from_millis(200)).await;
768
769 let subscription_request2 = TcpSubscriptionRequest { subscribe: vec![] };
771 info!("Creating second subscription (should replace the first)");
772 let result2 = client.subscribe("127.0.0.1:2", subscription_request2).await;
773 assert!(result2.is_ok());
774
775 assert!(client.has_active_task().unwrap());
777 info!("Second subscription active");
778
779 info!("Shutting down client");
781 let join_result = client.shutdown_and_join().await;
782 assert!(join_result.is_ok());
783
784 assert!(!client.has_active_task().unwrap());
786 info!("Test completed successfully");
787 }
788
789 #[tokio::test]
790 async fn test_async_connection_status_transitions() {
791 init_test_logger();
793 info!("Starting test_async_connection_status_transitions");
794
795 info!("Starting test server");
797 let (server_handle, server_addr, shutdown_tx) = start_async_test_server().await;
798
799 let client = AsyncTcpClient::new_with_config(Config {
801 max_retries: Some(2),
802 retry_delay: Duration::from_millis(100),
803 read_buffer_capacity: 4096,
804 max_buffer_size: 10 * 1024 * 1024,
805 ping_interval: Duration::from_secs(30),
806 pong_timeout: Duration::from_secs(10),
807 });
808
809 assert_eq!(client.get_status(), ConnectionStatus::Disconnected);
811 info!("Initial status: {:?}", client.get_status());
812
813 let subscription_request = TcpSubscriptionRequest {
815 subscribe: vec![EventType::TransactionsAdded],
816 };
817
818 info!("Subscribing to test server at {}", server_addr);
819 let rx = client
820 .subscribe(
821 &format!("127.0.0.1:{}", server_addr.port()),
822 subscription_request,
823 )
824 .await
825 .unwrap();
826
827 info!("Waiting for connection to establish");
829 sleep(Duration::from_millis(100)).await;
830
831 let status = client.get_status();
833 info!("Status after connection attempt: {:?}", status);
834 assert_eq!(status, ConnectionStatus::Connected);
835
836 info!("Shutting down client");
838 client.shutdown_and_join().await.unwrap();
839
840 let final_status = client.get_status();
842 info!("Final status: {:?}", final_status);
843 assert_eq!(final_status, ConnectionStatus::Disconnected);
844
845 info!("Shutting down test server");
847 let _ = shutdown_tx.send(());
848 let _ = server_handle.await;
849 info!("Test completed successfully");
850 }
851
852 #[tokio::test]
853 async fn test_async_receive_events() {
854 init_test_logger();
856 info!("Starting test_async_receive_events");
857
858 info!("Starting test server");
860 let (server_handle, server_addr, shutdown_tx) = start_async_test_server().await;
861 info!("Test server started at {}", server_addr);
862
863 let client = AsyncTcpClient::new_with_config(Config {
865 max_retries: Some(2),
866 retry_delay: Duration::from_millis(100),
867 read_buffer_capacity: 4096,
868 max_buffer_size: 10 * 1024 * 1024,
869 ping_interval: Duration::from_secs(30),
870 pong_timeout: Duration::from_secs(10),
871 });
872
873 let subscription_request = TcpSubscriptionRequest {
875 subscribe: vec![EventType::TransactionsAdded],
876 };
877
878 info!("Subscribing to test server at {}", server_addr);
879 let mut rx = client
880 .subscribe(
881 &format!("127.0.0.1:{}", server_addr.port()),
882 subscription_request,
883 )
884 .await
885 .unwrap();
886
887 info!("Waiting for connection to establish");
889 sleep(Duration::from_millis(100)).await;
890
891 info!("Checking connection status: {:?}", client.get_status());
892 assert_eq!(
893 client.get_status(),
894 ConnectionStatus::Connected,
895 "Expected client to be connected, but status is {:?}",
896 client.get_status()
897 );
898
899 info!("Waiting to receive an event");
901 let event = tokio::time::timeout(Duration::from_secs(5), rx.recv()).await;
902
903 if let Err(e) = &event {
904 error!("Timeout waiting for event: {}", e);
905 info!("Current client status: {:?}", client.get_status());
907 assert!(false, "Failed to receive event within timeout");
908 }
909
910 let event = event.unwrap();
911
912 if let None = &event {
913 error!("Received None from channel - sender was likely dropped");
914 info!("Current client status: {:?}", client.get_status());
915 assert!(false, "Expected Some(event) but got None");
916 }
917
918 match event.unwrap() {
919 Event::TransactionsAdded { txids } => {
920 info!("Received transaction event with {} txids", txids.len());
921 assert_eq!(txids.len(), 1, "Expected 1 txid in event");
922 assert_eq!(
923 txids[0].to_string(),
924 "2222222222222222222222222222222222222222222222222222222222222222"
925 );
926 }
927 other => {
928 error!("Received unexpected event type: {:?}", other);
929 panic!("Received unexpected event type: {:?}", other);
930 }
931 }
932
933 info!("Shutting down client");
935 client.shutdown_and_join().await.unwrap();
936
937 info!("Shutting down test server");
939 let _ = shutdown_tx.send(());
940 let _ = server_handle.await;
941 info!("Test completed successfully");
942 }
943
944 #[tokio::test]
945 async fn test_async_connection_error_handling() {
946 init_test_logger();
948 info!("Starting test_async_connection_error_handling");
949
950 let client = AsyncTcpClient::new_with_config(Config {
952 max_retries: Some(2),
953 retry_delay: Duration::from_millis(100),
954 read_buffer_capacity: 4096,
955 max_buffer_size: 10 * 1024 * 1024,
956 ping_interval: Duration::from_secs(30),
957 pong_timeout: Duration::from_secs(10),
958 });
959
960 assert_eq!(client.get_status(), ConnectionStatus::Disconnected);
962 info!("Initial status: {:?}", client.get_status());
963
964 let subscription_request = TcpSubscriptionRequest {
966 subscribe: vec![EventType::TransactionsAdded],
967 };
968
969 info!("Subscribing to non-existent server to test error handling");
970 let rx = client
971 .subscribe("127.0.0.1:1", subscription_request)
972 .await
973 .unwrap();
974
975 info!("Waiting for connection attempts");
977 sleep(Duration::from_millis(500)).await;
978
979 let status = client.get_status();
981 info!("Status after connection attempts: {:?}", status);
982 assert!(
983 status == ConnectionStatus::Reconnecting || status == ConnectionStatus::Disconnected,
984 "Expected Reconnecting or Disconnected state, got {:?}",
985 status
986 );
987
988 info!("Shutting down client");
990 client.shutdown_and_join().await.unwrap();
991
992 let final_status = client.get_status();
994 info!("Final status: {:?}", final_status);
995 assert_eq!(final_status, ConnectionStatus::Disconnected);
996 info!("Test completed successfully");
997 }
998
999 #[tokio::test]
1000 async fn test_async_shutdown_during_reconnect() {
1001 init_test_logger();
1003 info!("Starting test_async_shutdown_during_reconnect");
1004
1005 let client = AsyncTcpClient::new_with_config(Config {
1008 max_retries: Some(100), retry_delay: Duration::from_millis(500), read_buffer_capacity: 4096,
1011 max_buffer_size: 10 * 1024 * 1024,
1012 ping_interval: Duration::from_secs(30),
1013 pong_timeout: Duration::from_secs(10),
1014 });
1015
1016 let subscription_request = TcpSubscriptionRequest {
1018 subscribe: vec![EventType::TransactionsAdded],
1019 };
1020
1021 info!("Subscribing to non-existent server to trigger reconnection");
1022 let rx = client
1023 .subscribe("127.0.0.1:1", subscription_request)
1024 .await
1025 .unwrap();
1026
1027 info!("Waiting for client to enter reconnection state");
1030 sleep(Duration::from_millis(50)).await;
1031
1032 let status_before_shutdown = client.get_status();
1033 info!("Status before shutdown: {:?}", status_before_shutdown);
1034
1035 if status_before_shutdown != ConnectionStatus::Reconnecting {
1037 info!("Not in reconnecting state yet, waiting longer");
1038 sleep(Duration::from_millis(50)).await;
1039 let status_before_shutdown = client.get_status();
1040 info!("Status after additional wait: {:?}", status_before_shutdown);
1041 }
1042
1043 assert_eq!(client.get_status(), ConnectionStatus::Reconnecting);
1046
1047 info!("Shutting down client during reconnection");
1049 client.shutdown();
1050
1051 let status_after_shutdown = client.get_status();
1053 info!(
1054 "Status immediately after shutdown: {:?}",
1055 status_after_shutdown
1056 );
1057 assert_eq!(status_after_shutdown, ConnectionStatus::Disconnected);
1058
1059 info!("Joining worker task");
1061 sleep(Duration::from_millis(50)).await;
1062 let result = client.join().await;
1063 info!("Join result: {:?}", result);
1064 assert!(
1065 result.is_ok(),
1066 "Failed to join task during reconnection: {:?}",
1067 result
1068 );
1069
1070 let final_status = client.get_status();
1072 info!("Final status: {:?}", final_status);
1073 assert_eq!(final_status, ConnectionStatus::Disconnected);
1074 info!("Test completed successfully");
1075 }
1076
1077 #[tokio::test]
1078 async fn test_async_buffer_size_limit() {
1079 init_test_logger();
1081 info!("Starting test_async_buffer_size_limit");
1082
1083 info!("Starting oversized data test server");
1085 let listener = TokioTcpListener::bind("127.0.0.1:0").await.unwrap();
1086 let server_addr = listener.local_addr().unwrap();
1087 info!("Test server bound to {}", server_addr);
1088
1089 let server_handle = tokio::spawn(async move {
1091 info!("Test server waiting for connection");
1092 if let Ok((mut stream, client_addr)) = listener.accept().await {
1093 info!("Test server accepted connection from {}", client_addr);
1094 let mut buf = [0; 1024];
1096 match stream.read(&mut buf).await {
1097 Ok(n) => {
1098 let request = String::from_utf8_lossy(&buf[..n]);
1099 info!("Test server received request: {}", request);
1100
1101 info!("Sending 100KB payload to test buffer limits");
1103
1104 let prefix = r#"{"type":"TransactionsAdded", "data": {"txids":["#;
1106 stream.write_all(prefix.as_bytes()).await.unwrap();
1107
1108 let large_payload = "x".repeat(100_000); stream.write_all(large_payload.as_bytes()).await.unwrap();
1111
1112 let suffix = r#"}]}}"#;
1114 stream.write_all(suffix.as_bytes()).await.unwrap();
1115 stream.write_all(b"\n").await.unwrap();
1116 info!("Large payload sent");
1117
1118 sleep(Duration::from_millis(100)).await;
1120 }
1121 Err(e) => {
1122 error!("Test server read error: {}", e);
1123 }
1124 }
1125 }
1126 info!("Test server shutting down");
1127 });
1128
1129 info!("Creating client with small buffer size");
1131 let client = AsyncTcpClient::new_with_config(Config {
1132 max_retries: Some(1),
1133 retry_delay: Duration::from_millis(100),
1134 read_buffer_capacity: 1024, max_buffer_size: 10 * 1024, ping_interval: Duration::from_secs(30),
1137 pong_timeout: Duration::from_secs(10),
1138 });
1139
1140 let subscription_request = TcpSubscriptionRequest {
1142 subscribe: vec![EventType::TransactionsAdded],
1143 };
1144
1145 info!("Subscribing to server with buffer size limit test");
1146 let _rx = client
1147 .subscribe(
1148 &format!("127.0.0.1:{}", server_addr.port()),
1149 subscription_request,
1150 )
1151 .await
1152 .unwrap();
1153
1154 info!("Waiting for buffer overflow to occur");
1156 sleep(Duration::from_millis(300)).await;
1157
1158 let status = client.get_status();
1160 info!("Client status after buffer overflow test: {:?}", status);
1161 assert!(
1162 status == ConnectionStatus::Reconnecting || status == ConnectionStatus::Disconnected,
1163 "Expected client to disconnect due to buffer overflow, but status is {:?}",
1164 status
1165 );
1166
1167 info!("Cleaning up");
1169 client.shutdown_and_join().await.unwrap();
1170 let _ = server_handle.await;
1171 info!("Test completed successfully");
1172 }
1173}