fast_cache/commands/
exists.rs1use crate::Result;
4#[cfg(feature = "server")]
5use crate::protocol::FastCodec;
6use crate::protocol::{FastCommand, FastRequest, FastResponse, Frame};
7#[cfg(feature = "server")]
8use crate::server::commands::{
9 BorrowedCommandContext, DirectCommandContext, DirectFastCommand, FastCommandContext,
10 FastDirectCommand, FcnpCommandContext, FcnpDirectCommand, FcnpDispatch, RawCommandContext,
11 RawDirectCommand,
12};
13#[cfg(feature = "server")]
14use crate::server::wire::ServerWire;
15use crate::storage::{Command, EngineCommandContext, EngineFastFuture, EngineFrameFuture};
16use crate::storage::{ShardOperation, ShardReply};
17use crate::{FastCacheError, commands::EngineCommandDispatch};
18
19use super::DecodedFastCommand;
20use super::parsing::CommandArity;
21
/// Marker type for the `EXISTS` command; the command traits in this file are implemented on it.
pub(crate) struct Exists;
/// Shared static instance of the [`Exists`] command marker.
pub(crate) static COMMAND: Exists = Exists;
24
/// Owned form of an `EXISTS` invocation: carries its own copy of the key bytes.
#[derive(Clone, Debug)]
pub(crate) struct OwnedExists {
    /// Key whose presence is queried.
    key: Vec<u8>,
}

impl OwnedExists {
    /// Builds an owned `EXISTS` payload that takes ownership of `key`.
    fn new(key: Vec<u8>) -> Self {
        OwnedExists { key }
    }
}
35
36impl super::OwnedCommandData for OwnedExists {
37 type Spec = Exists;
38
39 fn route_key(&self) -> Option<&[u8]> {
40 Some(&self.key)
41 }
42
43 fn to_borrowed_command(&self) -> super::BorrowedCommandBox<'_> {
44 Box::new(BorrowedExists::new(&self.key))
45 }
46}
47
/// Borrowed form of an `EXISTS` invocation: refers to key bytes owned elsewhere.
#[derive(Clone, Copy, Debug)]
pub(crate) struct BorrowedExists<'a> {
    /// Key whose presence is queried.
    key: &'a [u8],
}

