1use std::collections::{HashMap, HashSet};
4use std::sync::atomic::{AtomicU64, Ordering};
5use std::sync::Arc;
6
7use crate::runtime::audit_log::{AuditAuthSource, AuditEvent, AuditFieldEscaper, Outcome};
8use crate::storage::queue::QueueMode;
9use crate::storage::unified::entity::{QueueMessageData, RowData};
10use crate::storage::unified::{Metadata, MetadataValue, UnifiedStore};
11
12use super::*;
13
/// Counter bumped by `enqueue_event_payload` each time an event is diverted
/// away from its target queue (outbox byte cap exceeded or queue full).
pub static EVENTS_DRAIN_RETRIES_TOTAL: AtomicU64 = AtomicU64::new(0);

/// Counter of events routed to a `<queue>_outbox_dlq` queue.
pub static EVENTS_DLQ_TOTAL: AtomicU64 = AtomicU64::new(0);

/// Counter of event payloads successfully inserted (target queue or outbox DLQ).
pub static EVENTS_ENQUEUED_TOTAL: AtomicU64 = AtomicU64::new(0);

/// Outbox size (1 GiB) above which a warning and operator event are emitted.
const OUTBOX_WARN_BYTES: u64 = 1 << 30;

/// Outbox size (10 GiB) above which new events are diverted to the outbox DLQ.
const OUTBOX_MAX_BYTES: u64 = 10 * (1 << 30);

/// Running estimate of bytes accepted into the event outbox.
/// NOTE(review): nothing in the visible code decrements this when messages are
/// drained — confirm whether that happens elsewhere in the file.
static OUTBOX_APPROX_BYTES: AtomicU64 = AtomicU64::new(0);

