Skip to main content

reddb_server/runtime/
impl_queue.rs

1//! Queue DDL and command execution
2
3use std::collections::{HashMap, HashSet};
4use std::sync::atomic::{AtomicU64, Ordering};
5use std::sync::Arc;
6
7use crate::runtime::audit_log::{AuditAuthSource, AuditEvent, AuditFieldEscaper, Outcome};
8use crate::runtime::impl_core::current_auth_identity;
9use crate::storage::queue::QueueMode;
10use crate::storage::unified::entity::{QueueMessageData, RowData};
11use crate::storage::unified::{Metadata, MetadataValue, UnifiedStore};
12
13use super::*;
14
15use super::primary_queue_store::PrimaryQueueStore;
16use super::queue_lifecycle::{QueueLifecycle, RetirementOutcome};
17use crate::storage::queue::lifecycle::{
18    QueueSide as LcQueueSide, QueueStore as _, QueueStoreError, QueueTxn,
19};
20
21/// Build a [`QueueLifecycle`] backed by a fresh [`PrimaryQueueStore`] for
22/// the given `queue`, plus a `QueueTxn` bound to the live runtime
23/// connection. The store inside the lifecycle and the standalone
24/// [`PrimaryQueueStore`] returned for ack/nack lookups share the same
25/// underlying [`UnifiedStore`] — calls against either are observable on
26/// the other.
27pub(super) fn runtime_lifecycle(
28    runtime: &RedDBRuntime,
29    queue: &str,
30) -> (
31    QueueLifecycle<PrimaryQueueStore>,
32    PrimaryQueueStore,
33    QueueTxn,
34) {
35    let primary_for_lookup = PrimaryQueueStore::new(runtime.clone());
36    let primary_for_lifecycle = PrimaryQueueStore::new(runtime.clone());
37    let txn = primary_for_lifecycle.new_txn();
38    let cfg = primary_for_lifecycle.lifecycle_config(queue);
39    (
40        QueueLifecycle::new(primary_for_lifecycle, cfg),
41        primary_for_lookup,
42        txn,
43    )
44}
45
/// Message carried by the `RedDBError::Query` returned when a parked
/// `QUEUE READ … WAIT` is cancelled because the wait registry was shut down
/// (Slice C of PRD #718). A plain query error is used so every transport
/// relays the text unchanged; the public error type has no dedicated
/// `Cancelled` variant today.
pub(crate) const QUEUE_READ_WAIT_CANCELLED: &str =
    "QUEUE READ WAIT cancelled — server shutting down";

/// `red.config` key holding the upper bound, in milliseconds, on a `WAIT`
/// budget (Slice B of PRD #718). Budgets above the cap are meant to be
/// rejected before any waiter is registered.
pub(crate) const QUEUE_MAX_WAIT_MS_CONFIG_KEY: &str = "red.config.queue.max_wait_ms";

/// Cap used when [`QUEUE_MAX_WAIT_MS_CONFIG_KEY`] is not set: one minute,
/// expressed in milliseconds.
pub(crate) const QUEUE_MAX_WAIT_MS_DEFAULT: u64 = 60 * 1_000;
62
63/// Slice C of PRD #718 — scope key for the queue wait registry.
64/// Today every connection in the process shares a single namespace;
65/// the helper exists so multi-tenant scoping (e.g. tenant id) can be
66/// threaded through later without touching every call site.
67pub(super) fn queue_wait_scope() -> String {
68    crate::runtime::impl_core::current_tenant().unwrap_or_default()
69}
70
71/// Convert a lifecycle `QueueSide` view into the AST flavour we accept
72/// from `QueueCommand` callers. Both enums are isomorphic but live in
73/// different modules.
74fn ast_side_to_lc(side: crate::storage::query::ast::QueueSide) -> LcQueueSide {
75    use crate::storage::query::ast::QueueSide as Ast;
76    match side {
77        Ast::Left => LcQueueSide::Left,
78        Ast::Right => LcQueueSide::Right,
79    }
80}
81
82/// Map a `QueueStoreError` (returned by lifecycle methods) onto the
83/// runtime-facing `RedDBError`. Mirrors the wire-error shapes the legacy
84/// `queue_delivery::*` helpers produced.
85fn map_qse(err: QueueStoreError) -> RedDBError {
86    match err {
87        QueueStoreError::UnknownDelivery(id) => RedDBError::NotFound(format!(
88            "delivery_id '{id}' does not resolve to a live pending delivery"
89        )),
90        QueueStoreError::UnknownQueue(q) => RedDBError::NotFound(format!("queue '{q}' not found")),
91        QueueStoreError::ReplicaImmutable => {
92            RedDBError::Internal("replica QueueStore is immutable".to_string())
93        }
94    }
95}
96
// ---------------------------------------------------------------------------
// Outbox metrics (exposed via /metrics)
// ---------------------------------------------------------------------------

/// Event pushes diverted to the outbox DLQ because the outbox exceeded its
/// hard byte cap or the target queue was at `max_size`.
pub static EVENTS_DRAIN_RETRIES_TOTAL: AtomicU64 = AtomicU64::new(0);

/// Events routed to a `<queue>_outbox_dlq` dead-letter queue (counted when
/// routing starts).
pub static EVENTS_DLQ_TOTAL: AtomicU64 = AtomicU64::new(0);

/// Events successfully inserted, either into their target queue or, when
/// diverted, into its outbox DLQ.
pub static EVENTS_ENQUEUED_TOTAL: AtomicU64 = AtomicU64::new(0);

/// Estimated outbox size (1 GiB) above which a warning is emitted.
const OUTBOX_WARN_BYTES: u64 = 1024 * 1024 * 1024;

/// Estimated outbox size (10 GiB) above which every new event goes straight
/// to the outbox DLQ.
const OUTBOX_MAX_BYTES: u64 = 10 * OUTBOX_WARN_BYTES;

