1use std::sync::Arc;
2use std::sync::atomic::{AtomicBool, Ordering};
3use std::thread::{self, JoinHandle};
4use std::time::{Duration, Instant};
5
6use crossbeam_channel::{Receiver, RecvTimeoutError, Sender, bounded};
7use parking_lot::Mutex;
8
9use crate::config::ReplicationConfig;
10use crate::storage::{
11 Bytes, EmbeddedRouteMode, EmbeddedStore, MutationOp, hash_key_tag_from_hash, now_millis,
12 ttl_now_millis,
13};
14use crate::{FastCacheError, Result};
15
16use super::ReplicationFrameBytes;
17use super::backlog::BacklogCatchUp;
18use super::batcher::{
19 EncodedReplicationBatch, EncodedReplicationBatchBuilder, ReplicationBatch,
20 ReplicationBatchBuilder, ReplicationPrimary,
21};
22use super::metrics::{ReplicationMetrics, ReplicationMetricsSnapshot};
23use super::protocol::{
24 BorrowedReplicationMutation, FrameBackedReplicationMutation, FrameKind,
25 ReplicationFrameBytesPayload, ReplicationFramePayload, ReplicationMutation,
26 ReplicationMutationOp, ReplicationSnapshot, ShardWatermarks, decode_frame_payload,
27 decode_frame_payload_bytes, mutation_batch_record_count, visit_mutation_batch_payload,
28 visit_mutation_batch_payload_bytes,
29};
30
/// Largest value length, in bytes (inclusive), for which `set` encodes its
/// replication record straight from the borrowed key/value slices instead of
/// queueing an owned `ReplicationMutation`.
const DIRECT_ENCODED_SET_MAX_VALUE_LEN: usize = 128;
32
/// An `EmbeddedStore` whose writes are also emitted as replication mutations
/// and exported through a `ReplicationPrimary`.
#[derive(Debug)]
pub struct ReplicatedEmbeddedStore {
    /// Local store that serves reads and applies writes.
    store: EmbeddedStore,
    /// Replication primary that receives exported batches; also queried for
    /// watermarks, backlog catch-up and metrics.
    primary: Arc<ReplicationPrimary>,
    /// Per-shard batching emitters together with their background threads.
    emitters: Arc<ReplicatedEmbeddedEmitters>,
}
39
/// A replica-side store that applies replication frames/mutations and tracks,
/// per source shard, the highest sequence it has applied.
#[derive(Debug)]
pub struct ReplicationReplica {
    /// Local store holding the replicated entries.
    store: EmbeddedStore,
    /// Highest applied sequence per source shard; mutations at or below it are skipped.
    watermarks: ShardWatermarks,
    /// Apply / catch-up counters exposed via `metrics_snapshot`.
    metrics: ReplicationMetrics,
}
46
/// Shared state for the primary-side emission pipeline: one batching emitter
/// per shard, a periodic flusher thread and one exporter thread per shard.
#[derive(Debug)]
struct ReplicatedEmbeddedEmitters {
    /// Destination for exported batches (and for mutations on unknown shards).
    primary: Arc<ReplicationPrimary>,
    /// Per-shard emitters, each behind its own lock.
    shards: Vec<Mutex<ReplicatedEmbeddedShardEmitter>>,
    /// Set by `shutdown` to stop the flusher loop.
    flusher_stop: AtomicBool,
    /// Set by `shutdown` so exporters exit once their queue is empty.
    exporter_stop: Arc<AtomicBool>,
    /// Join handle of the flusher thread, taken on shutdown.
    flusher_join: Mutex<Option<JoinHandle<()>>>,
    /// Join handles of the exporter threads, drained on shutdown.
    exporter_joins: Mutex<Vec<JoinHandle<()>>>,
    /// Sleep between flusher passes (derived from `batch_max_delay_us`).
    flush_interval: Duration,
}
57
/// Batching state for a single shard.
///
/// At most one of the two builders holds records at any time: each emit path
/// flushes the other builder first, so records leave the shard in sequence order.
#[derive(Debug)]
struct ReplicatedEmbeddedShardEmitter {
    /// Last sequence number assigned on this shard.
    sequence: u64,
    /// Builder for owned `ReplicationMutation`s.
    batch: ReplicationBatchBuilder,
    /// Builder for records encoded directly from borrowed slices.
    encoded_batch: EncodedReplicationBatchBuilder,
    /// Bounded queue feeding this shard's exporter thread.
    tx: Sender<ReplicatedEmbeddedBatch>,
}
65
/// A completed batch travelling from a shard emitter to its exporter thread.
#[derive(Debug)]
enum ReplicatedEmbeddedBatch {
    /// Batch of owned mutations, exported via `export_batch_direct`.
    Owned(ReplicationBatch),
    /// Pre-encoded batch, exported via `export_encoded_batch_direct`.
    Encoded(EncodedReplicationBatch),
}
71
/// Borrowed description of a `Set` to be encoded without copying key/value
/// into an owned `ReplicationMutation`.
#[derive(Debug, Clone, Copy)]
struct BorrowedSetReplication<'a> {
    /// Source shard the write was routed to.
    shard_id: usize,
    /// Timestamp recorded on the replication record.
    timestamp_ms: u64,
    /// Routing hash computed by the primary store.
    key_hash: u64,
    /// Tag derived from `key_hash` via `hash_key_tag_from_hash`.
    key_tag: u64,
    /// Key bytes.
    key: &'a [u8],
    /// Value bytes.
    value: &'a [u8],
    /// Absolute expiry timestamp, if the write has a TTL.
    expire_at_ms: Option<u64>,
}
82
83impl ReplicatedEmbeddedStore {
84 pub fn new(shard_count: usize, config: ReplicationConfig) -> Result<Self> {
85 Self::with_route_mode(shard_count, EmbeddedRouteMode::FullKey, config)
86 }
87
88 pub fn with_route_mode(
89 shard_count: usize,
90 route_mode: EmbeddedRouteMode,
91 config: ReplicationConfig,
92 ) -> Result<Self> {
93 if !config.enabled {
94 return Err(FastCacheError::Config(
95 "ReplicatedEmbeddedStore requires replication.enabled = true".into(),
96 ));
97 }
98 let store = EmbeddedStore::with_route_mode(shard_count, route_mode);
99 let primary = Arc::new(ReplicationPrimary::start(shard_count, config.clone())?);
100 let emitters =
101 ReplicatedEmbeddedEmitters::start(Arc::clone(&primary), shard_count, config)?;
102 Ok(Self {
103 store,
104 primary,
105 emitters,
106 })
107 }
108
109 pub fn get(&self, key: &[u8]) -> Option<Bytes> {
110 self.store.get(key)
111 }
112
113 pub fn set(&self, key: Bytes, value: Bytes, ttl_ms: Option<u64>) {
114 let route = self.store.route_key(&key);
115 let key = bytes::Bytes::from(key);
116 let value = bytes::Bytes::from(value);
117 let direct_encode = direct_encoded_set_enabled(value.len());
118 match ttl_ms {
119 Some(ttl_ms) => {
120 let now_ms = now_millis();
121 let expire_at_ms = Some(now_ms.saturating_add(ttl_ms));
122 self.store.set_value_bytes_routed_expire_at_then(
123 route,
124 key.as_ref(),
125 value.clone(),
126 expire_at_ms,
127 now_ms,
128 || match direct_encode {
129 true => self.emitters.emit_borrowed_set(BorrowedSetReplication {
130 shard_id: route.shard_id,
131 timestamp_ms: now_ms,
132 key_hash: route.key_hash,
133 key_tag: hash_key_tag_from_hash(route.key_hash),
134 key: key.as_ref(),
135 value: value.as_ref(),
136 expire_at_ms,
137 }),
138 false => self.emitters.emit(
139 route.shard_id,
140 ReplicationMutation {
141 shard_id: route.shard_id,
142 sequence: 0,
143 timestamp_ms: now_ms,
144 op: ReplicationMutationOp::Set,
145 key_hash: route.key_hash,
146 key_tag: hash_key_tag_from_hash(route.key_hash),
147 key: key.clone(),
148 value: value.clone(),
149 expire_at_ms,
150 },
151 ),
152 },
153 );
154 }
155 None => {
156 let timestamp_ms = ttl_now_millis();
157 self.store.set_value_bytes_routed_no_ttl_then(
158 route,
159 key.as_ref(),
160 value.clone(),
161 || match direct_encode {
162 true => self.emitters.emit_borrowed_set(BorrowedSetReplication {
163 shard_id: route.shard_id,
164 timestamp_ms,
165 key_hash: route.key_hash,
166 key_tag: hash_key_tag_from_hash(route.key_hash),
167 key: key.as_ref(),
168 value: value.as_ref(),
169 expire_at_ms: None,
170 }),
171 false => self.emitters.emit(
172 route.shard_id,
173 ReplicationMutation {
174 shard_id: route.shard_id,
175 sequence: 0,
176 timestamp_ms,
177 op: ReplicationMutationOp::Set,
178 key_hash: route.key_hash,
179 key_tag: hash_key_tag_from_hash(route.key_hash),
180 key: key.clone(),
181 value: value.clone(),
182 expire_at_ms: None,
183 },
184 ),
185 },
186 );
187 }
188 }
189 }
190
191 pub fn delete(&self, key: &[u8]) -> bool {
192 let route = self.store.route_key(key);
193 let now_ms = now_millis();
194 self.store.delete_routed_then(route, key, now_ms, || {
195 self.emitters.emit(
196 route.shard_id,
197 ReplicationMutation {
198 shard_id: route.shard_id,
199 sequence: 0,
200 timestamp_ms: now_ms,
201 op: ReplicationMutationOp::Del,
202 key_hash: route.key_hash,
203 key_tag: hash_key_tag_from_hash(route.key_hash),
204 key: bytes::Bytes::copy_from_slice(key),
205 value: bytes::Bytes::new(),
206 expire_at_ms: None,
207 },
208 );
209 })
210 }
211
212 pub fn expire(&self, key: &[u8], expire_at_ms: u64) -> bool {
213 let route = self.store.route_key(key);
214 let now_ms = now_millis();
215 self.store
216 .expire_routed_then(route, key, expire_at_ms, now_ms, || {
217 self.emitters.emit(
218 route.shard_id,
219 ReplicationMutation {
220 shard_id: route.shard_id,
221 sequence: 0,
222 timestamp_ms: now_ms,
223 op: ReplicationMutationOp::Expire,
224 key_hash: route.key_hash,
225 key_tag: hash_key_tag_from_hash(route.key_hash),
226 key: bytes::Bytes::copy_from_slice(key),
227 value: bytes::Bytes::new(),
228 expire_at_ms: Some(expire_at_ms),
229 },
230 );
231 })
232 }
233
234 pub fn snapshot(&self) -> ReplicationSnapshot {
241 self.emitters.flush_all_and_wait();
242 let watermarks = self.primary.current_watermarks();
243 ReplicationSnapshot {
244 entries: self.store.entry_snapshot(),
245 watermarks,
246 }
247 }
248
249 pub fn catch_up_replica(&self, replica: &mut ReplicationReplica) -> Result<()> {
250 self.emitters.flush_all_and_wait();
251 match self.primary.catch_up_since(&replica.watermarks)? {
253 BacklogCatchUp::Available(frames) => {
254 replica.apply_frames(&frames)?;
255 replica.metrics.record_backlog_catch_up();
256 return Ok(());
257 }
258 BacklogCatchUp::NeedsSnapshot => {}
259 }
260
261 let mut attempts = 0;
264 loop {
265 attempts += 1;
266 replica.replace_with_snapshot(self.snapshot());
267 let watermarks = replica.watermarks.clone();
268 match self.primary.catch_up_since(&watermarks)? {
269 BacklogCatchUp::Available(frames) => {
270 replica.apply_frames(&frames)?;
271 replica.metrics.record_snapshot_catch_up();
272 return Ok(());
273 }
274 BacklogCatchUp::NeedsSnapshot if attempts >= MAX_SNAPSHOT_CATCH_UP_ATTEMPTS => {
275 return Err(FastCacheError::Protocol(format!(
276 "replication backlog could not catch up after {attempts} snapshot attempts"
277 )));
278 }
279 BacklogCatchUp::NeedsSnapshot => {}
280 }
281 }
282 }
283
284 pub fn primary(&self) -> Arc<ReplicationPrimary> {
285 Arc::clone(&self.primary)
286 }
287
288 pub fn metrics_snapshot(&self) -> ReplicationMetricsSnapshot {
289 self.emitters.flush_all_and_wait();
290 self.primary.metrics_snapshot()
291 }
292
293 pub fn inner(&self) -> &EmbeddedStore {
294 &self.store
295 }
296}
297
298fn direct_encoded_set_enabled(value_len: usize) -> bool {
299 value_len <= DIRECT_ENCODED_SET_MAX_VALUE_LEN
300}
301
302impl Drop for ReplicatedEmbeddedStore {
303 fn drop(&mut self) {
304 self.emitters.shutdown();
305 }
306}
307
308impl ReplicatedEmbeddedEmitters {
309 fn start(
310 primary: Arc<ReplicationPrimary>,
311 shard_count: usize,
312 config: ReplicationConfig,
313 ) -> Result<Arc<Self>> {
314 let shard_count = shard_count.max(1);
315 let flush_interval = Duration::from_micros(config.batch_max_delay_us.max(1));
316 let exporter_stop = Arc::new(AtomicBool::new(false));
317 let mut shards = Vec::with_capacity(shard_count);
318 let mut exporter_joins = Vec::with_capacity(shard_count);
319 for shard_id in 0..shard_count {
320 let (tx, rx) = bounded(config.queue_capacity.max(1));
321 shards.push(Mutex::new(ReplicatedEmbeddedShardEmitter {
322 sequence: 0,
323 batch: ReplicationBatchBuilder::new_clockless(config.clone()),
324 encoded_batch: EncodedReplicationBatchBuilder::new_clockless(
325 config.clone(),
326 shard_id,
327 ),
328 tx,
329 }));
330 exporter_joins.push(start_embedded_exporter(
331 shard_id,
332 Arc::clone(&primary),
333 rx,
334 Arc::clone(&exporter_stop),
335 )?);
336 }
337 let emitters = Arc::new(Self {
338 primary,
339 shards,
340 flusher_stop: AtomicBool::new(false),
341 exporter_stop,
342 flusher_join: Mutex::new(None),
343 exporter_joins: Mutex::new(exporter_joins),
344 flush_interval,
345 });
346 let flusher = Arc::clone(&emitters);
347 let join = thread::Builder::new()
348 .name("fast-cache-replicated-embedded-flusher".into())
349 .spawn(move || flusher.run_flusher())
350 .map_err(|error| {
351 FastCacheError::Config(format!(
352 "failed to start replicated embedded flusher: {error}"
353 ))
354 })?;
355 *emitters.flusher_join.lock() = Some(join);
356 Ok(emitters)
357 }
358
359 fn emit(&self, shard_id: usize, mutation: ReplicationMutation) {
360 let Some(shard) = self.shards.get(shard_id) else {
361 self.primary.emit(mutation);
362 return;
363 };
364 {
365 let mut emitter = shard.lock();
366 emitter.emit(mutation);
367 }
368 }
369
370 fn emit_borrowed_set(&self, set: BorrowedSetReplication<'_>) {
371 let Some(shard) = self.shards.get(set.shard_id) else {
372 self.primary.emit(ReplicationMutation {
373 shard_id: set.shard_id,
374 sequence: 0,
375 timestamp_ms: set.timestamp_ms,
376 op: ReplicationMutationOp::Set,
377 key_hash: set.key_hash,
378 key_tag: set.key_tag,
379 key: bytes::Bytes::copy_from_slice(set.key),
380 value: bytes::Bytes::copy_from_slice(set.value),
381 expire_at_ms: set.expire_at_ms,
382 });
383 return;
384 };
385 let mut emitter = shard.lock();
386 emitter.emit_borrowed_set(set);
387 }
388
389 fn run_flusher(&self) {
390 while !self.flusher_stop.load(Ordering::Relaxed) {
391 thread::sleep(self.flush_interval);
392 self.flush_due();
393 }
394 self.flush_all();
395 }
396
397 fn flush_due(&self) {
398 for shard in &self.shards {
399 if let Some(mut emitter) = shard.try_lock() {
400 emitter.flush_due();
401 }
402 }
403 }
404
405 fn flush_all(&self) -> Vec<u64> {
406 let mut targets = Vec::with_capacity(self.shards.len());
407 for shard in &self.shards {
408 let mut emitter = shard.lock();
409 emitter.flush();
410 targets.push(emitter.sequence);
411 }
412 targets
413 }
414
415 fn flush_all_and_wait(&self) {
416 let targets = self.flush_all();
417 let deadline = Instant::now() + Duration::from_millis(250);
418 while !replication_watermarks_reached(&self.primary, &targets) {
419 if Instant::now() >= deadline {
420 break;
421 }
422 thread::yield_now();
423 }
424 }
425
426 fn shutdown(&self) {
427 self.flusher_stop.store(true, Ordering::Relaxed);
428 if let Some(join) = self.flusher_join.lock().take()
429 && join.thread().id() != thread::current().id()
430 {
431 let _ = join.join();
432 }
433 self.flush_all_and_wait();
434 self.exporter_stop.store(true, Ordering::Relaxed);
435 for join in self.exporter_joins.lock().drain(..) {
436 if join.thread().id() != thread::current().id() {
437 let _ = join.join();
438 }
439 }
440 }
441}
442
443impl ReplicatedEmbeddedShardEmitter {
444 fn emit(&mut self, mut mutation: ReplicationMutation) {
445 self.flush_encoded();
446 self.sequence = self.sequence.saturating_add(1);
447 mutation.sequence = self.sequence;
448 if let Some(batch) = self.batch.push(mutation) {
449 self.send_owned_batch(batch);
450 }
451 }
452
453 fn emit_borrowed_set(&mut self, set: BorrowedSetReplication<'_>) {
454 self.flush_owned();
455 self.sequence = self.sequence.saturating_add(1);
456 let mutation = BorrowedReplicationMutation {
457 shard_id: self.encoded_batch.shard_id(),
458 sequence: self.sequence,
459 timestamp_ms: set.timestamp_ms,
460 op: ReplicationMutationOp::Set,
461 key_hash: set.key_hash,
462 key_tag: set.key_tag,
463 key: set.key,
464 value: set.value,
465 expire_at_ms: set.expire_at_ms,
466 };
467 if let Some(batch) = self.encoded_batch.push(mutation) {
468 self.send_encoded_batch(batch);
469 }
470 }
471
472 fn flush_due(&mut self) {
473 if let Some(batch) = self.batch.flush_due() {
474 self.send_owned_batch(batch);
475 }
476 if let Some(batch) = self.encoded_batch.flush_due() {
477 self.send_encoded_batch(batch);
478 }
479 }
480
481 fn flush(&mut self) {
482 self.flush_owned();
483 self.flush_encoded();
484 }
485
486 fn flush_owned(&mut self) {
487 if let Some(batch) = self.batch.flush() {
488 self.send_owned_batch(batch);
489 }
490 }
491
492 fn flush_encoded(&mut self) {
493 if let Some(batch) = self.encoded_batch.flush() {
494 self.send_encoded_batch(batch);
495 }
496 }
497
498 fn send_owned_batch(&self, batch: ReplicationBatch) {
499 self.send_batch(ReplicatedEmbeddedBatch::Owned(batch));
500 }
501
502 fn send_encoded_batch(&self, batch: EncodedReplicationBatch) {
503 self.send_batch(ReplicatedEmbeddedBatch::Encoded(batch));
504 }
505
506 fn send_batch(&self, batch: ReplicatedEmbeddedBatch) {
507 if self.tx.send(batch).is_err() {
508 tracing::warn!("dropping replicated embedded batch because exporter stopped");
509 }
510 }
511}
512
513fn start_embedded_exporter(
514 shard_id: usize,
515 primary: Arc<ReplicationPrimary>,
516 rx: Receiver<ReplicatedEmbeddedBatch>,
517 stop: Arc<AtomicBool>,
518) -> Result<JoinHandle<()>> {
519 thread::Builder::new()
520 .name(format!("fast-cache-replicated-embedded-export-{shard_id}"))
521 .spawn(move || run_embedded_exporter(primary, rx, stop))
522 .map_err(|error| {
523 FastCacheError::Config(format!(
524 "failed to start replicated embedded exporter {shard_id}: {error}"
525 ))
526 })
527}
528
529fn run_embedded_exporter(
530 primary: Arc<ReplicationPrimary>,
531 rx: Receiver<ReplicatedEmbeddedBatch>,
532 stop: Arc<AtomicBool>,
533) {
534 loop {
535 match rx.recv_timeout(Duration::from_millis(1)) {
536 Ok(batch) => {
537 export_embedded_batch(&primary, batch);
538 while let Ok(batch) = rx.try_recv() {
539 export_embedded_batch(&primary, batch);
540 }
541 }
542 Err(RecvTimeoutError::Timeout) if stop.load(Ordering::Relaxed) && rx.is_empty() => {
543 break;
544 }
545 Err(RecvTimeoutError::Timeout) => {}
546 Err(RecvTimeoutError::Disconnected) => break,
547 }
548 }
549 while let Ok(batch) = rx.try_recv() {
550 export_embedded_batch(&primary, batch);
551 }
552}
553
554fn export_embedded_batch(primary: &ReplicationPrimary, batch: ReplicatedEmbeddedBatch) {
555 match batch {
556 ReplicatedEmbeddedBatch::Owned(batch) => primary.export_batch_direct(batch),
557 ReplicatedEmbeddedBatch::Encoded(batch) => primary.export_encoded_batch_direct(batch),
558 }
559}
560
561fn replication_watermarks_reached(primary: &ReplicationPrimary, targets: &[u64]) -> bool {
562 let watermarks = primary.current_watermarks();
563 targets
564 .iter()
565 .enumerate()
566 .all(|(shard_id, target)| watermarks.get(shard_id) >= *target)
567}
568
/// How many snapshot-then-backlog rounds `catch_up_replica` attempts before
/// reporting a protocol error.
const MAX_SNAPSHOT_CATCH_UP_ATTEMPTS: usize = 4;
570
571impl ReplicationReplica {
572 pub fn new(shard_count: usize) -> Self {
573 Self::with_route_mode(shard_count, EmbeddedRouteMode::FullKey)
574 }
575
576 pub fn with_route_mode(shard_count: usize, route_mode: EmbeddedRouteMode) -> Self {
577 Self {
578 store: EmbeddedStore::with_route_mode(shard_count, route_mode),
579 watermarks: ShardWatermarks::new(shard_count),
580 metrics: ReplicationMetrics::default(),
581 }
582 }
583
584 pub fn get(&self, key: &[u8]) -> Option<Bytes> {
585 self.store.get(key)
586 }
587
588 pub fn watermarks(&self) -> &ShardWatermarks {
589 &self.watermarks
590 }
591
592 pub fn metrics_snapshot(&self) -> ReplicationMetricsSnapshot {
593 self.metrics.snapshot()
594 }
595
596 pub fn apply_frame_bytes(&mut self, frame: &[u8]) -> Result<()> {
597 let frame = decode_frame_payload(frame)?;
598 self.apply_frame_payload(frame)
599 }
600
601 pub fn apply_frame(&mut self, frame: ReplicationFrameBytes) -> Result<()> {
602 let frame = decode_frame_payload_bytes(frame)?;
603 self.apply_frame_bytes_payload(frame)
604 }
605
606 pub fn apply_frames(&mut self, frames: &[ReplicationFrameBytes]) -> Result<()> {
607 for frame in frames {
608 self.apply_frame(frame.clone())?;
609 }
610 Ok(())
611 }
612
613 pub fn apply_decoded_frame(&mut self, frame: super::protocol::ReplicationFrame) -> Result<()> {
614 match frame.kind {
615 FrameKind::MutationBatch => {
616 self.apply_owned_mutation_batch_payload(bytes::Bytes::from(frame.payload))
617 }
618 other => Err(FastCacheError::Protocol(format!(
619 "replica cannot apply FCRP frame kind: {other:?}"
620 ))),
621 }
622 }
623
624 pub fn apply_frame_payload(&mut self, frame: ReplicationFramePayload<'_>) -> Result<()> {
625 match frame.kind {
626 FrameKind::MutationBatch => self.apply_mutation_batch_payload(frame.payload.as_ref()),
627 other => Err(FastCacheError::Protocol(format!(
628 "replica cannot apply FCRP frame kind: {other:?}"
629 ))),
630 }
631 }
632
633 pub fn apply_frame_bytes_payload(&mut self, frame: ReplicationFrameBytesPayload) -> Result<()> {
634 match frame.kind {
635 FrameKind::MutationBatch => self.apply_owned_mutation_batch_payload(frame.payload),
636 other => Err(FastCacheError::Protocol(format!(
637 "replica cannot apply FCRP frame kind: {other:?}"
638 ))),
639 }
640 }
641
642 fn apply_owned_mutation_batch_payload(&mut self, payload: bytes::Bytes) -> Result<()> {
643 match mutation_batch_record_count(payload.as_ref())? {
644 1 => self.apply_mutation_batch_payload_bytes(payload),
645 _ => self.apply_mutation_batch_payload(payload.as_ref()),
646 }
647 }
648
649 fn apply_mutation_batch_payload(&mut self, payload: &[u8]) -> Result<()> {
650 let started = Instant::now();
651 let mut now_ms = None;
652 let mut applied = 0_u64;
653 let mut skipped = 0_u64;
654 visit_mutation_batch_payload(payload, |mutation| {
655 if self.apply_borrowed_mutation_inner(mutation, &mut now_ms) {
656 applied += 1;
657 } else {
658 skipped += 1;
659 }
660 Ok(())
661 })?;
662 self.metrics.record_replica_apply_batch(
663 applied,
664 skipped,
665 started.elapsed().as_nanos() as u64,
666 );
667 Ok(())
668 }
669
670 fn apply_mutation_batch_payload_bytes(&mut self, payload: bytes::Bytes) -> Result<()> {
671 let started = Instant::now();
672 let mut now_ms = None;
673 let mut applied = 0_u64;
674 let mut skipped = 0_u64;
675 visit_mutation_batch_payload_bytes(payload, |mutation| {
676 if self.apply_frame_backed_mutation_inner(mutation, &mut now_ms) {
677 applied += 1;
678 } else {
679 skipped += 1;
680 }
681 Ok(())
682 })?;
683 self.metrics.record_replica_apply_batch(
684 applied,
685 skipped,
686 started.elapsed().as_nanos() as u64,
687 );
688 Ok(())
689 }
690
691 pub fn apply_mutation(&mut self, mutation: ReplicationMutation) {
692 let started = Instant::now();
693 let applied = self.apply_mutation_inner(mutation, &mut None);
694 self.metrics
695 .record_replica_apply(applied, started.elapsed().as_nanos() as u64);
696 }
697
698 fn apply_mutation_inner(
699 &mut self,
700 mutation: ReplicationMutation,
701 now_ms: &mut Option<u64>,
702 ) -> bool {
703 if mutation.sequence <= self.watermarks.get(mutation.shard_id) {
704 return false;
705 }
706
707 match mutation.op {
708 ReplicationMutationOp::Set => {
709 self.apply_set(
710 mutation.key_hash,
711 mutation.key.as_ref(),
712 mutation.value,
713 mutation.expire_at_ms,
714 now_ms,
715 );
716 }
717 ReplicationMutationOp::Del => {
718 self.store.delete(&mutation.key);
719 }
720 ReplicationMutationOp::Expire => match mutation.expire_at_ms {
721 Some(expire_at_ms) => {
722 self.store.expire(&mutation.key, expire_at_ms);
723 }
724 None => {
725 self.store.persist(&mutation.key);
726 }
727 },
728 }
729 self.watermarks
730 .observe(mutation.shard_id, mutation.sequence);
731 true
732 }
733
734 fn apply_frame_backed_mutation_inner(
735 &mut self,
736 mutation: FrameBackedReplicationMutation<'_>,
737 now_ms: &mut Option<u64>,
738 ) -> bool {
739 if mutation.sequence <= self.watermarks.get(mutation.shard_id) {
740 return false;
741 }
742
743 match mutation.op {
744 ReplicationMutationOp::Set => {
745 self.apply_set(
746 mutation.key_hash,
747 mutation.key,
748 mutation.value,
749 mutation.expire_at_ms,
750 now_ms,
751 );
752 }
753 ReplicationMutationOp::Del => {
754 self.store.delete(mutation.key);
755 }
756 ReplicationMutationOp::Expire => match mutation.expire_at_ms {
757 Some(expire_at_ms) => {
758 self.store.expire(mutation.key, expire_at_ms);
759 }
760 None => {
761 self.store.persist(mutation.key);
762 }
763 },
764 }
765 self.watermarks
766 .observe(mutation.shard_id, mutation.sequence);
767 true
768 }
769
770 fn apply_borrowed_mutation_inner(
771 &mut self,
772 mutation: BorrowedReplicationMutation<'_>,
773 now_ms: &mut Option<u64>,
774 ) -> bool {
775 if mutation.sequence <= self.watermarks.get(mutation.shard_id) {
776 return false;
777 }
778
779 match mutation.op {
780 ReplicationMutationOp::Set => {
781 self.apply_set(
782 mutation.key_hash,
783 mutation.key,
784 bytes::Bytes::copy_from_slice(mutation.value),
785 mutation.expire_at_ms,
786 now_ms,
787 );
788 }
789 ReplicationMutationOp::Del => {
790 self.store.delete(mutation.key);
791 }
792 ReplicationMutationOp::Expire => match mutation.expire_at_ms {
793 Some(expire_at_ms) => {
794 self.store.expire(mutation.key, expire_at_ms);
795 }
796 None => {
797 self.store.persist(mutation.key);
798 }
799 },
800 }
801 self.watermarks
802 .observe(mutation.shard_id, mutation.sequence);
803 true
804 }
805
806 fn apply_set(
807 &mut self,
808 key_hash: u64,
809 key: &[u8],
810 value: bytes::Bytes,
811 expire_at_ms: Option<u64>,
812 now_ms: &mut Option<u64>,
813 ) {
814 let route = self.store.route_key_prehashed(key_hash, key);
815 match expire_at_ms {
816 Some(expire_at_ms) => {
817 let now_ms = *now_ms.get_or_insert_with(now_millis);
818 if expire_at_ms <= now_ms {
819 return;
820 }
821 self.store.set_value_bytes_routed_expire_at(
822 route,
823 key,
824 value,
825 Some(expire_at_ms),
826 now_ms,
827 );
828 }
829 None => self
830 .store
831 .set_value_bytes_routed_no_ttl_then(route, key, value, || {}),
832 }
833 }
834
835 pub fn replace_with_snapshot(&mut self, snapshot: ReplicationSnapshot) {
836 let route_mode = self.store.route_mode();
837 let shard_count = self.store.shard_count();
838 let store = EmbeddedStore::with_route_mode(shard_count, route_mode);
839 store.restore_entries(snapshot.entries);
840 self.store = store;
841 self.watermarks = snapshot.watermarks;
842 }
843
844 pub fn inner(&self) -> &EmbeddedStore {
845 &self.store
846 }
847}
848
849impl From<&ReplicationMutation> for MutationOp {
850 fn from(value: &ReplicationMutation) -> Self {
851 match value.op {
852 ReplicationMutationOp::Set => MutationOp::Set,
853 ReplicationMutationOp::Del => MutationOp::Del,
854 ReplicationMutationOp::Expire => MutationOp::Expire,
855 }
856 }
857}
858
#[cfg(test)]
mod tests {
    use std::thread;
    use std::time::Duration;

