1use std::collections::{HashMap, HashSet};
4use std::sync::atomic::{AtomicU64, Ordering};
5use std::sync::Arc;
6
7use crate::runtime::audit_log::{AuditAuthSource, AuditEvent, AuditFieldEscaper, Outcome};
8use crate::runtime::impl_core::{
9 clear_current_auth_identity, clear_current_tenant, current_auth_identity, current_tenant,
10 set_current_auth_identity, set_current_tenant,
11};
12use crate::runtime::queue_telemetry::NackOutcomeLabel;
13use crate::storage::queue::QueueMode;
14use crate::storage::unified::entity::{QueueMessageData, RowData};
15use crate::storage::unified::{Metadata, MetadataValue, UnifiedStore};
16use crate::telemetry::operator_event::OperatorEvent;
17
18use super::*;
19
20use super::primary_queue_store::PrimaryQueueStore;
21use super::queue_lifecycle::{QueueLifecycle, RetirementOutcome};
22use crate::storage::queue::lifecycle::{
23 QueueSide as LcQueueSide, QueueStore as _, QueueStoreError, QueueTxn,
24};
25
26pub(super) fn runtime_lifecycle(
33 runtime: &RedDBRuntime,
34 queue: &str,
35) -> (
36 QueueLifecycle<PrimaryQueueStore>,
37 PrimaryQueueStore,
38 QueueTxn,
39) {
40 let primary_for_lookup = PrimaryQueueStore::new(runtime.clone());
41 let primary_for_lifecycle = PrimaryQueueStore::new(runtime.clone());
42 let txn = primary_for_lifecycle.new_txn();
43 let cfg = primary_for_lifecycle.lifecycle_config(queue);
44 (
45 QueueLifecycle::new(primary_for_lifecycle, cfg)
46 .with_telemetry(Arc::clone(&runtime.inner.queue_telemetry)),
47 primary_for_lookup,
48 txn,
49 )
50}
51
/// Error text returned (as a `RedDBError::Query`) when a parked
/// `QUEUE READ … WAIT` is cancelled by the wait registry.
pub(crate) const QUEUE_READ_WAIT_CANCELLED: &str =
    "QUEUE READ WAIT cancelled — server shutting down";
59
/// Config key holding the server-side upper bound, in milliseconds, on the
/// `WAIT` duration a queue read may request.
pub(crate) const QUEUE_MAX_WAIT_MS_CONFIG_KEY: &str = "red.config.queue.max_wait_ms";

/// Wait cap (milliseconds) used when [`QUEUE_MAX_WAIT_MS_CONFIG_KEY`] is not
/// configured.
pub(crate) const QUEUE_MAX_WAIT_MS_DEFAULT: u64 = 60_000;
68
/// Result of a redwire queue wait (see `redwire_queue_wait_json`).
#[derive(Debug)]
pub(crate) enum RedwireWaitOutcome {
    /// At least one message was delivered; each entry is the JSON rendering
    /// of one delivered message.
    Delivered(Vec<crate::serde_json::Value>),
    /// The wait deadline passed without any message becoming deliverable.
    TimedOut,
    /// The wait registry reported cancellation while the read was waiting.
    Cancelled,
}
85
86pub(super) fn queue_wait_scope() -> String {
91 crate::runtime::impl_core::current_tenant().unwrap_or_default()
92}
93
94fn with_redwire_wait_context<T>(
95 auth_identity: &Option<(String, crate::auth::Role)>,
96 tenant: &Option<String>,
97 f: impl FnOnce() -> T,
98) -> T {
99 let previous_auth = current_auth_identity();
100 let previous_tenant = current_tenant();
101 match tenant {
102 Some(t) => set_current_tenant(t.clone()),
103 None => clear_current_tenant(),
104 }
105 match auth_identity {
106 Some((username, role)) => set_current_auth_identity(username.clone(), *role),
107 None => clear_current_auth_identity(),
108 }
109 let result = f();
110 match previous_tenant {
111 Some(t) => set_current_tenant(t),
112 None => clear_current_tenant(),
113 }
114 match previous_auth {
115 Some((username, role)) => set_current_auth_identity(username, role),
116 None => clear_current_auth_identity(),
117 }
118 result
119}
120
121fn ast_side_to_lc(side: crate::storage::query::ast::QueueSide) -> LcQueueSide {
125 use crate::storage::query::ast::QueueSide as Ast;
126 match side {
127 Ast::Left => LcQueueSide::Left,
128 Ast::Right => LcQueueSide::Right,
129 }
130}
131
132fn map_qse(err: QueueStoreError) -> RedDBError {
135 match err {
136 QueueStoreError::UnknownDelivery(id) => RedDBError::NotFound(format!(
137 "delivery_id '{id}' does not resolve to a live pending delivery"
138 )),
139 QueueStoreError::UnknownQueue(q) => RedDBError::NotFound(format!("queue '{q}' not found")),
140 QueueStoreError::ReplicaImmutable => {
141 RedDBError::Internal("replica QueueStore is immutable".to_string())
142 }
143 }
144}
145
/// Process-wide counter bumped by `enqueue_event_payload` each time an event
/// is diverted to the outbox DLQ (outbox byte cap exceeded or target queue
/// full).
// NOTE(review): the name suggests drain retries; other increment sites may
// exist outside this view — confirm.
pub static EVENTS_DRAIN_RETRIES_TOTAL: AtomicU64 = AtomicU64::new(0);

/// Process-wide counter of events routed to an outbox dead-letter queue
/// (bumped once per `route_event_to_outbox_dlq` call).
pub static EVENTS_DLQ_TOTAL: AtomicU64 = AtomicU64::new(0);

/// Process-wide counter of event payloads successfully enqueued, including
/// those that landed in an outbox dead-letter queue.
pub static EVENTS_ENQUEUED_TOTAL: AtomicU64 = AtomicU64::new(0);
159
/// Approximate outbox size (1 GiB) above which `enqueue_event_payload` logs
/// a warning and emits an operator event.
const OUTBOX_WARN_BYTES: u64 = 1 << 30;

/// Approximate outbox size (10 GiB) above which `enqueue_event_payload`
/// stops enqueueing to the target queue and routes the event to the outbox
/// DLQ instead.
const OUTBOX_MAX_BYTES: u64 = 10 * (1 << 30);

/// Running estimate of bytes held in the event outbox. Grows by the
/// estimated payload size on every `enqueue_event_payload` call and is
/// rolled back when that call diverts the event to the DLQ.
// NOTE(review): the decrement on drain/ack is not visible in this part of
// the file — confirm where it happens.
static OUTBOX_APPROX_BYTES: AtomicU64 = AtomicU64::new(0);
168
// NOTE(review): none of these four constants is referenced in the visible
// part of the file; the descriptions below are inferred from the names and
// values only — confirm against their use sites.

