reddb_server/storage/query/parser/
queue.rs1use super::super::ast::{
4 AlterQueueQuery, CreateQueueQuery, DropQueueQuery, QueryExpr, QueueCommand, QueueMode,
5 QueueSide, DEFAULT_QUEUE_IN_FLIGHT_CAP_PER_GROUP, DEFAULT_QUEUE_LOCK_DEADLINE_MS,
6 DEFAULT_QUEUE_MAX_ATTEMPTS,
7};
8use super::super::lexer::Token;
9use super::error::ParseError;
10use super::Parser;
11
12impl<'a> Parser<'a> {
13 pub fn parse_create_queue_body(&mut self) -> Result<QueryExpr, ParseError> {
15 let if_not_exists = self.match_if_not_exists()?;
16 let name = self.expect_ident()?;
17
18 let mut mode = QueueMode::Work;
19 let mut priority = false;
20 let mut max_size = None;
21 let mut ttl_ms = None;
22 let mut dlq = None;
23 let mut max_attempts = DEFAULT_QUEUE_MAX_ATTEMPTS;
24 let mut lock_deadline_ms = DEFAULT_QUEUE_LOCK_DEADLINE_MS;
25 let mut in_flight_cap_per_group = DEFAULT_QUEUE_IN_FLIGHT_CAP_PER_GROUP;
26
27 loop {
29 if let Some(parsed_mode) = self.consume_queue_mode()? {
30 mode = parsed_mode;
31 } else if self.consume(&Token::Priority)? {
32 priority = true;
33 } else if self.consume_ident_ci("MAX_SIZE")? || self.consume_ident_ci("MAXSIZE")? {
34 max_size = Some(self.parse_positive_integer("MAX_SIZE")? as usize);
35 } else if self.consume_ident_ci("MAX_ATTEMPTS")?
36 || self.consume_ident_ci("MAXATTEMPTS")?
37 {
38 max_attempts = self.parse_integer()?.max(1) as u32;
39 } else if self.consume_ident_ci("LOCK_DEADLINE_MS")? {
40 lock_deadline_ms = self.parse_integer()?.max(1) as u64;
41 } else if self.consume_ident_ci("IN_FLIGHT_CAP_PER_GROUP")? {
42 in_flight_cap_per_group = self.parse_integer()?.max(1) as u32;
43 } else if self.consume(&Token::With)? {
44 if self.consume_ident_ci("EVENTS")? {
45 return Err(ParseError::new(
46 "queues cannot have event subscriptions".to_string(),
47 self.position(),
48 ));
49 } else if self.consume_ident_ci("TTL")? {
50 let value = self.parse_float()?;
51 let unit = self.parse_queue_duration_unit()?;
52 ttl_ms = Some((value * unit) as u64);
53 } else if self.consume_ident_ci("DLQ")? {
54 dlq = Some(self.expect_ident()?);
55 }
56 } else {
57 break;
58 }
59 }
60
61 Ok(QueryExpr::CreateQueue(CreateQueueQuery {
62 name,
63 mode,
64 priority,
65 max_size,
66 ttl_ms,
67 dlq,
68 max_attempts,
69 lock_deadline_ms,
70 in_flight_cap_per_group,
71 if_not_exists,
72 }))
73 }
74
75 pub fn parse_alter_queue_body(&mut self) -> Result<QueryExpr, ParseError> {
84 let name = self.expect_ident()?;
85 if !self.consume(&Token::Set)? && !self.consume_ident_ci("SET")? {
86 return Err(ParseError::expected(
87 vec!["SET"],
88 self.peek(),
89 self.position(),
90 ));
91 }
92
93 let mut alter = AlterQueueQuery {
94 name,
95 ..Default::default()
96 };
97
98 if self.consume(&Token::Mode)? || self.consume_ident_ci("MODE")? {
99 alter.mode = Some(self.parse_queue_mode()?);
100 } else if self.consume_ident_ci("MAX_ATTEMPTS")?
101 || self.consume_ident_ci("MAXATTEMPTS")?
102 {
103 alter.max_attempts = Some(self.parse_integer()?.max(1) as u32);
104 } else if self.consume_ident_ci("LOCK_DEADLINE_MS")? {
105 alter.lock_deadline_ms = Some(self.parse_integer()?.max(1) as u64);
106 } else if self.consume_ident_ci("IN_FLIGHT_CAP_PER_GROUP")? {
107 alter.in_flight_cap_per_group = Some(self.parse_integer()?.max(1) as u32);
108 } else if self.consume_ident_ci("DLQ")? {
109 alter.dlq = Some(self.expect_ident()?);
110 } else {
111 return Err(ParseError::expected(
112 vec![
113 "MODE",
114 "MAX_ATTEMPTS",
115 "LOCK_DEADLINE_MS",
116 "IN_FLIGHT_CAP_PER_GROUP",
117 "DLQ",
118 ],
119 self.peek(),
120 self.position(),
121 ));
122 }
123
124 Ok(QueryExpr::AlterQueue(alter))
125 }
126
127 pub fn parse_drop_queue_body(&mut self) -> Result<QueryExpr, ParseError> {
129 let if_exists = self.match_if_exists()?;
130 let name = self.parse_drop_collection_name()?;
131 Ok(QueryExpr::DropQueue(DropQueueQuery { name, if_exists }))
132 }
133
134 pub fn parse_queue_command(&mut self) -> Result<QueryExpr, ParseError> {
136 self.expect(Token::Queue)?;
137
138 match self.peek().clone() {
139 Token::Push => {
140 self.advance()?;
141 let queue = self.expect_ident()?;
142 let value = self.parse_value()?;
143 let priority = if self.consume(&Token::Priority)? {
144 Some(self.parse_integer()? as i32)
145 } else {
146 None
147 };
148 Ok(QueryExpr::QueueCommand(QueueCommand::Push {
149 queue,
150 value,
151 side: QueueSide::Right,
152 priority,
153 }))
154 }
155 Token::Pop => {
156 self.advance()?;
157 let queue = self.expect_ident()?;
158 let count = if self.consume(&Token::Count)? {
159 self.parse_integer()? as usize
160 } else {
161 1
162 };
163 Ok(QueryExpr::QueueCommand(QueueCommand::Pop {
164 queue,
165 side: QueueSide::Left,
166 count,
167 }))
168 }
169 Token::Peek => {
170 self.advance()?;
171 let queue = self.expect_ident()?;
172 let count = if self.consume(&Token::Count)? {
173 self.parse_integer()? as usize
174 } else if matches!(self.peek(), Token::Integer(_)) {
175 self.parse_integer()? as usize
176 } else {
177 1
178 };
179 Ok(QueryExpr::QueueCommand(QueueCommand::Peek { queue, count }))
180 }
181 Token::Ident(ref name) if name.eq_ignore_ascii_case("LEN") => {
182 self.advance()?;
183 let queue = self.expect_ident()?;
184 Ok(QueryExpr::QueueCommand(QueueCommand::Len { queue }))
185 }
186 Token::Ident(ref name) if name.eq_ignore_ascii_case("MOVE") => {
187 self.advance()?;
188 self.expect(Token::From)?;
189 let source = self.expect_ident()?;
190 self.expect(Token::To)?;
191 let destination = self.expect_ident()?;
192 let filter = if self.consume(&Token::Where)? {
193 Some(self.parse_filter()?)
194 } else {
195 None
196 };
197 let limit = if self.consume(&Token::Limit)? {
198 self.parse_positive_integer("LIMIT")? as usize
199 } else if filter.is_some() {
200 return Err(ParseError::expected(
201 vec!["LIMIT"],
202 self.peek(),
203 self.position(),
204 ));
205 } else {
206 1
207 };
208 Ok(QueryExpr::QueueCommand(QueueCommand::Move {
209 source,
210 destination,
211 filter,
212 limit,
213 }))
214 }
215 Token::Purge => {
216 self.advance()?;
217 let queue = self.expect_ident()?;
218 Ok(QueryExpr::QueueCommand(QueueCommand::Purge { queue }))
219 }
220 Token::Ident(ref name) if name.eq_ignore_ascii_case("LPOP") => {
221 self.advance()?;
222 let queue = self.expect_ident()?;
223 Ok(QueryExpr::QueueCommand(QueueCommand::Pop {
224 queue,
225 side: QueueSide::Left,
226 count: 1,
227 }))
228 }
229 Token::Ident(ref name) if name.eq_ignore_ascii_case("RPOP") => {
230 self.advance()?;
231 let queue = self.expect_ident()?;
232 Ok(QueryExpr::QueueCommand(QueueCommand::Pop {
233 queue,
234 side: QueueSide::Right,
235 count: 1,
236 }))
237 }
238 Token::Ident(ref name) if name.eq_ignore_ascii_case("LPUSH") => {
239 self.advance()?;
240 let queue = self.expect_ident()?;
241 let value = self.parse_value()?;
242 Ok(QueryExpr::QueueCommand(QueueCommand::Push {
243 queue,
244 value,
245 side: QueueSide::Left,
246 priority: None,
247 }))
248 }
249 Token::Ident(ref name) if name.eq_ignore_ascii_case("RPUSH") => {
250 self.advance()?;
251 let queue = self.expect_ident()?;
252 let value = self.parse_value()?;
253 let priority = if self.consume(&Token::Priority)? {
254 Some(self.parse_integer()? as i32)
255 } else {
256 None
257 };
258 Ok(QueryExpr::QueueCommand(QueueCommand::Push {
259 queue,
260 value,
261 side: QueueSide::Right,
262 priority,
263 }))
264 }
265 Token::Group => {
266 self.advance()?;
267 self.expect(Token::Create)?;
268 let queue = self.expect_ident()?;
269 let group = self.expect_ident()?;
270 Ok(QueryExpr::QueueCommand(QueueCommand::GroupCreate {
271 queue,
272 group,
273 }))
274 }
275 Token::Ident(ref name) if name.eq_ignore_ascii_case("READ") => {
276 self.advance()?;
277 let queue = self.expect_ident()?;
278 let group = if self.consume(&Token::Group)? {
279 Some(self.expect_ident()?)
280 } else {
281 None
282 };
283 if !self.consume_ident_ci("CONSUMER")? {
285 return Err(ParseError::expected(
286 vec!["CONSUMER"],
287 self.peek(),
288 self.position(),
289 ));
290 }
291 let consumer = self.expect_ident()?;
292 let count = if self.consume(&Token::Count)? {
293 self.parse_integer()? as usize
294 } else {
295 1
296 };
297 Ok(QueryExpr::QueueCommand(QueueCommand::GroupRead {
298 queue,
299 group,
300 consumer,
301 count,
302 }))
303 }
304 Token::Ident(ref name) if name.eq_ignore_ascii_case("PENDING") => {
305 self.advance()?;
306 let queue = self.expect_ident()?;
307 self.expect(Token::Group)?;
308 let group = self.expect_ident()?;
309 Ok(QueryExpr::QueueCommand(QueueCommand::Pending {
310 queue,
311 group,
312 }))
313 }
314 Token::Ident(ref name) if name.eq_ignore_ascii_case("CLAIM") => {
315 self.advance()?;
316 let queue = self.expect_ident()?;
317 self.expect(Token::Group)?;
318 let group = self.expect_ident()?;
319 if !self.consume_ident_ci("CONSUMER")? {
320 return Err(ParseError::expected(
321 vec!["CONSUMER"],
322 self.peek(),
323 self.position(),
324 ));
325 }
326 let consumer = self.expect_ident()?;
327 if !self.consume_ident_ci("MIN_IDLE")? {
328 return Err(ParseError::expected(
329 vec!["MIN_IDLE"],
330 self.peek(),
331 self.position(),
332 ));
333 }
334 let min_idle_ms = self.parse_integer()?.max(0) as u64;
335 Ok(QueryExpr::QueueCommand(QueueCommand::Claim {
336 queue,
337 group,
338 consumer,
339 min_idle_ms,
340 }))
341 }
342 Token::Ack => {
343 self.advance()?;
344 let queue = self.expect_ident()?;
345 self.expect(Token::Group)?;
346 let group = self.expect_ident()?;
347 let message_id = self.parse_string()?;
348 Ok(QueryExpr::QueueCommand(QueueCommand::Ack {
349 queue,
350 group,
351 message_id,
352 }))
353 }
354 Token::Nack => {
355 self.advance()?;
356 let queue = self.expect_ident()?;
357 self.expect(Token::Group)?;
358 let group = self.expect_ident()?;
359 let message_id = self.parse_string()?;
360 Ok(QueryExpr::QueueCommand(QueueCommand::Nack {
361 queue,
362 group,
363 message_id,
364 }))
365 }
366 _ => Err(ParseError::expected(
367 vec![
368 "PUSH", "POP", "PEEK", "LEN", "PURGE", "GROUP", "READ", "ACK", "NACK", "LPUSH",
369 "RPUSH", "LPOP", "RPOP", "PENDING", "CLAIM", "MOVE",
370 ],
371 self.peek(),
372 self.position(),
373 )),
374 }
375 }
376
377 fn consume_queue_mode(&mut self) -> Result<Option<QueueMode>, ParseError> {
378 match self.peek() {
379 Token::Work => {
380 self.advance()?;
381 Ok(Some(QueueMode::Work))
382 }
383 Token::Ident(name) => {
384 if let Some(mode) = QueueMode::parse(name) {
385 self.advance()?;
386 Ok(Some(mode))
387 } else {
388 Ok(None)
389 }
390 }
391 _ => Ok(None),
392 }
393 }
394
395 fn parse_queue_mode(&mut self) -> Result<QueueMode, ParseError> {
396 match self.consume_queue_mode()? {
397 Some(mode) => Ok(mode),
398 None => Err(ParseError::expected(
399 vec!["FANOUT", "WORK"],
400 self.peek(),
401 self.position(),
402 )),
403 }
404 }
405
406 fn parse_queue_duration_unit(&mut self) -> Result<f64, ParseError> {
408 match self.peek().clone() {
409 Token::Ident(ref unit) => {
410 let mult = match unit.to_ascii_lowercase().as_str() {
411 "ms" => 1.0,
412 "s" | "sec" | "secs" => 1_000.0,
413 "m" | "min" | "mins" => 60_000.0,
414 "h" | "hr" | "hrs" => 3_600_000.0,
415 "d" | "day" | "days" => 86_400_000.0,
416 _ => return Ok(1_000.0),
417 };
418 self.advance()?;
419 Ok(mult)
420 }
421 _ => Ok(1_000.0),
422 }
423 }
424}