1pub mod credential;
44pub mod error;
45pub mod pool;
46pub mod proto;
47
48pub use credential::Credential;
49pub use error::Error;
50pub use pool::{Pool, PoolConfig};
51
52use std::collections::HashMap;
53use std::net::SocketAddr;
54use std::time::Instant;
55
56use bytes::Bytes;
57use ringline::{ConnCtx, ParseResult};
58
59use crate::proto::{
60 CacheCommand, CacheResponse, CacheResponseResult, DecodedMessage, StatusCode, UnaryCommand,
61 decode_length_delimited_message_bytes,
62};
63
/// Number of consecutive frames that decode cleanly but match no outstanding
/// request which the receive loops tolerate.
///
/// Both `Client::recv` and the authentication handshake count such frames and,
/// once the count exceeds this value, close the connection and return a
/// protocol error.
pub const MAX_RECV_SKIPS: usize = 256;
72
/// Handle identifying one request issued through the `fire_*` methods.
///
/// It wraps the numeric message id that was placed on the outgoing command.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub struct RequestId(u64);

impl RequestId {
    /// Returns the wrapped numeric message id.
    pub fn value(&self) -> u64 {
        let Self(raw) = *self;
        raw
    }
}
85
/// The kind of cache command a [`CommandResult`] reports on.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
#[non_exhaustive]
pub enum CommandType {
    /// A command issued through `fire_get` / `get`.
    Get,
    /// A command issued through `fire_set` / `set`.
    Set,
    /// A command issued through `fire_delete` / `delete`.
    Delete,
}
94
/// A finished operation as returned by `Client::recv`.
///
/// Every variant carries the same bookkeeping fields: the request id, the key
/// the command was issued for, the outcome, the caller-supplied `user_data`
/// and the measured latency.
#[derive(Debug)]
#[non_exhaustive]
pub enum CompletedOp {
    /// Outcome of a get command.
    Get {
        /// Id that the originating `fire_get` call returned.
        id: RequestId,
        /// Key the command was issued for.
        key: Bytes,
        /// The value carried by the server's get response. A server error with
        /// a `NotFound` status is reported as `Ok(None)`; any other error or
        /// an unexpected response type is an `Err`.
        result: Result<Option<Bytes>, Error>,
        /// Value passed to `fire_get`, returned unchanged.
        user_data: u64,
        /// Latency in nanoseconds; `0` when the client has neither a result
        /// callback nor metrics enabled.
        latency_ns: u64,
    },
    /// Outcome of a set command.
    Set {
        /// Id that the originating `fire_set` call returned.
        id: RequestId,
        /// Key the command was issued for.
        key: Bytes,
        /// `Ok(())` when the server acknowledged the set.
        result: Result<(), Error>,
        /// Value passed to `fire_set`, returned unchanged.
        user_data: u64,
        /// Latency in nanoseconds; `0` when the client has neither a result
        /// callback nor metrics enabled.
        latency_ns: u64,
    },
    /// Outcome of a delete command.
    Delete {
        /// Id that the originating `fire_delete` call returned.
        id: RequestId,
        /// Key the command was issued for.
        key: Bytes,
        /// `Ok(())` when the server acknowledged the delete.
        result: Result<(), Error>,
        /// Value passed to `fire_delete`, returned unchanged.
        user_data: u64,
        /// Latency in nanoseconds; `0` when the client has neither a result
        /// callback nor metrics enabled.
        latency_ns: u64,
    },
}
124
125impl CompletedOp {
126 fn set_latency(self, latency_ns: u64) -> Self {
127 match self {
128 Self::Get {
129 id,
130 key,
131 result,
132 user_data,
133 ..
134 } => Self::Get {
135 id,
136 key,
137 result,
138 user_data,
139 latency_ns,
140 },
141 Self::Set {
142 id,
143 key,
144 result,
145 user_data,
146 ..
147 } => Self::Set {
148 id,
149 key,
150 result,
151 user_data,
152 latency_ns,
153 },
154 Self::Delete {
155 id,
156 key,
157 result,
158 user_data,
159 ..
160 } => Self::Delete {
161 id,
162 key,
163 result,
164 user_data,
165 latency_ns,
166 },
167 }
168 }
169}
170
/// Per-command measurement passed to the result callback and, when enabled,
/// folded into the client metrics.
#[derive(Debug, Clone)]
pub struct CommandResult {
    /// Which kind of command completed.
    pub command: CommandType,
    /// Latency in nanoseconds: the receive/send timestamp difference when a
    /// usable receive timestamp exists, otherwise the elapsed time since the
    /// command was sent.
    pub latency_ns: u64,
    /// `true` when the operation's result was `Ok`.
    pub success: bool,
    /// Receive timestamp minus send timestamp, present only when the receive
    /// timestamp is nonzero and later than the send timestamp.
    pub ttfb_ns: Option<u64>,
    /// Length in bytes of the encoded command frame that was sent.
    pub tx_bytes: u32,
    /// Byte count reported by the connection read that produced the response.
    pub rx_bytes: u32,
}
187
/// Boxed callback invoked with every [`CommandResult`] the client records.
type ResultCallback = Box<dyn Fn(&CommandResult)>;
190
/// Which command a pending operation was issued as; decides how the matching
/// response is interpreted in `dispatch_response`.
enum PendingOpKind {
    /// Issued by `fire_get`.
    Get,
    /// Issued by `fire_set`.
    Set,
    /// Issued by `fire_delete`.
    Delete,
}
199
/// Bookkeeping kept for a command that has been sent but not yet answered.
struct PendingOp {
    /// Command kind, used to pick the expected response shape.
    kind: PendingOpKind,
    /// Key the command was issued for; handed back in the completed op.
    key: Bytes,
    /// Send timestamp as produced by `send_timestamp`; `0` when the client is
    /// not instrumented.
    send_ts: u64,
    /// Monotonic start time; `None` when the client is not instrumented.
    start: Option<Instant>,
    /// Caller-supplied value returned unchanged in the completed op.
    user_data: u64,
    /// Length in bytes of the encoded command frame.
    tx_bytes: u32,
}
209
/// Aggregate counters maintained by a client built with metrics enabled.
#[cfg(feature = "metrics")]
pub struct ClientMetrics {
    /// Histogram of recorded command latencies (nanosecond values).
    pub latency: histogram::Histogram,
    /// Number of commands recorded.
    pub requests: u64,
    /// Number of recorded commands whose result was not a success.
    pub errors: u64,
}
223
#[cfg(feature = "metrics")]
impl ClientMetrics {
    /// Creates an empty histogram with both counters at zero.
    ///
    /// Panics if the histogram rejects the fixed `(7, 64)` configuration.
    fn new() -> Self {
        let latency = histogram::Histogram::new(7, 64).unwrap();
        Self {
            latency,
            requests: 0,
            errors: 0,
        }
    }

