Skip to main content

fast_cache/replication/
embedded.rs

1use std::sync::Arc;
2use std::sync::atomic::{AtomicBool, Ordering};
3use std::thread::{self, JoinHandle};
4use std::time::{Duration, Instant};
5
6use crossbeam_channel::{Receiver, RecvTimeoutError, Sender, bounded};
7use parking_lot::Mutex;
8
9use crate::config::ReplicationConfig;
10use crate::storage::{
11    Bytes, EmbeddedRouteMode, EmbeddedStore, MutationOp, hash_key_tag_from_hash, now_millis,
12    ttl_now_millis,
13};
14use crate::{FastCacheError, Result};
15
16use super::ReplicationFrameBytes;
17use super::backlog::BacklogCatchUp;
18use super::batcher::{
19    EncodedReplicationBatch, EncodedReplicationBatchBuilder, ReplicationBatch,
20    ReplicationBatchBuilder, ReplicationPrimary,
21};
22use super::metrics::{ReplicationMetrics, ReplicationMetricsSnapshot};
23use super::protocol::{
24    BorrowedReplicationMutation, FrameBackedReplicationMutation, FrameKind,
25    ReplicationFrameBytesPayload, ReplicationFramePayload, ReplicationMutation,
26    ReplicationMutationOp, ReplicationSnapshot, ShardWatermarks, decode_frame_payload,
27    decode_frame_payload_bytes, mutation_batch_record_count, visit_mutation_batch_payload,
28    visit_mutation_batch_payload_bytes,
29};
30
/// Largest value length, in bytes, for which `ReplicatedEmbeddedStore::set` replicates through
/// the borrowed `emit_borrowed_set` path; longer values are replicated through `emit` instead.
const DIRECT_ENCODED_SET_MAX_VALUE_LEN: usize = 128;
32
/// An [`EmbeddedStore`] whose `set`, `delete` and `expire` calls also emit replication
/// mutations towards a [`ReplicationPrimary`].
#[derive(Debug)]
pub struct ReplicatedEmbeddedStore {
    /// Local store that serves reads and receives every write.
    store: EmbeddedStore,
    /// Replication primary; also handed out to callers through `primary()`.
    primary: Arc<ReplicationPrimary>,
    /// Per-shard emitters and their background threads; shut down when the store is dropped.
    emitters: Arc<ReplicatedEmbeddedEmitters>,
}
39
/// Replica side of embedded replication: a local store plus the bookkeeping needed to apply
/// replicated mutations at most once.
#[derive(Debug)]
pub struct ReplicationReplica {
    /// Local copy of the data, rebuilt by `replace_with_snapshot` and updated by mutations.
    store: EmbeddedStore,
    /// Per-shard sequence watermarks; mutations at or below a shard's watermark are skipped.
    watermarks: ShardWatermarks,
    /// Counters and timings recorded while applying mutations and catching up.
    metrics: ReplicationMetrics,
}
46
/// Owns the per-shard batch emitters together with the flusher and exporter threads that
/// move their batches to the replication primary.
#[derive(Debug)]
struct ReplicatedEmbeddedEmitters {
    /// Primary used for watermark polling and as the fallback for out-of-range shard ids.
    primary: Arc<ReplicationPrimary>,
    /// One emitter per shard, each behind its own lock.
    shards: Vec<Mutex<ReplicatedEmbeddedShardEmitter>>,
    /// Set by `shutdown`; polled by `run_flusher` before every sleep.
    flusher_stop: AtomicBool,
    /// Set by `shutdown`; shared with every exporter thread.
    exporter_stop: Arc<AtomicBool>,
    /// Join handle of the flusher thread, taken by `shutdown`.
    flusher_join: Mutex<Option<JoinHandle<()>>>,
    /// Join handles of the exporter threads, drained by `shutdown`.
    exporter_joins: Mutex<Vec<JoinHandle<()>>>,
    /// Sleep between two `flush_due` passes of the flusher thread.
    flush_interval: Duration,
}
57
/// Batching state for a single shard.
#[derive(Debug)]
struct ReplicatedEmbeddedShardEmitter {
    /// Last sequence number assigned to a mutation emitted on this shard.
    sequence: u64,
    /// Builder collecting owned `ReplicationMutation`s.
    batch: ReplicationBatchBuilder,
    /// Builder collecting borrowed set mutations pushed by `emit_borrowed_set`.
    encoded_batch: EncodedReplicationBatchBuilder,
    /// Sending half of the channel read by this shard's exporter thread.
    tx: Sender<ReplicatedEmbeddedBatch>,
}
65
/// A finished batch travelling from a shard emitter to its exporter thread.
#[derive(Debug)]
enum ReplicatedEmbeddedBatch {
    /// Batch of owned mutations; exported with `export_batch_direct`.
    Owned(ReplicationBatch),
    /// Batch produced by the encoded builder; exported with `export_encoded_batch_direct`.
    Encoded(EncodedReplicationBatch),
}
71
/// Description of a `Set` mutation that borrows its key and value from the caller, used by
/// the `emit_borrowed_set` path.
#[derive(Debug, Clone, Copy)]
struct BorrowedSetReplication<'a> {
    /// Shard the key was routed to; selects the shard emitter.
    shard_id: usize,
    /// Timestamp recorded on the mutation, in milliseconds.
    timestamp_ms: u64,
    /// Hash of the key taken from the store's route.
    key_hash: u64,
    /// Tag derived from `key_hash` via `hash_key_tag_from_hash`.
    key_tag: u64,
    /// Key bytes, borrowed for the duration of the emit call.
    key: &'a [u8],
    /// Value bytes, borrowed for the duration of the emit call.
    value: &'a [u8],
    /// Absolute expiry in milliseconds, or `None` for a value without TTL.
    expire_at_ms: Option<u64>,
}
82
83impl ReplicatedEmbeddedStore {
84    pub fn new(shard_count: usize, config: ReplicationConfig) -> Result<Self> {
85        Self::with_route_mode(shard_count, EmbeddedRouteMode::FullKey, config)
86    }
87
88    pub fn with_route_mode(
89        shard_count: usize,
90        route_mode: EmbeddedRouteMode,
91        config: ReplicationConfig,
92    ) -> Result<Self> {
93        if !config.enabled {
94            return Err(FastCacheError::Config(
95                "ReplicatedEmbeddedStore requires replication.enabled = true".into(),
96            ));
97        }
98        let store = EmbeddedStore::with_route_mode(shard_count, route_mode);
99        let primary = Arc::new(ReplicationPrimary::start(shard_count, config.clone())?);
100        let emitters =
101            ReplicatedEmbeddedEmitters::start(Arc::clone(&primary), shard_count, config)?;
102        Ok(Self {
103            store,
104            primary,
105            emitters,
106        })
107    }
108
109    pub fn get(&self, key: &[u8]) -> Option<Bytes> {
110        self.store.get(key)
111    }
112
113    pub fn set(&self, key: Bytes, value: Bytes, ttl_ms: Option<u64>) {
114        let route = self.store.route_key(&key);
115        let key = bytes::Bytes::from(key);
116        let value = bytes::Bytes::from(value);
117        let direct_encode = direct_encoded_set_enabled(value.len());
118        match ttl_ms {
119            Some(ttl_ms) => {
120                let now_ms = now_millis();
121                let expire_at_ms = Some(now_ms.saturating_add(ttl_ms));
122                self.store.set_value_bytes_routed_expire_at_then(
123                    route,
124                    key.as_ref(),
125                    value.clone(),
126                    expire_at_ms,
127                    now_ms,
128                    || match direct_encode {
129                        true => self.emitters.emit_borrowed_set(BorrowedSetReplication {
130                            shard_id: route.shard_id,
131                            timestamp_ms: now_ms,
132                            key_hash: route.key_hash,
133                            key_tag: hash_key_tag_from_hash(route.key_hash),
134                            key: key.as_ref(),
135                            value: value.as_ref(),
136                            expire_at_ms,
137                        }),
138                        false => self.emitters.emit(
139                            route.shard_id,
140                            ReplicationMutation {
141                                shard_id: route.shard_id,
142                                sequence: 0,
143                                timestamp_ms: now_ms,
144                                op: ReplicationMutationOp::Set,
145                                key_hash: route.key_hash,
146                                key_tag: hash_key_tag_from_hash(route.key_hash),
147                                key: key.clone(),
148                                value: value.clone(),
149                                expire_at_ms,
150                            },
151                        ),
152                    },
153                );
154            }
155            None => {
156                let timestamp_ms = ttl_now_millis();
157                self.store.set_value_bytes_routed_no_ttl_then(
158                    route,
159                    key.as_ref(),
160                    value.clone(),
161                    || match direct_encode {
162                        true => self.emitters.emit_borrowed_set(BorrowedSetReplication {
163                            shard_id: route.shard_id,
164                            timestamp_ms,
165                            key_hash: route.key_hash,
166                            key_tag: hash_key_tag_from_hash(route.key_hash),
167                            key: key.as_ref(),
168                            value: value.as_ref(),
169                            expire_at_ms: None,
170                        }),
171                        false => self.emitters.emit(
172                            route.shard_id,
173                            ReplicationMutation {
174                                shard_id: route.shard_id,
175                                sequence: 0,
176                                timestamp_ms,
177                                op: ReplicationMutationOp::Set,
178                                key_hash: route.key_hash,
179                                key_tag: hash_key_tag_from_hash(route.key_hash),
180                                key: key.clone(),
181                                value: value.clone(),
182                                expire_at_ms: None,
183                            },
184                        ),
185                    },
186                );
187            }
188        }
189    }
190
191    pub fn delete(&self, key: &[u8]) -> bool {
192        let route = self.store.route_key(key);
193        let now_ms = now_millis();
194        self.store.delete_routed_then(route, key, now_ms, || {
195            self.emitters.emit(
196                route.shard_id,
197                ReplicationMutation {
198                    shard_id: route.shard_id,
199                    sequence: 0,
200                    timestamp_ms: now_ms,
201                    op: ReplicationMutationOp::Del,
202                    key_hash: route.key_hash,
203                    key_tag: hash_key_tag_from_hash(route.key_hash),
204                    key: bytes::Bytes::copy_from_slice(key),
205                    value: bytes::Bytes::new(),
206                    expire_at_ms: None,
207                },
208            );
209        })
210    }
211
212    pub fn expire(&self, key: &[u8], expire_at_ms: u64) -> bool {
213        let route = self.store.route_key(key);
214        let now_ms = now_millis();
215        self.store
216            .expire_routed_then(route, key, expire_at_ms, now_ms, || {
217                self.emitters.emit(
218                    route.shard_id,
219                    ReplicationMutation {
220                        shard_id: route.shard_id,
221                        sequence: 0,
222                        timestamp_ms: now_ms,
223                        op: ReplicationMutationOp::Expire,
224                        key_hash: route.key_hash,
225                        key_tag: hash_key_tag_from_hash(route.key_hash),
226                        key: bytes::Bytes::copy_from_slice(key),
227                        value: bytes::Bytes::new(),
228                        expire_at_ms: Some(expire_at_ms),
229                    },
230                );
231            })
232    }
233
234    /// Captures a consistent snapshot for replica bootstrap.
235    ///
236    /// Watermarks are taken first so any mutation that lands between the
237    /// watermark read and the entry read is reflected in the entries. Catch-up
238    /// from this watermark may re-deliver those mutations, but `apply_mutation`
239    /// deduplicates by sequence so re-delivery is a no-op.
240    pub fn snapshot(&self) -> ReplicationSnapshot {
241        self.emitters.flush_all_and_wait();
242        let watermarks = self.primary.current_watermarks();
243        ReplicationSnapshot {
244            entries: self.store.entry_snapshot(),
245            watermarks,
246        }
247    }
248
249    pub fn catch_up_replica(&self, replica: &mut ReplicationReplica) -> Result<()> {
250        self.emitters.flush_all_and_wait();
251        // Attempt backlog-only catch-up first.
252        match self.primary.catch_up_since(&replica.watermarks)? {
253            BacklogCatchUp::Available(frames) => {
254                replica.apply_frames(&frames)?;
255                replica.metrics.record_backlog_catch_up();
256                return Ok(());
257            }
258            BacklogCatchUp::NeedsSnapshot => {}
259        }
260
261        // Fall back to a full snapshot, then drain whatever new mutations the
262        // backlog has accumulated since.
263        let mut attempts = 0;
264        loop {
265            attempts += 1;
266            replica.replace_with_snapshot(self.snapshot());
267            let watermarks = replica.watermarks.clone();
268            match self.primary.catch_up_since(&watermarks)? {
269                BacklogCatchUp::Available(frames) => {
270                    replica.apply_frames(&frames)?;
271                    replica.metrics.record_snapshot_catch_up();
272                    return Ok(());
273                }
274                BacklogCatchUp::NeedsSnapshot if attempts >= MAX_SNAPSHOT_CATCH_UP_ATTEMPTS => {
275                    return Err(FastCacheError::Protocol(format!(
276                        "replication backlog could not catch up after {attempts} snapshot attempts"
277                    )));
278                }
279                BacklogCatchUp::NeedsSnapshot => {}
280            }
281        }
282    }
283
284    pub fn primary(&self) -> Arc<ReplicationPrimary> {
285        Arc::clone(&self.primary)
286    }
287
288    pub fn metrics_snapshot(&self) -> ReplicationMetricsSnapshot {
289        self.emitters.flush_all_and_wait();
290        self.primary.metrics_snapshot()
291    }
292
293    pub fn inner(&self) -> &EmbeddedStore {
294        &self.store
295    }
296}
297
298fn direct_encoded_set_enabled(value_len: usize) -> bool {
299    value_len <= DIRECT_ENCODED_SET_MAX_VALUE_LEN
300}
301
302impl Drop for ReplicatedEmbeddedStore {
303    fn drop(&mut self) {
304        self.emitters.shutdown();
305    }
306}
307
308impl ReplicatedEmbeddedEmitters {
309    fn start(
310        primary: Arc<ReplicationPrimary>,
311        shard_count: usize,
312        config: ReplicationConfig,
313    ) -> Result<Arc<Self>> {
314        let shard_count = shard_count.max(1);
315        let flush_interval = Duration::from_micros(config.batch_max_delay_us.max(1));
316        let exporter_stop = Arc::new(AtomicBool::new(false));
317        let mut shards = Vec::with_capacity(shard_count);
318        let mut exporter_joins = Vec::with_capacity(shard_count);
319        for shard_id in 0..shard_count {
320            let (tx, rx) = bounded(config.queue_capacity.max(1));
321            shards.push(Mutex::new(ReplicatedEmbeddedShardEmitter {
322                sequence: 0,
323                batch: ReplicationBatchBuilder::new_clockless(config.clone()),
324                encoded_batch: EncodedReplicationBatchBuilder::new_clockless(
325                    config.clone(),
326                    shard_id,
327                ),
328                tx,
329            }));
330            exporter_joins.push(start_embedded_exporter(
331                shard_id,
332                Arc::clone(&primary),
333                rx,
334                Arc::clone(&exporter_stop),
335            )?);
336        }
337        let emitters = Arc::new(Self {
338            primary,
339            shards,
340            flusher_stop: AtomicBool::new(false),
341            exporter_stop,
342            flusher_join: Mutex::new(None),
343            exporter_joins: Mutex::new(exporter_joins),
344            flush_interval,
345        });
346        let flusher = Arc::clone(&emitters);
347        let join = thread::Builder::new()
348            .name("fast-cache-replicated-embedded-flusher".into())
349            .spawn(move || flusher.run_flusher())
350            .map_err(|error| {
351                FastCacheError::Config(format!(
352                    "failed to start replicated embedded flusher: {error}"
353                ))
354            })?;
355        *emitters.flusher_join.lock() = Some(join);
356        Ok(emitters)
357    }
358
359    fn emit(&self, shard_id: usize, mutation: ReplicationMutation) {
360        let Some(shard) = self.shards.get(shard_id) else {
361            self.primary.emit(mutation);
362            return;
363        };
364        {
365            let mut emitter = shard.lock();
366            emitter.emit(mutation);
367        }
368    }
369
370    fn emit_borrowed_set(&self, set: BorrowedSetReplication<'_>) {
371        let Some(shard) = self.shards.get(set.shard_id) else {
372            self.primary.emit(ReplicationMutation {
373                shard_id: set.shard_id,
374                sequence: 0,
375                timestamp_ms: set.timestamp_ms,
376                op: ReplicationMutationOp::Set,
377                key_hash: set.key_hash,
378                key_tag: set.key_tag,
379                key: bytes::Bytes::copy_from_slice(set.key),
380                value: bytes::Bytes::copy_from_slice(set.value),
381                expire_at_ms: set.expire_at_ms,
382            });
383            return;
384        };
385        let mut emitter = shard.lock();
386        emitter.emit_borrowed_set(set);
387    }
388
389    fn run_flusher(&self) {
390        while !self.flusher_stop.load(Ordering::Relaxed) {
391            thread::sleep(self.flush_interval);
392            self.flush_due();
393        }
394        self.flush_all();
395    }
396
397    fn flush_due(&self) {
398        for shard in &self.shards {
399            if let Some(mut emitter) = shard.try_lock() {
400                emitter.flush_due();
401            }
402        }
403    }
404
405    fn flush_all(&self) -> Vec<u64> {
406        let mut targets = Vec::with_capacity(self.shards.len());
407        for shard in &self.shards {
408            let mut emitter = shard.lock();
409            emitter.flush();
410            targets.push(emitter.sequence);
411        }
412        targets
413    }
414
415    fn flush_all_and_wait(&self) {
416        let targets = self.flush_all();
417        let deadline = Instant::now() + Duration::from_millis(250);
418        while !replication_watermarks_reached(&self.primary, &targets) {
419            if Instant::now() >= deadline {
420                break;
421            }
422            thread::yield_now();
423        }
424    }
425
426    fn shutdown(&self) {
427        self.flusher_stop.store(true, Ordering::Relaxed);
428        if let Some(join) = self.flusher_join.lock().take()
429            && join.thread().id() != thread::current().id()
430        {
431            let _ = join.join();
432        }
433        self.flush_all_and_wait();
434        self.exporter_stop.store(true, Ordering::Relaxed);
435        for join in self.exporter_joins.lock().drain(..) {
436            if join.thread().id() != thread::current().id() {
437                let _ = join.join();
438            }
439        }
440    }
441}
442
443impl ReplicatedEmbeddedShardEmitter {
444    fn emit(&mut self, mut mutation: ReplicationMutation) {
445        self.flush_encoded();
446        self.sequence = self.sequence.saturating_add(1);
447        mutation.sequence = self.sequence;
448        if let Some(batch) = self.batch.push(mutation) {
449            self.send_owned_batch(batch);
450        }
451    }
452
453    fn emit_borrowed_set(&mut self, set: BorrowedSetReplication<'_>) {
454        self.flush_owned();
455        self.sequence = self.sequence.saturating_add(1);
456        let mutation = BorrowedReplicationMutation {
457            shard_id: self.encoded_batch.shard_id(),
458            sequence: self.sequence,
459            timestamp_ms: set.timestamp_ms,
460            op: ReplicationMutationOp::Set,
461            key_hash: set.key_hash,
462            key_tag: set.key_tag,
463            key: set.key,
464            value: set.value,
465            expire_at_ms: set.expire_at_ms,
466        };
467        if let Some(batch) = self.encoded_batch.push(mutation) {
468            self.send_encoded_batch(batch);
469        }
470    }
471
472    fn flush_due(&mut self) {
473        if let Some(batch) = self.batch.flush_due() {
474            self.send_owned_batch(batch);
475        }
476        if let Some(batch) = self.encoded_batch.flush_due() {
477            self.send_encoded_batch(batch);
478        }
479    }
480
481    fn flush(&mut self) {
482        self.flush_owned();
483        self.flush_encoded();
484    }
485
486    fn flush_owned(&mut self) {
487        if let Some(batch) = self.batch.flush() {
488            self.send_owned_batch(batch);
489        }
490    }
491
492    fn flush_encoded(&mut self) {
493        if let Some(batch) = self.encoded_batch.flush() {
494            self.send_encoded_batch(batch);
495        }
496    }
497
498    fn send_owned_batch(&self, batch: ReplicationBatch) {
499        self.send_batch(ReplicatedEmbeddedBatch::Owned(batch));
500    }
501
502    fn send_encoded_batch(&self, batch: EncodedReplicationBatch) {
503        self.send_batch(ReplicatedEmbeddedBatch::Encoded(batch));
504    }
505
506    fn send_batch(&self, batch: ReplicatedEmbeddedBatch) {
507        if self.tx.send(batch).is_err() {
508            tracing::warn!("dropping replicated embedded batch because exporter stopped");
509        }
510    }
511}
512
513fn start_embedded_exporter(
514    shard_id: usize,
515    primary: Arc<ReplicationPrimary>,
516    rx: Receiver<ReplicatedEmbeddedBatch>,
517    stop: Arc<AtomicBool>,
518) -> Result<JoinHandle<()>> {
519    thread::Builder::new()
520        .name(format!("fast-cache-replicated-embedded-export-{shard_id}"))
521        .spawn(move || run_embedded_exporter(primary, rx, stop))
522        .map_err(|error| {
523            FastCacheError::Config(format!(
524                "failed to start replicated embedded exporter {shard_id}: {error}"
525            ))
526        })
527}
528
529fn run_embedded_exporter(
530    primary: Arc<ReplicationPrimary>,
531    rx: Receiver<ReplicatedEmbeddedBatch>,
532    stop: Arc<AtomicBool>,
533) {
534    loop {
535        match rx.recv_timeout(Duration::from_millis(1)) {
536            Ok(batch) => {
537                export_embedded_batch(&primary, batch);
538                while let Ok(batch) = rx.try_recv() {
539                    export_embedded_batch(&primary, batch);
540                }
541            }
542            Err(RecvTimeoutError::Timeout) if stop.load(Ordering::Relaxed) && rx.is_empty() => {
543                break;
544            }
545            Err(RecvTimeoutError::Timeout) => {}
546            Err(RecvTimeoutError::Disconnected) => break,
547        }
548    }
549    while let Ok(batch) = rx.try_recv() {
550        export_embedded_batch(&primary, batch);
551    }
552}
553
554fn export_embedded_batch(primary: &ReplicationPrimary, batch: ReplicatedEmbeddedBatch) {
555    match batch {
556        ReplicatedEmbeddedBatch::Owned(batch) => primary.export_batch_direct(batch),
557        ReplicatedEmbeddedBatch::Encoded(batch) => primary.export_encoded_batch_direct(batch),
558    }
559}
560
561fn replication_watermarks_reached(primary: &ReplicationPrimary, targets: &[u64]) -> bool {
562    let watermarks = primary.current_watermarks();
563    targets
564        .iter()
565        .enumerate()
566        .all(|(shard_id, target)| watermarks.get(shard_id) >= *target)
567}
568
/// Number of snapshot-then-backlog rounds `catch_up_replica` performs before it gives up
/// with a protocol error.
const MAX_SNAPSHOT_CATCH_UP_ATTEMPTS: usize = 4;
570
571impl ReplicationReplica {
572    pub fn new(shard_count: usize) -> Self {
573        Self::with_route_mode(shard_count, EmbeddedRouteMode::FullKey)
574    }
575
576    pub fn with_route_mode(shard_count: usize, route_mode: EmbeddedRouteMode) -> Self {
577        Self {
578            store: EmbeddedStore::with_route_mode(shard_count, route_mode),
579            watermarks: ShardWatermarks::new(shard_count),
580            metrics: ReplicationMetrics::default(),
581        }
582    }
583
584    pub fn get(&self, key: &[u8]) -> Option<Bytes> {
585        self.store.get(key)
586    }
587
588    pub fn watermarks(&self) -> &ShardWatermarks {
589        &self.watermarks
590    }
591
592    pub fn metrics_snapshot(&self) -> ReplicationMetricsSnapshot {
593        self.metrics.snapshot()
594    }
595
596    pub fn apply_frame_bytes(&mut self, frame: &[u8]) -> Result<()> {
597        let frame = decode_frame_payload(frame)?;
598        self.apply_frame_payload(frame)
599    }
600
601    pub fn apply_frame(&mut self, frame: ReplicationFrameBytes) -> Result<()> {
602        let frame = decode_frame_payload_bytes(frame)?;
603        self.apply_frame_bytes_payload(frame)
604    }
605
606    pub fn apply_frames(&mut self, frames: &[ReplicationFrameBytes]) -> Result<()> {
607        for frame in frames {
608            self.apply_frame(frame.clone())?;
609        }
610        Ok(())
611    }
612
613    pub fn apply_decoded_frame(&mut self, frame: super::protocol::ReplicationFrame) -> Result<()> {
614        match frame.kind {
615            FrameKind::MutationBatch => {
616                self.apply_owned_mutation_batch_payload(bytes::Bytes::from(frame.payload))
617            }
618            other => Err(FastCacheError::Protocol(format!(
619                "replica cannot apply FCRP frame kind: {other:?}"
620            ))),
621        }
622    }
623
624    pub fn apply_frame_payload(&mut self, frame: ReplicationFramePayload<'_>) -> Result<()> {
625        match frame.kind {
626            FrameKind::MutationBatch => self.apply_mutation_batch_payload(frame.payload.as_ref()),
627            other => Err(FastCacheError::Protocol(format!(
628                "replica cannot apply FCRP frame kind: {other:?}"
629            ))),
630        }
631    }
632
633    pub fn apply_frame_bytes_payload(&mut self, frame: ReplicationFrameBytesPayload) -> Result<()> {
634        match frame.kind {
635            FrameKind::MutationBatch => self.apply_owned_mutation_batch_payload(frame.payload),
636            other => Err(FastCacheError::Protocol(format!(
637                "replica cannot apply FCRP frame kind: {other:?}"
638            ))),
639        }
640    }
641
642    fn apply_owned_mutation_batch_payload(&mut self, payload: bytes::Bytes) -> Result<()> {
643        match mutation_batch_record_count(payload.as_ref())? {
644            1 => self.apply_mutation_batch_payload_bytes(payload),
645            _ => self.apply_mutation_batch_payload(payload.as_ref()),
646        }
647    }
648
649    fn apply_mutation_batch_payload(&mut self, payload: &[u8]) -> Result<()> {
650        let started = Instant::now();
651        let mut now_ms = None;
652        let mut applied = 0_u64;
653        let mut skipped = 0_u64;
654        visit_mutation_batch_payload(payload, |mutation| {
655            if self.apply_borrowed_mutation_inner(mutation, &mut now_ms) {
656                applied += 1;
657            } else {
658                skipped += 1;
659            }
660            Ok(())
661        })?;
662        self.metrics.record_replica_apply_batch(
663            applied,
664            skipped,
665            started.elapsed().as_nanos() as u64,
666        );
667        Ok(())
668    }
669
670    fn apply_mutation_batch_payload_bytes(&mut self, payload: bytes::Bytes) -> Result<()> {
671        let started = Instant::now();
672        let mut now_ms = None;
673        let mut applied = 0_u64;
674        let mut skipped = 0_u64;
675        visit_mutation_batch_payload_bytes(payload, |mutation| {
676            if self.apply_frame_backed_mutation_inner(mutation, &mut now_ms) {
677                applied += 1;
678            } else {
679                skipped += 1;
680            }
681            Ok(())
682        })?;
683        self.metrics.record_replica_apply_batch(
684            applied,
685            skipped,
686            started.elapsed().as_nanos() as u64,
687        );
688        Ok(())
689    }
690
691    pub fn apply_mutation(&mut self, mutation: ReplicationMutation) {
692        let started = Instant::now();
693        let applied = self.apply_mutation_inner(mutation, &mut None);
694        self.metrics
695            .record_replica_apply(applied, started.elapsed().as_nanos() as u64);
696    }
697
698    fn apply_mutation_inner(
699        &mut self,
700        mutation: ReplicationMutation,
701        now_ms: &mut Option<u64>,
702    ) -> bool {
703        if mutation.sequence <= self.watermarks.get(mutation.shard_id) {
704            return false;
705        }
706
707        match mutation.op {
708            ReplicationMutationOp::Set => {
709                self.apply_set(
710                    mutation.key_hash,
711                    mutation.key.as_ref(),
712                    mutation.value,
713                    mutation.expire_at_ms,
714                    now_ms,
715                );
716            }
717            ReplicationMutationOp::Del => {
718                self.store.delete(&mutation.key);
719            }
720            ReplicationMutationOp::Expire => match mutation.expire_at_ms {
721                Some(expire_at_ms) => {
722                    self.store.expire(&mutation.key, expire_at_ms);
723                }
724                None => {
725                    self.store.persist(&mutation.key);
726                }
727            },
728        }
729        self.watermarks
730            .observe(mutation.shard_id, mutation.sequence);
731        true
732    }
733
734    fn apply_frame_backed_mutation_inner(
735        &mut self,
736        mutation: FrameBackedReplicationMutation<'_>,
737        now_ms: &mut Option<u64>,
738    ) -> bool {
739        if mutation.sequence <= self.watermarks.get(mutation.shard_id) {
740            return false;
741        }
742
743        match mutation.op {
744            ReplicationMutationOp::Set => {
745                self.apply_set(
746                    mutation.key_hash,
747                    mutation.key,
748                    mutation.value,
749                    mutation.expire_at_ms,
750                    now_ms,
751                );
752            }
753            ReplicationMutationOp::Del => {
754                self.store.delete(mutation.key);
755            }
756            ReplicationMutationOp::Expire => match mutation.expire_at_ms {
757                Some(expire_at_ms) => {
758                    self.store.expire(mutation.key, expire_at_ms);
759                }
760                None => {
761                    self.store.persist(mutation.key);
762                }
763            },
764        }
765        self.watermarks
766            .observe(mutation.shard_id, mutation.sequence);
767        true
768    }
769
770    fn apply_borrowed_mutation_inner(
771        &mut self,
772        mutation: BorrowedReplicationMutation<'_>,
773        now_ms: &mut Option<u64>,
774    ) -> bool {
775        if mutation.sequence <= self.watermarks.get(mutation.shard_id) {
776            return false;
777        }
778
779        match mutation.op {
780            ReplicationMutationOp::Set => {
781                self.apply_set(
782                    mutation.key_hash,
783                    mutation.key,
784                    bytes::Bytes::copy_from_slice(mutation.value),
785                    mutation.expire_at_ms,
786                    now_ms,
787                );
788            }
789            ReplicationMutationOp::Del => {
790                self.store.delete(mutation.key);
791            }
792            ReplicationMutationOp::Expire => match mutation.expire_at_ms {
793                Some(expire_at_ms) => {
794                    self.store.expire(mutation.key, expire_at_ms);
795                }
796                None => {
797                    self.store.persist(mutation.key);
798                }
799            },
800        }
801        self.watermarks
802            .observe(mutation.shard_id, mutation.sequence);
803        true
804    }
805
806    fn apply_set(
807        &mut self,
808        key_hash: u64,
809        key: &[u8],
810        value: bytes::Bytes,
811        expire_at_ms: Option<u64>,
812        now_ms: &mut Option<u64>,
813    ) {
814        let route = self.store.route_key_prehashed(key_hash, key);
815        match expire_at_ms {
816            Some(expire_at_ms) => {
817                let now_ms = *now_ms.get_or_insert_with(now_millis);
818                if expire_at_ms <= now_ms {
819                    return;
820                }
821                self.store.set_value_bytes_routed_expire_at(
822                    route,
823                    key,
824                    value,
825                    Some(expire_at_ms),
826                    now_ms,
827                );
828            }
829            None => self
830                .store
831                .set_value_bytes_routed_no_ttl_then(route, key, value, || {}),
832        }
833    }
834
835    pub fn replace_with_snapshot(&mut self, snapshot: ReplicationSnapshot) {
836        let route_mode = self.store.route_mode();
837        let shard_count = self.store.shard_count();
838        let store = EmbeddedStore::with_route_mode(shard_count, route_mode);
839        store.restore_entries(snapshot.entries);
840        self.store = store;
841        self.watermarks = snapshot.watermarks;
842    }
843
844    pub fn inner(&self) -> &EmbeddedStore {
845        &self.store
846    }
847}
848
849impl From<&ReplicationMutation> for MutationOp {
850    fn from(value: &ReplicationMutation) -> Self {
851        match value.op {
852            ReplicationMutationOp::Set => MutationOp::Set,
853            ReplicationMutationOp::Del => MutationOp::Del,
854            ReplicationMutationOp::Expire => MutationOp::Expire,
855        }
856    }
857}
858
#[cfg(test)]
mod tests {
    // Exercises replication from a `ReplicatedEmbeddedStore` into a
    // `ReplicationReplica`: applying received frames, catching up through the
    // backlog or a snapshot, and routing on the replica side.

