1use std::collections::{HashMap, HashSet};
4use std::sync::atomic::{AtomicU64, Ordering};
5use std::sync::Arc;
6
7use crate::runtime::audit_log::{AuditAuthSource, AuditEvent, AuditFieldEscaper, Outcome};
8use crate::runtime::impl_core::current_auth_identity;
9use crate::storage::queue::QueueMode;
10use crate::storage::unified::entity::{QueueMessageData, RowData};
11use crate::storage::unified::{Metadata, MetadataValue, UnifiedStore};
12
13use super::*;
14
15use super::primary_queue_store::PrimaryQueueStore;
16use super::queue_lifecycle::{QueueLifecycle, RetirementOutcome};
17use crate::storage::queue::lifecycle::{
18 QueueSide as LcQueueSide, QueueStore as _, QueueStoreError, QueueTxn,
19};
20
21pub(super) fn runtime_lifecycle(
28 runtime: &RedDBRuntime,
29 queue: &str,
30) -> (
31 QueueLifecycle<PrimaryQueueStore>,
32 PrimaryQueueStore,
33 QueueTxn,
34) {
35 let primary_for_lookup = PrimaryQueueStore::new(runtime.clone());
36 let primary_for_lifecycle = PrimaryQueueStore::new(runtime.clone());
37 let txn = primary_for_lifecycle.new_txn();
38 let cfg = primary_for_lifecycle.lifecycle_config(queue);
39 (
40 QueueLifecycle::new(primary_for_lifecycle, cfg),
41 primary_for_lookup,
42 txn,
43 )
44}
45
/// Error text returned (wrapped in `RedDBError::Query`) when a blocking
/// group read is cancelled while it is waiting for messages.
pub(crate) const QUEUE_READ_WAIT_CANCELLED: &str =
    "QUEUE READ WAIT cancelled — server shutting down";

/// Configuration key holding the server-side cap, in milliseconds, on the
/// `WAIT` duration a group read may request.
pub(crate) const QUEUE_MAX_WAIT_MS_CONFIG_KEY: &str = "red.config.queue.max_wait_ms";

/// Fallback value (60 000 ms) passed to the config lookup together with
/// [`QUEUE_MAX_WAIT_MS_CONFIG_KEY`].
pub(crate) const QUEUE_MAX_WAIT_MS_DEFAULT: u64 = 60_000;
62
63pub(super) fn queue_wait_scope() -> String {
68 crate::runtime::impl_core::current_tenant().unwrap_or_default()
69}
70
71fn ast_side_to_lc(side: crate::storage::query::ast::QueueSide) -> LcQueueSide {
75 use crate::storage::query::ast::QueueSide as Ast;
76 match side {
77 Ast::Left => LcQueueSide::Left,
78 Ast::Right => LcQueueSide::Right,
79 }
80}
81
82fn map_qse(err: QueueStoreError) -> RedDBError {
86 match err {
87 QueueStoreError::UnknownDelivery(id) => RedDBError::NotFound(format!(
88 "delivery_id '{id}' does not resolve to a live pending delivery"
89 )),
90 QueueStoreError::UnknownQueue(q) => RedDBError::NotFound(format!("queue '{q}' not found")),
91 QueueStoreError::ReplicaImmutable => {
92 RedDBError::Internal("replica QueueStore is immutable".to_string())
93 }
94 }
95}
96
/// Process-wide counter. In the code visible here it is bumped each time
/// `enqueue_event_payload` diverts a payload to the outbox dead-letter queue
/// (outbox over its byte limit, or target queue full).
/// NOTE(review): other increment sites may exist outside this part of the file.
pub static EVENTS_DRAIN_RETRIES_TOTAL: AtomicU64 = AtomicU64::new(0);

/// Process-wide counter of payloads routed to an outbox dead-letter queue;
/// incremented once per `route_event_to_outbox_dlq` call.
pub static EVENTS_DLQ_TOTAL: AtomicU64 = AtomicU64::new(0);

/// Process-wide counter of event payloads successfully enqueued, including
/// those that ended up in an outbox dead-letter queue.
pub static EVENTS_ENQUEUED_TOTAL: AtomicU64 = AtomicU64::new(0);

/// Approximate outbox size (1 GiB) above which a capacity warning is logged.
const OUTBOX_WARN_BYTES: u64 = 1 << 30;

/// Approximate outbox size (10 GiB) above which new event payloads are routed
/// to the outbox dead-letter queue instead of the target queue.
const OUTBOX_MAX_BYTES: u64 = 10 * (1 << 30);

/// Running estimate of the bytes held in the event outbox. Increased by the
/// estimated payload size on enqueue and decreased again when the payload is
/// diverted to the dead-letter queue.
/// NOTE(review): where this is decremented on drain is not visible here.
static OUTBOX_APPROX_BYTES: AtomicU64 = AtomicU64::new(0);

