1use std::collections::HashMap;
2use std::future::Future;
3use std::ops::Range;
4use std::pin::Pin;
5use std::sync::Arc;
6use std::thread::{self, JoinHandle};
7use std::time::{Duration, Instant};
8
9use bytes::Bytes as SharedBytes;
10use crossbeam_channel::{Receiver, RecvTimeoutError, Sender, unbounded};
11use parking_lot::Mutex;
12use tokio::sync::oneshot;
13
14use crate::commands::EngineCommandCatalog;
15use crate::config::FastCacheConfig;
16use crate::persistence::{PersistenceRuntime, load_recovery_state};
17use crate::protocol::{CommandSpanFrame, FastRequest, FastResponse, Frame, RespCodec};
18use crate::replication::{ReplicationBatchBuilder, ReplicationMutation, ReplicationPrimary};
19use crate::storage::command::{BorrowedCommand, Command};
20use crate::storage::stats::{GlobalStatsSnapshot, ShardStatsSnapshot};
21use crate::storage::{
22 Bytes, FlatMap, MutationOp, MutationRecord, StoredEntry, hash_key, now_millis, shift_for,
23 stripe_index,
24};
25#[cfg(feature = "telemetry")]
26use crate::storage::{CacheTelemetry, CacheTelemetryHandle};
27use crate::{FastCacheError, Result};
28
/// Cloneable handle to the cache engine.
///
/// Every clone shares the same [`EngineInner`] through an [`Arc`], so cloning the handle
/// does not create a second engine.
#[derive(Clone)]
pub struct EngineHandle {
    /// Shared engine state.
    inner: Arc<EngineInner>,
}
33
/// State shared by every clone of [`EngineHandle`].
struct EngineInner {
    /// Configuration the engine was opened with.
    config: FastCacheConfig,
    /// Captured when the engine is constructed; `stats_snapshot` derives uptime from it.
    started_at: Instant,
    /// Result of `shift_for(shard_count)`, handed to `stripe_index` when routing keys.
    shift: u32,
    /// One request channel per shard worker, indexed by shard id.
    shard_senders: Vec<Sender<ShardMessage>>,
    /// Join handles of the shard worker threads; drained by `shutdown`.
    shard_threads: Mutex<Vec<JoinHandle<()>>>,
    /// Persistence runtime that hands out per-shard WAL appenders and writes snapshots.
    persistence: PersistenceRuntime,
    /// Replication primary, or `None` when `start_replication_primary` decided not to start one.
    replication: Option<Arc<ReplicationPrimary>>,
    /// Telemetry sink passed to `open_with_metrics`, if any.
    #[cfg(feature = "telemetry")]
    metrics: Option<Arc<CacheTelemetry>>,
}
45
/// Inline capacity of [`InlineKey`] in bytes. `ShardKey::from_owner` copies keys of at most
/// this length and slices the owner buffer for longer ones.
const MULTIKEY_INLINE_KEY_MAX: usize = 32;
/// Inline capacity of [`InlineValue`] in bytes.
const MULTIKEY_INLINE_VALUE_MAX: usize = 64;
/// Minimum value length (2 KiB) at which `ShardValue::from_owner` keeps a shared slice of the
/// owner buffer instead of copying the bytes.
pub(crate) const RESP_SPANNED_VALUE_MIN: usize = 2 * 1024;

/// Small-vector key buffer holding up to `MULTIKEY_INLINE_KEY_MAX` bytes inline.
type InlineKey = smallvec::SmallVec<[u8; MULTIKEY_INLINE_KEY_MAX]>;
/// Small-vector value buffer holding up to `MULTIKEY_INLINE_VALUE_MAX` bytes inline.
type InlineValue = smallvec::SmallVec<[u8; MULTIKEY_INLINE_VALUE_MAX]>;
/// Boxed, `Send` future resolving to a [`Frame`] result.
pub(crate) type EngineFrameFuture<'a> = Pin<Box<dyn Future<Output = Result<Frame>> + Send + 'a>>;
/// Boxed, `Send` future resolving to a [`FastResponse`] result.
pub(crate) type EngineFastFuture<'a> =
    Pin<Box<dyn Future<Output = Result<FastResponse>> + Send + 'a>>;