    use std::thread;
    use std::time::Duration;

    use crate::config::{ReplicationCompression, ReplicationConfig, ReplicationSendPolicy};

    use super::*;

    /// Upper bound on how long a test waits for a frame from a subscriber.
    const FRAME_WAIT: Duration = Duration::from_secs(2);

    /// Brief pause the tests insert before inspecting state that is produced
    /// concurrently with the test thread.
    const SHORT_PAUSE: Duration = Duration::from_millis(20);

    /// Enabled, uncompressed configuration with the requested send policy,
    /// `batch_max_records = 2` and `batch_max_delay_us = 1_000`; every other
    /// field keeps its default.
    fn config(send_policy: ReplicationSendPolicy) -> ReplicationConfig {
        let defaults = ReplicationConfig::default();
        ReplicationConfig {
            enabled: true,
            compression: ReplicationCompression::None,
            send_policy,
            batch_max_records: 2,
            batch_max_delay_us: 1_000,
            ..defaults
        }
    }

    /// Four-shard primary using the `Immediate` send policy.
    fn immediate_primary() -> ReplicatedEmbeddedStore {
        ReplicatedEmbeddedStore::new(4, config(ReplicationSendPolicy::Immediate))
            .expect("primary")
    }

    #[test]
    fn embedded_replica_applies_immediate_mutation() {
        let leader = immediate_primary();
        let mut follower = ReplicationReplica::new(4);
        let frames = leader.primary().subscribe(8);

        leader.set(b"alpha".to_vec(), b"one".to_vec(), None);
        let frame = frames.recv_timeout(FRAME_WAIT).expect("replication frame");
        follower.apply_frame(frame.clone()).expect("apply");

        assert_eq!(follower.get(b"alpha"), Some(b"one".to_vec()));

        // The stored value is expected to lie within the frame's own buffer.
        let held = follower
            .inner()
            .get_value_bytes(b"alpha")
            .expect("stored value");
        assert!(bytes_points_inside(&held, &frame));
    }

