1use crate::commands::EngineCommandDispatch;
4#[cfg(feature = "server")]
5use crate::protocol::FastCodec;
6use crate::protocol::{FastCommand, FastRequest, FastResponse, Frame};
7#[cfg(feature = "server")]
8use crate::server::commands::{
9 BorrowedCommandContext, DirectCommandContext, DirectFastCommand, FastCommandContext,
10 FastDirectCommand, FcnpCommandContext, FcnpDirectCommand, FcnpDispatch, RawCommandContext,
11 RawDirectCommand,
12};
13#[cfg(feature = "server")]
14use crate::server::wire::ServerWire;
15use crate::storage::{
16 Command, EngineCommandContext, EngineFastFuture, EngineFrameFuture, ShardKey, ShardOperation,
17 ShardReply, ShardValue, hash_key, now_millis,
18};
19use crate::{FastCacheError, Result};
20
21use super::DecodedFastCommand;
22use super::parsing::{CommandArity, TtlMillis};
23
/// Zero-sized marker type for the `SETEX` command; all command traits in this
/// file are implemented on it.
pub(crate) struct SetEx;

/// Shared static instance of the [`SetEx`] marker.
pub(crate) static COMMAND: SetEx = SetEx;
26
/// Owned argument set for a `SETEX` command: key, TTL in milliseconds, and value.
#[derive(Clone, Debug)]
pub(crate) struct OwnedSetEx {
    /// Key bytes.
    key: Vec<u8>,
    /// Time-to-live, in milliseconds.
    ttl_ms: u64,
    /// Value bytes to store under the key.
    value: Vec<u8>,
}
33
34impl OwnedSetEx {
35 fn new(key: Vec<u8>, ttl_ms: u64, value: Vec<u8>) -> Self {
36 Self { key, ttl_ms, value }
37 }
38}
39
40impl super::OwnedCommandData for OwnedSetEx {
41 type Spec = SetEx;
42
43 fn route_key(&self) -> Option<&[u8]> {
44 Some(&self.key)
45 }
46
47 fn to_borrowed_command(&self) -> super::BorrowedCommandBox<'_> {
48 Box::new(BorrowedSetEx::new(&self.key, self.ttl_ms, &self.value))
49 }
50}
51
/// Borrowed argument set for a `SETEX` command; key and value are slices that
/// live for `'a`, the TTL is held by value.
#[derive(Clone, Copy, Debug)]
pub(crate) struct BorrowedSetEx<'a> {
    /// Key bytes.
    key: &'a [u8],
    /// Time-to-live, in milliseconds.
    ttl_ms: u64,
    /// Value bytes to store under the key.
    value: &'a [u8],
}
58
59impl<'a> BorrowedSetEx<'a> {
60 fn new(key: &'a [u8], ttl_ms: u64, value: &'a [u8]) -> Self {
61 Self { key, ttl_ms, value }
62 }
63}
64
65impl<'a> super::BorrowedCommandData<'a> for BorrowedSetEx<'a> {
66 type Spec = SetEx;
67
68 fn route_key(&self) -> Option<&'a [u8]> {
69 Some(self.key)
70 }
71
72 fn to_owned_command(&self) -> Command {
73 Command::new(Box::new(OwnedSetEx::new(
74 self.key.to_vec(),
75 self.ttl_ms,
76 self.value.to_vec(),
77 )))
78 }
79
80 fn execute_engine<'b>(&'b self, ctx: EngineCommandContext<'b>) -> EngineFrameFuture<'b>
81 where
82 'a: 'b,
83 {
84 let key = self.key;
85 let ttl_ms = self.ttl_ms;
86 let value = self.value;
87 Box::pin(async move { SetEx::execute_engine_frame(ctx, key, ttl_ms, value).await })
88 }
89
90 #[cfg(feature = "server")]
91 fn execute_borrowed_frame(&self, store: &crate::storage::EmbeddedStore, _now_ms: u64) -> Frame {
92 store.set(self.key.to_vec(), self.value.to_vec(), Some(self.ttl_ms));
93 Frame::SimpleString("OK".into())
94 }
95
96 #[cfg(feature = "server")]
97 fn execute_borrowed(&self, ctx: BorrowedCommandContext<'_, '_, '_>) {
98 ctx.store
99 .set(self.key.to_vec(), self.value.to_vec(), Some(self.ttl_ms));
100 ctx.out.extend_from_slice(b"+OK\r\n");
101 }
102
103 #[cfg(feature = "server")]
104 fn execute_direct_borrowed(&self, ctx: DirectCommandContext) -> Frame {
105 ctx.set_owned(self.key.to_vec(), self.value.to_vec(), Some(self.ttl_ms));
106 Frame::SimpleString("OK".into())
107 }
108}
109
110impl super::CommandSpec for SetEx {
111 const NAME: &'static str = "SETEX";
112 const MUTATES_VALUE: bool = true;
113}
114
115impl super::OwnedCommandParse for SetEx {
116 fn parse_owned(parts: &[Vec<u8>]) -> Result<Command> {
117 CommandArity::<Self>::exact(parts.len(), 4)?;
118 Ok(Command::new(Box::new(OwnedSetEx::new(
119 parts[1].clone(),
120 TtlMillis::<Self>::seconds(&parts[2])?,
121 parts[3].clone(),
122 ))))
123 }
124}
125
126impl<'a> super::BorrowedCommandParse<'a> for SetEx {
127 fn parse_borrowed(parts: &[&'a [u8]]) -> Result<super::BorrowedCommandBox<'a>> {
128 CommandArity::<Self>::exact(parts.len(), 4)?;
129 Ok(Box::new(BorrowedSetEx::new(
130 parts[1],
131 TtlMillis::<Self>::seconds(parts[2])?,
132 parts[3],
133 )))
134 }
135}
136
137impl DecodedFastCommand for SetEx {
138 fn matches_decoded_fast(&self, command: &FastCommand<'_>) -> bool {
139 matches!(command, FastCommand::SetEx { .. })
140 }
141}
142
143impl EngineCommandDispatch for SetEx {
144 fn execute_engine_fast<'a>(
145 &'static self,
146 ctx: EngineCommandContext<'a>,
147 request: FastRequest<'a>,
148 ) -> EngineFastFuture<'a> {
149 Box::pin(async move {
150 match request.command {
151 FastCommand::SetEx { key, value, ttl_ms } => {
152 SetEx::store_value(ctx, key, ttl_ms, value).await?;
153 Ok(FastResponse::Ok)
154 }
155 _ => Ok(FastResponse::Error(b"ERR unsupported command".to_vec())),
156 }
157 })
158 }
159}
160
161impl SetEx {
162 async fn execute_engine_frame(
163 ctx: EngineCommandContext<'_>,
164 key: &[u8],
165 ttl_ms: u64,
166 value: &[u8],
167 ) -> Result<Frame> {
168 Self::store_value(ctx, key, ttl_ms, value).await?;
169 Ok(Frame::SimpleString("OK".into()))
170 }
171
172 pub(crate) async fn store_value(
173 ctx: EngineCommandContext<'_>,
174 key: &[u8],
175 ttl_ms: u64,
176 value: &[u8],
177 ) -> Result<()> {
178 let key_hash = hash_key(key);
179 let shard = ctx.route_key_hash(key_hash);
180 let expire_at_ms = Some(now_millis().saturating_add(ttl_ms));
181 match ctx
182 .request(
183 shard,
184 ShardOperation::Set {
185 key_hash,
186 key: ShardKey::inline(key),
187 value: ShardValue::inline(value),
188 expire_at_ms,
189 },
190 )
191 .await?
192 {
193 ShardReply::Ok => Ok(()),
194 _ => Err(FastCacheError::Command(
195 "SETEX received unexpected shard reply".into(),
196 )),
197 }
198 }
199}
200
#[cfg(feature = "server")]
impl RawDirectCommand for SetEx {
    /// Executes `SETEX` directly from raw arguments and writes the RESP reply
    /// into `ctx.out`.
    ///
    /// Exactly three arguments (key, TTL, value) are required. The TTL is
    /// converted with `TtlMillis::ascii_seconds`; if that returns `None` an
    /// integer error is written and nothing is stored.
    fn execute(&self, ctx: RawCommandContext<'_, '_, '_>) {
        let [key, ttl, value] = ctx.args.as_slice() else {
            ServerWire::write_resp_error(
                ctx.out,
                "ERR wrong number of arguments for 'SETEX' command",
            );
            return;
        };
        let Some(ttl_ms) = TtlMillis::<()>::ascii_seconds(ttl) else {
            ServerWire::write_resp_error(ctx.out, "ERR value is not an integer");
            return;
        };
        ctx.store.set(key.to_vec(), value.to_vec(), Some(ttl_ms));
        ctx.out.extend_from_slice(b"+OK\r\n");
    }
}
219
#[cfg(feature = "server")]
impl DirectFastCommand for SetEx {
    /// Handles a fast-protocol request against the direct context.
    ///
    /// A `SetEx` request is stored through `ctx.set_owned` and answered with
    /// `FastResponse::Ok`; any other command yields a `FastResponse::Error`
    /// carrying `ERR unsupported command`.
    fn execute_direct_fast(
        &self,
        ctx: DirectCommandContext,
        request: FastRequest<'_>,
    ) -> FastResponse {
        let FastCommand::SetEx { key, value, ttl_ms } = request.command else {
            return FastResponse::Error(b"ERR unsupported command".to_vec());
        };
        ctx.set_owned(key.to_vec(), value.to_vec(), Some(ttl_ms));
        FastResponse::Ok
    }
}
236
#[cfg(feature = "server")]
impl FastDirectCommand for SetEx {
    /// Handles an already-decoded fast command and writes the reply into
    /// `ctx.out`.
    ///
    /// A `SetEx` command is stored in `ctx.store` and acknowledged with a fast
    /// OK; any other command gets the fast error `ERR unsupported command`.
    fn execute_fast(&self, ctx: FastCommandContext<'_, '_>, command: FastCommand<'_>) {
        let FastCommand::SetEx { key, value, ttl_ms } = command else {
            ServerWire::write_fast_error(ctx.out, "ERR unsupported command");
            return;
        };
        ctx.store.set(key.to_vec(), value.to_vec(), Some(ttl_ms));
        ServerWire::write_fast_ok(ctx.out);
    }
}
249
#[cfg(feature = "server")]
impl FcnpDirectCommand for SetEx {
    /// Opcode under which this command is registered (`3`).
    fn opcode(&self) -> u8 {
        3
    }

