Skip to main content

voltdb_client_rust/
node.rs

1//! # VoltDB Node Connection
2//!
3//! This module provides the core TCP connection handling for communicating with VoltDB servers.
4//!
5//! ## Architecture
6//!
//! The connection uses a dedicated listener thread for all reads:
//!
//! - One background thread per connection reads synchronously from its own clone of
//!   the `TcpStream`.
//! - Each incoming response is dispatched over a channel to the caller that issued the
//!   request, so callers can wait for results independently of the network I/O.
//! - Sharing a single `Mutex<TcpStream>` between reads and writes would introduce blocking
//!   and contention, because a reader waiting on the socket would stall every writer.
//!   Splitting the read side from the write side avoids that while keeping the network
//!   I/O simple and efficient.
15//!
16//! ## Timeouts
17//!
18//! The module supports two types of timeouts:
19//!
20//! - **Connection timeout**: Limits how long the initial TCP connection attempt will wait.
21//!   Use [`OptsBuilder::connect_timeout`] or set [`NodeOpt::connect_timeout`].
22//! - **Read timeout**: Limits how long socket read operations will wait for data.
23//!   This affects both the authentication handshake and the background listener thread.
24//!   Use [`OptsBuilder::read_timeout`] or set [`NodeOpt::read_timeout`].
25//!
26//! ## Example
27//!
28//! ```no_run
29//! use voltdb_client_rust::{Opts, Pool};
30//! use std::time::Duration;
31//!
32//! // Create connection options with timeouts
33//! let opts = Opts::builder()
34//!     .host("localhost", 21212)
35//!     .connect_timeout(Duration::from_secs(5))
36//!     .read_timeout(Duration::from_secs(30))
37//!     .build()
38//!     .unwrap();
39//!
40//! // Create a connection pool
41//! let mut pool = Pool::new(opts)?;
42//! let mut conn = pool.get_conn()?;
43//!
44//! // Execute a query
45//! let result = conn.query("SELECT * FROM my_table")?;
46//! # Ok::<(), voltdb_client_rust::VoltError>(())
47//! ```
48
49use std::collections::HashMap;
50use std::fmt::{Debug, Formatter};
51use std::io::{Read, Write};
52use std::net::{Ipv4Addr, Shutdown, SocketAddr, TcpStream, ToSocketAddrs};
53use std::sync::atomic::{AtomicBool, AtomicI64, Ordering};
54use std::sync::mpsc::{Receiver, Sender};
55use std::sync::{Arc, Mutex, mpsc};
56use std::thread;
57use std::time::Duration;
58
59use bytebuffer::ByteBuffer;
60use byteorder::{BigEndian, ReadBytesExt};
61
62use crate::encode::{Value, VoltError};
63use crate::procedure_invocation::new_procedure_invocation;
64use crate::protocol::{PING_HANDLE, build_auth_message, parse_auth_response};
65use crate::response::VoltResponseInfo;
66use crate::table::{VoltTable, new_volt_table};
67use crate::volt_param;
68
69// ============================================================================
70// Logging macros - use tracing if available, otherwise no-op
71// ============================================================================
72
// With the `tracing` feature enabled, forward every token to `tracing::error!`.
#[cfg(feature = "tracing")]
macro_rules! node_error {
    ($($tokens:tt)*) => {
        tracing::error!($($tokens)*)
    };
}
// Without the `tracing` feature the macro accepts the same tokens and expands to nothing.
#[cfg(not(feature = "tracing"))]
macro_rules! node_error {
    ($($tokens:tt)*) => {};
}
81
82/// Connection options for VoltDB client.
83///
84/// This struct encapsulates all configuration options needed to establish
85/// connections to a VoltDB cluster. Use [`Opts::builder()`] for a fluent
86/// configuration API or [`Opts::new()`] for simple configurations.
87///
88/// # Example
89/// ```no_run
90/// use voltdb_client_rust::{Opts, IpPort};
91///
92/// // Simple configuration
93/// let hosts = vec![IpPort::new("localhost".to_string(), 21212)];
94/// let opts = Opts::new(hosts);
95///
96/// // Or use the builder for more options
97/// let opts = Opts::builder()
98///     .host("localhost", 21212)
99///     .user("admin")
100///     .password("secret")
101///     .build()
102///     .unwrap();
103/// ```
104#[derive(Clone, Eq, PartialEq, Debug)]
105pub struct Opts(pub(crate) Box<InnerOpts>);
106
/// A VoltDB server endpoint expressed as a host (name or IP) plus a port.
///
/// Several `IpPort` values can be handed to a connection pool to reach
/// more than one node of a cluster.
///
/// # Example
/// ```
/// use voltdb_client_rust::IpPort;
///
/// let endpoint = IpPort::new("192.168.1.100".to_string(), 21212);
/// ```
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct IpPort {
    pub(crate) ip_host: String,
    pub(crate) port: u16,
}