    use crate::config::{ReplicationCompression, ReplicationConfig, ReplicationSendPolicy};

    use super::*;

    /// Enabled, uncompressed config with batches of at most 2 records and a
    /// 1 ms batch delay.
    fn config(send_policy: ReplicationSendPolicy) -> ReplicationConfig {
        ReplicationConfig {
            enabled: true,
            compression: ReplicationCompression::None,
            send_policy,
            batch_max_records: 2,
            batch_max_delay_us: 1_000,
            ..ReplicationConfig::default()
        }
    }

    /// A single-record frame is applied, and the stored value shares the
    /// frame's buffer.
    #[test]
    fn embedded_replica_applies_immediate_mutation() {
        let primary = ReplicatedEmbeddedStore::new(4, config(ReplicationSendPolicy::Immediate))
            .expect("primary");
        let mut replica = ReplicationReplica::new(4);
        let subscriber = primary.primary().subscribe(8);
        primary.set(b"alpha".to_vec(), b"one".to_vec(), None);
        let frame = subscriber
            .recv_timeout(Duration::from_secs(2))
            .expect("replication frame");
        replica.apply_frame(frame.clone()).expect("apply");
        assert_eq!(replica.get(b"alpha"), Some(b"one".to_vec()));
        let stored = replica
            .inner()
            .get_value_bytes(b"alpha")
            .expect("stored value");
        assert!(bytes_points_inside(&stored, &frame));
    }

