1use crate::error::{ClientError, Result};
2use crate::tls::TlsClientConfig;
3use bytes::Bytes;
4use lnc_network::{
5 ControlCommand, Frame, FrameType, LWP_HEADER_SIZE, TlsConnector, encode_frame, parse_frame,
6};
7use std::net::SocketAddr;
8use std::pin::Pin;
9use std::sync::atomic::{AtomicU64, Ordering};
10use std::task::{Context, Poll};
11use std::time::Duration;
12use tokio::io::{AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt, ReadBuf};
13use tokio::net::TcpStream;
14use tracing::{debug, trace, warn};
15
16#[allow(clippy::large_enum_variant)]
18pub enum ClientStream {
19 Tcp(TcpStream),
21 Tls(tokio_rustls::client::TlsStream<TcpStream>),
23}
24
25impl AsyncRead for ClientStream {
26 fn poll_read(
27 self: Pin<&mut Self>,
28 cx: &mut Context<'_>,
29 buf: &mut ReadBuf<'_>,
30 ) -> Poll<std::io::Result<()>> {
31 match self.get_mut() {
32 ClientStream::Tcp(stream) => Pin::new(stream).poll_read(cx, buf),
33 ClientStream::Tls(stream) => Pin::new(stream).poll_read(cx, buf),
34 }
35 }
36}
37
38impl AsyncWrite for ClientStream {
39 fn poll_write(
40 self: Pin<&mut Self>,
41 cx: &mut Context<'_>,
42 buf: &[u8],
43 ) -> Poll<std::io::Result<usize>> {
44 match self.get_mut() {
45 ClientStream::Tcp(stream) => Pin::new(stream).poll_write(cx, buf),
46 ClientStream::Tls(stream) => Pin::new(stream).poll_write(cx, buf),
47 }
48 }
49
50 fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<std::io::Result<()>> {
51 match self.get_mut() {
52 ClientStream::Tcp(stream) => Pin::new(stream).poll_flush(cx),
53 ClientStream::Tls(stream) => Pin::new(stream).poll_flush(cx),
54 }
55 }
56
57 fn poll_shutdown(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<std::io::Result<()>> {
58 match self.get_mut() {
59 ClientStream::Tcp(stream) => Pin::new(stream).poll_shutdown(cx),
60 ClientStream::Tls(stream) => Pin::new(stream).poll_shutdown(cx),
61 }
62 }
63}
64
65fn extract_error_message(frame: &Frame) -> String {
67 frame
68 .payload
69 .as_ref()
70 .map(|p| String::from_utf8_lossy(p).to_string())
71 .unwrap_or_else(|| "Unknown error".to_string())
72}
73
74#[allow(dead_code)] fn expect_frame_type(frame: Frame, expected: ControlCommand, expected_name: &str) -> Result<Frame> {
77 match frame.frame_type {
78 FrameType::Control(cmd) if cmd == expected => Ok(frame),
79 FrameType::Control(ControlCommand::ErrorResponse) => {
80 Err(ClientError::ServerError(extract_error_message(&frame)))
81 },
82 other => Err(ClientError::InvalidResponse(format!(
83 "Expected {}, got {:?}",
84 expected_name, other
85 ))),
86 }
87}
88
89fn expect_success_response(frame: Frame) -> Result<()> {
91 match frame.frame_type {
92 FrameType::Control(ControlCommand::TopicResponse) => Ok(()),
93 FrameType::Control(ControlCommand::ErrorResponse) => {
94 Err(ClientError::ServerError(extract_error_message(&frame)))
95 },
96 other => Err(ClientError::InvalidResponse(format!(
97 "Expected TopicResponse, got {:?}",
98 other
99 ))),
100 }
101}
102
/// Authentication settings a caller may pass alongside ingest requests.
///
/// All fields default to "off"/`None`.
// NOTE(review): nothing in this file reads these fields; the field meanings
// below are taken from their names and should be confirmed.
#[derive(Debug, Clone, Default)]
pub struct AuthConfig {
    /// Whether mutual TLS is requested.
    pub mtls_enabled: bool,
    /// Path to the client certificate, if one is configured.
    pub client_cert_path: Option<String>,
    /// Path to the client private key, if one is configured.
    pub client_key_path: Option<String>,
}
113
/// Retention settings reported for a topic.
///
/// Both values are read from the `retention` object of a topic response and
/// fall back to `0` when the server omits them.
#[derive(Debug, Clone, Default)]
pub struct RetentionInfo {
    /// Value of the `max_age_secs` field in the server's reply.
    pub max_age_secs: u64,
    /// Value of the `max_bytes` field in the server's reply.
    pub max_bytes: u64,
}
122
123#[derive(Debug, Clone)]
125pub struct TopicInfo {
126 pub id: u32,
128 pub name: String,
130 pub created_at: u64,
132 pub retention: Option<RetentionInfo>,
134}
135
136#[derive(Debug, Clone)]
138pub struct FetchResult {
139 pub data: Bytes,
141 pub next_offset: u64,
143 pub bytes_returned: u32,
145 pub record_count: u32,
147}
148
/// Outcome of a subscribe request, decoded from a `SubscribeAck` payload.
#[derive(Debug, Clone)]
pub struct SubscribeResult {
    /// Consumer identifier echoed by the server (payload bytes 0..8, little-endian).
    pub consumer_id: u64,
    /// Start offset reported by the server (payload bytes 8..16, little-endian).
    pub start_offset: u64,
}
157
/// Outcome of an offset commit, decoded from a `CommitAck` payload.
#[derive(Debug, Clone)]
pub struct CommitResult {
    /// Consumer identifier echoed by the server (payload bytes 0..8, little-endian).
    pub consumer_id: u64,
    /// Offset the server reports as committed (payload bytes 8..16, little-endian).
    pub committed_offset: u64,
}
166
/// Cluster state as decoded from the JSON body of a `ClusterStatusResponse`.
///
/// Each field mirrors the JSON key of the same name.
#[derive(Debug, Clone)]
pub struct ClusterStatus {
    /// Identifier of the node that answered (`node_id`).
    pub node_id: u16,
    /// Whether the answering node reports itself as leader (`is_leader`).
    pub is_leader: bool,
    /// Identifier of the leader, when the reply carries one (`leader_id`).
    pub leader_id: Option<u16>,
    /// Value of `current_term` in the reply.
    pub current_term: u64,
    /// Value of `node_count` in the reply.
    pub node_count: usize,
    /// Value of `healthy_nodes` in the reply.
    pub healthy_nodes: usize,
    /// Value of `quorum_available` in the reply.
    pub quorum_available: bool,
    /// Per-peer state strings keyed by node id (`peer_states`).
    pub peer_states: std::collections::HashMap<u16, String>,
}
179
180#[derive(Debug, Clone)]
182pub struct ClientConfig {
183 pub addr: SocketAddr,
185 pub connect_timeout: Duration,
187 pub read_timeout: Duration,
189 pub write_timeout: Duration,
191 pub keepalive_interval: Duration,
193 pub tls: Option<TlsClientConfig>,
195}
196
197impl Default for ClientConfig {
198 fn default() -> Self {
199 Self {
200 addr: "127.0.0.1:1992"
201 .parse()
202 .unwrap_or_else(|_| SocketAddr::from(([127, 0, 0, 1], 1992))),
203 connect_timeout: Duration::from_secs(10),
204 read_timeout: Duration::from_secs(30),
205 write_timeout: Duration::from_secs(10),
206 keepalive_interval: Duration::from_secs(10),
207 tls: None,
208 }
209 }
210}
211
212impl ClientConfig {
213 pub fn new(addr: SocketAddr) -> Self {
215 Self {
216 addr,
217 ..Default::default()
218 }
219 }
220
221 pub fn with_tls(mut self, tls_config: TlsClientConfig) -> Self {
223 self.tls = Some(tls_config);
224 self
225 }
226
227 pub fn is_tls_enabled(&self) -> bool {
229 self.tls.is_some()
230 }
231}
232
233pub struct LanceClient {
237 stream: ClientStream,
238 config: ClientConfig,
239 batch_id: AtomicU64,
240 read_buffer: Vec<u8>,
241 read_offset: usize,
242}
243
244impl LanceClient {
245 pub async fn connect(config: ClientConfig) -> Result<Self> {
247 if let Some(ref tls_config) = config.tls {
249 return Self::connect_tls(config.clone(), tls_config.clone()).await;
250 }
251
252 debug!(addr = %config.addr, "Connecting to LANCE server");
253
254 let stream = tokio::time::timeout(config.connect_timeout, TcpStream::connect(config.addr))
255 .await
256 .map_err(|_| ClientError::Timeout)?
257 .map_err(ClientError::ConnectionFailed)?;
258
259 stream.set_nodelay(true)?;
260
261 debug!(addr = %config.addr, "Connected to LANCE server");
262
263 Ok(Self {
264 stream: ClientStream::Tcp(stream),
265 config,
266 batch_id: AtomicU64::new(0),
267 read_buffer: vec![0u8; 64 * 1024],
268 read_offset: 0,
269 })
270 }
271
272 pub async fn connect_tls(config: ClientConfig, tls_config: TlsClientConfig) -> Result<Self> {
289 debug!(addr = %config.addr, "Connecting to LANCE server with TLS");
290
291 let tcp_stream =
293 tokio::time::timeout(config.connect_timeout, TcpStream::connect(config.addr))
294 .await
295 .map_err(|_| ClientError::Timeout)?
296 .map_err(ClientError::ConnectionFailed)?;
297
298 tcp_stream.set_nodelay(true)?;
299
300 let network_config = tls_config.to_network_config();
302 let connector =
303 TlsConnector::new(network_config).map_err(|e| ClientError::TlsError(e.to_string()))?;
304
305 let server_name = tls_config
307 .server_name
308 .unwrap_or_else(|| config.addr.ip().to_string());
309
310 let tls_stream = connector
312 .connect(&server_name, tcp_stream)
313 .await
314 .map_err(|e| ClientError::TlsError(e.to_string()))?;
315
316 debug!(addr = %config.addr, "TLS connection established");
317
318 Ok(Self {
319 stream: ClientStream::Tls(tls_stream),
320 config,
321 batch_id: AtomicU64::new(0),
322 read_buffer: vec![0u8; 64 * 1024],
323 read_offset: 0,
324 })
325 }
326
327 pub async fn connect_to(addr: &str) -> Result<Self> {
329 let socket_addr: SocketAddr = addr
330 .parse()
331 .map_err(|e| ClientError::ProtocolError(format!("Invalid address: {}", e)))?;
332 Self::connect(ClientConfig::new(socket_addr)).await
333 }
334
335 pub async fn connect_tls_to(addr: &str, tls_config: TlsClientConfig) -> Result<Self> {
337 let socket_addr: SocketAddr = addr
338 .parse()
339 .map_err(|e| ClientError::ProtocolError(format!("Invalid address: {}", e)))?;
340 Self::connect_tls(ClientConfig::new(socket_addr), tls_config).await
341 }
342
343 fn next_batch_id(&self) -> u64 {
344 self.batch_id.fetch_add(1, Ordering::SeqCst) + 1
345 }
346
347 pub async fn send_ingest(&mut self, payload: Bytes, record_count: u32) -> Result<u64> {
349 self.send_ingest_to_topic(0, payload, record_count, None)
350 .await
351 }
352
353 pub async fn send_ingest_to_topic(
355 &mut self,
356 topic_id: u32,
357 payload: Bytes,
358 record_count: u32,
359 _auth_config: Option<&AuthConfig>,
360 ) -> Result<u64> {
361 let batch_id = self.next_batch_id();
362 let timestamp_ns = std::time::SystemTime::now()
363 .duration_since(std::time::UNIX_EPOCH)
364 .map(|d| d.as_nanos() as u64)
365 .unwrap_or(0);
366
367 let frame =
368 Frame::new_ingest_with_topic(batch_id, timestamp_ns, record_count, payload, topic_id);
369 let frame_bytes = encode_frame(&frame);
370
371 trace!(
372 batch_id,
373 topic_id,
374 payload_len = frame.payload_length(),
375 "Sending ingest frame"
376 );
377
378 tokio::time::timeout(
379 self.config.write_timeout,
380 self.stream.write_all(&frame_bytes),
381 )
382 .await
383 .map_err(|_| ClientError::Timeout)??;
384
385 Ok(batch_id)
386 }
387
388 pub async fn send_ingest_sync(&mut self, payload: Bytes, record_count: u32) -> Result<u64> {
390 self.send_ingest_to_topic_sync(0, payload, record_count, None)
391 .await
392 }
393
394 pub async fn send_ingest_to_topic_sync(
396 &mut self,
397 topic_id: u32,
398 payload: Bytes,
399 record_count: u32,
400 auth_config: Option<&AuthConfig>,
401 ) -> Result<u64> {
402 let batch_id = self
403 .send_ingest_to_topic(topic_id, payload, record_count, auth_config)
404 .await?;
405 self.wait_for_ack(batch_id).await
406 }
407
408 async fn wait_for_ack(&mut self, expected_batch_id: u64) -> Result<u64> {
409 let frame = self.recv_frame().await?;
410
411 match frame.frame_type {
412 FrameType::Ack => {
413 let acked_id = frame.batch_id();
414 if acked_id != expected_batch_id {
415 return Err(ClientError::InvalidResponse(format!(
416 "Ack batch_id mismatch: sent {}, received {}",
417 expected_batch_id, acked_id
418 )));
419 }
420 trace!(batch_id = acked_id, "Received ack");
421 Ok(acked_id)
422 },
423 FrameType::Control(ControlCommand::ErrorResponse) => {
424 let error_msg = frame
425 .payload
426 .map(|p| String::from_utf8_lossy(&p).to_string())
427 .unwrap_or_else(|| "Unknown error".to_string());
428 Err(ClientError::ServerError(error_msg))
429 },
430 FrameType::Backpressure => {
431 warn!("Server signaled backpressure");
432 Err(ClientError::ServerBackpressure)
433 },
434 other => Err(ClientError::InvalidResponse(format!(
435 "Expected Ack, got {:?}",
436 other
437 ))),
438 }
439 }
440
441 pub async fn recv_ack(&mut self) -> Result<u64> {
443 let frame = self.recv_frame().await?;
444
445 match frame.frame_type {
446 FrameType::Ack => {
447 trace!(batch_id = frame.batch_id(), "Received ack");
448 Ok(frame.batch_id())
449 },
450 FrameType::Backpressure => {
451 warn!("Server signaled backpressure");
452 Err(ClientError::ServerBackpressure)
453 },
454 other => Err(ClientError::InvalidResponse(format!(
455 "Expected Ack, got {:?}",
456 other
457 ))),
458 }
459 }
460
461 pub async fn send_keepalive(&mut self) -> Result<()> {
463 let frame = Frame::new_keepalive();
464 let frame_bytes = encode_frame(&frame);
465
466 trace!("Sending keepalive");
467
468 tokio::time::timeout(
469 self.config.write_timeout,
470 self.stream.write_all(&frame_bytes),
471 )
472 .await
473 .map_err(|_| ClientError::Timeout)??;
474
475 Ok(())
476 }
477
478 pub async fn recv_keepalive(&mut self) -> Result<()> {
480 let frame = self.recv_frame().await?;
481
482 match frame.frame_type {
483 FrameType::Keepalive => {
484 trace!("Received keepalive response");
485 Ok(())
486 },
487 other => Err(ClientError::InvalidResponse(format!(
488 "Expected Keepalive, got {:?}",
489 other
490 ))),
491 }
492 }
493
494 pub async fn ping(&mut self) -> Result<Duration> {
496 let start = std::time::Instant::now();
497 self.send_keepalive().await?;
498 self.recv_keepalive().await?;
499 Ok(start.elapsed())
500 }
501
502 pub async fn create_topic(&mut self, name: &str) -> Result<TopicInfo> {
504 let frame = Frame::new_create_topic(name);
505 let frame_bytes = encode_frame(&frame);
506
507 trace!(topic_name = %name, "Creating topic");
508
509 tokio::time::timeout(
510 self.config.write_timeout,
511 self.stream.write_all(&frame_bytes),
512 )
513 .await
514 .map_err(|_| ClientError::Timeout)??;
515
516 let response = self.recv_frame().await?;
517 self.parse_topic_response(response)
518 }
519
520 pub async fn list_topics(&mut self) -> Result<Vec<TopicInfo>> {
522 let frame = Frame::new_list_topics();
523 let frame_bytes = encode_frame(&frame);
524
525 trace!("Listing topics");
526
527 tokio::time::timeout(
528 self.config.write_timeout,
529 self.stream.write_all(&frame_bytes),
530 )
531 .await
532 .map_err(|_| ClientError::Timeout)??;
533
534 let response = self.recv_frame().await?;
535 self.parse_topic_list_response(response)
536 }
537
538 pub async fn get_topic(&mut self, topic_id: u32) -> Result<TopicInfo> {
540 let frame = Frame::new_get_topic(topic_id);
541 let frame_bytes = encode_frame(&frame);
542
543 trace!(topic_id, "Getting topic");
544
545 tokio::time::timeout(
546 self.config.write_timeout,
547 self.stream.write_all(&frame_bytes),
548 )
549 .await
550 .map_err(|_| ClientError::Timeout)??;
551
552 let response = self.recv_frame().await?;
553 self.parse_topic_response(response)
554 }
555
556 pub async fn delete_topic(&mut self, topic_id: u32) -> Result<()> {
558 let frame = Frame::new_delete_topic(topic_id);
559 let frame_bytes = encode_frame(&frame);
560
561 trace!(topic_id, "Deleting topic");
562
563 tokio::time::timeout(
564 self.config.write_timeout,
565 self.stream.write_all(&frame_bytes),
566 )
567 .await
568 .map_err(|_| ClientError::Timeout)??;
569
570 let response = self.recv_frame().await?;
571 self.parse_delete_response(response)
572 }
573
574 pub async fn set_retention(
581 &mut self,
582 topic_id: u32,
583 max_age_secs: u64,
584 max_bytes: u64,
585 ) -> Result<()> {
586 let frame = Frame::new_set_retention(topic_id, max_age_secs, max_bytes);
587 let frame_bytes = encode_frame(&frame);
588
589 trace!(
590 topic_id,
591 max_age_secs, max_bytes, "Setting retention policy"
592 );
593
594 tokio::time::timeout(
595 self.config.write_timeout,
596 self.stream.write_all(&frame_bytes),
597 )
598 .await
599 .map_err(|_| ClientError::Timeout)??;
600
601 let response = self.recv_frame().await?;
602 self.parse_retention_response(response)
603 }
604
605 pub async fn create_topic_with_retention(
612 &mut self,
613 name: &str,
614 max_age_secs: u64,
615 max_bytes: u64,
616 ) -> Result<TopicInfo> {
617 let frame = Frame::new_create_topic_with_retention(name, max_age_secs, max_bytes);
618 let frame_bytes = encode_frame(&frame);
619
620 trace!(
621 name,
622 max_age_secs, max_bytes, "Creating topic with retention"
623 );
624
625 tokio::time::timeout(
626 self.config.write_timeout,
627 self.stream.write_all(&frame_bytes),
628 )
629 .await
630 .map_err(|_| ClientError::Timeout)??;
631
632 let response = self.recv_frame().await?;
633 self.parse_topic_response(response)
634 }
635
636 pub async fn get_cluster_status(&mut self) -> Result<ClusterStatus> {
638 let frame = Frame::new_get_cluster_status();
639 let frame_bytes = encode_frame(&frame);
640
641 tokio::time::timeout(
642 self.config.write_timeout,
643 self.stream.write_all(&frame_bytes),
644 )
645 .await
646 .map_err(|_| ClientError::Timeout)??;
647
648 let response = self.recv_frame().await?;
649 self.parse_cluster_status_response(response)
650 }
651
652 fn parse_cluster_status_response(&self, frame: Frame) -> Result<ClusterStatus> {
653 match frame.frame_type {
654 FrameType::Control(ControlCommand::ClusterStatusResponse) => {
655 let payload = frame.payload.ok_or_else(|| {
656 ClientError::InvalidResponse("Empty cluster status response".to_string())
657 })?;
658 let json: serde_json::Value = serde_json::from_slice(&payload)
659 .map_err(|e| ClientError::ProtocolError(format!("Invalid JSON: {}", e)))?;
660
661 let peer_states: std::collections::HashMap<u16, String> = json["peer_states"]
662 .as_object()
663 .map(|obj| {
664 obj.iter()
665 .filter_map(|(k, v)| {
666 k.parse::<u16>()
667 .ok()
668 .map(|id| (id, v.as_str().unwrap_or("unknown").to_string()))
669 })
670 .collect()
671 })
672 .unwrap_or_default();
673
674 Ok(ClusterStatus {
675 node_id: json["node_id"].as_u64().unwrap_or(0) as u16,
676 is_leader: json["is_leader"].as_bool().unwrap_or(false),
677 leader_id: json["leader_id"].as_u64().map(|id| id as u16),
678 current_term: json["current_term"].as_u64().unwrap_or(0),
679 node_count: json["node_count"].as_u64().unwrap_or(1) as usize,
680 healthy_nodes: json["healthy_nodes"].as_u64().unwrap_or(1) as usize,
681 quorum_available: json["quorum_available"].as_bool().unwrap_or(true),
682 peer_states,
683 })
684 },
685 FrameType::Control(ControlCommand::ErrorResponse) => {
686 let error_msg = frame
687 .payload
688 .map(|p| String::from_utf8_lossy(&p).to_string())
689 .unwrap_or_else(|| "Unknown error".to_string());
690 Err(ClientError::ServerError(error_msg))
691 },
692 other => Err(ClientError::InvalidResponse(format!(
693 "Expected ClusterStatusResponse, got {:?}",
694 other
695 ))),
696 }
697 }
698
699 pub async fn fetch(
702 &mut self,
703 topic_id: u32,
704 start_offset: u64,
705 max_bytes: u32,
706 ) -> Result<FetchResult> {
707 let frame = Frame::new_fetch(topic_id, start_offset, max_bytes);
708 let frame_bytes = encode_frame(&frame);
709
710 trace!(topic_id, start_offset, max_bytes, "Fetching data");
711
712 tokio::time::timeout(
713 self.config.write_timeout,
714 self.stream.write_all(&frame_bytes),
715 )
716 .await
717 .map_err(|_| ClientError::Timeout)??;
718
719 let response = self.recv_frame().await?;
720 self.parse_fetch_response(response)
721 }
722
723 pub async fn subscribe(
726 &mut self,
727 topic_id: u32,
728 start_offset: u64,
729 max_batch_bytes: u32,
730 consumer_id: u64,
731 ) -> Result<SubscribeResult> {
732 let frame = Frame::new_subscribe(topic_id, start_offset, max_batch_bytes, consumer_id);
733 let frame_bytes = encode_frame(&frame);
734
735 trace!(topic_id, start_offset, consumer_id, "Subscribing to topic");
736
737 tokio::time::timeout(
738 self.config.write_timeout,
739 self.stream.write_all(&frame_bytes),
740 )
741 .await
742 .map_err(|_| ClientError::Timeout)??;
743
744 let response = self.recv_frame().await?;
745 self.parse_subscribe_response(response)
746 }
747
748 pub async fn unsubscribe(&mut self, topic_id: u32, consumer_id: u64) -> Result<()> {
750 let frame = Frame::new_unsubscribe(topic_id, consumer_id);
751 let frame_bytes = encode_frame(&frame);
752
753 trace!(topic_id, consumer_id, "Unsubscribing from topic");
754
755 tokio::time::timeout(
756 self.config.write_timeout,
757 self.stream.write_all(&frame_bytes),
758 )
759 .await
760 .map_err(|_| ClientError::Timeout)??;
761
762 let response = self.recv_frame().await?;
764 match response.frame_type {
765 FrameType::Ack => Ok(()),
766 FrameType::Control(ControlCommand::ErrorResponse) => {
767 let error_msg = response
768 .payload
769 .map(|p| String::from_utf8_lossy(&p).to_string())
770 .unwrap_or_else(|| "Unknown error".to_string());
771 Err(ClientError::ServerError(error_msg))
772 },
773 other => Err(ClientError::InvalidResponse(format!(
774 "Expected Ack, got {:?}",
775 other
776 ))),
777 }
778 }
779
780 pub async fn commit_offset(
782 &mut self,
783 topic_id: u32,
784 consumer_id: u64,
785 offset: u64,
786 ) -> Result<CommitResult> {
787 let frame = Frame::new_commit_offset(topic_id, consumer_id, offset);
788 let frame_bytes = encode_frame(&frame);
789
790 trace!(topic_id, consumer_id, offset, "Committing offset");
791
792 tokio::time::timeout(
793 self.config.write_timeout,
794 self.stream.write_all(&frame_bytes),
795 )
796 .await
797 .map_err(|_| ClientError::Timeout)??;
798
799 let response = self.recv_frame().await?;
800 self.parse_commit_response(response)
801 }
802
803 fn parse_subscribe_response(&self, frame: Frame) -> Result<SubscribeResult> {
804 match frame.frame_type {
805 FrameType::Control(ControlCommand::SubscribeAck) => {
806 let payload = frame.payload.ok_or_else(|| {
807 ClientError::InvalidResponse("Empty subscribe response".to_string())
808 })?;
809
810 if payload.len() < 16 {
811 return Err(ClientError::ProtocolError(
812 "Subscribe response too small".to_string(),
813 ));
814 }
815
816 let consumer_id = u64::from_le_bytes([
817 payload[0], payload[1], payload[2], payload[3], payload[4], payload[5],
818 payload[6], payload[7],
819 ]);
820 let start_offset = u64::from_le_bytes([
821 payload[8],
822 payload[9],
823 payload[10],
824 payload[11],
825 payload[12],
826 payload[13],
827 payload[14],
828 payload[15],
829 ]);
830
831 Ok(SubscribeResult {
832 consumer_id,
833 start_offset,
834 })
835 },
836 FrameType::Control(ControlCommand::ErrorResponse) => {
837 let error_msg = frame
838 .payload
839 .map(|p| String::from_utf8_lossy(&p).to_string())
840 .unwrap_or_else(|| "Unknown error".to_string());
841 Err(ClientError::ServerError(error_msg))
842 },
843 other => Err(ClientError::InvalidResponse(format!(
844 "Expected SubscribeAck, got {:?}",
845 other
846 ))),
847 }
848 }
849
850 fn parse_commit_response(&self, frame: Frame) -> Result<CommitResult> {
851 match frame.frame_type {
852 FrameType::Control(ControlCommand::CommitAck) => {
853 let payload = frame.payload.ok_or_else(|| {
854 ClientError::InvalidResponse("Empty commit response".to_string())
855 })?;
856
857 if payload.len() < 16 {
858 return Err(ClientError::ProtocolError(
859 "Commit response too small".to_string(),
860 ));
861 }
862
863 let consumer_id = u64::from_le_bytes([
864 payload[0], payload[1], payload[2], payload[3], payload[4], payload[5],
865 payload[6], payload[7],
866 ]);
867 let committed_offset = u64::from_le_bytes([
868 payload[8],
869 payload[9],
870 payload[10],
871 payload[11],
872 payload[12],
873 payload[13],
874 payload[14],
875 payload[15],
876 ]);
877
878 Ok(CommitResult {
879 consumer_id,
880 committed_offset,
881 })
882 },
883 FrameType::Control(ControlCommand::ErrorResponse) => {
884 let error_msg = frame
885 .payload
886 .map(|p| String::from_utf8_lossy(&p).to_string())
887 .unwrap_or_else(|| "Unknown error".to_string());
888 Err(ClientError::ServerError(error_msg))
889 },
890 other => Err(ClientError::InvalidResponse(format!(
891 "Expected CommitAck, got {:?}",
892 other
893 ))),
894 }
895 }
896
897 fn parse_fetch_response(&self, frame: Frame) -> Result<FetchResult> {
898 match frame.frame_type {
899 FrameType::Control(ControlCommand::FetchResponse) => {
900 let payload = frame.payload.ok_or_else(|| {
901 ClientError::InvalidResponse("Empty fetch response".to_string())
902 })?;
903
904 if payload.len() < 16 {
905 return Err(ClientError::ProtocolError(
906 "Fetch response too small".to_string(),
907 ));
908 }
909
910 let next_offset = u64::from_le_bytes([
911 payload[0], payload[1], payload[2], payload[3], payload[4], payload[5],
912 payload[6], payload[7],
913 ]);
914 let bytes_returned =
915 u32::from_le_bytes([payload[8], payload[9], payload[10], payload[11]]);
916 let record_count =
917 u32::from_le_bytes([payload[12], payload[13], payload[14], payload[15]]);
918 let data = payload.slice(16..);
919
920 Ok(FetchResult {
921 data,
922 next_offset,
923 bytes_returned,
924 record_count,
925 })
926 },
927 FrameType::Control(ControlCommand::ErrorResponse) => {
928 let error_msg = frame
929 .payload
930 .map(|p| String::from_utf8_lossy(&p).to_string())
931 .unwrap_or_else(|| "Unknown error".to_string());
932 Err(ClientError::ServerError(error_msg))
933 },
934 other => Err(ClientError::InvalidResponse(format!(
935 "Expected FetchResponse, got {:?}",
936 other
937 ))),
938 }
939 }
940
941 fn parse_delete_response(&self, frame: Frame) -> Result<()> {
942 expect_success_response(frame)
943 }
944
945 fn parse_retention_response(&self, frame: Frame) -> Result<()> {
946 expect_success_response(frame)
947 }
948
949 fn parse_topic_response(&self, frame: Frame) -> Result<TopicInfo> {
950 match frame.frame_type {
951 FrameType::Control(ControlCommand::TopicResponse) => {
952 let payload = frame.payload.ok_or_else(|| {
953 ClientError::InvalidResponse("Empty topic response".to_string())
954 })?;
955 let json: serde_json::Value = serde_json::from_slice(&payload)
956 .map_err(|e| ClientError::ProtocolError(format!("Invalid JSON: {}", e)))?;
957
958 let retention = if json.get("retention").is_some() {
959 Some(RetentionInfo {
960 max_age_secs: json["retention"]["max_age_secs"].as_u64().unwrap_or(0),
961 max_bytes: json["retention"]["max_bytes"].as_u64().unwrap_or(0),
962 })
963 } else {
964 None
965 };
966
967 Ok(TopicInfo {
968 id: json["id"].as_u64().unwrap_or(0) as u32,
969 name: json["name"].as_str().unwrap_or("").to_string(),
970 created_at: json["created_at"].as_u64().unwrap_or(0),
971 retention,
972 })
973 },
974 FrameType::Control(ControlCommand::ErrorResponse) => {
975 let error_msg = frame
976 .payload
977 .map(|p| String::from_utf8_lossy(&p).to_string())
978 .unwrap_or_else(|| "Unknown error".to_string());
979 Err(ClientError::ServerError(error_msg))
980 },
981 other => Err(ClientError::InvalidResponse(format!(
982 "Expected TopicResponse, got {:?}",
983 other
984 ))),
985 }
986 }
987
988 fn parse_topic_list_response(&self, frame: Frame) -> Result<Vec<TopicInfo>> {
989 match frame.frame_type {
990 FrameType::Control(ControlCommand::TopicResponse) => {
991 let payload = frame.payload.ok_or_else(|| {
992 ClientError::InvalidResponse("Empty topic list response".to_string())
993 })?;
994 let json: serde_json::Value = serde_json::from_slice(&payload)
995 .map_err(|e| ClientError::ProtocolError(format!("Invalid JSON: {}", e)))?;
996
997 let topics = json["topics"]
998 .as_array()
999 .map(|arr| {
1000 arr.iter()
1001 .map(|t| {
1002 let retention = if t.get("retention").is_some() {
1003 Some(RetentionInfo {
1004 max_age_secs: t["retention"]["max_age_secs"]
1005 .as_u64()
1006 .unwrap_or(0),
1007 max_bytes: t["retention"]["max_bytes"]
1008 .as_u64()
1009 .unwrap_or(0),
1010 })
1011 } else {
1012 None
1013 };
1014 TopicInfo {
1015 id: t["id"].as_u64().unwrap_or(0) as u32,
1016 name: t["name"].as_str().unwrap_or("").to_string(),
1017 created_at: t["created_at"].as_u64().unwrap_or(0),
1018 retention,
1019 }
1020 })
1021 .collect()
1022 })
1023 .unwrap_or_default();
1024
1025 Ok(topics)
1026 },
1027 FrameType::Control(ControlCommand::ErrorResponse) => {
1028 let error_msg = frame
1029 .payload
1030 .map(|p| String::from_utf8_lossy(&p).to_string())
1031 .unwrap_or_else(|| "Unknown error".to_string());
1032 Err(ClientError::ServerError(error_msg))
1033 },
1034 other => Err(ClientError::InvalidResponse(format!(
1035 "Expected TopicResponse, got {:?}",
1036 other
1037 ))),
1038 }
1039 }
1040
1041 async fn recv_frame(&mut self) -> Result<Frame> {
1042 loop {
1043 if self.read_offset >= LWP_HEADER_SIZE {
1044 if let Some((frame, consumed)) = parse_frame(&self.read_buffer[..self.read_offset])?
1045 {
1046 self.read_buffer.copy_within(consumed..self.read_offset, 0);
1047 self.read_offset -= consumed;
1048 return Ok(frame);
1049 }
1050 }
1051
1052 let n = tokio::time::timeout(
1053 self.config.read_timeout,
1054 self.stream.read(&mut self.read_buffer[self.read_offset..]),
1055 )
1056 .await
1057 .map_err(|_| ClientError::Timeout)??;
1058
1059 if n == 0 {
1060 return Err(ClientError::ConnectionClosed);
1061 }
1062
1063 self.read_offset += n;
1064 }
1065 }
1066
1067 pub fn config(&self) -> &ClientConfig {
1069 &self.config
1070 }
1071
1072 pub async fn close(mut self) -> Result<()> {
1074 self.stream.shutdown().await?;
1075 Ok(())
1076 }
1077}
1078
1079impl std::fmt::Debug for LanceClient {
1080 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1081 f.debug_struct("LanceClient")
1082 .field("addr", &self.config.addr)
1083 .field("batch_id", &self.batch_id.load(Ordering::SeqCst))
1084 .finish()
1085 }
1086}