1use std::collections::{HashMap, HashSet};
4use std::sync::atomic::{AtomicU64, Ordering};
5use std::sync::Arc;
6
7use crate::runtime::audit_log::{AuditAuthSource, AuditEvent, AuditFieldEscaper, Outcome};
8use crate::runtime::impl_core::{
9 clear_current_auth_identity, clear_current_tenant, current_auth_identity, current_tenant,
10 set_current_auth_identity, set_current_tenant,
11};
12use crate::runtime::queue_telemetry::NackOutcomeLabel;
13use crate::storage::queue::QueueMode;
14use crate::storage::unified::entity::{QueueMessageData, RowData};
15use crate::storage::unified::{Metadata, MetadataValue, UnifiedStore};
16use crate::telemetry::operator_event::OperatorEvent;
17
18use super::*;
19
20use super::primary_queue_store::PrimaryQueueStore;
21use super::queue_lifecycle::{QueueLifecycle, RetirementOutcome};
22use crate::storage::queue::lifecycle::{
23 QueueSide as LcQueueSide, QueueStore as _, QueueStoreError, QueueTxn,
24};
25
26pub(super) fn runtime_lifecycle(
33 runtime: &RedDBRuntime,
34 queue: &str,
35) -> (
36 QueueLifecycle<PrimaryQueueStore>,
37 PrimaryQueueStore,
38 QueueTxn,
39) {
40 let primary_for_lookup = PrimaryQueueStore::new(runtime.clone());
41 let primary_for_lifecycle = PrimaryQueueStore::new(runtime.clone());
42 let txn = primary_for_lifecycle.new_txn();
43 let cfg = primary_for_lifecycle.lifecycle_config(queue);
44 (
45 QueueLifecycle::new(primary_for_lifecycle, cfg)
46 .with_telemetry(Arc::clone(&runtime.inner.queue_telemetry)),
47 primary_for_lookup,
48 txn,
49 )
50}
51
/// Error text used when a blocking queue read is abandoned because the wait
/// registry reports cancellation (wrapped in `RedDBError::Query` by the
/// blocking read path in this file).
pub(crate) const QUEUE_READ_WAIT_CANCELLED: &str =
    "QUEUE READ WAIT cancelled — server shutting down";

/// Config key looked up (via `config_u64`) to obtain the server-side upper
/// bound, in milliseconds, on a queue read wait.
pub(crate) const QUEUE_MAX_WAIT_MS_CONFIG_KEY: &str = "red.config.queue.max_wait_ms";

/// Default passed to `config_u64` together with
/// [`QUEUE_MAX_WAIT_MS_CONFIG_KEY`]: 60 000 ms.
pub(crate) const QUEUE_MAX_WAIT_MS_DEFAULT: u64 = 60_000;
68
/// Result of an asynchronous queue wait issued through
/// `RedDBRuntime::redwire_queue_wait_json`.
#[derive(Debug)]
pub(crate) enum RedwireWaitOutcome {
    /// At least one message was delivered; each element is one delivered
    /// message rendered as JSON.
    Delivered(Vec<crate::serde_json::Value>),
    /// The wait deadline passed without any message being delivered.
    TimedOut,
    /// The wait registry reported cancellation before anything was delivered.
    Cancelled,
}
85
86pub(super) fn queue_wait_scope() -> String {
91 crate::runtime::impl_core::current_tenant().unwrap_or_default()
92}
93
94fn with_redwire_wait_context<T>(
95 auth_identity: &Option<(String, crate::auth::Role)>,
96 tenant: &Option<String>,
97 f: impl FnOnce() -> T,
98) -> T {
99 let previous_auth = current_auth_identity();
100 let previous_tenant = current_tenant();
101 match tenant {
102 Some(t) => set_current_tenant(t.clone()),
103 None => clear_current_tenant(),
104 }
105 match auth_identity {
106 Some((username, role)) => set_current_auth_identity(username.clone(), *role),
107 None => clear_current_auth_identity(),
108 }
109 let result = f();
110 match previous_tenant {
111 Some(t) => set_current_tenant(t),
112 None => clear_current_tenant(),
113 }
114 match previous_auth {
115 Some((username, role)) => set_current_auth_identity(username, role),
116 None => clear_current_auth_identity(),
117 }
118 result
119}
120
121fn ast_side_to_lc(side: crate::storage::query::ast::QueueSide) -> LcQueueSide {
125 use crate::storage::query::ast::QueueSide as Ast;
126 match side {
127 Ast::Left => LcQueueSide::Left,
128 Ast::Right => LcQueueSide::Right,
129 }
130}
131
132fn map_qse(err: QueueStoreError) -> RedDBError {
135 match err {
136 QueueStoreError::UnknownDelivery(id) => RedDBError::NotFound(format!(
137 "delivery_id '{id}' does not resolve to a live pending delivery"
138 )),
139 QueueStoreError::UnknownQueue(q) => RedDBError::NotFound(format!("queue '{q}' not found")),
140 QueueStoreError::ReplicaImmutable => {
141 RedDBError::Internal("replica QueueStore is immutable".to_string())
142 }
143 }
144}
145
/// Process-wide counter. In this file it is incremented by
/// `enqueue_event_payload` each time an event is diverted to the outbox DLQ.
// NOTE(review): the name suggests drain retries; confirm the intended meaning.
pub static EVENTS_DRAIN_RETRIES_TOTAL: AtomicU64 = AtomicU64::new(0);

/// Process-wide counter of events routed to an outbox dead-letter queue
/// (incremented at the start of `route_event_to_outbox_dlq`).
pub static EVENTS_DLQ_TOTAL: AtomicU64 = AtomicU64::new(0);

/// Process-wide counter of event payloads successfully inserted into a queue,
/// including inserts into an outbox DLQ.
pub static EVENTS_ENQUEUED_TOTAL: AtomicU64 = AtomicU64::new(0);

/// Outbox size (1 GiB) above which a capacity warning is logged.
const OUTBOX_WARN_BYTES: u64 = 1 << 30;

/// Outbox size (10 GiB) above which new events are diverted to the outbox DLQ.
const OUTBOX_MAX_BYTES: u64 = 10 * (1 << 30);

/// Running estimate of bytes held in the event outbox; grown by
/// `enqueue_event_payload` using `estimate_payload_bytes`.
static OUTBOX_APPROX_BYTES: AtomicU64 = AtomicU64::new(0);

