reddb_server/storage/query/parser/
queue.rs1use super::super::ast::{
4 AlterQueueQuery, CreateQueueQuery, DropQueueQuery, QueryExpr, QueueCommand, QueueMode,
5 QueueSide,
6};
7use super::super::lexer::Token;
8use super::error::ParseError;
9use super::Parser;
10
11impl<'a> Parser<'a> {
12 pub fn parse_create_queue_body(&mut self) -> Result<QueryExpr, ParseError> {
14 let if_not_exists = self.match_if_not_exists()?;
15 let name = self.expect_ident()?;
16
17 let mut mode = QueueMode::Work;
18 let mut priority = false;
19 let mut max_size = None;
20 let mut ttl_ms = None;
21 let mut dlq = None;
22 let mut max_attempts = 3u32;
23
24 loop {
26 if let Some(parsed_mode) = self.consume_queue_mode()? {
27 mode = parsed_mode;
28 } else if self.consume(&Token::Priority)? {
29 priority = true;
30 } else if self.consume_ident_ci("MAX_SIZE")? || self.consume_ident_ci("MAXSIZE")? {
31 max_size = Some(self.parse_positive_integer("MAX_SIZE")? as usize);
32 } else if self.consume_ident_ci("MAX_ATTEMPTS")?
33 || self.consume_ident_ci("MAXATTEMPTS")?
34 {
35 max_attempts = self.parse_integer()?.max(1) as u32;
36 } else if self.consume(&Token::With)? {
37 if self.consume_ident_ci("EVENTS")? {
38 return Err(ParseError::new(
39 "queues cannot have event subscriptions".to_string(),
40 self.position(),
41 ));
42 } else if self.consume_ident_ci("TTL")? {
43 let value = self.parse_float()?;
44 let unit = self.parse_queue_duration_unit()?;
45 ttl_ms = Some((value * unit) as u64);
46 } else if self.consume_ident_ci("DLQ")? {
47 dlq = Some(self.expect_ident()?);
48 }
49 } else {
50 break;
51 }
52 }
53
54 Ok(QueryExpr::CreateQueue(CreateQueueQuery {
55 name,
56 mode,
57 priority,
58 max_size,
59 ttl_ms,
60 dlq,
61 max_attempts,
62 if_not_exists,
63 }))
64 }
65
66 pub fn parse_alter_queue_body(&mut self) -> Result<QueryExpr, ParseError> {
68 let name = self.expect_ident()?;
69 if !self.consume(&Token::Set)? && !self.consume_ident_ci("SET")? {
70 return Err(ParseError::expected(
71 vec!["SET"],
72 self.peek(),
73 self.position(),
74 ));
75 }
76 if !self.consume(&Token::Mode)? && !self.consume_ident_ci("MODE")? {
77 return Err(ParseError::expected(
78 vec!["MODE"],
79 self.peek(),
80 self.position(),
81 ));
82 }
83 let mode = self.parse_queue_mode()?;
84 Ok(QueryExpr::AlterQueue(AlterQueueQuery { name, mode }))
85 }
86
87 pub fn parse_drop_queue_body(&mut self) -> Result<QueryExpr, ParseError> {
89 let if_exists = self.match_if_exists()?;
90 let name = self.parse_drop_collection_name()?;
91 Ok(QueryExpr::DropQueue(DropQueueQuery { name, if_exists }))
92 }
93
94 pub fn parse_queue_command(&mut self) -> Result<QueryExpr, ParseError> {
96 self.expect(Token::Queue)?;
97
98 match self.peek().clone() {
99 Token::Push => {
100 self.advance()?;
101 let queue = self.expect_ident()?;
102 let value = self.parse_value()?;
103 let priority = if self.consume(&Token::Priority)? {
104 Some(self.parse_integer()? as i32)
105 } else {
106 None
107 };
108 Ok(QueryExpr::QueueCommand(QueueCommand::Push {
109 queue,
110 value,
111 side: QueueSide::Right,
112 priority,
113 }))
114 }
115 Token::Pop => {
116 self.advance()?;
117 let queue = self.expect_ident()?;
118 let count = if self.consume(&Token::Count)? {
119 self.parse_integer()? as usize
120 } else {
121 1
122 };
123 Ok(QueryExpr::QueueCommand(QueueCommand::Pop {
124 queue,
125 side: QueueSide::Left,
126 count,
127 }))
128 }
129 Token::Peek => {
130 self.advance()?;
131 let queue = self.expect_ident()?;
132 let count = if matches!(self.peek(), Token::Integer(_)) {
133 self.parse_integer()? as usize
134 } else {
135 1
136 };
137 Ok(QueryExpr::QueueCommand(QueueCommand::Peek { queue, count }))
138 }
139 Token::Ident(ref name) if name.eq_ignore_ascii_case("LEN") => {
140 self.advance()?;
141 let queue = self.expect_ident()?;
142 Ok(QueryExpr::QueueCommand(QueueCommand::Len { queue }))
143 }
144 Token::Ident(ref name) if name.eq_ignore_ascii_case("MOVE") => {
145 self.advance()?;
146 self.expect(Token::From)?;
147 let source = self.expect_ident()?;
148 self.expect(Token::To)?;
149 let destination = self.expect_ident()?;
150 let filter = if self.consume(&Token::Where)? {
151 Some(self.parse_filter()?)
152 } else {
153 None
154 };
155 let limit = if self.consume(&Token::Limit)? {
156 self.parse_positive_integer("LIMIT")? as usize
157 } else if filter.is_some() {
158 return Err(ParseError::expected(
159 vec!["LIMIT"],
160 self.peek(),
161 self.position(),
162 ));
163 } else {
164 1
165 };
166 Ok(QueryExpr::QueueCommand(QueueCommand::Move {
167 source,
168 destination,
169 filter,
170 limit,
171 }))
172 }
173 Token::Purge => {
174 self.advance()?;
175 let queue = self.expect_ident()?;
176 Ok(QueryExpr::QueueCommand(QueueCommand::Purge { queue }))
177 }
178 Token::Ident(ref name) if name.eq_ignore_ascii_case("LPOP") => {
179 self.advance()?;
180 let queue = self.expect_ident()?;
181 Ok(QueryExpr::QueueCommand(QueueCommand::Pop {
182 queue,
183 side: QueueSide::Left,
184 count: 1,
185 }))
186 }
187 Token::Ident(ref name) if name.eq_ignore_ascii_case("RPOP") => {
188 self.advance()?;
189 let queue = self.expect_ident()?;
190 Ok(QueryExpr::QueueCommand(QueueCommand::Pop {
191 queue,
192 side: QueueSide::Right,
193 count: 1,
194 }))
195 }
196 Token::Ident(ref name) if name.eq_ignore_ascii_case("LPUSH") => {
197 self.advance()?;
198 let queue = self.expect_ident()?;
199 let value = self.parse_value()?;
200 Ok(QueryExpr::QueueCommand(QueueCommand::Push {
201 queue,
202 value,
203 side: QueueSide::Left,
204 priority: None,
205 }))
206 }
207 Token::Ident(ref name) if name.eq_ignore_ascii_case("RPUSH") => {
208 self.advance()?;
209 let queue = self.expect_ident()?;
210 let value = self.parse_value()?;
211 let priority = if self.consume(&Token::Priority)? {
212 Some(self.parse_integer()? as i32)
213 } else {
214 None
215 };
216 Ok(QueryExpr::QueueCommand(QueueCommand::Push {
217 queue,
218 value,
219 side: QueueSide::Right,
220 priority,
221 }))
222 }
223 Token::Group => {
224 self.advance()?;
225 self.expect(Token::Create)?;
226 let queue = self.expect_ident()?;
227 let group = self.expect_ident()?;
228 Ok(QueryExpr::QueueCommand(QueueCommand::GroupCreate {
229 queue,
230 group,
231 }))
232 }
233 Token::Ident(ref name) if name.eq_ignore_ascii_case("READ") => {
234 self.advance()?;
235 let queue = self.expect_ident()?;
236 let group = if self.consume(&Token::Group)? {
237 Some(self.expect_ident()?)
238 } else {
239 None
240 };
241 if !self.consume_ident_ci("CONSUMER")? {
243 return Err(ParseError::expected(
244 vec!["CONSUMER"],
245 self.peek(),
246 self.position(),
247 ));
248 }
249 let consumer = self.expect_ident()?;
250 let count = if self.consume(&Token::Count)? {
251 self.parse_integer()? as usize
252 } else {
253 1
254 };
255 Ok(QueryExpr::QueueCommand(QueueCommand::GroupRead {
256 queue,
257 group,
258 consumer,
259 count,
260 }))
261 }
262 Token::Ident(ref name) if name.eq_ignore_ascii_case("PENDING") => {
263 self.advance()?;
264 let queue = self.expect_ident()?;
265 self.expect(Token::Group)?;
266 let group = self.expect_ident()?;
267 Ok(QueryExpr::QueueCommand(QueueCommand::Pending {
268 queue,
269 group,
270 }))
271 }
272 Token::Ident(ref name) if name.eq_ignore_ascii_case("CLAIM") => {
273 self.advance()?;
274 let queue = self.expect_ident()?;
275 self.expect(Token::Group)?;
276 let group = self.expect_ident()?;
277 if !self.consume_ident_ci("CONSUMER")? {
278 return Err(ParseError::expected(
279 vec!["CONSUMER"],
280 self.peek(),
281 self.position(),
282 ));
283 }
284 let consumer = self.expect_ident()?;
285 if !self.consume_ident_ci("MIN_IDLE")? {
286 return Err(ParseError::expected(
287 vec!["MIN_IDLE"],
288 self.peek(),
289 self.position(),
290 ));
291 }
292 let min_idle_ms = self.parse_integer()?.max(0) as u64;
293 Ok(QueryExpr::QueueCommand(QueueCommand::Claim {
294 queue,
295 group,
296 consumer,
297 min_idle_ms,
298 }))
299 }
300 Token::Ack => {
301 self.advance()?;
302 let queue = self.expect_ident()?;
303 self.expect(Token::Group)?;
304 let group = self.expect_ident()?;
305 let message_id = self.parse_string()?;
306 Ok(QueryExpr::QueueCommand(QueueCommand::Ack {
307 queue,
308 group,
309 message_id,
310 }))
311 }
312 Token::Nack => {
313 self.advance()?;
314 let queue = self.expect_ident()?;
315 self.expect(Token::Group)?;
316 let group = self.expect_ident()?;
317 let message_id = self.parse_string()?;
318 Ok(QueryExpr::QueueCommand(QueueCommand::Nack {
319 queue,
320 group,
321 message_id,
322 }))
323 }
324 _ => Err(ParseError::expected(
325 vec![
326 "PUSH", "POP", "PEEK", "LEN", "PURGE", "GROUP", "READ", "ACK", "NACK", "LPUSH",
327 "RPUSH", "LPOP", "RPOP", "PENDING", "CLAIM", "MOVE",
328 ],
329 self.peek(),
330 self.position(),
331 )),
332 }
333 }
334
335 fn consume_queue_mode(&mut self) -> Result<Option<QueueMode>, ParseError> {
336 match self.peek() {
337 Token::Work => {
338 self.advance()?;
339 Ok(Some(QueueMode::Work))
340 }
341 Token::Ident(name) => {
342 if let Some(mode) = QueueMode::parse(name) {
343 self.advance()?;
344 Ok(Some(mode))
345 } else {
346 Ok(None)
347 }
348 }
349 _ => Ok(None),
350 }
351 }
352
353 fn parse_queue_mode(&mut self) -> Result<QueueMode, ParseError> {
354 match self.consume_queue_mode()? {
355 Some(mode) => Ok(mode),
356 None => Err(ParseError::expected(
357 vec!["FANOUT", "WORK"],
358 self.peek(),
359 self.position(),
360 )),
361 }
362 }
363
364 fn parse_queue_duration_unit(&mut self) -> Result<f64, ParseError> {
366 match self.peek().clone() {
367 Token::Ident(ref unit) => {
368 let mult = match unit.to_ascii_lowercase().as_str() {
369 "ms" => 1.0,
370 "s" | "sec" | "secs" => 1_000.0,
371 "m" | "min" | "mins" => 60_000.0,
372 "h" | "hr" | "hrs" => 3_600_000.0,
373 "d" | "day" | "days" => 86_400_000.0,
374 _ => return Ok(1_000.0),
375 };
376 self.advance()?;
377 Ok(mult)
378 }
379 _ => Ok(1_000.0),
380 }
381 }
382}