1use dashmap::DashMap;
10use std::collections::BTreeMap;
11use std::net::SocketAddr;
12use std::sync::atomic::{AtomicU32, AtomicU64, Ordering};
13use std::sync::Arc;
14use tokio::io::{AsyncReadExt, AsyncWriteExt};
15use tokio::net::{TcpListener, TcpStream};
16
17use super::{CacheEntry, TierStats};
18use crate::distribcache::QueryFingerprint;
19
/// Opcode carried in the first byte of every frame exchanged between peers.
///
/// The explicit discriminants are the byte values that go on the wire, so
/// they must not be renumbered.
#[derive(Debug, Clone, Copy)]
#[repr(u8)]
enum MessageType {
    /// Request for the entry stored under a fingerprint.
    Get = 1,
    /// Reply to `Get`; a zero-length body means "not found".
    GetResponse = 2,
    /// Request to store an entry under a fingerprint.
    Put = 3,
    /// Acknowledgement of a `Put`.
    PutResponse = 4,
    /// Request to drop the entry stored under a fingerprint.
    Invalidate = 5,
    /// Liveness probe.
    Ping = 6,
    /// Reply to `Ping`.
    Pong = 7,
}

impl TryFrom<u8> for MessageType {
    type Error = ();

    /// Decodes a wire byte into an opcode.
    ///
    /// Returns `Err(())` for any byte that is not one of the discriminants
    /// declared on [`MessageType`].
    fn try_from(value: u8) -> Result<Self, Self::Error> {
        // Matching the byte against the variants' own discriminants means
        // the numeric values are written down in exactly one place.
        const KNOWN: [MessageType; 7] = [
            MessageType::Get,
            MessageType::GetResponse,
            MessageType::Put,
            MessageType::PutResponse,
            MessageType::Invalidate,
            MessageType::Ping,
            MessageType::Pong,
        ];

        KNOWN
            .iter()
            .copied()
            .find(|kind| *kind as u8 == value)
            .ok_or(())
    }
}
49
/// 64-bit identity of a cache node, used as the value stored on the hash ring
/// and as the key of the peer table.
#[derive(Debug, Clone, Copy, Hash, PartialEq, Eq)]
pub struct PeerId(pub u64);

impl PeerId {
    /// Derives the identifier of a remote peer by hashing its socket address
    /// with the standard library's `DefaultHasher`.
    ///
    /// NOTE(review): the standard library does not promise that
    /// `DefaultHasher` output is stable across Rust releases, so ids computed
    /// by binaries built with different toolchains may disagree.
    pub fn new(addr: &SocketAddr) -> Self {
        use std::collections::hash_map::DefaultHasher;
        use std::hash::{Hash, Hasher};

        let mut state = DefaultHasher::new();
        Hash::hash(addr, &mut state);
        PeerId(state.finish())
    }

