1use crate::commands::EngineCommandDispatch;
4#[cfg(feature = "server")]
5use crate::protocol::FastCodec;
6use crate::protocol::{FastCommand, FastRequest, FastResponse, Frame};
7#[cfg(feature = "server")]
8use crate::server::commands::{
9 BorrowedCommandContext, DirectCommandContext, DirectFastCommand, FastCommandContext,
10 FastDirectCommand, FcnpCommandContext, FcnpDirectCommand, FcnpDispatch, RawCommandContext,
11 RawDirectCommand,
12};
13#[cfg(feature = "server")]
14use crate::server::wire::ServerWire;
15use crate::storage::{
16 Command, EngineCommandContext, EngineFastFuture, EngineFrameFuture, ShardKey, ShardOperation,
17 ShardReply, hash_key, now_millis,
18};
19use crate::{FastCacheError, Result};
20
21use super::DecodedFastCommand;
22use super::parsing::{CommandArity, TtlMillis};
23
/// Stateless marker type for the `EXPIRE` command; the parsing and dispatch
/// traits for the command are implemented on it.
pub(crate) struct Expire;

/// Shared static instance of [`Expire`].
pub(crate) static COMMAND: Expire = Expire;
26
/// Owned form of an `EXPIRE` invocation: the key bytes plus a relative TTL.
#[derive(Debug, Clone)]
pub(crate) struct OwnedExpire {
    /// Key whose expiry is being set.
    key: Vec<u8>,
    /// Time-to-live in milliseconds.
    ttl_ms: u64,
}

impl OwnedExpire {
    /// Builds an owned command from an already-parsed key and TTL.
    fn new(key: Vec<u8>, ttl_ms: u64) -> Self {
        OwnedExpire { key, ttl_ms }
    }
}
38
39impl super::OwnedCommandData for OwnedExpire {
40 type Spec = Expire;
41
42 fn route_key(&self) -> Option<&[u8]> {
43 Some(&self.key)
44 }
45
46 fn to_borrowed_command(&self) -> super::BorrowedCommandBox<'_> {
47 Box::new(BorrowedExpire::new(&self.key, self.ttl_ms))
48 }
49}
50
/// Borrowed form of an `EXPIRE` invocation: a key slice plus a relative TTL.
#[derive(Debug, Clone, Copy)]
pub(crate) struct BorrowedExpire<'a> {
    /// Key whose expiry is being set, borrowed from the caller's buffer.
    key: &'a [u8],
    /// Time-to-live in milliseconds.
    ttl_ms: u64,
}