impl IpPort {
    /// Builds an endpoint from a hostname/IP address and a port.
    ///
    /// # Arguments
    /// * `ip_host` - Hostname or IP address of the VoltDB server
    /// * `port` - Port number (typically 21212 for client connections)
    pub fn new(ip_host: String, port: u16) -> Self {
        Self { ip_host, port }
    }
}
134
135impl Opts {
136    /// Creates connection options with the given hosts and default settings.
137    ///
138    /// This is a convenience constructor for simple configurations without
139    /// authentication or timeouts. For more control, use [`Opts::builder()`].
140    ///
141    /// # Arguments
142    /// * `hosts` - List of VoltDB server endpoints to connect to
143    pub fn new(hosts: Vec<IpPort>) -> Opts {
144        Opts(Box::new(InnerOpts {
145            ip_ports: hosts,
146            user: None,
147            pass: None,
148            connect_timeout: None,
149            read_timeout: None,
150        }))
151    }
152
153    /// Creates a new [`OptsBuilder`] for fluent configuration.
154    ///
155    /// # Example
156    /// ```no_run
157    /// use voltdb_client_rust::Opts;
158    /// use std::time::Duration;
159    ///
160    /// let opts = Opts::builder()
161    ///     .host("localhost", 21212)
162    ///     .connect_timeout(Duration::from_secs(5))
163    ///     .build()
164    ///     .unwrap();
165    /// ```
166    pub fn builder() -> OptsBuilder {
167        OptsBuilder::default()
168    }
169}
170
171/// Builder for connection options.
172///
173/// # Example
174/// ```no_run
175/// use voltdb_client_rust::{Opts, IpPort};
176/// use std::time::Duration;
177///
178/// let opts = Opts::builder()
179///     .host("localhost", 21212)
180///     .user("admin")
181///     .password("password")
182///     .connect_timeout(Duration::from_secs(10))
183///     .build()
184///     .unwrap();
185/// ```
186#[derive(Debug, Clone, Default)]
187pub struct OptsBuilder {
188    hosts: Vec<IpPort>,
189    user: Option<String>,
190    pass: Option<String>,
191    connect_timeout: Option<Duration>,
192    read_timeout: Option<Duration>,
193}
194
195impl OptsBuilder {
196    /// Add a host to connect to.
197    pub fn host(mut self, ip: &str, port: u16) -> Self {
198        self.hosts.push(IpPort::new(ip.to_string(), port));
199        self
200    }
201
202    /// Add multiple hosts to connect to.
203    pub fn hosts(mut self, hosts: Vec<IpPort>) -> Self {
204        self.hosts.extend(hosts);
205        self
206    }
207
208    /// Set the username for authentication.
209    pub fn user(mut self, user: &str) -> Self {
210        self.user = Some(user.to_string());
211        self
212    }
213
214    /// Set the password for authentication.
215    pub fn password(mut self, pass: &str) -> Self {
216        self.pass = Some(pass.to_string());
217        self
218    }
219
220    /// Set connection timeout.
221    pub fn connect_timeout(mut self, timeout: Duration) -> Self {
222        self.connect_timeout = Some(timeout);
223        self
224    }
225
226    /// Set read timeout for socket operations.
227    pub fn read_timeout(mut self, timeout: Duration) -> Self {
228        self.read_timeout = Some(timeout);
229        self
230    }
231
232    /// Build the Opts.
233    ///
234    /// Returns an error if no hosts are configured.
235    pub fn build(self) -> Result<Opts, VoltError> {
236        if self.hosts.is_empty() {
237            return Err(VoltError::InvalidConfig);
238        }
239        Ok(Opts(Box::new(InnerOpts {
240            ip_ports: self.hosts,
241            user: self.user,
242            pass: self.pass,
243            connect_timeout: self.connect_timeout,
244            read_timeout: self.read_timeout,
245        })))
246    }
247}
248
249#[derive(Debug, Clone, Eq, PartialEq)]
250pub(crate) struct InnerOpts {
251    pub(crate) ip_ports: Vec<IpPort>,
252    pub(crate) user: Option<String>,
253    pub(crate) pass: Option<String>,
254    pub(crate) connect_timeout: Option<Duration>,
255    pub(crate) read_timeout: Option<Duration>,
256}
257
/// Options for creating a single [`Node`] connection.
///
/// Holds everything [`Node::new`] consumes: the endpoint, optional credentials
/// and the optional connect/read timeouts.
///
/// # Example
/// ```no_run
/// use voltdb_client_rust::{NodeOpt, IpPort};
/// use std::time::Duration;
///
/// let opt = NodeOpt {
///     ip_port: IpPort::new("localhost".to_string(), 21212),
///     user: Some("admin".to_string()),
///     pass: Some("password".to_string()),
///     connect_timeout: Some(Duration::from_secs(10)),
///     read_timeout: Some(Duration::from_secs(30)),
/// };
/// ```
pub struct NodeOpt {
    /// The host and port to connect to.
    pub ip_port: IpPort,
    /// Optional username for authentication.
    pub user: Option<String>,
    /// Optional password for authentication.
    pub pass: Option<String>,
    /// Connection timeout. If `None`, `Node::new` connects with `TcpStream::connect`
    /// and applies no explicit limit to the attempt.
    pub connect_timeout: Option<Duration>,
    /// Read timeout for socket operations. If `None`, `Node::new` sets no timeout for the
    /// authentication handshake; the background listener then polls with a 2-second
    /// read timeout and simply retries when it expires.
    pub read_timeout: Option<Duration>,
}
289
/// Map of in-flight requests.
///
/// The key is the request handle; the value is the sending half of the channel
/// on which the matching response table is delivered.
type PendingRequests = HashMap<i64, Sender<VoltTable>>;
293
/// Marker trait for VoltDB connection types.
///
/// It has no methods; implementors must be `Send`, `Sync` and `'static`.
/// The synchronous [`Node`] implements it, as does the async `AsyncNode`.
pub trait Connection: Send + Sync + 'static {}
298
/// A single TCP connection to a VoltDB server node.
///
/// `Node` represents a persistent, stateful TCP connection used to execute
/// stored procedures and queries against a VoltDB cluster. Each `Node`
/// maintains its own socket and spawns a dedicated background thread to
/// asynchronously receive and dispatch responses from the server.
///
/// # Concurrency
///
/// `Node` is safe to use concurrently and supports multiple in-flight requests
/// over the same connection. For automatic reconnection, load balancing, or
/// managing multiple connections, use [`crate::Pool`].
///
/// # Example
///
/// ```no_run
/// use voltdb_client_rust::{Node, NodeOpt, IpPort, block_for_result};
///
/// let opt = NodeOpt {
///     ip_port: IpPort::new("localhost".to_string(), 21212),
///     user: None,
///     pass: None,
///     connect_timeout: None,
///     read_timeout: None,
/// };
///
/// let node = Node::new(opt)?;
/// let rx = node.query("SELECT * FROM my_table")?;
/// let table = block_for_result(&rx)?;
/// # Ok::<(), voltdb_client_rust::VoltError>(())
/// ```
// NOTE(review): `dead_code` is presumably allowed because some fields (e.g. `info`)
// are stored but never read in this file — confirm before removing the attribute.
#[allow(dead_code)]
pub struct Node {
    /// Write-side of the TCP stream, protected by a mutex for thread-safe writes.
    /// Multiple threads can call query/call_sp concurrently; writes are serialized.
    /// Becomes `None` once `shutdown` has taken and closed the stream.
    write_stream: Mutex<Option<TcpStream>>,
    /// Connection metadata parsed from the authentication response.
    info: ConnInfo,
    /// Pending requests awaiting responses. Uses Mutex instead of RwLock since
    /// both insert (main thread) and remove (listener thread) require exclusive access.
    requests: Arc<Mutex<PendingRequests>>,
    /// Stop flag shared with the listener thread; set to `true` by `shutdown`.
    stop: Arc<Mutex<bool>>,
    /// Source of request handles; starts at 1 and is advanced by `get_sequence`.
    counter: AtomicI64,
    /// Simple atomic lock for write operations. True = locked, False = unlocked.
    /// `ping` acquires it by spinning before it writes.
    write_lock: AtomicBool,
    /// Handle for the background listener thread; taken and joined by `shutdown`.
    listener_handle: Option<thread::JoinHandle<()>>,
}
346
347impl Debug for Node {
348    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
349        write!(f, "Pending request: {}", 1)
350    }
351}
352
353impl Drop for Node {
354    fn drop(&mut self) {
355        let res = self.shutdown();
356        match res {
357            Ok(_) => {}
358            Err(_e) => {
359                node_error!(error = ?_e, "error during node shutdown");
360            }
361        }
362    }
363}
364
// `Node` is the synchronous connection type; the marker trait has no methods to implement.
impl Connection for Node {}
366
367impl Node {
368    /// Creates a new connection to a VoltDB server node.
369    ///
370    /// This method establishes a TCP connection to the specified host/port,
371    /// performs authentication, and spawns a background listener thread for
372    /// receiving asynchronous responses.
373    ///
374    /// # Arguments
375    /// * `opt` - Connection options including host, port, credentials, and timeouts.
376    ///
377    /// # Timeouts
378    /// * `connect_timeout` - If set, limits how long the connection attempt will wait.
379    ///   If not set, the connection attempt blocks indefinitely.
380    /// * `read_timeout` - If set, socket read operations will timeout after this duration.
381    ///   This affects both the authentication phase and the background listener thread.
382    ///
383    /// # Errors
384    /// Returns `VoltError` if:
385    /// * The connection cannot be established (network error or timeout)
386    /// * DNS resolution fails
387    /// * Authentication fails
388    /// * The server rejects the connection
389    ///
390    /// # Example
391    /// ```no_run
392    /// use voltdb_client_rust::{Node, NodeOpt, IpPort};
393    /// use std::time::Duration;
394    ///
395    /// let opt = NodeOpt {
396    ///     ip_port: IpPort::new("localhost".to_string(), 21212),
397    ///     user: None,
398    ///     pass: None,
399    ///     connect_timeout: Some(Duration::from_secs(5)),
400    ///     read_timeout: Some(Duration::from_secs(30)),
401    /// };
402    /// let node = Node::new(opt)?;
403    /// # Ok::<(), voltdb_client_rust::VoltError>(())
404    /// ```
405    pub fn new(opt: NodeOpt) -> Result<Node, VoltError> {
406        let ip_host = opt.ip_port;
407        let addr_str = format!("{}:{}", ip_host.ip_host, ip_host.port);
408
409        // Build authentication message using shared protocol code
410        let auth_msg = build_auth_message(opt.user.as_deref(), opt.pass.as_deref())?;
411
412        // Connect to server with optional timeout
413        let mut stream: TcpStream = match opt.connect_timeout {
414            Some(timeout) => {
415                // Resolve address for connect_timeout (requires SocketAddr)
416                let socket_addr: SocketAddr = addr_str
417                    .to_socket_addrs()
418                    .map_err(|_| VoltError::InvalidConfig)?
419                    .find(|s| s.is_ipv4())
420                    .ok_or(VoltError::InvalidConfig)?;
421                TcpStream::connect_timeout(&socket_addr, timeout)?
422            }
423            None => TcpStream::connect(&addr_str)?,
424        };
425
426        // Set read timeout if configured
427        if let Some(read_timeout) = opt.read_timeout {
428            stream.set_read_timeout(Some(read_timeout))?;
429        }
430
431        // Send auth request
432        stream.write_all(&auth_msg)?;
433        stream.flush()?;
434
435        // Read auth response
436        let read = stream.read_u32::<BigEndian>()?;
437        let mut all = vec![0; read as usize];
438        stream.read_exact(&mut all)?;
439
440        // Parse auth response using shared protocol code
441        let info = parse_auth_response(&all)?;
442
443        // Clone the stream for the read side (listener thread).
444        // Set a read timeout on the read clone so the listener thread can
445        // periodically check the stop flag (required on Linux where
446        // shutdown() on the write clone may not unblock the read clone).
447        let read_stream = stream.try_clone()?;
448        let listener_timeout = opt.read_timeout.unwrap_or(Duration::from_secs(2));
449        read_stream.set_read_timeout(Some(listener_timeout))?;
450
451        let requests = Arc::new(Mutex::new(HashMap::new()));
452        let stop = Arc::new(Mutex::new(false));
453
454        // Start the listener thread with the read side
455        let handle = Self::start_listener(read_stream, Arc::clone(&requests), Arc::clone(&stop));
456
457        Ok(Node {
458            stop,
459            write_stream: Mutex::new(Some(stream)),
460            info,
461            requests,
462            counter: AtomicI64::new(1),
463            write_lock: AtomicBool::new(false),
464            listener_handle: Some(handle),
465        })
466    }
467    /// Returns the next unique sequence number for request tracking.
468    ///
469    /// Each request to VoltDB uses a unique handle (sequence number) for
470    /// matching responses to requests.
471    pub fn get_sequence(&self) -> i64 {
472        self.counter.fetch_add(1, Ordering::Relaxed)
473    }
474
475    /// Lists all stored procedures available in the VoltDB database.
476    ///
477    /// This calls the `@SystemCatalog` system procedure with "PROCEDURES" argument.
478    ///
479    /// # Returns
480    /// A receiver that will yield a `VoltTable` containing procedure metadata.
481    pub fn list_procedures(&self) -> Result<Receiver<VoltTable>, VoltError> {
482        self.call_sp("@SystemCatalog", volt_param!("PROCEDURES"))
483    }
484
485    /// Executes a stored procedure with the given parameters.
486    ///
487    /// # Arguments
488    /// * `query` - The name of the stored procedure (e.g., "@AdHoc", "MyProcedure")
489    /// * `param` - Vector of parameter values. Use [`volt_param!`] macro for convenience.
490    ///
491    /// # Returns
492    /// A receiver that will yield the result `VoltTable` when available.
493    ///
494    /// # Example
495    /// ```no_run
496    /// use voltdb_client_rust::{Node, NodeOpt, IpPort, Value, block_for_result, volt_param};
497    ///
498    /// # let opt = NodeOpt {
499    /// #     ip_port: IpPort::new("localhost".to_string(), 21212),
500    /// #     user: None, pass: None, connect_timeout: None, read_timeout: None,
501    /// # };
502    /// let node = Node::new(opt)?;
503    /// let id = 1i32;
504    /// let name = "test".to_string();
505    /// let rx = node.call_sp("MyProcedure", volt_param![id, name])?;
506    /// let result = block_for_result(&rx)?;
507    /// # Ok::<(), voltdb_client_rust::VoltError>(())
508    /// ```
509    pub fn call_sp(
510        &self,
511        query: &str,
512        param: Vec<&dyn Value>,
513    ) -> Result<Receiver<VoltTable>, VoltError> {
514        let handle = self.get_sequence();
515        let mut proc = new_procedure_invocation(handle, false, &param, query);
516        let (tx, rx): (Sender<VoltTable>, Receiver<VoltTable>) = mpsc::channel();
517
518        // Register the response channel before sending the request
519        self.requests.lock()?.insert(handle, tx);
520        let bs = proc.bytes();
521        // Write to stream while holding the lock
522        let result = {
523            let mut stream_guard = self.write_stream.lock()?;
524            match stream_guard.as_mut() {
525                None => Err(VoltError::ConnectionNotAvailable),
526                Some(stream) => {
527                    stream.write_all(&bs)?;
528                    Ok(rx)
529                }
530            }
531        };
532
533        // Release write lock
534        self.write_lock.store(false, Ordering::Release);
535
536        result
537    }
538
539    /// Uploads a JAR file containing stored procedure classes to VoltDB.
540    ///
541    /// This calls the `@UpdateClasses` system procedure to deploy new classes.
542    ///
543    /// # Arguments
544    /// * `bs` - The JAR file contents as bytes
545    pub fn upload_jar(&self, bs: Vec<u8>) -> Result<Receiver<VoltTable>, VoltError> {
546        self.call_sp("@UpdateClasses", volt_param!(bs, ""))
547    }
548
549    /// Executes an ad-hoc SQL query.
550    ///
551    /// This is a convenience method that calls the `@AdHoc` system procedure.
552    ///
553    /// # Arguments
554    /// * `sql` - The SQL query string to execute
555    ///
556    /// # Returns
557    /// A receiver that will yield the result `VoltTable` when available.
558    ///
559    /// # Example
560    /// ```no_run
561    /// use voltdb_client_rust::{Node, NodeOpt, IpPort, block_for_result};
562    ///
563    /// # let opt = NodeOpt {
564    /// #     ip_port: IpPort::new("localhost".to_string(), 21212),
565    /// #     user: None, pass: None, connect_timeout: None, read_timeout: None,
566    /// # };
567    /// let node = Node::new(opt)?;
568    /// let rx = node.query("SELECT COUNT(*) FROM users")?;
569    /// let result = block_for_result(&rx)?;
570    /// # Ok::<(), voltdb_client_rust::VoltError>(())
571    /// ```
572    pub fn query(&self, sql: &str) -> Result<Receiver<VoltTable>, VoltError> {
573        let zero_vec: Vec<&dyn Value> = vec![&sql];
574        self.call_sp("@AdHoc", zero_vec)
575    }
576
577    /// Sends a ping to the VoltDB server.
578    ///
579    /// This can be used to keep the connection alive or verify connectivity.
580    /// The ping response is handled internally and not returned to the caller.
581    pub fn ping(&self) -> Result<(), VoltError> {
582        let zero_vec: Vec<&dyn Value> = Vec::new();
583        let mut proc = new_procedure_invocation(PING_HANDLE, false, &zero_vec, "@Ping");
584        let bs = proc.bytes();
585
586        // Acquire write lock (spin until acquired)
587        while self
588            .write_lock
589            .compare_exchange_weak(false, true, Ordering::Acquire, Ordering::Relaxed)
590            .is_err()
591        {
592            std::hint::spin_loop();
593        }
594
595        let result = {
596            let mut stream_guard = self.write_stream.lock()?;
597            match stream_guard.as_mut() {
598                None => Err(VoltError::ConnectionNotAvailable),
599                Some(stream) => {
600                    stream.write_all(&bs)?;
601                    Ok(())
602                }
603            }
604        };
605
606        // Release write lock
607        self.write_lock.store(false, Ordering::Release);
608
609        result
610    }
611
612    /// Reads and processes a single message from the TCP stream.
613    ///
614    /// # Arguments
615    /// * `tcp` - The TCP stream to read from
616    /// * `requests` - Map of pending requests awaiting responses
617    /// * `buffer` - Reusable buffer for reading message data (reduces allocations)
618    fn job(
619        tcp: &mut impl Read,
620        requests: &Arc<Mutex<PendingRequests>>,
621        buffer: &mut Vec<u8>,
622    ) -> Result<(), VoltError> {
623        // Read message length (4 bytes, big-endian)
624        let msg_len = tcp.read_u32::<BigEndian>()?;
625        if msg_len == 0 {
626            return Ok(());
627        }
628
629        // Reuse buffer: resize if needed, but capacity is retained
630        buffer.resize(msg_len as usize, 0);
631        tcp.read_exact(buffer)?;
632
633        let mut res = ByteBuffer::from_bytes(buffer);
634        // Skip protocol version byte (always 0 for current protocol)
635        let _ = res.read_u8()?;
636        let handle = res.read_i64()?;
637
638        if handle == PING_HANDLE {
639            return Ok(()); // Ping response, nothing else to do
640        }
641
642        if let Some(sender) = requests.lock()?.remove(&handle) {
643            let info = VoltResponseInfo::new(&mut res, handle)?;
644            let table = new_volt_table(&mut res, info)?;
645            // Ignore send error - receiver may have been dropped if caller
646            // timed out or cancelled the request
647            let _ = sender.send(table);
648        }
649
650        Ok(())
651    }
652    /// Gracefully shuts down the connection.
653    ///
654    /// This stops the background listener thread and closes the TCP connection.
655    /// The `Node` will be unusable after calling this method.
656    ///
657    /// Note: This is automatically called when the `Node` is dropped.
658    pub fn shutdown(&mut self) -> Result<(), VoltError> {
659        {
660            let mut stop = self.stop.lock()?;
661            *stop = true;
662        } // release stop lock before any join — listener thread needs it
663
664        let mut stream_guard = self.write_stream.lock()?;
665        if let Some(stream) = stream_guard.take() {
666            let _ = stream.shutdown(Shutdown::Both);
667        }
668        drop(stream_guard); // release lock before join
669
670        if let Some(handle) = self.listener_handle.take() {
671            let _ = handle.join();
672        }
673        Ok(())
674    }
675
676    /// Starts the background listener thread for receiving responses.
677    ///
678    /// This is a static method that takes ownership of the read-side stream.
679    fn start_listener(
680        mut tcp: TcpStream,
681        requests: Arc<Mutex<PendingRequests>>,
682        stopping: Arc<Mutex<bool>>,
683    ) -> thread::JoinHandle<()> {
684        thread::spawn(move || {
685            // Reusable buffer to reduce allocation pressure.
686            // Starts with 4KB capacity, grows as needed but rarely shrinks.
687            let mut buffer = Vec::with_capacity(4096);
688
689            loop {
690                // Check stop flag before blocking on read
691                let should_stop = stopping
692                    .lock()
693                    .unwrap_or_else(|poisoned| poisoned.into_inner());
694                if *should_stop {
695                    break;
696                }
697                drop(should_stop); // Release lock before blocking on I/O
698
699                if let Err(_err) = Node::job(&mut tcp, &requests, &mut buffer) {
700                    // Timeout errors are expected — the read timeout lets us
701                    // periodically check the stop flag. Just loop back.
702                    if let VoltError::Io(ref io_err) = _err {
703                        if io_err.kind() == std::io::ErrorKind::WouldBlock
704                            || io_err.kind() == std::io::ErrorKind::TimedOut
705                        {
706                            continue;
707                        }
708                    }
709                    // Only log non-timeout errors if we're not intentionally stopping
710                    let is_stopping = stopping
711                        .lock()
712                        .unwrap_or_else(|poisoned| poisoned.into_inner());
713                    if !*is_stopping {
714                        node_error!(error = %_err, "VoltDB listener error");
715                    }
716                    break; // Exit on non-timeout errors
717                }
718            }
719        })
720    }
721}
722
/// Metadata the VoltDB server reports about a connection during authentication.
///
/// It identifies the host the client connected to, the connection itself,
/// the cluster leader address and the server build.
#[derive(Clone, Debug)]
pub struct ConnInfo {
    /// The ID of the host in the VoltDB cluster.
    pub host_id: i32,
    /// Unique connection identifier assigned by the server.
    pub connection: i64,
    /// IPv4 address of the cluster leader node.
    pub leader_addr: Ipv4Addr,
    /// VoltDB server build/version string.
    pub build: String,
}