    /// Two writes on the same source shard arrive in one frame. The stored
    /// value is copied out of the frame rather than sharing it.
    #[test]
    fn embedded_replica_applies_batched_mutations() {
        let primary =
            ReplicatedEmbeddedStore::new(4, config(ReplicationSendPolicy::Batch)).expect("primary");
        let mut replica = ReplicationReplica::new(4);
        let subscriber = primary.primary().subscribe(8);
        let (first_key, second_key) = same_source_shard_keys(&primary);
        primary.set(first_key.clone(), b"one".to_vec(), None);
        primary.set(second_key.clone(), b"two".to_vec(), None);
        let frame = subscriber
            .recv_timeout(Duration::from_secs(2))
            .expect("replication frame");
        replica.apply_frame(frame.clone()).expect("apply");
        assert_eq!(replica.get(&first_key), Some(b"one".to_vec()));
        assert_eq!(replica.get(&second_key), Some(b"two".to_vec()));
        let stored = replica
            .inner()
            .get_value_bytes(&first_key)
            .expect("stored value");
        assert!(!bytes_points_inside(&stored, &frame));
    }

    /// A fresh replica catches up from the backlog alone.
    #[test]
    fn backlog_catch_up_replays_missing_mutations() {
        let primary = ReplicatedEmbeddedStore::new(4, config(ReplicationSendPolicy::Immediate))
            .expect("primary");
        primary.set(b"alpha".to_vec(), b"one".to_vec(), None);
        thread::sleep(Duration::from_millis(20));
        let mut replica = ReplicationReplica::new(4);
        primary.catch_up_replica(&mut replica).expect("catch up");
        assert_eq!(replica.get(b"alpha"), Some(b"one".to_vec()));
    }

