//! LANCE protocol client implementation (`lnc_client/client.rs`).
1use crate::error::{ClientError, Result};
2use crate::tls::TlsClientConfig;
3use bytes::Bytes;
4use lnc_network::{
5    ControlCommand, Frame, FrameType, LWP_HEADER_SIZE, TlsConnector, encode_frame, parse_frame,
6};
7use std::net::SocketAddr;
8use std::pin::Pin;
9use std::sync::atomic::{AtomicU64, Ordering};
10use std::task::{Context, Poll};
11use std::time::Duration;
12use tokio::io::{AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt, ReadBuf};
13use tokio::net::TcpStream;
14use tokio::net::lookup_host;
15use tracing::{debug, trace, warn};
16
/// Wrapper enum for TCP and TLS streams to avoid dynamic dispatch
///
/// An enum (rather than `Box<dyn AsyncRead + AsyncWrite>`) keeps every I/O
/// call statically dispatched to the concrete transport.
#[allow(clippy::large_enum_variant)] // TLS state dwarfs TcpStream; streams are long-lived, so boxing buys little
pub enum ClientStream {
    /// Plain TCP connection
    Tcp(TcpStream),
    /// TLS-encrypted connection
    Tls(tokio_rustls::client::TlsStream<TcpStream>),
}
25
26impl AsyncRead for ClientStream {
27    /// Delegates read readiness directly to the concrete transport so
28    /// buffered frames stay on the Architecture §15 zero-copy path without
29    /// layering additional indirection.
30    fn poll_read(
31        self: Pin<&mut Self>,
32        cx: &mut Context<'_>,
33        buf: &mut ReadBuf<'_>,
34    ) -> Poll<std::io::Result<()>> {
35        match self.get_mut() {
36            ClientStream::Tcp(stream) => Pin::new(stream).poll_read(cx, buf),
37            ClientStream::Tls(stream) => Pin::new(stream).poll_read(cx, buf),
38        }
39    }
40}
41
42impl AsyncWrite for ClientStream {
43    /// Delegates write readiness to the underlying transport without
44    /// introducing dynamic dispatch, which keeps buffered writes on the
45    /// Section 14 thread-pinned path compliant with Architecture §15's
46    /// loaner-buffer rules.
47    fn poll_write(
48        self: Pin<&mut Self>,
49        cx: &mut Context<'_>,
50        buf: &[u8],
51    ) -> Poll<std::io::Result<usize>> {
52        match self.get_mut() {
53            ClientStream::Tcp(stream) => Pin::new(stream).poll_write(cx, buf),
54            ClientStream::Tls(stream) => Pin::new(stream).poll_write(cx, buf),
55        }
56    }
57
58    /// Flushes bytes on whichever transport is active so that ingestion
59    /// frames honor the write-buffering guarantees described in Architecture
60    /// §22 without duplicating logic across TCP/TLS paths.
61    fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<std::io::Result<()>> {
62        match self.get_mut() {
63            ClientStream::Tcp(stream) => Pin::new(stream).poll_flush(cx),
64            ClientStream::Tls(stream) => Pin::new(stream).poll_flush(cx),
65        }
66    }
67
68    /// Propagates orderly shutdown to the concrete stream implementation,
69    /// enabling graceful disconnects per Architecture §14's pinning strategy
70    /// whether the session is plain TCP or TLS.
71    fn poll_shutdown(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<std::io::Result<()>> {
72        match self.get_mut() {
73            ClientStream::Tcp(stream) => Pin::new(stream).poll_shutdown(cx),
74            ClientStream::Tls(stream) => Pin::new(stream).poll_shutdown(cx),
75        }
76    }
77}
78
79/// Helper to extract error message from a frame payload
80fn extract_error_message(frame: &Frame) -> String {
81    frame
82        .payload
83        .as_ref()
84        .map(|p| String::from_utf8_lossy(p).to_string())
85        .unwrap_or_else(|| "Unknown error".to_string())
86}
87
88/// Helper to validate frame type and extract error if present
89#[allow(dead_code)] // Reserved for future protocol extensions
90fn expect_frame_type(frame: Frame, expected: ControlCommand, expected_name: &str) -> Result<Frame> {
91    match frame.frame_type {
92        FrameType::Control(cmd) if cmd == expected => Ok(frame),
93        FrameType::Control(ControlCommand::ErrorResponse) => {
94            Err(ClientError::ServerError(extract_error_message(&frame)))
95        },
96        other => Err(ClientError::InvalidResponse(format!(
97            "Expected {}, got {:?}",
98            expected_name, other
99        ))),
100    }
101}
102
103/// Helper to validate frame is a success response (TopicResponse or similar)
104fn expect_success_response(frame: Frame) -> Result<()> {
105    match frame.frame_type {
106        FrameType::Control(ControlCommand::TopicResponse) => Ok(()),
107        FrameType::Control(ControlCommand::ErrorResponse) => {
108            Err(ClientError::ServerError(extract_error_message(&frame)))
109        },
110        other => Err(ClientError::InvalidResponse(format!(
111            "Expected TopicResponse, got {:?}",
112            other
113        ))),
114    }
115}
116
/// Authentication configuration for client connections
///
/// Currently only carries mTLS material; see `send_ingest_to_topic`, which
/// accepts this as a reserved per-request hook.
#[derive(Debug, Clone, Default)]
pub struct AuthConfig {
    /// Whether mutual TLS (mTLS) authentication is enabled
    pub mtls_enabled: bool,
    /// Path to the client certificate file for mTLS
    pub client_cert_path: Option<String>,
    /// Path to the client private key file for mTLS
    pub client_key_path: Option<String>,
}
127
/// Retention configuration for a topic
///
/// A zero in either field disables that limit.
#[derive(Debug, Clone, Default)]
pub struct RetentionInfo {
    /// Maximum age of messages in seconds (0 = no limit)
    pub max_age_secs: u64,
    /// Maximum size of topic data in bytes (0 = no limit)
    pub max_bytes: u64,
}
136
/// Information about a topic
///
/// Returned by topic management calls such as `create_topic`, `list_topics`,
/// and `ensure_topic`.
#[derive(Debug, Clone)]
pub struct TopicInfo {
    /// Unique topic identifier
    pub id: u32,
    /// Topic name
    pub name: String,
    /// Unix timestamp when the topic was created
    pub created_at: u64,
    /// Topic identity epoch for stale-id detection
    pub topic_epoch: u64,
    /// Retention policy configuration (None = no retention policy set)
    pub retention: Option<RetentionInfo>,
}
151
/// Result of a fetch operation
///
/// `next_offset` should be fed back into the next fetch to continue reading.
#[derive(Debug, Clone)]
pub struct FetchResult {
    /// Raw data fetched from the topic
    pub data: Bytes,
    /// Offset to use for the next fetch operation
    pub next_offset: u64,
    /// Number of bytes returned in this fetch
    pub bytes_returned: u32,
    /// Number of records in the fetched data
    pub record_count: u32,
}
164
/// Result of a subscribe operation
#[derive(Debug, Clone)]
pub struct SubscribeResult {
    /// Assigned consumer identifier
    pub consumer_id: u64,
    /// Starting offset for consumption
    pub start_offset: u64,
}
173
/// Result of a commit offset operation
#[derive(Debug, Clone)]
pub struct CommitResult {
    /// Consumer identifier that committed the offset
    pub consumer_id: u64,
    /// The offset that was successfully committed
    pub committed_offset: u64,
}
182
/// Cluster status information
#[derive(Debug, Clone)]
pub struct ClusterStatus {
    /// Identifier of the node that produced this status
    pub node_id: u16,
    /// Whether the responding node is currently the leader
    pub is_leader: bool,
    /// Identifier of the current leader, if one is known
    pub leader_id: Option<u16>,
    /// Current consensus term (presumably Raft-style — confirm against server)
    pub current_term: u64,
    /// Total number of nodes known to the cluster
    pub node_count: usize,
    /// Number of nodes currently considered healthy
    pub healthy_nodes: usize,
    /// Whether enough healthy nodes exist to form a quorum
    pub quorum_available: bool,
    /// Human-readable state description per peer, keyed by node id
    pub peer_states: std::collections::HashMap<u16, String>,
}
195
/// Configuration for the LANCE client
///
/// `Default` targets `127.0.0.1:1992` with no TLS; see the `Default` impl for
/// the concrete timeout values.
#[derive(Debug, Clone)]
pub struct ClientConfig {
    /// Server address to connect to (supports both IP:port and hostname:port)
    pub addr: String,
    /// Timeout for establishing connections
    pub connect_timeout: Duration,
    /// Timeout for read operations
    pub read_timeout: Duration,
    /// Timeout for write operations
    pub write_timeout: Duration,
    /// Interval between keepalive messages
    pub keepalive_interval: Duration,
    /// Optional TLS configuration for encrypted connections
    pub tls: Option<TlsClientConfig>,
}
212
213impl Default for ClientConfig {
214    fn default() -> Self {
215        Self {
216            addr: "127.0.0.1:1992".to_string(),
217            connect_timeout: Duration::from_secs(10),
218            read_timeout: Duration::from_secs(30),
219            write_timeout: Duration::from_secs(10),
220            keepalive_interval: Duration::from_secs(10),
221            tls: None,
222        }
223    }
224}
225
226impl ClientConfig {
227    /// Create a new client configuration with the specified server address
228    ///
229    /// The address can be either an IP:port (e.g., "127.0.0.1:1992") or
230    /// a hostname:port (e.g., "lance.example.com:1992"). DNS resolution
231    /// is performed asynchronously during connection.
232    pub fn new(addr: impl Into<String>) -> Self {
233        Self {
234            addr: addr.into(),
235            ..Default::default()
236        }
237    }
238
239    /// Enables TLS for this configuration so clients can satisfy
240    /// Architecture §14's production security guidance when traversing
241    /// untrusted networks.
242    ///
243    /// # Arguments
244    /// * `tls_config` - Certificates and trust roots passed through to
245    ///   `lnc-network`'s TLS connector.
246    ///
247    /// # Returns
248    /// * `Self` - Updated config allowing fluent builder-style chaining.
249    pub fn with_tls(mut self, tls_config: TlsClientConfig) -> Self {
250        self.tls = Some(tls_config);
251        self
252    }
253
254    /// Check if TLS is enabled
255    pub fn is_tls_enabled(&self) -> bool {
256        self.tls.is_some()
257    }
258}
259
/// LANCE protocol client for communicating with LANCE servers
///
/// Provides methods for ingesting data, managing topics, and consuming records.
pub struct LanceClient {
    // Underlying transport, plain TCP or TLS.
    stream: ClientStream,
    // Connection settings; reused for timeouts and for reconnects in
    // `ensure_topic`.
    config: ClientConfig,
    // Monotonic counter stamping outgoing ingest batches (starts at 1).
    batch_id: AtomicU64,
    // 64 KiB buffer for incoming frame bytes (sized in `connect`/`connect_tls`).
    read_buffer: Vec<u8>,
    // Read cursor into `read_buffer` — presumably bytes already consumed;
    // managed by `recv_frame`, which is not visible in this chunk.
    read_offset: usize,
}
270
271impl LanceClient {
272    /// Resolves an address string (hostname:port or IP:port) to a `SocketAddr`
273    /// so clients can honor Architecture §10.1's Docker-first deployment model
274    /// where hostnames are common.
275    ///
276    /// # Arguments
277    /// * `addr` - Address string such as `"10.0.0.5:1992"` or
278    ///   `"broker.lance:1992"`.
279    ///
280    /// # Returns
281    /// * `Result<SocketAddr>` - Parsed IP:port pair ready for `TcpStream`.
282    ///
283    /// # Errors
284    /// * [`ClientError::ProtocolError`] when DNS resolution fails or produces
285    ///   no usable endpoints.
286    async fn resolve_address(addr: &str) -> Result<SocketAddr> {
287        // First, try parsing as a SocketAddr directly (for IP:port format)
288        if let Ok(socket_addr) = addr.parse::<SocketAddr>() {
289            return Ok(socket_addr);
290        }
291
292        // If direct parsing fails, perform DNS resolution (for hostname:port format)
293        let mut addrs = lookup_host(addr).await.map_err(|e| {
294            ClientError::ProtocolError(format!("DNS resolution failed for '{}': {}", addr, e))
295        })?;
296
297        addrs
298            .next()
299            .ok_or_else(|| ClientError::ProtocolError(format!("No addresses found for '{}'", addr)))
300    }
301
302    /// Connect to LANCE server, automatically using TLS if configured
303    ///
304    /// The address in the config can be either an IP:port or hostname:port.
305    /// DNS resolution is performed automatically for hostnames.
306    pub async fn connect(config: ClientConfig) -> Result<Self> {
307        // If TLS is configured in ClientConfig, use TLS connection
308        if let Some(ref tls_config) = config.tls {
309            return Self::connect_tls(config.clone(), tls_config.clone()).await;
310        }
311
312        debug!(addr = %config.addr, "Connecting to LANCE server");
313
314        // Resolve address (handles both IP:port and hostname:port)
315        let socket_addr = Self::resolve_address(&config.addr).await?;
316        debug!(resolved_addr = %socket_addr, "Resolved server address");
317
318        let stream = tokio::time::timeout(config.connect_timeout, TcpStream::connect(socket_addr))
319            .await
320            .map_err(|_| ClientError::Timeout)?
321            .map_err(ClientError::ConnectionFailed)?;
322
323        stream.set_nodelay(true)?;
324
325        debug!(addr = %config.addr, "Connected to LANCE server");
326
327        Ok(Self {
328            stream: ClientStream::Tcp(stream),
329            config,
330            batch_id: AtomicU64::new(1),
331            read_buffer: vec![0u8; 64 * 1024],
332            read_offset: 0,
333        })
334    }
335
336    /// Connect to LANCE server with TLS encryption
337    ///
338    /// # Arguments
339    /// * `config` - Client configuration with server address (IP:port or hostname:port)
340    /// * `tls_config` - TLS configuration including certificates
341    ///
342    /// # Example
343    /// ```rust,ignore
344    /// use lnc_client::{ClientConfig, TlsClientConfig, LanceClient};
345    ///
346    /// let config = ClientConfig::new("lance.example.com:1992");
347    /// let tls = TlsClientConfig::new()
348    ///     .with_ca_cert("/path/to/ca.pem");
349    ///
350    /// let client = LanceClient::connect_tls(config, tls).await?;
351    /// ```
352    pub async fn connect_tls(config: ClientConfig, tls_config: TlsClientConfig) -> Result<Self> {
353        debug!(addr = %config.addr, "Connecting to LANCE server with TLS");
354
355        // Resolve address (handles both IP:port and hostname:port)
356        let socket_addr = Self::resolve_address(&config.addr).await?;
357        debug!(resolved_addr = %socket_addr, "Resolved server address");
358
359        // First establish TCP connection
360        let tcp_stream =
361            tokio::time::timeout(config.connect_timeout, TcpStream::connect(socket_addr))
362                .await
363                .map_err(|_| ClientError::Timeout)?
364                .map_err(ClientError::ConnectionFailed)?;
365
366        tcp_stream.set_nodelay(true)?;
367
368        // Create TLS connector
369        let network_config = tls_config.to_network_config();
370        let connector =
371            TlsConnector::new(network_config).map_err(|e| ClientError::TlsError(e.to_string()))?;
372
373        // Determine server name for SNI - prefer configured name, then extract hostname from address
374        let server_name = tls_config.server_name.unwrap_or_else(|| {
375            // Extract hostname from address (remove port if present)
376            config
377                .addr
378                .rsplit_once(':')
379                .map(|(host, _)| host.to_string())
380                .unwrap_or_else(|| socket_addr.ip().to_string())
381        });
382
383        // Perform TLS handshake
384        let tls_stream = connector
385            .connect(&server_name, tcp_stream)
386            .await
387            .map_err(|e| ClientError::TlsError(e.to_string()))?;
388
389        debug!(addr = %config.addr, "TLS connection established");
390
391        Ok(Self {
392            stream: ClientStream::Tls(tls_stream),
393            config,
394            batch_id: AtomicU64::new(1),
395            read_buffer: vec![0u8; 64 * 1024],
396            read_offset: 0,
397        })
398    }
399
    /// Connect to a LANCE server using an address string
    ///
    /// The address can be either an IP:port (e.g., "127.0.0.1:1992") or
    /// a hostname:port (e.g., "lance.example.com:1992").
    ///
    /// Convenience wrapper over [`Self::connect`] with an otherwise-default
    /// [`ClientConfig`].
    pub async fn connect_to(addr: &str) -> Result<Self> {
        Self::connect(ClientConfig::new(addr)).await
    }
407
    /// Connect to LANCE server with TLS using address string
    ///
    /// The address can be either an IP:port (e.g., "127.0.0.1:1992") or
    /// a hostname:port (e.g., "lance.example.com:1992").
    ///
    /// Convenience wrapper over [`Self::connect_tls`] with an
    /// otherwise-default [`ClientConfig`].
    pub async fn connect_tls_to(addr: &str, tls_config: TlsClientConfig) -> Result<Self> {
        Self::connect_tls(ClientConfig::new(addr), tls_config).await
    }
415
    /// Returns the next client-local batch identifier.
    ///
    /// Monotonically increasing, starting from 1 (seeded in the
    /// constructors). `SeqCst` keeps the counter totally ordered even if the
    /// client were shared across threads.
    fn next_batch_id(&self) -> u64 {
        self.batch_id.fetch_add(1, Ordering::SeqCst)
    }
419
    /// Sends an ingest frame to the default topic (ID 0) while preserving the
    /// Architecture §22 write-buffering guarantees.
    ///
    /// Fire-and-forget: does not wait for a server ack — pair with
    /// [`Self::recv_ack`] or use [`Self::send_ingest_sync`] for that.
    ///
    /// # Arguments
    /// * `payload` - Record batch encoded using the zero-copy LWP format.
    /// * `record_count` - Logical record total encoded in the frame header.
    ///
    /// # Returns
    /// * `Result<u64>` - Client-assigned batch identifier for the frame.
    ///
    /// # Errors
    /// Propagates [`ClientError::Timeout`] when the write exceeds the configured
    /// deadline or any framing/connection error surfaced by Tokio.
    pub async fn send_ingest(&mut self, payload: Bytes, record_count: u32) -> Result<u64> {
        self.send_ingest_to_topic(0, payload, record_count, None)
            .await
    }
437
438    /// Sends an ingest frame to a specific topic while attaching metadata
439    /// required by Architecture §22's deferred flush/ack scheme.
440    ///
441    /// # Arguments
442    /// * `topic_id` - Destination topic identifier.
443    /// * `payload` - Zero-copy encoded batch.
444    /// * `record_count` - Logical records contained in `payload`.
445    /// * `_auth_config` - Optional future hook for per-request auth context.
446    ///
447    /// # Returns
448    /// * `Result<u64>` - Batch identifier allocated by the client monotonic
449    ///   counter and echoed back by the server.
450    pub async fn send_ingest_to_topic(
451        &mut self,
452        topic_id: u32,
453        payload: Bytes,
454        record_count: u32,
455        _auth_config: Option<&AuthConfig>,
456    ) -> Result<u64> {
457        let batch_id = self.next_batch_id();
458        let timestamp_ns = std::time::SystemTime::now()
459            .duration_since(std::time::UNIX_EPOCH)
460            .map(|d| d.as_nanos() as u64)
461            .unwrap_or(0);
462
463        let frame =
464            Frame::new_ingest_with_topic(batch_id, timestamp_ns, record_count, payload, topic_id);
465        let frame_bytes = encode_frame(&frame);
466
467        trace!(
468            batch_id,
469            topic_id,
470            payload_len = frame.payload_length(),
471            "Sending ingest frame"
472        );
473
474        tokio::time::timeout(
475            self.config.write_timeout,
476            self.stream.write_all(&frame_bytes),
477        )
478        .await
479        .map_err(|_| ClientError::Timeout)??;
480
481        Ok(batch_id)
482    }
483
    /// Sends an ingest request to the default topic (ID 0) and waits for the
    /// corresponding acknowledgment, mirroring Architecture §22.3 sync gates.
    ///
    /// # Arguments
    /// * `payload` - Ingest batch to transmit.
    /// * `record_count` - Logical record total for metrics validation.
    ///
    /// # Returns
    /// * `Result<u64>` - The acked batch identifier if the server confirms the
    ///   write succeeded.
    pub async fn send_ingest_sync(&mut self, payload: Bytes, record_count: u32) -> Result<u64> {
        self.send_ingest_to_topic_sync(0, payload, record_count, None)
            .await
    }
498
499    /// Sends an ingest request to a topic and blocks for server acknowledgment
500    /// so callers can enforce durability or backpressure decisions inline.
501    ///
502    /// # Arguments
503    /// * `topic_id` - Destination topic.
504    /// * `payload` - Zero-copy loaner buffer to transmit.
505    /// * `record_count` - Logical records contained in the batch.
506    /// * `auth_config` - Optional per-request authentication context.
507    ///
508    /// # Returns
509    /// * `Result<u64>` - The acked batch identifier, ensuring sequencing with
510    ///   downstream consumers.
511    pub async fn send_ingest_to_topic_sync(
512        &mut self,
513        topic_id: u32,
514        payload: Bytes,
515        record_count: u32,
516        auth_config: Option<&AuthConfig>,
517    ) -> Result<u64> {
518        let batch_id = self
519            .send_ingest_to_topic(topic_id, payload, record_count, auth_config)
520            .await?;
521        self.wait_for_ack(batch_id).await
522    }
523
524    /// Waits for a specific acknowledgment frame, enforcing Architecture §22's
525    /// deferred flush contract between ingestion and persistence stages.
526    ///
527    /// # Arguments
528    /// * `expected_batch_id` - Identifier produced by the paired send path.
529    ///
530    /// # Returns
531    /// * `Result<u64>` - The acked batch identifier (matching expectation).
532    ///
533    /// # Errors
534    /// Surfaces protocol mismatches, server backpressure, or error frames so
535    /// callers can react immediately.
536    async fn wait_for_ack(&mut self, expected_batch_id: u64) -> Result<u64> {
537        // Defensive drain: if a stale ack (batch_id < expected) is in the read
538        // buffer -- e.g. from a prior forwarding-path mismatch -- skip it and
539        // keep reading until we see the expected batch_id or a non-ack frame.
540        // This prevents a single stale ack from cascading into every subsequent
541        // send on this connection.
542        loop {
543            let frame = self.recv_frame().await?;
544
545            match frame.frame_type {
546                FrameType::Ack => {
547                    let acked_id = frame.batch_id();
548                    if acked_id == expected_batch_id {
549                        trace!(batch_id = acked_id, "Received ack");
550                        return Ok(acked_id);
551                    }
552                    if acked_id < expected_batch_id {
553                        warn!(
554                            expected = expected_batch_id,
555                            received = acked_id,
556                            "Draining stale ack with lower batch_id"
557                        );
558                        continue;
559                    }
560                    // acked_id > expected: this is a protocol violation, not a stale ack.
561                    return Err(ClientError::InvalidResponse(format!(
562                        "Ack batch_id mismatch: sent {}, received {} (ahead)",
563                        expected_batch_id, acked_id
564                    )));
565                },
566                FrameType::Control(ControlCommand::ErrorResponse) => {
567                    let error_msg = frame
568                        .payload
569                        .map(|p| String::from_utf8_lossy(&p).to_string())
570                        .unwrap_or_else(|| "Unknown error".to_string());
571                    return Err(ClientError::ServerError(error_msg));
572                },
573                FrameType::Backpressure => {
574                    warn!("Server signaled backpressure");
575                    return Err(ClientError::ServerBackpressure);
576                },
577                other => {
578                    return Err(ClientError::InvalidResponse(format!(
579                        "Expected Ack, got {:?}",
580                        other
581                    )));
582                },
583            }
584        }
585    }
586
587    /// Receives the next acknowledgment frame and translates server feedback
588    /// (ack, backpressure, or error) into structured [`ClientError`] variants.
589    ///
590    /// # Returns
591    /// * `Result<u64>` - Acked batch identifier if the server confirmed success.
592    ///
593    /// # Errors
594    /// Surfaces [`ClientError::ServerBackpressure`] or
595    /// [`ClientError::InvalidResponse`] when the frame type deviates from the
596    /// Architecture §22 control flow expectations.
597    pub async fn recv_ack(&mut self) -> Result<u64> {
598        let frame = self.recv_frame().await?;
599
600        match frame.frame_type {
601            FrameType::Ack => {
602                trace!(batch_id = frame.batch_id(), "Received ack");
603                Ok(frame.batch_id())
604            },
605            FrameType::Backpressure => {
606                warn!("Server signaled backpressure");
607                Err(ClientError::ServerBackpressure)
608            },
609            other => Err(ClientError::InvalidResponse(format!(
610                "Expected Ack, got {:?}",
611                other
612            ))),
613        }
614    }
615
616    /// Sends a keepalive frame so long-lived clients satisfy Architecture §9.4
617    /// drain/force-exit requirements and keep connection state fresh.
618    ///
619    /// # Returns
620    /// * `Result<()>` - Ok when the frame is flushed before the configured
621    ///   write timeout expires.
622    pub async fn send_keepalive(&mut self) -> Result<()> {
623        let frame = Frame::new_keepalive();
624        let frame_bytes = encode_frame(&frame);
625
626        trace!("Sending keepalive");
627
628        tokio::time::timeout(
629            self.config.write_timeout,
630            self.stream.write_all(&frame_bytes),
631        )
632        .await
633        .map_err(|_| ClientError::Timeout)??;
634
635        Ok(())
636    }
637
638    /// Waits for a keepalive response, guaranteeing the control-plane path is
639    /// still healthy per Architecture §9.4 monitoring requirements.
640    ///
641    /// # Returns
642    /// * `Result<()>` - Ok when the server replies with `FrameType::Keepalive`.
643    ///
644    /// # Errors
645    /// Returns [`ClientError::InvalidResponse`] when any other frame type
646    /// arrives, signaling connection drift.
647    pub async fn recv_keepalive(&mut self) -> Result<()> {
648        let frame = self.recv_frame().await?;
649
650        match frame.frame_type {
651            FrameType::Keepalive => {
652                trace!("Received keepalive response");
653                Ok(())
654            },
655            other => Err(ClientError::InvalidResponse(format!(
656                "Expected Keepalive, got {:?}",
657                other
658            ))),
659        }
660    }
661
    /// Ping the server and measure round-trip latency
    ///
    /// Sends a keepalive frame and blocks until the matching reply arrives,
    /// returning elapsed wall-clock time (includes any queued-frame handling
    /// inside `recv_keepalive`).
    pub async fn ping(&mut self) -> Result<Duration> {
        let start = std::time::Instant::now();
        self.send_keepalive().await?;
        self.recv_keepalive().await?;
        Ok(start.elapsed())
    }
669
    /// Create a new topic with the given name
    ///
    /// Delegates to [`Self::ensure_topic`] with a standard retry profile
    /// (20 attempts, 500 ms base backoff), so an already-existing topic with
    /// this name is resolved via the list-topics fallback rather than failing.
    pub async fn create_topic(&mut self, name: &str) -> Result<TopicInfo> {
        const DEFAULT_CREATE_TOPIC_ATTEMPTS: usize = 20;
        const DEFAULT_CREATE_TOPIC_BACKOFF_MS: u64 = 500;
        self.ensure_topic(
            name,
            DEFAULT_CREATE_TOPIC_ATTEMPTS,
            DEFAULT_CREATE_TOPIC_BACKOFF_MS,
        )
        .await
    }
681
682    async fn create_topic_once(&mut self, name: &str) -> Result<TopicInfo> {
683        let frame = Frame::new_create_topic(name);
684        let frame_bytes = encode_frame(&frame);
685
686        trace!(topic_name = %name, "Creating topic");
687
688        tokio::time::timeout(
689            self.config.write_timeout,
690            self.stream.write_all(&frame_bytes),
691        )
692        .await
693        .map_err(|_| ClientError::Timeout)??;
694
695        let response = self.recv_frame().await?;
696        self.parse_topic_response(response)
697    }
698
    /// Ensure a topic exists and return its metadata.
    ///
    /// This helper encapsulates common create/list convergence retry behavior so
    /// application callers (bench/chaos) can stay simple.
    ///
    /// Each attempt tries `create_topic_once`, then falls back to scanning
    /// `list_topics` for the name; between attempts it sleeps (backoff scaled
    /// by attempt number only for retryable errors) and reconnects to chase
    /// the current cluster leader.
    ///
    /// # Arguments
    /// * `name` - Topic name to create or find.
    /// * `max_attempts` - Upper bound on attempts (clamped to at least 1).
    /// * `base_backoff_ms` - Base sleep between attempts, in milliseconds.
    ///
    /// # Returns
    /// * `Result<TopicInfo>` - Metadata of the created or pre-existing topic.
    ///
    /// # Errors
    /// * [`ClientError::ServerError`] summarizing the last failure (or
    ///   "not found") once all attempts are exhausted.
    pub async fn ensure_topic(
        &mut self,
        name: &str,
        max_attempts: usize,
        base_backoff_ms: u64,
    ) -> Result<TopicInfo> {
        // Guard against a zero attempt count from callers.
        let attempts = max_attempts.max(1);
        let mut last_error: Option<ClientError> = None;
        let mut saw_retryable_error = false;

        for attempt in 1..=attempts {
            let mut retryable_this_attempt = false;

            // Primary path: ask the server to create the topic.
            match self.create_topic_once(name).await {
                Ok(info) => {
                    trace!(
                        topic_id = info.id,
                        topic_name = %info.name,
                        attempt,
                        max_attempts = attempts,
                        "Topic ensured via create_topic"
                    );
                    return Ok(info);
                },
                Err(create_err) => {
                    if create_err.is_retryable() {
                        retryable_this_attempt = true;
                        saw_retryable_error = true;
                    }
                    last_error = Some(ClientError::ServerError(create_err.to_string()));
                    warn!(
                        topic_name = %name,
                        attempt,
                        max_attempts = attempts,
                        error = %create_err,
                        "create_topic failed during ensure_topic; retrying with list fallback"
                    );
                },
            }

            // Fallback path: the topic may already exist (e.g. created by a
            // concurrent client or a prior partially-acked attempt).
            match self.list_topics().await {
                Ok(topics) => {
                    if let Some(topic) = topics.into_iter().find(|t| t.name == name) {
                        trace!(
                            topic_id = topic.id,
                            topic_name = %topic.name,
                            attempt,
                            max_attempts = attempts,
                            "Topic ensured via list_topics fallback"
                        );
                        return Ok(topic);
                    }
                },
                Err(list_err) => {
                    if list_err.is_retryable() {
                        retryable_this_attempt = true;
                        saw_retryable_error = true;
                    }
                    last_error = Some(ClientError::ServerError(list_err.to_string()));
                    warn!(
                        topic_name = %name,
                        attempt,
                        max_attempts = attempts,
                        error = %list_err,
                        "list_topics failed during ensure_topic"
                    );
                },
            }

            if attempt < attempts {
                // Linear backoff (base * attempt) for retryable failures;
                // flat minimal backoff otherwise.
                let backoff_ms = if retryable_this_attempt {
                    base_backoff_ms.saturating_mul(attempt as u64).max(1)
                } else {
                    // Non-retryable errors are unlikely to heal with long sleeps.
                    base_backoff_ms.max(1)
                };
                tokio::time::sleep(Duration::from_millis(backoff_ms)).await;

                // Refresh connection to improve odds of landing on current leader
                // after elections and readiness transitions.
                let reconnect_config = self.config.clone();
                match Self::connect(reconnect_config).await {
                    Ok(new_client) => {
                        // Replace self in place; the old stream is dropped.
                        *self = new_client;
                    },
                    Err(reconnect_err) => {
                        warn!(
                            topic_name = %name,
                            attempt,
                            max_attempts = attempts,
                            error = %reconnect_err,
                            "ensure_topic reconnect attempt failed"
                        );
                        last_error = Some(reconnect_err);
                    },
                }
            }
        }

        // Exhausted: prefer reporting the most recent concrete error.
        if let Some(err) = last_error {
            return Err(ClientError::ServerError(format!(
                "ensure_topic('{}') failed after {} attempts: {}",
                name, attempts, err
            )));
        }

        if saw_retryable_error {
            return Err(ClientError::ServerError(format!(
                "ensure_topic('{}') exhausted {} retryable attempts",
                name, attempts
            )));
        }

        Err(ClientError::ServerError(format!(
            "topic '{}' not found after {} ensure_topic attempts",
            name, attempts
        )))
    }
821
822    /// Ensure topic with standard retry profile suitable for benchmark/chaos tools.
823    pub async fn ensure_topic_default(&mut self, name: &str) -> Result<TopicInfo> {
824        const DEFAULT_ENSURE_TOPIC_ATTEMPTS: usize = 20;
825        const DEFAULT_ENSURE_TOPIC_BACKOFF_MS: u64 = 500;
826        self.ensure_topic(
827            name,
828            DEFAULT_ENSURE_TOPIC_ATTEMPTS,
829            DEFAULT_ENSURE_TOPIC_BACKOFF_MS,
830        )
831        .await
832    }
833
834    /// List all topics on the server
835    pub async fn list_topics(&mut self) -> Result<Vec<TopicInfo>> {
836        let frame = Frame::new_list_topics();
837        let frame_bytes = encode_frame(&frame);
838
839        trace!("Listing topics");
840
841        tokio::time::timeout(
842            self.config.write_timeout,
843            self.stream.write_all(&frame_bytes),
844        )
845        .await
846        .map_err(|_| ClientError::Timeout)??;
847
848        let response = self.recv_frame().await?;
849        self.parse_topic_list_response(response)
850    }
851
852    /// Get information about a specific topic
853    pub async fn get_topic(&mut self, topic_id: u32) -> Result<TopicInfo> {
854        let frame = Frame::new_get_topic(topic_id);
855        let frame_bytes = encode_frame(&frame);
856
857        trace!(topic_id, "Getting topic");
858
859        tokio::time::timeout(
860            self.config.write_timeout,
861            self.stream.write_all(&frame_bytes),
862        )
863        .await
864        .map_err(|_| ClientError::Timeout)??;
865
866        let response = self.recv_frame().await?;
867        self.parse_topic_response(response)
868    }
869
870    /// Delete a topic by its ID
871    pub async fn delete_topic(&mut self, topic_id: u32) -> Result<()> {
872        let frame = Frame::new_delete_topic(topic_id);
873        let frame_bytes = encode_frame(&frame);
874
875        trace!(topic_id, "Deleting topic");
876
877        tokio::time::timeout(
878            self.config.write_timeout,
879            self.stream.write_all(&frame_bytes),
880        )
881        .await
882        .map_err(|_| ClientError::Timeout)??;
883
884        let response = self.recv_frame().await?;
885        self.parse_delete_response(response)
886    }
887
888    /// Sets the retention policy for an existing topic, mirroring the
889    /// configuration model documented in Architecture §12.7.
890    ///
891    /// # Arguments
892    /// * `topic_id` - Topic identifier
893    /// * `max_age_secs` - Maximum age in seconds (0 = no limit)
894    /// * `max_bytes` - Maximum size in bytes (0 = no limit)
895    pub async fn set_retention(
896        &mut self,
897        topic_id: u32,
898        max_age_secs: u64,
899        max_bytes: u64,
900    ) -> Result<()> {
901        let frame = Frame::new_set_retention(topic_id, max_age_secs, max_bytes);
902        let frame_bytes = encode_frame(&frame);
903
904        trace!(
905            topic_id,
906            max_age_secs, max_bytes, "Setting retention policy"
907        );
908
909        tokio::time::timeout(
910            self.config.write_timeout,
911            self.stream.write_all(&frame_bytes),
912        )
913        .await
914        .map_err(|_| ClientError::Timeout)??;
915
916        let response = self.recv_frame().await?;
917        self.parse_retention_response(response)
918    }
919
920    /// Create a topic with retention policy in a single operation
921    ///
922    /// # Arguments
923    /// * `name` - Topic name
924    /// * `max_age_secs` - Maximum age in seconds (0 = no limit)
925    /// * `max_bytes` - Maximum size in bytes (0 = no limit)
926    pub async fn create_topic_with_retention(
927        &mut self,
928        name: &str,
929        max_age_secs: u64,
930        max_bytes: u64,
931    ) -> Result<TopicInfo> {
932        let frame = Frame::new_create_topic_with_retention(name, max_age_secs, max_bytes);
933        let frame_bytes = encode_frame(&frame);
934
935        trace!(
936            name,
937            max_age_secs, max_bytes, "Creating topic with retention"
938        );
939
940        tokio::time::timeout(
941            self.config.write_timeout,
942            self.stream.write_all(&frame_bytes),
943        )
944        .await
945        .map_err(|_| ClientError::Timeout)??;
946
947        let response = self.recv_frame().await?;
948        self.parse_topic_response(response)
949    }
950
951    /// Get cluster status and health information
952    pub async fn get_cluster_status(&mut self) -> Result<ClusterStatus> {
953        let frame = Frame::new_get_cluster_status();
954        let frame_bytes = encode_frame(&frame);
955
956        tokio::time::timeout(
957            self.config.write_timeout,
958            self.stream.write_all(&frame_bytes),
959        )
960        .await
961        .map_err(|_| ClientError::Timeout)??;
962
963        let response = self.recv_frame().await?;
964        self.parse_cluster_status_response(response)
965    }
966
967    fn parse_cluster_status_response(&self, frame: Frame) -> Result<ClusterStatus> {
968        match frame.frame_type {
969            FrameType::Control(ControlCommand::ClusterStatusResponse) => {
970                let payload = frame.payload.ok_or_else(|| {
971                    ClientError::InvalidResponse("Empty cluster status response".to_string())
972                })?;
973                let json: serde_json::Value = serde_json::from_slice(&payload)
974                    .map_err(|e| ClientError::ProtocolError(format!("Invalid JSON: {}", e)))?;
975
976                let peer_states: std::collections::HashMap<u16, String> = json["peer_states"]
977                    .as_object()
978                    .map(|obj| {
979                        obj.iter()
980                            .filter_map(|(k, v)| {
981                                k.parse::<u16>()
982                                    .ok()
983                                    .map(|id| (id, v.as_str().unwrap_or("unknown").to_string()))
984                            })
985                            .collect()
986                    })
987                    .unwrap_or_default();
988
989                Ok(ClusterStatus {
990                    node_id: json["node_id"].as_u64().unwrap_or(0) as u16,
991                    is_leader: json["is_leader"].as_bool().unwrap_or(false),
992                    leader_id: json["leader_id"].as_u64().map(|id| id as u16),
993                    current_term: json["current_term"].as_u64().unwrap_or(0),
994                    node_count: json["node_count"].as_u64().unwrap_or(1) as usize,
995                    healthy_nodes: json["healthy_nodes"].as_u64().unwrap_or(1) as usize,
996                    quorum_available: json["quorum_available"].as_bool().unwrap_or(true),
997                    peer_states,
998                })
999            },
1000            FrameType::Control(ControlCommand::ErrorResponse) => {
1001                let error_msg = frame
1002                    .payload
1003                    .map(|p| String::from_utf8_lossy(&p).to_string())
1004                    .unwrap_or_else(|| "Unknown error".to_string());
1005                Err(ClientError::ServerError(error_msg))
1006            },
1007            other => Err(ClientError::InvalidResponse(format!(
1008                "Expected ClusterStatusResponse, got {:?}",
1009                other
1010            ))),
1011        }
1012    }
1013
1014    /// Fetch data from a topic starting at the given offset
1015    /// Returns (data, next_offset, record_count)
1016    pub async fn fetch(
1017        &mut self,
1018        topic_id: u32,
1019        start_offset: u64,
1020        max_bytes: u32,
1021    ) -> Result<FetchResult> {
1022        let frame = Frame::new_fetch(topic_id, start_offset, max_bytes);
1023        let frame_bytes = encode_frame(&frame);
1024
1025        trace!(topic_id, start_offset, max_bytes, "Fetching data");
1026
1027        tokio::time::timeout(
1028            self.config.write_timeout,
1029            self.stream.write_all(&frame_bytes),
1030        )
1031        .await
1032        .map_err(|_| ClientError::Timeout)??;
1033
1034        let response = self.recv_frame().await?;
1035        self.parse_fetch_response(response)
1036    }
1037
1038    /// Subscribes to a topic for streaming consumption, honoring the consumer
1039    /// coordination model described in Architecture §20.
1040    ///
1041    /// # Arguments
1042    /// * `topic_id` - Topic to follow.
1043    /// * `start_offset` - First logical offset to deliver.
1044    /// * `max_batch_bytes` - Maximum payload size per delivery window.
1045    /// * `consumer_id` - Stable consumer identifier for server-side tracking.
1046    ///
1047    /// # Returns
1048    /// * `Result<SubscribeResult>` - Contains the confirmed consumer_id and
1049    ///   start offset granted by the server.
1050    ///
1051    /// # Errors
1052    /// Surfaces timeouts, protocol violations, or server error frames (e.g.,
1053    /// `ControlCommand::ErrorResponse`).
1054    pub async fn subscribe(
1055        &mut self,
1056        topic_id: u32,
1057        start_offset: u64,
1058        max_batch_bytes: u32,
1059        consumer_id: u64,
1060    ) -> Result<SubscribeResult> {
1061        let frame = Frame::new_subscribe(topic_id, start_offset, max_batch_bytes, consumer_id);
1062        let frame_bytes = encode_frame(&frame);
1063
1064        trace!(topic_id, start_offset, consumer_id, "Subscribing to topic");
1065
1066        tokio::time::timeout(
1067            self.config.write_timeout,
1068            self.stream.write_all(&frame_bytes),
1069        )
1070        .await
1071        .map_err(|_| ClientError::Timeout)??;
1072
1073        let response = self.recv_frame().await?;
1074        self.parse_subscribe_response(response)
1075    }
1076
1077    /// Unsubscribes a consumer from a topic, ensuring server resources are
1078    /// reclaimed per Architecture §20's consumer lifecycle.
1079    ///
1080    /// # Arguments
1081    /// * `topic_id` - Topic to leave.
1082    /// * `consumer_id` - Consumer identifier provided during subscribe.
1083    ///
1084    /// # Returns
1085    /// * `Result<()>` - Ok when the server acknowledges the unsubscribe.
1086    ///
1087    /// # Errors
1088    /// Propagates [`ClientError::ServerError`] if the server rejects the
1089    /// request or [`ClientError::InvalidResponse`] if a non-ack frame arrives.
1090    pub async fn unsubscribe(&mut self, topic_id: u32, consumer_id: u64) -> Result<()> {
1091        let frame = Frame::new_unsubscribe(topic_id, consumer_id);
1092        let frame_bytes = encode_frame(&frame);
1093
1094        trace!(topic_id, consumer_id, "Unsubscribing from topic");
1095
1096        tokio::time::timeout(
1097            self.config.write_timeout,
1098            self.stream.write_all(&frame_bytes),
1099        )
1100        .await
1101        .map_err(|_| ClientError::Timeout)??;
1102
1103        // Wait for ack or error
1104        let response = self.recv_frame().await?;
1105        match response.frame_type {
1106            FrameType::Ack => Ok(()),
1107            FrameType::Control(ControlCommand::ErrorResponse) => {
1108                let error_msg = response
1109                    .payload
1110                    .map(|p| String::from_utf8_lossy(&p).to_string())
1111                    .unwrap_or_else(|| "Unknown error".to_string());
1112                Err(ClientError::ServerError(error_msg))
1113            },
1114            other => Err(ClientError::InvalidResponse(format!(
1115                "Expected Ack, got {:?}",
1116                other
1117            ))),
1118        }
1119    }
1120
1121    /// Commit consumer offset for checkpointing
1122    pub async fn commit_offset(
1123        &mut self,
1124        topic_id: u32,
1125        consumer_id: u64,
1126        offset: u64,
1127    ) -> Result<CommitResult> {
1128        let frame = Frame::new_commit_offset(topic_id, consumer_id, offset);
1129        let frame_bytes = encode_frame(&frame);
1130
1131        trace!(topic_id, consumer_id, offset, "Committing offset");
1132
1133        tokio::time::timeout(
1134            self.config.write_timeout,
1135            self.stream.write_all(&frame_bytes),
1136        )
1137        .await
1138        .map_err(|_| ClientError::Timeout)??;
1139
1140        let response = self.recv_frame().await?;
1141        self.parse_commit_response(response)
1142    }
1143
1144    fn parse_subscribe_response(&self, frame: Frame) -> Result<SubscribeResult> {
1145        match frame.frame_type {
1146            FrameType::Control(ControlCommand::SubscribeAck) => {
1147                let payload = frame.payload.ok_or_else(|| {
1148                    ClientError::InvalidResponse("Empty subscribe response".to_string())
1149                })?;
1150
1151                if payload.len() < 16 {
1152                    return Err(ClientError::ProtocolError(
1153                        "Subscribe response too small".to_string(),
1154                    ));
1155                }
1156
1157                let consumer_id = u64::from_le_bytes([
1158                    payload[0], payload[1], payload[2], payload[3], payload[4], payload[5],
1159                    payload[6], payload[7],
1160                ]);
1161                let start_offset = u64::from_le_bytes([
1162                    payload[8],
1163                    payload[9],
1164                    payload[10],
1165                    payload[11],
1166                    payload[12],
1167                    payload[13],
1168                    payload[14],
1169                    payload[15],
1170                ]);
1171
1172                Ok(SubscribeResult {
1173                    consumer_id,
1174                    start_offset,
1175                })
1176            },
1177            FrameType::Control(ControlCommand::ErrorResponse) => {
1178                let error_msg = frame
1179                    .payload
1180                    .map(|p| String::from_utf8_lossy(&p).to_string())
1181                    .unwrap_or_else(|| "Unknown error".to_string());
1182                Err(ClientError::ServerError(error_msg))
1183            },
1184            other => Err(ClientError::InvalidResponse(format!(
1185                "Expected SubscribeAck, got {:?}",
1186                other
1187            ))),
1188        }
1189    }
1190
1191    fn parse_commit_response(&self, frame: Frame) -> Result<CommitResult> {
1192        match frame.frame_type {
1193            FrameType::Control(ControlCommand::CommitAck) => {
1194                let payload = frame.payload.ok_or_else(|| {
1195                    ClientError::InvalidResponse("Empty commit response".to_string())
1196                })?;
1197
1198                if payload.len() < 16 {
1199                    return Err(ClientError::ProtocolError(
1200                        "Commit response too small".to_string(),
1201                    ));
1202                }
1203
1204                let consumer_id = u64::from_le_bytes([
1205                    payload[0], payload[1], payload[2], payload[3], payload[4], payload[5],
1206                    payload[6], payload[7],
1207                ]);
1208                let committed_offset = u64::from_le_bytes([
1209                    payload[8],
1210                    payload[9],
1211                    payload[10],
1212                    payload[11],
1213                    payload[12],
1214                    payload[13],
1215                    payload[14],
1216                    payload[15],
1217                ]);
1218
1219                Ok(CommitResult {
1220                    consumer_id,
1221                    committed_offset,
1222                })
1223            },
1224            FrameType::Control(ControlCommand::ErrorResponse) => {
1225                let error_msg = frame
1226                    .payload
1227                    .map(|p| String::from_utf8_lossy(&p).to_string())
1228                    .unwrap_or_else(|| "Unknown error".to_string());
1229                Err(ClientError::ServerError(error_msg))
1230            },
1231            other => Err(ClientError::InvalidResponse(format!(
1232                "Expected CommitAck, got {:?}",
1233                other
1234            ))),
1235        }
1236    }
1237
1238    fn parse_fetch_response(&self, frame: Frame) -> Result<FetchResult> {
1239        match frame.frame_type {
1240            FrameType::Control(ControlCommand::CatchingUp) => {
1241                let server_offset = frame
1242                    .payload
1243                    .as_ref()
1244                    .filter(|p| p.len() >= 8)
1245                    .map(|p| u64::from_le_bytes([p[0], p[1], p[2], p[3], p[4], p[5], p[6], p[7]]))
1246                    .unwrap_or(0);
1247                Err(ClientError::ServerCatchingUp { server_offset })
1248            },
1249            FrameType::Control(ControlCommand::FetchResponse) => {
1250                let payload = frame.payload.ok_or_else(|| {
1251                    ClientError::InvalidResponse("Empty fetch response".to_string())
1252                })?;
1253
1254                if payload.len() < 16 {
1255                    return Err(ClientError::ProtocolError(
1256                        "Fetch response too small".to_string(),
1257                    ));
1258                }
1259
1260                let next_offset = u64::from_le_bytes([
1261                    payload[0], payload[1], payload[2], payload[3], payload[4], payload[5],
1262                    payload[6], payload[7],
1263                ]);
1264                let bytes_returned =
1265                    u32::from_le_bytes([payload[8], payload[9], payload[10], payload[11]]);
1266                let record_count =
1267                    u32::from_le_bytes([payload[12], payload[13], payload[14], payload[15]]);
1268                let data = payload.slice(16..);
1269
1270                Ok(FetchResult {
1271                    data,
1272                    next_offset,
1273                    bytes_returned,
1274                    record_count,
1275                })
1276            },
1277            FrameType::Control(ControlCommand::ErrorResponse) => {
1278                let error_msg = frame
1279                    .payload
1280                    .map(|p| String::from_utf8_lossy(&p).to_string())
1281                    .unwrap_or_else(|| "Unknown error".to_string());
1282                Err(ClientError::ServerError(error_msg))
1283            },
1284            other => Err(ClientError::InvalidResponse(format!(
1285                "Expected FetchResponse, got {:?}",
1286                other
1287            ))),
1288        }
1289    }
1290
    /// Maps the reply to a delete_topic request onto `Ok(())` or an error via
    /// the shared success-frame check.
    fn parse_delete_response(&self, frame: Frame) -> Result<()> {
        expect_success_response(frame)
    }
1294
    /// Maps the reply to a set_retention request onto `Ok(())` or an error via
    /// the shared success-frame check.
    fn parse_retention_response(&self, frame: Frame) -> Result<()> {
        expect_success_response(frame)
    }
1298
1299    fn parse_topic_response(&self, frame: Frame) -> Result<TopicInfo> {
1300        match frame.frame_type {
1301            FrameType::Control(ControlCommand::TopicResponse) => {
1302                let payload = frame.payload.ok_or_else(|| {
1303                    ClientError::InvalidResponse("Empty topic response".to_string())
1304                })?;
1305                let json: serde_json::Value = serde_json::from_slice(&payload)
1306                    .map_err(|e| ClientError::ProtocolError(format!("Invalid JSON: {}", e)))?;
1307
1308                let retention = if json.get("retention").is_some() {
1309                    Some(RetentionInfo {
1310                        max_age_secs: json["retention"]["max_age_secs"].as_u64().unwrap_or(0),
1311                        max_bytes: json["retention"]["max_bytes"].as_u64().unwrap_or(0),
1312                    })
1313                } else {
1314                    None
1315                };
1316
1317                Ok(TopicInfo {
1318                    id: json["id"].as_u64().unwrap_or(0) as u32,
1319                    name: json["name"].as_str().unwrap_or("").to_string(),
1320                    created_at: json["created_at"].as_u64().unwrap_or(0),
1321                    topic_epoch: json["topic_epoch"].as_u64().unwrap_or(1),
1322                    retention,
1323                })
1324            },
1325            FrameType::Control(ControlCommand::ErrorResponse) => {
1326                let error_msg = frame
1327                    .payload
1328                    .map(|p| String::from_utf8_lossy(&p).to_string())
1329                    .unwrap_or_else(|| "Unknown error".to_string());
1330                Err(ClientError::ServerError(error_msg))
1331            },
1332            other => Err(ClientError::InvalidResponse(format!(
1333                "Expected TopicResponse, got {:?}",
1334                other
1335            ))),
1336        }
1337    }
1338
1339    fn parse_topic_list_response(&self, frame: Frame) -> Result<Vec<TopicInfo>> {
1340        match frame.frame_type {
1341            FrameType::Control(ControlCommand::TopicResponse) => {
1342                let payload = frame.payload.ok_or_else(|| {
1343                    ClientError::InvalidResponse("Empty topic list response".to_string())
1344                })?;
1345                let json: serde_json::Value = serde_json::from_slice(&payload)
1346                    .map_err(|e| ClientError::ProtocolError(format!("Invalid JSON: {}", e)))?;
1347
1348                let topics = json["topics"]
1349                    .as_array()
1350                    .map(|arr| {
1351                        arr.iter()
1352                            .map(|t| {
1353                                let retention = if t.get("retention").is_some() {
1354                                    Some(RetentionInfo {
1355                                        max_age_secs: t["retention"]["max_age_secs"]
1356                                            .as_u64()
1357                                            .unwrap_or(0),
1358                                        max_bytes: t["retention"]["max_bytes"]
1359                                            .as_u64()
1360                                            .unwrap_or(0),
1361                                    })
1362                                } else {
1363                                    None
1364                                };
1365                                TopicInfo {
1366                                    id: t["id"].as_u64().unwrap_or(0) as u32,
1367                                    name: t["name"].as_str().unwrap_or("").to_string(),
1368                                    created_at: t["created_at"].as_u64().unwrap_or(0),
1369                                    topic_epoch: t["topic_epoch"].as_u64().unwrap_or(1),
1370                                    retention,
1371                                }
1372                            })
1373                            .collect()
1374                    })
1375                    .unwrap_or_default();
1376
1377                Ok(topics)
1378            },
1379            FrameType::Control(ControlCommand::ErrorResponse) => {
1380                let error_msg = frame
1381                    .payload
1382                    .map(|p| String::from_utf8_lossy(&p).to_string())
1383                    .unwrap_or_else(|| "Unknown error".to_string());
1384                Err(ClientError::ServerError(error_msg))
1385            },
1386            other => Err(ClientError::InvalidResponse(format!(
1387                "Expected TopicResponse, got {:?}",
1388                other
1389            ))),
1390        }
1391    }
1392
    /// Reads the next wire frame into the preallocated buffer, preserving the
    /// Architecture §15 zero-copy guarantees while translating parse failures
    /// into structured [`ClientError`] values.
    ///
    /// # Returns
    /// * `Result<Frame>` - Fully parsed LWP frame ready for higher-level
    ///   ingestion/consumer handlers.
    ///
    /// # Errors
    /// Propagates timeouts, malformed headers, or connection closures so
    /// callers can fail fast when the transport becomes unhealthy.
    async fn recv_frame(&mut self) -> Result<Frame> {
        // Max frame size cap to prevent OOM from malformed headers (16 MB)
        const MAX_FRAME_SIZE: usize = 16 * 1024 * 1024;

        loop {
            if self.read_offset >= LWP_HEADER_SIZE {
                // Grow buffer if the header indicates a payload larger than current capacity
                // NOTE(review): payload length is read from header bytes 32..36;
                // the `>= LWP_HEADER_SIZE` guard is assumed to cover those
                // indices (i.e. LWP_HEADER_SIZE >= 36) — confirm against the
                // lnc_network frame layout.
                let payload_len = u32::from_le_bytes([
                    self.read_buffer[32],
                    self.read_buffer[33],
                    self.read_buffer[34],
                    self.read_buffer[35],
                ]) as usize;
                let total_frame_size = LWP_HEADER_SIZE + payload_len;
                if total_frame_size > MAX_FRAME_SIZE {
                    return Err(ClientError::ServerError(format!(
                        "Frame too large: {} bytes",
                        total_frame_size
                    )));
                }
                // Growing ensures the read below always has free space for the
                // remainder of this frame (a full buffer would yield a
                // zero-length read that looks like EOF).
                if total_frame_size > self.read_buffer.len() {
                    self.read_buffer.resize(total_frame_size, 0);
                }

                if let Some((frame, consumed)) = parse_frame(&self.read_buffer[..self.read_offset])?
                {
                    // Compact: move any bytes of a following frame to the front
                    // so the next call starts from a clean buffer prefix.
                    self.read_buffer.copy_within(consumed..self.read_offset, 0);
                    self.read_offset -= consumed;
                    // Shrink buffer back to default if it was grown for a large frame
                    if self.read_buffer.len() > 64 * 1024 && self.read_offset < 64 * 1024 {
                        self.read_buffer.resize(64 * 1024, 0);
                    }
                    return Ok(frame);
                }
            }

            // Not enough buffered bytes for a complete frame yet — pull more
            // from the transport, bounded by the configured read timeout.
            let n = tokio::time::timeout(
                self.config.read_timeout,
                self.stream.read(&mut self.read_buffer[self.read_offset..]),
            )
            .await
            .map_err(|_| ClientError::Timeout)??;

            // A zero-byte read on a non-empty slice means the peer closed.
            if n == 0 {
                return Err(ClientError::ConnectionClosed);
            }

            self.read_offset += n;
        }
    }
1454
    /// Get a reference to the client configuration
    ///
    /// Exposes the [`ClientConfig`] this client was built with (address and
    /// read/write timeouts are used elsewhere in this impl); read-only.
    pub fn config(&self) -> &ClientConfig {
        &self.config
    }
1459
1460    /// Close the client connection
1461    pub async fn close(mut self) -> Result<()> {
1462        self.stream.shutdown().await?;
1463        Ok(())
1464    }
1465}
1466
1467impl std::fmt::Debug for LanceClient {
1468    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1469        f.debug_struct("LanceClient")
1470            .field("addr", &self.config.addr)
1471            .field("batch_id", &self.batch_id.load(Ordering::SeqCst))
1472            .finish()
1473    }
1474}