1use std::time::Duration;
19
20#[cfg(feature = "watch")]
21use std::sync::Arc;
22
23use bytes::Bytes;
24use d_engine_core::MaybeCloneOneshot;
25use d_engine_core::RaftEvent;
26use d_engine_core::RaftOneshot;
27use d_engine_core::ScanResult;
28use d_engine_core::client::{
29 ClientApi, ClientApiError, ClientApiResult, ClientReadRequest, ClientResponsePayload,
30 ClientWriteRequest, ErrorCode, LeaderHint, ReadResults, WriteOperation,
31};
32use d_engine_core::config::ReadConsistencyPolicy;
33use tokio::sync::mpsc;
34
35#[cfg(feature = "watch")]
36use d_engine_core::watch::WatchRegistry;
37
38fn channel_closed_error() -> ClientApiError {
43 ClientApiError::Network {
44 code: ErrorCode::ConnectionTimeout,
45 message: "Channel closed, node may be shutting down".to_string(),
46 retry_after_ms: None,
47 leader_hint: None,
48 }
49}
50
51fn timeout_error(duration: Duration) -> ClientApiError {
52 ClientApiError::Network {
53 code: ErrorCode::ConnectionTimeout,
54 message: format!("Operation timed out after {duration:?}"),
55 retry_after_ms: Some(1000),
56 leader_hint: None,
57 }
58}
59
60fn not_leader_error(
61 leader_id: Option<String>,
62 leader_address: Option<String>,
63 retry_after_ms: Option<u64>,
64) -> ClientApiError {
65 let message = match (&leader_address, &leader_id) {
66 (Some(addr), _) => format!("Not leader, try leader at: {addr}"),
67 (None, Some(id)) => format!("Not leader, leader_id: {id}"),
68 (None, None) => "Not leader".to_string(),
69 };
70
71 let leader_hint = match (&leader_id, &leader_address) {
72 (Some(id_str), Some(addr)) => id_str.parse::<u32>().ok().map(|id| LeaderHint {
73 leader_id: id,
74 address: addr.clone(),
75 }),
76 _ => None,
77 };
78
79 ClientApiError::Network {
80 code: ErrorCode::NotLeader,
81 message,
82 retry_after_ms: retry_after_ms.or(Some(100)),
83 leader_hint,
84 }
85}
86
87fn server_error(msg: String) -> ClientApiError {
88 ClientApiError::Business {
89 code: ErrorCode::Uncategorized,
90 message: msg,
91 required_action: None,
92 }
93}
94
95fn extract_read_payload(result: Option<ClientResponsePayload>) -> ClientApiResult<ReadResults> {
102 match result {
103 Some(ClientResponsePayload::Read(r)) => Ok(r),
104 Some(ClientResponsePayload::Write(_)) => Err(ClientApiError::Protocol {
105 code: ErrorCode::InvalidResponse,
106 message: "expected ReadData payload, got WriteResult".to_string(),
107 supported_versions: None,
108 }),
109 None => Err(ClientApiError::Protocol {
110 code: ErrorCode::InvalidResponse,
111 message: "expected ReadData payload, got None".to_string(),
112 supported_versions: None,
113 }),
114 }
115}
116
117#[derive(Clone)]
121pub struct EmbeddedClient {
122 event_tx: mpsc::Sender<RaftEvent>,
123 cmd_tx: mpsc::Sender<d_engine_core::ClientCmd>,
124 client_id: u32,
125 timeout: Duration,
126 #[cfg(feature = "watch")]
127 watch_registry: Option<Arc<WatchRegistry>>,
128}
129
130impl EmbeddedClient {
131 pub(crate) fn new_internal(
133 event_tx: mpsc::Sender<RaftEvent>,
134 cmd_tx: mpsc::Sender<d_engine_core::ClientCmd>,
135 client_id: u32,
136 timeout: Duration,
137 ) -> Self {
138 Self {
139 event_tx,
140 cmd_tx,
141 client_id,
142 timeout,
143 #[cfg(feature = "watch")]
144 watch_registry: None,
145 }
146 }
147
148 #[cfg(feature = "watch")]
150 pub(crate) fn with_watch_registry(
151 mut self,
152 registry: Arc<WatchRegistry>,
153 ) -> Self {
154 self.watch_registry = Some(registry);
155 self
156 }
157
158 fn map_error_response(
159 error: ErrorCode,
160 leader_hint: Option<LeaderHint>,
161 retry_after_ms: Option<u64>,
162 ) -> ClientApiError {
163 match error {
164 ErrorCode::NotLeader => {
165 let (leader_id, leader_address) = if let Some(hint) = leader_hint {
166 (Some(hint.leader_id.to_string()), Some(hint.address))
167 } else {
168 (None, None)
169 };
170 not_leader_error(leader_id, leader_address, retry_after_ms)
171 }
172 _ => server_error(format!("Error code: {error:?}")),
173 }
174 }
175
176 pub async fn put(
182 &self,
183 key: impl AsRef<[u8]>,
184 value: impl AsRef<[u8]>,
185 ) -> ClientApiResult<()> {
186 let request = ClientWriteRequest {
187 client_id: self.client_id,
188 command: Some(WriteOperation::Insert {
189 key: Bytes::copy_from_slice(key.as_ref()),
190 value: Bytes::copy_from_slice(value.as_ref()),
191 ttl_secs: None,
192 }),
193 };
194
195 let (resp_tx, resp_rx) = MaybeCloneOneshot::new();
196
197 self.cmd_tx
198 .send(d_engine_core::ClientCmd::Propose(request, resp_tx))
199 .await
200 .map_err(|_| channel_closed_error())?;
201
202 let result = tokio::time::timeout(self.timeout, resp_rx)
203 .await
204 .map_err(|_| timeout_error(self.timeout))?
205 .map_err(|_| channel_closed_error())?;
206
207 let response =
208 result.map_err(|status| server_error(format!("RPC error: {}", status.message())))?;
209
210 if response.error != ErrorCode::Success {
211 return Err(Self::map_error_response(
212 response.error,
213 response.leader_hint,
214 response.retry_after_ms,
215 ));
216 }
217
218 Ok(())
219 }
220
221 pub async fn get_linearizable(
239 &self,
240 key: impl AsRef<[u8]>,
241 ) -> ClientApiResult<Option<Bytes>> {
242 self.get_with_consistency(key, ReadConsistencyPolicy::LinearizableRead).await
243 }
244
245 pub async fn get_eventual(
265 &self,
266 key: impl AsRef<[u8]>,
267 ) -> ClientApiResult<Option<Bytes>> {
268 self.get_with_consistency(key, ReadConsistencyPolicy::EventualConsistency).await
269 }
270
271 pub async fn get_with_consistency(
290 &self,
291 key: impl AsRef<[u8]>,
292 consistency: ReadConsistencyPolicy,
293 ) -> ClientApiResult<Option<Bytes>> {
294 let request = ClientReadRequest {
295 client_id: self.client_id,
296 keys: vec![Bytes::copy_from_slice(key.as_ref())],
297 consistency_policy: Some(consistency),
298 };
299
300 let (resp_tx, resp_rx) = MaybeCloneOneshot::new();
301
302 self.cmd_tx
303 .send(d_engine_core::ClientCmd::Read(request, resp_tx))
304 .await
305 .map_err(|_| channel_closed_error())?;
306
307 let result = tokio::time::timeout(self.timeout, resp_rx)
308 .await
309 .map_err(|_| timeout_error(self.timeout))?
310 .map_err(|_| channel_closed_error())?;
311
312 let response =
313 result.map_err(|status| server_error(format!("RPC error: {}", status.message())))?;
314
315 if response.error != ErrorCode::Success {
316 return Err(Self::map_error_response(
317 response.error,
318 response.leader_hint,
319 response.retry_after_ms,
320 ));
321 }
322
323 let read_results = extract_read_payload(response.result)?;
324 Ok(read_results.entries.first().map(|e| e.value.clone()))
325 }
326
327 pub async fn get_multi_linearizable(
337 &self,
338 keys: &[Bytes],
339 ) -> ClientApiResult<Vec<Option<Bytes>>> {
340 self.get_multi_with_consistency(keys, ReadConsistencyPolicy::LinearizableRead)
341 .await
342 }
343
344 pub async fn get_multi_eventual(
354 &self,
355 keys: &[Bytes],
356 ) -> ClientApiResult<Vec<Option<Bytes>>> {
357 self.get_multi_with_consistency(keys, ReadConsistencyPolicy::EventualConsistency)
358 .await
359 }
360
361 pub async fn get_multi_with_consistency(
363 &self,
364 keys: &[Bytes],
365 consistency: ReadConsistencyPolicy,
366 ) -> ClientApiResult<Vec<Option<Bytes>>> {
367 let request = ClientReadRequest {
368 client_id: self.client_id,
369 keys: keys.to_vec(),
370 consistency_policy: Some(consistency),
371 };
372
373 let (resp_tx, resp_rx) = MaybeCloneOneshot::new();
374
375 self.cmd_tx
376 .send(d_engine_core::ClientCmd::Read(request, resp_tx))
377 .await
378 .map_err(|_| channel_closed_error())?;
379
380 let result = tokio::time::timeout(self.timeout, resp_rx)
381 .await
382 .map_err(|_| timeout_error(self.timeout))?
383 .map_err(|_| channel_closed_error())?;
384
385 let response =
386 result.map_err(|status| server_error(format!("RPC error: {}", status.message())))?;
387
388 if response.error != ErrorCode::Success {
389 return Err(Self::map_error_response(
390 response.error,
391 response.leader_hint,
392 response.retry_after_ms,
393 ));
394 }
395
396 let read_results = extract_read_payload(response.result)?;
397 let results_by_key: std::collections::HashMap<_, _> =
401 read_results.entries.into_iter().map(|e| (e.key, e.value)).collect();
402 Ok(keys.iter().map(|k| results_by_key.get(k).cloned()).collect())
403 }
404
405 pub async fn delete(
411 &self,
412 key: impl AsRef<[u8]>,
413 ) -> ClientApiResult<()> {
414 let request = ClientWriteRequest {
415 client_id: self.client_id,
416 command: Some(WriteOperation::Delete {
417 key: Bytes::copy_from_slice(key.as_ref()),
418 }),
419 };
420
421 let (resp_tx, resp_rx) = MaybeCloneOneshot::new();
422
423 self.cmd_tx
424 .send(d_engine_core::ClientCmd::Propose(request, resp_tx))
425 .await
426 .map_err(|_| channel_closed_error())?;
427
428 let result = tokio::time::timeout(self.timeout, resp_rx)
429 .await
430 .map_err(|_| timeout_error(self.timeout))?
431 .map_err(|_| channel_closed_error())?;
432
433 let response =
434 result.map_err(|status| server_error(format!("RPC error: {}", status.message())))?;
435
436 if response.error != ErrorCode::Success {
437 return Err(Self::map_error_response(
438 response.error,
439 response.leader_hint,
440 response.retry_after_ms,
441 ));
442 }
443
444 Ok(())
445 }
446
447 pub fn client_id(&self) -> u32 {
449 self.client_id
450 }
451
452 pub fn timeout(&self) -> Duration {
454 self.timeout
455 }
456
457 #[cfg(feature = "watch")]
484 pub fn watch(
485 &self,
486 key: impl AsRef<[u8]>,
487 ) -> ClientApiResult<d_engine_core::watch::WatcherHandle> {
488 self.watch_with_options(key, false)
489 }
490
491 #[cfg(feature = "watch")]
543 pub fn watch_with_options(
544 &self,
545 key: impl AsRef<[u8]>,
546 prev_kv: bool,
547 ) -> ClientApiResult<d_engine_core::watch::WatcherHandle> {
548 let registry = self.watch_registry.as_ref().ok_or_else(|| ClientApiError::Business {
549 code: ErrorCode::Uncategorized,
550 message: "Watch feature disabled (WatchRegistry not initialized)".to_string(),
551 required_action: None,
552 })?;
553
554 let key_bytes = Bytes::copy_from_slice(key.as_ref());
555 registry.register(key_bytes, prev_kv).map_err(|e| ClientApiError::Business {
556 code: ErrorCode::Uncategorized,
557 message: e.to_string(),
558 required_action: None,
559 })
560 }
561
562 #[cfg(feature = "watch")]
567 pub fn watch_prefix(
568 &self,
569 prefix: impl AsRef<[u8]>,
570 ) -> ClientApiResult<d_engine_core::watch::WatcherHandle> {
571 self.watch_prefix_with_options(prefix, false)
572 }
573
574 #[cfg(feature = "watch")]
607 pub fn watch_prefix_with_options(
608 &self,
609 prefix: impl AsRef<[u8]>,
610 prev_kv: bool,
611 ) -> ClientApiResult<d_engine_core::watch::WatcherHandle> {
612 let registry = self.watch_registry.as_ref().ok_or_else(|| ClientApiError::Business {
613 code: ErrorCode::Uncategorized,
614 message: "Watch feature disabled (WatchRegistry not initialized)".to_string(),
615 required_action: None,
616 })?;
617
618 let prefix_bytes = Bytes::copy_from_slice(prefix.as_ref());
619 registry
620 .register_prefix(prefix_bytes, prev_kv)
621 .map_err(|e| ClientApiError::Business {
622 code: ErrorCode::Uncategorized,
623 message: e.to_string(),
624 required_action: None,
625 })
626 }
627
628 pub async fn scan_prefix(
634 &self,
635 prefix: impl AsRef<[u8]>,
636 ) -> ClientApiResult<ScanResult> {
637 let (resp_tx, resp_rx) = MaybeCloneOneshot::new();
638
639 self.cmd_tx
640 .send(d_engine_core::ClientCmd::Scan(
641 Bytes::copy_from_slice(prefix.as_ref()),
642 resp_tx,
643 ))
644 .await
645 .map_err(|_| channel_closed_error())?;
646
647 let result = tokio::time::timeout(self.timeout, resp_rx)
648 .await
649 .map_err(|_| timeout_error(self.timeout))?
650 .map_err(|_| channel_closed_error())?;
651
652 result.map_err(|status| server_error(format!("RPC error: {}", status.message())))
653 }
654
655 async fn get_cluster_membership(
657 &self
658 ) -> ClientApiResult<d_engine_proto::server::cluster::ClusterMembership> {
659 let request = d_engine_proto::server::cluster::MetadataRequest {};
660
661 let (resp_tx, resp_rx) = MaybeCloneOneshot::new();
662
663 self.event_tx
664 .send(RaftEvent::ClusterConf(request, resp_tx))
665 .await
666 .map_err(|_| channel_closed_error())?;
667
668 let result = tokio::time::timeout(self.timeout, resp_rx)
669 .await
670 .map_err(|_| timeout_error(self.timeout))?
671 .map_err(|_| channel_closed_error())?;
672
673 result.map_err(|status| server_error(format!("ClusterConf error: {}", status.message())))
674 }
675}
676
677impl std::fmt::Debug for EmbeddedClient {
678 fn fmt(
679 &self,
680 f: &mut std::fmt::Formatter<'_>,
681 ) -> std::fmt::Result {
682 f.debug_struct("EmbeddedClient")
683 .field("client_id", &self.client_id)
684 .field("timeout", &self.timeout)
685 .finish()
686 }
687}
688
689#[async_trait::async_trait]
691impl ClientApi for EmbeddedClient {
692 async fn put(
693 &self,
694 key: impl AsRef<[u8]> + Send,
695 value: impl AsRef<[u8]> + Send,
696 ) -> ClientApiResult<()> {
697 self.put(key, value).await
698 }
699
700 async fn put_with_ttl(
701 &self,
702 key: impl AsRef<[u8]> + Send,
703 value: impl AsRef<[u8]> + Send,
704 ttl_secs: u64,
705 ) -> ClientApiResult<()> {
706 let request = ClientWriteRequest {
707 client_id: self.client_id,
708 command: Some(WriteOperation::Insert {
709 key: Bytes::copy_from_slice(key.as_ref()),
710 value: Bytes::copy_from_slice(value.as_ref()),
711 ttl_secs: Some(ttl_secs),
712 }),
713 };
714
715 let (resp_tx, resp_rx) = MaybeCloneOneshot::new();
716
717 self.cmd_tx
718 .send(d_engine_core::ClientCmd::Propose(request, resp_tx))
719 .await
720 .map_err(|_| channel_closed_error())?;
721
722 let result = tokio::time::timeout(self.timeout, resp_rx)
723 .await
724 .map_err(|_| timeout_error(self.timeout))?
725 .map_err(|_| channel_closed_error())?;
726
727 let response =
728 result.map_err(|status| server_error(format!("RPC error: {}", status.message())))?;
729
730 if response.error != ErrorCode::Success {
731 return Err(Self::map_error_response(
732 response.error,
733 response.leader_hint,
734 response.retry_after_ms,
735 ));
736 }
737
738 Ok(())
739 }
740
741 async fn get(
742 &self,
743 key: impl AsRef<[u8]> + Send,
744 ) -> ClientApiResult<Option<Bytes>> {
745 self.get_linearizable(key).await
746 }
747
748 async fn get_multi(
749 &self,
750 keys: &[Bytes],
751 ) -> ClientApiResult<Vec<Option<Bytes>>> {
752 self.get_multi_linearizable(keys).await
753 }
754
755 async fn delete(
756 &self,
757 key: impl AsRef<[u8]> + Send,
758 ) -> ClientApiResult<()> {
759 self.delete(key).await
760 }
761
762 async fn compare_and_swap(
763 &self,
764 key: impl AsRef<[u8]> + Send,
765 expected_value: Option<impl AsRef<[u8]> + Send>,
766 new_value: impl AsRef<[u8]> + Send,
767 ) -> ClientApiResult<bool> {
768 let request = ClientWriteRequest {
769 client_id: self.client_id,
770 command: Some(WriteOperation::CompareAndSwap {
771 key: Bytes::copy_from_slice(key.as_ref()),
772 expected: expected_value.map(|v| Bytes::copy_from_slice(v.as_ref())),
773 new_value: Bytes::copy_from_slice(new_value.as_ref()),
774 }),
775 };
776
777 let (resp_tx, resp_rx) = MaybeCloneOneshot::new();
778
779 self.cmd_tx
780 .send(d_engine_core::ClientCmd::Propose(request, resp_tx))
781 .await
782 .map_err(|_| channel_closed_error())?;
783
784 let result = tokio::time::timeout(self.timeout, resp_rx)
785 .await
786 .map_err(|_| timeout_error(self.timeout))?
787 .map_err(|_| channel_closed_error())?;
788
789 let response =
790 result.map_err(|status| server_error(format!("RPC error: {}", status.message())))?;
791
792 if response.error != ErrorCode::Success {
793 return Err(Self::map_error_response(
794 response.error,
795 response.leader_hint,
796 response.retry_after_ms,
797 ));
798 }
799
800 match response.result {
801 Some(ClientResponsePayload::Write(result)) => Ok(result.succeeded),
802 _ => Err(server_error("Invalid CAS response".to_string())),
803 }
804 }
805
806 async fn list_members(
807 &self
808 ) -> ClientApiResult<Vec<d_engine_proto::server::cluster::NodeMeta>> {
809 let cluster_membership = self.get_cluster_membership().await?;
810 Ok(cluster_membership.nodes)
811 }
812
813 async fn get_leader_id(&self) -> ClientApiResult<Option<u32>> {
814 let cluster_membership = self.get_cluster_membership().await?;
815 Ok(cluster_membership.current_leader_id)
816 }
817
818 async fn get_multi_with_policy(
819 &self,
820 keys: &[Bytes],
821 consistency_policy: Option<d_engine_core::config::ReadConsistencyPolicy>,
822 ) -> ClientApiResult<Vec<Option<Bytes>>> {
823 self.get_multi_with_consistency(
824 keys,
825 consistency_policy
826 .unwrap_or(d_engine_core::config::ReadConsistencyPolicy::LinearizableRead),
827 )
828 .await
829 }
830
831 async fn get_linearizable(
832 &self,
833 key: impl AsRef<[u8]> + Send,
834 ) -> ClientApiResult<Option<Bytes>> {
835 self.get_linearizable(key).await
836 }
837
838 async fn get_lease(
839 &self,
840 key: impl AsRef<[u8]> + Send,
841 ) -> ClientApiResult<Option<Bytes>> {
842 self.get_with_consistency(key, ReadConsistencyPolicy::LeaseRead).await
843 }
844
845 async fn get_eventual(
846 &self,
847 key: impl AsRef<[u8]> + Send,
848 ) -> ClientApiResult<Option<Bytes>> {
849 self.get_eventual(key).await
850 }
851
852 async fn scan_prefix(
853 &self,
854 prefix: impl AsRef<[u8]> + Send,
855 ) -> ClientApiResult<ScanResult> {
856 self.scan_prefix(prefix).await
857 }
858}
859
#[cfg(test)]
mod error_helper_tests {
    use d_engine_core::client::KvEntry;

