Skip to main content

voltdb_client_rust/
node.rs

1//! # VoltDB Node Connection
2//!
3//! This module provides the core TCP connection handling for communicating with VoltDB servers.
4//!
5//! ## Architecture
6//!
7//! The connection uses a single-threaded TCP listener design:
8//!
9//! - We spawn one dedicated thread to synchronously read from the TcpStream.
10//! - Each incoming message is dispatched via channels to the rest of the application,
11//!   allowing users to perform asynchronous operations on the received data.
12//! - Using `Mutex<TcpStream>` for single Stream would introduce blocking and contention,
13//!   because locking for each read/write would stall other operations. This design
14//!   avoids that while keeping the network I/O simple and efficient.
15//!
16//! ## Timeouts
17//!
18//! The module supports two types of timeouts:
19//!
20//! - **Connection timeout**: Limits how long the initial TCP connection attempt will wait.
21//!   Use [`OptsBuilder::connect_timeout`] or set [`NodeOpt::connect_timeout`].
22//! - **Read timeout**: Limits how long socket read operations will wait for data.
23//!   This affects both the authentication handshake and the background listener thread.
24//!   Use [`OptsBuilder::read_timeout`] or set [`NodeOpt::read_timeout`].
25//!
26//! ## Example
27//!
28//! ```no_run
29//! use voltdb_client_rust::{Opts, Pool};
30//! use std::time::Duration;
31//!
32//! // Create connection options with timeouts
33//! let opts = Opts::builder()
34//!     .host("localhost", 21212)
35//!     .connect_timeout(Duration::from_secs(5))
36//!     .read_timeout(Duration::from_secs(30))
37//!     .build()
38//!     .unwrap();
39//!
40//! // Create a connection pool
41//! let mut pool = Pool::new(opts)?;
42//! let mut conn = pool.get_conn()?;
43//!
44//! // Execute a query
45//! let result = conn.query("SELECT * FROM my_table")?;
46//! # Ok::<(), voltdb_client_rust::VoltError>(())
47//! ```
48
49use std::collections::HashMap;
50use std::fmt::{Debug, Formatter};
51use std::io::{Read, Write};
52use std::net::{Ipv4Addr, Shutdown, SocketAddr, TcpStream, ToSocketAddrs};
53use std::sync::atomic::{AtomicBool, AtomicI64, Ordering};
54use std::sync::mpsc::{Receiver, Sender};
55use std::sync::{Arc, Mutex, mpsc};
56use std::thread;
57use std::time::Duration;
58
59use bytebuffer::ByteBuffer;
60use byteorder::{BigEndian, ReadBytesExt};
61
62use crate::encode::{Value, VoltError};
63use crate::procedure_invocation::new_procedure_invocation;
64use crate::protocol::{PING_HANDLE, build_auth_message, parse_auth_response};
65use crate::response::VoltResponseInfo;
66use crate::table::{VoltTable, new_volt_table};
67use crate::volt_param;
68
69// ============================================================================
70// Logging macros - use tracing if available, otherwise no-op
71// ============================================================================
72
73#[cfg(feature = "tracing")]
74macro_rules! node_error {
75    ($($arg:tt)*) => { tracing::error!($($arg)*) };
76}
77#[cfg(not(feature = "tracing"))]
78macro_rules! node_error {
79    ($($arg:tt)*) => {};
80}
81
82/// Connection options for VoltDB client.
83///
84/// This struct encapsulates all configuration options needed to establish
85/// connections to a VoltDB cluster. Use [`Opts::builder()`] for a fluent
86/// configuration API or [`Opts::new()`] for simple configurations.
87///
88/// # Example
89/// ```no_run
90/// use voltdb_client_rust::{Opts, IpPort};
91///
92/// // Simple configuration
93/// let hosts = vec![IpPort::new("localhost".to_string(), 21212)];
94/// let opts = Opts::new(hosts);
95///
96/// // Or use the builder for more options
97/// let opts = Opts::builder()
98///     .host("localhost", 21212)
99///     .user("admin")
100///     .password("secret")
101///     .build()
102///     .unwrap();
103/// ```
104#[derive(Clone, Eq, PartialEq, Debug)]
105pub struct Opts(pub(crate) Box<InnerOpts>);
106
107/// Host and port pair for VoltDB server connections.
108///
109/// Represents a single VoltDB server endpoint. Multiple `IpPort` instances
110/// can be used with connection pools for cluster connectivity.
111///
112/// # Example
113/// ```
114/// use voltdb_client_rust::IpPort;
115///
116/// let endpoint = IpPort::new("192.168.1.100".to_string(), 21212);
117/// ```
118#[derive(Debug, Clone, Eq, PartialEq)]
119pub struct IpPort {
120    pub(crate) ip_host: String,
121    pub(crate) port: u16,
122}
123
124impl IpPort {
125    /// Creates a new `IpPort` with the given hostname/IP and port.
126    ///
127    /// # Arguments
128    /// * `ip_host` - Hostname or IP address of the VoltDB server
129    /// * `port` - Port number (typically 21212 for client connections)
130    pub fn new(ip_host: String, port: u16) -> Self {
131        IpPort { ip_host, port }
132    }
133}
134
135impl Opts {
136    /// Creates connection options with the given hosts and default settings.
137    ///
138    /// This is a convenience constructor for simple configurations without
139    /// authentication or timeouts. For more control, use [`Opts::builder()`].
140    ///
141    /// # Arguments
142    /// * `hosts` - List of VoltDB server endpoints to connect to
143    pub fn new(hosts: Vec<IpPort>) -> Opts {
144        Opts(Box::new(InnerOpts {
145            ip_ports: hosts,
146            user: None,
147            pass: None,
148            connect_timeout: None,
149            read_timeout: None,
150        }))
151    }
152
153    /// Creates a new [`OptsBuilder`] for fluent configuration.
154    ///
155    /// # Example
156    /// ```no_run
157    /// use voltdb_client_rust::Opts;
158    /// use std::time::Duration;
159    ///
160    /// let opts = Opts::builder()
161    ///     .host("localhost", 21212)
162    ///     .connect_timeout(Duration::from_secs(5))
163    ///     .build()
164    ///     .unwrap();
165    /// ```
166    pub fn builder() -> OptsBuilder {
167        OptsBuilder::default()
168    }
169}
170
171/// Builder for connection options.
172///
173/// # Example
174/// ```no_run
175/// use voltdb_client_rust::{Opts, IpPort};
176/// use std::time::Duration;
177///
178/// let opts = Opts::builder()
179///     .host("localhost", 21212)
180///     .user("admin")
181///     .password("password")
182///     .connect_timeout(Duration::from_secs(10))
183///     .build()
184///     .unwrap();
185/// ```
186#[derive(Debug, Clone, Default)]
187pub struct OptsBuilder {
188    hosts: Vec<IpPort>,
189    user: Option<String>,
190    pass: Option<String>,
191    connect_timeout: Option<Duration>,
192    read_timeout: Option<Duration>,
193}
194
195impl OptsBuilder {
196    /// Add a host to connect to.
197    pub fn host(mut self, ip: &str, port: u16) -> Self {
198        self.hosts.push(IpPort::new(ip.to_string(), port));
199        self
200    }
201
202    /// Add multiple hosts to connect to.
203    pub fn hosts(mut self, hosts: Vec<IpPort>) -> Self {
204        self.hosts.extend(hosts);
205        self
206    }
207
208    /// Set the username for authentication.
209    pub fn user(mut self, user: &str) -> Self {
210        self.user = Some(user.to_string());
211        self
212    }
213
214    /// Set the password for authentication.
215    pub fn password(mut self, pass: &str) -> Self {
216        self.pass = Some(pass.to_string());
217        self
218    }
219
220    /// Set connection timeout.
221    pub fn connect_timeout(mut self, timeout: Duration) -> Self {
222        self.connect_timeout = Some(timeout);
223        self
224    }
225
226    /// Set read timeout for socket operations.
227    pub fn read_timeout(mut self, timeout: Duration) -> Self {
228        self.read_timeout = Some(timeout);
229        self
230    }
231
232    /// Build the Opts.
233    ///
234    /// Returns an error if no hosts are configured.
235    pub fn build(self) -> Result<Opts, VoltError> {
236        if self.hosts.is_empty() {
237            return Err(VoltError::InvalidConfig);
238        }
239        Ok(Opts(Box::new(InnerOpts {
240            ip_ports: self.hosts,
241            user: self.user,
242            pass: self.pass,
243            connect_timeout: self.connect_timeout,
244            read_timeout: self.read_timeout,
245        })))
246    }
247}
248
249#[derive(Debug, Clone, Eq, PartialEq)]
250pub(crate) struct InnerOpts {
251    pub(crate) ip_ports: Vec<IpPort>,
252    pub(crate) user: Option<String>,
253    pub(crate) pass: Option<String>,
254    pub(crate) connect_timeout: Option<Duration>,
255    pub(crate) read_timeout: Option<Duration>,
256}
257
258/// Options for creating a single [`Node`] connection.
259///
260/// This struct holds the connection parameters for establishing a TCP connection
261/// to a VoltDB server node.
262///
263/// # Example
264/// ```no_run
265/// use voltdb_client_rust::{NodeOpt, IpPort};
266/// use std::time::Duration;
267///
268/// let opt = NodeOpt {
269///     ip_port: IpPort::new("localhost".to_string(), 21212),
270///     user: Some("admin".to_string()),
271///     pass: Some("password".to_string()),
272///     connect_timeout: Some(Duration::from_secs(10)),
273///     read_timeout: Some(Duration::from_secs(30)),
274/// };
275/// ```
276pub struct NodeOpt {
277    /// The host and port to connect to.
278    pub ip_port: IpPort,
279    /// Optional username for authentication.
280    pub user: Option<String>,
281    /// Optional password for authentication.
282    pub pass: Option<String>,
283    /// Connection timeout. If `None`, the connection attempt will block indefinitely.
284    pub connect_timeout: Option<Duration>,
285    /// Read timeout for socket operations. If `None`, reads will block indefinitely.
286    /// This affects the background listener thread that receives responses from the server.
287    pub read_timeout: Option<Duration>,
288}
289
290/// Type alias for the pending requests map.
291/// Maps request handles to their response channels.
292type PendingRequests = HashMap<i64, Sender<VoltTable>>;
293
294/// Marker trait for VoltDB connections.
295///
296/// Implemented by both synchronous [`Node`] and async `AsyncNode` connections.
297pub trait Connection: Sync + Send + 'static {}
298
299/// A single TCP connection to a VoltDB server node.
300///
301/// `Node` represents a persistent, stateful TCP connection used to execute
302/// stored procedures and queries against a VoltDB cluster. Each `Node`
303/// maintains its own socket and spawns a dedicated background thread to
304/// asynchronously receive and dispatch responses from the server.
305///
306/// # Concurrency
307///
308/// `Node` is safe to use concurrently and supports multiple in-flight requests
309/// over the same connection. For automatic reconnection, load balancing, or
310/// managing multiple connections, use [`crate::Pool`].
311///
312/// # Example
313///
314/// ```no_run
315/// use voltdb_client_rust::{Node, NodeOpt, IpPort, block_for_result};
316///
317/// let opt = NodeOpt {
318///     ip_port: IpPort::new("localhost".to_string(), 21212),
319///     user: None,
320///     pass: None,
321///     connect_timeout: None,
322///     read_timeout: None,
323/// };
324///
325/// let node = Node::new(opt)?;
326/// let rx = node.query("SELECT * FROM my_table")?;
327/// let table = block_for_result(&rx)?;
328/// # Ok::<(), voltdb_client_rust::VoltError>(())
329/// ```
330#[allow(dead_code)]
331pub struct Node {
332    /// Write-side of the TCP stream, protected by a mutex for thread-safe writes.
333    /// Multiple threads can call query/call_sp concurrently; writes are serialized.
334    write_stream: Mutex<Option<TcpStream>>,
335    info: ConnInfo,
336    /// Pending requests awaiting responses. Uses Mutex instead of RwLock since
337    /// both insert (main thread) and remove (listener thread) require exclusive access.
338    requests: Arc<Mutex<PendingRequests>>,
339    stop: Arc<Mutex<bool>>,
340    counter: AtomicI64,
341    /// Simple atomic lock for write operations. True = locked, False = unlocked.
342    write_lock: AtomicBool,
343}
344
345impl Debug for Node {
346    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
347        write!(f, "Pending request: {}", 1)
348    }
349}
350
351impl Drop for Node {
352    fn drop(&mut self) {
353        let res = self.shutdown();
354        match res {
355            Ok(_) => {}
356            Err(_e) => {
357                node_error!(error = ?_e, "error during node shutdown");
358            }
359        }
360    }
361}
362
363impl Connection for Node {}
364
365impl Node {
366    /// Creates a new connection to a VoltDB server node.
367    ///
368    /// This method establishes a TCP connection to the specified host/port,
369    /// performs authentication, and spawns a background listener thread for
370    /// receiving asynchronous responses.
371    ///
372    /// # Arguments
373    /// * `opt` - Connection options including host, port, credentials, and timeouts.
374    ///
375    /// # Timeouts
376    /// * `connect_timeout` - If set, limits how long the connection attempt will wait.
377    ///   If not set, the connection attempt blocks indefinitely.
378    /// * `read_timeout` - If set, socket read operations will timeout after this duration.
379    ///   This affects both the authentication phase and the background listener thread.
380    ///
381    /// # Errors
382    /// Returns `VoltError` if:
383    /// * The connection cannot be established (network error or timeout)
384    /// * DNS resolution fails
385    /// * Authentication fails
386    /// * The server rejects the connection
387    ///
388    /// # Example
389    /// ```no_run
390    /// use voltdb_client_rust::{Node, NodeOpt, IpPort};
391    /// use std::time::Duration;
392    ///
393    /// let opt = NodeOpt {
394    ///     ip_port: IpPort::new("localhost".to_string(), 21212),
395    ///     user: None,
396    ///     pass: None,
397    ///     connect_timeout: Some(Duration::from_secs(5)),
398    ///     read_timeout: Some(Duration::from_secs(30)),
399    /// };
400    /// let node = Node::new(opt)?;
401    /// # Ok::<(), voltdb_client_rust::VoltError>(())
402    /// ```
403    pub fn new(opt: NodeOpt) -> Result<Node, VoltError> {
404        let ip_host = opt.ip_port;
405        let addr_str = format!("{}:{}", ip_host.ip_host, ip_host.port);
406
407        // Build authentication message using shared protocol code
408        let auth_msg = build_auth_message(opt.user.as_deref(), opt.pass.as_deref())?;
409
410        // Connect to server with optional timeout
411        let mut stream: TcpStream = match opt.connect_timeout {
412            Some(timeout) => {
413                // Resolve address for connect_timeout (requires SocketAddr)
414                let socket_addr: SocketAddr = addr_str
415                    .to_socket_addrs()
416                    .map_err(|_| VoltError::InvalidConfig)?
417                    .find(|s| s.is_ipv4())
418                    .ok_or(VoltError::InvalidConfig)?;
419                TcpStream::connect_timeout(&socket_addr, timeout)?
420            }
421            None => TcpStream::connect(&addr_str)?,
422        };
423
424        // Set read timeout if configured
425        if let Some(read_timeout) = opt.read_timeout {
426            stream.set_read_timeout(Some(read_timeout))?;
427        }
428
429        // Send auth request
430        stream.write_all(&auth_msg)?;
431        stream.flush()?;
432
433        // Read auth response
434        let read = stream.read_u32::<BigEndian>()?;
435        let mut all = vec![0; read as usize];
436        stream.read_exact(&mut all)?;
437
438        // Parse auth response using shared protocol code
439        let info = parse_auth_response(&all)?;
440
441        // Clone the stream for the read side (listener thread)
442        let read_stream = stream.try_clone()?;
443
444        let requests = Arc::new(Mutex::new(HashMap::new()));
445        let stop = Arc::new(Mutex::new(false));
446
447        // Start the listener thread with the read side
448        Self::start_listener(read_stream, Arc::clone(&requests), Arc::clone(&stop));
449
450        Ok(Node {
451            stop,
452            write_stream: Mutex::new(Some(stream)),
453            info,
454            requests,
455            counter: AtomicI64::new(1),
456            write_lock: AtomicBool::new(false),
457        })
458    }
459    /// Returns the next unique sequence number for request tracking.
460    ///
461    /// Each request to VoltDB uses a unique handle (sequence number) for
462    /// matching responses to requests.
463    pub fn get_sequence(&self) -> i64 {
464        self.counter.fetch_add(1, Ordering::Relaxed)
465    }
466
467    /// Lists all stored procedures available in the VoltDB database.
468    ///
469    /// This calls the `@SystemCatalog` system procedure with "PROCEDURES" argument.
470    ///
471    /// # Returns
472    /// A receiver that will yield a `VoltTable` containing procedure metadata.
473    pub fn list_procedures(&self) -> Result<Receiver<VoltTable>, VoltError> {
474        self.call_sp("@SystemCatalog", volt_param!("PROCEDURES"))
475    }
476
477    /// Executes a stored procedure with the given parameters.
478    ///
479    /// # Arguments
480    /// * `query` - The name of the stored procedure (e.g., "@AdHoc", "MyProcedure")
481    /// * `param` - Vector of parameter values. Use [`volt_param!`] macro for convenience.
482    ///
483    /// # Returns
484    /// A receiver that will yield the result `VoltTable` when available.
485    ///
486    /// # Example
487    /// ```no_run
488    /// use voltdb_client_rust::{Node, NodeOpt, IpPort, Value, block_for_result, volt_param};
489    ///
490    /// # let opt = NodeOpt {
491    /// #     ip_port: IpPort::new("localhost".to_string(), 21212),
492    /// #     user: None, pass: None, connect_timeout: None, read_timeout: None,
493    /// # };
494    /// let node = Node::new(opt)?;
495    /// let id = 1i32;
496    /// let name = "test".to_string();
497    /// let rx = node.call_sp("MyProcedure", volt_param![id, name])?;
498    /// let result = block_for_result(&rx)?;
499    /// # Ok::<(), voltdb_client_rust::VoltError>(())
500    /// ```
501    pub fn call_sp(
502        &self,
503        query: &str,
504        param: Vec<&dyn Value>,
505    ) -> Result<Receiver<VoltTable>, VoltError> {
506        let handle = self.get_sequence();
507        let mut proc = new_procedure_invocation(handle, false, &param, query);
508        let (tx, rx): (Sender<VoltTable>, Receiver<VoltTable>) = mpsc::channel();
509
510        // Register the response channel before sending the request
511        self.requests.lock()?.insert(handle, tx);
512        let bs = proc.bytes();
513        // Write to stream while holding the lock
514        let result = {
515            let mut stream_guard = self.write_stream.lock()?;
516            match stream_guard.as_mut() {
517                None => Err(VoltError::ConnectionNotAvailable),
518                Some(stream) => {
519                    stream.write_all(&bs)?;
520                    Ok(rx)
521                }
522            }
523        };
524
525        // Release write lock
526        self.write_lock.store(false, Ordering::Release);
527
528        result
529    }
530
531    /// Uploads a JAR file containing stored procedure classes to VoltDB.
532    ///
533    /// This calls the `@UpdateClasses` system procedure to deploy new classes.
534    ///
535    /// # Arguments
536    /// * `bs` - The JAR file contents as bytes
537    pub fn upload_jar(&self, bs: Vec<u8>) -> Result<Receiver<VoltTable>, VoltError> {
538        self.call_sp("@UpdateClasses", volt_param!(bs, ""))
539    }
540
541    /// Executes an ad-hoc SQL query.
542    ///
543    /// This is a convenience method that calls the `@AdHoc` system procedure.
544    ///
545    /// # Arguments
546    /// * `sql` - The SQL query string to execute
547    ///
548    /// # Returns
549    /// A receiver that will yield the result `VoltTable` when available.
550    ///
551    /// # Example
552    /// ```no_run
553    /// use voltdb_client_rust::{Node, NodeOpt, IpPort, block_for_result};
554    ///
555    /// # let opt = NodeOpt {
556    /// #     ip_port: IpPort::new("localhost".to_string(), 21212),
557    /// #     user: None, pass: None, connect_timeout: None, read_timeout: None,
558    /// # };
559    /// let node = Node::new(opt)?;
560    /// let rx = node.query("SELECT COUNT(*) FROM users")?;
561    /// let result = block_for_result(&rx)?;
562    /// # Ok::<(), voltdb_client_rust::VoltError>(())
563    /// ```
564    pub fn query(&self, sql: &str) -> Result<Receiver<VoltTable>, VoltError> {
565        let zero_vec: Vec<&dyn Value> = vec![&sql];
566        self.call_sp("@AdHoc", zero_vec)
567    }
568
569    /// Sends a ping to the VoltDB server.
570    ///
571    /// This can be used to keep the connection alive or verify connectivity.
572    /// The ping response is handled internally and not returned to the caller.
573    pub fn ping(&self) -> Result<(), VoltError> {
574        let zero_vec: Vec<&dyn Value> = Vec::new();
575        let mut proc = new_procedure_invocation(PING_HANDLE, false, &zero_vec, "@Ping");
576        let bs = proc.bytes();
577
578        // Acquire write lock (spin until acquired)
579        while self
580            .write_lock
581            .compare_exchange_weak(false, true, Ordering::Acquire, Ordering::Relaxed)
582            .is_err()
583        {
584            std::hint::spin_loop();
585        }
586
587        let result = {
588            let mut stream_guard = self.write_stream.lock()?;
589            match stream_guard.as_mut() {
590                None => Err(VoltError::ConnectionNotAvailable),
591                Some(stream) => {
592                    stream.write_all(&bs)?;
593                    Ok(())
594                }
595            }
596        };
597
598        // Release write lock
599        self.write_lock.store(false, Ordering::Release);
600
601        result
602    }
603
604    /// Reads and processes a single message from the TCP stream.
605    ///
606    /// # Arguments
607    /// * `tcp` - The TCP stream to read from
608    /// * `requests` - Map of pending requests awaiting responses
609    /// * `buffer` - Reusable buffer for reading message data (reduces allocations)
610    fn job(
611        tcp: &mut impl Read,
612        requests: &Arc<Mutex<PendingRequests>>,
613        buffer: &mut Vec<u8>,
614    ) -> Result<(), VoltError> {
615        // Read message length (4 bytes, big-endian)
616        let msg_len = tcp.read_u32::<BigEndian>()?;
617        if msg_len == 0 {
618            return Ok(());
619        }
620
621        // Reuse buffer: resize if needed, but capacity is retained
622        buffer.resize(msg_len as usize, 0);
623        tcp.read_exact(buffer)?;
624
625        let mut res = ByteBuffer::from_bytes(buffer);
626        // Skip protocol version byte (always 0 for current protocol)
627        let _ = res.read_u8()?;
628        let handle = res.read_i64()?;
629
630        if handle == PING_HANDLE {
631            return Ok(()); // Ping response, nothing else to do
632        }
633
634        if let Some(sender) = requests.lock()?.remove(&handle) {
635            let info = VoltResponseInfo::new(&mut res, handle)?;
636            let table = new_volt_table(&mut res, info)?;
637            // Ignore send error - receiver may have been dropped if caller
638            // timed out or cancelled the request
639            let _ = sender.send(table);
640        }
641
642        Ok(())
643    }
644    /// Gracefully shuts down the connection.
645    ///
646    /// This stops the background listener thread and closes the TCP connection.
647    /// The `Node` will be unusable after calling this method.
648    ///
649    /// Note: This is automatically called when the `Node` is dropped.
650    pub fn shutdown(&mut self) -> Result<(), VoltError> {
651        let mut stop = self.stop.lock()?;
652        *stop = true;
653
654        let mut stream_guard = self.write_stream.lock()?;
655        if let Some(stream) = stream_guard.take() {
656            stream.shutdown(Shutdown::Both)?;
657        }
658        Ok(())
659    }
660
661    /// Starts the background listener thread for receiving responses.
662    ///
663    /// This is a static method that takes ownership of the read-side stream.
664    fn start_listener(
665        mut tcp: TcpStream,
666        requests: Arc<Mutex<PendingRequests>>,
667        stopping: Arc<Mutex<bool>>,
668    ) {
669        thread::spawn(move || {
670            // Reusable buffer to reduce allocation pressure.
671            // Starts with 4KB capacity, grows as needed but rarely shrinks.
672            let mut buffer = Vec::with_capacity(4096);
673
674            loop {
675                // Check stop flag before blocking on read
676                let should_stop = stopping
677                    .lock()
678                    .unwrap_or_else(|poisoned| poisoned.into_inner());
679                if *should_stop {
680                    break;
681                }
682                drop(should_stop); // Release lock before blocking on I/O
683
684                if let Err(_err) = Node::job(&mut tcp, &requests, &mut buffer) {
685                    // Only log error if we're not intentionally stopping
686                    let is_stopping = stopping
687                        .lock()
688                        .unwrap_or_else(|poisoned| poisoned.into_inner());
689                    if !*is_stopping {
690                        node_error!(error = %_err, "VoltDB listener error");
691                    }
692                }
693            }
694        });
695    }
696}
697
698/// Connection metadata returned by the VoltDB server during authentication.
699///
700/// This struct contains information about the server that the client connected to,
701/// including the host ID, connection ID, and cluster leader address.
702#[derive(Debug, Clone)]
703pub struct ConnInfo {
704    /// The ID of the host in the VoltDB cluster.
705    pub host_id: i32,
706    /// Unique connection identifier assigned by the server.
707    pub connection: i64,
708    /// IPv4 address of the cluster leader node.
709    pub leader_addr: Ipv4Addr,
710    /// VoltDB server build/version string.
711    pub build: String,
712}
713
714impl Default for ConnInfo {
715    fn default() -> Self {
716        Self {
717            host_id: 0,
718            connection: 0,
719            leader_addr: Ipv4Addr::new(127, 0, 0, 1),
720            build: String::new(),
721        }
722    }
723}
724
725/// Blocks until a response is received and converts any VoltDB errors.
726///
727/// This is a convenience function for synchronous usage. It waits for the
728/// response on the channel and converts VoltDB-level errors (from the response)
729/// into `VoltError`.
730///
731/// # Arguments
732/// * `res` - The receiver from a query or stored procedure call
733///
734/// # Returns
735/// The result `VoltTable` on success, or `VoltError` if the operation failed.
736///
737/// # Example
738/// ```no_run
739/// use voltdb_client_rust::{Node, NodeOpt, IpPort, block_for_result};
740///
741/// # let opt = NodeOpt {
742/// #     ip_port: IpPort::new("localhost".to_string(), 21212),
743/// #     user: None, pass: None, connect_timeout: None, read_timeout: None,
744/// # };
745/// let node = Node::new(opt)?;
746/// let rx = node.query("SELECT * FROM users")?;
747/// let mut table = block_for_result(&rx)?;
748///
749/// while table.advance_row() {
750///     // Process rows...
751/// }
752/// # Ok::<(), voltdb_client_rust::VoltError>(())
753/// ```
754pub fn block_for_result(res: &Receiver<VoltTable>) -> Result<VoltTable, VoltError> {
755    let mut table = res.recv()?;
756    let err = table.has_error();
757    match err {
758        None => Ok(table),
759        Some(err) => Err(err),
760    }
761}
762
763pub fn reset() {}
764
765/// Creates a new connection to a VoltDB server using an address string.
766///
767/// This is a convenience function that parses the address string and creates
768/// a connection with default settings (no authentication, no timeouts).
769///
770/// # Arguments
771/// * `addr` - The server address in "host:port" format (e.g., "localhost:21212")
772///
773/// # Errors
774/// Returns `VoltError::InvalidConfig` if the address cannot be parsed or resolved.
775///
776/// # Example
777/// ```no_run
778/// use voltdb_client_rust::get_node;
779///
780/// let node = get_node("localhost:21212")?;
781/// # Ok::<(), voltdb_client_rust::VoltError>(())
782/// ```
783pub fn get_node(addr: &str) -> Result<Node, VoltError> {
784    let addrs = addr
785        .to_socket_addrs()
786        .map_err(|_| VoltError::InvalidConfig)?;
787
788    let socket_addr = addrs
789        .into_iter()
790        .find(|s| s.is_ipv4())
791        .ok_or(VoltError::InvalidConfig)?;
792
793    let ip_port = IpPort::new(socket_addr.ip().to_string(), socket_addr.port());
794
795    let opt = NodeOpt {
796        ip_port,
797        user: None,
798        pass: None,
799        connect_timeout: None,
800        read_timeout: None,
801    };
802    Node::new(opt)
803}
804
805#[cfg(test)]
806mod tests {
807    use super::*;
808
809    #[test]
810    fn test_opts_builder_basic() {
811        let opts = Opts::builder().host("localhost", 21212).build().unwrap();
812
813        assert_eq!(opts.0.ip_ports.len(), 1);
814        assert_eq!(opts.0.ip_ports[0].ip_host, "localhost");
815        assert_eq!(opts.0.ip_ports[0].port, 21212);
816        assert!(opts.0.user.is_none());
817        assert!(opts.0.pass.is_none());
818    }
819
820    #[test]
821    fn test_opts_builder_with_auth() {
822        let opts = Opts::builder()
823            .host("127.0.0.1", 21211)
824            .user("admin")
825            .password("secret")
826            .build()
827            .unwrap();
828
829        assert_eq!(opts.0.user, Some("admin".to_string()));
830        assert_eq!(opts.0.pass, Some("secret".to_string()));
831    }
832
833    #[test]
834    fn test_opts_builder_multiple_hosts() {
835        let opts = Opts::builder()
836            .host("host1", 21212)
837            .host("host2", 21212)
838            .host("host3", 21212)
839            .build()
840            .unwrap();
841
842        assert_eq!(opts.0.ip_ports.len(), 3);
843        assert_eq!(opts.0.ip_ports[0].ip_host, "host1");
844        assert_eq!(opts.0.ip_ports[1].ip_host, "host2");
845        assert_eq!(opts.0.ip_ports[2].ip_host, "host3");
846    }
847
848    #[test]
849    fn test_opts_builder_with_hosts_vec() {
850        let hosts = vec![
851            IpPort::new("node1".to_string(), 21212),
852            IpPort::new("node2".to_string(), 21213),
853        ];
854        let opts = Opts::builder().hosts(hosts).build().unwrap();
855
856        assert_eq!(opts.0.ip_ports.len(), 2);
857    }
858
859    #[test]
860    fn test_opts_builder_with_timeouts() {
861        let opts = Opts::builder()
862            .host("localhost", 21212)
863            .connect_timeout(Duration::from_secs(10))
864            .read_timeout(Duration::from_secs(30))
865            .build()
866            .unwrap();
867
868        assert_eq!(opts.0.connect_timeout, Some(Duration::from_secs(10)));
869        assert_eq!(opts.0.read_timeout, Some(Duration::from_secs(30)));
870    }
871
872    #[test]
873    fn test_opts_builder_no_hosts_fails() {
874        let result = Opts::builder().build();
875        assert!(result.is_err());
876        match result {
877            Err(VoltError::InvalidConfig) => {}
878            _ => panic!("Expected InvalidConfig error"),
879        }
880    }
881
882    #[test]
883    fn test_opts_new_compatibility() {
884        let hosts = vec![IpPort::new("localhost".to_string(), 21212)];
885        let opts = Opts::new(hosts);
886
887        assert_eq!(opts.0.ip_ports.len(), 1);
888        assert!(opts.0.user.is_none());
889        assert!(opts.0.connect_timeout.is_none());
890    }
891
892    #[test]
893    fn test_ip_port_new() {
894        let ip_port = IpPort::new("192.168.1.1".to_string(), 8080);
895        assert_eq!(ip_port.ip_host, "192.168.1.1");
896        assert_eq!(ip_port.port, 8080);
897    }
898}