Skip to main content

fast_cache/commands/
ttl.rs

1//! TTL command parsing and execution.
2
3use crate::FastCacheError;
4use crate::Result;
5use crate::commands::EngineCommandDispatch;
6#[cfg(feature = "server")]
7use crate::protocol::FastCodec;
8use crate::protocol::{FastCommand, FastRequest, FastResponse, Frame};
9#[cfg(feature = "server")]
10use crate::server::commands::{
11    BorrowedCommandContext, DirectCommandContext, DirectFastCommand, FastCommandContext,
12    FastDirectCommand, FcnpCommandContext, FcnpDirectCommand, FcnpDispatch, RawCommandContext,
13    RawDirectCommand,
14};
15#[cfg(feature = "server")]
16use crate::server::wire::ServerWire;
17use crate::storage::{
18    Command, EngineCommandContext, EngineFastFuture, EngineFrameFuture, ShardOperation, ShardReply,
19};
20
21use super::DecodedFastCommand;
22use super::parsing::CommandArity;
23
/// Unit marker type for the `TTL` command; the command's trait implementations
/// in this module are attached to it.
pub(crate) struct Ttl;

/// Shared static instance of the [`Ttl`] command marker.
pub(crate) static COMMAND: Ttl = Ttl;
26
/// `TTL` command data that owns its key bytes.
#[derive(Clone, Debug)]
pub(crate) struct OwnedTtl {
    /// Key whose time-to-live is being queried.
    key: Vec<u8>,
}
31
32impl OwnedTtl {
33    fn new(key: Vec<u8>) -> Self {
34        Self { key }
35    }
36}
37
38impl super::OwnedCommandData for OwnedTtl {
39    type Spec = Ttl;
40
41    fn route_key(&self) -> Option<&[u8]> {
42        Some(&self.key)
43    }
44
45    fn to_borrowed_command(&self) -> super::BorrowedCommandBox<'_> {
46        Box::new(BorrowedTtl::new(&self.key))
47    }
48}
49
/// `TTL` command data that borrows its key bytes for lifetime `'a`.
#[derive(Clone, Copy, Debug)]
pub(crate) struct BorrowedTtl<'a> {
    /// Key whose time-to-live is being queried.
    key: &'a [u8],
}
54
55impl<'a> BorrowedTtl<'a> {
56    fn new(key: &'a [u8]) -> Self {
57        Self { key }
58    }
59}
60
61impl<'a> super::BorrowedCommandData<'a> for BorrowedTtl<'a> {
62    type Spec = Ttl;
63
64    fn route_key(&self) -> Option<&'a [u8]> {
65        Some(self.key)
66    }
67
68    fn to_owned_command(&self) -> Command {
69        Command::new(Box::new(OwnedTtl::new(self.key.to_vec())))
70    }
71
72    fn execute_engine<'b>(&'b self, ctx: EngineCommandContext<'b>) -> EngineFrameFuture<'b>
73    where
74        'a: 'b,
75    {
76        let key = self.key;
77        Box::pin(async move { Ttl::execute_engine_frame(ctx, key).await })
78    }
79
80    #[cfg(feature = "server")]
81    fn execute_borrowed_frame(&self, store: &crate::storage::EmbeddedStore, _now_ms: u64) -> Frame {
82        Frame::Integer(store.ttl_seconds(self.key))
83    }
84
85    #[cfg(feature = "server")]
86    fn execute_borrowed(&self, ctx: BorrowedCommandContext<'_, '_, '_>) {
87        ServerWire::write_resp_integer(ctx.out, ctx.store.ttl_seconds(self.key));
88    }
89
90    #[cfg(feature = "server")]
91    fn execute_direct_borrowed(&self, ctx: DirectCommandContext) -> Frame {
92        Frame::Integer(ctx.ttl(self.key, false))
93    }
94}
95
96impl super::CommandSpec for Ttl {
97    const NAME: &'static str = "TTL";
98    const MUTATES_VALUE: bool = false;
99}
100
101impl super::OwnedCommandParse for Ttl {
102    fn parse_owned(parts: &[Vec<u8>]) -> Result<Command> {
103        CommandArity::<Self>::exact(parts.len(), 2)?;
104        Ok(Command::new(Box::new(OwnedTtl::new(parts[1].clone()))))
105    }
106}
107
108impl<'a> super::BorrowedCommandParse<'a> for Ttl {
109    fn parse_borrowed(parts: &[&'a [u8]]) -> Result<super::BorrowedCommandBox<'a>> {
110        CommandArity::<Self>::exact(parts.len(), 2)?;
111        Ok(Box::new(BorrowedTtl::new(parts[1])))
112    }
113}
114
115impl DecodedFastCommand for Ttl {
116    fn matches_decoded_fast(&self, command: &FastCommand<'_>) -> bool {
117        matches!(command, FastCommand::Ttl { .. })
118    }
119}
120
121impl EngineCommandDispatch for Ttl {
122    fn execute_engine_fast<'a>(
123        &'static self,
124        ctx: EngineCommandContext<'a>,
125        request: FastRequest<'a>,
126    ) -> EngineFastFuture<'a> {
127        Box::pin(async move {
128            match request.command {
129                FastCommand::Ttl { key } => Ttl::execute_engine_integer(ctx, key, false)
130                    .await
131                    .map(FastResponse::Integer),
132                _ => Ok(FastResponse::Error(b"ERR unsupported command".to_vec())),
133            }
134        })
135    }
136}
137
138impl Ttl {
139    pub(crate) async fn execute_engine_integer(
140        ctx: EngineCommandContext<'_>,
141        key: &[u8],
142        millis: bool,
143    ) -> Result<i64> {
144        let shard = ctx.route_key(key);
145        match ctx
146            .request(
147                shard,
148                ShardOperation::Ttl {
149                    key: key.to_vec(),
150                    millis,
151                },
152            )
153            .await?
154        {
155            ShardReply::Integer(value) => Ok(value),
156            _ => Err(FastCacheError::Command(
157                "TTL received unexpected shard reply".into(),
158            )),
159        }
160    }
161
162    async fn execute_engine_frame(ctx: EngineCommandContext<'_>, key: &[u8]) -> Result<Frame> {
163        Self::execute_engine_integer(ctx, key, false)
164            .await
165            .map(Frame::Integer)
166    }
167}
168
#[cfg(feature = "server")]
impl RawDirectCommand for Ttl {
    /// Executes `TTL` from raw arguments and writes a RESP reply to `ctx.out`.
    ///
    /// Exactly one argument (the key) produces an integer reply read from
    /// `ctx.store`. Any other argument count produces an arity error reply.
    fn execute(&self, ctx: RawCommandContext<'_, '_, '_>) {
        if let [key] = ctx.args.as_slice() {
            let seconds = ctx.store.ttl_seconds(key);
            ServerWire::write_resp_integer(ctx.out, seconds);
        } else {
            ServerWire::write_resp_error(
                ctx.out,
                "ERR wrong number of arguments for 'TTL' command",
            );
        }
    }
}
181
#[cfg(feature = "server")]
impl DirectFastCommand for Ttl {
    /// Executes a fast-protocol request directly and returns the response.
    ///
    /// A `FastCommand::Ttl` request returns `FastResponse::Integer` with the
    /// value from `ctx.ttl`. Any other command returns a `FastResponse::Error`
    /// carrying `ERR unsupported command`.
    fn execute_direct_fast(
        &self,
        ctx: DirectCommandContext,
        request: FastRequest<'_>,
    ) -> FastResponse {
        let FastCommand::Ttl { key } = request.command else {
            return FastResponse::Error(b"ERR unsupported command".to_vec());
        };
        // NOTE(review): the `false` argument presumably selects seconds rather
        // than milliseconds — confirm against `DirectCommandContext::ttl`.
        let value = ctx.ttl(key, false);
        FastResponse::Integer(value)
    }
}
195
#[cfg(feature = "server")]
impl FastDirectCommand for Ttl {
    /// Executes a decoded fast command and writes the reply to `ctx.out`.
    ///
    /// A `FastCommand::Ttl` command writes the TTL read from `ctx.store` as a
    /// fast integer. Any other command writes `ERR unsupported command`.
    fn execute_fast(&self, ctx: FastCommandContext<'_, '_>, command: FastCommand<'_>) {
        if let FastCommand::Ttl { key } = command {
            let seconds = ctx.store.ttl_seconds(key);
            ServerWire::write_fast_integer(ctx.out, seconds);
        } else {
            ServerWire::write_fast_error(ctx.out, "ERR unsupported command");
        }
    }
}
207
#[cfg(feature = "server")]
impl FcnpDirectCommand for Ttl {
    /// Opcode reported for this command (`7`).
    fn opcode(&self) -> u8 {
        7
    }