    use super::*;

    /// Returns the `retry_after_ms` of a `Network` error; panics on any other variant.
    fn network_retry_after(err: ClientApiError) -> Option<u64> {
        if let ClientApiError::Network { retry_after_ms, .. } = err {
            retry_after_ms
        } else {
            panic!("expected Network error")
        }
    }

    /// Wraps `entries` in a read payload, as a successful read response would carry.
    fn read_payload(entries: Vec<KvEntry>) -> Option<ClientResponsePayload> {
        Some(ClientResponsePayload::Read(ReadResults { entries }))
    }

    // A retry delay supplied by the server is passed through unchanged.
    #[test]
    fn test_not_leader_uses_server_retry_after_ms_when_provided() {
        let err = not_leader_error(
            Some("1".to_string()),
            Some("127.0.0.1:5001".to_string()),
            Some(500),
        );
        assert_eq!(network_retry_after(err), Some(500));
    }

    // Without a server-supplied delay the helper falls back to 100 ms.
    #[test]
    fn test_not_leader_falls_back_to_100ms_when_server_provides_none() {
        let err = not_leader_error(None, None, None);
        assert_eq!(network_retry_after(err), Some(100));
    }

    // An explicit zero is a real value and must not trigger the fallback.
    #[test]
    fn test_not_leader_zero_is_not_treated_as_none() {
        let err = not_leader_error(None, None, Some(0));
        assert_eq!(network_retry_after(err), Some(0));
    }

