Skip to main content

reddb_server/runtime/
impl_queue.rs

1//! Queue DDL and command execution
2
3use std::collections::{HashMap, HashSet};
4use std::sync::atomic::{AtomicU64, Ordering};
5use std::sync::Arc;
6
7use crate::runtime::audit_log::{AuditAuthSource, AuditEvent, AuditFieldEscaper, Outcome};
8use crate::storage::queue::QueueMode;
9use crate::storage::unified::entity::{QueueMessageData, RowData};
10use crate::storage::unified::{Metadata, MetadataValue, UnifiedStore};
11
12use super::*;
13
14// ---------------------------------------------------------------------------
15// Outbox metrics (exposed via /metrics)
16// ---------------------------------------------------------------------------
17
18/// Total event push attempts that failed (queue full or other error) and
19/// triggered DLQ routing.
20pub static EVENTS_DRAIN_RETRIES_TOTAL: AtomicU64 = AtomicU64::new(0);
21
22/// Total events routed to the dead-letter queue.
23pub static EVENTS_DLQ_TOTAL: AtomicU64 = AtomicU64::new(0);
24
25/// Total events successfully enqueued to their target queue.
26pub static EVENTS_ENQUEUED_TOTAL: AtomicU64 = AtomicU64::new(0);
27
28/// Warn when total estimated outbox payload bytes exceed this value (1 GiB).
29const OUTBOX_WARN_BYTES: u64 = 1 << 30;
30
31/// Route all new events to DLQ when estimated outbox exceeds this value (10 GiB).
32const OUTBOX_MAX_BYTES: u64 = 10 * (1 << 30);
33
34/// Running estimate of bytes pending in event queues (approximate; not decremented on consume).
35static OUTBOX_APPROX_BYTES: AtomicU64 = AtomicU64::new(0);
36
37const QUEUE_META_COLLECTION: &str = "red_queue_meta";
38const QUEUE_POSITION_CENTER: u64 = u64::MAX / 2;
39const WORK_DEFAULT_GROUP: &str = "_work_default";
40const FANOUT_GROUP_PREFIX: &str = "_fanout_";
41
42#[derive(Debug, Clone)]
43pub(super) struct QueueRuntimeConfig {
44    pub(super) mode: QueueMode,
45    pub(super) priority: bool,
46    pub(super) max_size: Option<usize>,
47    pub(super) ttl_ms: Option<u64>,
48    pub(super) dlq: Option<String>,
49    pub(super) max_attempts: u32,
50    pub(super) lock_deadline_ms: u64,
51    pub(super) in_flight_cap_per_group: u32,
52}
53
54#[derive(Debug, Clone)]
55struct QueueGroupEntry {
56    entity_id: EntityId,
57    group: String,
58}
59
60#[derive(Debug, Clone)]
61pub(super) struct QueuePendingEntry {
62    pub(super) entity_id: EntityId,
63    group: String,
64    pub(super) message_id: EntityId,
65    consumer: String,
66    pub(super) delivered_at_ns: u64,
67    pub(super) delivery_count: u32,
68}
69
70#[derive(Debug, Clone)]
71pub(super) struct QueueAckEntry {
72    entity_id: EntityId,
73    group: String,
74    pub(super) message_id: EntityId,
75}
76
77#[derive(Debug, Clone)]
78pub(super) struct QueueMessageView {
79    pub(super) id: EntityId,
80    position: u64,
81    priority: i32,
82    pub(super) payload: Value,
83    attempts: u32,
84    pub(super) max_attempts: u32,
85    enqueued_at_ns: u64,
86}
87
88impl RedDBRuntime {
89    pub(crate) fn enqueue_event_payload(
90        &self,
91        queue: &str,
92        payload: Value,
93    ) -> RedDBResult<EntityId> {
94        let store = self.inner.db.store();
95        // Auto-create the queue if it does not exist yet.
96        if store.get_collection(queue).is_none() {
97            crate::runtime::impl_ddl::ensure_event_target_queue_pub(self, queue)?;
98        }
99
100        // Estimate payload bytes for outbox watermark checks.
101        let payload_bytes = estimate_payload_bytes(&payload);
102        let outbox_bytes = OUTBOX_APPROX_BYTES.fetch_add(payload_bytes, Ordering::Relaxed);
103
104        // Hard limit: route directly to DLQ without even trying.
105        if outbox_bytes > OUTBOX_MAX_BYTES {
106            OUTBOX_APPROX_BYTES.fetch_sub(payload_bytes, Ordering::Relaxed);
107            EVENTS_DRAIN_RETRIES_TOTAL.fetch_add(1, Ordering::Relaxed);
108            return self.route_event_to_outbox_dlq(queue, payload, "outbox_max_bytes_exceeded");
109        }
110
111        // Soft limit: warn once per crossing.
112        if outbox_bytes > OUTBOX_WARN_BYTES && outbox_bytes - payload_bytes <= OUTBOX_WARN_BYTES {
113            tracing::warn!(
114                outbox_bytes,
115                warn_threshold = OUTBOX_WARN_BYTES,
116                "event outbox approaching capacity warning threshold"
117            );
118            crate::telemetry::operator_event::OperatorEvent::OutboxDlqActivated {
119                queue: queue.to_string(),
120                dlq: format!("{queue}_outbox_dlq"),
121                reason: "outbox_warn_bytes_exceeded".to_string(),
122            }
123            .emit_global();
124        }
125
126        let config = load_queue_config(store.as_ref(), queue);
127
128        // If the target queue has a max_size and is full, route to DLQ.
129        if let Some(max_size) = config.max_size {
130            let current_len = load_queue_message_views(store.as_ref(), queue)
131                .unwrap_or_default()
132                .len();
133            if current_len >= max_size {
134                OUTBOX_APPROX_BYTES.fetch_sub(payload_bytes, Ordering::Relaxed);
135                EVENTS_DRAIN_RETRIES_TOTAL.fetch_add(1, Ordering::Relaxed);
136                return self.route_event_to_outbox_dlq(queue, payload, "queue_full");
137            }
138            // Warn at 80% capacity.
139            if current_len * 10 >= max_size * 8 {
140                tracing::warn!(
141                    queue = %queue,
142                    size = current_len,
143                    max = max_size,
144                    "event target queue near capacity"
145                );
146            }
147        }
148
149        let id = self.enqueue_event_payload_raw(store.as_ref(), queue, &config, payload)?;
150        EVENTS_ENQUEUED_TOTAL.fetch_add(1, Ordering::Relaxed);
151        Ok(id)
152    }
153
154    /// Route a failed event to `<queue>_outbox_dlq`, auto-creating it if needed.
155    fn route_event_to_outbox_dlq(
156        &self,
157        queue: &str,
158        payload: Value,
159        reason: &str,
160    ) -> RedDBResult<EntityId> {
161        let dlq_name = format!("{queue}_outbox_dlq");
162        EVENTS_DLQ_TOTAL.fetch_add(1, Ordering::Relaxed);
163
164        crate::telemetry::operator_event::OperatorEvent::OutboxDlqActivated {
165            queue: queue.to_string(),
166            dlq: dlq_name.clone(),
167            reason: reason.to_string(),
168        }
169        .emit_global();
170
171        let store = self.inner.db.store();
172        if store.get_collection(&dlq_name).is_none() {
173            crate::runtime::impl_ddl::ensure_event_target_queue_pub(self, &dlq_name)?;
174        }
175        let dlq_config = load_queue_config(store.as_ref(), &dlq_name);
176        let id = self.enqueue_event_payload_raw(store.as_ref(), &dlq_name, &dlq_config, payload)?;
177        EVENTS_ENQUEUED_TOTAL.fetch_add(1, Ordering::Relaxed);
178        Ok(id)
179    }
180
181    /// Low-level event message insert — no size checks, no DLQ routing.
182    fn enqueue_event_payload_raw(
183        &self,
184        store: &UnifiedStore,
185        queue: &str,
186        config: &QueueRuntimeConfig,
187        payload: Value,
188    ) -> RedDBResult<EntityId> {
189        let position = next_queue_position(store, queue, QueueSide::Right)?;
190        let mut entity = UnifiedEntity::new(
191            EntityId::new(0),
192            EntityKind::QueueMessage {
193                queue: queue.to_string(),
194                position,
195            },
196            EntityData::QueueMessage(QueueMessageData {
197                payload,
198                priority: None,
199                enqueued_at_ns: now_ns(),
200                attempts: 0,
201                max_attempts: config.max_attempts,
202                acked: false,
203            }),
204        );
205        if let Some(xid) = self.current_xid() {
206            entity.set_xmin(xid);
207        }
208        let id = store
209            .insert_auto(queue, entity)
210            .map_err(|err| RedDBError::Internal(err.to_string()))?;
211        if let Some(ttl_ms) = config.ttl_ms {
212            store
213                .set_metadata(queue, id, queue_message_ttl_metadata(ttl_ms))
214                .map_err(|err| RedDBError::Internal(err.to_string()))?;
215        }
216        self.invalidate_result_cache_for_table(queue);
217        Ok(id)
218    }
219
220    pub fn execute_create_queue(
221        &self,
222        raw_query: &str,
223        query: &CreateQueueQuery,
224    ) -> RedDBResult<RuntimeQueryResult> {
225        self.check_write(crate::runtime::write_gate::WriteKind::Ddl)?;
226        if query.dlq.as_deref() == Some(query.name.as_str()) {
227            return Err(RedDBError::Query(
228                "dead-letter queue must be different from the source queue".to_string(),
229            ));
230        }
231
232        let store = self.inner.db.store();
233        let exists = store.get_collection(&query.name).is_some();
234        if exists {
235            if query.if_not_exists {
236                return Ok(RuntimeQueryResult::ok_message(
237                    raw_query.to_string(),
238                    &format!("queue '{}' already exists", query.name),
239                    "create",
240                ));
241            }
242            return Err(RedDBError::Query(format!(
243                "queue '{}' already exists",
244                query.name
245            )));
246        }
247
248        store
249            .create_collection(&query.name)
250            .map_err(|err| RedDBError::Internal(err.to_string()))?;
251        if let Some(ttl_ms) = query.ttl_ms {
252            self.inner
253                .db
254                .set_collection_default_ttl_ms(&query.name, ttl_ms);
255        }
256        self.inner
257            .db
258            .save_collection_contract(queue_collection_contract(
259                &query.name,
260                query.priority,
261                query.ttl_ms,
262            ))
263            .map_err(|err| RedDBError::Internal(err.to_string()))?;
264        save_queue_config(
265            store.as_ref(),
266            &query.name,
267            &QueueRuntimeConfig {
268                mode: query.mode,
269                priority: query.priority,
270                max_size: query.max_size,
271                ttl_ms: query.ttl_ms,
272                dlq: query.dlq.clone(),
273                max_attempts: query.max_attempts,
274                lock_deadline_ms: query.lock_deadline_ms,
275                in_flight_cap_per_group: query.in_flight_cap_per_group,
276            },
277        )?;
278
279        if let Some(dlq) = &query.dlq {
280            if store.get_collection(dlq).is_none() {
281                store
282                    .create_collection(dlq)
283                    .map_err(|err| RedDBError::Internal(err.to_string()))?;
284                self.inner
285                    .db
286                    .save_collection_contract(queue_collection_contract(dlq, false, None))
287                    .map_err(|err| RedDBError::Internal(err.to_string()))?;
288            }
289        }
290
291        self.invalidate_result_cache();
292        self.inner
293            .db
294            .persist_metadata()
295            .map_err(|err| RedDBError::Internal(err.to_string()))?;
296        // Issue #120 — feed the queue into the schema-vocabulary so
297        // AskPipeline (#121) can resolve queue references. Queues
298        // have an opaque payload column, so we expose `payload` and
299        // (when configured) the DLQ partner as type-tag context.
300        let mut type_tags = Vec::new();
301        if let Some(dlq) = &query.dlq {
302            type_tags.push(format!("dlq:{}", dlq));
303        }
304        self.schema_vocabulary_apply(
305            crate::runtime::schema_vocabulary::DdlEvent::CreateCollection {
306                collection: query.name.clone(),
307                columns: vec!["payload".to_string()],
308                type_tags,
309                description: None,
310            },
311        );
312
313        let mut msg = format!("queue '{}' created", query.name);
314        msg.push_str(&format!(" (mode={})", query.mode.as_str()));
315        if query.priority {
316            msg.push_str(" (priority)");
317        }
318        if let Some(max_size) = query.max_size {
319            msg.push_str(&format!(" (max_size={max_size})"));
320        }
321        if let Some(ttl_ms) = query.ttl_ms {
322            msg.push_str(&format!(" (ttl={ttl_ms}ms)"));
323        }
324        if let Some(dlq) = &query.dlq {
325            msg.push_str(&format!(
326                " (dlq={dlq}, max_attempts={})",
327                query.max_attempts
328            ));
329        }
330
331        Ok(RuntimeQueryResult::ok_message(
332            raw_query.to_string(),
333            &msg,
334            "create",
335        ))
336    }
337
338    pub fn execute_alter_queue(
339        &self,
340        raw_query: &str,
341        query: &AlterQueueQuery,
342    ) -> RedDBResult<RuntimeQueryResult> {
343        self.check_write(crate::runtime::write_gate::WriteKind::Ddl)?;
344        let store = self.inner.db.store();
345        ensure_queue_exists(store.as_ref(), &query.name)?;
346
347        let mut config = load_queue_config(store.as_ref(), &query.name);
348        let mut summary: Vec<String> = Vec::new();
349
350        if let Some(new_mode) = query.mode {
351            let pending =
352                load_pending_entries(store.as_ref(), &query.name, None, None).unwrap_or_default();
353            if !pending.is_empty() {
354                tracing::warn!(
355                    queue = %query.name,
356                    pending_count = pending.len(),
357                    new_mode = %new_mode.as_str(),
358                    "ALTER QUEUE SET MODE: {} in-flight messages will drain with old mode; \
359                     new reads use {}",
360                    pending.len(),
361                    new_mode.as_str(),
362                );
363            }
364            config.mode = new_mode;
365            summary.push(format!("mode={}", new_mode.as_str()));
366        }
367        if let Some(max_attempts) = query.max_attempts {
368            config.max_attempts = max_attempts;
369            summary.push(format!("max_attempts={max_attempts}"));
370        }
371        if let Some(lock_deadline_ms) = query.lock_deadline_ms {
372            config.lock_deadline_ms = lock_deadline_ms;
373            summary.push(format!("lock_deadline_ms={lock_deadline_ms}"));
374        }
375        if let Some(in_flight_cap) = query.in_flight_cap_per_group {
376            config.in_flight_cap_per_group = in_flight_cap;
377            summary.push(format!("in_flight_cap_per_group={in_flight_cap}"));
378        }
379        if let Some(dlq) = &query.dlq {
380            if dlq == &query.name {
381                return Err(RedDBError::Query(
382                    "dead-letter queue must be different from the source queue".to_string(),
383                ));
384            }
385            config.dlq = Some(dlq.clone());
386            summary.push(format!("dlq={dlq}"));
387        }
388
389        save_queue_config(store.as_ref(), &query.name, &config)?;
390
391        self.invalidate_result_cache();
392        self.inner
393            .db
394            .persist_metadata()
395            .map_err(|err| RedDBError::Internal(err.to_string()))?;
396
397        Ok(RuntimeQueryResult::ok_message(
398            raw_query.to_string(),
399            &format!("queue '{}' altered: {}", query.name, summary.join(", ")),
400            "alter",
401        ))
402    }
403
404    pub fn execute_drop_queue(
405        &self,
406        raw_query: &str,
407        query: &DropQueueQuery,
408    ) -> RedDBResult<RuntimeQueryResult> {
409        self.check_write(crate::runtime::write_gate::WriteKind::Ddl)?;
410        let store = self.inner.db.store();
411        if super::impl_ddl::is_system_schema_name(&query.name) {
412            return Err(RedDBError::Query("system schema is read-only".to_string()));
413        }
414        if store.get_collection(&query.name).is_none() {
415            if query.if_exists {
416                return Ok(RuntimeQueryResult::ok_message(
417                    raw_query.to_string(),
418                    &format!("queue '{}' does not exist", query.name),
419                    "drop",
420                ));
421            }
422            return Err(RedDBError::NotFound(format!(
423                "queue '{}' not found",
424                query.name
425            )));
426        }
427        let actual = crate::runtime::ddl::polymorphic_resolver::resolve(
428            &query.name,
429            &self.inner.db.catalog_model_snapshot(),
430        )?;
431        crate::runtime::ddl::polymorphic_resolver::ensure_model_match(
432            crate::catalog::CollectionModel::Queue,
433            actual,
434        )?;
435
436        store
437            .drop_collection(&query.name)
438            .map_err(|err| RedDBError::Internal(err.to_string()))?;
439        self.inner.db.clear_collection_default_ttl_ms(&query.name);
440        self.inner
441            .db
442            .remove_collection_contract(&query.name)
443            .map_err(|err| RedDBError::Internal(err.to_string()))?;
444        remove_queue_metadata(store.as_ref(), &query.name);
445        self.invalidate_result_cache();
446        self.inner
447            .db
448            .persist_metadata()
449            .map_err(|err| RedDBError::Internal(err.to_string()))?;
450        // Issue #120 — invalidate the schema-vocabulary entry.
451        self.schema_vocabulary_apply(
452            crate::runtime::schema_vocabulary::DdlEvent::DropCollection {
453                collection: query.name.clone(),
454            },
455        );
456
457        Ok(RuntimeQueryResult::ok_message(
458            raw_query.to_string(),
459            &format!("queue '{}' dropped", query.name),
460            "drop",
461        ))
462    }
463
464    pub fn execute_queue_command(
465        &self,
466        raw_query: &str,
467        cmd: &QueueCommand,
468    ) -> RedDBResult<RuntimeQueryResult> {
469        self.check_write(crate::runtime::write_gate::WriteKind::Dml)?;
470        match cmd {
471            QueueCommand::Push {
472                queue,
473                value,
474                side,
475                priority,
476            } => {
477                let store = self.inner.db.store();
478                ensure_queue_exists(store.as_ref(), queue)?;
479                let config = load_queue_config(store.as_ref(), queue);
480                if priority.is_some() && !config.priority {
481                    return Err(RedDBError::Query(format!(
482                        "queue '{}' is not a priority queue",
483                        queue
484                    )));
485                }
486                if let Some(max_size) = config.max_size {
487                    let current_len =
488                        load_queue_message_views_with_runtime(Some(self), store.as_ref(), queue)?
489                            .len();
490                    if current_len >= max_size {
491                        return Err(RedDBError::Query(format!(
492                            "queue '{}' is full (max_size={max_size})",
493                            queue
494                        )));
495                    }
496                }
497
498                let position = next_queue_position(store.as_ref(), queue, *side)?;
499                let mut entity = UnifiedEntity::new(
500                    EntityId::new(0),
501                    EntityKind::QueueMessage {
502                        queue: queue.clone(),
503                        position,
504                    },
505                    EntityData::QueueMessage(QueueMessageData {
506                        payload: value.clone(),
507                        priority: if config.priority { *priority } else { None },
508                        enqueued_at_ns: now_ns(),
509                        attempts: 0,
510                        max_attempts: config.max_attempts,
511                        acked: false,
512                    }),
513                );
514                // Phase 1.1 MVCC universal: stamp xmin so other
515                // connections don't see this message until COMMIT.
516                if let Some(xid) = self.current_xid() {
517                    entity.set_xmin(xid);
518                }
519                let id = store
520                    .insert_auto(queue, entity)
521                    .map_err(|err| RedDBError::Internal(err.to_string()))?;
522                if let Some(ttl_ms) = config.ttl_ms {
523                    store
524                        .set_metadata(queue, id, queue_message_ttl_metadata(ttl_ms))
525                        .map_err(|err| RedDBError::Internal(err.to_string()))?;
526                }
527                self.invalidate_result_cache();
528
529                let mut result = UnifiedResult::with_columns(vec![
530                    "message_id".into(),
531                    "side".into(),
532                    "queue".into(),
533                ]);
534                let mut record = UnifiedRecord::new();
535                record.set("message_id", Value::text(message_id_string(id)));
536                record.set(
537                    "side",
538                    Value::text(match side {
539                        QueueSide::Left => "left".to_string(),
540                        QueueSide::Right => "right".to_string(),
541                    }),
542                );
543                record.set("queue", Value::text(queue.clone()));
544                result.push(record);
545
546                Ok(RuntimeQueryResult {
547                    query: raw_query.to_string(),
548                    mode: QueryMode::Sql,
549                    statement: "queue_push",
550                    engine: "runtime-queue",
551                    result,
552                    affected_rows: 1,
553                    statement_type: "insert",
554                })
555            }
556            QueueCommand::Pop { queue, side, count } => {
557                let store = self.inner.db.store();
558                ensure_queue_exists(store.as_ref(), queue)?;
559                let popped = super::queue_delivery::pop_messages(
560                    self,
561                    store.as_ref(),
562                    queue,
563                    *side,
564                    *count,
565                )?;
566
567                let mut result =
568                    UnifiedResult::with_columns(vec!["message_id".into(), "payload".into()]);
569                for message in &popped {
570                    let mut record = UnifiedRecord::new();
571                    record.set(
572                        "message_id",
573                        Value::text(message_id_string(message.message_id)),
574                    );
575                    record.set("payload", message.payload.clone());
576                    result.push(record);
577                }
578                let popped_count = popped.len() as u64;
579                if popped_count > 0 {
580                    self.invalidate_result_cache();
581                }
582
583                Ok(RuntimeQueryResult {
584                    query: raw_query.to_string(),
585                    mode: QueryMode::Sql,
586                    statement: "queue_pop",
587                    engine: "runtime-queue",
588                    result,
589                    affected_rows: popped_count,
590                    statement_type: "delete",
591                })
592            }
593            QueueCommand::Peek { queue, count } => {
594                let store = self.inner.db.store();
595                ensure_queue_exists(store.as_ref(), queue)?;
596                let messages =
597                    super::queue_delivery::peek_messages(self, store.as_ref(), queue, *count)?;
598
599                let mut result =
600                    UnifiedResult::with_columns(vec!["message_id".into(), "payload".into()]);
601                for message in messages {
602                    let mut record = UnifiedRecord::new();
603                    record.set(
604                        "message_id",
605                        Value::text(message_id_string(message.message_id)),
606                    );
607                    record.set("payload", message.payload);
608                    result.push(record);
609                }
610
611                Ok(RuntimeQueryResult {
612                    query: raw_query.to_string(),
613                    mode: QueryMode::Sql,
614                    statement: "queue_peek",
615                    engine: "runtime-queue",
616                    result,
617                    affected_rows: 0,
618                    statement_type: "select",
619                })
620            }
621            QueueCommand::Len { queue } => {
622                let store = self.inner.db.store();
623                ensure_queue_exists(store.as_ref(), queue)?;
624                let count =
625                    load_queue_message_views_with_runtime(Some(self), store.as_ref(), queue)?.len()
626                        as u64;
627                let mut result = UnifiedResult::with_columns(vec!["len".into()]);
628                let mut record = UnifiedRecord::new();
629                record.set("len", Value::UnsignedInteger(count));
630                result.push(record);
631
632                Ok(RuntimeQueryResult {
633                    query: raw_query.to_string(),
634                    mode: QueryMode::Sql,
635                    statement: "queue_len",
636                    engine: "runtime-queue",
637                    result,
638                    affected_rows: 0,
639                    statement_type: "select",
640                })
641            }
642            QueueCommand::Purge { queue } => {
643                let store = self.inner.db.store();
644                ensure_queue_exists(store.as_ref(), queue)?;
645                let count = super::queue_delivery::purge_messages(self, store.as_ref(), queue)?;
646                if count > 0 {
647                    self.invalidate_result_cache();
648                }
649
650                Ok(RuntimeQueryResult::ok_message(
651                    raw_query.to_string(),
652                    &format!("{count} messages purged from queue '{queue}'"),
653                    "delete",
654                ))
655            }
656            QueueCommand::GroupCreate { queue, group } => {
657                let store = self.inner.db.store();
658                ensure_queue_exists(store.as_ref(), queue)?;
659                if queue_group_exists(store.as_ref(), queue, group)? {
660                    return Ok(RuntimeQueryResult::ok_message(
661                        raw_query.to_string(),
662                        &format!(
663                            "consumer group '{}' already exists on queue '{}'",
664                            group, queue
665                        ),
666                        "create",
667                    ));
668                }
669                save_queue_group(store.as_ref(), queue, group)?;
670                self.invalidate_result_cache();
671
672                Ok(RuntimeQueryResult::ok_message(
673                    raw_query.to_string(),
674                    &format!("consumer group '{}' created on queue '{}'", group, queue),
675                    "create",
676                ))
677            }
678            QueueCommand::GroupRead {
679                queue,
680                group,
681                consumer,
682                count,
683            } => {
684                let store = self.inner.db.store();
685                ensure_queue_exists(store.as_ref(), queue)?;
686                let delivered = super::queue_delivery::read_messages(
687                    self,
688                    store.as_ref(),
689                    queue,
690                    group.as_deref(),
691                    consumer,
692                    *count,
693                )?;
694
695                let mut result = UnifiedResult::with_columns(vec![
696                    "message_id".into(),
697                    "payload".into(),
698                    "consumer".into(),
699                    "delivery_count".into(),
700                    "attempts".into(),
701                ]);
702
703                for message in delivered {
704                    let mut record = UnifiedRecord::new();
705                    record.set(
706                        "message_id",
707                        Value::text(message_id_string(message.message_id)),
708                    );
709                    record.set("payload", message.payload);
710                    record.set("consumer", Value::text(message.consumer));
711                    record.set(
712                        "delivery_count",
713                        Value::UnsignedInteger(u64::from(message.delivery_count)),
714                    );
715                    record.set(
716                        "attempts",
717                        Value::UnsignedInteger(u64::from(message.delivery_count)),
718                    );
719                    result.push(record);
720                }
721                if !result.records.is_empty() {
722                    self.invalidate_result_cache();
723                }
724
725                Ok(RuntimeQueryResult {
726                    query: raw_query.to_string(),
727                    mode: QueryMode::Sql,
728                    statement: "queue_group_read",
729                    engine: "runtime-queue",
730                    result,
731                    affected_rows: 0,
732                    statement_type: "select",
733                })
734            }
735            QueueCommand::Pending { queue, group } => {
736                let store = self.inner.db.store();
737                ensure_queue_exists(store.as_ref(), queue)?;
738                require_queue_group(store.as_ref(), queue, group)?;
739                let mut pending = load_pending_entries(store.as_ref(), queue, Some(group), None)?;
740                pending.sort_by_key(|entry| entry.delivered_at_ns);
741                let current_time_ns = now_ns();
742
743                let mut result = UnifiedResult::with_columns(vec![
744                    "message_id".into(),
745                    "consumer".into(),
746                    "delivered_at_ns".into(),
747                    "delivery_count".into(),
748                    "idle_ms".into(),
749                ]);
750                for entry in pending {
751                    let mut record = UnifiedRecord::new();
752                    record.set(
753                        "message_id",
754                        Value::text(message_id_string(entry.message_id)),
755                    );
756                    record.set("consumer", Value::text(entry.consumer));
757                    record.set(
758                        "delivered_at_ns",
759                        Value::UnsignedInteger(entry.delivered_at_ns),
760                    );
761                    record.set(
762                        "delivery_count",
763                        Value::UnsignedInteger(u64::from(entry.delivery_count)),
764                    );
765                    record.set(
766                        "idle_ms",
767                        Value::UnsignedInteger(
768                            current_time_ns.saturating_sub(entry.delivered_at_ns) / 1_000_000,
769                        ),
770                    );
771                    result.push(record);
772                }
773
774                Ok(RuntimeQueryResult {
775                    query: raw_query.to_string(),
776                    mode: QueryMode::Sql,
777                    statement: "queue_pending",
778                    engine: "runtime-queue",
779                    result,
780                    affected_rows: 0,
781                    statement_type: "select",
782                })
783            }
784            QueueCommand::Claim {
785                queue,
786                group,
787                consumer,
788                min_idle_ms,
789            } => {
790                let store = self.inner.db.store();
791                ensure_queue_exists(store.as_ref(), queue)?;
792                let delivered = super::queue_delivery::claim_messages(
793                    self,
794                    store.as_ref(),
795                    queue,
796                    group,
797                    consumer,
798                    *min_idle_ms,
799                )?;
800
801                let mut result = UnifiedResult::with_columns(vec![
802                    "message_id".into(),
803                    "payload".into(),
804                    "consumer".into(),
805                    "delivery_count".into(),
806                ]);
807
808                for message in delivered {
809                    let mut record = UnifiedRecord::new();
810                    record.set(
811                        "message_id",
812                        Value::text(message_id_string(message.message_id)),
813                    );
814                    record.set("payload", message.payload);
815                    record.set("consumer", Value::text(message.consumer));
816                    record.set(
817                        "delivery_count",
818                        Value::UnsignedInteger(u64::from(message.delivery_count)),
819                    );
820                    result.push(record);
821                }
822                if !result.records.is_empty() {
823                    self.invalidate_result_cache();
824                }
825                let affected_rows = result.records.len() as u64;
826
827                Ok(RuntimeQueryResult {
828                    query: raw_query.to_string(),
829                    mode: QueryMode::Sql,
830                    statement: "queue_claim",
831                    engine: "runtime-queue",
832                    result,
833                    affected_rows,
834                    statement_type: "update",
835                })
836            }
837            QueueCommand::Ack {
838                queue,
839                group,
840                message_id,
841                delivery_id,
842            } => {
843                let store = self.inner.db.store();
844                ensure_queue_exists(store.as_ref(), queue)?;
845                let (group_owned, message_entity) = resolve_ack_nack_handle(
846                    store.as_ref(),
847                    queue,
848                    group,
849                    message_id,
850                    delivery_id.as_deref(),
851                )?;
852                let group_ref = group_owned.as_str();
853                require_queue_group(store.as_ref(), queue, group_ref)?;
854                let config = load_queue_config(store.as_ref(), queue);
855                super::queue_delivery::ack_message(
856                    self,
857                    store.as_ref(),
858                    queue,
859                    group_ref,
860                    message_entity,
861                    &config,
862                )?;
863                self.invalidate_result_cache();
864
865                Ok(RuntimeQueryResult::ok_message(
866                    raw_query.to_string(),
867                    "message acknowledged",
868                    "update",
869                ))
870            }
871            QueueCommand::Nack {
872                queue,
873                group,
874                message_id,
875                delivery_id,
876            } => {
877                let store = self.inner.db.store();
878                ensure_queue_exists(store.as_ref(), queue)?;
879                let (group_owned, message_entity) = resolve_ack_nack_handle(
880                    store.as_ref(),
881                    queue,
882                    group,
883                    message_id,
884                    delivery_id.as_deref(),
885                )?;
886                let group_ref = group_owned.as_str();
887                require_queue_group(store.as_ref(), queue, group_ref)?;
888                let config = load_queue_config(store.as_ref(), queue);
889                let message = match super::queue_delivery::nack_message(
890                    self,
891                    store.as_ref(),
892                    queue,
893                    group_ref,
894                    message_entity,
895                    &config,
896                )? {
897                    super::queue_delivery::NackOutcome::Requeued => "message requeued".to_string(),
898                    super::queue_delivery::NackOutcome::MovedToDlq(dlq) => {
899                        format!("message moved to dead-letter queue '{}'", dlq)
900                    }
901                    super::queue_delivery::NackOutcome::Dropped => {
902                        "message dropped after max attempts".to_string()
903                    }
904                };
905                self.invalidate_result_cache();
906
907                Ok(RuntimeQueryResult::ok_message(
908                    raw_query.to_string(),
909                    &message,
910                    "update",
911                ))
912            }
913            QueueCommand::Move {
914                source,
915                destination,
916                filter,
917                limit,
918            } => self.execute_queue_move(raw_query, source, destination, filter.as_ref(), *limit),
919        }
920    }
921
922    pub fn execute_queue_select(
923        &self,
924        raw_query: &str,
925        query: &QueueSelectQuery,
926    ) -> RedDBResult<RuntimeQueryResult> {
927        let store = self.inner.db.store();
928        ensure_queue_exists(store.as_ref(), &query.queue)?;
929        let config = load_queue_config(store.as_ref(), &query.queue);
930        let dlq = queue_is_dead_letter_target(store.as_ref(), &query.queue);
931        let columns = if query.columns.is_empty() {
932            queue_projection_default_columns()
933        } else {
934            query.columns.clone()
935        };
936
937        let mut messages =
938            load_queue_message_views_with_runtime(Some(self), store.as_ref(), &query.queue)?;
939        sort_queue_messages(&mut messages, &config, QueueSide::Left);
940
941        let mut result = UnifiedResult::with_columns(columns.clone());
942        for message in messages {
943            if query
944                .filter
945                .as_ref()
946                .is_some_and(|filter| !queue_message_matches_filter(&message, dlq, filter))
947            {
948                continue;
949            }
950            let record = queue_projection_record(&columns, &message, dlq)?;
951            result.push(record);
952            if query
953                .limit
954                .is_some_and(|limit| result.records.len() >= limit as usize)
955            {
956                break;
957            }
958        }
959
960        Ok(RuntimeQueryResult {
961            query: raw_query.to_string(),
962            mode: QueryMode::Sql,
963            statement: "queue_select",
964            engine: "runtime-queue",
965            result,
966            affected_rows: 0,
967            statement_type: "select",
968        })
969    }
970
971    fn execute_queue_move(
972        &self,
973        raw_query: &str,
974        source: &str,
975        destination: &str,
976        filter: Option<&Filter>,
977        limit: usize,
978    ) -> RedDBResult<RuntimeQueryResult> {
979        if source == destination {
980            return Err(RedDBError::Query(
981                "QUEUE MOVE source and destination must be different".to_string(),
982            ));
983        }
984        let store = self.inner.db.store();
985        ensure_queue_exists(store.as_ref(), source)?;
986        ensure_queue_exists(store.as_ref(), destination)?;
987        let source_config = load_queue_config(store.as_ref(), source);
988        let destination_config = load_queue_config(store.as_ref(), destination);
989        let source_dlq = queue_is_dead_letter_target(store.as_ref(), source);
990
991        let mut messages =
992            load_queue_message_views_with_runtime(Some(self), store.as_ref(), source)?;
993        sort_queue_messages(&mut messages, &source_config, QueueSide::Left);
994        let selected = messages
995            .into_iter()
996            .filter(|message| {
997                filter
998                    .map(|f| queue_message_matches_filter(message, source_dlq, f))
999                    .unwrap_or(true)
1000            })
1001            .take(limit)
1002            .collect::<Vec<_>>();
1003
1004        if let Some(max_size) = destination_config.max_size {
1005            let current_len =
1006                load_queue_message_views_with_runtime(Some(self), store.as_ref(), destination)?
1007                    .len();
1008            if current_len + selected.len() > max_size {
1009                return Err(RedDBError::Query(format!(
1010                    "queue '{}' is full (max_size={max_size})",
1011                    destination
1012                )));
1013            }
1014        }
1015
1016        for message in &selected {
1017            let lock = queue_message_lock_handle(self, source, message.id);
1018            let Some(_guard) = lock.try_lock() else {
1019                return Err(RedDBError::Query(format!(
1020                    "message '{}' is locked on queue '{}'",
1021                    message.id.raw(),
1022                    source
1023                )));
1024            };
1025            if queue_message_view_by_id(store.as_ref(), source, message.id)?.is_none() {
1026                return Err(RedDBError::Query(format!(
1027                    "message '{}' is no longer available on queue '{}'",
1028                    message.id.raw(),
1029                    source
1030                )));
1031            }
1032        }
1033
1034        let mut inserted = Vec::new();
1035        for message in &selected {
1036            match insert_moved_queue_message(
1037                store.as_ref(),
1038                destination,
1039                &destination_config,
1040                message,
1041            ) {
1042                Ok(id) => inserted.push(id),
1043                Err(err) => {
1044                    for id in inserted {
1045                        let _ = store.delete(destination, id);
1046                    }
1047                    return Err(err);
1048                }
1049            }
1050        }
1051
1052        for message in &selected {
1053            super::queue_delivery::delete_message_with_state(
1054                Some(self),
1055                store.as_ref(),
1056                source,
1057                message.id,
1058            )?;
1059        }
1060        if !selected.is_empty() {
1061            self.invalidate_result_cache();
1062        }
1063
1064        let selected_count = selected.len() as u64;
1065        self.audit_log().record_event(
1066            AuditEvent::builder("queue/move")
1067                .source(AuditAuthSource::System)
1068                .outcome(Outcome::Success)
1069                .resource(format!("queue:{source}->{destination}"))
1070                .fields([
1071                    AuditFieldEscaper::field("source", source),
1072                    AuditFieldEscaper::field("destination", destination),
1073                    AuditFieldEscaper::field("selected", selected_count),
1074                    AuditFieldEscaper::field("committed", selected_count),
1075                ])
1076                .build(),
1077        );
1078
1079        let mut result = UnifiedResult::with_columns(vec![
1080            "source".into(),
1081            "destination".into(),
1082            "selected".into(),
1083            "committed".into(),
1084        ]);
1085        let mut record = UnifiedRecord::new();
1086        record.set("source", Value::text(source.to_string()));
1087        record.set("destination", Value::text(destination.to_string()));
1088        record.set("selected", Value::UnsignedInteger(selected_count));
1089        record.set("committed", Value::UnsignedInteger(selected_count));
1090        result.push(record);
1091
1092        Ok(RuntimeQueryResult {
1093            query: raw_query.to_string(),
1094            mode: QueryMode::Sql,
1095            statement: "queue_move",
1096            engine: "runtime-queue",
1097            result,
1098            affected_rows: selected_count,
1099            statement_type: "update",
1100        })
1101    }
1102}
1103
1104fn ensure_queue_exists(store: &UnifiedStore, queue: &str) -> RedDBResult<()> {
1105    if store.get_collection(queue).is_some() {
1106        Ok(())
1107    } else {
1108        Err(RedDBError::NotFound(format!("queue '{}' not found", queue)))
1109    }
1110}
1111
1112pub(super) fn load_queue_config(store: &UnifiedStore, queue: &str) -> QueueRuntimeConfig {
1113    let default = QueueRuntimeConfig {
1114        mode: QueueMode::Work,
1115        priority: false,
1116        max_size: None,
1117        ttl_ms: None,
1118        dlq: None,
1119        max_attempts: crate::storage::query::DEFAULT_QUEUE_MAX_ATTEMPTS,
1120        lock_deadline_ms: crate::storage::query::DEFAULT_QUEUE_LOCK_DEADLINE_MS,
1121        in_flight_cap_per_group: crate::storage::query::DEFAULT_QUEUE_IN_FLIGHT_CAP_PER_GROUP,
1122    };
1123
1124    let Some(manager) = store.get_collection(QUEUE_META_COLLECTION) else {
1125        return default;
1126    };
1127    manager
1128        .query_all(|entity| {
1129            entity.data.as_row().is_some_and(|row| {
1130                row_text(row, "kind").as_deref() == Some("queue_config")
1131                    && row_text(row, "queue").as_deref() == Some(queue)
1132            })
1133        })
1134        .into_iter()
1135        .find_map(|entity| {
1136            let row = entity.data.as_row()?;
1137            Some(QueueRuntimeConfig {
1138                mode: row_text(row, "mode")
1139                    .as_deref()
1140                    .and_then(QueueMode::parse)
1141                    .unwrap_or_default(),
1142                priority: row_bool(row, "priority").unwrap_or(false),
1143                max_size: row_u64(row, "max_size").map(|value| value as usize),
1144                ttl_ms: row_u64(row, "ttl_ms"),
1145                dlq: row_text(row, "dlq"),
1146                max_attempts: row_u64(row, "max_attempts")
1147                    .map(|value| value as u32)
1148                    .unwrap_or(crate::storage::query::DEFAULT_QUEUE_MAX_ATTEMPTS),
1149                lock_deadline_ms: row_u64(row, "lock_deadline_ms")
1150                    .unwrap_or(crate::storage::query::DEFAULT_QUEUE_LOCK_DEADLINE_MS),
1151                in_flight_cap_per_group: row_u64(row, "in_flight_cap_per_group")
1152                    .map(|value| value as u32)
1153                    .unwrap_or(crate::storage::query::DEFAULT_QUEUE_IN_FLIGHT_CAP_PER_GROUP),
1154            })
1155        })
1156        .unwrap_or(default)
1157}
1158
1159pub(super) fn queue_mode_str(store: &UnifiedStore, queue: &str) -> &'static str {
1160    load_queue_config(store, queue).mode.as_str()
1161}
1162
1163fn save_queue_config(
1164    store: &UnifiedStore,
1165    queue: &str,
1166    config: &QueueRuntimeConfig,
1167) -> RedDBResult<()> {
1168    remove_meta_rows(store, |row| {
1169        row_text(row, "kind").as_deref() == Some("queue_config")
1170            && row_text(row, "queue").as_deref() == Some(queue)
1171    });
1172
1173    let mut fields = HashMap::new();
1174    fields.insert("kind".to_string(), Value::text("queue_config".to_string()));
1175    fields.insert("queue".to_string(), Value::text(queue.to_string()));
1176    fields.insert(
1177        "mode".to_string(),
1178        Value::text(config.mode.as_str().to_string()),
1179    );
1180    fields.insert("priority".to_string(), Value::Boolean(config.priority));
1181    fields.insert(
1182        "max_size".to_string(),
1183        config
1184            .max_size
1185            .map(|value| Value::UnsignedInteger(value as u64))
1186            .unwrap_or(Value::Null),
1187    );
1188    fields.insert(
1189        "ttl_ms".to_string(),
1190        config
1191            .ttl_ms
1192            .map(Value::UnsignedInteger)
1193            .unwrap_or(Value::Null),
1194    );
1195    fields.insert(
1196        "dlq".to_string(),
1197        config.dlq.clone().map(Value::text).unwrap_or(Value::Null),
1198    );
1199    fields.insert(
1200        "max_attempts".to_string(),
1201        Value::UnsignedInteger(u64::from(config.max_attempts)),
1202    );
1203    fields.insert(
1204        "lock_deadline_ms".to_string(),
1205        Value::UnsignedInteger(config.lock_deadline_ms),
1206    );
1207    fields.insert(
1208        "in_flight_cap_per_group".to_string(),
1209        Value::UnsignedInteger(u64::from(config.in_flight_cap_per_group)),
1210    );
1211    insert_meta_row(store, fields)
1212}
1213
1214fn remove_queue_metadata(store: &UnifiedStore, queue: &str) {
1215    remove_meta_rows(store, |row| {
1216        row_text(row, "queue").as_deref() == Some(queue)
1217    });
1218}
1219
1220fn queue_group_exists(store: &UnifiedStore, queue: &str, group: &str) -> RedDBResult<bool> {
1221    Ok(load_queue_groups(store, queue)?
1222        .into_iter()
1223        .any(|entry| entry.group == group))
1224}
1225
1226pub(super) fn require_queue_group(
1227    store: &UnifiedStore,
1228    queue: &str,
1229    group: &str,
1230) -> RedDBResult<()> {
1231    if queue_group_exists(store, queue, group)? {
1232        Ok(())
1233    } else {
1234        Err(RedDBError::NotFound(format!(
1235            "consumer group '{}' not found on queue '{}'",
1236            group, queue
1237        )))
1238    }
1239}
1240
1241pub(super) fn resolve_read_group(
1242    store: &UnifiedStore,
1243    queue: &str,
1244    group: Option<&str>,
1245    consumer: &str,
1246    config: &QueueRuntimeConfig,
1247) -> RedDBResult<String> {
1248    if let Some(group) = group {
1249        require_queue_group(store, queue, group)?;
1250        return Ok(group.to_string());
1251    }
1252
1253    match config.mode {
1254        QueueMode::Work => {
1255            if !queue_group_exists(store, queue, WORK_DEFAULT_GROUP)? {
1256                save_queue_group(store, queue, WORK_DEFAULT_GROUP)?;
1257            }
1258            Ok(WORK_DEFAULT_GROUP.to_string())
1259        }
1260        QueueMode::Fanout => {
1261            let fanout_group = format!("{FANOUT_GROUP_PREFIX}{consumer}");
1262            if !queue_group_exists(store, queue, &fanout_group)? {
1263                save_queue_group(store, queue, &fanout_group)?;
1264            }
1265            Ok(fanout_group)
1266        }
1267    }
1268}
1269
1270fn load_queue_groups(store: &UnifiedStore, queue: &str) -> RedDBResult<Vec<QueueGroupEntry>> {
1271    let Some(manager) = store.get_collection(QUEUE_META_COLLECTION) else {
1272        return Ok(Vec::new());
1273    };
1274    Ok(manager
1275        .query_all(|entity| {
1276            entity.data.as_row().is_some_and(|row| {
1277                row_text(row, "kind").as_deref() == Some("queue_group")
1278                    && row_text(row, "queue").as_deref() == Some(queue)
1279            })
1280        })
1281        .into_iter()
1282        .filter_map(|entity| {
1283            let row = entity.data.as_row()?;
1284            Some(QueueGroupEntry {
1285                entity_id: entity.id,
1286                group: row_text(row, "group")?,
1287            })
1288        })
1289        .collect())
1290}
1291
1292fn save_queue_group(store: &UnifiedStore, queue: &str, group: &str) -> RedDBResult<()> {
1293    let mut fields = HashMap::new();
1294    fields.insert("kind".to_string(), Value::text("queue_group".to_string()));
1295    fields.insert("queue".to_string(), Value::text(queue.to_string()));
1296    fields.insert("group".to_string(), Value::text(group.to_string()));
1297    fields.insert(
1298        "created_at_ns".to_string(),
1299        Value::UnsignedInteger(now_ns()),
1300    );
1301    insert_meta_row(store, fields)
1302}
1303
1304pub(super) fn load_pending_entries(
1305    store: &UnifiedStore,
1306    queue: &str,
1307    group: Option<&str>,
1308    message_id: Option<EntityId>,
1309) -> RedDBResult<Vec<QueuePendingEntry>> {
1310    let Some(manager) = store.get_collection(QUEUE_META_COLLECTION) else {
1311        return Ok(Vec::new());
1312    };
1313    Ok(manager
1314        .query_all(|entity| {
1315            entity.data.as_row().is_some_and(|row| {
1316                row_text(row, "kind").as_deref() == Some("queue_pending")
1317                    && row_text(row, "queue").as_deref() == Some(queue)
1318                    && group
1319                        .map(|group_name| row_text(row, "group").as_deref() == Some(group_name))
1320                        .unwrap_or(true)
1321                    && message_id
1322                        .map(|candidate| row_u64(row, "message_id") == Some(candidate.raw()))
1323                        .unwrap_or(true)
1324            })
1325        })
1326        .into_iter()
1327        .filter_map(|entity| {
1328            let row = entity.data.as_row()?;
1329            Some(QueuePendingEntry {
1330                entity_id: entity.id,
1331                group: row_text(row, "group")?,
1332                message_id: EntityId::new(row_u64(row, "message_id")?),
1333                consumer: row_text(row, "consumer")?,
1334                delivered_at_ns: row_u64(row, "delivered_at_ns")?,
1335                delivery_count: row_u64(row, "delivery_count")
1336                    .map(|value| value as u32)
1337                    .unwrap_or(1),
1338            })
1339        })
1340        .collect())
1341}
1342
1343pub(super) fn save_queue_pending(
1344    store: &UnifiedStore,
1345    queue: &str,
1346    group: &str,
1347    message_id: EntityId,
1348    consumer: &str,
1349    delivered_at_ns: u64,
1350    delivery_count: u32,
1351) -> RedDBResult<()> {
1352    remove_meta_rows(store, |row| {
1353        row_text(row, "kind").as_deref() == Some("queue_pending")
1354            && row_text(row, "queue").as_deref() == Some(queue)
1355            && row_text(row, "group").as_deref() == Some(group)
1356            && row_u64(row, "message_id") == Some(message_id.raw())
1357    });
1358
1359    let mut fields = HashMap::new();
1360    fields.insert("kind".to_string(), Value::text("queue_pending".to_string()));
1361    fields.insert("queue".to_string(), Value::text(queue.to_string()));
1362    fields.insert("group".to_string(), Value::text(group.to_string()));
1363    fields.insert(
1364        "message_id".to_string(),
1365        Value::UnsignedInteger(message_id.raw()),
1366    );
1367    fields.insert("consumer".to_string(), Value::text(consumer.to_string()));
1368    fields.insert(
1369        "delivered_at_ns".to_string(),
1370        Value::UnsignedInteger(delivered_at_ns),
1371    );
1372    fields.insert(
1373        "delivery_count".to_string(),
1374        Value::UnsignedInteger(u64::from(delivery_count)),
1375    );
1376    insert_meta_row(store, fields)
1377}
1378
1379pub(super) fn require_pending_entry(
1380    store: &UnifiedStore,
1381    queue: &str,
1382    group: &str,
1383    message_id: EntityId,
1384) -> RedDBResult<QueuePendingEntry> {
1385    load_pending_entries(store, queue, Some(group), Some(message_id))?
1386        .into_iter()
1387        .next()
1388        .ok_or_else(|| {
1389            RedDBError::NotFound(format!(
1390                "message '{}' is not pending in group '{}' on queue '{}'",
1391                message_id.raw(),
1392                group,
1393                queue
1394            ))
1395        })
1396}
1397
1398pub(super) fn load_ack_entries(
1399    store: &UnifiedStore,
1400    queue: &str,
1401    group: Option<&str>,
1402    message_id: Option<EntityId>,
1403) -> RedDBResult<Vec<QueueAckEntry>> {
1404    let Some(manager) = store.get_collection(QUEUE_META_COLLECTION) else {
1405        return Ok(Vec::new());
1406    };
1407    Ok(manager
1408        .query_all(|entity| {
1409            entity.data.as_row().is_some_and(|row| {
1410                row_text(row, "kind").as_deref() == Some("queue_ack")
1411                    && row_text(row, "queue").as_deref() == Some(queue)
1412                    && group
1413                        .map(|group_name| row_text(row, "group").as_deref() == Some(group_name))
1414                        .unwrap_or(true)
1415                    && message_id
1416                        .map(|candidate| row_u64(row, "message_id") == Some(candidate.raw()))
1417                        .unwrap_or(true)
1418            })
1419        })
1420        .into_iter()
1421        .filter_map(|entity| {
1422            let row = entity.data.as_row()?;
1423            Some(QueueAckEntry {
1424                entity_id: entity.id,
1425                group: row_text(row, "group")?,
1426                message_id: EntityId::new(row_u64(row, "message_id")?),
1427            })
1428        })
1429        .collect())
1430}
1431
1432pub(super) fn save_queue_ack(
1433    store: &UnifiedStore,
1434    queue: &str,
1435    group: &str,
1436    message_id: EntityId,
1437) -> RedDBResult<()> {
1438    let existing = load_ack_entries(store, queue, Some(group), Some(message_id))?;
1439    if !existing.is_empty() {
1440        return Ok(());
1441    }
1442
1443    let mut fields = HashMap::new();
1444    fields.insert("kind".to_string(), Value::text("queue_ack".to_string()));
1445    fields.insert("queue".to_string(), Value::text(queue.to_string()));
1446    fields.insert("group".to_string(), Value::text(group.to_string()));
1447    fields.insert(
1448        "message_id".to_string(),
1449        Value::UnsignedInteger(message_id.raw()),
1450    );
1451    fields.insert("acked_at_ns".to_string(), Value::UnsignedInteger(now_ns()));
1452    insert_meta_row(store, fields)
1453}
1454
1455pub(super) fn queue_message_completed_for_all_groups(
1456    store: &UnifiedStore,
1457    queue: &str,
1458    message_id: EntityId,
1459) -> RedDBResult<bool> {
1460    let groups = load_queue_groups(store, queue)?;
1461    let pending = load_pending_entries(store, queue, None, Some(message_id))?;
1462    if !pending.is_empty() {
1463        return Ok(false);
1464    }
1465    if groups.is_empty() {
1466        return Ok(true);
1467    }
1468
1469    let acked_groups = load_ack_entries(store, queue, None, Some(message_id))?
1470        .into_iter()
1471        .map(|entry| entry.group)
1472        .collect::<HashSet<_>>();
1473    Ok(groups
1474        .into_iter()
1475        .all(|group| acked_groups.contains(&group.group)))
1476}
1477
1478fn load_queue_message_views(
1479    store: &UnifiedStore,
1480    queue: &str,
1481) -> RedDBResult<Vec<QueueMessageView>> {
1482    load_queue_message_views_with_runtime(None, store, queue)
1483}
1484
1485/// Kind-aware queue scan (Phase 2.5.5 RLS universal). When the
1486/// caller has a `RedDBRuntime` reference, the gate also applies
1487/// any `CREATE POLICY ... ON MESSAGES OF <queue>` predicate. In
1488/// autocommit / embedded paths that only have the raw store (e.g.
1489/// purge loops) we skip RLS because there's no session identity
1490/// to match against.
1491pub(super) fn load_queue_message_views_with_runtime(
1492    runtime: Option<&RedDBRuntime>,
1493    store: &UnifiedStore,
1494    queue: &str,
1495) -> RedDBResult<Vec<QueueMessageView>> {
1496    let manager = store
1497        .get_collection(queue)
1498        .ok_or_else(|| RedDBError::NotFound(format!("queue '{}' not found", queue)))?;
1499    // Phase 1.2 MVCC universal: capture before parallel scan. Messages
1500    // inserted by another connection's open txn stay invisible to
1501    // consumers until that txn commits (prevents phantom POPs).
1502    let snap_ctx = crate::runtime::impl_core::capture_current_snapshot();
1503    let rls_filter = runtime.and_then(|rt| {
1504        crate::runtime::impl_core::rls_policy_filter_for_kind(
1505            rt,
1506            queue,
1507            crate::storage::query::ast::PolicyAction::Select,
1508            crate::storage::query::ast::PolicyTargetKind::Messages,
1509        )
1510    });
1511    let rls_enabled_but_denied = runtime.map(|rt| rt.is_rls_enabled(queue)).unwrap_or(false)
1512        && rls_filter.is_none()
1513        && runtime.is_some();
1514    if rls_enabled_but_denied {
1515        // RLS on + no Messages policy for this role = deny-default.
1516        return Ok(Vec::new());
1517    }
1518    let filter_arc = rls_filter.map(std::sync::Arc::new);
1519    let rt_arc = runtime;
1520    Ok(manager
1521        .query_all(move |entity| {
1522            if !matches!(entity.kind, EntityKind::QueueMessage { .. }) {
1523                return false;
1524            }
1525            if !crate::runtime::impl_core::entity_visible_with_context(snap_ctx.as_ref(), entity) {
1526                return false;
1527            }
1528            if let (Some(filter), Some(rt)) = (filter_arc.as_ref(), rt_arc) {
1529                return crate::runtime::query_exec::evaluate_entity_filter_with_db(
1530                    Some(&rt.inner.db),
1531                    entity,
1532                    filter,
1533                    queue,
1534                    queue,
1535                );
1536            }
1537            true
1538        })
1539        .into_iter()
1540        .filter_map(queue_message_view_from_entity)
1541        .collect())
1542}
1543
1544fn queue_message_view_from_entity(entity: UnifiedEntity) -> Option<QueueMessageView> {
1545    let (position, _) = match &entity.kind {
1546        EntityKind::QueueMessage { position, queue } => (*position, queue),
1547        _ => return None,
1548    };
1549    let data = match entity.data {
1550        EntityData::QueueMessage(data) => data,
1551        _ => return None,
1552    };
1553    Some(QueueMessageView {
1554        id: entity.id,
1555        position,
1556        priority: data.priority.unwrap_or(0),
1557        payload: data.payload,
1558        attempts: data.attempts,
1559        max_attempts: data.max_attempts,
1560        enqueued_at_ns: data.enqueued_at_ns,
1561    })
1562}
1563
1564fn insert_moved_queue_message(
1565    store: &UnifiedStore,
1566    queue: &str,
1567    config: &QueueRuntimeConfig,
1568    message: &QueueMessageView,
1569) -> RedDBResult<EntityId> {
1570    let position = next_queue_position(store, queue, QueueSide::Right)?;
1571    let entity = UnifiedEntity::new(
1572        EntityId::new(0),
1573        EntityKind::QueueMessage {
1574            queue: queue.to_string(),
1575            position,
1576        },
1577        EntityData::QueueMessage(QueueMessageData {
1578            payload: message.payload.clone(),
1579            priority: if config.priority {
1580                Some(message.priority)
1581            } else {
1582                None
1583            },
1584            enqueued_at_ns: message.enqueued_at_ns,
1585            attempts: message.attempts,
1586            max_attempts: message.max_attempts,
1587            acked: false,
1588        }),
1589    );
1590    let id = store
1591        .insert_auto(queue, entity)
1592        .map_err(|err| RedDBError::Internal(err.to_string()))?;
1593    if let Some(ttl_ms) = config.ttl_ms {
1594        store
1595            .set_metadata(queue, id, queue_message_ttl_metadata(ttl_ms))
1596            .map_err(|err| RedDBError::Internal(err.to_string()))?;
1597    }
1598    Ok(id)
1599}
1600
1601fn queue_projection_default_columns() -> Vec<String> {
1602    [
1603        "id",
1604        "payload",
1605        "priority",
1606        "attempts",
1607        "last_error",
1608        "enqueued_at",
1609        "available_at",
1610        "dlq",
1611        "tenant",
1612    ]
1613    .into_iter()
1614    .map(str::to_string)
1615    .collect()
1616}
1617
1618fn queue_projection_record(
1619    columns: &[String],
1620    message: &QueueMessageView,
1621    dlq: bool,
1622) -> RedDBResult<UnifiedRecord> {
1623    let mut record = UnifiedRecord::new();
1624    for column in columns {
1625        let value = queue_projection_value(message, dlq, column).ok_or_else(|| {
1626            RedDBError::Query(format!("unknown queue projection column '{}'", column))
1627        })?;
1628        record.set(column, value);
1629    }
1630    Ok(record)
1631}
1632
1633fn queue_projection_value(message: &QueueMessageView, dlq: bool, column: &str) -> Option<Value> {
1634    match column {
1635        "id" => Some(Value::text(message_id_string(message.id))),
1636        "payload" => Some(message.payload.clone()),
1637        "priority" => Some(Value::Integer(i64::from(message.priority))),
1638        "attempts" => Some(Value::UnsignedInteger(u64::from(message.attempts))),
1639        "last_error" => Some(Value::Null),
1640        "enqueued_at" => Some(Value::UnsignedInteger(message.enqueued_at_ns)),
1641        "available_at" => Some(Value::UnsignedInteger(message.enqueued_at_ns)),
1642        "dlq" => Some(Value::Boolean(dlq)),
1643        "tenant" => queue_message_tenant(&message.payload).or(Some(Value::Null)),
1644        _ => None,
1645    }
1646}
1647
1648fn queue_message_tenant(payload: &Value) -> Option<Value> {
1649    let Value::Json(bytes) = payload else {
1650        return None;
1651    };
1652    let json: crate::json::Value = crate::json::from_slice(bytes).ok()?;
1653    json.get("tenant")
1654        .and_then(crate::json::Value::as_str)
1655        .map(|tenant| Value::text(tenant.to_string()))
1656}
1657
1658fn queue_is_dead_letter_target(store: &UnifiedStore, queue: &str) -> bool {
1659    let Some(manager) = store.get_collection(QUEUE_META_COLLECTION) else {
1660        return false;
1661    };
1662    !manager
1663        .query_all(|entity| {
1664            entity.data.as_row().is_some_and(|row| {
1665                row_text(row, "kind").as_deref() == Some("queue_config")
1666                    && row_text(row, "dlq").as_deref() == Some(queue)
1667            })
1668        })
1669        .is_empty()
1670}
1671
1672fn queue_message_matches_filter(message: &QueueMessageView, dlq: bool, filter: &Filter) -> bool {
1673    match filter {
1674        Filter::Compare { field, op, value } => queue_filter_field_value(message, dlq, field)
1675            .is_some_and(|candidate| queue_compare_values(&candidate, value, *op)),
1676        Filter::CompareFields { left, op, right } => {
1677            match (
1678                queue_filter_field_value(message, dlq, left),
1679                queue_filter_field_value(message, dlq, right),
1680            ) {
1681                (Some(left), Some(right)) => queue_compare_values(&left, &right, *op),
1682                _ => false,
1683            }
1684        }
1685        Filter::And(left, right) => {
1686            queue_message_matches_filter(message, dlq, left)
1687                && queue_message_matches_filter(message, dlq, right)
1688        }
1689        Filter::Or(left, right) => {
1690            queue_message_matches_filter(message, dlq, left)
1691                || queue_message_matches_filter(message, dlq, right)
1692        }
1693        Filter::Not(inner) => !queue_message_matches_filter(message, dlq, inner),
1694        Filter::IsNull(field) => queue_filter_field_value(message, dlq, field)
1695            .is_none_or(|value| matches!(value, Value::Null)),
1696        Filter::IsNotNull(field) => queue_filter_field_value(message, dlq, field)
1697            .is_some_and(|value| !matches!(value, Value::Null)),
1698        Filter::In { field, values } => {
1699            queue_filter_field_value(message, dlq, field).is_some_and(|candidate| {
1700                values
1701                    .iter()
1702                    .any(|value| queue_values_equal(&candidate, value))
1703            })
1704        }
1705        Filter::Between { field, low, high } => queue_filter_field_value(message, dlq, field)
1706            .is_some_and(|candidate| {
1707                queue_compare_values(&candidate, low, CompareOp::Ge)
1708                    && queue_compare_values(&candidate, high, CompareOp::Le)
1709            }),
1710        Filter::Like { field, pattern } => queue_filter_text(message, dlq, field)
1711            .is_some_and(|value| queue_like_matches(&value, pattern)),
1712        Filter::StartsWith { field, prefix } => {
1713            queue_filter_text(message, dlq, field).is_some_and(|value| value.starts_with(prefix))
1714        }
1715        Filter::EndsWith { field, suffix } => {
1716            queue_filter_text(message, dlq, field).is_some_and(|value| value.ends_with(suffix))
1717        }
1718        Filter::Contains { field, substring } => {
1719            queue_filter_text(message, dlq, field).is_some_and(|value| value.contains(substring))
1720        }
1721        Filter::CompareExpr { .. } => false,
1722    }
1723}
1724
1725fn queue_filter_field_value(
1726    message: &QueueMessageView,
1727    dlq: bool,
1728    field: &FieldRef,
1729) -> Option<Value> {
1730    match field {
1731        FieldRef::TableColumn { table, column } if table.is_empty() => {
1732            queue_projection_value(message, dlq, column)
1733                .or_else(|| queue_payload_field_value(&message.payload, column))
1734        }
1735        FieldRef::TableColumn { column, .. } => queue_projection_value(message, dlq, column)
1736            .or_else(|| queue_payload_field_value(&message.payload, column)),
1737        _ => None,
1738    }
1739}
1740
1741fn queue_payload_field_value(payload: &Value, field: &str) -> Option<Value> {
1742    let Value::Json(bytes) = payload else {
1743        return None;
1744    };
1745    let json: crate::json::Value = crate::json::from_slice(bytes).ok()?;
1746    let value = json.get(field)?;
1747    json_value_to_schema_value(value)
1748}
1749
1750fn json_value_to_schema_value(value: &crate::json::Value) -> Option<Value> {
1751    if matches!(value, crate::json::Value::Null) {
1752        Some(Value::Null)
1753    } else if let Some(value) = value.as_bool() {
1754        Some(Value::Boolean(value))
1755    } else if let Some(value) = value.as_i64() {
1756        Some(Value::Integer(value))
1757    } else if let Some(value) = value.as_u64() {
1758        Some(Value::UnsignedInteger(value))
1759    } else if let Some(value) = value.as_f64() {
1760        Some(Value::Float(value))
1761    } else if let Some(value) = value.as_str() {
1762        Some(Value::text(value.to_string()))
1763    } else {
1764        Some(Value::Json(value.to_string_compact().into_bytes()))
1765    }
1766}
1767
1768fn queue_filter_text(message: &QueueMessageView, dlq: bool, field: &FieldRef) -> Option<String> {
1769    queue_filter_field_value(message, dlq, field).and_then(|value| match value {
1770        Value::Text(value) => Some(value.to_string()),
1771        Value::NodeRef(value) | Value::EdgeRef(value) | Value::TableRef(value) => Some(value),
1772        Value::Integer(value) => Some(value.to_string()),
1773        Value::UnsignedInteger(value) => Some(value.to_string()),
1774        Value::Float(value) => Some(value.to_string()),
1775        Value::Boolean(value) => Some(value.to_string()),
1776        _ => None,
1777    })
1778}
1779
1780fn queue_compare_values(left: &Value, right: &Value, op: CompareOp) -> bool {
1781    match op {
1782        CompareOp::Eq => queue_values_equal(left, right),
1783        CompareOp::Ne => !queue_values_equal(left, right),
1784        CompareOp::Lt => queue_partial_cmp(left, right).is_some_and(|ord| ord.is_lt()),
1785        CompareOp::Le => queue_partial_cmp(left, right).is_some_and(|ord| !ord.is_gt()),
1786        CompareOp::Gt => queue_partial_cmp(left, right).is_some_and(|ord| ord.is_gt()),
1787        CompareOp::Ge => queue_partial_cmp(left, right).is_some_and(|ord| !ord.is_lt()),
1788    }
1789}
1790
1791fn queue_values_equal(left: &Value, right: &Value) -> bool {
1792    if let (Some(left), Some(right)) = (queue_value_number(left), queue_value_number(right)) {
1793        return (left - right).abs() < f64::EPSILON;
1794    }
1795    match (left, right) {
1796        (Value::Text(left), Value::Text(right)) => left == right,
1797        (Value::Boolean(left), Value::Boolean(right)) => left == right,
1798        _ => left == right,
1799    }
1800}
1801
1802fn queue_partial_cmp(left: &Value, right: &Value) -> Option<std::cmp::Ordering> {
1803    if let (Some(left), Some(right)) = (queue_value_number(left), queue_value_number(right)) {
1804        return left.partial_cmp(&right);
1805    }
1806    match (left, right) {
1807        (Value::Text(left), Value::Text(right)) => Some(left.cmp(right)),
1808        _ => None,
1809    }
1810}
1811
1812fn queue_value_number(value: &Value) -> Option<f64> {
1813    match value {
1814        Value::Integer(value) => Some(*value as f64),
1815        Value::UnsignedInteger(value) => Some(*value as f64),
1816        Value::Float(value) => Some(*value),
1817        Value::Text(value) => value.parse().ok(),
1818        _ => None,
1819    }
1820}
1821
1822fn queue_like_matches(value: &str, pattern: &str) -> bool {
1823    if pattern == "%" {
1824        return true;
1825    }
1826    let starts_wild = pattern.starts_with('%');
1827    let ends_wild = pattern.ends_with('%');
1828    let needle = pattern.trim_matches('%');
1829    match (starts_wild, ends_wild) {
1830        (true, true) => value.contains(needle),
1831        (true, false) => value.ends_with(needle),
1832        (false, true) => value.starts_with(needle),
1833        (false, false) => value == needle,
1834    }
1835}
1836
1837pub(super) fn queue_message_view_by_id(
1838    store: &UnifiedStore,
1839    queue: &str,
1840    message_id: EntityId,
1841) -> RedDBResult<Option<QueueMessageView>> {
1842    let manager = queue_manager(store, queue)?;
1843    Ok(manager
1844        .get(message_id)
1845        .and_then(queue_message_view_from_entity))
1846}
1847
1848pub(super) fn sort_queue_messages(
1849    messages: &mut [QueueMessageView],
1850    config: &QueueRuntimeConfig,
1851    side: QueueSide,
1852) {
1853    messages.sort_by(|left, right| {
1854        if config.priority {
1855            right
1856                .priority
1857                .cmp(&left.priority)
1858                .then_with(|| match side {
1859                    QueueSide::Left => left.position.cmp(&right.position),
1860                    QueueSide::Right => right.position.cmp(&left.position),
1861                })
1862                .then_with(|| left.id.raw().cmp(&right.id.raw()))
1863        } else {
1864            match side {
1865                QueueSide::Left => left.position.cmp(&right.position),
1866                QueueSide::Right => right.position.cmp(&left.position),
1867            }
1868            .then_with(|| left.id.raw().cmp(&right.id.raw()))
1869        }
1870    });
1871}
1872
1873pub(super) fn next_queue_position(
1874    store: &UnifiedStore,
1875    queue: &str,
1876    side: QueueSide,
1877) -> RedDBResult<u64> {
1878    let messages = load_queue_message_views(store, queue)?;
1879    if messages.is_empty() {
1880        return Ok(QUEUE_POSITION_CENTER);
1881    }
1882    match side {
1883        QueueSide::Left => Ok(messages
1884            .iter()
1885            .map(|message| message.position)
1886            .min()
1887            .unwrap_or(QUEUE_POSITION_CENTER)
1888            .saturating_sub(1)),
1889        QueueSide::Right => Ok(messages
1890            .iter()
1891            .map(|message| message.position)
1892            .max()
1893            .unwrap_or(QUEUE_POSITION_CENTER)
1894            .saturating_add(1)),
1895    }
1896}
1897
1898pub(super) fn increment_queue_attempts(
1899    store: &UnifiedStore,
1900    queue: &str,
1901    message_id: EntityId,
1902) -> RedDBResult<u32> {
1903    let manager = queue_manager(store, queue)?;
1904    let mut entity = manager
1905        .get(message_id)
1906        .ok_or_else(|| RedDBError::NotFound(format!("message '{}' not found", message_id.raw())))?;
1907    match &mut entity.data {
1908        EntityData::QueueMessage(message) => {
1909            message.attempts = message.attempts.saturating_add(1);
1910            let attempts = message.attempts;
1911            manager
1912                .update(entity)
1913                .map_err(|err| RedDBError::Internal(err.to_string()))?;
1914            Ok(attempts)
1915        }
1916        _ => Err(RedDBError::Query(format!(
1917            "entity '{}' is not a queue message",
1918            message_id.raw()
1919        ))),
1920    }
1921}
1922
1923pub(super) fn queue_message_attempts(
1924    store: &UnifiedStore,
1925    queue: &str,
1926    message_id: EntityId,
1927) -> RedDBResult<u32> {
1928    Ok(queue_message_data(store, queue, message_id)?.attempts)
1929}
1930
1931pub(super) fn queue_message_max_attempts(
1932    store: &UnifiedStore,
1933    queue: &str,
1934    message_id: EntityId,
1935) -> RedDBResult<u32> {
1936    Ok(queue_message_data(store, queue, message_id)?.max_attempts)
1937}
1938
1939pub(super) fn queue_message_payload(
1940    store: &UnifiedStore,
1941    queue: &str,
1942    message_id: EntityId,
1943) -> RedDBResult<Value> {
1944    Ok(queue_message_data(store, queue, message_id)?.payload)
1945}
1946
1947pub(super) fn queue_message_pending_any(
1948    store: &UnifiedStore,
1949    queue: &str,
1950    message_id: EntityId,
1951) -> RedDBResult<bool> {
1952    Ok(!load_pending_entries(store, queue, None, Some(message_id))?.is_empty())
1953}
1954
1955pub(super) fn queue_message_pending_for_group(
1956    store: &UnifiedStore,
1957    queue: &str,
1958    group: &str,
1959    message_id: EntityId,
1960) -> RedDBResult<bool> {
1961    Ok(!load_pending_entries(store, queue, Some(group), Some(message_id))?.is_empty())
1962}
1963
1964pub(super) fn queue_message_acked_for_group(
1965    store: &UnifiedStore,
1966    queue: &str,
1967    group: &str,
1968    message_id: EntityId,
1969) -> RedDBResult<bool> {
1970    Ok(!load_ack_entries(store, queue, Some(group), Some(message_id))?.is_empty())
1971}
1972
1973fn queue_manager(
1974    store: &UnifiedStore,
1975    queue: &str,
1976) -> RedDBResult<Arc<crate::storage::unified::SegmentManager>> {
1977    store
1978        .get_collection(queue)
1979        .ok_or_else(|| RedDBError::NotFound(format!("queue '{}' not found", queue)))
1980}
1981
1982pub(super) fn queue_message_data(
1983    store: &UnifiedStore,
1984    queue: &str,
1985    message_id: EntityId,
1986) -> RedDBResult<QueueMessageData> {
1987    let manager = queue_manager(store, queue)?;
1988    let entity = manager
1989        .get(message_id)
1990        .ok_or_else(|| RedDBError::NotFound(format!("message '{}' not found", message_id.raw())))?;
1991    match entity.data {
1992        EntityData::QueueMessage(message) => Ok(message),
1993        _ => Err(RedDBError::Query(format!(
1994            "entity '{}' is not a queue message",
1995            message_id.raw()
1996        ))),
1997    }
1998}
1999
2000fn insert_meta_row(store: &UnifiedStore, fields: HashMap<String, Value>) -> RedDBResult<()> {
2001    let _ = store.get_or_create_collection(QUEUE_META_COLLECTION);
2002    store
2003        .insert_auto(
2004            QUEUE_META_COLLECTION,
2005            UnifiedEntity::new(
2006                EntityId::new(0),
2007                EntityKind::TableRow {
2008                    table: Arc::from(QUEUE_META_COLLECTION),
2009                    row_id: 0,
2010                },
2011                EntityData::Row(RowData {
2012                    columns: Vec::new(),
2013                    named: Some(fields),
2014                    schema: None,
2015                }),
2016            ),
2017        )
2018        .map_err(|err| RedDBError::Internal(err.to_string()))?;
2019    Ok(())
2020}
2021
2022pub(super) fn remove_meta_rows(store: &UnifiedStore, predicate: impl Fn(&RowData) -> bool + Sync) {
2023    let Some(manager) = store.get_collection(QUEUE_META_COLLECTION) else {
2024        return;
2025    };
2026    let rows = manager.query_all(|entity| entity.data.as_row().is_some_and(&predicate));
2027    for row in rows {
2028        let _ = store.delete(QUEUE_META_COLLECTION, row.id);
2029    }
2030}
2031
2032pub(super) fn delete_meta_entity(store: &UnifiedStore, entity_id: EntityId) {
2033    let _ = store.delete(QUEUE_META_COLLECTION, entity_id);
2034}
2035
2036fn queue_message_lock_key(queue: &str, message_id: EntityId) -> String {
2037    format!("{queue}:{}", message_id.raw())
2038}
2039
2040pub(super) fn queue_message_lock_handle(
2041    runtime: &RedDBRuntime,
2042    queue: &str,
2043    message_id: EntityId,
2044) -> Arc<parking_lot::Mutex<()>> {
2045    let key = queue_message_lock_key(queue, message_id);
2046    if let Some(lock) = runtime.inner.queue_message_locks.read().get(&key).cloned() {
2047        return lock;
2048    }
2049
2050    let mut locks = runtime.inner.queue_message_locks.write();
2051    locks
2052        .entry(key)
2053        .or_insert_with(|| Arc::new(parking_lot::Mutex::new(())))
2054        .clone()
2055}
2056
2057pub(super) fn forget_queue_message_lock(runtime: &RedDBRuntime, queue: &str, message_id: EntityId) {
2058    runtime
2059        .inner
2060        .queue_message_locks
2061        .write()
2062        .remove(&queue_message_lock_key(queue, message_id));
2063}
2064
2065fn parse_message_id(value: &str) -> RedDBResult<EntityId> {
2066    let raw = value.strip_prefix('e').unwrap_or(value);
2067    raw.parse::<u64>()
2068        .map(EntityId::new)
2069        .map_err(|_| RedDBError::Query(format!("invalid message id '{}'", value)))
2070}
2071
2072/// ADR 0026: resolve the ACK/NACK handle. When `delivery_id` is supplied,
2073/// it wins unconditionally — strict failure if the handle does not resolve
2074/// to a live pending delivery on `queue`. When only the legacy tuple is
2075/// supplied, emit a rate-limited deprecation log line and use the tuple.
2076/// At least one handle must be present.
2077pub(super) fn resolve_ack_nack_handle(
2078    store: &UnifiedStore,
2079    queue: &str,
2080    group_hint: &str,
2081    message_id_hint: &str,
2082    delivery_id: Option<&str>,
2083) -> RedDBResult<(String, EntityId)> {
2084    if let Some(did) = delivery_id {
2085        return resolve_delivery_id(store, queue, did);
2086    }
2087    if group_hint.is_empty() || message_id_hint.is_empty() {
2088        return Err(RedDBError::Query(
2089            "ACK/NACK requires either GROUP <group> '<message_id>' or WITH delivery_id = '<id>'"
2090                .to_string(),
2091        ));
2092    }
2093    log_tuple_deprecation(queue);
2094    let entity = parse_message_id(message_id_hint)?;
2095    Ok((group_hint.to_string(), entity))
2096}
2097
2098fn resolve_delivery_id(
2099    store: &UnifiedStore,
2100    queue: &str,
2101    delivery_id: &str,
2102) -> RedDBResult<(String, EntityId)> {
2103    let Some(manager) = store.get_collection(QUEUE_META_COLLECTION) else {
2104        return Err(RedDBError::Query(format!(
2105            "delivery_id '{}' does not resolve to a live pending delivery",
2106            delivery_id
2107        )));
2108    };
2109    for entity in manager.query_all(|entity| {
2110        entity.data.as_row().is_some_and(|row| {
2111            row_text(row, "kind").as_deref() == Some("queue_pending_lc")
2112                && row_text(row, "delivery_id").as_deref() == Some(delivery_id)
2113        })
2114    }) {
2115        if let Some(row) = entity.data.as_row() {
2116            let row_queue = row_text(row, "queue").unwrap_or_default();
2117            let row_group = row_text(row, "group").unwrap_or_default();
2118            let row_message = row_u64(row, "message_id").unwrap_or(0);
2119            if row_queue != queue {
2120                return Err(RedDBError::Query(format!(
2121                    "delivery_id '{}' belongs to queue '{}', not '{}'",
2122                    delivery_id, row_queue, queue
2123                )));
2124            }
2125            return Ok((row_group, EntityId::new(row_message)));
2126        }
2127    }
2128    Err(RedDBError::Query(format!(
2129        "delivery_id '{}' does not resolve to a live pending delivery",
2130        delivery_id
2131    )))
2132}
2133
2134/// Per-(connection, queue) rate-limited "tuple ACK is deprecated" log line.
2135/// One emission per minute matches ADR 0026's operational guidance.
2136fn log_tuple_deprecation(queue: &str) {
2137    use std::sync::{Mutex, OnceLock};
2138    use std::time::Instant;
2139
2140    static LAST_EMIT: OnceLock<Mutex<HashMap<(u64, String), Instant>>> = OnceLock::new();
2141    const COOLDOWN: std::time::Duration = std::time::Duration::from_secs(60);
2142
2143    let map = LAST_EMIT.get_or_init(|| Mutex::new(HashMap::new()));
2144    let key = (super::impl_core::current_connection_id(), queue.to_string());
2145    let now = Instant::now();
2146    let mut guard = match map.lock() {
2147        Ok(g) => g,
2148        Err(_) => return,
2149    };
2150    let should_emit =
2151        !matches!(guard.get(&key), Some(prev) if now.duration_since(*prev) < COOLDOWN);
2152    if should_emit {
2153        guard.insert(key.clone(), now);
2154        drop(guard);
2155        tracing::warn!(
2156            target: "reddb::queue_lifecycle",
2157            queue = queue,
2158            connection_id = key.0,
2159            "ACK/NACK by (queue, group, message_id) tuple is deprecated; \
2160             switch to the server-issued delivery_id (ADR 0026). \
2161             The tuple path will be removed one minor release after introduction.",
2162        );
2163    }
2164}
2165
2166fn message_id_string(message_id: EntityId) -> String {
2167    message_id.raw().to_string()
2168}
2169
2170/// Slice 10 of issue #527 — render-time scan of pending entries
2171/// per (queue, group) for `queue_pending_gauge` exposition. Walks
2172/// `red_queue_meta` live so the gauge cannot drift from the source
2173/// of truth.
2174pub(crate) fn pending_counts_by_group(
2175    store: &UnifiedStore,
2176) -> std::collections::BTreeMap<(String, String), u64> {
2177    let mut counts: std::collections::BTreeMap<(String, String), u64> =
2178        std::collections::BTreeMap::new();
2179    let Some(manager) = store.get_collection(QUEUE_META_COLLECTION) else {
2180        return counts;
2181    };
2182    for entity in manager.query_all(|entity| {
2183        entity
2184            .data
2185            .as_row()
2186            .is_some_and(|row| row_text(row, "kind").as_deref() == Some("queue_pending"))
2187    }) {
2188        if let Some(row) = entity.data.as_row() {
2189            let queue = row_text(row, "queue");
2190            let group = row_text(row, "group");
2191            if let (Some(q), Some(g)) = (queue, group) {
2192                *counts.entry((q, g)).or_insert(0) += 1;
2193            }
2194        }
2195    }
2196    counts
2197}
2198
2199pub(super) fn row_text(row: &RowData, field: &str) -> Option<String> {
2200    match row.get_field(field)?.clone() {
2201        Value::Text(value) => Some(value.to_string()),
2202        Value::NodeRef(value) => Some(value),
2203        Value::EdgeRef(value) => Some(value),
2204        Value::TableRef(value) => Some(value),
2205        _ => None,
2206    }
2207}
2208
2209pub(super) fn row_u64(row: &RowData, field: &str) -> Option<u64> {
2210    match row.get_field(field)?.clone() {
2211        Value::UnsignedInteger(value) => Some(value),
2212        Value::Integer(value) if value >= 0 => Some(value as u64),
2213        Value::Float(value) if value >= 0.0 => Some(value as u64),
2214        Value::Text(value) => value.parse().ok(),
2215        _ => None,
2216    }
2217}
2218
2219fn row_bool(row: &RowData, field: &str) -> Option<bool> {
2220    match row.get_field(field)?.clone() {
2221        Value::Boolean(value) => Some(value),
2222        Value::Text(value) => match value.to_ascii_lowercase().as_str() {
2223            "true" => Some(true),
2224            "false" => Some(false),
2225            _ => None,
2226        },
2227        _ => None,
2228    }
2229}
2230
2231fn queue_collection_contract(
2232    name: &str,
2233    priority: bool,
2234    ttl_ms: Option<u64>,
2235) -> crate::physical::CollectionContract {
2236    let now = current_unix_ms();
2237    let mut context_index_fields = Vec::new();
2238    if priority {
2239        context_index_fields.push("priority".to_string());
2240    }
2241
2242    crate::physical::CollectionContract {
2243        name: name.to_string(),
2244        declared_model: crate::catalog::CollectionModel::Queue,
2245        schema_mode: crate::catalog::SchemaMode::Dynamic,
2246        origin: crate::physical::ContractOrigin::Explicit,
2247        version: 1,
2248        created_at_unix_ms: now,
2249        updated_at_unix_ms: now,
2250        default_ttl_ms: ttl_ms,
2251        vector_dimension: None,
2252        vector_metric: None,
2253        context_index_fields,
2254        declared_columns: Vec::new(),
2255        table_def: None,
2256        timestamps_enabled: false,
2257        context_index_enabled: false,
2258        metrics_raw_retention_ms: None,
2259        metrics_rollup_policies: Vec::new(),
2260        metrics_tenant_identity: None,
2261        metrics_namespace: None,
2262        // Queues manipulate messages via push/pop/ack — the row DML
2263        // paths never apply. Flag it as append_only so inadvertent
2264        // `UPDATE/DELETE FROM queue_name` statements fail loudly.
2265        append_only: true,
2266        subscriptions: Vec::new(),
2267        session_key: None,
2268        session_gap_ms: None,
2269        retention_duration_ms: None,
2270    }
2271}
2272
2273fn current_unix_ms() -> u128 {
2274    std::time::SystemTime::now()
2275        .duration_since(std::time::UNIX_EPOCH)
2276        .unwrap_or_default()
2277        .as_millis()
2278}
2279
2280pub(super) fn now_ns() -> u64 {
2281    std::time::SystemTime::now()
2282        .duration_since(std::time::UNIX_EPOCH)
2283        .unwrap_or_default()
2284        .as_nanos() as u64
2285}
2286
2287pub(super) fn queue_message_ttl_metadata(ttl_ms: u64) -> Metadata {
2288    Metadata::with_fields(
2289        [(
2290            "_ttl_ms".to_string(),
2291            if ttl_ms <= i64::MAX as u64 {
2292                MetadataValue::Int(ttl_ms as i64)
2293            } else {
2294                MetadataValue::Timestamp(ttl_ms)
2295            },
2296        )]
2297        .into_iter()
2298        .collect(),
2299    )
2300}
2301
2302/// Rough payload byte estimate for outbox watermark tracking.
2303fn estimate_payload_bytes(payload: &Value) -> u64 {
2304    match payload {
2305        Value::Json(v) => v.len() as u64,
2306        Value::Text(s) => s.len() as u64,
2307        _ => 64,
2308    }
2309}