Skip to main content

heliosdb_proxy/distribcache/tiers/
l3_distributed.rs

1//! L3 Distributed Cache - Cache mesh with <10ms access time
2//!
3//! Features:
4//! - Consistent hashing for key distribution
5//! - Replication for availability
6//! - TCP-based peer-to-peer communication
7//! - Gossip protocol for peer discovery (planned)
8
9use dashmap::DashMap;
10use std::collections::BTreeMap;
11use std::net::SocketAddr;
12use std::sync::atomic::{AtomicU32, AtomicU64, Ordering};
13use std::sync::Arc;
14use tokio::io::{AsyncReadExt, AsyncWriteExt};
15use tokio::net::{TcpListener, TcpStream};
16
17use super::{CacheEntry, TierStats};
18use crate::distribcache::QueryFingerprint;
19
/// Tag byte that opens every frame of the peer wire protocol.
///
/// A client sends `Get`, `Put`, `Invalidate` or `Ping`; the server answers with
/// `GetResponse`, `PutResponse` or `Pong` respectively (`Invalidate` has no reply).
#[repr(u8)]
#[derive(Clone, Copy, Debug)]
enum MessageType {
    Get = 0x01,
    GetResponse = 0x02,
    Put = 0x03,
    PutResponse = 0x04,
    Invalidate = 0x05,
    Ping = 0x06,
    Pong = 0x07,
}
32
33impl TryFrom<u8> for MessageType {
34    type Error = ();
35
36    fn try_from(value: u8) -> Result<Self, Self::Error> {
37        match value {
38            1 => Ok(MessageType::Get),
39            2 => Ok(MessageType::GetResponse),
40            3 => Ok(MessageType::Put),
41            4 => Ok(MessageType::PutResponse),
42            5 => Ok(MessageType::Invalidate),
43            6 => Ok(MessageType::Ping),
44            7 => Ok(MessageType::Pong),
45            _ => Err(()),
46        }
47    }
48}
49
/// Opaque 64-bit identity of a mesh member; used as the key for ring positions and peers.
#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash)]
pub struct PeerId(pub u64);
53
54impl PeerId {
55    pub fn new(addr: &SocketAddr) -> Self {
56        use std::collections::hash_map::DefaultHasher;
57        use std::hash::{Hash, Hasher};
58
59        let mut hasher = DefaultHasher::new();
60        addr.hash(&mut hasher);
61        Self(hasher.finish())
62    }
63
64    pub fn local() -> Self {
65        Self(0)
66    }
67}
68
69/// Consistent hash ring for key distribution
70struct HashRing {
71    /// Ring nodes (virtual nodes -> peer)
72    ring: BTreeMap<u64, PeerId>,
73    /// Number of virtual nodes per peer
74    virtual_nodes: usize,
75}
76
77impl HashRing {
78    fn new(virtual_nodes: usize) -> Self {
79        Self {
80            ring: BTreeMap::new(),
81            virtual_nodes,
82        }
83    }
84
85    fn add_peer(&mut self, peer: PeerId) {
86        for i in 0..self.virtual_nodes {
87            let hash = Self::hash_peer(peer, i);
88            self.ring.insert(hash, peer);
89        }
90    }
91
92    fn remove_peer(&mut self, peer: PeerId) {
93        self.ring.retain(|_, p| *p != peer);
94    }
95
96    fn get_nodes(&self, key: &[u8], count: u32) -> Vec<PeerId> {
97        if self.ring.is_empty() {
98            return Vec::new();
99        }
100
101        let key_hash = Self::hash_key(key);
102        let mut nodes = Vec::new();
103        let mut seen = std::collections::HashSet::new();
104
105        // Find first node >= key_hash
106        let iter = self
107            .ring
108            .range(key_hash..)
109            .chain(self.ring.range(..key_hash));
110
111        for (_, peer) in iter {
112            if !seen.contains(peer) {
113                seen.insert(*peer);
114                nodes.push(*peer);
115                if nodes.len() >= count as usize {
116                    break;
117                }
118            }
119        }
120
121        nodes
122    }
123
124    fn hash_peer(peer: PeerId, vnode: usize) -> u64 {
125        use std::collections::hash_map::DefaultHasher;
126        use std::hash::{Hash, Hasher};
127
128        let mut hasher = DefaultHasher::new();
129        peer.0.hash(&mut hasher);
130        vnode.hash(&mut hasher);
131        hasher.finish()
132    }
133
134    fn hash_key(key: &[u8]) -> u64 {
135        use std::collections::hash_map::DefaultHasher;
136        use std::hash::{Hash, Hasher};
137
138        let mut hasher = DefaultHasher::new();
139        key.hash(&mut hasher);
140        hasher.finish()
141    }
142}
143
/// Client-side handle for one remote mesh member.
///
/// Holds only addressing and health bookkeeping; no socket is stored here.
///
/// NOTE(review): nothing in this file updates `last_seen` or `rtt_us` after construction —
/// confirm whether they are maintained elsewhere.
#[derive(Debug, Clone)]
pub struct PeerConnection {
    /// Peer address
    pub addr: SocketAddr,
    /// Whether the peer is currently considered healthy (eligible for requests).
    pub healthy: bool,
    /// Last seen timestamp
    pub last_seen: u64,
    /// Round-trip time in microseconds
    pub rtt_us: u64,
    /// Connection timeout in milliseconds
    timeout_ms: u64,
}
170
171impl PeerConnection {
172    fn new(addr: SocketAddr) -> Self {
173        Self {
174            addr,
175            healthy: true,
176            last_seen: 0,
177            rtt_us: 0,
178            timeout_ms: 5000, // 5 second timeout
179        }
180    }
181
182    /// Get entry from peer via TCP
183    pub async fn get(&self, fingerprint: &QueryFingerprint) -> Result<CacheEntry, &'static str> {
184        let _start = std::time::Instant::now();
185
186        // Try to connect with timeout
187        let stream = match tokio::time::timeout(
188            std::time::Duration::from_millis(self.timeout_ms),
189            TcpStream::connect(self.addr),
190        )
191        .await
192        {
193            Ok(Ok(s)) => s,
194            Ok(Err(_)) => return Err("Connection failed"),
195            Err(_) => return Err("Connection timeout"),
196        };
197
198        // Build request message
199        let fp_bytes = match bincode::serialize(fingerprint) {
200            Ok(b) => b,
201            Err(_) => return Err("Serialization failed"),
202        };
203
204        // Send GET request
205        let (mut reader, mut writer) = stream.into_split();
206
207        // Message format: [type: u8][length: u32][data: bytes]
208        let mut header = vec![MessageType::Get as u8];
209        header.extend_from_slice(&(fp_bytes.len() as u32).to_le_bytes());
210
211        if writer.write_all(&header).await.is_err() {
212            return Err("Failed to write header");
213        }
214        if writer.write_all(&fp_bytes).await.is_err() {
215            return Err("Failed to write data");
216        }
217
218        // Read response
219        let mut resp_header = [0u8; 5];
220        if reader.read_exact(&mut resp_header).await.is_err() {
221            return Err("Failed to read response header");
222        }
223
224        let _msg_type =
225            MessageType::try_from(resp_header[0]).map_err(|_| "Invalid message type")?;
226        let length = u32::from_le_bytes([
227            resp_header[1],
228            resp_header[2],
229            resp_header[3],
230            resp_header[4],
231        ]) as usize;
232
233        if length == 0 {
234            return Err("Entry not found");
235        }
236
237        let mut data = vec![0u8; length];
238        if reader.read_exact(&mut data).await.is_err() {
239            return Err("Failed to read response data");
240        }
241
242        // Deserialize entry
243        let entry: CacheEntry =
244            bincode::deserialize(&data).map_err(|_| "Deserialization failed")?;
245
246        Ok(entry)
247    }
248
249    /// Insert entry to peer via TCP
250    pub async fn insert(
251        &self,
252        fingerprint: QueryFingerprint,
253        entry: CacheEntry,
254    ) -> Result<(), &'static str> {
255        // Try to connect with timeout
256        let stream = match tokio::time::timeout(
257            std::time::Duration::from_millis(self.timeout_ms),
258            TcpStream::connect(self.addr),
259        )
260        .await
261        {
262            Ok(Ok(s)) => s,
263            Ok(Err(_)) => return Err("Connection failed"),
264            Err(_) => return Err("Connection timeout"),
265        };
266
267        // Serialize fingerprint and entry
268        let fp_bytes = bincode::serialize(&fingerprint).map_err(|_| "FP serialization failed")?;
269        let entry_bytes = bincode::serialize(&entry).map_err(|_| "Entry serialization failed")?;
270
271        // Build message: [type: u8][fp_len: u32][entry_len: u32][fp_data][entry_data]
272        let mut message = Vec::with_capacity(1 + 4 + 4 + fp_bytes.len() + entry_bytes.len());
273        message.push(MessageType::Put as u8);
274        message.extend_from_slice(&(fp_bytes.len() as u32).to_le_bytes());
275        message.extend_from_slice(&(entry_bytes.len() as u32).to_le_bytes());
276        message.extend_from_slice(&fp_bytes);
277        message.extend_from_slice(&entry_bytes);
278
279        let (mut reader, mut writer) = stream.into_split();
280
281        if writer.write_all(&message).await.is_err() {
282            return Err("Failed to write");
283        }
284
285        // Read response (ack)
286        let mut resp_header = [0u8; 5];
287        if reader.read_exact(&mut resp_header).await.is_err() {
288            return Err("Failed to read ack");
289        }
290
291        Ok(())
292    }
293
294    /// Ping peer to check health
295    #[allow(dead_code)]
296    pub async fn ping(&self) -> bool {
297        let _start = std::time::Instant::now();
298
299        let stream = match tokio::time::timeout(
300            std::time::Duration::from_millis(1000),
301            TcpStream::connect(self.addr),
302        )
303        .await
304        {
305            Ok(Ok(s)) => s,
306            _ => return false,
307        };
308
309        let (mut reader, mut writer) = stream.into_split();
310
311        // Send ping
312        let ping_msg = [MessageType::Ping as u8, 0, 0, 0, 0];
313        if writer.write_all(&ping_msg).await.is_err() {
314            return false;
315        }
316
317        // Wait for pong
318        let mut resp = [0u8; 5];
319        match tokio::time::timeout(
320            std::time::Duration::from_millis(1000),
321            reader.read_exact(&mut resp),
322        )
323        .await
324        {
325            Ok(Ok(_)) => resp[0] == MessageType::Pong as u8,
326            _ => false,
327        }
328    }
329
330    /// Send invalidation message to peer
331    pub async fn invalidate(&self, fingerprint: &QueryFingerprint) -> Result<(), &'static str> {
332        let stream = match tokio::time::timeout(
333            std::time::Duration::from_millis(self.timeout_ms),
334            TcpStream::connect(self.addr),
335        )
336        .await
337        {
338            Ok(Ok(s)) => s,
339            Ok(Err(_)) => return Err("Connection failed"),
340            Err(_) => return Err("Connection timeout"),
341        };
342
343        let fp_bytes = bincode::serialize(fingerprint).map_err(|_| "Serialization failed")?;
344
345        let mut message = vec![MessageType::Invalidate as u8];
346        message.extend_from_slice(&(fp_bytes.len() as u32).to_le_bytes());
347        message.extend_from_slice(&fp_bytes);
348
349        let (_, mut writer) = stream.into_split();
350        writer
351            .write_all(&message)
352            .await
353            .map_err(|_| "Write failed")?;
354
355        Ok(())
356    }
357}
358
359/// L3 Distributed Cache - Cache mesh with consistent hashing
360pub struct DistributedCache {
361    /// Local peer ID
362    local_peer_id: PeerId,
363
364    /// Consistent hash ring
365    hash_ring: std::sync::RwLock<HashRing>,
366
367    /// Peer connections
368    peers: DashMap<PeerId, PeerConnection>,
369
370    /// Local cache for owned keys
371    local: DashMap<u64, CacheEntry>,
372
373    /// Replication factor
374    replication_factor: u32,
375
376    /// Statistics
377    hits: AtomicU64,
378    misses: AtomicU64,
379    remote_hits: AtomicU64,
380    #[allow(dead_code)]
381    replication_lag_ms: AtomicU64,
382    healthy_peers: AtomicU32,
383}
384
385impl DistributedCache {
386    /// Create a new distributed cache
387    pub fn new(replication_factor: u32, peer_addrs: Vec<SocketAddr>) -> Self {
388        let local_peer_id = PeerId::local();
389
390        let mut hash_ring = HashRing::new(100); // 100 virtual nodes per peer
391        hash_ring.add_peer(local_peer_id);
392
393        let peers = DashMap::new();
394        for addr in &peer_addrs {
395            let peer_id = PeerId::new(addr);
396            hash_ring.add_peer(peer_id);
397            peers.insert(peer_id, PeerConnection::new(*addr));
398        }
399
400        Self {
401            local_peer_id,
402            hash_ring: std::sync::RwLock::new(hash_ring),
403            peers,
404            local: DashMap::new(),
405            replication_factor,
406            hits: AtomicU64::new(0),
407            misses: AtomicU64::new(0),
408            remote_hits: AtomicU64::new(0),
409            replication_lag_ms: AtomicU64::new(0),
410            healthy_peers: AtomicU32::new(peer_addrs.len() as u32),
411        }
412    }
413
414    /// Get an entry from the distributed cache
415    pub async fn get(&self, fingerprint: &QueryFingerprint) -> Option<CacheEntry> {
416        let key = self.fingerprint_to_hash(fingerprint);
417        let key_bytes = key.to_le_bytes();
418
419        // Determine owners
420        let owners = {
421            let ring = self.hash_ring.read().ok()?;
422            ring.get_nodes(&key_bytes, self.replication_factor)
423        };
424
425        // Check local first if we own it
426        if owners.contains(&self.local_peer_id) {
427            if let Some(entry) = self.local.get(&key) {
428                if !entry.is_expired() {
429                    self.hits.fetch_add(1, Ordering::Relaxed);
430                    return Some(entry.clone());
431                } else {
432                    drop(entry);
433                    self.local.remove(&key);
434                }
435            }
436        }
437
438        // Query remote peers
439        for owner in owners {
440            if owner == self.local_peer_id {
441                continue;
442            }
443
444            if let Some(peer) = self.peers.get(&owner) {
445                if peer.healthy {
446                    if let Ok(entry) = peer.get(fingerprint).await {
447                        // Cache locally
448                        self.local.insert(key, entry.clone());
449                        self.remote_hits.fetch_add(1, Ordering::Relaxed);
450                        self.hits.fetch_add(1, Ordering::Relaxed);
451                        return Some(entry);
452                    }
453                }
454            }
455        }
456
457        self.misses.fetch_add(1, Ordering::Relaxed);
458        None
459    }
460
461    /// Insert an entry into the distributed cache
462    pub async fn insert(&self, fingerprint: QueryFingerprint, entry: CacheEntry) {
463        let key = self.fingerprint_to_hash(&fingerprint);
464        let key_bytes = key.to_le_bytes();
465
466        // Determine owners
467        let owners = {
468            let ring = self.hash_ring.read().unwrap();
469            ring.get_nodes(&key_bytes, self.replication_factor)
470        };
471
472        // Insert locally if we own it
473        if owners.contains(&self.local_peer_id) {
474            self.local.insert(key, entry.clone());
475        }
476
477        // Replicate to other owners (fire and forget for now)
478        for owner in owners {
479            if owner == self.local_peer_id {
480                continue;
481            }
482
483            if let Some(peer) = self.peers.get(&owner) {
484                if peer.healthy {
485                    let fp = fingerprint.clone();
486                    let e = entry.clone();
487                    let _ = peer.insert(fp, e).await;
488                }
489            }
490        }
491    }
492
493    /// Add a peer to the cache mesh
494    pub fn add_peer(&self, addr: SocketAddr) {
495        let peer_id = PeerId::new(&addr);
496
497        if let Ok(mut ring) = self.hash_ring.write() {
498            ring.add_peer(peer_id);
499        }
500
501        self.peers.insert(peer_id, PeerConnection::new(addr));
502        self.healthy_peers.fetch_add(1, Ordering::Relaxed);
503    }
504
505    /// Remove a peer from the cache mesh
506    pub fn remove_peer(&self, addr: &SocketAddr) {
507        let peer_id = PeerId::new(addr);
508
509        if let Ok(mut ring) = self.hash_ring.write() {
510            ring.remove_peer(peer_id);
511        }
512
513        if self.peers.remove(&peer_id).is_some() {
514            self.healthy_peers.fetch_sub(1, Ordering::Relaxed);
515        }
516    }
517
518    /// Mark peer as unhealthy
519    pub fn mark_unhealthy(&self, addr: &SocketAddr) {
520        let peer_id = PeerId::new(addr);
521
522        if let Some(mut peer) = self.peers.get_mut(&peer_id) {
523            if peer.healthy {
524                peer.healthy = false;
525                self.healthy_peers.fetch_sub(1, Ordering::Relaxed);
526            }
527        }
528    }
529
530    /// Mark peer as healthy
531    pub fn mark_healthy(&self, addr: &SocketAddr) {
532        let peer_id = PeerId::new(addr);
533
534        if let Some(mut peer) = self.peers.get_mut(&peer_id) {
535            if !peer.healthy {
536                peer.healthy = true;
537                self.healthy_peers.fetch_add(1, Ordering::Relaxed);
538            }
539        }
540    }
541
542    /// Invalidate an entry across the mesh
543    pub async fn invalidate(&self, fingerprint: &QueryFingerprint) {
544        let key = self.fingerprint_to_hash(fingerprint);
545
546        // Remove locally
547        self.local.remove(&key);
548
549        // Broadcast invalidation to all healthy peers
550        for peer_ref in self.peers.iter() {
551            let peer = peer_ref.value();
552            if peer.healthy {
553                // Fire and forget - don't wait for ack
554                let fp = fingerprint.clone();
555                let peer_clone = peer.clone();
556                tokio::spawn(async move {
557                    let _ = peer_clone.invalidate(&fp).await;
558                });
559            }
560        }
561    }
562
563    /// Convert fingerprint to hash key
564    fn fingerprint_to_hash(&self, fingerprint: &QueryFingerprint) -> u64 {
565        use std::collections::hash_map::DefaultHasher;
566        use std::hash::{Hash, Hasher};
567
568        let mut hasher = DefaultHasher::new();
569        fingerprint.template.hash(&mut hasher);
570        if let Some(param) = fingerprint.param_hash {
571            param.hash(&mut hasher);
572        }
573        hasher.finish()
574    }
575
576    /// Get cache statistics
577    pub fn stats(&self) -> TierStats {
578        let local_size: usize = self.local.iter().map(|e| e.value().size()).sum();
579
580        TierStats {
581            size_bytes: local_size as u64,
582            max_size_bytes: 0, // Distributed, no single max
583            entry_count: self.local.len() as u64,
584            hits: self.hits.load(Ordering::Relaxed),
585            misses: self.misses.load(Ordering::Relaxed),
586            evictions: 0,
587            compression_ratio: None,
588            peer_count: Some(self.peers.len() as u32 + 1), // +1 for local
589            healthy_peers: Some(self.healthy_peers.load(Ordering::Relaxed) + 1),
590        }
591    }
592
593    /// Get peer addresses
594    pub fn peer_addrs(&self) -> Vec<SocketAddr> {
595        self.peers.iter().map(|p| p.value().addr).collect()
596    }
597
598    /// Copy valid entries to another cache (for branch merging)
599    pub fn copy_valid_entries_to(&self, target: &DistributedCache) {
600        for entry in self.local.iter() {
601            if !entry.value().is_expired() {
602                target.local.insert(*entry.key(), entry.value().clone());
603            }
604        }
605    }
606
607    /// Run the peer server on `addr`: accept connections from other mesh nodes
608    /// and serve the GET / PUT / PING / INVALIDATE wire protocol against this
609    /// node's owned-key map. This is the server counterpart to
610    /// [`PeerConnection`] — without it, replication PUTs and remote GETs from
611    /// peers have nothing to answer them, so the L3 mesh cannot actually
612    /// replicate or serve cross-node reads. Returns when binding fails; loops
613    /// forever otherwise (spawn it as a background task).
614    pub async fn serve(self: Arc<Self>, addr: SocketAddr) -> std::io::Result<()> {
615        let listener = TcpListener::bind(addr).await?;
616        self.serve_on(listener).await;
617        Ok(())
618    }
619
620    /// Like [`serve`](Self::serve) but takes an already-bound listener — used by
621    /// tests that bind `127.0.0.1:0` and need the OS-assigned port first.
622    pub async fn serve_on(self: Arc<Self>, listener: TcpListener) {
623        while let Ok((stream, _peer)) = listener.accept().await {
624            let cache = Arc::clone(&self);
625            tokio::spawn(async move {
626                let _ = cache.handle_peer_conn(stream).await;
627            });
628        }
629    }
630
631    /// Handle one inbound peer connection: read the framed request and reply
632    /// with the matching response, mirroring [`PeerConnection`]'s framing.
633    async fn handle_peer_conn(&self, stream: TcpStream) -> std::io::Result<()> {
634        let (mut reader, mut writer) = stream.into_split();
635
636        let mut type_byte = [0u8; 1];
637        if reader.read_exact(&mut type_byte).await.is_err() {
638            return Ok(());
639        }
640        let msg_type = match MessageType::try_from(type_byte[0]) {
641            Ok(t) => t,
642            Err(_) => return Ok(()),
643        };
644
645        match msg_type {
646            MessageType::Get => {
647                let mut len_buf = [0u8; 4];
648                reader.read_exact(&mut len_buf).await?;
649                let fp_len = u32::from_le_bytes(len_buf) as usize;
650                let mut fp_bytes = vec![0u8; fp_len];
651                reader.read_exact(&mut fp_bytes).await?;
652
653                // Look up the requested fingerprint in the local owned map.
654                let payload = match bincode::deserialize::<QueryFingerprint>(&fp_bytes) {
655                    Ok(fp) => {
656                        let key = self.fingerprint_to_hash(&fp);
657                        self.local.get(&key).and_then(|e| {
658                            if e.is_expired() {
659                                None
660                            } else {
661                                bincode::serialize(e.value()).ok()
662                            }
663                        })
664                    }
665                    Err(_) => None,
666                };
667
668                let mut out = vec![MessageType::GetResponse as u8];
669                match payload {
670                    // length 0 signals "not found" to the client.
671                    Some(bytes) => {
672                        out.extend_from_slice(&(bytes.len() as u32).to_le_bytes());
673                        out.extend_from_slice(&bytes);
674                    }
675                    None => out.extend_from_slice(&0u32.to_le_bytes()),
676                }
677                writer.write_all(&out).await?;
678            }
679            MessageType::Put => {
680                let mut len_buf = [0u8; 4];
681                reader.read_exact(&mut len_buf).await?;
682                let fp_len = u32::from_le_bytes(len_buf) as usize;
683                reader.read_exact(&mut len_buf).await?;
684                let entry_len = u32::from_le_bytes(len_buf) as usize;
685
686                let mut fp_bytes = vec![0u8; fp_len];
687                reader.read_exact(&mut fp_bytes).await?;
688                let mut entry_bytes = vec![0u8; entry_len];
689                reader.read_exact(&mut entry_bytes).await?;
690
691                if let (Ok(fp), Ok(entry)) = (
692                    bincode::deserialize::<QueryFingerprint>(&fp_bytes),
693                    bincode::deserialize::<CacheEntry>(&entry_bytes),
694                ) {
695                    let key = self.fingerprint_to_hash(&fp);
696                    self.local.insert(key, entry);
697                }
698
699                // Acknowledge (the client reads a 5-byte response header).
700                let mut out = vec![MessageType::PutResponse as u8];
701                out.extend_from_slice(&0u32.to_le_bytes());
702                writer.write_all(&out).await?;
703            }
704            MessageType::Ping => {
705                let mut len_buf = [0u8; 4];
706                let _ = reader.read_exact(&mut len_buf).await;
707                let mut out = vec![MessageType::Pong as u8];
708                out.extend_from_slice(&0u32.to_le_bytes());
709                writer.write_all(&out).await?;
710            }
711            MessageType::Invalidate => {
712                let mut len_buf = [0u8; 4];
713                reader.read_exact(&mut len_buf).await?;
714                let fp_len = u32::from_le_bytes(len_buf) as usize;
715                let mut fp_bytes = vec![0u8; fp_len];
716                reader.read_exact(&mut fp_bytes).await?;
717                if let Ok(fp) = bincode::deserialize::<QueryFingerprint>(&fp_bytes) {
718                    let key = self.fingerprint_to_hash(&fp);
719                    self.local.remove(&key);
720                }
721                // Invalidate is fire-and-forget; no response expected.
722            }
723            // Response-type frames are never received server-side.
724            MessageType::GetResponse | MessageType::PutResponse | MessageType::Pong => {}
725        }
726        Ok(())
727    }
728}
729
#[cfg(test)]
mod tests {
    use super::*;
    use std::time::Duration;