/// Collection holding queue configuration rows and consumer-group rows.
const QUEUE_META_COLLECTION: &str = "red_queue_meta";
/// Midpoint of the `u64` position space.
/// NOTE(review): presumably the seed position from which left/right pushes
/// grow outwards — confirm against `next_queue_position`.
const QUEUE_POSITION_CENTER: u64 = u64::MAX / 2;
/// Implicit consumer group used for group-less reads on work-mode queues.
const WORK_DEFAULT_GROUP: &str = "_work_default";
/// Prefix of the implicit per-consumer group used for group-less reads on
/// fanout-mode queues (the consumer name is appended).
const FANOUT_GROUP_PREFIX: &str = "_fanout_";
41
/// Per-queue settings persisted as a `queue_config` row in the queue meta
/// collection and read back by `load_queue_config`.
#[derive(Debug, Clone)]
pub(super) struct QueueRuntimeConfig {
    /// Delivery mode; selects the implicit consumer group for group-less reads.
    pub(super) mode: QueueMode,
    /// Whether pushes may carry a priority value.
    pub(super) priority: bool,
    /// Maximum number of messages; pushes are rejected once it is reached.
    pub(super) max_size: Option<usize>,
    /// TTL in milliseconds attached as metadata to each enqueued message.
    pub(super) ttl_ms: Option<u64>,
    /// Name of the dead-letter queue, if one is configured.
    pub(super) dlq: Option<String>,
    /// Copied into each new message's `max_attempts`; 3 when no config is stored.
    pub(super) max_attempts: u32,
}
51
/// A consumer group registered on a queue, as loaded from a `queue_group`
/// row of the queue meta collection.
#[derive(Debug, Clone)]
struct QueueGroupEntry {
    /// Id of the meta row that stores this group.
    entity_id: EntityId,
    /// Consumer group name.
    group: String,
}
57
/// A delivered-but-unacknowledged message, as reported by `QUEUE PENDING`.
#[derive(Debug, Clone)]
pub(super) struct QueuePendingEntry {
    /// NOTE(review): presumably the id of the meta row holding this entry — confirm.
    pub(super) entity_id: EntityId,
    /// Consumer group the delivery belongs to.
    group: String,
    /// Id of the delivered queue message.
    pub(super) message_id: EntityId,
    /// Consumer the message was delivered to.
    consumer: String,
    /// Delivery timestamp in nanoseconds; used to derive `idle_ms`.
    pub(super) delivered_at_ns: u64,
    /// Number of times the message has been delivered.
    pub(super) delivery_count: u32,
}
67
/// Record of an acknowledgement of a message by a consumer group.
/// NOTE(review): not constructed in the visible code — semantics inferred
/// from field names; confirm against the ack path.
#[derive(Debug, Clone)]
pub(super) struct QueueAckEntry {
    /// NOTE(review): presumably the id of the meta row holding this entry — confirm.
    entity_id: EntityId,
    /// Consumer group that acknowledged the message.
    group: String,
    /// Id of the acknowledged queue message.
    pub(super) message_id: EntityId,
}
74
/// Read-only projection of a stored queue message used for sorting,
/// filtering, selecting and moving messages.
#[derive(Debug, Clone)]
pub(super) struct QueueMessageView {
    /// Entity id of the message.
    pub(super) id: EntityId,
    /// Position of the message within its queue.
    position: u64,
    /// NOTE(review): presumably 0 for messages pushed without a priority — confirm.
    priority: i32,
    /// Message payload.
    pub(super) payload: Value,
    /// Number of delivery attempts recorded on the message.
    attempts: u32,
    /// Attempt limit recorded on the message.
    pub(super) max_attempts: u32,
    /// Enqueue timestamp in nanoseconds.
    enqueued_at_ns: u64,
}
85
86impl RedDBRuntime {
87 pub(crate) fn enqueue_event_payload(
88 &self,
89 queue: &str,
90 payload: Value,
91 ) -> RedDBResult<EntityId> {
92 let store = self.inner.db.store();
93 if store.get_collection(queue).is_none() {
95 crate::runtime::impl_ddl::ensure_event_target_queue_pub(self, queue)?;
96 }
97
98 let payload_bytes = estimate_payload_bytes(&payload);
100 let outbox_bytes = OUTBOX_APPROX_BYTES.fetch_add(payload_bytes, Ordering::Relaxed);
101
102 if outbox_bytes > OUTBOX_MAX_BYTES {
104 OUTBOX_APPROX_BYTES.fetch_sub(payload_bytes, Ordering::Relaxed);
105 EVENTS_DRAIN_RETRIES_TOTAL.fetch_add(1, Ordering::Relaxed);
106 return self.route_event_to_outbox_dlq(queue, payload, "outbox_max_bytes_exceeded");
107 }
108
109 if outbox_bytes > OUTBOX_WARN_BYTES && outbox_bytes - payload_bytes <= OUTBOX_WARN_BYTES {
111 tracing::warn!(
112 outbox_bytes,
113 warn_threshold = OUTBOX_WARN_BYTES,
114 "event outbox approaching capacity warning threshold"
115 );
116 crate::telemetry::operator_event::OperatorEvent::OutboxDlqActivated {
117 queue: queue.to_string(),
118 dlq: format!("{queue}_outbox_dlq"),
119 reason: "outbox_warn_bytes_exceeded".to_string(),
120 }
121 .emit_global();
122 }
123
124 let config = load_queue_config(store.as_ref(), queue);
125
126 if let Some(max_size) = config.max_size {
128 let current_len = load_queue_message_views(store.as_ref(), queue)
129 .unwrap_or_default()
130 .len();
131 if current_len >= max_size {
132 OUTBOX_APPROX_BYTES.fetch_sub(payload_bytes, Ordering::Relaxed);
133 EVENTS_DRAIN_RETRIES_TOTAL.fetch_add(1, Ordering::Relaxed);
134 return self.route_event_to_outbox_dlq(queue, payload, "queue_full");
135 }
136 if current_len * 10 >= max_size * 8 {
138 tracing::warn!(
139 queue = %queue,
140 size = current_len,
141 max = max_size,
142 "event target queue near capacity"
143 );
144 }
145 }
146
147 let id = self.enqueue_event_payload_raw(store.as_ref(), queue, &config, payload)?;
148 EVENTS_ENQUEUED_TOTAL.fetch_add(1, Ordering::Relaxed);
149 Ok(id)
150 }
151
152 fn route_event_to_outbox_dlq(
154 &self,
155 queue: &str,
156 payload: Value,
157 reason: &str,
158 ) -> RedDBResult<EntityId> {
159 let dlq_name = format!("{queue}_outbox_dlq");
160 EVENTS_DLQ_TOTAL.fetch_add(1, Ordering::Relaxed);
161
162 crate::telemetry::operator_event::OperatorEvent::OutboxDlqActivated {
163 queue: queue.to_string(),
164 dlq: dlq_name.clone(),
165 reason: reason.to_string(),
166 }
167 .emit_global();
168
169 let store = self.inner.db.store();
170 if store.get_collection(&dlq_name).is_none() {
171 crate::runtime::impl_ddl::ensure_event_target_queue_pub(self, &dlq_name)?;
172 }
173 let dlq_config = load_queue_config(store.as_ref(), &dlq_name);
174 let id = self.enqueue_event_payload_raw(store.as_ref(), &dlq_name, &dlq_config, payload)?;
175 EVENTS_ENQUEUED_TOTAL.fetch_add(1, Ordering::Relaxed);
176 Ok(id)
177 }
178
179 fn enqueue_event_payload_raw(
181 &self,
182 store: &UnifiedStore,
183 queue: &str,
184 config: &QueueRuntimeConfig,
185 payload: Value,
186 ) -> RedDBResult<EntityId> {
187 let position = next_queue_position(store, queue, QueueSide::Right)?;
188 let mut entity = UnifiedEntity::new(
189 EntityId::new(0),
190 EntityKind::QueueMessage {
191 queue: queue.to_string(),
192 position,
193 },
194 EntityData::QueueMessage(QueueMessageData {
195 payload,
196 priority: None,
197 enqueued_at_ns: now_ns(),
198 attempts: 0,
199 max_attempts: config.max_attempts,
200 acked: false,
201 }),
202 );
203 if let Some(xid) = self.current_xid() {
204 entity.set_xmin(xid);
205 }
206 let id = store
207 .insert_auto(queue, entity)
208 .map_err(|err| RedDBError::Internal(err.to_string()))?;
209 if let Some(ttl_ms) = config.ttl_ms {
210 store
211 .set_metadata(queue, id, queue_message_ttl_metadata(ttl_ms))
212 .map_err(|err| RedDBError::Internal(err.to_string()))?;
213 }
214 self.invalidate_result_cache_for_table(queue);
215 Ok(id)
216 }
217
218 pub fn execute_create_queue(
219 &self,
220 raw_query: &str,
221 query: &CreateQueueQuery,
222 ) -> RedDBResult<RuntimeQueryResult> {
223 self.check_write(crate::runtime::write_gate::WriteKind::Ddl)?;
224 if query.dlq.as_deref() == Some(query.name.as_str()) {
225 return Err(RedDBError::Query(
226 "dead-letter queue must be different from the source queue".to_string(),
227 ));
228 }
229
230 let store = self.inner.db.store();
231 let exists = store.get_collection(&query.name).is_some();
232 if exists {
233 if query.if_not_exists {
234 return Ok(RuntimeQueryResult::ok_message(
235 raw_query.to_string(),
236 &format!("queue '{}' already exists", query.name),
237 "create",
238 ));
239 }
240 return Err(RedDBError::Query(format!(
241 "queue '{}' already exists",
242 query.name
243 )));
244 }
245
246 store
247 .create_collection(&query.name)
248 .map_err(|err| RedDBError::Internal(err.to_string()))?;
249 if let Some(ttl_ms) = query.ttl_ms {
250 self.inner
251 .db
252 .set_collection_default_ttl_ms(&query.name, ttl_ms);
253 }
254 self.inner
255 .db
256 .save_collection_contract(queue_collection_contract(
257 &query.name,
258 query.priority,
259 query.ttl_ms,
260 ))
261 .map_err(|err| RedDBError::Internal(err.to_string()))?;
262 save_queue_config(
263 store.as_ref(),
264 &query.name,
265 &QueueRuntimeConfig {
266 mode: query.mode,
267 priority: query.priority,
268 max_size: query.max_size,
269 ttl_ms: query.ttl_ms,
270 dlq: query.dlq.clone(),
271 max_attempts: query.max_attempts,
272 },
273 )?;
274
275 if let Some(dlq) = &query.dlq {
276 if store.get_collection(dlq).is_none() {
277 store
278 .create_collection(dlq)
279 .map_err(|err| RedDBError::Internal(err.to_string()))?;
280 self.inner
281 .db
282 .save_collection_contract(queue_collection_contract(dlq, false, None))
283 .map_err(|err| RedDBError::Internal(err.to_string()))?;
284 }
285 }
286
287 self.invalidate_result_cache();
288 self.inner
289 .db
290 .persist_metadata()
291 .map_err(|err| RedDBError::Internal(err.to_string()))?;
292 let mut type_tags = Vec::new();
297 if let Some(dlq) = &query.dlq {
298 type_tags.push(format!("dlq:{}", dlq));
299 }
300 self.schema_vocabulary_apply(
301 crate::runtime::schema_vocabulary::DdlEvent::CreateCollection {
302 collection: query.name.clone(),
303 columns: vec!["payload".to_string()],
304 type_tags,
305 description: None,
306 },
307 );
308
309 let mut msg = format!("queue '{}' created", query.name);
310 msg.push_str(&format!(" (mode={})", query.mode.as_str()));
311 if query.priority {
312 msg.push_str(" (priority)");
313 }
314 if let Some(max_size) = query.max_size {
315 msg.push_str(&format!(" (max_size={max_size})"));
316 }
317 if let Some(ttl_ms) = query.ttl_ms {
318 msg.push_str(&format!(" (ttl={ttl_ms}ms)"));
319 }
320 if let Some(dlq) = &query.dlq {
321 msg.push_str(&format!(
322 " (dlq={dlq}, max_attempts={})",
323 query.max_attempts
324 ));
325 }
326
327 Ok(RuntimeQueryResult::ok_message(
328 raw_query.to_string(),
329 &msg,
330 "create",
331 ))
332 }
333
334 pub fn execute_alter_queue(
335 &self,
336 raw_query: &str,
337 query: &AlterQueueQuery,
338 ) -> RedDBResult<RuntimeQueryResult> {
339 self.check_write(crate::runtime::write_gate::WriteKind::Ddl)?;
340 let store = self.inner.db.store();
341 ensure_queue_exists(store.as_ref(), &query.name)?;
342
343 let pending =
344 load_pending_entries(store.as_ref(), &query.name, None, None).unwrap_or_default();
345 if !pending.is_empty() {
346 tracing::warn!(
347 queue = %query.name,
348 pending_count = pending.len(),
349 new_mode = %query.mode.as_str(),
350 "ALTER QUEUE SET MODE: {} in-flight messages will drain with old mode; \
351 new reads use {}",
352 pending.len(),
353 query.mode.as_str(),
354 );
355 }
356
357 let mut config = load_queue_config(store.as_ref(), &query.name);
358 config.mode = query.mode;
359 save_queue_config(store.as_ref(), &query.name, &config)?;
360
361 self.invalidate_result_cache();
362 self.inner
363 .db
364 .persist_metadata()
365 .map_err(|err| RedDBError::Internal(err.to_string()))?;
366
367 Ok(RuntimeQueryResult::ok_message(
368 raw_query.to_string(),
369 &format!("queue '{}' mode set to {}", query.name, query.mode.as_str()),
370 "alter",
371 ))
372 }
373
374 pub fn execute_drop_queue(
375 &self,
376 raw_query: &str,
377 query: &DropQueueQuery,
378 ) -> RedDBResult<RuntimeQueryResult> {
379 self.check_write(crate::runtime::write_gate::WriteKind::Ddl)?;
380 let store = self.inner.db.store();
381 if super::impl_ddl::is_system_schema_name(&query.name) {
382 return Err(RedDBError::Query("system schema is read-only".to_string()));
383 }
384 if store.get_collection(&query.name).is_none() {
385 if query.if_exists {
386 return Ok(RuntimeQueryResult::ok_message(
387 raw_query.to_string(),
388 &format!("queue '{}' does not exist", query.name),
389 "drop",
390 ));
391 }
392 return Err(RedDBError::NotFound(format!(
393 "queue '{}' not found",
394 query.name
395 )));
396 }
397 let actual = crate::runtime::ddl::polymorphic_resolver::resolve(
398 &query.name,
399 &self.inner.db.catalog_model_snapshot(),
400 )?;
401 crate::runtime::ddl::polymorphic_resolver::ensure_model_match(
402 crate::catalog::CollectionModel::Queue,
403 actual,
404 )?;
405
406 store
407 .drop_collection(&query.name)
408 .map_err(|err| RedDBError::Internal(err.to_string()))?;
409 self.inner.db.clear_collection_default_ttl_ms(&query.name);
410 self.inner
411 .db
412 .remove_collection_contract(&query.name)
413 .map_err(|err| RedDBError::Internal(err.to_string()))?;
414 remove_queue_metadata(store.as_ref(), &query.name);
415 self.invalidate_result_cache();
416 self.inner
417 .db
418 .persist_metadata()
419 .map_err(|err| RedDBError::Internal(err.to_string()))?;
420 self.schema_vocabulary_apply(
422 crate::runtime::schema_vocabulary::DdlEvent::DropCollection {
423 collection: query.name.clone(),
424 },
425 );
426
427 Ok(RuntimeQueryResult::ok_message(
428 raw_query.to_string(),
429 &format!("queue '{}' dropped", query.name),
430 "drop",
431 ))
432 }
433
434 pub fn execute_queue_command(
435 &self,
436 raw_query: &str,
437 cmd: &QueueCommand,
438 ) -> RedDBResult<RuntimeQueryResult> {
439 self.check_write(crate::runtime::write_gate::WriteKind::Dml)?;
440 match cmd {
441 QueueCommand::Push {
442 queue,
443 value,
444 side,
445 priority,
446 } => {
447 let store = self.inner.db.store();
448 ensure_queue_exists(store.as_ref(), queue)?;
449 let config = load_queue_config(store.as_ref(), queue);
450 if priority.is_some() && !config.priority {
451 return Err(RedDBError::Query(format!(
452 "queue '{}' is not a priority queue",
453 queue
454 )));
455 }
456 if let Some(max_size) = config.max_size {
457 let current_len =
458 load_queue_message_views_with_runtime(Some(self), store.as_ref(), queue)?
459 .len();
460 if current_len >= max_size {
461 return Err(RedDBError::Query(format!(
462 "queue '{}' is full (max_size={max_size})",
463 queue
464 )));
465 }
466 }
467
468 let position = next_queue_position(store.as_ref(), queue, *side)?;
469 let mut entity = UnifiedEntity::new(
470 EntityId::new(0),
471 EntityKind::QueueMessage {
472 queue: queue.clone(),
473 position,
474 },
475 EntityData::QueueMessage(QueueMessageData {
476 payload: value.clone(),
477 priority: if config.priority { *priority } else { None },
478 enqueued_at_ns: now_ns(),
479 attempts: 0,
480 max_attempts: config.max_attempts,
481 acked: false,
482 }),
483 );
484 if let Some(xid) = self.current_xid() {
487 entity.set_xmin(xid);
488 }
489 let id = store
490 .insert_auto(queue, entity)
491 .map_err(|err| RedDBError::Internal(err.to_string()))?;
492 if let Some(ttl_ms) = config.ttl_ms {
493 store
494 .set_metadata(queue, id, queue_message_ttl_metadata(ttl_ms))
495 .map_err(|err| RedDBError::Internal(err.to_string()))?;
496 }
497 self.invalidate_result_cache();
498
499 let mut result = UnifiedResult::with_columns(vec![
500 "message_id".into(),
501 "side".into(),
502 "queue".into(),
503 ]);
504 let mut record = UnifiedRecord::new();
505 record.set("message_id", Value::text(message_id_string(id)));
506 record.set(
507 "side",
508 Value::text(match side {
509 QueueSide::Left => "left".to_string(),
510 QueueSide::Right => "right".to_string(),
511 }),
512 );
513 record.set("queue", Value::text(queue.clone()));
514 result.push(record);
515
516 Ok(RuntimeQueryResult {
517 query: raw_query.to_string(),
518 mode: QueryMode::Sql,
519 statement: "queue_push",
520 engine: "runtime-queue",
521 result,
522 affected_rows: 1,
523 statement_type: "insert",
524 })
525 }
526 QueueCommand::Pop { queue, side, count } => {
527 let store = self.inner.db.store();
528 ensure_queue_exists(store.as_ref(), queue)?;
529 let popped = super::queue_delivery::pop_messages(
530 self,
531 store.as_ref(),
532 queue,
533 *side,
534 *count,
535 )?;
536
537 let mut result =
538 UnifiedResult::with_columns(vec!["message_id".into(), "payload".into()]);
539 for message in &popped {
540 let mut record = UnifiedRecord::new();
541 record.set(
542 "message_id",
543 Value::text(message_id_string(message.message_id)),
544 );
545 record.set("payload", message.payload.clone());
546 result.push(record);
547 }
548 let popped_count = popped.len() as u64;
549 if popped_count > 0 {
550 self.invalidate_result_cache();
551 }
552
553 Ok(RuntimeQueryResult {
554 query: raw_query.to_string(),
555 mode: QueryMode::Sql,
556 statement: "queue_pop",
557 engine: "runtime-queue",
558 result,
559 affected_rows: popped_count,
560 statement_type: "delete",
561 })
562 }
563 QueueCommand::Peek { queue, count } => {
564 let store = self.inner.db.store();
565 ensure_queue_exists(store.as_ref(), queue)?;
566 let messages =
567 super::queue_delivery::peek_messages(self, store.as_ref(), queue, *count)?;
568
569 let mut result =
570 UnifiedResult::with_columns(vec!["message_id".into(), "payload".into()]);
571 for message in messages {
572 let mut record = UnifiedRecord::new();
573 record.set(
574 "message_id",
575 Value::text(message_id_string(message.message_id)),
576 );
577 record.set("payload", message.payload);
578 result.push(record);
579 }
580
581 Ok(RuntimeQueryResult {
582 query: raw_query.to_string(),
583 mode: QueryMode::Sql,
584 statement: "queue_peek",
585 engine: "runtime-queue",
586 result,
587 affected_rows: 0,
588 statement_type: "select",
589 })
590 }
591 QueueCommand::Len { queue } => {
592 let store = self.inner.db.store();
593 ensure_queue_exists(store.as_ref(), queue)?;
594 let count =
595 load_queue_message_views_with_runtime(Some(self), store.as_ref(), queue)?.len()
596 as u64;
597 let mut result = UnifiedResult::with_columns(vec!["len".into()]);
598 let mut record = UnifiedRecord::new();
599 record.set("len", Value::UnsignedInteger(count));
600 result.push(record);
601
602 Ok(RuntimeQueryResult {
603 query: raw_query.to_string(),
604 mode: QueryMode::Sql,
605 statement: "queue_len",
606 engine: "runtime-queue",
607 result,
608 affected_rows: 0,
609 statement_type: "select",
610 })
611 }
612 QueueCommand::Purge { queue } => {
613 let store = self.inner.db.store();
614 ensure_queue_exists(store.as_ref(), queue)?;
615 let count = super::queue_delivery::purge_messages(self, store.as_ref(), queue)?;
616 if count > 0 {
617 self.invalidate_result_cache();
618 }
619
620 Ok(RuntimeQueryResult::ok_message(
621 raw_query.to_string(),
622 &format!("{count} messages purged from queue '{queue}'"),
623 "delete",
624 ))
625 }
626 QueueCommand::GroupCreate { queue, group } => {
627 let store = self.inner.db.store();
628 ensure_queue_exists(store.as_ref(), queue)?;
629 if queue_group_exists(store.as_ref(), queue, group)? {
630 return Ok(RuntimeQueryResult::ok_message(
631 raw_query.to_string(),
632 &format!(
633 "consumer group '{}' already exists on queue '{}'",
634 group, queue
635 ),
636 "create",
637 ));
638 }
639 save_queue_group(store.as_ref(), queue, group)?;
640 self.invalidate_result_cache();
641
642 Ok(RuntimeQueryResult::ok_message(
643 raw_query.to_string(),
644 &format!("consumer group '{}' created on queue '{}'", group, queue),
645 "create",
646 ))
647 }
648 QueueCommand::GroupRead {
649 queue,
650 group,
651 consumer,
652 count,
653 } => {
654 let store = self.inner.db.store();
655 ensure_queue_exists(store.as_ref(), queue)?;
656 let delivered = super::queue_delivery::read_messages(
657 self,
658 store.as_ref(),
659 queue,
660 group.as_deref(),
661 consumer,
662 *count,
663 )?;
664
665 let mut result = UnifiedResult::with_columns(vec![
666 "message_id".into(),
667 "payload".into(),
668 "consumer".into(),
669 "delivery_count".into(),
670 "attempts".into(),
671 ]);
672
673 for message in delivered {
674 let mut record = UnifiedRecord::new();
675 record.set(
676 "message_id",
677 Value::text(message_id_string(message.message_id)),
678 );
679 record.set("payload", message.payload);
680 record.set("consumer", Value::text(message.consumer));
681 record.set(
682 "delivery_count",
683 Value::UnsignedInteger(u64::from(message.delivery_count)),
684 );
685 record.set(
686 "attempts",
687 Value::UnsignedInteger(u64::from(message.delivery_count)),
688 );
689 result.push(record);
690 }
691 if !result.records.is_empty() {
692 self.invalidate_result_cache();
693 }
694
695 Ok(RuntimeQueryResult {
696 query: raw_query.to_string(),
697 mode: QueryMode::Sql,
698 statement: "queue_group_read",
699 engine: "runtime-queue",
700 result,
701 affected_rows: 0,
702 statement_type: "select",
703 })
704 }
705 QueueCommand::Pending { queue, group } => {
706 let store = self.inner.db.store();
707 ensure_queue_exists(store.as_ref(), queue)?;
708 require_queue_group(store.as_ref(), queue, group)?;
709 let mut pending = load_pending_entries(store.as_ref(), queue, Some(group), None)?;
710 pending.sort_by_key(|entry| entry.delivered_at_ns);
711 let current_time_ns = now_ns();
712
713 let mut result = UnifiedResult::with_columns(vec![
714 "message_id".into(),
715 "consumer".into(),
716 "delivered_at_ns".into(),
717 "delivery_count".into(),
718 "idle_ms".into(),
719 ]);
720 for entry in pending {
721 let mut record = UnifiedRecord::new();
722 record.set(
723 "message_id",
724 Value::text(message_id_string(entry.message_id)),
725 );
726 record.set("consumer", Value::text(entry.consumer));
727 record.set(
728 "delivered_at_ns",
729 Value::UnsignedInteger(entry.delivered_at_ns),
730 );
731 record.set(
732 "delivery_count",
733 Value::UnsignedInteger(u64::from(entry.delivery_count)),
734 );
735 record.set(
736 "idle_ms",
737 Value::UnsignedInteger(
738 current_time_ns.saturating_sub(entry.delivered_at_ns) / 1_000_000,
739 ),
740 );
741 result.push(record);
742 }
743
744 Ok(RuntimeQueryResult {
745 query: raw_query.to_string(),
746 mode: QueryMode::Sql,
747 statement: "queue_pending",
748 engine: "runtime-queue",
749 result,
750 affected_rows: 0,
751 statement_type: "select",
752 })
753 }
754 QueueCommand::Claim {
755 queue,
756 group,
757 consumer,
758 min_idle_ms,
759 } => {
760 let store = self.inner.db.store();
761 ensure_queue_exists(store.as_ref(), queue)?;
762 let delivered = super::queue_delivery::claim_messages(
763 self,
764 store.as_ref(),
765 queue,
766 group,
767 consumer,
768 *min_idle_ms,
769 )?;
770
771 let mut result = UnifiedResult::with_columns(vec![
772 "message_id".into(),
773 "payload".into(),
774 "consumer".into(),
775 "delivery_count".into(),
776 ]);
777
778 for message in delivered {
779 let mut record = UnifiedRecord::new();
780 record.set(
781 "message_id",
782 Value::text(message_id_string(message.message_id)),
783 );
784 record.set("payload", message.payload);
785 record.set("consumer", Value::text(message.consumer));
786 record.set(
787 "delivery_count",
788 Value::UnsignedInteger(u64::from(message.delivery_count)),
789 );
790 result.push(record);
791 }
792 if !result.records.is_empty() {
793 self.invalidate_result_cache();
794 }
795 let affected_rows = result.records.len() as u64;
796
797 Ok(RuntimeQueryResult {
798 query: raw_query.to_string(),
799 mode: QueryMode::Sql,
800 statement: "queue_claim",
801 engine: "runtime-queue",
802 result,
803 affected_rows,
804 statement_type: "update",
805 })
806 }
807 QueueCommand::Ack {
808 queue,
809 group,
810 message_id,
811 } => {
812 let store = self.inner.db.store();
813 ensure_queue_exists(store.as_ref(), queue)?;
814 require_queue_group(store.as_ref(), queue, group)?;
815 let message_id = parse_message_id(message_id)?;
816 let config = load_queue_config(store.as_ref(), queue);
817 super::queue_delivery::ack_message(
818 self,
819 store.as_ref(),
820 queue,
821 group,
822 message_id,
823 &config,
824 )?;
825 self.invalidate_result_cache();
826
827 Ok(RuntimeQueryResult::ok_message(
828 raw_query.to_string(),
829 "message acknowledged",
830 "update",
831 ))
832 }
833 QueueCommand::Nack {
834 queue,
835 group,
836 message_id,
837 } => {
838 let store = self.inner.db.store();
839 ensure_queue_exists(store.as_ref(), queue)?;
840 require_queue_group(store.as_ref(), queue, group)?;
841 let message_id = parse_message_id(message_id)?;
842 let config = load_queue_config(store.as_ref(), queue);
843 let message = match super::queue_delivery::nack_message(
844 self,
845 store.as_ref(),
846 queue,
847 group,
848 message_id,
849 &config,
850 )? {
851 super::queue_delivery::NackOutcome::Requeued => "message requeued".to_string(),
852 super::queue_delivery::NackOutcome::MovedToDlq(dlq) => {
853 format!("message moved to dead-letter queue '{}'", dlq)
854 }
855 super::queue_delivery::NackOutcome::Dropped => {
856 "message dropped after max attempts".to_string()
857 }
858 };
859 self.invalidate_result_cache();
860
861 Ok(RuntimeQueryResult::ok_message(
862 raw_query.to_string(),
863 &message,
864 "update",
865 ))
866 }
867 QueueCommand::Move {
868 source,
869 destination,
870 filter,
871 limit,
872 } => self.execute_queue_move(raw_query, source, destination, filter.as_ref(), *limit),
873 }
874 }
875
876 pub fn execute_queue_select(
877 &self,
878 raw_query: &str,
879 query: &QueueSelectQuery,
880 ) -> RedDBResult<RuntimeQueryResult> {
881 let store = self.inner.db.store();
882 ensure_queue_exists(store.as_ref(), &query.queue)?;
883 let config = load_queue_config(store.as_ref(), &query.queue);
884 let dlq = queue_is_dead_letter_target(store.as_ref(), &query.queue);
885 let columns = if query.columns.is_empty() {
886 queue_projection_default_columns()
887 } else {
888 query.columns.clone()
889 };
890
891 let mut messages =
892 load_queue_message_views_with_runtime(Some(self), store.as_ref(), &query.queue)?;
893 sort_queue_messages(&mut messages, &config, QueueSide::Left);
894
895 let mut result = UnifiedResult::with_columns(columns.clone());
896 for message in messages {
897 if query
898 .filter
899 .as_ref()
900 .is_some_and(|filter| !queue_message_matches_filter(&message, dlq, filter))
901 {
902 continue;
903 }
904 let record = queue_projection_record(&columns, &message, dlq)?;
905 result.push(record);
906 if query
907 .limit
908 .is_some_and(|limit| result.records.len() >= limit as usize)
909 {
910 break;
911 }
912 }
913
914 Ok(RuntimeQueryResult {
915 query: raw_query.to_string(),
916 mode: QueryMode::Sql,
917 statement: "queue_select",
918 engine: "runtime-queue",
919 result,
920 affected_rows: 0,
921 statement_type: "select",
922 })
923 }
924
925 fn execute_queue_move(
926 &self,
927 raw_query: &str,
928 source: &str,
929 destination: &str,
930 filter: Option<&Filter>,
931 limit: usize,
932 ) -> RedDBResult<RuntimeQueryResult> {
933 if source == destination {
934 return Err(RedDBError::Query(
935 "QUEUE MOVE source and destination must be different".to_string(),
936 ));
937 }
938 let store = self.inner.db.store();
939 ensure_queue_exists(store.as_ref(), source)?;
940 ensure_queue_exists(store.as_ref(), destination)?;
941 let source_config = load_queue_config(store.as_ref(), source);
942 let destination_config = load_queue_config(store.as_ref(), destination);
943 let source_dlq = queue_is_dead_letter_target(store.as_ref(), source);
944
945 let mut messages =
946 load_queue_message_views_with_runtime(Some(self), store.as_ref(), source)?;
947 sort_queue_messages(&mut messages, &source_config, QueueSide::Left);
948 let selected = messages
949 .into_iter()
950 .filter(|message| {
951 filter
952 .map(|f| queue_message_matches_filter(message, source_dlq, f))
953 .unwrap_or(true)
954 })
955 .take(limit)
956 .collect::<Vec<_>>();
957
958 if let Some(max_size) = destination_config.max_size {
959 let current_len =
960 load_queue_message_views_with_runtime(Some(self), store.as_ref(), destination)?
961 .len();
962 if current_len + selected.len() > max_size {
963 return Err(RedDBError::Query(format!(
964 "queue '{}' is full (max_size={max_size})",
965 destination
966 )));
967 }
968 }
969
970 for message in &selected {
971 let lock = queue_message_lock_handle(self, source, message.id);
972 let Some(_guard) = lock.try_lock() else {
973 return Err(RedDBError::Query(format!(
974 "message '{}' is locked on queue '{}'",
975 message.id.raw(),
976 source
977 )));
978 };
979 if queue_message_view_by_id(store.as_ref(), source, message.id)?.is_none() {
980 return Err(RedDBError::Query(format!(
981 "message '{}' is no longer available on queue '{}'",
982 message.id.raw(),
983 source
984 )));
985 }
986 }
987
988 let mut inserted = Vec::new();
989 for message in &selected {
990 match insert_moved_queue_message(
991 store.as_ref(),
992 destination,
993 &destination_config,
994 message,
995 ) {
996 Ok(id) => inserted.push(id),
997 Err(err) => {
998 for id in inserted {
999 let _ = store.delete(destination, id);
1000 }
1001 return Err(err);
1002 }
1003 }
1004 }
1005
1006 for message in &selected {
1007 super::queue_delivery::delete_message_with_state(
1008 Some(self),
1009 store.as_ref(),
1010 source,
1011 message.id,
1012 )?;
1013 }
1014 if !selected.is_empty() {
1015 self.invalidate_result_cache();
1016 }
1017
1018 let selected_count = selected.len() as u64;
1019 self.audit_log().record_event(
1020 AuditEvent::builder("queue/move")
1021 .source(AuditAuthSource::System)
1022 .outcome(Outcome::Success)
1023 .resource(format!("queue:{source}->{destination}"))
1024 .fields([
1025 AuditFieldEscaper::field("source", source),
1026 AuditFieldEscaper::field("destination", destination),
1027 AuditFieldEscaper::field("selected", selected_count),
1028 AuditFieldEscaper::field("committed", selected_count),
1029 ])
1030 .build(),
1031 );
1032
1033 let mut result = UnifiedResult::with_columns(vec![
1034 "source".into(),
1035 "destination".into(),
1036 "selected".into(),
1037 "committed".into(),
1038 ]);
1039 let mut record = UnifiedRecord::new();
1040 record.set("source", Value::text(source.to_string()));
1041 record.set("destination", Value::text(destination.to_string()));
1042 record.set("selected", Value::UnsignedInteger(selected_count));
1043 record.set("committed", Value::UnsignedInteger(selected_count));
1044 result.push(record);
1045
1046 Ok(RuntimeQueryResult {
1047 query: raw_query.to_string(),
1048 mode: QueryMode::Sql,
1049 statement: "queue_move",
1050 engine: "runtime-queue",
1051 result,
1052 affected_rows: selected_count,
1053 statement_type: "update",
1054 })
1055 }
1056}
1057
1058fn ensure_queue_exists(store: &UnifiedStore, queue: &str) -> RedDBResult<()> {
1059 if store.get_collection(queue).is_some() {
1060 Ok(())
1061 } else {
1062 Err(RedDBError::NotFound(format!("queue '{}' not found", queue)))
1063 }
1064}
1065
1066pub(super) fn load_queue_config(store: &UnifiedStore, queue: &str) -> QueueRuntimeConfig {
1067 let default = QueueRuntimeConfig {
1068 mode: QueueMode::Work,
1069 priority: false,
1070 max_size: None,
1071 ttl_ms: None,
1072 dlq: None,
1073 max_attempts: 3,
1074 };
1075
1076 let Some(manager) = store.get_collection(QUEUE_META_COLLECTION) else {
1077 return default;
1078 };
1079 manager
1080 .query_all(|entity| {
1081 entity.data.as_row().is_some_and(|row| {
1082 row_text(row, "kind").as_deref() == Some("queue_config")
1083 && row_text(row, "queue").as_deref() == Some(queue)
1084 })
1085 })
1086 .into_iter()
1087 .find_map(|entity| {
1088 let row = entity.data.as_row()?;
1089 Some(QueueRuntimeConfig {
1090 mode: row_text(row, "mode")
1091 .as_deref()
1092 .and_then(QueueMode::parse)
1093 .unwrap_or_default(),
1094 priority: row_bool(row, "priority").unwrap_or(false),
1095 max_size: row_u64(row, "max_size").map(|value| value as usize),
1096 ttl_ms: row_u64(row, "ttl_ms"),
1097 dlq: row_text(row, "dlq"),
1098 max_attempts: row_u64(row, "max_attempts")
1099 .map(|value| value as u32)
1100 .unwrap_or(3),
1101 })
1102 })
1103 .unwrap_or(default)
1104}
1105
1106pub(super) fn queue_mode_str(store: &UnifiedStore, queue: &str) -> &'static str {
1107 load_queue_config(store, queue).mode.as_str()
1108}
1109
1110fn save_queue_config(
1111 store: &UnifiedStore,
1112 queue: &str,
1113 config: &QueueRuntimeConfig,
1114) -> RedDBResult<()> {
1115 remove_meta_rows(store, |row| {
1116 row_text(row, "kind").as_deref() == Some("queue_config")
1117 && row_text(row, "queue").as_deref() == Some(queue)
1118 });
1119
1120 let mut fields = HashMap::new();
1121 fields.insert("kind".to_string(), Value::text("queue_config".to_string()));
1122 fields.insert("queue".to_string(), Value::text(queue.to_string()));
1123 fields.insert(
1124 "mode".to_string(),
1125 Value::text(config.mode.as_str().to_string()),
1126 );
1127 fields.insert("priority".to_string(), Value::Boolean(config.priority));
1128 fields.insert(
1129 "max_size".to_string(),
1130 config
1131 .max_size
1132 .map(|value| Value::UnsignedInteger(value as u64))
1133 .unwrap_or(Value::Null),
1134 );
1135 fields.insert(
1136 "ttl_ms".to_string(),
1137 config
1138 .ttl_ms
1139 .map(Value::UnsignedInteger)
1140 .unwrap_or(Value::Null),
1141 );
1142 fields.insert(
1143 "dlq".to_string(),
1144 config.dlq.clone().map(Value::text).unwrap_or(Value::Null),
1145 );
1146 fields.insert(
1147 "max_attempts".to_string(),
1148 Value::UnsignedInteger(u64::from(config.max_attempts)),
1149 );
1150 insert_meta_row(store, fields)
1151}
1152
1153fn remove_queue_metadata(store: &UnifiedStore, queue: &str) {
1154 remove_meta_rows(store, |row| {
1155 row_text(row, "queue").as_deref() == Some(queue)
1156 });
1157}
1158
1159fn queue_group_exists(store: &UnifiedStore, queue: &str, group: &str) -> RedDBResult<bool> {
1160 Ok(load_queue_groups(store, queue)?
1161 .into_iter()
1162 .any(|entry| entry.group == group))
1163}
1164
1165pub(super) fn require_queue_group(
1166 store: &UnifiedStore,
1167 queue: &str,
1168 group: &str,
1169) -> RedDBResult<()> {
1170 if queue_group_exists(store, queue, group)? {
1171 Ok(())
1172 } else {
1173 Err(RedDBError::NotFound(format!(
1174 "consumer group '{}' not found on queue '{}'",
1175 group, queue
1176 )))
1177 }
1178}
1179
1180pub(super) fn resolve_read_group(
1181 store: &UnifiedStore,
1182 queue: &str,
1183 group: Option<&str>,
1184 consumer: &str,
1185 config: &QueueRuntimeConfig,
1186) -> RedDBResult<String> {
1187 if let Some(group) = group {
1188 require_queue_group(store, queue, group)?;
1189 return Ok(group.to_string());
1190 }
1191
1192 match config.mode {
1193 QueueMode::Work => {
1194 if !queue_group_exists(store, queue, WORK_DEFAULT_GROUP)? {
1195 save_queue_group(store, queue, WORK_DEFAULT_GROUP)?;
1196 }
1197 Ok(WORK_DEFAULT_GROUP.to_string())
1198 }
1199 QueueMode::Fanout => {
1200 let fanout_group = format!("{FANOUT_GROUP_PREFIX}{consumer}");
1201 if !queue_group_exists(store, queue, &fanout_group)? {
1202 save_queue_group(store, queue, &fanout_group)?;
1203 }
1204 Ok(fanout_group)
1205 }
1206 }
1207}
1208
1209fn load_queue_groups(store: &UnifiedStore, queue: &str) -> RedDBResult<Vec<QueueGroupEntry>> {
1210 let Some(manager) = store.get_collection(QUEUE_META_COLLECTION) else {
1211 return Ok(Vec::new());
1212 };
1213 Ok(manager
1214 .query_all(|entity| {
1215 entity.data.as_row().is_some_and(|row| {
1216 row_text(row, "kind").as_deref() == Some("queue_group")
1217 && row_text(row, "queue").as_deref() == Some(queue)
1218 })
1219 })
1220 .into_iter()
1221 .filter_map(|entity| {
1222 let row = entity.data.as_row()?;
1223 Some(QueueGroupEntry {
1224 entity_id: entity.id,
1225 group: row_text(row, "group")?,
1226 })
1227 })
1228 .collect())
1229}
1230
1231fn save_queue_group(store: &UnifiedStore, queue: &str, group: &str) -> RedDBResult<()> {
1232 let mut fields = HashMap::new();
1233 fields.insert("kind".to_string(), Value::text("queue_group".to_string()));
1234 fields.insert("queue".to_string(), Value::text(queue.to_string()));
1235 fields.insert("group".to_string(), Value::text(group.to_string()));
1236 fields.insert(
1237 "created_at_ns".to_string(),
1238 Value::UnsignedInteger(now_ns()),
1239 );
1240 insert_meta_row(store, fields)
1241}
1242
1243pub(super) fn load_pending_entries(
1244 store: &UnifiedStore,
1245 queue: &str,
1246 group: Option<&str>,
1247 message_id: Option<EntityId>,
1248) -> RedDBResult<Vec<QueuePendingEntry>> {
1249 let Some(manager) = store.get_collection(QUEUE_META_COLLECTION) else {
1250 return Ok(Vec::new());
1251 };
1252 Ok(manager
1253 .query_all(|entity| {
1254 entity.data.as_row().is_some_and(|row| {
1255 row_text(row, "kind").as_deref() == Some("queue_pending")
1256 && row_text(row, "queue").as_deref() == Some(queue)
1257 && group
1258 .map(|group_name| row_text(row, "group").as_deref() == Some(group_name))
1259 .unwrap_or(true)
1260 && message_id
1261 .map(|candidate| row_u64(row, "message_id") == Some(candidate.raw()))
1262 .unwrap_or(true)
1263 })
1264 })
1265 .into_iter()
1266 .filter_map(|entity| {
1267 let row = entity.data.as_row()?;
1268 Some(QueuePendingEntry {
1269 entity_id: entity.id,
1270 group: row_text(row, "group")?,
1271 message_id: EntityId::new(row_u64(row, "message_id")?),
1272 consumer: row_text(row, "consumer")?,
1273 delivered_at_ns: row_u64(row, "delivered_at_ns")?,
1274 delivery_count: row_u64(row, "delivery_count")
1275 .map(|value| value as u32)
1276 .unwrap_or(1),
1277 })
1278 })
1279 .collect())
1280}
1281
1282pub(super) fn save_queue_pending(
1283 store: &UnifiedStore,
1284 queue: &str,
1285 group: &str,
1286 message_id: EntityId,
1287 consumer: &str,
1288 delivered_at_ns: u64,
1289 delivery_count: u32,
1290) -> RedDBResult<()> {
1291 remove_meta_rows(store, |row| {
1292 row_text(row, "kind").as_deref() == Some("queue_pending")
1293 && row_text(row, "queue").as_deref() == Some(queue)
1294 && row_text(row, "group").as_deref() == Some(group)
1295 && row_u64(row, "message_id") == Some(message_id.raw())
1296 });
1297
1298 let mut fields = HashMap::new();
1299 fields.insert("kind".to_string(), Value::text("queue_pending".to_string()));
1300 fields.insert("queue".to_string(), Value::text(queue.to_string()));
1301 fields.insert("group".to_string(), Value::text(group.to_string()));
1302 fields.insert(
1303 "message_id".to_string(),
1304 Value::UnsignedInteger(message_id.raw()),
1305 );
1306 fields.insert("consumer".to_string(), Value::text(consumer.to_string()));
1307 fields.insert(
1308 "delivered_at_ns".to_string(),
1309 Value::UnsignedInteger(delivered_at_ns),
1310 );
1311 fields.insert(
1312 "delivery_count".to_string(),
1313 Value::UnsignedInteger(u64::from(delivery_count)),
1314 );
1315 insert_meta_row(store, fields)
1316}
1317
1318pub(super) fn require_pending_entry(
1319 store: &UnifiedStore,
1320 queue: &str,
1321 group: &str,
1322 message_id: EntityId,
1323) -> RedDBResult<QueuePendingEntry> {
1324 load_pending_entries(store, queue, Some(group), Some(message_id))?
1325 .into_iter()
1326 .next()
1327 .ok_or_else(|| {
1328 RedDBError::NotFound(format!(
1329 "message '{}' is not pending in group '{}' on queue '{}'",
1330 message_id.raw(),
1331 group,
1332 queue
1333 ))
1334 })
1335}
1336
1337pub(super) fn load_ack_entries(
1338 store: &UnifiedStore,
1339 queue: &str,
1340 group: Option<&str>,
1341 message_id: Option<EntityId>,
1342) -> RedDBResult<Vec<QueueAckEntry>> {
1343 let Some(manager) = store.get_collection(QUEUE_META_COLLECTION) else {
1344 return Ok(Vec::new());
1345 };
1346 Ok(manager
1347 .query_all(|entity| {
1348 entity.data.as_row().is_some_and(|row| {
1349 row_text(row, "kind").as_deref() == Some("queue_ack")
1350 && row_text(row, "queue").as_deref() == Some(queue)
1351 && group
1352 .map(|group_name| row_text(row, "group").as_deref() == Some(group_name))
1353 .unwrap_or(true)
1354 && message_id
1355 .map(|candidate| row_u64(row, "message_id") == Some(candidate.raw()))
1356 .unwrap_or(true)
1357 })
1358 })
1359 .into_iter()
1360 .filter_map(|entity| {
1361 let row = entity.data.as_row()?;
1362 Some(QueueAckEntry {
1363 entity_id: entity.id,
1364 group: row_text(row, "group")?,
1365 message_id: EntityId::new(row_u64(row, "message_id")?),
1366 })
1367 })
1368 .collect())
1369}
1370
1371pub(super) fn save_queue_ack(
1372 store: &UnifiedStore,
1373 queue: &str,
1374 group: &str,
1375 message_id: EntityId,
1376) -> RedDBResult<()> {
1377 let existing = load_ack_entries(store, queue, Some(group), Some(message_id))?;
1378 if !existing.is_empty() {
1379 return Ok(());
1380 }
1381
1382 let mut fields = HashMap::new();
1383 fields.insert("kind".to_string(), Value::text("queue_ack".to_string()));
1384 fields.insert("queue".to_string(), Value::text(queue.to_string()));
1385 fields.insert("group".to_string(), Value::text(group.to_string()));
1386 fields.insert(
1387 "message_id".to_string(),
1388 Value::UnsignedInteger(message_id.raw()),
1389 );
1390 fields.insert("acked_at_ns".to_string(), Value::UnsignedInteger(now_ns()));
1391 insert_meta_row(store, fields)
1392}
1393
1394pub(super) fn queue_message_completed_for_all_groups(
1395 store: &UnifiedStore,
1396 queue: &str,
1397 message_id: EntityId,
1398) -> RedDBResult<bool> {
1399 let groups = load_queue_groups(store, queue)?;
1400 let pending = load_pending_entries(store, queue, None, Some(message_id))?;
1401 if !pending.is_empty() {
1402 return Ok(false);
1403 }
1404 if groups.is_empty() {
1405 return Ok(true);
1406 }
1407
1408 let acked_groups = load_ack_entries(store, queue, None, Some(message_id))?
1409 .into_iter()
1410 .map(|entry| entry.group)
1411 .collect::<HashSet<_>>();
1412 Ok(groups
1413 .into_iter()
1414 .all(|group| acked_groups.contains(&group.group)))
1415}
1416
1417fn load_queue_message_views(
1418 store: &UnifiedStore,
1419 queue: &str,
1420) -> RedDBResult<Vec<QueueMessageView>> {
1421 load_queue_message_views_with_runtime(None, store, queue)
1422}
1423
1424pub(super) fn load_queue_message_views_with_runtime(
1431 runtime: Option<&RedDBRuntime>,
1432 store: &UnifiedStore,
1433 queue: &str,
1434) -> RedDBResult<Vec<QueueMessageView>> {
1435 let manager = store
1436 .get_collection(queue)
1437 .ok_or_else(|| RedDBError::NotFound(format!("queue '{}' not found", queue)))?;
1438 let snap_ctx = crate::runtime::impl_core::capture_current_snapshot();
1442 let rls_filter = runtime.and_then(|rt| {
1443 crate::runtime::impl_core::rls_policy_filter_for_kind(
1444 rt,
1445 queue,
1446 crate::storage::query::ast::PolicyAction::Select,
1447 crate::storage::query::ast::PolicyTargetKind::Messages,
1448 )
1449 });
1450 let rls_enabled_but_denied = runtime.map(|rt| rt.is_rls_enabled(queue)).unwrap_or(false)
1451 && rls_filter.is_none()
1452 && runtime.is_some();
1453 if rls_enabled_but_denied {
1454 return Ok(Vec::new());
1456 }
1457 let filter_arc = rls_filter.map(std::sync::Arc::new);
1458 let rt_arc = runtime;
1459 Ok(manager
1460 .query_all(move |entity| {
1461 if !matches!(entity.kind, EntityKind::QueueMessage { .. }) {
1462 return false;
1463 }
1464 if !crate::runtime::impl_core::entity_visible_with_context(snap_ctx.as_ref(), entity) {
1465 return false;
1466 }
1467 if let (Some(filter), Some(rt)) = (filter_arc.as_ref(), rt_arc) {
1468 return crate::runtime::query_exec::evaluate_entity_filter_with_db(
1469 Some(&rt.inner.db),
1470 entity,
1471 filter,
1472 queue,
1473 queue,
1474 );
1475 }
1476 true
1477 })
1478 .into_iter()
1479 .filter_map(queue_message_view_from_entity)
1480 .collect())
1481}
1482
1483fn queue_message_view_from_entity(entity: UnifiedEntity) -> Option<QueueMessageView> {
1484 let (position, _) = match &entity.kind {
1485 EntityKind::QueueMessage { position, queue } => (*position, queue),
1486 _ => return None,
1487 };
1488 let data = match entity.data {
1489 EntityData::QueueMessage(data) => data,
1490 _ => return None,
1491 };
1492 Some(QueueMessageView {
1493 id: entity.id,
1494 position,
1495 priority: data.priority.unwrap_or(0),
1496 payload: data.payload,
1497 attempts: data.attempts,
1498 max_attempts: data.max_attempts,
1499 enqueued_at_ns: data.enqueued_at_ns,
1500 })
1501}
1502
1503fn insert_moved_queue_message(
1504 store: &UnifiedStore,
1505 queue: &str,
1506 config: &QueueRuntimeConfig,
1507 message: &QueueMessageView,
1508) -> RedDBResult<EntityId> {
1509 let position = next_queue_position(store, queue, QueueSide::Right)?;
1510 let entity = UnifiedEntity::new(
1511 EntityId::new(0),
1512 EntityKind::QueueMessage {
1513 queue: queue.to_string(),
1514 position,
1515 },
1516 EntityData::QueueMessage(QueueMessageData {
1517 payload: message.payload.clone(),
1518 priority: if config.priority {
1519 Some(message.priority)
1520 } else {
1521 None
1522 },
1523 enqueued_at_ns: message.enqueued_at_ns,
1524 attempts: message.attempts,
1525 max_attempts: message.max_attempts,
1526 acked: false,
1527 }),
1528 );
1529 let id = store
1530 .insert_auto(queue, entity)
1531 .map_err(|err| RedDBError::Internal(err.to_string()))?;
1532 if let Some(ttl_ms) = config.ttl_ms {
1533 store
1534 .set_metadata(queue, id, queue_message_ttl_metadata(ttl_ms))
1535 .map_err(|err| RedDBError::Internal(err.to_string()))?;
1536 }
1537 Ok(id)
1538}
1539
/// Returns the column names projected for a queue by default, in output order.
fn queue_projection_default_columns() -> Vec<String> {
    const DEFAULT_COLUMNS: [&str; 9] = [
        "id",
        "payload",
        "priority",
        "attempts",
        "last_error",
        "enqueued_at",
        "available_at",
        "dlq",
        "tenant",
    ];
    DEFAULT_COLUMNS
        .iter()
        .map(|name| (*name).to_owned())
        .collect()
}
1556
1557fn queue_projection_record(
1558 columns: &[String],
1559 message: &QueueMessageView,
1560 dlq: bool,
1561) -> RedDBResult<UnifiedRecord> {
1562 let mut record = UnifiedRecord::new();
1563 for column in columns {
1564 let value = queue_projection_value(message, dlq, column).ok_or_else(|| {
1565 RedDBError::Query(format!("unknown queue projection column '{}'", column))
1566 })?;
1567 record.set(column, value);
1568 }
1569 Ok(record)
1570}
1571
1572fn queue_projection_value(message: &QueueMessageView, dlq: bool, column: &str) -> Option<Value> {
1573 match column {
1574 "id" => Some(Value::text(message_id_string(message.id))),
1575 "payload" => Some(message.payload.clone()),
1576 "priority" => Some(Value::Integer(i64::from(message.priority))),
1577 "attempts" => Some(Value::UnsignedInteger(u64::from(message.attempts))),
1578 "last_error" => Some(Value::Null),
1579 "enqueued_at" => Some(Value::UnsignedInteger(message.enqueued_at_ns)),
1580 "available_at" => Some(Value::UnsignedInteger(message.enqueued_at_ns)),
1581 "dlq" => Some(Value::Boolean(dlq)),
1582 "tenant" => queue_message_tenant(&message.payload).or(Some(Value::Null)),
1583 _ => None,
1584 }
1585}
1586
1587fn queue_message_tenant(payload: &Value) -> Option<Value> {
1588 let Value::Json(bytes) = payload else {
1589 return None;
1590 };
1591 let json: crate::json::Value = crate::json::from_slice(bytes).ok()?;
1592 json.get("tenant")
1593 .and_then(crate::json::Value::as_str)
1594 .map(|tenant| Value::text(tenant.to_string()))
1595}
1596
1597fn queue_is_dead_letter_target(store: &UnifiedStore, queue: &str) -> bool {
1598 let Some(manager) = store.get_collection(QUEUE_META_COLLECTION) else {
1599 return false;
1600 };
1601 !manager
1602 .query_all(|entity| {
1603 entity.data.as_row().is_some_and(|row| {
1604 row_text(row, "kind").as_deref() == Some("queue_config")
1605 && row_text(row, "dlq").as_deref() == Some(queue)
1606 })
1607 })
1608 .is_empty()
1609}
1610
1611fn queue_message_matches_filter(message: &QueueMessageView, dlq: bool, filter: &Filter) -> bool {
1612 match filter {
1613 Filter::Compare { field, op, value } => queue_filter_field_value(message, dlq, field)
1614 .is_some_and(|candidate| queue_compare_values(&candidate, value, *op)),
1615 Filter::CompareFields { left, op, right } => {
1616 match (
1617 queue_filter_field_value(message, dlq, left),
1618 queue_filter_field_value(message, dlq, right),
1619 ) {
1620 (Some(left), Some(right)) => queue_compare_values(&left, &right, *op),
1621 _ => false,
1622 }
1623 }
1624 Filter::And(left, right) => {
1625 queue_message_matches_filter(message, dlq, left)
1626 && queue_message_matches_filter(message, dlq, right)
1627 }
1628 Filter::Or(left, right) => {
1629 queue_message_matches_filter(message, dlq, left)
1630 || queue_message_matches_filter(message, dlq, right)
1631 }
1632 Filter::Not(inner) => !queue_message_matches_filter(message, dlq, inner),
1633 Filter::IsNull(field) => queue_filter_field_value(message, dlq, field)
1634 .is_none_or(|value| matches!(value, Value::Null)),
1635 Filter::IsNotNull(field) => queue_filter_field_value(message, dlq, field)
1636 .is_some_and(|value| !matches!(value, Value::Null)),
1637 Filter::In { field, values } => {
1638 queue_filter_field_value(message, dlq, field).is_some_and(|candidate| {
1639 values
1640 .iter()
1641 .any(|value| queue_values_equal(&candidate, value))
1642 })
1643 }
1644 Filter::Between { field, low, high } => queue_filter_field_value(message, dlq, field)
1645 .is_some_and(|candidate| {
1646 queue_compare_values(&candidate, low, CompareOp::Ge)
1647 && queue_compare_values(&candidate, high, CompareOp::Le)
1648 }),
1649 Filter::Like { field, pattern } => queue_filter_text(message, dlq, field)
1650 .is_some_and(|value| queue_like_matches(&value, pattern)),
1651 Filter::StartsWith { field, prefix } => {
1652 queue_filter_text(message, dlq, field).is_some_and(|value| value.starts_with(prefix))
1653 }
1654 Filter::EndsWith { field, suffix } => {
1655 queue_filter_text(message, dlq, field).is_some_and(|value| value.ends_with(suffix))
1656 }
1657 Filter::Contains { field, substring } => {
1658 queue_filter_text(message, dlq, field).is_some_and(|value| value.contains(substring))
1659 }
1660 Filter::CompareExpr { .. } => false,
1661 }
1662}
1663
1664fn queue_filter_field_value(
1665 message: &QueueMessageView,
1666 dlq: bool,
1667 field: &FieldRef,
1668) -> Option<Value> {
1669 match field {
1670 FieldRef::TableColumn { table, column } if table.is_empty() => {
1671 queue_projection_value(message, dlq, column)
1672 .or_else(|| queue_payload_field_value(&message.payload, column))
1673 }
1674 FieldRef::TableColumn { column, .. } => queue_projection_value(message, dlq, column)
1675 .or_else(|| queue_payload_field_value(&message.payload, column)),
1676 _ => None,
1677 }
1678}
1679
1680fn queue_payload_field_value(payload: &Value, field: &str) -> Option<Value> {
1681 let Value::Json(bytes) = payload else {
1682 return None;
1683 };
1684 let json: crate::json::Value = crate::json::from_slice(bytes).ok()?;
1685 let value = json.get(field)?;
1686 json_value_to_schema_value(value)
1687}
1688
1689fn json_value_to_schema_value(value: &crate::json::Value) -> Option<Value> {
1690 if matches!(value, crate::json::Value::Null) {
1691 Some(Value::Null)
1692 } else if let Some(value) = value.as_bool() {
1693 Some(Value::Boolean(value))
1694 } else if let Some(value) = value.as_i64() {
1695 Some(Value::Integer(value))
1696 } else if let Some(value) = value.as_u64() {
1697 Some(Value::UnsignedInteger(value))
1698 } else if let Some(value) = value.as_f64() {
1699 Some(Value::Float(value))
1700 } else if let Some(value) = value.as_str() {
1701 Some(Value::text(value.to_string()))
1702 } else {
1703 Some(Value::Json(value.to_string_compact().into_bytes()))
1704 }
1705}
1706
1707fn queue_filter_text(message: &QueueMessageView, dlq: bool, field: &FieldRef) -> Option<String> {
1708 queue_filter_field_value(message, dlq, field).and_then(|value| match value {
1709 Value::Text(value) => Some(value.to_string()),
1710 Value::NodeRef(value) | Value::EdgeRef(value) | Value::TableRef(value) => Some(value),
1711 Value::Integer(value) => Some(value.to_string()),
1712 Value::UnsignedInteger(value) => Some(value.to_string()),
1713 Value::Float(value) => Some(value.to_string()),
1714 Value::Boolean(value) => Some(value.to_string()),
1715 _ => None,
1716 })
1717}
1718
1719fn queue_compare_values(left: &Value, right: &Value, op: CompareOp) -> bool {
1720 match op {
1721 CompareOp::Eq => queue_values_equal(left, right),
1722 CompareOp::Ne => !queue_values_equal(left, right),
1723 CompareOp::Lt => queue_partial_cmp(left, right).is_some_and(|ord| ord.is_lt()),
1724 CompareOp::Le => queue_partial_cmp(left, right).is_some_and(|ord| !ord.is_gt()),
1725 CompareOp::Gt => queue_partial_cmp(left, right).is_some_and(|ord| ord.is_gt()),
1726 CompareOp::Ge => queue_partial_cmp(left, right).is_some_and(|ord| !ord.is_lt()),
1727 }
1728}
1729
1730fn queue_values_equal(left: &Value, right: &Value) -> bool {
1731 if let (Some(left), Some(right)) = (queue_value_number(left), queue_value_number(right)) {
1732 return (left - right).abs() < f64::EPSILON;
1733 }
1734 match (left, right) {
1735 (Value::Text(left), Value::Text(right)) => left == right,
1736 (Value::Boolean(left), Value::Boolean(right)) => left == right,
1737 _ => left == right,
1738 }
1739}
1740
1741fn queue_partial_cmp(left: &Value, right: &Value) -> Option<std::cmp::Ordering> {
1742 if let (Some(left), Some(right)) = (queue_value_number(left), queue_value_number(right)) {
1743 return left.partial_cmp(&right);
1744 }
1745 match (left, right) {
1746 (Value::Text(left), Value::Text(right)) => Some(left.cmp(right)),
1747 _ => None,
1748 }
1749}
1750
1751fn queue_value_number(value: &Value) -> Option<f64> {
1752 match value {
1753 Value::Integer(value) => Some(*value as f64),
1754 Value::UnsignedInteger(value) => Some(*value as f64),
1755 Value::Float(value) => Some(*value),
1756 Value::Text(value) => value.parse().ok(),
1757 _ => None,
1758 }
1759}
1760
/// Matches `value` against a `LIKE`-style `pattern` where `%` stands for any
/// run of characters (including none).
///
/// `%` is honoured anywhere in the pattern, not only at its ends: `a%f`
/// matches every value that starts with `a` and ends with `f`. A pattern
/// without `%` must equal the value exactly. Matching is case-sensitive and
/// `_` is treated as a literal character.
fn queue_like_matches(value: &str, pattern: &str) -> bool {
    let mut segments = pattern.split('%');

    // `split` always yields at least one (possibly empty) segment; the first
    // one is anchored at the start of the value.
    let leading = segments.next().unwrap_or_default();
    let Some(mut remaining) = value.strip_prefix(leading) else {
        return false;
    };

    // No second segment means the pattern contained no wildcard at all.
    let Some(trailing) = segments.next_back() else {
        return remaining.is_empty();
    };

    // Interior segments must appear in order; taking the leftmost occurrence
    // leaves the longest possible tail for the segments that follow.
    for segment in segments {
        let Some(index) = remaining.find(segment) else {
            return false;
        };
        remaining = &remaining[index + segment.len()..];
    }

    // The last segment is anchored at the end of the value.
    remaining.ends_with(trailing)
}
1775
1776pub(super) fn queue_message_view_by_id(
1777 store: &UnifiedStore,
1778 queue: &str,
1779 message_id: EntityId,
1780) -> RedDBResult<Option<QueueMessageView>> {
1781 let manager = queue_manager(store, queue)?;
1782 Ok(manager
1783 .get(message_id)
1784 .and_then(queue_message_view_from_entity))
1785}
1786
1787pub(super) fn sort_queue_messages(
1788 messages: &mut [QueueMessageView],
1789 config: &QueueRuntimeConfig,
1790 side: QueueSide,
1791) {
1792 messages.sort_by(|left, right| {
1793 if config.priority {
1794 right
1795 .priority
1796 .cmp(&left.priority)
1797 .then_with(|| match side {
1798 QueueSide::Left => left.position.cmp(&right.position),
1799 QueueSide::Right => right.position.cmp(&left.position),
1800 })
1801 .then_with(|| left.id.raw().cmp(&right.id.raw()))
1802 } else {
1803 match side {
1804 QueueSide::Left => left.position.cmp(&right.position),
1805 QueueSide::Right => right.position.cmp(&left.position),
1806 }
1807 .then_with(|| left.id.raw().cmp(&right.id.raw()))
1808 }
1809 });
1810}
1811
1812pub(super) fn next_queue_position(
1813 store: &UnifiedStore,
1814 queue: &str,
1815 side: QueueSide,
1816) -> RedDBResult<u64> {
1817 let messages = load_queue_message_views(store, queue)?;
1818 if messages.is_empty() {
1819 return Ok(QUEUE_POSITION_CENTER);
1820 }
1821 match side {
1822 QueueSide::Left => Ok(messages
1823 .iter()
1824 .map(|message| message.position)
1825 .min()
1826 .unwrap_or(QUEUE_POSITION_CENTER)
1827 .saturating_sub(1)),
1828 QueueSide::Right => Ok(messages
1829 .iter()
1830 .map(|message| message.position)
1831 .max()
1832 .unwrap_or(QUEUE_POSITION_CENTER)
1833 .saturating_add(1)),
1834 }
1835}
1836
1837pub(super) fn increment_queue_attempts(
1838 store: &UnifiedStore,
1839 queue: &str,
1840 message_id: EntityId,
1841) -> RedDBResult<u32> {
1842 let manager = queue_manager(store, queue)?;
1843 let mut entity = manager
1844 .get(message_id)
1845 .ok_or_else(|| RedDBError::NotFound(format!("message '{}' not found", message_id.raw())))?;
1846 match &mut entity.data {
1847 EntityData::QueueMessage(message) => {
1848 message.attempts = message.attempts.saturating_add(1);
1849 let attempts = message.attempts;
1850 manager
1851 .update(entity)
1852 .map_err(|err| RedDBError::Internal(err.to_string()))?;
1853 Ok(attempts)
1854 }
1855 _ => Err(RedDBError::Query(format!(
1856 "entity '{}' is not a queue message",
1857 message_id.raw()
1858 ))),
1859 }
1860}
1861
1862pub(super) fn queue_message_attempts(
1863 store: &UnifiedStore,
1864 queue: &str,
1865 message_id: EntityId,
1866) -> RedDBResult<u32> {
1867 Ok(queue_message_data(store, queue, message_id)?.attempts)
1868}
1869
1870pub(super) fn queue_message_max_attempts(
1871 store: &UnifiedStore,
1872 queue: &str,
1873 message_id: EntityId,
1874) -> RedDBResult<u32> {
1875 Ok(queue_message_data(store, queue, message_id)?.max_attempts)
1876}
1877
1878pub(super) fn queue_message_payload(
1879 store: &UnifiedStore,
1880 queue: &str,
1881 message_id: EntityId,
1882) -> RedDBResult<Value> {
1883 Ok(queue_message_data(store, queue, message_id)?.payload)
1884}
1885
1886pub(super) fn queue_message_pending_any(
1887 store: &UnifiedStore,
1888 queue: &str,
1889 message_id: EntityId,
1890) -> RedDBResult<bool> {
1891 Ok(!load_pending_entries(store, queue, None, Some(message_id))?.is_empty())
1892}
1893
1894pub(super) fn queue_message_pending_for_group(
1895 store: &UnifiedStore,
1896 queue: &str,
1897 group: &str,
1898 message_id: EntityId,
1899) -> RedDBResult<bool> {
1900 Ok(!load_pending_entries(store, queue, Some(group), Some(message_id))?.is_empty())
1901}
1902
1903pub(super) fn queue_message_acked_for_group(
1904 store: &UnifiedStore,
1905 queue: &str,
1906 group: &str,
1907 message_id: EntityId,
1908) -> RedDBResult<bool> {
1909 Ok(!load_ack_entries(store, queue, Some(group), Some(message_id))?.is_empty())
1910}
1911
1912fn queue_manager(
1913 store: &UnifiedStore,
1914 queue: &str,
1915) -> RedDBResult<Arc<crate::storage::unified::SegmentManager>> {
1916 store
1917 .get_collection(queue)
1918 .ok_or_else(|| RedDBError::NotFound(format!("queue '{}' not found", queue)))
1919}
1920
1921pub(super) fn queue_message_data(
1922 store: &UnifiedStore,
1923 queue: &str,
1924 message_id: EntityId,
1925) -> RedDBResult<QueueMessageData> {
1926 let manager = queue_manager(store, queue)?;
1927 let entity = manager
1928 .get(message_id)
1929 .ok_or_else(|| RedDBError::NotFound(format!("message '{}' not found", message_id.raw())))?;
1930 match entity.data {
1931 EntityData::QueueMessage(message) => Ok(message),
1932 _ => Err(RedDBError::Query(format!(
1933 "entity '{}' is not a queue message",
1934 message_id.raw()
1935 ))),
1936 }
1937}
1938
1939fn insert_meta_row(store: &UnifiedStore, fields: HashMap<String, Value>) -> RedDBResult<()> {
1940 let _ = store.get_or_create_collection(QUEUE_META_COLLECTION);
1941 store
1942 .insert_auto(
1943 QUEUE_META_COLLECTION,
1944 UnifiedEntity::new(
1945 EntityId::new(0),
1946 EntityKind::TableRow {
1947 table: Arc::from(QUEUE_META_COLLECTION),
1948 row_id: 0,
1949 },
1950 EntityData::Row(RowData {
1951 columns: Vec::new(),
1952 named: Some(fields),
1953 schema: None,
1954 }),
1955 ),
1956 )
1957 .map_err(|err| RedDBError::Internal(err.to_string()))?;
1958 Ok(())
1959}
1960
1961pub(super) fn remove_meta_rows(store: &UnifiedStore, predicate: impl Fn(&RowData) -> bool + Sync) {
1962 let Some(manager) = store.get_collection(QUEUE_META_COLLECTION) else {
1963 return;
1964 };
1965 let rows = manager.query_all(|entity| entity.data.as_row().is_some_and(&predicate));
1966 for row in rows {
1967 let _ = store.delete(QUEUE_META_COLLECTION, row.id);
1968 }
1969}
1970
1971pub(super) fn delete_meta_entity(store: &UnifiedStore, entity_id: EntityId) {
1972 let _ = store.delete(QUEUE_META_COLLECTION, entity_id);
1973}
1974
1975fn queue_message_lock_key(queue: &str, message_id: EntityId) -> String {
1976 format!("{queue}:{}", message_id.raw())
1977}
1978
1979pub(super) fn queue_message_lock_handle(
1980 runtime: &RedDBRuntime,
1981 queue: &str,
1982 message_id: EntityId,
1983) -> Arc<parking_lot::Mutex<()>> {
1984 let key = queue_message_lock_key(queue, message_id);
1985 if let Some(lock) = runtime.inner.queue_message_locks.read().get(&key).cloned() {
1986 return lock;
1987 }
1988
1989 let mut locks = runtime.inner.queue_message_locks.write();
1990 locks
1991 .entry(key)
1992 .or_insert_with(|| Arc::new(parking_lot::Mutex::new(())))
1993 .clone()
1994}
1995
1996pub(super) fn forget_queue_message_lock(runtime: &RedDBRuntime, queue: &str, message_id: EntityId) {
1997 runtime
1998 .inner
1999 .queue_message_locks
2000 .write()
2001 .remove(&queue_message_lock_key(queue, message_id));
2002}
2003
2004fn parse_message_id(value: &str) -> RedDBResult<EntityId> {
2005 let raw = value.strip_prefix('e').unwrap_or(value);
2006 raw.parse::<u64>()
2007 .map(EntityId::new)
2008 .map_err(|_| RedDBError::Query(format!("invalid message id '{}'", value)))
2009}
2010
2011fn message_id_string(message_id: EntityId) -> String {
2012 message_id.raw().to_string()
2013}
2014
2015pub(super) fn row_text(row: &RowData, field: &str) -> Option<String> {
2016 match row.get_field(field)?.clone() {
2017 Value::Text(value) => Some(value.to_string()),
2018 Value::NodeRef(value) => Some(value),
2019 Value::EdgeRef(value) => Some(value),
2020 Value::TableRef(value) => Some(value),
2021 _ => None,
2022 }
2023}
2024
2025pub(super) fn row_u64(row: &RowData, field: &str) -> Option<u64> {
2026 match row.get_field(field)?.clone() {
2027 Value::UnsignedInteger(value) => Some(value),
2028 Value::Integer(value) if value >= 0 => Some(value as u64),
2029 Value::Float(value) if value >= 0.0 => Some(value as u64),
2030 Value::Text(value) => value.parse().ok(),
2031 _ => None,
2032 }
2033}
2034
2035fn row_bool(row: &RowData, field: &str) -> Option<bool> {
2036 match row.get_field(field)?.clone() {
2037 Value::Boolean(value) => Some(value),
2038 Value::Text(value) => match value.to_ascii_lowercase().as_str() {
2039 "true" => Some(true),
2040 "false" => Some(false),
2041 _ => None,
2042 },
2043 _ => None,
2044 }
2045}
2046
2047fn queue_collection_contract(
2048 name: &str,
2049 priority: bool,
2050 ttl_ms: Option<u64>,
2051) -> crate::physical::CollectionContract {
2052 let now = current_unix_ms();
2053 let mut context_index_fields = Vec::new();
2054 if priority {
2055 context_index_fields.push("priority".to_string());
2056 }
2057
2058 crate::physical::CollectionContract {
2059 name: name.to_string(),
2060 declared_model: crate::catalog::CollectionModel::Queue,
2061 schema_mode: crate::catalog::SchemaMode::Dynamic,
2062 origin: crate::physical::ContractOrigin::Explicit,
2063 version: 1,
2064 created_at_unix_ms: now,
2065 updated_at_unix_ms: now,
2066 default_ttl_ms: ttl_ms,
2067 vector_dimension: None,
2068 vector_metric: None,
2069 context_index_fields,
2070 declared_columns: Vec::new(),
2071 table_def: None,
2072 timestamps_enabled: false,
2073 context_index_enabled: false,
2074 metrics_raw_retention_ms: None,
2075 metrics_rollup_policies: Vec::new(),
2076 metrics_tenant_identity: None,
2077 metrics_namespace: None,
2078 append_only: true,
2082 subscriptions: Vec::new(),
2083 }
2084}
2085
/// Returns the wall-clock time as milliseconds since the Unix epoch.
///
/// Yields 0 when the system clock reads earlier than the epoch.
fn current_unix_ms() -> u128 {
    use std::time::{SystemTime, UNIX_EPOCH};

    match SystemTime::now().duration_since(UNIX_EPOCH) {
        Ok(elapsed) => elapsed.as_millis(),
        Err(_) => 0,
    }
}
2092
2093pub(super) fn now_ns() -> u64 {
2094 std::time::SystemTime::now()
2095 .duration_since(std::time::UNIX_EPOCH)
2096 .unwrap_or_default()
2097 .as_nanos() as u64
2098}
2099
2100pub(super) fn queue_message_ttl_metadata(ttl_ms: u64) -> Metadata {
2101 Metadata::with_fields(
2102 [(
2103 "_ttl_ms".to_string(),
2104 if ttl_ms <= i64::MAX as u64 {
2105 MetadataValue::Int(ttl_ms as i64)
2106 } else {
2107 MetadataValue::Timestamp(ttl_ms)
2108 },
2109 )]
2110 .into_iter()
2111 .collect(),
2112 )
2113}
2114
2115fn estimate_payload_bytes(payload: &Value) -> u64 {
2117 match payload {
2118 Value::Json(v) => v.len() as u64,
2119 Value::Text(s) => s.len() as u64,
2120 _ => 64,
2121 }
2122}