// NOTE(review): the four constants below are not referenced in the visible
// part of this file; the descriptions are inferred from their names only.
// Presumably the collection that stores queue metadata records.
const QUEUE_META_COLLECTION: &str = "red_queue_meta";
// Midpoint of the u64 range; presumably the starting position so a queue can
// grow on both the left and the right side.
const QUEUE_POSITION_CENTER: u64 = u64::MAX / 2;
// Presumably the implicit consumer group used for work-mode reads.
const WORK_DEFAULT_GROUP: &str = "_work_default";
// Presumably the name prefix of implicit per-consumer fanout groups.
const FANOUT_GROUP_PREFIX: &str = "_fanout_";
124
/// Per-queue runtime settings, written by `CREATE QUEUE` / `ALTER QUEUE` and
/// loaded again by the queue commands.
#[derive(Debug, Clone)]
pub(super) struct QueueRuntimeConfig {
    /// Delivery mode of the queue, as given in the DDL statement.
    pub(super) mode: QueueMode,
    /// Whether pushes may carry a priority; a push with a priority onto a
    /// queue where this is `false` is rejected.
    pub(super) priority: bool,
    /// Maximum number of messages; pushes are rejected (and event payloads are
    /// diverted to the outbox DLQ) once the queue holds this many.
    pub(super) max_size: Option<usize>,
    /// When set, TTL metadata with this value (milliseconds) is attached to
    /// each enqueued message.
    pub(super) ttl_ms: Option<u64>,
    /// Name of the dead-letter queue, if any; must differ from the queue itself.
    pub(super) dlq: Option<String>,
    /// Copied into every new message as its `max_attempts`.
    pub(super) max_attempts: u32,
    /// Taken verbatim from the DDL statement.
    /// NOTE(review): presumably how long a delivery stays locked to a
    /// consumer — its use is not visible in this part of the file.
    pub(super) lock_deadline_ms: u64,
    /// Taken verbatim from the DDL statement.
    /// NOTE(review): presumably the per-group limit on in-flight deliveries —
    /// its use is not visible in this part of the file.
    pub(super) in_flight_cap_per_group: u32,
    /// Default delay (milliseconds) applied when a NACK requeues a message and
    /// the command carries no explicit delay. `ALTER QUEUE` stores `None` when
    /// asked to set it to 0.
    pub(super) retry_delay_ms: Option<u64>,
}
141
/// A consumer-group record.
/// NOTE(review): this type is not used in the visible part of the file; the
/// field descriptions are inferred from names and should be confirmed.
#[derive(Debug, Clone)]
struct QueueGroupEntry {
    /// Presumably the id of the stored entity backing this group record.
    entity_id: EntityId,
    /// Consumer-group name.
    group: String,
}
147
/// One pending (delivered but not yet retired) delivery, as listed by the
/// `Pending` queue command.
#[derive(Debug, Clone)]
pub(super) struct QueuePendingEntry {
    /// NOTE(review): presumably the id of the stored pending record itself, as
    /// opposed to `message_id` — confirm against the loader.
    pub(super) entity_id: EntityId,
    /// Consumer group the delivery belongs to.
    group: String,
    /// Id of the delivered message.
    pub(super) message_id: EntityId,
    /// Consumer the message was delivered to.
    consumer: String,
    /// Delivery timestamp in nanoseconds; `Pending` sorts by it and derives
    /// `idle_ms` from the difference to the current time.
    pub(super) delivered_at_ns: u64,
    /// Number of deliveries recorded for this entry.
    pub(super) delivery_count: u32,
}
157
/// An acknowledgement record.
/// NOTE(review): this type is not used in the visible part of the file; the
/// field descriptions are inferred from names and should be confirmed.
#[derive(Debug, Clone)]
pub(super) struct QueueAckEntry {
    /// Presumably the id of the stored entity backing this ack record.
    entity_id: EntityId,
    /// Consumer group that acknowledged the message.
    group: String,
    /// Id of the acknowledged message.
    pub(super) message_id: EntityId,
}
164
/// Read-only view of a queue message as loaded from the store.
#[derive(Debug, Clone)]
pub(super) struct QueueMessageView {
    /// Entity id of the message.
    pub(super) id: EntityId,
    /// Position of the message within its queue.
    position: u64,
    /// Message priority.
    // NOTE(review): how a missing priority is represented here is decided by
    // the loader, which is not visible in this part of the file.
    priority: i32,
    /// Message payload.
    pub(super) payload: Value,
    /// Attempts recorded so far.
    attempts: u32,
    /// Attempt limit copied from the queue configuration at enqueue time.
    pub(super) max_attempts: u32,
    /// Enqueue timestamp in nanoseconds.
    enqueued_at_ns: u64,
    /// Earliest time (nanoseconds, same clock as `now_ns`) at which the message
    /// may be delivered; `None` means it is deliverable immediately.
    pub(super) available_at_ns: Option<u64>,
}
179
180impl QueueMessageView {
181 pub(super) fn is_available_now(&self) -> bool {
186 match self.available_at_ns {
187 Some(at) => at <= now_ns(),
188 None => true,
189 }
190 }
191}
192
193impl RedDBRuntime {
194 pub(super) fn group_read_with_optional_wait(
203 &self,
204 queue: &str,
205 group: &str,
206 consumer: &str,
207 count: usize,
208 wait_ms: Option<u64>,
209 ) -> RedDBResult<Vec<crate::runtime::queue_lifecycle::DeliveredMessage>> {
210 let do_read =
211 |runtime: &RedDBRuntime| -> RedDBResult<Vec<crate::runtime::queue_lifecycle::DeliveredMessage>> {
212 let (lifecycle, _ps, txn) = runtime_lifecycle(runtime, queue);
213 lifecycle
214 .group_read(&txn, queue, group, consumer, count)
215 .map_err(map_qse)
216 };
217
218 let delivered = do_read(self)?;
219 let Some(wait_ms) = wait_ms else {
220 return Ok(delivered);
221 };
222 if !delivered.is_empty() {
223 return Ok(delivered);
224 }
225 let registry = self.queue_wait_registry();
235 let scope = queue_wait_scope();
236 let deadline = std::time::Instant::now() + std::time::Duration::from_millis(wait_ms);
237 let telemetry = self.queue_telemetry();
238 telemetry.record_wait_started(&scope, queue);
239 let wait_start = std::time::Instant::now();
240 let observe = |outcome: crate::runtime::queue_telemetry::WaitOutcomeLabel| {
241 let elapsed_ms = wait_start.elapsed().as_millis().min(u128::from(u64::MAX)) as u64;
242 telemetry.record_wait_outcome(&scope, queue, outcome, elapsed_ms);
243 };
244 loop {
245 let snapshot = registry.snapshot(&scope, queue);
249 let delivered = do_read(self)?;
250 if !delivered.is_empty() {
251 observe(crate::runtime::queue_telemetry::WaitOutcomeLabel::Woken);
252 return Ok(delivered);
253 }
254 if registry.is_cancelled() {
255 observe(crate::runtime::queue_telemetry::WaitOutcomeLabel::Cancelled);
256 return Err(RedDBError::Query(QUEUE_READ_WAIT_CANCELLED.to_string()));
257 }
258 if std::time::Instant::now() >= deadline {
259 observe(crate::runtime::queue_telemetry::WaitOutcomeLabel::Timeout);
260 return Ok(Vec::new());
261 }
262 let park_deadline = match earliest_future_available_at(&self.inner.db.store(), queue) {
272 Some(at_ns) => {
273 let now_ns = now_ns();
274 if at_ns <= now_ns {
275 deadline.min(std::time::Instant::now())
277 } else {
278 let wait_ns = at_ns - now_ns;
279 let due_instant =
280 std::time::Instant::now() + std::time::Duration::from_nanos(wait_ns);
281 deadline.min(due_instant)
282 }
283 }
284 None => deadline,
285 };
286 match registry.wait_until(&snapshot, park_deadline) {
287 crate::runtime::queue_wait_registry::WaitOutcome::Woken => continue,
288 crate::runtime::queue_wait_registry::WaitOutcome::Timeout => {
289 if std::time::Instant::now() >= deadline {
293 observe(crate::runtime::queue_telemetry::WaitOutcomeLabel::Timeout);
294 return Ok(Vec::new());
295 }
296 continue;
297 }
298 crate::runtime::queue_wait_registry::WaitOutcome::Cancelled => {
299 observe(crate::runtime::queue_telemetry::WaitOutcomeLabel::Cancelled);
300 return Err(RedDBError::Query(QUEUE_READ_WAIT_CANCELLED.to_string()));
301 }
302 }
303 }
304 }
305
306 pub(crate) fn enqueue_event_payload(
307 &self,
308 queue: &str,
309 payload: Value,
310 ) -> RedDBResult<EntityId> {
311 let store = self.inner.db.store();
312 if store.get_collection(queue).is_none() {
314 crate::runtime::impl_ddl::ensure_event_target_queue_pub(self, queue)?;
315 }
316
317 let payload_bytes = estimate_payload_bytes(&payload);
319 let outbox_bytes = OUTBOX_APPROX_BYTES.fetch_add(payload_bytes, Ordering::Relaxed);
320
321 if outbox_bytes > OUTBOX_MAX_BYTES {
323 OUTBOX_APPROX_BYTES.fetch_sub(payload_bytes, Ordering::Relaxed);
324 EVENTS_DRAIN_RETRIES_TOTAL.fetch_add(1, Ordering::Relaxed);
325 return self.route_event_to_outbox_dlq(queue, payload, "outbox_max_bytes_exceeded");
326 }
327
328 if outbox_bytes > OUTBOX_WARN_BYTES && outbox_bytes - payload_bytes <= OUTBOX_WARN_BYTES {
330 tracing::warn!(
331 outbox_bytes,
332 warn_threshold = OUTBOX_WARN_BYTES,
333 "event outbox approaching capacity warning threshold"
334 );
335 crate::telemetry::operator_event::OperatorEvent::OutboxDlqActivated {
336 queue: queue.to_string(),
337 dlq: format!("{queue}_outbox_dlq"),
338 reason: "outbox_warn_bytes_exceeded".to_string(),
339 }
340 .emit_global();
341 }
342
343 let config = load_queue_config(store.as_ref(), queue);
344
345 if let Some(max_size) = config.max_size {
347 let current_len = load_queue_message_views(store.as_ref(), queue)
348 .unwrap_or_default()
349 .len();
350 if current_len >= max_size {
351 OUTBOX_APPROX_BYTES.fetch_sub(payload_bytes, Ordering::Relaxed);
352 EVENTS_DRAIN_RETRIES_TOTAL.fetch_add(1, Ordering::Relaxed);
353 return self.route_event_to_outbox_dlq(queue, payload, "queue_full");
354 }
355 if current_len * 10 >= max_size * 8 {
357 tracing::warn!(
358 queue = %queue,
359 size = current_len,
360 max = max_size,
361 "event target queue near capacity"
362 );
363 }
364 }
365
366 let id = self.enqueue_event_payload_raw(store.as_ref(), queue, &config, payload)?;
367 EVENTS_ENQUEUED_TOTAL.fetch_add(1, Ordering::Relaxed);
368 Ok(id)
369 }
370
371 fn route_event_to_outbox_dlq(
373 &self,
374 queue: &str,
375 payload: Value,
376 reason: &str,
377 ) -> RedDBResult<EntityId> {
378 let dlq_name = format!("{queue}_outbox_dlq");
379 EVENTS_DLQ_TOTAL.fetch_add(1, Ordering::Relaxed);
380
381 crate::telemetry::operator_event::OperatorEvent::OutboxDlqActivated {
382 queue: queue.to_string(),
383 dlq: dlq_name.clone(),
384 reason: reason.to_string(),
385 }
386 .emit_global();
387
388 let store = self.inner.db.store();
389 if store.get_collection(&dlq_name).is_none() {
390 crate::runtime::impl_ddl::ensure_event_target_queue_pub(self, &dlq_name)?;
391 }
392 let dlq_config = load_queue_config(store.as_ref(), &dlq_name);
393 let id = self.enqueue_event_payload_raw(store.as_ref(), &dlq_name, &dlq_config, payload)?;
394 EVENTS_ENQUEUED_TOTAL.fetch_add(1, Ordering::Relaxed);
395 Ok(id)
396 }
397
398 fn enqueue_event_payload_raw(
400 &self,
401 store: &UnifiedStore,
402 queue: &str,
403 config: &QueueRuntimeConfig,
404 payload: Value,
405 ) -> RedDBResult<EntityId> {
406 let position = next_queue_position(store, queue, QueueSide::Right)?;
407 let mut entity = UnifiedEntity::new(
408 EntityId::new(0),
409 EntityKind::QueueMessage {
410 queue: queue.to_string(),
411 position,
412 },
413 EntityData::QueueMessage(QueueMessageData {
414 payload,
415 priority: None,
416 enqueued_at_ns: now_ns(),
417 attempts: 0,
418 max_attempts: config.max_attempts,
419 acked: false,
420 }),
421 );
422 if let Some(xid) = self.current_xid() {
423 entity.set_xmin(xid);
424 }
425 let id = store
426 .insert_auto(queue, entity)
427 .map_err(|err| RedDBError::Internal(err.to_string()))?;
428 if let Some(ttl_ms) = config.ttl_ms {
429 store
430 .set_metadata(queue, id, queue_message_ttl_metadata(ttl_ms))
431 .map_err(|err| RedDBError::Internal(err.to_string()))?;
432 }
433 self.invalidate_result_cache_for_table(queue);
434 Ok(id)
435 }
436
437 pub fn execute_create_queue(
438 &self,
439 raw_query: &str,
440 query: &CreateQueueQuery,
441 ) -> RedDBResult<RuntimeQueryResult> {
442 self.check_write(crate::runtime::write_gate::WriteKind::Ddl)?;
443 if query.dlq.as_deref() == Some(query.name.as_str()) {
444 return Err(RedDBError::Query(
445 "dead-letter queue must be different from the source queue".to_string(),
446 ));
447 }
448
449 let store = self.inner.db.store();
450 let exists = store.get_collection(&query.name).is_some();
451 if exists {
452 if query.if_not_exists {
453 return Ok(RuntimeQueryResult::ok_message(
454 raw_query.to_string(),
455 &format!("queue '{}' already exists", query.name),
456 "create",
457 ));
458 }
459 return Err(RedDBError::Query(format!(
460 "queue '{}' already exists",
461 query.name
462 )));
463 }
464
465 store
466 .create_collection(&query.name)
467 .map_err(|err| RedDBError::Internal(err.to_string()))?;
468 if let Some(ttl_ms) = query.ttl_ms {
469 self.inner
470 .db
471 .set_collection_default_ttl_ms(&query.name, ttl_ms);
472 }
473 self.inner
474 .db
475 .save_collection_contract(queue_collection_contract(
476 &query.name,
477 query.priority,
478 query.ttl_ms,
479 ))
480 .map_err(|err| RedDBError::Internal(err.to_string()))?;
481 save_queue_config(
482 store.as_ref(),
483 &query.name,
484 &QueueRuntimeConfig {
485 mode: query.mode,
486 priority: query.priority,
487 max_size: query.max_size,
488 ttl_ms: query.ttl_ms,
489 dlq: query.dlq.clone(),
490 max_attempts: query.max_attempts,
491 lock_deadline_ms: query.lock_deadline_ms,
492 in_flight_cap_per_group: query.in_flight_cap_per_group,
493 retry_delay_ms: query.retry_delay_ms,
494 },
495 )?;
496
497 if let Some(dlq) = &query.dlq {
498 if store.get_collection(dlq).is_none() {
499 store
500 .create_collection(dlq)
501 .map_err(|err| RedDBError::Internal(err.to_string()))?;
502 self.inner
503 .db
504 .save_collection_contract(queue_collection_contract(dlq, false, None))
505 .map_err(|err| RedDBError::Internal(err.to_string()))?;
506 }
507 }
508
509 self.invalidate_result_cache();
510 self.inner
511 .db
512 .persist_metadata()
513 .map_err(|err| RedDBError::Internal(err.to_string()))?;
514 let mut type_tags = Vec::new();
519 if let Some(dlq) = &query.dlq {
520 type_tags.push(format!("dlq:{}", dlq));
521 }
522 self.schema_vocabulary_apply(
523 crate::runtime::schema_vocabulary::DdlEvent::CreateCollection {
524 collection: query.name.clone(),
525 columns: vec!["payload".to_string()],
526 type_tags,
527 description: None,
528 },
529 );
530
531 let mut msg = format!("queue '{}' created", query.name);
532 msg.push_str(&format!(" (mode={})", query.mode.as_str()));
533 if query.priority {
534 msg.push_str(" (priority)");
535 }
536 if let Some(max_size) = query.max_size {
537 msg.push_str(&format!(" (max_size={max_size})"));
538 }
539 if let Some(ttl_ms) = query.ttl_ms {
540 msg.push_str(&format!(" (ttl={ttl_ms}ms)"));
541 }
542 if let Some(dlq) = &query.dlq {
543 msg.push_str(&format!(
544 " (dlq={dlq}, max_attempts={})",
545 query.max_attempts
546 ));
547 }
548
549 Ok(RuntimeQueryResult::ok_message(
550 raw_query.to_string(),
551 &msg,
552 "create",
553 ))
554 }
555
556 pub fn execute_alter_queue(
557 &self,
558 raw_query: &str,
559 query: &AlterQueueQuery,
560 ) -> RedDBResult<RuntimeQueryResult> {
561 self.check_write(crate::runtime::write_gate::WriteKind::Ddl)?;
562 let store = self.inner.db.store();
563 ensure_queue_exists(store.as_ref(), &query.name)?;
564
565 let mut config = load_queue_config(store.as_ref(), &query.name);
566 let mut summary: Vec<String> = Vec::new();
567
568 if let Some(new_mode) = query.mode {
569 let pending =
570 load_pending_entries(store.as_ref(), &query.name, None, None).unwrap_or_default();
571 if !pending.is_empty() {
572 tracing::warn!(
573 queue = %query.name,
574 pending_count = pending.len(),
575 new_mode = %new_mode.as_str(),
576 "ALTER QUEUE SET MODE: {} in-flight messages will drain with old mode; \
577 new reads use {}",
578 pending.len(),
579 new_mode.as_str(),
580 );
581 }
582 config.mode = new_mode;
583 summary.push(format!("mode={}", new_mode.as_str()));
584 }
585 if let Some(max_attempts) = query.max_attempts {
586 config.max_attempts = max_attempts;
587 summary.push(format!("max_attempts={max_attempts}"));
588 }
589 if let Some(lock_deadline_ms) = query.lock_deadline_ms {
590 config.lock_deadline_ms = lock_deadline_ms;
591 summary.push(format!("lock_deadline_ms={lock_deadline_ms}"));
592 }
593 if let Some(in_flight_cap) = query.in_flight_cap_per_group {
594 config.in_flight_cap_per_group = in_flight_cap;
595 summary.push(format!("in_flight_cap_per_group={in_flight_cap}"));
596 }
597 if let Some(dlq) = &query.dlq {
598 if dlq == &query.name {
599 return Err(RedDBError::Query(
600 "dead-letter queue must be different from the source queue".to_string(),
601 ));
602 }
603 config.dlq = Some(dlq.clone());
604 summary.push(format!("dlq={dlq}"));
605 }
606 if let Some(retry_delay_ms) = query.retry_delay_ms {
607 config.retry_delay_ms = if retry_delay_ms == 0 {
608 None
609 } else {
610 Some(retry_delay_ms)
611 };
612 summary.push(format!("retry_delay_ms={retry_delay_ms}"));
613 }
614
615 save_queue_config(store.as_ref(), &query.name, &config)?;
616
617 self.invalidate_result_cache();
618 self.inner
619 .db
620 .persist_metadata()
621 .map_err(|err| RedDBError::Internal(err.to_string()))?;
622
623 Ok(RuntimeQueryResult::ok_message(
624 raw_query.to_string(),
625 &format!("queue '{}' altered: {}", query.name, summary.join(", ")),
626 "alter",
627 ))
628 }
629
630 pub fn execute_drop_queue(
631 &self,
632 raw_query: &str,
633 query: &DropQueueQuery,
634 ) -> RedDBResult<RuntimeQueryResult> {
635 self.check_write(crate::runtime::write_gate::WriteKind::Ddl)?;
636 let store = self.inner.db.store();
637 if super::impl_ddl::is_system_schema_name(&query.name) {
638 return Err(RedDBError::Query("system schema is read-only".to_string()));
639 }
640 if store.get_collection(&query.name).is_none() {
641 if query.if_exists {
642 return Ok(RuntimeQueryResult::ok_message(
643 raw_query.to_string(),
644 &format!("queue '{}' does not exist", query.name),
645 "drop",
646 ));
647 }
648 return Err(RedDBError::NotFound(format!(
649 "queue '{}' not found",
650 query.name
651 )));
652 }
653 let actual = crate::runtime::ddl::polymorphic_resolver::resolve(
654 &query.name,
655 &self.inner.db.catalog_model_snapshot(),
656 )?;
657 crate::runtime::ddl::polymorphic_resolver::ensure_model_match(
658 crate::catalog::CollectionModel::Queue,
659 actual,
660 )?;
661
662 store
663 .drop_collection(&query.name)
664 .map_err(|err| RedDBError::Internal(err.to_string()))?;
665 self.inner.db.clear_collection_default_ttl_ms(&query.name);
666 self.inner
667 .db
668 .remove_collection_contract(&query.name)
669 .map_err(|err| RedDBError::Internal(err.to_string()))?;
670 remove_queue_metadata(store.as_ref(), &query.name);
671 self.invalidate_result_cache();
672 self.inner
673 .db
674 .persist_metadata()
675 .map_err(|err| RedDBError::Internal(err.to_string()))?;
676 self.schema_vocabulary_apply(
678 crate::runtime::schema_vocabulary::DdlEvent::DropCollection {
679 collection: query.name.clone(),
680 },
681 );
682
683 Ok(RuntimeQueryResult::ok_message(
684 raw_query.to_string(),
685 &format!("queue '{}' dropped", query.name),
686 "drop",
687 ))
688 }
689
690 pub fn execute_queue_command(
691 &self,
692 raw_query: &str,
693 cmd: &QueueCommand,
694 ) -> RedDBResult<RuntimeQueryResult> {
695 self.check_write(crate::runtime::write_gate::WriteKind::Dml)?;
696 match cmd {
697 QueueCommand::Push {
698 queue,
699 value,
700 side,
701 priority,
702 available,
703 } => {
704 let store = self.inner.db.store();
705 ensure_queue_exists(store.as_ref(), queue)?;
706 let config = load_queue_config(store.as_ref(), queue);
707 if priority.is_some() && !config.priority {
708 return Err(RedDBError::Query(format!(
709 "queue '{}' is not a priority queue",
710 queue
711 )));
712 }
713 if let Some(max_size) = config.max_size {
714 let current_len =
715 load_queue_message_views_with_runtime(Some(self), store.as_ref(), queue)?
716 .len();
717 if current_len >= max_size {
718 return Err(RedDBError::Query(format!(
719 "queue '{}' is full (max_size={max_size})",
720 queue
721 )));
722 }
723 }
724
725 let position = next_queue_position(store.as_ref(), queue, *side)?;
726 let mut entity = UnifiedEntity::new(
727 EntityId::new(0),
728 EntityKind::QueueMessage {
729 queue: queue.clone(),
730 position,
731 },
732 EntityData::QueueMessage(QueueMessageData {
733 payload: value.clone(),
734 priority: if config.priority { *priority } else { None },
735 enqueued_at_ns: now_ns(),
736 attempts: 0,
737 max_attempts: config.max_attempts,
738 acked: false,
739 }),
740 );
741 if let Some(xid) = self.current_xid() {
744 entity.set_xmin(xid);
745 }
746 let id = store
747 .insert_auto(queue, entity)
748 .map_err(|err| RedDBError::Internal(err.to_string()))?;
749 let available_at_ns = available.map(|a| match a {
754 crate::storage::query::ast::QueueAvailability::DelayMs(ms) => {
755 now_ns().saturating_add(ms.saturating_mul(1_000_000))
756 }
757 crate::storage::query::ast::QueueAvailability::AtUnixMs(ms) => {
758 ms.saturating_mul(1_000_000)
759 }
760 });
761 if config.ttl_ms.is_some() || available_at_ns.is_some() {
762 store
763 .set_metadata(
764 queue,
765 id,
766 queue_message_metadata(config.ttl_ms, available_at_ns),
767 )
768 .map_err(|err| RedDBError::Internal(err.to_string()))?;
769 }
770 self.record_queue_wake(&queue_wait_scope(), queue);
775 self.invalidate_result_cache();
776
777 let mut result = UnifiedResult::with_columns(vec![
778 "message_id".into(),
779 "side".into(),
780 "queue".into(),
781 ]);
782 let mut record = UnifiedRecord::new();
783 record.set("message_id", Value::text(message_id_string(id)));
784 record.set(
785 "side",
786 Value::text(match side {
787 QueueSide::Left => "left".to_string(),
788 QueueSide::Right => "right".to_string(),
789 }),
790 );
791 record.set("queue", Value::text(queue.clone()));
792 result.push(record);
793
794 Ok(RuntimeQueryResult {
795 query: raw_query.to_string(),
796 mode: QueryMode::Sql,
797 statement: "queue_push",
798 engine: "runtime-queue",
799 result,
800 affected_rows: 1,
801 statement_type: "insert",
802 bookmark: None,
803 })
804 }
805 QueueCommand::Pop { queue, side, count } => {
806 let store = self.inner.db.store();
807 ensure_queue_exists(store.as_ref(), queue)?;
808 let (lifecycle, _ps, txn) = runtime_lifecycle(self, queue);
809 let popped = lifecycle
810 .pop(queue, ast_side_to_lc(*side), *count, &txn)
811 .map_err(map_qse)?;
812
813 let mut result =
814 UnifiedResult::with_columns(vec!["message_id".into(), "payload".into()]);
815 for (message_id, payload) in &popped {
816 let mut record = UnifiedRecord::new();
817 record.set(
818 "message_id",
819 Value::text(message_id_string(EntityId::new(*message_id))),
820 );
821 record.set("payload", payload.clone());
822 result.push(record);
823 }
824 let popped_count = popped.len() as u64;
825 if popped_count > 0 {
826 self.invalidate_result_cache();
827 }
828
829 Ok(RuntimeQueryResult {
830 query: raw_query.to_string(),
831 mode: QueryMode::Sql,
832 statement: "queue_pop",
833 engine: "runtime-queue",
834 result,
835 affected_rows: popped_count,
836 statement_type: "delete",
837 bookmark: None,
838 })
839 }
840 QueueCommand::Peek { queue, count } => {
841 let store = self.inner.db.store();
842 ensure_queue_exists(store.as_ref(), queue)?;
843 let (lifecycle, _ps, txn) = runtime_lifecycle(self, queue);
844 let messages = lifecycle.peek(queue, *count, &txn);
845
846 let mut result =
847 UnifiedResult::with_columns(vec!["message_id".into(), "payload".into()]);
848 for message in messages {
849 let mut record = UnifiedRecord::new();
850 record.set(
851 "message_id",
852 Value::text(message_id_string(EntityId::new(message.message_id))),
853 );
854 record.set("payload", message.payload);
855 result.push(record);
856 }
857
858 Ok(RuntimeQueryResult {
859 query: raw_query.to_string(),
860 mode: QueryMode::Sql,
861 statement: "queue_peek",
862 engine: "runtime-queue",
863 result,
864 affected_rows: 0,
865 statement_type: "select",
866 bookmark: None,
867 })
868 }
869 QueueCommand::Len { queue } => {
870 let store = self.inner.db.store();
871 ensure_queue_exists(store.as_ref(), queue)?;
872 let count =
873 load_queue_message_views_with_runtime(Some(self), store.as_ref(), queue)?.len()
874 as u64;
875 let mut result = UnifiedResult::with_columns(vec!["len".into()]);
876 let mut record = UnifiedRecord::new();
877 record.set("len", Value::UnsignedInteger(count));
878 result.push(record);
879
880 Ok(RuntimeQueryResult {
881 query: raw_query.to_string(),
882 mode: QueryMode::Sql,
883 statement: "queue_len",
884 engine: "runtime-queue",
885 result,
886 affected_rows: 0,
887 statement_type: "select",
888 bookmark: None,
889 })
890 }
891 QueueCommand::Purge { queue } => {
892 let store = self.inner.db.store();
893 ensure_queue_exists(store.as_ref(), queue)?;
894 let (lifecycle, _ps, txn) = runtime_lifecycle(self, queue);
895 let count = lifecycle.purge(queue, &txn).map_err(map_qse)?;
896 if count > 0 {
897 self.invalidate_result_cache();
898 }
899
900 Ok(RuntimeQueryResult::ok_message(
901 raw_query.to_string(),
902 &format!("{count} messages purged from queue '{queue}'"),
903 "delete",
904 ))
905 }
906 QueueCommand::GroupCreate { queue, group } => {
907 let store = self.inner.db.store();
908 ensure_queue_exists(store.as_ref(), queue)?;
909 if queue_group_exists(store.as_ref(), queue, group)? {
910 return Ok(RuntimeQueryResult::ok_message(
911 raw_query.to_string(),
912 &format!(
913 "consumer group '{}' already exists on queue '{}'",
914 group, queue
915 ),
916 "create",
917 ));
918 }
919 save_queue_group(store.as_ref(), queue, group)?;
920 self.invalidate_result_cache();
921
922 Ok(RuntimeQueryResult::ok_message(
923 raw_query.to_string(),
924 &format!("consumer group '{}' created on queue '{}'", group, queue),
925 "create",
926 ))
927 }
928 QueueCommand::GroupRead {
929 queue,
930 group,
931 consumer,
932 count,
933 wait_ms,
934 } => {
935 let store = self.inner.db.store();
936 ensure_queue_exists(store.as_ref(), queue)?;
937 if let Some(ms) = *wait_ms {
943 if self.current_xid().is_some() {
944 return Err(RedDBError::Query(
945 "QUEUE READ … WAIT is autocommit-only: refusing to park inside an explicit transaction (BEGIN/COMMIT)"
946 .to_string(),
947 ));
948 }
949 let cap =
950 self.config_u64(QUEUE_MAX_WAIT_MS_CONFIG_KEY, QUEUE_MAX_WAIT_MS_DEFAULT);
951 if ms > cap {
952 return Err(RedDBError::Query(format!(
953 "QUEUE READ … WAIT {ms}ms exceeds server cap {QUEUE_MAX_WAIT_MS_CONFIG_KEY} = {cap}ms"
954 )));
955 }
956 }
957 let config = load_queue_config(store.as_ref(), queue);
961 let group_owned =
962 resolve_read_group(store.as_ref(), queue, group.as_deref(), consumer, &config)?;
963 let group_ref = group_owned.as_str();
964 let delivered = self
965 .group_read_with_optional_wait(queue, group_ref, consumer, *count, *wait_ms)?;
966
967 {
971 let lease_count = u32::try_from(delivered.len()).unwrap_or(u32::MAX);
972 let now_ns = std::time::SystemTime::now()
973 .duration_since(std::time::UNIX_EPOCH)
974 .map(|d| d.as_nanos() as u64)
975 .unwrap_or(0);
976 self.queue_presence().heartbeat(
977 queue,
978 group_ref,
979 consumer,
980 lease_count,
981 now_ns,
982 );
983 }
984
985 let mut result = UnifiedResult::with_columns(vec![
986 "message_id".into(),
987 "payload".into(),
988 "consumer".into(),
989 "delivery_count".into(),
990 "attempts".into(),
991 ]);
992
993 for message in delivered {
994 let mut record = UnifiedRecord::new();
995 record.set(
996 "message_id",
997 Value::text(message_id_string(EntityId::new(message.message_id))),
998 );
999 record.set("payload", message.payload);
1000 record.set("consumer", Value::text(message.consumer));
1001 record.set(
1002 "delivery_count",
1003 Value::UnsignedInteger(u64::from(message.delivery_count)),
1004 );
1005 record.set(
1006 "attempts",
1007 Value::UnsignedInteger(u64::from(message.delivery_count)),
1008 );
1009 result.push(record);
1010 }
1011 if !result.records.is_empty() {
1012 self.invalidate_result_cache();
1013 }
1014
1015 Ok(RuntimeQueryResult {
1016 query: raw_query.to_string(),
1017 mode: QueryMode::Sql,
1018 statement: "queue_group_read",
1019 engine: "runtime-queue",
1020 result,
1021 affected_rows: 0,
1022 statement_type: "select",
1023 bookmark: None,
1024 })
1025 }
1026 QueueCommand::Pending { queue, group } => {
1027 let store = self.inner.db.store();
1028 ensure_queue_exists(store.as_ref(), queue)?;
1029 require_queue_group(store.as_ref(), queue, group)?;
1030 let mut pending = load_pending_entries(store.as_ref(), queue, Some(group), None)?;
1031 pending.sort_by_key(|entry| entry.delivered_at_ns);
1032 let current_time_ns = now_ns();
1033
1034 let mut result = UnifiedResult::with_columns(vec![
1035 "message_id".into(),
1036 "consumer".into(),
1037 "delivered_at_ns".into(),
1038 "delivery_count".into(),
1039 "idle_ms".into(),
1040 ]);
1041 for entry in pending {
1042 let mut record = UnifiedRecord::new();
1043 record.set(
1044 "message_id",
1045 Value::text(message_id_string(entry.message_id)),
1046 );
1047 record.set("consumer", Value::text(entry.consumer));
1048 record.set(
1049 "delivered_at_ns",
1050 Value::UnsignedInteger(entry.delivered_at_ns),
1051 );
1052 record.set(
1053 "delivery_count",
1054 Value::UnsignedInteger(u64::from(entry.delivery_count)),
1055 );
1056 record.set(
1057 "idle_ms",
1058 Value::UnsignedInteger(
1059 current_time_ns.saturating_sub(entry.delivered_at_ns) / 1_000_000,
1060 ),
1061 );
1062 result.push(record);
1063 }
1064
1065 Ok(RuntimeQueryResult {
1066 query: raw_query.to_string(),
1067 mode: QueryMode::Sql,
1068 statement: "queue_pending",
1069 engine: "runtime-queue",
1070 result,
1071 affected_rows: 0,
1072 statement_type: "select",
1073 bookmark: None,
1074 })
1075 }
1076 QueueCommand::Claim {
1077 queue,
1078 group,
1079 consumer,
1080 min_idle_ms,
1081 } => {
1082 let store = self.inner.db.store();
1083 ensure_queue_exists(store.as_ref(), queue)?;
1084 require_queue_group(store.as_ref(), queue, group)?;
1085 let (lifecycle, _ps, txn) = runtime_lifecycle(self, queue);
1086 let delivered = lifecycle
1087 .claim_delivering(queue, consumer, *min_idle_ms, &txn)
1088 .map_err(map_qse)?;
1089
1090 let mut result = UnifiedResult::with_columns(vec![
1091 "message_id".into(),
1092 "payload".into(),
1093 "consumer".into(),
1094 "delivery_count".into(),
1095 ]);
1096
1097 for message in delivered {
1098 let mut record = UnifiedRecord::new();
1099 record.set(
1100 "message_id",
1101 Value::text(message_id_string(EntityId::new(message.message_id))),
1102 );
1103 record.set("payload", message.payload);
1104 record.set("consumer", Value::text(message.consumer));
1105 record.set(
1106 "delivery_count",
1107 Value::UnsignedInteger(u64::from(message.delivery_count)),
1108 );
1109 result.push(record);
1110 }
1111 if !result.records.is_empty() {
1112 self.invalidate_result_cache();
1113 }
1114 let affected_rows = result.records.len() as u64;
1115
1116 Ok(RuntimeQueryResult {
1117 query: raw_query.to_string(),
1118 mode: QueryMode::Sql,
1119 statement: "queue_claim",
1120 engine: "runtime-queue",
1121 result,
1122 affected_rows,
1123 statement_type: "update",
1124 bookmark: None,
1125 })
1126 }
1127 QueueCommand::Ack {
1128 queue,
1129 group,
1130 message_id,
1131 delivery_id,
1132 } => {
1133 let store = self.inner.db.store();
1134 ensure_queue_exists(store.as_ref(), queue)?;
1135 let (group_owned, message_entity) = resolve_ack_nack_handle(
1136 store.as_ref(),
1137 queue,
1138 group,
1139 message_id,
1140 delivery_id.as_deref(),
1141 )?;
1142 let group_ref = group_owned.as_str();
1143 require_queue_group(store.as_ref(), queue, group_ref)?;
1144 let (lifecycle, ps, txn) = runtime_lifecycle(self, queue);
1145 let did = match delivery_id.as_deref() {
1146 Some(d) => d.to_string(),
1147 None => ps
1148 .find_pending_by_key(queue, message_entity.raw(), group_ref)
1149 .ok_or_else(|| {
1150 RedDBError::NotFound(format!(
1151 "no pending delivery for message '{}' on queue '{}' (group '{}')",
1152 message_entity.raw(),
1153 queue,
1154 group_ref
1155 ))
1156 })?,
1157 };
1158 lifecycle.ack(&txn, &did).map_err(map_qse)?;
1159 self.invalidate_result_cache();
1160
1161 Ok(RuntimeQueryResult::ok_message(
1162 raw_query.to_string(),
1163 "message acknowledged",
1164 "update",
1165 ))
1166 }
1167 QueueCommand::Nack {
1168 queue,
1169 group,
1170 message_id,
1171 delivery_id,
1172 delay_ms,
1173 } => {
1174 let store = self.inner.db.store();
1175 ensure_queue_exists(store.as_ref(), queue)?;
1176 let config = load_queue_config(store.as_ref(), queue);
1177 if delay_ms.is_some() {
1183 if let Some((_, role)) = current_auth_identity() {
1184 if !role.can_write() {
1185 return Err(RedDBError::InvalidOperation(format!(
1186 "role '{role}' is not authorized to override NACK retry delay on queue '{queue}'"
1187 )));
1188 }
1189 }
1190 }
1191 let (group_owned, message_entity) = resolve_ack_nack_handle(
1192 store.as_ref(),
1193 queue,
1194 group,
1195 message_id,
1196 delivery_id.as_deref(),
1197 )?;
1198 let group_ref = group_owned.as_str();
1199 require_queue_group(store.as_ref(), queue, group_ref)?;
1200 let (lifecycle, ps, txn) = runtime_lifecycle(self, queue);
1201 let did = match delivery_id.as_deref() {
1202 Some(d) => d.to_string(),
1203 None => ps
1204 .find_pending_by_key(queue, message_entity.raw(), group_ref)
1205 .ok_or_else(|| {
1206 RedDBError::NotFound(format!(
1207 "no pending delivery for message '{}' on queue '{}' (group '{}')",
1208 message_entity.raw(),
1209 queue,
1210 group_ref
1211 ))
1212 })?,
1213 };
1214 let effective_delay_ms = delay_ms.or(config.retry_delay_ms).unwrap_or(0);
1218 let outcome = lifecycle.nack(&txn, &did).map_err(map_qse)?;
1219 if matches!(outcome, RetirementOutcome::Requeued) && effective_delay_ms > 0 {
1223 let at_ns =
1224 now_ns().saturating_add(effective_delay_ms.saturating_mul(1_000_000));
1225 set_message_available_at_ns(
1226 store.as_ref(),
1227 queue,
1228 message_entity,
1229 Some(at_ns),
1230 config.ttl_ms,
1231 )?;
1232 }
1233 self.maybe_emit_nack_audit(
1240 queue,
1241 group_ref,
1242 &did,
1243 *delay_ms,
1244 config.retry_delay_ms,
1245 &outcome,
1246 );
1247 let message = match outcome {
1248 RetirementOutcome::Requeued => {
1249 if effective_delay_ms > 0 {
1250 format!("message requeued (delay={effective_delay_ms}ms)")
1251 } else {
1252 "message requeued".to_string()
1253 }
1254 }
1255 RetirementOutcome::MovedToDlq(dlq) => {
1256 format!("message moved to dead-letter queue '{}'", dlq)
1257 }
1258 RetirementOutcome::Dropped => "message dropped after max attempts".to_string(),
1259 };
1260 self.invalidate_result_cache();
1261
1262 Ok(RuntimeQueryResult::ok_message(
1263 raw_query.to_string(),
1264 &message,
1265 "update",
1266 ))
1267 }
1268 QueueCommand::Move {
1269 source,
1270 destination,
1271 filter,
1272 limit,
1273 } => self.execute_queue_move(raw_query, source, destination, filter.as_ref(), *limit),
1274 }
1275 }
1276
1277 pub fn execute_queue_select(
1278 &self,
1279 raw_query: &str,
1280 query: &QueueSelectQuery,
1281 ) -> RedDBResult<RuntimeQueryResult> {
1282 let store = self.inner.db.store();
1283 ensure_queue_exists(store.as_ref(), &query.queue)?;
1284 let config = load_queue_config(store.as_ref(), &query.queue);
1285 let dlq = queue_is_dead_letter_target(store.as_ref(), &query.queue);
1286 let columns = if query.columns.is_empty() {
1287 queue_projection_default_columns()
1288 } else {
1289 query.columns.clone()
1290 };
1291
1292 let mut messages =
1293 load_queue_message_views_with_runtime(Some(self), store.as_ref(), &query.queue)?;
1294 sort_queue_messages(&mut messages, &config, QueueSide::Left);
1295
1296 let mut result = UnifiedResult::with_columns(columns.clone());
1297 for message in messages {
1298 if query
1299 .filter
1300 .as_ref()
1301 .is_some_and(|filter| !queue_message_matches_filter(&message, dlq, filter))
1302 {
1303 continue;
1304 }
1305 let record = queue_projection_record(&columns, &message, dlq)?;
1306 result.push(record);
1307 if query
1308 .limit
1309 .is_some_and(|limit| result.records.len() >= limit as usize)
1310 {
1311 break;
1312 }
1313 }
1314
1315 Ok(RuntimeQueryResult {
1316 query: raw_query.to_string(),
1317 mode: QueryMode::Sql,
1318 statement: "queue_select",
1319 engine: "runtime-queue",
1320 result,
1321 affected_rows: 0,
1322 statement_type: "select",
1323 bookmark: None,
1324 })
1325 }
1326
1327 fn execute_queue_move(
1328 &self,
1329 raw_query: &str,
1330 source: &str,
1331 destination: &str,
1332 filter: Option<&Filter>,
1333 limit: usize,
1334 ) -> RedDBResult<RuntimeQueryResult> {
1335 if source == destination {
1336 return Err(RedDBError::Query(
1337 "QUEUE MOVE source and destination must be different".to_string(),
1338 ));
1339 }
1340 let store = self.inner.db.store();
1341 ensure_queue_exists(store.as_ref(), source)?;
1342 ensure_queue_exists(store.as_ref(), destination)?;
1343 let source_config = load_queue_config(store.as_ref(), source);
1344 let destination_config = load_queue_config(store.as_ref(), destination);
1345 let source_dlq = queue_is_dead_letter_target(store.as_ref(), source);
1346
1347 let mut messages =
1348 load_queue_message_views_with_runtime(Some(self), store.as_ref(), source)?;
1349 sort_queue_messages(&mut messages, &source_config, QueueSide::Left);
1350 let selected = messages
1351 .into_iter()
1352 .filter(|message| {
1353 filter
1354 .map(|f| queue_message_matches_filter(message, source_dlq, f))
1355 .unwrap_or(true)
1356 })
1357 .take(limit)
1358 .collect::<Vec<_>>();
1359
1360 if let Some(max_size) = destination_config.max_size {
1361 let current_len =
1362 load_queue_message_views_with_runtime(Some(self), store.as_ref(), destination)?
1363 .len();
1364 if current_len + selected.len() > max_size {
1365 return Err(RedDBError::Query(format!(
1366 "queue '{}' is full (max_size={max_size})",
1367 destination
1368 )));
1369 }
1370 }
1371
1372 for message in &selected {
1373 let lock = queue_message_lock_handle(self, source, message.id);
1374 let Some(_guard) = lock.try_lock() else {
1375 return Err(RedDBError::Query(format!(
1376 "message '{}' is locked on queue '{}'",
1377 message.id.raw(),
1378 source
1379 )));
1380 };
1381 if queue_message_view_by_id(store.as_ref(), source, message.id)?.is_none() {
1382 return Err(RedDBError::Query(format!(
1383 "message '{}' is no longer available on queue '{}'",
1384 message.id.raw(),
1385 source
1386 )));
1387 }
1388 }
1389
1390 let mut inserted = Vec::new();
1391 for message in &selected {
1392 match insert_moved_queue_message(
1393 store.as_ref(),
1394 destination,
1395 &destination_config,
1396 message,
1397 ) {
1398 Ok(id) => inserted.push(id),
1399 Err(err) => {
1400 for id in inserted {
1401 let _ = store.delete(destination, id);
1402 }
1403 return Err(err);
1404 }
1405 }
1406 }
1407
1408 let (move_lifecycle, _move_ps, move_txn) = runtime_lifecycle(self, source);
1409 for message in &selected {
1410 move_lifecycle
1411 .delete_with_state(source, message.id.raw(), &move_txn)
1412 .map_err(map_qse)?;
1413 }
1414 if !selected.is_empty() {
1415 self.invalidate_result_cache();
1416 }
1417
1418 let selected_count = selected.len() as u64;
1419 self.audit_log().record_event(
1420 AuditEvent::builder("queue/move")
1421 .source(AuditAuthSource::System)
1422 .outcome(Outcome::Success)
1423 .resource(format!("queue:{source}->{destination}"))
1424 .fields([
1425 AuditFieldEscaper::field("source", source),
1426 AuditFieldEscaper::field("destination", destination),
1427 AuditFieldEscaper::field("selected", selected_count),
1428 AuditFieldEscaper::field("committed", selected_count),
1429 ])
1430 .build(),
1431 );
1432
1433 let mut result = UnifiedResult::with_columns(vec![
1434 "source".into(),
1435 "destination".into(),
1436 "selected".into(),
1437 "committed".into(),
1438 ]);
1439 let mut record = UnifiedRecord::new();
1440 record.set("source", Value::text(source.to_string()));
1441 record.set("destination", Value::text(destination.to_string()));
1442 record.set("selected", Value::UnsignedInteger(selected_count));
1443 record.set("committed", Value::UnsignedInteger(selected_count));
1444 result.push(record);
1445
1446 Ok(RuntimeQueryResult {
1447 query: raw_query.to_string(),
1448 mode: QueryMode::Sql,
1449 statement: "queue_move",
1450 engine: "runtime-queue",
1451 result,
1452 affected_rows: selected_count,
1453 statement_type: "update",
1454 bookmark: None,
1455 })
1456 }
1457
1458 fn maybe_emit_nack_audit(
1473 &self,
1474 queue: &str,
1475 group: &str,
1476 delivery_id: &str,
1477 override_ms: Option<u64>,
1478 default_ms: Option<u64>,
1479 outcome: &RetirementOutcome,
1480 ) {
1481 let Some(override_ms) = override_ms else {
1482 return;
1483 };
1484 let outcome_label = match outcome {
1485 RetirementOutcome::Requeued => "requeued",
1486 RetirementOutcome::MovedToDlq(_) => "dlq",
1487 RetirementOutcome::Dropped => "dropped",
1488 };
1489 const SIGNIFICANT_DELAY_MS: u64 = 60_000;
1490 let destination_changed = !matches!(outcome, RetirementOutcome::Requeued);
1491 if override_ms < SIGNIFICANT_DELAY_MS && !destination_changed {
1492 return;
1493 }
1494 self.audit_log().record_event(
1495 AuditEvent::builder("queue/nack/override")
1496 .source(AuditAuthSource::System)
1497 .outcome(Outcome::Success)
1498 .resource(format!("queue:{queue}"))
1499 .fields([
1500 AuditFieldEscaper::field("queue", queue),
1501 AuditFieldEscaper::field("group", group),
1502 AuditFieldEscaper::field("delivery_id", delivery_id),
1503 AuditFieldEscaper::field("override_delay_ms", override_ms),
1504 AuditFieldEscaper::field("default_delay_ms", default_ms.unwrap_or(0)),
1505 AuditFieldEscaper::field("outcome", outcome_label),
1506 ])
1507 .build(),
1508 );
1509 }
1510}
1511
1512fn ensure_queue_exists(store: &UnifiedStore, queue: &str) -> RedDBResult<()> {
1513 if store.get_collection(queue).is_some() {
1514 Ok(())
1515 } else {
1516 Err(RedDBError::NotFound(format!("queue '{}' not found", queue)))
1517 }
1518}
1519
1520pub(super) fn load_queue_config(store: &UnifiedStore, queue: &str) -> QueueRuntimeConfig {
1521 let default = QueueRuntimeConfig {
1522 mode: QueueMode::Work,
1523 priority: false,
1524 max_size: None,
1525 ttl_ms: None,
1526 dlq: None,
1527 max_attempts: crate::storage::query::DEFAULT_QUEUE_MAX_ATTEMPTS,
1528 lock_deadline_ms: crate::storage::query::DEFAULT_QUEUE_LOCK_DEADLINE_MS,
1529 in_flight_cap_per_group: crate::storage::query::DEFAULT_QUEUE_IN_FLIGHT_CAP_PER_GROUP,
1530 retry_delay_ms: None,
1531 };
1532
1533 let Some(manager) = store.get_collection(QUEUE_META_COLLECTION) else {
1534 return default;
1535 };
1536 manager
1537 .query_all(|entity| {
1538 entity.data.as_row().is_some_and(|row| {
1539 row_text(row, "kind").as_deref() == Some("queue_config")
1540 && row_text(row, "queue").as_deref() == Some(queue)
1541 })
1542 })
1543 .into_iter()
1544 .find_map(|entity| {
1545 let row = entity.data.as_row()?;
1546 Some(QueueRuntimeConfig {
1547 mode: row_text(row, "mode")
1548 .as_deref()
1549 .and_then(QueueMode::parse)
1550 .unwrap_or_default(),
1551 priority: row_bool(row, "priority").unwrap_or(false),
1552 max_size: row_u64(row, "max_size").map(|value| value as usize),
1553 ttl_ms: row_u64(row, "ttl_ms"),
1554 dlq: row_text(row, "dlq"),
1555 max_attempts: row_u64(row, "max_attempts")
1556 .map(|value| value as u32)
1557 .unwrap_or(crate::storage::query::DEFAULT_QUEUE_MAX_ATTEMPTS),
1558 lock_deadline_ms: row_u64(row, "lock_deadline_ms")
1559 .unwrap_or(crate::storage::query::DEFAULT_QUEUE_LOCK_DEADLINE_MS),
1560 in_flight_cap_per_group: row_u64(row, "in_flight_cap_per_group")
1561 .map(|value| value as u32)
1562 .unwrap_or(crate::storage::query::DEFAULT_QUEUE_IN_FLIGHT_CAP_PER_GROUP),
1563 retry_delay_ms: row_u64(row, "retry_delay_ms").filter(|v| *v > 0),
1564 })
1565 })
1566 .unwrap_or(default)
1567}
1568
1569pub(super) fn queue_mode_str(store: &UnifiedStore, queue: &str) -> &'static str {
1570 load_queue_config(store, queue).mode.as_str()
1571}
1572
1573fn save_queue_config(
1574 store: &UnifiedStore,
1575 queue: &str,
1576 config: &QueueRuntimeConfig,
1577) -> RedDBResult<()> {
1578 remove_meta_rows(store, |row| {
1579 row_text(row, "kind").as_deref() == Some("queue_config")
1580 && row_text(row, "queue").as_deref() == Some(queue)
1581 });
1582
1583 let mut fields = HashMap::new();
1584 fields.insert("kind".to_string(), Value::text("queue_config".to_string()));
1585 fields.insert("queue".to_string(), Value::text(queue.to_string()));
1586 fields.insert(
1587 "mode".to_string(),
1588 Value::text(config.mode.as_str().to_string()),
1589 );
1590 fields.insert("priority".to_string(), Value::Boolean(config.priority));
1591 fields.insert(
1592 "max_size".to_string(),
1593 config
1594 .max_size
1595 .map(|value| Value::UnsignedInteger(value as u64))
1596 .unwrap_or(Value::Null),
1597 );
1598 fields.insert(
1599 "ttl_ms".to_string(),
1600 config
1601 .ttl_ms
1602 .map(Value::UnsignedInteger)
1603 .unwrap_or(Value::Null),
1604 );
1605 fields.insert(
1606 "dlq".to_string(),
1607 config.dlq.clone().map(Value::text).unwrap_or(Value::Null),
1608 );
1609 fields.insert(
1610 "max_attempts".to_string(),
1611 Value::UnsignedInteger(u64::from(config.max_attempts)),
1612 );
1613 fields.insert(
1614 "lock_deadline_ms".to_string(),
1615 Value::UnsignedInteger(config.lock_deadline_ms),
1616 );
1617 fields.insert(
1618 "in_flight_cap_per_group".to_string(),
1619 Value::UnsignedInteger(u64::from(config.in_flight_cap_per_group)),
1620 );
1621 fields.insert(
1622 "retry_delay_ms".to_string(),
1623 config
1624 .retry_delay_ms
1625 .map(Value::UnsignedInteger)
1626 .unwrap_or(Value::Null),
1627 );
1628 insert_meta_row(store, fields)
1629}
1630
1631fn remove_queue_metadata(store: &UnifiedStore, queue: &str) {
1632 remove_meta_rows(store, |row| {
1633 row_text(row, "queue").as_deref() == Some(queue)
1634 });
1635}
1636
1637fn queue_group_exists(store: &UnifiedStore, queue: &str, group: &str) -> RedDBResult<bool> {
1638 Ok(load_queue_groups(store, queue)?
1639 .into_iter()
1640 .any(|entry| entry.group == group))
1641}
1642
1643pub(super) fn require_queue_group(
1644 store: &UnifiedStore,
1645 queue: &str,
1646 group: &str,
1647) -> RedDBResult<()> {
1648 if queue_group_exists(store, queue, group)? {
1649 Ok(())
1650 } else {
1651 Err(RedDBError::NotFound(format!(
1652 "consumer group '{}' not found on queue '{}'",
1653 group, queue
1654 )))
1655 }
1656}
1657
1658pub(super) fn resolve_read_group(
1659 store: &UnifiedStore,
1660 queue: &str,
1661 group: Option<&str>,
1662 consumer: &str,
1663 config: &QueueRuntimeConfig,
1664) -> RedDBResult<String> {
1665 if let Some(group) = group {
1666 require_queue_group(store, queue, group)?;
1667 return Ok(group.to_string());
1668 }
1669
1670 match config.mode {
1671 QueueMode::Work => {
1672 if !queue_group_exists(store, queue, WORK_DEFAULT_GROUP)? {
1673 save_queue_group(store, queue, WORK_DEFAULT_GROUP)?;
1674 }
1675 Ok(WORK_DEFAULT_GROUP.to_string())
1676 }
1677 QueueMode::Fanout => {
1678 let fanout_group = format!("{FANOUT_GROUP_PREFIX}{consumer}");
1679 if !queue_group_exists(store, queue, &fanout_group)? {
1680 save_queue_group(store, queue, &fanout_group)?;
1681 }
1682 Ok(fanout_group)
1683 }
1684 }
1685}
1686
1687fn load_queue_groups(store: &UnifiedStore, queue: &str) -> RedDBResult<Vec<QueueGroupEntry>> {
1688 let Some(manager) = store.get_collection(QUEUE_META_COLLECTION) else {
1689 return Ok(Vec::new());
1690 };
1691 Ok(manager
1692 .query_all(|entity| {
1693 entity.data.as_row().is_some_and(|row| {
1694 row_text(row, "kind").as_deref() == Some("queue_group")
1695 && row_text(row, "queue").as_deref() == Some(queue)
1696 })
1697 })
1698 .into_iter()
1699 .filter_map(|entity| {
1700 let row = entity.data.as_row()?;
1701 Some(QueueGroupEntry {
1702 entity_id: entity.id,
1703 group: row_text(row, "group")?,
1704 })
1705 })
1706 .collect())
1707}
1708
1709fn save_queue_group(store: &UnifiedStore, queue: &str, group: &str) -> RedDBResult<()> {
1710 let mut fields = HashMap::new();
1711 fields.insert("kind".to_string(), Value::text("queue_group".to_string()));
1712 fields.insert("queue".to_string(), Value::text(queue.to_string()));
1713 fields.insert("group".to_string(), Value::text(group.to_string()));
1714 fields.insert(
1715 "created_at_ns".to_string(),
1716 Value::UnsignedInteger(now_ns()),
1717 );
1718 insert_meta_row(store, fields)
1719}
1720
1721pub(super) fn load_pending_entries(
1722 store: &UnifiedStore,
1723 queue: &str,
1724 group: Option<&str>,
1725 message_id: Option<EntityId>,
1726) -> RedDBResult<Vec<QueuePendingEntry>> {
1727 let Some(manager) = store.get_collection(QUEUE_META_COLLECTION) else {
1728 return Ok(Vec::new());
1729 };
1730 Ok(manager
1731 .query_all(|entity| {
1732 entity.data.as_row().is_some_and(|row| {
1733 row_text(row, "kind").as_deref() == Some("queue_pending")
1734 && row_text(row, "queue").as_deref() == Some(queue)
1735 && group
1736 .map(|group_name| row_text(row, "group").as_deref() == Some(group_name))
1737 .unwrap_or(true)
1738 && message_id
1739 .map(|candidate| row_u64(row, "message_id") == Some(candidate.raw()))
1740 .unwrap_or(true)
1741 })
1742 })
1743 .into_iter()
1744 .filter_map(|entity| {
1745 let row = entity.data.as_row()?;
1746 Some(QueuePendingEntry {
1747 entity_id: entity.id,
1748 group: row_text(row, "group")?,
1749 message_id: EntityId::new(row_u64(row, "message_id")?),
1750 consumer: row_text(row, "consumer")?,
1751 delivered_at_ns: row_u64(row, "delivered_at_ns")?,
1752 delivery_count: row_u64(row, "delivery_count")
1753 .map(|value| value as u32)
1754 .unwrap_or(1),
1755 })
1756 })
1757 .collect())
1758}
1759
1760pub(super) fn save_queue_pending(
1761 store: &UnifiedStore,
1762 queue: &str,
1763 group: &str,
1764 message_id: EntityId,
1765 consumer: &str,
1766 delivered_at_ns: u64,
1767 delivery_count: u32,
1768) -> RedDBResult<()> {
1769 remove_meta_rows(store, |row| {
1770 row_text(row, "kind").as_deref() == Some("queue_pending")
1771 && row_text(row, "queue").as_deref() == Some(queue)
1772 && row_text(row, "group").as_deref() == Some(group)
1773 && row_u64(row, "message_id") == Some(message_id.raw())
1774 });
1775
1776 let mut fields = HashMap::new();
1777 fields.insert("kind".to_string(), Value::text("queue_pending".to_string()));
1778 fields.insert("queue".to_string(), Value::text(queue.to_string()));
1779 fields.insert("group".to_string(), Value::text(group.to_string()));
1780 fields.insert(
1781 "message_id".to_string(),
1782 Value::UnsignedInteger(message_id.raw()),
1783 );
1784 fields.insert("consumer".to_string(), Value::text(consumer.to_string()));
1785 fields.insert(
1786 "delivered_at_ns".to_string(),
1787 Value::UnsignedInteger(delivered_at_ns),
1788 );
1789 fields.insert(
1790 "delivery_count".to_string(),
1791 Value::UnsignedInteger(u64::from(delivery_count)),
1792 );
1793 insert_meta_row(store, fields)
1794}
1795
1796pub(super) fn require_pending_entry(
1797 store: &UnifiedStore,
1798 queue: &str,
1799 group: &str,
1800 message_id: EntityId,
1801) -> RedDBResult<QueuePendingEntry> {
1802 load_pending_entries(store, queue, Some(group), Some(message_id))?
1803 .into_iter()
1804 .next()
1805 .ok_or_else(|| {
1806 RedDBError::NotFound(format!(
1807 "message '{}' is not pending in group '{}' on queue '{}'",
1808 message_id.raw(),
1809 group,
1810 queue
1811 ))
1812 })
1813}
1814
1815pub(super) fn load_ack_entries(
1816 store: &UnifiedStore,
1817 queue: &str,
1818 group: Option<&str>,
1819 message_id: Option<EntityId>,
1820) -> RedDBResult<Vec<QueueAckEntry>> {
1821 let Some(manager) = store.get_collection(QUEUE_META_COLLECTION) else {
1822 return Ok(Vec::new());
1823 };
1824 Ok(manager
1825 .query_all(|entity| {
1826 entity.data.as_row().is_some_and(|row| {
1827 row_text(row, "kind").as_deref() == Some("queue_ack")
1828 && row_text(row, "queue").as_deref() == Some(queue)
1829 && group
1830 .map(|group_name| row_text(row, "group").as_deref() == Some(group_name))
1831 .unwrap_or(true)
1832 && message_id
1833 .map(|candidate| row_u64(row, "message_id") == Some(candidate.raw()))
1834 .unwrap_or(true)
1835 })
1836 })
1837 .into_iter()
1838 .filter_map(|entity| {
1839 let row = entity.data.as_row()?;
1840 Some(QueueAckEntry {
1841 entity_id: entity.id,
1842 group: row_text(row, "group")?,
1843 message_id: EntityId::new(row_u64(row, "message_id")?),
1844 })
1845 })
1846 .collect())
1847}
1848
1849pub(super) fn save_queue_ack(
1850 store: &UnifiedStore,
1851 queue: &str,
1852 group: &str,
1853 message_id: EntityId,
1854) -> RedDBResult<()> {
1855 let existing = load_ack_entries(store, queue, Some(group), Some(message_id))?;
1856 if !existing.is_empty() {
1857 return Ok(());
1858 }
1859
1860 let mut fields = HashMap::new();
1861 fields.insert("kind".to_string(), Value::text("queue_ack".to_string()));
1862 fields.insert("queue".to_string(), Value::text(queue.to_string()));
1863 fields.insert("group".to_string(), Value::text(group.to_string()));
1864 fields.insert(
1865 "message_id".to_string(),
1866 Value::UnsignedInteger(message_id.raw()),
1867 );
1868 fields.insert("acked_at_ns".to_string(), Value::UnsignedInteger(now_ns()));
1869 insert_meta_row(store, fields)
1870}
1871
1872pub(super) fn queue_message_completed_for_all_groups(
1873 store: &UnifiedStore,
1874 queue: &str,
1875 message_id: EntityId,
1876) -> RedDBResult<bool> {
1877 let groups = load_queue_groups(store, queue)?;
1878 let pending = load_pending_entries(store, queue, None, Some(message_id))?;
1879 if !pending.is_empty() {
1880 return Ok(false);
1881 }
1882 if groups.is_empty() {
1883 return Ok(true);
1884 }
1885
1886 let acked_groups = load_ack_entries(store, queue, None, Some(message_id))?
1887 .into_iter()
1888 .map(|entry| entry.group)
1889 .collect::<HashSet<_>>();
1890 Ok(groups
1891 .into_iter()
1892 .all(|group| acked_groups.contains(&group.group)))
1893}
1894
1895fn load_queue_message_views(
1896 store: &UnifiedStore,
1897 queue: &str,
1898) -> RedDBResult<Vec<QueueMessageView>> {
1899 load_queue_message_views_with_runtime(None, store, queue)
1900}
1901
1902pub(super) fn load_queue_message_views_with_runtime(
1909 runtime: Option<&RedDBRuntime>,
1910 store: &UnifiedStore,
1911 queue: &str,
1912) -> RedDBResult<Vec<QueueMessageView>> {
1913 let manager = store
1914 .get_collection(queue)
1915 .ok_or_else(|| RedDBError::NotFound(format!("queue '{}' not found", queue)))?;
1916 let snap_ctx = crate::runtime::impl_core::capture_current_snapshot();
1920 let rls_filter = runtime.and_then(|rt| {
1921 crate::runtime::impl_core::rls_policy_filter_for_kind(
1922 rt,
1923 queue,
1924 crate::storage::query::ast::PolicyAction::Select,
1925 crate::storage::query::ast::PolicyTargetKind::Messages,
1926 )
1927 });
1928 let rls_enabled_but_denied = runtime.map(|rt| rt.is_rls_enabled(queue)).unwrap_or(false)
1929 && rls_filter.is_none()
1930 && runtime.is_some();
1931 if rls_enabled_but_denied {
1932 return Ok(Vec::new());
1934 }
1935 let filter_arc = rls_filter.map(std::sync::Arc::new);
1936 let rt_arc = runtime;
1937 Ok(manager
1938 .query_all(move |entity| {
1939 if !matches!(entity.kind, EntityKind::QueueMessage { .. }) {
1940 return false;
1941 }
1942 if !crate::runtime::impl_core::entity_visible_with_context(snap_ctx.as_ref(), entity) {
1943 return false;
1944 }
1945 if let (Some(filter), Some(rt)) = (filter_arc.as_ref(), rt_arc) {
1946 return crate::runtime::query_exec::evaluate_entity_filter_with_db(
1947 Some(&rt.inner.db),
1948 entity,
1949 filter,
1950 queue,
1951 queue,
1952 );
1953 }
1954 true
1955 })
1956 .into_iter()
1957 .filter_map(queue_message_view_from_entity)
1958 .map(|mut view| {
1959 view.available_at_ns = read_message_available_at_ns(store, queue, view.id);
1960 view
1961 })
1962 .collect())
1963}
1964
1965fn queue_message_view_from_entity(entity: UnifiedEntity) -> Option<QueueMessageView> {
1966 let (position, _) = match &entity.kind {
1967 EntityKind::QueueMessage { position, queue } => (*position, queue),
1968 _ => return None,
1969 };
1970 let data = match entity.data {
1971 EntityData::QueueMessage(data) => data,
1972 _ => return None,
1973 };
1974 Some(QueueMessageView {
1975 id: entity.id,
1976 position,
1977 priority: data.priority.unwrap_or(0),
1978 payload: data.payload,
1979 attempts: data.attempts,
1980 max_attempts: data.max_attempts,
1981 enqueued_at_ns: data.enqueued_at_ns,
1982 available_at_ns: None,
1983 })
1984}
1985
1986pub(super) fn insert_moved_queue_message_payload(
1993 store: &UnifiedStore,
1994 queue: &str,
1995 payload: &Value,
1996) -> RedDBResult<EntityId> {
1997 let config = load_queue_config(store, queue);
1998 let position = next_queue_position(store, queue, QueueSide::Right)?;
1999 let enqueued_at_ns = std::time::SystemTime::now()
2000 .duration_since(std::time::UNIX_EPOCH)
2001 .map(|d| d.as_nanos() as u64)
2002 .unwrap_or(0);
2003 let entity = UnifiedEntity::new(
2004 EntityId::new(0),
2005 EntityKind::QueueMessage {
2006 queue: queue.to_string(),
2007 position,
2008 },
2009 EntityData::QueueMessage(QueueMessageData {
2010 payload: payload.clone(),
2011 priority: None,
2012 enqueued_at_ns,
2013 attempts: 0,
2014 max_attempts: config.max_attempts,
2015 acked: false,
2016 }),
2017 );
2018 let id = store
2019 .insert_auto(queue, entity)
2020 .map_err(|err| RedDBError::Internal(err.to_string()))?;
2021 if let Some(ttl_ms) = config.ttl_ms {
2022 store
2023 .set_metadata(queue, id, queue_message_ttl_metadata(ttl_ms))
2024 .map_err(|err| RedDBError::Internal(err.to_string()))?;
2025 }
2026 Ok(id)
2027}
2028
2029fn insert_moved_queue_message(
2030 store: &UnifiedStore,
2031 queue: &str,
2032 config: &QueueRuntimeConfig,
2033 message: &QueueMessageView,
2034) -> RedDBResult<EntityId> {
2035 let position = next_queue_position(store, queue, QueueSide::Right)?;
2036 let entity = UnifiedEntity::new(
2037 EntityId::new(0),
2038 EntityKind::QueueMessage {
2039 queue: queue.to_string(),
2040 position,
2041 },
2042 EntityData::QueueMessage(QueueMessageData {
2043 payload: message.payload.clone(),
2044 priority: if config.priority {
2045 Some(message.priority)
2046 } else {
2047 None
2048 },
2049 enqueued_at_ns: message.enqueued_at_ns,
2050 attempts: message.attempts,
2051 max_attempts: message.max_attempts,
2052 acked: false,
2053 }),
2054 );
2055 let id = store
2056 .insert_auto(queue, entity)
2057 .map_err(|err| RedDBError::Internal(err.to_string()))?;
2058 if let Some(ttl_ms) = config.ttl_ms {
2059 store
2060 .set_metadata(queue, id, queue_message_ttl_metadata(ttl_ms))
2061 .map_err(|err| RedDBError::Internal(err.to_string()))?;
2062 }
2063 Ok(id)
2064}
2065
/// Returns the column names projected when a queue `SELECT` names none.
fn queue_projection_default_columns() -> Vec<String> {
    const DEFAULT_COLUMNS: [&str; 9] = [
        "id",
        "payload",
        "priority",
        "attempts",
        "last_error",
        "enqueued_at",
        "available_at",
        "dlq",
        "tenant",
    ];
    DEFAULT_COLUMNS
        .iter()
        .map(|name| (*name).to_owned())
        .collect()
}
2082
2083fn queue_projection_record(
2084 columns: &[String],
2085 message: &QueueMessageView,
2086 dlq: bool,
2087) -> RedDBResult<UnifiedRecord> {
2088 let mut record = UnifiedRecord::new();
2089 for column in columns {
2090 let value = queue_projection_value(message, dlq, column).ok_or_else(|| {
2091 RedDBError::Query(format!("unknown queue projection column '{}'", column))
2092 })?;
2093 record.set(column, value);
2094 }
2095 Ok(record)
2096}
2097
2098fn queue_projection_value(message: &QueueMessageView, dlq: bool, column: &str) -> Option<Value> {
2099 match column {
2100 "id" => Some(Value::text(message_id_string(message.id))),
2101 "payload" => Some(message.payload.clone()),
2102 "priority" => Some(Value::Integer(i64::from(message.priority))),
2103 "attempts" => Some(Value::UnsignedInteger(u64::from(message.attempts))),
2104 "last_error" => Some(Value::Null),
2105 "enqueued_at" => Some(Value::UnsignedInteger(message.enqueued_at_ns)),
2106 "available_at" => Some(Value::UnsignedInteger(
2107 message.available_at_ns.unwrap_or(message.enqueued_at_ns),
2108 )),
2109 "dlq" => Some(Value::Boolean(dlq)),
2110 "tenant" => queue_message_tenant(&message.payload).or(Some(Value::Null)),
2111 _ => None,
2112 }
2113}
2114
2115fn queue_message_tenant(payload: &Value) -> Option<Value> {
2116 let Value::Json(bytes) = payload else {
2117 return None;
2118 };
2119 let json: crate::json::Value = crate::json::from_slice(bytes).ok()?;
2120 json.get("tenant")
2121 .and_then(crate::json::Value::as_str)
2122 .map(|tenant| Value::text(tenant.to_string()))
2123}
2124
2125fn queue_is_dead_letter_target(store: &UnifiedStore, queue: &str) -> bool {
2126 let Some(manager) = store.get_collection(QUEUE_META_COLLECTION) else {
2127 return false;
2128 };
2129 !manager
2130 .query_all(|entity| {
2131 entity.data.as_row().is_some_and(|row| {
2132 row_text(row, "kind").as_deref() == Some("queue_config")
2133 && row_text(row, "dlq").as_deref() == Some(queue)
2134 })
2135 })
2136 .is_empty()
2137}
2138
2139fn queue_message_matches_filter(message: &QueueMessageView, dlq: bool, filter: &Filter) -> bool {
2140 match filter {
2141 Filter::Compare { field, op, value } => queue_filter_field_value(message, dlq, field)
2142 .is_some_and(|candidate| queue_compare_values(&candidate, value, *op)),
2143 Filter::CompareFields { left, op, right } => {
2144 match (
2145 queue_filter_field_value(message, dlq, left),
2146 queue_filter_field_value(message, dlq, right),
2147 ) {
2148 (Some(left), Some(right)) => queue_compare_values(&left, &right, *op),
2149 _ => false,
2150 }
2151 }
2152 Filter::And(left, right) => {
2153 queue_message_matches_filter(message, dlq, left)
2154 && queue_message_matches_filter(message, dlq, right)
2155 }
2156 Filter::Or(left, right) => {
2157 queue_message_matches_filter(message, dlq, left)
2158 || queue_message_matches_filter(message, dlq, right)
2159 }
2160 Filter::Not(inner) => !queue_message_matches_filter(message, dlq, inner),
2161 Filter::IsNull(field) => queue_filter_field_value(message, dlq, field)
2162 .is_none_or(|value| matches!(value, Value::Null)),
2163 Filter::IsNotNull(field) => queue_filter_field_value(message, dlq, field)
2164 .is_some_and(|value| !matches!(value, Value::Null)),
2165 Filter::In { field, values } => {
2166 queue_filter_field_value(message, dlq, field).is_some_and(|candidate| {
2167 values
2168 .iter()
2169 .any(|value| queue_values_equal(&candidate, value))
2170 })
2171 }
2172 Filter::Between { field, low, high } => queue_filter_field_value(message, dlq, field)
2173 .is_some_and(|candidate| {
2174 queue_compare_values(&candidate, low, CompareOp::Ge)
2175 && queue_compare_values(&candidate, high, CompareOp::Le)
2176 }),
2177 Filter::Like { field, pattern } => queue_filter_text(message, dlq, field)
2178 .is_some_and(|value| queue_like_matches(&value, pattern)),
2179 Filter::StartsWith { field, prefix } => {
2180 queue_filter_text(message, dlq, field).is_some_and(|value| value.starts_with(prefix))
2181 }
2182 Filter::EndsWith { field, suffix } => {
2183 queue_filter_text(message, dlq, field).is_some_and(|value| value.ends_with(suffix))
2184 }
2185 Filter::Contains { field, substring } => {
2186 queue_filter_text(message, dlq, field).is_some_and(|value| value.contains(substring))
2187 }
2188 Filter::CompareExpr { .. } => false,
2189 }
2190}
2191
2192fn queue_filter_field_value(
2193 message: &QueueMessageView,
2194 dlq: bool,
2195 field: &FieldRef,
2196) -> Option<Value> {
2197 match field {
2198 FieldRef::TableColumn { table, column } if table.is_empty() => {
2199 queue_projection_value(message, dlq, column)
2200 .or_else(|| queue_payload_field_value(&message.payload, column))
2201 }
2202 FieldRef::TableColumn { column, .. } => queue_projection_value(message, dlq, column)
2203 .or_else(|| queue_payload_field_value(&message.payload, column)),
2204 _ => None,
2205 }
2206}
2207
2208fn queue_payload_field_value(payload: &Value, field: &str) -> Option<Value> {
2209 let Value::Json(bytes) = payload else {
2210 return None;
2211 };
2212 let json: crate::json::Value = crate::json::from_slice(bytes).ok()?;
2213 let value = json.get(field)?;
2214 json_value_to_schema_value(value)
2215}
2216
2217fn json_value_to_schema_value(value: &crate::json::Value) -> Option<Value> {
2218 if matches!(value, crate::json::Value::Null) {
2219 Some(Value::Null)
2220 } else if let Some(value) = value.as_bool() {
2221 Some(Value::Boolean(value))
2222 } else if let Some(value) = value.as_i64() {
2223 Some(Value::Integer(value))
2224 } else if let Some(value) = value.as_u64() {
2225 Some(Value::UnsignedInteger(value))
2226 } else if let Some(value) = value.as_f64() {
2227 Some(Value::Float(value))
2228 } else if let Some(value) = value.as_str() {
2229 Some(Value::text(value.to_string()))
2230 } else {
2231 Some(Value::Json(value.to_string_compact().into_bytes()))
2232 }
2233}
2234
2235fn queue_filter_text(message: &QueueMessageView, dlq: bool, field: &FieldRef) -> Option<String> {
2236 queue_filter_field_value(message, dlq, field).and_then(|value| match value {
2237 Value::Text(value) => Some(value.to_string()),
2238 Value::NodeRef(value) | Value::EdgeRef(value) | Value::TableRef(value) => Some(value),
2239 Value::Integer(value) => Some(value.to_string()),
2240 Value::UnsignedInteger(value) => Some(value.to_string()),
2241 Value::Float(value) => Some(value.to_string()),
2242 Value::Boolean(value) => Some(value.to_string()),
2243 _ => None,
2244 })
2245}
2246
2247fn queue_compare_values(left: &Value, right: &Value, op: CompareOp) -> bool {
2248 match op {
2249 CompareOp::Eq => queue_values_equal(left, right),
2250 CompareOp::Ne => !queue_values_equal(left, right),
2251 CompareOp::Lt => queue_partial_cmp(left, right).is_some_and(|ord| ord.is_lt()),
2252 CompareOp::Le => queue_partial_cmp(left, right).is_some_and(|ord| !ord.is_gt()),
2253 CompareOp::Gt => queue_partial_cmp(left, right).is_some_and(|ord| ord.is_gt()),
2254 CompareOp::Ge => queue_partial_cmp(left, right).is_some_and(|ord| !ord.is_lt()),
2255 }
2256}
2257
2258fn queue_values_equal(left: &Value, right: &Value) -> bool {
2259 if let (Some(left), Some(right)) = (queue_value_number(left), queue_value_number(right)) {
2260 return (left - right).abs() < f64::EPSILON;
2261 }
2262 match (left, right) {
2263 (Value::Text(left), Value::Text(right)) => left == right,
2264 (Value::Boolean(left), Value::Boolean(right)) => left == right,
2265 _ => left == right,
2266 }
2267}
2268
2269fn queue_partial_cmp(left: &Value, right: &Value) -> Option<std::cmp::Ordering> {
2270 if let (Some(left), Some(right)) = (queue_value_number(left), queue_value_number(right)) {
2271 return left.partial_cmp(&right);
2272 }
2273 match (left, right) {
2274 (Value::Text(left), Value::Text(right)) => Some(left.cmp(right)),
2275 _ => None,
2276 }
2277}
2278
2279fn queue_value_number(value: &Value) -> Option<f64> {
2280 match value {
2281 Value::Integer(value) => Some(*value as f64),
2282 Value::UnsignedInteger(value) => Some(*value as f64),
2283 Value::Float(value) => Some(*value),
2284 Value::Text(value) => value.parse().ok(),
2285 _ => None,
2286 }
2287}
2288
/// Matches `value` against a LIKE-style `pattern` in which `%` stands for any
/// run of characters (including none).
///
/// `%` may appear anywhere in the pattern, any number of times: the text
/// before the first `%` must be a prefix of `value`, the text after the last
/// `%` must be a suffix of what remains, and the pieces in between must occur
/// in order without overlapping. A pattern without `%` must equal `value`
/// exactly.
///
/// All other characters — including `_` — are matched literally, and the
/// comparison is case-sensitive.
fn queue_like_matches(value: &str, pattern: &str) -> bool {
    let mut segments = pattern.split('%');

    // `split` always yields at least one piece; the first one is anchored at
    // the start of `value`.
    let head = segments.next().unwrap_or("");
    let Some(mut remainder) = value.strip_prefix(head) else {
        return false;
    };

    // The last piece is anchored at the end. If there is none, the pattern
    // contained no `%` and must match the whole value.
    let Some(tail) = segments.next_back() else {
        return remainder.is_empty();
    };

    // Interior pieces: take the leftmost occurrence each time, which leaves
    // the longest possible remainder for the pieces that follow.
    for segment in segments {
        match remainder.find(segment) {
            Some(at) => remainder = &remainder[at + segment.len()..],
            None => return false,
        }
    }
    remainder.ends_with(tail)
}
2303
2304pub(super) fn queue_message_view_by_id(
2305 store: &UnifiedStore,
2306 queue: &str,
2307 message_id: EntityId,
2308) -> RedDBResult<Option<QueueMessageView>> {
2309 let manager = queue_manager(store, queue)?;
2310 Ok(manager
2311 .get(message_id)
2312 .and_then(queue_message_view_from_entity)
2313 .map(|mut view| {
2314 view.available_at_ns = read_message_available_at_ns(store, queue, view.id);
2315 view
2316 }))
2317}
2318
2319pub(super) fn sort_queue_messages(
2320 messages: &mut [QueueMessageView],
2321 config: &QueueRuntimeConfig,
2322 side: QueueSide,
2323) {
2324 messages.sort_by(|left, right| {
2325 if config.priority {
2326 right
2327 .priority
2328 .cmp(&left.priority)
2329 .then_with(|| match side {
2330 QueueSide::Left => left.position.cmp(&right.position),
2331 QueueSide::Right => right.position.cmp(&left.position),
2332 })
2333 .then_with(|| left.id.raw().cmp(&right.id.raw()))
2334 } else {
2335 match side {
2336 QueueSide::Left => left.position.cmp(&right.position),
2337 QueueSide::Right => right.position.cmp(&left.position),
2338 }
2339 .then_with(|| left.id.raw().cmp(&right.id.raw()))
2340 }
2341 });
2342}
2343
2344pub(super) fn next_queue_position(
2345 store: &UnifiedStore,
2346 queue: &str,
2347 side: QueueSide,
2348) -> RedDBResult<u64> {
2349 let messages = load_queue_message_views(store, queue)?;
2350 if messages.is_empty() {
2351 return Ok(QUEUE_POSITION_CENTER);
2352 }
2353 match side {
2354 QueueSide::Left => Ok(messages
2355 .iter()
2356 .map(|message| message.position)
2357 .min()
2358 .unwrap_or(QUEUE_POSITION_CENTER)
2359 .saturating_sub(1)),
2360 QueueSide::Right => Ok(messages
2361 .iter()
2362 .map(|message| message.position)
2363 .max()
2364 .unwrap_or(QUEUE_POSITION_CENTER)
2365 .saturating_add(1)),
2366 }
2367}
2368
2369pub(super) fn increment_queue_attempts(
2370 store: &UnifiedStore,
2371 queue: &str,
2372 message_id: EntityId,
2373) -> RedDBResult<u32> {
2374 let manager = queue_manager(store, queue)?;
2375 let mut entity = manager
2376 .get(message_id)
2377 .ok_or_else(|| RedDBError::NotFound(format!("message '{}' not found", message_id.raw())))?;
2378 match &mut entity.data {
2379 EntityData::QueueMessage(message) => {
2380 message.attempts = message.attempts.saturating_add(1);
2381 let attempts = message.attempts;
2382 manager
2383 .update(entity)
2384 .map_err(|err| RedDBError::Internal(err.to_string()))?;
2385 Ok(attempts)
2386 }
2387 _ => Err(RedDBError::Query(format!(
2388 "entity '{}' is not a queue message",
2389 message_id.raw()
2390 ))),
2391 }
2392}
2393
2394pub(super) fn queue_message_attempts(
2395 store: &UnifiedStore,
2396 queue: &str,
2397 message_id: EntityId,
2398) -> RedDBResult<u32> {
2399 Ok(queue_message_data(store, queue, message_id)?.attempts)
2400}
2401
2402pub(super) fn queue_message_max_attempts(
2403 store: &UnifiedStore,
2404 queue: &str,
2405 message_id: EntityId,
2406) -> RedDBResult<u32> {
2407 Ok(queue_message_data(store, queue, message_id)?.max_attempts)
2408}
2409
2410pub(super) fn queue_message_payload(
2411 store: &UnifiedStore,
2412 queue: &str,
2413 message_id: EntityId,
2414) -> RedDBResult<Value> {
2415 Ok(queue_message_data(store, queue, message_id)?.payload)
2416}
2417
2418pub(super) fn queue_message_pending_any(
2419 store: &UnifiedStore,
2420 queue: &str,
2421 message_id: EntityId,
2422) -> RedDBResult<bool> {
2423 Ok(!load_pending_entries(store, queue, None, Some(message_id))?.is_empty())
2424}
2425
2426pub(super) fn queue_message_pending_for_group(
2427 store: &UnifiedStore,
2428 queue: &str,
2429 group: &str,
2430 message_id: EntityId,
2431) -> RedDBResult<bool> {
2432 Ok(!load_pending_entries(store, queue, Some(group), Some(message_id))?.is_empty())
2433}
2434
2435pub(super) fn queue_message_acked_for_group(
2436 store: &UnifiedStore,
2437 queue: &str,
2438 group: &str,
2439 message_id: EntityId,
2440) -> RedDBResult<bool> {
2441 Ok(!load_ack_entries(store, queue, Some(group), Some(message_id))?.is_empty())
2442}
2443
2444fn queue_manager(
2445 store: &UnifiedStore,
2446 queue: &str,
2447) -> RedDBResult<Arc<crate::storage::unified::SegmentManager>> {
2448 store
2449 .get_collection(queue)
2450 .ok_or_else(|| RedDBError::NotFound(format!("queue '{}' not found", queue)))
2451}
2452
2453pub(super) fn queue_message_data(
2454 store: &UnifiedStore,
2455 queue: &str,
2456 message_id: EntityId,
2457) -> RedDBResult<QueueMessageData> {
2458 let manager = queue_manager(store, queue)?;
2459 let entity = manager
2460 .get(message_id)
2461 .ok_or_else(|| RedDBError::NotFound(format!("message '{}' not found", message_id.raw())))?;
2462 match entity.data {
2463 EntityData::QueueMessage(message) => Ok(message),
2464 _ => Err(RedDBError::Query(format!(
2465 "entity '{}' is not a queue message",
2466 message_id.raw()
2467 ))),
2468 }
2469}
2470
2471fn insert_meta_row(store: &UnifiedStore, fields: HashMap<String, Value>) -> RedDBResult<()> {
2472 let _ = store.get_or_create_collection(QUEUE_META_COLLECTION);
2473 store
2474 .insert_auto(
2475 QUEUE_META_COLLECTION,
2476 UnifiedEntity::new(
2477 EntityId::new(0),
2478 EntityKind::TableRow {
2479 table: Arc::from(QUEUE_META_COLLECTION),
2480 row_id: 0,
2481 },
2482 EntityData::Row(RowData {
2483 columns: Vec::new(),
2484 named: Some(fields),
2485 schema: None,
2486 }),
2487 ),
2488 )
2489 .map_err(|err| RedDBError::Internal(err.to_string()))?;
2490 Ok(())
2491}
2492
2493pub(super) fn remove_meta_rows(store: &UnifiedStore, predicate: impl Fn(&RowData) -> bool + Sync) {
2494 let Some(manager) = store.get_collection(QUEUE_META_COLLECTION) else {
2495 return;
2496 };
2497 let rows = manager.query_all(|entity| entity.data.as_row().is_some_and(&predicate));
2498 for row in rows {
2499 let _ = store.delete(QUEUE_META_COLLECTION, row.id);
2500 }
2501}
2502
2503pub(super) fn delete_meta_entity(store: &UnifiedStore, entity_id: EntityId) {
2504 let _ = store.delete(QUEUE_META_COLLECTION, entity_id);
2505}
2506
2507fn queue_message_lock_key(queue: &str, message_id: EntityId) -> String {
2508 format!("{queue}:{}", message_id.raw())
2509}
2510
2511pub(super) fn queue_message_lock_handle(
2512 runtime: &RedDBRuntime,
2513 queue: &str,
2514 message_id: EntityId,
2515) -> Arc<parking_lot::Mutex<()>> {
2516 let key = queue_message_lock_key(queue, message_id);
2517 if let Some(lock) = runtime.inner.queue_message_locks.read().get(&key).cloned() {
2518 return lock;
2519 }
2520
2521 let mut locks = runtime.inner.queue_message_locks.write();
2522 locks
2523 .entry(key)
2524 .or_insert_with(|| Arc::new(parking_lot::Mutex::new(())))
2525 .clone()
2526}
2527
2528pub(super) fn forget_queue_message_lock(runtime: &RedDBRuntime, queue: &str, message_id: EntityId) {
2529 runtime
2530 .inner
2531 .queue_message_locks
2532 .write()
2533 .remove(&queue_message_lock_key(queue, message_id));
2534}
2535
2536fn parse_message_id(value: &str) -> RedDBResult<EntityId> {
2537 let raw = value.strip_prefix('e').unwrap_or(value);
2538 raw.parse::<u64>()
2539 .map(EntityId::new)
2540 .map_err(|_| RedDBError::Query(format!("invalid message id '{}'", value)))
2541}
2542
2543pub(super) fn resolve_ack_nack_handle(
2549 store: &UnifiedStore,
2550 queue: &str,
2551 group_hint: &str,
2552 message_id_hint: &str,
2553 delivery_id: Option<&str>,
2554) -> RedDBResult<(String, EntityId)> {
2555 if let Some(did) = delivery_id {
2556 return resolve_delivery_id(store, queue, did);
2557 }
2558 if group_hint.is_empty() || message_id_hint.is_empty() {
2559 return Err(RedDBError::Query(
2560 "ACK/NACK requires either GROUP <group> '<message_id>' or WITH delivery_id = '<id>'"
2561 .to_string(),
2562 ));
2563 }
2564 log_tuple_deprecation(queue);
2565 let entity = parse_message_id(message_id_hint)?;
2566 Ok((group_hint.to_string(), entity))
2567}
2568
2569fn resolve_delivery_id(
2570 store: &UnifiedStore,
2571 queue: &str,
2572 delivery_id: &str,
2573) -> RedDBResult<(String, EntityId)> {
2574 let Some(manager) = store.get_collection(QUEUE_META_COLLECTION) else {
2575 return Err(RedDBError::Query(format!(
2576 "delivery_id '{}' does not resolve to a live pending delivery",
2577 delivery_id
2578 )));
2579 };
2580 for entity in manager.query_all(|entity| {
2581 entity.data.as_row().is_some_and(|row| {
2582 row_text(row, "kind").as_deref() == Some("queue_pending_lc")
2583 && row_text(row, "delivery_id").as_deref() == Some(delivery_id)
2584 })
2585 }) {
2586 if let Some(row) = entity.data.as_row() {
2587 let row_queue = row_text(row, "queue").unwrap_or_default();
2588 let row_group = row_text(row, "group").unwrap_or_default();
2589 let row_message = row_u64(row, "message_id").unwrap_or(0);
2590 if row_queue != queue {
2591 return Err(RedDBError::Query(format!(
2592 "delivery_id '{}' belongs to queue '{}', not '{}'",
2593 delivery_id, row_queue, queue
2594 )));
2595 }
2596 return Ok((row_group, EntityId::new(row_message)));
2597 }
2598 }
2599 Err(RedDBError::Query(format!(
2600 "delivery_id '{}' does not resolve to a live pending delivery",
2601 delivery_id
2602 )))
2603}
2604
2605fn log_tuple_deprecation(queue: &str) {
2608 use std::sync::atomic::Ordering;
2609 use std::sync::{Mutex, OnceLock};
2610 use std::time::Instant;
2611
2612 static LAST_EMIT: OnceLock<Mutex<HashMap<(u64, String), Instant>>> = OnceLock::new();
2613 const COOLDOWN: std::time::Duration = std::time::Duration::from_secs(60);
2614
2615 let map = LAST_EMIT.get_or_init(|| Mutex::new(HashMap::new()));
2616 let key = (super::impl_core::current_connection_id(), queue.to_string());
2617 let now = Instant::now();
2618 let mut guard = match map.lock() {
2619 Ok(g) => g,
2620 Err(_) => return,
2621 };
2622 let should_emit =
2623 !matches!(guard.get(&key), Some(prev) if now.duration_since(*prev) < COOLDOWN);
2624 if should_emit {
2625 guard.insert(key.clone(), now);
2626 drop(guard);
2627 TUPLE_DEPRECATION_EMITS.fetch_add(1, Ordering::Relaxed);
2628 tracing::warn!(
2629 target: "reddb::queue_lifecycle",
2630 queue = queue,
2631 connection_id = key.0,
2632 "ACK/NACK by (queue, group, message_id) tuple is deprecated; \
2633 switch to the server-issued delivery_id (ADR 0026). \
2634 The tuple path will be removed one minor release after introduction.",
2635 );
2636 }
2637}
2638
/// Process-wide count of tuple-path deprecation warnings emitted so far.
///
/// `log_tuple_deprecation` increments it (relaxed ordering) each time it
/// actually logs a warning; suppressed calls do not count.
pub static TUPLE_DEPRECATION_EMITS: std::sync::atomic::AtomicU64 =
    std::sync::atomic::AtomicU64::new(0);