    #[test]
    fn test_hash_ring_distribution() {
        let mut ring = HashRing::new(10);

        let peer1 = PeerId(1);
        let peer2 = PeerId(2);
        let peer3 = PeerId(3);

        ring.add_peer(peer1);
        ring.add_peer(peer2);
        ring.add_peer(peer3);

        // Test key distribution
        let key1 = b"test-key-1";
        let key2 = b"test-key-2";
        let key3 = b"test-key-3";

        let nodes1 = ring.get_nodes(key1, 2);
        let nodes2 = ring.get_nodes(key2, 2);
        let nodes3 = ring.get_nodes(key3, 2);

        // Each should return 2 nodes
        assert_eq!(nodes1.len(), 2);
        assert_eq!(nodes2.len(), 2);
        assert_eq!(nodes3.len(), 2);
    }

    #[test]
    fn test_hash_ring_replication() {
        let mut ring = HashRing::new(10);

        let peer1 = PeerId(1);
        let peer2 = PeerId(2);

        ring.add_peer(peer1);
        ring.add_peer(peer2);

        let key = b"replicated-key";
        let nodes = ring.get_nodes(key, 2);

        // Should return both peers
        assert_eq!(nodes.len(), 2);
        assert!(nodes.contains(&peer1));
        assert!(nodes.contains(&peer2));
    }