    /// With a 1-byte backlog the replica must be restored from a snapshot,
    /// exactly once.
    #[test]
    fn snapshot_catch_up_restores_when_backlog_is_insufficient() {
        let mut cfg = config(ReplicationSendPolicy::Immediate);
        cfg.backlog_bytes = 1;
        let primary = ReplicatedEmbeddedStore::new(4, cfg).expect("primary");
        primary.set(b"alpha".to_vec(), b"one".to_vec(), None);
        thread::sleep(Duration::from_millis(20));
        let mut replica = ReplicationReplica::new(4);
        primary.catch_up_replica(&mut replica).expect("catch up");
        assert_eq!(replica.get(b"alpha"), Some(b"one".to_vec()));
        assert_eq!(replica.metrics_snapshot().catch_up_snapshot_count, 1);
    }

    /// While a writer inserts distinct keys, a snapshot's largest shard
    /// watermark must not exceed the number of captured entries.
    #[test]
    fn snapshot_atomicity_under_concurrent_writes() {
        let primary = ReplicatedEmbeddedStore::new(4, config(ReplicationSendPolicy::Immediate))
            .expect("primary");
        let writer = Arc::new(primary);
        let stop = Arc::new(std::sync::atomic::AtomicBool::new(false));
        let writer_clone = Arc::clone(&writer);
        let stop_clone = Arc::clone(&stop);
        let handle = thread::spawn(move || {
            let mut i = 0u64;
            while !stop_clone.load(std::sync::atomic::Ordering::Relaxed) {
                let key = format!("key-{i}");
                writer_clone.set(key.into_bytes(), b"v".to_vec(), None);
                i += 1;
            }
        });
        thread::sleep(Duration::from_millis(20));
        let snapshot = writer.snapshot();
        let entry_count = snapshot.entries.len();
        let max_watermark = snapshot
            .watermarks
            .as_slice()
            .iter()
            .copied()
            .max()
            .unwrap_or(0);
        stop.store(true, std::sync::atomic::Ordering::Relaxed);
        handle.join().expect("writer");
        assert!(
            max_watermark as usize <= entry_count,
            "watermark {max_watermark} exceeds entry count {entry_count}"
        );
    }