2646
2647fn message_id_string(message_id: EntityId) -> String {
2648 message_id.raw().to_string()
2649}
2650
2651pub(crate) fn pending_counts_by_group(
2656 store: &UnifiedStore,
2657) -> std::collections::BTreeMap<(String, String), u64> {
2658 let mut counts: std::collections::BTreeMap<(String, String), u64> =
2659 std::collections::BTreeMap::new();
2660 let Some(manager) = store.get_collection(QUEUE_META_COLLECTION) else {
2661 return counts;
2662 };
2663 for entity in manager.query_all(|entity| {
2664 entity
2665 .data
2666 .as_row()
2667 .is_some_and(|row| row_text(row, "kind").as_deref() == Some("queue_pending"))
2668 }) {
2669 if let Some(row) = entity.data.as_row() {
2670 let queue = row_text(row, "queue");
2671 let group = row_text(row, "group");
2672 if let (Some(q), Some(g)) = (queue, group) {
2673 *counts.entry((q, g)).or_insert(0) += 1;
2674 }
2675 }
2676 }
2677 counts
2678}
2679
2680pub(super) fn row_text(row: &RowData, field: &str) -> Option<String> {
2681 match row.get_field(field)?.clone() {
2682 Value::Text(value) => Some(value.to_string()),
2683 Value::NodeRef(value) => Some(value),
2684 Value::EdgeRef(value) => Some(value),
2685 Value::TableRef(value) => Some(value),
2686 _ => None,
2687 }
2688}
2689
2690pub(super) fn row_u64(row: &RowData, field: &str) -> Option<u64> {
2691 match row.get_field(field)?.clone() {
2692 Value::UnsignedInteger(value) => Some(value),
2693 Value::Integer(value) if value >= 0 => Some(value as u64),
2694 Value::Float(value) if value >= 0.0 => Some(value as u64),
2695 Value::Text(value) => value.parse().ok(),
2696 _ => None,
2697 }
2698}
2699
2700fn row_bool(row: &RowData, field: &str) -> Option<bool> {
2701 match row.get_field(field)?.clone() {
2702 Value::Boolean(value) => Some(value),
2703 Value::Text(value) => match value.to_ascii_lowercase().as_str() {
2704 "true" => Some(true),
2705 "false" => Some(false),
2706 _ => None,
2707 },
2708 _ => None,
2709 }
2710}
2711
2712fn queue_collection_contract(
2713 name: &str,
2714 priority: bool,
2715 ttl_ms: Option<u64>,
2716) -> crate::physical::CollectionContract {
2717 let now = current_unix_ms();
2718 let mut context_index_fields = Vec::new();
2719 if priority {
2720 context_index_fields.push("priority".to_string());
2721 }
2722
2723 crate::physical::CollectionContract {
2724 name: name.to_string(),
2725 declared_model: crate::catalog::CollectionModel::Queue,
2726 schema_mode: crate::catalog::SchemaMode::Dynamic,
2727 origin: crate::physical::ContractOrigin::Explicit,
2728 version: 1,
2729 created_at_unix_ms: now,
2730 updated_at_unix_ms: now,
2731 default_ttl_ms: ttl_ms,
2732 vector_dimension: None,
2733 vector_metric: None,
2734 context_index_fields,
2735 declared_columns: Vec::new(),
2736 table_def: None,
2737 timestamps_enabled: false,
2738 context_index_enabled: false,
2739 metrics_raw_retention_ms: None,
2740 metrics_rollup_policies: Vec::new(),
2741 metrics_tenant_identity: None,
2742 metrics_namespace: None,
2743 append_only: true,
2747 subscriptions: Vec::new(),
2748 analytics_config: Vec::new(),
2749 session_key: None,
2750 session_gap_ms: None,
2751 retention_duration_ms: None,
2752 }
2753}
2754
/// Milliseconds elapsed since the Unix epoch according to the system clock.
///
/// Yields `0` when the clock reads earlier than the epoch.
fn current_unix_ms() -> u128 {
    let since_epoch = std::time::SystemTime::now().duration_since(std::time::UNIX_EPOCH);
    match since_epoch {
        Ok(elapsed) => elapsed.as_millis(),
        Err(_) => 0,
    }
}
2761
2762pub(super) fn now_ns() -> u64 {
2763 std::time::SystemTime::now()
2764 .duration_since(std::time::UNIX_EPOCH)
2765 .unwrap_or_default()
2766 .as_nanos() as u64
2767}
2768
2769pub(super) fn queue_message_ttl_metadata(ttl_ms: u64) -> Metadata {
2770 queue_message_metadata(Some(ttl_ms), None)
2771}
2772
2773pub(super) fn queue_message_metadata(
2779 ttl_ms: Option<u64>,
2780 available_at_ns: Option<u64>,
2781) -> Metadata {
2782 let mut fields = HashMap::new();
2783 if let Some(ttl_ms) = ttl_ms {
2784 fields.insert(
2785 "_ttl_ms".to_string(),
2786 if ttl_ms <= i64::MAX as u64 {
2787 MetadataValue::Int(ttl_ms as i64)
2788 } else {
2789 MetadataValue::Timestamp(ttl_ms)
2790 },
2791 );
2792 }
2793 if let Some(at_ns) = available_at_ns {
2794 fields.insert(
2795 "_available_at_ns".to_string(),
2796 MetadataValue::Timestamp(at_ns),
2797 );
2798 }
2799 Metadata::with_fields(fields)
2800}
2801
2802pub(super) fn earliest_future_available_at(store: &UnifiedStore, queue: &str) -> Option<u64> {
2810 let now_ns = now_ns();
2811 let views = load_queue_message_views_with_runtime(None, store, queue).ok()?;
2812 views
2813 .iter()
2814 .filter_map(|v| v.available_at_ns)
2815 .filter(|at| *at > now_ns)
2816 .min()
2817}
2818
2819pub(super) fn set_message_available_at_ns(
2830 store: &UnifiedStore,
2831 queue: &str,
2832 message_id: EntityId,
2833 available_at_ns: Option<u64>,
2834 fallback_ttl_ms: Option<u64>,
2835) -> RedDBResult<()> {
2836 let existing_ttl_ms = store
2837 .get_metadata(queue, message_id)
2838 .and_then(|md| match md.get("_ttl_ms")? {
2839 MetadataValue::Int(i) if *i >= 0 => Some(*i as u64),
2840 MetadataValue::Timestamp(t) => Some(*t),
2841 _ => None,
2842 })
2843 .or(fallback_ttl_ms);
2844 let metadata = queue_message_metadata(existing_ttl_ms, available_at_ns);
2845 match store.set_metadata(queue, message_id, metadata) {
2846 Ok(()) => Ok(()),
2847 Err(crate::storage::StoreError::CollectionNotFound(_)) => Ok(()),
2848 Err(err) => Err(RedDBError::Internal(err.to_string())),
2849 }
2850}
2851
2852pub(super) fn read_message_available_at_ns(
2856 store: &UnifiedStore,
2857 queue: &str,
2858 message_id: EntityId,
2859) -> Option<u64> {
2860 let md = store.get_metadata(queue, message_id)?;
2861 match md.get("_available_at_ns")? {
2862 MetadataValue::Timestamp(t) => Some(*t),
2863 MetadataValue::Int(i) if *i >= 0 => Some(*i as u64),
2864 _ => None,
2865 }
2866}
2867
2868fn estimate_payload_bytes(payload: &Value) -> u64 {
2870 match payload {
2871 Value::Json(v) => v.len() as u64,
2872 Value::Text(s) => s.len() as u64,
2873 _ => 64,
2874 }
2875}
2876
#[cfg(test)]
mod presence_integration_tests {
    use super::*;
    use crate::storage::queue::presence::{PresenceState, DEFAULT_PRESENCE_TTL_MS};
    use crate::{RedDBOptions, RedDBRuntime};

