titan_client/tcp_client.rs
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118
use serde_json;
use thiserror::Error;
use titan_types::{Event, TcpSubscriptionRequest};
use tokio::{
io::{AsyncBufReadExt, AsyncWriteExt, BufReader},
net::TcpStream,
sync::{mpsc, watch},
};
use tracing::{error, info, warn};
#[derive(Debug, Error)]
pub enum TcpClientError {
#[error("io error: {0}")]
IOError(#[from] std::io::Error),
#[error("serde error: {0}")]
SerdeError(#[from] serde_json::Error),
}
/// Asynchronous TCP client that encapsulates the shutdown signal.
pub struct AsyncTcpClient {
shutdown_tx: watch::Sender<()>,
shutdown_rx: watch::Receiver<()>,
}
impl AsyncTcpClient {
/// Creates a new instance of the async TCP client.
pub fn new() -> Self {
// Create a watch channel with an initial value.
let (shutdown_tx, shutdown_rx) = watch::channel(());
Self {
shutdown_tx,
shutdown_rx,
}
}
/// Subscribes to the TCP server at `addr` with the given subscription request.
/// Returns a channel receiver for incoming events.
pub async fn subscribe(
&self,
addr: &str,
subscription_request: TcpSubscriptionRequest,
) -> Result<mpsc::Receiver<Event>, TcpClientError> {
// Connect to the TCP server.
let stream = TcpStream::connect(addr).await?;
let (reader, mut writer) = stream.into_split();
let mut reader = BufReader::new(reader);
// Serialize the subscription request and send it.
let req_json = serde_json::to_string(&subscription_request)?;
writer.write_all(req_json.as_bytes()).await?;
writer.write_all(b"\n").await?;
writer.flush().await?;
// Create a channel to forward events.
let (tx, rx) = mpsc::channel::<Event>(100);
// Clone the shutdown receiver for the spawned task.
let mut shutdown_rx = self.shutdown_rx.clone();
// Spawn a task to continuously read from the TCP stream.
tokio::spawn(async move {
// Keep writer in scope (if needed later).
let _writer_guard = writer;
let mut line = String::new();
loop {
line.clear();
tokio::select! {
// Read a line from the TCP connection.
result = reader.read_line(&mut line) => {
match result {
Ok(0) => {
// Connection closed.
warn!("TCP connection closed by server.");
break;
}
Ok(_) => {
let trimmed = line.trim();
if trimmed.is_empty() {
continue;
}
match serde_json::from_str::<Event>(trimmed) {
Ok(event) => {
if let Err(e) = tx.send(event).await {
error!("Failed to send event to channel: {}", e);
break;
}
}
Err(e) => {
error!("Failed to parse event: {}. Line: {}", e, trimmed);
}
}
}
Err(e) => {
error!("Error reading from TCP socket: {}", e);
break;
}
}
}
// Check for shutdown signal.
_ = shutdown_rx.changed() => {
info!("Shutdown signal received. Exiting TCP subscription task.");
break;
}
}
}
info!("Exiting TCP subscription task.");
});
Ok(rx)
}
/// Signals the client to shut down by sending a signal through the watch channel.
pub fn shutdown(&self) {
if let Err(e) = self.shutdown_tx.send(()) {
error!("Failed to send shutdown signal: {:?}", e);
}
}
}