impl<'a> BorrowedExpire<'a> {
    /// Builds a borrowed command from an already-parsed key and TTL.
    fn new(key: &'a [u8], ttl_ms: u64) -> Self {
        BorrowedExpire { key, ttl_ms }
    }
}
62
63impl<'a> super::BorrowedCommandData<'a> for BorrowedExpire<'a> {
64 type Spec = Expire;
65
66 fn route_key(&self) -> Option<&'a [u8]> {
67 Some(self.key)
68 }
69
70 fn to_owned_command(&self) -> Command {
71 Command::new(Box::new(OwnedExpire::new(self.key.to_vec(), self.ttl_ms)))
72 }
73
74 fn execute_engine<'b>(&'b self, ctx: EngineCommandContext<'b>) -> EngineFrameFuture<'b>
75 where
76 'a: 'b,
77 {
78 let key = self.key;
79 let expire_at_ms = relative_expire_at_ms(self.ttl_ms);
80 Box::pin(async move { Expire::execute_engine_frame(ctx, key, expire_at_ms).await })
81 }
82
83 #[cfg(feature = "server")]
84 fn execute_borrowed_frame(&self, store: &crate::storage::EmbeddedStore, _now_ms: u64) -> Frame {
85 Frame::Integer(store.expire(self.key, relative_expire_at_ms(self.ttl_ms)) as i64)
86 }
87
88 #[cfg(feature = "server")]
89 fn execute_borrowed(&self, ctx: BorrowedCommandContext<'_, '_, '_>) {
90 let changed = ctx
91 .store
92 .expire(self.key, relative_expire_at_ms(self.ttl_ms));
93 ServerWire::write_resp_integer(ctx.out, changed as i64);
94 }
95
96 #[cfg(feature = "server")]
97 fn execute_direct_borrowed(&self, ctx: DirectCommandContext) -> Frame {
98 Frame::Integer(ctx.expire_at(self.key, ctx.now_ms.saturating_add(self.ttl_ms)) as i64)
99 }
100}
101
102impl super::CommandSpec for Expire {
103 const NAME: &'static str = "EXPIRE";
104 const MUTATES_VALUE: bool = true;
105}
106
107impl super::OwnedCommandParse for Expire {
108 fn parse_owned(parts: &[Vec<u8>]) -> Result<Command> {
109 CommandArity::<Self>::exact(parts.len(), 3)?;
110 Ok(Command::new(Box::new(OwnedExpire::new(
111 parts[1].clone(),
112 TtlMillis::<Self>::seconds(&parts[2])?,
113 ))))
114 }
115}
116
117impl<'a> super::BorrowedCommandParse<'a> for Expire {
118 fn parse_borrowed(parts: &[&'a [u8]]) -> Result<super::BorrowedCommandBox<'a>> {
119 CommandArity::<Self>::exact(parts.len(), 3)?;
120 Ok(Box::new(BorrowedExpire::new(
121 parts[1],
122 TtlMillis::<Self>::seconds(parts[2])?,
123 )))
124 }
125}
126
127impl DecodedFastCommand for Expire {
128 fn matches_decoded_fast(&self, command: &FastCommand<'_>) -> bool {
129 matches!(command, FastCommand::Expire { .. })
130 }
131}
132
133impl EngineCommandDispatch for Expire {
134 fn execute_engine_fast<'a>(
135 &'static self,
136 ctx: EngineCommandContext<'a>,
137 request: FastRequest<'a>,
138 ) -> EngineFastFuture<'a> {
139 Box::pin(async move {
140 match request.command {
141 FastCommand::Expire { key, ttl_ms } => {
142 Expire::execute_engine_integer(ctx, key, relative_expire_at_ms(ttl_ms))
143 .await
144 .map(FastResponse::Integer)
145 }
146 _ => Ok(FastResponse::Error(b"ERR unsupported command".to_vec())),
147 }
148 })
149 }
150}
151
152impl Expire {
153 pub(crate) async fn execute_engine_integer(
154 ctx: EngineCommandContext<'_>,
155 key: &[u8],
156 expire_at_ms: u64,
157 ) -> Result<i64> {
158 let key_hash = hash_key(key);
159 let shard = ctx.route_key_hash(key_hash);
160 match ctx
161 .request(
162 shard,
163 ShardOperation::Expire {
164 key_hash,
165 key: ShardKey::inline(key),
166 expire_at_ms: Some(expire_at_ms),
167 },
168 )
169 .await?
170 {
171 ShardReply::Integer(value) => Ok(value),
172 _ => Err(FastCacheError::Command(
173 "EXPIRE received unexpected shard reply".into(),
174 )),
175 }
176 }
177
178 async fn execute_engine_frame(
179 ctx: EngineCommandContext<'_>,
180 key: &[u8],
181 expire_at_ms: u64,
182 ) -> Result<Frame> {
183 Self::execute_engine_integer(ctx, key, expire_at_ms)
184 .await
185 .map(Frame::Integer)
186 }
187}
188
189pub(crate) fn relative_expire_at_ms(ttl_ms: u64) -> u64 {
190 now_millis().saturating_add(ttl_ms)
191}
192
#[cfg(feature = "server")]
impl RawDirectCommand for Expire {
    /// Handles `EXPIRE` from raw arguments and writes a RESP reply to `ctx.out`.
    ///
    /// `ctx.args` must hold exactly a key and a TTL. A wrong argument count or
    /// a TTL rejected by `TtlMillis::ascii_seconds` produces a RESP error;
    /// otherwise the store's result is written as a RESP integer.
    fn execute(&self, ctx: RawCommandContext<'_, '_, '_>) {
        let [key, ttl] = ctx.args.as_slice() else {
            ServerWire::write_resp_error(
                ctx.out,
                "ERR wrong number of arguments for 'EXPIRE' command",
            );
            return;
        };
        let Some(ttl_ms) = TtlMillis::<()>::ascii_seconds(ttl) else {
            ServerWire::write_resp_error(ctx.out, "ERR value is not an integer");
            return;
        };

        let changed = ctx.store.expire(key, relative_expire_at_ms(ttl_ms));
        ServerWire::write_resp_integer(ctx.out, changed as i64);
    }
}
211
#[cfg(feature = "server")]
impl DirectFastCommand for Expire {
    /// Executes a fast-protocol `EXPIRE` through the direct context.
    ///
    /// The deadline is `ctx.now_ms` plus the request's TTL (saturating). Any
    /// other command variant yields an `ERR unsupported command` response.
    fn execute_direct_fast(
        &self,
        ctx: DirectCommandContext,
        request: FastRequest<'_>,
    ) -> FastResponse {
        let FastCommand::Expire { key, ttl_ms } = request.command else {
            return FastResponse::Error(b"ERR unsupported command".to_vec());
        };
        let deadline_ms = ctx.now_ms.saturating_add(ttl_ms);
        FastResponse::Integer(ctx.expire_at(key, deadline_ms) as i64)
    }
}
227
#[cfg(feature = "server")]
impl FastDirectCommand for Expire {
    /// Executes an already-decoded fast command and writes the reply to `ctx.out`.
    ///
    /// `FastCommand::Expire` is applied to the store and answered with a fast
    /// integer; every other variant is answered with `ERR unsupported command`.
    fn execute_fast(&self, ctx: FastCommandContext<'_, '_>, command: FastCommand<'_>) {
        if let FastCommand::Expire { key, ttl_ms } = command {
            let deadline_ms = relative_expire_at_ms(ttl_ms);
            let changed = ctx.store.expire(key, deadline_ms);
            ServerWire::write_fast_integer(ctx.out, changed as i64);
        } else {
            ServerWire::write_fast_error(ctx.out, "ERR unsupported command");
        }
    }
}
240
#[cfg(feature = "server")]
impl FcnpDirectCommand for Expire {
    /// Opcode value reported for this command.
    fn opcode(&self) -> u8 {
        8
    }

