Skip to main content

fast_cache/storage/
engine.rs

1use std::collections::HashMap;
2use std::future::Future;
3use std::ops::Range;
4use std::pin::Pin;
5use std::sync::Arc;
6use std::thread::{self, JoinHandle};
7use std::time::{Duration, Instant};
8
9use bytes::Bytes as SharedBytes;
10use crossbeam_channel::{Receiver, RecvTimeoutError, Sender, unbounded};
11use parking_lot::Mutex;
12use tokio::sync::oneshot;
13
14use crate::commands::EngineCommandCatalog;
15use crate::config::FastCacheConfig;
16use crate::persistence::{PersistenceRuntime, load_recovery_state};
17use crate::protocol::{CommandSpanFrame, FastRequest, FastResponse, Frame, RespCodec};
18use crate::replication::{ReplicationBatchBuilder, ReplicationMutation, ReplicationPrimary};
19use crate::storage::command::{BorrowedCommand, Command};
20use crate::storage::stats::{GlobalStatsSnapshot, ShardStatsSnapshot};
21use crate::storage::{
22    Bytes, FlatMap, MutationOp, MutationRecord, StoredEntry, hash_key, now_millis, shift_for,
23    stripe_index,
24};
25#[cfg(feature = "telemetry")]
26use crate::storage::{CacheTelemetry, CacheTelemetryHandle};
27use crate::{FastCacheError, Result};
28
29#[derive(Clone)]
30pub struct EngineHandle {
31    inner: Arc<EngineInner>,
32}
33
34struct EngineInner {
35    config: FastCacheConfig,
36    started_at: Instant,
37    shift: u32,
38    shard_senders: Vec<Sender<ShardMessage>>,
39    shard_threads: Mutex<Vec<JoinHandle<()>>>,
40    persistence: PersistenceRuntime,
41    replication: Option<Arc<ReplicationPrimary>>,
42    #[cfg(feature = "telemetry")]
43    metrics: Option<Arc<CacheTelemetry>>,
44}
45
/// Inline capacity (bytes) of [`InlineKey`]; keys up to this length are copied
/// inline by [`ShardKey::from_owner`] instead of slicing the owner buffer.
const MULTIKEY_INLINE_KEY_MAX: usize = 32;
/// Inline capacity (bytes) of [`InlineValue`].
const MULTIKEY_INLINE_VALUE_MAX: usize = 64;
/// Minimum value length (2 KiB) at which [`ShardValue::from_owner`] keeps a
/// slice of the owner buffer instead of copying the bytes.
pub(crate) const RESP_SPANNED_VALUE_MIN: usize = 2 * 1024;
49
50type InlineKey = smallvec::SmallVec<[u8; MULTIKEY_INLINE_KEY_MAX]>;
51type InlineValue = smallvec::SmallVec<[u8; MULTIKEY_INLINE_VALUE_MAX]>;
52pub(crate) type EngineFrameFuture<'a> = Pin<Box<dyn Future<Output = Result<Frame>> + Send + 'a>>;
53pub(crate) type EngineFastFuture<'a> =
54    Pin<Box<dyn Future<Output = Result<FastResponse>> + Send + 'a>>;
55pub(crate) type EngineRespSpanFuture<'a> = Pin<Box<dyn Future<Output = Result<()>> + Send + 'a>>;
56
57#[derive(Clone, Copy)]
58pub(crate) struct EngineCommandContext<'engine> {
59    engine: &'engine EngineHandle,
60}
61
62impl<'engine> EngineCommandContext<'engine> {
63    #[inline(always)]
64    pub(crate) fn new(engine: &'engine EngineHandle) -> Self {
65        Self { engine }
66    }
67
68    pub(crate) fn route_key(self, key: &[u8]) -> usize {
69        self.engine.route(key)
70    }
71
72    pub(crate) fn route_key_hash(self, key_hash: u64) -> usize {
73        self.engine.route_hash_to_shard(key_hash)
74    }
75
76    pub(crate) async fn request(self, shard_id: usize, op: ShardOperation) -> Result<ShardReply> {
77        self.engine.request(shard_id, op).await
78    }
79}
80
81pub(crate) enum ShardKey {
82    Inline(InlineKey),
83    Shared(SharedBytes),
84}
85
86impl ShardKey {
87    pub(crate) fn inline(key: &[u8]) -> Self {
88        Self::Inline(InlineKey::from_slice(key))
89    }
90
91    pub(crate) fn from_owner(owner: &SharedBytes, range: Range<usize>) -> Self {
92        if range.len() <= MULTIKEY_INLINE_KEY_MAX {
93            Self::Inline(InlineKey::from_slice(&owner[range]))
94        } else {
95            Self::Shared(owner.slice(range))
96        }
97    }
98
99    fn as_ref(&self) -> &[u8] {
100        match self {
101            Self::Inline(key) => key.as_ref(),
102            Self::Shared(key) => key.as_ref(),
103        }
104    }
105
106    fn into_shared(self) -> SharedBytes {
107        match self {
108            Self::Inline(key) => SharedBytes::from(key.into_vec()),
109            Self::Shared(key) => key,
110        }
111    }
112}
113
114pub(crate) enum ShardValue {
115    Inline(InlineValue),
116    Shared(SharedBytes),
117}
118
119impl ShardValue {
120    pub(crate) fn inline(value: &[u8]) -> Self {
121        Self::Inline(InlineValue::from_slice(value))
122    }
123
124    pub(crate) fn from_owner(owner: &SharedBytes, range: Range<usize>) -> Self {
125        if range.len() >= RESP_SPANNED_VALUE_MIN {
126            Self::Shared(owner.slice(range))
127        } else {
128            Self::Inline(InlineValue::from_slice(&owner[range]))
129        }
130    }
131
132    fn into_shared(self) -> SharedBytes {
133        match self {
134            Self::Inline(value) => SharedBytes::from(value.into_vec()),
135            Self::Shared(value) => value,
136        }
137    }
138}
139
/// Requested change to a key's expiration.
#[derive(Clone, Copy, PartialEq, Eq)]
pub(crate) enum ExpirationChange {
    /// Leave the current expiration untouched.
    Keep,
    /// Set the expiration to the given timestamp in milliseconds.
    ExpireAt(u64),
    /// Remove the expiration.
    Persist,
}

