Skip to main content

fast_cache/commands/
persist.rs

1//! PERSIST command parsing and execution.
2
3use crate::FastCacheError;
4use crate::Result;
5use crate::protocol::{FastCommand, Frame};
6#[cfg(feature = "server")]
7use crate::server::commands::{BorrowedCommandContext, DirectCommandContext};
8#[cfg(feature = "server")]
9use crate::server::commands::{RawCommandContext, RawDirectCommand};
10#[cfg(feature = "server")]
11use crate::server::wire::ServerWire;
12use crate::storage::{
13    Command, EngineCommandContext, EngineFrameFuture, ShardKey, ShardOperation, ShardReply,
14    hash_key,
15};
16
17use super::DecodedFastCommand;
18use super::parsing::CommandArity;
19
/// Zero-sized marker type for the PERSIST command; the command's parsing and
/// execution traits in this file are implemented on it.
pub(crate) struct Persist;
/// Shared static instance of [`Persist`].
// NOTE(review): presumably referenced by a command registry/dispatch table — confirm against callers.
pub(crate) static COMMAND: Persist = Persist;
22
/// Owned form of a PERSIST command: stores the target key as its own byte vector.
#[derive(Clone, Debug)]
pub(crate) struct OwnedPersist {
    /// Raw bytes of the key the command targets.
    key: Vec<u8>,
}

impl OwnedPersist {
    /// Wraps an already-owned key; the vector is moved in without copying.
    fn new(key: Vec<u8>) -> Self {
        OwnedPersist { key }
    }
}
33
34impl super::OwnedCommandData for OwnedPersist {
35    type Spec = Persist;
36
37    fn route_key(&self) -> Option<&[u8]> {
38        Some(&self.key)
39    }
40
41    fn to_borrowed_command(&self) -> super::BorrowedCommandBox<'_> {
42        Box::new(BorrowedPersist::new(&self.key))
43    }
44}
45
/// Borrowed form of a PERSIST command: refers to key bytes owned elsewhere.
#[derive(Clone, Copy, Debug)]
pub(crate) struct BorrowedPersist<'a> {
    /// Slice of the key the command targets.
    key: &'a [u8],
}

impl<'a> BorrowedPersist<'a> {
    /// Wraps a borrowed key slice without copying it.
    fn new(key: &'a [u8]) -> Self {
        BorrowedPersist { key }
    }
}
56
57impl<'a> super::BorrowedCommandData<'a> for BorrowedPersist<'a> {
58    type Spec = Persist;
59
60    fn route_key(&self) -> Option<&'a [u8]> {
61        Some(self.key)
62    }
63
64    fn to_owned_command(&self) -> Command {
65        Command::new(Box::new(OwnedPersist::new(self.key.to_vec())))
66    }
67
68    fn execute_engine<'b>(&'b self, ctx: EngineCommandContext<'b>) -> EngineFrameFuture<'b>
69    where
70        'a: 'b,
71    {
72        let key = self.key;
73        Box::pin(async move { Persist::execute_engine_frame(ctx, key).await })
74    }
75
76    #[cfg(feature = "server")]
77    fn execute_borrowed_frame(&self, store: &crate::storage::EmbeddedStore, _now_ms: u64) -> Frame {
78        Frame::Integer(store.persist(self.key) as i64)
79    }
80
81    #[cfg(feature = "server")]
82    fn execute_borrowed(&self, ctx: BorrowedCommandContext<'_, '_, '_>) {
83        ServerWire::write_resp_integer(ctx.out, ctx.store.persist(self.key) as i64);
84    }
85
86    #[cfg(feature = "server")]
87    fn execute_direct_borrowed(&self, ctx: DirectCommandContext) -> Frame {
88        Frame::Integer(ctx.persist(self.key) as i64)
89    }
90}
91
/// Static metadata describing the PERSIST command.
impl super::CommandSpec for Persist {
    /// Command name.
    const NAME: &'static str = "PERSIST";
    // NOTE(review): marked as mutating; the engine path in this file issues an
    // `Expire` shard operation for the key — confirm how MUTATES_VALUE is consumed.
    const MUTATES_VALUE: bool = true;
}
96
97impl super::OwnedCommandParse for Persist {
98    fn parse_owned(parts: &[Vec<u8>]) -> Result<Command> {
99        CommandArity::<Self>::exact(parts.len(), 2)?;
100        Ok(Command::new(Box::new(OwnedPersist::new(parts[1].clone()))))
101    }
102}
103
104impl<'a> super::BorrowedCommandParse<'a> for Persist {
105    fn parse_borrowed(parts: &[&'a [u8]]) -> Result<super::BorrowedCommandBox<'a>> {
106        CommandArity::<Self>::exact(parts.len(), 2)?;
107        Ok(Box::new(BorrowedPersist::new(parts[1])))
108    }
109}
110
111impl DecodedFastCommand for Persist {
112    fn matches_decoded_fast(&self, _command: &FastCommand<'_>) -> bool {
113        false
114    }
115}
116
117impl Persist {
118    async fn execute_engine_frame(ctx: EngineCommandContext<'_>, key: &[u8]) -> Result<Frame> {
119        let key_hash = hash_key(key);
120        let shard = ctx.route_key_hash(key_hash);
121        match ctx
122            .request(
123                shard,
124                ShardOperation::Expire {
125                    key_hash,
126                    key: ShardKey::inline(key),
127                    expire_at_ms: None,
128                },
129            )
130            .await?
131        {
132            ShardReply::Integer(value) => Ok(Frame::Integer(value)),
133            _ => Err(FastCacheError::Command(
134                "PERSIST received unexpected shard reply".into(),
135            )),
136        }
137    }
138}
139
#[cfg(feature = "server")]
impl RawDirectCommand for Persist {
    /// Executes PERSIST straight from raw arguments.
    ///
    /// With exactly one argument (the key), calls `persist` on the store and
    /// writes the result, cast to `i64`, as a RESP integer. Any other argument
    /// count writes a RESP error instead.
    fn execute(&self, ctx: RawCommandContext<'_, '_, '_>) {
        if let [key] = ctx.args.as_slice() {
            let reply = ctx.store.persist(key) as i64;
            ServerWire::write_resp_integer(ctx.out, reply);
        } else {
            ServerWire::write_resp_error(
                ctx.out,
                "ERR wrong number of arguments for 'PERSIST' command",
            );
        }
    }
}