/// Boxed, `Send` future resolving to `Result<()>`.
// NOTE(review): presumably used by the spanned RESP command path, judging by the name — confirm.
pub(crate) type EngineRespSpanFuture<'a> = Pin<Box<dyn Future<Output = Result<()>> + Send + 'a>>;
56
57#[derive(Clone, Copy)]
58pub(crate) struct EngineCommandContext<'engine> {
59 engine: &'engine EngineHandle,
60}
61
62impl<'engine> EngineCommandContext<'engine> {
63 #[inline(always)]
64 pub(crate) fn new(engine: &'engine EngineHandle) -> Self {
65 Self { engine }
66 }
67
68 pub(crate) fn route_key(self, key: &[u8]) -> usize {
69 self.engine.route(key)
70 }
71
72 pub(crate) fn route_key_hash(self, key_hash: u64) -> usize {
73 self.engine.route_hash_to_shard(key_hash)
74 }
75
76 pub(crate) async fn request(self, shard_id: usize, op: ShardOperation) -> Result<ShardReply> {
77 self.engine.request(shard_id, op).await
78 }
79}
80
81pub(crate) enum ShardKey {
82 Inline(InlineKey),
83 Shared(SharedBytes),
84}
85
86impl ShardKey {
87 pub(crate) fn inline(key: &[u8]) -> Self {
88 Self::Inline(InlineKey::from_slice(key))
89 }
90
91 pub(crate) fn from_owner(owner: &SharedBytes, range: Range<usize>) -> Self {
92 if range.len() <= MULTIKEY_INLINE_KEY_MAX {
93 Self::Inline(InlineKey::from_slice(&owner[range]))
94 } else {
95 Self::Shared(owner.slice(range))
96 }
97 }
98
99 fn as_ref(&self) -> &[u8] {
100 match self {
101 Self::Inline(key) => key.as_ref(),
102 Self::Shared(key) => key.as_ref(),
103 }
104 }
105
106 fn into_shared(self) -> SharedBytes {
107 match self {
108 Self::Inline(key) => SharedBytes::from(key.into_vec()),
109 Self::Shared(key) => key,
110 }
111 }
112}
113
114pub(crate) enum ShardValue {
115 Inline(InlineValue),
116 Shared(SharedBytes),
117}
118
119impl ShardValue {
120 pub(crate) fn inline(value: &[u8]) -> Self {
121 Self::Inline(InlineValue::from_slice(value))
122 }
123
124 pub(crate) fn from_owner(owner: &SharedBytes, range: Range<usize>) -> Self {
125 if range.len() >= RESP_SPANNED_VALUE_MIN {
126 Self::Shared(owner.slice(range))
127 } else {
128 Self::Inline(InlineValue::from_slice(&owner[range]))
129 }
130 }
131
132 fn into_shared(self) -> SharedBytes {
133 match self {
134 Self::Inline(value) => SharedBytes::from(value.into_vec()),
135 Self::Shared(value) => value,
136 }
137 }
138}
139
/// How an operation should alter a key's expiration.
#[derive(Clone, Copy, PartialEq, Eq)]
pub(crate) enum ExpirationChange {
    /// Leave the current expiration untouched.
    Keep,
    /// Set the expiration to the carried `expire_at_ms` value.
    ExpireAt(u64),
    /// Remove the expiration.
    Persist,
}