    /// Folds one completed command into the counters and the latency histogram.
    fn record(&mut self, result: &CommandResult) {
        self.requests += 1;
        // The outcome of the histogram update is deliberately discarded.
        let _ = self.latency.increment(result.latency_ns);

        // Adds 1 for a failed command, 0 otherwise.
        self.errors += u64::from(!result.success);
    }
}
243
/// Builder for a [`Client`] on top of an existing connection handle.
///
/// Obtained from `Client::builder`; `build` does not run the authentication
/// handshake that `Client::connect` performs.
pub struct ClientBuilder {
    /// Connection the client will use.
    conn: ConnCtx,
    /// Optional per-command result callback.
    on_result: Option<ResultCallback>,
    /// Namespace override; empty means "use the cache name of each call".
    namespace: Bytes,
    /// Maximum number of unanswered commands allowed at once.
    max_in_flight: usize,
    /// Whether send/receive timestamps are captured for latency measurement.
    #[cfg(feature = "timestamps")]
    use_kernel_ts: bool,
    /// Whether the built client keeps a [`ClientMetrics`].
    #[cfg(feature = "metrics")]
    with_metrics: bool,
}
257
258impl ClientBuilder {
259 pub(crate) fn new(conn: ConnCtx) -> Self {
260 Self {
261 conn,
262 on_result: None,
263 namespace: Bytes::new(),
264 max_in_flight: usize::MAX,
265 #[cfg(feature = "timestamps")]
266 use_kernel_ts: false,
267 #[cfg(feature = "metrics")]
268 with_metrics: false,
269 }
270 }
271
272 pub fn max_in_flight(mut self, n: usize) -> Self {
276 self.max_in_flight = n;
277 self
278 }
279
280 pub fn namespace(mut self, ns: impl AsRef<[u8]>) -> Self {
283 self.namespace = Bytes::copy_from_slice(ns.as_ref());
284 self
285 }
286
287 pub fn on_result<F: Fn(&CommandResult) + 'static>(mut self, f: F) -> Self {
289 self.on_result = Some(Box::new(f));
290 self
291 }
292
293 #[cfg(feature = "timestamps")]
295 pub fn kernel_timestamps(mut self, enabled: bool) -> Self {
296 self.use_kernel_ts = enabled;
297 self
298 }
299
300 #[cfg(feature = "metrics")]
302 pub fn with_metrics(mut self) -> Self {
303 self.with_metrics = true;
304 self
305 }
306
307 pub fn build(self) -> Client {
309 Client {
310 conn: self.conn,
311 next_message_id: 1,
312 pending: HashMap::new(),
313 send_buf: Vec::with_capacity(4096),
314 on_result: self.on_result,
315 namespace: self.namespace,
316 max_in_flight: self.max_in_flight,
317 #[cfg(feature = "timestamps")]
318 use_kernel_ts: self.use_kernel_ts,
319 #[cfg(feature = "metrics")]
320 metrics: if self.with_metrics {
321 Some(ClientMetrics::new())
322 } else {
323 None
324 },
325 }
326 }
327}
328
/// Cache client bound to a single connection.
///
/// Commands can be pipelined with the `fire_*` methods and collected with
/// `recv`, or issued one at a time with `get` / `set` / `delete`.
pub struct Client {
    /// Connection handle used for all sends and reads.
    conn: ConnCtx,
    /// Message id that the next command will carry; starts at 1.
    next_message_id: u64,
    /// Commands sent but not yet answered, keyed by message id.
    pending: HashMap<u64, PendingOp>,
    /// Scratch buffer reused to encode each outgoing frame.
    send_buf: Vec<u8>,
    /// Optional callback invoked with each recorded [`CommandResult`].
    on_result: Option<ResultCallback>,
    /// When non-empty, sent as the namespace instead of the per-call cache name.
    namespace: Bytes,
    /// Upper bound on `pending.len()` enforced by the `fire_*` methods.
    max_in_flight: usize,
    /// Whether send/receive timestamps are captured for latency measurement.
    #[cfg(feature = "timestamps")]
    use_kernel_ts: bool,
    /// Aggregated metrics, present only when enabled through the builder.
    #[cfg(feature = "metrics")]
    metrics: Option<ClientMetrics>,
}
351
352impl Client {
353 pub async fn connect(credential: &Credential) -> Result<Self, Error> {
355 let host = credential.host();
356 let port = credential.port();
357 let addr: SocketAddr = Self::resolve_addr(host, port)?;
358 let tls_host = credential.tls_host();
359
360 let conn = ringline::connect_tls(addr, tls_host)?.await?;
361
362 let mut client = Self {
363 conn,
364 next_message_id: 1,
365 pending: HashMap::new(),
366 send_buf: Vec::with_capacity(4096),
367 on_result: None,
368 namespace: Bytes::new(),
369 max_in_flight: usize::MAX,
370 #[cfg(feature = "timestamps")]
371 use_kernel_ts: false,
372 #[cfg(feature = "metrics")]
373 metrics: None,
374 };
375
376 client.authenticate(credential.token()).await?;
377 Ok(client)
378 }
379
380 pub async fn connect_with_timeout(
382 credential: &Credential,
383 timeout_ms: u64,
384 ) -> Result<Self, Error> {
385 let host = credential.host();
386 let port = credential.port();
387 let addr: SocketAddr = Self::resolve_addr(host, port)?;
388 let tls_host = credential.tls_host();
389
390 let conn = ringline::connect_tls_with_timeout(addr, tls_host, timeout_ms)?.await?;
391
392 let mut client = Self {
393 conn,
394 next_message_id: 1,
395 pending: HashMap::new(),
396 send_buf: Vec::with_capacity(4096),
397 on_result: None,
398 namespace: Bytes::new(),
399 max_in_flight: usize::MAX,
400 #[cfg(feature = "timestamps")]
401 use_kernel_ts: false,
402 #[cfg(feature = "metrics")]
403 metrics: None,
404 };
405
406 client.authenticate(credential.token()).await?;
407 Ok(client)
408 }
409
    /// Starts a [`ClientBuilder`] for an already established connection.
    ///
    /// Unlike `connect`, the resulting client performs no authentication
    /// handshake of its own.
    pub fn builder(conn: ConnCtx) -> ClientBuilder {
        ClientBuilder::new(conn)
    }
416
    /// Returns a copy of the connection handle this client uses.
    pub fn conn(&self) -> ConnCtx {
        self.conn
    }
421
    /// Returns the collected metrics, or `None` if metrics were not enabled.
    #[cfg(feature = "metrics")]
    pub fn metrics(&self) -> Option<&ClientMetrics> {
        self.metrics.as_ref()
    }
427
    /// Returns mutable access to the collected metrics, or `None` if metrics
    /// were not enabled.
    #[cfg(feature = "metrics")]
    pub fn metrics_mut(&mut self) -> Option<&mut ClientMetrics> {
        self.metrics.as_mut()
    }
433
    /// Replaces the namespace override with a copy of `ns`.
    ///
    /// An empty namespace makes subsequent commands use their per-call cache
    /// name again.
    pub fn set_namespace(&mut self, ns: impl AsRef<[u8]>) {
        self.namespace = Bytes::copy_from_slice(ns.as_ref());
    }
439
    /// Returns how many commands have been sent and not yet answered.
    pub fn pending_count(&self) -> usize {
        self.pending.len()
    }
444
    /// Discards every pending operation and closes the connection.
    ///
    /// `recv` calls this on each failure after which the byte stream can no
    /// longer be interpreted: oversize frame, closed connection, undecodable
    /// response, or too many unmatched responses.
    fn poison(&mut self) {
        self.pending.clear();
        self.conn.close();
    }
454
455 #[inline]
461 fn check_in_flight(&self) -> Result<(), Error> {
462 if self.pending.len() >= self.max_in_flight {
463 Err(Error::TooManyInFlight)
464 } else {
465 Ok(())
466 }
467 }
468
469 pub fn fire_get(
471 &mut self,
472 cache: &str,
473 key: &[u8],
474 user_data: u64,
475 ) -> Result<RequestId, Error> {
476 self.check_in_flight()?;
477 let message_id = self.next_id();
478 let ns = self.namespace_for(cache);
479 let key = Bytes::copy_from_slice(key);
480 let cmd = CacheCommand::new(
481 message_id,
482 UnaryCommand::Get {
483 namespace: ns,
484 key: key.clone(),
485 },
486 );
487
488 self.send_command(&cmd)?;
489 let tx_bytes = self.send_buf.len() as u32;
490
491 let (send_ts, start) = self.timing_start();
492 self.pending.insert(
493 message_id,
494 PendingOp {
495 kind: PendingOpKind::Get,
496 key,
497 send_ts,
498 start,
499 user_data,
500 tx_bytes,
501 },
502 );
503
504 Ok(RequestId(message_id))
505 }
506
507 pub fn fire_set(
509 &mut self,
510 cache: &str,
511 key: &[u8],
512 value: &[u8],
513 ttl_ms: u64,
514 user_data: u64,
515 ) -> Result<RequestId, Error> {
516 self.check_in_flight()?;
517 let message_id = self.next_id();
518 let ns = self.namespace_for(cache);
519 let key = Bytes::copy_from_slice(key);
520 let cmd = CacheCommand::new(
521 message_id,
522 UnaryCommand::Set {
523 namespace: ns,
524 key: key.clone(),
525 value: Bytes::copy_from_slice(value),
526 ttl_millis: ttl_ms,
527 },
528 );
529
530 self.send_command(&cmd)?;
531 let tx_bytes = self.send_buf.len() as u32;
532
533 let (send_ts, start) = self.timing_start();
534 self.pending.insert(
535 message_id,
536 PendingOp {
537 kind: PendingOpKind::Set,
538 key,
539 send_ts,
540 start,
541 user_data,
542 tx_bytes,
543 },
544 );
545
546 Ok(RequestId(message_id))
547 }
548
549 pub fn fire_delete(
551 &mut self,
552 cache: &str,
553 key: &[u8],
554 user_data: u64,
555 ) -> Result<RequestId, Error> {
556 self.check_in_flight()?;
557 let message_id = self.next_id();
558 let ns = self.namespace_for(cache);
559 let key = Bytes::copy_from_slice(key);
560 let cmd = CacheCommand::new(
561 message_id,
562 UnaryCommand::Delete {
563 namespace: ns,
564 key: key.clone(),
565 },
566 );
567
568 self.send_command(&cmd)?;
569 let tx_bytes = self.send_buf.len() as u32;
570
571 let (send_ts, start) = self.timing_start();
572 self.pending.insert(
573 message_id,
574 PendingOp {
575 kind: PendingOpKind::Delete,
576 key,
577 send_ts,
578 start,
579 user_data,
580 tx_bytes,
581 },
582 );
583
584 Ok(RequestId(message_id))
585 }
586
    /// Waits for the next response that matches a pending command and returns
    /// the completed operation.
    ///
    /// Responses whose message id matches no pending command are discarded;
    /// after more than [`MAX_RECV_SKIPS`] such responses in a row the
    /// connection is poisoned.
    ///
    /// # Errors
    ///
    /// * `Error::NoPending` — nothing is awaiting a response.
    /// * `Error::ConnectionClosed` — the connection read reported 0.
    /// * `Error::Protocol` — an oversize frame, an undecodable response, or
    ///   too many unmatched responses.
    ///
    /// Every error except `NoPending` first drops all pending operations and
    /// closes the connection.
    pub async fn recv(&mut self) -> Result<CompletedOp, Error> {
        if self.pending.is_empty() {
            return Err(Error::NoPending);
        }

        let mut skips = 0usize;
        let (dr, total_bytes) = loop {
            // Borrow the map separately so the parse closure can update it
            // while `self.conn` is borrowed by `with_bytes`.
            let pending = &mut self.pending;
            // Out-parameters filled in by the parse closure below.
            let mut dispatch_result: Option<DispatchResult> = None;
            let mut malformed = false;
            let mut oversize = false;

            let n = self
                .conn
                .with_bytes(
                    |bytes| match decode_length_delimited_message_bytes(&bytes) {
                        DecodedMessage::Message(consumed, msg_bytes) => {
                            if let Some(response) = CacheResponse::decode_bytes(msg_bytes) {
                                // `None` here means the id matched no pending op.
                                dispatch_result = dispatch_response(response, pending);
                            } else {
                                malformed = true;
                            }
                            // The frame is consumed whether or not it decoded.
                            ParseResult::Consumed(consumed)
                        }
                        // Not enough buffered bytes for a whole frame yet.
                        DecodedMessage::Incomplete => ParseResult::Consumed(0),
                        DecodedMessage::Oversize => {
                            oversize = true;
                            ParseResult::Consumed(0)
                        }
                    },
                )
                .await;

            // Oversize is checked before `n == 0` because that branch also
            // consumes nothing.
            if oversize {
                self.poison();
                return Err(Error::Protocol(
                    "inbound message exceeded MAX_MESSAGE_SIZE".into(),
                ));
            }
            // A zero result is treated as the connection having been closed.
            if n == 0 {
                self.poison();
                return Err(Error::ConnectionClosed);
            }
            if malformed {
                self.poison();
                return Err(Error::Protocol("failed to decode response".into()));
            }
            if let Some(dr) = dispatch_result {
                break (dr, n);
            }
            // Defensive: an unmatched response leaves `pending` untouched, so
            // this is not expected to trigger after the check at the top.
            if self.pending.is_empty() {
                return Err(Error::NoPending);
            }
            // A well-formed response for an unknown id: skip it, with a bound.
            skips += 1;
            if skips > MAX_RECV_SKIPS {
                self.poison();
                return Err(Error::Protocol(
                    "too many consecutive unmatched responses".into(),
                ));
            }
        };
        let n = total_bytes;

        let recv_ts = self.capture_recv_ts();
        let rx_bytes = n as u32;
        let ttfb_ns = Self::ttfb_from_timestamps(recv_ts, dr.send_ts);
        // Latency is only computed (and reported) when a callback or metrics
        // are configured; otherwise the completed op carries 0.
        let latency_ns = if self.is_instrumented() {
            let latency_ns = self.finish_timing(recv_ts, dr.send_ts, dr.start);
            self.record(&CommandResult {
                command: dr.cmd_type,
                latency_ns,
                success: dr.success,
                ttfb_ns,
                tx_bytes: dr.tx_bytes,
                rx_bytes,
            });
            latency_ns
        } else {
            0
        };

        Ok(dr.op.set_latency(latency_ns))
    }
710
711 #[inline]
719 fn check_no_pending(&self) -> Result<(), Error> {
720 if self.pending.is_empty() {
721 Ok(())
722 } else {
723 Err(Error::PendingOpsInFlight)
724 }
725 }
726
727 pub async fn get(&mut self, cache: &str, key: &[u8]) -> Result<Option<Bytes>, Error> {
734 self.check_no_pending()?;
735 let _id = self.fire_get(cache, key, 0)?;
736 match self.recv().await? {
737 CompletedOp::Get { result, .. } => result,
738 _ => Err(Error::Protocol("unexpected response type".into())),
739 }
740 }
741
742 pub async fn set(
744 &mut self,
745 cache: &str,
746 key: &[u8],
747 value: &[u8],
748 ttl_ms: u64,
749 ) -> Result<(), Error> {
750 self.check_no_pending()?;
751 let _id = self.fire_set(cache, key, value, ttl_ms, 0)?;
752 match self.recv().await? {
753 CompletedOp::Set { result, .. } => result,
754 _ => Err(Error::Protocol("unexpected response type".into())),
755 }
756 }
757
758 pub async fn delete(&mut self, cache: &str, key: &[u8]) -> Result<(), Error> {
760 self.check_no_pending()?;
761 let _id = self.fire_delete(cache, key, 0)?;
762 match self.recv().await? {
763 CompletedOp::Delete { result, .. } => result,
764 _ => Err(Error::Protocol("unexpected response type".into())),
765 }
766 }
767
768 #[inline]
772 fn namespace_for(&self, cache: &str) -> Bytes {
773 if self.namespace.is_empty() {
774 Bytes::copy_from_slice(cache.as_bytes())
775 } else {
776 self.namespace.clone()
777 }
778 }
779
780 fn next_id(&mut self) -> u64 {
781 let id = self.next_message_id;
782 self.next_message_id += 1;
783 id
784 }
785
    /// Encodes `cmd` as a length-delimited frame into the reusable send buffer
    /// and hands it to the connection via `send_nowait`.
    ///
    /// The buffer is left holding the encoded frame, which callers use to read
    /// the transmitted size.
    ///
    /// # Errors
    ///
    /// Propagates the error returned by `send_nowait`.
    fn send_command(&mut self, cmd: &CacheCommand) -> Result<(), Error> {
        self.send_buf.clear();
        cmd.encode_length_delimited_into(&mut self.send_buf);
        self.conn.send_nowait(&self.send_buf)?;
        Ok(())
    }
792
793 async fn authenticate(&mut self, token: &str) -> Result<(), Error> {
805 let message_id = self.next_id();
806 let cmd = CacheCommand::new(
807 message_id,
808 UnaryCommand::Authenticate {
809 auth_token: token.to_string(),
810 },
811 );
812
813 self.send_buf.clear();
814 cmd.encode_length_delimited_into(&mut self.send_buf);
815 if let Err(e) = self.conn.send_nowait(&self.send_buf) {
816 self.conn.close();
817 return Err(Error::Io(e));
818 }
819
820 let mut skips = 0usize;
821 loop {
822 let mut auth_result: Option<Result<(), Error>> = None;
823 let mut malformed = false;
824 let mut oversize = false;
825
826 let n = self
827 .conn
828 .with_bytes(
829 |bytes| match decode_length_delimited_message_bytes(&bytes) {
830 DecodedMessage::Message(consumed, msg_bytes) => {
831 if let Some(response) = CacheResponse::decode_bytes(msg_bytes) {
832 if response.message_id == message_id {
833 match response.result {
834 CacheResponseResult::Authenticate => {
835 auth_result = Some(Ok(()));
836 }
837 CacheResponseResult::Error(err) => {
838 auth_result = Some(Err(Error::AuthFailed(err.message)));
839 }
840 _ => {
841 auth_result = Some(Err(Error::Protocol(
842 "unexpected auth response type".into(),
843 )));
844 }
845 }
846 }
847 } else {
848 malformed = true;
849 }
850 ParseResult::Consumed(consumed)
851 }
852 DecodedMessage::Incomplete => ParseResult::Consumed(0),
853 DecodedMessage::Oversize => {
854 oversize = true;
855 ParseResult::Consumed(0)
856 }
857 },
858 )
859 .await;
860
861 if oversize {
862 self.conn.close();
863 return Err(Error::Protocol(
864 "auth response exceeded MAX_MESSAGE_SIZE".into(),
865 ));
866 }
867 if n == 0 {
868 self.conn.close();
869 return Err(Error::ConnectionClosed);
870 }
871 if malformed {
872 self.conn.close();
873 return Err(Error::Protocol("failed to decode auth response".into()));
874 }
875 if let Some(r) = auth_result {
876 return r;
877 }
878 skips += 1;
883 if skips > MAX_RECV_SKIPS {
884 self.conn.close();
885 return Err(Error::Protocol(
886 "too many unmatched messages before auth response".into(),
887 ));
888 }
889 }
890 }
891
892 fn resolve_addr(host: &str, port: u16) -> Result<SocketAddr, Error> {
893 use std::net::ToSocketAddrs;
894 let addr_str = format!("{}:{}", host, port);
895 addr_str
896 .to_socket_addrs()
897 .map_err(|e| Error::Config(format!("failed to resolve {}: {}", addr_str, e)))?
898 .next()
899 .ok_or_else(|| Error::Config(format!("no addresses found for {}", addr_str)))
900 }
901
902 #[inline]
905 fn is_instrumented(&self) -> bool {
906 if self.on_result.is_some() {
907 return true;
908 }
909 #[cfg(feature = "metrics")]
910 if self.metrics.is_some() {
911 return true;
912 }
913 false
914 }
915
916 #[inline]
917 fn timing_start(&self) -> (u64, Option<Instant>) {
918 if self.is_instrumented() {
919 (self.send_timestamp(), Some(Instant::now()))
920 } else {
921 (0, None)
922 }
923 }
924
925 #[cfg(feature = "timestamps")]
926 #[inline]
927 fn capture_recv_ts(&self) -> u64 {
928 if self.use_kernel_ts {
929 self.conn.recv_timestamp()
930 } else {
931 0
932 }
933 }
934
    /// Variant used when the `timestamps` feature is disabled: always `0`,
    /// which the timing helpers treat as "no receive timestamp".
    #[cfg(not(feature = "timestamps"))]
    #[inline]
    fn capture_recv_ts(&self) -> u64 {
        0
    }
940
941 #[inline]
942 fn ttfb_from_timestamps(recv_ts: u64, send_ts: u64) -> Option<u64> {
943 if recv_ts > 0 && recv_ts > send_ts {
944 Some(recv_ts - send_ts)
945 } else {
946 None
947 }
948 }
949
950 #[cfg(feature = "timestamps")]
951 #[inline]
952 fn send_timestamp(&self) -> u64 {
953 if self.use_kernel_ts {
954 now_realtime_ns()
955 } else {
956 0
957 }
958 }
959
    /// Variant used when the `timestamps` feature is disabled: always `0`.
    #[cfg(not(feature = "timestamps"))]
    #[inline]
    fn send_timestamp(&self) -> u64 {
        0
    }
965
966 #[inline]
967 fn finish_timing(&self, recv_ts: u64, send_ts: u64, start: Option<Instant>) -> u64 {
968 if recv_ts > 0 && recv_ts > send_ts {
969 return recv_ts - send_ts;
970 }
971 start.map_or(0, |s| s.elapsed().as_nanos() as u64)
972 }
973
974 fn record(&mut self, result: &CommandResult) {
975 if let Some(ref cb) = self.on_result {
976 cb(result);
977 }
978 #[cfg(feature = "metrics")]
979 if let Some(ref mut m) = self.metrics {
980 m.record(result);
981 }
982 }
983}
984
/// A response matched to its pending command, plus the bookkeeping `recv`
/// needs to finish timing and reporting.
struct DispatchResult {
    /// The completed operation, built with `latency_ns` still at 0.
    op: CompletedOp,
    /// Command kind, for the [`CommandResult`].
    cmd_type: CommandType,
    /// Whether the operation's result was `Ok`.
    success: bool,
    /// Send timestamp copied from the pending op.
    send_ts: u64,
    /// Start instant copied from the pending op.
    start: Option<Instant>,
    /// Encoded request size copied from the pending op.
    tx_bytes: u32,
}
996
997fn dispatch_response(
999 response: CacheResponse,
1000 pending: &mut HashMap<u64, PendingOp>,
1001) -> Option<DispatchResult> {
1002 let message_id = response.message_id;
1003 let id = RequestId(message_id);
1004
1005 let op = pending.remove(&message_id)?;
1006 let send_ts = op.send_ts;
1007 let start = op.start;
1008 let user_data = op.user_data;
1009 let tx_bytes = op.tx_bytes;
1010
1011 match op.kind {
1012 PendingOpKind::Get => {
1013 let result = match response.result {
1014 CacheResponseResult::Get { value } => Ok(value),
1015 CacheResponseResult::Error(ref err) if err.code == StatusCode::NotFound => Ok(None),
1016 CacheResponseResult::Error(err) => Err(Error::Protocol(format!(
1017 "{}: {}",
1018 err.code as u32, err.message
1019 ))),
1020 _ => Err(Error::Protocol("unexpected response type for get".into())),
1021 };
1022 let success = result.is_ok();
1023 Some(DispatchResult {
1024 op: CompletedOp::Get {
1025 id,
1026 key: op.key,
1027 result,
1028 user_data,
1029 latency_ns: 0,
1030 },
1031 cmd_type: CommandType::Get,
1032 success,
1033 send_ts,
1034 start,
1035 tx_bytes,
1036 })
1037 }
1038 PendingOpKind::Set => {
1039 let result = match response.result {
1040 CacheResponseResult::Set => Ok(()),
1041 CacheResponseResult::Error(err) => Err(Error::Protocol(format!(
1042 "{}: {}",
1043 err.code as u32, err.message
1044 ))),
1045 _ => Err(Error::Protocol("unexpected response type for set".into())),
1046 };
1047 let success = result.is_ok();
1048 Some(DispatchResult {
1049 op: CompletedOp::Set {
1050 id,
1051 key: op.key,
1052 result,
1053 user_data,
1054 latency_ns: 0,
1055 },
1056 cmd_type: CommandType::Set,
1057 success,
1058 send_ts,
1059 start,
1060 tx_bytes,
1061 })
1062 }
1063 PendingOpKind::Delete => {
1064 let result = match response.result {
1065 CacheResponseResult::Delete => Ok(()),
1066 CacheResponseResult::Error(err) => Err(Error::Protocol(format!(
1067 "{}: {}",
1068 err.code as u32, err.message
1069 ))),
1070 _ => Err(Error::Protocol(
1071 "unexpected response type for delete".into(),
1072 )),
1073 };
1074 let success = result.is_ok();
1075 Some(DispatchResult {
1076 op: CompletedOp::Delete {
1077 id,
1078 key: op.key,
1079 result,
1080 user_data,
1081 latency_ns: 0,
1082 },
1083 cmd_type: CommandType::Delete,
1084 success,
1085 send_ts,
1086 start,
1087 tx_bytes,
1088 })
1089 }
1090 }
1091}
1092
/// Reads `CLOCK_REALTIME` and returns it as nanoseconds.
///
/// The return value of `clock_gettime` is not inspected; `ts` starts zeroed,
/// so a failed call presumably yields 0.
#[cfg(feature = "timestamps")]
fn now_realtime_ns() -> u64 {
    let mut ts = libc::timespec {
        tv_sec: 0,
        tv_nsec: 0,
    };
    // SAFETY: `ts` is a valid, initialized `timespec` that is exclusively
    // borrowed for the duration of the call.
    unsafe {
        libc::clock_gettime(libc::CLOCK_REALTIME, &mut ts);
    }
    let secs = ts.tv_sec as u64;
    let nanos = ts.tv_nsec as u64;
    secs * 1_000_000_000 + nanos
}
1106
// Unit tests for the pieces that can be exercised without a live connection:
// the skip limit constant, an error message, and `dispatch_response`.
#[cfg(test)]
mod tests {
    use super::*;
    use crate::proto::CacheResponse;