// The constants below are not referenced in the visible part of this file.
// NOTE(review): presumably the metadata collection name, the starting position
// for double-ended queues, and the naming of implicit consumer groups — confirm
// against their users further down the file.
const QUEUE_META_COLLECTION: &str = "red_queue_meta";
const QUEUE_POSITION_CENTER: u64 = u64::MAX / 2;
const WORK_DEFAULT_GROUP: &str = "_work_default";
const FANOUT_GROUP_PREFIX: &str = "_fanout_";
173
/// Per-queue settings persisted with `save_queue_config` and read back with
/// `load_queue_config`.
#[derive(Debug, Clone)]
pub(super) struct QueueRuntimeConfig {
    /// Delivery mode of the queue.
    pub(super) mode: QueueMode,
    /// Whether pushes may carry a priority; pushes with a priority are rejected
    /// when this is `false`.
    pub(super) priority: bool,
    /// Maximum number of messages; pushes are rejected once the current length
    /// reaches this value. `None` means no limit is enforced.
    pub(super) max_size: Option<usize>,
    /// Time-to-live in milliseconds attached as metadata to each new message.
    pub(super) ttl_ms: Option<u64>,
    /// Name of the dead-letter queue, if one is configured.
    pub(super) dlq: Option<String>,
    /// Copied into every new message's `max_attempts` field.
    pub(super) max_attempts: u32,
    // NOTE(review): presumably how long a delivery stays locked to a consumer,
    // in milliseconds — confirm against the lifecycle implementation.
    pub(super) lock_deadline_ms: u64,
    // NOTE(review): presumably the cap on un-acked deliveries per consumer
    // group — confirm against the lifecycle implementation.
    pub(super) in_flight_cap_per_group: u32,
    /// Retry delay in milliseconds; `ALTER QUEUE` stores `None` when given `0`.
    pub(super) retry_delay_ms: Option<u64>,
}
190
/// A consumer-group record paired with the id of the entity that stores it.
// NOTE(review): field meanings are inferred from names; this type is not used
// in the visible part of the file.
#[derive(Debug, Clone)]
struct QueueGroupEntry {
    /// Id of the stored entity backing this record.
    entity_id: EntityId,
    /// Consumer group name.
    group: String,
}
196
/// A pending-delivery record: a message handed to a consumer that has not yet
/// been resolved.
// NOTE(review): field meanings are inferred from names — confirm against
// `load_pending_entries` and the lifecycle code.
#[derive(Debug, Clone)]
pub(super) struct QueuePendingEntry {
    /// Id of the stored entity backing this pending record.
    pub(super) entity_id: EntityId,
    /// Consumer group the delivery belongs to.
    group: String,
    /// Id of the queue message that was delivered.
    pub(super) message_id: EntityId,
    /// Consumer the message was delivered to.
    consumer: String,
    /// Delivery timestamp in nanoseconds.
    pub(super) delivered_at_ns: u64,
    /// How many times the message has been delivered.
    pub(super) delivery_count: u32,
}
206
/// An acknowledgement record for a message within a consumer group.
// NOTE(review): field meanings are inferred from names; this type is not used
// in the visible part of the file.
#[derive(Debug, Clone)]
pub(super) struct QueueAckEntry {
    /// Id of the stored entity backing this ack record.
    entity_id: EntityId,
    /// Consumer group that acknowledged the message.
    group: String,
    /// Id of the acknowledged queue message.
    pub(super) message_id: EntityId,
}
213
/// Read-side view of a single queue message.
// NOTE(review): apart from `available_at_ns`, field meanings are inferred from
// names and from the `QueueMessageData` fields written on push.
#[derive(Debug, Clone)]
pub(super) struct QueueMessageView {
    /// Entity id of the message.
    pub(super) id: EntityId,
    /// Position of the message within the queue.
    position: u64,
    /// Message priority.
    priority: i32,
    /// Message payload.
    pub(super) payload: Value,
    /// Attempts recorded so far.
    attempts: u32,
    /// Maximum attempts allowed for this message.
    pub(super) max_attempts: u32,
    /// Enqueue timestamp in nanoseconds.
    enqueued_at_ns: u64,
    /// Earliest time (nanoseconds, same clock as `now_ns()`) at which the
    /// message may be delivered; `None` means it is available immediately.
    pub(super) available_at_ns: Option<u64>,
}
228
229impl QueueMessageView {
230 pub(super) fn is_available_now(&self) -> bool {
235 match self.available_at_ns {
236 Some(at) => at <= now_ns(),
237 None => true,
238 }
239 }
240}
241
242impl RedDBRuntime {
    /// Performs a consumer-group read and, when `wait_ms` is given and nothing
    /// is immediately available, blocks the calling thread until messages
    /// arrive, the deadline passes, or the wait registry is cancelled.
    ///
    /// * `queue`, `group`, `consumer` – forwarded to the lifecycle's `group_read`.
    /// * `count` – maximum number of messages requested per read attempt.
    /// * `wait_ms` – `None` for a single non-blocking read; `Some(ms)` to wait
    ///   up to `ms` milliseconds when the first read returns nothing.
    ///
    /// Returns the delivered messages; an empty vector means nothing was
    /// available (non-blocking) or the wait timed out.
    ///
    /// # Errors
    /// * Any lifecycle error from `group_read`, mapped through `map_qse`.
    /// * `RedDBError::Query(QUEUE_READ_WAIT_CANCELLED)` when the wait is cancelled.
    // NOTE(review): a read error inside the wait loop returns via `?` after
    // `record_wait_started` without a matching `record_wait_outcome`; confirm
    // the telemetry tolerates that.
    pub(super) fn group_read_with_optional_wait(
        &self,
        queue: &str,
        group: &str,
        consumer: &str,
        count: usize,
        wait_ms: Option<u64>,
    ) -> RedDBResult<Vec<crate::runtime::queue_lifecycle::DeliveredMessage>> {
        // One read attempt, serialized per queue through the
        // `__queue_group_read__` rmw lock for the duration of the read.
        let do_read =
            |runtime: &RedDBRuntime| -> RedDBResult<Vec<crate::runtime::queue_lifecycle::DeliveredMessage>> {
                let read_lock = runtime
                    .inner
                    .rmw_locks
                    .lock_for(queue, "__queue_group_read__");
                let _read_guard = read_lock.lock();
                let (lifecycle, _ps, txn) = runtime_lifecycle(runtime, queue);
                lifecycle
                    .group_read(&txn, queue, group, consumer, count)
                    .map_err(map_qse)
            };

        // Fast path: no wait requested, or the first read already delivered.
        let delivered = do_read(self)?;
        let Some(wait_ms) = wait_ms else {
            return Ok(delivered);
        };
        if !delivered.is_empty() {
            return Ok(delivered);
        }
        let registry = self.queue_wait_registry();
        let scope = queue_wait_scope();
        let deadline = std::time::Instant::now() + std::time::Duration::from_millis(wait_ms);
        let telemetry = self.queue_telemetry();
        telemetry.record_wait_started(&scope, queue);
        let wait_start = std::time::Instant::now();
        // Records how the wait ended together with the elapsed time, clamped to u64.
        let observe = |outcome: crate::runtime::queue_telemetry::WaitOutcomeLabel| {
            let elapsed_ms = wait_start.elapsed().as_millis().min(u128::from(u64::MAX)) as u64;
            telemetry.record_wait_outcome(&scope, queue, outcome, elapsed_ms);
        };
        loop {
            // The snapshot is taken BEFORE the read — presumably so a wake that
            // lands between the read and the park is not lost; confirm against
            // `queue_wait_registry`.
            let snapshot = registry.snapshot(&scope, queue);
            let delivered = do_read(self)?;
            if !delivered.is_empty() {
                observe(crate::runtime::queue_telemetry::WaitOutcomeLabel::Woken);
                return Ok(delivered);
            }
            if registry.is_cancelled() {
                observe(crate::runtime::queue_telemetry::WaitOutcomeLabel::Cancelled);
                return Err(RedDBError::Query(QUEUE_READ_WAIT_CANCELLED.to_string()));
            }
            if std::time::Instant::now() >= deadline {
                observe(crate::runtime::queue_telemetry::WaitOutcomeLabel::Timeout);
                return Ok(Vec::new());
            }
            // Park no longer than the overall deadline, and no longer than the
            // time reported by `earliest_future_available_at`, if any.
            let park_deadline = match earliest_future_available_at(&self.inner.db.store(), queue) {
                Some(at_ns) => {
                    let now_ns = now_ns();
                    if at_ns <= now_ns {
                        // Already due: the park deadline is effectively "now",
                        // so the loop re-reads almost immediately.
                        deadline.min(std::time::Instant::now())
                    } else {
                        let wait_ns = at_ns - now_ns;
                        let due_instant =
                            std::time::Instant::now() + std::time::Duration::from_nanos(wait_ns);
                        deadline.min(due_instant)
                    }
                }
                None => deadline,
            };
            match registry.wait_until(&snapshot, park_deadline) {
                crate::runtime::queue_wait_registry::WaitOutcome::Woken => continue,
                crate::runtime::queue_wait_registry::WaitOutcome::Timeout => {
                    // The park deadline may be earlier than the overall
                    // deadline; only report a timeout once the latter passed.
                    if std::time::Instant::now() >= deadline {
                        observe(crate::runtime::queue_telemetry::WaitOutcomeLabel::Timeout);
                        return Ok(Vec::new());
                    }
                    continue;
                }
                crate::runtime::queue_wait_registry::WaitOutcome::Cancelled => {
                    observe(crate::runtime::queue_telemetry::WaitOutcomeLabel::Cancelled);
                    return Err(RedDBError::Query(QUEUE_READ_WAIT_CANCELLED.to_string()));
                }
            }
        }
    }
364
    /// Asynchronous consumer-group read with a wait, returning delivered
    /// messages rendered as JSON.
    ///
    /// The privilege check, group resolution and every read attempt run inside
    /// `with_redwire_wait_context`, so they see the supplied `auth_identity`
    /// and `tenant`; that context is restored before each `.await`.
    ///
    /// * `queue` – queue to read from; must exist.
    /// * `group` – explicit consumer group, or `None` to let
    ///   `resolve_read_group` pick one from the queue config.
    /// * `consumer` – consumer name recorded with the delivery.
    /// * `count` – maximum number of messages requested per read attempt.
    /// * `wait_ms` – maximum time to wait, in milliseconds.
    /// * `auth_identity`, `tenant` – caller context installed around each
    ///   synchronous step.
    ///
    /// Returns `Delivered` with the rendered messages, `TimedOut` when the
    /// deadline passes, or `Cancelled` when the wait registry is cancelled.
    ///
    /// # Errors
    /// Privilege failures (as `RedDBError::Query`), a missing queue, group
    /// resolution errors, and lifecycle read errors mapped through `map_qse`.
    // NOTE(review): `wait_ms` is not checked against the server cap here;
    // presumably callers invoke `redwire_queue_wait_cap_check` first — confirm.
    pub(crate) async fn redwire_queue_wait_json(
        &self,
        queue: &str,
        group: Option<&str>,
        consumer: &str,
        count: usize,
        wait_ms: u64,
        auth_identity: Option<(String, crate::auth::Role)>,
        tenant: Option<String>,
    ) -> RedDBResult<RedwireWaitOutcome> {
        // Authorize the equivalent GroupRead command and resolve the group
        // under the caller's identity and tenant.
        let group_owned: RedDBResult<String> =
            with_redwire_wait_context(&auth_identity, &tenant, || {
                let expr = crate::storage::query::ast::QueryExpr::QueueCommand(
                    crate::storage::query::ast::QueueCommand::GroupRead {
                        queue: queue.to_string(),
                        group: group.map(str::to_string),
                        consumer: consumer.to_string(),
                        count,
                        wait_ms: Some(wait_ms),
                    },
                );
                self.check_query_privilege(&expr)
                    .map_err(RedDBError::Query)?;
                let store = self.inner.db.store();
                ensure_queue_exists(store.as_ref(), queue)?;
                let config = load_queue_config(store.as_ref(), queue);
                resolve_read_group(store.as_ref(), queue, group, consumer, &config)
            });
        let group_owned = group_owned?;
        let group_ref = group_owned.as_str();

        // One read attempt under the caller's context.
        // NOTE(review): unlike `group_read_with_optional_wait`, this read does
        // not take the `__queue_group_read__` rmw lock — confirm that is intended.
        let do_read =
            |runtime: &RedDBRuntime| -> RedDBResult<Vec<crate::runtime::queue_lifecycle::DeliveredMessage>> {
                with_redwire_wait_context(&auth_identity, &tenant, || {
                    let (lifecycle, _ps, txn) = runtime_lifecycle(runtime, queue);
                    lifecycle
                        .group_read(&txn, queue, group_ref, consumer, count)
                        .map_err(map_qse)
                })
            };

        // Converts delivered messages into the JSON outcome.
        let render = |delivered: Vec<crate::runtime::queue_lifecycle::DeliveredMessage>| {
            RedwireWaitOutcome::Delivered(
                delivered.into_iter().map(delivered_message_json).collect(),
            )
        };

        // Fast path: something is already available.
        let delivered = do_read(self)?;
        if !delivered.is_empty() {
            return Ok(render(delivered));
        }

        let registry = self.queue_wait_registry();
        // The scope is derived from the caller's tenant, not the ambient one.
        let scope = with_redwire_wait_context(&auth_identity, &tenant, queue_wait_scope);
        let deadline = std::time::Instant::now() + std::time::Duration::from_millis(wait_ms);
        let telemetry = self.queue_telemetry();
        telemetry.record_wait_started(&scope, queue);
        let wait_start = std::time::Instant::now();
        tracing::debug!(
            target: "reddb::redwire::queue_wait",
            queue,
            group = group_ref,
            consumer,
            count,
            wait_ms,
            scope = scope.as_str(),
            "redwire queue wait parked"
        );
        // Records the outcome in telemetry and in the debug log, with the
        // elapsed time clamped to u64.
        let observe = |outcome: crate::runtime::queue_telemetry::WaitOutcomeLabel| {
            let elapsed_ms = wait_start.elapsed().as_millis().min(u128::from(u64::MAX)) as u64;
            telemetry.record_wait_outcome(&scope, queue, outcome, elapsed_ms);
            tracing::debug!(
                target: "reddb::redwire::queue_wait",
                queue,
                group = group_ref,
                consumer,
                count,
                wait_ms,
                scope = scope.as_str(),
                outcome = outcome.as_str(),
                duration_ms = elapsed_ms,
                "redwire queue wait resolved"
            );
        };
        loop {
            // The waiter is created BEFORE the read — presumably so a wake
            // between the read and the await is not lost; confirm against
            // `queue_wait_registry`.
            let waiter = registry.async_waiter(&scope, queue);
            let delivered = do_read(self)?;
            if !delivered.is_empty() {
                observe(crate::runtime::queue_telemetry::WaitOutcomeLabel::Woken);
                return Ok(render(delivered));
            }
            if registry.is_cancelled() {
                observe(crate::runtime::queue_telemetry::WaitOutcomeLabel::Cancelled);
                return Ok(RedwireWaitOutcome::Cancelled);
            }
            if std::time::Instant::now() >= deadline {
                observe(crate::runtime::queue_telemetry::WaitOutcomeLabel::Timeout);
                return Ok(RedwireWaitOutcome::TimedOut);
            }
            // Park no longer than the overall deadline, and no longer than the
            // time reported by `earliest_future_available_at`, if any.
            let park_deadline = match earliest_future_available_at(&self.inner.db.store(), queue) {
                Some(at_ns) => {
                    let now_ns = now_ns();
                    if at_ns <= now_ns {
                        deadline.min(std::time::Instant::now())
                    } else {
                        let wait_ns = at_ns - now_ns;
                        let due_instant =
                            std::time::Instant::now() + std::time::Duration::from_nanos(wait_ns);
                        deadline.min(due_instant)
                    }
                }
                None => deadline,
            };
            match registry.wait_until_async(&waiter, park_deadline).await {
                crate::runtime::queue_wait_registry::WaitOutcome::Woken => continue,
                crate::runtime::queue_wait_registry::WaitOutcome::Timeout => {
                    // The park deadline may be earlier than the overall
                    // deadline; only report a timeout once the latter passed.
                    if std::time::Instant::now() >= deadline {
                        observe(crate::runtime::queue_telemetry::WaitOutcomeLabel::Timeout);
                        return Ok(RedwireWaitOutcome::TimedOut);
                    }
                    continue;
                }
                crate::runtime::queue_wait_registry::WaitOutcome::Cancelled => {
                    observe(crate::runtime::queue_telemetry::WaitOutcomeLabel::Cancelled);
                    return Ok(RedwireWaitOutcome::Cancelled);
                }
            }
        }
    }
518
519 pub(crate) fn redwire_queue_wait_cap_check(&self, wait_ms: u64) -> Result<(), String> {
528 let cap = self.config_u64(QUEUE_MAX_WAIT_MS_CONFIG_KEY, QUEUE_MAX_WAIT_MS_DEFAULT);
529 if wait_ms > cap {
530 Err(format!(
531 "queue-wait WAIT {wait_ms}ms exceeds server cap {QUEUE_MAX_WAIT_MS_CONFIG_KEY} = {cap}ms"
532 ))
533 } else {
534 Ok(())
535 }
536 }
537
538 pub(crate) fn enqueue_event_payload(
539 &self,
540 queue: &str,
541 payload: Value,
542 ) -> RedDBResult<EntityId> {
543 let store = self.inner.db.store();
544 if store.get_collection(queue).is_none() {
546 crate::runtime::impl_ddl::ensure_event_target_queue_pub(self, queue)?;
547 }
548
549 let payload_bytes = estimate_payload_bytes(&payload);
551 let outbox_bytes = OUTBOX_APPROX_BYTES.fetch_add(payload_bytes, Ordering::Relaxed);
552
553 if outbox_bytes > OUTBOX_MAX_BYTES {
555 OUTBOX_APPROX_BYTES.fetch_sub(payload_bytes, Ordering::Relaxed);
556 EVENTS_DRAIN_RETRIES_TOTAL.fetch_add(1, Ordering::Relaxed);
557 return self.route_event_to_outbox_dlq(queue, payload, "outbox_max_bytes_exceeded");
558 }
559
560 if outbox_bytes > OUTBOX_WARN_BYTES && outbox_bytes - payload_bytes <= OUTBOX_WARN_BYTES {
562 tracing::warn!(
563 outbox_bytes,
564 warn_threshold = OUTBOX_WARN_BYTES,
565 "event outbox approaching capacity warning threshold"
566 );
567 crate::telemetry::operator_event::OperatorEvent::OutboxDlqActivated {
568 queue: queue.to_string(),
569 dlq: format!("{queue}_outbox_dlq"),
570 reason: "outbox_warn_bytes_exceeded".to_string(),
571 }
572 .emit_global();
573 }
574
575 let config = load_queue_config(store.as_ref(), queue);
576
577 if let Some(max_size) = config.max_size {
579 let current_len = load_queue_message_views(store.as_ref(), queue)
580 .unwrap_or_default()
581 .len();
582 if current_len >= max_size {
583 OUTBOX_APPROX_BYTES.fetch_sub(payload_bytes, Ordering::Relaxed);
584 EVENTS_DRAIN_RETRIES_TOTAL.fetch_add(1, Ordering::Relaxed);
585 return self.route_event_to_outbox_dlq(queue, payload, "queue_full");
586 }
587 if current_len * 10 >= max_size * 8 {
589 tracing::warn!(
590 queue = %queue,
591 size = current_len,
592 max = max_size,
593 "event target queue near capacity"
594 );
595 }
596 }
597
598 let id = self.enqueue_event_payload_raw(store.as_ref(), queue, &config, payload)?;
599 EVENTS_ENQUEUED_TOTAL.fetch_add(1, Ordering::Relaxed);
600 Ok(id)
601 }
602
603 fn route_event_to_outbox_dlq(
605 &self,
606 queue: &str,
607 payload: Value,
608 reason: &str,
609 ) -> RedDBResult<EntityId> {
610 let dlq_name = format!("{queue}_outbox_dlq");
611 EVENTS_DLQ_TOTAL.fetch_add(1, Ordering::Relaxed);
612
613 crate::telemetry::operator_event::OperatorEvent::OutboxDlqActivated {
614 queue: queue.to_string(),
615 dlq: dlq_name.clone(),
616 reason: reason.to_string(),
617 }
618 .emit_global();
619
620 let store = self.inner.db.store();
621 if store.get_collection(&dlq_name).is_none() {
622 crate::runtime::impl_ddl::ensure_event_target_queue_pub(self, &dlq_name)?;
623 }
624 let dlq_config = load_queue_config(store.as_ref(), &dlq_name);
625 let id = self.enqueue_event_payload_raw(store.as_ref(), &dlq_name, &dlq_config, payload)?;
626 EVENTS_ENQUEUED_TOTAL.fetch_add(1, Ordering::Relaxed);
627 Ok(id)
628 }
629
630 fn enqueue_event_payload_raw(
632 &self,
633 store: &UnifiedStore,
634 queue: &str,
635 config: &QueueRuntimeConfig,
636 payload: Value,
637 ) -> RedDBResult<EntityId> {
638 let position = next_queue_position(store, queue, QueueSide::Right)?;
639 let mut entity = UnifiedEntity::new(
640 EntityId::new(0),
641 EntityKind::QueueMessage {
642 queue: queue.to_string(),
643 position,
644 },
645 EntityData::QueueMessage(QueueMessageData {
646 payload,
647 priority: None,
648 enqueued_at_ns: now_ns(),
649 attempts: 0,
650 max_attempts: config.max_attempts,
651 acked: false,
652 }),
653 );
654 if let Some(xid) = self.current_xid() {
655 entity.set_xmin(xid);
656 }
657 let id = store
658 .insert_auto(queue, entity)
659 .map_err(|err| RedDBError::Internal(err.to_string()))?;
660 if let Some(ttl_ms) = config.ttl_ms {
661 store
662 .set_metadata(queue, id, queue_message_ttl_metadata(ttl_ms))
663 .map_err(|err| RedDBError::Internal(err.to_string()))?;
664 }
665 self.invalidate_result_cache_for_table(queue);
666 Ok(id)
667 }
668
669 pub fn execute_create_queue(
670 &self,
671 raw_query: &str,
672 query: &CreateQueueQuery,
673 ) -> RedDBResult<RuntimeQueryResult> {
674 self.check_write(crate::runtime::write_gate::WriteKind::Ddl)?;
675 if query.dlq.as_deref() == Some(query.name.as_str()) {
676 return Err(RedDBError::Query(
677 "dead-letter queue must be different from the source queue".to_string(),
678 ));
679 }
680
681 let store = self.inner.db.store();
682 let exists = store.get_collection(&query.name).is_some();
683 if exists {
684 if query.if_not_exists {
685 return Ok(RuntimeQueryResult::ok_message(
686 raw_query.to_string(),
687 &format!("queue '{}' already exists", query.name),
688 "create",
689 ));
690 }
691 return Err(RedDBError::Query(format!(
692 "queue '{}' already exists",
693 query.name
694 )));
695 }
696
697 store
698 .create_collection(&query.name)
699 .map_err(|err| RedDBError::Internal(err.to_string()))?;
700 if let Some(ttl_ms) = query.ttl_ms {
701 self.inner
702 .db
703 .set_collection_default_ttl_ms(&query.name, ttl_ms);
704 }
705 self.inner
706 .db
707 .save_collection_contract(queue_collection_contract(
708 &query.name,
709 query.priority,
710 query.ttl_ms,
711 ))
712 .map_err(|err| RedDBError::Internal(err.to_string()))?;
713 save_queue_config(
714 store.as_ref(),
715 &query.name,
716 &QueueRuntimeConfig {
717 mode: query.mode,
718 priority: query.priority,
719 max_size: query.max_size,
720 ttl_ms: query.ttl_ms,
721 dlq: query.dlq.clone(),
722 max_attempts: query.max_attempts,
723 lock_deadline_ms: query.lock_deadline_ms,
724 in_flight_cap_per_group: query.in_flight_cap_per_group,
725 retry_delay_ms: query.retry_delay_ms,
726 },
727 )?;
728
729 if let Some(dlq) = &query.dlq {
730 if store.get_collection(dlq).is_none() {
731 store
732 .create_collection(dlq)
733 .map_err(|err| RedDBError::Internal(err.to_string()))?;
734 self.inner
735 .db
736 .save_collection_contract(queue_collection_contract(dlq, false, None))
737 .map_err(|err| RedDBError::Internal(err.to_string()))?;
738 }
739 }
740
741 self.invalidate_result_cache();
742 self.inner
743 .db
744 .persist_metadata()
745 .map_err(|err| RedDBError::Internal(err.to_string()))?;
746 let mut type_tags = Vec::new();
751 if let Some(dlq) = &query.dlq {
752 type_tags.push(format!("dlq:{}", dlq));
753 }
754 self.schema_vocabulary_apply(
755 crate::runtime::schema_vocabulary::DdlEvent::CreateCollection {
756 collection: query.name.clone(),
757 columns: vec!["payload".to_string()],
758 type_tags,
759 description: None,
760 },
761 );
762
763 let mut msg = format!("queue '{}' created", query.name);
764 msg.push_str(&format!(" (mode={})", query.mode.as_str()));
765 if query.priority {
766 msg.push_str(" (priority)");
767 }
768 if let Some(max_size) = query.max_size {
769 msg.push_str(&format!(" (max_size={max_size})"));
770 }
771 if let Some(ttl_ms) = query.ttl_ms {
772 msg.push_str(&format!(" (ttl={ttl_ms}ms)"));
773 }
774 if let Some(dlq) = &query.dlq {
775 msg.push_str(&format!(
776 " (dlq={dlq}, max_attempts={})",
777 query.max_attempts
778 ));
779 }
780
781 Ok(RuntimeQueryResult::ok_message(
782 raw_query.to_string(),
783 &msg,
784 "create",
785 ))
786 }
787
788 pub fn execute_alter_queue(
789 &self,
790 raw_query: &str,
791 query: &AlterQueueQuery,
792 ) -> RedDBResult<RuntimeQueryResult> {
793 self.check_write(crate::runtime::write_gate::WriteKind::Ddl)?;
794 let store = self.inner.db.store();
795 ensure_queue_exists(store.as_ref(), &query.name)?;
796
797 let mut config = load_queue_config(store.as_ref(), &query.name);
798 let mut summary: Vec<String> = Vec::new();
799
800 if let Some(new_mode) = query.mode {
801 let pending =
802 load_pending_entries(store.as_ref(), &query.name, None, None).unwrap_or_default();
803 if !pending.is_empty() {
804 tracing::warn!(
805 queue = %query.name,
806 pending_count = pending.len(),
807 new_mode = %new_mode.as_str(),
808 "ALTER QUEUE SET MODE: {} in-flight messages will drain with old mode; \
809 new reads use {}",
810 pending.len(),
811 new_mode.as_str(),
812 );
813 }
814 config.mode = new_mode;
815 summary.push(format!("mode={}", new_mode.as_str()));
816 }
817 if let Some(max_attempts) = query.max_attempts {
818 config.max_attempts = max_attempts;
819 summary.push(format!("max_attempts={max_attempts}"));
820 }
821 if let Some(lock_deadline_ms) = query.lock_deadline_ms {
822 config.lock_deadline_ms = lock_deadline_ms;
823 summary.push(format!("lock_deadline_ms={lock_deadline_ms}"));
824 }
825 if let Some(in_flight_cap) = query.in_flight_cap_per_group {
826 config.in_flight_cap_per_group = in_flight_cap;
827 summary.push(format!("in_flight_cap_per_group={in_flight_cap}"));
828 }
829 if let Some(dlq) = &query.dlq {
830 if dlq == &query.name {
831 return Err(RedDBError::Query(
832 "dead-letter queue must be different from the source queue".to_string(),
833 ));
834 }
835 config.dlq = Some(dlq.clone());
836 summary.push(format!("dlq={dlq}"));
837 }
838 if let Some(retry_delay_ms) = query.retry_delay_ms {
839 config.retry_delay_ms = if retry_delay_ms == 0 {
840 None
841 } else {
842 Some(retry_delay_ms)
843 };
844 summary.push(format!("retry_delay_ms={retry_delay_ms}"));
845 }
846
847 save_queue_config(store.as_ref(), &query.name, &config)?;
848
849 self.invalidate_result_cache();
850 self.inner
851 .db
852 .persist_metadata()
853 .map_err(|err| RedDBError::Internal(err.to_string()))?;
854
855 Ok(RuntimeQueryResult::ok_message(
856 raw_query.to_string(),
857 &format!("queue '{}' altered: {}", query.name, summary.join(", ")),
858 "alter",
859 ))
860 }
861
862 pub fn execute_drop_queue(
863 &self,
864 raw_query: &str,
865 query: &DropQueueQuery,
866 ) -> RedDBResult<RuntimeQueryResult> {
867 self.check_write(crate::runtime::write_gate::WriteKind::Ddl)?;
868 let store = self.inner.db.store();
869 if super::impl_ddl::is_system_schema_name(&query.name) {
870 return Err(RedDBError::Query("system schema is read-only".to_string()));
871 }
872 if store.get_collection(&query.name).is_none() {
873 if query.if_exists {
874 return Ok(RuntimeQueryResult::ok_message(
875 raw_query.to_string(),
876 &format!("queue '{}' does not exist", query.name),
877 "drop",
878 ));
879 }
880 return Err(RedDBError::NotFound(format!(
881 "queue '{}' not found",
882 query.name
883 )));
884 }
885 let actual = crate::runtime::ddl::polymorphic_resolver::resolve(
886 &query.name,
887 &self.inner.db.catalog_model_snapshot(),
888 )?;
889 crate::runtime::ddl::polymorphic_resolver::ensure_model_match(
890 crate::catalog::CollectionModel::Queue,
891 actual,
892 )?;
893
894 store
895 .drop_collection(&query.name)
896 .map_err(|err| RedDBError::Internal(err.to_string()))?;
897 self.inner.db.clear_collection_default_ttl_ms(&query.name);
898 self.inner
899 .db
900 .remove_collection_contract(&query.name)
901 .map_err(|err| RedDBError::Internal(err.to_string()))?;
902 remove_queue_metadata(store.as_ref(), &query.name);
903 self.invalidate_result_cache();
904 self.inner
905 .db
906 .persist_metadata()
907 .map_err(|err| RedDBError::Internal(err.to_string()))?;
908 self.schema_vocabulary_apply(
910 crate::runtime::schema_vocabulary::DdlEvent::DropCollection {
911 collection: query.name.clone(),
912 },
913 );
914
915 Ok(RuntimeQueryResult::ok_message(
916 raw_query.to_string(),
917 &format!("queue '{}' dropped", query.name),
918 "drop",
919 ))
920 }
921
922 pub fn execute_queue_command(
923 &self,
924 raw_query: &str,
925 cmd: &QueueCommand,
926 ) -> RedDBResult<RuntimeQueryResult> {
927 self.check_write(crate::runtime::write_gate::WriteKind::Dml)?;
928 match cmd {
929 QueueCommand::Push {
930 queue,
931 value,
932 side,
933 priority,
934 available,
935 } => {
936 let store = self.inner.db.store();
937 ensure_queue_exists(store.as_ref(), queue)?;
938 let config = load_queue_config(store.as_ref(), queue);
939 if priority.is_some() && !config.priority {
940 return Err(RedDBError::Query(format!(
941 "queue '{}' is not a priority queue",
942 queue
943 )));
944 }
945 if let Some(max_size) = config.max_size {
946 let current_len =
947 load_queue_message_views_with_runtime(Some(self), store.as_ref(), queue)?
948 .len();
949 if current_len >= max_size {
950 return Err(RedDBError::Query(format!(
951 "queue '{}' is full (max_size={max_size})",
952 queue
953 )));
954 }
955 }
956
957 let position = next_queue_position(store.as_ref(), queue, *side)?;
958 let mut entity = UnifiedEntity::new(
959 EntityId::new(0),
960 EntityKind::QueueMessage {
961 queue: queue.clone(),
962 position,
963 },
964 EntityData::QueueMessage(QueueMessageData {
965 payload: value.clone(),
966 priority: if config.priority { *priority } else { None },
967 enqueued_at_ns: now_ns(),
968 attempts: 0,
969 max_attempts: config.max_attempts,
970 acked: false,
971 }),
972 );
973 if let Some(xid) = self.current_xid() {
976 entity.set_xmin(xid);
977 }
978 let id = store
979 .insert_auto(queue, entity)
980 .map_err(|err| RedDBError::Internal(err.to_string()))?;
981 let available_at_ns = available.map(|a| match a {
986 crate::storage::query::ast::QueueAvailability::DelayMs(ms) => {
987 now_ns().saturating_add(ms.saturating_mul(1_000_000))
988 }
989 crate::storage::query::ast::QueueAvailability::AtUnixMs(ms) => {
990 ms.saturating_mul(1_000_000)
991 }
992 });
993 if config.ttl_ms.is_some() || available_at_ns.is_some() {
994 store
995 .set_metadata(
996 queue,
997 id,
998 queue_message_metadata(config.ttl_ms, available_at_ns),
999 )
1000 .map_err(|err| RedDBError::Internal(err.to_string()))?;
1001 }
1002 self.record_queue_wake(&queue_wait_scope(), queue);
1007 self.invalidate_result_cache();
1008
1009 let mut result = UnifiedResult::with_columns(vec![
1010 "message_id".into(),
1011 "side".into(),
1012 "queue".into(),
1013 ]);
1014 let mut record = UnifiedRecord::new();
1015 record.set("message_id", Value::text(message_id_string(id)));
1016 record.set(
1017 "side",
1018 Value::text(match side {
1019 QueueSide::Left => "left".to_string(),
1020 QueueSide::Right => "right".to_string(),
1021 }),
1022 );
1023 record.set("queue", Value::text(queue.clone()));
1024 result.push(record);
1025
1026 Ok(RuntimeQueryResult {
1027 query: raw_query.to_string(),
1028 mode: QueryMode::Sql,
1029 statement: "queue_push",
1030 engine: "runtime-queue",
1031 result,
1032 affected_rows: 1,
1033 statement_type: "insert",
1034 bookmark: None,
1035 })
1036 }
1037 QueueCommand::Pop { queue, side, count } => {
1038 let store = self.inner.db.store();
1039 ensure_queue_exists(store.as_ref(), queue)?;
1040 let (lifecycle, _ps, txn) = runtime_lifecycle(self, queue);
1041 let popped = lifecycle
1042 .pop(queue, ast_side_to_lc(*side), *count, &txn)
1043 .map_err(map_qse)?;
1044
1045 let mut result =
1046 UnifiedResult::with_columns(vec!["message_id".into(), "payload".into()]);
1047 for (message_id, payload) in &popped {
1048 let mut record = UnifiedRecord::new();
1049 record.set(
1050 "message_id",
1051 Value::text(message_id_string(EntityId::new(*message_id))),
1052 );
1053 record.set("payload", payload.clone());
1054 result.push(record);
1055 }
1056 let popped_count = popped.len() as u64;
1057 if popped_count > 0 {
1058 self.invalidate_result_cache();
1059 }
1060
1061 Ok(RuntimeQueryResult {
1062 query: raw_query.to_string(),
1063 mode: QueryMode::Sql,
1064 statement: "queue_pop",
1065 engine: "runtime-queue",
1066 result,
1067 affected_rows: popped_count,
1068 statement_type: "delete",
1069 bookmark: None,
1070 })
1071 }
1072 QueueCommand::Peek { queue, count } => {
1073 let store = self.inner.db.store();
1074 ensure_queue_exists(store.as_ref(), queue)?;
1075 let (lifecycle, _ps, txn) = runtime_lifecycle(self, queue);
1076 let messages = lifecycle.peek(queue, *count, &txn);
1077
1078 let mut result =
1079 UnifiedResult::with_columns(vec!["message_id".into(), "payload".into()]);
1080 for message in messages {
1081 let mut record = UnifiedRecord::new();
1082 record.set(
1083 "message_id",
1084 Value::text(message_id_string(EntityId::new(message.message_id))),
1085 );
1086 record.set("payload", message.payload);
1087 result.push(record);
1088 }
1089
1090 Ok(RuntimeQueryResult {
1091 query: raw_query.to_string(),
1092 mode: QueryMode::Sql,
1093 statement: "queue_peek",
1094 engine: "runtime-queue",
1095 result,
1096 affected_rows: 0,
1097 statement_type: "select",
1098 bookmark: None,
1099 })
1100 }
1101 QueueCommand::Len { queue } => {
1102 let store = self.inner.db.store();
1103 ensure_queue_exists(store.as_ref(), queue)?;
1104 let count =
1105 load_queue_message_views_with_runtime(Some(self), store.as_ref(), queue)?.len()
1106 as u64;
1107 let mut result = UnifiedResult::with_columns(vec!["len".into()]);
1108 let mut record = UnifiedRecord::new();
1109 record.set("len", Value::UnsignedInteger(count));
1110 result.push(record);
1111
1112 Ok(RuntimeQueryResult {
1113 query: raw_query.to_string(),
1114 mode: QueryMode::Sql,
1115 statement: "queue_len",
1116 engine: "runtime-queue",
1117 result,
1118 affected_rows: 0,
1119 statement_type: "select",
1120 bookmark: None,
1121 })
1122 }
1123 QueueCommand::Purge { queue } => {
1124 let store = self.inner.db.store();
1125 ensure_queue_exists(store.as_ref(), queue)?;
1126 let (lifecycle, _ps, txn) = runtime_lifecycle(self, queue);
1127 let count = lifecycle.purge(queue, &txn).map_err(map_qse)?;
1128 if count > 0 {
1129 self.invalidate_result_cache();
1130 }
1131
1132 Ok(RuntimeQueryResult::ok_message(
1133 raw_query.to_string(),
1134 &format!("{count} messages purged from queue '{queue}'"),
1135 "delete",
1136 ))
1137 }
1138 QueueCommand::GroupCreate { queue, group } => {
1139 let store = self.inner.db.store();
1140 ensure_queue_exists(store.as_ref(), queue)?;
1141 if queue_group_exists(store.as_ref(), queue, group)? {
1142 return Ok(RuntimeQueryResult::ok_message(
1143 raw_query.to_string(),
1144 &format!(
1145 "consumer group '{}' already exists on queue '{}'",
1146 group, queue
1147 ),
1148 "create",
1149 ));
1150 }
1151 save_queue_group(store.as_ref(), queue, group)?;
1152 self.invalidate_result_cache();
1153
1154 Ok(RuntimeQueryResult::ok_message(
1155 raw_query.to_string(),
1156 &format!("consumer group '{}' created on queue '{}'", group, queue),
1157 "create",
1158 ))
1159 }
1160 QueueCommand::GroupRead {
1161 queue,
1162 group,
1163 consumer,
1164 count,
1165 wait_ms,
1166 } => {
1167 let store = self.inner.db.store();
1168 ensure_queue_exists(store.as_ref(), queue)?;
1169 if let Some(ms) = *wait_ms {
1175 if self.current_xid().is_some() {
1176 return Err(RedDBError::Query(
1177 "QUEUE READ … WAIT is autocommit-only: refusing to park inside an explicit transaction (BEGIN/COMMIT)"
1178 .to_string(),
1179 ));
1180 }
1181 let cap =
1182 self.config_u64(QUEUE_MAX_WAIT_MS_CONFIG_KEY, QUEUE_MAX_WAIT_MS_DEFAULT);
1183 if ms > cap {
1184 return Err(RedDBError::Query(format!(
1185 "QUEUE READ … WAIT {ms}ms exceeds server cap {QUEUE_MAX_WAIT_MS_CONFIG_KEY} = {cap}ms"
1186 )));
1187 }
1188 }
1189 let config = load_queue_config(store.as_ref(), queue);
1193 let group_owned =
1194 resolve_read_group(store.as_ref(), queue, group.as_deref(), consumer, &config)?;
1195 let group_ref = group_owned.as_str();
1196 let delivered = self
1197 .group_read_with_optional_wait(queue, group_ref, consumer, *count, *wait_ms)?;
1198
1199 {
1203 let lease_count = u32::try_from(delivered.len()).unwrap_or(u32::MAX);
1204 let now_ns = std::time::SystemTime::now()
1205 .duration_since(std::time::UNIX_EPOCH)
1206 .map(|d| d.as_nanos() as u64)
1207 .unwrap_or(0);
1208 self.queue_presence().heartbeat(
1209 queue,
1210 group_ref,
1211 consumer,
1212 lease_count,
1213 now_ns,
1214 );
1215 }
1216
1217 let mut result = UnifiedResult::with_columns(vec![
1218 "message_id".into(),
1219 "payload".into(),
1220 "consumer".into(),
1221 "delivery_count".into(),
1222 "attempts".into(),
1223 ]);
1224
1225 for message in delivered {
1226 let mut record = UnifiedRecord::new();
1227 record.set(
1228 "message_id",
1229 Value::text(message_id_string(EntityId::new(message.message_id))),
1230 );
1231 record.set("payload", message.payload);
1232 record.set("consumer", Value::text(message.consumer));
1233 record.set(
1234 "delivery_count",
1235 Value::UnsignedInteger(u64::from(message.delivery_count)),
1236 );
1237 record.set(
1238 "attempts",
1239 Value::UnsignedInteger(u64::from(message.delivery_count)),
1240 );
1241 result.push(record);
1242 }
1243 if !result.records.is_empty() {
1244 self.invalidate_result_cache();
1245 }
1246
1247 Ok(RuntimeQueryResult {
1248 query: raw_query.to_string(),
1249 mode: QueryMode::Sql,
1250 statement: "queue_group_read",
1251 engine: "runtime-queue",
1252 result,
1253 affected_rows: 0,
1254 statement_type: "select",
1255 bookmark: None,
1256 })
1257 }
1258 QueueCommand::Pending { queue, group } => {
1259 let store = self.inner.db.store();
1260 ensure_queue_exists(store.as_ref(), queue)?;
1261 require_queue_group(store.as_ref(), queue, group)?;
1262 let mut pending = load_pending_entries(store.as_ref(), queue, Some(group), None)?;
1263 pending.sort_by_key(|entry| entry.delivered_at_ns);
1264 let current_time_ns = now_ns();
1265
1266 let mut result = UnifiedResult::with_columns(vec![
1267 "message_id".into(),
1268 "consumer".into(),
1269 "delivered_at_ns".into(),
1270 "delivery_count".into(),
1271 "idle_ms".into(),
1272 ]);
1273 for entry in pending {
1274 let mut record = UnifiedRecord::new();
1275 record.set(
1276 "message_id",
1277 Value::text(message_id_string(entry.message_id)),
1278 );
1279 record.set("consumer", Value::text(entry.consumer));
1280 record.set(
1281 "delivered_at_ns",
1282 Value::UnsignedInteger(entry.delivered_at_ns),
1283 );
1284 record.set(
1285 "delivery_count",
1286 Value::UnsignedInteger(u64::from(entry.delivery_count)),
1287 );
1288 record.set(
1289 "idle_ms",
1290 Value::UnsignedInteger(
1291 current_time_ns.saturating_sub(entry.delivered_at_ns) / 1_000_000,
1292 ),
1293 );
1294 result.push(record);
1295 }
1296
1297 Ok(RuntimeQueryResult {
1298 query: raw_query.to_string(),
1299 mode: QueryMode::Sql,
1300 statement: "queue_pending",
1301 engine: "runtime-queue",
1302 result,
1303 affected_rows: 0,
1304 statement_type: "select",
1305 bookmark: None,
1306 })
1307 }
1308 QueueCommand::Claim {
1309 queue,
1310 group,
1311 consumer,
1312 min_idle_ms,
1313 } => {
1314 let store = self.inner.db.store();
1315 ensure_queue_exists(store.as_ref(), queue)?;
1316 require_queue_group(store.as_ref(), queue, group)?;
1317 let (lifecycle, _ps, txn) = runtime_lifecycle(self, queue);
1318 let delivered = lifecycle
1319 .claim_delivering(queue, consumer, *min_idle_ms, &txn)
1320 .map_err(map_qse)?;
1321
1322 let mut result = UnifiedResult::with_columns(vec![
1323 "message_id".into(),
1324 "payload".into(),
1325 "consumer".into(),
1326 "delivery_count".into(),
1327 ]);
1328
1329 for message in delivered {
1330 let mut record = UnifiedRecord::new();
1331 record.set(
1332 "message_id",
1333 Value::text(message_id_string(EntityId::new(message.message_id))),
1334 );
1335 record.set("payload", message.payload);
1336 record.set("consumer", Value::text(message.consumer));
1337 record.set(
1338 "delivery_count",
1339 Value::UnsignedInteger(u64::from(message.delivery_count)),
1340 );
1341 result.push(record);
1342 }
1343 if !result.records.is_empty() {
1344 self.invalidate_result_cache();
1345 }
1346 let affected_rows = result.records.len() as u64;
1347
1348 Ok(RuntimeQueryResult {
1349 query: raw_query.to_string(),
1350 mode: QueryMode::Sql,
1351 statement: "queue_claim",
1352 engine: "runtime-queue",
1353 result,
1354 affected_rows,
1355 statement_type: "update",
1356 bookmark: None,
1357 })
1358 }
1359 QueueCommand::Ack {
1360 queue,
1361 group,
1362 message_id,
1363 delivery_id,
1364 } => {
1365 let store = self.inner.db.store();
1366 ensure_queue_exists(store.as_ref(), queue)?;
1367 let (group_owned, message_entity) = resolve_ack_nack_handle(
1368 store.as_ref(),
1369 queue,
1370 group,
1371 message_id,
1372 delivery_id.as_deref(),
1373 )?;
1374 let group_ref = group_owned.as_str();
1375 require_queue_group(store.as_ref(), queue, group_ref)?;
1376 let (lifecycle, ps, txn) = runtime_lifecycle(self, queue);
1377 let did = match delivery_id.as_deref() {
1378 Some(d) => d.to_string(),
1379 None => ps
1380 .find_pending_by_key(queue, message_entity.raw(), group_ref)
1381 .ok_or_else(|| {
1382 RedDBError::NotFound(format!(
1383 "no pending delivery for message '{}' on queue '{}' (group '{}')",
1384 message_entity.raw(),
1385 queue,
1386 group_ref
1387 ))
1388 })?,
1389 };
1390 lifecycle.ack(&txn, &did).map_err(map_qse)?;
1391 self.invalidate_result_cache();
1392
1393 Ok(RuntimeQueryResult::ok_message(
1394 raw_query.to_string(),
1395 "message acknowledged",
1396 "update",
1397 ))
1398 }
1399 QueueCommand::Nack {
1400 queue,
1401 group,
1402 message_id,
1403 delivery_id,
1404 delay_ms,
1405 } => {
1406 let store = self.inner.db.store();
1407 ensure_queue_exists(store.as_ref(), queue)?;
1408 let config = load_queue_config(store.as_ref(), queue);
1409 if delay_ms.is_some() {
1415 if let Some((_, role)) = current_auth_identity() {
1416 if !role.can_write() {
1417 return Err(RedDBError::InvalidOperation(format!(
1418 "role '{role}' is not authorized to override NACK retry delay on queue '{queue}'"
1419 )));
1420 }
1421 }
1422 }
1423 let (group_owned, message_entity) = resolve_ack_nack_handle(
1424 store.as_ref(),
1425 queue,
1426 group,
1427 message_id,
1428 delivery_id.as_deref(),
1429 )?;
1430 let group_ref = group_owned.as_str();
1431 require_queue_group(store.as_ref(), queue, group_ref)?;
1432 let (lifecycle, ps, txn) = runtime_lifecycle(self, queue);
1433 let did = match delivery_id.as_deref() {
1434 Some(d) => d.to_string(),
1435 None => ps
1436 .find_pending_by_key(queue, message_entity.raw(), group_ref)
1437 .ok_or_else(|| {
1438 RedDBError::NotFound(format!(
1439 "no pending delivery for message '{}' on queue '{}' (group '{}')",
1440 message_entity.raw(),
1441 queue,
1442 group_ref
1443 ))
1444 })?,
1445 };
1446 let effective_delay_ms = delay_ms.or(config.retry_delay_ms).unwrap_or(0);
1450 let pending_attempt = ps.read_pending_attempt(&did).map_err(map_qse)?;
1451 let nack_attempts = pending_attempt.attempts.saturating_add(1);
1452 let outcome = lifecycle.nack(&txn, &did).map_err(map_qse)?;
1453 if matches!(outcome, RetirementOutcome::Requeued) && effective_delay_ms > 0 {
1457 let at_ns =
1458 now_ns().saturating_add(effective_delay_ms.saturating_mul(1_000_000));
1459 set_message_available_at_ns(
1460 store.as_ref(),
1461 queue,
1462 message_entity,
1463 Some(at_ns),
1464 config.ttl_ms,
1465 )?;
1466 }
1467 self.maybe_emit_nack_audit(
1474 queue,
1475 group_ref,
1476 &did,
1477 *delay_ms,
1478 config.retry_delay_ms,
1479 &outcome,
1480 );
1481 let outcome_label = match &outcome {
1482 RetirementOutcome::Requeued => NackOutcomeLabel::Retry,
1483 RetirementOutcome::MovedToDlq(_) => NackOutcomeLabel::Dlq,
1484 RetirementOutcome::Dropped => NackOutcomeLabel::Drop,
1485 };
1486 self.queue_telemetry().record_nacked(
1487 queue,
1488 group_ref,
1489 config.mode.as_str(),
1490 outcome_label,
1491 );
1492 if let RetirementOutcome::MovedToDlq(dlq) = &outcome {
1493 OperatorEvent::QueueDlqPromoted {
1494 queue: queue.to_string(),
1495 group: group_ref.to_string(),
1496 dlq: dlq.clone(),
1497 message_id: pending_attempt.message_id,
1498 attempts: nack_attempts,
1499 reason: format!("lifecycle_nack:{did}"),
1500 }
1501 .emit(self.audit_log());
1502 }
1503 let message = match outcome {
1504 RetirementOutcome::Requeued => {
1505 if effective_delay_ms > 0 {
1506 format!("message requeued (delay={effective_delay_ms}ms)")
1507 } else {
1508 "message requeued".to_string()
1509 }
1510 }
1511 RetirementOutcome::MovedToDlq(dlq) => {
1512 format!("message moved to dead-letter queue '{}'", dlq)
1513 }
1514 RetirementOutcome::Dropped => "message dropped after max attempts".to_string(),
1515 };
1516 self.invalidate_result_cache();
1517
1518 Ok(RuntimeQueryResult::ok_message(
1519 raw_query.to_string(),
1520 &message,
1521 "update",
1522 ))
1523 }
1524 QueueCommand::Move {
1525 source,
1526 destination,
1527 filter,
1528 limit,
1529 } => self.execute_queue_move(raw_query, source, destination, filter.as_ref(), *limit),
1530 }
1531 }
1532
1533 pub fn execute_queue_select(
1534 &self,
1535 raw_query: &str,
1536 query: &QueueSelectQuery,
1537 ) -> RedDBResult<RuntimeQueryResult> {
1538 let store = self.inner.db.store();
1539 ensure_queue_exists(store.as_ref(), &query.queue)?;
1540 let config = load_queue_config(store.as_ref(), &query.queue);
1541 let dlq = queue_is_dead_letter_target(store.as_ref(), &query.queue);
1542 let columns = if query.columns.is_empty() {
1543 queue_projection_default_columns()
1544 } else {
1545 query.columns.clone()
1546 };
1547
1548 let mut messages =
1549 load_queue_message_views_with_runtime(Some(self), store.as_ref(), &query.queue)?;
1550 sort_queue_messages(&mut messages, &config, QueueSide::Left);
1551
1552 let mut result = UnifiedResult::with_columns(columns.clone());
1553 for message in messages {
1554 if query
1555 .filter
1556 .as_ref()
1557 .is_some_and(|filter| !queue_message_matches_filter(&message, dlq, filter))
1558 {
1559 continue;
1560 }
1561 let record = queue_projection_record(&columns, &message, dlq)?;
1562 result.push(record);
1563 if query
1564 .limit
1565 .is_some_and(|limit| result.records.len() >= limit as usize)
1566 {
1567 break;
1568 }
1569 }
1570
1571 Ok(RuntimeQueryResult {
1572 query: raw_query.to_string(),
1573 mode: QueryMode::Sql,
1574 statement: "queue_select",
1575 engine: "runtime-queue",
1576 result,
1577 affected_rows: 0,
1578 statement_type: "select",
1579 bookmark: None,
1580 })
1581 }
1582
/// Moves up to `limit` messages from queue `source` to queue `destination`.
///
/// The move runs in phases:
/// 1. validate that the two queues differ and both exist;
/// 2. load, sort and filter the source messages, keeping at most `limit`;
/// 3. reject the move if the destination's `max_size` would be exceeded;
/// 4. probe each selected message's lock and re-check it still exists;
/// 5. insert copies into the destination, deleting the copies inserted so
///    far if one insert fails;
/// 6. delete the originals from the source through the queue lifecycle;
/// 7. invalidate the result cache, record a `queue/move` audit event and
///    return a single summary row.
///
/// # Errors
///
/// Returns `RedDBError::Query` when source and destination are equal, when
/// the destination would overflow `max_size`, when a selected message is
/// locked, or when it is no longer present. Errors from the helpers
/// (`ensure_queue_exists`, message loading, insert, lifecycle delete) are
/// propagated.
fn execute_queue_move(
    &self,
    raw_query: &str,
    source: &str,
    destination: &str,
    filter: Option<&Filter>,
    limit: usize,
) -> RedDBResult<RuntimeQueryResult> {
    if source == destination {
        return Err(RedDBError::Query(
            "QUEUE MOVE source and destination must be different".to_string(),
        ));
    }
    let store = self.inner.db.store();
    ensure_queue_exists(store.as_ref(), source)?;
    ensure_queue_exists(store.as_ref(), destination)?;
    let source_config = load_queue_config(store.as_ref(), source);
    let destination_config = load_queue_config(store.as_ref(), destination);
    let source_dlq = queue_is_dead_letter_target(store.as_ref(), source);

    // Candidate set: source messages in sorted order, narrowed by the
    // optional filter and capped at `limit`.
    let mut messages =
        load_queue_message_views_with_runtime(Some(self), store.as_ref(), source)?;
    sort_queue_messages(&mut messages, &source_config, QueueSide::Left);
    let selected = messages
        .into_iter()
        .filter(|message| {
            filter
                .map(|f| queue_message_matches_filter(message, source_dlq, f))
                .unwrap_or(true)
        })
        .take(limit)
        .collect::<Vec<_>>();

    // Capacity check is all-or-nothing: the whole batch must fit.
    if let Some(max_size) = destination_config.max_size {
        let current_len =
            load_queue_message_views_with_runtime(Some(self), store.as_ref(), destination)?
                .len();
        if current_len + selected.len() > max_size {
            return Err(RedDBError::Query(format!(
                "queue '{}' is full (max_size={max_size})",
                destination
            )));
        }
    }

    // Pre-flight: every selected message must be lockable and still present.
    // NOTE(review): `_guard` goes out of scope at the end of each iteration,
    // so the lock does not appear to be held across the insert/delete phases
    // below — confirm this is intended as a probe only.
    for message in &selected {
        let lock = queue_message_lock_handle(self, source, message.id);
        let Some(_guard) = lock.try_lock() else {
            return Err(RedDBError::Query(format!(
                "message '{}' is locked on queue '{}'",
                message.id.raw(),
                source
            )));
        };
        if queue_message_view_by_id(store.as_ref(), source, message.id)?.is_none() {
            return Err(RedDBError::Query(format!(
                "message '{}' is no longer available on queue '{}'",
                message.id.raw(),
                source
            )));
        }
    }

    // Copy phase: on the first failed insert, delete the copies made so far
    // (delete errors are ignored) and surface the insert error.
    let mut inserted = Vec::new();
    for message in &selected {
        match insert_moved_queue_message(
            store.as_ref(),
            destination,
            &destination_config,
            message,
        ) {
            Ok(id) => inserted.push(id),
            Err(err) => {
                for id in inserted {
                    let _ = store.delete(destination, id);
                }
                return Err(err);
            }
        }
    }

    // Delete phase: remove the originals through the lifecycle.
    // NOTE(review): if a delete fails here the function returns early and the
    // destination copies inserted above are not rolled back — verify that
    // this partial state is acceptable.
    let (move_lifecycle, _move_ps, move_txn) = runtime_lifecycle(self, source);
    for message in &selected {
        move_lifecycle
            .delete_with_state(source, message.id.raw(), &move_txn)
            .map_err(map_qse)?;
    }
    if !selected.is_empty() {
        self.invalidate_result_cache();
    }

    // `selected` and `committed` are reported with the same value: reaching
    // this point means every selected message was inserted and deleted.
    let selected_count = selected.len() as u64;
    self.audit_log().record_event(
        AuditEvent::builder("queue/move")
            .source(AuditAuthSource::System)
            .outcome(Outcome::Success)
            .resource(format!("queue:{source}->{destination}"))
            .fields([
                AuditFieldEscaper::field("source", source),
                AuditFieldEscaper::field("destination", destination),
                AuditFieldEscaper::field("selected", selected_count),
                AuditFieldEscaper::field("committed", selected_count),
            ])
            .build(),
    );

    let mut result = UnifiedResult::with_columns(vec![
        "source".into(),
        "destination".into(),
        "selected".into(),
        "committed".into(),
    ]);
    let mut record = UnifiedRecord::new();
    record.set("source", Value::text(source.to_string()));
    record.set("destination", Value::text(destination.to_string()));
    record.set("selected", Value::UnsignedInteger(selected_count));
    record.set("committed", Value::UnsignedInteger(selected_count));
    result.push(record);

    Ok(RuntimeQueryResult {
        query: raw_query.to_string(),
        mode: QueryMode::Sql,
        statement: "queue_move",
        engine: "runtime-queue",
        result,
        affected_rows: selected_count,
        statement_type: "update",
        bookmark: None,
    })
}
1713
1714 fn maybe_emit_nack_audit(
1729 &self,
1730 queue: &str,
1731 group: &str,
1732 delivery_id: &str,
1733 override_ms: Option<u64>,
1734 default_ms: Option<u64>,
1735 outcome: &RetirementOutcome,
1736 ) {
1737 let Some(override_ms) = override_ms else {
1738 return;
1739 };
1740 let outcome_label = match outcome {
1741 RetirementOutcome::Requeued => "requeued",
1742 RetirementOutcome::MovedToDlq(_) => "dlq",
1743 RetirementOutcome::Dropped => "dropped",
1744 };
1745 const SIGNIFICANT_DELAY_MS: u64 = 60_000;
1746 let destination_changed = !matches!(outcome, RetirementOutcome::Requeued);
1747 if override_ms < SIGNIFICANT_DELAY_MS && !destination_changed {
1748 return;
1749 }
1750 self.audit_log().record_event(
1751 AuditEvent::builder("queue/nack/override")
1752 .source(AuditAuthSource::System)
1753 .outcome(Outcome::Success)
1754 .resource(format!("queue:{queue}"))
1755 .fields([
1756 AuditFieldEscaper::field("queue", queue),
1757 AuditFieldEscaper::field("group", group),
1758 AuditFieldEscaper::field("delivery_id", delivery_id),
1759 AuditFieldEscaper::field("override_delay_ms", override_ms),
1760 AuditFieldEscaper::field("default_delay_ms", default_ms.unwrap_or(0)),
1761 AuditFieldEscaper::field("outcome", outcome_label),
1762 ])
1763 .build(),
1764 );
1765 }
1766}
1767
1768fn ensure_queue_exists(store: &UnifiedStore, queue: &str) -> RedDBResult<()> {
1769 if store.get_collection(queue).is_some() {
1770 Ok(())
1771 } else {
1772 Err(RedDBError::NotFound(format!("queue '{}' not found", queue)))
1773 }
1774}
1775
1776pub(super) fn load_queue_config(store: &UnifiedStore, queue: &str) -> QueueRuntimeConfig {
1777 let default = QueueRuntimeConfig {
1778 mode: QueueMode::Work,
1779 priority: false,
1780 max_size: None,
1781 ttl_ms: None,
1782 dlq: None,
1783 max_attempts: crate::storage::query::DEFAULT_QUEUE_MAX_ATTEMPTS,
1784 lock_deadline_ms: crate::storage::query::DEFAULT_QUEUE_LOCK_DEADLINE_MS,
1785 in_flight_cap_per_group: crate::storage::query::DEFAULT_QUEUE_IN_FLIGHT_CAP_PER_GROUP,
1786 retry_delay_ms: None,
1787 };
1788
1789 let Some(manager) = store.get_collection(QUEUE_META_COLLECTION) else {
1790 return default;
1791 };
1792 manager
1793 .query_all(|entity| {
1794 entity.data.as_row().is_some_and(|row| {
1795 row_text(row, "kind").as_deref() == Some("queue_config")
1796 && row_text(row, "queue").as_deref() == Some(queue)
1797 })
1798 })
1799 .into_iter()
1800 .find_map(|entity| {
1801 let row = entity.data.as_row()?;
1802 Some(QueueRuntimeConfig {
1803 mode: row_text(row, "mode")
1804 .as_deref()
1805 .and_then(QueueMode::parse)
1806 .unwrap_or_default(),
1807 priority: row_bool(row, "priority").unwrap_or(false),
1808 max_size: row_u64(row, "max_size").map(|value| value as usize),
1809 ttl_ms: row_u64(row, "ttl_ms"),
1810 dlq: row_text(row, "dlq"),
1811 max_attempts: row_u64(row, "max_attempts")
1812 .map(|value| value as u32)
1813 .unwrap_or(crate::storage::query::DEFAULT_QUEUE_MAX_ATTEMPTS),
1814 lock_deadline_ms: row_u64(row, "lock_deadline_ms")
1815 .unwrap_or(crate::storage::query::DEFAULT_QUEUE_LOCK_DEADLINE_MS),
1816 in_flight_cap_per_group: row_u64(row, "in_flight_cap_per_group")
1817 .map(|value| value as u32)
1818 .unwrap_or(crate::storage::query::DEFAULT_QUEUE_IN_FLIGHT_CAP_PER_GROUP),
1819 retry_delay_ms: row_u64(row, "retry_delay_ms").filter(|v| *v > 0),
1820 })
1821 })
1822 .unwrap_or(default)
1823}
1824
1825pub(super) fn queue_mode_str(store: &UnifiedStore, queue: &str) -> &'static str {
1826 load_queue_config(store, queue).mode.as_str()
1827}
1828
1829fn save_queue_config(
1830 store: &UnifiedStore,
1831 queue: &str,
1832 config: &QueueRuntimeConfig,
1833) -> RedDBResult<()> {
1834 remove_meta_rows(store, |row| {
1835 row_text(row, "kind").as_deref() == Some("queue_config")
1836 && row_text(row, "queue").as_deref() == Some(queue)
1837 });
1838
1839 let mut fields = HashMap::new();
1840 fields.insert("kind".to_string(), Value::text("queue_config".to_string()));
1841 fields.insert("queue".to_string(), Value::text(queue.to_string()));
1842 fields.insert(
1843 "mode".to_string(),
1844 Value::text(config.mode.as_str().to_string()),
1845 );
1846 fields.insert("priority".to_string(), Value::Boolean(config.priority));
1847 fields.insert(
1848 "max_size".to_string(),
1849 config
1850 .max_size
1851 .map(|value| Value::UnsignedInteger(value as u64))
1852 .unwrap_or(Value::Null),
1853 );
1854 fields.insert(
1855 "ttl_ms".to_string(),
1856 config
1857 .ttl_ms
1858 .map(Value::UnsignedInteger)
1859 .unwrap_or(Value::Null),
1860 );
1861 fields.insert(
1862 "dlq".to_string(),
1863 config.dlq.clone().map(Value::text).unwrap_or(Value::Null),
1864 );
1865 fields.insert(
1866 "max_attempts".to_string(),
1867 Value::UnsignedInteger(u64::from(config.max_attempts)),
1868 );
1869 fields.insert(
1870 "lock_deadline_ms".to_string(),
1871 Value::UnsignedInteger(config.lock_deadline_ms),
1872 );
1873 fields.insert(
1874 "in_flight_cap_per_group".to_string(),
1875 Value::UnsignedInteger(u64::from(config.in_flight_cap_per_group)),
1876 );
1877 fields.insert(
1878 "retry_delay_ms".to_string(),
1879 config
1880 .retry_delay_ms
1881 .map(Value::UnsignedInteger)
1882 .unwrap_or(Value::Null),
1883 );
1884 insert_meta_row(store, fields)
1885}
1886
1887fn remove_queue_metadata(store: &UnifiedStore, queue: &str) {
1888 remove_meta_rows(store, |row| {
1889 row_text(row, "queue").as_deref() == Some(queue)
1890 });
1891}
1892
1893fn queue_group_exists(store: &UnifiedStore, queue: &str, group: &str) -> RedDBResult<bool> {
1894 Ok(load_queue_groups(store, queue)?
1895 .into_iter()
1896 .any(|entry| entry.group == group))
1897}
1898
1899pub(super) fn require_queue_group(
1900 store: &UnifiedStore,
1901 queue: &str,
1902 group: &str,
1903) -> RedDBResult<()> {
1904 if queue_group_exists(store, queue, group)? {
1905 Ok(())
1906 } else {
1907 Err(RedDBError::NotFound(format!(
1908 "consumer group '{}' not found on queue '{}'",
1909 group, queue
1910 )))
1911 }
1912}
1913
1914pub(super) fn resolve_read_group(
1915 store: &UnifiedStore,
1916 queue: &str,
1917 group: Option<&str>,
1918 consumer: &str,
1919 config: &QueueRuntimeConfig,
1920) -> RedDBResult<String> {
1921 if let Some(group) = group {
1922 require_queue_group(store, queue, group)?;
1923 return Ok(group.to_string());
1924 }
1925
1926 match config.mode {
1927 QueueMode::Work => {
1928 if !queue_group_exists(store, queue, WORK_DEFAULT_GROUP)? {
1929 save_queue_group(store, queue, WORK_DEFAULT_GROUP)?;
1930 }
1931 Ok(WORK_DEFAULT_GROUP.to_string())
1932 }
1933 QueueMode::Fanout => {
1934 let fanout_group = format!("{FANOUT_GROUP_PREFIX}{consumer}");
1935 if !queue_group_exists(store, queue, &fanout_group)? {
1936 save_queue_group(store, queue, &fanout_group)?;
1937 }
1938 Ok(fanout_group)
1939 }
1940 }
1941}
1942
1943fn load_queue_groups(store: &UnifiedStore, queue: &str) -> RedDBResult<Vec<QueueGroupEntry>> {
1944 let Some(manager) = store.get_collection(QUEUE_META_COLLECTION) else {
1945 return Ok(Vec::new());
1946 };
1947 Ok(manager
1948 .query_all(|entity| {
1949 entity.data.as_row().is_some_and(|row| {
1950 row_text(row, "kind").as_deref() == Some("queue_group")
1951 && row_text(row, "queue").as_deref() == Some(queue)
1952 })
1953 })
1954 .into_iter()
1955 .filter_map(|entity| {
1956 let row = entity.data.as_row()?;
1957 Some(QueueGroupEntry {
1958 entity_id: entity.id,
1959 group: row_text(row, "group")?,
1960 })
1961 })
1962 .collect())
1963}
1964
1965fn save_queue_group(store: &UnifiedStore, queue: &str, group: &str) -> RedDBResult<()> {
1966 let mut fields = HashMap::new();
1967 fields.insert("kind".to_string(), Value::text("queue_group".to_string()));
1968 fields.insert("queue".to_string(), Value::text(queue.to_string()));
1969 fields.insert("group".to_string(), Value::text(group.to_string()));
1970 fields.insert(
1971 "created_at_ns".to_string(),
1972 Value::UnsignedInteger(now_ns()),
1973 );
1974 insert_meta_row(store, fields)
1975}
1976
1977pub(super) fn load_pending_entries(
1978 store: &UnifiedStore,
1979 queue: &str,
1980 group: Option<&str>,
1981 message_id: Option<EntityId>,
1982) -> RedDBResult<Vec<QueuePendingEntry>> {
1983 let Some(manager) = store.get_collection(QUEUE_META_COLLECTION) else {
1984 return Ok(Vec::new());
1985 };
1986 let lock_deadline_ns = load_queue_config(store, queue)
1987 .lock_deadline_ms
1988 .saturating_mul(1_000_000);
1989 let attempts_by_key: HashMap<(String, String, u64), u64> = manager
1990 .query_all(|entity| {
1991 entity.data.as_row().is_some_and(|row| {
1992 row_text(row, "kind").as_deref() == Some("queue_attempts_lc")
1993 && row_text(row, "queue").as_deref() == Some(queue)
1994 })
1995 })
1996 .into_iter()
1997 .filter_map(|entity| {
1998 let row = entity.data.as_row()?;
1999 Some((
2000 (
2001 row_text(row, "queue")?,
2002 row_text(row, "group")?,
2003 row_u64(row, "message_id")?,
2004 ),
2005 row_u64(row, "attempts").unwrap_or(1),
2006 ))
2007 })
2008 .collect();
2009 Ok(manager
2010 .query_all(|entity| {
2011 entity.data.as_row().is_some_and(|row| {
2012 matches!(
2013 row_text(row, "kind").as_deref(),
2014 Some("queue_pending") | Some("queue_pending_lc")
2015 ) && row_text(row, "queue").as_deref() == Some(queue)
2016 && group
2017 .map(|group_name| row_text(row, "group").as_deref() == Some(group_name))
2018 .unwrap_or(true)
2019 && message_id
2020 .map(|candidate| row_u64(row, "message_id") == Some(candidate.raw()))
2021 .unwrap_or(true)
2022 })
2023 })
2024 .into_iter()
2025 .filter_map(|entity| {
2026 let row = entity.data.as_row()?;
2027 let group = row_text(row, "group")?;
2028 let message_id = row_u64(row, "message_id")?;
2029 let kind = row_text(row, "kind")?;
2030 let delivered_at_ns = if kind == "queue_pending_lc" {
2031 row_u64(row, "lock_deadline_ns")
2032 .unwrap_or(0)
2033 .saturating_sub(lock_deadline_ns)
2034 } else {
2035 row_u64(row, "delivered_at_ns")?
2036 };
2037 let delivery_count = if kind == "queue_pending_lc" {
2038 attempts_by_key
2039 .get(&(queue.to_string(), group.clone(), message_id))
2040 .copied()
2041 .unwrap_or(1)
2042 } else {
2043 row_u64(row, "delivery_count").unwrap_or(1)
2044 };
2045 Some(QueuePendingEntry {
2046 entity_id: entity.id,
2047 group,
2048 message_id: EntityId::new(message_id),
2049 consumer: row_text(row, "consumer").unwrap_or_default(),
2050 delivered_at_ns,
2051 delivery_count: delivery_count as u32,
2052 })
2053 })
2054 .collect())
2055}
2056
2057pub(super) fn save_queue_pending(
2058 store: &UnifiedStore,
2059 queue: &str,
2060 group: &str,
2061 message_id: EntityId,
2062 consumer: &str,
2063 delivered_at_ns: u64,
2064 delivery_count: u32,
2065) -> RedDBResult<()> {
2066 remove_meta_rows(store, |row| {
2067 row_text(row, "kind").as_deref() == Some("queue_pending")
2068 && row_text(row, "queue").as_deref() == Some(queue)
2069 && row_text(row, "group").as_deref() == Some(group)
2070 && row_u64(row, "message_id") == Some(message_id.raw())
2071 });
2072
2073 let mut fields = HashMap::new();
2074 fields.insert("kind".to_string(), Value::text("queue_pending".to_string()));
2075 fields.insert("queue".to_string(), Value::text(queue.to_string()));
2076 fields.insert("group".to_string(), Value::text(group.to_string()));
2077 fields.insert(
2078 "message_id".to_string(),
2079 Value::UnsignedInteger(message_id.raw()),
2080 );
2081 fields.insert("consumer".to_string(), Value::text(consumer.to_string()));
2082 fields.insert(
2083 "delivered_at_ns".to_string(),
2084 Value::UnsignedInteger(delivered_at_ns),
2085 );
2086 fields.insert(
2087 "delivery_count".to_string(),
2088 Value::UnsignedInteger(u64::from(delivery_count)),
2089 );
2090 insert_meta_row(store, fields)
2091}
2092
2093pub(super) fn require_pending_entry(
2094 store: &UnifiedStore,
2095 queue: &str,
2096 group: &str,
2097 message_id: EntityId,
2098) -> RedDBResult<QueuePendingEntry> {
2099 load_pending_entries(store, queue, Some(group), Some(message_id))?
2100 .into_iter()
2101 .next()
2102 .ok_or_else(|| {
2103 RedDBError::NotFound(format!(
2104 "message '{}' is not pending in group '{}' on queue '{}'",
2105 message_id.raw(),
2106 group,
2107 queue
2108 ))
2109 })
2110}
2111
2112pub(super) fn load_ack_entries(
2113 store: &UnifiedStore,
2114 queue: &str,
2115 group: Option<&str>,
2116 message_id: Option<EntityId>,
2117) -> RedDBResult<Vec<QueueAckEntry>> {
2118 let Some(manager) = store.get_collection(QUEUE_META_COLLECTION) else {
2119 return Ok(Vec::new());
2120 };
2121 Ok(manager
2122 .query_all(|entity| {
2123 entity.data.as_row().is_some_and(|row| {
2124 row_text(row, "kind").as_deref() == Some("queue_ack")
2125 && row_text(row, "queue").as_deref() == Some(queue)
2126 && group
2127 .map(|group_name| row_text(row, "group").as_deref() == Some(group_name))
2128 .unwrap_or(true)
2129 && message_id
2130 .map(|candidate| row_u64(row, "message_id") == Some(candidate.raw()))
2131 .unwrap_or(true)
2132 })
2133 })
2134 .into_iter()
2135 .filter_map(|entity| {
2136 let row = entity.data.as_row()?;
2137 Some(QueueAckEntry {
2138 entity_id: entity.id,
2139 group: row_text(row, "group")?,
2140 message_id: EntityId::new(row_u64(row, "message_id")?),
2141 })
2142 })
2143 .collect())
2144}
2145
2146pub(super) fn save_queue_ack(
2147 store: &UnifiedStore,
2148 queue: &str,
2149 group: &str,
2150 message_id: EntityId,
2151) -> RedDBResult<()> {
2152 let existing = load_ack_entries(store, queue, Some(group), Some(message_id))?;
2153 if !existing.is_empty() {
2154 return Ok(());
2155 }
2156
2157 let mut fields = HashMap::new();
2158 fields.insert("kind".to_string(), Value::text("queue_ack".to_string()));
2159 fields.insert("queue".to_string(), Value::text(queue.to_string()));
2160 fields.insert("group".to_string(), Value::text(group.to_string()));
2161 fields.insert(
2162 "message_id".to_string(),
2163 Value::UnsignedInteger(message_id.raw()),
2164 );
2165 fields.insert("acked_at_ns".to_string(), Value::UnsignedInteger(now_ns()));
2166 insert_meta_row(store, fields)
2167}
2168
2169pub(super) fn queue_message_completed_for_all_groups(
2170 store: &UnifiedStore,
2171 queue: &str,
2172 message_id: EntityId,
2173) -> RedDBResult<bool> {
2174 let groups = load_queue_groups(store, queue)?;
2175 let pending = load_pending_entries(store, queue, None, Some(message_id))?;
2176 if !pending.is_empty() {
2177 return Ok(false);
2178 }
2179 if groups.is_empty() {
2180 return Ok(true);
2181 }
2182
2183 let acked_groups = load_ack_entries(store, queue, None, Some(message_id))?
2184 .into_iter()
2185 .map(|entry| entry.group)
2186 .collect::<HashSet<_>>();
2187 Ok(groups
2188 .into_iter()
2189 .all(|group| acked_groups.contains(&group.group)))
2190}
2191
2192fn load_queue_message_views(
2193 store: &UnifiedStore,
2194 queue: &str,
2195) -> RedDBResult<Vec<QueueMessageView>> {
2196 load_queue_message_views_with_runtime(None, store, queue)
2197}
2198
2199pub(super) fn load_queue_message_views_with_runtime(
2206 runtime: Option<&RedDBRuntime>,
2207 store: &UnifiedStore,
2208 queue: &str,
2209) -> RedDBResult<Vec<QueueMessageView>> {
2210 let manager = store
2211 .get_collection(queue)
2212 .ok_or_else(|| RedDBError::NotFound(format!("queue '{}' not found", queue)))?;
2213 let snap_ctx = crate::runtime::impl_core::capture_current_snapshot();
2217 let rls_filter = runtime.and_then(|rt| {
2218 crate::runtime::impl_core::rls_policy_filter_for_kind(
2219 rt,
2220 queue,
2221 crate::storage::query::ast::PolicyAction::Select,
2222 crate::storage::query::ast::PolicyTargetKind::Messages,
2223 )
2224 });
2225 let rls_enabled_but_denied = runtime.map(|rt| rt.is_rls_enabled(queue)).unwrap_or(false)
2226 && rls_filter.is_none()
2227 && runtime.is_some();
2228 if rls_enabled_but_denied {
2229 return Ok(Vec::new());
2231 }
2232 let filter_arc = rls_filter.map(std::sync::Arc::new);
2233 let rt_arc = runtime;
2234 Ok(manager
2235 .query_all(move |entity| {
2236 if !matches!(entity.kind, EntityKind::QueueMessage { .. }) {
2237 return false;
2238 }
2239 if !crate::runtime::impl_core::entity_visible_with_context(snap_ctx.as_ref(), entity) {
2240 return false;
2241 }
2242 if let (Some(filter), Some(rt)) = (filter_arc.as_ref(), rt_arc) {
2243 return crate::runtime::query_exec::evaluate_entity_filter_with_db(
2244 Some(&rt.inner.db),
2245 entity,
2246 filter,
2247 queue,
2248 queue,
2249 );
2250 }
2251 true
2252 })
2253 .into_iter()
2254 .filter_map(queue_message_view_from_entity)
2255 .map(|mut view| {
2256 view.available_at_ns = read_message_available_at_ns(store, queue, view.id);
2257 view
2258 })
2259 .collect())
2260}
2261
2262fn queue_message_view_from_entity(entity: UnifiedEntity) -> Option<QueueMessageView> {
2263 let (position, _) = match &entity.kind {
2264 EntityKind::QueueMessage { position, queue } => (*position, queue),
2265 _ => return None,
2266 };
2267 let data = match entity.data {
2268 EntityData::QueueMessage(data) => data,
2269 _ => return None,
2270 };
2271 Some(QueueMessageView {
2272 id: entity.id,
2273 position,
2274 priority: data.priority.unwrap_or(0),
2275 payload: data.payload,
2276 attempts: data.attempts,
2277 max_attempts: data.max_attempts,
2278 enqueued_at_ns: data.enqueued_at_ns,
2279 available_at_ns: None,
2280 })
2281}
2282
2283pub(super) fn insert_moved_queue_message_payload(
2290 store: &UnifiedStore,
2291 queue: &str,
2292 payload: &Value,
2293) -> RedDBResult<EntityId> {
2294 let config = load_queue_config(store, queue);
2295 let position = next_queue_position(store, queue, QueueSide::Right)?;
2296 let enqueued_at_ns = std::time::SystemTime::now()
2297 .duration_since(std::time::UNIX_EPOCH)
2298 .map(|d| d.as_nanos() as u64)
2299 .unwrap_or(0);
2300 let entity = UnifiedEntity::new(
2301 EntityId::new(0),
2302 EntityKind::QueueMessage {
2303 queue: queue.to_string(),
2304 position,
2305 },
2306 EntityData::QueueMessage(QueueMessageData {
2307 payload: payload.clone(),
2308 priority: None,
2309 enqueued_at_ns,
2310 attempts: 0,
2311 max_attempts: config.max_attempts,
2312 acked: false,
2313 }),
2314 );
2315 let id = store
2316 .insert_auto(queue, entity)
2317 .map_err(|err| RedDBError::Internal(err.to_string()))?;
2318 if let Some(ttl_ms) = config.ttl_ms {
2319 store
2320 .set_metadata(queue, id, queue_message_ttl_metadata(ttl_ms))
2321 .map_err(|err| RedDBError::Internal(err.to_string()))?;
2322 }
2323 Ok(id)
2324}
2325
2326fn insert_moved_queue_message(
2327 store: &UnifiedStore,
2328 queue: &str,
2329 config: &QueueRuntimeConfig,
2330 message: &QueueMessageView,
2331) -> RedDBResult<EntityId> {
2332 let position = next_queue_position(store, queue, QueueSide::Right)?;
2333 let entity = UnifiedEntity::new(
2334 EntityId::new(0),
2335 EntityKind::QueueMessage {
2336 queue: queue.to_string(),
2337 position,
2338 },
2339 EntityData::QueueMessage(QueueMessageData {
2340 payload: message.payload.clone(),
2341 priority: if config.priority {
2342 Some(message.priority)
2343 } else {
2344 None
2345 },
2346 enqueued_at_ns: message.enqueued_at_ns,
2347 attempts: message.attempts,
2348 max_attempts: message.max_attempts,
2349 acked: false,
2350 }),
2351 );
2352 let id = store
2353 .insert_auto(queue, entity)
2354 .map_err(|err| RedDBError::Internal(err.to_string()))?;
2355 if let Some(ttl_ms) = config.ttl_ms {
2356 store
2357 .set_metadata(queue, id, queue_message_ttl_metadata(ttl_ms))
2358 .map_err(|err| RedDBError::Internal(err.to_string()))?;
2359 }
2360 Ok(id)
2361}
2362
/// Returns the column names projected for a queue message when the caller
/// does not name any, in their fixed output order.
fn queue_projection_default_columns() -> Vec<String> {
    const DEFAULT_COLUMNS: [&str; 9] = [
        "id",
        "payload",
        "priority",
        "attempts",
        "last_error",
        "enqueued_at",
        "available_at",
        "dlq",
        "tenant",
    ];
    DEFAULT_COLUMNS
        .iter()
        .map(|name| (*name).to_owned())
        .collect()
}
2379
2380fn queue_projection_record(
2381 columns: &[String],
2382 message: &QueueMessageView,
2383 dlq: bool,
2384) -> RedDBResult<UnifiedRecord> {
2385 let mut record = UnifiedRecord::new();
2386 for column in columns {
2387 let value = queue_projection_value(message, dlq, column).ok_or_else(|| {
2388 RedDBError::Query(format!("unknown queue projection column '{}'", column))
2389 })?;
2390 record.set(column, value);
2391 }
2392 Ok(record)
2393}
2394
2395fn queue_projection_value(message: &QueueMessageView, dlq: bool, column: &str) -> Option<Value> {
2396 match column {
2397 "id" => Some(Value::text(message_id_string(message.id))),
2398 "payload" => Some(message.payload.clone()),
2399 "priority" => Some(Value::Integer(i64::from(message.priority))),
2400 "attempts" => Some(Value::UnsignedInteger(u64::from(message.attempts))),
2401 "last_error" => Some(Value::Null),
2402 "enqueued_at" => Some(Value::UnsignedInteger(message.enqueued_at_ns)),
2403 "available_at" => Some(Value::UnsignedInteger(
2404 message.available_at_ns.unwrap_or(message.enqueued_at_ns),
2405 )),
2406 "dlq" => Some(Value::Boolean(dlq)),
2407 "tenant" => queue_message_tenant(&message.payload).or(Some(Value::Null)),
2408 _ => None,
2409 }
2410}
2411
2412fn queue_message_tenant(payload: &Value) -> Option<Value> {
2413 let Value::Json(bytes) = payload else {
2414 return None;
2415 };
2416 let json: crate::json::Value = crate::json::from_slice(bytes).ok()?;
2417 json.get("tenant")
2418 .and_then(crate::json::Value::as_str)
2419 .map(|tenant| Value::text(tenant.to_string()))
2420}
2421
2422fn queue_is_dead_letter_target(store: &UnifiedStore, queue: &str) -> bool {
2423 let Some(manager) = store.get_collection(QUEUE_META_COLLECTION) else {
2424 return false;
2425 };
2426 !manager
2427 .query_all(|entity| {
2428 entity.data.as_row().is_some_and(|row| {
2429 row_text(row, "kind").as_deref() == Some("queue_config")
2430 && row_text(row, "dlq").as_deref() == Some(queue)
2431 })
2432 })
2433 .is_empty()
2434}
2435
2436fn queue_message_matches_filter(message: &QueueMessageView, dlq: bool, filter: &Filter) -> bool {
2437 match filter {
2438 Filter::Compare { field, op, value } => queue_filter_field_value(message, dlq, field)
2439 .is_some_and(|candidate| queue_compare_values(&candidate, value, *op)),
2440 Filter::CompareFields { left, op, right } => {
2441 match (
2442 queue_filter_field_value(message, dlq, left),
2443 queue_filter_field_value(message, dlq, right),
2444 ) {
2445 (Some(left), Some(right)) => queue_compare_values(&left, &right, *op),
2446 _ => false,
2447 }
2448 }
2449 Filter::And(left, right) => {
2450 queue_message_matches_filter(message, dlq, left)
2451 && queue_message_matches_filter(message, dlq, right)
2452 }
2453 Filter::Or(left, right) => {
2454 queue_message_matches_filter(message, dlq, left)
2455 || queue_message_matches_filter(message, dlq, right)
2456 }
2457 Filter::Not(inner) => !queue_message_matches_filter(message, dlq, inner),
2458 Filter::IsNull(field) => queue_filter_field_value(message, dlq, field)
2459 .is_none_or(|value| matches!(value, Value::Null)),
2460 Filter::IsNotNull(field) => queue_filter_field_value(message, dlq, field)
2461 .is_some_and(|value| !matches!(value, Value::Null)),
2462 Filter::In { field, values } => {
2463 queue_filter_field_value(message, dlq, field).is_some_and(|candidate| {
2464 values
2465 .iter()
2466 .any(|value| queue_values_equal(&candidate, value))
2467 })
2468 }
2469 Filter::Between { field, low, high } => queue_filter_field_value(message, dlq, field)
2470 .is_some_and(|candidate| {
2471 queue_compare_values(&candidate, low, CompareOp::Ge)
2472 && queue_compare_values(&candidate, high, CompareOp::Le)
2473 }),
2474 Filter::Like { field, pattern } => queue_filter_text(message, dlq, field)
2475 .is_some_and(|value| queue_like_matches(&value, pattern)),
2476 Filter::StartsWith { field, prefix } => {
2477 queue_filter_text(message, dlq, field).is_some_and(|value| value.starts_with(prefix))
2478 }
2479 Filter::EndsWith { field, suffix } => {
2480 queue_filter_text(message, dlq, field).is_some_and(|value| value.ends_with(suffix))
2481 }
2482 Filter::Contains { field, substring } => {
2483 queue_filter_text(message, dlq, field).is_some_and(|value| value.contains(substring))
2484 }
2485 Filter::CompareExpr { .. } => false,
2486 }
2487}
2488
2489fn queue_filter_field_value(
2490 message: &QueueMessageView,
2491 dlq: bool,
2492 field: &FieldRef,
2493) -> Option<Value> {
2494 match field {
2495 FieldRef::TableColumn { table, column } if table.is_empty() => {
2496 queue_projection_value(message, dlq, column)
2497 .or_else(|| queue_payload_field_value(&message.payload, column))
2498 }
2499 FieldRef::TableColumn { column, .. } => queue_projection_value(message, dlq, column)
2500 .or_else(|| queue_payload_field_value(&message.payload, column)),
2501 _ => None,
2502 }
2503}
2504
2505fn queue_payload_field_value(payload: &Value, field: &str) -> Option<Value> {
2506 let Value::Json(bytes) = payload else {
2507 return None;
2508 };
2509 let json: crate::json::Value = crate::json::from_slice(bytes).ok()?;
2510 let value = json.get(field)?;
2511 json_value_to_schema_value(value)
2512}
2513
2514fn json_value_to_schema_value(value: &crate::json::Value) -> Option<Value> {
2515 if matches!(value, crate::json::Value::Null) {
2516 Some(Value::Null)
2517 } else if let Some(value) = value.as_bool() {
2518 Some(Value::Boolean(value))
2519 } else if let Some(value) = value.as_i64() {
2520 Some(Value::Integer(value))
2521 } else if let Some(value) = value.as_u64() {
2522 Some(Value::UnsignedInteger(value))
2523 } else if let Some(value) = value.as_f64() {
2524 Some(Value::Float(value))
2525 } else if let Some(value) = value.as_str() {
2526 Some(Value::text(value.to_string()))
2527 } else {
2528 Some(Value::Json(value.to_string_compact().into_bytes()))
2529 }
2530}
2531
2532fn queue_filter_text(message: &QueueMessageView, dlq: bool, field: &FieldRef) -> Option<String> {
2533 queue_filter_field_value(message, dlq, field).and_then(|value| match value {
2534 Value::Text(value) => Some(value.to_string()),
2535 Value::NodeRef(value) | Value::EdgeRef(value) | Value::TableRef(value) => Some(value),
2536 Value::Integer(value) => Some(value.to_string()),
2537 Value::UnsignedInteger(value) => Some(value.to_string()),
2538 Value::Float(value) => Some(value.to_string()),
2539 Value::Boolean(value) => Some(value.to_string()),
2540 _ => None,
2541 })
2542}
2543
2544fn queue_compare_values(left: &Value, right: &Value, op: CompareOp) -> bool {
2545 match op {
2546 CompareOp::Eq => queue_values_equal(left, right),
2547 CompareOp::Ne => !queue_values_equal(left, right),
2548 CompareOp::Lt => queue_partial_cmp(left, right).is_some_and(|ord| ord.is_lt()),
2549 CompareOp::Le => queue_partial_cmp(left, right).is_some_and(|ord| !ord.is_gt()),
2550 CompareOp::Gt => queue_partial_cmp(left, right).is_some_and(|ord| ord.is_gt()),
2551 CompareOp::Ge => queue_partial_cmp(left, right).is_some_and(|ord| !ord.is_lt()),
2552 }
2553}
2554
2555fn queue_values_equal(left: &Value, right: &Value) -> bool {
2556 if let (Some(left), Some(right)) = (queue_value_number(left), queue_value_number(right)) {
2557 return (left - right).abs() < f64::EPSILON;
2558 }
2559 match (left, right) {
2560 (Value::Text(left), Value::Text(right)) => left == right,
2561 (Value::Boolean(left), Value::Boolean(right)) => left == right,
2562 _ => left == right,
2563 }
2564}
2565
2566fn queue_partial_cmp(left: &Value, right: &Value) -> Option<std::cmp::Ordering> {
2567 if let (Some(left), Some(right)) = (queue_value_number(left), queue_value_number(right)) {
2568 return left.partial_cmp(&right);
2569 }
2570 match (left, right) {
2571 (Value::Text(left), Value::Text(right)) => Some(left.cmp(right)),
2572 _ => None,
2573 }
2574}
2575
2576fn queue_value_number(value: &Value) -> Option<f64> {
2577 match value {
2578 Value::Integer(value) => Some(*value as f64),
2579 Value::UnsignedInteger(value) => Some(*value as f64),
2580 Value::Float(value) => Some(*value),
2581 Value::Text(value) => value.parse().ok(),
2582 _ => None,
2583 }
2584}
2585
/// Matches `value` against a SQL-`LIKE`-style `pattern` in which `%` stands for
/// any run of characters (including none).
///
/// `%` is honoured anywhere in the pattern — leading, trailing, or in the
/// middle (`order%done`) — and may appear any number of times. A pattern
/// without `%` must equal `value` exactly. Every other character, including
/// `_`, is matched literally. Matching is case-sensitive.
fn queue_like_matches(value: &str, pattern: &str) -> bool {
    // Splitting on '%' yields the literal segments that must appear in order.
    let mut segments = pattern.split('%');
    let head = segments.next().unwrap_or_default();
    let Some(tail) = segments.next_back() else {
        // No wildcard at all: plain equality.
        return value == head;
    };

    // Anchor the first and last segments, peeling them off so that the prefix
    // and suffix can never overlap inside `value`.
    let Some(rest) = value.strip_prefix(head) else {
        return false;
    };
    let Some(mut rest) = rest.strip_suffix(tail) else {
        return false;
    };

    // Remaining segments only need to occur left-to-right in what is left;
    // taking the left-most occurrence each time leaves the most room for the
    // segments that follow.
    for segment in segments {
        let Some(at) = rest.find(segment) else {
            return false;
        };
        rest = &rest[at + segment.len()..];
    }
    true
}
2600
2601pub(super) fn queue_message_view_by_id(
2602 store: &UnifiedStore,
2603 queue: &str,
2604 message_id: EntityId,
2605) -> RedDBResult<Option<QueueMessageView>> {
2606 let manager = queue_manager(store, queue)?;
2607 Ok(manager
2608 .get(message_id)
2609 .and_then(queue_message_view_from_entity)
2610 .map(|mut view| {
2611 view.available_at_ns = read_message_available_at_ns(store, queue, view.id);
2612 view
2613 }))
2614}
2615
2616pub(super) fn sort_queue_messages(
2617 messages: &mut [QueueMessageView],
2618 config: &QueueRuntimeConfig,
2619 side: QueueSide,
2620) {
2621 messages.sort_by(|left, right| {
2622 if config.priority {
2623 right
2624 .priority
2625 .cmp(&left.priority)
2626 .then_with(|| match side {
2627 QueueSide::Left => left.position.cmp(&right.position),
2628 QueueSide::Right => right.position.cmp(&left.position),
2629 })
2630 .then_with(|| left.id.raw().cmp(&right.id.raw()))
2631 } else {
2632 match side {
2633 QueueSide::Left => left.position.cmp(&right.position),
2634 QueueSide::Right => right.position.cmp(&left.position),
2635 }
2636 .then_with(|| left.id.raw().cmp(&right.id.raw()))
2637 }
2638 });
2639}
2640
2641pub(super) fn next_queue_position(
2642 store: &UnifiedStore,
2643 queue: &str,
2644 side: QueueSide,
2645) -> RedDBResult<u64> {
2646 let messages = load_queue_message_views(store, queue)?;
2647 if messages.is_empty() {
2648 return Ok(QUEUE_POSITION_CENTER);
2649 }
2650 match side {
2651 QueueSide::Left => Ok(messages
2652 .iter()
2653 .map(|message| message.position)
2654 .min()
2655 .unwrap_or(QUEUE_POSITION_CENTER)
2656 .saturating_sub(1)),
2657 QueueSide::Right => Ok(messages
2658 .iter()
2659 .map(|message| message.position)
2660 .max()
2661 .unwrap_or(QUEUE_POSITION_CENTER)
2662 .saturating_add(1)),
2663 }
2664}
2665
2666pub(super) fn increment_queue_attempts(
2667 store: &UnifiedStore,
2668 queue: &str,
2669 message_id: EntityId,
2670) -> RedDBResult<u32> {
2671 let manager = queue_manager(store, queue)?;
2672 let mut entity = manager
2673 .get(message_id)
2674 .ok_or_else(|| RedDBError::NotFound(format!("message '{}' not found", message_id.raw())))?;
2675 match &mut entity.data {
2676 EntityData::QueueMessage(message) => {
2677 message.attempts = message.attempts.saturating_add(1);
2678 let attempts = message.attempts;
2679 manager
2680 .update(entity)
2681 .map_err(|err| RedDBError::Internal(err.to_string()))?;
2682 Ok(attempts)
2683 }
2684 _ => Err(RedDBError::Query(format!(
2685 "entity '{}' is not a queue message",
2686 message_id.raw()
2687 ))),
2688 }
2689}
2690
2691pub(super) fn queue_message_attempts(
2692 store: &UnifiedStore,
2693 queue: &str,
2694 message_id: EntityId,
2695) -> RedDBResult<u32> {
2696 Ok(queue_message_data(store, queue, message_id)?.attempts)
2697}
2698
2699pub(super) fn queue_message_max_attempts(
2700 store: &UnifiedStore,
2701 queue: &str,
2702 message_id: EntityId,
2703) -> RedDBResult<u32> {
2704 Ok(queue_message_data(store, queue, message_id)?.max_attempts)
2705}
2706
2707pub(super) fn queue_message_payload(
2708 store: &UnifiedStore,
2709 queue: &str,
2710 message_id: EntityId,
2711) -> RedDBResult<Value> {
2712 Ok(queue_message_data(store, queue, message_id)?.payload)
2713}
2714
2715pub(super) fn queue_message_pending_any(
2716 store: &UnifiedStore,
2717 queue: &str,
2718 message_id: EntityId,
2719) -> RedDBResult<bool> {
2720 Ok(!load_pending_entries(store, queue, None, Some(message_id))?.is_empty())
2721}
2722
2723pub(super) fn queue_message_pending_for_group(
2724 store: &UnifiedStore,
2725 queue: &str,
2726 group: &str,
2727 message_id: EntityId,
2728) -> RedDBResult<bool> {
2729 Ok(!load_pending_entries(store, queue, Some(group), Some(message_id))?.is_empty())
2730}
2731
2732pub(super) fn queue_message_acked_for_group(
2733 store: &UnifiedStore,
2734 queue: &str,
2735 group: &str,
2736 message_id: EntityId,
2737) -> RedDBResult<bool> {
2738 Ok(!load_ack_entries(store, queue, Some(group), Some(message_id))?.is_empty())
2739}
2740
2741fn queue_manager(
2742 store: &UnifiedStore,
2743 queue: &str,
2744) -> RedDBResult<Arc<crate::storage::unified::SegmentManager>> {
2745 store
2746 .get_collection(queue)
2747 .ok_or_else(|| RedDBError::NotFound(format!("queue '{}' not found", queue)))
2748}
2749
2750pub(super) fn queue_message_data(
2751 store: &UnifiedStore,
2752 queue: &str,
2753 message_id: EntityId,
2754) -> RedDBResult<QueueMessageData> {
2755 let manager = queue_manager(store, queue)?;
2756 let entity = manager
2757 .get(message_id)
2758 .ok_or_else(|| RedDBError::NotFound(format!("message '{}' not found", message_id.raw())))?;
2759 match entity.data {
2760 EntityData::QueueMessage(message) => Ok(message),
2761 _ => Err(RedDBError::Query(format!(
2762 "entity '{}' is not a queue message",
2763 message_id.raw()
2764 ))),
2765 }
2766}
2767
2768fn insert_meta_row(store: &UnifiedStore, fields: HashMap<String, Value>) -> RedDBResult<()> {
2769 let _ = store.get_or_create_collection(QUEUE_META_COLLECTION);
2770 store
2771 .insert_auto(
2772 QUEUE_META_COLLECTION,
2773 UnifiedEntity::new(
2774 EntityId::new(0),
2775 EntityKind::TableRow {
2776 table: Arc::from(QUEUE_META_COLLECTION),
2777 row_id: 0,
2778 },
2779 EntityData::Row(RowData {
2780 columns: Vec::new(),
2781 named: Some(fields),
2782 schema: None,
2783 }),
2784 ),
2785 )
2786 .map_err(|err| RedDBError::Internal(err.to_string()))?;
2787 Ok(())
2788}
2789
2790pub(super) fn remove_meta_rows(store: &UnifiedStore, predicate: impl Fn(&RowData) -> bool + Sync) {
2791 let Some(manager) = store.get_collection(QUEUE_META_COLLECTION) else {
2792 return;
2793 };
2794 let rows = manager.query_all(|entity| entity.data.as_row().is_some_and(&predicate));
2795 for row in rows {
2796 let _ = store.delete(QUEUE_META_COLLECTION, row.id);
2797 }
2798}
2799
2800pub(super) fn delete_meta_entity(store: &UnifiedStore, entity_id: EntityId) {
2801 let _ = store.delete(QUEUE_META_COLLECTION, entity_id);
2802}
2803
2804fn queue_message_lock_key(queue: &str, message_id: EntityId) -> String {
2805 format!("{queue}:{}", message_id.raw())
2806}
2807
2808pub(super) fn queue_message_lock_handle(
2809 runtime: &RedDBRuntime,
2810 queue: &str,
2811 message_id: EntityId,
2812) -> Arc<parking_lot::Mutex<()>> {
2813 let key = queue_message_lock_key(queue, message_id);
2814 if let Some(lock) = runtime.inner.queue_message_locks.read().get(&key).cloned() {
2815 return lock;
2816 }
2817
2818 let mut locks = runtime.inner.queue_message_locks.write();
2819 locks
2820 .entry(key)
2821 .or_insert_with(|| Arc::new(parking_lot::Mutex::new(())))
2822 .clone()
2823}
2824
2825pub(super) fn forget_queue_message_lock(runtime: &RedDBRuntime, queue: &str, message_id: EntityId) {
2826 runtime
2827 .inner
2828 .queue_message_locks
2829 .write()
2830 .remove(&queue_message_lock_key(queue, message_id));
2831}
2832
2833fn parse_message_id(value: &str) -> RedDBResult<EntityId> {
2834 let raw = value.strip_prefix('e').unwrap_or(value);
2835 raw.parse::<u64>()
2836 .map(EntityId::new)
2837 .map_err(|_| RedDBError::Query(format!("invalid message id '{}'", value)))
2838}
2839
2840pub(super) fn resolve_ack_nack_handle(
2846 store: &UnifiedStore,
2847 queue: &str,
2848 group_hint: &str,
2849 message_id_hint: &str,
2850 delivery_id: Option<&str>,
2851) -> RedDBResult<(String, EntityId)> {
2852 if let Some(did) = delivery_id {
2853 return resolve_delivery_id(store, queue, did);
2854 }
2855 if group_hint.is_empty() || message_id_hint.is_empty() {
2856 return Err(RedDBError::Query(
2857 "ACK/NACK requires either GROUP <group> '<message_id>' or WITH delivery_id = '<id>'"
2858 .to_string(),
2859 ));
2860 }
2861 log_tuple_deprecation(queue);
2862 let entity = parse_message_id(message_id_hint)?;
2863 Ok((group_hint.to_string(), entity))
2864}
2865
2866fn resolve_delivery_id(
2867 store: &UnifiedStore,
2868 queue: &str,
2869 delivery_id: &str,
2870) -> RedDBResult<(String, EntityId)> {
2871 let Some(manager) = store.get_collection(QUEUE_META_COLLECTION) else {
2872 return Err(RedDBError::Query(format!(
2873 "delivery_id '{}' does not resolve to a live pending delivery",
2874 delivery_id
2875 )));
2876 };
2877 for entity in manager.query_all(|entity| {
2878 entity.data.as_row().is_some_and(|row| {
2879 row_text(row, "kind").as_deref() == Some("queue_pending_lc")
2880 && row_text(row, "delivery_id").as_deref() == Some(delivery_id)
2881 })
2882 }) {
2883 if let Some(row) = entity.data.as_row() {
2884 let row_queue = row_text(row, "queue").unwrap_or_default();
2885 let row_group = row_text(row, "group").unwrap_or_default();
2886 let row_message = row_u64(row, "message_id").unwrap_or(0);
2887 if row_queue != queue {
2888 return Err(RedDBError::Query(format!(
2889 "delivery_id '{}' belongs to queue '{}', not '{}'",
2890 delivery_id, row_queue, queue
2891 )));
2892 }
2893 return Ok((row_group, EntityId::new(row_message)));
2894 }
2895 }
2896 Err(RedDBError::Query(format!(
2897 "delivery_id '{}' does not resolve to a live pending delivery",
2898 delivery_id
2899 )))
2900}
2901
2902fn log_tuple_deprecation(queue: &str) {
2905 use std::sync::atomic::Ordering;
2906 use std::sync::{Mutex, OnceLock};
2907 use std::time::Instant;
2908
2909 static LAST_EMIT: OnceLock<Mutex<HashMap<(u64, String), Instant>>> = OnceLock::new();
2910 const COOLDOWN: std::time::Duration = std::time::Duration::from_secs(60);
2911
2912 let map = LAST_EMIT.get_or_init(|| Mutex::new(HashMap::new()));
2913 let key = (super::impl_core::current_connection_id(), queue.to_string());
2914 let now = Instant::now();
2915 let mut guard = match map.lock() {
2916 Ok(g) => g,
2917 Err(_) => return,
2918 };
2919 let should_emit =
2920 !matches!(guard.get(&key), Some(prev) if now.duration_since(*prev) < COOLDOWN);
2921 if should_emit {
2922 guard.insert(key.clone(), now);
2923 drop(guard);
2924 TUPLE_DEPRECATION_EMITS.fetch_add(1, Ordering::Relaxed);
2925 tracing::warn!(
2926 target: "reddb::queue_lifecycle",
2927 queue = queue,
2928 connection_id = key.0,
2929 "ACK/NACK by (queue, group, message_id) tuple is deprecated; \
2930 switch to the server-issued delivery_id (ADR 0026). \
2931 The tuple path will be removed one minor release after introduction.",
2932 );
2933 }
2934}
2935
/// Number of times the tuple-form ACK/NACK deprecation warning has been
/// emitted by `log_tuple_deprecation` since process start.
pub static TUPLE_DEPRECATION_EMITS: AtomicU64 = AtomicU64::new(0);
2943
2944fn message_id_string(message_id: EntityId) -> String {
2945 message_id.raw().to_string()
2946}
2947
2948fn delivered_message_json(
2954 message: crate::runtime::queue_lifecycle::DeliveredMessage,
2955) -> crate::serde_json::Value {
2956 use crate::serde_json::{Map, Value as JsonValue};
2957 let mut obj = Map::new();
2958 obj.insert(
2959 "message_id".to_string(),
2960 JsonValue::String(message_id_string(EntityId::new(message.message_id))),
2961 );
2962 obj.insert(
2963 "payload".to_string(),
2964 crate::presentation::entity_json::storage_value_to_json(&message.payload),
2965 );
2966 obj.insert("consumer".to_string(), JsonValue::String(message.consumer));
2967 obj.insert(
2968 "delivery_count".to_string(),
2969 JsonValue::Number(message.delivery_count as f64),
2970 );
2971 JsonValue::Object(obj)
2972}
2973
2974pub(crate) fn pending_counts_by_group(
2979 store: &UnifiedStore,
2980) -> std::collections::BTreeMap<(String, String), u64> {
2981 let mut counts: std::collections::BTreeMap<(String, String), u64> =
2982 std::collections::BTreeMap::new();
2983 let Some(manager) = store.get_collection(QUEUE_META_COLLECTION) else {
2984 return counts;
2985 };
2986 for entity in manager.query_all(|entity| {
2987 entity
2988 .data
2989 .as_row()
2990 .is_some_and(|row| row_text(row, "kind").as_deref() == Some("queue_pending"))
2991 }) {
2992 if let Some(row) = entity.data.as_row() {
2993 let queue = row_text(row, "queue");
2994 let group = row_text(row, "group");
2995 if let (Some(q), Some(g)) = (queue, group) {
2996 *counts.entry((q, g)).or_insert(0) += 1;
2997 }
2998 }
2999 }
3000 counts
3001}
3002
3003pub(super) fn row_text(row: &RowData, field: &str) -> Option<String> {
3004 match row.get_field(field)?.clone() {
3005 Value::Text(value) => Some(value.to_string()),
3006 Value::NodeRef(value) => Some(value),
3007 Value::EdgeRef(value) => Some(value),
3008 Value::TableRef(value) => Some(value),
3009 _ => None,
3010 }
3011}
3012
3013pub(super) fn row_u64(row: &RowData, field: &str) -> Option<u64> {
3014 match row.get_field(field)?.clone() {
3015 Value::UnsignedInteger(value) => Some(value),
3016 Value::Integer(value) if value >= 0 => Some(value as u64),
3017 Value::Float(value) if value >= 0.0 => Some(value as u64),
3018 Value::Text(value) => value.parse().ok(),
3019 _ => None,
3020 }
3021}
3022
3023fn row_bool(row: &RowData, field: &str) -> Option<bool> {
3024 match row.get_field(field)?.clone() {
3025 Value::Boolean(value) => Some(value),
3026 Value::Text(value) => match value.to_ascii_lowercase().as_str() {
3027 "true" => Some(true),
3028 "false" => Some(false),
3029 _ => None,
3030 },
3031 _ => None,
3032 }
3033}
3034
3035fn queue_collection_contract(
3036 name: &str,
3037 priority: bool,
3038 ttl_ms: Option<u64>,
3039) -> crate::physical::CollectionContract {
3040 let now = current_unix_ms();
3041 let mut context_index_fields = Vec::new();
3042 if priority {
3043 context_index_fields.push("priority".to_string());
3044 }
3045
3046 crate::physical::CollectionContract {
3047 name: name.to_string(),
3048 declared_model: crate::catalog::CollectionModel::Queue,
3049 schema_mode: crate::catalog::SchemaMode::Dynamic,
3050 origin: crate::physical::ContractOrigin::Explicit,
3051 version: 1,
3052 created_at_unix_ms: now,
3053 updated_at_unix_ms: now,
3054 default_ttl_ms: ttl_ms,
3055 vector_dimension: None,
3056 vector_metric: None,
3057 context_index_fields,
3058 declared_columns: Vec::new(),
3059 table_def: None,
3060 timestamps_enabled: false,
3061 context_index_enabled: false,
3062 metrics_raw_retention_ms: None,
3063 metrics_rollup_policies: Vec::new(),
3064 metrics_tenant_identity: None,
3065 metrics_namespace: None,
3066 append_only: true,
3070 subscriptions: Vec::new(),
3071 analytics_config: Vec::new(),
3072 session_key: None,
3073 session_gap_ms: None,
3074 retention_duration_ms: None,
3075 analytical_storage: None,
3076
3077 ai_policy: None,
3078 }
3079}
3080
/// Returns the current wall-clock time as milliseconds since the Unix epoch,
/// or `0` if the system clock reports a time before the epoch.
fn current_unix_ms() -> u128 {
    use std::time::{SystemTime, UNIX_EPOCH};

    match SystemTime::now().duration_since(UNIX_EPOCH) {
        Ok(since_epoch) => since_epoch.as_millis(),
        Err(_) => 0,
    }
}
3087
3088pub(super) fn now_ns() -> u64 {
3089 std::time::SystemTime::now()
3090 .duration_since(std::time::UNIX_EPOCH)
3091 .unwrap_or_default()
3092 .as_nanos() as u64
3093}
3094
3095pub(super) fn queue_message_ttl_metadata(ttl_ms: u64) -> Metadata {
3096 queue_message_metadata(Some(ttl_ms), None)
3097}
3098
3099pub(super) fn queue_message_metadata(
3105 ttl_ms: Option<u64>,
3106 available_at_ns: Option<u64>,
3107) -> Metadata {
3108 let mut fields = HashMap::new();
3109 if let Some(ttl_ms) = ttl_ms {
3110 fields.insert(
3111 "_ttl_ms".to_string(),
3112 if ttl_ms <= i64::MAX as u64 {
3113 MetadataValue::Int(ttl_ms as i64)
3114 } else {
3115 MetadataValue::Timestamp(ttl_ms)
3116 },
3117 );
3118 }
3119 if let Some(at_ns) = available_at_ns {
3120 fields.insert(
3121 "_available_at_ns".to_string(),
3122 MetadataValue::Timestamp(at_ns),
3123 );
3124 }
3125 Metadata::with_fields(fields)
3126}
3127
3128pub(super) fn earliest_future_available_at(store: &UnifiedStore, queue: &str) -> Option<u64> {
3136 let now_ns = now_ns();
3137 let views = load_queue_message_views_with_runtime(None, store, queue).ok()?;
3138 views
3139 .iter()
3140 .filter_map(|v| v.available_at_ns)
3141 .filter(|at| *at > now_ns)
3142 .min()
3143}
3144
3145pub(super) fn set_message_available_at_ns(
3156 store: &UnifiedStore,
3157 queue: &str,
3158 message_id: EntityId,
3159 available_at_ns: Option<u64>,
3160 fallback_ttl_ms: Option<u64>,
3161) -> RedDBResult<()> {
3162 let existing_ttl_ms = store
3163 .get_metadata(queue, message_id)
3164 .and_then(|md| match md.get("_ttl_ms")? {
3165 MetadataValue::Int(i) if *i >= 0 => Some(*i as u64),
3166 MetadataValue::Timestamp(t) => Some(*t),
3167 _ => None,
3168 })
3169 .or(fallback_ttl_ms);
3170 let metadata = queue_message_metadata(existing_ttl_ms, available_at_ns);
3171 match store.set_metadata(queue, message_id, metadata) {
3172 Ok(()) => Ok(()),
3173 Err(crate::storage::StoreError::CollectionNotFound(_)) => Ok(()),
3174 Err(err) => Err(RedDBError::Internal(err.to_string())),
3175 }
3176}
3177
3178pub(super) fn read_message_available_at_ns(
3182 store: &UnifiedStore,
3183 queue: &str,
3184 message_id: EntityId,
3185) -> Option<u64> {
3186 let md = store.get_metadata(queue, message_id)?;
3187 match md.get("_available_at_ns")? {
3188 MetadataValue::Timestamp(t) => Some(*t),
3189 MetadataValue::Int(i) if *i >= 0 => Some(*i as u64),
3190 _ => None,
3191 }
3192}
3193
3194fn estimate_payload_bytes(payload: &Value) -> u64 {
3196 match payload {
3197 Value::Json(v) => v.len() as u64,
3198 Value::Text(s) => s.len() as u64,
3199 _ => 64,
3200 }
3201}
3202
#[cfg(test)]
mod presence_integration_tests {
    use super::*;
    use crate::storage::queue::presence::{PresenceState, DEFAULT_PRESENCE_TTL_MS};
    use crate::{RedDBOptions, RedDBRuntime};