    /// Identifier a node uses for itself: always `PeerId(0)`.
    pub fn local() -> Self {
        PeerId(0)
    }
}
68
69struct HashRing {
71 ring: BTreeMap<u64, PeerId>,
73 virtual_nodes: usize,
75}
76
77impl HashRing {
78 fn new(virtual_nodes: usize) -> Self {
79 Self {
80 ring: BTreeMap::new(),
81 virtual_nodes,
82 }
83 }
84
85 fn add_peer(&mut self, peer: PeerId) {
86 for i in 0..self.virtual_nodes {
87 let hash = Self::hash_peer(peer, i);
88 self.ring.insert(hash, peer);
89 }
90 }
91
92 fn remove_peer(&mut self, peer: PeerId) {
93 self.ring.retain(|_, p| *p != peer);
94 }
95
96 fn get_nodes(&self, key: &[u8], count: u32) -> Vec<PeerId> {
97 if self.ring.is_empty() {
98 return Vec::new();
99 }
100
101 let key_hash = Self::hash_key(key);
102 let mut nodes = Vec::new();
103 let mut seen = std::collections::HashSet::new();
104
105 let iter = self
107 .ring
108 .range(key_hash..)
109 .chain(self.ring.range(..key_hash));
110
111 for (_, peer) in iter {
112 if !seen.contains(peer) {
113 seen.insert(*peer);
114 nodes.push(*peer);
115 if nodes.len() >= count as usize {
116 break;
117 }
118 }
119 }
120
121 nodes
122 }
123
124 fn hash_peer(peer: PeerId, vnode: usize) -> u64 {
125 use std::collections::hash_map::DefaultHasher;
126 use std::hash::{Hash, Hasher};
127
128 let mut hasher = DefaultHasher::new();
129 peer.0.hash(&mut hasher);
130 vnode.hash(&mut hasher);
131 hasher.finish()
132 }
133
134 fn hash_key(key: &[u8]) -> u64 {
135 use std::collections::hash_map::DefaultHasher;
136 use std::hash::{Hash, Hasher};
137
138 let mut hasher = DefaultHasher::new();
139 key.hash(&mut hasher);
140 hasher.finish()
141 }
142}
143
/// Address and health bookkeeping for one remote cache peer.
///
/// `Clone` is derived rather than hand-written: every field is a plain value,
/// so the derived implementation copies exactly the same fields and cannot
/// fall out of date when a field is added.
#[derive(Debug, Clone)]
pub struct PeerConnection {
    /// Socket address dialled for every request sent to this peer.
    pub addr: SocketAddr,
    /// Whether the peer is currently considered usable; starts out `true`.
    pub healthy: bool,
    /// Starts at 0 and is not updated anywhere in this file.
    /// NOTE(review): unit and epoch are not visible here — confirm before use.
    pub last_seen: u64,
    /// Starts at 0 and is not updated anywhere in this file; presumably a
    /// round-trip time in microseconds, going by the name — TODO confirm.
    pub rtt_us: u64,
    /// Request timeout in milliseconds (passed to `Duration::from_millis`).
    timeout_ms: u64,
}
170
171impl PeerConnection {
172 fn new(addr: SocketAddr) -> Self {
173 Self {
174 addr,
175 healthy: true,
176 last_seen: 0,
177 rtt_us: 0,
178 timeout_ms: 5000, }
180 }
181
182 pub async fn get(&self, fingerprint: &QueryFingerprint) -> Result<CacheEntry, &'static str> {
184 let _start = std::time::Instant::now();
185
186 let stream = match tokio::time::timeout(
188 std::time::Duration::from_millis(self.timeout_ms),
189 TcpStream::connect(self.addr),
190 )
191 .await
192 {
193 Ok(Ok(s)) => s,
194 Ok(Err(_)) => return Err("Connection failed"),
195 Err(_) => return Err("Connection timeout"),
196 };
197
198 let fp_bytes = match bincode::serialize(fingerprint) {
200 Ok(b) => b,
201 Err(_) => return Err("Serialization failed"),
202 };
203
204 let (mut reader, mut writer) = stream.into_split();
206
207 let mut header = vec![MessageType::Get as u8];
209 header.extend_from_slice(&(fp_bytes.len() as u32).to_le_bytes());
210
211 if writer.write_all(&header).await.is_err() {
212 return Err("Failed to write header");
213 }
214 if writer.write_all(&fp_bytes).await.is_err() {
215 return Err("Failed to write data");
216 }
217
218 let mut resp_header = [0u8; 5];
220 if reader.read_exact(&mut resp_header).await.is_err() {
221 return Err("Failed to read response header");
222 }
223
224 let _msg_type =
225 MessageType::try_from(resp_header[0]).map_err(|_| "Invalid message type")?;
226 let length = u32::from_le_bytes([
227 resp_header[1],
228 resp_header[2],
229 resp_header[3],
230 resp_header[4],
231 ]) as usize;
232
233 if length == 0 {
234 return Err("Entry not found");
235 }
236
237 let mut data = vec![0u8; length];
238 if reader.read_exact(&mut data).await.is_err() {
239 return Err("Failed to read response data");
240 }
241
242 let entry: CacheEntry =
244 bincode::deserialize(&data).map_err(|_| "Deserialization failed")?;
245
246 Ok(entry)
247 }
248
249 pub async fn insert(
251 &self,
252 fingerprint: QueryFingerprint,
253 entry: CacheEntry,
254 ) -> Result<(), &'static str> {
255 let stream = match tokio::time::timeout(
257 std::time::Duration::from_millis(self.timeout_ms),
258 TcpStream::connect(self.addr),
259 )
260 .await
261 {
262 Ok(Ok(s)) => s,
263 Ok(Err(_)) => return Err("Connection failed"),
264 Err(_) => return Err("Connection timeout"),
265 };
266
267 let fp_bytes = bincode::serialize(&fingerprint).map_err(|_| "FP serialization failed")?;
269 let entry_bytes = bincode::serialize(&entry).map_err(|_| "Entry serialization failed")?;
270
271 let mut message = Vec::with_capacity(1 + 4 + 4 + fp_bytes.len() + entry_bytes.len());
273 message.push(MessageType::Put as u8);
274 message.extend_from_slice(&(fp_bytes.len() as u32).to_le_bytes());
275 message.extend_from_slice(&(entry_bytes.len() as u32).to_le_bytes());
276 message.extend_from_slice(&fp_bytes);
277 message.extend_from_slice(&entry_bytes);
278
279 let (mut reader, mut writer) = stream.into_split();
280
281 if writer.write_all(&message).await.is_err() {
282 return Err("Failed to write");
283 }
284
285 let mut resp_header = [0u8; 5];
287 if reader.read_exact(&mut resp_header).await.is_err() {
288 return Err("Failed to read ack");
289 }
290
291 Ok(())
292 }
293
294 #[allow(dead_code)]
296 pub async fn ping(&self) -> bool {
297 let _start = std::time::Instant::now();
298
299 let stream = match tokio::time::timeout(
300 std::time::Duration::from_millis(1000),
301 TcpStream::connect(self.addr),
302 )
303 .await
304 {
305 Ok(Ok(s)) => s,
306 _ => return false,
307 };
308
309 let (mut reader, mut writer) = stream.into_split();
310
311 let ping_msg = [MessageType::Ping as u8, 0, 0, 0, 0];
313 if writer.write_all(&ping_msg).await.is_err() {
314 return false;
315 }
316
317 let mut resp = [0u8; 5];
319 match tokio::time::timeout(
320 std::time::Duration::from_millis(1000),
321 reader.read_exact(&mut resp),
322 )
323 .await
324 {
325 Ok(Ok(_)) => resp[0] == MessageType::Pong as u8,
326 _ => false,
327 }
328 }
329
330 pub async fn invalidate(&self, fingerprint: &QueryFingerprint) -> Result<(), &'static str> {
332 let stream = match tokio::time::timeout(
333 std::time::Duration::from_millis(self.timeout_ms),
334 TcpStream::connect(self.addr),
335 )
336 .await
337 {
338 Ok(Ok(s)) => s,
339 Ok(Err(_)) => return Err("Connection failed"),
340 Err(_) => return Err("Connection timeout"),
341 };
342
343 let fp_bytes = bincode::serialize(fingerprint).map_err(|_| "Serialization failed")?;
344
345 let mut message = vec![MessageType::Invalidate as u8];
346 message.extend_from_slice(&(fp_bytes.len() as u32).to_le_bytes());
347 message.extend_from_slice(&fp_bytes);
348
349 let (_, mut writer) = stream.into_split();
350 writer
351 .write_all(&message)
352 .await
353 .map_err(|_| "Write failed")?;
354
355 Ok(())
356 }
357}
358
359pub struct DistributedCache {
361 local_peer_id: PeerId,
363
364 hash_ring: std::sync::RwLock<HashRing>,
366
367 peers: DashMap<PeerId, PeerConnection>,
369
370 local: DashMap<u64, CacheEntry>,
372
373 replication_factor: u32,
375
376 hits: AtomicU64,
378 misses: AtomicU64,
379 remote_hits: AtomicU64,
380 #[allow(dead_code)]
381 replication_lag_ms: AtomicU64,
382 healthy_peers: AtomicU32,
383}
384
385impl DistributedCache {
386 pub fn new(replication_factor: u32, peer_addrs: Vec<SocketAddr>) -> Self {
388 let local_peer_id = PeerId::local();
389
390 let mut hash_ring = HashRing::new(100); hash_ring.add_peer(local_peer_id);
392
393 let peers = DashMap::new();
394 for addr in &peer_addrs {
395 let peer_id = PeerId::new(addr);
396 hash_ring.add_peer(peer_id);
397 peers.insert(peer_id, PeerConnection::new(*addr));
398 }
399
400 Self {
401 local_peer_id,
402 hash_ring: std::sync::RwLock::new(hash_ring),
403 peers,
404 local: DashMap::new(),
405 replication_factor,
406 hits: AtomicU64::new(0),
407 misses: AtomicU64::new(0),
408 remote_hits: AtomicU64::new(0),
409 replication_lag_ms: AtomicU64::new(0),
410 healthy_peers: AtomicU32::new(peer_addrs.len() as u32),
411 }
412 }
413
414 pub async fn get(&self, fingerprint: &QueryFingerprint) -> Option<CacheEntry> {
416 let key = self.fingerprint_to_hash(fingerprint);
417 let key_bytes = key.to_le_bytes();
418
419 let owners = {
421 let ring = self.hash_ring.read().ok()?;
422 ring.get_nodes(&key_bytes, self.replication_factor)
423 };
424
425 if owners.contains(&self.local_peer_id) {
427 if let Some(entry) = self.local.get(&key) {
428 if !entry.is_expired() {
429 self.hits.fetch_add(1, Ordering::Relaxed);
430 return Some(entry.clone());
431 } else {
432 drop(entry);
433 self.local.remove(&key);
434 }
435 }
436 }
437
438 for owner in owners {
440 if owner == self.local_peer_id {
441 continue;
442 }
443
444 if let Some(peer) = self.peers.get(&owner) {
445 if peer.healthy {
446 if let Ok(entry) = peer.get(fingerprint).await {
447 self.local.insert(key, entry.clone());
449 self.remote_hits.fetch_add(1, Ordering::Relaxed);
450 self.hits.fetch_add(1, Ordering::Relaxed);
451 return Some(entry);
452 }
453 }
454 }
455 }
456
457 self.misses.fetch_add(1, Ordering::Relaxed);
458 None
459 }
460
461 pub async fn insert(&self, fingerprint: QueryFingerprint, entry: CacheEntry) {
463 let key = self.fingerprint_to_hash(&fingerprint);
464 let key_bytes = key.to_le_bytes();
465
466 let owners = {
468 let ring = self.hash_ring.read().unwrap();
469 ring.get_nodes(&key_bytes, self.replication_factor)
470 };
471
472 if owners.contains(&self.local_peer_id) {
474 self.local.insert(key, entry.clone());
475 }
476
477 for owner in owners {
479 if owner == self.local_peer_id {
480 continue;
481 }
482
483 if let Some(peer) = self.peers.get(&owner) {
484 if peer.healthy {
485 let fp = fingerprint.clone();
486 let e = entry.clone();
487 let _ = peer.insert(fp, e).await;
488 }
489 }
490 }
491 }
492
493 pub fn add_peer(&self, addr: SocketAddr) {
495 let peer_id = PeerId::new(&addr);
496
497 if let Ok(mut ring) = self.hash_ring.write() {
498 ring.add_peer(peer_id);
499 }
500
501 self.peers.insert(peer_id, PeerConnection::new(addr));
502 self.healthy_peers.fetch_add(1, Ordering::Relaxed);
503 }
504
505 pub fn remove_peer(&self, addr: &SocketAddr) {
507 let peer_id = PeerId::new(addr);
508
509 if let Ok(mut ring) = self.hash_ring.write() {
510 ring.remove_peer(peer_id);
511 }
512
513 if self.peers.remove(&peer_id).is_some() {
514 self.healthy_peers.fetch_sub(1, Ordering::Relaxed);
515 }
516 }
517
518 pub fn mark_unhealthy(&self, addr: &SocketAddr) {
520 let peer_id = PeerId::new(addr);
521
522 if let Some(mut peer) = self.peers.get_mut(&peer_id) {
523 if peer.healthy {
524 peer.healthy = false;
525 self.healthy_peers.fetch_sub(1, Ordering::Relaxed);
526 }
527 }
528 }
529
530 pub fn mark_healthy(&self, addr: &SocketAddr) {
532 let peer_id = PeerId::new(addr);
533
534 if let Some(mut peer) = self.peers.get_mut(&peer_id) {
535 if !peer.healthy {
536 peer.healthy = true;
537 self.healthy_peers.fetch_add(1, Ordering::Relaxed);
538 }
539 }
540 }
541
542 pub async fn invalidate(&self, fingerprint: &QueryFingerprint) {
544 let key = self.fingerprint_to_hash(fingerprint);
545
546 self.local.remove(&key);
548
549 for peer_ref in self.peers.iter() {
551 let peer = peer_ref.value();
552 if peer.healthy {
553 let fp = fingerprint.clone();
555 let peer_clone = peer.clone();
556 tokio::spawn(async move {
557 let _ = peer_clone.invalidate(&fp).await;
558 });
559 }
560 }
561 }
562
563 fn fingerprint_to_hash(&self, fingerprint: &QueryFingerprint) -> u64 {
565 use std::collections::hash_map::DefaultHasher;
566 use std::hash::{Hash, Hasher};
567
568 let mut hasher = DefaultHasher::new();
569 fingerprint.template.hash(&mut hasher);
570 if let Some(param) = fingerprint.param_hash {
571 param.hash(&mut hasher);
572 }
573 hasher.finish()
574 }
575
576 pub fn stats(&self) -> TierStats {
578 let local_size: usize = self.local.iter().map(|e| e.value().size()).sum();
579
580 TierStats {
581 size_bytes: local_size as u64,
582 max_size_bytes: 0, entry_count: self.local.len() as u64,
584 hits: self.hits.load(Ordering::Relaxed),
585 misses: self.misses.load(Ordering::Relaxed),
586 evictions: 0,
587 compression_ratio: None,
588 peer_count: Some(self.peers.len() as u32 + 1), healthy_peers: Some(self.healthy_peers.load(Ordering::Relaxed) + 1),
590 }
591 }
592
593 pub fn peer_addrs(&self) -> Vec<SocketAddr> {
595 self.peers.iter().map(|p| p.value().addr).collect()
596 }
597
598 pub fn copy_valid_entries_to(&self, target: &DistributedCache) {
600 for entry in self.local.iter() {
601 if !entry.value().is_expired() {
602 target.local.insert(*entry.key(), entry.value().clone());
603 }
604 }
605 }
606
607 pub async fn serve(self: Arc<Self>, addr: SocketAddr) -> std::io::Result<()> {
615 let listener = TcpListener::bind(addr).await?;
616 self.serve_on(listener).await;
617 Ok(())
618 }
619
620 pub async fn serve_on(self: Arc<Self>, listener: TcpListener) {
623 while let Ok((stream, _peer)) = listener.accept().await {
624 let cache = Arc::clone(&self);
625 tokio::spawn(async move {
626 let _ = cache.handle_peer_conn(stream).await;
627 });
628 }
629 }
630
631 async fn handle_peer_conn(&self, stream: TcpStream) -> std::io::Result<()> {
634 let (mut reader, mut writer) = stream.into_split();
635
636 let mut type_byte = [0u8; 1];
637 if reader.read_exact(&mut type_byte).await.is_err() {
638 return Ok(());
639 }
640 let msg_type = match MessageType::try_from(type_byte[0]) {
641 Ok(t) => t,
642 Err(_) => return Ok(()),
643 };
644
645 match msg_type {
646 MessageType::Get => {
647 let mut len_buf = [0u8; 4];
648 reader.read_exact(&mut len_buf).await?;
649 let fp_len = u32::from_le_bytes(len_buf) as usize;
650 let mut fp_bytes = vec![0u8; fp_len];
651 reader.read_exact(&mut fp_bytes).await?;
652
653 let payload = match bincode::deserialize::<QueryFingerprint>(&fp_bytes) {
655 Ok(fp) => {
656 let key = self.fingerprint_to_hash(&fp);
657 self.local.get(&key).and_then(|e| {
658 if e.is_expired() {
659 None
660 } else {
661 bincode::serialize(e.value()).ok()
662 }
663 })
664 }
665 Err(_) => None,
666 };
667
668 let mut out = vec![MessageType::GetResponse as u8];
669 match payload {
670 Some(bytes) => {
672 out.extend_from_slice(&(bytes.len() as u32).to_le_bytes());
673 out.extend_from_slice(&bytes);
674 }
675 None => out.extend_from_slice(&0u32.to_le_bytes()),
676 }
677 writer.write_all(&out).await?;
678 }
679 MessageType::Put => {
680 let mut len_buf = [0u8; 4];
681 reader.read_exact(&mut len_buf).await?;
682 let fp_len = u32::from_le_bytes(len_buf) as usize;
683 reader.read_exact(&mut len_buf).await?;
684 let entry_len = u32::from_le_bytes(len_buf) as usize;
685
686 let mut fp_bytes = vec![0u8; fp_len];
687 reader.read_exact(&mut fp_bytes).await?;
688 let mut entry_bytes = vec![0u8; entry_len];
689 reader.read_exact(&mut entry_bytes).await?;
690
691 if let (Ok(fp), Ok(entry)) = (
692 bincode::deserialize::<QueryFingerprint>(&fp_bytes),
693 bincode::deserialize::<CacheEntry>(&entry_bytes),
694 ) {
695 let key = self.fingerprint_to_hash(&fp);
696 self.local.insert(key, entry);
697 }
698
699 let mut out = vec![MessageType::PutResponse as u8];
701 out.extend_from_slice(&0u32.to_le_bytes());
702 writer.write_all(&out).await?;
703 }
704 MessageType::Ping => {
705 let mut len_buf = [0u8; 4];
706 let _ = reader.read_exact(&mut len_buf).await;
707 let mut out = vec![MessageType::Pong as u8];
708 out.extend_from_slice(&0u32.to_le_bytes());
709 writer.write_all(&out).await?;
710 }
711 MessageType::Invalidate => {
712 let mut len_buf = [0u8; 4];
713 reader.read_exact(&mut len_buf).await?;
714 let fp_len = u32::from_le_bytes(len_buf) as usize;
715 let mut fp_bytes = vec![0u8; fp_len];
716 reader.read_exact(&mut fp_bytes).await?;
717 if let Ok(fp) = bincode::deserialize::<QueryFingerprint>(&fp_bytes) {
718 let key = self.fingerprint_to_hash(&fp);
719 self.local.remove(&key);
720 }
721 }
723 MessageType::GetResponse | MessageType::PutResponse | MessageType::Pong => {}
725 }
726 Ok(())
727 }
728}
729
#[cfg(test)]
mod tests {
    use super::*;
    use std::time::Duration;

