Skip to main content

reddb_server/runtime/
impl_queue.rs

1//! Queue DDL and command execution
2
3use std::collections::{HashMap, HashSet};
4use std::sync::atomic::{AtomicU64, Ordering};
5use std::sync::Arc;
6
7use crate::runtime::audit_log::{AuditAuthSource, AuditEvent, AuditFieldEscaper, Outcome};
8use crate::storage::queue::QueueMode;
9use crate::storage::unified::entity::{QueueMessageData, RowData};
10use crate::storage::unified::{Metadata, MetadataValue, UnifiedStore};
11
12use super::*;
13
14// ---------------------------------------------------------------------------
15// Outbox metrics (exposed via /metrics)
16// ---------------------------------------------------------------------------
17
18/// Total event push attempts that failed (queue full or other error) and
19/// triggered DLQ routing.
20pub static EVENTS_DRAIN_RETRIES_TOTAL: AtomicU64 = AtomicU64::new(0);
21
22/// Total events routed to the dead-letter queue.
23pub static EVENTS_DLQ_TOTAL: AtomicU64 = AtomicU64::new(0);
24
25/// Total events successfully enqueued to their target queue.
26pub static EVENTS_ENQUEUED_TOTAL: AtomicU64 = AtomicU64::new(0);
27
28/// Warn when total estimated outbox payload bytes exceed this value (1 GiB).
29const OUTBOX_WARN_BYTES: u64 = 1 << 30;
30
31/// Route all new events to DLQ when estimated outbox exceeds this value (10 GiB).
32const OUTBOX_MAX_BYTES: u64 = 10 * (1 << 30);
33
34/// Running estimate of bytes pending in event queues (approximate; not decremented on consume).
35static OUTBOX_APPROX_BYTES: AtomicU64 = AtomicU64::new(0);
36
37const QUEUE_META_COLLECTION: &str = "red_queue_meta";
38const QUEUE_POSITION_CENTER: u64 = u64::MAX / 2;
39const WORK_DEFAULT_GROUP: &str = "_work_default";
40const FANOUT_GROUP_PREFIX: &str = "_fanout_";
41
42#[derive(Debug, Clone)]
43pub(super) struct QueueRuntimeConfig {
44    pub(super) mode: QueueMode,
45    pub(super) priority: bool,
46    pub(super) max_size: Option<usize>,
47    pub(super) ttl_ms: Option<u64>,
48    pub(super) dlq: Option<String>,
49    pub(super) max_attempts: u32,
50    pub(super) lock_deadline_ms: u64,
51    pub(super) in_flight_cap_per_group: u32,
52}
53
54#[derive(Debug, Clone)]
55struct QueueGroupEntry {
56    entity_id: EntityId,
57    group: String,
58}
59
60#[derive(Debug, Clone)]
61pub(super) struct QueuePendingEntry {
62    pub(super) entity_id: EntityId,
63    group: String,
64    pub(super) message_id: EntityId,
65    consumer: String,
66    pub(super) delivered_at_ns: u64,
67    pub(super) delivery_count: u32,
68}
69
70#[derive(Debug, Clone)]
71pub(super) struct QueueAckEntry {
72    entity_id: EntityId,
73    group: String,
74    pub(super) message_id: EntityId,
75}
76
77#[derive(Debug, Clone)]
78pub(super) struct QueueMessageView {
79    pub(super) id: EntityId,
80    position: u64,
81    priority: i32,
82    pub(super) payload: Value,
83    attempts: u32,
84    pub(super) max_attempts: u32,
85    enqueued_at_ns: u64,
86}
87
88impl RedDBRuntime {
89    pub(crate) fn enqueue_event_payload(
90        &self,
91        queue: &str,
92        payload: Value,
93    ) -> RedDBResult<EntityId> {
94        let store = self.inner.db.store();
95        // Auto-create the queue if it does not exist yet.
96        if store.get_collection(queue).is_none() {
97            crate::runtime::impl_ddl::ensure_event_target_queue_pub(self, queue)?;
98        }
99
100        // Estimate payload bytes for outbox watermark checks.
101        let payload_bytes = estimate_payload_bytes(&payload);
102        let outbox_bytes = OUTBOX_APPROX_BYTES.fetch_add(payload_bytes, Ordering::Relaxed);
103
104        // Hard limit: route directly to DLQ without even trying.
105        if outbox_bytes > OUTBOX_MAX_BYTES {
106            OUTBOX_APPROX_BYTES.fetch_sub(payload_bytes, Ordering::Relaxed);
107            EVENTS_DRAIN_RETRIES_TOTAL.fetch_add(1, Ordering::Relaxed);
108            return self.route_event_to_outbox_dlq(queue, payload, "outbox_max_bytes_exceeded");
109        }
110
111        // Soft limit: warn once per crossing.
112        if outbox_bytes > OUTBOX_WARN_BYTES && outbox_bytes - payload_bytes <= OUTBOX_WARN_BYTES {
113            tracing::warn!(
114                outbox_bytes,
115                warn_threshold = OUTBOX_WARN_BYTES,
116                "event outbox approaching capacity warning threshold"
117            );
118            crate::telemetry::operator_event::OperatorEvent::OutboxDlqActivated {
119                queue: queue.to_string(),
120                dlq: format!("{queue}_outbox_dlq"),
121                reason: "outbox_warn_bytes_exceeded".to_string(),
122            }
123            .emit_global();
124        }
125
126        let config = load_queue_config(store.as_ref(), queue);
127
128        // If the target queue has a max_size and is full, route to DLQ.
129        if let Some(max_size) = config.max_size {
130            let current_len = load_queue_message_views(store.as_ref(), queue)
131                .unwrap_or_default()
132                .len();
133            if current_len >= max_size {
134                OUTBOX_APPROX_BYTES.fetch_sub(payload_bytes, Ordering::Relaxed);
135                EVENTS_DRAIN_RETRIES_TOTAL.fetch_add(1, Ordering::Relaxed);
136                return self.route_event_to_outbox_dlq(queue, payload, "queue_full");
137            }
138            // Warn at 80% capacity.
139            if current_len * 10 >= max_size * 8 {
140                tracing::warn!(
141                    queue = %queue,
142                    size = current_len,
143                    max = max_size,
144                    "event target queue near capacity"
145                );
146            }
147        }
148
149        let id = self.enqueue_event_payload_raw(store.as_ref(), queue, &config, payload)?;
150        EVENTS_ENQUEUED_TOTAL.fetch_add(1, Ordering::Relaxed);
151        Ok(id)
152    }
153
154    /// Route a failed event to `<queue>_outbox_dlq`, auto-creating it if needed.
155    fn route_event_to_outbox_dlq(
156        &self,
157        queue: &str,
158        payload: Value,
159        reason: &str,
160    ) -> RedDBResult<EntityId> {
161        let dlq_name = format!("{queue}_outbox_dlq");
162        EVENTS_DLQ_TOTAL.fetch_add(1, Ordering::Relaxed);
163
164        crate::telemetry::operator_event::OperatorEvent::OutboxDlqActivated {
165            queue: queue.to_string(),
166            dlq: dlq_name.clone(),
167            reason: reason.to_string(),
168        }
169        .emit_global();
170
171        let store = self.inner.db.store();
172        if store.get_collection(&dlq_name).is_none() {
173            crate::runtime::impl_ddl::ensure_event_target_queue_pub(self, &dlq_name)?;
174        }
175        let dlq_config = load_queue_config(store.as_ref(), &dlq_name);
176        let id = self.enqueue_event_payload_raw(store.as_ref(), &dlq_name, &dlq_config, payload)?;
177        EVENTS_ENQUEUED_TOTAL.fetch_add(1, Ordering::Relaxed);
178        Ok(id)
179    }
180
181    /// Low-level event message insert — no size checks, no DLQ routing.
182    fn enqueue_event_payload_raw(
183        &self,
184        store: &UnifiedStore,
185        queue: &str,
186        config: &QueueRuntimeConfig,
187        payload: Value,
188    ) -> RedDBResult<EntityId> {
189        let position = next_queue_position(store, queue, QueueSide::Right)?;
190        let mut entity = UnifiedEntity::new(
191            EntityId::new(0),
192            EntityKind::QueueMessage {
193                queue: queue.to_string(),
194                position,
195            },
196            EntityData::QueueMessage(QueueMessageData {
197                payload,
198                priority: None,
199                enqueued_at_ns: now_ns(),
200                attempts: 0,
201                max_attempts: config.max_attempts,
202                acked: false,
203            }),
204        );
205        if let Some(xid) = self.current_xid() {
206            entity.set_xmin(xid);
207        }
208        let id = store
209            .insert_auto(queue, entity)
210            .map_err(|err| RedDBError::Internal(err.to_string()))?;
211        if let Some(ttl_ms) = config.ttl_ms {
212            store
213                .set_metadata(queue, id, queue_message_ttl_metadata(ttl_ms))
214                .map_err(|err| RedDBError::Internal(err.to_string()))?;
215        }
216        self.invalidate_result_cache_for_table(queue);
217        Ok(id)
218    }
219
220    pub fn execute_create_queue(
221        &self,
222        raw_query: &str,
223        query: &CreateQueueQuery,
224    ) -> RedDBResult<RuntimeQueryResult> {
225        self.check_write(crate::runtime::write_gate::WriteKind::Ddl)?;
226        if query.dlq.as_deref() == Some(query.name.as_str()) {
227            return Err(RedDBError::Query(
228                "dead-letter queue must be different from the source queue".to_string(),
229            ));
230        }
231
232        let store = self.inner.db.store();
233        let exists = store.get_collection(&query.name).is_some();
234        if exists {
235            if query.if_not_exists {
236                return Ok(RuntimeQueryResult::ok_message(
237                    raw_query.to_string(),
238                    &format!("queue '{}' already exists", query.name),
239                    "create",
240                ));
241            }
242            return Err(RedDBError::Query(format!(
243                "queue '{}' already exists",
244                query.name
245            )));
246        }
247
248        store
249            .create_collection(&query.name)
250            .map_err(|err| RedDBError::Internal(err.to_string()))?;
251        if let Some(ttl_ms) = query.ttl_ms {
252            self.inner
253                .db
254                .set_collection_default_ttl_ms(&query.name, ttl_ms);
255        }
256        self.inner
257            .db
258            .save_collection_contract(queue_collection_contract(
259                &query.name,
260                query.priority,
261                query.ttl_ms,
262            ))
263            .map_err(|err| RedDBError::Internal(err.to_string()))?;
264        save_queue_config(
265            store.as_ref(),
266            &query.name,
267            &QueueRuntimeConfig {
268                mode: query.mode,
269                priority: query.priority,
270                max_size: query.max_size,
271                ttl_ms: query.ttl_ms,
272                dlq: query.dlq.clone(),
273                max_attempts: query.max_attempts,
274                lock_deadline_ms: query.lock_deadline_ms,
275                in_flight_cap_per_group: query.in_flight_cap_per_group,
276            },
277        )?;
278
279        if let Some(dlq) = &query.dlq {
280            if store.get_collection(dlq).is_none() {
281                store
282                    .create_collection(dlq)
283                    .map_err(|err| RedDBError::Internal(err.to_string()))?;
284                self.inner
285                    .db
286                    .save_collection_contract(queue_collection_contract(dlq, false, None))
287                    .map_err(|err| RedDBError::Internal(err.to_string()))?;
288            }
289        }
290
291        self.invalidate_result_cache();
292        self.inner
293            .db
294            .persist_metadata()
295            .map_err(|err| RedDBError::Internal(err.to_string()))?;
296        // Issue #120 — feed the queue into the schema-vocabulary so
297        // AskPipeline (#121) can resolve queue references. Queues
298        // have an opaque payload column, so we expose `payload` and
299        // (when configured) the DLQ partner as type-tag context.
300        let mut type_tags = Vec::new();
301        if let Some(dlq) = &query.dlq {
302            type_tags.push(format!("dlq:{}", dlq));
303        }
304        self.schema_vocabulary_apply(
305            crate::runtime::schema_vocabulary::DdlEvent::CreateCollection {
306                collection: query.name.clone(),
307                columns: vec!["payload".to_string()],
308                type_tags,
309                description: None,
310            },
311        );
312
313        let mut msg = format!("queue '{}' created", query.name);
314        msg.push_str(&format!(" (mode={})", query.mode.as_str()));
315        if query.priority {
316            msg.push_str(" (priority)");
317        }
318        if let Some(max_size) = query.max_size {
319            msg.push_str(&format!(" (max_size={max_size})"));
320        }
321        if let Some(ttl_ms) = query.ttl_ms {
322            msg.push_str(&format!(" (ttl={ttl_ms}ms)"));
323        }
324        if let Some(dlq) = &query.dlq {
325            msg.push_str(&format!(
326                " (dlq={dlq}, max_attempts={})",
327                query.max_attempts
328            ));
329        }
330
331        Ok(RuntimeQueryResult::ok_message(
332            raw_query.to_string(),
333            &msg,
334            "create",
335        ))
336    }
337
338    pub fn execute_alter_queue(
339        &self,
340        raw_query: &str,
341        query: &AlterQueueQuery,
342    ) -> RedDBResult<RuntimeQueryResult> {
343        self.check_write(crate::runtime::write_gate::WriteKind::Ddl)?;
344        let store = self.inner.db.store();
345        ensure_queue_exists(store.as_ref(), &query.name)?;
346
347        let mut config = load_queue_config(store.as_ref(), &query.name);
348        let mut summary: Vec<String> = Vec::new();
349
350        if let Some(new_mode) = query.mode {
351            let pending =
352                load_pending_entries(store.as_ref(), &query.name, None, None).unwrap_or_default();
353            if !pending.is_empty() {
354                tracing::warn!(
355                    queue = %query.name,
356                    pending_count = pending.len(),
357                    new_mode = %new_mode.as_str(),
358                    "ALTER QUEUE SET MODE: {} in-flight messages will drain with old mode; \
359                     new reads use {}",
360                    pending.len(),
361                    new_mode.as_str(),
362                );
363            }
364            config.mode = new_mode;
365            summary.push(format!("mode={}", new_mode.as_str()));
366        }
367        if let Some(max_attempts) = query.max_attempts {
368            config.max_attempts = max_attempts;
369            summary.push(format!("max_attempts={max_attempts}"));
370        }
371        if let Some(lock_deadline_ms) = query.lock_deadline_ms {
372            config.lock_deadline_ms = lock_deadline_ms;
373            summary.push(format!("lock_deadline_ms={lock_deadline_ms}"));
374        }
375        if let Some(in_flight_cap) = query.in_flight_cap_per_group {
376            config.in_flight_cap_per_group = in_flight_cap;
377            summary.push(format!("in_flight_cap_per_group={in_flight_cap}"));
378        }
379        if let Some(dlq) = &query.dlq {
380            if dlq == &query.name {
381                return Err(RedDBError::Query(
382                    "dead-letter queue must be different from the source queue".to_string(),
383                ));
384            }
385            config.dlq = Some(dlq.clone());
386            summary.push(format!("dlq={dlq}"));
387        }
388
389        save_queue_config(store.as_ref(), &query.name, &config)?;
390
391        self.invalidate_result_cache();
392        self.inner
393            .db
394            .persist_metadata()
395            .map_err(|err| RedDBError::Internal(err.to_string()))?;
396
397        Ok(RuntimeQueryResult::ok_message(
398            raw_query.to_string(),
399            &format!("queue '{}' altered: {}", query.name, summary.join(", ")),
400            "alter",
401        ))
402    }
403
404    pub fn execute_drop_queue(
405        &self,
406        raw_query: &str,
407        query: &DropQueueQuery,
408    ) -> RedDBResult<RuntimeQueryResult> {
409        self.check_write(crate::runtime::write_gate::WriteKind::Ddl)?;
410        let store = self.inner.db.store();
411        if super::impl_ddl::is_system_schema_name(&query.name) {
412            return Err(RedDBError::Query("system schema is read-only".to_string()));
413        }
414        if store.get_collection(&query.name).is_none() {
415            if query.if_exists {
416                return Ok(RuntimeQueryResult::ok_message(
417                    raw_query.to_string(),
418                    &format!("queue '{}' does not exist", query.name),
419                    "drop",
420                ));
421            }
422            return Err(RedDBError::NotFound(format!(
423                "queue '{}' not found",
424                query.name
425            )));
426        }
427        let actual = crate::runtime::ddl::polymorphic_resolver::resolve(
428            &query.name,
429            &self.inner.db.catalog_model_snapshot(),
430        )?;
431        crate::runtime::ddl::polymorphic_resolver::ensure_model_match(
432            crate::catalog::CollectionModel::Queue,
433            actual,
434        )?;
435
436        store
437            .drop_collection(&query.name)
438            .map_err(|err| RedDBError::Internal(err.to_string()))?;
439        self.inner.db.clear_collection_default_ttl_ms(&query.name);
440        self.inner
441            .db
442            .remove_collection_contract(&query.name)
443            .map_err(|err| RedDBError::Internal(err.to_string()))?;
444        remove_queue_metadata(store.as_ref(), &query.name);
445        self.invalidate_result_cache();
446        self.inner
447            .db
448            .persist_metadata()
449            .map_err(|err| RedDBError::Internal(err.to_string()))?;
450        // Issue #120 — invalidate the schema-vocabulary entry.
451        self.schema_vocabulary_apply(
452            crate::runtime::schema_vocabulary::DdlEvent::DropCollection {
453                collection: query.name.clone(),
454            },
455        );
456
457        Ok(RuntimeQueryResult::ok_message(
458            raw_query.to_string(),
459            &format!("queue '{}' dropped", query.name),
460            "drop",
461        ))
462    }
463
464    pub fn execute_queue_command(
465        &self,
466        raw_query: &str,
467        cmd: &QueueCommand,
468    ) -> RedDBResult<RuntimeQueryResult> {
469        self.check_write(crate::runtime::write_gate::WriteKind::Dml)?;
470        match cmd {
471            QueueCommand::Push {
472                queue,
473                value,
474                side,
475                priority,
476            } => {
477                let store = self.inner.db.store();
478                ensure_queue_exists(store.as_ref(), queue)?;
479                let config = load_queue_config(store.as_ref(), queue);
480                if priority.is_some() && !config.priority {
481                    return Err(RedDBError::Query(format!(
482                        "queue '{}' is not a priority queue",
483                        queue
484                    )));
485                }
486                if let Some(max_size) = config.max_size {
487                    let current_len =
488                        load_queue_message_views_with_runtime(Some(self), store.as_ref(), queue)?
489                            .len();
490                    if current_len >= max_size {
491                        return Err(RedDBError::Query(format!(
492                            "queue '{}' is full (max_size={max_size})",
493                            queue
494                        )));
495                    }
496                }
497
498                let position = next_queue_position(store.as_ref(), queue, *side)?;
499                let mut entity = UnifiedEntity::new(
500                    EntityId::new(0),
501                    EntityKind::QueueMessage {
502                        queue: queue.clone(),
503                        position,
504                    },
505                    EntityData::QueueMessage(QueueMessageData {
506                        payload: value.clone(),
507                        priority: if config.priority { *priority } else { None },
508                        enqueued_at_ns: now_ns(),
509                        attempts: 0,
510                        max_attempts: config.max_attempts,
511                        acked: false,
512                    }),
513                );
514                // Phase 1.1 MVCC universal: stamp xmin so other
515                // connections don't see this message until COMMIT.
516                if let Some(xid) = self.current_xid() {
517                    entity.set_xmin(xid);
518                }
519                let id = store
520                    .insert_auto(queue, entity)
521                    .map_err(|err| RedDBError::Internal(err.to_string()))?;
522                if let Some(ttl_ms) = config.ttl_ms {
523                    store
524                        .set_metadata(queue, id, queue_message_ttl_metadata(ttl_ms))
525                        .map_err(|err| RedDBError::Internal(err.to_string()))?;
526                }
527                self.invalidate_result_cache();
528
529                let mut result = UnifiedResult::with_columns(vec![
530                    "message_id".into(),
531                    "side".into(),
532                    "queue".into(),
533                ]);
534                let mut record = UnifiedRecord::new();
535                record.set("message_id", Value::text(message_id_string(id)));
536                record.set(
537                    "side",
538                    Value::text(match side {
539                        QueueSide::Left => "left".to_string(),
540                        QueueSide::Right => "right".to_string(),
541                    }),
542                );
543                record.set("queue", Value::text(queue.clone()));
544                result.push(record);
545
546                Ok(RuntimeQueryResult {
547                    query: raw_query.to_string(),
548                    mode: QueryMode::Sql,
549                    statement: "queue_push",
550                    engine: "runtime-queue",
551                    result,
552                    affected_rows: 1,
553                    statement_type: "insert",
554                })
555            }
556            QueueCommand::Pop { queue, side, count } => {
557                let store = self.inner.db.store();
558                ensure_queue_exists(store.as_ref(), queue)?;
559                let popped = super::queue_delivery::pop_messages(
560                    self,
561                    store.as_ref(),
562                    queue,
563                    *side,
564                    *count,
565                )?;
566
567                let mut result =
568                    UnifiedResult::with_columns(vec!["message_id".into(), "payload".into()]);
569                for message in &popped {
570                    let mut record = UnifiedRecord::new();
571                    record.set(
572                        "message_id",
573                        Value::text(message_id_string(message.message_id)),
574                    );
575                    record.set("payload", message.payload.clone());
576                    result.push(record);
577                }
578                let popped_count = popped.len() as u64;
579                if popped_count > 0 {
580                    self.invalidate_result_cache();
581                }
582
583                Ok(RuntimeQueryResult {
584                    query: raw_query.to_string(),
585                    mode: QueryMode::Sql,
586                    statement: "queue_pop",
587                    engine: "runtime-queue",
588                    result,
589                    affected_rows: popped_count,
590                    statement_type: "delete",
591                })
592            }
593            QueueCommand::Peek { queue, count } => {
594                let store = self.inner.db.store();
595                ensure_queue_exists(store.as_ref(), queue)?;
596                let messages =
597                    super::queue_delivery::peek_messages(self, store.as_ref(), queue, *count)?;
598
599                let mut result =
600                    UnifiedResult::with_columns(vec!["message_id".into(), "payload".into()]);
601                for message in messages {
602                    let mut record = UnifiedRecord::new();
603                    record.set(
604                        "message_id",
605                        Value::text(message_id_string(message.message_id)),
606                    );
607                    record.set("payload", message.payload);
608                    result.push(record);
609                }
610
611                Ok(RuntimeQueryResult {
612                    query: raw_query.to_string(),
613                    mode: QueryMode::Sql,
614                    statement: "queue_peek",
615                    engine: "runtime-queue",
616                    result,
617                    affected_rows: 0,
618                    statement_type: "select",
619                })
620            }
621            QueueCommand::Len { queue } => {
622                let store = self.inner.db.store();
623                ensure_queue_exists(store.as_ref(), queue)?;
624                let count =
625                    load_queue_message_views_with_runtime(Some(self), store.as_ref(), queue)?.len()
626                        as u64;
627                let mut result = UnifiedResult::with_columns(vec!["len".into()]);
628                let mut record = UnifiedRecord::new();
629                record.set("len", Value::UnsignedInteger(count));
630                result.push(record);
631
632                Ok(RuntimeQueryResult {
633                    query: raw_query.to_string(),
634                    mode: QueryMode::Sql,
635                    statement: "queue_len",
636                    engine: "runtime-queue",
637                    result,
638                    affected_rows: 0,
639                    statement_type: "select",
640                })
641            }
642            QueueCommand::Purge { queue } => {
643                let store = self.inner.db.store();
644                ensure_queue_exists(store.as_ref(), queue)?;
645                let count = super::queue_delivery::purge_messages(self, store.as_ref(), queue)?;
646                if count > 0 {
647                    self.invalidate_result_cache();
648                }
649
650                Ok(RuntimeQueryResult::ok_message(
651                    raw_query.to_string(),
652                    &format!("{count} messages purged from queue '{queue}'"),
653                    "delete",
654                ))
655            }
656            QueueCommand::GroupCreate { queue, group } => {
657                let store = self.inner.db.store();
658                ensure_queue_exists(store.as_ref(), queue)?;
659                if queue_group_exists(store.as_ref(), queue, group)? {
660                    return Ok(RuntimeQueryResult::ok_message(
661                        raw_query.to_string(),
662                        &format!(
663                            "consumer group '{}' already exists on queue '{}'",
664                            group, queue
665                        ),
666                        "create",
667                    ));
668                }
669                save_queue_group(store.as_ref(), queue, group)?;
670                self.invalidate_result_cache();
671
672                Ok(RuntimeQueryResult::ok_message(
673                    raw_query.to_string(),
674                    &format!("consumer group '{}' created on queue '{}'", group, queue),
675                    "create",
676                ))
677            }
678            QueueCommand::GroupRead {
679                queue,
680                group,
681                consumer,
682                count,
683            } => {
684                let store = self.inner.db.store();
685                ensure_queue_exists(store.as_ref(), queue)?;
686                let delivered = super::queue_delivery::read_messages(
687                    self,
688                    store.as_ref(),
689                    queue,
690                    group.as_deref(),
691                    consumer,
692                    *count,
693                )?;
694
695                let mut result = UnifiedResult::with_columns(vec![
696                    "message_id".into(),
697                    "payload".into(),
698                    "consumer".into(),
699                    "delivery_count".into(),
700                    "attempts".into(),
701                ]);
702
703                for message in delivered {
704                    let mut record = UnifiedRecord::new();
705                    record.set(
706                        "message_id",
707                        Value::text(message_id_string(message.message_id)),
708                    );
709                    record.set("payload", message.payload);
710                    record.set("consumer", Value::text(message.consumer));
711                    record.set(
712                        "delivery_count",
713                        Value::UnsignedInteger(u64::from(message.delivery_count)),
714                    );
715                    record.set(
716                        "attempts",
717                        Value::UnsignedInteger(u64::from(message.delivery_count)),
718                    );
719                    result.push(record);
720                }
721                if !result.records.is_empty() {
722                    self.invalidate_result_cache();
723                }
724
725                Ok(RuntimeQueryResult {
726                    query: raw_query.to_string(),
727                    mode: QueryMode::Sql,
728                    statement: "queue_group_read",
729                    engine: "runtime-queue",
730                    result,
731                    affected_rows: 0,
732                    statement_type: "select",
733                })
734            }
735            QueueCommand::Pending { queue, group } => {
736                let store = self.inner.db.store();
737                ensure_queue_exists(store.as_ref(), queue)?;
738                require_queue_group(store.as_ref(), queue, group)?;
739                let mut pending = load_pending_entries(store.as_ref(), queue, Some(group), None)?;
740                pending.sort_by_key(|entry| entry.delivered_at_ns);
741                let current_time_ns = now_ns();
742
743                let mut result = UnifiedResult::with_columns(vec![
744                    "message_id".into(),
745                    "consumer".into(),
746                    "delivered_at_ns".into(),
747                    "delivery_count".into(),
748                    "idle_ms".into(),
749                ]);
750                for entry in pending {
751                    let mut record = UnifiedRecord::new();
752                    record.set(
753                        "message_id",
754                        Value::text(message_id_string(entry.message_id)),
755                    );
756                    record.set("consumer", Value::text(entry.consumer));
757                    record.set(
758                        "delivered_at_ns",
759                        Value::UnsignedInteger(entry.delivered_at_ns),
760                    );
761                    record.set(
762                        "delivery_count",
763                        Value::UnsignedInteger(u64::from(entry.delivery_count)),
764                    );
765                    record.set(
766                        "idle_ms",
767                        Value::UnsignedInteger(
768                            current_time_ns.saturating_sub(entry.delivered_at_ns) / 1_000_000,
769                        ),
770                    );
771                    result.push(record);
772                }
773
774                Ok(RuntimeQueryResult {
775                    query: raw_query.to_string(),
776                    mode: QueryMode::Sql,
777                    statement: "queue_pending",
778                    engine: "runtime-queue",
779                    result,
780                    affected_rows: 0,
781                    statement_type: "select",
782                })
783            }
784            QueueCommand::Claim {
785                queue,
786                group,
787                consumer,
788                min_idle_ms,
789            } => {
790                let store = self.inner.db.store();
791                ensure_queue_exists(store.as_ref(), queue)?;
792                let delivered = super::queue_delivery::claim_messages(
793                    self,
794                    store.as_ref(),
795                    queue,
796                    group,
797                    consumer,
798                    *min_idle_ms,
799                )?;
800
801                let mut result = UnifiedResult::with_columns(vec![
802                    "message_id".into(),
803                    "payload".into(),
804                    "consumer".into(),
805                    "delivery_count".into(),
806                ]);
807
808                for message in delivered {
809                    let mut record = UnifiedRecord::new();
810                    record.set(
811                        "message_id",
812                        Value::text(message_id_string(message.message_id)),
813                    );
814                    record.set("payload", message.payload);
815                    record.set("consumer", Value::text(message.consumer));
816                    record.set(
817                        "delivery_count",
818                        Value::UnsignedInteger(u64::from(message.delivery_count)),
819                    );
820                    result.push(record);
821                }
822                if !result.records.is_empty() {
823                    self.invalidate_result_cache();
824                }
825                let affected_rows = result.records.len() as u64;
826
827                Ok(RuntimeQueryResult {
828                    query: raw_query.to_string(),
829                    mode: QueryMode::Sql,
830                    statement: "queue_claim",
831                    engine: "runtime-queue",
832                    result,
833                    affected_rows,
834                    statement_type: "update",
835                })
836            }
837            QueueCommand::Ack {
838                queue,
839                group,
840                message_id,
841            } => {
842                let store = self.inner.db.store();
843                ensure_queue_exists(store.as_ref(), queue)?;
844                require_queue_group(store.as_ref(), queue, group)?;
845                let message_id = parse_message_id(message_id)?;
846                let config = load_queue_config(store.as_ref(), queue);
847                super::queue_delivery::ack_message(
848                    self,
849                    store.as_ref(),
850                    queue,
851                    group,
852                    message_id,
853                    &config,
854                )?;
855                self.invalidate_result_cache();
856
857                Ok(RuntimeQueryResult::ok_message(
858                    raw_query.to_string(),
859                    "message acknowledged",
860                    "update",
861                ))
862            }
863            QueueCommand::Nack {
864                queue,
865                group,
866                message_id,
867            } => {
868                let store = self.inner.db.store();
869                ensure_queue_exists(store.as_ref(), queue)?;
870                require_queue_group(store.as_ref(), queue, group)?;
871                let message_id = parse_message_id(message_id)?;
872                let config = load_queue_config(store.as_ref(), queue);
873                let message = match super::queue_delivery::nack_message(
874                    self,
875                    store.as_ref(),
876                    queue,
877                    group,
878                    message_id,
879                    &config,
880                )? {
881                    super::queue_delivery::NackOutcome::Requeued => "message requeued".to_string(),
882                    super::queue_delivery::NackOutcome::MovedToDlq(dlq) => {
883                        format!("message moved to dead-letter queue '{}'", dlq)
884                    }
885                    super::queue_delivery::NackOutcome::Dropped => {
886                        "message dropped after max attempts".to_string()
887                    }
888                };
889                self.invalidate_result_cache();
890
891                Ok(RuntimeQueryResult::ok_message(
892                    raw_query.to_string(),
893                    &message,
894                    "update",
895                ))
896            }
897            QueueCommand::Move {
898                source,
899                destination,
900                filter,
901                limit,
902            } => self.execute_queue_move(raw_query, source, destination, filter.as_ref(), *limit),
903        }
904    }
905
906    pub fn execute_queue_select(
907        &self,
908        raw_query: &str,
909        query: &QueueSelectQuery,
910    ) -> RedDBResult<RuntimeQueryResult> {
911        let store = self.inner.db.store();
912        ensure_queue_exists(store.as_ref(), &query.queue)?;
913        let config = load_queue_config(store.as_ref(), &query.queue);
914        let dlq = queue_is_dead_letter_target(store.as_ref(), &query.queue);
915        let columns = if query.columns.is_empty() {
916            queue_projection_default_columns()
917        } else {
918            query.columns.clone()
919        };
920
921        let mut messages =
922            load_queue_message_views_with_runtime(Some(self), store.as_ref(), &query.queue)?;
923        sort_queue_messages(&mut messages, &config, QueueSide::Left);
924
925        let mut result = UnifiedResult::with_columns(columns.clone());
926        for message in messages {
927            if query
928                .filter
929                .as_ref()
930                .is_some_and(|filter| !queue_message_matches_filter(&message, dlq, filter))
931            {
932                continue;
933            }
934            let record = queue_projection_record(&columns, &message, dlq)?;
935            result.push(record);
936            if query
937                .limit
938                .is_some_and(|limit| result.records.len() >= limit as usize)
939            {
940                break;
941            }
942        }
943
944        Ok(RuntimeQueryResult {
945            query: raw_query.to_string(),
946            mode: QueryMode::Sql,
947            statement: "queue_select",
948            engine: "runtime-queue",
949            result,
950            affected_rows: 0,
951            statement_type: "select",
952        })
953    }
954
955    fn execute_queue_move(
956        &self,
957        raw_query: &str,
958        source: &str,
959        destination: &str,
960        filter: Option<&Filter>,
961        limit: usize,
962    ) -> RedDBResult<RuntimeQueryResult> {
963        if source == destination {
964            return Err(RedDBError::Query(
965                "QUEUE MOVE source and destination must be different".to_string(),
966            ));
967        }
968        let store = self.inner.db.store();
969        ensure_queue_exists(store.as_ref(), source)?;
970        ensure_queue_exists(store.as_ref(), destination)?;
971        let source_config = load_queue_config(store.as_ref(), source);
972        let destination_config = load_queue_config(store.as_ref(), destination);
973        let source_dlq = queue_is_dead_letter_target(store.as_ref(), source);
974
975        let mut messages =
976            load_queue_message_views_with_runtime(Some(self), store.as_ref(), source)?;
977        sort_queue_messages(&mut messages, &source_config, QueueSide::Left);
978        let selected = messages
979            .into_iter()
980            .filter(|message| {
981                filter
982                    .map(|f| queue_message_matches_filter(message, source_dlq, f))
983                    .unwrap_or(true)
984            })
985            .take(limit)
986            .collect::<Vec<_>>();
987
988        if let Some(max_size) = destination_config.max_size {
989            let current_len =
990                load_queue_message_views_with_runtime(Some(self), store.as_ref(), destination)?
991                    .len();
992            if current_len + selected.len() > max_size {
993                return Err(RedDBError::Query(format!(
994                    "queue '{}' is full (max_size={max_size})",
995                    destination
996                )));
997            }
998        }
999
1000        for message in &selected {
1001            let lock = queue_message_lock_handle(self, source, message.id);
1002            let Some(_guard) = lock.try_lock() else {
1003                return Err(RedDBError::Query(format!(
1004                    "message '{}' is locked on queue '{}'",
1005                    message.id.raw(),
1006                    source
1007                )));
1008            };
1009            if queue_message_view_by_id(store.as_ref(), source, message.id)?.is_none() {
1010                return Err(RedDBError::Query(format!(
1011                    "message '{}' is no longer available on queue '{}'",
1012                    message.id.raw(),
1013                    source
1014                )));
1015            }
1016        }
1017
1018        let mut inserted = Vec::new();
1019        for message in &selected {
1020            match insert_moved_queue_message(
1021                store.as_ref(),
1022                destination,
1023                &destination_config,
1024                message,
1025            ) {
1026                Ok(id) => inserted.push(id),
1027                Err(err) => {
1028                    for id in inserted {
1029                        let _ = store.delete(destination, id);
1030                    }
1031                    return Err(err);
1032                }
1033            }
1034        }
1035
1036        for message in &selected {
1037            super::queue_delivery::delete_message_with_state(
1038                Some(self),
1039                store.as_ref(),
1040                source,
1041                message.id,
1042            )?;
1043        }
1044        if !selected.is_empty() {
1045            self.invalidate_result_cache();
1046        }
1047
1048        let selected_count = selected.len() as u64;
1049        self.audit_log().record_event(
1050            AuditEvent::builder("queue/move")
1051                .source(AuditAuthSource::System)
1052                .outcome(Outcome::Success)
1053                .resource(format!("queue:{source}->{destination}"))
1054                .fields([
1055                    AuditFieldEscaper::field("source", source),
1056                    AuditFieldEscaper::field("destination", destination),
1057                    AuditFieldEscaper::field("selected", selected_count),
1058                    AuditFieldEscaper::field("committed", selected_count),
1059                ])
1060                .build(),
1061        );
1062
1063        let mut result = UnifiedResult::with_columns(vec![
1064            "source".into(),
1065            "destination".into(),
1066            "selected".into(),
1067            "committed".into(),
1068        ]);
1069        let mut record = UnifiedRecord::new();
1070        record.set("source", Value::text(source.to_string()));
1071        record.set("destination", Value::text(destination.to_string()));
1072        record.set("selected", Value::UnsignedInteger(selected_count));
1073        record.set("committed", Value::UnsignedInteger(selected_count));
1074        result.push(record);
1075
1076        Ok(RuntimeQueryResult {
1077            query: raw_query.to_string(),
1078            mode: QueryMode::Sql,
1079            statement: "queue_move",
1080            engine: "runtime-queue",
1081            result,
1082            affected_rows: selected_count,
1083            statement_type: "update",
1084        })
1085    }
1086}
1087
1088fn ensure_queue_exists(store: &UnifiedStore, queue: &str) -> RedDBResult<()> {
1089    if store.get_collection(queue).is_some() {
1090        Ok(())
1091    } else {
1092        Err(RedDBError::NotFound(format!("queue '{}' not found", queue)))
1093    }
1094}
1095
1096pub(super) fn load_queue_config(store: &UnifiedStore, queue: &str) -> QueueRuntimeConfig {
1097    let default = QueueRuntimeConfig {
1098        mode: QueueMode::Work,
1099        priority: false,
1100        max_size: None,
1101        ttl_ms: None,
1102        dlq: None,
1103        max_attempts: crate::storage::query::DEFAULT_QUEUE_MAX_ATTEMPTS,
1104        lock_deadline_ms: crate::storage::query::DEFAULT_QUEUE_LOCK_DEADLINE_MS,
1105        in_flight_cap_per_group: crate::storage::query::DEFAULT_QUEUE_IN_FLIGHT_CAP_PER_GROUP,
1106    };
1107
1108    let Some(manager) = store.get_collection(QUEUE_META_COLLECTION) else {
1109        return default;
1110    };
1111    manager
1112        .query_all(|entity| {
1113            entity.data.as_row().is_some_and(|row| {
1114                row_text(row, "kind").as_deref() == Some("queue_config")
1115                    && row_text(row, "queue").as_deref() == Some(queue)
1116            })
1117        })
1118        .into_iter()
1119        .find_map(|entity| {
1120            let row = entity.data.as_row()?;
1121            Some(QueueRuntimeConfig {
1122                mode: row_text(row, "mode")
1123                    .as_deref()
1124                    .and_then(QueueMode::parse)
1125                    .unwrap_or_default(),
1126                priority: row_bool(row, "priority").unwrap_or(false),
1127                max_size: row_u64(row, "max_size").map(|value| value as usize),
1128                ttl_ms: row_u64(row, "ttl_ms"),
1129                dlq: row_text(row, "dlq"),
1130                max_attempts: row_u64(row, "max_attempts")
1131                    .map(|value| value as u32)
1132                    .unwrap_or(crate::storage::query::DEFAULT_QUEUE_MAX_ATTEMPTS),
1133                lock_deadline_ms: row_u64(row, "lock_deadline_ms")
1134                    .unwrap_or(crate::storage::query::DEFAULT_QUEUE_LOCK_DEADLINE_MS),
1135                in_flight_cap_per_group: row_u64(row, "in_flight_cap_per_group")
1136                    .map(|value| value as u32)
1137                    .unwrap_or(crate::storage::query::DEFAULT_QUEUE_IN_FLIGHT_CAP_PER_GROUP),
1138            })
1139        })
1140        .unwrap_or(default)
1141}
1142
1143pub(super) fn queue_mode_str(store: &UnifiedStore, queue: &str) -> &'static str {
1144    load_queue_config(store, queue).mode.as_str()
1145}
1146
1147fn save_queue_config(
1148    store: &UnifiedStore,
1149    queue: &str,
1150    config: &QueueRuntimeConfig,
1151) -> RedDBResult<()> {
1152    remove_meta_rows(store, |row| {
1153        row_text(row, "kind").as_deref() == Some("queue_config")
1154            && row_text(row, "queue").as_deref() == Some(queue)
1155    });
1156
1157    let mut fields = HashMap::new();
1158    fields.insert("kind".to_string(), Value::text("queue_config".to_string()));
1159    fields.insert("queue".to_string(), Value::text(queue.to_string()));
1160    fields.insert(
1161        "mode".to_string(),
1162        Value::text(config.mode.as_str().to_string()),
1163    );
1164    fields.insert("priority".to_string(), Value::Boolean(config.priority));
1165    fields.insert(
1166        "max_size".to_string(),
1167        config
1168            .max_size
1169            .map(|value| Value::UnsignedInteger(value as u64))
1170            .unwrap_or(Value::Null),
1171    );
1172    fields.insert(
1173        "ttl_ms".to_string(),
1174        config
1175            .ttl_ms
1176            .map(Value::UnsignedInteger)
1177            .unwrap_or(Value::Null),
1178    );
1179    fields.insert(
1180        "dlq".to_string(),
1181        config.dlq.clone().map(Value::text).unwrap_or(Value::Null),
1182    );
1183    fields.insert(
1184        "max_attempts".to_string(),
1185        Value::UnsignedInteger(u64::from(config.max_attempts)),
1186    );
1187    fields.insert(
1188        "lock_deadline_ms".to_string(),
1189        Value::UnsignedInteger(config.lock_deadline_ms),
1190    );
1191    fields.insert(
1192        "in_flight_cap_per_group".to_string(),
1193        Value::UnsignedInteger(u64::from(config.in_flight_cap_per_group)),
1194    );
1195    insert_meta_row(store, fields)
1196}
1197
1198fn remove_queue_metadata(store: &UnifiedStore, queue: &str) {
1199    remove_meta_rows(store, |row| {
1200        row_text(row, "queue").as_deref() == Some(queue)
1201    });
1202}
1203
1204fn queue_group_exists(store: &UnifiedStore, queue: &str, group: &str) -> RedDBResult<bool> {
1205    Ok(load_queue_groups(store, queue)?
1206        .into_iter()
1207        .any(|entry| entry.group == group))
1208}
1209
1210pub(super) fn require_queue_group(
1211    store: &UnifiedStore,
1212    queue: &str,
1213    group: &str,
1214) -> RedDBResult<()> {
1215    if queue_group_exists(store, queue, group)? {
1216        Ok(())
1217    } else {
1218        Err(RedDBError::NotFound(format!(
1219            "consumer group '{}' not found on queue '{}'",
1220            group, queue
1221        )))
1222    }
1223}
1224
1225pub(super) fn resolve_read_group(
1226    store: &UnifiedStore,
1227    queue: &str,
1228    group: Option<&str>,
1229    consumer: &str,
1230    config: &QueueRuntimeConfig,
1231) -> RedDBResult<String> {
1232    if let Some(group) = group {
1233        require_queue_group(store, queue, group)?;
1234        return Ok(group.to_string());
1235    }
1236
1237    match config.mode {
1238        QueueMode::Work => {
1239            if !queue_group_exists(store, queue, WORK_DEFAULT_GROUP)? {
1240                save_queue_group(store, queue, WORK_DEFAULT_GROUP)?;
1241            }
1242            Ok(WORK_DEFAULT_GROUP.to_string())
1243        }
1244        QueueMode::Fanout => {
1245            let fanout_group = format!("{FANOUT_GROUP_PREFIX}{consumer}");
1246            if !queue_group_exists(store, queue, &fanout_group)? {
1247                save_queue_group(store, queue, &fanout_group)?;
1248            }
1249            Ok(fanout_group)
1250        }
1251    }
1252}
1253
1254fn load_queue_groups(store: &UnifiedStore, queue: &str) -> RedDBResult<Vec<QueueGroupEntry>> {
1255    let Some(manager) = store.get_collection(QUEUE_META_COLLECTION) else {
1256        return Ok(Vec::new());
1257    };
1258    Ok(manager
1259        .query_all(|entity| {
1260            entity.data.as_row().is_some_and(|row| {
1261                row_text(row, "kind").as_deref() == Some("queue_group")
1262                    && row_text(row, "queue").as_deref() == Some(queue)
1263            })
1264        })
1265        .into_iter()
1266        .filter_map(|entity| {
1267            let row = entity.data.as_row()?;
1268            Some(QueueGroupEntry {
1269                entity_id: entity.id,
1270                group: row_text(row, "group")?,
1271            })
1272        })
1273        .collect())
1274}
1275
1276fn save_queue_group(store: &UnifiedStore, queue: &str, group: &str) -> RedDBResult<()> {
1277    let mut fields = HashMap::new();
1278    fields.insert("kind".to_string(), Value::text("queue_group".to_string()));
1279    fields.insert("queue".to_string(), Value::text(queue.to_string()));
1280    fields.insert("group".to_string(), Value::text(group.to_string()));
1281    fields.insert(
1282        "created_at_ns".to_string(),
1283        Value::UnsignedInteger(now_ns()),
1284    );
1285    insert_meta_row(store, fields)
1286}
1287
1288pub(super) fn load_pending_entries(
1289    store: &UnifiedStore,
1290    queue: &str,
1291    group: Option<&str>,
1292    message_id: Option<EntityId>,
1293) -> RedDBResult<Vec<QueuePendingEntry>> {
1294    let Some(manager) = store.get_collection(QUEUE_META_COLLECTION) else {
1295        return Ok(Vec::new());
1296    };
1297    Ok(manager
1298        .query_all(|entity| {
1299            entity.data.as_row().is_some_and(|row| {
1300                row_text(row, "kind").as_deref() == Some("queue_pending")
1301                    && row_text(row, "queue").as_deref() == Some(queue)
1302                    && group
1303                        .map(|group_name| row_text(row, "group").as_deref() == Some(group_name))
1304                        .unwrap_or(true)
1305                    && message_id
1306                        .map(|candidate| row_u64(row, "message_id") == Some(candidate.raw()))
1307                        .unwrap_or(true)
1308            })
1309        })
1310        .into_iter()
1311        .filter_map(|entity| {
1312            let row = entity.data.as_row()?;
1313            Some(QueuePendingEntry {
1314                entity_id: entity.id,
1315                group: row_text(row, "group")?,
1316                message_id: EntityId::new(row_u64(row, "message_id")?),
1317                consumer: row_text(row, "consumer")?,
1318                delivered_at_ns: row_u64(row, "delivered_at_ns")?,
1319                delivery_count: row_u64(row, "delivery_count")
1320                    .map(|value| value as u32)
1321                    .unwrap_or(1),
1322            })
1323        })
1324        .collect())
1325}
1326
1327pub(super) fn save_queue_pending(
1328    store: &UnifiedStore,
1329    queue: &str,
1330    group: &str,
1331    message_id: EntityId,
1332    consumer: &str,
1333    delivered_at_ns: u64,
1334    delivery_count: u32,
1335) -> RedDBResult<()> {
1336    remove_meta_rows(store, |row| {
1337        row_text(row, "kind").as_deref() == Some("queue_pending")
1338            && row_text(row, "queue").as_deref() == Some(queue)
1339            && row_text(row, "group").as_deref() == Some(group)
1340            && row_u64(row, "message_id") == Some(message_id.raw())
1341    });
1342
1343    let mut fields = HashMap::new();
1344    fields.insert("kind".to_string(), Value::text("queue_pending".to_string()));
1345    fields.insert("queue".to_string(), Value::text(queue.to_string()));
1346    fields.insert("group".to_string(), Value::text(group.to_string()));
1347    fields.insert(
1348        "message_id".to_string(),
1349        Value::UnsignedInteger(message_id.raw()),
1350    );
1351    fields.insert("consumer".to_string(), Value::text(consumer.to_string()));
1352    fields.insert(
1353        "delivered_at_ns".to_string(),
1354        Value::UnsignedInteger(delivered_at_ns),
1355    );
1356    fields.insert(
1357        "delivery_count".to_string(),
1358        Value::UnsignedInteger(u64::from(delivery_count)),
1359    );
1360    insert_meta_row(store, fields)
1361}
1362
1363pub(super) fn require_pending_entry(
1364    store: &UnifiedStore,
1365    queue: &str,
1366    group: &str,
1367    message_id: EntityId,
1368) -> RedDBResult<QueuePendingEntry> {
1369    load_pending_entries(store, queue, Some(group), Some(message_id))?
1370        .into_iter()
1371        .next()
1372        .ok_or_else(|| {
1373            RedDBError::NotFound(format!(
1374                "message '{}' is not pending in group '{}' on queue '{}'",
1375                message_id.raw(),
1376                group,
1377                queue
1378            ))
1379        })
1380}
1381
1382pub(super) fn load_ack_entries(
1383    store: &UnifiedStore,
1384    queue: &str,
1385    group: Option<&str>,
1386    message_id: Option<EntityId>,
1387) -> RedDBResult<Vec<QueueAckEntry>> {
1388    let Some(manager) = store.get_collection(QUEUE_META_COLLECTION) else {
1389        return Ok(Vec::new());
1390    };
1391    Ok(manager
1392        .query_all(|entity| {
1393            entity.data.as_row().is_some_and(|row| {
1394                row_text(row, "kind").as_deref() == Some("queue_ack")
1395                    && row_text(row, "queue").as_deref() == Some(queue)
1396                    && group
1397                        .map(|group_name| row_text(row, "group").as_deref() == Some(group_name))
1398                        .unwrap_or(true)
1399                    && message_id
1400                        .map(|candidate| row_u64(row, "message_id") == Some(candidate.raw()))
1401                        .unwrap_or(true)
1402            })
1403        })
1404        .into_iter()
1405        .filter_map(|entity| {
1406            let row = entity.data.as_row()?;
1407            Some(QueueAckEntry {
1408                entity_id: entity.id,
1409                group: row_text(row, "group")?,
1410                message_id: EntityId::new(row_u64(row, "message_id")?),
1411            })
1412        })
1413        .collect())
1414}
1415
1416pub(super) fn save_queue_ack(
1417    store: &UnifiedStore,
1418    queue: &str,
1419    group: &str,
1420    message_id: EntityId,
1421) -> RedDBResult<()> {
1422    let existing = load_ack_entries(store, queue, Some(group), Some(message_id))?;
1423    if !existing.is_empty() {
1424        return Ok(());
1425    }
1426
1427    let mut fields = HashMap::new();
1428    fields.insert("kind".to_string(), Value::text("queue_ack".to_string()));
1429    fields.insert("queue".to_string(), Value::text(queue.to_string()));
1430    fields.insert("group".to_string(), Value::text(group.to_string()));
1431    fields.insert(
1432        "message_id".to_string(),
1433        Value::UnsignedInteger(message_id.raw()),
1434    );
1435    fields.insert("acked_at_ns".to_string(), Value::UnsignedInteger(now_ns()));
1436    insert_meta_row(store, fields)
1437}
1438
1439pub(super) fn queue_message_completed_for_all_groups(
1440    store: &UnifiedStore,
1441    queue: &str,
1442    message_id: EntityId,
1443) -> RedDBResult<bool> {
1444    let groups = load_queue_groups(store, queue)?;
1445    let pending = load_pending_entries(store, queue, None, Some(message_id))?;
1446    if !pending.is_empty() {
1447        return Ok(false);
1448    }
1449    if groups.is_empty() {
1450        return Ok(true);
1451    }
1452
1453    let acked_groups = load_ack_entries(store, queue, None, Some(message_id))?
1454        .into_iter()
1455        .map(|entry| entry.group)
1456        .collect::<HashSet<_>>();
1457    Ok(groups
1458        .into_iter()
1459        .all(|group| acked_groups.contains(&group.group)))
1460}
1461
1462fn load_queue_message_views(
1463    store: &UnifiedStore,
1464    queue: &str,
1465) -> RedDBResult<Vec<QueueMessageView>> {
1466    load_queue_message_views_with_runtime(None, store, queue)
1467}
1468
1469/// Kind-aware queue scan (Phase 2.5.5 RLS universal). When the
1470/// caller has a `RedDBRuntime` reference, the gate also applies
1471/// any `CREATE POLICY ... ON MESSAGES OF <queue>` predicate. In
1472/// autocommit / embedded paths that only have the raw store (e.g.
1473/// purge loops) we skip RLS because there's no session identity
1474/// to match against.
1475pub(super) fn load_queue_message_views_with_runtime(
1476    runtime: Option<&RedDBRuntime>,
1477    store: &UnifiedStore,
1478    queue: &str,
1479) -> RedDBResult<Vec<QueueMessageView>> {
1480    let manager = store
1481        .get_collection(queue)
1482        .ok_or_else(|| RedDBError::NotFound(format!("queue '{}' not found", queue)))?;
1483    // Phase 1.2 MVCC universal: capture before parallel scan. Messages
1484    // inserted by another connection's open txn stay invisible to
1485    // consumers until that txn commits (prevents phantom POPs).
1486    let snap_ctx = crate::runtime::impl_core::capture_current_snapshot();
1487    let rls_filter = runtime.and_then(|rt| {
1488        crate::runtime::impl_core::rls_policy_filter_for_kind(
1489            rt,
1490            queue,
1491            crate::storage::query::ast::PolicyAction::Select,
1492            crate::storage::query::ast::PolicyTargetKind::Messages,
1493        )
1494    });
1495    let rls_enabled_but_denied = runtime.map(|rt| rt.is_rls_enabled(queue)).unwrap_or(false)
1496        && rls_filter.is_none()
1497        && runtime.is_some();
1498    if rls_enabled_but_denied {
1499        // RLS on + no Messages policy for this role = deny-default.
1500        return Ok(Vec::new());
1501    }
1502    let filter_arc = rls_filter.map(std::sync::Arc::new);
1503    let rt_arc = runtime;
1504    Ok(manager
1505        .query_all(move |entity| {
1506            if !matches!(entity.kind, EntityKind::QueueMessage { .. }) {
1507                return false;
1508            }
1509            if !crate::runtime::impl_core::entity_visible_with_context(snap_ctx.as_ref(), entity) {
1510                return false;
1511            }
1512            if let (Some(filter), Some(rt)) = (filter_arc.as_ref(), rt_arc) {
1513                return crate::runtime::query_exec::evaluate_entity_filter_with_db(
1514                    Some(&rt.inner.db),
1515                    entity,
1516                    filter,
1517                    queue,
1518                    queue,
1519                );
1520            }
1521            true
1522        })
1523        .into_iter()
1524        .filter_map(queue_message_view_from_entity)
1525        .collect())
1526}
1527
1528fn queue_message_view_from_entity(entity: UnifiedEntity) -> Option<QueueMessageView> {
1529    let (position, _) = match &entity.kind {
1530        EntityKind::QueueMessage { position, queue } => (*position, queue),
1531        _ => return None,
1532    };
1533    let data = match entity.data {
1534        EntityData::QueueMessage(data) => data,
1535        _ => return None,
1536    };
1537    Some(QueueMessageView {
1538        id: entity.id,
1539        position,
1540        priority: data.priority.unwrap_or(0),
1541        payload: data.payload,
1542        attempts: data.attempts,
1543        max_attempts: data.max_attempts,
1544        enqueued_at_ns: data.enqueued_at_ns,
1545    })
1546}
1547
1548fn insert_moved_queue_message(
1549    store: &UnifiedStore,
1550    queue: &str,
1551    config: &QueueRuntimeConfig,
1552    message: &QueueMessageView,
1553) -> RedDBResult<EntityId> {
1554    let position = next_queue_position(store, queue, QueueSide::Right)?;
1555    let entity = UnifiedEntity::new(
1556        EntityId::new(0),
1557        EntityKind::QueueMessage {
1558            queue: queue.to_string(),
1559            position,
1560        },
1561        EntityData::QueueMessage(QueueMessageData {
1562            payload: message.payload.clone(),
1563            priority: if config.priority {
1564                Some(message.priority)
1565            } else {
1566                None
1567            },
1568            enqueued_at_ns: message.enqueued_at_ns,
1569            attempts: message.attempts,
1570            max_attempts: message.max_attempts,
1571            acked: false,
1572        }),
1573    );
1574    let id = store
1575        .insert_auto(queue, entity)
1576        .map_err(|err| RedDBError::Internal(err.to_string()))?;
1577    if let Some(ttl_ms) = config.ttl_ms {
1578        store
1579            .set_metadata(queue, id, queue_message_ttl_metadata(ttl_ms))
1580            .map_err(|err| RedDBError::Internal(err.to_string()))?;
1581    }
1582    Ok(id)
1583}
1584
1585fn queue_projection_default_columns() -> Vec<String> {
1586    [
1587        "id",
1588        "payload",
1589        "priority",
1590        "attempts",
1591        "last_error",
1592        "enqueued_at",
1593        "available_at",
1594        "dlq",
1595        "tenant",
1596    ]
1597    .into_iter()
1598    .map(str::to_string)
1599    .collect()
1600}
1601
1602fn queue_projection_record(
1603    columns: &[String],
1604    message: &QueueMessageView,
1605    dlq: bool,
1606) -> RedDBResult<UnifiedRecord> {
1607    let mut record = UnifiedRecord::new();
1608    for column in columns {
1609        let value = queue_projection_value(message, dlq, column).ok_or_else(|| {
1610            RedDBError::Query(format!("unknown queue projection column '{}'", column))
1611        })?;
1612        record.set(column, value);
1613    }
1614    Ok(record)
1615}
1616
1617fn queue_projection_value(message: &QueueMessageView, dlq: bool, column: &str) -> Option<Value> {
1618    match column {
1619        "id" => Some(Value::text(message_id_string(message.id))),
1620        "payload" => Some(message.payload.clone()),
1621        "priority" => Some(Value::Integer(i64::from(message.priority))),
1622        "attempts" => Some(Value::UnsignedInteger(u64::from(message.attempts))),
1623        "last_error" => Some(Value::Null),
1624        "enqueued_at" => Some(Value::UnsignedInteger(message.enqueued_at_ns)),
1625        "available_at" => Some(Value::UnsignedInteger(message.enqueued_at_ns)),
1626        "dlq" => Some(Value::Boolean(dlq)),
1627        "tenant" => queue_message_tenant(&message.payload).or(Some(Value::Null)),
1628        _ => None,
1629    }
1630}
1631
1632fn queue_message_tenant(payload: &Value) -> Option<Value> {
1633    let Value::Json(bytes) = payload else {
1634        return None;
1635    };
1636    let json: crate::json::Value = crate::json::from_slice(bytes).ok()?;
1637    json.get("tenant")
1638        .and_then(crate::json::Value::as_str)
1639        .map(|tenant| Value::text(tenant.to_string()))
1640}
1641
1642fn queue_is_dead_letter_target(store: &UnifiedStore, queue: &str) -> bool {
1643    let Some(manager) = store.get_collection(QUEUE_META_COLLECTION) else {
1644        return false;
1645    };
1646    !manager
1647        .query_all(|entity| {
1648            entity.data.as_row().is_some_and(|row| {
1649                row_text(row, "kind").as_deref() == Some("queue_config")
1650                    && row_text(row, "dlq").as_deref() == Some(queue)
1651            })
1652        })
1653        .is_empty()
1654}
1655
1656fn queue_message_matches_filter(message: &QueueMessageView, dlq: bool, filter: &Filter) -> bool {
1657    match filter {
1658        Filter::Compare { field, op, value } => queue_filter_field_value(message, dlq, field)
1659            .is_some_and(|candidate| queue_compare_values(&candidate, value, *op)),
1660        Filter::CompareFields { left, op, right } => {
1661            match (
1662                queue_filter_field_value(message, dlq, left),
1663                queue_filter_field_value(message, dlq, right),
1664            ) {
1665                (Some(left), Some(right)) => queue_compare_values(&left, &right, *op),
1666                _ => false,
1667            }
1668        }
1669        Filter::And(left, right) => {
1670            queue_message_matches_filter(message, dlq, left)
1671                && queue_message_matches_filter(message, dlq, right)
1672        }
1673        Filter::Or(left, right) => {
1674            queue_message_matches_filter(message, dlq, left)
1675                || queue_message_matches_filter(message, dlq, right)
1676        }
1677        Filter::Not(inner) => !queue_message_matches_filter(message, dlq, inner),
1678        Filter::IsNull(field) => queue_filter_field_value(message, dlq, field)
1679            .is_none_or(|value| matches!(value, Value::Null)),
1680        Filter::IsNotNull(field) => queue_filter_field_value(message, dlq, field)
1681            .is_some_and(|value| !matches!(value, Value::Null)),
1682        Filter::In { field, values } => {
1683            queue_filter_field_value(message, dlq, field).is_some_and(|candidate| {
1684                values
1685                    .iter()
1686                    .any(|value| queue_values_equal(&candidate, value))
1687            })
1688        }
1689        Filter::Between { field, low, high } => queue_filter_field_value(message, dlq, field)
1690            .is_some_and(|candidate| {
1691                queue_compare_values(&candidate, low, CompareOp::Ge)
1692                    && queue_compare_values(&candidate, high, CompareOp::Le)
1693            }),
1694        Filter::Like { field, pattern } => queue_filter_text(message, dlq, field)
1695            .is_some_and(|value| queue_like_matches(&value, pattern)),
1696        Filter::StartsWith { field, prefix } => {
1697            queue_filter_text(message, dlq, field).is_some_and(|value| value.starts_with(prefix))
1698        }
1699        Filter::EndsWith { field, suffix } => {
1700            queue_filter_text(message, dlq, field).is_some_and(|value| value.ends_with(suffix))
1701        }
1702        Filter::Contains { field, substring } => {
1703            queue_filter_text(message, dlq, field).is_some_and(|value| value.contains(substring))
1704        }
1705        Filter::CompareExpr { .. } => false,
1706    }
1707}
1708
1709fn queue_filter_field_value(
1710    message: &QueueMessageView,
1711    dlq: bool,
1712    field: &FieldRef,
1713) -> Option<Value> {
1714    match field {
1715        FieldRef::TableColumn { table, column } if table.is_empty() => {
1716            queue_projection_value(message, dlq, column)
1717                .or_else(|| queue_payload_field_value(&message.payload, column))
1718        }
1719        FieldRef::TableColumn { column, .. } => queue_projection_value(message, dlq, column)
1720            .or_else(|| queue_payload_field_value(&message.payload, column)),
1721        _ => None,
1722    }
1723}
1724
1725fn queue_payload_field_value(payload: &Value, field: &str) -> Option<Value> {
1726    let Value::Json(bytes) = payload else {
1727        return None;
1728    };
1729    let json: crate::json::Value = crate::json::from_slice(bytes).ok()?;
1730    let value = json.get(field)?;
1731    json_value_to_schema_value(value)
1732}
1733
1734fn json_value_to_schema_value(value: &crate::json::Value) -> Option<Value> {
1735    if matches!(value, crate::json::Value::Null) {
1736        Some(Value::Null)
1737    } else if let Some(value) = value.as_bool() {
1738        Some(Value::Boolean(value))
1739    } else if let Some(value) = value.as_i64() {
1740        Some(Value::Integer(value))
1741    } else if let Some(value) = value.as_u64() {
1742        Some(Value::UnsignedInteger(value))
1743    } else if let Some(value) = value.as_f64() {
1744        Some(Value::Float(value))
1745    } else if let Some(value) = value.as_str() {
1746        Some(Value::text(value.to_string()))
1747    } else {
1748        Some(Value::Json(value.to_string_compact().into_bytes()))
1749    }
1750}
1751
1752fn queue_filter_text(message: &QueueMessageView, dlq: bool, field: &FieldRef) -> Option<String> {
1753    queue_filter_field_value(message, dlq, field).and_then(|value| match value {
1754        Value::Text(value) => Some(value.to_string()),
1755        Value::NodeRef(value) | Value::EdgeRef(value) | Value::TableRef(value) => Some(value),
1756        Value::Integer(value) => Some(value.to_string()),
1757        Value::UnsignedInteger(value) => Some(value.to_string()),
1758        Value::Float(value) => Some(value.to_string()),
1759        Value::Boolean(value) => Some(value.to_string()),
1760        _ => None,
1761    })
1762}
1763
1764fn queue_compare_values(left: &Value, right: &Value, op: CompareOp) -> bool {
1765    match op {
1766        CompareOp::Eq => queue_values_equal(left, right),
1767        CompareOp::Ne => !queue_values_equal(left, right),
1768        CompareOp::Lt => queue_partial_cmp(left, right).is_some_and(|ord| ord.is_lt()),
1769        CompareOp::Le => queue_partial_cmp(left, right).is_some_and(|ord| !ord.is_gt()),
1770        CompareOp::Gt => queue_partial_cmp(left, right).is_some_and(|ord| ord.is_gt()),
1771        CompareOp::Ge => queue_partial_cmp(left, right).is_some_and(|ord| !ord.is_lt()),
1772    }
1773}
1774
1775fn queue_values_equal(left: &Value, right: &Value) -> bool {
1776    if let (Some(left), Some(right)) = (queue_value_number(left), queue_value_number(right)) {
1777        return (left - right).abs() < f64::EPSILON;
1778    }
1779    match (left, right) {
1780        (Value::Text(left), Value::Text(right)) => left == right,
1781        (Value::Boolean(left), Value::Boolean(right)) => left == right,
1782        _ => left == right,
1783    }
1784}
1785
1786fn queue_partial_cmp(left: &Value, right: &Value) -> Option<std::cmp::Ordering> {
1787    if let (Some(left), Some(right)) = (queue_value_number(left), queue_value_number(right)) {
1788        return left.partial_cmp(&right);
1789    }
1790    match (left, right) {
1791        (Value::Text(left), Value::Text(right)) => Some(left.cmp(right)),
1792        _ => None,
1793    }
1794}
1795
1796fn queue_value_number(value: &Value) -> Option<f64> {
1797    match value {
1798        Value::Integer(value) => Some(*value as f64),
1799        Value::UnsignedInteger(value) => Some(*value as f64),
1800        Value::Float(value) => Some(*value),
1801        Value::Text(value) => value.parse().ok(),
1802        _ => None,
1803    }
1804}
1805
1806fn queue_like_matches(value: &str, pattern: &str) -> bool {
1807    if pattern == "%" {
1808        return true;
1809    }
1810    let starts_wild = pattern.starts_with('%');
1811    let ends_wild = pattern.ends_with('%');
1812    let needle = pattern.trim_matches('%');
1813    match (starts_wild, ends_wild) {
1814        (true, true) => value.contains(needle),
1815        (true, false) => value.ends_with(needle),
1816        (false, true) => value.starts_with(needle),
1817        (false, false) => value == needle,
1818    }
1819}
1820
1821pub(super) fn queue_message_view_by_id(
1822    store: &UnifiedStore,
1823    queue: &str,
1824    message_id: EntityId,
1825) -> RedDBResult<Option<QueueMessageView>> {
1826    let manager = queue_manager(store, queue)?;
1827    Ok(manager
1828        .get(message_id)
1829        .and_then(queue_message_view_from_entity))
1830}
1831
1832pub(super) fn sort_queue_messages(
1833    messages: &mut [QueueMessageView],
1834    config: &QueueRuntimeConfig,
1835    side: QueueSide,
1836) {
1837    messages.sort_by(|left, right| {
1838        if config.priority {
1839            right
1840                .priority
1841                .cmp(&left.priority)
1842                .then_with(|| match side {
1843                    QueueSide::Left => left.position.cmp(&right.position),
1844                    QueueSide::Right => right.position.cmp(&left.position),
1845                })
1846                .then_with(|| left.id.raw().cmp(&right.id.raw()))
1847        } else {
1848            match side {
1849                QueueSide::Left => left.position.cmp(&right.position),
1850                QueueSide::Right => right.position.cmp(&left.position),
1851            }
1852            .then_with(|| left.id.raw().cmp(&right.id.raw()))
1853        }
1854    });
1855}
1856
1857pub(super) fn next_queue_position(
1858    store: &UnifiedStore,
1859    queue: &str,
1860    side: QueueSide,
1861) -> RedDBResult<u64> {
1862    let messages = load_queue_message_views(store, queue)?;
1863    if messages.is_empty() {
1864        return Ok(QUEUE_POSITION_CENTER);
1865    }
1866    match side {
1867        QueueSide::Left => Ok(messages
1868            .iter()
1869            .map(|message| message.position)
1870            .min()
1871            .unwrap_or(QUEUE_POSITION_CENTER)
1872            .saturating_sub(1)),
1873        QueueSide::Right => Ok(messages
1874            .iter()
1875            .map(|message| message.position)
1876            .max()
1877            .unwrap_or(QUEUE_POSITION_CENTER)
1878            .saturating_add(1)),
1879    }
1880}
1881
1882pub(super) fn increment_queue_attempts(
1883    store: &UnifiedStore,
1884    queue: &str,
1885    message_id: EntityId,
1886) -> RedDBResult<u32> {
1887    let manager = queue_manager(store, queue)?;
1888    let mut entity = manager
1889        .get(message_id)
1890        .ok_or_else(|| RedDBError::NotFound(format!("message '{}' not found", message_id.raw())))?;
1891    match &mut entity.data {
1892        EntityData::QueueMessage(message) => {
1893            message.attempts = message.attempts.saturating_add(1);
1894            let attempts = message.attempts;
1895            manager
1896                .update(entity)
1897                .map_err(|err| RedDBError::Internal(err.to_string()))?;
1898            Ok(attempts)
1899        }
1900        _ => Err(RedDBError::Query(format!(
1901            "entity '{}' is not a queue message",
1902            message_id.raw()
1903        ))),
1904    }
1905}
1906
1907pub(super) fn queue_message_attempts(
1908    store: &UnifiedStore,
1909    queue: &str,
1910    message_id: EntityId,
1911) -> RedDBResult<u32> {
1912    Ok(queue_message_data(store, queue, message_id)?.attempts)
1913}
1914
1915pub(super) fn queue_message_max_attempts(
1916    store: &UnifiedStore,
1917    queue: &str,
1918    message_id: EntityId,
1919) -> RedDBResult<u32> {
1920    Ok(queue_message_data(store, queue, message_id)?.max_attempts)
1921}
1922
1923pub(super) fn queue_message_payload(
1924    store: &UnifiedStore,
1925    queue: &str,
1926    message_id: EntityId,
1927) -> RedDBResult<Value> {
1928    Ok(queue_message_data(store, queue, message_id)?.payload)
1929}
1930
1931pub(super) fn queue_message_pending_any(
1932    store: &UnifiedStore,
1933    queue: &str,
1934    message_id: EntityId,
1935) -> RedDBResult<bool> {
1936    Ok(!load_pending_entries(store, queue, None, Some(message_id))?.is_empty())
1937}
1938
1939pub(super) fn queue_message_pending_for_group(
1940    store: &UnifiedStore,
1941    queue: &str,
1942    group: &str,
1943    message_id: EntityId,
1944) -> RedDBResult<bool> {
1945    Ok(!load_pending_entries(store, queue, Some(group), Some(message_id))?.is_empty())
1946}
1947
1948pub(super) fn queue_message_acked_for_group(
1949    store: &UnifiedStore,
1950    queue: &str,
1951    group: &str,
1952    message_id: EntityId,
1953) -> RedDBResult<bool> {
1954    Ok(!load_ack_entries(store, queue, Some(group), Some(message_id))?.is_empty())
1955}
1956
1957fn queue_manager(
1958    store: &UnifiedStore,
1959    queue: &str,
1960) -> RedDBResult<Arc<crate::storage::unified::SegmentManager>> {
1961    store
1962        .get_collection(queue)
1963        .ok_or_else(|| RedDBError::NotFound(format!("queue '{}' not found", queue)))
1964}
1965
1966pub(super) fn queue_message_data(
1967    store: &UnifiedStore,
1968    queue: &str,
1969    message_id: EntityId,
1970) -> RedDBResult<QueueMessageData> {
1971    let manager = queue_manager(store, queue)?;
1972    let entity = manager
1973        .get(message_id)
1974        .ok_or_else(|| RedDBError::NotFound(format!("message '{}' not found", message_id.raw())))?;
1975    match entity.data {
1976        EntityData::QueueMessage(message) => Ok(message),
1977        _ => Err(RedDBError::Query(format!(
1978            "entity '{}' is not a queue message",
1979            message_id.raw()
1980        ))),
1981    }
1982}
1983
1984fn insert_meta_row(store: &UnifiedStore, fields: HashMap<String, Value>) -> RedDBResult<()> {
1985    let _ = store.get_or_create_collection(QUEUE_META_COLLECTION);
1986    store
1987        .insert_auto(
1988            QUEUE_META_COLLECTION,
1989            UnifiedEntity::new(
1990                EntityId::new(0),
1991                EntityKind::TableRow {
1992                    table: Arc::from(QUEUE_META_COLLECTION),
1993                    row_id: 0,
1994                },
1995                EntityData::Row(RowData {
1996                    columns: Vec::new(),
1997                    named: Some(fields),
1998                    schema: None,
1999                }),
2000            ),
2001        )
2002        .map_err(|err| RedDBError::Internal(err.to_string()))?;
2003    Ok(())
2004}
2005
2006pub(super) fn remove_meta_rows(store: &UnifiedStore, predicate: impl Fn(&RowData) -> bool + Sync) {
2007    let Some(manager) = store.get_collection(QUEUE_META_COLLECTION) else {
2008        return;
2009    };
2010    let rows = manager.query_all(|entity| entity.data.as_row().is_some_and(&predicate));
2011    for row in rows {
2012        let _ = store.delete(QUEUE_META_COLLECTION, row.id);
2013    }
2014}
2015
2016pub(super) fn delete_meta_entity(store: &UnifiedStore, entity_id: EntityId) {
2017    let _ = store.delete(QUEUE_META_COLLECTION, entity_id);
2018}
2019
2020fn queue_message_lock_key(queue: &str, message_id: EntityId) -> String {
2021    format!("{queue}:{}", message_id.raw())
2022}
2023
2024pub(super) fn queue_message_lock_handle(
2025    runtime: &RedDBRuntime,
2026    queue: &str,
2027    message_id: EntityId,
2028) -> Arc<parking_lot::Mutex<()>> {
2029    let key = queue_message_lock_key(queue, message_id);
2030    if let Some(lock) = runtime.inner.queue_message_locks.read().get(&key).cloned() {
2031        return lock;
2032    }
2033
2034    let mut locks = runtime.inner.queue_message_locks.write();
2035    locks
2036        .entry(key)
2037        .or_insert_with(|| Arc::new(parking_lot::Mutex::new(())))
2038        .clone()
2039}
2040
2041pub(super) fn forget_queue_message_lock(runtime: &RedDBRuntime, queue: &str, message_id: EntityId) {
2042    runtime
2043        .inner
2044        .queue_message_locks
2045        .write()
2046        .remove(&queue_message_lock_key(queue, message_id));
2047}
2048
2049fn parse_message_id(value: &str) -> RedDBResult<EntityId> {
2050    let raw = value.strip_prefix('e').unwrap_or(value);
2051    raw.parse::<u64>()
2052        .map(EntityId::new)
2053        .map_err(|_| RedDBError::Query(format!("invalid message id '{}'", value)))
2054}
2055
2056fn message_id_string(message_id: EntityId) -> String {
2057    message_id.raw().to_string()
2058}
2059
2060/// Slice 10 of issue #527 — render-time scan of pending entries
2061/// per (queue, group) for `queue_pending_gauge` exposition. Walks
2062/// `red_queue_meta` live so the gauge cannot drift from the source
2063/// of truth.
2064pub(crate) fn pending_counts_by_group(
2065    store: &UnifiedStore,
2066) -> std::collections::BTreeMap<(String, String), u64> {
2067    let mut counts: std::collections::BTreeMap<(String, String), u64> =
2068        std::collections::BTreeMap::new();
2069    let Some(manager) = store.get_collection(QUEUE_META_COLLECTION) else {
2070        return counts;
2071    };
2072    for entity in manager.query_all(|entity| {
2073        entity
2074            .data
2075            .as_row()
2076            .is_some_and(|row| row_text(row, "kind").as_deref() == Some("queue_pending"))
2077    }) {
2078        if let Some(row) = entity.data.as_row() {
2079            let queue = row_text(row, "queue");
2080            let group = row_text(row, "group");
2081            if let (Some(q), Some(g)) = (queue, group) {
2082                *counts.entry((q, g)).or_insert(0) += 1;
2083            }
2084        }
2085    }
2086    counts
2087}
2088
2089pub(super) fn row_text(row: &RowData, field: &str) -> Option<String> {
2090    match row.get_field(field)?.clone() {
2091        Value::Text(value) => Some(value.to_string()),
2092        Value::NodeRef(value) => Some(value),
2093        Value::EdgeRef(value) => Some(value),
2094        Value::TableRef(value) => Some(value),
2095        _ => None,
2096    }
2097}
2098
2099pub(super) fn row_u64(row: &RowData, field: &str) -> Option<u64> {
2100    match row.get_field(field)?.clone() {
2101        Value::UnsignedInteger(value) => Some(value),
2102        Value::Integer(value) if value >= 0 => Some(value as u64),
2103        Value::Float(value) if value >= 0.0 => Some(value as u64),
2104        Value::Text(value) => value.parse().ok(),
2105        _ => None,
2106    }
2107}
2108
2109fn row_bool(row: &RowData, field: &str) -> Option<bool> {
2110    match row.get_field(field)?.clone() {
2111        Value::Boolean(value) => Some(value),
2112        Value::Text(value) => match value.to_ascii_lowercase().as_str() {
2113            "true" => Some(true),
2114            "false" => Some(false),
2115            _ => None,
2116        },
2117        _ => None,
2118    }
2119}
2120
2121fn queue_collection_contract(
2122    name: &str,
2123    priority: bool,
2124    ttl_ms: Option<u64>,
2125) -> crate::physical::CollectionContract {
2126    let now = current_unix_ms();
2127    let mut context_index_fields = Vec::new();
2128    if priority {
2129        context_index_fields.push("priority".to_string());
2130    }
2131
2132    crate::physical::CollectionContract {
2133        name: name.to_string(),
2134        declared_model: crate::catalog::CollectionModel::Queue,
2135        schema_mode: crate::catalog::SchemaMode::Dynamic,
2136        origin: crate::physical::ContractOrigin::Explicit,
2137        version: 1,
2138        created_at_unix_ms: now,
2139        updated_at_unix_ms: now,
2140        default_ttl_ms: ttl_ms,
2141        vector_dimension: None,
2142        vector_metric: None,
2143        context_index_fields,
2144        declared_columns: Vec::new(),
2145        table_def: None,
2146        timestamps_enabled: false,
2147        context_index_enabled: false,
2148        metrics_raw_retention_ms: None,
2149        metrics_rollup_policies: Vec::new(),
2150        metrics_tenant_identity: None,
2151        metrics_namespace: None,
2152        // Queues manipulate messages via push/pop/ack — the row DML
2153        // paths never apply. Flag it as append_only so inadvertent
2154        // `UPDATE/DELETE FROM queue_name` statements fail loudly.
2155        append_only: true,
2156        subscriptions: Vec::new(),
2157        session_key: None,
2158        session_gap_ms: None,
2159        retention_duration_ms: None,
2160    }
2161}
2162
2163fn current_unix_ms() -> u128 {
2164    std::time::SystemTime::now()
2165        .duration_since(std::time::UNIX_EPOCH)
2166        .unwrap_or_default()
2167        .as_millis()
2168}
2169
2170pub(super) fn now_ns() -> u64 {
2171    std::time::SystemTime::now()
2172        .duration_since(std::time::UNIX_EPOCH)
2173        .unwrap_or_default()
2174        .as_nanos() as u64
2175}
2176
2177pub(super) fn queue_message_ttl_metadata(ttl_ms: u64) -> Metadata {
2178    Metadata::with_fields(
2179        [(
2180            "_ttl_ms".to_string(),
2181            if ttl_ms <= i64::MAX as u64 {
2182                MetadataValue::Int(ttl_ms as i64)
2183            } else {
2184                MetadataValue::Timestamp(ttl_ms)
2185            },
2186        )]
2187        .into_iter()
2188        .collect(),
2189    )
2190}
2191
2192/// Rough payload byte estimate for outbox watermark tracking.
2193fn estimate_payload_bytes(payload: &Value) -> u64 {
2194    match payload {
2195        Value::Json(v) => v.len() as u64,
2196        Value::Text(s) => s.len() as u64,
2197        _ => 64,
2198    }
2199}