    /// Creates an in-memory runtime and runs `statements` against it in order,
    /// panicking on the first failure.
    fn runtime_after(statements: &[&str]) -> RedDBRuntime {
        let runtime = RedDBRuntime::with_options(RedDBOptions::in_memory()).unwrap();
        for &statement in statements {
            runtime.execute_query(statement).unwrap();
        }
        runtime
    }

    /// A `QUEUE READ` that delivers a message records exactly one active
    /// presence row for the reading consumer.
    #[test]
    fn queue_read_emits_consumer_presence_heartbeat() {
        let runtime = runtime_after(&[
            "CREATE QUEUE tasks",
            "QUEUE GROUP CREATE tasks workers",
            "QUEUE PUSH tasks {'job':'a'}",
            "QUEUE READ tasks GROUP workers CONSUMER w1",
        ]);

        let presence = runtime.queue_consumer_presence_snapshot(DEFAULT_PRESENCE_TTL_MS);
        assert_eq!(presence.len(), 1, "exactly one heartbeat recorded");
        let heartbeat = &presence[0];
        assert_eq!(heartbeat.queue, "tasks");
        assert_eq!(heartbeat.group, "workers");
        assert_eq!(heartbeat.consumer, "w1");
        assert_eq!(heartbeat.state, PresenceState::Active);

        let active = runtime.queue_active_consumer_counts(DEFAULT_PRESENCE_TTL_MS);
        let group_key = ("tasks".to_string(), "workers".to_string());
        assert_eq!(
            active[&group_key], 1,
            "active count reflects the live consumer"
        );
    }

    /// A `QUEUE READ` against an empty queue still registers the consumer,
    /// with no leases held.
    #[test]
    fn empty_queue_read_still_heartbeats() {
        let runtime = runtime_after(&[
            "CREATE QUEUE empty_q",
            "QUEUE GROUP CREATE empty_q workers",
            "QUEUE READ empty_q GROUP workers CONSUMER w1",
        ]);

        let presence = runtime.queue_consumer_presence_snapshot(DEFAULT_PRESENCE_TTL_MS);
        assert_eq!(
            presence.len(),
            1,
            "empty read still registers consumer presence"
        );
        let heartbeat = &presence[0];
        assert_eq!(heartbeat.state, PresenceState::Active);
        assert_eq!(
            heartbeat.lease_count, 0,
            "no messages delivered → zero leases"
        );
    }
}