/// Approximate running total of event payload bytes. It is not decremented
/// when messages are consumed.
/// NOTE(review): without a consume-side decrement, once lifetime event
/// volume passes `OUTBOX_MAX_BYTES` every event is diverted to the DLQ —
/// confirm whether something outside this view resets the counter.
static OUTBOX_APPROX_BYTES: AtomicU64 = AtomicU64::new(0);
119
/// Name of the system collection that presumably backs the queue config
/// helpers (`save_queue_config` / `load_queue_config`) — confirm in those
/// helpers, which are outside this view.
const QUEUE_META_COLLECTION: &str = "red_queue_meta";
/// Midpoint of the `u64` position space. Presumably the initial position,
/// so pushes on either side have room to grow — confirm in
/// `next_queue_position`.
const QUEUE_POSITION_CENTER: u64 = u64::MAX >> 1;
/// Reserved consumer-group name (presumably the implicit group for work mode).
const WORK_DEFAULT_GROUP: &str = "_work_default";
/// Prefix for reserved consumer-group names (presumably used by fan-out mode).
const FANOUT_GROUP_PREFIX: &str = "_fanout_";
124
125#[derive(Debug, Clone)]
126pub(super) struct QueueRuntimeConfig {
127    pub(super) mode: QueueMode,
128    pub(super) priority: bool,
129    pub(super) max_size: Option<usize>,
130    pub(super) ttl_ms: Option<u64>,
131    pub(super) dlq: Option<String>,
132    pub(super) max_attempts: u32,
133    pub(super) lock_deadline_ms: u64,
134    pub(super) in_flight_cap_per_group: u32,
135    /// Default retry delay (issue #723) applied to NACK-requeued
136    /// messages before they become re-deliverable. `None` keeps the
137    /// pre-#723 immediate-requeue behaviour. Overridden per-failure by
138    /// an authorized `NACK ... WITH DELAY <duration>`.
139    pub(super) retry_delay_ms: Option<u64>,
140}
141
142#[derive(Debug, Clone)]
143struct QueueGroupEntry {
144    entity_id: EntityId,
145    group: String,
146}
147
148#[derive(Debug, Clone)]
149pub(super) struct QueuePendingEntry {
150    pub(super) entity_id: EntityId,
151    group: String,
152    pub(super) message_id: EntityId,
153    consumer: String,
154    pub(super) delivered_at_ns: u64,
155    pub(super) delivery_count: u32,
156}
157
158#[derive(Debug, Clone)]
159pub(super) struct QueueAckEntry {
160    entity_id: EntityId,
161    group: String,
162    pub(super) message_id: EntityId,
163}
164
165#[derive(Debug, Clone)]
166pub(super) struct QueueMessageView {
167    pub(super) id: EntityId,
168    position: u64,
169    priority: i32,
170    pub(super) payload: Value,
171    attempts: u32,
172    pub(super) max_attempts: u32,
173    enqueued_at_ns: u64,
174    /// First-delivery instant for delayed messages (issue #722). `None`
175    /// means immediate availability. Sourced from the
176    /// `_available_at_ns` metadata field, populated on push.
177    pub(super) available_at_ns: Option<u64>,
178}
179
180impl QueueMessageView {
181    /// Whether this message is currently deliverable. Messages whose
182    /// `available_at_ns` lies in the future remain durable and
183    /// inspectable but are filtered out of `QUEUE READ` / `QUEUE POP`
184    /// projections.
185    pub(super) fn is_available_now(&self) -> bool {
186        match self.available_at_ns {
187            Some(at) => at <= now_ns(),
188            None => true,
189        }
190    }
191}
192
193impl RedDBRuntime {
194    /// Slice C of PRD #718 — non-blocking `group_read` plus optional
195    /// `WAIT <duration>` retry. When `wait_ms` is `None` this is the
196    /// pre-slice-C synchronous read. When `Some`, an immediate empty
197    /// projection parks the caller on the shared
198    /// [`crate::runtime::queue_wait_registry::QueueWaitRegistry`] and
199    /// retries on wake until the deadline. Timeout returns an empty
200    /// projection (zero records, no error). Shutdown cancellation
201    /// returns [`QUEUE_READ_WAIT_CANCELLED`].
202    pub(super) fn group_read_with_optional_wait(
203        &self,
204        queue: &str,
205        group: &str,
206        consumer: &str,
207        count: usize,
208        wait_ms: Option<u64>,
209    ) -> RedDBResult<Vec<crate::runtime::queue_lifecycle::DeliveredMessage>> {
210        let do_read =
211            |runtime: &RedDBRuntime| -> RedDBResult<Vec<crate::runtime::queue_lifecycle::DeliveredMessage>> {
212                let (lifecycle, _ps, txn) = runtime_lifecycle(runtime, queue);
213                lifecycle
214                    .group_read(&txn, queue, group, consumer, count)
215                    .map_err(map_qse)
216            };
217
218        let delivered = do_read(self)?;
219        let Some(wait_ms) = wait_ms else {
220            return Ok(delivered);
221        };
222        if !delivered.is_empty() {
223            return Ok(delivered);
224        }
225        // Empty under WAIT: park on the registry. WAIT 0 collapses to
226        // a single re-probe of the registry's current state — useful
227        // for tests but the timeout path returns immediately.
228        //
229        // Telemetry (slice D / PRD #718 / #729): we record exactly one
230        // `wait_started` increment at entry, and exactly one terminal
231        // outcome increment + histogram observation at exit, for the
232        // (scope, queue) labels. The histogram measures wall-clock
233        // started→resolved across all re-park iterations of this call.
234        let registry = self.queue_wait_registry();
235        let scope = queue_wait_scope();
236        let deadline = std::time::Instant::now() + std::time::Duration::from_millis(wait_ms);
237        let telemetry = self.queue_telemetry();
238        telemetry.record_wait_started(&scope, queue);
239        let wait_start = std::time::Instant::now();
240        let observe = |outcome: crate::runtime::queue_telemetry::WaitOutcomeLabel| {
241            let elapsed_ms = wait_start.elapsed().as_millis().min(u128::from(u64::MAX)) as u64;
242            telemetry.record_wait_outcome(&scope, queue, outcome, elapsed_ms);
243        };
244        loop {
245            // Snapshot BEFORE the re-probe so a notify that fires
246            // between the probe and the park bumps the generation and
247            // wait_until returns Woken without ever blocking.
248            let snapshot = registry.snapshot(&scope, queue);
249            let delivered = do_read(self)?;
250            if !delivered.is_empty() {
251                observe(crate::runtime::queue_telemetry::WaitOutcomeLabel::Woken);
252                return Ok(delivered);
253            }
254            if registry.is_cancelled() {
255                observe(crate::runtime::queue_telemetry::WaitOutcomeLabel::Cancelled);
256                return Err(RedDBError::Query(QUEUE_READ_WAIT_CANCELLED.to_string()));
257            }
258            if std::time::Instant::now() >= deadline {
259                observe(crate::runtime::queue_telemetry::WaitOutcomeLabel::Timeout);
260                return Ok(Vec::new());
261            }
262            // Issue #722: a delayed message becomes deliverable when its
263            // `available_at_ns` passes, but the registry only wakes on
264            // producer commits — a quiet queue with a future due-time
265            // would otherwise sit on the condvar until the user budget
266            // expired. Cap the park horizon at the soonest future
267            // `available_at_ns` so the next loop iteration probes the
268            // queue at-or-just-after the message becomes due. A
269            // `Timeout` from the capped park is not the final answer; we
270            // loop and re-probe before deciding the user budget is up.
271            let park_deadline = match earliest_future_available_at(&self.inner.db.store(), queue) {
272                Some(at_ns) => {
273                    let now_ns = now_ns();
274                    if at_ns <= now_ns {
275                        // Already due; re-probe immediately.
276                        deadline.min(std::time::Instant::now())
277                    } else {
278                        let wait_ns = at_ns - now_ns;
279                        let due_instant =
280                            std::time::Instant::now() + std::time::Duration::from_nanos(wait_ns);
281                        deadline.min(due_instant)
282                    }
283                }
284                None => deadline,
285            };
286            match registry.wait_until(&snapshot, park_deadline) {
287                crate::runtime::queue_wait_registry::WaitOutcome::Woken => continue,
288                crate::runtime::queue_wait_registry::WaitOutcome::Timeout => {
289                    // If this was the user-supplied deadline, give up;
290                    // otherwise loop and re-probe (a delayed message may
291                    // have just become due).
292                    if std::time::Instant::now() >= deadline {
293                        observe(crate::runtime::queue_telemetry::WaitOutcomeLabel::Timeout);
294                        return Ok(Vec::new());
295                    }
296                    continue;
297                }
298                crate::runtime::queue_wait_registry::WaitOutcome::Cancelled => {
299                    observe(crate::runtime::queue_telemetry::WaitOutcomeLabel::Cancelled);
300                    return Err(RedDBError::Query(QUEUE_READ_WAIT_CANCELLED.to_string()));
301                }
302            }
303        }
304    }
305
306    pub(crate) fn enqueue_event_payload(
307        &self,
308        queue: &str,
309        payload: Value,
310    ) -> RedDBResult<EntityId> {
311        let store = self.inner.db.store();
312        // Auto-create the queue if it does not exist yet.
313        if store.get_collection(queue).is_none() {
314            crate::runtime::impl_ddl::ensure_event_target_queue_pub(self, queue)?;
315        }
316
317        // Estimate payload bytes for outbox watermark checks.
318        let payload_bytes = estimate_payload_bytes(&payload);
319        let outbox_bytes = OUTBOX_APPROX_BYTES.fetch_add(payload_bytes, Ordering::Relaxed);
320
321        // Hard limit: route directly to DLQ without even trying.
322        if outbox_bytes > OUTBOX_MAX_BYTES {
323            OUTBOX_APPROX_BYTES.fetch_sub(payload_bytes, Ordering::Relaxed);
324            EVENTS_DRAIN_RETRIES_TOTAL.fetch_add(1, Ordering::Relaxed);
325            return self.route_event_to_outbox_dlq(queue, payload, "outbox_max_bytes_exceeded");
326        }
327
328        // Soft limit: warn once per crossing.
329        if outbox_bytes > OUTBOX_WARN_BYTES && outbox_bytes - payload_bytes <= OUTBOX_WARN_BYTES {
330            tracing::warn!(
331                outbox_bytes,
332                warn_threshold = OUTBOX_WARN_BYTES,
333                "event outbox approaching capacity warning threshold"
334            );
335            crate::telemetry::operator_event::OperatorEvent::OutboxDlqActivated {
336                queue: queue.to_string(),
337                dlq: format!("{queue}_outbox_dlq"),
338                reason: "outbox_warn_bytes_exceeded".to_string(),
339            }
340            .emit_global();
341        }
342
343        let config = load_queue_config(store.as_ref(), queue);
344
345        // If the target queue has a max_size and is full, route to DLQ.
346        if let Some(max_size) = config.max_size {
347            let current_len = load_queue_message_views(store.as_ref(), queue)
348                .unwrap_or_default()
349                .len();
350            if current_len >= max_size {
351                OUTBOX_APPROX_BYTES.fetch_sub(payload_bytes, Ordering::Relaxed);
352                EVENTS_DRAIN_RETRIES_TOTAL.fetch_add(1, Ordering::Relaxed);
353                return self.route_event_to_outbox_dlq(queue, payload, "queue_full");
354            }
355            // Warn at 80% capacity.
356            if current_len * 10 >= max_size * 8 {
357                tracing::warn!(
358                    queue = %queue,
359                    size = current_len,
360                    max = max_size,
361                    "event target queue near capacity"
362                );
363            }
364        }
365
366        let id = self.enqueue_event_payload_raw(store.as_ref(), queue, &config, payload)?;
367        EVENTS_ENQUEUED_TOTAL.fetch_add(1, Ordering::Relaxed);
368        Ok(id)
369    }
370
371    /// Route a failed event to `<queue>_outbox_dlq`, auto-creating it if needed.
372    fn route_event_to_outbox_dlq(
373        &self,
374        queue: &str,
375        payload: Value,
376        reason: &str,
377    ) -> RedDBResult<EntityId> {
378        let dlq_name = format!("{queue}_outbox_dlq");
379        EVENTS_DLQ_TOTAL.fetch_add(1, Ordering::Relaxed);
380
381        crate::telemetry::operator_event::OperatorEvent::OutboxDlqActivated {
382            queue: queue.to_string(),
383            dlq: dlq_name.clone(),
384            reason: reason.to_string(),
385        }
386        .emit_global();
387
388        let store = self.inner.db.store();
389        if store.get_collection(&dlq_name).is_none() {
390            crate::runtime::impl_ddl::ensure_event_target_queue_pub(self, &dlq_name)?;
391        }
392        let dlq_config = load_queue_config(store.as_ref(), &dlq_name);
393        let id = self.enqueue_event_payload_raw(store.as_ref(), &dlq_name, &dlq_config, payload)?;
394        EVENTS_ENQUEUED_TOTAL.fetch_add(1, Ordering::Relaxed);
395        Ok(id)
396    }
397
398    /// Low-level event message insert — no size checks, no DLQ routing.
399    fn enqueue_event_payload_raw(
400        &self,
401        store: &UnifiedStore,
402        queue: &str,
403        config: &QueueRuntimeConfig,
404        payload: Value,
405    ) -> RedDBResult<EntityId> {
406        let position = next_queue_position(store, queue, QueueSide::Right)?;
407        let mut entity = UnifiedEntity::new(
408            EntityId::new(0),
409            EntityKind::QueueMessage {
410                queue: queue.to_string(),
411                position,
412            },
413            EntityData::QueueMessage(QueueMessageData {
414                payload,
415                priority: None,
416                enqueued_at_ns: now_ns(),
417                attempts: 0,
418                max_attempts: config.max_attempts,
419                acked: false,
420            }),
421        );
422        if let Some(xid) = self.current_xid() {
423            entity.set_xmin(xid);
424        }
425        let id = store
426            .insert_auto(queue, entity)
427            .map_err(|err| RedDBError::Internal(err.to_string()))?;
428        if let Some(ttl_ms) = config.ttl_ms {
429            store
430                .set_metadata(queue, id, queue_message_ttl_metadata(ttl_ms))
431                .map_err(|err| RedDBError::Internal(err.to_string()))?;
432        }
433        self.invalidate_result_cache_for_table(queue);
434        Ok(id)
435    }
436
437    pub fn execute_create_queue(
438        &self,
439        raw_query: &str,
440        query: &CreateQueueQuery,
441    ) -> RedDBResult<RuntimeQueryResult> {
442        self.check_write(crate::runtime::write_gate::WriteKind::Ddl)?;
443        if query.dlq.as_deref() == Some(query.name.as_str()) {
444            return Err(RedDBError::Query(
445                "dead-letter queue must be different from the source queue".to_string(),
446            ));
447        }
448
449        let store = self.inner.db.store();
450        let exists = store.get_collection(&query.name).is_some();
451        if exists {
452            if query.if_not_exists {
453                return Ok(RuntimeQueryResult::ok_message(
454                    raw_query.to_string(),
455                    &format!("queue '{}' already exists", query.name),
456                    "create",
457                ));
458            }
459            return Err(RedDBError::Query(format!(
460                "queue '{}' already exists",
461                query.name
462            )));
463        }
464
465        store
466            .create_collection(&query.name)
467            .map_err(|err| RedDBError::Internal(err.to_string()))?;
468        if let Some(ttl_ms) = query.ttl_ms {
469            self.inner
470                .db
471                .set_collection_default_ttl_ms(&query.name, ttl_ms);
472        }
473        self.inner
474            .db
475            .save_collection_contract(queue_collection_contract(
476                &query.name,
477                query.priority,
478                query.ttl_ms,
479            ))
480            .map_err(|err| RedDBError::Internal(err.to_string()))?;
481        save_queue_config(
482            store.as_ref(),
483            &query.name,
484            &QueueRuntimeConfig {
485                mode: query.mode,
486                priority: query.priority,
487                max_size: query.max_size,
488                ttl_ms: query.ttl_ms,
489                dlq: query.dlq.clone(),
490                max_attempts: query.max_attempts,
491                lock_deadline_ms: query.lock_deadline_ms,
492                in_flight_cap_per_group: query.in_flight_cap_per_group,
493                retry_delay_ms: query.retry_delay_ms,
494            },
495        )?;
496
497        if let Some(dlq) = &query.dlq {
498            if store.get_collection(dlq).is_none() {
499                store
500                    .create_collection(dlq)
501                    .map_err(|err| RedDBError::Internal(err.to_string()))?;
502                self.inner
503                    .db
504                    .save_collection_contract(queue_collection_contract(dlq, false, None))
505                    .map_err(|err| RedDBError::Internal(err.to_string()))?;
506            }
507        }
508
509        self.invalidate_result_cache();
510        self.inner
511            .db
512            .persist_metadata()
513            .map_err(|err| RedDBError::Internal(err.to_string()))?;
514        // Issue #120 — feed the queue into the schema-vocabulary so
515        // AskPipeline (#121) can resolve queue references. Queues
516        // have an opaque payload column, so we expose `payload` and
517        // (when configured) the DLQ partner as type-tag context.
518        let mut type_tags = Vec::new();
519        if let Some(dlq) = &query.dlq {
520            type_tags.push(format!("dlq:{}", dlq));
521        }
522        self.schema_vocabulary_apply(
523            crate::runtime::schema_vocabulary::DdlEvent::CreateCollection {
524                collection: query.name.clone(),
525                columns: vec!["payload".to_string()],
526                type_tags,
527                description: None,
528            },
529        );
530
531        let mut msg = format!("queue '{}' created", query.name);
532        msg.push_str(&format!(" (mode={})", query.mode.as_str()));
533        if query.priority {
534            msg.push_str(" (priority)");
535        }
536        if let Some(max_size) = query.max_size {
537            msg.push_str(&format!(" (max_size={max_size})"));
538        }
539        if let Some(ttl_ms) = query.ttl_ms {
540            msg.push_str(&format!(" (ttl={ttl_ms}ms)"));
541        }
542        if let Some(dlq) = &query.dlq {
543            msg.push_str(&format!(
544                " (dlq={dlq}, max_attempts={})",
545                query.max_attempts
546            ));
547        }
548
549        Ok(RuntimeQueryResult::ok_message(
550            raw_query.to_string(),
551            &msg,
552            "create",
553        ))
554    }
555
556    pub fn execute_alter_queue(
557        &self,
558        raw_query: &str,
559        query: &AlterQueueQuery,
560    ) -> RedDBResult<RuntimeQueryResult> {
561        self.check_write(crate::runtime::write_gate::WriteKind::Ddl)?;
562        let store = self.inner.db.store();
563        ensure_queue_exists(store.as_ref(), &query.name)?;
564
565        let mut config = load_queue_config(store.as_ref(), &query.name);
566        let mut summary: Vec<String> = Vec::new();
567
568        if let Some(new_mode) = query.mode {
569            let pending =
570                load_pending_entries(store.as_ref(), &query.name, None, None).unwrap_or_default();
571            if !pending.is_empty() {
572                tracing::warn!(
573                    queue = %query.name,
574                    pending_count = pending.len(),
575                    new_mode = %new_mode.as_str(),
576                    "ALTER QUEUE SET MODE: {} in-flight messages will drain with old mode; \
577                     new reads use {}",
578                    pending.len(),
579                    new_mode.as_str(),
580                );
581            }
582            config.mode = new_mode;
583            summary.push(format!("mode={}", new_mode.as_str()));
584        }
585        if let Some(max_attempts) = query.max_attempts {
586            config.max_attempts = max_attempts;
587            summary.push(format!("max_attempts={max_attempts}"));
588        }
589        if let Some(lock_deadline_ms) = query.lock_deadline_ms {
590            config.lock_deadline_ms = lock_deadline_ms;
591            summary.push(format!("lock_deadline_ms={lock_deadline_ms}"));
592        }
593        if let Some(in_flight_cap) = query.in_flight_cap_per_group {
594            config.in_flight_cap_per_group = in_flight_cap;
595            summary.push(format!("in_flight_cap_per_group={in_flight_cap}"));
596        }
597        if let Some(dlq) = &query.dlq {
598            if dlq == &query.name {
599                return Err(RedDBError::Query(
600                    "dead-letter queue must be different from the source queue".to_string(),
601                ));
602            }
603            config.dlq = Some(dlq.clone());
604            summary.push(format!("dlq={dlq}"));
605        }
606        if let Some(retry_delay_ms) = query.retry_delay_ms {
607            config.retry_delay_ms = if retry_delay_ms == 0 {
608                None
609            } else {
610                Some(retry_delay_ms)
611            };
612            summary.push(format!("retry_delay_ms={retry_delay_ms}"));
613        }
614
615        save_queue_config(store.as_ref(), &query.name, &config)?;
616
617        self.invalidate_result_cache();
618        self.inner
619            .db
620            .persist_metadata()
621            .map_err(|err| RedDBError::Internal(err.to_string()))?;
622
623        Ok(RuntimeQueryResult::ok_message(
624            raw_query.to_string(),
625            &format!("queue '{}' altered: {}", query.name, summary.join(", ")),
626            "alter",
627        ))
628    }
629
630    pub fn execute_drop_queue(
631        &self,
632        raw_query: &str,
633        query: &DropQueueQuery,
634    ) -> RedDBResult<RuntimeQueryResult> {
635        self.check_write(crate::runtime::write_gate::WriteKind::Ddl)?;
636        let store = self.inner.db.store();
637        if super::impl_ddl::is_system_schema_name(&query.name) {
638            return Err(RedDBError::Query("system schema is read-only".to_string()));
639        }
640        if store.get_collection(&query.name).is_none() {
641            if query.if_exists {
642                return Ok(RuntimeQueryResult::ok_message(
643                    raw_query.to_string(),
644                    &format!("queue '{}' does not exist", query.name),
645                    "drop",
646                ));
647            }
648            return Err(RedDBError::NotFound(format!(
649                "queue '{}' not found",
650                query.name
651            )));
652        }
653        let actual = crate::runtime::ddl::polymorphic_resolver::resolve(
654            &query.name,
655            &self.inner.db.catalog_model_snapshot(),
656        )?;
657        crate::runtime::ddl::polymorphic_resolver::ensure_model_match(
658            crate::catalog::CollectionModel::Queue,
659            actual,
660        )?;
661
662        store
663            .drop_collection(&query.name)
664            .map_err(|err| RedDBError::Internal(err.to_string()))?;
665        self.inner.db.clear_collection_default_ttl_ms(&query.name);
666        self.inner
667            .db
668            .remove_collection_contract(&query.name)
669            .map_err(|err| RedDBError::Internal(err.to_string()))?;
670        remove_queue_metadata(store.as_ref(), &query.name);
671        self.invalidate_result_cache();
672        self.inner
673            .db
674            .persist_metadata()
675            .map_err(|err| RedDBError::Internal(err.to_string()))?;
676        // Issue #120 — invalidate the schema-vocabulary entry.
677        self.schema_vocabulary_apply(
678            crate::runtime::schema_vocabulary::DdlEvent::DropCollection {
679                collection: query.name.clone(),
680            },
681        );
682
683        Ok(RuntimeQueryResult::ok_message(
684            raw_query.to_string(),
685            &format!("queue '{}' dropped", query.name),
686            "drop",
687        ))
688    }
689
690    pub fn execute_queue_command(
691        &self,
692        raw_query: &str,
693        cmd: &QueueCommand,
694    ) -> RedDBResult<RuntimeQueryResult> {
695        self.check_write(crate::runtime::write_gate::WriteKind::Dml)?;
696        match cmd {
697            QueueCommand::Push {
698                queue,
699                value,
700                side,
701                priority,
702                available,
703            } => {
704                let store = self.inner.db.store();
705                ensure_queue_exists(store.as_ref(), queue)?;
706                let config = load_queue_config(store.as_ref(), queue);
707                if priority.is_some() && !config.priority {
708                    return Err(RedDBError::Query(format!(
709                        "queue '{}' is not a priority queue",
710                        queue
711                    )));
712                }
713                if let Some(max_size) = config.max_size {
714                    let current_len =
715                        load_queue_message_views_with_runtime(Some(self), store.as_ref(), queue)?
716                            .len();
717                    if current_len >= max_size {
718                        return Err(RedDBError::Query(format!(
719                            "queue '{}' is full (max_size={max_size})",
720                            queue
721                        )));
722                    }
723                }
724
725                let position = next_queue_position(store.as_ref(), queue, *side)?;
726                let mut entity = UnifiedEntity::new(
727                    EntityId::new(0),
728                    EntityKind::QueueMessage {
729                        queue: queue.clone(),
730                        position,
731                    },
732                    EntityData::QueueMessage(QueueMessageData {
733                        payload: value.clone(),
734                        priority: if config.priority { *priority } else { None },
735                        enqueued_at_ns: now_ns(),
736                        attempts: 0,
737                        max_attempts: config.max_attempts,
738                        acked: false,
739                    }),
740                );
741                // Phase 1.1 MVCC universal: stamp xmin so other
742                // connections don't see this message until COMMIT.
743                if let Some(xid) = self.current_xid() {
744                    entity.set_xmin(xid);
745                }
746                let id = store
747                    .insert_auto(queue, entity)
748                    .map_err(|err| RedDBError::Internal(err.to_string()))?;
749                // Resolve per-message availability (issue #722): DELAY is
750                // relative to the push instant, AVAILABLE AT carries an
751                // absolute unix-ms. Both collapse to a unix-ns timestamp
752                // delivery paths compare against. `None` means immediate.
753                let available_at_ns = available.map(|a| match a {
754                    crate::storage::query::ast::QueueAvailability::DelayMs(ms) => {
755                        now_ns().saturating_add(ms.saturating_mul(1_000_000))
756                    }
757                    crate::storage::query::ast::QueueAvailability::AtUnixMs(ms) => {
758                        ms.saturating_mul(1_000_000)
759                    }
760                });
761                if config.ttl_ms.is_some() || available_at_ns.is_some() {
762                    store
763                        .set_metadata(
764                            queue,
765                            id,
766                            queue_message_metadata(config.ttl_ms, available_at_ns),
767                        )
768                        .map_err(|err| RedDBError::Internal(err.to_string()))?;
769                }
770                // Slice C of PRD #718 — wake `QUEUE READ … WAIT` waiters.
771                // Under autocommit this fires immediately; inside a txn
772                // the wake is buffered and replayed on COMMIT (rollback
773                // discards it so rolled-back enqueues do not deliver).
774                self.record_queue_wake(&queue_wait_scope(), queue);
775                self.invalidate_result_cache();
776
777                let mut result = UnifiedResult::with_columns(vec![
778                    "message_id".into(),
779                    "side".into(),
780                    "queue".into(),
781                ]);
782                let mut record = UnifiedRecord::new();
783                record.set("message_id", Value::text(message_id_string(id)));
784                record.set(
785                    "side",
786                    Value::text(match side {
787                        QueueSide::Left => "left".to_string(),
788                        QueueSide::Right => "right".to_string(),
789                    }),
790                );
791                record.set("queue", Value::text(queue.clone()));
792                result.push(record);
793
794                Ok(RuntimeQueryResult {
795                    query: raw_query.to_string(),
796                    mode: QueryMode::Sql,
797                    statement: "queue_push",
798                    engine: "runtime-queue",
799                    result,
800                    affected_rows: 1,
801                    statement_type: "insert",
802                    bookmark: None,
803                })
804            }
805            QueueCommand::Pop { queue, side, count } => {
806                let store = self.inner.db.store();
807                ensure_queue_exists(store.as_ref(), queue)?;
808                let (lifecycle, _ps, txn) = runtime_lifecycle(self, queue);
809                let popped = lifecycle
810                    .pop(queue, ast_side_to_lc(*side), *count, &txn)
811                    .map_err(map_qse)?;
812
813                let mut result =
814                    UnifiedResult::with_columns(vec!["message_id".into(), "payload".into()]);
815                for (message_id, payload) in &popped {
816                    let mut record = UnifiedRecord::new();
817                    record.set(
818                        "message_id",
819                        Value::text(message_id_string(EntityId::new(*message_id))),
820                    );
821                    record.set("payload", payload.clone());
822                    result.push(record);
823                }
824                let popped_count = popped.len() as u64;
825                if popped_count > 0 {
826                    self.invalidate_result_cache();
827                }
828
829                Ok(RuntimeQueryResult {
830                    query: raw_query.to_string(),
831                    mode: QueryMode::Sql,
832                    statement: "queue_pop",
833                    engine: "runtime-queue",
834                    result,
835                    affected_rows: popped_count,
836                    statement_type: "delete",
837                    bookmark: None,
838                })
839            }
840            QueueCommand::Peek { queue, count } => {
841                let store = self.inner.db.store();
842                ensure_queue_exists(store.as_ref(), queue)?;
843                let (lifecycle, _ps, txn) = runtime_lifecycle(self, queue);
844                let messages = lifecycle.peek(queue, *count, &txn);
845
846                let mut result =
847                    UnifiedResult::with_columns(vec!["message_id".into(), "payload".into()]);
848                for message in messages {
849                    let mut record = UnifiedRecord::new();
850                    record.set(
851                        "message_id",
852                        Value::text(message_id_string(EntityId::new(message.message_id))),
853                    );
854                    record.set("payload", message.payload);
855                    result.push(record);
856                }
857
858                Ok(RuntimeQueryResult {
859                    query: raw_query.to_string(),
860                    mode: QueryMode::Sql,
861                    statement: "queue_peek",
862                    engine: "runtime-queue",
863                    result,
864                    affected_rows: 0,
865                    statement_type: "select",
866                    bookmark: None,
867                })
868            }
869            QueueCommand::Len { queue } => {
870                let store = self.inner.db.store();
871                ensure_queue_exists(store.as_ref(), queue)?;
872                let count =
873                    load_queue_message_views_with_runtime(Some(self), store.as_ref(), queue)?.len()
874                        as u64;
875                let mut result = UnifiedResult::with_columns(vec!["len".into()]);
876                let mut record = UnifiedRecord::new();
877                record.set("len", Value::UnsignedInteger(count));
878                result.push(record);
879
880                Ok(RuntimeQueryResult {
881                    query: raw_query.to_string(),
882                    mode: QueryMode::Sql,
883                    statement: "queue_len",
884                    engine: "runtime-queue",
885                    result,
886                    affected_rows: 0,
887                    statement_type: "select",
888                    bookmark: None,
889                })
890            }
891            QueueCommand::Purge { queue } => {
892                let store = self.inner.db.store();
893                ensure_queue_exists(store.as_ref(), queue)?;
894                let (lifecycle, _ps, txn) = runtime_lifecycle(self, queue);
895                let count = lifecycle.purge(queue, &txn).map_err(map_qse)?;
896                if count > 0 {
897                    self.invalidate_result_cache();
898                }
899
900                Ok(RuntimeQueryResult::ok_message(
901                    raw_query.to_string(),
902                    &format!("{count} messages purged from queue '{queue}'"),
903                    "delete",
904                ))
905            }
906            QueueCommand::GroupCreate { queue, group } => {
907                let store = self.inner.db.store();
908                ensure_queue_exists(store.as_ref(), queue)?;
909                if queue_group_exists(store.as_ref(), queue, group)? {
910                    return Ok(RuntimeQueryResult::ok_message(
911                        raw_query.to_string(),
912                        &format!(
913                            "consumer group '{}' already exists on queue '{}'",
914                            group, queue
915                        ),
916                        "create",
917                    ));
918                }
919                save_queue_group(store.as_ref(), queue, group)?;
920                self.invalidate_result_cache();
921
922                Ok(RuntimeQueryResult::ok_message(
923                    raw_query.to_string(),
924                    &format!("consumer group '{}' created on queue '{}'", group, queue),
925                    "create",
926                ))
927            }
928            QueueCommand::GroupRead {
929                queue,
930                group,
931                consumer,
932                count,
933                wait_ms,
934            } => {
935                let store = self.inner.db.store();
936                ensure_queue_exists(store.as_ref(), queue)?;
937                // Slice B of PRD #718: reject `WAIT` issued inside an
938                // explicit transaction, and reject `WAIT > cap` before
939                // any waiter is registered. Both checks fire before the
940                // lifecycle is touched so a refused statement leaves
941                // no side effects (no group auto-create, no parking).
942                if let Some(ms) = *wait_ms {
943                    if self.current_xid().is_some() {
944                        return Err(RedDBError::Query(
945                            "QUEUE READ … WAIT is autocommit-only: refusing to park inside an explicit transaction (BEGIN/COMMIT)"
946                                .to_string(),
947                        ));
948                    }
949                    let cap =
950                        self.config_u64(QUEUE_MAX_WAIT_MS_CONFIG_KEY, QUEUE_MAX_WAIT_MS_DEFAULT);
951                    if ms > cap {
952                        return Err(RedDBError::Query(format!(
953                            "QUEUE READ … WAIT {ms}ms exceeds server cap {QUEUE_MAX_WAIT_MS_CONFIG_KEY} = {cap}ms"
954                        )));
955                    }
956                }
957                // Resolve the consumer group up-front so the lifecycle
958                // sees the same auto-created `_work_default` / fanout
959                // group the legacy `read_messages` would have minted.
960                let config = load_queue_config(store.as_ref(), queue);
961                let group_owned =
962                    resolve_read_group(store.as_ref(), queue, group.as_deref(), consumer, &config)?;
963                let group_ref = group_owned.as_str();
964                let delivered = self
965                    .group_read_with_optional_wait(queue, group_ref, consumer, *count, *wait_ms)?;
966
967                // Issue #742 — record consumer presence on every read,
968                // including empty returns. Heartbeat-driven aliveness
969                // is the contract; pending deliveries don't define it.
970                {
971                    let lease_count = u32::try_from(delivered.len()).unwrap_or(u32::MAX);
972                    let now_ns = std::time::SystemTime::now()
973                        .duration_since(std::time::UNIX_EPOCH)
974                        .map(|d| d.as_nanos() as u64)
975                        .unwrap_or(0);
976                    self.queue_presence().heartbeat(
977                        queue,
978                        group_ref,
979                        consumer,
980                        lease_count,
981                        now_ns,
982                    );
983                }
984
985                let mut result = UnifiedResult::with_columns(vec![
986                    "message_id".into(),
987                    "payload".into(),
988                    "consumer".into(),
989                    "delivery_count".into(),
990                    "attempts".into(),
991                ]);
992
993                for message in delivered {
994                    let mut record = UnifiedRecord::new();
995                    record.set(
996                        "message_id",
997                        Value::text(message_id_string(EntityId::new(message.message_id))),
998                    );
999                    record.set("payload", message.payload);
1000                    record.set("consumer", Value::text(message.consumer));
1001                    record.set(
1002                        "delivery_count",
1003                        Value::UnsignedInteger(u64::from(message.delivery_count)),
1004                    );
1005                    record.set(
1006                        "attempts",
1007                        Value::UnsignedInteger(u64::from(message.delivery_count)),
1008                    );
1009                    result.push(record);
1010                }
1011                if !result.records.is_empty() {
1012                    self.invalidate_result_cache();
1013                }
1014
1015                Ok(RuntimeQueryResult {
1016                    query: raw_query.to_string(),
1017                    mode: QueryMode::Sql,
1018                    statement: "queue_group_read",
1019                    engine: "runtime-queue",
1020                    result,
1021                    affected_rows: 0,
1022                    statement_type: "select",
1023                    bookmark: None,
1024                })
1025            }
1026            QueueCommand::Pending { queue, group } => {
1027                let store = self.inner.db.store();
1028                ensure_queue_exists(store.as_ref(), queue)?;
1029                require_queue_group(store.as_ref(), queue, group)?;
1030                let mut pending = load_pending_entries(store.as_ref(), queue, Some(group), None)?;
1031                pending.sort_by_key(|entry| entry.delivered_at_ns);
1032                let current_time_ns = now_ns();
1033
1034                let mut result = UnifiedResult::with_columns(vec![
1035                    "message_id".into(),
1036                    "consumer".into(),
1037                    "delivered_at_ns".into(),
1038                    "delivery_count".into(),
1039                    "idle_ms".into(),
1040                ]);
1041                for entry in pending {
1042                    let mut record = UnifiedRecord::new();
1043                    record.set(
1044                        "message_id",
1045                        Value::text(message_id_string(entry.message_id)),
1046                    );
1047                    record.set("consumer", Value::text(entry.consumer));
1048                    record.set(
1049                        "delivered_at_ns",
1050                        Value::UnsignedInteger(entry.delivered_at_ns),
1051                    );
1052                    record.set(
1053                        "delivery_count",
1054                        Value::UnsignedInteger(u64::from(entry.delivery_count)),
1055                    );
1056                    record.set(
1057                        "idle_ms",
1058                        Value::UnsignedInteger(
1059                            current_time_ns.saturating_sub(entry.delivered_at_ns) / 1_000_000,
1060                        ),
1061                    );
1062                    result.push(record);
1063                }
1064
1065                Ok(RuntimeQueryResult {
1066                    query: raw_query.to_string(),
1067                    mode: QueryMode::Sql,
1068                    statement: "queue_pending",
1069                    engine: "runtime-queue",
1070                    result,
1071                    affected_rows: 0,
1072                    statement_type: "select",
1073                    bookmark: None,
1074                })
1075            }
1076            QueueCommand::Claim {
1077                queue,
1078                group,
1079                consumer,
1080                min_idle_ms,
1081            } => {
1082                let store = self.inner.db.store();
1083                ensure_queue_exists(store.as_ref(), queue)?;
1084                require_queue_group(store.as_ref(), queue, group)?;
1085                let (lifecycle, _ps, txn) = runtime_lifecycle(self, queue);
1086                let delivered = lifecycle
1087                    .claim_delivering(queue, consumer, *min_idle_ms, &txn)
1088                    .map_err(map_qse)?;
1089
1090                let mut result = UnifiedResult::with_columns(vec![
1091                    "message_id".into(),
1092                    "payload".into(),
1093                    "consumer".into(),
1094                    "delivery_count".into(),
1095                ]);
1096
1097                for message in delivered {
1098                    let mut record = UnifiedRecord::new();
1099                    record.set(
1100                        "message_id",
1101                        Value::text(message_id_string(EntityId::new(message.message_id))),
1102                    );
1103                    record.set("payload", message.payload);
1104                    record.set("consumer", Value::text(message.consumer));
1105                    record.set(
1106                        "delivery_count",
1107                        Value::UnsignedInteger(u64::from(message.delivery_count)),
1108                    );
1109                    result.push(record);
1110                }
1111                if !result.records.is_empty() {
1112                    self.invalidate_result_cache();
1113                }
1114                let affected_rows = result.records.len() as u64;
1115
1116                Ok(RuntimeQueryResult {
1117                    query: raw_query.to_string(),
1118                    mode: QueryMode::Sql,
1119                    statement: "queue_claim",
1120                    engine: "runtime-queue",
1121                    result,
1122                    affected_rows,
1123                    statement_type: "update",
1124                    bookmark: None,
1125                })
1126            }
1127            QueueCommand::Ack {
1128                queue,
1129                group,
1130                message_id,
1131                delivery_id,
1132            } => {
1133                let store = self.inner.db.store();
1134                ensure_queue_exists(store.as_ref(), queue)?;
1135                let (group_owned, message_entity) = resolve_ack_nack_handle(
1136                    store.as_ref(),
1137                    queue,
1138                    group,
1139                    message_id,
1140                    delivery_id.as_deref(),
1141                )?;
1142                let group_ref = group_owned.as_str();
1143                require_queue_group(store.as_ref(), queue, group_ref)?;
1144                let (lifecycle, ps, txn) = runtime_lifecycle(self, queue);
1145                let did = match delivery_id.as_deref() {
1146                    Some(d) => d.to_string(),
1147                    None => ps
1148                        .find_pending_by_key(queue, message_entity.raw(), group_ref)
1149                        .ok_or_else(|| {
1150                            RedDBError::NotFound(format!(
1151                                "no pending delivery for message '{}' on queue '{}' (group '{}')",
1152                                message_entity.raw(),
1153                                queue,
1154                                group_ref
1155                            ))
1156                        })?,
1157                };
1158                lifecycle.ack(&txn, &did).map_err(map_qse)?;
1159                self.invalidate_result_cache();
1160
1161                Ok(RuntimeQueryResult::ok_message(
1162                    raw_query.to_string(),
1163                    "message acknowledged",
1164                    "update",
1165                ))
1166            }
1167            QueueCommand::Nack {
1168                queue,
1169                group,
1170                message_id,
1171                delivery_id,
1172                delay_ms,
1173            } => {
1174                let store = self.inner.db.store();
1175                ensure_queue_exists(store.as_ref(), queue)?;
1176                let config = load_queue_config(store.as_ref(), queue);
1177                // Issue #723: a per-failure DELAY override is a write
1178                // operation that re-shapes retry behavior; readers must
1179                // not be able to silently re-schedule another worker's
1180                // work. Embedded callers (no auth identity attached)
1181                // are trusted and bypass the check.
1182                if delay_ms.is_some() {
1183                    if let Some((_, role)) = current_auth_identity() {
1184                        if !role.can_write() {
1185                            return Err(RedDBError::InvalidOperation(format!(
1186                                "role '{role}' is not authorized to override NACK retry delay on queue '{queue}'"
1187                            )));
1188                        }
1189                    }
1190                }
1191                let (group_owned, message_entity) = resolve_ack_nack_handle(
1192                    store.as_ref(),
1193                    queue,
1194                    group,
1195                    message_id,
1196                    delivery_id.as_deref(),
1197                )?;
1198                let group_ref = group_owned.as_str();
1199                require_queue_group(store.as_ref(), queue, group_ref)?;
1200                let (lifecycle, ps, txn) = runtime_lifecycle(self, queue);
1201                let did = match delivery_id.as_deref() {
1202                    Some(d) => d.to_string(),
1203                    None => ps
1204                        .find_pending_by_key(queue, message_entity.raw(), group_ref)
1205                        .ok_or_else(|| {
1206                            RedDBError::NotFound(format!(
1207                                "no pending delivery for message '{}' on queue '{}' (group '{}')",
1208                                message_entity.raw(),
1209                                queue,
1210                                group_ref
1211                            ))
1212                        })?,
1213                };
1214                // Resolve the effective retry delay: per-failure
1215                // override wins, then queue default, then zero
1216                // (immediate requeue — pre-#723 behavior).
1217                let effective_delay_ms = delay_ms.or(config.retry_delay_ms).unwrap_or(0);
1218                let outcome = lifecycle.nack(&txn, &did).map_err(map_qse)?;
1219                // Apply delay only when the message was actually
1220                // requeued — DLQ promotion / drop terminate the
1221                // retry cycle and a delay would be meaningless.
1222                if matches!(outcome, RetirementOutcome::Requeued) && effective_delay_ms > 0 {
1223                    let at_ns =
1224                        now_ns().saturating_add(effective_delay_ms.saturating_mul(1_000_000));
1225                    set_message_available_at_ns(
1226                        store.as_ref(),
1227                        queue,
1228                        message_entity,
1229                        Some(at_ns),
1230                        config.ttl_ms,
1231                    )?;
1232                }
1233                // Issue #723: routine retries do not flood audit
1234                // channels (telemetry already covers them via
1235                // `queue_nacked_total{outcome=...}`). Significant
1236                // overrides — large delays, destination changes,
1237                // drops — are audited so operators see the events
1238                // that re-shape operational risk.
1239                self.maybe_emit_nack_audit(
1240                    queue,
1241                    group_ref,
1242                    &did,
1243                    *delay_ms,
1244                    config.retry_delay_ms,
1245                    &outcome,
1246                );
1247                let message = match outcome {
1248                    RetirementOutcome::Requeued => {
1249                        if effective_delay_ms > 0 {
1250                            format!("message requeued (delay={effective_delay_ms}ms)")
1251                        } else {
1252                            "message requeued".to_string()
1253                        }
1254                    }
1255                    RetirementOutcome::MovedToDlq(dlq) => {
1256                        format!("message moved to dead-letter queue '{}'", dlq)
1257                    }
1258                    RetirementOutcome::Dropped => "message dropped after max attempts".to_string(),
1259                };
1260                self.invalidate_result_cache();
1261
1262                Ok(RuntimeQueryResult::ok_message(
1263                    raw_query.to_string(),
1264                    &message,
1265                    "update",
1266                ))
1267            }
1268            QueueCommand::Move {
1269                source,
1270                destination,
1271                filter,
1272                limit,
1273            } => self.execute_queue_move(raw_query, source, destination, filter.as_ref(), *limit),
1274        }
1275    }
1276
1277    pub fn execute_queue_select(
1278        &self,
1279        raw_query: &str,
1280        query: &QueueSelectQuery,
1281    ) -> RedDBResult<RuntimeQueryResult> {
1282        let store = self.inner.db.store();
1283        ensure_queue_exists(store.as_ref(), &query.queue)?;
1284        let config = load_queue_config(store.as_ref(), &query.queue);
1285        let dlq = queue_is_dead_letter_target(store.as_ref(), &query.queue);
1286        let columns = if query.columns.is_empty() {
1287            queue_projection_default_columns()
1288        } else {
1289            query.columns.clone()
1290        };
1291
1292        let mut messages =
1293            load_queue_message_views_with_runtime(Some(self), store.as_ref(), &query.queue)?;
1294        sort_queue_messages(&mut messages, &config, QueueSide::Left);
1295
1296        let mut result = UnifiedResult::with_columns(columns.clone());
1297        for message in messages {
1298            if query
1299                .filter
1300                .as_ref()
1301                .is_some_and(|filter| !queue_message_matches_filter(&message, dlq, filter))
1302            {
1303                continue;
1304            }
1305            let record = queue_projection_record(&columns, &message, dlq)?;
1306            result.push(record);
1307            if query
1308                .limit
1309                .is_some_and(|limit| result.records.len() >= limit as usize)
1310            {
1311                break;
1312            }
1313        }
1314
1315        Ok(RuntimeQueryResult {
1316            query: raw_query.to_string(),
1317            mode: QueryMode::Sql,
1318            statement: "queue_select",
1319            engine: "runtime-queue",
1320            result,
1321            affected_rows: 0,
1322            statement_type: "select",
1323            bookmark: None,
1324        })
1325    }
1326
1327    fn execute_queue_move(
1328        &self,
1329        raw_query: &str,
1330        source: &str,
1331        destination: &str,
1332        filter: Option<&Filter>,
1333        limit: usize,
1334    ) -> RedDBResult<RuntimeQueryResult> {
1335        if source == destination {
1336            return Err(RedDBError::Query(
1337                "QUEUE MOVE source and destination must be different".to_string(),
1338            ));
1339        }
1340        let store = self.inner.db.store();
1341        ensure_queue_exists(store.as_ref(), source)?;
1342        ensure_queue_exists(store.as_ref(), destination)?;
1343        let source_config = load_queue_config(store.as_ref(), source);
1344        let destination_config = load_queue_config(store.as_ref(), destination);
1345        let source_dlq = queue_is_dead_letter_target(store.as_ref(), source);
1346
1347        let mut messages =
1348            load_queue_message_views_with_runtime(Some(self), store.as_ref(), source)?;
1349        sort_queue_messages(&mut messages, &source_config, QueueSide::Left);
1350        let selected = messages
1351            .into_iter()
1352            .filter(|message| {
1353                filter
1354                    .map(|f| queue_message_matches_filter(message, source_dlq, f))
1355                    .unwrap_or(true)
1356            })
1357            .take(limit)
1358            .collect::<Vec<_>>();
1359
1360        if let Some(max_size) = destination_config.max_size {
1361            let current_len =
1362                load_queue_message_views_with_runtime(Some(self), store.as_ref(), destination)?
1363                    .len();
1364            if current_len + selected.len() > max_size {
1365                return Err(RedDBError::Query(format!(
1366                    "queue '{}' is full (max_size={max_size})",
1367                    destination
1368                )));
1369            }
1370        }
1371
1372        for message in &selected {
1373            let lock = queue_message_lock_handle(self, source, message.id);
1374            let Some(_guard) = lock.try_lock() else {
1375                return Err(RedDBError::Query(format!(
1376                    "message '{}' is locked on queue '{}'",
1377                    message.id.raw(),
1378                    source
1379                )));
1380            };
1381            if queue_message_view_by_id(store.as_ref(), source, message.id)?.is_none() {
1382                return Err(RedDBError::Query(format!(
1383                    "message '{}' is no longer available on queue '{}'",
1384                    message.id.raw(),
1385                    source
1386                )));
1387            }
1388        }
1389
1390        let mut inserted = Vec::new();
1391        for message in &selected {
1392            match insert_moved_queue_message(
1393                store.as_ref(),
1394                destination,
1395                &destination_config,
1396                message,
1397            ) {
1398                Ok(id) => inserted.push(id),
1399                Err(err) => {
1400                    for id in inserted {
1401                        let _ = store.delete(destination, id);
1402                    }
1403                    return Err(err);
1404                }
1405            }
1406        }
1407
1408        let (move_lifecycle, _move_ps, move_txn) = runtime_lifecycle(self, source);
1409        for message in &selected {
1410            move_lifecycle
1411                .delete_with_state(source, message.id.raw(), &move_txn)
1412                .map_err(map_qse)?;
1413        }
1414        if !selected.is_empty() {
1415            self.invalidate_result_cache();
1416        }
1417
1418        let selected_count = selected.len() as u64;
1419        self.audit_log().record_event(
1420            AuditEvent::builder("queue/move")
1421                .source(AuditAuthSource::System)
1422                .outcome(Outcome::Success)
1423                .resource(format!("queue:{source}->{destination}"))
1424                .fields([
1425                    AuditFieldEscaper::field("source", source),
1426                    AuditFieldEscaper::field("destination", destination),
1427                    AuditFieldEscaper::field("selected", selected_count),
1428                    AuditFieldEscaper::field("committed", selected_count),
1429                ])
1430                .build(),
1431        );
1432
1433        let mut result = UnifiedResult::with_columns(vec![
1434            "source".into(),
1435            "destination".into(),
1436            "selected".into(),
1437            "committed".into(),
1438        ]);
1439        let mut record = UnifiedRecord::new();
1440        record.set("source", Value::text(source.to_string()));
1441        record.set("destination", Value::text(destination.to_string()));
1442        record.set("selected", Value::UnsignedInteger(selected_count));
1443        record.set("committed", Value::UnsignedInteger(selected_count));
1444        result.push(record);
1445
1446        Ok(RuntimeQueryResult {
1447            query: raw_query.to_string(),
1448            mode: QueryMode::Sql,
1449            statement: "queue_move",
1450            engine: "runtime-queue",
1451            result,
1452            affected_rows: selected_count,
1453            statement_type: "update",
1454            bookmark: None,
1455        })
1456    }
1457
1458    /// Issue #723: routine retries are observable through metrics
1459    /// (`queue_nacked_total{outcome=...}`) so this is the audit
1460    /// shoulder for the *non-routine* cases: explicit NACK delay
1461    /// overrides whose magnitude or destination-changing impact would
1462    /// be invisible in metrics alone. Specifically:
1463    ///
1464    /// - Explicit override ≥ 60s (a worker decided to defer well past
1465    ///   the queue's default cadence — operators care).
1466    /// - Override that lands on a DLQ promotion or drop (destination
1467    ///   changed; the override may have influenced retire-vs-requeue
1468    ///   accounting on the caller's side and the audit trail needs to
1469    ///   show who asked).
1470    ///
1471    /// Calls with no override are intentionally silent here.
1472    fn maybe_emit_nack_audit(
1473        &self,
1474        queue: &str,
1475        group: &str,
1476        delivery_id: &str,
1477        override_ms: Option<u64>,
1478        default_ms: Option<u64>,
1479        outcome: &RetirementOutcome,
1480    ) {
1481        let Some(override_ms) = override_ms else {
1482            return;
1483        };
1484        let outcome_label = match outcome {
1485            RetirementOutcome::Requeued => "requeued",
1486            RetirementOutcome::MovedToDlq(_) => "dlq",
1487            RetirementOutcome::Dropped => "dropped",
1488        };
1489        const SIGNIFICANT_DELAY_MS: u64 = 60_000;
1490        let destination_changed = !matches!(outcome, RetirementOutcome::Requeued);
1491        if override_ms < SIGNIFICANT_DELAY_MS && !destination_changed {
1492            return;
1493        }
1494        self.audit_log().record_event(
1495            AuditEvent::builder("queue/nack/override")
1496                .source(AuditAuthSource::System)
1497                .outcome(Outcome::Success)
1498                .resource(format!("queue:{queue}"))
1499                .fields([
1500                    AuditFieldEscaper::field("queue", queue),
1501                    AuditFieldEscaper::field("group", group),
1502                    AuditFieldEscaper::field("delivery_id", delivery_id),
1503                    AuditFieldEscaper::field("override_delay_ms", override_ms),
1504                    AuditFieldEscaper::field("default_delay_ms", default_ms.unwrap_or(0)),
1505                    AuditFieldEscaper::field("outcome", outcome_label),
1506                ])
1507                .build(),
1508        );
1509    }
1510}
1511
1512fn ensure_queue_exists(store: &UnifiedStore, queue: &str) -> RedDBResult<()> {
1513    if store.get_collection(queue).is_some() {
1514        Ok(())
1515    } else {
1516        Err(RedDBError::NotFound(format!("queue '{}' not found", queue)))
1517    }
1518}
1519
1520pub(super) fn load_queue_config(store: &UnifiedStore, queue: &str) -> QueueRuntimeConfig {
1521    let default = QueueRuntimeConfig {
1522        mode: QueueMode::Work,
1523        priority: false,
1524        max_size: None,
1525        ttl_ms: None,
1526        dlq: None,
1527        max_attempts: crate::storage::query::DEFAULT_QUEUE_MAX_ATTEMPTS,
1528        lock_deadline_ms: crate::storage::query::DEFAULT_QUEUE_LOCK_DEADLINE_MS,
1529        in_flight_cap_per_group: crate::storage::query::DEFAULT_QUEUE_IN_FLIGHT_CAP_PER_GROUP,
1530        retry_delay_ms: None,
1531    };
1532
1533    let Some(manager) = store.get_collection(QUEUE_META_COLLECTION) else {
1534        return default;
1535    };
1536    manager
1537        .query_all(|entity| {
1538            entity.data.as_row().is_some_and(|row| {
1539                row_text(row, "kind").as_deref() == Some("queue_config")
1540                    && row_text(row, "queue").as_deref() == Some(queue)
1541            })
1542        })
1543        .into_iter()
1544        .find_map(|entity| {
1545            let row = entity.data.as_row()?;
1546            Some(QueueRuntimeConfig {
1547                mode: row_text(row, "mode")
1548                    .as_deref()
1549                    .and_then(QueueMode::parse)
1550                    .unwrap_or_default(),
1551                priority: row_bool(row, "priority").unwrap_or(false),
1552                max_size: row_u64(row, "max_size").map(|value| value as usize),
1553                ttl_ms: row_u64(row, "ttl_ms"),
1554                dlq: row_text(row, "dlq"),
1555                max_attempts: row_u64(row, "max_attempts")
1556                    .map(|value| value as u32)
1557                    .unwrap_or(crate::storage::query::DEFAULT_QUEUE_MAX_ATTEMPTS),
1558                lock_deadline_ms: row_u64(row, "lock_deadline_ms")
1559                    .unwrap_or(crate::storage::query::DEFAULT_QUEUE_LOCK_DEADLINE_MS),
1560                in_flight_cap_per_group: row_u64(row, "in_flight_cap_per_group")
1561                    .map(|value| value as u32)
1562                    .unwrap_or(crate::storage::query::DEFAULT_QUEUE_IN_FLIGHT_CAP_PER_GROUP),
1563                retry_delay_ms: row_u64(row, "retry_delay_ms").filter(|v| *v > 0),
1564            })
1565        })
1566        .unwrap_or(default)
1567}
1568
1569pub(super) fn queue_mode_str(store: &UnifiedStore, queue: &str) -> &'static str {
1570    load_queue_config(store, queue).mode.as_str()
1571}
1572
1573fn save_queue_config(
1574    store: &UnifiedStore,
1575    queue: &str,
1576    config: &QueueRuntimeConfig,
1577) -> RedDBResult<()> {
1578    remove_meta_rows(store, |row| {
1579        row_text(row, "kind").as_deref() == Some("queue_config")
1580            && row_text(row, "queue").as_deref() == Some(queue)
1581    });
1582
1583    let mut fields = HashMap::new();
1584    fields.insert("kind".to_string(), Value::text("queue_config".to_string()));
1585    fields.insert("queue".to_string(), Value::text(queue.to_string()));
1586    fields.insert(
1587        "mode".to_string(),
1588        Value::text(config.mode.as_str().to_string()),
1589    );
1590    fields.insert("priority".to_string(), Value::Boolean(config.priority));
1591    fields.insert(
1592        "max_size".to_string(),
1593        config
1594            .max_size
1595            .map(|value| Value::UnsignedInteger(value as u64))
1596            .unwrap_or(Value::Null),
1597    );
1598    fields.insert(
1599        "ttl_ms".to_string(),
1600        config
1601            .ttl_ms
1602            .map(Value::UnsignedInteger)
1603            .unwrap_or(Value::Null),
1604    );
1605    fields.insert(
1606        "dlq".to_string(),
1607        config.dlq.clone().map(Value::text).unwrap_or(Value::Null),
1608    );
1609    fields.insert(
1610        "max_attempts".to_string(),
1611        Value::UnsignedInteger(u64::from(config.max_attempts)),
1612    );
1613    fields.insert(
1614        "lock_deadline_ms".to_string(),
1615        Value::UnsignedInteger(config.lock_deadline_ms),
1616    );
1617    fields.insert(
1618        "in_flight_cap_per_group".to_string(),
1619        Value::UnsignedInteger(u64::from(config.in_flight_cap_per_group)),
1620    );
1621    fields.insert(
1622        "retry_delay_ms".to_string(),
1623        config
1624            .retry_delay_ms
1625            .map(Value::UnsignedInteger)
1626            .unwrap_or(Value::Null),
1627    );
1628    insert_meta_row(store, fields)
1629}
1630
1631fn remove_queue_metadata(store: &UnifiedStore, queue: &str) {
1632    remove_meta_rows(store, |row| {
1633        row_text(row, "queue").as_deref() == Some(queue)
1634    });
1635}
1636
1637fn queue_group_exists(store: &UnifiedStore, queue: &str, group: &str) -> RedDBResult<bool> {
1638    Ok(load_queue_groups(store, queue)?
1639        .into_iter()
1640        .any(|entry| entry.group == group))
1641}
1642
1643pub(super) fn require_queue_group(
1644    store: &UnifiedStore,
1645    queue: &str,
1646    group: &str,
1647) -> RedDBResult<()> {
1648    if queue_group_exists(store, queue, group)? {
1649        Ok(())
1650    } else {
1651        Err(RedDBError::NotFound(format!(
1652            "consumer group '{}' not found on queue '{}'",
1653            group, queue
1654        )))
1655    }
1656}
1657
1658pub(super) fn resolve_read_group(
1659    store: &UnifiedStore,
1660    queue: &str,
1661    group: Option<&str>,
1662    consumer: &str,
1663    config: &QueueRuntimeConfig,
1664) -> RedDBResult<String> {
1665    if let Some(group) = group {
1666        require_queue_group(store, queue, group)?;
1667        return Ok(group.to_string());
1668    }
1669
1670    match config.mode {
1671        QueueMode::Work => {
1672            if !queue_group_exists(store, queue, WORK_DEFAULT_GROUP)? {
1673                save_queue_group(store, queue, WORK_DEFAULT_GROUP)?;
1674            }
1675            Ok(WORK_DEFAULT_GROUP.to_string())
1676        }
1677        QueueMode::Fanout => {
1678            let fanout_group = format!("{FANOUT_GROUP_PREFIX}{consumer}");
1679            if !queue_group_exists(store, queue, &fanout_group)? {
1680                save_queue_group(store, queue, &fanout_group)?;
1681            }
1682            Ok(fanout_group)
1683        }
1684    }
1685}
1686
1687fn load_queue_groups(store: &UnifiedStore, queue: &str) -> RedDBResult<Vec<QueueGroupEntry>> {
1688    let Some(manager) = store.get_collection(QUEUE_META_COLLECTION) else {
1689        return Ok(Vec::new());
1690    };
1691    Ok(manager
1692        .query_all(|entity| {
1693            entity.data.as_row().is_some_and(|row| {
1694                row_text(row, "kind").as_deref() == Some("queue_group")
1695                    && row_text(row, "queue").as_deref() == Some(queue)
1696            })
1697        })
1698        .into_iter()
1699        .filter_map(|entity| {
1700            let row = entity.data.as_row()?;
1701            Some(QueueGroupEntry {
1702                entity_id: entity.id,
1703                group: row_text(row, "group")?,
1704            })
1705        })
1706        .collect())
1707}
1708
1709fn save_queue_group(store: &UnifiedStore, queue: &str, group: &str) -> RedDBResult<()> {
1710    let mut fields = HashMap::new();
1711    fields.insert("kind".to_string(), Value::text("queue_group".to_string()));
1712    fields.insert("queue".to_string(), Value::text(queue.to_string()));
1713    fields.insert("group".to_string(), Value::text(group.to_string()));
1714    fields.insert(
1715        "created_at_ns".to_string(),
1716        Value::UnsignedInteger(now_ns()),
1717    );
1718    insert_meta_row(store, fields)
1719}
1720
1721pub(super) fn load_pending_entries(
1722    store: &UnifiedStore,
1723    queue: &str,
1724    group: Option<&str>,
1725    message_id: Option<EntityId>,
1726) -> RedDBResult<Vec<QueuePendingEntry>> {
1727    let Some(manager) = store.get_collection(QUEUE_META_COLLECTION) else {
1728        return Ok(Vec::new());
1729    };
1730    Ok(manager
1731        .query_all(|entity| {
1732            entity.data.as_row().is_some_and(|row| {
1733                row_text(row, "kind").as_deref() == Some("queue_pending")
1734                    && row_text(row, "queue").as_deref() == Some(queue)
1735                    && group
1736                        .map(|group_name| row_text(row, "group").as_deref() == Some(group_name))
1737                        .unwrap_or(true)
1738                    && message_id
1739                        .map(|candidate| row_u64(row, "message_id") == Some(candidate.raw()))
1740                        .unwrap_or(true)
1741            })
1742        })
1743        .into_iter()
1744        .filter_map(|entity| {
1745            let row = entity.data.as_row()?;
1746            Some(QueuePendingEntry {
1747                entity_id: entity.id,
1748                group: row_text(row, "group")?,
1749                message_id: EntityId::new(row_u64(row, "message_id")?),
1750                consumer: row_text(row, "consumer")?,
1751                delivered_at_ns: row_u64(row, "delivered_at_ns")?,
1752                delivery_count: row_u64(row, "delivery_count")
1753                    .map(|value| value as u32)
1754                    .unwrap_or(1),
1755            })
1756        })
1757        .collect())
1758}
1759
1760pub(super) fn save_queue_pending(
1761    store: &UnifiedStore,
1762    queue: &str,
1763    group: &str,
1764    message_id: EntityId,
1765    consumer: &str,
1766    delivered_at_ns: u64,
1767    delivery_count: u32,
1768) -> RedDBResult<()> {
1769    remove_meta_rows(store, |row| {
1770        row_text(row, "kind").as_deref() == Some("queue_pending")
1771            && row_text(row, "queue").as_deref() == Some(queue)
1772            && row_text(row, "group").as_deref() == Some(group)
1773            && row_u64(row, "message_id") == Some(message_id.raw())
1774    });
1775
1776    let mut fields = HashMap::new();
1777    fields.insert("kind".to_string(), Value::text("queue_pending".to_string()));
1778    fields.insert("queue".to_string(), Value::text(queue.to_string()));
1779    fields.insert("group".to_string(), Value::text(group.to_string()));
1780    fields.insert(
1781        "message_id".to_string(),
1782        Value::UnsignedInteger(message_id.raw()),
1783    );
1784    fields.insert("consumer".to_string(), Value::text(consumer.to_string()));
1785    fields.insert(
1786        "delivered_at_ns".to_string(),
1787        Value::UnsignedInteger(delivered_at_ns),
1788    );
1789    fields.insert(
1790        "delivery_count".to_string(),
1791        Value::UnsignedInteger(u64::from(delivery_count)),
1792    );
1793    insert_meta_row(store, fields)
1794}
1795
1796pub(super) fn require_pending_entry(
1797    store: &UnifiedStore,
1798    queue: &str,
1799    group: &str,
1800    message_id: EntityId,
1801) -> RedDBResult<QueuePendingEntry> {
1802    load_pending_entries(store, queue, Some(group), Some(message_id))?
1803        .into_iter()
1804        .next()
1805        .ok_or_else(|| {
1806            RedDBError::NotFound(format!(
1807                "message '{}' is not pending in group '{}' on queue '{}'",
1808                message_id.raw(),
1809                group,
1810                queue
1811            ))
1812        })
1813}
1814
1815pub(super) fn load_ack_entries(
1816    store: &UnifiedStore,
1817    queue: &str,
1818    group: Option<&str>,
1819    message_id: Option<EntityId>,
1820) -> RedDBResult<Vec<QueueAckEntry>> {
1821    let Some(manager) = store.get_collection(QUEUE_META_COLLECTION) else {
1822        return Ok(Vec::new());
1823    };
1824    Ok(manager
1825        .query_all(|entity| {
1826            entity.data.as_row().is_some_and(|row| {
1827                row_text(row, "kind").as_deref() == Some("queue_ack")
1828                    && row_text(row, "queue").as_deref() == Some(queue)
1829                    && group
1830                        .map(|group_name| row_text(row, "group").as_deref() == Some(group_name))
1831                        .unwrap_or(true)
1832                    && message_id
1833                        .map(|candidate| row_u64(row, "message_id") == Some(candidate.raw()))
1834                        .unwrap_or(true)
1835            })
1836        })
1837        .into_iter()
1838        .filter_map(|entity| {
1839            let row = entity.data.as_row()?;
1840            Some(QueueAckEntry {
1841                entity_id: entity.id,
1842                group: row_text(row, "group")?,
1843                message_id: EntityId::new(row_u64(row, "message_id")?),
1844            })
1845        })
1846        .collect())
1847}
1848
1849pub(super) fn save_queue_ack(
1850    store: &UnifiedStore,
1851    queue: &str,
1852    group: &str,
1853    message_id: EntityId,
1854) -> RedDBResult<()> {
1855    let existing = load_ack_entries(store, queue, Some(group), Some(message_id))?;
1856    if !existing.is_empty() {
1857        return Ok(());
1858    }
1859
1860    let mut fields = HashMap::new();
1861    fields.insert("kind".to_string(), Value::text("queue_ack".to_string()));
1862    fields.insert("queue".to_string(), Value::text(queue.to_string()));
1863    fields.insert("group".to_string(), Value::text(group.to_string()));
1864    fields.insert(
1865        "message_id".to_string(),
1866        Value::UnsignedInteger(message_id.raw()),
1867    );
1868    fields.insert("acked_at_ns".to_string(), Value::UnsignedInteger(now_ns()));
1869    insert_meta_row(store, fields)
1870}
1871
1872pub(super) fn queue_message_completed_for_all_groups(
1873    store: &UnifiedStore,
1874    queue: &str,
1875    message_id: EntityId,
1876) -> RedDBResult<bool> {
1877    let groups = load_queue_groups(store, queue)?;
1878    let pending = load_pending_entries(store, queue, None, Some(message_id))?;
1879    if !pending.is_empty() {
1880        return Ok(false);
1881    }
1882    if groups.is_empty() {
1883        return Ok(true);
1884    }
1885
1886    let acked_groups = load_ack_entries(store, queue, None, Some(message_id))?
1887        .into_iter()
1888        .map(|entry| entry.group)
1889        .collect::<HashSet<_>>();
1890    Ok(groups
1891        .into_iter()
1892        .all(|group| acked_groups.contains(&group.group)))
1893}
1894
/// Scan the visible messages of `queue` without a runtime handle.
///
/// Delegates to `load_queue_message_views_with_runtime` with `runtime =
/// None`. No row-level-security policy is applied; only the MVCC snapshot
/// visibility check in that function filters the scan.
fn load_queue_message_views(
    store: &UnifiedStore,
    queue: &str,
) -> RedDBResult<Vec<QueueMessageView>> {
    load_queue_message_views_with_runtime(None, store, queue)
}
1901
1902/// Kind-aware queue scan (Phase 2.5.5 RLS universal). When the
1903/// caller has a `RedDBRuntime` reference, the gate also applies
1904/// any `CREATE POLICY ... ON MESSAGES OF <queue>` predicate. In
1905/// autocommit / embedded paths that only have the raw store (e.g.
1906/// purge loops) we skip RLS because there's no session identity
1907/// to match against.
1908pub(super) fn load_queue_message_views_with_runtime(
1909    runtime: Option<&RedDBRuntime>,
1910    store: &UnifiedStore,
1911    queue: &str,
1912) -> RedDBResult<Vec<QueueMessageView>> {
1913    let manager = store
1914        .get_collection(queue)
1915        .ok_or_else(|| RedDBError::NotFound(format!("queue '{}' not found", queue)))?;
1916    // Phase 1.2 MVCC universal: capture before parallel scan. Messages
1917    // inserted by another connection's open txn stay invisible to
1918    // consumers until that txn commits (prevents phantom POPs).
1919    let snap_ctx = crate::runtime::impl_core::capture_current_snapshot();
1920    let rls_filter = runtime.and_then(|rt| {
1921        crate::runtime::impl_core::rls_policy_filter_for_kind(
1922            rt,
1923            queue,
1924            crate::storage::query::ast::PolicyAction::Select,
1925            crate::storage::query::ast::PolicyTargetKind::Messages,
1926        )
1927    });
1928    let rls_enabled_but_denied = runtime.map(|rt| rt.is_rls_enabled(queue)).unwrap_or(false)
1929        && rls_filter.is_none()
1930        && runtime.is_some();
1931    if rls_enabled_but_denied {
1932        // RLS on + no Messages policy for this role = deny-default.
1933        return Ok(Vec::new());
1934    }
1935    let filter_arc = rls_filter.map(std::sync::Arc::new);
1936    let rt_arc = runtime;
1937    Ok(manager
1938        .query_all(move |entity| {
1939            if !matches!(entity.kind, EntityKind::QueueMessage { .. }) {
1940                return false;
1941            }
1942            if !crate::runtime::impl_core::entity_visible_with_context(snap_ctx.as_ref(), entity) {
1943                return false;
1944            }
1945            if let (Some(filter), Some(rt)) = (filter_arc.as_ref(), rt_arc) {
1946                return crate::runtime::query_exec::evaluate_entity_filter_with_db(
1947                    Some(&rt.inner.db),
1948                    entity,
1949                    filter,
1950                    queue,
1951                    queue,
1952                );
1953            }
1954            true
1955        })
1956        .into_iter()
1957        .filter_map(queue_message_view_from_entity)
1958        .map(|mut view| {
1959            view.available_at_ns = read_message_available_at_ns(store, queue, view.id);
1960            view
1961        })
1962        .collect())
1963}
1964
1965fn queue_message_view_from_entity(entity: UnifiedEntity) -> Option<QueueMessageView> {
1966    let (position, _) = match &entity.kind {
1967        EntityKind::QueueMessage { position, queue } => (*position, queue),
1968        _ => return None,
1969    };
1970    let data = match entity.data {
1971        EntityData::QueueMessage(data) => data,
1972        _ => return None,
1973    };
1974    Some(QueueMessageView {
1975        id: entity.id,
1976        position,
1977        priority: data.priority.unwrap_or(0),
1978        payload: data.payload,
1979        attempts: data.attempts,
1980        max_attempts: data.max_attempts,
1981        enqueued_at_ns: data.enqueued_at_ns,
1982        available_at_ns: None,
1983    })
1984}
1985
1986/// Insert a moved payload onto `queue` using only the payload value —
1987/// priority / attempts / TTL fall back to the destination queue's
1988/// catalog config (mirrors a fresh enqueue rather than carrying source
1989/// metadata over). Used by `PrimaryQueueStore::move_to_queue`, the
1990/// `QueueLifecycle::move_between_queues` adapter that owns only
1991/// `(message_id, payload)` after `pop_messages` retires the source row.
1992pub(super) fn insert_moved_queue_message_payload(
1993    store: &UnifiedStore,
1994    queue: &str,
1995    payload: &Value,
1996) -> RedDBResult<EntityId> {
1997    let config = load_queue_config(store, queue);
1998    let position = next_queue_position(store, queue, QueueSide::Right)?;
1999    let enqueued_at_ns = std::time::SystemTime::now()
2000        .duration_since(std::time::UNIX_EPOCH)
2001        .map(|d| d.as_nanos() as u64)
2002        .unwrap_or(0);
2003    let entity = UnifiedEntity::new(
2004        EntityId::new(0),
2005        EntityKind::QueueMessage {
2006            queue: queue.to_string(),
2007            position,
2008        },
2009        EntityData::QueueMessage(QueueMessageData {
2010            payload: payload.clone(),
2011            priority: None,
2012            enqueued_at_ns,
2013            attempts: 0,
2014            max_attempts: config.max_attempts,
2015            acked: false,
2016        }),
2017    );
2018    let id = store
2019        .insert_auto(queue, entity)
2020        .map_err(|err| RedDBError::Internal(err.to_string()))?;
2021    if let Some(ttl_ms) = config.ttl_ms {
2022        store
2023            .set_metadata(queue, id, queue_message_ttl_metadata(ttl_ms))
2024            .map_err(|err| RedDBError::Internal(err.to_string()))?;
2025    }
2026    Ok(id)
2027}
2028
2029fn insert_moved_queue_message(
2030    store: &UnifiedStore,
2031    queue: &str,
2032    config: &QueueRuntimeConfig,
2033    message: &QueueMessageView,
2034) -> RedDBResult<EntityId> {
2035    let position = next_queue_position(store, queue, QueueSide::Right)?;
2036    let entity = UnifiedEntity::new(
2037        EntityId::new(0),
2038        EntityKind::QueueMessage {
2039            queue: queue.to_string(),
2040            position,
2041        },
2042        EntityData::QueueMessage(QueueMessageData {
2043            payload: message.payload.clone(),
2044            priority: if config.priority {
2045                Some(message.priority)
2046            } else {
2047                None
2048            },
2049            enqueued_at_ns: message.enqueued_at_ns,
2050            attempts: message.attempts,
2051            max_attempts: message.max_attempts,
2052            acked: false,
2053        }),
2054    );
2055    let id = store
2056        .insert_auto(queue, entity)
2057        .map_err(|err| RedDBError::Internal(err.to_string()))?;
2058    if let Some(ttl_ms) = config.ttl_ms {
2059        store
2060            .set_metadata(queue, id, queue_message_ttl_metadata(ttl_ms))
2061            .map_err(|err| RedDBError::Internal(err.to_string()))?;
2062    }
2063    Ok(id)
2064}
2065
/// Columns projected when a queue read does not name any explicitly.
///
/// Every name here has a matching arm in `queue_projection_value`.
fn queue_projection_default_columns() -> Vec<String> {
    const DEFAULT_COLUMNS: [&str; 9] = [
        "id",
        "payload",
        "priority",
        "attempts",
        "last_error",
        "enqueued_at",
        "available_at",
        "dlq",
        "tenant",
    ];
    let mut columns = Vec::with_capacity(DEFAULT_COLUMNS.len());
    for name in DEFAULT_COLUMNS {
        columns.push(name.to_owned());
    }
    columns
}
2082
2083fn queue_projection_record(
2084    columns: &[String],
2085    message: &QueueMessageView,
2086    dlq: bool,
2087) -> RedDBResult<UnifiedRecord> {
2088    let mut record = UnifiedRecord::new();
2089    for column in columns {
2090        let value = queue_projection_value(message, dlq, column).ok_or_else(|| {
2091            RedDBError::Query(format!("unknown queue projection column '{}'", column))
2092        })?;
2093        record.set(column, value);
2094    }
2095    Ok(record)
2096}
2097
2098fn queue_projection_value(message: &QueueMessageView, dlq: bool, column: &str) -> Option<Value> {
2099    match column {
2100        "id" => Some(Value::text(message_id_string(message.id))),
2101        "payload" => Some(message.payload.clone()),
2102        "priority" => Some(Value::Integer(i64::from(message.priority))),
2103        "attempts" => Some(Value::UnsignedInteger(u64::from(message.attempts))),
2104        "last_error" => Some(Value::Null),
2105        "enqueued_at" => Some(Value::UnsignedInteger(message.enqueued_at_ns)),
2106        "available_at" => Some(Value::UnsignedInteger(
2107            message.available_at_ns.unwrap_or(message.enqueued_at_ns),
2108        )),
2109        "dlq" => Some(Value::Boolean(dlq)),
2110        "tenant" => queue_message_tenant(&message.payload).or(Some(Value::Null)),
2111        _ => None,
2112    }
2113}
2114
2115fn queue_message_tenant(payload: &Value) -> Option<Value> {
2116    let Value::Json(bytes) = payload else {
2117        return None;
2118    };
2119    let json: crate::json::Value = crate::json::from_slice(bytes).ok()?;
2120    json.get("tenant")
2121        .and_then(crate::json::Value::as_str)
2122        .map(|tenant| Value::text(tenant.to_string()))
2123}
2124
2125fn queue_is_dead_letter_target(store: &UnifiedStore, queue: &str) -> bool {
2126    let Some(manager) = store.get_collection(QUEUE_META_COLLECTION) else {
2127        return false;
2128    };
2129    !manager
2130        .query_all(|entity| {
2131            entity.data.as_row().is_some_and(|row| {
2132                row_text(row, "kind").as_deref() == Some("queue_config")
2133                    && row_text(row, "dlq").as_deref() == Some(queue)
2134            })
2135        })
2136        .is_empty()
2137}
2138
2139fn queue_message_matches_filter(message: &QueueMessageView, dlq: bool, filter: &Filter) -> bool {
2140    match filter {
2141        Filter::Compare { field, op, value } => queue_filter_field_value(message, dlq, field)
2142            .is_some_and(|candidate| queue_compare_values(&candidate, value, *op)),
2143        Filter::CompareFields { left, op, right } => {
2144            match (
2145                queue_filter_field_value(message, dlq, left),
2146                queue_filter_field_value(message, dlq, right),
2147            ) {
2148                (Some(left), Some(right)) => queue_compare_values(&left, &right, *op),
2149                _ => false,
2150            }
2151        }
2152        Filter::And(left, right) => {
2153            queue_message_matches_filter(message, dlq, left)
2154                && queue_message_matches_filter(message, dlq, right)
2155        }
2156        Filter::Or(left, right) => {
2157            queue_message_matches_filter(message, dlq, left)
2158                || queue_message_matches_filter(message, dlq, right)
2159        }
2160        Filter::Not(inner) => !queue_message_matches_filter(message, dlq, inner),
2161        Filter::IsNull(field) => queue_filter_field_value(message, dlq, field)
2162            .is_none_or(|value| matches!(value, Value::Null)),
2163        Filter::IsNotNull(field) => queue_filter_field_value(message, dlq, field)
2164            .is_some_and(|value| !matches!(value, Value::Null)),
2165        Filter::In { field, values } => {
2166            queue_filter_field_value(message, dlq, field).is_some_and(|candidate| {
2167                values
2168                    .iter()
2169                    .any(|value| queue_values_equal(&candidate, value))
2170            })
2171        }
2172        Filter::Between { field, low, high } => queue_filter_field_value(message, dlq, field)
2173            .is_some_and(|candidate| {
2174                queue_compare_values(&candidate, low, CompareOp::Ge)
2175                    && queue_compare_values(&candidate, high, CompareOp::Le)
2176            }),
2177        Filter::Like { field, pattern } => queue_filter_text(message, dlq, field)
2178            .is_some_and(|value| queue_like_matches(&value, pattern)),
2179        Filter::StartsWith { field, prefix } => {
2180            queue_filter_text(message, dlq, field).is_some_and(|value| value.starts_with(prefix))
2181        }
2182        Filter::EndsWith { field, suffix } => {
2183            queue_filter_text(message, dlq, field).is_some_and(|value| value.ends_with(suffix))
2184        }
2185        Filter::Contains { field, substring } => {
2186            queue_filter_text(message, dlq, field).is_some_and(|value| value.contains(substring))
2187        }
2188        Filter::CompareExpr { .. } => false,
2189    }
2190}
2191
2192fn queue_filter_field_value(
2193    message: &QueueMessageView,
2194    dlq: bool,
2195    field: &FieldRef,
2196) -> Option<Value> {
2197    match field {
2198        FieldRef::TableColumn { table, column } if table.is_empty() => {
2199            queue_projection_value(message, dlq, column)
2200                .or_else(|| queue_payload_field_value(&message.payload, column))
2201        }
2202        FieldRef::TableColumn { column, .. } => queue_projection_value(message, dlq, column)
2203            .or_else(|| queue_payload_field_value(&message.payload, column)),
2204        _ => None,
2205    }
2206}
2207
2208fn queue_payload_field_value(payload: &Value, field: &str) -> Option<Value> {
2209    let Value::Json(bytes) = payload else {
2210        return None;
2211    };
2212    let json: crate::json::Value = crate::json::from_slice(bytes).ok()?;
2213    let value = json.get(field)?;
2214    json_value_to_schema_value(value)
2215}
2216
2217fn json_value_to_schema_value(value: &crate::json::Value) -> Option<Value> {
2218    if matches!(value, crate::json::Value::Null) {
2219        Some(Value::Null)
2220    } else if let Some(value) = value.as_bool() {
2221        Some(Value::Boolean(value))
2222    } else if let Some(value) = value.as_i64() {
2223        Some(Value::Integer(value))
2224    } else if let Some(value) = value.as_u64() {
2225        Some(Value::UnsignedInteger(value))
2226    } else if let Some(value) = value.as_f64() {
2227        Some(Value::Float(value))
2228    } else if let Some(value) = value.as_str() {
2229        Some(Value::text(value.to_string()))
2230    } else {
2231        Some(Value::Json(value.to_string_compact().into_bytes()))
2232    }
2233}
2234
2235fn queue_filter_text(message: &QueueMessageView, dlq: bool, field: &FieldRef) -> Option<String> {
2236    queue_filter_field_value(message, dlq, field).and_then(|value| match value {
2237        Value::Text(value) => Some(value.to_string()),
2238        Value::NodeRef(value) | Value::EdgeRef(value) | Value::TableRef(value) => Some(value),
2239        Value::Integer(value) => Some(value.to_string()),
2240        Value::UnsignedInteger(value) => Some(value.to_string()),
2241        Value::Float(value) => Some(value.to_string()),
2242        Value::Boolean(value) => Some(value.to_string()),
2243        _ => None,
2244    })
2245}
2246
2247fn queue_compare_values(left: &Value, right: &Value, op: CompareOp) -> bool {
2248    match op {
2249        CompareOp::Eq => queue_values_equal(left, right),
2250        CompareOp::Ne => !queue_values_equal(left, right),
2251        CompareOp::Lt => queue_partial_cmp(left, right).is_some_and(|ord| ord.is_lt()),
2252        CompareOp::Le => queue_partial_cmp(left, right).is_some_and(|ord| !ord.is_gt()),
2253        CompareOp::Gt => queue_partial_cmp(left, right).is_some_and(|ord| ord.is_gt()),
2254        CompareOp::Ge => queue_partial_cmp(left, right).is_some_and(|ord| !ord.is_lt()),
2255    }
2256}
2257
2258fn queue_values_equal(left: &Value, right: &Value) -> bool {
2259    if let (Some(left), Some(right)) = (queue_value_number(left), queue_value_number(right)) {
2260        return (left - right).abs() < f64::EPSILON;
2261    }
2262    match (left, right) {
2263        (Value::Text(left), Value::Text(right)) => left == right,
2264        (Value::Boolean(left), Value::Boolean(right)) => left == right,
2265        _ => left == right,
2266    }
2267}
2268
2269fn queue_partial_cmp(left: &Value, right: &Value) -> Option<std::cmp::Ordering> {
2270    if let (Some(left), Some(right)) = (queue_value_number(left), queue_value_number(right)) {
2271        return left.partial_cmp(&right);
2272    }
2273    match (left, right) {
2274        (Value::Text(left), Value::Text(right)) => Some(left.cmp(right)),
2275        _ => None,
2276    }
2277}
2278
2279fn queue_value_number(value: &Value) -> Option<f64> {
2280    match value {
2281        Value::Integer(value) => Some(*value as f64),
2282        Value::UnsignedInteger(value) => Some(*value as f64),
2283        Value::Float(value) => Some(*value),
2284        Value::Text(value) => value.parse().ok(),
2285        _ => None,
2286    }
2287}
2288
/// SQL-`LIKE`-style matching where `%` matches any (possibly empty) run of
/// characters, anywhere in the pattern.
///
/// `_` and all other characters are matched literally; there is no
/// escape syntax. The previous implementation only honoured `%` at the
/// pattern edges, so patterns such as `a%b` were compared literally.
/// This version accepts every string the old one did.
fn queue_like_matches(value: &str, pattern: &str) -> bool {
    let mut segments = pattern.split('%');
    // `split` always yields at least one (possibly empty) segment.
    let head = segments.next().unwrap_or("");
    let Some(mut remaining) = value.strip_prefix(head) else {
        return false;
    };
    let mut middle: Vec<&str> = segments.collect();
    let Some(tail) = middle.pop() else {
        // No `%` at all: the whole value must equal the pattern.
        return remaining.is_empty();
    };
    // Greedy leftmost placement of each interior segment leaves the
    // largest possible suffix for the remaining segments, so it finds a
    // match whenever one exists.
    for segment in middle.into_iter().filter(|segment| !segment.is_empty()) {
        match remaining.find(segment) {
            Some(start) => remaining = &remaining[start + segment.len()..],
            None => return false,
        }
    }
    remaining.ends_with(tail)
}
2303
2304pub(super) fn queue_message_view_by_id(
2305    store: &UnifiedStore,
2306    queue: &str,
2307    message_id: EntityId,
2308) -> RedDBResult<Option<QueueMessageView>> {
2309    let manager = queue_manager(store, queue)?;
2310    Ok(manager
2311        .get(message_id)
2312        .and_then(queue_message_view_from_entity)
2313        .map(|mut view| {
2314            view.available_at_ns = read_message_available_at_ns(store, queue, view.id);
2315            view
2316        }))
2317}
2318
2319pub(super) fn sort_queue_messages(
2320    messages: &mut [QueueMessageView],
2321    config: &QueueRuntimeConfig,
2322    side: QueueSide,
2323) {
2324    messages.sort_by(|left, right| {
2325        if config.priority {
2326            right
2327                .priority
2328                .cmp(&left.priority)
2329                .then_with(|| match side {
2330                    QueueSide::Left => left.position.cmp(&right.position),
2331                    QueueSide::Right => right.position.cmp(&left.position),
2332                })
2333                .then_with(|| left.id.raw().cmp(&right.id.raw()))
2334        } else {
2335            match side {
2336                QueueSide::Left => left.position.cmp(&right.position),
2337                QueueSide::Right => right.position.cmp(&left.position),
2338            }
2339            .then_with(|| left.id.raw().cmp(&right.id.raw()))
2340        }
2341    });
2342}
2343
2344pub(super) fn next_queue_position(
2345    store: &UnifiedStore,
2346    queue: &str,
2347    side: QueueSide,
2348) -> RedDBResult<u64> {
2349    let messages = load_queue_message_views(store, queue)?;
2350    if messages.is_empty() {
2351        return Ok(QUEUE_POSITION_CENTER);
2352    }
2353    match side {
2354        QueueSide::Left => Ok(messages
2355            .iter()
2356            .map(|message| message.position)
2357            .min()
2358            .unwrap_or(QUEUE_POSITION_CENTER)
2359            .saturating_sub(1)),
2360        QueueSide::Right => Ok(messages
2361            .iter()
2362            .map(|message| message.position)
2363            .max()
2364            .unwrap_or(QUEUE_POSITION_CENTER)
2365            .saturating_add(1)),
2366    }
2367}
2368
2369pub(super) fn increment_queue_attempts(
2370    store: &UnifiedStore,
2371    queue: &str,
2372    message_id: EntityId,
2373) -> RedDBResult<u32> {
2374    let manager = queue_manager(store, queue)?;
2375    let mut entity = manager
2376        .get(message_id)
2377        .ok_or_else(|| RedDBError::NotFound(format!("message '{}' not found", message_id.raw())))?;
2378    match &mut entity.data {
2379        EntityData::QueueMessage(message) => {
2380            message.attempts = message.attempts.saturating_add(1);
2381            let attempts = message.attempts;
2382            manager
2383                .update(entity)
2384                .map_err(|err| RedDBError::Internal(err.to_string()))?;
2385            Ok(attempts)
2386        }
2387        _ => Err(RedDBError::Query(format!(
2388            "entity '{}' is not a queue message",
2389            message_id.raw()
2390        ))),
2391    }
2392}
2393
2394pub(super) fn queue_message_attempts(
2395    store: &UnifiedStore,
2396    queue: &str,
2397    message_id: EntityId,
2398) -> RedDBResult<u32> {
2399    Ok(queue_message_data(store, queue, message_id)?.attempts)
2400}
2401
2402pub(super) fn queue_message_max_attempts(
2403    store: &UnifiedStore,
2404    queue: &str,
2405    message_id: EntityId,
2406) -> RedDBResult<u32> {
2407    Ok(queue_message_data(store, queue, message_id)?.max_attempts)
2408}
2409
2410pub(super) fn queue_message_payload(
2411    store: &UnifiedStore,
2412    queue: &str,
2413    message_id: EntityId,
2414) -> RedDBResult<Value> {
2415    Ok(queue_message_data(store, queue, message_id)?.payload)
2416}
2417
2418pub(super) fn queue_message_pending_any(
2419    store: &UnifiedStore,
2420    queue: &str,
2421    message_id: EntityId,
2422) -> RedDBResult<bool> {
2423    Ok(!load_pending_entries(store, queue, None, Some(message_id))?.is_empty())
2424}
2425
2426pub(super) fn queue_message_pending_for_group(
2427    store: &UnifiedStore,
2428    queue: &str,
2429    group: &str,
2430    message_id: EntityId,
2431) -> RedDBResult<bool> {
2432    Ok(!load_pending_entries(store, queue, Some(group), Some(message_id))?.is_empty())
2433}
2434
2435pub(super) fn queue_message_acked_for_group(
2436    store: &UnifiedStore,
2437    queue: &str,
2438    group: &str,
2439    message_id: EntityId,
2440) -> RedDBResult<bool> {
2441    Ok(!load_ack_entries(store, queue, Some(group), Some(message_id))?.is_empty())
2442}
2443
2444fn queue_manager(
2445    store: &UnifiedStore,
2446    queue: &str,
2447) -> RedDBResult<Arc<crate::storage::unified::SegmentManager>> {
2448    store
2449        .get_collection(queue)
2450        .ok_or_else(|| RedDBError::NotFound(format!("queue '{}' not found", queue)))
2451}
2452
2453pub(super) fn queue_message_data(
2454    store: &UnifiedStore,
2455    queue: &str,
2456    message_id: EntityId,
2457) -> RedDBResult<QueueMessageData> {
2458    let manager = queue_manager(store, queue)?;
2459    let entity = manager
2460        .get(message_id)
2461        .ok_or_else(|| RedDBError::NotFound(format!("message '{}' not found", message_id.raw())))?;
2462    match entity.data {
2463        EntityData::QueueMessage(message) => Ok(message),
2464        _ => Err(RedDBError::Query(format!(
2465            "entity '{}' is not a queue message",
2466            message_id.raw()
2467        ))),
2468    }
2469}
2470
2471fn insert_meta_row(store: &UnifiedStore, fields: HashMap<String, Value>) -> RedDBResult<()> {
2472    let _ = store.get_or_create_collection(QUEUE_META_COLLECTION);
2473    store
2474        .insert_auto(
2475            QUEUE_META_COLLECTION,
2476            UnifiedEntity::new(
2477                EntityId::new(0),
2478                EntityKind::TableRow {
2479                    table: Arc::from(QUEUE_META_COLLECTION),
2480                    row_id: 0,
2481                },
2482                EntityData::Row(RowData {
2483                    columns: Vec::new(),
2484                    named: Some(fields),
2485                    schema: None,
2486                }),
2487            ),
2488        )
2489        .map_err(|err| RedDBError::Internal(err.to_string()))?;
2490    Ok(())
2491}
2492
2493pub(super) fn remove_meta_rows(store: &UnifiedStore, predicate: impl Fn(&RowData) -> bool + Sync) {
2494    let Some(manager) = store.get_collection(QUEUE_META_COLLECTION) else {
2495        return;
2496    };
2497    let rows = manager.query_all(|entity| entity.data.as_row().is_some_and(&predicate));
2498    for row in rows {
2499        let _ = store.delete(QUEUE_META_COLLECTION, row.id);
2500    }
2501}
2502
2503pub(super) fn delete_meta_entity(store: &UnifiedStore, entity_id: EntityId) {
2504    let _ = store.delete(QUEUE_META_COLLECTION, entity_id);
2505}
2506
2507fn queue_message_lock_key(queue: &str, message_id: EntityId) -> String {
2508    format!("{queue}:{}", message_id.raw())
2509}
2510
2511pub(super) fn queue_message_lock_handle(
2512    runtime: &RedDBRuntime,
2513    queue: &str,
2514    message_id: EntityId,
2515) -> Arc<parking_lot::Mutex<()>> {
2516    let key = queue_message_lock_key(queue, message_id);
2517    if let Some(lock) = runtime.inner.queue_message_locks.read().get(&key).cloned() {
2518        return lock;
2519    }
2520
2521    let mut locks = runtime.inner.queue_message_locks.write();
2522    locks
2523        .entry(key)
2524        .or_insert_with(|| Arc::new(parking_lot::Mutex::new(())))
2525        .clone()
2526}
2527
2528pub(super) fn forget_queue_message_lock(runtime: &RedDBRuntime, queue: &str, message_id: EntityId) {
2529    runtime
2530        .inner
2531        .queue_message_locks
2532        .write()
2533        .remove(&queue_message_lock_key(queue, message_id));
2534}
2535
2536fn parse_message_id(value: &str) -> RedDBResult<EntityId> {
2537    let raw = value.strip_prefix('e').unwrap_or(value);
2538    raw.parse::<u64>()
2539        .map(EntityId::new)
2540        .map_err(|_| RedDBError::Query(format!("invalid message id '{}'", value)))
2541}
2542
2543/// ADR 0026: resolve the ACK/NACK handle. When `delivery_id` is supplied,
2544/// it wins unconditionally — strict failure if the handle does not resolve
2545/// to a live pending delivery on `queue`. When only the legacy tuple is
2546/// supplied, emit a rate-limited deprecation log line and use the tuple.
2547/// At least one handle must be present.
2548pub(super) fn resolve_ack_nack_handle(
2549    store: &UnifiedStore,
2550    queue: &str,
2551    group_hint: &str,
2552    message_id_hint: &str,
2553    delivery_id: Option<&str>,
2554) -> RedDBResult<(String, EntityId)> {
2555    if let Some(did) = delivery_id {
2556        return resolve_delivery_id(store, queue, did);
2557    }
2558    if group_hint.is_empty() || message_id_hint.is_empty() {
2559        return Err(RedDBError::Query(
2560            "ACK/NACK requires either GROUP <group> '<message_id>' or WITH delivery_id = '<id>'"
2561                .to_string(),
2562        ));
2563    }
2564    log_tuple_deprecation(queue);
2565    let entity = parse_message_id(message_id_hint)?;
2566    Ok((group_hint.to_string(), entity))
2567}
2568
2569fn resolve_delivery_id(
2570    store: &UnifiedStore,
2571    queue: &str,
2572    delivery_id: &str,
2573) -> RedDBResult<(String, EntityId)> {
2574    let Some(manager) = store.get_collection(QUEUE_META_COLLECTION) else {
2575        return Err(RedDBError::Query(format!(
2576            "delivery_id '{}' does not resolve to a live pending delivery",
2577            delivery_id
2578        )));
2579    };
2580    for entity in manager.query_all(|entity| {
2581        entity.data.as_row().is_some_and(|row| {
2582            row_text(row, "kind").as_deref() == Some("queue_pending_lc")
2583                && row_text(row, "delivery_id").as_deref() == Some(delivery_id)
2584        })
2585    }) {
2586        if let Some(row) = entity.data.as_row() {
2587            let row_queue = row_text(row, "queue").unwrap_or_default();
2588            let row_group = row_text(row, "group").unwrap_or_default();
2589            let row_message = row_u64(row, "message_id").unwrap_or(0);
2590            if row_queue != queue {
2591                return Err(RedDBError::Query(format!(
2592                    "delivery_id '{}' belongs to queue '{}', not '{}'",
2593                    delivery_id, row_queue, queue
2594                )));
2595            }
2596            return Ok((row_group, EntityId::new(row_message)));
2597        }
2598    }
2599    Err(RedDBError::Query(format!(
2600        "delivery_id '{}' does not resolve to a live pending delivery",
2601        delivery_id
2602    )))
2603}
2604
2605/// Per-(connection, queue) rate-limited "tuple ACK is deprecated" log line.
2606/// One emission per minute matches ADR 0026's operational guidance.
2607fn log_tuple_deprecation(queue: &str) {
2608    use std::sync::atomic::Ordering;
2609    use std::sync::{Mutex, OnceLock};
2610    use std::time::Instant;
2611
2612    static LAST_EMIT: OnceLock<Mutex<HashMap<(u64, String), Instant>>> = OnceLock::new();
2613    const COOLDOWN: std::time::Duration = std::time::Duration::from_secs(60);
2614
2615    let map = LAST_EMIT.get_or_init(|| Mutex::new(HashMap::new()));
2616    let key = (super::impl_core::current_connection_id(), queue.to_string());
2617    let now = Instant::now();
2618    let mut guard = match map.lock() {
2619        Ok(g) => g,
2620        Err(_) => return,
2621    };
2622    let should_emit =
2623        !matches!(guard.get(&key), Some(prev) if now.duration_since(*prev) < COOLDOWN);
2624    if should_emit {
2625        guard.insert(key.clone(), now);
2626        drop(guard);
2627        TUPLE_DEPRECATION_EMITS.fetch_add(1, Ordering::Relaxed);
2628        tracing::warn!(
2629            target: "reddb::queue_lifecycle",
2630            queue = queue,
2631            connection_id = key.0,
2632            "ACK/NACK by (queue, group, message_id) tuple is deprecated; \
2633             switch to the server-issued delivery_id (ADR 0026). \
2634             The tuple path will be removed one minor release after introduction.",
2635        );
2636    }
2637}
2638
/// Process-wide count of tuple-ACK deprecation warnings emitted since
/// startup.
///
/// Public so transport-bridge integration tests can verify that the
/// legacy tuple path warned while the `delivery_id` path stayed silent,
/// without installing a `tracing::Subscriber` in every test.
pub static TUPLE_DEPRECATION_EMITS: AtomicU64 = AtomicU64::new(0);
2646
2647fn message_id_string(message_id: EntityId) -> String {
2648    message_id.raw().to_string()
2649}
2650
2651/// Slice 10 of issue #527 — render-time scan of pending entries
2652/// per (queue, group) for `queue_pending_gauge` exposition. Walks
2653/// `red_queue_meta` live so the gauge cannot drift from the source
2654/// of truth.
2655pub(crate) fn pending_counts_by_group(
2656    store: &UnifiedStore,
2657) -> std::collections::BTreeMap<(String, String), u64> {
2658    let mut counts: std::collections::BTreeMap<(String, String), u64> =
2659        std::collections::BTreeMap::new();
2660    let Some(manager) = store.get_collection(QUEUE_META_COLLECTION) else {
2661        return counts;
2662    };
2663    for entity in manager.query_all(|entity| {
2664        entity
2665            .data
2666            .as_row()
2667            .is_some_and(|row| row_text(row, "kind").as_deref() == Some("queue_pending"))
2668    }) {
2669        if let Some(row) = entity.data.as_row() {
2670            let queue = row_text(row, "queue");
2671            let group = row_text(row, "group");
2672            if let (Some(q), Some(g)) = (queue, group) {
2673                *counts.entry((q, g)).or_insert(0) += 1;
2674            }
2675        }
2676    }
2677    counts
2678}
2679
2680pub(super) fn row_text(row: &RowData, field: &str) -> Option<String> {
2681    match row.get_field(field)?.clone() {
2682        Value::Text(value) => Some(value.to_string()),
2683        Value::NodeRef(value) => Some(value),
2684        Value::EdgeRef(value) => Some(value),
2685        Value::TableRef(value) => Some(value),
2686        _ => None,
2687    }
2688}
2689
2690pub(super) fn row_u64(row: &RowData, field: &str) -> Option<u64> {
2691    match row.get_field(field)?.clone() {
2692        Value::UnsignedInteger(value) => Some(value),
2693        Value::Integer(value) if value >= 0 => Some(value as u64),
2694        Value::Float(value) if value >= 0.0 => Some(value as u64),
2695        Value::Text(value) => value.parse().ok(),
2696        _ => None,
2697    }
2698}
2699
2700fn row_bool(row: &RowData, field: &str) -> Option<bool> {
2701    match row.get_field(field)?.clone() {
2702        Value::Boolean(value) => Some(value),
2703        Value::Text(value) => match value.to_ascii_lowercase().as_str() {
2704            "true" => Some(true),
2705            "false" => Some(false),
2706            _ => None,
2707        },
2708        _ => None,
2709    }
2710}
2711
2712fn queue_collection_contract(
2713    name: &str,
2714    priority: bool,
2715    ttl_ms: Option<u64>,
2716) -> crate::physical::CollectionContract {
2717    let now = current_unix_ms();
2718    let mut context_index_fields = Vec::new();
2719    if priority {
2720        context_index_fields.push("priority".to_string());
2721    }
2722
2723    crate::physical::CollectionContract {
2724        name: name.to_string(),
2725        declared_model: crate::catalog::CollectionModel::Queue,
2726        schema_mode: crate::catalog::SchemaMode::Dynamic,
2727        origin: crate::physical::ContractOrigin::Explicit,
2728        version: 1,
2729        created_at_unix_ms: now,
2730        updated_at_unix_ms: now,
2731        default_ttl_ms: ttl_ms,
2732        vector_dimension: None,
2733        vector_metric: None,
2734        context_index_fields,
2735        declared_columns: Vec::new(),
2736        table_def: None,
2737        timestamps_enabled: false,
2738        context_index_enabled: false,
2739        metrics_raw_retention_ms: None,
2740        metrics_rollup_policies: Vec::new(),
2741        metrics_tenant_identity: None,
2742        metrics_namespace: None,
2743        // Queues manipulate messages via push/pop/ack — the row DML
2744        // paths never apply. Flag it as append_only so inadvertent
2745        // `UPDATE/DELETE FROM queue_name` statements fail loudly.
2746        append_only: true,
2747        subscriptions: Vec::new(),
2748        analytics_config: Vec::new(),
2749        session_key: None,
2750        session_gap_ms: None,
2751        retention_duration_ms: None,
2752    }
2753}
2754
/// Milliseconds since the Unix epoch. A clock set before the epoch
/// yields `0`.
fn current_unix_ms() -> u128 {
    let since_epoch = std::time::UNIX_EPOCH.elapsed().unwrap_or_default();
    since_epoch.as_millis()
}
2761
2762pub(super) fn now_ns() -> u64 {
2763    std::time::SystemTime::now()
2764        .duration_since(std::time::UNIX_EPOCH)
2765        .unwrap_or_default()
2766        .as_nanos() as u64
2767}
2768
2769pub(super) fn queue_message_ttl_metadata(ttl_ms: u64) -> Metadata {
2770    queue_message_metadata(Some(ttl_ms), None)
2771}
2772
2773/// Build the per-message metadata row attached to a queue message. Both
2774/// fields are optional — `_ttl_ms` carries the per-message TTL (existing
2775/// behaviour) and `_available_at_ns` carries the first-delivery instant
2776/// for delayed messages (issue #722). When both are present they share
2777/// the same row, since `UnifiedStore::set_metadata` replaces the entry.
2778pub(super) fn queue_message_metadata(
2779    ttl_ms: Option<u64>,
2780    available_at_ns: Option<u64>,
2781) -> Metadata {
2782    let mut fields = HashMap::new();
2783    if let Some(ttl_ms) = ttl_ms {
2784        fields.insert(
2785            "_ttl_ms".to_string(),
2786            if ttl_ms <= i64::MAX as u64 {
2787                MetadataValue::Int(ttl_ms as i64)
2788            } else {
2789                MetadataValue::Timestamp(ttl_ms)
2790            },
2791        );
2792    }
2793    if let Some(at_ns) = available_at_ns {
2794        fields.insert(
2795            "_available_at_ns".to_string(),
2796            MetadataValue::Timestamp(at_ns),
2797        );
2798    }
2799    Metadata::with_fields(fields)
2800}
2801
2802/// Smallest future `available_at_ns` among messages currently sitting
2803/// on `queue`. Used by `QUEUE READ … WAIT` (issue #722) to cap the
2804/// condvar park horizon: without this, a waiter on a quiet queue with
2805/// only delayed messages would never wake when one became due, since
2806/// the wait registry is only notified by producer commits. Returns
2807/// `None` when no future-dated message exists (the common case — the
2808/// caller falls back to the user-supplied wait budget).
2809pub(super) fn earliest_future_available_at(store: &UnifiedStore, queue: &str) -> Option<u64> {
2810    let now_ns = now_ns();
2811    let views = load_queue_message_views_with_runtime(None, store, queue).ok()?;
2812    views
2813        .iter()
2814        .filter_map(|v| v.available_at_ns)
2815        .filter(|at| *at > now_ns)
2816        .min()
2817}
2818
2819/// Update the `_available_at_ns` metadata for a queue message in place
2820/// without dropping any `_ttl_ms` already present (issue #723 — used by
2821/// NACK retry delay). `available_at_ns = None` clears the field. The
2822/// `fallback_ttl_ms` argument is consulted only when the existing
2823/// metadata row carries no `_ttl_ms` — keeps the per-message TTL in
2824/// sync with the queue default in the common case where no per-message
2825/// TTL was set explicitly. Tolerant of missing collections / entities:
2826/// returns `Ok(())` rather than failing the surrounding NACK if the
2827/// underlying message has gone away between resolution and metadata
2828/// update (a benign race; the next delivery cycle reflects truth).
2829pub(super) fn set_message_available_at_ns(
2830    store: &UnifiedStore,
2831    queue: &str,
2832    message_id: EntityId,
2833    available_at_ns: Option<u64>,
2834    fallback_ttl_ms: Option<u64>,
2835) -> RedDBResult<()> {
2836    let existing_ttl_ms = store
2837        .get_metadata(queue, message_id)
2838        .and_then(|md| match md.get("_ttl_ms")? {
2839            MetadataValue::Int(i) if *i >= 0 => Some(*i as u64),
2840            MetadataValue::Timestamp(t) => Some(*t),
2841            _ => None,
2842        })
2843        .or(fallback_ttl_ms);
2844    let metadata = queue_message_metadata(existing_ttl_ms, available_at_ns);
2845    match store.set_metadata(queue, message_id, metadata) {
2846        Ok(()) => Ok(()),
2847        Err(crate::storage::StoreError::CollectionNotFound(_)) => Ok(()),
2848        Err(err) => Err(RedDBError::Internal(err.to_string())),
2849    }
2850}
2851
2852/// Read the `_available_at_ns` metadata for a queue message. Returns
2853/// `None` for messages with no delay (the common case) or when the
2854/// metadata row is missing entirely.
2855pub(super) fn read_message_available_at_ns(
2856    store: &UnifiedStore,
2857    queue: &str,
2858    message_id: EntityId,
2859) -> Option<u64> {
2860    let md = store.get_metadata(queue, message_id)?;
2861    match md.get("_available_at_ns")? {
2862        MetadataValue::Timestamp(t) => Some(*t),
2863        MetadataValue::Int(i) if *i >= 0 => Some(*i as u64),
2864        _ => None,
2865    }
2866}
2867
2868/// Rough payload byte estimate for outbox watermark tracking.
2869fn estimate_payload_bytes(payload: &Value) -> u64 {
2870    match payload {
2871        Value::Json(v) => v.len() as u64,
2872        Value::Text(s) => s.len() as u64,
2873        _ => 64,
2874    }
2875}
2876
#[cfg(test)]
mod presence_integration_tests {
    use super::*;
    use crate::storage::queue::presence::{PresenceState, DEFAULT_PRESENCE_TTL_MS};
    use crate::{RedDBOptions, RedDBRuntime};

