//! LANCE protocol client connection and session handling (`lnc_client/client.rs`).
1use crate::error::{ClientError, Result};
2use crate::tls::TlsClientConfig;
3use bytes::Bytes;
4use lnc_network::{
5    ControlCommand, Frame, FrameType, LWP_HEADER_SIZE, TlsConnector, encode_frame, parse_frame,
6};
7use std::net::SocketAddr;
8use std::pin::Pin;
9use std::sync::atomic::{AtomicU64, Ordering};
10use std::task::{Context, Poll};
11use std::time::Duration;
12use tokio::io::{AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt, ReadBuf};
13use tokio::net::TcpStream;
14use tokio::net::lookup_host;
15use tracing::{debug, trace, warn};
16
/// Wrapper enum for TCP and TLS streams to avoid dynamic dispatch
///
/// Holding both transports in one enum keeps reads/writes statically
/// dispatched; the TLS variant is much larger than the bare TCP one, hence
/// the `large_enum_variant` allow.
#[allow(clippy::large_enum_variant)]
pub enum ClientStream {
    /// Plain TCP connection
    Tcp(TcpStream),
    /// TLS-encrypted connection
    Tls(tokio_rustls::client::TlsStream<TcpStream>),
}
25
26impl AsyncRead for ClientStream {
27    /// Delegates read readiness directly to the concrete transport so
28    /// buffered frames stay on the Architecture §15 zero-copy path without
29    /// layering additional indirection.
30    fn poll_read(
31        self: Pin<&mut Self>,
32        cx: &mut Context<'_>,
33        buf: &mut ReadBuf<'_>,
34    ) -> Poll<std::io::Result<()>> {
35        match self.get_mut() {
36            ClientStream::Tcp(stream) => Pin::new(stream).poll_read(cx, buf),
37            ClientStream::Tls(stream) => Pin::new(stream).poll_read(cx, buf),
38        }
39    }
40}
41
42impl AsyncWrite for ClientStream {
43    /// Delegates write readiness to the underlying transport without
44    /// introducing dynamic dispatch, which keeps buffered writes on the
45    /// Section 14 thread-pinned path compliant with Architecture §15's
46    /// loaner-buffer rules.
47    fn poll_write(
48        self: Pin<&mut Self>,
49        cx: &mut Context<'_>,
50        buf: &[u8],
51    ) -> Poll<std::io::Result<usize>> {
52        match self.get_mut() {
53            ClientStream::Tcp(stream) => Pin::new(stream).poll_write(cx, buf),
54            ClientStream::Tls(stream) => Pin::new(stream).poll_write(cx, buf),
55        }
56    }
57
58    /// Flushes bytes on whichever transport is active so that ingestion
59    /// frames honor the write-buffering guarantees described in Architecture
60    /// §22 without duplicating logic across TCP/TLS paths.
61    fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<std::io::Result<()>> {
62        match self.get_mut() {
63            ClientStream::Tcp(stream) => Pin::new(stream).poll_flush(cx),
64            ClientStream::Tls(stream) => Pin::new(stream).poll_flush(cx),
65        }
66    }
67
68    /// Propagates orderly shutdown to the concrete stream implementation,
69    /// enabling graceful disconnects per Architecture §14's pinning strategy
70    /// whether the session is plain TCP or TLS.
71    fn poll_shutdown(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<std::io::Result<()>> {
72        match self.get_mut() {
73            ClientStream::Tcp(stream) => Pin::new(stream).poll_shutdown(cx),
74            ClientStream::Tls(stream) => Pin::new(stream).poll_shutdown(cx),
75        }
76    }
77}
78
79/// Helper to extract error message from a frame payload
80fn extract_error_message(frame: &Frame) -> String {
81    frame
82        .payload
83        .as_ref()
84        .map(|p| String::from_utf8_lossy(p).to_string())
85        .unwrap_or_else(|| "Unknown error".to_string())
86}
87
88/// Helper to validate frame type and extract error if present
89#[allow(dead_code)] // Reserved for future protocol extensions
90fn expect_frame_type(frame: Frame, expected: ControlCommand, expected_name: &str) -> Result<Frame> {
91    match frame.frame_type {
92        FrameType::Control(cmd) if cmd == expected => Ok(frame),
93        FrameType::Control(ControlCommand::ErrorResponse) => {
94            Err(ClientError::ServerError(extract_error_message(&frame)))
95        },
96        other => Err(ClientError::InvalidResponse(format!(
97            "Expected {}, got {:?}",
98            expected_name, other
99        ))),
100    }
101}
102
103/// Helper to validate frame is a success response (TopicResponse or similar)
104fn expect_success_response(frame: Frame) -> Result<()> {
105    match frame.frame_type {
106        FrameType::Control(ControlCommand::TopicResponse) => Ok(()),
107        FrameType::Control(ControlCommand::ErrorResponse) => {
108            Err(ClientError::ServerError(extract_error_message(&frame)))
109        },
110        other => Err(ClientError::InvalidResponse(format!(
111            "Expected TopicResponse, got {:?}",
112            other
113        ))),
114    }
115}
116
/// Authentication configuration for client connections
///
/// The `Default` derive yields mTLS disabled with no certificate paths.
#[derive(Debug, Clone, Default)]
pub struct AuthConfig {
    /// Whether mutual TLS (mTLS) authentication is enabled
    pub mtls_enabled: bool,
    /// Path to the client certificate file for mTLS
    pub client_cert_path: Option<String>,
    /// Path to the client private key file for mTLS
    pub client_key_path: Option<String>,
}
127
/// Retention configuration for a topic
///
/// The `Default` derive yields zeros for both fields, i.e. no limits.
#[derive(Debug, Clone, Default)]
pub struct RetentionInfo {
    /// Maximum age of messages in seconds (0 = no limit)
    pub max_age_secs: u64,
    /// Maximum size of topic data in bytes (0 = no limit)
    pub max_bytes: u64,
}
136
/// Information about a topic
///
/// Returned by topic management calls such as `create_topic`, `get_topic`,
/// and `list_topics`.
#[derive(Debug, Clone)]
pub struct TopicInfo {
    /// Unique topic identifier
    pub id: u32,
    /// Topic name
    pub name: String,
    /// Unix timestamp when the topic was created
    pub created_at: u64,
    /// Topic identity epoch for stale-id detection
    pub topic_epoch: u64,
    /// Retention policy configuration (None = no retention policy set)
    pub retention: Option<RetentionInfo>,
}
151
/// Result of a fetch operation
///
/// Carries both the raw bytes and the cursor/accounting metadata a consumer
/// needs to issue the next fetch.
#[derive(Debug, Clone)]
pub struct FetchResult {
    /// Raw data fetched from the topic
    pub data: Bytes,
    /// Offset to use for the next fetch operation
    pub next_offset: u64,
    /// Number of bytes returned in this fetch
    pub bytes_returned: u32,
    /// Number of records in the fetched data
    pub record_count: u32,
}
164
/// Result of a subscribe operation
#[derive(Debug, Clone)]
pub struct SubscribeResult {
    /// Assigned consumer identifier
    pub consumer_id: u64,
    /// Starting offset for consumption
    pub start_offset: u64,
}
173
/// Result of a commit offset operation
#[derive(Debug, Clone)]
pub struct CommitResult {
    /// Consumer identifier that committed the offset
    pub consumer_id: u64,
    /// The offset that was successfully committed
    pub committed_offset: u64,
}
182
/// Cluster status information
///
/// Snapshot of cluster membership and health as reported by the server.
/// NOTE(review): field semantics are inferred from names and the server's
/// status response — confirm against the server-side encoder.
#[derive(Debug, Clone)]
pub struct ClusterStatus {
    /// Identifier of the node that answered the status request
    pub node_id: u16,
    /// Whether the responding node currently considers itself the leader
    pub is_leader: bool,
    /// Identifier of the current leader, if one is known
    pub leader_id: Option<u16>,
    /// Current election term reported by the node
    pub current_term: u64,
    /// Total number of nodes in the cluster
    pub node_count: usize,
    /// Number of nodes currently reported healthy
    pub healthy_nodes: usize,
    /// Whether a quorum of nodes is currently available
    pub quorum_available: bool,
    /// Per-peer state descriptions keyed by node id
    pub peer_states: std::collections::HashMap<u16, String>,
}
195
/// Configuration for the LANCE client
///
/// See the `Default` impl for the concrete default values
/// (localhost:1992, 10 s connect/write/keepalive, 30 s read, no TLS).
#[derive(Debug, Clone)]
pub struct ClientConfig {
    /// Server address to connect to (supports both IP:port and hostname:port)
    pub addr: String,
    /// Timeout for establishing connections
    pub connect_timeout: Duration,
    /// Timeout for read operations
    pub read_timeout: Duration,
    /// Timeout for write operations
    pub write_timeout: Duration,
    /// Interval between keepalive messages
    pub keepalive_interval: Duration,
    /// Optional TLS configuration for encrypted connections
    pub tls: Option<TlsClientConfig>,
}
212
213impl Default for ClientConfig {
214    fn default() -> Self {
215        Self {
216            addr: "127.0.0.1:1992".to_string(),
217            connect_timeout: Duration::from_secs(10),
218            read_timeout: Duration::from_secs(30),
219            write_timeout: Duration::from_secs(10),
220            keepalive_interval: Duration::from_secs(10),
221            tls: None,
222        }
223    }
224}
225
226impl ClientConfig {
227    /// Create a new client configuration with the specified server address
228    ///
229    /// The address can be either an IP:port (e.g., "127.0.0.1:1992") or
230    /// a hostname:port (e.g., "lance.example.com:1992"). DNS resolution
231    /// is performed asynchronously during connection.
232    pub fn new(addr: impl Into<String>) -> Self {
233        Self {
234            addr: addr.into(),
235            ..Default::default()
236        }
237    }
238
239    /// Enables TLS for this configuration so clients can satisfy
240    /// Architecture §14's production security guidance when traversing
241    /// untrusted networks.
242    ///
243    /// # Arguments
244    /// * `tls_config` - Certificates and trust roots passed through to
245    ///   `lnc-network`'s TLS connector.
246    ///
247    /// # Returns
248    /// * `Self` - Updated config allowing fluent builder-style chaining.
249    pub fn with_tls(mut self, tls_config: TlsClientConfig) -> Self {
250        self.tls = Some(tls_config);
251        self
252    }
253
254    /// Check if TLS is enabled
255    pub fn is_tls_enabled(&self) -> bool {
256        self.tls.is_some()
257    }
258}
259
/// LANCE protocol client for communicating with LANCE servers
///
/// Provides methods for ingesting data, managing topics, and consuming records.
pub struct LanceClient {
    /// Active transport: plain TCP or TLS.
    stream: ClientStream,
    /// Connection settings; also reused when `ensure_topic` reconnects.
    config: ClientConfig,
    /// Monotonic counter backing client-side batch identifiers (ids start at 1).
    batch_id: AtomicU64,
    /// Reusable receive buffer (64 KiB at construction).
    read_buffer: Vec<u8>,
    /// Read cursor into `read_buffer`.
    /// NOTE(review): consumed by `recv_frame`, which is defined outside this
    /// chunk — confirm exact buffering semantics there.
    read_offset: usize,
}
270
271impl LanceClient {
272    /// Resolves an address string (hostname:port or IP:port) to a `SocketAddr`
273    /// so clients can honor Architecture §10.1's Docker-first deployment model
274    /// where hostnames are common.
275    ///
276    /// # Arguments
277    /// * `addr` - Address string such as `"10.0.0.5:1992"` or
278    ///   `"broker.lance:1992"`.
279    ///
280    /// # Returns
281    /// * `Result<SocketAddr>` - Parsed IP:port pair ready for `TcpStream`.
282    ///
283    /// # Errors
284    /// * [`ClientError::ProtocolError`] when DNS resolution fails or produces
285    ///   no usable endpoints.
286    async fn resolve_address(addr: &str) -> Result<SocketAddr> {
287        // First, try parsing as a SocketAddr directly (for IP:port format)
288        if let Ok(socket_addr) = addr.parse::<SocketAddr>() {
289            return Ok(socket_addr);
290        }
291
292        // If direct parsing fails, perform DNS resolution (for hostname:port format)
293        let mut addrs = lookup_host(addr).await.map_err(|e| {
294            ClientError::ProtocolError(format!("DNS resolution failed for '{}': {}", addr, e))
295        })?;
296
297        addrs
298            .next()
299            .ok_or_else(|| ClientError::ProtocolError(format!("No addresses found for '{}'", addr)))
300    }
301
302    /// Connect to LANCE server, automatically using TLS if configured
303    ///
304    /// The address in the config can be either an IP:port or hostname:port.
305    /// DNS resolution is performed automatically for hostnames.
306    pub async fn connect(config: ClientConfig) -> Result<Self> {
307        // If TLS is configured in ClientConfig, use TLS connection
308        if let Some(ref tls_config) = config.tls {
309            return Self::connect_tls(config.clone(), tls_config.clone()).await;
310        }
311
312        debug!(addr = %config.addr, "Connecting to LANCE server");
313
314        // Resolve address (handles both IP:port and hostname:port)
315        let socket_addr = Self::resolve_address(&config.addr).await?;
316        debug!(resolved_addr = %socket_addr, "Resolved server address");
317
318        let stream = tokio::time::timeout(config.connect_timeout, TcpStream::connect(socket_addr))
319            .await
320            .map_err(|_| ClientError::Timeout)?
321            .map_err(ClientError::ConnectionFailed)?;
322
323        stream.set_nodelay(true)?;
324
325        debug!(addr = %config.addr, "Connected to LANCE server");
326
327        Ok(Self {
328            stream: ClientStream::Tcp(stream),
329            config,
330            batch_id: AtomicU64::new(0),
331            read_buffer: vec![0u8; 64 * 1024],
332            read_offset: 0,
333        })
334    }
335
336    /// Connect to LANCE server with TLS encryption
337    ///
338    /// # Arguments
339    /// * `config` - Client configuration with server address (IP:port or hostname:port)
340    /// * `tls_config` - TLS configuration including certificates
341    ///
342    /// # Example
343    /// ```rust,ignore
344    /// use lnc_client::{ClientConfig, TlsClientConfig, LanceClient};
345    ///
346    /// let config = ClientConfig::new("lance.example.com:1992");
347    /// let tls = TlsClientConfig::new()
348    ///     .with_ca_cert("/path/to/ca.pem");
349    ///
350    /// let client = LanceClient::connect_tls(config, tls).await?;
351    /// ```
352    pub async fn connect_tls(config: ClientConfig, tls_config: TlsClientConfig) -> Result<Self> {
353        debug!(addr = %config.addr, "Connecting to LANCE server with TLS");
354
355        // Resolve address (handles both IP:port and hostname:port)
356        let socket_addr = Self::resolve_address(&config.addr).await?;
357        debug!(resolved_addr = %socket_addr, "Resolved server address");
358
359        // First establish TCP connection
360        let tcp_stream =
361            tokio::time::timeout(config.connect_timeout, TcpStream::connect(socket_addr))
362                .await
363                .map_err(|_| ClientError::Timeout)?
364                .map_err(ClientError::ConnectionFailed)?;
365
366        tcp_stream.set_nodelay(true)?;
367
368        // Create TLS connector
369        let network_config = tls_config.to_network_config();
370        let connector =
371            TlsConnector::new(network_config).map_err(|e| ClientError::TlsError(e.to_string()))?;
372
373        // Determine server name for SNI - prefer configured name, then extract hostname from address
374        let server_name = tls_config.server_name.unwrap_or_else(|| {
375            // Extract hostname from address (remove port if present)
376            config
377                .addr
378                .rsplit_once(':')
379                .map(|(host, _)| host.to_string())
380                .unwrap_or_else(|| socket_addr.ip().to_string())
381        });
382
383        // Perform TLS handshake
384        let tls_stream = connector
385            .connect(&server_name, tcp_stream)
386            .await
387            .map_err(|e| ClientError::TlsError(e.to_string()))?;
388
389        debug!(addr = %config.addr, "TLS connection established");
390
391        Ok(Self {
392            stream: ClientStream::Tls(tls_stream),
393            config,
394            batch_id: AtomicU64::new(0),
395            read_buffer: vec![0u8; 64 * 1024],
396            read_offset: 0,
397        })
398    }
399
400    /// Connect to a LANCE server using an address string
401    ///
402    /// The address can be either an IP:port (e.g., "127.0.0.1:1992") or
403    /// a hostname:port (e.g., "lance.example.com:1992").
404    pub async fn connect_to(addr: &str) -> Result<Self> {
405        Self::connect(ClientConfig::new(addr)).await
406    }
407
408    /// Connect to LANCE server with TLS using address string
409    ///
410    /// The address can be either an IP:port (e.g., "127.0.0.1:1992") or
411    /// a hostname:port (e.g., "lance.example.com:1992").
412    pub async fn connect_tls_to(addr: &str, tls_config: TlsClientConfig) -> Result<Self> {
413        Self::connect_tls(ClientConfig::new(addr), tls_config).await
414    }
415
    /// Allocates the next client-side batch identifier.
    ///
    /// `fetch_add` returns the counter's previous value, so `+ 1` yields the
    /// newly claimed id; the first call therefore returns 1, never 0.
    fn next_batch_id(&self) -> u64 {
        self.batch_id.fetch_add(1, Ordering::SeqCst) + 1
    }
419
420    /// Sends an ingest frame to the default topic (ID 0) while preserving the
421    /// Architecture §22 write-buffering guarantees.
422    ///
423    /// # Arguments
424    /// * `payload` - Record batch encoded using the zero-copy LWP format.
425    /// * `record_count` - Logical record total encoded in the frame header.
426    ///
427    /// # Returns
428    /// * `Result<u64>` - Server-assigned batch identifier for the frame.
429    ///
430    /// # Errors
431    /// Propagates [`ClientError::Timeout`] when the write exceeds the configured
432    /// deadline or any framing/connection error surfaced by Tokio.
433    pub async fn send_ingest(&mut self, payload: Bytes, record_count: u32) -> Result<u64> {
434        self.send_ingest_to_topic(0, payload, record_count, None)
435            .await
436    }
437
438    /// Sends an ingest frame to a specific topic while attaching metadata
439    /// required by Architecture §22's deferred flush/ack scheme.
440    ///
441    /// # Arguments
442    /// * `topic_id` - Destination topic identifier.
443    /// * `payload` - Zero-copy encoded batch.
444    /// * `record_count` - Logical records contained in `payload`.
445    /// * `_auth_config` - Optional future hook for per-request auth context.
446    ///
447    /// # Returns
448    /// * `Result<u64>` - Batch identifier allocated by the client monotonic
449    ///   counter and echoed back by the server.
450    pub async fn send_ingest_to_topic(
451        &mut self,
452        topic_id: u32,
453        payload: Bytes,
454        record_count: u32,
455        _auth_config: Option<&AuthConfig>,
456    ) -> Result<u64> {
457        let batch_id = self.next_batch_id();
458        let timestamp_ns = std::time::SystemTime::now()
459            .duration_since(std::time::UNIX_EPOCH)
460            .map(|d| d.as_nanos() as u64)
461            .unwrap_or(0);
462
463        let frame =
464            Frame::new_ingest_with_topic(batch_id, timestamp_ns, record_count, payload, topic_id);
465        let frame_bytes = encode_frame(&frame);
466
467        trace!(
468            batch_id,
469            topic_id,
470            payload_len = frame.payload_length(),
471            "Sending ingest frame"
472        );
473
474        tokio::time::timeout(
475            self.config.write_timeout,
476            self.stream.write_all(&frame_bytes),
477        )
478        .await
479        .map_err(|_| ClientError::Timeout)??;
480
481        Ok(batch_id)
482    }
483
484    /// Sends an ingest request to the default topic and waits for the
485    /// corresponding acknowledgment, mirroring Architecture §22.3 sync gates.
486    ///
487    /// # Arguments
488    /// * `payload` - Ingest batch to transmit.
489    /// * `record_count` - Logical record total for metrics validation.
490    ///
491    /// # Returns
492    /// * `Result<u64>` - The acked batch identifier if the server confirms the
493    ///   write succeeded.
494    pub async fn send_ingest_sync(&mut self, payload: Bytes, record_count: u32) -> Result<u64> {
495        self.send_ingest_to_topic_sync(0, payload, record_count, None)
496            .await
497    }
498
499    /// Sends an ingest request to a topic and blocks for server acknowledgment
500    /// so callers can enforce durability or backpressure decisions inline.
501    ///
502    /// # Arguments
503    /// * `topic_id` - Destination topic.
504    /// * `payload` - Zero-copy loaner buffer to transmit.
505    /// * `record_count` - Logical records contained in the batch.
506    /// * `auth_config` - Optional per-request authentication context.
507    ///
508    /// # Returns
509    /// * `Result<u64>` - The acked batch identifier, ensuring sequencing with
510    ///   downstream consumers.
511    pub async fn send_ingest_to_topic_sync(
512        &mut self,
513        topic_id: u32,
514        payload: Bytes,
515        record_count: u32,
516        auth_config: Option<&AuthConfig>,
517    ) -> Result<u64> {
518        let batch_id = self
519            .send_ingest_to_topic(topic_id, payload, record_count, auth_config)
520            .await?;
521        self.wait_for_ack(batch_id).await
522    }
523
524    /// Waits for a specific acknowledgment frame, enforcing Architecture §22's
525    /// deferred flush contract between ingestion and persistence stages.
526    ///
527    /// # Arguments
528    /// * `expected_batch_id` - Identifier produced by the paired send path.
529    ///
530    /// # Returns
531    /// * `Result<u64>` - The acked batch identifier (matching expectation).
532    ///
533    /// # Errors
534    /// Surfaces protocol mismatches, server backpressure, or error frames so
535    /// callers can react immediately.
536    async fn wait_for_ack(&mut self, expected_batch_id: u64) -> Result<u64> {
537        let frame = self.recv_frame().await?;
538
539        match frame.frame_type {
540            FrameType::Ack => {
541                let acked_id = frame.batch_id();
542                if acked_id != expected_batch_id {
543                    return Err(ClientError::InvalidResponse(format!(
544                        "Ack batch_id mismatch: sent {}, received {}",
545                        expected_batch_id, acked_id
546                    )));
547                }
548                trace!(batch_id = acked_id, "Received ack");
549                Ok(acked_id)
550            },
551            FrameType::Control(ControlCommand::ErrorResponse) => {
552                let error_msg = frame
553                    .payload
554                    .map(|p| String::from_utf8_lossy(&p).to_string())
555                    .unwrap_or_else(|| "Unknown error".to_string());
556                Err(ClientError::ServerError(error_msg))
557            },
558            FrameType::Backpressure => {
559                warn!("Server signaled backpressure");
560                Err(ClientError::ServerBackpressure)
561            },
562            other => Err(ClientError::InvalidResponse(format!(
563                "Expected Ack, got {:?}",
564                other
565            ))),
566        }
567    }
568
569    /// Receives the next acknowledgment frame and translates server feedback
570    /// (ack, backpressure, or error) into structured [`ClientError`] variants.
571    ///
572    /// # Returns
573    /// * `Result<u64>` - Acked batch identifier if the server confirmed success.
574    ///
575    /// # Errors
576    /// Surfaces [`ClientError::ServerBackpressure`] or
577    /// [`ClientError::InvalidResponse`] when the frame type deviates from the
578    /// Architecture §22 control flow expectations.
579    pub async fn recv_ack(&mut self) -> Result<u64> {
580        let frame = self.recv_frame().await?;
581
582        match frame.frame_type {
583            FrameType::Ack => {
584                trace!(batch_id = frame.batch_id(), "Received ack");
585                Ok(frame.batch_id())
586            },
587            FrameType::Backpressure => {
588                warn!("Server signaled backpressure");
589                Err(ClientError::ServerBackpressure)
590            },
591            other => Err(ClientError::InvalidResponse(format!(
592                "Expected Ack, got {:?}",
593                other
594            ))),
595        }
596    }
597
598    /// Sends a keepalive frame so long-lived clients satisfy Architecture §9.4
599    /// drain/force-exit requirements and keep connection state fresh.
600    ///
601    /// # Returns
602    /// * `Result<()>` - Ok when the frame is flushed before the configured
603    ///   write timeout expires.
604    pub async fn send_keepalive(&mut self) -> Result<()> {
605        let frame = Frame::new_keepalive();
606        let frame_bytes = encode_frame(&frame);
607
608        trace!("Sending keepalive");
609
610        tokio::time::timeout(
611            self.config.write_timeout,
612            self.stream.write_all(&frame_bytes),
613        )
614        .await
615        .map_err(|_| ClientError::Timeout)??;
616
617        Ok(())
618    }
619
620    /// Waits for a keepalive response, guaranteeing the control-plane path is
621    /// still healthy per Architecture §9.4 monitoring requirements.
622    ///
623    /// # Returns
624    /// * `Result<()>` - Ok when the server replies with `FrameType::Keepalive`.
625    ///
626    /// # Errors
627    /// Returns [`ClientError::InvalidResponse`] when any other frame type
628    /// arrives, signaling connection drift.
629    pub async fn recv_keepalive(&mut self) -> Result<()> {
630        let frame = self.recv_frame().await?;
631
632        match frame.frame_type {
633            FrameType::Keepalive => {
634                trace!("Received keepalive response");
635                Ok(())
636            },
637            other => Err(ClientError::InvalidResponse(format!(
638                "Expected Keepalive, got {:?}",
639                other
640            ))),
641        }
642    }
643
644    /// Ping the server and measure round-trip latency
645    pub async fn ping(&mut self) -> Result<Duration> {
646        let start = std::time::Instant::now();
647        self.send_keepalive().await?;
648        self.recv_keepalive().await?;
649        Ok(start.elapsed())
650    }
651
652    /// Create a new topic with the given name
653    pub async fn create_topic(&mut self, name: &str) -> Result<TopicInfo> {
654        const DEFAULT_CREATE_TOPIC_ATTEMPTS: usize = 20;
655        const DEFAULT_CREATE_TOPIC_BACKOFF_MS: u64 = 500;
656        self.ensure_topic(
657            name,
658            DEFAULT_CREATE_TOPIC_ATTEMPTS,
659            DEFAULT_CREATE_TOPIC_BACKOFF_MS,
660        )
661        .await
662    }
663
664    async fn create_topic_once(&mut self, name: &str) -> Result<TopicInfo> {
665        let frame = Frame::new_create_topic(name);
666        let frame_bytes = encode_frame(&frame);
667
668        trace!(topic_name = %name, "Creating topic");
669
670        tokio::time::timeout(
671            self.config.write_timeout,
672            self.stream.write_all(&frame_bytes),
673        )
674        .await
675        .map_err(|_| ClientError::Timeout)??;
676
677        let response = self.recv_frame().await?;
678        self.parse_topic_response(response)
679    }
680
    /// Ensure a topic exists and return its metadata.
    ///
    /// This helper encapsulates common create/list convergence retry behavior so
    /// application callers (bench/chaos) can stay simple.
    ///
    /// # Arguments
    /// * `name` - Topic name to create or find.
    /// * `max_attempts` - Upper bound on create/list rounds (clamped to >= 1).
    /// * `base_backoff_ms` - Base sleep between rounds; scaled linearly by the
    ///   attempt number when the round hit a retryable error.
    ///
    /// # Returns
    /// * `Result<TopicInfo>` - Metadata for the ensured topic.
    ///
    /// # Errors
    /// Returns [`ClientError::ServerError`] summarizing the last failure, or
    /// noting exhaustion, when the topic cannot be ensured.
    pub async fn ensure_topic(
        &mut self,
        name: &str,
        max_attempts: usize,
        base_backoff_ms: u64,
    ) -> Result<TopicInfo> {
        // Guarantee at least one attempt even if the caller passes 0.
        let attempts = max_attempts.max(1);
        let mut last_error: Option<ClientError> = None;
        let mut saw_retryable_error = false;

        for attempt in 1..=attempts {
            let mut retryable_this_attempt = false;

            // Primary path: try to create the topic outright.
            match self.create_topic_once(name).await {
                Ok(info) => {
                    trace!(
                        topic_id = info.id,
                        topic_name = %info.name,
                        attempt,
                        max_attempts = attempts,
                        "Topic ensured via create_topic"
                    );
                    return Ok(info);
                },
                Err(create_err) => {
                    if create_err.is_retryable() {
                        retryable_this_attempt = true;
                        saw_retryable_error = true;
                    }
                    // Stringified so the final summary owns the message text.
                    last_error = Some(ClientError::ServerError(create_err.to_string()));
                    warn!(
                        topic_name = %name,
                        attempt,
                        max_attempts = attempts,
                        error = %create_err,
                        "create_topic failed during ensure_topic; retrying with list fallback"
                    );
                },
            }

            // Fallback path: the create may have failed because the topic
            // already exists (e.g. created concurrently) — look it up by name.
            match self.list_topics().await {
                Ok(topics) => {
                    if let Some(topic) = topics.into_iter().find(|t| t.name == name) {
                        trace!(
                            topic_id = topic.id,
                            topic_name = %topic.name,
                            attempt,
                            max_attempts = attempts,
                            "Topic ensured via list_topics fallback"
                        );
                        return Ok(topic);
                    }
                },
                Err(list_err) => {
                    if list_err.is_retryable() {
                        retryable_this_attempt = true;
                        saw_retryable_error = true;
                    }
                    last_error = Some(ClientError::ServerError(list_err.to_string()));
                    warn!(
                        topic_name = %name,
                        attempt,
                        max_attempts = attempts,
                        error = %list_err,
                        "list_topics failed during ensure_topic"
                    );
                },
            }

            if attempt < attempts {
                // Linear backoff for retryable failures; flat base delay
                // otherwise (minimum 1 ms either way).
                let backoff_ms = if retryable_this_attempt {
                    base_backoff_ms.saturating_mul(attempt as u64).max(1)
                } else {
                    // Non-retryable errors are unlikely to heal with long sleeps.
                    base_backoff_ms.max(1)
                };
                tokio::time::sleep(Duration::from_millis(backoff_ms)).await;

                // Refresh connection to improve odds of landing on current leader
                // after elections and readiness transitions.
                let reconnect_config = self.config.clone();
                match Self::connect(reconnect_config).await {
                    Ok(new_client) => {
                        *self = new_client;
                    },
                    Err(reconnect_err) => {
                        warn!(
                            topic_name = %name,
                            attempt,
                            max_attempts = attempts,
                            error = %reconnect_err,
                            "ensure_topic reconnect attempt failed"
                        );
                        last_error = Some(reconnect_err);
                    },
                }
            }
        }

        // All attempts exhausted: report the most recent failure if any.
        if let Some(err) = last_error {
            return Err(ClientError::ServerError(format!(
                "ensure_topic('{}') failed after {} attempts: {}",
                name, attempts, err
            )));
        }

        if saw_retryable_error {
            return Err(ClientError::ServerError(format!(
                "ensure_topic('{}') exhausted {} retryable attempts",
                name, attempts
            )));
        }

        // No errors at all: every round succeeded but the topic never appeared.
        Err(ClientError::ServerError(format!(
            "topic '{}' not found after {} ensure_topic attempts",
            name, attempts
        )))
    }
803
804    /// Ensure topic with standard retry profile suitable for benchmark/chaos tools.
805    pub async fn ensure_topic_default(&mut self, name: &str) -> Result<TopicInfo> {
806        const DEFAULT_ENSURE_TOPIC_ATTEMPTS: usize = 20;
807        const DEFAULT_ENSURE_TOPIC_BACKOFF_MS: u64 = 500;
808        self.ensure_topic(
809            name,
810            DEFAULT_ENSURE_TOPIC_ATTEMPTS,
811            DEFAULT_ENSURE_TOPIC_BACKOFF_MS,
812        )
813        .await
814    }
815
816    /// List all topics on the server
817    pub async fn list_topics(&mut self) -> Result<Vec<TopicInfo>> {
818        let frame = Frame::new_list_topics();
819        let frame_bytes = encode_frame(&frame);
820
821        trace!("Listing topics");
822
823        tokio::time::timeout(
824            self.config.write_timeout,
825            self.stream.write_all(&frame_bytes),
826        )
827        .await
828        .map_err(|_| ClientError::Timeout)??;
829
830        let response = self.recv_frame().await?;
831        self.parse_topic_list_response(response)
832    }
833
834    /// Get information about a specific topic
835    pub async fn get_topic(&mut self, topic_id: u32) -> Result<TopicInfo> {
836        let frame = Frame::new_get_topic(topic_id);
837        let frame_bytes = encode_frame(&frame);
838
839        trace!(topic_id, "Getting topic");
840
841        tokio::time::timeout(
842            self.config.write_timeout,
843            self.stream.write_all(&frame_bytes),
844        )
845        .await
846        .map_err(|_| ClientError::Timeout)??;
847
848        let response = self.recv_frame().await?;
849        self.parse_topic_response(response)
850    }
851
852    /// Delete a topic by its ID
853    pub async fn delete_topic(&mut self, topic_id: u32) -> Result<()> {
854        let frame = Frame::new_delete_topic(topic_id);
855        let frame_bytes = encode_frame(&frame);
856
857        trace!(topic_id, "Deleting topic");
858
859        tokio::time::timeout(
860            self.config.write_timeout,
861            self.stream.write_all(&frame_bytes),
862        )
863        .await
864        .map_err(|_| ClientError::Timeout)??;
865
866        let response = self.recv_frame().await?;
867        self.parse_delete_response(response)
868    }
869
870    /// Sets the retention policy for an existing topic, mirroring the
871    /// configuration model documented in Architecture §12.7.
872    ///
873    /// # Arguments
874    /// * `topic_id` - Topic identifier
875    /// * `max_age_secs` - Maximum age in seconds (0 = no limit)
876    /// * `max_bytes` - Maximum size in bytes (0 = no limit)
877    pub async fn set_retention(
878        &mut self,
879        topic_id: u32,
880        max_age_secs: u64,
881        max_bytes: u64,
882    ) -> Result<()> {
883        let frame = Frame::new_set_retention(topic_id, max_age_secs, max_bytes);
884        let frame_bytes = encode_frame(&frame);
885
886        trace!(
887            topic_id,
888            max_age_secs, max_bytes, "Setting retention policy"
889        );
890
891        tokio::time::timeout(
892            self.config.write_timeout,
893            self.stream.write_all(&frame_bytes),
894        )
895        .await
896        .map_err(|_| ClientError::Timeout)??;
897
898        let response = self.recv_frame().await?;
899        self.parse_retention_response(response)
900    }
901
902    /// Create a topic with retention policy in a single operation
903    ///
904    /// # Arguments
905    /// * `name` - Topic name
906    /// * `max_age_secs` - Maximum age in seconds (0 = no limit)
907    /// * `max_bytes` - Maximum size in bytes (0 = no limit)
908    pub async fn create_topic_with_retention(
909        &mut self,
910        name: &str,
911        max_age_secs: u64,
912        max_bytes: u64,
913    ) -> Result<TopicInfo> {
914        let frame = Frame::new_create_topic_with_retention(name, max_age_secs, max_bytes);
915        let frame_bytes = encode_frame(&frame);
916
917        trace!(
918            name,
919            max_age_secs, max_bytes, "Creating topic with retention"
920        );
921
922        tokio::time::timeout(
923            self.config.write_timeout,
924            self.stream.write_all(&frame_bytes),
925        )
926        .await
927        .map_err(|_| ClientError::Timeout)??;
928
929        let response = self.recv_frame().await?;
930        self.parse_topic_response(response)
931    }
932
933    /// Get cluster status and health information
934    pub async fn get_cluster_status(&mut self) -> Result<ClusterStatus> {
935        let frame = Frame::new_get_cluster_status();
936        let frame_bytes = encode_frame(&frame);
937
938        tokio::time::timeout(
939            self.config.write_timeout,
940            self.stream.write_all(&frame_bytes),
941        )
942        .await
943        .map_err(|_| ClientError::Timeout)??;
944
945        let response = self.recv_frame().await?;
946        self.parse_cluster_status_response(response)
947    }
948
949    fn parse_cluster_status_response(&self, frame: Frame) -> Result<ClusterStatus> {
950        match frame.frame_type {
951            FrameType::Control(ControlCommand::ClusterStatusResponse) => {
952                let payload = frame.payload.ok_or_else(|| {
953                    ClientError::InvalidResponse("Empty cluster status response".to_string())
954                })?;
955                let json: serde_json::Value = serde_json::from_slice(&payload)
956                    .map_err(|e| ClientError::ProtocolError(format!("Invalid JSON: {}", e)))?;
957
958                let peer_states: std::collections::HashMap<u16, String> = json["peer_states"]
959                    .as_object()
960                    .map(|obj| {
961                        obj.iter()
962                            .filter_map(|(k, v)| {
963                                k.parse::<u16>()
964                                    .ok()
965                                    .map(|id| (id, v.as_str().unwrap_or("unknown").to_string()))
966                            })
967                            .collect()
968                    })
969                    .unwrap_or_default();
970
971                Ok(ClusterStatus {
972                    node_id: json["node_id"].as_u64().unwrap_or(0) as u16,
973                    is_leader: json["is_leader"].as_bool().unwrap_or(false),
974                    leader_id: json["leader_id"].as_u64().map(|id| id as u16),
975                    current_term: json["current_term"].as_u64().unwrap_or(0),
976                    node_count: json["node_count"].as_u64().unwrap_or(1) as usize,
977                    healthy_nodes: json["healthy_nodes"].as_u64().unwrap_or(1) as usize,
978                    quorum_available: json["quorum_available"].as_bool().unwrap_or(true),
979                    peer_states,
980                })
981            },
982            FrameType::Control(ControlCommand::ErrorResponse) => {
983                let error_msg = frame
984                    .payload
985                    .map(|p| String::from_utf8_lossy(&p).to_string())
986                    .unwrap_or_else(|| "Unknown error".to_string());
987                Err(ClientError::ServerError(error_msg))
988            },
989            other => Err(ClientError::InvalidResponse(format!(
990                "Expected ClusterStatusResponse, got {:?}",
991                other
992            ))),
993        }
994    }
995
996    /// Fetch data from a topic starting at the given offset
997    /// Returns (data, next_offset, record_count)
998    pub async fn fetch(
999        &mut self,
1000        topic_id: u32,
1001        start_offset: u64,
1002        max_bytes: u32,
1003    ) -> Result<FetchResult> {
1004        let frame = Frame::new_fetch(topic_id, start_offset, max_bytes);
1005        let frame_bytes = encode_frame(&frame);
1006
1007        trace!(topic_id, start_offset, max_bytes, "Fetching data");
1008
1009        tokio::time::timeout(
1010            self.config.write_timeout,
1011            self.stream.write_all(&frame_bytes),
1012        )
1013        .await
1014        .map_err(|_| ClientError::Timeout)??;
1015
1016        let response = self.recv_frame().await?;
1017        self.parse_fetch_response(response)
1018    }
1019
1020    /// Subscribes to a topic for streaming consumption, honoring the consumer
1021    /// coordination model described in Architecture §20.
1022    ///
1023    /// # Arguments
1024    /// * `topic_id` - Topic to follow.
1025    /// * `start_offset` - First logical offset to deliver.
1026    /// * `max_batch_bytes` - Maximum payload size per delivery window.
1027    /// * `consumer_id` - Stable consumer identifier for server-side tracking.
1028    ///
1029    /// # Returns
1030    /// * `Result<SubscribeResult>` - Contains the confirmed consumer_id and
1031    ///   start offset granted by the server.
1032    ///
1033    /// # Errors
1034    /// Surfaces timeouts, protocol violations, or server error frames (e.g.,
1035    /// `ControlCommand::ErrorResponse`).
1036    pub async fn subscribe(
1037        &mut self,
1038        topic_id: u32,
1039        start_offset: u64,
1040        max_batch_bytes: u32,
1041        consumer_id: u64,
1042    ) -> Result<SubscribeResult> {
1043        let frame = Frame::new_subscribe(topic_id, start_offset, max_batch_bytes, consumer_id);
1044        let frame_bytes = encode_frame(&frame);
1045
1046        trace!(topic_id, start_offset, consumer_id, "Subscribing to topic");
1047
1048        tokio::time::timeout(
1049            self.config.write_timeout,
1050            self.stream.write_all(&frame_bytes),
1051        )
1052        .await
1053        .map_err(|_| ClientError::Timeout)??;
1054
1055        let response = self.recv_frame().await?;
1056        self.parse_subscribe_response(response)
1057    }
1058
1059    /// Unsubscribes a consumer from a topic, ensuring server resources are
1060    /// reclaimed per Architecture §20's consumer lifecycle.
1061    ///
1062    /// # Arguments
1063    /// * `topic_id` - Topic to leave.
1064    /// * `consumer_id` - Consumer identifier provided during subscribe.
1065    ///
1066    /// # Returns
1067    /// * `Result<()>` - Ok when the server acknowledges the unsubscribe.
1068    ///
1069    /// # Errors
1070    /// Propagates [`ClientError::ServerError`] if the server rejects the
1071    /// request or [`ClientError::InvalidResponse`] if a non-ack frame arrives.
1072    pub async fn unsubscribe(&mut self, topic_id: u32, consumer_id: u64) -> Result<()> {
1073        let frame = Frame::new_unsubscribe(topic_id, consumer_id);
1074        let frame_bytes = encode_frame(&frame);
1075
1076        trace!(topic_id, consumer_id, "Unsubscribing from topic");
1077
1078        tokio::time::timeout(
1079            self.config.write_timeout,
1080            self.stream.write_all(&frame_bytes),
1081        )
1082        .await
1083        .map_err(|_| ClientError::Timeout)??;
1084
1085        // Wait for ack or error
1086        let response = self.recv_frame().await?;
1087        match response.frame_type {
1088            FrameType::Ack => Ok(()),
1089            FrameType::Control(ControlCommand::ErrorResponse) => {
1090                let error_msg = response
1091                    .payload
1092                    .map(|p| String::from_utf8_lossy(&p).to_string())
1093                    .unwrap_or_else(|| "Unknown error".to_string());
1094                Err(ClientError::ServerError(error_msg))
1095            },
1096            other => Err(ClientError::InvalidResponse(format!(
1097                "Expected Ack, got {:?}",
1098                other
1099            ))),
1100        }
1101    }
1102
1103    /// Commit consumer offset for checkpointing
1104    pub async fn commit_offset(
1105        &mut self,
1106        topic_id: u32,
1107        consumer_id: u64,
1108        offset: u64,
1109    ) -> Result<CommitResult> {
1110        let frame = Frame::new_commit_offset(topic_id, consumer_id, offset);
1111        let frame_bytes = encode_frame(&frame);
1112
1113        trace!(topic_id, consumer_id, offset, "Committing offset");
1114
1115        tokio::time::timeout(
1116            self.config.write_timeout,
1117            self.stream.write_all(&frame_bytes),
1118        )
1119        .await
1120        .map_err(|_| ClientError::Timeout)??;
1121
1122        let response = self.recv_frame().await?;
1123        self.parse_commit_response(response)
1124    }
1125
1126    fn parse_subscribe_response(&self, frame: Frame) -> Result<SubscribeResult> {
1127        match frame.frame_type {
1128            FrameType::Control(ControlCommand::SubscribeAck) => {
1129                let payload = frame.payload.ok_or_else(|| {
1130                    ClientError::InvalidResponse("Empty subscribe response".to_string())
1131                })?;
1132
1133                if payload.len() < 16 {
1134                    return Err(ClientError::ProtocolError(
1135                        "Subscribe response too small".to_string(),
1136                    ));
1137                }
1138
1139                let consumer_id = u64::from_le_bytes([
1140                    payload[0], payload[1], payload[2], payload[3], payload[4], payload[5],
1141                    payload[6], payload[7],
1142                ]);
1143                let start_offset = u64::from_le_bytes([
1144                    payload[8],
1145                    payload[9],
1146                    payload[10],
1147                    payload[11],
1148                    payload[12],
1149                    payload[13],
1150                    payload[14],
1151                    payload[15],
1152                ]);
1153
1154                Ok(SubscribeResult {
1155                    consumer_id,
1156                    start_offset,
1157                })
1158            },
1159            FrameType::Control(ControlCommand::ErrorResponse) => {
1160                let error_msg = frame
1161                    .payload
1162                    .map(|p| String::from_utf8_lossy(&p).to_string())
1163                    .unwrap_or_else(|| "Unknown error".to_string());
1164                Err(ClientError::ServerError(error_msg))
1165            },
1166            other => Err(ClientError::InvalidResponse(format!(
1167                "Expected SubscribeAck, got {:?}",
1168                other
1169            ))),
1170        }
1171    }
1172
1173    fn parse_commit_response(&self, frame: Frame) -> Result<CommitResult> {
1174        match frame.frame_type {
1175            FrameType::Control(ControlCommand::CommitAck) => {
1176                let payload = frame.payload.ok_or_else(|| {
1177                    ClientError::InvalidResponse("Empty commit response".to_string())
1178                })?;
1179
1180                if payload.len() < 16 {
1181                    return Err(ClientError::ProtocolError(
1182                        "Commit response too small".to_string(),
1183                    ));
1184                }
1185
1186                let consumer_id = u64::from_le_bytes([
1187                    payload[0], payload[1], payload[2], payload[3], payload[4], payload[5],
1188                    payload[6], payload[7],
1189                ]);
1190                let committed_offset = u64::from_le_bytes([
1191                    payload[8],
1192                    payload[9],
1193                    payload[10],
1194                    payload[11],
1195                    payload[12],
1196                    payload[13],
1197                    payload[14],
1198                    payload[15],
1199                ]);
1200
1201                Ok(CommitResult {
1202                    consumer_id,
1203                    committed_offset,
1204                })
1205            },
1206            FrameType::Control(ControlCommand::ErrorResponse) => {
1207                let error_msg = frame
1208                    .payload
1209                    .map(|p| String::from_utf8_lossy(&p).to_string())
1210                    .unwrap_or_else(|| "Unknown error".to_string());
1211                Err(ClientError::ServerError(error_msg))
1212            },
1213            other => Err(ClientError::InvalidResponse(format!(
1214                "Expected CommitAck, got {:?}",
1215                other
1216            ))),
1217        }
1218    }
1219
1220    fn parse_fetch_response(&self, frame: Frame) -> Result<FetchResult> {
1221        match frame.frame_type {
1222            FrameType::Control(ControlCommand::CatchingUp) => {
1223                let server_offset = frame
1224                    .payload
1225                    .as_ref()
1226                    .filter(|p| p.len() >= 8)
1227                    .map(|p| u64::from_le_bytes([p[0], p[1], p[2], p[3], p[4], p[5], p[6], p[7]]))
1228                    .unwrap_or(0);
1229                Err(ClientError::ServerCatchingUp { server_offset })
1230            },
1231            FrameType::Control(ControlCommand::FetchResponse) => {
1232                let payload = frame.payload.ok_or_else(|| {
1233                    ClientError::InvalidResponse("Empty fetch response".to_string())
1234                })?;
1235
1236                if payload.len() < 16 {
1237                    return Err(ClientError::ProtocolError(
1238                        "Fetch response too small".to_string(),
1239                    ));
1240                }
1241
1242                let next_offset = u64::from_le_bytes([
1243                    payload[0], payload[1], payload[2], payload[3], payload[4], payload[5],
1244                    payload[6], payload[7],
1245                ]);
1246                let bytes_returned =
1247                    u32::from_le_bytes([payload[8], payload[9], payload[10], payload[11]]);
1248                let record_count =
1249                    u32::from_le_bytes([payload[12], payload[13], payload[14], payload[15]]);
1250                let data = payload.slice(16..);
1251
1252                Ok(FetchResult {
1253                    data,
1254                    next_offset,
1255                    bytes_returned,
1256                    record_count,
1257                })
1258            },
1259            FrameType::Control(ControlCommand::ErrorResponse) => {
1260                let error_msg = frame
1261                    .payload
1262                    .map(|p| String::from_utf8_lossy(&p).to_string())
1263                    .unwrap_or_else(|| "Unknown error".to_string());
1264                Err(ClientError::ServerError(error_msg))
1265            },
1266            other => Err(ClientError::InvalidResponse(format!(
1267                "Expected FetchResponse, got {:?}",
1268                other
1269            ))),
1270        }
1271    }
1272
    /// Validates the reply to a delete request; any frame other than the
    /// shared success shape is mapped to an error by `expect_success_response`.
    fn parse_delete_response(&self, frame: Frame) -> Result<()> {
        expect_success_response(frame)
    }
1276
    /// Validates the reply to a set-retention request; any frame other than
    /// the shared success shape is mapped to an error by
    /// `expect_success_response`.
    fn parse_retention_response(&self, frame: Frame) -> Result<()> {
        expect_success_response(frame)
    }
1280
1281    fn parse_topic_response(&self, frame: Frame) -> Result<TopicInfo> {
1282        match frame.frame_type {
1283            FrameType::Control(ControlCommand::TopicResponse) => {
1284                let payload = frame.payload.ok_or_else(|| {
1285                    ClientError::InvalidResponse("Empty topic response".to_string())
1286                })?;
1287                let json: serde_json::Value = serde_json::from_slice(&payload)
1288                    .map_err(|e| ClientError::ProtocolError(format!("Invalid JSON: {}", e)))?;
1289
1290                let retention = if json.get("retention").is_some() {
1291                    Some(RetentionInfo {
1292                        max_age_secs: json["retention"]["max_age_secs"].as_u64().unwrap_or(0),
1293                        max_bytes: json["retention"]["max_bytes"].as_u64().unwrap_or(0),
1294                    })
1295                } else {
1296                    None
1297                };
1298
1299                Ok(TopicInfo {
1300                    id: json["id"].as_u64().unwrap_or(0) as u32,
1301                    name: json["name"].as_str().unwrap_or("").to_string(),
1302                    created_at: json["created_at"].as_u64().unwrap_or(0),
1303                    topic_epoch: json["topic_epoch"].as_u64().unwrap_or(1),
1304                    retention,
1305                })
1306            },
1307            FrameType::Control(ControlCommand::ErrorResponse) => {
1308                let error_msg = frame
1309                    .payload
1310                    .map(|p| String::from_utf8_lossy(&p).to_string())
1311                    .unwrap_or_else(|| "Unknown error".to_string());
1312                Err(ClientError::ServerError(error_msg))
1313            },
1314            other => Err(ClientError::InvalidResponse(format!(
1315                "Expected TopicResponse, got {:?}",
1316                other
1317            ))),
1318        }
1319    }
1320
1321    fn parse_topic_list_response(&self, frame: Frame) -> Result<Vec<TopicInfo>> {
1322        match frame.frame_type {
1323            FrameType::Control(ControlCommand::TopicResponse) => {
1324                let payload = frame.payload.ok_or_else(|| {
1325                    ClientError::InvalidResponse("Empty topic list response".to_string())
1326                })?;
1327                let json: serde_json::Value = serde_json::from_slice(&payload)
1328                    .map_err(|e| ClientError::ProtocolError(format!("Invalid JSON: {}", e)))?;
1329
1330                let topics = json["topics"]
1331                    .as_array()
1332                    .map(|arr| {
1333                        arr.iter()
1334                            .map(|t| {
1335                                let retention = if t.get("retention").is_some() {
1336                                    Some(RetentionInfo {
1337                                        max_age_secs: t["retention"]["max_age_secs"]
1338                                            .as_u64()
1339                                            .unwrap_or(0),
1340                                        max_bytes: t["retention"]["max_bytes"]
1341                                            .as_u64()
1342                                            .unwrap_or(0),
1343                                    })
1344                                } else {
1345                                    None
1346                                };
1347                                TopicInfo {
1348                                    id: t["id"].as_u64().unwrap_or(0) as u32,
1349                                    name: t["name"].as_str().unwrap_or("").to_string(),
1350                                    created_at: t["created_at"].as_u64().unwrap_or(0),
1351                                    topic_epoch: t["topic_epoch"].as_u64().unwrap_or(1),
1352                                    retention,
1353                                }
1354                            })
1355                            .collect()
1356                    })
1357                    .unwrap_or_default();
1358
1359                Ok(topics)
1360            },
1361            FrameType::Control(ControlCommand::ErrorResponse) => {
1362                let error_msg = frame
1363                    .payload
1364                    .map(|p| String::from_utf8_lossy(&p).to_string())
1365                    .unwrap_or_else(|| "Unknown error".to_string());
1366                Err(ClientError::ServerError(error_msg))
1367            },
1368            other => Err(ClientError::InvalidResponse(format!(
1369                "Expected TopicResponse, got {:?}",
1370                other
1371            ))),
1372        }
1373    }
1374
    /// Reads the next wire frame into the preallocated buffer, preserving the
    /// Architecture §15 zero-copy guarantees while translating parse failures
    /// into structured [`ClientError`] values.
    ///
    /// # Returns
    /// * `Result<Frame>` - Fully parsed LWP frame ready for higher-level
    ///   ingestion/consumer handlers.
    ///
    /// # Errors
    /// Propagates timeouts, malformed headers, or connection closures so
    /// callers can fail fast when the transport becomes unhealthy.
    async fn recv_frame(&mut self) -> Result<Frame> {
        // Max frame size cap to prevent OOM from malformed headers (16 MB)
        const MAX_FRAME_SIZE: usize = 16 * 1024 * 1024;

        loop {
            if self.read_offset >= LWP_HEADER_SIZE {
                // Grow buffer if the header indicates a payload larger than current capacity.
                // The payload-length field is read as LE u32 from header bytes 32..36
                // (assumes LWP_HEADER_SIZE >= 36 — TODO confirm against lnc_network).
                let payload_len = u32::from_le_bytes([
                    self.read_buffer[32],
                    self.read_buffer[33],
                    self.read_buffer[34],
                    self.read_buffer[35],
                ]) as usize;
                let total_frame_size = LWP_HEADER_SIZE + payload_len;
                // Reject absurd lengths before allocating for them.
                if total_frame_size > MAX_FRAME_SIZE {
                    return Err(ClientError::ServerError(format!(
                        "Frame too large: {} bytes",
                        total_frame_size
                    )));
                }
                if total_frame_size > self.read_buffer.len() {
                    self.read_buffer.resize(total_frame_size, 0);
                }

                // parse_frame returns None while the buffered bytes are still an
                // incomplete frame; in that case fall through and read more.
                if let Some((frame, consumed)) = parse_frame(&self.read_buffer[..self.read_offset])?
                {
                    // Compact: slide any bytes belonging to the next frame to
                    // the front of the buffer before returning.
                    self.read_buffer.copy_within(consumed..self.read_offset, 0);
                    self.read_offset -= consumed;
                    // Shrink buffer back to default if it was grown for a large frame
                    if self.read_buffer.len() > 64 * 1024 && self.read_offset < 64 * 1024 {
                        self.read_buffer.resize(64 * 1024, 0);
                    }
                    return Ok(frame);
                }
            }

            // Append fresh bytes after whatever is already buffered.
            let n = tokio::time::timeout(
                self.config.read_timeout,
                self.stream.read(&mut self.read_buffer[self.read_offset..]),
            )
            .await
            .map_err(|_| ClientError::Timeout)??;

            // A zero-length read means the peer closed the connection.
            if n == 0 {
                return Err(ClientError::ConnectionClosed);
            }

            self.read_offset += n;
        }
    }
1436
    /// Get a reference to the client configuration
    ///
    /// Read-only accessor; the configuration held by the client is returned
    /// by reference so callers can inspect timeouts, address, etc.
    pub fn config(&self) -> &ClientConfig {
        &self.config
    }
1441
    /// Close the client connection
    ///
    /// Consumes the client and performs an async shutdown of the underlying
    /// stream, surfacing any I/O error from the shutdown sequence.
    pub async fn close(mut self) -> Result<()> {
        self.stream.shutdown().await?;
        Ok(())
    }
1447}
1448
/// Manual `Debug`: only the target address and the current batch counter are
/// shown; the batch id is read with `SeqCst` for a consistent snapshot.
impl std::fmt::Debug for LanceClient {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("LanceClient")
            .field("addr", &self.config.addr)
            .field("batch_id", &self.batch_id.load(Ordering::SeqCst))
            .finish()
    }
}