/// Name of the collection that presumably stores queue metadata rows.
const QUEUE_META_COLLECTION: &str = "red_queue_meta";
/// Midpoint of the `u64` range; presumably the starting position from which
/// left/right pushes grow in opposite directions.
const QUEUE_POSITION_CENTER: u64 = u64::MAX / 2;
/// Presumably the implicit consumer-group name used when a read names no group.
const WORK_DEFAULT_GROUP: &str = "_work_default";
/// Presumably the prefix of implicit per-consumer group names in fanout mode.
const FANOUT_GROUP_PREFIX: &str = "_fanout_";
173
/// Per-queue runtime settings, written with `save_queue_config` by
/// CREATE/ALTER QUEUE and read back with `load_queue_config`.
#[derive(Debug, Clone)]
pub(super) struct QueueRuntimeConfig {
    /// Delivery mode; set at creation and changeable via ALTER QUEUE.
    pub(super) mode: QueueMode,
    /// Whether pushes may carry a priority; a push with an explicit priority
    /// is rejected when this is `false`.
    pub(super) priority: bool,
    /// Maximum number of messages; pushes fail (and event enqueues divert to
    /// the outbox DLQ) once the queue holds this many. `None` = unbounded.
    pub(super) max_size: Option<usize>,
    /// When set, TTL metadata with this value (milliseconds) is attached to
    /// each newly enqueued message.
    pub(super) ttl_ms: Option<u64>,
    /// Name of the dead-letter queue, if one is configured.
    pub(super) dlq: Option<String>,
    /// Copied into every new message as its `max_attempts`.
    pub(super) max_attempts: u32,
    // NOTE(review): only stored/updated in the visible code; presumably how
    // long a delivery stays locked to a consumer — confirm.
    pub(super) lock_deadline_ms: u64,
    // NOTE(review): only stored/updated in the visible code; presumably the
    // per-group limit on unacknowledged deliveries — confirm.
    pub(super) in_flight_cap_per_group: u32,
    /// Retry delay in milliseconds; ALTER QUEUE stores `None` when asked to
    /// set it to `0`.
    pub(super) retry_delay_ms: Option<u64>,
}
190
/// A consumer-group record: the id of the entity holding it and the group
/// name.
// NOTE(review): construction and use are outside the visible part of the
// file — confirm the exact meaning of `entity_id`.
#[derive(Debug, Clone)]
struct QueueGroupEntry {
    entity_id: EntityId,
    group: String,
}
196
/// One pending-delivery record as returned by `load_pending_entries`.
// NOTE(review): field meanings below are inferred from names; the code that
// populates them is not visible here — confirm.
#[derive(Debug, Clone)]
pub(super) struct QueuePendingEntry {
    /// Id of the entity that stores this pending record (presumably).
    pub(super) entity_id: EntityId,
    /// Consumer group the delivery belongs to.
    group: String,
    /// Id of the queue message that was delivered.
    pub(super) message_id: EntityId,
    /// Consumer the message was delivered to.
    consumer: String,
    /// Delivery timestamp; nanoseconds per the field name.
    pub(super) delivered_at_ns: u64,
    /// Number of deliveries recorded for this message (presumably).
    pub(super) delivery_count: u32,
}
206
/// One acknowledgement record: which message was acked, for which group.
// NOTE(review): construction and use are outside the visible part of the
// file — confirm the exact meaning of `entity_id`.
#[derive(Debug, Clone)]
pub(super) struct QueueAckEntry {
    entity_id: EntityId,
    group: String,
    /// Id of the acknowledged queue message.
    pub(super) message_id: EntityId,
}
213
/// Read-side view of a single queue message.
// NOTE(review): the loader that fills this struct is outside the visible
// part of the file; un-annotated fields are assumed to mirror the stored
// `QueueMessageData` / entity kind — confirm.
#[derive(Debug, Clone)]
pub(super) struct QueueMessageView {
    /// Entity id of the message.
    pub(super) id: EntityId,
    position: u64,
    priority: i32,
    /// The message body.
    pub(super) payload: Value,
    attempts: u32,
    pub(super) max_attempts: u32,
    enqueued_at_ns: u64,
    /// Earliest time (nanoseconds, compared against `now_ns()`) at which the
    /// message may be delivered; `None` means it is available immediately.
    pub(super) available_at_ns: Option<u64>,
}
228
229impl QueueMessageView {
230 pub(super) fn is_available_now(&self) -> bool {
235 match self.available_at_ns {
236 Some(at) => at <= now_ns(),
237 None => true,
238 }
239 }
240}
241
242impl RedDBRuntime {
243 pub(super) fn group_read_with_optional_wait(
252 &self,
253 queue: &str,
254 group: &str,
255 consumer: &str,
256 count: usize,
257 wait_ms: Option<u64>,
258 ) -> RedDBResult<Vec<crate::runtime::queue_lifecycle::DeliveredMessage>> {
259 let do_read =
260 |runtime: &RedDBRuntime| -> RedDBResult<Vec<crate::runtime::queue_lifecycle::DeliveredMessage>> {
261 let (lifecycle, _ps, txn) = runtime_lifecycle(runtime, queue);
262 lifecycle
263 .group_read(&txn, queue, group, consumer, count)
264 .map_err(map_qse)
265 };
266
267 let delivered = do_read(self)?;
268 let Some(wait_ms) = wait_ms else {
269 return Ok(delivered);
270 };
271 if !delivered.is_empty() {
272 return Ok(delivered);
273 }
274 let registry = self.queue_wait_registry();
284 let scope = queue_wait_scope();
285 let deadline = std::time::Instant::now() + std::time::Duration::from_millis(wait_ms);
286 let telemetry = self.queue_telemetry();
287 telemetry.record_wait_started(&scope, queue);
288 let wait_start = std::time::Instant::now();
289 let observe = |outcome: crate::runtime::queue_telemetry::WaitOutcomeLabel| {
290 let elapsed_ms = wait_start.elapsed().as_millis().min(u128::from(u64::MAX)) as u64;
291 telemetry.record_wait_outcome(&scope, queue, outcome, elapsed_ms);
292 };
293 loop {
294 let snapshot = registry.snapshot(&scope, queue);
298 let delivered = do_read(self)?;
299 if !delivered.is_empty() {
300 observe(crate::runtime::queue_telemetry::WaitOutcomeLabel::Woken);
301 return Ok(delivered);
302 }
303 if registry.is_cancelled() {
304 observe(crate::runtime::queue_telemetry::WaitOutcomeLabel::Cancelled);
305 return Err(RedDBError::Query(QUEUE_READ_WAIT_CANCELLED.to_string()));
306 }
307 if std::time::Instant::now() >= deadline {
308 observe(crate::runtime::queue_telemetry::WaitOutcomeLabel::Timeout);
309 return Ok(Vec::new());
310 }
311 let park_deadline = match earliest_future_available_at(&self.inner.db.store(), queue) {
321 Some(at_ns) => {
322 let now_ns = now_ns();
323 if at_ns <= now_ns {
324 deadline.min(std::time::Instant::now())
326 } else {
327 let wait_ns = at_ns - now_ns;
328 let due_instant =
329 std::time::Instant::now() + std::time::Duration::from_nanos(wait_ns);
330 deadline.min(due_instant)
331 }
332 }
333 None => deadline,
334 };
335 match registry.wait_until(&snapshot, park_deadline) {
336 crate::runtime::queue_wait_registry::WaitOutcome::Woken => continue,
337 crate::runtime::queue_wait_registry::WaitOutcome::Timeout => {
338 if std::time::Instant::now() >= deadline {
342 observe(crate::runtime::queue_telemetry::WaitOutcomeLabel::Timeout);
343 return Ok(Vec::new());
344 }
345 continue;
346 }
347 crate::runtime::queue_wait_registry::WaitOutcome::Cancelled => {
348 observe(crate::runtime::queue_telemetry::WaitOutcomeLabel::Cancelled);
349 return Err(RedDBError::Query(QUEUE_READ_WAIT_CANCELLED.to_string()));
350 }
351 }
352 }
353 }
354
355 pub(crate) async fn redwire_queue_wait_json(
372 &self,
373 queue: &str,
374 group: Option<&str>,
375 consumer: &str,
376 count: usize,
377 wait_ms: u64,
378 auth_identity: Option<(String, crate::auth::Role)>,
379 tenant: Option<String>,
380 ) -> RedDBResult<RedwireWaitOutcome> {
381 let group_owned: RedDBResult<String> =
382 with_redwire_wait_context(&auth_identity, &tenant, || {
383 let expr = crate::storage::query::ast::QueryExpr::QueueCommand(
384 crate::storage::query::ast::QueueCommand::GroupRead {
385 queue: queue.to_string(),
386 group: group.map(str::to_string),
387 consumer: consumer.to_string(),
388 count,
389 wait_ms: Some(wait_ms),
390 },
391 );
392 self.check_query_privilege(&expr)
393 .map_err(RedDBError::Query)?;
394 let store = self.inner.db.store();
395 ensure_queue_exists(store.as_ref(), queue)?;
396 let config = load_queue_config(store.as_ref(), queue);
397 resolve_read_group(store.as_ref(), queue, group, consumer, &config)
398 });
399 let group_owned = group_owned?;
400 let group_ref = group_owned.as_str();
401
402 let do_read =
403 |runtime: &RedDBRuntime| -> RedDBResult<Vec<crate::runtime::queue_lifecycle::DeliveredMessage>> {
404 with_redwire_wait_context(&auth_identity, &tenant, || {
405 let (lifecycle, _ps, txn) = runtime_lifecycle(runtime, queue);
406 lifecycle
407 .group_read(&txn, queue, group_ref, consumer, count)
408 .map_err(map_qse)
409 })
410 };
411
412 let render = |delivered: Vec<crate::runtime::queue_lifecycle::DeliveredMessage>| {
413 RedwireWaitOutcome::Delivered(
414 delivered.into_iter().map(delivered_message_json).collect(),
415 )
416 };
417
418 let delivered = do_read(self)?;
420 if !delivered.is_empty() {
421 return Ok(render(delivered));
422 }
423
424 let registry = self.queue_wait_registry();
425 let scope = with_redwire_wait_context(&auth_identity, &tenant, queue_wait_scope);
426 let deadline = std::time::Instant::now() + std::time::Duration::from_millis(wait_ms);
427 let telemetry = self.queue_telemetry();
428 telemetry.record_wait_started(&scope, queue);
429 let wait_start = std::time::Instant::now();
430 tracing::debug!(
431 target: "reddb::redwire::queue_wait",
432 queue,
433 group = group_ref,
434 consumer,
435 count,
436 wait_ms,
437 scope = scope.as_str(),
438 "redwire queue wait parked"
439 );
440 let observe = |outcome: crate::runtime::queue_telemetry::WaitOutcomeLabel| {
441 let elapsed_ms = wait_start.elapsed().as_millis().min(u128::from(u64::MAX)) as u64;
442 telemetry.record_wait_outcome(&scope, queue, outcome, elapsed_ms);
443 tracing::debug!(
444 target: "reddb::redwire::queue_wait",
445 queue,
446 group = group_ref,
447 consumer,
448 count,
449 wait_ms,
450 scope = scope.as_str(),
451 outcome = outcome.as_str(),
452 duration_ms = elapsed_ms,
453 "redwire queue wait resolved"
454 );
455 };
456 loop {
457 let waiter = registry.async_waiter(&scope, queue);
461 let delivered = do_read(self)?;
462 if !delivered.is_empty() {
463 observe(crate::runtime::queue_telemetry::WaitOutcomeLabel::Woken);
464 return Ok(render(delivered));
465 }
466 if registry.is_cancelled() {
467 observe(crate::runtime::queue_telemetry::WaitOutcomeLabel::Cancelled);
468 return Ok(RedwireWaitOutcome::Cancelled);
469 }
470 if std::time::Instant::now() >= deadline {
471 observe(crate::runtime::queue_telemetry::WaitOutcomeLabel::Timeout);
472 return Ok(RedwireWaitOutcome::TimedOut);
473 }
474 let park_deadline = match earliest_future_available_at(&self.inner.db.store(), queue) {
475 Some(at_ns) => {
476 let now_ns = now_ns();
477 if at_ns <= now_ns {
478 deadline.min(std::time::Instant::now())
479 } else {
480 let wait_ns = at_ns - now_ns;
481 let due_instant =
482 std::time::Instant::now() + std::time::Duration::from_nanos(wait_ns);
483 deadline.min(due_instant)
484 }
485 }
486 None => deadline,
487 };
488 match registry.wait_until_async(&waiter, park_deadline).await {
493 crate::runtime::queue_wait_registry::WaitOutcome::Woken => continue,
494 crate::runtime::queue_wait_registry::WaitOutcome::Timeout => {
495 if std::time::Instant::now() >= deadline {
496 observe(crate::runtime::queue_telemetry::WaitOutcomeLabel::Timeout);
497 return Ok(RedwireWaitOutcome::TimedOut);
498 }
499 continue;
500 }
501 crate::runtime::queue_wait_registry::WaitOutcome::Cancelled => {
502 observe(crate::runtime::queue_telemetry::WaitOutcomeLabel::Cancelled);
503 return Ok(RedwireWaitOutcome::Cancelled);
504 }
505 }
506 }
507 }
508
509 pub(crate) fn redwire_queue_wait_cap_check(&self, wait_ms: u64) -> Result<(), String> {
518 let cap = self.config_u64(QUEUE_MAX_WAIT_MS_CONFIG_KEY, QUEUE_MAX_WAIT_MS_DEFAULT);
519 if wait_ms > cap {
520 Err(format!(
521 "queue-wait WAIT {wait_ms}ms exceeds server cap {QUEUE_MAX_WAIT_MS_CONFIG_KEY} = {cap}ms"
522 ))
523 } else {
524 Ok(())
525 }
526 }
527
528 pub(crate) fn enqueue_event_payload(
529 &self,
530 queue: &str,
531 payload: Value,
532 ) -> RedDBResult<EntityId> {
533 let store = self.inner.db.store();
534 if store.get_collection(queue).is_none() {
536 crate::runtime::impl_ddl::ensure_event_target_queue_pub(self, queue)?;
537 }
538
539 let payload_bytes = estimate_payload_bytes(&payload);
541 let outbox_bytes = OUTBOX_APPROX_BYTES.fetch_add(payload_bytes, Ordering::Relaxed);
542
543 if outbox_bytes > OUTBOX_MAX_BYTES {
545 OUTBOX_APPROX_BYTES.fetch_sub(payload_bytes, Ordering::Relaxed);
546 EVENTS_DRAIN_RETRIES_TOTAL.fetch_add(1, Ordering::Relaxed);
547 return self.route_event_to_outbox_dlq(queue, payload, "outbox_max_bytes_exceeded");
548 }
549
550 if outbox_bytes > OUTBOX_WARN_BYTES && outbox_bytes - payload_bytes <= OUTBOX_WARN_BYTES {
552 tracing::warn!(
553 outbox_bytes,
554 warn_threshold = OUTBOX_WARN_BYTES,
555 "event outbox approaching capacity warning threshold"
556 );
557 crate::telemetry::operator_event::OperatorEvent::OutboxDlqActivated {
558 queue: queue.to_string(),
559 dlq: format!("{queue}_outbox_dlq"),
560 reason: "outbox_warn_bytes_exceeded".to_string(),
561 }
562 .emit_global();
563 }
564
565 let config = load_queue_config(store.as_ref(), queue);
566
567 if let Some(max_size) = config.max_size {
569 let current_len = load_queue_message_views(store.as_ref(), queue)
570 .unwrap_or_default()
571 .len();
572 if current_len >= max_size {
573 OUTBOX_APPROX_BYTES.fetch_sub(payload_bytes, Ordering::Relaxed);
574 EVENTS_DRAIN_RETRIES_TOTAL.fetch_add(1, Ordering::Relaxed);
575 return self.route_event_to_outbox_dlq(queue, payload, "queue_full");
576 }
577 if current_len * 10 >= max_size * 8 {
579 tracing::warn!(
580 queue = %queue,
581 size = current_len,
582 max = max_size,
583 "event target queue near capacity"
584 );
585 }
586 }
587
588 let id = self.enqueue_event_payload_raw(store.as_ref(), queue, &config, payload)?;
589 EVENTS_ENQUEUED_TOTAL.fetch_add(1, Ordering::Relaxed);
590 Ok(id)
591 }
592
593 fn route_event_to_outbox_dlq(
595 &self,
596 queue: &str,
597 payload: Value,
598 reason: &str,
599 ) -> RedDBResult<EntityId> {
600 let dlq_name = format!("{queue}_outbox_dlq");
601 EVENTS_DLQ_TOTAL.fetch_add(1, Ordering::Relaxed);
602
603 crate::telemetry::operator_event::OperatorEvent::OutboxDlqActivated {
604 queue: queue.to_string(),
605 dlq: dlq_name.clone(),
606 reason: reason.to_string(),
607 }
608 .emit_global();
609
610 let store = self.inner.db.store();
611 if store.get_collection(&dlq_name).is_none() {
612 crate::runtime::impl_ddl::ensure_event_target_queue_pub(self, &dlq_name)?;
613 }
614 let dlq_config = load_queue_config(store.as_ref(), &dlq_name);
615 let id = self.enqueue_event_payload_raw(store.as_ref(), &dlq_name, &dlq_config, payload)?;
616 EVENTS_ENQUEUED_TOTAL.fetch_add(1, Ordering::Relaxed);
617 Ok(id)
618 }
619
620 fn enqueue_event_payload_raw(
622 &self,
623 store: &UnifiedStore,
624 queue: &str,
625 config: &QueueRuntimeConfig,
626 payload: Value,
627 ) -> RedDBResult<EntityId> {
628 let position = next_queue_position(store, queue, QueueSide::Right)?;
629 let mut entity = UnifiedEntity::new(
630 EntityId::new(0),
631 EntityKind::QueueMessage {
632 queue: queue.to_string(),
633 position,
634 },
635 EntityData::QueueMessage(QueueMessageData {
636 payload,
637 priority: None,
638 enqueued_at_ns: now_ns(),
639 attempts: 0,
640 max_attempts: config.max_attempts,
641 acked: false,
642 }),
643 );
644 if let Some(xid) = self.current_xid() {
645 entity.set_xmin(xid);
646 }
647 let id = store
648 .insert_auto(queue, entity)
649 .map_err(|err| RedDBError::Internal(err.to_string()))?;
650 if let Some(ttl_ms) = config.ttl_ms {
651 store
652 .set_metadata(queue, id, queue_message_ttl_metadata(ttl_ms))
653 .map_err(|err| RedDBError::Internal(err.to_string()))?;
654 }
655 self.invalidate_result_cache_for_table(queue);
656 Ok(id)
657 }
658
659 pub fn execute_create_queue(
660 &self,
661 raw_query: &str,
662 query: &CreateQueueQuery,
663 ) -> RedDBResult<RuntimeQueryResult> {
664 self.check_write(crate::runtime::write_gate::WriteKind::Ddl)?;
665 if query.dlq.as_deref() == Some(query.name.as_str()) {
666 return Err(RedDBError::Query(
667 "dead-letter queue must be different from the source queue".to_string(),
668 ));
669 }
670
671 let store = self.inner.db.store();
672 let exists = store.get_collection(&query.name).is_some();
673 if exists {
674 if query.if_not_exists {
675 return Ok(RuntimeQueryResult::ok_message(
676 raw_query.to_string(),
677 &format!("queue '{}' already exists", query.name),
678 "create",
679 ));
680 }
681 return Err(RedDBError::Query(format!(
682 "queue '{}' already exists",
683 query.name
684 )));
685 }
686
687 store
688 .create_collection(&query.name)
689 .map_err(|err| RedDBError::Internal(err.to_string()))?;
690 if let Some(ttl_ms) = query.ttl_ms {
691 self.inner
692 .db
693 .set_collection_default_ttl_ms(&query.name, ttl_ms);
694 }
695 self.inner
696 .db
697 .save_collection_contract(queue_collection_contract(
698 &query.name,
699 query.priority,
700 query.ttl_ms,
701 ))
702 .map_err(|err| RedDBError::Internal(err.to_string()))?;
703 save_queue_config(
704 store.as_ref(),
705 &query.name,
706 &QueueRuntimeConfig {
707 mode: query.mode,
708 priority: query.priority,
709 max_size: query.max_size,
710 ttl_ms: query.ttl_ms,
711 dlq: query.dlq.clone(),
712 max_attempts: query.max_attempts,
713 lock_deadline_ms: query.lock_deadline_ms,
714 in_flight_cap_per_group: query.in_flight_cap_per_group,
715 retry_delay_ms: query.retry_delay_ms,
716 },
717 )?;
718
719 if let Some(dlq) = &query.dlq {
720 if store.get_collection(dlq).is_none() {
721 store
722 .create_collection(dlq)
723 .map_err(|err| RedDBError::Internal(err.to_string()))?;
724 self.inner
725 .db
726 .save_collection_contract(queue_collection_contract(dlq, false, None))
727 .map_err(|err| RedDBError::Internal(err.to_string()))?;
728 }
729 }
730
731 self.invalidate_result_cache();
732 self.inner
733 .db
734 .persist_metadata()
735 .map_err(|err| RedDBError::Internal(err.to_string()))?;
736 let mut type_tags = Vec::new();
741 if let Some(dlq) = &query.dlq {
742 type_tags.push(format!("dlq:{}", dlq));
743 }
744 self.schema_vocabulary_apply(
745 crate::runtime::schema_vocabulary::DdlEvent::CreateCollection {
746 collection: query.name.clone(),
747 columns: vec!["payload".to_string()],
748 type_tags,
749 description: None,
750 },
751 );
752
753 let mut msg = format!("queue '{}' created", query.name);
754 msg.push_str(&format!(" (mode={})", query.mode.as_str()));
755 if query.priority {
756 msg.push_str(" (priority)");
757 }
758 if let Some(max_size) = query.max_size {
759 msg.push_str(&format!(" (max_size={max_size})"));
760 }
761 if let Some(ttl_ms) = query.ttl_ms {
762 msg.push_str(&format!(" (ttl={ttl_ms}ms)"));
763 }
764 if let Some(dlq) = &query.dlq {
765 msg.push_str(&format!(
766 " (dlq={dlq}, max_attempts={})",
767 query.max_attempts
768 ));
769 }
770
771 Ok(RuntimeQueryResult::ok_message(
772 raw_query.to_string(),
773 &msg,
774 "create",
775 ))
776 }
777
778 pub fn execute_alter_queue(
779 &self,
780 raw_query: &str,
781 query: &AlterQueueQuery,
782 ) -> RedDBResult<RuntimeQueryResult> {
783 self.check_write(crate::runtime::write_gate::WriteKind::Ddl)?;
784 let store = self.inner.db.store();
785 ensure_queue_exists(store.as_ref(), &query.name)?;
786
787 let mut config = load_queue_config(store.as_ref(), &query.name);
788 let mut summary: Vec<String> = Vec::new();
789
790 if let Some(new_mode) = query.mode {
791 let pending =
792 load_pending_entries(store.as_ref(), &query.name, None, None).unwrap_or_default();
793 if !pending.is_empty() {
794 tracing::warn!(
795 queue = %query.name,
796 pending_count = pending.len(),
797 new_mode = %new_mode.as_str(),
798 "ALTER QUEUE SET MODE: {} in-flight messages will drain with old mode; \
799 new reads use {}",
800 pending.len(),
801 new_mode.as_str(),
802 );
803 }
804 config.mode = new_mode;
805 summary.push(format!("mode={}", new_mode.as_str()));
806 }
807 if let Some(max_attempts) = query.max_attempts {
808 config.max_attempts = max_attempts;
809 summary.push(format!("max_attempts={max_attempts}"));
810 }
811 if let Some(lock_deadline_ms) = query.lock_deadline_ms {
812 config.lock_deadline_ms = lock_deadline_ms;
813 summary.push(format!("lock_deadline_ms={lock_deadline_ms}"));
814 }
815 if let Some(in_flight_cap) = query.in_flight_cap_per_group {
816 config.in_flight_cap_per_group = in_flight_cap;
817 summary.push(format!("in_flight_cap_per_group={in_flight_cap}"));
818 }
819 if let Some(dlq) = &query.dlq {
820 if dlq == &query.name {
821 return Err(RedDBError::Query(
822 "dead-letter queue must be different from the source queue".to_string(),
823 ));
824 }
825 config.dlq = Some(dlq.clone());
826 summary.push(format!("dlq={dlq}"));
827 }
828 if let Some(retry_delay_ms) = query.retry_delay_ms {
829 config.retry_delay_ms = if retry_delay_ms == 0 {
830 None
831 } else {
832 Some(retry_delay_ms)
833 };
834 summary.push(format!("retry_delay_ms={retry_delay_ms}"));
835 }
836
837 save_queue_config(store.as_ref(), &query.name, &config)?;
838
839 self.invalidate_result_cache();
840 self.inner
841 .db
842 .persist_metadata()
843 .map_err(|err| RedDBError::Internal(err.to_string()))?;
844
845 Ok(RuntimeQueryResult::ok_message(
846 raw_query.to_string(),
847 &format!("queue '{}' altered: {}", query.name, summary.join(", ")),
848 "alter",
849 ))
850 }
851
852 pub fn execute_drop_queue(
853 &self,
854 raw_query: &str,
855 query: &DropQueueQuery,
856 ) -> RedDBResult<RuntimeQueryResult> {
857 self.check_write(crate::runtime::write_gate::WriteKind::Ddl)?;
858 let store = self.inner.db.store();
859 if super::impl_ddl::is_system_schema_name(&query.name) {
860 return Err(RedDBError::Query("system schema is read-only".to_string()));
861 }
862 if store.get_collection(&query.name).is_none() {
863 if query.if_exists {
864 return Ok(RuntimeQueryResult::ok_message(
865 raw_query.to_string(),
866 &format!("queue '{}' does not exist", query.name),
867 "drop",
868 ));
869 }
870 return Err(RedDBError::NotFound(format!(
871 "queue '{}' not found",
872 query.name
873 )));
874 }
875 let actual = crate::runtime::ddl::polymorphic_resolver::resolve(
876 &query.name,
877 &self.inner.db.catalog_model_snapshot(),
878 )?;
879 crate::runtime::ddl::polymorphic_resolver::ensure_model_match(
880 crate::catalog::CollectionModel::Queue,
881 actual,
882 )?;
883
884 store
885 .drop_collection(&query.name)
886 .map_err(|err| RedDBError::Internal(err.to_string()))?;
887 self.inner.db.clear_collection_default_ttl_ms(&query.name);
888 self.inner
889 .db
890 .remove_collection_contract(&query.name)
891 .map_err(|err| RedDBError::Internal(err.to_string()))?;
892 remove_queue_metadata(store.as_ref(), &query.name);
893 self.invalidate_result_cache();
894 self.inner
895 .db
896 .persist_metadata()
897 .map_err(|err| RedDBError::Internal(err.to_string()))?;
898 self.schema_vocabulary_apply(
900 crate::runtime::schema_vocabulary::DdlEvent::DropCollection {
901 collection: query.name.clone(),
902 },
903 );
904
905 Ok(RuntimeQueryResult::ok_message(
906 raw_query.to_string(),
907 &format!("queue '{}' dropped", query.name),
908 "drop",
909 ))
910 }
911
912 pub fn execute_queue_command(
913 &self,
914 raw_query: &str,
915 cmd: &QueueCommand,
916 ) -> RedDBResult<RuntimeQueryResult> {
917 self.check_write(crate::runtime::write_gate::WriteKind::Dml)?;
918 match cmd {
919 QueueCommand::Push {
920 queue,
921 value,
922 side,
923 priority,
924 available,
925 } => {
926 let store = self.inner.db.store();
927 ensure_queue_exists(store.as_ref(), queue)?;
928 let config = load_queue_config(store.as_ref(), queue);
929 if priority.is_some() && !config.priority {
930 return Err(RedDBError::Query(format!(
931 "queue '{}' is not a priority queue",
932 queue
933 )));
934 }
935 if let Some(max_size) = config.max_size {
936 let current_len =
937 load_queue_message_views_with_runtime(Some(self), store.as_ref(), queue)?
938 .len();
939 if current_len >= max_size {
940 return Err(RedDBError::Query(format!(
941 "queue '{}' is full (max_size={max_size})",
942 queue
943 )));
944 }
945 }
946
947 let position = next_queue_position(store.as_ref(), queue, *side)?;
948 let mut entity = UnifiedEntity::new(
949 EntityId::new(0),
950 EntityKind::QueueMessage {
951 queue: queue.clone(),
952 position,
953 },
954 EntityData::QueueMessage(QueueMessageData {
955 payload: value.clone(),
956 priority: if config.priority { *priority } else { None },
957 enqueued_at_ns: now_ns(),
958 attempts: 0,
959 max_attempts: config.max_attempts,
960 acked: false,
961 }),
962 );
963 if let Some(xid) = self.current_xid() {
966 entity.set_xmin(xid);
967 }
968 let id = store
969 .insert_auto(queue, entity)
970 .map_err(|err| RedDBError::Internal(err.to_string()))?;
971 let available_at_ns = available.map(|a| match a {
976 crate::storage::query::ast::QueueAvailability::DelayMs(ms) => {
977 now_ns().saturating_add(ms.saturating_mul(1_000_000))
978 }
979 crate::storage::query::ast::QueueAvailability::AtUnixMs(ms) => {
980 ms.saturating_mul(1_000_000)
981 }
982 });
983 if config.ttl_ms.is_some() || available_at_ns.is_some() {
984 store
985 .set_metadata(
986 queue,
987 id,
988 queue_message_metadata(config.ttl_ms, available_at_ns),
989 )
990 .map_err(|err| RedDBError::Internal(err.to_string()))?;
991 }
992 self.record_queue_wake(&queue_wait_scope(), queue);
997 self.invalidate_result_cache();
998
999 let mut result = UnifiedResult::with_columns(vec![
1000 "message_id".into(),
1001 "side".into(),
1002 "queue".into(),
1003 ]);
1004 let mut record = UnifiedRecord::new();
1005 record.set("message_id", Value::text(message_id_string(id)));
1006 record.set(
1007 "side",
1008 Value::text(match side {
1009 QueueSide::Left => "left".to_string(),
1010 QueueSide::Right => "right".to_string(),
1011 }),
1012 );
1013 record.set("queue", Value::text(queue.clone()));
1014 result.push(record);
1015
1016 Ok(RuntimeQueryResult {
1017 query: raw_query.to_string(),
1018 mode: QueryMode::Sql,
1019 statement: "queue_push",
1020 engine: "runtime-queue",
1021 result,
1022 affected_rows: 1,
1023 statement_type: "insert",
1024 bookmark: None,
1025 })
1026 }
1027 QueueCommand::Pop { queue, side, count } => {
1028 let store = self.inner.db.store();
1029 ensure_queue_exists(store.as_ref(), queue)?;
1030 let (lifecycle, _ps, txn) = runtime_lifecycle(self, queue);
1031 let popped = lifecycle
1032 .pop(queue, ast_side_to_lc(*side), *count, &txn)
1033 .map_err(map_qse)?;
1034
1035 let mut result =
1036 UnifiedResult::with_columns(vec!["message_id".into(), "payload".into()]);
1037 for (message_id, payload) in &popped {
1038 let mut record = UnifiedRecord::new();
1039 record.set(
1040 "message_id",
1041 Value::text(message_id_string(EntityId::new(*message_id))),
1042 );
1043 record.set("payload", payload.clone());
1044 result.push(record);
1045 }
1046 let popped_count = popped.len() as u64;
1047 if popped_count > 0 {
1048 self.invalidate_result_cache();
1049 }
1050
1051 Ok(RuntimeQueryResult {
1052 query: raw_query.to_string(),
1053 mode: QueryMode::Sql,
1054 statement: "queue_pop",
1055 engine: "runtime-queue",
1056 result,
1057 affected_rows: popped_count,
1058 statement_type: "delete",
1059 bookmark: None,
1060 })
1061 }
1062 QueueCommand::Peek { queue, count } => {
1063 let store = self.inner.db.store();
1064 ensure_queue_exists(store.as_ref(), queue)?;
1065 let (lifecycle, _ps, txn) = runtime_lifecycle(self, queue);
1066 let messages = lifecycle.peek(queue, *count, &txn);
1067
1068 let mut result =
1069 UnifiedResult::with_columns(vec!["message_id".into(), "payload".into()]);
1070 for message in messages {
1071 let mut record = UnifiedRecord::new();
1072 record.set(
1073 "message_id",
1074 Value::text(message_id_string(EntityId::new(message.message_id))),
1075 );
1076 record.set("payload", message.payload);
1077 result.push(record);
1078 }
1079
1080 Ok(RuntimeQueryResult {
1081 query: raw_query.to_string(),
1082 mode: QueryMode::Sql,
1083 statement: "queue_peek",
1084 engine: "runtime-queue",
1085 result,
1086 affected_rows: 0,
1087 statement_type: "select",
1088 bookmark: None,
1089 })
1090 }
1091 QueueCommand::Len { queue } => {
1092 let store = self.inner.db.store();
1093 ensure_queue_exists(store.as_ref(), queue)?;
1094 let count =
1095 load_queue_message_views_with_runtime(Some(self), store.as_ref(), queue)?.len()
1096 as u64;
1097 let mut result = UnifiedResult::with_columns(vec!["len".into()]);
1098 let mut record = UnifiedRecord::new();
1099 record.set("len", Value::UnsignedInteger(count));
1100 result.push(record);
1101
1102 Ok(RuntimeQueryResult {
1103 query: raw_query.to_string(),
1104 mode: QueryMode::Sql,
1105 statement: "queue_len",
1106 engine: "runtime-queue",
1107 result,
1108 affected_rows: 0,
1109 statement_type: "select",
1110 bookmark: None,
1111 })
1112 }
1113 QueueCommand::Purge { queue } => {
1114 let store = self.inner.db.store();
1115 ensure_queue_exists(store.as_ref(), queue)?;
1116 let (lifecycle, _ps, txn) = runtime_lifecycle(self, queue);
1117 let count = lifecycle.purge(queue, &txn).map_err(map_qse)?;
1118 if count > 0 {
1119 self.invalidate_result_cache();
1120 }
1121
1122 Ok(RuntimeQueryResult::ok_message(
1123 raw_query.to_string(),
1124 &format!("{count} messages purged from queue '{queue}'"),
1125 "delete",
1126 ))
1127 }
1128 QueueCommand::GroupCreate { queue, group } => {
1129 let store = self.inner.db.store();
1130 ensure_queue_exists(store.as_ref(), queue)?;
1131 if queue_group_exists(store.as_ref(), queue, group)? {
1132 return Ok(RuntimeQueryResult::ok_message(
1133 raw_query.to_string(),
1134 &format!(
1135 "consumer group '{}' already exists on queue '{}'",
1136 group, queue
1137 ),
1138 "create",
1139 ));
1140 }
1141 save_queue_group(store.as_ref(), queue, group)?;
1142 self.invalidate_result_cache();
1143
1144 Ok(RuntimeQueryResult::ok_message(
1145 raw_query.to_string(),
1146 &format!("consumer group '{}' created on queue '{}'", group, queue),
1147 "create",
1148 ))
1149 }
1150 QueueCommand::GroupRead {
1151 queue,
1152 group,
1153 consumer,
1154 count,
1155 wait_ms,
1156 } => {
1157 let store = self.inner.db.store();
1158 ensure_queue_exists(store.as_ref(), queue)?;
1159 if let Some(ms) = *wait_ms {
1165 if self.current_xid().is_some() {
1166 return Err(RedDBError::Query(
1167 "QUEUE READ … WAIT is autocommit-only: refusing to park inside an explicit transaction (BEGIN/COMMIT)"
1168 .to_string(),
1169 ));
1170 }
1171 let cap =
1172 self.config_u64(QUEUE_MAX_WAIT_MS_CONFIG_KEY, QUEUE_MAX_WAIT_MS_DEFAULT);
1173 if ms > cap {
1174 return Err(RedDBError::Query(format!(
1175 "QUEUE READ … WAIT {ms}ms exceeds server cap {QUEUE_MAX_WAIT_MS_CONFIG_KEY} = {cap}ms"
1176 )));
1177 }
1178 }
1179 let config = load_queue_config(store.as_ref(), queue);
1183 let group_owned =
1184 resolve_read_group(store.as_ref(), queue, group.as_deref(), consumer, &config)?;
1185 let group_ref = group_owned.as_str();
1186 let delivered = self
1187 .group_read_with_optional_wait(queue, group_ref, consumer, *count, *wait_ms)?;
1188
1189 {
1193 let lease_count = u32::try_from(delivered.len()).unwrap_or(u32::MAX);
1194 let now_ns = std::time::SystemTime::now()
1195 .duration_since(std::time::UNIX_EPOCH)
1196 .map(|d| d.as_nanos() as u64)
1197 .unwrap_or(0);
1198 self.queue_presence().heartbeat(
1199 queue,
1200 group_ref,
1201 consumer,
1202 lease_count,
1203 now_ns,
1204 );
1205 }
1206
1207 let mut result = UnifiedResult::with_columns(vec![
1208 "message_id".into(),
1209 "payload".into(),
1210 "consumer".into(),
1211 "delivery_count".into(),
1212 "attempts".into(),
1213 ]);
1214
1215 for message in delivered {
1216 let mut record = UnifiedRecord::new();
1217 record.set(
1218 "message_id",
1219 Value::text(message_id_string(EntityId::new(message.message_id))),
1220 );
1221 record.set("payload", message.payload);
1222 record.set("consumer", Value::text(message.consumer));
1223 record.set(
1224 "delivery_count",
1225 Value::UnsignedInteger(u64::from(message.delivery_count)),
1226 );
1227 record.set(
1228 "attempts",
1229 Value::UnsignedInteger(u64::from(message.delivery_count)),
1230 );
1231 result.push(record);
1232 }
1233 if !result.records.is_empty() {
1234 self.invalidate_result_cache();
1235 }
1236
1237 Ok(RuntimeQueryResult {
1238 query: raw_query.to_string(),
1239 mode: QueryMode::Sql,
1240 statement: "queue_group_read",
1241 engine: "runtime-queue",
1242 result,
1243 affected_rows: 0,
1244 statement_type: "select",
1245 bookmark: None,
1246 })
1247 }
1248 QueueCommand::Pending { queue, group } => {
1249 let store = self.inner.db.store();
1250 ensure_queue_exists(store.as_ref(), queue)?;
1251 require_queue_group(store.as_ref(), queue, group)?;
1252 let mut pending = load_pending_entries(store.as_ref(), queue, Some(group), None)?;
1253 pending.sort_by_key(|entry| entry.delivered_at_ns);
1254 let current_time_ns = now_ns();
1255
1256 let mut result = UnifiedResult::with_columns(vec![
1257 "message_id".into(),
1258 "consumer".into(),
1259 "delivered_at_ns".into(),
1260 "delivery_count".into(),
1261 "idle_ms".into(),
1262 ]);
1263 for entry in pending {
1264 let mut record = UnifiedRecord::new();
1265 record.set(
1266 "message_id",
1267 Value::text(message_id_string(entry.message_id)),
1268 );
1269 record.set("consumer", Value::text(entry.consumer));
1270 record.set(
1271 "delivered_at_ns",
1272 Value::UnsignedInteger(entry.delivered_at_ns),
1273 );
1274 record.set(
1275 "delivery_count",
1276 Value::UnsignedInteger(u64::from(entry.delivery_count)),
1277 );
1278 record.set(
1279 "idle_ms",
1280 Value::UnsignedInteger(
1281 current_time_ns.saturating_sub(entry.delivered_at_ns) / 1_000_000,
1282 ),
1283 );
1284 result.push(record);
1285 }
1286
1287 Ok(RuntimeQueryResult {
1288 query: raw_query.to_string(),
1289 mode: QueryMode::Sql,
1290 statement: "queue_pending",
1291 engine: "runtime-queue",
1292 result,
1293 affected_rows: 0,
1294 statement_type: "select",
1295 bookmark: None,
1296 })
1297 }
1298 QueueCommand::Claim {
1299 queue,
1300 group,
1301 consumer,
1302 min_idle_ms,
1303 } => {
1304 let store = self.inner.db.store();
1305 ensure_queue_exists(store.as_ref(), queue)?;
1306 require_queue_group(store.as_ref(), queue, group)?;
1307 let (lifecycle, _ps, txn) = runtime_lifecycle(self, queue);
1308 let delivered = lifecycle
1309 .claim_delivering(queue, consumer, *min_idle_ms, &txn)
1310 .map_err(map_qse)?;
1311
1312 let mut result = UnifiedResult::with_columns(vec![
1313 "message_id".into(),
1314 "payload".into(),
1315 "consumer".into(),
1316 "delivery_count".into(),
1317 ]);
1318
1319 for message in delivered {
1320 let mut record = UnifiedRecord::new();
1321 record.set(
1322 "message_id",
1323 Value::text(message_id_string(EntityId::new(message.message_id))),
1324 );
1325 record.set("payload", message.payload);
1326 record.set("consumer", Value::text(message.consumer));
1327 record.set(
1328 "delivery_count",
1329 Value::UnsignedInteger(u64::from(message.delivery_count)),
1330 );
1331 result.push(record);
1332 }
1333 if !result.records.is_empty() {
1334 self.invalidate_result_cache();
1335 }
1336 let affected_rows = result.records.len() as u64;
1337
1338 Ok(RuntimeQueryResult {
1339 query: raw_query.to_string(),
1340 mode: QueryMode::Sql,
1341 statement: "queue_claim",
1342 engine: "runtime-queue",
1343 result,
1344 affected_rows,
1345 statement_type: "update",
1346 bookmark: None,
1347 })
1348 }
1349 QueueCommand::Ack {
1350 queue,
1351 group,
1352 message_id,
1353 delivery_id,
1354 } => {
1355 let store = self.inner.db.store();
1356 ensure_queue_exists(store.as_ref(), queue)?;
1357 let (group_owned, message_entity) = resolve_ack_nack_handle(
1358 store.as_ref(),
1359 queue,
1360 group,
1361 message_id,
1362 delivery_id.as_deref(),
1363 )?;
1364 let group_ref = group_owned.as_str();
1365 require_queue_group(store.as_ref(), queue, group_ref)?;
1366 let (lifecycle, ps, txn) = runtime_lifecycle(self, queue);
1367 let did = match delivery_id.as_deref() {
1368 Some(d) => d.to_string(),
1369 None => ps
1370 .find_pending_by_key(queue, message_entity.raw(), group_ref)
1371 .ok_or_else(|| {
1372 RedDBError::NotFound(format!(
1373 "no pending delivery for message '{}' on queue '{}' (group '{}')",
1374 message_entity.raw(),
1375 queue,
1376 group_ref
1377 ))
1378 })?,
1379 };
1380 lifecycle.ack(&txn, &did).map_err(map_qse)?;
1381 self.invalidate_result_cache();
1382
1383 Ok(RuntimeQueryResult::ok_message(
1384 raw_query.to_string(),
1385 "message acknowledged",
1386 "update",
1387 ))
1388 }
1389 QueueCommand::Nack {
1390 queue,
1391 group,
1392 message_id,
1393 delivery_id,
1394 delay_ms,
1395 } => {
1396 let store = self.inner.db.store();
1397 ensure_queue_exists(store.as_ref(), queue)?;
1398 let config = load_queue_config(store.as_ref(), queue);
1399 if delay_ms.is_some() {
1405 if let Some((_, role)) = current_auth_identity() {
1406 if !role.can_write() {
1407 return Err(RedDBError::InvalidOperation(format!(
1408 "role '{role}' is not authorized to override NACK retry delay on queue '{queue}'"
1409 )));
1410 }
1411 }
1412 }
1413 let (group_owned, message_entity) = resolve_ack_nack_handle(
1414 store.as_ref(),
1415 queue,
1416 group,
1417 message_id,
1418 delivery_id.as_deref(),
1419 )?;
1420 let group_ref = group_owned.as_str();
1421 require_queue_group(store.as_ref(), queue, group_ref)?;
1422 let (lifecycle, ps, txn) = runtime_lifecycle(self, queue);
1423 let did = match delivery_id.as_deref() {
1424 Some(d) => d.to_string(),
1425 None => ps
1426 .find_pending_by_key(queue, message_entity.raw(), group_ref)
1427 .ok_or_else(|| {
1428 RedDBError::NotFound(format!(
1429 "no pending delivery for message '{}' on queue '{}' (group '{}')",
1430 message_entity.raw(),
1431 queue,
1432 group_ref
1433 ))
1434 })?,
1435 };
1436 let effective_delay_ms = delay_ms.or(config.retry_delay_ms).unwrap_or(0);
1440 let pending_attempt = ps.read_pending_attempt(&did).map_err(map_qse)?;
1441 let nack_attempts = pending_attempt.attempts.saturating_add(1);
1442 let outcome = lifecycle.nack(&txn, &did).map_err(map_qse)?;
1443 if matches!(outcome, RetirementOutcome::Requeued) && effective_delay_ms > 0 {
1447 let at_ns =
1448 now_ns().saturating_add(effective_delay_ms.saturating_mul(1_000_000));
1449 set_message_available_at_ns(
1450 store.as_ref(),
1451 queue,
1452 message_entity,
1453 Some(at_ns),
1454 config.ttl_ms,
1455 )?;
1456 }
1457 self.maybe_emit_nack_audit(
1464 queue,
1465 group_ref,
1466 &did,
1467 *delay_ms,
1468 config.retry_delay_ms,
1469 &outcome,
1470 );
1471 let outcome_label = match &outcome {
1472 RetirementOutcome::Requeued => NackOutcomeLabel::Retry,
1473 RetirementOutcome::MovedToDlq(_) => NackOutcomeLabel::Dlq,
1474 RetirementOutcome::Dropped => NackOutcomeLabel::Drop,
1475 };
1476 self.queue_telemetry().record_nacked(
1477 queue,
1478 group_ref,
1479 config.mode.as_str(),
1480 outcome_label,
1481 );
1482 if let RetirementOutcome::MovedToDlq(dlq) = &outcome {
1483 OperatorEvent::QueueDlqPromoted {
1484 queue: queue.to_string(),
1485 group: group_ref.to_string(),
1486 dlq: dlq.clone(),
1487 message_id: pending_attempt.message_id,
1488 attempts: nack_attempts,
1489 reason: format!("lifecycle_nack:{did}"),
1490 }
1491 .emit(self.audit_log());
1492 }
1493 let message = match outcome {
1494 RetirementOutcome::Requeued => {
1495 if effective_delay_ms > 0 {
1496 format!("message requeued (delay={effective_delay_ms}ms)")
1497 } else {
1498 "message requeued".to_string()
1499 }
1500 }
1501 RetirementOutcome::MovedToDlq(dlq) => {
1502 format!("message moved to dead-letter queue '{}'", dlq)
1503 }
1504 RetirementOutcome::Dropped => "message dropped after max attempts".to_string(),
1505 };
1506 self.invalidate_result_cache();
1507
1508 Ok(RuntimeQueryResult::ok_message(
1509 raw_query.to_string(),
1510 &message,
1511 "update",
1512 ))
1513 }
1514 QueueCommand::Move {
1515 source,
1516 destination,
1517 filter,
1518 limit,
1519 } => self.execute_queue_move(raw_query, source, destination, filter.as_ref(), *limit),
1520 }
1521 }
1522
1523 pub fn execute_queue_select(
1524 &self,
1525 raw_query: &str,
1526 query: &QueueSelectQuery,
1527 ) -> RedDBResult<RuntimeQueryResult> {
1528 let store = self.inner.db.store();
1529 ensure_queue_exists(store.as_ref(), &query.queue)?;
1530 let config = load_queue_config(store.as_ref(), &query.queue);
1531 let dlq = queue_is_dead_letter_target(store.as_ref(), &query.queue);
1532 let columns = if query.columns.is_empty() {
1533 queue_projection_default_columns()
1534 } else {
1535 query.columns.clone()
1536 };
1537
1538 let mut messages =
1539 load_queue_message_views_with_runtime(Some(self), store.as_ref(), &query.queue)?;
1540 sort_queue_messages(&mut messages, &config, QueueSide::Left);
1541
1542 let mut result = UnifiedResult::with_columns(columns.clone());
1543 for message in messages {
1544 if query
1545 .filter
1546 .as_ref()
1547 .is_some_and(|filter| !queue_message_matches_filter(&message, dlq, filter))
1548 {
1549 continue;
1550 }
1551 let record = queue_projection_record(&columns, &message, dlq)?;
1552 result.push(record);
1553 if query
1554 .limit
1555 .is_some_and(|limit| result.records.len() >= limit as usize)
1556 {
1557 break;
1558 }
1559 }
1560
1561 Ok(RuntimeQueryResult {
1562 query: raw_query.to_string(),
1563 mode: QueryMode::Sql,
1564 statement: "queue_select",
1565 engine: "runtime-queue",
1566 result,
1567 affected_rows: 0,
1568 statement_type: "select",
1569 bookmark: None,
1570 })
1571 }
1572
    /// Moves up to `limit` messages from queue `source` to queue `destination`.
    ///
    /// Candidate messages are read from `source`, ordered with
    /// `sort_queue_messages` (left side), optionally narrowed by `filter`, and
    /// capped at `limit`. Each selected message is copied into `destination`
    /// and then removed from `source` through the lifecycle's
    /// `delete_with_state`. A `queue/move` audit event is recorded and a
    /// single summary row (`source`, `destination`, `selected`, `committed`)
    /// is returned.
    ///
    /// # Errors
    ///
    /// * `RedDBError::Query` when `source == destination`, when the move
    ///   would exceed the destination's `max_size`, when a selected message
    ///   is currently locked, or when it is no longer present in `source`.
    /// * `RedDBError::NotFound` when either queue does not exist.
    /// * Any error from inserting into `destination` (already inserted copies
    ///   are deleted again on a best-effort basis) or from deleting the
    ///   originals.
    fn execute_queue_move(
        &self,
        raw_query: &str,
        source: &str,
        destination: &str,
        filter: Option<&Filter>,
        limit: usize,
    ) -> RedDBResult<RuntimeQueryResult> {
        if source == destination {
            return Err(RedDBError::Query(
                "QUEUE MOVE source and destination must be different".to_string(),
            ));
        }
        let store = self.inner.db.store();
        ensure_queue_exists(store.as_ref(), source)?;
        ensure_queue_exists(store.as_ref(), destination)?;
        let source_config = load_queue_config(store.as_ref(), source);
        let destination_config = load_queue_config(store.as_ref(), destination);
        let source_dlq = queue_is_dead_letter_target(store.as_ref(), source);

        // Phase 1: pick the messages to move, in the source queue's order.
        let mut messages =
            load_queue_message_views_with_runtime(Some(self), store.as_ref(), source)?;
        sort_queue_messages(&mut messages, &source_config, QueueSide::Left);
        let selected = messages
            .into_iter()
            .filter(|message| {
                filter
                    .map(|f| queue_message_matches_filter(message, source_dlq, f))
                    .unwrap_or(true)
            })
            .take(limit)
            .collect::<Vec<_>>();

        // Phase 2: refuse the whole move if the destination cannot hold it.
        if let Some(max_size) = destination_config.max_size {
            let current_len =
                load_queue_message_views_with_runtime(Some(self), store.as_ref(), destination)?
                    .len();
            if current_len + selected.len() > max_size {
                return Err(RedDBError::Query(format!(
                    "queue '{}' is full (max_size={max_size})",
                    destination
                )));
            }
        }

        // Phase 3: verify every selected message is unlocked and still present.
        // NOTE(review): `_guard` goes out of scope at the end of each loop
        // iteration, so this looks like a point-in-time probe rather than a
        // lock held for the duration of the move — confirm that is intended.
        for message in &selected {
            let lock = queue_message_lock_handle(self, source, message.id);
            let Some(_guard) = lock.try_lock() else {
                return Err(RedDBError::Query(format!(
                    "message '{}' is locked on queue '{}'",
                    message.id.raw(),
                    source
                )));
            };
            if queue_message_view_by_id(store.as_ref(), source, message.id)?.is_none() {
                return Err(RedDBError::Query(format!(
                    "message '{}' is no longer available on queue '{}'",
                    message.id.raw(),
                    source
                )));
            }
        }

        // Phase 4: copy into the destination. On the first failure the copies
        // made so far are deleted again; errors from that cleanup are ignored.
        let mut inserted = Vec::new();
        for message in &selected {
            match insert_moved_queue_message(
                store.as_ref(),
                destination,
                &destination_config,
                message,
            ) {
                Ok(id) => inserted.push(id),
                Err(err) => {
                    for id in inserted {
                        let _ = store.delete(destination, id);
                    }
                    return Err(err);
                }
            }
        }

        // Phase 5: remove the originals from the source queue. A failure here
        // returns early without undoing the destination inserts.
        let (move_lifecycle, _move_ps, move_txn) = runtime_lifecycle(self, source);
        for message in &selected {
            move_lifecycle
                .delete_with_state(source, message.id.raw(), &move_txn)
                .map_err(map_qse)?;
        }
        if !selected.is_empty() {
            self.invalidate_result_cache();
        }

        // Phase 6: audit trail. The event is recorded even when nothing was
        // selected; `selected` and `committed` carry the same count.
        let selected_count = selected.len() as u64;
        self.audit_log().record_event(
            AuditEvent::builder("queue/move")
                .source(AuditAuthSource::System)
                .outcome(Outcome::Success)
                .resource(format!("queue:{source}->{destination}"))
                .fields([
                    AuditFieldEscaper::field("source", source),
                    AuditFieldEscaper::field("destination", destination),
                    AuditFieldEscaper::field("selected", selected_count),
                    AuditFieldEscaper::field("committed", selected_count),
                ])
                .build(),
        );

        // Phase 7: one summary row describing the move.
        let mut result = UnifiedResult::with_columns(vec![
            "source".into(),
            "destination".into(),
            "selected".into(),
            "committed".into(),
        ]);
        let mut record = UnifiedRecord::new();
        record.set("source", Value::text(source.to_string()));
        record.set("destination", Value::text(destination.to_string()));
        record.set("selected", Value::UnsignedInteger(selected_count));
        record.set("committed", Value::UnsignedInteger(selected_count));
        result.push(record);

        Ok(RuntimeQueryResult {
            query: raw_query.to_string(),
            mode: QueryMode::Sql,
            statement: "queue_move",
            engine: "runtime-queue",
            result,
            affected_rows: selected_count,
            statement_type: "update",
            bookmark: None,
        })
    }
1703
1704 fn maybe_emit_nack_audit(
1719 &self,
1720 queue: &str,
1721 group: &str,
1722 delivery_id: &str,
1723 override_ms: Option<u64>,
1724 default_ms: Option<u64>,
1725 outcome: &RetirementOutcome,
1726 ) {
1727 let Some(override_ms) = override_ms else {
1728 return;
1729 };
1730 let outcome_label = match outcome {
1731 RetirementOutcome::Requeued => "requeued",
1732 RetirementOutcome::MovedToDlq(_) => "dlq",
1733 RetirementOutcome::Dropped => "dropped",
1734 };
1735 const SIGNIFICANT_DELAY_MS: u64 = 60_000;
1736 let destination_changed = !matches!(outcome, RetirementOutcome::Requeued);
1737 if override_ms < SIGNIFICANT_DELAY_MS && !destination_changed {
1738 return;
1739 }
1740 self.audit_log().record_event(
1741 AuditEvent::builder("queue/nack/override")
1742 .source(AuditAuthSource::System)
1743 .outcome(Outcome::Success)
1744 .resource(format!("queue:{queue}"))
1745 .fields([
1746 AuditFieldEscaper::field("queue", queue),
1747 AuditFieldEscaper::field("group", group),
1748 AuditFieldEscaper::field("delivery_id", delivery_id),
1749 AuditFieldEscaper::field("override_delay_ms", override_ms),
1750 AuditFieldEscaper::field("default_delay_ms", default_ms.unwrap_or(0)),
1751 AuditFieldEscaper::field("outcome", outcome_label),
1752 ])
1753 .build(),
1754 );
1755 }
1756}
1757
1758fn ensure_queue_exists(store: &UnifiedStore, queue: &str) -> RedDBResult<()> {
1759 if store.get_collection(queue).is_some() {
1760 Ok(())
1761 } else {
1762 Err(RedDBError::NotFound(format!("queue '{}' not found", queue)))
1763 }
1764}
1765
1766pub(super) fn load_queue_config(store: &UnifiedStore, queue: &str) -> QueueRuntimeConfig {
1767 let default = QueueRuntimeConfig {
1768 mode: QueueMode::Work,
1769 priority: false,
1770 max_size: None,
1771 ttl_ms: None,
1772 dlq: None,
1773 max_attempts: crate::storage::query::DEFAULT_QUEUE_MAX_ATTEMPTS,
1774 lock_deadline_ms: crate::storage::query::DEFAULT_QUEUE_LOCK_DEADLINE_MS,
1775 in_flight_cap_per_group: crate::storage::query::DEFAULT_QUEUE_IN_FLIGHT_CAP_PER_GROUP,
1776 retry_delay_ms: None,
1777 };
1778
1779 let Some(manager) = store.get_collection(QUEUE_META_COLLECTION) else {
1780 return default;
1781 };
1782 manager
1783 .query_all(|entity| {
1784 entity.data.as_row().is_some_and(|row| {
1785 row_text(row, "kind").as_deref() == Some("queue_config")
1786 && row_text(row, "queue").as_deref() == Some(queue)
1787 })
1788 })
1789 .into_iter()
1790 .find_map(|entity| {
1791 let row = entity.data.as_row()?;
1792 Some(QueueRuntimeConfig {
1793 mode: row_text(row, "mode")
1794 .as_deref()
1795 .and_then(QueueMode::parse)
1796 .unwrap_or_default(),
1797 priority: row_bool(row, "priority").unwrap_or(false),
1798 max_size: row_u64(row, "max_size").map(|value| value as usize),
1799 ttl_ms: row_u64(row, "ttl_ms"),
1800 dlq: row_text(row, "dlq"),
1801 max_attempts: row_u64(row, "max_attempts")
1802 .map(|value| value as u32)
1803 .unwrap_or(crate::storage::query::DEFAULT_QUEUE_MAX_ATTEMPTS),
1804 lock_deadline_ms: row_u64(row, "lock_deadline_ms")
1805 .unwrap_or(crate::storage::query::DEFAULT_QUEUE_LOCK_DEADLINE_MS),
1806 in_flight_cap_per_group: row_u64(row, "in_flight_cap_per_group")
1807 .map(|value| value as u32)
1808 .unwrap_or(crate::storage::query::DEFAULT_QUEUE_IN_FLIGHT_CAP_PER_GROUP),
1809 retry_delay_ms: row_u64(row, "retry_delay_ms").filter(|v| *v > 0),
1810 })
1811 })
1812 .unwrap_or(default)
1813}
1814
1815pub(super) fn queue_mode_str(store: &UnifiedStore, queue: &str) -> &'static str {
1816 load_queue_config(store, queue).mode.as_str()
1817}
1818
1819fn save_queue_config(
1820 store: &UnifiedStore,
1821 queue: &str,
1822 config: &QueueRuntimeConfig,
1823) -> RedDBResult<()> {
1824 remove_meta_rows(store, |row| {
1825 row_text(row, "kind").as_deref() == Some("queue_config")
1826 && row_text(row, "queue").as_deref() == Some(queue)
1827 });
1828
1829 let mut fields = HashMap::new();
1830 fields.insert("kind".to_string(), Value::text("queue_config".to_string()));
1831 fields.insert("queue".to_string(), Value::text(queue.to_string()));
1832 fields.insert(
1833 "mode".to_string(),
1834 Value::text(config.mode.as_str().to_string()),
1835 );
1836 fields.insert("priority".to_string(), Value::Boolean(config.priority));
1837 fields.insert(
1838 "max_size".to_string(),
1839 config
1840 .max_size
1841 .map(|value| Value::UnsignedInteger(value as u64))
1842 .unwrap_or(Value::Null),
1843 );
1844 fields.insert(
1845 "ttl_ms".to_string(),
1846 config
1847 .ttl_ms
1848 .map(Value::UnsignedInteger)
1849 .unwrap_or(Value::Null),
1850 );
1851 fields.insert(
1852 "dlq".to_string(),
1853 config.dlq.clone().map(Value::text).unwrap_or(Value::Null),
1854 );
1855 fields.insert(
1856 "max_attempts".to_string(),
1857 Value::UnsignedInteger(u64::from(config.max_attempts)),
1858 );
1859 fields.insert(
1860 "lock_deadline_ms".to_string(),
1861 Value::UnsignedInteger(config.lock_deadline_ms),
1862 );
1863 fields.insert(
1864 "in_flight_cap_per_group".to_string(),
1865 Value::UnsignedInteger(u64::from(config.in_flight_cap_per_group)),
1866 );
1867 fields.insert(
1868 "retry_delay_ms".to_string(),
1869 config
1870 .retry_delay_ms
1871 .map(Value::UnsignedInteger)
1872 .unwrap_or(Value::Null),
1873 );
1874 insert_meta_row(store, fields)
1875}
1876
1877fn remove_queue_metadata(store: &UnifiedStore, queue: &str) {
1878 remove_meta_rows(store, |row| {
1879 row_text(row, "queue").as_deref() == Some(queue)
1880 });
1881}
1882
1883fn queue_group_exists(store: &UnifiedStore, queue: &str, group: &str) -> RedDBResult<bool> {
1884 Ok(load_queue_groups(store, queue)?
1885 .into_iter()
1886 .any(|entry| entry.group == group))
1887}
1888
1889pub(super) fn require_queue_group(
1890 store: &UnifiedStore,
1891 queue: &str,
1892 group: &str,
1893) -> RedDBResult<()> {
1894 if queue_group_exists(store, queue, group)? {
1895 Ok(())
1896 } else {
1897 Err(RedDBError::NotFound(format!(
1898 "consumer group '{}' not found on queue '{}'",
1899 group, queue
1900 )))
1901 }
1902}
1903
1904pub(super) fn resolve_read_group(
1905 store: &UnifiedStore,
1906 queue: &str,
1907 group: Option<&str>,
1908 consumer: &str,
1909 config: &QueueRuntimeConfig,
1910) -> RedDBResult<String> {
1911 if let Some(group) = group {
1912 require_queue_group(store, queue, group)?;
1913 return Ok(group.to_string());
1914 }
1915
1916 match config.mode {
1917 QueueMode::Work => {
1918 if !queue_group_exists(store, queue, WORK_DEFAULT_GROUP)? {
1919 save_queue_group(store, queue, WORK_DEFAULT_GROUP)?;
1920 }
1921 Ok(WORK_DEFAULT_GROUP.to_string())
1922 }
1923 QueueMode::Fanout => {
1924 let fanout_group = format!("{FANOUT_GROUP_PREFIX}{consumer}");
1925 if !queue_group_exists(store, queue, &fanout_group)? {
1926 save_queue_group(store, queue, &fanout_group)?;
1927 }
1928 Ok(fanout_group)
1929 }
1930 }
1931}
1932
1933fn load_queue_groups(store: &UnifiedStore, queue: &str) -> RedDBResult<Vec<QueueGroupEntry>> {
1934 let Some(manager) = store.get_collection(QUEUE_META_COLLECTION) else {
1935 return Ok(Vec::new());
1936 };
1937 Ok(manager
1938 .query_all(|entity| {
1939 entity.data.as_row().is_some_and(|row| {
1940 row_text(row, "kind").as_deref() == Some("queue_group")
1941 && row_text(row, "queue").as_deref() == Some(queue)
1942 })
1943 })
1944 .into_iter()
1945 .filter_map(|entity| {
1946 let row = entity.data.as_row()?;
1947 Some(QueueGroupEntry {
1948 entity_id: entity.id,
1949 group: row_text(row, "group")?,
1950 })
1951 })
1952 .collect())
1953}
1954
1955fn save_queue_group(store: &UnifiedStore, queue: &str, group: &str) -> RedDBResult<()> {
1956 let mut fields = HashMap::new();
1957 fields.insert("kind".to_string(), Value::text("queue_group".to_string()));
1958 fields.insert("queue".to_string(), Value::text(queue.to_string()));
1959 fields.insert("group".to_string(), Value::text(group.to_string()));
1960 fields.insert(
1961 "created_at_ns".to_string(),
1962 Value::UnsignedInteger(now_ns()),
1963 );
1964 insert_meta_row(store, fields)
1965}
1966
1967pub(super) fn load_pending_entries(
1968 store: &UnifiedStore,
1969 queue: &str,
1970 group: Option<&str>,
1971 message_id: Option<EntityId>,
1972) -> RedDBResult<Vec<QueuePendingEntry>> {
1973 let Some(manager) = store.get_collection(QUEUE_META_COLLECTION) else {
1974 return Ok(Vec::new());
1975 };
1976 let lock_deadline_ns = load_queue_config(store, queue)
1977 .lock_deadline_ms
1978 .saturating_mul(1_000_000);
1979 let attempts_by_key: HashMap<(String, String, u64), u64> = manager
1980 .query_all(|entity| {
1981 entity.data.as_row().is_some_and(|row| {
1982 row_text(row, "kind").as_deref() == Some("queue_attempts_lc")
1983 && row_text(row, "queue").as_deref() == Some(queue)
1984 })
1985 })
1986 .into_iter()
1987 .filter_map(|entity| {
1988 let row = entity.data.as_row()?;
1989 Some((
1990 (
1991 row_text(row, "queue")?,
1992 row_text(row, "group")?,
1993 row_u64(row, "message_id")?,
1994 ),
1995 row_u64(row, "attempts").unwrap_or(1),
1996 ))
1997 })
1998 .collect();
1999 Ok(manager
2000 .query_all(|entity| {
2001 entity.data.as_row().is_some_and(|row| {
2002 matches!(
2003 row_text(row, "kind").as_deref(),
2004 Some("queue_pending") | Some("queue_pending_lc")
2005 ) && row_text(row, "queue").as_deref() == Some(queue)
2006 && group
2007 .map(|group_name| row_text(row, "group").as_deref() == Some(group_name))
2008 .unwrap_or(true)
2009 && message_id
2010 .map(|candidate| row_u64(row, "message_id") == Some(candidate.raw()))
2011 .unwrap_or(true)
2012 })
2013 })
2014 .into_iter()
2015 .filter_map(|entity| {
2016 let row = entity.data.as_row()?;
2017 let group = row_text(row, "group")?;
2018 let message_id = row_u64(row, "message_id")?;
2019 let kind = row_text(row, "kind")?;
2020 let delivered_at_ns = if kind == "queue_pending_lc" {
2021 row_u64(row, "lock_deadline_ns")
2022 .unwrap_or(0)
2023 .saturating_sub(lock_deadline_ns)
2024 } else {
2025 row_u64(row, "delivered_at_ns")?
2026 };
2027 let delivery_count = if kind == "queue_pending_lc" {
2028 attempts_by_key
2029 .get(&(queue.to_string(), group.clone(), message_id))
2030 .copied()
2031 .unwrap_or(1)
2032 } else {
2033 row_u64(row, "delivery_count").unwrap_or(1)
2034 };
2035 Some(QueuePendingEntry {
2036 entity_id: entity.id,
2037 group,
2038 message_id: EntityId::new(message_id),
2039 consumer: row_text(row, "consumer").unwrap_or_default(),
2040 delivered_at_ns,
2041 delivery_count: delivery_count as u32,
2042 })
2043 })
2044 .collect())
2045}
2046
2047pub(super) fn save_queue_pending(
2048 store: &UnifiedStore,
2049 queue: &str,
2050 group: &str,
2051 message_id: EntityId,
2052 consumer: &str,
2053 delivered_at_ns: u64,
2054 delivery_count: u32,
2055) -> RedDBResult<()> {
2056 remove_meta_rows(store, |row| {
2057 row_text(row, "kind").as_deref() == Some("queue_pending")
2058 && row_text(row, "queue").as_deref() == Some(queue)
2059 && row_text(row, "group").as_deref() == Some(group)
2060 && row_u64(row, "message_id") == Some(message_id.raw())
2061 });
2062
2063 let mut fields = HashMap::new();
2064 fields.insert("kind".to_string(), Value::text("queue_pending".to_string()));
2065 fields.insert("queue".to_string(), Value::text(queue.to_string()));
2066 fields.insert("group".to_string(), Value::text(group.to_string()));
2067 fields.insert(
2068 "message_id".to_string(),
2069 Value::UnsignedInteger(message_id.raw()),
2070 );
2071 fields.insert("consumer".to_string(), Value::text(consumer.to_string()));
2072 fields.insert(
2073 "delivered_at_ns".to_string(),
2074 Value::UnsignedInteger(delivered_at_ns),
2075 );
2076 fields.insert(
2077 "delivery_count".to_string(),
2078 Value::UnsignedInteger(u64::from(delivery_count)),
2079 );
2080 insert_meta_row(store, fields)
2081}
2082
2083pub(super) fn require_pending_entry(
2084 store: &UnifiedStore,
2085 queue: &str,
2086 group: &str,
2087 message_id: EntityId,
2088) -> RedDBResult<QueuePendingEntry> {
2089 load_pending_entries(store, queue, Some(group), Some(message_id))?
2090 .into_iter()
2091 .next()
2092 .ok_or_else(|| {
2093 RedDBError::NotFound(format!(
2094 "message '{}' is not pending in group '{}' on queue '{}'",
2095 message_id.raw(),
2096 group,
2097 queue
2098 ))
2099 })
2100}
2101
2102pub(super) fn load_ack_entries(
2103 store: &UnifiedStore,
2104 queue: &str,
2105 group: Option<&str>,
2106 message_id: Option<EntityId>,
2107) -> RedDBResult<Vec<QueueAckEntry>> {
2108 let Some(manager) = store.get_collection(QUEUE_META_COLLECTION) else {
2109 return Ok(Vec::new());
2110 };
2111 Ok(manager
2112 .query_all(|entity| {
2113 entity.data.as_row().is_some_and(|row| {
2114 row_text(row, "kind").as_deref() == Some("queue_ack")
2115 && row_text(row, "queue").as_deref() == Some(queue)
2116 && group
2117 .map(|group_name| row_text(row, "group").as_deref() == Some(group_name))
2118 .unwrap_or(true)
2119 && message_id
2120 .map(|candidate| row_u64(row, "message_id") == Some(candidate.raw()))
2121 .unwrap_or(true)
2122 })
2123 })
2124 .into_iter()
2125 .filter_map(|entity| {
2126 let row = entity.data.as_row()?;
2127 Some(QueueAckEntry {
2128 entity_id: entity.id,
2129 group: row_text(row, "group")?,
2130 message_id: EntityId::new(row_u64(row, "message_id")?),
2131 })
2132 })
2133 .collect())
2134}
2135
2136pub(super) fn save_queue_ack(
2137 store: &UnifiedStore,
2138 queue: &str,
2139 group: &str,
2140 message_id: EntityId,
2141) -> RedDBResult<()> {
2142 let existing = load_ack_entries(store, queue, Some(group), Some(message_id))?;
2143 if !existing.is_empty() {
2144 return Ok(());
2145 }
2146
2147 let mut fields = HashMap::new();
2148 fields.insert("kind".to_string(), Value::text("queue_ack".to_string()));
2149 fields.insert("queue".to_string(), Value::text(queue.to_string()));
2150 fields.insert("group".to_string(), Value::text(group.to_string()));
2151 fields.insert(
2152 "message_id".to_string(),
2153 Value::UnsignedInteger(message_id.raw()),
2154 );
2155 fields.insert("acked_at_ns".to_string(), Value::UnsignedInteger(now_ns()));
2156 insert_meta_row(store, fields)
2157}
2158
2159pub(super) fn queue_message_completed_for_all_groups(
2160 store: &UnifiedStore,
2161 queue: &str,
2162 message_id: EntityId,
2163) -> RedDBResult<bool> {
2164 let groups = load_queue_groups(store, queue)?;
2165 let pending = load_pending_entries(store, queue, None, Some(message_id))?;
2166 if !pending.is_empty() {
2167 return Ok(false);
2168 }
2169 if groups.is_empty() {
2170 return Ok(true);
2171 }
2172
2173 let acked_groups = load_ack_entries(store, queue, None, Some(message_id))?
2174 .into_iter()
2175 .map(|entry| entry.group)
2176 .collect::<HashSet<_>>();
2177 Ok(groups
2178 .into_iter()
2179 .all(|group| acked_groups.contains(&group.group)))
2180}
2181
2182fn load_queue_message_views(
2183 store: &UnifiedStore,
2184 queue: &str,
2185) -> RedDBResult<Vec<QueueMessageView>> {
2186 load_queue_message_views_with_runtime(None, store, queue)
2187}
2188
2189pub(super) fn load_queue_message_views_with_runtime(
2196 runtime: Option<&RedDBRuntime>,
2197 store: &UnifiedStore,
2198 queue: &str,
2199) -> RedDBResult<Vec<QueueMessageView>> {
2200 let manager = store
2201 .get_collection(queue)
2202 .ok_or_else(|| RedDBError::NotFound(format!("queue '{}' not found", queue)))?;
2203 let snap_ctx = crate::runtime::impl_core::capture_current_snapshot();
2207 let rls_filter = runtime.and_then(|rt| {
2208 crate::runtime::impl_core::rls_policy_filter_for_kind(
2209 rt,
2210 queue,
2211 crate::storage::query::ast::PolicyAction::Select,
2212 crate::storage::query::ast::PolicyTargetKind::Messages,
2213 )
2214 });
2215 let rls_enabled_but_denied = runtime.map(|rt| rt.is_rls_enabled(queue)).unwrap_or(false)
2216 && rls_filter.is_none()
2217 && runtime.is_some();
2218 if rls_enabled_but_denied {
2219 return Ok(Vec::new());
2221 }
2222 let filter_arc = rls_filter.map(std::sync::Arc::new);
2223 let rt_arc = runtime;
2224 Ok(manager
2225 .query_all(move |entity| {
2226 if !matches!(entity.kind, EntityKind::QueueMessage { .. }) {
2227 return false;
2228 }
2229 if !crate::runtime::impl_core::entity_visible_with_context(snap_ctx.as_ref(), entity) {
2230 return false;
2231 }
2232 if let (Some(filter), Some(rt)) = (filter_arc.as_ref(), rt_arc) {
2233 return crate::runtime::query_exec::evaluate_entity_filter_with_db(
2234 Some(&rt.inner.db),
2235 entity,
2236 filter,
2237 queue,
2238 queue,
2239 );
2240 }
2241 true
2242 })
2243 .into_iter()
2244 .filter_map(queue_message_view_from_entity)
2245 .map(|mut view| {
2246 view.available_at_ns = read_message_available_at_ns(store, queue, view.id);
2247 view
2248 })
2249 .collect())
2250}
2251
2252fn queue_message_view_from_entity(entity: UnifiedEntity) -> Option<QueueMessageView> {
2253 let (position, _) = match &entity.kind {
2254 EntityKind::QueueMessage { position, queue } => (*position, queue),
2255 _ => return None,
2256 };
2257 let data = match entity.data {
2258 EntityData::QueueMessage(data) => data,
2259 _ => return None,
2260 };
2261 Some(QueueMessageView {
2262 id: entity.id,
2263 position,
2264 priority: data.priority.unwrap_or(0),
2265 payload: data.payload,
2266 attempts: data.attempts,
2267 max_attempts: data.max_attempts,
2268 enqueued_at_ns: data.enqueued_at_ns,
2269 available_at_ns: None,
2270 })
2271}
2272
2273pub(super) fn insert_moved_queue_message_payload(
2280 store: &UnifiedStore,
2281 queue: &str,
2282 payload: &Value,
2283) -> RedDBResult<EntityId> {
2284 let config = load_queue_config(store, queue);
2285 let position = next_queue_position(store, queue, QueueSide::Right)?;
2286 let enqueued_at_ns = std::time::SystemTime::now()
2287 .duration_since(std::time::UNIX_EPOCH)
2288 .map(|d| d.as_nanos() as u64)
2289 .unwrap_or(0);
2290 let entity = UnifiedEntity::new(
2291 EntityId::new(0),
2292 EntityKind::QueueMessage {
2293 queue: queue.to_string(),
2294 position,
2295 },
2296 EntityData::QueueMessage(QueueMessageData {
2297 payload: payload.clone(),
2298 priority: None,
2299 enqueued_at_ns,
2300 attempts: 0,
2301 max_attempts: config.max_attempts,
2302 acked: false,
2303 }),
2304 );
2305 let id = store
2306 .insert_auto(queue, entity)
2307 .map_err(|err| RedDBError::Internal(err.to_string()))?;
2308 if let Some(ttl_ms) = config.ttl_ms {
2309 store
2310 .set_metadata(queue, id, queue_message_ttl_metadata(ttl_ms))
2311 .map_err(|err| RedDBError::Internal(err.to_string()))?;
2312 }
2313 Ok(id)
2314}
2315
2316fn insert_moved_queue_message(
2317 store: &UnifiedStore,
2318 queue: &str,
2319 config: &QueueRuntimeConfig,
2320 message: &QueueMessageView,
2321) -> RedDBResult<EntityId> {
2322 let position = next_queue_position(store, queue, QueueSide::Right)?;
2323 let entity = UnifiedEntity::new(
2324 EntityId::new(0),
2325 EntityKind::QueueMessage {
2326 queue: queue.to_string(),
2327 position,
2328 },
2329 EntityData::QueueMessage(QueueMessageData {
2330 payload: message.payload.clone(),
2331 priority: if config.priority {
2332 Some(message.priority)
2333 } else {
2334 None
2335 },
2336 enqueued_at_ns: message.enqueued_at_ns,
2337 attempts: message.attempts,
2338 max_attempts: message.max_attempts,
2339 acked: false,
2340 }),
2341 );
2342 let id = store
2343 .insert_auto(queue, entity)
2344 .map_err(|err| RedDBError::Internal(err.to_string()))?;
2345 if let Some(ttl_ms) = config.ttl_ms {
2346 store
2347 .set_metadata(queue, id, queue_message_ttl_metadata(ttl_ms))
2348 .map_err(|err| RedDBError::Internal(err.to_string()))?;
2349 }
2350 Ok(id)
2351}
2352
/// Returns the column names projected by default, in output order.
fn queue_projection_default_columns() -> Vec<String> {
    const DEFAULT_COLUMNS: [&str; 9] = [
        "id",
        "payload",
        "priority",
        "attempts",
        "last_error",
        "enqueued_at",
        "available_at",
        "dlq",
        "tenant",
    ];
    DEFAULT_COLUMNS
        .iter()
        .map(|name| (*name).to_owned())
        .collect()
}
2369
2370fn queue_projection_record(
2371 columns: &[String],
2372 message: &QueueMessageView,
2373 dlq: bool,
2374) -> RedDBResult<UnifiedRecord> {
2375 let mut record = UnifiedRecord::new();
2376 for column in columns {
2377 let value = queue_projection_value(message, dlq, column).ok_or_else(|| {
2378 RedDBError::Query(format!("unknown queue projection column '{}'", column))
2379 })?;
2380 record.set(column, value);
2381 }
2382 Ok(record)
2383}
2384
2385fn queue_projection_value(message: &QueueMessageView, dlq: bool, column: &str) -> Option<Value> {
2386 match column {
2387 "id" => Some(Value::text(message_id_string(message.id))),
2388 "payload" => Some(message.payload.clone()),
2389 "priority" => Some(Value::Integer(i64::from(message.priority))),
2390 "attempts" => Some(Value::UnsignedInteger(u64::from(message.attempts))),
2391 "last_error" => Some(Value::Null),
2392 "enqueued_at" => Some(Value::UnsignedInteger(message.enqueued_at_ns)),
2393 "available_at" => Some(Value::UnsignedInteger(
2394 message.available_at_ns.unwrap_or(message.enqueued_at_ns),
2395 )),
2396 "dlq" => Some(Value::Boolean(dlq)),
2397 "tenant" => queue_message_tenant(&message.payload).or(Some(Value::Null)),
2398 _ => None,
2399 }
2400}
2401
2402fn queue_message_tenant(payload: &Value) -> Option<Value> {
2403 let Value::Json(bytes) = payload else {
2404 return None;
2405 };
2406 let json: crate::json::Value = crate::json::from_slice(bytes).ok()?;
2407 json.get("tenant")
2408 .and_then(crate::json::Value::as_str)
2409 .map(|tenant| Value::text(tenant.to_string()))
2410}
2411
2412fn queue_is_dead_letter_target(store: &UnifiedStore, queue: &str) -> bool {
2413 let Some(manager) = store.get_collection(QUEUE_META_COLLECTION) else {
2414 return false;
2415 };
2416 !manager
2417 .query_all(|entity| {
2418 entity.data.as_row().is_some_and(|row| {
2419 row_text(row, "kind").as_deref() == Some("queue_config")
2420 && row_text(row, "dlq").as_deref() == Some(queue)
2421 })
2422 })
2423 .is_empty()
2424}
2425
2426fn queue_message_matches_filter(message: &QueueMessageView, dlq: bool, filter: &Filter) -> bool {
2427 match filter {
2428 Filter::Compare { field, op, value } => queue_filter_field_value(message, dlq, field)
2429 .is_some_and(|candidate| queue_compare_values(&candidate, value, *op)),
2430 Filter::CompareFields { left, op, right } => {
2431 match (
2432 queue_filter_field_value(message, dlq, left),
2433 queue_filter_field_value(message, dlq, right),
2434 ) {
2435 (Some(left), Some(right)) => queue_compare_values(&left, &right, *op),
2436 _ => false,
2437 }
2438 }
2439 Filter::And(left, right) => {
2440 queue_message_matches_filter(message, dlq, left)
2441 && queue_message_matches_filter(message, dlq, right)
2442 }
2443 Filter::Or(left, right) => {
2444 queue_message_matches_filter(message, dlq, left)
2445 || queue_message_matches_filter(message, dlq, right)
2446 }
2447 Filter::Not(inner) => !queue_message_matches_filter(message, dlq, inner),
2448 Filter::IsNull(field) => queue_filter_field_value(message, dlq, field)
2449 .is_none_or(|value| matches!(value, Value::Null)),
2450 Filter::IsNotNull(field) => queue_filter_field_value(message, dlq, field)
2451 .is_some_and(|value| !matches!(value, Value::Null)),
2452 Filter::In { field, values } => {
2453 queue_filter_field_value(message, dlq, field).is_some_and(|candidate| {
2454 values
2455 .iter()
2456 .any(|value| queue_values_equal(&candidate, value))
2457 })
2458 }
2459 Filter::Between { field, low, high } => queue_filter_field_value(message, dlq, field)
2460 .is_some_and(|candidate| {
2461 queue_compare_values(&candidate, low, CompareOp::Ge)
2462 && queue_compare_values(&candidate, high, CompareOp::Le)
2463 }),
2464 Filter::Like { field, pattern } => queue_filter_text(message, dlq, field)
2465 .is_some_and(|value| queue_like_matches(&value, pattern)),
2466 Filter::StartsWith { field, prefix } => {
2467 queue_filter_text(message, dlq, field).is_some_and(|value| value.starts_with(prefix))
2468 }
2469 Filter::EndsWith { field, suffix } => {
2470 queue_filter_text(message, dlq, field).is_some_and(|value| value.ends_with(suffix))
2471 }
2472 Filter::Contains { field, substring } => {
2473 queue_filter_text(message, dlq, field).is_some_and(|value| value.contains(substring))
2474 }
2475 Filter::CompareExpr { .. } => false,
2476 }
2477}
2478
2479fn queue_filter_field_value(
2480 message: &QueueMessageView,
2481 dlq: bool,
2482 field: &FieldRef,
2483) -> Option<Value> {
2484 match field {
2485 FieldRef::TableColumn { table, column } if table.is_empty() => {
2486 queue_projection_value(message, dlq, column)
2487 .or_else(|| queue_payload_field_value(&message.payload, column))
2488 }
2489 FieldRef::TableColumn { column, .. } => queue_projection_value(message, dlq, column)
2490 .or_else(|| queue_payload_field_value(&message.payload, column)),
2491 _ => None,
2492 }
2493}
2494
2495fn queue_payload_field_value(payload: &Value, field: &str) -> Option<Value> {
2496 let Value::Json(bytes) = payload else {
2497 return None;
2498 };
2499 let json: crate::json::Value = crate::json::from_slice(bytes).ok()?;
2500 let value = json.get(field)?;
2501 json_value_to_schema_value(value)
2502}
2503
2504fn json_value_to_schema_value(value: &crate::json::Value) -> Option<Value> {
2505 if matches!(value, crate::json::Value::Null) {
2506 Some(Value::Null)
2507 } else if let Some(value) = value.as_bool() {
2508 Some(Value::Boolean(value))
2509 } else if let Some(value) = value.as_i64() {
2510 Some(Value::Integer(value))
2511 } else if let Some(value) = value.as_u64() {
2512 Some(Value::UnsignedInteger(value))
2513 } else if let Some(value) = value.as_f64() {
2514 Some(Value::Float(value))
2515 } else if let Some(value) = value.as_str() {
2516 Some(Value::text(value.to_string()))
2517 } else {
2518 Some(Value::Json(value.to_string_compact().into_bytes()))
2519 }
2520}
2521
2522fn queue_filter_text(message: &QueueMessageView, dlq: bool, field: &FieldRef) -> Option<String> {
2523 queue_filter_field_value(message, dlq, field).and_then(|value| match value {
2524 Value::Text(value) => Some(value.to_string()),
2525 Value::NodeRef(value) | Value::EdgeRef(value) | Value::TableRef(value) => Some(value),
2526 Value::Integer(value) => Some(value.to_string()),
2527 Value::UnsignedInteger(value) => Some(value.to_string()),
2528 Value::Float(value) => Some(value.to_string()),
2529 Value::Boolean(value) => Some(value.to_string()),
2530 _ => None,
2531 })
2532}
2533
2534fn queue_compare_values(left: &Value, right: &Value, op: CompareOp) -> bool {
2535 match op {
2536 CompareOp::Eq => queue_values_equal(left, right),
2537 CompareOp::Ne => !queue_values_equal(left, right),
2538 CompareOp::Lt => queue_partial_cmp(left, right).is_some_and(|ord| ord.is_lt()),
2539 CompareOp::Le => queue_partial_cmp(left, right).is_some_and(|ord| !ord.is_gt()),
2540 CompareOp::Gt => queue_partial_cmp(left, right).is_some_and(|ord| ord.is_gt()),
2541 CompareOp::Ge => queue_partial_cmp(left, right).is_some_and(|ord| !ord.is_lt()),
2542 }
2543}
2544
2545fn queue_values_equal(left: &Value, right: &Value) -> bool {
2546 if let (Some(left), Some(right)) = (queue_value_number(left), queue_value_number(right)) {
2547 return (left - right).abs() < f64::EPSILON;
2548 }
2549 match (left, right) {
2550 (Value::Text(left), Value::Text(right)) => left == right,
2551 (Value::Boolean(left), Value::Boolean(right)) => left == right,
2552 _ => left == right,
2553 }
2554}
2555
2556fn queue_partial_cmp(left: &Value, right: &Value) -> Option<std::cmp::Ordering> {
2557 if let (Some(left), Some(right)) = (queue_value_number(left), queue_value_number(right)) {
2558 return left.partial_cmp(&right);
2559 }
2560 match (left, right) {
2561 (Value::Text(left), Value::Text(right)) => Some(left.cmp(right)),
2562 _ => None,
2563 }
2564}
2565
2566fn queue_value_number(value: &Value) -> Option<f64> {
2567 match value {
2568 Value::Integer(value) => Some(*value as f64),
2569 Value::UnsignedInteger(value) => Some(*value as f64),
2570 Value::Float(value) => Some(*value),
2571 Value::Text(value) => value.parse().ok(),
2572 _ => None,
2573 }
2574}
2575
/// Matches `value` against a LIKE-style `pattern`.
///
/// `%` matches any run of characters (including none) and may appear
/// anywhere in the pattern, any number of times. Every other character,
/// including `_`, is matched literally and case-sensitively. A pattern
/// without `%` must equal `value` exactly.
fn queue_like_matches(value: &str, pattern: &str) -> bool {
    // Splitting on `%` yields the literal segments that must appear in
    // order: the first anchored at the start, the last anchored at the end,
    // the ones in between anywhere after the previous match.
    let mut segments = pattern.split('%');
    let leading = segments.next().unwrap_or("");
    let Some(mut rest) = value.strip_prefix(leading) else {
        return false;
    };

    let mut remaining: Vec<&str> = segments.collect();
    let Some(trailing) = remaining.pop() else {
        // No `%` at all: the whole value must have been consumed.
        return rest.is_empty();
    };

    for segment in remaining {
        // Consecutive `%` produce empty segments, which match trivially.
        if segment.is_empty() {
            continue;
        }
        // Taking the leftmost occurrence leaves the longest possible tail
        // for the segments that follow.
        match rest.find(segment) {
            Some(index) => rest = &rest[index + segment.len()..],
            None => return false,
        }
    }
    rest.ends_with(trailing)
}
2590
2591pub(super) fn queue_message_view_by_id(
2592 store: &UnifiedStore,
2593 queue: &str,
2594 message_id: EntityId,
2595) -> RedDBResult<Option<QueueMessageView>> {
2596 let manager = queue_manager(store, queue)?;
2597 Ok(manager
2598 .get(message_id)
2599 .and_then(queue_message_view_from_entity)
2600 .map(|mut view| {
2601 view.available_at_ns = read_message_available_at_ns(store, queue, view.id);
2602 view
2603 }))
2604}
2605
2606pub(super) fn sort_queue_messages(
2607 messages: &mut [QueueMessageView],
2608 config: &QueueRuntimeConfig,
2609 side: QueueSide,
2610) {
2611 messages.sort_by(|left, right| {
2612 if config.priority {
2613 right
2614 .priority
2615 .cmp(&left.priority)
2616 .then_with(|| match side {
2617 QueueSide::Left => left.position.cmp(&right.position),
2618 QueueSide::Right => right.position.cmp(&left.position),
2619 })
2620 .then_with(|| left.id.raw().cmp(&right.id.raw()))
2621 } else {
2622 match side {
2623 QueueSide::Left => left.position.cmp(&right.position),
2624 QueueSide::Right => right.position.cmp(&left.position),
2625 }
2626 .then_with(|| left.id.raw().cmp(&right.id.raw()))
2627 }
2628 });
2629}
2630
2631pub(super) fn next_queue_position(
2632 store: &UnifiedStore,
2633 queue: &str,
2634 side: QueueSide,
2635) -> RedDBResult<u64> {
2636 let messages = load_queue_message_views(store, queue)?;
2637 if messages.is_empty() {
2638 return Ok(QUEUE_POSITION_CENTER);
2639 }
2640 match side {
2641 QueueSide::Left => Ok(messages
2642 .iter()
2643 .map(|message| message.position)
2644 .min()
2645 .unwrap_or(QUEUE_POSITION_CENTER)
2646 .saturating_sub(1)),
2647 QueueSide::Right => Ok(messages
2648 .iter()
2649 .map(|message| message.position)
2650 .max()
2651 .unwrap_or(QUEUE_POSITION_CENTER)
2652 .saturating_add(1)),
2653 }
2654}
2655
2656pub(super) fn increment_queue_attempts(
2657 store: &UnifiedStore,
2658 queue: &str,
2659 message_id: EntityId,
2660) -> RedDBResult<u32> {
2661 let manager = queue_manager(store, queue)?;
2662 let mut entity = manager
2663 .get(message_id)
2664 .ok_or_else(|| RedDBError::NotFound(format!("message '{}' not found", message_id.raw())))?;
2665 match &mut entity.data {
2666 EntityData::QueueMessage(message) => {
2667 message.attempts = message.attempts.saturating_add(1);
2668 let attempts = message.attempts;
2669 manager
2670 .update(entity)
2671 .map_err(|err| RedDBError::Internal(err.to_string()))?;
2672 Ok(attempts)
2673 }
2674 _ => Err(RedDBError::Query(format!(
2675 "entity '{}' is not a queue message",
2676 message_id.raw()
2677 ))),
2678 }
2679}
2680
2681pub(super) fn queue_message_attempts(
2682 store: &UnifiedStore,
2683 queue: &str,
2684 message_id: EntityId,
2685) -> RedDBResult<u32> {
2686 Ok(queue_message_data(store, queue, message_id)?.attempts)
2687}
2688
2689pub(super) fn queue_message_max_attempts(
2690 store: &UnifiedStore,
2691 queue: &str,
2692 message_id: EntityId,
2693) -> RedDBResult<u32> {
2694 Ok(queue_message_data(store, queue, message_id)?.max_attempts)
2695}
2696
2697pub(super) fn queue_message_payload(
2698 store: &UnifiedStore,
2699 queue: &str,
2700 message_id: EntityId,
2701) -> RedDBResult<Value> {
2702 Ok(queue_message_data(store, queue, message_id)?.payload)
2703}
2704
2705pub(super) fn queue_message_pending_any(
2706 store: &UnifiedStore,
2707 queue: &str,
2708 message_id: EntityId,
2709) -> RedDBResult<bool> {
2710 Ok(!load_pending_entries(store, queue, None, Some(message_id))?.is_empty())
2711}
2712
2713pub(super) fn queue_message_pending_for_group(
2714 store: &UnifiedStore,
2715 queue: &str,
2716 group: &str,
2717 message_id: EntityId,
2718) -> RedDBResult<bool> {
2719 Ok(!load_pending_entries(store, queue, Some(group), Some(message_id))?.is_empty())
2720}
2721
2722pub(super) fn queue_message_acked_for_group(
2723 store: &UnifiedStore,
2724 queue: &str,
2725 group: &str,
2726 message_id: EntityId,
2727) -> RedDBResult<bool> {
2728 Ok(!load_ack_entries(store, queue, Some(group), Some(message_id))?.is_empty())
2729}
2730
2731fn queue_manager(
2732 store: &UnifiedStore,
2733 queue: &str,
2734) -> RedDBResult<Arc<crate::storage::unified::SegmentManager>> {
2735 store
2736 .get_collection(queue)
2737 .ok_or_else(|| RedDBError::NotFound(format!("queue '{}' not found", queue)))
2738}
2739
2740pub(super) fn queue_message_data(
2741 store: &UnifiedStore,
2742 queue: &str,
2743 message_id: EntityId,
2744) -> RedDBResult<QueueMessageData> {
2745 let manager = queue_manager(store, queue)?;
2746 let entity = manager
2747 .get(message_id)
2748 .ok_or_else(|| RedDBError::NotFound(format!("message '{}' not found", message_id.raw())))?;
2749 match entity.data {
2750 EntityData::QueueMessage(message) => Ok(message),
2751 _ => Err(RedDBError::Query(format!(
2752 "entity '{}' is not a queue message",
2753 message_id.raw()
2754 ))),
2755 }
2756}
2757
2758fn insert_meta_row(store: &UnifiedStore, fields: HashMap<String, Value>) -> RedDBResult<()> {
2759 let _ = store.get_or_create_collection(QUEUE_META_COLLECTION);
2760 store
2761 .insert_auto(
2762 QUEUE_META_COLLECTION,
2763 UnifiedEntity::new(
2764 EntityId::new(0),
2765 EntityKind::TableRow {
2766 table: Arc::from(QUEUE_META_COLLECTION),
2767 row_id: 0,
2768 },
2769 EntityData::Row(RowData {
2770 columns: Vec::new(),
2771 named: Some(fields),
2772 schema: None,
2773 }),
2774 ),
2775 )
2776 .map_err(|err| RedDBError::Internal(err.to_string()))?;
2777 Ok(())
2778}
2779
2780pub(super) fn remove_meta_rows(store: &UnifiedStore, predicate: impl Fn(&RowData) -> bool + Sync) {
2781 let Some(manager) = store.get_collection(QUEUE_META_COLLECTION) else {
2782 return;
2783 };
2784 let rows = manager.query_all(|entity| entity.data.as_row().is_some_and(&predicate));
2785 for row in rows {
2786 let _ = store.delete(QUEUE_META_COLLECTION, row.id);
2787 }
2788}
2789
2790pub(super) fn delete_meta_entity(store: &UnifiedStore, entity_id: EntityId) {
2791 let _ = store.delete(QUEUE_META_COLLECTION, entity_id);
2792}
2793
2794fn queue_message_lock_key(queue: &str, message_id: EntityId) -> String {
2795 format!("{queue}:{}", message_id.raw())
2796}
2797
2798pub(super) fn queue_message_lock_handle(
2799 runtime: &RedDBRuntime,
2800 queue: &str,
2801 message_id: EntityId,
2802) -> Arc<parking_lot::Mutex<()>> {
2803 let key = queue_message_lock_key(queue, message_id);
2804 if let Some(lock) = runtime.inner.queue_message_locks.read().get(&key).cloned() {
2805 return lock;
2806 }
2807
2808 let mut locks = runtime.inner.queue_message_locks.write();
2809 locks
2810 .entry(key)
2811 .or_insert_with(|| Arc::new(parking_lot::Mutex::new(())))
2812 .clone()
2813}
2814
2815pub(super) fn forget_queue_message_lock(runtime: &RedDBRuntime, queue: &str, message_id: EntityId) {
2816 runtime
2817 .inner
2818 .queue_message_locks
2819 .write()
2820 .remove(&queue_message_lock_key(queue, message_id));
2821}
2822
2823fn parse_message_id(value: &str) -> RedDBResult<EntityId> {
2824 let raw = value.strip_prefix('e').unwrap_or(value);
2825 raw.parse::<u64>()
2826 .map(EntityId::new)
2827 .map_err(|_| RedDBError::Query(format!("invalid message id '{}'", value)))
2828}
2829
2830pub(super) fn resolve_ack_nack_handle(
2836 store: &UnifiedStore,
2837 queue: &str,
2838 group_hint: &str,
2839 message_id_hint: &str,
2840 delivery_id: Option<&str>,
2841) -> RedDBResult<(String, EntityId)> {
2842 if let Some(did) = delivery_id {
2843 return resolve_delivery_id(store, queue, did);
2844 }
2845 if group_hint.is_empty() || message_id_hint.is_empty() {
2846 return Err(RedDBError::Query(
2847 "ACK/NACK requires either GROUP <group> '<message_id>' or WITH delivery_id = '<id>'"
2848 .to_string(),
2849 ));
2850 }
2851 log_tuple_deprecation(queue);
2852 let entity = parse_message_id(message_id_hint)?;
2853 Ok((group_hint.to_string(), entity))
2854}
2855
2856fn resolve_delivery_id(
2857 store: &UnifiedStore,
2858 queue: &str,
2859 delivery_id: &str,
2860) -> RedDBResult<(String, EntityId)> {
2861 let Some(manager) = store.get_collection(QUEUE_META_COLLECTION) else {
2862 return Err(RedDBError::Query(format!(
2863 "delivery_id '{}' does not resolve to a live pending delivery",
2864 delivery_id
2865 )));
2866 };
2867 for entity in manager.query_all(|entity| {
2868 entity.data.as_row().is_some_and(|row| {
2869 row_text(row, "kind").as_deref() == Some("queue_pending_lc")
2870 && row_text(row, "delivery_id").as_deref() == Some(delivery_id)
2871 })
2872 }) {
2873 if let Some(row) = entity.data.as_row() {
2874 let row_queue = row_text(row, "queue").unwrap_or_default();
2875 let row_group = row_text(row, "group").unwrap_or_default();
2876 let row_message = row_u64(row, "message_id").unwrap_or(0);
2877 if row_queue != queue {
2878 return Err(RedDBError::Query(format!(
2879 "delivery_id '{}' belongs to queue '{}', not '{}'",
2880 delivery_id, row_queue, queue
2881 )));
2882 }
2883 return Ok((row_group, EntityId::new(row_message)));
2884 }
2885 }
2886 Err(RedDBError::Query(format!(
2887 "delivery_id '{}' does not resolve to a live pending delivery",
2888 delivery_id
2889 )))
2890}
2891
2892fn log_tuple_deprecation(queue: &str) {
2895 use std::sync::atomic::Ordering;
2896 use std::sync::{Mutex, OnceLock};
2897 use std::time::Instant;
2898
2899 static LAST_EMIT: OnceLock<Mutex<HashMap<(u64, String), Instant>>> = OnceLock::new();
2900 const COOLDOWN: std::time::Duration = std::time::Duration::from_secs(60);
2901
2902 let map = LAST_EMIT.get_or_init(|| Mutex::new(HashMap::new()));
2903 let key = (super::impl_core::current_connection_id(), queue.to_string());
2904 let now = Instant::now();
2905 let mut guard = match map.lock() {
2906 Ok(g) => g,
2907 Err(_) => return,
2908 };
2909 let should_emit =
2910 !matches!(guard.get(&key), Some(prev) if now.duration_since(*prev) < COOLDOWN);
2911 if should_emit {
2912 guard.insert(key.clone(), now);
2913 drop(guard);
2914 TUPLE_DEPRECATION_EMITS.fetch_add(1, Ordering::Relaxed);
2915 tracing::warn!(
2916 target: "reddb::queue_lifecycle",
2917 queue = queue,
2918 connection_id = key.0,
2919 "ACK/NACK by (queue, group, message_id) tuple is deprecated; \
2920 switch to the server-issued delivery_id (ADR 0026). \
2921 The tuple path will be removed one minor release after introduction.",
2922 );
2923 }
2924}
2925
/// Number of tuple-path deprecation warnings emitted so far by
/// `log_tuple_deprecation`.
pub static TUPLE_DEPRECATION_EMITS: AtomicU64 = AtomicU64::new(0);
2933
2934fn message_id_string(message_id: EntityId) -> String {
2935 message_id.raw().to_string()
2936}
2937
2938fn delivered_message_json(
2944 message: crate::runtime::queue_lifecycle::DeliveredMessage,
2945) -> crate::serde_json::Value {
2946 use crate::serde_json::{Map, Value as JsonValue};
2947 let mut obj = Map::new();
2948 obj.insert(
2949 "message_id".to_string(),
2950 JsonValue::String(message_id_string(EntityId::new(message.message_id))),
2951 );
2952 obj.insert(
2953 "payload".to_string(),
2954 crate::presentation::entity_json::storage_value_to_json(&message.payload),
2955 );
2956 obj.insert("consumer".to_string(), JsonValue::String(message.consumer));
2957 obj.insert(
2958 "delivery_count".to_string(),
2959 JsonValue::Number(message.delivery_count as f64),
2960 );
2961 JsonValue::Object(obj)
2962}
2963
2964pub(crate) fn pending_counts_by_group(
2969 store: &UnifiedStore,
2970) -> std::collections::BTreeMap<(String, String), u64> {
2971 let mut counts: std::collections::BTreeMap<(String, String), u64> =
2972 std::collections::BTreeMap::new();
2973 let Some(manager) = store.get_collection(QUEUE_META_COLLECTION) else {
2974 return counts;
2975 };
2976 for entity in manager.query_all(|entity| {
2977 entity
2978 .data
2979 .as_row()
2980 .is_some_and(|row| row_text(row, "kind").as_deref() == Some("queue_pending"))
2981 }) {
2982 if let Some(row) = entity.data.as_row() {
2983 let queue = row_text(row, "queue");
2984 let group = row_text(row, "group");
2985 if let (Some(q), Some(g)) = (queue, group) {
2986 *counts.entry((q, g)).or_insert(0) += 1;
2987 }
2988 }
2989 }
2990 counts
2991}
2992
2993pub(super) fn row_text(row: &RowData, field: &str) -> Option<String> {
2994 match row.get_field(field)?.clone() {
2995 Value::Text(value) => Some(value.to_string()),
2996 Value::NodeRef(value) => Some(value),
2997 Value::EdgeRef(value) => Some(value),
2998 Value::TableRef(value) => Some(value),
2999 _ => None,
3000 }
3001}
3002
3003pub(super) fn row_u64(row: &RowData, field: &str) -> Option<u64> {
3004 match row.get_field(field)?.clone() {
3005 Value::UnsignedInteger(value) => Some(value),
3006 Value::Integer(value) if value >= 0 => Some(value as u64),
3007 Value::Float(value) if value >= 0.0 => Some(value as u64),
3008 Value::Text(value) => value.parse().ok(),
3009 _ => None,
3010 }
3011}
3012
3013fn row_bool(row: &RowData, field: &str) -> Option<bool> {
3014 match row.get_field(field)?.clone() {
3015 Value::Boolean(value) => Some(value),
3016 Value::Text(value) => match value.to_ascii_lowercase().as_str() {
3017 "true" => Some(true),
3018 "false" => Some(false),
3019 _ => None,
3020 },
3021 _ => None,
3022 }
3023}
3024
3025fn queue_collection_contract(
3026 name: &str,
3027 priority: bool,
3028 ttl_ms: Option<u64>,
3029) -> crate::physical::CollectionContract {
3030 let now = current_unix_ms();
3031 let mut context_index_fields = Vec::new();
3032 if priority {
3033 context_index_fields.push("priority".to_string());
3034 }
3035
3036 crate::physical::CollectionContract {
3037 name: name.to_string(),
3038 declared_model: crate::catalog::CollectionModel::Queue,
3039 schema_mode: crate::catalog::SchemaMode::Dynamic,
3040 origin: crate::physical::ContractOrigin::Explicit,
3041 version: 1,
3042 created_at_unix_ms: now,
3043 updated_at_unix_ms: now,
3044 default_ttl_ms: ttl_ms,
3045 vector_dimension: None,
3046 vector_metric: None,
3047 context_index_fields,
3048 declared_columns: Vec::new(),
3049 table_def: None,
3050 timestamps_enabled: false,
3051 context_index_enabled: false,
3052 metrics_raw_retention_ms: None,
3053 metrics_rollup_policies: Vec::new(),
3054 metrics_tenant_identity: None,
3055 metrics_namespace: None,
3056 append_only: true,
3060 subscriptions: Vec::new(),
3061 analytics_config: Vec::new(),
3062 session_key: None,
3063 session_gap_ms: None,
3064 retention_duration_ms: None,
3065 analytical_storage: None,
3066
3067 ai_policy: None,
3068 }
3069}
3070
/// Milliseconds since the Unix epoch, or `0` when the system clock is set
/// before the epoch.
fn current_unix_ms() -> u128 {
    let now = std::time::SystemTime::now();
    match now.duration_since(std::time::UNIX_EPOCH) {
        Ok(elapsed) => elapsed.as_millis(),
        Err(_) => 0,
    }
}
3077
3078pub(super) fn now_ns() -> u64 {
3079 std::time::SystemTime::now()
3080 .duration_since(std::time::UNIX_EPOCH)
3081 .unwrap_or_default()
3082 .as_nanos() as u64
3083}
3084
3085pub(super) fn queue_message_ttl_metadata(ttl_ms: u64) -> Metadata {
3086 queue_message_metadata(Some(ttl_ms), None)
3087}
3088
3089pub(super) fn queue_message_metadata(
3095 ttl_ms: Option<u64>,
3096 available_at_ns: Option<u64>,
3097) -> Metadata {
3098 let mut fields = HashMap::new();
3099 if let Some(ttl_ms) = ttl_ms {
3100 fields.insert(
3101 "_ttl_ms".to_string(),
3102 if ttl_ms <= i64::MAX as u64 {
3103 MetadataValue::Int(ttl_ms as i64)
3104 } else {
3105 MetadataValue::Timestamp(ttl_ms)
3106 },
3107 );
3108 }
3109 if let Some(at_ns) = available_at_ns {
3110 fields.insert(
3111 "_available_at_ns".to_string(),
3112 MetadataValue::Timestamp(at_ns),
3113 );
3114 }
3115 Metadata::with_fields(fields)
3116}
3117
3118pub(super) fn earliest_future_available_at(store: &UnifiedStore, queue: &str) -> Option<u64> {
3126 let now_ns = now_ns();
3127 let views = load_queue_message_views_with_runtime(None, store, queue).ok()?;
3128 views
3129 .iter()
3130 .filter_map(|v| v.available_at_ns)
3131 .filter(|at| *at > now_ns)
3132 .min()
3133}
3134
3135pub(super) fn set_message_available_at_ns(
3146 store: &UnifiedStore,
3147 queue: &str,
3148 message_id: EntityId,
3149 available_at_ns: Option<u64>,
3150 fallback_ttl_ms: Option<u64>,
3151) -> RedDBResult<()> {
3152 let existing_ttl_ms = store
3153 .get_metadata(queue, message_id)
3154 .and_then(|md| match md.get("_ttl_ms")? {
3155 MetadataValue::Int(i) if *i >= 0 => Some(*i as u64),
3156 MetadataValue::Timestamp(t) => Some(*t),
3157 _ => None,
3158 })
3159 .or(fallback_ttl_ms);
3160 let metadata = queue_message_metadata(existing_ttl_ms, available_at_ns);
3161 match store.set_metadata(queue, message_id, metadata) {
3162 Ok(()) => Ok(()),
3163 Err(crate::storage::StoreError::CollectionNotFound(_)) => Ok(()),
3164 Err(err) => Err(RedDBError::Internal(err.to_string())),
3165 }
3166}
3167
3168pub(super) fn read_message_available_at_ns(
3172 store: &UnifiedStore,
3173 queue: &str,
3174 message_id: EntityId,
3175) -> Option<u64> {
3176 let md = store.get_metadata(queue, message_id)?;
3177 match md.get("_available_at_ns")? {
3178 MetadataValue::Timestamp(t) => Some(*t),
3179 MetadataValue::Int(i) if *i >= 0 => Some(*i as u64),
3180 _ => None,
3181 }
3182}
3183
3184fn estimate_payload_bytes(payload: &Value) -> u64 {
3186 match payload {
3187 Value::Json(v) => v.len() as u64,
3188 Value::Text(s) => s.len() as u64,
3189 _ => 64,
3190 }
3191}
3192
#[cfg(test)]
mod presence_integration_tests {
    use super::*;
    use crate::storage::queue::presence::{PresenceState, DEFAULT_PRESENCE_TTL_MS};
    use crate::{RedDBOptions, RedDBRuntime};

