titan_client/tcp/
tcp_client.rs

1use std::{
2    sync::{Arc, Mutex},
3    time::Duration,
4};
5
6use serde_json;
7use thiserror::Error;
8use titan_types_api::TcpSubscriptionRequest;
9use titan_types_core::Event;
10use tokio::{
11    io::{AsyncBufReadExt, AsyncWriteExt, BufReader},
12    net::TcpStream,
13    sync::{mpsc, watch},
14    task::JoinHandle,
15};
16use tracing::{debug, error, info, warn};
17
18use crate::tcp::{
19    connection_status::ConnectionStatus,
20    reconnection::{self, ReconnectionManager},
21};
22
23use super::connection_status::ConnectionStatusTracker;
24
/// Errors returned by [`AsyncTcpClient`] operations.
#[derive(Debug, Error)]
pub enum TcpClientError {
    /// Underlying socket/stream I/O failure (also used when the shutdown
    /// watch channel cannot be signalled).
    #[error("io error: {0}")]
    IOError(#[from] std::io::Error),
    /// JSON (de)serialization of a subscription request or event failed.
    #[error("serde error: {0}")]
    SerdeError(#[from] serde_json::Error),
    /// The background worker task panicked while being joined.
    #[error("join error: task panicked")]
    JoinError,
    /// An internal `std::sync::Mutex` was poisoned and could not be acquired.
    #[error("lock error: failed to acquire lock")]
    LockError,
}
36
/// Settings for reconnecting.
#[derive(Debug, Clone)]
pub struct Config {
    /// Maximum number of reconnect attempts. Use `None` for unlimited retries.
    pub max_retries: Option<u32>,
    /// Delay between reconnect attempts (base value; the reconnection manager
    /// presumably applies backoff on top — see `reconnection::from_async_reconnect_settings`).
    pub retry_delay: Duration,
    /// Initial capacity of the read buffer (in bytes).
    pub read_buffer_capacity: usize,
    /// Maximum allowed size for the read buffer (in bytes). A connection whose
    /// buffer outgrows this is reset, and larger single messages are skipped.
    pub max_buffer_size: usize,
    /// Interval between `PING` messages sent to the server.
    pub ping_interval: Duration,
    /// Timeout for waiting for `PONG` responses before the connection is
    /// considered dead.
    pub pong_timeout: Duration,
}
53
54impl Default for Config {
55    fn default() -> Self {
56        Self {
57            max_retries: None,
58            retry_delay: Duration::from_secs(1), // Match the default used by ReconnectionConfig
59            read_buffer_capacity: 4096,          // 4KB initial capacity
60            max_buffer_size: 10 * 1024 * 1024,   // 10MB max buffer size (same as sync client)
61            ping_interval: Duration::from_secs(30), // Send ping every 30 seconds
62            pong_timeout: Duration::from_secs(10), // Wait 10 seconds for pong response
63        }
64    }
65}
66
67/// Shared shutdown channel that can be safely modified
68struct ShutdownChannel {
69    sender: watch::Sender<()>,
70    receiver: watch::Receiver<()>,
71}
72
73impl ShutdownChannel {
74    fn new() -> Self {
75        let (sender, receiver) = watch::channel(());
76        Self { sender, receiver }
77    }
78
79    fn get_receiver(&self) -> watch::Receiver<()> {
80        self.receiver.clone()
81    }
82
83    fn send(&self) -> Result<(), watch::error::SendError<()>> {
84        self.sender.send(())
85    }
86}
87
/// Asynchronous TCP client that encapsulates the shutdown signal and reconnect settings.
pub struct AsyncTcpClient {
    // Swappable shutdown broadcast channel shared with the worker task;
    // replaced with a fresh channel each time a new subscription starts.
    shutdown_channel: Arc<Mutex<ShutdownChannel>>,
    // Reconnect / buffering / ping settings applied to every subscription.
    config: Config,
    // Publishes connection-status transitions to registered listeners.
    status_tracker: ConnectionStatusTracker,
    // Handle of the currently running background subscription task, if any.
    worker_task: Mutex<Option<JoinHandle<()>>>,
}
95
96impl AsyncTcpClient {
97    /// Creates a new instance with custom reconnect settings.
98    pub fn new_with_config(config: Config) -> Self {
99        Self {
100            shutdown_channel: Arc::new(Mutex::new(ShutdownChannel::new())),
101            config,
102            status_tracker: ConnectionStatusTracker::new(),
103            worker_task: Mutex::new(None),
104        }
105    }
106
107    /// Creates a new instance with default reconnect settings:
108    /// unlimited retries with a 1-second base delay that increases with exponential backoff.
109    pub fn new() -> Self {
110        Self::new_with_config(Config::default())
111    }
112
113    /// Get the current connection status
114    pub fn get_status(&self) -> ConnectionStatus {
115        self.status_tracker.get_status()
116    }
117
118    /// Get whether the client was disconnected at any point in time
119    pub fn create_status_subscriber(&self) -> mpsc::Receiver<ConnectionStatus> {
120        let (tx, rx) = mpsc::channel(100);
121        self.status_tracker.register_listener(tx);
122        rx
123    }
124
125    /// Checks if there is an active worker task.
126    ///
127    /// Returns true if a worker task is currently running.
128    pub fn has_active_task(&self) -> Result<bool, TcpClientError> {
129        match self.worker_task.lock() {
130            Ok(lock) => Ok(lock.is_some()),
131            Err(_) => {
132                error!("Failed to acquire worker task lock");
133                Err(TcpClientError::LockError)
134            }
135        }
136    }
137
138    /// Subscribes to the TCP server at `addr` with the given subscription request.
139    /// Returns a channel receiver for incoming events.
140    ///
141    /// This method includes reconnect logic with exponential backoff. If the connection is lost,
142    /// the client will automatically try to reconnect using the provided settings.
143    ///
144    /// If there's already an active subscription task, it will be shut down and a new one will be created.
145    pub async fn subscribe(
146        &self,
147        addr: &str,
148        subscription_request: TcpSubscriptionRequest,
149    ) -> Result<mpsc::Receiver<Event>, TcpClientError> {
150        info!("Subscribing to {}", addr);
151
152        // Check if we already have a worker task running
153        let mut worker_lock = self
154            .worker_task
155            .lock()
156            .map_err(|_| TcpClientError::LockError)?;
157
158        // If there's an existing task, shut it down and join it
159        if let Some(handle) = worker_lock.take() {
160            info!("Shutting down existing subscription task before starting a new one");
161            // Send shutdown signal
162            self.send_shutdown_signal()?;
163
164            // Create a new shutdown channel for the new task
165            let mut shutdown_guard = self
166                .shutdown_channel
167                .lock()
168                .map_err(|_| TcpClientError::LockError)?;
169            *shutdown_guard = ShutdownChannel::new();
170            drop(shutdown_guard); // Release the lock
171
172            // Attempt to join the existing task with a timeout
173            match tokio::time::timeout(Duration::from_secs(5), handle).await {
174                Ok(join_result) => match join_result {
175                    Ok(_) => info!("Successfully joined existing task"),
176                    Err(_) => error!("Error joining existing task: it panicked"),
177                },
178                Err(_) => {
179                    // Task didn't complete within timeout
180                    error!("Timed out waiting for existing task to complete, proceeding with new task anyway");
181                }
182            }
183        }
184
185        // Create a channel to forward events.
186        let (tx, rx) = mpsc::channel::<Event>(100);
187
188        // Clone the settings and shutdown receiver for the spawned task.
189        let reconnect_settings = self.config.clone();
190        let shutdown_receiver = {
191            let guard = self
192                .shutdown_channel
193                .lock()
194                .map_err(|_| TcpClientError::LockError)?;
195            guard.get_receiver()
196        };
197        let addr = addr.to_owned();
198        let subscription_request = subscription_request;
199        let status_tracker = self.status_tracker.clone();
200
201        // Set initial status to Connecting
202        status_tracker.update_status(ConnectionStatus::Connecting);
203
204        // Create the reconnection config from settings
205        let reconnection_config = reconnection::from_async_reconnect_settings(&reconnect_settings);
206
207        info!("Creating reconnection manager");
208
209        // Spawn a task to manage connection, reading, and reconnection.
210        let handle = tokio::spawn(async move {
211            // Create a status updater for use in the async task
212            let update_status = |new_status| status_tracker.update_status(new_status);
213
214            // Create the reconnection manager
215            let mut reconnection_manager = ReconnectionManager::new(reconnection_config);
216
217            // Use the shutdown receiver
218            let mut shutdown_rx = shutdown_receiver;
219
220            // Ping-pong monitoring
221            let ping_interval = reconnect_settings.ping_interval;
222            let pong_timeout = reconnect_settings.pong_timeout;
223            let mut last_pong_time = std::time::Instant::now();
224            let mut ping_timer = tokio::time::interval(ping_interval);
225            ping_timer.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay);
226            let mut awaiting_pong = false;
227
228            loop {
229                // Before each connection attempt, check for a shutdown signal.
230                if shutdown_rx.has_changed().unwrap_or(false) {
231                    info!("Shutdown signal received. Exiting subscription task.");
232                    update_status(ConnectionStatus::Disconnected);
233                    break;
234                }
235                info!("Attempting to connect to {}", addr);
236                update_status(ConnectionStatus::Connecting);
237
238                match TcpStream::connect(&addr).await {
239                    Ok(stream) => {
240                        info!("Connected to {}.", addr);
241                        update_status(ConnectionStatus::Connected);
242
243                        // Reset reconnection attempts after successful connection
244                        reconnection_manager.reset();
245
246                        let (reader, mut writer) = stream.into_split();
247                        let mut reader = BufReader::new(reader);
248
249                        // Ping-pong monitoring
250                        let ping_interval = reconnect_settings.ping_interval;
251                        let pong_timeout = reconnect_settings.pong_timeout;
252                        let mut last_pong_time = std::time::Instant::now();
253                        let mut ping_timer = tokio::time::interval(ping_interval);
254                        ping_timer.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay);
255                        let mut awaiting_pong = false;
256
257                        // Serialize and send the subscription request.
258                        match serde_json::to_string(&subscription_request) {
259                            Ok(req_json) => {
260                                if let Err(e) = writer.write_all(req_json.as_bytes()).await {
261                                    error!("Error sending subscription request: {}", e);
262                                    // Let the reconnect loop try again.
263                                    continue;
264                                }
265                                if let Err(e) = writer.write_all(b"\n").await {
266                                    error!("Error writing newline: {}", e);
267                                    continue;
268                                }
269                                if let Err(e) = writer.flush().await {
270                                    error!("Error flushing writer: {}", e);
271                                    continue;
272                                }
273                            }
274                            Err(e) => {
275                                error!("Error serializing subscription request: {}", e);
276                                update_status(ConnectionStatus::Disconnected);
277                                break;
278                            }
279                        }
280
281                        // Initialize the byte buffer with the configured capacity
282                        let mut byte_buf =
283                            Vec::with_capacity(reconnect_settings.read_buffer_capacity);
284                        // Read loop: continuously receive events.
285                        loop {
286                            // Check if the buffer has grown too large
287                            if byte_buf.capacity() > reconnect_settings.max_buffer_size {
288                                error!(
289                                    "Buffer capacity exceeded maximum allowed size ({}), resetting connection.",
290                                    reconnect_settings.max_buffer_size
291                                );
292                                break; // Break inner loop to trigger reconnect
293                            }
294
295                            tokio::select! {
296                                result = reader.read_until(b'\n', &mut byte_buf) => {
297                                    match result {
298                                        Ok(0) => {
299                                            // Connection closed by the server.
300                                            warn!("TCP connection closed by server.");
301                                            update_status(ConnectionStatus::Reconnecting);
302                                            break; // Break inner loop to trigger reconnect
303                                        }
304                                        Ok(n) if n > 0 => {
305                                            // We read n bytes, including the newline.
306                                            // Process the received bytes.
307                                            // Note: read_until includes the delimiter in the buffer.
308                                            // Trim whitespace and the trailing newline before processing.
309                                            let message_bytes = byte_buf.trim_ascii_end(); // Efficient trim for ASCII/UTF8 whitespace + newline
310
311                                            if !message_bytes.is_empty() {
312                                                // Check if this is a pong response
313                                                if message_bytes == b"PONG" {
314                                                    if awaiting_pong {
315                                                        awaiting_pong = false;
316                                                        last_pong_time = std::time::Instant::now();
317                                                        debug!("Received PONG");
318                                                    }
319                                                } else {
320                                                    // Check if message size exceeds limit *before* parsing JSON
321                                                    if message_bytes.len() > reconnect_settings.max_buffer_size {
322                                                        error!(
323                                                            "Received message exceeds maximum allowed size ({}), skipping. Message starts with: {:?}",
324                                                            reconnect_settings.max_buffer_size,
325                                                            String::from_utf8_lossy(&message_bytes[..std::cmp::min(message_bytes.len(), 50)]) // Log first 50 bytes
326                                                        );
327                                                        // Note: We don't break here, just clear the buffer and continue reading the next message.
328                                                    } else {
329                                                        // Try to parse as an event from the byte slice
330                                                        match serde_json::from_slice::<Event>(message_bytes) {
331                                                            Ok(event) => {
332                                                                // Every successful message resets pong timer
333                                                                last_pong_time = std::time::Instant::now();
334                                                                awaiting_pong = false; // Also reset awaiting_pong if we received a valid event
335                                                                if let Err(e) = tx.send(event).await {
336                                                                    error!("Failed to send event to channel: {}", e);
337                                                                    update_status(ConnectionStatus::Disconnected); // Channel broken, can't continue
338                                                                    return; // Exit the task
339                                                                }
340                                                            }
341                                                            Err(e) => {
342                                                                error!(
343                                                                    "Failed to parse event: {}. Raw data (first 100 bytes): {:?}",
344                                                                    e,
345                                                                    String::from_utf8_lossy(&message_bytes[..std::cmp::min(message_bytes.len(), 100)])
346                                                                );
347                                                                update_status(ConnectionStatus::Reconnecting);
348                                                                break;
349                                                            }
350                                                        }
351                                                    }
352                                                }
353                                            }
354                                            // Clear the buffer for the next message AFTER processing the current one
355                                            byte_buf.clear();
356                                        }
357                                        Ok(_) => {
358                                            // n == 0, should be handled by Ok(0) case, but safety belt
359                                            byte_buf.clear();
360                                        }
361                                        Err(e) => {
362                                            error!("Error reading from TCP socket using read_until: {}", e);
363                                            update_status(ConnectionStatus::Reconnecting);
364                                            break; // Break inner loop to trigger reconnect
365                                        }
366                                    }
367                                }
368                                _ = ping_timer.tick() => {
369                                    // Time to send a ping
370                                    if awaiting_pong {
371                                        // We're still waiting for a pong from the previous ping
372                                        let elapsed = last_pong_time.elapsed();
373                                        if elapsed > pong_timeout {
374                                            warn!("Pong response timed out after {:?}, considering connection dead", elapsed);
375                                            update_status(ConnectionStatus::Reconnecting);
376                                            break;
377                                        }
378                                    } else {
379                                        // Send a ping
380                                        match writer.write_all(b"PING\n").await {
381                                            Ok(_) => {
382                                                if let Err(e) = writer.flush().await {
383                                                    error!("Failed to flush ping: {}", e);
384                                                    update_status(ConnectionStatus::Reconnecting);
385                                                    break;
386                                                }
387                                                awaiting_pong = true;
388                                            }
389                                            Err(e) => {
390                                                error!("Failed to send ping: {}", e);
391                                                update_status(ConnectionStatus::Reconnecting);
392                                                break;
393                                            }
394                                        }
395                                    }
396                                }
397                                _ = shutdown_rx.changed() => {
398                                    info!("Shutdown signal received. Exiting TCP subscription task.");
399                                    update_status(ConnectionStatus::Disconnected);
400                                    return;
401                                }
402                            }
403                        }
404                        // When the inner loop ends (e.g. connection lost), try to reconnect.
405                        info!("Lost connection. Preparing to reconnect...");
406                        update_status(ConnectionStatus::Reconnecting);
407                    }
408                    Err(e) => {
409                        error!("Failed to connect to {}: {}. Will retry...", addr, e);
410                        update_status(ConnectionStatus::Reconnecting);
411                    }
412                }
413
414                // Get the next delay from the reconnection manager
415                match reconnection_manager.next_delay() {
416                    Some(wait_time) => {
417                        info!(
418                            "Reconnecting in {:?}... (attempt {}/{:?})",
419                            wait_time,
420                            reconnection_manager.current_attempt(),
421                            reconnection_manager.config().max_attempts
422                        );
423                        tokio::time::sleep(wait_time).await;
424                    }
425                    None => {
426                        error!(
427                            "Reached maximum reconnection attempts ({}). Exiting subscription task.",
428                            reconnection_manager.config().max_attempts.unwrap_or(0)
429                        );
430                        update_status(ConnectionStatus::Disconnected);
431                        break;
432                    }
433                }
434            }
435
436            info!("Exiting TCP subscription task.");
437            update_status(ConnectionStatus::Disconnected);
438        });
439
440        // Store the task handle
441        *worker_lock = Some(handle);
442
443        Ok(rx)
444    }
445
446    /// Helper method to send a shutdown signal
447    fn send_shutdown_signal(&self) -> Result<(), TcpClientError> {
448        let channel = match self.shutdown_channel.lock() {
449            Ok(channel) => channel,
450            Err(_) => {
451                error!("Failed to acquire shutdown channel lock");
452                return Err(TcpClientError::LockError);
453            }
454        };
455
456        if let Err(e) = channel.send() {
457            error!("Failed to send shutdown signal: {:?}", e);
458            return Err(TcpClientError::IOError(std::io::Error::new(
459                std::io::ErrorKind::Other,
460                "Failed to send shutdown signal",
461            )));
462        }
463
464        Ok(())
465    }
466
467    /// Signals the client to shut down by sending a signal through the watch channel.
468    /// Does not wait for the worker task to complete.
469    pub fn shutdown(&self) {
470        // First directly update the status locally to ensure it's set immediately
471        self.status_tracker
472            .update_status(ConnectionStatus::Disconnected);
473
474        // Then send the shutdown signal to the worker task
475        if let Err(e) = self.send_shutdown_signal() {
476            error!("Error in shutdown: {:?}", e);
477        }
478    }
479
480    /// Signals the client to shut down and waits for the worker task to complete.
481    /// Returns true if the task was successfully joined, false otherwise.
482    pub async fn shutdown_and_join(&self) -> Result<(), TcpClientError> {
483        // Signal shutdown
484        self.shutdown();
485
486        // Try to join the task
487        self.join().await
488    }
489
490    /// Waits for the worker task to complete.
491    /// Returns Ok(()) if the task was successfully joined, or an error if joining failed.
492    pub async fn join(&self) -> Result<(), TcpClientError> {
493        // Acquire the lock on the worker task
494        let mut worker_lock = self
495            .worker_task
496            .lock()
497            .map_err(|_| TcpClientError::LockError)?;
498
499        // Take the task handle out (replacing it with None)
500        if let Some(handle) = worker_lock.take() {
501            match handle.await {
502                Ok(_) => {
503                    info!("Successfully joined worker task");
504                    Ok(())
505                }
506                Err(_) => {
507                    error!("Failed to join worker task due to panic");
508                    Err(TcpClientError::JoinError)
509                }
510            }
511        } else {
512            // No task to join
513            info!("No worker task to join");
514            Ok(())
515        }
516    }
517}
518
impl Drop for AsyncTcpClient {
    /// Best-effort cleanup: signals the worker task to stop. Dropping cannot
    /// wait for the task, so it keeps running until it observes the signal.
    fn drop(&mut self) {
        // Signal task to terminate
        self.shutdown();

        // We can't await in drop, so just log that the task will continue to run until it checks the shutdown signal
        info!("AsyncTcpClient dropped, task will continue running until shutdown completes");
    }
}
528
529#[cfg(test)]
530mod tests {
531    use super::*;
532    use std::net::{SocketAddr, TcpListener};
533    use std::sync::Arc;
534    use std::sync::Once;
535    use titan_types_core::EventType;
536    use tokio::io::{AsyncReadExt, AsyncWriteExt};
537    use tokio::net::TcpListener as TokioTcpListener;
538    use tokio::select;
539    use tokio::signal::unix::{signal, SignalKind};
540    use tokio::sync::oneshot;
541    use tokio::task::JoinHandle;
542    use tokio::time::sleep;
543    use tracing_subscriber::{self, EnvFilter};
544
545    // Initialize the logger once for all tests
546    static INIT: Once = Once::new();
547
548    fn init_test_logger() {
549        INIT.call_once(|| {
550            // Initialize a subscriber that prints all logs to stderr
551            let filter =
552                EnvFilter::from_default_env().add_directive("titan_client=trace".parse().unwrap());
553
554            tracing_subscriber::fmt()
555                .with_env_filter(filter)
556                .with_test_writer() // This ensures logs go to the test output
557                .init();
558
559            println!("Test logger initialized at TRACE level");
560        });
561    }
562
563    // Start an async test server
564    async fn start_async_test_server() -> (JoinHandle<()>, SocketAddr, oneshot::Sender<()>) {
565        // Initialize the logger for testing
566        init_test_logger();
567
568        // Create a shutdown channel
569        let (shutdown_tx, mut shutdown_rx) = oneshot::channel::<()>();
570
571        // Create the server
572        let listener = TokioTcpListener::bind("127.0.0.1:0").await.unwrap();
573        let addr = listener.local_addr().unwrap();
574
575        // Log that the server is starting
576        info!("Test server starting on {}", addr);
577
578        // Spawn the server task
579        let handle = tokio::spawn(async move {
580            loop {
581                tokio::select! {
582                    accept_result = listener.accept() => {
583                        match accept_result {
584                            Ok((mut stream, client_addr)) => {
585                                info!("Test server accepted connection from {}", client_addr);
586
587                                // Handle the connection in a new task
588                                tokio::spawn(async move {
589                                    let (reader, mut writer) = stream.split();
590                                    let mut reader = BufReader::new(reader);
591                                    let mut request_buf = Vec::new();
592
593                                    // Read the request line
594                                    match reader.read_until(b'\n', &mut request_buf).await {
595                                        Ok(n) if n > 0 => {
596                                            let request_bytes = request_buf.trim_ascii_end();
597                                            info!("Test server received request: {}", String::from_utf8_lossy(request_bytes));
598
599                                            // Send back a test event with a small delay
600                                            sleep(Duration::from_millis(10)).await;
601
602                                            let event_json = r#"{"type":"TransactionsAdded","data": { "txids":["2222222222222222222222222222222222222222222222222222222222222222"]}}"#;
603                                            if let Err(e) = writer.write_all(event_json.as_bytes()).await {
604                                                 error!("Test server failed to write event: {}", e); return;
605                                            }
606                                            if let Err(e) = writer.write_all(b"\n").await {
607                                                error!("Test server failed to write newline: {}", e); return;
608                                            }
609                                            if let Err(e) = writer.flush().await {
610                                                 error!("Test server failed to flush: {}", e); return;
611                                            }
612                                            info!("Test server sent event and newline");
613
614                                            // Handle PING/PONG
615                                            let mut line_buf = Vec::new();
616                                            loop {
617                                                 // Clear buffer for next read
618                                                line_buf.clear();
619                                                tokio::select! {
620                                                    read_res = reader.read_until(b'\n', &mut line_buf) => {
621                                                        match read_res {
622                                                            Ok(0) => {
623                                                                info!("Test server: Client disconnected");
624                                                                break; // Exit loop on disconnect
625                                                            }
626                                                            Ok(m) if m > 0 => {
627                                                                let trimmed_line = line_buf.trim_ascii_end();
628                                                                if trimmed_line == b"PING" {
629                                                                    info!("Test server received PING");
630                                                                    if let Err(e) = writer.write_all(b"PONG\n").await {
631                                                                        error!("Test server failed to send PONG: {}", e);
632                                                                        break; // Exit loop on write error
633                                                                    }
634                                                                    if let Err(e) = writer.flush().await {
635                                                                         error!("Test server failed to flush PONG: {}", e);
636                                                                         break; // Exit loop on flush error
637                                                                    }
638                                                                    info!("Test server sent PONG");
639                                                                } else if !trimmed_line.is_empty() {
640                                                                    // Log unexpected input
641                                                                    info!("Test server received unexpected data: {}", String::from_utf8_lossy(trimmed_line));
642                                                                }
643                                                            }
644                                                            Ok(_) => {
645                                                                // Should not happen with Ok(0) case above
646                                                            }
647                                                            Err(e) => {
648                                                                error!("Test server read error in PING loop: {}", e);
649                                                                break; // Exit loop on read error
650                                                            }
651                                                        }
652                                                    }
653                                                    // Add a timeout or shutdown mechanism here if needed for the test server's loop
654                                                    _ = tokio::time::sleep(Duration::from_secs(1)) => {
655                                                        // Periodically check or just idle
656                                                    }
657                                                }
658                                            }
659                                        },
660                                        Ok(0) => {
661                                            info!("Test server received empty read, client closed connection early");
662                                        },
663                                        Ok(n) => {
664                                             info!("Test server received {} bytes, but not processing as initial request", n);
665                                        },
666                                        Err(e) => error!("Test server initial read error: {}", e),
667                                    }
668                                });
669                            },
670                            Err(e) => {
671                                error!("Test server accept error: {}", e);
672                                // Add a short delay to prevent tight loop on accept errors
673                                sleep(Duration::from_millis(10)).await;
674                            }
675                        }
676                    },
677                    _ = &mut shutdown_rx => {
678                        info!("Test server received shutdown signal");
679                        break;
680                    }
681                }
682            }
683            info!("Test server shutting down");
684        });
685
686        // Small delay to ensure server is ready
687        sleep(Duration::from_millis(10)).await;
688        info!("Test server setup complete, returning handle");
689
690        (handle, addr, shutdown_tx)
691    }
692
693    #[tokio::test]
694    async fn test_shutdown_and_join() {
695        // Initialize logging for tests
696        init_test_logger();
697        info!("Starting test_shutdown_and_join");
698
699        // Create a TCP client with a mock server (that doesn't exist)
700        let client = AsyncTcpClient::new_with_config(Config {
701            max_retries: Some(2),                    // Limit retries for quicker test
702            retry_delay: Duration::from_millis(100), // Short delay for quicker test
703            read_buffer_capacity: 4096,
704            max_buffer_size: 10 * 1024 * 1024,
705            ping_interval: Duration::from_secs(30),
706            pong_timeout: Duration::from_secs(10),
707        });
708
709        // Subscribe to a non-existent server - this will keep retrying
710        let subscription_request = TcpSubscriptionRequest { subscribe: vec![] };
711        info!("Subscribing to non-existent server to test shutdown");
712
713        // We know this will fail to connect, but it starts the background task
714        let result = client.subscribe("127.0.0.1:1", subscription_request).await;
715        assert!(result.is_ok());
716        info!("Subscription started, checking active task");
717
718        // Check that we have an active task
719        assert!(client.has_active_task().unwrap());
720
721        // Wait a bit to let the reconnect logic run
722        info!("Waiting for reconnection attempts to begin");
723        sleep(Duration::from_millis(300)).await;
724
725        // Shutdown and join
726        info!("Shutting down client");
727        let result = client.shutdown_and_join().await;
728
729        // Should succeed
730        assert!(result.is_ok());
731        info!("Client shutdown successfully");
732
733        // Check we no longer have an active task
734        assert!(!client.has_active_task().unwrap());
735        info!("Test completed successfully");
736    }
737
738    #[tokio::test]
739    async fn test_multiple_subscribes() {
740        // Initialize logging for tests
741        init_test_logger();
742        info!("Starting test_multiple_subscribes");
743
744        // Create a TCP client
745        let client = AsyncTcpClient::new_with_config(Config {
746            max_retries: Some(1),                    // Limit retries for quicker test
747            retry_delay: Duration::from_millis(100), // Short delay for quicker test
748            read_buffer_capacity: 4096,
749            max_buffer_size: 10 * 1024 * 1024,
750            ping_interval: Duration::from_secs(30),
751            pong_timeout: Duration::from_secs(10),
752        });
753
754        // First subscription
755        let subscription_request1 = TcpSubscriptionRequest { subscribe: vec![] };
756        info!("Creating first subscription");
757        let result1 = client.subscribe("127.0.0.1:1", subscription_request1).await;
758        assert!(result1.is_ok());
759
760        // We should have an active task
761        assert!(client.has_active_task().unwrap());
762        info!("First subscription active");
763
764        // Wait a bit
765        sleep(Duration::from_millis(200)).await;
766
767        // Second subscription - should replace the first one
768        let subscription_request2 = TcpSubscriptionRequest { subscribe: vec![] };
769        info!("Creating second subscription (should replace the first)");
770        let result2 = client.subscribe("127.0.0.1:2", subscription_request2).await;
771        assert!(result2.is_ok());
772
773        // We should still have an active task
774        assert!(client.has_active_task().unwrap());
775        info!("Second subscription active");
776
777        // Shutdown and join
778        info!("Shutting down client");
779        let join_result = client.shutdown_and_join().await;
780        assert!(join_result.is_ok());
781
782        // We should no longer have an active task
783        assert!(!client.has_active_task().unwrap());
784        info!("Test completed successfully");
785    }
786
787    #[tokio::test]
788    async fn test_async_connection_status_transitions() {
789        // Initialize logging for tests
790        init_test_logger();
791        info!("Starting test_async_connection_status_transitions");
792
793        // Start a test server
794        info!("Starting test server");
795        let (server_handle, server_addr, shutdown_tx) = start_async_test_server().await;
796
797        // Create a client
798        let client = AsyncTcpClient::new_with_config(Config {
799            max_retries: Some(2),
800            retry_delay: Duration::from_millis(100),
801            read_buffer_capacity: 4096,
802            max_buffer_size: 10 * 1024 * 1024,
803            ping_interval: Duration::from_secs(30),
804            pong_timeout: Duration::from_secs(10),
805        });
806
807        // Initially disconnected
808        assert_eq!(client.get_status(), ConnectionStatus::Disconnected);
809        info!("Initial status: {:?}", client.get_status());
810
811        // Subscribe to the server
812        let subscription_request = TcpSubscriptionRequest {
813            subscribe: vec![EventType::TransactionsAdded],
814        };
815
816        info!("Subscribing to test server at {}", server_addr);
817        let rx = client
818            .subscribe(
819                &format!("127.0.0.1:{}", server_addr.port()),
820                subscription_request,
821            )
822            .await
823            .unwrap();
824
825        // Give it time to connect
826        info!("Waiting for connection to establish");
827        sleep(Duration::from_millis(100)).await;
828
829        // Should be connected now
830        let status = client.get_status();
831        info!("Status after connection attempt: {:?}", status);
832        assert_eq!(status, ConnectionStatus::Connected);
833
834        // Shutdown the client
835        info!("Shutting down client");
836        client.shutdown_and_join().await.unwrap();
837
838        // Check the client is disconnected
839        let final_status = client.get_status();
840        info!("Final status: {:?}", final_status);
841        assert_eq!(final_status, ConnectionStatus::Disconnected);
842
843        // Shutdown the server
844        info!("Shutting down test server");
845        let _ = shutdown_tx.send(());
846        let _ = server_handle.await;
847        info!("Test completed successfully");
848    }
849
850    #[tokio::test]
851    async fn test_async_receive_events() {
852        // Initialize logging for tests
853        init_test_logger();
854        info!("Starting test_async_receive_events");
855
856        // Start a test server
857        info!("Starting test server");
858        let (server_handle, server_addr, shutdown_tx) = start_async_test_server().await;
859        info!("Test server started at {}", server_addr);
860
861        // Create a client
862        let client = AsyncTcpClient::new_with_config(Config {
863            max_retries: Some(2),
864            retry_delay: Duration::from_millis(100),
865            read_buffer_capacity: 4096,
866            max_buffer_size: 10 * 1024 * 1024,
867            ping_interval: Duration::from_secs(30),
868            pong_timeout: Duration::from_secs(10),
869        });
870
871        // Subscribe to the server
872        let subscription_request = TcpSubscriptionRequest {
873            subscribe: vec![EventType::TransactionsAdded],
874        };
875
876        info!("Subscribing to test server at {}", server_addr);
877        let mut rx = client
878            .subscribe(
879                &format!("127.0.0.1:{}", server_addr.port()),
880                subscription_request,
881            )
882            .await
883            .unwrap();
884
885        // Give it enough time to establish the connection
886        info!("Waiting for connection to establish");
887        sleep(Duration::from_millis(100)).await;
888
889        info!("Checking connection status: {:?}", client.get_status());
890        assert_eq!(
891            client.get_status(),
892            ConnectionStatus::Connected,
893            "Expected client to be connected, but status is {:?}",
894            client.get_status()
895        );
896
897        // Try to receive an event with timeout
898        info!("Waiting to receive an event");
899        let event = tokio::time::timeout(Duration::from_secs(5), rx.recv()).await;
900
901        if let Err(e) = &event {
902            error!("Timeout waiting for event: {}", e);
903            // Try to diagnose what happened
904            info!("Current client status: {:?}", client.get_status());
905            assert!(false, "Failed to receive event within timeout");
906        }
907
908        let event = event.unwrap();
909
910        if let None = &event {
911            error!("Received None from channel - sender was likely dropped");
912            info!("Current client status: {:?}", client.get_status());
913            assert!(false, "Expected Some(event) but got None");
914        }
915
916        match event.unwrap() {
917            Event::TransactionsAdded { txids } => {
918                info!("Received transaction event with {} txids", txids.len());
919                assert_eq!(txids.len(), 1, "Expected 1 txid in event");
920                assert_eq!(
921                    txids[0].to_string(),
922                    "2222222222222222222222222222222222222222222222222222222222222222"
923                );
924            }
925            other => {
926                error!("Received unexpected event type: {:?}", other);
927                panic!("Received unexpected event type: {:?}", other);
928            }
929        }
930
931        // Shutdown the client
932        info!("Shutting down client");
933        client.shutdown_and_join().await.unwrap();
934
935        // Shutdown the server
936        info!("Shutting down test server");
937        let _ = shutdown_tx.send(());
938        let _ = server_handle.await;
939        info!("Test completed successfully");
940    }
941
942    #[tokio::test]
943    async fn test_async_connection_error_handling() {
944        // Initialize logging for tests
945        init_test_logger();
946        info!("Starting test_async_connection_error_handling");
947
948        // Create a client with short timeout
949        let client = AsyncTcpClient::new_with_config(Config {
950            max_retries: Some(2),
951            retry_delay: Duration::from_millis(100),
952            read_buffer_capacity: 4096,
953            max_buffer_size: 10 * 1024 * 1024,
954            ping_interval: Duration::from_secs(30),
955            pong_timeout: Duration::from_secs(10),
956        });
957
958        // Initially disconnected
959        assert_eq!(client.get_status(), ConnectionStatus::Disconnected);
960        info!("Initial status: {:?}", client.get_status());
961
962        // Try to connect to a non-existent server
963        let subscription_request = TcpSubscriptionRequest {
964            subscribe: vec![EventType::TransactionsAdded],
965        };
966
967        info!("Subscribing to non-existent server to test error handling");
968        let rx = client
969            .subscribe("127.0.0.1:1", subscription_request)
970            .await
971            .unwrap();
972
973        // Give it time to attempt connection and reconnection
974        info!("Waiting for connection attempts");
975        sleep(Duration::from_millis(500)).await;
976
977        // Should be in reconnecting state or disconnected if it already gave up
978        let status = client.get_status();
979        info!("Status after connection attempts: {:?}", status);
980        assert!(
981            status == ConnectionStatus::Reconnecting || status == ConnectionStatus::Disconnected,
982            "Expected Reconnecting or Disconnected state, got {:?}",
983            status
984        );
985
986        // Shutdown the client
987        info!("Shutting down client");
988        client.shutdown_and_join().await.unwrap();
989
990        // Check the client is disconnected
991        let final_status = client.get_status();
992        info!("Final status: {:?}", final_status);
993        assert_eq!(final_status, ConnectionStatus::Disconnected);
994        info!("Test completed successfully");
995    }
996
997    #[tokio::test]
998    async fn test_async_shutdown_during_reconnect() {
999        // Initialize logging for tests
1000        init_test_logger();
1001        info!("Starting test_async_shutdown_during_reconnect");
1002
1003        // Create a client with many more retries and a longer delay to ensure we
1004        // have time to catch it in the reconnecting state
1005        let client = AsyncTcpClient::new_with_config(Config {
1006            max_retries: Some(100), // Many more retries so it won't finish quickly
1007            retry_delay: Duration::from_millis(500), // Longer delay between attempts
1008            read_buffer_capacity: 4096,
1009            max_buffer_size: 10 * 1024 * 1024,
1010            ping_interval: Duration::from_secs(30),
1011            pong_timeout: Duration::from_secs(10),
1012        });
1013
1014        // Subscribe to a non-existent server to trigger reconnection attempts
1015        let subscription_request = TcpSubscriptionRequest {
1016            subscribe: vec![EventType::TransactionsAdded],
1017        };
1018
1019        info!("Subscribing to non-existent server to trigger reconnection");
1020        let rx = client
1021            .subscribe("127.0.0.1:1", subscription_request)
1022            .await
1023            .unwrap();
1024
1025        // Give it just enough time for the first connection attempt to fail and enter reconnecting
1026        // This is much shorter than before to ensure we don't go through all retries
1027        info!("Waiting for client to enter reconnection state");
1028        sleep(Duration::from_millis(50)).await;
1029
1030        let status_before_shutdown = client.get_status();
1031        info!("Status before shutdown: {:?}", status_before_shutdown);
1032
1033        // If we're not in the reconnecting state yet, wait a little longer
1034        if status_before_shutdown != ConnectionStatus::Reconnecting {
1035            info!("Not in reconnecting state yet, waiting longer");
1036            sleep(Duration::from_millis(50)).await;
1037            let status_before_shutdown = client.get_status();
1038            info!("Status after additional wait: {:?}", status_before_shutdown);
1039        }
1040
1041        // Now assert - this should work because we're either catching it during the first
1042        // reconnection attempt or we've waited a bit longer
1043        assert_eq!(client.get_status(), ConnectionStatus::Reconnecting);
1044
1045        // Shutdown the client while it's reconnecting
1046        info!("Shutting down client during reconnection");
1047        client.shutdown();
1048
1049        // Status should be immediately set to Disconnected by the shutdown method
1050        let status_after_shutdown = client.get_status();
1051        info!(
1052            "Status immediately after shutdown: {:?}",
1053            status_after_shutdown
1054        );
1055        assert_eq!(status_after_shutdown, ConnectionStatus::Disconnected);
1056
1057        // Now try joining the task
1058        info!("Joining worker task");
1059        sleep(Duration::from_millis(50)).await;
1060        let result = client.join().await;
1061        info!("Join result: {:?}", result);
1062        assert!(
1063            result.is_ok(),
1064            "Failed to join task during reconnection: {:?}",
1065            result
1066        );
1067
1068        // Status should definitely be disconnected now
1069        let final_status = client.get_status();
1070        info!("Final status: {:?}", final_status);
1071        assert_eq!(final_status, ConnectionStatus::Disconnected);
1072        info!("Test completed successfully");
1073    }
1074
1075    #[tokio::test]
1076    async fn test_async_buffer_size_limit() {
1077        // Initialize logging for tests
1078        init_test_logger();
1079        info!("Starting test_async_buffer_size_limit");
1080
1081        // Create a special test server that sends oversized data
1082        info!("Starting oversized data test server");
1083        let listener = TokioTcpListener::bind("127.0.0.1:0").await.unwrap();
1084        let server_addr = listener.local_addr().unwrap();
1085        info!("Test server bound to {}", server_addr);
1086
1087        // Spawn server
1088        let server_handle = tokio::spawn(async move {
1089            info!("Test server waiting for connection");
1090            if let Ok((mut stream, client_addr)) = listener.accept().await {
1091                info!("Test server accepted connection from {}", client_addr);
1092                // Read the request
1093                let mut buf = [0; 1024];
1094                match stream.read(&mut buf).await {
1095                    Ok(n) => {
1096                        let request = String::from_utf8_lossy(&buf[..n]);
1097                        info!("Test server received request: {}", request);
1098
1099                        // Send a large response that exceeds the small buffer size
1100                        info!("Sending 100KB payload to test buffer limits");
1101
1102                        // Start with a valid JSON prefix - make sure it's valid Event format
1103                        let prefix = r#"{"type":"TransactionsAdded", "data": {"txids":["#;
1104                        stream.write_all(prefix.as_bytes()).await.unwrap();
1105
1106                        // Add the large payload inside the JSON
1107                        let large_payload = "x".repeat(100_000); // 100KB of 'x' characters
1108                        stream.write_all(large_payload.as_bytes()).await.unwrap();
1109
1110                        // Close the JSON properly
1111                        let suffix = r#"}]}}"#;
1112                        stream.write_all(suffix.as_bytes()).await.unwrap();
1113                        stream.write_all(b"\n").await.unwrap();
1114                        info!("Large payload sent");
1115
1116                        // Keep connection open for a bit
1117                        sleep(Duration::from_millis(100)).await;
1118                    }
1119                    Err(e) => {
1120                        error!("Test server read error: {}", e);
1121                    }
1122                }
1123            }
1124            info!("Test server shutting down");
1125        });
1126
1127        // Create a client with a very small buffer size
1128        info!("Creating client with small buffer size");
1129        let client = AsyncTcpClient::new_with_config(Config {
1130            max_retries: Some(1),
1131            retry_delay: Duration::from_millis(100),
1132            read_buffer_capacity: 1024, // 1KB initial capacity
1133            max_buffer_size: 10 * 1024, // Only 10KB max buffer size
1134            ping_interval: Duration::from_secs(30),
1135            pong_timeout: Duration::from_secs(10),
1136        });
1137
1138        // Subscribe to the server
1139        let subscription_request = TcpSubscriptionRequest {
1140            subscribe: vec![EventType::TransactionsAdded],
1141        };
1142
1143        info!("Subscribing to server with buffer size limit test");
1144        let _rx = client
1145            .subscribe(
1146                &format!("127.0.0.1:{}", server_addr.port()),
1147                subscription_request,
1148            )
1149            .await
1150            .unwrap();
1151
1152        // Give some time for the client to connect and process data
1153        info!("Waiting for buffer overflow to occur");
1154        sleep(Duration::from_millis(300)).await;
1155
1156        // The client should have disconnected due to buffer overflow
1157        let status = client.get_status();
1158        info!("Client status after buffer overflow test: {:?}", status);
1159        assert!(
1160            status == ConnectionStatus::Reconnecting || status == ConnectionStatus::Disconnected,
1161            "Expected client to disconnect due to buffer overflow, but status is {:?}",
1162            status
1163        );
1164
1165        // Clean up
1166        info!("Cleaning up");
1167        client.shutdown_and_join().await.unwrap();
1168        let _ = server_handle.await;
1169        info!("Test completed successfully");
1170    }
1171}