1use std::collections::{HashMap, HashSet};
4use std::sync::atomic::{AtomicU64, Ordering};
5use std::sync::Arc;
6
7use crate::runtime::audit_log::{AuditAuthSource, AuditEvent, AuditFieldEscaper, Outcome};
8use crate::runtime::impl_core::{
9 clear_current_auth_identity, clear_current_tenant, current_auth_identity, current_tenant,
10 set_current_auth_identity, set_current_tenant,
11};
12use crate::storage::queue::QueueMode;
13use crate::storage::unified::entity::{QueueMessageData, RowData};
14use crate::storage::unified::{Metadata, MetadataValue, UnifiedStore};
15
16use super::*;
17
18use super::primary_queue_store::PrimaryQueueStore;
19use super::queue_lifecycle::{QueueLifecycle, RetirementOutcome};
20use crate::storage::queue::lifecycle::{
21 QueueSide as LcQueueSide, QueueStore as _, QueueStoreError, QueueTxn,
22};
23
24pub(super) fn runtime_lifecycle(
31 runtime: &RedDBRuntime,
32 queue: &str,
33) -> (
34 QueueLifecycle<PrimaryQueueStore>,
35 PrimaryQueueStore,
36 QueueTxn,
37) {
38 let primary_for_lookup = PrimaryQueueStore::new(runtime.clone());
39 let primary_for_lifecycle = PrimaryQueueStore::new(runtime.clone());
40 let txn = primary_for_lifecycle.new_txn();
41 let cfg = primary_for_lifecycle.lifecycle_config(queue);
42 (
43 QueueLifecycle::new(primary_for_lifecycle, cfg),
44 primary_for_lookup,
45 txn,
46 )
47}
48
49pub(crate) const QUEUE_READ_WAIT_CANCELLED: &str =
55 "QUEUE READ WAIT cancelled — server shutting down";
56
57pub(crate) const QUEUE_MAX_WAIT_MS_CONFIG_KEY: &str = "red.config.queue.max_wait_ms";
61
62pub(crate) const QUEUE_MAX_WAIT_MS_DEFAULT: u64 = 60_000;
65
66#[derive(Debug)]
77pub(crate) enum RedwireWaitOutcome {
78 Delivered(Vec<crate::serde_json::Value>),
79 TimedOut,
80 Cancelled,
81}
82
83pub(super) fn queue_wait_scope() -> String {
88 crate::runtime::impl_core::current_tenant().unwrap_or_default()
89}
90
91fn with_redwire_wait_context<T>(
92 auth_identity: &Option<(String, crate::auth::Role)>,
93 tenant: &Option<String>,
94 f: impl FnOnce() -> T,
95) -> T {
96 let previous_auth = current_auth_identity();
97 let previous_tenant = current_tenant();
98 match tenant {
99 Some(t) => set_current_tenant(t.clone()),
100 None => clear_current_tenant(),
101 }
102 match auth_identity {
103 Some((username, role)) => set_current_auth_identity(username.clone(), *role),
104 None => clear_current_auth_identity(),
105 }
106 let result = f();
107 match previous_tenant {
108 Some(t) => set_current_tenant(t),
109 None => clear_current_tenant(),
110 }
111 match previous_auth {
112 Some((username, role)) => set_current_auth_identity(username, role),
113 None => clear_current_auth_identity(),
114 }
115 result
116}
117
118fn ast_side_to_lc(side: crate::storage::query::ast::QueueSide) -> LcQueueSide {
122 use crate::storage::query::ast::QueueSide as Ast;
123 match side {
124 Ast::Left => LcQueueSide::Left,
125 Ast::Right => LcQueueSide::Right,
126 }
127}
128
129fn map_qse(err: QueueStoreError) -> RedDBError {
133 match err {
134 QueueStoreError::UnknownDelivery(id) => RedDBError::NotFound(format!(
135 "delivery_id '{id}' does not resolve to a live pending delivery"
136 )),
137 QueueStoreError::UnknownQueue(q) => RedDBError::NotFound(format!("queue '{q}' not found")),
138 QueueStoreError::ReplicaImmutable => {
139 RedDBError::Internal("replica QueueStore is immutable".to_string())
140 }
141 }
142}
143
144pub static EVENTS_DRAIN_RETRIES_TOTAL: AtomicU64 = AtomicU64::new(0);
151
152pub static EVENTS_DLQ_TOTAL: AtomicU64 = AtomicU64::new(0);
154
155pub static EVENTS_ENQUEUED_TOTAL: AtomicU64 = AtomicU64::new(0);
157
158const OUTBOX_WARN_BYTES: u64 = 1 << 30;
160
161const OUTBOX_MAX_BYTES: u64 = 10 * (1 << 30);
163
164static OUTBOX_APPROX_BYTES: AtomicU64 = AtomicU64::new(0);
166
167const QUEUE_META_COLLECTION: &str = "red_queue_meta";
168const QUEUE_POSITION_CENTER: u64 = u64::MAX / 2;
169const WORK_DEFAULT_GROUP: &str = "_work_default";
170const FANOUT_GROUP_PREFIX: &str = "_fanout_";
171
172#[derive(Debug, Clone)]
173pub(super) struct QueueRuntimeConfig {
174 pub(super) mode: QueueMode,
175 pub(super) priority: bool,
176 pub(super) max_size: Option<usize>,
177 pub(super) ttl_ms: Option<u64>,
178 pub(super) dlq: Option<String>,
179 pub(super) max_attempts: u32,
180 pub(super) lock_deadline_ms: u64,
181 pub(super) in_flight_cap_per_group: u32,
182 pub(super) retry_delay_ms: Option<u64>,
187}
188
189#[derive(Debug, Clone)]
190struct QueueGroupEntry {
191 entity_id: EntityId,
192 group: String,
193}
194
195#[derive(Debug, Clone)]
196pub(super) struct QueuePendingEntry {
197 pub(super) entity_id: EntityId,
198 group: String,
199 pub(super) message_id: EntityId,
200 consumer: String,
201 pub(super) delivered_at_ns: u64,
202 pub(super) delivery_count: u32,
203}
204
205#[derive(Debug, Clone)]
206pub(super) struct QueueAckEntry {
207 entity_id: EntityId,
208 group: String,
209 pub(super) message_id: EntityId,
210}
211
212#[derive(Debug, Clone)]
213pub(super) struct QueueMessageView {
214 pub(super) id: EntityId,
215 position: u64,
216 priority: i32,
217 pub(super) payload: Value,
218 attempts: u32,
219 pub(super) max_attempts: u32,
220 enqueued_at_ns: u64,
221 pub(super) available_at_ns: Option<u64>,
225}
226
227impl QueueMessageView {
228 pub(super) fn is_available_now(&self) -> bool {
233 match self.available_at_ns {
234 Some(at) => at <= now_ns(),
235 None => true,
236 }
237 }
238}
239
240impl RedDBRuntime {
241 pub(super) fn group_read_with_optional_wait(
250 &self,
251 queue: &str,
252 group: &str,
253 consumer: &str,
254 count: usize,
255 wait_ms: Option<u64>,
256 ) -> RedDBResult<Vec<crate::runtime::queue_lifecycle::DeliveredMessage>> {
257 let do_read =
258 |runtime: &RedDBRuntime| -> RedDBResult<Vec<crate::runtime::queue_lifecycle::DeliveredMessage>> {
259 let (lifecycle, _ps, txn) = runtime_lifecycle(runtime, queue);
260 lifecycle
261 .group_read(&txn, queue, group, consumer, count)
262 .map_err(map_qse)
263 };
264
265 let delivered = do_read(self)?;
266 let Some(wait_ms) = wait_ms else {
267 return Ok(delivered);
268 };
269 if !delivered.is_empty() {
270 return Ok(delivered);
271 }
272 let registry = self.queue_wait_registry();
282 let scope = queue_wait_scope();
283 let deadline = std::time::Instant::now() + std::time::Duration::from_millis(wait_ms);
284 let telemetry = self.queue_telemetry();
285 telemetry.record_wait_started(&scope, queue);
286 let wait_start = std::time::Instant::now();
287 let observe = |outcome: crate::runtime::queue_telemetry::WaitOutcomeLabel| {
288 let elapsed_ms = wait_start.elapsed().as_millis().min(u128::from(u64::MAX)) as u64;
289 telemetry.record_wait_outcome(&scope, queue, outcome, elapsed_ms);
290 };
291 loop {
292 let snapshot = registry.snapshot(&scope, queue);
296 let delivered = do_read(self)?;
297 if !delivered.is_empty() {
298 observe(crate::runtime::queue_telemetry::WaitOutcomeLabel::Woken);
299 return Ok(delivered);
300 }
301 if registry.is_cancelled() {
302 observe(crate::runtime::queue_telemetry::WaitOutcomeLabel::Cancelled);
303 return Err(RedDBError::Query(QUEUE_READ_WAIT_CANCELLED.to_string()));
304 }
305 if std::time::Instant::now() >= deadline {
306 observe(crate::runtime::queue_telemetry::WaitOutcomeLabel::Timeout);
307 return Ok(Vec::new());
308 }
309 let park_deadline = match earliest_future_available_at(&self.inner.db.store(), queue) {
319 Some(at_ns) => {
320 let now_ns = now_ns();
321 if at_ns <= now_ns {
322 deadline.min(std::time::Instant::now())
324 } else {
325 let wait_ns = at_ns - now_ns;
326 let due_instant =
327 std::time::Instant::now() + std::time::Duration::from_nanos(wait_ns);
328 deadline.min(due_instant)
329 }
330 }
331 None => deadline,
332 };
333 match registry.wait_until(&snapshot, park_deadline) {
334 crate::runtime::queue_wait_registry::WaitOutcome::Woken => continue,
335 crate::runtime::queue_wait_registry::WaitOutcome::Timeout => {
336 if std::time::Instant::now() >= deadline {
340 observe(crate::runtime::queue_telemetry::WaitOutcomeLabel::Timeout);
341 return Ok(Vec::new());
342 }
343 continue;
344 }
345 crate::runtime::queue_wait_registry::WaitOutcome::Cancelled => {
346 observe(crate::runtime::queue_telemetry::WaitOutcomeLabel::Cancelled);
347 return Err(RedDBError::Query(QUEUE_READ_WAIT_CANCELLED.to_string()));
348 }
349 }
350 }
351 }
352
353 pub(crate) async fn redwire_queue_wait_json(
370 &self,
371 queue: &str,
372 group: Option<&str>,
373 consumer: &str,
374 count: usize,
375 wait_ms: u64,
376 auth_identity: Option<(String, crate::auth::Role)>,
377 tenant: Option<String>,
378 ) -> RedDBResult<RedwireWaitOutcome> {
379 let group_owned: RedDBResult<String> =
380 with_redwire_wait_context(&auth_identity, &tenant, || {
381 let expr = crate::storage::query::ast::QueryExpr::QueueCommand(
382 crate::storage::query::ast::QueueCommand::GroupRead {
383 queue: queue.to_string(),
384 group: group.map(str::to_string),
385 consumer: consumer.to_string(),
386 count,
387 wait_ms: Some(wait_ms),
388 },
389 );
390 self.check_query_privilege(&expr)
391 .map_err(RedDBError::Query)?;
392 let store = self.inner.db.store();
393 ensure_queue_exists(store.as_ref(), queue)?;
394 let config = load_queue_config(store.as_ref(), queue);
395 resolve_read_group(store.as_ref(), queue, group, consumer, &config)
396 });
397 let group_owned = group_owned?;
398 let group_ref = group_owned.as_str();
399
400 let do_read =
401 |runtime: &RedDBRuntime| -> RedDBResult<Vec<crate::runtime::queue_lifecycle::DeliveredMessage>> {
402 with_redwire_wait_context(&auth_identity, &tenant, || {
403 let (lifecycle, _ps, txn) = runtime_lifecycle(runtime, queue);
404 lifecycle
405 .group_read(&txn, queue, group_ref, consumer, count)
406 .map_err(map_qse)
407 })
408 };
409
410 let render = |delivered: Vec<crate::runtime::queue_lifecycle::DeliveredMessage>| {
411 RedwireWaitOutcome::Delivered(
412 delivered.into_iter().map(delivered_message_json).collect(),
413 )
414 };
415
416 let delivered = do_read(self)?;
418 if !delivered.is_empty() {
419 return Ok(render(delivered));
420 }
421
422 let registry = self.queue_wait_registry();
423 let scope = with_redwire_wait_context(&auth_identity, &tenant, queue_wait_scope);
424 let deadline = std::time::Instant::now() + std::time::Duration::from_millis(wait_ms);
425 let telemetry = self.queue_telemetry();
426 telemetry.record_wait_started(&scope, queue);
427 let wait_start = std::time::Instant::now();
428 tracing::debug!(
429 target: "reddb::redwire::queue_wait",
430 queue,
431 group = group_ref,
432 consumer,
433 count,
434 wait_ms,
435 scope = scope.as_str(),
436 "redwire queue wait parked"
437 );
438 let observe = |outcome: crate::runtime::queue_telemetry::WaitOutcomeLabel| {
439 let elapsed_ms = wait_start.elapsed().as_millis().min(u128::from(u64::MAX)) as u64;
440 telemetry.record_wait_outcome(&scope, queue, outcome, elapsed_ms);
441 tracing::debug!(
442 target: "reddb::redwire::queue_wait",
443 queue,
444 group = group_ref,
445 consumer,
446 count,
447 wait_ms,
448 scope = scope.as_str(),
449 outcome = outcome.as_str(),
450 duration_ms = elapsed_ms,
451 "redwire queue wait resolved"
452 );
453 };
454 loop {
455 let waiter = registry.async_waiter(&scope, queue);
459 let delivered = do_read(self)?;
460 if !delivered.is_empty() {
461 observe(crate::runtime::queue_telemetry::WaitOutcomeLabel::Woken);
462 return Ok(render(delivered));
463 }
464 if registry.is_cancelled() {
465 observe(crate::runtime::queue_telemetry::WaitOutcomeLabel::Cancelled);
466 return Ok(RedwireWaitOutcome::Cancelled);
467 }
468 if std::time::Instant::now() >= deadline {
469 observe(crate::runtime::queue_telemetry::WaitOutcomeLabel::Timeout);
470 return Ok(RedwireWaitOutcome::TimedOut);
471 }
472 let park_deadline = match earliest_future_available_at(&self.inner.db.store(), queue) {
473 Some(at_ns) => {
474 let now_ns = now_ns();
475 if at_ns <= now_ns {
476 deadline.min(std::time::Instant::now())
477 } else {
478 let wait_ns = at_ns - now_ns;
479 let due_instant =
480 std::time::Instant::now() + std::time::Duration::from_nanos(wait_ns);
481 deadline.min(due_instant)
482 }
483 }
484 None => deadline,
485 };
486 match registry.wait_until_async(&waiter, park_deadline).await {
491 crate::runtime::queue_wait_registry::WaitOutcome::Woken => continue,
492 crate::runtime::queue_wait_registry::WaitOutcome::Timeout => {
493 if std::time::Instant::now() >= deadline {
494 observe(crate::runtime::queue_telemetry::WaitOutcomeLabel::Timeout);
495 return Ok(RedwireWaitOutcome::TimedOut);
496 }
497 continue;
498 }
499 crate::runtime::queue_wait_registry::WaitOutcome::Cancelled => {
500 observe(crate::runtime::queue_telemetry::WaitOutcomeLabel::Cancelled);
501 return Ok(RedwireWaitOutcome::Cancelled);
502 }
503 }
504 }
505 }
506
507 pub(crate) fn redwire_queue_wait_cap_check(&self, wait_ms: u64) -> Result<(), String> {
516 let cap = self.config_u64(QUEUE_MAX_WAIT_MS_CONFIG_KEY, QUEUE_MAX_WAIT_MS_DEFAULT);
517 if wait_ms > cap {
518 Err(format!(
519 "queue-wait WAIT {wait_ms}ms exceeds server cap {QUEUE_MAX_WAIT_MS_CONFIG_KEY} = {cap}ms"
520 ))
521 } else {
522 Ok(())
523 }
524 }
525
526 pub(crate) fn enqueue_event_payload(
527 &self,
528 queue: &str,
529 payload: Value,
530 ) -> RedDBResult<EntityId> {
531 let store = self.inner.db.store();
532 if store.get_collection(queue).is_none() {
534 crate::runtime::impl_ddl::ensure_event_target_queue_pub(self, queue)?;
535 }
536
537 let payload_bytes = estimate_payload_bytes(&payload);
539 let outbox_bytes = OUTBOX_APPROX_BYTES.fetch_add(payload_bytes, Ordering::Relaxed);
540
541 if outbox_bytes > OUTBOX_MAX_BYTES {
543 OUTBOX_APPROX_BYTES.fetch_sub(payload_bytes, Ordering::Relaxed);
544 EVENTS_DRAIN_RETRIES_TOTAL.fetch_add(1, Ordering::Relaxed);
545 return self.route_event_to_outbox_dlq(queue, payload, "outbox_max_bytes_exceeded");
546 }
547
548 if outbox_bytes > OUTBOX_WARN_BYTES && outbox_bytes - payload_bytes <= OUTBOX_WARN_BYTES {
550 tracing::warn!(
551 outbox_bytes,
552 warn_threshold = OUTBOX_WARN_BYTES,
553 "event outbox approaching capacity warning threshold"
554 );
555 crate::telemetry::operator_event::OperatorEvent::OutboxDlqActivated {
556 queue: queue.to_string(),
557 dlq: format!("{queue}_outbox_dlq"),
558 reason: "outbox_warn_bytes_exceeded".to_string(),
559 }
560 .emit_global();
561 }
562
563 let config = load_queue_config(store.as_ref(), queue);
564
565 if let Some(max_size) = config.max_size {
567 let current_len = load_queue_message_views(store.as_ref(), queue)
568 .unwrap_or_default()
569 .len();
570 if current_len >= max_size {
571 OUTBOX_APPROX_BYTES.fetch_sub(payload_bytes, Ordering::Relaxed);
572 EVENTS_DRAIN_RETRIES_TOTAL.fetch_add(1, Ordering::Relaxed);
573 return self.route_event_to_outbox_dlq(queue, payload, "queue_full");
574 }
575 if current_len * 10 >= max_size * 8 {
577 tracing::warn!(
578 queue = %queue,
579 size = current_len,
580 max = max_size,
581 "event target queue near capacity"
582 );
583 }
584 }
585
586 let id = self.enqueue_event_payload_raw(store.as_ref(), queue, &config, payload)?;
587 EVENTS_ENQUEUED_TOTAL.fetch_add(1, Ordering::Relaxed);
588 Ok(id)
589 }
590
591 fn route_event_to_outbox_dlq(
593 &self,
594 queue: &str,
595 payload: Value,
596 reason: &str,
597 ) -> RedDBResult<EntityId> {
598 let dlq_name = format!("{queue}_outbox_dlq");
599 EVENTS_DLQ_TOTAL.fetch_add(1, Ordering::Relaxed);
600
601 crate::telemetry::operator_event::OperatorEvent::OutboxDlqActivated {
602 queue: queue.to_string(),
603 dlq: dlq_name.clone(),
604 reason: reason.to_string(),
605 }
606 .emit_global();
607
608 let store = self.inner.db.store();
609 if store.get_collection(&dlq_name).is_none() {
610 crate::runtime::impl_ddl::ensure_event_target_queue_pub(self, &dlq_name)?;
611 }
612 let dlq_config = load_queue_config(store.as_ref(), &dlq_name);
613 let id = self.enqueue_event_payload_raw(store.as_ref(), &dlq_name, &dlq_config, payload)?;
614 EVENTS_ENQUEUED_TOTAL.fetch_add(1, Ordering::Relaxed);
615 Ok(id)
616 }
617
618 fn enqueue_event_payload_raw(
620 &self,
621 store: &UnifiedStore,
622 queue: &str,
623 config: &QueueRuntimeConfig,
624 payload: Value,
625 ) -> RedDBResult<EntityId> {
626 let position = next_queue_position(store, queue, QueueSide::Right)?;
627 let mut entity = UnifiedEntity::new(
628 EntityId::new(0),
629 EntityKind::QueueMessage {
630 queue: queue.to_string(),
631 position,
632 },
633 EntityData::QueueMessage(QueueMessageData {
634 payload,
635 priority: None,
636 enqueued_at_ns: now_ns(),
637 attempts: 0,
638 max_attempts: config.max_attempts,
639 acked: false,
640 }),
641 );
642 if let Some(xid) = self.current_xid() {
643 entity.set_xmin(xid);
644 }
645 let id = store
646 .insert_auto(queue, entity)
647 .map_err(|err| RedDBError::Internal(err.to_string()))?;
648 if let Some(ttl_ms) = config.ttl_ms {
649 store
650 .set_metadata(queue, id, queue_message_ttl_metadata(ttl_ms))
651 .map_err(|err| RedDBError::Internal(err.to_string()))?;
652 }
653 self.invalidate_result_cache_for_table(queue);
654 Ok(id)
655 }
656
657 pub fn execute_create_queue(
658 &self,
659 raw_query: &str,
660 query: &CreateQueueQuery,
661 ) -> RedDBResult<RuntimeQueryResult> {
662 self.check_write(crate::runtime::write_gate::WriteKind::Ddl)?;
663 if query.dlq.as_deref() == Some(query.name.as_str()) {
664 return Err(RedDBError::Query(
665 "dead-letter queue must be different from the source queue".to_string(),
666 ));
667 }
668
669 let store = self.inner.db.store();
670 let exists = store.get_collection(&query.name).is_some();
671 if exists {
672 if query.if_not_exists {
673 return Ok(RuntimeQueryResult::ok_message(
674 raw_query.to_string(),
675 &format!("queue '{}' already exists", query.name),
676 "create",
677 ));
678 }
679 return Err(RedDBError::Query(format!(
680 "queue '{}' already exists",
681 query.name
682 )));
683 }
684
685 store
686 .create_collection(&query.name)
687 .map_err(|err| RedDBError::Internal(err.to_string()))?;
688 if let Some(ttl_ms) = query.ttl_ms {
689 self.inner
690 .db
691 .set_collection_default_ttl_ms(&query.name, ttl_ms);
692 }
693 self.inner
694 .db
695 .save_collection_contract(queue_collection_contract(
696 &query.name,
697 query.priority,
698 query.ttl_ms,
699 ))
700 .map_err(|err| RedDBError::Internal(err.to_string()))?;
701 save_queue_config(
702 store.as_ref(),
703 &query.name,
704 &QueueRuntimeConfig {
705 mode: query.mode,
706 priority: query.priority,
707 max_size: query.max_size,
708 ttl_ms: query.ttl_ms,
709 dlq: query.dlq.clone(),
710 max_attempts: query.max_attempts,
711 lock_deadline_ms: query.lock_deadline_ms,
712 in_flight_cap_per_group: query.in_flight_cap_per_group,
713 retry_delay_ms: query.retry_delay_ms,
714 },
715 )?;
716
717 if let Some(dlq) = &query.dlq {
718 if store.get_collection(dlq).is_none() {
719 store
720 .create_collection(dlq)
721 .map_err(|err| RedDBError::Internal(err.to_string()))?;
722 self.inner
723 .db
724 .save_collection_contract(queue_collection_contract(dlq, false, None))
725 .map_err(|err| RedDBError::Internal(err.to_string()))?;
726 }
727 }
728
729 self.invalidate_result_cache();
730 self.inner
731 .db
732 .persist_metadata()
733 .map_err(|err| RedDBError::Internal(err.to_string()))?;
734 let mut type_tags = Vec::new();
739 if let Some(dlq) = &query.dlq {
740 type_tags.push(format!("dlq:{}", dlq));
741 }
742 self.schema_vocabulary_apply(
743 crate::runtime::schema_vocabulary::DdlEvent::CreateCollection {
744 collection: query.name.clone(),
745 columns: vec!["payload".to_string()],
746 type_tags,
747 description: None,
748 },
749 );
750
751 let mut msg = format!("queue '{}' created", query.name);
752 msg.push_str(&format!(" (mode={})", query.mode.as_str()));
753 if query.priority {
754 msg.push_str(" (priority)");
755 }
756 if let Some(max_size) = query.max_size {
757 msg.push_str(&format!(" (max_size={max_size})"));
758 }
759 if let Some(ttl_ms) = query.ttl_ms {
760 msg.push_str(&format!(" (ttl={ttl_ms}ms)"));
761 }
762 if let Some(dlq) = &query.dlq {
763 msg.push_str(&format!(
764 " (dlq={dlq}, max_attempts={})",
765 query.max_attempts
766 ));
767 }
768
769 Ok(RuntimeQueryResult::ok_message(
770 raw_query.to_string(),
771 &msg,
772 "create",
773 ))
774 }
775
776 pub fn execute_alter_queue(
777 &self,
778 raw_query: &str,
779 query: &AlterQueueQuery,
780 ) -> RedDBResult<RuntimeQueryResult> {
781 self.check_write(crate::runtime::write_gate::WriteKind::Ddl)?;
782 let store = self.inner.db.store();
783 ensure_queue_exists(store.as_ref(), &query.name)?;
784
785 let mut config = load_queue_config(store.as_ref(), &query.name);
786 let mut summary: Vec<String> = Vec::new();
787
788 if let Some(new_mode) = query.mode {
789 let pending =
790 load_pending_entries(store.as_ref(), &query.name, None, None).unwrap_or_default();
791 if !pending.is_empty() {
792 tracing::warn!(
793 queue = %query.name,
794 pending_count = pending.len(),
795 new_mode = %new_mode.as_str(),
796 "ALTER QUEUE SET MODE: {} in-flight messages will drain with old mode; \
797 new reads use {}",
798 pending.len(),
799 new_mode.as_str(),
800 );
801 }
802 config.mode = new_mode;
803 summary.push(format!("mode={}", new_mode.as_str()));
804 }
805 if let Some(max_attempts) = query.max_attempts {
806 config.max_attempts = max_attempts;
807 summary.push(format!("max_attempts={max_attempts}"));
808 }
809 if let Some(lock_deadline_ms) = query.lock_deadline_ms {
810 config.lock_deadline_ms = lock_deadline_ms;
811 summary.push(format!("lock_deadline_ms={lock_deadline_ms}"));
812 }
813 if let Some(in_flight_cap) = query.in_flight_cap_per_group {
814 config.in_flight_cap_per_group = in_flight_cap;
815 summary.push(format!("in_flight_cap_per_group={in_flight_cap}"));
816 }
817 if let Some(dlq) = &query.dlq {
818 if dlq == &query.name {
819 return Err(RedDBError::Query(
820 "dead-letter queue must be different from the source queue".to_string(),
821 ));
822 }
823 config.dlq = Some(dlq.clone());
824 summary.push(format!("dlq={dlq}"));
825 }
826 if let Some(retry_delay_ms) = query.retry_delay_ms {
827 config.retry_delay_ms = if retry_delay_ms == 0 {
828 None
829 } else {
830 Some(retry_delay_ms)
831 };
832 summary.push(format!("retry_delay_ms={retry_delay_ms}"));
833 }
834
835 save_queue_config(store.as_ref(), &query.name, &config)?;
836
837 self.invalidate_result_cache();
838 self.inner
839 .db
840 .persist_metadata()
841 .map_err(|err| RedDBError::Internal(err.to_string()))?;
842
843 Ok(RuntimeQueryResult::ok_message(
844 raw_query.to_string(),
845 &format!("queue '{}' altered: {}", query.name, summary.join(", ")),
846 "alter",
847 ))
848 }
849
850 pub fn execute_drop_queue(
851 &self,
852 raw_query: &str,
853 query: &DropQueueQuery,
854 ) -> RedDBResult<RuntimeQueryResult> {
855 self.check_write(crate::runtime::write_gate::WriteKind::Ddl)?;
856 let store = self.inner.db.store();
857 if super::impl_ddl::is_system_schema_name(&query.name) {
858 return Err(RedDBError::Query("system schema is read-only".to_string()));
859 }
860 if store.get_collection(&query.name).is_none() {
861 if query.if_exists {
862 return Ok(RuntimeQueryResult::ok_message(
863 raw_query.to_string(),
864 &format!("queue '{}' does not exist", query.name),
865 "drop",
866 ));
867 }
868 return Err(RedDBError::NotFound(format!(
869 "queue '{}' not found",
870 query.name
871 )));
872 }
873 let actual = crate::runtime::ddl::polymorphic_resolver::resolve(
874 &query.name,
875 &self.inner.db.catalog_model_snapshot(),
876 )?;
877 crate::runtime::ddl::polymorphic_resolver::ensure_model_match(
878 crate::catalog::CollectionModel::Queue,
879 actual,
880 )?;
881
882 store
883 .drop_collection(&query.name)
884 .map_err(|err| RedDBError::Internal(err.to_string()))?;
885 self.inner.db.clear_collection_default_ttl_ms(&query.name);
886 self.inner
887 .db
888 .remove_collection_contract(&query.name)
889 .map_err(|err| RedDBError::Internal(err.to_string()))?;
890 remove_queue_metadata(store.as_ref(), &query.name);
891 self.invalidate_result_cache();
892 self.inner
893 .db
894 .persist_metadata()
895 .map_err(|err| RedDBError::Internal(err.to_string()))?;
896 self.schema_vocabulary_apply(
898 crate::runtime::schema_vocabulary::DdlEvent::DropCollection {
899 collection: query.name.clone(),
900 },
901 );
902
903 Ok(RuntimeQueryResult::ok_message(
904 raw_query.to_string(),
905 &format!("queue '{}' dropped", query.name),
906 "drop",
907 ))
908 }
909
910 pub fn execute_queue_command(
911 &self,
912 raw_query: &str,
913 cmd: &QueueCommand,
914 ) -> RedDBResult<RuntimeQueryResult> {
915 self.check_write(crate::runtime::write_gate::WriteKind::Dml)?;
916 match cmd {
917 QueueCommand::Push {
918 queue,
919 value,
920 side,
921 priority,
922 available,
923 } => {
924 let store = self.inner.db.store();
925 ensure_queue_exists(store.as_ref(), queue)?;
926 let config = load_queue_config(store.as_ref(), queue);
927 if priority.is_some() && !config.priority {
928 return Err(RedDBError::Query(format!(
929 "queue '{}' is not a priority queue",
930 queue
931 )));
932 }
933 if let Some(max_size) = config.max_size {
934 let current_len =
935 load_queue_message_views_with_runtime(Some(self), store.as_ref(), queue)?
936 .len();
937 if current_len >= max_size {
938 return Err(RedDBError::Query(format!(
939 "queue '{}' is full (max_size={max_size})",
940 queue
941 )));
942 }
943 }
944
945 let position = next_queue_position(store.as_ref(), queue, *side)?;
946 let mut entity = UnifiedEntity::new(
947 EntityId::new(0),
948 EntityKind::QueueMessage {
949 queue: queue.clone(),
950 position,
951 },
952 EntityData::QueueMessage(QueueMessageData {
953 payload: value.clone(),
954 priority: if config.priority { *priority } else { None },
955 enqueued_at_ns: now_ns(),
956 attempts: 0,
957 max_attempts: config.max_attempts,
958 acked: false,
959 }),
960 );
961 if let Some(xid) = self.current_xid() {
964 entity.set_xmin(xid);
965 }
966 let id = store
967 .insert_auto(queue, entity)
968 .map_err(|err| RedDBError::Internal(err.to_string()))?;
969 let available_at_ns = available.map(|a| match a {
974 crate::storage::query::ast::QueueAvailability::DelayMs(ms) => {
975 now_ns().saturating_add(ms.saturating_mul(1_000_000))
976 }
977 crate::storage::query::ast::QueueAvailability::AtUnixMs(ms) => {
978 ms.saturating_mul(1_000_000)
979 }
980 });
981 if config.ttl_ms.is_some() || available_at_ns.is_some() {
982 store
983 .set_metadata(
984 queue,
985 id,
986 queue_message_metadata(config.ttl_ms, available_at_ns),
987 )
988 .map_err(|err| RedDBError::Internal(err.to_string()))?;
989 }
990 self.record_queue_wake(&queue_wait_scope(), queue);
995 self.invalidate_result_cache();
996
997 let mut result = UnifiedResult::with_columns(vec![
998 "message_id".into(),
999 "side".into(),
1000 "queue".into(),
1001 ]);
1002 let mut record = UnifiedRecord::new();
1003 record.set("message_id", Value::text(message_id_string(id)));
1004 record.set(
1005 "side",
1006 Value::text(match side {
1007 QueueSide::Left => "left".to_string(),
1008 QueueSide::Right => "right".to_string(),
1009 }),
1010 );
1011 record.set("queue", Value::text(queue.clone()));
1012 result.push(record);
1013
1014 Ok(RuntimeQueryResult {
1015 query: raw_query.to_string(),
1016 mode: QueryMode::Sql,
1017 statement: "queue_push",
1018 engine: "runtime-queue",
1019 result,
1020 affected_rows: 1,
1021 statement_type: "insert",
1022 bookmark: None,
1023 })
1024 }
1025 QueueCommand::Pop { queue, side, count } => {
1026 let store = self.inner.db.store();
1027 ensure_queue_exists(store.as_ref(), queue)?;
1028 let (lifecycle, _ps, txn) = runtime_lifecycle(self, queue);
1029 let popped = lifecycle
1030 .pop(queue, ast_side_to_lc(*side), *count, &txn)
1031 .map_err(map_qse)?;
1032
1033 let mut result =
1034 UnifiedResult::with_columns(vec!["message_id".into(), "payload".into()]);
1035 for (message_id, payload) in &popped {
1036 let mut record = UnifiedRecord::new();
1037 record.set(
1038 "message_id",
1039 Value::text(message_id_string(EntityId::new(*message_id))),
1040 );
1041 record.set("payload", payload.clone());
1042 result.push(record);
1043 }
1044 let popped_count = popped.len() as u64;
1045 if popped_count > 0 {
1046 self.invalidate_result_cache();
1047 }
1048
1049 Ok(RuntimeQueryResult {
1050 query: raw_query.to_string(),
1051 mode: QueryMode::Sql,
1052 statement: "queue_pop",
1053 engine: "runtime-queue",
1054 result,
1055 affected_rows: popped_count,
1056 statement_type: "delete",
1057 bookmark: None,
1058 })
1059 }
1060 QueueCommand::Peek { queue, count } => {
1061 let store = self.inner.db.store();
1062 ensure_queue_exists(store.as_ref(), queue)?;
1063 let (lifecycle, _ps, txn) = runtime_lifecycle(self, queue);
1064 let messages = lifecycle.peek(queue, *count, &txn);
1065
1066 let mut result =
1067 UnifiedResult::with_columns(vec!["message_id".into(), "payload".into()]);
1068 for message in messages {
1069 let mut record = UnifiedRecord::new();
1070 record.set(
1071 "message_id",
1072 Value::text(message_id_string(EntityId::new(message.message_id))),
1073 );
1074 record.set("payload", message.payload);
1075 result.push(record);
1076 }
1077
1078 Ok(RuntimeQueryResult {
1079 query: raw_query.to_string(),
1080 mode: QueryMode::Sql,
1081 statement: "queue_peek",
1082 engine: "runtime-queue",
1083 result,
1084 affected_rows: 0,
1085 statement_type: "select",
1086 bookmark: None,
1087 })
1088 }
1089 QueueCommand::Len { queue } => {
1090 let store = self.inner.db.store();
1091 ensure_queue_exists(store.as_ref(), queue)?;
1092 let count =
1093 load_queue_message_views_with_runtime(Some(self), store.as_ref(), queue)?.len()
1094 as u64;
1095 let mut result = UnifiedResult::with_columns(vec!["len".into()]);
1096 let mut record = UnifiedRecord::new();
1097 record.set("len", Value::UnsignedInteger(count));
1098 result.push(record);
1099
1100 Ok(RuntimeQueryResult {
1101 query: raw_query.to_string(),
1102 mode: QueryMode::Sql,
1103 statement: "queue_len",
1104 engine: "runtime-queue",
1105 result,
1106 affected_rows: 0,
1107 statement_type: "select",
1108 bookmark: None,
1109 })
1110 }
1111 QueueCommand::Purge { queue } => {
1112 let store = self.inner.db.store();
1113 ensure_queue_exists(store.as_ref(), queue)?;
1114 let (lifecycle, _ps, txn) = runtime_lifecycle(self, queue);
1115 let count = lifecycle.purge(queue, &txn).map_err(map_qse)?;
1116 if count > 0 {
1117 self.invalidate_result_cache();
1118 }
1119
1120 Ok(RuntimeQueryResult::ok_message(
1121 raw_query.to_string(),
1122 &format!("{count} messages purged from queue '{queue}'"),
1123 "delete",
1124 ))
1125 }
1126 QueueCommand::GroupCreate { queue, group } => {
1127 let store = self.inner.db.store();
1128 ensure_queue_exists(store.as_ref(), queue)?;
1129 if queue_group_exists(store.as_ref(), queue, group)? {
1130 return Ok(RuntimeQueryResult::ok_message(
1131 raw_query.to_string(),
1132 &format!(
1133 "consumer group '{}' already exists on queue '{}'",
1134 group, queue
1135 ),
1136 "create",
1137 ));
1138 }
1139 save_queue_group(store.as_ref(), queue, group)?;
1140 self.invalidate_result_cache();
1141
1142 Ok(RuntimeQueryResult::ok_message(
1143 raw_query.to_string(),
1144 &format!("consumer group '{}' created on queue '{}'", group, queue),
1145 "create",
1146 ))
1147 }
1148 QueueCommand::GroupRead {
1149 queue,
1150 group,
1151 consumer,
1152 count,
1153 wait_ms,
1154 } => {
1155 let store = self.inner.db.store();
1156 ensure_queue_exists(store.as_ref(), queue)?;
1157 if let Some(ms) = *wait_ms {
1163 if self.current_xid().is_some() {
1164 return Err(RedDBError::Query(
1165 "QUEUE READ … WAIT is autocommit-only: refusing to park inside an explicit transaction (BEGIN/COMMIT)"
1166 .to_string(),
1167 ));
1168 }
1169 let cap =
1170 self.config_u64(QUEUE_MAX_WAIT_MS_CONFIG_KEY, QUEUE_MAX_WAIT_MS_DEFAULT);
1171 if ms > cap {
1172 return Err(RedDBError::Query(format!(
1173 "QUEUE READ … WAIT {ms}ms exceeds server cap {QUEUE_MAX_WAIT_MS_CONFIG_KEY} = {cap}ms"
1174 )));
1175 }
1176 }
1177 let config = load_queue_config(store.as_ref(), queue);
1181 let group_owned =
1182 resolve_read_group(store.as_ref(), queue, group.as_deref(), consumer, &config)?;
1183 let group_ref = group_owned.as_str();
1184 let delivered = self
1185 .group_read_with_optional_wait(queue, group_ref, consumer, *count, *wait_ms)?;
1186
1187 {
1191 let lease_count = u32::try_from(delivered.len()).unwrap_or(u32::MAX);
1192 let now_ns = std::time::SystemTime::now()
1193 .duration_since(std::time::UNIX_EPOCH)
1194 .map(|d| d.as_nanos() as u64)
1195 .unwrap_or(0);
1196 self.queue_presence().heartbeat(
1197 queue,
1198 group_ref,
1199 consumer,
1200 lease_count,
1201 now_ns,
1202 );
1203 }
1204
1205 let mut result = UnifiedResult::with_columns(vec![
1206 "message_id".into(),
1207 "payload".into(),
1208 "consumer".into(),
1209 "delivery_count".into(),
1210 "attempts".into(),
1211 ]);
1212
1213 for message in delivered {
1214 let mut record = UnifiedRecord::new();
1215 record.set(
1216 "message_id",
1217 Value::text(message_id_string(EntityId::new(message.message_id))),
1218 );
1219 record.set("payload", message.payload);
1220 record.set("consumer", Value::text(message.consumer));
1221 record.set(
1222 "delivery_count",
1223 Value::UnsignedInteger(u64::from(message.delivery_count)),
1224 );
1225 record.set(
1226 "attempts",
1227 Value::UnsignedInteger(u64::from(message.delivery_count)),
1228 );
1229 result.push(record);
1230 }
1231 if !result.records.is_empty() {
1232 self.invalidate_result_cache();
1233 }
1234
1235 Ok(RuntimeQueryResult {
1236 query: raw_query.to_string(),
1237 mode: QueryMode::Sql,
1238 statement: "queue_group_read",
1239 engine: "runtime-queue",
1240 result,
1241 affected_rows: 0,
1242 statement_type: "select",
1243 bookmark: None,
1244 })
1245 }
1246 QueueCommand::Pending { queue, group } => {
1247 let store = self.inner.db.store();
1248 ensure_queue_exists(store.as_ref(), queue)?;
1249 require_queue_group(store.as_ref(), queue, group)?;
1250 let mut pending = load_pending_entries(store.as_ref(), queue, Some(group), None)?;
1251 pending.sort_by_key(|entry| entry.delivered_at_ns);
1252 let current_time_ns = now_ns();
1253
1254 let mut result = UnifiedResult::with_columns(vec![
1255 "message_id".into(),
1256 "consumer".into(),
1257 "delivered_at_ns".into(),
1258 "delivery_count".into(),
1259 "idle_ms".into(),
1260 ]);
1261 for entry in pending {
1262 let mut record = UnifiedRecord::new();
1263 record.set(
1264 "message_id",
1265 Value::text(message_id_string(entry.message_id)),
1266 );
1267 record.set("consumer", Value::text(entry.consumer));
1268 record.set(
1269 "delivered_at_ns",
1270 Value::UnsignedInteger(entry.delivered_at_ns),
1271 );
1272 record.set(
1273 "delivery_count",
1274 Value::UnsignedInteger(u64::from(entry.delivery_count)),
1275 );
1276 record.set(
1277 "idle_ms",
1278 Value::UnsignedInteger(
1279 current_time_ns.saturating_sub(entry.delivered_at_ns) / 1_000_000,
1280 ),
1281 );
1282 result.push(record);
1283 }
1284
1285 Ok(RuntimeQueryResult {
1286 query: raw_query.to_string(),
1287 mode: QueryMode::Sql,
1288 statement: "queue_pending",
1289 engine: "runtime-queue",
1290 result,
1291 affected_rows: 0,
1292 statement_type: "select",
1293 bookmark: None,
1294 })
1295 }
1296 QueueCommand::Claim {
1297 queue,
1298 group,
1299 consumer,
1300 min_idle_ms,
1301 } => {
1302 let store = self.inner.db.store();
1303 ensure_queue_exists(store.as_ref(), queue)?;
1304 require_queue_group(store.as_ref(), queue, group)?;
1305 let (lifecycle, _ps, txn) = runtime_lifecycle(self, queue);
1306 let delivered = lifecycle
1307 .claim_delivering(queue, consumer, *min_idle_ms, &txn)
1308 .map_err(map_qse)?;
1309
1310 let mut result = UnifiedResult::with_columns(vec![
1311 "message_id".into(),
1312 "payload".into(),
1313 "consumer".into(),
1314 "delivery_count".into(),
1315 ]);
1316
1317 for message in delivered {
1318 let mut record = UnifiedRecord::new();
1319 record.set(
1320 "message_id",
1321 Value::text(message_id_string(EntityId::new(message.message_id))),
1322 );
1323 record.set("payload", message.payload);
1324 record.set("consumer", Value::text(message.consumer));
1325 record.set(
1326 "delivery_count",
1327 Value::UnsignedInteger(u64::from(message.delivery_count)),
1328 );
1329 result.push(record);
1330 }
1331 if !result.records.is_empty() {
1332 self.invalidate_result_cache();
1333 }
1334 let affected_rows = result.records.len() as u64;
1335
1336 Ok(RuntimeQueryResult {
1337 query: raw_query.to_string(),
1338 mode: QueryMode::Sql,
1339 statement: "queue_claim",
1340 engine: "runtime-queue",
1341 result,
1342 affected_rows,
1343 statement_type: "update",
1344 bookmark: None,
1345 })
1346 }
1347 QueueCommand::Ack {
1348 queue,
1349 group,
1350 message_id,
1351 delivery_id,
1352 } => {
1353 let store = self.inner.db.store();
1354 ensure_queue_exists(store.as_ref(), queue)?;
1355 let (group_owned, message_entity) = resolve_ack_nack_handle(
1356 store.as_ref(),
1357 queue,
1358 group,
1359 message_id,
1360 delivery_id.as_deref(),
1361 )?;
1362 let group_ref = group_owned.as_str();
1363 require_queue_group(store.as_ref(), queue, group_ref)?;
1364 let (lifecycle, ps, txn) = runtime_lifecycle(self, queue);
1365 let did = match delivery_id.as_deref() {
1366 Some(d) => d.to_string(),
1367 None => ps
1368 .find_pending_by_key(queue, message_entity.raw(), group_ref)
1369 .ok_or_else(|| {
1370 RedDBError::NotFound(format!(
1371 "no pending delivery for message '{}' on queue '{}' (group '{}')",
1372 message_entity.raw(),
1373 queue,
1374 group_ref
1375 ))
1376 })?,
1377 };
1378 lifecycle.ack(&txn, &did).map_err(map_qse)?;
1379 self.invalidate_result_cache();
1380
1381 Ok(RuntimeQueryResult::ok_message(
1382 raw_query.to_string(),
1383 "message acknowledged",
1384 "update",
1385 ))
1386 }
1387 QueueCommand::Nack {
1388 queue,
1389 group,
1390 message_id,
1391 delivery_id,
1392 delay_ms,
1393 } => {
1394 let store = self.inner.db.store();
1395 ensure_queue_exists(store.as_ref(), queue)?;
1396 let config = load_queue_config(store.as_ref(), queue);
1397 if delay_ms.is_some() {
1403 if let Some((_, role)) = current_auth_identity() {
1404 if !role.can_write() {
1405 return Err(RedDBError::InvalidOperation(format!(
1406 "role '{role}' is not authorized to override NACK retry delay on queue '{queue}'"
1407 )));
1408 }
1409 }
1410 }
1411 let (group_owned, message_entity) = resolve_ack_nack_handle(
1412 store.as_ref(),
1413 queue,
1414 group,
1415 message_id,
1416 delivery_id.as_deref(),
1417 )?;
1418 let group_ref = group_owned.as_str();
1419 require_queue_group(store.as_ref(), queue, group_ref)?;
1420 let (lifecycle, ps, txn) = runtime_lifecycle(self, queue);
1421 let did = match delivery_id.as_deref() {
1422 Some(d) => d.to_string(),
1423 None => ps
1424 .find_pending_by_key(queue, message_entity.raw(), group_ref)
1425 .ok_or_else(|| {
1426 RedDBError::NotFound(format!(
1427 "no pending delivery for message '{}' on queue '{}' (group '{}')",
1428 message_entity.raw(),
1429 queue,
1430 group_ref
1431 ))
1432 })?,
1433 };
1434 let effective_delay_ms = delay_ms.or(config.retry_delay_ms).unwrap_or(0);
1438 let outcome = lifecycle.nack(&txn, &did).map_err(map_qse)?;
1439 if matches!(outcome, RetirementOutcome::Requeued) && effective_delay_ms > 0 {
1443 let at_ns =
1444 now_ns().saturating_add(effective_delay_ms.saturating_mul(1_000_000));
1445 set_message_available_at_ns(
1446 store.as_ref(),
1447 queue,
1448 message_entity,
1449 Some(at_ns),
1450 config.ttl_ms,
1451 )?;
1452 }
1453 self.maybe_emit_nack_audit(
1460 queue,
1461 group_ref,
1462 &did,
1463 *delay_ms,
1464 config.retry_delay_ms,
1465 &outcome,
1466 );
1467 let message = match outcome {
1468 RetirementOutcome::Requeued => {
1469 if effective_delay_ms > 0 {
1470 format!("message requeued (delay={effective_delay_ms}ms)")
1471 } else {
1472 "message requeued".to_string()
1473 }
1474 }
1475 RetirementOutcome::MovedToDlq(dlq) => {
1476 format!("message moved to dead-letter queue '{}'", dlq)
1477 }
1478 RetirementOutcome::Dropped => "message dropped after max attempts".to_string(),
1479 };
1480 self.invalidate_result_cache();
1481
1482 Ok(RuntimeQueryResult::ok_message(
1483 raw_query.to_string(),
1484 &message,
1485 "update",
1486 ))
1487 }
1488 QueueCommand::Move {
1489 source,
1490 destination,
1491 filter,
1492 limit,
1493 } => self.execute_queue_move(raw_query, source, destination, filter.as_ref(), *limit),
1494 }
1495 }
1496
1497 pub fn execute_queue_select(
1498 &self,
1499 raw_query: &str,
1500 query: &QueueSelectQuery,
1501 ) -> RedDBResult<RuntimeQueryResult> {
1502 let store = self.inner.db.store();
1503 ensure_queue_exists(store.as_ref(), &query.queue)?;
1504 let config = load_queue_config(store.as_ref(), &query.queue);
1505 let dlq = queue_is_dead_letter_target(store.as_ref(), &query.queue);
1506 let columns = if query.columns.is_empty() {
1507 queue_projection_default_columns()
1508 } else {
1509 query.columns.clone()
1510 };
1511
1512 let mut messages =
1513 load_queue_message_views_with_runtime(Some(self), store.as_ref(), &query.queue)?;
1514 sort_queue_messages(&mut messages, &config, QueueSide::Left);
1515
1516 let mut result = UnifiedResult::with_columns(columns.clone());
1517 for message in messages {
1518 if query
1519 .filter
1520 .as_ref()
1521 .is_some_and(|filter| !queue_message_matches_filter(&message, dlq, filter))
1522 {
1523 continue;
1524 }
1525 let record = queue_projection_record(&columns, &message, dlq)?;
1526 result.push(record);
1527 if query
1528 .limit
1529 .is_some_and(|limit| result.records.len() >= limit as usize)
1530 {
1531 break;
1532 }
1533 }
1534
1535 Ok(RuntimeQueryResult {
1536 query: raw_query.to_string(),
1537 mode: QueryMode::Sql,
1538 statement: "queue_select",
1539 engine: "runtime-queue",
1540 result,
1541 affected_rows: 0,
1542 statement_type: "select",
1543 bookmark: None,
1544 })
1545 }
1546
1547 fn execute_queue_move(
1548 &self,
1549 raw_query: &str,
1550 source: &str,
1551 destination: &str,
1552 filter: Option<&Filter>,
1553 limit: usize,
1554 ) -> RedDBResult<RuntimeQueryResult> {
1555 if source == destination {
1556 return Err(RedDBError::Query(
1557 "QUEUE MOVE source and destination must be different".to_string(),
1558 ));
1559 }
1560 let store = self.inner.db.store();
1561 ensure_queue_exists(store.as_ref(), source)?;
1562 ensure_queue_exists(store.as_ref(), destination)?;
1563 let source_config = load_queue_config(store.as_ref(), source);
1564 let destination_config = load_queue_config(store.as_ref(), destination);
1565 let source_dlq = queue_is_dead_letter_target(store.as_ref(), source);
1566
1567 let mut messages =
1568 load_queue_message_views_with_runtime(Some(self), store.as_ref(), source)?;
1569 sort_queue_messages(&mut messages, &source_config, QueueSide::Left);
1570 let selected = messages
1571 .into_iter()
1572 .filter(|message| {
1573 filter
1574 .map(|f| queue_message_matches_filter(message, source_dlq, f))
1575 .unwrap_or(true)
1576 })
1577 .take(limit)
1578 .collect::<Vec<_>>();
1579
1580 if let Some(max_size) = destination_config.max_size {
1581 let current_len =
1582 load_queue_message_views_with_runtime(Some(self), store.as_ref(), destination)?
1583 .len();
1584 if current_len + selected.len() > max_size {
1585 return Err(RedDBError::Query(format!(
1586 "queue '{}' is full (max_size={max_size})",
1587 destination
1588 )));
1589 }
1590 }
1591
1592 for message in &selected {
1593 let lock = queue_message_lock_handle(self, source, message.id);
1594 let Some(_guard) = lock.try_lock() else {
1595 return Err(RedDBError::Query(format!(
1596 "message '{}' is locked on queue '{}'",
1597 message.id.raw(),
1598 source
1599 )));
1600 };
1601 if queue_message_view_by_id(store.as_ref(), source, message.id)?.is_none() {
1602 return Err(RedDBError::Query(format!(
1603 "message '{}' is no longer available on queue '{}'",
1604 message.id.raw(),
1605 source
1606 )));
1607 }
1608 }
1609
1610 let mut inserted = Vec::new();
1611 for message in &selected {
1612 match insert_moved_queue_message(
1613 store.as_ref(),
1614 destination,
1615 &destination_config,
1616 message,
1617 ) {
1618 Ok(id) => inserted.push(id),
1619 Err(err) => {
1620 for id in inserted {
1621 let _ = store.delete(destination, id);
1622 }
1623 return Err(err);
1624 }
1625 }
1626 }
1627
1628 let (move_lifecycle, _move_ps, move_txn) = runtime_lifecycle(self, source);
1629 for message in &selected {
1630 move_lifecycle
1631 .delete_with_state(source, message.id.raw(), &move_txn)
1632 .map_err(map_qse)?;
1633 }
1634 if !selected.is_empty() {
1635 self.invalidate_result_cache();
1636 }
1637
1638 let selected_count = selected.len() as u64;
1639 self.audit_log().record_event(
1640 AuditEvent::builder("queue/move")
1641 .source(AuditAuthSource::System)
1642 .outcome(Outcome::Success)
1643 .resource(format!("queue:{source}->{destination}"))
1644 .fields([
1645 AuditFieldEscaper::field("source", source),
1646 AuditFieldEscaper::field("destination", destination),
1647 AuditFieldEscaper::field("selected", selected_count),
1648 AuditFieldEscaper::field("committed", selected_count),
1649 ])
1650 .build(),
1651 );
1652
1653 let mut result = UnifiedResult::with_columns(vec![
1654 "source".into(),
1655 "destination".into(),
1656 "selected".into(),
1657 "committed".into(),
1658 ]);
1659 let mut record = UnifiedRecord::new();
1660 record.set("source", Value::text(source.to_string()));
1661 record.set("destination", Value::text(destination.to_string()));
1662 record.set("selected", Value::UnsignedInteger(selected_count));
1663 record.set("committed", Value::UnsignedInteger(selected_count));
1664 result.push(record);
1665
1666 Ok(RuntimeQueryResult {
1667 query: raw_query.to_string(),
1668 mode: QueryMode::Sql,
1669 statement: "queue_move",
1670 engine: "runtime-queue",
1671 result,
1672 affected_rows: selected_count,
1673 statement_type: "update",
1674 bookmark: None,
1675 })
1676 }
1677
1678 fn maybe_emit_nack_audit(
1693 &self,
1694 queue: &str,
1695 group: &str,
1696 delivery_id: &str,
1697 override_ms: Option<u64>,
1698 default_ms: Option<u64>,
1699 outcome: &RetirementOutcome,
1700 ) {
1701 let Some(override_ms) = override_ms else {
1702 return;
1703 };
1704 let outcome_label = match outcome {
1705 RetirementOutcome::Requeued => "requeued",
1706 RetirementOutcome::MovedToDlq(_) => "dlq",
1707 RetirementOutcome::Dropped => "dropped",
1708 };
1709 const SIGNIFICANT_DELAY_MS: u64 = 60_000;
1710 let destination_changed = !matches!(outcome, RetirementOutcome::Requeued);
1711 if override_ms < SIGNIFICANT_DELAY_MS && !destination_changed {
1712 return;
1713 }
1714 self.audit_log().record_event(
1715 AuditEvent::builder("queue/nack/override")
1716 .source(AuditAuthSource::System)
1717 .outcome(Outcome::Success)
1718 .resource(format!("queue:{queue}"))
1719 .fields([
1720 AuditFieldEscaper::field("queue", queue),
1721 AuditFieldEscaper::field("group", group),
1722 AuditFieldEscaper::field("delivery_id", delivery_id),
1723 AuditFieldEscaper::field("override_delay_ms", override_ms),
1724 AuditFieldEscaper::field("default_delay_ms", default_ms.unwrap_or(0)),
1725 AuditFieldEscaper::field("outcome", outcome_label),
1726 ])
1727 .build(),
1728 );
1729 }
1730}
1731
1732fn ensure_queue_exists(store: &UnifiedStore, queue: &str) -> RedDBResult<()> {
1733 if store.get_collection(queue).is_some() {
1734 Ok(())
1735 } else {
1736 Err(RedDBError::NotFound(format!("queue '{}' not found", queue)))
1737 }
1738}
1739
1740pub(super) fn load_queue_config(store: &UnifiedStore, queue: &str) -> QueueRuntimeConfig {
1741 let default = QueueRuntimeConfig {
1742 mode: QueueMode::Work,
1743 priority: false,
1744 max_size: None,
1745 ttl_ms: None,
1746 dlq: None,
1747 max_attempts: crate::storage::query::DEFAULT_QUEUE_MAX_ATTEMPTS,
1748 lock_deadline_ms: crate::storage::query::DEFAULT_QUEUE_LOCK_DEADLINE_MS,
1749 in_flight_cap_per_group: crate::storage::query::DEFAULT_QUEUE_IN_FLIGHT_CAP_PER_GROUP,
1750 retry_delay_ms: None,
1751 };
1752
1753 let Some(manager) = store.get_collection(QUEUE_META_COLLECTION) else {
1754 return default;
1755 };
1756 manager
1757 .query_all(|entity| {
1758 entity.data.as_row().is_some_and(|row| {
1759 row_text(row, "kind").as_deref() == Some("queue_config")
1760 && row_text(row, "queue").as_deref() == Some(queue)
1761 })
1762 })
1763 .into_iter()
1764 .find_map(|entity| {
1765 let row = entity.data.as_row()?;
1766 Some(QueueRuntimeConfig {
1767 mode: row_text(row, "mode")
1768 .as_deref()
1769 .and_then(QueueMode::parse)
1770 .unwrap_or_default(),
1771 priority: row_bool(row, "priority").unwrap_or(false),
1772 max_size: row_u64(row, "max_size").map(|value| value as usize),
1773 ttl_ms: row_u64(row, "ttl_ms"),
1774 dlq: row_text(row, "dlq"),
1775 max_attempts: row_u64(row, "max_attempts")
1776 .map(|value| value as u32)
1777 .unwrap_or(crate::storage::query::DEFAULT_QUEUE_MAX_ATTEMPTS),
1778 lock_deadline_ms: row_u64(row, "lock_deadline_ms")
1779 .unwrap_or(crate::storage::query::DEFAULT_QUEUE_LOCK_DEADLINE_MS),
1780 in_flight_cap_per_group: row_u64(row, "in_flight_cap_per_group")
1781 .map(|value| value as u32)
1782 .unwrap_or(crate::storage::query::DEFAULT_QUEUE_IN_FLIGHT_CAP_PER_GROUP),
1783 retry_delay_ms: row_u64(row, "retry_delay_ms").filter(|v| *v > 0),
1784 })
1785 })
1786 .unwrap_or(default)
1787}
1788
1789pub(super) fn queue_mode_str(store: &UnifiedStore, queue: &str) -> &'static str {
1790 load_queue_config(store, queue).mode.as_str()
1791}
1792
1793fn save_queue_config(
1794 store: &UnifiedStore,
1795 queue: &str,
1796 config: &QueueRuntimeConfig,
1797) -> RedDBResult<()> {
1798 remove_meta_rows(store, |row| {
1799 row_text(row, "kind").as_deref() == Some("queue_config")
1800 && row_text(row, "queue").as_deref() == Some(queue)
1801 });
1802
1803 let mut fields = HashMap::new();
1804 fields.insert("kind".to_string(), Value::text("queue_config".to_string()));
1805 fields.insert("queue".to_string(), Value::text(queue.to_string()));
1806 fields.insert(
1807 "mode".to_string(),
1808 Value::text(config.mode.as_str().to_string()),
1809 );
1810 fields.insert("priority".to_string(), Value::Boolean(config.priority));
1811 fields.insert(
1812 "max_size".to_string(),
1813 config
1814 .max_size
1815 .map(|value| Value::UnsignedInteger(value as u64))
1816 .unwrap_or(Value::Null),
1817 );
1818 fields.insert(
1819 "ttl_ms".to_string(),
1820 config
1821 .ttl_ms
1822 .map(Value::UnsignedInteger)
1823 .unwrap_or(Value::Null),
1824 );
1825 fields.insert(
1826 "dlq".to_string(),
1827 config.dlq.clone().map(Value::text).unwrap_or(Value::Null),
1828 );
1829 fields.insert(
1830 "max_attempts".to_string(),
1831 Value::UnsignedInteger(u64::from(config.max_attempts)),
1832 );
1833 fields.insert(
1834 "lock_deadline_ms".to_string(),
1835 Value::UnsignedInteger(config.lock_deadline_ms),
1836 );
1837 fields.insert(
1838 "in_flight_cap_per_group".to_string(),
1839 Value::UnsignedInteger(u64::from(config.in_flight_cap_per_group)),
1840 );
1841 fields.insert(
1842 "retry_delay_ms".to_string(),
1843 config
1844 .retry_delay_ms
1845 .map(Value::UnsignedInteger)
1846 .unwrap_or(Value::Null),
1847 );
1848 insert_meta_row(store, fields)
1849}
1850
1851fn remove_queue_metadata(store: &UnifiedStore, queue: &str) {
1852 remove_meta_rows(store, |row| {
1853 row_text(row, "queue").as_deref() == Some(queue)
1854 });
1855}
1856
1857fn queue_group_exists(store: &UnifiedStore, queue: &str, group: &str) -> RedDBResult<bool> {
1858 Ok(load_queue_groups(store, queue)?
1859 .into_iter()
1860 .any(|entry| entry.group == group))
1861}
1862
1863pub(super) fn require_queue_group(
1864 store: &UnifiedStore,
1865 queue: &str,
1866 group: &str,
1867) -> RedDBResult<()> {
1868 if queue_group_exists(store, queue, group)? {
1869 Ok(())
1870 } else {
1871 Err(RedDBError::NotFound(format!(
1872 "consumer group '{}' not found on queue '{}'",
1873 group, queue
1874 )))
1875 }
1876}
1877
1878pub(super) fn resolve_read_group(
1879 store: &UnifiedStore,
1880 queue: &str,
1881 group: Option<&str>,
1882 consumer: &str,
1883 config: &QueueRuntimeConfig,
1884) -> RedDBResult<String> {
1885 if let Some(group) = group {
1886 require_queue_group(store, queue, group)?;
1887 return Ok(group.to_string());
1888 }
1889
1890 match config.mode {
1891 QueueMode::Work => {
1892 if !queue_group_exists(store, queue, WORK_DEFAULT_GROUP)? {
1893 save_queue_group(store, queue, WORK_DEFAULT_GROUP)?;
1894 }
1895 Ok(WORK_DEFAULT_GROUP.to_string())
1896 }
1897 QueueMode::Fanout => {
1898 let fanout_group = format!("{FANOUT_GROUP_PREFIX}{consumer}");
1899 if !queue_group_exists(store, queue, &fanout_group)? {
1900 save_queue_group(store, queue, &fanout_group)?;
1901 }
1902 Ok(fanout_group)
1903 }
1904 }
1905}
1906
1907fn load_queue_groups(store: &UnifiedStore, queue: &str) -> RedDBResult<Vec<QueueGroupEntry>> {
1908 let Some(manager) = store.get_collection(QUEUE_META_COLLECTION) else {
1909 return Ok(Vec::new());
1910 };
1911 Ok(manager
1912 .query_all(|entity| {
1913 entity.data.as_row().is_some_and(|row| {
1914 row_text(row, "kind").as_deref() == Some("queue_group")
1915 && row_text(row, "queue").as_deref() == Some(queue)
1916 })
1917 })
1918 .into_iter()
1919 .filter_map(|entity| {
1920 let row = entity.data.as_row()?;
1921 Some(QueueGroupEntry {
1922 entity_id: entity.id,
1923 group: row_text(row, "group")?,
1924 })
1925 })
1926 .collect())
1927}
1928
1929fn save_queue_group(store: &UnifiedStore, queue: &str, group: &str) -> RedDBResult<()> {
1930 let mut fields = HashMap::new();
1931 fields.insert("kind".to_string(), Value::text("queue_group".to_string()));
1932 fields.insert("queue".to_string(), Value::text(queue.to_string()));
1933 fields.insert("group".to_string(), Value::text(group.to_string()));
1934 fields.insert(
1935 "created_at_ns".to_string(),
1936 Value::UnsignedInteger(now_ns()),
1937 );
1938 insert_meta_row(store, fields)
1939}
1940
1941pub(super) fn load_pending_entries(
1942 store: &UnifiedStore,
1943 queue: &str,
1944 group: Option<&str>,
1945 message_id: Option<EntityId>,
1946) -> RedDBResult<Vec<QueuePendingEntry>> {
1947 let Some(manager) = store.get_collection(QUEUE_META_COLLECTION) else {
1948 return Ok(Vec::new());
1949 };
1950 Ok(manager
1951 .query_all(|entity| {
1952 entity.data.as_row().is_some_and(|row| {
1953 row_text(row, "kind").as_deref() == Some("queue_pending")
1954 && row_text(row, "queue").as_deref() == Some(queue)
1955 && group
1956 .map(|group_name| row_text(row, "group").as_deref() == Some(group_name))
1957 .unwrap_or(true)
1958 && message_id
1959 .map(|candidate| row_u64(row, "message_id") == Some(candidate.raw()))
1960 .unwrap_or(true)
1961 })
1962 })
1963 .into_iter()
1964 .filter_map(|entity| {
1965 let row = entity.data.as_row()?;
1966 Some(QueuePendingEntry {
1967 entity_id: entity.id,
1968 group: row_text(row, "group")?,
1969 message_id: EntityId::new(row_u64(row, "message_id")?),
1970 consumer: row_text(row, "consumer")?,
1971 delivered_at_ns: row_u64(row, "delivered_at_ns")?,
1972 delivery_count: row_u64(row, "delivery_count")
1973 .map(|value| value as u32)
1974 .unwrap_or(1),
1975 })
1976 })
1977 .collect())
1978}
1979
1980pub(super) fn save_queue_pending(
1981 store: &UnifiedStore,
1982 queue: &str,
1983 group: &str,
1984 message_id: EntityId,
1985 consumer: &str,
1986 delivered_at_ns: u64,
1987 delivery_count: u32,
1988) -> RedDBResult<()> {
1989 remove_meta_rows(store, |row| {
1990 row_text(row, "kind").as_deref() == Some("queue_pending")
1991 && row_text(row, "queue").as_deref() == Some(queue)
1992 && row_text(row, "group").as_deref() == Some(group)
1993 && row_u64(row, "message_id") == Some(message_id.raw())
1994 });
1995
1996 let mut fields = HashMap::new();
1997 fields.insert("kind".to_string(), Value::text("queue_pending".to_string()));
1998 fields.insert("queue".to_string(), Value::text(queue.to_string()));
1999 fields.insert("group".to_string(), Value::text(group.to_string()));
2000 fields.insert(
2001 "message_id".to_string(),
2002 Value::UnsignedInteger(message_id.raw()),
2003 );
2004 fields.insert("consumer".to_string(), Value::text(consumer.to_string()));
2005 fields.insert(
2006 "delivered_at_ns".to_string(),
2007 Value::UnsignedInteger(delivered_at_ns),
2008 );
2009 fields.insert(
2010 "delivery_count".to_string(),
2011 Value::UnsignedInteger(u64::from(delivery_count)),
2012 );
2013 insert_meta_row(store, fields)
2014}
2015
2016pub(super) fn require_pending_entry(
2017 store: &UnifiedStore,
2018 queue: &str,
2019 group: &str,
2020 message_id: EntityId,
2021) -> RedDBResult<QueuePendingEntry> {
2022 load_pending_entries(store, queue, Some(group), Some(message_id))?
2023 .into_iter()
2024 .next()
2025 .ok_or_else(|| {
2026 RedDBError::NotFound(format!(
2027 "message '{}' is not pending in group '{}' on queue '{}'",
2028 message_id.raw(),
2029 group,
2030 queue
2031 ))
2032 })
2033}
2034
2035pub(super) fn load_ack_entries(
2036 store: &UnifiedStore,
2037 queue: &str,
2038 group: Option<&str>,
2039 message_id: Option<EntityId>,
2040) -> RedDBResult<Vec<QueueAckEntry>> {
2041 let Some(manager) = store.get_collection(QUEUE_META_COLLECTION) else {
2042 return Ok(Vec::new());
2043 };
2044 Ok(manager
2045 .query_all(|entity| {
2046 entity.data.as_row().is_some_and(|row| {
2047 row_text(row, "kind").as_deref() == Some("queue_ack")
2048 && row_text(row, "queue").as_deref() == Some(queue)
2049 && group
2050 .map(|group_name| row_text(row, "group").as_deref() == Some(group_name))
2051 .unwrap_or(true)
2052 && message_id
2053 .map(|candidate| row_u64(row, "message_id") == Some(candidate.raw()))
2054 .unwrap_or(true)
2055 })
2056 })
2057 .into_iter()
2058 .filter_map(|entity| {
2059 let row = entity.data.as_row()?;
2060 Some(QueueAckEntry {
2061 entity_id: entity.id,
2062 group: row_text(row, "group")?,
2063 message_id: EntityId::new(row_u64(row, "message_id")?),
2064 })
2065 })
2066 .collect())
2067}
2068
2069pub(super) fn save_queue_ack(
2070 store: &UnifiedStore,
2071 queue: &str,
2072 group: &str,
2073 message_id: EntityId,
2074) -> RedDBResult<()> {
2075 let existing = load_ack_entries(store, queue, Some(group), Some(message_id))?;
2076 if !existing.is_empty() {
2077 return Ok(());
2078 }
2079
2080 let mut fields = HashMap::new();
2081 fields.insert("kind".to_string(), Value::text("queue_ack".to_string()));
2082 fields.insert("queue".to_string(), Value::text(queue.to_string()));
2083 fields.insert("group".to_string(), Value::text(group.to_string()));
2084 fields.insert(
2085 "message_id".to_string(),
2086 Value::UnsignedInteger(message_id.raw()),
2087 );
2088 fields.insert("acked_at_ns".to_string(), Value::UnsignedInteger(now_ns()));
2089 insert_meta_row(store, fields)
2090}
2091
2092pub(super) fn queue_message_completed_for_all_groups(
2093 store: &UnifiedStore,
2094 queue: &str,
2095 message_id: EntityId,
2096) -> RedDBResult<bool> {
2097 let groups = load_queue_groups(store, queue)?;
2098 let pending = load_pending_entries(store, queue, None, Some(message_id))?;
2099 if !pending.is_empty() {
2100 return Ok(false);
2101 }
2102 if groups.is_empty() {
2103 return Ok(true);
2104 }
2105
2106 let acked_groups = load_ack_entries(store, queue, None, Some(message_id))?
2107 .into_iter()
2108 .map(|entry| entry.group)
2109 .collect::<HashSet<_>>();
2110 Ok(groups
2111 .into_iter()
2112 .all(|group| acked_groups.contains(&group.group)))
2113}
2114
2115fn load_queue_message_views(
2116 store: &UnifiedStore,
2117 queue: &str,
2118) -> RedDBResult<Vec<QueueMessageView>> {
2119 load_queue_message_views_with_runtime(None, store, queue)
2120}
2121
2122pub(super) fn load_queue_message_views_with_runtime(
2129 runtime: Option<&RedDBRuntime>,
2130 store: &UnifiedStore,
2131 queue: &str,
2132) -> RedDBResult<Vec<QueueMessageView>> {
2133 let manager = store
2134 .get_collection(queue)
2135 .ok_or_else(|| RedDBError::NotFound(format!("queue '{}' not found", queue)))?;
2136 let snap_ctx = crate::runtime::impl_core::capture_current_snapshot();
2140 let rls_filter = runtime.and_then(|rt| {
2141 crate::runtime::impl_core::rls_policy_filter_for_kind(
2142 rt,
2143 queue,
2144 crate::storage::query::ast::PolicyAction::Select,
2145 crate::storage::query::ast::PolicyTargetKind::Messages,
2146 )
2147 });
2148 let rls_enabled_but_denied = runtime.map(|rt| rt.is_rls_enabled(queue)).unwrap_or(false)
2149 && rls_filter.is_none()
2150 && runtime.is_some();
2151 if rls_enabled_but_denied {
2152 return Ok(Vec::new());
2154 }
2155 let filter_arc = rls_filter.map(std::sync::Arc::new);
2156 let rt_arc = runtime;
2157 Ok(manager
2158 .query_all(move |entity| {
2159 if !matches!(entity.kind, EntityKind::QueueMessage { .. }) {
2160 return false;
2161 }
2162 if !crate::runtime::impl_core::entity_visible_with_context(snap_ctx.as_ref(), entity) {
2163 return false;
2164 }
2165 if let (Some(filter), Some(rt)) = (filter_arc.as_ref(), rt_arc) {
2166 return crate::runtime::query_exec::evaluate_entity_filter_with_db(
2167 Some(&rt.inner.db),
2168 entity,
2169 filter,
2170 queue,
2171 queue,
2172 );
2173 }
2174 true
2175 })
2176 .into_iter()
2177 .filter_map(queue_message_view_from_entity)
2178 .map(|mut view| {
2179 view.available_at_ns = read_message_available_at_ns(store, queue, view.id);
2180 view
2181 })
2182 .collect())
2183}
2184
2185fn queue_message_view_from_entity(entity: UnifiedEntity) -> Option<QueueMessageView> {
2186 let (position, _) = match &entity.kind {
2187 EntityKind::QueueMessage { position, queue } => (*position, queue),
2188 _ => return None,
2189 };
2190 let data = match entity.data {
2191 EntityData::QueueMessage(data) => data,
2192 _ => return None,
2193 };
2194 Some(QueueMessageView {
2195 id: entity.id,
2196 position,
2197 priority: data.priority.unwrap_or(0),
2198 payload: data.payload,
2199 attempts: data.attempts,
2200 max_attempts: data.max_attempts,
2201 enqueued_at_ns: data.enqueued_at_ns,
2202 available_at_ns: None,
2203 })
2204}
2205
2206pub(super) fn insert_moved_queue_message_payload(
2213 store: &UnifiedStore,
2214 queue: &str,
2215 payload: &Value,
2216) -> RedDBResult<EntityId> {
2217 let config = load_queue_config(store, queue);
2218 let position = next_queue_position(store, queue, QueueSide::Right)?;
2219 let enqueued_at_ns = std::time::SystemTime::now()
2220 .duration_since(std::time::UNIX_EPOCH)
2221 .map(|d| d.as_nanos() as u64)
2222 .unwrap_or(0);
2223 let entity = UnifiedEntity::new(
2224 EntityId::new(0),
2225 EntityKind::QueueMessage {
2226 queue: queue.to_string(),
2227 position,
2228 },
2229 EntityData::QueueMessage(QueueMessageData {
2230 payload: payload.clone(),
2231 priority: None,
2232 enqueued_at_ns,
2233 attempts: 0,
2234 max_attempts: config.max_attempts,
2235 acked: false,
2236 }),
2237 );
2238 let id = store
2239 .insert_auto(queue, entity)
2240 .map_err(|err| RedDBError::Internal(err.to_string()))?;
2241 if let Some(ttl_ms) = config.ttl_ms {
2242 store
2243 .set_metadata(queue, id, queue_message_ttl_metadata(ttl_ms))
2244 .map_err(|err| RedDBError::Internal(err.to_string()))?;
2245 }
2246 Ok(id)
2247}
2248
2249fn insert_moved_queue_message(
2250 store: &UnifiedStore,
2251 queue: &str,
2252 config: &QueueRuntimeConfig,
2253 message: &QueueMessageView,
2254) -> RedDBResult<EntityId> {
2255 let position = next_queue_position(store, queue, QueueSide::Right)?;
2256 let entity = UnifiedEntity::new(
2257 EntityId::new(0),
2258 EntityKind::QueueMessage {
2259 queue: queue.to_string(),
2260 position,
2261 },
2262 EntityData::QueueMessage(QueueMessageData {
2263 payload: message.payload.clone(),
2264 priority: if config.priority {
2265 Some(message.priority)
2266 } else {
2267 None
2268 },
2269 enqueued_at_ns: message.enqueued_at_ns,
2270 attempts: message.attempts,
2271 max_attempts: message.max_attempts,
2272 acked: false,
2273 }),
2274 );
2275 let id = store
2276 .insert_auto(queue, entity)
2277 .map_err(|err| RedDBError::Internal(err.to_string()))?;
2278 if let Some(ttl_ms) = config.ttl_ms {
2279 store
2280 .set_metadata(queue, id, queue_message_ttl_metadata(ttl_ms))
2281 .map_err(|err| RedDBError::Internal(err.to_string()))?;
2282 }
2283 Ok(id)
2284}
2285
2286fn queue_projection_default_columns() -> Vec<String> {
2287 [
2288 "id",
2289 "payload",
2290 "priority",
2291 "attempts",
2292 "last_error",
2293 "enqueued_at",
2294 "available_at",
2295 "dlq",
2296 "tenant",
2297 ]
2298 .into_iter()
2299 .map(str::to_string)
2300 .collect()
2301}
2302
2303fn queue_projection_record(
2304 columns: &[String],
2305 message: &QueueMessageView,
2306 dlq: bool,
2307) -> RedDBResult<UnifiedRecord> {
2308 let mut record = UnifiedRecord::new();
2309 for column in columns {
2310 let value = queue_projection_value(message, dlq, column).ok_or_else(|| {
2311 RedDBError::Query(format!("unknown queue projection column '{}'", column))
2312 })?;
2313 record.set(column, value);
2314 }
2315 Ok(record)
2316}
2317
2318fn queue_projection_value(message: &QueueMessageView, dlq: bool, column: &str) -> Option<Value> {
2319 match column {
2320 "id" => Some(Value::text(message_id_string(message.id))),
2321 "payload" => Some(message.payload.clone()),
2322 "priority" => Some(Value::Integer(i64::from(message.priority))),
2323 "attempts" => Some(Value::UnsignedInteger(u64::from(message.attempts))),
2324 "last_error" => Some(Value::Null),
2325 "enqueued_at" => Some(Value::UnsignedInteger(message.enqueued_at_ns)),
2326 "available_at" => Some(Value::UnsignedInteger(
2327 message.available_at_ns.unwrap_or(message.enqueued_at_ns),
2328 )),
2329 "dlq" => Some(Value::Boolean(dlq)),
2330 "tenant" => queue_message_tenant(&message.payload).or(Some(Value::Null)),
2331 _ => None,
2332 }
2333}
2334
2335fn queue_message_tenant(payload: &Value) -> Option<Value> {
2336 let Value::Json(bytes) = payload else {
2337 return None;
2338 };
2339 let json: crate::json::Value = crate::json::from_slice(bytes).ok()?;
2340 json.get("tenant")
2341 .and_then(crate::json::Value::as_str)
2342 .map(|tenant| Value::text(tenant.to_string()))
2343}
2344
2345fn queue_is_dead_letter_target(store: &UnifiedStore, queue: &str) -> bool {
2346 let Some(manager) = store.get_collection(QUEUE_META_COLLECTION) else {
2347 return false;
2348 };
2349 !manager
2350 .query_all(|entity| {
2351 entity.data.as_row().is_some_and(|row| {
2352 row_text(row, "kind").as_deref() == Some("queue_config")
2353 && row_text(row, "dlq").as_deref() == Some(queue)
2354 })
2355 })
2356 .is_empty()
2357}
2358
2359fn queue_message_matches_filter(message: &QueueMessageView, dlq: bool, filter: &Filter) -> bool {
2360 match filter {
2361 Filter::Compare { field, op, value } => queue_filter_field_value(message, dlq, field)
2362 .is_some_and(|candidate| queue_compare_values(&candidate, value, *op)),
2363 Filter::CompareFields { left, op, right } => {
2364 match (
2365 queue_filter_field_value(message, dlq, left),
2366 queue_filter_field_value(message, dlq, right),
2367 ) {
2368 (Some(left), Some(right)) => queue_compare_values(&left, &right, *op),
2369 _ => false,
2370 }
2371 }
2372 Filter::And(left, right) => {
2373 queue_message_matches_filter(message, dlq, left)
2374 && queue_message_matches_filter(message, dlq, right)
2375 }
2376 Filter::Or(left, right) => {
2377 queue_message_matches_filter(message, dlq, left)
2378 || queue_message_matches_filter(message, dlq, right)
2379 }
2380 Filter::Not(inner) => !queue_message_matches_filter(message, dlq, inner),
2381 Filter::IsNull(field) => queue_filter_field_value(message, dlq, field)
2382 .is_none_or(|value| matches!(value, Value::Null)),
2383 Filter::IsNotNull(field) => queue_filter_field_value(message, dlq, field)
2384 .is_some_and(|value| !matches!(value, Value::Null)),
2385 Filter::In { field, values } => {
2386 queue_filter_field_value(message, dlq, field).is_some_and(|candidate| {
2387 values
2388 .iter()
2389 .any(|value| queue_values_equal(&candidate, value))
2390 })
2391 }
2392 Filter::Between { field, low, high } => queue_filter_field_value(message, dlq, field)
2393 .is_some_and(|candidate| {
2394 queue_compare_values(&candidate, low, CompareOp::Ge)
2395 && queue_compare_values(&candidate, high, CompareOp::Le)
2396 }),
2397 Filter::Like { field, pattern } => queue_filter_text(message, dlq, field)
2398 .is_some_and(|value| queue_like_matches(&value, pattern)),
2399 Filter::StartsWith { field, prefix } => {
2400 queue_filter_text(message, dlq, field).is_some_and(|value| value.starts_with(prefix))
2401 }
2402 Filter::EndsWith { field, suffix } => {
2403 queue_filter_text(message, dlq, field).is_some_and(|value| value.ends_with(suffix))
2404 }
2405 Filter::Contains { field, substring } => {
2406 queue_filter_text(message, dlq, field).is_some_and(|value| value.contains(substring))
2407 }
2408 Filter::CompareExpr { .. } => false,
2409 }
2410}
2411
2412fn queue_filter_field_value(
2413 message: &QueueMessageView,
2414 dlq: bool,
2415 field: &FieldRef,
2416) -> Option<Value> {
2417 match field {
2418 FieldRef::TableColumn { table, column } if table.is_empty() => {
2419 queue_projection_value(message, dlq, column)
2420 .or_else(|| queue_payload_field_value(&message.payload, column))
2421 }
2422 FieldRef::TableColumn { column, .. } => queue_projection_value(message, dlq, column)
2423 .or_else(|| queue_payload_field_value(&message.payload, column)),
2424 _ => None,
2425 }
2426}
2427
2428fn queue_payload_field_value(payload: &Value, field: &str) -> Option<Value> {
2429 let Value::Json(bytes) = payload else {
2430 return None;
2431 };
2432 let json: crate::json::Value = crate::json::from_slice(bytes).ok()?;
2433 let value = json.get(field)?;
2434 json_value_to_schema_value(value)
2435}
2436
2437fn json_value_to_schema_value(value: &crate::json::Value) -> Option<Value> {
2438 if matches!(value, crate::json::Value::Null) {
2439 Some(Value::Null)
2440 } else if let Some(value) = value.as_bool() {
2441 Some(Value::Boolean(value))
2442 } else if let Some(value) = value.as_i64() {
2443 Some(Value::Integer(value))
2444 } else if let Some(value) = value.as_u64() {
2445 Some(Value::UnsignedInteger(value))
2446 } else if let Some(value) = value.as_f64() {
2447 Some(Value::Float(value))
2448 } else if let Some(value) = value.as_str() {
2449 Some(Value::text(value.to_string()))
2450 } else {
2451 Some(Value::Json(value.to_string_compact().into_bytes()))
2452 }
2453}
2454
2455fn queue_filter_text(message: &QueueMessageView, dlq: bool, field: &FieldRef) -> Option<String> {
2456 queue_filter_field_value(message, dlq, field).and_then(|value| match value {
2457 Value::Text(value) => Some(value.to_string()),
2458 Value::NodeRef(value) | Value::EdgeRef(value) | Value::TableRef(value) => Some(value),
2459 Value::Integer(value) => Some(value.to_string()),
2460 Value::UnsignedInteger(value) => Some(value.to_string()),
2461 Value::Float(value) => Some(value.to_string()),
2462 Value::Boolean(value) => Some(value.to_string()),
2463 _ => None,
2464 })
2465}
2466
2467fn queue_compare_values(left: &Value, right: &Value, op: CompareOp) -> bool {
2468 match op {
2469 CompareOp::Eq => queue_values_equal(left, right),
2470 CompareOp::Ne => !queue_values_equal(left, right),
2471 CompareOp::Lt => queue_partial_cmp(left, right).is_some_and(|ord| ord.is_lt()),
2472 CompareOp::Le => queue_partial_cmp(left, right).is_some_and(|ord| !ord.is_gt()),
2473 CompareOp::Gt => queue_partial_cmp(left, right).is_some_and(|ord| ord.is_gt()),
2474 CompareOp::Ge => queue_partial_cmp(left, right).is_some_and(|ord| !ord.is_lt()),
2475 }
2476}
2477
2478fn queue_values_equal(left: &Value, right: &Value) -> bool {
2479 if let (Some(left), Some(right)) = (queue_value_number(left), queue_value_number(right)) {
2480 return (left - right).abs() < f64::EPSILON;
2481 }
2482 match (left, right) {
2483 (Value::Text(left), Value::Text(right)) => left == right,
2484 (Value::Boolean(left), Value::Boolean(right)) => left == right,
2485 _ => left == right,
2486 }
2487}
2488
2489fn queue_partial_cmp(left: &Value, right: &Value) -> Option<std::cmp::Ordering> {
2490 if let (Some(left), Some(right)) = (queue_value_number(left), queue_value_number(right)) {
2491 return left.partial_cmp(&right);
2492 }
2493 match (left, right) {
2494 (Value::Text(left), Value::Text(right)) => Some(left.cmp(right)),
2495 _ => None,
2496 }
2497}
2498
2499fn queue_value_number(value: &Value) -> Option<f64> {
2500 match value {
2501 Value::Integer(value) => Some(*value as f64),
2502 Value::UnsignedInteger(value) => Some(*value as f64),
2503 Value::Float(value) => Some(*value),
2504 Value::Text(value) => value.parse().ok(),
2505 _ => None,
2506 }
2507}
2508
2509fn queue_like_matches(value: &str, pattern: &str) -> bool {
2510 if pattern == "%" {
2511 return true;
2512 }
2513 let starts_wild = pattern.starts_with('%');
2514 let ends_wild = pattern.ends_with('%');
2515 let needle = pattern.trim_matches('%');
2516 match (starts_wild, ends_wild) {
2517 (true, true) => value.contains(needle),
2518 (true, false) => value.ends_with(needle),
2519 (false, true) => value.starts_with(needle),
2520 (false, false) => value == needle,
2521 }
2522}
2523
2524pub(super) fn queue_message_view_by_id(
2525 store: &UnifiedStore,
2526 queue: &str,
2527 message_id: EntityId,
2528) -> RedDBResult<Option<QueueMessageView>> {
2529 let manager = queue_manager(store, queue)?;
2530 Ok(manager
2531 .get(message_id)
2532 .and_then(queue_message_view_from_entity)
2533 .map(|mut view| {
2534 view.available_at_ns = read_message_available_at_ns(store, queue, view.id);
2535 view
2536 }))
2537}
2538
2539pub(super) fn sort_queue_messages(
2540 messages: &mut [QueueMessageView],
2541 config: &QueueRuntimeConfig,
2542 side: QueueSide,
2543) {
2544 messages.sort_by(|left, right| {
2545 if config.priority {
2546 right
2547 .priority
2548 .cmp(&left.priority)
2549 .then_with(|| match side {
2550 QueueSide::Left => left.position.cmp(&right.position),
2551 QueueSide::Right => right.position.cmp(&left.position),
2552 })
2553 .then_with(|| left.id.raw().cmp(&right.id.raw()))
2554 } else {
2555 match side {
2556 QueueSide::Left => left.position.cmp(&right.position),
2557 QueueSide::Right => right.position.cmp(&left.position),
2558 }
2559 .then_with(|| left.id.raw().cmp(&right.id.raw()))
2560 }
2561 });
2562}
2563
2564pub(super) fn next_queue_position(
2565 store: &UnifiedStore,
2566 queue: &str,
2567 side: QueueSide,
2568) -> RedDBResult<u64> {
2569 let messages = load_queue_message_views(store, queue)?;
2570 if messages.is_empty() {
2571 return Ok(QUEUE_POSITION_CENTER);
2572 }
2573 match side {
2574 QueueSide::Left => Ok(messages
2575 .iter()
2576 .map(|message| message.position)
2577 .min()
2578 .unwrap_or(QUEUE_POSITION_CENTER)
2579 .saturating_sub(1)),
2580 QueueSide::Right => Ok(messages
2581 .iter()
2582 .map(|message| message.position)
2583 .max()
2584 .unwrap_or(QUEUE_POSITION_CENTER)
2585 .saturating_add(1)),
2586 }
2587}
2588
2589pub(super) fn increment_queue_attempts(
2590 store: &UnifiedStore,
2591 queue: &str,
2592 message_id: EntityId,
2593) -> RedDBResult<u32> {
2594 let manager = queue_manager(store, queue)?;
2595 let mut entity = manager
2596 .get(message_id)
2597 .ok_or_else(|| RedDBError::NotFound(format!("message '{}' not found", message_id.raw())))?;
2598 match &mut entity.data {
2599 EntityData::QueueMessage(message) => {
2600 message.attempts = message.attempts.saturating_add(1);
2601 let attempts = message.attempts;
2602 manager
2603 .update(entity)
2604 .map_err(|err| RedDBError::Internal(err.to_string()))?;
2605 Ok(attempts)
2606 }
2607 _ => Err(RedDBError::Query(format!(
2608 "entity '{}' is not a queue message",
2609 message_id.raw()
2610 ))),
2611 }
2612}
2613
2614pub(super) fn queue_message_attempts(
2615 store: &UnifiedStore,
2616 queue: &str,
2617 message_id: EntityId,
2618) -> RedDBResult<u32> {
2619 Ok(queue_message_data(store, queue, message_id)?.attempts)
2620}
2621
2622pub(super) fn queue_message_max_attempts(
2623 store: &UnifiedStore,
2624 queue: &str,
2625 message_id: EntityId,
2626) -> RedDBResult<u32> {
2627 Ok(queue_message_data(store, queue, message_id)?.max_attempts)
2628}
2629
2630pub(super) fn queue_message_payload(
2631 store: &UnifiedStore,
2632 queue: &str,
2633 message_id: EntityId,
2634) -> RedDBResult<Value> {
2635 Ok(queue_message_data(store, queue, message_id)?.payload)
2636}
2637
2638pub(super) fn queue_message_pending_any(
2639 store: &UnifiedStore,
2640 queue: &str,
2641 message_id: EntityId,
2642) -> RedDBResult<bool> {
2643 Ok(!load_pending_entries(store, queue, None, Some(message_id))?.is_empty())
2644}
2645
2646pub(super) fn queue_message_pending_for_group(
2647 store: &UnifiedStore,
2648 queue: &str,
2649 group: &str,
2650 message_id: EntityId,
2651) -> RedDBResult<bool> {
2652 Ok(!load_pending_entries(store, queue, Some(group), Some(message_id))?.is_empty())
2653}
2654
2655pub(super) fn queue_message_acked_for_group(
2656 store: &UnifiedStore,
2657 queue: &str,
2658 group: &str,
2659 message_id: EntityId,
2660) -> RedDBResult<bool> {
2661 Ok(!load_ack_entries(store, queue, Some(group), Some(message_id))?.is_empty())
2662}
2663
2664fn queue_manager(
2665 store: &UnifiedStore,
2666 queue: &str,
2667) -> RedDBResult<Arc<crate::storage::unified::SegmentManager>> {
2668 store
2669 .get_collection(queue)
2670 .ok_or_else(|| RedDBError::NotFound(format!("queue '{}' not found", queue)))
2671}
2672
2673pub(super) fn queue_message_data(
2674 store: &UnifiedStore,
2675 queue: &str,
2676 message_id: EntityId,
2677) -> RedDBResult<QueueMessageData> {
2678 let manager = queue_manager(store, queue)?;
2679 let entity = manager
2680 .get(message_id)
2681 .ok_or_else(|| RedDBError::NotFound(format!("message '{}' not found", message_id.raw())))?;
2682 match entity.data {
2683 EntityData::QueueMessage(message) => Ok(message),
2684 _ => Err(RedDBError::Query(format!(
2685 "entity '{}' is not a queue message",
2686 message_id.raw()
2687 ))),
2688 }
2689}
2690
2691fn insert_meta_row(store: &UnifiedStore, fields: HashMap<String, Value>) -> RedDBResult<()> {
2692 let _ = store.get_or_create_collection(QUEUE_META_COLLECTION);
2693 store
2694 .insert_auto(
2695 QUEUE_META_COLLECTION,
2696 UnifiedEntity::new(
2697 EntityId::new(0),
2698 EntityKind::TableRow {
2699 table: Arc::from(QUEUE_META_COLLECTION),
2700 row_id: 0,
2701 },
2702 EntityData::Row(RowData {
2703 columns: Vec::new(),
2704 named: Some(fields),
2705 schema: None,
2706 }),
2707 ),
2708 )
2709 .map_err(|err| RedDBError::Internal(err.to_string()))?;
2710 Ok(())
2711}
2712
2713pub(super) fn remove_meta_rows(store: &UnifiedStore, predicate: impl Fn(&RowData) -> bool + Sync) {
2714 let Some(manager) = store.get_collection(QUEUE_META_COLLECTION) else {
2715 return;
2716 };
2717 let rows = manager.query_all(|entity| entity.data.as_row().is_some_and(&predicate));
2718 for row in rows {
2719 let _ = store.delete(QUEUE_META_COLLECTION, row.id);
2720 }
2721}
2722
2723pub(super) fn delete_meta_entity(store: &UnifiedStore, entity_id: EntityId) {
2724 let _ = store.delete(QUEUE_META_COLLECTION, entity_id);
2725}
2726
2727fn queue_message_lock_key(queue: &str, message_id: EntityId) -> String {
2728 format!("{queue}:{}", message_id.raw())
2729}
2730
2731pub(super) fn queue_message_lock_handle(
2732 runtime: &RedDBRuntime,
2733 queue: &str,
2734 message_id: EntityId,
2735) -> Arc<parking_lot::Mutex<()>> {
2736 let key = queue_message_lock_key(queue, message_id);
2737 if let Some(lock) = runtime.inner.queue_message_locks.read().get(&key).cloned() {
2738 return lock;
2739 }
2740
2741 let mut locks = runtime.inner.queue_message_locks.write();
2742 locks
2743 .entry(key)
2744 .or_insert_with(|| Arc::new(parking_lot::Mutex::new(())))
2745 .clone()
2746}
2747
2748pub(super) fn forget_queue_message_lock(runtime: &RedDBRuntime, queue: &str, message_id: EntityId) {
2749 runtime
2750 .inner
2751 .queue_message_locks
2752 .write()
2753 .remove(&queue_message_lock_key(queue, message_id));
2754}
2755
2756fn parse_message_id(value: &str) -> RedDBResult<EntityId> {
2757 let raw = value.strip_prefix('e').unwrap_or(value);
2758 raw.parse::<u64>()
2759 .map(EntityId::new)
2760 .map_err(|_| RedDBError::Query(format!("invalid message id '{}'", value)))
2761}
2762
2763pub(super) fn resolve_ack_nack_handle(
2769 store: &UnifiedStore,
2770 queue: &str,
2771 group_hint: &str,
2772 message_id_hint: &str,
2773 delivery_id: Option<&str>,
2774) -> RedDBResult<(String, EntityId)> {
2775 if let Some(did) = delivery_id {
2776 return resolve_delivery_id(store, queue, did);
2777 }
2778 if group_hint.is_empty() || message_id_hint.is_empty() {
2779 return Err(RedDBError::Query(
2780 "ACK/NACK requires either GROUP <group> '<message_id>' or WITH delivery_id = '<id>'"
2781 .to_string(),
2782 ));
2783 }
2784 log_tuple_deprecation(queue);
2785 let entity = parse_message_id(message_id_hint)?;
2786 Ok((group_hint.to_string(), entity))
2787}
2788
2789fn resolve_delivery_id(
2790 store: &UnifiedStore,
2791 queue: &str,
2792 delivery_id: &str,
2793) -> RedDBResult<(String, EntityId)> {
2794 let Some(manager) = store.get_collection(QUEUE_META_COLLECTION) else {
2795 return Err(RedDBError::Query(format!(
2796 "delivery_id '{}' does not resolve to a live pending delivery",
2797 delivery_id
2798 )));
2799 };
2800 for entity in manager.query_all(|entity| {
2801 entity.data.as_row().is_some_and(|row| {
2802 row_text(row, "kind").as_deref() == Some("queue_pending_lc")
2803 && row_text(row, "delivery_id").as_deref() == Some(delivery_id)
2804 })
2805 }) {
2806 if let Some(row) = entity.data.as_row() {
2807 let row_queue = row_text(row, "queue").unwrap_or_default();
2808 let row_group = row_text(row, "group").unwrap_or_default();
2809 let row_message = row_u64(row, "message_id").unwrap_or(0);
2810 if row_queue != queue {
2811 return Err(RedDBError::Query(format!(
2812 "delivery_id '{}' belongs to queue '{}', not '{}'",
2813 delivery_id, row_queue, queue
2814 )));
2815 }
2816 return Ok((row_group, EntityId::new(row_message)));
2817 }
2818 }
2819 Err(RedDBError::Query(format!(
2820 "delivery_id '{}' does not resolve to a live pending delivery",
2821 delivery_id
2822 )))
2823}
2824
2825fn log_tuple_deprecation(queue: &str) {
2828 use std::sync::atomic::Ordering;
2829 use std::sync::{Mutex, OnceLock};
2830 use std::time::Instant;
2831
2832 static LAST_EMIT: OnceLock<Mutex<HashMap<(u64, String), Instant>>> = OnceLock::new();
2833 const COOLDOWN: std::time::Duration = std::time::Duration::from_secs(60);
2834
2835 let map = LAST_EMIT.get_or_init(|| Mutex::new(HashMap::new()));
2836 let key = (super::impl_core::current_connection_id(), queue.to_string());
2837 let now = Instant::now();
2838 let mut guard = match map.lock() {
2839 Ok(g) => g,
2840 Err(_) => return,
2841 };
2842 let should_emit =
2843 !matches!(guard.get(&key), Some(prev) if now.duration_since(*prev) < COOLDOWN);
2844 if should_emit {
2845 guard.insert(key.clone(), now);
2846 drop(guard);
2847 TUPLE_DEPRECATION_EMITS.fetch_add(1, Ordering::Relaxed);
2848 tracing::warn!(
2849 target: "reddb::queue_lifecycle",
2850 queue = queue,
2851 connection_id = key.0,
2852 "ACK/NACK by (queue, group, message_id) tuple is deprecated; \
2853 switch to the server-issued delivery_id (ADR 0026). \
2854 The tuple path will be removed one minor release after introduction.",
2855 );
2856 }
2857}
2858
2859pub static TUPLE_DEPRECATION_EMITS: std::sync::atomic::AtomicU64 =
2865 std::sync::atomic::AtomicU64::new(0);
2866
2867fn message_id_string(message_id: EntityId) -> String {
2868 message_id.raw().to_string()
2869}
2870
2871fn delivered_message_json(
2877 message: crate::runtime::queue_lifecycle::DeliveredMessage,
2878) -> crate::serde_json::Value {
2879 use crate::serde_json::{Map, Value as JsonValue};
2880 let mut obj = Map::new();
2881 obj.insert(
2882 "message_id".to_string(),
2883 JsonValue::String(message_id_string(EntityId::new(message.message_id))),
2884 );
2885 obj.insert(
2886 "payload".to_string(),
2887 crate::presentation::entity_json::storage_value_to_json(&message.payload),
2888 );
2889 obj.insert("consumer".to_string(), JsonValue::String(message.consumer));
2890 obj.insert(
2891 "delivery_count".to_string(),
2892 JsonValue::Number(message.delivery_count as f64),
2893 );
2894 JsonValue::Object(obj)
2895}
2896
2897pub(crate) fn pending_counts_by_group(
2902 store: &UnifiedStore,
2903) -> std::collections::BTreeMap<(String, String), u64> {
2904 let mut counts: std::collections::BTreeMap<(String, String), u64> =
2905 std::collections::BTreeMap::new();
2906 let Some(manager) = store.get_collection(QUEUE_META_COLLECTION) else {
2907 return counts;
2908 };
2909 for entity in manager.query_all(|entity| {
2910 entity
2911 .data
2912 .as_row()
2913 .is_some_and(|row| row_text(row, "kind").as_deref() == Some("queue_pending"))
2914 }) {
2915 if let Some(row) = entity.data.as_row() {
2916 let queue = row_text(row, "queue");
2917 let group = row_text(row, "group");
2918 if let (Some(q), Some(g)) = (queue, group) {
2919 *counts.entry((q, g)).or_insert(0) += 1;
2920 }
2921 }
2922 }
2923 counts
2924}
2925
2926pub(super) fn row_text(row: &RowData, field: &str) -> Option<String> {
2927 match row.get_field(field)?.clone() {
2928 Value::Text(value) => Some(value.to_string()),
2929 Value::NodeRef(value) => Some(value),
2930 Value::EdgeRef(value) => Some(value),
2931 Value::TableRef(value) => Some(value),
2932 _ => None,
2933 }
2934}
2935
2936pub(super) fn row_u64(row: &RowData, field: &str) -> Option<u64> {
2937 match row.get_field(field)?.clone() {
2938 Value::UnsignedInteger(value) => Some(value),
2939 Value::Integer(value) if value >= 0 => Some(value as u64),
2940 Value::Float(value) if value >= 0.0 => Some(value as u64),
2941 Value::Text(value) => value.parse().ok(),
2942 _ => None,
2943 }
2944}
2945
2946fn row_bool(row: &RowData, field: &str) -> Option<bool> {
2947 match row.get_field(field)?.clone() {
2948 Value::Boolean(value) => Some(value),
2949 Value::Text(value) => match value.to_ascii_lowercase().as_str() {
2950 "true" => Some(true),
2951 "false" => Some(false),
2952 _ => None,
2953 },
2954 _ => None,
2955 }
2956}
2957
2958fn queue_collection_contract(
2959 name: &str,
2960 priority: bool,
2961 ttl_ms: Option<u64>,
2962) -> crate::physical::CollectionContract {
2963 let now = current_unix_ms();
2964 let mut context_index_fields = Vec::new();
2965 if priority {
2966 context_index_fields.push("priority".to_string());
2967 }
2968
2969 crate::physical::CollectionContract {
2970 name: name.to_string(),
2971 declared_model: crate::catalog::CollectionModel::Queue,
2972 schema_mode: crate::catalog::SchemaMode::Dynamic,
2973 origin: crate::physical::ContractOrigin::Explicit,
2974 version: 1,
2975 created_at_unix_ms: now,
2976 updated_at_unix_ms: now,
2977 default_ttl_ms: ttl_ms,
2978 vector_dimension: None,
2979 vector_metric: None,
2980 context_index_fields,
2981 declared_columns: Vec::new(),
2982 table_def: None,
2983 timestamps_enabled: false,
2984 context_index_enabled: false,
2985 metrics_raw_retention_ms: None,
2986 metrics_rollup_policies: Vec::new(),
2987 metrics_tenant_identity: None,
2988 metrics_namespace: None,
2989 append_only: true,
2993 subscriptions: Vec::new(),
2994 analytics_config: Vec::new(),
2995 session_key: None,
2996 session_gap_ms: None,
2997 retention_duration_ms: None,
2998 analytical_storage: None,
2999 }
3000}
3001
3002fn current_unix_ms() -> u128 {
3003 std::time::SystemTime::now()
3004 .duration_since(std::time::UNIX_EPOCH)
3005 .unwrap_or_default()
3006 .as_millis()
3007}
3008
3009pub(super) fn now_ns() -> u64 {
3010 std::time::SystemTime::now()
3011 .duration_since(std::time::UNIX_EPOCH)
3012 .unwrap_or_default()
3013 .as_nanos() as u64
3014}
3015
3016pub(super) fn queue_message_ttl_metadata(ttl_ms: u64) -> Metadata {
3017 queue_message_metadata(Some(ttl_ms), None)
3018}
3019
3020pub(super) fn queue_message_metadata(
3026 ttl_ms: Option<u64>,
3027 available_at_ns: Option<u64>,
3028) -> Metadata {
3029 let mut fields = HashMap::new();
3030 if let Some(ttl_ms) = ttl_ms {
3031 fields.insert(
3032 "_ttl_ms".to_string(),
3033 if ttl_ms <= i64::MAX as u64 {
3034 MetadataValue::Int(ttl_ms as i64)
3035 } else {
3036 MetadataValue::Timestamp(ttl_ms)
3037 },
3038 );
3039 }
3040 if let Some(at_ns) = available_at_ns {
3041 fields.insert(
3042 "_available_at_ns".to_string(),
3043 MetadataValue::Timestamp(at_ns),
3044 );
3045 }
3046 Metadata::with_fields(fields)
3047}
3048
3049pub(super) fn earliest_future_available_at(store: &UnifiedStore, queue: &str) -> Option<u64> {
3057 let now_ns = now_ns();
3058 let views = load_queue_message_views_with_runtime(None, store, queue).ok()?;
3059 views
3060 .iter()
3061 .filter_map(|v| v.available_at_ns)
3062 .filter(|at| *at > now_ns)
3063 .min()
3064}
3065
3066pub(super) fn set_message_available_at_ns(
3077 store: &UnifiedStore,
3078 queue: &str,
3079 message_id: EntityId,
3080 available_at_ns: Option<u64>,
3081 fallback_ttl_ms: Option<u64>,
3082) -> RedDBResult<()> {
3083 let existing_ttl_ms = store
3084 .get_metadata(queue, message_id)
3085 .and_then(|md| match md.get("_ttl_ms")? {
3086 MetadataValue::Int(i) if *i >= 0 => Some(*i as u64),
3087 MetadataValue::Timestamp(t) => Some(*t),
3088 _ => None,
3089 })
3090 .or(fallback_ttl_ms);
3091 let metadata = queue_message_metadata(existing_ttl_ms, available_at_ns);
3092 match store.set_metadata(queue, message_id, metadata) {
3093 Ok(()) => Ok(()),
3094 Err(crate::storage::StoreError::CollectionNotFound(_)) => Ok(()),
3095 Err(err) => Err(RedDBError::Internal(err.to_string())),
3096 }
3097}
3098
3099pub(super) fn read_message_available_at_ns(
3103 store: &UnifiedStore,
3104 queue: &str,
3105 message_id: EntityId,
3106) -> Option<u64> {
3107 let md = store.get_metadata(queue, message_id)?;
3108 match md.get("_available_at_ns")? {
3109 MetadataValue::Timestamp(t) => Some(*t),
3110 MetadataValue::Int(i) if *i >= 0 => Some(*i as u64),
3111 _ => None,
3112 }
3113}
3114
3115fn estimate_payload_bytes(payload: &Value) -> u64 {
3117 match payload {
3118 Value::Json(v) => v.len() as u64,
3119 Value::Text(s) => s.len() as u64,
3120 _ => 64,
3121 }
3122}
3123
3124#[cfg(test)]
3125mod presence_integration_tests {
3126 use super::*;
3127 use crate::storage::queue::presence::{PresenceState, DEFAULT_PRESENCE_TTL_MS};
3128 use crate::{RedDBOptions, RedDBRuntime};
3129
3130 #[test]
3134 fn queue_read_emits_consumer_presence_heartbeat() {
3135 let rt = RedDBRuntime::with_options(RedDBOptions::in_memory()).unwrap();
3136 rt.execute_query("CREATE QUEUE tasks").unwrap();
3137 rt.execute_query("QUEUE GROUP CREATE tasks workers")
3138 .unwrap();
3139 rt.execute_query("QUEUE PUSH tasks {'job':'a'}").unwrap();
3140 rt.execute_query("QUEUE READ tasks GROUP workers CONSUMER w1")
3141 .unwrap();
3142
3143 let snap = rt.queue_consumer_presence_snapshot(DEFAULT_PRESENCE_TTL_MS);
3144 assert_eq!(snap.len(), 1, "exactly one heartbeat recorded");
3145 let row = &snap[0];
3146 assert_eq!(row.queue, "tasks");
3147 assert_eq!(row.group, "workers");
3148 assert_eq!(row.consumer, "w1");
3149 assert_eq!(row.state, PresenceState::Active);
3150
3151 let counts = rt.queue_active_consumer_counts(DEFAULT_PRESENCE_TTL_MS);
3152 assert_eq!(
3153 counts[&("tasks".to_string(), "workers".to_string())],
3154 1,
3155 "active count reflects the live consumer"
3156 );
3157 }
3158
3159 #[test]
3163 fn empty_queue_read_still_heartbeats() {
3164 let rt = RedDBRuntime::with_options(RedDBOptions::in_memory()).unwrap();
3165 rt.execute_query("CREATE QUEUE empty_q").unwrap();
3166 rt.execute_query("QUEUE GROUP CREATE empty_q workers")
3167 .unwrap();
3168 rt.execute_query("QUEUE READ empty_q GROUP workers CONSUMER w1")
3170 .unwrap();
3171
3172 let snap = rt.queue_consumer_presence_snapshot(DEFAULT_PRESENCE_TTL_MS);
3173 assert_eq!(
3174 snap.len(),
3175 1,
3176 "empty read still registers consumer presence"
3177 );
3178 assert_eq!(snap[0].state, PresenceState::Active);
3179 assert_eq!(
3180 snap[0].lease_count, 0,
3181 "no messages delivered → zero leases"
3182 );
3183 }
3184}