Skip to main content

reddb_server/runtime/
impl_queue.rs

1//! Queue DDL and command execution
2
3use std::collections::{HashMap, HashSet};
4use std::sync::atomic::{AtomicU64, Ordering};
5use std::sync::Arc;
6
7use crate::runtime::audit_log::{AuditAuthSource, AuditEvent, AuditFieldEscaper, Outcome};
8use crate::runtime::impl_core::{
9    clear_current_auth_identity, clear_current_tenant, current_auth_identity, current_tenant,
10    set_current_auth_identity, set_current_tenant,
11};
12use crate::runtime::queue_telemetry::NackOutcomeLabel;
13use crate::storage::queue::QueueMode;
14use crate::storage::unified::entity::{QueueMessageData, RowData};
15use crate::storage::unified::{Metadata, MetadataValue, UnifiedStore};
16use crate::telemetry::operator_event::OperatorEvent;
17
18use super::*;
19
20use super::primary_queue_store::PrimaryQueueStore;
21use super::queue_lifecycle::{QueueLifecycle, RetirementOutcome};
22use crate::storage::queue::lifecycle::{
23    QueueSide as LcQueueSide, QueueStore as _, QueueStoreError, QueueTxn,
24};
25
26/// Build a [`QueueLifecycle`] backed by a fresh [`PrimaryQueueStore`] for
27/// the given `queue`, plus a `QueueTxn` bound to the live runtime
28/// connection. The store inside the lifecycle and the standalone
29/// [`PrimaryQueueStore`] returned for ack/nack lookups share the same
30/// underlying [`UnifiedStore`] — calls against either are observable on
31/// the other.
32pub(super) fn runtime_lifecycle(
33    runtime: &RedDBRuntime,
34    queue: &str,
35) -> (
36    QueueLifecycle<PrimaryQueueStore>,
37    PrimaryQueueStore,
38    QueueTxn,
39) {
40    let primary_for_lookup = PrimaryQueueStore::new(runtime.clone());
41    let primary_for_lifecycle = PrimaryQueueStore::new(runtime.clone());
42    let txn = primary_for_lifecycle.new_txn();
43    let cfg = primary_for_lifecycle.lifecycle_config(queue);
44    (
45        QueueLifecycle::new(primary_for_lifecycle, cfg)
46            .with_telemetry(Arc::clone(&runtime.inner.queue_telemetry)),
47        primary_for_lookup,
48        txn,
49    )
50}
51
/// Slice C of PRD #718 — message reported when a parked
/// `QUEUE READ … WAIT` is cancelled because the wait registry was shut
/// down (server shutdown). It travels as a plain `RedDBError::Query`
/// because the public error type has no dedicated `Cancelled` variant,
/// so every transport relays the text unchanged.
pub(crate) const QUEUE_READ_WAIT_CANCELLED: &str =
    "QUEUE READ WAIT cancelled — server shutting down";

/// Slice B of PRD #718 — `red.config` key holding the largest WAIT
/// budget, in milliseconds, that the runtime accepts. Requests above it
/// are refused before any waiter is registered.
pub(crate) const QUEUE_MAX_WAIT_MS_CONFIG_KEY: &str = "red.config.queue.max_wait_ms";