    /// Builds a ring with `virtual_nodes` positions for each listed peer id.
    fn ring_with(virtual_nodes: usize, ids: &[u64]) -> HashRing {
        let mut ring = HashRing::new(virtual_nodes);
        for id in ids {
            ring.add_peer(PeerId(*id));
        }
        ring
    }

    /// With three peers on the ring, asking for two owners yields two.
    #[test]
    fn test_hash_ring_distribution() {
        let ring = ring_with(10, &[1, 2, 3]);

        let keys: [&[u8]; 3] = [b"test-key-1", b"test-key-2", b"test-key-3"];
        for key in keys.iter() {
            let owners = ring.get_nodes(key, 2);
            assert_eq!(owners.len(), 2);
        }
    }

    /// With exactly two peers, a replication factor of two returns both.
    #[test]
    fn test_hash_ring_replication() {
        let ring = ring_with(10, &[1, 2]);

        let owners = ring.get_nodes(b"replicated-key", 2);

        assert_eq!(owners.len(), 2);
        for expected in [PeerId(1), PeerId(2)].iter() {
            assert!(owners.contains(expected));
        }
    }

    /// A peerless cache stores and returns entries from its local map.
    #[tokio::test]
    async fn test_distributed_cache_local_insert_get() {
        let cache = DistributedCache::new(1, Vec::new());
        let fp = QueryFingerprint::from_query("SELECT * FROM users");

        let ttl = Duration::from_secs(300);
        let entry = CacheEntry::new(vec![1, 2, 3], vec!["users".to_string()], 1).with_ttl(ttl);
        cache.insert(fp.clone(), entry).await;

        let cached = cache.get(&fp).await;
        assert!(cached.is_some());
        assert_eq!(cached.unwrap().data, vec![1, 2, 3]);
    }

