titan_client/
tcp_client_blocking.rs

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
use std::{
    io::{BufRead, BufReader, Write},
    net::TcpStream,
    sync::{
        atomic::{AtomicBool, Ordering},
        mpsc, Arc,
    },
    thread,
    time::Duration,
};

use serde_json;
use thiserror::Error;
#[cfg(feature = "tcp_client")]
use titan_types::{Event, TcpSubscriptionRequest};
use tracing::{error, info, warn};

/// Errors returned while setting up a blocking TCP subscription.
///
/// Both variants carry `#[from]`, so the underlying error types convert into
/// `TcpClientError` automatically when propagated with `?`.
#[derive(Debug, Error)]
pub enum TcpClientError {
    /// An I/O operation on the TCP socket failed.
    #[error("io error: {0}")]
    IOError(#[from] std::io::Error),
    /// JSON (de)serialization through `serde_json` failed.
    #[error("serde error: {0}")]
    SerdeError(#[from] serde_json::Error),
}

/// Synchronous TCP subscription client.
///
/// `subscribe` connects to a TCP subscription server, sends the subscription request
/// encoded as JSON, and spawns a dedicated thread that reads newline-delimited events
/// from the connection in non-blocking mode. When no data is available the thread
/// sleeps briefly and then checks the shutdown flag again.
///
/// A listener thread keeps running until the TCP connection is closed, the receiving
/// end of its channel is dropped, or `shutdown` is called on this client.
///
/// The only state held here is the shutdown flag, which is shared (via `Arc`) with
/// every listener thread started through this client.
#[cfg(feature = "tcp_client_blocking")]
pub struct TcpClient {
    // Set to `true` by `shutdown`; polled by the listener threads.
    shutdown_flag: Arc<AtomicBool>,
}

#[cfg(feature = "tcp_client_blocking")]
impl TcpClient {
    /// Creates a client whose shutdown flag is initially cleared.
    pub fn new() -> Self {
        Self {
            shutdown_flag: Arc::new(AtomicBool::new(false)),
        }
    }

    /// Opens a subscription to the server at `addr`.
    ///
    /// # Arguments
    ///
    /// * `addr` - The address of the TCP subscription server (e.g., "127.0.0.1:9000").
    /// * `subscription_request` - The subscription request to send to the server.
    ///
    /// # Returns
    ///
    /// A `std::sync::mpsc::Receiver<Event>` that yields the events read from the
    /// server by a background thread.
    ///
    /// # Errors
    ///
    /// Returns a [`TcpClientError`] if connecting to or configuring the socket fails,
    /// if the request cannot be serialized to JSON, or if sending it fails.
    pub fn subscribe(
        &self,
        addr: &str,
        subscription_request: TcpSubscriptionRequest,
    ) -> Result<mpsc::Receiver<Event>, TcpClientError> {
        // Every listener thread shares this client's flag, so a single `shutdown`
        // call stops all of them.
        subscribe(addr, subscription_request, Arc::clone(&self.shutdown_flag))
    }

    /// Signals every listener thread started through this client to stop.
    ///
    /// The threads poll the flag, so they exit shortly after this call rather than
    /// immediately. The flag is never reset: a subscription opened after `shutdown`
    /// has been called will have its listener thread exit on its first check.
    pub fn shutdown(&self) {
        self.shutdown_flag.store(true, Ordering::SeqCst);
    }
}

#[cfg(feature = "tcp_client_blocking")]
impl Default for TcpClient {
    /// Equivalent to [`TcpClient::new`].
    fn default() -> Self {
        Self::new()
    }
}


fn subscribe(
    addr: &str,
    subscription_request: TcpSubscriptionRequest,
    shutdown_flag: Arc<AtomicBool>,
) -> Result<mpsc::Receiver<Event>, TcpClientError> {
    // Connect to the TCP server.
    let mut stream = TcpStream::connect(addr)?;
    // Set the stream to non-blocking mode.
    stream.set_nonblocking(true)?;

    // Clone the stream for reading.
    let reader_stream = stream.try_clone()?;
    let mut reader = BufReader::new(reader_stream);

    // Serialize the subscription request to JSON and send it.
    let req_json = serde_json::to_string(&subscription_request)?;
    stream.write_all(req_json.as_bytes())?;
    stream.write_all(b"\n")?;
    stream.flush()?;

    // Create a standard mpsc channel to forward events.
    let (tx, rx) = mpsc::channel::<Event>();

    // Spawn a thread to read events from the TCP connection.
    thread::spawn(move || {
        let mut line = String::new();
        loop {
            // Check if shutdown has been signaled.
            if shutdown_flag.load(Ordering::SeqCst) {
                info!("Shutdown flag set. Exiting subscription thread.");
                break;
            }

            line.clear();
            match reader.read_line(&mut line) {
                Ok(0) => {
                    // Connection closed.
                    warn!("TCP connection closed by server.");
                    break;
                }
                Ok(_) => {
                    let trimmed = line.trim();
                    if trimmed.is_empty() {
                        continue;
                    }
                    // Deserialize the JSON line into an Event.
                    match serde_json::from_str::<Event>(trimmed) {
                        Ok(event) => {
                            if tx.send(event).is_err() {
                                error!("Receiver dropped. Exiting subscription thread.");
                                break;
                            }
                        }
                        Err(e) => {
                            error!("Failed to parse event: {}. Line: {}", e, trimmed);
                        }
                    }
                }
                Err(ref e) if e.kind() == std::io::ErrorKind::WouldBlock => {
                    // No data available right now.
                    thread::sleep(Duration::from_millis(100));
                    continue;
                }
                Err(e) => {
                    error!("Error reading from TCP socket: {}", e);
                    break;
                }
            }
        }
        info!("Exiting TCP subscription thread.");
    });

    Ok(rx)
}