Skip to main content

reddb_server/runtime/
impl_queue.rs

1//! Queue DDL and command execution
2
3use std::collections::{HashMap, HashSet};
4use std::sync::atomic::{AtomicU64, Ordering};
5use std::sync::Arc;
6
7use crate::runtime::audit_log::{AuditAuthSource, AuditEvent, AuditFieldEscaper, Outcome};
8use crate::runtime::impl_core::{
9    clear_current_auth_identity, clear_current_tenant, current_auth_identity, current_tenant,
10    set_current_auth_identity, set_current_tenant,
11};
12use crate::runtime::queue_telemetry::NackOutcomeLabel;
13use crate::storage::queue::QueueMode;
14use crate::storage::unified::entity::{QueueMessageData, RowData};
15use crate::storage::unified::{Metadata, MetadataValue, UnifiedStore};
16use crate::telemetry::operator_event::OperatorEvent;
17
18use super::*;
19
20use super::primary_queue_store::PrimaryQueueStore;
21use super::queue_lifecycle::{QueueLifecycle, RetirementOutcome};
22use crate::storage::queue::lifecycle::{
23    QueueSide as LcQueueSide, QueueStore as _, QueueStoreError, QueueTxn,
24};
25
26/// Build a [`QueueLifecycle`] backed by a fresh [`PrimaryQueueStore`] for
27/// the given `queue`, plus a `QueueTxn` bound to the live runtime
28/// connection. The store inside the lifecycle and the standalone
29/// [`PrimaryQueueStore`] returned for ack/nack lookups share the same
30/// underlying [`UnifiedStore`] — calls against either are observable on
31/// the other.
32pub(super) fn runtime_lifecycle(
33    runtime: &RedDBRuntime,
34    queue: &str,
35) -> (
36    QueueLifecycle<PrimaryQueueStore>,
37    PrimaryQueueStore,
38    QueueTxn,
39) {
40    let primary_for_lookup = PrimaryQueueStore::new(runtime.clone());
41    let primary_for_lifecycle = PrimaryQueueStore::new(runtime.clone());
42    let txn = primary_for_lifecycle.new_txn();
43    let cfg = primary_for_lifecycle.lifecycle_config(queue);
44    (
45        QueueLifecycle::new(primary_for_lifecycle, cfg)
46            .with_telemetry(Arc::clone(&runtime.inner.queue_telemetry)),
47        primary_for_lookup,
48        txn,
49    )
50}
51
/// Message carried by the `RedDBError::Query` returned when a parked
/// `QUEUE READ … WAIT` is interrupted because the wait registry was
/// cancelled (server shutdown) — slice C of PRD #718.
///
/// There is no dedicated `Cancelled` variant on the public error type, so
/// transports forward this text unchanged.
pub(crate) const QUEUE_READ_WAIT_CANCELLED: &str =
    "QUEUE READ WAIT cancelled — server shutting down";

/// `red.config` key holding the largest WAIT budget, in milliseconds, that
/// the runtime accepts (slice B of PRD #718). Requests above the cap are
/// rejected before any waiter is registered.
pub(crate) const QUEUE_MAX_WAIT_MS_CONFIG_KEY: &str = "red.config.queue.max_wait_ms";

/// Cap used when [`QUEUE_MAX_WAIT_MS_CONFIG_KEY`] is not set: 60 seconds,
/// expressed in milliseconds.
pub(crate) const QUEUE_MAX_WAIT_MS_DEFAULT: u64 = 60 * 1_000;
68
69/// Outcome of the async live queue-wait edge ([`RedDBRuntime::redwire_queue_wait_json`],
70/// issue #919). Carries the three non-error terminal states the RedWire
71/// session maps to distinct frames, so a timeout never aliases an empty
72/// delivery and a cancellation never aliases a timeout:
73///   - `Delivered` → one `QueueEventPush` per message.
74///   - `TimedOut`  → a distinct `QueueWaitTimeout` frame.
75///   - `Cancelled` → a `StreamError` with the cancellation code.
76///
77/// A genuine runtime failure (bad queue, read error) stays an `Err` on
78/// the surrounding `RedDBResult`.
79#[derive(Debug)]
80pub(crate) enum RedwireWaitOutcome {
81    Delivered(Vec<crate::serde_json::Value>),
82    TimedOut,
83    Cancelled,
84}
85
86/// Slice C of PRD #718 — scope key for the queue wait registry.
87/// Today every connection in the process shares a single namespace;
88/// the helper exists so multi-tenant scoping (e.g. tenant id) can be
89/// threaded through later without touching every call site.
90pub(super) fn queue_wait_scope() -> String {
91    crate::runtime::impl_core::current_tenant().unwrap_or_default()
92}
93
94fn with_redwire_wait_context<T>(
95    auth_identity: &Option<(String, crate::auth::Role)>,
96    tenant: &Option<String>,
97    f: impl FnOnce() -> T,
98) -> T {
99    let previous_auth = current_auth_identity();
100    let previous_tenant = current_tenant();
101    match tenant {
102        Some(t) => set_current_tenant(t.clone()),
103        None => clear_current_tenant(),
104    }
105    match auth_identity {
106        Some((username, role)) => set_current_auth_identity(username.clone(), *role),
107        None => clear_current_auth_identity(),
108    }
109    let result = f();
110    match previous_tenant {
111        Some(t) => set_current_tenant(t),
112        None => clear_current_tenant(),
113    }
114    match previous_auth {
115        Some((username, role)) => set_current_auth_identity(username, role),
116        None => clear_current_auth_identity(),
117    }
118    result
119}
120
121/// Convert a lifecycle `QueueSide` view into the AST flavour we accept
122/// from `QueueCommand` callers. Both enums are isomorphic but live in
123/// different modules.
124fn ast_side_to_lc(side: crate::storage::query::ast::QueueSide) -> LcQueueSide {
125    use crate::storage::query::ast::QueueSide as Ast;
126    match side {
127        Ast::Left => LcQueueSide::Left,
128        Ast::Right => LcQueueSide::Right,
129    }
130}
131
132/// Map a `QueueStoreError` (returned by lifecycle methods) onto the
133/// runtime-facing `RedDBError`.
134fn map_qse(err: QueueStoreError) -> RedDBError {
135    match err {
136        QueueStoreError::UnknownDelivery(id) => RedDBError::NotFound(format!(
137            "delivery_id '{id}' does not resolve to a live pending delivery"
138        )),
139        QueueStoreError::UnknownQueue(q) => RedDBError::NotFound(format!("queue '{q}' not found")),
140        QueueStoreError::ReplicaImmutable => {
141            RedDBError::Internal("replica QueueStore is immutable".to_string())
142        }
143    }
144}
145
// ---------------------------------------------------------------------------
// Outbox metrics (exposed via /metrics)
// ---------------------------------------------------------------------------

/// Number of event pushes that could not be placed on their target queue
/// (outbox over its hard limit, or target queue full) and were therefore
/// handed to DLQ routing.
pub static EVENTS_DRAIN_RETRIES_TOTAL: AtomicU64 = AtomicU64::new(0);

/// Number of events routed to a dead-letter queue.
pub static EVENTS_DLQ_TOTAL: AtomicU64 = AtomicU64::new(0);

/// Number of events successfully enqueued.
pub static EVENTS_ENQUEUED_TOTAL: AtomicU64 = AtomicU64::new(0);

/// Soft limit: a warning is emitted once the estimated outbox payload bytes
/// pass this value (1 GiB).
const OUTBOX_WARN_BYTES: u64 = 1 << 30;

/// Hard limit: once the estimated outbox exceeds this value (10 GiB), new
/// events are routed to the DLQ instead of their target queue.
const OUTBOX_MAX_BYTES: u64 = 10 << 30;

/// Approximate running total of payload bytes pushed to event queues. It is
/// not decremented when messages are consumed.
static OUTBOX_APPROX_BYTES: AtomicU64 = AtomicU64::new(0);

// NOTE(review): the four constants below are not used in the visible part of
// this file; the descriptions are inferred from their names — confirm.