    // The response mapper forwards both the retry delay and the leader hint.
    #[test]
    fn test_map_error_response_not_leader_forwards_retry_after_ms() {
        let hint = LeaderHint {
            leader_id: 2,
            address: "10.0.0.2:5002".into(),
        };
        let err = EmbeddedClient::map_error_response(ErrorCode::NotLeader, Some(hint), Some(250));

        if let ClientApiError::Network {
            code,
            retry_after_ms,
            leader_hint,
            ..
        } = err
        {
            assert_eq!(code, ErrorCode::NotLeader);
            assert_eq!(retry_after_ms, Some(250));
            let h = leader_hint.unwrap();
            assert_eq!(h.leader_id, 2);
        } else {
            panic!("expected Network error");
        }
    }

    // The 100 ms fallback also applies when going through the response mapper.
    #[test]
    fn test_map_error_response_not_leader_falls_back_to_100ms_when_none() {
        let err = EmbeddedClient::map_error_response(ErrorCode::NotLeader, None, None);
        assert_eq!(network_retry_after(err), Some(100));
    }

    // A read payload is returned as-is.
    #[test]
    fn test_extract_read_payload_returns_read_results_on_success() {
        let payload = read_payload(vec![KvEntry {
            key: Bytes::from("k"),
            value: Bytes::from("v"),
        }]);

        let result = extract_read_payload(payload).unwrap();
        assert_eq!(result.entries.len(), 1);
        assert_eq!(result.entries[0].key, Bytes::from("k"));
        assert_eq!(result.entries[0].value, Bytes::from("v"));
    }