    #[tokio::test]
    async fn test_distributed_cache_local_insert_get() {
        let cache = DistributedCache::new(1, Vec::new());

        let fp = QueryFingerprint::from_query("SELECT * FROM users");
        let entry = CacheEntry::new(vec![1, 2, 3], vec!["users".to_string()], 1)
            .with_ttl(Duration::from_secs(300));

        cache.insert(fp.clone(), entry).await;

        let result = cache.get(&fp).await;
        assert!(result.is_some());
        assert_eq!(result.unwrap().data, vec![1, 2, 3]);
    }

    #[test]
    fn test_distributed_cache_peer_management() {
        let cache = DistributedCache::new(2, Vec::new());

        let addr1: SocketAddr = "127.0.0.1:9100".parse().unwrap();
        let addr2: SocketAddr = "127.0.0.1:9101".parse().unwrap();

        cache.add_peer(addr1);
        cache.add_peer(addr2);

        assert_eq!(cache.stats().peer_count, Some(3)); // 2 remote + 1 local

        cache.mark_unhealthy(&addr1);
        assert_eq!(cache.stats().healthy_peers, Some(2)); // 1 remote + 1 local

        cache.remove_peer(&addr1);
        assert_eq!(cache.stats().peer_count, Some(2)); // 1 remote + 1 local
    }