    /// Peer and healthy-peer counts (both include the local node) follow
    /// add / mark-unhealthy / remove.
    #[test]
    fn test_distributed_cache_peer_management() {
        let cache = DistributedCache::new(2, Vec::new());

        let first: SocketAddr = "127.0.0.1:9100".parse().unwrap();
        let second: SocketAddr = "127.0.0.1:9101".parse().unwrap();
        for addr in [first, second].iter() {
            cache.add_peer(*addr);
        }
        assert_eq!(cache.stats().peer_count, Some(3));

        cache.mark_unhealthy(&first);
        assert_eq!(cache.stats().healthy_peers, Some(2));

        cache.remove_peer(&first);
        assert_eq!(cache.stats().peer_count, Some(2));
    }

    /// One hit, one miss and one stored entry show up in the stats.
    #[tokio::test]
    async fn test_distributed_cache_stats() {
        let cache = DistributedCache::new(1, Vec::new());

        let stored = QueryFingerprint::from_query("SELECT * FROM users");
        let absent = QueryFingerprint::from_query("SELECT * FROM orders");

        let entry = CacheEntry::new(vec![1], vec![], 1).with_ttl(Duration::from_secs(300));
        cache.insert(stored.clone(), entry).await;

        let _ = cache.get(&stored).await;
        let _ = cache.get(&absent).await;

        let stats = cache.stats();
        assert_eq!(stats.hits, 1);
        assert_eq!(stats.misses, 1);
        assert_eq!(stats.entry_count, 1);
    }