impl Default for ConnInfo {
    /// Returns placeholder metadata: zero IDs, the loopback address and an empty build string.
    fn default() -> Self {
        ConnInfo {
            host_id: 0,
            connection: 0,
            leader_addr: Ipv4Addr::LOCALHOST,
            build: String::new(),
        }
    }
}
749
750/// Blocks until a response is received and converts any VoltDB errors.
751///
752/// This is a convenience function for synchronous usage. It waits for the
753/// response on the channel and converts VoltDB-level errors (from the response)
754/// into `VoltError`.
755///
756/// # Arguments
757/// * `res` - The receiver from a query or stored procedure call
758///
759/// # Returns
760/// The result `VoltTable` on success, or `VoltError` if the operation failed.
761///
762/// # Example
763/// ```no_run
764/// use voltdb_client_rust::{Node, NodeOpt, IpPort, block_for_result};
765///
766/// # let opt = NodeOpt {
767/// #     ip_port: IpPort::new("localhost".to_string(), 21212),
768/// #     user: None, pass: None, connect_timeout: None, read_timeout: None,
769/// # };
770/// let node = Node::new(opt)?;
771/// let rx = node.query("SELECT * FROM users")?;
772/// let mut table = block_for_result(&rx)?;
773///
774/// while table.advance_row() {
775///     // Process rows...
776/// }
777/// # Ok::<(), voltdb_client_rust::VoltError>(())
778/// ```
779pub fn block_for_result(res: &Receiver<VoltTable>) -> Result<VoltTable, VoltError> {
780    let mut table = res.recv()?;
781    let err = table.has_error();
782    match err {
783        None => Ok(table),
784        Some(err) => Err(err),
785    }
786}
787
/// Does nothing: the body is empty.
// NOTE(review): presumably kept for API compatibility — confirm before removing.
pub fn reset() {}
789
790/// Creates a new connection to a VoltDB server using an address string.
791///
792/// This is a convenience function that parses the address string and creates
793/// a connection with default settings (no authentication, no timeouts).
794///
795/// # Arguments
796/// * `addr` - The server address in "host:port" format (e.g., "localhost:21212")
797///
798/// # Errors
799/// Returns `VoltError::InvalidConfig` if the address cannot be parsed or resolved.
800///
801/// # Example
802/// ```no_run
803/// use voltdb_client_rust::get_node;
804///
805/// let node = get_node("localhost:21212")?;
806/// # Ok::<(), voltdb_client_rust::VoltError>(())
807/// ```
808pub fn get_node(addr: &str) -> Result<Node, VoltError> {
809    let addrs = addr
810        .to_socket_addrs()
811        .map_err(|_| VoltError::InvalidConfig)?;
812
813    let socket_addr = addrs
814        .into_iter()
815        .find(|s| s.is_ipv4())
816        .ok_or(VoltError::InvalidConfig)?;
817
818    let ip_port = IpPort::new(socket_addr.ip().to_string(), socket_addr.port());
819
820    let opt = NodeOpt {
821        ip_port,
822        user: None,
823        pass: None,
824        connect_timeout: None,
825        read_timeout: None,
826    };
827    Node::new(opt)
828}
829
#[cfg(test)]
mod tests {
    use super::*;