/// Cap applied while [`QUEUE_MAX_WAIT_MS_CONFIG_KEY`] is unset: one minute,
/// expressed in milliseconds.
pub(crate) const QUEUE_MAX_WAIT_MS_DEFAULT: u64 = 60 * 1_000;
68
/// Outcome of the async live queue-wait edge ([`RedDBRuntime::redwire_queue_wait_json`],
/// issue #919). Carries the three non-error terminal states the RedWire
/// session maps to distinct frames, so a timeout never aliases an empty
/// delivery and a cancellation never aliases a timeout:
///   - `Delivered` → one `QueueEventPush` per message.
///   - `TimedOut`  → a distinct `QueueWaitTimeout` frame.
///   - `Cancelled` → a `StreamError` with the cancellation code.
///
/// A genuine runtime failure (bad queue, read error) stays an `Err` on
/// the surrounding `RedDBResult`.
#[derive(Debug)]
pub(crate) enum RedwireWaitOutcome {
    /// At least one message was claimed through `group_read`; each element
    /// is one delivered message rendered as JSON.
    Delivered(Vec<crate::serde_json::Value>),
    /// The requested WAIT budget elapsed without any delivery.
    TimedOut,
    /// The wait registry was cancelled (server shutdown) before a delivery.
    Cancelled,
}
85
86/// Slice C of PRD #718 — scope key for the queue wait registry.
87/// Today every connection in the process shares a single namespace;
88/// the helper exists so multi-tenant scoping (e.g. tenant id) can be
89/// threaded through later without touching every call site.
90pub(super) fn queue_wait_scope() -> String {
91    crate::runtime::impl_core::current_tenant().unwrap_or_default()
92}
93
94fn with_redwire_wait_context<T>(
95    auth_identity: &Option<(String, crate::auth::Role)>,
96    tenant: &Option<String>,
97    f: impl FnOnce() -> T,
98) -> T {
99    let previous_auth = current_auth_identity();
100    let previous_tenant = current_tenant();
101    match tenant {
102        Some(t) => set_current_tenant(t.clone()),
103        None => clear_current_tenant(),
104    }
105    match auth_identity {
106        Some((username, role)) => set_current_auth_identity(username.clone(), *role),
107        None => clear_current_auth_identity(),
108    }
109    let result = f();
110    match previous_tenant {
111        Some(t) => set_current_tenant(t),
112        None => clear_current_tenant(),
113    }
114    match previous_auth {
115        Some((username, role)) => set_current_auth_identity(username, role),
116        None => clear_current_auth_identity(),
117    }
118    result
119}
120
121/// Convert a lifecycle `QueueSide` view into the AST flavour we accept
122/// from `QueueCommand` callers. Both enums are isomorphic but live in
123/// different modules.
124fn ast_side_to_lc(side: crate::storage::query::ast::QueueSide) -> LcQueueSide {
125    use crate::storage::query::ast::QueueSide as Ast;
126    match side {
127        Ast::Left => LcQueueSide::Left,
128        Ast::Right => LcQueueSide::Right,
129    }
130}
131
132/// Map a `QueueStoreError` (returned by lifecycle methods) onto the
133/// runtime-facing `RedDBError`.
134fn map_qse(err: QueueStoreError) -> RedDBError {
135    match err {
136        QueueStoreError::UnknownDelivery(id) => RedDBError::NotFound(format!(
137            "delivery_id '{id}' does not resolve to a live pending delivery"
138        )),
139        QueueStoreError::UnknownQueue(q) => RedDBError::NotFound(format!("queue '{q}' not found")),
140        QueueStoreError::ReplicaImmutable => {
141            RedDBError::Internal("replica QueueStore is immutable".to_string())
142        }
143    }
144}
145
146// ---------------------------------------------------------------------------
147// Outbox metrics (exposed via /metrics)
148// ---------------------------------------------------------------------------
149
150/// Total event push attempts that failed (queue full or other error) and
151/// triggered DLQ routing.
152pub static EVENTS_DRAIN_RETRIES_TOTAL: AtomicU64 = AtomicU64::new(0);
153
154/// Total events routed to the dead-letter queue.
155pub static EVENTS_DLQ_TOTAL: AtomicU64 = AtomicU64::new(0);
156
157/// Total events successfully enqueued to their target queue.
158pub static EVENTS_ENQUEUED_TOTAL: AtomicU64 = AtomicU64::new(0);
159
160/// Warn when total estimated outbox payload bytes exceed this value (1 GiB).
161const OUTBOX_WARN_BYTES: u64 = 1 << 30;
162
163/// Route all new events to DLQ when estimated outbox exceeds this value (10 GiB).
164const OUTBOX_MAX_BYTES: u64 = 10 * (1 << 30);
165
166/// Running estimate of bytes pending in event queues (approximate; not decremented on consume).
167static OUTBOX_APPROX_BYTES: AtomicU64 = AtomicU64::new(0);
168
169const QUEUE_META_COLLECTION: &str = "red_queue_meta";
170const QUEUE_POSITION_CENTER: u64 = u64::MAX / 2;
171const WORK_DEFAULT_GROUP: &str = "_work_default";
172const FANOUT_GROUP_PREFIX: &str = "_fanout_";
173
/// Per-queue runtime settings. `CREATE QUEUE` persists them with
/// `save_queue_config`, and they are read back with `load_queue_config`
/// (e.g. on every event enqueue).
#[derive(Debug, Clone)]
pub(super) struct QueueRuntimeConfig {
    /// Delivery mode chosen at `CREATE QUEUE` time.
    pub(super) mode: QueueMode,
    /// Whether the queue was created with priority enabled; also fed into
    /// the queue's collection contract.
    pub(super) priority: bool,
    /// Optional capacity. Event enqueue reroutes to the outbox DLQ once the
    /// queue already holds `max_size` messages.
    pub(super) max_size: Option<usize>,
    /// Optional per-message TTL in milliseconds; event enqueue attaches it
    /// to each inserted message as metadata.
    pub(super) ttl_ms: Option<u64>,
    /// Configured dead-letter queue name, if any. `CREATE QUEUE` creates
    /// that collection when it does not exist yet.
    pub(super) dlq: Option<String>,
    /// Attempt budget copied into `max_attempts` of messages inserted by
    /// the event-enqueue path.
    pub(super) max_attempts: u32,
    /// Lock deadline in milliseconds. Presumably how long a delivery stays
    /// claimed before it may be redelivered (TODO confirm in lifecycle code).
    pub(super) lock_deadline_ms: u64,
    /// Per-consumer-group in-flight cap. NOTE(review): enforcement is not
    /// visible here.
    pub(super) in_flight_cap_per_group: u32,
    /// Default retry delay (issue #723) applied to NACK-requeued
    /// messages before they become re-deliverable. `None` keeps the
    /// pre-#723 immediate-requeue behaviour. Overridden per-failure by
    /// an authorized `NACK ... WITH DELAY <duration>`.
    pub(super) retry_delay_ms: Option<u64>,
}
190
/// A consumer-group record attached to a queue. NOTE(review): the code that
/// reads and writes these records is outside this view. `entity_id` is
/// presumably the id of the entity that persists the record.
#[derive(Debug, Clone)]
struct QueueGroupEntry {
    entity_id: EntityId,
    group: String,
}
196
/// A delivery recorded against a consumer group. Presumably the message has
/// been handed to `consumer` but not yet acknowledged (TODO confirm against
/// the loader).
#[derive(Debug, Clone)]
pub(super) struct QueuePendingEntry {
    // Id of the entity holding this pending record (assumed).
    pub(super) entity_id: EntityId,
    // Consumer group the delivery belongs to.
    group: String,
    // Id of the queue message that was delivered.
    pub(super) message_id: EntityId,
    // Consumer the delivery was made to.
    consumer: String,
    // Delivery timestamp in nanoseconds.
    pub(super) delivered_at_ns: u64,
    // Number of deliveries of this message (assumed to include redeliveries; confirm).
    pub(super) delivery_count: u32,
}
206
/// An acknowledgement record. Presumably it states that consumer group
/// `group` acked `message_id`, persisted as entity `entity_id` (TODO
/// confirm against the loader).
#[derive(Debug, Clone)]
pub(super) struct QueueAckEntry {
    entity_id: EntityId,
    group: String,
    pub(super) message_id: EntityId,
}
213
/// Read-side view of one message stored in a queue collection.
#[derive(Debug, Clone)]
pub(super) struct QueueMessageView {
    // Entity id of the stored message.
    pub(super) id: EntityId,
    // Position within the queue (presumably the `EntityKind::QueueMessage`
    // position assigned at insert; confirm against the loader).
    position: u64,
    // Message priority. NOTE(review): how an unset priority maps to this i32
    // is decided by the loader, not visible here.
    priority: i32,
    // Message body as stored.
    pub(super) payload: Value,
    // Delivery attempts so far (assumed).
    attempts: u32,
    // Maximum delivery attempts allowed for this message.
    pub(super) max_attempts: u32,
    // Enqueue timestamp in nanoseconds (assumed to use the same clock as `now_ns()`).
    enqueued_at_ns: u64,
    /// First-delivery instant for delayed messages (issue #722). `None`
    /// means immediate availability. Sourced from the
    /// `_available_at_ns` metadata field, populated on push.
    pub(super) available_at_ns: Option<u64>,
}
228
229impl QueueMessageView {
230    /// Whether this message is currently deliverable. Messages whose
231    /// `available_at_ns` lies in the future remain durable and
232    /// inspectable but are filtered out of `QUEUE READ` / `QUEUE POP`
233    /// projections.
234    pub(super) fn is_available_now(&self) -> bool {
235        match self.available_at_ns {
236            Some(at) => at <= now_ns(),
237            None => true,
238        }
239    }
240}
241
242impl RedDBRuntime {
    /// Slice C of PRD #718 — non-blocking `group_read` plus optional
    /// `WAIT <duration>` retry. When `wait_ms` is `None` this is the
    /// pre-slice-C synchronous read. When `Some`, an immediate empty
    /// projection parks the caller on the shared
    /// [`crate::runtime::queue_wait_registry::QueueWaitRegistry`] and
    /// retries on wake until the deadline. Timeout returns an empty
    /// projection (zero records, no error). Shutdown cancellation
    /// returns [`QUEUE_READ_WAIT_CANCELLED`].
    ///
    /// This method does not itself enforce the WAIT cap
    /// ([`QUEUE_MAX_WAIT_MS_CONFIG_KEY`]); `wait_ms` is used as given.
    ///
    /// # Errors
    /// - Any `group_read` failure, mapped through `map_qse`.
    /// - `RedDBError::Query(QUEUE_READ_WAIT_CANCELLED)` when the registry is
    ///   cancelled while waiting.
    pub(super) fn group_read_with_optional_wait(
        &self,
        queue: &str,
        group: &str,
        consumer: &str,
        count: usize,
        wait_ms: Option<u64>,
    ) -> RedDBResult<Vec<crate::runtime::queue_lifecycle::DeliveredMessage>> {
        // One claim attempt: build a fresh lifecycle and run `group_read`
        // under the per-queue group-read lock (released when the closure returns).
        let do_read =
            |runtime: &RedDBRuntime| -> RedDBResult<Vec<crate::runtime::queue_lifecycle::DeliveredMessage>> {
                // #1371 — serialize concurrent group consumers on this queue so
                // a single available message is claimed (mark_pending) and
                // returned by exactly one consumer. Without this, woken
                // competing waiters all observe the message as available and
                // each delivers it (double-delivery across the wake-all).
                let read_lock = runtime
                    .inner
                    .rmw_locks
                    .lock_for(queue, "__queue_group_read__");
                let _read_guard = read_lock.lock();
                let (lifecycle, _ps, txn) = runtime_lifecycle(runtime, queue);
                lifecycle
                    .group_read(&txn, queue, group, consumer, count)
                    .map_err(map_qse)
            };

        // Fast path: an immediate read, returned as-is when WAIT is absent
        // or when something was delivered.
        let delivered = do_read(self)?;
        let Some(wait_ms) = wait_ms else {
            return Ok(delivered);
        };
        if !delivered.is_empty() {
            return Ok(delivered);
        }
        // Empty under WAIT: park on the registry. WAIT 0 collapses to
        // a single re-probe of the registry's current state — useful
        // for tests but the timeout path returns immediately.
        //
        // Telemetry (slice D / PRD #718 / #729): we record exactly one
        // `wait_started` increment at entry, and exactly one terminal
        // outcome increment + histogram observation at exit, for the
        // (scope, queue) labels. The histogram measures wall-clock
        // started→resolved across all re-park iterations of this call.
        let registry = self.queue_wait_registry();
        let scope = queue_wait_scope();
        // NOTE(review): `Instant + Duration` panics on overflow; this relies on
        // `wait_ms` having been bounded by the caller — confirm.
        let deadline = std::time::Instant::now() + std::time::Duration::from_millis(wait_ms);
        let telemetry = self.queue_telemetry();
        telemetry.record_wait_started(&scope, queue);
        let wait_start = std::time::Instant::now();
        // Records the single terminal outcome; the `min` saturates the u128
        // millisecond count before the narrowing cast to u64.
        let observe = |outcome: crate::runtime::queue_telemetry::WaitOutcomeLabel| {
            let elapsed_ms = wait_start.elapsed().as_millis().min(u128::from(u64::MAX)) as u64;
            telemetry.record_wait_outcome(&scope, queue, outcome, elapsed_ms);
        };
        loop {
            // Snapshot BEFORE the re-probe so a notify that fires
            // between the probe and the park bumps the generation and
            // wait_until returns Woken without ever blocking.
            let snapshot = registry.snapshot(&scope, queue);
            let delivered = do_read(self)?;
            if !delivered.is_empty() {
                observe(crate::runtime::queue_telemetry::WaitOutcomeLabel::Woken);
                return Ok(delivered);
            }
            if registry.is_cancelled() {
                observe(crate::runtime::queue_telemetry::WaitOutcomeLabel::Cancelled);
                return Err(RedDBError::Query(QUEUE_READ_WAIT_CANCELLED.to_string()));
            }
            if std::time::Instant::now() >= deadline {
                observe(crate::runtime::queue_telemetry::WaitOutcomeLabel::Timeout);
                return Ok(Vec::new());
            }
            // Issue #722: a delayed message becomes deliverable when its
            // `available_at_ns` passes, but the registry only wakes on
            // producer commits — a quiet queue with a future due-time
            // would otherwise sit on the condvar until the user budget
            // expired. Cap the park horizon at the soonest future
            // `available_at_ns` so the next loop iteration probes the
            // queue at-or-just-after the message becomes due. A
            // `Timeout` from the capped park is not the final answer; we
            // loop and re-probe before deciding the user budget is up.
            let park_deadline = match earliest_future_available_at(&self.inner.db.store(), queue) {
                Some(at_ns) => {
                    let now_ns = now_ns();
                    if at_ns <= now_ns {
                        // Already due; re-probe immediately.
                        deadline.min(std::time::Instant::now())
                    } else {
                        // Translate the wall-clock nanosecond gap into a
                        // monotonic `Instant`, never past the user deadline.
                        let wait_ns = at_ns - now_ns;
                        let due_instant =
                            std::time::Instant::now() + std::time::Duration::from_nanos(wait_ns);
                        deadline.min(due_instant)
                    }
                }
                None => deadline,
            };
            match registry.wait_until(&snapshot, park_deadline) {
                crate::runtime::queue_wait_registry::WaitOutcome::Woken => continue,
                crate::runtime::queue_wait_registry::WaitOutcome::Timeout => {
                    // If this was the user-supplied deadline, give up;
                    // otherwise loop and re-probe (a delayed message may
                    // have just become due).
                    if std::time::Instant::now() >= deadline {
                        observe(crate::runtime::queue_telemetry::WaitOutcomeLabel::Timeout);
                        return Ok(Vec::new());
                    }
                    continue;
                }
                crate::runtime::queue_wait_registry::WaitOutcome::Cancelled => {
                    observe(crate::runtime::queue_telemetry::WaitOutcomeLabel::Cancelled);
                    return Err(RedDBError::Query(QUEUE_READ_WAIT_CANCELLED.to_string()));
                }
            }
        }
    }
364
365    /// Issue #917 — async live-wait edge used by the RedWire session.
366    ///
367    /// Unlike [`group_read_with_optional_wait`](Self::group_read_with_optional_wait)
368    /// (the synchronous HTTP/condvar caller), this parks on the
369    /// registry's async wake head and never holds a blocking OS thread:
370    /// the awaiting tokio worker is released back to the runtime for the
371    /// wait duration. On every wake it re-probes the *normal* queue
372    /// delivery path (`group_read`), so a delivered message is genuinely
373    /// claimed, not merely observed. Returns the delivered messages
374    /// rendered as JSON values (so the transport edge stays free of
375    /// runtime queue types); [`RedwireWaitOutcome::TimedOut`] means the
376    /// deadline elapsed without a delivery (issue #919 surfaces this as
377    /// a distinct timeout frame rather than an empty push), and a
378    /// cancellation surfaces as [`RedwireWaitOutcome::Cancelled`] — the
379    /// async analogue of the sync path's [`QUEUE_READ_WAIT_CANCELLED`].
380    /// A genuine runtime failure stays an `Err`.
381    pub(crate) async fn redwire_queue_wait_json(
382        &self,
383        queue: &str,
384        group: Option<&str>,
385        consumer: &str,
386        count: usize,
387        wait_ms: u64,
388        auth_identity: Option<(String, crate::auth::Role)>,
389        tenant: Option<String>,
390    ) -> RedDBResult<RedwireWaitOutcome> {
391        let group_owned: RedDBResult<String> =
392            with_redwire_wait_context(&auth_identity, &tenant, || {
393                let expr = crate::storage::query::ast::QueryExpr::QueueCommand(
394                    crate::storage::query::ast::QueueCommand::GroupRead {
395                        queue: queue.to_string(),
396                        group: group.map(str::to_string),
397                        consumer: consumer.to_string(),
398                        count,
399                        wait_ms: Some(wait_ms),
400                    },
401                );
402                self.check_query_privilege(&expr)
403                    .map_err(RedDBError::Query)?;
404                let store = self.inner.db.store();
405                ensure_queue_exists(store.as_ref(), queue)?;
406                let config = load_queue_config(store.as_ref(), queue);
407                resolve_read_group(store.as_ref(), queue, group, consumer, &config)
408            });
409        let group_owned = group_owned?;
410        let group_ref = group_owned.as_str();
411
412        let do_read =
413            |runtime: &RedDBRuntime| -> RedDBResult<Vec<crate::runtime::queue_lifecycle::DeliveredMessage>> {
414                with_redwire_wait_context(&auth_identity, &tenant, || {
415                    let (lifecycle, _ps, txn) = runtime_lifecycle(runtime, queue);
416                    lifecycle
417                        .group_read(&txn, queue, group_ref, consumer, count)
418                        .map_err(map_qse)
419                })
420            };
421
422        let render = |delivered: Vec<crate::runtime::queue_lifecycle::DeliveredMessage>| {
423            RedwireWaitOutcome::Delivered(
424                delivered.into_iter().map(delivered_message_json).collect(),
425            )
426        };
427
428        // Fast path: a message is already deliverable at open time.
429        let delivered = do_read(self)?;
430        if !delivered.is_empty() {
431            return Ok(render(delivered));
432        }
433
434        let registry = self.queue_wait_registry();
435        let scope = with_redwire_wait_context(&auth_identity, &tenant, queue_wait_scope);
436        let deadline = std::time::Instant::now() + std::time::Duration::from_millis(wait_ms);
437        let telemetry = self.queue_telemetry();
438        telemetry.record_wait_started(&scope, queue);
439        let wait_start = std::time::Instant::now();
440        tracing::debug!(
441            target: "reddb::redwire::queue_wait",
442            queue,
443            group = group_ref,
444            consumer,
445            count,
446            wait_ms,
447            scope = scope.as_str(),
448            "redwire queue wait parked"
449        );
450        let observe = |outcome: crate::runtime::queue_telemetry::WaitOutcomeLabel| {
451            let elapsed_ms = wait_start.elapsed().as_millis().min(u128::from(u64::MAX)) as u64;
452            telemetry.record_wait_outcome(&scope, queue, outcome, elapsed_ms);
453            tracing::debug!(
454                target: "reddb::redwire::queue_wait",
455                queue,
456                group = group_ref,
457                consumer,
458                count,
459                wait_ms,
460                scope = scope.as_str(),
461                outcome = outcome.as_str(),
462                duration_ms = elapsed_ms,
463                "redwire queue wait resolved"
464            );
465        };
466        loop {
467            // Register the async waiter (snapshot the generation) BEFORE
468            // the re-probe so a notify landing between probe and park is
469            // observed as a generation move rather than a lost wake.
470            let waiter = registry.async_waiter(&scope, queue);
471            let delivered = do_read(self)?;
472            if !delivered.is_empty() {
473                observe(crate::runtime::queue_telemetry::WaitOutcomeLabel::Woken);
474                return Ok(render(delivered));
475            }
476            if registry.is_cancelled() {
477                observe(crate::runtime::queue_telemetry::WaitOutcomeLabel::Cancelled);
478                return Ok(RedwireWaitOutcome::Cancelled);
479            }
480            if std::time::Instant::now() >= deadline {
481                observe(crate::runtime::queue_telemetry::WaitOutcomeLabel::Timeout);
482                return Ok(RedwireWaitOutcome::TimedOut);
483            }
484            let park_deadline = match earliest_future_available_at(&self.inner.db.store(), queue) {
485                Some(at_ns) => {
486                    let now_ns = now_ns();
487                    if at_ns <= now_ns {
488                        deadline.min(std::time::Instant::now())
489                    } else {
490                        let wait_ns = at_ns - now_ns;
491                        let due_instant =
492                            std::time::Instant::now() + std::time::Duration::from_nanos(wait_ns);
493                        deadline.min(due_instant)
494                    }
495                }
496                None => deadline,
497            };
498            // The async waiter (and its `Arc<Slot>` clone) is a local
499            // dropped on every return below, so an expired or cancelled
500            // wait releases its registry slot reference and frees the
501            // tokio worker the moment this future resolves (AC #4).
502            match registry.wait_until_async(&waiter, park_deadline).await {
503                crate::runtime::queue_wait_registry::WaitOutcome::Woken => continue,
504                crate::runtime::queue_wait_registry::WaitOutcome::Timeout => {
505                    if std::time::Instant::now() >= deadline {
506                        observe(crate::runtime::queue_telemetry::WaitOutcomeLabel::Timeout);
507                        return Ok(RedwireWaitOutcome::TimedOut);
508                    }
509                    continue;
510                }
511                crate::runtime::queue_wait_registry::WaitOutcome::Cancelled => {
512                    observe(crate::runtime::queue_telemetry::WaitOutcomeLabel::Cancelled);
513                    return Ok(RedwireWaitOutcome::Cancelled);
514                }
515            }
516        }
517    }
518
519    /// Reject a live queue-wait open whose requested budget exceeds the
520    /// server's maximum wait cap (issue #919), mirroring the SQL
521    /// `QUEUE READ … WAIT` cap (slice B of PRD #718). Returns the
522    /// operator-actionable message (naming the `red.config` key and the
523    /// active cap) when `wait_ms` is over the cap, or `Ok(())` to
524    /// proceed. The transport calls this *before* spawning the wait
525    /// task, so an over-cap request is refused with an explicit error
526    /// and never parks — not silently shortened (AC #3).
527    pub(crate) fn redwire_queue_wait_cap_check(&self, wait_ms: u64) -> Result<(), String> {
528        let cap = self.config_u64(QUEUE_MAX_WAIT_MS_CONFIG_KEY, QUEUE_MAX_WAIT_MS_DEFAULT);
529        if wait_ms > cap {
530            Err(format!(
531                "queue-wait WAIT {wait_ms}ms exceeds server cap {QUEUE_MAX_WAIT_MS_CONFIG_KEY} = {cap}ms"
532            ))
533        } else {
534            Ok(())
535        }
536    }
537
538    pub(crate) fn enqueue_event_payload(
539        &self,
540        queue: &str,
541        payload: Value,
542    ) -> RedDBResult<EntityId> {
543        let store = self.inner.db.store();
544        // Auto-create the queue if it does not exist yet.
545        if store.get_collection(queue).is_none() {
546            crate::runtime::impl_ddl::ensure_event_target_queue_pub(self, queue)?;
547        }
548
549        // Estimate payload bytes for outbox watermark checks.
550        let payload_bytes = estimate_payload_bytes(&payload);
551        let outbox_bytes = OUTBOX_APPROX_BYTES.fetch_add(payload_bytes, Ordering::Relaxed);
552
553        // Hard limit: route directly to DLQ without even trying.
554        if outbox_bytes > OUTBOX_MAX_BYTES {
555            OUTBOX_APPROX_BYTES.fetch_sub(payload_bytes, Ordering::Relaxed);
556            EVENTS_DRAIN_RETRIES_TOTAL.fetch_add(1, Ordering::Relaxed);
557            return self.route_event_to_outbox_dlq(queue, payload, "outbox_max_bytes_exceeded");
558        }
559
560        // Soft limit: warn once per crossing.
561        if outbox_bytes > OUTBOX_WARN_BYTES && outbox_bytes - payload_bytes <= OUTBOX_WARN_BYTES {
562            tracing::warn!(
563                outbox_bytes,
564                warn_threshold = OUTBOX_WARN_BYTES,
565                "event outbox approaching capacity warning threshold"
566            );
567            crate::telemetry::operator_event::OperatorEvent::OutboxDlqActivated {
568                queue: queue.to_string(),
569                dlq: format!("{queue}_outbox_dlq"),
570                reason: "outbox_warn_bytes_exceeded".to_string(),
571            }
572            .emit_global();
573        }
574
575        let config = load_queue_config(store.as_ref(), queue);
576
577        // If the target queue has a max_size and is full, route to DLQ.
578        if let Some(max_size) = config.max_size {
579            let current_len = load_queue_message_views(store.as_ref(), queue)
580                .unwrap_or_default()
581                .len();
582            if current_len >= max_size {
583                OUTBOX_APPROX_BYTES.fetch_sub(payload_bytes, Ordering::Relaxed);
584                EVENTS_DRAIN_RETRIES_TOTAL.fetch_add(1, Ordering::Relaxed);
585                return self.route_event_to_outbox_dlq(queue, payload, "queue_full");
586            }
587            // Warn at 80% capacity.
588            if current_len * 10 >= max_size * 8 {
589                tracing::warn!(
590                    queue = %queue,
591                    size = current_len,
592                    max = max_size,
593                    "event target queue near capacity"
594                );
595            }
596        }
597
598        let id = self.enqueue_event_payload_raw(store.as_ref(), queue, &config, payload)?;
599        EVENTS_ENQUEUED_TOTAL.fetch_add(1, Ordering::Relaxed);
600        Ok(id)
601    }
602
603    /// Route a failed event to `<queue>_outbox_dlq`, auto-creating it if needed.
604    fn route_event_to_outbox_dlq(
605        &self,
606        queue: &str,
607        payload: Value,
608        reason: &str,
609    ) -> RedDBResult<EntityId> {
610        let dlq_name = format!("{queue}_outbox_dlq");
611        EVENTS_DLQ_TOTAL.fetch_add(1, Ordering::Relaxed);
612
613        crate::telemetry::operator_event::OperatorEvent::OutboxDlqActivated {
614            queue: queue.to_string(),
615            dlq: dlq_name.clone(),
616            reason: reason.to_string(),
617        }
618        .emit_global();
619
620        let store = self.inner.db.store();
621        if store.get_collection(&dlq_name).is_none() {
622            crate::runtime::impl_ddl::ensure_event_target_queue_pub(self, &dlq_name)?;
623        }
624        let dlq_config = load_queue_config(store.as_ref(), &dlq_name);
625        let id = self.enqueue_event_payload_raw(store.as_ref(), &dlq_name, &dlq_config, payload)?;
626        EVENTS_ENQUEUED_TOTAL.fetch_add(1, Ordering::Relaxed);
627        Ok(id)
628    }
629
630    /// Low-level event message insert — no size checks, no DLQ routing.
631    fn enqueue_event_payload_raw(
632        &self,
633        store: &UnifiedStore,
634        queue: &str,
635        config: &QueueRuntimeConfig,
636        payload: Value,
637    ) -> RedDBResult<EntityId> {
638        let position = next_queue_position(store, queue, QueueSide::Right)?;
639        let mut entity = UnifiedEntity::new(
640            EntityId::new(0),
641            EntityKind::QueueMessage {
642                queue: queue.to_string(),
643                position,
644            },
645            EntityData::QueueMessage(QueueMessageData {
646                payload,
647                priority: None,
648                enqueued_at_ns: now_ns(),
649                attempts: 0,
650                max_attempts: config.max_attempts,
651                acked: false,
652            }),
653        );
654        if let Some(xid) = self.current_xid() {
655            entity.set_xmin(xid);
656        }
657        let id = store
658            .insert_auto(queue, entity)
659            .map_err(|err| RedDBError::Internal(err.to_string()))?;
660        if let Some(ttl_ms) = config.ttl_ms {
661            store
662                .set_metadata(queue, id, queue_message_ttl_metadata(ttl_ms))
663                .map_err(|err| RedDBError::Internal(err.to_string()))?;
664        }
665        self.invalidate_result_cache_for_table(queue);
666        Ok(id)
667    }
668
669    pub fn execute_create_queue(
670        &self,
671        raw_query: &str,
672        query: &CreateQueueQuery,
673    ) -> RedDBResult<RuntimeQueryResult> {
674        self.check_write(crate::runtime::write_gate::WriteKind::Ddl)?;
675        if query.dlq.as_deref() == Some(query.name.as_str()) {
676            return Err(RedDBError::Query(
677                "dead-letter queue must be different from the source queue".to_string(),
678            ));
679        }
680
681        let store = self.inner.db.store();
682        let exists = store.get_collection(&query.name).is_some();
683        if exists {
684            if query.if_not_exists {
685                return Ok(RuntimeQueryResult::ok_message(
686                    raw_query.to_string(),
687                    &format!("queue '{}' already exists", query.name),
688                    "create",
689                ));
690            }
691            return Err(RedDBError::Query(format!(
692                "queue '{}' already exists",
693                query.name
694            )));
695        }
696
697        store
698            .create_collection(&query.name)
699            .map_err(|err| RedDBError::Internal(err.to_string()))?;
700        if let Some(ttl_ms) = query.ttl_ms {
701            self.inner
702                .db
703                .set_collection_default_ttl_ms(&query.name, ttl_ms);
704        }
705        self.inner
706            .db
707            .save_collection_contract(queue_collection_contract(
708                &query.name,
709                query.priority,
710                query.ttl_ms,
711            ))
712            .map_err(|err| RedDBError::Internal(err.to_string()))?;
713        save_queue_config(
714            store.as_ref(),
715            &query.name,
716            &QueueRuntimeConfig {
717                mode: query.mode,
718                priority: query.priority,
719                max_size: query.max_size,
720                ttl_ms: query.ttl_ms,
721                dlq: query.dlq.clone(),
722                max_attempts: query.max_attempts,
723                lock_deadline_ms: query.lock_deadline_ms,
724                in_flight_cap_per_group: query.in_flight_cap_per_group,
725                retry_delay_ms: query.retry_delay_ms,
726            },
727        )?;
728
729        if let Some(dlq) = &query.dlq {
730            if store.get_collection(dlq).is_none() {
731                store
732                    .create_collection(dlq)
733                    .map_err(|err| RedDBError::Internal(err.to_string()))?;
734                self.inner
735                    .db
736                    .save_collection_contract(queue_collection_contract(dlq, false, None))
737                    .map_err(|err| RedDBError::Internal(err.to_string()))?;
738            }
739        }
740
741        self.invalidate_result_cache();
742        self.inner
743            .db
744            .persist_metadata()
745            .map_err(|err| RedDBError::Internal(err.to_string()))?;
746        // Issue #120 — feed the queue into the schema-vocabulary so
747        // AskPipeline (#121) can resolve queue references. Queues
748        // have an opaque payload column, so we expose `payload` and
749        // (when configured) the DLQ partner as type-tag context.
750        let mut type_tags = Vec::new();
751        if let Some(dlq) = &query.dlq {
752            type_tags.push(format!("dlq:{}", dlq));
753        }
754        self.schema_vocabulary_apply(
755            crate::runtime::schema_vocabulary::DdlEvent::CreateCollection {
756                collection: query.name.clone(),
757                columns: vec!["payload".to_string()],
758                type_tags,
759                description: None,
760            },
761        );
762
763        let mut msg = format!("queue '{}' created", query.name);
764        msg.push_str(&format!(" (mode={})", query.mode.as_str()));
765        if query.priority {
766            msg.push_str(" (priority)");
767        }
768        if let Some(max_size) = query.max_size {
769            msg.push_str(&format!(" (max_size={max_size})"));
770        }
771        if let Some(ttl_ms) = query.ttl_ms {
772            msg.push_str(&format!(" (ttl={ttl_ms}ms)"));
773        }
774        if let Some(dlq) = &query.dlq {
775            msg.push_str(&format!(
776                " (dlq={dlq}, max_attempts={})",
777                query.max_attempts
778            ));
779        }
780
781        Ok(RuntimeQueryResult::ok_message(
782            raw_query.to_string(),
783            &msg,
784            "create",
785        ))
786    }
787
788    pub fn execute_alter_queue(
789        &self,
790        raw_query: &str,
791        query: &AlterQueueQuery,
792    ) -> RedDBResult<RuntimeQueryResult> {
793        self.check_write(crate::runtime::write_gate::WriteKind::Ddl)?;
794        let store = self.inner.db.store();
795        ensure_queue_exists(store.as_ref(), &query.name)?;
796
797        let mut config = load_queue_config(store.as_ref(), &query.name);
798        let mut summary: Vec<String> = Vec::new();
799
800        if let Some(new_mode) = query.mode {
801            let pending =
802                load_pending_entries(store.as_ref(), &query.name, None, None).unwrap_or_default();
803            if !pending.is_empty() {
804                tracing::warn!(
805                    queue = %query.name,
806                    pending_count = pending.len(),
807                    new_mode = %new_mode.as_str(),
808                    "ALTER QUEUE SET MODE: {} in-flight messages will drain with old mode; \
809                     new reads use {}",
810                    pending.len(),
811                    new_mode.as_str(),
812                );
813            }
814            config.mode = new_mode;
815            summary.push(format!("mode={}", new_mode.as_str()));
816        }
817        if let Some(max_attempts) = query.max_attempts {
818            config.max_attempts = max_attempts;
819            summary.push(format!("max_attempts={max_attempts}"));
820        }
821        if let Some(lock_deadline_ms) = query.lock_deadline_ms {
822            config.lock_deadline_ms = lock_deadline_ms;
823            summary.push(format!("lock_deadline_ms={lock_deadline_ms}"));
824        }
825        if let Some(in_flight_cap) = query.in_flight_cap_per_group {
826            config.in_flight_cap_per_group = in_flight_cap;
827            summary.push(format!("in_flight_cap_per_group={in_flight_cap}"));
828        }
829        if let Some(dlq) = &query.dlq {
830            if dlq == &query.name {
831                return Err(RedDBError::Query(
832                    "dead-letter queue must be different from the source queue".to_string(),
833                ));
834            }
835            config.dlq = Some(dlq.clone());
836            summary.push(format!("dlq={dlq}"));
837        }
838        if let Some(retry_delay_ms) = query.retry_delay_ms {
839            config.retry_delay_ms = if retry_delay_ms == 0 {
840                None
841            } else {
842                Some(retry_delay_ms)
843            };
844            summary.push(format!("retry_delay_ms={retry_delay_ms}"));
845        }
846
847        save_queue_config(store.as_ref(), &query.name, &config)?;
848
849        self.invalidate_result_cache();
850        self.inner
851            .db
852            .persist_metadata()
853            .map_err(|err| RedDBError::Internal(err.to_string()))?;
854
855        Ok(RuntimeQueryResult::ok_message(
856            raw_query.to_string(),
857            &format!("queue '{}' altered: {}", query.name, summary.join(", ")),
858            "alter",
859        ))
860    }
861
862    pub fn execute_drop_queue(
863        &self,
864        raw_query: &str,
865        query: &DropQueueQuery,
866    ) -> RedDBResult<RuntimeQueryResult> {
867        self.check_write(crate::runtime::write_gate::WriteKind::Ddl)?;
868        let store = self.inner.db.store();
869        if super::impl_ddl::is_system_schema_name(&query.name) {
870            return Err(RedDBError::Query("system schema is read-only".to_string()));
871        }
872        if store.get_collection(&query.name).is_none() {
873            if query.if_exists {
874                return Ok(RuntimeQueryResult::ok_message(
875                    raw_query.to_string(),
876                    &format!("queue '{}' does not exist", query.name),
877                    "drop",
878                ));
879            }
880            return Err(RedDBError::NotFound(format!(
881                "queue '{}' not found",
882                query.name
883            )));
884        }
885        let actual = crate::runtime::ddl::polymorphic_resolver::resolve(
886            &query.name,
887            &self.inner.db.catalog_model_snapshot(),
888        )?;
889        crate::runtime::ddl::polymorphic_resolver::ensure_model_match(
890            crate::catalog::CollectionModel::Queue,
891            actual,
892        )?;
893
894        store
895            .drop_collection(&query.name)
896            .map_err(|err| RedDBError::Internal(err.to_string()))?;
897        self.inner.db.clear_collection_default_ttl_ms(&query.name);
898        self.inner
899            .db
900            .remove_collection_contract(&query.name)
901            .map_err(|err| RedDBError::Internal(err.to_string()))?;
902        remove_queue_metadata(store.as_ref(), &query.name);
903        self.invalidate_result_cache();
904        self.inner
905            .db
906            .persist_metadata()
907            .map_err(|err| RedDBError::Internal(err.to_string()))?;
908        // Issue #120 — invalidate the schema-vocabulary entry.
909        self.schema_vocabulary_apply(
910            crate::runtime::schema_vocabulary::DdlEvent::DropCollection {
911                collection: query.name.clone(),
912            },
913        );
914
915        Ok(RuntimeQueryResult::ok_message(
916            raw_query.to_string(),
917            &format!("queue '{}' dropped", query.name),
918            "drop",
919        ))
920    }
921
922    pub fn execute_queue_command(
923        &self,
924        raw_query: &str,
925        cmd: &QueueCommand,
926    ) -> RedDBResult<RuntimeQueryResult> {
927        self.check_write(crate::runtime::write_gate::WriteKind::Dml)?;
928        match cmd {
929            QueueCommand::Push {
930                queue,
931                value,
932                side,
933                priority,
934                available,
935            } => {
936                let store = self.inner.db.store();
937                ensure_queue_exists(store.as_ref(), queue)?;
938                let config = load_queue_config(store.as_ref(), queue);
939                if priority.is_some() && !config.priority {
940                    return Err(RedDBError::Query(format!(
941                        "queue '{}' is not a priority queue",
942                        queue
943                    )));
944                }
945                if let Some(max_size) = config.max_size {
946                    let current_len =
947                        load_queue_message_views_with_runtime(Some(self), store.as_ref(), queue)?
948                            .len();
949                    if current_len >= max_size {
950                        return Err(RedDBError::Query(format!(
951                            "queue '{}' is full (max_size={max_size})",
952                            queue
953                        )));
954                    }
955                }
956
957                let position = next_queue_position(store.as_ref(), queue, *side)?;
958                let mut entity = UnifiedEntity::new(
959                    EntityId::new(0),
960                    EntityKind::QueueMessage {
961                        queue: queue.clone(),
962                        position,
963                    },
964                    EntityData::QueueMessage(QueueMessageData {
965                        payload: value.clone(),
966                        priority: if config.priority { *priority } else { None },
967                        enqueued_at_ns: now_ns(),
968                        attempts: 0,
969                        max_attempts: config.max_attempts,
970                        acked: false,
971                    }),
972                );
973                // Phase 1.1 MVCC universal: stamp xmin so other
974                // connections don't see this message until COMMIT.
975                if let Some(xid) = self.current_xid() {
976                    entity.set_xmin(xid);
977                }
978                let id = store
979                    .insert_auto(queue, entity)
980                    .map_err(|err| RedDBError::Internal(err.to_string()))?;
981                // Resolve per-message availability (issue #722): DELAY is
982                // relative to the push instant, AVAILABLE AT carries an
983                // absolute unix-ms. Both collapse to a unix-ns timestamp
984                // delivery paths compare against. `None` means immediate.
985                let available_at_ns = available.map(|a| match a {
986                    crate::storage::query::ast::QueueAvailability::DelayMs(ms) => {
987                        now_ns().saturating_add(ms.saturating_mul(1_000_000))
988                    }
989                    crate::storage::query::ast::QueueAvailability::AtUnixMs(ms) => {
990                        ms.saturating_mul(1_000_000)
991                    }
992                });
993                if config.ttl_ms.is_some() || available_at_ns.is_some() {
994                    store
995                        .set_metadata(
996                            queue,
997                            id,
998                            queue_message_metadata(config.ttl_ms, available_at_ns),
999                        )
1000                        .map_err(|err| RedDBError::Internal(err.to_string()))?;
1001                }
1002                // Slice C of PRD #718 — wake `QUEUE READ … WAIT` waiters.
1003                // Under autocommit this fires immediately; inside a txn
1004                // the wake is buffered and replayed on COMMIT (rollback
1005                // discards it so rolled-back enqueues do not deliver).
1006                self.record_queue_wake(&queue_wait_scope(), queue);
1007                self.invalidate_result_cache();
1008
1009                let mut result = UnifiedResult::with_columns(vec![
1010                    "message_id".into(),
1011                    "side".into(),
1012                    "queue".into(),
1013                ]);
1014                let mut record = UnifiedRecord::new();
1015                record.set("message_id", Value::text(message_id_string(id)));
1016                record.set(
1017                    "side",
1018                    Value::text(match side {
1019                        QueueSide::Left => "left".to_string(),
1020                        QueueSide::Right => "right".to_string(),
1021                    }),
1022                );
1023                record.set("queue", Value::text(queue.clone()));
1024                result.push(record);
1025
1026                Ok(RuntimeQueryResult {
1027                    query: raw_query.to_string(),
1028                    mode: QueryMode::Sql,
1029                    statement: "queue_push",
1030                    engine: "runtime-queue",
1031                    result,
1032                    affected_rows: 1,
1033                    statement_type: "insert",
1034                    bookmark: None,
1035                })
1036            }
1037            QueueCommand::Pop { queue, side, count } => {
1038                let store = self.inner.db.store();
1039                ensure_queue_exists(store.as_ref(), queue)?;
1040                let (lifecycle, _ps, txn) = runtime_lifecycle(self, queue);
1041                let popped = lifecycle
1042                    .pop(queue, ast_side_to_lc(*side), *count, &txn)
1043                    .map_err(map_qse)?;
1044
1045                let mut result =
1046                    UnifiedResult::with_columns(vec!["message_id".into(), "payload".into()]);
1047                for (message_id, payload) in &popped {
1048                    let mut record = UnifiedRecord::new();
1049                    record.set(
1050                        "message_id",
1051                        Value::text(message_id_string(EntityId::new(*message_id))),
1052                    );
1053                    record.set("payload", payload.clone());
1054                    result.push(record);
1055                }
1056                let popped_count = popped.len() as u64;
1057                if popped_count > 0 {
1058                    self.invalidate_result_cache();
1059                }
1060
1061                Ok(RuntimeQueryResult {
1062                    query: raw_query.to_string(),
1063                    mode: QueryMode::Sql,
1064                    statement: "queue_pop",
1065                    engine: "runtime-queue",
1066                    result,
1067                    affected_rows: popped_count,
1068                    statement_type: "delete",
1069                    bookmark: None,
1070                })
1071            }
1072            QueueCommand::Peek { queue, count } => {
1073                let store = self.inner.db.store();
1074                ensure_queue_exists(store.as_ref(), queue)?;
1075                let (lifecycle, _ps, txn) = runtime_lifecycle(self, queue);
1076                let messages = lifecycle.peek(queue, *count, &txn);
1077
1078                let mut result =
1079                    UnifiedResult::with_columns(vec!["message_id".into(), "payload".into()]);
1080                for message in messages {
1081                    let mut record = UnifiedRecord::new();
1082                    record.set(
1083                        "message_id",
1084                        Value::text(message_id_string(EntityId::new(message.message_id))),
1085                    );
1086                    record.set("payload", message.payload);
1087                    result.push(record);
1088                }
1089
1090                Ok(RuntimeQueryResult {
1091                    query: raw_query.to_string(),
1092                    mode: QueryMode::Sql,
1093                    statement: "queue_peek",
1094                    engine: "runtime-queue",
1095                    result,
1096                    affected_rows: 0,
1097                    statement_type: "select",
1098                    bookmark: None,
1099                })
1100            }
1101            QueueCommand::Len { queue } => {
1102                let store = self.inner.db.store();
1103                ensure_queue_exists(store.as_ref(), queue)?;
1104                let count =
1105                    load_queue_message_views_with_runtime(Some(self), store.as_ref(), queue)?.len()
1106                        as u64;
1107                let mut result = UnifiedResult::with_columns(vec!["len".into()]);
1108                let mut record = UnifiedRecord::new();
1109                record.set("len", Value::UnsignedInteger(count));
1110                result.push(record);
1111
1112                Ok(RuntimeQueryResult {
1113                    query: raw_query.to_string(),
1114                    mode: QueryMode::Sql,
1115                    statement: "queue_len",
1116                    engine: "runtime-queue",
1117                    result,
1118                    affected_rows: 0,
1119                    statement_type: "select",
1120                    bookmark: None,
1121                })
1122            }
1123            QueueCommand::Purge { queue } => {
1124                let store = self.inner.db.store();
1125                ensure_queue_exists(store.as_ref(), queue)?;
1126                let (lifecycle, _ps, txn) = runtime_lifecycle(self, queue);
1127                let count = lifecycle.purge(queue, &txn).map_err(map_qse)?;
1128                if count > 0 {
1129                    self.invalidate_result_cache();
1130                }
1131
1132                Ok(RuntimeQueryResult::ok_message(
1133                    raw_query.to_string(),
1134                    &format!("{count} messages purged from queue '{queue}'"),
1135                    "delete",
1136                ))
1137            }
1138            QueueCommand::GroupCreate { queue, group } => {
1139                let store = self.inner.db.store();
1140                ensure_queue_exists(store.as_ref(), queue)?;
1141                if queue_group_exists(store.as_ref(), queue, group)? {
1142                    return Ok(RuntimeQueryResult::ok_message(
1143                        raw_query.to_string(),
1144                        &format!(
1145                            "consumer group '{}' already exists on queue '{}'",
1146                            group, queue
1147                        ),
1148                        "create",
1149                    ));
1150                }
1151                save_queue_group(store.as_ref(), queue, group)?;
1152                self.invalidate_result_cache();
1153
1154                Ok(RuntimeQueryResult::ok_message(
1155                    raw_query.to_string(),
1156                    &format!("consumer group '{}' created on queue '{}'", group, queue),
1157                    "create",
1158                ))
1159            }
1160            QueueCommand::GroupRead {
1161                queue,
1162                group,
1163                consumer,
1164                count,
1165                wait_ms,
1166            } => {
1167                let store = self.inner.db.store();
1168                ensure_queue_exists(store.as_ref(), queue)?;
1169                // Slice B of PRD #718: reject `WAIT` issued inside an
1170                // explicit transaction, and reject `WAIT > cap` before
1171                // any waiter is registered. Both checks fire before the
1172                // lifecycle is touched so a refused statement leaves
1173                // no side effects (no group auto-create, no parking).
1174                if let Some(ms) = *wait_ms {
1175                    if self.current_xid().is_some() {
1176                        return Err(RedDBError::Query(
1177                            "QUEUE READ … WAIT is autocommit-only: refusing to park inside an explicit transaction (BEGIN/COMMIT)"
1178                                .to_string(),
1179                        ));
1180                    }
1181                    let cap =
1182                        self.config_u64(QUEUE_MAX_WAIT_MS_CONFIG_KEY, QUEUE_MAX_WAIT_MS_DEFAULT);
1183                    if ms > cap {
1184                        return Err(RedDBError::Query(format!(
1185                            "QUEUE READ … WAIT {ms}ms exceeds server cap {QUEUE_MAX_WAIT_MS_CONFIG_KEY} = {cap}ms"
1186                        )));
1187                    }
1188                }
1189                // Resolve the consumer group up-front so the lifecycle
1190                // sees the same auto-created `_work_default` / fanout
1191                // group the legacy `read_messages` would have minted.
1192                let config = load_queue_config(store.as_ref(), queue);
1193                let group_owned =
1194                    resolve_read_group(store.as_ref(), queue, group.as_deref(), consumer, &config)?;
1195                let group_ref = group_owned.as_str();
1196                let delivered = self
1197                    .group_read_with_optional_wait(queue, group_ref, consumer, *count, *wait_ms)?;
1198
1199                // Issue #742 — record consumer presence on every read,
1200                // including empty returns. Heartbeat-driven aliveness
1201                // is the contract; pending deliveries don't define it.
1202                {
1203                    let lease_count = u32::try_from(delivered.len()).unwrap_or(u32::MAX);
1204                    let now_ns = std::time::SystemTime::now()
1205                        .duration_since(std::time::UNIX_EPOCH)
1206                        .map(|d| d.as_nanos() as u64)
1207                        .unwrap_or(0);
1208                    self.queue_presence().heartbeat(
1209                        queue,
1210                        group_ref,
1211                        consumer,
1212                        lease_count,
1213                        now_ns,
1214                    );
1215                }
1216
1217                let mut result = UnifiedResult::with_columns(vec![
1218                    "message_id".into(),
1219                    "payload".into(),
1220                    "consumer".into(),
1221                    "delivery_count".into(),
1222                    "attempts".into(),
1223                ]);
1224
1225                for message in delivered {
1226                    let mut record = UnifiedRecord::new();
1227                    record.set(
1228                        "message_id",
1229                        Value::text(message_id_string(EntityId::new(message.message_id))),
1230                    );
1231                    record.set("payload", message.payload);
1232                    record.set("consumer", Value::text(message.consumer));
1233                    record.set(
1234                        "delivery_count",
1235                        Value::UnsignedInteger(u64::from(message.delivery_count)),
1236                    );
1237                    record.set(
1238                        "attempts",
1239                        Value::UnsignedInteger(u64::from(message.delivery_count)),
1240                    );
1241                    result.push(record);
1242                }
1243                if !result.records.is_empty() {
1244                    self.invalidate_result_cache();
1245                }
1246
1247                Ok(RuntimeQueryResult {
1248                    query: raw_query.to_string(),
1249                    mode: QueryMode::Sql,
1250                    statement: "queue_group_read",
1251                    engine: "runtime-queue",
1252                    result,
1253                    affected_rows: 0,
1254                    statement_type: "select",
1255                    bookmark: None,
1256                })
1257            }
1258            QueueCommand::Pending { queue, group } => {
1259                let store = self.inner.db.store();
1260                ensure_queue_exists(store.as_ref(), queue)?;
1261                require_queue_group(store.as_ref(), queue, group)?;
1262                let mut pending = load_pending_entries(store.as_ref(), queue, Some(group), None)?;
1263                pending.sort_by_key(|entry| entry.delivered_at_ns);
1264                let current_time_ns = now_ns();
1265
1266                let mut result = UnifiedResult::with_columns(vec![
1267                    "message_id".into(),
1268                    "consumer".into(),
1269                    "delivered_at_ns".into(),
1270                    "delivery_count".into(),
1271                    "idle_ms".into(),
1272                ]);
1273                for entry in pending {
1274                    let mut record = UnifiedRecord::new();
1275                    record.set(
1276                        "message_id",
1277                        Value::text(message_id_string(entry.message_id)),
1278                    );
1279                    record.set("consumer", Value::text(entry.consumer));
1280                    record.set(
1281                        "delivered_at_ns",
1282                        Value::UnsignedInteger(entry.delivered_at_ns),
1283                    );
1284                    record.set(
1285                        "delivery_count",
1286                        Value::UnsignedInteger(u64::from(entry.delivery_count)),
1287                    );
1288                    record.set(
1289                        "idle_ms",
1290                        Value::UnsignedInteger(
1291                            current_time_ns.saturating_sub(entry.delivered_at_ns) / 1_000_000,
1292                        ),
1293                    );
1294                    result.push(record);
1295                }
1296
1297                Ok(RuntimeQueryResult {
1298                    query: raw_query.to_string(),
1299                    mode: QueryMode::Sql,
1300                    statement: "queue_pending",
1301                    engine: "runtime-queue",
1302                    result,
1303                    affected_rows: 0,
1304                    statement_type: "select",
1305                    bookmark: None,
1306                })
1307            }
1308            QueueCommand::Claim {
1309                queue,
1310                group,
1311                consumer,
1312                min_idle_ms,
1313            } => {
1314                let store = self.inner.db.store();
1315                ensure_queue_exists(store.as_ref(), queue)?;
1316                require_queue_group(store.as_ref(), queue, group)?;
1317                let (lifecycle, _ps, txn) = runtime_lifecycle(self, queue);
1318                let delivered = lifecycle
1319                    .claim_delivering(queue, consumer, *min_idle_ms, &txn)
1320                    .map_err(map_qse)?;
1321
1322                let mut result = UnifiedResult::with_columns(vec![
1323                    "message_id".into(),
1324                    "payload".into(),
1325                    "consumer".into(),
1326                    "delivery_count".into(),
1327                ]);
1328
1329                for message in delivered {
1330                    let mut record = UnifiedRecord::new();
1331                    record.set(
1332                        "message_id",
1333                        Value::text(message_id_string(EntityId::new(message.message_id))),
1334                    );
1335                    record.set("payload", message.payload);
1336                    record.set("consumer", Value::text(message.consumer));
1337                    record.set(
1338                        "delivery_count",
1339                        Value::UnsignedInteger(u64::from(message.delivery_count)),
1340                    );
1341                    result.push(record);
1342                }
1343                if !result.records.is_empty() {
1344                    self.invalidate_result_cache();
1345                }
1346                let affected_rows = result.records.len() as u64;
1347
1348                Ok(RuntimeQueryResult {
1349                    query: raw_query.to_string(),
1350                    mode: QueryMode::Sql,
1351                    statement: "queue_claim",
1352                    engine: "runtime-queue",
1353                    result,
1354                    affected_rows,
1355                    statement_type: "update",
1356                    bookmark: None,
1357                })
1358            }
1359            QueueCommand::Ack {
1360                queue,
1361                group,
1362                message_id,
1363                delivery_id,
1364            } => {
1365                let store = self.inner.db.store();
1366                ensure_queue_exists(store.as_ref(), queue)?;
1367                let (group_owned, message_entity) = resolve_ack_nack_handle(
1368                    store.as_ref(),
1369                    queue,
1370                    group,
1371                    message_id,
1372                    delivery_id.as_deref(),
1373                )?;
1374                let group_ref = group_owned.as_str();
1375                require_queue_group(store.as_ref(), queue, group_ref)?;
1376                let (lifecycle, ps, txn) = runtime_lifecycle(self, queue);
1377                let did = match delivery_id.as_deref() {
1378                    Some(d) => d.to_string(),
1379                    None => ps
1380                        .find_pending_by_key(queue, message_entity.raw(), group_ref)
1381                        .ok_or_else(|| {
1382                            RedDBError::NotFound(format!(
1383                                "no pending delivery for message '{}' on queue '{}' (group '{}')",
1384                                message_entity.raw(),
1385                                queue,
1386                                group_ref
1387                            ))
1388                        })?,
1389                };
1390                lifecycle.ack(&txn, &did).map_err(map_qse)?;
1391                self.invalidate_result_cache();
1392
1393                Ok(RuntimeQueryResult::ok_message(
1394                    raw_query.to_string(),
1395                    "message acknowledged",
1396                    "update",
1397                ))
1398            }
1399            QueueCommand::Nack {
1400                queue,
1401                group,
1402                message_id,
1403                delivery_id,
1404                delay_ms,
1405            } => {
1406                let store = self.inner.db.store();
1407                ensure_queue_exists(store.as_ref(), queue)?;
1408                let config = load_queue_config(store.as_ref(), queue);
1409                // Issue #723: a per-failure DELAY override is a write
1410                // operation that re-shapes retry behavior; readers must
1411                // not be able to silently re-schedule another worker's
1412                // work. Embedded callers (no auth identity attached)
1413                // are trusted and bypass the check.
1414                if delay_ms.is_some() {
1415                    if let Some((_, role)) = current_auth_identity() {
1416                        if !role.can_write() {
1417                            return Err(RedDBError::InvalidOperation(format!(
1418                                "role '{role}' is not authorized to override NACK retry delay on queue '{queue}'"
1419                            )));
1420                        }
1421                    }
1422                }
1423                let (group_owned, message_entity) = resolve_ack_nack_handle(
1424                    store.as_ref(),
1425                    queue,
1426                    group,
1427                    message_id,
1428                    delivery_id.as_deref(),
1429                )?;
1430                let group_ref = group_owned.as_str();
1431                require_queue_group(store.as_ref(), queue, group_ref)?;
1432                let (lifecycle, ps, txn) = runtime_lifecycle(self, queue);
1433                let did = match delivery_id.as_deref() {
1434                    Some(d) => d.to_string(),
1435                    None => ps
1436                        .find_pending_by_key(queue, message_entity.raw(), group_ref)
1437                        .ok_or_else(|| {
1438                            RedDBError::NotFound(format!(
1439                                "no pending delivery for message '{}' on queue '{}' (group '{}')",
1440                                message_entity.raw(),
1441                                queue,
1442                                group_ref
1443                            ))
1444                        })?,
1445                };
1446                // Resolve the effective retry delay: per-failure
1447                // override wins, then queue default, then zero
1448                // (immediate requeue — pre-#723 behavior).
1449                let effective_delay_ms = delay_ms.or(config.retry_delay_ms).unwrap_or(0);
1450                let pending_attempt = ps.read_pending_attempt(&did).map_err(map_qse)?;
1451                let nack_attempts = pending_attempt.attempts.saturating_add(1);
1452                let outcome = lifecycle.nack(&txn, &did).map_err(map_qse)?;
1453                // Apply delay only when the message was actually
1454                // requeued — DLQ promotion / drop terminate the
1455                // retry cycle and a delay would be meaningless.
1456                if matches!(outcome, RetirementOutcome::Requeued) && effective_delay_ms > 0 {
1457                    let at_ns =
1458                        now_ns().saturating_add(effective_delay_ms.saturating_mul(1_000_000));
1459                    set_message_available_at_ns(
1460                        store.as_ref(),
1461                        queue,
1462                        message_entity,
1463                        Some(at_ns),
1464                        config.ttl_ms,
1465                    )?;
1466                }
1467                // Issue #723: routine retries do not flood audit
1468                // channels (telemetry already covers them via
1469                // `queue_nacked_total{outcome=...}`). Significant
1470                // overrides — large delays, destination changes,
1471                // drops — are audited so operators see the events
1472                // that re-shape operational risk.
1473                self.maybe_emit_nack_audit(
1474                    queue,
1475                    group_ref,
1476                    &did,
1477                    *delay_ms,
1478                    config.retry_delay_ms,
1479                    &outcome,
1480                );
1481                let outcome_label = match &outcome {
1482                    RetirementOutcome::Requeued => NackOutcomeLabel::Retry,
1483                    RetirementOutcome::MovedToDlq(_) => NackOutcomeLabel::Dlq,
1484                    RetirementOutcome::Dropped => NackOutcomeLabel::Drop,
1485                };
1486                self.queue_telemetry().record_nacked(
1487                    queue,
1488                    group_ref,
1489                    config.mode.as_str(),
1490                    outcome_label,
1491                );
1492                if let RetirementOutcome::MovedToDlq(dlq) = &outcome {
1493                    OperatorEvent::QueueDlqPromoted {
1494                        queue: queue.to_string(),
1495                        group: group_ref.to_string(),
1496                        dlq: dlq.clone(),
1497                        message_id: pending_attempt.message_id,
1498                        attempts: nack_attempts,
1499                        reason: format!("lifecycle_nack:{did}"),
1500                    }
1501                    .emit(self.audit_log());
1502                }
1503                let message = match outcome {
1504                    RetirementOutcome::Requeued => {
1505                        if effective_delay_ms > 0 {
1506                            format!("message requeued (delay={effective_delay_ms}ms)")
1507                        } else {
1508                            "message requeued".to_string()
1509                        }
1510                    }
1511                    RetirementOutcome::MovedToDlq(dlq) => {
1512                        format!("message moved to dead-letter queue '{}'", dlq)
1513                    }
1514                    RetirementOutcome::Dropped => "message dropped after max attempts".to_string(),
1515                };
1516                self.invalidate_result_cache();
1517
1518                Ok(RuntimeQueryResult::ok_message(
1519                    raw_query.to_string(),
1520                    &message,
1521                    "update",
1522                ))
1523            }
1524            QueueCommand::Move {
1525                source,
1526                destination,
1527                filter,
1528                limit,
1529            } => self.execute_queue_move(raw_query, source, destination, filter.as_ref(), *limit),
1530        }
1531    }
1532
1533    pub fn execute_queue_select(
1534        &self,
1535        raw_query: &str,
1536        query: &QueueSelectQuery,
1537    ) -> RedDBResult<RuntimeQueryResult> {
1538        let store = self.inner.db.store();
1539        ensure_queue_exists(store.as_ref(), &query.queue)?;
1540        let config = load_queue_config(store.as_ref(), &query.queue);
1541        let dlq = queue_is_dead_letter_target(store.as_ref(), &query.queue);
1542        let columns = if query.columns.is_empty() {
1543            queue_projection_default_columns()
1544        } else {
1545            query.columns.clone()
1546        };
1547
1548        let mut messages =
1549            load_queue_message_views_with_runtime(Some(self), store.as_ref(), &query.queue)?;
1550        sort_queue_messages(&mut messages, &config, QueueSide::Left);
1551
1552        let mut result = UnifiedResult::with_columns(columns.clone());
1553        for message in messages {
1554            if query
1555                .filter
1556                .as_ref()
1557                .is_some_and(|filter| !queue_message_matches_filter(&message, dlq, filter))
1558            {
1559                continue;
1560            }
1561            let record = queue_projection_record(&columns, &message, dlq)?;
1562            result.push(record);
1563            if query
1564                .limit
1565                .is_some_and(|limit| result.records.len() >= limit as usize)
1566            {
1567                break;
1568            }
1569        }
1570
1571        Ok(RuntimeQueryResult {
1572            query: raw_query.to_string(),
1573            mode: QueryMode::Sql,
1574            statement: "queue_select",
1575            engine: "runtime-queue",
1576            result,
1577            affected_rows: 0,
1578            statement_type: "select",
1579            bookmark: None,
1580        })
1581    }
1582
    /// Move up to `limit` messages from queue `source` to queue `destination`.
    ///
    /// Source messages are read in the source queue's configured order and
    /// optionally narrowed by `filter`. The whole selection is rejected up
    /// front in three cases:
    /// - it would push `destination` past its `max_size`;
    /// - a selected message's lock is currently held;
    /// - a selected message is no longer present in `source`.
    ///
    /// Copies are then inserted into `destination`. If one insert fails, the
    /// copies inserted so far are deleted on a best-effort basis. After all
    /// inserts succeed, the originals are deleted from `source` through the
    /// queue lifecycle. A `queue/move` audit event is recorded and a one-row
    /// result (`source`, `destination`, `selected`, `committed`) is returned.
    ///
    /// # Errors
    /// - `Query` when `source == destination`, when the destination is full,
    ///   or when a selected message is locked or gone.
    /// - `NotFound` when either queue does not exist.
    /// - Any error from inserting a copy or deleting an original.
    fn execute_queue_move(
        &self,
        raw_query: &str,
        source: &str,
        destination: &str,
        filter: Option<&Filter>,
        limit: usize,
    ) -> RedDBResult<RuntimeQueryResult> {
        if source == destination {
            return Err(RedDBError::Query(
                "QUEUE MOVE source and destination must be different".to_string(),
            ));
        }
        let store = self.inner.db.store();
        ensure_queue_exists(store.as_ref(), source)?;
        ensure_queue_exists(store.as_ref(), destination)?;
        let source_config = load_queue_config(store.as_ref(), source);
        let destination_config = load_queue_config(store.as_ref(), destination);
        let source_dlq = queue_is_dead_letter_target(store.as_ref(), source);

        // Candidate set: source messages in queue order, filtered, capped at `limit`.
        let mut messages =
            load_queue_message_views_with_runtime(Some(self), store.as_ref(), source)?;
        sort_queue_messages(&mut messages, &source_config, QueueSide::Left);
        let selected = messages
            .into_iter()
            .filter(|message| {
                filter
                    .map(|f| queue_message_matches_filter(message, source_dlq, f))
                    .unwrap_or(true)
            })
            .take(limit)
            .collect::<Vec<_>>();

        // Capacity check: refuse the whole move rather than partially fill.
        // NOTE(review): the destination count uses the runtime-aware scan
        // (`Some(self)`), which presumably applies RLS for the caller. It may
        // therefore under-count messages the caller cannot see. Confirm this
        // is intended for a capacity check.
        if let Some(max_size) = destination_config.max_size {
            let current_len =
                load_queue_message_views_with_runtime(Some(self), store.as_ref(), destination)?
                    .len();
            if current_len + selected.len() > max_size {
                return Err(RedDBError::Query(format!(
                    "queue '{}' is full (max_size={max_size})",
                    destination
                )));
            }
        }

        // Pre-flight: every selected message must be unlocked and still present.
        // NOTE(review): `_guard` is dropped at the end of each iteration, so the
        // per-message locks are only probed here. They are not held across the
        // insert/delete phases below, and a concurrent consumer could take a
        // message after this check. Holding the guards would first require
        // confirming that `delete_with_state` does not acquire the same lock.
        for message in &selected {
            let lock = queue_message_lock_handle(self, source, message.id);
            let Some(_guard) = lock.try_lock() else {
                return Err(RedDBError::Query(format!(
                    "message '{}' is locked on queue '{}'",
                    message.id.raw(),
                    source
                )));
            };
            if queue_message_view_by_id(store.as_ref(), source, message.id)?.is_none() {
                return Err(RedDBError::Query(format!(
                    "message '{}' is no longer available on queue '{}'",
                    message.id.raw(),
                    source
                )));
            }
        }

        // Phase 1: copy into the destination. On failure, roll back the copies
        // made so far. Rollback deletes are best-effort (errors ignored), so the
        // caller sees the original insert error.
        let mut inserted = Vec::new();
        for message in &selected {
            match insert_moved_queue_message(
                store.as_ref(),
                destination,
                &destination_config,
                message,
            ) {
                Ok(id) => inserted.push(id),
                Err(err) => {
                    for id in inserted {
                        let _ = store.delete(destination, id);
                    }
                    return Err(err);
                }
            }
        }

        // Phase 2: remove the originals from the source via the lifecycle.
        // NOTE(review): if a delete fails here, the destination copies are not
        // rolled back (the message may then exist in both queues). The result
        // cache is also not invalidated on that early return.
        let (move_lifecycle, _move_ps, move_txn) = runtime_lifecycle(self, source);
        for message in &selected {
            move_lifecycle
                .delete_with_state(source, message.id.raw(), &move_txn)
                .map_err(map_qse)?;
        }
        if !selected.is_empty() {
            self.invalidate_result_cache();
        }

        // Reaching this point means every selected message was inserted and
        // deleted, so `committed` always equals `selected` here.
        let selected_count = selected.len() as u64;
        self.audit_log().record_event(
            AuditEvent::builder("queue/move")
                .source(AuditAuthSource::System)
                .outcome(Outcome::Success)
                .resource(format!("queue:{source}->{destination}"))
                .fields([
                    AuditFieldEscaper::field("source", source),
                    AuditFieldEscaper::field("destination", destination),
                    AuditFieldEscaper::field("selected", selected_count),
                    AuditFieldEscaper::field("committed", selected_count),
                ])
                .build(),
        );

        let mut result = UnifiedResult::with_columns(vec![
            "source".into(),
            "destination".into(),
            "selected".into(),
            "committed".into(),
        ]);
        let mut record = UnifiedRecord::new();
        record.set("source", Value::text(source.to_string()));
        record.set("destination", Value::text(destination.to_string()));
        record.set("selected", Value::UnsignedInteger(selected_count));
        record.set("committed", Value::UnsignedInteger(selected_count));
        result.push(record);

        Ok(RuntimeQueryResult {
            query: raw_query.to_string(),
            mode: QueryMode::Sql,
            statement: "queue_move",
            engine: "runtime-queue",
            result,
            affected_rows: selected_count,
            statement_type: "update",
            bookmark: None,
        })
    }
1713
1714    /// Issue #723: routine retries are observable through metrics
1715    /// (`queue_nacked_total{outcome=...}`) so this is the audit
1716    /// shoulder for the *non-routine* cases: explicit NACK delay
1717    /// overrides whose magnitude or destination-changing impact would
1718    /// be invisible in metrics alone. Specifically:
1719    ///
1720    /// - Explicit override ≥ 60s (a worker decided to defer well past
1721    ///   the queue's default cadence — operators care).
1722    /// - Override that lands on a DLQ promotion or drop (destination
1723    ///   changed; the override may have influenced retire-vs-requeue
1724    ///   accounting on the caller's side and the audit trail needs to
1725    ///   show who asked).
1726    ///
1727    /// Calls with no override are intentionally silent here.
1728    fn maybe_emit_nack_audit(
1729        &self,
1730        queue: &str,
1731        group: &str,
1732        delivery_id: &str,
1733        override_ms: Option<u64>,
1734        default_ms: Option<u64>,
1735        outcome: &RetirementOutcome,
1736    ) {
1737        let Some(override_ms) = override_ms else {
1738            return;
1739        };
1740        let outcome_label = match outcome {
1741            RetirementOutcome::Requeued => "requeued",
1742            RetirementOutcome::MovedToDlq(_) => "dlq",
1743            RetirementOutcome::Dropped => "dropped",
1744        };
1745        const SIGNIFICANT_DELAY_MS: u64 = 60_000;
1746        let destination_changed = !matches!(outcome, RetirementOutcome::Requeued);
1747        if override_ms < SIGNIFICANT_DELAY_MS && !destination_changed {
1748            return;
1749        }
1750        self.audit_log().record_event(
1751            AuditEvent::builder("queue/nack/override")
1752                .source(AuditAuthSource::System)
1753                .outcome(Outcome::Success)
1754                .resource(format!("queue:{queue}"))
1755                .fields([
1756                    AuditFieldEscaper::field("queue", queue),
1757                    AuditFieldEscaper::field("group", group),
1758                    AuditFieldEscaper::field("delivery_id", delivery_id),
1759                    AuditFieldEscaper::field("override_delay_ms", override_ms),
1760                    AuditFieldEscaper::field("default_delay_ms", default_ms.unwrap_or(0)),
1761                    AuditFieldEscaper::field("outcome", outcome_label),
1762                ])
1763                .build(),
1764        );
1765    }
1766}
1767
1768fn ensure_queue_exists(store: &UnifiedStore, queue: &str) -> RedDBResult<()> {
1769    if store.get_collection(queue).is_some() {
1770        Ok(())
1771    } else {
1772        Err(RedDBError::NotFound(format!("queue '{}' not found", queue)))
1773    }
1774}
1775
1776pub(super) fn load_queue_config(store: &UnifiedStore, queue: &str) -> QueueRuntimeConfig {
1777    let default = QueueRuntimeConfig {
1778        mode: QueueMode::Work,
1779        priority: false,
1780        max_size: None,
1781        ttl_ms: None,
1782        dlq: None,
1783        max_attempts: crate::storage::query::DEFAULT_QUEUE_MAX_ATTEMPTS,
1784        lock_deadline_ms: crate::storage::query::DEFAULT_QUEUE_LOCK_DEADLINE_MS,
1785        in_flight_cap_per_group: crate::storage::query::DEFAULT_QUEUE_IN_FLIGHT_CAP_PER_GROUP,
1786        retry_delay_ms: None,
1787    };
1788
1789    let Some(manager) = store.get_collection(QUEUE_META_COLLECTION) else {
1790        return default;
1791    };
1792    manager
1793        .query_all(|entity| {
1794            entity.data.as_row().is_some_and(|row| {
1795                row_text(row, "kind").as_deref() == Some("queue_config")
1796                    && row_text(row, "queue").as_deref() == Some(queue)
1797            })
1798        })
1799        .into_iter()
1800        .find_map(|entity| {
1801            let row = entity.data.as_row()?;
1802            Some(QueueRuntimeConfig {
1803                mode: row_text(row, "mode")
1804                    .as_deref()
1805                    .and_then(QueueMode::parse)
1806                    .unwrap_or_default(),
1807                priority: row_bool(row, "priority").unwrap_or(false),
1808                max_size: row_u64(row, "max_size").map(|value| value as usize),
1809                ttl_ms: row_u64(row, "ttl_ms"),
1810                dlq: row_text(row, "dlq"),
1811                max_attempts: row_u64(row, "max_attempts")
1812                    .map(|value| value as u32)
1813                    .unwrap_or(crate::storage::query::DEFAULT_QUEUE_MAX_ATTEMPTS),
1814                lock_deadline_ms: row_u64(row, "lock_deadline_ms")
1815                    .unwrap_or(crate::storage::query::DEFAULT_QUEUE_LOCK_DEADLINE_MS),
1816                in_flight_cap_per_group: row_u64(row, "in_flight_cap_per_group")
1817                    .map(|value| value as u32)
1818                    .unwrap_or(crate::storage::query::DEFAULT_QUEUE_IN_FLIGHT_CAP_PER_GROUP),
1819                retry_delay_ms: row_u64(row, "retry_delay_ms").filter(|v| *v > 0),
1820            })
1821        })
1822        .unwrap_or(default)
1823}
1824
1825pub(super) fn queue_mode_str(store: &UnifiedStore, queue: &str) -> &'static str {
1826    load_queue_config(store, queue).mode.as_str()
1827}
1828
1829fn save_queue_config(
1830    store: &UnifiedStore,
1831    queue: &str,
1832    config: &QueueRuntimeConfig,
1833) -> RedDBResult<()> {
1834    remove_meta_rows(store, |row| {
1835        row_text(row, "kind").as_deref() == Some("queue_config")
1836            && row_text(row, "queue").as_deref() == Some(queue)
1837    });
1838
1839    let mut fields = HashMap::new();
1840    fields.insert("kind".to_string(), Value::text("queue_config".to_string()));
1841    fields.insert("queue".to_string(), Value::text(queue.to_string()));
1842    fields.insert(
1843        "mode".to_string(),
1844        Value::text(config.mode.as_str().to_string()),
1845    );
1846    fields.insert("priority".to_string(), Value::Boolean(config.priority));
1847    fields.insert(
1848        "max_size".to_string(),
1849        config
1850            .max_size
1851            .map(|value| Value::UnsignedInteger(value as u64))
1852            .unwrap_or(Value::Null),
1853    );
1854    fields.insert(
1855        "ttl_ms".to_string(),
1856        config
1857            .ttl_ms
1858            .map(Value::UnsignedInteger)
1859            .unwrap_or(Value::Null),
1860    );
1861    fields.insert(
1862        "dlq".to_string(),
1863        config.dlq.clone().map(Value::text).unwrap_or(Value::Null),
1864    );
1865    fields.insert(
1866        "max_attempts".to_string(),
1867        Value::UnsignedInteger(u64::from(config.max_attempts)),
1868    );
1869    fields.insert(
1870        "lock_deadline_ms".to_string(),
1871        Value::UnsignedInteger(config.lock_deadline_ms),
1872    );
1873    fields.insert(
1874        "in_flight_cap_per_group".to_string(),
1875        Value::UnsignedInteger(u64::from(config.in_flight_cap_per_group)),
1876    );
1877    fields.insert(
1878        "retry_delay_ms".to_string(),
1879        config
1880            .retry_delay_ms
1881            .map(Value::UnsignedInteger)
1882            .unwrap_or(Value::Null),
1883    );
1884    insert_meta_row(store, fields)
1885}
1886
1887fn remove_queue_metadata(store: &UnifiedStore, queue: &str) {
1888    remove_meta_rows(store, |row| {
1889        row_text(row, "queue").as_deref() == Some(queue)
1890    });
1891}
1892
1893fn queue_group_exists(store: &UnifiedStore, queue: &str, group: &str) -> RedDBResult<bool> {
1894    Ok(load_queue_groups(store, queue)?
1895        .into_iter()
1896        .any(|entry| entry.group == group))
1897}
1898
1899pub(super) fn require_queue_group(
1900    store: &UnifiedStore,
1901    queue: &str,
1902    group: &str,
1903) -> RedDBResult<()> {
1904    if queue_group_exists(store, queue, group)? {
1905        Ok(())
1906    } else {
1907        Err(RedDBError::NotFound(format!(
1908            "consumer group '{}' not found on queue '{}'",
1909            group, queue
1910        )))
1911    }
1912}
1913
1914pub(super) fn resolve_read_group(
1915    store: &UnifiedStore,
1916    queue: &str,
1917    group: Option<&str>,
1918    consumer: &str,
1919    config: &QueueRuntimeConfig,
1920) -> RedDBResult<String> {
1921    if let Some(group) = group {
1922        require_queue_group(store, queue, group)?;
1923        return Ok(group.to_string());
1924    }
1925
1926    match config.mode {
1927        QueueMode::Work => {
1928            if !queue_group_exists(store, queue, WORK_DEFAULT_GROUP)? {
1929                save_queue_group(store, queue, WORK_DEFAULT_GROUP)?;
1930            }
1931            Ok(WORK_DEFAULT_GROUP.to_string())
1932        }
1933        QueueMode::Fanout => {
1934            let fanout_group = format!("{FANOUT_GROUP_PREFIX}{consumer}");
1935            if !queue_group_exists(store, queue, &fanout_group)? {
1936                save_queue_group(store, queue, &fanout_group)?;
1937            }
1938            Ok(fanout_group)
1939        }
1940    }
1941}
1942
1943fn load_queue_groups(store: &UnifiedStore, queue: &str) -> RedDBResult<Vec<QueueGroupEntry>> {
1944    let Some(manager) = store.get_collection(QUEUE_META_COLLECTION) else {
1945        return Ok(Vec::new());
1946    };
1947    Ok(manager
1948        .query_all(|entity| {
1949            entity.data.as_row().is_some_and(|row| {
1950                row_text(row, "kind").as_deref() == Some("queue_group")
1951                    && row_text(row, "queue").as_deref() == Some(queue)
1952            })
1953        })
1954        .into_iter()
1955        .filter_map(|entity| {
1956            let row = entity.data.as_row()?;
1957            Some(QueueGroupEntry {
1958                entity_id: entity.id,
1959                group: row_text(row, "group")?,
1960            })
1961        })
1962        .collect())
1963}
1964
1965fn save_queue_group(store: &UnifiedStore, queue: &str, group: &str) -> RedDBResult<()> {
1966    let mut fields = HashMap::new();
1967    fields.insert("kind".to_string(), Value::text("queue_group".to_string()));
1968    fields.insert("queue".to_string(), Value::text(queue.to_string()));
1969    fields.insert("group".to_string(), Value::text(group.to_string()));
1970    fields.insert(
1971        "created_at_ns".to_string(),
1972        Value::UnsignedInteger(now_ns()),
1973    );
1974    insert_meta_row(store, fields)
1975}
1976
1977pub(super) fn load_pending_entries(
1978    store: &UnifiedStore,
1979    queue: &str,
1980    group: Option<&str>,
1981    message_id: Option<EntityId>,
1982) -> RedDBResult<Vec<QueuePendingEntry>> {
1983    let Some(manager) = store.get_collection(QUEUE_META_COLLECTION) else {
1984        return Ok(Vec::new());
1985    };
1986    let lock_deadline_ns = load_queue_config(store, queue)
1987        .lock_deadline_ms
1988        .saturating_mul(1_000_000);
1989    let attempts_by_key: HashMap<(String, String, u64), u64> = manager
1990        .query_all(|entity| {
1991            entity.data.as_row().is_some_and(|row| {
1992                row_text(row, "kind").as_deref() == Some("queue_attempts_lc")
1993                    && row_text(row, "queue").as_deref() == Some(queue)
1994            })
1995        })
1996        .into_iter()
1997        .filter_map(|entity| {
1998            let row = entity.data.as_row()?;
1999            Some((
2000                (
2001                    row_text(row, "queue")?,
2002                    row_text(row, "group")?,
2003                    row_u64(row, "message_id")?,
2004                ),
2005                row_u64(row, "attempts").unwrap_or(1),
2006            ))
2007        })
2008        .collect();
2009    Ok(manager
2010        .query_all(|entity| {
2011            entity.data.as_row().is_some_and(|row| {
2012                matches!(
2013                    row_text(row, "kind").as_deref(),
2014                    Some("queue_pending") | Some("queue_pending_lc")
2015                ) && row_text(row, "queue").as_deref() == Some(queue)
2016                    && group
2017                        .map(|group_name| row_text(row, "group").as_deref() == Some(group_name))
2018                        .unwrap_or(true)
2019                    && message_id
2020                        .map(|candidate| row_u64(row, "message_id") == Some(candidate.raw()))
2021                        .unwrap_or(true)
2022            })
2023        })
2024        .into_iter()
2025        .filter_map(|entity| {
2026            let row = entity.data.as_row()?;
2027            let group = row_text(row, "group")?;
2028            let message_id = row_u64(row, "message_id")?;
2029            let kind = row_text(row, "kind")?;
2030            let delivered_at_ns = if kind == "queue_pending_lc" {
2031                row_u64(row, "lock_deadline_ns")
2032                    .unwrap_or(0)
2033                    .saturating_sub(lock_deadline_ns)
2034            } else {
2035                row_u64(row, "delivered_at_ns")?
2036            };
2037            let delivery_count = if kind == "queue_pending_lc" {
2038                attempts_by_key
2039                    .get(&(queue.to_string(), group.clone(), message_id))
2040                    .copied()
2041                    .unwrap_or(1)
2042            } else {
2043                row_u64(row, "delivery_count").unwrap_or(1)
2044            };
2045            Some(QueuePendingEntry {
2046                entity_id: entity.id,
2047                group,
2048                message_id: EntityId::new(message_id),
2049                consumer: row_text(row, "consumer").unwrap_or_default(),
2050                delivered_at_ns,
2051                delivery_count: delivery_count as u32,
2052            })
2053        })
2054        .collect())
2055}
2056
2057pub(super) fn save_queue_pending(
2058    store: &UnifiedStore,
2059    queue: &str,
2060    group: &str,
2061    message_id: EntityId,
2062    consumer: &str,
2063    delivered_at_ns: u64,
2064    delivery_count: u32,
2065) -> RedDBResult<()> {
2066    remove_meta_rows(store, |row| {
2067        row_text(row, "kind").as_deref() == Some("queue_pending")
2068            && row_text(row, "queue").as_deref() == Some(queue)
2069            && row_text(row, "group").as_deref() == Some(group)
2070            && row_u64(row, "message_id") == Some(message_id.raw())
2071    });
2072
2073    let mut fields = HashMap::new();
2074    fields.insert("kind".to_string(), Value::text("queue_pending".to_string()));
2075    fields.insert("queue".to_string(), Value::text(queue.to_string()));
2076    fields.insert("group".to_string(), Value::text(group.to_string()));
2077    fields.insert(
2078        "message_id".to_string(),
2079        Value::UnsignedInteger(message_id.raw()),
2080    );
2081    fields.insert("consumer".to_string(), Value::text(consumer.to_string()));
2082    fields.insert(
2083        "delivered_at_ns".to_string(),
2084        Value::UnsignedInteger(delivered_at_ns),
2085    );
2086    fields.insert(
2087        "delivery_count".to_string(),
2088        Value::UnsignedInteger(u64::from(delivery_count)),
2089    );
2090    insert_meta_row(store, fields)
2091}
2092
2093pub(super) fn require_pending_entry(
2094    store: &UnifiedStore,
2095    queue: &str,
2096    group: &str,
2097    message_id: EntityId,
2098) -> RedDBResult<QueuePendingEntry> {
2099    load_pending_entries(store, queue, Some(group), Some(message_id))?
2100        .into_iter()
2101        .next()
2102        .ok_or_else(|| {
2103            RedDBError::NotFound(format!(
2104                "message '{}' is not pending in group '{}' on queue '{}'",
2105                message_id.raw(),
2106                group,
2107                queue
2108            ))
2109        })
2110}
2111
2112pub(super) fn load_ack_entries(
2113    store: &UnifiedStore,
2114    queue: &str,
2115    group: Option<&str>,
2116    message_id: Option<EntityId>,
2117) -> RedDBResult<Vec<QueueAckEntry>> {
2118    let Some(manager) = store.get_collection(QUEUE_META_COLLECTION) else {
2119        return Ok(Vec::new());
2120    };
2121    Ok(manager
2122        .query_all(|entity| {
2123            entity.data.as_row().is_some_and(|row| {
2124                row_text(row, "kind").as_deref() == Some("queue_ack")
2125                    && row_text(row, "queue").as_deref() == Some(queue)
2126                    && group
2127                        .map(|group_name| row_text(row, "group").as_deref() == Some(group_name))
2128                        .unwrap_or(true)
2129                    && message_id
2130                        .map(|candidate| row_u64(row, "message_id") == Some(candidate.raw()))
2131                        .unwrap_or(true)
2132            })
2133        })
2134        .into_iter()
2135        .filter_map(|entity| {
2136            let row = entity.data.as_row()?;
2137            Some(QueueAckEntry {
2138                entity_id: entity.id,
2139                group: row_text(row, "group")?,
2140                message_id: EntityId::new(row_u64(row, "message_id")?),
2141            })
2142        })
2143        .collect())
2144}
2145
2146pub(super) fn save_queue_ack(
2147    store: &UnifiedStore,
2148    queue: &str,
2149    group: &str,
2150    message_id: EntityId,
2151) -> RedDBResult<()> {
2152    let existing = load_ack_entries(store, queue, Some(group), Some(message_id))?;
2153    if !existing.is_empty() {
2154        return Ok(());
2155    }
2156
2157    let mut fields = HashMap::new();
2158    fields.insert("kind".to_string(), Value::text("queue_ack".to_string()));
2159    fields.insert("queue".to_string(), Value::text(queue.to_string()));
2160    fields.insert("group".to_string(), Value::text(group.to_string()));
2161    fields.insert(
2162        "message_id".to_string(),
2163        Value::UnsignedInteger(message_id.raw()),
2164    );
2165    fields.insert("acked_at_ns".to_string(), Value::UnsignedInteger(now_ns()));
2166    insert_meta_row(store, fields)
2167}
2168
2169pub(super) fn queue_message_completed_for_all_groups(
2170    store: &UnifiedStore,
2171    queue: &str,
2172    message_id: EntityId,
2173) -> RedDBResult<bool> {
2174    let groups = load_queue_groups(store, queue)?;
2175    let pending = load_pending_entries(store, queue, None, Some(message_id))?;
2176    if !pending.is_empty() {
2177        return Ok(false);
2178    }
2179    if groups.is_empty() {
2180        return Ok(true);
2181    }
2182
2183    let acked_groups = load_ack_entries(store, queue, None, Some(message_id))?
2184        .into_iter()
2185        .map(|entry| entry.group)
2186        .collect::<HashSet<_>>();
2187    Ok(groups
2188        .into_iter()
2189        .all(|group| acked_groups.contains(&group.group)))
2190}
2191
2192fn load_queue_message_views(
2193    store: &UnifiedStore,
2194    queue: &str,
2195) -> RedDBResult<Vec<QueueMessageView>> {
2196    load_queue_message_views_with_runtime(None, store, queue)
2197}
2198
2199/// Kind-aware queue scan (Phase 2.5.5 RLS universal). When the
2200/// caller has a `RedDBRuntime` reference, the gate also applies
2201/// any `CREATE POLICY ... ON MESSAGES OF <queue>` predicate. In
2202/// autocommit / embedded paths that only have the raw store (e.g.
2203/// purge loops) we skip RLS because there's no session identity
2204/// to match against.
2205pub(super) fn load_queue_message_views_with_runtime(
2206    runtime: Option<&RedDBRuntime>,
2207    store: &UnifiedStore,
2208    queue: &str,
2209) -> RedDBResult<Vec<QueueMessageView>> {
2210    let manager = store
2211        .get_collection(queue)
2212        .ok_or_else(|| RedDBError::NotFound(format!("queue '{}' not found", queue)))?;
2213    // Phase 1.2 MVCC universal: capture before parallel scan. Messages
2214    // inserted by another connection's open txn stay invisible to
2215    // consumers until that txn commits (prevents phantom POPs).
2216    let snap_ctx = crate::runtime::impl_core::capture_current_snapshot();
2217    let rls_filter = runtime.and_then(|rt| {
2218        crate::runtime::impl_core::rls_policy_filter_for_kind(
2219            rt,
2220            queue,
2221            crate::storage::query::ast::PolicyAction::Select,
2222            crate::storage::query::ast::PolicyTargetKind::Messages,
2223        )
2224    });
2225    let rls_enabled_but_denied = runtime.map(|rt| rt.is_rls_enabled(queue)).unwrap_or(false)
2226        && rls_filter.is_none()
2227        && runtime.is_some();
2228    if rls_enabled_but_denied {
2229        // RLS on + no Messages policy for this role = deny-default.
2230        return Ok(Vec::new());
2231    }
2232    let filter_arc = rls_filter.map(std::sync::Arc::new);
2233    let rt_arc = runtime;
2234    Ok(manager
2235        .query_all(move |entity| {
2236            if !matches!(entity.kind, EntityKind::QueueMessage { .. }) {
2237                return false;
2238            }
2239            if !crate::runtime::impl_core::entity_visible_with_context(snap_ctx.as_ref(), entity) {
2240                return false;
2241            }
2242            if let (Some(filter), Some(rt)) = (filter_arc.as_ref(), rt_arc) {
2243                return crate::runtime::query_exec::evaluate_entity_filter_with_db(
2244                    Some(&rt.inner.db),
2245                    entity,
2246                    filter,
2247                    queue,
2248                    queue,
2249                );
2250            }
2251            true
2252        })
2253        .into_iter()
2254        .filter_map(queue_message_view_from_entity)
2255        .map(|mut view| {
2256            view.available_at_ns = read_message_available_at_ns(store, queue, view.id);
2257            view
2258        })
2259        .collect())
2260}
2261
2262fn queue_message_view_from_entity(entity: UnifiedEntity) -> Option<QueueMessageView> {
2263    let (position, _) = match &entity.kind {
2264        EntityKind::QueueMessage { position, queue } => (*position, queue),
2265        _ => return None,
2266    };
2267    let data = match entity.data {
2268        EntityData::QueueMessage(data) => data,
2269        _ => return None,
2270    };
2271    Some(QueueMessageView {
2272        id: entity.id,
2273        position,
2274        priority: data.priority.unwrap_or(0),
2275        payload: data.payload,
2276        attempts: data.attempts,
2277        max_attempts: data.max_attempts,
2278        enqueued_at_ns: data.enqueued_at_ns,
2279        available_at_ns: None,
2280    })
2281}
2282
2283/// Insert a moved payload onto `queue` using only the payload value —
2284/// priority / attempts / TTL fall back to the destination queue's
2285/// catalog config (mirrors a fresh enqueue rather than carrying source
2286/// metadata over). Used by `PrimaryQueueStore::move_to_queue`, the
2287/// `QueueLifecycle::move_between_queues` adapter that owns only
2288/// `(message_id, payload)` after `pop_messages` retires the source row.
2289pub(super) fn insert_moved_queue_message_payload(
2290    store: &UnifiedStore,
2291    queue: &str,
2292    payload: &Value,
2293) -> RedDBResult<EntityId> {
2294    let config = load_queue_config(store, queue);
2295    let position = next_queue_position(store, queue, QueueSide::Right)?;
2296    let enqueued_at_ns = std::time::SystemTime::now()
2297        .duration_since(std::time::UNIX_EPOCH)
2298        .map(|d| d.as_nanos() as u64)
2299        .unwrap_or(0);
2300    let entity = UnifiedEntity::new(
2301        EntityId::new(0),
2302        EntityKind::QueueMessage {
2303            queue: queue.to_string(),
2304            position,
2305        },
2306        EntityData::QueueMessage(QueueMessageData {
2307            payload: payload.clone(),
2308            priority: None,
2309            enqueued_at_ns,
2310            attempts: 0,
2311            max_attempts: config.max_attempts,
2312            acked: false,
2313        }),
2314    );
2315    let id = store
2316        .insert_auto(queue, entity)
2317        .map_err(|err| RedDBError::Internal(err.to_string()))?;
2318    if let Some(ttl_ms) = config.ttl_ms {
2319        store
2320            .set_metadata(queue, id, queue_message_ttl_metadata(ttl_ms))
2321            .map_err(|err| RedDBError::Internal(err.to_string()))?;
2322    }
2323    Ok(id)
2324}
2325
2326fn insert_moved_queue_message(
2327    store: &UnifiedStore,
2328    queue: &str,
2329    config: &QueueRuntimeConfig,
2330    message: &QueueMessageView,
2331) -> RedDBResult<EntityId> {
2332    let position = next_queue_position(store, queue, QueueSide::Right)?;
2333    let entity = UnifiedEntity::new(
2334        EntityId::new(0),
2335        EntityKind::QueueMessage {
2336            queue: queue.to_string(),
2337            position,
2338        },
2339        EntityData::QueueMessage(QueueMessageData {
2340            payload: message.payload.clone(),
2341            priority: if config.priority {
2342                Some(message.priority)
2343            } else {
2344                None
2345            },
2346            enqueued_at_ns: message.enqueued_at_ns,
2347            attempts: message.attempts,
2348            max_attempts: message.max_attempts,
2349            acked: false,
2350        }),
2351    );
2352    let id = store
2353        .insert_auto(queue, entity)
2354        .map_err(|err| RedDBError::Internal(err.to_string()))?;
2355    if let Some(ttl_ms) = config.ttl_ms {
2356        store
2357            .set_metadata(queue, id, queue_message_ttl_metadata(ttl_ms))
2358            .map_err(|err| RedDBError::Internal(err.to_string()))?;
2359    }
2360    Ok(id)
2361}
2362
/// Column list used when a queue projection does not name its columns.
fn queue_projection_default_columns() -> Vec<String> {
    const DEFAULT_COLUMNS: [&str; 9] = [
        "id",
        "payload",
        "priority",
        "attempts",
        "last_error",
        "enqueued_at",
        "available_at",
        "dlq",
        "tenant",
    ];
    DEFAULT_COLUMNS
        .iter()
        .map(|column| String::from(*column))
        .collect()
}
2379
2380fn queue_projection_record(
2381    columns: &[String],
2382    message: &QueueMessageView,
2383    dlq: bool,
2384) -> RedDBResult<UnifiedRecord> {
2385    let mut record = UnifiedRecord::new();
2386    for column in columns {
2387        let value = queue_projection_value(message, dlq, column).ok_or_else(|| {
2388            RedDBError::Query(format!("unknown queue projection column '{}'", column))
2389        })?;
2390        record.set(column, value);
2391    }
2392    Ok(record)
2393}
2394
2395fn queue_projection_value(message: &QueueMessageView, dlq: bool, column: &str) -> Option<Value> {
2396    match column {
2397        "id" => Some(Value::text(message_id_string(message.id))),
2398        "payload" => Some(message.payload.clone()),
2399        "priority" => Some(Value::Integer(i64::from(message.priority))),
2400        "attempts" => Some(Value::UnsignedInteger(u64::from(message.attempts))),
2401        "last_error" => Some(Value::Null),
2402        "enqueued_at" => Some(Value::UnsignedInteger(message.enqueued_at_ns)),
2403        "available_at" => Some(Value::UnsignedInteger(
2404            message.available_at_ns.unwrap_or(message.enqueued_at_ns),
2405        )),
2406        "dlq" => Some(Value::Boolean(dlq)),
2407        "tenant" => queue_message_tenant(&message.payload).or(Some(Value::Null)),
2408        _ => None,
2409    }
2410}
2411
2412fn queue_message_tenant(payload: &Value) -> Option<Value> {
2413    let Value::Json(bytes) = payload else {
2414        return None;
2415    };
2416    let json: crate::json::Value = crate::json::from_slice(bytes).ok()?;
2417    json.get("tenant")
2418        .and_then(crate::json::Value::as_str)
2419        .map(|tenant| Value::text(tenant.to_string()))
2420}
2421
2422fn queue_is_dead_letter_target(store: &UnifiedStore, queue: &str) -> bool {
2423    let Some(manager) = store.get_collection(QUEUE_META_COLLECTION) else {
2424        return false;
2425    };
2426    !manager
2427        .query_all(|entity| {
2428            entity.data.as_row().is_some_and(|row| {
2429                row_text(row, "kind").as_deref() == Some("queue_config")
2430                    && row_text(row, "dlq").as_deref() == Some(queue)
2431            })
2432        })
2433        .is_empty()
2434}
2435
2436fn queue_message_matches_filter(message: &QueueMessageView, dlq: bool, filter: &Filter) -> bool {
2437    match filter {
2438        Filter::Compare { field, op, value } => queue_filter_field_value(message, dlq, field)
2439            .is_some_and(|candidate| queue_compare_values(&candidate, value, *op)),
2440        Filter::CompareFields { left, op, right } => {
2441            match (
2442                queue_filter_field_value(message, dlq, left),
2443                queue_filter_field_value(message, dlq, right),
2444            ) {
2445                (Some(left), Some(right)) => queue_compare_values(&left, &right, *op),
2446                _ => false,
2447            }
2448        }
2449        Filter::And(left, right) => {
2450            queue_message_matches_filter(message, dlq, left)
2451                && queue_message_matches_filter(message, dlq, right)
2452        }
2453        Filter::Or(left, right) => {
2454            queue_message_matches_filter(message, dlq, left)
2455                || queue_message_matches_filter(message, dlq, right)
2456        }
2457        Filter::Not(inner) => !queue_message_matches_filter(message, dlq, inner),
2458        Filter::IsNull(field) => queue_filter_field_value(message, dlq, field)
2459            .is_none_or(|value| matches!(value, Value::Null)),
2460        Filter::IsNotNull(field) => queue_filter_field_value(message, dlq, field)
2461            .is_some_and(|value| !matches!(value, Value::Null)),
2462        Filter::In { field, values } => {
2463            queue_filter_field_value(message, dlq, field).is_some_and(|candidate| {
2464                values
2465                    .iter()
2466                    .any(|value| queue_values_equal(&candidate, value))
2467            })
2468        }
2469        Filter::Between { field, low, high } => queue_filter_field_value(message, dlq, field)
2470            .is_some_and(|candidate| {
2471                queue_compare_values(&candidate, low, CompareOp::Ge)
2472                    && queue_compare_values(&candidate, high, CompareOp::Le)
2473            }),
2474        Filter::Like { field, pattern } => queue_filter_text(message, dlq, field)
2475            .is_some_and(|value| queue_like_matches(&value, pattern)),
2476        Filter::StartsWith { field, prefix } => {
2477            queue_filter_text(message, dlq, field).is_some_and(|value| value.starts_with(prefix))
2478        }
2479        Filter::EndsWith { field, suffix } => {
2480            queue_filter_text(message, dlq, field).is_some_and(|value| value.ends_with(suffix))
2481        }
2482        Filter::Contains { field, substring } => {
2483            queue_filter_text(message, dlq, field).is_some_and(|value| value.contains(substring))
2484        }
2485        Filter::CompareExpr { .. } => false,
2486    }
2487}
2488
2489fn queue_filter_field_value(
2490    message: &QueueMessageView,
2491    dlq: bool,
2492    field: &FieldRef,
2493) -> Option<Value> {
2494    match field {
2495        FieldRef::TableColumn { table, column } if table.is_empty() => {
2496            queue_projection_value(message, dlq, column)
2497                .or_else(|| queue_payload_field_value(&message.payload, column))
2498        }
2499        FieldRef::TableColumn { column, .. } => queue_projection_value(message, dlq, column)
2500            .or_else(|| queue_payload_field_value(&message.payload, column)),
2501        _ => None,
2502    }
2503}
2504
2505fn queue_payload_field_value(payload: &Value, field: &str) -> Option<Value> {
2506    let Value::Json(bytes) = payload else {
2507        return None;
2508    };
2509    let json: crate::json::Value = crate::json::from_slice(bytes).ok()?;
2510    let value = json.get(field)?;
2511    json_value_to_schema_value(value)
2512}
2513
2514fn json_value_to_schema_value(value: &crate::json::Value) -> Option<Value> {
2515    if matches!(value, crate::json::Value::Null) {
2516        Some(Value::Null)
2517    } else if let Some(value) = value.as_bool() {
2518        Some(Value::Boolean(value))
2519    } else if let Some(value) = value.as_i64() {
2520        Some(Value::Integer(value))
2521    } else if let Some(value) = value.as_u64() {
2522        Some(Value::UnsignedInteger(value))
2523    } else if let Some(value) = value.as_f64() {
2524        Some(Value::Float(value))
2525    } else if let Some(value) = value.as_str() {
2526        Some(Value::text(value.to_string()))
2527    } else {
2528        Some(Value::Json(value.to_string_compact().into_bytes()))
2529    }
2530}
2531
2532fn queue_filter_text(message: &QueueMessageView, dlq: bool, field: &FieldRef) -> Option<String> {
2533    queue_filter_field_value(message, dlq, field).and_then(|value| match value {
2534        Value::Text(value) => Some(value.to_string()),
2535        Value::NodeRef(value) | Value::EdgeRef(value) | Value::TableRef(value) => Some(value),
2536        Value::Integer(value) => Some(value.to_string()),
2537        Value::UnsignedInteger(value) => Some(value.to_string()),
2538        Value::Float(value) => Some(value.to_string()),
2539        Value::Boolean(value) => Some(value.to_string()),
2540        _ => None,
2541    })
2542}
2543
2544fn queue_compare_values(left: &Value, right: &Value, op: CompareOp) -> bool {
2545    match op {
2546        CompareOp::Eq => queue_values_equal(left, right),
2547        CompareOp::Ne => !queue_values_equal(left, right),
2548        CompareOp::Lt => queue_partial_cmp(left, right).is_some_and(|ord| ord.is_lt()),
2549        CompareOp::Le => queue_partial_cmp(left, right).is_some_and(|ord| !ord.is_gt()),
2550        CompareOp::Gt => queue_partial_cmp(left, right).is_some_and(|ord| ord.is_gt()),
2551        CompareOp::Ge => queue_partial_cmp(left, right).is_some_and(|ord| !ord.is_lt()),
2552    }
2553}
2554
2555fn queue_values_equal(left: &Value, right: &Value) -> bool {
2556    if let (Some(left), Some(right)) = (queue_value_number(left), queue_value_number(right)) {
2557        return (left - right).abs() < f64::EPSILON;
2558    }
2559    match (left, right) {
2560        (Value::Text(left), Value::Text(right)) => left == right,
2561        (Value::Boolean(left), Value::Boolean(right)) => left == right,
2562        _ => left == right,
2563    }
2564}
2565
2566fn queue_partial_cmp(left: &Value, right: &Value) -> Option<std::cmp::Ordering> {
2567    if let (Some(left), Some(right)) = (queue_value_number(left), queue_value_number(right)) {
2568        return left.partial_cmp(&right);
2569    }
2570    match (left, right) {
2571        (Value::Text(left), Value::Text(right)) => Some(left.cmp(right)),
2572        _ => None,
2573    }
2574}
2575
2576fn queue_value_number(value: &Value) -> Option<f64> {
2577    match value {
2578        Value::Integer(value) => Some(*value as f64),
2579        Value::UnsignedInteger(value) => Some(*value as f64),
2580        Value::Float(value) => Some(*value),
2581        Value::Text(value) => value.parse().ok(),
2582        _ => None,
2583    }
2584}
2585
/// SQL `LIKE` matching for queue filters.
///
/// `%` matches any run of characters (including none) and `_` matches
/// exactly one character. Every other character must match literally and
/// case-sensitively; there is no escape character. Matching is per `char`,
/// so `_` consumes one Unicode scalar value, not one byte.
///
/// The previous version only handled `%` at the very start/end of the
/// pattern: an interior `%` (e.g. `'a%f'`) was compared literally, and `_`
/// was unsupported.
///
/// Greedy wildcard matching with single-point backtracking: on a mismatch
/// after a `%`, retry with that `%` absorbing one more character. Worst case
/// is O(len(value) * len(pattern)).
fn queue_like_matches(value: &str, pattern: &str) -> bool {
    let text: Vec<char> = value.chars().collect();
    let pat: Vec<char> = pattern.chars().collect();
    let (mut t, mut p) = (0usize, 0usize);
    // Most recent `%`: (pattern index just after it, text index it has
    // absorbed up to).
    let mut backtrack: Option<(usize, usize)> = None;

    while t < text.len() {
        match pat.get(p) {
            Some('%') => {
                backtrack = Some((p + 1, t));
                p += 1;
            }
            Some(&c) if c == '_' || c == text[t] => {
                p += 1;
                t += 1;
            }
            _ => match backtrack {
                Some((resume_p, absorbed_t)) => {
                    p = resume_p;
                    t = absorbed_t + 1;
                    backtrack = Some((resume_p, absorbed_t + 1));
                }
                None => return false,
            },
        }
    }
    // Input exhausted: whatever pattern remains must be all `%`.
    pat[p..].iter().all(|&c| c == '%')
}
2600
2601pub(super) fn queue_message_view_by_id(
2602    store: &UnifiedStore,
2603    queue: &str,
2604    message_id: EntityId,
2605) -> RedDBResult<Option<QueueMessageView>> {
2606    let manager = queue_manager(store, queue)?;
2607    Ok(manager
2608        .get(message_id)
2609        .and_then(queue_message_view_from_entity)
2610        .map(|mut view| {
2611            view.available_at_ns = read_message_available_at_ns(store, queue, view.id);
2612            view
2613        }))
2614}
2615
2616pub(super) fn sort_queue_messages(
2617    messages: &mut [QueueMessageView],
2618    config: &QueueRuntimeConfig,
2619    side: QueueSide,
2620) {
2621    messages.sort_by(|left, right| {
2622        if config.priority {
2623            right
2624                .priority
2625                .cmp(&left.priority)
2626                .then_with(|| match side {
2627                    QueueSide::Left => left.position.cmp(&right.position),
2628                    QueueSide::Right => right.position.cmp(&left.position),
2629                })
2630                .then_with(|| left.id.raw().cmp(&right.id.raw()))
2631        } else {
2632            match side {
2633                QueueSide::Left => left.position.cmp(&right.position),
2634                QueueSide::Right => right.position.cmp(&left.position),
2635            }
2636            .then_with(|| left.id.raw().cmp(&right.id.raw()))
2637        }
2638    });
2639}
2640
2641pub(super) fn next_queue_position(
2642    store: &UnifiedStore,
2643    queue: &str,
2644    side: QueueSide,
2645) -> RedDBResult<u64> {
2646    let messages = load_queue_message_views(store, queue)?;
2647    if messages.is_empty() {
2648        return Ok(QUEUE_POSITION_CENTER);
2649    }
2650    match side {
2651        QueueSide::Left => Ok(messages
2652            .iter()
2653            .map(|message| message.position)
2654            .min()
2655            .unwrap_or(QUEUE_POSITION_CENTER)
2656            .saturating_sub(1)),
2657        QueueSide::Right => Ok(messages
2658            .iter()
2659            .map(|message| message.position)
2660            .max()
2661            .unwrap_or(QUEUE_POSITION_CENTER)
2662            .saturating_add(1)),
2663    }
2664}
2665
2666pub(super) fn increment_queue_attempts(
2667    store: &UnifiedStore,
2668    queue: &str,
2669    message_id: EntityId,
2670) -> RedDBResult<u32> {
2671    let manager = queue_manager(store, queue)?;
2672    let mut entity = manager
2673        .get(message_id)
2674        .ok_or_else(|| RedDBError::NotFound(format!("message '{}' not found", message_id.raw())))?;
2675    match &mut entity.data {
2676        EntityData::QueueMessage(message) => {
2677            message.attempts = message.attempts.saturating_add(1);
2678            let attempts = message.attempts;
2679            manager
2680                .update(entity)
2681                .map_err(|err| RedDBError::Internal(err.to_string()))?;
2682            Ok(attempts)
2683        }
2684        _ => Err(RedDBError::Query(format!(
2685            "entity '{}' is not a queue message",
2686            message_id.raw()
2687        ))),
2688    }
2689}
2690
2691pub(super) fn queue_message_attempts(
2692    store: &UnifiedStore,
2693    queue: &str,
2694    message_id: EntityId,
2695) -> RedDBResult<u32> {
2696    Ok(queue_message_data(store, queue, message_id)?.attempts)
2697}
2698
2699pub(super) fn queue_message_max_attempts(
2700    store: &UnifiedStore,
2701    queue: &str,
2702    message_id: EntityId,
2703) -> RedDBResult<u32> {
2704    Ok(queue_message_data(store, queue, message_id)?.max_attempts)
2705}
2706
2707pub(super) fn queue_message_payload(
2708    store: &UnifiedStore,
2709    queue: &str,
2710    message_id: EntityId,
2711) -> RedDBResult<Value> {
2712    Ok(queue_message_data(store, queue, message_id)?.payload)
2713}
2714
2715pub(super) fn queue_message_pending_any(
2716    store: &UnifiedStore,
2717    queue: &str,
2718    message_id: EntityId,
2719) -> RedDBResult<bool> {
2720    Ok(!load_pending_entries(store, queue, None, Some(message_id))?.is_empty())
2721}
2722
2723pub(super) fn queue_message_pending_for_group(
2724    store: &UnifiedStore,
2725    queue: &str,
2726    group: &str,
2727    message_id: EntityId,
2728) -> RedDBResult<bool> {
2729    Ok(!load_pending_entries(store, queue, Some(group), Some(message_id))?.is_empty())
2730}
2731
2732pub(super) fn queue_message_acked_for_group(
2733    store: &UnifiedStore,
2734    queue: &str,
2735    group: &str,
2736    message_id: EntityId,
2737) -> RedDBResult<bool> {
2738    Ok(!load_ack_entries(store, queue, Some(group), Some(message_id))?.is_empty())
2739}
2740
2741fn queue_manager(
2742    store: &UnifiedStore,
2743    queue: &str,
2744) -> RedDBResult<Arc<crate::storage::unified::SegmentManager>> {
2745    store
2746        .get_collection(queue)
2747        .ok_or_else(|| RedDBError::NotFound(format!("queue '{}' not found", queue)))
2748}
2749
2750pub(super) fn queue_message_data(
2751    store: &UnifiedStore,
2752    queue: &str,
2753    message_id: EntityId,
2754) -> RedDBResult<QueueMessageData> {
2755    let manager = queue_manager(store, queue)?;
2756    let entity = manager
2757        .get(message_id)
2758        .ok_or_else(|| RedDBError::NotFound(format!("message '{}' not found", message_id.raw())))?;
2759    match entity.data {
2760        EntityData::QueueMessage(message) => Ok(message),
2761        _ => Err(RedDBError::Query(format!(
2762            "entity '{}' is not a queue message",
2763            message_id.raw()
2764        ))),
2765    }
2766}
2767
2768fn insert_meta_row(store: &UnifiedStore, fields: HashMap<String, Value>) -> RedDBResult<()> {
2769    let _ = store.get_or_create_collection(QUEUE_META_COLLECTION);
2770    store
2771        .insert_auto(
2772            QUEUE_META_COLLECTION,
2773            UnifiedEntity::new(
2774                EntityId::new(0),
2775                EntityKind::TableRow {
2776                    table: Arc::from(QUEUE_META_COLLECTION),
2777                    row_id: 0,
2778                },
2779                EntityData::Row(RowData {
2780                    columns: Vec::new(),
2781                    named: Some(fields),
2782                    schema: None,
2783                }),
2784            ),
2785        )
2786        .map_err(|err| RedDBError::Internal(err.to_string()))?;
2787    Ok(())
2788}
2789
2790pub(super) fn remove_meta_rows(store: &UnifiedStore, predicate: impl Fn(&RowData) -> bool + Sync) {
2791    let Some(manager) = store.get_collection(QUEUE_META_COLLECTION) else {
2792        return;
2793    };
2794    let rows = manager.query_all(|entity| entity.data.as_row().is_some_and(&predicate));
2795    for row in rows {
2796        let _ = store.delete(QUEUE_META_COLLECTION, row.id);
2797    }
2798}
2799
2800pub(super) fn delete_meta_entity(store: &UnifiedStore, entity_id: EntityId) {
2801    let _ = store.delete(QUEUE_META_COLLECTION, entity_id);
2802}
2803
2804fn queue_message_lock_key(queue: &str, message_id: EntityId) -> String {
2805    format!("{queue}:{}", message_id.raw())
2806}
2807
2808pub(super) fn queue_message_lock_handle(
2809    runtime: &RedDBRuntime,
2810    queue: &str,
2811    message_id: EntityId,
2812) -> Arc<parking_lot::Mutex<()>> {
2813    let key = queue_message_lock_key(queue, message_id);
2814    if let Some(lock) = runtime.inner.queue_message_locks.read().get(&key).cloned() {
2815        return lock;
2816    }
2817
2818    let mut locks = runtime.inner.queue_message_locks.write();
2819    locks
2820        .entry(key)
2821        .or_insert_with(|| Arc::new(parking_lot::Mutex::new(())))
2822        .clone()
2823}
2824
2825pub(super) fn forget_queue_message_lock(runtime: &RedDBRuntime, queue: &str, message_id: EntityId) {
2826    runtime
2827        .inner
2828        .queue_message_locks
2829        .write()
2830        .remove(&queue_message_lock_key(queue, message_id));
2831}
2832
2833fn parse_message_id(value: &str) -> RedDBResult<EntityId> {
2834    let raw = value.strip_prefix('e').unwrap_or(value);
2835    raw.parse::<u64>()
2836        .map(EntityId::new)
2837        .map_err(|_| RedDBError::Query(format!("invalid message id '{}'", value)))
2838}
2839
2840/// ADR 0026: resolve the ACK/NACK handle. When `delivery_id` is supplied,
2841/// it wins unconditionally — strict failure if the handle does not resolve
2842/// to a live pending delivery on `queue`. When only the legacy tuple is
2843/// supplied, emit a rate-limited deprecation log line and use the tuple.
2844/// At least one handle must be present.
2845pub(super) fn resolve_ack_nack_handle(
2846    store: &UnifiedStore,
2847    queue: &str,
2848    group_hint: &str,
2849    message_id_hint: &str,
2850    delivery_id: Option<&str>,
2851) -> RedDBResult<(String, EntityId)> {
2852    if let Some(did) = delivery_id {
2853        return resolve_delivery_id(store, queue, did);
2854    }
2855    if group_hint.is_empty() || message_id_hint.is_empty() {
2856        return Err(RedDBError::Query(
2857            "ACK/NACK requires either GROUP <group> '<message_id>' or WITH delivery_id = '<id>'"
2858                .to_string(),
2859        ));
2860    }
2861    log_tuple_deprecation(queue);
2862    let entity = parse_message_id(message_id_hint)?;
2863    Ok((group_hint.to_string(), entity))
2864}
2865
2866fn resolve_delivery_id(
2867    store: &UnifiedStore,
2868    queue: &str,
2869    delivery_id: &str,
2870) -> RedDBResult<(String, EntityId)> {
2871    let Some(manager) = store.get_collection(QUEUE_META_COLLECTION) else {
2872        return Err(RedDBError::Query(format!(
2873            "delivery_id '{}' does not resolve to a live pending delivery",
2874            delivery_id
2875        )));
2876    };
2877    for entity in manager.query_all(|entity| {
2878        entity.data.as_row().is_some_and(|row| {
2879            row_text(row, "kind").as_deref() == Some("queue_pending_lc")
2880                && row_text(row, "delivery_id").as_deref() == Some(delivery_id)
2881        })
2882    }) {
2883        if let Some(row) = entity.data.as_row() {
2884            let row_queue = row_text(row, "queue").unwrap_or_default();
2885            let row_group = row_text(row, "group").unwrap_or_default();
2886            let row_message = row_u64(row, "message_id").unwrap_or(0);
2887            if row_queue != queue {
2888                return Err(RedDBError::Query(format!(
2889                    "delivery_id '{}' belongs to queue '{}', not '{}'",
2890                    delivery_id, row_queue, queue
2891                )));
2892            }
2893            return Ok((row_group, EntityId::new(row_message)));
2894        }
2895    }
2896    Err(RedDBError::Query(format!(
2897        "delivery_id '{}' does not resolve to a live pending delivery",
2898        delivery_id
2899    )))
2900}
2901
2902/// Per-(connection, queue) rate-limited "tuple ACK is deprecated" log line.
2903/// One emission per minute matches ADR 0026's operational guidance.
2904fn log_tuple_deprecation(queue: &str) {
2905    use std::sync::atomic::Ordering;
2906    use std::sync::{Mutex, OnceLock};
2907    use std::time::Instant;
2908
2909    static LAST_EMIT: OnceLock<Mutex<HashMap<(u64, String), Instant>>> = OnceLock::new();
2910    const COOLDOWN: std::time::Duration = std::time::Duration::from_secs(60);
2911
2912    let map = LAST_EMIT.get_or_init(|| Mutex::new(HashMap::new()));
2913    let key = (super::impl_core::current_connection_id(), queue.to_string());
2914    let now = Instant::now();
2915    let mut guard = match map.lock() {
2916        Ok(g) => g,
2917        Err(_) => return,
2918    };
2919    let should_emit =
2920        !matches!(guard.get(&key), Some(prev) if now.duration_since(*prev) < COOLDOWN);
2921    if should_emit {
2922        guard.insert(key.clone(), now);
2923        drop(guard);
2924        TUPLE_DEPRECATION_EMITS.fetch_add(1, Ordering::Relaxed);
2925        tracing::warn!(
2926            target: "reddb::queue_lifecycle",
2927            queue = queue,
2928            connection_id = key.0,
2929            "ACK/NACK by (queue, group, message_id) tuple is deprecated; \
2930             switch to the server-issued delivery_id (ADR 0026). \
2931             The tuple path will be removed one minor release after introduction.",
2932        );
2933    }
2934}
2935
/// Number of tuple-ACK/NACK deprecation warnings emitted since process start.
///
/// Process-wide and `pub` on purpose: transport-bridge integration tests
/// read it to confirm the legacy tuple path warned while the `delivery_id`
/// path stayed silent, without installing a `tracing::Subscriber` in every
/// test.
pub static TUPLE_DEPRECATION_EMITS: AtomicU64 = AtomicU64::new(0);
2943
2944fn message_id_string(message_id: EntityId) -> String {
2945    message_id.raw().to_string()
2946}
2947
2948/// Issue #917 — render a delivered queue message as the JSON object the
2949/// RedWire `QueueEventPush` frame carries. Mirrors the column shape the
2950/// SQL `QUEUE READ` projection emits (`message_id` / `payload` /
2951/// `consumer` / `delivery_count`) so the wire push and the pull path
2952/// stay client-compatible.
2953fn delivered_message_json(
2954    message: crate::runtime::queue_lifecycle::DeliveredMessage,
2955) -> crate::serde_json::Value {
2956    use crate::serde_json::{Map, Value as JsonValue};
2957    let mut obj = Map::new();
2958    obj.insert(
2959        "message_id".to_string(),
2960        JsonValue::String(message_id_string(EntityId::new(message.message_id))),
2961    );
2962    obj.insert(
2963        "payload".to_string(),
2964        crate::presentation::entity_json::storage_value_to_json(&message.payload),
2965    );
2966    obj.insert("consumer".to_string(), JsonValue::String(message.consumer));
2967    obj.insert(
2968        "delivery_count".to_string(),
2969        JsonValue::Number(message.delivery_count as f64),
2970    );
2971    JsonValue::Object(obj)
2972}
2973
2974/// Slice 10 of issue #527 — render-time scan of pending entries
2975/// per (queue, group) for `queue_pending_gauge` exposition. Walks
2976/// `red_queue_meta` live so the gauge cannot drift from the source
2977/// of truth.
2978pub(crate) fn pending_counts_by_group(
2979    store: &UnifiedStore,
2980) -> std::collections::BTreeMap<(String, String), u64> {
2981    let mut counts: std::collections::BTreeMap<(String, String), u64> =
2982        std::collections::BTreeMap::new();
2983    let Some(manager) = store.get_collection(QUEUE_META_COLLECTION) else {
2984        return counts;
2985    };
2986    for entity in manager.query_all(|entity| {
2987        entity
2988            .data
2989            .as_row()
2990            .is_some_and(|row| row_text(row, "kind").as_deref() == Some("queue_pending"))
2991    }) {
2992        if let Some(row) = entity.data.as_row() {
2993            let queue = row_text(row, "queue");
2994            let group = row_text(row, "group");
2995            if let (Some(q), Some(g)) = (queue, group) {
2996                *counts.entry((q, g)).or_insert(0) += 1;
2997            }
2998        }
2999    }
3000    counts
3001}
3002
3003pub(super) fn row_text(row: &RowData, field: &str) -> Option<String> {
3004    match row.get_field(field)?.clone() {
3005        Value::Text(value) => Some(value.to_string()),
3006        Value::NodeRef(value) => Some(value),
3007        Value::EdgeRef(value) => Some(value),
3008        Value::TableRef(value) => Some(value),
3009        _ => None,
3010    }
3011}
3012
3013pub(super) fn row_u64(row: &RowData, field: &str) -> Option<u64> {
3014    match row.get_field(field)?.clone() {
3015        Value::UnsignedInteger(value) => Some(value),
3016        Value::Integer(value) if value >= 0 => Some(value as u64),
3017        Value::Float(value) if value >= 0.0 => Some(value as u64),
3018        Value::Text(value) => value.parse().ok(),
3019        _ => None,
3020    }
3021}
3022
3023fn row_bool(row: &RowData, field: &str) -> Option<bool> {
3024    match row.get_field(field)?.clone() {
3025        Value::Boolean(value) => Some(value),
3026        Value::Text(value) => match value.to_ascii_lowercase().as_str() {
3027            "true" => Some(true),
3028            "false" => Some(false),
3029            _ => None,
3030        },
3031        _ => None,
3032    }
3033}
3034
3035fn queue_collection_contract(
3036    name: &str,
3037    priority: bool,
3038    ttl_ms: Option<u64>,
3039) -> crate::physical::CollectionContract {
3040    let now = current_unix_ms();
3041    let mut context_index_fields = Vec::new();
3042    if priority {
3043        context_index_fields.push("priority".to_string());
3044    }
3045
3046    crate::physical::CollectionContract {
3047        name: name.to_string(),
3048        declared_model: crate::catalog::CollectionModel::Queue,
3049        schema_mode: crate::catalog::SchemaMode::Dynamic,
3050        origin: crate::physical::ContractOrigin::Explicit,
3051        version: 1,
3052        created_at_unix_ms: now,
3053        updated_at_unix_ms: now,
3054        default_ttl_ms: ttl_ms,
3055        vector_dimension: None,
3056        vector_metric: None,
3057        context_index_fields,
3058        declared_columns: Vec::new(),
3059        table_def: None,
3060        timestamps_enabled: false,
3061        context_index_enabled: false,
3062        metrics_raw_retention_ms: None,
3063        metrics_rollup_policies: Vec::new(),
3064        metrics_tenant_identity: None,
3065        metrics_namespace: None,
3066        // Queues manipulate messages via push/pop/ack — the row DML
3067        // paths never apply. Flag it as append_only so inadvertent
3068        // `UPDATE/DELETE FROM queue_name` statements fail loudly.
3069        append_only: true,
3070        subscriptions: Vec::new(),
3071        analytics_config: Vec::new(),
3072        session_key: None,
3073        session_gap_ms: None,
3074        retention_duration_ms: None,
3075        analytical_storage: None,
3076
3077        ai_policy: None,
3078    }
3079}
3080
/// Milliseconds elapsed since the Unix epoch, or `0` if the system clock
/// reports a time before the epoch.
fn current_unix_ms() -> u128 {
    std::time::UNIX_EPOCH
        .elapsed()
        .map_or(0, |since_epoch| since_epoch.as_millis())
}
3087
3088pub(super) fn now_ns() -> u64 {
3089    std::time::SystemTime::now()
3090        .duration_since(std::time::UNIX_EPOCH)
3091        .unwrap_or_default()
3092        .as_nanos() as u64
3093}
3094
3095pub(super) fn queue_message_ttl_metadata(ttl_ms: u64) -> Metadata {
3096    queue_message_metadata(Some(ttl_ms), None)
3097}
3098
3099/// Build the per-message metadata row attached to a queue message. Both
3100/// fields are optional — `_ttl_ms` carries the per-message TTL (existing
3101/// behaviour) and `_available_at_ns` carries the first-delivery instant
3102/// for delayed messages (issue #722). When both are present they share
3103/// the same row, since `UnifiedStore::set_metadata` replaces the entry.
3104pub(super) fn queue_message_metadata(
3105    ttl_ms: Option<u64>,
3106    available_at_ns: Option<u64>,
3107) -> Metadata {
3108    let mut fields = HashMap::new();
3109    if let Some(ttl_ms) = ttl_ms {
3110        fields.insert(
3111            "_ttl_ms".to_string(),
3112            if ttl_ms <= i64::MAX as u64 {
3113                MetadataValue::Int(ttl_ms as i64)
3114            } else {
3115                MetadataValue::Timestamp(ttl_ms)
3116            },
3117        );
3118    }
3119    if let Some(at_ns) = available_at_ns {
3120        fields.insert(
3121            "_available_at_ns".to_string(),
3122            MetadataValue::Timestamp(at_ns),
3123        );
3124    }
3125    Metadata::with_fields(fields)
3126}
3127
3128/// Smallest future `available_at_ns` among messages currently sitting
3129/// on `queue`. Used by `QUEUE READ … WAIT` (issue #722) to cap the
3130/// condvar park horizon: without this, a waiter on a quiet queue with
3131/// only delayed messages would never wake when one became due, since
3132/// the wait registry is only notified by producer commits. Returns
3133/// `None` when no future-dated message exists (the common case — the
3134/// caller falls back to the user-supplied wait budget).
3135pub(super) fn earliest_future_available_at(store: &UnifiedStore, queue: &str) -> Option<u64> {
3136    let now_ns = now_ns();
3137    let views = load_queue_message_views_with_runtime(None, store, queue).ok()?;
3138    views
3139        .iter()
3140        .filter_map(|v| v.available_at_ns)
3141        .filter(|at| *at > now_ns)
3142        .min()
3143}
3144
3145/// Update the `_available_at_ns` metadata for a queue message in place
3146/// without dropping any `_ttl_ms` already present (issue #723 — used by
3147/// NACK retry delay). `available_at_ns = None` clears the field. The
3148/// `fallback_ttl_ms` argument is consulted only when the existing
3149/// metadata row carries no `_ttl_ms` — keeps the per-message TTL in
3150/// sync with the queue default in the common case where no per-message
3151/// TTL was set explicitly. Tolerant of missing collections / entities:
3152/// returns `Ok(())` rather than failing the surrounding NACK if the
3153/// underlying message has gone away between resolution and metadata
3154/// update (a benign race; the next delivery cycle reflects truth).
3155pub(super) fn set_message_available_at_ns(
3156    store: &UnifiedStore,
3157    queue: &str,
3158    message_id: EntityId,
3159    available_at_ns: Option<u64>,
3160    fallback_ttl_ms: Option<u64>,
3161) -> RedDBResult<()> {
3162    let existing_ttl_ms = store
3163        .get_metadata(queue, message_id)
3164        .and_then(|md| match md.get("_ttl_ms")? {
3165            MetadataValue::Int(i) if *i >= 0 => Some(*i as u64),
3166            MetadataValue::Timestamp(t) => Some(*t),
3167            _ => None,
3168        })
3169        .or(fallback_ttl_ms);
3170    let metadata = queue_message_metadata(existing_ttl_ms, available_at_ns);
3171    match store.set_metadata(queue, message_id, metadata) {
3172        Ok(()) => Ok(()),
3173        Err(crate::storage::StoreError::CollectionNotFound(_)) => Ok(()),
3174        Err(err) => Err(RedDBError::Internal(err.to_string())),
3175    }
3176}
3177
3178/// Read the `_available_at_ns` metadata for a queue message. Returns
3179/// `None` for messages with no delay (the common case) or when the
3180/// metadata row is missing entirely.
3181pub(super) fn read_message_available_at_ns(
3182    store: &UnifiedStore,
3183    queue: &str,
3184    message_id: EntityId,
3185) -> Option<u64> {
3186    let md = store.get_metadata(queue, message_id)?;
3187    match md.get("_available_at_ns")? {
3188        MetadataValue::Timestamp(t) => Some(*t),
3189        MetadataValue::Int(i) if *i >= 0 => Some(*i as u64),
3190        _ => None,
3191    }
3192}
3193
3194/// Rough payload byte estimate for outbox watermark tracking.
3195fn estimate_payload_bytes(payload: &Value) -> u64 {
3196    match payload {
3197        Value::Json(v) => v.len() as u64,
3198        Value::Text(s) => s.len() as u64,
3199        _ => 64,
3200    }
3201}
3202
#[cfg(test)]
mod presence_integration_tests {
    use super::*;
    use crate::storage::queue::presence::{PresenceState, DEFAULT_PRESENCE_TTL_MS};
    use crate::{RedDBOptions, RedDBRuntime};

