voltdb_client_rust/node.rs
1//! # VoltDB Node Connection
2//!
3//! This module provides the core TCP connection handling for communicating with VoltDB servers.
4//!
5//! ## Architecture
6//!
7//! The connection uses a single-threaded TCP listener design:
8//!
9//! - We spawn one dedicated thread to synchronously read from the TcpStream.
10//! - Each incoming message is dispatched via channels to the rest of the application,
11//! allowing users to perform asynchronous operations on the received data.
12//! - Using `Mutex<TcpStream>` for single Stream would introduce blocking and contention,
13//! because locking for each read/write would stall other operations. This design
14//! avoids that while keeping the network I/O simple and efficient.
15//!
16//! ## Timeouts
17//!
18//! The module supports two types of timeouts:
19//!
20//! - **Connection timeout**: Limits how long the initial TCP connection attempt will wait.
21//! Use [`OptsBuilder::connect_timeout`] or set [`NodeOpt::connect_timeout`].
22//! - **Read timeout**: Limits how long socket read operations will wait for data.
23//! This affects both the authentication handshake and the background listener thread.
24//! Use [`OptsBuilder::read_timeout`] or set [`NodeOpt::read_timeout`].
25//!
26//! ## Example
27//!
28//! ```no_run
29//! use voltdb_client_rust::{Opts, Pool};
30//! use std::time::Duration;
31//!
32//! // Create connection options with timeouts
33//! let opts = Opts::builder()
34//! .host("localhost", 21212)
35//! .connect_timeout(Duration::from_secs(5))
36//! .read_timeout(Duration::from_secs(30))
37//! .build()
38//! .unwrap();
39//!
40//! // Create a connection pool
41//! let mut pool = Pool::new(opts)?;
42//! let mut conn = pool.get_conn()?;
43//!
44//! // Execute a query
45//! let result = conn.query("SELECT * FROM my_table")?;
46//! # Ok::<(), voltdb_client_rust::VoltError>(())
47//! ```
48
49use std::collections::HashMap;
50use std::fmt::{Debug, Formatter};
51use std::io::{Read, Write};
52use std::net::{Ipv4Addr, Shutdown, SocketAddr, TcpStream, ToSocketAddrs};
53use std::sync::atomic::{AtomicBool, AtomicI64, Ordering};
54use std::sync::mpsc::{Receiver, Sender};
55use std::sync::{Arc, Mutex, mpsc};
56use std::thread;
57use std::time::Duration;
58
59use bytebuffer::ByteBuffer;
60use byteorder::{BigEndian, ReadBytesExt};
61
62use crate::encode::{Value, VoltError};
63use crate::procedure_invocation::new_procedure_invocation;
64use crate::protocol::{PING_HANDLE, build_auth_message, parse_auth_response};
65use crate::response::VoltResponseInfo;
66use crate::table::{VoltTable, new_volt_table};
67use crate::volt_param;
68
69// ============================================================================
70// Logging macros - use tracing if available, otherwise no-op
71// ============================================================================
72
73#[cfg(feature = "tracing")]
74macro_rules! node_error {
75 ($($arg:tt)*) => { tracing::error!($($arg)*) };
76}
77#[cfg(not(feature = "tracing"))]
78macro_rules! node_error {
79 ($($arg:tt)*) => {};
80}
81
82/// Connection options for VoltDB client.
83///
84/// This struct encapsulates all configuration options needed to establish
85/// connections to a VoltDB cluster. Use [`Opts::builder()`] for a fluent
86/// configuration API or [`Opts::new()`] for simple configurations.
87///
88/// # Example
89/// ```no_run
90/// use voltdb_client_rust::{Opts, IpPort};
91///
92/// // Simple configuration
93/// let hosts = vec![IpPort::new("localhost".to_string(), 21212)];
94/// let opts = Opts::new(hosts);
95///
96/// // Or use the builder for more options
97/// let opts = Opts::builder()
98/// .host("localhost", 21212)
99/// .user("admin")
100/// .password("secret")
101/// .build()
102/// .unwrap();
103/// ```
104#[derive(Clone, Eq, PartialEq, Debug)]
105pub struct Opts(pub(crate) Box<InnerOpts>);
106
107/// Host and port pair for VoltDB server connections.
108///
109/// Represents a single VoltDB server endpoint. Multiple `IpPort` instances
110/// can be used with connection pools for cluster connectivity.
111///
112/// # Example
113/// ```
114/// use voltdb_client_rust::IpPort;
115///
116/// let endpoint = IpPort::new("192.168.1.100".to_string(), 21212);
117/// ```
118#[derive(Debug, Clone, Eq, PartialEq)]
119pub struct IpPort {
120 pub(crate) ip_host: String,
121 pub(crate) port: u16,
122}
123
124impl IpPort {
125 /// Creates a new `IpPort` with the given hostname/IP and port.
126 ///
127 /// # Arguments
128 /// * `ip_host` - Hostname or IP address of the VoltDB server
129 /// * `port` - Port number (typically 21212 for client connections)
130 pub fn new(ip_host: String, port: u16) -> Self {
131 IpPort { ip_host, port }
132 }
133}
134
135impl Opts {
136 /// Creates connection options with the given hosts and default settings.
137 ///
138 /// This is a convenience constructor for simple configurations without
139 /// authentication or timeouts. For more control, use [`Opts::builder()`].
140 ///
141 /// # Arguments
142 /// * `hosts` - List of VoltDB server endpoints to connect to
143 pub fn new(hosts: Vec<IpPort>) -> Opts {
144 Opts(Box::new(InnerOpts {
145 ip_ports: hosts,
146 user: None,
147 pass: None,
148 connect_timeout: None,
149 read_timeout: None,
150 }))
151 }
152
153 /// Creates a new [`OptsBuilder`] for fluent configuration.
154 ///
155 /// # Example
156 /// ```no_run
157 /// use voltdb_client_rust::Opts;
158 /// use std::time::Duration;
159 ///
160 /// let opts = Opts::builder()
161 /// .host("localhost", 21212)
162 /// .connect_timeout(Duration::from_secs(5))
163 /// .build()
164 /// .unwrap();
165 /// ```
166 pub fn builder() -> OptsBuilder {
167 OptsBuilder::default()
168 }
169}
170
171/// Builder for connection options.
172///
173/// # Example
174/// ```no_run
175/// use voltdb_client_rust::{Opts, IpPort};
176/// use std::time::Duration;
177///
178/// let opts = Opts::builder()
179/// .host("localhost", 21212)
180/// .user("admin")
181/// .password("password")
182/// .connect_timeout(Duration::from_secs(10))
183/// .build()
184/// .unwrap();
185/// ```
186#[derive(Debug, Clone, Default)]
187pub struct OptsBuilder {
188 hosts: Vec<IpPort>,
189 user: Option<String>,
190 pass: Option<String>,
191 connect_timeout: Option<Duration>,
192 read_timeout: Option<Duration>,
193}
194
195impl OptsBuilder {
196 /// Add a host to connect to.
197 pub fn host(mut self, ip: &str, port: u16) -> Self {
198 self.hosts.push(IpPort::new(ip.to_string(), port));
199 self
200 }
201
202 /// Add multiple hosts to connect to.
203 pub fn hosts(mut self, hosts: Vec<IpPort>) -> Self {
204 self.hosts.extend(hosts);
205 self
206 }
207
208 /// Set the username for authentication.
209 pub fn user(mut self, user: &str) -> Self {
210 self.user = Some(user.to_string());
211 self
212 }
213
214 /// Set the password for authentication.
215 pub fn password(mut self, pass: &str) -> Self {
216 self.pass = Some(pass.to_string());
217 self
218 }
219
220 /// Set connection timeout.
221 pub fn connect_timeout(mut self, timeout: Duration) -> Self {
222 self.connect_timeout = Some(timeout);
223 self
224 }
225
226 /// Set read timeout for socket operations.
227 pub fn read_timeout(mut self, timeout: Duration) -> Self {
228 self.read_timeout = Some(timeout);
229 self
230 }
231
232 /// Build the Opts.
233 ///
234 /// Returns an error if no hosts are configured.
235 pub fn build(self) -> Result<Opts, VoltError> {
236 if self.hosts.is_empty() {
237 return Err(VoltError::InvalidConfig);
238 }
239 Ok(Opts(Box::new(InnerOpts {
240 ip_ports: self.hosts,
241 user: self.user,
242 pass: self.pass,
243 connect_timeout: self.connect_timeout,
244 read_timeout: self.read_timeout,
245 })))
246 }
247}
248
249#[derive(Debug, Clone, Eq, PartialEq)]
250pub(crate) struct InnerOpts {
251 pub(crate) ip_ports: Vec<IpPort>,
252 pub(crate) user: Option<String>,
253 pub(crate) pass: Option<String>,
254 pub(crate) connect_timeout: Option<Duration>,
255 pub(crate) read_timeout: Option<Duration>,
256}
257
258/// Options for creating a single [`Node`] connection.
259///
260/// This struct holds the connection parameters for establishing a TCP connection
261/// to a VoltDB server node.
262///
263/// # Example
264/// ```no_run
265/// use voltdb_client_rust::{NodeOpt, IpPort};
266/// use std::time::Duration;
267///
268/// let opt = NodeOpt {
269/// ip_port: IpPort::new("localhost".to_string(), 21212),
270/// user: Some("admin".to_string()),
271/// pass: Some("password".to_string()),
272/// connect_timeout: Some(Duration::from_secs(10)),
273/// read_timeout: Some(Duration::from_secs(30)),
274/// };
275/// ```
276pub struct NodeOpt {
277 /// The host and port to connect to.
278 pub ip_port: IpPort,
279 /// Optional username for authentication.
280 pub user: Option<String>,
281 /// Optional password for authentication.
282 pub pass: Option<String>,
283 /// Connection timeout. If `None`, the connection attempt will block indefinitely.
284 pub connect_timeout: Option<Duration>,
285 /// Read timeout for socket operations. If `None`, reads will block indefinitely.
286 /// This affects the background listener thread that receives responses from the server.
287 pub read_timeout: Option<Duration>,
288}
289
290/// Type alias for the pending requests map.
291/// Maps request handles to their response channels.
292type PendingRequests = HashMap<i64, Sender<VoltTable>>;
293
294/// Marker trait for VoltDB connections.
295///
296/// Implemented by both synchronous [`Node`] and async `AsyncNode` connections.
297pub trait Connection: Sync + Send + 'static {}
298
299/// A single TCP connection to a VoltDB server node.
300///
301/// `Node` represents a persistent, stateful TCP connection used to execute
302/// stored procedures and queries against a VoltDB cluster. Each `Node`
303/// maintains its own socket and spawns a dedicated background thread to
304/// asynchronously receive and dispatch responses from the server.
305///
306/// # Concurrency
307///
308/// `Node` is safe to use concurrently and supports multiple in-flight requests
309/// over the same connection. For automatic reconnection, load balancing, or
310/// managing multiple connections, use [`crate::Pool`].
311///
312/// # Example
313///
314/// ```no_run
315/// use voltdb_client_rust::{Node, NodeOpt, IpPort, block_for_result};
316///
317/// let opt = NodeOpt {
318/// ip_port: IpPort::new("localhost".to_string(), 21212),
319/// user: None,
320/// pass: None,
321/// connect_timeout: None,
322/// read_timeout: None,
323/// };
324///
325/// let node = Node::new(opt)?;
326/// let rx = node.query("SELECT * FROM my_table")?;
327/// let table = block_for_result(&rx)?;
328/// # Ok::<(), voltdb_client_rust::VoltError>(())
329/// ```
330#[allow(dead_code)]
331pub struct Node {
332 /// Write-side of the TCP stream, protected by a mutex for thread-safe writes.
333 /// Multiple threads can call query/call_sp concurrently; writes are serialized.
334 write_stream: Mutex<Option<TcpStream>>,
335 info: ConnInfo,
336 /// Pending requests awaiting responses. Uses Mutex instead of RwLock since
337 /// both insert (main thread) and remove (listener thread) require exclusive access.
338 requests: Arc<Mutex<PendingRequests>>,
339 stop: Arc<Mutex<bool>>,
340 counter: AtomicI64,
341 /// Simple atomic lock for write operations. True = locked, False = unlocked.
342 write_lock: AtomicBool,
343 /// Handle for the background listener thread
344 listener_handle: Option<thread::JoinHandle<()>>,
345}
346
347impl Debug for Node {
348 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
349 write!(f, "Pending request: {}", 1)
350 }
351}
352
353impl Drop for Node {
354 fn drop(&mut self) {
355 let res = self.shutdown();
356 match res {
357 Ok(_) => {}
358 Err(_e) => {
359 node_error!(error = ?_e, "error during node shutdown");
360 }
361 }
362 }
363}
364
365impl Connection for Node {}
366
367impl Node {
368 /// Creates a new connection to a VoltDB server node.
369 ///
370 /// This method establishes a TCP connection to the specified host/port,
371 /// performs authentication, and spawns a background listener thread for
372 /// receiving asynchronous responses.
373 ///
374 /// # Arguments
375 /// * `opt` - Connection options including host, port, credentials, and timeouts.
376 ///
377 /// # Timeouts
378 /// * `connect_timeout` - If set, limits how long the connection attempt will wait.
379 /// If not set, the connection attempt blocks indefinitely.
380 /// * `read_timeout` - If set, socket read operations will timeout after this duration.
381 /// This affects both the authentication phase and the background listener thread.
382 ///
383 /// # Errors
384 /// Returns `VoltError` if:
385 /// * The connection cannot be established (network error or timeout)
386 /// * DNS resolution fails
387 /// * Authentication fails
388 /// * The server rejects the connection
389 ///
390 /// # Example
391 /// ```no_run
392 /// use voltdb_client_rust::{Node, NodeOpt, IpPort};
393 /// use std::time::Duration;
394 ///
395 /// let opt = NodeOpt {
396 /// ip_port: IpPort::new("localhost".to_string(), 21212),
397 /// user: None,
398 /// pass: None,
399 /// connect_timeout: Some(Duration::from_secs(5)),
400 /// read_timeout: Some(Duration::from_secs(30)),
401 /// };
402 /// let node = Node::new(opt)?;
403 /// # Ok::<(), voltdb_client_rust::VoltError>(())
404 /// ```
405 pub fn new(opt: NodeOpt) -> Result<Node, VoltError> {
406 let ip_host = opt.ip_port;
407 let addr_str = format!("{}:{}", ip_host.ip_host, ip_host.port);
408
409 // Build authentication message using shared protocol code
410 let auth_msg = build_auth_message(opt.user.as_deref(), opt.pass.as_deref())?;
411
412 // Connect to server with optional timeout
413 let mut stream: TcpStream = match opt.connect_timeout {
414 Some(timeout) => {
415 // Resolve address for connect_timeout (requires SocketAddr)
416 let socket_addr: SocketAddr = addr_str
417 .to_socket_addrs()
418 .map_err(|_| VoltError::InvalidConfig)?
419 .find(|s| s.is_ipv4())
420 .ok_or(VoltError::InvalidConfig)?;
421 TcpStream::connect_timeout(&socket_addr, timeout)?
422 }
423 None => TcpStream::connect(&addr_str)?,
424 };
425
426 // Set read timeout if configured
427 if let Some(read_timeout) = opt.read_timeout {
428 stream.set_read_timeout(Some(read_timeout))?;
429 }
430
431 // Send auth request
432 stream.write_all(&auth_msg)?;
433 stream.flush()?;
434
435 // Read auth response
436 let read = stream.read_u32::<BigEndian>()?;
437 let mut all = vec![0; read as usize];
438 stream.read_exact(&mut all)?;
439
440 // Parse auth response using shared protocol code
441 let info = parse_auth_response(&all)?;
442
443 // Clone the stream for the read side (listener thread).
444 // Set a read timeout on the read clone so the listener thread can
445 // periodically check the stop flag (required on Linux where
446 // shutdown() on the write clone may not unblock the read clone).
447 let read_stream = stream.try_clone()?;
448 let listener_timeout = opt.read_timeout.unwrap_or(Duration::from_secs(2));
449 read_stream.set_read_timeout(Some(listener_timeout))?;
450
451 let requests = Arc::new(Mutex::new(HashMap::new()));
452 let stop = Arc::new(Mutex::new(false));
453
454 // Start the listener thread with the read side
455 let handle = Self::start_listener(read_stream, Arc::clone(&requests), Arc::clone(&stop));
456
457 Ok(Node {
458 stop,
459 write_stream: Mutex::new(Some(stream)),
460 info,
461 requests,
462 counter: AtomicI64::new(1),
463 write_lock: AtomicBool::new(false),
464 listener_handle: Some(handle),
465 })
466 }
467 /// Returns the next unique sequence number for request tracking.
468 ///
469 /// Each request to VoltDB uses a unique handle (sequence number) for
470 /// matching responses to requests.
471 pub fn get_sequence(&self) -> i64 {
472 self.counter.fetch_add(1, Ordering::Relaxed)
473 }
474
475 /// Lists all stored procedures available in the VoltDB database.
476 ///
477 /// This calls the `@SystemCatalog` system procedure with "PROCEDURES" argument.
478 ///
479 /// # Returns
480 /// A receiver that will yield a `VoltTable` containing procedure metadata.
481 pub fn list_procedures(&self) -> Result<Receiver<VoltTable>, VoltError> {
482 self.call_sp("@SystemCatalog", volt_param!("PROCEDURES"))
483 }
484
485 /// Executes a stored procedure with the given parameters.
486 ///
487 /// # Arguments
488 /// * `query` - The name of the stored procedure (e.g., "@AdHoc", "MyProcedure")
489 /// * `param` - Vector of parameter values. Use [`volt_param!`] macro for convenience.
490 ///
491 /// # Returns
492 /// A receiver that will yield the result `VoltTable` when available.
493 ///
494 /// # Example
495 /// ```no_run
496 /// use voltdb_client_rust::{Node, NodeOpt, IpPort, Value, block_for_result, volt_param};
497 ///
498 /// # let opt = NodeOpt {
499 /// # ip_port: IpPort::new("localhost".to_string(), 21212),
500 /// # user: None, pass: None, connect_timeout: None, read_timeout: None,
501 /// # };
502 /// let node = Node::new(opt)?;
503 /// let id = 1i32;
504 /// let name = "test".to_string();
505 /// let rx = node.call_sp("MyProcedure", volt_param![id, name])?;
506 /// let result = block_for_result(&rx)?;
507 /// # Ok::<(), voltdb_client_rust::VoltError>(())
508 /// ```
509 pub fn call_sp(
510 &self,
511 query: &str,
512 param: Vec<&dyn Value>,
513 ) -> Result<Receiver<VoltTable>, VoltError> {
514 let handle = self.get_sequence();
515 let mut proc = new_procedure_invocation(handle, false, ¶m, query);
516 let (tx, rx): (Sender<VoltTable>, Receiver<VoltTable>) = mpsc::channel();
517
518 // Register the response channel before sending the request
519 self.requests.lock()?.insert(handle, tx);
520 let bs = proc.bytes();
521 // Write to stream while holding the lock
522 let result = {
523 let mut stream_guard = self.write_stream.lock()?;
524 match stream_guard.as_mut() {
525 None => Err(VoltError::ConnectionNotAvailable),
526 Some(stream) => {
527 stream.write_all(&bs)?;
528 Ok(rx)
529 }
530 }
531 };
532
533 // Release write lock
534 self.write_lock.store(false, Ordering::Release);
535
536 result
537 }
538
539 /// Uploads a JAR file containing stored procedure classes to VoltDB.
540 ///
541 /// This calls the `@UpdateClasses` system procedure to deploy new classes.
542 ///
543 /// # Arguments
544 /// * `bs` - The JAR file contents as bytes
545 pub fn upload_jar(&self, bs: Vec<u8>) -> Result<Receiver<VoltTable>, VoltError> {
546 self.call_sp("@UpdateClasses", volt_param!(bs, ""))
547 }
548
549 /// Executes an ad-hoc SQL query.
550 ///
551 /// This is a convenience method that calls the `@AdHoc` system procedure.
552 ///
553 /// # Arguments
554 /// * `sql` - The SQL query string to execute
555 ///
556 /// # Returns
557 /// A receiver that will yield the result `VoltTable` when available.
558 ///
559 /// # Example
560 /// ```no_run
561 /// use voltdb_client_rust::{Node, NodeOpt, IpPort, block_for_result};
562 ///
563 /// # let opt = NodeOpt {
564 /// # ip_port: IpPort::new("localhost".to_string(), 21212),
565 /// # user: None, pass: None, connect_timeout: None, read_timeout: None,
566 /// # };
567 /// let node = Node::new(opt)?;
568 /// let rx = node.query("SELECT COUNT(*) FROM users")?;
569 /// let result = block_for_result(&rx)?;
570 /// # Ok::<(), voltdb_client_rust::VoltError>(())
571 /// ```
572 pub fn query(&self, sql: &str) -> Result<Receiver<VoltTable>, VoltError> {
573 let zero_vec: Vec<&dyn Value> = vec![&sql];
574 self.call_sp("@AdHoc", zero_vec)
575 }
576
577 /// Sends a ping to the VoltDB server.
578 ///
579 /// This can be used to keep the connection alive or verify connectivity.
580 /// The ping response is handled internally and not returned to the caller.
581 pub fn ping(&self) -> Result<(), VoltError> {
582 let zero_vec: Vec<&dyn Value> = Vec::new();
583 let mut proc = new_procedure_invocation(PING_HANDLE, false, &zero_vec, "@Ping");
584 let bs = proc.bytes();
585
586 // Acquire write lock (spin until acquired)
587 while self
588 .write_lock
589 .compare_exchange_weak(false, true, Ordering::Acquire, Ordering::Relaxed)
590 .is_err()
591 {
592 std::hint::spin_loop();
593 }
594
595 let result = {
596 let mut stream_guard = self.write_stream.lock()?;
597 match stream_guard.as_mut() {
598 None => Err(VoltError::ConnectionNotAvailable),
599 Some(stream) => {
600 stream.write_all(&bs)?;
601 Ok(())
602 }
603 }
604 };
605
606 // Release write lock
607 self.write_lock.store(false, Ordering::Release);
608
609 result
610 }
611
612 /// Reads and processes a single message from the TCP stream.
613 ///
614 /// # Arguments
615 /// * `tcp` - The TCP stream to read from
616 /// * `requests` - Map of pending requests awaiting responses
617 /// * `buffer` - Reusable buffer for reading message data (reduces allocations)
618 fn job(
619 tcp: &mut impl Read,
620 requests: &Arc<Mutex<PendingRequests>>,
621 buffer: &mut Vec<u8>,
622 ) -> Result<(), VoltError> {
623 // Read message length (4 bytes, big-endian)
624 let msg_len = tcp.read_u32::<BigEndian>()?;
625 if msg_len == 0 {
626 return Ok(());
627 }
628
629 // Reuse buffer: resize if needed, but capacity is retained
630 buffer.resize(msg_len as usize, 0);
631 tcp.read_exact(buffer)?;
632
633 let mut res = ByteBuffer::from_bytes(buffer);
634 // Skip protocol version byte (always 0 for current protocol)
635 let _ = res.read_u8()?;
636 let handle = res.read_i64()?;
637
638 if handle == PING_HANDLE {
639 return Ok(()); // Ping response, nothing else to do
640 }
641
642 if let Some(sender) = requests.lock()?.remove(&handle) {
643 let info = VoltResponseInfo::new(&mut res, handle)?;
644 let table = new_volt_table(&mut res, info)?;
645 // Ignore send error - receiver may have been dropped if caller
646 // timed out or cancelled the request
647 let _ = sender.send(table);
648 }
649
650 Ok(())
651 }
652 /// Gracefully shuts down the connection.
653 ///
654 /// This stops the background listener thread and closes the TCP connection.
655 /// The `Node` will be unusable after calling this method.
656 ///
657 /// Note: This is automatically called when the `Node` is dropped.
658 pub fn shutdown(&mut self) -> Result<(), VoltError> {
659 {
660 let mut stop = self.stop.lock()?;
661 *stop = true;
662 } // release stop lock before any join — listener thread needs it
663
664 let mut stream_guard = self.write_stream.lock()?;
665 if let Some(stream) = stream_guard.take() {
666 let _ = stream.shutdown(Shutdown::Both);
667 }
668 drop(stream_guard); // release lock before join
669
670 if let Some(handle) = self.listener_handle.take() {
671 let _ = handle.join();
672 }
673 Ok(())
674 }
675
676 /// Starts the background listener thread for receiving responses.
677 ///
678 /// This is a static method that takes ownership of the read-side stream.
679 fn start_listener(
680 mut tcp: TcpStream,
681 requests: Arc<Mutex<PendingRequests>>,
682 stopping: Arc<Mutex<bool>>,
683 ) -> thread::JoinHandle<()> {
684 thread::spawn(move || {
685 // Reusable buffer to reduce allocation pressure.
686 // Starts with 4KB capacity, grows as needed but rarely shrinks.
687 let mut buffer = Vec::with_capacity(4096);
688
689 loop {
690 // Check stop flag before blocking on read
691 let should_stop = stopping
692 .lock()
693 .unwrap_or_else(|poisoned| poisoned.into_inner());
694 if *should_stop {
695 break;
696 }
697 drop(should_stop); // Release lock before blocking on I/O
698
699 if let Err(_err) = Node::job(&mut tcp, &requests, &mut buffer) {
700 // Timeout errors are expected — the read timeout lets us
701 // periodically check the stop flag. Just loop back.
702 if let VoltError::Io(ref io_err) = _err {
703 if io_err.kind() == std::io::ErrorKind::WouldBlock
704 || io_err.kind() == std::io::ErrorKind::TimedOut
705 {
706 continue;
707 }
708 }
709 // Only log non-timeout errors if we're not intentionally stopping
710 let is_stopping = stopping
711 .lock()
712 .unwrap_or_else(|poisoned| poisoned.into_inner());
713 if !*is_stopping {
714 node_error!(error = %_err, "VoltDB listener error");
715 }
716 break; // Exit on non-timeout errors
717 }
718 }
719 })
720 }
721}
722
723/// Connection metadata returned by the VoltDB server during authentication.
724///
725/// This struct contains information about the server that the client connected to,
726/// including the host ID, connection ID, and cluster leader address.
727#[derive(Debug, Clone)]
728pub struct ConnInfo {
729 /// The ID of the host in the VoltDB cluster.
730 pub host_id: i32,
731 /// Unique connection identifier assigned by the server.
732 pub connection: i64,
733 /// IPv4 address of the cluster leader node.
734 pub leader_addr: Ipv4Addr,
735 /// VoltDB server build/version string.
736 pub build: String,
737}
738
739impl Default for ConnInfo {
740 fn default() -> Self {
741 Self {
742 host_id: 0,
743 connection: 0,
744 leader_addr: Ipv4Addr::new(127, 0, 0, 1),
745 build: String::new(),
746 }
747 }
748}
749
750/// Blocks until a response is received and converts any VoltDB errors.
751///
752/// This is a convenience function for synchronous usage. It waits for the
753/// response on the channel and converts VoltDB-level errors (from the response)
754/// into `VoltError`.
755///
756/// # Arguments
757/// * `res` - The receiver from a query or stored procedure call
758///
759/// # Returns
760/// The result `VoltTable` on success, or `VoltError` if the operation failed.
761///
762/// # Example
763/// ```no_run
764/// use voltdb_client_rust::{Node, NodeOpt, IpPort, block_for_result};
765///
766/// # let opt = NodeOpt {
767/// # ip_port: IpPort::new("localhost".to_string(), 21212),
768/// # user: None, pass: None, connect_timeout: None, read_timeout: None,
769/// # };
770/// let node = Node::new(opt)?;
771/// let rx = node.query("SELECT * FROM users")?;
772/// let mut table = block_for_result(&rx)?;
773///
774/// while table.advance_row() {
775/// // Process rows...
776/// }
777/// # Ok::<(), voltdb_client_rust::VoltError>(())
778/// ```
779pub fn block_for_result(res: &Receiver<VoltTable>) -> Result<VoltTable, VoltError> {
780 let mut table = res.recv()?;
781 let err = table.has_error();
782 match err {
783 None => Ok(table),
784 Some(err) => Err(err),
785 }
786}
787
788pub fn reset() {}
789
790/// Creates a new connection to a VoltDB server using an address string.
791///
792/// This is a convenience function that parses the address string and creates
793/// a connection with default settings (no authentication, no timeouts).
794///
795/// # Arguments
796/// * `addr` - The server address in "host:port" format (e.g., "localhost:21212")
797///
798/// # Errors
799/// Returns `VoltError::InvalidConfig` if the address cannot be parsed or resolved.
800///
801/// # Example
802/// ```no_run
803/// use voltdb_client_rust::get_node;
804///
805/// let node = get_node("localhost:21212")?;
806/// # Ok::<(), voltdb_client_rust::VoltError>(())
807/// ```
808pub fn get_node(addr: &str) -> Result<Node, VoltError> {
809 let addrs = addr
810 .to_socket_addrs()
811 .map_err(|_| VoltError::InvalidConfig)?;
812
813 let socket_addr = addrs
814 .into_iter()
815 .find(|s| s.is_ipv4())
816 .ok_or(VoltError::InvalidConfig)?;
817
818 let ip_port = IpPort::new(socket_addr.ip().to_string(), socket_addr.port());
819
820 let opt = NodeOpt {
821 ip_port,
822 user: None,
823 pass: None,
824 connect_timeout: None,
825 read_timeout: None,
826 };
827 Node::new(opt)
828}
829
830#[cfg(test)]
831mod tests {
832 use super::*;
833
834 #[test]
835 fn test_opts_builder_basic() {
836 let opts = Opts::builder().host("localhost", 21212).build().unwrap();
837
838 assert_eq!(opts.0.ip_ports.len(), 1);
839 assert_eq!(opts.0.ip_ports[0].ip_host, "localhost");
840 assert_eq!(opts.0.ip_ports[0].port, 21212);
841 assert!(opts.0.user.is_none());
842 assert!(opts.0.pass.is_none());
843 }
844
845 #[test]
846 fn test_opts_builder_with_auth() {
847 let opts = Opts::builder()
848 .host("127.0.0.1", 21211)
849 .user("admin")
850 .password("secret")
851 .build()
852 .unwrap();
853
854 assert_eq!(opts.0.user, Some("admin".to_string()));
855 assert_eq!(opts.0.pass, Some("secret".to_string()));
856 }
857
858 #[test]
859 fn test_opts_builder_multiple_hosts() {
860 let opts = Opts::builder()
861 .host("host1", 21212)
862 .host("host2", 21212)
863 .host("host3", 21212)
864 .build()
865 .unwrap();
866
867 assert_eq!(opts.0.ip_ports.len(), 3);
868 assert_eq!(opts.0.ip_ports[0].ip_host, "host1");
869 assert_eq!(opts.0.ip_ports[1].ip_host, "host2");
870 assert_eq!(opts.0.ip_ports[2].ip_host, "host3");
871 }
872
873 #[test]
874 fn test_opts_builder_with_hosts_vec() {
875 let hosts = vec![
876 IpPort::new("node1".to_string(), 21212),
877 IpPort::new("node2".to_string(), 21213),
878 ];
879 let opts = Opts::builder().hosts(hosts).build().unwrap();
880
881 assert_eq!(opts.0.ip_ports.len(), 2);
882 }
883
884 #[test]
885 fn test_opts_builder_with_timeouts() {
886 let opts = Opts::builder()
887 .host("localhost", 21212)
888 .connect_timeout(Duration::from_secs(10))
889 .read_timeout(Duration::from_secs(30))
890 .build()
891 .unwrap();
892
893 assert_eq!(opts.0.connect_timeout, Some(Duration::from_secs(10)));
894 assert_eq!(opts.0.read_timeout, Some(Duration::from_secs(30)));
895 }
896
897 #[test]
898 fn test_opts_builder_no_hosts_fails() {
899 let result = Opts::builder().build();
900 assert!(result.is_err());
901 match result {
902 Err(VoltError::InvalidConfig) => {}
903 _ => panic!("Expected InvalidConfig error"),
904 }
905 }
906
907 #[test]
908 fn test_opts_new_compatibility() {
909 let hosts = vec![IpPort::new("localhost".to_string(), 21212)];
910 let opts = Opts::new(hosts);
911
912 assert_eq!(opts.0.ip_ports.len(), 1);
913 assert!(opts.0.user.is_none());
914 assert!(opts.0.connect_timeout.is_none());
915 }
916
917 #[test]
918 fn test_ip_port_new() {
919 let ip_port = IpPort::new("192.168.1.1".to_string(), 8080);
920 assert_eq!(ip_port.ip_host, "192.168.1.1");
921 assert_eq!(ip_port.port, 8080);
922 }
923}