Skip to main content

fast_cache/commands/
expire.rs

1//! EXPIRE command parsing and execution.
2
3use crate::commands::EngineCommandDispatch;
4#[cfg(feature = "server")]
5use crate::protocol::FastCodec;
6use crate::protocol::{FastCommand, FastRequest, FastResponse, Frame};
7#[cfg(feature = "server")]
8use crate::server::commands::{
9    BorrowedCommandContext, DirectCommandContext, DirectFastCommand, FastCommandContext,
10    FastDirectCommand, FcnpCommandContext, FcnpDirectCommand, FcnpDispatch, RawCommandContext,
11    RawDirectCommand,
12};
13#[cfg(feature = "server")]
14use crate::server::wire::ServerWire;
15use crate::storage::{
16    Command, EngineCommandContext, EngineFastFuture, EngineFrameFuture, ShardKey, ShardOperation,
17    ShardReply, hash_key, now_millis,
18};
19use crate::{FastCacheError, Result};
20
21use super::DecodedFastCommand;
22use super::parsing::{CommandArity, TtlMillis};
23
24pub(crate) struct Expire;
25pub(crate) static COMMAND: Expire = Expire;
26
27#[derive(Debug, Clone)]
28pub(crate) struct OwnedExpire {
29    key: Vec<u8>,
30    ttl_ms: u64,
31}
32
33impl OwnedExpire {
34    fn new(key: Vec<u8>, ttl_ms: u64) -> Self {
35        Self { key, ttl_ms }
36    }
37}
38
39impl super::OwnedCommandData for OwnedExpire {
40    type Spec = Expire;
41
42    fn route_key(&self) -> Option<&[u8]> {
43        Some(&self.key)
44    }
45
46    fn to_borrowed_command(&self) -> super::BorrowedCommandBox<'_> {
47        Box::new(BorrowedExpire::new(&self.key, self.ttl_ms))
48    }
49}
50
51#[derive(Debug, Clone, Copy)]
52pub(crate) struct BorrowedExpire<'a> {
53    key: &'a [u8],
54    ttl_ms: u64,
55}
56
57impl<'a> BorrowedExpire<'a> {
58    fn new(key: &'a [u8], ttl_ms: u64) -> Self {
59        Self { key, ttl_ms }
60    }
61}
62
63impl<'a> super::BorrowedCommandData<'a> for BorrowedExpire<'a> {
64    type Spec = Expire;
65
66    fn route_key(&self) -> Option<&'a [u8]> {
67        Some(self.key)
68    }
69
70    fn to_owned_command(&self) -> Command {
71        Command::new(Box::new(OwnedExpire::new(self.key.to_vec(), self.ttl_ms)))
72    }
73
74    fn execute_engine<'b>(&'b self, ctx: EngineCommandContext<'b>) -> EngineFrameFuture<'b>
75    where
76        'a: 'b,
77    {
78        let key = self.key;
79        let expire_at_ms = relative_expire_at_ms(self.ttl_ms);
80        Box::pin(async move { Expire::execute_engine_frame(ctx, key, expire_at_ms).await })
81    }
82
83    #[cfg(feature = "server")]
84    fn execute_borrowed_frame(&self, store: &crate::storage::EmbeddedStore, _now_ms: u64) -> Frame {
85        Frame::Integer(store.expire(self.key, relative_expire_at_ms(self.ttl_ms)) as i64)
86    }
87
88    #[cfg(feature = "server")]
89    fn execute_borrowed(&self, ctx: BorrowedCommandContext<'_, '_, '_>) {
90        let changed = ctx
91            .store
92            .expire(self.key, relative_expire_at_ms(self.ttl_ms));
93        ServerWire::write_resp_integer(ctx.out, changed as i64);
94    }
95
96    #[cfg(feature = "server")]
97    fn execute_direct_borrowed(&self, ctx: DirectCommandContext) -> Frame {
98        Frame::Integer(ctx.expire_at(self.key, ctx.now_ms.saturating_add(self.ttl_ms)) as i64)
99    }
100}
101
102impl super::CommandSpec for Expire {
103    const NAME: &'static str = "EXPIRE";
104    const MUTATES_VALUE: bool = true;
105}
106
107impl super::OwnedCommandParse for Expire {
108    fn parse_owned(parts: &[Vec<u8>]) -> Result<Command> {
109        CommandArity::<Self>::exact(parts.len(), 3)?;
110        Ok(Command::new(Box::new(OwnedExpire::new(
111            parts[1].clone(),
112            TtlMillis::<Self>::seconds(&parts[2])?,
113        ))))
114    }
115}
116
117impl<'a> super::BorrowedCommandParse<'a> for Expire {
118    fn parse_borrowed(parts: &[&'a [u8]]) -> Result<super::BorrowedCommandBox<'a>> {
119        CommandArity::<Self>::exact(parts.len(), 3)?;
120        Ok(Box::new(BorrowedExpire::new(
121            parts[1],
122            TtlMillis::<Self>::seconds(parts[2])?,
123        )))
124    }
125}
126
127impl DecodedFastCommand for Expire {
128    fn matches_decoded_fast(&self, command: &FastCommand<'_>) -> bool {
129        matches!(command, FastCommand::Expire { .. })
130    }
131}
132
133impl EngineCommandDispatch for Expire {
134    fn execute_engine_fast<'a>(
135        &'static self,
136        ctx: EngineCommandContext<'a>,
137        request: FastRequest<'a>,
138    ) -> EngineFastFuture<'a> {
139        Box::pin(async move {
140            match request.command {
141                FastCommand::Expire { key, ttl_ms } => {
142                    Expire::execute_engine_integer(ctx, key, relative_expire_at_ms(ttl_ms))
143                        .await
144                        .map(FastResponse::Integer)
145                }
146                _ => Ok(FastResponse::Error(b"ERR unsupported command".to_vec())),
147            }
148        })
149    }
150}
151
152impl Expire {
153    pub(crate) async fn execute_engine_integer(
154        ctx: EngineCommandContext<'_>,
155        key: &[u8],
156        expire_at_ms: u64,
157    ) -> Result<i64> {
158        let key_hash = hash_key(key);
159        let shard = ctx.route_key_hash(key_hash);
160        match ctx
161            .request(
162                shard,
163                ShardOperation::Expire {
164                    key_hash,
165                    key: ShardKey::inline(key),
166                    expire_at_ms: Some(expire_at_ms),
167                },
168            )
169            .await?
170        {
171            ShardReply::Integer(value) => Ok(value),
172            _ => Err(FastCacheError::Command(
173                "EXPIRE received unexpected shard reply".into(),
174            )),
175        }
176    }
177
178    async fn execute_engine_frame(
179        ctx: EngineCommandContext<'_>,
180        key: &[u8],
181        expire_at_ms: u64,
182    ) -> Result<Frame> {
183        Self::execute_engine_integer(ctx, key, expire_at_ms)
184            .await
185            .map(Frame::Integer)
186    }
187}
188
189pub(crate) fn relative_expire_at_ms(ttl_ms: u64) -> u64 {
190    now_millis().saturating_add(ttl_ms)
191}
192
193#[cfg(feature = "server")]
194impl RawDirectCommand for Expire {
195    fn execute(&self, ctx: RawCommandContext<'_, '_, '_>) {
196        match ctx.args.as_slice() {
197            [key, ttl] => match TtlMillis::<()>::ascii_seconds(ttl) {
198                Some(ttl_ms) => {
199                    let changed = ctx.store.expire(key, relative_expire_at_ms(ttl_ms));
200                    ServerWire::write_resp_integer(ctx.out, changed as i64);
201                }
202                None => ServerWire::write_resp_error(ctx.out, "ERR value is not an integer"),
203            },
204            _ => ServerWire::write_resp_error(
205                ctx.out,
206                "ERR wrong number of arguments for 'EXPIRE' command",
207            ),
208        }
209    }
210}
211
212#[cfg(feature = "server")]
213impl DirectFastCommand for Expire {
214    fn execute_direct_fast(
215        &self,
216        ctx: DirectCommandContext,
217        request: FastRequest<'_>,
218    ) -> FastResponse {
219        match request.command {
220            FastCommand::Expire { key, ttl_ms } => {
221                FastResponse::Integer(ctx.expire_at(key, ctx.now_ms.saturating_add(ttl_ms)) as i64)
222            }
223            _ => FastResponse::Error(b"ERR unsupported command".to_vec()),
224        }
225    }
226}
227
228#[cfg(feature = "server")]
229impl FastDirectCommand for Expire {
230    fn execute_fast(&self, ctx: FastCommandContext<'_, '_>, command: FastCommand<'_>) {
231        match command {
232            FastCommand::Expire { key, ttl_ms } => {
233                let changed = ctx.store.expire(key, relative_expire_at_ms(ttl_ms));
234                ServerWire::write_fast_integer(ctx.out, changed as i64);
235            }
236            _ => ServerWire::write_fast_error(ctx.out, "ERR unsupported command"),
237        }
238    }
239}
240
241#[cfg(feature = "server")]
242impl FcnpDirectCommand for Expire {
243    fn opcode(&self) -> u8 {
244        8
245    }
246
247    fn try_execute_fcnp(&self, ctx: FcnpCommandContext<'_, '_, '_, '_>) -> FcnpDispatch {
248        let frame_len = ctx.frame.frame_len;
249        let Ok(Some((request, consumed))) = FastCodec::decode_request(&ctx.frame.buf[..frame_len])
250        else {
251            return FcnpDispatch::Unsupported;
252        };
253        let FastCommand::Expire { key, ttl_ms } = request.command else {
254            return FcnpDispatch::Unsupported;
255        };
256        let Some(key_hash) = request.key_hash else {
257            return FcnpDispatch::Unsupported;
258        };
259        if let Some(owned_shard_id) = ctx.owned_shard_id {
260            match request.route_shard.map(|shard| shard as usize) {
261                Some(route_shard)
262                    if route_shard == owned_shard_id
263                        && ctx.request_matches_owned_shard_for_key(route_shard, key_hash, key) => {}
264                _ => {
265                    ServerWire::write_fast_error(ctx.out, "ERR FCNP route shard mismatch");
266                    return FcnpDispatch::Complete(consumed);
267                }
268            }
269        }
270        let changed = ctx.store.expire(key, relative_expire_at_ms(ttl_ms));
271        ServerWire::write_fast_integer(ctx.out, changed as i64);
272        FcnpDispatch::Complete(consumed)
273    }
274}