Skip to main content

fast_cache/commands/
exists.rs

1//! EXISTS command parsing and execution.
2
3use crate::Result;
4#[cfg(feature = "server")]
5use crate::protocol::FastCodec;
6use crate::protocol::{FastCommand, FastRequest, FastResponse, Frame};
7#[cfg(feature = "server")]
8use crate::server::commands::{
9    BorrowedCommandContext, DirectCommandContext, DirectFastCommand, FastCommandContext,
10    FastDirectCommand, FcnpCommandContext, FcnpDirectCommand, FcnpDispatch, RawCommandContext,
11    RawDirectCommand,
12};
13#[cfg(feature = "server")]
14use crate::server::wire::ServerWire;
15use crate::storage::{Command, EngineCommandContext, EngineFastFuture, EngineFrameFuture};
16use crate::storage::{ShardOperation, ShardReply};
17use crate::{FastCacheError, commands::EngineCommandDispatch};
18
19use super::DecodedFastCommand;
20use super::parsing::CommandArity;
21
/// Zero-sized marker type that the `EXISTS` command trait implementations attach to.
pub(crate) struct Exists;

/// Shared static instance of [`Exists`].
pub(crate) static COMMAND: Exists = Exists;
24
/// `EXISTS` command data that owns its key bytes.
#[derive(Debug, Clone)]
pub(crate) struct OwnedExists {
    /// Key bytes, held in an owned buffer.
    key: Vec<u8>,
}

impl OwnedExists {
    /// Wraps an already-owned key buffer; the vector is moved in, not copied.
    fn new(key: Vec<u8>) -> Self {
        OwnedExists { key }
    }
}
35
36impl super::OwnedCommandData for OwnedExists {
37    type Spec = Exists;
38
39    fn route_key(&self) -> Option<&[u8]> {
40        Some(&self.key)
41    }
42
43    fn to_borrowed_command(&self) -> super::BorrowedCommandBox<'_> {
44        Box::new(BorrowedExists::new(&self.key))
45    }
46}
47
/// `EXISTS` command data that borrows its key bytes for the lifetime `'a`.
#[derive(Debug, Clone, Copy)]
pub(crate) struct BorrowedExists<'a> {
    /// Key bytes, borrowed from the caller.
    key: &'a [u8],
}