    /// Drives PUT, GET, PING and INVALIDATE against a real listener on an
    /// ephemeral loopback port.
    #[tokio::test]
    async fn test_peer_server_put_get_roundtrip() {
        let server = Arc::new(DistributedCache::new(1, Vec::new()));
        let listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
        let addr = listener.local_addr().unwrap();
        tokio::spawn(Arc::clone(&server).serve_on(listener));

        let client = PeerConnection::new(addr);
        let fp = QueryFingerprint::from_query("SELECT * FROM accounts WHERE id = $1");
        let key = server.fingerprint_to_hash(&fp);

        // PUT: the ack is read before `insert` returns, so the entry must
        // already be in the server's map.
        let entry = CacheEntry::new(vec![9, 8, 7], vec!["accounts".to_string()], 1)
            .with_ttl(Duration::from_secs(300));
        client.insert(fp.clone(), entry.clone()).await.unwrap();
        assert!(
            server.local.contains_key(&key),
            "PUT did not land on server"
        );

        // GET returns what was stored.
        let got = client.get(&fp).await.expect("remote GET failed");
        assert_eq!(got.data, vec![9, 8, 7]);

        // PING is answered.
        assert!(client.ping().await, "peer did not answer ping");

        // INVALIDATE has no ack, so give the server task a bounded number of
        // scheduler turns to process it.
        client.invalidate(&fp).await.unwrap();
        let mut spins = 0;
        while spins < 50 && server.local.contains_key(&key) {
            tokio::task::yield_now().await;
            spins += 1;
        }
        assert!(
            !server.local.contains_key(&key),
            "INVALIDATE did not remove entry"
        );
        assert!(
            client.get(&fp).await.is_err(),
            "GET should miss after invalidate"
        );
    }
}