Skip to main content

reddb_server/runtime/
impl_queue.rs

1//! Queue DDL and command execution
2
3use std::collections::{HashMap, HashSet};
4use std::sync::atomic::{AtomicU64, Ordering};
5use std::sync::Arc;
6
7use crate::runtime::audit_log::{AuditAuthSource, AuditEvent, AuditFieldEscaper, Outcome};
8use crate::storage::queue::QueueMode;
9use crate::storage::unified::entity::{QueueMessageData, RowData};
10use crate::storage::unified::{Metadata, MetadataValue, UnifiedStore};
11
12use super::*;
13
14use super::primary_queue_store::PrimaryQueueStore;
15use super::queue_lifecycle::{QueueLifecycle, RetirementOutcome};
16use crate::storage::queue::lifecycle::{
17    QueueSide as LcQueueSide, QueueStore as _, QueueStoreError, QueueTxn,
18};
19
20/// Build a [`QueueLifecycle`] backed by a fresh [`PrimaryQueueStore`] for
21/// the given `queue`, plus a `QueueTxn` bound to the live runtime
22/// connection. The store inside the lifecycle and the standalone
23/// [`PrimaryQueueStore`] returned for ack/nack lookups share the same
24/// underlying [`UnifiedStore`] — calls against either are observable on
25/// the other.
26pub(super) fn runtime_lifecycle(
27    runtime: &RedDBRuntime,
28    queue: &str,
29) -> (
30    QueueLifecycle<PrimaryQueueStore>,
31    PrimaryQueueStore,
32    QueueTxn,
33) {
34    let primary_for_lookup = PrimaryQueueStore::new(runtime.clone());
35    let primary_for_lifecycle = PrimaryQueueStore::new(runtime.clone());
36    let txn = primary_for_lifecycle.new_txn();
37    let cfg = primary_for_lifecycle.lifecycle_config(queue);
38    (
39        QueueLifecycle::new(primary_for_lifecycle, cfg),
40        primary_for_lookup,
41        txn,
42    )
43}
44
45/// Convert a lifecycle `QueueSide` view into the AST flavour we accept
46/// from `QueueCommand` callers. Both enums are isomorphic but live in
47/// different modules.
48fn ast_side_to_lc(side: crate::storage::query::ast::QueueSide) -> LcQueueSide {
49    use crate::storage::query::ast::QueueSide as Ast;
50    match side {
51        Ast::Left => LcQueueSide::Left,
52        Ast::Right => LcQueueSide::Right,
53    }
54}
55
56/// Map a `QueueStoreError` (returned by lifecycle methods) onto the
57/// runtime-facing `RedDBError`. Mirrors the wire-error shapes the legacy
58/// `queue_delivery::*` helpers produced.
59fn map_qse(err: QueueStoreError) -> RedDBError {
60    match err {
61        QueueStoreError::UnknownDelivery(id) => RedDBError::NotFound(format!(
62            "delivery_id '{id}' does not resolve to a live pending delivery"
63        )),
64        QueueStoreError::UnknownQueue(q) => RedDBError::NotFound(format!("queue '{q}' not found")),
65        QueueStoreError::ReplicaImmutable => {
66            RedDBError::Internal("replica QueueStore is immutable".to_string())
67        }
68    }
69}
70
// ---------------------------------------------------------------------------
// Outbox metrics (exposed via /metrics)
// ---------------------------------------------------------------------------

/// Number of event pushes that could not be placed on their target queue
/// (queue full or another error) and were handed to DLQ routing instead.
pub static EVENTS_DRAIN_RETRIES_TOTAL: AtomicU64 = AtomicU64::new(0);

/// Number of events that were routed to a dead-letter queue.
pub static EVENTS_DLQ_TOTAL: AtomicU64 = AtomicU64::new(0);

/// Number of events that were successfully enqueued.
pub static EVENTS_ENQUEUED_TOTAL: AtomicU64 = AtomicU64::new(0);

/// Soft watermark: a warning is raised once the estimated outbox payload
/// bytes go above this value (1 GiB).
const OUTBOX_WARN_BYTES: u64 = 1024 * 1024 * 1024;

/// Hard watermark: once the estimated outbox goes above this value
/// (10 GiB) every new event is routed to the DLQ.
const OUTBOX_MAX_BYTES: u64 = 10 * 1024 * 1024 * 1024;