impl<'a> BorrowedExists<'a> {
    /// Builds a view over `key`; no bytes are copied.
    fn new(key: &'a [u8]) -> Self {
        BorrowedExists { key }
    }
}
58
59impl<'a> super::BorrowedCommandData<'a> for BorrowedExists<'a> {
60    type Spec = Exists;
61
62    fn route_key(&self) -> Option<&'a [u8]> {
63        Some(self.key)
64    }
65
66    fn to_owned_command(&self) -> Command {
67        Command::new(Box::new(OwnedExists::new(self.key.to_vec())))
68    }
69
70    fn execute_engine<'b>(&'b self, ctx: EngineCommandContext<'b>) -> EngineFrameFuture<'b>
71    where
72        'a: 'b,
73    {
74        let key = self.key;
75        Box::pin(async move { Exists::execute_engine_frame(ctx, key).await })
76    }
77
78    #[cfg(feature = "server")]
79    fn execute_borrowed_frame(&self, store: &crate::storage::EmbeddedStore, _now_ms: u64) -> Frame {
80        Frame::Integer(store.exists(self.key) as i64)
81    }
82
83    #[cfg(feature = "server")]
84    fn execute_borrowed(&self, ctx: BorrowedCommandContext<'_, '_, '_>) {
85        ServerWire::write_resp_integer(ctx.out, ctx.store.exists(self.key) as i64);
86    }
87
88    #[cfg(feature = "server")]
89    fn execute_direct_borrowed(&self, ctx: DirectCommandContext) -> Frame {
90        Frame::Integer(ctx.exists(self.key) as i64)
91    }
92}
93
94impl super::CommandSpec for Exists {
95    const NAME: &'static str = "EXISTS";
96    const MUTATES_VALUE: bool = false;
97}
98
99impl super::OwnedCommandParse for Exists {
100    fn parse_owned(parts: &[Vec<u8>]) -> Result<Command> {
101        CommandArity::<Self>::exact(parts.len(), 2)?;
102        Ok(Command::new(Box::new(OwnedExists::new(parts[1].clone()))))
103    }
104}
105
106impl<'a> super::BorrowedCommandParse<'a> for Exists {
107    fn parse_borrowed(parts: &[&'a [u8]]) -> Result<super::BorrowedCommandBox<'a>> {
108        CommandArity::<Self>::exact(parts.len(), 2)?;
109        Ok(Box::new(BorrowedExists::new(parts[1])))
110    }
111}
112
113impl DecodedFastCommand for Exists {
114    fn matches_decoded_fast(&self, command: &FastCommand<'_>) -> bool {
115        matches!(command, FastCommand::Exists { .. })
116    }
117}
118
119impl EngineCommandDispatch for Exists {
120    fn execute_engine_fast<'a>(
121        &'static self,
122        ctx: EngineCommandContext<'a>,
123        request: FastRequest<'a>,
124    ) -> EngineFastFuture<'a> {
125        Box::pin(async move {
126            match request.command {
127                FastCommand::Exists { key } => Exists::execute_engine_integer(ctx, key)
128                    .await
129                    .map(FastResponse::Integer),
130                _ => Ok(FastResponse::Error(b"ERR unsupported command".to_vec())),
131            }
132        })
133    }
134}
135
136impl Exists {
137    async fn execute_engine_frame(ctx: EngineCommandContext<'_>, key: &[u8]) -> Result<Frame> {
138        Self::execute_engine_integer(ctx, key)
139            .await
140            .map(Frame::Integer)
141    }
142
143    async fn execute_engine_integer(ctx: EngineCommandContext<'_>, key: &[u8]) -> Result<i64> {
144        let shard = ctx.route_key(key);
145        match ctx
146            .request(shard, ShardOperation::Exists(key.to_vec()))
147            .await?
148        {
149            ShardReply::Integer(value) => Ok(value),
150            _ => Err(FastCacheError::Command(
151                "EXISTS received unexpected shard reply".into(),
152            )),
153        }
154    }
155}
156
/// Raw-argument execution of `EXISTS`, writing a RESP reply straight to the output buffer.
#[cfg(feature = "server")]
impl RawDirectCommand for Exists {
    /// Writes a RESP integer with the store's `exists` answer when `ctx.args` holds exactly one
    /// element; otherwise writes a RESP wrong-arity error.
    fn execute(&self, ctx: RawCommandContext<'_, '_, '_>) {
        let [key] = ctx.args.as_slice() else {
            ServerWire::write_resp_error(
                ctx.out,
                "ERR wrong number of arguments for 'EXISTS' command",
            );
            return;
        };
        ServerWire::write_resp_integer(ctx.out, ctx.store.exists(key) as i64);
    }
}
169
/// Direct (value-returning) execution of fast-protocol `EXISTS` requests.
#[cfg(feature = "server")]
impl DirectFastCommand for Exists {
    /// Returns `FastResponse::Integer` with the context's `exists` answer for a
    /// `FastCommand::Exists` request, and an "unsupported command" `FastResponse::Error` for
    /// any other command.
    fn execute_direct_fast(
        &self,
        ctx: DirectCommandContext,
        request: FastRequest<'_>,
    ) -> FastResponse {
        if let FastCommand::Exists { key } = request.command {
            FastResponse::Integer(ctx.exists(key) as i64)
        } else {
            FastResponse::Error(b"ERR unsupported command".to_vec())
        }
    }
}
183
/// Fast-protocol execution of `EXISTS` that writes its reply to the output buffer.
#[cfg(feature = "server")]
impl FastDirectCommand for Exists {
    /// Writes a fast-protocol integer with the store's `exists` answer for a
    /// `FastCommand::Exists` command, and an "unsupported command" error for anything else.
    fn execute_fast(&self, ctx: FastCommandContext<'_, '_>, command: FastCommand<'_>) {
        let FastCommand::Exists { key } = command else {
            ServerWire::write_fast_error(ctx.out, "ERR unsupported command");
            return;
        };
        ServerWire::write_fast_integer(ctx.out, ctx.store.exists(key) as i64);
    }
}
195
/// FCNP execution of `EXISTS`, decoding the request directly from the connection frame.
#[cfg(feature = "server")]
impl FcnpDirectCommand for Exists {
    /// Returns the opcode value this command reports, which is `6`.
    fn opcode(&self) -> u8 {
        const EXISTS_OPCODE: u8 = 6;
        EXISTS_OPCODE
    }

    /// Attempts to answer an `EXISTS` request found in `ctx.frame`.
    ///
    /// Returns `FcnpDispatch::Unsupported` when the first `frame_len` bytes of the frame buffer
    /// do not decode to a complete request, when the request is not `FastCommand::Exists`, or
    /// when it carries no key hash. Otherwise a reply is written to `ctx.out` and
    /// `FcnpDispatch::Complete` is returned with the byte count reported by the decoder. The
    /// reply is a route-mismatch error if the shard check fails, else the store's `exists`
    /// answer as an integer.
    fn try_execute_fcnp(&self, ctx: FcnpCommandContext<'_, '_, '_, '_>) -> FcnpDispatch {
        let frame_len = ctx.frame.frame_len;
        let frame_bytes = &ctx.frame.buf[..frame_len];
        let (request, consumed) = match FastCodec::decode_request(frame_bytes) {
            Ok(Some(decoded)) => decoded,
            // Decode errors and incomplete frames are both left for another handler.
            _ => return FcnpDispatch::Unsupported,
        };
        let FastCommand::Exists { key } = request.command else {
            return FcnpDispatch::Unsupported;
        };
        let Some(key_hash) = request.key_hash else {
            return FcnpDispatch::Unsupported;
        };

        // The routing check only applies when this context reports an owned shard id.
        if let Some(owned_shard_id) = ctx.owned_shard_id {
            let route_shard = request.route_shard.map(|shard| shard as usize);
            let routed_to_owner = matches!(
                route_shard,
                Some(shard)
                    if shard == owned_shard_id
                        && ctx.request_matches_owned_shard_for_key(shard, key_hash, key)
            );
            if !routed_to_owner {
                // The frame was still consumed, so report completion along with the error reply.
                ServerWire::write_fast_error(ctx.out, "ERR FCNP route shard mismatch");
                return FcnpDispatch::Complete(consumed);
            }
        }

        ServerWire::write_fast_integer(ctx.out, ctx.store.exists(key) as i64);
        FcnpDispatch::Complete(consumed)
    }
}