    // A write payload where a read payload is expected is a protocol error.
    #[test]
    fn test_extract_read_payload_rejects_write_result_payload() {
        let write_result = d_engine_core::client::WriteResult { succeeded: true };
        let payload = Some(ClientResponsePayload::Write(write_result));

        let err = extract_read_payload(payload).unwrap_err();
        assert_eq!(err.code(), ErrorCode::InvalidResponse);
        assert!(
            err.message().contains("WriteResult"),
            "error message should identify the unexpected variant; got: {}",
            err.message()
        );
    }

    // A missing payload is a protocol error as well.
    #[test]
    fn test_extract_read_payload_rejects_none_payload() {
        let err = extract_read_payload(None).unwrap_err();
        assert_eq!(err.code(), ErrorCode::InvalidResponse);
        assert!(
            err.message().contains("None"),
            "error message should identify missing payload; got: {}",
            err.message()
        );
    }

    // A read payload with zero entries is still a valid read payload.
    #[test]
    fn test_extract_read_payload_empty_entries_is_valid() {
        let result = extract_read_payload(read_payload(vec![])).unwrap();
        assert!(result.entries.is_empty());
    }

    // Entry count and order survive extraction.
    #[test]
    fn test_extract_read_payload_multiple_entries_are_preserved() {
        let payload = read_payload(vec![
            KvEntry {
                key: Bytes::from("k1"),
                value: Bytes::from("v1"),
            },
            KvEntry {
                key: Bytes::from("k2"),
                value: Bytes::from("v2"),
            },
        ]);

        let result = extract_read_payload(payload).unwrap();
        assert_eq!(result.entries.len(), 2);
        assert_eq!(result.entries[1].key, Bytes::from("k2"));
    }
}