/// Running estimate of the bytes pending in event queues. Approximate by
/// design: it is not decremented when messages are consumed.
static OUTBOX_APPROX_BYTES: AtomicU64 = AtomicU64::new(0);
93
/// Name of the collection used for queue metadata records
/// (presumably configs, groups and pending/ack entries — confirm against
/// the `save_queue_*` helpers).
const QUEUE_META_COLLECTION: &str = "red_queue_meta";
/// Midpoint of the `u64` range. NOTE(review): looks like the starting
/// position for an empty queue so that pushes on either side have room to
/// grow — confirm against `next_queue_position`.
const QUEUE_POSITION_CENTER: u64 = u64::MAX >> 1;
/// Name of the consumer group that is auto-created for work-mode reads.
const WORK_DEFAULT_GROUP: &str = "_work_default";
/// Prefix for auto-created fanout consumer-group names.
const FANOUT_GROUP_PREFIX: &str = "_fanout_";
98
99#[derive(Debug, Clone)]
100pub(super) struct QueueRuntimeConfig {
101    pub(super) mode: QueueMode,
102    pub(super) priority: bool,
103    pub(super) max_size: Option<usize>,
104    pub(super) ttl_ms: Option<u64>,
105    pub(super) dlq: Option<String>,
106    pub(super) max_attempts: u32,
107    pub(super) lock_deadline_ms: u64,
108    pub(super) in_flight_cap_per_group: u32,
109}
110
111#[derive(Debug, Clone)]
112struct QueueGroupEntry {
113    entity_id: EntityId,
114    group: String,
115}
116
117#[derive(Debug, Clone)]
118pub(super) struct QueuePendingEntry {
119    pub(super) entity_id: EntityId,
120    group: String,
121    pub(super) message_id: EntityId,
122    consumer: String,
123    pub(super) delivered_at_ns: u64,
124    pub(super) delivery_count: u32,
125}
126
127#[derive(Debug, Clone)]
128pub(super) struct QueueAckEntry {
129    entity_id: EntityId,
130    group: String,
131    pub(super) message_id: EntityId,
132}
133
134#[derive(Debug, Clone)]
135pub(super) struct QueueMessageView {
136    pub(super) id: EntityId,
137    position: u64,
138    priority: i32,
139    pub(super) payload: Value,
140    attempts: u32,
141    pub(super) max_attempts: u32,
142    enqueued_at_ns: u64,
143}
144
145impl RedDBRuntime {
146    pub(crate) fn enqueue_event_payload(
147        &self,
148        queue: &str,
149        payload: Value,
150    ) -> RedDBResult<EntityId> {
151        let store = self.inner.db.store();
152        // Auto-create the queue if it does not exist yet.
153        if store.get_collection(queue).is_none() {
154            crate::runtime::impl_ddl::ensure_event_target_queue_pub(self, queue)?;
155        }
156
157        // Estimate payload bytes for outbox watermark checks.
158        let payload_bytes = estimate_payload_bytes(&payload);
159        let outbox_bytes = OUTBOX_APPROX_BYTES.fetch_add(payload_bytes, Ordering::Relaxed);
160
161        // Hard limit: route directly to DLQ without even trying.
162        if outbox_bytes > OUTBOX_MAX_BYTES {
163            OUTBOX_APPROX_BYTES.fetch_sub(payload_bytes, Ordering::Relaxed);
164            EVENTS_DRAIN_RETRIES_TOTAL.fetch_add(1, Ordering::Relaxed);
165            return self.route_event_to_outbox_dlq(queue, payload, "outbox_max_bytes_exceeded");
166        }
167
168        // Soft limit: warn once per crossing.
169        if outbox_bytes > OUTBOX_WARN_BYTES && outbox_bytes - payload_bytes <= OUTBOX_WARN_BYTES {
170            tracing::warn!(
171                outbox_bytes,
172                warn_threshold = OUTBOX_WARN_BYTES,
173                "event outbox approaching capacity warning threshold"
174            );
175            crate::telemetry::operator_event::OperatorEvent::OutboxDlqActivated {
176                queue: queue.to_string(),
177                dlq: format!("{queue}_outbox_dlq"),
178                reason: "outbox_warn_bytes_exceeded".to_string(),
179            }
180            .emit_global();
181        }
182
183        let config = load_queue_config(store.as_ref(), queue);
184
185        // If the target queue has a max_size and is full, route to DLQ.
186        if let Some(max_size) = config.max_size {
187            let current_len = load_queue_message_views(store.as_ref(), queue)
188                .unwrap_or_default()
189                .len();
190            if current_len >= max_size {
191                OUTBOX_APPROX_BYTES.fetch_sub(payload_bytes, Ordering::Relaxed);
192                EVENTS_DRAIN_RETRIES_TOTAL.fetch_add(1, Ordering::Relaxed);
193                return self.route_event_to_outbox_dlq(queue, payload, "queue_full");
194            }
195            // Warn at 80% capacity.
196            if current_len * 10 >= max_size * 8 {
197                tracing::warn!(
198                    queue = %queue,
199                    size = current_len,
200                    max = max_size,
201                    "event target queue near capacity"
202                );
203            }
204        }
205
206        let id = self.enqueue_event_payload_raw(store.as_ref(), queue, &config, payload)?;
207        EVENTS_ENQUEUED_TOTAL.fetch_add(1, Ordering::Relaxed);
208        Ok(id)
209    }
210
211    /// Route a failed event to `<queue>_outbox_dlq`, auto-creating it if needed.
212    fn route_event_to_outbox_dlq(
213        &self,
214        queue: &str,
215        payload: Value,
216        reason: &str,
217    ) -> RedDBResult<EntityId> {
218        let dlq_name = format!("{queue}_outbox_dlq");
219        EVENTS_DLQ_TOTAL.fetch_add(1, Ordering::Relaxed);
220
221        crate::telemetry::operator_event::OperatorEvent::OutboxDlqActivated {
222            queue: queue.to_string(),
223            dlq: dlq_name.clone(),
224            reason: reason.to_string(),
225        }
226        .emit_global();
227
228        let store = self.inner.db.store();
229        if store.get_collection(&dlq_name).is_none() {
230            crate::runtime::impl_ddl::ensure_event_target_queue_pub(self, &dlq_name)?;
231        }
232        let dlq_config = load_queue_config(store.as_ref(), &dlq_name);
233        let id = self.enqueue_event_payload_raw(store.as_ref(), &dlq_name, &dlq_config, payload)?;
234        EVENTS_ENQUEUED_TOTAL.fetch_add(1, Ordering::Relaxed);
235        Ok(id)
236    }
237
238    /// Low-level event message insert — no size checks, no DLQ routing.
239    fn enqueue_event_payload_raw(
240        &self,
241        store: &UnifiedStore,
242        queue: &str,
243        config: &QueueRuntimeConfig,
244        payload: Value,
245    ) -> RedDBResult<EntityId> {
246        let position = next_queue_position(store, queue, QueueSide::Right)?;
247        let mut entity = UnifiedEntity::new(
248            EntityId::new(0),
249            EntityKind::QueueMessage {
250                queue: queue.to_string(),
251                position,
252            },
253            EntityData::QueueMessage(QueueMessageData {
254                payload,
255                priority: None,
256                enqueued_at_ns: now_ns(),
257                attempts: 0,
258                max_attempts: config.max_attempts,
259                acked: false,
260            }),
261        );
262        if let Some(xid) = self.current_xid() {
263            entity.set_xmin(xid);
264        }
265        let id = store
266            .insert_auto(queue, entity)
267            .map_err(|err| RedDBError::Internal(err.to_string()))?;
268        if let Some(ttl_ms) = config.ttl_ms {
269            store
270                .set_metadata(queue, id, queue_message_ttl_metadata(ttl_ms))
271                .map_err(|err| RedDBError::Internal(err.to_string()))?;
272        }
273        self.invalidate_result_cache_for_table(queue);
274        Ok(id)
275    }
276
277    pub fn execute_create_queue(
278        &self,
279        raw_query: &str,
280        query: &CreateQueueQuery,
281    ) -> RedDBResult<RuntimeQueryResult> {
282        self.check_write(crate::runtime::write_gate::WriteKind::Ddl)?;
283        if query.dlq.as_deref() == Some(query.name.as_str()) {
284            return Err(RedDBError::Query(
285                "dead-letter queue must be different from the source queue".to_string(),
286            ));
287        }
288
289        let store = self.inner.db.store();
290        let exists = store.get_collection(&query.name).is_some();
291        if exists {
292            if query.if_not_exists {
293                return Ok(RuntimeQueryResult::ok_message(
294                    raw_query.to_string(),
295                    &format!("queue '{}' already exists", query.name),
296                    "create",
297                ));
298            }
299            return Err(RedDBError::Query(format!(
300                "queue '{}' already exists",
301                query.name
302            )));
303        }
304
305        store
306            .create_collection(&query.name)
307            .map_err(|err| RedDBError::Internal(err.to_string()))?;
308        if let Some(ttl_ms) = query.ttl_ms {
309            self.inner
310                .db
311                .set_collection_default_ttl_ms(&query.name, ttl_ms);
312        }
313        self.inner
314            .db
315            .save_collection_contract(queue_collection_contract(
316                &query.name,
317                query.priority,
318                query.ttl_ms,
319            ))
320            .map_err(|err| RedDBError::Internal(err.to_string()))?;
321        save_queue_config(
322            store.as_ref(),
323            &query.name,
324            &QueueRuntimeConfig {
325                mode: query.mode,
326                priority: query.priority,
327                max_size: query.max_size,
328                ttl_ms: query.ttl_ms,
329                dlq: query.dlq.clone(),
330                max_attempts: query.max_attempts,
331                lock_deadline_ms: query.lock_deadline_ms,
332                in_flight_cap_per_group: query.in_flight_cap_per_group,
333            },
334        )?;
335
336        if let Some(dlq) = &query.dlq {
337            if store.get_collection(dlq).is_none() {
338                store
339                    .create_collection(dlq)
340                    .map_err(|err| RedDBError::Internal(err.to_string()))?;
341                self.inner
342                    .db
343                    .save_collection_contract(queue_collection_contract(dlq, false, None))
344                    .map_err(|err| RedDBError::Internal(err.to_string()))?;
345            }
346        }
347
348        self.invalidate_result_cache();
349        self.inner
350            .db
351            .persist_metadata()
352            .map_err(|err| RedDBError::Internal(err.to_string()))?;
353        // Issue #120 — feed the queue into the schema-vocabulary so
354        // AskPipeline (#121) can resolve queue references. Queues
355        // have an opaque payload column, so we expose `payload` and
356        // (when configured) the DLQ partner as type-tag context.
357        let mut type_tags = Vec::new();
358        if let Some(dlq) = &query.dlq {
359            type_tags.push(format!("dlq:{}", dlq));
360        }
361        self.schema_vocabulary_apply(
362            crate::runtime::schema_vocabulary::DdlEvent::CreateCollection {
363                collection: query.name.clone(),
364                columns: vec!["payload".to_string()],
365                type_tags,
366                description: None,
367            },
368        );
369
370        let mut msg = format!("queue '{}' created", query.name);
371        msg.push_str(&format!(" (mode={})", query.mode.as_str()));
372        if query.priority {
373            msg.push_str(" (priority)");
374        }
375        if let Some(max_size) = query.max_size {
376            msg.push_str(&format!(" (max_size={max_size})"));
377        }
378        if let Some(ttl_ms) = query.ttl_ms {
379            msg.push_str(&format!(" (ttl={ttl_ms}ms)"));
380        }
381        if let Some(dlq) = &query.dlq {
382            msg.push_str(&format!(
383                " (dlq={dlq}, max_attempts={})",
384                query.max_attempts
385            ));
386        }
387
388        Ok(RuntimeQueryResult::ok_message(
389            raw_query.to_string(),
390            &msg,
391            "create",
392        ))
393    }
394
395    pub fn execute_alter_queue(
396        &self,
397        raw_query: &str,
398        query: &AlterQueueQuery,
399    ) -> RedDBResult<RuntimeQueryResult> {
400        self.check_write(crate::runtime::write_gate::WriteKind::Ddl)?;
401        let store = self.inner.db.store();
402        ensure_queue_exists(store.as_ref(), &query.name)?;
403
404        let mut config = load_queue_config(store.as_ref(), &query.name);
405        let mut summary: Vec<String> = Vec::new();
406
407        if let Some(new_mode) = query.mode {
408            let pending =
409                load_pending_entries(store.as_ref(), &query.name, None, None).unwrap_or_default();
410            if !pending.is_empty() {
411                tracing::warn!(
412                    queue = %query.name,
413                    pending_count = pending.len(),
414                    new_mode = %new_mode.as_str(),
415                    "ALTER QUEUE SET MODE: {} in-flight messages will drain with old mode; \
416                     new reads use {}",
417                    pending.len(),
418                    new_mode.as_str(),
419                );
420            }
421            config.mode = new_mode;
422            summary.push(format!("mode={}", new_mode.as_str()));
423        }
424        if let Some(max_attempts) = query.max_attempts {
425            config.max_attempts = max_attempts;
426            summary.push(format!("max_attempts={max_attempts}"));
427        }
428        if let Some(lock_deadline_ms) = query.lock_deadline_ms {
429            config.lock_deadline_ms = lock_deadline_ms;
430            summary.push(format!("lock_deadline_ms={lock_deadline_ms}"));
431        }
432        if let Some(in_flight_cap) = query.in_flight_cap_per_group {
433            config.in_flight_cap_per_group = in_flight_cap;
434            summary.push(format!("in_flight_cap_per_group={in_flight_cap}"));
435        }
436        if let Some(dlq) = &query.dlq {
437            if dlq == &query.name {
438                return Err(RedDBError::Query(
439                    "dead-letter queue must be different from the source queue".to_string(),
440                ));
441            }
442            config.dlq = Some(dlq.clone());
443            summary.push(format!("dlq={dlq}"));
444        }
445
446        save_queue_config(store.as_ref(), &query.name, &config)?;
447
448        self.invalidate_result_cache();
449        self.inner
450            .db
451            .persist_metadata()
452            .map_err(|err| RedDBError::Internal(err.to_string()))?;
453
454        Ok(RuntimeQueryResult::ok_message(
455            raw_query.to_string(),
456            &format!("queue '{}' altered: {}", query.name, summary.join(", ")),
457            "alter",
458        ))
459    }
460
461    pub fn execute_drop_queue(
462        &self,
463        raw_query: &str,
464        query: &DropQueueQuery,
465    ) -> RedDBResult<RuntimeQueryResult> {
466        self.check_write(crate::runtime::write_gate::WriteKind::Ddl)?;
467        let store = self.inner.db.store();
468        if super::impl_ddl::is_system_schema_name(&query.name) {
469            return Err(RedDBError::Query("system schema is read-only".to_string()));
470        }
471        if store.get_collection(&query.name).is_none() {
472            if query.if_exists {
473                return Ok(RuntimeQueryResult::ok_message(
474                    raw_query.to_string(),
475                    &format!("queue '{}' does not exist", query.name),
476                    "drop",
477                ));
478            }
479            return Err(RedDBError::NotFound(format!(
480                "queue '{}' not found",
481                query.name
482            )));
483        }
484        let actual = crate::runtime::ddl::polymorphic_resolver::resolve(
485            &query.name,
486            &self.inner.db.catalog_model_snapshot(),
487        )?;
488        crate::runtime::ddl::polymorphic_resolver::ensure_model_match(
489            crate::catalog::CollectionModel::Queue,
490            actual,
491        )?;
492
493        store
494            .drop_collection(&query.name)
495            .map_err(|err| RedDBError::Internal(err.to_string()))?;
496        self.inner.db.clear_collection_default_ttl_ms(&query.name);
497        self.inner
498            .db
499            .remove_collection_contract(&query.name)
500            .map_err(|err| RedDBError::Internal(err.to_string()))?;
501        remove_queue_metadata(store.as_ref(), &query.name);
502        self.invalidate_result_cache();
503        self.inner
504            .db
505            .persist_metadata()
506            .map_err(|err| RedDBError::Internal(err.to_string()))?;
507        // Issue #120 — invalidate the schema-vocabulary entry.
508        self.schema_vocabulary_apply(
509            crate::runtime::schema_vocabulary::DdlEvent::DropCollection {
510                collection: query.name.clone(),
511            },
512        );
513
514        Ok(RuntimeQueryResult::ok_message(
515            raw_query.to_string(),
516            &format!("queue '{}' dropped", query.name),
517            "drop",
518        ))
519    }
520
521    pub fn execute_queue_command(
522        &self,
523        raw_query: &str,
524        cmd: &QueueCommand,
525    ) -> RedDBResult<RuntimeQueryResult> {
526        self.check_write(crate::runtime::write_gate::WriteKind::Dml)?;
527        match cmd {
528            QueueCommand::Push {
529                queue,
530                value,
531                side,
532                priority,
533            } => {
534                let store = self.inner.db.store();
535                ensure_queue_exists(store.as_ref(), queue)?;
536                let config = load_queue_config(store.as_ref(), queue);
537                if priority.is_some() && !config.priority {
538                    return Err(RedDBError::Query(format!(
539                        "queue '{}' is not a priority queue",
540                        queue
541                    )));
542                }
543                if let Some(max_size) = config.max_size {
544                    let current_len =
545                        load_queue_message_views_with_runtime(Some(self), store.as_ref(), queue)?
546                            .len();
547                    if current_len >= max_size {
548                        return Err(RedDBError::Query(format!(
549                            "queue '{}' is full (max_size={max_size})",
550                            queue
551                        )));
552                    }
553                }
554
555                let position = next_queue_position(store.as_ref(), queue, *side)?;
556                let mut entity = UnifiedEntity::new(
557                    EntityId::new(0),
558                    EntityKind::QueueMessage {
559                        queue: queue.clone(),
560                        position,
561                    },
562                    EntityData::QueueMessage(QueueMessageData {
563                        payload: value.clone(),
564                        priority: if config.priority { *priority } else { None },
565                        enqueued_at_ns: now_ns(),
566                        attempts: 0,
567                        max_attempts: config.max_attempts,
568                        acked: false,
569                    }),
570                );
571                // Phase 1.1 MVCC universal: stamp xmin so other
572                // connections don't see this message until COMMIT.
573                if let Some(xid) = self.current_xid() {
574                    entity.set_xmin(xid);
575                }
576                let id = store
577                    .insert_auto(queue, entity)
578                    .map_err(|err| RedDBError::Internal(err.to_string()))?;
579                if let Some(ttl_ms) = config.ttl_ms {
580                    store
581                        .set_metadata(queue, id, queue_message_ttl_metadata(ttl_ms))
582                        .map_err(|err| RedDBError::Internal(err.to_string()))?;
583                }
584                self.invalidate_result_cache();
585
586                let mut result = UnifiedResult::with_columns(vec![
587                    "message_id".into(),
588                    "side".into(),
589                    "queue".into(),
590                ]);
591                let mut record = UnifiedRecord::new();
592                record.set("message_id", Value::text(message_id_string(id)));
593                record.set(
594                    "side",
595                    Value::text(match side {
596                        QueueSide::Left => "left".to_string(),
597                        QueueSide::Right => "right".to_string(),
598                    }),
599                );
600                record.set("queue", Value::text(queue.clone()));
601                result.push(record);
602
603                Ok(RuntimeQueryResult {
604                    query: raw_query.to_string(),
605                    mode: QueryMode::Sql,
606                    statement: "queue_push",
607                    engine: "runtime-queue",
608                    result,
609                    affected_rows: 1,
610                    statement_type: "insert",
611                })
612            }
613            QueueCommand::Pop { queue, side, count } => {
614                let store = self.inner.db.store();
615                ensure_queue_exists(store.as_ref(), queue)?;
616                let (lifecycle, _ps, txn) = runtime_lifecycle(self, queue);
617                let popped = lifecycle
618                    .pop(queue, ast_side_to_lc(*side), *count, &txn)
619                    .map_err(map_qse)?;
620
621                let mut result =
622                    UnifiedResult::with_columns(vec!["message_id".into(), "payload".into()]);
623                for (message_id, payload) in &popped {
624                    let mut record = UnifiedRecord::new();
625                    record.set(
626                        "message_id",
627                        Value::text(message_id_string(EntityId::new(*message_id))),
628                    );
629                    record.set("payload", payload.clone());
630                    result.push(record);
631                }
632                let popped_count = popped.len() as u64;
633                if popped_count > 0 {
634                    self.invalidate_result_cache();
635                }
636
637                Ok(RuntimeQueryResult {
638                    query: raw_query.to_string(),
639                    mode: QueryMode::Sql,
640                    statement: "queue_pop",
641                    engine: "runtime-queue",
642                    result,
643                    affected_rows: popped_count,
644                    statement_type: "delete",
645                })
646            }
647            QueueCommand::Peek { queue, count } => {
648                let store = self.inner.db.store();
649                ensure_queue_exists(store.as_ref(), queue)?;
650                let (lifecycle, _ps, txn) = runtime_lifecycle(self, queue);
651                let messages = lifecycle.peek(queue, *count, &txn);
652
653                let mut result =
654                    UnifiedResult::with_columns(vec!["message_id".into(), "payload".into()]);
655                for message in messages {
656                    let mut record = UnifiedRecord::new();
657                    record.set(
658                        "message_id",
659                        Value::text(message_id_string(EntityId::new(message.message_id))),
660                    );
661                    record.set("payload", message.payload);
662                    result.push(record);
663                }
664
665                Ok(RuntimeQueryResult {
666                    query: raw_query.to_string(),
667                    mode: QueryMode::Sql,
668                    statement: "queue_peek",
669                    engine: "runtime-queue",
670                    result,
671                    affected_rows: 0,
672                    statement_type: "select",
673                })
674            }
675            QueueCommand::Len { queue } => {
676                let store = self.inner.db.store();
677                ensure_queue_exists(store.as_ref(), queue)?;
678                let count =
679                    load_queue_message_views_with_runtime(Some(self), store.as_ref(), queue)?.len()
680                        as u64;
681                let mut result = UnifiedResult::with_columns(vec!["len".into()]);
682                let mut record = UnifiedRecord::new();
683                record.set("len", Value::UnsignedInteger(count));
684                result.push(record);
685
686                Ok(RuntimeQueryResult {
687                    query: raw_query.to_string(),
688                    mode: QueryMode::Sql,
689                    statement: "queue_len",
690                    engine: "runtime-queue",
691                    result,
692                    affected_rows: 0,
693                    statement_type: "select",
694                })
695            }
696            QueueCommand::Purge { queue } => {
697                let store = self.inner.db.store();
698                ensure_queue_exists(store.as_ref(), queue)?;
699                let (lifecycle, _ps, txn) = runtime_lifecycle(self, queue);
700                let count = lifecycle.purge(queue, &txn).map_err(map_qse)?;
701                if count > 0 {
702                    self.invalidate_result_cache();
703                }
704
705                Ok(RuntimeQueryResult::ok_message(
706                    raw_query.to_string(),
707                    &format!("{count} messages purged from queue '{queue}'"),
708                    "delete",
709                ))
710            }
711            QueueCommand::GroupCreate { queue, group } => {
712                let store = self.inner.db.store();
713                ensure_queue_exists(store.as_ref(), queue)?;
714                if queue_group_exists(store.as_ref(), queue, group)? {
715                    return Ok(RuntimeQueryResult::ok_message(
716                        raw_query.to_string(),
717                        &format!(
718                            "consumer group '{}' already exists on queue '{}'",
719                            group, queue
720                        ),
721                        "create",
722                    ));
723                }
724                save_queue_group(store.as_ref(), queue, group)?;
725                self.invalidate_result_cache();
726
727                Ok(RuntimeQueryResult::ok_message(
728                    raw_query.to_string(),
729                    &format!("consumer group '{}' created on queue '{}'", group, queue),
730                    "create",
731                ))
732            }
733            QueueCommand::GroupRead {
734                queue,
735                group,
736                consumer,
737                count,
738            } => {
739                let store = self.inner.db.store();
740                ensure_queue_exists(store.as_ref(), queue)?;
741                // Resolve the consumer group up-front so the lifecycle
742                // sees the same auto-created `_work_default` / fanout
743                // group the legacy `read_messages` would have minted.
744                let config = load_queue_config(store.as_ref(), queue);
745                let group_owned =
746                    resolve_read_group(store.as_ref(), queue, group.as_deref(), consumer, &config)?;
747                let group_ref = group_owned.as_str();
748                let (lifecycle, _ps, txn) = runtime_lifecycle(self, queue);
749                let delivered = lifecycle
750                    .group_read(&txn, queue, group_ref, consumer, *count)
751                    .map_err(map_qse)?;
752
753                let mut result = UnifiedResult::with_columns(vec![
754                    "message_id".into(),
755                    "payload".into(),
756                    "consumer".into(),
757                    "delivery_count".into(),
758                    "attempts".into(),
759                ]);
760
761                for message in delivered {
762                    let mut record = UnifiedRecord::new();
763                    record.set(
764                        "message_id",
765                        Value::text(message_id_string(EntityId::new(message.message_id))),
766                    );
767                    record.set("payload", message.payload);
768                    record.set("consumer", Value::text(message.consumer));
769                    record.set(
770                        "delivery_count",
771                        Value::UnsignedInteger(u64::from(message.delivery_count)),
772                    );
773                    record.set(
774                        "attempts",
775                        Value::UnsignedInteger(u64::from(message.delivery_count)),
776                    );
777                    result.push(record);
778                }
779                if !result.records.is_empty() {
780                    self.invalidate_result_cache();
781                }
782
783                Ok(RuntimeQueryResult {
784                    query: raw_query.to_string(),
785                    mode: QueryMode::Sql,
786                    statement: "queue_group_read",
787                    engine: "runtime-queue",
788                    result,
789                    affected_rows: 0,
790                    statement_type: "select",
791                })
792            }
793            QueueCommand::Pending { queue, group } => {
794                let store = self.inner.db.store();
795                ensure_queue_exists(store.as_ref(), queue)?;
796                require_queue_group(store.as_ref(), queue, group)?;
797                let mut pending = load_pending_entries(store.as_ref(), queue, Some(group), None)?;
798                pending.sort_by_key(|entry| entry.delivered_at_ns);
799                let current_time_ns = now_ns();
800
801                let mut result = UnifiedResult::with_columns(vec![
802                    "message_id".into(),
803                    "consumer".into(),
804                    "delivered_at_ns".into(),
805                    "delivery_count".into(),
806                    "idle_ms".into(),
807                ]);
808                for entry in pending {
809                    let mut record = UnifiedRecord::new();
810                    record.set(
811                        "message_id",
812                        Value::text(message_id_string(entry.message_id)),
813                    );
814                    record.set("consumer", Value::text(entry.consumer));
815                    record.set(
816                        "delivered_at_ns",
817                        Value::UnsignedInteger(entry.delivered_at_ns),
818                    );
819                    record.set(
820                        "delivery_count",
821                        Value::UnsignedInteger(u64::from(entry.delivery_count)),
822                    );
823                    record.set(
824                        "idle_ms",
825                        Value::UnsignedInteger(
826                            current_time_ns.saturating_sub(entry.delivered_at_ns) / 1_000_000,
827                        ),
828                    );
829                    result.push(record);
830                }
831
832                Ok(RuntimeQueryResult {
833                    query: raw_query.to_string(),
834                    mode: QueryMode::Sql,
835                    statement: "queue_pending",
836                    engine: "runtime-queue",
837                    result,
838                    affected_rows: 0,
839                    statement_type: "select",
840                })
841            }
842            QueueCommand::Claim {
843                queue,
844                group,
845                consumer,
846                min_idle_ms,
847            } => {
848                let store = self.inner.db.store();
849                ensure_queue_exists(store.as_ref(), queue)?;
850                require_queue_group(store.as_ref(), queue, group)?;
851                let (lifecycle, _ps, txn) = runtime_lifecycle(self, queue);
852                let delivered = lifecycle
853                    .claim_delivering(queue, consumer, *min_idle_ms, &txn)
854                    .map_err(map_qse)?;
855
856                let mut result = UnifiedResult::with_columns(vec![
857                    "message_id".into(),
858                    "payload".into(),
859                    "consumer".into(),
860                    "delivery_count".into(),
861                ]);
862
863                for message in delivered {
864                    let mut record = UnifiedRecord::new();
865                    record.set(
866                        "message_id",
867                        Value::text(message_id_string(EntityId::new(message.message_id))),
868                    );
869                    record.set("payload", message.payload);
870                    record.set("consumer", Value::text(message.consumer));
871                    record.set(
872                        "delivery_count",
873                        Value::UnsignedInteger(u64::from(message.delivery_count)),
874                    );
875                    result.push(record);
876                }
877                if !result.records.is_empty() {
878                    self.invalidate_result_cache();
879                }
880                let affected_rows = result.records.len() as u64;
881
882                Ok(RuntimeQueryResult {
883                    query: raw_query.to_string(),
884                    mode: QueryMode::Sql,
885                    statement: "queue_claim",
886                    engine: "runtime-queue",
887                    result,
888                    affected_rows,
889                    statement_type: "update",
890                })
891            }
892            QueueCommand::Ack {
893                queue,
894                group,
895                message_id,
896                delivery_id,
897            } => {
898                let store = self.inner.db.store();
899                ensure_queue_exists(store.as_ref(), queue)?;
900                let (group_owned, message_entity) = resolve_ack_nack_handle(
901                    store.as_ref(),
902                    queue,
903                    group,
904                    message_id,
905                    delivery_id.as_deref(),
906                )?;
907                let group_ref = group_owned.as_str();
908                require_queue_group(store.as_ref(), queue, group_ref)?;
909                let (lifecycle, ps, txn) = runtime_lifecycle(self, queue);
910                let did = match delivery_id.as_deref() {
911                    Some(d) => d.to_string(),
912                    None => ps
913                        .find_pending_by_key(queue, message_entity.raw(), group_ref)
914                        .ok_or_else(|| {
915                            RedDBError::NotFound(format!(
916                                "no pending delivery for message '{}' on queue '{}' (group '{}')",
917                                message_entity.raw(),
918                                queue,
919                                group_ref
920                            ))
921                        })?,
922                };
923                lifecycle.ack(&txn, &did).map_err(map_qse)?;
924                self.invalidate_result_cache();
925
926                Ok(RuntimeQueryResult::ok_message(
927                    raw_query.to_string(),
928                    "message acknowledged",
929                    "update",
930                ))
931            }
932            QueueCommand::Nack {
933                queue,
934                group,
935                message_id,
936                delivery_id,
937            } => {
938                let store = self.inner.db.store();
939                ensure_queue_exists(store.as_ref(), queue)?;
940                let (group_owned, message_entity) = resolve_ack_nack_handle(
941                    store.as_ref(),
942                    queue,
943                    group,
944                    message_id,
945                    delivery_id.as_deref(),
946                )?;
947                let group_ref = group_owned.as_str();
948                require_queue_group(store.as_ref(), queue, group_ref)?;
949                let (lifecycle, ps, txn) = runtime_lifecycle(self, queue);
950                let did = match delivery_id.as_deref() {
951                    Some(d) => d.to_string(),
952                    None => ps
953                        .find_pending_by_key(queue, message_entity.raw(), group_ref)
954                        .ok_or_else(|| {
955                            RedDBError::NotFound(format!(
956                                "no pending delivery for message '{}' on queue '{}' (group '{}')",
957                                message_entity.raw(),
958                                queue,
959                                group_ref
960                            ))
961                        })?,
962                };
963                let message = match lifecycle.nack(&txn, &did).map_err(map_qse)? {
964                    RetirementOutcome::Requeued => "message requeued".to_string(),
965                    RetirementOutcome::MovedToDlq(dlq) => {
966                        format!("message moved to dead-letter queue '{}'", dlq)
967                    }
968                    RetirementOutcome::Dropped => "message dropped after max attempts".to_string(),
969                };
970                self.invalidate_result_cache();
971
972                Ok(RuntimeQueryResult::ok_message(
973                    raw_query.to_string(),
974                    &message,
975                    "update",
976                ))
977            }
978            QueueCommand::Move {
979                source,
980                destination,
981                filter,
982                limit,
983            } => self.execute_queue_move(raw_query, source, destination, filter.as_ref(), *limit),
984        }
985    }
986
987    pub fn execute_queue_select(
988        &self,
989        raw_query: &str,
990        query: &QueueSelectQuery,
991    ) -> RedDBResult<RuntimeQueryResult> {
992        let store = self.inner.db.store();
993        ensure_queue_exists(store.as_ref(), &query.queue)?;
994        let config = load_queue_config(store.as_ref(), &query.queue);
995        let dlq = queue_is_dead_letter_target(store.as_ref(), &query.queue);
996        let columns = if query.columns.is_empty() {
997            queue_projection_default_columns()
998        } else {
999            query.columns.clone()
1000        };
1001
1002        let mut messages =
1003            load_queue_message_views_with_runtime(Some(self), store.as_ref(), &query.queue)?;
1004        sort_queue_messages(&mut messages, &config, QueueSide::Left);
1005
1006        let mut result = UnifiedResult::with_columns(columns.clone());
1007        for message in messages {
1008            if query
1009                .filter
1010                .as_ref()
1011                .is_some_and(|filter| !queue_message_matches_filter(&message, dlq, filter))
1012            {
1013                continue;
1014            }
1015            let record = queue_projection_record(&columns, &message, dlq)?;
1016            result.push(record);
1017            if query
1018                .limit
1019                .is_some_and(|limit| result.records.len() >= limit as usize)
1020            {
1021                break;
1022            }
1023        }
1024
1025        Ok(RuntimeQueryResult {
1026            query: raw_query.to_string(),
1027            mode: QueryMode::Sql,
1028            statement: "queue_select",
1029            engine: "runtime-queue",
1030            result,
1031            affected_rows: 0,
1032            statement_type: "select",
1033        })
1034    }
1035
1036    fn execute_queue_move(
1037        &self,
1038        raw_query: &str,
1039        source: &str,
1040        destination: &str,
1041        filter: Option<&Filter>,
1042        limit: usize,
1043    ) -> RedDBResult<RuntimeQueryResult> {
1044        if source == destination {
1045            return Err(RedDBError::Query(
1046                "QUEUE MOVE source and destination must be different".to_string(),
1047            ));
1048        }
1049        let store = self.inner.db.store();
1050        ensure_queue_exists(store.as_ref(), source)?;
1051        ensure_queue_exists(store.as_ref(), destination)?;
1052        let source_config = load_queue_config(store.as_ref(), source);
1053        let destination_config = load_queue_config(store.as_ref(), destination);
1054        let source_dlq = queue_is_dead_letter_target(store.as_ref(), source);
1055
1056        let mut messages =
1057            load_queue_message_views_with_runtime(Some(self), store.as_ref(), source)?;
1058        sort_queue_messages(&mut messages, &source_config, QueueSide::Left);
1059        let selected = messages
1060            .into_iter()
1061            .filter(|message| {
1062                filter
1063                    .map(|f| queue_message_matches_filter(message, source_dlq, f))
1064                    .unwrap_or(true)
1065            })
1066            .take(limit)
1067            .collect::<Vec<_>>();
1068
1069        if let Some(max_size) = destination_config.max_size {
1070            let current_len =
1071                load_queue_message_views_with_runtime(Some(self), store.as_ref(), destination)?
1072                    .len();
1073            if current_len + selected.len() > max_size {
1074                return Err(RedDBError::Query(format!(
1075                    "queue '{}' is full (max_size={max_size})",
1076                    destination
1077                )));
1078            }
1079        }
1080
1081        for message in &selected {
1082            let lock = queue_message_lock_handle(self, source, message.id);
1083            let Some(_guard) = lock.try_lock() else {
1084                return Err(RedDBError::Query(format!(
1085                    "message '{}' is locked on queue '{}'",
1086                    message.id.raw(),
1087                    source
1088                )));
1089            };
1090            if queue_message_view_by_id(store.as_ref(), source, message.id)?.is_none() {
1091                return Err(RedDBError::Query(format!(
1092                    "message '{}' is no longer available on queue '{}'",
1093                    message.id.raw(),
1094                    source
1095                )));
1096            }
1097        }
1098
1099        let mut inserted = Vec::new();
1100        for message in &selected {
1101            match insert_moved_queue_message(
1102                store.as_ref(),
1103                destination,
1104                &destination_config,
1105                message,
1106            ) {
1107                Ok(id) => inserted.push(id),
1108                Err(err) => {
1109                    for id in inserted {
1110                        let _ = store.delete(destination, id);
1111                    }
1112                    return Err(err);
1113                }
1114            }
1115        }
1116
1117        let (move_lifecycle, _move_ps, move_txn) = runtime_lifecycle(self, source);
1118        for message in &selected {
1119            move_lifecycle
1120                .delete_with_state(source, message.id.raw(), &move_txn)
1121                .map_err(map_qse)?;
1122        }
1123        if !selected.is_empty() {
1124            self.invalidate_result_cache();
1125        }
1126
1127        let selected_count = selected.len() as u64;
1128        self.audit_log().record_event(
1129            AuditEvent::builder("queue/move")
1130                .source(AuditAuthSource::System)
1131                .outcome(Outcome::Success)
1132                .resource(format!("queue:{source}->{destination}"))
1133                .fields([
1134                    AuditFieldEscaper::field("source", source),
1135                    AuditFieldEscaper::field("destination", destination),
1136                    AuditFieldEscaper::field("selected", selected_count),
1137                    AuditFieldEscaper::field("committed", selected_count),
1138                ])
1139                .build(),
1140        );
1141
1142        let mut result = UnifiedResult::with_columns(vec![
1143            "source".into(),
1144            "destination".into(),
1145            "selected".into(),
1146            "committed".into(),
1147        ]);
1148        let mut record = UnifiedRecord::new();
1149        record.set("source", Value::text(source.to_string()));
1150        record.set("destination", Value::text(destination.to_string()));
1151        record.set("selected", Value::UnsignedInteger(selected_count));
1152        record.set("committed", Value::UnsignedInteger(selected_count));
1153        result.push(record);
1154
1155        Ok(RuntimeQueryResult {
1156            query: raw_query.to_string(),
1157            mode: QueryMode::Sql,
1158            statement: "queue_move",
1159            engine: "runtime-queue",
1160            result,
1161            affected_rows: selected_count,
1162            statement_type: "update",
1163        })
1164    }
1165}
1166
1167fn ensure_queue_exists(store: &UnifiedStore, queue: &str) -> RedDBResult<()> {
1168    if store.get_collection(queue).is_some() {
1169        Ok(())
1170    } else {
1171        Err(RedDBError::NotFound(format!("queue '{}' not found", queue)))
1172    }
1173}
1174
1175pub(super) fn load_queue_config(store: &UnifiedStore, queue: &str) -> QueueRuntimeConfig {
1176    let default = QueueRuntimeConfig {
1177        mode: QueueMode::Work,
1178        priority: false,
1179        max_size: None,
1180        ttl_ms: None,
1181        dlq: None,
1182        max_attempts: crate::storage::query::DEFAULT_QUEUE_MAX_ATTEMPTS,
1183        lock_deadline_ms: crate::storage::query::DEFAULT_QUEUE_LOCK_DEADLINE_MS,
1184        in_flight_cap_per_group: crate::storage::query::DEFAULT_QUEUE_IN_FLIGHT_CAP_PER_GROUP,
1185    };
1186
1187    let Some(manager) = store.get_collection(QUEUE_META_COLLECTION) else {
1188        return default;
1189    };
1190    manager
1191        .query_all(|entity| {
1192            entity.data.as_row().is_some_and(|row| {
1193                row_text(row, "kind").as_deref() == Some("queue_config")
1194                    && row_text(row, "queue").as_deref() == Some(queue)
1195            })
1196        })
1197        .into_iter()
1198        .find_map(|entity| {
1199            let row = entity.data.as_row()?;
1200            Some(QueueRuntimeConfig {
1201                mode: row_text(row, "mode")
1202                    .as_deref()
1203                    .and_then(QueueMode::parse)
1204                    .unwrap_or_default(),
1205                priority: row_bool(row, "priority").unwrap_or(false),
1206                max_size: row_u64(row, "max_size").map(|value| value as usize),
1207                ttl_ms: row_u64(row, "ttl_ms"),
1208                dlq: row_text(row, "dlq"),
1209                max_attempts: row_u64(row, "max_attempts")
1210                    .map(|value| value as u32)
1211                    .unwrap_or(crate::storage::query::DEFAULT_QUEUE_MAX_ATTEMPTS),
1212                lock_deadline_ms: row_u64(row, "lock_deadline_ms")
1213                    .unwrap_or(crate::storage::query::DEFAULT_QUEUE_LOCK_DEADLINE_MS),
1214                in_flight_cap_per_group: row_u64(row, "in_flight_cap_per_group")
1215                    .map(|value| value as u32)
1216                    .unwrap_or(crate::storage::query::DEFAULT_QUEUE_IN_FLIGHT_CAP_PER_GROUP),
1217            })
1218        })
1219        .unwrap_or(default)
1220}
1221
1222pub(super) fn queue_mode_str(store: &UnifiedStore, queue: &str) -> &'static str {
1223    load_queue_config(store, queue).mode.as_str()
1224}
1225
1226fn save_queue_config(
1227    store: &UnifiedStore,
1228    queue: &str,
1229    config: &QueueRuntimeConfig,
1230) -> RedDBResult<()> {
1231    remove_meta_rows(store, |row| {
1232        row_text(row, "kind").as_deref() == Some("queue_config")
1233            && row_text(row, "queue").as_deref() == Some(queue)
1234    });
1235
1236    let mut fields = HashMap::new();
1237    fields.insert("kind".to_string(), Value::text("queue_config".to_string()));
1238    fields.insert("queue".to_string(), Value::text(queue.to_string()));
1239    fields.insert(
1240        "mode".to_string(),
1241        Value::text(config.mode.as_str().to_string()),
1242    );
1243    fields.insert("priority".to_string(), Value::Boolean(config.priority));
1244    fields.insert(
1245        "max_size".to_string(),
1246        config
1247            .max_size
1248            .map(|value| Value::UnsignedInteger(value as u64))
1249            .unwrap_or(Value::Null),
1250    );
1251    fields.insert(
1252        "ttl_ms".to_string(),
1253        config
1254            .ttl_ms
1255            .map(Value::UnsignedInteger)
1256            .unwrap_or(Value::Null),
1257    );
1258    fields.insert(
1259        "dlq".to_string(),
1260        config.dlq.clone().map(Value::text).unwrap_or(Value::Null),
1261    );
1262    fields.insert(
1263        "max_attempts".to_string(),
1264        Value::UnsignedInteger(u64::from(config.max_attempts)),
1265    );
1266    fields.insert(
1267        "lock_deadline_ms".to_string(),
1268        Value::UnsignedInteger(config.lock_deadline_ms),
1269    );
1270    fields.insert(
1271        "in_flight_cap_per_group".to_string(),
1272        Value::UnsignedInteger(u64::from(config.in_flight_cap_per_group)),
1273    );
1274    insert_meta_row(store, fields)
1275}
1276
1277fn remove_queue_metadata(store: &UnifiedStore, queue: &str) {
1278    remove_meta_rows(store, |row| {
1279        row_text(row, "queue").as_deref() == Some(queue)
1280    });
1281}
1282
1283fn queue_group_exists(store: &UnifiedStore, queue: &str, group: &str) -> RedDBResult<bool> {
1284    Ok(load_queue_groups(store, queue)?
1285        .into_iter()
1286        .any(|entry| entry.group == group))
1287}
1288
1289pub(super) fn require_queue_group(
1290    store: &UnifiedStore,
1291    queue: &str,
1292    group: &str,
1293) -> RedDBResult<()> {
1294    if queue_group_exists(store, queue, group)? {
1295        Ok(())
1296    } else {
1297        Err(RedDBError::NotFound(format!(
1298            "consumer group '{}' not found on queue '{}'",
1299            group, queue
1300        )))
1301    }
1302}
1303
1304pub(super) fn resolve_read_group(
1305    store: &UnifiedStore,
1306    queue: &str,
1307    group: Option<&str>,
1308    consumer: &str,
1309    config: &QueueRuntimeConfig,
1310) -> RedDBResult<String> {
1311    if let Some(group) = group {
1312        require_queue_group(store, queue, group)?;
1313        return Ok(group.to_string());
1314    }
1315
1316    match config.mode {
1317        QueueMode::Work => {
1318            if !queue_group_exists(store, queue, WORK_DEFAULT_GROUP)? {
1319                save_queue_group(store, queue, WORK_DEFAULT_GROUP)?;
1320            }
1321            Ok(WORK_DEFAULT_GROUP.to_string())
1322        }
1323        QueueMode::Fanout => {
1324            let fanout_group = format!("{FANOUT_GROUP_PREFIX}{consumer}");
1325            if !queue_group_exists(store, queue, &fanout_group)? {
1326                save_queue_group(store, queue, &fanout_group)?;
1327            }
1328            Ok(fanout_group)
1329        }
1330    }
1331}
1332
1333fn load_queue_groups(store: &UnifiedStore, queue: &str) -> RedDBResult<Vec<QueueGroupEntry>> {
1334    let Some(manager) = store.get_collection(QUEUE_META_COLLECTION) else {
1335        return Ok(Vec::new());
1336    };
1337    Ok(manager
1338        .query_all(|entity| {
1339            entity.data.as_row().is_some_and(|row| {
1340                row_text(row, "kind").as_deref() == Some("queue_group")
1341                    && row_text(row, "queue").as_deref() == Some(queue)
1342            })
1343        })
1344        .into_iter()
1345        .filter_map(|entity| {
1346            let row = entity.data.as_row()?;
1347            Some(QueueGroupEntry {
1348                entity_id: entity.id,
1349                group: row_text(row, "group")?,
1350            })
1351        })
1352        .collect())
1353}
1354
1355fn save_queue_group(store: &UnifiedStore, queue: &str, group: &str) -> RedDBResult<()> {
1356    let mut fields = HashMap::new();
1357    fields.insert("kind".to_string(), Value::text("queue_group".to_string()));
1358    fields.insert("queue".to_string(), Value::text(queue.to_string()));
1359    fields.insert("group".to_string(), Value::text(group.to_string()));
1360    fields.insert(
1361        "created_at_ns".to_string(),
1362        Value::UnsignedInteger(now_ns()),
1363    );
1364    insert_meta_row(store, fields)
1365}
1366
1367pub(super) fn load_pending_entries(
1368    store: &UnifiedStore,
1369    queue: &str,
1370    group: Option<&str>,
1371    message_id: Option<EntityId>,
1372) -> RedDBResult<Vec<QueuePendingEntry>> {
1373    let Some(manager) = store.get_collection(QUEUE_META_COLLECTION) else {
1374        return Ok(Vec::new());
1375    };
1376    Ok(manager
1377        .query_all(|entity| {
1378            entity.data.as_row().is_some_and(|row| {
1379                row_text(row, "kind").as_deref() == Some("queue_pending")
1380                    && row_text(row, "queue").as_deref() == Some(queue)
1381                    && group
1382                        .map(|group_name| row_text(row, "group").as_deref() == Some(group_name))
1383                        .unwrap_or(true)
1384                    && message_id
1385                        .map(|candidate| row_u64(row, "message_id") == Some(candidate.raw()))
1386                        .unwrap_or(true)
1387            })
1388        })
1389        .into_iter()
1390        .filter_map(|entity| {
1391            let row = entity.data.as_row()?;
1392            Some(QueuePendingEntry {
1393                entity_id: entity.id,
1394                group: row_text(row, "group")?,
1395                message_id: EntityId::new(row_u64(row, "message_id")?),
1396                consumer: row_text(row, "consumer")?,
1397                delivered_at_ns: row_u64(row, "delivered_at_ns")?,
1398                delivery_count: row_u64(row, "delivery_count")
1399                    .map(|value| value as u32)
1400                    .unwrap_or(1),
1401            })
1402        })
1403        .collect())
1404}
1405
1406pub(super) fn save_queue_pending(
1407    store: &UnifiedStore,
1408    queue: &str,
1409    group: &str,
1410    message_id: EntityId,
1411    consumer: &str,
1412    delivered_at_ns: u64,
1413    delivery_count: u32,
1414) -> RedDBResult<()> {
1415    remove_meta_rows(store, |row| {
1416        row_text(row, "kind").as_deref() == Some("queue_pending")
1417            && row_text(row, "queue").as_deref() == Some(queue)
1418            && row_text(row, "group").as_deref() == Some(group)
1419            && row_u64(row, "message_id") == Some(message_id.raw())
1420    });
1421
1422    let mut fields = HashMap::new();
1423    fields.insert("kind".to_string(), Value::text("queue_pending".to_string()));
1424    fields.insert("queue".to_string(), Value::text(queue.to_string()));
1425    fields.insert("group".to_string(), Value::text(group.to_string()));
1426    fields.insert(
1427        "message_id".to_string(),
1428        Value::UnsignedInteger(message_id.raw()),
1429    );
1430    fields.insert("consumer".to_string(), Value::text(consumer.to_string()));
1431    fields.insert(
1432        "delivered_at_ns".to_string(),
1433        Value::UnsignedInteger(delivered_at_ns),
1434    );
1435    fields.insert(
1436        "delivery_count".to_string(),
1437        Value::UnsignedInteger(u64::from(delivery_count)),
1438    );
1439    insert_meta_row(store, fields)
1440}
1441
1442pub(super) fn require_pending_entry(
1443    store: &UnifiedStore,
1444    queue: &str,
1445    group: &str,
1446    message_id: EntityId,
1447) -> RedDBResult<QueuePendingEntry> {
1448    load_pending_entries(store, queue, Some(group), Some(message_id))?
1449        .into_iter()
1450        .next()
1451        .ok_or_else(|| {
1452            RedDBError::NotFound(format!(
1453                "message '{}' is not pending in group '{}' on queue '{}'",
1454                message_id.raw(),
1455                group,
1456                queue
1457            ))
1458        })
1459}
1460
1461pub(super) fn load_ack_entries(
1462    store: &UnifiedStore,
1463    queue: &str,
1464    group: Option<&str>,
1465    message_id: Option<EntityId>,
1466) -> RedDBResult<Vec<QueueAckEntry>> {
1467    let Some(manager) = store.get_collection(QUEUE_META_COLLECTION) else {
1468        return Ok(Vec::new());
1469    };
1470    Ok(manager
1471        .query_all(|entity| {
1472            entity.data.as_row().is_some_and(|row| {
1473                row_text(row, "kind").as_deref() == Some("queue_ack")
1474                    && row_text(row, "queue").as_deref() == Some(queue)
1475                    && group
1476                        .map(|group_name| row_text(row, "group").as_deref() == Some(group_name))
1477                        .unwrap_or(true)
1478                    && message_id
1479                        .map(|candidate| row_u64(row, "message_id") == Some(candidate.raw()))
1480                        .unwrap_or(true)
1481            })
1482        })
1483        .into_iter()
1484        .filter_map(|entity| {
1485            let row = entity.data.as_row()?;
1486            Some(QueueAckEntry {
1487                entity_id: entity.id,
1488                group: row_text(row, "group")?,
1489                message_id: EntityId::new(row_u64(row, "message_id")?),
1490            })
1491        })
1492        .collect())
1493}
1494
1495pub(super) fn save_queue_ack(
1496    store: &UnifiedStore,
1497    queue: &str,
1498    group: &str,
1499    message_id: EntityId,
1500) -> RedDBResult<()> {
1501    let existing = load_ack_entries(store, queue, Some(group), Some(message_id))?;
1502    if !existing.is_empty() {
1503        return Ok(());
1504    }
1505
1506    let mut fields = HashMap::new();
1507    fields.insert("kind".to_string(), Value::text("queue_ack".to_string()));
1508    fields.insert("queue".to_string(), Value::text(queue.to_string()));
1509    fields.insert("group".to_string(), Value::text(group.to_string()));
1510    fields.insert(
1511        "message_id".to_string(),
1512        Value::UnsignedInteger(message_id.raw()),
1513    );
1514    fields.insert("acked_at_ns".to_string(), Value::UnsignedInteger(now_ns()));
1515    insert_meta_row(store, fields)
1516}
1517
1518pub(super) fn queue_message_completed_for_all_groups(
1519    store: &UnifiedStore,
1520    queue: &str,
1521    message_id: EntityId,
1522) -> RedDBResult<bool> {
1523    let groups = load_queue_groups(store, queue)?;
1524    let pending = load_pending_entries(store, queue, None, Some(message_id))?;
1525    if !pending.is_empty() {
1526        return Ok(false);
1527    }
1528    if groups.is_empty() {
1529        return Ok(true);
1530    }
1531
1532    let acked_groups = load_ack_entries(store, queue, None, Some(message_id))?
1533        .into_iter()
1534        .map(|entry| entry.group)
1535        .collect::<HashSet<_>>();
1536    Ok(groups
1537        .into_iter()
1538        .all(|group| acked_groups.contains(&group.group)))
1539}
1540
1541fn load_queue_message_views(
1542    store: &UnifiedStore,
1543    queue: &str,
1544) -> RedDBResult<Vec<QueueMessageView>> {
1545    load_queue_message_views_with_runtime(None, store, queue)
1546}
1547
1548/// Kind-aware queue scan (Phase 2.5.5 RLS universal). When the
1549/// caller has a `RedDBRuntime` reference, the gate also applies
1550/// any `CREATE POLICY ... ON MESSAGES OF <queue>` predicate. In
1551/// autocommit / embedded paths that only have the raw store (e.g.
1552/// purge loops) we skip RLS because there's no session identity
1553/// to match against.
1554pub(super) fn load_queue_message_views_with_runtime(
1555    runtime: Option<&RedDBRuntime>,
1556    store: &UnifiedStore,
1557    queue: &str,
1558) -> RedDBResult<Vec<QueueMessageView>> {
1559    let manager = store
1560        .get_collection(queue)
1561        .ok_or_else(|| RedDBError::NotFound(format!("queue '{}' not found", queue)))?;
1562    // Phase 1.2 MVCC universal: capture before parallel scan. Messages
1563    // inserted by another connection's open txn stay invisible to
1564    // consumers until that txn commits (prevents phantom POPs).
1565    let snap_ctx = crate::runtime::impl_core::capture_current_snapshot();
1566    let rls_filter = runtime.and_then(|rt| {
1567        crate::runtime::impl_core::rls_policy_filter_for_kind(
1568            rt,
1569            queue,
1570            crate::storage::query::ast::PolicyAction::Select,
1571            crate::storage::query::ast::PolicyTargetKind::Messages,
1572        )
1573    });
1574    let rls_enabled_but_denied = runtime.map(|rt| rt.is_rls_enabled(queue)).unwrap_or(false)
1575        && rls_filter.is_none()
1576        && runtime.is_some();
1577    if rls_enabled_but_denied {
1578        // RLS on + no Messages policy for this role = deny-default.
1579        return Ok(Vec::new());
1580    }
1581    let filter_arc = rls_filter.map(std::sync::Arc::new);
1582    let rt_arc = runtime;
1583    Ok(manager
1584        .query_all(move |entity| {
1585            if !matches!(entity.kind, EntityKind::QueueMessage { .. }) {
1586                return false;
1587            }
1588            if !crate::runtime::impl_core::entity_visible_with_context(snap_ctx.as_ref(), entity) {
1589                return false;
1590            }
1591            if let (Some(filter), Some(rt)) = (filter_arc.as_ref(), rt_arc) {
1592                return crate::runtime::query_exec::evaluate_entity_filter_with_db(
1593                    Some(&rt.inner.db),
1594                    entity,
1595                    filter,
1596                    queue,
1597                    queue,
1598                );
1599            }
1600            true
1601        })
1602        .into_iter()
1603        .filter_map(queue_message_view_from_entity)
1604        .collect())
1605}
1606
1607fn queue_message_view_from_entity(entity: UnifiedEntity) -> Option<QueueMessageView> {
1608    let (position, _) = match &entity.kind {
1609        EntityKind::QueueMessage { position, queue } => (*position, queue),
1610        _ => return None,
1611    };
1612    let data = match entity.data {
1613        EntityData::QueueMessage(data) => data,
1614        _ => return None,
1615    };
1616    Some(QueueMessageView {
1617        id: entity.id,
1618        position,
1619        priority: data.priority.unwrap_or(0),
1620        payload: data.payload,
1621        attempts: data.attempts,
1622        max_attempts: data.max_attempts,
1623        enqueued_at_ns: data.enqueued_at_ns,
1624    })
1625}
1626
1627/// Insert a moved payload onto `queue` using only the payload value —
1628/// priority / attempts / TTL fall back to the destination queue's
1629/// catalog config (mirrors a fresh enqueue rather than carrying source
1630/// metadata over). Used by `PrimaryQueueStore::move_to_queue`, the
1631/// `QueueLifecycle::move_between_queues` adapter that owns only
1632/// `(message_id, payload)` after `pop_messages` retires the source row.
1633pub(super) fn insert_moved_queue_message_payload(
1634    store: &UnifiedStore,
1635    queue: &str,
1636    payload: &Value,
1637) -> RedDBResult<EntityId> {
1638    let config = load_queue_config(store, queue);
1639    let position = next_queue_position(store, queue, QueueSide::Right)?;
1640    let enqueued_at_ns = std::time::SystemTime::now()
1641        .duration_since(std::time::UNIX_EPOCH)
1642        .map(|d| d.as_nanos() as u64)
1643        .unwrap_or(0);
1644    let entity = UnifiedEntity::new(
1645        EntityId::new(0),
1646        EntityKind::QueueMessage {
1647            queue: queue.to_string(),
1648            position,
1649        },
1650        EntityData::QueueMessage(QueueMessageData {
1651            payload: payload.clone(),
1652            priority: None,
1653            enqueued_at_ns,
1654            attempts: 0,
1655            max_attempts: config.max_attempts,
1656            acked: false,
1657        }),
1658    );
1659    let id = store
1660        .insert_auto(queue, entity)
1661        .map_err(|err| RedDBError::Internal(err.to_string()))?;
1662    if let Some(ttl_ms) = config.ttl_ms {
1663        store
1664            .set_metadata(queue, id, queue_message_ttl_metadata(ttl_ms))
1665            .map_err(|err| RedDBError::Internal(err.to_string()))?;
1666    }
1667    Ok(id)
1668}
1669
1670fn insert_moved_queue_message(
1671    store: &UnifiedStore,
1672    queue: &str,
1673    config: &QueueRuntimeConfig,
1674    message: &QueueMessageView,
1675) -> RedDBResult<EntityId> {
1676    let position = next_queue_position(store, queue, QueueSide::Right)?;
1677    let entity = UnifiedEntity::new(
1678        EntityId::new(0),
1679        EntityKind::QueueMessage {
1680            queue: queue.to_string(),
1681            position,
1682        },
1683        EntityData::QueueMessage(QueueMessageData {
1684            payload: message.payload.clone(),
1685            priority: if config.priority {
1686                Some(message.priority)
1687            } else {
1688                None
1689            },
1690            enqueued_at_ns: message.enqueued_at_ns,
1691            attempts: message.attempts,
1692            max_attempts: message.max_attempts,
1693            acked: false,
1694        }),
1695    );
1696    let id = store
1697        .insert_auto(queue, entity)
1698        .map_err(|err| RedDBError::Internal(err.to_string()))?;
1699    if let Some(ttl_ms) = config.ttl_ms {
1700        store
1701            .set_metadata(queue, id, queue_message_ttl_metadata(ttl_ms))
1702            .map_err(|err| RedDBError::Internal(err.to_string()))?;
1703    }
1704    Ok(id)
1705}
1706
/// Column names projected for a queue when the query names none, in
/// output order.
fn queue_projection_default_columns() -> Vec<String> {
    const DEFAULT_COLUMNS: [&str; 9] = [
        "id",
        "payload",
        "priority",
        "attempts",
        "last_error",
        "enqueued_at",
        "available_at",
        "dlq",
        "tenant",
    ];
    let mut columns = Vec::with_capacity(DEFAULT_COLUMNS.len());
    for name in DEFAULT_COLUMNS {
        columns.push(name.to_owned());
    }
    columns
}
1723
1724fn queue_projection_record(
1725    columns: &[String],
1726    message: &QueueMessageView,
1727    dlq: bool,
1728) -> RedDBResult<UnifiedRecord> {
1729    let mut record = UnifiedRecord::new();
1730    for column in columns {
1731        let value = queue_projection_value(message, dlq, column).ok_or_else(|| {
1732            RedDBError::Query(format!("unknown queue projection column '{}'", column))
1733        })?;
1734        record.set(column, value);
1735    }
1736    Ok(record)
1737}
1738
1739fn queue_projection_value(message: &QueueMessageView, dlq: bool, column: &str) -> Option<Value> {
1740    match column {
1741        "id" => Some(Value::text(message_id_string(message.id))),
1742        "payload" => Some(message.payload.clone()),
1743        "priority" => Some(Value::Integer(i64::from(message.priority))),
1744        "attempts" => Some(Value::UnsignedInteger(u64::from(message.attempts))),
1745        "last_error" => Some(Value::Null),
1746        "enqueued_at" => Some(Value::UnsignedInteger(message.enqueued_at_ns)),
1747        "available_at" => Some(Value::UnsignedInteger(message.enqueued_at_ns)),
1748        "dlq" => Some(Value::Boolean(dlq)),
1749        "tenant" => queue_message_tenant(&message.payload).or(Some(Value::Null)),
1750        _ => None,
1751    }
1752}
1753
1754fn queue_message_tenant(payload: &Value) -> Option<Value> {
1755    let Value::Json(bytes) = payload else {
1756        return None;
1757    };
1758    let json: crate::json::Value = crate::json::from_slice(bytes).ok()?;
1759    json.get("tenant")
1760        .and_then(crate::json::Value::as_str)
1761        .map(|tenant| Value::text(tenant.to_string()))
1762}
1763
1764fn queue_is_dead_letter_target(store: &UnifiedStore, queue: &str) -> bool {
1765    let Some(manager) = store.get_collection(QUEUE_META_COLLECTION) else {
1766        return false;
1767    };
1768    !manager
1769        .query_all(|entity| {
1770            entity.data.as_row().is_some_and(|row| {
1771                row_text(row, "kind").as_deref() == Some("queue_config")
1772                    && row_text(row, "dlq").as_deref() == Some(queue)
1773            })
1774        })
1775        .is_empty()
1776}
1777
1778fn queue_message_matches_filter(message: &QueueMessageView, dlq: bool, filter: &Filter) -> bool {
1779    match filter {
1780        Filter::Compare { field, op, value } => queue_filter_field_value(message, dlq, field)
1781            .is_some_and(|candidate| queue_compare_values(&candidate, value, *op)),
1782        Filter::CompareFields { left, op, right } => {
1783            match (
1784                queue_filter_field_value(message, dlq, left),
1785                queue_filter_field_value(message, dlq, right),
1786            ) {
1787                (Some(left), Some(right)) => queue_compare_values(&left, &right, *op),
1788                _ => false,
1789            }
1790        }
1791        Filter::And(left, right) => {
1792            queue_message_matches_filter(message, dlq, left)
1793                && queue_message_matches_filter(message, dlq, right)
1794        }
1795        Filter::Or(left, right) => {
1796            queue_message_matches_filter(message, dlq, left)
1797                || queue_message_matches_filter(message, dlq, right)
1798        }
1799        Filter::Not(inner) => !queue_message_matches_filter(message, dlq, inner),
1800        Filter::IsNull(field) => queue_filter_field_value(message, dlq, field)
1801            .is_none_or(|value| matches!(value, Value::Null)),
1802        Filter::IsNotNull(field) => queue_filter_field_value(message, dlq, field)
1803            .is_some_and(|value| !matches!(value, Value::Null)),
1804        Filter::In { field, values } => {
1805            queue_filter_field_value(message, dlq, field).is_some_and(|candidate| {
1806                values
1807                    .iter()
1808                    .any(|value| queue_values_equal(&candidate, value))
1809            })
1810        }
1811        Filter::Between { field, low, high } => queue_filter_field_value(message, dlq, field)
1812            .is_some_and(|candidate| {
1813                queue_compare_values(&candidate, low, CompareOp::Ge)
1814                    && queue_compare_values(&candidate, high, CompareOp::Le)
1815            }),
1816        Filter::Like { field, pattern } => queue_filter_text(message, dlq, field)
1817            .is_some_and(|value| queue_like_matches(&value, pattern)),
1818        Filter::StartsWith { field, prefix } => {
1819            queue_filter_text(message, dlq, field).is_some_and(|value| value.starts_with(prefix))
1820        }
1821        Filter::EndsWith { field, suffix } => {
1822            queue_filter_text(message, dlq, field).is_some_and(|value| value.ends_with(suffix))
1823        }
1824        Filter::Contains { field, substring } => {
1825            queue_filter_text(message, dlq, field).is_some_and(|value| value.contains(substring))
1826        }
1827        Filter::CompareExpr { .. } => false,
1828    }
1829}
1830
1831fn queue_filter_field_value(
1832    message: &QueueMessageView,
1833    dlq: bool,
1834    field: &FieldRef,
1835) -> Option<Value> {
1836    match field {
1837        FieldRef::TableColumn { table, column } if table.is_empty() => {
1838            queue_projection_value(message, dlq, column)
1839                .or_else(|| queue_payload_field_value(&message.payload, column))
1840        }
1841        FieldRef::TableColumn { column, .. } => queue_projection_value(message, dlq, column)
1842            .or_else(|| queue_payload_field_value(&message.payload, column)),
1843        _ => None,
1844    }
1845}
1846
1847fn queue_payload_field_value(payload: &Value, field: &str) -> Option<Value> {
1848    let Value::Json(bytes) = payload else {
1849        return None;
1850    };
1851    let json: crate::json::Value = crate::json::from_slice(bytes).ok()?;
1852    let value = json.get(field)?;
1853    json_value_to_schema_value(value)
1854}
1855
1856fn json_value_to_schema_value(value: &crate::json::Value) -> Option<Value> {
1857    if matches!(value, crate::json::Value::Null) {
1858        Some(Value::Null)
1859    } else if let Some(value) = value.as_bool() {
1860        Some(Value::Boolean(value))
1861    } else if let Some(value) = value.as_i64() {
1862        Some(Value::Integer(value))
1863    } else if let Some(value) = value.as_u64() {
1864        Some(Value::UnsignedInteger(value))
1865    } else if let Some(value) = value.as_f64() {
1866        Some(Value::Float(value))
1867    } else if let Some(value) = value.as_str() {
1868        Some(Value::text(value.to_string()))
1869    } else {
1870        Some(Value::Json(value.to_string_compact().into_bytes()))
1871    }
1872}
1873
1874fn queue_filter_text(message: &QueueMessageView, dlq: bool, field: &FieldRef) -> Option<String> {
1875    queue_filter_field_value(message, dlq, field).and_then(|value| match value {
1876        Value::Text(value) => Some(value.to_string()),
1877        Value::NodeRef(value) | Value::EdgeRef(value) | Value::TableRef(value) => Some(value),
1878        Value::Integer(value) => Some(value.to_string()),
1879        Value::UnsignedInteger(value) => Some(value.to_string()),
1880        Value::Float(value) => Some(value.to_string()),
1881        Value::Boolean(value) => Some(value.to_string()),
1882        _ => None,
1883    })
1884}
1885
1886fn queue_compare_values(left: &Value, right: &Value, op: CompareOp) -> bool {
1887    match op {
1888        CompareOp::Eq => queue_values_equal(left, right),
1889        CompareOp::Ne => !queue_values_equal(left, right),
1890        CompareOp::Lt => queue_partial_cmp(left, right).is_some_and(|ord| ord.is_lt()),
1891        CompareOp::Le => queue_partial_cmp(left, right).is_some_and(|ord| !ord.is_gt()),
1892        CompareOp::Gt => queue_partial_cmp(left, right).is_some_and(|ord| ord.is_gt()),
1893        CompareOp::Ge => queue_partial_cmp(left, right).is_some_and(|ord| !ord.is_lt()),
1894    }
1895}
1896
1897fn queue_values_equal(left: &Value, right: &Value) -> bool {
1898    if let (Some(left), Some(right)) = (queue_value_number(left), queue_value_number(right)) {
1899        return (left - right).abs() < f64::EPSILON;
1900    }
1901    match (left, right) {
1902        (Value::Text(left), Value::Text(right)) => left == right,
1903        (Value::Boolean(left), Value::Boolean(right)) => left == right,
1904        _ => left == right,
1905    }
1906}
1907
1908fn queue_partial_cmp(left: &Value, right: &Value) -> Option<std::cmp::Ordering> {
1909    if let (Some(left), Some(right)) = (queue_value_number(left), queue_value_number(right)) {
1910        return left.partial_cmp(&right);
1911    }
1912    match (left, right) {
1913        (Value::Text(left), Value::Text(right)) => Some(left.cmp(right)),
1914        _ => None,
1915    }
1916}
1917
1918fn queue_value_number(value: &Value) -> Option<f64> {
1919    match value {
1920        Value::Integer(value) => Some(*value as f64),
1921        Value::UnsignedInteger(value) => Some(*value as f64),
1922        Value::Float(value) => Some(*value),
1923        Value::Text(value) => value.parse().ok(),
1924        _ => None,
1925    }
1926}
1927
/// SQL-`LIKE`-style match of `value` against `pattern`.
///
/// `%` matches any run of characters, including the empty run, and may
/// appear anywhere in the pattern. Previously a `%` in the middle of the
/// pattern was treated as a literal character. Every other character,
/// including `_`, matches itself, as before. A pattern without `%` must
/// equal `value` exactly.
fn queue_like_matches(value: &str, pattern: &str) -> bool {
    if !pattern.contains('%') {
        return value == pattern;
    }

    // The pattern contains at least one `%`, so splitting yields a leading
    // segment, zero or more middle segments, and a trailing segment.
    let mut segments = pattern.split('%');
    let leading = segments.next().unwrap_or("");
    let Some(mut remaining) = value.strip_prefix(leading) else {
        return false;
    };

    let mut middle: Vec<&str> = segments.collect();
    let trailing = middle.pop().unwrap_or("");

    // Consume each middle segment at its leftmost occurrence; this leaves
    // the longest possible tail for the segments that follow.
    for segment in middle {
        if segment.is_empty() {
            continue;
        }
        match remaining.find(segment) {
            Some(index) => remaining = &remaining[index + segment.len()..],
            None => return false,
        }
    }

    remaining.ends_with(trailing)
}
1942
1943pub(super) fn queue_message_view_by_id(
1944    store: &UnifiedStore,
1945    queue: &str,
1946    message_id: EntityId,
1947) -> RedDBResult<Option<QueueMessageView>> {
1948    let manager = queue_manager(store, queue)?;
1949    Ok(manager
1950        .get(message_id)
1951        .and_then(queue_message_view_from_entity))
1952}
1953
1954pub(super) fn sort_queue_messages(
1955    messages: &mut [QueueMessageView],
1956    config: &QueueRuntimeConfig,
1957    side: QueueSide,
1958) {
1959    messages.sort_by(|left, right| {
1960        if config.priority {
1961            right
1962                .priority
1963                .cmp(&left.priority)
1964                .then_with(|| match side {
1965                    QueueSide::Left => left.position.cmp(&right.position),
1966                    QueueSide::Right => right.position.cmp(&left.position),
1967                })
1968                .then_with(|| left.id.raw().cmp(&right.id.raw()))
1969        } else {
1970            match side {
1971                QueueSide::Left => left.position.cmp(&right.position),
1972                QueueSide::Right => right.position.cmp(&left.position),
1973            }
1974            .then_with(|| left.id.raw().cmp(&right.id.raw()))
1975        }
1976    });
1977}
1978
1979pub(super) fn next_queue_position(
1980    store: &UnifiedStore,
1981    queue: &str,
1982    side: QueueSide,
1983) -> RedDBResult<u64> {
1984    let messages = load_queue_message_views(store, queue)?;
1985    if messages.is_empty() {
1986        return Ok(QUEUE_POSITION_CENTER);
1987    }
1988    match side {
1989        QueueSide::Left => Ok(messages
1990            .iter()
1991            .map(|message| message.position)
1992            .min()
1993            .unwrap_or(QUEUE_POSITION_CENTER)
1994            .saturating_sub(1)),
1995        QueueSide::Right => Ok(messages
1996            .iter()
1997            .map(|message| message.position)
1998            .max()
1999            .unwrap_or(QUEUE_POSITION_CENTER)
2000            .saturating_add(1)),
2001    }
2002}
2003
2004pub(super) fn increment_queue_attempts(
2005    store: &UnifiedStore,
2006    queue: &str,
2007    message_id: EntityId,
2008) -> RedDBResult<u32> {
2009    let manager = queue_manager(store, queue)?;
2010    let mut entity = manager
2011        .get(message_id)
2012        .ok_or_else(|| RedDBError::NotFound(format!("message '{}' not found", message_id.raw())))?;
2013    match &mut entity.data {
2014        EntityData::QueueMessage(message) => {
2015            message.attempts = message.attempts.saturating_add(1);
2016            let attempts = message.attempts;
2017            manager
2018                .update(entity)
2019                .map_err(|err| RedDBError::Internal(err.to_string()))?;
2020            Ok(attempts)
2021        }
2022        _ => Err(RedDBError::Query(format!(
2023            "entity '{}' is not a queue message",
2024            message_id.raw()
2025        ))),
2026    }
2027}
2028
2029pub(super) fn queue_message_attempts(
2030    store: &UnifiedStore,
2031    queue: &str,
2032    message_id: EntityId,
2033) -> RedDBResult<u32> {
2034    Ok(queue_message_data(store, queue, message_id)?.attempts)
2035}
2036
2037pub(super) fn queue_message_max_attempts(
2038    store: &UnifiedStore,
2039    queue: &str,
2040    message_id: EntityId,
2041) -> RedDBResult<u32> {
2042    Ok(queue_message_data(store, queue, message_id)?.max_attempts)
2043}
2044
2045pub(super) fn queue_message_payload(
2046    store: &UnifiedStore,
2047    queue: &str,
2048    message_id: EntityId,
2049) -> RedDBResult<Value> {
2050    Ok(queue_message_data(store, queue, message_id)?.payload)
2051}
2052
2053pub(super) fn queue_message_pending_any(
2054    store: &UnifiedStore,
2055    queue: &str,
2056    message_id: EntityId,
2057) -> RedDBResult<bool> {
2058    Ok(!load_pending_entries(store, queue, None, Some(message_id))?.is_empty())
2059}
2060
2061pub(super) fn queue_message_pending_for_group(
2062    store: &UnifiedStore,
2063    queue: &str,
2064    group: &str,
2065    message_id: EntityId,
2066) -> RedDBResult<bool> {
2067    Ok(!load_pending_entries(store, queue, Some(group), Some(message_id))?.is_empty())
2068}
2069
2070pub(super) fn queue_message_acked_for_group(
2071    store: &UnifiedStore,
2072    queue: &str,
2073    group: &str,
2074    message_id: EntityId,
2075) -> RedDBResult<bool> {
2076    Ok(!load_ack_entries(store, queue, Some(group), Some(message_id))?.is_empty())
2077}
2078
2079fn queue_manager(
2080    store: &UnifiedStore,
2081    queue: &str,
2082) -> RedDBResult<Arc<crate::storage::unified::SegmentManager>> {
2083    store
2084        .get_collection(queue)
2085        .ok_or_else(|| RedDBError::NotFound(format!("queue '{}' not found", queue)))
2086}
2087
2088pub(super) fn queue_message_data(
2089    store: &UnifiedStore,
2090    queue: &str,
2091    message_id: EntityId,
2092) -> RedDBResult<QueueMessageData> {
2093    let manager = queue_manager(store, queue)?;
2094    let entity = manager
2095        .get(message_id)
2096        .ok_or_else(|| RedDBError::NotFound(format!("message '{}' not found", message_id.raw())))?;
2097    match entity.data {
2098        EntityData::QueueMessage(message) => Ok(message),
2099        _ => Err(RedDBError::Query(format!(
2100            "entity '{}' is not a queue message",
2101            message_id.raw()
2102        ))),
2103    }
2104}
2105
2106fn insert_meta_row(store: &UnifiedStore, fields: HashMap<String, Value>) -> RedDBResult<()> {
2107    let _ = store.get_or_create_collection(QUEUE_META_COLLECTION);
2108    store
2109        .insert_auto(
2110            QUEUE_META_COLLECTION,
2111            UnifiedEntity::new(
2112                EntityId::new(0),
2113                EntityKind::TableRow {
2114                    table: Arc::from(QUEUE_META_COLLECTION),
2115                    row_id: 0,
2116                },
2117                EntityData::Row(RowData {
2118                    columns: Vec::new(),
2119                    named: Some(fields),
2120                    schema: None,
2121                }),
2122            ),
2123        )
2124        .map_err(|err| RedDBError::Internal(err.to_string()))?;
2125    Ok(())
2126}
2127
2128pub(super) fn remove_meta_rows(store: &UnifiedStore, predicate: impl Fn(&RowData) -> bool + Sync) {
2129    let Some(manager) = store.get_collection(QUEUE_META_COLLECTION) else {
2130        return;
2131    };
2132    let rows = manager.query_all(|entity| entity.data.as_row().is_some_and(&predicate));
2133    for row in rows {
2134        let _ = store.delete(QUEUE_META_COLLECTION, row.id);
2135    }
2136}
2137
/// Deletes the entity `entity_id` from the queue metadata collection.
///
/// Best-effort: whatever `store.delete` returns is discarded, so callers
/// cannot observe a failed or no-op deletion.
pub(super) fn delete_meta_entity(store: &UnifiedStore, entity_id: EntityId) {
    let _ = store.delete(QUEUE_META_COLLECTION, entity_id);
}
2141
2142fn queue_message_lock_key(queue: &str, message_id: EntityId) -> String {
2143    format!("{queue}:{}", message_id.raw())
2144}
2145
2146pub(super) fn queue_message_lock_handle(
2147    runtime: &RedDBRuntime,
2148    queue: &str,
2149    message_id: EntityId,
2150) -> Arc<parking_lot::Mutex<()>> {
2151    let key = queue_message_lock_key(queue, message_id);
2152    if let Some(lock) = runtime.inner.queue_message_locks.read().get(&key).cloned() {
2153        return lock;
2154    }
2155
2156    let mut locks = runtime.inner.queue_message_locks.write();
2157    locks
2158        .entry(key)
2159        .or_insert_with(|| Arc::new(parking_lot::Mutex::new(())))
2160        .clone()
2161}
2162
2163pub(super) fn forget_queue_message_lock(runtime: &RedDBRuntime, queue: &str, message_id: EntityId) {
2164    runtime
2165        .inner
2166        .queue_message_locks
2167        .write()
2168        .remove(&queue_message_lock_key(queue, message_id));
2169}
2170
2171fn parse_message_id(value: &str) -> RedDBResult<EntityId> {
2172    let raw = value.strip_prefix('e').unwrap_or(value);
2173    raw.parse::<u64>()
2174        .map(EntityId::new)
2175        .map_err(|_| RedDBError::Query(format!("invalid message id '{}'", value)))
2176}
2177
2178/// ADR 0026: resolve the ACK/NACK handle. When `delivery_id` is supplied,
2179/// it wins unconditionally — strict failure if the handle does not resolve
2180/// to a live pending delivery on `queue`. When only the legacy tuple is
2181/// supplied, emit a rate-limited deprecation log line and use the tuple.
2182/// At least one handle must be present.
2183pub(super) fn resolve_ack_nack_handle(
2184    store: &UnifiedStore,
2185    queue: &str,
2186    group_hint: &str,
2187    message_id_hint: &str,
2188    delivery_id: Option<&str>,
2189) -> RedDBResult<(String, EntityId)> {
2190    if let Some(did) = delivery_id {
2191        return resolve_delivery_id(store, queue, did);
2192    }
2193    if group_hint.is_empty() || message_id_hint.is_empty() {
2194        return Err(RedDBError::Query(
2195            "ACK/NACK requires either GROUP <group> '<message_id>' or WITH delivery_id = '<id>'"
2196                .to_string(),
2197        ));
2198    }
2199    log_tuple_deprecation(queue);
2200    let entity = parse_message_id(message_id_hint)?;
2201    Ok((group_hint.to_string(), entity))
2202}
2203
2204fn resolve_delivery_id(
2205    store: &UnifiedStore,
2206    queue: &str,
2207    delivery_id: &str,
2208) -> RedDBResult<(String, EntityId)> {
2209    let Some(manager) = store.get_collection(QUEUE_META_COLLECTION) else {
2210        return Err(RedDBError::Query(format!(
2211            "delivery_id '{}' does not resolve to a live pending delivery",
2212            delivery_id
2213        )));
2214    };
2215    for entity in manager.query_all(|entity| {
2216        entity.data.as_row().is_some_and(|row| {
2217            row_text(row, "kind").as_deref() == Some("queue_pending_lc")
2218                && row_text(row, "delivery_id").as_deref() == Some(delivery_id)
2219        })
2220    }) {
2221        if let Some(row) = entity.data.as_row() {
2222            let row_queue = row_text(row, "queue").unwrap_or_default();
2223            let row_group = row_text(row, "group").unwrap_or_default();
2224            let row_message = row_u64(row, "message_id").unwrap_or(0);
2225            if row_queue != queue {
2226                return Err(RedDBError::Query(format!(
2227                    "delivery_id '{}' belongs to queue '{}', not '{}'",
2228                    delivery_id, row_queue, queue
2229                )));
2230            }
2231            return Ok((row_group, EntityId::new(row_message)));
2232        }
2233    }
2234    Err(RedDBError::Query(format!(
2235        "delivery_id '{}' does not resolve to a live pending delivery",
2236        delivery_id
2237    )))
2238}
2239
2240/// Per-(connection, queue) rate-limited "tuple ACK is deprecated" log line.
2241/// One emission per minute matches ADR 0026's operational guidance.
2242fn log_tuple_deprecation(queue: &str) {
2243    use std::sync::{Mutex, OnceLock};
2244    use std::time::Instant;
2245
2246    static LAST_EMIT: OnceLock<Mutex<HashMap<(u64, String), Instant>>> = OnceLock::new();
2247    const COOLDOWN: std::time::Duration = std::time::Duration::from_secs(60);
2248
2249    let map = LAST_EMIT.get_or_init(|| Mutex::new(HashMap::new()));
2250    let key = (super::impl_core::current_connection_id(), queue.to_string());
2251    let now = Instant::now();
2252    let mut guard = match map.lock() {
2253        Ok(g) => g,
2254        Err(_) => return,
2255    };
2256    let should_emit =
2257        !matches!(guard.get(&key), Some(prev) if now.duration_since(*prev) < COOLDOWN);
2258    if should_emit {
2259        guard.insert(key.clone(), now);
2260        drop(guard);
2261        tracing::warn!(
2262            target: "reddb::queue_lifecycle",
2263            queue = queue,
2264            connection_id = key.0,
2265            "ACK/NACK by (queue, group, message_id) tuple is deprecated; \
2266             switch to the server-issued delivery_id (ADR 0026). \
2267             The tuple path will be removed one minor release after introduction.",
2268        );
2269    }
2270}
2271
2272fn message_id_string(message_id: EntityId) -> String {
2273    message_id.raw().to_string()
2274}
2275
2276/// Slice 10 of issue #527 — render-time scan of pending entries
2277/// per (queue, group) for `queue_pending_gauge` exposition. Walks
2278/// `red_queue_meta` live so the gauge cannot drift from the source
2279/// of truth.
2280pub(crate) fn pending_counts_by_group(
2281    store: &UnifiedStore,
2282) -> std::collections::BTreeMap<(String, String), u64> {
2283    let mut counts: std::collections::BTreeMap<(String, String), u64> =
2284        std::collections::BTreeMap::new();
2285    let Some(manager) = store.get_collection(QUEUE_META_COLLECTION) else {
2286        return counts;
2287    };
2288    for entity in manager.query_all(|entity| {
2289        entity
2290            .data
2291            .as_row()
2292            .is_some_and(|row| row_text(row, "kind").as_deref() == Some("queue_pending"))
2293    }) {
2294        if let Some(row) = entity.data.as_row() {
2295            let queue = row_text(row, "queue");
2296            let group = row_text(row, "group");
2297            if let (Some(q), Some(g)) = (queue, group) {
2298                *counts.entry((q, g)).or_insert(0) += 1;
2299            }
2300        }
2301    }
2302    counts
2303}
2304
2305pub(super) fn row_text(row: &RowData, field: &str) -> Option<String> {
2306    match row.get_field(field)?.clone() {
2307        Value::Text(value) => Some(value.to_string()),
2308        Value::NodeRef(value) => Some(value),
2309        Value::EdgeRef(value) => Some(value),
2310        Value::TableRef(value) => Some(value),
2311        _ => None,
2312    }
2313}
2314
2315pub(super) fn row_u64(row: &RowData, field: &str) -> Option<u64> {
2316    match row.get_field(field)?.clone() {
2317        Value::UnsignedInteger(value) => Some(value),
2318        Value::Integer(value) if value >= 0 => Some(value as u64),
2319        Value::Float(value) if value >= 0.0 => Some(value as u64),
2320        Value::Text(value) => value.parse().ok(),
2321        _ => None,
2322    }
2323}
2324
2325fn row_bool(row: &RowData, field: &str) -> Option<bool> {
2326    match row.get_field(field)?.clone() {
2327        Value::Boolean(value) => Some(value),
2328        Value::Text(value) => match value.to_ascii_lowercase().as_str() {
2329            "true" => Some(true),
2330            "false" => Some(false),
2331            _ => None,
2332        },
2333        _ => None,
2334    }
2335}
2336
2337fn queue_collection_contract(
2338    name: &str,
2339    priority: bool,
2340    ttl_ms: Option<u64>,
2341) -> crate::physical::CollectionContract {
2342    let now = current_unix_ms();
2343    let mut context_index_fields = Vec::new();
2344    if priority {
2345        context_index_fields.push("priority".to_string());
2346    }
2347
2348    crate::physical::CollectionContract {
2349        name: name.to_string(),
2350        declared_model: crate::catalog::CollectionModel::Queue,
2351        schema_mode: crate::catalog::SchemaMode::Dynamic,
2352        origin: crate::physical::ContractOrigin::Explicit,
2353        version: 1,
2354        created_at_unix_ms: now,
2355        updated_at_unix_ms: now,
2356        default_ttl_ms: ttl_ms,
2357        vector_dimension: None,
2358        vector_metric: None,
2359        context_index_fields,
2360        declared_columns: Vec::new(),
2361        table_def: None,
2362        timestamps_enabled: false,
2363        context_index_enabled: false,
2364        metrics_raw_retention_ms: None,
2365        metrics_rollup_policies: Vec::new(),
2366        metrics_tenant_identity: None,
2367        metrics_namespace: None,
2368        // Queues manipulate messages via push/pop/ack — the row DML
2369        // paths never apply. Flag it as append_only so inadvertent
2370        // `UPDATE/DELETE FROM queue_name` statements fail loudly.
2371        append_only: true,
2372        subscriptions: Vec::new(),
2373        session_key: None,
2374        session_gap_ms: None,
2375        retention_duration_ms: None,
2376    }
2377}
2378
/// Milliseconds since the Unix epoch according to the system clock, or
/// `0` when the clock reads earlier than the epoch.
fn current_unix_ms() -> u128 {
    let now = std::time::SystemTime::now();
    match now.duration_since(std::time::UNIX_EPOCH) {
        Ok(elapsed) => elapsed.as_millis(),
        Err(_) => 0,
    }
}
2385
2386pub(super) fn now_ns() -> u64 {
2387    std::time::SystemTime::now()
2388        .duration_since(std::time::UNIX_EPOCH)
2389        .unwrap_or_default()
2390        .as_nanos() as u64
2391}
2392
2393pub(super) fn queue_message_ttl_metadata(ttl_ms: u64) -> Metadata {
2394    Metadata::with_fields(
2395        [(
2396            "_ttl_ms".to_string(),
2397            if ttl_ms <= i64::MAX as u64 {
2398                MetadataValue::Int(ttl_ms as i64)
2399            } else {
2400                MetadataValue::Timestamp(ttl_ms)
2401            },
2402        )]
2403        .into_iter()
2404        .collect(),
2405    )
2406}
2407
2408/// Rough payload byte estimate for outbox watermark tracking.
2409fn estimate_payload_bytes(payload: &Value) -> u64 {
2410    match payload {
2411        Value::Json(v) => v.len() as u64,
2412        Value::Text(s) => s.len() as u64,
2413        _ => 64,
2414    }
2415}