    /// Attempts to execute a `SETEX` request straight from an FCNP frame.
    ///
    /// Returns `FcnpDispatch::Unsupported` when the frame does not decode to a
    /// complete request, when the decoded command is not `SetEx`, or when the
    /// request carries no key hash. Otherwise returns
    /// `FcnpDispatch::Complete(consumed)` after writing either a fast OK or,
    /// if the context owns a shard and the request's route does not match it,
    /// the fast error `ERR FCNP route shard mismatch`.
    fn try_execute_fcnp(&self, ctx: FcnpCommandContext<'_, '_, '_, '_>) -> FcnpDispatch {
        // NOTE(review): this slice panics if `frame_len` exceeds the buffer length;
        // presumably the caller has already validated the frame — confirm.
        let frame_bytes = &ctx.frame.buf[..ctx.frame.frame_len];
        let (request, consumed) = match FastCodec::decode_request(frame_bytes) {
            Ok(Some(decoded)) => decoded,
            _ => return FcnpDispatch::Unsupported,
        };
        let FastCommand::SetEx { key, value, ttl_ms } = request.command else {
            return FcnpDispatch::Unsupported;
        };
        let Some(key_hash) = request.key_hash else {
            return FcnpDispatch::Unsupported;
        };

        // When this context owns a shard, the request must name that shard and
        // the key must actually belong to it; otherwise reject the request.
        if let Some(owned_shard_id) = ctx.owned_shard_id {
            let routed_to_owner = match request.route_shard {
                Some(shard) => {
                    let route_shard = shard as usize;
                    route_shard == owned_shard_id
                        && ctx.request_matches_owned_shard_for_key(route_shard, key_hash, key)
                }
                None => false,
            };
            if !routed_to_owner {
                ServerWire::write_fast_error(ctx.out, "ERR FCNP route shard mismatch");
                return FcnpDispatch::Complete(consumed);
            }
        }

        ctx.store.set(key.to_vec(), value.to_vec(), Some(ttl_ms));
        ServerWire::write_fast_ok(ctx.out);
        FcnpDispatch::Complete(consumed)
    }
}