1use crate::error::{ClientError, Result};
2use crate::tls::TlsClientConfig;
3use bytes::Bytes;
4use lnc_network::{
5 ControlCommand, Frame, FrameType, LWP_HEADER_SIZE, TlsConnector, encode_frame, parse_frame,
6};
7use std::net::SocketAddr;
8use std::pin::Pin;
9use std::sync::atomic::{AtomicU64, Ordering};
10use std::task::{Context, Poll};
11use std::time::Duration;
12use tokio::io::{AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt, ReadBuf};
13use tokio::net::TcpStream;
14use tokio::net::lookup_host;
15use tracing::{debug, trace, warn};
16
17#[allow(clippy::large_enum_variant)]
19pub enum ClientStream {
20 Tcp(TcpStream),
22 Tls(tokio_rustls::client::TlsStream<TcpStream>),
24}
25
26impl AsyncRead for ClientStream {
27 fn poll_read(
31 self: Pin<&mut Self>,
32 cx: &mut Context<'_>,
33 buf: &mut ReadBuf<'_>,
34 ) -> Poll<std::io::Result<()>> {
35 match self.get_mut() {
36 ClientStream::Tcp(stream) => Pin::new(stream).poll_read(cx, buf),
37 ClientStream::Tls(stream) => Pin::new(stream).poll_read(cx, buf),
38 }
39 }
40}
41
42impl AsyncWrite for ClientStream {
43 fn poll_write(
48 self: Pin<&mut Self>,
49 cx: &mut Context<'_>,
50 buf: &[u8],
51 ) -> Poll<std::io::Result<usize>> {
52 match self.get_mut() {
53 ClientStream::Tcp(stream) => Pin::new(stream).poll_write(cx, buf),
54 ClientStream::Tls(stream) => Pin::new(stream).poll_write(cx, buf),
55 }
56 }
57
58 fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<std::io::Result<()>> {
62 match self.get_mut() {
63 ClientStream::Tcp(stream) => Pin::new(stream).poll_flush(cx),
64 ClientStream::Tls(stream) => Pin::new(stream).poll_flush(cx),
65 }
66 }
67
68 fn poll_shutdown(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<std::io::Result<()>> {
72 match self.get_mut() {
73 ClientStream::Tcp(stream) => Pin::new(stream).poll_shutdown(cx),
74 ClientStream::Tls(stream) => Pin::new(stream).poll_shutdown(cx),
75 }
76 }
77}
78
79fn extract_error_message(frame: &Frame) -> String {
81 frame
82 .payload
83 .as_ref()
84 .map(|p| String::from_utf8_lossy(p).to_string())
85 .unwrap_or_else(|| "Unknown error".to_string())
86}
87
88#[allow(dead_code)] fn expect_frame_type(frame: Frame, expected: ControlCommand, expected_name: &str) -> Result<Frame> {
91 match frame.frame_type {
92 FrameType::Control(cmd) if cmd == expected => Ok(frame),
93 FrameType::Control(ControlCommand::ErrorResponse) => {
94 Err(ClientError::ServerError(extract_error_message(&frame)))
95 },
96 other => Err(ClientError::InvalidResponse(format!(
97 "Expected {}, got {:?}",
98 expected_name, other
99 ))),
100 }
101}
102
103fn expect_success_response(frame: Frame) -> Result<()> {
105 match frame.frame_type {
106 FrameType::Control(ControlCommand::TopicResponse) => Ok(()),
107 FrameType::Control(ControlCommand::ErrorResponse) => {
108 Err(ClientError::ServerError(extract_error_message(&frame)))
109 },
110 other => Err(ClientError::InvalidResponse(format!(
111 "Expected TopicResponse, got {:?}",
112 other
113 ))),
114 }
115}
116
117#[derive(Debug, Clone, Default)]
119pub struct AuthConfig {
120 pub mtls_enabled: bool,
122 pub client_cert_path: Option<String>,
124 pub client_key_path: Option<String>,
126}
127
128#[derive(Debug, Clone, Default)]
130pub struct RetentionInfo {
131 pub max_age_secs: u64,
133 pub max_bytes: u64,
135}
136
137#[derive(Debug, Clone)]
139pub struct TopicInfo {
140 pub id: u32,
142 pub name: String,
144 pub created_at: u64,
146 pub topic_epoch: u64,
148 pub retention: Option<RetentionInfo>,
150}
151
152#[derive(Debug, Clone)]
154pub struct FetchResult {
155 pub data: Bytes,
157 pub next_offset: u64,
159 pub bytes_returned: u32,
161 pub record_count: u32,
163}
164
165#[derive(Debug, Clone)]
167pub struct SubscribeResult {
168 pub consumer_id: u64,
170 pub start_offset: u64,
172}
173
174#[derive(Debug, Clone)]
176pub struct CommitResult {
177 pub consumer_id: u64,
179 pub committed_offset: u64,
181}
182
183#[derive(Debug, Clone)]
185pub struct ClusterStatus {
186 pub node_id: u16,
187 pub is_leader: bool,
188 pub leader_id: Option<u16>,
189 pub current_term: u64,
190 pub node_count: usize,
191 pub healthy_nodes: usize,
192 pub quorum_available: bool,
193 pub peer_states: std::collections::HashMap<u16, String>,
194}
195
196#[derive(Debug, Clone)]
198pub struct ClientConfig {
199 pub addr: String,
201 pub connect_timeout: Duration,
203 pub read_timeout: Duration,
205 pub write_timeout: Duration,
207 pub keepalive_interval: Duration,
209 pub tls: Option<TlsClientConfig>,
211}
212
213impl Default for ClientConfig {
214 fn default() -> Self {
215 Self {
216 addr: "127.0.0.1:1992".to_string(),
217 connect_timeout: Duration::from_secs(10),
218 read_timeout: Duration::from_secs(30),
219 write_timeout: Duration::from_secs(10),
220 keepalive_interval: Duration::from_secs(10),
221 tls: None,
222 }
223 }
224}
225
226impl ClientConfig {
227 pub fn new(addr: impl Into<String>) -> Self {
233 Self {
234 addr: addr.into(),
235 ..Default::default()
236 }
237 }
238
239 pub fn with_tls(mut self, tls_config: TlsClientConfig) -> Self {
250 self.tls = Some(tls_config);
251 self
252 }
253
254 pub fn is_tls_enabled(&self) -> bool {
256 self.tls.is_some()
257 }
258}
259
260pub struct LanceClient {
264 stream: ClientStream,
265 config: ClientConfig,
266 batch_id: AtomicU64,
267 read_buffer: Vec<u8>,
268 read_offset: usize,
269}
270
271impl LanceClient {
272 async fn resolve_address(addr: &str) -> Result<SocketAddr> {
287 if let Ok(socket_addr) = addr.parse::<SocketAddr>() {
289 return Ok(socket_addr);
290 }
291
292 let mut addrs = lookup_host(addr).await.map_err(|e| {
294 ClientError::ProtocolError(format!("DNS resolution failed for '{}': {}", addr, e))
295 })?;
296
297 addrs
298 .next()
299 .ok_or_else(|| ClientError::ProtocolError(format!("No addresses found for '{}'", addr)))
300 }
301
302 pub async fn connect(config: ClientConfig) -> Result<Self> {
307 if let Some(ref tls_config) = config.tls {
309 return Self::connect_tls(config.clone(), tls_config.clone()).await;
310 }
311
312 debug!(addr = %config.addr, "Connecting to LANCE server");
313
314 let socket_addr = Self::resolve_address(&config.addr).await?;
316 debug!(resolved_addr = %socket_addr, "Resolved server address");
317
318 let stream = tokio::time::timeout(config.connect_timeout, TcpStream::connect(socket_addr))
319 .await
320 .map_err(|_| ClientError::Timeout)?
321 .map_err(ClientError::ConnectionFailed)?;
322
323 stream.set_nodelay(true)?;
324
325 debug!(addr = %config.addr, "Connected to LANCE server");
326
327 Ok(Self {
328 stream: ClientStream::Tcp(stream),
329 config,
330 batch_id: AtomicU64::new(0),
331 read_buffer: vec![0u8; 64 * 1024],
332 read_offset: 0,
333 })
334 }
335
336 pub async fn connect_tls(config: ClientConfig, tls_config: TlsClientConfig) -> Result<Self> {
353 debug!(addr = %config.addr, "Connecting to LANCE server with TLS");
354
355 let socket_addr = Self::resolve_address(&config.addr).await?;
357 debug!(resolved_addr = %socket_addr, "Resolved server address");
358
359 let tcp_stream =
361 tokio::time::timeout(config.connect_timeout, TcpStream::connect(socket_addr))
362 .await
363 .map_err(|_| ClientError::Timeout)?
364 .map_err(ClientError::ConnectionFailed)?;
365
366 tcp_stream.set_nodelay(true)?;
367
368 let network_config = tls_config.to_network_config();
370 let connector =
371 TlsConnector::new(network_config).map_err(|e| ClientError::TlsError(e.to_string()))?;
372
373 let server_name = tls_config.server_name.unwrap_or_else(|| {
375 config
377 .addr
378 .rsplit_once(':')
379 .map(|(host, _)| host.to_string())
380 .unwrap_or_else(|| socket_addr.ip().to_string())
381 });
382
383 let tls_stream = connector
385 .connect(&server_name, tcp_stream)
386 .await
387 .map_err(|e| ClientError::TlsError(e.to_string()))?;
388
389 debug!(addr = %config.addr, "TLS connection established");
390
391 Ok(Self {
392 stream: ClientStream::Tls(tls_stream),
393 config,
394 batch_id: AtomicU64::new(0),
395 read_buffer: vec![0u8; 64 * 1024],
396 read_offset: 0,
397 })
398 }
399
400 pub async fn connect_to(addr: &str) -> Result<Self> {
405 Self::connect(ClientConfig::new(addr)).await
406 }
407
408 pub async fn connect_tls_to(addr: &str, tls_config: TlsClientConfig) -> Result<Self> {
413 Self::connect_tls(ClientConfig::new(addr), tls_config).await
414 }
415
416 fn next_batch_id(&self) -> u64 {
417 self.batch_id.fetch_add(1, Ordering::SeqCst) + 1
418 }
419
420 pub async fn send_ingest(&mut self, payload: Bytes, record_count: u32) -> Result<u64> {
434 self.send_ingest_to_topic(0, payload, record_count, None)
435 .await
436 }
437
438 pub async fn send_ingest_to_topic(
451 &mut self,
452 topic_id: u32,
453 payload: Bytes,
454 record_count: u32,
455 _auth_config: Option<&AuthConfig>,
456 ) -> Result<u64> {
457 let batch_id = self.next_batch_id();
458 let timestamp_ns = std::time::SystemTime::now()
459 .duration_since(std::time::UNIX_EPOCH)
460 .map(|d| d.as_nanos() as u64)
461 .unwrap_or(0);
462
463 let frame =
464 Frame::new_ingest_with_topic(batch_id, timestamp_ns, record_count, payload, topic_id);
465 let frame_bytes = encode_frame(&frame);
466
467 trace!(
468 batch_id,
469 topic_id,
470 payload_len = frame.payload_length(),
471 "Sending ingest frame"
472 );
473
474 tokio::time::timeout(
475 self.config.write_timeout,
476 self.stream.write_all(&frame_bytes),
477 )
478 .await
479 .map_err(|_| ClientError::Timeout)??;
480
481 Ok(batch_id)
482 }
483
484 pub async fn send_ingest_sync(&mut self, payload: Bytes, record_count: u32) -> Result<u64> {
495 self.send_ingest_to_topic_sync(0, payload, record_count, None)
496 .await
497 }
498
499 pub async fn send_ingest_to_topic_sync(
512 &mut self,
513 topic_id: u32,
514 payload: Bytes,
515 record_count: u32,
516 auth_config: Option<&AuthConfig>,
517 ) -> Result<u64> {
518 let batch_id = self
519 .send_ingest_to_topic(topic_id, payload, record_count, auth_config)
520 .await?;
521 self.wait_for_ack(batch_id).await
522 }
523
524 async fn wait_for_ack(&mut self, expected_batch_id: u64) -> Result<u64> {
537 let frame = self.recv_frame().await?;
538
539 match frame.frame_type {
540 FrameType::Ack => {
541 let acked_id = frame.batch_id();
542 if acked_id != expected_batch_id {
543 return Err(ClientError::InvalidResponse(format!(
544 "Ack batch_id mismatch: sent {}, received {}",
545 expected_batch_id, acked_id
546 )));
547 }
548 trace!(batch_id = acked_id, "Received ack");
549 Ok(acked_id)
550 },
551 FrameType::Control(ControlCommand::ErrorResponse) => {
552 let error_msg = frame
553 .payload
554 .map(|p| String::from_utf8_lossy(&p).to_string())
555 .unwrap_or_else(|| "Unknown error".to_string());
556 Err(ClientError::ServerError(error_msg))
557 },
558 FrameType::Backpressure => {
559 warn!("Server signaled backpressure");
560 Err(ClientError::ServerBackpressure)
561 },
562 other => Err(ClientError::InvalidResponse(format!(
563 "Expected Ack, got {:?}",
564 other
565 ))),
566 }
567 }
568
569 pub async fn recv_ack(&mut self) -> Result<u64> {
580 let frame = self.recv_frame().await?;
581
582 match frame.frame_type {
583 FrameType::Ack => {
584 trace!(batch_id = frame.batch_id(), "Received ack");
585 Ok(frame.batch_id())
586 },
587 FrameType::Backpressure => {
588 warn!("Server signaled backpressure");
589 Err(ClientError::ServerBackpressure)
590 },
591 other => Err(ClientError::InvalidResponse(format!(
592 "Expected Ack, got {:?}",
593 other
594 ))),
595 }
596 }
597
598 pub async fn send_keepalive(&mut self) -> Result<()> {
605 let frame = Frame::new_keepalive();
606 let frame_bytes = encode_frame(&frame);
607
608 trace!("Sending keepalive");
609
610 tokio::time::timeout(
611 self.config.write_timeout,
612 self.stream.write_all(&frame_bytes),
613 )
614 .await
615 .map_err(|_| ClientError::Timeout)??;
616
617 Ok(())
618 }
619
620 pub async fn recv_keepalive(&mut self) -> Result<()> {
630 let frame = self.recv_frame().await?;
631
632 match frame.frame_type {
633 FrameType::Keepalive => {
634 trace!("Received keepalive response");
635 Ok(())
636 },
637 other => Err(ClientError::InvalidResponse(format!(
638 "Expected Keepalive, got {:?}",
639 other
640 ))),
641 }
642 }
643
644 pub async fn ping(&mut self) -> Result<Duration> {
646 let start = std::time::Instant::now();
647 self.send_keepalive().await?;
648 self.recv_keepalive().await?;
649 Ok(start.elapsed())
650 }
651
652 pub async fn create_topic(&mut self, name: &str) -> Result<TopicInfo> {
654 const DEFAULT_CREATE_TOPIC_ATTEMPTS: usize = 20;
655 const DEFAULT_CREATE_TOPIC_BACKOFF_MS: u64 = 500;
656 self.ensure_topic(
657 name,
658 DEFAULT_CREATE_TOPIC_ATTEMPTS,
659 DEFAULT_CREATE_TOPIC_BACKOFF_MS,
660 )
661 .await
662 }
663
664 async fn create_topic_once(&mut self, name: &str) -> Result<TopicInfo> {
665 let frame = Frame::new_create_topic(name);
666 let frame_bytes = encode_frame(&frame);
667
668 trace!(topic_name = %name, "Creating topic");
669
670 tokio::time::timeout(
671 self.config.write_timeout,
672 self.stream.write_all(&frame_bytes),
673 )
674 .await
675 .map_err(|_| ClientError::Timeout)??;
676
677 let response = self.recv_frame().await?;
678 self.parse_topic_response(response)
679 }
680
681 pub async fn ensure_topic(
686 &mut self,
687 name: &str,
688 max_attempts: usize,
689 base_backoff_ms: u64,
690 ) -> Result<TopicInfo> {
691 let attempts = max_attempts.max(1);
692 let mut last_error: Option<ClientError> = None;
693 let mut saw_retryable_error = false;
694
695 for attempt in 1..=attempts {
696 let mut retryable_this_attempt = false;
697
698 match self.create_topic_once(name).await {
699 Ok(info) => {
700 trace!(
701 topic_id = info.id,
702 topic_name = %info.name,
703 attempt,
704 max_attempts = attempts,
705 "Topic ensured via create_topic"
706 );
707 return Ok(info);
708 },
709 Err(create_err) => {
710 if create_err.is_retryable() {
711 retryable_this_attempt = true;
712 saw_retryable_error = true;
713 }
714 last_error = Some(ClientError::ServerError(create_err.to_string()));
715 warn!(
716 topic_name = %name,
717 attempt,
718 max_attempts = attempts,
719 error = %create_err,
720 "create_topic failed during ensure_topic; retrying with list fallback"
721 );
722 },
723 }
724
725 match self.list_topics().await {
726 Ok(topics) => {
727 if let Some(topic) = topics.into_iter().find(|t| t.name == name) {
728 trace!(
729 topic_id = topic.id,
730 topic_name = %topic.name,
731 attempt,
732 max_attempts = attempts,
733 "Topic ensured via list_topics fallback"
734 );
735 return Ok(topic);
736 }
737 },
738 Err(list_err) => {
739 if list_err.is_retryable() {
740 retryable_this_attempt = true;
741 saw_retryable_error = true;
742 }
743 last_error = Some(ClientError::ServerError(list_err.to_string()));
744 warn!(
745 topic_name = %name,
746 attempt,
747 max_attempts = attempts,
748 error = %list_err,
749 "list_topics failed during ensure_topic"
750 );
751 },
752 }
753
754 if attempt < attempts {
755 let backoff_ms = if retryable_this_attempt {
756 base_backoff_ms.saturating_mul(attempt as u64).max(1)
757 } else {
758 base_backoff_ms.max(1)
760 };
761 tokio::time::sleep(Duration::from_millis(backoff_ms)).await;
762
763 let reconnect_config = self.config.clone();
766 match Self::connect(reconnect_config).await {
767 Ok(new_client) => {
768 *self = new_client;
769 },
770 Err(reconnect_err) => {
771 warn!(
772 topic_name = %name,
773 attempt,
774 max_attempts = attempts,
775 error = %reconnect_err,
776 "ensure_topic reconnect attempt failed"
777 );
778 last_error = Some(reconnect_err);
779 },
780 }
781 }
782 }
783
784 if let Some(err) = last_error {
785 return Err(ClientError::ServerError(format!(
786 "ensure_topic('{}') failed after {} attempts: {}",
787 name, attempts, err
788 )));
789 }
790
791 if saw_retryable_error {
792 return Err(ClientError::ServerError(format!(
793 "ensure_topic('{}') exhausted {} retryable attempts",
794 name, attempts
795 )));
796 }
797
798 Err(ClientError::ServerError(format!(
799 "topic '{}' not found after {} ensure_topic attempts",
800 name, attempts
801 )))
802 }
803
804 pub async fn ensure_topic_default(&mut self, name: &str) -> Result<TopicInfo> {
806 const DEFAULT_ENSURE_TOPIC_ATTEMPTS: usize = 20;
807 const DEFAULT_ENSURE_TOPIC_BACKOFF_MS: u64 = 500;
808 self.ensure_topic(
809 name,
810 DEFAULT_ENSURE_TOPIC_ATTEMPTS,
811 DEFAULT_ENSURE_TOPIC_BACKOFF_MS,
812 )
813 .await
814 }
815
816 pub async fn list_topics(&mut self) -> Result<Vec<TopicInfo>> {
818 let frame = Frame::new_list_topics();
819 let frame_bytes = encode_frame(&frame);
820
821 trace!("Listing topics");
822
823 tokio::time::timeout(
824 self.config.write_timeout,
825 self.stream.write_all(&frame_bytes),
826 )
827 .await
828 .map_err(|_| ClientError::Timeout)??;
829
830 let response = self.recv_frame().await?;
831 self.parse_topic_list_response(response)
832 }
833
834 pub async fn get_topic(&mut self, topic_id: u32) -> Result<TopicInfo> {
836 let frame = Frame::new_get_topic(topic_id);
837 let frame_bytes = encode_frame(&frame);
838
839 trace!(topic_id, "Getting topic");
840
841 tokio::time::timeout(
842 self.config.write_timeout,
843 self.stream.write_all(&frame_bytes),
844 )
845 .await
846 .map_err(|_| ClientError::Timeout)??;
847
848 let response = self.recv_frame().await?;
849 self.parse_topic_response(response)
850 }
851
852 pub async fn delete_topic(&mut self, topic_id: u32) -> Result<()> {
854 let frame = Frame::new_delete_topic(topic_id);
855 let frame_bytes = encode_frame(&frame);
856
857 trace!(topic_id, "Deleting topic");
858
859 tokio::time::timeout(
860 self.config.write_timeout,
861 self.stream.write_all(&frame_bytes),
862 )
863 .await
864 .map_err(|_| ClientError::Timeout)??;
865
866 let response = self.recv_frame().await?;
867 self.parse_delete_response(response)
868 }
869
870 pub async fn set_retention(
878 &mut self,
879 topic_id: u32,
880 max_age_secs: u64,
881 max_bytes: u64,
882 ) -> Result<()> {
883 let frame = Frame::new_set_retention(topic_id, max_age_secs, max_bytes);
884 let frame_bytes = encode_frame(&frame);
885
886 trace!(
887 topic_id,
888 max_age_secs, max_bytes, "Setting retention policy"
889 );
890
891 tokio::time::timeout(
892 self.config.write_timeout,
893 self.stream.write_all(&frame_bytes),
894 )
895 .await
896 .map_err(|_| ClientError::Timeout)??;
897
898 let response = self.recv_frame().await?;
899 self.parse_retention_response(response)
900 }
901
902 pub async fn create_topic_with_retention(
909 &mut self,
910 name: &str,
911 max_age_secs: u64,
912 max_bytes: u64,
913 ) -> Result<TopicInfo> {
914 let frame = Frame::new_create_topic_with_retention(name, max_age_secs, max_bytes);
915 let frame_bytes = encode_frame(&frame);
916
917 trace!(
918 name,
919 max_age_secs, max_bytes, "Creating topic with retention"
920 );
921
922 tokio::time::timeout(
923 self.config.write_timeout,
924 self.stream.write_all(&frame_bytes),
925 )
926 .await
927 .map_err(|_| ClientError::Timeout)??;
928
929 let response = self.recv_frame().await?;
930 self.parse_topic_response(response)
931 }
932
933 pub async fn get_cluster_status(&mut self) -> Result<ClusterStatus> {
935 let frame = Frame::new_get_cluster_status();
936 let frame_bytes = encode_frame(&frame);
937
938 tokio::time::timeout(
939 self.config.write_timeout,
940 self.stream.write_all(&frame_bytes),
941 )
942 .await
943 .map_err(|_| ClientError::Timeout)??;
944
945 let response = self.recv_frame().await?;
946 self.parse_cluster_status_response(response)
947 }
948
949 fn parse_cluster_status_response(&self, frame: Frame) -> Result<ClusterStatus> {
950 match frame.frame_type {
951 FrameType::Control(ControlCommand::ClusterStatusResponse) => {
952 let payload = frame.payload.ok_or_else(|| {
953 ClientError::InvalidResponse("Empty cluster status response".to_string())
954 })?;
955 let json: serde_json::Value = serde_json::from_slice(&payload)
956 .map_err(|e| ClientError::ProtocolError(format!("Invalid JSON: {}", e)))?;
957
958 let peer_states: std::collections::HashMap<u16, String> = json["peer_states"]
959 .as_object()
960 .map(|obj| {
961 obj.iter()
962 .filter_map(|(k, v)| {
963 k.parse::<u16>()
964 .ok()
965 .map(|id| (id, v.as_str().unwrap_or("unknown").to_string()))
966 })
967 .collect()
968 })
969 .unwrap_or_default();
970
971 Ok(ClusterStatus {
972 node_id: json["node_id"].as_u64().unwrap_or(0) as u16,
973 is_leader: json["is_leader"].as_bool().unwrap_or(false),
974 leader_id: json["leader_id"].as_u64().map(|id| id as u16),
975 current_term: json["current_term"].as_u64().unwrap_or(0),
976 node_count: json["node_count"].as_u64().unwrap_or(1) as usize,
977 healthy_nodes: json["healthy_nodes"].as_u64().unwrap_or(1) as usize,
978 quorum_available: json["quorum_available"].as_bool().unwrap_or(true),
979 peer_states,
980 })
981 },
982 FrameType::Control(ControlCommand::ErrorResponse) => {
983 let error_msg = frame
984 .payload
985 .map(|p| String::from_utf8_lossy(&p).to_string())
986 .unwrap_or_else(|| "Unknown error".to_string());
987 Err(ClientError::ServerError(error_msg))
988 },
989 other => Err(ClientError::InvalidResponse(format!(
990 "Expected ClusterStatusResponse, got {:?}",
991 other
992 ))),
993 }
994 }
995
996 pub async fn fetch(
999 &mut self,
1000 topic_id: u32,
1001 start_offset: u64,
1002 max_bytes: u32,
1003 ) -> Result<FetchResult> {
1004 let frame = Frame::new_fetch(topic_id, start_offset, max_bytes);
1005 let frame_bytes = encode_frame(&frame);
1006
1007 trace!(topic_id, start_offset, max_bytes, "Fetching data");
1008
1009 tokio::time::timeout(
1010 self.config.write_timeout,
1011 self.stream.write_all(&frame_bytes),
1012 )
1013 .await
1014 .map_err(|_| ClientError::Timeout)??;
1015
1016 let response = self.recv_frame().await?;
1017 self.parse_fetch_response(response)
1018 }
1019
1020 pub async fn subscribe(
1037 &mut self,
1038 topic_id: u32,
1039 start_offset: u64,
1040 max_batch_bytes: u32,
1041 consumer_id: u64,
1042 ) -> Result<SubscribeResult> {
1043 let frame = Frame::new_subscribe(topic_id, start_offset, max_batch_bytes, consumer_id);
1044 let frame_bytes = encode_frame(&frame);
1045
1046 trace!(topic_id, start_offset, consumer_id, "Subscribing to topic");
1047
1048 tokio::time::timeout(
1049 self.config.write_timeout,
1050 self.stream.write_all(&frame_bytes),
1051 )
1052 .await
1053 .map_err(|_| ClientError::Timeout)??;
1054
1055 let response = self.recv_frame().await?;
1056 self.parse_subscribe_response(response)
1057 }
1058
1059 pub async fn unsubscribe(&mut self, topic_id: u32, consumer_id: u64) -> Result<()> {
1073 let frame = Frame::new_unsubscribe(topic_id, consumer_id);
1074 let frame_bytes = encode_frame(&frame);
1075
1076 trace!(topic_id, consumer_id, "Unsubscribing from topic");
1077
1078 tokio::time::timeout(
1079 self.config.write_timeout,
1080 self.stream.write_all(&frame_bytes),
1081 )
1082 .await
1083 .map_err(|_| ClientError::Timeout)??;
1084
1085 let response = self.recv_frame().await?;
1087 match response.frame_type {
1088 FrameType::Ack => Ok(()),
1089 FrameType::Control(ControlCommand::ErrorResponse) => {
1090 let error_msg = response
1091 .payload
1092 .map(|p| String::from_utf8_lossy(&p).to_string())
1093 .unwrap_or_else(|| "Unknown error".to_string());
1094 Err(ClientError::ServerError(error_msg))
1095 },
1096 other => Err(ClientError::InvalidResponse(format!(
1097 "Expected Ack, got {:?}",
1098 other
1099 ))),
1100 }
1101 }
1102
1103 pub async fn commit_offset(
1105 &mut self,
1106 topic_id: u32,
1107 consumer_id: u64,
1108 offset: u64,
1109 ) -> Result<CommitResult> {
1110 let frame = Frame::new_commit_offset(topic_id, consumer_id, offset);
1111 let frame_bytes = encode_frame(&frame);
1112
1113 trace!(topic_id, consumer_id, offset, "Committing offset");
1114
1115 tokio::time::timeout(
1116 self.config.write_timeout,
1117 self.stream.write_all(&frame_bytes),
1118 )
1119 .await
1120 .map_err(|_| ClientError::Timeout)??;
1121
1122 let response = self.recv_frame().await?;
1123 self.parse_commit_response(response)
1124 }
1125
1126 fn parse_subscribe_response(&self, frame: Frame) -> Result<SubscribeResult> {
1127 match frame.frame_type {
1128 FrameType::Control(ControlCommand::SubscribeAck) => {
1129 let payload = frame.payload.ok_or_else(|| {
1130 ClientError::InvalidResponse("Empty subscribe response".to_string())
1131 })?;
1132
1133 if payload.len() < 16 {
1134 return Err(ClientError::ProtocolError(
1135 "Subscribe response too small".to_string(),
1136 ));
1137 }
1138
1139 let consumer_id = u64::from_le_bytes([
1140 payload[0], payload[1], payload[2], payload[3], payload[4], payload[5],
1141 payload[6], payload[7],
1142 ]);
1143 let start_offset = u64::from_le_bytes([
1144 payload[8],
1145 payload[9],
1146 payload[10],
1147 payload[11],
1148 payload[12],
1149 payload[13],
1150 payload[14],
1151 payload[15],
1152 ]);
1153
1154 Ok(SubscribeResult {
1155 consumer_id,
1156 start_offset,
1157 })
1158 },
1159 FrameType::Control(ControlCommand::ErrorResponse) => {
1160 let error_msg = frame
1161 .payload
1162 .map(|p| String::from_utf8_lossy(&p).to_string())
1163 .unwrap_or_else(|| "Unknown error".to_string());
1164 Err(ClientError::ServerError(error_msg))
1165 },
1166 other => Err(ClientError::InvalidResponse(format!(
1167 "Expected SubscribeAck, got {:?}",
1168 other
1169 ))),
1170 }
1171 }
1172
1173 fn parse_commit_response(&self, frame: Frame) -> Result<CommitResult> {
1174 match frame.frame_type {
1175 FrameType::Control(ControlCommand::CommitAck) => {
1176 let payload = frame.payload.ok_or_else(|| {
1177 ClientError::InvalidResponse("Empty commit response".to_string())
1178 })?;
1179
1180 if payload.len() < 16 {
1181 return Err(ClientError::ProtocolError(
1182 "Commit response too small".to_string(),
1183 ));
1184 }
1185
1186 let consumer_id = u64::from_le_bytes([
1187 payload[0], payload[1], payload[2], payload[3], payload[4], payload[5],
1188 payload[6], payload[7],
1189 ]);
1190 let committed_offset = u64::from_le_bytes([
1191 payload[8],
1192 payload[9],
1193 payload[10],
1194 payload[11],
1195 payload[12],
1196 payload[13],
1197 payload[14],
1198 payload[15],
1199 ]);
1200
1201 Ok(CommitResult {
1202 consumer_id,
1203 committed_offset,
1204 })
1205 },
1206 FrameType::Control(ControlCommand::ErrorResponse) => {
1207 let error_msg = frame
1208 .payload
1209 .map(|p| String::from_utf8_lossy(&p).to_string())
1210 .unwrap_or_else(|| "Unknown error".to_string());
1211 Err(ClientError::ServerError(error_msg))
1212 },
1213 other => Err(ClientError::InvalidResponse(format!(
1214 "Expected CommitAck, got {:?}",
1215 other
1216 ))),
1217 }
1218 }
1219
1220 fn parse_fetch_response(&self, frame: Frame) -> Result<FetchResult> {
1221 match frame.frame_type {
1222 FrameType::Control(ControlCommand::CatchingUp) => {
1223 let server_offset = frame
1224 .payload
1225 .as_ref()
1226 .filter(|p| p.len() >= 8)
1227 .map(|p| u64::from_le_bytes([p[0], p[1], p[2], p[3], p[4], p[5], p[6], p[7]]))
1228 .unwrap_or(0);
1229 Err(ClientError::ServerCatchingUp { server_offset })
1230 },
1231 FrameType::Control(ControlCommand::FetchResponse) => {
1232 let payload = frame.payload.ok_or_else(|| {
1233 ClientError::InvalidResponse("Empty fetch response".to_string())
1234 })?;
1235
1236 if payload.len() < 16 {
1237 return Err(ClientError::ProtocolError(
1238 "Fetch response too small".to_string(),
1239 ));
1240 }
1241
1242 let next_offset = u64::from_le_bytes([
1243 payload[0], payload[1], payload[2], payload[3], payload[4], payload[5],
1244 payload[6], payload[7],
1245 ]);
1246 let bytes_returned =
1247 u32::from_le_bytes([payload[8], payload[9], payload[10], payload[11]]);
1248 let record_count =
1249 u32::from_le_bytes([payload[12], payload[13], payload[14], payload[15]]);
1250 let data = payload.slice(16..);
1251
1252 Ok(FetchResult {
1253 data,
1254 next_offset,
1255 bytes_returned,
1256 record_count,
1257 })
1258 },
1259 FrameType::Control(ControlCommand::ErrorResponse) => {
1260 let error_msg = frame
1261 .payload
1262 .map(|p| String::from_utf8_lossy(&p).to_string())
1263 .unwrap_or_else(|| "Unknown error".to_string());
1264 Err(ClientError::ServerError(error_msg))
1265 },
1266 other => Err(ClientError::InvalidResponse(format!(
1267 "Expected FetchResponse, got {:?}",
1268 other
1269 ))),
1270 }
1271 }
1272
1273 fn parse_delete_response(&self, frame: Frame) -> Result<()> {
1274 expect_success_response(frame)
1275 }
1276
1277 fn parse_retention_response(&self, frame: Frame) -> Result<()> {
1278 expect_success_response(frame)
1279 }
1280
1281 fn parse_topic_response(&self, frame: Frame) -> Result<TopicInfo> {
1282 match frame.frame_type {
1283 FrameType::Control(ControlCommand::TopicResponse) => {
1284 let payload = frame.payload.ok_or_else(|| {
1285 ClientError::InvalidResponse("Empty topic response".to_string())
1286 })?;
1287 let json: serde_json::Value = serde_json::from_slice(&payload)
1288 .map_err(|e| ClientError::ProtocolError(format!("Invalid JSON: {}", e)))?;
1289
1290 let retention = if json.get("retention").is_some() {
1291 Some(RetentionInfo {
1292 max_age_secs: json["retention"]["max_age_secs"].as_u64().unwrap_or(0),
1293 max_bytes: json["retention"]["max_bytes"].as_u64().unwrap_or(0),
1294 })
1295 } else {
1296 None
1297 };
1298
1299 Ok(TopicInfo {
1300 id: json["id"].as_u64().unwrap_or(0) as u32,
1301 name: json["name"].as_str().unwrap_or("").to_string(),
1302 created_at: json["created_at"].as_u64().unwrap_or(0),
1303 topic_epoch: json["topic_epoch"].as_u64().unwrap_or(1),
1304 retention,
1305 })
1306 },
1307 FrameType::Control(ControlCommand::ErrorResponse) => {
1308 let error_msg = frame
1309 .payload
1310 .map(|p| String::from_utf8_lossy(&p).to_string())
1311 .unwrap_or_else(|| "Unknown error".to_string());
1312 Err(ClientError::ServerError(error_msg))
1313 },
1314 other => Err(ClientError::InvalidResponse(format!(
1315 "Expected TopicResponse, got {:?}",
1316 other
1317 ))),
1318 }
1319 }
1320
1321 fn parse_topic_list_response(&self, frame: Frame) -> Result<Vec<TopicInfo>> {
1322 match frame.frame_type {
1323 FrameType::Control(ControlCommand::TopicResponse) => {
1324 let payload = frame.payload.ok_or_else(|| {
1325 ClientError::InvalidResponse("Empty topic list response".to_string())
1326 })?;
1327 let json: serde_json::Value = serde_json::from_slice(&payload)
1328 .map_err(|e| ClientError::ProtocolError(format!("Invalid JSON: {}", e)))?;
1329
1330 let topics = json["topics"]
1331 .as_array()
1332 .map(|arr| {
1333 arr.iter()
1334 .map(|t| {
1335 let retention = if t.get("retention").is_some() {
1336 Some(RetentionInfo {
1337 max_age_secs: t["retention"]["max_age_secs"]
1338 .as_u64()
1339 .unwrap_or(0),
1340 max_bytes: t["retention"]["max_bytes"]
1341 .as_u64()
1342 .unwrap_or(0),
1343 })
1344 } else {
1345 None
1346 };
1347 TopicInfo {
1348 id: t["id"].as_u64().unwrap_or(0) as u32,
1349 name: t["name"].as_str().unwrap_or("").to_string(),
1350 created_at: t["created_at"].as_u64().unwrap_or(0),
1351 topic_epoch: t["topic_epoch"].as_u64().unwrap_or(1),
1352 retention,
1353 }
1354 })
1355 .collect()
1356 })
1357 .unwrap_or_default();
1358
1359 Ok(topics)
1360 },
1361 FrameType::Control(ControlCommand::ErrorResponse) => {
1362 let error_msg = frame
1363 .payload
1364 .map(|p| String::from_utf8_lossy(&p).to_string())
1365 .unwrap_or_else(|| "Unknown error".to_string());
1366 Err(ClientError::ServerError(error_msg))
1367 },
1368 other => Err(ClientError::InvalidResponse(format!(
1369 "Expected TopicResponse, got {:?}",
1370 other
1371 ))),
1372 }
1373 }
1374
1375 async fn recv_frame(&mut self) -> Result<Frame> {
1387 const MAX_FRAME_SIZE: usize = 16 * 1024 * 1024;
1389
1390 loop {
1391 if self.read_offset >= LWP_HEADER_SIZE {
1392 let payload_len = u32::from_le_bytes([
1394 self.read_buffer[32],
1395 self.read_buffer[33],
1396 self.read_buffer[34],
1397 self.read_buffer[35],
1398 ]) as usize;
1399 let total_frame_size = LWP_HEADER_SIZE + payload_len;
1400 if total_frame_size > MAX_FRAME_SIZE {
1401 return Err(ClientError::ServerError(format!(
1402 "Frame too large: {} bytes",
1403 total_frame_size
1404 )));
1405 }
1406 if total_frame_size > self.read_buffer.len() {
1407 self.read_buffer.resize(total_frame_size, 0);
1408 }
1409
1410 if let Some((frame, consumed)) = parse_frame(&self.read_buffer[..self.read_offset])?
1411 {
1412 self.read_buffer.copy_within(consumed..self.read_offset, 0);
1413 self.read_offset -= consumed;
1414 if self.read_buffer.len() > 64 * 1024 && self.read_offset < 64 * 1024 {
1416 self.read_buffer.resize(64 * 1024, 0);
1417 }
1418 return Ok(frame);
1419 }
1420 }
1421
1422 let n = tokio::time::timeout(
1423 self.config.read_timeout,
1424 self.stream.read(&mut self.read_buffer[self.read_offset..]),
1425 )
1426 .await
1427 .map_err(|_| ClientError::Timeout)??;
1428
1429 if n == 0 {
1430 return Err(ClientError::ConnectionClosed);
1431 }
1432
1433 self.read_offset += n;
1434 }
1435 }
1436
1437 pub fn config(&self) -> &ClientConfig {
1439 &self.config
1440 }
1441
1442 pub async fn close(mut self) -> Result<()> {
1444 self.stream.shutdown().await?;
1445 Ok(())
1446 }
1447}
1448
1449impl std::fmt::Debug for LanceClient {
1450 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1451 f.debug_struct("LanceClient")
1452 .field("addr", &self.config.addr)
1453 .field("batch_id", &self.batch_id.load(Ordering::SeqCst))
1454 .finish()
1455 }
1456}