/// Presumably the collection that stores per-queue metadata.
const QUEUE_META_COLLECTION: &str = "red_queue_meta";
/// Midpoint of the `u64` range; presumably the starting queue position so
/// pushes can grow in both directions.
const QUEUE_POSITION_CENTER: u64 = u64::MAX / 2;
/// Presumably the implicit consumer group for work-mode queues.
const WORK_DEFAULT_GROUP: &str = "_work_default";
/// Presumably the name prefix for implicit fanout consumer groups.
const FANOUT_GROUP_PREFIX: &str = "_fanout_";
173
174#[derive(Debug, Clone)]
175pub(super) struct QueueRuntimeConfig {
176    pub(super) mode: QueueMode,
177    pub(super) priority: bool,
178    pub(super) max_size: Option<usize>,
179    pub(super) ttl_ms: Option<u64>,
180    pub(super) dlq: Option<String>,
181    pub(super) max_attempts: u32,
182    pub(super) lock_deadline_ms: u64,
183    pub(super) in_flight_cap_per_group: u32,
184    /// Default retry delay (issue #723) applied to NACK-requeued
185    /// messages before they become re-deliverable. `None` keeps the
186    /// pre-#723 immediate-requeue behaviour. Overridden per-failure by
187    /// an authorized `NACK ... WITH DELAY <duration>`.
188    pub(super) retry_delay_ms: Option<u64>,
189}
190
191#[derive(Debug, Clone)]
192struct QueueGroupEntry {
193    entity_id: EntityId,
194    group: String,
195}
196
197#[derive(Debug, Clone)]
198pub(super) struct QueuePendingEntry {
199    pub(super) entity_id: EntityId,
200    group: String,
201    pub(super) message_id: EntityId,
202    consumer: String,
203    pub(super) delivered_at_ns: u64,
204    pub(super) delivery_count: u32,
205}
206
207#[derive(Debug, Clone)]
208pub(super) struct QueueAckEntry {
209    entity_id: EntityId,
210    group: String,
211    pub(super) message_id: EntityId,
212}
213
214#[derive(Debug, Clone)]
215pub(super) struct QueueMessageView {
216    pub(super) id: EntityId,
217    position: u64,
218    priority: i32,
219    pub(super) payload: Value,
220    attempts: u32,
221    pub(super) max_attempts: u32,
222    enqueued_at_ns: u64,
223    /// First-delivery instant for delayed messages (issue #722). `None`
224    /// means immediate availability. Sourced from the
225    /// `_available_at_ns` metadata field, populated on push.
226    pub(super) available_at_ns: Option<u64>,
227}
228
229impl QueueMessageView {
230    /// Whether this message is currently deliverable. Messages whose
231    /// `available_at_ns` lies in the future remain durable and
232    /// inspectable but are filtered out of `QUEUE READ` / `QUEUE POP`
233    /// projections.
234    pub(super) fn is_available_now(&self) -> bool {
235        match self.available_at_ns {
236            Some(at) => at <= now_ns(),
237            None => true,
238        }
239    }
240}
241
242impl RedDBRuntime {
243    /// Slice C of PRD #718 — non-blocking `group_read` plus optional
244    /// `WAIT <duration>` retry. When `wait_ms` is `None` this is the
245    /// pre-slice-C synchronous read. When `Some`, an immediate empty
246    /// projection parks the caller on the shared
247    /// [`crate::runtime::queue_wait_registry::QueueWaitRegistry`] and
248    /// retries on wake until the deadline. Timeout returns an empty
249    /// projection (zero records, no error). Shutdown cancellation
250    /// returns [`QUEUE_READ_WAIT_CANCELLED`].
251    pub(super) fn group_read_with_optional_wait(
252        &self,
253        queue: &str,
254        group: &str,
255        consumer: &str,
256        count: usize,
257        wait_ms: Option<u64>,
258    ) -> RedDBResult<Vec<crate::runtime::queue_lifecycle::DeliveredMessage>> {
259        let do_read =
260            |runtime: &RedDBRuntime| -> RedDBResult<Vec<crate::runtime::queue_lifecycle::DeliveredMessage>> {
261                let (lifecycle, _ps, txn) = runtime_lifecycle(runtime, queue);
262                lifecycle
263                    .group_read(&txn, queue, group, consumer, count)
264                    .map_err(map_qse)
265            };
266
267        let delivered = do_read(self)?;
268        let Some(wait_ms) = wait_ms else {
269            return Ok(delivered);
270        };
271        if !delivered.is_empty() {
272            return Ok(delivered);
273        }
274        // Empty under WAIT: park on the registry. WAIT 0 collapses to
275        // a single re-probe of the registry's current state — useful
276        // for tests but the timeout path returns immediately.
277        //
278        // Telemetry (slice D / PRD #718 / #729): we record exactly one
279        // `wait_started` increment at entry, and exactly one terminal
280        // outcome increment + histogram observation at exit, for the
281        // (scope, queue) labels. The histogram measures wall-clock
282        // started→resolved across all re-park iterations of this call.
283        let registry = self.queue_wait_registry();
284        let scope = queue_wait_scope();
285        let deadline = std::time::Instant::now() + std::time::Duration::from_millis(wait_ms);
286        let telemetry = self.queue_telemetry();
287        telemetry.record_wait_started(&scope, queue);
288        let wait_start = std::time::Instant::now();
289        let observe = |outcome: crate::runtime::queue_telemetry::WaitOutcomeLabel| {
290            let elapsed_ms = wait_start.elapsed().as_millis().min(u128::from(u64::MAX)) as u64;
291            telemetry.record_wait_outcome(&scope, queue, outcome, elapsed_ms);
292        };
293        loop {
294            // Snapshot BEFORE the re-probe so a notify that fires
295            // between the probe and the park bumps the generation and
296            // wait_until returns Woken without ever blocking.
297            let snapshot = registry.snapshot(&scope, queue);
298            let delivered = do_read(self)?;
299            if !delivered.is_empty() {
300                observe(crate::runtime::queue_telemetry::WaitOutcomeLabel::Woken);
301                return Ok(delivered);
302            }
303            if registry.is_cancelled() {
304                observe(crate::runtime::queue_telemetry::WaitOutcomeLabel::Cancelled);
305                return Err(RedDBError::Query(QUEUE_READ_WAIT_CANCELLED.to_string()));
306            }
307            if std::time::Instant::now() >= deadline {
308                observe(crate::runtime::queue_telemetry::WaitOutcomeLabel::Timeout);
309                return Ok(Vec::new());
310            }
311            // Issue #722: a delayed message becomes deliverable when its
312            // `available_at_ns` passes, but the registry only wakes on
313            // producer commits — a quiet queue with a future due-time
314            // would otherwise sit on the condvar until the user budget
315            // expired. Cap the park horizon at the soonest future
316            // `available_at_ns` so the next loop iteration probes the
317            // queue at-or-just-after the message becomes due. A
318            // `Timeout` from the capped park is not the final answer; we
319            // loop and re-probe before deciding the user budget is up.
320            let park_deadline = match earliest_future_available_at(&self.inner.db.store(), queue) {
321                Some(at_ns) => {
322                    let now_ns = now_ns();
323                    if at_ns <= now_ns {
324                        // Already due; re-probe immediately.
325                        deadline.min(std::time::Instant::now())
326                    } else {
327                        let wait_ns = at_ns - now_ns;
328                        let due_instant =
329                            std::time::Instant::now() + std::time::Duration::from_nanos(wait_ns);
330                        deadline.min(due_instant)
331                    }
332                }
333                None => deadline,
334            };
335            match registry.wait_until(&snapshot, park_deadline) {
336                crate::runtime::queue_wait_registry::WaitOutcome::Woken => continue,
337                crate::runtime::queue_wait_registry::WaitOutcome::Timeout => {
338                    // If this was the user-supplied deadline, give up;
339                    // otherwise loop and re-probe (a delayed message may
340                    // have just become due).
341                    if std::time::Instant::now() >= deadline {
342                        observe(crate::runtime::queue_telemetry::WaitOutcomeLabel::Timeout);
343                        return Ok(Vec::new());
344                    }
345                    continue;
346                }
347                crate::runtime::queue_wait_registry::WaitOutcome::Cancelled => {
348                    observe(crate::runtime::queue_telemetry::WaitOutcomeLabel::Cancelled);
349                    return Err(RedDBError::Query(QUEUE_READ_WAIT_CANCELLED.to_string()));
350                }
351            }
352        }
353    }
354
355    /// Issue #917 — async live-wait edge used by the RedWire session.
356    ///
357    /// Unlike [`group_read_with_optional_wait`](Self::group_read_with_optional_wait)
358    /// (the synchronous HTTP/condvar caller), this parks on the
359    /// registry's async wake head and never holds a blocking OS thread:
360    /// the awaiting tokio worker is released back to the runtime for the
361    /// wait duration. On every wake it re-probes the *normal* queue
362    /// delivery path (`group_read`), so a delivered message is genuinely
363    /// claimed, not merely observed. Returns the delivered messages
364    /// rendered as JSON values (so the transport edge stays free of
365    /// runtime queue types); [`RedwireWaitOutcome::TimedOut`] means the
366    /// deadline elapsed without a delivery (issue #919 surfaces this as
367    /// a distinct timeout frame rather than an empty push), and a
368    /// cancellation surfaces as [`RedwireWaitOutcome::Cancelled`] — the
369    /// async analogue of the sync path's [`QUEUE_READ_WAIT_CANCELLED`].
370    /// A genuine runtime failure stays an `Err`.
371    pub(crate) async fn redwire_queue_wait_json(
372        &self,
373        queue: &str,
374        group: Option<&str>,
375        consumer: &str,
376        count: usize,
377        wait_ms: u64,
378        auth_identity: Option<(String, crate::auth::Role)>,
379        tenant: Option<String>,
380    ) -> RedDBResult<RedwireWaitOutcome> {
381        let group_owned: RedDBResult<String> =
382            with_redwire_wait_context(&auth_identity, &tenant, || {
383                let expr = crate::storage::query::ast::QueryExpr::QueueCommand(
384                    crate::storage::query::ast::QueueCommand::GroupRead {
385                        queue: queue.to_string(),
386                        group: group.map(str::to_string),
387                        consumer: consumer.to_string(),
388                        count,
389                        wait_ms: Some(wait_ms),
390                    },
391                );
392                self.check_query_privilege(&expr)
393                    .map_err(RedDBError::Query)?;
394                let store = self.inner.db.store();
395                ensure_queue_exists(store.as_ref(), queue)?;
396                let config = load_queue_config(store.as_ref(), queue);
397                resolve_read_group(store.as_ref(), queue, group, consumer, &config)
398            });
399        let group_owned = group_owned?;
400        let group_ref = group_owned.as_str();
401
402        let do_read =
403            |runtime: &RedDBRuntime| -> RedDBResult<Vec<crate::runtime::queue_lifecycle::DeliveredMessage>> {
404                with_redwire_wait_context(&auth_identity, &tenant, || {
405                    let (lifecycle, _ps, txn) = runtime_lifecycle(runtime, queue);
406                    lifecycle
407                        .group_read(&txn, queue, group_ref, consumer, count)
408                        .map_err(map_qse)
409                })
410            };
411
412        let render = |delivered: Vec<crate::runtime::queue_lifecycle::DeliveredMessage>| {
413            RedwireWaitOutcome::Delivered(
414                delivered.into_iter().map(delivered_message_json).collect(),
415            )
416        };
417
418        // Fast path: a message is already deliverable at open time.
419        let delivered = do_read(self)?;
420        if !delivered.is_empty() {
421            return Ok(render(delivered));
422        }
423
424        let registry = self.queue_wait_registry();
425        let scope = with_redwire_wait_context(&auth_identity, &tenant, queue_wait_scope);
426        let deadline = std::time::Instant::now() + std::time::Duration::from_millis(wait_ms);
427        let telemetry = self.queue_telemetry();
428        telemetry.record_wait_started(&scope, queue);
429        let wait_start = std::time::Instant::now();
430        tracing::debug!(
431            target: "reddb::redwire::queue_wait",
432            queue,
433            group = group_ref,
434            consumer,
435            count,
436            wait_ms,
437            scope = scope.as_str(),
438            "redwire queue wait parked"
439        );
440        let observe = |outcome: crate::runtime::queue_telemetry::WaitOutcomeLabel| {
441            let elapsed_ms = wait_start.elapsed().as_millis().min(u128::from(u64::MAX)) as u64;
442            telemetry.record_wait_outcome(&scope, queue, outcome, elapsed_ms);
443            tracing::debug!(
444                target: "reddb::redwire::queue_wait",
445                queue,
446                group = group_ref,
447                consumer,
448                count,
449                wait_ms,
450                scope = scope.as_str(),
451                outcome = outcome.as_str(),
452                duration_ms = elapsed_ms,
453                "redwire queue wait resolved"
454            );
455        };
456        loop {
457            // Register the async waiter (snapshot the generation) BEFORE
458            // the re-probe so a notify landing between probe and park is
459            // observed as a generation move rather than a lost wake.
460            let waiter = registry.async_waiter(&scope, queue);
461            let delivered = do_read(self)?;
462            if !delivered.is_empty() {
463                observe(crate::runtime::queue_telemetry::WaitOutcomeLabel::Woken);
464                return Ok(render(delivered));
465            }
466            if registry.is_cancelled() {
467                observe(crate::runtime::queue_telemetry::WaitOutcomeLabel::Cancelled);
468                return Ok(RedwireWaitOutcome::Cancelled);
469            }
470            if std::time::Instant::now() >= deadline {
471                observe(crate::runtime::queue_telemetry::WaitOutcomeLabel::Timeout);
472                return Ok(RedwireWaitOutcome::TimedOut);
473            }
474            let park_deadline = match earliest_future_available_at(&self.inner.db.store(), queue) {
475                Some(at_ns) => {
476                    let now_ns = now_ns();
477                    if at_ns <= now_ns {
478                        deadline.min(std::time::Instant::now())
479                    } else {
480                        let wait_ns = at_ns - now_ns;
481                        let due_instant =
482                            std::time::Instant::now() + std::time::Duration::from_nanos(wait_ns);
483                        deadline.min(due_instant)
484                    }
485                }
486                None => deadline,
487            };
488            // The async waiter (and its `Arc<Slot>` clone) is a local
489            // dropped on every return below, so an expired or cancelled
490            // wait releases its registry slot reference and frees the
491            // tokio worker the moment this future resolves (AC #4).
492            match registry.wait_until_async(&waiter, park_deadline).await {
493                crate::runtime::queue_wait_registry::WaitOutcome::Woken => continue,
494                crate::runtime::queue_wait_registry::WaitOutcome::Timeout => {
495                    if std::time::Instant::now() >= deadline {
496                        observe(crate::runtime::queue_telemetry::WaitOutcomeLabel::Timeout);
497                        return Ok(RedwireWaitOutcome::TimedOut);
498                    }
499                    continue;
500                }
501                crate::runtime::queue_wait_registry::WaitOutcome::Cancelled => {
502                    observe(crate::runtime::queue_telemetry::WaitOutcomeLabel::Cancelled);
503                    return Ok(RedwireWaitOutcome::Cancelled);
504                }
505            }
506        }
507    }
508
509    /// Reject a live queue-wait open whose requested budget exceeds the
510    /// server's maximum wait cap (issue #919), mirroring the SQL
511    /// `QUEUE READ … WAIT` cap (slice B of PRD #718). Returns the
512    /// operator-actionable message (naming the `red.config` key and the
513    /// active cap) when `wait_ms` is over the cap, or `Ok(())` to
514    /// proceed. The transport calls this *before* spawning the wait
515    /// task, so an over-cap request is refused with an explicit error
516    /// and never parks — not silently shortened (AC #3).
517    pub(crate) fn redwire_queue_wait_cap_check(&self, wait_ms: u64) -> Result<(), String> {
518        let cap = self.config_u64(QUEUE_MAX_WAIT_MS_CONFIG_KEY, QUEUE_MAX_WAIT_MS_DEFAULT);
519        if wait_ms > cap {
520            Err(format!(
521                "queue-wait WAIT {wait_ms}ms exceeds server cap {QUEUE_MAX_WAIT_MS_CONFIG_KEY} = {cap}ms"
522            ))
523        } else {
524            Ok(())
525        }
526    }
527
528    pub(crate) fn enqueue_event_payload(
529        &self,
530        queue: &str,
531        payload: Value,
532    ) -> RedDBResult<EntityId> {
533        let store = self.inner.db.store();
534        // Auto-create the queue if it does not exist yet.
535        if store.get_collection(queue).is_none() {
536            crate::runtime::impl_ddl::ensure_event_target_queue_pub(self, queue)?;
537        }
538
539        // Estimate payload bytes for outbox watermark checks.
540        let payload_bytes = estimate_payload_bytes(&payload);
541        let outbox_bytes = OUTBOX_APPROX_BYTES.fetch_add(payload_bytes, Ordering::Relaxed);
542
543        // Hard limit: route directly to DLQ without even trying.
544        if outbox_bytes > OUTBOX_MAX_BYTES {
545            OUTBOX_APPROX_BYTES.fetch_sub(payload_bytes, Ordering::Relaxed);
546            EVENTS_DRAIN_RETRIES_TOTAL.fetch_add(1, Ordering::Relaxed);
547            return self.route_event_to_outbox_dlq(queue, payload, "outbox_max_bytes_exceeded");
548        }
549
550        // Soft limit: warn once per crossing.
551        if outbox_bytes > OUTBOX_WARN_BYTES && outbox_bytes - payload_bytes <= OUTBOX_WARN_BYTES {
552            tracing::warn!(
553                outbox_bytes,
554                warn_threshold = OUTBOX_WARN_BYTES,
555                "event outbox approaching capacity warning threshold"
556            );
557            crate::telemetry::operator_event::OperatorEvent::OutboxDlqActivated {
558                queue: queue.to_string(),
559                dlq: format!("{queue}_outbox_dlq"),
560                reason: "outbox_warn_bytes_exceeded".to_string(),
561            }
562            .emit_global();
563        }
564
565        let config = load_queue_config(store.as_ref(), queue);
566
567        // If the target queue has a max_size and is full, route to DLQ.
568        if let Some(max_size) = config.max_size {
569            let current_len = load_queue_message_views(store.as_ref(), queue)
570                .unwrap_or_default()
571                .len();
572            if current_len >= max_size {
573                OUTBOX_APPROX_BYTES.fetch_sub(payload_bytes, Ordering::Relaxed);
574                EVENTS_DRAIN_RETRIES_TOTAL.fetch_add(1, Ordering::Relaxed);
575                return self.route_event_to_outbox_dlq(queue, payload, "queue_full");
576            }
577            // Warn at 80% capacity.
578            if current_len * 10 >= max_size * 8 {
579                tracing::warn!(
580                    queue = %queue,
581                    size = current_len,
582                    max = max_size,
583                    "event target queue near capacity"
584                );
585            }
586        }
587
588        let id = self.enqueue_event_payload_raw(store.as_ref(), queue, &config, payload)?;
589        EVENTS_ENQUEUED_TOTAL.fetch_add(1, Ordering::Relaxed);
590        Ok(id)
591    }
592
593    /// Route a failed event to `<queue>_outbox_dlq`, auto-creating it if needed.
594    fn route_event_to_outbox_dlq(
595        &self,
596        queue: &str,
597        payload: Value,
598        reason: &str,
599    ) -> RedDBResult<EntityId> {
600        let dlq_name = format!("{queue}_outbox_dlq");
601        EVENTS_DLQ_TOTAL.fetch_add(1, Ordering::Relaxed);
602
603        crate::telemetry::operator_event::OperatorEvent::OutboxDlqActivated {
604            queue: queue.to_string(),
605            dlq: dlq_name.clone(),
606            reason: reason.to_string(),
607        }
608        .emit_global();
609
610        let store = self.inner.db.store();
611        if store.get_collection(&dlq_name).is_none() {
612            crate::runtime::impl_ddl::ensure_event_target_queue_pub(self, &dlq_name)?;
613        }
614        let dlq_config = load_queue_config(store.as_ref(), &dlq_name);
615        let id = self.enqueue_event_payload_raw(store.as_ref(), &dlq_name, &dlq_config, payload)?;
616        EVENTS_ENQUEUED_TOTAL.fetch_add(1, Ordering::Relaxed);
617        Ok(id)
618    }
619
620    /// Low-level event message insert — no size checks, no DLQ routing.
621    fn enqueue_event_payload_raw(
622        &self,
623        store: &UnifiedStore,
624        queue: &str,
625        config: &QueueRuntimeConfig,
626        payload: Value,
627    ) -> RedDBResult<EntityId> {
628        let position = next_queue_position(store, queue, QueueSide::Right)?;
629        let mut entity = UnifiedEntity::new(
630            EntityId::new(0),
631            EntityKind::QueueMessage {
632                queue: queue.to_string(),
633                position,
634            },
635            EntityData::QueueMessage(QueueMessageData {
636                payload,
637                priority: None,
638                enqueued_at_ns: now_ns(),
639                attempts: 0,
640                max_attempts: config.max_attempts,
641                acked: false,
642            }),
643        );
644        if let Some(xid) = self.current_xid() {
645            entity.set_xmin(xid);
646        }
647        let id = store
648            .insert_auto(queue, entity)
649            .map_err(|err| RedDBError::Internal(err.to_string()))?;
650        if let Some(ttl_ms) = config.ttl_ms {
651            store
652                .set_metadata(queue, id, queue_message_ttl_metadata(ttl_ms))
653                .map_err(|err| RedDBError::Internal(err.to_string()))?;
654        }
655        self.invalidate_result_cache_for_table(queue);
656        Ok(id)
657    }
658
659    pub fn execute_create_queue(
660        &self,
661        raw_query: &str,
662        query: &CreateQueueQuery,
663    ) -> RedDBResult<RuntimeQueryResult> {
664        self.check_write(crate::runtime::write_gate::WriteKind::Ddl)?;
665        if query.dlq.as_deref() == Some(query.name.as_str()) {
666            return Err(RedDBError::Query(
667                "dead-letter queue must be different from the source queue".to_string(),
668            ));
669        }
670
671        let store = self.inner.db.store();
672        let exists = store.get_collection(&query.name).is_some();
673        if exists {
674            if query.if_not_exists {
675                return Ok(RuntimeQueryResult::ok_message(
676                    raw_query.to_string(),
677                    &format!("queue '{}' already exists", query.name),
678                    "create",
679                ));
680            }
681            return Err(RedDBError::Query(format!(
682                "queue '{}' already exists",
683                query.name
684            )));
685        }
686
687        store
688            .create_collection(&query.name)
689            .map_err(|err| RedDBError::Internal(err.to_string()))?;
690        if let Some(ttl_ms) = query.ttl_ms {
691            self.inner
692                .db
693                .set_collection_default_ttl_ms(&query.name, ttl_ms);
694        }
695        self.inner
696            .db
697            .save_collection_contract(queue_collection_contract(
698                &query.name,
699                query.priority,
700                query.ttl_ms,
701            ))
702            .map_err(|err| RedDBError::Internal(err.to_string()))?;
703        save_queue_config(
704            store.as_ref(),
705            &query.name,
706            &QueueRuntimeConfig {
707                mode: query.mode,
708                priority: query.priority,
709                max_size: query.max_size,
710                ttl_ms: query.ttl_ms,
711                dlq: query.dlq.clone(),
712                max_attempts: query.max_attempts,
713                lock_deadline_ms: query.lock_deadline_ms,
714                in_flight_cap_per_group: query.in_flight_cap_per_group,
715                retry_delay_ms: query.retry_delay_ms,
716            },
717        )?;
718
719        if let Some(dlq) = &query.dlq {
720            if store.get_collection(dlq).is_none() {
721                store
722                    .create_collection(dlq)
723                    .map_err(|err| RedDBError::Internal(err.to_string()))?;
724                self.inner
725                    .db
726                    .save_collection_contract(queue_collection_contract(dlq, false, None))
727                    .map_err(|err| RedDBError::Internal(err.to_string()))?;
728            }
729        }
730
731        self.invalidate_result_cache();
732        self.inner
733            .db
734            .persist_metadata()
735            .map_err(|err| RedDBError::Internal(err.to_string()))?;
736        // Issue #120 — feed the queue into the schema-vocabulary so
737        // AskPipeline (#121) can resolve queue references. Queues
738        // have an opaque payload column, so we expose `payload` and
739        // (when configured) the DLQ partner as type-tag context.
740        let mut type_tags = Vec::new();
741        if let Some(dlq) = &query.dlq {
742            type_tags.push(format!("dlq:{}", dlq));
743        }
744        self.schema_vocabulary_apply(
745            crate::runtime::schema_vocabulary::DdlEvent::CreateCollection {
746                collection: query.name.clone(),
747                columns: vec!["payload".to_string()],
748                type_tags,
749                description: None,
750            },
751        );
752
753        let mut msg = format!("queue '{}' created", query.name);
754        msg.push_str(&format!(" (mode={})", query.mode.as_str()));
755        if query.priority {
756            msg.push_str(" (priority)");
757        }
758        if let Some(max_size) = query.max_size {
759            msg.push_str(&format!(" (max_size={max_size})"));
760        }
761        if let Some(ttl_ms) = query.ttl_ms {
762            msg.push_str(&format!(" (ttl={ttl_ms}ms)"));
763        }
764        if let Some(dlq) = &query.dlq {
765            msg.push_str(&format!(
766                " (dlq={dlq}, max_attempts={})",
767                query.max_attempts
768            ));
769        }
770
771        Ok(RuntimeQueryResult::ok_message(
772            raw_query.to_string(),
773            &msg,
774            "create",
775        ))
776    }
777
778    pub fn execute_alter_queue(
779        &self,
780        raw_query: &str,
781        query: &AlterQueueQuery,
782    ) -> RedDBResult<RuntimeQueryResult> {
783        self.check_write(crate::runtime::write_gate::WriteKind::Ddl)?;
784        let store = self.inner.db.store();
785        ensure_queue_exists(store.as_ref(), &query.name)?;
786
787        let mut config = load_queue_config(store.as_ref(), &query.name);
788        let mut summary: Vec<String> = Vec::new();
789
790        if let Some(new_mode) = query.mode {
791            let pending =
792                load_pending_entries(store.as_ref(), &query.name, None, None).unwrap_or_default();
793            if !pending.is_empty() {
794                tracing::warn!(
795                    queue = %query.name,
796                    pending_count = pending.len(),
797                    new_mode = %new_mode.as_str(),
798                    "ALTER QUEUE SET MODE: {} in-flight messages will drain with old mode; \
799                     new reads use {}",
800                    pending.len(),
801                    new_mode.as_str(),
802                );
803            }
804            config.mode = new_mode;
805            summary.push(format!("mode={}", new_mode.as_str()));
806        }
807        if let Some(max_attempts) = query.max_attempts {
808            config.max_attempts = max_attempts;
809            summary.push(format!("max_attempts={max_attempts}"));
810        }
811        if let Some(lock_deadline_ms) = query.lock_deadline_ms {
812            config.lock_deadline_ms = lock_deadline_ms;
813            summary.push(format!("lock_deadline_ms={lock_deadline_ms}"));
814        }
815        if let Some(in_flight_cap) = query.in_flight_cap_per_group {
816            config.in_flight_cap_per_group = in_flight_cap;
817            summary.push(format!("in_flight_cap_per_group={in_flight_cap}"));
818        }
819        if let Some(dlq) = &query.dlq {
820            if dlq == &query.name {
821                return Err(RedDBError::Query(
822                    "dead-letter queue must be different from the source queue".to_string(),
823                ));
824            }
825            config.dlq = Some(dlq.clone());
826            summary.push(format!("dlq={dlq}"));
827        }
828        if let Some(retry_delay_ms) = query.retry_delay_ms {
829            config.retry_delay_ms = if retry_delay_ms == 0 {
830                None
831            } else {
832                Some(retry_delay_ms)
833            };
834            summary.push(format!("retry_delay_ms={retry_delay_ms}"));
835        }
836
837        save_queue_config(store.as_ref(), &query.name, &config)?;
838
839        self.invalidate_result_cache();
840        self.inner
841            .db
842            .persist_metadata()
843            .map_err(|err| RedDBError::Internal(err.to_string()))?;
844
845        Ok(RuntimeQueryResult::ok_message(
846            raw_query.to_string(),
847            &format!("queue '{}' altered: {}", query.name, summary.join(", ")),
848            "alter",
849        ))
850    }
851
852    pub fn execute_drop_queue(
853        &self,
854        raw_query: &str,
855        query: &DropQueueQuery,
856    ) -> RedDBResult<RuntimeQueryResult> {
857        self.check_write(crate::runtime::write_gate::WriteKind::Ddl)?;
858        let store = self.inner.db.store();
859        if super::impl_ddl::is_system_schema_name(&query.name) {
860            return Err(RedDBError::Query("system schema is read-only".to_string()));
861        }
862        if store.get_collection(&query.name).is_none() {
863            if query.if_exists {
864                return Ok(RuntimeQueryResult::ok_message(
865                    raw_query.to_string(),
866                    &format!("queue '{}' does not exist", query.name),
867                    "drop",
868                ));
869            }
870            return Err(RedDBError::NotFound(format!(
871                "queue '{}' not found",
872                query.name
873            )));
874        }
875        let actual = crate::runtime::ddl::polymorphic_resolver::resolve(
876            &query.name,
877            &self.inner.db.catalog_model_snapshot(),
878        )?;
879        crate::runtime::ddl::polymorphic_resolver::ensure_model_match(
880            crate::catalog::CollectionModel::Queue,
881            actual,
882        )?;
883
884        store
885            .drop_collection(&query.name)
886            .map_err(|err| RedDBError::Internal(err.to_string()))?;
887        self.inner.db.clear_collection_default_ttl_ms(&query.name);
888        self.inner
889            .db
890            .remove_collection_contract(&query.name)
891            .map_err(|err| RedDBError::Internal(err.to_string()))?;
892        remove_queue_metadata(store.as_ref(), &query.name);
893        self.invalidate_result_cache();
894        self.inner
895            .db
896            .persist_metadata()
897            .map_err(|err| RedDBError::Internal(err.to_string()))?;
898        // Issue #120 — invalidate the schema-vocabulary entry.
899        self.schema_vocabulary_apply(
900            crate::runtime::schema_vocabulary::DdlEvent::DropCollection {
901                collection: query.name.clone(),
902            },
903        );
904
905        Ok(RuntimeQueryResult::ok_message(
906            raw_query.to_string(),
907            &format!("queue '{}' dropped", query.name),
908            "drop",
909        ))
910    }
911
912    pub fn execute_queue_command(
913        &self,
914        raw_query: &str,
915        cmd: &QueueCommand,
916    ) -> RedDBResult<RuntimeQueryResult> {
917        self.check_write(crate::runtime::write_gate::WriteKind::Dml)?;
918        match cmd {
919            QueueCommand::Push {
920                queue,
921                value,
922                side,
923                priority,
924                available,
925            } => {
926                let store = self.inner.db.store();
927                ensure_queue_exists(store.as_ref(), queue)?;
928                let config = load_queue_config(store.as_ref(), queue);
929                if priority.is_some() && !config.priority {
930                    return Err(RedDBError::Query(format!(
931                        "queue '{}' is not a priority queue",
932                        queue
933                    )));
934                }
935                if let Some(max_size) = config.max_size {
936                    let current_len =
937                        load_queue_message_views_with_runtime(Some(self), store.as_ref(), queue)?
938                            .len();
939                    if current_len >= max_size {
940                        return Err(RedDBError::Query(format!(
941                            "queue '{}' is full (max_size={max_size})",
942                            queue
943                        )));
944                    }
945                }
946
947                let position = next_queue_position(store.as_ref(), queue, *side)?;
948                let mut entity = UnifiedEntity::new(
949                    EntityId::new(0),
950                    EntityKind::QueueMessage {
951                        queue: queue.clone(),
952                        position,
953                    },
954                    EntityData::QueueMessage(QueueMessageData {
955                        payload: value.clone(),
956                        priority: if config.priority { *priority } else { None },
957                        enqueued_at_ns: now_ns(),
958                        attempts: 0,
959                        max_attempts: config.max_attempts,
960                        acked: false,
961                    }),
962                );
963                // Phase 1.1 MVCC universal: stamp xmin so other
964                // connections don't see this message until COMMIT.
965                if let Some(xid) = self.current_xid() {
966                    entity.set_xmin(xid);
967                }
968                let id = store
969                    .insert_auto(queue, entity)
970                    .map_err(|err| RedDBError::Internal(err.to_string()))?;
971                // Resolve per-message availability (issue #722): DELAY is
972                // relative to the push instant, AVAILABLE AT carries an
973                // absolute unix-ms. Both collapse to a unix-ns timestamp
974                // delivery paths compare against. `None` means immediate.
975                let available_at_ns = available.map(|a| match a {
976                    crate::storage::query::ast::QueueAvailability::DelayMs(ms) => {
977                        now_ns().saturating_add(ms.saturating_mul(1_000_000))
978                    }
979                    crate::storage::query::ast::QueueAvailability::AtUnixMs(ms) => {
980                        ms.saturating_mul(1_000_000)
981                    }
982                });
983                if config.ttl_ms.is_some() || available_at_ns.is_some() {
984                    store
985                        .set_metadata(
986                            queue,
987                            id,
988                            queue_message_metadata(config.ttl_ms, available_at_ns),
989                        )
990                        .map_err(|err| RedDBError::Internal(err.to_string()))?;
991                }
992                // Slice C of PRD #718 — wake `QUEUE READ … WAIT` waiters.
993                // Under autocommit this fires immediately; inside a txn
994                // the wake is buffered and replayed on COMMIT (rollback
995                // discards it so rolled-back enqueues do not deliver).
996                self.record_queue_wake(&queue_wait_scope(), queue);
997                self.invalidate_result_cache();
998
999                let mut result = UnifiedResult::with_columns(vec![
1000                    "message_id".into(),
1001                    "side".into(),
1002                    "queue".into(),
1003                ]);
1004                let mut record = UnifiedRecord::new();
1005                record.set("message_id", Value::text(message_id_string(id)));
1006                record.set(
1007                    "side",
1008                    Value::text(match side {
1009                        QueueSide::Left => "left".to_string(),
1010                        QueueSide::Right => "right".to_string(),
1011                    }),
1012                );
1013                record.set("queue", Value::text(queue.clone()));
1014                result.push(record);
1015
1016                Ok(RuntimeQueryResult {
1017                    query: raw_query.to_string(),
1018                    mode: QueryMode::Sql,
1019                    statement: "queue_push",
1020                    engine: "runtime-queue",
1021                    result,
1022                    affected_rows: 1,
1023                    statement_type: "insert",
1024                    bookmark: None,
1025                })
1026            }
1027            QueueCommand::Pop { queue, side, count } => {
1028                let store = self.inner.db.store();
1029                ensure_queue_exists(store.as_ref(), queue)?;
1030                let (lifecycle, _ps, txn) = runtime_lifecycle(self, queue);
1031                let popped = lifecycle
1032                    .pop(queue, ast_side_to_lc(*side), *count, &txn)
1033                    .map_err(map_qse)?;
1034
1035                let mut result =
1036                    UnifiedResult::with_columns(vec!["message_id".into(), "payload".into()]);
1037                for (message_id, payload) in &popped {
1038                    let mut record = UnifiedRecord::new();
1039                    record.set(
1040                        "message_id",
1041                        Value::text(message_id_string(EntityId::new(*message_id))),
1042                    );
1043                    record.set("payload", payload.clone());
1044                    result.push(record);
1045                }
1046                let popped_count = popped.len() as u64;
1047                if popped_count > 0 {
1048                    self.invalidate_result_cache();
1049                }
1050
1051                Ok(RuntimeQueryResult {
1052                    query: raw_query.to_string(),
1053                    mode: QueryMode::Sql,
1054                    statement: "queue_pop",
1055                    engine: "runtime-queue",
1056                    result,
1057                    affected_rows: popped_count,
1058                    statement_type: "delete",
1059                    bookmark: None,
1060                })
1061            }
1062            QueueCommand::Peek { queue, count } => {
1063                let store = self.inner.db.store();
1064                ensure_queue_exists(store.as_ref(), queue)?;
1065                let (lifecycle, _ps, txn) = runtime_lifecycle(self, queue);
1066                let messages = lifecycle.peek(queue, *count, &txn);
1067
1068                let mut result =
1069                    UnifiedResult::with_columns(vec!["message_id".into(), "payload".into()]);
1070                for message in messages {
1071                    let mut record = UnifiedRecord::new();
1072                    record.set(
1073                        "message_id",
1074                        Value::text(message_id_string(EntityId::new(message.message_id))),
1075                    );
1076                    record.set("payload", message.payload);
1077                    result.push(record);
1078                }
1079
1080                Ok(RuntimeQueryResult {
1081                    query: raw_query.to_string(),
1082                    mode: QueryMode::Sql,
1083                    statement: "queue_peek",
1084                    engine: "runtime-queue",
1085                    result,
1086                    affected_rows: 0,
1087                    statement_type: "select",
1088                    bookmark: None,
1089                })
1090            }
1091            QueueCommand::Len { queue } => {
1092                let store = self.inner.db.store();
1093                ensure_queue_exists(store.as_ref(), queue)?;
1094                let count =
1095                    load_queue_message_views_with_runtime(Some(self), store.as_ref(), queue)?.len()
1096                        as u64;
1097                let mut result = UnifiedResult::with_columns(vec!["len".into()]);
1098                let mut record = UnifiedRecord::new();
1099                record.set("len", Value::UnsignedInteger(count));
1100                result.push(record);
1101
1102                Ok(RuntimeQueryResult {
1103                    query: raw_query.to_string(),
1104                    mode: QueryMode::Sql,
1105                    statement: "queue_len",
1106                    engine: "runtime-queue",
1107                    result,
1108                    affected_rows: 0,
1109                    statement_type: "select",
1110                    bookmark: None,
1111                })
1112            }
1113            QueueCommand::Purge { queue } => {
1114                let store = self.inner.db.store();
1115                ensure_queue_exists(store.as_ref(), queue)?;
1116                let (lifecycle, _ps, txn) = runtime_lifecycle(self, queue);
1117                let count = lifecycle.purge(queue, &txn).map_err(map_qse)?;
1118                if count > 0 {
1119                    self.invalidate_result_cache();
1120                }
1121
1122                Ok(RuntimeQueryResult::ok_message(
1123                    raw_query.to_string(),
1124                    &format!("{count} messages purged from queue '{queue}'"),
1125                    "delete",
1126                ))
1127            }
1128            QueueCommand::GroupCreate { queue, group } => {
1129                let store = self.inner.db.store();
1130                ensure_queue_exists(store.as_ref(), queue)?;
1131                if queue_group_exists(store.as_ref(), queue, group)? {
1132                    return Ok(RuntimeQueryResult::ok_message(
1133                        raw_query.to_string(),
1134                        &format!(
1135                            "consumer group '{}' already exists on queue '{}'",
1136                            group, queue
1137                        ),
1138                        "create",
1139                    ));
1140                }
1141                save_queue_group(store.as_ref(), queue, group)?;
1142                self.invalidate_result_cache();
1143
1144                Ok(RuntimeQueryResult::ok_message(
1145                    raw_query.to_string(),
1146                    &format!("consumer group '{}' created on queue '{}'", group, queue),
1147                    "create",
1148                ))
1149            }
1150            QueueCommand::GroupRead {
1151                queue,
1152                group,
1153                consumer,
1154                count,
1155                wait_ms,
1156            } => {
1157                let store = self.inner.db.store();
1158                ensure_queue_exists(store.as_ref(), queue)?;
1159                // Slice B of PRD #718: reject `WAIT` issued inside an
1160                // explicit transaction, and reject `WAIT > cap` before
1161                // any waiter is registered. Both checks fire before the
1162                // lifecycle is touched so a refused statement leaves
1163                // no side effects (no group auto-create, no parking).
1164                if let Some(ms) = *wait_ms {
1165                    if self.current_xid().is_some() {
1166                        return Err(RedDBError::Query(
1167                            "QUEUE READ … WAIT is autocommit-only: refusing to park inside an explicit transaction (BEGIN/COMMIT)"
1168                                .to_string(),
1169                        ));
1170                    }
1171                    let cap =
1172                        self.config_u64(QUEUE_MAX_WAIT_MS_CONFIG_KEY, QUEUE_MAX_WAIT_MS_DEFAULT);
1173                    if ms > cap {
1174                        return Err(RedDBError::Query(format!(
1175                            "QUEUE READ … WAIT {ms}ms exceeds server cap {QUEUE_MAX_WAIT_MS_CONFIG_KEY} = {cap}ms"
1176                        )));
1177                    }
1178                }
1179                // Resolve the consumer group up-front so the lifecycle
1180                // sees the same auto-created `_work_default` / fanout
1181                // group the legacy `read_messages` would have minted.
1182                let config = load_queue_config(store.as_ref(), queue);
1183                let group_owned =
1184                    resolve_read_group(store.as_ref(), queue, group.as_deref(), consumer, &config)?;
1185                let group_ref = group_owned.as_str();
1186                let delivered = self
1187                    .group_read_with_optional_wait(queue, group_ref, consumer, *count, *wait_ms)?;
1188
1189                // Issue #742 — record consumer presence on every read,
1190                // including empty returns. Heartbeat-driven aliveness
1191                // is the contract; pending deliveries don't define it.
1192                {
1193                    let lease_count = u32::try_from(delivered.len()).unwrap_or(u32::MAX);
1194                    let now_ns = std::time::SystemTime::now()
1195                        .duration_since(std::time::UNIX_EPOCH)
1196                        .map(|d| d.as_nanos() as u64)
1197                        .unwrap_or(0);
1198                    self.queue_presence().heartbeat(
1199                        queue,
1200                        group_ref,
1201                        consumer,
1202                        lease_count,
1203                        now_ns,
1204                    );
1205                }
1206
1207                let mut result = UnifiedResult::with_columns(vec![
1208                    "message_id".into(),
1209                    "payload".into(),
1210                    "consumer".into(),
1211                    "delivery_count".into(),
1212                    "attempts".into(),
1213                ]);
1214
1215                for message in delivered {
1216                    let mut record = UnifiedRecord::new();
1217                    record.set(
1218                        "message_id",
1219                        Value::text(message_id_string(EntityId::new(message.message_id))),
1220                    );
1221                    record.set("payload", message.payload);
1222                    record.set("consumer", Value::text(message.consumer));
1223                    record.set(
1224                        "delivery_count",
1225                        Value::UnsignedInteger(u64::from(message.delivery_count)),
1226                    );
1227                    record.set(
1228                        "attempts",
1229                        Value::UnsignedInteger(u64::from(message.delivery_count)),
1230                    );
1231                    result.push(record);
1232                }
1233                if !result.records.is_empty() {
1234                    self.invalidate_result_cache();
1235                }
1236
1237                Ok(RuntimeQueryResult {
1238                    query: raw_query.to_string(),
1239                    mode: QueryMode::Sql,
1240                    statement: "queue_group_read",
1241                    engine: "runtime-queue",
1242                    result,
1243                    affected_rows: 0,
1244                    statement_type: "select",
1245                    bookmark: None,
1246                })
1247            }
1248            QueueCommand::Pending { queue, group } => {
1249                let store = self.inner.db.store();
1250                ensure_queue_exists(store.as_ref(), queue)?;
1251                require_queue_group(store.as_ref(), queue, group)?;
1252                let mut pending = load_pending_entries(store.as_ref(), queue, Some(group), None)?;
1253                pending.sort_by_key(|entry| entry.delivered_at_ns);
1254                let current_time_ns = now_ns();
1255
1256                let mut result = UnifiedResult::with_columns(vec![
1257                    "message_id".into(),
1258                    "consumer".into(),
1259                    "delivered_at_ns".into(),
1260                    "delivery_count".into(),
1261                    "idle_ms".into(),
1262                ]);
1263                for entry in pending {
1264                    let mut record = UnifiedRecord::new();
1265                    record.set(
1266                        "message_id",
1267                        Value::text(message_id_string(entry.message_id)),
1268                    );
1269                    record.set("consumer", Value::text(entry.consumer));
1270                    record.set(
1271                        "delivered_at_ns",
1272                        Value::UnsignedInteger(entry.delivered_at_ns),
1273                    );
1274                    record.set(
1275                        "delivery_count",
1276                        Value::UnsignedInteger(u64::from(entry.delivery_count)),
1277                    );
1278                    record.set(
1279                        "idle_ms",
1280                        Value::UnsignedInteger(
1281                            current_time_ns.saturating_sub(entry.delivered_at_ns) / 1_000_000,
1282                        ),
1283                    );
1284                    result.push(record);
1285                }
1286
1287                Ok(RuntimeQueryResult {
1288                    query: raw_query.to_string(),
1289                    mode: QueryMode::Sql,
1290                    statement: "queue_pending",
1291                    engine: "runtime-queue",
1292                    result,
1293                    affected_rows: 0,
1294                    statement_type: "select",
1295                    bookmark: None,
1296                })
1297            }
1298            QueueCommand::Claim {
1299                queue,
1300                group,
1301                consumer,
1302                min_idle_ms,
1303            } => {
1304                let store = self.inner.db.store();
1305                ensure_queue_exists(store.as_ref(), queue)?;
1306                require_queue_group(store.as_ref(), queue, group)?;
1307                let (lifecycle, _ps, txn) = runtime_lifecycle(self, queue);
1308                let delivered = lifecycle
1309                    .claim_delivering(queue, consumer, *min_idle_ms, &txn)
1310                    .map_err(map_qse)?;
1311
1312                let mut result = UnifiedResult::with_columns(vec![
1313                    "message_id".into(),
1314                    "payload".into(),
1315                    "consumer".into(),
1316                    "delivery_count".into(),
1317                ]);
1318
1319                for message in delivered {
1320                    let mut record = UnifiedRecord::new();
1321                    record.set(
1322                        "message_id",
1323                        Value::text(message_id_string(EntityId::new(message.message_id))),
1324                    );
1325                    record.set("payload", message.payload);
1326                    record.set("consumer", Value::text(message.consumer));
1327                    record.set(
1328                        "delivery_count",
1329                        Value::UnsignedInteger(u64::from(message.delivery_count)),
1330                    );
1331                    result.push(record);
1332                }
1333                if !result.records.is_empty() {
1334                    self.invalidate_result_cache();
1335                }
1336                let affected_rows = result.records.len() as u64;
1337
1338                Ok(RuntimeQueryResult {
1339                    query: raw_query.to_string(),
1340                    mode: QueryMode::Sql,
1341                    statement: "queue_claim",
1342                    engine: "runtime-queue",
1343                    result,
1344                    affected_rows,
1345                    statement_type: "update",
1346                    bookmark: None,
1347                })
1348            }
1349            QueueCommand::Ack {
1350                queue,
1351                group,
1352                message_id,
1353                delivery_id,
1354            } => {
1355                let store = self.inner.db.store();
1356                ensure_queue_exists(store.as_ref(), queue)?;
1357                let (group_owned, message_entity) = resolve_ack_nack_handle(
1358                    store.as_ref(),
1359                    queue,
1360                    group,
1361                    message_id,
1362                    delivery_id.as_deref(),
1363                )?;
1364                let group_ref = group_owned.as_str();
1365                require_queue_group(store.as_ref(), queue, group_ref)?;
1366                let (lifecycle, ps, txn) = runtime_lifecycle(self, queue);
1367                let did = match delivery_id.as_deref() {
1368                    Some(d) => d.to_string(),
1369                    None => ps
1370                        .find_pending_by_key(queue, message_entity.raw(), group_ref)
1371                        .ok_or_else(|| {
1372                            RedDBError::NotFound(format!(
1373                                "no pending delivery for message '{}' on queue '{}' (group '{}')",
1374                                message_entity.raw(),
1375                                queue,
1376                                group_ref
1377                            ))
1378                        })?,
1379                };
1380                lifecycle.ack(&txn, &did).map_err(map_qse)?;
1381                self.invalidate_result_cache();
1382
1383                Ok(RuntimeQueryResult::ok_message(
1384                    raw_query.to_string(),
1385                    "message acknowledged",
1386                    "update",
1387                ))
1388            }
1389            QueueCommand::Nack {
1390                queue,
1391                group,
1392                message_id,
1393                delivery_id,
1394                delay_ms,
1395            } => {
1396                let store = self.inner.db.store();
1397                ensure_queue_exists(store.as_ref(), queue)?;
1398                let config = load_queue_config(store.as_ref(), queue);
1399                // Issue #723: a per-failure DELAY override is a write
1400                // operation that re-shapes retry behavior; readers must
1401                // not be able to silently re-schedule another worker's
1402                // work. Embedded callers (no auth identity attached)
1403                // are trusted and bypass the check.
1404                if delay_ms.is_some() {
1405                    if let Some((_, role)) = current_auth_identity() {
1406                        if !role.can_write() {
1407                            return Err(RedDBError::InvalidOperation(format!(
1408                                "role '{role}' is not authorized to override NACK retry delay on queue '{queue}'"
1409                            )));
1410                        }
1411                    }
1412                }
1413                let (group_owned, message_entity) = resolve_ack_nack_handle(
1414                    store.as_ref(),
1415                    queue,
1416                    group,
1417                    message_id,
1418                    delivery_id.as_deref(),
1419                )?;
1420                let group_ref = group_owned.as_str();
1421                require_queue_group(store.as_ref(), queue, group_ref)?;
1422                let (lifecycle, ps, txn) = runtime_lifecycle(self, queue);
1423                let did = match delivery_id.as_deref() {
1424                    Some(d) => d.to_string(),
1425                    None => ps
1426                        .find_pending_by_key(queue, message_entity.raw(), group_ref)
1427                        .ok_or_else(|| {
1428                            RedDBError::NotFound(format!(
1429                                "no pending delivery for message '{}' on queue '{}' (group '{}')",
1430                                message_entity.raw(),
1431                                queue,
1432                                group_ref
1433                            ))
1434                        })?,
1435                };
1436                // Resolve the effective retry delay: per-failure
1437                // override wins, then queue default, then zero
1438                // (immediate requeue — pre-#723 behavior).
1439                let effective_delay_ms = delay_ms.or(config.retry_delay_ms).unwrap_or(0);
1440                let pending_attempt = ps.read_pending_attempt(&did).map_err(map_qse)?;
1441                let nack_attempts = pending_attempt.attempts.saturating_add(1);
1442                let outcome = lifecycle.nack(&txn, &did).map_err(map_qse)?;
1443                // Apply delay only when the message was actually
1444                // requeued — DLQ promotion / drop terminate the
1445                // retry cycle and a delay would be meaningless.
1446                if matches!(outcome, RetirementOutcome::Requeued) && effective_delay_ms > 0 {
1447                    let at_ns =
1448                        now_ns().saturating_add(effective_delay_ms.saturating_mul(1_000_000));
1449                    set_message_available_at_ns(
1450                        store.as_ref(),
1451                        queue,
1452                        message_entity,
1453                        Some(at_ns),
1454                        config.ttl_ms,
1455                    )?;
1456                }
1457                // Issue #723: routine retries do not flood audit
1458                // channels (telemetry already covers them via
1459                // `queue_nacked_total{outcome=...}`). Significant
1460                // overrides — large delays, destination changes,
1461                // drops — are audited so operators see the events
1462                // that re-shape operational risk.
1463                self.maybe_emit_nack_audit(
1464                    queue,
1465                    group_ref,
1466                    &did,
1467                    *delay_ms,
1468                    config.retry_delay_ms,
1469                    &outcome,
1470                );
1471                let outcome_label = match &outcome {
1472                    RetirementOutcome::Requeued => NackOutcomeLabel::Retry,
1473                    RetirementOutcome::MovedToDlq(_) => NackOutcomeLabel::Dlq,
1474                    RetirementOutcome::Dropped => NackOutcomeLabel::Drop,
1475                };
1476                self.queue_telemetry().record_nacked(
1477                    queue,
1478                    group_ref,
1479                    config.mode.as_str(),
1480                    outcome_label,
1481                );
1482                if let RetirementOutcome::MovedToDlq(dlq) = &outcome {
1483                    OperatorEvent::QueueDlqPromoted {
1484                        queue: queue.to_string(),
1485                        group: group_ref.to_string(),
1486                        dlq: dlq.clone(),
1487                        message_id: pending_attempt.message_id,
1488                        attempts: nack_attempts,
1489                        reason: format!("lifecycle_nack:{did}"),
1490                    }
1491                    .emit(self.audit_log());
1492                }
1493                let message = match outcome {
1494                    RetirementOutcome::Requeued => {
1495                        if effective_delay_ms > 0 {
1496                            format!("message requeued (delay={effective_delay_ms}ms)")
1497                        } else {
1498                            "message requeued".to_string()
1499                        }
1500                    }
1501                    RetirementOutcome::MovedToDlq(dlq) => {
1502                        format!("message moved to dead-letter queue '{}'", dlq)
1503                    }
1504                    RetirementOutcome::Dropped => "message dropped after max attempts".to_string(),
1505                };
1506                self.invalidate_result_cache();
1507
1508                Ok(RuntimeQueryResult::ok_message(
1509                    raw_query.to_string(),
1510                    &message,
1511                    "update",
1512                ))
1513            }
1514            QueueCommand::Move {
1515                source,
1516                destination,
1517                filter,
1518                limit,
1519            } => self.execute_queue_move(raw_query, source, destination, filter.as_ref(), *limit),
1520        }
1521    }
1522
1523    pub fn execute_queue_select(
1524        &self,
1525        raw_query: &str,
1526        query: &QueueSelectQuery,
1527    ) -> RedDBResult<RuntimeQueryResult> {
1528        let store = self.inner.db.store();
1529        ensure_queue_exists(store.as_ref(), &query.queue)?;
1530        let config = load_queue_config(store.as_ref(), &query.queue);
1531        let dlq = queue_is_dead_letter_target(store.as_ref(), &query.queue);
1532        let columns = if query.columns.is_empty() {
1533            queue_projection_default_columns()
1534        } else {
1535            query.columns.clone()
1536        };
1537
1538        let mut messages =
1539            load_queue_message_views_with_runtime(Some(self), store.as_ref(), &query.queue)?;
1540        sort_queue_messages(&mut messages, &config, QueueSide::Left);
1541
1542        let mut result = UnifiedResult::with_columns(columns.clone());
1543        for message in messages {
1544            if query
1545                .filter
1546                .as_ref()
1547                .is_some_and(|filter| !queue_message_matches_filter(&message, dlq, filter))
1548            {
1549                continue;
1550            }
1551            let record = queue_projection_record(&columns, &message, dlq)?;
1552            result.push(record);
1553            if query
1554                .limit
1555                .is_some_and(|limit| result.records.len() >= limit as usize)
1556            {
1557                break;
1558            }
1559        }
1560
1561        Ok(RuntimeQueryResult {
1562            query: raw_query.to_string(),
1563            mode: QueryMode::Sql,
1564            statement: "queue_select",
1565            engine: "runtime-queue",
1566            result,
1567            affected_rows: 0,
1568            statement_type: "select",
1569            bookmark: None,
1570        })
1571    }
1572
1573    fn execute_queue_move(
1574        &self,
1575        raw_query: &str,
1576        source: &str,
1577        destination: &str,
1578        filter: Option<&Filter>,
1579        limit: usize,
1580    ) -> RedDBResult<RuntimeQueryResult> {
1581        if source == destination {
1582            return Err(RedDBError::Query(
1583                "QUEUE MOVE source and destination must be different".to_string(),
1584            ));
1585        }
1586        let store = self.inner.db.store();
1587        ensure_queue_exists(store.as_ref(), source)?;
1588        ensure_queue_exists(store.as_ref(), destination)?;
1589        let source_config = load_queue_config(store.as_ref(), source);
1590        let destination_config = load_queue_config(store.as_ref(), destination);
1591        let source_dlq = queue_is_dead_letter_target(store.as_ref(), source);
1592
1593        let mut messages =
1594            load_queue_message_views_with_runtime(Some(self), store.as_ref(), source)?;
1595        sort_queue_messages(&mut messages, &source_config, QueueSide::Left);
1596        let selected = messages
1597            .into_iter()
1598            .filter(|message| {
1599                filter
1600                    .map(|f| queue_message_matches_filter(message, source_dlq, f))
1601                    .unwrap_or(true)
1602            })
1603            .take(limit)
1604            .collect::<Vec<_>>();
1605
1606        if let Some(max_size) = destination_config.max_size {
1607            let current_len =
1608                load_queue_message_views_with_runtime(Some(self), store.as_ref(), destination)?
1609                    .len();
1610            if current_len + selected.len() > max_size {
1611                return Err(RedDBError::Query(format!(
1612                    "queue '{}' is full (max_size={max_size})",
1613                    destination
1614                )));
1615            }
1616        }
1617
1618        for message in &selected {
1619            let lock = queue_message_lock_handle(self, source, message.id);
1620            let Some(_guard) = lock.try_lock() else {
1621                return Err(RedDBError::Query(format!(
1622                    "message '{}' is locked on queue '{}'",
1623                    message.id.raw(),
1624                    source
1625                )));
1626            };
1627            if queue_message_view_by_id(store.as_ref(), source, message.id)?.is_none() {
1628                return Err(RedDBError::Query(format!(
1629                    "message '{}' is no longer available on queue '{}'",
1630                    message.id.raw(),
1631                    source
1632                )));
1633            }
1634        }
1635
1636        let mut inserted = Vec::new();
1637        for message in &selected {
1638            match insert_moved_queue_message(
1639                store.as_ref(),
1640                destination,
1641                &destination_config,
1642                message,
1643            ) {
1644                Ok(id) => inserted.push(id),
1645                Err(err) => {
1646                    for id in inserted {
1647                        let _ = store.delete(destination, id);
1648                    }
1649                    return Err(err);
1650                }
1651            }
1652        }
1653
1654        let (move_lifecycle, _move_ps, move_txn) = runtime_lifecycle(self, source);
1655        for message in &selected {
1656            move_lifecycle
1657                .delete_with_state(source, message.id.raw(), &move_txn)
1658                .map_err(map_qse)?;
1659        }
1660        if !selected.is_empty() {
1661            self.invalidate_result_cache();
1662        }
1663
1664        let selected_count = selected.len() as u64;
1665        self.audit_log().record_event(
1666            AuditEvent::builder("queue/move")
1667                .source(AuditAuthSource::System)
1668                .outcome(Outcome::Success)
1669                .resource(format!("queue:{source}->{destination}"))
1670                .fields([
1671                    AuditFieldEscaper::field("source", source),
1672                    AuditFieldEscaper::field("destination", destination),
1673                    AuditFieldEscaper::field("selected", selected_count),
1674                    AuditFieldEscaper::field("committed", selected_count),
1675                ])
1676                .build(),
1677        );
1678
1679        let mut result = UnifiedResult::with_columns(vec![
1680            "source".into(),
1681            "destination".into(),
1682            "selected".into(),
1683            "committed".into(),
1684        ]);
1685        let mut record = UnifiedRecord::new();
1686        record.set("source", Value::text(source.to_string()));
1687        record.set("destination", Value::text(destination.to_string()));
1688        record.set("selected", Value::UnsignedInteger(selected_count));
1689        record.set("committed", Value::UnsignedInteger(selected_count));
1690        result.push(record);
1691
1692        Ok(RuntimeQueryResult {
1693            query: raw_query.to_string(),
1694            mode: QueryMode::Sql,
1695            statement: "queue_move",
1696            engine: "runtime-queue",
1697            result,
1698            affected_rows: selected_count,
1699            statement_type: "update",
1700            bookmark: None,
1701        })
1702    }
1703
1704    /// Issue #723: routine retries are observable through metrics
1705    /// (`queue_nacked_total{outcome=...}`) so this is the audit
1706    /// shoulder for the *non-routine* cases: explicit NACK delay
1707    /// overrides whose magnitude or destination-changing impact would
1708    /// be invisible in metrics alone. Specifically:
1709    ///
1710    /// - Explicit override ≥ 60s (a worker decided to defer well past
1711    ///   the queue's default cadence — operators care).
1712    /// - Override that lands on a DLQ promotion or drop (destination
1713    ///   changed; the override may have influenced retire-vs-requeue
1714    ///   accounting on the caller's side and the audit trail needs to
1715    ///   show who asked).
1716    ///
1717    /// Calls with no override are intentionally silent here.
1718    fn maybe_emit_nack_audit(
1719        &self,
1720        queue: &str,
1721        group: &str,
1722        delivery_id: &str,
1723        override_ms: Option<u64>,
1724        default_ms: Option<u64>,
1725        outcome: &RetirementOutcome,
1726    ) {
1727        let Some(override_ms) = override_ms else {
1728            return;
1729        };
1730        let outcome_label = match outcome {
1731            RetirementOutcome::Requeued => "requeued",
1732            RetirementOutcome::MovedToDlq(_) => "dlq",
1733            RetirementOutcome::Dropped => "dropped",
1734        };
1735        const SIGNIFICANT_DELAY_MS: u64 = 60_000;
1736        let destination_changed = !matches!(outcome, RetirementOutcome::Requeued);
1737        if override_ms < SIGNIFICANT_DELAY_MS && !destination_changed {
1738            return;
1739        }
1740        self.audit_log().record_event(
1741            AuditEvent::builder("queue/nack/override")
1742                .source(AuditAuthSource::System)
1743                .outcome(Outcome::Success)
1744                .resource(format!("queue:{queue}"))
1745                .fields([
1746                    AuditFieldEscaper::field("queue", queue),
1747                    AuditFieldEscaper::field("group", group),
1748                    AuditFieldEscaper::field("delivery_id", delivery_id),
1749                    AuditFieldEscaper::field("override_delay_ms", override_ms),
1750                    AuditFieldEscaper::field("default_delay_ms", default_ms.unwrap_or(0)),
1751                    AuditFieldEscaper::field("outcome", outcome_label),
1752                ])
1753                .build(),
1754        );
1755    }
1756}
1757
1758fn ensure_queue_exists(store: &UnifiedStore, queue: &str) -> RedDBResult<()> {
1759    if store.get_collection(queue).is_some() {
1760        Ok(())
1761    } else {
1762        Err(RedDBError::NotFound(format!("queue '{}' not found", queue)))
1763    }
1764}
1765
1766pub(super) fn load_queue_config(store: &UnifiedStore, queue: &str) -> QueueRuntimeConfig {
1767    let default = QueueRuntimeConfig {
1768        mode: QueueMode::Work,
1769        priority: false,
1770        max_size: None,
1771        ttl_ms: None,
1772        dlq: None,
1773        max_attempts: crate::storage::query::DEFAULT_QUEUE_MAX_ATTEMPTS,
1774        lock_deadline_ms: crate::storage::query::DEFAULT_QUEUE_LOCK_DEADLINE_MS,
1775        in_flight_cap_per_group: crate::storage::query::DEFAULT_QUEUE_IN_FLIGHT_CAP_PER_GROUP,
1776        retry_delay_ms: None,
1777    };
1778
1779    let Some(manager) = store.get_collection(QUEUE_META_COLLECTION) else {
1780        return default;
1781    };
1782    manager
1783        .query_all(|entity| {
1784            entity.data.as_row().is_some_and(|row| {
1785                row_text(row, "kind").as_deref() == Some("queue_config")
1786                    && row_text(row, "queue").as_deref() == Some(queue)
1787            })
1788        })
1789        .into_iter()
1790        .find_map(|entity| {
1791            let row = entity.data.as_row()?;
1792            Some(QueueRuntimeConfig {
1793                mode: row_text(row, "mode")
1794                    .as_deref()
1795                    .and_then(QueueMode::parse)
1796                    .unwrap_or_default(),
1797                priority: row_bool(row, "priority").unwrap_or(false),
1798                max_size: row_u64(row, "max_size").map(|value| value as usize),
1799                ttl_ms: row_u64(row, "ttl_ms"),
1800                dlq: row_text(row, "dlq"),
1801                max_attempts: row_u64(row, "max_attempts")
1802                    .map(|value| value as u32)
1803                    .unwrap_or(crate::storage::query::DEFAULT_QUEUE_MAX_ATTEMPTS),
1804                lock_deadline_ms: row_u64(row, "lock_deadline_ms")
1805                    .unwrap_or(crate::storage::query::DEFAULT_QUEUE_LOCK_DEADLINE_MS),
1806                in_flight_cap_per_group: row_u64(row, "in_flight_cap_per_group")
1807                    .map(|value| value as u32)
1808                    .unwrap_or(crate::storage::query::DEFAULT_QUEUE_IN_FLIGHT_CAP_PER_GROUP),
1809                retry_delay_ms: row_u64(row, "retry_delay_ms").filter(|v| *v > 0),
1810            })
1811        })
1812        .unwrap_or(default)
1813}
1814
1815pub(super) fn queue_mode_str(store: &UnifiedStore, queue: &str) -> &'static str {
1816    load_queue_config(store, queue).mode.as_str()
1817}
1818
1819fn save_queue_config(
1820    store: &UnifiedStore,
1821    queue: &str,
1822    config: &QueueRuntimeConfig,
1823) -> RedDBResult<()> {
1824    remove_meta_rows(store, |row| {
1825        row_text(row, "kind").as_deref() == Some("queue_config")
1826            && row_text(row, "queue").as_deref() == Some(queue)
1827    });
1828
1829    let mut fields = HashMap::new();
1830    fields.insert("kind".to_string(), Value::text("queue_config".to_string()));
1831    fields.insert("queue".to_string(), Value::text(queue.to_string()));
1832    fields.insert(
1833        "mode".to_string(),
1834        Value::text(config.mode.as_str().to_string()),
1835    );
1836    fields.insert("priority".to_string(), Value::Boolean(config.priority));
1837    fields.insert(
1838        "max_size".to_string(),
1839        config
1840            .max_size
1841            .map(|value| Value::UnsignedInteger(value as u64))
1842            .unwrap_or(Value::Null),
1843    );
1844    fields.insert(
1845        "ttl_ms".to_string(),
1846        config
1847            .ttl_ms
1848            .map(Value::UnsignedInteger)
1849            .unwrap_or(Value::Null),
1850    );
1851    fields.insert(
1852        "dlq".to_string(),
1853        config.dlq.clone().map(Value::text).unwrap_or(Value::Null),
1854    );
1855    fields.insert(
1856        "max_attempts".to_string(),
1857        Value::UnsignedInteger(u64::from(config.max_attempts)),
1858    );
1859    fields.insert(
1860        "lock_deadline_ms".to_string(),
1861        Value::UnsignedInteger(config.lock_deadline_ms),
1862    );
1863    fields.insert(
1864        "in_flight_cap_per_group".to_string(),
1865        Value::UnsignedInteger(u64::from(config.in_flight_cap_per_group)),
1866    );
1867    fields.insert(
1868        "retry_delay_ms".to_string(),
1869        config
1870            .retry_delay_ms
1871            .map(Value::UnsignedInteger)
1872            .unwrap_or(Value::Null),
1873    );
1874    insert_meta_row(store, fields)
1875}
1876
1877fn remove_queue_metadata(store: &UnifiedStore, queue: &str) {
1878    remove_meta_rows(store, |row| {
1879        row_text(row, "queue").as_deref() == Some(queue)
1880    });
1881}
1882
1883fn queue_group_exists(store: &UnifiedStore, queue: &str, group: &str) -> RedDBResult<bool> {
1884    Ok(load_queue_groups(store, queue)?
1885        .into_iter()
1886        .any(|entry| entry.group == group))
1887}
1888
1889pub(super) fn require_queue_group(
1890    store: &UnifiedStore,
1891    queue: &str,
1892    group: &str,
1893) -> RedDBResult<()> {
1894    if queue_group_exists(store, queue, group)? {
1895        Ok(())
1896    } else {
1897        Err(RedDBError::NotFound(format!(
1898            "consumer group '{}' not found on queue '{}'",
1899            group, queue
1900        )))
1901    }
1902}
1903
1904pub(super) fn resolve_read_group(
1905    store: &UnifiedStore,
1906    queue: &str,
1907    group: Option<&str>,
1908    consumer: &str,
1909    config: &QueueRuntimeConfig,
1910) -> RedDBResult<String> {
1911    if let Some(group) = group {
1912        require_queue_group(store, queue, group)?;
1913        return Ok(group.to_string());
1914    }
1915
1916    match config.mode {
1917        QueueMode::Work => {
1918            if !queue_group_exists(store, queue, WORK_DEFAULT_GROUP)? {
1919                save_queue_group(store, queue, WORK_DEFAULT_GROUP)?;
1920            }
1921            Ok(WORK_DEFAULT_GROUP.to_string())
1922        }
1923        QueueMode::Fanout => {
1924            let fanout_group = format!("{FANOUT_GROUP_PREFIX}{consumer}");
1925            if !queue_group_exists(store, queue, &fanout_group)? {
1926                save_queue_group(store, queue, &fanout_group)?;
1927            }
1928            Ok(fanout_group)
1929        }
1930    }
1931}
1932
1933fn load_queue_groups(store: &UnifiedStore, queue: &str) -> RedDBResult<Vec<QueueGroupEntry>> {
1934    let Some(manager) = store.get_collection(QUEUE_META_COLLECTION) else {
1935        return Ok(Vec::new());
1936    };
1937    Ok(manager
1938        .query_all(|entity| {
1939            entity.data.as_row().is_some_and(|row| {
1940                row_text(row, "kind").as_deref() == Some("queue_group")
1941                    && row_text(row, "queue").as_deref() == Some(queue)
1942            })
1943        })
1944        .into_iter()
1945        .filter_map(|entity| {
1946            let row = entity.data.as_row()?;
1947            Some(QueueGroupEntry {
1948                entity_id: entity.id,
1949                group: row_text(row, "group")?,
1950            })
1951        })
1952        .collect())
1953}
1954
1955fn save_queue_group(store: &UnifiedStore, queue: &str, group: &str) -> RedDBResult<()> {
1956    let mut fields = HashMap::new();
1957    fields.insert("kind".to_string(), Value::text("queue_group".to_string()));
1958    fields.insert("queue".to_string(), Value::text(queue.to_string()));
1959    fields.insert("group".to_string(), Value::text(group.to_string()));
1960    fields.insert(
1961        "created_at_ns".to_string(),
1962        Value::UnsignedInteger(now_ns()),
1963    );
1964    insert_meta_row(store, fields)
1965}
1966
1967pub(super) fn load_pending_entries(
1968    store: &UnifiedStore,
1969    queue: &str,
1970    group: Option<&str>,
1971    message_id: Option<EntityId>,
1972) -> RedDBResult<Vec<QueuePendingEntry>> {
1973    let Some(manager) = store.get_collection(QUEUE_META_COLLECTION) else {
1974        return Ok(Vec::new());
1975    };
1976    let lock_deadline_ns = load_queue_config(store, queue)
1977        .lock_deadline_ms
1978        .saturating_mul(1_000_000);
1979    let attempts_by_key: HashMap<(String, String, u64), u64> = manager
1980        .query_all(|entity| {
1981            entity.data.as_row().is_some_and(|row| {
1982                row_text(row, "kind").as_deref() == Some("queue_attempts_lc")
1983                    && row_text(row, "queue").as_deref() == Some(queue)
1984            })
1985        })
1986        .into_iter()
1987        .filter_map(|entity| {
1988            let row = entity.data.as_row()?;
1989            Some((
1990                (
1991                    row_text(row, "queue")?,
1992                    row_text(row, "group")?,
1993                    row_u64(row, "message_id")?,
1994                ),
1995                row_u64(row, "attempts").unwrap_or(1),
1996            ))
1997        })
1998        .collect();
1999    Ok(manager
2000        .query_all(|entity| {
2001            entity.data.as_row().is_some_and(|row| {
2002                matches!(
2003                    row_text(row, "kind").as_deref(),
2004                    Some("queue_pending") | Some("queue_pending_lc")
2005                ) && row_text(row, "queue").as_deref() == Some(queue)
2006                    && group
2007                        .map(|group_name| row_text(row, "group").as_deref() == Some(group_name))
2008                        .unwrap_or(true)
2009                    && message_id
2010                        .map(|candidate| row_u64(row, "message_id") == Some(candidate.raw()))
2011                        .unwrap_or(true)
2012            })
2013        })
2014        .into_iter()
2015        .filter_map(|entity| {
2016            let row = entity.data.as_row()?;
2017            let group = row_text(row, "group")?;
2018            let message_id = row_u64(row, "message_id")?;
2019            let kind = row_text(row, "kind")?;
2020            let delivered_at_ns = if kind == "queue_pending_lc" {
2021                row_u64(row, "lock_deadline_ns")
2022                    .unwrap_or(0)
2023                    .saturating_sub(lock_deadline_ns)
2024            } else {
2025                row_u64(row, "delivered_at_ns")?
2026            };
2027            let delivery_count = if kind == "queue_pending_lc" {
2028                attempts_by_key
2029                    .get(&(queue.to_string(), group.clone(), message_id))
2030                    .copied()
2031                    .unwrap_or(1)
2032            } else {
2033                row_u64(row, "delivery_count").unwrap_or(1)
2034            };
2035            Some(QueuePendingEntry {
2036                entity_id: entity.id,
2037                group,
2038                message_id: EntityId::new(message_id),
2039                consumer: row_text(row, "consumer").unwrap_or_default(),
2040                delivered_at_ns,
2041                delivery_count: delivery_count as u32,
2042            })
2043        })
2044        .collect())
2045}
2046
2047pub(super) fn save_queue_pending(
2048    store: &UnifiedStore,
2049    queue: &str,
2050    group: &str,
2051    message_id: EntityId,
2052    consumer: &str,
2053    delivered_at_ns: u64,
2054    delivery_count: u32,
2055) -> RedDBResult<()> {
2056    remove_meta_rows(store, |row| {
2057        row_text(row, "kind").as_deref() == Some("queue_pending")
2058            && row_text(row, "queue").as_deref() == Some(queue)
2059            && row_text(row, "group").as_deref() == Some(group)
2060            && row_u64(row, "message_id") == Some(message_id.raw())
2061    });
2062
2063    let mut fields = HashMap::new();
2064    fields.insert("kind".to_string(), Value::text("queue_pending".to_string()));
2065    fields.insert("queue".to_string(), Value::text(queue.to_string()));
2066    fields.insert("group".to_string(), Value::text(group.to_string()));
2067    fields.insert(
2068        "message_id".to_string(),
2069        Value::UnsignedInteger(message_id.raw()),
2070    );
2071    fields.insert("consumer".to_string(), Value::text(consumer.to_string()));
2072    fields.insert(
2073        "delivered_at_ns".to_string(),
2074        Value::UnsignedInteger(delivered_at_ns),
2075    );
2076    fields.insert(
2077        "delivery_count".to_string(),
2078        Value::UnsignedInteger(u64::from(delivery_count)),
2079    );
2080    insert_meta_row(store, fields)
2081}
2082
2083pub(super) fn require_pending_entry(
2084    store: &UnifiedStore,
2085    queue: &str,
2086    group: &str,
2087    message_id: EntityId,
2088) -> RedDBResult<QueuePendingEntry> {
2089    load_pending_entries(store, queue, Some(group), Some(message_id))?
2090        .into_iter()
2091        .next()
2092        .ok_or_else(|| {
2093            RedDBError::NotFound(format!(
2094                "message '{}' is not pending in group '{}' on queue '{}'",
2095                message_id.raw(),
2096                group,
2097                queue
2098            ))
2099        })
2100}
2101
2102pub(super) fn load_ack_entries(
2103    store: &UnifiedStore,
2104    queue: &str,
2105    group: Option<&str>,
2106    message_id: Option<EntityId>,
2107) -> RedDBResult<Vec<QueueAckEntry>> {
2108    let Some(manager) = store.get_collection(QUEUE_META_COLLECTION) else {
2109        return Ok(Vec::new());
2110    };
2111    Ok(manager
2112        .query_all(|entity| {
2113            entity.data.as_row().is_some_and(|row| {
2114                row_text(row, "kind").as_deref() == Some("queue_ack")
2115                    && row_text(row, "queue").as_deref() == Some(queue)
2116                    && group
2117                        .map(|group_name| row_text(row, "group").as_deref() == Some(group_name))
2118                        .unwrap_or(true)
2119                    && message_id
2120                        .map(|candidate| row_u64(row, "message_id") == Some(candidate.raw()))
2121                        .unwrap_or(true)
2122            })
2123        })
2124        .into_iter()
2125        .filter_map(|entity| {
2126            let row = entity.data.as_row()?;
2127            Some(QueueAckEntry {
2128                entity_id: entity.id,
2129                group: row_text(row, "group")?,
2130                message_id: EntityId::new(row_u64(row, "message_id")?),
2131            })
2132        })
2133        .collect())
2134}
2135
2136pub(super) fn save_queue_ack(
2137    store: &UnifiedStore,
2138    queue: &str,
2139    group: &str,
2140    message_id: EntityId,
2141) -> RedDBResult<()> {
2142    let existing = load_ack_entries(store, queue, Some(group), Some(message_id))?;
2143    if !existing.is_empty() {
2144        return Ok(());
2145    }
2146
2147    let mut fields = HashMap::new();
2148    fields.insert("kind".to_string(), Value::text("queue_ack".to_string()));
2149    fields.insert("queue".to_string(), Value::text(queue.to_string()));
2150    fields.insert("group".to_string(), Value::text(group.to_string()));
2151    fields.insert(
2152        "message_id".to_string(),
2153        Value::UnsignedInteger(message_id.raw()),
2154    );
2155    fields.insert("acked_at_ns".to_string(), Value::UnsignedInteger(now_ns()));
2156    insert_meta_row(store, fields)
2157}
2158
2159pub(super) fn queue_message_completed_for_all_groups(
2160    store: &UnifiedStore,
2161    queue: &str,
2162    message_id: EntityId,
2163) -> RedDBResult<bool> {
2164    let groups = load_queue_groups(store, queue)?;
2165    let pending = load_pending_entries(store, queue, None, Some(message_id))?;
2166    if !pending.is_empty() {
2167        return Ok(false);
2168    }
2169    if groups.is_empty() {
2170        return Ok(true);
2171    }
2172
2173    let acked_groups = load_ack_entries(store, queue, None, Some(message_id))?
2174        .into_iter()
2175        .map(|entry| entry.group)
2176        .collect::<HashSet<_>>();
2177    Ok(groups
2178        .into_iter()
2179        .all(|group| acked_groups.contains(&group.group)))
2180}
2181
2182fn load_queue_message_views(
2183    store: &UnifiedStore,
2184    queue: &str,
2185) -> RedDBResult<Vec<QueueMessageView>> {
2186    load_queue_message_views_with_runtime(None, store, queue)
2187}
2188
2189/// Kind-aware queue scan (Phase 2.5.5 RLS universal). When the
2190/// caller has a `RedDBRuntime` reference, the gate also applies
2191/// any `CREATE POLICY ... ON MESSAGES OF <queue>` predicate. In
2192/// autocommit / embedded paths that only have the raw store (e.g.
2193/// purge loops) we skip RLS because there's no session identity
2194/// to match against.
2195pub(super) fn load_queue_message_views_with_runtime(
2196    runtime: Option<&RedDBRuntime>,
2197    store: &UnifiedStore,
2198    queue: &str,
2199) -> RedDBResult<Vec<QueueMessageView>> {
2200    let manager = store
2201        .get_collection(queue)
2202        .ok_or_else(|| RedDBError::NotFound(format!("queue '{}' not found", queue)))?;
2203    // Phase 1.2 MVCC universal: capture before parallel scan. Messages
2204    // inserted by another connection's open txn stay invisible to
2205    // consumers until that txn commits (prevents phantom POPs).
2206    let snap_ctx = crate::runtime::impl_core::capture_current_snapshot();
2207    let rls_filter = runtime.and_then(|rt| {
2208        crate::runtime::impl_core::rls_policy_filter_for_kind(
2209            rt,
2210            queue,
2211            crate::storage::query::ast::PolicyAction::Select,
2212            crate::storage::query::ast::PolicyTargetKind::Messages,
2213        )
2214    });
2215    let rls_enabled_but_denied = runtime.map(|rt| rt.is_rls_enabled(queue)).unwrap_or(false)
2216        && rls_filter.is_none()
2217        && runtime.is_some();
2218    if rls_enabled_but_denied {
2219        // RLS on + no Messages policy for this role = deny-default.
2220        return Ok(Vec::new());
2221    }
2222    let filter_arc = rls_filter.map(std::sync::Arc::new);
2223    let rt_arc = runtime;
2224    Ok(manager
2225        .query_all(move |entity| {
2226            if !matches!(entity.kind, EntityKind::QueueMessage { .. }) {
2227                return false;
2228            }
2229            if !crate::runtime::impl_core::entity_visible_with_context(snap_ctx.as_ref(), entity) {
2230                return false;
2231            }
2232            if let (Some(filter), Some(rt)) = (filter_arc.as_ref(), rt_arc) {
2233                return crate::runtime::query_exec::evaluate_entity_filter_with_db(
2234                    Some(&rt.inner.db),
2235                    entity,
2236                    filter,
2237                    queue,
2238                    queue,
2239                );
2240            }
2241            true
2242        })
2243        .into_iter()
2244        .filter_map(queue_message_view_from_entity)
2245        .map(|mut view| {
2246            view.available_at_ns = read_message_available_at_ns(store, queue, view.id);
2247            view
2248        })
2249        .collect())
2250}
2251
2252fn queue_message_view_from_entity(entity: UnifiedEntity) -> Option<QueueMessageView> {
2253    let (position, _) = match &entity.kind {
2254        EntityKind::QueueMessage { position, queue } => (*position, queue),
2255        _ => return None,
2256    };
2257    let data = match entity.data {
2258        EntityData::QueueMessage(data) => data,
2259        _ => return None,
2260    };
2261    Some(QueueMessageView {
2262        id: entity.id,
2263        position,
2264        priority: data.priority.unwrap_or(0),
2265        payload: data.payload,
2266        attempts: data.attempts,
2267        max_attempts: data.max_attempts,
2268        enqueued_at_ns: data.enqueued_at_ns,
2269        available_at_ns: None,
2270    })
2271}
2272
2273/// Insert a moved payload onto `queue` using only the payload value —
2274/// priority / attempts / TTL fall back to the destination queue's
2275/// catalog config (mirrors a fresh enqueue rather than carrying source
2276/// metadata over). Used by `PrimaryQueueStore::move_to_queue`, the
2277/// `QueueLifecycle::move_between_queues` adapter that owns only
2278/// `(message_id, payload)` after `pop_messages` retires the source row.
2279pub(super) fn insert_moved_queue_message_payload(
2280    store: &UnifiedStore,
2281    queue: &str,
2282    payload: &Value,
2283) -> RedDBResult<EntityId> {
2284    let config = load_queue_config(store, queue);
2285    let position = next_queue_position(store, queue, QueueSide::Right)?;
2286    let enqueued_at_ns = std::time::SystemTime::now()
2287        .duration_since(std::time::UNIX_EPOCH)
2288        .map(|d| d.as_nanos() as u64)
2289        .unwrap_or(0);
2290    let entity = UnifiedEntity::new(
2291        EntityId::new(0),
2292        EntityKind::QueueMessage {
2293            queue: queue.to_string(),
2294            position,
2295        },
2296        EntityData::QueueMessage(QueueMessageData {
2297            payload: payload.clone(),
2298            priority: None,
2299            enqueued_at_ns,
2300            attempts: 0,
2301            max_attempts: config.max_attempts,
2302            acked: false,
2303        }),
2304    );
2305    let id = store
2306        .insert_auto(queue, entity)
2307        .map_err(|err| RedDBError::Internal(err.to_string()))?;
2308    if let Some(ttl_ms) = config.ttl_ms {
2309        store
2310            .set_metadata(queue, id, queue_message_ttl_metadata(ttl_ms))
2311            .map_err(|err| RedDBError::Internal(err.to_string()))?;
2312    }
2313    Ok(id)
2314}
2315
2316fn insert_moved_queue_message(
2317    store: &UnifiedStore,
2318    queue: &str,
2319    config: &QueueRuntimeConfig,
2320    message: &QueueMessageView,
2321) -> RedDBResult<EntityId> {
2322    let position = next_queue_position(store, queue, QueueSide::Right)?;
2323    let entity = UnifiedEntity::new(
2324        EntityId::new(0),
2325        EntityKind::QueueMessage {
2326            queue: queue.to_string(),
2327            position,
2328        },
2329        EntityData::QueueMessage(QueueMessageData {
2330            payload: message.payload.clone(),
2331            priority: if config.priority {
2332                Some(message.priority)
2333            } else {
2334                None
2335            },
2336            enqueued_at_ns: message.enqueued_at_ns,
2337            attempts: message.attempts,
2338            max_attempts: message.max_attempts,
2339            acked: false,
2340        }),
2341    );
2342    let id = store
2343        .insert_auto(queue, entity)
2344        .map_err(|err| RedDBError::Internal(err.to_string()))?;
2345    if let Some(ttl_ms) = config.ttl_ms {
2346        store
2347            .set_metadata(queue, id, queue_message_ttl_metadata(ttl_ms))
2348            .map_err(|err| RedDBError::Internal(err.to_string()))?;
2349    }
2350    Ok(id)
2351}
2352
/// Column names, in output order, used when a queue projection does not
/// name its columns explicitly.
fn queue_projection_default_columns() -> Vec<String> {
    const DEFAULT_COLUMNS: [&str; 9] = [
        "id",
        "payload",
        "priority",
        "attempts",
        "last_error",
        "enqueued_at",
        "available_at",
        "dlq",
        "tenant",
    ];
    DEFAULT_COLUMNS
        .iter()
        .map(|name| (*name).to_string())
        .collect()
}
2369
2370fn queue_projection_record(
2371    columns: &[String],
2372    message: &QueueMessageView,
2373    dlq: bool,
2374) -> RedDBResult<UnifiedRecord> {
2375    let mut record = UnifiedRecord::new();
2376    for column in columns {
2377        let value = queue_projection_value(message, dlq, column).ok_or_else(|| {
2378            RedDBError::Query(format!("unknown queue projection column '{}'", column))
2379        })?;
2380        record.set(column, value);
2381    }
2382    Ok(record)
2383}
2384
2385fn queue_projection_value(message: &QueueMessageView, dlq: bool, column: &str) -> Option<Value> {
2386    match column {
2387        "id" => Some(Value::text(message_id_string(message.id))),
2388        "payload" => Some(message.payload.clone()),
2389        "priority" => Some(Value::Integer(i64::from(message.priority))),
2390        "attempts" => Some(Value::UnsignedInteger(u64::from(message.attempts))),
2391        "last_error" => Some(Value::Null),
2392        "enqueued_at" => Some(Value::UnsignedInteger(message.enqueued_at_ns)),
2393        "available_at" => Some(Value::UnsignedInteger(
2394            message.available_at_ns.unwrap_or(message.enqueued_at_ns),
2395        )),
2396        "dlq" => Some(Value::Boolean(dlq)),
2397        "tenant" => queue_message_tenant(&message.payload).or(Some(Value::Null)),
2398        _ => None,
2399    }
2400}
2401
2402fn queue_message_tenant(payload: &Value) -> Option<Value> {
2403    let Value::Json(bytes) = payload else {
2404        return None;
2405    };
2406    let json: crate::json::Value = crate::json::from_slice(bytes).ok()?;
2407    json.get("tenant")
2408        .and_then(crate::json::Value::as_str)
2409        .map(|tenant| Value::text(tenant.to_string()))
2410}
2411
2412fn queue_is_dead_letter_target(store: &UnifiedStore, queue: &str) -> bool {
2413    let Some(manager) = store.get_collection(QUEUE_META_COLLECTION) else {
2414        return false;
2415    };
2416    !manager
2417        .query_all(|entity| {
2418            entity.data.as_row().is_some_and(|row| {
2419                row_text(row, "kind").as_deref() == Some("queue_config")
2420                    && row_text(row, "dlq").as_deref() == Some(queue)
2421            })
2422        })
2423        .is_empty()
2424}
2425
2426fn queue_message_matches_filter(message: &QueueMessageView, dlq: bool, filter: &Filter) -> bool {
2427    match filter {
2428        Filter::Compare { field, op, value } => queue_filter_field_value(message, dlq, field)
2429            .is_some_and(|candidate| queue_compare_values(&candidate, value, *op)),
2430        Filter::CompareFields { left, op, right } => {
2431            match (
2432                queue_filter_field_value(message, dlq, left),
2433                queue_filter_field_value(message, dlq, right),
2434            ) {
2435                (Some(left), Some(right)) => queue_compare_values(&left, &right, *op),
2436                _ => false,
2437            }
2438        }
2439        Filter::And(left, right) => {
2440            queue_message_matches_filter(message, dlq, left)
2441                && queue_message_matches_filter(message, dlq, right)
2442        }
2443        Filter::Or(left, right) => {
2444            queue_message_matches_filter(message, dlq, left)
2445                || queue_message_matches_filter(message, dlq, right)
2446        }
2447        Filter::Not(inner) => !queue_message_matches_filter(message, dlq, inner),
2448        Filter::IsNull(field) => queue_filter_field_value(message, dlq, field)
2449            .is_none_or(|value| matches!(value, Value::Null)),
2450        Filter::IsNotNull(field) => queue_filter_field_value(message, dlq, field)
2451            .is_some_and(|value| !matches!(value, Value::Null)),
2452        Filter::In { field, values } => {
2453            queue_filter_field_value(message, dlq, field).is_some_and(|candidate| {
2454                values
2455                    .iter()
2456                    .any(|value| queue_values_equal(&candidate, value))
2457            })
2458        }
2459        Filter::Between { field, low, high } => queue_filter_field_value(message, dlq, field)
2460            .is_some_and(|candidate| {
2461                queue_compare_values(&candidate, low, CompareOp::Ge)
2462                    && queue_compare_values(&candidate, high, CompareOp::Le)
2463            }),
2464        Filter::Like { field, pattern } => queue_filter_text(message, dlq, field)
2465            .is_some_and(|value| queue_like_matches(&value, pattern)),
2466        Filter::StartsWith { field, prefix } => {
2467            queue_filter_text(message, dlq, field).is_some_and(|value| value.starts_with(prefix))
2468        }
2469        Filter::EndsWith { field, suffix } => {
2470            queue_filter_text(message, dlq, field).is_some_and(|value| value.ends_with(suffix))
2471        }
2472        Filter::Contains { field, substring } => {
2473            queue_filter_text(message, dlq, field).is_some_and(|value| value.contains(substring))
2474        }
2475        Filter::CompareExpr { .. } => false,
2476    }
2477}
2478
2479fn queue_filter_field_value(
2480    message: &QueueMessageView,
2481    dlq: bool,
2482    field: &FieldRef,
2483) -> Option<Value> {
2484    match field {
2485        FieldRef::TableColumn { table, column } if table.is_empty() => {
2486            queue_projection_value(message, dlq, column)
2487                .or_else(|| queue_payload_field_value(&message.payload, column))
2488        }
2489        FieldRef::TableColumn { column, .. } => queue_projection_value(message, dlq, column)
2490            .or_else(|| queue_payload_field_value(&message.payload, column)),
2491        _ => None,
2492    }
2493}
2494
2495fn queue_payload_field_value(payload: &Value, field: &str) -> Option<Value> {
2496    let Value::Json(bytes) = payload else {
2497        return None;
2498    };
2499    let json: crate::json::Value = crate::json::from_slice(bytes).ok()?;
2500    let value = json.get(field)?;
2501    json_value_to_schema_value(value)
2502}
2503
2504fn json_value_to_schema_value(value: &crate::json::Value) -> Option<Value> {
2505    if matches!(value, crate::json::Value::Null) {
2506        Some(Value::Null)
2507    } else if let Some(value) = value.as_bool() {
2508        Some(Value::Boolean(value))
2509    } else if let Some(value) = value.as_i64() {
2510        Some(Value::Integer(value))
2511    } else if let Some(value) = value.as_u64() {
2512        Some(Value::UnsignedInteger(value))
2513    } else if let Some(value) = value.as_f64() {
2514        Some(Value::Float(value))
2515    } else if let Some(value) = value.as_str() {
2516        Some(Value::text(value.to_string()))
2517    } else {
2518        Some(Value::Json(value.to_string_compact().into_bytes()))
2519    }
2520}
2521
2522fn queue_filter_text(message: &QueueMessageView, dlq: bool, field: &FieldRef) -> Option<String> {
2523    queue_filter_field_value(message, dlq, field).and_then(|value| match value {
2524        Value::Text(value) => Some(value.to_string()),
2525        Value::NodeRef(value) | Value::EdgeRef(value) | Value::TableRef(value) => Some(value),
2526        Value::Integer(value) => Some(value.to_string()),
2527        Value::UnsignedInteger(value) => Some(value.to_string()),
2528        Value::Float(value) => Some(value.to_string()),
2529        Value::Boolean(value) => Some(value.to_string()),
2530        _ => None,
2531    })
2532}
2533
2534fn queue_compare_values(left: &Value, right: &Value, op: CompareOp) -> bool {
2535    match op {
2536        CompareOp::Eq => queue_values_equal(left, right),
2537        CompareOp::Ne => !queue_values_equal(left, right),
2538        CompareOp::Lt => queue_partial_cmp(left, right).is_some_and(|ord| ord.is_lt()),
2539        CompareOp::Le => queue_partial_cmp(left, right).is_some_and(|ord| !ord.is_gt()),
2540        CompareOp::Gt => queue_partial_cmp(left, right).is_some_and(|ord| ord.is_gt()),
2541        CompareOp::Ge => queue_partial_cmp(left, right).is_some_and(|ord| !ord.is_lt()),
2542    }
2543}
2544
2545fn queue_values_equal(left: &Value, right: &Value) -> bool {
2546    if let (Some(left), Some(right)) = (queue_value_number(left), queue_value_number(right)) {
2547        return (left - right).abs() < f64::EPSILON;
2548    }
2549    match (left, right) {
2550        (Value::Text(left), Value::Text(right)) => left == right,
2551        (Value::Boolean(left), Value::Boolean(right)) => left == right,
2552        _ => left == right,
2553    }
2554}
2555
2556fn queue_partial_cmp(left: &Value, right: &Value) -> Option<std::cmp::Ordering> {
2557    if let (Some(left), Some(right)) = (queue_value_number(left), queue_value_number(right)) {
2558        return left.partial_cmp(&right);
2559    }
2560    match (left, right) {
2561        (Value::Text(left), Value::Text(right)) => Some(left.cmp(right)),
2562        _ => None,
2563    }
2564}
2565
2566fn queue_value_number(value: &Value) -> Option<f64> {
2567    match value {
2568        Value::Integer(value) => Some(*value as f64),
2569        Value::UnsignedInteger(value) => Some(*value as f64),
2570        Value::Float(value) => Some(*value),
2571        Value::Text(value) => value.parse().ok(),
2572        _ => None,
2573    }
2574}
2575
/// Match `value` against a LIKE-style `pattern` in which `%` stands for
/// any run of characters (including none).
///
/// `%` is honoured anywhere in the pattern, not only at the ends, so
/// `a%b` matches every value that starts with `a` and ends with a
/// non-overlapping `b`. A pattern without `%` must equal the value
/// exactly. `_` is treated as a literal character, not as a
/// single-character wildcard.
fn queue_like_matches(value: &str, pattern: &str) -> bool {
    // The text after the last `%` is anchored at the end of the value.
    let Some((head, suffix)) = pattern.rsplit_once('%') else {
        return value == pattern;
    };

    // The text before the first `%` is anchored at the start of the value.
    let mut segments = head.split('%');
    let prefix = segments.next().unwrap_or("");
    let Some(mut rest) = value.strip_prefix(prefix) else {
        return false;
    };

    // Segments between wildcards must appear in order. Taking the leftmost
    // occurrence each time leaves as much text as possible for the rest.
    for segment in segments.filter(|segment| !segment.is_empty()) {
        match rest.find(segment) {
            Some(at) => rest = &rest[at + segment.len()..],
            None => return false,
        }
    }

    rest.ends_with(suffix)
}
2590
2591pub(super) fn queue_message_view_by_id(
2592    store: &UnifiedStore,
2593    queue: &str,
2594    message_id: EntityId,
2595) -> RedDBResult<Option<QueueMessageView>> {
2596    let manager = queue_manager(store, queue)?;
2597    Ok(manager
2598        .get(message_id)
2599        .and_then(queue_message_view_from_entity)
2600        .map(|mut view| {
2601            view.available_at_ns = read_message_available_at_ns(store, queue, view.id);
2602            view
2603        }))
2604}
2605
2606pub(super) fn sort_queue_messages(
2607    messages: &mut [QueueMessageView],
2608    config: &QueueRuntimeConfig,
2609    side: QueueSide,
2610) {
2611    messages.sort_by(|left, right| {
2612        if config.priority {
2613            right
2614                .priority
2615                .cmp(&left.priority)
2616                .then_with(|| match side {
2617                    QueueSide::Left => left.position.cmp(&right.position),
2618                    QueueSide::Right => right.position.cmp(&left.position),
2619                })
2620                .then_with(|| left.id.raw().cmp(&right.id.raw()))
2621        } else {
2622            match side {
2623                QueueSide::Left => left.position.cmp(&right.position),
2624                QueueSide::Right => right.position.cmp(&left.position),
2625            }
2626            .then_with(|| left.id.raw().cmp(&right.id.raw()))
2627        }
2628    });
2629}
2630
2631pub(super) fn next_queue_position(
2632    store: &UnifiedStore,
2633    queue: &str,
2634    side: QueueSide,
2635) -> RedDBResult<u64> {
2636    let messages = load_queue_message_views(store, queue)?;
2637    if messages.is_empty() {
2638        return Ok(QUEUE_POSITION_CENTER);
2639    }
2640    match side {
2641        QueueSide::Left => Ok(messages
2642            .iter()
2643            .map(|message| message.position)
2644            .min()
2645            .unwrap_or(QUEUE_POSITION_CENTER)
2646            .saturating_sub(1)),
2647        QueueSide::Right => Ok(messages
2648            .iter()
2649            .map(|message| message.position)
2650            .max()
2651            .unwrap_or(QUEUE_POSITION_CENTER)
2652            .saturating_add(1)),
2653    }
2654}
2655
2656pub(super) fn increment_queue_attempts(
2657    store: &UnifiedStore,
2658    queue: &str,
2659    message_id: EntityId,
2660) -> RedDBResult<u32> {
2661    let manager = queue_manager(store, queue)?;
2662    let mut entity = manager
2663        .get(message_id)
2664        .ok_or_else(|| RedDBError::NotFound(format!("message '{}' not found", message_id.raw())))?;
2665    match &mut entity.data {
2666        EntityData::QueueMessage(message) => {
2667            message.attempts = message.attempts.saturating_add(1);
2668            let attempts = message.attempts;
2669            manager
2670                .update(entity)
2671                .map_err(|err| RedDBError::Internal(err.to_string()))?;
2672            Ok(attempts)
2673        }
2674        _ => Err(RedDBError::Query(format!(
2675            "entity '{}' is not a queue message",
2676            message_id.raw()
2677        ))),
2678    }
2679}
2680
2681pub(super) fn queue_message_attempts(
2682    store: &UnifiedStore,
2683    queue: &str,
2684    message_id: EntityId,
2685) -> RedDBResult<u32> {
2686    Ok(queue_message_data(store, queue, message_id)?.attempts)
2687}
2688
2689pub(super) fn queue_message_max_attempts(
2690    store: &UnifiedStore,
2691    queue: &str,
2692    message_id: EntityId,
2693) -> RedDBResult<u32> {
2694    Ok(queue_message_data(store, queue, message_id)?.max_attempts)
2695}
2696
2697pub(super) fn queue_message_payload(
2698    store: &UnifiedStore,
2699    queue: &str,
2700    message_id: EntityId,
2701) -> RedDBResult<Value> {
2702    Ok(queue_message_data(store, queue, message_id)?.payload)
2703}
2704
2705pub(super) fn queue_message_pending_any(
2706    store: &UnifiedStore,
2707    queue: &str,
2708    message_id: EntityId,
2709) -> RedDBResult<bool> {
2710    Ok(!load_pending_entries(store, queue, None, Some(message_id))?.is_empty())
2711}
2712
2713pub(super) fn queue_message_pending_for_group(
2714    store: &UnifiedStore,
2715    queue: &str,
2716    group: &str,
2717    message_id: EntityId,
2718) -> RedDBResult<bool> {
2719    Ok(!load_pending_entries(store, queue, Some(group), Some(message_id))?.is_empty())
2720}
2721
2722pub(super) fn queue_message_acked_for_group(
2723    store: &UnifiedStore,
2724    queue: &str,
2725    group: &str,
2726    message_id: EntityId,
2727) -> RedDBResult<bool> {
2728    Ok(!load_ack_entries(store, queue, Some(group), Some(message_id))?.is_empty())
2729}
2730
2731fn queue_manager(
2732    store: &UnifiedStore,
2733    queue: &str,
2734) -> RedDBResult<Arc<crate::storage::unified::SegmentManager>> {
2735    store
2736        .get_collection(queue)
2737        .ok_or_else(|| RedDBError::NotFound(format!("queue '{}' not found", queue)))
2738}
2739
2740pub(super) fn queue_message_data(
2741    store: &UnifiedStore,
2742    queue: &str,
2743    message_id: EntityId,
2744) -> RedDBResult<QueueMessageData> {
2745    let manager = queue_manager(store, queue)?;
2746    let entity = manager
2747        .get(message_id)
2748        .ok_or_else(|| RedDBError::NotFound(format!("message '{}' not found", message_id.raw())))?;
2749    match entity.data {
2750        EntityData::QueueMessage(message) => Ok(message),
2751        _ => Err(RedDBError::Query(format!(
2752            "entity '{}' is not a queue message",
2753            message_id.raw()
2754        ))),
2755    }
2756}
2757
2758fn insert_meta_row(store: &UnifiedStore, fields: HashMap<String, Value>) -> RedDBResult<()> {
2759    let _ = store.get_or_create_collection(QUEUE_META_COLLECTION);
2760    store
2761        .insert_auto(
2762            QUEUE_META_COLLECTION,
2763            UnifiedEntity::new(
2764                EntityId::new(0),
2765                EntityKind::TableRow {
2766                    table: Arc::from(QUEUE_META_COLLECTION),
2767                    row_id: 0,
2768                },
2769                EntityData::Row(RowData {
2770                    columns: Vec::new(),
2771                    named: Some(fields),
2772                    schema: None,
2773                }),
2774            ),
2775        )
2776        .map_err(|err| RedDBError::Internal(err.to_string()))?;
2777    Ok(())
2778}
2779
2780pub(super) fn remove_meta_rows(store: &UnifiedStore, predicate: impl Fn(&RowData) -> bool + Sync) {
2781    let Some(manager) = store.get_collection(QUEUE_META_COLLECTION) else {
2782        return;
2783    };
2784    let rows = manager.query_all(|entity| entity.data.as_row().is_some_and(&predicate));
2785    for row in rows {
2786        let _ = store.delete(QUEUE_META_COLLECTION, row.id);
2787    }
2788}
2789
2790pub(super) fn delete_meta_entity(store: &UnifiedStore, entity_id: EntityId) {
2791    let _ = store.delete(QUEUE_META_COLLECTION, entity_id);
2792}
2793
2794fn queue_message_lock_key(queue: &str, message_id: EntityId) -> String {
2795    format!("{queue}:{}", message_id.raw())
2796}
2797
2798pub(super) fn queue_message_lock_handle(
2799    runtime: &RedDBRuntime,
2800    queue: &str,
2801    message_id: EntityId,
2802) -> Arc<parking_lot::Mutex<()>> {
2803    let key = queue_message_lock_key(queue, message_id);
2804    if let Some(lock) = runtime.inner.queue_message_locks.read().get(&key).cloned() {
2805        return lock;
2806    }
2807
2808    let mut locks = runtime.inner.queue_message_locks.write();
2809    locks
2810        .entry(key)
2811        .or_insert_with(|| Arc::new(parking_lot::Mutex::new(())))
2812        .clone()
2813}
2814
2815pub(super) fn forget_queue_message_lock(runtime: &RedDBRuntime, queue: &str, message_id: EntityId) {
2816    runtime
2817        .inner
2818        .queue_message_locks
2819        .write()
2820        .remove(&queue_message_lock_key(queue, message_id));
2821}
2822
2823fn parse_message_id(value: &str) -> RedDBResult<EntityId> {
2824    let raw = value.strip_prefix('e').unwrap_or(value);
2825    raw.parse::<u64>()
2826        .map(EntityId::new)
2827        .map_err(|_| RedDBError::Query(format!("invalid message id '{}'", value)))
2828}
2829
2830/// ADR 0026: resolve the ACK/NACK handle. When `delivery_id` is supplied,
2831/// it wins unconditionally — strict failure if the handle does not resolve
2832/// to a live pending delivery on `queue`. When only the legacy tuple is
2833/// supplied, emit a rate-limited deprecation log line and use the tuple.
2834/// At least one handle must be present.
2835pub(super) fn resolve_ack_nack_handle(
2836    store: &UnifiedStore,
2837    queue: &str,
2838    group_hint: &str,
2839    message_id_hint: &str,
2840    delivery_id: Option<&str>,
2841) -> RedDBResult<(String, EntityId)> {
2842    if let Some(did) = delivery_id {
2843        return resolve_delivery_id(store, queue, did);
2844    }
2845    if group_hint.is_empty() || message_id_hint.is_empty() {
2846        return Err(RedDBError::Query(
2847            "ACK/NACK requires either GROUP <group> '<message_id>' or WITH delivery_id = '<id>'"
2848                .to_string(),
2849        ));
2850    }
2851    log_tuple_deprecation(queue);
2852    let entity = parse_message_id(message_id_hint)?;
2853    Ok((group_hint.to_string(), entity))
2854}
2855
2856fn resolve_delivery_id(
2857    store: &UnifiedStore,
2858    queue: &str,
2859    delivery_id: &str,
2860) -> RedDBResult<(String, EntityId)> {
2861    let Some(manager) = store.get_collection(QUEUE_META_COLLECTION) else {
2862        return Err(RedDBError::Query(format!(
2863            "delivery_id '{}' does not resolve to a live pending delivery",
2864            delivery_id
2865        )));
2866    };
2867    for entity in manager.query_all(|entity| {
2868        entity.data.as_row().is_some_and(|row| {
2869            row_text(row, "kind").as_deref() == Some("queue_pending_lc")
2870                && row_text(row, "delivery_id").as_deref() == Some(delivery_id)
2871        })
2872    }) {
2873        if let Some(row) = entity.data.as_row() {
2874            let row_queue = row_text(row, "queue").unwrap_or_default();
2875            let row_group = row_text(row, "group").unwrap_or_default();
2876            let row_message = row_u64(row, "message_id").unwrap_or(0);
2877            if row_queue != queue {
2878                return Err(RedDBError::Query(format!(
2879                    "delivery_id '{}' belongs to queue '{}', not '{}'",
2880                    delivery_id, row_queue, queue
2881                )));
2882            }
2883            return Ok((row_group, EntityId::new(row_message)));
2884        }
2885    }
2886    Err(RedDBError::Query(format!(
2887        "delivery_id '{}' does not resolve to a live pending delivery",
2888        delivery_id
2889    )))
2890}
2891
2892/// Per-(connection, queue) rate-limited "tuple ACK is deprecated" log line.
2893/// One emission per minute matches ADR 0026's operational guidance.
2894fn log_tuple_deprecation(queue: &str) {
2895    use std::sync::atomic::Ordering;
2896    use std::sync::{Mutex, OnceLock};
2897    use std::time::Instant;
2898
2899    static LAST_EMIT: OnceLock<Mutex<HashMap<(u64, String), Instant>>> = OnceLock::new();
2900    const COOLDOWN: std::time::Duration = std::time::Duration::from_secs(60);
2901
2902    let map = LAST_EMIT.get_or_init(|| Mutex::new(HashMap::new()));
2903    let key = (super::impl_core::current_connection_id(), queue.to_string());
2904    let now = Instant::now();
2905    let mut guard = match map.lock() {
2906        Ok(g) => g,
2907        Err(_) => return,
2908    };
2909    let should_emit =
2910        !matches!(guard.get(&key), Some(prev) if now.duration_since(*prev) < COOLDOWN);
2911    if should_emit {
2912        guard.insert(key.clone(), now);
2913        drop(guard);
2914        TUPLE_DEPRECATION_EMITS.fetch_add(1, Ordering::Relaxed);
2915        tracing::warn!(
2916            target: "reddb::queue_lifecycle",
2917            queue = queue,
2918            connection_id = key.0,
2919            "ACK/NACK by (queue, group, message_id) tuple is deprecated; \
2920             switch to the server-issued delivery_id (ADR 0026). \
2921             The tuple path will be removed one minor release after introduction.",
2922        );
2923    }
2924}
2925
/// Number of tuple-deprecation warnings emitted since process start.
///
/// Deliberately process-wide and `pub`: the transport-bridge integration
/// tests read it to confirm that the legacy tuple path emitted a
/// deprecation while the `delivery_id` path stayed silent, without
/// plumbing a `tracing::Subscriber` through every test.
pub static TUPLE_DEPRECATION_EMITS: AtomicU64 = AtomicU64::new(0);
2933
2934fn message_id_string(message_id: EntityId) -> String {
2935    message_id.raw().to_string()
2936}
2937
2938/// Issue #917 — render a delivered queue message as the JSON object the
2939/// RedWire `QueueEventPush` frame carries. Mirrors the column shape the
2940/// SQL `QUEUE READ` projection emits (`message_id` / `payload` /
2941/// `consumer` / `delivery_count`) so the wire push and the pull path
2942/// stay client-compatible.
2943fn delivered_message_json(
2944    message: crate::runtime::queue_lifecycle::DeliveredMessage,
2945) -> crate::serde_json::Value {
2946    use crate::serde_json::{Map, Value as JsonValue};
2947    let mut obj = Map::new();
2948    obj.insert(
2949        "message_id".to_string(),
2950        JsonValue::String(message_id_string(EntityId::new(message.message_id))),
2951    );
2952    obj.insert(
2953        "payload".to_string(),
2954        crate::presentation::entity_json::storage_value_to_json(&message.payload),
2955    );
2956    obj.insert("consumer".to_string(), JsonValue::String(message.consumer));
2957    obj.insert(
2958        "delivery_count".to_string(),
2959        JsonValue::Number(message.delivery_count as f64),
2960    );
2961    JsonValue::Object(obj)
2962}
2963
2964/// Slice 10 of issue #527 — render-time scan of pending entries
2965/// per (queue, group) for `queue_pending_gauge` exposition. Walks
2966/// `red_queue_meta` live so the gauge cannot drift from the source
2967/// of truth.
2968pub(crate) fn pending_counts_by_group(
2969    store: &UnifiedStore,
2970) -> std::collections::BTreeMap<(String, String), u64> {
2971    let mut counts: std::collections::BTreeMap<(String, String), u64> =
2972        std::collections::BTreeMap::new();
2973    let Some(manager) = store.get_collection(QUEUE_META_COLLECTION) else {
2974        return counts;
2975    };
2976    for entity in manager.query_all(|entity| {
2977        entity
2978            .data
2979            .as_row()
2980            .is_some_and(|row| row_text(row, "kind").as_deref() == Some("queue_pending"))
2981    }) {
2982        if let Some(row) = entity.data.as_row() {
2983            let queue = row_text(row, "queue");
2984            let group = row_text(row, "group");
2985            if let (Some(q), Some(g)) = (queue, group) {
2986                *counts.entry((q, g)).or_insert(0) += 1;
2987            }
2988        }
2989    }
2990    counts
2991}
2992
2993pub(super) fn row_text(row: &RowData, field: &str) -> Option<String> {
2994    match row.get_field(field)?.clone() {
2995        Value::Text(value) => Some(value.to_string()),
2996        Value::NodeRef(value) => Some(value),
2997        Value::EdgeRef(value) => Some(value),
2998        Value::TableRef(value) => Some(value),
2999        _ => None,
3000    }
3001}
3002
3003pub(super) fn row_u64(row: &RowData, field: &str) -> Option<u64> {
3004    match row.get_field(field)?.clone() {
3005        Value::UnsignedInteger(value) => Some(value),
3006        Value::Integer(value) if value >= 0 => Some(value as u64),
3007        Value::Float(value) if value >= 0.0 => Some(value as u64),
3008        Value::Text(value) => value.parse().ok(),
3009        _ => None,
3010    }
3011}
3012
3013fn row_bool(row: &RowData, field: &str) -> Option<bool> {
3014    match row.get_field(field)?.clone() {
3015        Value::Boolean(value) => Some(value),
3016        Value::Text(value) => match value.to_ascii_lowercase().as_str() {
3017            "true" => Some(true),
3018            "false" => Some(false),
3019            _ => None,
3020        },
3021        _ => None,
3022    }
3023}
3024
3025fn queue_collection_contract(
3026    name: &str,
3027    priority: bool,
3028    ttl_ms: Option<u64>,
3029) -> crate::physical::CollectionContract {
3030    let now = current_unix_ms();
3031    let mut context_index_fields = Vec::new();
3032    if priority {
3033        context_index_fields.push("priority".to_string());
3034    }
3035
3036    crate::physical::CollectionContract {
3037        name: name.to_string(),
3038        declared_model: crate::catalog::CollectionModel::Queue,
3039        schema_mode: crate::catalog::SchemaMode::Dynamic,
3040        origin: crate::physical::ContractOrigin::Explicit,
3041        version: 1,
3042        created_at_unix_ms: now,
3043        updated_at_unix_ms: now,
3044        default_ttl_ms: ttl_ms,
3045        vector_dimension: None,
3046        vector_metric: None,
3047        context_index_fields,
3048        declared_columns: Vec::new(),
3049        table_def: None,
3050        timestamps_enabled: false,
3051        context_index_enabled: false,
3052        metrics_raw_retention_ms: None,
3053        metrics_rollup_policies: Vec::new(),
3054        metrics_tenant_identity: None,
3055        metrics_namespace: None,
3056        // Queues manipulate messages via push/pop/ack — the row DML
3057        // paths never apply. Flag it as append_only so inadvertent
3058        // `UPDATE/DELETE FROM queue_name` statements fail loudly.
3059        append_only: true,
3060        subscriptions: Vec::new(),
3061        analytics_config: Vec::new(),
3062        session_key: None,
3063        session_gap_ms: None,
3064        retention_duration_ms: None,
3065        analytical_storage: None,
3066    }
3067}
3068
/// Wall-clock time as milliseconds elapsed since the Unix epoch.
///
/// Yields `0` if the system clock reports a time before the epoch.
fn current_unix_ms() -> u128 {
    use std::time::{SystemTime, UNIX_EPOCH};

    match SystemTime::now().duration_since(UNIX_EPOCH) {
        Ok(since_epoch) => since_epoch.as_millis(),
        Err(_) => 0,
    }
}
3075
3076pub(super) fn now_ns() -> u64 {
3077    std::time::SystemTime::now()
3078        .duration_since(std::time::UNIX_EPOCH)
3079        .unwrap_or_default()
3080        .as_nanos() as u64
3081}
3082
3083pub(super) fn queue_message_ttl_metadata(ttl_ms: u64) -> Metadata {
3084    queue_message_metadata(Some(ttl_ms), None)
3085}
3086
3087/// Build the per-message metadata row attached to a queue message. Both
3088/// fields are optional — `_ttl_ms` carries the per-message TTL (existing
3089/// behaviour) and `_available_at_ns` carries the first-delivery instant
3090/// for delayed messages (issue #722). When both are present they share
3091/// the same row, since `UnifiedStore::set_metadata` replaces the entry.
3092pub(super) fn queue_message_metadata(
3093    ttl_ms: Option<u64>,
3094    available_at_ns: Option<u64>,
3095) -> Metadata {
3096    let mut fields = HashMap::new();
3097    if let Some(ttl_ms) = ttl_ms {
3098        fields.insert(
3099            "_ttl_ms".to_string(),
3100            if ttl_ms <= i64::MAX as u64 {
3101                MetadataValue::Int(ttl_ms as i64)
3102            } else {
3103                MetadataValue::Timestamp(ttl_ms)
3104            },
3105        );
3106    }
3107    if let Some(at_ns) = available_at_ns {
3108        fields.insert(
3109            "_available_at_ns".to_string(),
3110            MetadataValue::Timestamp(at_ns),
3111        );
3112    }
3113    Metadata::with_fields(fields)
3114}
3115
3116/// Smallest future `available_at_ns` among messages currently sitting
3117/// on `queue`. Used by `QUEUE READ … WAIT` (issue #722) to cap the
3118/// condvar park horizon: without this, a waiter on a quiet queue with
3119/// only delayed messages would never wake when one became due, since
3120/// the wait registry is only notified by producer commits. Returns
3121/// `None` when no future-dated message exists (the common case — the
3122/// caller falls back to the user-supplied wait budget).
3123pub(super) fn earliest_future_available_at(store: &UnifiedStore, queue: &str) -> Option<u64> {
3124    let now_ns = now_ns();
3125    let views = load_queue_message_views_with_runtime(None, store, queue).ok()?;
3126    views
3127        .iter()
3128        .filter_map(|v| v.available_at_ns)
3129        .filter(|at| *at > now_ns)
3130        .min()
3131}
3132
3133/// Update the `_available_at_ns` metadata for a queue message in place
3134/// without dropping any `_ttl_ms` already present (issue #723 — used by
3135/// NACK retry delay). `available_at_ns = None` clears the field. The
3136/// `fallback_ttl_ms` argument is consulted only when the existing
3137/// metadata row carries no `_ttl_ms` — keeps the per-message TTL in
3138/// sync with the queue default in the common case where no per-message
3139/// TTL was set explicitly. Tolerant of missing collections / entities:
3140/// returns `Ok(())` rather than failing the surrounding NACK if the
3141/// underlying message has gone away between resolution and metadata
3142/// update (a benign race; the next delivery cycle reflects truth).
3143pub(super) fn set_message_available_at_ns(
3144    store: &UnifiedStore,
3145    queue: &str,
3146    message_id: EntityId,
3147    available_at_ns: Option<u64>,
3148    fallback_ttl_ms: Option<u64>,
3149) -> RedDBResult<()> {
3150    let existing_ttl_ms = store
3151        .get_metadata(queue, message_id)
3152        .and_then(|md| match md.get("_ttl_ms")? {
3153            MetadataValue::Int(i) if *i >= 0 => Some(*i as u64),
3154            MetadataValue::Timestamp(t) => Some(*t),
3155            _ => None,
3156        })
3157        .or(fallback_ttl_ms);
3158    let metadata = queue_message_metadata(existing_ttl_ms, available_at_ns);
3159    match store.set_metadata(queue, message_id, metadata) {
3160        Ok(()) => Ok(()),
3161        Err(crate::storage::StoreError::CollectionNotFound(_)) => Ok(()),
3162        Err(err) => Err(RedDBError::Internal(err.to_string())),
3163    }
3164}
3165
3166/// Read the `_available_at_ns` metadata for a queue message. Returns
3167/// `None` for messages with no delay (the common case) or when the
3168/// metadata row is missing entirely.
3169pub(super) fn read_message_available_at_ns(
3170    store: &UnifiedStore,
3171    queue: &str,
3172    message_id: EntityId,
3173) -> Option<u64> {
3174    let md = store.get_metadata(queue, message_id)?;
3175    match md.get("_available_at_ns")? {
3176        MetadataValue::Timestamp(t) => Some(*t),
3177        MetadataValue::Int(i) if *i >= 0 => Some(*i as u64),
3178        _ => None,
3179    }
3180}
3181
3182/// Rough payload byte estimate for outbox watermark tracking.
3183fn estimate_payload_bytes(payload: &Value) -> u64 {
3184    match payload {
3185        Value::Json(v) => v.len() as u64,
3186        Value::Text(s) => s.len() as u64,
3187        _ => 64,
3188    }
3189}
3190
#[cfg(test)]
mod presence_integration_tests {
    use super::*;
    use crate::storage::queue::presence::{PresenceState, DEFAULT_PRESENCE_TTL_MS};
    use crate::{RedDBOptions, RedDBRuntime};

