Skip to main content

lnc_client/
client.rs

1use crate::error::{ClientError, Result};
2use crate::tls::TlsClientConfig;
3use bytes::Bytes;
4use lnc_network::{
5    ControlCommand, Frame, FrameType, LWP_HEADER_SIZE, TlsConnector, encode_frame, parse_frame,
6};
7use std::net::SocketAddr;
8use std::pin::Pin;
9use std::sync::atomic::{AtomicU64, Ordering};
10use std::task::{Context, Poll};
11use std::time::Duration;
12use tokio::io::{AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt, ReadBuf};
13use tokio::net::TcpStream;
14use tracing::{debug, trace, warn};
15
16/// Wrapper enum for TCP and TLS streams to avoid dynamic dispatch
17#[allow(clippy::large_enum_variant)]
18pub enum ClientStream {
19    /// Plain TCP connection
20    Tcp(TcpStream),
21    /// TLS-encrypted connection
22    Tls(tokio_rustls::client::TlsStream<TcpStream>),
23}
24
25impl AsyncRead for ClientStream {
26    fn poll_read(
27        self: Pin<&mut Self>,
28        cx: &mut Context<'_>,
29        buf: &mut ReadBuf<'_>,
30    ) -> Poll<std::io::Result<()>> {
31        match self.get_mut() {
32            ClientStream::Tcp(stream) => Pin::new(stream).poll_read(cx, buf),
33            ClientStream::Tls(stream) => Pin::new(stream).poll_read(cx, buf),
34        }
35    }
36}
37
38impl AsyncWrite for ClientStream {
39    fn poll_write(
40        self: Pin<&mut Self>,
41        cx: &mut Context<'_>,
42        buf: &[u8],
43    ) -> Poll<std::io::Result<usize>> {
44        match self.get_mut() {
45            ClientStream::Tcp(stream) => Pin::new(stream).poll_write(cx, buf),
46            ClientStream::Tls(stream) => Pin::new(stream).poll_write(cx, buf),
47        }
48    }
49
50    fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<std::io::Result<()>> {
51        match self.get_mut() {
52            ClientStream::Tcp(stream) => Pin::new(stream).poll_flush(cx),
53            ClientStream::Tls(stream) => Pin::new(stream).poll_flush(cx),
54        }
55    }
56
57    fn poll_shutdown(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<std::io::Result<()>> {
58        match self.get_mut() {
59            ClientStream::Tcp(stream) => Pin::new(stream).poll_shutdown(cx),
60            ClientStream::Tls(stream) => Pin::new(stream).poll_shutdown(cx),
61        }
62    }
63}
64
65/// Helper to extract error message from a frame payload
66fn extract_error_message(frame: &Frame) -> String {
67    frame
68        .payload
69        .as_ref()
70        .map(|p| String::from_utf8_lossy(p).to_string())
71        .unwrap_or_else(|| "Unknown error".to_string())
72}
73
74/// Helper to validate frame type and extract error if present
75#[allow(dead_code)] // Reserved for future protocol extensions
76fn expect_frame_type(frame: Frame, expected: ControlCommand, expected_name: &str) -> Result<Frame> {
77    match frame.frame_type {
78        FrameType::Control(cmd) if cmd == expected => Ok(frame),
79        FrameType::Control(ControlCommand::ErrorResponse) => {
80            Err(ClientError::ServerError(extract_error_message(&frame)))
81        },
82        other => Err(ClientError::InvalidResponse(format!(
83            "Expected {}, got {:?}",
84            expected_name, other
85        ))),
86    }
87}
88
89/// Helper to validate frame is a success response (TopicResponse or similar)
90fn expect_success_response(frame: Frame) -> Result<()> {
91    match frame.frame_type {
92        FrameType::Control(ControlCommand::TopicResponse) => Ok(()),
93        FrameType::Control(ControlCommand::ErrorResponse) => {
94            Err(ClientError::ServerError(extract_error_message(&frame)))
95        },
96        other => Err(ClientError::InvalidResponse(format!(
97            "Expected TopicResponse, got {:?}",
98            other
99        ))),
100    }
101}
102
/// Client-side authentication settings.
///
/// The derived `Default` has mTLS disabled and no certificate or key path.
#[derive(Clone, Debug, Default)]
pub struct AuthConfig {
    /// `true` when mutual TLS (mTLS) authentication is enabled
    pub mtls_enabled: bool,
    /// Location of the client certificate file used for mTLS, if any
    pub client_cert_path: Option<String>,
    /// Location of the client private key file used for mTLS, if any
    pub client_key_path: Option<String>,
}
113
/// Retention limits attached to a topic.
///
/// The derived `Default` sets both limits to `0`, which means "no limit".
#[derive(Clone, Debug, Default)]
pub struct RetentionInfo {
    /// Oldest allowed message age, in seconds (0 = no limit)
    pub max_age_secs: u64,
    /// Largest allowed topic size, in bytes (0 = no limit)
    pub max_bytes: u64,
}
122
123/// Information about a topic
124#[derive(Debug, Clone)]
125pub struct TopicInfo {
126    /// Unique topic identifier
127    pub id: u32,
128    /// Topic name
129    pub name: String,
130    /// Unix timestamp when the topic was created
131    pub created_at: u64,
132    /// Retention policy configuration (None = no retention policy set)
133    pub retention: Option<RetentionInfo>,
134}
135
136/// Result of a fetch operation
137#[derive(Debug, Clone)]
138pub struct FetchResult {
139    /// Raw data fetched from the topic
140    pub data: Bytes,
141    /// Offset to use for the next fetch operation
142    pub next_offset: u64,
143    /// Number of bytes returned in this fetch
144    pub bytes_returned: u32,
145    /// Number of records in the fetched data
146    pub record_count: u32,
147}
148
/// Outcome of a subscribe request.
#[derive(Clone, Debug)]
pub struct SubscribeResult {
    /// Consumer identifier assigned to this subscription
    pub consumer_id: u64,
    /// Offset at which consumption starts
    pub start_offset: u64,
}
157
/// Outcome of a commit-offset request.
#[derive(Clone, Debug)]
pub struct CommitResult {
    /// Identifier of the consumer whose offset was committed
    pub consumer_id: u64,
    /// Offset that was successfully committed
    pub committed_offset: u64,
}
166
/// Snapshot of cluster state as reported by a server.
///
/// Every field is filled from the same-named key of the JSON document the
/// server returns for a cluster-status request.
#[derive(Clone, Debug)]
pub struct ClusterStatus {
    /// Value of the `node_id` key
    pub node_id: u16,
    /// Value of the `is_leader` key
    pub is_leader: bool,
    /// Value of the `leader_id` key; `None` when the key is absent or not a number
    pub leader_id: Option<u16>,
    /// Value of the `current_term` key
    pub current_term: u64,
    /// Value of the `node_count` key
    pub node_count: usize,
    /// Value of the `healthy_nodes` key
    pub healthy_nodes: usize,
    /// Value of the `quorum_available` key
    pub quorum_available: bool,
    /// Entries of the `peer_states` object, keyed by the numeric peer id
    pub peer_states: std::collections::HashMap<u16, String>,
}
179
180/// Configuration for the LANCE client
181#[derive(Debug, Clone)]
182pub struct ClientConfig {
183    /// Server address to connect to
184    pub addr: SocketAddr,
185    /// Timeout for establishing connections
186    pub connect_timeout: Duration,
187    /// Timeout for read operations
188    pub read_timeout: Duration,
189    /// Timeout for write operations
190    pub write_timeout: Duration,
191    /// Interval between keepalive messages
192    pub keepalive_interval: Duration,
193    /// Optional TLS configuration for encrypted connections
194    pub tls: Option<TlsClientConfig>,
195}
196
197impl Default for ClientConfig {
198    fn default() -> Self {
199        Self {
200            addr: "127.0.0.1:1992"
201                .parse()
202                .unwrap_or_else(|_| SocketAddr::from(([127, 0, 0, 1], 1992))),
203            connect_timeout: Duration::from_secs(10),
204            read_timeout: Duration::from_secs(30),
205            write_timeout: Duration::from_secs(10),
206            keepalive_interval: Duration::from_secs(10),
207            tls: None,
208        }
209    }
210}
211
212impl ClientConfig {
213    /// Create a new client configuration with the specified server address
214    pub fn new(addr: SocketAddr) -> Self {
215        Self {
216            addr,
217            ..Default::default()
218        }
219    }
220
221    /// Enable TLS with the provided configuration
222    pub fn with_tls(mut self, tls_config: TlsClientConfig) -> Self {
223        self.tls = Some(tls_config);
224        self
225    }
226
227    /// Check if TLS is enabled
228    pub fn is_tls_enabled(&self) -> bool {
229        self.tls.is_some()
230    }
231}
232
233/// LANCE protocol client for communicating with LANCE servers
234///
235/// Provides methods for ingesting data, managing topics, and consuming records.
236pub struct LanceClient {
237    stream: ClientStream,
238    config: ClientConfig,
239    batch_id: AtomicU64,
240    read_buffer: Vec<u8>,
241    read_offset: usize,
242}
243
244impl LanceClient {
245    /// Connect to LANCE server, automatically using TLS if configured
246    pub async fn connect(config: ClientConfig) -> Result<Self> {
247        // If TLS is configured in ClientConfig, use TLS connection
248        if let Some(ref tls_config) = config.tls {
249            return Self::connect_tls(config.clone(), tls_config.clone()).await;
250        }
251
252        debug!(addr = %config.addr, "Connecting to LANCE server");
253
254        let stream = tokio::time::timeout(config.connect_timeout, TcpStream::connect(config.addr))
255            .await
256            .map_err(|_| ClientError::Timeout)?
257            .map_err(ClientError::ConnectionFailed)?;
258
259        stream.set_nodelay(true)?;
260
261        debug!(addr = %config.addr, "Connected to LANCE server");
262
263        Ok(Self {
264            stream: ClientStream::Tcp(stream),
265            config,
266            batch_id: AtomicU64::new(0),
267            read_buffer: vec![0u8; 64 * 1024],
268            read_offset: 0,
269        })
270    }
271
272    /// Connect to LANCE server with TLS encryption
273    ///
274    /// # Arguments
275    /// * `config` - Client configuration with server address
276    /// * `tls_config` - TLS configuration including certificates
277    ///
278    /// # Example
279    /// ```rust,ignore
280    /// use lnc_client::{ClientConfig, TlsClientConfig, LanceClient};
281    ///
282    /// let config = ClientConfig::new("127.0.0.1:1992".parse().unwrap());
283    /// let tls = TlsClientConfig::new()
284    ///     .with_ca_cert("/path/to/ca.pem");
285    ///
286    /// let client = LanceClient::connect_tls(config, tls).await?;
287    /// ```
288    pub async fn connect_tls(config: ClientConfig, tls_config: TlsClientConfig) -> Result<Self> {
289        debug!(addr = %config.addr, "Connecting to LANCE server with TLS");
290
291        // First establish TCP connection
292        let tcp_stream =
293            tokio::time::timeout(config.connect_timeout, TcpStream::connect(config.addr))
294                .await
295                .map_err(|_| ClientError::Timeout)?
296                .map_err(ClientError::ConnectionFailed)?;
297
298        tcp_stream.set_nodelay(true)?;
299
300        // Create TLS connector
301        let network_config = tls_config.to_network_config();
302        let connector =
303            TlsConnector::new(network_config).map_err(|e| ClientError::TlsError(e.to_string()))?;
304
305        // Determine server name for SNI
306        let server_name = tls_config
307            .server_name
308            .unwrap_or_else(|| config.addr.ip().to_string());
309
310        // Perform TLS handshake
311        let tls_stream = connector
312            .connect(&server_name, tcp_stream)
313            .await
314            .map_err(|e| ClientError::TlsError(e.to_string()))?;
315
316        debug!(addr = %config.addr, "TLS connection established");
317
318        Ok(Self {
319            stream: ClientStream::Tls(tls_stream),
320            config,
321            batch_id: AtomicU64::new(0),
322            read_buffer: vec![0u8; 64 * 1024],
323            read_offset: 0,
324        })
325    }
326
327    /// Connect to a LANCE server using an address string
328    pub async fn connect_to(addr: &str) -> Result<Self> {
329        let socket_addr: SocketAddr = addr
330            .parse()
331            .map_err(|e| ClientError::ProtocolError(format!("Invalid address: {}", e)))?;
332        Self::connect(ClientConfig::new(socket_addr)).await
333    }
334
335    /// Connect to LANCE server with TLS using address string
336    pub async fn connect_tls_to(addr: &str, tls_config: TlsClientConfig) -> Result<Self> {
337        let socket_addr: SocketAddr = addr
338            .parse()
339            .map_err(|e| ClientError::ProtocolError(format!("Invalid address: {}", e)))?;
340        Self::connect_tls(ClientConfig::new(socket_addr), tls_config).await
341    }
342
343    fn next_batch_id(&self) -> u64 {
344        self.batch_id.fetch_add(1, Ordering::SeqCst) + 1
345    }
346
347    /// Send an ingest request to the default topic (topic 0)
348    pub async fn send_ingest(&mut self, payload: Bytes, record_count: u32) -> Result<u64> {
349        self.send_ingest_to_topic(0, payload, record_count, None)
350            .await
351    }
352
353    /// Send an ingest request to a specific topic
354    pub async fn send_ingest_to_topic(
355        &mut self,
356        topic_id: u32,
357        payload: Bytes,
358        record_count: u32,
359        _auth_config: Option<&AuthConfig>,
360    ) -> Result<u64> {
361        let batch_id = self.next_batch_id();
362        let timestamp_ns = std::time::SystemTime::now()
363            .duration_since(std::time::UNIX_EPOCH)
364            .map(|d| d.as_nanos() as u64)
365            .unwrap_or(0);
366
367        let frame =
368            Frame::new_ingest_with_topic(batch_id, timestamp_ns, record_count, payload, topic_id);
369        let frame_bytes = encode_frame(&frame);
370
371        trace!(
372            batch_id,
373            topic_id,
374            payload_len = frame.payload_length(),
375            "Sending ingest frame"
376        );
377
378        tokio::time::timeout(
379            self.config.write_timeout,
380            self.stream.write_all(&frame_bytes),
381        )
382        .await
383        .map_err(|_| ClientError::Timeout)??;
384
385        Ok(batch_id)
386    }
387
388    /// Send an ingest request and wait for acknowledgment (default topic)
389    pub async fn send_ingest_sync(&mut self, payload: Bytes, record_count: u32) -> Result<u64> {
390        self.send_ingest_to_topic_sync(0, payload, record_count, None)
391            .await
392    }
393
394    /// Send an ingest request to a specific topic and wait for acknowledgment
395    pub async fn send_ingest_to_topic_sync(
396        &mut self,
397        topic_id: u32,
398        payload: Bytes,
399        record_count: u32,
400        auth_config: Option<&AuthConfig>,
401    ) -> Result<u64> {
402        let batch_id = self
403            .send_ingest_to_topic(topic_id, payload, record_count, auth_config)
404            .await?;
405        self.wait_for_ack(batch_id).await
406    }
407
408    async fn wait_for_ack(&mut self, expected_batch_id: u64) -> Result<u64> {
409        let frame = self.recv_frame().await?;
410
411        match frame.frame_type {
412            FrameType::Ack => {
413                let acked_id = frame.batch_id();
414                if acked_id != expected_batch_id {
415                    return Err(ClientError::InvalidResponse(format!(
416                        "Ack batch_id mismatch: sent {}, received {}",
417                        expected_batch_id, acked_id
418                    )));
419                }
420                trace!(batch_id = acked_id, "Received ack");
421                Ok(acked_id)
422            },
423            FrameType::Control(ControlCommand::ErrorResponse) => {
424                let error_msg = frame
425                    .payload
426                    .map(|p| String::from_utf8_lossy(&p).to_string())
427                    .unwrap_or_else(|| "Unknown error".to_string());
428                Err(ClientError::ServerError(error_msg))
429            },
430            FrameType::Backpressure => {
431                warn!("Server signaled backpressure");
432                Err(ClientError::ServerBackpressure)
433            },
434            other => Err(ClientError::InvalidResponse(format!(
435                "Expected Ack, got {:?}",
436                other
437            ))),
438        }
439    }
440
441    /// Receive an acknowledgment for a previously sent ingest request
442    pub async fn recv_ack(&mut self) -> Result<u64> {
443        let frame = self.recv_frame().await?;
444
445        match frame.frame_type {
446            FrameType::Ack => {
447                trace!(batch_id = frame.batch_id(), "Received ack");
448                Ok(frame.batch_id())
449            },
450            FrameType::Backpressure => {
451                warn!("Server signaled backpressure");
452                Err(ClientError::ServerBackpressure)
453            },
454            other => Err(ClientError::InvalidResponse(format!(
455                "Expected Ack, got {:?}",
456                other
457            ))),
458        }
459    }
460
461    /// Send a keepalive message to maintain the connection
462    pub async fn send_keepalive(&mut self) -> Result<()> {
463        let frame = Frame::new_keepalive();
464        let frame_bytes = encode_frame(&frame);
465
466        trace!("Sending keepalive");
467
468        tokio::time::timeout(
469            self.config.write_timeout,
470            self.stream.write_all(&frame_bytes),
471        )
472        .await
473        .map_err(|_| ClientError::Timeout)??;
474
475        Ok(())
476    }
477
478    /// Receive a keepalive response from the server
479    pub async fn recv_keepalive(&mut self) -> Result<()> {
480        let frame = self.recv_frame().await?;
481
482        match frame.frame_type {
483            FrameType::Keepalive => {
484                trace!("Received keepalive response");
485                Ok(())
486            },
487            other => Err(ClientError::InvalidResponse(format!(
488                "Expected Keepalive, got {:?}",
489                other
490            ))),
491        }
492    }
493
494    /// Ping the server and measure round-trip latency
495    pub async fn ping(&mut self) -> Result<Duration> {
496        let start = std::time::Instant::now();
497        self.send_keepalive().await?;
498        self.recv_keepalive().await?;
499        Ok(start.elapsed())
500    }
501
502    /// Create a new topic with the given name
503    pub async fn create_topic(&mut self, name: &str) -> Result<TopicInfo> {
504        let frame = Frame::new_create_topic(name);
505        let frame_bytes = encode_frame(&frame);
506
507        trace!(topic_name = %name, "Creating topic");
508
509        tokio::time::timeout(
510            self.config.write_timeout,
511            self.stream.write_all(&frame_bytes),
512        )
513        .await
514        .map_err(|_| ClientError::Timeout)??;
515
516        let response = self.recv_frame().await?;
517        self.parse_topic_response(response)
518    }
519
520    /// List all topics on the server
521    pub async fn list_topics(&mut self) -> Result<Vec<TopicInfo>> {
522        let frame = Frame::new_list_topics();
523        let frame_bytes = encode_frame(&frame);
524
525        trace!("Listing topics");
526
527        tokio::time::timeout(
528            self.config.write_timeout,
529            self.stream.write_all(&frame_bytes),
530        )
531        .await
532        .map_err(|_| ClientError::Timeout)??;
533
534        let response = self.recv_frame().await?;
535        self.parse_topic_list_response(response)
536    }
537
538    /// Get information about a specific topic
539    pub async fn get_topic(&mut self, topic_id: u32) -> Result<TopicInfo> {
540        let frame = Frame::new_get_topic(topic_id);
541        let frame_bytes = encode_frame(&frame);
542
543        trace!(topic_id, "Getting topic");
544
545        tokio::time::timeout(
546            self.config.write_timeout,
547            self.stream.write_all(&frame_bytes),
548        )
549        .await
550        .map_err(|_| ClientError::Timeout)??;
551
552        let response = self.recv_frame().await?;
553        self.parse_topic_response(response)
554    }
555
556    /// Delete a topic by its ID
557    pub async fn delete_topic(&mut self, topic_id: u32) -> Result<()> {
558        let frame = Frame::new_delete_topic(topic_id);
559        let frame_bytes = encode_frame(&frame);
560
561        trace!(topic_id, "Deleting topic");
562
563        tokio::time::timeout(
564            self.config.write_timeout,
565            self.stream.write_all(&frame_bytes),
566        )
567        .await
568        .map_err(|_| ClientError::Timeout)??;
569
570        let response = self.recv_frame().await?;
571        self.parse_delete_response(response)
572    }
573
574    /// Set retention policy for an existing topic
575    ///
576    /// # Arguments
577    /// * `topic_id` - Topic identifier
578    /// * `max_age_secs` - Maximum age in seconds (0 = no limit)
579    /// * `max_bytes` - Maximum size in bytes (0 = no limit)
580    pub async fn set_retention(
581        &mut self,
582        topic_id: u32,
583        max_age_secs: u64,
584        max_bytes: u64,
585    ) -> Result<()> {
586        let frame = Frame::new_set_retention(topic_id, max_age_secs, max_bytes);
587        let frame_bytes = encode_frame(&frame);
588
589        trace!(
590            topic_id,
591            max_age_secs, max_bytes, "Setting retention policy"
592        );
593
594        tokio::time::timeout(
595            self.config.write_timeout,
596            self.stream.write_all(&frame_bytes),
597        )
598        .await
599        .map_err(|_| ClientError::Timeout)??;
600
601        let response = self.recv_frame().await?;
602        self.parse_retention_response(response)
603    }
604
605    /// Create a topic with retention policy in a single operation
606    ///
607    /// # Arguments
608    /// * `name` - Topic name
609    /// * `max_age_secs` - Maximum age in seconds (0 = no limit)
610    /// * `max_bytes` - Maximum size in bytes (0 = no limit)
611    pub async fn create_topic_with_retention(
612        &mut self,
613        name: &str,
614        max_age_secs: u64,
615        max_bytes: u64,
616    ) -> Result<TopicInfo> {
617        let frame = Frame::new_create_topic_with_retention(name, max_age_secs, max_bytes);
618        let frame_bytes = encode_frame(&frame);
619
620        trace!(
621            name,
622            max_age_secs, max_bytes, "Creating topic with retention"
623        );
624
625        tokio::time::timeout(
626            self.config.write_timeout,
627            self.stream.write_all(&frame_bytes),
628        )
629        .await
630        .map_err(|_| ClientError::Timeout)??;
631
632        let response = self.recv_frame().await?;
633        self.parse_topic_response(response)
634    }
635
636    /// Get cluster status and health information
637    pub async fn get_cluster_status(&mut self) -> Result<ClusterStatus> {
638        let frame = Frame::new_get_cluster_status();
639        let frame_bytes = encode_frame(&frame);
640
641        tokio::time::timeout(
642            self.config.write_timeout,
643            self.stream.write_all(&frame_bytes),
644        )
645        .await
646        .map_err(|_| ClientError::Timeout)??;
647
648        let response = self.recv_frame().await?;
649        self.parse_cluster_status_response(response)
650    }
651
652    fn parse_cluster_status_response(&self, frame: Frame) -> Result<ClusterStatus> {
653        match frame.frame_type {
654            FrameType::Control(ControlCommand::ClusterStatusResponse) => {
655                let payload = frame.payload.ok_or_else(|| {
656                    ClientError::InvalidResponse("Empty cluster status response".to_string())
657                })?;
658                let json: serde_json::Value = serde_json::from_slice(&payload)
659                    .map_err(|e| ClientError::ProtocolError(format!("Invalid JSON: {}", e)))?;
660
661                let peer_states: std::collections::HashMap<u16, String> = json["peer_states"]
662                    .as_object()
663                    .map(|obj| {
664                        obj.iter()
665                            .filter_map(|(k, v)| {
666                                k.parse::<u16>()
667                                    .ok()
668                                    .map(|id| (id, v.as_str().unwrap_or("unknown").to_string()))
669                            })
670                            .collect()
671                    })
672                    .unwrap_or_default();
673
674                Ok(ClusterStatus {
675                    node_id: json["node_id"].as_u64().unwrap_or(0) as u16,
676                    is_leader: json["is_leader"].as_bool().unwrap_or(false),
677                    leader_id: json["leader_id"].as_u64().map(|id| id as u16),
678                    current_term: json["current_term"].as_u64().unwrap_or(0),
679                    node_count: json["node_count"].as_u64().unwrap_or(1) as usize,
680                    healthy_nodes: json["healthy_nodes"].as_u64().unwrap_or(1) as usize,
681                    quorum_available: json["quorum_available"].as_bool().unwrap_or(true),
682                    peer_states,
683                })
684            },
685            FrameType::Control(ControlCommand::ErrorResponse) => {
686                let error_msg = frame
687                    .payload
688                    .map(|p| String::from_utf8_lossy(&p).to_string())
689                    .unwrap_or_else(|| "Unknown error".to_string());
690                Err(ClientError::ServerError(error_msg))
691            },
692            other => Err(ClientError::InvalidResponse(format!(
693                "Expected ClusterStatusResponse, got {:?}",
694                other
695            ))),
696        }
697    }
698
699    /// Fetch data from a topic starting at the given offset
700    /// Returns (data, next_offset, record_count)
701    pub async fn fetch(
702        &mut self,
703        topic_id: u32,
704        start_offset: u64,
705        max_bytes: u32,
706    ) -> Result<FetchResult> {
707        let frame = Frame::new_fetch(topic_id, start_offset, max_bytes);
708        let frame_bytes = encode_frame(&frame);
709
710        trace!(topic_id, start_offset, max_bytes, "Fetching data");
711
712        tokio::time::timeout(
713            self.config.write_timeout,
714            self.stream.write_all(&frame_bytes),
715        )
716        .await
717        .map_err(|_| ClientError::Timeout)??;
718
719        let response = self.recv_frame().await?;
720        self.parse_fetch_response(response)
721    }
722
723    /// Subscribe to a topic for streaming data
724    /// Returns the consumer ID and starting offset
725    pub async fn subscribe(
726        &mut self,
727        topic_id: u32,
728        start_offset: u64,
729        max_batch_bytes: u32,
730        consumer_id: u64,
731    ) -> Result<SubscribeResult> {
732        let frame = Frame::new_subscribe(topic_id, start_offset, max_batch_bytes, consumer_id);
733        let frame_bytes = encode_frame(&frame);
734
735        trace!(topic_id, start_offset, consumer_id, "Subscribing to topic");
736
737        tokio::time::timeout(
738            self.config.write_timeout,
739            self.stream.write_all(&frame_bytes),
740        )
741        .await
742        .map_err(|_| ClientError::Timeout)??;
743
744        let response = self.recv_frame().await?;
745        self.parse_subscribe_response(response)
746    }
747
748    /// Unsubscribe from a topic
749    pub async fn unsubscribe(&mut self, topic_id: u32, consumer_id: u64) -> Result<()> {
750        let frame = Frame::new_unsubscribe(topic_id, consumer_id);
751        let frame_bytes = encode_frame(&frame);
752
753        trace!(topic_id, consumer_id, "Unsubscribing from topic");
754
755        tokio::time::timeout(
756            self.config.write_timeout,
757            self.stream.write_all(&frame_bytes),
758        )
759        .await
760        .map_err(|_| ClientError::Timeout)??;
761
762        // Wait for ack or error
763        let response = self.recv_frame().await?;
764        match response.frame_type {
765            FrameType::Ack => Ok(()),
766            FrameType::Control(ControlCommand::ErrorResponse) => {
767                let error_msg = response
768                    .payload
769                    .map(|p| String::from_utf8_lossy(&p).to_string())
770                    .unwrap_or_else(|| "Unknown error".to_string());
771                Err(ClientError::ServerError(error_msg))
772            },
773            other => Err(ClientError::InvalidResponse(format!(
774                "Expected Ack, got {:?}",
775                other
776            ))),
777        }
778    }
779
780    /// Commit consumer offset for checkpointing
781    pub async fn commit_offset(
782        &mut self,
783        topic_id: u32,
784        consumer_id: u64,
785        offset: u64,
786    ) -> Result<CommitResult> {
787        let frame = Frame::new_commit_offset(topic_id, consumer_id, offset);
788        let frame_bytes = encode_frame(&frame);
789
790        trace!(topic_id, consumer_id, offset, "Committing offset");
791
792        tokio::time::timeout(
793            self.config.write_timeout,
794            self.stream.write_all(&frame_bytes),
795        )
796        .await
797        .map_err(|_| ClientError::Timeout)??;
798
799        let response = self.recv_frame().await?;
800        self.parse_commit_response(response)
801    }
802
803    fn parse_subscribe_response(&self, frame: Frame) -> Result<SubscribeResult> {
804        match frame.frame_type {
805            FrameType::Control(ControlCommand::SubscribeAck) => {
806                let payload = frame.payload.ok_or_else(|| {
807                    ClientError::InvalidResponse("Empty subscribe response".to_string())
808                })?;
809
810                if payload.len() < 16 {
811                    return Err(ClientError::ProtocolError(
812                        "Subscribe response too small".to_string(),
813                    ));
814                }
815
816                let consumer_id = u64::from_le_bytes([
817                    payload[0], payload[1], payload[2], payload[3], payload[4], payload[5],
818                    payload[6], payload[7],
819                ]);
820                let start_offset = u64::from_le_bytes([
821                    payload[8],
822                    payload[9],
823                    payload[10],
824                    payload[11],
825                    payload[12],
826                    payload[13],
827                    payload[14],
828                    payload[15],
829                ]);
830
831                Ok(SubscribeResult {
832                    consumer_id,
833                    start_offset,
834                })
835            },
836            FrameType::Control(ControlCommand::ErrorResponse) => {
837                let error_msg = frame
838                    .payload
839                    .map(|p| String::from_utf8_lossy(&p).to_string())
840                    .unwrap_or_else(|| "Unknown error".to_string());
841                Err(ClientError::ServerError(error_msg))
842            },
843            other => Err(ClientError::InvalidResponse(format!(
844                "Expected SubscribeAck, got {:?}",
845                other
846            ))),
847        }
848    }
849
850    fn parse_commit_response(&self, frame: Frame) -> Result<CommitResult> {
851        match frame.frame_type {
852            FrameType::Control(ControlCommand::CommitAck) => {
853                let payload = frame.payload.ok_or_else(|| {
854                    ClientError::InvalidResponse("Empty commit response".to_string())
855                })?;
856
857                if payload.len() < 16 {
858                    return Err(ClientError::ProtocolError(
859                        "Commit response too small".to_string(),
860                    ));
861                }
862
863                let consumer_id = u64::from_le_bytes([
864                    payload[0], payload[1], payload[2], payload[3], payload[4], payload[5],
865                    payload[6], payload[7],
866                ]);
867                let committed_offset = u64::from_le_bytes([
868                    payload[8],
869                    payload[9],
870                    payload[10],
871                    payload[11],
872                    payload[12],
873                    payload[13],
874                    payload[14],
875                    payload[15],
876                ]);
877
878                Ok(CommitResult {
879                    consumer_id,
880                    committed_offset,
881                })
882            },
883            FrameType::Control(ControlCommand::ErrorResponse) => {
884                let error_msg = frame
885                    .payload
886                    .map(|p| String::from_utf8_lossy(&p).to_string())
887                    .unwrap_or_else(|| "Unknown error".to_string());
888                Err(ClientError::ServerError(error_msg))
889            },
890            other => Err(ClientError::InvalidResponse(format!(
891                "Expected CommitAck, got {:?}",
892                other
893            ))),
894        }
895    }
896
897    fn parse_fetch_response(&self, frame: Frame) -> Result<FetchResult> {
898        match frame.frame_type {
899            FrameType::Control(ControlCommand::FetchResponse) => {
900                let payload = frame.payload.ok_or_else(|| {
901                    ClientError::InvalidResponse("Empty fetch response".to_string())
902                })?;
903
904                if payload.len() < 16 {
905                    return Err(ClientError::ProtocolError(
906                        "Fetch response too small".to_string(),
907                    ));
908                }
909
910                let next_offset = u64::from_le_bytes([
911                    payload[0], payload[1], payload[2], payload[3], payload[4], payload[5],
912                    payload[6], payload[7],
913                ]);
914                let bytes_returned =
915                    u32::from_le_bytes([payload[8], payload[9], payload[10], payload[11]]);
916                let record_count =
917                    u32::from_le_bytes([payload[12], payload[13], payload[14], payload[15]]);
918                let data = payload.slice(16..);
919
920                Ok(FetchResult {
921                    data,
922                    next_offset,
923                    bytes_returned,
924                    record_count,
925                })
926            },
927            FrameType::Control(ControlCommand::ErrorResponse) => {
928                let error_msg = frame
929                    .payload
930                    .map(|p| String::from_utf8_lossy(&p).to_string())
931                    .unwrap_or_else(|| "Unknown error".to_string());
932                Err(ClientError::ServerError(error_msg))
933            },
934            other => Err(ClientError::InvalidResponse(format!(
935                "Expected FetchResponse, got {:?}",
936                other
937            ))),
938        }
939    }
940
941    fn parse_delete_response(&self, frame: Frame) -> Result<()> {
942        expect_success_response(frame)
943    }
944
945    fn parse_retention_response(&self, frame: Frame) -> Result<()> {
946        expect_success_response(frame)
947    }
948
949    fn parse_topic_response(&self, frame: Frame) -> Result<TopicInfo> {
950        match frame.frame_type {
951            FrameType::Control(ControlCommand::TopicResponse) => {
952                let payload = frame.payload.ok_or_else(|| {
953                    ClientError::InvalidResponse("Empty topic response".to_string())
954                })?;
955                let json: serde_json::Value = serde_json::from_slice(&payload)
956                    .map_err(|e| ClientError::ProtocolError(format!("Invalid JSON: {}", e)))?;
957
958                let retention = if json.get("retention").is_some() {
959                    Some(RetentionInfo {
960                        max_age_secs: json["retention"]["max_age_secs"].as_u64().unwrap_or(0),
961                        max_bytes: json["retention"]["max_bytes"].as_u64().unwrap_or(0),
962                    })
963                } else {
964                    None
965                };
966
967                Ok(TopicInfo {
968                    id: json["id"].as_u64().unwrap_or(0) as u32,
969                    name: json["name"].as_str().unwrap_or("").to_string(),
970                    created_at: json["created_at"].as_u64().unwrap_or(0),
971                    retention,
972                })
973            },
974            FrameType::Control(ControlCommand::ErrorResponse) => {
975                let error_msg = frame
976                    .payload
977                    .map(|p| String::from_utf8_lossy(&p).to_string())
978                    .unwrap_or_else(|| "Unknown error".to_string());
979                Err(ClientError::ServerError(error_msg))
980            },
981            other => Err(ClientError::InvalidResponse(format!(
982                "Expected TopicResponse, got {:?}",
983                other
984            ))),
985        }
986    }
987
988    fn parse_topic_list_response(&self, frame: Frame) -> Result<Vec<TopicInfo>> {
989        match frame.frame_type {
990            FrameType::Control(ControlCommand::TopicResponse) => {
991                let payload = frame.payload.ok_or_else(|| {
992                    ClientError::InvalidResponse("Empty topic list response".to_string())
993                })?;
994                let json: serde_json::Value = serde_json::from_slice(&payload)
995                    .map_err(|e| ClientError::ProtocolError(format!("Invalid JSON: {}", e)))?;
996
997                let topics = json["topics"]
998                    .as_array()
999                    .map(|arr| {
1000                        arr.iter()
1001                            .map(|t| {
1002                                let retention = if t.get("retention").is_some() {
1003                                    Some(RetentionInfo {
1004                                        max_age_secs: t["retention"]["max_age_secs"]
1005                                            .as_u64()
1006                                            .unwrap_or(0),
1007                                        max_bytes: t["retention"]["max_bytes"]
1008                                            .as_u64()
1009                                            .unwrap_or(0),
1010                                    })
1011                                } else {
1012                                    None
1013                                };
1014                                TopicInfo {
1015                                    id: t["id"].as_u64().unwrap_or(0) as u32,
1016                                    name: t["name"].as_str().unwrap_or("").to_string(),
1017                                    created_at: t["created_at"].as_u64().unwrap_or(0),
1018                                    retention,
1019                                }
1020                            })
1021                            .collect()
1022                    })
1023                    .unwrap_or_default();
1024
1025                Ok(topics)
1026            },
1027            FrameType::Control(ControlCommand::ErrorResponse) => {
1028                let error_msg = frame
1029                    .payload
1030                    .map(|p| String::from_utf8_lossy(&p).to_string())
1031                    .unwrap_or_else(|| "Unknown error".to_string());
1032                Err(ClientError::ServerError(error_msg))
1033            },
1034            other => Err(ClientError::InvalidResponse(format!(
1035                "Expected TopicResponse, got {:?}",
1036                other
1037            ))),
1038        }
1039    }
1040
1041    async fn recv_frame(&mut self) -> Result<Frame> {
1042        loop {
1043            if self.read_offset >= LWP_HEADER_SIZE {
1044                if let Some((frame, consumed)) = parse_frame(&self.read_buffer[..self.read_offset])?
1045                {
1046                    self.read_buffer.copy_within(consumed..self.read_offset, 0);
1047                    self.read_offset -= consumed;
1048                    return Ok(frame);
1049                }
1050            }
1051
1052            let n = tokio::time::timeout(
1053                self.config.read_timeout,
1054                self.stream.read(&mut self.read_buffer[self.read_offset..]),
1055            )
1056            .await
1057            .map_err(|_| ClientError::Timeout)??;
1058
1059            if n == 0 {
1060                return Err(ClientError::ConnectionClosed);
1061            }
1062
1063            self.read_offset += n;
1064        }
1065    }
1066
    /// Returns a shared reference to the [`ClientConfig`] held by this client.
    ///
    /// This is a plain borrow of the `config` field; nothing is cloned.
    pub fn config(&self) -> &ClientConfig {
        &self.config
    }
1071
1072    /// Close the client connection
1073    pub async fn close(mut self) -> Result<()> {
1074        self.stream.shutdown().await?;
1075        Ok(())
1076    }
1077}
1078
1079impl std::fmt::Debug for LanceClient {
1080    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1081        f.debug_struct("LanceClient")
1082            .field("addr", &self.config.addr)
1083            .field("batch_id", &self.batch_id.load(Ordering::SeqCst))
1084            .finish()
1085    }
1086}