1use crate::error::{ClientError, Result};
2use crate::tls::TlsClientConfig;
3use bytes::Bytes;
4use lnc_network::{
5 ControlCommand, Frame, FrameType, LWP_HEADER_SIZE, TlsConnector, encode_frame, parse_frame,
6};
7use std::net::SocketAddr;
8use std::pin::Pin;
9use std::sync::atomic::{AtomicU64, Ordering};
10use std::task::{Context, Poll};
11use std::time::Duration;
12use tokio::io::{AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt, ReadBuf};
13use tokio::net::TcpStream;
14use tokio::net::lookup_host;
15use tracing::{debug, trace, warn};
16
/// Transport used by the client: either a plain TCP stream or a TLS session
/// layered on top of a TCP stream.
///
/// The `AsyncRead` / `AsyncWrite` implementations in this file forward every
/// call to whichever variant is active.
// NOTE(review): the lint allowance presumably exists because the TLS variant is
// much larger than the TCP one — confirm before boxing either variant.
#[allow(clippy::large_enum_variant)]
pub enum ClientStream {
    /// Plain TCP connection.
    Tcp(TcpStream),
    /// TLS client stream wrapping a TCP connection.
    Tls(tokio_rustls::client::TlsStream<TcpStream>),
}
25
26impl AsyncRead for ClientStream {
27 fn poll_read(
31 self: Pin<&mut Self>,
32 cx: &mut Context<'_>,
33 buf: &mut ReadBuf<'_>,
34 ) -> Poll<std::io::Result<()>> {
35 match self.get_mut() {
36 ClientStream::Tcp(stream) => Pin::new(stream).poll_read(cx, buf),
37 ClientStream::Tls(stream) => Pin::new(stream).poll_read(cx, buf),
38 }
39 }
40}
41
42impl AsyncWrite for ClientStream {
43 fn poll_write(
48 self: Pin<&mut Self>,
49 cx: &mut Context<'_>,
50 buf: &[u8],
51 ) -> Poll<std::io::Result<usize>> {
52 match self.get_mut() {
53 ClientStream::Tcp(stream) => Pin::new(stream).poll_write(cx, buf),
54 ClientStream::Tls(stream) => Pin::new(stream).poll_write(cx, buf),
55 }
56 }
57
58 fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<std::io::Result<()>> {
62 match self.get_mut() {
63 ClientStream::Tcp(stream) => Pin::new(stream).poll_flush(cx),
64 ClientStream::Tls(stream) => Pin::new(stream).poll_flush(cx),
65 }
66 }
67
68 fn poll_shutdown(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<std::io::Result<()>> {
72 match self.get_mut() {
73 ClientStream::Tcp(stream) => Pin::new(stream).poll_shutdown(cx),
74 ClientStream::Tls(stream) => Pin::new(stream).poll_shutdown(cx),
75 }
76 }
77}
78
79fn extract_error_message(frame: &Frame) -> String {
81 frame
82 .payload
83 .as_ref()
84 .map(|p| String::from_utf8_lossy(p).to_string())
85 .unwrap_or_else(|| "Unknown error".to_string())
86}
87
88#[allow(dead_code)] fn expect_frame_type(frame: Frame, expected: ControlCommand, expected_name: &str) -> Result<Frame> {
91 match frame.frame_type {
92 FrameType::Control(cmd) if cmd == expected => Ok(frame),
93 FrameType::Control(ControlCommand::ErrorResponse) => {
94 Err(ClientError::ServerError(extract_error_message(&frame)))
95 },
96 other => Err(ClientError::InvalidResponse(format!(
97 "Expected {}, got {:?}",
98 expected_name, other
99 ))),
100 }
101}
102
103fn expect_success_response(frame: Frame) -> Result<()> {
105 match frame.frame_type {
106 FrameType::Control(ControlCommand::TopicResponse) => Ok(()),
107 FrameType::Control(ControlCommand::ErrorResponse) => {
108 Err(ClientError::ServerError(extract_error_message(&frame)))
109 },
110 other => Err(ClientError::InvalidResponse(format!(
111 "Expected TopicResponse, got {:?}",
112 other
113 ))),
114 }
115}
116
/// Authentication settings accepted by the topic-aware ingest methods.
///
/// NOTE(review): within this file the value is only received as the
/// `_auth_config` parameter of `send_ingest_to_topic` and is never read.
#[derive(Debug, Clone, Default)]
pub struct AuthConfig {
    /// Presumably toggles mutual TLS — not read anywhere in this file; confirm against callers.
    pub mtls_enabled: bool,
    /// Presumably the path to the client certificate — TODO confirm expected format.
    pub client_cert_path: Option<String>,
    /// Presumably the path to the client private key — TODO confirm expected format.
    pub client_key_path: Option<String>,
}
127
/// Retention settings attached to a topic, as parsed from the `retention`
/// object of a topic JSON response.
#[derive(Debug, Clone, Default)]
pub struct RetentionInfo {
    /// Value of the `max_age_secs` JSON field; the parsers fall back to 0 when it is absent.
    pub max_age_secs: u64,
    /// Value of the `max_bytes` JSON field; the parsers fall back to 0 when it is absent.
    pub max_bytes: u64,
}
136
/// Topic metadata as parsed from a `TopicResponse` JSON payload.
#[derive(Debug, Clone)]
pub struct TopicInfo {
    /// JSON field `id` (parsers default to 0 when absent).
    pub id: u32,
    /// JSON field `name` (parsers default to the empty string when absent).
    pub name: String,
    /// JSON field `created_at` (parsers default to 0); units are not visible here — TODO confirm.
    pub created_at: u64,
    /// JSON field `topic_epoch` (parsers default to 1 when absent).
    pub topic_epoch: u64,
    /// Retention settings, when the response carries a `retention` entry.
    pub retention: Option<RetentionInfo>,
}
151
/// Decoded `FetchResponse` payload (see `parse_fetch_response`).
#[derive(Debug, Clone)]
pub struct FetchResult {
    /// Everything in the payload after the 16-byte header.
    pub data: Bytes,
    /// Little-endian `u64` taken from payload bytes 0..8.
    pub next_offset: u64,
    /// Little-endian `u32` taken from payload bytes 8..12.
    pub bytes_returned: u32,
    /// Little-endian `u32` taken from payload bytes 12..16.
    pub record_count: u32,
}
164
/// Decoded `SubscribeAck` payload (see `parse_subscribe_response`).
#[derive(Debug, Clone)]
pub struct SubscribeResult {
    /// Little-endian `u64` taken from payload bytes 0..8.
    pub consumer_id: u64,
    /// Little-endian `u64` taken from payload bytes 8..16.
    pub start_offset: u64,
}
173
/// Decoded `CommitAck` payload (see `parse_commit_response`).
#[derive(Debug, Clone)]
pub struct CommitResult {
    /// Little-endian `u64` taken from payload bytes 0..8.
    pub consumer_id: u64,
    /// Little-endian `u64` taken from payload bytes 8..16.
    pub committed_offset: u64,
}
182
/// Cluster state as parsed from a `ClusterStatusResponse` JSON payload
/// (see `parse_cluster_status_response` for the defaults used for missing fields).
#[derive(Debug, Clone)]
pub struct ClusterStatus {
    /// JSON field `node_id` (defaults to 0).
    pub node_id: u16,
    /// JSON field `is_leader` (defaults to `false`).
    pub is_leader: bool,
    /// JSON field `leader_id`; `None` when absent or not an unsigned integer.
    pub leader_id: Option<u16>,
    /// JSON field `current_term` (defaults to 0).
    pub current_term: u64,
    /// JSON field `node_count` (defaults to 1).
    pub node_count: usize,
    /// JSON field `healthy_nodes` (defaults to 1).
    pub healthy_nodes: usize,
    /// JSON field `quorum_available` (defaults to `true`).
    pub quorum_available: bool,
    /// Entries of the `peer_states` JSON object whose keys parse as `u16`;
    /// non-string values are recorded as `"unknown"`.
    pub peer_states: std::collections::HashMap<u16, String>,
}
195
/// Connection settings for [`LanceClient`].
#[derive(Debug, Clone)]
pub struct ClientConfig {
    /// Server address; parsed as a socket address or, failing that, resolved via host lookup.
    pub addr: String,
    /// Upper bound on establishing the TCP connection.
    pub connect_timeout: Duration,
    /// Upper bound on each socket read performed while receiving a frame.
    pub read_timeout: Duration,
    /// Upper bound on writing one encoded frame.
    pub write_timeout: Duration,
    /// Not read anywhere in this file — presumably consumed by a keepalive driver elsewhere.
    pub keepalive_interval: Duration,
    /// When `Some`, `LanceClient::connect` establishes a TLS connection with these settings.
    pub tls: Option<TlsClientConfig>,
}
212
213impl Default for ClientConfig {
214 fn default() -> Self {
215 Self {
216 addr: "127.0.0.1:1992".to_string(),
217 connect_timeout: Duration::from_secs(10),
218 read_timeout: Duration::from_secs(30),
219 write_timeout: Duration::from_secs(10),
220 keepalive_interval: Duration::from_secs(10),
221 tls: None,
222 }
223 }
224}
225
226impl ClientConfig {
227 pub fn new(addr: impl Into<String>) -> Self {
233 Self {
234 addr: addr.into(),
235 ..Default::default()
236 }
237 }
238
239 pub fn with_tls(mut self, tls_config: TlsClientConfig) -> Self {
250 self.tls = Some(tls_config);
251 self
252 }
253
254 pub fn is_tls_enabled(&self) -> bool {
256 self.tls.is_some()
257 }
258}
259
/// Client for a single connection to a LANCE server, over TCP or TLS.
pub struct LanceClient {
    /// Underlying transport.
    stream: ClientStream,
    /// Settings the connection was opened with; also supplies read/write timeouts.
    config: ClientConfig,
    /// Next batch id to hand out; starts at 1 and is incremented by `next_batch_id`.
    batch_id: AtomicU64,
    /// Accumulates received bytes until a full frame can be parsed (initially 64 KiB).
    read_buffer: Vec<u8>,
    /// Number of valid bytes currently held at the front of `read_buffer`.
    read_offset: usize,
}
270
271impl LanceClient {
272 async fn resolve_address(addr: &str) -> Result<SocketAddr> {
287 if let Ok(socket_addr) = addr.parse::<SocketAddr>() {
289 return Ok(socket_addr);
290 }
291
292 let mut addrs = lookup_host(addr).await.map_err(|e| {
294 ClientError::ProtocolError(format!("DNS resolution failed for '{}': {}", addr, e))
295 })?;
296
297 addrs
298 .next()
299 .ok_or_else(|| ClientError::ProtocolError(format!("No addresses found for '{}'", addr)))
300 }
301
302 pub async fn connect(config: ClientConfig) -> Result<Self> {
307 if let Some(ref tls_config) = config.tls {
309 return Self::connect_tls(config.clone(), tls_config.clone()).await;
310 }
311
312 debug!(addr = %config.addr, "Connecting to LANCE server");
313
314 let socket_addr = Self::resolve_address(&config.addr).await?;
316 debug!(resolved_addr = %socket_addr, "Resolved server address");
317
318 let stream = tokio::time::timeout(config.connect_timeout, TcpStream::connect(socket_addr))
319 .await
320 .map_err(|_| ClientError::Timeout)?
321 .map_err(ClientError::ConnectionFailed)?;
322
323 stream.set_nodelay(true)?;
324
325 debug!(addr = %config.addr, "Connected to LANCE server");
326
327 Ok(Self {
328 stream: ClientStream::Tcp(stream),
329 config,
330 batch_id: AtomicU64::new(1),
331 read_buffer: vec![0u8; 64 * 1024],
332 read_offset: 0,
333 })
334 }
335
336 pub async fn connect_tls(config: ClientConfig, tls_config: TlsClientConfig) -> Result<Self> {
353 debug!(addr = %config.addr, "Connecting to LANCE server with TLS");
354
355 let socket_addr = Self::resolve_address(&config.addr).await?;
357 debug!(resolved_addr = %socket_addr, "Resolved server address");
358
359 let tcp_stream =
361 tokio::time::timeout(config.connect_timeout, TcpStream::connect(socket_addr))
362 .await
363 .map_err(|_| ClientError::Timeout)?
364 .map_err(ClientError::ConnectionFailed)?;
365
366 tcp_stream.set_nodelay(true)?;
367
368 let network_config = tls_config.to_network_config();
370 let connector =
371 TlsConnector::new(network_config).map_err(|e| ClientError::TlsError(e.to_string()))?;
372
373 let server_name = tls_config.server_name.unwrap_or_else(|| {
375 config
377 .addr
378 .rsplit_once(':')
379 .map(|(host, _)| host.to_string())
380 .unwrap_or_else(|| socket_addr.ip().to_string())
381 });
382
383 let tls_stream = connector
385 .connect(&server_name, tcp_stream)
386 .await
387 .map_err(|e| ClientError::TlsError(e.to_string()))?;
388
389 debug!(addr = %config.addr, "TLS connection established");
390
391 Ok(Self {
392 stream: ClientStream::Tls(tls_stream),
393 config,
394 batch_id: AtomicU64::new(1),
395 read_buffer: vec![0u8; 64 * 1024],
396 read_offset: 0,
397 })
398 }
399
400 pub async fn connect_to(addr: &str) -> Result<Self> {
405 Self::connect(ClientConfig::new(addr)).await
406 }
407
408 pub async fn connect_tls_to(addr: &str, tls_config: TlsClientConfig) -> Result<Self> {
413 Self::connect_tls(ClientConfig::new(addr), tls_config).await
414 }
415
416 fn next_batch_id(&self) -> u64 {
417 self.batch_id.fetch_add(1, Ordering::SeqCst)
418 }
419
420 pub async fn send_ingest(&mut self, payload: Bytes, record_count: u32) -> Result<u64> {
434 self.send_ingest_to_topic(0, payload, record_count, None)
435 .await
436 }
437
438 pub async fn send_ingest_to_topic(
451 &mut self,
452 topic_id: u32,
453 payload: Bytes,
454 record_count: u32,
455 _auth_config: Option<&AuthConfig>,
456 ) -> Result<u64> {
457 let batch_id = self.next_batch_id();
458 let timestamp_ns = std::time::SystemTime::now()
459 .duration_since(std::time::UNIX_EPOCH)
460 .map(|d| d.as_nanos() as u64)
461 .unwrap_or(0);
462
463 let frame =
464 Frame::new_ingest_with_topic(batch_id, timestamp_ns, record_count, payload, topic_id);
465 let frame_bytes = encode_frame(&frame);
466
467 trace!(
468 batch_id,
469 topic_id,
470 payload_len = frame.payload_length(),
471 "Sending ingest frame"
472 );
473
474 tokio::time::timeout(
475 self.config.write_timeout,
476 self.stream.write_all(&frame_bytes),
477 )
478 .await
479 .map_err(|_| ClientError::Timeout)??;
480
481 Ok(batch_id)
482 }
483
484 pub async fn send_ingest_sync(&mut self, payload: Bytes, record_count: u32) -> Result<u64> {
495 self.send_ingest_to_topic_sync(0, payload, record_count, None)
496 .await
497 }
498
499 pub async fn send_ingest_to_topic_sync(
512 &mut self,
513 topic_id: u32,
514 payload: Bytes,
515 record_count: u32,
516 auth_config: Option<&AuthConfig>,
517 ) -> Result<u64> {
518 let batch_id = self
519 .send_ingest_to_topic(topic_id, payload, record_count, auth_config)
520 .await?;
521 self.wait_for_ack(batch_id).await
522 }
523
524 async fn wait_for_ack(&mut self, expected_batch_id: u64) -> Result<u64> {
537 loop {
543 let frame = self.recv_frame().await?;
544
545 match frame.frame_type {
546 FrameType::Ack => {
547 let acked_id = frame.batch_id();
548 if acked_id == expected_batch_id {
549 trace!(batch_id = acked_id, "Received ack");
550 return Ok(acked_id);
551 }
552 if acked_id < expected_batch_id {
553 warn!(
554 expected = expected_batch_id,
555 received = acked_id,
556 "Draining stale ack with lower batch_id"
557 );
558 continue;
559 }
560 return Err(ClientError::InvalidResponse(format!(
562 "Ack batch_id mismatch: sent {}, received {} (ahead)",
563 expected_batch_id, acked_id
564 )));
565 },
566 FrameType::Control(ControlCommand::ErrorResponse) => {
567 let error_msg = frame
568 .payload
569 .map(|p| String::from_utf8_lossy(&p).to_string())
570 .unwrap_or_else(|| "Unknown error".to_string());
571 return Err(ClientError::ServerError(error_msg));
572 },
573 FrameType::Backpressure => {
574 warn!("Server signaled backpressure");
575 return Err(ClientError::ServerBackpressure);
576 },
577 other => {
578 return Err(ClientError::InvalidResponse(format!(
579 "Expected Ack, got {:?}",
580 other
581 )));
582 },
583 }
584 }
585 }
586
587 pub async fn recv_ack(&mut self) -> Result<u64> {
598 let frame = self.recv_frame().await?;
599
600 match frame.frame_type {
601 FrameType::Ack => {
602 trace!(batch_id = frame.batch_id(), "Received ack");
603 Ok(frame.batch_id())
604 },
605 FrameType::Backpressure => {
606 warn!("Server signaled backpressure");
607 Err(ClientError::ServerBackpressure)
608 },
609 other => Err(ClientError::InvalidResponse(format!(
610 "Expected Ack, got {:?}",
611 other
612 ))),
613 }
614 }
615
616 pub async fn send_keepalive(&mut self) -> Result<()> {
623 let frame = Frame::new_keepalive();
624 let frame_bytes = encode_frame(&frame);
625
626 trace!("Sending keepalive");
627
628 tokio::time::timeout(
629 self.config.write_timeout,
630 self.stream.write_all(&frame_bytes),
631 )
632 .await
633 .map_err(|_| ClientError::Timeout)??;
634
635 Ok(())
636 }
637
638 pub async fn recv_keepalive(&mut self) -> Result<()> {
648 let frame = self.recv_frame().await?;
649
650 match frame.frame_type {
651 FrameType::Keepalive => {
652 trace!("Received keepalive response");
653 Ok(())
654 },
655 other => Err(ClientError::InvalidResponse(format!(
656 "Expected Keepalive, got {:?}",
657 other
658 ))),
659 }
660 }
661
662 pub async fn ping(&mut self) -> Result<Duration> {
664 let start = std::time::Instant::now();
665 self.send_keepalive().await?;
666 self.recv_keepalive().await?;
667 Ok(start.elapsed())
668 }
669
670 pub async fn create_topic(&mut self, name: &str) -> Result<TopicInfo> {
672 const DEFAULT_CREATE_TOPIC_ATTEMPTS: usize = 20;
673 const DEFAULT_CREATE_TOPIC_BACKOFF_MS: u64 = 500;
674 self.ensure_topic(
675 name,
676 DEFAULT_CREATE_TOPIC_ATTEMPTS,
677 DEFAULT_CREATE_TOPIC_BACKOFF_MS,
678 )
679 .await
680 }
681
682 async fn create_topic_once(&mut self, name: &str) -> Result<TopicInfo> {
683 let frame = Frame::new_create_topic(name);
684 let frame_bytes = encode_frame(&frame);
685
686 trace!(topic_name = %name, "Creating topic");
687
688 tokio::time::timeout(
689 self.config.write_timeout,
690 self.stream.write_all(&frame_bytes),
691 )
692 .await
693 .map_err(|_| ClientError::Timeout)??;
694
695 let response = self.recv_frame().await?;
696 self.parse_topic_response(response)
697 }
698
699 pub async fn ensure_topic(
704 &mut self,
705 name: &str,
706 max_attempts: usize,
707 base_backoff_ms: u64,
708 ) -> Result<TopicInfo> {
709 let attempts = max_attempts.max(1);
710 let mut last_error: Option<ClientError> = None;
711 let mut saw_retryable_error = false;
712
713 for attempt in 1..=attempts {
714 let mut retryable_this_attempt = false;
715
716 match self.create_topic_once(name).await {
717 Ok(info) => {
718 trace!(
719 topic_id = info.id,
720 topic_name = %info.name,
721 attempt,
722 max_attempts = attempts,
723 "Topic ensured via create_topic"
724 );
725 return Ok(info);
726 },
727 Err(create_err) => {
728 if create_err.is_retryable() {
729 retryable_this_attempt = true;
730 saw_retryable_error = true;
731 }
732 last_error = Some(ClientError::ServerError(create_err.to_string()));
733 warn!(
734 topic_name = %name,
735 attempt,
736 max_attempts = attempts,
737 error = %create_err,
738 "create_topic failed during ensure_topic; retrying with list fallback"
739 );
740 },
741 }
742
743 match self.list_topics().await {
744 Ok(topics) => {
745 if let Some(topic) = topics.into_iter().find(|t| t.name == name) {
746 trace!(
747 topic_id = topic.id,
748 topic_name = %topic.name,
749 attempt,
750 max_attempts = attempts,
751 "Topic ensured via list_topics fallback"
752 );
753 return Ok(topic);
754 }
755 },
756 Err(list_err) => {
757 if list_err.is_retryable() {
758 retryable_this_attempt = true;
759 saw_retryable_error = true;
760 }
761 last_error = Some(ClientError::ServerError(list_err.to_string()));
762 warn!(
763 topic_name = %name,
764 attempt,
765 max_attempts = attempts,
766 error = %list_err,
767 "list_topics failed during ensure_topic"
768 );
769 },
770 }
771
772 if attempt < attempts {
773 let backoff_ms = if retryable_this_attempt {
774 base_backoff_ms.saturating_mul(attempt as u64).max(1)
775 } else {
776 base_backoff_ms.max(1)
778 };
779 tokio::time::sleep(Duration::from_millis(backoff_ms)).await;
780
781 let reconnect_config = self.config.clone();
784 match Self::connect(reconnect_config).await {
785 Ok(new_client) => {
786 *self = new_client;
787 },
788 Err(reconnect_err) => {
789 warn!(
790 topic_name = %name,
791 attempt,
792 max_attempts = attempts,
793 error = %reconnect_err,
794 "ensure_topic reconnect attempt failed"
795 );
796 last_error = Some(reconnect_err);
797 },
798 }
799 }
800 }
801
802 if let Some(err) = last_error {
803 return Err(ClientError::ServerError(format!(
804 "ensure_topic('{}') failed after {} attempts: {}",
805 name, attempts, err
806 )));
807 }
808
809 if saw_retryable_error {
810 return Err(ClientError::ServerError(format!(
811 "ensure_topic('{}') exhausted {} retryable attempts",
812 name, attempts
813 )));
814 }
815
816 Err(ClientError::ServerError(format!(
817 "topic '{}' not found after {} ensure_topic attempts",
818 name, attempts
819 )))
820 }
821
822 pub async fn ensure_topic_default(&mut self, name: &str) -> Result<TopicInfo> {
824 const DEFAULT_ENSURE_TOPIC_ATTEMPTS: usize = 20;
825 const DEFAULT_ENSURE_TOPIC_BACKOFF_MS: u64 = 500;
826 self.ensure_topic(
827 name,
828 DEFAULT_ENSURE_TOPIC_ATTEMPTS,
829 DEFAULT_ENSURE_TOPIC_BACKOFF_MS,
830 )
831 .await
832 }
833
834 pub async fn list_topics(&mut self) -> Result<Vec<TopicInfo>> {
836 let frame = Frame::new_list_topics();
837 let frame_bytes = encode_frame(&frame);
838
839 trace!("Listing topics");
840
841 tokio::time::timeout(
842 self.config.write_timeout,
843 self.stream.write_all(&frame_bytes),
844 )
845 .await
846 .map_err(|_| ClientError::Timeout)??;
847
848 let response = self.recv_frame().await?;
849 self.parse_topic_list_response(response)
850 }
851
852 pub async fn get_topic(&mut self, topic_id: u32) -> Result<TopicInfo> {
854 let frame = Frame::new_get_topic(topic_id);
855 let frame_bytes = encode_frame(&frame);
856
857 trace!(topic_id, "Getting topic");
858
859 tokio::time::timeout(
860 self.config.write_timeout,
861 self.stream.write_all(&frame_bytes),
862 )
863 .await
864 .map_err(|_| ClientError::Timeout)??;
865
866 let response = self.recv_frame().await?;
867 self.parse_topic_response(response)
868 }
869
870 pub async fn delete_topic(&mut self, topic_id: u32) -> Result<()> {
872 let frame = Frame::new_delete_topic(topic_id);
873 let frame_bytes = encode_frame(&frame);
874
875 trace!(topic_id, "Deleting topic");
876
877 tokio::time::timeout(
878 self.config.write_timeout,
879 self.stream.write_all(&frame_bytes),
880 )
881 .await
882 .map_err(|_| ClientError::Timeout)??;
883
884 let response = self.recv_frame().await?;
885 self.parse_delete_response(response)
886 }
887
888 pub async fn set_retention(
896 &mut self,
897 topic_id: u32,
898 max_age_secs: u64,
899 max_bytes: u64,
900 ) -> Result<()> {
901 let frame = Frame::new_set_retention(topic_id, max_age_secs, max_bytes);
902 let frame_bytes = encode_frame(&frame);
903
904 trace!(
905 topic_id,
906 max_age_secs, max_bytes, "Setting retention policy"
907 );
908
909 tokio::time::timeout(
910 self.config.write_timeout,
911 self.stream.write_all(&frame_bytes),
912 )
913 .await
914 .map_err(|_| ClientError::Timeout)??;
915
916 let response = self.recv_frame().await?;
917 self.parse_retention_response(response)
918 }
919
920 pub async fn create_topic_with_retention(
927 &mut self,
928 name: &str,
929 max_age_secs: u64,
930 max_bytes: u64,
931 ) -> Result<TopicInfo> {
932 let frame = Frame::new_create_topic_with_retention(name, max_age_secs, max_bytes);
933 let frame_bytes = encode_frame(&frame);
934
935 trace!(
936 name,
937 max_age_secs, max_bytes, "Creating topic with retention"
938 );
939
940 tokio::time::timeout(
941 self.config.write_timeout,
942 self.stream.write_all(&frame_bytes),
943 )
944 .await
945 .map_err(|_| ClientError::Timeout)??;
946
947 let response = self.recv_frame().await?;
948 self.parse_topic_response(response)
949 }
950
951 pub async fn get_cluster_status(&mut self) -> Result<ClusterStatus> {
953 let frame = Frame::new_get_cluster_status();
954 let frame_bytes = encode_frame(&frame);
955
956 tokio::time::timeout(
957 self.config.write_timeout,
958 self.stream.write_all(&frame_bytes),
959 )
960 .await
961 .map_err(|_| ClientError::Timeout)??;
962
963 let response = self.recv_frame().await?;
964 self.parse_cluster_status_response(response)
965 }
966
967 fn parse_cluster_status_response(&self, frame: Frame) -> Result<ClusterStatus> {
968 match frame.frame_type {
969 FrameType::Control(ControlCommand::ClusterStatusResponse) => {
970 let payload = frame.payload.ok_or_else(|| {
971 ClientError::InvalidResponse("Empty cluster status response".to_string())
972 })?;
973 let json: serde_json::Value = serde_json::from_slice(&payload)
974 .map_err(|e| ClientError::ProtocolError(format!("Invalid JSON: {}", e)))?;
975
976 let peer_states: std::collections::HashMap<u16, String> = json["peer_states"]
977 .as_object()
978 .map(|obj| {
979 obj.iter()
980 .filter_map(|(k, v)| {
981 k.parse::<u16>()
982 .ok()
983 .map(|id| (id, v.as_str().unwrap_or("unknown").to_string()))
984 })
985 .collect()
986 })
987 .unwrap_or_default();
988
989 Ok(ClusterStatus {
990 node_id: json["node_id"].as_u64().unwrap_or(0) as u16,
991 is_leader: json["is_leader"].as_bool().unwrap_or(false),
992 leader_id: json["leader_id"].as_u64().map(|id| id as u16),
993 current_term: json["current_term"].as_u64().unwrap_or(0),
994 node_count: json["node_count"].as_u64().unwrap_or(1) as usize,
995 healthy_nodes: json["healthy_nodes"].as_u64().unwrap_or(1) as usize,
996 quorum_available: json["quorum_available"].as_bool().unwrap_or(true),
997 peer_states,
998 })
999 },
1000 FrameType::Control(ControlCommand::ErrorResponse) => {
1001 let error_msg = frame
1002 .payload
1003 .map(|p| String::from_utf8_lossy(&p).to_string())
1004 .unwrap_or_else(|| "Unknown error".to_string());
1005 Err(ClientError::ServerError(error_msg))
1006 },
1007 other => Err(ClientError::InvalidResponse(format!(
1008 "Expected ClusterStatusResponse, got {:?}",
1009 other
1010 ))),
1011 }
1012 }
1013
1014 pub async fn fetch(
1017 &mut self,
1018 topic_id: u32,
1019 start_offset: u64,
1020 max_bytes: u32,
1021 ) -> Result<FetchResult> {
1022 let frame = Frame::new_fetch(topic_id, start_offset, max_bytes);
1023 let frame_bytes = encode_frame(&frame);
1024
1025 trace!(topic_id, start_offset, max_bytes, "Fetching data");
1026
1027 tokio::time::timeout(
1028 self.config.write_timeout,
1029 self.stream.write_all(&frame_bytes),
1030 )
1031 .await
1032 .map_err(|_| ClientError::Timeout)??;
1033
1034 let response = self.recv_frame().await?;
1035 self.parse_fetch_response(response)
1036 }
1037
1038 pub async fn subscribe(
1055 &mut self,
1056 topic_id: u32,
1057 start_offset: u64,
1058 max_batch_bytes: u32,
1059 consumer_id: u64,
1060 ) -> Result<SubscribeResult> {
1061 let frame = Frame::new_subscribe(topic_id, start_offset, max_batch_bytes, consumer_id);
1062 let frame_bytes = encode_frame(&frame);
1063
1064 trace!(topic_id, start_offset, consumer_id, "Subscribing to topic");
1065
1066 tokio::time::timeout(
1067 self.config.write_timeout,
1068 self.stream.write_all(&frame_bytes),
1069 )
1070 .await
1071 .map_err(|_| ClientError::Timeout)??;
1072
1073 let response = self.recv_frame().await?;
1074 self.parse_subscribe_response(response)
1075 }
1076
1077 pub async fn unsubscribe(&mut self, topic_id: u32, consumer_id: u64) -> Result<()> {
1091 let frame = Frame::new_unsubscribe(topic_id, consumer_id);
1092 let frame_bytes = encode_frame(&frame);
1093
1094 trace!(topic_id, consumer_id, "Unsubscribing from topic");
1095
1096 tokio::time::timeout(
1097 self.config.write_timeout,
1098 self.stream.write_all(&frame_bytes),
1099 )
1100 .await
1101 .map_err(|_| ClientError::Timeout)??;
1102
1103 let response = self.recv_frame().await?;
1105 match response.frame_type {
1106 FrameType::Ack => Ok(()),
1107 FrameType::Control(ControlCommand::ErrorResponse) => {
1108 let error_msg = response
1109 .payload
1110 .map(|p| String::from_utf8_lossy(&p).to_string())
1111 .unwrap_or_else(|| "Unknown error".to_string());
1112 Err(ClientError::ServerError(error_msg))
1113 },
1114 other => Err(ClientError::InvalidResponse(format!(
1115 "Expected Ack, got {:?}",
1116 other
1117 ))),
1118 }
1119 }
1120
1121 pub async fn commit_offset(
1123 &mut self,
1124 topic_id: u32,
1125 consumer_id: u64,
1126 offset: u64,
1127 ) -> Result<CommitResult> {
1128 let frame = Frame::new_commit_offset(topic_id, consumer_id, offset);
1129 let frame_bytes = encode_frame(&frame);
1130
1131 trace!(topic_id, consumer_id, offset, "Committing offset");
1132
1133 tokio::time::timeout(
1134 self.config.write_timeout,
1135 self.stream.write_all(&frame_bytes),
1136 )
1137 .await
1138 .map_err(|_| ClientError::Timeout)??;
1139
1140 let response = self.recv_frame().await?;
1141 self.parse_commit_response(response)
1142 }
1143
1144 fn parse_subscribe_response(&self, frame: Frame) -> Result<SubscribeResult> {
1145 match frame.frame_type {
1146 FrameType::Control(ControlCommand::SubscribeAck) => {
1147 let payload = frame.payload.ok_or_else(|| {
1148 ClientError::InvalidResponse("Empty subscribe response".to_string())
1149 })?;
1150
1151 if payload.len() < 16 {
1152 return Err(ClientError::ProtocolError(
1153 "Subscribe response too small".to_string(),
1154 ));
1155 }
1156
1157 let consumer_id = u64::from_le_bytes([
1158 payload[0], payload[1], payload[2], payload[3], payload[4], payload[5],
1159 payload[6], payload[7],
1160 ]);
1161 let start_offset = u64::from_le_bytes([
1162 payload[8],
1163 payload[9],
1164 payload[10],
1165 payload[11],
1166 payload[12],
1167 payload[13],
1168 payload[14],
1169 payload[15],
1170 ]);
1171
1172 Ok(SubscribeResult {
1173 consumer_id,
1174 start_offset,
1175 })
1176 },
1177 FrameType::Control(ControlCommand::ErrorResponse) => {
1178 let error_msg = frame
1179 .payload
1180 .map(|p| String::from_utf8_lossy(&p).to_string())
1181 .unwrap_or_else(|| "Unknown error".to_string());
1182 Err(ClientError::ServerError(error_msg))
1183 },
1184 other => Err(ClientError::InvalidResponse(format!(
1185 "Expected SubscribeAck, got {:?}",
1186 other
1187 ))),
1188 }
1189 }
1190
1191 fn parse_commit_response(&self, frame: Frame) -> Result<CommitResult> {
1192 match frame.frame_type {
1193 FrameType::Control(ControlCommand::CommitAck) => {
1194 let payload = frame.payload.ok_or_else(|| {
1195 ClientError::InvalidResponse("Empty commit response".to_string())
1196 })?;
1197
1198 if payload.len() < 16 {
1199 return Err(ClientError::ProtocolError(
1200 "Commit response too small".to_string(),
1201 ));
1202 }
1203
1204 let consumer_id = u64::from_le_bytes([
1205 payload[0], payload[1], payload[2], payload[3], payload[4], payload[5],
1206 payload[6], payload[7],
1207 ]);
1208 let committed_offset = u64::from_le_bytes([
1209 payload[8],
1210 payload[9],
1211 payload[10],
1212 payload[11],
1213 payload[12],
1214 payload[13],
1215 payload[14],
1216 payload[15],
1217 ]);
1218
1219 Ok(CommitResult {
1220 consumer_id,
1221 committed_offset,
1222 })
1223 },
1224 FrameType::Control(ControlCommand::ErrorResponse) => {
1225 let error_msg = frame
1226 .payload
1227 .map(|p| String::from_utf8_lossy(&p).to_string())
1228 .unwrap_or_else(|| "Unknown error".to_string());
1229 Err(ClientError::ServerError(error_msg))
1230 },
1231 other => Err(ClientError::InvalidResponse(format!(
1232 "Expected CommitAck, got {:?}",
1233 other
1234 ))),
1235 }
1236 }
1237
1238 fn parse_fetch_response(&self, frame: Frame) -> Result<FetchResult> {
1239 match frame.frame_type {
1240 FrameType::Control(ControlCommand::CatchingUp) => {
1241 let server_offset = frame
1242 .payload
1243 .as_ref()
1244 .filter(|p| p.len() >= 8)
1245 .map(|p| u64::from_le_bytes([p[0], p[1], p[2], p[3], p[4], p[5], p[6], p[7]]))
1246 .unwrap_or(0);
1247 Err(ClientError::ServerCatchingUp { server_offset })
1248 },
1249 FrameType::Control(ControlCommand::FetchResponse) => {
1250 let payload = frame.payload.ok_or_else(|| {
1251 ClientError::InvalidResponse("Empty fetch response".to_string())
1252 })?;
1253
1254 if payload.len() < 16 {
1255 return Err(ClientError::ProtocolError(
1256 "Fetch response too small".to_string(),
1257 ));
1258 }
1259
1260 let next_offset = u64::from_le_bytes([
1261 payload[0], payload[1], payload[2], payload[3], payload[4], payload[5],
1262 payload[6], payload[7],
1263 ]);
1264 let bytes_returned =
1265 u32::from_le_bytes([payload[8], payload[9], payload[10], payload[11]]);
1266 let record_count =
1267 u32::from_le_bytes([payload[12], payload[13], payload[14], payload[15]]);
1268 let data = payload.slice(16..);
1269
1270 Ok(FetchResult {
1271 data,
1272 next_offset,
1273 bytes_returned,
1274 record_count,
1275 })
1276 },
1277 FrameType::Control(ControlCommand::ErrorResponse) => {
1278 let error_msg = frame
1279 .payload
1280 .map(|p| String::from_utf8_lossy(&p).to_string())
1281 .unwrap_or_else(|| "Unknown error".to_string());
1282 Err(ClientError::ServerError(error_msg))
1283 },
1284 other => Err(ClientError::InvalidResponse(format!(
1285 "Expected FetchResponse, got {:?}",
1286 other
1287 ))),
1288 }
1289 }
1290
1291 fn parse_delete_response(&self, frame: Frame) -> Result<()> {
1292 expect_success_response(frame)
1293 }
1294
1295 fn parse_retention_response(&self, frame: Frame) -> Result<()> {
1296 expect_success_response(frame)
1297 }
1298
1299 fn parse_topic_response(&self, frame: Frame) -> Result<TopicInfo> {
1300 match frame.frame_type {
1301 FrameType::Control(ControlCommand::TopicResponse) => {
1302 let payload = frame.payload.ok_or_else(|| {
1303 ClientError::InvalidResponse("Empty topic response".to_string())
1304 })?;
1305 let json: serde_json::Value = serde_json::from_slice(&payload)
1306 .map_err(|e| ClientError::ProtocolError(format!("Invalid JSON: {}", e)))?;
1307
1308 let retention = if json.get("retention").is_some() {
1309 Some(RetentionInfo {
1310 max_age_secs: json["retention"]["max_age_secs"].as_u64().unwrap_or(0),
1311 max_bytes: json["retention"]["max_bytes"].as_u64().unwrap_or(0),
1312 })
1313 } else {
1314 None
1315 };
1316
1317 Ok(TopicInfo {
1318 id: json["id"].as_u64().unwrap_or(0) as u32,
1319 name: json["name"].as_str().unwrap_or("").to_string(),
1320 created_at: json["created_at"].as_u64().unwrap_or(0),
1321 topic_epoch: json["topic_epoch"].as_u64().unwrap_or(1),
1322 retention,
1323 })
1324 },
1325 FrameType::Control(ControlCommand::ErrorResponse) => {
1326 let error_msg = frame
1327 .payload
1328 .map(|p| String::from_utf8_lossy(&p).to_string())
1329 .unwrap_or_else(|| "Unknown error".to_string());
1330 Err(ClientError::ServerError(error_msg))
1331 },
1332 other => Err(ClientError::InvalidResponse(format!(
1333 "Expected TopicResponse, got {:?}",
1334 other
1335 ))),
1336 }
1337 }
1338
1339 fn parse_topic_list_response(&self, frame: Frame) -> Result<Vec<TopicInfo>> {
1340 match frame.frame_type {
1341 FrameType::Control(ControlCommand::TopicResponse) => {
1342 let payload = frame.payload.ok_or_else(|| {
1343 ClientError::InvalidResponse("Empty topic list response".to_string())
1344 })?;
1345 let json: serde_json::Value = serde_json::from_slice(&payload)
1346 .map_err(|e| ClientError::ProtocolError(format!("Invalid JSON: {}", e)))?;
1347
1348 let topics = json["topics"]
1349 .as_array()
1350 .map(|arr| {
1351 arr.iter()
1352 .map(|t| {
1353 let retention = if t.get("retention").is_some() {
1354 Some(RetentionInfo {
1355 max_age_secs: t["retention"]["max_age_secs"]
1356 .as_u64()
1357 .unwrap_or(0),
1358 max_bytes: t["retention"]["max_bytes"]
1359 .as_u64()
1360 .unwrap_or(0),
1361 })
1362 } else {
1363 None
1364 };
1365 TopicInfo {
1366 id: t["id"].as_u64().unwrap_or(0) as u32,
1367 name: t["name"].as_str().unwrap_or("").to_string(),
1368 created_at: t["created_at"].as_u64().unwrap_or(0),
1369 topic_epoch: t["topic_epoch"].as_u64().unwrap_or(1),
1370 retention,
1371 }
1372 })
1373 .collect()
1374 })
1375 .unwrap_or_default();
1376
1377 Ok(topics)
1378 },
1379 FrameType::Control(ControlCommand::ErrorResponse) => {
1380 let error_msg = frame
1381 .payload
1382 .map(|p| String::from_utf8_lossy(&p).to_string())
1383 .unwrap_or_else(|| "Unknown error".to_string());
1384 Err(ClientError::ServerError(error_msg))
1385 },
1386 other => Err(ClientError::InvalidResponse(format!(
1387 "Expected TopicResponse, got {:?}",
1388 other
1389 ))),
1390 }
1391 }
1392
1393 async fn recv_frame(&mut self) -> Result<Frame> {
1405 const MAX_FRAME_SIZE: usize = 16 * 1024 * 1024;
1407
1408 loop {
1409 if self.read_offset >= LWP_HEADER_SIZE {
1410 let payload_len = u32::from_le_bytes([
1412 self.read_buffer[32],
1413 self.read_buffer[33],
1414 self.read_buffer[34],
1415 self.read_buffer[35],
1416 ]) as usize;
1417 let total_frame_size = LWP_HEADER_SIZE + payload_len;
1418 if total_frame_size > MAX_FRAME_SIZE {
1419 return Err(ClientError::ServerError(format!(
1420 "Frame too large: {} bytes",
1421 total_frame_size
1422 )));
1423 }
1424 if total_frame_size > self.read_buffer.len() {
1425 self.read_buffer.resize(total_frame_size, 0);
1426 }
1427
1428 if let Some((frame, consumed)) = parse_frame(&self.read_buffer[..self.read_offset])?
1429 {
1430 self.read_buffer.copy_within(consumed..self.read_offset, 0);
1431 self.read_offset -= consumed;
1432 if self.read_buffer.len() > 64 * 1024 && self.read_offset < 64 * 1024 {
1434 self.read_buffer.resize(64 * 1024, 0);
1435 }
1436 return Ok(frame);
1437 }
1438 }
1439
1440 let n = tokio::time::timeout(
1441 self.config.read_timeout,
1442 self.stream.read(&mut self.read_buffer[self.read_offset..]),
1443 )
1444 .await
1445 .map_err(|_| ClientError::Timeout)??;
1446
1447 if n == 0 {
1448 return Err(ClientError::ConnectionClosed);
1449 }
1450
1451 self.read_offset += n;
1452 }
1453 }
1454
1455 pub fn config(&self) -> &ClientConfig {
1457 &self.config
1458 }
1459
1460 pub async fn close(mut self) -> Result<()> {
1462 self.stream.shutdown().await?;
1463 Ok(())
1464 }
1465}
1466
1467impl std::fmt::Debug for LanceClient {
1468 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1469 f.debug_struct("LanceClient")
1470 .field("addr", &self.config.addr)
1471 .field("batch_id", &self.batch_id.load(Ordering::SeqCst))
1472 .finish()
1473 }
1474}