    /// Builds an uninstrumented pending get for `key` with `user_data` 42.
    fn make_pending_get(key: &[u8]) -> PendingOp {
        PendingOp {
            kind: PendingOpKind::Get,
            key: Bytes::copy_from_slice(key),
            send_ts: 0,
            start: None,
            user_data: 42,
            tx_bytes: 0,
        }
    }

    /// The skip limit must be positive; checked at compile time.
    #[test]
    fn max_recv_skips_is_positive() {
        const { assert!(MAX_RECV_SKIPS > 0) };
    }

    /// The `PendingOpsInFlight` message should mention the `fire_*` API.
    #[test]
    fn pending_ops_in_flight_display() {
        let msg = format!("{}", Error::PendingOpsInFlight);
        assert!(
            msg.contains("fire_*"),
            "PendingOpsInFlight display should mention fire_*, got: {msg}"
        );
    }

    /// A response whose id matches a pending op is dispatched and removed.
    #[test]
    fn dispatch_response_returns_some_on_match() {
        let mut pending: HashMap<u64, PendingOp> = HashMap::new();
        pending.insert(7, make_pending_get(b"k"));
        let response = CacheResponse::get_hit(7, Bytes::from_static(b"v"));
        let dr = dispatch_response(response, &mut pending);
        assert!(dr.is_some(), "expected matched dispatch");
        assert!(pending.is_empty(), "matched op should be drained");
    }

    /// A response for an unknown id yields `None` and leaves `pending` alone.
    #[test]
    fn dispatch_response_returns_none_on_unmatched_id() {
        let mut pending: HashMap<u64, PendingOp> = HashMap::new();
        pending.insert(1, make_pending_get(b"k"));
        let response = CacheResponse::get_hit(99, Bytes::from_static(b"v"));
        let dr = dispatch_response(response, &mut pending);
        assert!(dr.is_none(), "unmatched id must return None");
        assert_eq!(pending.len(), 1, "pending must be unchanged on miss");
    }

    /// A set response for a pending get completes the get with an error
    /// rather than changing the op kind.
    #[test]
    fn dispatch_response_get_returns_protocol_error_on_wrong_kind() {
        let mut pending: HashMap<u64, PendingOp> = HashMap::new();
        pending.insert(3, make_pending_get(b"k"));
        let response = CacheResponse::set_ok(3);
        let dr = dispatch_response(response, &mut pending).expect("matched id");
        assert!(pending.is_empty());
        assert!(!dr.success, "kind mismatch should mark op as failed");
        match dr.op {
            CompletedOp::Get { result, .. } => assert!(result.is_err()),
            _ => panic!("expected Get op kind preserved"),
        }
    }
}