    #[test]
    fn embedded_replica_applies_batched_mutations() {
        let leader =
            ReplicatedEmbeddedStore::new(4, config(ReplicationSendPolicy::Batch)).expect("primary");
        let mut follower = ReplicationReplica::new(4);
        let frames = leader.primary().subscribe(8);

        // Both keys route to one source shard, and the batch limit is two records.
        let (key_a, key_b) = same_source_shard_keys(&leader);
        leader.set(key_a.clone(), b"one".to_vec(), None);
        leader.set(key_b.clone(), b"two".to_vec(), None);

        let frame = frames.recv_timeout(FRAME_WAIT).expect("replication frame");
        follower.apply_frame(frame.clone()).expect("apply");

        assert_eq!(follower.get(&key_a), Some(b"one".to_vec()));
        assert_eq!(follower.get(&key_b), Some(b"two".to_vec()));

        // Here the stored value is expected to live outside the frame's buffer.
        let held = follower
            .inner()
            .get_value_bytes(&key_a)
            .expect("stored value");
        assert!(!bytes_points_inside(&held, &frame));
    }

    #[test]
    fn backlog_catch_up_replays_missing_mutations() {
        let leader = immediate_primary();
        leader.set(b"alpha".to_vec(), b"one".to_vec(), None);
        thread::sleep(SHORT_PAUSE);

        let mut follower = ReplicationReplica::new(4);
        leader.catch_up_replica(&mut follower).expect("catch up");

        assert_eq!(follower.get(b"alpha"), Some(b"one".to_vec()));
    }