    /// Issue #742 acceptance: a `QUEUE READ` registers (and refreshes)
    /// consumer presence. The snapshot must contain exactly the
    /// `(queue, group, consumer)` triple the read named, in the `Active`
    /// state, and the active-consumer count must reflect it.
    #[test]
    fn queue_read_emits_consumer_presence_heartbeat() {
        let runtime = RedDBRuntime::with_options(RedDBOptions::in_memory()).unwrap();
        for statement in [
            "CREATE QUEUE tasks",
            "QUEUE GROUP CREATE tasks workers",
            "QUEUE PUSH tasks {'job':'a'}",
            "QUEUE READ tasks GROUP workers CONSUMER w1",
        ] {
            runtime.execute_query(statement).unwrap();
        }

        let snapshot = runtime.queue_consumer_presence_snapshot(DEFAULT_PRESENCE_TTL_MS);
        assert_eq!(snapshot.len(), 1, "exactly one heartbeat recorded");
        let heartbeat = &snapshot[0];
        assert_eq!(heartbeat.queue, "tasks");
        assert_eq!(heartbeat.group, "workers");
        assert_eq!(heartbeat.consumer, "w1");
        assert_eq!(heartbeat.state, PresenceState::Active);

        let active = runtime.queue_active_consumer_counts(DEFAULT_PRESENCE_TTL_MS);
        let key = ("tasks".to_string(), "workers".to_string());
        assert_eq!(active[&key], 1, "active count reflects the live consumer");
    }

    /// Issue #742 acceptance: liveness does not depend on deliveries. A
    /// read that returns nothing must still heartbeat, with zero leases.
    #[test]
    fn empty_queue_read_still_heartbeats() {
        let runtime = RedDBRuntime::with_options(RedDBOptions::in_memory()).unwrap();
        runtime.execute_query("CREATE QUEUE empty_q").unwrap();
        runtime
            .execute_query("QUEUE GROUP CREATE empty_q workers")
            .unwrap();
        // Deliberately no PUSH: the read below targets an empty queue.
        runtime
            .execute_query("QUEUE READ empty_q GROUP workers CONSUMER w1")
            .unwrap();

        let snapshot = runtime.queue_consumer_presence_snapshot(DEFAULT_PRESENCE_TTL_MS);
        assert_eq!(
            snapshot.len(),
            1,
            "empty read still registers consumer presence"
        );
        let only = &snapshot[0];
        assert_eq!(only.state, PresenceState::Active);
        assert_eq!(only.lease_count, 0, "no messages delivered → zero leases");
    }
}