    /// Writes spread over several source shards are all recovered from the backlog.
    #[test]
    fn multi_shard_catch_up_via_backlog() {
        let primary = ReplicatedEmbeddedStore::new(4, config(ReplicationSendPolicy::Immediate))
            .expect("primary");
        for i in 0..16 {
            primary.set(format!("key-{i}").into_bytes(), b"v".to_vec(), None);
        }
        thread::sleep(Duration::from_millis(20));
        let mut replica = ReplicationReplica::new(4);
        primary.catch_up_replica(&mut replica).expect("catch up");
        for i in 0..16 {
            assert_eq!(
                replica.get(format!("key-{i}").as_bytes()),
                Some(b"v".to_vec())
            );
        }
    }

    /// A replica with a different shard count routes the applied key with its
    /// own layout, so `get` still finds it.
    #[test]
    fn replica_applies_mutation_using_its_local_route() {
        let primary = ReplicatedEmbeddedStore::new(2, config(ReplicationSendPolicy::Immediate))
            .expect("primary");
        let mut replica = ReplicationReplica::new(4);
        let key = (0..10_000)
            .map(|index| format!("key-{index}").into_bytes())
            .find(|key| {
                let source = primary.inner().route_key(key);
                let target = replica.inner().route_key(key);
                source.shard_id != target.shard_id
            })
            .expect("key with different source and target shard routes");

        let subscriber = primary.primary().subscribe(8);
        primary.set(key.clone(), b"one".to_vec(), None);
        let frame = subscriber
            .recv_timeout(Duration::from_secs(2))
            .expect("replication frame");
        replica.apply_frame_bytes(&frame).expect("apply");

        assert_eq!(replica.get(&key), Some(b"one".to_vec()));
    }