    /// A single host and nothing else: credentials stay unset.
    #[test]
    fn test_opts_builder_basic() {
        let opts = Opts::builder().host("localhost", 21212).build().unwrap();
        let inner = &opts.0;

        assert_eq!(inner.ip_ports.len(), 1);
        let first = &inner.ip_ports[0];
        assert_eq!(first.ip_host, "localhost");
        assert_eq!(first.port, 21212);
        assert!(inner.user.is_none());
        assert!(inner.pass.is_none());
    }

    /// User and password set on the builder end up in the options.
    #[test]
    fn test_opts_builder_with_auth() {
        let opts = Opts::builder()
            .host("127.0.0.1", 21211)
            .user("admin")
            .password("secret")
            .build()
            .unwrap();

        assert_eq!(opts.0.user.as_deref(), Some("admin"));
        assert_eq!(opts.0.pass.as_deref(), Some("secret"));
    }

    /// Repeated `host` calls accumulate in insertion order.
    #[test]
    fn test_opts_builder_multiple_hosts() {
        let opts = Opts::builder()
            .host("host1", 21212)
            .host("host2", 21212)
            .host("host3", 21212)
            .build()
            .unwrap();

        let names: Vec<&str> = opts
            .0
            .ip_ports
            .iter()
            .map(|endpoint| endpoint.ip_host.as_str())
            .collect();
        assert_eq!(names, ["host1", "host2", "host3"]);
    }

