1use crate::FastCacheError;
4use crate::Result;
5use crate::commands::EngineCommandDispatch;
6#[cfg(feature = "server")]
7use crate::protocol::FastCodec;
8use crate::protocol::{FastCommand, FastRequest, FastResponse, Frame};
9#[cfg(feature = "server")]
10use crate::server::commands::{
11 BorrowedCommandContext, DirectCommandContext, DirectFastCommand, FastCommandContext,
12 FastDirectCommand, FcnpCommandContext, FcnpDirectCommand, FcnpDispatch, RawCommandContext,
13 RawDirectCommand,
14};
15#[cfg(feature = "server")]
16use crate::server::wire::ServerWire;
17use crate::storage::{
18 Command, EngineCommandContext, EngineFastFuture, EngineFrameFuture, ShardOperation, ShardReply,
19};
20
21use super::DecodedFastCommand;
22use super::parsing::CommandArity;
23
24pub(crate) struct Ttl;
25pub(crate) static COMMAND: Ttl = Ttl;
26
27#[derive(Debug, Clone)]
28pub(crate) struct OwnedTtl {
29 key: Vec<u8>,
30}
31
32impl OwnedTtl {
33 fn new(key: Vec<u8>) -> Self {
34 Self { key }
35 }
36}
37
38impl super::OwnedCommandData for OwnedTtl {
39 type Spec = Ttl;
40
41 fn route_key(&self) -> Option<&[u8]> {
42 Some(&self.key)
43 }
44
45 fn to_borrowed_command(&self) -> super::BorrowedCommandBox<'_> {
46 Box::new(BorrowedTtl::new(&self.key))
47 }
48}
49
50#[derive(Debug, Clone, Copy)]
51pub(crate) struct BorrowedTtl<'a> {
52 key: &'a [u8],
53}
54
55impl<'a> BorrowedTtl<'a> {
56 fn new(key: &'a [u8]) -> Self {
57 Self { key }
58 }
59}
60
61impl<'a> super::BorrowedCommandData<'a> for BorrowedTtl<'a> {
62 type Spec = Ttl;
63
64 fn route_key(&self) -> Option<&'a [u8]> {
65 Some(self.key)
66 }
67
68 fn to_owned_command(&self) -> Command {
69 Command::new(Box::new(OwnedTtl::new(self.key.to_vec())))
70 }
71
72 fn execute_engine<'b>(&'b self, ctx: EngineCommandContext<'b>) -> EngineFrameFuture<'b>
73 where
74 'a: 'b,
75 {
76 let key = self.key;
77 Box::pin(async move { Ttl::execute_engine_frame(ctx, key).await })
78 }
79
80 #[cfg(feature = "server")]
81 fn execute_borrowed_frame(&self, store: &crate::storage::EmbeddedStore, _now_ms: u64) -> Frame {
82 Frame::Integer(store.ttl_seconds(self.key))
83 }
84
85 #[cfg(feature = "server")]
86 fn execute_borrowed(&self, ctx: BorrowedCommandContext<'_, '_, '_>) {
87 ServerWire::write_resp_integer(ctx.out, ctx.store.ttl_seconds(self.key));
88 }
89
90 #[cfg(feature = "server")]
91 fn execute_direct_borrowed(&self, ctx: DirectCommandContext) -> Frame {
92 Frame::Integer(ctx.ttl(self.key, false))
93 }
94}
95
96impl super::CommandSpec for Ttl {
97 const NAME: &'static str = "TTL";
98 const MUTATES_VALUE: bool = false;
99}
100
101impl super::OwnedCommandParse for Ttl {
102 fn parse_owned(parts: &[Vec<u8>]) -> Result<Command> {
103 CommandArity::<Self>::exact(parts.len(), 2)?;
104 Ok(Command::new(Box::new(OwnedTtl::new(parts[1].clone()))))
105 }
106}
107
108impl<'a> super::BorrowedCommandParse<'a> for Ttl {
109 fn parse_borrowed(parts: &[&'a [u8]]) -> Result<super::BorrowedCommandBox<'a>> {
110 CommandArity::<Self>::exact(parts.len(), 2)?;
111 Ok(Box::new(BorrowedTtl::new(parts[1])))
112 }
113}
114
115impl DecodedFastCommand for Ttl {
116 fn matches_decoded_fast(&self, command: &FastCommand<'_>) -> bool {
117 matches!(command, FastCommand::Ttl { .. })
118 }
119}
120
121impl EngineCommandDispatch for Ttl {
122 fn execute_engine_fast<'a>(
123 &'static self,
124 ctx: EngineCommandContext<'a>,
125 request: FastRequest<'a>,
126 ) -> EngineFastFuture<'a> {
127 Box::pin(async move {
128 match request.command {
129 FastCommand::Ttl { key } => Ttl::execute_engine_integer(ctx, key, false)
130 .await
131 .map(FastResponse::Integer),
132 _ => Ok(FastResponse::Error(b"ERR unsupported command".to_vec())),
133 }
134 })
135 }
136}
137
138impl Ttl {
139 pub(crate) async fn execute_engine_integer(
140 ctx: EngineCommandContext<'_>,
141 key: &[u8],
142 millis: bool,
143 ) -> Result<i64> {
144 let shard = ctx.route_key(key);
145 match ctx
146 .request(
147 shard,
148 ShardOperation::Ttl {
149 key: key.to_vec(),
150 millis,
151 },
152 )
153 .await?
154 {
155 ShardReply::Integer(value) => Ok(value),
156 _ => Err(FastCacheError::Command(
157 "TTL received unexpected shard reply".into(),
158 )),
159 }
160 }
161
162 async fn execute_engine_frame(ctx: EngineCommandContext<'_>, key: &[u8]) -> Result<Frame> {
163 Self::execute_engine_integer(ctx, key, false)
164 .await
165 .map(Frame::Integer)
166 }
167}
168
169#[cfg(feature = "server")]
170impl RawDirectCommand for Ttl {
171 fn execute(&self, ctx: RawCommandContext<'_, '_, '_>) {
172 match ctx.args.as_slice() {
173 [key] => ServerWire::write_resp_integer(ctx.out, ctx.store.ttl_seconds(key)),
174 _ => ServerWire::write_resp_error(
175 ctx.out,
176 "ERR wrong number of arguments for 'TTL' command",
177 ),
178 }
179 }
180}
181
182#[cfg(feature = "server")]
183impl DirectFastCommand for Ttl {
184 fn execute_direct_fast(
185 &self,
186 ctx: DirectCommandContext,
187 request: FastRequest<'_>,
188 ) -> FastResponse {
189 match request.command {
190 FastCommand::Ttl { key } => FastResponse::Integer(ctx.ttl(key, false)),
191 _ => FastResponse::Error(b"ERR unsupported command".to_vec()),
192 }
193 }
194}
195
196#[cfg(feature = "server")]
197impl FastDirectCommand for Ttl {
198 fn execute_fast(&self, ctx: FastCommandContext<'_, '_>, command: FastCommand<'_>) {
199 match command {
200 FastCommand::Ttl { key } => {
201 ServerWire::write_fast_integer(ctx.out, ctx.store.ttl_seconds(key));
202 }
203 _ => ServerWire::write_fast_error(ctx.out, "ERR unsupported command"),
204 }
205 }
206}
207
208#[cfg(feature = "server")]
209impl FcnpDirectCommand for Ttl {
210 fn opcode(&self) -> u8 {
211 7
212 }
213
214 fn try_execute_fcnp(&self, ctx: FcnpCommandContext<'_, '_, '_, '_>) -> FcnpDispatch {
215 let frame_len = ctx.frame.frame_len;
216 let Ok(Some((request, consumed))) = FastCodec::decode_request(&ctx.frame.buf[..frame_len])
217 else {
218 return FcnpDispatch::Unsupported;
219 };
220 let FastCommand::Ttl { key } = request.command else {
221 return FcnpDispatch::Unsupported;
222 };
223 let Some(key_hash) = request.key_hash else {
224 return FcnpDispatch::Unsupported;
225 };
226 if let Some(owned_shard_id) = ctx.owned_shard_id {
227 match request.route_shard.map(|shard| shard as usize) {
228 Some(route_shard)
229 if route_shard == owned_shard_id
230 && ctx.request_matches_owned_shard_for_key(route_shard, key_hash, key) => {}
231 _ => {
232 ServerWire::write_fast_error(ctx.out, "ERR FCNP route shard mismatch");
233 return FcnpDispatch::Complete(consumed);
234 }
235 }
236 }
237 ServerWire::write_fast_integer(ctx.out, ctx.store.ttl_seconds(key));
238 FcnpDispatch::Complete(consumed)
239 }
240}