Skip to main content

reddb_server/runtime/
impl_queue.rs

1//! Queue DDL and command execution
2
3use std::collections::{HashMap, HashSet};
4use std::sync::atomic::{AtomicU64, Ordering};
5use std::sync::Arc;
6
7use crate::runtime::audit_log::{AuditAuthSource, AuditEvent, AuditFieldEscaper, Outcome};
8use crate::runtime::impl_core::{
9    clear_current_auth_identity, clear_current_tenant, current_auth_identity, current_tenant,
10    set_current_auth_identity, set_current_tenant,
11};
12use crate::storage::queue::QueueMode;
13use crate::storage::unified::entity::{QueueMessageData, RowData};
14use crate::storage::unified::{Metadata, MetadataValue, UnifiedStore};
15
16use super::*;
17
18use super::primary_queue_store::PrimaryQueueStore;
19use super::queue_lifecycle::{QueueLifecycle, RetirementOutcome};
20use crate::storage::queue::lifecycle::{
21    QueueSide as LcQueueSide, QueueStore as _, QueueStoreError, QueueTxn,
22};
23
24/// Build a [`QueueLifecycle`] backed by a fresh [`PrimaryQueueStore`] for
25/// the given `queue`, plus a `QueueTxn` bound to the live runtime
26/// connection. The store inside the lifecycle and the standalone
27/// [`PrimaryQueueStore`] returned for ack/nack lookups share the same
28/// underlying [`UnifiedStore`] — calls against either are observable on
29/// the other.
30pub(super) fn runtime_lifecycle(
31    runtime: &RedDBRuntime,
32    queue: &str,
33) -> (
34    QueueLifecycle<PrimaryQueueStore>,
35    PrimaryQueueStore,
36    QueueTxn,
37) {
38    let primary_for_lookup = PrimaryQueueStore::new(runtime.clone());
39    let primary_for_lifecycle = PrimaryQueueStore::new(runtime.clone());
40    let txn = primary_for_lifecycle.new_txn();
41    let cfg = primary_for_lifecycle.lifecycle_config(queue);
42    (
43        QueueLifecycle::new(primary_for_lifecycle, cfg),
44        primary_for_lookup,
45        txn,
46    )
47}
48
49/// Slice C of PRD #718 — error surfaced to callers when the wait
50/// registry is cancelled (server shutdown) while a `QUEUE READ … WAIT`
51/// is parked. Kept as a plain `RedDBError::Query` so transports
52/// inherit the message unchanged — there is no separate `Cancelled`
53/// variant on the public error today.
54pub(crate) const QUEUE_READ_WAIT_CANCELLED: &str =
55    "QUEUE READ WAIT cancelled — server shutting down";
56
57/// Slice B of PRD #718 — `red.config` key naming the maximum WAIT
58/// budget the runtime will honour. Values above the cap are rejected
59/// before any waiter is registered.
60pub(crate) const QUEUE_MAX_WAIT_MS_CONFIG_KEY: &str = "red.config.queue.max_wait_ms";
61
62/// Default cap when the operator has not set
63/// [`QUEUE_MAX_WAIT_MS_CONFIG_KEY`] — 60 seconds, in milliseconds.
64pub(crate) const QUEUE_MAX_WAIT_MS_DEFAULT: u64 = 60_000;
65
66/// Outcome of the async live queue-wait edge ([`RedDBRuntime::redwire_queue_wait_json`],
67/// issue #919). Carries the three non-error terminal states the RedWire
68/// session maps to distinct frames, so a timeout never aliases an empty
69/// delivery and a cancellation never aliases a timeout:
70///   - `Delivered` → one `QueueEventPush` per message.
71///   - `TimedOut`  → a distinct `QueueWaitTimeout` frame.
72///   - `Cancelled` → a `StreamError` with the cancellation code.
73///
74/// A genuine runtime failure (bad queue, read error) stays an `Err` on
75/// the surrounding `RedDBResult`.
76#[derive(Debug)]
77pub(crate) enum RedwireWaitOutcome {
78    Delivered(Vec<crate::serde_json::Value>),
79    TimedOut,
80    Cancelled,
81}
82
83/// Slice C of PRD #718 — scope key for the queue wait registry.
84/// Today every connection in the process shares a single namespace;
85/// the helper exists so multi-tenant scoping (e.g. tenant id) can be
86/// threaded through later without touching every call site.
87pub(super) fn queue_wait_scope() -> String {
88    crate::runtime::impl_core::current_tenant().unwrap_or_default()
89}
90
91fn with_redwire_wait_context<T>(
92    auth_identity: &Option<(String, crate::auth::Role)>,
93    tenant: &Option<String>,
94    f: impl FnOnce() -> T,
95) -> T {
96    let previous_auth = current_auth_identity();
97    let previous_tenant = current_tenant();
98    match tenant {
99        Some(t) => set_current_tenant(t.clone()),
100        None => clear_current_tenant(),
101    }
102    match auth_identity {
103        Some((username, role)) => set_current_auth_identity(username.clone(), *role),
104        None => clear_current_auth_identity(),
105    }
106    let result = f();
107    match previous_tenant {
108        Some(t) => set_current_tenant(t),
109        None => clear_current_tenant(),
110    }
111    match previous_auth {
112        Some((username, role)) => set_current_auth_identity(username, role),
113        None => clear_current_auth_identity(),
114    }
115    result
116}
117
118/// Convert a lifecycle `QueueSide` view into the AST flavour we accept
119/// from `QueueCommand` callers. Both enums are isomorphic but live in
120/// different modules.
121fn ast_side_to_lc(side: crate::storage::query::ast::QueueSide) -> LcQueueSide {
122    use crate::storage::query::ast::QueueSide as Ast;
123    match side {
124        Ast::Left => LcQueueSide::Left,
125        Ast::Right => LcQueueSide::Right,
126    }
127}
128
129/// Map a `QueueStoreError` (returned by lifecycle methods) onto the
130/// runtime-facing `RedDBError`. Mirrors the wire-error shapes the legacy
131/// `queue_delivery::*` helpers produced.
132fn map_qse(err: QueueStoreError) -> RedDBError {
133    match err {
134        QueueStoreError::UnknownDelivery(id) => RedDBError::NotFound(format!(
135            "delivery_id '{id}' does not resolve to a live pending delivery"
136        )),
137        QueueStoreError::UnknownQueue(q) => RedDBError::NotFound(format!("queue '{q}' not found")),
138        QueueStoreError::ReplicaImmutable => {
139            RedDBError::Internal("replica QueueStore is immutable".to_string())
140        }
141    }
142}
143
144// ---------------------------------------------------------------------------
145// Outbox metrics (exposed via /metrics)
146// ---------------------------------------------------------------------------
147
148/// Total event push attempts that failed (queue full or other error) and
149/// triggered DLQ routing.
150pub static EVENTS_DRAIN_RETRIES_TOTAL: AtomicU64 = AtomicU64::new(0);
151
152/// Total events routed to the dead-letter queue.
153pub static EVENTS_DLQ_TOTAL: AtomicU64 = AtomicU64::new(0);
154
155/// Total events successfully enqueued to their target queue.
156pub static EVENTS_ENQUEUED_TOTAL: AtomicU64 = AtomicU64::new(0);
157
158/// Warn when total estimated outbox payload bytes exceed this value (1 GiB).
159const OUTBOX_WARN_BYTES: u64 = 1 << 30;
160
161/// Route all new events to DLQ when estimated outbox exceeds this value (10 GiB).
162const OUTBOX_MAX_BYTES: u64 = 10 * (1 << 30);
163
164/// Running estimate of bytes pending in event queues (approximate; not decremented on consume).
165static OUTBOX_APPROX_BYTES: AtomicU64 = AtomicU64::new(0);
166
167const QUEUE_META_COLLECTION: &str = "red_queue_meta";
168const QUEUE_POSITION_CENTER: u64 = u64::MAX / 2;
169const WORK_DEFAULT_GROUP: &str = "_work_default";
170const FANOUT_GROUP_PREFIX: &str = "_fanout_";
171
172#[derive(Debug, Clone)]
173pub(super) struct QueueRuntimeConfig {
174    pub(super) mode: QueueMode,
175    pub(super) priority: bool,
176    pub(super) max_size: Option<usize>,
177    pub(super) ttl_ms: Option<u64>,
178    pub(super) dlq: Option<String>,
179    pub(super) max_attempts: u32,
180    pub(super) lock_deadline_ms: u64,
181    pub(super) in_flight_cap_per_group: u32,
182    /// Default retry delay (issue #723) applied to NACK-requeued
183    /// messages before they become re-deliverable. `None` keeps the
184    /// pre-#723 immediate-requeue behaviour. Overridden per-failure by
185    /// an authorized `NACK ... WITH DELAY <duration>`.
186    pub(super) retry_delay_ms: Option<u64>,
187}
188
189#[derive(Debug, Clone)]
190struct QueueGroupEntry {
191    entity_id: EntityId,
192    group: String,
193}
194
195#[derive(Debug, Clone)]
196pub(super) struct QueuePendingEntry {
197    pub(super) entity_id: EntityId,
198    group: String,
199    pub(super) message_id: EntityId,
200    consumer: String,
201    pub(super) delivered_at_ns: u64,
202    pub(super) delivery_count: u32,
203}
204
205#[derive(Debug, Clone)]
206pub(super) struct QueueAckEntry {
207    entity_id: EntityId,
208    group: String,
209    pub(super) message_id: EntityId,
210}
211
212#[derive(Debug, Clone)]
213pub(super) struct QueueMessageView {
214    pub(super) id: EntityId,
215    position: u64,
216    priority: i32,
217    pub(super) payload: Value,
218    attempts: u32,
219    pub(super) max_attempts: u32,
220    enqueued_at_ns: u64,
221    /// First-delivery instant for delayed messages (issue #722). `None`
222    /// means immediate availability. Sourced from the
223    /// `_available_at_ns` metadata field, populated on push.
224    pub(super) available_at_ns: Option<u64>,
225}
226
227impl QueueMessageView {
228    /// Whether this message is currently deliverable. Messages whose
229    /// `available_at_ns` lies in the future remain durable and
230    /// inspectable but are filtered out of `QUEUE READ` / `QUEUE POP`
231    /// projections.
232    pub(super) fn is_available_now(&self) -> bool {
233        match self.available_at_ns {
234            Some(at) => at <= now_ns(),
235            None => true,
236        }
237    }
238}
239
240impl RedDBRuntime {
241    /// Slice C of PRD #718 — non-blocking `group_read` plus optional
242    /// `WAIT <duration>` retry. When `wait_ms` is `None` this is the
243    /// pre-slice-C synchronous read. When `Some`, an immediate empty
244    /// projection parks the caller on the shared
245    /// [`crate::runtime::queue_wait_registry::QueueWaitRegistry`] and
246    /// retries on wake until the deadline. Timeout returns an empty
247    /// projection (zero records, no error). Shutdown cancellation
248    /// returns [`QUEUE_READ_WAIT_CANCELLED`].
249    pub(super) fn group_read_with_optional_wait(
250        &self,
251        queue: &str,
252        group: &str,
253        consumer: &str,
254        count: usize,
255        wait_ms: Option<u64>,
256    ) -> RedDBResult<Vec<crate::runtime::queue_lifecycle::DeliveredMessage>> {
257        let do_read =
258            |runtime: &RedDBRuntime| -> RedDBResult<Vec<crate::runtime::queue_lifecycle::DeliveredMessage>> {
259                let (lifecycle, _ps, txn) = runtime_lifecycle(runtime, queue);
260                lifecycle
261                    .group_read(&txn, queue, group, consumer, count)
262                    .map_err(map_qse)
263            };
264
265        let delivered = do_read(self)?;
266        let Some(wait_ms) = wait_ms else {
267            return Ok(delivered);
268        };
269        if !delivered.is_empty() {
270            return Ok(delivered);
271        }
272        // Empty under WAIT: park on the registry. WAIT 0 collapses to
273        // a single re-probe of the registry's current state — useful
274        // for tests but the timeout path returns immediately.
275        //
276        // Telemetry (slice D / PRD #718 / #729): we record exactly one
277        // `wait_started` increment at entry, and exactly one terminal
278        // outcome increment + histogram observation at exit, for the
279        // (scope, queue) labels. The histogram measures wall-clock
280        // started→resolved across all re-park iterations of this call.
281        let registry = self.queue_wait_registry();
282        let scope = queue_wait_scope();
283        let deadline = std::time::Instant::now() + std::time::Duration::from_millis(wait_ms);
284        let telemetry = self.queue_telemetry();
285        telemetry.record_wait_started(&scope, queue);
286        let wait_start = std::time::Instant::now();
287        let observe = |outcome: crate::runtime::queue_telemetry::WaitOutcomeLabel| {
288            let elapsed_ms = wait_start.elapsed().as_millis().min(u128::from(u64::MAX)) as u64;
289            telemetry.record_wait_outcome(&scope, queue, outcome, elapsed_ms);
290        };
291        loop {
292            // Snapshot BEFORE the re-probe so a notify that fires
293            // between the probe and the park bumps the generation and
294            // wait_until returns Woken without ever blocking.
295            let snapshot = registry.snapshot(&scope, queue);
296            let delivered = do_read(self)?;
297            if !delivered.is_empty() {
298                observe(crate::runtime::queue_telemetry::WaitOutcomeLabel::Woken);
299                return Ok(delivered);
300            }
301            if registry.is_cancelled() {
302                observe(crate::runtime::queue_telemetry::WaitOutcomeLabel::Cancelled);
303                return Err(RedDBError::Query(QUEUE_READ_WAIT_CANCELLED.to_string()));
304            }
305            if std::time::Instant::now() >= deadline {
306                observe(crate::runtime::queue_telemetry::WaitOutcomeLabel::Timeout);
307                return Ok(Vec::new());
308            }
309            // Issue #722: a delayed message becomes deliverable when its
310            // `available_at_ns` passes, but the registry only wakes on
311            // producer commits — a quiet queue with a future due-time
312            // would otherwise sit on the condvar until the user budget
313            // expired. Cap the park horizon at the soonest future
314            // `available_at_ns` so the next loop iteration probes the
315            // queue at-or-just-after the message becomes due. A
316            // `Timeout` from the capped park is not the final answer; we
317            // loop and re-probe before deciding the user budget is up.
318            let park_deadline = match earliest_future_available_at(&self.inner.db.store(), queue) {
319                Some(at_ns) => {
320                    let now_ns = now_ns();
321                    if at_ns <= now_ns {
322                        // Already due; re-probe immediately.
323                        deadline.min(std::time::Instant::now())
324                    } else {
325                        let wait_ns = at_ns - now_ns;
326                        let due_instant =
327                            std::time::Instant::now() + std::time::Duration::from_nanos(wait_ns);
328                        deadline.min(due_instant)
329                    }
330                }
331                None => deadline,
332            };
333            match registry.wait_until(&snapshot, park_deadline) {
334                crate::runtime::queue_wait_registry::WaitOutcome::Woken => continue,
335                crate::runtime::queue_wait_registry::WaitOutcome::Timeout => {
336                    // If this was the user-supplied deadline, give up;
337                    // otherwise loop and re-probe (a delayed message may
338                    // have just become due).
339                    if std::time::Instant::now() >= deadline {
340                        observe(crate::runtime::queue_telemetry::WaitOutcomeLabel::Timeout);
341                        return Ok(Vec::new());
342                    }
343                    continue;
344                }
345                crate::runtime::queue_wait_registry::WaitOutcome::Cancelled => {
346                    observe(crate::runtime::queue_telemetry::WaitOutcomeLabel::Cancelled);
347                    return Err(RedDBError::Query(QUEUE_READ_WAIT_CANCELLED.to_string()));
348                }
349            }
350        }
351    }
352
353    /// Issue #917 — async live-wait edge used by the RedWire session.
354    ///
355    /// Unlike [`group_read_with_optional_wait`](Self::group_read_with_optional_wait)
356    /// (the synchronous HTTP/condvar caller), this parks on the
357    /// registry's async wake head and never holds a blocking OS thread:
358    /// the awaiting tokio worker is released back to the runtime for the
359    /// wait duration. On every wake it re-probes the *normal* queue
360    /// delivery path (`group_read`), so a delivered message is genuinely
361    /// claimed, not merely observed. Returns the delivered messages
362    /// rendered as JSON values (so the transport edge stays free of
363    /// runtime queue types); [`RedwireWaitOutcome::TimedOut`] means the
364    /// deadline elapsed without a delivery (issue #919 surfaces this as
365    /// a distinct timeout frame rather than an empty push), and a
366    /// cancellation surfaces as [`RedwireWaitOutcome::Cancelled`] — the
367    /// async analogue of the sync path's [`QUEUE_READ_WAIT_CANCELLED`].
368    /// A genuine runtime failure stays an `Err`.
369    pub(crate) async fn redwire_queue_wait_json(
370        &self,
371        queue: &str,
372        group: Option<&str>,
373        consumer: &str,
374        count: usize,
375        wait_ms: u64,
376        auth_identity: Option<(String, crate::auth::Role)>,
377        tenant: Option<String>,
378    ) -> RedDBResult<RedwireWaitOutcome> {
379        let group_owned: RedDBResult<String> =
380            with_redwire_wait_context(&auth_identity, &tenant, || {
381                let expr = crate::storage::query::ast::QueryExpr::QueueCommand(
382                    crate::storage::query::ast::QueueCommand::GroupRead {
383                        queue: queue.to_string(),
384                        group: group.map(str::to_string),
385                        consumer: consumer.to_string(),
386                        count,
387                        wait_ms: Some(wait_ms),
388                    },
389                );
390                self.check_query_privilege(&expr)
391                    .map_err(RedDBError::Query)?;
392                let store = self.inner.db.store();
393                ensure_queue_exists(store.as_ref(), queue)?;
394                let config = load_queue_config(store.as_ref(), queue);
395                resolve_read_group(store.as_ref(), queue, group, consumer, &config)
396            });
397        let group_owned = group_owned?;
398        let group_ref = group_owned.as_str();
399
400        let do_read =
401            |runtime: &RedDBRuntime| -> RedDBResult<Vec<crate::runtime::queue_lifecycle::DeliveredMessage>> {
402                with_redwire_wait_context(&auth_identity, &tenant, || {
403                    let (lifecycle, _ps, txn) = runtime_lifecycle(runtime, queue);
404                    lifecycle
405                        .group_read(&txn, queue, group_ref, consumer, count)
406                        .map_err(map_qse)
407                })
408            };
409
410        let render = |delivered: Vec<crate::runtime::queue_lifecycle::DeliveredMessage>| {
411            RedwireWaitOutcome::Delivered(
412                delivered.into_iter().map(delivered_message_json).collect(),
413            )
414        };
415
416        // Fast path: a message is already deliverable at open time.
417        let delivered = do_read(self)?;
418        if !delivered.is_empty() {
419            return Ok(render(delivered));
420        }
421
422        let registry = self.queue_wait_registry();
423        let scope = with_redwire_wait_context(&auth_identity, &tenant, queue_wait_scope);
424        let deadline = std::time::Instant::now() + std::time::Duration::from_millis(wait_ms);
425        let telemetry = self.queue_telemetry();
426        telemetry.record_wait_started(&scope, queue);
427        let wait_start = std::time::Instant::now();
428        tracing::debug!(
429            target: "reddb::redwire::queue_wait",
430            queue,
431            group = group_ref,
432            consumer,
433            count,
434            wait_ms,
435            scope = scope.as_str(),
436            "redwire queue wait parked"
437        );
438        let observe = |outcome: crate::runtime::queue_telemetry::WaitOutcomeLabel| {
439            let elapsed_ms = wait_start.elapsed().as_millis().min(u128::from(u64::MAX)) as u64;
440            telemetry.record_wait_outcome(&scope, queue, outcome, elapsed_ms);
441            tracing::debug!(
442                target: "reddb::redwire::queue_wait",
443                queue,
444                group = group_ref,
445                consumer,
446                count,
447                wait_ms,
448                scope = scope.as_str(),
449                outcome = outcome.as_str(),
450                duration_ms = elapsed_ms,
451                "redwire queue wait resolved"
452            );
453        };
454        loop {
455            // Register the async waiter (snapshot the generation) BEFORE
456            // the re-probe so a notify landing between probe and park is
457            // observed as a generation move rather than a lost wake.
458            let waiter = registry.async_waiter(&scope, queue);
459            let delivered = do_read(self)?;
460            if !delivered.is_empty() {
461                observe(crate::runtime::queue_telemetry::WaitOutcomeLabel::Woken);
462                return Ok(render(delivered));
463            }
464            if registry.is_cancelled() {
465                observe(crate::runtime::queue_telemetry::WaitOutcomeLabel::Cancelled);
466                return Ok(RedwireWaitOutcome::Cancelled);
467            }
468            if std::time::Instant::now() >= deadline {
469                observe(crate::runtime::queue_telemetry::WaitOutcomeLabel::Timeout);
470                return Ok(RedwireWaitOutcome::TimedOut);
471            }
472            let park_deadline = match earliest_future_available_at(&self.inner.db.store(), queue) {
473                Some(at_ns) => {
474                    let now_ns = now_ns();
475                    if at_ns <= now_ns {
476                        deadline.min(std::time::Instant::now())
477                    } else {
478                        let wait_ns = at_ns - now_ns;
479                        let due_instant =
480                            std::time::Instant::now() + std::time::Duration::from_nanos(wait_ns);
481                        deadline.min(due_instant)
482                    }
483                }
484                None => deadline,
485            };
486            // The async waiter (and its `Arc<Slot>` clone) is a local
487            // dropped on every return below, so an expired or cancelled
488            // wait releases its registry slot reference and frees the
489            // tokio worker the moment this future resolves (AC #4).
490            match registry.wait_until_async(&waiter, park_deadline).await {
491                crate::runtime::queue_wait_registry::WaitOutcome::Woken => continue,
492                crate::runtime::queue_wait_registry::WaitOutcome::Timeout => {
493                    if std::time::Instant::now() >= deadline {
494                        observe(crate::runtime::queue_telemetry::WaitOutcomeLabel::Timeout);
495                        return Ok(RedwireWaitOutcome::TimedOut);
496                    }
497                    continue;
498                }
499                crate::runtime::queue_wait_registry::WaitOutcome::Cancelled => {
500                    observe(crate::runtime::queue_telemetry::WaitOutcomeLabel::Cancelled);
501                    return Ok(RedwireWaitOutcome::Cancelled);
502                }
503            }
504        }
505    }
506
507    /// Reject a live queue-wait open whose requested budget exceeds the
508    /// server's maximum wait cap (issue #919), mirroring the SQL
509    /// `QUEUE READ … WAIT` cap (slice B of PRD #718). Returns the
510    /// operator-actionable message (naming the `red.config` key and the
511    /// active cap) when `wait_ms` is over the cap, or `Ok(())` to
512    /// proceed. The transport calls this *before* spawning the wait
513    /// task, so an over-cap request is refused with an explicit error
514    /// and never parks — not silently shortened (AC #3).
515    pub(crate) fn redwire_queue_wait_cap_check(&self, wait_ms: u64) -> Result<(), String> {
516        let cap = self.config_u64(QUEUE_MAX_WAIT_MS_CONFIG_KEY, QUEUE_MAX_WAIT_MS_DEFAULT);
517        if wait_ms > cap {
518            Err(format!(
519                "queue-wait WAIT {wait_ms}ms exceeds server cap {QUEUE_MAX_WAIT_MS_CONFIG_KEY} = {cap}ms"
520            ))
521        } else {
522            Ok(())
523        }
524    }
525
526    pub(crate) fn enqueue_event_payload(
527        &self,
528        queue: &str,
529        payload: Value,
530    ) -> RedDBResult<EntityId> {
531        let store = self.inner.db.store();
532        // Auto-create the queue if it does not exist yet.
533        if store.get_collection(queue).is_none() {
534            crate::runtime::impl_ddl::ensure_event_target_queue_pub(self, queue)?;
535        }
536
537        // Estimate payload bytes for outbox watermark checks.
538        let payload_bytes = estimate_payload_bytes(&payload);
539        let outbox_bytes = OUTBOX_APPROX_BYTES.fetch_add(payload_bytes, Ordering::Relaxed);
540
541        // Hard limit: route directly to DLQ without even trying.
542        if outbox_bytes > OUTBOX_MAX_BYTES {
543            OUTBOX_APPROX_BYTES.fetch_sub(payload_bytes, Ordering::Relaxed);
544            EVENTS_DRAIN_RETRIES_TOTAL.fetch_add(1, Ordering::Relaxed);
545            return self.route_event_to_outbox_dlq(queue, payload, "outbox_max_bytes_exceeded");
546        }
547
548        // Soft limit: warn once per crossing.
549        if outbox_bytes > OUTBOX_WARN_BYTES && outbox_bytes - payload_bytes <= OUTBOX_WARN_BYTES {
550            tracing::warn!(
551                outbox_bytes,
552                warn_threshold = OUTBOX_WARN_BYTES,
553                "event outbox approaching capacity warning threshold"
554            );
555            crate::telemetry::operator_event::OperatorEvent::OutboxDlqActivated {
556                queue: queue.to_string(),
557                dlq: format!("{queue}_outbox_dlq"),
558                reason: "outbox_warn_bytes_exceeded".to_string(),
559            }
560            .emit_global();
561        }
562
563        let config = load_queue_config(store.as_ref(), queue);
564
565        // If the target queue has a max_size and is full, route to DLQ.
566        if let Some(max_size) = config.max_size {
567            let current_len = load_queue_message_views(store.as_ref(), queue)
568                .unwrap_or_default()
569                .len();
570            if current_len >= max_size {
571                OUTBOX_APPROX_BYTES.fetch_sub(payload_bytes, Ordering::Relaxed);
572                EVENTS_DRAIN_RETRIES_TOTAL.fetch_add(1, Ordering::Relaxed);
573                return self.route_event_to_outbox_dlq(queue, payload, "queue_full");
574            }
575            // Warn at 80% capacity.
576            if current_len * 10 >= max_size * 8 {
577                tracing::warn!(
578                    queue = %queue,
579                    size = current_len,
580                    max = max_size,
581                    "event target queue near capacity"
582                );
583            }
584        }
585
586        let id = self.enqueue_event_payload_raw(store.as_ref(), queue, &config, payload)?;
587        EVENTS_ENQUEUED_TOTAL.fetch_add(1, Ordering::Relaxed);
588        Ok(id)
589    }
590
591    /// Route a failed event to `<queue>_outbox_dlq`, auto-creating it if needed.
592    fn route_event_to_outbox_dlq(
593        &self,
594        queue: &str,
595        payload: Value,
596        reason: &str,
597    ) -> RedDBResult<EntityId> {
598        let dlq_name = format!("{queue}_outbox_dlq");
599        EVENTS_DLQ_TOTAL.fetch_add(1, Ordering::Relaxed);
600
601        crate::telemetry::operator_event::OperatorEvent::OutboxDlqActivated {
602            queue: queue.to_string(),
603            dlq: dlq_name.clone(),
604            reason: reason.to_string(),
605        }
606        .emit_global();
607
608        let store = self.inner.db.store();
609        if store.get_collection(&dlq_name).is_none() {
610            crate::runtime::impl_ddl::ensure_event_target_queue_pub(self, &dlq_name)?;
611        }
612        let dlq_config = load_queue_config(store.as_ref(), &dlq_name);
613        let id = self.enqueue_event_payload_raw(store.as_ref(), &dlq_name, &dlq_config, payload)?;
614        EVENTS_ENQUEUED_TOTAL.fetch_add(1, Ordering::Relaxed);
615        Ok(id)
616    }
617
618    /// Low-level event message insert — no size checks, no DLQ routing.
619    fn enqueue_event_payload_raw(
620        &self,
621        store: &UnifiedStore,
622        queue: &str,
623        config: &QueueRuntimeConfig,
624        payload: Value,
625    ) -> RedDBResult<EntityId> {
626        let position = next_queue_position(store, queue, QueueSide::Right)?;
627        let mut entity = UnifiedEntity::new(
628            EntityId::new(0),
629            EntityKind::QueueMessage {
630                queue: queue.to_string(),
631                position,
632            },
633            EntityData::QueueMessage(QueueMessageData {
634                payload,
635                priority: None,
636                enqueued_at_ns: now_ns(),
637                attempts: 0,
638                max_attempts: config.max_attempts,
639                acked: false,
640            }),
641        );
642        if let Some(xid) = self.current_xid() {
643            entity.set_xmin(xid);
644        }
645        let id = store
646            .insert_auto(queue, entity)
647            .map_err(|err| RedDBError::Internal(err.to_string()))?;
648        if let Some(ttl_ms) = config.ttl_ms {
649            store
650                .set_metadata(queue, id, queue_message_ttl_metadata(ttl_ms))
651                .map_err(|err| RedDBError::Internal(err.to_string()))?;
652        }
653        self.invalidate_result_cache_for_table(queue);
654        Ok(id)
655    }
656
657    pub fn execute_create_queue(
658        &self,
659        raw_query: &str,
660        query: &CreateQueueQuery,
661    ) -> RedDBResult<RuntimeQueryResult> {
662        self.check_write(crate::runtime::write_gate::WriteKind::Ddl)?;
663        if query.dlq.as_deref() == Some(query.name.as_str()) {
664            return Err(RedDBError::Query(
665                "dead-letter queue must be different from the source queue".to_string(),
666            ));
667        }
668
669        let store = self.inner.db.store();
670        let exists = store.get_collection(&query.name).is_some();
671        if exists {
672            if query.if_not_exists {
673                return Ok(RuntimeQueryResult::ok_message(
674                    raw_query.to_string(),
675                    &format!("queue '{}' already exists", query.name),
676                    "create",
677                ));
678            }
679            return Err(RedDBError::Query(format!(
680                "queue '{}' already exists",
681                query.name
682            )));
683        }
684
685        store
686            .create_collection(&query.name)
687            .map_err(|err| RedDBError::Internal(err.to_string()))?;
688        if let Some(ttl_ms) = query.ttl_ms {
689            self.inner
690                .db
691                .set_collection_default_ttl_ms(&query.name, ttl_ms);
692        }
693        self.inner
694            .db
695            .save_collection_contract(queue_collection_contract(
696                &query.name,
697                query.priority,
698                query.ttl_ms,
699            ))
700            .map_err(|err| RedDBError::Internal(err.to_string()))?;
701        save_queue_config(
702            store.as_ref(),
703            &query.name,
704            &QueueRuntimeConfig {
705                mode: query.mode,
706                priority: query.priority,
707                max_size: query.max_size,
708                ttl_ms: query.ttl_ms,
709                dlq: query.dlq.clone(),
710                max_attempts: query.max_attempts,
711                lock_deadline_ms: query.lock_deadline_ms,
712                in_flight_cap_per_group: query.in_flight_cap_per_group,
713                retry_delay_ms: query.retry_delay_ms,
714            },
715        )?;
716
717        if let Some(dlq) = &query.dlq {
718            if store.get_collection(dlq).is_none() {
719                store
720                    .create_collection(dlq)
721                    .map_err(|err| RedDBError::Internal(err.to_string()))?;
722                self.inner
723                    .db
724                    .save_collection_contract(queue_collection_contract(dlq, false, None))
725                    .map_err(|err| RedDBError::Internal(err.to_string()))?;
726            }
727        }
728
729        self.invalidate_result_cache();
730        self.inner
731            .db
732            .persist_metadata()
733            .map_err(|err| RedDBError::Internal(err.to_string()))?;
734        // Issue #120 — feed the queue into the schema-vocabulary so
735        // AskPipeline (#121) can resolve queue references. Queues
736        // have an opaque payload column, so we expose `payload` and
737        // (when configured) the DLQ partner as type-tag context.
738        let mut type_tags = Vec::new();
739        if let Some(dlq) = &query.dlq {
740            type_tags.push(format!("dlq:{}", dlq));
741        }
742        self.schema_vocabulary_apply(
743            crate::runtime::schema_vocabulary::DdlEvent::CreateCollection {
744                collection: query.name.clone(),
745                columns: vec!["payload".to_string()],
746                type_tags,
747                description: None,
748            },
749        );
750
751        let mut msg = format!("queue '{}' created", query.name);
752        msg.push_str(&format!(" (mode={})", query.mode.as_str()));
753        if query.priority {
754            msg.push_str(" (priority)");
755        }
756        if let Some(max_size) = query.max_size {
757            msg.push_str(&format!(" (max_size={max_size})"));
758        }
759        if let Some(ttl_ms) = query.ttl_ms {
760            msg.push_str(&format!(" (ttl={ttl_ms}ms)"));
761        }
762        if let Some(dlq) = &query.dlq {
763            msg.push_str(&format!(
764                " (dlq={dlq}, max_attempts={})",
765                query.max_attempts
766            ));
767        }
768
769        Ok(RuntimeQueryResult::ok_message(
770            raw_query.to_string(),
771            &msg,
772            "create",
773        ))
774    }
775
776    pub fn execute_alter_queue(
777        &self,
778        raw_query: &str,
779        query: &AlterQueueQuery,
780    ) -> RedDBResult<RuntimeQueryResult> {
781        self.check_write(crate::runtime::write_gate::WriteKind::Ddl)?;
782        let store = self.inner.db.store();
783        ensure_queue_exists(store.as_ref(), &query.name)?;
784
785        let mut config = load_queue_config(store.as_ref(), &query.name);
786        let mut summary: Vec<String> = Vec::new();
787
788        if let Some(new_mode) = query.mode {
789            let pending =
790                load_pending_entries(store.as_ref(), &query.name, None, None).unwrap_or_default();
791            if !pending.is_empty() {
792                tracing::warn!(
793                    queue = %query.name,
794                    pending_count = pending.len(),
795                    new_mode = %new_mode.as_str(),
796                    "ALTER QUEUE SET MODE: {} in-flight messages will drain with old mode; \
797                     new reads use {}",
798                    pending.len(),
799                    new_mode.as_str(),
800                );
801            }
802            config.mode = new_mode;
803            summary.push(format!("mode={}", new_mode.as_str()));
804        }
805        if let Some(max_attempts) = query.max_attempts {
806            config.max_attempts = max_attempts;
807            summary.push(format!("max_attempts={max_attempts}"));
808        }
809        if let Some(lock_deadline_ms) = query.lock_deadline_ms {
810            config.lock_deadline_ms = lock_deadline_ms;
811            summary.push(format!("lock_deadline_ms={lock_deadline_ms}"));
812        }
813        if let Some(in_flight_cap) = query.in_flight_cap_per_group {
814            config.in_flight_cap_per_group = in_flight_cap;
815            summary.push(format!("in_flight_cap_per_group={in_flight_cap}"));
816        }
817        if let Some(dlq) = &query.dlq {
818            if dlq == &query.name {
819                return Err(RedDBError::Query(
820                    "dead-letter queue must be different from the source queue".to_string(),
821                ));
822            }
823            config.dlq = Some(dlq.clone());
824            summary.push(format!("dlq={dlq}"));
825        }
826        if let Some(retry_delay_ms) = query.retry_delay_ms {
827            config.retry_delay_ms = if retry_delay_ms == 0 {
828                None
829            } else {
830                Some(retry_delay_ms)
831            };
832            summary.push(format!("retry_delay_ms={retry_delay_ms}"));
833        }
834
835        save_queue_config(store.as_ref(), &query.name, &config)?;
836
837        self.invalidate_result_cache();
838        self.inner
839            .db
840            .persist_metadata()
841            .map_err(|err| RedDBError::Internal(err.to_string()))?;
842
843        Ok(RuntimeQueryResult::ok_message(
844            raw_query.to_string(),
845            &format!("queue '{}' altered: {}", query.name, summary.join(", ")),
846            "alter",
847        ))
848    }
849
850    pub fn execute_drop_queue(
851        &self,
852        raw_query: &str,
853        query: &DropQueueQuery,
854    ) -> RedDBResult<RuntimeQueryResult> {
855        self.check_write(crate::runtime::write_gate::WriteKind::Ddl)?;
856        let store = self.inner.db.store();
857        if super::impl_ddl::is_system_schema_name(&query.name) {
858            return Err(RedDBError::Query("system schema is read-only".to_string()));
859        }
860        if store.get_collection(&query.name).is_none() {
861            if query.if_exists {
862                return Ok(RuntimeQueryResult::ok_message(
863                    raw_query.to_string(),
864                    &format!("queue '{}' does not exist", query.name),
865                    "drop",
866                ));
867            }
868            return Err(RedDBError::NotFound(format!(
869                "queue '{}' not found",
870                query.name
871            )));
872        }
873        let actual = crate::runtime::ddl::polymorphic_resolver::resolve(
874            &query.name,
875            &self.inner.db.catalog_model_snapshot(),
876        )?;
877        crate::runtime::ddl::polymorphic_resolver::ensure_model_match(
878            crate::catalog::CollectionModel::Queue,
879            actual,
880        )?;
881
882        store
883            .drop_collection(&query.name)
884            .map_err(|err| RedDBError::Internal(err.to_string()))?;
885        self.inner.db.clear_collection_default_ttl_ms(&query.name);
886        self.inner
887            .db
888            .remove_collection_contract(&query.name)
889            .map_err(|err| RedDBError::Internal(err.to_string()))?;
890        remove_queue_metadata(store.as_ref(), &query.name);
891        self.invalidate_result_cache();
892        self.inner
893            .db
894            .persist_metadata()
895            .map_err(|err| RedDBError::Internal(err.to_string()))?;
896        // Issue #120 — invalidate the schema-vocabulary entry.
897        self.schema_vocabulary_apply(
898            crate::runtime::schema_vocabulary::DdlEvent::DropCollection {
899                collection: query.name.clone(),
900            },
901        );
902
903        Ok(RuntimeQueryResult::ok_message(
904            raw_query.to_string(),
905            &format!("queue '{}' dropped", query.name),
906            "drop",
907        ))
908    }
909
910    pub fn execute_queue_command(
911        &self,
912        raw_query: &str,
913        cmd: &QueueCommand,
914    ) -> RedDBResult<RuntimeQueryResult> {
915        self.check_write(crate::runtime::write_gate::WriteKind::Dml)?;
916        match cmd {
917            QueueCommand::Push {
918                queue,
919                value,
920                side,
921                priority,
922                available,
923            } => {
924                let store = self.inner.db.store();
925                ensure_queue_exists(store.as_ref(), queue)?;
926                let config = load_queue_config(store.as_ref(), queue);
927                if priority.is_some() && !config.priority {
928                    return Err(RedDBError::Query(format!(
929                        "queue '{}' is not a priority queue",
930                        queue
931                    )));
932                }
933                if let Some(max_size) = config.max_size {
934                    let current_len =
935                        load_queue_message_views_with_runtime(Some(self), store.as_ref(), queue)?
936                            .len();
937                    if current_len >= max_size {
938                        return Err(RedDBError::Query(format!(
939                            "queue '{}' is full (max_size={max_size})",
940                            queue
941                        )));
942                    }
943                }
944
945                let position = next_queue_position(store.as_ref(), queue, *side)?;
946                let mut entity = UnifiedEntity::new(
947                    EntityId::new(0),
948                    EntityKind::QueueMessage {
949                        queue: queue.clone(),
950                        position,
951                    },
952                    EntityData::QueueMessage(QueueMessageData {
953                        payload: value.clone(),
954                        priority: if config.priority { *priority } else { None },
955                        enqueued_at_ns: now_ns(),
956                        attempts: 0,
957                        max_attempts: config.max_attempts,
958                        acked: false,
959                    }),
960                );
961                // Phase 1.1 MVCC universal: stamp xmin so other
962                // connections don't see this message until COMMIT.
963                if let Some(xid) = self.current_xid() {
964                    entity.set_xmin(xid);
965                }
966                let id = store
967                    .insert_auto(queue, entity)
968                    .map_err(|err| RedDBError::Internal(err.to_string()))?;
969                // Resolve per-message availability (issue #722): DELAY is
970                // relative to the push instant, AVAILABLE AT carries an
971                // absolute unix-ms. Both collapse to a unix-ns timestamp
972                // delivery paths compare against. `None` means immediate.
973                let available_at_ns = available.map(|a| match a {
974                    crate::storage::query::ast::QueueAvailability::DelayMs(ms) => {
975                        now_ns().saturating_add(ms.saturating_mul(1_000_000))
976                    }
977                    crate::storage::query::ast::QueueAvailability::AtUnixMs(ms) => {
978                        ms.saturating_mul(1_000_000)
979                    }
980                });
981                if config.ttl_ms.is_some() || available_at_ns.is_some() {
982                    store
983                        .set_metadata(
984                            queue,
985                            id,
986                            queue_message_metadata(config.ttl_ms, available_at_ns),
987                        )
988                        .map_err(|err| RedDBError::Internal(err.to_string()))?;
989                }
990                // Slice C of PRD #718 — wake `QUEUE READ … WAIT` waiters.
991                // Under autocommit this fires immediately; inside a txn
992                // the wake is buffered and replayed on COMMIT (rollback
993                // discards it so rolled-back enqueues do not deliver).
994                self.record_queue_wake(&queue_wait_scope(), queue);
995                self.invalidate_result_cache();
996
997                let mut result = UnifiedResult::with_columns(vec![
998                    "message_id".into(),
999                    "side".into(),
1000                    "queue".into(),
1001                ]);
1002                let mut record = UnifiedRecord::new();
1003                record.set("message_id", Value::text(message_id_string(id)));
1004                record.set(
1005                    "side",
1006                    Value::text(match side {
1007                        QueueSide::Left => "left".to_string(),
1008                        QueueSide::Right => "right".to_string(),
1009                    }),
1010                );
1011                record.set("queue", Value::text(queue.clone()));
1012                result.push(record);
1013
1014                Ok(RuntimeQueryResult {
1015                    query: raw_query.to_string(),
1016                    mode: QueryMode::Sql,
1017                    statement: "queue_push",
1018                    engine: "runtime-queue",
1019                    result,
1020                    affected_rows: 1,
1021                    statement_type: "insert",
1022                    bookmark: None,
1023                })
1024            }
1025            QueueCommand::Pop { queue, side, count } => {
1026                let store = self.inner.db.store();
1027                ensure_queue_exists(store.as_ref(), queue)?;
1028                let (lifecycle, _ps, txn) = runtime_lifecycle(self, queue);
1029                let popped = lifecycle
1030                    .pop(queue, ast_side_to_lc(*side), *count, &txn)
1031                    .map_err(map_qse)?;
1032
1033                let mut result =
1034                    UnifiedResult::with_columns(vec!["message_id".into(), "payload".into()]);
1035                for (message_id, payload) in &popped {
1036                    let mut record = UnifiedRecord::new();
1037                    record.set(
1038                        "message_id",
1039                        Value::text(message_id_string(EntityId::new(*message_id))),
1040                    );
1041                    record.set("payload", payload.clone());
1042                    result.push(record);
1043                }
1044                let popped_count = popped.len() as u64;
1045                if popped_count > 0 {
1046                    self.invalidate_result_cache();
1047                }
1048
1049                Ok(RuntimeQueryResult {
1050                    query: raw_query.to_string(),
1051                    mode: QueryMode::Sql,
1052                    statement: "queue_pop",
1053                    engine: "runtime-queue",
1054                    result,
1055                    affected_rows: popped_count,
1056                    statement_type: "delete",
1057                    bookmark: None,
1058                })
1059            }
1060            QueueCommand::Peek { queue, count } => {
1061                let store = self.inner.db.store();
1062                ensure_queue_exists(store.as_ref(), queue)?;
1063                let (lifecycle, _ps, txn) = runtime_lifecycle(self, queue);
1064                let messages = lifecycle.peek(queue, *count, &txn);
1065
1066                let mut result =
1067                    UnifiedResult::with_columns(vec!["message_id".into(), "payload".into()]);
1068                for message in messages {
1069                    let mut record = UnifiedRecord::new();
1070                    record.set(
1071                        "message_id",
1072                        Value::text(message_id_string(EntityId::new(message.message_id))),
1073                    );
1074                    record.set("payload", message.payload);
1075                    result.push(record);
1076                }
1077
1078                Ok(RuntimeQueryResult {
1079                    query: raw_query.to_string(),
1080                    mode: QueryMode::Sql,
1081                    statement: "queue_peek",
1082                    engine: "runtime-queue",
1083                    result,
1084                    affected_rows: 0,
1085                    statement_type: "select",
1086                    bookmark: None,
1087                })
1088            }
1089            QueueCommand::Len { queue } => {
1090                let store = self.inner.db.store();
1091                ensure_queue_exists(store.as_ref(), queue)?;
1092                let count =
1093                    load_queue_message_views_with_runtime(Some(self), store.as_ref(), queue)?.len()
1094                        as u64;
1095                let mut result = UnifiedResult::with_columns(vec!["len".into()]);
1096                let mut record = UnifiedRecord::new();
1097                record.set("len", Value::UnsignedInteger(count));
1098                result.push(record);
1099
1100                Ok(RuntimeQueryResult {
1101                    query: raw_query.to_string(),
1102                    mode: QueryMode::Sql,
1103                    statement: "queue_len",
1104                    engine: "runtime-queue",
1105                    result,
1106                    affected_rows: 0,
1107                    statement_type: "select",
1108                    bookmark: None,
1109                })
1110            }
1111            QueueCommand::Purge { queue } => {
1112                let store = self.inner.db.store();
1113                ensure_queue_exists(store.as_ref(), queue)?;
1114                let (lifecycle, _ps, txn) = runtime_lifecycle(self, queue);
1115                let count = lifecycle.purge(queue, &txn).map_err(map_qse)?;
1116                if count > 0 {
1117                    self.invalidate_result_cache();
1118                }
1119
1120                Ok(RuntimeQueryResult::ok_message(
1121                    raw_query.to_string(),
1122                    &format!("{count} messages purged from queue '{queue}'"),
1123                    "delete",
1124                ))
1125            }
1126            QueueCommand::GroupCreate { queue, group } => {
1127                let store = self.inner.db.store();
1128                ensure_queue_exists(store.as_ref(), queue)?;
1129                if queue_group_exists(store.as_ref(), queue, group)? {
1130                    return Ok(RuntimeQueryResult::ok_message(
1131                        raw_query.to_string(),
1132                        &format!(
1133                            "consumer group '{}' already exists on queue '{}'",
1134                            group, queue
1135                        ),
1136                        "create",
1137                    ));
1138                }
1139                save_queue_group(store.as_ref(), queue, group)?;
1140                self.invalidate_result_cache();
1141
1142                Ok(RuntimeQueryResult::ok_message(
1143                    raw_query.to_string(),
1144                    &format!("consumer group '{}' created on queue '{}'", group, queue),
1145                    "create",
1146                ))
1147            }
1148            QueueCommand::GroupRead {
1149                queue,
1150                group,
1151                consumer,
1152                count,
1153                wait_ms,
1154            } => {
1155                let store = self.inner.db.store();
1156                ensure_queue_exists(store.as_ref(), queue)?;
1157                // Slice B of PRD #718: reject `WAIT` issued inside an
1158                // explicit transaction, and reject `WAIT > cap` before
1159                // any waiter is registered. Both checks fire before the
1160                // lifecycle is touched so a refused statement leaves
1161                // no side effects (no group auto-create, no parking).
1162                if let Some(ms) = *wait_ms {
1163                    if self.current_xid().is_some() {
1164                        return Err(RedDBError::Query(
1165                            "QUEUE READ … WAIT is autocommit-only: refusing to park inside an explicit transaction (BEGIN/COMMIT)"
1166                                .to_string(),
1167                        ));
1168                    }
1169                    let cap =
1170                        self.config_u64(QUEUE_MAX_WAIT_MS_CONFIG_KEY, QUEUE_MAX_WAIT_MS_DEFAULT);
1171                    if ms > cap {
1172                        return Err(RedDBError::Query(format!(
1173                            "QUEUE READ … WAIT {ms}ms exceeds server cap {QUEUE_MAX_WAIT_MS_CONFIG_KEY} = {cap}ms"
1174                        )));
1175                    }
1176                }
1177                // Resolve the consumer group up-front so the lifecycle
1178                // sees the same auto-created `_work_default` / fanout
1179                // group the legacy `read_messages` would have minted.
1180                let config = load_queue_config(store.as_ref(), queue);
1181                let group_owned =
1182                    resolve_read_group(store.as_ref(), queue, group.as_deref(), consumer, &config)?;
1183                let group_ref = group_owned.as_str();
1184                let delivered = self
1185                    .group_read_with_optional_wait(queue, group_ref, consumer, *count, *wait_ms)?;
1186
1187                // Issue #742 — record consumer presence on every read,
1188                // including empty returns. Heartbeat-driven aliveness
1189                // is the contract; pending deliveries don't define it.
1190                {
1191                    let lease_count = u32::try_from(delivered.len()).unwrap_or(u32::MAX);
1192                    let now_ns = std::time::SystemTime::now()
1193                        .duration_since(std::time::UNIX_EPOCH)
1194                        .map(|d| d.as_nanos() as u64)
1195                        .unwrap_or(0);
1196                    self.queue_presence().heartbeat(
1197                        queue,
1198                        group_ref,
1199                        consumer,
1200                        lease_count,
1201                        now_ns,
1202                    );
1203                }
1204
1205                let mut result = UnifiedResult::with_columns(vec![
1206                    "message_id".into(),
1207                    "payload".into(),
1208                    "consumer".into(),
1209                    "delivery_count".into(),
1210                    "attempts".into(),
1211                ]);
1212
1213                for message in delivered {
1214                    let mut record = UnifiedRecord::new();
1215                    record.set(
1216                        "message_id",
1217                        Value::text(message_id_string(EntityId::new(message.message_id))),
1218                    );
1219                    record.set("payload", message.payload);
1220                    record.set("consumer", Value::text(message.consumer));
1221                    record.set(
1222                        "delivery_count",
1223                        Value::UnsignedInteger(u64::from(message.delivery_count)),
1224                    );
1225                    record.set(
1226                        "attempts",
1227                        Value::UnsignedInteger(u64::from(message.delivery_count)),
1228                    );
1229                    result.push(record);
1230                }
1231                if !result.records.is_empty() {
1232                    self.invalidate_result_cache();
1233                }
1234
1235                Ok(RuntimeQueryResult {
1236                    query: raw_query.to_string(),
1237                    mode: QueryMode::Sql,
1238                    statement: "queue_group_read",
1239                    engine: "runtime-queue",
1240                    result,
1241                    affected_rows: 0,
1242                    statement_type: "select",
1243                    bookmark: None,
1244                })
1245            }
1246            QueueCommand::Pending { queue, group } => {
1247                let store = self.inner.db.store();
1248                ensure_queue_exists(store.as_ref(), queue)?;
1249                require_queue_group(store.as_ref(), queue, group)?;
1250                let mut pending = load_pending_entries(store.as_ref(), queue, Some(group), None)?;
1251                pending.sort_by_key(|entry| entry.delivered_at_ns);
1252                let current_time_ns = now_ns();
1253
1254                let mut result = UnifiedResult::with_columns(vec![
1255                    "message_id".into(),
1256                    "consumer".into(),
1257                    "delivered_at_ns".into(),
1258                    "delivery_count".into(),
1259                    "idle_ms".into(),
1260                ]);
1261                for entry in pending {
1262                    let mut record = UnifiedRecord::new();
1263                    record.set(
1264                        "message_id",
1265                        Value::text(message_id_string(entry.message_id)),
1266                    );
1267                    record.set("consumer", Value::text(entry.consumer));
1268                    record.set(
1269                        "delivered_at_ns",
1270                        Value::UnsignedInteger(entry.delivered_at_ns),
1271                    );
1272                    record.set(
1273                        "delivery_count",
1274                        Value::UnsignedInteger(u64::from(entry.delivery_count)),
1275                    );
1276                    record.set(
1277                        "idle_ms",
1278                        Value::UnsignedInteger(
1279                            current_time_ns.saturating_sub(entry.delivered_at_ns) / 1_000_000,
1280                        ),
1281                    );
1282                    result.push(record);
1283                }
1284
1285                Ok(RuntimeQueryResult {
1286                    query: raw_query.to_string(),
1287                    mode: QueryMode::Sql,
1288                    statement: "queue_pending",
1289                    engine: "runtime-queue",
1290                    result,
1291                    affected_rows: 0,
1292                    statement_type: "select",
1293                    bookmark: None,
1294                })
1295            }
1296            QueueCommand::Claim {
1297                queue,
1298                group,
1299                consumer,
1300                min_idle_ms,
1301            } => {
1302                let store = self.inner.db.store();
1303                ensure_queue_exists(store.as_ref(), queue)?;
1304                require_queue_group(store.as_ref(), queue, group)?;
1305                let (lifecycle, _ps, txn) = runtime_lifecycle(self, queue);
1306                let delivered = lifecycle
1307                    .claim_delivering(queue, consumer, *min_idle_ms, &txn)
1308                    .map_err(map_qse)?;
1309
1310                let mut result = UnifiedResult::with_columns(vec![
1311                    "message_id".into(),
1312                    "payload".into(),
1313                    "consumer".into(),
1314                    "delivery_count".into(),
1315                ]);
1316
1317                for message in delivered {
1318                    let mut record = UnifiedRecord::new();
1319                    record.set(
1320                        "message_id",
1321                        Value::text(message_id_string(EntityId::new(message.message_id))),
1322                    );
1323                    record.set("payload", message.payload);
1324                    record.set("consumer", Value::text(message.consumer));
1325                    record.set(
1326                        "delivery_count",
1327                        Value::UnsignedInteger(u64::from(message.delivery_count)),
1328                    );
1329                    result.push(record);
1330                }
1331                if !result.records.is_empty() {
1332                    self.invalidate_result_cache();
1333                }
1334                let affected_rows = result.records.len() as u64;
1335
1336                Ok(RuntimeQueryResult {
1337                    query: raw_query.to_string(),
1338                    mode: QueryMode::Sql,
1339                    statement: "queue_claim",
1340                    engine: "runtime-queue",
1341                    result,
1342                    affected_rows,
1343                    statement_type: "update",
1344                    bookmark: None,
1345                })
1346            }
1347            QueueCommand::Ack {
1348                queue,
1349                group,
1350                message_id,
1351                delivery_id,
1352            } => {
1353                let store = self.inner.db.store();
1354                ensure_queue_exists(store.as_ref(), queue)?;
1355                let (group_owned, message_entity) = resolve_ack_nack_handle(
1356                    store.as_ref(),
1357                    queue,
1358                    group,
1359                    message_id,
1360                    delivery_id.as_deref(),
1361                )?;
1362                let group_ref = group_owned.as_str();
1363                require_queue_group(store.as_ref(), queue, group_ref)?;
1364                let (lifecycle, ps, txn) = runtime_lifecycle(self, queue);
1365                let did = match delivery_id.as_deref() {
1366                    Some(d) => d.to_string(),
1367                    None => ps
1368                        .find_pending_by_key(queue, message_entity.raw(), group_ref)
1369                        .ok_or_else(|| {
1370                            RedDBError::NotFound(format!(
1371                                "no pending delivery for message '{}' on queue '{}' (group '{}')",
1372                                message_entity.raw(),
1373                                queue,
1374                                group_ref
1375                            ))
1376                        })?,
1377                };
1378                lifecycle.ack(&txn, &did).map_err(map_qse)?;
1379                self.invalidate_result_cache();
1380
1381                Ok(RuntimeQueryResult::ok_message(
1382                    raw_query.to_string(),
1383                    "message acknowledged",
1384                    "update",
1385                ))
1386            }
1387            QueueCommand::Nack {
1388                queue,
1389                group,
1390                message_id,
1391                delivery_id,
1392                delay_ms,
1393            } => {
1394                let store = self.inner.db.store();
1395                ensure_queue_exists(store.as_ref(), queue)?;
1396                let config = load_queue_config(store.as_ref(), queue);
1397                // Issue #723: a per-failure DELAY override is a write
1398                // operation that re-shapes retry behavior; readers must
1399                // not be able to silently re-schedule another worker's
1400                // work. Embedded callers (no auth identity attached)
1401                // are trusted and bypass the check.
1402                if delay_ms.is_some() {
1403                    if let Some((_, role)) = current_auth_identity() {
1404                        if !role.can_write() {
1405                            return Err(RedDBError::InvalidOperation(format!(
1406                                "role '{role}' is not authorized to override NACK retry delay on queue '{queue}'"
1407                            )));
1408                        }
1409                    }
1410                }
1411                let (group_owned, message_entity) = resolve_ack_nack_handle(
1412                    store.as_ref(),
1413                    queue,
1414                    group,
1415                    message_id,
1416                    delivery_id.as_deref(),
1417                )?;
1418                let group_ref = group_owned.as_str();
1419                require_queue_group(store.as_ref(), queue, group_ref)?;
1420                let (lifecycle, ps, txn) = runtime_lifecycle(self, queue);
1421                let did = match delivery_id.as_deref() {
1422                    Some(d) => d.to_string(),
1423                    None => ps
1424                        .find_pending_by_key(queue, message_entity.raw(), group_ref)
1425                        .ok_or_else(|| {
1426                            RedDBError::NotFound(format!(
1427                                "no pending delivery for message '{}' on queue '{}' (group '{}')",
1428                                message_entity.raw(),
1429                                queue,
1430                                group_ref
1431                            ))
1432                        })?,
1433                };
1434                // Resolve the effective retry delay: per-failure
1435                // override wins, then queue default, then zero
1436                // (immediate requeue — pre-#723 behavior).
1437                let effective_delay_ms = delay_ms.or(config.retry_delay_ms).unwrap_or(0);
1438                let outcome = lifecycle.nack(&txn, &did).map_err(map_qse)?;
1439                // Apply delay only when the message was actually
1440                // requeued — DLQ promotion / drop terminate the
1441                // retry cycle and a delay would be meaningless.
1442                if matches!(outcome, RetirementOutcome::Requeued) && effective_delay_ms > 0 {
1443                    let at_ns =
1444                        now_ns().saturating_add(effective_delay_ms.saturating_mul(1_000_000));
1445                    set_message_available_at_ns(
1446                        store.as_ref(),
1447                        queue,
1448                        message_entity,
1449                        Some(at_ns),
1450                        config.ttl_ms,
1451                    )?;
1452                }
1453                // Issue #723: routine retries do not flood audit
1454                // channels (telemetry already covers them via
1455                // `queue_nacked_total{outcome=...}`). Significant
1456                // overrides — large delays, destination changes,
1457                // drops — are audited so operators see the events
1458                // that re-shape operational risk.
1459                self.maybe_emit_nack_audit(
1460                    queue,
1461                    group_ref,
1462                    &did,
1463                    *delay_ms,
1464                    config.retry_delay_ms,
1465                    &outcome,
1466                );
1467                let message = match outcome {
1468                    RetirementOutcome::Requeued => {
1469                        if effective_delay_ms > 0 {
1470                            format!("message requeued (delay={effective_delay_ms}ms)")
1471                        } else {
1472                            "message requeued".to_string()
1473                        }
1474                    }
1475                    RetirementOutcome::MovedToDlq(dlq) => {
1476                        format!("message moved to dead-letter queue '{}'", dlq)
1477                    }
1478                    RetirementOutcome::Dropped => "message dropped after max attempts".to_string(),
1479                };
1480                self.invalidate_result_cache();
1481
1482                Ok(RuntimeQueryResult::ok_message(
1483                    raw_query.to_string(),
1484                    &message,
1485                    "update",
1486                ))
1487            }
1488            QueueCommand::Move {
1489                source,
1490                destination,
1491                filter,
1492                limit,
1493            } => self.execute_queue_move(raw_query, source, destination, filter.as_ref(), *limit),
1494        }
1495    }
1496
1497    pub fn execute_queue_select(
1498        &self,
1499        raw_query: &str,
1500        query: &QueueSelectQuery,
1501    ) -> RedDBResult<RuntimeQueryResult> {
1502        let store = self.inner.db.store();
1503        ensure_queue_exists(store.as_ref(), &query.queue)?;
1504        let config = load_queue_config(store.as_ref(), &query.queue);
1505        let dlq = queue_is_dead_letter_target(store.as_ref(), &query.queue);
1506        let columns = if query.columns.is_empty() {
1507            queue_projection_default_columns()
1508        } else {
1509            query.columns.clone()
1510        };
1511
1512        let mut messages =
1513            load_queue_message_views_with_runtime(Some(self), store.as_ref(), &query.queue)?;
1514        sort_queue_messages(&mut messages, &config, QueueSide::Left);
1515
1516        let mut result = UnifiedResult::with_columns(columns.clone());
1517        for message in messages {
1518            if query
1519                .filter
1520                .as_ref()
1521                .is_some_and(|filter| !queue_message_matches_filter(&message, dlq, filter))
1522            {
1523                continue;
1524            }
1525            let record = queue_projection_record(&columns, &message, dlq)?;
1526            result.push(record);
1527            if query
1528                .limit
1529                .is_some_and(|limit| result.records.len() >= limit as usize)
1530            {
1531                break;
1532            }
1533        }
1534
1535        Ok(RuntimeQueryResult {
1536            query: raw_query.to_string(),
1537            mode: QueryMode::Sql,
1538            statement: "queue_select",
1539            engine: "runtime-queue",
1540            result,
1541            affected_rows: 0,
1542            statement_type: "select",
1543            bookmark: None,
1544        })
1545    }
1546
1547    fn execute_queue_move(
1548        &self,
1549        raw_query: &str,
1550        source: &str,
1551        destination: &str,
1552        filter: Option<&Filter>,
1553        limit: usize,
1554    ) -> RedDBResult<RuntimeQueryResult> {
1555        if source == destination {
1556            return Err(RedDBError::Query(
1557                "QUEUE MOVE source and destination must be different".to_string(),
1558            ));
1559        }
1560        let store = self.inner.db.store();
1561        ensure_queue_exists(store.as_ref(), source)?;
1562        ensure_queue_exists(store.as_ref(), destination)?;
1563        let source_config = load_queue_config(store.as_ref(), source);
1564        let destination_config = load_queue_config(store.as_ref(), destination);
1565        let source_dlq = queue_is_dead_letter_target(store.as_ref(), source);
1566
1567        let mut messages =
1568            load_queue_message_views_with_runtime(Some(self), store.as_ref(), source)?;
1569        sort_queue_messages(&mut messages, &source_config, QueueSide::Left);
1570        let selected = messages
1571            .into_iter()
1572            .filter(|message| {
1573                filter
1574                    .map(|f| queue_message_matches_filter(message, source_dlq, f))
1575                    .unwrap_or(true)
1576            })
1577            .take(limit)
1578            .collect::<Vec<_>>();
1579
1580        if let Some(max_size) = destination_config.max_size {
1581            let current_len =
1582                load_queue_message_views_with_runtime(Some(self), store.as_ref(), destination)?
1583                    .len();
1584            if current_len + selected.len() > max_size {
1585                return Err(RedDBError::Query(format!(
1586                    "queue '{}' is full (max_size={max_size})",
1587                    destination
1588                )));
1589            }
1590        }
1591
1592        for message in &selected {
1593            let lock = queue_message_lock_handle(self, source, message.id);
1594            let Some(_guard) = lock.try_lock() else {
1595                return Err(RedDBError::Query(format!(
1596                    "message '{}' is locked on queue '{}'",
1597                    message.id.raw(),
1598                    source
1599                )));
1600            };
1601            if queue_message_view_by_id(store.as_ref(), source, message.id)?.is_none() {
1602                return Err(RedDBError::Query(format!(
1603                    "message '{}' is no longer available on queue '{}'",
1604                    message.id.raw(),
1605                    source
1606                )));
1607            }
1608        }
1609
1610        let mut inserted = Vec::new();
1611        for message in &selected {
1612            match insert_moved_queue_message(
1613                store.as_ref(),
1614                destination,
1615                &destination_config,
1616                message,
1617            ) {
1618                Ok(id) => inserted.push(id),
1619                Err(err) => {
1620                    for id in inserted {
1621                        let _ = store.delete(destination, id);
1622                    }
1623                    return Err(err);
1624                }
1625            }
1626        }
1627
1628        let (move_lifecycle, _move_ps, move_txn) = runtime_lifecycle(self, source);
1629        for message in &selected {
1630            move_lifecycle
1631                .delete_with_state(source, message.id.raw(), &move_txn)
1632                .map_err(map_qse)?;
1633        }
1634        if !selected.is_empty() {
1635            self.invalidate_result_cache();
1636        }
1637
1638        let selected_count = selected.len() as u64;
1639        self.audit_log().record_event(
1640            AuditEvent::builder("queue/move")
1641                .source(AuditAuthSource::System)
1642                .outcome(Outcome::Success)
1643                .resource(format!("queue:{source}->{destination}"))
1644                .fields([
1645                    AuditFieldEscaper::field("source", source),
1646                    AuditFieldEscaper::field("destination", destination),
1647                    AuditFieldEscaper::field("selected", selected_count),
1648                    AuditFieldEscaper::field("committed", selected_count),
1649                ])
1650                .build(),
1651        );
1652
1653        let mut result = UnifiedResult::with_columns(vec![
1654            "source".into(),
1655            "destination".into(),
1656            "selected".into(),
1657            "committed".into(),
1658        ]);
1659        let mut record = UnifiedRecord::new();
1660        record.set("source", Value::text(source.to_string()));
1661        record.set("destination", Value::text(destination.to_string()));
1662        record.set("selected", Value::UnsignedInteger(selected_count));
1663        record.set("committed", Value::UnsignedInteger(selected_count));
1664        result.push(record);
1665
1666        Ok(RuntimeQueryResult {
1667            query: raw_query.to_string(),
1668            mode: QueryMode::Sql,
1669            statement: "queue_move",
1670            engine: "runtime-queue",
1671            result,
1672            affected_rows: selected_count,
1673            statement_type: "update",
1674            bookmark: None,
1675        })
1676    }
1677
1678    /// Issue #723: routine retries are observable through metrics
1679    /// (`queue_nacked_total{outcome=...}`) so this is the audit
1680    /// shoulder for the *non-routine* cases: explicit NACK delay
1681    /// overrides whose magnitude or destination-changing impact would
1682    /// be invisible in metrics alone. Specifically:
1683    ///
1684    /// - Explicit override ≥ 60s (a worker decided to defer well past
1685    ///   the queue's default cadence — operators care).
1686    /// - Override that lands on a DLQ promotion or drop (destination
1687    ///   changed; the override may have influenced retire-vs-requeue
1688    ///   accounting on the caller's side and the audit trail needs to
1689    ///   show who asked).
1690    ///
1691    /// Calls with no override are intentionally silent here.
1692    fn maybe_emit_nack_audit(
1693        &self,
1694        queue: &str,
1695        group: &str,
1696        delivery_id: &str,
1697        override_ms: Option<u64>,
1698        default_ms: Option<u64>,
1699        outcome: &RetirementOutcome,
1700    ) {
1701        let Some(override_ms) = override_ms else {
1702            return;
1703        };
1704        let outcome_label = match outcome {
1705            RetirementOutcome::Requeued => "requeued",
1706            RetirementOutcome::MovedToDlq(_) => "dlq",
1707            RetirementOutcome::Dropped => "dropped",
1708        };
1709        const SIGNIFICANT_DELAY_MS: u64 = 60_000;
1710        let destination_changed = !matches!(outcome, RetirementOutcome::Requeued);
1711        if override_ms < SIGNIFICANT_DELAY_MS && !destination_changed {
1712            return;
1713        }
1714        self.audit_log().record_event(
1715            AuditEvent::builder("queue/nack/override")
1716                .source(AuditAuthSource::System)
1717                .outcome(Outcome::Success)
1718                .resource(format!("queue:{queue}"))
1719                .fields([
1720                    AuditFieldEscaper::field("queue", queue),
1721                    AuditFieldEscaper::field("group", group),
1722                    AuditFieldEscaper::field("delivery_id", delivery_id),
1723                    AuditFieldEscaper::field("override_delay_ms", override_ms),
1724                    AuditFieldEscaper::field("default_delay_ms", default_ms.unwrap_or(0)),
1725                    AuditFieldEscaper::field("outcome", outcome_label),
1726                ])
1727                .build(),
1728        );
1729    }
1730}
1731
1732fn ensure_queue_exists(store: &UnifiedStore, queue: &str) -> RedDBResult<()> {
1733    if store.get_collection(queue).is_some() {
1734        Ok(())
1735    } else {
1736        Err(RedDBError::NotFound(format!("queue '{}' not found", queue)))
1737    }
1738}
1739
1740pub(super) fn load_queue_config(store: &UnifiedStore, queue: &str) -> QueueRuntimeConfig {
1741    let default = QueueRuntimeConfig {
1742        mode: QueueMode::Work,
1743        priority: false,
1744        max_size: None,
1745        ttl_ms: None,
1746        dlq: None,
1747        max_attempts: crate::storage::query::DEFAULT_QUEUE_MAX_ATTEMPTS,
1748        lock_deadline_ms: crate::storage::query::DEFAULT_QUEUE_LOCK_DEADLINE_MS,
1749        in_flight_cap_per_group: crate::storage::query::DEFAULT_QUEUE_IN_FLIGHT_CAP_PER_GROUP,
1750        retry_delay_ms: None,
1751    };
1752
1753    let Some(manager) = store.get_collection(QUEUE_META_COLLECTION) else {
1754        return default;
1755    };
1756    manager
1757        .query_all(|entity| {
1758            entity.data.as_row().is_some_and(|row| {
1759                row_text(row, "kind").as_deref() == Some("queue_config")
1760                    && row_text(row, "queue").as_deref() == Some(queue)
1761            })
1762        })
1763        .into_iter()
1764        .find_map(|entity| {
1765            let row = entity.data.as_row()?;
1766            Some(QueueRuntimeConfig {
1767                mode: row_text(row, "mode")
1768                    .as_deref()
1769                    .and_then(QueueMode::parse)
1770                    .unwrap_or_default(),
1771                priority: row_bool(row, "priority").unwrap_or(false),
1772                max_size: row_u64(row, "max_size").map(|value| value as usize),
1773                ttl_ms: row_u64(row, "ttl_ms"),
1774                dlq: row_text(row, "dlq"),
1775                max_attempts: row_u64(row, "max_attempts")
1776                    .map(|value| value as u32)
1777                    .unwrap_or(crate::storage::query::DEFAULT_QUEUE_MAX_ATTEMPTS),
1778                lock_deadline_ms: row_u64(row, "lock_deadline_ms")
1779                    .unwrap_or(crate::storage::query::DEFAULT_QUEUE_LOCK_DEADLINE_MS),
1780                in_flight_cap_per_group: row_u64(row, "in_flight_cap_per_group")
1781                    .map(|value| value as u32)
1782                    .unwrap_or(crate::storage::query::DEFAULT_QUEUE_IN_FLIGHT_CAP_PER_GROUP),
1783                retry_delay_ms: row_u64(row, "retry_delay_ms").filter(|v| *v > 0),
1784            })
1785        })
1786        .unwrap_or(default)
1787}
1788
1789pub(super) fn queue_mode_str(store: &UnifiedStore, queue: &str) -> &'static str {
1790    load_queue_config(store, queue).mode.as_str()
1791}
1792
1793fn save_queue_config(
1794    store: &UnifiedStore,
1795    queue: &str,
1796    config: &QueueRuntimeConfig,
1797) -> RedDBResult<()> {
1798    remove_meta_rows(store, |row| {
1799        row_text(row, "kind").as_deref() == Some("queue_config")
1800            && row_text(row, "queue").as_deref() == Some(queue)
1801    });
1802
1803    let mut fields = HashMap::new();
1804    fields.insert("kind".to_string(), Value::text("queue_config".to_string()));
1805    fields.insert("queue".to_string(), Value::text(queue.to_string()));
1806    fields.insert(
1807        "mode".to_string(),
1808        Value::text(config.mode.as_str().to_string()),
1809    );
1810    fields.insert("priority".to_string(), Value::Boolean(config.priority));
1811    fields.insert(
1812        "max_size".to_string(),
1813        config
1814            .max_size
1815            .map(|value| Value::UnsignedInteger(value as u64))
1816            .unwrap_or(Value::Null),
1817    );
1818    fields.insert(
1819        "ttl_ms".to_string(),
1820        config
1821            .ttl_ms
1822            .map(Value::UnsignedInteger)
1823            .unwrap_or(Value::Null),
1824    );
1825    fields.insert(
1826        "dlq".to_string(),
1827        config.dlq.clone().map(Value::text).unwrap_or(Value::Null),
1828    );
1829    fields.insert(
1830        "max_attempts".to_string(),
1831        Value::UnsignedInteger(u64::from(config.max_attempts)),
1832    );
1833    fields.insert(
1834        "lock_deadline_ms".to_string(),
1835        Value::UnsignedInteger(config.lock_deadline_ms),
1836    );
1837    fields.insert(
1838        "in_flight_cap_per_group".to_string(),
1839        Value::UnsignedInteger(u64::from(config.in_flight_cap_per_group)),
1840    );
1841    fields.insert(
1842        "retry_delay_ms".to_string(),
1843        config
1844            .retry_delay_ms
1845            .map(Value::UnsignedInteger)
1846            .unwrap_or(Value::Null),
1847    );
1848    insert_meta_row(store, fields)
1849}
1850
1851fn remove_queue_metadata(store: &UnifiedStore, queue: &str) {
1852    remove_meta_rows(store, |row| {
1853        row_text(row, "queue").as_deref() == Some(queue)
1854    });
1855}
1856
1857fn queue_group_exists(store: &UnifiedStore, queue: &str, group: &str) -> RedDBResult<bool> {
1858    Ok(load_queue_groups(store, queue)?
1859        .into_iter()
1860        .any(|entry| entry.group == group))
1861}
1862
1863pub(super) fn require_queue_group(
1864    store: &UnifiedStore,
1865    queue: &str,
1866    group: &str,
1867) -> RedDBResult<()> {
1868    if queue_group_exists(store, queue, group)? {
1869        Ok(())
1870    } else {
1871        Err(RedDBError::NotFound(format!(
1872            "consumer group '{}' not found on queue '{}'",
1873            group, queue
1874        )))
1875    }
1876}
1877
1878pub(super) fn resolve_read_group(
1879    store: &UnifiedStore,
1880    queue: &str,
1881    group: Option<&str>,
1882    consumer: &str,
1883    config: &QueueRuntimeConfig,
1884) -> RedDBResult<String> {
1885    if let Some(group) = group {
1886        require_queue_group(store, queue, group)?;
1887        return Ok(group.to_string());
1888    }
1889
1890    match config.mode {
1891        QueueMode::Work => {
1892            if !queue_group_exists(store, queue, WORK_DEFAULT_GROUP)? {
1893                save_queue_group(store, queue, WORK_DEFAULT_GROUP)?;
1894            }
1895            Ok(WORK_DEFAULT_GROUP.to_string())
1896        }
1897        QueueMode::Fanout => {
1898            let fanout_group = format!("{FANOUT_GROUP_PREFIX}{consumer}");
1899            if !queue_group_exists(store, queue, &fanout_group)? {
1900                save_queue_group(store, queue, &fanout_group)?;
1901            }
1902            Ok(fanout_group)
1903        }
1904    }
1905}
1906
1907fn load_queue_groups(store: &UnifiedStore, queue: &str) -> RedDBResult<Vec<QueueGroupEntry>> {
1908    let Some(manager) = store.get_collection(QUEUE_META_COLLECTION) else {
1909        return Ok(Vec::new());
1910    };
1911    Ok(manager
1912        .query_all(|entity| {
1913            entity.data.as_row().is_some_and(|row| {
1914                row_text(row, "kind").as_deref() == Some("queue_group")
1915                    && row_text(row, "queue").as_deref() == Some(queue)
1916            })
1917        })
1918        .into_iter()
1919        .filter_map(|entity| {
1920            let row = entity.data.as_row()?;
1921            Some(QueueGroupEntry {
1922                entity_id: entity.id,
1923                group: row_text(row, "group")?,
1924            })
1925        })
1926        .collect())
1927}
1928
1929fn save_queue_group(store: &UnifiedStore, queue: &str, group: &str) -> RedDBResult<()> {
1930    let mut fields = HashMap::new();
1931    fields.insert("kind".to_string(), Value::text("queue_group".to_string()));
1932    fields.insert("queue".to_string(), Value::text(queue.to_string()));
1933    fields.insert("group".to_string(), Value::text(group.to_string()));
1934    fields.insert(
1935        "created_at_ns".to_string(),
1936        Value::UnsignedInteger(now_ns()),
1937    );
1938    insert_meta_row(store, fields)
1939}
1940
1941pub(super) fn load_pending_entries(
1942    store: &UnifiedStore,
1943    queue: &str,
1944    group: Option<&str>,
1945    message_id: Option<EntityId>,
1946) -> RedDBResult<Vec<QueuePendingEntry>> {
1947    let Some(manager) = store.get_collection(QUEUE_META_COLLECTION) else {
1948        return Ok(Vec::new());
1949    };
1950    Ok(manager
1951        .query_all(|entity| {
1952            entity.data.as_row().is_some_and(|row| {
1953                row_text(row, "kind").as_deref() == Some("queue_pending")
1954                    && row_text(row, "queue").as_deref() == Some(queue)
1955                    && group
1956                        .map(|group_name| row_text(row, "group").as_deref() == Some(group_name))
1957                        .unwrap_or(true)
1958                    && message_id
1959                        .map(|candidate| row_u64(row, "message_id") == Some(candidate.raw()))
1960                        .unwrap_or(true)
1961            })
1962        })
1963        .into_iter()
1964        .filter_map(|entity| {
1965            let row = entity.data.as_row()?;
1966            Some(QueuePendingEntry {
1967                entity_id: entity.id,
1968                group: row_text(row, "group")?,
1969                message_id: EntityId::new(row_u64(row, "message_id")?),
1970                consumer: row_text(row, "consumer")?,
1971                delivered_at_ns: row_u64(row, "delivered_at_ns")?,
1972                delivery_count: row_u64(row, "delivery_count")
1973                    .map(|value| value as u32)
1974                    .unwrap_or(1),
1975            })
1976        })
1977        .collect())
1978}
1979
1980pub(super) fn save_queue_pending(
1981    store: &UnifiedStore,
1982    queue: &str,
1983    group: &str,
1984    message_id: EntityId,
1985    consumer: &str,
1986    delivered_at_ns: u64,
1987    delivery_count: u32,
1988) -> RedDBResult<()> {
1989    remove_meta_rows(store, |row| {
1990        row_text(row, "kind").as_deref() == Some("queue_pending")
1991            && row_text(row, "queue").as_deref() == Some(queue)
1992            && row_text(row, "group").as_deref() == Some(group)
1993            && row_u64(row, "message_id") == Some(message_id.raw())
1994    });
1995
1996    let mut fields = HashMap::new();
1997    fields.insert("kind".to_string(), Value::text("queue_pending".to_string()));
1998    fields.insert("queue".to_string(), Value::text(queue.to_string()));
1999    fields.insert("group".to_string(), Value::text(group.to_string()));
2000    fields.insert(
2001        "message_id".to_string(),
2002        Value::UnsignedInteger(message_id.raw()),
2003    );
2004    fields.insert("consumer".to_string(), Value::text(consumer.to_string()));
2005    fields.insert(
2006        "delivered_at_ns".to_string(),
2007        Value::UnsignedInteger(delivered_at_ns),
2008    );
2009    fields.insert(
2010        "delivery_count".to_string(),
2011        Value::UnsignedInteger(u64::from(delivery_count)),
2012    );
2013    insert_meta_row(store, fields)
2014}
2015
2016pub(super) fn require_pending_entry(
2017    store: &UnifiedStore,
2018    queue: &str,
2019    group: &str,
2020    message_id: EntityId,
2021) -> RedDBResult<QueuePendingEntry> {
2022    load_pending_entries(store, queue, Some(group), Some(message_id))?
2023        .into_iter()
2024        .next()
2025        .ok_or_else(|| {
2026            RedDBError::NotFound(format!(
2027                "message '{}' is not pending in group '{}' on queue '{}'",
2028                message_id.raw(),
2029                group,
2030                queue
2031            ))
2032        })
2033}
2034
2035pub(super) fn load_ack_entries(
2036    store: &UnifiedStore,
2037    queue: &str,
2038    group: Option<&str>,
2039    message_id: Option<EntityId>,
2040) -> RedDBResult<Vec<QueueAckEntry>> {
2041    let Some(manager) = store.get_collection(QUEUE_META_COLLECTION) else {
2042        return Ok(Vec::new());
2043    };
2044    Ok(manager
2045        .query_all(|entity| {
2046            entity.data.as_row().is_some_and(|row| {
2047                row_text(row, "kind").as_deref() == Some("queue_ack")
2048                    && row_text(row, "queue").as_deref() == Some(queue)
2049                    && group
2050                        .map(|group_name| row_text(row, "group").as_deref() == Some(group_name))
2051                        .unwrap_or(true)
2052                    && message_id
2053                        .map(|candidate| row_u64(row, "message_id") == Some(candidate.raw()))
2054                        .unwrap_or(true)
2055            })
2056        })
2057        .into_iter()
2058        .filter_map(|entity| {
2059            let row = entity.data.as_row()?;
2060            Some(QueueAckEntry {
2061                entity_id: entity.id,
2062                group: row_text(row, "group")?,
2063                message_id: EntityId::new(row_u64(row, "message_id")?),
2064            })
2065        })
2066        .collect())
2067}
2068
2069pub(super) fn save_queue_ack(
2070    store: &UnifiedStore,
2071    queue: &str,
2072    group: &str,
2073    message_id: EntityId,
2074) -> RedDBResult<()> {
2075    let existing = load_ack_entries(store, queue, Some(group), Some(message_id))?;
2076    if !existing.is_empty() {
2077        return Ok(());
2078    }
2079
2080    let mut fields = HashMap::new();
2081    fields.insert("kind".to_string(), Value::text("queue_ack".to_string()));
2082    fields.insert("queue".to_string(), Value::text(queue.to_string()));
2083    fields.insert("group".to_string(), Value::text(group.to_string()));
2084    fields.insert(
2085        "message_id".to_string(),
2086        Value::UnsignedInteger(message_id.raw()),
2087    );
2088    fields.insert("acked_at_ns".to_string(), Value::UnsignedInteger(now_ns()));
2089    insert_meta_row(store, fields)
2090}
2091
2092pub(super) fn queue_message_completed_for_all_groups(
2093    store: &UnifiedStore,
2094    queue: &str,
2095    message_id: EntityId,
2096) -> RedDBResult<bool> {
2097    let groups = load_queue_groups(store, queue)?;
2098    let pending = load_pending_entries(store, queue, None, Some(message_id))?;
2099    if !pending.is_empty() {
2100        return Ok(false);
2101    }
2102    if groups.is_empty() {
2103        return Ok(true);
2104    }
2105
2106    let acked_groups = load_ack_entries(store, queue, None, Some(message_id))?
2107        .into_iter()
2108        .map(|entry| entry.group)
2109        .collect::<HashSet<_>>();
2110    Ok(groups
2111        .into_iter()
2112        .all(|group| acked_groups.contains(&group.group)))
2113}
2114
2115fn load_queue_message_views(
2116    store: &UnifiedStore,
2117    queue: &str,
2118) -> RedDBResult<Vec<QueueMessageView>> {
2119    load_queue_message_views_with_runtime(None, store, queue)
2120}
2121
2122/// Kind-aware queue scan (Phase 2.5.5 RLS universal). When the
2123/// caller has a `RedDBRuntime` reference, the gate also applies
2124/// any `CREATE POLICY ... ON MESSAGES OF <queue>` predicate. In
2125/// autocommit / embedded paths that only have the raw store (e.g.
2126/// purge loops) we skip RLS because there's no session identity
2127/// to match against.
2128pub(super) fn load_queue_message_views_with_runtime(
2129    runtime: Option<&RedDBRuntime>,
2130    store: &UnifiedStore,
2131    queue: &str,
2132) -> RedDBResult<Vec<QueueMessageView>> {
2133    let manager = store
2134        .get_collection(queue)
2135        .ok_or_else(|| RedDBError::NotFound(format!("queue '{}' not found", queue)))?;
2136    // Phase 1.2 MVCC universal: capture before parallel scan. Messages
2137    // inserted by another connection's open txn stay invisible to
2138    // consumers until that txn commits (prevents phantom POPs).
2139    let snap_ctx = crate::runtime::impl_core::capture_current_snapshot();
2140    let rls_filter = runtime.and_then(|rt| {
2141        crate::runtime::impl_core::rls_policy_filter_for_kind(
2142            rt,
2143            queue,
2144            crate::storage::query::ast::PolicyAction::Select,
2145            crate::storage::query::ast::PolicyTargetKind::Messages,
2146        )
2147    });
2148    let rls_enabled_but_denied = runtime.map(|rt| rt.is_rls_enabled(queue)).unwrap_or(false)
2149        && rls_filter.is_none()
2150        && runtime.is_some();
2151    if rls_enabled_but_denied {
2152        // RLS on + no Messages policy for this role = deny-default.
2153        return Ok(Vec::new());
2154    }
2155    let filter_arc = rls_filter.map(std::sync::Arc::new);
2156    let rt_arc = runtime;
2157    Ok(manager
2158        .query_all(move |entity| {
2159            if !matches!(entity.kind, EntityKind::QueueMessage { .. }) {
2160                return false;
2161            }
2162            if !crate::runtime::impl_core::entity_visible_with_context(snap_ctx.as_ref(), entity) {
2163                return false;
2164            }
2165            if let (Some(filter), Some(rt)) = (filter_arc.as_ref(), rt_arc) {
2166                return crate::runtime::query_exec::evaluate_entity_filter_with_db(
2167                    Some(&rt.inner.db),
2168                    entity,
2169                    filter,
2170                    queue,
2171                    queue,
2172                );
2173            }
2174            true
2175        })
2176        .into_iter()
2177        .filter_map(queue_message_view_from_entity)
2178        .map(|mut view| {
2179            view.available_at_ns = read_message_available_at_ns(store, queue, view.id);
2180            view
2181        })
2182        .collect())
2183}
2184
2185fn queue_message_view_from_entity(entity: UnifiedEntity) -> Option<QueueMessageView> {
2186    let (position, _) = match &entity.kind {
2187        EntityKind::QueueMessage { position, queue } => (*position, queue),
2188        _ => return None,
2189    };
2190    let data = match entity.data {
2191        EntityData::QueueMessage(data) => data,
2192        _ => return None,
2193    };
2194    Some(QueueMessageView {
2195        id: entity.id,
2196        position,
2197        priority: data.priority.unwrap_or(0),
2198        payload: data.payload,
2199        attempts: data.attempts,
2200        max_attempts: data.max_attempts,
2201        enqueued_at_ns: data.enqueued_at_ns,
2202        available_at_ns: None,
2203    })
2204}
2205
2206/// Insert a moved payload onto `queue` using only the payload value —
2207/// priority / attempts / TTL fall back to the destination queue's
2208/// catalog config (mirrors a fresh enqueue rather than carrying source
2209/// metadata over). Used by `PrimaryQueueStore::move_to_queue`, the
2210/// `QueueLifecycle::move_between_queues` adapter that owns only
2211/// `(message_id, payload)` after `pop_messages` retires the source row.
2212pub(super) fn insert_moved_queue_message_payload(
2213    store: &UnifiedStore,
2214    queue: &str,
2215    payload: &Value,
2216) -> RedDBResult<EntityId> {
2217    let config = load_queue_config(store, queue);
2218    let position = next_queue_position(store, queue, QueueSide::Right)?;
2219    let enqueued_at_ns = std::time::SystemTime::now()
2220        .duration_since(std::time::UNIX_EPOCH)
2221        .map(|d| d.as_nanos() as u64)
2222        .unwrap_or(0);
2223    let entity = UnifiedEntity::new(
2224        EntityId::new(0),
2225        EntityKind::QueueMessage {
2226            queue: queue.to_string(),
2227            position,
2228        },
2229        EntityData::QueueMessage(QueueMessageData {
2230            payload: payload.clone(),
2231            priority: None,
2232            enqueued_at_ns,
2233            attempts: 0,
2234            max_attempts: config.max_attempts,
2235            acked: false,
2236        }),
2237    );
2238    let id = store
2239        .insert_auto(queue, entity)
2240        .map_err(|err| RedDBError::Internal(err.to_string()))?;
2241    if let Some(ttl_ms) = config.ttl_ms {
2242        store
2243            .set_metadata(queue, id, queue_message_ttl_metadata(ttl_ms))
2244            .map_err(|err| RedDBError::Internal(err.to_string()))?;
2245    }
2246    Ok(id)
2247}
2248
2249fn insert_moved_queue_message(
2250    store: &UnifiedStore,
2251    queue: &str,
2252    config: &QueueRuntimeConfig,
2253    message: &QueueMessageView,
2254) -> RedDBResult<EntityId> {
2255    let position = next_queue_position(store, queue, QueueSide::Right)?;
2256    let entity = UnifiedEntity::new(
2257        EntityId::new(0),
2258        EntityKind::QueueMessage {
2259            queue: queue.to_string(),
2260            position,
2261        },
2262        EntityData::QueueMessage(QueueMessageData {
2263            payload: message.payload.clone(),
2264            priority: if config.priority {
2265                Some(message.priority)
2266            } else {
2267                None
2268            },
2269            enqueued_at_ns: message.enqueued_at_ns,
2270            attempts: message.attempts,
2271            max_attempts: message.max_attempts,
2272            acked: false,
2273        }),
2274    );
2275    let id = store
2276        .insert_auto(queue, entity)
2277        .map_err(|err| RedDBError::Internal(err.to_string()))?;
2278    if let Some(ttl_ms) = config.ttl_ms {
2279        store
2280            .set_metadata(queue, id, queue_message_ttl_metadata(ttl_ms))
2281            .map_err(|err| RedDBError::Internal(err.to_string()))?;
2282    }
2283    Ok(id)
2284}
2285
2286fn queue_projection_default_columns() -> Vec<String> {
2287    [
2288        "id",
2289        "payload",
2290        "priority",
2291        "attempts",
2292        "last_error",
2293        "enqueued_at",
2294        "available_at",
2295        "dlq",
2296        "tenant",
2297    ]
2298    .into_iter()
2299    .map(str::to_string)
2300    .collect()
2301}
2302
2303fn queue_projection_record(
2304    columns: &[String],
2305    message: &QueueMessageView,
2306    dlq: bool,
2307) -> RedDBResult<UnifiedRecord> {
2308    let mut record = UnifiedRecord::new();
2309    for column in columns {
2310        let value = queue_projection_value(message, dlq, column).ok_or_else(|| {
2311            RedDBError::Query(format!("unknown queue projection column '{}'", column))
2312        })?;
2313        record.set(column, value);
2314    }
2315    Ok(record)
2316}
2317
2318fn queue_projection_value(message: &QueueMessageView, dlq: bool, column: &str) -> Option<Value> {
2319    match column {
2320        "id" => Some(Value::text(message_id_string(message.id))),
2321        "payload" => Some(message.payload.clone()),
2322        "priority" => Some(Value::Integer(i64::from(message.priority))),
2323        "attempts" => Some(Value::UnsignedInteger(u64::from(message.attempts))),
2324        "last_error" => Some(Value::Null),
2325        "enqueued_at" => Some(Value::UnsignedInteger(message.enqueued_at_ns)),
2326        "available_at" => Some(Value::UnsignedInteger(
2327            message.available_at_ns.unwrap_or(message.enqueued_at_ns),
2328        )),
2329        "dlq" => Some(Value::Boolean(dlq)),
2330        "tenant" => queue_message_tenant(&message.payload).or(Some(Value::Null)),
2331        _ => None,
2332    }
2333}
2334
2335fn queue_message_tenant(payload: &Value) -> Option<Value> {
2336    let Value::Json(bytes) = payload else {
2337        return None;
2338    };
2339    let json: crate::json::Value = crate::json::from_slice(bytes).ok()?;
2340    json.get("tenant")
2341        .and_then(crate::json::Value::as_str)
2342        .map(|tenant| Value::text(tenant.to_string()))
2343}
2344
2345fn queue_is_dead_letter_target(store: &UnifiedStore, queue: &str) -> bool {
2346    let Some(manager) = store.get_collection(QUEUE_META_COLLECTION) else {
2347        return false;
2348    };
2349    !manager
2350        .query_all(|entity| {
2351            entity.data.as_row().is_some_and(|row| {
2352                row_text(row, "kind").as_deref() == Some("queue_config")
2353                    && row_text(row, "dlq").as_deref() == Some(queue)
2354            })
2355        })
2356        .is_empty()
2357}
2358
2359fn queue_message_matches_filter(message: &QueueMessageView, dlq: bool, filter: &Filter) -> bool {
2360    match filter {
2361        Filter::Compare { field, op, value } => queue_filter_field_value(message, dlq, field)
2362            .is_some_and(|candidate| queue_compare_values(&candidate, value, *op)),
2363        Filter::CompareFields { left, op, right } => {
2364            match (
2365                queue_filter_field_value(message, dlq, left),
2366                queue_filter_field_value(message, dlq, right),
2367            ) {
2368                (Some(left), Some(right)) => queue_compare_values(&left, &right, *op),
2369                _ => false,
2370            }
2371        }
2372        Filter::And(left, right) => {
2373            queue_message_matches_filter(message, dlq, left)
2374                && queue_message_matches_filter(message, dlq, right)
2375        }
2376        Filter::Or(left, right) => {
2377            queue_message_matches_filter(message, dlq, left)
2378                || queue_message_matches_filter(message, dlq, right)
2379        }
2380        Filter::Not(inner) => !queue_message_matches_filter(message, dlq, inner),
2381        Filter::IsNull(field) => queue_filter_field_value(message, dlq, field)
2382            .is_none_or(|value| matches!(value, Value::Null)),
2383        Filter::IsNotNull(field) => queue_filter_field_value(message, dlq, field)
2384            .is_some_and(|value| !matches!(value, Value::Null)),
2385        Filter::In { field, values } => {
2386            queue_filter_field_value(message, dlq, field).is_some_and(|candidate| {
2387                values
2388                    .iter()
2389                    .any(|value| queue_values_equal(&candidate, value))
2390            })
2391        }
2392        Filter::Between { field, low, high } => queue_filter_field_value(message, dlq, field)
2393            .is_some_and(|candidate| {
2394                queue_compare_values(&candidate, low, CompareOp::Ge)
2395                    && queue_compare_values(&candidate, high, CompareOp::Le)
2396            }),
2397        Filter::Like { field, pattern } => queue_filter_text(message, dlq, field)
2398            .is_some_and(|value| queue_like_matches(&value, pattern)),
2399        Filter::StartsWith { field, prefix } => {
2400            queue_filter_text(message, dlq, field).is_some_and(|value| value.starts_with(prefix))
2401        }
2402        Filter::EndsWith { field, suffix } => {
2403            queue_filter_text(message, dlq, field).is_some_and(|value| value.ends_with(suffix))
2404        }
2405        Filter::Contains { field, substring } => {
2406            queue_filter_text(message, dlq, field).is_some_and(|value| value.contains(substring))
2407        }
2408        Filter::CompareExpr { .. } => false,
2409    }
2410}
2411
2412fn queue_filter_field_value(
2413    message: &QueueMessageView,
2414    dlq: bool,
2415    field: &FieldRef,
2416) -> Option<Value> {
2417    match field {
2418        FieldRef::TableColumn { table, column } if table.is_empty() => {
2419            queue_projection_value(message, dlq, column)
2420                .or_else(|| queue_payload_field_value(&message.payload, column))
2421        }
2422        FieldRef::TableColumn { column, .. } => queue_projection_value(message, dlq, column)
2423            .or_else(|| queue_payload_field_value(&message.payload, column)),
2424        _ => None,
2425    }
2426}
2427
2428fn queue_payload_field_value(payload: &Value, field: &str) -> Option<Value> {
2429    let Value::Json(bytes) = payload else {
2430        return None;
2431    };
2432    let json: crate::json::Value = crate::json::from_slice(bytes).ok()?;
2433    let value = json.get(field)?;
2434    json_value_to_schema_value(value)
2435}
2436
2437fn json_value_to_schema_value(value: &crate::json::Value) -> Option<Value> {
2438    if matches!(value, crate::json::Value::Null) {
2439        Some(Value::Null)
2440    } else if let Some(value) = value.as_bool() {
2441        Some(Value::Boolean(value))
2442    } else if let Some(value) = value.as_i64() {
2443        Some(Value::Integer(value))
2444    } else if let Some(value) = value.as_u64() {
2445        Some(Value::UnsignedInteger(value))
2446    } else if let Some(value) = value.as_f64() {
2447        Some(Value::Float(value))
2448    } else if let Some(value) = value.as_str() {
2449        Some(Value::text(value.to_string()))
2450    } else {
2451        Some(Value::Json(value.to_string_compact().into_bytes()))
2452    }
2453}
2454
2455fn queue_filter_text(message: &QueueMessageView, dlq: bool, field: &FieldRef) -> Option<String> {
2456    queue_filter_field_value(message, dlq, field).and_then(|value| match value {
2457        Value::Text(value) => Some(value.to_string()),
2458        Value::NodeRef(value) | Value::EdgeRef(value) | Value::TableRef(value) => Some(value),
2459        Value::Integer(value) => Some(value.to_string()),
2460        Value::UnsignedInteger(value) => Some(value.to_string()),
2461        Value::Float(value) => Some(value.to_string()),
2462        Value::Boolean(value) => Some(value.to_string()),
2463        _ => None,
2464    })
2465}
2466
2467fn queue_compare_values(left: &Value, right: &Value, op: CompareOp) -> bool {
2468    match op {
2469        CompareOp::Eq => queue_values_equal(left, right),
2470        CompareOp::Ne => !queue_values_equal(left, right),
2471        CompareOp::Lt => queue_partial_cmp(left, right).is_some_and(|ord| ord.is_lt()),
2472        CompareOp::Le => queue_partial_cmp(left, right).is_some_and(|ord| !ord.is_gt()),
2473        CompareOp::Gt => queue_partial_cmp(left, right).is_some_and(|ord| ord.is_gt()),
2474        CompareOp::Ge => queue_partial_cmp(left, right).is_some_and(|ord| !ord.is_lt()),
2475    }
2476}
2477
2478fn queue_values_equal(left: &Value, right: &Value) -> bool {
2479    if let (Some(left), Some(right)) = (queue_value_number(left), queue_value_number(right)) {
2480        return (left - right).abs() < f64::EPSILON;
2481    }
2482    match (left, right) {
2483        (Value::Text(left), Value::Text(right)) => left == right,
2484        (Value::Boolean(left), Value::Boolean(right)) => left == right,
2485        _ => left == right,
2486    }
2487}
2488
2489fn queue_partial_cmp(left: &Value, right: &Value) -> Option<std::cmp::Ordering> {
2490    if let (Some(left), Some(right)) = (queue_value_number(left), queue_value_number(right)) {
2491        return left.partial_cmp(&right);
2492    }
2493    match (left, right) {
2494        (Value::Text(left), Value::Text(right)) => Some(left.cmp(right)),
2495        _ => None,
2496    }
2497}
2498
2499fn queue_value_number(value: &Value) -> Option<f64> {
2500    match value {
2501        Value::Integer(value) => Some(*value as f64),
2502        Value::UnsignedInteger(value) => Some(*value as f64),
2503        Value::Float(value) => Some(*value),
2504        Value::Text(value) => value.parse().ok(),
2505        _ => None,
2506    }
2507}
2508
2509fn queue_like_matches(value: &str, pattern: &str) -> bool {
2510    if pattern == "%" {
2511        return true;
2512    }
2513    let starts_wild = pattern.starts_with('%');
2514    let ends_wild = pattern.ends_with('%');
2515    let needle = pattern.trim_matches('%');
2516    match (starts_wild, ends_wild) {
2517        (true, true) => value.contains(needle),
2518        (true, false) => value.ends_with(needle),
2519        (false, true) => value.starts_with(needle),
2520        (false, false) => value == needle,
2521    }
2522}
2523
2524pub(super) fn queue_message_view_by_id(
2525    store: &UnifiedStore,
2526    queue: &str,
2527    message_id: EntityId,
2528) -> RedDBResult<Option<QueueMessageView>> {
2529    let manager = queue_manager(store, queue)?;
2530    Ok(manager
2531        .get(message_id)
2532        .and_then(queue_message_view_from_entity)
2533        .map(|mut view| {
2534            view.available_at_ns = read_message_available_at_ns(store, queue, view.id);
2535            view
2536        }))
2537}
2538
2539pub(super) fn sort_queue_messages(
2540    messages: &mut [QueueMessageView],
2541    config: &QueueRuntimeConfig,
2542    side: QueueSide,
2543) {
2544    messages.sort_by(|left, right| {
2545        if config.priority {
2546            right
2547                .priority
2548                .cmp(&left.priority)
2549                .then_with(|| match side {
2550                    QueueSide::Left => left.position.cmp(&right.position),
2551                    QueueSide::Right => right.position.cmp(&left.position),
2552                })
2553                .then_with(|| left.id.raw().cmp(&right.id.raw()))
2554        } else {
2555            match side {
2556                QueueSide::Left => left.position.cmp(&right.position),
2557                QueueSide::Right => right.position.cmp(&left.position),
2558            }
2559            .then_with(|| left.id.raw().cmp(&right.id.raw()))
2560        }
2561    });
2562}
2563
2564pub(super) fn next_queue_position(
2565    store: &UnifiedStore,
2566    queue: &str,
2567    side: QueueSide,
2568) -> RedDBResult<u64> {
2569    let messages = load_queue_message_views(store, queue)?;
2570    if messages.is_empty() {
2571        return Ok(QUEUE_POSITION_CENTER);
2572    }
2573    match side {
2574        QueueSide::Left => Ok(messages
2575            .iter()
2576            .map(|message| message.position)
2577            .min()
2578            .unwrap_or(QUEUE_POSITION_CENTER)
2579            .saturating_sub(1)),
2580        QueueSide::Right => Ok(messages
2581            .iter()
2582            .map(|message| message.position)
2583            .max()
2584            .unwrap_or(QUEUE_POSITION_CENTER)
2585            .saturating_add(1)),
2586    }
2587}
2588
2589pub(super) fn increment_queue_attempts(
2590    store: &UnifiedStore,
2591    queue: &str,
2592    message_id: EntityId,
2593) -> RedDBResult<u32> {
2594    let manager = queue_manager(store, queue)?;
2595    let mut entity = manager
2596        .get(message_id)
2597        .ok_or_else(|| RedDBError::NotFound(format!("message '{}' not found", message_id.raw())))?;
2598    match &mut entity.data {
2599        EntityData::QueueMessage(message) => {
2600            message.attempts = message.attempts.saturating_add(1);
2601            let attempts = message.attempts;
2602            manager
2603                .update(entity)
2604                .map_err(|err| RedDBError::Internal(err.to_string()))?;
2605            Ok(attempts)
2606        }
2607        _ => Err(RedDBError::Query(format!(
2608            "entity '{}' is not a queue message",
2609            message_id.raw()
2610        ))),
2611    }
2612}
2613
2614pub(super) fn queue_message_attempts(
2615    store: &UnifiedStore,
2616    queue: &str,
2617    message_id: EntityId,
2618) -> RedDBResult<u32> {
2619    Ok(queue_message_data(store, queue, message_id)?.attempts)
2620}
2621
2622pub(super) fn queue_message_max_attempts(
2623    store: &UnifiedStore,
2624    queue: &str,
2625    message_id: EntityId,
2626) -> RedDBResult<u32> {
2627    Ok(queue_message_data(store, queue, message_id)?.max_attempts)
2628}
2629
2630pub(super) fn queue_message_payload(
2631    store: &UnifiedStore,
2632    queue: &str,
2633    message_id: EntityId,
2634) -> RedDBResult<Value> {
2635    Ok(queue_message_data(store, queue, message_id)?.payload)
2636}
2637
2638pub(super) fn queue_message_pending_any(
2639    store: &UnifiedStore,
2640    queue: &str,
2641    message_id: EntityId,
2642) -> RedDBResult<bool> {
2643    Ok(!load_pending_entries(store, queue, None, Some(message_id))?.is_empty())
2644}
2645
2646pub(super) fn queue_message_pending_for_group(
2647    store: &UnifiedStore,
2648    queue: &str,
2649    group: &str,
2650    message_id: EntityId,
2651) -> RedDBResult<bool> {
2652    Ok(!load_pending_entries(store, queue, Some(group), Some(message_id))?.is_empty())
2653}
2654
2655pub(super) fn queue_message_acked_for_group(
2656    store: &UnifiedStore,
2657    queue: &str,
2658    group: &str,
2659    message_id: EntityId,
2660) -> RedDBResult<bool> {
2661    Ok(!load_ack_entries(store, queue, Some(group), Some(message_id))?.is_empty())
2662}
2663
2664fn queue_manager(
2665    store: &UnifiedStore,
2666    queue: &str,
2667) -> RedDBResult<Arc<crate::storage::unified::SegmentManager>> {
2668    store
2669        .get_collection(queue)
2670        .ok_or_else(|| RedDBError::NotFound(format!("queue '{}' not found", queue)))
2671}
2672
2673pub(super) fn queue_message_data(
2674    store: &UnifiedStore,
2675    queue: &str,
2676    message_id: EntityId,
2677) -> RedDBResult<QueueMessageData> {
2678    let manager = queue_manager(store, queue)?;
2679    let entity = manager
2680        .get(message_id)
2681        .ok_or_else(|| RedDBError::NotFound(format!("message '{}' not found", message_id.raw())))?;
2682    match entity.data {
2683        EntityData::QueueMessage(message) => Ok(message),
2684        _ => Err(RedDBError::Query(format!(
2685            "entity '{}' is not a queue message",
2686            message_id.raw()
2687        ))),
2688    }
2689}
2690
2691fn insert_meta_row(store: &UnifiedStore, fields: HashMap<String, Value>) -> RedDBResult<()> {
2692    let _ = store.get_or_create_collection(QUEUE_META_COLLECTION);
2693    store
2694        .insert_auto(
2695            QUEUE_META_COLLECTION,
2696            UnifiedEntity::new(
2697                EntityId::new(0),
2698                EntityKind::TableRow {
2699                    table: Arc::from(QUEUE_META_COLLECTION),
2700                    row_id: 0,
2701                },
2702                EntityData::Row(RowData {
2703                    columns: Vec::new(),
2704                    named: Some(fields),
2705                    schema: None,
2706                }),
2707            ),
2708        )
2709        .map_err(|err| RedDBError::Internal(err.to_string()))?;
2710    Ok(())
2711}
2712
2713pub(super) fn remove_meta_rows(store: &UnifiedStore, predicate: impl Fn(&RowData) -> bool + Sync) {
2714    let Some(manager) = store.get_collection(QUEUE_META_COLLECTION) else {
2715        return;
2716    };
2717    let rows = manager.query_all(|entity| entity.data.as_row().is_some_and(&predicate));
2718    for row in rows {
2719        let _ = store.delete(QUEUE_META_COLLECTION, row.id);
2720    }
2721}
2722
2723pub(super) fn delete_meta_entity(store: &UnifiedStore, entity_id: EntityId) {
2724    let _ = store.delete(QUEUE_META_COLLECTION, entity_id);
2725}
2726
2727fn queue_message_lock_key(queue: &str, message_id: EntityId) -> String {
2728    format!("{queue}:{}", message_id.raw())
2729}
2730
2731pub(super) fn queue_message_lock_handle(
2732    runtime: &RedDBRuntime,
2733    queue: &str,
2734    message_id: EntityId,
2735) -> Arc<parking_lot::Mutex<()>> {
2736    let key = queue_message_lock_key(queue, message_id);
2737    if let Some(lock) = runtime.inner.queue_message_locks.read().get(&key).cloned() {
2738        return lock;
2739    }
2740
2741    let mut locks = runtime.inner.queue_message_locks.write();
2742    locks
2743        .entry(key)
2744        .or_insert_with(|| Arc::new(parking_lot::Mutex::new(())))
2745        .clone()
2746}
2747
2748pub(super) fn forget_queue_message_lock(runtime: &RedDBRuntime, queue: &str, message_id: EntityId) {
2749    runtime
2750        .inner
2751        .queue_message_locks
2752        .write()
2753        .remove(&queue_message_lock_key(queue, message_id));
2754}
2755
2756fn parse_message_id(value: &str) -> RedDBResult<EntityId> {
2757    let raw = value.strip_prefix('e').unwrap_or(value);
2758    raw.parse::<u64>()
2759        .map(EntityId::new)
2760        .map_err(|_| RedDBError::Query(format!("invalid message id '{}'", value)))
2761}
2762
2763/// ADR 0026: resolve the ACK/NACK handle. When `delivery_id` is supplied,
2764/// it wins unconditionally — strict failure if the handle does not resolve
2765/// to a live pending delivery on `queue`. When only the legacy tuple is
2766/// supplied, emit a rate-limited deprecation log line and use the tuple.
2767/// At least one handle must be present.
2768pub(super) fn resolve_ack_nack_handle(
2769    store: &UnifiedStore,
2770    queue: &str,
2771    group_hint: &str,
2772    message_id_hint: &str,
2773    delivery_id: Option<&str>,
2774) -> RedDBResult<(String, EntityId)> {
2775    if let Some(did) = delivery_id {
2776        return resolve_delivery_id(store, queue, did);
2777    }
2778    if group_hint.is_empty() || message_id_hint.is_empty() {
2779        return Err(RedDBError::Query(
2780            "ACK/NACK requires either GROUP <group> '<message_id>' or WITH delivery_id = '<id>'"
2781                .to_string(),
2782        ));
2783    }
2784    log_tuple_deprecation(queue);
2785    let entity = parse_message_id(message_id_hint)?;
2786    Ok((group_hint.to_string(), entity))
2787}
2788
2789fn resolve_delivery_id(
2790    store: &UnifiedStore,
2791    queue: &str,
2792    delivery_id: &str,
2793) -> RedDBResult<(String, EntityId)> {
2794    let Some(manager) = store.get_collection(QUEUE_META_COLLECTION) else {
2795        return Err(RedDBError::Query(format!(
2796            "delivery_id '{}' does not resolve to a live pending delivery",
2797            delivery_id
2798        )));
2799    };
2800    for entity in manager.query_all(|entity| {
2801        entity.data.as_row().is_some_and(|row| {
2802            row_text(row, "kind").as_deref() == Some("queue_pending_lc")
2803                && row_text(row, "delivery_id").as_deref() == Some(delivery_id)
2804        })
2805    }) {
2806        if let Some(row) = entity.data.as_row() {
2807            let row_queue = row_text(row, "queue").unwrap_or_default();
2808            let row_group = row_text(row, "group").unwrap_or_default();
2809            let row_message = row_u64(row, "message_id").unwrap_or(0);
2810            if row_queue != queue {
2811                return Err(RedDBError::Query(format!(
2812                    "delivery_id '{}' belongs to queue '{}', not '{}'",
2813                    delivery_id, row_queue, queue
2814                )));
2815            }
2816            return Ok((row_group, EntityId::new(row_message)));
2817        }
2818    }
2819    Err(RedDBError::Query(format!(
2820        "delivery_id '{}' does not resolve to a live pending delivery",
2821        delivery_id
2822    )))
2823}
2824
2825/// Per-(connection, queue) rate-limited "tuple ACK is deprecated" log line.
2826/// One emission per minute matches ADR 0026's operational guidance.
2827fn log_tuple_deprecation(queue: &str) {
2828    use std::sync::atomic::Ordering;
2829    use std::sync::{Mutex, OnceLock};
2830    use std::time::Instant;
2831
2832    static LAST_EMIT: OnceLock<Mutex<HashMap<(u64, String), Instant>>> = OnceLock::new();
2833    const COOLDOWN: std::time::Duration = std::time::Duration::from_secs(60);
2834
2835    let map = LAST_EMIT.get_or_init(|| Mutex::new(HashMap::new()));
2836    let key = (super::impl_core::current_connection_id(), queue.to_string());
2837    let now = Instant::now();
2838    let mut guard = match map.lock() {
2839        Ok(g) => g,
2840        Err(_) => return,
2841    };
2842    let should_emit =
2843        !matches!(guard.get(&key), Some(prev) if now.duration_since(*prev) < COOLDOWN);
2844    if should_emit {
2845        guard.insert(key.clone(), now);
2846        drop(guard);
2847        TUPLE_DEPRECATION_EMITS.fetch_add(1, Ordering::Relaxed);
2848        tracing::warn!(
2849            target: "reddb::queue_lifecycle",
2850            queue = queue,
2851            connection_id = key.0,
2852            "ACK/NACK by (queue, group, message_id) tuple is deprecated; \
2853             switch to the server-issued delivery_id (ADR 0026). \
2854             The tuple path will be removed one minor release after introduction.",
2855        );
2856    }
2857}
2858
2859/// Total count of tuple-deprecation log emissions since process start.
2860/// Intentionally process-wide and `pub` so the transport-bridge
2861/// integration tests can observe that the legacy tuple path emitted a
2862/// deprecation while the `delivery_id` path stayed silent, without
2863/// having to plumb a `tracing::Subscriber` through every test.
2864pub static TUPLE_DEPRECATION_EMITS: std::sync::atomic::AtomicU64 =
2865    std::sync::atomic::AtomicU64::new(0);
2866
2867fn message_id_string(message_id: EntityId) -> String {
2868    message_id.raw().to_string()
2869}
2870
2871/// Issue #917 — render a delivered queue message as the JSON object the
2872/// RedWire `QueueEventPush` frame carries. Mirrors the column shape the
2873/// SQL `QUEUE READ` projection emits (`message_id` / `payload` /
2874/// `consumer` / `delivery_count`) so the wire push and the pull path
2875/// stay client-compatible.
2876fn delivered_message_json(
2877    message: crate::runtime::queue_lifecycle::DeliveredMessage,
2878) -> crate::serde_json::Value {
2879    use crate::serde_json::{Map, Value as JsonValue};
2880    let mut obj = Map::new();
2881    obj.insert(
2882        "message_id".to_string(),
2883        JsonValue::String(message_id_string(EntityId::new(message.message_id))),
2884    );
2885    obj.insert(
2886        "payload".to_string(),
2887        crate::presentation::entity_json::storage_value_to_json(&message.payload),
2888    );
2889    obj.insert("consumer".to_string(), JsonValue::String(message.consumer));
2890    obj.insert(
2891        "delivery_count".to_string(),
2892        JsonValue::Number(message.delivery_count as f64),
2893    );
2894    JsonValue::Object(obj)
2895}
2896
2897/// Slice 10 of issue #527 — render-time scan of pending entries
2898/// per (queue, group) for `queue_pending_gauge` exposition. Walks
2899/// `red_queue_meta` live so the gauge cannot drift from the source
2900/// of truth.
2901pub(crate) fn pending_counts_by_group(
2902    store: &UnifiedStore,
2903) -> std::collections::BTreeMap<(String, String), u64> {
2904    let mut counts: std::collections::BTreeMap<(String, String), u64> =
2905        std::collections::BTreeMap::new();
2906    let Some(manager) = store.get_collection(QUEUE_META_COLLECTION) else {
2907        return counts;
2908    };
2909    for entity in manager.query_all(|entity| {
2910        entity
2911            .data
2912            .as_row()
2913            .is_some_and(|row| row_text(row, "kind").as_deref() == Some("queue_pending"))
2914    }) {
2915        if let Some(row) = entity.data.as_row() {
2916            let queue = row_text(row, "queue");
2917            let group = row_text(row, "group");
2918            if let (Some(q), Some(g)) = (queue, group) {
2919                *counts.entry((q, g)).or_insert(0) += 1;
2920            }
2921        }
2922    }
2923    counts
2924}
2925
2926pub(super) fn row_text(row: &RowData, field: &str) -> Option<String> {
2927    match row.get_field(field)?.clone() {
2928        Value::Text(value) => Some(value.to_string()),
2929        Value::NodeRef(value) => Some(value),
2930        Value::EdgeRef(value) => Some(value),
2931        Value::TableRef(value) => Some(value),
2932        _ => None,
2933    }
2934}
2935
2936pub(super) fn row_u64(row: &RowData, field: &str) -> Option<u64> {
2937    match row.get_field(field)?.clone() {
2938        Value::UnsignedInteger(value) => Some(value),
2939        Value::Integer(value) if value >= 0 => Some(value as u64),
2940        Value::Float(value) if value >= 0.0 => Some(value as u64),
2941        Value::Text(value) => value.parse().ok(),
2942        _ => None,
2943    }
2944}
2945
2946fn row_bool(row: &RowData, field: &str) -> Option<bool> {
2947    match row.get_field(field)?.clone() {
2948        Value::Boolean(value) => Some(value),
2949        Value::Text(value) => match value.to_ascii_lowercase().as_str() {
2950            "true" => Some(true),
2951            "false" => Some(false),
2952            _ => None,
2953        },
2954        _ => None,
2955    }
2956}
2957
2958fn queue_collection_contract(
2959    name: &str,
2960    priority: bool,
2961    ttl_ms: Option<u64>,
2962) -> crate::physical::CollectionContract {
2963    let now = current_unix_ms();
2964    let mut context_index_fields = Vec::new();
2965    if priority {
2966        context_index_fields.push("priority".to_string());
2967    }
2968
2969    crate::physical::CollectionContract {
2970        name: name.to_string(),
2971        declared_model: crate::catalog::CollectionModel::Queue,
2972        schema_mode: crate::catalog::SchemaMode::Dynamic,
2973        origin: crate::physical::ContractOrigin::Explicit,
2974        version: 1,
2975        created_at_unix_ms: now,
2976        updated_at_unix_ms: now,
2977        default_ttl_ms: ttl_ms,
2978        vector_dimension: None,
2979        vector_metric: None,
2980        context_index_fields,
2981        declared_columns: Vec::new(),
2982        table_def: None,
2983        timestamps_enabled: false,
2984        context_index_enabled: false,
2985        metrics_raw_retention_ms: None,
2986        metrics_rollup_policies: Vec::new(),
2987        metrics_tenant_identity: None,
2988        metrics_namespace: None,
2989        // Queues manipulate messages via push/pop/ack — the row DML
2990        // paths never apply. Flag it as append_only so inadvertent
2991        // `UPDATE/DELETE FROM queue_name` statements fail loudly.
2992        append_only: true,
2993        subscriptions: Vec::new(),
2994        analytics_config: Vec::new(),
2995        session_key: None,
2996        session_gap_ms: None,
2997        retention_duration_ms: None,
2998        analytical_storage: None,
2999    }
3000}
3001
3002fn current_unix_ms() -> u128 {
3003    std::time::SystemTime::now()
3004        .duration_since(std::time::UNIX_EPOCH)
3005        .unwrap_or_default()
3006        .as_millis()
3007}
3008
3009pub(super) fn now_ns() -> u64 {
3010    std::time::SystemTime::now()
3011        .duration_since(std::time::UNIX_EPOCH)
3012        .unwrap_or_default()
3013        .as_nanos() as u64
3014}
3015
3016pub(super) fn queue_message_ttl_metadata(ttl_ms: u64) -> Metadata {
3017    queue_message_metadata(Some(ttl_ms), None)
3018}
3019
3020/// Build the per-message metadata row attached to a queue message. Both
3021/// fields are optional — `_ttl_ms` carries the per-message TTL (existing
3022/// behaviour) and `_available_at_ns` carries the first-delivery instant
3023/// for delayed messages (issue #722). When both are present they share
3024/// the same row, since `UnifiedStore::set_metadata` replaces the entry.
3025pub(super) fn queue_message_metadata(
3026    ttl_ms: Option<u64>,
3027    available_at_ns: Option<u64>,
3028) -> Metadata {
3029    let mut fields = HashMap::new();
3030    if let Some(ttl_ms) = ttl_ms {
3031        fields.insert(
3032            "_ttl_ms".to_string(),
3033            if ttl_ms <= i64::MAX as u64 {
3034                MetadataValue::Int(ttl_ms as i64)
3035            } else {
3036                MetadataValue::Timestamp(ttl_ms)
3037            },
3038        );
3039    }
3040    if let Some(at_ns) = available_at_ns {
3041        fields.insert(
3042            "_available_at_ns".to_string(),
3043            MetadataValue::Timestamp(at_ns),
3044        );
3045    }
3046    Metadata::with_fields(fields)
3047}
3048
3049/// Smallest future `available_at_ns` among messages currently sitting
3050/// on `queue`. Used by `QUEUE READ … WAIT` (issue #722) to cap the
3051/// condvar park horizon: without this, a waiter on a quiet queue with
3052/// only delayed messages would never wake when one became due, since
3053/// the wait registry is only notified by producer commits. Returns
3054/// `None` when no future-dated message exists (the common case — the
3055/// caller falls back to the user-supplied wait budget).
3056pub(super) fn earliest_future_available_at(store: &UnifiedStore, queue: &str) -> Option<u64> {
3057    let now_ns = now_ns();
3058    let views = load_queue_message_views_with_runtime(None, store, queue).ok()?;
3059    views
3060        .iter()
3061        .filter_map(|v| v.available_at_ns)
3062        .filter(|at| *at > now_ns)
3063        .min()
3064}
3065
3066/// Update the `_available_at_ns` metadata for a queue message in place
3067/// without dropping any `_ttl_ms` already present (issue #723 — used by
3068/// NACK retry delay). `available_at_ns = None` clears the field. The
3069/// `fallback_ttl_ms` argument is consulted only when the existing
3070/// metadata row carries no `_ttl_ms` — keeps the per-message TTL in
3071/// sync with the queue default in the common case where no per-message
3072/// TTL was set explicitly. Tolerant of missing collections / entities:
3073/// returns `Ok(())` rather than failing the surrounding NACK if the
3074/// underlying message has gone away between resolution and metadata
3075/// update (a benign race; the next delivery cycle reflects truth).
3076pub(super) fn set_message_available_at_ns(
3077    store: &UnifiedStore,
3078    queue: &str,
3079    message_id: EntityId,
3080    available_at_ns: Option<u64>,
3081    fallback_ttl_ms: Option<u64>,
3082) -> RedDBResult<()> {
3083    let existing_ttl_ms = store
3084        .get_metadata(queue, message_id)
3085        .and_then(|md| match md.get("_ttl_ms")? {
3086            MetadataValue::Int(i) if *i >= 0 => Some(*i as u64),
3087            MetadataValue::Timestamp(t) => Some(*t),
3088            _ => None,
3089        })
3090        .or(fallback_ttl_ms);
3091    let metadata = queue_message_metadata(existing_ttl_ms, available_at_ns);
3092    match store.set_metadata(queue, message_id, metadata) {
3093        Ok(()) => Ok(()),
3094        Err(crate::storage::StoreError::CollectionNotFound(_)) => Ok(()),
3095        Err(err) => Err(RedDBError::Internal(err.to_string())),
3096    }
3097}
3098
3099/// Read the `_available_at_ns` metadata for a queue message. Returns
3100/// `None` for messages with no delay (the common case) or when the
3101/// metadata row is missing entirely.
3102pub(super) fn read_message_available_at_ns(
3103    store: &UnifiedStore,
3104    queue: &str,
3105    message_id: EntityId,
3106) -> Option<u64> {
3107    let md = store.get_metadata(queue, message_id)?;
3108    match md.get("_available_at_ns")? {
3109        MetadataValue::Timestamp(t) => Some(*t),
3110        MetadataValue::Int(i) if *i >= 0 => Some(*i as u64),
3111        _ => None,
3112    }
3113}
3114
3115/// Rough payload byte estimate for outbox watermark tracking.
3116fn estimate_payload_bytes(payload: &Value) -> u64 {
3117    match payload {
3118        Value::Json(v) => v.len() as u64,
3119        Value::Text(s) => s.len() as u64,
3120        _ => 64,
3121    }
3122}
3123
3124#[cfg(test)]
3125mod presence_integration_tests {
3126    use super::*;
3127    use crate::storage::queue::presence::{PresenceState, DEFAULT_PRESENCE_TTL_MS};
3128    use crate::{RedDBOptions, RedDBRuntime};
3129
3130    /// Issue #742 acceptance: `QUEUE READ` must register and refresh
3131    /// consumer presence. The snapshot exposes the same `(queue, group,
3132    /// consumer)` triple the read named, with `state = Active`.
3133    #[test]
3134    fn queue_read_emits_consumer_presence_heartbeat() {
3135        let rt = RedDBRuntime::with_options(RedDBOptions::in_memory()).unwrap();
3136        rt.execute_query("CREATE QUEUE tasks").unwrap();
3137        rt.execute_query("QUEUE GROUP CREATE tasks workers")
3138            .unwrap();
3139        rt.execute_query("QUEUE PUSH tasks {'job':'a'}").unwrap();
3140        rt.execute_query("QUEUE READ tasks GROUP workers CONSUMER w1")
3141            .unwrap();
3142
3143        let snap = rt.queue_consumer_presence_snapshot(DEFAULT_PRESENCE_TTL_MS);
3144        assert_eq!(snap.len(), 1, "exactly one heartbeat recorded");
3145        let row = &snap[0];
3146        assert_eq!(row.queue, "tasks");
3147        assert_eq!(row.group, "workers");
3148        assert_eq!(row.consumer, "w1");
3149        assert_eq!(row.state, PresenceState::Active);
3150
3151        let counts = rt.queue_active_consumer_counts(DEFAULT_PRESENCE_TTL_MS);
3152        assert_eq!(
3153            counts[&("tasks".to_string(), "workers".to_string())],
3154            1,
3155            "active count reflects the live consumer"
3156        );
3157    }
3158
3159    /// Issue #742 acceptance: a read that returns no messages must
3160    /// still heartbeat — aliveness is independent of pending
3161    /// deliveries.
3162    #[test]
3163    fn empty_queue_read_still_heartbeats() {
3164        let rt = RedDBRuntime::with_options(RedDBOptions::in_memory()).unwrap();
3165        rt.execute_query("CREATE QUEUE empty_q").unwrap();
3166        rt.execute_query("QUEUE GROUP CREATE empty_q workers")
3167            .unwrap();
3168        // No PUSH — the queue is empty.
3169        rt.execute_query("QUEUE READ empty_q GROUP workers CONSUMER w1")
3170            .unwrap();
3171
3172        let snap = rt.queue_consumer_presence_snapshot(DEFAULT_PRESENCE_TTL_MS);
3173        assert_eq!(
3174            snap.len(),
3175            1,
3176            "empty read still registers consumer presence"
3177        );
3178        assert_eq!(snap[0].state, PresenceState::Active);
3179        assert_eq!(
3180            snap[0].lease_count, 0,
3181            "no messages delivered → zero leases"
3182        );
3183    }
3184}