// titan_client/tcp/tcp_client.rs

1use std::{
2    sync::{Arc, Mutex},
3    time::Duration,
4};
5
6use serde_json;
7use thiserror::Error;
8use titan_types_api::TcpSubscriptionRequest;
9use titan_types_core::Event;
10use tokio::{
11    io::{AsyncBufReadExt, AsyncWriteExt, BufReader},
12    net::TcpStream,
13    sync::{mpsc, watch},
14    task::JoinHandle,
15};
16use tracing::{debug, error, info, warn};
17
18use crate::tcp::{
19    connection_status::ConnectionStatus,
20    reconnection::{self, ReconnectionManager},
21};
22
23use super::connection_status::ConnectionStatusTracker;
24
/// Errors that can occur while operating the asynchronous TCP client.
#[derive(Debug, Error)]
pub enum TcpClientError {
    /// Underlying socket I/O failure.
    #[error("io error: {0}")]
    IOError(#[from] std::io::Error),
    /// JSON (de)serialization failure for requests or events.
    #[error("serde error: {0}")]
    SerdeError(#[from] serde_json::Error),
    /// The background worker task panicked while being joined.
    #[error("join error: task panicked")]
    JoinError,
    /// An internal `std::sync::Mutex` could not be acquired (poisoned).
    #[error("lock error: failed to acquire lock")]
    LockError,
}
36
/// Settings for reconnecting.
#[derive(Debug, Clone)]
pub struct Config {
    /// Maximum number of reconnect attempts. Use `None` for unlimited retries.
    pub max_retries: Option<u32>,
    /// Base delay between reconnect attempts.
    pub retry_delay: Duration,
    /// Initial capacity of the read buffer (in bytes).
    pub read_buffer_capacity: usize,
    /// Maximum allowed size for the read buffer and for a single incoming message (in bytes).
    pub max_buffer_size: usize,
    /// Interval between keep-alive `PING` messages sent to the server.
    pub ping_interval: Duration,
    /// How long to wait for a `PONG` response before considering the connection dead.
    pub pong_timeout: Duration,
}
53
54impl Default for Config {
55    fn default() -> Self {
56        Self {
57            max_retries: None,
58            retry_delay: Duration::from_secs(1), // Match the default used by ReconnectionConfig
59            read_buffer_capacity: 4096,          // 4KB initial capacity
60            max_buffer_size: 10 * 1024 * 1024,   // 10MB max buffer size (same as sync client)
61            ping_interval: Duration::from_secs(30), // Send ping every 30 seconds
62            pong_timeout: Duration::from_secs(10), // Wait 10 seconds for pong response
63        }
64    }
65}
66
/// Shared shutdown channel that can be safely modified
struct ShutdownChannel {
    /// Broadcast side used to signal shutdown.
    sender: watch::Sender<()>,
    /// Prototype receiver; cloned for each worker task via `get_receiver`.
    receiver: watch::Receiver<()>,
}
72
73impl ShutdownChannel {
74    fn new() -> Self {
75        let (sender, receiver) = watch::channel(());
76        Self { sender, receiver }
77    }
78
79    fn get_receiver(&self) -> watch::Receiver<()> {
80        self.receiver.clone()
81    }
82
83    fn send(&self) -> Result<(), watch::error::SendError<()>> {
84        self.sender.send(())
85    }
86}
87
/// Asynchronous TCP client that encapsulates the shutdown signal and reconnect settings.
pub struct AsyncTcpClient {
    // Replaced with a fresh channel each time a new subscription supersedes an old one.
    shutdown_channel: Arc<Mutex<ShutdownChannel>>,
    // Reconnect/buffer/keep-alive settings, cloned into each spawned worker task.
    config: Config,
    // Tracks and broadcasts the current `ConnectionStatus`.
    status_tracker: ConnectionStatusTracker,
    // Handle of the currently running subscription task, if any.
    worker_task: Mutex<Option<JoinHandle<()>>>,
}
95
96impl AsyncTcpClient {
97    /// Creates a new instance with custom reconnect settings.
98    pub fn new_with_config(config: Config) -> Self {
99        Self {
100            shutdown_channel: Arc::new(Mutex::new(ShutdownChannel::new())),
101            config,
102            status_tracker: ConnectionStatusTracker::new(),
103            worker_task: Mutex::new(None),
104        }
105    }
106
107    /// Creates a new instance with default reconnect settings:
108    /// unlimited retries with a 1-second base delay that increases with exponential backoff.
109    pub fn new() -> Self {
110        Self::new_with_config(Config::default())
111    }
112
113    /// Get the current connection status
114    pub fn get_status(&self) -> ConnectionStatus {
115        self.status_tracker.get_status()
116    }
117
118    /// Get whether the client was disconnected at any point in time
119    pub fn create_status_subscriber(&self) -> mpsc::Receiver<ConnectionStatus> {
120        let (tx, rx) = mpsc::channel(100);
121        self.status_tracker.register_listener(tx);
122        rx
123    }
124
125    /// Checks if there is an active worker task.
126    ///
127    /// Returns true if a worker task is currently running.
128    pub fn has_active_task(&self) -> Result<bool, TcpClientError> {
129        match self.worker_task.lock() {
130            Ok(lock) => Ok(lock.is_some()),
131            Err(_) => {
132                error!("Failed to acquire worker task lock");
133                Err(TcpClientError::LockError)
134            }
135        }
136    }
137
138    /// Subscribes to the TCP server at `addr` with the given subscription request.
139    /// Returns a channel receiver for incoming events.
140    ///
141    /// This method includes reconnect logic with exponential backoff. If the connection is lost,
142    /// the client will automatically try to reconnect using the provided settings.
143    ///
144    /// If there's already an active subscription task, it will be shut down and a new one will be created.
145    pub async fn subscribe(
146        &self,
147        addr: &str,
148        subscription_request: TcpSubscriptionRequest,
149    ) -> Result<mpsc::Receiver<Event>, TcpClientError> {
150        info!("Subscribing to {}", addr);
151
152        // Check if we already have a worker task running
153        let mut worker_lock = self
154            .worker_task
155            .lock()
156            .map_err(|_| TcpClientError::LockError)?;
157
158        // If there's an existing task, shut it down and join it
159        if let Some(handle) = worker_lock.take() {
160            info!("Shutting down existing subscription task before starting a new one");
161            // Send shutdown signal
162            self.send_shutdown_signal()?;
163
164            // Create a new shutdown channel for the new task
165            let mut shutdown_guard = self
166                .shutdown_channel
167                .lock()
168                .map_err(|_| TcpClientError::LockError)?;
169            *shutdown_guard = ShutdownChannel::new();
170            drop(shutdown_guard); // Release the lock
171
172            // Attempt to join the existing task with a timeout
173            match tokio::time::timeout(Duration::from_secs(5), handle).await {
174                Ok(join_result) => match join_result {
175                    Ok(_) => info!("Successfully joined existing task"),
176                    Err(_) => error!("Error joining existing task: it panicked"),
177                },
178                Err(_) => {
179                    // Task didn't complete within timeout
180                    error!("Timed out waiting for existing task to complete, proceeding with new task anyway");
181                }
182            }
183        }
184
185        // Create a channel to forward events.
186        let (tx, rx) = mpsc::channel::<Event>(100);
187
188        // Clone the settings and shutdown receiver for the spawned task.
189        let reconnect_settings = self.config.clone();
190        let shutdown_receiver = {
191            let guard = self
192                .shutdown_channel
193                .lock()
194                .map_err(|_| TcpClientError::LockError)?;
195            guard.get_receiver()
196        };
197        let addr = addr.to_owned();
198        let subscription_request = subscription_request;
199        let status_tracker = self.status_tracker.clone();
200
201        // Set initial status to Connecting
202        status_tracker.update_status(ConnectionStatus::Connecting);
203
204        // Create the reconnection config from settings
205        let reconnection_config = reconnection::from_async_reconnect_settings(&reconnect_settings);
206
207        info!("Creating reconnection manager");
208
209        // Spawn a task to manage connection, reading, and reconnection.
210        let handle = tokio::spawn(async move {
211            // Create a status updater for use in the async task
212            let update_status = |new_status| status_tracker.update_status(new_status);
213
214            // Create the reconnection manager
215            let mut reconnection_manager = ReconnectionManager::new(reconnection_config);
216
217            // Use the shutdown receiver
218            let mut shutdown_rx = shutdown_receiver;
219
220            // Ping-pong monitoring
221            let ping_interval = reconnect_settings.ping_interval;
222            let pong_timeout = reconnect_settings.pong_timeout;
223            let mut last_pong_time = std::time::Instant::now();
224            let mut ping_timer = tokio::time::interval(ping_interval);
225            ping_timer.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay);
226            let mut awaiting_pong = false;
227
228            loop {
229                // Before each connection attempt, check for a shutdown signal.
230                if shutdown_rx.has_changed().unwrap_or(false) {
231                    info!("Shutdown signal received. Exiting subscription task.");
232                    update_status(ConnectionStatus::Disconnected);
233                    break;
234                }
235                info!("Attempting to connect to {}", addr);
236                update_status(ConnectionStatus::Connecting);
237
238                match TcpStream::connect(&addr).await {
239                    Ok(stream) => {
240                        info!("Connected to {}.", addr);
241                        update_status(ConnectionStatus::Connected);
242
243                        // Reset reconnection attempts after successful connection
244                        reconnection_manager.reset();
245
246                        let (reader, mut writer) = stream.into_split();
247                        let mut reader = BufReader::new(reader);
248
249                        // Ping-pong monitoring
250                        let ping_interval = reconnect_settings.ping_interval;
251                        let pong_timeout = reconnect_settings.pong_timeout;
252                        let mut last_pong_time = std::time::Instant::now();
253                        let mut ping_timer = tokio::time::interval(ping_interval);
254                        ping_timer.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay);
255                        let mut awaiting_pong = false;
256
257                        // Serialize and send the subscription request.
258                        match serde_json::to_string(&subscription_request) {
259                            Ok(req_json) => {
260                                if let Err(e) = writer.write_all(req_json.as_bytes()).await {
261                                    error!("Error sending subscription request: {}", e);
262                                    // Let the reconnect loop try again.
263                                    continue;
264                                }
265                                if let Err(e) = writer.write_all(b"\n").await {
266                                    error!("Error writing newline: {}", e);
267                                    continue;
268                                }
269                                if let Err(e) = writer.flush().await {
270                                    error!("Error flushing writer: {}", e);
271                                    continue;
272                                }
273                            }
274                            Err(e) => {
275                                error!("Error serializing subscription request: {}", e);
276                                update_status(ConnectionStatus::Disconnected);
277                                break;
278                            }
279                        }
280
281                        // Initialize the byte buffer with the configured capacity
282                        let mut byte_buf =
283                            Vec::with_capacity(reconnect_settings.read_buffer_capacity);
284                        // Read loop: continuously receive events.
285                        loop {
286                            // Check if the buffer has grown too large
287                            if byte_buf.capacity() > reconnect_settings.max_buffer_size {
288                                error!(
289                                    "Buffer capacity exceeded maximum allowed size ({}), resetting connection.",
290                                    reconnect_settings.max_buffer_size
291                                );
292                                break; // Break inner loop to trigger reconnect
293                            }
294
295                            tokio::select! {
296                                result = reader.read_until(b'\n', &mut byte_buf) => {
297                                    match result {
298                                        Ok(0) => {
299                                            // Connection closed by the server.
300                                            warn!("TCP connection closed by server.");
301                                            update_status(ConnectionStatus::Reconnecting);
302                                            break; // Break inner loop to trigger reconnect
303                                        }
304                                        Ok(n) if n > 0 => {
305                                            // We read n bytes, including the newline.
306                                            // Process the received bytes.
307                                            // Note: read_until includes the delimiter in the buffer.
308                                            // Trim whitespace and the trailing newline before processing.
309                                            let message_bytes = byte_buf.trim_ascii_end(); // Efficient trim for ASCII/UTF8 whitespace + newline
310
311                                            if !message_bytes.is_empty() {
312                                                // Check if this is a pong response
313                                                if message_bytes == b"PONG" {
314                                                    if awaiting_pong {
315                                                        awaiting_pong = false;
316                                                        last_pong_time = std::time::Instant::now();
317                                                        debug!("Received PONG");
318                                                    } else {
319                                                        warn!("Received unexpected PONG");
320                                                    }
321                                                } else {
322                                                    // Check if message size exceeds limit *before* parsing JSON
323                                                    if message_bytes.len() > reconnect_settings.max_buffer_size {
324                                                        error!(
325                                                            "Received message exceeds maximum allowed size ({}), skipping. Message starts with: {:?}",
326                                                            reconnect_settings.max_buffer_size,
327                                                            String::from_utf8_lossy(&message_bytes[..std::cmp::min(message_bytes.len(), 50)]) // Log first 50 bytes
328                                                        );
329                                                        // Note: We don't break here, just clear the buffer and continue reading the next message.
330                                                    } else {
331                                                        // Try to parse as an event from the byte slice
332                                                        match serde_json::from_slice::<Event>(message_bytes) {
333                                                            Ok(event) => {
334                                                                // Every successful message resets pong timer
335                                                                last_pong_time = std::time::Instant::now();
336                                                                awaiting_pong = false; // Also reset awaiting_pong if we received a valid event
337                                                                if let Err(e) = tx.send(event).await {
338                                                                    error!("Failed to send event to channel: {}", e);
339                                                                    update_status(ConnectionStatus::Disconnected); // Channel broken, can't continue
340                                                                    return; // Exit the task
341                                                                }
342                                                            }
343                                                            Err(e) => {
344                                                                error!(
345                                                                    "Failed to parse event: {}. Raw data (first 100 bytes): {:?}",
346                                                                    e,
347                                                                    String::from_utf8_lossy(&message_bytes[..std::cmp::min(message_bytes.len(), 100)])
348                                                                );
349                                                                update_status(ConnectionStatus::Reconnecting);
350                                                                break;
351                                                            }
352                                                        }
353                                                    }
354                                                }
355                                            }
356                                            // Clear the buffer for the next message AFTER processing the current one
357                                            byte_buf.clear();
358                                        }
359                                        Ok(_) => {
360                                            // n == 0, should be handled by Ok(0) case, but safety belt
361                                            byte_buf.clear();
362                                        }
363                                        Err(e) => {
364                                            error!("Error reading from TCP socket using read_until: {}", e);
365                                            update_status(ConnectionStatus::Reconnecting);
366                                            break; // Break inner loop to trigger reconnect
367                                        }
368                                    }
369                                }
370                                _ = ping_timer.tick() => {
371                                    // Time to send a ping
372                                    if awaiting_pong {
373                                        // We're still waiting for a pong from the previous ping
374                                        let elapsed = last_pong_time.elapsed();
375                                        if elapsed > pong_timeout {
376                                            warn!("Pong response timed out after {:?}, considering connection dead", elapsed);
377                                            update_status(ConnectionStatus::Reconnecting);
378                                            break;
379                                        }
380                                    } else {
381                                        // Send a ping
382                                        match writer.write_all(b"PING\n").await {
383                                            Ok(_) => {
384                                                if let Err(e) = writer.flush().await {
385                                                    error!("Failed to flush ping: {}", e);
386                                                    update_status(ConnectionStatus::Reconnecting);
387                                                    break;
388                                                }
389                                                awaiting_pong = true;
390                                            }
391                                            Err(e) => {
392                                                error!("Failed to send ping: {}", e);
393                                                update_status(ConnectionStatus::Reconnecting);
394                                                break;
395                                            }
396                                        }
397                                    }
398                                }
399                                _ = shutdown_rx.changed() => {
400                                    info!("Shutdown signal received. Exiting TCP subscription task.");
401                                    update_status(ConnectionStatus::Disconnected);
402                                    return;
403                                }
404                            }
405                        }
406                        // When the inner loop ends (e.g. connection lost), try to reconnect.
407                        info!("Lost connection. Preparing to reconnect...");
408                        update_status(ConnectionStatus::Reconnecting);
409                    }
410                    Err(e) => {
411                        error!("Failed to connect to {}: {}. Will retry...", addr, e);
412                        update_status(ConnectionStatus::Reconnecting);
413                    }
414                }
415
416                // Get the next delay from the reconnection manager
417                match reconnection_manager.next_delay() {
418                    Some(wait_time) => {
419                        info!(
420                            "Reconnecting in {:?}... (attempt {}/{:?})",
421                            wait_time,
422                            reconnection_manager.current_attempt(),
423                            reconnection_manager.config().max_attempts
424                        );
425                        tokio::time::sleep(wait_time).await;
426                    }
427                    None => {
428                        error!(
429                            "Reached maximum reconnection attempts ({}). Exiting subscription task.",
430                            reconnection_manager.config().max_attempts.unwrap_or(0)
431                        );
432                        update_status(ConnectionStatus::Disconnected);
433                        break;
434                    }
435                }
436            }
437
438            info!("Exiting TCP subscription task.");
439            update_status(ConnectionStatus::Disconnected);
440        });
441
442        // Store the task handle
443        *worker_lock = Some(handle);
444
445        Ok(rx)
446    }
447
448    /// Helper method to send a shutdown signal
449    fn send_shutdown_signal(&self) -> Result<(), TcpClientError> {
450        let channel = match self.shutdown_channel.lock() {
451            Ok(channel) => channel,
452            Err(_) => {
453                error!("Failed to acquire shutdown channel lock");
454                return Err(TcpClientError::LockError);
455            }
456        };
457
458        if let Err(e) = channel.send() {
459            error!("Failed to send shutdown signal: {:?}", e);
460            return Err(TcpClientError::IOError(std::io::Error::new(
461                std::io::ErrorKind::Other,
462                "Failed to send shutdown signal",
463            )));
464        }
465
466        Ok(())
467    }
468
469    /// Signals the client to shut down by sending a signal through the watch channel.
470    /// Does not wait for the worker task to complete.
471    pub fn shutdown(&self) {
472        // First directly update the status locally to ensure it's set immediately
473        self.status_tracker
474            .update_status(ConnectionStatus::Disconnected);
475
476        // Then send the shutdown signal to the worker task
477        if let Err(e) = self.send_shutdown_signal() {
478            error!("Error in shutdown: {:?}", e);
479        }
480    }
481
482    /// Signals the client to shut down and waits for the worker task to complete.
483    /// Returns true if the task was successfully joined, false otherwise.
484    pub async fn shutdown_and_join(&self) -> Result<(), TcpClientError> {
485        // Signal shutdown
486        self.shutdown();
487
488        // Try to join the task
489        self.join().await
490    }
491
492    /// Waits for the worker task to complete.
493    /// Returns Ok(()) if the task was successfully joined, or an error if joining failed.
494    pub async fn join(&self) -> Result<(), TcpClientError> {
495        // Acquire the lock on the worker task
496        let mut worker_lock = self
497            .worker_task
498            .lock()
499            .map_err(|_| TcpClientError::LockError)?;
500
501        // Take the task handle out (replacing it with None)
502        if let Some(handle) = worker_lock.take() {
503            match handle.await {
504                Ok(_) => {
505                    info!("Successfully joined worker task");
506                    Ok(())
507                }
508                Err(_) => {
509                    error!("Failed to join worker task due to panic");
510                    Err(TcpClientError::JoinError)
511                }
512            }
513        } else {
514            // No task to join
515            info!("No worker task to join");
516            Ok(())
517        }
518    }
519}
520
521impl Drop for AsyncTcpClient {
522    fn drop(&mut self) {
523        // Signal task to terminate
524        self.shutdown();
525
526        // We can't await in drop, so just log that the task will continue to run until it checks the shutdown signal
527        info!("AsyncTcpClient dropped, task will continue running until shutdown completes");
528    }
529}
530
531#[cfg(test)]
532mod tests {
533    use super::*;
534    use std::net::{SocketAddr, TcpListener};
535    use std::sync::Arc;
536    use std::sync::Once;
537    use titan_types_core::EventType;
538    use tokio::io::{AsyncReadExt, AsyncWriteExt};
539    use tokio::net::TcpListener as TokioTcpListener;
540    use tokio::select;
541    use tokio::signal::unix::{signal, SignalKind};
542    use tokio::sync::oneshot;
543    use tokio::task::JoinHandle;
544    use tokio::time::sleep;
545    use tracing_subscriber::{self, EnvFilter};
546
547    // Initialize the logger once for all tests
548    static INIT: Once = Once::new();
549
550    fn init_test_logger() {
551        INIT.call_once(|| {
552            // Initialize a subscriber that prints all logs to stderr
553            let filter =
554                EnvFilter::from_default_env().add_directive("titan_client=trace".parse().unwrap());
555
556            tracing_subscriber::fmt()
557                .with_env_filter(filter)
558                .with_test_writer() // This ensures logs go to the test output
559                .init();
560
561            println!("Test logger initialized at TRACE level");
562        });
563    }
564
565    // Start an async test server
566    async fn start_async_test_server() -> (JoinHandle<()>, SocketAddr, oneshot::Sender<()>) {
567        // Initialize the logger for testing
568        init_test_logger();
569
570        // Create a shutdown channel
571        let (shutdown_tx, mut shutdown_rx) = oneshot::channel::<()>();
572
573        // Create the server
574        let listener = TokioTcpListener::bind("127.0.0.1:0").await.unwrap();
575        let addr = listener.local_addr().unwrap();
576
577        // Log that the server is starting
578        info!("Test server starting on {}", addr);
579
580        // Spawn the server task
581        let handle = tokio::spawn(async move {
582            loop {
583                tokio::select! {
584                    accept_result = listener.accept() => {
585                        match accept_result {
586                            Ok((mut stream, client_addr)) => {
587                                info!("Test server accepted connection from {}", client_addr);
588
589                                // Handle the connection in a new task
590                                tokio::spawn(async move {
591                                    let (reader, mut writer) = stream.split();
592                                    let mut reader = BufReader::new(reader);
593                                    let mut request_buf = Vec::new();
594
595                                    // Read the request line
596                                    match reader.read_until(b'\n', &mut request_buf).await {
597                                        Ok(n) if n > 0 => {
598                                            let request_bytes = request_buf.trim_ascii_end();
599                                            info!("Test server received request: {}", String::from_utf8_lossy(request_bytes));
600
601                                            // Send back a test event with a small delay
602                                            sleep(Duration::from_millis(10)).await;
603
604                                            let event_json = r#"{"type":"TransactionsAdded","data": { "txids":["2222222222222222222222222222222222222222222222222222222222222222"]}}"#;
605                                            if let Err(e) = writer.write_all(event_json.as_bytes()).await {
606                                                 error!("Test server failed to write event: {}", e); return;
607                                            }
608                                            if let Err(e) = writer.write_all(b"\n").await {
609                                                error!("Test server failed to write newline: {}", e); return;
610                                            }
611                                            if let Err(e) = writer.flush().await {
612                                                 error!("Test server failed to flush: {}", e); return;
613                                            }
614                                            info!("Test server sent event and newline");
615
616                                            // Handle PING/PONG
617                                            let mut line_buf = Vec::new();
618                                            loop {
619                                                 // Clear buffer for next read
620                                                line_buf.clear();
621                                                tokio::select! {
622                                                    read_res = reader.read_until(b'\n', &mut line_buf) => {
623                                                        match read_res {
624                                                            Ok(0) => {
625                                                                info!("Test server: Client disconnected");
626                                                                break; // Exit loop on disconnect
627                                                            }
628                                                            Ok(m) if m > 0 => {
629                                                                let trimmed_line = line_buf.trim_ascii_end();
630                                                                if trimmed_line == b"PING" {
631                                                                    info!("Test server received PING");
632                                                                    if let Err(e) = writer.write_all(b"PONG\n").await {
633                                                                        error!("Test server failed to send PONG: {}", e);
634                                                                        break; // Exit loop on write error
635                                                                    }
636                                                                    if let Err(e) = writer.flush().await {
637                                                                         error!("Test server failed to flush PONG: {}", e);
638                                                                         break; // Exit loop on flush error
639                                                                    }
640                                                                    info!("Test server sent PONG");
641                                                                } else if !trimmed_line.is_empty() {
642                                                                    // Log unexpected input
643                                                                    info!("Test server received unexpected data: {}", String::from_utf8_lossy(trimmed_line));
644                                                                }
645                                                            }
646                                                            Ok(_) => {
647                                                                // Should not happen with Ok(0) case above
648                                                            }
649                                                            Err(e) => {
650                                                                error!("Test server read error in PING loop: {}", e);
651                                                                break; // Exit loop on read error
652                                                            }
653                                                        }
654                                                    }
655                                                    // Add a timeout or shutdown mechanism here if needed for the test server's loop
656                                                    _ = tokio::time::sleep(Duration::from_secs(1)) => {
657                                                        // Periodically check or just idle
658                                                    }
659                                                }
660                                            }
661                                        },
662                                        Ok(0) => {
663                                            info!("Test server received empty read, client closed connection early");
664                                        },
665                                        Ok(n) => {
666                                             info!("Test server received {} bytes, but not processing as initial request", n);
667                                        },
668                                        Err(e) => error!("Test server initial read error: {}", e),
669                                    }
670                                });
671                            },
672                            Err(e) => {
673                                error!("Test server accept error: {}", e);
674                                // Add a short delay to prevent tight loop on accept errors
675                                sleep(Duration::from_millis(10)).await;
676                            }
677                        }
678                    },
679                    _ = &mut shutdown_rx => {
680                        info!("Test server received shutdown signal");
681                        break;
682                    }
683                }
684            }
685            info!("Test server shutting down");
686        });
687
688        // Small delay to ensure server is ready
689        sleep(Duration::from_millis(10)).await;
690        info!("Test server setup complete, returning handle");
691
692        (handle, addr, shutdown_tx)
693    }
694
695    #[tokio::test]
696    async fn test_shutdown_and_join() {
697        // Initialize logging for tests
698        init_test_logger();
699        info!("Starting test_shutdown_and_join");
700
701        // Create a TCP client with a mock server (that doesn't exist)
702        let client = AsyncTcpClient::new_with_config(Config {
703            max_retries: Some(2),                    // Limit retries for quicker test
704            retry_delay: Duration::from_millis(100), // Short delay for quicker test
705            read_buffer_capacity: 4096,
706            max_buffer_size: 10 * 1024 * 1024,
707            ping_interval: Duration::from_secs(30),
708            pong_timeout: Duration::from_secs(10),
709        });
710
711        // Subscribe to a non-existent server - this will keep retrying
712        let subscription_request = TcpSubscriptionRequest { subscribe: vec![] };
713        info!("Subscribing to non-existent server to test shutdown");
714
715        // We know this will fail to connect, but it starts the background task
716        let result = client.subscribe("127.0.0.1:1", subscription_request).await;
717        assert!(result.is_ok());
718        info!("Subscription started, checking active task");
719
720        // Check that we have an active task
721        assert!(client.has_active_task().unwrap());
722
723        // Wait a bit to let the reconnect logic run
724        info!("Waiting for reconnection attempts to begin");
725        sleep(Duration::from_millis(300)).await;
726
727        // Shutdown and join
728        info!("Shutting down client");
729        let result = client.shutdown_and_join().await;
730
731        // Should succeed
732        assert!(result.is_ok());
733        info!("Client shutdown successfully");
734
735        // Check we no longer have an active task
736        assert!(!client.has_active_task().unwrap());
737        info!("Test completed successfully");
738    }
739
740    #[tokio::test]
741    async fn test_multiple_subscribes() {
742        // Initialize logging for tests
743        init_test_logger();
744        info!("Starting test_multiple_subscribes");
745
746        // Create a TCP client
747        let client = AsyncTcpClient::new_with_config(Config {
748            max_retries: Some(1),                    // Limit retries for quicker test
749            retry_delay: Duration::from_millis(100), // Short delay for quicker test
750            read_buffer_capacity: 4096,
751            max_buffer_size: 10 * 1024 * 1024,
752            ping_interval: Duration::from_secs(30),
753            pong_timeout: Duration::from_secs(10),
754        });
755
756        // First subscription
757        let subscription_request1 = TcpSubscriptionRequest { subscribe: vec![] };
758        info!("Creating first subscription");
759        let result1 = client.subscribe("127.0.0.1:1", subscription_request1).await;
760        assert!(result1.is_ok());
761
762        // We should have an active task
763        assert!(client.has_active_task().unwrap());
764        info!("First subscription active");
765
766        // Wait a bit
767        sleep(Duration::from_millis(200)).await;
768
769        // Second subscription - should replace the first one
770        let subscription_request2 = TcpSubscriptionRequest { subscribe: vec![] };
771        info!("Creating second subscription (should replace the first)");
772        let result2 = client.subscribe("127.0.0.1:2", subscription_request2).await;
773        assert!(result2.is_ok());
774
775        // We should still have an active task
776        assert!(client.has_active_task().unwrap());
777        info!("Second subscription active");
778
779        // Shutdown and join
780        info!("Shutting down client");
781        let join_result = client.shutdown_and_join().await;
782        assert!(join_result.is_ok());
783
784        // We should no longer have an active task
785        assert!(!client.has_active_task().unwrap());
786        info!("Test completed successfully");
787    }
788
789    #[tokio::test]
790    async fn test_async_connection_status_transitions() {
791        // Initialize logging for tests
792        init_test_logger();
793        info!("Starting test_async_connection_status_transitions");
794
795        // Start a test server
796        info!("Starting test server");
797        let (server_handle, server_addr, shutdown_tx) = start_async_test_server().await;
798
799        // Create a client
800        let client = AsyncTcpClient::new_with_config(Config {
801            max_retries: Some(2),
802            retry_delay: Duration::from_millis(100),
803            read_buffer_capacity: 4096,
804            max_buffer_size: 10 * 1024 * 1024,
805            ping_interval: Duration::from_secs(30),
806            pong_timeout: Duration::from_secs(10),
807        });
808
809        // Initially disconnected
810        assert_eq!(client.get_status(), ConnectionStatus::Disconnected);
811        info!("Initial status: {:?}", client.get_status());
812
813        // Subscribe to the server
814        let subscription_request = TcpSubscriptionRequest {
815            subscribe: vec![EventType::TransactionsAdded],
816        };
817
818        info!("Subscribing to test server at {}", server_addr);
819        let rx = client
820            .subscribe(
821                &format!("127.0.0.1:{}", server_addr.port()),
822                subscription_request,
823            )
824            .await
825            .unwrap();
826
827        // Give it time to connect
828        info!("Waiting for connection to establish");
829        sleep(Duration::from_millis(100)).await;
830
831        // Should be connected now
832        let status = client.get_status();
833        info!("Status after connection attempt: {:?}", status);
834        assert_eq!(status, ConnectionStatus::Connected);
835
836        // Shutdown the client
837        info!("Shutting down client");
838        client.shutdown_and_join().await.unwrap();
839
840        // Check the client is disconnected
841        let final_status = client.get_status();
842        info!("Final status: {:?}", final_status);
843        assert_eq!(final_status, ConnectionStatus::Disconnected);
844
845        // Shutdown the server
846        info!("Shutting down test server");
847        let _ = shutdown_tx.send(());
848        let _ = server_handle.await;
849        info!("Test completed successfully");
850    }
851
852    #[tokio::test]
853    async fn test_async_receive_events() {
854        // Initialize logging for tests
855        init_test_logger();
856        info!("Starting test_async_receive_events");
857
858        // Start a test server
859        info!("Starting test server");
860        let (server_handle, server_addr, shutdown_tx) = start_async_test_server().await;
861        info!("Test server started at {}", server_addr);
862
863        // Create a client
864        let client = AsyncTcpClient::new_with_config(Config {
865            max_retries: Some(2),
866            retry_delay: Duration::from_millis(100),
867            read_buffer_capacity: 4096,
868            max_buffer_size: 10 * 1024 * 1024,
869            ping_interval: Duration::from_secs(30),
870            pong_timeout: Duration::from_secs(10),
871        });
872
873        // Subscribe to the server
874        let subscription_request = TcpSubscriptionRequest {
875            subscribe: vec![EventType::TransactionsAdded],
876        };
877
878        info!("Subscribing to test server at {}", server_addr);
879        let mut rx = client
880            .subscribe(
881                &format!("127.0.0.1:{}", server_addr.port()),
882                subscription_request,
883            )
884            .await
885            .unwrap();
886
887        // Give it enough time to establish the connection
888        info!("Waiting for connection to establish");
889        sleep(Duration::from_millis(100)).await;
890
891        info!("Checking connection status: {:?}", client.get_status());
892        assert_eq!(
893            client.get_status(),
894            ConnectionStatus::Connected,
895            "Expected client to be connected, but status is {:?}",
896            client.get_status()
897        );
898
899        // Try to receive an event with timeout
900        info!("Waiting to receive an event");
901        let event = tokio::time::timeout(Duration::from_secs(5), rx.recv()).await;
902
903        if let Err(e) = &event {
904            error!("Timeout waiting for event: {}", e);
905            // Try to diagnose what happened
906            info!("Current client status: {:?}", client.get_status());
907            assert!(false, "Failed to receive event within timeout");
908        }
909
910        let event = event.unwrap();
911
912        if let None = &event {
913            error!("Received None from channel - sender was likely dropped");
914            info!("Current client status: {:?}", client.get_status());
915            assert!(false, "Expected Some(event) but got None");
916        }
917
918        match event.unwrap() {
919            Event::TransactionsAdded { txids } => {
920                info!("Received transaction event with {} txids", txids.len());
921                assert_eq!(txids.len(), 1, "Expected 1 txid in event");
922                assert_eq!(
923                    txids[0].to_string(),
924                    "2222222222222222222222222222222222222222222222222222222222222222"
925                );
926            }
927            other => {
928                error!("Received unexpected event type: {:?}", other);
929                panic!("Received unexpected event type: {:?}", other);
930            }
931        }
932
933        // Shutdown the client
934        info!("Shutting down client");
935        client.shutdown_and_join().await.unwrap();
936
937        // Shutdown the server
938        info!("Shutting down test server");
939        let _ = shutdown_tx.send(());
940        let _ = server_handle.await;
941        info!("Test completed successfully");
942    }
943
944    #[tokio::test]
945    async fn test_async_connection_error_handling() {
946        // Initialize logging for tests
947        init_test_logger();
948        info!("Starting test_async_connection_error_handling");
949
950        // Create a client with short timeout
951        let client = AsyncTcpClient::new_with_config(Config {
952            max_retries: Some(2),
953            retry_delay: Duration::from_millis(100),
954            read_buffer_capacity: 4096,
955            max_buffer_size: 10 * 1024 * 1024,
956            ping_interval: Duration::from_secs(30),
957            pong_timeout: Duration::from_secs(10),
958        });
959
960        // Initially disconnected
961        assert_eq!(client.get_status(), ConnectionStatus::Disconnected);
962        info!("Initial status: {:?}", client.get_status());
963
964        // Try to connect to a non-existent server
965        let subscription_request = TcpSubscriptionRequest {
966            subscribe: vec![EventType::TransactionsAdded],
967        };
968
969        info!("Subscribing to non-existent server to test error handling");
970        let rx = client
971            .subscribe("127.0.0.1:1", subscription_request)
972            .await
973            .unwrap();
974
975        // Give it time to attempt connection and reconnection
976        info!("Waiting for connection attempts");
977        sleep(Duration::from_millis(500)).await;
978
979        // Should be in reconnecting state or disconnected if it already gave up
980        let status = client.get_status();
981        info!("Status after connection attempts: {:?}", status);
982        assert!(
983            status == ConnectionStatus::Reconnecting || status == ConnectionStatus::Disconnected,
984            "Expected Reconnecting or Disconnected state, got {:?}",
985            status
986        );
987
988        // Shutdown the client
989        info!("Shutting down client");
990        client.shutdown_and_join().await.unwrap();
991
992        // Check the client is disconnected
993        let final_status = client.get_status();
994        info!("Final status: {:?}", final_status);
995        assert_eq!(final_status, ConnectionStatus::Disconnected);
996        info!("Test completed successfully");
997    }
998
999    #[tokio::test]
1000    async fn test_async_shutdown_during_reconnect() {
1001        // Initialize logging for tests
1002        init_test_logger();
1003        info!("Starting test_async_shutdown_during_reconnect");
1004
1005        // Create a client with many more retries and a longer delay to ensure we
1006        // have time to catch it in the reconnecting state
1007        let client = AsyncTcpClient::new_with_config(Config {
1008            max_retries: Some(100), // Many more retries so it won't finish quickly
1009            retry_delay: Duration::from_millis(500), // Longer delay between attempts
1010            read_buffer_capacity: 4096,
1011            max_buffer_size: 10 * 1024 * 1024,
1012            ping_interval: Duration::from_secs(30),
1013            pong_timeout: Duration::from_secs(10),
1014        });
1015
1016        // Subscribe to a non-existent server to trigger reconnection attempts
1017        let subscription_request = TcpSubscriptionRequest {
1018            subscribe: vec![EventType::TransactionsAdded],
1019        };
1020
1021        info!("Subscribing to non-existent server to trigger reconnection");
1022        let rx = client
1023            .subscribe("127.0.0.1:1", subscription_request)
1024            .await
1025            .unwrap();
1026
1027        // Give it just enough time for the first connection attempt to fail and enter reconnecting
1028        // This is much shorter than before to ensure we don't go through all retries
1029        info!("Waiting for client to enter reconnection state");
1030        sleep(Duration::from_millis(50)).await;
1031
1032        let status_before_shutdown = client.get_status();
1033        info!("Status before shutdown: {:?}", status_before_shutdown);
1034
1035        // If we're not in the reconnecting state yet, wait a little longer
1036        if status_before_shutdown != ConnectionStatus::Reconnecting {
1037            info!("Not in reconnecting state yet, waiting longer");
1038            sleep(Duration::from_millis(50)).await;
1039            let status_before_shutdown = client.get_status();
1040            info!("Status after additional wait: {:?}", status_before_shutdown);
1041        }
1042
1043        // Now assert - this should work because we're either catching it during the first
1044        // reconnection attempt or we've waited a bit longer
1045        assert_eq!(client.get_status(), ConnectionStatus::Reconnecting);
1046
1047        // Shutdown the client while it's reconnecting
1048        info!("Shutting down client during reconnection");
1049        client.shutdown();
1050
1051        // Status should be immediately set to Disconnected by the shutdown method
1052        let status_after_shutdown = client.get_status();
1053        info!(
1054            "Status immediately after shutdown: {:?}",
1055            status_after_shutdown
1056        );
1057        assert_eq!(status_after_shutdown, ConnectionStatus::Disconnected);
1058
1059        // Now try joining the task
1060        info!("Joining worker task");
1061        sleep(Duration::from_millis(50)).await;
1062        let result = client.join().await;
1063        info!("Join result: {:?}", result);
1064        assert!(
1065            result.is_ok(),
1066            "Failed to join task during reconnection: {:?}",
1067            result
1068        );
1069
1070        // Status should definitely be disconnected now
1071        let final_status = client.get_status();
1072        info!("Final status: {:?}", final_status);
1073        assert_eq!(final_status, ConnectionStatus::Disconnected);
1074        info!("Test completed successfully");
1075    }
1076
1077    #[tokio::test]
1078    async fn test_async_buffer_size_limit() {
1079        // Initialize logging for tests
1080        init_test_logger();
1081        info!("Starting test_async_buffer_size_limit");
1082
1083        // Create a special test server that sends oversized data
1084        info!("Starting oversized data test server");
1085        let listener = TokioTcpListener::bind("127.0.0.1:0").await.unwrap();
1086        let server_addr = listener.local_addr().unwrap();
1087        info!("Test server bound to {}", server_addr);
1088
1089        // Spawn server
1090        let server_handle = tokio::spawn(async move {
1091            info!("Test server waiting for connection");
1092            if let Ok((mut stream, client_addr)) = listener.accept().await {
1093                info!("Test server accepted connection from {}", client_addr);
1094                // Read the request
1095                let mut buf = [0; 1024];
1096                match stream.read(&mut buf).await {
1097                    Ok(n) => {
1098                        let request = String::from_utf8_lossy(&buf[..n]);
1099                        info!("Test server received request: {}", request);
1100
1101                        // Send a large response that exceeds the small buffer size
1102                        info!("Sending 100KB payload to test buffer limits");
1103
1104                        // Start with a valid JSON prefix - make sure it's valid Event format
1105                        let prefix = r#"{"type":"TransactionsAdded", "data": {"txids":["#;
1106                        stream.write_all(prefix.as_bytes()).await.unwrap();
1107
1108                        // Add the large payload inside the JSON
1109                        let large_payload = "x".repeat(100_000); // 100KB of 'x' characters
1110                        stream.write_all(large_payload.as_bytes()).await.unwrap();
1111
1112                        // Close the JSON properly
1113                        let suffix = r#"}]}}"#;
1114                        stream.write_all(suffix.as_bytes()).await.unwrap();
1115                        stream.write_all(b"\n").await.unwrap();
1116                        info!("Large payload sent");
1117
1118                        // Keep connection open for a bit
1119                        sleep(Duration::from_millis(100)).await;
1120                    }
1121                    Err(e) => {
1122                        error!("Test server read error: {}", e);
1123                    }
1124                }
1125            }
1126            info!("Test server shutting down");
1127        });
1128
1129        // Create a client with a very small buffer size
1130        info!("Creating client with small buffer size");
1131        let client = AsyncTcpClient::new_with_config(Config {
1132            max_retries: Some(1),
1133            retry_delay: Duration::from_millis(100),
1134            read_buffer_capacity: 1024, // 1KB initial capacity
1135            max_buffer_size: 10 * 1024, // Only 10KB max buffer size
1136            ping_interval: Duration::from_secs(30),
1137            pong_timeout: Duration::from_secs(10),
1138        });
1139
1140        // Subscribe to the server
1141        let subscription_request = TcpSubscriptionRequest {
1142            subscribe: vec![EventType::TransactionsAdded],
1143        };
1144
1145        info!("Subscribing to server with buffer size limit test");
1146        let _rx = client
1147            .subscribe(
1148                &format!("127.0.0.1:{}", server_addr.port()),
1149                subscription_request,
1150            )
1151            .await
1152            .unwrap();
1153
1154        // Give some time for the client to connect and process data
1155        info!("Waiting for buffer overflow to occur");
1156        sleep(Duration::from_millis(300)).await;
1157
1158        // The client should have disconnected due to buffer overflow
1159        let status = client.get_status();
1160        info!("Client status after buffer overflow test: {:?}", status);
1161        assert!(
1162            status == ConnectionStatus::Reconnecting || status == ConnectionStatus::Disconnected,
1163            "Expected client to disconnect due to buffer overflow, but status is {:?}",
1164            status
1165        );
1166
1167        // Clean up
1168        info!("Cleaning up");
1169        client.shutdown_and_join().await.unwrap();
1170        let _ = server_handle.await;
1171        info!("Test completed successfully");
1172    }
1173}