    /// Create an in-memory runtime and run `statements` in order,
    /// panicking on the first one that fails.
    fn runtime_after(statements: &[&str]) -> RedDBRuntime {
        let runtime = RedDBRuntime::with_options(RedDBOptions::in_memory()).unwrap();
        for &statement in statements {
            runtime.execute_query(statement).unwrap();
        }
        runtime
    }

    /// Issue #742 acceptance: `QUEUE READ` must register and refresh
    /// consumer presence. The snapshot reports the `(queue, group,
    /// consumer)` triple named by the read, in the `Active` state.
    #[test]
    fn queue_read_emits_consumer_presence_heartbeat() {
        let runtime = runtime_after(&[
            "CREATE QUEUE tasks",
            "QUEUE GROUP CREATE tasks workers",
            "QUEUE PUSH tasks {'job':'a'}",
            "QUEUE READ tasks GROUP workers CONSUMER w1",
        ]);

        let snapshot = runtime.queue_consumer_presence_snapshot(DEFAULT_PRESENCE_TTL_MS);
        assert_eq!(snapshot.len(), 1, "exactly one heartbeat recorded");
        let heartbeat = &snapshot[0];
        assert_eq!(heartbeat.queue, "tasks");
        assert_eq!(heartbeat.group, "workers");
        assert_eq!(heartbeat.consumer, "w1");
        assert_eq!(heartbeat.state, PresenceState::Active);

        let active = runtime.queue_active_consumer_counts(DEFAULT_PRESENCE_TTL_MS);
        let key = ("tasks".to_string(), "workers".to_string());
        assert_eq!(active[&key], 1, "active count reflects the live consumer");
    }

    /// Issue #742 acceptance: a read that returns no messages must still
    /// heartbeat, because aliveness is independent of pending deliveries.
    #[test]
    fn empty_queue_read_still_heartbeats() {
        // Nothing is pushed, so the read finds an empty queue.
        let runtime = runtime_after(&[
            "CREATE QUEUE empty_q",
            "QUEUE GROUP CREATE empty_q workers",
            "QUEUE READ empty_q GROUP workers CONSUMER w1",
        ]);

        let snapshot = runtime.queue_consumer_presence_snapshot(DEFAULT_PRESENCE_TTL_MS);
        assert_eq!(
            snapshot.len(),
            1,
            "empty read still registers consumer presence"
        );
        let heartbeat = &snapshot[0];
        assert_eq!(heartbeat.state, PresenceState::Active);
        assert_eq!(
            heartbeat.lease_count, 0,
            "no messages delivered → zero leases"
        );
    }
}