titan_client/tcp_client_blocking.rs
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146
use std::{
io::{BufRead, BufReader, Write},
net::TcpStream,
sync::{
atomic::{AtomicBool, Ordering},
mpsc, Arc,
},
thread,
time::Duration,
};
use serde_json;
use thiserror::Error;
#[cfg(feature = "tcp_client")]
use titan_types::{Event, TcpSubscriptionRequest};
use tracing::{error, info, warn};
/// Errors that setting up a TCP subscription can produce.
///
/// Both variants wrap the underlying error via `#[from]`, so `?` converts
/// `std::io::Error` and `serde_json::Error` into this type automatically.
#[derive(Debug, Error)]
pub enum TcpClientError {
/// An I/O operation failed (for example connecting to the server or writing
/// the subscription request to the socket).
#[error("io error: {0}")]
IOError(#[from] std::io::Error),
/// `serde_json` failed (for example while serializing the subscription request).
#[error("serde error: {0}")]
SerdeError(#[from] serde_json::Error),
}
// NOTE(review): `Event` and `TcpSubscriptionRequest` are imported under
// `feature = "tcp_client"`, while this type is gated on `feature = "tcp_client_blocking"`.
// Confirm the crate's feature wiring makes both available together.

/// Handle for synchronous (thread-based) TCP event subscriptions.
///
/// A `TcpClient` owns a shutdown flag that is shared with every reader thread started
/// through its `subscribe` method. Each `subscribe` call connects to the server, sends
/// the subscription request encoded as a JSON line, and returns a
/// `std::sync::mpsc::Receiver<Event>` that is fed by a dedicated reader thread.
///
/// A reader thread keeps running until the TCP connection is closed, a read error
/// occurs, the receiver is dropped, or `shutdown` is called on this client.
///
/// `TcpClient::default()` is equivalent to `TcpClient::new()`: the flag starts cleared.
#[cfg(feature = "tcp_client_blocking")]
#[derive(Debug, Default)]
pub struct TcpClient {
    /// Set to `true` by `shutdown`; reader threads poll it and exit once it is set.
    shutdown_flag: Arc<AtomicBool>,
}
#[cfg(feature = "tcp_client_blocking")]
impl TcpClient {
    /// Creates a client whose shutdown flag is initially cleared.
    pub fn new() -> Self {
        let shutdown_flag = Arc::new(AtomicBool::new(false));
        Self { shutdown_flag }
    }

    /// Opens a subscription to the server at `addr`.
    ///
    /// Delegates to the module-level `subscribe` function, handing it a clone of this
    /// client's shutdown flag so the spawned reader thread can observe `shutdown`.
    ///
    /// # Arguments
    ///
    /// * `addr` - Address of the TCP subscription server (e.g., "127.0.0.1:9000").
    /// * `subscription_request` - The request sent to the server as a JSON line.
    ///
    /// # Errors
    ///
    /// Returns a `TcpClientError` if connecting, configuring the socket, serializing
    /// the request, or writing it fails.
    pub fn subscribe(
        &self,
        addr: &str,
        subscription_request: TcpSubscriptionRequest,
    ) -> Result<mpsc::Receiver<Event>, TcpClientError> {
        subscribe(
            addr,
            subscription_request,
            Arc::clone(&self.shutdown_flag),
        )
    }

    /// Asks every reader thread started by this client to stop.
    ///
    /// Threads check the flag once per loop iteration and sleep 100 ms between polls
    /// when no data is available, so they may take about that long to exit. Nothing in
    /// this file clears the flag again, so subscriptions started after `shutdown` will
    /// see it set on their first check and exit immediately.
    pub fn shutdown(&self) {
        let stop_requested = true;
        self.shutdown_flag.store(stop_requested, Ordering::SeqCst);
    }
}
fn subscribe(
addr: &str,
subscription_request: TcpSubscriptionRequest,
shutdown_flag: Arc<AtomicBool>,
) -> Result<mpsc::Receiver<Event>, TcpClientError> {
// Connect to the TCP server.
let mut stream = TcpStream::connect(addr)?;
// Set the stream to non-blocking mode.
stream.set_nonblocking(true)?;
// Clone the stream for reading.
let reader_stream = stream.try_clone()?;
let mut reader = BufReader::new(reader_stream);
// Serialize the subscription request to JSON and send it.
let req_json = serde_json::to_string(&subscription_request)?;
stream.write_all(req_json.as_bytes())?;
stream.write_all(b"\n")?;
stream.flush()?;
// Create a standard mpsc channel to forward events.
let (tx, rx) = mpsc::channel::<Event>();
// Spawn a thread to read events from the TCP connection.
thread::spawn(move || {
let mut line = String::new();
loop {
// Check if shutdown has been signaled.
if shutdown_flag.load(Ordering::SeqCst) {
info!("Shutdown flag set. Exiting subscription thread.");
break;
}
line.clear();
match reader.read_line(&mut line) {
Ok(0) => {
// Connection closed.
warn!("TCP connection closed by server.");
break;
}
Ok(_) => {
let trimmed = line.trim();
if trimmed.is_empty() {
continue;
}
// Deserialize the JSON line into an Event.
match serde_json::from_str::<Event>(trimmed) {
Ok(event) => {
if tx.send(event).is_err() {
error!("Receiver dropped. Exiting subscription thread.");
break;
}
}
Err(e) => {
error!("Failed to parse event: {}. Line: {}", e, trimmed);
}
}
}
Err(ref e) if e.kind() == std::io::ErrorKind::WouldBlock => {
// No data available right now.
thread::sleep(Duration::from_millis(100));
continue;
}
Err(e) => {
error!("Error reading from TCP socket: {}", e);
break;
}
}
}
info!("Exiting TCP subscription thread.");
});
Ok(rx)
}