    /// Construction is rejected with a `Config` error when replication is disabled.
    #[test]
    fn new_requires_enabled_config() {
        let cfg = ReplicationConfig {
            enabled: false,
            ..ReplicationConfig::default()
        };
        let err = ReplicatedEmbeddedStore::new(2, cfg).expect_err("should reject disabled config");
        assert!(matches!(err, FastCacheError::Config(_)));
    }

    /// True when `value`'s bytes lie entirely within `owner`'s memory range.
    fn bytes_points_inside(value: &bytes::Bytes, owner: &bytes::Bytes) -> bool {
        let value_start = value.as_ptr() as usize;
        let value_end = value_start + value.len();
        let owner_start = owner.as_ptr() as usize;
        let owner_end = owner_start + owner.len();
        value_start >= owner_start && value_end <= owner_end
    }

    /// Finds two distinct keys that route to the same shard on `primary`.
    fn same_source_shard_keys(primary: &ReplicatedEmbeddedStore) -> (Bytes, Bytes) {
        let first = b"batch-key-0".to_vec();
        let first_shard = primary.inner().route_key(&first).shard_id;
        let second = (1..10_000)
            .map(|index| format!("batch-key-{index}").into_bytes())
            .find(|key| primary.inner().route_key(key).shard_id == first_shard)
            .expect("second key on same source shard");
        (first, second)
    }
}