impl ExpirationChange {
    /// Returns the new expiry timestamp for `ExpireAt`, and `None` for the
    /// variants that carry no timestamp.
    fn expire_at_ms(self) -> Option<u64> {
        if let ExpirationChange::ExpireAt(deadline_ms) = self {
            Some(deadline_ms)
        } else {
            None
        }
    }
}
155
156#[allow(clippy::large_enum_variant)]
157enum ShardMessage {
158    Execute {
159        op: ShardOperation,
160        reply: oneshot::Sender<Result<ShardReply>>,
161    },
162    Snapshot {
163        reply: oneshot::Sender<Vec<StoredEntry>>,
164    },
165    Stats {
166        reply: oneshot::Sender<ShardStatsSnapshot>,
167    },
168    Shutdown {
169        reply: oneshot::Sender<()>,
170    },
171}
172
173#[allow(clippy::large_enum_variant)]
174pub(crate) enum ShardOperation {
175    Get(Vec<u8>),
176    GetEx {
177        key_hash: u64,
178        key: ShardKey,
179        expiration: ExpirationChange,
180    },
181    Set {
182        key_hash: u64,
183        key: ShardKey,
184        value: ShardValue,
185        expire_at_ms: Option<u64>,
186    },
187    Delete {
188        key_hash: u64,
189        key: ShardKey,
190    },
191    Exists(Vec<u8>),
192    Ttl {
193        key: Vec<u8>,
194        millis: bool,
195    },
196    Expire {
197        key_hash: u64,
198        key: ShardKey,
199        expire_at_ms: Option<u64>,
200    },
201}
202
203#[allow(clippy::large_enum_variant)]
204pub(crate) enum ShardReply {
205    Value(Option<Bytes>),
206    Integer(i64),
207    Ok,
208}
209
210struct ShardState {
211    shard_id: usize,
212    map: FlatMap,
213    wal: Option<crate::persistence::WalAppender>,
214    replication: Option<Arc<ReplicationPrimary>>,
215    replication_batch: Option<ReplicationBatchBuilder>,
216    sequence: u64,
217}
218
219impl EngineHandle {
220    pub fn open(config: FastCacheConfig) -> Result<Self> {
221        #[cfg(feature = "telemetry")]
222        {
223            Self::open_with_metrics(config, None)
224        }
225        #[cfg(not(feature = "telemetry"))]
226        {
227            config.validate()?;
228            config.ensure_paths()?;
229
230            let recovery = load_recovery_state(&config.persistence)?;
231            let persistence =
232                PersistenceRuntime::start(config.shard_count, config.persistence.clone())?;
233            let replication = start_replication_primary(&config)?;
234            let shift = shift_for(config.shard_count);
235            let recovered = RecoveredShardEntries::partition(&recovery.entries, shift);
236
237            let mut shard_senders = Vec::with_capacity(config.shard_count);
238            let mut shard_threads = Vec::with_capacity(config.shard_count);
239            for shard_id in 0..config.shard_count {
240                let (tx, rx) = unbounded::<ShardMessage>();
241                let shard_entries = recovered.get(&shard_id).cloned().unwrap_or_default();
242                let shard_config = config.clone();
243                let wal = persistence.appender(shard_id);
244                let replication = replication.clone();
245                let join = thread::Builder::new()
246                    .name(format!("fast-cache-shard-{shard_id}"))
247                    .spawn(move || {
248                        ShardWorker::run(
249                            shard_id,
250                            rx,
251                            shard_config,
252                            shard_entries,
253                            wal,
254                            replication,
255                        )
256                    })
257                    .map_err(|error| {
258                        FastCacheError::Config(format!("failed to start shard {shard_id}: {error}"))
259                    })?;
260                shard_senders.push(tx);
261                shard_threads.push(join);
262            }
263
264            Ok(Self {
265                inner: Arc::new(EngineInner {
266                    config,
267                    started_at: Instant::now(),
268                    shift,
269                    shard_senders,
270                    shard_threads: Mutex::new(shard_threads),
271                    persistence,
272                    replication,
273                }),
274            })
275        }
276    }
277
278    #[cfg(feature = "telemetry")]
279    pub fn open_with_metrics(
280        config: FastCacheConfig,
281        metrics: Option<Arc<CacheTelemetry>>,
282    ) -> Result<Self> {
283        config.validate()?;
284        config.ensure_paths()?;
285
286        let recovery = load_recovery_state(&config.persistence)?;
287        let persistence = PersistenceRuntime::start_with_metrics(
288            config.shard_count,
289            config.persistence.clone(),
290            metrics.clone(),
291        )?;
292        let replication = start_replication_primary(&config)?;
293        let shift = shift_for(config.shard_count);
294        let recovered = RecoveredShardEntries::partition(&recovery.entries, shift);
295
296        let mut shard_senders = Vec::with_capacity(config.shard_count);
297        let mut shard_threads = Vec::with_capacity(config.shard_count);
298        for shard_id in 0..config.shard_count {
299            let (tx, rx) = unbounded::<ShardMessage>();
300            let shard_entries = recovered.get(&shard_id).cloned().unwrap_or_default();
301            let shard_config = config.clone();
302            let wal = persistence.appender(shard_id);
303            let replication = replication.clone();
304            let shard_metrics = metrics.clone();
305            let join = thread::Builder::new()
306                .name(format!("fast-cache-shard-{shard_id}"))
307                .spawn(move || {
308                    ShardWorker::run(
309                        shard_id,
310                        rx,
311                        shard_config,
312                        shard_entries,
313                        wal,
314                        replication,
315                        shard_metrics,
316                    )
317                })
318                .map_err(|error| {
319                    FastCacheError::Config(format!("failed to start shard {shard_id}: {error}"))
320                })?;
321            shard_senders.push(tx);
322            shard_threads.push(join);
323        }
324
325        Ok(Self {
326            inner: Arc::new(EngineInner {
327                config,
328                started_at: Instant::now(),
329                shift,
330                shard_senders,
331                shard_threads: Mutex::new(shard_threads),
332                persistence,
333                replication,
334                metrics,
335            }),
336        })
337    }
338
339    pub fn config(&self) -> &FastCacheConfig {
340        &self.inner.config
341    }
342
343    pub async fn execute(&self, command: Command) -> Result<Frame> {
344        self.execute_owned(command).await
345    }
346
347    pub async fn execute_fast<'a>(&'a self, request: FastRequest<'a>) -> Result<FastResponse> {
348        EngineCommandCatalog::execute_fast(EngineCommandContext::new(self), request)
349            .await
350            .unwrap_or_else(|| Ok(FastResponse::Error(b"ERR unsupported command".to_vec())))
351    }
352
353    pub async fn execute_borrowed<'a>(&'a self, command: BorrowedCommand<'a>) -> Result<Frame> {
354        command
355            .execute_engine(EngineCommandContext::new(self))
356            .await
357    }
358
359    pub(crate) async fn execute_resp_borrowed_into<'a>(
360        &'a self,
361        command: BorrowedCommand<'a>,
362        out: &mut Vec<u8>,
363    ) -> Result<()> {
364        let response = self.execute_borrowed(command).await?;
365        RespCodec::encode(&response, out);
366        Ok(())
367    }
368
369    pub(crate) fn should_use_spanned_resp(command: &BorrowedCommand<'_>) -> bool {
370        command.supports_spanned_resp()
371    }
372
373    pub(crate) async fn execute_resp_spanned_into(
374        &self,
375        frame: CommandSpanFrame,
376        owner: SharedBytes,
377        out: &mut Vec<u8>,
378    ) -> Result<()> {
379        if frame.parts.is_empty() {
380            return Err(FastCacheError::Command("empty command".into()));
381        }
382        let name = String::from_utf8_lossy(&owner[frame.parts[0].clone()]).into_owned();
383        if let Some(result) = EngineCommandCatalog::execute_resp_spanned(
384            EngineCommandContext::new(self),
385            frame,
386            owner,
387            out,
388        )
389        .await
390        {
391            return result;
392        }
393        Err(FastCacheError::Command(format!(
394            "unsupported spanned command: {name}",
395        )))
396    }
397
398    async fn execute_owned(&self, command: Command) -> Result<Frame> {
399        self.execute_borrowed(command.to_borrowed_command()).await
400    }
401
402    pub async fn snapshot(&self) -> Result<std::path::PathBuf> {
403        let now_ms = now_millis();
404        let mut entries = Vec::new();
405        for shard in 0..self.inner.shard_senders.len() {
406            let (tx, rx) = oneshot::channel();
407            self.inner.shard_senders[shard]
408                .send(ShardMessage::Snapshot { reply: tx })
409                .map_err(|_| FastCacheError::ChannelClosed("snapshot request"))?;
410            entries.extend(
411                rx.await
412                    .map_err(|_| FastCacheError::ChannelClosed("snapshot response"))?,
413            );
414        }
415        self.inner.persistence.snapshot(&entries, now_ms)
416    }
417
418    pub async fn stats_snapshot(&self) -> Result<GlobalStatsSnapshot> {
419        let mut shards = Vec::with_capacity(self.inner.shard_senders.len());
420        for shard in 0..self.inner.shard_senders.len() {
421            let (tx, rx) = oneshot::channel();
422            self.inner.shard_senders[shard]
423                .send(ShardMessage::Stats { reply: tx })
424                .map_err(|_| FastCacheError::ChannelClosed("stats request"))?;
425            shards.push(
426                rx.await
427                    .map_err(|_| FastCacheError::ChannelClosed("stats response"))?,
428            );
429        }
430
431        let total_keys = shards.iter().map(|shard| shard.key_count).sum();
432        let total_reads = shards.iter().map(|shard| shard.reads).sum();
433        let total_writes = shards.iter().map(|shard| shard.writes).sum();
434        let total_deletes = shards.iter().map(|shard| shard.deletes).sum();
435        let total_expired = shards.iter().map(|shard| shard.expired).sum();
436
437        Ok(GlobalStatsSnapshot {
438            uptime_ms: self.inner.started_at.elapsed().as_millis() as u64,
439            shard_count: self.inner.shard_senders.len(),
440            total_keys,
441            total_reads,
442            total_writes,
443            total_deletes,
444            total_expired,
445            shards,
446            wal: self.inner.persistence.stats_snapshot(),
447        })
448    }
449
450    pub async fn shutdown(&self) -> Result<()> {
451        for shard in &self.inner.shard_senders {
452            let (tx, rx) = oneshot::channel();
453            shard
454                .send(ShardMessage::Shutdown { reply: tx })
455                .map_err(|_| FastCacheError::ChannelClosed("shutdown request"))?;
456            rx.await
457                .map_err(|_| FastCacheError::ChannelClosed("shutdown response"))?;
458        }
459        while let Some(join) = self.inner.shard_threads.lock().pop() {
460            join.join()
461                .map_err(|_| FastCacheError::TaskJoin("shard thread panicked".into()))?;
462        }
463        self.inner.persistence.shutdown()?;
464        if let Some(replication) = &self.inner.replication {
465            replication.shutdown()?;
466        }
467        Ok(())
468    }
469
470    pub fn snapshot_interval(&self) -> Duration {
471        self.inner.config.snapshot_interval()
472    }
473
474    pub fn snapshot_min_writes(&self) -> u64 {
475        self.inner.config.persistence.snapshot_min_writes
476    }
477
478    #[cfg(feature = "telemetry")]
479    pub fn metrics(&self) -> Option<Arc<CacheTelemetry>> {
480        self.inner.metrics.clone()
481    }
482
483    fn route(&self, key: &[u8]) -> usize {
484        stripe_index(hash_key(key), self.inner.shift)
485    }
486
487    fn route_hash_to_shard(&self, route_hash: u64) -> usize {
488        stripe_index(route_hash, self.inner.shift)
489    }
490
491    async fn request(&self, shard_id: usize, op: ShardOperation) -> Result<ShardReply> {
492        self.request_pending(shard_id, op)?
493            .await
494            .map_err(|_| FastCacheError::ChannelClosed("shard response"))?
495    }
496
497    fn request_pending(
498        &self,
499        shard_id: usize,
500        op: ShardOperation,
501    ) -> Result<oneshot::Receiver<Result<ShardReply>>> {
502        let (tx, rx) = oneshot::channel();
503        self.inner.shard_senders[shard_id]
504            .send(ShardMessage::Execute { op, reply: tx })
505            .map_err(|_| FastCacheError::ChannelClosed("shard request"))?;
506        Ok(rx)
507    }
508}
509
510struct RecoveredShardEntries;
511
512impl RecoveredShardEntries {
513    fn partition(entries: &[StoredEntry], shift: u32) -> HashMap<usize, Vec<StoredEntry>> {
514        let mut shards = HashMap::<usize, Vec<StoredEntry>>::new();
515        for entry in entries {
516            let shard = stripe_index(hash_key(entry.key.as_ref()), shift);
517            shards.entry(shard).or_default().push(entry.clone());
518        }
519        shards
520    }
521}
522
/// Namespace for the shard worker thread entry point and its helpers.
struct ShardWorker;
524
525fn start_replication_primary(config: &FastCacheConfig) -> Result<Option<Arc<ReplicationPrimary>>> {
526    if !config.replication.enabled {
527        return Ok(None);
528    }
529    if config.replication.role != crate::config::ReplicationRole::Primary {
530        return Ok(None);
531    }
532    Ok(Some(Arc::new(ReplicationPrimary::start(
533        config.shard_count,
534        config.replication.clone(),
535    )?)))
536}
537
538impl ShardWorker {
539    fn run(
540        shard_id: usize,
541        receiver: Receiver<ShardMessage>,
542        config: FastCacheConfig,
543        recovered_entries: Vec<StoredEntry>,
544        wal: Option<crate::persistence::WalAppender>,
545        replication: Option<Arc<ReplicationPrimary>>,
546        #[cfg(feature = "telemetry")] metrics: Option<Arc<CacheTelemetry>>,
547    ) {
548        Self::pin_current_thread(shard_id);
549
550        #[allow(unused_mut)]
551        let mut map = FlatMap::from_entries(recovered_entries, now_millis());
552        map.configure_memory_policy(
553            config.per_shard_memory_limit_bytes(),
554            config.eviction_policy,
555            now_millis(),
556        );
557        #[cfg(feature = "telemetry")]
558        if let Some(metrics) = &metrics {
559            map.attach_metrics(CacheTelemetryHandle::from_arc(metrics), shard_id);
560        }
561
562        let mut state = ShardState {
563            shard_id,
564            map,
565            wal,
566            replication_batch: replication
567                .as_ref()
568                .map(|_| ReplicationBatchBuilder::new(config.replication.clone())),
569            replication,
570            sequence: 0,
571        };
572        let maintenance_interval = config.ttl_sweep_interval();
573        let mut next_maintenance = Instant::now() + maintenance_interval;
574
575        loop {
576            state.flush_replication_due();
577            let timeout = state.next_worker_timeout(next_maintenance);
578            match receiver.recv_timeout(timeout) {
579                Ok(ShardMessage::Execute { op, reply }) => {
580                    let _ = reply.send(state.execute(op));
581                    state.flush_replication_due();
582                }
583                Ok(ShardMessage::Snapshot { reply }) => {
584                    let _ = reply.send(state.map.snapshot_entries(now_millis()));
585                    state.flush_replication_due();
586                }
587                Ok(ShardMessage::Stats { reply }) => {
588                    let _ = reply.send(state.stats_snapshot());
589                    state.flush_replication_due();
590                }
591                Ok(ShardMessage::Shutdown { reply }) => {
592                    state.flush_replication();
593                    let _ = reply.send(());
594                    break;
595                }
596                Err(RecvTimeoutError::Timeout) => {
597                    state.flush_replication_due();
598                    if Instant::now() >= next_maintenance {
599                        state.map.process_maintenance(now_millis());
600                        next_maintenance = Instant::now() + maintenance_interval;
601                    }
602                }
603                Err(RecvTimeoutError::Disconnected) => break,
604            }
605        }
606        state.flush_replication();
607    }
608
609    fn pin_current_thread(shard_id: usize) {
610        if let Some(cores) = core_affinity::get_core_ids()
611            && let Some(core) = cores.get(shard_id % cores.len())
612        {
613            core_affinity::set_for_current(*core);
614        }
615    }
616}
617
618impl ShardState {
619    fn execute(&mut self, op: ShardOperation) -> Result<ShardReply> {
620        let now_ms = now_millis();
621        let reply = match op {
622            ShardOperation::Get(key) => ShardReply::Value(self.map.get(&key, now_ms)),
623            ShardOperation::GetEx {
624                key_hash,
625                key,
626                expiration,
627            } => {
628                let value = self.map.get(key.as_ref(), now_ms);
629                if value.is_some() && expiration != ExpirationChange::Keep {
630                    self.apply_expiration_change(key_hash, key, expiration, now_ms)?;
631                }
632                ShardReply::Value(value)
633            }
634            ShardOperation::Set {
635                key_hash,
636                key,
637                value,
638                expire_at_ms,
639            } => {
640                self.set_and_append_wal(key_hash, key, value, expire_at_ms, now_ms)?;
641                ShardReply::Ok
642            }
643            ShardOperation::Delete { key_hash, key } => {
644                ShardReply::Integer(self.delete_and_append_wal(key_hash, key, now_ms)? as i64)
645            }
646            ShardOperation::Exists(key) => {
647                ShardReply::Integer(self.map.exists(&key, now_ms) as i64)
648            }
649            ShardOperation::Ttl { key, millis } => {
650                let ttl = match millis {
651                    true => self.map.ttl_millis(&key, now_ms),
652                    false => self.map.ttl_seconds(&key, now_ms),
653                };
654                ShardReply::Integer(ttl)
655            }
656            ShardOperation::Expire {
657                key_hash,
658                key,
659                expire_at_ms,
660            } => {
661                let expiration = expire_at_ms
662                    .map(ExpirationChange::ExpireAt)
663                    .unwrap_or(ExpirationChange::Persist);
664                ShardReply::Integer(
665                    self.apply_expiration_change(key_hash, key, expiration, now_ms)? as i64,
666                )
667            }
668        };
669        Ok(reply)
670    }
671
672    fn set_and_append_wal(
673        &mut self,
674        key_hash: u64,
675        key: ShardKey,
676        value: ShardValue,
677        expire_at_ms: Option<u64>,
678        timestamp_ms: u64,
679    ) -> Result<()> {
680        if self.wal.is_some() || self.replication.is_some() {
681            self.sequence = self.sequence.saturating_add(1);
682            let key = key.into_shared();
683            let value = value.into_shared();
684            self.map.set_bytes_hashed(
685                key_hash,
686                key.as_ref(),
687                value.clone(),
688                expire_at_ms,
689                timestamp_ms,
690            );
691            let record = MutationRecord {
692                shard_id: self.shard_id,
693                sequence: self.sequence,
694                timestamp_ms,
695                op: MutationOp::Set,
696                key,
697                value,
698                expire_at_ms,
699            };
700            if let Some(wal) = &self.wal {
701                wal.append(record.clone())?;
702            }
703            self.emit_replication(ReplicationMutation::from_record_with_key_hash(
704                &record, key_hash,
705            ));
706        } else {
707            match value {
708                ShardValue::Inline(value) => {
709                    self.map.set_slice_hashed(
710                        key_hash,
711                        key.as_ref(),
712                        value.as_ref(),
713                        expire_at_ms,
714                        timestamp_ms,
715                    );
716                }
717                ShardValue::Shared(value) => {
718                    self.map.set_bytes_hashed(
719                        key_hash,
720                        key.as_ref(),
721                        value,
722                        expire_at_ms,
723                        timestamp_ms,
724                    );
725                }
726            }
727        }
728        Ok(())
729    }
730
731    fn delete_and_append_wal(
732        &mut self,
733        key_hash: u64,
734        key: ShardKey,
735        timestamp_ms: u64,
736    ) -> Result<bool> {
737        if self.wal.is_some() || self.replication.is_some() {
738            self.sequence = self.sequence.saturating_add(1);
739            let key = key.into_shared();
740            let deleted = self.map.delete_hashed(key_hash, key.as_ref(), timestamp_ms);
741            let record = MutationRecord {
742                shard_id: self.shard_id,
743                sequence: self.sequence,
744                timestamp_ms,
745                op: MutationOp::Del,
746                key,
747                value: SharedBytes::new(),
748                expire_at_ms: None,
749            };
750            if let Some(wal) = &self.wal {
751                wal.append(record.clone())?;
752            }
753            self.emit_replication(ReplicationMutation::from_record_with_key_hash(
754                &record, key_hash,
755            ));
756            return Ok(deleted);
757        }
758
759        Ok(self.map.delete_hashed(key_hash, key.as_ref(), timestamp_ms))
760    }
761
762    fn apply_expiration_change(
763        &mut self,
764        key_hash: u64,
765        key: ShardKey,
766        expiration: ExpirationChange,
767        timestamp_ms: u64,
768    ) -> Result<bool> {
769        let changed = match expiration {
770            ExpirationChange::Keep => false,
771            ExpirationChange::ExpireAt(expire_at_ms) => {
772                self.map.expire(key.as_ref(), expire_at_ms, timestamp_ms)
773            }
774            ExpirationChange::Persist => self.map.persist(key.as_ref(), timestamp_ms),
775        };
776        if !changed {
777            return Ok(false);
778        }
779        if self.wal.is_some() || self.replication.is_some() {
780            self.sequence = self.sequence.saturating_add(1);
781            let key = key.into_shared();
782            let record = MutationRecord {
783                shard_id: self.shard_id,
784                sequence: self.sequence,
785                timestamp_ms,
786                op: MutationOp::Expire,
787                key,
788                value: SharedBytes::new(),
789                expire_at_ms: expiration.expire_at_ms(),
790            };
791            if let Some(wal) = &self.wal {
792                wal.append(record.clone())?;
793            }
794            self.emit_replication(ReplicationMutation::from_record_with_key_hash(
795                &record, key_hash,
796            ));
797        }
798        Ok(true)
799    }
800
801    fn emit_replication(&mut self, mutation: ReplicationMutation) {
802        let Some(replication) = &self.replication else {
803            return;
804        };
805        let Some(batch_builder) = &mut self.replication_batch else {
806            replication.emit(mutation);
807            return;
808        };
809        if let Some(batch) = batch_builder.push(mutation) {
810            replication.export_batch_direct(batch);
811        }
812    }
813
814    fn flush_replication_due(&mut self) {
815        let Some(replication) = &self.replication else {
816            return;
817        };
818        let Some(batch_builder) = &mut self.replication_batch else {
819            return;
820        };
821        if let Some(batch) = batch_builder.flush_due() {
822            replication.export_batch_direct(batch);
823        }
824    }
825
826    fn flush_replication(&mut self) {
827        let Some(replication) = &self.replication else {
828            return;
829        };
830        let Some(batch_builder) = &mut self.replication_batch else {
831            return;
832        };
833        if let Some(batch) = batch_builder.flush() {
834            replication.export_batch_direct(batch);
835        }
836    }
837
838    fn next_worker_timeout(&self, next_maintenance: Instant) -> Duration {
839        let maintenance_timeout = next_maintenance
840            .checked_duration_since(Instant::now())
841            .unwrap_or_default();
842        match self
843            .replication_batch
844            .as_ref()
845            .and_then(ReplicationBatchBuilder::next_timeout)
846        {
847            Some(replication_timeout) => maintenance_timeout.min(replication_timeout),
848            None => maintenance_timeout,
849        }
850    }
851
852    fn stats_snapshot(&self) -> ShardStatsSnapshot {
853        let (hot, warm, cold) = self.map.stats_snapshot();
854        ShardStatsSnapshot {
855            shard_id: self.shard_id,
856            key_count: self.map.len(),
857            reads: 0,
858            writes: 0,
859            deletes: 0,
860            expired: 0,
861            maintenance_runs: 0,
862            hot,
863            warm,
864            cold,
865        }
866    }
867}
868
#[cfg(test)]
mod tests {
    use super::*;
    use crate::config::FastCacheConfig;

