1use std::collections::{HashMap, HashSet};
4use std::sync::atomic::{AtomicU64, Ordering};
5use std::sync::Arc;
6
7use crate::runtime::audit_log::{AuditAuthSource, AuditEvent, AuditFieldEscaper, Outcome};
8use crate::runtime::impl_core::{
9 clear_current_auth_identity, clear_current_tenant, current_auth_identity, current_tenant,
10 set_current_auth_identity, set_current_tenant,
11};
12use crate::storage::queue::QueueMode;
13use crate::storage::unified::entity::{QueueMessageData, RowData};
14use crate::storage::unified::{Metadata, MetadataValue, UnifiedStore};
15
16use super::*;
17
18use super::primary_queue_store::PrimaryQueueStore;
19use super::queue_lifecycle::{QueueLifecycle, RetirementOutcome};
20use crate::storage::queue::lifecycle::{
21 QueueSide as LcQueueSide, QueueStore as _, QueueStoreError, QueueTxn,
22};
23
24pub(super) fn runtime_lifecycle(
31 runtime: &RedDBRuntime,
32 queue: &str,
33) -> (
34 QueueLifecycle<PrimaryQueueStore>,
35 PrimaryQueueStore,
36 QueueTxn,
37) {
38 let primary_for_lookup = PrimaryQueueStore::new(runtime.clone());
39 let primary_for_lifecycle = PrimaryQueueStore::new(runtime.clone());
40 let txn = primary_for_lifecycle.new_txn();
41 let cfg = primary_for_lifecycle.lifecycle_config(queue);
42 (
43 QueueLifecycle::new(primary_for_lifecycle, cfg),
44 primary_for_lookup,
45 txn,
46 )
47}
48
49pub(crate) const QUEUE_READ_WAIT_CANCELLED: &str =
55 "QUEUE READ WAIT cancelled — server shutting down";
56
57pub(crate) const QUEUE_MAX_WAIT_MS_CONFIG_KEY: &str = "red.config.queue.max_wait_ms";
61
62pub(crate) const QUEUE_MAX_WAIT_MS_DEFAULT: u64 = 60_000;
65
66#[derive(Debug)]
77pub(crate) enum RedwireWaitOutcome {
78 Delivered(Vec<crate::serde_json::Value>),
79 TimedOut,
80 Cancelled,
81}
82
83pub(super) fn queue_wait_scope() -> String {
88 crate::runtime::impl_core::current_tenant().unwrap_or_default()
89}
90
91fn with_redwire_wait_context<T>(
92 auth_identity: &Option<(String, crate::auth::Role)>,
93 tenant: &Option<String>,
94 f: impl FnOnce() -> T,
95) -> T {
96 let previous_auth = current_auth_identity();
97 let previous_tenant = current_tenant();
98 match tenant {
99 Some(t) => set_current_tenant(t.clone()),
100 None => clear_current_tenant(),
101 }
102 match auth_identity {
103 Some((username, role)) => set_current_auth_identity(username.clone(), *role),
104 None => clear_current_auth_identity(),
105 }
106 let result = f();
107 match previous_tenant {
108 Some(t) => set_current_tenant(t),
109 None => clear_current_tenant(),
110 }
111 match previous_auth {
112 Some((username, role)) => set_current_auth_identity(username, role),
113 None => clear_current_auth_identity(),
114 }
115 result
116}
117
118fn ast_side_to_lc(side: crate::storage::query::ast::QueueSide) -> LcQueueSide {
122 use crate::storage::query::ast::QueueSide as Ast;
123 match side {
124 Ast::Left => LcQueueSide::Left,
125 Ast::Right => LcQueueSide::Right,
126 }
127}
128
129fn map_qse(err: QueueStoreError) -> RedDBError {
133 match err {
134 QueueStoreError::UnknownDelivery(id) => RedDBError::NotFound(format!(
135 "delivery_id '{id}' does not resolve to a live pending delivery"
136 )),
137 QueueStoreError::UnknownQueue(q) => RedDBError::NotFound(format!("queue '{q}' not found")),
138 QueueStoreError::ReplicaImmutable => {
139 RedDBError::Internal("replica QueueStore is immutable".to_string())
140 }
141 }
142}
143
144pub static EVENTS_DRAIN_RETRIES_TOTAL: AtomicU64 = AtomicU64::new(0);
151
152pub static EVENTS_DLQ_TOTAL: AtomicU64 = AtomicU64::new(0);
154
155pub static EVENTS_ENQUEUED_TOTAL: AtomicU64 = AtomicU64::new(0);
157
158const OUTBOX_WARN_BYTES: u64 = 1 << 30;
160
161const OUTBOX_MAX_BYTES: u64 = 10 * (1 << 30);
163
164static OUTBOX_APPROX_BYTES: AtomicU64 = AtomicU64::new(0);
166
167const QUEUE_META_COLLECTION: &str = "red_queue_meta";
168const QUEUE_POSITION_CENTER: u64 = u64::MAX / 2;
169const WORK_DEFAULT_GROUP: &str = "_work_default";
170const FANOUT_GROUP_PREFIX: &str = "_fanout_";
171
172#[derive(Debug, Clone)]
173pub(super) struct QueueRuntimeConfig {
174 pub(super) mode: QueueMode,
175 pub(super) priority: bool,
176 pub(super) max_size: Option<usize>,
177 pub(super) ttl_ms: Option<u64>,
178 pub(super) dlq: Option<String>,
179 pub(super) max_attempts: u32,
180 pub(super) lock_deadline_ms: u64,
181 pub(super) in_flight_cap_per_group: u32,
182 pub(super) retry_delay_ms: Option<u64>,
187}
188
189#[derive(Debug, Clone)]
190struct QueueGroupEntry {
191 entity_id: EntityId,
192 group: String,
193}
194
195#[derive(Debug, Clone)]
196pub(super) struct QueuePendingEntry {
197 pub(super) entity_id: EntityId,
198 group: String,
199 pub(super) message_id: EntityId,
200 consumer: String,
201 pub(super) delivered_at_ns: u64,
202 pub(super) delivery_count: u32,
203}
204
205#[derive(Debug, Clone)]
206pub(super) struct QueueAckEntry {
207 entity_id: EntityId,
208 group: String,
209 pub(super) message_id: EntityId,
210}
211
212#[derive(Debug, Clone)]
213pub(super) struct QueueMessageView {
214 pub(super) id: EntityId,
215 position: u64,
216 priority: i32,
217 pub(super) payload: Value,
218 attempts: u32,
219 pub(super) max_attempts: u32,
220 enqueued_at_ns: u64,
221 pub(super) available_at_ns: Option<u64>,
225}
226
227impl QueueMessageView {
228 pub(super) fn is_available_now(&self) -> bool {
233 match self.available_at_ns {
234 Some(at) => at <= now_ns(),
235 None => true,
236 }
237 }
238}
239
240impl RedDBRuntime {
241 pub(super) fn group_read_with_optional_wait(
250 &self,
251 queue: &str,
252 group: &str,
253 consumer: &str,
254 count: usize,
255 wait_ms: Option<u64>,
256 ) -> RedDBResult<Vec<crate::runtime::queue_lifecycle::DeliveredMessage>> {
257 let do_read =
258 |runtime: &RedDBRuntime| -> RedDBResult<Vec<crate::runtime::queue_lifecycle::DeliveredMessage>> {
259 let (lifecycle, _ps, txn) = runtime_lifecycle(runtime, queue);
260 lifecycle
261 .group_read(&txn, queue, group, consumer, count)
262 .map_err(map_qse)
263 };
264
265 let delivered = do_read(self)?;
266 let Some(wait_ms) = wait_ms else {
267 return Ok(delivered);
268 };
269 if !delivered.is_empty() {
270 return Ok(delivered);
271 }
272 let registry = self.queue_wait_registry();
282 let scope = queue_wait_scope();
283 let deadline = std::time::Instant::now() + std::time::Duration::from_millis(wait_ms);
284 let telemetry = self.queue_telemetry();
285 telemetry.record_wait_started(&scope, queue);
286 let wait_start = std::time::Instant::now();
287 let observe = |outcome: crate::runtime::queue_telemetry::WaitOutcomeLabel| {
288 let elapsed_ms = wait_start.elapsed().as_millis().min(u128::from(u64::MAX)) as u64;
289 telemetry.record_wait_outcome(&scope, queue, outcome, elapsed_ms);
290 };
291 loop {
292 let snapshot = registry.snapshot(&scope, queue);
296 let delivered = do_read(self)?;
297 if !delivered.is_empty() {
298 observe(crate::runtime::queue_telemetry::WaitOutcomeLabel::Woken);
299 return Ok(delivered);
300 }
301 if registry.is_cancelled() {
302 observe(crate::runtime::queue_telemetry::WaitOutcomeLabel::Cancelled);
303 return Err(RedDBError::Query(QUEUE_READ_WAIT_CANCELLED.to_string()));
304 }
305 if std::time::Instant::now() >= deadline {
306 observe(crate::runtime::queue_telemetry::WaitOutcomeLabel::Timeout);
307 return Ok(Vec::new());
308 }
309 let park_deadline = match earliest_future_available_at(&self.inner.db.store(), queue) {
319 Some(at_ns) => {
320 let now_ns = now_ns();
321 if at_ns <= now_ns {
322 deadline.min(std::time::Instant::now())
324 } else {
325 let wait_ns = at_ns - now_ns;
326 let due_instant =
327 std::time::Instant::now() + std::time::Duration::from_nanos(wait_ns);
328 deadline.min(due_instant)
329 }
330 }
331 None => deadline,
332 };
333 match registry.wait_until(&snapshot, park_deadline) {
334 crate::runtime::queue_wait_registry::WaitOutcome::Woken => continue,
335 crate::runtime::queue_wait_registry::WaitOutcome::Timeout => {
336 if std::time::Instant::now() >= deadline {
340 observe(crate::runtime::queue_telemetry::WaitOutcomeLabel::Timeout);
341 return Ok(Vec::new());
342 }
343 continue;
344 }
345 crate::runtime::queue_wait_registry::WaitOutcome::Cancelled => {
346 observe(crate::runtime::queue_telemetry::WaitOutcomeLabel::Cancelled);
347 return Err(RedDBError::Query(QUEUE_READ_WAIT_CANCELLED.to_string()));
348 }
349 }
350 }
351 }
352
353 pub(crate) async fn redwire_queue_wait_json(
370 &self,
371 queue: &str,
372 group: Option<&str>,
373 consumer: &str,
374 count: usize,
375 wait_ms: u64,
376 auth_identity: Option<(String, crate::auth::Role)>,
377 tenant: Option<String>,
378 ) -> RedDBResult<RedwireWaitOutcome> {
379 let group_owned: RedDBResult<String> =
380 with_redwire_wait_context(&auth_identity, &tenant, || {
381 let expr = crate::storage::query::ast::QueryExpr::QueueCommand(
382 crate::storage::query::ast::QueueCommand::GroupRead {
383 queue: queue.to_string(),
384 group: group.map(str::to_string),
385 consumer: consumer.to_string(),
386 count,
387 wait_ms: Some(wait_ms),
388 },
389 );
390 self.check_query_privilege(&expr)
391 .map_err(RedDBError::Query)?;
392 let store = self.inner.db.store();
393 ensure_queue_exists(store.as_ref(), queue)?;
394 let config = load_queue_config(store.as_ref(), queue);
395 resolve_read_group(store.as_ref(), queue, group, consumer, &config)
396 });
397 let group_owned = group_owned?;
398 let group_ref = group_owned.as_str();
399
400 let do_read =
401 |runtime: &RedDBRuntime| -> RedDBResult<Vec<crate::runtime::queue_lifecycle::DeliveredMessage>> {
402 with_redwire_wait_context(&auth_identity, &tenant, || {
403 let (lifecycle, _ps, txn) = runtime_lifecycle(runtime, queue);
404 lifecycle
405 .group_read(&txn, queue, group_ref, consumer, count)
406 .map_err(map_qse)
407 })
408 };
409
410 let render = |delivered: Vec<crate::runtime::queue_lifecycle::DeliveredMessage>| {
411 RedwireWaitOutcome::Delivered(
412 delivered.into_iter().map(delivered_message_json).collect(),
413 )
414 };
415
416 let delivered = do_read(self)?;
418 if !delivered.is_empty() {
419 return Ok(render(delivered));
420 }
421
422 let registry = self.queue_wait_registry();
423 let scope = with_redwire_wait_context(&auth_identity, &tenant, queue_wait_scope);
424 let deadline = std::time::Instant::now() + std::time::Duration::from_millis(wait_ms);
425 let telemetry = self.queue_telemetry();
426 telemetry.record_wait_started(&scope, queue);
427 let wait_start = std::time::Instant::now();
428 tracing::debug!(
429 target: "reddb::redwire::queue_wait",
430 queue,
431 group = group_ref,
432 consumer,
433 count,
434 wait_ms,
435 scope = scope.as_str(),
436 "redwire queue wait parked"
437 );
438 let observe = |outcome: crate::runtime::queue_telemetry::WaitOutcomeLabel| {
439 let elapsed_ms = wait_start.elapsed().as_millis().min(u128::from(u64::MAX)) as u64;
440 telemetry.record_wait_outcome(&scope, queue, outcome, elapsed_ms);
441 tracing::debug!(
442 target: "reddb::redwire::queue_wait",
443 queue,
444 group = group_ref,
445 consumer,
446 count,
447 wait_ms,
448 scope = scope.as_str(),
449 outcome = outcome.as_str(),
450 duration_ms = elapsed_ms,
451 "redwire queue wait resolved"
452 );
453 };
454 loop {
455 let waiter = registry.async_waiter(&scope, queue);
459 let delivered = do_read(self)?;
460 if !delivered.is_empty() {
461 observe(crate::runtime::queue_telemetry::WaitOutcomeLabel::Woken);
462 return Ok(render(delivered));
463 }
464 if registry.is_cancelled() {
465 observe(crate::runtime::queue_telemetry::WaitOutcomeLabel::Cancelled);
466 return Ok(RedwireWaitOutcome::Cancelled);
467 }
468 if std::time::Instant::now() >= deadline {
469 observe(crate::runtime::queue_telemetry::WaitOutcomeLabel::Timeout);
470 return Ok(RedwireWaitOutcome::TimedOut);
471 }
472 let park_deadline = match earliest_future_available_at(&self.inner.db.store(), queue) {
473 Some(at_ns) => {
474 let now_ns = now_ns();
475 if at_ns <= now_ns {
476 deadline.min(std::time::Instant::now())
477 } else {
478 let wait_ns = at_ns - now_ns;
479 let due_instant =
480 std::time::Instant::now() + std::time::Duration::from_nanos(wait_ns);
481 deadline.min(due_instant)
482 }
483 }
484 None => deadline,
485 };
486 match registry.wait_until_async(&waiter, park_deadline).await {
491 crate::runtime::queue_wait_registry::WaitOutcome::Woken => continue,
492 crate::runtime::queue_wait_registry::WaitOutcome::Timeout => {
493 if std::time::Instant::now() >= deadline {
494 observe(crate::runtime::queue_telemetry::WaitOutcomeLabel::Timeout);
495 return Ok(RedwireWaitOutcome::TimedOut);
496 }
497 continue;
498 }
499 crate::runtime::queue_wait_registry::WaitOutcome::Cancelled => {
500 observe(crate::runtime::queue_telemetry::WaitOutcomeLabel::Cancelled);
501 return Ok(RedwireWaitOutcome::Cancelled);
502 }
503 }
504 }
505 }
506
507 pub(crate) fn redwire_queue_wait_cap_check(&self, wait_ms: u64) -> Result<(), String> {
516 let cap = self.config_u64(QUEUE_MAX_WAIT_MS_CONFIG_KEY, QUEUE_MAX_WAIT_MS_DEFAULT);
517 if wait_ms > cap {
518 Err(format!(
519 "queue-wait WAIT {wait_ms}ms exceeds server cap {QUEUE_MAX_WAIT_MS_CONFIG_KEY} = {cap}ms"
520 ))
521 } else {
522 Ok(())
523 }
524 }
525
526 pub(crate) fn enqueue_event_payload(
527 &self,
528 queue: &str,
529 payload: Value,
530 ) -> RedDBResult<EntityId> {
531 let store = self.inner.db.store();
532 if store.get_collection(queue).is_none() {
534 crate::runtime::impl_ddl::ensure_event_target_queue_pub(self, queue)?;
535 }
536
537 let payload_bytes = estimate_payload_bytes(&payload);
539 let outbox_bytes = OUTBOX_APPROX_BYTES.fetch_add(payload_bytes, Ordering::Relaxed);
540
541 if outbox_bytes > OUTBOX_MAX_BYTES {
543 OUTBOX_APPROX_BYTES.fetch_sub(payload_bytes, Ordering::Relaxed);
544 EVENTS_DRAIN_RETRIES_TOTAL.fetch_add(1, Ordering::Relaxed);
545 return self.route_event_to_outbox_dlq(queue, payload, "outbox_max_bytes_exceeded");
546 }
547
548 if outbox_bytes > OUTBOX_WARN_BYTES && outbox_bytes - payload_bytes <= OUTBOX_WARN_BYTES {
550 tracing::warn!(
551 outbox_bytes,
552 warn_threshold = OUTBOX_WARN_BYTES,
553 "event outbox approaching capacity warning threshold"
554 );
555 crate::telemetry::operator_event::OperatorEvent::OutboxDlqActivated {
556 queue: queue.to_string(),
557 dlq: format!("{queue}_outbox_dlq"),
558 reason: "outbox_warn_bytes_exceeded".to_string(),
559 }
560 .emit_global();
561 }
562
563 let config = load_queue_config(store.as_ref(), queue);
564
565 if let Some(max_size) = config.max_size {
567 let current_len = load_queue_message_views(store.as_ref(), queue)
568 .unwrap_or_default()
569 .len();
570 if current_len >= max_size {
571 OUTBOX_APPROX_BYTES.fetch_sub(payload_bytes, Ordering::Relaxed);
572 EVENTS_DRAIN_RETRIES_TOTAL.fetch_add(1, Ordering::Relaxed);
573 return self.route_event_to_outbox_dlq(queue, payload, "queue_full");
574 }
575 if current_len * 10 >= max_size * 8 {
577 tracing::warn!(
578 queue = %queue,
579 size = current_len,
580 max = max_size,
581 "event target queue near capacity"
582 );
583 }
584 }
585
586 let id = self.enqueue_event_payload_raw(store.as_ref(), queue, &config, payload)?;
587 EVENTS_ENQUEUED_TOTAL.fetch_add(1, Ordering::Relaxed);
588 Ok(id)
589 }
590
591 fn route_event_to_outbox_dlq(
593 &self,
594 queue: &str,
595 payload: Value,
596 reason: &str,
597 ) -> RedDBResult<EntityId> {
598 let dlq_name = format!("{queue}_outbox_dlq");
599 EVENTS_DLQ_TOTAL.fetch_add(1, Ordering::Relaxed);
600
601 crate::telemetry::operator_event::OperatorEvent::OutboxDlqActivated {
602 queue: queue.to_string(),
603 dlq: dlq_name.clone(),
604 reason: reason.to_string(),
605 }
606 .emit_global();
607
608 let store = self.inner.db.store();
609 if store.get_collection(&dlq_name).is_none() {
610 crate::runtime::impl_ddl::ensure_event_target_queue_pub(self, &dlq_name)?;
611 }
612 let dlq_config = load_queue_config(store.as_ref(), &dlq_name);
613 let id = self.enqueue_event_payload_raw(store.as_ref(), &dlq_name, &dlq_config, payload)?;
614 EVENTS_ENQUEUED_TOTAL.fetch_add(1, Ordering::Relaxed);
615 Ok(id)
616 }
617
618 fn enqueue_event_payload_raw(
620 &self,
621 store: &UnifiedStore,
622 queue: &str,
623 config: &QueueRuntimeConfig,
624 payload: Value,
625 ) -> RedDBResult<EntityId> {
626 let position = next_queue_position(store, queue, QueueSide::Right)?;
627 let mut entity = UnifiedEntity::new(
628 EntityId::new(0),
629 EntityKind::QueueMessage {
630 queue: queue.to_string(),
631 position,
632 },
633 EntityData::QueueMessage(QueueMessageData {
634 payload,
635 priority: None,
636 enqueued_at_ns: now_ns(),
637 attempts: 0,
638 max_attempts: config.max_attempts,
639 acked: false,
640 }),
641 );
642 if let Some(xid) = self.current_xid() {
643 entity.set_xmin(xid);
644 }
645 let id = store
646 .insert_auto(queue, entity)
647 .map_err(|err| RedDBError::Internal(err.to_string()))?;
648 if let Some(ttl_ms) = config.ttl_ms {
649 store
650 .set_metadata(queue, id, queue_message_ttl_metadata(ttl_ms))
651 .map_err(|err| RedDBError::Internal(err.to_string()))?;
652 }
653 self.invalidate_result_cache_for_table(queue);
654 Ok(id)
655 }
656
657 pub fn execute_create_queue(
658 &self,
659 raw_query: &str,
660 query: &CreateQueueQuery,
661 ) -> RedDBResult<RuntimeQueryResult> {
662 self.check_write(crate::runtime::write_gate::WriteKind::Ddl)?;
663 if query.dlq.as_deref() == Some(query.name.as_str()) {
664 return Err(RedDBError::Query(
665 "dead-letter queue must be different from the source queue".to_string(),
666 ));
667 }
668
669 let store = self.inner.db.store();
670 let exists = store.get_collection(&query.name).is_some();
671 if exists {
672 if query.if_not_exists {
673 return Ok(RuntimeQueryResult::ok_message(
674 raw_query.to_string(),
675 &format!("queue '{}' already exists", query.name),
676 "create",
677 ));
678 }
679 return Err(RedDBError::Query(format!(
680 "queue '{}' already exists",
681 query.name
682 )));
683 }
684
685 store
686 .create_collection(&query.name)
687 .map_err(|err| RedDBError::Internal(err.to_string()))?;
688 if let Some(ttl_ms) = query.ttl_ms {
689 self.inner
690 .db
691 .set_collection_default_ttl_ms(&query.name, ttl_ms);
692 }
693 self.inner
694 .db
695 .save_collection_contract(queue_collection_contract(
696 &query.name,
697 query.priority,
698 query.ttl_ms,
699 ))
700 .map_err(|err| RedDBError::Internal(err.to_string()))?;
701 save_queue_config(
702 store.as_ref(),
703 &query.name,
704 &QueueRuntimeConfig {
705 mode: query.mode,
706 priority: query.priority,
707 max_size: query.max_size,
708 ttl_ms: query.ttl_ms,
709 dlq: query.dlq.clone(),
710 max_attempts: query.max_attempts,
711 lock_deadline_ms: query.lock_deadline_ms,
712 in_flight_cap_per_group: query.in_flight_cap_per_group,
713 retry_delay_ms: query.retry_delay_ms,
714 },
715 )?;
716
717 if let Some(dlq) = &query.dlq {
718 if store.get_collection(dlq).is_none() {
719 store
720 .create_collection(dlq)
721 .map_err(|err| RedDBError::Internal(err.to_string()))?;
722 self.inner
723 .db
724 .save_collection_contract(queue_collection_contract(dlq, false, None))
725 .map_err(|err| RedDBError::Internal(err.to_string()))?;
726 }
727 }
728
729 self.invalidate_result_cache();
730 self.inner
731 .db
732 .persist_metadata()
733 .map_err(|err| RedDBError::Internal(err.to_string()))?;
734 let mut type_tags = Vec::new();
739 if let Some(dlq) = &query.dlq {
740 type_tags.push(format!("dlq:{}", dlq));
741 }
742 self.schema_vocabulary_apply(
743 crate::runtime::schema_vocabulary::DdlEvent::CreateCollection {
744 collection: query.name.clone(),
745 columns: vec!["payload".to_string()],
746 type_tags,
747 description: None,
748 },
749 );
750
751 let mut msg = format!("queue '{}' created", query.name);
752 msg.push_str(&format!(" (mode={})", query.mode.as_str()));
753 if query.priority {
754 msg.push_str(" (priority)");
755 }
756 if let Some(max_size) = query.max_size {
757 msg.push_str(&format!(" (max_size={max_size})"));
758 }
759 if let Some(ttl_ms) = query.ttl_ms {
760 msg.push_str(&format!(" (ttl={ttl_ms}ms)"));
761 }
762 if let Some(dlq) = &query.dlq {
763 msg.push_str(&format!(
764 " (dlq={dlq}, max_attempts={})",
765 query.max_attempts
766 ));
767 }
768
769 Ok(RuntimeQueryResult::ok_message(
770 raw_query.to_string(),
771 &msg,
772 "create",
773 ))
774 }
775
776 pub fn execute_alter_queue(
777 &self,
778 raw_query: &str,
779 query: &AlterQueueQuery,
780 ) -> RedDBResult<RuntimeQueryResult> {
781 self.check_write(crate::runtime::write_gate::WriteKind::Ddl)?;
782 let store = self.inner.db.store();
783 ensure_queue_exists(store.as_ref(), &query.name)?;
784
785 let mut config = load_queue_config(store.as_ref(), &query.name);
786 let mut summary: Vec<String> = Vec::new();
787
788 if let Some(new_mode) = query.mode {
789 let pending =
790 load_pending_entries(store.as_ref(), &query.name, None, None).unwrap_or_default();
791 if !pending.is_empty() {
792 tracing::warn!(
793 queue = %query.name,
794 pending_count = pending.len(),
795 new_mode = %new_mode.as_str(),
796 "ALTER QUEUE SET MODE: {} in-flight messages will drain with old mode; \
797 new reads use {}",
798 pending.len(),
799 new_mode.as_str(),
800 );
801 }
802 config.mode = new_mode;
803 summary.push(format!("mode={}", new_mode.as_str()));
804 }
805 if let Some(max_attempts) = query.max_attempts {
806 config.max_attempts = max_attempts;
807 summary.push(format!("max_attempts={max_attempts}"));
808 }
809 if let Some(lock_deadline_ms) = query.lock_deadline_ms {
810 config.lock_deadline_ms = lock_deadline_ms;
811 summary.push(format!("lock_deadline_ms={lock_deadline_ms}"));
812 }
813 if let Some(in_flight_cap) = query.in_flight_cap_per_group {
814 config.in_flight_cap_per_group = in_flight_cap;
815 summary.push(format!("in_flight_cap_per_group={in_flight_cap}"));
816 }
817 if let Some(dlq) = &query.dlq {
818 if dlq == &query.name {
819 return Err(RedDBError::Query(
820 "dead-letter queue must be different from the source queue".to_string(),
821 ));
822 }
823 config.dlq = Some(dlq.clone());
824 summary.push(format!("dlq={dlq}"));
825 }
826 if let Some(retry_delay_ms) = query.retry_delay_ms {
827 config.retry_delay_ms = if retry_delay_ms == 0 {
828 None
829 } else {
830 Some(retry_delay_ms)
831 };
832 summary.push(format!("retry_delay_ms={retry_delay_ms}"));
833 }
834
835 save_queue_config(store.as_ref(), &query.name, &config)?;
836
837 self.invalidate_result_cache();
838 self.inner
839 .db
840 .persist_metadata()
841 .map_err(|err| RedDBError::Internal(err.to_string()))?;
842
843 Ok(RuntimeQueryResult::ok_message(
844 raw_query.to_string(),
845 &format!("queue '{}' altered: {}", query.name, summary.join(", ")),
846 "alter",
847 ))
848 }
849
850 pub fn execute_drop_queue(
851 &self,
852 raw_query: &str,
853 query: &DropQueueQuery,
854 ) -> RedDBResult<RuntimeQueryResult> {
855 self.check_write(crate::runtime::write_gate::WriteKind::Ddl)?;
856 let store = self.inner.db.store();
857 if super::impl_ddl::is_system_schema_name(&query.name) {
858 return Err(RedDBError::Query("system schema is read-only".to_string()));
859 }
860 if store.get_collection(&query.name).is_none() {
861 if query.if_exists {
862 return Ok(RuntimeQueryResult::ok_message(
863 raw_query.to_string(),
864 &format!("queue '{}' does not exist", query.name),
865 "drop",
866 ));
867 }
868 return Err(RedDBError::NotFound(format!(
869 "queue '{}' not found",
870 query.name
871 )));
872 }
873 let actual = crate::runtime::ddl::polymorphic_resolver::resolve(
874 &query.name,
875 &self.inner.db.catalog_model_snapshot(),
876 )?;
877 crate::runtime::ddl::polymorphic_resolver::ensure_model_match(
878 crate::catalog::CollectionModel::Queue,
879 actual,
880 )?;
881
882 store
883 .drop_collection(&query.name)
884 .map_err(|err| RedDBError::Internal(err.to_string()))?;
885 self.inner.db.clear_collection_default_ttl_ms(&query.name);
886 self.inner
887 .db
888 .remove_collection_contract(&query.name)
889 .map_err(|err| RedDBError::Internal(err.to_string()))?;
890 remove_queue_metadata(store.as_ref(), &query.name);
891 self.invalidate_result_cache();
892 self.inner
893 .db
894 .persist_metadata()
895 .map_err(|err| RedDBError::Internal(err.to_string()))?;
896 self.schema_vocabulary_apply(
898 crate::runtime::schema_vocabulary::DdlEvent::DropCollection {
899 collection: query.name.clone(),
900 },
901 );
902
903 Ok(RuntimeQueryResult::ok_message(
904 raw_query.to_string(),
905 &format!("queue '{}' dropped", query.name),
906 "drop",
907 ))
908 }
909
910 pub fn execute_queue_command(
911 &self,
912 raw_query: &str,
913 cmd: &QueueCommand,
914 ) -> RedDBResult<RuntimeQueryResult> {
915 self.check_write(crate::runtime::write_gate::WriteKind::Dml)?;
916 match cmd {
917 QueueCommand::Push {
918 queue,
919 value,
920 side,
921 priority,
922 available,
923 } => {
924 let store = self.inner.db.store();
925 ensure_queue_exists(store.as_ref(), queue)?;
926 let config = load_queue_config(store.as_ref(), queue);
927 if priority.is_some() && !config.priority {
928 return Err(RedDBError::Query(format!(
929 "queue '{}' is not a priority queue",
930 queue
931 )));
932 }
933 if let Some(max_size) = config.max_size {
934 let current_len =
935 load_queue_message_views_with_runtime(Some(self), store.as_ref(), queue)?
936 .len();
937 if current_len >= max_size {
938 return Err(RedDBError::Query(format!(
939 "queue '{}' is full (max_size={max_size})",
940 queue
941 )));
942 }
943 }
944
945 let position = next_queue_position(store.as_ref(), queue, *side)?;
946 let mut entity = UnifiedEntity::new(
947 EntityId::new(0),
948 EntityKind::QueueMessage {
949 queue: queue.clone(),
950 position,
951 },
952 EntityData::QueueMessage(QueueMessageData {
953 payload: value.clone(),
954 priority: if config.priority { *priority } else { None },
955 enqueued_at_ns: now_ns(),
956 attempts: 0,
957 max_attempts: config.max_attempts,
958 acked: false,
959 }),
960 );
961 if let Some(xid) = self.current_xid() {
964 entity.set_xmin(xid);
965 }
966 let id = store
967 .insert_auto(queue, entity)
968 .map_err(|err| RedDBError::Internal(err.to_string()))?;
969 let available_at_ns = available.map(|a| match a {
974 crate::storage::query::ast::QueueAvailability::DelayMs(ms) => {
975 now_ns().saturating_add(ms.saturating_mul(1_000_000))
976 }
977 crate::storage::query::ast::QueueAvailability::AtUnixMs(ms) => {
978 ms.saturating_mul(1_000_000)
979 }
980 });
981 if config.ttl_ms.is_some() || available_at_ns.is_some() {
982 store
983 .set_metadata(
984 queue,
985 id,
986 queue_message_metadata(config.ttl_ms, available_at_ns),
987 )
988 .map_err(|err| RedDBError::Internal(err.to_string()))?;
989 }
990 self.record_queue_wake(&queue_wait_scope(), queue);
995 self.invalidate_result_cache();
996
997 let mut result = UnifiedResult::with_columns(vec![
998 "message_id".into(),
999 "side".into(),
1000 "queue".into(),
1001 ]);
1002 let mut record = UnifiedRecord::new();
1003 record.set("message_id", Value::text(message_id_string(id)));
1004 record.set(
1005 "side",
1006 Value::text(match side {
1007 QueueSide::Left => "left".to_string(),
1008 QueueSide::Right => "right".to_string(),
1009 }),
1010 );
1011 record.set("queue", Value::text(queue.clone()));
1012 result.push(record);
1013
1014 Ok(RuntimeQueryResult {
1015 query: raw_query.to_string(),
1016 mode: QueryMode::Sql,
1017 statement: "queue_push",
1018 engine: "runtime-queue",
1019 result,
1020 affected_rows: 1,
1021 statement_type: "insert",
1022 bookmark: None,
1023 })
1024 }
1025 QueueCommand::Pop { queue, side, count } => {
1026 let store = self.inner.db.store();
1027 ensure_queue_exists(store.as_ref(), queue)?;
1028 let (lifecycle, _ps, txn) = runtime_lifecycle(self, queue);
1029 let popped = lifecycle
1030 .pop(queue, ast_side_to_lc(*side), *count, &txn)
1031 .map_err(map_qse)?;
1032
1033 let mut result =
1034 UnifiedResult::with_columns(vec!["message_id".into(), "payload".into()]);
1035 for (message_id, payload) in &popped {
1036 let mut record = UnifiedRecord::new();
1037 record.set(
1038 "message_id",
1039 Value::text(message_id_string(EntityId::new(*message_id))),
1040 );
1041 record.set("payload", payload.clone());
1042 result.push(record);
1043 }
1044 let popped_count = popped.len() as u64;
1045 if popped_count > 0 {
1046 self.invalidate_result_cache();
1047 }
1048
1049 Ok(RuntimeQueryResult {
1050 query: raw_query.to_string(),
1051 mode: QueryMode::Sql,
1052 statement: "queue_pop",
1053 engine: "runtime-queue",
1054 result,
1055 affected_rows: popped_count,
1056 statement_type: "delete",
1057 bookmark: None,
1058 })
1059 }
1060 QueueCommand::Peek { queue, count } => {
1061 let store = self.inner.db.store();
1062 ensure_queue_exists(store.as_ref(), queue)?;
1063 let (lifecycle, _ps, txn) = runtime_lifecycle(self, queue);
1064 let messages = lifecycle.peek(queue, *count, &txn);
1065
1066 let mut result =
1067 UnifiedResult::with_columns(vec!["message_id".into(), "payload".into()]);
1068 for message in messages {
1069 let mut record = UnifiedRecord::new();
1070 record.set(
1071 "message_id",
1072 Value::text(message_id_string(EntityId::new(message.message_id))),
1073 );
1074 record.set("payload", message.payload);
1075 result.push(record);
1076 }
1077
1078 Ok(RuntimeQueryResult {
1079 query: raw_query.to_string(),
1080 mode: QueryMode::Sql,
1081 statement: "queue_peek",
1082 engine: "runtime-queue",
1083 result,
1084 affected_rows: 0,
1085 statement_type: "select",
1086 bookmark: None,
1087 })
1088 }
1089 QueueCommand::Len { queue } => {
1090 let store = self.inner.db.store();
1091 ensure_queue_exists(store.as_ref(), queue)?;
1092 let count =
1093 load_queue_message_views_with_runtime(Some(self), store.as_ref(), queue)?.len()
1094 as u64;
1095 let mut result = UnifiedResult::with_columns(vec!["len".into()]);
1096 let mut record = UnifiedRecord::new();
1097 record.set("len", Value::UnsignedInteger(count));
1098 result.push(record);
1099
1100 Ok(RuntimeQueryResult {
1101 query: raw_query.to_string(),
1102 mode: QueryMode::Sql,
1103 statement: "queue_len",
1104 engine: "runtime-queue",
1105 result,
1106 affected_rows: 0,
1107 statement_type: "select",
1108 bookmark: None,
1109 })
1110 }
1111 QueueCommand::Purge { queue } => {
1112 let store = self.inner.db.store();
1113 ensure_queue_exists(store.as_ref(), queue)?;
1114 let (lifecycle, _ps, txn) = runtime_lifecycle(self, queue);
1115 let count = lifecycle.purge(queue, &txn).map_err(map_qse)?;
1116 if count > 0 {
1117 self.invalidate_result_cache();
1118 }
1119
1120 Ok(RuntimeQueryResult::ok_message(
1121 raw_query.to_string(),
1122 &format!("{count} messages purged from queue '{queue}'"),
1123 "delete",
1124 ))
1125 }
1126 QueueCommand::GroupCreate { queue, group } => {
1127 let store = self.inner.db.store();
1128 ensure_queue_exists(store.as_ref(), queue)?;
1129 if queue_group_exists(store.as_ref(), queue, group)? {
1130 return Ok(RuntimeQueryResult::ok_message(
1131 raw_query.to_string(),
1132 &format!(
1133 "consumer group '{}' already exists on queue '{}'",
1134 group, queue
1135 ),
1136 "create",
1137 ));
1138 }
1139 save_queue_group(store.as_ref(), queue, group)?;
1140 self.invalidate_result_cache();
1141
1142 Ok(RuntimeQueryResult::ok_message(
1143 raw_query.to_string(),
1144 &format!("consumer group '{}' created on queue '{}'", group, queue),
1145 "create",
1146 ))
1147 }
1148 QueueCommand::GroupRead {
1149 queue,
1150 group,
1151 consumer,
1152 count,
1153 wait_ms,
1154 } => {
1155 let store = self.inner.db.store();
1156 ensure_queue_exists(store.as_ref(), queue)?;
1157 if let Some(ms) = *wait_ms {
1163 if self.current_xid().is_some() {
1164 return Err(RedDBError::Query(
1165 "QUEUE READ … WAIT is autocommit-only: refusing to park inside an explicit transaction (BEGIN/COMMIT)"
1166 .to_string(),
1167 ));
1168 }
1169 let cap =
1170 self.config_u64(QUEUE_MAX_WAIT_MS_CONFIG_KEY, QUEUE_MAX_WAIT_MS_DEFAULT);
1171 if ms > cap {
1172 return Err(RedDBError::Query(format!(
1173 "QUEUE READ … WAIT {ms}ms exceeds server cap {QUEUE_MAX_WAIT_MS_CONFIG_KEY} = {cap}ms"
1174 )));
1175 }
1176 }
1177 let config = load_queue_config(store.as_ref(), queue);
1181 let group_owned =
1182 resolve_read_group(store.as_ref(), queue, group.as_deref(), consumer, &config)?;
1183 let group_ref = group_owned.as_str();
1184 let delivered = self
1185 .group_read_with_optional_wait(queue, group_ref, consumer, *count, *wait_ms)?;
1186
1187 {
1191 let lease_count = u32::try_from(delivered.len()).unwrap_or(u32::MAX);
1192 let now_ns = std::time::SystemTime::now()
1193 .duration_since(std::time::UNIX_EPOCH)
1194 .map(|d| d.as_nanos() as u64)
1195 .unwrap_or(0);
1196 self.queue_presence().heartbeat(
1197 queue,
1198 group_ref,
1199 consumer,
1200 lease_count,
1201 now_ns,
1202 );
1203 }
1204
1205 let mut result = UnifiedResult::with_columns(vec![
1206 "message_id".into(),
1207 "payload".into(),
1208 "consumer".into(),
1209 "delivery_count".into(),
1210 "attempts".into(),
1211 ]);
1212
1213 for message in delivered {
1214 let mut record = UnifiedRecord::new();
1215 record.set(
1216 "message_id",
1217 Value::text(message_id_string(EntityId::new(message.message_id))),
1218 );
1219 record.set("payload", message.payload);
1220 record.set("consumer", Value::text(message.consumer));
1221 record.set(
1222 "delivery_count",
1223 Value::UnsignedInteger(u64::from(message.delivery_count)),
1224 );
1225 record.set(
1226 "attempts",
1227 Value::UnsignedInteger(u64::from(message.delivery_count)),
1228 );
1229 result.push(record);
1230 }
1231 if !result.records.is_empty() {
1232 self.invalidate_result_cache();
1233 }
1234
1235 Ok(RuntimeQueryResult {
1236 query: raw_query.to_string(),
1237 mode: QueryMode::Sql,
1238 statement: "queue_group_read",
1239 engine: "runtime-queue",
1240 result,
1241 affected_rows: 0,
1242 statement_type: "select",
1243 bookmark: None,
1244 })
1245 }
1246 QueueCommand::Pending { queue, group } => {
1247 let store = self.inner.db.store();
1248 ensure_queue_exists(store.as_ref(), queue)?;
1249 require_queue_group(store.as_ref(), queue, group)?;
1250 let mut pending = load_pending_entries(store.as_ref(), queue, Some(group), None)?;
1251 pending.sort_by_key(|entry| entry.delivered_at_ns);
1252 let current_time_ns = now_ns();
1253
1254 let mut result = UnifiedResult::with_columns(vec![
1255 "message_id".into(),
1256 "consumer".into(),
1257 "delivered_at_ns".into(),
1258 "delivery_count".into(),
1259 "idle_ms".into(),
1260 ]);
1261 for entry in pending {
1262 let mut record = UnifiedRecord::new();
1263 record.set(
1264 "message_id",
1265 Value::text(message_id_string(entry.message_id)),
1266 );
1267 record.set("consumer", Value::text(entry.consumer));
1268 record.set(
1269 "delivered_at_ns",
1270 Value::UnsignedInteger(entry.delivered_at_ns),
1271 );
1272 record.set(
1273 "delivery_count",
1274 Value::UnsignedInteger(u64::from(entry.delivery_count)),
1275 );
1276 record.set(
1277 "idle_ms",
1278 Value::UnsignedInteger(
1279 current_time_ns.saturating_sub(entry.delivered_at_ns) / 1_000_000,
1280 ),
1281 );
1282 result.push(record);
1283 }
1284
1285 Ok(RuntimeQueryResult {
1286 query: raw_query.to_string(),
1287 mode: QueryMode::Sql,
1288 statement: "queue_pending",
1289 engine: "runtime-queue",
1290 result,
1291 affected_rows: 0,
1292 statement_type: "select",
1293 bookmark: None,
1294 })
1295 }
1296 QueueCommand::Claim {
1297 queue,
1298 group,
1299 consumer,
1300 min_idle_ms,
1301 } => {
1302 let store = self.inner.db.store();
1303 ensure_queue_exists(store.as_ref(), queue)?;
1304 require_queue_group(store.as_ref(), queue, group)?;
1305 let (lifecycle, _ps, txn) = runtime_lifecycle(self, queue);
1306 let delivered = lifecycle
1307 .claim_delivering(queue, consumer, *min_idle_ms, &txn)
1308 .map_err(map_qse)?;
1309
1310 let mut result = UnifiedResult::with_columns(vec![
1311 "message_id".into(),
1312 "payload".into(),
1313 "consumer".into(),
1314 "delivery_count".into(),
1315 ]);
1316
1317 for message in delivered {
1318 let mut record = UnifiedRecord::new();
1319 record.set(
1320 "message_id",
1321 Value::text(message_id_string(EntityId::new(message.message_id))),
1322 );
1323 record.set("payload", message.payload);
1324 record.set("consumer", Value::text(message.consumer));
1325 record.set(
1326 "delivery_count",
1327 Value::UnsignedInteger(u64::from(message.delivery_count)),
1328 );
1329 result.push(record);
1330 }
1331 if !result.records.is_empty() {
1332 self.invalidate_result_cache();
1333 }
1334 let affected_rows = result.records.len() as u64;
1335
1336 Ok(RuntimeQueryResult {
1337 query: raw_query.to_string(),
1338 mode: QueryMode::Sql,
1339 statement: "queue_claim",
1340 engine: "runtime-queue",
1341 result,
1342 affected_rows,
1343 statement_type: "update",
1344 bookmark: None,
1345 })
1346 }
1347 QueueCommand::Ack {
1348 queue,
1349 group,
1350 message_id,
1351 delivery_id,
1352 } => {
1353 let store = self.inner.db.store();
1354 ensure_queue_exists(store.as_ref(), queue)?;
1355 let (group_owned, message_entity) = resolve_ack_nack_handle(
1356 store.as_ref(),
1357 queue,
1358 group,
1359 message_id,
1360 delivery_id.as_deref(),
1361 )?;
1362 let group_ref = group_owned.as_str();
1363 require_queue_group(store.as_ref(), queue, group_ref)?;
1364 let (lifecycle, ps, txn) = runtime_lifecycle(self, queue);
1365 let did = match delivery_id.as_deref() {
1366 Some(d) => d.to_string(),
1367 None => ps
1368 .find_pending_by_key(queue, message_entity.raw(), group_ref)
1369 .ok_or_else(|| {
1370 RedDBError::NotFound(format!(
1371 "no pending delivery for message '{}' on queue '{}' (group '{}')",
1372 message_entity.raw(),
1373 queue,
1374 group_ref
1375 ))
1376 })?,
1377 };
1378 lifecycle.ack(&txn, &did).map_err(map_qse)?;
1379 self.invalidate_result_cache();
1380
1381 Ok(RuntimeQueryResult::ok_message(
1382 raw_query.to_string(),
1383 "message acknowledged",
1384 "update",
1385 ))
1386 }
1387 QueueCommand::Nack {
1388 queue,
1389 group,
1390 message_id,
1391 delivery_id,
1392 delay_ms,
1393 } => {
1394 let store = self.inner.db.store();
1395 ensure_queue_exists(store.as_ref(), queue)?;
1396 let config = load_queue_config(store.as_ref(), queue);
1397 if delay_ms.is_some() {
1403 if let Some((_, role)) = current_auth_identity() {
1404 if !role.can_write() {
1405 return Err(RedDBError::InvalidOperation(format!(
1406 "role '{role}' is not authorized to override NACK retry delay on queue '{queue}'"
1407 )));
1408 }
1409 }
1410 }
1411 let (group_owned, message_entity) = resolve_ack_nack_handle(
1412 store.as_ref(),
1413 queue,
1414 group,
1415 message_id,
1416 delivery_id.as_deref(),
1417 )?;
1418 let group_ref = group_owned.as_str();
1419 require_queue_group(store.as_ref(), queue, group_ref)?;
1420 let (lifecycle, ps, txn) = runtime_lifecycle(self, queue);
1421 let did = match delivery_id.as_deref() {
1422 Some(d) => d.to_string(),
1423 None => ps
1424 .find_pending_by_key(queue, message_entity.raw(), group_ref)
1425 .ok_or_else(|| {
1426 RedDBError::NotFound(format!(
1427 "no pending delivery for message '{}' on queue '{}' (group '{}')",
1428 message_entity.raw(),
1429 queue,
1430 group_ref
1431 ))
1432 })?,
1433 };
1434 let effective_delay_ms = delay_ms.or(config.retry_delay_ms).unwrap_or(0);
1438 let outcome = lifecycle.nack(&txn, &did).map_err(map_qse)?;
1439 if matches!(outcome, RetirementOutcome::Requeued) && effective_delay_ms > 0 {
1443 let at_ns =
1444 now_ns().saturating_add(effective_delay_ms.saturating_mul(1_000_000));
1445 set_message_available_at_ns(
1446 store.as_ref(),
1447 queue,
1448 message_entity,
1449 Some(at_ns),
1450 config.ttl_ms,
1451 )?;
1452 }
1453 self.maybe_emit_nack_audit(
1460 queue,
1461 group_ref,
1462 &did,
1463 *delay_ms,
1464 config.retry_delay_ms,
1465 &outcome,
1466 );
1467 let message = match outcome {
1468 RetirementOutcome::Requeued => {
1469 if effective_delay_ms > 0 {
1470 format!("message requeued (delay={effective_delay_ms}ms)")
1471 } else {
1472 "message requeued".to_string()
1473 }
1474 }
1475 RetirementOutcome::MovedToDlq(dlq) => {
1476 format!("message moved to dead-letter queue '{}'", dlq)
1477 }
1478 RetirementOutcome::Dropped => "message dropped after max attempts".to_string(),
1479 };
1480 self.invalidate_result_cache();
1481
1482 Ok(RuntimeQueryResult::ok_message(
1483 raw_query.to_string(),
1484 &message,
1485 "update",
1486 ))
1487 }
1488 QueueCommand::Move {
1489 source,
1490 destination,
1491 filter,
1492 limit,
1493 } => self.execute_queue_move(raw_query, source, destination, filter.as_ref(), *limit),
1494 }
1495 }
1496
1497 pub fn execute_queue_select(
1498 &self,
1499 raw_query: &str,
1500 query: &QueueSelectQuery,
1501 ) -> RedDBResult<RuntimeQueryResult> {
1502 let store = self.inner.db.store();
1503 ensure_queue_exists(store.as_ref(), &query.queue)?;
1504 let config = load_queue_config(store.as_ref(), &query.queue);
1505 let dlq = queue_is_dead_letter_target(store.as_ref(), &query.queue);
1506 let columns = if query.columns.is_empty() {
1507 queue_projection_default_columns()
1508 } else {
1509 query.columns.clone()
1510 };
1511
1512 let mut messages =
1513 load_queue_message_views_with_runtime(Some(self), store.as_ref(), &query.queue)?;
1514 sort_queue_messages(&mut messages, &config, QueueSide::Left);
1515
1516 let mut result = UnifiedResult::with_columns(columns.clone());
1517 for message in messages {
1518 if query
1519 .filter
1520 .as_ref()
1521 .is_some_and(|filter| !queue_message_matches_filter(&message, dlq, filter))
1522 {
1523 continue;
1524 }
1525 let record = queue_projection_record(&columns, &message, dlq)?;
1526 result.push(record);
1527 if query
1528 .limit
1529 .is_some_and(|limit| result.records.len() >= limit as usize)
1530 {
1531 break;
1532 }
1533 }
1534
1535 Ok(RuntimeQueryResult {
1536 query: raw_query.to_string(),
1537 mode: QueryMode::Sql,
1538 statement: "queue_select",
1539 engine: "runtime-queue",
1540 result,
1541 affected_rows: 0,
1542 statement_type: "select",
1543 bookmark: None,
1544 })
1545 }
1546
1547 fn execute_queue_move(
1548 &self,
1549 raw_query: &str,
1550 source: &str,
1551 destination: &str,
1552 filter: Option<&Filter>,
1553 limit: usize,
1554 ) -> RedDBResult<RuntimeQueryResult> {
1555 if source == destination {
1556 return Err(RedDBError::Query(
1557 "QUEUE MOVE source and destination must be different".to_string(),
1558 ));
1559 }
1560 let store = self.inner.db.store();
1561 ensure_queue_exists(store.as_ref(), source)?;
1562 ensure_queue_exists(store.as_ref(), destination)?;
1563 let source_config = load_queue_config(store.as_ref(), source);
1564 let destination_config = load_queue_config(store.as_ref(), destination);
1565 let source_dlq = queue_is_dead_letter_target(store.as_ref(), source);
1566
1567 let mut messages =
1568 load_queue_message_views_with_runtime(Some(self), store.as_ref(), source)?;
1569 sort_queue_messages(&mut messages, &source_config, QueueSide::Left);
1570 let selected = messages
1571 .into_iter()
1572 .filter(|message| {
1573 filter
1574 .map(|f| queue_message_matches_filter(message, source_dlq, f))
1575 .unwrap_or(true)
1576 })
1577 .take(limit)
1578 .collect::<Vec<_>>();
1579
1580 if let Some(max_size) = destination_config.max_size {
1581 let current_len =
1582 load_queue_message_views_with_runtime(Some(self), store.as_ref(), destination)?
1583 .len();
1584 if current_len + selected.len() > max_size {
1585 return Err(RedDBError::Query(format!(
1586 "queue '{}' is full (max_size={max_size})",
1587 destination
1588 )));
1589 }
1590 }
1591
1592 for message in &selected {
1593 let lock = queue_message_lock_handle(self, source, message.id);
1594 let Some(_guard) = lock.try_lock() else {
1595 return Err(RedDBError::Query(format!(
1596 "message '{}' is locked on queue '{}'",
1597 message.id.raw(),
1598 source
1599 )));
1600 };
1601 if queue_message_view_by_id(store.as_ref(), source, message.id)?.is_none() {
1602 return Err(RedDBError::Query(format!(
1603 "message '{}' is no longer available on queue '{}'",
1604 message.id.raw(),
1605 source
1606 )));
1607 }
1608 }
1609
1610 let mut inserted = Vec::new();
1611 for message in &selected {
1612 match insert_moved_queue_message(
1613 store.as_ref(),
1614 destination,
1615 &destination_config,
1616 message,
1617 ) {
1618 Ok(id) => inserted.push(id),
1619 Err(err) => {
1620 for id in inserted {
1621 let _ = store.delete(destination, id);
1622 }
1623 return Err(err);
1624 }
1625 }
1626 }
1627
1628 let (move_lifecycle, _move_ps, move_txn) = runtime_lifecycle(self, source);
1629 for message in &selected {
1630 move_lifecycle
1631 .delete_with_state(source, message.id.raw(), &move_txn)
1632 .map_err(map_qse)?;
1633 }
1634 if !selected.is_empty() {
1635 self.invalidate_result_cache();
1636 }
1637
1638 let selected_count = selected.len() as u64;
1639 self.audit_log().record_event(
1640 AuditEvent::builder("queue/move")
1641 .source(AuditAuthSource::System)
1642 .outcome(Outcome::Success)
1643 .resource(format!("queue:{source}->{destination}"))
1644 .fields([
1645 AuditFieldEscaper::field("source", source),
1646 AuditFieldEscaper::field("destination", destination),
1647 AuditFieldEscaper::field("selected", selected_count),
1648 AuditFieldEscaper::field("committed", selected_count),
1649 ])
1650 .build(),
1651 );
1652
1653 let mut result = UnifiedResult::with_columns(vec![
1654 "source".into(),
1655 "destination".into(),
1656 "selected".into(),
1657 "committed".into(),
1658 ]);
1659 let mut record = UnifiedRecord::new();
1660 record.set("source", Value::text(source.to_string()));
1661 record.set("destination", Value::text(destination.to_string()));
1662 record.set("selected", Value::UnsignedInteger(selected_count));
1663 record.set("committed", Value::UnsignedInteger(selected_count));
1664 result.push(record);
1665
1666 Ok(RuntimeQueryResult {
1667 query: raw_query.to_string(),
1668 mode: QueryMode::Sql,
1669 statement: "queue_move",
1670 engine: "runtime-queue",
1671 result,
1672 affected_rows: selected_count,
1673 statement_type: "update",
1674 bookmark: None,
1675 })
1676 }
1677
1678 fn maybe_emit_nack_audit(
1693 &self,
1694 queue: &str,
1695 group: &str,
1696 delivery_id: &str,
1697 override_ms: Option<u64>,
1698 default_ms: Option<u64>,
1699 outcome: &RetirementOutcome,
1700 ) {
1701 let Some(override_ms) = override_ms else {
1702 return;
1703 };
1704 let outcome_label = match outcome {
1705 RetirementOutcome::Requeued => "requeued",
1706 RetirementOutcome::MovedToDlq(_) => "dlq",
1707 RetirementOutcome::Dropped => "dropped",
1708 };
1709 const SIGNIFICANT_DELAY_MS: u64 = 60_000;
1710 let destination_changed = !matches!(outcome, RetirementOutcome::Requeued);
1711 if override_ms < SIGNIFICANT_DELAY_MS && !destination_changed {
1712 return;
1713 }
1714 self.audit_log().record_event(
1715 AuditEvent::builder("queue/nack/override")
1716 .source(AuditAuthSource::System)
1717 .outcome(Outcome::Success)
1718 .resource(format!("queue:{queue}"))
1719 .fields([
1720 AuditFieldEscaper::field("queue", queue),
1721 AuditFieldEscaper::field("group", group),
1722 AuditFieldEscaper::field("delivery_id", delivery_id),
1723 AuditFieldEscaper::field("override_delay_ms", override_ms),
1724 AuditFieldEscaper::field("default_delay_ms", default_ms.unwrap_or(0)),
1725 AuditFieldEscaper::field("outcome", outcome_label),
1726 ])
1727 .build(),
1728 );
1729 }
1730}
1731
1732fn ensure_queue_exists(store: &UnifiedStore, queue: &str) -> RedDBResult<()> {
1733 if store.get_collection(queue).is_some() {
1734 Ok(())
1735 } else {
1736 Err(RedDBError::NotFound(format!("queue '{}' not found", queue)))
1737 }
1738}
1739
1740pub(super) fn load_queue_config(store: &UnifiedStore, queue: &str) -> QueueRuntimeConfig {
1741 let default = QueueRuntimeConfig {
1742 mode: QueueMode::Work,
1743 priority: false,
1744 max_size: None,
1745 ttl_ms: None,
1746 dlq: None,
1747 max_attempts: crate::storage::query::DEFAULT_QUEUE_MAX_ATTEMPTS,
1748 lock_deadline_ms: crate::storage::query::DEFAULT_QUEUE_LOCK_DEADLINE_MS,
1749 in_flight_cap_per_group: crate::storage::query::DEFAULT_QUEUE_IN_FLIGHT_CAP_PER_GROUP,
1750 retry_delay_ms: None,
1751 };
1752
1753 let Some(manager) = store.get_collection(QUEUE_META_COLLECTION) else {
1754 return default;
1755 };
1756 manager
1757 .query_all(|entity| {
1758 entity.data.as_row().is_some_and(|row| {
1759 row_text(row, "kind").as_deref() == Some("queue_config")
1760 && row_text(row, "queue").as_deref() == Some(queue)
1761 })
1762 })
1763 .into_iter()
1764 .find_map(|entity| {
1765 let row = entity.data.as_row()?;
1766 Some(QueueRuntimeConfig {
1767 mode: row_text(row, "mode")
1768 .as_deref()
1769 .and_then(QueueMode::parse)
1770 .unwrap_or_default(),
1771 priority: row_bool(row, "priority").unwrap_or(false),
1772 max_size: row_u64(row, "max_size").map(|value| value as usize),
1773 ttl_ms: row_u64(row, "ttl_ms"),
1774 dlq: row_text(row, "dlq"),
1775 max_attempts: row_u64(row, "max_attempts")
1776 .map(|value| value as u32)
1777 .unwrap_or(crate::storage::query::DEFAULT_QUEUE_MAX_ATTEMPTS),
1778 lock_deadline_ms: row_u64(row, "lock_deadline_ms")
1779 .unwrap_or(crate::storage::query::DEFAULT_QUEUE_LOCK_DEADLINE_MS),
1780 in_flight_cap_per_group: row_u64(row, "in_flight_cap_per_group")
1781 .map(|value| value as u32)
1782 .unwrap_or(crate::storage::query::DEFAULT_QUEUE_IN_FLIGHT_CAP_PER_GROUP),
1783 retry_delay_ms: row_u64(row, "retry_delay_ms").filter(|v| *v > 0),
1784 })
1785 })
1786 .unwrap_or(default)
1787}
1788
1789pub(super) fn queue_mode_str(store: &UnifiedStore, queue: &str) -> &'static str {
1790 load_queue_config(store, queue).mode.as_str()
1791}
1792
1793fn save_queue_config(
1794 store: &UnifiedStore,
1795 queue: &str,
1796 config: &QueueRuntimeConfig,
1797) -> RedDBResult<()> {
1798 remove_meta_rows(store, |row| {
1799 row_text(row, "kind").as_deref() == Some("queue_config")
1800 && row_text(row, "queue").as_deref() == Some(queue)
1801 });
1802
1803 let mut fields = HashMap::new();
1804 fields.insert("kind".to_string(), Value::text("queue_config".to_string()));
1805 fields.insert("queue".to_string(), Value::text(queue.to_string()));
1806 fields.insert(
1807 "mode".to_string(),
1808 Value::text(config.mode.as_str().to_string()),
1809 );
1810 fields.insert("priority".to_string(), Value::Boolean(config.priority));
1811 fields.insert(
1812 "max_size".to_string(),
1813 config
1814 .max_size
1815 .map(|value| Value::UnsignedInteger(value as u64))
1816 .unwrap_or(Value::Null),
1817 );
1818 fields.insert(
1819 "ttl_ms".to_string(),
1820 config
1821 .ttl_ms
1822 .map(Value::UnsignedInteger)
1823 .unwrap_or(Value::Null),
1824 );
1825 fields.insert(
1826 "dlq".to_string(),
1827 config.dlq.clone().map(Value::text).unwrap_or(Value::Null),
1828 );
1829 fields.insert(
1830 "max_attempts".to_string(),
1831 Value::UnsignedInteger(u64::from(config.max_attempts)),
1832 );
1833 fields.insert(
1834 "lock_deadline_ms".to_string(),
1835 Value::UnsignedInteger(config.lock_deadline_ms),
1836 );
1837 fields.insert(
1838 "in_flight_cap_per_group".to_string(),
1839 Value::UnsignedInteger(u64::from(config.in_flight_cap_per_group)),
1840 );
1841 fields.insert(
1842 "retry_delay_ms".to_string(),
1843 config
1844 .retry_delay_ms
1845 .map(Value::UnsignedInteger)
1846 .unwrap_or(Value::Null),
1847 );
1848 insert_meta_row(store, fields)
1849}
1850
1851fn remove_queue_metadata(store: &UnifiedStore, queue: &str) {
1852 remove_meta_rows(store, |row| {
1853 row_text(row, "queue").as_deref() == Some(queue)
1854 });
1855}
1856
1857fn queue_group_exists(store: &UnifiedStore, queue: &str, group: &str) -> RedDBResult<bool> {
1858 Ok(load_queue_groups(store, queue)?
1859 .into_iter()
1860 .any(|entry| entry.group == group))
1861}
1862
1863pub(super) fn require_queue_group(
1864 store: &UnifiedStore,
1865 queue: &str,
1866 group: &str,
1867) -> RedDBResult<()> {
1868 if queue_group_exists(store, queue, group)? {
1869 Ok(())
1870 } else {
1871 Err(RedDBError::NotFound(format!(
1872 "consumer group '{}' not found on queue '{}'",
1873 group, queue
1874 )))
1875 }
1876}
1877
1878pub(super) fn resolve_read_group(
1879 store: &UnifiedStore,
1880 queue: &str,
1881 group: Option<&str>,
1882 consumer: &str,
1883 config: &QueueRuntimeConfig,
1884) -> RedDBResult<String> {
1885 if let Some(group) = group {
1886 require_queue_group(store, queue, group)?;
1887 return Ok(group.to_string());
1888 }
1889
1890 match config.mode {
1891 QueueMode::Work => {
1892 if !queue_group_exists(store, queue, WORK_DEFAULT_GROUP)? {
1893 save_queue_group(store, queue, WORK_DEFAULT_GROUP)?;
1894 }
1895 Ok(WORK_DEFAULT_GROUP.to_string())
1896 }
1897 QueueMode::Fanout => {
1898 let fanout_group = format!("{FANOUT_GROUP_PREFIX}{consumer}");
1899 if !queue_group_exists(store, queue, &fanout_group)? {
1900 save_queue_group(store, queue, &fanout_group)?;
1901 }
1902 Ok(fanout_group)
1903 }
1904 }
1905}
1906
1907fn load_queue_groups(store: &UnifiedStore, queue: &str) -> RedDBResult<Vec<QueueGroupEntry>> {
1908 let Some(manager) = store.get_collection(QUEUE_META_COLLECTION) else {
1909 return Ok(Vec::new());
1910 };
1911 Ok(manager
1912 .query_all(|entity| {
1913 entity.data.as_row().is_some_and(|row| {
1914 row_text(row, "kind").as_deref() == Some("queue_group")
1915 && row_text(row, "queue").as_deref() == Some(queue)
1916 })
1917 })
1918 .into_iter()
1919 .filter_map(|entity| {
1920 let row = entity.data.as_row()?;
1921 Some(QueueGroupEntry {
1922 entity_id: entity.id,
1923 group: row_text(row, "group")?,
1924 })
1925 })
1926 .collect())
1927}
1928
1929fn save_queue_group(store: &UnifiedStore, queue: &str, group: &str) -> RedDBResult<()> {
1930 let mut fields = HashMap::new();
1931 fields.insert("kind".to_string(), Value::text("queue_group".to_string()));
1932 fields.insert("queue".to_string(), Value::text(queue.to_string()));
1933 fields.insert("group".to_string(), Value::text(group.to_string()));
1934 fields.insert(
1935 "created_at_ns".to_string(),
1936 Value::UnsignedInteger(now_ns()),
1937 );
1938 insert_meta_row(store, fields)
1939}
1940
1941pub(super) fn load_pending_entries(
1942 store: &UnifiedStore,
1943 queue: &str,
1944 group: Option<&str>,
1945 message_id: Option<EntityId>,
1946) -> RedDBResult<Vec<QueuePendingEntry>> {
1947 let Some(manager) = store.get_collection(QUEUE_META_COLLECTION) else {
1948 return Ok(Vec::new());
1949 };
1950 let lock_deadline_ns = load_queue_config(store, queue)
1951 .lock_deadline_ms
1952 .saturating_mul(1_000_000);
1953 let attempts_by_key: HashMap<(String, String, u64), u64> = manager
1954 .query_all(|entity| {
1955 entity.data.as_row().is_some_and(|row| {
1956 row_text(row, "kind").as_deref() == Some("queue_attempts_lc")
1957 && row_text(row, "queue").as_deref() == Some(queue)
1958 })
1959 })
1960 .into_iter()
1961 .filter_map(|entity| {
1962 let row = entity.data.as_row()?;
1963 Some((
1964 (
1965 row_text(row, "queue")?,
1966 row_text(row, "group")?,
1967 row_u64(row, "message_id")?,
1968 ),
1969 row_u64(row, "attempts").unwrap_or(1),
1970 ))
1971 })
1972 .collect();
1973 Ok(manager
1974 .query_all(|entity| {
1975 entity.data.as_row().is_some_and(|row| {
1976 matches!(
1977 row_text(row, "kind").as_deref(),
1978 Some("queue_pending") | Some("queue_pending_lc")
1979 ) && row_text(row, "queue").as_deref() == Some(queue)
1980 && group
1981 .map(|group_name| row_text(row, "group").as_deref() == Some(group_name))
1982 .unwrap_or(true)
1983 && message_id
1984 .map(|candidate| row_u64(row, "message_id") == Some(candidate.raw()))
1985 .unwrap_or(true)
1986 })
1987 })
1988 .into_iter()
1989 .filter_map(|entity| {
1990 let row = entity.data.as_row()?;
1991 let group = row_text(row, "group")?;
1992 let message_id = row_u64(row, "message_id")?;
1993 let kind = row_text(row, "kind")?;
1994 let delivered_at_ns = if kind == "queue_pending_lc" {
1995 row_u64(row, "lock_deadline_ns")
1996 .unwrap_or(0)
1997 .saturating_sub(lock_deadline_ns)
1998 } else {
1999 row_u64(row, "delivered_at_ns")?
2000 };
2001 let delivery_count = if kind == "queue_pending_lc" {
2002 attempts_by_key
2003 .get(&(queue.to_string(), group.clone(), message_id))
2004 .copied()
2005 .unwrap_or(1)
2006 } else {
2007 row_u64(row, "delivery_count").unwrap_or(1)
2008 };
2009 Some(QueuePendingEntry {
2010 entity_id: entity.id,
2011 group,
2012 message_id: EntityId::new(message_id),
2013 consumer: row_text(row, "consumer").unwrap_or_default(),
2014 delivered_at_ns,
2015 delivery_count: delivery_count as u32,
2016 })
2017 })
2018 .collect())
2019}
2020
2021pub(super) fn save_queue_pending(
2022 store: &UnifiedStore,
2023 queue: &str,
2024 group: &str,
2025 message_id: EntityId,
2026 consumer: &str,
2027 delivered_at_ns: u64,
2028 delivery_count: u32,
2029) -> RedDBResult<()> {
2030 remove_meta_rows(store, |row| {
2031 row_text(row, "kind").as_deref() == Some("queue_pending")
2032 && row_text(row, "queue").as_deref() == Some(queue)
2033 && row_text(row, "group").as_deref() == Some(group)
2034 && row_u64(row, "message_id") == Some(message_id.raw())
2035 });
2036
2037 let mut fields = HashMap::new();
2038 fields.insert("kind".to_string(), Value::text("queue_pending".to_string()));
2039 fields.insert("queue".to_string(), Value::text(queue.to_string()));
2040 fields.insert("group".to_string(), Value::text(group.to_string()));
2041 fields.insert(
2042 "message_id".to_string(),
2043 Value::UnsignedInteger(message_id.raw()),
2044 );
2045 fields.insert("consumer".to_string(), Value::text(consumer.to_string()));
2046 fields.insert(
2047 "delivered_at_ns".to_string(),
2048 Value::UnsignedInteger(delivered_at_ns),
2049 );
2050 fields.insert(
2051 "delivery_count".to_string(),
2052 Value::UnsignedInteger(u64::from(delivery_count)),
2053 );
2054 insert_meta_row(store, fields)
2055}
2056
2057pub(super) fn require_pending_entry(
2058 store: &UnifiedStore,
2059 queue: &str,
2060 group: &str,
2061 message_id: EntityId,
2062) -> RedDBResult<QueuePendingEntry> {
2063 load_pending_entries(store, queue, Some(group), Some(message_id))?
2064 .into_iter()
2065 .next()
2066 .ok_or_else(|| {
2067 RedDBError::NotFound(format!(
2068 "message '{}' is not pending in group '{}' on queue '{}'",
2069 message_id.raw(),
2070 group,
2071 queue
2072 ))
2073 })
2074}
2075
2076pub(super) fn load_ack_entries(
2077 store: &UnifiedStore,
2078 queue: &str,
2079 group: Option<&str>,
2080 message_id: Option<EntityId>,
2081) -> RedDBResult<Vec<QueueAckEntry>> {
2082 let Some(manager) = store.get_collection(QUEUE_META_COLLECTION) else {
2083 return Ok(Vec::new());
2084 };
2085 Ok(manager
2086 .query_all(|entity| {
2087 entity.data.as_row().is_some_and(|row| {
2088 row_text(row, "kind").as_deref() == Some("queue_ack")
2089 && row_text(row, "queue").as_deref() == Some(queue)
2090 && group
2091 .map(|group_name| row_text(row, "group").as_deref() == Some(group_name))
2092 .unwrap_or(true)
2093 && message_id
2094 .map(|candidate| row_u64(row, "message_id") == Some(candidate.raw()))
2095 .unwrap_or(true)
2096 })
2097 })
2098 .into_iter()
2099 .filter_map(|entity| {
2100 let row = entity.data.as_row()?;
2101 Some(QueueAckEntry {
2102 entity_id: entity.id,
2103 group: row_text(row, "group")?,
2104 message_id: EntityId::new(row_u64(row, "message_id")?),
2105 })
2106 })
2107 .collect())
2108}
2109
2110pub(super) fn save_queue_ack(
2111 store: &UnifiedStore,
2112 queue: &str,
2113 group: &str,
2114 message_id: EntityId,
2115) -> RedDBResult<()> {
2116 let existing = load_ack_entries(store, queue, Some(group), Some(message_id))?;
2117 if !existing.is_empty() {
2118 return Ok(());
2119 }
2120
2121 let mut fields = HashMap::new();
2122 fields.insert("kind".to_string(), Value::text("queue_ack".to_string()));
2123 fields.insert("queue".to_string(), Value::text(queue.to_string()));
2124 fields.insert("group".to_string(), Value::text(group.to_string()));
2125 fields.insert(
2126 "message_id".to_string(),
2127 Value::UnsignedInteger(message_id.raw()),
2128 );
2129 fields.insert("acked_at_ns".to_string(), Value::UnsignedInteger(now_ns()));
2130 insert_meta_row(store, fields)
2131}
2132
2133pub(super) fn queue_message_completed_for_all_groups(
2134 store: &UnifiedStore,
2135 queue: &str,
2136 message_id: EntityId,
2137) -> RedDBResult<bool> {
2138 let groups = load_queue_groups(store, queue)?;
2139 let pending = load_pending_entries(store, queue, None, Some(message_id))?;
2140 if !pending.is_empty() {
2141 return Ok(false);
2142 }
2143 if groups.is_empty() {
2144 return Ok(true);
2145 }
2146
2147 let acked_groups = load_ack_entries(store, queue, None, Some(message_id))?
2148 .into_iter()
2149 .map(|entry| entry.group)
2150 .collect::<HashSet<_>>();
2151 Ok(groups
2152 .into_iter()
2153 .all(|group| acked_groups.contains(&group.group)))
2154}
2155
2156fn load_queue_message_views(
2157 store: &UnifiedStore,
2158 queue: &str,
2159) -> RedDBResult<Vec<QueueMessageView>> {
2160 load_queue_message_views_with_runtime(None, store, queue)
2161}
2162
2163pub(super) fn load_queue_message_views_with_runtime(
2170 runtime: Option<&RedDBRuntime>,
2171 store: &UnifiedStore,
2172 queue: &str,
2173) -> RedDBResult<Vec<QueueMessageView>> {
2174 let manager = store
2175 .get_collection(queue)
2176 .ok_or_else(|| RedDBError::NotFound(format!("queue '{}' not found", queue)))?;
2177 let snap_ctx = crate::runtime::impl_core::capture_current_snapshot();
2181 let rls_filter = runtime.and_then(|rt| {
2182 crate::runtime::impl_core::rls_policy_filter_for_kind(
2183 rt,
2184 queue,
2185 crate::storage::query::ast::PolicyAction::Select,
2186 crate::storage::query::ast::PolicyTargetKind::Messages,
2187 )
2188 });
2189 let rls_enabled_but_denied = runtime.map(|rt| rt.is_rls_enabled(queue)).unwrap_or(false)
2190 && rls_filter.is_none()
2191 && runtime.is_some();
2192 if rls_enabled_but_denied {
2193 return Ok(Vec::new());
2195 }
2196 let filter_arc = rls_filter.map(std::sync::Arc::new);
2197 let rt_arc = runtime;
2198 Ok(manager
2199 .query_all(move |entity| {
2200 if !matches!(entity.kind, EntityKind::QueueMessage { .. }) {
2201 return false;
2202 }
2203 if !crate::runtime::impl_core::entity_visible_with_context(snap_ctx.as_ref(), entity) {
2204 return false;
2205 }
2206 if let (Some(filter), Some(rt)) = (filter_arc.as_ref(), rt_arc) {
2207 return crate::runtime::query_exec::evaluate_entity_filter_with_db(
2208 Some(&rt.inner.db),
2209 entity,
2210 filter,
2211 queue,
2212 queue,
2213 );
2214 }
2215 true
2216 })
2217 .into_iter()
2218 .filter_map(queue_message_view_from_entity)
2219 .map(|mut view| {
2220 view.available_at_ns = read_message_available_at_ns(store, queue, view.id);
2221 view
2222 })
2223 .collect())
2224}
2225
2226fn queue_message_view_from_entity(entity: UnifiedEntity) -> Option<QueueMessageView> {
2227 let (position, _) = match &entity.kind {
2228 EntityKind::QueueMessage { position, queue } => (*position, queue),
2229 _ => return None,
2230 };
2231 let data = match entity.data {
2232 EntityData::QueueMessage(data) => data,
2233 _ => return None,
2234 };
2235 Some(QueueMessageView {
2236 id: entity.id,
2237 position,
2238 priority: data.priority.unwrap_or(0),
2239 payload: data.payload,
2240 attempts: data.attempts,
2241 max_attempts: data.max_attempts,
2242 enqueued_at_ns: data.enqueued_at_ns,
2243 available_at_ns: None,
2244 })
2245}
2246
2247pub(super) fn insert_moved_queue_message_payload(
2254 store: &UnifiedStore,
2255 queue: &str,
2256 payload: &Value,
2257) -> RedDBResult<EntityId> {
2258 let config = load_queue_config(store, queue);
2259 let position = next_queue_position(store, queue, QueueSide::Right)?;
2260 let enqueued_at_ns = std::time::SystemTime::now()
2261 .duration_since(std::time::UNIX_EPOCH)
2262 .map(|d| d.as_nanos() as u64)
2263 .unwrap_or(0);
2264 let entity = UnifiedEntity::new(
2265 EntityId::new(0),
2266 EntityKind::QueueMessage {
2267 queue: queue.to_string(),
2268 position,
2269 },
2270 EntityData::QueueMessage(QueueMessageData {
2271 payload: payload.clone(),
2272 priority: None,
2273 enqueued_at_ns,
2274 attempts: 0,
2275 max_attempts: config.max_attempts,
2276 acked: false,
2277 }),
2278 );
2279 let id = store
2280 .insert_auto(queue, entity)
2281 .map_err(|err| RedDBError::Internal(err.to_string()))?;
2282 if let Some(ttl_ms) = config.ttl_ms {
2283 store
2284 .set_metadata(queue, id, queue_message_ttl_metadata(ttl_ms))
2285 .map_err(|err| RedDBError::Internal(err.to_string()))?;
2286 }
2287 Ok(id)
2288}
2289
2290fn insert_moved_queue_message(
2291 store: &UnifiedStore,
2292 queue: &str,
2293 config: &QueueRuntimeConfig,
2294 message: &QueueMessageView,
2295) -> RedDBResult<EntityId> {
2296 let position = next_queue_position(store, queue, QueueSide::Right)?;
2297 let entity = UnifiedEntity::new(
2298 EntityId::new(0),
2299 EntityKind::QueueMessage {
2300 queue: queue.to_string(),
2301 position,
2302 },
2303 EntityData::QueueMessage(QueueMessageData {
2304 payload: message.payload.clone(),
2305 priority: if config.priority {
2306 Some(message.priority)
2307 } else {
2308 None
2309 },
2310 enqueued_at_ns: message.enqueued_at_ns,
2311 attempts: message.attempts,
2312 max_attempts: message.max_attempts,
2313 acked: false,
2314 }),
2315 );
2316 let id = store
2317 .insert_auto(queue, entity)
2318 .map_err(|err| RedDBError::Internal(err.to_string()))?;
2319 if let Some(ttl_ms) = config.ttl_ms {
2320 store
2321 .set_metadata(queue, id, queue_message_ttl_metadata(ttl_ms))
2322 .map_err(|err| RedDBError::Internal(err.to_string()))?;
2323 }
2324 Ok(id)
2325}
2326
2327fn queue_projection_default_columns() -> Vec<String> {
2328 [
2329 "id",
2330 "payload",
2331 "priority",
2332 "attempts",
2333 "last_error",
2334 "enqueued_at",
2335 "available_at",
2336 "dlq",
2337 "tenant",
2338 ]
2339 .into_iter()
2340 .map(str::to_string)
2341 .collect()
2342}
2343
2344fn queue_projection_record(
2345 columns: &[String],
2346 message: &QueueMessageView,
2347 dlq: bool,
2348) -> RedDBResult<UnifiedRecord> {
2349 let mut record = UnifiedRecord::new();
2350 for column in columns {
2351 let value = queue_projection_value(message, dlq, column).ok_or_else(|| {
2352 RedDBError::Query(format!("unknown queue projection column '{}'", column))
2353 })?;
2354 record.set(column, value);
2355 }
2356 Ok(record)
2357}
2358
2359fn queue_projection_value(message: &QueueMessageView, dlq: bool, column: &str) -> Option<Value> {
2360 match column {
2361 "id" => Some(Value::text(message_id_string(message.id))),
2362 "payload" => Some(message.payload.clone()),
2363 "priority" => Some(Value::Integer(i64::from(message.priority))),
2364 "attempts" => Some(Value::UnsignedInteger(u64::from(message.attempts))),
2365 "last_error" => Some(Value::Null),
2366 "enqueued_at" => Some(Value::UnsignedInteger(message.enqueued_at_ns)),
2367 "available_at" => Some(Value::UnsignedInteger(
2368 message.available_at_ns.unwrap_or(message.enqueued_at_ns),
2369 )),
2370 "dlq" => Some(Value::Boolean(dlq)),
2371 "tenant" => queue_message_tenant(&message.payload).or(Some(Value::Null)),
2372 _ => None,
2373 }
2374}
2375
2376fn queue_message_tenant(payload: &Value) -> Option<Value> {
2377 let Value::Json(bytes) = payload else {
2378 return None;
2379 };
2380 let json: crate::json::Value = crate::json::from_slice(bytes).ok()?;
2381 json.get("tenant")
2382 .and_then(crate::json::Value::as_str)
2383 .map(|tenant| Value::text(tenant.to_string()))
2384}
2385
2386fn queue_is_dead_letter_target(store: &UnifiedStore, queue: &str) -> bool {
2387 let Some(manager) = store.get_collection(QUEUE_META_COLLECTION) else {
2388 return false;
2389 };
2390 !manager
2391 .query_all(|entity| {
2392 entity.data.as_row().is_some_and(|row| {
2393 row_text(row, "kind").as_deref() == Some("queue_config")
2394 && row_text(row, "dlq").as_deref() == Some(queue)
2395 })
2396 })
2397 .is_empty()
2398}
2399
2400fn queue_message_matches_filter(message: &QueueMessageView, dlq: bool, filter: &Filter) -> bool {
2401 match filter {
2402 Filter::Compare { field, op, value } => queue_filter_field_value(message, dlq, field)
2403 .is_some_and(|candidate| queue_compare_values(&candidate, value, *op)),
2404 Filter::CompareFields { left, op, right } => {
2405 match (
2406 queue_filter_field_value(message, dlq, left),
2407 queue_filter_field_value(message, dlq, right),
2408 ) {
2409 (Some(left), Some(right)) => queue_compare_values(&left, &right, *op),
2410 _ => false,
2411 }
2412 }
2413 Filter::And(left, right) => {
2414 queue_message_matches_filter(message, dlq, left)
2415 && queue_message_matches_filter(message, dlq, right)
2416 }
2417 Filter::Or(left, right) => {
2418 queue_message_matches_filter(message, dlq, left)
2419 || queue_message_matches_filter(message, dlq, right)
2420 }
2421 Filter::Not(inner) => !queue_message_matches_filter(message, dlq, inner),
2422 Filter::IsNull(field) => queue_filter_field_value(message, dlq, field)
2423 .is_none_or(|value| matches!(value, Value::Null)),
2424 Filter::IsNotNull(field) => queue_filter_field_value(message, dlq, field)
2425 .is_some_and(|value| !matches!(value, Value::Null)),
2426 Filter::In { field, values } => {
2427 queue_filter_field_value(message, dlq, field).is_some_and(|candidate| {
2428 values
2429 .iter()
2430 .any(|value| queue_values_equal(&candidate, value))
2431 })
2432 }
2433 Filter::Between { field, low, high } => queue_filter_field_value(message, dlq, field)
2434 .is_some_and(|candidate| {
2435 queue_compare_values(&candidate, low, CompareOp::Ge)
2436 && queue_compare_values(&candidate, high, CompareOp::Le)
2437 }),
2438 Filter::Like { field, pattern } => queue_filter_text(message, dlq, field)
2439 .is_some_and(|value| queue_like_matches(&value, pattern)),
2440 Filter::StartsWith { field, prefix } => {
2441 queue_filter_text(message, dlq, field).is_some_and(|value| value.starts_with(prefix))
2442 }
2443 Filter::EndsWith { field, suffix } => {
2444 queue_filter_text(message, dlq, field).is_some_and(|value| value.ends_with(suffix))
2445 }
2446 Filter::Contains { field, substring } => {
2447 queue_filter_text(message, dlq, field).is_some_and(|value| value.contains(substring))
2448 }
2449 Filter::CompareExpr { .. } => false,
2450 }
2451}
2452
2453fn queue_filter_field_value(
2454 message: &QueueMessageView,
2455 dlq: bool,
2456 field: &FieldRef,
2457) -> Option<Value> {
2458 match field {
2459 FieldRef::TableColumn { table, column } if table.is_empty() => {
2460 queue_projection_value(message, dlq, column)
2461 .or_else(|| queue_payload_field_value(&message.payload, column))
2462 }
2463 FieldRef::TableColumn { column, .. } => queue_projection_value(message, dlq, column)
2464 .or_else(|| queue_payload_field_value(&message.payload, column)),
2465 _ => None,
2466 }
2467}
2468
2469fn queue_payload_field_value(payload: &Value, field: &str) -> Option<Value> {
2470 let Value::Json(bytes) = payload else {
2471 return None;
2472 };
2473 let json: crate::json::Value = crate::json::from_slice(bytes).ok()?;
2474 let value = json.get(field)?;
2475 json_value_to_schema_value(value)
2476}
2477
2478fn json_value_to_schema_value(value: &crate::json::Value) -> Option<Value> {
2479 if matches!(value, crate::json::Value::Null) {
2480 Some(Value::Null)
2481 } else if let Some(value) = value.as_bool() {
2482 Some(Value::Boolean(value))
2483 } else if let Some(value) = value.as_i64() {
2484 Some(Value::Integer(value))
2485 } else if let Some(value) = value.as_u64() {
2486 Some(Value::UnsignedInteger(value))
2487 } else if let Some(value) = value.as_f64() {
2488 Some(Value::Float(value))
2489 } else if let Some(value) = value.as_str() {
2490 Some(Value::text(value.to_string()))
2491 } else {
2492 Some(Value::Json(value.to_string_compact().into_bytes()))
2493 }
2494}
2495
2496fn queue_filter_text(message: &QueueMessageView, dlq: bool, field: &FieldRef) -> Option<String> {
2497 queue_filter_field_value(message, dlq, field).and_then(|value| match value {
2498 Value::Text(value) => Some(value.to_string()),
2499 Value::NodeRef(value) | Value::EdgeRef(value) | Value::TableRef(value) => Some(value),
2500 Value::Integer(value) => Some(value.to_string()),
2501 Value::UnsignedInteger(value) => Some(value.to_string()),
2502 Value::Float(value) => Some(value.to_string()),
2503 Value::Boolean(value) => Some(value.to_string()),
2504 _ => None,
2505 })
2506}
2507
2508fn queue_compare_values(left: &Value, right: &Value, op: CompareOp) -> bool {
2509 match op {
2510 CompareOp::Eq => queue_values_equal(left, right),
2511 CompareOp::Ne => !queue_values_equal(left, right),
2512 CompareOp::Lt => queue_partial_cmp(left, right).is_some_and(|ord| ord.is_lt()),
2513 CompareOp::Le => queue_partial_cmp(left, right).is_some_and(|ord| !ord.is_gt()),
2514 CompareOp::Gt => queue_partial_cmp(left, right).is_some_and(|ord| ord.is_gt()),
2515 CompareOp::Ge => queue_partial_cmp(left, right).is_some_and(|ord| !ord.is_lt()),
2516 }
2517}
2518
2519fn queue_values_equal(left: &Value, right: &Value) -> bool {
2520 if let (Some(left), Some(right)) = (queue_value_number(left), queue_value_number(right)) {
2521 return (left - right).abs() < f64::EPSILON;
2522 }
2523 match (left, right) {
2524 (Value::Text(left), Value::Text(right)) => left == right,
2525 (Value::Boolean(left), Value::Boolean(right)) => left == right,
2526 _ => left == right,
2527 }
2528}
2529
2530fn queue_partial_cmp(left: &Value, right: &Value) -> Option<std::cmp::Ordering> {
2531 if let (Some(left), Some(right)) = (queue_value_number(left), queue_value_number(right)) {
2532 return left.partial_cmp(&right);
2533 }
2534 match (left, right) {
2535 (Value::Text(left), Value::Text(right)) => Some(left.cmp(right)),
2536 _ => None,
2537 }
2538}
2539
2540fn queue_value_number(value: &Value) -> Option<f64> {
2541 match value {
2542 Value::Integer(value) => Some(*value as f64),
2543 Value::UnsignedInteger(value) => Some(*value as f64),
2544 Value::Float(value) => Some(*value),
2545 Value::Text(value) => value.parse().ok(),
2546 _ => None,
2547 }
2548}
2549
2550fn queue_like_matches(value: &str, pattern: &str) -> bool {
2551 if pattern == "%" {
2552 return true;
2553 }
2554 let starts_wild = pattern.starts_with('%');
2555 let ends_wild = pattern.ends_with('%');
2556 let needle = pattern.trim_matches('%');
2557 match (starts_wild, ends_wild) {
2558 (true, true) => value.contains(needle),
2559 (true, false) => value.ends_with(needle),
2560 (false, true) => value.starts_with(needle),
2561 (false, false) => value == needle,
2562 }
2563}
2564
2565pub(super) fn queue_message_view_by_id(
2566 store: &UnifiedStore,
2567 queue: &str,
2568 message_id: EntityId,
2569) -> RedDBResult<Option<QueueMessageView>> {
2570 let manager = queue_manager(store, queue)?;
2571 Ok(manager
2572 .get(message_id)
2573 .and_then(queue_message_view_from_entity)
2574 .map(|mut view| {
2575 view.available_at_ns = read_message_available_at_ns(store, queue, view.id);
2576 view
2577 }))
2578}
2579
2580pub(super) fn sort_queue_messages(
2581 messages: &mut [QueueMessageView],
2582 config: &QueueRuntimeConfig,
2583 side: QueueSide,
2584) {
2585 messages.sort_by(|left, right| {
2586 if config.priority {
2587 right
2588 .priority
2589 .cmp(&left.priority)
2590 .then_with(|| match side {
2591 QueueSide::Left => left.position.cmp(&right.position),
2592 QueueSide::Right => right.position.cmp(&left.position),
2593 })
2594 .then_with(|| left.id.raw().cmp(&right.id.raw()))
2595 } else {
2596 match side {
2597 QueueSide::Left => left.position.cmp(&right.position),
2598 QueueSide::Right => right.position.cmp(&left.position),
2599 }
2600 .then_with(|| left.id.raw().cmp(&right.id.raw()))
2601 }
2602 });
2603}
2604
2605pub(super) fn next_queue_position(
2606 store: &UnifiedStore,
2607 queue: &str,
2608 side: QueueSide,
2609) -> RedDBResult<u64> {
2610 let messages = load_queue_message_views(store, queue)?;
2611 if messages.is_empty() {
2612 return Ok(QUEUE_POSITION_CENTER);
2613 }
2614 match side {
2615 QueueSide::Left => Ok(messages
2616 .iter()
2617 .map(|message| message.position)
2618 .min()
2619 .unwrap_or(QUEUE_POSITION_CENTER)
2620 .saturating_sub(1)),
2621 QueueSide::Right => Ok(messages
2622 .iter()
2623 .map(|message| message.position)
2624 .max()
2625 .unwrap_or(QUEUE_POSITION_CENTER)
2626 .saturating_add(1)),
2627 }
2628}
2629
2630pub(super) fn increment_queue_attempts(
2631 store: &UnifiedStore,
2632 queue: &str,
2633 message_id: EntityId,
2634) -> RedDBResult<u32> {
2635 let manager = queue_manager(store, queue)?;
2636 let mut entity = manager
2637 .get(message_id)
2638 .ok_or_else(|| RedDBError::NotFound(format!("message '{}' not found", message_id.raw())))?;
2639 match &mut entity.data {
2640 EntityData::QueueMessage(message) => {
2641 message.attempts = message.attempts.saturating_add(1);
2642 let attempts = message.attempts;
2643 manager
2644 .update(entity)
2645 .map_err(|err| RedDBError::Internal(err.to_string()))?;
2646 Ok(attempts)
2647 }
2648 _ => Err(RedDBError::Query(format!(
2649 "entity '{}' is not a queue message",
2650 message_id.raw()
2651 ))),
2652 }
2653}
2654
2655pub(super) fn queue_message_attempts(
2656 store: &UnifiedStore,
2657 queue: &str,
2658 message_id: EntityId,
2659) -> RedDBResult<u32> {
2660 Ok(queue_message_data(store, queue, message_id)?.attempts)
2661}
2662
2663pub(super) fn queue_message_max_attempts(
2664 store: &UnifiedStore,
2665 queue: &str,
2666 message_id: EntityId,
2667) -> RedDBResult<u32> {
2668 Ok(queue_message_data(store, queue, message_id)?.max_attempts)
2669}
2670
2671pub(super) fn queue_message_payload(
2672 store: &UnifiedStore,
2673 queue: &str,
2674 message_id: EntityId,
2675) -> RedDBResult<Value> {
2676 Ok(queue_message_data(store, queue, message_id)?.payload)
2677}
2678
2679pub(super) fn queue_message_pending_any(
2680 store: &UnifiedStore,
2681 queue: &str,
2682 message_id: EntityId,
2683) -> RedDBResult<bool> {
2684 Ok(!load_pending_entries(store, queue, None, Some(message_id))?.is_empty())
2685}
2686
2687pub(super) fn queue_message_pending_for_group(
2688 store: &UnifiedStore,
2689 queue: &str,
2690 group: &str,
2691 message_id: EntityId,
2692) -> RedDBResult<bool> {
2693 Ok(!load_pending_entries(store, queue, Some(group), Some(message_id))?.is_empty())
2694}
2695
2696pub(super) fn queue_message_acked_for_group(
2697 store: &UnifiedStore,
2698 queue: &str,
2699 group: &str,
2700 message_id: EntityId,
2701) -> RedDBResult<bool> {
2702 Ok(!load_ack_entries(store, queue, Some(group), Some(message_id))?.is_empty())
2703}
2704
2705fn queue_manager(
2706 store: &UnifiedStore,
2707 queue: &str,
2708) -> RedDBResult<Arc<crate::storage::unified::SegmentManager>> {
2709 store
2710 .get_collection(queue)
2711 .ok_or_else(|| RedDBError::NotFound(format!("queue '{}' not found", queue)))
2712}
2713
2714pub(super) fn queue_message_data(
2715 store: &UnifiedStore,
2716 queue: &str,
2717 message_id: EntityId,
2718) -> RedDBResult<QueueMessageData> {
2719 let manager = queue_manager(store, queue)?;
2720 let entity = manager
2721 .get(message_id)
2722 .ok_or_else(|| RedDBError::NotFound(format!("message '{}' not found", message_id.raw())))?;
2723 match entity.data {
2724 EntityData::QueueMessage(message) => Ok(message),
2725 _ => Err(RedDBError::Query(format!(
2726 "entity '{}' is not a queue message",
2727 message_id.raw()
2728 ))),
2729 }
2730}
2731
2732fn insert_meta_row(store: &UnifiedStore, fields: HashMap<String, Value>) -> RedDBResult<()> {
2733 let _ = store.get_or_create_collection(QUEUE_META_COLLECTION);
2734 store
2735 .insert_auto(
2736 QUEUE_META_COLLECTION,
2737 UnifiedEntity::new(
2738 EntityId::new(0),
2739 EntityKind::TableRow {
2740 table: Arc::from(QUEUE_META_COLLECTION),
2741 row_id: 0,
2742 },
2743 EntityData::Row(RowData {
2744 columns: Vec::new(),
2745 named: Some(fields),
2746 schema: None,
2747 }),
2748 ),
2749 )
2750 .map_err(|err| RedDBError::Internal(err.to_string()))?;
2751 Ok(())
2752}
2753
2754pub(super) fn remove_meta_rows(store: &UnifiedStore, predicate: impl Fn(&RowData) -> bool + Sync) {
2755 let Some(manager) = store.get_collection(QUEUE_META_COLLECTION) else {
2756 return;
2757 };
2758 let rows = manager.query_all(|entity| entity.data.as_row().is_some_and(&predicate));
2759 for row in rows {
2760 let _ = store.delete(QUEUE_META_COLLECTION, row.id);
2761 }
2762}
2763
2764pub(super) fn delete_meta_entity(store: &UnifiedStore, entity_id: EntityId) {
2765 let _ = store.delete(QUEUE_META_COLLECTION, entity_id);
2766}
2767
2768fn queue_message_lock_key(queue: &str, message_id: EntityId) -> String {
2769 format!("{queue}:{}", message_id.raw())
2770}
2771
2772pub(super) fn queue_message_lock_handle(
2773 runtime: &RedDBRuntime,
2774 queue: &str,
2775 message_id: EntityId,
2776) -> Arc<parking_lot::Mutex<()>> {
2777 let key = queue_message_lock_key(queue, message_id);
2778 if let Some(lock) = runtime.inner.queue_message_locks.read().get(&key).cloned() {
2779 return lock;
2780 }
2781
2782 let mut locks = runtime.inner.queue_message_locks.write();
2783 locks
2784 .entry(key)
2785 .or_insert_with(|| Arc::new(parking_lot::Mutex::new(())))
2786 .clone()
2787}
2788
2789pub(super) fn forget_queue_message_lock(runtime: &RedDBRuntime, queue: &str, message_id: EntityId) {
2790 runtime
2791 .inner
2792 .queue_message_locks
2793 .write()
2794 .remove(&queue_message_lock_key(queue, message_id));
2795}
2796
2797fn parse_message_id(value: &str) -> RedDBResult<EntityId> {
2798 let raw = value.strip_prefix('e').unwrap_or(value);
2799 raw.parse::<u64>()
2800 .map(EntityId::new)
2801 .map_err(|_| RedDBError::Query(format!("invalid message id '{}'", value)))
2802}
2803
2804pub(super) fn resolve_ack_nack_handle(
2810 store: &UnifiedStore,
2811 queue: &str,
2812 group_hint: &str,
2813 message_id_hint: &str,
2814 delivery_id: Option<&str>,
2815) -> RedDBResult<(String, EntityId)> {
2816 if let Some(did) = delivery_id {
2817 return resolve_delivery_id(store, queue, did);
2818 }
2819 if group_hint.is_empty() || message_id_hint.is_empty() {
2820 return Err(RedDBError::Query(
2821 "ACK/NACK requires either GROUP <group> '<message_id>' or WITH delivery_id = '<id>'"
2822 .to_string(),
2823 ));
2824 }
2825 log_tuple_deprecation(queue);
2826 let entity = parse_message_id(message_id_hint)?;
2827 Ok((group_hint.to_string(), entity))
2828}
2829
2830fn resolve_delivery_id(
2831 store: &UnifiedStore,
2832 queue: &str,
2833 delivery_id: &str,
2834) -> RedDBResult<(String, EntityId)> {
2835 let Some(manager) = store.get_collection(QUEUE_META_COLLECTION) else {
2836 return Err(RedDBError::Query(format!(
2837 "delivery_id '{}' does not resolve to a live pending delivery",
2838 delivery_id
2839 )));
2840 };
2841 for entity in manager.query_all(|entity| {
2842 entity.data.as_row().is_some_and(|row| {
2843 row_text(row, "kind").as_deref() == Some("queue_pending_lc")
2844 && row_text(row, "delivery_id").as_deref() == Some(delivery_id)
2845 })
2846 }) {
2847 if let Some(row) = entity.data.as_row() {
2848 let row_queue = row_text(row, "queue").unwrap_or_default();
2849 let row_group = row_text(row, "group").unwrap_or_default();
2850 let row_message = row_u64(row, "message_id").unwrap_or(0);
2851 if row_queue != queue {
2852 return Err(RedDBError::Query(format!(
2853 "delivery_id '{}' belongs to queue '{}', not '{}'",
2854 delivery_id, row_queue, queue
2855 )));
2856 }
2857 return Ok((row_group, EntityId::new(row_message)));
2858 }
2859 }
2860 Err(RedDBError::Query(format!(
2861 "delivery_id '{}' does not resolve to a live pending delivery",
2862 delivery_id
2863 )))
2864}
2865
2866fn log_tuple_deprecation(queue: &str) {
2869 use std::sync::atomic::Ordering;
2870 use std::sync::{Mutex, OnceLock};
2871 use std::time::Instant;
2872
2873 static LAST_EMIT: OnceLock<Mutex<HashMap<(u64, String), Instant>>> = OnceLock::new();
2874 const COOLDOWN: std::time::Duration = std::time::Duration::from_secs(60);
2875
2876 let map = LAST_EMIT.get_or_init(|| Mutex::new(HashMap::new()));
2877 let key = (super::impl_core::current_connection_id(), queue.to_string());
2878 let now = Instant::now();
2879 let mut guard = match map.lock() {
2880 Ok(g) => g,
2881 Err(_) => return,
2882 };
2883 let should_emit =
2884 !matches!(guard.get(&key), Some(prev) if now.duration_since(*prev) < COOLDOWN);
2885 if should_emit {
2886 guard.insert(key.clone(), now);
2887 drop(guard);
2888 TUPLE_DEPRECATION_EMITS.fetch_add(1, Ordering::Relaxed);
2889 tracing::warn!(
2890 target: "reddb::queue_lifecycle",
2891 queue = queue,
2892 connection_id = key.0,
2893 "ACK/NACK by (queue, group, message_id) tuple is deprecated; \
2894 switch to the server-issued delivery_id (ADR 0026). \
2895 The tuple path will be removed one minor release after introduction.",
2896 );
2897 }
2898}
2899
2900pub static TUPLE_DEPRECATION_EMITS: std::sync::atomic::AtomicU64 =
2906 std::sync::atomic::AtomicU64::new(0);
2907
2908fn message_id_string(message_id: EntityId) -> String {
2909 message_id.raw().to_string()
2910}
2911
2912fn delivered_message_json(
2918 message: crate::runtime::queue_lifecycle::DeliveredMessage,
2919) -> crate::serde_json::Value {
2920 use crate::serde_json::{Map, Value as JsonValue};
2921 let mut obj = Map::new();
2922 obj.insert(
2923 "message_id".to_string(),
2924 JsonValue::String(message_id_string(EntityId::new(message.message_id))),
2925 );
2926 obj.insert(
2927 "payload".to_string(),
2928 crate::presentation::entity_json::storage_value_to_json(&message.payload),
2929 );
2930 obj.insert("consumer".to_string(), JsonValue::String(message.consumer));
2931 obj.insert(
2932 "delivery_count".to_string(),
2933 JsonValue::Number(message.delivery_count as f64),
2934 );
2935 JsonValue::Object(obj)
2936}
2937
2938pub(crate) fn pending_counts_by_group(
2943 store: &UnifiedStore,
2944) -> std::collections::BTreeMap<(String, String), u64> {
2945 let mut counts: std::collections::BTreeMap<(String, String), u64> =
2946 std::collections::BTreeMap::new();
2947 let Some(manager) = store.get_collection(QUEUE_META_COLLECTION) else {
2948 return counts;
2949 };
2950 for entity in manager.query_all(|entity| {
2951 entity
2952 .data
2953 .as_row()
2954 .is_some_and(|row| row_text(row, "kind").as_deref() == Some("queue_pending"))
2955 }) {
2956 if let Some(row) = entity.data.as_row() {
2957 let queue = row_text(row, "queue");
2958 let group = row_text(row, "group");
2959 if let (Some(q), Some(g)) = (queue, group) {
2960 *counts.entry((q, g)).or_insert(0) += 1;
2961 }
2962 }
2963 }
2964 counts
2965}
2966
2967pub(super) fn row_text(row: &RowData, field: &str) -> Option<String> {
2968 match row.get_field(field)?.clone() {
2969 Value::Text(value) => Some(value.to_string()),
2970 Value::NodeRef(value) => Some(value),
2971 Value::EdgeRef(value) => Some(value),
2972 Value::TableRef(value) => Some(value),
2973 _ => None,
2974 }
2975}
2976
2977pub(super) fn row_u64(row: &RowData, field: &str) -> Option<u64> {
2978 match row.get_field(field)?.clone() {
2979 Value::UnsignedInteger(value) => Some(value),
2980 Value::Integer(value) if value >= 0 => Some(value as u64),
2981 Value::Float(value) if value >= 0.0 => Some(value as u64),
2982 Value::Text(value) => value.parse().ok(),
2983 _ => None,
2984 }
2985}
2986
2987fn row_bool(row: &RowData, field: &str) -> Option<bool> {
2988 match row.get_field(field)?.clone() {
2989 Value::Boolean(value) => Some(value),
2990 Value::Text(value) => match value.to_ascii_lowercase().as_str() {
2991 "true" => Some(true),
2992 "false" => Some(false),
2993 _ => None,
2994 },
2995 _ => None,
2996 }
2997}
2998
2999fn queue_collection_contract(
3000 name: &str,
3001 priority: bool,
3002 ttl_ms: Option<u64>,
3003) -> crate::physical::CollectionContract {
3004 let now = current_unix_ms();
3005 let mut context_index_fields = Vec::new();
3006 if priority {
3007 context_index_fields.push("priority".to_string());
3008 }
3009
3010 crate::physical::CollectionContract {
3011 name: name.to_string(),
3012 declared_model: crate::catalog::CollectionModel::Queue,
3013 schema_mode: crate::catalog::SchemaMode::Dynamic,
3014 origin: crate::physical::ContractOrigin::Explicit,
3015 version: 1,
3016 created_at_unix_ms: now,
3017 updated_at_unix_ms: now,
3018 default_ttl_ms: ttl_ms,
3019 vector_dimension: None,
3020 vector_metric: None,
3021 context_index_fields,
3022 declared_columns: Vec::new(),
3023 table_def: None,
3024 timestamps_enabled: false,
3025 context_index_enabled: false,
3026 metrics_raw_retention_ms: None,
3027 metrics_rollup_policies: Vec::new(),
3028 metrics_tenant_identity: None,
3029 metrics_namespace: None,
3030 append_only: true,
3034 subscriptions: Vec::new(),
3035 analytics_config: Vec::new(),
3036 session_key: None,
3037 session_gap_ms: None,
3038 retention_duration_ms: None,
3039 analytical_storage: None,
3040 }
3041}
3042
3043fn current_unix_ms() -> u128 {
3044 std::time::SystemTime::now()
3045 .duration_since(std::time::UNIX_EPOCH)
3046 .unwrap_or_default()
3047 .as_millis()
3048}
3049
3050pub(super) fn now_ns() -> u64 {
3051 std::time::SystemTime::now()
3052 .duration_since(std::time::UNIX_EPOCH)
3053 .unwrap_or_default()
3054 .as_nanos() as u64
3055}
3056
3057pub(super) fn queue_message_ttl_metadata(ttl_ms: u64) -> Metadata {
3058 queue_message_metadata(Some(ttl_ms), None)
3059}
3060
3061pub(super) fn queue_message_metadata(
3067 ttl_ms: Option<u64>,
3068 available_at_ns: Option<u64>,
3069) -> Metadata {
3070 let mut fields = HashMap::new();
3071 if let Some(ttl_ms) = ttl_ms {
3072 fields.insert(
3073 "_ttl_ms".to_string(),
3074 if ttl_ms <= i64::MAX as u64 {
3075 MetadataValue::Int(ttl_ms as i64)
3076 } else {
3077 MetadataValue::Timestamp(ttl_ms)
3078 },
3079 );
3080 }
3081 if let Some(at_ns) = available_at_ns {
3082 fields.insert(
3083 "_available_at_ns".to_string(),
3084 MetadataValue::Timestamp(at_ns),
3085 );
3086 }
3087 Metadata::with_fields(fields)
3088}
3089
3090pub(super) fn earliest_future_available_at(store: &UnifiedStore, queue: &str) -> Option<u64> {
3098 let now_ns = now_ns();
3099 let views = load_queue_message_views_with_runtime(None, store, queue).ok()?;
3100 views
3101 .iter()
3102 .filter_map(|v| v.available_at_ns)
3103 .filter(|at| *at > now_ns)
3104 .min()
3105}
3106
3107pub(super) fn set_message_available_at_ns(
3118 store: &UnifiedStore,
3119 queue: &str,
3120 message_id: EntityId,
3121 available_at_ns: Option<u64>,
3122 fallback_ttl_ms: Option<u64>,
3123) -> RedDBResult<()> {
3124 let existing_ttl_ms = store
3125 .get_metadata(queue, message_id)
3126 .and_then(|md| match md.get("_ttl_ms")? {
3127 MetadataValue::Int(i) if *i >= 0 => Some(*i as u64),
3128 MetadataValue::Timestamp(t) => Some(*t),
3129 _ => None,
3130 })
3131 .or(fallback_ttl_ms);
3132 let metadata = queue_message_metadata(existing_ttl_ms, available_at_ns);
3133 match store.set_metadata(queue, message_id, metadata) {
3134 Ok(()) => Ok(()),
3135 Err(crate::storage::StoreError::CollectionNotFound(_)) => Ok(()),
3136 Err(err) => Err(RedDBError::Internal(err.to_string())),
3137 }
3138}
3139
3140pub(super) fn read_message_available_at_ns(
3144 store: &UnifiedStore,
3145 queue: &str,
3146 message_id: EntityId,
3147) -> Option<u64> {
3148 let md = store.get_metadata(queue, message_id)?;
3149 match md.get("_available_at_ns")? {
3150 MetadataValue::Timestamp(t) => Some(*t),
3151 MetadataValue::Int(i) if *i >= 0 => Some(*i as u64),
3152 _ => None,
3153 }
3154}
3155
3156fn estimate_payload_bytes(payload: &Value) -> u64 {
3158 match payload {
3159 Value::Json(v) => v.len() as u64,
3160 Value::Text(s) => s.len() as u64,
3161 _ => 64,
3162 }
3163}
3164
3165#[cfg(test)]
3166mod presence_integration_tests {
3167 use super::*;
3168 use crate::storage::queue::presence::{PresenceState, DEFAULT_PRESENCE_TTL_MS};
3169 use crate::{RedDBOptions, RedDBRuntime};
3170
3171 #[test]
3175 fn queue_read_emits_consumer_presence_heartbeat() {
3176 let rt = RedDBRuntime::with_options(RedDBOptions::in_memory()).unwrap();
3177 rt.execute_query("CREATE QUEUE tasks").unwrap();
3178 rt.execute_query("QUEUE GROUP CREATE tasks workers")
3179 .unwrap();
3180 rt.execute_query("QUEUE PUSH tasks {'job':'a'}").unwrap();
3181 rt.execute_query("QUEUE READ tasks GROUP workers CONSUMER w1")
3182 .unwrap();
3183
3184 let snap = rt.queue_consumer_presence_snapshot(DEFAULT_PRESENCE_TTL_MS);
3185 assert_eq!(snap.len(), 1, "exactly one heartbeat recorded");
3186 let row = &snap[0];
3187 assert_eq!(row.queue, "tasks");
3188 assert_eq!(row.group, "workers");
3189 assert_eq!(row.consumer, "w1");
3190 assert_eq!(row.state, PresenceState::Active);
3191
3192 let counts = rt.queue_active_consumer_counts(DEFAULT_PRESENCE_TTL_MS);
3193 assert_eq!(
3194 counts[&("tasks".to_string(), "workers".to_string())],
3195 1,
3196 "active count reflects the live consumer"
3197 );
3198 }
3199
3200 #[test]
3204 fn empty_queue_read_still_heartbeats() {
3205 let rt = RedDBRuntime::with_options(RedDBOptions::in_memory()).unwrap();
3206 rt.execute_query("CREATE QUEUE empty_q").unwrap();
3207 rt.execute_query("QUEUE GROUP CREATE empty_q workers")
3208 .unwrap();
3209 rt.execute_query("QUEUE READ empty_q GROUP workers CONSUMER w1")
3211 .unwrap();
3212
3213 let snap = rt.queue_consumer_presence_snapshot(DEFAULT_PRESENCE_TTL_MS);
3214 assert_eq!(
3215 snap.len(),
3216 1,
3217 "empty read still registers consumer presence"
3218 );
3219 assert_eq!(snap[0].state, PresenceState::Active);
3220 assert_eq!(
3221 snap[0].lease_count, 0,
3222 "no messages delivered → zero leases"
3223 );
3224 }
3225}