impl ExpirationChange {
    /// Returns the carried `expire_at_ms` for [`ExpirationChange::ExpireAt`] and `None` for
    /// every other variant.
    fn expire_at_ms(self) -> Option<u64> {
        if let Self::ExpireAt(deadline_ms) = self {
            Some(deadline_ms)
        } else {
            None
        }
    }
}
155
/// Message sent from an [`EngineHandle`] to a shard worker thread.
///
/// Every variant carries a `oneshot` sender on which the worker posts its answer.
// NOTE(review): the lint is silenced presumably because `Execute` is much larger than the
// other variants and boxing it would cost an allocation per request — confirm.
#[allow(clippy::large_enum_variant)]
enum ShardMessage {
    /// Run `op` against the shard and reply with its result.
    Execute {
        op: ShardOperation,
        reply: oneshot::Sender<Result<ShardReply>>,
    },
    /// Reply with the shard's entries as returned by `FlatMap::snapshot_entries`.
    Snapshot {
        reply: oneshot::Sender<Vec<StoredEntry>>,
    },
    /// Reply with the shard's statistics snapshot.
    Stats {
        reply: oneshot::Sender<ShardStatsSnapshot>,
    },
    /// Flush pending replication, acknowledge on `reply`, and stop the worker loop.
    Shutdown {
        reply: oneshot::Sender<()>,
    },
}
172
/// Operation executed by a shard worker against its map.
///
/// Variants carrying a `key_hash` pass it on to the hashed map, WAL, or replication calls
/// made by `ShardState`.
#[allow(clippy::large_enum_variant)]
pub(crate) enum ShardOperation {
    /// Look up the value stored under the key.
    Get(Vec<u8>),
    /// Look up the value and, when it exists, apply `expiration` to the key.
    GetEx {
        key_hash: u64,
        key: ShardKey,
        expiration: ExpirationChange,
    },
    /// Store `value` under `key`, with an optional expiration.
    Set {
        key_hash: u64,
        key: ShardKey,
        value: ShardValue,
        expire_at_ms: Option<u64>,
    },
    /// Remove `key`.
    Delete {
        key_hash: u64,
        key: ShardKey,
    },
    /// Check whether the key is present.
    Exists(Vec<u8>),
    /// Query the remaining time to live: `ttl_millis` when `millis` is true, `ttl_seconds`
    /// otherwise.
    Ttl {
        key: Vec<u8>,
        millis: bool,
    },
    /// Set the expiration to `expire_at_ms`, or clear it when `expire_at_ms` is `None`.
    Expire {
        key_hash: u64,
        key: ShardKey,
        expire_at_ms: Option<u64>,
    },
}
202
/// Result of a [`ShardOperation`] as produced by `ShardState::execute`.
#[allow(clippy::large_enum_variant)]
pub(crate) enum ShardReply {
    /// Value lookup result (`Get`, `GetEx`); `None` when the map returned no value.
    Value(Option<Bytes>),
    /// Integer result (`Delete`, `Exists`, `Ttl`, `Expire`).
    Integer(i64),
    /// Acknowledgement of a `Set`.
    Ok,
}
209
/// Mutable state owned by a single shard worker thread.
struct ShardState {
    /// Index of this shard; stamped on every mutation record and stats snapshot.
    shard_id: usize,
    /// The shard's key/value map.
    map: FlatMap,
    /// WAL appender for this shard, when the persistence runtime provides one.
    wal: Option<crate::persistence::WalAppender>,
    /// Replication primary that receives this shard's mutations, if replication is active.
    replication: Option<Arc<ReplicationPrimary>>,
    /// Batch builder for outgoing replication mutations; created only alongside `replication`.
    replication_batch: Option<ReplicationBatchBuilder>,
    /// Counter incremented (saturating) for each mutation record; starts at 0 in the worker.
    sequence: u64,
}
218
219impl EngineHandle {
220 pub fn open(config: FastCacheConfig) -> Result<Self> {
221 #[cfg(feature = "telemetry")]
222 {
223 Self::open_with_metrics(config, None)
224 }
225 #[cfg(not(feature = "telemetry"))]
226 {
227 config.validate()?;
228 config.ensure_paths()?;
229
230 let recovery = load_recovery_state(&config.persistence)?;
231 let persistence =
232 PersistenceRuntime::start(config.shard_count, config.persistence.clone())?;
233 let replication = start_replication_primary(&config)?;
234 let shift = shift_for(config.shard_count);
235 let recovered = RecoveredShardEntries::partition(&recovery.entries, shift);
236
237 let mut shard_senders = Vec::with_capacity(config.shard_count);
238 let mut shard_threads = Vec::with_capacity(config.shard_count);
239 for shard_id in 0..config.shard_count {
240 let (tx, rx) = unbounded::<ShardMessage>();
241 let shard_entries = recovered.get(&shard_id).cloned().unwrap_or_default();
242 let shard_config = config.clone();
243 let wal = persistence.appender(shard_id);
244 let replication = replication.clone();
245 let join = thread::Builder::new()
246 .name(format!("fast-cache-shard-{shard_id}"))
247 .spawn(move || {
248 ShardWorker::run(
249 shard_id,
250 rx,
251 shard_config,
252 shard_entries,
253 wal,
254 replication,
255 )
256 })
257 .map_err(|error| {
258 FastCacheError::Config(format!("failed to start shard {shard_id}: {error}"))
259 })?;
260 shard_senders.push(tx);
261 shard_threads.push(join);
262 }
263
264 Ok(Self {
265 inner: Arc::new(EngineInner {
266 config,
267 started_at: Instant::now(),
268 shift,
269 shard_senders,
270 shard_threads: Mutex::new(shard_threads),
271 persistence,
272 replication,
273 }),
274 })
275 }
276 }
277
278 #[cfg(feature = "telemetry")]
279 pub fn open_with_metrics(
280 config: FastCacheConfig,
281 metrics: Option<Arc<CacheTelemetry>>,
282 ) -> Result<Self> {
283 config.validate()?;
284 config.ensure_paths()?;
285
286 let recovery = load_recovery_state(&config.persistence)?;
287 let persistence = PersistenceRuntime::start_with_metrics(
288 config.shard_count,
289 config.persistence.clone(),
290 metrics.clone(),
291 )?;
292 let replication = start_replication_primary(&config)?;
293 let shift = shift_for(config.shard_count);
294 let recovered = RecoveredShardEntries::partition(&recovery.entries, shift);
295
296 let mut shard_senders = Vec::with_capacity(config.shard_count);
297 let mut shard_threads = Vec::with_capacity(config.shard_count);
298 for shard_id in 0..config.shard_count {
299 let (tx, rx) = unbounded::<ShardMessage>();
300 let shard_entries = recovered.get(&shard_id).cloned().unwrap_or_default();
301 let shard_config = config.clone();
302 let wal = persistence.appender(shard_id);
303 let replication = replication.clone();
304 let shard_metrics = metrics.clone();
305 let join = thread::Builder::new()
306 .name(format!("fast-cache-shard-{shard_id}"))
307 .spawn(move || {
308 ShardWorker::run(
309 shard_id,
310 rx,
311 shard_config,
312 shard_entries,
313 wal,
314 replication,
315 shard_metrics,
316 )
317 })
318 .map_err(|error| {
319 FastCacheError::Config(format!("failed to start shard {shard_id}: {error}"))
320 })?;
321 shard_senders.push(tx);
322 shard_threads.push(join);
323 }
324
325 Ok(Self {
326 inner: Arc::new(EngineInner {
327 config,
328 started_at: Instant::now(),
329 shift,
330 shard_senders,
331 shard_threads: Mutex::new(shard_threads),
332 persistence,
333 replication,
334 metrics,
335 }),
336 })
337 }
338
339 pub fn config(&self) -> &FastCacheConfig {
340 &self.inner.config
341 }
342
343 pub async fn execute(&self, command: Command) -> Result<Frame> {
344 self.execute_owned(command).await
345 }
346
347 pub async fn execute_fast<'a>(&'a self, request: FastRequest<'a>) -> Result<FastResponse> {
348 EngineCommandCatalog::execute_fast(EngineCommandContext::new(self), request)
349 .await
350 .unwrap_or_else(|| Ok(FastResponse::Error(b"ERR unsupported command".to_vec())))
351 }
352
353 pub async fn execute_borrowed<'a>(&'a self, command: BorrowedCommand<'a>) -> Result<Frame> {
354 command
355 .execute_engine(EngineCommandContext::new(self))
356 .await
357 }
358
359 pub(crate) async fn execute_resp_borrowed_into<'a>(
360 &'a self,
361 command: BorrowedCommand<'a>,
362 out: &mut Vec<u8>,
363 ) -> Result<()> {
364 let response = self.execute_borrowed(command).await?;
365 RespCodec::encode(&response, out);
366 Ok(())
367 }
368
369 pub(crate) fn should_use_spanned_resp(command: &BorrowedCommand<'_>) -> bool {
370 command.supports_spanned_resp()
371 }
372
373 pub(crate) async fn execute_resp_spanned_into(
374 &self,
375 frame: CommandSpanFrame,
376 owner: SharedBytes,
377 out: &mut Vec<u8>,
378 ) -> Result<()> {
379 if frame.parts.is_empty() {
380 return Err(FastCacheError::Command("empty command".into()));
381 }
382 let name = String::from_utf8_lossy(&owner[frame.parts[0].clone()]).into_owned();
383 if let Some(result) = EngineCommandCatalog::execute_resp_spanned(
384 EngineCommandContext::new(self),
385 frame,
386 owner,
387 out,
388 )
389 .await
390 {
391 return result;
392 }
393 Err(FastCacheError::Command(format!(
394 "unsupported spanned command: {name}",
395 )))
396 }
397
398 async fn execute_owned(&self, command: Command) -> Result<Frame> {
399 self.execute_borrowed(command.to_borrowed_command()).await
400 }
401
402 pub async fn snapshot(&self) -> Result<std::path::PathBuf> {
403 let now_ms = now_millis();
404 let mut entries = Vec::new();
405 for shard in 0..self.inner.shard_senders.len() {
406 let (tx, rx) = oneshot::channel();
407 self.inner.shard_senders[shard]
408 .send(ShardMessage::Snapshot { reply: tx })
409 .map_err(|_| FastCacheError::ChannelClosed("snapshot request"))?;
410 entries.extend(
411 rx.await
412 .map_err(|_| FastCacheError::ChannelClosed("snapshot response"))?,
413 );
414 }
415 self.inner.persistence.snapshot(&entries, now_ms)
416 }
417
418 pub async fn stats_snapshot(&self) -> Result<GlobalStatsSnapshot> {
419 let mut shards = Vec::with_capacity(self.inner.shard_senders.len());
420 for shard in 0..self.inner.shard_senders.len() {
421 let (tx, rx) = oneshot::channel();
422 self.inner.shard_senders[shard]
423 .send(ShardMessage::Stats { reply: tx })
424 .map_err(|_| FastCacheError::ChannelClosed("stats request"))?;
425 shards.push(
426 rx.await
427 .map_err(|_| FastCacheError::ChannelClosed("stats response"))?,
428 );
429 }
430
431 let total_keys = shards.iter().map(|shard| shard.key_count).sum();
432 let total_reads = shards.iter().map(|shard| shard.reads).sum();
433 let total_writes = shards.iter().map(|shard| shard.writes).sum();
434 let total_deletes = shards.iter().map(|shard| shard.deletes).sum();
435 let total_expired = shards.iter().map(|shard| shard.expired).sum();
436
437 Ok(GlobalStatsSnapshot {
438 uptime_ms: self.inner.started_at.elapsed().as_millis() as u64,
439 shard_count: self.inner.shard_senders.len(),
440 total_keys,
441 total_reads,
442 total_writes,
443 total_deletes,
444 total_expired,
445 shards,
446 wal: self.inner.persistence.stats_snapshot(),
447 })
448 }
449
450 pub async fn shutdown(&self) -> Result<()> {
451 for shard in &self.inner.shard_senders {
452 let (tx, rx) = oneshot::channel();
453 shard
454 .send(ShardMessage::Shutdown { reply: tx })
455 .map_err(|_| FastCacheError::ChannelClosed("shutdown request"))?;
456 rx.await
457 .map_err(|_| FastCacheError::ChannelClosed("shutdown response"))?;
458 }
459 while let Some(join) = self.inner.shard_threads.lock().pop() {
460 join.join()
461 .map_err(|_| FastCacheError::TaskJoin("shard thread panicked".into()))?;
462 }
463 self.inner.persistence.shutdown()?;
464 if let Some(replication) = &self.inner.replication {
465 replication.shutdown()?;
466 }
467 Ok(())
468 }
469
470 pub fn snapshot_interval(&self) -> Duration {
471 self.inner.config.snapshot_interval()
472 }
473
474 pub fn snapshot_min_writes(&self) -> u64 {
475 self.inner.config.persistence.snapshot_min_writes
476 }
477
478 #[cfg(feature = "telemetry")]
479 pub fn metrics(&self) -> Option<Arc<CacheTelemetry>> {
480 self.inner.metrics.clone()
481 }
482
483 fn route(&self, key: &[u8]) -> usize {
484 stripe_index(hash_key(key), self.inner.shift)
485 }
486
487 fn route_hash_to_shard(&self, route_hash: u64) -> usize {
488 stripe_index(route_hash, self.inner.shift)
489 }
490
491 async fn request(&self, shard_id: usize, op: ShardOperation) -> Result<ShardReply> {
492 self.request_pending(shard_id, op)?
493 .await
494 .map_err(|_| FastCacheError::ChannelClosed("shard response"))?
495 }
496
497 fn request_pending(
498 &self,
499 shard_id: usize,
500 op: ShardOperation,
501 ) -> Result<oneshot::Receiver<Result<ShardReply>>> {
502 let (tx, rx) = oneshot::channel();
503 self.inner.shard_senders[shard_id]
504 .send(ShardMessage::Execute { op, reply: tx })
505 .map_err(|_| FastCacheError::ChannelClosed("shard request"))?;
506 Ok(rx)
507 }
508}
509
510struct RecoveredShardEntries;
511
512impl RecoveredShardEntries {
513 fn partition(entries: &[StoredEntry], shift: u32) -> HashMap<usize, Vec<StoredEntry>> {
514 let mut shards = HashMap::<usize, Vec<StoredEntry>>::new();
515 for entry in entries {
516 let shard = stripe_index(hash_key(entry.key.as_ref()), shift);
517 shards.entry(shard).or_default().push(entry.clone());
518 }
519 shards
520 }
521}
522
/// Namespace for the shard worker thread's entry point and helpers.
struct ShardWorker;
524
525fn start_replication_primary(config: &FastCacheConfig) -> Result<Option<Arc<ReplicationPrimary>>> {
526 if !config.replication.enabled {
527 return Ok(None);
528 }
529 if config.replication.role != crate::config::ReplicationRole::Primary {
530 return Ok(None);
531 }
532 Ok(Some(Arc::new(ReplicationPrimary::start(
533 config.shard_count,
534 config.replication.clone(),
535 )?)))
536}
537
538impl ShardWorker {
539 fn run(
540 shard_id: usize,
541 receiver: Receiver<ShardMessage>,
542 config: FastCacheConfig,
543 recovered_entries: Vec<StoredEntry>,
544 wal: Option<crate::persistence::WalAppender>,
545 replication: Option<Arc<ReplicationPrimary>>,
546 #[cfg(feature = "telemetry")] metrics: Option<Arc<CacheTelemetry>>,
547 ) {
548 Self::pin_current_thread(shard_id);
549
550 #[allow(unused_mut)]
551 let mut map = FlatMap::from_entries(recovered_entries, now_millis());
552 map.configure_memory_policy(
553 config.per_shard_memory_limit_bytes(),
554 config.eviction_policy,
555 now_millis(),
556 );
557 #[cfg(feature = "telemetry")]
558 if let Some(metrics) = &metrics {
559 map.attach_metrics(CacheTelemetryHandle::from_arc(metrics), shard_id);
560 }
561
562 let mut state = ShardState {
563 shard_id,
564 map,
565 wal,
566 replication_batch: replication
567 .as_ref()
568 .map(|_| ReplicationBatchBuilder::new(config.replication.clone())),
569 replication,
570 sequence: 0,
571 };
572 let maintenance_interval = config.ttl_sweep_interval();
573 let mut next_maintenance = Instant::now() + maintenance_interval;
574
575 loop {
576 state.flush_replication_due();
577 let timeout = state.next_worker_timeout(next_maintenance);
578 match receiver.recv_timeout(timeout) {
579 Ok(ShardMessage::Execute { op, reply }) => {
580 let _ = reply.send(state.execute(op));
581 state.flush_replication_due();
582 }
583 Ok(ShardMessage::Snapshot { reply }) => {
584 let _ = reply.send(state.map.snapshot_entries(now_millis()));
585 state.flush_replication_due();
586 }
587 Ok(ShardMessage::Stats { reply }) => {
588 let _ = reply.send(state.stats_snapshot());
589 state.flush_replication_due();
590 }
591 Ok(ShardMessage::Shutdown { reply }) => {
592 state.flush_replication();
593 let _ = reply.send(());
594 break;
595 }
596 Err(RecvTimeoutError::Timeout) => {
597 state.flush_replication_due();
598 if Instant::now() >= next_maintenance {
599 state.map.process_maintenance(now_millis());
600 next_maintenance = Instant::now() + maintenance_interval;
601 }
602 }
603 Err(RecvTimeoutError::Disconnected) => break,
604 }
605 }
606 state.flush_replication();
607 }
608
609 fn pin_current_thread(shard_id: usize) {
610 if let Some(cores) = core_affinity::get_core_ids()
611 && let Some(core) = cores.get(shard_id % cores.len())
612 {
613 core_affinity::set_for_current(*core);
614 }
615 }
616}
617
618impl ShardState {
619 fn execute(&mut self, op: ShardOperation) -> Result<ShardReply> {
620 let now_ms = now_millis();
621 let reply = match op {
622 ShardOperation::Get(key) => ShardReply::Value(self.map.get(&key, now_ms)),
623 ShardOperation::GetEx {
624 key_hash,
625 key,
626 expiration,
627 } => {
628 let value = self.map.get(key.as_ref(), now_ms);
629 if value.is_some() && expiration != ExpirationChange::Keep {
630 self.apply_expiration_change(key_hash, key, expiration, now_ms)?;
631 }
632 ShardReply::Value(value)
633 }
634 ShardOperation::Set {
635 key_hash,
636 key,
637 value,
638 expire_at_ms,
639 } => {
640 self.set_and_append_wal(key_hash, key, value, expire_at_ms, now_ms)?;
641 ShardReply::Ok
642 }
643 ShardOperation::Delete { key_hash, key } => {
644 ShardReply::Integer(self.delete_and_append_wal(key_hash, key, now_ms)? as i64)
645 }
646 ShardOperation::Exists(key) => {
647 ShardReply::Integer(self.map.exists(&key, now_ms) as i64)
648 }
649 ShardOperation::Ttl { key, millis } => {
650 let ttl = match millis {
651 true => self.map.ttl_millis(&key, now_ms),
652 false => self.map.ttl_seconds(&key, now_ms),
653 };
654 ShardReply::Integer(ttl)
655 }
656 ShardOperation::Expire {
657 key_hash,
658 key,
659 expire_at_ms,
660 } => {
661 let expiration = expire_at_ms
662 .map(ExpirationChange::ExpireAt)
663 .unwrap_or(ExpirationChange::Persist);
664 ShardReply::Integer(
665 self.apply_expiration_change(key_hash, key, expiration, now_ms)? as i64,
666 )
667 }
668 };
669 Ok(reply)
670 }
671
672 fn set_and_append_wal(
673 &mut self,
674 key_hash: u64,
675 key: ShardKey,
676 value: ShardValue,
677 expire_at_ms: Option<u64>,
678 timestamp_ms: u64,
679 ) -> Result<()> {
680 if self.wal.is_some() || self.replication.is_some() {
681 self.sequence = self.sequence.saturating_add(1);
682 let key = key.into_shared();
683 let value = value.into_shared();
684 self.map.set_bytes_hashed(
685 key_hash,
686 key.as_ref(),
687 value.clone(),
688 expire_at_ms,
689 timestamp_ms,
690 );
691 let record = MutationRecord {
692 shard_id: self.shard_id,
693 sequence: self.sequence,
694 timestamp_ms,
695 op: MutationOp::Set,
696 key,
697 value,
698 expire_at_ms,
699 };
700 if let Some(wal) = &self.wal {
701 wal.append(record.clone())?;
702 }
703 self.emit_replication(ReplicationMutation::from_record_with_key_hash(
704 &record, key_hash,
705 ));
706 } else {
707 match value {
708 ShardValue::Inline(value) => {
709 self.map.set_slice_hashed(
710 key_hash,
711 key.as_ref(),
712 value.as_ref(),
713 expire_at_ms,
714 timestamp_ms,
715 );
716 }
717 ShardValue::Shared(value) => {
718 self.map.set_bytes_hashed(
719 key_hash,
720 key.as_ref(),
721 value,
722 expire_at_ms,
723 timestamp_ms,
724 );
725 }
726 }
727 }
728 Ok(())
729 }
730
731 fn delete_and_append_wal(
732 &mut self,
733 key_hash: u64,
734 key: ShardKey,
735 timestamp_ms: u64,
736 ) -> Result<bool> {
737 if self.wal.is_some() || self.replication.is_some() {
738 self.sequence = self.sequence.saturating_add(1);
739 let key = key.into_shared();
740 let deleted = self.map.delete_hashed(key_hash, key.as_ref(), timestamp_ms);
741 let record = MutationRecord {
742 shard_id: self.shard_id,
743 sequence: self.sequence,
744 timestamp_ms,
745 op: MutationOp::Del,
746 key,
747 value: SharedBytes::new(),
748 expire_at_ms: None,
749 };
750 if let Some(wal) = &self.wal {
751 wal.append(record.clone())?;
752 }
753 self.emit_replication(ReplicationMutation::from_record_with_key_hash(
754 &record, key_hash,
755 ));
756 return Ok(deleted);
757 }
758
759 Ok(self.map.delete_hashed(key_hash, key.as_ref(), timestamp_ms))
760 }
761
762 fn apply_expiration_change(
763 &mut self,
764 key_hash: u64,
765 key: ShardKey,
766 expiration: ExpirationChange,
767 timestamp_ms: u64,
768 ) -> Result<bool> {
769 let changed = match expiration {
770 ExpirationChange::Keep => false,
771 ExpirationChange::ExpireAt(expire_at_ms) => {
772 self.map.expire(key.as_ref(), expire_at_ms, timestamp_ms)
773 }
774 ExpirationChange::Persist => self.map.persist(key.as_ref(), timestamp_ms),
775 };
776 if !changed {
777 return Ok(false);
778 }
779 if self.wal.is_some() || self.replication.is_some() {
780 self.sequence = self.sequence.saturating_add(1);
781 let key = key.into_shared();
782 let record = MutationRecord {
783 shard_id: self.shard_id,
784 sequence: self.sequence,
785 timestamp_ms,
786 op: MutationOp::Expire,
787 key,
788 value: SharedBytes::new(),
789 expire_at_ms: expiration.expire_at_ms(),
790 };
791 if let Some(wal) = &self.wal {
792 wal.append(record.clone())?;
793 }
794 self.emit_replication(ReplicationMutation::from_record_with_key_hash(
795 &record, key_hash,
796 ));
797 }
798 Ok(true)
799 }
800
801 fn emit_replication(&mut self, mutation: ReplicationMutation) {
802 let Some(replication) = &self.replication else {
803 return;
804 };
805 let Some(batch_builder) = &mut self.replication_batch else {
806 replication.emit(mutation);
807 return;
808 };
809 if let Some(batch) = batch_builder.push(mutation) {
810 replication.export_batch_direct(batch);
811 }
812 }
813
814 fn flush_replication_due(&mut self) {
815 let Some(replication) = &self.replication else {
816 return;
817 };
818 let Some(batch_builder) = &mut self.replication_batch else {
819 return;
820 };
821 if let Some(batch) = batch_builder.flush_due() {
822 replication.export_batch_direct(batch);
823 }
824 }
825
826 fn flush_replication(&mut self) {
827 let Some(replication) = &self.replication else {
828 return;
829 };
830 let Some(batch_builder) = &mut self.replication_batch else {
831 return;
832 };
833 if let Some(batch) = batch_builder.flush() {
834 replication.export_batch_direct(batch);
835 }
836 }
837
838 fn next_worker_timeout(&self, next_maintenance: Instant) -> Duration {
839 let maintenance_timeout = next_maintenance
840 .checked_duration_since(Instant::now())
841 .unwrap_or_default();
842 match self
843 .replication_batch
844 .as_ref()
845 .and_then(ReplicationBatchBuilder::next_timeout)
846 {
847 Some(replication_timeout) => maintenance_timeout.min(replication_timeout),
848 None => maintenance_timeout,
849 }
850 }
851
852 fn stats_snapshot(&self) -> ShardStatsSnapshot {
853 let (hot, warm, cold) = self.map.stats_snapshot();
854 ShardStatsSnapshot {
855 shard_id: self.shard_id,
856 key_count: self.map.len(),
857 reads: 0,
858 writes: 0,
859 deletes: 0,
860 expired: 0,
861 maintenance_runs: 0,
862 hot,
863 warm,
864 cold,
865 }
866 }
867}
868
#[cfg(test)]
mod tests {
    use super::*;
    use crate::config::FastCacheConfig;