    #[test]
    fn snapshot_catch_up_restores_when_backlog_is_insufficient() {
        // A one-byte backlog budget; the test then expects exactly one
        // snapshot-based catch-up to be recorded in the replica's metrics.
        let tiny_backlog = ReplicationConfig {
            backlog_bytes: 1,
            ..config(ReplicationSendPolicy::Immediate)
        };
        let leader = ReplicatedEmbeddedStore::new(4, tiny_backlog).expect("primary");
        leader.set(b"alpha".to_vec(), b"one".to_vec(), None);
        thread::sleep(SHORT_PAUSE);

        let mut follower = ReplicationReplica::new(4);
        leader.catch_up_replica(&mut follower).expect("catch up");

        assert_eq!(follower.get(b"alpha"), Some(b"one".to_vec()));
        assert_eq!(follower.metrics_snapshot().catch_up_snapshot_count, 1);
    }

    #[test]
    fn snapshot_atomicity_under_concurrent_writes() {
        use std::sync::atomic::{AtomicBool, Ordering};

        let shared = Arc::new(immediate_primary());
        let halt = Arc::new(AtomicBool::new(false));

        // Background writer: inserts distinct keys until told to stop.
        let producer = {
            let shared = Arc::clone(&shared);
            let halt = Arc::clone(&halt);
            thread::spawn(move || {
                let mut next = 0u64;
                while !halt.load(Ordering::Relaxed) {
                    shared.set(format!("key-{next}").into_bytes(), b"v".to_vec(), None);
                    next += 1;
                }
            })
        };

        thread::sleep(SHORT_PAUSE);
        let snapshot = shared.snapshot();
        let entry_count = snapshot.entries.len();
        let max_watermark = snapshot
            .watermarks
            .as_slice()
            .iter()
            .copied()
            .max()
            .unwrap_or(0);

        // Stop and join the writer before asserting.
        halt.store(true, Ordering::Relaxed);
        producer.join().expect("writer");

        // Watermark must be captured no later than entry read, so it cannot
        // exceed the count of writes that landed in the snapshot.
        assert!(
            max_watermark as usize <= entry_count,
            "watermark {max_watermark} exceeds entry count {entry_count}"
        );
    }

