titan_client/tcp/
tcp_client_blocking.rs

1use std::{
2    io::{BufRead, BufReader, Write},
3    net::{TcpStream, ToSocketAddrs},
4    sync::{
5        atomic::{AtomicBool, Ordering},
6        mpsc, Arc, Mutex,
7    },
8    thread::{self, JoinHandle},
9    time::Duration,
10};
11
12use serde_json;
13use thiserror::Error;
14use titan_types_api::TcpSubscriptionRequest;
15use titan_types_core::Event;
16use tracing::{debug, error, info, warn};
17
18use crate::tcp::reconnection::ReconnectionManager;
19
20use super::{
21    connection_status::{ConnectionStatus, ConnectionStatusTracker},
22    reconnection,
23};
24
/// Errors reported by the blocking TCP client.
#[derive(Debug, Error)]
pub enum TcpClientError {
    /// An underlying I/O failure; converted automatically from `std::io::Error`.
    /// Also used to report a worker-thread lock that could not be acquired.
    #[error("io error: {0}")]
    IOError(#[from] std::io::Error),
    /// A JSON (de)serialization failure; converted automatically from `serde_json::Error`.
    #[error("serde error: {0}")]
    SerdeError(#[from] serde_json::Error),
    /// The server address could not be resolved to a socket address.
    /// The payload is a human-readable description including the offending address.
    #[error("address parse error: {0}")]
    AddrParseError(String),
}
/// Tunable settings for the TCP client: reconnection backoff, timeouts,
/// read-buffer sizing and keep-alive (ping/pong) timing.
#[derive(Debug, Clone)]
pub struct TcpClientConfig {
    /// Starting delay between reconnection attempts (the base of the exponential backoff).
    pub base_reconnect_interval: Duration,
    /// Upper bound the exponential backoff delay is capped at.
    pub max_reconnect_interval: Duration,
    /// How many reconnection attempts are allowed.
    /// `None` means the client keeps retrying without limit.
    pub max_reconnect_attempts: Option<u32>,
    /// How long a single connection attempt may take.
    pub connection_timeout: Duration,
    /// Number of bytes the read buffer is pre-allocated with.
    pub read_buffer_capacity: usize,
    /// Largest size, in bytes, the read buffer is allowed to reach.
    pub max_buffer_size: usize,
    /// Time between two consecutive ping messages.
    pub ping_interval: Duration,
    /// How long to wait for a pong before giving up on the connection.
    pub pong_timeout: Duration,
}

impl Default for TcpClientConfig {
    /// Returns the stock configuration: 1s..60s backoff with unlimited attempts,
    /// a 30s connect timeout, a 4 KiB initial / 10 MiB maximum read buffer,
    /// and a ping every 30s with a 10s pong timeout.
    fn default() -> Self {
        const KIB: usize = 1024;
        const MIB: usize = KIB * KIB;

        let seconds = Duration::from_secs;

        Self {
            // Reconnection backoff.
            base_reconnect_interval: seconds(1),
            max_reconnect_interval: seconds(60),
            max_reconnect_attempts: None,
            // Connection establishment.
            connection_timeout: seconds(30),
            // Read buffer sizing.
            read_buffer_capacity: 4 * KIB,
            max_buffer_size: 10 * MIB,
            // Keep-alive.
            ping_interval: seconds(30),
            pong_timeout: seconds(10),
        }
    }
}
70
/// Synchronous TCP subscription listener with reconnection support.
///
/// Connects to the TCP server at `addr` and sends the given subscription request
/// (encoded as JSON). It then spawns a dedicated thread that reads lines from the TCP
/// connection. If the connection drops or an error occurs, it will attempt to reconnect
/// according to the configuration settings.
///
/// # Thread Management
///
/// This client spawns a background thread to handle the TCP connection and event processing.
/// To ensure proper cleanup, you should call `shutdown_and_join()` when you're done with the
/// client. If you don't call this method, the background thread will be automatically
/// signaled to shut down when the `TcpClient` is dropped, but the thread may continue
/// running briefly after the client is dropped.
///
/// ```
/// # use client::tcp_client_blocking::{TcpClient, TcpClientConfig};
/// # fn main() {
/// let client = TcpClient::new(TcpClientConfig::default());
/// // Use the client...
///
/// // When done, ensure clean shutdown
/// client.shutdown_and_join();
/// # }
/// ```
#[cfg(feature = "tcp_client_blocking")]
pub struct TcpClient {
    // Shared with the worker thread; storing `true` asks the worker to exit.
    shutdown_flag: Arc<AtomicBool>,
    // Settings cloned into each worker when a subscription is started.
    config: TcpClientConfig,
    // Holds the current connection status; a clone is handed to the worker,
    // and status listeners are registered through it.
    status_tracker: ConnectionStatusTracker,
    // Join handle of the worker spawned by `subscribe`; `None` before the first
    // subscription and after the handle has been taken by `join` or `drop`.
    worker_thread: Mutex<Option<JoinHandle<()>>>,
}
103
#[cfg(feature = "tcp_client_blocking")]
impl TcpClient {
    /// Creates a new TCP client with the given configuration.
    ///
    /// No connection is made and no thread is spawned until [`TcpClient::subscribe`]
    /// is called.
    pub fn new(config: TcpClientConfig) -> Self {
        Self {
            shutdown_flag: Arc::new(AtomicBool::new(false)),
            config,
            status_tracker: ConnectionStatusTracker::new(),
            worker_thread: Mutex::new(None),
        }
    }

    /// Returns the current connection status as recorded by the status tracker.
    pub fn get_status(&self) -> ConnectionStatus {
        self.status_tracker.get_status()
    }

    /// Registers a new status listener and returns the receiving end of its channel.
    ///
    /// The sending half is handed to the status tracker; status values the tracker
    /// forwards to its listeners arrive on the returned receiver.
    pub fn create_status_subscriber(&self) -> mpsc::Receiver<ConnectionStatus> {
        let (tx, rx) = mpsc::channel();
        self.status_tracker.register_listener(tx);
        rx
    }

    /// Checks whether a worker thread handle is currently held.
    ///
    /// Returns `true` once `subscribe` has spawned a worker and until that worker has
    /// been joined. Note that a worker that exited on its own (for example after
    /// exhausting its reconnection attempts) still counts until it is joined.
    /// Returns `false` if the internal lock is poisoned.
    pub fn has_active_thread(&self) -> bool {
        match self.worker_thread.lock() {
            Ok(lock) => lock.is_some(),
            Err(_) => {
                error!("Failed to acquire worker thread lock");
                false
            }
        }
    }

    /// Subscribe to events from the given address.
    ///
    /// This will spawn a background thread that connects to the server and
    /// listens for events. The events will be sent to the returned channel.
    ///
    /// If there's already a worker thread, it is signalled to shut down and joined
    /// before the new one is created. This call therefore blocks until the previous
    /// worker has noticed the shutdown request (which can take as long as an
    /// in-flight connection attempt or socket write).
    ///
    /// # Errors
    ///
    /// Returns [`TcpClientError::IOError`] if the internal lock is poisoned, or whatever
    /// error the worker setup reports (for example an unresolvable address). In the
    /// error case a previously running worker has already been stopped.
    pub fn subscribe(
        &self,
        addr: String,
        subscription_request: TcpSubscriptionRequest,
    ) -> Result<mpsc::Receiver<Event>, TcpClientError> {
        let mut worker_lock = self.worker_thread.lock().map_err(|_| {
            TcpClientError::IOError(std::io::Error::other(
                "Failed to acquire worker thread lock",
            ))
        })?;

        // The shutdown flag is shared by every worker this client spawns. The previous
        // worker must be fully stopped *before* the flag is cleared again, otherwise it
        // would keep running forever with nobody holding its handle.
        if let Some(previous) = worker_lock.take() {
            info!("Stopping previous worker thread before starting a new subscription");
            self.shutdown_flag.store(true, Ordering::SeqCst);
            if let Err(e) = previous.join() {
                error!("Previous worker thread panicked: {:?}", e);
            }
        }

        // Clear the flag in case it was set by the block above or by an earlier shutdown().
        self.shutdown_flag.store(false, Ordering::SeqCst);

        // The free function does the actual work and hands back the receiver and the
        // handle of the thread it spawned.
        let (rx, handle) = subscribe(
            addr,
            subscription_request,
            Arc::clone(&self.shutdown_flag),
            self.config.clone(),
            self.status_tracker.clone(),
        )?;

        // Keep the handle so the worker can be joined later.
        *worker_lock = Some(handle);

        Ok(rx)
    }

    /// Signals the client to shut down and stop any reconnection attempts.
    /// Does not wait for the worker thread to complete.
    pub fn shutdown(&self) {
        // Raise the flag first so the worker stops publishing new states as early as
        // possible, then record the final status.
        self.shutdown_flag.store(true, Ordering::SeqCst);
        self.status_tracker
            .update_status(ConnectionStatus::Disconnected);
    }

    /// Signals the client to shut down and waits for the worker thread to complete.
    /// Returns true if the thread was successfully joined, false otherwise.
    pub fn shutdown_and_join(&self) -> bool {
        self.shutdown();
        self.join()
    }

    /// Waits for the worker thread to complete.
    ///
    /// Returns `true` if a worker existed and was joined without having panicked.
    /// Returns `false` if there was no worker to join, the internal lock is poisoned,
    /// or the worker panicked.
    ///
    /// This does not request a shutdown by itself; without a prior `shutdown()` it
    /// blocks until the worker ends on its own.
    pub fn join(&self) -> bool {
        let mut worker_lock = match self.worker_thread.lock() {
            Ok(lock) => lock,
            Err(e) => {
                error!("Failed to acquire worker thread lock: {}", e);
                return false;
            }
        };

        // Take the handle out so a second join() finds nothing to wait for.
        let Some(handle) = worker_lock.take() else {
            return false;
        };

        match handle.join() {
            Ok(()) => {
                info!("Successfully joined worker thread");
                true
            }
            Err(e) => {
                error!("Failed to join worker thread: {:?}", e);
                false
            }
        }
    }
}
231
#[cfg(feature = "tcp_client_blocking")]
impl Drop for TcpClient {
    /// Requests shutdown of the worker thread and detaches it.
    ///
    /// The destructor intentionally does not join: blocking in `drop` could stall the
    /// caller for as long as the worker needs to notice the flag. Dropping the
    /// `JoinHandle` detaches the thread, which exits on its own once it observes the
    /// shutdown flag. Call `shutdown_and_join()` beforehand for a synchronous stop.
    fn drop(&mut self) {
        // Signal thread to terminate
        self.shutdown();

        // `&mut self` guarantees exclusive access, so the mutex does not need to be
        // locked; a poisoned mutex is simply skipped, as there is nothing left to clean up.
        if let Ok(slot) = self.worker_thread.get_mut() {
            if slot.take().is_some() {
                info!("TcpClient dropped, thread will continue running until shutdown completes");
            }
        }
    }
}
250
251fn subscribe(
252    addr: String,
253    subscription_request: TcpSubscriptionRequest,
254    shutdown_flag: Arc<AtomicBool>,
255    config: TcpClientConfig,
256    status_tracker: ConnectionStatusTracker,
257) -> Result<(mpsc::Receiver<Event>, JoinHandle<()>), TcpClientError> {
258    // Create a standard mpsc channel to forward events.
259    let (tx, rx) = mpsc::channel::<Event>();
260
261    let address = addr
262        .to_socket_addrs()
263        .map_err(|_| TcpClientError::AddrParseError(format!("Invalid address: {}", addr)))?
264        .next()
265        .ok_or(TcpClientError::AddrParseError(format!(
266            "Invalid address: {}",
267            addr
268        )))?;
269
270    // Set initial status to Connecting
271    status_tracker.update_status(ConnectionStatus::Connecting);
272
273    // Create the reconnection manager
274    let reconnection_config = reconnection::from_tcp_client_config(&config);
275
276    let handle = thread::spawn(move || {
277        // Create a status updater for use in the thread
278        let update_status = status_tracker.create_updater();
279
280        // Create the reconnection manager
281        let mut reconnection_manager = ReconnectionManager::new(reconnection_config);
282
283        loop {
284            if shutdown_flag.load(Ordering::SeqCst) {
285                info!("Shutdown flag set. Exiting subscription thread.");
286                // Set status to disconnected
287                update_status(ConnectionStatus::Disconnected);
288                break;
289            }
290
291            // Try to connect to the server.
292            info!("Attempting to connect to {}...", addr);
293            // Ensure status is set to Connecting
294            update_status(ConnectionStatus::Connecting);
295
296            let connect_result = TcpStream::connect_timeout(&address, config.connection_timeout);
297
298            match connect_result {
299                Ok(mut stream) => {
300                    info!("Connected to server at {}", addr);
301                    // Update status to Connected
302                    update_status(ConnectionStatus::Connected);
303
304                    // Reset reconnection attempts after successful connection
305                    reconnection_manager.reset();
306
307                    // Set read timeout - use shorter timeout to allow for ping checks
308                    if let Err(e) = stream.set_read_timeout(Some(Duration::from_millis(500))) {
309                        error!("Failed to set read timeout: {}", e);
310                        continue;
311                    }
312
313                    // Set write timeout
314                    if let Err(e) = stream.set_write_timeout(Some(Duration::from_secs(5))) {
315                        error!("Failed to set write timeout: {}", e);
316                        continue;
317                    }
318
319                    // Clone the stream for reading.
320                    let reader_stream = match stream.try_clone() {
321                        Ok(rs) => rs,
322                        Err(e) => {
323                            error!("Failed to clone stream: {}", e);
324                            continue;
325                        }
326                    };
327                    let mut reader = BufReader::new(reader_stream);
328
329                    // Serialize and send the subscription request.
330                    match serde_json::to_string(&subscription_request) {
331                        Ok(req_json) => {
332                            if let Err(e) = stream.write_all(req_json.as_bytes()) {
333                                error!("Failed to send subscription request: {}", e);
334                                continue;
335                            }
336                            if let Err(e) = stream.write_all(b"\n") {
337                                error!("Failed to send newline: {}", e);
338                                continue;
339                            }
340                            if let Err(e) = stream.flush() {
341                                error!("Failed to flush stream: {}", e);
342                                continue;
343                            }
344                        }
345                        Err(e) => {
346                            error!("Failed to serialize subscription request: {}", e);
347                            break;
348                        }
349                    }
350
351                    // Initialize the byte buffer with the configured capacity
352                    let mut byte_buf = Vec::with_capacity(config.read_buffer_capacity);
353
354                    // Ping-pong state tracking
355                    let mut last_ping_time = std::time::Instant::now();
356                    let mut last_pong_time = std::time::Instant::now();
357                    let mut awaiting_pong = false;
358
359                    // Inner loop: read events from the connection with ping/pong support
360                    loop {
361                        if shutdown_flag.load(Ordering::SeqCst) {
362                            info!("Shutdown flag set. Exiting inner read loop.");
363                            update_status(ConnectionStatus::Disconnected);
364                            break;
365                        }
366
367                        // Current time
368                        let now = std::time::Instant::now();
369
370                        // Handle ping-pong logic
371                        if now.duration_since(last_ping_time) >= config.ping_interval {
372                            if awaiting_pong {
373                                // Check if we've exceeded the pong timeout
374                                if now.duration_since(last_pong_time) >= config.pong_timeout {
375                                    warn!("Pong response timed out after {:?}, considering connection dead",
376                                          now.duration_since(last_pong_time));
377                                    update_status(ConnectionStatus::Reconnecting);
378                                    break;
379                                }
380                            } else {
381                                // Time to send a ping
382                                match stream.write_all(b"PING\n") {
383                                    Ok(_) => {
384                                        if let Err(e) = stream.flush() {
385                                            error!("Failed to flush after PING: {}", e);
386                                            update_status(ConnectionStatus::Reconnecting);
387                                            break;
388                                        }
389                                        last_ping_time = now;
390                                        awaiting_pong = true;
391                                    }
392                                    Err(e) => {
393                                        error!("Failed to send PING: {}", e);
394                                        update_status(ConnectionStatus::Reconnecting);
395                                        break;
396                                    }
397                                }
398                            }
399                        }
400
401                        // Set read timeout to allow for ping checks and shutdown signals
402                        if let Err(e) = stream.set_read_timeout(Some(Duration::from_millis(50))) {
403                            error!("Failed to set read timeout: {}", e);
404                            update_status(ConnectionStatus::Reconnecting);
405                            break;
406                        }
407
408                        // Try to read until newline
409                        match reader.read_until(b'\n', &mut byte_buf) {
410                            Ok(0) => {
411                                // Connection closed by server
412                                warn!("TCP connection closed by server. Attempting to reconnect.");
413                                update_status(ConnectionStatus::Reconnecting);
414                                break;
415                            }
416                            Ok(n) if n > 0 => {
417                                // Note: read_until includes the delimiter in the buffer.
418                                // Trim whitespace and the trailing newline before processing.
419                                let message_bytes = byte_buf.trim_ascii_end();
420
421                                if !message_bytes.is_empty() {
422                                    // Check if this is a PONG response
423                                    if message_bytes == b"PONG" {
424                                        if awaiting_pong {
425                                            awaiting_pong = false;
426                                            last_pong_time = std::time::Instant::now();
427                                            debug!("Received PONG");
428                                        } else {
429                                            warn!("Received unexpected PONG");
430                                        }
431                                    } else {
432                                        // Check if message size exceeds limit *before* parsing JSON
433                                        if message_bytes.len() > config.max_buffer_size {
434                                            error!(
435                                                "Received message exceeds maximum allowed size ({}), skipping. Message starts with: {:?}",
436                                                config.max_buffer_size,
437                                                String::from_utf8_lossy(&message_bytes[..std::cmp::min(message_bytes.len(), 50)]) // Log first 50 bytes
438                                            );
439                                            // Don't break, just clear buffer and continue reading the next message.
440                                        } else {
441                                            // Try to parse as an event from the byte slice
442                                            match serde_json::from_slice::<Event>(message_bytes) {
443                                                Ok(event) => {
444                                                    // Any successful message means the connection is alive
445                                                    last_pong_time = std::time::Instant::now();
446                                                    awaiting_pong = false; // Reset awaiting_pong if we received a valid event
447
448                                                    if tx.send(event).is_err() {
449                                                        error!("Receiver dropped. Exiting subscription thread.");
450                                                        update_status(
451                                                            ConnectionStatus::Disconnected,
452                                                        ); // Set status before returning
453                                                        return; // Exit the thread
454                                                    }
455                                                }
456                                                Err(e) => {
457                                                    error!(
458                                                        "Failed to parse event: {}. Raw data (first 100 bytes): {:?}",
459                                                        e,
460                                                        String::from_utf8_lossy(&message_bytes[..std::cmp::min(message_bytes.len(), 100)])
461                                                    );
462                                                    // Consider if this error should cause a reconnect or just skip
463                                                    // For now, let's try reconnecting on parse error for safety
464                                                    update_status(ConnectionStatus::Reconnecting);
465                                                    break; // Trigger reconnect on parse error
466                                                }
467                                            }
468                                        }
469                                    }
470                                }
471                                // Clear the buffer for the next message AFTER processing the current one
472                                byte_buf.clear();
473                            }
474                            Ok(_) => {
475                                // n == 0, should be handled by Ok(0) case, safety belt
476                                byte_buf.clear();
477                            }
478                            Err(e) => {
479                                if e.kind() == std::io::ErrorKind::TimedOut
480                                    || e.kind() == std::io::ErrorKind::WouldBlock
481                                {
482                                    // Expected timeout - continue the loop to check ping/shutdown
483                                    continue;
484                                } else {
485                                    // Real error
486                                    error!("Error reading from TCP socket using read_until: {}", e);
487                                    update_status(ConnectionStatus::Reconnecting);
488                                    break; // Break inner loop to trigger reconnect
489                                }
490                            }
491                        }
492
493                        // Check if buffer capacity is exceeding limits (less critical with clear(), but good safety)
494                        if byte_buf.capacity() > config.max_buffer_size {
495                            error!("Buffer capacity exceeded maximum allowed size ({}), resetting connection.", config.max_buffer_size);
496                            update_status(ConnectionStatus::Reconnecting);
497                            break;
498                        }
499                    } // end inner loop for current connection
500
501                    // When we exit the inner loop (connection lost or shutdown)
502                    // Update status to reconnecting only if not shutting down
503                    if !shutdown_flag.load(Ordering::SeqCst) {
504                        update_status(ConnectionStatus::Reconnecting);
505                    }
506                }
507                Err(e) => {
508                    error!("Failed to connect to {}: {}", addr, e);
509                    // Set status to reconnecting since we're going to try again
510                    update_status(ConnectionStatus::Reconnecting);
511                }
512            }
513
514            // Before attempting reconnect, check shutdown flag again
515            if shutdown_flag.load(Ordering::SeqCst) {
516                update_status(ConnectionStatus::Disconnected);
517                break;
518            }
519
520            // Get the next delay from the reconnection manager
521            match reconnection_manager.next_delay() {
522                Some(wait_time) => {
523                    info!(
524                        "Reconnecting in {:?}... (attempt {}/{:?})",
525                        wait_time,
526                        reconnection_manager.current_attempt(),
527                        reconnection_manager.config().max_attempts
528                    );
529                    // Use a flag-aware sleep
530                    let sleep_start = std::time::Instant::now();
531                    while sleep_start.elapsed() < wait_time {
532                        if shutdown_flag.load(Ordering::SeqCst) {
533                            info!("Shutdown detected during reconnect wait.");
534                            update_status(ConnectionStatus::Disconnected);
535                            return; // Exit thread immediately
536                        }
537                        thread::sleep(Duration::from_millis(50)); // Check flag periodically
538                    }
539                }
540                None => {
541                    error!(
542                        "Reached maximum reconnection attempts ({}). Exiting.",
543                        reconnection_manager.config().max_attempts.unwrap_or(0)
544                    );
545                    // Set status to disconnected when max attempts reached
546                    update_status(ConnectionStatus::Disconnected);
547                    break;
548                }
549            }
550        }
551        info!("Exiting TCP subscription thread.");
552        // Ensure status is Disconnected when thread exits naturally
553        update_status(ConnectionStatus::Disconnected);
554    });
555
556    Ok((rx, handle))
557}
558
559#[cfg(test)]
560mod tests {
561    use super::*;
562    use std::io::{BufRead, Read, Write};
563    use std::net::{SocketAddr, TcpListener};
564    use std::thread;
565    use std::time::Duration;
566    use titan_types_core::EventType;
567
568    // Helper function to create a test TCP server
569    fn start_test_server(ready_tx: std::sync::mpsc::Sender<SocketAddr>) -> JoinHandle<()> {
570        thread::spawn(move || {
571            // Bind to a random available port
572            let listener = TcpListener::bind("127.0.0.1:0").unwrap();
573            let addr = listener.local_addr().unwrap();
574
575            // Notify the test that we're ready and send the address
576            ready_tx.send(addr).unwrap();
577
578            // Accept one connection
579            if let Ok((mut stream, _)) = listener.accept() {
580                let mut reader = BufReader::new(stream.try_clone().unwrap());
581                let mut request_buf = Vec::new();
582
583                // Read the subscription request
584                match reader.read_until(b'\n', &mut request_buf) {
585                    Ok(n) if n > 0 => {
586                        let request_bytes = request_buf.trim_ascii_end();
587                        println!(
588                            "Server received request: {}",
589                            String::from_utf8_lossy(request_bytes)
590                        );
591
592                        // Add a small delay to ensure the client is ready to receive
593                        thread::sleep(Duration::from_millis(50));
594
595                        // Send a sample event - using correct format for Event
596                        let event = r#"{"type":"TransactionsAdded","data": {"txids":["1111111111111111111111111111111111111111111111111111111111111111"]}}"#;
597                        if let Err(e) = stream.write_all(event.as_bytes()) {
598                            println!("Server write error: {}", e);
599                            return;
600                        }
601                        if let Err(e) = stream.write_all(b"\n") {
602                            println!("Server write error: {}", e);
603                            return;
604                        }
605                        if let Err(e) = stream.flush() {
606                            println!("Server flush error: {}", e);
607                            return;
608                        }
609
610                        // Keep the connection open for a while to ensure the client can read the response
611                        thread::sleep(Duration::from_millis(500));
612                    }
613                    Ok(0) => println!("Server: Client disconnected before sending request"),
614                    Err(e) => println!("Test server read error: {}", e),
615                    _ => println!("Server: Unexpected read result for request"),
616                }
617            }
618        })
619    }
620
621    #[test]
622    fn test_connection_status_transitions() {
623        // Create a channel to sync with the test server
624        let (ready_tx, ready_rx) = std::sync::mpsc::channel();
625
626        // Start a test server
627        let server_handle = start_test_server(ready_tx);
628
629        // Wait for the server to be ready and get its address
630        let server_addr = ready_rx.recv_timeout(Duration::from_secs(5)).unwrap();
631
632        // Create a client with short timeout
633        let config = TcpClientConfig {
634            connection_timeout: Duration::from_secs(1),
635            max_reconnect_attempts: Some(1),
636            base_reconnect_interval: Duration::from_millis(100),
637            ..TcpClientConfig::default()
638        };
639        let client = TcpClient::new(config);
640
641        // Initially disconnected
642        assert_eq!(client.get_status(), ConnectionStatus::Disconnected);
643
644        // Subscribe - this should connect
645        let subscription_request = TcpSubscriptionRequest {
646            subscribe: vec![EventType::TransactionsAdded],
647        };
648
649        let rx = client
650            .subscribe(format!("{}", server_addr), subscription_request)
651            .unwrap();
652
653        // Give it time to connect
654        thread::sleep(Duration::from_millis(100));
655
656        // Should be connected now
657        assert_eq!(client.get_status(), ConnectionStatus::Connected);
658
659        // Shutdown the client
660        client.shutdown_and_join();
661
662        // Check the client is disconnected
663        assert_eq!(client.get_status(), ConnectionStatus::Disconnected);
664
665        // Wait for the server to finish
666        server_handle.join().unwrap();
667    }
668
669    #[test]
670    fn test_receive_events() {
671        // Create a channel to sync with the test server
672        let (ready_tx, ready_rx) = std::sync::mpsc::channel();
673
674        // Start a test server
675        let server_handle = start_test_server(ready_tx);
676
677        // Wait for the server to be ready and get its address
678        let server_addr = ready_rx.recv_timeout(Duration::from_secs(5)).unwrap();
679
680        // Create a client with short timeout
681        let config = TcpClientConfig {
682            connection_timeout: Duration::from_secs(1),
683            max_reconnect_attempts: Some(1),
684            base_reconnect_interval: Duration::from_millis(100),
685            ..TcpClientConfig::default()
686        };
687        let client = TcpClient::new(config);
688
689        // Subscribe to receive events
690        let subscription_request = TcpSubscriptionRequest {
691            subscribe: vec![EventType::TransactionsAdded],
692        };
693
694        let rx = client
695            .subscribe(format!("{}", server_addr), subscription_request)
696            .unwrap();
697
698        // Give it time to establish connection
699        thread::sleep(Duration::from_millis(200));
700
701        // Try to receive an event with timeout
702        let event = rx.recv_timeout(Duration::from_secs(2));
703        assert!(event.is_ok(), "Should have received an event");
704
705        match event.unwrap() {
706            Event::TransactionsAdded { txids } => {
707                assert_eq!(txids.len(), 1);
708                assert_eq!(
709                    txids[0].to_string(),
710                    "1111111111111111111111111111111111111111111111111111111111111111"
711                );
712            }
713            other => panic!("Received unexpected event type: {:?}", other),
714        }
715
716        // Shutdown the client
717        client.shutdown_and_join();
718
719        // Wait for the server to finish
720        server_handle.join().unwrap();
721    }
722
723    #[test]
724    fn test_connection_error_handling() {
725        // Create a client with short timeout
726        let config = TcpClientConfig {
727            connection_timeout: Duration::from_secs(1),
728            max_reconnect_attempts: Some(2),
729            base_reconnect_interval: Duration::from_millis(100),
730            ..TcpClientConfig::default()
731        };
732        let client = TcpClient::new(config);
733
734        // Initially disconnected
735        assert_eq!(client.get_status(), ConnectionStatus::Disconnected);
736
737        // Try to connect to a non-existent server
738        let subscription_request = TcpSubscriptionRequest {
739            subscribe: vec![EventType::TransactionsAdded],
740        };
741
742        let _rx = client
743            .subscribe("127.0.0.1:1".to_string(), subscription_request)
744            .unwrap();
745
746        // Give it time to attempt connection and reconnection
747        thread::sleep(Duration::from_millis(500));
748
749        // Should be in reconnecting state or disconnected if it already gave up
750        let status = client.get_status();
751        assert!(
752            status == ConnectionStatus::Reconnecting || status == ConnectionStatus::Disconnected,
753            "Expected Reconnecting or Disconnected state, got {:?}",
754            status
755        );
756
757        // Shutdown the client
758        client.shutdown_and_join();
759
760        // Check the client is disconnected
761        assert_eq!(client.get_status(), ConnectionStatus::Disconnected);
762    }
763
764    #[test]
765    fn test_resource_cleanup() {
766        // Create a client
767        let client = TcpClient::new(TcpClientConfig::default());
768
769        // Subscribe to a non-existent server
770        let subscription_request = TcpSubscriptionRequest {
771            subscribe: vec![EventType::TransactionsAdded],
772        };
773
774        let rx = client
775            .subscribe("127.0.0.1:1".to_string(), subscription_request)
776            .unwrap();
777
778        // Verify we have an active thread
779        assert!(client.has_active_thread());
780
781        // Drop the receiver channel
782        drop(rx);
783
784        // Give the thread a moment to notice the receiver is dropped (if applicable)
785        thread::sleep(Duration::from_millis(50));
786
787        // Shutdown and join the client
788        let joined = client.shutdown_and_join();
789        assert!(joined, "Should have successfully joined the worker thread");
790
791        // Verify we no longer have an active thread
792        assert!(!client.has_active_thread());
793    }
794
795    // Helper function to create a test TCP server that handles ping/pong
796    fn start_ping_pong_server(ready_tx: std::sync::mpsc::Sender<SocketAddr>) -> JoinHandle<()> {
797        thread::spawn(move || {
798            // Bind to a random available port
799            let listener = TcpListener::bind("127.0.0.1:0").unwrap();
800            let addr = listener.local_addr().unwrap();
801
802            // Notify the test that we're ready and send the address
803            ready_tx.send(addr).unwrap();
804
805            // Accept one connection
806            if let Ok((mut stream, _)) = listener.accept() {
807                // Set a read timeout so we don't block forever
808                stream
809                    .set_read_timeout(Some(Duration::from_millis(200)))
810                    .unwrap();
811
812                // Create a buffer reader
813                let mut reader = BufReader::new(stream.try_clone().unwrap());
814                let mut line_buf = Vec::new(); // Use Vec<u8> for read_until
815
816                // Read the subscription request
817                match reader.read_until(b'\n', &mut line_buf) {
818                    Ok(n) if n > 0 => {
819                        let request_bytes = line_buf.trim_ascii_end();
820                        println!(
821                            "Ping-pong server received request: {}",
822                            String::from_utf8_lossy(request_bytes)
823                        );
824
825                        // Send a sample event
826                        let event = r#"{"type":"TransactionsAdded","data": {"txids":["1111111111111111111111111111111111111111111111111111111111111111"]}}"#;
827                        if let Err(e) = stream.write_all(event.as_bytes()) {
828                            println!("Server write error: {}", e);
829                            return;
830                        }
831                        if let Err(e) = stream.write_all(b"\n") {
832                            println!("Server write error: {}", e);
833                            return;
834                        }
835                        if let Err(e) = stream.flush() {
836                            println!("Server flush error: {}", e);
837                            return;
838                        }
839                        println!("Ping-pong server sent initial event");
840                    }
841                    Ok(0) => {
842                        println!("Ping-pong server: Client disconnected early");
843                        return;
844                    }
845                    _ => {
846                        println!("Ping-pong server failed to read subscription request");
847                        return;
848                    }
849                }
850
851                // Clear line for next reads
852                line_buf.clear();
853
854                // Keep handling ping/pong for a while
855                let start = std::time::Instant::now();
856                let timeout = Duration::from_secs(5); // Run for 5 seconds
857
858                while start.elapsed() < timeout {
859                    match reader.read_until(b'\n', &mut line_buf) {
860                        // Use read_until here too
861                        Ok(0) => {
862                            println!("Ping-pong server: client closed connection");
863                            break;
864                        }
865                        Ok(n) if n > 0 => {
866                            let trimmed_line = line_buf.trim_ascii_end(); // Trim bytes
867                            println!(
868                                "Ping-pong server received: {}",
869                                String::from_utf8_lossy(trimmed_line)
870                            );
871
872                            if trimmed_line == b"PING" {
873                                // Compare bytes
874                                println!("Ping-pong server sending PONG");
875                                if let Err(e) = stream.write_all(b"PONG\n") {
876                                    println!("Ping-pong server failed to send PONG: {}", e);
877                                    break;
878                                }
879                                if let Err(e) = stream.flush() {
880                                    println!("Ping-pong server failed to flush PONG: {}", e);
881                                    break;
882                                }
883                            }
884                            line_buf.clear(); // Clear buffer after processing
885                        }
886                        Ok(_) => {
887                            /* n==0 case handled above */
888                            line_buf.clear();
889                        }
890                        Err(e)
891                            if e.kind() == std::io::ErrorKind::WouldBlock
892                                || e.kind() == std::io::ErrorKind::TimedOut =>
893                        {
894                            // Expected timeout - continue
895                            line_buf.clear(); // Ensure buffer is cleared even on timeout
896                        }
897                        Err(e) => {
898                            println!("Ping-pong server error: {}", e);
899                            break;
900                        }
901                    }
902
903                    // Small sleep to prevent tight loop
904                    thread::sleep(Duration::from_millis(50));
905                }
906
907                println!("Ping-pong server shutting down");
908            }
909        })
910    }
911
912    #[test]
913    fn test_ping_pong_mechanism() {
914        // Create a channel to sync with the test server
915        let (ready_tx, ready_rx) = std::sync::mpsc::channel();
916
917        // Start a ping-pong test server
918        let server_handle = start_ping_pong_server(ready_tx);
919
920        // Wait for the server to be ready and get its address
921        let server_addr = ready_rx.recv_timeout(Duration::from_secs(5)).unwrap();
922
923        // Create a client with short ping interval for faster testing
924        let config = TcpClientConfig {
925            connection_timeout: Duration::from_secs(1),
926            max_reconnect_attempts: Some(1),
927            base_reconnect_interval: Duration::from_millis(100),
928            ping_interval: Duration::from_millis(500), // Short ping interval for testing
929            pong_timeout: Duration::from_millis(1000), // 1 second timeout
930            ..TcpClientConfig::default()
931        };
932        let client = TcpClient::new(config);
933
934        // Subscribe to receive events
935        let subscription_request = TcpSubscriptionRequest {
936            subscribe: vec![EventType::TransactionsAdded],
937        };
938
939        let _rx = client
940            .subscribe(format!("{}", server_addr), subscription_request)
941            .unwrap();
942
943        // Give it time to establish connection
944        thread::sleep(Duration::from_millis(200));
945
946        // Verify connection status is connected
947        assert_eq!(
948            client.get_status(),
949            ConnectionStatus::Connected,
950            "Client should be connected"
951        );
952
953        // Wait long enough for multiple ping/pong cycles
954        thread::sleep(Duration::from_secs(2));
955
956        // Verify still connected after ping/pong cycles
957        assert_eq!(
958            client.get_status(),
959            ConnectionStatus::Connected,
960            "Client should remain connected after ping/pong exchanges"
961        );
962
963        // Shutdown the client
964        client.shutdown_and_join();
965
966        // Wait for the server to finish
967        server_handle.join().unwrap();
968    }
969}