    /// End-to-end check that a `SET` whose value is `RESP_SPANNED_VALUE_MIN` bytes long is
    /// accepted on the spanned RESP path and can be read back with a borrowed `GET`.
    #[tokio::test(flavor = "multi_thread")]
    async fn large_resp_set_uses_spanned_owner_path() {
        // Four-shard engine with persistence disabled; the data dir points at a temp dir.
        let temp_dir = tempfile::tempdir().expect("temp dir");
        let mut config = FastCacheConfig {
            shard_count: 4,
            ..FastCacheConfig::default()
        };
        config.persistence.enabled = false;
        config.persistence.data_dir = temp_dir.path().to_path_buf();

        let engine = EngineHandle::open(config).expect("engine");
        // Value sized exactly at the spanned-path threshold.
        let value = vec![b'x'; RESP_SPANNED_VALUE_MIN];
        let frame = Frame::Array(vec![
            Frame::BlobString(b"SET".to_vec()),
            Frame::BlobString(b"large".to_vec()),
            Frame::BlobString(value.clone()),
        ]);
        let mut encoded = Vec::new();
        RespCodec::encode(&frame, &mut encoded);

        // The borrowed decode must consume the whole buffer and flag the command as eligible
        // for the spanned path. Scoped so the borrow of `encoded` ends before it is moved.
        {
            let (borrowed_frame, borrowed_consumed) = RespCodec::decode_command(&encoded)
                .expect("borrowed decode")
                .expect("borrowed frame");
            assert_eq!(borrowed_consumed, encoded.len());
            let command = BorrowedCommand::from_frame(borrowed_frame).expect("borrowed command");
            assert!(EngineHandle::should_use_spanned_resp(&command));
        }

        // The span decode must also consume the whole buffer.
        let (span_frame, span_consumed) = RespCodec::decode_command_spans(&encoded)
            .expect("span decode")
            .expect("span frame");
        assert_eq!(span_consumed, encoded.len());

        // Execute the SET through the spanned path; the encoded buffer becomes the owner.
        let mut out = Vec::new();
        engine
            .execute_resp_spanned_into(span_frame, SharedBytes::from(encoded), &mut out)
            .await
            .expect("spanned SET");
        assert_eq!(out, b"+OK\r\n");

        // Read the value back through the borrowed command path.
        let get = BorrowedCommand::from_parts(&[b"GET".as_slice(), b"large".as_slice()])
            .expect("GET command");
        let response = engine.execute_borrowed(get).await.expect("GET response");
        assert_eq!(response, Frame::BlobString(value));

        engine.shutdown().await.expect("shutdown");
    }
}