    #[tokio::test]
    async fn test_distributed_cache_stats() {
        let cache = DistributedCache::new(1, Vec::new());

        let fp1 = QueryFingerprint::from_query("SELECT * FROM users");
        let fp2 = QueryFingerprint::from_query("SELECT * FROM orders");

        cache
            .insert(
                fp1.clone(),
                CacheEntry::new(vec![1], vec![], 1).with_ttl(Duration::from_secs(300)),
            )
            .await;

        cache.get(&fp1).await; // Hit
        cache.get(&fp2).await; // Miss

        let stats = cache.stats();
        assert_eq!(stats.hits, 1);
        assert_eq!(stats.misses, 1);
        assert_eq!(stats.entry_count, 1);
    }

    // End-to-end check that the peer server answers the wire protocol: a
    // PeerConnection client PUTs an entry over real TCP, the server stores it in
    // its local map, and a subsequent GET round-trips the same bytes back.
    #[tokio::test]
    async fn test_peer_server_put_get_roundtrip() {
        // Bind the server to an OS-assigned port and start serving.
        let server = Arc::new(DistributedCache::new(1, Vec::new()));
        let listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
        let addr = listener.local_addr().unwrap();
        tokio::spawn(Arc::clone(&server).serve_on(listener));

        let client = PeerConnection::new(addr);
        let fp = QueryFingerprint::from_query("SELECT * FROM accounts WHERE id = $1");
        let entry = CacheEntry::new(vec![9, 8, 7], vec!["accounts".to_string()], 1)
            .with_ttl(Duration::from_secs(300));

        // PUT over TCP — must be acked, and must land in the server's local map.
        client.insert(fp.clone(), entry.clone()).await.unwrap();
        let key = server.fingerprint_to_hash(&fp);
        assert!(
            server.local.contains_key(&key),
            "PUT did not land on server"
        );

        // GET over TCP — must return the same bytes the client stored.
        let got = client.get(&fp).await.expect("remote GET failed");
        assert_eq!(got.data, vec![9, 8, 7]);

        // PING must get a Pong.
        assert!(client.ping().await, "peer did not answer ping");

        // INVALIDATE must remove it server-side; the next GET then misses.
        client.invalidate(&fp).await.unwrap();
        // Invalidate is fire-and-forget, so poll (with a deadline) until the server has
        // processed it. Sleeping lets the runtime service the socket; a fixed count of
        // `yield_now` calls gives no guarantee that it has polled I/O in between.
        let deadline = std::time::Instant::now() + Duration::from_secs(2);
        while server.local.contains_key(&key) && std::time::Instant::now() < deadline {
            tokio::time::sleep(Duration::from_millis(5)).await;
        }
        assert!(
            !server.local.contains_key(&key),
            "INVALIDATE did not remove entry"
        );
        assert!(
            client.get(&fp).await.is_err(),
            "GET should miss after invalidate"
        );
    }
}