    /// Creates an in-memory runtime and runs `statements` in order,
    /// panicking on the first one that fails.
    fn runtime_after(statements: &[&str]) -> RedDBRuntime {
        let rt = RedDBRuntime::with_options(RedDBOptions::in_memory()).unwrap();
        for &statement in statements {
            rt.execute_query(statement).unwrap();
        }
        rt
    }

    /// A grouped read that delivers a message records one active presence
    /// row for the consumer and counts it as active.
    #[test]
    fn queue_read_emits_consumer_presence_heartbeat() {
        let rt = runtime_after(&[
            "CREATE QUEUE tasks",
            "QUEUE GROUP CREATE tasks workers",
            "QUEUE PUSH tasks {'job':'a'}",
            "QUEUE READ tasks GROUP workers CONSUMER w1",
        ]);

        let snap = rt.queue_consumer_presence_snapshot(DEFAULT_PRESENCE_TTL_MS);
        assert_eq!(snap.len(), 1, "exactly one heartbeat recorded");
        let heartbeat = &snap[0];
        assert_eq!(heartbeat.queue, "tasks");
        assert_eq!(heartbeat.group, "workers");
        assert_eq!(heartbeat.consumer, "w1");
        assert_eq!(heartbeat.state, PresenceState::Active);

        let counts = rt.queue_active_consumer_counts(DEFAULT_PRESENCE_TTL_MS);
        let key = ("tasks".to_string(), "workers".to_string());
        assert_eq!(counts[&key], 1, "active count reflects the live consumer");
    }

    /// A grouped read on an empty queue still records the consumer as
    /// active, with zero leases.
    #[test]
    fn empty_queue_read_still_heartbeats() {
        let rt = runtime_after(&[
            "CREATE QUEUE empty_q",
            "QUEUE GROUP CREATE empty_q workers",
            "QUEUE READ empty_q GROUP workers CONSUMER w1",
        ]);

        let snap = rt.queue_consumer_presence_snapshot(DEFAULT_PRESENCE_TTL_MS);
        assert_eq!(
            snap.len(),
            1,
            "empty read still registers consumer presence"
        );
        let heartbeat = &snap[0];
        assert_eq!(heartbeat.state, PresenceState::Active);
        assert_eq!(
            heartbeat.lease_count, 0,
            "no messages delivered → zero leases"
        );
    }
}