    #[test]
    fn multi_shard_catch_up_via_backlog() {
        let leader = immediate_primary();
        let keys: Vec<_> = (0..16)
            .map(|i| format!("key-{i}").into_bytes())
            .collect();

        for key in &keys {
            leader.set(key.clone(), b"v".to_vec(), None);
        }
        thread::sleep(SHORT_PAUSE);

        let mut follower = ReplicationReplica::new(4);
        leader.catch_up_replica(&mut follower).expect("catch up");

        for key in &keys {
            assert_eq!(follower.get(key), Some(b"v".to_vec()));
        }
    }

    #[test]
    fn replica_applies_mutation_using_its_local_route() {
        // Primary and replica deliberately use different shard counts (2 vs 4).
        let leader = ReplicatedEmbeddedStore::new(2, config(ReplicationSendPolicy::Immediate))
            .expect("primary");
        let mut follower = ReplicationReplica::new(4);

        // Pick a key whose shard id differs between the two stores.
        let key = (0..10_000)
            .map(|index| format!("key-{index}").into_bytes())
            .find(|candidate| {
                let on_leader = leader.inner().route_key(candidate);
                let on_follower = follower.inner().route_key(candidate);
                on_leader.shard_id != on_follower.shard_id
            })
            .expect("key with different source and target shard routes");

        let frames = leader.primary().subscribe(8);
        leader.set(key.clone(), b"one".to_vec(), None);
        let frame = frames.recv_timeout(FRAME_WAIT).expect("replication frame");
        follower.apply_frame_bytes(&frame).expect("apply");

        assert_eq!(follower.get(&key), Some(b"one".to_vec()));
    }