    /// Tries to serve a `TTL` request straight from an FCNP frame.
    ///
    /// Returns `FcnpDispatch::Unsupported` when the first `frame_len` bytes of
    /// the frame buffer do not decode to a complete request, when the request
    /// is not `FastCommand::Ttl`, or when it carries no key hash.
    ///
    /// If `ctx.owned_shard_id` is set, the request must name that shard as its
    /// route shard and pass `ctx.request_matches_owned_shard_for_key`. When it
    /// does not, an `ERR FCNP route shard mismatch` error is written and the
    /// frame is reported as consumed.
    ///
    /// Otherwise the TTL read from `ctx.store` is written as a fast integer and
    /// `FcnpDispatch::Complete(consumed)` is returned.
    fn try_execute_fcnp(&self, ctx: FcnpCommandContext<'_, '_, '_, '_>) -> FcnpDispatch {
        let frame_len = ctx.frame.frame_len;
        let Ok(Some((request, consumed))) = FastCodec::decode_request(&ctx.frame.buf[..frame_len])
        else {
            return FcnpDispatch::Unsupported;
        };
        let FastCommand::Ttl { key } = request.command else {
            return FcnpDispatch::Unsupported;
        };
        let Some(key_hash) = request.key_hash else {
            return FcnpDispatch::Unsupported;
        };

        if let Some(owned_shard_id) = ctx.owned_shard_id {
            let requested_shard = request.route_shard.map(|shard| shard as usize);
            // A missing route shard counts as a mismatch, same as a wrong one.
            let routed_here = match requested_shard {
                Some(route_shard) => {
                    route_shard == owned_shard_id
                        && ctx.request_matches_owned_shard_for_key(route_shard, key_hash, key)
                }
                None => false,
            };
            if !routed_here {
                ServerWire::write_fast_error(ctx.out, "ERR FCNP route shard mismatch");
                return FcnpDispatch::Complete(consumed);
            }
        }

        let seconds = ctx.store.ttl_seconds(key);
        ServerWire::write_fast_integer(ctx.out, seconds);
        FcnpDispatch::Complete(consumed)
    }
}
240}