1use std::collections::{HashMap, HashSet};
4use std::sync::atomic::{AtomicU64, Ordering};
5use std::sync::Arc;
6
7use crate::runtime::audit_log::{AuditAuthSource, AuditEvent, AuditFieldEscaper, Outcome};
8use crate::storage::queue::QueueMode;
9use crate::storage::unified::entity::{QueueMessageData, RowData};
10use crate::storage::unified::{Metadata, MetadataValue, UnifiedStore};
11
12use super::*;
13
14use super::primary_queue_store::PrimaryQueueStore;
15use super::queue_lifecycle::{QueueLifecycle, RetirementOutcome};
16use crate::storage::queue::lifecycle::{
17 QueueSide as LcQueueSide, QueueStore as _, QueueStoreError, QueueTxn,
18};
19
20pub(super) fn runtime_lifecycle(
27 runtime: &RedDBRuntime,
28 queue: &str,
29) -> (
30 QueueLifecycle<PrimaryQueueStore>,
31 PrimaryQueueStore,
32 QueueTxn,
33) {
34 let primary_for_lookup = PrimaryQueueStore::new(runtime.clone());
35 let primary_for_lifecycle = PrimaryQueueStore::new(runtime.clone());
36 let txn = primary_for_lifecycle.new_txn();
37 let cfg = primary_for_lifecycle.lifecycle_config(queue);
38 (
39 QueueLifecycle::new(primary_for_lifecycle, cfg),
40 primary_for_lookup,
41 txn,
42 )
43}
44
45fn ast_side_to_lc(side: crate::storage::query::ast::QueueSide) -> LcQueueSide {
49 use crate::storage::query::ast::QueueSide as Ast;
50 match side {
51 Ast::Left => LcQueueSide::Left,
52 Ast::Right => LcQueueSide::Right,
53 }
54}
55
56fn map_qse(err: QueueStoreError) -> RedDBError {
60 match err {
61 QueueStoreError::UnknownDelivery(id) => RedDBError::NotFound(format!(
62 "delivery_id '{id}' does not resolve to a live pending delivery"
63 )),
64 QueueStoreError::UnknownQueue(q) => RedDBError::NotFound(format!("queue '{q}' not found")),
65 QueueStoreError::ReplicaImmutable => {
66 RedDBError::Internal("replica QueueStore is immutable".to_string())
67 }
68 }
69}
70
71pub static EVENTS_DRAIN_RETRIES_TOTAL: AtomicU64 = AtomicU64::new(0);
78
79pub static EVENTS_DLQ_TOTAL: AtomicU64 = AtomicU64::new(0);
81
82pub static EVENTS_ENQUEUED_TOTAL: AtomicU64 = AtomicU64::new(0);
84
85const OUTBOX_WARN_BYTES: u64 = 1 << 30;
87
88const OUTBOX_MAX_BYTES: u64 = 10 * (1 << 30);
90
91static OUTBOX_APPROX_BYTES: AtomicU64 = AtomicU64::new(0);
93
94const QUEUE_META_COLLECTION: &str = "red_queue_meta";
95const QUEUE_POSITION_CENTER: u64 = u64::MAX / 2;
96const WORK_DEFAULT_GROUP: &str = "_work_default";
97const FANOUT_GROUP_PREFIX: &str = "_fanout_";
98
99#[derive(Debug, Clone)]
100pub(super) struct QueueRuntimeConfig {
101 pub(super) mode: QueueMode,
102 pub(super) priority: bool,
103 pub(super) max_size: Option<usize>,
104 pub(super) ttl_ms: Option<u64>,
105 pub(super) dlq: Option<String>,
106 pub(super) max_attempts: u32,
107 pub(super) lock_deadline_ms: u64,
108 pub(super) in_flight_cap_per_group: u32,
109}
110
111#[derive(Debug, Clone)]
112struct QueueGroupEntry {
113 entity_id: EntityId,
114 group: String,
115}
116
117#[derive(Debug, Clone)]
118pub(super) struct QueuePendingEntry {
119 pub(super) entity_id: EntityId,
120 group: String,
121 pub(super) message_id: EntityId,
122 consumer: String,
123 pub(super) delivered_at_ns: u64,
124 pub(super) delivery_count: u32,
125}
126
127#[derive(Debug, Clone)]
128pub(super) struct QueueAckEntry {
129 entity_id: EntityId,
130 group: String,
131 pub(super) message_id: EntityId,
132}
133
134#[derive(Debug, Clone)]
135pub(super) struct QueueMessageView {
136 pub(super) id: EntityId,
137 position: u64,
138 priority: i32,
139 pub(super) payload: Value,
140 attempts: u32,
141 pub(super) max_attempts: u32,
142 enqueued_at_ns: u64,
143}
144
145impl RedDBRuntime {
146 pub(crate) fn enqueue_event_payload(
147 &self,
148 queue: &str,
149 payload: Value,
150 ) -> RedDBResult<EntityId> {
151 let store = self.inner.db.store();
152 if store.get_collection(queue).is_none() {
154 crate::runtime::impl_ddl::ensure_event_target_queue_pub(self, queue)?;
155 }
156
157 let payload_bytes = estimate_payload_bytes(&payload);
159 let outbox_bytes = OUTBOX_APPROX_BYTES.fetch_add(payload_bytes, Ordering::Relaxed);
160
161 if outbox_bytes > OUTBOX_MAX_BYTES {
163 OUTBOX_APPROX_BYTES.fetch_sub(payload_bytes, Ordering::Relaxed);
164 EVENTS_DRAIN_RETRIES_TOTAL.fetch_add(1, Ordering::Relaxed);
165 return self.route_event_to_outbox_dlq(queue, payload, "outbox_max_bytes_exceeded");
166 }
167
168 if outbox_bytes > OUTBOX_WARN_BYTES && outbox_bytes - payload_bytes <= OUTBOX_WARN_BYTES {
170 tracing::warn!(
171 outbox_bytes,
172 warn_threshold = OUTBOX_WARN_BYTES,
173 "event outbox approaching capacity warning threshold"
174 );
175 crate::telemetry::operator_event::OperatorEvent::OutboxDlqActivated {
176 queue: queue.to_string(),
177 dlq: format!("{queue}_outbox_dlq"),
178 reason: "outbox_warn_bytes_exceeded".to_string(),
179 }
180 .emit_global();
181 }
182
183 let config = load_queue_config(store.as_ref(), queue);
184
185 if let Some(max_size) = config.max_size {
187 let current_len = load_queue_message_views(store.as_ref(), queue)
188 .unwrap_or_default()
189 .len();
190 if current_len >= max_size {
191 OUTBOX_APPROX_BYTES.fetch_sub(payload_bytes, Ordering::Relaxed);
192 EVENTS_DRAIN_RETRIES_TOTAL.fetch_add(1, Ordering::Relaxed);
193 return self.route_event_to_outbox_dlq(queue, payload, "queue_full");
194 }
195 if current_len * 10 >= max_size * 8 {
197 tracing::warn!(
198 queue = %queue,
199 size = current_len,
200 max = max_size,
201 "event target queue near capacity"
202 );
203 }
204 }
205
206 let id = self.enqueue_event_payload_raw(store.as_ref(), queue, &config, payload)?;
207 EVENTS_ENQUEUED_TOTAL.fetch_add(1, Ordering::Relaxed);
208 Ok(id)
209 }
210
211 fn route_event_to_outbox_dlq(
213 &self,
214 queue: &str,
215 payload: Value,
216 reason: &str,
217 ) -> RedDBResult<EntityId> {
218 let dlq_name = format!("{queue}_outbox_dlq");
219 EVENTS_DLQ_TOTAL.fetch_add(1, Ordering::Relaxed);
220
221 crate::telemetry::operator_event::OperatorEvent::OutboxDlqActivated {
222 queue: queue.to_string(),
223 dlq: dlq_name.clone(),
224 reason: reason.to_string(),
225 }
226 .emit_global();
227
228 let store = self.inner.db.store();
229 if store.get_collection(&dlq_name).is_none() {
230 crate::runtime::impl_ddl::ensure_event_target_queue_pub(self, &dlq_name)?;
231 }
232 let dlq_config = load_queue_config(store.as_ref(), &dlq_name);
233 let id = self.enqueue_event_payload_raw(store.as_ref(), &dlq_name, &dlq_config, payload)?;
234 EVENTS_ENQUEUED_TOTAL.fetch_add(1, Ordering::Relaxed);
235 Ok(id)
236 }
237
238 fn enqueue_event_payload_raw(
240 &self,
241 store: &UnifiedStore,
242 queue: &str,
243 config: &QueueRuntimeConfig,
244 payload: Value,
245 ) -> RedDBResult<EntityId> {
246 let position = next_queue_position(store, queue, QueueSide::Right)?;
247 let mut entity = UnifiedEntity::new(
248 EntityId::new(0),
249 EntityKind::QueueMessage {
250 queue: queue.to_string(),
251 position,
252 },
253 EntityData::QueueMessage(QueueMessageData {
254 payload,
255 priority: None,
256 enqueued_at_ns: now_ns(),
257 attempts: 0,
258 max_attempts: config.max_attempts,
259 acked: false,
260 }),
261 );
262 if let Some(xid) = self.current_xid() {
263 entity.set_xmin(xid);
264 }
265 let id = store
266 .insert_auto(queue, entity)
267 .map_err(|err| RedDBError::Internal(err.to_string()))?;
268 if let Some(ttl_ms) = config.ttl_ms {
269 store
270 .set_metadata(queue, id, queue_message_ttl_metadata(ttl_ms))
271 .map_err(|err| RedDBError::Internal(err.to_string()))?;
272 }
273 self.invalidate_result_cache_for_table(queue);
274 Ok(id)
275 }
276
277 pub fn execute_create_queue(
278 &self,
279 raw_query: &str,
280 query: &CreateQueueQuery,
281 ) -> RedDBResult<RuntimeQueryResult> {
282 self.check_write(crate::runtime::write_gate::WriteKind::Ddl)?;
283 if query.dlq.as_deref() == Some(query.name.as_str()) {
284 return Err(RedDBError::Query(
285 "dead-letter queue must be different from the source queue".to_string(),
286 ));
287 }
288
289 let store = self.inner.db.store();
290 let exists = store.get_collection(&query.name).is_some();
291 if exists {
292 if query.if_not_exists {
293 return Ok(RuntimeQueryResult::ok_message(
294 raw_query.to_string(),
295 &format!("queue '{}' already exists", query.name),
296 "create",
297 ));
298 }
299 return Err(RedDBError::Query(format!(
300 "queue '{}' already exists",
301 query.name
302 )));
303 }
304
305 store
306 .create_collection(&query.name)
307 .map_err(|err| RedDBError::Internal(err.to_string()))?;
308 if let Some(ttl_ms) = query.ttl_ms {
309 self.inner
310 .db
311 .set_collection_default_ttl_ms(&query.name, ttl_ms);
312 }
313 self.inner
314 .db
315 .save_collection_contract(queue_collection_contract(
316 &query.name,
317 query.priority,
318 query.ttl_ms,
319 ))
320 .map_err(|err| RedDBError::Internal(err.to_string()))?;
321 save_queue_config(
322 store.as_ref(),
323 &query.name,
324 &QueueRuntimeConfig {
325 mode: query.mode,
326 priority: query.priority,
327 max_size: query.max_size,
328 ttl_ms: query.ttl_ms,
329 dlq: query.dlq.clone(),
330 max_attempts: query.max_attempts,
331 lock_deadline_ms: query.lock_deadline_ms,
332 in_flight_cap_per_group: query.in_flight_cap_per_group,
333 },
334 )?;
335
336 if let Some(dlq) = &query.dlq {
337 if store.get_collection(dlq).is_none() {
338 store
339 .create_collection(dlq)
340 .map_err(|err| RedDBError::Internal(err.to_string()))?;
341 self.inner
342 .db
343 .save_collection_contract(queue_collection_contract(dlq, false, None))
344 .map_err(|err| RedDBError::Internal(err.to_string()))?;
345 }
346 }
347
348 self.invalidate_result_cache();
349 self.inner
350 .db
351 .persist_metadata()
352 .map_err(|err| RedDBError::Internal(err.to_string()))?;
353 let mut type_tags = Vec::new();
358 if let Some(dlq) = &query.dlq {
359 type_tags.push(format!("dlq:{}", dlq));
360 }
361 self.schema_vocabulary_apply(
362 crate::runtime::schema_vocabulary::DdlEvent::CreateCollection {
363 collection: query.name.clone(),
364 columns: vec!["payload".to_string()],
365 type_tags,
366 description: None,
367 },
368 );
369
370 let mut msg = format!("queue '{}' created", query.name);
371 msg.push_str(&format!(" (mode={})", query.mode.as_str()));
372 if query.priority {
373 msg.push_str(" (priority)");
374 }
375 if let Some(max_size) = query.max_size {
376 msg.push_str(&format!(" (max_size={max_size})"));
377 }
378 if let Some(ttl_ms) = query.ttl_ms {
379 msg.push_str(&format!(" (ttl={ttl_ms}ms)"));
380 }
381 if let Some(dlq) = &query.dlq {
382 msg.push_str(&format!(
383 " (dlq={dlq}, max_attempts={})",
384 query.max_attempts
385 ));
386 }
387
388 Ok(RuntimeQueryResult::ok_message(
389 raw_query.to_string(),
390 &msg,
391 "create",
392 ))
393 }
394
395 pub fn execute_alter_queue(
396 &self,
397 raw_query: &str,
398 query: &AlterQueueQuery,
399 ) -> RedDBResult<RuntimeQueryResult> {
400 self.check_write(crate::runtime::write_gate::WriteKind::Ddl)?;
401 let store = self.inner.db.store();
402 ensure_queue_exists(store.as_ref(), &query.name)?;
403
404 let mut config = load_queue_config(store.as_ref(), &query.name);
405 let mut summary: Vec<String> = Vec::new();
406
407 if let Some(new_mode) = query.mode {
408 let pending =
409 load_pending_entries(store.as_ref(), &query.name, None, None).unwrap_or_default();
410 if !pending.is_empty() {
411 tracing::warn!(
412 queue = %query.name,
413 pending_count = pending.len(),
414 new_mode = %new_mode.as_str(),
415 "ALTER QUEUE SET MODE: {} in-flight messages will drain with old mode; \
416 new reads use {}",
417 pending.len(),
418 new_mode.as_str(),
419 );
420 }
421 config.mode = new_mode;
422 summary.push(format!("mode={}", new_mode.as_str()));
423 }
424 if let Some(max_attempts) = query.max_attempts {
425 config.max_attempts = max_attempts;
426 summary.push(format!("max_attempts={max_attempts}"));
427 }
428 if let Some(lock_deadline_ms) = query.lock_deadline_ms {
429 config.lock_deadline_ms = lock_deadline_ms;
430 summary.push(format!("lock_deadline_ms={lock_deadline_ms}"));
431 }
432 if let Some(in_flight_cap) = query.in_flight_cap_per_group {
433 config.in_flight_cap_per_group = in_flight_cap;
434 summary.push(format!("in_flight_cap_per_group={in_flight_cap}"));
435 }
436 if let Some(dlq) = &query.dlq {
437 if dlq == &query.name {
438 return Err(RedDBError::Query(
439 "dead-letter queue must be different from the source queue".to_string(),
440 ));
441 }
442 config.dlq = Some(dlq.clone());
443 summary.push(format!("dlq={dlq}"));
444 }
445
446 save_queue_config(store.as_ref(), &query.name, &config)?;
447
448 self.invalidate_result_cache();
449 self.inner
450 .db
451 .persist_metadata()
452 .map_err(|err| RedDBError::Internal(err.to_string()))?;
453
454 Ok(RuntimeQueryResult::ok_message(
455 raw_query.to_string(),
456 &format!("queue '{}' altered: {}", query.name, summary.join(", ")),
457 "alter",
458 ))
459 }
460
461 pub fn execute_drop_queue(
462 &self,
463 raw_query: &str,
464 query: &DropQueueQuery,
465 ) -> RedDBResult<RuntimeQueryResult> {
466 self.check_write(crate::runtime::write_gate::WriteKind::Ddl)?;
467 let store = self.inner.db.store();
468 if super::impl_ddl::is_system_schema_name(&query.name) {
469 return Err(RedDBError::Query("system schema is read-only".to_string()));
470 }
471 if store.get_collection(&query.name).is_none() {
472 if query.if_exists {
473 return Ok(RuntimeQueryResult::ok_message(
474 raw_query.to_string(),
475 &format!("queue '{}' does not exist", query.name),
476 "drop",
477 ));
478 }
479 return Err(RedDBError::NotFound(format!(
480 "queue '{}' not found",
481 query.name
482 )));
483 }
484 let actual = crate::runtime::ddl::polymorphic_resolver::resolve(
485 &query.name,
486 &self.inner.db.catalog_model_snapshot(),
487 )?;
488 crate::runtime::ddl::polymorphic_resolver::ensure_model_match(
489 crate::catalog::CollectionModel::Queue,
490 actual,
491 )?;
492
493 store
494 .drop_collection(&query.name)
495 .map_err(|err| RedDBError::Internal(err.to_string()))?;
496 self.inner.db.clear_collection_default_ttl_ms(&query.name);
497 self.inner
498 .db
499 .remove_collection_contract(&query.name)
500 .map_err(|err| RedDBError::Internal(err.to_string()))?;
501 remove_queue_metadata(store.as_ref(), &query.name);
502 self.invalidate_result_cache();
503 self.inner
504 .db
505 .persist_metadata()
506 .map_err(|err| RedDBError::Internal(err.to_string()))?;
507 self.schema_vocabulary_apply(
509 crate::runtime::schema_vocabulary::DdlEvent::DropCollection {
510 collection: query.name.clone(),
511 },
512 );
513
514 Ok(RuntimeQueryResult::ok_message(
515 raw_query.to_string(),
516 &format!("queue '{}' dropped", query.name),
517 "drop",
518 ))
519 }
520
521 pub fn execute_queue_command(
522 &self,
523 raw_query: &str,
524 cmd: &QueueCommand,
525 ) -> RedDBResult<RuntimeQueryResult> {
526 self.check_write(crate::runtime::write_gate::WriteKind::Dml)?;
527 match cmd {
528 QueueCommand::Push {
529 queue,
530 value,
531 side,
532 priority,
533 } => {
534 let store = self.inner.db.store();
535 ensure_queue_exists(store.as_ref(), queue)?;
536 let config = load_queue_config(store.as_ref(), queue);
537 if priority.is_some() && !config.priority {
538 return Err(RedDBError::Query(format!(
539 "queue '{}' is not a priority queue",
540 queue
541 )));
542 }
543 if let Some(max_size) = config.max_size {
544 let current_len =
545 load_queue_message_views_with_runtime(Some(self), store.as_ref(), queue)?
546 .len();
547 if current_len >= max_size {
548 return Err(RedDBError::Query(format!(
549 "queue '{}' is full (max_size={max_size})",
550 queue
551 )));
552 }
553 }
554
555 let position = next_queue_position(store.as_ref(), queue, *side)?;
556 let mut entity = UnifiedEntity::new(
557 EntityId::new(0),
558 EntityKind::QueueMessage {
559 queue: queue.clone(),
560 position,
561 },
562 EntityData::QueueMessage(QueueMessageData {
563 payload: value.clone(),
564 priority: if config.priority { *priority } else { None },
565 enqueued_at_ns: now_ns(),
566 attempts: 0,
567 max_attempts: config.max_attempts,
568 acked: false,
569 }),
570 );
571 if let Some(xid) = self.current_xid() {
574 entity.set_xmin(xid);
575 }
576 let id = store
577 .insert_auto(queue, entity)
578 .map_err(|err| RedDBError::Internal(err.to_string()))?;
579 if let Some(ttl_ms) = config.ttl_ms {
580 store
581 .set_metadata(queue, id, queue_message_ttl_metadata(ttl_ms))
582 .map_err(|err| RedDBError::Internal(err.to_string()))?;
583 }
584 self.invalidate_result_cache();
585
586 let mut result = UnifiedResult::with_columns(vec![
587 "message_id".into(),
588 "side".into(),
589 "queue".into(),
590 ]);
591 let mut record = UnifiedRecord::new();
592 record.set("message_id", Value::text(message_id_string(id)));
593 record.set(
594 "side",
595 Value::text(match side {
596 QueueSide::Left => "left".to_string(),
597 QueueSide::Right => "right".to_string(),
598 }),
599 );
600 record.set("queue", Value::text(queue.clone()));
601 result.push(record);
602
603 Ok(RuntimeQueryResult {
604 query: raw_query.to_string(),
605 mode: QueryMode::Sql,
606 statement: "queue_push",
607 engine: "runtime-queue",
608 result,
609 affected_rows: 1,
610 statement_type: "insert",
611 })
612 }
613 QueueCommand::Pop { queue, side, count } => {
614 let store = self.inner.db.store();
615 ensure_queue_exists(store.as_ref(), queue)?;
616 let (lifecycle, _ps, txn) = runtime_lifecycle(self, queue);
617 let popped = lifecycle
618 .pop(queue, ast_side_to_lc(*side), *count, &txn)
619 .map_err(map_qse)?;
620
621 let mut result =
622 UnifiedResult::with_columns(vec!["message_id".into(), "payload".into()]);
623 for (message_id, payload) in &popped {
624 let mut record = UnifiedRecord::new();
625 record.set(
626 "message_id",
627 Value::text(message_id_string(EntityId::new(*message_id))),
628 );
629 record.set("payload", payload.clone());
630 result.push(record);
631 }
632 let popped_count = popped.len() as u64;
633 if popped_count > 0 {
634 self.invalidate_result_cache();
635 }
636
637 Ok(RuntimeQueryResult {
638 query: raw_query.to_string(),
639 mode: QueryMode::Sql,
640 statement: "queue_pop",
641 engine: "runtime-queue",
642 result,
643 affected_rows: popped_count,
644 statement_type: "delete",
645 })
646 }
647 QueueCommand::Peek { queue, count } => {
648 let store = self.inner.db.store();
649 ensure_queue_exists(store.as_ref(), queue)?;
650 let (lifecycle, _ps, txn) = runtime_lifecycle(self, queue);
651 let messages = lifecycle.peek(queue, *count, &txn);
652
653 let mut result =
654 UnifiedResult::with_columns(vec!["message_id".into(), "payload".into()]);
655 for message in messages {
656 let mut record = UnifiedRecord::new();
657 record.set(
658 "message_id",
659 Value::text(message_id_string(EntityId::new(message.message_id))),
660 );
661 record.set("payload", message.payload);
662 result.push(record);
663 }
664
665 Ok(RuntimeQueryResult {
666 query: raw_query.to_string(),
667 mode: QueryMode::Sql,
668 statement: "queue_peek",
669 engine: "runtime-queue",
670 result,
671 affected_rows: 0,
672 statement_type: "select",
673 })
674 }
675 QueueCommand::Len { queue } => {
676 let store = self.inner.db.store();
677 ensure_queue_exists(store.as_ref(), queue)?;
678 let count =
679 load_queue_message_views_with_runtime(Some(self), store.as_ref(), queue)?.len()
680 as u64;
681 let mut result = UnifiedResult::with_columns(vec!["len".into()]);
682 let mut record = UnifiedRecord::new();
683 record.set("len", Value::UnsignedInteger(count));
684 result.push(record);
685
686 Ok(RuntimeQueryResult {
687 query: raw_query.to_string(),
688 mode: QueryMode::Sql,
689 statement: "queue_len",
690 engine: "runtime-queue",
691 result,
692 affected_rows: 0,
693 statement_type: "select",
694 })
695 }
696 QueueCommand::Purge { queue } => {
697 let store = self.inner.db.store();
698 ensure_queue_exists(store.as_ref(), queue)?;
699 let (lifecycle, _ps, txn) = runtime_lifecycle(self, queue);
700 let count = lifecycle.purge(queue, &txn).map_err(map_qse)?;
701 if count > 0 {
702 self.invalidate_result_cache();
703 }
704
705 Ok(RuntimeQueryResult::ok_message(
706 raw_query.to_string(),
707 &format!("{count} messages purged from queue '{queue}'"),
708 "delete",
709 ))
710 }
711 QueueCommand::GroupCreate { queue, group } => {
712 let store = self.inner.db.store();
713 ensure_queue_exists(store.as_ref(), queue)?;
714 if queue_group_exists(store.as_ref(), queue, group)? {
715 return Ok(RuntimeQueryResult::ok_message(
716 raw_query.to_string(),
717 &format!(
718 "consumer group '{}' already exists on queue '{}'",
719 group, queue
720 ),
721 "create",
722 ));
723 }
724 save_queue_group(store.as_ref(), queue, group)?;
725 self.invalidate_result_cache();
726
727 Ok(RuntimeQueryResult::ok_message(
728 raw_query.to_string(),
729 &format!("consumer group '{}' created on queue '{}'", group, queue),
730 "create",
731 ))
732 }
733 QueueCommand::GroupRead {
734 queue,
735 group,
736 consumer,
737 count,
738 } => {
739 let store = self.inner.db.store();
740 ensure_queue_exists(store.as_ref(), queue)?;
741 let config = load_queue_config(store.as_ref(), queue);
745 let group_owned =
746 resolve_read_group(store.as_ref(), queue, group.as_deref(), consumer, &config)?;
747 let group_ref = group_owned.as_str();
748 let (lifecycle, _ps, txn) = runtime_lifecycle(self, queue);
749 let delivered = lifecycle
750 .group_read(&txn, queue, group_ref, consumer, *count)
751 .map_err(map_qse)?;
752
753 let mut result = UnifiedResult::with_columns(vec![
754 "message_id".into(),
755 "payload".into(),
756 "consumer".into(),
757 "delivery_count".into(),
758 "attempts".into(),
759 ]);
760
761 for message in delivered {
762 let mut record = UnifiedRecord::new();
763 record.set(
764 "message_id",
765 Value::text(message_id_string(EntityId::new(message.message_id))),
766 );
767 record.set("payload", message.payload);
768 record.set("consumer", Value::text(message.consumer));
769 record.set(
770 "delivery_count",
771 Value::UnsignedInteger(u64::from(message.delivery_count)),
772 );
773 record.set(
774 "attempts",
775 Value::UnsignedInteger(u64::from(message.delivery_count)),
776 );
777 result.push(record);
778 }
779 if !result.records.is_empty() {
780 self.invalidate_result_cache();
781 }
782
783 Ok(RuntimeQueryResult {
784 query: raw_query.to_string(),
785 mode: QueryMode::Sql,
786 statement: "queue_group_read",
787 engine: "runtime-queue",
788 result,
789 affected_rows: 0,
790 statement_type: "select",
791 })
792 }
793 QueueCommand::Pending { queue, group } => {
794 let store = self.inner.db.store();
795 ensure_queue_exists(store.as_ref(), queue)?;
796 require_queue_group(store.as_ref(), queue, group)?;
797 let mut pending = load_pending_entries(store.as_ref(), queue, Some(group), None)?;
798 pending.sort_by_key(|entry| entry.delivered_at_ns);
799 let current_time_ns = now_ns();
800
801 let mut result = UnifiedResult::with_columns(vec![
802 "message_id".into(),
803 "consumer".into(),
804 "delivered_at_ns".into(),
805 "delivery_count".into(),
806 "idle_ms".into(),
807 ]);
808 for entry in pending {
809 let mut record = UnifiedRecord::new();
810 record.set(
811 "message_id",
812 Value::text(message_id_string(entry.message_id)),
813 );
814 record.set("consumer", Value::text(entry.consumer));
815 record.set(
816 "delivered_at_ns",
817 Value::UnsignedInteger(entry.delivered_at_ns),
818 );
819 record.set(
820 "delivery_count",
821 Value::UnsignedInteger(u64::from(entry.delivery_count)),
822 );
823 record.set(
824 "idle_ms",
825 Value::UnsignedInteger(
826 current_time_ns.saturating_sub(entry.delivered_at_ns) / 1_000_000,
827 ),
828 );
829 result.push(record);
830 }
831
832 Ok(RuntimeQueryResult {
833 query: raw_query.to_string(),
834 mode: QueryMode::Sql,
835 statement: "queue_pending",
836 engine: "runtime-queue",
837 result,
838 affected_rows: 0,
839 statement_type: "select",
840 })
841 }
842 QueueCommand::Claim {
843 queue,
844 group,
845 consumer,
846 min_idle_ms,
847 } => {
848 let store = self.inner.db.store();
849 ensure_queue_exists(store.as_ref(), queue)?;
850 require_queue_group(store.as_ref(), queue, group)?;
851 let (lifecycle, _ps, txn) = runtime_lifecycle(self, queue);
852 let delivered = lifecycle
853 .claim_delivering(queue, consumer, *min_idle_ms, &txn)
854 .map_err(map_qse)?;
855
856 let mut result = UnifiedResult::with_columns(vec![
857 "message_id".into(),
858 "payload".into(),
859 "consumer".into(),
860 "delivery_count".into(),
861 ]);
862
863 for message in delivered {
864 let mut record = UnifiedRecord::new();
865 record.set(
866 "message_id",
867 Value::text(message_id_string(EntityId::new(message.message_id))),
868 );
869 record.set("payload", message.payload);
870 record.set("consumer", Value::text(message.consumer));
871 record.set(
872 "delivery_count",
873 Value::UnsignedInteger(u64::from(message.delivery_count)),
874 );
875 result.push(record);
876 }
877 if !result.records.is_empty() {
878 self.invalidate_result_cache();
879 }
880 let affected_rows = result.records.len() as u64;
881
882 Ok(RuntimeQueryResult {
883 query: raw_query.to_string(),
884 mode: QueryMode::Sql,
885 statement: "queue_claim",
886 engine: "runtime-queue",
887 result,
888 affected_rows,
889 statement_type: "update",
890 })
891 }
892 QueueCommand::Ack {
893 queue,
894 group,
895 message_id,
896 delivery_id,
897 } => {
898 let store = self.inner.db.store();
899 ensure_queue_exists(store.as_ref(), queue)?;
900 let (group_owned, message_entity) = resolve_ack_nack_handle(
901 store.as_ref(),
902 queue,
903 group,
904 message_id,
905 delivery_id.as_deref(),
906 )?;
907 let group_ref = group_owned.as_str();
908 require_queue_group(store.as_ref(), queue, group_ref)?;
909 let (lifecycle, ps, txn) = runtime_lifecycle(self, queue);
910 let did = match delivery_id.as_deref() {
911 Some(d) => d.to_string(),
912 None => ps
913 .find_pending_by_key(queue, message_entity.raw(), group_ref)
914 .ok_or_else(|| {
915 RedDBError::NotFound(format!(
916 "no pending delivery for message '{}' on queue '{}' (group '{}')",
917 message_entity.raw(),
918 queue,
919 group_ref
920 ))
921 })?,
922 };
923 lifecycle.ack(&txn, &did).map_err(map_qse)?;
924 self.invalidate_result_cache();
925
926 Ok(RuntimeQueryResult::ok_message(
927 raw_query.to_string(),
928 "message acknowledged",
929 "update",
930 ))
931 }
932 QueueCommand::Nack {
933 queue,
934 group,
935 message_id,
936 delivery_id,
937 } => {
938 let store = self.inner.db.store();
939 ensure_queue_exists(store.as_ref(), queue)?;
940 let (group_owned, message_entity) = resolve_ack_nack_handle(
941 store.as_ref(),
942 queue,
943 group,
944 message_id,
945 delivery_id.as_deref(),
946 )?;
947 let group_ref = group_owned.as_str();
948 require_queue_group(store.as_ref(), queue, group_ref)?;
949 let (lifecycle, ps, txn) = runtime_lifecycle(self, queue);
950 let did = match delivery_id.as_deref() {
951 Some(d) => d.to_string(),
952 None => ps
953 .find_pending_by_key(queue, message_entity.raw(), group_ref)
954 .ok_or_else(|| {
955 RedDBError::NotFound(format!(
956 "no pending delivery for message '{}' on queue '{}' (group '{}')",
957 message_entity.raw(),
958 queue,
959 group_ref
960 ))
961 })?,
962 };
963 let message = match lifecycle.nack(&txn, &did).map_err(map_qse)? {
964 RetirementOutcome::Requeued => "message requeued".to_string(),
965 RetirementOutcome::MovedToDlq(dlq) => {
966 format!("message moved to dead-letter queue '{}'", dlq)
967 }
968 RetirementOutcome::Dropped => "message dropped after max attempts".to_string(),
969 };
970 self.invalidate_result_cache();
971
972 Ok(RuntimeQueryResult::ok_message(
973 raw_query.to_string(),
974 &message,
975 "update",
976 ))
977 }
978 QueueCommand::Move {
979 source,
980 destination,
981 filter,
982 limit,
983 } => self.execute_queue_move(raw_query, source, destination, filter.as_ref(), *limit),
984 }
985 }
986
987 pub fn execute_queue_select(
988 &self,
989 raw_query: &str,
990 query: &QueueSelectQuery,
991 ) -> RedDBResult<RuntimeQueryResult> {
992 let store = self.inner.db.store();
993 ensure_queue_exists(store.as_ref(), &query.queue)?;
994 let config = load_queue_config(store.as_ref(), &query.queue);
995 let dlq = queue_is_dead_letter_target(store.as_ref(), &query.queue);
996 let columns = if query.columns.is_empty() {
997 queue_projection_default_columns()
998 } else {
999 query.columns.clone()
1000 };
1001
1002 let mut messages =
1003 load_queue_message_views_with_runtime(Some(self), store.as_ref(), &query.queue)?;
1004 sort_queue_messages(&mut messages, &config, QueueSide::Left);
1005
1006 let mut result = UnifiedResult::with_columns(columns.clone());
1007 for message in messages {
1008 if query
1009 .filter
1010 .as_ref()
1011 .is_some_and(|filter| !queue_message_matches_filter(&message, dlq, filter))
1012 {
1013 continue;
1014 }
1015 let record = queue_projection_record(&columns, &message, dlq)?;
1016 result.push(record);
1017 if query
1018 .limit
1019 .is_some_and(|limit| result.records.len() >= limit as usize)
1020 {
1021 break;
1022 }
1023 }
1024
1025 Ok(RuntimeQueryResult {
1026 query: raw_query.to_string(),
1027 mode: QueryMode::Sql,
1028 statement: "queue_select",
1029 engine: "runtime-queue",
1030 result,
1031 affected_rows: 0,
1032 statement_type: "select",
1033 })
1034 }
1035
1036 fn execute_queue_move(
1037 &self,
1038 raw_query: &str,
1039 source: &str,
1040 destination: &str,
1041 filter: Option<&Filter>,
1042 limit: usize,
1043 ) -> RedDBResult<RuntimeQueryResult> {
1044 if source == destination {
1045 return Err(RedDBError::Query(
1046 "QUEUE MOVE source and destination must be different".to_string(),
1047 ));
1048 }
1049 let store = self.inner.db.store();
1050 ensure_queue_exists(store.as_ref(), source)?;
1051 ensure_queue_exists(store.as_ref(), destination)?;
1052 let source_config = load_queue_config(store.as_ref(), source);
1053 let destination_config = load_queue_config(store.as_ref(), destination);
1054 let source_dlq = queue_is_dead_letter_target(store.as_ref(), source);
1055
1056 let mut messages =
1057 load_queue_message_views_with_runtime(Some(self), store.as_ref(), source)?;
1058 sort_queue_messages(&mut messages, &source_config, QueueSide::Left);
1059 let selected = messages
1060 .into_iter()
1061 .filter(|message| {
1062 filter
1063 .map(|f| queue_message_matches_filter(message, source_dlq, f))
1064 .unwrap_or(true)
1065 })
1066 .take(limit)
1067 .collect::<Vec<_>>();
1068
1069 if let Some(max_size) = destination_config.max_size {
1070 let current_len =
1071 load_queue_message_views_with_runtime(Some(self), store.as_ref(), destination)?
1072 .len();
1073 if current_len + selected.len() > max_size {
1074 return Err(RedDBError::Query(format!(
1075 "queue '{}' is full (max_size={max_size})",
1076 destination
1077 )));
1078 }
1079 }
1080
1081 for message in &selected {
1082 let lock = queue_message_lock_handle(self, source, message.id);
1083 let Some(_guard) = lock.try_lock() else {
1084 return Err(RedDBError::Query(format!(
1085 "message '{}' is locked on queue '{}'",
1086 message.id.raw(),
1087 source
1088 )));
1089 };
1090 if queue_message_view_by_id(store.as_ref(), source, message.id)?.is_none() {
1091 return Err(RedDBError::Query(format!(
1092 "message '{}' is no longer available on queue '{}'",
1093 message.id.raw(),
1094 source
1095 )));
1096 }
1097 }
1098
1099 let mut inserted = Vec::new();
1100 for message in &selected {
1101 match insert_moved_queue_message(
1102 store.as_ref(),
1103 destination,
1104 &destination_config,
1105 message,
1106 ) {
1107 Ok(id) => inserted.push(id),
1108 Err(err) => {
1109 for id in inserted {
1110 let _ = store.delete(destination, id);
1111 }
1112 return Err(err);
1113 }
1114 }
1115 }
1116
1117 let (move_lifecycle, _move_ps, move_txn) = runtime_lifecycle(self, source);
1118 for message in &selected {
1119 move_lifecycle
1120 .delete_with_state(source, message.id.raw(), &move_txn)
1121 .map_err(map_qse)?;
1122 }
1123 if !selected.is_empty() {
1124 self.invalidate_result_cache();
1125 }
1126
1127 let selected_count = selected.len() as u64;
1128 self.audit_log().record_event(
1129 AuditEvent::builder("queue/move")
1130 .source(AuditAuthSource::System)
1131 .outcome(Outcome::Success)
1132 .resource(format!("queue:{source}->{destination}"))
1133 .fields([
1134 AuditFieldEscaper::field("source", source),
1135 AuditFieldEscaper::field("destination", destination),
1136 AuditFieldEscaper::field("selected", selected_count),
1137 AuditFieldEscaper::field("committed", selected_count),
1138 ])
1139 .build(),
1140 );
1141
1142 let mut result = UnifiedResult::with_columns(vec![
1143 "source".into(),
1144 "destination".into(),
1145 "selected".into(),
1146 "committed".into(),
1147 ]);
1148 let mut record = UnifiedRecord::new();
1149 record.set("source", Value::text(source.to_string()));
1150 record.set("destination", Value::text(destination.to_string()));
1151 record.set("selected", Value::UnsignedInteger(selected_count));
1152 record.set("committed", Value::UnsignedInteger(selected_count));
1153 result.push(record);
1154
1155 Ok(RuntimeQueryResult {
1156 query: raw_query.to_string(),
1157 mode: QueryMode::Sql,
1158 statement: "queue_move",
1159 engine: "runtime-queue",
1160 result,
1161 affected_rows: selected_count,
1162 statement_type: "update",
1163 })
1164 }
1165}
1166
1167fn ensure_queue_exists(store: &UnifiedStore, queue: &str) -> RedDBResult<()> {
1168 if store.get_collection(queue).is_some() {
1169 Ok(())
1170 } else {
1171 Err(RedDBError::NotFound(format!("queue '{}' not found", queue)))
1172 }
1173}
1174
1175pub(super) fn load_queue_config(store: &UnifiedStore, queue: &str) -> QueueRuntimeConfig {
1176 let default = QueueRuntimeConfig {
1177 mode: QueueMode::Work,
1178 priority: false,
1179 max_size: None,
1180 ttl_ms: None,
1181 dlq: None,
1182 max_attempts: crate::storage::query::DEFAULT_QUEUE_MAX_ATTEMPTS,
1183 lock_deadline_ms: crate::storage::query::DEFAULT_QUEUE_LOCK_DEADLINE_MS,
1184 in_flight_cap_per_group: crate::storage::query::DEFAULT_QUEUE_IN_FLIGHT_CAP_PER_GROUP,
1185 };
1186
1187 let Some(manager) = store.get_collection(QUEUE_META_COLLECTION) else {
1188 return default;
1189 };
1190 manager
1191 .query_all(|entity| {
1192 entity.data.as_row().is_some_and(|row| {
1193 row_text(row, "kind").as_deref() == Some("queue_config")
1194 && row_text(row, "queue").as_deref() == Some(queue)
1195 })
1196 })
1197 .into_iter()
1198 .find_map(|entity| {
1199 let row = entity.data.as_row()?;
1200 Some(QueueRuntimeConfig {
1201 mode: row_text(row, "mode")
1202 .as_deref()
1203 .and_then(QueueMode::parse)
1204 .unwrap_or_default(),
1205 priority: row_bool(row, "priority").unwrap_or(false),
1206 max_size: row_u64(row, "max_size").map(|value| value as usize),
1207 ttl_ms: row_u64(row, "ttl_ms"),
1208 dlq: row_text(row, "dlq"),
1209 max_attempts: row_u64(row, "max_attempts")
1210 .map(|value| value as u32)
1211 .unwrap_or(crate::storage::query::DEFAULT_QUEUE_MAX_ATTEMPTS),
1212 lock_deadline_ms: row_u64(row, "lock_deadline_ms")
1213 .unwrap_or(crate::storage::query::DEFAULT_QUEUE_LOCK_DEADLINE_MS),
1214 in_flight_cap_per_group: row_u64(row, "in_flight_cap_per_group")
1215 .map(|value| value as u32)
1216 .unwrap_or(crate::storage::query::DEFAULT_QUEUE_IN_FLIGHT_CAP_PER_GROUP),
1217 })
1218 })
1219 .unwrap_or(default)
1220}
1221
1222pub(super) fn queue_mode_str(store: &UnifiedStore, queue: &str) -> &'static str {
1223 load_queue_config(store, queue).mode.as_str()
1224}
1225
1226fn save_queue_config(
1227 store: &UnifiedStore,
1228 queue: &str,
1229 config: &QueueRuntimeConfig,
1230) -> RedDBResult<()> {
1231 remove_meta_rows(store, |row| {
1232 row_text(row, "kind").as_deref() == Some("queue_config")
1233 && row_text(row, "queue").as_deref() == Some(queue)
1234 });
1235
1236 let mut fields = HashMap::new();
1237 fields.insert("kind".to_string(), Value::text("queue_config".to_string()));
1238 fields.insert("queue".to_string(), Value::text(queue.to_string()));
1239 fields.insert(
1240 "mode".to_string(),
1241 Value::text(config.mode.as_str().to_string()),
1242 );
1243 fields.insert("priority".to_string(), Value::Boolean(config.priority));
1244 fields.insert(
1245 "max_size".to_string(),
1246 config
1247 .max_size
1248 .map(|value| Value::UnsignedInteger(value as u64))
1249 .unwrap_or(Value::Null),
1250 );
1251 fields.insert(
1252 "ttl_ms".to_string(),
1253 config
1254 .ttl_ms
1255 .map(Value::UnsignedInteger)
1256 .unwrap_or(Value::Null),
1257 );
1258 fields.insert(
1259 "dlq".to_string(),
1260 config.dlq.clone().map(Value::text).unwrap_or(Value::Null),
1261 );
1262 fields.insert(
1263 "max_attempts".to_string(),
1264 Value::UnsignedInteger(u64::from(config.max_attempts)),
1265 );
1266 fields.insert(
1267 "lock_deadline_ms".to_string(),
1268 Value::UnsignedInteger(config.lock_deadline_ms),
1269 );
1270 fields.insert(
1271 "in_flight_cap_per_group".to_string(),
1272 Value::UnsignedInteger(u64::from(config.in_flight_cap_per_group)),
1273 );
1274 insert_meta_row(store, fields)
1275}
1276
1277fn remove_queue_metadata(store: &UnifiedStore, queue: &str) {
1278 remove_meta_rows(store, |row| {
1279 row_text(row, "queue").as_deref() == Some(queue)
1280 });
1281}
1282
1283fn queue_group_exists(store: &UnifiedStore, queue: &str, group: &str) -> RedDBResult<bool> {
1284 Ok(load_queue_groups(store, queue)?
1285 .into_iter()
1286 .any(|entry| entry.group == group))
1287}
1288
1289pub(super) fn require_queue_group(
1290 store: &UnifiedStore,
1291 queue: &str,
1292 group: &str,
1293) -> RedDBResult<()> {
1294 if queue_group_exists(store, queue, group)? {
1295 Ok(())
1296 } else {
1297 Err(RedDBError::NotFound(format!(
1298 "consumer group '{}' not found on queue '{}'",
1299 group, queue
1300 )))
1301 }
1302}
1303
1304pub(super) fn resolve_read_group(
1305 store: &UnifiedStore,
1306 queue: &str,
1307 group: Option<&str>,
1308 consumer: &str,
1309 config: &QueueRuntimeConfig,
1310) -> RedDBResult<String> {
1311 if let Some(group) = group {
1312 require_queue_group(store, queue, group)?;
1313 return Ok(group.to_string());
1314 }
1315
1316 match config.mode {
1317 QueueMode::Work => {
1318 if !queue_group_exists(store, queue, WORK_DEFAULT_GROUP)? {
1319 save_queue_group(store, queue, WORK_DEFAULT_GROUP)?;
1320 }
1321 Ok(WORK_DEFAULT_GROUP.to_string())
1322 }
1323 QueueMode::Fanout => {
1324 let fanout_group = format!("{FANOUT_GROUP_PREFIX}{consumer}");
1325 if !queue_group_exists(store, queue, &fanout_group)? {
1326 save_queue_group(store, queue, &fanout_group)?;
1327 }
1328 Ok(fanout_group)
1329 }
1330 }
1331}
1332
1333fn load_queue_groups(store: &UnifiedStore, queue: &str) -> RedDBResult<Vec<QueueGroupEntry>> {
1334 let Some(manager) = store.get_collection(QUEUE_META_COLLECTION) else {
1335 return Ok(Vec::new());
1336 };
1337 Ok(manager
1338 .query_all(|entity| {
1339 entity.data.as_row().is_some_and(|row| {
1340 row_text(row, "kind").as_deref() == Some("queue_group")
1341 && row_text(row, "queue").as_deref() == Some(queue)
1342 })
1343 })
1344 .into_iter()
1345 .filter_map(|entity| {
1346 let row = entity.data.as_row()?;
1347 Some(QueueGroupEntry {
1348 entity_id: entity.id,
1349 group: row_text(row, "group")?,
1350 })
1351 })
1352 .collect())
1353}
1354
1355fn save_queue_group(store: &UnifiedStore, queue: &str, group: &str) -> RedDBResult<()> {
1356 let mut fields = HashMap::new();
1357 fields.insert("kind".to_string(), Value::text("queue_group".to_string()));
1358 fields.insert("queue".to_string(), Value::text(queue.to_string()));
1359 fields.insert("group".to_string(), Value::text(group.to_string()));
1360 fields.insert(
1361 "created_at_ns".to_string(),
1362 Value::UnsignedInteger(now_ns()),
1363 );
1364 insert_meta_row(store, fields)
1365}
1366
1367pub(super) fn load_pending_entries(
1368 store: &UnifiedStore,
1369 queue: &str,
1370 group: Option<&str>,
1371 message_id: Option<EntityId>,
1372) -> RedDBResult<Vec<QueuePendingEntry>> {
1373 let Some(manager) = store.get_collection(QUEUE_META_COLLECTION) else {
1374 return Ok(Vec::new());
1375 };
1376 Ok(manager
1377 .query_all(|entity| {
1378 entity.data.as_row().is_some_and(|row| {
1379 row_text(row, "kind").as_deref() == Some("queue_pending")
1380 && row_text(row, "queue").as_deref() == Some(queue)
1381 && group
1382 .map(|group_name| row_text(row, "group").as_deref() == Some(group_name))
1383 .unwrap_or(true)
1384 && message_id
1385 .map(|candidate| row_u64(row, "message_id") == Some(candidate.raw()))
1386 .unwrap_or(true)
1387 })
1388 })
1389 .into_iter()
1390 .filter_map(|entity| {
1391 let row = entity.data.as_row()?;
1392 Some(QueuePendingEntry {
1393 entity_id: entity.id,
1394 group: row_text(row, "group")?,
1395 message_id: EntityId::new(row_u64(row, "message_id")?),
1396 consumer: row_text(row, "consumer")?,
1397 delivered_at_ns: row_u64(row, "delivered_at_ns")?,
1398 delivery_count: row_u64(row, "delivery_count")
1399 .map(|value| value as u32)
1400 .unwrap_or(1),
1401 })
1402 })
1403 .collect())
1404}
1405
1406pub(super) fn save_queue_pending(
1407 store: &UnifiedStore,
1408 queue: &str,
1409 group: &str,
1410 message_id: EntityId,
1411 consumer: &str,
1412 delivered_at_ns: u64,
1413 delivery_count: u32,
1414) -> RedDBResult<()> {
1415 remove_meta_rows(store, |row| {
1416 row_text(row, "kind").as_deref() == Some("queue_pending")
1417 && row_text(row, "queue").as_deref() == Some(queue)
1418 && row_text(row, "group").as_deref() == Some(group)
1419 && row_u64(row, "message_id") == Some(message_id.raw())
1420 });
1421
1422 let mut fields = HashMap::new();
1423 fields.insert("kind".to_string(), Value::text("queue_pending".to_string()));
1424 fields.insert("queue".to_string(), Value::text(queue.to_string()));
1425 fields.insert("group".to_string(), Value::text(group.to_string()));
1426 fields.insert(
1427 "message_id".to_string(),
1428 Value::UnsignedInteger(message_id.raw()),
1429 );
1430 fields.insert("consumer".to_string(), Value::text(consumer.to_string()));
1431 fields.insert(
1432 "delivered_at_ns".to_string(),
1433 Value::UnsignedInteger(delivered_at_ns),
1434 );
1435 fields.insert(
1436 "delivery_count".to_string(),
1437 Value::UnsignedInteger(u64::from(delivery_count)),
1438 );
1439 insert_meta_row(store, fields)
1440}
1441
1442pub(super) fn require_pending_entry(
1443 store: &UnifiedStore,
1444 queue: &str,
1445 group: &str,
1446 message_id: EntityId,
1447) -> RedDBResult<QueuePendingEntry> {
1448 load_pending_entries(store, queue, Some(group), Some(message_id))?
1449 .into_iter()
1450 .next()
1451 .ok_or_else(|| {
1452 RedDBError::NotFound(format!(
1453 "message '{}' is not pending in group '{}' on queue '{}'",
1454 message_id.raw(),
1455 group,
1456 queue
1457 ))
1458 })
1459}
1460
1461pub(super) fn load_ack_entries(
1462 store: &UnifiedStore,
1463 queue: &str,
1464 group: Option<&str>,
1465 message_id: Option<EntityId>,
1466) -> RedDBResult<Vec<QueueAckEntry>> {
1467 let Some(manager) = store.get_collection(QUEUE_META_COLLECTION) else {
1468 return Ok(Vec::new());
1469 };
1470 Ok(manager
1471 .query_all(|entity| {
1472 entity.data.as_row().is_some_and(|row| {
1473 row_text(row, "kind").as_deref() == Some("queue_ack")
1474 && row_text(row, "queue").as_deref() == Some(queue)
1475 && group
1476 .map(|group_name| row_text(row, "group").as_deref() == Some(group_name))
1477 .unwrap_or(true)
1478 && message_id
1479 .map(|candidate| row_u64(row, "message_id") == Some(candidate.raw()))
1480 .unwrap_or(true)
1481 })
1482 })
1483 .into_iter()
1484 .filter_map(|entity| {
1485 let row = entity.data.as_row()?;
1486 Some(QueueAckEntry {
1487 entity_id: entity.id,
1488 group: row_text(row, "group")?,
1489 message_id: EntityId::new(row_u64(row, "message_id")?),
1490 })
1491 })
1492 .collect())
1493}
1494
1495pub(super) fn save_queue_ack(
1496 store: &UnifiedStore,
1497 queue: &str,
1498 group: &str,
1499 message_id: EntityId,
1500) -> RedDBResult<()> {
1501 let existing = load_ack_entries(store, queue, Some(group), Some(message_id))?;
1502 if !existing.is_empty() {
1503 return Ok(());
1504 }
1505
1506 let mut fields = HashMap::new();
1507 fields.insert("kind".to_string(), Value::text("queue_ack".to_string()));
1508 fields.insert("queue".to_string(), Value::text(queue.to_string()));
1509 fields.insert("group".to_string(), Value::text(group.to_string()));
1510 fields.insert(
1511 "message_id".to_string(),
1512 Value::UnsignedInteger(message_id.raw()),
1513 );
1514 fields.insert("acked_at_ns".to_string(), Value::UnsignedInteger(now_ns()));
1515 insert_meta_row(store, fields)
1516}
1517
1518pub(super) fn queue_message_completed_for_all_groups(
1519 store: &UnifiedStore,
1520 queue: &str,
1521 message_id: EntityId,
1522) -> RedDBResult<bool> {
1523 let groups = load_queue_groups(store, queue)?;
1524 let pending = load_pending_entries(store, queue, None, Some(message_id))?;
1525 if !pending.is_empty() {
1526 return Ok(false);
1527 }
1528 if groups.is_empty() {
1529 return Ok(true);
1530 }
1531
1532 let acked_groups = load_ack_entries(store, queue, None, Some(message_id))?
1533 .into_iter()
1534 .map(|entry| entry.group)
1535 .collect::<HashSet<_>>();
1536 Ok(groups
1537 .into_iter()
1538 .all(|group| acked_groups.contains(&group.group)))
1539}
1540
1541fn load_queue_message_views(
1542 store: &UnifiedStore,
1543 queue: &str,
1544) -> RedDBResult<Vec<QueueMessageView>> {
1545 load_queue_message_views_with_runtime(None, store, queue)
1546}
1547
1548pub(super) fn load_queue_message_views_with_runtime(
1555 runtime: Option<&RedDBRuntime>,
1556 store: &UnifiedStore,
1557 queue: &str,
1558) -> RedDBResult<Vec<QueueMessageView>> {
1559 let manager = store
1560 .get_collection(queue)
1561 .ok_or_else(|| RedDBError::NotFound(format!("queue '{}' not found", queue)))?;
1562 let snap_ctx = crate::runtime::impl_core::capture_current_snapshot();
1566 let rls_filter = runtime.and_then(|rt| {
1567 crate::runtime::impl_core::rls_policy_filter_for_kind(
1568 rt,
1569 queue,
1570 crate::storage::query::ast::PolicyAction::Select,
1571 crate::storage::query::ast::PolicyTargetKind::Messages,
1572 )
1573 });
1574 let rls_enabled_but_denied = runtime.map(|rt| rt.is_rls_enabled(queue)).unwrap_or(false)
1575 && rls_filter.is_none()
1576 && runtime.is_some();
1577 if rls_enabled_but_denied {
1578 return Ok(Vec::new());
1580 }
1581 let filter_arc = rls_filter.map(std::sync::Arc::new);
1582 let rt_arc = runtime;
1583 Ok(manager
1584 .query_all(move |entity| {
1585 if !matches!(entity.kind, EntityKind::QueueMessage { .. }) {
1586 return false;
1587 }
1588 if !crate::runtime::impl_core::entity_visible_with_context(snap_ctx.as_ref(), entity) {
1589 return false;
1590 }
1591 if let (Some(filter), Some(rt)) = (filter_arc.as_ref(), rt_arc) {
1592 return crate::runtime::query_exec::evaluate_entity_filter_with_db(
1593 Some(&rt.inner.db),
1594 entity,
1595 filter,
1596 queue,
1597 queue,
1598 );
1599 }
1600 true
1601 })
1602 .into_iter()
1603 .filter_map(queue_message_view_from_entity)
1604 .collect())
1605}
1606
1607fn queue_message_view_from_entity(entity: UnifiedEntity) -> Option<QueueMessageView> {
1608 let (position, _) = match &entity.kind {
1609 EntityKind::QueueMessage { position, queue } => (*position, queue),
1610 _ => return None,
1611 };
1612 let data = match entity.data {
1613 EntityData::QueueMessage(data) => data,
1614 _ => return None,
1615 };
1616 Some(QueueMessageView {
1617 id: entity.id,
1618 position,
1619 priority: data.priority.unwrap_or(0),
1620 payload: data.payload,
1621 attempts: data.attempts,
1622 max_attempts: data.max_attempts,
1623 enqueued_at_ns: data.enqueued_at_ns,
1624 })
1625}
1626
1627pub(super) fn insert_moved_queue_message_payload(
1634 store: &UnifiedStore,
1635 queue: &str,
1636 payload: &Value,
1637) -> RedDBResult<EntityId> {
1638 let config = load_queue_config(store, queue);
1639 let position = next_queue_position(store, queue, QueueSide::Right)?;
1640 let enqueued_at_ns = std::time::SystemTime::now()
1641 .duration_since(std::time::UNIX_EPOCH)
1642 .map(|d| d.as_nanos() as u64)
1643 .unwrap_or(0);
1644 let entity = UnifiedEntity::new(
1645 EntityId::new(0),
1646 EntityKind::QueueMessage {
1647 queue: queue.to_string(),
1648 position,
1649 },
1650 EntityData::QueueMessage(QueueMessageData {
1651 payload: payload.clone(),
1652 priority: None,
1653 enqueued_at_ns,
1654 attempts: 0,
1655 max_attempts: config.max_attempts,
1656 acked: false,
1657 }),
1658 );
1659 let id = store
1660 .insert_auto(queue, entity)
1661 .map_err(|err| RedDBError::Internal(err.to_string()))?;
1662 if let Some(ttl_ms) = config.ttl_ms {
1663 store
1664 .set_metadata(queue, id, queue_message_ttl_metadata(ttl_ms))
1665 .map_err(|err| RedDBError::Internal(err.to_string()))?;
1666 }
1667 Ok(id)
1668}
1669
1670fn insert_moved_queue_message(
1671 store: &UnifiedStore,
1672 queue: &str,
1673 config: &QueueRuntimeConfig,
1674 message: &QueueMessageView,
1675) -> RedDBResult<EntityId> {
1676 let position = next_queue_position(store, queue, QueueSide::Right)?;
1677 let entity = UnifiedEntity::new(
1678 EntityId::new(0),
1679 EntityKind::QueueMessage {
1680 queue: queue.to_string(),
1681 position,
1682 },
1683 EntityData::QueueMessage(QueueMessageData {
1684 payload: message.payload.clone(),
1685 priority: if config.priority {
1686 Some(message.priority)
1687 } else {
1688 None
1689 },
1690 enqueued_at_ns: message.enqueued_at_ns,
1691 attempts: message.attempts,
1692 max_attempts: message.max_attempts,
1693 acked: false,
1694 }),
1695 );
1696 let id = store
1697 .insert_auto(queue, entity)
1698 .map_err(|err| RedDBError::Internal(err.to_string()))?;
1699 if let Some(ttl_ms) = config.ttl_ms {
1700 store
1701 .set_metadata(queue, id, queue_message_ttl_metadata(ttl_ms))
1702 .map_err(|err| RedDBError::Internal(err.to_string()))?;
1703 }
1704 Ok(id)
1705}
1706
1707fn queue_projection_default_columns() -> Vec<String> {
1708 [
1709 "id",
1710 "payload",
1711 "priority",
1712 "attempts",
1713 "last_error",
1714 "enqueued_at",
1715 "available_at",
1716 "dlq",
1717 "tenant",
1718 ]
1719 .into_iter()
1720 .map(str::to_string)
1721 .collect()
1722}
1723
1724fn queue_projection_record(
1725 columns: &[String],
1726 message: &QueueMessageView,
1727 dlq: bool,
1728) -> RedDBResult<UnifiedRecord> {
1729 let mut record = UnifiedRecord::new();
1730 for column in columns {
1731 let value = queue_projection_value(message, dlq, column).ok_or_else(|| {
1732 RedDBError::Query(format!("unknown queue projection column '{}'", column))
1733 })?;
1734 record.set(column, value);
1735 }
1736 Ok(record)
1737}
1738
1739fn queue_projection_value(message: &QueueMessageView, dlq: bool, column: &str) -> Option<Value> {
1740 match column {
1741 "id" => Some(Value::text(message_id_string(message.id))),
1742 "payload" => Some(message.payload.clone()),
1743 "priority" => Some(Value::Integer(i64::from(message.priority))),
1744 "attempts" => Some(Value::UnsignedInteger(u64::from(message.attempts))),
1745 "last_error" => Some(Value::Null),
1746 "enqueued_at" => Some(Value::UnsignedInteger(message.enqueued_at_ns)),
1747 "available_at" => Some(Value::UnsignedInteger(message.enqueued_at_ns)),
1748 "dlq" => Some(Value::Boolean(dlq)),
1749 "tenant" => queue_message_tenant(&message.payload).or(Some(Value::Null)),
1750 _ => None,
1751 }
1752}
1753
1754fn queue_message_tenant(payload: &Value) -> Option<Value> {
1755 let Value::Json(bytes) = payload else {
1756 return None;
1757 };
1758 let json: crate::json::Value = crate::json::from_slice(bytes).ok()?;
1759 json.get("tenant")
1760 .and_then(crate::json::Value::as_str)
1761 .map(|tenant| Value::text(tenant.to_string()))
1762}
1763
1764fn queue_is_dead_letter_target(store: &UnifiedStore, queue: &str) -> bool {
1765 let Some(manager) = store.get_collection(QUEUE_META_COLLECTION) else {
1766 return false;
1767 };
1768 !manager
1769 .query_all(|entity| {
1770 entity.data.as_row().is_some_and(|row| {
1771 row_text(row, "kind").as_deref() == Some("queue_config")
1772 && row_text(row, "dlq").as_deref() == Some(queue)
1773 })
1774 })
1775 .is_empty()
1776}
1777
1778fn queue_message_matches_filter(message: &QueueMessageView, dlq: bool, filter: &Filter) -> bool {
1779 match filter {
1780 Filter::Compare { field, op, value } => queue_filter_field_value(message, dlq, field)
1781 .is_some_and(|candidate| queue_compare_values(&candidate, value, *op)),
1782 Filter::CompareFields { left, op, right } => {
1783 match (
1784 queue_filter_field_value(message, dlq, left),
1785 queue_filter_field_value(message, dlq, right),
1786 ) {
1787 (Some(left), Some(right)) => queue_compare_values(&left, &right, *op),
1788 _ => false,
1789 }
1790 }
1791 Filter::And(left, right) => {
1792 queue_message_matches_filter(message, dlq, left)
1793 && queue_message_matches_filter(message, dlq, right)
1794 }
1795 Filter::Or(left, right) => {
1796 queue_message_matches_filter(message, dlq, left)
1797 || queue_message_matches_filter(message, dlq, right)
1798 }
1799 Filter::Not(inner) => !queue_message_matches_filter(message, dlq, inner),
1800 Filter::IsNull(field) => queue_filter_field_value(message, dlq, field)
1801 .is_none_or(|value| matches!(value, Value::Null)),
1802 Filter::IsNotNull(field) => queue_filter_field_value(message, dlq, field)
1803 .is_some_and(|value| !matches!(value, Value::Null)),
1804 Filter::In { field, values } => {
1805 queue_filter_field_value(message, dlq, field).is_some_and(|candidate| {
1806 values
1807 .iter()
1808 .any(|value| queue_values_equal(&candidate, value))
1809 })
1810 }
1811 Filter::Between { field, low, high } => queue_filter_field_value(message, dlq, field)
1812 .is_some_and(|candidate| {
1813 queue_compare_values(&candidate, low, CompareOp::Ge)
1814 && queue_compare_values(&candidate, high, CompareOp::Le)
1815 }),
1816 Filter::Like { field, pattern } => queue_filter_text(message, dlq, field)
1817 .is_some_and(|value| queue_like_matches(&value, pattern)),
1818 Filter::StartsWith { field, prefix } => {
1819 queue_filter_text(message, dlq, field).is_some_and(|value| value.starts_with(prefix))
1820 }
1821 Filter::EndsWith { field, suffix } => {
1822 queue_filter_text(message, dlq, field).is_some_and(|value| value.ends_with(suffix))
1823 }
1824 Filter::Contains { field, substring } => {
1825 queue_filter_text(message, dlq, field).is_some_and(|value| value.contains(substring))
1826 }
1827 Filter::CompareExpr { .. } => false,
1828 }
1829}
1830
1831fn queue_filter_field_value(
1832 message: &QueueMessageView,
1833 dlq: bool,
1834 field: &FieldRef,
1835) -> Option<Value> {
1836 match field {
1837 FieldRef::TableColumn { table, column } if table.is_empty() => {
1838 queue_projection_value(message, dlq, column)
1839 .or_else(|| queue_payload_field_value(&message.payload, column))
1840 }
1841 FieldRef::TableColumn { column, .. } => queue_projection_value(message, dlq, column)
1842 .or_else(|| queue_payload_field_value(&message.payload, column)),
1843 _ => None,
1844 }
1845}
1846
1847fn queue_payload_field_value(payload: &Value, field: &str) -> Option<Value> {
1848 let Value::Json(bytes) = payload else {
1849 return None;
1850 };
1851 let json: crate::json::Value = crate::json::from_slice(bytes).ok()?;
1852 let value = json.get(field)?;
1853 json_value_to_schema_value(value)
1854}
1855
1856fn json_value_to_schema_value(value: &crate::json::Value) -> Option<Value> {
1857 if matches!(value, crate::json::Value::Null) {
1858 Some(Value::Null)
1859 } else if let Some(value) = value.as_bool() {
1860 Some(Value::Boolean(value))
1861 } else if let Some(value) = value.as_i64() {
1862 Some(Value::Integer(value))
1863 } else if let Some(value) = value.as_u64() {
1864 Some(Value::UnsignedInteger(value))
1865 } else if let Some(value) = value.as_f64() {
1866 Some(Value::Float(value))
1867 } else if let Some(value) = value.as_str() {
1868 Some(Value::text(value.to_string()))
1869 } else {
1870 Some(Value::Json(value.to_string_compact().into_bytes()))
1871 }
1872}
1873
1874fn queue_filter_text(message: &QueueMessageView, dlq: bool, field: &FieldRef) -> Option<String> {
1875 queue_filter_field_value(message, dlq, field).and_then(|value| match value {
1876 Value::Text(value) => Some(value.to_string()),
1877 Value::NodeRef(value) | Value::EdgeRef(value) | Value::TableRef(value) => Some(value),
1878 Value::Integer(value) => Some(value.to_string()),
1879 Value::UnsignedInteger(value) => Some(value.to_string()),
1880 Value::Float(value) => Some(value.to_string()),
1881 Value::Boolean(value) => Some(value.to_string()),
1882 _ => None,
1883 })
1884}
1885
1886fn queue_compare_values(left: &Value, right: &Value, op: CompareOp) -> bool {
1887 match op {
1888 CompareOp::Eq => queue_values_equal(left, right),
1889 CompareOp::Ne => !queue_values_equal(left, right),
1890 CompareOp::Lt => queue_partial_cmp(left, right).is_some_and(|ord| ord.is_lt()),
1891 CompareOp::Le => queue_partial_cmp(left, right).is_some_and(|ord| !ord.is_gt()),
1892 CompareOp::Gt => queue_partial_cmp(left, right).is_some_and(|ord| ord.is_gt()),
1893 CompareOp::Ge => queue_partial_cmp(left, right).is_some_and(|ord| !ord.is_lt()),
1894 }
1895}
1896
1897fn queue_values_equal(left: &Value, right: &Value) -> bool {
1898 if let (Some(left), Some(right)) = (queue_value_number(left), queue_value_number(right)) {
1899 return (left - right).abs() < f64::EPSILON;
1900 }
1901 match (left, right) {
1902 (Value::Text(left), Value::Text(right)) => left == right,
1903 (Value::Boolean(left), Value::Boolean(right)) => left == right,
1904 _ => left == right,
1905 }
1906}
1907
1908fn queue_partial_cmp(left: &Value, right: &Value) -> Option<std::cmp::Ordering> {
1909 if let (Some(left), Some(right)) = (queue_value_number(left), queue_value_number(right)) {
1910 return left.partial_cmp(&right);
1911 }
1912 match (left, right) {
1913 (Value::Text(left), Value::Text(right)) => Some(left.cmp(right)),
1914 _ => None,
1915 }
1916}
1917
1918fn queue_value_number(value: &Value) -> Option<f64> {
1919 match value {
1920 Value::Integer(value) => Some(*value as f64),
1921 Value::UnsignedInteger(value) => Some(*value as f64),
1922 Value::Float(value) => Some(*value),
1923 Value::Text(value) => value.parse().ok(),
1924 _ => None,
1925 }
1926}
1927
1928fn queue_like_matches(value: &str, pattern: &str) -> bool {
1929 if pattern == "%" {
1930 return true;
1931 }
1932 let starts_wild = pattern.starts_with('%');
1933 let ends_wild = pattern.ends_with('%');
1934 let needle = pattern.trim_matches('%');
1935 match (starts_wild, ends_wild) {
1936 (true, true) => value.contains(needle),
1937 (true, false) => value.ends_with(needle),
1938 (false, true) => value.starts_with(needle),
1939 (false, false) => value == needle,
1940 }
1941}
1942
1943pub(super) fn queue_message_view_by_id(
1944 store: &UnifiedStore,
1945 queue: &str,
1946 message_id: EntityId,
1947) -> RedDBResult<Option<QueueMessageView>> {
1948 let manager = queue_manager(store, queue)?;
1949 Ok(manager
1950 .get(message_id)
1951 .and_then(queue_message_view_from_entity))
1952}
1953
1954pub(super) fn sort_queue_messages(
1955 messages: &mut [QueueMessageView],
1956 config: &QueueRuntimeConfig,
1957 side: QueueSide,
1958) {
1959 messages.sort_by(|left, right| {
1960 if config.priority {
1961 right
1962 .priority
1963 .cmp(&left.priority)
1964 .then_with(|| match side {
1965 QueueSide::Left => left.position.cmp(&right.position),
1966 QueueSide::Right => right.position.cmp(&left.position),
1967 })
1968 .then_with(|| left.id.raw().cmp(&right.id.raw()))
1969 } else {
1970 match side {
1971 QueueSide::Left => left.position.cmp(&right.position),
1972 QueueSide::Right => right.position.cmp(&left.position),
1973 }
1974 .then_with(|| left.id.raw().cmp(&right.id.raw()))
1975 }
1976 });
1977}
1978
1979pub(super) fn next_queue_position(
1980 store: &UnifiedStore,
1981 queue: &str,
1982 side: QueueSide,
1983) -> RedDBResult<u64> {
1984 let messages = load_queue_message_views(store, queue)?;
1985 if messages.is_empty() {
1986 return Ok(QUEUE_POSITION_CENTER);
1987 }
1988 match side {
1989 QueueSide::Left => Ok(messages
1990 .iter()
1991 .map(|message| message.position)
1992 .min()
1993 .unwrap_or(QUEUE_POSITION_CENTER)
1994 .saturating_sub(1)),
1995 QueueSide::Right => Ok(messages
1996 .iter()
1997 .map(|message| message.position)
1998 .max()
1999 .unwrap_or(QUEUE_POSITION_CENTER)
2000 .saturating_add(1)),
2001 }
2002}
2003
2004pub(super) fn increment_queue_attempts(
2005 store: &UnifiedStore,
2006 queue: &str,
2007 message_id: EntityId,
2008) -> RedDBResult<u32> {
2009 let manager = queue_manager(store, queue)?;
2010 let mut entity = manager
2011 .get(message_id)
2012 .ok_or_else(|| RedDBError::NotFound(format!("message '{}' not found", message_id.raw())))?;
2013 match &mut entity.data {
2014 EntityData::QueueMessage(message) => {
2015 message.attempts = message.attempts.saturating_add(1);
2016 let attempts = message.attempts;
2017 manager
2018 .update(entity)
2019 .map_err(|err| RedDBError::Internal(err.to_string()))?;
2020 Ok(attempts)
2021 }
2022 _ => Err(RedDBError::Query(format!(
2023 "entity '{}' is not a queue message",
2024 message_id.raw()
2025 ))),
2026 }
2027}
2028
2029pub(super) fn queue_message_attempts(
2030 store: &UnifiedStore,
2031 queue: &str,
2032 message_id: EntityId,
2033) -> RedDBResult<u32> {
2034 Ok(queue_message_data(store, queue, message_id)?.attempts)
2035}
2036
2037pub(super) fn queue_message_max_attempts(
2038 store: &UnifiedStore,
2039 queue: &str,
2040 message_id: EntityId,
2041) -> RedDBResult<u32> {
2042 Ok(queue_message_data(store, queue, message_id)?.max_attempts)
2043}
2044
2045pub(super) fn queue_message_payload(
2046 store: &UnifiedStore,
2047 queue: &str,
2048 message_id: EntityId,
2049) -> RedDBResult<Value> {
2050 Ok(queue_message_data(store, queue, message_id)?.payload)
2051}
2052
2053pub(super) fn queue_message_pending_any(
2054 store: &UnifiedStore,
2055 queue: &str,
2056 message_id: EntityId,
2057) -> RedDBResult<bool> {
2058 Ok(!load_pending_entries(store, queue, None, Some(message_id))?.is_empty())
2059}
2060
2061pub(super) fn queue_message_pending_for_group(
2062 store: &UnifiedStore,
2063 queue: &str,
2064 group: &str,
2065 message_id: EntityId,
2066) -> RedDBResult<bool> {
2067 Ok(!load_pending_entries(store, queue, Some(group), Some(message_id))?.is_empty())
2068}
2069
2070pub(super) fn queue_message_acked_for_group(
2071 store: &UnifiedStore,
2072 queue: &str,
2073 group: &str,
2074 message_id: EntityId,
2075) -> RedDBResult<bool> {
2076 Ok(!load_ack_entries(store, queue, Some(group), Some(message_id))?.is_empty())
2077}
2078
2079fn queue_manager(
2080 store: &UnifiedStore,
2081 queue: &str,
2082) -> RedDBResult<Arc<crate::storage::unified::SegmentManager>> {
2083 store
2084 .get_collection(queue)
2085 .ok_or_else(|| RedDBError::NotFound(format!("queue '{}' not found", queue)))
2086}
2087
2088pub(super) fn queue_message_data(
2089 store: &UnifiedStore,
2090 queue: &str,
2091 message_id: EntityId,
2092) -> RedDBResult<QueueMessageData> {
2093 let manager = queue_manager(store, queue)?;
2094 let entity = manager
2095 .get(message_id)
2096 .ok_or_else(|| RedDBError::NotFound(format!("message '{}' not found", message_id.raw())))?;
2097 match entity.data {
2098 EntityData::QueueMessage(message) => Ok(message),
2099 _ => Err(RedDBError::Query(format!(
2100 "entity '{}' is not a queue message",
2101 message_id.raw()
2102 ))),
2103 }
2104}
2105
2106fn insert_meta_row(store: &UnifiedStore, fields: HashMap<String, Value>) -> RedDBResult<()> {
2107 let _ = store.get_or_create_collection(QUEUE_META_COLLECTION);
2108 store
2109 .insert_auto(
2110 QUEUE_META_COLLECTION,
2111 UnifiedEntity::new(
2112 EntityId::new(0),
2113 EntityKind::TableRow {
2114 table: Arc::from(QUEUE_META_COLLECTION),
2115 row_id: 0,
2116 },
2117 EntityData::Row(RowData {
2118 columns: Vec::new(),
2119 named: Some(fields),
2120 schema: None,
2121 }),
2122 ),
2123 )
2124 .map_err(|err| RedDBError::Internal(err.to_string()))?;
2125 Ok(())
2126}
2127
2128pub(super) fn remove_meta_rows(store: &UnifiedStore, predicate: impl Fn(&RowData) -> bool + Sync) {
2129 let Some(manager) = store.get_collection(QUEUE_META_COLLECTION) else {
2130 return;
2131 };
2132 let rows = manager.query_all(|entity| entity.data.as_row().is_some_and(&predicate));
2133 for row in rows {
2134 let _ = store.delete(QUEUE_META_COLLECTION, row.id);
2135 }
2136}
2137
2138pub(super) fn delete_meta_entity(store: &UnifiedStore, entity_id: EntityId) {
2139 let _ = store.delete(QUEUE_META_COLLECTION, entity_id);
2140}
2141
2142fn queue_message_lock_key(queue: &str, message_id: EntityId) -> String {
2143 format!("{queue}:{}", message_id.raw())
2144}
2145
2146pub(super) fn queue_message_lock_handle(
2147 runtime: &RedDBRuntime,
2148 queue: &str,
2149 message_id: EntityId,
2150) -> Arc<parking_lot::Mutex<()>> {
2151 let key = queue_message_lock_key(queue, message_id);
2152 if let Some(lock) = runtime.inner.queue_message_locks.read().get(&key).cloned() {
2153 return lock;
2154 }
2155
2156 let mut locks = runtime.inner.queue_message_locks.write();
2157 locks
2158 .entry(key)
2159 .or_insert_with(|| Arc::new(parking_lot::Mutex::new(())))
2160 .clone()
2161}
2162
2163pub(super) fn forget_queue_message_lock(runtime: &RedDBRuntime, queue: &str, message_id: EntityId) {
2164 runtime
2165 .inner
2166 .queue_message_locks
2167 .write()
2168 .remove(&queue_message_lock_key(queue, message_id));
2169}
2170
2171fn parse_message_id(value: &str) -> RedDBResult<EntityId> {
2172 let raw = value.strip_prefix('e').unwrap_or(value);
2173 raw.parse::<u64>()
2174 .map(EntityId::new)
2175 .map_err(|_| RedDBError::Query(format!("invalid message id '{}'", value)))
2176}
2177
2178pub(super) fn resolve_ack_nack_handle(
2184 store: &UnifiedStore,
2185 queue: &str,
2186 group_hint: &str,
2187 message_id_hint: &str,
2188 delivery_id: Option<&str>,
2189) -> RedDBResult<(String, EntityId)> {
2190 if let Some(did) = delivery_id {
2191 return resolve_delivery_id(store, queue, did);
2192 }
2193 if group_hint.is_empty() || message_id_hint.is_empty() {
2194 return Err(RedDBError::Query(
2195 "ACK/NACK requires either GROUP <group> '<message_id>' or WITH delivery_id = '<id>'"
2196 .to_string(),
2197 ));
2198 }
2199 log_tuple_deprecation(queue);
2200 let entity = parse_message_id(message_id_hint)?;
2201 Ok((group_hint.to_string(), entity))
2202}
2203
2204fn resolve_delivery_id(
2205 store: &UnifiedStore,
2206 queue: &str,
2207 delivery_id: &str,
2208) -> RedDBResult<(String, EntityId)> {
2209 let Some(manager) = store.get_collection(QUEUE_META_COLLECTION) else {
2210 return Err(RedDBError::Query(format!(
2211 "delivery_id '{}' does not resolve to a live pending delivery",
2212 delivery_id
2213 )));
2214 };
2215 for entity in manager.query_all(|entity| {
2216 entity.data.as_row().is_some_and(|row| {
2217 row_text(row, "kind").as_deref() == Some("queue_pending_lc")
2218 && row_text(row, "delivery_id").as_deref() == Some(delivery_id)
2219 })
2220 }) {
2221 if let Some(row) = entity.data.as_row() {
2222 let row_queue = row_text(row, "queue").unwrap_or_default();
2223 let row_group = row_text(row, "group").unwrap_or_default();
2224 let row_message = row_u64(row, "message_id").unwrap_or(0);
2225 if row_queue != queue {
2226 return Err(RedDBError::Query(format!(
2227 "delivery_id '{}' belongs to queue '{}', not '{}'",
2228 delivery_id, row_queue, queue
2229 )));
2230 }
2231 return Ok((row_group, EntityId::new(row_message)));
2232 }
2233 }
2234 Err(RedDBError::Query(format!(
2235 "delivery_id '{}' does not resolve to a live pending delivery",
2236 delivery_id
2237 )))
2238}
2239
2240fn log_tuple_deprecation(queue: &str) {
2243 use std::sync::{Mutex, OnceLock};
2244 use std::time::Instant;
2245
2246 static LAST_EMIT: OnceLock<Mutex<HashMap<(u64, String), Instant>>> = OnceLock::new();
2247 const COOLDOWN: std::time::Duration = std::time::Duration::from_secs(60);
2248
2249 let map = LAST_EMIT.get_or_init(|| Mutex::new(HashMap::new()));
2250 let key = (super::impl_core::current_connection_id(), queue.to_string());
2251 let now = Instant::now();
2252 let mut guard = match map.lock() {
2253 Ok(g) => g,
2254 Err(_) => return,
2255 };
2256 let should_emit =
2257 !matches!(guard.get(&key), Some(prev) if now.duration_since(*prev) < COOLDOWN);
2258 if should_emit {
2259 guard.insert(key.clone(), now);
2260 drop(guard);
2261 tracing::warn!(
2262 target: "reddb::queue_lifecycle",
2263 queue = queue,
2264 connection_id = key.0,
2265 "ACK/NACK by (queue, group, message_id) tuple is deprecated; \
2266 switch to the server-issued delivery_id (ADR 0026). \
2267 The tuple path will be removed one minor release after introduction.",
2268 );
2269 }
2270}
2271
2272fn message_id_string(message_id: EntityId) -> String {
2273 message_id.raw().to_string()
2274}
2275
2276pub(crate) fn pending_counts_by_group(
2281 store: &UnifiedStore,
2282) -> std::collections::BTreeMap<(String, String), u64> {
2283 let mut counts: std::collections::BTreeMap<(String, String), u64> =
2284 std::collections::BTreeMap::new();
2285 let Some(manager) = store.get_collection(QUEUE_META_COLLECTION) else {
2286 return counts;
2287 };
2288 for entity in manager.query_all(|entity| {
2289 entity
2290 .data
2291 .as_row()
2292 .is_some_and(|row| row_text(row, "kind").as_deref() == Some("queue_pending"))
2293 }) {
2294 if let Some(row) = entity.data.as_row() {
2295 let queue = row_text(row, "queue");
2296 let group = row_text(row, "group");
2297 if let (Some(q), Some(g)) = (queue, group) {
2298 *counts.entry((q, g)).or_insert(0) += 1;
2299 }
2300 }
2301 }
2302 counts
2303}
2304
2305pub(super) fn row_text(row: &RowData, field: &str) -> Option<String> {
2306 match row.get_field(field)?.clone() {
2307 Value::Text(value) => Some(value.to_string()),
2308 Value::NodeRef(value) => Some(value),
2309 Value::EdgeRef(value) => Some(value),
2310 Value::TableRef(value) => Some(value),
2311 _ => None,
2312 }
2313}
2314
2315pub(super) fn row_u64(row: &RowData, field: &str) -> Option<u64> {
2316 match row.get_field(field)?.clone() {
2317 Value::UnsignedInteger(value) => Some(value),
2318 Value::Integer(value) if value >= 0 => Some(value as u64),
2319 Value::Float(value) if value >= 0.0 => Some(value as u64),
2320 Value::Text(value) => value.parse().ok(),
2321 _ => None,
2322 }
2323}
2324
2325fn row_bool(row: &RowData, field: &str) -> Option<bool> {
2326 match row.get_field(field)?.clone() {
2327 Value::Boolean(value) => Some(value),
2328 Value::Text(value) => match value.to_ascii_lowercase().as_str() {
2329 "true" => Some(true),
2330 "false" => Some(false),
2331 _ => None,
2332 },
2333 _ => None,
2334 }
2335}
2336
2337fn queue_collection_contract(
2338 name: &str,
2339 priority: bool,
2340 ttl_ms: Option<u64>,
2341) -> crate::physical::CollectionContract {
2342 let now = current_unix_ms();
2343 let mut context_index_fields = Vec::new();
2344 if priority {
2345 context_index_fields.push("priority".to_string());
2346 }
2347
2348 crate::physical::CollectionContract {
2349 name: name.to_string(),
2350 declared_model: crate::catalog::CollectionModel::Queue,
2351 schema_mode: crate::catalog::SchemaMode::Dynamic,
2352 origin: crate::physical::ContractOrigin::Explicit,
2353 version: 1,
2354 created_at_unix_ms: now,
2355 updated_at_unix_ms: now,
2356 default_ttl_ms: ttl_ms,
2357 vector_dimension: None,
2358 vector_metric: None,
2359 context_index_fields,
2360 declared_columns: Vec::new(),
2361 table_def: None,
2362 timestamps_enabled: false,
2363 context_index_enabled: false,
2364 metrics_raw_retention_ms: None,
2365 metrics_rollup_policies: Vec::new(),
2366 metrics_tenant_identity: None,
2367 metrics_namespace: None,
2368 append_only: true,
2372 subscriptions: Vec::new(),
2373 session_key: None,
2374 session_gap_ms: None,
2375 retention_duration_ms: None,
2376 }
2377}
2378
2379fn current_unix_ms() -> u128 {
2380 std::time::SystemTime::now()
2381 .duration_since(std::time::UNIX_EPOCH)
2382 .unwrap_or_default()
2383 .as_millis()
2384}
2385
2386pub(super) fn now_ns() -> u64 {
2387 std::time::SystemTime::now()
2388 .duration_since(std::time::UNIX_EPOCH)
2389 .unwrap_or_default()
2390 .as_nanos() as u64
2391}
2392
2393pub(super) fn queue_message_ttl_metadata(ttl_ms: u64) -> Metadata {
2394 Metadata::with_fields(
2395 [(
2396 "_ttl_ms".to_string(),
2397 if ttl_ms <= i64::MAX as u64 {
2398 MetadataValue::Int(ttl_ms as i64)
2399 } else {
2400 MetadataValue::Timestamp(ttl_ms)
2401 },
2402 )]
2403 .into_iter()
2404 .collect(),
2405 )
2406}
2407
2408fn estimate_payload_bytes(payload: &Value) -> u64 {
2410 match payload {
2411 Value::Json(v) => v.len() as u64,
2412 Value::Text(s) => s.len() as u64,
2413 _ => 64,
2414 }
2415}