titan_client/tcp/tcp_client_blocking.rs
1use std::{
2 io::{BufRead, BufReader, Write},
3 net::{TcpStream, ToSocketAddrs},
4 sync::{
5 atomic::{AtomicBool, Ordering},
6 mpsc, Arc, Mutex,
7 },
8 thread::{self, JoinHandle},
9 time::Duration,
10};
11
12use serde_json;
13use thiserror::Error;
14use titan_types_api::TcpSubscriptionRequest;
15use titan_types_core::Event;
16use tracing::{debug, error, info, warn};
17
18use crate::tcp::reconnection::ReconnectionManager;
19
20use super::{
21 connection_status::{ConnectionStatus, ConnectionStatusTracker},
22 reconnection,
23};
24
/// Errors returned by the blocking TCP client when a subscription cannot be set up.
#[derive(Debug, Error)]
pub enum TcpClientError {
    /// An I/O operation failed. Also used by this file to report that the internal
    /// worker-thread lock could not be acquired.
    #[error("io error: {0}")]
    IOError(#[from] std::io::Error),
    /// A `serde_json` serialization or deserialization step failed.
    #[error("serde error: {0}")]
    SerdeError(#[from] serde_json::Error),
    /// The supplied address did not resolve to any socket address; the payload is a
    /// human-readable description that includes the offending address.
    #[error("address parse error: {0}")]
    AddrParseError(String),
}
/// Configuration for the blocking TCP client: reconnection back-off, connection timeout,
/// read-buffer sizing and ping/pong keep-alive timing.
///
/// All fields are plain values, so the type is cheap to clone and can be compared for
/// equality (useful in tests and when detecting configuration changes).
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct TcpClientConfig {
    /// Base duration for the reconnect interval (used as the starting point for
    /// exponential backoff).
    pub base_reconnect_interval: Duration,
    /// Maximum reconnect interval (cap for exponential backoff).
    pub max_reconnect_interval: Duration,
    /// Maximum number of reconnection attempts.
    /// Use `None` for unlimited attempts.
    pub max_reconnect_attempts: Option<u32>,
    /// Timeout applied to each TCP connection attempt.
    pub connection_timeout: Duration,
    /// Initial capacity of the line read buffer, in bytes.
    pub read_buffer_capacity: usize,
    /// Maximum size, in bytes, that a single received message may have.
    pub max_buffer_size: usize,
    /// Interval between `PING` messages sent to the server.
    pub ping_interval: Duration,
    /// How long to wait for a `PONG` before the connection is considered dead.
    pub pong_timeout: Duration,
}
55
56impl Default for TcpClientConfig {
57 fn default() -> Self {
58 TcpClientConfig {
59 base_reconnect_interval: Duration::from_secs(1),
60 max_reconnect_interval: Duration::from_secs(60),
61 max_reconnect_attempts: None,
62 connection_timeout: Duration::from_secs(30),
63 read_buffer_capacity: 4096, // 4KB initial capacity
64 max_buffer_size: 10 * 1024 * 1024, // 10MB max buffer size
65 ping_interval: Duration::from_secs(30), // Send ping every 30 seconds
66 pong_timeout: Duration::from_secs(10), // Wait 10 seconds for pong response
67 }
68 }
69}
70
/// Synchronous TCP subscription listener with reconnection support.
///
/// Subscribing spawns a dedicated thread that connects to the TCP server at `addr`,
/// sends the given subscription request (encoded as a JSON line) and then reads
/// newline-delimited events from the connection. If the connection drops or an error
/// occurs, the thread attempts to reconnect according to the configuration settings.
///
/// # Thread Management
///
/// This client spawns a background thread to handle the TCP connection and event processing.
/// To ensure proper cleanup, you should call `shutdown_and_join()` when you're done with the
/// client. If you don't call this method, the background thread will be automatically
/// signaled to shut down when the `TcpClient` is dropped, but the thread may continue
/// running briefly after the client is dropped.
///
/// ```
/// # use client::tcp_client_blocking::{TcpClient, TcpClientConfig};
/// # fn main() {
/// let client = TcpClient::new(TcpClientConfig::default());
/// // Use the client...
///
/// // When done, ensure clean shutdown
/// client.shutdown_and_join();
/// # }
/// ```
#[cfg(feature = "tcp_client_blocking")]
pub struct TcpClient {
    /// Set to `true` to ask the worker thread to stop; shared with the worker.
    shutdown_flag: Arc<AtomicBool>,
    /// Settings cloned into each worker thread when a subscription starts.
    config: TcpClientConfig,
    /// Records the current connection status and the registered status listeners.
    status_tracker: ConnectionStatusTracker,
    /// Handle of the worker thread spawned by the most recent subscription, if any.
    worker_thread: Mutex<Option<JoinHandle<()>>>,
}
103
#[cfg(feature = "tcp_client_blocking")]
impl TcpClient {
    /// Creates a new TCP client with the given configuration.
    ///
    /// The client starts idle: no connection is made and no thread is spawned until
    /// [`TcpClient::subscribe`] is called.
    pub fn new(config: TcpClientConfig) -> Self {
        Self {
            shutdown_flag: Arc::new(AtomicBool::new(false)),
            config,
            status_tracker: ConnectionStatusTracker::new(),
            worker_thread: Mutex::new(None),
        }
    }

    /// Returns the connection status currently recorded by the status tracker.
    pub fn get_status(&self) -> ConnectionStatus {
        self.status_tracker.get_status()
    }

    /// Creates a channel, registers its sending half as a listener on the status tracker
    /// and returns the receiving half.
    ///
    /// Presumably the tracker forwards subsequent status changes to the returned receiver;
    /// the exact delivery semantics are defined by `ConnectionStatusTracker`.
    pub fn create_status_subscriber(&self) -> mpsc::Receiver<ConnectionStatus> {
        let (tx, rx) = mpsc::channel();
        self.status_tracker.register_listener(tx);
        rx
    }

    /// Checks if there is an active worker thread.
    ///
    /// Returns `true` only while a worker thread exists and is still running. A worker
    /// that has already exited (for example after exhausting its reconnection attempts)
    /// but has not been joined yet is reported as inactive. Returns `false` if the
    /// internal lock is poisoned.
    pub fn has_active_thread(&self) -> bool {
        match self.worker_thread.lock() {
            Ok(worker) => worker
                .as_ref()
                .map_or(false, |handle| !handle.is_finished()),
            Err(_) => {
                error!("Failed to acquire worker thread lock");
                false
            }
        }
    }

    /// Subscribe to events from the given address.
    ///
    /// This will spawn a background thread that connects to the server and
    /// listens for events. The events will be sent to the returned channel.
    ///
    /// If there's already a worker thread, it is signalled to shut down and joined
    /// before the new one is created. This can block for as long as the old worker needs
    /// to notice the signal (at most roughly one connection timeout). The previous
    /// subscription is stopped even if starting the new one fails.
    ///
    /// # Errors
    ///
    /// Returns [`TcpClientError::IOError`] if the internal worker lock is poisoned, and
    /// propagates any error from starting the subscription (for example
    /// [`TcpClientError::AddrParseError`] when `addr` cannot be resolved).
    pub fn subscribe(
        &self,
        addr: String,
        subscription_request: TcpSubscriptionRequest,
    ) -> Result<mpsc::Receiver<Event>, TcpClientError> {
        // Hold the lock for the whole operation so concurrent subscribe/join calls
        // cannot interleave with the flag handling below.
        let mut worker_lock = self.worker_thread.lock().map_err(|_| {
            TcpClientError::IOError(std::io::Error::new(
                std::io::ErrorKind::Other,
                "Failed to acquire worker thread lock",
            ))
        })?;

        // The shutdown flag is shared by every worker of this client, so the previous
        // worker must have fully exited before the flag is cleared again; otherwise it
        // would miss the signal and keep running next to the new worker.
        if let Some(previous) = worker_lock.take() {
            self.shutdown_flag.store(true, Ordering::SeqCst);
            if let Err(e) = previous.join() {
                error!("Previous worker thread panicked: {:?}", e);
            }
        }

        // Reset shutdown flag in case it was previously set
        self.shutdown_flag.store(false, Ordering::SeqCst);

        // Spawn the worker; returns the event receiver and the thread handle.
        let (rx, handle) = subscribe(
            addr,
            subscription_request,
            self.shutdown_flag.clone(),
            self.config.clone(),
            self.status_tracker.clone(),
        )?;

        // Store the thread handle for later joining
        *worker_lock = Some(handle);

        Ok(rx)
    }

    /// Signals the client to shut down and stop any reconnection attempts.
    /// Does not wait for the worker thread to complete.
    pub fn shutdown(&self) {
        self.status_tracker
            .update_status(ConnectionStatus::Disconnected);
        self.shutdown_flag.store(true, Ordering::SeqCst);
    }

    /// Signals the client to shut down and waits for the worker thread to complete.
    /// Returns true if the thread was successfully joined, false otherwise.
    pub fn shutdown_and_join(&self) -> bool {
        self.shutdown();
        self.join()
    }

    /// Waits for the worker thread to complete.
    ///
    /// Returns `true` if a worker thread existed and was joined without having panicked;
    /// `false` if there was no worker, the worker panicked, or the lock is poisoned.
    pub fn join(&self) -> bool {
        // The lock is intentionally held while joining: it keeps a concurrent
        // `subscribe` from clearing the shutdown flag before this worker has exited.
        let mut worker_lock = match self.worker_thread.lock() {
            Ok(lock) => lock,
            Err(e) => {
                error!("Failed to acquire worker thread lock: {}", e);
                return false;
            }
        };

        // Take the thread handle out (replacing it with None)
        let Some(handle) = worker_lock.take() else {
            // No thread to join
            return false;
        };

        match handle.join() {
            Ok(()) => {
                info!("Successfully joined worker thread");
                true
            }
            Err(e) => {
                error!("Failed to join worker thread: {:?}", e);
                false
            }
        }
    }
}
231
#[cfg(feature = "tcp_client_blocking")]
impl Drop for TcpClient {
    /// Signals the worker thread to stop and detaches it without waiting.
    ///
    /// Joining could block the dropping thread for up to a connection timeout, so the
    /// handle is released instead; the worker exits on its own once it observes the
    /// shutdown flag. Call `shutdown_and_join()` beforehand for a synchronous shutdown.
    fn drop(&mut self) {
        // Signal thread to terminate
        self.shutdown();

        // `&mut self` guarantees exclusive access, so the mutex can be bypassed.
        // Dropping the taken handle detaches the thread.
        if let Ok(worker) = self.worker_thread.get_mut() {
            if worker.take().is_some() {
                info!("TcpClient dropped, thread will continue running until shutdown completes");
            }
        }
    }
}
250
251fn subscribe(
252 addr: String,
253 subscription_request: TcpSubscriptionRequest,
254 shutdown_flag: Arc<AtomicBool>,
255 config: TcpClientConfig,
256 status_tracker: ConnectionStatusTracker,
257) -> Result<(mpsc::Receiver<Event>, JoinHandle<()>), TcpClientError> {
258 // Create a standard mpsc channel to forward events.
259 let (tx, rx) = mpsc::channel::<Event>();
260
261 let address = addr
262 .to_socket_addrs()
263 .map_err(|_| TcpClientError::AddrParseError(format!("Invalid address: {}", addr)))?
264 .next()
265 .ok_or(TcpClientError::AddrParseError(format!(
266 "Invalid address: {}",
267 addr
268 )))?;
269
270 // Set initial status to Connecting
271 status_tracker.update_status(ConnectionStatus::Connecting);
272
273 // Create the reconnection manager
274 let reconnection_config = reconnection::from_tcp_client_config(&config);
275
276 let handle = thread::spawn(move || {
277 // Create a status updater for use in the thread
278 let update_status = status_tracker.create_updater();
279
280 // Create the reconnection manager
281 let mut reconnection_manager = ReconnectionManager::new(reconnection_config);
282
283 loop {
284 if shutdown_flag.load(Ordering::SeqCst) {
285 info!("Shutdown flag set. Exiting subscription thread.");
286 // Set status to disconnected
287 update_status(ConnectionStatus::Disconnected);
288 break;
289 }
290
291 // Try to connect to the server.
292 info!("Attempting to connect to {}...", addr);
293 // Ensure status is set to Connecting
294 update_status(ConnectionStatus::Connecting);
295
296 let connect_result = TcpStream::connect_timeout(&address, config.connection_timeout);
297
298 match connect_result {
299 Ok(mut stream) => {
300 info!("Connected to server at {}", addr);
301 // Update status to Connected
302 update_status(ConnectionStatus::Connected);
303
304 // Reset reconnection attempts after successful connection
305 reconnection_manager.reset();
306
307 // Set read timeout - use shorter timeout to allow for ping checks
308 if let Err(e) = stream.set_read_timeout(Some(Duration::from_millis(500))) {
309 error!("Failed to set read timeout: {}", e);
310 continue;
311 }
312
313 // Set write timeout
314 if let Err(e) = stream.set_write_timeout(Some(Duration::from_secs(5))) {
315 error!("Failed to set write timeout: {}", e);
316 continue;
317 }
318
319 // Clone the stream for reading.
320 let reader_stream = match stream.try_clone() {
321 Ok(rs) => rs,
322 Err(e) => {
323 error!("Failed to clone stream: {}", e);
324 continue;
325 }
326 };
327 let mut reader = BufReader::new(reader_stream);
328
329 // Serialize and send the subscription request.
330 match serde_json::to_string(&subscription_request) {
331 Ok(req_json) => {
332 if let Err(e) = stream.write_all(req_json.as_bytes()) {
333 error!("Failed to send subscription request: {}", e);
334 continue;
335 }
336 if let Err(e) = stream.write_all(b"\n") {
337 error!("Failed to send newline: {}", e);
338 continue;
339 }
340 if let Err(e) = stream.flush() {
341 error!("Failed to flush stream: {}", e);
342 continue;
343 }
344 }
345 Err(e) => {
346 error!("Failed to serialize subscription request: {}", e);
347 break;
348 }
349 }
350
351 // Initialize the byte buffer with the configured capacity
352 let mut byte_buf = Vec::with_capacity(config.read_buffer_capacity);
353
354 // Ping-pong state tracking
355 let mut last_ping_time = std::time::Instant::now();
356 let mut last_pong_time = std::time::Instant::now();
357 let mut awaiting_pong = false;
358
359 // Inner loop: read events from the connection with ping/pong support
360 loop {
361 if shutdown_flag.load(Ordering::SeqCst) {
362 info!("Shutdown flag set. Exiting inner read loop.");
363 update_status(ConnectionStatus::Disconnected);
364 break;
365 }
366
367 // Current time
368 let now = std::time::Instant::now();
369
370 // Handle ping-pong logic
371 if now.duration_since(last_ping_time) >= config.ping_interval {
372 if awaiting_pong {
373 // Check if we've exceeded the pong timeout
374 if now.duration_since(last_pong_time) >= config.pong_timeout {
375 warn!("Pong response timed out after {:?}, considering connection dead",
376 now.duration_since(last_pong_time));
377 update_status(ConnectionStatus::Reconnecting);
378 break;
379 }
380 } else {
381 // Time to send a ping
382 match stream.write_all(b"PING\n") {
383 Ok(_) => {
384 if let Err(e) = stream.flush() {
385 error!("Failed to flush after PING: {}", e);
386 update_status(ConnectionStatus::Reconnecting);
387 break;
388 }
389 last_ping_time = now;
390 awaiting_pong = true;
391 }
392 Err(e) => {
393 error!("Failed to send PING: {}", e);
394 update_status(ConnectionStatus::Reconnecting);
395 break;
396 }
397 }
398 }
399 }
400
401 // Set read timeout to allow for ping checks and shutdown signals
402 if let Err(e) = stream.set_read_timeout(Some(Duration::from_millis(50))) {
403 error!("Failed to set read timeout: {}", e);
404 update_status(ConnectionStatus::Reconnecting);
405 break;
406 }
407
408 // Try to read until newline
409 match reader.read_until(b'\n', &mut byte_buf) {
410 Ok(0) => {
411 // Connection closed by server
412 warn!("TCP connection closed by server. Attempting to reconnect.");
413 update_status(ConnectionStatus::Reconnecting);
414 break;
415 }
416 Ok(n) if n > 0 => {
417 // Note: read_until includes the delimiter in the buffer.
418 // Trim whitespace and the trailing newline before processing.
419 let message_bytes = byte_buf.trim_ascii_end();
420
421 if !message_bytes.is_empty() {
422 // Check if this is a PONG response
423 if message_bytes == b"PONG" {
424 if awaiting_pong {
425 awaiting_pong = false;
426 last_pong_time = std::time::Instant::now();
427 debug!("Received PONG");
428 } else {
429 warn!("Received unexpected PONG");
430 }
431 } else {
432 // Check if message size exceeds limit *before* parsing JSON
433 if message_bytes.len() > config.max_buffer_size {
434 error!(
435 "Received message exceeds maximum allowed size ({}), skipping. Message starts with: {:?}",
436 config.max_buffer_size,
437 String::from_utf8_lossy(&message_bytes[..std::cmp::min(message_bytes.len(), 50)]) // Log first 50 bytes
438 );
439 // Don't break, just clear buffer and continue reading the next message.
440 } else {
441 // Try to parse as an event from the byte slice
442 match serde_json::from_slice::<Event>(message_bytes) {
443 Ok(event) => {
444 // Any successful message means the connection is alive
445 last_pong_time = std::time::Instant::now();
446 awaiting_pong = false; // Reset awaiting_pong if we received a valid event
447
448 if tx.send(event).is_err() {
449 error!("Receiver dropped. Exiting subscription thread.");
450 update_status(
451 ConnectionStatus::Disconnected,
452 ); // Set status before returning
453 return; // Exit the thread
454 }
455 }
456 Err(e) => {
457 error!(
458 "Failed to parse event: {}. Raw data (first 100 bytes): {:?}",
459 e,
460 String::from_utf8_lossy(&message_bytes[..std::cmp::min(message_bytes.len(), 100)])
461 );
462 // Consider if this error should cause a reconnect or just skip
463 // For now, let's try reconnecting on parse error for safety
464 update_status(ConnectionStatus::Reconnecting);
465 break; // Trigger reconnect on parse error
466 }
467 }
468 }
469 }
470 }
471 // Clear the buffer for the next message AFTER processing the current one
472 byte_buf.clear();
473 }
474 Ok(_) => {
475 // n == 0, should be handled by Ok(0) case, safety belt
476 byte_buf.clear();
477 }
478 Err(e) => {
479 if e.kind() == std::io::ErrorKind::TimedOut
480 || e.kind() == std::io::ErrorKind::WouldBlock
481 {
482 // Expected timeout - continue the loop to check ping/shutdown
483 continue;
484 } else {
485 // Real error
486 error!("Error reading from TCP socket using read_until: {}", e);
487 update_status(ConnectionStatus::Reconnecting);
488 break; // Break inner loop to trigger reconnect
489 }
490 }
491 }
492
493 // Check if buffer capacity is exceeding limits (less critical with clear(), but good safety)
494 if byte_buf.capacity() > config.max_buffer_size {
495 error!("Buffer capacity exceeded maximum allowed size ({}), resetting connection.", config.max_buffer_size);
496 update_status(ConnectionStatus::Reconnecting);
497 break;
498 }
499 } // end inner loop for current connection
500
501 // When we exit the inner loop (connection lost or shutdown)
502 // Update status to reconnecting only if not shutting down
503 if !shutdown_flag.load(Ordering::SeqCst) {
504 update_status(ConnectionStatus::Reconnecting);
505 }
506 }
507 Err(e) => {
508 error!("Failed to connect to {}: {}", addr, e);
509 // Set status to reconnecting since we're going to try again
510 update_status(ConnectionStatus::Reconnecting);
511 }
512 }
513
514 // Before attempting reconnect, check shutdown flag again
515 if shutdown_flag.load(Ordering::SeqCst) {
516 update_status(ConnectionStatus::Disconnected);
517 break;
518 }
519
520 // Get the next delay from the reconnection manager
521 match reconnection_manager.next_delay() {
522 Some(wait_time) => {
523 info!(
524 "Reconnecting in {:?}... (attempt {}/{:?})",
525 wait_time,
526 reconnection_manager.current_attempt(),
527 reconnection_manager.config().max_attempts
528 );
529 // Use a flag-aware sleep
530 let sleep_start = std::time::Instant::now();
531 while sleep_start.elapsed() < wait_time {
532 if shutdown_flag.load(Ordering::SeqCst) {
533 info!("Shutdown detected during reconnect wait.");
534 update_status(ConnectionStatus::Disconnected);
535 return; // Exit thread immediately
536 }
537 thread::sleep(Duration::from_millis(50)); // Check flag periodically
538 }
539 }
540 None => {
541 error!(
542 "Reached maximum reconnection attempts ({}). Exiting.",
543 reconnection_manager.config().max_attempts.unwrap_or(0)
544 );
545 // Set status to disconnected when max attempts reached
546 update_status(ConnectionStatus::Disconnected);
547 break;
548 }
549 }
550 }
551 info!("Exiting TCP subscription thread.");
552 // Ensure status is Disconnected when thread exits naturally
553 update_status(ConnectionStatus::Disconnected);
554 });
555
556 Ok((rx, handle))
557}
558
#[cfg(test)]
mod tests {
    use super::*;
    use std::io::{BufRead, Write};
    use std::net::{SocketAddr, TcpListener};
    use std::thread;
    use std::time::Duration;
    use titan_types_core::EventType;

    /// Transaction id carried by [`SAMPLE_EVENT`].
    const SAMPLE_TXID: &str = "1111111111111111111111111111111111111111111111111111111111111111";

    /// JSON encoding (without trailing newline) of the event both test servers send.
    const SAMPLE_EVENT: &str = r#"{"type":"TransactionsAdded","data": {"txids":["1111111111111111111111111111111111111111111111111111111111111111"]}}"#;

    /// Address on which no server is expected to be listening.
    const UNREACHABLE_ADDR: &str = "127.0.0.1:1";

    /// Client configuration with short timeouts so that tests finish quickly.
    fn quick_config() -> TcpClientConfig {
        TcpClientConfig {
            connection_timeout: Duration::from_secs(1),
            max_reconnect_attempts: Some(1),
            base_reconnect_interval: Duration::from_millis(100),
            ..TcpClientConfig::default()
        }
    }

    /// Subscription request used by every test.
    fn transactions_added_request() -> TcpSubscriptionRequest {
        TcpSubscriptionRequest {
            subscribe: vec![EventType::TransactionsAdded],
        }
    }

    /// Writes `line` followed by a newline and flushes the stream.
    fn send_line(stream: &mut TcpStream, line: &[u8]) -> std::io::Result<()> {
        stream.write_all(line)?;
        stream.write_all(b"\n")?;
        stream.flush()
    }

    /// Returns `true` for the error kinds produced by an expired socket read timeout.
    fn is_timeout(e: &std::io::Error) -> bool {
        matches!(
            e.kind(),
            std::io::ErrorKind::WouldBlock | std::io::ErrorKind::TimedOut
        )
    }

    /// Starts a server on a random local port that accepts one connection, reads the
    /// subscription request, replies with [`SAMPLE_EVENT`] and keeps the connection open
    /// for another 500 ms. The bound address is reported through `ready_tx`.
    fn start_test_server(ready_tx: std::sync::mpsc::Sender<SocketAddr>) -> JoinHandle<()> {
        thread::spawn(move || {
            // Bind to a random available port and tell the test where to connect.
            let listener = TcpListener::bind("127.0.0.1:0").unwrap();
            ready_tx.send(listener.local_addr().unwrap()).unwrap();

            // Accept one connection
            let Ok((mut stream, _)) = listener.accept() else {
                return;
            };
            let mut reader = BufReader::new(stream.try_clone().unwrap());
            let mut request_buf = Vec::new();

            // Read the subscription request
            match reader.read_until(b'\n', &mut request_buf) {
                Ok(0) => println!("Server: Client disconnected before sending request"),
                Ok(_) => {
                    println!(
                        "Server received request: {}",
                        String::from_utf8_lossy(request_buf.trim_ascii_end())
                    );

                    // Add a small delay to ensure the client is ready to receive
                    thread::sleep(Duration::from_millis(50));

                    if let Err(e) = send_line(&mut stream, SAMPLE_EVENT.as_bytes()) {
                        println!("Server write error: {}", e);
                        return;
                    }

                    // Keep the connection open so the client can read the response
                    thread::sleep(Duration::from_millis(500));
                }
                Err(e) => println!("Test server read error: {}", e),
            }
        })
    }

    /// Starts a server like [`start_test_server`] that additionally answers every `PING`
    /// line with `PONG` for up to five seconds, or until the client disconnects.
    fn start_ping_pong_server(ready_tx: std::sync::mpsc::Sender<SocketAddr>) -> JoinHandle<()> {
        thread::spawn(move || {
            // Bind to a random available port and tell the test where to connect.
            let listener = TcpListener::bind("127.0.0.1:0").unwrap();
            ready_tx.send(listener.local_addr().unwrap()).unwrap();

            // Accept one connection
            let Ok((mut stream, _)) = listener.accept() else {
                return;
            };

            // Set a read timeout so we don't block forever
            stream
                .set_read_timeout(Some(Duration::from_millis(200)))
                .unwrap();

            let mut reader = BufReader::new(stream.try_clone().unwrap());
            let mut line_buf = Vec::new();

            // Read the subscription request
            match reader.read_until(b'\n', &mut line_buf) {
                Ok(0) => {
                    println!("Ping-pong server: Client disconnected early");
                    return;
                }
                Ok(_) => {
                    println!(
                        "Ping-pong server received request: {}",
                        String::from_utf8_lossy(line_buf.trim_ascii_end())
                    );

                    if let Err(e) = send_line(&mut stream, SAMPLE_EVENT.as_bytes()) {
                        println!("Server write error: {}", e);
                        return;
                    }
                    println!("Ping-pong server sent initial event");
                }
                Err(_) => {
                    println!("Ping-pong server failed to read subscription request");
                    return;
                }
            }
            line_buf.clear();

            // Keep handling ping/pong for a while
            let start = std::time::Instant::now();
            let timeout = Duration::from_secs(5);

            while start.elapsed() < timeout {
                match reader.read_until(b'\n', &mut line_buf) {
                    Ok(0) => {
                        println!("Ping-pong server: client closed connection");
                        break;
                    }
                    Ok(_) => {
                        let trimmed_line = line_buf.trim_ascii_end();
                        println!(
                            "Ping-pong server received: {}",
                            String::from_utf8_lossy(trimmed_line)
                        );

                        if trimmed_line == b"PING" {
                            println!("Ping-pong server sending PONG");
                            if let Err(e) = send_line(&mut stream, b"PONG") {
                                println!("Ping-pong server failed to send PONG: {}", e);
                                break;
                            }
                        }
                        line_buf.clear();
                    }
                    Err(e) if is_timeout(&e) => {
                        // Expected timeout. Keep any partial line already received so a
                        // message split across reads is not corrupted.
                    }
                    Err(e) => {
                        println!("Ping-pong server error: {}", e);
                        break;
                    }
                }

                // Small sleep to prevent tight loop
                thread::sleep(Duration::from_millis(50));
            }

            println!("Ping-pong server shutting down");
        })
    }

    /// Starts a server with `start` and waits until it has reported its address.
    fn launch(
        start: fn(std::sync::mpsc::Sender<SocketAddr>) -> JoinHandle<()>,
    ) -> (JoinHandle<()>, SocketAddr) {
        let (ready_tx, ready_rx) = std::sync::mpsc::channel();
        let server_handle = start(ready_tx);
        let server_addr = ready_rx.recv_timeout(Duration::from_secs(5)).unwrap();
        (server_handle, server_addr)
    }

    #[test]
    fn test_connection_status_transitions() {
        let (server_handle, server_addr) = launch(start_test_server);
        let client = TcpClient::new(quick_config());

        // Initially disconnected
        assert_eq!(client.get_status(), ConnectionStatus::Disconnected);

        // Subscribe - this should connect. The receiver is kept alive (but unused) so
        // the worker does not stop when the server's event arrives.
        let _rx = client
            .subscribe(server_addr.to_string(), transactions_added_request())
            .unwrap();

        // Give it time to connect
        thread::sleep(Duration::from_millis(100));
        assert_eq!(client.get_status(), ConnectionStatus::Connected);

        // Shutdown the client
        client.shutdown_and_join();
        assert_eq!(client.get_status(), ConnectionStatus::Disconnected);

        // Wait for the server to finish
        server_handle.join().unwrap();
    }

    #[test]
    fn test_receive_events() {
        let (server_handle, server_addr) = launch(start_test_server);
        let client = TcpClient::new(quick_config());

        let rx = client
            .subscribe(server_addr.to_string(), transactions_added_request())
            .unwrap();

        // Give it time to establish connection
        thread::sleep(Duration::from_millis(200));

        // Try to receive an event with timeout
        let event = rx.recv_timeout(Duration::from_secs(2));
        assert!(event.is_ok(), "Should have received an event");

        match event.unwrap() {
            Event::TransactionsAdded { txids } => {
                assert_eq!(txids.len(), 1);
                assert_eq!(txids[0].to_string(), SAMPLE_TXID);
            }
            other => panic!("Received unexpected event type: {:?}", other),
        }

        // Shutdown the client
        client.shutdown_and_join();

        // Wait for the server to finish
        server_handle.join().unwrap();
    }

    #[test]
    fn test_connection_error_handling() {
        let config = TcpClientConfig {
            max_reconnect_attempts: Some(2),
            ..quick_config()
        };
        let client = TcpClient::new(config);

        // Initially disconnected
        assert_eq!(client.get_status(), ConnectionStatus::Disconnected);

        // Try to connect to a non-existent server
        let _rx = client
            .subscribe(UNREACHABLE_ADDR.to_string(), transactions_added_request())
            .unwrap();

        // Give it time to attempt connection and reconnection
        thread::sleep(Duration::from_millis(500));

        // Should be in reconnecting state or disconnected if it already gave up
        let status = client.get_status();
        assert!(
            status == ConnectionStatus::Reconnecting || status == ConnectionStatus::Disconnected,
            "Expected Reconnecting or Disconnected state, got {:?}",
            status
        );

        // Shutdown the client
        client.shutdown_and_join();
        assert_eq!(client.get_status(), ConnectionStatus::Disconnected);
    }

    #[test]
    fn test_resource_cleanup() {
        let client = TcpClient::new(TcpClientConfig::default());

        // Subscribe to a non-existent server
        let rx = client
            .subscribe(UNREACHABLE_ADDR.to_string(), transactions_added_request())
            .unwrap();

        // Verify we have an active thread
        assert!(client.has_active_thread());

        // Drop the receiver channel
        drop(rx);

        // Give the thread a moment to notice the receiver is dropped (if applicable)
        thread::sleep(Duration::from_millis(50));

        // Shutdown and join the client
        let joined = client.shutdown_and_join();
        assert!(joined, "Should have successfully joined the worker thread");

        // Verify we no longer have an active thread
        assert!(!client.has_active_thread());
    }

    #[test]
    fn test_ping_pong_mechanism() {
        let (server_handle, server_addr) = launch(start_ping_pong_server);

        // Short ping interval for faster testing
        let config = TcpClientConfig {
            ping_interval: Duration::from_millis(500),
            pong_timeout: Duration::from_millis(1000),
            ..quick_config()
        };
        let client = TcpClient::new(config);

        let _rx = client
            .subscribe(server_addr.to_string(), transactions_added_request())
            .unwrap();

        // Give it time to establish connection
        thread::sleep(Duration::from_millis(200));
        assert_eq!(
            client.get_status(),
            ConnectionStatus::Connected,
            "Client should be connected"
        );

        // Wait long enough for multiple ping/pong cycles
        thread::sleep(Duration::from_secs(2));
        assert_eq!(
            client.get_status(),
            ConnectionStatus::Connected,
            "Client should remain connected after ping/pong exchanges"
        );

        // Shutdown the client
        client.shutdown_and_join();

        // Wait for the server to finish
        server_handle.join().unwrap();
    }
}
969}