    /// Issue #742 acceptance: a `QUEUE READ` registers (and refreshes)
    /// consumer presence. The snapshot must show exactly the `(queue,
    /// group, consumer)` triple named by the read, in the `Active` state.
    #[test]
    fn queue_read_emits_consumer_presence_heartbeat() {
        let runtime = RedDBRuntime::with_options(RedDBOptions::in_memory()).unwrap();
        for statement in [
            "CREATE QUEUE tasks",
            "QUEUE GROUP CREATE tasks workers",
            "QUEUE PUSH tasks {'job':'a'}",
            "QUEUE READ tasks GROUP workers CONSUMER w1",
        ] {
            runtime.execute_query(statement).unwrap();
        }

        let presence = runtime.queue_consumer_presence_snapshot(DEFAULT_PRESENCE_TTL_MS);
        assert_eq!(presence.len(), 1, "exactly one heartbeat recorded");
        let heartbeat = &presence[0];
        assert_eq!(heartbeat.queue, "tasks");
        assert_eq!(heartbeat.group, "workers");
        assert_eq!(heartbeat.consumer, "w1");
        assert_eq!(heartbeat.state, PresenceState::Active);

        let active = runtime.queue_active_consumer_counts(DEFAULT_PRESENCE_TTL_MS);
        let key = ("tasks".to_string(), "workers".to_string());
        assert_eq!(active[&key], 1, "active count reflects the live consumer");
    }

    /// Issue #742 acceptance: a read that delivers nothing still counts
    /// as a heartbeat — a consumer is alive whether or not messages are
    /// pending for it.
    #[test]
    fn empty_queue_read_still_heartbeats() {
        let runtime = RedDBRuntime::with_options(RedDBOptions::in_memory()).unwrap();
        // Nothing is pushed, so the read below hits an empty queue.
        for statement in [
            "CREATE QUEUE empty_q",
            "QUEUE GROUP CREATE empty_q workers",
            "QUEUE READ empty_q GROUP workers CONSUMER w1",
        ] {
            runtime.execute_query(statement).unwrap();
        }

        let presence = runtime.queue_consumer_presence_snapshot(DEFAULT_PRESENCE_TTL_MS);
        assert_eq!(
            presence.len(),
            1,
            "empty read still registers consumer presence"
        );
        let heartbeat = &presence[0];
        assert_eq!(heartbeat.state, PresenceState::Active);
        assert_eq!(
            heartbeat.lease_count, 0,
            "no messages delivered → zero leases"
        );
    }
}