Skip to main content

lnc_client/
client.rs

1use crate::error::{ClientError, Result};
2use crate::tls::TlsClientConfig;
3use bytes::Bytes;
4use lnc_network::{
5    ControlCommand, Frame, FrameType, LWP_HEADER_SIZE, TlsConnector, encode_frame, parse_frame,
6};
7use std::net::SocketAddr;
8use std::pin::Pin;
9use std::sync::atomic::{AtomicU64, Ordering};
10use std::task::{Context, Poll};
11use std::time::Duration;
12use tokio::io::{AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt, ReadBuf};
13use tokio::net::TcpStream;
14use tracing::{debug, trace, warn};
15
16/// Wrapper enum for TCP and TLS streams to avoid dynamic dispatch
17#[allow(clippy::large_enum_variant)]
18pub enum ClientStream {
19    /// Plain TCP connection
20    Tcp(TcpStream),
21    /// TLS-encrypted connection
22    Tls(tokio_rustls::client::TlsStream<TcpStream>),
23}
24
25impl AsyncRead for ClientStream {
26    fn poll_read(
27        self: Pin<&mut Self>,
28        cx: &mut Context<'_>,
29        buf: &mut ReadBuf<'_>,
30    ) -> Poll<std::io::Result<()>> {
31        match self.get_mut() {
32            ClientStream::Tcp(stream) => Pin::new(stream).poll_read(cx, buf),
33            ClientStream::Tls(stream) => Pin::new(stream).poll_read(cx, buf),
34        }
35    }
36}
37
38impl AsyncWrite for ClientStream {
39    fn poll_write(
40        self: Pin<&mut Self>,
41        cx: &mut Context<'_>,
42        buf: &[u8],
43    ) -> Poll<std::io::Result<usize>> {
44        match self.get_mut() {
45            ClientStream::Tcp(stream) => Pin::new(stream).poll_write(cx, buf),
46            ClientStream::Tls(stream) => Pin::new(stream).poll_write(cx, buf),
47        }
48    }
49
50    fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<std::io::Result<()>> {
51        match self.get_mut() {
52            ClientStream::Tcp(stream) => Pin::new(stream).poll_flush(cx),
53            ClientStream::Tls(stream) => Pin::new(stream).poll_flush(cx),
54        }
55    }
56
57    fn poll_shutdown(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<std::io::Result<()>> {
58        match self.get_mut() {
59            ClientStream::Tcp(stream) => Pin::new(stream).poll_shutdown(cx),
60            ClientStream::Tls(stream) => Pin::new(stream).poll_shutdown(cx),
61        }
62    }
63}
64
65/// Helper to extract error message from a frame payload
66fn extract_error_message(frame: &Frame) -> String {
67    frame
68        .payload
69        .as_ref()
70        .map(|p| String::from_utf8_lossy(p).to_string())
71        .unwrap_or_else(|| "Unknown error".to_string())
72}
73
74/// Helper to validate frame type and extract error if present
75#[allow(dead_code)] // Reserved for future protocol extensions
76fn expect_frame_type(frame: Frame, expected: ControlCommand, expected_name: &str) -> Result<Frame> {
77    match frame.frame_type {
78        FrameType::Control(cmd) if cmd == expected => Ok(frame),
79        FrameType::Control(ControlCommand::ErrorResponse) => {
80            Err(ClientError::ServerError(extract_error_message(&frame)))
81        },
82        other => Err(ClientError::InvalidResponse(format!(
83            "Expected {}, got {:?}",
84            expected_name, other
85        ))),
86    }
87}
88
89/// Helper to validate frame is a success response (TopicResponse or similar)
90fn expect_success_response(frame: Frame) -> Result<()> {
91    match frame.frame_type {
92        FrameType::Control(ControlCommand::TopicResponse) => Ok(()),
93        FrameType::Control(ControlCommand::ErrorResponse) => {
94            Err(ClientError::ServerError(extract_error_message(&frame)))
95        },
96        other => Err(ClientError::InvalidResponse(format!(
97            "Expected TopicResponse, got {:?}",
98            other
99        ))),
100    }
101}
102
/// Client authentication settings.
///
/// The default value has mTLS switched off and no certificate or key path.
#[derive(Clone, Debug, Default)]
pub struct AuthConfig {
    /// Whether mutual TLS is requested.
    pub mtls_enabled: bool,
    /// Path to the client certificate, if any.
    pub client_cert_path: Option<String>,
    /// Path to the client private key, if any.
    pub client_key_path: Option<String>,
}
109
/// Retention limits attached to a topic.
///
/// The default value is zero for both limits, i.e. unlimited.
#[derive(Clone, Debug, Default)]
pub struct RetentionInfo {
    /// Oldest allowed message age, in seconds; `0` means unlimited.
    pub max_age_secs: u64,
    /// Largest allowed topic size, in bytes; `0` means unlimited.
    pub max_bytes: u64,
}
118
119#[derive(Debug, Clone)]
120pub struct TopicInfo {
121    pub id: u32,
122    pub name: String,
123    pub created_at: u64,
124    /// Retention policy configuration (None = no retention policy set)
125    pub retention: Option<RetentionInfo>,
126}
127
128/// Result of a fetch operation
129#[derive(Debug, Clone)]
130pub struct FetchResult {
131    pub data: Bytes,
132    pub next_offset: u64,
133    pub bytes_returned: u32,
134    pub record_count: u32,
135}
136
/// Outcome of a subscribe request.
#[derive(Clone, Debug)]
pub struct SubscribeResult {
    /// Consumer id echoed in the server's subscribe acknowledgement.
    pub consumer_id: u64,
    /// Offset at which the subscription starts, as reported by the server.
    pub start_offset: u64,
}
143
/// Outcome of a commit-offset request.
#[derive(Clone, Debug)]
pub struct CommitResult {
    /// Consumer id echoed in the server's commit acknowledgement.
    pub consumer_id: u64,
    /// Offset the server reports as committed.
    pub committed_offset: u64,
}
150
/// Snapshot of cluster state as reported by one node.
#[derive(Clone, Debug)]
pub struct ClusterStatus {
    /// Id of the node that produced this report.
    pub node_id: u16,
    /// Whether the reporting node is the leader.
    pub is_leader: bool,
    /// Id of the current leader, when the report contains one.
    pub leader_id: Option<u16>,
    /// Current term reported by the node.
    pub current_term: u64,
    /// Number of nodes in the cluster.
    pub node_count: usize,
    /// Number of nodes reported as healthy.
    pub healthy_nodes: usize,
    /// Whether a quorum is reported as available.
    pub quorum_available: bool,
    /// Per-peer state strings keyed by peer node id.
    pub peer_states: std::collections::HashMap<u16, String>,
}
163
164#[derive(Debug, Clone)]
165pub struct ClientConfig {
166    pub addr: SocketAddr,
167    pub connect_timeout: Duration,
168    pub read_timeout: Duration,
169    pub write_timeout: Duration,
170    pub keepalive_interval: Duration,
171    /// Optional TLS configuration for encrypted connections
172    pub tls: Option<TlsClientConfig>,
173}
174
175impl Default for ClientConfig {
176    fn default() -> Self {
177        Self {
178            addr: "127.0.0.1:1992"
179                .parse()
180                .unwrap_or_else(|_| SocketAddr::from(([127, 0, 0, 1], 1992))),
181            connect_timeout: Duration::from_secs(10),
182            read_timeout: Duration::from_secs(30),
183            write_timeout: Duration::from_secs(10),
184            keepalive_interval: Duration::from_secs(10),
185            tls: None,
186        }
187    }
188}
189
190impl ClientConfig {
191    pub fn new(addr: SocketAddr) -> Self {
192        Self {
193            addr,
194            ..Default::default()
195        }
196    }
197
198    /// Enable TLS with the provided configuration
199    pub fn with_tls(mut self, tls_config: TlsClientConfig) -> Self {
200        self.tls = Some(tls_config);
201        self
202    }
203
204    /// Check if TLS is enabled
205    pub fn is_tls_enabled(&self) -> bool {
206        self.tls.is_some()
207    }
208}
209
210pub struct LanceClient {
211    stream: ClientStream,
212    config: ClientConfig,
213    batch_id: AtomicU64,
214    read_buffer: Vec<u8>,
215    read_offset: usize,
216}
217
218impl LanceClient {
219    /// Connect to LANCE server, automatically using TLS if configured
220    pub async fn connect(config: ClientConfig) -> Result<Self> {
221        // If TLS is configured in ClientConfig, use TLS connection
222        if let Some(ref tls_config) = config.tls {
223            return Self::connect_tls(config.clone(), tls_config.clone()).await;
224        }
225
226        debug!(addr = %config.addr, "Connecting to LANCE server");
227
228        let stream = tokio::time::timeout(config.connect_timeout, TcpStream::connect(config.addr))
229            .await
230            .map_err(|_| ClientError::Timeout)?
231            .map_err(ClientError::ConnectionFailed)?;
232
233        stream.set_nodelay(true)?;
234
235        debug!(addr = %config.addr, "Connected to LANCE server");
236
237        Ok(Self {
238            stream: ClientStream::Tcp(stream),
239            config,
240            batch_id: AtomicU64::new(0),
241            read_buffer: vec![0u8; 64 * 1024],
242            read_offset: 0,
243        })
244    }
245
246    /// Connect to LANCE server with TLS encryption
247    ///
248    /// # Arguments
249    /// * `config` - Client configuration with server address
250    /// * `tls_config` - TLS configuration including certificates
251    ///
252    /// # Example
253    /// ```rust,ignore
254    /// use lnc_client::{ClientConfig, TlsClientConfig, LanceClient};
255    ///
256    /// let config = ClientConfig::new("127.0.0.1:1992".parse().unwrap());
257    /// let tls = TlsClientConfig::new()
258    ///     .with_ca_cert("/path/to/ca.pem");
259    ///
260    /// let client = LanceClient::connect_tls(config, tls).await?;
261    /// ```
262    pub async fn connect_tls(config: ClientConfig, tls_config: TlsClientConfig) -> Result<Self> {
263        debug!(addr = %config.addr, "Connecting to LANCE server with TLS");
264
265        // First establish TCP connection
266        let tcp_stream =
267            tokio::time::timeout(config.connect_timeout, TcpStream::connect(config.addr))
268                .await
269                .map_err(|_| ClientError::Timeout)?
270                .map_err(ClientError::ConnectionFailed)?;
271
272        tcp_stream.set_nodelay(true)?;
273
274        // Create TLS connector
275        let network_config = tls_config.to_network_config();
276        let connector =
277            TlsConnector::new(network_config).map_err(|e| ClientError::TlsError(e.to_string()))?;
278
279        // Determine server name for SNI
280        let server_name = tls_config
281            .server_name
282            .unwrap_or_else(|| config.addr.ip().to_string());
283
284        // Perform TLS handshake
285        let tls_stream = connector
286            .connect(&server_name, tcp_stream)
287            .await
288            .map_err(|e| ClientError::TlsError(e.to_string()))?;
289
290        debug!(addr = %config.addr, "TLS connection established");
291
292        Ok(Self {
293            stream: ClientStream::Tls(tls_stream),
294            config,
295            batch_id: AtomicU64::new(0),
296            read_buffer: vec![0u8; 64 * 1024],
297            read_offset: 0,
298        })
299    }
300
301    pub async fn connect_to(addr: &str) -> Result<Self> {
302        let socket_addr: SocketAddr = addr
303            .parse()
304            .map_err(|e| ClientError::ProtocolError(format!("Invalid address: {}", e)))?;
305        Self::connect(ClientConfig::new(socket_addr)).await
306    }
307
308    /// Connect to LANCE server with TLS using address string
309    pub async fn connect_tls_to(addr: &str, tls_config: TlsClientConfig) -> Result<Self> {
310        let socket_addr: SocketAddr = addr
311            .parse()
312            .map_err(|e| ClientError::ProtocolError(format!("Invalid address: {}", e)))?;
313        Self::connect_tls(ClientConfig::new(socket_addr), tls_config).await
314    }
315
316    fn next_batch_id(&self) -> u64 {
317        self.batch_id.fetch_add(1, Ordering::SeqCst) + 1
318    }
319
320    pub async fn send_ingest(&mut self, payload: Bytes, record_count: u32) -> Result<u64> {
321        self.send_ingest_to_topic(0, payload, record_count, None)
322            .await
323    }
324
325    pub async fn send_ingest_to_topic(
326        &mut self,
327        topic_id: u32,
328        payload: Bytes,
329        record_count: u32,
330        _auth_config: Option<&AuthConfig>,
331    ) -> Result<u64> {
332        let batch_id = self.next_batch_id();
333        let timestamp_ns = std::time::SystemTime::now()
334            .duration_since(std::time::UNIX_EPOCH)
335            .map(|d| d.as_nanos() as u64)
336            .unwrap_or(0);
337
338        let frame =
339            Frame::new_ingest_with_topic(batch_id, timestamp_ns, record_count, payload, topic_id);
340        let frame_bytes = encode_frame(&frame);
341
342        trace!(
343            batch_id,
344            topic_id,
345            payload_len = frame.payload_length(),
346            "Sending ingest frame"
347        );
348
349        tokio::time::timeout(
350            self.config.write_timeout,
351            self.stream.write_all(&frame_bytes),
352        )
353        .await
354        .map_err(|_| ClientError::Timeout)??;
355
356        Ok(batch_id)
357    }
358
359    pub async fn send_ingest_sync(&mut self, payload: Bytes, record_count: u32) -> Result<u64> {
360        self.send_ingest_to_topic_sync(0, payload, record_count, None)
361            .await
362    }
363
364    pub async fn send_ingest_to_topic_sync(
365        &mut self,
366        topic_id: u32,
367        payload: Bytes,
368        record_count: u32,
369        auth_config: Option<&AuthConfig>,
370    ) -> Result<u64> {
371        let batch_id = self
372            .send_ingest_to_topic(topic_id, payload, record_count, auth_config)
373            .await?;
374        self.wait_for_ack(batch_id).await
375    }
376
377    async fn wait_for_ack(&mut self, expected_batch_id: u64) -> Result<u64> {
378        let frame = self.recv_frame().await?;
379
380        match frame.frame_type {
381            FrameType::Ack => {
382                let acked_id = frame.batch_id();
383                if acked_id != expected_batch_id {
384                    return Err(ClientError::InvalidResponse(format!(
385                        "Ack batch_id mismatch: sent {}, received {}",
386                        expected_batch_id, acked_id
387                    )));
388                }
389                trace!(batch_id = acked_id, "Received ack");
390                Ok(acked_id)
391            },
392            FrameType::Control(ControlCommand::ErrorResponse) => {
393                let error_msg = frame
394                    .payload
395                    .map(|p| String::from_utf8_lossy(&p).to_string())
396                    .unwrap_or_else(|| "Unknown error".to_string());
397                Err(ClientError::ServerError(error_msg))
398            },
399            FrameType::Backpressure => {
400                warn!("Server signaled backpressure");
401                Err(ClientError::ServerBackpressure)
402            },
403            other => Err(ClientError::InvalidResponse(format!(
404                "Expected Ack, got {:?}",
405                other
406            ))),
407        }
408    }
409
410    pub async fn recv_ack(&mut self) -> Result<u64> {
411        let frame = self.recv_frame().await?;
412
413        match frame.frame_type {
414            FrameType::Ack => {
415                trace!(batch_id = frame.batch_id(), "Received ack");
416                Ok(frame.batch_id())
417            },
418            FrameType::Backpressure => {
419                warn!("Server signaled backpressure");
420                Err(ClientError::ServerBackpressure)
421            },
422            other => Err(ClientError::InvalidResponse(format!(
423                "Expected Ack, got {:?}",
424                other
425            ))),
426        }
427    }
428
429    pub async fn send_keepalive(&mut self) -> Result<()> {
430        let frame = Frame::new_keepalive();
431        let frame_bytes = encode_frame(&frame);
432
433        trace!("Sending keepalive");
434
435        tokio::time::timeout(
436            self.config.write_timeout,
437            self.stream.write_all(&frame_bytes),
438        )
439        .await
440        .map_err(|_| ClientError::Timeout)??;
441
442        Ok(())
443    }
444
445    pub async fn recv_keepalive(&mut self) -> Result<()> {
446        let frame = self.recv_frame().await?;
447
448        match frame.frame_type {
449            FrameType::Keepalive => {
450                trace!("Received keepalive response");
451                Ok(())
452            },
453            other => Err(ClientError::InvalidResponse(format!(
454                "Expected Keepalive, got {:?}",
455                other
456            ))),
457        }
458    }
459
460    pub async fn ping(&mut self) -> Result<Duration> {
461        let start = std::time::Instant::now();
462        self.send_keepalive().await?;
463        self.recv_keepalive().await?;
464        Ok(start.elapsed())
465    }
466
467    pub async fn create_topic(&mut self, name: &str) -> Result<TopicInfo> {
468        let frame = Frame::new_create_topic(name);
469        let frame_bytes = encode_frame(&frame);
470
471        trace!(topic_name = %name, "Creating topic");
472
473        tokio::time::timeout(
474            self.config.write_timeout,
475            self.stream.write_all(&frame_bytes),
476        )
477        .await
478        .map_err(|_| ClientError::Timeout)??;
479
480        let response = self.recv_frame().await?;
481        self.parse_topic_response(response)
482    }
483
484    pub async fn list_topics(&mut self) -> Result<Vec<TopicInfo>> {
485        let frame = Frame::new_list_topics();
486        let frame_bytes = encode_frame(&frame);
487
488        trace!("Listing topics");
489
490        tokio::time::timeout(
491            self.config.write_timeout,
492            self.stream.write_all(&frame_bytes),
493        )
494        .await
495        .map_err(|_| ClientError::Timeout)??;
496
497        let response = self.recv_frame().await?;
498        self.parse_topic_list_response(response)
499    }
500
501    pub async fn get_topic(&mut self, topic_id: u32) -> Result<TopicInfo> {
502        let frame = Frame::new_get_topic(topic_id);
503        let frame_bytes = encode_frame(&frame);
504
505        trace!(topic_id, "Getting topic");
506
507        tokio::time::timeout(
508            self.config.write_timeout,
509            self.stream.write_all(&frame_bytes),
510        )
511        .await
512        .map_err(|_| ClientError::Timeout)??;
513
514        let response = self.recv_frame().await?;
515        self.parse_topic_response(response)
516    }
517
518    pub async fn delete_topic(&mut self, topic_id: u32) -> Result<()> {
519        let frame = Frame::new_delete_topic(topic_id);
520        let frame_bytes = encode_frame(&frame);
521
522        trace!(topic_id, "Deleting topic");
523
524        tokio::time::timeout(
525            self.config.write_timeout,
526            self.stream.write_all(&frame_bytes),
527        )
528        .await
529        .map_err(|_| ClientError::Timeout)??;
530
531        let response = self.recv_frame().await?;
532        self.parse_delete_response(response)
533    }
534
535    /// Set retention policy for an existing topic
536    ///
537    /// # Arguments
538    /// * `topic_id` - Topic identifier
539    /// * `max_age_secs` - Maximum age in seconds (0 = no limit)
540    /// * `max_bytes` - Maximum size in bytes (0 = no limit)
541    pub async fn set_retention(
542        &mut self,
543        topic_id: u32,
544        max_age_secs: u64,
545        max_bytes: u64,
546    ) -> Result<()> {
547        let frame = Frame::new_set_retention(topic_id, max_age_secs, max_bytes);
548        let frame_bytes = encode_frame(&frame);
549
550        trace!(
551            topic_id,
552            max_age_secs, max_bytes, "Setting retention policy"
553        );
554
555        tokio::time::timeout(
556            self.config.write_timeout,
557            self.stream.write_all(&frame_bytes),
558        )
559        .await
560        .map_err(|_| ClientError::Timeout)??;
561
562        let response = self.recv_frame().await?;
563        self.parse_retention_response(response)
564    }
565
566    /// Create a topic with retention policy in a single operation
567    ///
568    /// # Arguments
569    /// * `name` - Topic name
570    /// * `max_age_secs` - Maximum age in seconds (0 = no limit)
571    /// * `max_bytes` - Maximum size in bytes (0 = no limit)
572    pub async fn create_topic_with_retention(
573        &mut self,
574        name: &str,
575        max_age_secs: u64,
576        max_bytes: u64,
577    ) -> Result<TopicInfo> {
578        let frame = Frame::new_create_topic_with_retention(name, max_age_secs, max_bytes);
579        let frame_bytes = encode_frame(&frame);
580
581        trace!(
582            name,
583            max_age_secs, max_bytes, "Creating topic with retention"
584        );
585
586        tokio::time::timeout(
587            self.config.write_timeout,
588            self.stream.write_all(&frame_bytes),
589        )
590        .await
591        .map_err(|_| ClientError::Timeout)??;
592
593        let response = self.recv_frame().await?;
594        self.parse_topic_response(response)
595    }
596
597    /// Get cluster status and health information
598    pub async fn get_cluster_status(&mut self) -> Result<ClusterStatus> {
599        let frame = Frame::new_get_cluster_status();
600        let frame_bytes = encode_frame(&frame);
601
602        tokio::time::timeout(
603            self.config.write_timeout,
604            self.stream.write_all(&frame_bytes),
605        )
606        .await
607        .map_err(|_| ClientError::Timeout)??;
608
609        let response = self.recv_frame().await?;
610        self.parse_cluster_status_response(response)
611    }
612
613    fn parse_cluster_status_response(&self, frame: Frame) -> Result<ClusterStatus> {
614        match frame.frame_type {
615            FrameType::Control(ControlCommand::ClusterStatusResponse) => {
616                let payload = frame.payload.ok_or_else(|| {
617                    ClientError::InvalidResponse("Empty cluster status response".to_string())
618                })?;
619                let json: serde_json::Value = serde_json::from_slice(&payload)
620                    .map_err(|e| ClientError::ProtocolError(format!("Invalid JSON: {}", e)))?;
621
622                let peer_states: std::collections::HashMap<u16, String> = json["peer_states"]
623                    .as_object()
624                    .map(|obj| {
625                        obj.iter()
626                            .filter_map(|(k, v)| {
627                                k.parse::<u16>()
628                                    .ok()
629                                    .map(|id| (id, v.as_str().unwrap_or("unknown").to_string()))
630                            })
631                            .collect()
632                    })
633                    .unwrap_or_default();
634
635                Ok(ClusterStatus {
636                    node_id: json["node_id"].as_u64().unwrap_or(0) as u16,
637                    is_leader: json["is_leader"].as_bool().unwrap_or(false),
638                    leader_id: json["leader_id"].as_u64().map(|id| id as u16),
639                    current_term: json["current_term"].as_u64().unwrap_or(0),
640                    node_count: json["node_count"].as_u64().unwrap_or(1) as usize,
641                    healthy_nodes: json["healthy_nodes"].as_u64().unwrap_or(1) as usize,
642                    quorum_available: json["quorum_available"].as_bool().unwrap_or(true),
643                    peer_states,
644                })
645            },
646            FrameType::Control(ControlCommand::ErrorResponse) => {
647                let error_msg = frame
648                    .payload
649                    .map(|p| String::from_utf8_lossy(&p).to_string())
650                    .unwrap_or_else(|| "Unknown error".to_string());
651                Err(ClientError::ServerError(error_msg))
652            },
653            other => Err(ClientError::InvalidResponse(format!(
654                "Expected ClusterStatusResponse, got {:?}",
655                other
656            ))),
657        }
658    }
659
660    /// Fetch data from a topic starting at the given offset
661    /// Returns (data, next_offset, record_count)
662    pub async fn fetch(
663        &mut self,
664        topic_id: u32,
665        start_offset: u64,
666        max_bytes: u32,
667    ) -> Result<FetchResult> {
668        let frame = Frame::new_fetch(topic_id, start_offset, max_bytes);
669        let frame_bytes = encode_frame(&frame);
670
671        trace!(topic_id, start_offset, max_bytes, "Fetching data");
672
673        tokio::time::timeout(
674            self.config.write_timeout,
675            self.stream.write_all(&frame_bytes),
676        )
677        .await
678        .map_err(|_| ClientError::Timeout)??;
679
680        let response = self.recv_frame().await?;
681        self.parse_fetch_response(response)
682    }
683
684    /// Subscribe to a topic for streaming data
685    /// Returns the consumer ID and starting offset
686    pub async fn subscribe(
687        &mut self,
688        topic_id: u32,
689        start_offset: u64,
690        max_batch_bytes: u32,
691        consumer_id: u64,
692    ) -> Result<SubscribeResult> {
693        let frame = Frame::new_subscribe(topic_id, start_offset, max_batch_bytes, consumer_id);
694        let frame_bytes = encode_frame(&frame);
695
696        trace!(topic_id, start_offset, consumer_id, "Subscribing to topic");
697
698        tokio::time::timeout(
699            self.config.write_timeout,
700            self.stream.write_all(&frame_bytes),
701        )
702        .await
703        .map_err(|_| ClientError::Timeout)??;
704
705        let response = self.recv_frame().await?;
706        self.parse_subscribe_response(response)
707    }
708
709    /// Unsubscribe from a topic
710    pub async fn unsubscribe(&mut self, topic_id: u32, consumer_id: u64) -> Result<()> {
711        let frame = Frame::new_unsubscribe(topic_id, consumer_id);
712        let frame_bytes = encode_frame(&frame);
713
714        trace!(topic_id, consumer_id, "Unsubscribing from topic");
715
716        tokio::time::timeout(
717            self.config.write_timeout,
718            self.stream.write_all(&frame_bytes),
719        )
720        .await
721        .map_err(|_| ClientError::Timeout)??;
722
723        // Wait for ack or error
724        let response = self.recv_frame().await?;
725        match response.frame_type {
726            FrameType::Ack => Ok(()),
727            FrameType::Control(ControlCommand::ErrorResponse) => {
728                let error_msg = response
729                    .payload
730                    .map(|p| String::from_utf8_lossy(&p).to_string())
731                    .unwrap_or_else(|| "Unknown error".to_string());
732                Err(ClientError::ServerError(error_msg))
733            },
734            other => Err(ClientError::InvalidResponse(format!(
735                "Expected Ack, got {:?}",
736                other
737            ))),
738        }
739    }
740
741    /// Commit consumer offset for checkpointing
742    pub async fn commit_offset(
743        &mut self,
744        topic_id: u32,
745        consumer_id: u64,
746        offset: u64,
747    ) -> Result<CommitResult> {
748        let frame = Frame::new_commit_offset(topic_id, consumer_id, offset);
749        let frame_bytes = encode_frame(&frame);
750
751        trace!(topic_id, consumer_id, offset, "Committing offset");
752
753        tokio::time::timeout(
754            self.config.write_timeout,
755            self.stream.write_all(&frame_bytes),
756        )
757        .await
758        .map_err(|_| ClientError::Timeout)??;
759
760        let response = self.recv_frame().await?;
761        self.parse_commit_response(response)
762    }
763
764    fn parse_subscribe_response(&self, frame: Frame) -> Result<SubscribeResult> {
765        match frame.frame_type {
766            FrameType::Control(ControlCommand::SubscribeAck) => {
767                let payload = frame.payload.ok_or_else(|| {
768                    ClientError::InvalidResponse("Empty subscribe response".to_string())
769                })?;
770
771                if payload.len() < 16 {
772                    return Err(ClientError::ProtocolError(
773                        "Subscribe response too small".to_string(),
774                    ));
775                }
776
777                let consumer_id = u64::from_le_bytes([
778                    payload[0], payload[1], payload[2], payload[3], payload[4], payload[5],
779                    payload[6], payload[7],
780                ]);
781                let start_offset = u64::from_le_bytes([
782                    payload[8],
783                    payload[9],
784                    payload[10],
785                    payload[11],
786                    payload[12],
787                    payload[13],
788                    payload[14],
789                    payload[15],
790                ]);
791
792                Ok(SubscribeResult {
793                    consumer_id,
794                    start_offset,
795                })
796            },
797            FrameType::Control(ControlCommand::ErrorResponse) => {
798                let error_msg = frame
799                    .payload
800                    .map(|p| String::from_utf8_lossy(&p).to_string())
801                    .unwrap_or_else(|| "Unknown error".to_string());
802                Err(ClientError::ServerError(error_msg))
803            },
804            other => Err(ClientError::InvalidResponse(format!(
805                "Expected SubscribeAck, got {:?}",
806                other
807            ))),
808        }
809    }
810
811    fn parse_commit_response(&self, frame: Frame) -> Result<CommitResult> {
812        match frame.frame_type {
813            FrameType::Control(ControlCommand::CommitAck) => {
814                let payload = frame.payload.ok_or_else(|| {
815                    ClientError::InvalidResponse("Empty commit response".to_string())
816                })?;
817
818                if payload.len() < 16 {
819                    return Err(ClientError::ProtocolError(
820                        "Commit response too small".to_string(),
821                    ));
822                }
823
824                let consumer_id = u64::from_le_bytes([
825                    payload[0], payload[1], payload[2], payload[3], payload[4], payload[5],
826                    payload[6], payload[7],
827                ]);
828                let committed_offset = u64::from_le_bytes([
829                    payload[8],
830                    payload[9],
831                    payload[10],
832                    payload[11],
833                    payload[12],
834                    payload[13],
835                    payload[14],
836                    payload[15],
837                ]);
838
839                Ok(CommitResult {
840                    consumer_id,
841                    committed_offset,
842                })
843            },
844            FrameType::Control(ControlCommand::ErrorResponse) => {
845                let error_msg = frame
846                    .payload
847                    .map(|p| String::from_utf8_lossy(&p).to_string())
848                    .unwrap_or_else(|| "Unknown error".to_string());
849                Err(ClientError::ServerError(error_msg))
850            },
851            other => Err(ClientError::InvalidResponse(format!(
852                "Expected CommitAck, got {:?}",
853                other
854            ))),
855        }
856    }
857
858    fn parse_fetch_response(&self, frame: Frame) -> Result<FetchResult> {
859        match frame.frame_type {
860            FrameType::Control(ControlCommand::FetchResponse) => {
861                let payload = frame.payload.ok_or_else(|| {
862                    ClientError::InvalidResponse("Empty fetch response".to_string())
863                })?;
864
865                if payload.len() < 16 {
866                    return Err(ClientError::ProtocolError(
867                        "Fetch response too small".to_string(),
868                    ));
869                }
870
871                let next_offset = u64::from_le_bytes([
872                    payload[0], payload[1], payload[2], payload[3], payload[4], payload[5],
873                    payload[6], payload[7],
874                ]);
875                let bytes_returned =
876                    u32::from_le_bytes([payload[8], payload[9], payload[10], payload[11]]);
877                let record_count =
878                    u32::from_le_bytes([payload[12], payload[13], payload[14], payload[15]]);
879                let data = payload.slice(16..);
880
881                Ok(FetchResult {
882                    data,
883                    next_offset,
884                    bytes_returned,
885                    record_count,
886                })
887            },
888            FrameType::Control(ControlCommand::ErrorResponse) => {
889                let error_msg = frame
890                    .payload
891                    .map(|p| String::from_utf8_lossy(&p).to_string())
892                    .unwrap_or_else(|| "Unknown error".to_string());
893                Err(ClientError::ServerError(error_msg))
894            },
895            other => Err(ClientError::InvalidResponse(format!(
896                "Expected FetchResponse, got {:?}",
897                other
898            ))),
899        }
900    }
901
/// Interprets the server's reply to a delete request (per the method name).
///
/// All of the work is delegated to `expect_success_response`; its result is
/// returned unchanged. Presumably that helper yields `Ok(())` for a success
/// frame and an error otherwise — confirm against the helper's definition.
902    fn parse_delete_response(&self, frame: Frame) -> Result<()> {
903        expect_success_response(frame)
904    }
905
/// Interprets the server's reply to a retention request (per the method name).
///
/// All of the work is delegated to `expect_success_response`; its result is
/// returned unchanged. Presumably that helper yields `Ok(())` for a success
/// frame and an error otherwise — confirm against the helper's definition.
906    fn parse_retention_response(&self, frame: Frame) -> Result<()> {
907        expect_success_response(frame)
908    }
909
910    fn parse_topic_response(&self, frame: Frame) -> Result<TopicInfo> {
911        match frame.frame_type {
912            FrameType::Control(ControlCommand::TopicResponse) => {
913                let payload = frame.payload.ok_or_else(|| {
914                    ClientError::InvalidResponse("Empty topic response".to_string())
915                })?;
916                let json: serde_json::Value = serde_json::from_slice(&payload)
917                    .map_err(|e| ClientError::ProtocolError(format!("Invalid JSON: {}", e)))?;
918
919                let retention = if json.get("retention").is_some() {
920                    Some(RetentionInfo {
921                        max_age_secs: json["retention"]["max_age_secs"].as_u64().unwrap_or(0),
922                        max_bytes: json["retention"]["max_bytes"].as_u64().unwrap_or(0),
923                    })
924                } else {
925                    None
926                };
927
928                Ok(TopicInfo {
929                    id: json["id"].as_u64().unwrap_or(0) as u32,
930                    name: json["name"].as_str().unwrap_or("").to_string(),
931                    created_at: json["created_at"].as_u64().unwrap_or(0),
932                    retention,
933                })
934            },
935            FrameType::Control(ControlCommand::ErrorResponse) => {
936                let error_msg = frame
937                    .payload
938                    .map(|p| String::from_utf8_lossy(&p).to_string())
939                    .unwrap_or_else(|| "Unknown error".to_string());
940                Err(ClientError::ServerError(error_msg))
941            },
942            other => Err(ClientError::InvalidResponse(format!(
943                "Expected TopicResponse, got {:?}",
944                other
945            ))),
946        }
947    }
948
949    fn parse_topic_list_response(&self, frame: Frame) -> Result<Vec<TopicInfo>> {
950        match frame.frame_type {
951            FrameType::Control(ControlCommand::TopicResponse) => {
952                let payload = frame.payload.ok_or_else(|| {
953                    ClientError::InvalidResponse("Empty topic list response".to_string())
954                })?;
955                let json: serde_json::Value = serde_json::from_slice(&payload)
956                    .map_err(|e| ClientError::ProtocolError(format!("Invalid JSON: {}", e)))?;
957
958                let topics = json["topics"]
959                    .as_array()
960                    .map(|arr| {
961                        arr.iter()
962                            .map(|t| {
963                                let retention = if t.get("retention").is_some() {
964                                    Some(RetentionInfo {
965                                        max_age_secs: t["retention"]["max_age_secs"]
966                                            .as_u64()
967                                            .unwrap_or(0),
968                                        max_bytes: t["retention"]["max_bytes"]
969                                            .as_u64()
970                                            .unwrap_or(0),
971                                    })
972                                } else {
973                                    None
974                                };
975                                TopicInfo {
976                                    id: t["id"].as_u64().unwrap_or(0) as u32,
977                                    name: t["name"].as_str().unwrap_or("").to_string(),
978                                    created_at: t["created_at"].as_u64().unwrap_or(0),
979                                    retention,
980                                }
981                            })
982                            .collect()
983                    })
984                    .unwrap_or_default();
985
986                Ok(topics)
987            },
988            FrameType::Control(ControlCommand::ErrorResponse) => {
989                let error_msg = frame
990                    .payload
991                    .map(|p| String::from_utf8_lossy(&p).to_string())
992                    .unwrap_or_else(|| "Unknown error".to_string());
993                Err(ClientError::ServerError(error_msg))
994            },
995            other => Err(ClientError::InvalidResponse(format!(
996                "Expected TopicResponse, got {:?}",
997                other
998            ))),
999        }
1000    }
1001
1002    async fn recv_frame(&mut self) -> Result<Frame> {
1003        loop {
1004            if self.read_offset >= LWP_HEADER_SIZE {
1005                if let Some((frame, consumed)) = parse_frame(&self.read_buffer[..self.read_offset])?
1006                {
1007                    self.read_buffer.copy_within(consumed..self.read_offset, 0);
1008                    self.read_offset -= consumed;
1009                    return Ok(frame);
1010                }
1011            }
1012
1013            let n = tokio::time::timeout(
1014                self.config.read_timeout,
1015                self.stream.read(&mut self.read_buffer[self.read_offset..]),
1016            )
1017            .await
1018            .map_err(|_| ClientError::Timeout)??;
1019
1020            if n == 0 {
1021                return Err(ClientError::ConnectionClosed);
1022            }
1023
1024            self.read_offset += n;
1025        }
1026    }
1027
/// Returns a shared reference to the configuration stored in this client.
1028    pub fn config(&self) -> &ClientConfig {
1029        &self.config
1030    }
1031
1032    pub async fn close(mut self) -> Result<()> {
1033        self.stream.shutdown().await?;
1034        Ok(())
1035    }
1036}
1037
1038impl std::fmt::Debug for LanceClient {
1039    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1040        f.debug_struct("LanceClient")
1041            .field("addr", &self.config.addr)
1042            .field("batch_id", &self.batch_id.load(Ordering::SeqCst))
1043            .finish()
1044    }
1045}