    /// `hosts` accepts a whole vector at once.
    #[test]
    fn test_opts_builder_with_hosts_vec() {
        let endpoints = vec![
            IpPort::new("node1".to_string(), 21212),
            IpPort::new("node2".to_string(), 21213),
        ];
        let opts = Opts::builder().hosts(endpoints).build().unwrap();

        assert_eq!(opts.0.ip_ports.len(), 2);
    }

    /// Both timeouts are carried over unchanged.
    #[test]
    fn test_opts_builder_with_timeouts() {
        let connect = Duration::from_secs(10);
        let read = Duration::from_secs(30);
        let opts = Opts::builder()
            .host("localhost", 21212)
            .connect_timeout(connect)
            .read_timeout(read)
            .build()
            .unwrap();

        assert_eq!(opts.0.connect_timeout, Some(connect));
        assert_eq!(opts.0.read_timeout, Some(read));
    }

    /// Building without any host is rejected with `InvalidConfig`.
    #[test]
    fn test_opts_builder_no_hosts_fails() {
        let result = Opts::builder().build();
        assert!(result.is_err());
        if !matches!(result, Err(VoltError::InvalidConfig)) {
            panic!("Expected InvalidConfig error");
        }
    }

    /// `Opts::new` leaves credentials and timeouts unset.
    #[test]
    fn test_opts_new_compatibility() {
        let opts = Opts::new(vec![IpPort::new("localhost".to_string(), 21212)]);

        assert_eq!(opts.0.ip_ports.len(), 1);
        assert!(opts.0.user.is_none());
        assert!(opts.0.connect_timeout.is_none());
    }

    /// `IpPort::new` stores its arguments verbatim.
    #[test]
    fn test_ip_port_new() {
        let endpoint = IpPort::new("192.168.1.1".to_string(), 8080);
        assert_eq!(endpoint.ip_host, "192.168.1.1");
        assert_eq!(endpoint.port, 8080);
    }
}