    /// A `QUEUE READ` that has a message to deliver records exactly one
    /// active presence row for the reading consumer, and the per-group active
    /// count reports that consumer.
    #[test]
    fn queue_read_emits_consumer_presence_heartbeat() {
        let runtime = RedDBRuntime::with_options(RedDBOptions::in_memory()).unwrap();
        for statement in [
            "CREATE QUEUE tasks",
            "QUEUE GROUP CREATE tasks workers",
            "QUEUE PUSH tasks {'job':'a'}",
            "QUEUE READ tasks GROUP workers CONSUMER w1",
        ] {
            runtime.execute_query(statement).unwrap();
        }

        let presence = runtime.queue_consumer_presence_snapshot(DEFAULT_PRESENCE_TTL_MS);
        assert_eq!(presence.len(), 1, "exactly one heartbeat recorded");
        let heartbeat = &presence[0];
        assert_eq!(heartbeat.queue, "tasks");
        assert_eq!(heartbeat.group, "workers");
        assert_eq!(heartbeat.consumer, "w1");
        assert_eq!(heartbeat.state, PresenceState::Active);

        let active = runtime.queue_active_consumer_counts(DEFAULT_PRESENCE_TTL_MS);
        let group_key = ("tasks".to_string(), "workers".to_string());
        assert_eq!(
            active[&group_key], 1,
            "active count reflects the live consumer"
        );
    }

    /// A `QUEUE READ` against an empty queue still records the consumer as
    /// active, with a lease count of zero.
    #[test]
    fn empty_queue_read_still_heartbeats() {
        let runtime = RedDBRuntime::with_options(RedDBOptions::in_memory()).unwrap();
        for statement in [
            "CREATE QUEUE empty_q",
            "QUEUE GROUP CREATE empty_q workers",
            "QUEUE READ empty_q GROUP workers CONSUMER w1",
        ] {
            runtime.execute_query(statement).unwrap();
        }

        let presence = runtime.queue_consumer_presence_snapshot(DEFAULT_PRESENCE_TTL_MS);
        assert_eq!(
            presence.len(),
            1,
            "empty read still registers consumer presence"
        );
        let heartbeat = &presence[0];
        assert_eq!(heartbeat.state, PresenceState::Active);
        assert_eq!(
            heartbeat.lease_count, 0,
            "no messages delivered → zero leases"
        );
    }
}