Skip to main content

reddb_server/runtime/
impl_queue.rs

1//! Queue DDL and command execution
2
3use std::collections::{HashMap, HashSet};
4use std::sync::atomic::{AtomicU64, Ordering};
5use std::sync::Arc;
6
7use crate::runtime::audit_log::{AuditAuthSource, AuditEvent, AuditFieldEscaper, Outcome};
8use crate::storage::queue::QueueMode;
9use crate::storage::unified::entity::{QueueMessageData, RowData};
10use crate::storage::unified::{Metadata, MetadataValue, UnifiedStore};
11
12use super::*;
13
14// ---------------------------------------------------------------------------
15// Outbox metrics (exposed via /metrics)
16// ---------------------------------------------------------------------------
17
18/// Total event push attempts that failed (queue full or other error) and
19/// triggered DLQ routing.
20pub static EVENTS_DRAIN_RETRIES_TOTAL: AtomicU64 = AtomicU64::new(0);
21
22/// Total events routed to the dead-letter queue.
23pub static EVENTS_DLQ_TOTAL: AtomicU64 = AtomicU64::new(0);
24
25/// Total events successfully enqueued to their target queue.
26pub static EVENTS_ENQUEUED_TOTAL: AtomicU64 = AtomicU64::new(0);
27
28/// Warn when total estimated outbox payload bytes exceed this value (1 GiB).
29const OUTBOX_WARN_BYTES: u64 = 1 << 30;
30
31/// Route all new events to DLQ when estimated outbox exceeds this value (10 GiB).
32const OUTBOX_MAX_BYTES: u64 = 10 * (1 << 30);
33
34/// Running estimate of bytes pending in event queues (approximate; not decremented on consume).
35static OUTBOX_APPROX_BYTES: AtomicU64 = AtomicU64::new(0);
36
37const QUEUE_META_COLLECTION: &str = "red_queue_meta";
38const QUEUE_POSITION_CENTER: u64 = u64::MAX / 2;
39const WORK_DEFAULT_GROUP: &str = "_work_default";
40const FANOUT_GROUP_PREFIX: &str = "_fanout_";
41
42#[derive(Debug, Clone)]
43pub(super) struct QueueRuntimeConfig {
44    pub(super) mode: QueueMode,
45    pub(super) priority: bool,
46    pub(super) max_size: Option<usize>,
47    pub(super) ttl_ms: Option<u64>,
48    pub(super) dlq: Option<String>,
49    pub(super) max_attempts: u32,
50}
51
52#[derive(Debug, Clone)]
53struct QueueGroupEntry {
54    entity_id: EntityId,
55    group: String,
56}
57
58#[derive(Debug, Clone)]
59pub(super) struct QueuePendingEntry {
60    pub(super) entity_id: EntityId,
61    group: String,
62    pub(super) message_id: EntityId,
63    consumer: String,
64    pub(super) delivered_at_ns: u64,
65    pub(super) delivery_count: u32,
66}
67
68#[derive(Debug, Clone)]
69pub(super) struct QueueAckEntry {
70    entity_id: EntityId,
71    group: String,
72    pub(super) message_id: EntityId,
73}
74
75#[derive(Debug, Clone)]
76pub(super) struct QueueMessageView {
77    pub(super) id: EntityId,
78    position: u64,
79    priority: i32,
80    pub(super) payload: Value,
81    attempts: u32,
82    pub(super) max_attempts: u32,
83    enqueued_at_ns: u64,
84}
85
86impl RedDBRuntime {
87    pub(crate) fn enqueue_event_payload(
88        &self,
89        queue: &str,
90        payload: Value,
91    ) -> RedDBResult<EntityId> {
92        let store = self.inner.db.store();
93        // Auto-create the queue if it does not exist yet.
94        if store.get_collection(queue).is_none() {
95            crate::runtime::impl_ddl::ensure_event_target_queue_pub(self, queue)?;
96        }
97
98        // Estimate payload bytes for outbox watermark checks.
99        let payload_bytes = estimate_payload_bytes(&payload);
100        let outbox_bytes = OUTBOX_APPROX_BYTES.fetch_add(payload_bytes, Ordering::Relaxed);
101
102        // Hard limit: route directly to DLQ without even trying.
103        if outbox_bytes > OUTBOX_MAX_BYTES {
104            OUTBOX_APPROX_BYTES.fetch_sub(payload_bytes, Ordering::Relaxed);
105            EVENTS_DRAIN_RETRIES_TOTAL.fetch_add(1, Ordering::Relaxed);
106            return self.route_event_to_outbox_dlq(queue, payload, "outbox_max_bytes_exceeded");
107        }
108
109        // Soft limit: warn once per crossing.
110        if outbox_bytes > OUTBOX_WARN_BYTES && outbox_bytes - payload_bytes <= OUTBOX_WARN_BYTES {
111            tracing::warn!(
112                outbox_bytes,
113                warn_threshold = OUTBOX_WARN_BYTES,
114                "event outbox approaching capacity warning threshold"
115            );
116            crate::telemetry::operator_event::OperatorEvent::OutboxDlqActivated {
117                queue: queue.to_string(),
118                dlq: format!("{queue}_outbox_dlq"),
119                reason: "outbox_warn_bytes_exceeded".to_string(),
120            }
121            .emit_global();
122        }
123
124        let config = load_queue_config(store.as_ref(), queue);
125
126        // If the target queue has a max_size and is full, route to DLQ.
127        if let Some(max_size) = config.max_size {
128            let current_len = load_queue_message_views(store.as_ref(), queue)
129                .unwrap_or_default()
130                .len();
131            if current_len >= max_size {
132                OUTBOX_APPROX_BYTES.fetch_sub(payload_bytes, Ordering::Relaxed);
133                EVENTS_DRAIN_RETRIES_TOTAL.fetch_add(1, Ordering::Relaxed);
134                return self.route_event_to_outbox_dlq(queue, payload, "queue_full");
135            }
136            // Warn at 80% capacity.
137            if current_len * 10 >= max_size * 8 {
138                tracing::warn!(
139                    queue = %queue,
140                    size = current_len,
141                    max = max_size,
142                    "event target queue near capacity"
143                );
144            }
145        }
146
147        let id = self.enqueue_event_payload_raw(store.as_ref(), queue, &config, payload)?;
148        EVENTS_ENQUEUED_TOTAL.fetch_add(1, Ordering::Relaxed);
149        Ok(id)
150    }
151
152    /// Route a failed event to `<queue>_outbox_dlq`, auto-creating it if needed.
153    fn route_event_to_outbox_dlq(
154        &self,
155        queue: &str,
156        payload: Value,
157        reason: &str,
158    ) -> RedDBResult<EntityId> {
159        let dlq_name = format!("{queue}_outbox_dlq");
160        EVENTS_DLQ_TOTAL.fetch_add(1, Ordering::Relaxed);
161
162        crate::telemetry::operator_event::OperatorEvent::OutboxDlqActivated {
163            queue: queue.to_string(),
164            dlq: dlq_name.clone(),
165            reason: reason.to_string(),
166        }
167        .emit_global();
168
169        let store = self.inner.db.store();
170        if store.get_collection(&dlq_name).is_none() {
171            crate::runtime::impl_ddl::ensure_event_target_queue_pub(self, &dlq_name)?;
172        }
173        let dlq_config = load_queue_config(store.as_ref(), &dlq_name);
174        let id = self.enqueue_event_payload_raw(store.as_ref(), &dlq_name, &dlq_config, payload)?;
175        EVENTS_ENQUEUED_TOTAL.fetch_add(1, Ordering::Relaxed);
176        Ok(id)
177    }
178
179    /// Low-level event message insert — no size checks, no DLQ routing.
180    fn enqueue_event_payload_raw(
181        &self,
182        store: &UnifiedStore,
183        queue: &str,
184        config: &QueueRuntimeConfig,
185        payload: Value,
186    ) -> RedDBResult<EntityId> {
187        let position = next_queue_position(store, queue, QueueSide::Right)?;
188        let mut entity = UnifiedEntity::new(
189            EntityId::new(0),
190            EntityKind::QueueMessage {
191                queue: queue.to_string(),
192                position,
193            },
194            EntityData::QueueMessage(QueueMessageData {
195                payload,
196                priority: None,
197                enqueued_at_ns: now_ns(),
198                attempts: 0,
199                max_attempts: config.max_attempts,
200                acked: false,
201            }),
202        );
203        if let Some(xid) = self.current_xid() {
204            entity.set_xmin(xid);
205        }
206        let id = store
207            .insert_auto(queue, entity)
208            .map_err(|err| RedDBError::Internal(err.to_string()))?;
209        if let Some(ttl_ms) = config.ttl_ms {
210            store
211                .set_metadata(queue, id, queue_message_ttl_metadata(ttl_ms))
212                .map_err(|err| RedDBError::Internal(err.to_string()))?;
213        }
214        self.invalidate_result_cache_for_table(queue);
215        Ok(id)
216    }
217
218    pub fn execute_create_queue(
219        &self,
220        raw_query: &str,
221        query: &CreateQueueQuery,
222    ) -> RedDBResult<RuntimeQueryResult> {
223        self.check_write(crate::runtime::write_gate::WriteKind::Ddl)?;
224        if query.dlq.as_deref() == Some(query.name.as_str()) {
225            return Err(RedDBError::Query(
226                "dead-letter queue must be different from the source queue".to_string(),
227            ));
228        }
229
230        let store = self.inner.db.store();
231        let exists = store.get_collection(&query.name).is_some();
232        if exists {
233            if query.if_not_exists {
234                return Ok(RuntimeQueryResult::ok_message(
235                    raw_query.to_string(),
236                    &format!("queue '{}' already exists", query.name),
237                    "create",
238                ));
239            }
240            return Err(RedDBError::Query(format!(
241                "queue '{}' already exists",
242                query.name
243            )));
244        }
245
246        store
247            .create_collection(&query.name)
248            .map_err(|err| RedDBError::Internal(err.to_string()))?;
249        if let Some(ttl_ms) = query.ttl_ms {
250            self.inner
251                .db
252                .set_collection_default_ttl_ms(&query.name, ttl_ms);
253        }
254        self.inner
255            .db
256            .save_collection_contract(queue_collection_contract(
257                &query.name,
258                query.priority,
259                query.ttl_ms,
260            ))
261            .map_err(|err| RedDBError::Internal(err.to_string()))?;
262        save_queue_config(
263            store.as_ref(),
264            &query.name,
265            &QueueRuntimeConfig {
266                mode: query.mode,
267                priority: query.priority,
268                max_size: query.max_size,
269                ttl_ms: query.ttl_ms,
270                dlq: query.dlq.clone(),
271                max_attempts: query.max_attempts,
272            },
273        )?;
274
275        if let Some(dlq) = &query.dlq {
276            if store.get_collection(dlq).is_none() {
277                store
278                    .create_collection(dlq)
279                    .map_err(|err| RedDBError::Internal(err.to_string()))?;
280                self.inner
281                    .db
282                    .save_collection_contract(queue_collection_contract(dlq, false, None))
283                    .map_err(|err| RedDBError::Internal(err.to_string()))?;
284            }
285        }
286
287        self.invalidate_result_cache();
288        self.inner
289            .db
290            .persist_metadata()
291            .map_err(|err| RedDBError::Internal(err.to_string()))?;
292        // Issue #120 — feed the queue into the schema-vocabulary so
293        // AskPipeline (#121) can resolve queue references. Queues
294        // have an opaque payload column, so we expose `payload` and
295        // (when configured) the DLQ partner as type-tag context.
296        let mut type_tags = Vec::new();
297        if let Some(dlq) = &query.dlq {
298            type_tags.push(format!("dlq:{}", dlq));
299        }
300        self.schema_vocabulary_apply(
301            crate::runtime::schema_vocabulary::DdlEvent::CreateCollection {
302                collection: query.name.clone(),
303                columns: vec!["payload".to_string()],
304                type_tags,
305                description: None,
306            },
307        );
308
309        let mut msg = format!("queue '{}' created", query.name);
310        msg.push_str(&format!(" (mode={})", query.mode.as_str()));
311        if query.priority {
312            msg.push_str(" (priority)");
313        }
314        if let Some(max_size) = query.max_size {
315            msg.push_str(&format!(" (max_size={max_size})"));
316        }
317        if let Some(ttl_ms) = query.ttl_ms {
318            msg.push_str(&format!(" (ttl={ttl_ms}ms)"));
319        }
320        if let Some(dlq) = &query.dlq {
321            msg.push_str(&format!(
322                " (dlq={dlq}, max_attempts={})",
323                query.max_attempts
324            ));
325        }
326
327        Ok(RuntimeQueryResult::ok_message(
328            raw_query.to_string(),
329            &msg,
330            "create",
331        ))
332    }
333
334    pub fn execute_alter_queue(
335        &self,
336        raw_query: &str,
337        query: &AlterQueueQuery,
338    ) -> RedDBResult<RuntimeQueryResult> {
339        self.check_write(crate::runtime::write_gate::WriteKind::Ddl)?;
340        let store = self.inner.db.store();
341        ensure_queue_exists(store.as_ref(), &query.name)?;
342
343        let pending =
344            load_pending_entries(store.as_ref(), &query.name, None, None).unwrap_or_default();
345        if !pending.is_empty() {
346            tracing::warn!(
347                queue = %query.name,
348                pending_count = pending.len(),
349                new_mode = %query.mode.as_str(),
350                "ALTER QUEUE SET MODE: {} in-flight messages will drain with old mode; \
351                 new reads use {}",
352                pending.len(),
353                query.mode.as_str(),
354            );
355        }
356
357        let mut config = load_queue_config(store.as_ref(), &query.name);
358        config.mode = query.mode;
359        save_queue_config(store.as_ref(), &query.name, &config)?;
360
361        self.invalidate_result_cache();
362        self.inner
363            .db
364            .persist_metadata()
365            .map_err(|err| RedDBError::Internal(err.to_string()))?;
366
367        Ok(RuntimeQueryResult::ok_message(
368            raw_query.to_string(),
369            &format!("queue '{}' mode set to {}", query.name, query.mode.as_str()),
370            "alter",
371        ))
372    }
373
374    pub fn execute_drop_queue(
375        &self,
376        raw_query: &str,
377        query: &DropQueueQuery,
378    ) -> RedDBResult<RuntimeQueryResult> {
379        self.check_write(crate::runtime::write_gate::WriteKind::Ddl)?;
380        let store = self.inner.db.store();
381        if super::impl_ddl::is_system_schema_name(&query.name) {
382            return Err(RedDBError::Query("system schema is read-only".to_string()));
383        }
384        if store.get_collection(&query.name).is_none() {
385            if query.if_exists {
386                return Ok(RuntimeQueryResult::ok_message(
387                    raw_query.to_string(),
388                    &format!("queue '{}' does not exist", query.name),
389                    "drop",
390                ));
391            }
392            return Err(RedDBError::NotFound(format!(
393                "queue '{}' not found",
394                query.name
395            )));
396        }
397        let actual = crate::runtime::ddl::polymorphic_resolver::resolve(
398            &query.name,
399            &self.inner.db.catalog_model_snapshot(),
400        )?;
401        crate::runtime::ddl::polymorphic_resolver::ensure_model_match(
402            crate::catalog::CollectionModel::Queue,
403            actual,
404        )?;
405
406        store
407            .drop_collection(&query.name)
408            .map_err(|err| RedDBError::Internal(err.to_string()))?;
409        self.inner.db.clear_collection_default_ttl_ms(&query.name);
410        self.inner
411            .db
412            .remove_collection_contract(&query.name)
413            .map_err(|err| RedDBError::Internal(err.to_string()))?;
414        remove_queue_metadata(store.as_ref(), &query.name);
415        self.invalidate_result_cache();
416        self.inner
417            .db
418            .persist_metadata()
419            .map_err(|err| RedDBError::Internal(err.to_string()))?;
420        // Issue #120 — invalidate the schema-vocabulary entry.
421        self.schema_vocabulary_apply(
422            crate::runtime::schema_vocabulary::DdlEvent::DropCollection {
423                collection: query.name.clone(),
424            },
425        );
426
427        Ok(RuntimeQueryResult::ok_message(
428            raw_query.to_string(),
429            &format!("queue '{}' dropped", query.name),
430            "drop",
431        ))
432    }
433
434    pub fn execute_queue_command(
435        &self,
436        raw_query: &str,
437        cmd: &QueueCommand,
438    ) -> RedDBResult<RuntimeQueryResult> {
439        self.check_write(crate::runtime::write_gate::WriteKind::Dml)?;
440        match cmd {
441            QueueCommand::Push {
442                queue,
443                value,
444                side,
445                priority,
446            } => {
447                let store = self.inner.db.store();
448                ensure_queue_exists(store.as_ref(), queue)?;
449                let config = load_queue_config(store.as_ref(), queue);
450                if priority.is_some() && !config.priority {
451                    return Err(RedDBError::Query(format!(
452                        "queue '{}' is not a priority queue",
453                        queue
454                    )));
455                }
456                if let Some(max_size) = config.max_size {
457                    let current_len =
458                        load_queue_message_views_with_runtime(Some(self), store.as_ref(), queue)?
459                            .len();
460                    if current_len >= max_size {
461                        return Err(RedDBError::Query(format!(
462                            "queue '{}' is full (max_size={max_size})",
463                            queue
464                        )));
465                    }
466                }
467
468                let position = next_queue_position(store.as_ref(), queue, *side)?;
469                let mut entity = UnifiedEntity::new(
470                    EntityId::new(0),
471                    EntityKind::QueueMessage {
472                        queue: queue.clone(),
473                        position,
474                    },
475                    EntityData::QueueMessage(QueueMessageData {
476                        payload: value.clone(),
477                        priority: if config.priority { *priority } else { None },
478                        enqueued_at_ns: now_ns(),
479                        attempts: 0,
480                        max_attempts: config.max_attempts,
481                        acked: false,
482                    }),
483                );
484                // Phase 1.1 MVCC universal: stamp xmin so other
485                // connections don't see this message until COMMIT.
486                if let Some(xid) = self.current_xid() {
487                    entity.set_xmin(xid);
488                }
489                let id = store
490                    .insert_auto(queue, entity)
491                    .map_err(|err| RedDBError::Internal(err.to_string()))?;
492                if let Some(ttl_ms) = config.ttl_ms {
493                    store
494                        .set_metadata(queue, id, queue_message_ttl_metadata(ttl_ms))
495                        .map_err(|err| RedDBError::Internal(err.to_string()))?;
496                }
497                self.invalidate_result_cache();
498
499                let mut result = UnifiedResult::with_columns(vec![
500                    "message_id".into(),
501                    "side".into(),
502                    "queue".into(),
503                ]);
504                let mut record = UnifiedRecord::new();
505                record.set("message_id", Value::text(message_id_string(id)));
506                record.set(
507                    "side",
508                    Value::text(match side {
509                        QueueSide::Left => "left".to_string(),
510                        QueueSide::Right => "right".to_string(),
511                    }),
512                );
513                record.set("queue", Value::text(queue.clone()));
514                result.push(record);
515
516                Ok(RuntimeQueryResult {
517                    query: raw_query.to_string(),
518                    mode: QueryMode::Sql,
519                    statement: "queue_push",
520                    engine: "runtime-queue",
521                    result,
522                    affected_rows: 1,
523                    statement_type: "insert",
524                })
525            }
526            QueueCommand::Pop { queue, side, count } => {
527                let store = self.inner.db.store();
528                ensure_queue_exists(store.as_ref(), queue)?;
529                let popped = super::queue_delivery::pop_messages(
530                    self,
531                    store.as_ref(),
532                    queue,
533                    *side,
534                    *count,
535                )?;
536
537                let mut result =
538                    UnifiedResult::with_columns(vec!["message_id".into(), "payload".into()]);
539                for message in &popped {
540                    let mut record = UnifiedRecord::new();
541                    record.set(
542                        "message_id",
543                        Value::text(message_id_string(message.message_id)),
544                    );
545                    record.set("payload", message.payload.clone());
546                    result.push(record);
547                }
548                let popped_count = popped.len() as u64;
549                if popped_count > 0 {
550                    self.invalidate_result_cache();
551                }
552
553                Ok(RuntimeQueryResult {
554                    query: raw_query.to_string(),
555                    mode: QueryMode::Sql,
556                    statement: "queue_pop",
557                    engine: "runtime-queue",
558                    result,
559                    affected_rows: popped_count,
560                    statement_type: "delete",
561                })
562            }
563            QueueCommand::Peek { queue, count } => {
564                let store = self.inner.db.store();
565                ensure_queue_exists(store.as_ref(), queue)?;
566                let messages =
567                    super::queue_delivery::peek_messages(self, store.as_ref(), queue, *count)?;
568
569                let mut result =
570                    UnifiedResult::with_columns(vec!["message_id".into(), "payload".into()]);
571                for message in messages {
572                    let mut record = UnifiedRecord::new();
573                    record.set(
574                        "message_id",
575                        Value::text(message_id_string(message.message_id)),
576                    );
577                    record.set("payload", message.payload);
578                    result.push(record);
579                }
580
581                Ok(RuntimeQueryResult {
582                    query: raw_query.to_string(),
583                    mode: QueryMode::Sql,
584                    statement: "queue_peek",
585                    engine: "runtime-queue",
586                    result,
587                    affected_rows: 0,
588                    statement_type: "select",
589                })
590            }
591            QueueCommand::Len { queue } => {
592                let store = self.inner.db.store();
593                ensure_queue_exists(store.as_ref(), queue)?;
594                let count =
595                    load_queue_message_views_with_runtime(Some(self), store.as_ref(), queue)?.len()
596                        as u64;
597                let mut result = UnifiedResult::with_columns(vec!["len".into()]);
598                let mut record = UnifiedRecord::new();
599                record.set("len", Value::UnsignedInteger(count));
600                result.push(record);
601
602                Ok(RuntimeQueryResult {
603                    query: raw_query.to_string(),
604                    mode: QueryMode::Sql,
605                    statement: "queue_len",
606                    engine: "runtime-queue",
607                    result,
608                    affected_rows: 0,
609                    statement_type: "select",
610                })
611            }
612            QueueCommand::Purge { queue } => {
613                let store = self.inner.db.store();
614                ensure_queue_exists(store.as_ref(), queue)?;
615                let count = super::queue_delivery::purge_messages(self, store.as_ref(), queue)?;
616                if count > 0 {
617                    self.invalidate_result_cache();
618                }
619
620                Ok(RuntimeQueryResult::ok_message(
621                    raw_query.to_string(),
622                    &format!("{count} messages purged from queue '{queue}'"),
623                    "delete",
624                ))
625            }
626            QueueCommand::GroupCreate { queue, group } => {
627                let store = self.inner.db.store();
628                ensure_queue_exists(store.as_ref(), queue)?;
629                if queue_group_exists(store.as_ref(), queue, group)? {
630                    return Ok(RuntimeQueryResult::ok_message(
631                        raw_query.to_string(),
632                        &format!(
633                            "consumer group '{}' already exists on queue '{}'",
634                            group, queue
635                        ),
636                        "create",
637                    ));
638                }
639                save_queue_group(store.as_ref(), queue, group)?;
640                self.invalidate_result_cache();
641
642                Ok(RuntimeQueryResult::ok_message(
643                    raw_query.to_string(),
644                    &format!("consumer group '{}' created on queue '{}'", group, queue),
645                    "create",
646                ))
647            }
648            QueueCommand::GroupRead {
649                queue,
650                group,
651                consumer,
652                count,
653            } => {
654                let store = self.inner.db.store();
655                ensure_queue_exists(store.as_ref(), queue)?;
656                let delivered = super::queue_delivery::read_messages(
657                    self,
658                    store.as_ref(),
659                    queue,
660                    group.as_deref(),
661                    consumer,
662                    *count,
663                )?;
664
665                let mut result = UnifiedResult::with_columns(vec![
666                    "message_id".into(),
667                    "payload".into(),
668                    "consumer".into(),
669                    "delivery_count".into(),
670                    "attempts".into(),
671                ]);
672
673                for message in delivered {
674                    let mut record = UnifiedRecord::new();
675                    record.set(
676                        "message_id",
677                        Value::text(message_id_string(message.message_id)),
678                    );
679                    record.set("payload", message.payload);
680                    record.set("consumer", Value::text(message.consumer));
681                    record.set(
682                        "delivery_count",
683                        Value::UnsignedInteger(u64::from(message.delivery_count)),
684                    );
685                    record.set(
686                        "attempts",
687                        Value::UnsignedInteger(u64::from(message.delivery_count)),
688                    );
689                    result.push(record);
690                }
691                if !result.records.is_empty() {
692                    self.invalidate_result_cache();
693                }
694
695                Ok(RuntimeQueryResult {
696                    query: raw_query.to_string(),
697                    mode: QueryMode::Sql,
698                    statement: "queue_group_read",
699                    engine: "runtime-queue",
700                    result,
701                    affected_rows: 0,
702                    statement_type: "select",
703                })
704            }
705            QueueCommand::Pending { queue, group } => {
706                let store = self.inner.db.store();
707                ensure_queue_exists(store.as_ref(), queue)?;
708                require_queue_group(store.as_ref(), queue, group)?;
709                let mut pending = load_pending_entries(store.as_ref(), queue, Some(group), None)?;
710                pending.sort_by_key(|entry| entry.delivered_at_ns);
711                let current_time_ns = now_ns();
712
713                let mut result = UnifiedResult::with_columns(vec![
714                    "message_id".into(),
715                    "consumer".into(),
716                    "delivered_at_ns".into(),
717                    "delivery_count".into(),
718                    "idle_ms".into(),
719                ]);
720                for entry in pending {
721                    let mut record = UnifiedRecord::new();
722                    record.set(
723                        "message_id",
724                        Value::text(message_id_string(entry.message_id)),
725                    );
726                    record.set("consumer", Value::text(entry.consumer));
727                    record.set(
728                        "delivered_at_ns",
729                        Value::UnsignedInteger(entry.delivered_at_ns),
730                    );
731                    record.set(
732                        "delivery_count",
733                        Value::UnsignedInteger(u64::from(entry.delivery_count)),
734                    );
735                    record.set(
736                        "idle_ms",
737                        Value::UnsignedInteger(
738                            current_time_ns.saturating_sub(entry.delivered_at_ns) / 1_000_000,
739                        ),
740                    );
741                    result.push(record);
742                }
743
744                Ok(RuntimeQueryResult {
745                    query: raw_query.to_string(),
746                    mode: QueryMode::Sql,
747                    statement: "queue_pending",
748                    engine: "runtime-queue",
749                    result,
750                    affected_rows: 0,
751                    statement_type: "select",
752                })
753            }
754            QueueCommand::Claim {
755                queue,
756                group,
757                consumer,
758                min_idle_ms,
759            } => {
760                let store = self.inner.db.store();
761                ensure_queue_exists(store.as_ref(), queue)?;
762                let delivered = super::queue_delivery::claim_messages(
763                    self,
764                    store.as_ref(),
765                    queue,
766                    group,
767                    consumer,
768                    *min_idle_ms,
769                )?;
770
771                let mut result = UnifiedResult::with_columns(vec![
772                    "message_id".into(),
773                    "payload".into(),
774                    "consumer".into(),
775                    "delivery_count".into(),
776                ]);
777
778                for message in delivered {
779                    let mut record = UnifiedRecord::new();
780                    record.set(
781                        "message_id",
782                        Value::text(message_id_string(message.message_id)),
783                    );
784                    record.set("payload", message.payload);
785                    record.set("consumer", Value::text(message.consumer));
786                    record.set(
787                        "delivery_count",
788                        Value::UnsignedInteger(u64::from(message.delivery_count)),
789                    );
790                    result.push(record);
791                }
792                if !result.records.is_empty() {
793                    self.invalidate_result_cache();
794                }
795                let affected_rows = result.records.len() as u64;
796
797                Ok(RuntimeQueryResult {
798                    query: raw_query.to_string(),
799                    mode: QueryMode::Sql,
800                    statement: "queue_claim",
801                    engine: "runtime-queue",
802                    result,
803                    affected_rows,
804                    statement_type: "update",
805                })
806            }
807            QueueCommand::Ack {
808                queue,
809                group,
810                message_id,
811            } => {
812                let store = self.inner.db.store();
813                ensure_queue_exists(store.as_ref(), queue)?;
814                require_queue_group(store.as_ref(), queue, group)?;
815                let message_id = parse_message_id(message_id)?;
816                let config = load_queue_config(store.as_ref(), queue);
817                super::queue_delivery::ack_message(
818                    self,
819                    store.as_ref(),
820                    queue,
821                    group,
822                    message_id,
823                    &config,
824                )?;
825                self.invalidate_result_cache();
826
827                Ok(RuntimeQueryResult::ok_message(
828                    raw_query.to_string(),
829                    "message acknowledged",
830                    "update",
831                ))
832            }
833            QueueCommand::Nack {
834                queue,
835                group,
836                message_id,
837            } => {
838                let store = self.inner.db.store();
839                ensure_queue_exists(store.as_ref(), queue)?;
840                require_queue_group(store.as_ref(), queue, group)?;
841                let message_id = parse_message_id(message_id)?;
842                let config = load_queue_config(store.as_ref(), queue);
843                let message = match super::queue_delivery::nack_message(
844                    self,
845                    store.as_ref(),
846                    queue,
847                    group,
848                    message_id,
849                    &config,
850                )? {
851                    super::queue_delivery::NackOutcome::Requeued => "message requeued".to_string(),
852                    super::queue_delivery::NackOutcome::MovedToDlq(dlq) => {
853                        format!("message moved to dead-letter queue '{}'", dlq)
854                    }
855                    super::queue_delivery::NackOutcome::Dropped => {
856                        "message dropped after max attempts".to_string()
857                    }
858                };
859                self.invalidate_result_cache();
860
861                Ok(RuntimeQueryResult::ok_message(
862                    raw_query.to_string(),
863                    &message,
864                    "update",
865                ))
866            }
867            QueueCommand::Move {
868                source,
869                destination,
870                filter,
871                limit,
872            } => self.execute_queue_move(raw_query, source, destination, filter.as_ref(), *limit),
873        }
874    }
875
876    pub fn execute_queue_select(
877        &self,
878        raw_query: &str,
879        query: &QueueSelectQuery,
880    ) -> RedDBResult<RuntimeQueryResult> {
881        let store = self.inner.db.store();
882        ensure_queue_exists(store.as_ref(), &query.queue)?;
883        let config = load_queue_config(store.as_ref(), &query.queue);
884        let dlq = queue_is_dead_letter_target(store.as_ref(), &query.queue);
885        let columns = if query.columns.is_empty() {
886            queue_projection_default_columns()
887        } else {
888            query.columns.clone()
889        };
890
891        let mut messages =
892            load_queue_message_views_with_runtime(Some(self), store.as_ref(), &query.queue)?;
893        sort_queue_messages(&mut messages, &config, QueueSide::Left);
894
895        let mut result = UnifiedResult::with_columns(columns.clone());
896        for message in messages {
897            if query
898                .filter
899                .as_ref()
900                .is_some_and(|filter| !queue_message_matches_filter(&message, dlq, filter))
901            {
902                continue;
903            }
904            let record = queue_projection_record(&columns, &message, dlq)?;
905            result.push(record);
906            if query
907                .limit
908                .is_some_and(|limit| result.records.len() >= limit as usize)
909            {
910                break;
911            }
912        }
913
914        Ok(RuntimeQueryResult {
915            query: raw_query.to_string(),
916            mode: QueryMode::Sql,
917            statement: "queue_select",
918            engine: "runtime-queue",
919            result,
920            affected_rows: 0,
921            statement_type: "select",
922        })
923    }
924
925    fn execute_queue_move(
926        &self,
927        raw_query: &str,
928        source: &str,
929        destination: &str,
930        filter: Option<&Filter>,
931        limit: usize,
932    ) -> RedDBResult<RuntimeQueryResult> {
933        if source == destination {
934            return Err(RedDBError::Query(
935                "QUEUE MOVE source and destination must be different".to_string(),
936            ));
937        }
938        let store = self.inner.db.store();
939        ensure_queue_exists(store.as_ref(), source)?;
940        ensure_queue_exists(store.as_ref(), destination)?;
941        let source_config = load_queue_config(store.as_ref(), source);
942        let destination_config = load_queue_config(store.as_ref(), destination);
943        let source_dlq = queue_is_dead_letter_target(store.as_ref(), source);
944
945        let mut messages =
946            load_queue_message_views_with_runtime(Some(self), store.as_ref(), source)?;
947        sort_queue_messages(&mut messages, &source_config, QueueSide::Left);
948        let selected = messages
949            .into_iter()
950            .filter(|message| {
951                filter
952                    .map(|f| queue_message_matches_filter(message, source_dlq, f))
953                    .unwrap_or(true)
954            })
955            .take(limit)
956            .collect::<Vec<_>>();
957
958        if let Some(max_size) = destination_config.max_size {
959            let current_len =
960                load_queue_message_views_with_runtime(Some(self), store.as_ref(), destination)?
961                    .len();
962            if current_len + selected.len() > max_size {
963                return Err(RedDBError::Query(format!(
964                    "queue '{}' is full (max_size={max_size})",
965                    destination
966                )));
967            }
968        }
969
970        for message in &selected {
971            let lock = queue_message_lock_handle(self, source, message.id);
972            let Some(_guard) = lock.try_lock() else {
973                return Err(RedDBError::Query(format!(
974                    "message '{}' is locked on queue '{}'",
975                    message.id.raw(),
976                    source
977                )));
978            };
979            if queue_message_view_by_id(store.as_ref(), source, message.id)?.is_none() {
980                return Err(RedDBError::Query(format!(
981                    "message '{}' is no longer available on queue '{}'",
982                    message.id.raw(),
983                    source
984                )));
985            }
986        }
987
988        let mut inserted = Vec::new();
989        for message in &selected {
990            match insert_moved_queue_message(
991                store.as_ref(),
992                destination,
993                &destination_config,
994                message,
995            ) {
996                Ok(id) => inserted.push(id),
997                Err(err) => {
998                    for id in inserted {
999                        let _ = store.delete(destination, id);
1000                    }
1001                    return Err(err);
1002                }
1003            }
1004        }
1005
1006        for message in &selected {
1007            super::queue_delivery::delete_message_with_state(
1008                Some(self),
1009                store.as_ref(),
1010                source,
1011                message.id,
1012            )?;
1013        }
1014        if !selected.is_empty() {
1015            self.invalidate_result_cache();
1016        }
1017
1018        let selected_count = selected.len() as u64;
1019        self.audit_log().record_event(
1020            AuditEvent::builder("queue/move")
1021                .source(AuditAuthSource::System)
1022                .outcome(Outcome::Success)
1023                .resource(format!("queue:{source}->{destination}"))
1024                .fields([
1025                    AuditFieldEscaper::field("source", source),
1026                    AuditFieldEscaper::field("destination", destination),
1027                    AuditFieldEscaper::field("selected", selected_count),
1028                    AuditFieldEscaper::field("committed", selected_count),
1029                ])
1030                .build(),
1031        );
1032
1033        let mut result = UnifiedResult::with_columns(vec![
1034            "source".into(),
1035            "destination".into(),
1036            "selected".into(),
1037            "committed".into(),
1038        ]);
1039        let mut record = UnifiedRecord::new();
1040        record.set("source", Value::text(source.to_string()));
1041        record.set("destination", Value::text(destination.to_string()));
1042        record.set("selected", Value::UnsignedInteger(selected_count));
1043        record.set("committed", Value::UnsignedInteger(selected_count));
1044        result.push(record);
1045
1046        Ok(RuntimeQueryResult {
1047            query: raw_query.to_string(),
1048            mode: QueryMode::Sql,
1049            statement: "queue_move",
1050            engine: "runtime-queue",
1051            result,
1052            affected_rows: selected_count,
1053            statement_type: "update",
1054        })
1055    }
1056}
1057
1058fn ensure_queue_exists(store: &UnifiedStore, queue: &str) -> RedDBResult<()> {
1059    if store.get_collection(queue).is_some() {
1060        Ok(())
1061    } else {
1062        Err(RedDBError::NotFound(format!("queue '{}' not found", queue)))
1063    }
1064}
1065
1066pub(super) fn load_queue_config(store: &UnifiedStore, queue: &str) -> QueueRuntimeConfig {
1067    let default = QueueRuntimeConfig {
1068        mode: QueueMode::Work,
1069        priority: false,
1070        max_size: None,
1071        ttl_ms: None,
1072        dlq: None,
1073        max_attempts: 3,
1074    };
1075
1076    let Some(manager) = store.get_collection(QUEUE_META_COLLECTION) else {
1077        return default;
1078    };
1079    manager
1080        .query_all(|entity| {
1081            entity.data.as_row().is_some_and(|row| {
1082                row_text(row, "kind").as_deref() == Some("queue_config")
1083                    && row_text(row, "queue").as_deref() == Some(queue)
1084            })
1085        })
1086        .into_iter()
1087        .find_map(|entity| {
1088            let row = entity.data.as_row()?;
1089            Some(QueueRuntimeConfig {
1090                mode: row_text(row, "mode")
1091                    .as_deref()
1092                    .and_then(QueueMode::parse)
1093                    .unwrap_or_default(),
1094                priority: row_bool(row, "priority").unwrap_or(false),
1095                max_size: row_u64(row, "max_size").map(|value| value as usize),
1096                ttl_ms: row_u64(row, "ttl_ms"),
1097                dlq: row_text(row, "dlq"),
1098                max_attempts: row_u64(row, "max_attempts")
1099                    .map(|value| value as u32)
1100                    .unwrap_or(3),
1101            })
1102        })
1103        .unwrap_or(default)
1104}
1105
1106pub(super) fn queue_mode_str(store: &UnifiedStore, queue: &str) -> &'static str {
1107    load_queue_config(store, queue).mode.as_str()
1108}
1109
1110fn save_queue_config(
1111    store: &UnifiedStore,
1112    queue: &str,
1113    config: &QueueRuntimeConfig,
1114) -> RedDBResult<()> {
1115    remove_meta_rows(store, |row| {
1116        row_text(row, "kind").as_deref() == Some("queue_config")
1117            && row_text(row, "queue").as_deref() == Some(queue)
1118    });
1119
1120    let mut fields = HashMap::new();
1121    fields.insert("kind".to_string(), Value::text("queue_config".to_string()));
1122    fields.insert("queue".to_string(), Value::text(queue.to_string()));
1123    fields.insert(
1124        "mode".to_string(),
1125        Value::text(config.mode.as_str().to_string()),
1126    );
1127    fields.insert("priority".to_string(), Value::Boolean(config.priority));
1128    fields.insert(
1129        "max_size".to_string(),
1130        config
1131            .max_size
1132            .map(|value| Value::UnsignedInteger(value as u64))
1133            .unwrap_or(Value::Null),
1134    );
1135    fields.insert(
1136        "ttl_ms".to_string(),
1137        config
1138            .ttl_ms
1139            .map(Value::UnsignedInteger)
1140            .unwrap_or(Value::Null),
1141    );
1142    fields.insert(
1143        "dlq".to_string(),
1144        config.dlq.clone().map(Value::text).unwrap_or(Value::Null),
1145    );
1146    fields.insert(
1147        "max_attempts".to_string(),
1148        Value::UnsignedInteger(u64::from(config.max_attempts)),
1149    );
1150    insert_meta_row(store, fields)
1151}
1152
1153fn remove_queue_metadata(store: &UnifiedStore, queue: &str) {
1154    remove_meta_rows(store, |row| {
1155        row_text(row, "queue").as_deref() == Some(queue)
1156    });
1157}
1158
1159fn queue_group_exists(store: &UnifiedStore, queue: &str, group: &str) -> RedDBResult<bool> {
1160    Ok(load_queue_groups(store, queue)?
1161        .into_iter()
1162        .any(|entry| entry.group == group))
1163}
1164
1165pub(super) fn require_queue_group(
1166    store: &UnifiedStore,
1167    queue: &str,
1168    group: &str,
1169) -> RedDBResult<()> {
1170    if queue_group_exists(store, queue, group)? {
1171        Ok(())
1172    } else {
1173        Err(RedDBError::NotFound(format!(
1174            "consumer group '{}' not found on queue '{}'",
1175            group, queue
1176        )))
1177    }
1178}
1179
1180pub(super) fn resolve_read_group(
1181    store: &UnifiedStore,
1182    queue: &str,
1183    group: Option<&str>,
1184    consumer: &str,
1185    config: &QueueRuntimeConfig,
1186) -> RedDBResult<String> {
1187    if let Some(group) = group {
1188        require_queue_group(store, queue, group)?;
1189        return Ok(group.to_string());
1190    }
1191
1192    match config.mode {
1193        QueueMode::Work => {
1194            if !queue_group_exists(store, queue, WORK_DEFAULT_GROUP)? {
1195                save_queue_group(store, queue, WORK_DEFAULT_GROUP)?;
1196            }
1197            Ok(WORK_DEFAULT_GROUP.to_string())
1198        }
1199        QueueMode::Fanout => {
1200            let fanout_group = format!("{FANOUT_GROUP_PREFIX}{consumer}");
1201            if !queue_group_exists(store, queue, &fanout_group)? {
1202                save_queue_group(store, queue, &fanout_group)?;
1203            }
1204            Ok(fanout_group)
1205        }
1206    }
1207}
1208
1209fn load_queue_groups(store: &UnifiedStore, queue: &str) -> RedDBResult<Vec<QueueGroupEntry>> {
1210    let Some(manager) = store.get_collection(QUEUE_META_COLLECTION) else {
1211        return Ok(Vec::new());
1212    };
1213    Ok(manager
1214        .query_all(|entity| {
1215            entity.data.as_row().is_some_and(|row| {
1216                row_text(row, "kind").as_deref() == Some("queue_group")
1217                    && row_text(row, "queue").as_deref() == Some(queue)
1218            })
1219        })
1220        .into_iter()
1221        .filter_map(|entity| {
1222            let row = entity.data.as_row()?;
1223            Some(QueueGroupEntry {
1224                entity_id: entity.id,
1225                group: row_text(row, "group")?,
1226            })
1227        })
1228        .collect())
1229}
1230
1231fn save_queue_group(store: &UnifiedStore, queue: &str, group: &str) -> RedDBResult<()> {
1232    let mut fields = HashMap::new();
1233    fields.insert("kind".to_string(), Value::text("queue_group".to_string()));
1234    fields.insert("queue".to_string(), Value::text(queue.to_string()));
1235    fields.insert("group".to_string(), Value::text(group.to_string()));
1236    fields.insert(
1237        "created_at_ns".to_string(),
1238        Value::UnsignedInteger(now_ns()),
1239    );
1240    insert_meta_row(store, fields)
1241}
1242
1243pub(super) fn load_pending_entries(
1244    store: &UnifiedStore,
1245    queue: &str,
1246    group: Option<&str>,
1247    message_id: Option<EntityId>,
1248) -> RedDBResult<Vec<QueuePendingEntry>> {
1249    let Some(manager) = store.get_collection(QUEUE_META_COLLECTION) else {
1250        return Ok(Vec::new());
1251    };
1252    Ok(manager
1253        .query_all(|entity| {
1254            entity.data.as_row().is_some_and(|row| {
1255                row_text(row, "kind").as_deref() == Some("queue_pending")
1256                    && row_text(row, "queue").as_deref() == Some(queue)
1257                    && group
1258                        .map(|group_name| row_text(row, "group").as_deref() == Some(group_name))
1259                        .unwrap_or(true)
1260                    && message_id
1261                        .map(|candidate| row_u64(row, "message_id") == Some(candidate.raw()))
1262                        .unwrap_or(true)
1263            })
1264        })
1265        .into_iter()
1266        .filter_map(|entity| {
1267            let row = entity.data.as_row()?;
1268            Some(QueuePendingEntry {
1269                entity_id: entity.id,
1270                group: row_text(row, "group")?,
1271                message_id: EntityId::new(row_u64(row, "message_id")?),
1272                consumer: row_text(row, "consumer")?,
1273                delivered_at_ns: row_u64(row, "delivered_at_ns")?,
1274                delivery_count: row_u64(row, "delivery_count")
1275                    .map(|value| value as u32)
1276                    .unwrap_or(1),
1277            })
1278        })
1279        .collect())
1280}
1281
1282pub(super) fn save_queue_pending(
1283    store: &UnifiedStore,
1284    queue: &str,
1285    group: &str,
1286    message_id: EntityId,
1287    consumer: &str,
1288    delivered_at_ns: u64,
1289    delivery_count: u32,
1290) -> RedDBResult<()> {
1291    remove_meta_rows(store, |row| {
1292        row_text(row, "kind").as_deref() == Some("queue_pending")
1293            && row_text(row, "queue").as_deref() == Some(queue)
1294            && row_text(row, "group").as_deref() == Some(group)
1295            && row_u64(row, "message_id") == Some(message_id.raw())
1296    });
1297
1298    let mut fields = HashMap::new();
1299    fields.insert("kind".to_string(), Value::text("queue_pending".to_string()));
1300    fields.insert("queue".to_string(), Value::text(queue.to_string()));
1301    fields.insert("group".to_string(), Value::text(group.to_string()));
1302    fields.insert(
1303        "message_id".to_string(),
1304        Value::UnsignedInteger(message_id.raw()),
1305    );
1306    fields.insert("consumer".to_string(), Value::text(consumer.to_string()));
1307    fields.insert(
1308        "delivered_at_ns".to_string(),
1309        Value::UnsignedInteger(delivered_at_ns),
1310    );
1311    fields.insert(
1312        "delivery_count".to_string(),
1313        Value::UnsignedInteger(u64::from(delivery_count)),
1314    );
1315    insert_meta_row(store, fields)
1316}
1317
1318pub(super) fn require_pending_entry(
1319    store: &UnifiedStore,
1320    queue: &str,
1321    group: &str,
1322    message_id: EntityId,
1323) -> RedDBResult<QueuePendingEntry> {
1324    load_pending_entries(store, queue, Some(group), Some(message_id))?
1325        .into_iter()
1326        .next()
1327        .ok_or_else(|| {
1328            RedDBError::NotFound(format!(
1329                "message '{}' is not pending in group '{}' on queue '{}'",
1330                message_id.raw(),
1331                group,
1332                queue
1333            ))
1334        })
1335}
1336
1337pub(super) fn load_ack_entries(
1338    store: &UnifiedStore,
1339    queue: &str,
1340    group: Option<&str>,
1341    message_id: Option<EntityId>,
1342) -> RedDBResult<Vec<QueueAckEntry>> {
1343    let Some(manager) = store.get_collection(QUEUE_META_COLLECTION) else {
1344        return Ok(Vec::new());
1345    };
1346    Ok(manager
1347        .query_all(|entity| {
1348            entity.data.as_row().is_some_and(|row| {
1349                row_text(row, "kind").as_deref() == Some("queue_ack")
1350                    && row_text(row, "queue").as_deref() == Some(queue)
1351                    && group
1352                        .map(|group_name| row_text(row, "group").as_deref() == Some(group_name))
1353                        .unwrap_or(true)
1354                    && message_id
1355                        .map(|candidate| row_u64(row, "message_id") == Some(candidate.raw()))
1356                        .unwrap_or(true)
1357            })
1358        })
1359        .into_iter()
1360        .filter_map(|entity| {
1361            let row = entity.data.as_row()?;
1362            Some(QueueAckEntry {
1363                entity_id: entity.id,
1364                group: row_text(row, "group")?,
1365                message_id: EntityId::new(row_u64(row, "message_id")?),
1366            })
1367        })
1368        .collect())
1369}
1370
1371pub(super) fn save_queue_ack(
1372    store: &UnifiedStore,
1373    queue: &str,
1374    group: &str,
1375    message_id: EntityId,
1376) -> RedDBResult<()> {
1377    let existing = load_ack_entries(store, queue, Some(group), Some(message_id))?;
1378    if !existing.is_empty() {
1379        return Ok(());
1380    }
1381
1382    let mut fields = HashMap::new();
1383    fields.insert("kind".to_string(), Value::text("queue_ack".to_string()));
1384    fields.insert("queue".to_string(), Value::text(queue.to_string()));
1385    fields.insert("group".to_string(), Value::text(group.to_string()));
1386    fields.insert(
1387        "message_id".to_string(),
1388        Value::UnsignedInteger(message_id.raw()),
1389    );
1390    fields.insert("acked_at_ns".to_string(), Value::UnsignedInteger(now_ns()));
1391    insert_meta_row(store, fields)
1392}
1393
1394pub(super) fn queue_message_completed_for_all_groups(
1395    store: &UnifiedStore,
1396    queue: &str,
1397    message_id: EntityId,
1398) -> RedDBResult<bool> {
1399    let groups = load_queue_groups(store, queue)?;
1400    let pending = load_pending_entries(store, queue, None, Some(message_id))?;
1401    if !pending.is_empty() {
1402        return Ok(false);
1403    }
1404    if groups.is_empty() {
1405        return Ok(true);
1406    }
1407
1408    let acked_groups = load_ack_entries(store, queue, None, Some(message_id))?
1409        .into_iter()
1410        .map(|entry| entry.group)
1411        .collect::<HashSet<_>>();
1412    Ok(groups
1413        .into_iter()
1414        .all(|group| acked_groups.contains(&group.group)))
1415}
1416
1417fn load_queue_message_views(
1418    store: &UnifiedStore,
1419    queue: &str,
1420) -> RedDBResult<Vec<QueueMessageView>> {
1421    load_queue_message_views_with_runtime(None, store, queue)
1422}
1423
1424/// Kind-aware queue scan (Phase 2.5.5 RLS universal). When the
1425/// caller has a `RedDBRuntime` reference, the gate also applies
1426/// any `CREATE POLICY ... ON MESSAGES OF <queue>` predicate. In
1427/// autocommit / embedded paths that only have the raw store (e.g.
1428/// purge loops) we skip RLS because there's no session identity
1429/// to match against.
1430pub(super) fn load_queue_message_views_with_runtime(
1431    runtime: Option<&RedDBRuntime>,
1432    store: &UnifiedStore,
1433    queue: &str,
1434) -> RedDBResult<Vec<QueueMessageView>> {
1435    let manager = store
1436        .get_collection(queue)
1437        .ok_or_else(|| RedDBError::NotFound(format!("queue '{}' not found", queue)))?;
1438    // Phase 1.2 MVCC universal: capture before parallel scan. Messages
1439    // inserted by another connection's open txn stay invisible to
1440    // consumers until that txn commits (prevents phantom POPs).
1441    let snap_ctx = crate::runtime::impl_core::capture_current_snapshot();
1442    let rls_filter = runtime.and_then(|rt| {
1443        crate::runtime::impl_core::rls_policy_filter_for_kind(
1444            rt,
1445            queue,
1446            crate::storage::query::ast::PolicyAction::Select,
1447            crate::storage::query::ast::PolicyTargetKind::Messages,
1448        )
1449    });
1450    let rls_enabled_but_denied = runtime.map(|rt| rt.is_rls_enabled(queue)).unwrap_or(false)
1451        && rls_filter.is_none()
1452        && runtime.is_some();
1453    if rls_enabled_but_denied {
1454        // RLS on + no Messages policy for this role = deny-default.
1455        return Ok(Vec::new());
1456    }
1457    let filter_arc = rls_filter.map(std::sync::Arc::new);
1458    let rt_arc = runtime;
1459    Ok(manager
1460        .query_all(move |entity| {
1461            if !matches!(entity.kind, EntityKind::QueueMessage { .. }) {
1462                return false;
1463            }
1464            if !crate::runtime::impl_core::entity_visible_with_context(snap_ctx.as_ref(), entity) {
1465                return false;
1466            }
1467            if let (Some(filter), Some(rt)) = (filter_arc.as_ref(), rt_arc) {
1468                return crate::runtime::query_exec::evaluate_entity_filter_with_db(
1469                    Some(&rt.inner.db),
1470                    entity,
1471                    filter,
1472                    queue,
1473                    queue,
1474                );
1475            }
1476            true
1477        })
1478        .into_iter()
1479        .filter_map(queue_message_view_from_entity)
1480        .collect())
1481}
1482
1483fn queue_message_view_from_entity(entity: UnifiedEntity) -> Option<QueueMessageView> {
1484    let (position, _) = match &entity.kind {
1485        EntityKind::QueueMessage { position, queue } => (*position, queue),
1486        _ => return None,
1487    };
1488    let data = match entity.data {
1489        EntityData::QueueMessage(data) => data,
1490        _ => return None,
1491    };
1492    Some(QueueMessageView {
1493        id: entity.id,
1494        position,
1495        priority: data.priority.unwrap_or(0),
1496        payload: data.payload,
1497        attempts: data.attempts,
1498        max_attempts: data.max_attempts,
1499        enqueued_at_ns: data.enqueued_at_ns,
1500    })
1501}
1502
1503fn insert_moved_queue_message(
1504    store: &UnifiedStore,
1505    queue: &str,
1506    config: &QueueRuntimeConfig,
1507    message: &QueueMessageView,
1508) -> RedDBResult<EntityId> {
1509    let position = next_queue_position(store, queue, QueueSide::Right)?;
1510    let entity = UnifiedEntity::new(
1511        EntityId::new(0),
1512        EntityKind::QueueMessage {
1513            queue: queue.to_string(),
1514            position,
1515        },
1516        EntityData::QueueMessage(QueueMessageData {
1517            payload: message.payload.clone(),
1518            priority: if config.priority {
1519                Some(message.priority)
1520            } else {
1521                None
1522            },
1523            enqueued_at_ns: message.enqueued_at_ns,
1524            attempts: message.attempts,
1525            max_attempts: message.max_attempts,
1526            acked: false,
1527        }),
1528    );
1529    let id = store
1530        .insert_auto(queue, entity)
1531        .map_err(|err| RedDBError::Internal(err.to_string()))?;
1532    if let Some(ttl_ms) = config.ttl_ms {
1533        store
1534            .set_metadata(queue, id, queue_message_ttl_metadata(ttl_ms))
1535            .map_err(|err| RedDBError::Internal(err.to_string()))?;
1536    }
1537    Ok(id)
1538}
1539
1540fn queue_projection_default_columns() -> Vec<String> {
1541    [
1542        "id",
1543        "payload",
1544        "priority",
1545        "attempts",
1546        "last_error",
1547        "enqueued_at",
1548        "available_at",
1549        "dlq",
1550        "tenant",
1551    ]
1552    .into_iter()
1553    .map(str::to_string)
1554    .collect()
1555}
1556
1557fn queue_projection_record(
1558    columns: &[String],
1559    message: &QueueMessageView,
1560    dlq: bool,
1561) -> RedDBResult<UnifiedRecord> {
1562    let mut record = UnifiedRecord::new();
1563    for column in columns {
1564        let value = queue_projection_value(message, dlq, column).ok_or_else(|| {
1565            RedDBError::Query(format!("unknown queue projection column '{}'", column))
1566        })?;
1567        record.set(column, value);
1568    }
1569    Ok(record)
1570}
1571
1572fn queue_projection_value(message: &QueueMessageView, dlq: bool, column: &str) -> Option<Value> {
1573    match column {
1574        "id" => Some(Value::text(message_id_string(message.id))),
1575        "payload" => Some(message.payload.clone()),
1576        "priority" => Some(Value::Integer(i64::from(message.priority))),
1577        "attempts" => Some(Value::UnsignedInteger(u64::from(message.attempts))),
1578        "last_error" => Some(Value::Null),
1579        "enqueued_at" => Some(Value::UnsignedInteger(message.enqueued_at_ns)),
1580        "available_at" => Some(Value::UnsignedInteger(message.enqueued_at_ns)),
1581        "dlq" => Some(Value::Boolean(dlq)),
1582        "tenant" => queue_message_tenant(&message.payload).or(Some(Value::Null)),
1583        _ => None,
1584    }
1585}
1586
1587fn queue_message_tenant(payload: &Value) -> Option<Value> {
1588    let Value::Json(bytes) = payload else {
1589        return None;
1590    };
1591    let json: crate::json::Value = crate::json::from_slice(bytes).ok()?;
1592    json.get("tenant")
1593        .and_then(crate::json::Value::as_str)
1594        .map(|tenant| Value::text(tenant.to_string()))
1595}
1596
1597fn queue_is_dead_letter_target(store: &UnifiedStore, queue: &str) -> bool {
1598    let Some(manager) = store.get_collection(QUEUE_META_COLLECTION) else {
1599        return false;
1600    };
1601    !manager
1602        .query_all(|entity| {
1603            entity.data.as_row().is_some_and(|row| {
1604                row_text(row, "kind").as_deref() == Some("queue_config")
1605                    && row_text(row, "dlq").as_deref() == Some(queue)
1606            })
1607        })
1608        .is_empty()
1609}
1610
1611fn queue_message_matches_filter(message: &QueueMessageView, dlq: bool, filter: &Filter) -> bool {
1612    match filter {
1613        Filter::Compare { field, op, value } => queue_filter_field_value(message, dlq, field)
1614            .is_some_and(|candidate| queue_compare_values(&candidate, value, *op)),
1615        Filter::CompareFields { left, op, right } => {
1616            match (
1617                queue_filter_field_value(message, dlq, left),
1618                queue_filter_field_value(message, dlq, right),
1619            ) {
1620                (Some(left), Some(right)) => queue_compare_values(&left, &right, *op),
1621                _ => false,
1622            }
1623        }
1624        Filter::And(left, right) => {
1625            queue_message_matches_filter(message, dlq, left)
1626                && queue_message_matches_filter(message, dlq, right)
1627        }
1628        Filter::Or(left, right) => {
1629            queue_message_matches_filter(message, dlq, left)
1630                || queue_message_matches_filter(message, dlq, right)
1631        }
1632        Filter::Not(inner) => !queue_message_matches_filter(message, dlq, inner),
1633        Filter::IsNull(field) => queue_filter_field_value(message, dlq, field)
1634            .is_none_or(|value| matches!(value, Value::Null)),
1635        Filter::IsNotNull(field) => queue_filter_field_value(message, dlq, field)
1636            .is_some_and(|value| !matches!(value, Value::Null)),
1637        Filter::In { field, values } => {
1638            queue_filter_field_value(message, dlq, field).is_some_and(|candidate| {
1639                values
1640                    .iter()
1641                    .any(|value| queue_values_equal(&candidate, value))
1642            })
1643        }
1644        Filter::Between { field, low, high } => queue_filter_field_value(message, dlq, field)
1645            .is_some_and(|candidate| {
1646                queue_compare_values(&candidate, low, CompareOp::Ge)
1647                    && queue_compare_values(&candidate, high, CompareOp::Le)
1648            }),
1649        Filter::Like { field, pattern } => queue_filter_text(message, dlq, field)
1650            .is_some_and(|value| queue_like_matches(&value, pattern)),
1651        Filter::StartsWith { field, prefix } => {
1652            queue_filter_text(message, dlq, field).is_some_and(|value| value.starts_with(prefix))
1653        }
1654        Filter::EndsWith { field, suffix } => {
1655            queue_filter_text(message, dlq, field).is_some_and(|value| value.ends_with(suffix))
1656        }
1657        Filter::Contains { field, substring } => {
1658            queue_filter_text(message, dlq, field).is_some_and(|value| value.contains(substring))
1659        }
1660        Filter::CompareExpr { .. } => false,
1661    }
1662}
1663
1664fn queue_filter_field_value(
1665    message: &QueueMessageView,
1666    dlq: bool,
1667    field: &FieldRef,
1668) -> Option<Value> {
1669    match field {
1670        FieldRef::TableColumn { table, column } if table.is_empty() => {
1671            queue_projection_value(message, dlq, column)
1672                .or_else(|| queue_payload_field_value(&message.payload, column))
1673        }
1674        FieldRef::TableColumn { column, .. } => queue_projection_value(message, dlq, column)
1675            .or_else(|| queue_payload_field_value(&message.payload, column)),
1676        _ => None,
1677    }
1678}
1679
1680fn queue_payload_field_value(payload: &Value, field: &str) -> Option<Value> {
1681    let Value::Json(bytes) = payload else {
1682        return None;
1683    };
1684    let json: crate::json::Value = crate::json::from_slice(bytes).ok()?;
1685    let value = json.get(field)?;
1686    json_value_to_schema_value(value)
1687}
1688
1689fn json_value_to_schema_value(value: &crate::json::Value) -> Option<Value> {
1690    if matches!(value, crate::json::Value::Null) {
1691        Some(Value::Null)
1692    } else if let Some(value) = value.as_bool() {
1693        Some(Value::Boolean(value))
1694    } else if let Some(value) = value.as_i64() {
1695        Some(Value::Integer(value))
1696    } else if let Some(value) = value.as_u64() {
1697        Some(Value::UnsignedInteger(value))
1698    } else if let Some(value) = value.as_f64() {
1699        Some(Value::Float(value))
1700    } else if let Some(value) = value.as_str() {
1701        Some(Value::text(value.to_string()))
1702    } else {
1703        Some(Value::Json(value.to_string_compact().into_bytes()))
1704    }
1705}
1706
1707fn queue_filter_text(message: &QueueMessageView, dlq: bool, field: &FieldRef) -> Option<String> {
1708    queue_filter_field_value(message, dlq, field).and_then(|value| match value {
1709        Value::Text(value) => Some(value.to_string()),
1710        Value::NodeRef(value) | Value::EdgeRef(value) | Value::TableRef(value) => Some(value),
1711        Value::Integer(value) => Some(value.to_string()),
1712        Value::UnsignedInteger(value) => Some(value.to_string()),
1713        Value::Float(value) => Some(value.to_string()),
1714        Value::Boolean(value) => Some(value.to_string()),
1715        _ => None,
1716    })
1717}
1718
1719fn queue_compare_values(left: &Value, right: &Value, op: CompareOp) -> bool {
1720    match op {
1721        CompareOp::Eq => queue_values_equal(left, right),
1722        CompareOp::Ne => !queue_values_equal(left, right),
1723        CompareOp::Lt => queue_partial_cmp(left, right).is_some_and(|ord| ord.is_lt()),
1724        CompareOp::Le => queue_partial_cmp(left, right).is_some_and(|ord| !ord.is_gt()),
1725        CompareOp::Gt => queue_partial_cmp(left, right).is_some_and(|ord| ord.is_gt()),
1726        CompareOp::Ge => queue_partial_cmp(left, right).is_some_and(|ord| !ord.is_lt()),
1727    }
1728}
1729
1730fn queue_values_equal(left: &Value, right: &Value) -> bool {
1731    if let (Some(left), Some(right)) = (queue_value_number(left), queue_value_number(right)) {
1732        return (left - right).abs() < f64::EPSILON;
1733    }
1734    match (left, right) {
1735        (Value::Text(left), Value::Text(right)) => left == right,
1736        (Value::Boolean(left), Value::Boolean(right)) => left == right,
1737        _ => left == right,
1738    }
1739}
1740
1741fn queue_partial_cmp(left: &Value, right: &Value) -> Option<std::cmp::Ordering> {
1742    if let (Some(left), Some(right)) = (queue_value_number(left), queue_value_number(right)) {
1743        return left.partial_cmp(&right);
1744    }
1745    match (left, right) {
1746        (Value::Text(left), Value::Text(right)) => Some(left.cmp(right)),
1747        _ => None,
1748    }
1749}
1750
1751fn queue_value_number(value: &Value) -> Option<f64> {
1752    match value {
1753        Value::Integer(value) => Some(*value as f64),
1754        Value::UnsignedInteger(value) => Some(*value as f64),
1755        Value::Float(value) => Some(*value),
1756        Value::Text(value) => value.parse().ok(),
1757        _ => None,
1758    }
1759}
1760
1761fn queue_like_matches(value: &str, pattern: &str) -> bool {
1762    if pattern == "%" {
1763        return true;
1764    }
1765    let starts_wild = pattern.starts_with('%');
1766    let ends_wild = pattern.ends_with('%');
1767    let needle = pattern.trim_matches('%');
1768    match (starts_wild, ends_wild) {
1769        (true, true) => value.contains(needle),
1770        (true, false) => value.ends_with(needle),
1771        (false, true) => value.starts_with(needle),
1772        (false, false) => value == needle,
1773    }
1774}
1775
1776pub(super) fn queue_message_view_by_id(
1777    store: &UnifiedStore,
1778    queue: &str,
1779    message_id: EntityId,
1780) -> RedDBResult<Option<QueueMessageView>> {
1781    let manager = queue_manager(store, queue)?;
1782    Ok(manager
1783        .get(message_id)
1784        .and_then(queue_message_view_from_entity))
1785}
1786
1787pub(super) fn sort_queue_messages(
1788    messages: &mut [QueueMessageView],
1789    config: &QueueRuntimeConfig,
1790    side: QueueSide,
1791) {
1792    messages.sort_by(|left, right| {
1793        if config.priority {
1794            right
1795                .priority
1796                .cmp(&left.priority)
1797                .then_with(|| match side {
1798                    QueueSide::Left => left.position.cmp(&right.position),
1799                    QueueSide::Right => right.position.cmp(&left.position),
1800                })
1801                .then_with(|| left.id.raw().cmp(&right.id.raw()))
1802        } else {
1803            match side {
1804                QueueSide::Left => left.position.cmp(&right.position),
1805                QueueSide::Right => right.position.cmp(&left.position),
1806            }
1807            .then_with(|| left.id.raw().cmp(&right.id.raw()))
1808        }
1809    });
1810}
1811
1812pub(super) fn next_queue_position(
1813    store: &UnifiedStore,
1814    queue: &str,
1815    side: QueueSide,
1816) -> RedDBResult<u64> {
1817    let messages = load_queue_message_views(store, queue)?;
1818    if messages.is_empty() {
1819        return Ok(QUEUE_POSITION_CENTER);
1820    }
1821    match side {
1822        QueueSide::Left => Ok(messages
1823            .iter()
1824            .map(|message| message.position)
1825            .min()
1826            .unwrap_or(QUEUE_POSITION_CENTER)
1827            .saturating_sub(1)),
1828        QueueSide::Right => Ok(messages
1829            .iter()
1830            .map(|message| message.position)
1831            .max()
1832            .unwrap_or(QUEUE_POSITION_CENTER)
1833            .saturating_add(1)),
1834    }
1835}
1836
1837pub(super) fn increment_queue_attempts(
1838    store: &UnifiedStore,
1839    queue: &str,
1840    message_id: EntityId,
1841) -> RedDBResult<u32> {
1842    let manager = queue_manager(store, queue)?;
1843    let mut entity = manager
1844        .get(message_id)
1845        .ok_or_else(|| RedDBError::NotFound(format!("message '{}' not found", message_id.raw())))?;
1846    match &mut entity.data {
1847        EntityData::QueueMessage(message) => {
1848            message.attempts = message.attempts.saturating_add(1);
1849            let attempts = message.attempts;
1850            manager
1851                .update(entity)
1852                .map_err(|err| RedDBError::Internal(err.to_string()))?;
1853            Ok(attempts)
1854        }
1855        _ => Err(RedDBError::Query(format!(
1856            "entity '{}' is not a queue message",
1857            message_id.raw()
1858        ))),
1859    }
1860}
1861
1862pub(super) fn queue_message_attempts(
1863    store: &UnifiedStore,
1864    queue: &str,
1865    message_id: EntityId,
1866) -> RedDBResult<u32> {
1867    Ok(queue_message_data(store, queue, message_id)?.attempts)
1868}
1869
1870pub(super) fn queue_message_max_attempts(
1871    store: &UnifiedStore,
1872    queue: &str,
1873    message_id: EntityId,
1874) -> RedDBResult<u32> {
1875    Ok(queue_message_data(store, queue, message_id)?.max_attempts)
1876}
1877
1878pub(super) fn queue_message_payload(
1879    store: &UnifiedStore,
1880    queue: &str,
1881    message_id: EntityId,
1882) -> RedDBResult<Value> {
1883    Ok(queue_message_data(store, queue, message_id)?.payload)
1884}
1885
1886pub(super) fn queue_message_pending_any(
1887    store: &UnifiedStore,
1888    queue: &str,
1889    message_id: EntityId,
1890) -> RedDBResult<bool> {
1891    Ok(!load_pending_entries(store, queue, None, Some(message_id))?.is_empty())
1892}
1893
1894pub(super) fn queue_message_pending_for_group(
1895    store: &UnifiedStore,
1896    queue: &str,
1897    group: &str,
1898    message_id: EntityId,
1899) -> RedDBResult<bool> {
1900    Ok(!load_pending_entries(store, queue, Some(group), Some(message_id))?.is_empty())
1901}
1902
1903pub(super) fn queue_message_acked_for_group(
1904    store: &UnifiedStore,
1905    queue: &str,
1906    group: &str,
1907    message_id: EntityId,
1908) -> RedDBResult<bool> {
1909    Ok(!load_ack_entries(store, queue, Some(group), Some(message_id))?.is_empty())
1910}
1911
1912fn queue_manager(
1913    store: &UnifiedStore,
1914    queue: &str,
1915) -> RedDBResult<Arc<crate::storage::unified::SegmentManager>> {
1916    store
1917        .get_collection(queue)
1918        .ok_or_else(|| RedDBError::NotFound(format!("queue '{}' not found", queue)))
1919}
1920
1921pub(super) fn queue_message_data(
1922    store: &UnifiedStore,
1923    queue: &str,
1924    message_id: EntityId,
1925) -> RedDBResult<QueueMessageData> {
1926    let manager = queue_manager(store, queue)?;
1927    let entity = manager
1928        .get(message_id)
1929        .ok_or_else(|| RedDBError::NotFound(format!("message '{}' not found", message_id.raw())))?;
1930    match entity.data {
1931        EntityData::QueueMessage(message) => Ok(message),
1932        _ => Err(RedDBError::Query(format!(
1933            "entity '{}' is not a queue message",
1934            message_id.raw()
1935        ))),
1936    }
1937}
1938
1939fn insert_meta_row(store: &UnifiedStore, fields: HashMap<String, Value>) -> RedDBResult<()> {
1940    let _ = store.get_or_create_collection(QUEUE_META_COLLECTION);
1941    store
1942        .insert_auto(
1943            QUEUE_META_COLLECTION,
1944            UnifiedEntity::new(
1945                EntityId::new(0),
1946                EntityKind::TableRow {
1947                    table: Arc::from(QUEUE_META_COLLECTION),
1948                    row_id: 0,
1949                },
1950                EntityData::Row(RowData {
1951                    columns: Vec::new(),
1952                    named: Some(fields),
1953                    schema: None,
1954                }),
1955            ),
1956        )
1957        .map_err(|err| RedDBError::Internal(err.to_string()))?;
1958    Ok(())
1959}
1960
1961pub(super) fn remove_meta_rows(store: &UnifiedStore, predicate: impl Fn(&RowData) -> bool + Sync) {
1962    let Some(manager) = store.get_collection(QUEUE_META_COLLECTION) else {
1963        return;
1964    };
1965    let rows = manager.query_all(|entity| entity.data.as_row().is_some_and(&predicate));
1966    for row in rows {
1967        let _ = store.delete(QUEUE_META_COLLECTION, row.id);
1968    }
1969}
1970
1971pub(super) fn delete_meta_entity(store: &UnifiedStore, entity_id: EntityId) {
1972    let _ = store.delete(QUEUE_META_COLLECTION, entity_id);
1973}
1974
1975fn queue_message_lock_key(queue: &str, message_id: EntityId) -> String {
1976    format!("{queue}:{}", message_id.raw())
1977}
1978
1979pub(super) fn queue_message_lock_handle(
1980    runtime: &RedDBRuntime,
1981    queue: &str,
1982    message_id: EntityId,
1983) -> Arc<parking_lot::Mutex<()>> {
1984    let key = queue_message_lock_key(queue, message_id);
1985    if let Some(lock) = runtime.inner.queue_message_locks.read().get(&key).cloned() {
1986        return lock;
1987    }
1988
1989    let mut locks = runtime.inner.queue_message_locks.write();
1990    locks
1991        .entry(key)
1992        .or_insert_with(|| Arc::new(parking_lot::Mutex::new(())))
1993        .clone()
1994}
1995
1996pub(super) fn forget_queue_message_lock(runtime: &RedDBRuntime, queue: &str, message_id: EntityId) {
1997    runtime
1998        .inner
1999        .queue_message_locks
2000        .write()
2001        .remove(&queue_message_lock_key(queue, message_id));
2002}
2003
2004fn parse_message_id(value: &str) -> RedDBResult<EntityId> {
2005    let raw = value.strip_prefix('e').unwrap_or(value);
2006    raw.parse::<u64>()
2007        .map(EntityId::new)
2008        .map_err(|_| RedDBError::Query(format!("invalid message id '{}'", value)))
2009}
2010
2011fn message_id_string(message_id: EntityId) -> String {
2012    message_id.raw().to_string()
2013}
2014
2015pub(super) fn row_text(row: &RowData, field: &str) -> Option<String> {
2016    match row.get_field(field)?.clone() {
2017        Value::Text(value) => Some(value.to_string()),
2018        Value::NodeRef(value) => Some(value),
2019        Value::EdgeRef(value) => Some(value),
2020        Value::TableRef(value) => Some(value),
2021        _ => None,
2022    }
2023}
2024
2025pub(super) fn row_u64(row: &RowData, field: &str) -> Option<u64> {
2026    match row.get_field(field)?.clone() {
2027        Value::UnsignedInteger(value) => Some(value),
2028        Value::Integer(value) if value >= 0 => Some(value as u64),
2029        Value::Float(value) if value >= 0.0 => Some(value as u64),
2030        Value::Text(value) => value.parse().ok(),
2031        _ => None,
2032    }
2033}
2034
2035fn row_bool(row: &RowData, field: &str) -> Option<bool> {
2036    match row.get_field(field)?.clone() {
2037        Value::Boolean(value) => Some(value),
2038        Value::Text(value) => match value.to_ascii_lowercase().as_str() {
2039            "true" => Some(true),
2040            "false" => Some(false),
2041            _ => None,
2042        },
2043        _ => None,
2044    }
2045}
2046
2047fn queue_collection_contract(
2048    name: &str,
2049    priority: bool,
2050    ttl_ms: Option<u64>,
2051) -> crate::physical::CollectionContract {
2052    let now = current_unix_ms();
2053    let mut context_index_fields = Vec::new();
2054    if priority {
2055        context_index_fields.push("priority".to_string());
2056    }
2057
2058    crate::physical::CollectionContract {
2059        name: name.to_string(),
2060        declared_model: crate::catalog::CollectionModel::Queue,
2061        schema_mode: crate::catalog::SchemaMode::Dynamic,
2062        origin: crate::physical::ContractOrigin::Explicit,
2063        version: 1,
2064        created_at_unix_ms: now,
2065        updated_at_unix_ms: now,
2066        default_ttl_ms: ttl_ms,
2067        vector_dimension: None,
2068        vector_metric: None,
2069        context_index_fields,
2070        declared_columns: Vec::new(),
2071        table_def: None,
2072        timestamps_enabled: false,
2073        context_index_enabled: false,
2074        metrics_raw_retention_ms: None,
2075        metrics_rollup_policies: Vec::new(),
2076        metrics_tenant_identity: None,
2077        metrics_namespace: None,
2078        // Queues manipulate messages via push/pop/ack — the row DML
2079        // paths never apply. Flag it as append_only so inadvertent
2080        // `UPDATE/DELETE FROM queue_name` statements fail loudly.
2081        append_only: true,
2082        subscriptions: Vec::new(),
2083    }
2084}
2085
2086fn current_unix_ms() -> u128 {
2087    std::time::SystemTime::now()
2088        .duration_since(std::time::UNIX_EPOCH)
2089        .unwrap_or_default()
2090        .as_millis()
2091}
2092
2093pub(super) fn now_ns() -> u64 {
2094    std::time::SystemTime::now()
2095        .duration_since(std::time::UNIX_EPOCH)
2096        .unwrap_or_default()
2097        .as_nanos() as u64
2098}
2099
2100pub(super) fn queue_message_ttl_metadata(ttl_ms: u64) -> Metadata {
2101    Metadata::with_fields(
2102        [(
2103            "_ttl_ms".to_string(),
2104            if ttl_ms <= i64::MAX as u64 {
2105                MetadataValue::Int(ttl_ms as i64)
2106            } else {
2107                MetadataValue::Timestamp(ttl_ms)
2108            },
2109        )]
2110        .into_iter()
2111        .collect(),
2112    )
2113}
2114
2115/// Rough payload byte estimate for outbox watermark tracking.
2116fn estimate_payload_bytes(payload: &Value) -> u64 {
2117    match payload {
2118        Value::Json(v) => v.len() as u64,
2119        Value::Text(s) => s.len() as u64,
2120        _ => 64,
2121    }
2122}