1use crate::error::{ClientError, Result};
2use crate::tls::TlsClientConfig;
3use bytes::Bytes;
4use lnc_network::{
5 ControlCommand, Frame, FrameType, LWP_HEADER_SIZE, TlsConnector, encode_frame, parse_frame,
6};
7use std::net::SocketAddr;
8use std::pin::Pin;
9use std::sync::atomic::{AtomicU64, Ordering};
10use std::task::{Context, Poll};
11use std::time::Duration;
12use tokio::io::{AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt, ReadBuf};
13use tokio::net::TcpStream;
14use tracing::{debug, trace, warn};
15
/// Transport used by the client: either a plain TCP socket or a TLS session
/// layered on top of one.
///
/// The `AsyncRead` / `AsyncWrite` implementations in this file forward every
/// poll to whichever variant is active.
// NOTE(review): the size lint is silenced instead of boxing a variant —
// presumably to avoid an extra indirection on every I/O poll; confirm.
#[allow(clippy::large_enum_variant)]
pub enum ClientStream {
    /// Unencrypted TCP connection.
    Tcp(TcpStream),
    /// TLS-wrapped TCP connection.
    Tls(tokio_rustls::client::TlsStream<TcpStream>),
}
24
25impl AsyncRead for ClientStream {
26 fn poll_read(
27 self: Pin<&mut Self>,
28 cx: &mut Context<'_>,
29 buf: &mut ReadBuf<'_>,
30 ) -> Poll<std::io::Result<()>> {
31 match self.get_mut() {
32 ClientStream::Tcp(stream) => Pin::new(stream).poll_read(cx, buf),
33 ClientStream::Tls(stream) => Pin::new(stream).poll_read(cx, buf),
34 }
35 }
36}
37
38impl AsyncWrite for ClientStream {
39 fn poll_write(
40 self: Pin<&mut Self>,
41 cx: &mut Context<'_>,
42 buf: &[u8],
43 ) -> Poll<std::io::Result<usize>> {
44 match self.get_mut() {
45 ClientStream::Tcp(stream) => Pin::new(stream).poll_write(cx, buf),
46 ClientStream::Tls(stream) => Pin::new(stream).poll_write(cx, buf),
47 }
48 }
49
50 fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<std::io::Result<()>> {
51 match self.get_mut() {
52 ClientStream::Tcp(stream) => Pin::new(stream).poll_flush(cx),
53 ClientStream::Tls(stream) => Pin::new(stream).poll_flush(cx),
54 }
55 }
56
57 fn poll_shutdown(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<std::io::Result<()>> {
58 match self.get_mut() {
59 ClientStream::Tcp(stream) => Pin::new(stream).poll_shutdown(cx),
60 ClientStream::Tls(stream) => Pin::new(stream).poll_shutdown(cx),
61 }
62 }
63}
64
65fn extract_error_message(frame: &Frame) -> String {
67 frame
68 .payload
69 .as_ref()
70 .map(|p| String::from_utf8_lossy(p).to_string())
71 .unwrap_or_else(|| "Unknown error".to_string())
72}
73
74#[allow(dead_code)] fn expect_frame_type(frame: Frame, expected: ControlCommand, expected_name: &str) -> Result<Frame> {
77 match frame.frame_type {
78 FrameType::Control(cmd) if cmd == expected => Ok(frame),
79 FrameType::Control(ControlCommand::ErrorResponse) => {
80 Err(ClientError::ServerError(extract_error_message(&frame)))
81 },
82 other => Err(ClientError::InvalidResponse(format!(
83 "Expected {}, got {:?}",
84 expected_name, other
85 ))),
86 }
87}
88
89fn expect_success_response(frame: Frame) -> Result<()> {
91 match frame.frame_type {
92 FrameType::Control(ControlCommand::TopicResponse) => Ok(()),
93 FrameType::Control(ControlCommand::ErrorResponse) => {
94 Err(ClientError::ServerError(extract_error_message(&frame)))
95 },
96 other => Err(ClientError::InvalidResponse(format!(
97 "Expected TopicResponse, got {:?}",
98 other
99 ))),
100 }
101}
102
/// Authentication options accepted by the topic ingest methods.
///
/// NOTE(review): within this file the value is only passed through;
/// `send_ingest_to_topic` ignores it (its parameter is `_auth_config`).
#[derive(Debug, Clone, Default)]
pub struct AuthConfig {
    /// Presumably toggles mutual TLS — not read anywhere in this file.
    pub mtls_enabled: bool,
    /// Presumably the path to the client certificate — TODO confirm format.
    pub client_cert_path: Option<String>,
    /// Presumably the path to the client private key — TODO confirm format.
    pub client_key_path: Option<String>,
}
109
/// Retention settings of a topic, as read from the `retention` object of a
/// topic response.
#[derive(Debug, Clone, Default)]
pub struct RetentionInfo {
    /// Value of the `max_age_secs` JSON field; `0` when missing or not an
    /// unsigned integer.
    pub max_age_secs: u64,
    /// Value of the `max_bytes` JSON field; `0` when missing or not an
    /// unsigned integer.
    pub max_bytes: u64,
}
118
/// Topic metadata decoded from the JSON payload of a topic response.
#[derive(Debug, Clone)]
pub struct TopicInfo {
    /// Value of the `id` JSON field (`0` when missing).
    pub id: u32,
    /// Value of the `name` JSON field (empty when missing).
    pub name: String,
    /// Value of the `created_at` JSON field (`0` when missing).
    /// NOTE(review): the unit/epoch is not visible in this file.
    pub created_at: u64,
    /// Retention settings, present only when the response contains a
    /// `retention` entry.
    pub retention: Option<RetentionInfo>,
}
127
/// Decoded fetch response.
///
/// The response payload is a 16-byte little-endian header (`next_offset: u64`,
/// `bytes_returned: u32`, `record_count: u32`) followed by the data.
#[derive(Debug, Clone)]
pub struct FetchResult {
    /// Everything in the payload after the 16-byte header.
    pub data: Bytes,
    /// Header bytes 0..8.
    pub next_offset: u64,
    /// Header bytes 8..12, as reported by the server (not checked against
    /// `data.len()`).
    pub bytes_returned: u32,
    /// Header bytes 12..16.
    pub record_count: u32,
}
136
/// Decoded `SubscribeAck` payload (two little-endian `u64` values).
#[derive(Debug, Clone)]
pub struct SubscribeResult {
    /// Payload bytes 0..8.
    pub consumer_id: u64,
    /// Payload bytes 8..16.
    pub start_offset: u64,
}
143
/// Decoded `CommitAck` payload (two little-endian `u64` values).
#[derive(Debug, Clone)]
pub struct CommitResult {
    /// Payload bytes 0..8.
    pub consumer_id: u64,
    /// Payload bytes 8..16.
    pub committed_offset: u64,
}
150
/// Cluster status decoded from the JSON payload of a
/// `ClusterStatusResponse`. Each field mirrors the JSON key of the same name;
/// the defaults listed apply when the key is missing or has the wrong type.
#[derive(Debug, Clone)]
pub struct ClusterStatus {
    /// Defaults to `0`.
    pub node_id: u16,
    /// Defaults to `false`.
    pub is_leader: bool,
    /// `None` when `leader_id` is missing or not an unsigned integer.
    pub leader_id: Option<u16>,
    /// Defaults to `0`.
    pub current_term: u64,
    /// Defaults to `1`.
    pub node_count: usize,
    /// Defaults to `1`.
    pub healthy_nodes: usize,
    /// Defaults to `true`.
    pub quorum_available: bool,
    /// Entries of the `peer_states` object whose key parses as `u16`;
    /// non-string values are stored as `"unknown"`.
    pub peer_states: std::collections::HashMap<u16, String>,
}
163
/// Connection settings for [`LanceClient`].
#[derive(Debug, Clone)]
pub struct ClientConfig {
    /// Server address to connect to.
    pub addr: SocketAddr,
    /// Upper bound on establishing the TCP connection.
    pub connect_timeout: Duration,
    /// Upper bound on each individual socket read while receiving a frame.
    pub read_timeout: Duration,
    /// Upper bound on writing one request frame.
    pub write_timeout: Duration,
    /// NOTE(review): not read anywhere in this file — presumably consumed by
    /// callers that schedule keepalives; confirm.
    pub keepalive_interval: Duration,
    /// When `Some`, `LanceClient::connect` establishes a TLS connection.
    pub tls: Option<TlsClientConfig>,
}
174
175impl Default for ClientConfig {
176 fn default() -> Self {
177 Self {
178 addr: "127.0.0.1:1992"
179 .parse()
180 .unwrap_or_else(|_| SocketAddr::from(([127, 0, 0, 1], 1992))),
181 connect_timeout: Duration::from_secs(10),
182 read_timeout: Duration::from_secs(30),
183 write_timeout: Duration::from_secs(10),
184 keepalive_interval: Duration::from_secs(10),
185 tls: None,
186 }
187 }
188}
189
190impl ClientConfig {
191 pub fn new(addr: SocketAddr) -> Self {
192 Self {
193 addr,
194 ..Default::default()
195 }
196 }
197
198 pub fn with_tls(mut self, tls_config: TlsClientConfig) -> Self {
200 self.tls = Some(tls_config);
201 self
202 }
203
204 pub fn is_tls_enabled(&self) -> bool {
206 self.tls.is_some()
207 }
208}
209
/// Client for a LANCE server over one TCP or TLS connection.
pub struct LanceClient {
    /// Underlying transport.
    stream: ClientStream,
    /// Settings this client was created with (timeouts, address, TLS).
    config: ClientConfig,
    /// Counter behind `next_batch_id`; starts at 0, so the first id is 1.
    batch_id: AtomicU64,
    /// Receive buffer; allocated as 64 KiB of zeroes by the constructors.
    read_buffer: Vec<u8>,
    /// Number of bytes at the start of `read_buffer` that hold received,
    /// not-yet-parsed data.
    read_offset: usize,
}
217
218impl LanceClient {
219 pub async fn connect(config: ClientConfig) -> Result<Self> {
221 if let Some(ref tls_config) = config.tls {
223 return Self::connect_tls(config.clone(), tls_config.clone()).await;
224 }
225
226 debug!(addr = %config.addr, "Connecting to LANCE server");
227
228 let stream = tokio::time::timeout(config.connect_timeout, TcpStream::connect(config.addr))
229 .await
230 .map_err(|_| ClientError::Timeout)?
231 .map_err(ClientError::ConnectionFailed)?;
232
233 stream.set_nodelay(true)?;
234
235 debug!(addr = %config.addr, "Connected to LANCE server");
236
237 Ok(Self {
238 stream: ClientStream::Tcp(stream),
239 config,
240 batch_id: AtomicU64::new(0),
241 read_buffer: vec![0u8; 64 * 1024],
242 read_offset: 0,
243 })
244 }
245
246 pub async fn connect_tls(config: ClientConfig, tls_config: TlsClientConfig) -> Result<Self> {
263 debug!(addr = %config.addr, "Connecting to LANCE server with TLS");
264
265 let tcp_stream =
267 tokio::time::timeout(config.connect_timeout, TcpStream::connect(config.addr))
268 .await
269 .map_err(|_| ClientError::Timeout)?
270 .map_err(ClientError::ConnectionFailed)?;
271
272 tcp_stream.set_nodelay(true)?;
273
274 let network_config = tls_config.to_network_config();
276 let connector =
277 TlsConnector::new(network_config).map_err(|e| ClientError::TlsError(e.to_string()))?;
278
279 let server_name = tls_config
281 .server_name
282 .unwrap_or_else(|| config.addr.ip().to_string());
283
284 let tls_stream = connector
286 .connect(&server_name, tcp_stream)
287 .await
288 .map_err(|e| ClientError::TlsError(e.to_string()))?;
289
290 debug!(addr = %config.addr, "TLS connection established");
291
292 Ok(Self {
293 stream: ClientStream::Tls(tls_stream),
294 config,
295 batch_id: AtomicU64::new(0),
296 read_buffer: vec![0u8; 64 * 1024],
297 read_offset: 0,
298 })
299 }
300
301 pub async fn connect_to(addr: &str) -> Result<Self> {
302 let socket_addr: SocketAddr = addr
303 .parse()
304 .map_err(|e| ClientError::ProtocolError(format!("Invalid address: {}", e)))?;
305 Self::connect(ClientConfig::new(socket_addr)).await
306 }
307
308 pub async fn connect_tls_to(addr: &str, tls_config: TlsClientConfig) -> Result<Self> {
310 let socket_addr: SocketAddr = addr
311 .parse()
312 .map_err(|e| ClientError::ProtocolError(format!("Invalid address: {}", e)))?;
313 Self::connect_tls(ClientConfig::new(socket_addr), tls_config).await
314 }
315
    /// Atomically increments the batch counter and returns the new value, so
    /// ids start at 1.
    fn next_batch_id(&self) -> u64 {
        self.batch_id.fetch_add(1, Ordering::SeqCst) + 1
    }
319
    /// Sends an ingest frame to topic `0` without waiting for an ack.
    ///
    /// Shorthand for [`Self::send_ingest_to_topic`] with `topic_id = 0` and no
    /// auth config; returns the batch id assigned to the frame.
    pub async fn send_ingest(&mut self, payload: Bytes, record_count: u32) -> Result<u64> {
        self.send_ingest_to_topic(0, payload, record_count, None)
            .await
    }
324
325 pub async fn send_ingest_to_topic(
326 &mut self,
327 topic_id: u32,
328 payload: Bytes,
329 record_count: u32,
330 _auth_config: Option<&AuthConfig>,
331 ) -> Result<u64> {
332 let batch_id = self.next_batch_id();
333 let timestamp_ns = std::time::SystemTime::now()
334 .duration_since(std::time::UNIX_EPOCH)
335 .map(|d| d.as_nanos() as u64)
336 .unwrap_or(0);
337
338 let frame =
339 Frame::new_ingest_with_topic(batch_id, timestamp_ns, record_count, payload, topic_id);
340 let frame_bytes = encode_frame(&frame);
341
342 trace!(
343 batch_id,
344 topic_id,
345 payload_len = frame.payload_length(),
346 "Sending ingest frame"
347 );
348
349 tokio::time::timeout(
350 self.config.write_timeout,
351 self.stream.write_all(&frame_bytes),
352 )
353 .await
354 .map_err(|_| ClientError::Timeout)??;
355
356 Ok(batch_id)
357 }
358
    /// Sends an ingest frame to topic `0` and waits for the matching ack.
    ///
    /// Shorthand for [`Self::send_ingest_to_topic_sync`] with `topic_id = 0`
    /// and no auth config.
    pub async fn send_ingest_sync(&mut self, payload: Bytes, record_count: u32) -> Result<u64> {
        self.send_ingest_to_topic_sync(0, payload, record_count, None)
            .await
    }
363
364 pub async fn send_ingest_to_topic_sync(
365 &mut self,
366 topic_id: u32,
367 payload: Bytes,
368 record_count: u32,
369 auth_config: Option<&AuthConfig>,
370 ) -> Result<u64> {
371 let batch_id = self
372 .send_ingest_to_topic(topic_id, payload, record_count, auth_config)
373 .await?;
374 self.wait_for_ack(batch_id).await
375 }
376
377 async fn wait_for_ack(&mut self, expected_batch_id: u64) -> Result<u64> {
378 let frame = self.recv_frame().await?;
379
380 match frame.frame_type {
381 FrameType::Ack => {
382 let acked_id = frame.batch_id();
383 if acked_id != expected_batch_id {
384 return Err(ClientError::InvalidResponse(format!(
385 "Ack batch_id mismatch: sent {}, received {}",
386 expected_batch_id, acked_id
387 )));
388 }
389 trace!(batch_id = acked_id, "Received ack");
390 Ok(acked_id)
391 },
392 FrameType::Control(ControlCommand::ErrorResponse) => {
393 let error_msg = frame
394 .payload
395 .map(|p| String::from_utf8_lossy(&p).to_string())
396 .unwrap_or_else(|| "Unknown error".to_string());
397 Err(ClientError::ServerError(error_msg))
398 },
399 FrameType::Backpressure => {
400 warn!("Server signaled backpressure");
401 Err(ClientError::ServerBackpressure)
402 },
403 other => Err(ClientError::InvalidResponse(format!(
404 "Expected Ack, got {:?}",
405 other
406 ))),
407 }
408 }
409
410 pub async fn recv_ack(&mut self) -> Result<u64> {
411 let frame = self.recv_frame().await?;
412
413 match frame.frame_type {
414 FrameType::Ack => {
415 trace!(batch_id = frame.batch_id(), "Received ack");
416 Ok(frame.batch_id())
417 },
418 FrameType::Backpressure => {
419 warn!("Server signaled backpressure");
420 Err(ClientError::ServerBackpressure)
421 },
422 other => Err(ClientError::InvalidResponse(format!(
423 "Expected Ack, got {:?}",
424 other
425 ))),
426 }
427 }
428
429 pub async fn send_keepalive(&mut self) -> Result<()> {
430 let frame = Frame::new_keepalive();
431 let frame_bytes = encode_frame(&frame);
432
433 trace!("Sending keepalive");
434
435 tokio::time::timeout(
436 self.config.write_timeout,
437 self.stream.write_all(&frame_bytes),
438 )
439 .await
440 .map_err(|_| ClientError::Timeout)??;
441
442 Ok(())
443 }
444
445 pub async fn recv_keepalive(&mut self) -> Result<()> {
446 let frame = self.recv_frame().await?;
447
448 match frame.frame_type {
449 FrameType::Keepalive => {
450 trace!("Received keepalive response");
451 Ok(())
452 },
453 other => Err(ClientError::InvalidResponse(format!(
454 "Expected Keepalive, got {:?}",
455 other
456 ))),
457 }
458 }
459
460 pub async fn ping(&mut self) -> Result<Duration> {
461 let start = std::time::Instant::now();
462 self.send_keepalive().await?;
463 self.recv_keepalive().await?;
464 Ok(start.elapsed())
465 }
466
467 pub async fn create_topic(&mut self, name: &str) -> Result<TopicInfo> {
468 let frame = Frame::new_create_topic(name);
469 let frame_bytes = encode_frame(&frame);
470
471 trace!(topic_name = %name, "Creating topic");
472
473 tokio::time::timeout(
474 self.config.write_timeout,
475 self.stream.write_all(&frame_bytes),
476 )
477 .await
478 .map_err(|_| ClientError::Timeout)??;
479
480 let response = self.recv_frame().await?;
481 self.parse_topic_response(response)
482 }
483
484 pub async fn list_topics(&mut self) -> Result<Vec<TopicInfo>> {
485 let frame = Frame::new_list_topics();
486 let frame_bytes = encode_frame(&frame);
487
488 trace!("Listing topics");
489
490 tokio::time::timeout(
491 self.config.write_timeout,
492 self.stream.write_all(&frame_bytes),
493 )
494 .await
495 .map_err(|_| ClientError::Timeout)??;
496
497 let response = self.recv_frame().await?;
498 self.parse_topic_list_response(response)
499 }
500
501 pub async fn get_topic(&mut self, topic_id: u32) -> Result<TopicInfo> {
502 let frame = Frame::new_get_topic(topic_id);
503 let frame_bytes = encode_frame(&frame);
504
505 trace!(topic_id, "Getting topic");
506
507 tokio::time::timeout(
508 self.config.write_timeout,
509 self.stream.write_all(&frame_bytes),
510 )
511 .await
512 .map_err(|_| ClientError::Timeout)??;
513
514 let response = self.recv_frame().await?;
515 self.parse_topic_response(response)
516 }
517
518 pub async fn delete_topic(&mut self, topic_id: u32) -> Result<()> {
519 let frame = Frame::new_delete_topic(topic_id);
520 let frame_bytes = encode_frame(&frame);
521
522 trace!(topic_id, "Deleting topic");
523
524 tokio::time::timeout(
525 self.config.write_timeout,
526 self.stream.write_all(&frame_bytes),
527 )
528 .await
529 .map_err(|_| ClientError::Timeout)??;
530
531 let response = self.recv_frame().await?;
532 self.parse_delete_response(response)
533 }
534
535 pub async fn set_retention(
542 &mut self,
543 topic_id: u32,
544 max_age_secs: u64,
545 max_bytes: u64,
546 ) -> Result<()> {
547 let frame = Frame::new_set_retention(topic_id, max_age_secs, max_bytes);
548 let frame_bytes = encode_frame(&frame);
549
550 trace!(
551 topic_id,
552 max_age_secs, max_bytes, "Setting retention policy"
553 );
554
555 tokio::time::timeout(
556 self.config.write_timeout,
557 self.stream.write_all(&frame_bytes),
558 )
559 .await
560 .map_err(|_| ClientError::Timeout)??;
561
562 let response = self.recv_frame().await?;
563 self.parse_retention_response(response)
564 }
565
566 pub async fn create_topic_with_retention(
573 &mut self,
574 name: &str,
575 max_age_secs: u64,
576 max_bytes: u64,
577 ) -> Result<TopicInfo> {
578 let frame = Frame::new_create_topic_with_retention(name, max_age_secs, max_bytes);
579 let frame_bytes = encode_frame(&frame);
580
581 trace!(
582 name,
583 max_age_secs, max_bytes, "Creating topic with retention"
584 );
585
586 tokio::time::timeout(
587 self.config.write_timeout,
588 self.stream.write_all(&frame_bytes),
589 )
590 .await
591 .map_err(|_| ClientError::Timeout)??;
592
593 let response = self.recv_frame().await?;
594 self.parse_topic_response(response)
595 }
596
597 pub async fn get_cluster_status(&mut self) -> Result<ClusterStatus> {
599 let frame = Frame::new_get_cluster_status();
600 let frame_bytes = encode_frame(&frame);
601
602 tokio::time::timeout(
603 self.config.write_timeout,
604 self.stream.write_all(&frame_bytes),
605 )
606 .await
607 .map_err(|_| ClientError::Timeout)??;
608
609 let response = self.recv_frame().await?;
610 self.parse_cluster_status_response(response)
611 }
612
613 fn parse_cluster_status_response(&self, frame: Frame) -> Result<ClusterStatus> {
614 match frame.frame_type {
615 FrameType::Control(ControlCommand::ClusterStatusResponse) => {
616 let payload = frame.payload.ok_or_else(|| {
617 ClientError::InvalidResponse("Empty cluster status response".to_string())
618 })?;
619 let json: serde_json::Value = serde_json::from_slice(&payload)
620 .map_err(|e| ClientError::ProtocolError(format!("Invalid JSON: {}", e)))?;
621
622 let peer_states: std::collections::HashMap<u16, String> = json["peer_states"]
623 .as_object()
624 .map(|obj| {
625 obj.iter()
626 .filter_map(|(k, v)| {
627 k.parse::<u16>()
628 .ok()
629 .map(|id| (id, v.as_str().unwrap_or("unknown").to_string()))
630 })
631 .collect()
632 })
633 .unwrap_or_default();
634
635 Ok(ClusterStatus {
636 node_id: json["node_id"].as_u64().unwrap_or(0) as u16,
637 is_leader: json["is_leader"].as_bool().unwrap_or(false),
638 leader_id: json["leader_id"].as_u64().map(|id| id as u16),
639 current_term: json["current_term"].as_u64().unwrap_or(0),
640 node_count: json["node_count"].as_u64().unwrap_or(1) as usize,
641 healthy_nodes: json["healthy_nodes"].as_u64().unwrap_or(1) as usize,
642 quorum_available: json["quorum_available"].as_bool().unwrap_or(true),
643 peer_states,
644 })
645 },
646 FrameType::Control(ControlCommand::ErrorResponse) => {
647 let error_msg = frame
648 .payload
649 .map(|p| String::from_utf8_lossy(&p).to_string())
650 .unwrap_or_else(|| "Unknown error".to_string());
651 Err(ClientError::ServerError(error_msg))
652 },
653 other => Err(ClientError::InvalidResponse(format!(
654 "Expected ClusterStatusResponse, got {:?}",
655 other
656 ))),
657 }
658 }
659
660 pub async fn fetch(
663 &mut self,
664 topic_id: u32,
665 start_offset: u64,
666 max_bytes: u32,
667 ) -> Result<FetchResult> {
668 let frame = Frame::new_fetch(topic_id, start_offset, max_bytes);
669 let frame_bytes = encode_frame(&frame);
670
671 trace!(topic_id, start_offset, max_bytes, "Fetching data");
672
673 tokio::time::timeout(
674 self.config.write_timeout,
675 self.stream.write_all(&frame_bytes),
676 )
677 .await
678 .map_err(|_| ClientError::Timeout)??;
679
680 let response = self.recv_frame().await?;
681 self.parse_fetch_response(response)
682 }
683
684 pub async fn subscribe(
687 &mut self,
688 topic_id: u32,
689 start_offset: u64,
690 max_batch_bytes: u32,
691 consumer_id: u64,
692 ) -> Result<SubscribeResult> {
693 let frame = Frame::new_subscribe(topic_id, start_offset, max_batch_bytes, consumer_id);
694 let frame_bytes = encode_frame(&frame);
695
696 trace!(topic_id, start_offset, consumer_id, "Subscribing to topic");
697
698 tokio::time::timeout(
699 self.config.write_timeout,
700 self.stream.write_all(&frame_bytes),
701 )
702 .await
703 .map_err(|_| ClientError::Timeout)??;
704
705 let response = self.recv_frame().await?;
706 self.parse_subscribe_response(response)
707 }
708
709 pub async fn unsubscribe(&mut self, topic_id: u32, consumer_id: u64) -> Result<()> {
711 let frame = Frame::new_unsubscribe(topic_id, consumer_id);
712 let frame_bytes = encode_frame(&frame);
713
714 trace!(topic_id, consumer_id, "Unsubscribing from topic");
715
716 tokio::time::timeout(
717 self.config.write_timeout,
718 self.stream.write_all(&frame_bytes),
719 )
720 .await
721 .map_err(|_| ClientError::Timeout)??;
722
723 let response = self.recv_frame().await?;
725 match response.frame_type {
726 FrameType::Ack => Ok(()),
727 FrameType::Control(ControlCommand::ErrorResponse) => {
728 let error_msg = response
729 .payload
730 .map(|p| String::from_utf8_lossy(&p).to_string())
731 .unwrap_or_else(|| "Unknown error".to_string());
732 Err(ClientError::ServerError(error_msg))
733 },
734 other => Err(ClientError::InvalidResponse(format!(
735 "Expected Ack, got {:?}",
736 other
737 ))),
738 }
739 }
740
741 pub async fn commit_offset(
743 &mut self,
744 topic_id: u32,
745 consumer_id: u64,
746 offset: u64,
747 ) -> Result<CommitResult> {
748 let frame = Frame::new_commit_offset(topic_id, consumer_id, offset);
749 let frame_bytes = encode_frame(&frame);
750
751 trace!(topic_id, consumer_id, offset, "Committing offset");
752
753 tokio::time::timeout(
754 self.config.write_timeout,
755 self.stream.write_all(&frame_bytes),
756 )
757 .await
758 .map_err(|_| ClientError::Timeout)??;
759
760 let response = self.recv_frame().await?;
761 self.parse_commit_response(response)
762 }
763
764 fn parse_subscribe_response(&self, frame: Frame) -> Result<SubscribeResult> {
765 match frame.frame_type {
766 FrameType::Control(ControlCommand::SubscribeAck) => {
767 let payload = frame.payload.ok_or_else(|| {
768 ClientError::InvalidResponse("Empty subscribe response".to_string())
769 })?;
770
771 if payload.len() < 16 {
772 return Err(ClientError::ProtocolError(
773 "Subscribe response too small".to_string(),
774 ));
775 }
776
777 let consumer_id = u64::from_le_bytes([
778 payload[0], payload[1], payload[2], payload[3], payload[4], payload[5],
779 payload[6], payload[7],
780 ]);
781 let start_offset = u64::from_le_bytes([
782 payload[8],
783 payload[9],
784 payload[10],
785 payload[11],
786 payload[12],
787 payload[13],
788 payload[14],
789 payload[15],
790 ]);
791
792 Ok(SubscribeResult {
793 consumer_id,
794 start_offset,
795 })
796 },
797 FrameType::Control(ControlCommand::ErrorResponse) => {
798 let error_msg = frame
799 .payload
800 .map(|p| String::from_utf8_lossy(&p).to_string())
801 .unwrap_or_else(|| "Unknown error".to_string());
802 Err(ClientError::ServerError(error_msg))
803 },
804 other => Err(ClientError::InvalidResponse(format!(
805 "Expected SubscribeAck, got {:?}",
806 other
807 ))),
808 }
809 }
810
811 fn parse_commit_response(&self, frame: Frame) -> Result<CommitResult> {
812 match frame.frame_type {
813 FrameType::Control(ControlCommand::CommitAck) => {
814 let payload = frame.payload.ok_or_else(|| {
815 ClientError::InvalidResponse("Empty commit response".to_string())
816 })?;
817
818 if payload.len() < 16 {
819 return Err(ClientError::ProtocolError(
820 "Commit response too small".to_string(),
821 ));
822 }
823
824 let consumer_id = u64::from_le_bytes([
825 payload[0], payload[1], payload[2], payload[3], payload[4], payload[5],
826 payload[6], payload[7],
827 ]);
828 let committed_offset = u64::from_le_bytes([
829 payload[8],
830 payload[9],
831 payload[10],
832 payload[11],
833 payload[12],
834 payload[13],
835 payload[14],
836 payload[15],
837 ]);
838
839 Ok(CommitResult {
840 consumer_id,
841 committed_offset,
842 })
843 },
844 FrameType::Control(ControlCommand::ErrorResponse) => {
845 let error_msg = frame
846 .payload
847 .map(|p| String::from_utf8_lossy(&p).to_string())
848 .unwrap_or_else(|| "Unknown error".to_string());
849 Err(ClientError::ServerError(error_msg))
850 },
851 other => Err(ClientError::InvalidResponse(format!(
852 "Expected CommitAck, got {:?}",
853 other
854 ))),
855 }
856 }
857
858 fn parse_fetch_response(&self, frame: Frame) -> Result<FetchResult> {
859 match frame.frame_type {
860 FrameType::Control(ControlCommand::FetchResponse) => {
861 let payload = frame.payload.ok_or_else(|| {
862 ClientError::InvalidResponse("Empty fetch response".to_string())
863 })?;
864
865 if payload.len() < 16 {
866 return Err(ClientError::ProtocolError(
867 "Fetch response too small".to_string(),
868 ));
869 }
870
871 let next_offset = u64::from_le_bytes([
872 payload[0], payload[1], payload[2], payload[3], payload[4], payload[5],
873 payload[6], payload[7],
874 ]);
875 let bytes_returned =
876 u32::from_le_bytes([payload[8], payload[9], payload[10], payload[11]]);
877 let record_count =
878 u32::from_le_bytes([payload[12], payload[13], payload[14], payload[15]]);
879 let data = payload.slice(16..);
880
881 Ok(FetchResult {
882 data,
883 next_offset,
884 bytes_returned,
885 record_count,
886 })
887 },
888 FrameType::Control(ControlCommand::ErrorResponse) => {
889 let error_msg = frame
890 .payload
891 .map(|p| String::from_utf8_lossy(&p).to_string())
892 .unwrap_or_else(|| "Unknown error".to_string());
893 Err(ClientError::ServerError(error_msg))
894 },
895 other => Err(ClientError::InvalidResponse(format!(
896 "Expected FetchResponse, got {:?}",
897 other
898 ))),
899 }
900 }
901
    /// Interprets the reply to a delete-topic request; delegates to
    /// `expect_success_response`, so a `TopicResponse` frame means success.
    fn parse_delete_response(&self, frame: Frame) -> Result<()> {
        expect_success_response(frame)
    }
905
    /// Interprets the reply to a set-retention request; delegates to
    /// `expect_success_response`, so a `TopicResponse` frame means success.
    fn parse_retention_response(&self, frame: Frame) -> Result<()> {
        expect_success_response(frame)
    }
909
910 fn parse_topic_response(&self, frame: Frame) -> Result<TopicInfo> {
911 match frame.frame_type {
912 FrameType::Control(ControlCommand::TopicResponse) => {
913 let payload = frame.payload.ok_or_else(|| {
914 ClientError::InvalidResponse("Empty topic response".to_string())
915 })?;
916 let json: serde_json::Value = serde_json::from_slice(&payload)
917 .map_err(|e| ClientError::ProtocolError(format!("Invalid JSON: {}", e)))?;
918
919 let retention = if json.get("retention").is_some() {
920 Some(RetentionInfo {
921 max_age_secs: json["retention"]["max_age_secs"].as_u64().unwrap_or(0),
922 max_bytes: json["retention"]["max_bytes"].as_u64().unwrap_or(0),
923 })
924 } else {
925 None
926 };
927
928 Ok(TopicInfo {
929 id: json["id"].as_u64().unwrap_or(0) as u32,
930 name: json["name"].as_str().unwrap_or("").to_string(),
931 created_at: json["created_at"].as_u64().unwrap_or(0),
932 retention,
933 })
934 },
935 FrameType::Control(ControlCommand::ErrorResponse) => {
936 let error_msg = frame
937 .payload
938 .map(|p| String::from_utf8_lossy(&p).to_string())
939 .unwrap_or_else(|| "Unknown error".to_string());
940 Err(ClientError::ServerError(error_msg))
941 },
942 other => Err(ClientError::InvalidResponse(format!(
943 "Expected TopicResponse, got {:?}",
944 other
945 ))),
946 }
947 }
948
949 fn parse_topic_list_response(&self, frame: Frame) -> Result<Vec<TopicInfo>> {
950 match frame.frame_type {
951 FrameType::Control(ControlCommand::TopicResponse) => {
952 let payload = frame.payload.ok_or_else(|| {
953 ClientError::InvalidResponse("Empty topic list response".to_string())
954 })?;
955 let json: serde_json::Value = serde_json::from_slice(&payload)
956 .map_err(|e| ClientError::ProtocolError(format!("Invalid JSON: {}", e)))?;
957
958 let topics = json["topics"]
959 .as_array()
960 .map(|arr| {
961 arr.iter()
962 .map(|t| {
963 let retention = if t.get("retention").is_some() {
964 Some(RetentionInfo {
965 max_age_secs: t["retention"]["max_age_secs"]
966 .as_u64()
967 .unwrap_or(0),
968 max_bytes: t["retention"]["max_bytes"]
969 .as_u64()
970 .unwrap_or(0),
971 })
972 } else {
973 None
974 };
975 TopicInfo {
976 id: t["id"].as_u64().unwrap_or(0) as u32,
977 name: t["name"].as_str().unwrap_or("").to_string(),
978 created_at: t["created_at"].as_u64().unwrap_or(0),
979 retention,
980 }
981 })
982 .collect()
983 })
984 .unwrap_or_default();
985
986 Ok(topics)
987 },
988 FrameType::Control(ControlCommand::ErrorResponse) => {
989 let error_msg = frame
990 .payload
991 .map(|p| String::from_utf8_lossy(&p).to_string())
992 .unwrap_or_else(|| "Unknown error".to_string());
993 Err(ClientError::ServerError(error_msg))
994 },
995 other => Err(ClientError::InvalidResponse(format!(
996 "Expected TopicResponse, got {:?}",
997 other
998 ))),
999 }
1000 }
1001
1002 async fn recv_frame(&mut self) -> Result<Frame> {
1003 loop {
1004 if self.read_offset >= LWP_HEADER_SIZE {
1005 if let Some((frame, consumed)) = parse_frame(&self.read_buffer[..self.read_offset])?
1006 {
1007 self.read_buffer.copy_within(consumed..self.read_offset, 0);
1008 self.read_offset -= consumed;
1009 return Ok(frame);
1010 }
1011 }
1012
1013 let n = tokio::time::timeout(
1014 self.config.read_timeout,
1015 self.stream.read(&mut self.read_buffer[self.read_offset..]),
1016 )
1017 .await
1018 .map_err(|_| ClientError::Timeout)??;
1019
1020 if n == 0 {
1021 return Err(ClientError::ConnectionClosed);
1022 }
1023
1024 self.read_offset += n;
1025 }
1026 }
1027
    /// Returns the configuration this client was created with.
    pub fn config(&self) -> &ClientConfig {
        &self.config
    }
1031
1032 pub async fn close(mut self) -> Result<()> {
1033 self.stream.shutdown().await?;
1034 Ok(())
1035 }
1036}
1037
1038impl std::fmt::Debug for LanceClient {
1039 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1040 f.debug_struct("LanceClient")
1041 .field("addr", &self.config.addr)
1042 .field("batch_id", &self.batch_id.load(Ordering::SeqCst))
1043 .finish()
1044 }
1045}