    #[test]
    fn new_requires_enabled_config() {
        let disabled = ReplicationConfig {
            enabled: false,
            ..ReplicationConfig::default()
        };
        let outcome = ReplicatedEmbeddedStore::new(2, disabled);
        let err = outcome.expect_err("should reject disabled config");
        assert!(matches!(err, FastCacheError::Config(_)));
    }

    /// Returns `true` when the address range covered by `value` lies entirely
    /// within the address range covered by `owner`.
    fn bytes_points_inside(value: &bytes::Bytes, owner: &bytes::Bytes) -> bool {
        let span = |buf: &bytes::Bytes| {
            let start = buf.as_ptr() as usize;
            start..start + buf.len()
        };
        let (inner, outer) = (span(value), span(owner));
        inner.start >= outer.start && inner.end <= outer.end
    }

    /// Returns `batch-key-0` together with the first `batch-key-N`
    /// (`N` in `1..10_000`) that the primary routes to the same shard id.
    fn same_source_shard_keys(primary: &ReplicatedEmbeddedStore) -> (Bytes, Bytes) {
        let anchor = b"batch-key-0".to_vec();
        let anchor_shard = primary.inner().route_key(&anchor).shard_id;

        let sibling = (1..10_000)
            .map(|index| format!("batch-key-{index}").into_bytes())
            .find(|candidate| primary.inner().route_key(candidate).shard_id == anchor_shard)
            .expect("second key on same source shard");

        (anchor, sibling)
    }
}