    /// Attempts to decode and execute an `EXPIRE` request straight from the frame buffer.
    ///
    /// Returns `FcnpDispatch::Unsupported` when the frame does not decode to a
    /// complete request, when the decoded command is not `Expire`, or when the
    /// request carries no key hash. Otherwise a reply is written to `ctx.out`
    /// and `FcnpDispatch::Complete` reports the number of bytes consumed.
    ///
    /// When the context owns a shard, the request's route shard must equal it
    /// and pass `request_matches_owned_shard_for_key`; if not, a route-mismatch
    /// error is written and the frame is still reported as consumed.
    fn try_execute_fcnp(&self, ctx: FcnpCommandContext<'_, '_, '_, '_>) -> FcnpDispatch {
        // NOTE(review): this slice panics if `frame_len` exceeds the buffer length —
        // presumably the framing layer guarantees it cannot; confirm.
        let frame_bytes = &ctx.frame.buf[..ctx.frame.frame_len];
        let (request, consumed) = match FastCodec::decode_request(frame_bytes) {
            Ok(Some(decoded)) => decoded,
            _ => return FcnpDispatch::Unsupported,
        };
        let FastCommand::Expire { key, ttl_ms } = request.command else {
            return FcnpDispatch::Unsupported;
        };
        let Some(key_hash) = request.key_hash else {
            return FcnpDispatch::Unsupported;
        };

        if let Some(owned_shard_id) = ctx.owned_shard_id {
            let routed_here = match request.route_shard.map(|shard| shard as usize) {
                Some(route_shard) => {
                    route_shard == owned_shard_id
                        && ctx.request_matches_owned_shard_for_key(route_shard, key_hash, key)
                }
                None => false,
            };
            if !routed_here {
                ServerWire::write_fast_error(ctx.out, "ERR FCNP route shard mismatch");
                return FcnpDispatch::Complete(consumed);
            }
        }

        let deadline_ms = relative_expire_at_ms(ttl_ms);
        let changed = ctx.store.expire(key, deadline_ms);
        ServerWire::write_fast_integer(ctx.out, changed as i64);
        FcnpDispatch::Complete(consumed)
    }
}