voltdb_client_rust/node.rs
1//! # VoltDB Node Connection
2//!
3//! This module provides the core TCP connection handling for communicating with VoltDB servers.
4//!
5//! ## Architecture
6//!
7//! The connection uses a single-threaded TCP listener design:
8//!
9//! - We spawn one dedicated thread to synchronously read from the TcpStream.
10//! - Each incoming message is dispatched via channels to the rest of the application,
11//! allowing users to perform asynchronous operations on the received data.
12//! - Using `Mutex<TcpStream>` for single Stream would introduce blocking and contention,
13//! because locking for each read/write would stall other operations. This design
14//! avoids that while keeping the network I/O simple and efficient.
15//!
16//! ## Timeouts
17//!
18//! The module supports two types of timeouts:
19//!
20//! - **Connection timeout**: Limits how long the initial TCP connection attempt will wait.
21//! Use [`OptsBuilder::connect_timeout`] or set [`NodeOpt::connect_timeout`].
22//! - **Read timeout**: Limits how long socket read operations will wait for data.
23//! This affects both the authentication handshake and the background listener thread.
24//! Use [`OptsBuilder::read_timeout`] or set [`NodeOpt::read_timeout`].
25//!
26//! ## Example
27//!
28//! ```no_run
29//! use voltdb_client_rust::{Opts, Pool};
30//! use std::time::Duration;
31//!
32//! // Create connection options with timeouts
33//! let opts = Opts::builder()
34//! .host("localhost", 21212)
35//! .connect_timeout(Duration::from_secs(5))
36//! .read_timeout(Duration::from_secs(30))
37//! .build()
38//! .unwrap();
39//!
40//! // Create a connection pool
41//! let mut pool = Pool::new(opts)?;
42//! let mut conn = pool.get_conn()?;
43//!
44//! // Execute a query
45//! let result = conn.query("SELECT * FROM my_table")?;
46//! # Ok::<(), voltdb_client_rust::VoltError>(())
47//! ```
48
49use std::collections::HashMap;
50use std::fmt::{Debug, Formatter};
51use std::io::{Read, Write};
52use std::net::{Ipv4Addr, Shutdown, SocketAddr, TcpStream, ToSocketAddrs};
53use std::sync::atomic::{AtomicBool, AtomicI64, Ordering};
54use std::sync::mpsc::{Receiver, Sender};
55use std::sync::{Arc, Mutex, mpsc};
56use std::thread;
57use std::time::Duration;
58
59use bytebuffer::ByteBuffer;
60use byteorder::{BigEndian, ReadBytesExt};
61
62use crate::encode::{Value, VoltError};
63use crate::procedure_invocation::new_procedure_invocation;
64use crate::protocol::{PING_HANDLE, build_auth_message, parse_auth_response};
65use crate::response::VoltResponseInfo;
66use crate::table::{VoltTable, new_volt_table};
67use crate::volt_param;
68
69// ============================================================================
70// Logging macros - use tracing if available, otherwise no-op
71// ============================================================================
72
73#[cfg(feature = "tracing")]
74macro_rules! node_error {
75 ($($arg:tt)*) => { tracing::error!($($arg)*) };
76}
77#[cfg(not(feature = "tracing"))]
78macro_rules! node_error {
79 ($($arg:tt)*) => {};
80}
81
82/// Connection options for VoltDB client.
83///
84/// This struct encapsulates all configuration options needed to establish
85/// connections to a VoltDB cluster. Use [`Opts::builder()`] for a fluent
86/// configuration API or [`Opts::new()`] for simple configurations.
87///
88/// # Example
89/// ```no_run
90/// use voltdb_client_rust::{Opts, IpPort};
91///
92/// // Simple configuration
93/// let hosts = vec![IpPort::new("localhost".to_string(), 21212)];
94/// let opts = Opts::new(hosts);
95///
96/// // Or use the builder for more options
97/// let opts = Opts::builder()
98/// .host("localhost", 21212)
99/// .user("admin")
100/// .password("secret")
101/// .build()
102/// .unwrap();
103/// ```
104#[derive(Clone, Eq, PartialEq, Debug)]
105pub struct Opts(pub(crate) Box<InnerOpts>);
106
107/// Host and port pair for VoltDB server connections.
108///
109/// Represents a single VoltDB server endpoint. Multiple `IpPort` instances
110/// can be used with connection pools for cluster connectivity.
111///
112/// # Example
113/// ```
114/// use voltdb_client_rust::IpPort;
115///
116/// let endpoint = IpPort::new("192.168.1.100".to_string(), 21212);
117/// ```
118#[derive(Debug, Clone, Eq, PartialEq)]
119pub struct IpPort {
120 pub(crate) ip_host: String,
121 pub(crate) port: u16,
122}
123
124impl IpPort {
125 /// Creates a new `IpPort` with the given hostname/IP and port.
126 ///
127 /// # Arguments
128 /// * `ip_host` - Hostname or IP address of the VoltDB server
129 /// * `port` - Port number (typically 21212 for client connections)
130 pub fn new(ip_host: String, port: u16) -> Self {
131 IpPort { ip_host, port }
132 }
133}
134
135impl Opts {
136 /// Creates connection options with the given hosts and default settings.
137 ///
138 /// This is a convenience constructor for simple configurations without
139 /// authentication or timeouts. For more control, use [`Opts::builder()`].
140 ///
141 /// # Arguments
142 /// * `hosts` - List of VoltDB server endpoints to connect to
143 pub fn new(hosts: Vec<IpPort>) -> Opts {
144 Opts(Box::new(InnerOpts {
145 ip_ports: hosts,
146 user: None,
147 pass: None,
148 connect_timeout: None,
149 read_timeout: None,
150 }))
151 }
152
153 /// Creates a new [`OptsBuilder`] for fluent configuration.
154 ///
155 /// # Example
156 /// ```no_run
157 /// use voltdb_client_rust::Opts;
158 /// use std::time::Duration;
159 ///
160 /// let opts = Opts::builder()
161 /// .host("localhost", 21212)
162 /// .connect_timeout(Duration::from_secs(5))
163 /// .build()
164 /// .unwrap();
165 /// ```
166 pub fn builder() -> OptsBuilder {
167 OptsBuilder::default()
168 }
169}
170
171/// Builder for connection options.
172///
173/// # Example
174/// ```no_run
175/// use voltdb_client_rust::{Opts, IpPort};
176/// use std::time::Duration;
177///
178/// let opts = Opts::builder()
179/// .host("localhost", 21212)
180/// .user("admin")
181/// .password("password")
182/// .connect_timeout(Duration::from_secs(10))
183/// .build()
184/// .unwrap();
185/// ```
186#[derive(Debug, Clone, Default)]
187pub struct OptsBuilder {
188 hosts: Vec<IpPort>,
189 user: Option<String>,
190 pass: Option<String>,
191 connect_timeout: Option<Duration>,
192 read_timeout: Option<Duration>,
193}
194
195impl OptsBuilder {
196 /// Add a host to connect to.
197 pub fn host(mut self, ip: &str, port: u16) -> Self {
198 self.hosts.push(IpPort::new(ip.to_string(), port));
199 self
200 }
201
202 /// Add multiple hosts to connect to.
203 pub fn hosts(mut self, hosts: Vec<IpPort>) -> Self {
204 self.hosts.extend(hosts);
205 self
206 }
207
208 /// Set the username for authentication.
209 pub fn user(mut self, user: &str) -> Self {
210 self.user = Some(user.to_string());
211 self
212 }
213
214 /// Set the password for authentication.
215 pub fn password(mut self, pass: &str) -> Self {
216 self.pass = Some(pass.to_string());
217 self
218 }
219
220 /// Set connection timeout.
221 pub fn connect_timeout(mut self, timeout: Duration) -> Self {
222 self.connect_timeout = Some(timeout);
223 self
224 }
225
226 /// Set read timeout for socket operations.
227 pub fn read_timeout(mut self, timeout: Duration) -> Self {
228 self.read_timeout = Some(timeout);
229 self
230 }
231
232 /// Build the Opts.
233 ///
234 /// Returns an error if no hosts are configured.
235 pub fn build(self) -> Result<Opts, VoltError> {
236 if self.hosts.is_empty() {
237 return Err(VoltError::InvalidConfig);
238 }
239 Ok(Opts(Box::new(InnerOpts {
240 ip_ports: self.hosts,
241 user: self.user,
242 pass: self.pass,
243 connect_timeout: self.connect_timeout,
244 read_timeout: self.read_timeout,
245 })))
246 }
247}
248
249#[derive(Debug, Clone, Eq, PartialEq)]
250pub(crate) struct InnerOpts {
251 pub(crate) ip_ports: Vec<IpPort>,
252 pub(crate) user: Option<String>,
253 pub(crate) pass: Option<String>,
254 pub(crate) connect_timeout: Option<Duration>,
255 pub(crate) read_timeout: Option<Duration>,
256}
257
258/// Options for creating a single [`Node`] connection.
259///
260/// This struct holds the connection parameters for establishing a TCP connection
261/// to a VoltDB server node.
262///
263/// # Example
264/// ```no_run
265/// use voltdb_client_rust::{NodeOpt, IpPort};
266/// use std::time::Duration;
267///
268/// let opt = NodeOpt {
269/// ip_port: IpPort::new("localhost".to_string(), 21212),
270/// user: Some("admin".to_string()),
271/// pass: Some("password".to_string()),
272/// connect_timeout: Some(Duration::from_secs(10)),
273/// read_timeout: Some(Duration::from_secs(30)),
274/// };
275/// ```
276pub struct NodeOpt {
277 /// The host and port to connect to.
278 pub ip_port: IpPort,
279 /// Optional username for authentication.
280 pub user: Option<String>,
281 /// Optional password for authentication.
282 pub pass: Option<String>,
283 /// Connection timeout. If `None`, the connection attempt will block indefinitely.
284 pub connect_timeout: Option<Duration>,
285 /// Read timeout for socket operations. If `None`, reads will block indefinitely.
286 /// This affects the background listener thread that receives responses from the server.
287 pub read_timeout: Option<Duration>,
288}
289
290/// Type alias for the pending requests map.
291/// Maps request handles to their response channels.
292type PendingRequests = HashMap<i64, Sender<VoltTable>>;
293
294/// Marker trait for VoltDB connections.
295///
296/// Implemented by both synchronous [`Node`] and async `AsyncNode` connections.
297pub trait Connection: Sync + Send + 'static {}
298
299/// A single TCP connection to a VoltDB server node.
300///
301/// `Node` represents a persistent, stateful TCP connection used to execute
302/// stored procedures and queries against a VoltDB cluster. Each `Node`
303/// maintains its own socket and spawns a dedicated background thread to
304/// asynchronously receive and dispatch responses from the server.
305///
306/// # Concurrency
307///
308/// `Node` is safe to use concurrently and supports multiple in-flight requests
309/// over the same connection. For automatic reconnection, load balancing, or
310/// managing multiple connections, use [`crate::Pool`].
311///
312/// # Example
313///
314/// ```no_run
315/// use voltdb_client_rust::{Node, NodeOpt, IpPort, block_for_result};
316///
317/// let opt = NodeOpt {
318/// ip_port: IpPort::new("localhost".to_string(), 21212),
319/// user: None,
320/// pass: None,
321/// connect_timeout: None,
322/// read_timeout: None,
323/// };
324///
325/// let node = Node::new(opt)?;
326/// let rx = node.query("SELECT * FROM my_table")?;
327/// let table = block_for_result(&rx)?;
328/// # Ok::<(), voltdb_client_rust::VoltError>(())
329/// ```
330#[allow(dead_code)]
331pub struct Node {
332 /// Write-side of the TCP stream, protected by a mutex for thread-safe writes.
333 /// Multiple threads can call query/call_sp concurrently; writes are serialized.
334 write_stream: Mutex<Option<TcpStream>>,
335 info: ConnInfo,
336 /// Pending requests awaiting responses. Uses Mutex instead of RwLock since
337 /// both insert (main thread) and remove (listener thread) require exclusive access.
338 requests: Arc<Mutex<PendingRequests>>,
339 stop: Arc<Mutex<bool>>,
340 counter: AtomicI64,
341 /// Simple atomic lock for write operations. True = locked, False = unlocked.
342 write_lock: AtomicBool,
343}
344
345impl Debug for Node {
346 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
347 write!(f, "Pending request: {}", 1)
348 }
349}
350
351impl Drop for Node {
352 fn drop(&mut self) {
353 let res = self.shutdown();
354 match res {
355 Ok(_) => {}
356 Err(_e) => {
357 node_error!(error = ?_e, "error during node shutdown");
358 }
359 }
360 }
361}
362
363impl Connection for Node {}
364
365impl Node {
366 /// Creates a new connection to a VoltDB server node.
367 ///
368 /// This method establishes a TCP connection to the specified host/port,
369 /// performs authentication, and spawns a background listener thread for
370 /// receiving asynchronous responses.
371 ///
372 /// # Arguments
373 /// * `opt` - Connection options including host, port, credentials, and timeouts.
374 ///
375 /// # Timeouts
376 /// * `connect_timeout` - If set, limits how long the connection attempt will wait.
377 /// If not set, the connection attempt blocks indefinitely.
378 /// * `read_timeout` - If set, socket read operations will timeout after this duration.
379 /// This affects both the authentication phase and the background listener thread.
380 ///
381 /// # Errors
382 /// Returns `VoltError` if:
383 /// * The connection cannot be established (network error or timeout)
384 /// * DNS resolution fails
385 /// * Authentication fails
386 /// * The server rejects the connection
387 ///
388 /// # Example
389 /// ```no_run
390 /// use voltdb_client_rust::{Node, NodeOpt, IpPort};
391 /// use std::time::Duration;
392 ///
393 /// let opt = NodeOpt {
394 /// ip_port: IpPort::new("localhost".to_string(), 21212),
395 /// user: None,
396 /// pass: None,
397 /// connect_timeout: Some(Duration::from_secs(5)),
398 /// read_timeout: Some(Duration::from_secs(30)),
399 /// };
400 /// let node = Node::new(opt)?;
401 /// # Ok::<(), voltdb_client_rust::VoltError>(())
402 /// ```
403 pub fn new(opt: NodeOpt) -> Result<Node, VoltError> {
404 let ip_host = opt.ip_port;
405 let addr_str = format!("{}:{}", ip_host.ip_host, ip_host.port);
406
407 // Build authentication message using shared protocol code
408 let auth_msg = build_auth_message(opt.user.as_deref(), opt.pass.as_deref())?;
409
410 // Connect to server with optional timeout
411 let mut stream: TcpStream = match opt.connect_timeout {
412 Some(timeout) => {
413 // Resolve address for connect_timeout (requires SocketAddr)
414 let socket_addr: SocketAddr = addr_str
415 .to_socket_addrs()
416 .map_err(|_| VoltError::InvalidConfig)?
417 .find(|s| s.is_ipv4())
418 .ok_or(VoltError::InvalidConfig)?;
419 TcpStream::connect_timeout(&socket_addr, timeout)?
420 }
421 None => TcpStream::connect(&addr_str)?,
422 };
423
424 // Set read timeout if configured
425 if let Some(read_timeout) = opt.read_timeout {
426 stream.set_read_timeout(Some(read_timeout))?;
427 }
428
429 // Send auth request
430 stream.write_all(&auth_msg)?;
431 stream.flush()?;
432
433 // Read auth response
434 let read = stream.read_u32::<BigEndian>()?;
435 let mut all = vec![0; read as usize];
436 stream.read_exact(&mut all)?;
437
438 // Parse auth response using shared protocol code
439 let info = parse_auth_response(&all)?;
440
441 // Clone the stream for the read side (listener thread)
442 let read_stream = stream.try_clone()?;
443
444 let requests = Arc::new(Mutex::new(HashMap::new()));
445 let stop = Arc::new(Mutex::new(false));
446
447 // Start the listener thread with the read side
448 Self::start_listener(read_stream, Arc::clone(&requests), Arc::clone(&stop));
449
450 Ok(Node {
451 stop,
452 write_stream: Mutex::new(Some(stream)),
453 info,
454 requests,
455 counter: AtomicI64::new(1),
456 write_lock: AtomicBool::new(false),
457 })
458 }
459 /// Returns the next unique sequence number for request tracking.
460 ///
461 /// Each request to VoltDB uses a unique handle (sequence number) for
462 /// matching responses to requests.
463 pub fn get_sequence(&self) -> i64 {
464 self.counter.fetch_add(1, Ordering::Relaxed)
465 }
466
467 /// Lists all stored procedures available in the VoltDB database.
468 ///
469 /// This calls the `@SystemCatalog` system procedure with "PROCEDURES" argument.
470 ///
471 /// # Returns
472 /// A receiver that will yield a `VoltTable` containing procedure metadata.
473 pub fn list_procedures(&self) -> Result<Receiver<VoltTable>, VoltError> {
474 self.call_sp("@SystemCatalog", volt_param!("PROCEDURES"))
475 }
476
477 /// Executes a stored procedure with the given parameters.
478 ///
479 /// # Arguments
480 /// * `query` - The name of the stored procedure (e.g., "@AdHoc", "MyProcedure")
481 /// * `param` - Vector of parameter values. Use [`volt_param!`] macro for convenience.
482 ///
483 /// # Returns
484 /// A receiver that will yield the result `VoltTable` when available.
485 ///
486 /// # Example
487 /// ```no_run
488 /// use voltdb_client_rust::{Node, NodeOpt, IpPort, Value, block_for_result, volt_param};
489 ///
490 /// # let opt = NodeOpt {
491 /// # ip_port: IpPort::new("localhost".to_string(), 21212),
492 /// # user: None, pass: None, connect_timeout: None, read_timeout: None,
493 /// # };
494 /// let node = Node::new(opt)?;
495 /// let id = 1i32;
496 /// let name = "test".to_string();
497 /// let rx = node.call_sp("MyProcedure", volt_param![id, name])?;
498 /// let result = block_for_result(&rx)?;
499 /// # Ok::<(), voltdb_client_rust::VoltError>(())
500 /// ```
501 pub fn call_sp(
502 &self,
503 query: &str,
504 param: Vec<&dyn Value>,
505 ) -> Result<Receiver<VoltTable>, VoltError> {
506 let handle = self.get_sequence();
507 let mut proc = new_procedure_invocation(handle, false, ¶m, query);
508 let (tx, rx): (Sender<VoltTable>, Receiver<VoltTable>) = mpsc::channel();
509
510 // Register the response channel before sending the request
511 self.requests.lock()?.insert(handle, tx);
512 let bs = proc.bytes();
513 // Write to stream while holding the lock
514 let result = {
515 let mut stream_guard = self.write_stream.lock()?;
516 match stream_guard.as_mut() {
517 None => Err(VoltError::ConnectionNotAvailable),
518 Some(stream) => {
519 stream.write_all(&bs)?;
520 Ok(rx)
521 }
522 }
523 };
524
525 // Release write lock
526 self.write_lock.store(false, Ordering::Release);
527
528 result
529 }
530
531 /// Uploads a JAR file containing stored procedure classes to VoltDB.
532 ///
533 /// This calls the `@UpdateClasses` system procedure to deploy new classes.
534 ///
535 /// # Arguments
536 /// * `bs` - The JAR file contents as bytes
537 pub fn upload_jar(&self, bs: Vec<u8>) -> Result<Receiver<VoltTable>, VoltError> {
538 self.call_sp("@UpdateClasses", volt_param!(bs, ""))
539 }
540
541 /// Executes an ad-hoc SQL query.
542 ///
543 /// This is a convenience method that calls the `@AdHoc` system procedure.
544 ///
545 /// # Arguments
546 /// * `sql` - The SQL query string to execute
547 ///
548 /// # Returns
549 /// A receiver that will yield the result `VoltTable` when available.
550 ///
551 /// # Example
552 /// ```no_run
553 /// use voltdb_client_rust::{Node, NodeOpt, IpPort, block_for_result};
554 ///
555 /// # let opt = NodeOpt {
556 /// # ip_port: IpPort::new("localhost".to_string(), 21212),
557 /// # user: None, pass: None, connect_timeout: None, read_timeout: None,
558 /// # };
559 /// let node = Node::new(opt)?;
560 /// let rx = node.query("SELECT COUNT(*) FROM users")?;
561 /// let result = block_for_result(&rx)?;
562 /// # Ok::<(), voltdb_client_rust::VoltError>(())
563 /// ```
564 pub fn query(&self, sql: &str) -> Result<Receiver<VoltTable>, VoltError> {
565 let zero_vec: Vec<&dyn Value> = vec![&sql];
566 self.call_sp("@AdHoc", zero_vec)
567 }
568
569 /// Sends a ping to the VoltDB server.
570 ///
571 /// This can be used to keep the connection alive or verify connectivity.
572 /// The ping response is handled internally and not returned to the caller.
573 pub fn ping(&self) -> Result<(), VoltError> {
574 let zero_vec: Vec<&dyn Value> = Vec::new();
575 let mut proc = new_procedure_invocation(PING_HANDLE, false, &zero_vec, "@Ping");
576 let bs = proc.bytes();
577
578 // Acquire write lock (spin until acquired)
579 while self
580 .write_lock
581 .compare_exchange_weak(false, true, Ordering::Acquire, Ordering::Relaxed)
582 .is_err()
583 {
584 std::hint::spin_loop();
585 }
586
587 let result = {
588 let mut stream_guard = self.write_stream.lock()?;
589 match stream_guard.as_mut() {
590 None => Err(VoltError::ConnectionNotAvailable),
591 Some(stream) => {
592 stream.write_all(&bs)?;
593 Ok(())
594 }
595 }
596 };
597
598 // Release write lock
599 self.write_lock.store(false, Ordering::Release);
600
601 result
602 }
603
604 /// Reads and processes a single message from the TCP stream.
605 ///
606 /// # Arguments
607 /// * `tcp` - The TCP stream to read from
608 /// * `requests` - Map of pending requests awaiting responses
609 /// * `buffer` - Reusable buffer for reading message data (reduces allocations)
610 fn job(
611 tcp: &mut impl Read,
612 requests: &Arc<Mutex<PendingRequests>>,
613 buffer: &mut Vec<u8>,
614 ) -> Result<(), VoltError> {
615 // Read message length (4 bytes, big-endian)
616 let msg_len = tcp.read_u32::<BigEndian>()?;
617 if msg_len == 0 {
618 return Ok(());
619 }
620
621 // Reuse buffer: resize if needed, but capacity is retained
622 buffer.resize(msg_len as usize, 0);
623 tcp.read_exact(buffer)?;
624
625 let mut res = ByteBuffer::from_bytes(buffer);
626 // Skip protocol version byte (always 0 for current protocol)
627 let _ = res.read_u8()?;
628 let handle = res.read_i64()?;
629
630 if handle == PING_HANDLE {
631 return Ok(()); // Ping response, nothing else to do
632 }
633
634 if let Some(sender) = requests.lock()?.remove(&handle) {
635 let info = VoltResponseInfo::new(&mut res, handle)?;
636 let table = new_volt_table(&mut res, info)?;
637 // Ignore send error - receiver may have been dropped if caller
638 // timed out or cancelled the request
639 let _ = sender.send(table);
640 }
641
642 Ok(())
643 }
644 /// Gracefully shuts down the connection.
645 ///
646 /// This stops the background listener thread and closes the TCP connection.
647 /// The `Node` will be unusable after calling this method.
648 ///
649 /// Note: This is automatically called when the `Node` is dropped.
650 pub fn shutdown(&mut self) -> Result<(), VoltError> {
651 let mut stop = self.stop.lock()?;
652 *stop = true;
653
654 let mut stream_guard = self.write_stream.lock()?;
655 if let Some(stream) = stream_guard.take() {
656 stream.shutdown(Shutdown::Both)?;
657 }
658 Ok(())
659 }
660
661 /// Starts the background listener thread for receiving responses.
662 ///
663 /// This is a static method that takes ownership of the read-side stream.
664 fn start_listener(
665 mut tcp: TcpStream,
666 requests: Arc<Mutex<PendingRequests>>,
667 stopping: Arc<Mutex<bool>>,
668 ) {
669 thread::spawn(move || {
670 // Reusable buffer to reduce allocation pressure.
671 // Starts with 4KB capacity, grows as needed but rarely shrinks.
672 let mut buffer = Vec::with_capacity(4096);
673
674 loop {
675 // Check stop flag before blocking on read
676 let should_stop = stopping
677 .lock()
678 .unwrap_or_else(|poisoned| poisoned.into_inner());
679 if *should_stop {
680 break;
681 }
682 drop(should_stop); // Release lock before blocking on I/O
683
684 if let Err(_err) = Node::job(&mut tcp, &requests, &mut buffer) {
685 // Only log error if we're not intentionally stopping
686 let is_stopping = stopping
687 .lock()
688 .unwrap_or_else(|poisoned| poisoned.into_inner());
689 if !*is_stopping {
690 node_error!(error = %_err, "VoltDB listener error");
691 }
692 }
693 }
694 });
695 }
696}
697
698/// Connection metadata returned by the VoltDB server during authentication.
699///
700/// This struct contains information about the server that the client connected to,
701/// including the host ID, connection ID, and cluster leader address.
702#[derive(Debug, Clone)]
703pub struct ConnInfo {
704 /// The ID of the host in the VoltDB cluster.
705 pub host_id: i32,
706 /// Unique connection identifier assigned by the server.
707 pub connection: i64,
708 /// IPv4 address of the cluster leader node.
709 pub leader_addr: Ipv4Addr,
710 /// VoltDB server build/version string.
711 pub build: String,
712}
713
714impl Default for ConnInfo {
715 fn default() -> Self {
716 Self {
717 host_id: 0,
718 connection: 0,
719 leader_addr: Ipv4Addr::new(127, 0, 0, 1),
720 build: String::new(),
721 }
722 }
723}
724
725/// Blocks until a response is received and converts any VoltDB errors.
726///
727/// This is a convenience function for synchronous usage. It waits for the
728/// response on the channel and converts VoltDB-level errors (from the response)
729/// into `VoltError`.
730///
731/// # Arguments
732/// * `res` - The receiver from a query or stored procedure call
733///
734/// # Returns
735/// The result `VoltTable` on success, or `VoltError` if the operation failed.
736///
737/// # Example
738/// ```no_run
739/// use voltdb_client_rust::{Node, NodeOpt, IpPort, block_for_result};
740///
741/// # let opt = NodeOpt {
742/// # ip_port: IpPort::new("localhost".to_string(), 21212),
743/// # user: None, pass: None, connect_timeout: None, read_timeout: None,
744/// # };
745/// let node = Node::new(opt)?;
746/// let rx = node.query("SELECT * FROM users")?;
747/// let mut table = block_for_result(&rx)?;
748///
749/// while table.advance_row() {
750/// // Process rows...
751/// }
752/// # Ok::<(), voltdb_client_rust::VoltError>(())
753/// ```
754pub fn block_for_result(res: &Receiver<VoltTable>) -> Result<VoltTable, VoltError> {
755 let mut table = res.recv()?;
756 let err = table.has_error();
757 match err {
758 None => Ok(table),
759 Some(err) => Err(err),
760 }
761}
762
763pub fn reset() {}
764
765/// Creates a new connection to a VoltDB server using an address string.
766///
767/// This is a convenience function that parses the address string and creates
768/// a connection with default settings (no authentication, no timeouts).
769///
770/// # Arguments
771/// * `addr` - The server address in "host:port" format (e.g., "localhost:21212")
772///
773/// # Errors
774/// Returns `VoltError::InvalidConfig` if the address cannot be parsed or resolved.
775///
776/// # Example
777/// ```no_run
778/// use voltdb_client_rust::get_node;
779///
780/// let node = get_node("localhost:21212")?;
781/// # Ok::<(), voltdb_client_rust::VoltError>(())
782/// ```
783pub fn get_node(addr: &str) -> Result<Node, VoltError> {
784 let addrs = addr
785 .to_socket_addrs()
786 .map_err(|_| VoltError::InvalidConfig)?;
787
788 let socket_addr = addrs
789 .into_iter()
790 .find(|s| s.is_ipv4())
791 .ok_or(VoltError::InvalidConfig)?;
792
793 let ip_port = IpPort::new(socket_addr.ip().to_string(), socket_addr.port());
794
795 let opt = NodeOpt {
796 ip_port,
797 user: None,
798 pass: None,
799 connect_timeout: None,
800 read_timeout: None,
801 };
802 Node::new(opt)
803}
804
805#[cfg(test)]
806mod tests {
807 use super::*;
808
809 #[test]
810 fn test_opts_builder_basic() {
811 let opts = Opts::builder().host("localhost", 21212).build().unwrap();
812
813 assert_eq!(opts.0.ip_ports.len(), 1);
814 assert_eq!(opts.0.ip_ports[0].ip_host, "localhost");
815 assert_eq!(opts.0.ip_ports[0].port, 21212);
816 assert!(opts.0.user.is_none());
817 assert!(opts.0.pass.is_none());
818 }
819
820 #[test]
821 fn test_opts_builder_with_auth() {
822 let opts = Opts::builder()
823 .host("127.0.0.1", 21211)
824 .user("admin")
825 .password("secret")
826 .build()
827 .unwrap();
828
829 assert_eq!(opts.0.user, Some("admin".to_string()));
830 assert_eq!(opts.0.pass, Some("secret".to_string()));
831 }
832
833 #[test]
834 fn test_opts_builder_multiple_hosts() {
835 let opts = Opts::builder()
836 .host("host1", 21212)
837 .host("host2", 21212)
838 .host("host3", 21212)
839 .build()
840 .unwrap();
841
842 assert_eq!(opts.0.ip_ports.len(), 3);
843 assert_eq!(opts.0.ip_ports[0].ip_host, "host1");
844 assert_eq!(opts.0.ip_ports[1].ip_host, "host2");
845 assert_eq!(opts.0.ip_ports[2].ip_host, "host3");
846 }
847
848 #[test]
849 fn test_opts_builder_with_hosts_vec() {
850 let hosts = vec![
851 IpPort::new("node1".to_string(), 21212),
852 IpPort::new("node2".to_string(), 21213),
853 ];
854 let opts = Opts::builder().hosts(hosts).build().unwrap();
855
856 assert_eq!(opts.0.ip_ports.len(), 2);
857 }
858
859 #[test]
860 fn test_opts_builder_with_timeouts() {
861 let opts = Opts::builder()
862 .host("localhost", 21212)
863 .connect_timeout(Duration::from_secs(10))
864 .read_timeout(Duration::from_secs(30))
865 .build()
866 .unwrap();
867
868 assert_eq!(opts.0.connect_timeout, Some(Duration::from_secs(10)));
869 assert_eq!(opts.0.read_timeout, Some(Duration::from_secs(30)));
870 }
871
872 #[test]
873 fn test_opts_builder_no_hosts_fails() {
874 let result = Opts::builder().build();
875 assert!(result.is_err());
876 match result {
877 Err(VoltError::InvalidConfig) => {}
878 _ => panic!("Expected InvalidConfig error"),
879 }
880 }
881
882 #[test]
883 fn test_opts_new_compatibility() {
884 let hosts = vec![IpPort::new("localhost".to_string(), 21212)];
885 let opts = Opts::new(hosts);
886
887 assert_eq!(opts.0.ip_ports.len(), 1);
888 assert!(opts.0.user.is_none());
889 assert!(opts.0.connect_timeout.is_none());
890 }
891
892 #[test]
893 fn test_ip_port_new() {
894 let ip_port = IpPort::new("192.168.1.1".to_string(), 8080);
895 assert_eq!(ip_port.ip_host, "192.168.1.1");
896 assert_eq!(ip_port.port, 8080);
897 }
898}