Skip to main content

fast_cache/commands/
setex.rs

1//! SETEX command parsing and execution.
2
3use crate::commands::EngineCommandDispatch;
4#[cfg(feature = "server")]
5use crate::protocol::FastCodec;
6use crate::protocol::{FastCommand, FastRequest, FastResponse, Frame};
7#[cfg(feature = "server")]
8use crate::server::commands::{
9    BorrowedCommandContext, DirectCommandContext, DirectFastCommand, FastCommandContext,
10    FastDirectCommand, FcnpCommandContext, FcnpDirectCommand, FcnpDispatch, RawCommandContext,
11    RawDirectCommand,
12};
13#[cfg(feature = "server")]
14use crate::server::wire::ServerWire;
15use crate::storage::{
16    Command, EngineCommandContext, EngineFastFuture, EngineFrameFuture, ShardKey, ShardOperation,
17    ShardReply, ShardValue, hash_key, now_millis,
18};
19use crate::{FastCacheError, Result};
20
21use super::DecodedFastCommand;
22use super::parsing::{CommandArity, TtlMillis};
23
24pub(crate) struct SetEx;
25pub(crate) static COMMAND: SetEx = SetEx;
26
27#[derive(Debug, Clone)]
28pub(crate) struct OwnedSetEx {
29    key: Vec<u8>,
30    ttl_ms: u64,
31    value: Vec<u8>,
32}
33
34impl OwnedSetEx {
35    fn new(key: Vec<u8>, ttl_ms: u64, value: Vec<u8>) -> Self {
36        Self { key, ttl_ms, value }
37    }
38}
39
40impl super::OwnedCommandData for OwnedSetEx {
41    type Spec = SetEx;
42
43    fn route_key(&self) -> Option<&[u8]> {
44        Some(&self.key)
45    }
46
47    fn to_borrowed_command(&self) -> super::BorrowedCommandBox<'_> {
48        Box::new(BorrowedSetEx::new(&self.key, self.ttl_ms, &self.value))
49    }
50}
51
52#[derive(Debug, Clone, Copy)]
53pub(crate) struct BorrowedSetEx<'a> {
54    key: &'a [u8],
55    ttl_ms: u64,
56    value: &'a [u8],
57}
58
59impl<'a> BorrowedSetEx<'a> {
60    fn new(key: &'a [u8], ttl_ms: u64, value: &'a [u8]) -> Self {
61        Self { key, ttl_ms, value }
62    }
63}
64
65impl<'a> super::BorrowedCommandData<'a> for BorrowedSetEx<'a> {
66    type Spec = SetEx;
67
68    fn route_key(&self) -> Option<&'a [u8]> {
69        Some(self.key)
70    }
71
72    fn to_owned_command(&self) -> Command {
73        Command::new(Box::new(OwnedSetEx::new(
74            self.key.to_vec(),
75            self.ttl_ms,
76            self.value.to_vec(),
77        )))
78    }
79
80    fn execute_engine<'b>(&'b self, ctx: EngineCommandContext<'b>) -> EngineFrameFuture<'b>
81    where
82        'a: 'b,
83    {
84        let key = self.key;
85        let ttl_ms = self.ttl_ms;
86        let value = self.value;
87        Box::pin(async move { SetEx::execute_engine_frame(ctx, key, ttl_ms, value).await })
88    }
89
90    #[cfg(feature = "server")]
91    fn execute_borrowed_frame(&self, store: &crate::storage::EmbeddedStore, _now_ms: u64) -> Frame {
92        store.set(self.key.to_vec(), self.value.to_vec(), Some(self.ttl_ms));
93        Frame::SimpleString("OK".into())
94    }
95
96    #[cfg(feature = "server")]
97    fn execute_borrowed(&self, ctx: BorrowedCommandContext<'_, '_, '_>) {
98        ctx.store
99            .set(self.key.to_vec(), self.value.to_vec(), Some(self.ttl_ms));
100        ctx.out.extend_from_slice(b"+OK\r\n");
101    }
102
103    #[cfg(feature = "server")]
104    fn execute_direct_borrowed(&self, ctx: DirectCommandContext) -> Frame {
105        ctx.set_owned(self.key.to_vec(), self.value.to_vec(), Some(self.ttl_ms));
106        Frame::SimpleString("OK".into())
107    }
108}
109
110impl super::CommandSpec for SetEx {
111    const NAME: &'static str = "SETEX";
112    const MUTATES_VALUE: bool = true;
113}
114
115impl super::OwnedCommandParse for SetEx {
116    fn parse_owned(parts: &[Vec<u8>]) -> Result<Command> {
117        CommandArity::<Self>::exact(parts.len(), 4)?;
118        Ok(Command::new(Box::new(OwnedSetEx::new(
119            parts[1].clone(),
120            TtlMillis::<Self>::seconds(&parts[2])?,
121            parts[3].clone(),
122        ))))
123    }
124}
125
126impl<'a> super::BorrowedCommandParse<'a> for SetEx {
127    fn parse_borrowed(parts: &[&'a [u8]]) -> Result<super::BorrowedCommandBox<'a>> {
128        CommandArity::<Self>::exact(parts.len(), 4)?;
129        Ok(Box::new(BorrowedSetEx::new(
130            parts[1],
131            TtlMillis::<Self>::seconds(parts[2])?,
132            parts[3],
133        )))
134    }
135}
136
137impl DecodedFastCommand for SetEx {
138    fn matches_decoded_fast(&self, command: &FastCommand<'_>) -> bool {
139        matches!(command, FastCommand::SetEx { .. })
140    }
141}
142
143impl EngineCommandDispatch for SetEx {
144    fn execute_engine_fast<'a>(
145        &'static self,
146        ctx: EngineCommandContext<'a>,
147        request: FastRequest<'a>,
148    ) -> EngineFastFuture<'a> {
149        Box::pin(async move {
150            match request.command {
151                FastCommand::SetEx { key, value, ttl_ms } => {
152                    SetEx::store_value(ctx, key, ttl_ms, value).await?;
153                    Ok(FastResponse::Ok)
154                }
155                _ => Ok(FastResponse::Error(b"ERR unsupported command".to_vec())),
156            }
157        })
158    }
159}
160
161impl SetEx {
162    async fn execute_engine_frame(
163        ctx: EngineCommandContext<'_>,
164        key: &[u8],
165        ttl_ms: u64,
166        value: &[u8],
167    ) -> Result<Frame> {
168        Self::store_value(ctx, key, ttl_ms, value).await?;
169        Ok(Frame::SimpleString("OK".into()))
170    }
171
172    pub(crate) async fn store_value(
173        ctx: EngineCommandContext<'_>,
174        key: &[u8],
175        ttl_ms: u64,
176        value: &[u8],
177    ) -> Result<()> {
178        let key_hash = hash_key(key);
179        let shard = ctx.route_key_hash(key_hash);
180        let expire_at_ms = Some(now_millis().saturating_add(ttl_ms));
181        match ctx
182            .request(
183                shard,
184                ShardOperation::Set {
185                    key_hash,
186                    key: ShardKey::inline(key),
187                    value: ShardValue::inline(value),
188                    expire_at_ms,
189                },
190            )
191            .await?
192        {
193            ShardReply::Ok => Ok(()),
194            _ => Err(FastCacheError::Command(
195                "SETEX received unexpected shard reply".into(),
196            )),
197        }
198    }
199}
200
201#[cfg(feature = "server")]
202impl RawDirectCommand for SetEx {
203    fn execute(&self, ctx: RawCommandContext<'_, '_, '_>) {
204        match ctx.args.as_slice() {
205            [key, ttl, value] => match TtlMillis::<()>::ascii_seconds(ttl) {
206                Some(ttl_ms) => {
207                    ctx.store.set(key.to_vec(), value.to_vec(), Some(ttl_ms));
208                    ctx.out.extend_from_slice(b"+OK\r\n");
209                }
210                None => ServerWire::write_resp_error(ctx.out, "ERR value is not an integer"),
211            },
212            _ => ServerWire::write_resp_error(
213                ctx.out,
214                "ERR wrong number of arguments for 'SETEX' command",
215            ),
216        }
217    }
218}
219
220#[cfg(feature = "server")]
221impl DirectFastCommand for SetEx {
222    fn execute_direct_fast(
223        &self,
224        ctx: DirectCommandContext,
225        request: FastRequest<'_>,
226    ) -> FastResponse {
227        match request.command {
228            FastCommand::SetEx { key, value, ttl_ms } => {
229                ctx.set_owned(key.to_vec(), value.to_vec(), Some(ttl_ms));
230                FastResponse::Ok
231            }
232            _ => FastResponse::Error(b"ERR unsupported command".to_vec()),
233        }
234    }
235}
236
237#[cfg(feature = "server")]
238impl FastDirectCommand for SetEx {
239    fn execute_fast(&self, ctx: FastCommandContext<'_, '_>, command: FastCommand<'_>) {
240        match command {
241            FastCommand::SetEx { key, value, ttl_ms } => {
242                ctx.store.set(key.to_vec(), value.to_vec(), Some(ttl_ms));
243                ServerWire::write_fast_ok(ctx.out);
244            }
245            _ => ServerWire::write_fast_error(ctx.out, "ERR unsupported command"),
246        }
247    }
248}
249
250#[cfg(feature = "server")]
251impl FcnpDirectCommand for SetEx {
252    fn opcode(&self) -> u8 {
253        3
254    }
255
256    fn try_execute_fcnp(&self, ctx: FcnpCommandContext<'_, '_, '_, '_>) -> FcnpDispatch {
257        let frame_len = ctx.frame.frame_len;
258        let Ok(Some((request, consumed))) = FastCodec::decode_request(&ctx.frame.buf[..frame_len])
259        else {
260            return FcnpDispatch::Unsupported;
261        };
262        let FastCommand::SetEx { key, value, ttl_ms } = request.command else {
263            return FcnpDispatch::Unsupported;
264        };
265        let Some(key_hash) = request.key_hash else {
266            return FcnpDispatch::Unsupported;
267        };
268        if let Some(owned_shard_id) = ctx.owned_shard_id {
269            match request.route_shard.map(|shard| shard as usize) {
270                Some(route_shard)
271                    if route_shard == owned_shard_id
272                        && ctx.request_matches_owned_shard_for_key(route_shard, key_hash, key) => {}
273                _ => {
274                    ServerWire::write_fast_error(ctx.out, "ERR FCNP route shard mismatch");
275                    return FcnpDispatch::Complete(consumed);
276                }
277            }
278        }
279        ctx.store.set(key.to_vec(), value.to_vec(), Some(ttl_ms));
280        ServerWire::write_fast_ok(ctx.out);
281        FcnpDispatch::Complete(consumed)
282    }
283}