impl<'a> BorrowedExists<'a> {
    /// Builds a borrowed `EXISTS` payload around `key` without copying it.
    fn new(key: &'a [u8]) -> Self {
        BorrowedExists { key }
    }
}
58
59impl<'a> super::BorrowedCommandData<'a> for BorrowedExists<'a> {
60 type Spec = Exists;
61
62 fn route_key(&self) -> Option<&'a [u8]> {
63 Some(self.key)
64 }
65
66 fn to_owned_command(&self) -> Command {
67 Command::new(Box::new(OwnedExists::new(self.key.to_vec())))
68 }
69
70 fn execute_engine<'b>(&'b self, ctx: EngineCommandContext<'b>) -> EngineFrameFuture<'b>
71 where
72 'a: 'b,
73 {
74 let key = self.key;
75 Box::pin(async move { Exists::execute_engine_frame(ctx, key).await })
76 }
77
78 #[cfg(feature = "server")]
79 fn execute_borrowed_frame(&self, store: &crate::storage::EmbeddedStore, _now_ms: u64) -> Frame {
80 Frame::Integer(store.exists(self.key) as i64)
81 }
82
83 #[cfg(feature = "server")]
84 fn execute_borrowed(&self, ctx: BorrowedCommandContext<'_, '_, '_>) {
85 ServerWire::write_resp_integer(ctx.out, ctx.store.exists(self.key) as i64);
86 }
87
88 #[cfg(feature = "server")]
89 fn execute_direct_borrowed(&self, ctx: DirectCommandContext) -> Frame {
90 Frame::Integer(ctx.exists(self.key) as i64)
91 }
92}
93
94impl super::CommandSpec for Exists {
95 const NAME: &'static str = "EXISTS";
96 const MUTATES_VALUE: bool = false;
97}
98
99impl super::OwnedCommandParse for Exists {
100 fn parse_owned(parts: &[Vec<u8>]) -> Result<Command> {
101 CommandArity::<Self>::exact(parts.len(), 2)?;
102 Ok(Command::new(Box::new(OwnedExists::new(parts[1].clone()))))
103 }
104}
105
106impl<'a> super::BorrowedCommandParse<'a> for Exists {
107 fn parse_borrowed(parts: &[&'a [u8]]) -> Result<super::BorrowedCommandBox<'a>> {
108 CommandArity::<Self>::exact(parts.len(), 2)?;
109 Ok(Box::new(BorrowedExists::new(parts[1])))
110 }
111}
112
impl DecodedFastCommand for Exists {
    /// Returns `true` only when `command` is the `FastCommand::Exists` variant.
    fn matches_decoded_fast(&self, command: &FastCommand<'_>) -> bool {
        matches!(command, FastCommand::Exists { .. })
    }
}
118
119impl EngineCommandDispatch for Exists {
120 fn execute_engine_fast<'a>(
121 &'static self,
122 ctx: EngineCommandContext<'a>,
123 request: FastRequest<'a>,
124 ) -> EngineFastFuture<'a> {
125 Box::pin(async move {
126 match request.command {
127 FastCommand::Exists { key } => Exists::execute_engine_integer(ctx, key)
128 .await
129 .map(FastResponse::Integer),
130 _ => Ok(FastResponse::Error(b"ERR unsupported command".to_vec())),
131 }
132 })
133 }
134}
135
136impl Exists {
137 async fn execute_engine_frame(ctx: EngineCommandContext<'_>, key: &[u8]) -> Result<Frame> {
138 Self::execute_engine_integer(ctx, key)
139 .await
140 .map(Frame::Integer)
141 }
142
143 async fn execute_engine_integer(ctx: EngineCommandContext<'_>, key: &[u8]) -> Result<i64> {
144 let shard = ctx.route_key(key);
145 match ctx
146 .request(shard, ShardOperation::Exists(key.to_vec()))
147 .await?
148 {
149 ShardReply::Integer(value) => Ok(value),
150 _ => Err(FastCacheError::Command(
151 "EXISTS received unexpected shard reply".into(),
152 )),
153 }
154 }
155}
156
#[cfg(feature = "server")]
impl RawDirectCommand for Exists {
    /// Handles a raw `EXISTS` call: with exactly one argument, writes the result of
    /// `ctx.store.exists` as a RESP integer; otherwise writes a wrong-arity RESP error.
    fn execute(&self, ctx: RawCommandContext<'_, '_, '_>) {
        let [key] = ctx.args.as_slice() else {
            ServerWire::write_resp_error(
                ctx.out,
                "ERR wrong number of arguments for 'EXISTS' command",
            );
            return;
        };
        ServerWire::write_resp_integer(ctx.out, ctx.store.exists(key) as i64);
    }
}
169
#[cfg(feature = "server")]
impl DirectFastCommand for Exists {
    /// Answers a fast-protocol request through the direct context.
    ///
    /// Returns `FastResponse::Integer` for `FastCommand::Exists`, and an
    /// `ERR unsupported command` error response for every other command.
    fn execute_direct_fast(
        &self,
        ctx: DirectCommandContext,
        request: FastRequest<'_>,
    ) -> FastResponse {
        let FastCommand::Exists { key } = request.command else {
            return FastResponse::Error(b"ERR unsupported command".to_vec());
        };
        FastResponse::Integer(ctx.exists(key) as i64)
    }
}
183
#[cfg(feature = "server")]
impl FastDirectCommand for Exists {
    /// Writes the fast-protocol reply for `command` into `ctx.out`: an integer for
    /// `FastCommand::Exists`, an `ERR unsupported command` error for anything else.
    fn execute_fast(&self, ctx: FastCommandContext<'_, '_>, command: FastCommand<'_>) {
        if let FastCommand::Exists { key } = command {
            ServerWire::write_fast_integer(ctx.out, ctx.store.exists(key) as i64);
        } else {
            ServerWire::write_fast_error(ctx.out, "ERR unsupported command");
        }
    }
}
195
#[cfg(feature = "server")]
impl FcnpDirectCommand for Exists {
    /// FCNP opcode reported for `EXISTS`.
    // NOTE(review): presumably has to agree with the opcode used by the codec — confirm.
    fn opcode(&self) -> u8 {
        6
    }

    /// Tries to answer one FCNP frame as `EXISTS` straight from `ctx.store`.
    ///
    /// Returns `FcnpDispatch::Unsupported`, writing nothing, when `frame_len` exceeds the
    /// frame buffer, when the frame does not decode to a complete request, when the
    /// decoded command is not `FastCommand::Exists`, or when the request has no key hash.
    ///
    /// Otherwise returns `FcnpDispatch::Complete(consumed)` after writing either the
    /// integer result or, if `ctx.owned_shard_id` is set and the request's route shard is
    /// missing, different, or rejected by `request_matches_owned_shard_for_key`, an
    /// `ERR FCNP route shard mismatch` error.
    fn try_execute_fcnp(&self, ctx: FcnpCommandContext<'_, '_, '_, '_>) -> FcnpDispatch {
        let frame_len = ctx.frame.frame_len;
        // Checked slicing: an out-of-range `frame_len` takes the same `Unsupported` path as
        // an undecodable frame instead of panicking on the slice index.
        let Some(frame_bytes) = ctx.frame.buf.get(..frame_len) else {
            return FcnpDispatch::Unsupported;
        };
        let Ok(Some((request, consumed))) = FastCodec::decode_request(frame_bytes) else {
            return FcnpDispatch::Unsupported;
        };
        let FastCommand::Exists { key } = request.command else {
            return FcnpDispatch::Unsupported;
        };
        let Some(key_hash) = request.key_hash else {
            return FcnpDispatch::Unsupported;
        };
        if let Some(owned_shard_id) = ctx.owned_shard_id {
            match request.route_shard.map(|shard| shard as usize) {
                // Request targets the shard owned here and passes the ownership check.
                Some(route_shard)
                    if route_shard == owned_shard_id
                        && ctx.request_matches_owned_shard_for_key(route_shard, key_hash, key) => {}
                _ => {
                    ServerWire::write_fast_error(ctx.out, "ERR FCNP route shard mismatch");
                    return FcnpDispatch::Complete(consumed);
                }
            }
        }
        ServerWire::write_fast_integer(ctx.out, ctx.store.exists(key) as i64);
        FcnpDispatch::Complete(consumed)
    }
}