    /// A SET whose value is exactly `RESP_SPANNED_VALUE_MIN` bytes is eligible
    /// for the spanned RESP path, succeeds through it, and can be read back
    /// with a regular GET.
    #[tokio::test(flavor = "multi_thread")]
    async fn large_resp_set_uses_spanned_owner_path() {
        // Engine with persistence disabled, rooted in a throwaway directory.
        let data_dir = tempfile::tempdir().expect("temp dir");
        let mut config = FastCacheConfig {
            shard_count: 4,
            ..FastCacheConfig::default()
        };
        config.persistence.enabled = false;
        config.persistence.data_dir = data_dir.path().to_path_buf();
        let engine = EngineHandle::open(config).expect("engine");

        // Encode `SET large <payload>` as a RESP array.
        let payload = vec![b'x'; RESP_SPANNED_VALUE_MIN];
        let set_frame = Frame::Array(vec![
            Frame::BlobString(b"SET".to_vec()),
            Frame::BlobString(b"large".to_vec()),
            Frame::BlobString(payload.clone()),
        ]);
        let mut wire = Vec::new();
        RespCodec::encode(&set_frame, &mut wire);

        // The borrowed decode must consume the whole buffer and select the
        // spanned path. Scoped so the borrow of `wire` ends before it is moved.
        {
            let (decoded, consumed) = RespCodec::decode_command(&wire)
                .expect("borrowed decode")
                .expect("borrowed frame");
            assert_eq!(consumed, wire.len());
            let borrowed = BorrowedCommand::from_frame(decoded).expect("borrowed command");
            assert!(EngineHandle::should_use_spanned_resp(&borrowed));
        }

        // The span decode must consume the whole buffer as well.
        let (spans, span_len) = RespCodec::decode_command_spans(&wire)
            .expect("span decode")
            .expect("span frame");
        assert_eq!(span_len, wire.len());

        // Execute the SET through the spanned path.
        let mut reply = Vec::new();
        engine
            .execute_resp_spanned_into(spans, SharedBytes::from(wire), &mut reply)
            .await
            .expect("spanned SET");
        assert_eq!(reply, b"+OK\r\n");

        // Read the value back through the borrowed path.
        let get = BorrowedCommand::from_parts(&[b"GET".as_slice(), b"large".as_slice()])
            .expect("GET command");
        let fetched = engine.execute_borrowed(get).await.expect("GET response");
        assert_eq!(fetched, Frame::BlobString(payload));

        engine.shutdown().await.expect("shutdown");
    }
}