1use std::collections::{HashMap, HashSet};
4use std::sync::atomic::{AtomicU64, Ordering};
5use std::sync::Arc;
6
7use crate::runtime::audit_log::{AuditAuthSource, AuditEvent, AuditFieldEscaper, Outcome};
8use crate::storage::queue::QueueMode;
9use crate::storage::unified::entity::{QueueMessageData, RowData};
10use crate::storage::unified::{Metadata, MetadataValue, UnifiedStore};
11
12use super::*;
13
/// Counter bumped by `enqueue_event_payload` each time an event is diverted
/// away from its target queue (outbox byte cap exceeded or target queue full).
pub static EVENTS_DRAIN_RETRIES_TOTAL: AtomicU64 = AtomicU64::new(0);

/// Counter of events routed to a `<queue>_outbox_dlq` dead-letter queue.
pub static EVENTS_DLQ_TOTAL: AtomicU64 = AtomicU64::new(0);

/// Counter of event payloads successfully inserted into a queue. Inserts into
/// an outbox dead-letter queue are counted here as well.
pub static EVENTS_ENQUEUED_TOTAL: AtomicU64 = AtomicU64::new(0);

/// Approximate outbox size (1 GiB) at which a capacity warning is emitted.
const OUTBOX_WARN_BYTES: u64 = 1 << 30;

/// Approximate outbox size (10 GiB) beyond which new events are diverted to
/// the outbox dead-letter queue instead of being enqueued.
const OUTBOX_MAX_BYTES: u64 = 10 * (1 << 30);

/// Running estimate of the payload bytes accepted into the event outbox.
/// Increased on enqueue and decreased again when an event is diverted.
// NOTE(review): no decrement on drain is visible in this section of the
// file — presumably done elsewhere; confirm.
static OUTBOX_APPROX_BYTES: AtomicU64 = AtomicU64::new(0);

/// Collection that stores queue metadata rows (e.g. rows whose `kind` field is
/// `"queue_config"`).
const QUEUE_META_COLLECTION: &str = "red_queue_meta";
/// Midpoint of the `u64` position space.
// NOTE(review): presumably the starting position so pushes can grow towards
// both ends — confirm against `next_queue_position`.
const QUEUE_POSITION_CENTER: u64 = u64::MAX / 2;
/// Group name with the `_work_default` value.
// NOTE(review): presumably the implicit consumer group for work-mode queues —
// not used in this section of the file; confirm.
const WORK_DEFAULT_GROUP: &str = "_work_default";
/// Name prefix `_fanout_`.
// NOTE(review): presumably prefixes implicit per-consumer fanout groups — not
// used in this section of the file; confirm.
const FANOUT_GROUP_PREFIX: &str = "_fanout_";
41
/// Per-queue runtime settings, persisted as a `queue_config` row in the queue
/// metadata collection and loaded back with `load_queue_config`.
#[derive(Debug, Clone)]
pub(super) struct QueueRuntimeConfig {
    /// Delivery mode of the queue; `QueueMode::Work` when no config row exists.
    pub(super) mode: QueueMode,
    /// Whether pushes may carry a priority. A push with a priority onto a
    /// queue where this is `false` is rejected.
    pub(super) priority: bool,
    /// Maximum number of messages the queue may hold; `None` means unbounded.
    pub(super) max_size: Option<usize>,
    /// TTL in milliseconds attached as metadata to every inserted message.
    pub(super) ttl_ms: Option<u64>,
    /// Name of the dead-letter queue, if one is configured.
    pub(super) dlq: Option<String>,
    /// Value copied into each message's `max_attempts` when it is enqueued.
    pub(super) max_attempts: u32,
    /// Lock deadline in milliseconds.
    // NOTE(review): enforcement is not in this section of the file —
    // presumably the delivery-lock timeout; confirm.
    pub(super) lock_deadline_ms: u64,
    /// In-flight cap per consumer group.
    // NOTE(review): enforcement is not in this section of the file; confirm
    // semantics against the delivery code.
    pub(super) in_flight_cap_per_group: u32,
}
53
/// A consumer-group record: the group name plus the id of the entity that
/// stores it.
// NOTE(review): presumably one metadata row per (queue, group) — the loader is
// outside this section of the file; confirm.
#[derive(Debug, Clone)]
struct QueueGroupEntry {
    /// Id of the entity backing this record.
    entity_id: EntityId,
    /// Consumer group name.
    group: String,
}
59
/// A delivered-but-unacknowledged message for a consumer group, as listed by
/// the `Pending` queue command.
#[derive(Debug, Clone)]
pub(super) struct QueuePendingEntry {
    /// Id of the entity backing this pending record.
    pub(super) entity_id: EntityId,
    /// Consumer group the delivery belongs to.
    group: String,
    /// Id of the delivered queue message.
    pub(super) message_id: EntityId,
    /// Consumer that currently holds the message.
    consumer: String,
    /// Delivery timestamp in nanoseconds; `Pending` derives `idle_ms` from it.
    pub(super) delivered_at_ns: u64,
    /// Number of deliveries recorded for this message.
    pub(super) delivery_count: u32,
}
69
/// An acknowledgement record tying a message to a consumer group.
// NOTE(review): producers/consumers of this type are outside this section of
// the file — presumably written when a group acks a message; confirm.
#[derive(Debug, Clone)]
pub(super) struct QueueAckEntry {
    /// Id of the entity backing this record.
    entity_id: EntityId,
    /// Consumer group name.
    group: String,
    /// Id of the queue message the record refers to.
    pub(super) message_id: EntityId,
}
76
/// Flattened view of one queue message, used for listing, counting, selecting
/// and moving messages.
#[derive(Debug, Clone)]
pub(super) struct QueueMessageView {
    /// Entity id of the message.
    pub(super) id: EntityId,
    /// Position of the message inside its queue.
    position: u64,
    /// Message priority.
    // NOTE(review): presumably only meaningful on priority queues; confirm
    // against `sort_queue_messages`.
    priority: i32,
    /// The message payload.
    pub(super) payload: Value,
    /// Attempts recorded for the message so far.
    attempts: u32,
    /// Attempt budget of the message.
    pub(super) max_attempts: u32,
    /// Enqueue timestamp in nanoseconds.
    enqueued_at_ns: u64,
}
87
88impl RedDBRuntime {
89 pub(crate) fn enqueue_event_payload(
90 &self,
91 queue: &str,
92 payload: Value,
93 ) -> RedDBResult<EntityId> {
94 let store = self.inner.db.store();
95 if store.get_collection(queue).is_none() {
97 crate::runtime::impl_ddl::ensure_event_target_queue_pub(self, queue)?;
98 }
99
100 let payload_bytes = estimate_payload_bytes(&payload);
102 let outbox_bytes = OUTBOX_APPROX_BYTES.fetch_add(payload_bytes, Ordering::Relaxed);
103
104 if outbox_bytes > OUTBOX_MAX_BYTES {
106 OUTBOX_APPROX_BYTES.fetch_sub(payload_bytes, Ordering::Relaxed);
107 EVENTS_DRAIN_RETRIES_TOTAL.fetch_add(1, Ordering::Relaxed);
108 return self.route_event_to_outbox_dlq(queue, payload, "outbox_max_bytes_exceeded");
109 }
110
111 if outbox_bytes > OUTBOX_WARN_BYTES && outbox_bytes - payload_bytes <= OUTBOX_WARN_BYTES {
113 tracing::warn!(
114 outbox_bytes,
115 warn_threshold = OUTBOX_WARN_BYTES,
116 "event outbox approaching capacity warning threshold"
117 );
118 crate::telemetry::operator_event::OperatorEvent::OutboxDlqActivated {
119 queue: queue.to_string(),
120 dlq: format!("{queue}_outbox_dlq"),
121 reason: "outbox_warn_bytes_exceeded".to_string(),
122 }
123 .emit_global();
124 }
125
126 let config = load_queue_config(store.as_ref(), queue);
127
128 if let Some(max_size) = config.max_size {
130 let current_len = load_queue_message_views(store.as_ref(), queue)
131 .unwrap_or_default()
132 .len();
133 if current_len >= max_size {
134 OUTBOX_APPROX_BYTES.fetch_sub(payload_bytes, Ordering::Relaxed);
135 EVENTS_DRAIN_RETRIES_TOTAL.fetch_add(1, Ordering::Relaxed);
136 return self.route_event_to_outbox_dlq(queue, payload, "queue_full");
137 }
138 if current_len * 10 >= max_size * 8 {
140 tracing::warn!(
141 queue = %queue,
142 size = current_len,
143 max = max_size,
144 "event target queue near capacity"
145 );
146 }
147 }
148
149 let id = self.enqueue_event_payload_raw(store.as_ref(), queue, &config, payload)?;
150 EVENTS_ENQUEUED_TOTAL.fetch_add(1, Ordering::Relaxed);
151 Ok(id)
152 }
153
154 fn route_event_to_outbox_dlq(
156 &self,
157 queue: &str,
158 payload: Value,
159 reason: &str,
160 ) -> RedDBResult<EntityId> {
161 let dlq_name = format!("{queue}_outbox_dlq");
162 EVENTS_DLQ_TOTAL.fetch_add(1, Ordering::Relaxed);
163
164 crate::telemetry::operator_event::OperatorEvent::OutboxDlqActivated {
165 queue: queue.to_string(),
166 dlq: dlq_name.clone(),
167 reason: reason.to_string(),
168 }
169 .emit_global();
170
171 let store = self.inner.db.store();
172 if store.get_collection(&dlq_name).is_none() {
173 crate::runtime::impl_ddl::ensure_event_target_queue_pub(self, &dlq_name)?;
174 }
175 let dlq_config = load_queue_config(store.as_ref(), &dlq_name);
176 let id = self.enqueue_event_payload_raw(store.as_ref(), &dlq_name, &dlq_config, payload)?;
177 EVENTS_ENQUEUED_TOTAL.fetch_add(1, Ordering::Relaxed);
178 Ok(id)
179 }
180
181 fn enqueue_event_payload_raw(
183 &self,
184 store: &UnifiedStore,
185 queue: &str,
186 config: &QueueRuntimeConfig,
187 payload: Value,
188 ) -> RedDBResult<EntityId> {
189 let position = next_queue_position(store, queue, QueueSide::Right)?;
190 let mut entity = UnifiedEntity::new(
191 EntityId::new(0),
192 EntityKind::QueueMessage {
193 queue: queue.to_string(),
194 position,
195 },
196 EntityData::QueueMessage(QueueMessageData {
197 payload,
198 priority: None,
199 enqueued_at_ns: now_ns(),
200 attempts: 0,
201 max_attempts: config.max_attempts,
202 acked: false,
203 }),
204 );
205 if let Some(xid) = self.current_xid() {
206 entity.set_xmin(xid);
207 }
208 let id = store
209 .insert_auto(queue, entity)
210 .map_err(|err| RedDBError::Internal(err.to_string()))?;
211 if let Some(ttl_ms) = config.ttl_ms {
212 store
213 .set_metadata(queue, id, queue_message_ttl_metadata(ttl_ms))
214 .map_err(|err| RedDBError::Internal(err.to_string()))?;
215 }
216 self.invalidate_result_cache_for_table(queue);
217 Ok(id)
218 }
219
220 pub fn execute_create_queue(
221 &self,
222 raw_query: &str,
223 query: &CreateQueueQuery,
224 ) -> RedDBResult<RuntimeQueryResult> {
225 self.check_write(crate::runtime::write_gate::WriteKind::Ddl)?;
226 if query.dlq.as_deref() == Some(query.name.as_str()) {
227 return Err(RedDBError::Query(
228 "dead-letter queue must be different from the source queue".to_string(),
229 ));
230 }
231
232 let store = self.inner.db.store();
233 let exists = store.get_collection(&query.name).is_some();
234 if exists {
235 if query.if_not_exists {
236 return Ok(RuntimeQueryResult::ok_message(
237 raw_query.to_string(),
238 &format!("queue '{}' already exists", query.name),
239 "create",
240 ));
241 }
242 return Err(RedDBError::Query(format!(
243 "queue '{}' already exists",
244 query.name
245 )));
246 }
247
248 store
249 .create_collection(&query.name)
250 .map_err(|err| RedDBError::Internal(err.to_string()))?;
251 if let Some(ttl_ms) = query.ttl_ms {
252 self.inner
253 .db
254 .set_collection_default_ttl_ms(&query.name, ttl_ms);
255 }
256 self.inner
257 .db
258 .save_collection_contract(queue_collection_contract(
259 &query.name,
260 query.priority,
261 query.ttl_ms,
262 ))
263 .map_err(|err| RedDBError::Internal(err.to_string()))?;
264 save_queue_config(
265 store.as_ref(),
266 &query.name,
267 &QueueRuntimeConfig {
268 mode: query.mode,
269 priority: query.priority,
270 max_size: query.max_size,
271 ttl_ms: query.ttl_ms,
272 dlq: query.dlq.clone(),
273 max_attempts: query.max_attempts,
274 lock_deadline_ms: query.lock_deadline_ms,
275 in_flight_cap_per_group: query.in_flight_cap_per_group,
276 },
277 )?;
278
279 if let Some(dlq) = &query.dlq {
280 if store.get_collection(dlq).is_none() {
281 store
282 .create_collection(dlq)
283 .map_err(|err| RedDBError::Internal(err.to_string()))?;
284 self.inner
285 .db
286 .save_collection_contract(queue_collection_contract(dlq, false, None))
287 .map_err(|err| RedDBError::Internal(err.to_string()))?;
288 }
289 }
290
291 self.invalidate_result_cache();
292 self.inner
293 .db
294 .persist_metadata()
295 .map_err(|err| RedDBError::Internal(err.to_string()))?;
296 let mut type_tags = Vec::new();
301 if let Some(dlq) = &query.dlq {
302 type_tags.push(format!("dlq:{}", dlq));
303 }
304 self.schema_vocabulary_apply(
305 crate::runtime::schema_vocabulary::DdlEvent::CreateCollection {
306 collection: query.name.clone(),
307 columns: vec!["payload".to_string()],
308 type_tags,
309 description: None,
310 },
311 );
312
313 let mut msg = format!("queue '{}' created", query.name);
314 msg.push_str(&format!(" (mode={})", query.mode.as_str()));
315 if query.priority {
316 msg.push_str(" (priority)");
317 }
318 if let Some(max_size) = query.max_size {
319 msg.push_str(&format!(" (max_size={max_size})"));
320 }
321 if let Some(ttl_ms) = query.ttl_ms {
322 msg.push_str(&format!(" (ttl={ttl_ms}ms)"));
323 }
324 if let Some(dlq) = &query.dlq {
325 msg.push_str(&format!(
326 " (dlq={dlq}, max_attempts={})",
327 query.max_attempts
328 ));
329 }
330
331 Ok(RuntimeQueryResult::ok_message(
332 raw_query.to_string(),
333 &msg,
334 "create",
335 ))
336 }
337
338 pub fn execute_alter_queue(
339 &self,
340 raw_query: &str,
341 query: &AlterQueueQuery,
342 ) -> RedDBResult<RuntimeQueryResult> {
343 self.check_write(crate::runtime::write_gate::WriteKind::Ddl)?;
344 let store = self.inner.db.store();
345 ensure_queue_exists(store.as_ref(), &query.name)?;
346
347 let mut config = load_queue_config(store.as_ref(), &query.name);
348 let mut summary: Vec<String> = Vec::new();
349
350 if let Some(new_mode) = query.mode {
351 let pending =
352 load_pending_entries(store.as_ref(), &query.name, None, None).unwrap_or_default();
353 if !pending.is_empty() {
354 tracing::warn!(
355 queue = %query.name,
356 pending_count = pending.len(),
357 new_mode = %new_mode.as_str(),
358 "ALTER QUEUE SET MODE: {} in-flight messages will drain with old mode; \
359 new reads use {}",
360 pending.len(),
361 new_mode.as_str(),
362 );
363 }
364 config.mode = new_mode;
365 summary.push(format!("mode={}", new_mode.as_str()));
366 }
367 if let Some(max_attempts) = query.max_attempts {
368 config.max_attempts = max_attempts;
369 summary.push(format!("max_attempts={max_attempts}"));
370 }
371 if let Some(lock_deadline_ms) = query.lock_deadline_ms {
372 config.lock_deadline_ms = lock_deadline_ms;
373 summary.push(format!("lock_deadline_ms={lock_deadline_ms}"));
374 }
375 if let Some(in_flight_cap) = query.in_flight_cap_per_group {
376 config.in_flight_cap_per_group = in_flight_cap;
377 summary.push(format!("in_flight_cap_per_group={in_flight_cap}"));
378 }
379 if let Some(dlq) = &query.dlq {
380 if dlq == &query.name {
381 return Err(RedDBError::Query(
382 "dead-letter queue must be different from the source queue".to_string(),
383 ));
384 }
385 config.dlq = Some(dlq.clone());
386 summary.push(format!("dlq={dlq}"));
387 }
388
389 save_queue_config(store.as_ref(), &query.name, &config)?;
390
391 self.invalidate_result_cache();
392 self.inner
393 .db
394 .persist_metadata()
395 .map_err(|err| RedDBError::Internal(err.to_string()))?;
396
397 Ok(RuntimeQueryResult::ok_message(
398 raw_query.to_string(),
399 &format!("queue '{}' altered: {}", query.name, summary.join(", ")),
400 "alter",
401 ))
402 }
403
404 pub fn execute_drop_queue(
405 &self,
406 raw_query: &str,
407 query: &DropQueueQuery,
408 ) -> RedDBResult<RuntimeQueryResult> {
409 self.check_write(crate::runtime::write_gate::WriteKind::Ddl)?;
410 let store = self.inner.db.store();
411 if super::impl_ddl::is_system_schema_name(&query.name) {
412 return Err(RedDBError::Query("system schema is read-only".to_string()));
413 }
414 if store.get_collection(&query.name).is_none() {
415 if query.if_exists {
416 return Ok(RuntimeQueryResult::ok_message(
417 raw_query.to_string(),
418 &format!("queue '{}' does not exist", query.name),
419 "drop",
420 ));
421 }
422 return Err(RedDBError::NotFound(format!(
423 "queue '{}' not found",
424 query.name
425 )));
426 }
427 let actual = crate::runtime::ddl::polymorphic_resolver::resolve(
428 &query.name,
429 &self.inner.db.catalog_model_snapshot(),
430 )?;
431 crate::runtime::ddl::polymorphic_resolver::ensure_model_match(
432 crate::catalog::CollectionModel::Queue,
433 actual,
434 )?;
435
436 store
437 .drop_collection(&query.name)
438 .map_err(|err| RedDBError::Internal(err.to_string()))?;
439 self.inner.db.clear_collection_default_ttl_ms(&query.name);
440 self.inner
441 .db
442 .remove_collection_contract(&query.name)
443 .map_err(|err| RedDBError::Internal(err.to_string()))?;
444 remove_queue_metadata(store.as_ref(), &query.name);
445 self.invalidate_result_cache();
446 self.inner
447 .db
448 .persist_metadata()
449 .map_err(|err| RedDBError::Internal(err.to_string()))?;
450 self.schema_vocabulary_apply(
452 crate::runtime::schema_vocabulary::DdlEvent::DropCollection {
453 collection: query.name.clone(),
454 },
455 );
456
457 Ok(RuntimeQueryResult::ok_message(
458 raw_query.to_string(),
459 &format!("queue '{}' dropped", query.name),
460 "drop",
461 ))
462 }
463
464 pub fn execute_queue_command(
465 &self,
466 raw_query: &str,
467 cmd: &QueueCommand,
468 ) -> RedDBResult<RuntimeQueryResult> {
469 self.check_write(crate::runtime::write_gate::WriteKind::Dml)?;
470 match cmd {
471 QueueCommand::Push {
472 queue,
473 value,
474 side,
475 priority,
476 } => {
477 let store = self.inner.db.store();
478 ensure_queue_exists(store.as_ref(), queue)?;
479 let config = load_queue_config(store.as_ref(), queue);
480 if priority.is_some() && !config.priority {
481 return Err(RedDBError::Query(format!(
482 "queue '{}' is not a priority queue",
483 queue
484 )));
485 }
486 if let Some(max_size) = config.max_size {
487 let current_len =
488 load_queue_message_views_with_runtime(Some(self), store.as_ref(), queue)?
489 .len();
490 if current_len >= max_size {
491 return Err(RedDBError::Query(format!(
492 "queue '{}' is full (max_size={max_size})",
493 queue
494 )));
495 }
496 }
497
498 let position = next_queue_position(store.as_ref(), queue, *side)?;
499 let mut entity = UnifiedEntity::new(
500 EntityId::new(0),
501 EntityKind::QueueMessage {
502 queue: queue.clone(),
503 position,
504 },
505 EntityData::QueueMessage(QueueMessageData {
506 payload: value.clone(),
507 priority: if config.priority { *priority } else { None },
508 enqueued_at_ns: now_ns(),
509 attempts: 0,
510 max_attempts: config.max_attempts,
511 acked: false,
512 }),
513 );
514 if let Some(xid) = self.current_xid() {
517 entity.set_xmin(xid);
518 }
519 let id = store
520 .insert_auto(queue, entity)
521 .map_err(|err| RedDBError::Internal(err.to_string()))?;
522 if let Some(ttl_ms) = config.ttl_ms {
523 store
524 .set_metadata(queue, id, queue_message_ttl_metadata(ttl_ms))
525 .map_err(|err| RedDBError::Internal(err.to_string()))?;
526 }
527 self.invalidate_result_cache();
528
529 let mut result = UnifiedResult::with_columns(vec![
530 "message_id".into(),
531 "side".into(),
532 "queue".into(),
533 ]);
534 let mut record = UnifiedRecord::new();
535 record.set("message_id", Value::text(message_id_string(id)));
536 record.set(
537 "side",
538 Value::text(match side {
539 QueueSide::Left => "left".to_string(),
540 QueueSide::Right => "right".to_string(),
541 }),
542 );
543 record.set("queue", Value::text(queue.clone()));
544 result.push(record);
545
546 Ok(RuntimeQueryResult {
547 query: raw_query.to_string(),
548 mode: QueryMode::Sql,
549 statement: "queue_push",
550 engine: "runtime-queue",
551 result,
552 affected_rows: 1,
553 statement_type: "insert",
554 })
555 }
556 QueueCommand::Pop { queue, side, count } => {
557 let store = self.inner.db.store();
558 ensure_queue_exists(store.as_ref(), queue)?;
559 let popped = super::queue_delivery::pop_messages(
560 self,
561 store.as_ref(),
562 queue,
563 *side,
564 *count,
565 )?;
566
567 let mut result =
568 UnifiedResult::with_columns(vec!["message_id".into(), "payload".into()]);
569 for message in &popped {
570 let mut record = UnifiedRecord::new();
571 record.set(
572 "message_id",
573 Value::text(message_id_string(message.message_id)),
574 );
575 record.set("payload", message.payload.clone());
576 result.push(record);
577 }
578 let popped_count = popped.len() as u64;
579 if popped_count > 0 {
580 self.invalidate_result_cache();
581 }
582
583 Ok(RuntimeQueryResult {
584 query: raw_query.to_string(),
585 mode: QueryMode::Sql,
586 statement: "queue_pop",
587 engine: "runtime-queue",
588 result,
589 affected_rows: popped_count,
590 statement_type: "delete",
591 })
592 }
593 QueueCommand::Peek { queue, count } => {
594 let store = self.inner.db.store();
595 ensure_queue_exists(store.as_ref(), queue)?;
596 let messages =
597 super::queue_delivery::peek_messages(self, store.as_ref(), queue, *count)?;
598
599 let mut result =
600 UnifiedResult::with_columns(vec!["message_id".into(), "payload".into()]);
601 for message in messages {
602 let mut record = UnifiedRecord::new();
603 record.set(
604 "message_id",
605 Value::text(message_id_string(message.message_id)),
606 );
607 record.set("payload", message.payload);
608 result.push(record);
609 }
610
611 Ok(RuntimeQueryResult {
612 query: raw_query.to_string(),
613 mode: QueryMode::Sql,
614 statement: "queue_peek",
615 engine: "runtime-queue",
616 result,
617 affected_rows: 0,
618 statement_type: "select",
619 })
620 }
621 QueueCommand::Len { queue } => {
622 let store = self.inner.db.store();
623 ensure_queue_exists(store.as_ref(), queue)?;
624 let count =
625 load_queue_message_views_with_runtime(Some(self), store.as_ref(), queue)?.len()
626 as u64;
627 let mut result = UnifiedResult::with_columns(vec!["len".into()]);
628 let mut record = UnifiedRecord::new();
629 record.set("len", Value::UnsignedInteger(count));
630 result.push(record);
631
632 Ok(RuntimeQueryResult {
633 query: raw_query.to_string(),
634 mode: QueryMode::Sql,
635 statement: "queue_len",
636 engine: "runtime-queue",
637 result,
638 affected_rows: 0,
639 statement_type: "select",
640 })
641 }
642 QueueCommand::Purge { queue } => {
643 let store = self.inner.db.store();
644 ensure_queue_exists(store.as_ref(), queue)?;
645 let count = super::queue_delivery::purge_messages(self, store.as_ref(), queue)?;
646 if count > 0 {
647 self.invalidate_result_cache();
648 }
649
650 Ok(RuntimeQueryResult::ok_message(
651 raw_query.to_string(),
652 &format!("{count} messages purged from queue '{queue}'"),
653 "delete",
654 ))
655 }
656 QueueCommand::GroupCreate { queue, group } => {
657 let store = self.inner.db.store();
658 ensure_queue_exists(store.as_ref(), queue)?;
659 if queue_group_exists(store.as_ref(), queue, group)? {
660 return Ok(RuntimeQueryResult::ok_message(
661 raw_query.to_string(),
662 &format!(
663 "consumer group '{}' already exists on queue '{}'",
664 group, queue
665 ),
666 "create",
667 ));
668 }
669 save_queue_group(store.as_ref(), queue, group)?;
670 self.invalidate_result_cache();
671
672 Ok(RuntimeQueryResult::ok_message(
673 raw_query.to_string(),
674 &format!("consumer group '{}' created on queue '{}'", group, queue),
675 "create",
676 ))
677 }
678 QueueCommand::GroupRead {
679 queue,
680 group,
681 consumer,
682 count,
683 } => {
684 let store = self.inner.db.store();
685 ensure_queue_exists(store.as_ref(), queue)?;
686 let delivered = super::queue_delivery::read_messages(
687 self,
688 store.as_ref(),
689 queue,
690 group.as_deref(),
691 consumer,
692 *count,
693 )?;
694
695 let mut result = UnifiedResult::with_columns(vec![
696 "message_id".into(),
697 "payload".into(),
698 "consumer".into(),
699 "delivery_count".into(),
700 "attempts".into(),
701 ]);
702
703 for message in delivered {
704 let mut record = UnifiedRecord::new();
705 record.set(
706 "message_id",
707 Value::text(message_id_string(message.message_id)),
708 );
709 record.set("payload", message.payload);
710 record.set("consumer", Value::text(message.consumer));
711 record.set(
712 "delivery_count",
713 Value::UnsignedInteger(u64::from(message.delivery_count)),
714 );
715 record.set(
716 "attempts",
717 Value::UnsignedInteger(u64::from(message.delivery_count)),
718 );
719 result.push(record);
720 }
721 if !result.records.is_empty() {
722 self.invalidate_result_cache();
723 }
724
725 Ok(RuntimeQueryResult {
726 query: raw_query.to_string(),
727 mode: QueryMode::Sql,
728 statement: "queue_group_read",
729 engine: "runtime-queue",
730 result,
731 affected_rows: 0,
732 statement_type: "select",
733 })
734 }
735 QueueCommand::Pending { queue, group } => {
736 let store = self.inner.db.store();
737 ensure_queue_exists(store.as_ref(), queue)?;
738 require_queue_group(store.as_ref(), queue, group)?;
739 let mut pending = load_pending_entries(store.as_ref(), queue, Some(group), None)?;
740 pending.sort_by_key(|entry| entry.delivered_at_ns);
741 let current_time_ns = now_ns();
742
743 let mut result = UnifiedResult::with_columns(vec![
744 "message_id".into(),
745 "consumer".into(),
746 "delivered_at_ns".into(),
747 "delivery_count".into(),
748 "idle_ms".into(),
749 ]);
750 for entry in pending {
751 let mut record = UnifiedRecord::new();
752 record.set(
753 "message_id",
754 Value::text(message_id_string(entry.message_id)),
755 );
756 record.set("consumer", Value::text(entry.consumer));
757 record.set(
758 "delivered_at_ns",
759 Value::UnsignedInteger(entry.delivered_at_ns),
760 );
761 record.set(
762 "delivery_count",
763 Value::UnsignedInteger(u64::from(entry.delivery_count)),
764 );
765 record.set(
766 "idle_ms",
767 Value::UnsignedInteger(
768 current_time_ns.saturating_sub(entry.delivered_at_ns) / 1_000_000,
769 ),
770 );
771 result.push(record);
772 }
773
774 Ok(RuntimeQueryResult {
775 query: raw_query.to_string(),
776 mode: QueryMode::Sql,
777 statement: "queue_pending",
778 engine: "runtime-queue",
779 result,
780 affected_rows: 0,
781 statement_type: "select",
782 })
783 }
784 QueueCommand::Claim {
785 queue,
786 group,
787 consumer,
788 min_idle_ms,
789 } => {
790 let store = self.inner.db.store();
791 ensure_queue_exists(store.as_ref(), queue)?;
792 let delivered = super::queue_delivery::claim_messages(
793 self,
794 store.as_ref(),
795 queue,
796 group,
797 consumer,
798 *min_idle_ms,
799 )?;
800
801 let mut result = UnifiedResult::with_columns(vec![
802 "message_id".into(),
803 "payload".into(),
804 "consumer".into(),
805 "delivery_count".into(),
806 ]);
807
808 for message in delivered {
809 let mut record = UnifiedRecord::new();
810 record.set(
811 "message_id",
812 Value::text(message_id_string(message.message_id)),
813 );
814 record.set("payload", message.payload);
815 record.set("consumer", Value::text(message.consumer));
816 record.set(
817 "delivery_count",
818 Value::UnsignedInteger(u64::from(message.delivery_count)),
819 );
820 result.push(record);
821 }
822 if !result.records.is_empty() {
823 self.invalidate_result_cache();
824 }
825 let affected_rows = result.records.len() as u64;
826
827 Ok(RuntimeQueryResult {
828 query: raw_query.to_string(),
829 mode: QueryMode::Sql,
830 statement: "queue_claim",
831 engine: "runtime-queue",
832 result,
833 affected_rows,
834 statement_type: "update",
835 })
836 }
837 QueueCommand::Ack {
838 queue,
839 group,
840 message_id,
841 delivery_id,
842 } => {
843 let store = self.inner.db.store();
844 ensure_queue_exists(store.as_ref(), queue)?;
845 let (group_owned, message_entity) = resolve_ack_nack_handle(
846 store.as_ref(),
847 queue,
848 group,
849 message_id,
850 delivery_id.as_deref(),
851 )?;
852 let group_ref = group_owned.as_str();
853 require_queue_group(store.as_ref(), queue, group_ref)?;
854 let config = load_queue_config(store.as_ref(), queue);
855 super::queue_delivery::ack_message(
856 self,
857 store.as_ref(),
858 queue,
859 group_ref,
860 message_entity,
861 &config,
862 )?;
863 self.invalidate_result_cache();
864
865 Ok(RuntimeQueryResult::ok_message(
866 raw_query.to_string(),
867 "message acknowledged",
868 "update",
869 ))
870 }
871 QueueCommand::Nack {
872 queue,
873 group,
874 message_id,
875 delivery_id,
876 } => {
877 let store = self.inner.db.store();
878 ensure_queue_exists(store.as_ref(), queue)?;
879 let (group_owned, message_entity) = resolve_ack_nack_handle(
880 store.as_ref(),
881 queue,
882 group,
883 message_id,
884 delivery_id.as_deref(),
885 )?;
886 let group_ref = group_owned.as_str();
887 require_queue_group(store.as_ref(), queue, group_ref)?;
888 let config = load_queue_config(store.as_ref(), queue);
889 let message = match super::queue_delivery::nack_message(
890 self,
891 store.as_ref(),
892 queue,
893 group_ref,
894 message_entity,
895 &config,
896 )? {
897 super::queue_delivery::NackOutcome::Requeued => "message requeued".to_string(),
898 super::queue_delivery::NackOutcome::MovedToDlq(dlq) => {
899 format!("message moved to dead-letter queue '{}'", dlq)
900 }
901 super::queue_delivery::NackOutcome::Dropped => {
902 "message dropped after max attempts".to_string()
903 }
904 };
905 self.invalidate_result_cache();
906
907 Ok(RuntimeQueryResult::ok_message(
908 raw_query.to_string(),
909 &message,
910 "update",
911 ))
912 }
913 QueueCommand::Move {
914 source,
915 destination,
916 filter,
917 limit,
918 } => self.execute_queue_move(raw_query, source, destination, filter.as_ref(), *limit),
919 }
920 }
921
922 pub fn execute_queue_select(
923 &self,
924 raw_query: &str,
925 query: &QueueSelectQuery,
926 ) -> RedDBResult<RuntimeQueryResult> {
927 let store = self.inner.db.store();
928 ensure_queue_exists(store.as_ref(), &query.queue)?;
929 let config = load_queue_config(store.as_ref(), &query.queue);
930 let dlq = queue_is_dead_letter_target(store.as_ref(), &query.queue);
931 let columns = if query.columns.is_empty() {
932 queue_projection_default_columns()
933 } else {
934 query.columns.clone()
935 };
936
937 let mut messages =
938 load_queue_message_views_with_runtime(Some(self), store.as_ref(), &query.queue)?;
939 sort_queue_messages(&mut messages, &config, QueueSide::Left);
940
941 let mut result = UnifiedResult::with_columns(columns.clone());
942 for message in messages {
943 if query
944 .filter
945 .as_ref()
946 .is_some_and(|filter| !queue_message_matches_filter(&message, dlq, filter))
947 {
948 continue;
949 }
950 let record = queue_projection_record(&columns, &message, dlq)?;
951 result.push(record);
952 if query
953 .limit
954 .is_some_and(|limit| result.records.len() >= limit as usize)
955 {
956 break;
957 }
958 }
959
960 Ok(RuntimeQueryResult {
961 query: raw_query.to_string(),
962 mode: QueryMode::Sql,
963 statement: "queue_select",
964 engine: "runtime-queue",
965 result,
966 affected_rows: 0,
967 statement_type: "select",
968 })
969 }
970
971 fn execute_queue_move(
972 &self,
973 raw_query: &str,
974 source: &str,
975 destination: &str,
976 filter: Option<&Filter>,
977 limit: usize,
978 ) -> RedDBResult<RuntimeQueryResult> {
979 if source == destination {
980 return Err(RedDBError::Query(
981 "QUEUE MOVE source and destination must be different".to_string(),
982 ));
983 }
984 let store = self.inner.db.store();
985 ensure_queue_exists(store.as_ref(), source)?;
986 ensure_queue_exists(store.as_ref(), destination)?;
987 let source_config = load_queue_config(store.as_ref(), source);
988 let destination_config = load_queue_config(store.as_ref(), destination);
989 let source_dlq = queue_is_dead_letter_target(store.as_ref(), source);
990
991 let mut messages =
992 load_queue_message_views_with_runtime(Some(self), store.as_ref(), source)?;
993 sort_queue_messages(&mut messages, &source_config, QueueSide::Left);
994 let selected = messages
995 .into_iter()
996 .filter(|message| {
997 filter
998 .map(|f| queue_message_matches_filter(message, source_dlq, f))
999 .unwrap_or(true)
1000 })
1001 .take(limit)
1002 .collect::<Vec<_>>();
1003
1004 if let Some(max_size) = destination_config.max_size {
1005 let current_len =
1006 load_queue_message_views_with_runtime(Some(self), store.as_ref(), destination)?
1007 .len();
1008 if current_len + selected.len() > max_size {
1009 return Err(RedDBError::Query(format!(
1010 "queue '{}' is full (max_size={max_size})",
1011 destination
1012 )));
1013 }
1014 }
1015
1016 for message in &selected {
1017 let lock = queue_message_lock_handle(self, source, message.id);
1018 let Some(_guard) = lock.try_lock() else {
1019 return Err(RedDBError::Query(format!(
1020 "message '{}' is locked on queue '{}'",
1021 message.id.raw(),
1022 source
1023 )));
1024 };
1025 if queue_message_view_by_id(store.as_ref(), source, message.id)?.is_none() {
1026 return Err(RedDBError::Query(format!(
1027 "message '{}' is no longer available on queue '{}'",
1028 message.id.raw(),
1029 source
1030 )));
1031 }
1032 }
1033
1034 let mut inserted = Vec::new();
1035 for message in &selected {
1036 match insert_moved_queue_message(
1037 store.as_ref(),
1038 destination,
1039 &destination_config,
1040 message,
1041 ) {
1042 Ok(id) => inserted.push(id),
1043 Err(err) => {
1044 for id in inserted {
1045 let _ = store.delete(destination, id);
1046 }
1047 return Err(err);
1048 }
1049 }
1050 }
1051
1052 for message in &selected {
1053 super::queue_delivery::delete_message_with_state(
1054 Some(self),
1055 store.as_ref(),
1056 source,
1057 message.id,
1058 )?;
1059 }
1060 if !selected.is_empty() {
1061 self.invalidate_result_cache();
1062 }
1063
1064 let selected_count = selected.len() as u64;
1065 self.audit_log().record_event(
1066 AuditEvent::builder("queue/move")
1067 .source(AuditAuthSource::System)
1068 .outcome(Outcome::Success)
1069 .resource(format!("queue:{source}->{destination}"))
1070 .fields([
1071 AuditFieldEscaper::field("source", source),
1072 AuditFieldEscaper::field("destination", destination),
1073 AuditFieldEscaper::field("selected", selected_count),
1074 AuditFieldEscaper::field("committed", selected_count),
1075 ])
1076 .build(),
1077 );
1078
1079 let mut result = UnifiedResult::with_columns(vec![
1080 "source".into(),
1081 "destination".into(),
1082 "selected".into(),
1083 "committed".into(),
1084 ]);
1085 let mut record = UnifiedRecord::new();
1086 record.set("source", Value::text(source.to_string()));
1087 record.set("destination", Value::text(destination.to_string()));
1088 record.set("selected", Value::UnsignedInteger(selected_count));
1089 record.set("committed", Value::UnsignedInteger(selected_count));
1090 result.push(record);
1091
1092 Ok(RuntimeQueryResult {
1093 query: raw_query.to_string(),
1094 mode: QueryMode::Sql,
1095 statement: "queue_move",
1096 engine: "runtime-queue",
1097 result,
1098 affected_rows: selected_count,
1099 statement_type: "update",
1100 })
1101 }
1102}
1103
1104fn ensure_queue_exists(store: &UnifiedStore, queue: &str) -> RedDBResult<()> {
1105 if store.get_collection(queue).is_some() {
1106 Ok(())
1107 } else {
1108 Err(RedDBError::NotFound(format!("queue '{}' not found", queue)))
1109 }
1110}
1111
1112pub(super) fn load_queue_config(store: &UnifiedStore, queue: &str) -> QueueRuntimeConfig {
1113 let default = QueueRuntimeConfig {
1114 mode: QueueMode::Work,
1115 priority: false,
1116 max_size: None,
1117 ttl_ms: None,
1118 dlq: None,
1119 max_attempts: crate::storage::query::DEFAULT_QUEUE_MAX_ATTEMPTS,
1120 lock_deadline_ms: crate::storage::query::DEFAULT_QUEUE_LOCK_DEADLINE_MS,
1121 in_flight_cap_per_group: crate::storage::query::DEFAULT_QUEUE_IN_FLIGHT_CAP_PER_GROUP,
1122 };
1123
1124 let Some(manager) = store.get_collection(QUEUE_META_COLLECTION) else {
1125 return default;
1126 };
1127 manager
1128 .query_all(|entity| {
1129 entity.data.as_row().is_some_and(|row| {
1130 row_text(row, "kind").as_deref() == Some("queue_config")
1131 && row_text(row, "queue").as_deref() == Some(queue)
1132 })
1133 })
1134 .into_iter()
1135 .find_map(|entity| {
1136 let row = entity.data.as_row()?;
1137 Some(QueueRuntimeConfig {
1138 mode: row_text(row, "mode")
1139 .as_deref()
1140 .and_then(QueueMode::parse)
1141 .unwrap_or_default(),
1142 priority: row_bool(row, "priority").unwrap_or(false),
1143 max_size: row_u64(row, "max_size").map(|value| value as usize),
1144 ttl_ms: row_u64(row, "ttl_ms"),
1145 dlq: row_text(row, "dlq"),
1146 max_attempts: row_u64(row, "max_attempts")
1147 .map(|value| value as u32)
1148 .unwrap_or(crate::storage::query::DEFAULT_QUEUE_MAX_ATTEMPTS),
1149 lock_deadline_ms: row_u64(row, "lock_deadline_ms")
1150 .unwrap_or(crate::storage::query::DEFAULT_QUEUE_LOCK_DEADLINE_MS),
1151 in_flight_cap_per_group: row_u64(row, "in_flight_cap_per_group")
1152 .map(|value| value as u32)
1153 .unwrap_or(crate::storage::query::DEFAULT_QUEUE_IN_FLIGHT_CAP_PER_GROUP),
1154 })
1155 })
1156 .unwrap_or(default)
1157}
1158
1159pub(super) fn queue_mode_str(store: &UnifiedStore, queue: &str) -> &'static str {
1160 load_queue_config(store, queue).mode.as_str()
1161}
1162
1163fn save_queue_config(
1164 store: &UnifiedStore,
1165 queue: &str,
1166 config: &QueueRuntimeConfig,
1167) -> RedDBResult<()> {
1168 remove_meta_rows(store, |row| {
1169 row_text(row, "kind").as_deref() == Some("queue_config")
1170 && row_text(row, "queue").as_deref() == Some(queue)
1171 });
1172
1173 let mut fields = HashMap::new();
1174 fields.insert("kind".to_string(), Value::text("queue_config".to_string()));
1175 fields.insert("queue".to_string(), Value::text(queue.to_string()));
1176 fields.insert(
1177 "mode".to_string(),
1178 Value::text(config.mode.as_str().to_string()),
1179 );
1180 fields.insert("priority".to_string(), Value::Boolean(config.priority));
1181 fields.insert(
1182 "max_size".to_string(),
1183 config
1184 .max_size
1185 .map(|value| Value::UnsignedInteger(value as u64))
1186 .unwrap_or(Value::Null),
1187 );
1188 fields.insert(
1189 "ttl_ms".to_string(),
1190 config
1191 .ttl_ms
1192 .map(Value::UnsignedInteger)
1193 .unwrap_or(Value::Null),
1194 );
1195 fields.insert(
1196 "dlq".to_string(),
1197 config.dlq.clone().map(Value::text).unwrap_or(Value::Null),
1198 );
1199 fields.insert(
1200 "max_attempts".to_string(),
1201 Value::UnsignedInteger(u64::from(config.max_attempts)),
1202 );
1203 fields.insert(
1204 "lock_deadline_ms".to_string(),
1205 Value::UnsignedInteger(config.lock_deadline_ms),
1206 );
1207 fields.insert(
1208 "in_flight_cap_per_group".to_string(),
1209 Value::UnsignedInteger(u64::from(config.in_flight_cap_per_group)),
1210 );
1211 insert_meta_row(store, fields)
1212}
1213
1214fn remove_queue_metadata(store: &UnifiedStore, queue: &str) {
1215 remove_meta_rows(store, |row| {
1216 row_text(row, "queue").as_deref() == Some(queue)
1217 });
1218}
1219
1220fn queue_group_exists(store: &UnifiedStore, queue: &str, group: &str) -> RedDBResult<bool> {
1221 Ok(load_queue_groups(store, queue)?
1222 .into_iter()
1223 .any(|entry| entry.group == group))
1224}
1225
1226pub(super) fn require_queue_group(
1227 store: &UnifiedStore,
1228 queue: &str,
1229 group: &str,
1230) -> RedDBResult<()> {
1231 if queue_group_exists(store, queue, group)? {
1232 Ok(())
1233 } else {
1234 Err(RedDBError::NotFound(format!(
1235 "consumer group '{}' not found on queue '{}'",
1236 group, queue
1237 )))
1238 }
1239}
1240
1241pub(super) fn resolve_read_group(
1242 store: &UnifiedStore,
1243 queue: &str,
1244 group: Option<&str>,
1245 consumer: &str,
1246 config: &QueueRuntimeConfig,
1247) -> RedDBResult<String> {
1248 if let Some(group) = group {
1249 require_queue_group(store, queue, group)?;
1250 return Ok(group.to_string());
1251 }
1252
1253 match config.mode {
1254 QueueMode::Work => {
1255 if !queue_group_exists(store, queue, WORK_DEFAULT_GROUP)? {
1256 save_queue_group(store, queue, WORK_DEFAULT_GROUP)?;
1257 }
1258 Ok(WORK_DEFAULT_GROUP.to_string())
1259 }
1260 QueueMode::Fanout => {
1261 let fanout_group = format!("{FANOUT_GROUP_PREFIX}{consumer}");
1262 if !queue_group_exists(store, queue, &fanout_group)? {
1263 save_queue_group(store, queue, &fanout_group)?;
1264 }
1265 Ok(fanout_group)
1266 }
1267 }
1268}
1269
1270fn load_queue_groups(store: &UnifiedStore, queue: &str) -> RedDBResult<Vec<QueueGroupEntry>> {
1271 let Some(manager) = store.get_collection(QUEUE_META_COLLECTION) else {
1272 return Ok(Vec::new());
1273 };
1274 Ok(manager
1275 .query_all(|entity| {
1276 entity.data.as_row().is_some_and(|row| {
1277 row_text(row, "kind").as_deref() == Some("queue_group")
1278 && row_text(row, "queue").as_deref() == Some(queue)
1279 })
1280 })
1281 .into_iter()
1282 .filter_map(|entity| {
1283 let row = entity.data.as_row()?;
1284 Some(QueueGroupEntry {
1285 entity_id: entity.id,
1286 group: row_text(row, "group")?,
1287 })
1288 })
1289 .collect())
1290}
1291
1292fn save_queue_group(store: &UnifiedStore, queue: &str, group: &str) -> RedDBResult<()> {
1293 let mut fields = HashMap::new();
1294 fields.insert("kind".to_string(), Value::text("queue_group".to_string()));
1295 fields.insert("queue".to_string(), Value::text(queue.to_string()));
1296 fields.insert("group".to_string(), Value::text(group.to_string()));
1297 fields.insert(
1298 "created_at_ns".to_string(),
1299 Value::UnsignedInteger(now_ns()),
1300 );
1301 insert_meta_row(store, fields)
1302}
1303
1304pub(super) fn load_pending_entries(
1305 store: &UnifiedStore,
1306 queue: &str,
1307 group: Option<&str>,
1308 message_id: Option<EntityId>,
1309) -> RedDBResult<Vec<QueuePendingEntry>> {
1310 let Some(manager) = store.get_collection(QUEUE_META_COLLECTION) else {
1311 return Ok(Vec::new());
1312 };
1313 Ok(manager
1314 .query_all(|entity| {
1315 entity.data.as_row().is_some_and(|row| {
1316 row_text(row, "kind").as_deref() == Some("queue_pending")
1317 && row_text(row, "queue").as_deref() == Some(queue)
1318 && group
1319 .map(|group_name| row_text(row, "group").as_deref() == Some(group_name))
1320 .unwrap_or(true)
1321 && message_id
1322 .map(|candidate| row_u64(row, "message_id") == Some(candidate.raw()))
1323 .unwrap_or(true)
1324 })
1325 })
1326 .into_iter()
1327 .filter_map(|entity| {
1328 let row = entity.data.as_row()?;
1329 Some(QueuePendingEntry {
1330 entity_id: entity.id,
1331 group: row_text(row, "group")?,
1332 message_id: EntityId::new(row_u64(row, "message_id")?),
1333 consumer: row_text(row, "consumer")?,
1334 delivered_at_ns: row_u64(row, "delivered_at_ns")?,
1335 delivery_count: row_u64(row, "delivery_count")
1336 .map(|value| value as u32)
1337 .unwrap_or(1),
1338 })
1339 })
1340 .collect())
1341}
1342
1343pub(super) fn save_queue_pending(
1344 store: &UnifiedStore,
1345 queue: &str,
1346 group: &str,
1347 message_id: EntityId,
1348 consumer: &str,
1349 delivered_at_ns: u64,
1350 delivery_count: u32,
1351) -> RedDBResult<()> {
1352 remove_meta_rows(store, |row| {
1353 row_text(row, "kind").as_deref() == Some("queue_pending")
1354 && row_text(row, "queue").as_deref() == Some(queue)
1355 && row_text(row, "group").as_deref() == Some(group)
1356 && row_u64(row, "message_id") == Some(message_id.raw())
1357 });
1358
1359 let mut fields = HashMap::new();
1360 fields.insert("kind".to_string(), Value::text("queue_pending".to_string()));
1361 fields.insert("queue".to_string(), Value::text(queue.to_string()));
1362 fields.insert("group".to_string(), Value::text(group.to_string()));
1363 fields.insert(
1364 "message_id".to_string(),
1365 Value::UnsignedInteger(message_id.raw()),
1366 );
1367 fields.insert("consumer".to_string(), Value::text(consumer.to_string()));
1368 fields.insert(
1369 "delivered_at_ns".to_string(),
1370 Value::UnsignedInteger(delivered_at_ns),
1371 );
1372 fields.insert(
1373 "delivery_count".to_string(),
1374 Value::UnsignedInteger(u64::from(delivery_count)),
1375 );
1376 insert_meta_row(store, fields)
1377}
1378
1379pub(super) fn require_pending_entry(
1380 store: &UnifiedStore,
1381 queue: &str,
1382 group: &str,
1383 message_id: EntityId,
1384) -> RedDBResult<QueuePendingEntry> {
1385 load_pending_entries(store, queue, Some(group), Some(message_id))?
1386 .into_iter()
1387 .next()
1388 .ok_or_else(|| {
1389 RedDBError::NotFound(format!(
1390 "message '{}' is not pending in group '{}' on queue '{}'",
1391 message_id.raw(),
1392 group,
1393 queue
1394 ))
1395 })
1396}
1397
1398pub(super) fn load_ack_entries(
1399 store: &UnifiedStore,
1400 queue: &str,
1401 group: Option<&str>,
1402 message_id: Option<EntityId>,
1403) -> RedDBResult<Vec<QueueAckEntry>> {
1404 let Some(manager) = store.get_collection(QUEUE_META_COLLECTION) else {
1405 return Ok(Vec::new());
1406 };
1407 Ok(manager
1408 .query_all(|entity| {
1409 entity.data.as_row().is_some_and(|row| {
1410 row_text(row, "kind").as_deref() == Some("queue_ack")
1411 && row_text(row, "queue").as_deref() == Some(queue)
1412 && group
1413 .map(|group_name| row_text(row, "group").as_deref() == Some(group_name))
1414 .unwrap_or(true)
1415 && message_id
1416 .map(|candidate| row_u64(row, "message_id") == Some(candidate.raw()))
1417 .unwrap_or(true)
1418 })
1419 })
1420 .into_iter()
1421 .filter_map(|entity| {
1422 let row = entity.data.as_row()?;
1423 Some(QueueAckEntry {
1424 entity_id: entity.id,
1425 group: row_text(row, "group")?,
1426 message_id: EntityId::new(row_u64(row, "message_id")?),
1427 })
1428 })
1429 .collect())
1430}
1431
1432pub(super) fn save_queue_ack(
1433 store: &UnifiedStore,
1434 queue: &str,
1435 group: &str,
1436 message_id: EntityId,
1437) -> RedDBResult<()> {
1438 let existing = load_ack_entries(store, queue, Some(group), Some(message_id))?;
1439 if !existing.is_empty() {
1440 return Ok(());
1441 }
1442
1443 let mut fields = HashMap::new();
1444 fields.insert("kind".to_string(), Value::text("queue_ack".to_string()));
1445 fields.insert("queue".to_string(), Value::text(queue.to_string()));
1446 fields.insert("group".to_string(), Value::text(group.to_string()));
1447 fields.insert(
1448 "message_id".to_string(),
1449 Value::UnsignedInteger(message_id.raw()),
1450 );
1451 fields.insert("acked_at_ns".to_string(), Value::UnsignedInteger(now_ns()));
1452 insert_meta_row(store, fields)
1453}
1454
1455pub(super) fn queue_message_completed_for_all_groups(
1456 store: &UnifiedStore,
1457 queue: &str,
1458 message_id: EntityId,
1459) -> RedDBResult<bool> {
1460 let groups = load_queue_groups(store, queue)?;
1461 let pending = load_pending_entries(store, queue, None, Some(message_id))?;
1462 if !pending.is_empty() {
1463 return Ok(false);
1464 }
1465 if groups.is_empty() {
1466 return Ok(true);
1467 }
1468
1469 let acked_groups = load_ack_entries(store, queue, None, Some(message_id))?
1470 .into_iter()
1471 .map(|entry| entry.group)
1472 .collect::<HashSet<_>>();
1473 Ok(groups
1474 .into_iter()
1475 .all(|group| acked_groups.contains(&group.group)))
1476}
1477
1478fn load_queue_message_views(
1479 store: &UnifiedStore,
1480 queue: &str,
1481) -> RedDBResult<Vec<QueueMessageView>> {
1482 load_queue_message_views_with_runtime(None, store, queue)
1483}
1484
1485pub(super) fn load_queue_message_views_with_runtime(
1492 runtime: Option<&RedDBRuntime>,
1493 store: &UnifiedStore,
1494 queue: &str,
1495) -> RedDBResult<Vec<QueueMessageView>> {
1496 let manager = store
1497 .get_collection(queue)
1498 .ok_or_else(|| RedDBError::NotFound(format!("queue '{}' not found", queue)))?;
1499 let snap_ctx = crate::runtime::impl_core::capture_current_snapshot();
1503 let rls_filter = runtime.and_then(|rt| {
1504 crate::runtime::impl_core::rls_policy_filter_for_kind(
1505 rt,
1506 queue,
1507 crate::storage::query::ast::PolicyAction::Select,
1508 crate::storage::query::ast::PolicyTargetKind::Messages,
1509 )
1510 });
1511 let rls_enabled_but_denied = runtime.map(|rt| rt.is_rls_enabled(queue)).unwrap_or(false)
1512 && rls_filter.is_none()
1513 && runtime.is_some();
1514 if rls_enabled_but_denied {
1515 return Ok(Vec::new());
1517 }
1518 let filter_arc = rls_filter.map(std::sync::Arc::new);
1519 let rt_arc = runtime;
1520 Ok(manager
1521 .query_all(move |entity| {
1522 if !matches!(entity.kind, EntityKind::QueueMessage { .. }) {
1523 return false;
1524 }
1525 if !crate::runtime::impl_core::entity_visible_with_context(snap_ctx.as_ref(), entity) {
1526 return false;
1527 }
1528 if let (Some(filter), Some(rt)) = (filter_arc.as_ref(), rt_arc) {
1529 return crate::runtime::query_exec::evaluate_entity_filter_with_db(
1530 Some(&rt.inner.db),
1531 entity,
1532 filter,
1533 queue,
1534 queue,
1535 );
1536 }
1537 true
1538 })
1539 .into_iter()
1540 .filter_map(queue_message_view_from_entity)
1541 .collect())
1542}
1543
1544fn queue_message_view_from_entity(entity: UnifiedEntity) -> Option<QueueMessageView> {
1545 let (position, _) = match &entity.kind {
1546 EntityKind::QueueMessage { position, queue } => (*position, queue),
1547 _ => return None,
1548 };
1549 let data = match entity.data {
1550 EntityData::QueueMessage(data) => data,
1551 _ => return None,
1552 };
1553 Some(QueueMessageView {
1554 id: entity.id,
1555 position,
1556 priority: data.priority.unwrap_or(0),
1557 payload: data.payload,
1558 attempts: data.attempts,
1559 max_attempts: data.max_attempts,
1560 enqueued_at_ns: data.enqueued_at_ns,
1561 })
1562}
1563
1564fn insert_moved_queue_message(
1565 store: &UnifiedStore,
1566 queue: &str,
1567 config: &QueueRuntimeConfig,
1568 message: &QueueMessageView,
1569) -> RedDBResult<EntityId> {
1570 let position = next_queue_position(store, queue, QueueSide::Right)?;
1571 let entity = UnifiedEntity::new(
1572 EntityId::new(0),
1573 EntityKind::QueueMessage {
1574 queue: queue.to_string(),
1575 position,
1576 },
1577 EntityData::QueueMessage(QueueMessageData {
1578 payload: message.payload.clone(),
1579 priority: if config.priority {
1580 Some(message.priority)
1581 } else {
1582 None
1583 },
1584 enqueued_at_ns: message.enqueued_at_ns,
1585 attempts: message.attempts,
1586 max_attempts: message.max_attempts,
1587 acked: false,
1588 }),
1589 );
1590 let id = store
1591 .insert_auto(queue, entity)
1592 .map_err(|err| RedDBError::Internal(err.to_string()))?;
1593 if let Some(ttl_ms) = config.ttl_ms {
1594 store
1595 .set_metadata(queue, id, queue_message_ttl_metadata(ttl_ms))
1596 .map_err(|err| RedDBError::Internal(err.to_string()))?;
1597 }
1598 Ok(id)
1599}
1600
/// Returns the column names projected for a queue row when the query does
/// not name any, in their output order.
fn queue_projection_default_columns() -> Vec<String> {
    const DEFAULT_COLUMNS: [&str; 9] = [
        "id",
        "payload",
        "priority",
        "attempts",
        "last_error",
        "enqueued_at",
        "available_at",
        "dlq",
        "tenant",
    ];

    let mut columns = Vec::with_capacity(DEFAULT_COLUMNS.len());
    for name in DEFAULT_COLUMNS.iter() {
        columns.push(String::from(*name));
    }
    columns
}
1617
1618fn queue_projection_record(
1619 columns: &[String],
1620 message: &QueueMessageView,
1621 dlq: bool,
1622) -> RedDBResult<UnifiedRecord> {
1623 let mut record = UnifiedRecord::new();
1624 for column in columns {
1625 let value = queue_projection_value(message, dlq, column).ok_or_else(|| {
1626 RedDBError::Query(format!("unknown queue projection column '{}'", column))
1627 })?;
1628 record.set(column, value);
1629 }
1630 Ok(record)
1631}
1632
1633fn queue_projection_value(message: &QueueMessageView, dlq: bool, column: &str) -> Option<Value> {
1634 match column {
1635 "id" => Some(Value::text(message_id_string(message.id))),
1636 "payload" => Some(message.payload.clone()),
1637 "priority" => Some(Value::Integer(i64::from(message.priority))),
1638 "attempts" => Some(Value::UnsignedInteger(u64::from(message.attempts))),
1639 "last_error" => Some(Value::Null),
1640 "enqueued_at" => Some(Value::UnsignedInteger(message.enqueued_at_ns)),
1641 "available_at" => Some(Value::UnsignedInteger(message.enqueued_at_ns)),
1642 "dlq" => Some(Value::Boolean(dlq)),
1643 "tenant" => queue_message_tenant(&message.payload).or(Some(Value::Null)),
1644 _ => None,
1645 }
1646}
1647
1648fn queue_message_tenant(payload: &Value) -> Option<Value> {
1649 let Value::Json(bytes) = payload else {
1650 return None;
1651 };
1652 let json: crate::json::Value = crate::json::from_slice(bytes).ok()?;
1653 json.get("tenant")
1654 .and_then(crate::json::Value::as_str)
1655 .map(|tenant| Value::text(tenant.to_string()))
1656}
1657
1658fn queue_is_dead_letter_target(store: &UnifiedStore, queue: &str) -> bool {
1659 let Some(manager) = store.get_collection(QUEUE_META_COLLECTION) else {
1660 return false;
1661 };
1662 !manager
1663 .query_all(|entity| {
1664 entity.data.as_row().is_some_and(|row| {
1665 row_text(row, "kind").as_deref() == Some("queue_config")
1666 && row_text(row, "dlq").as_deref() == Some(queue)
1667 })
1668 })
1669 .is_empty()
1670}
1671
1672fn queue_message_matches_filter(message: &QueueMessageView, dlq: bool, filter: &Filter) -> bool {
1673 match filter {
1674 Filter::Compare { field, op, value } => queue_filter_field_value(message, dlq, field)
1675 .is_some_and(|candidate| queue_compare_values(&candidate, value, *op)),
1676 Filter::CompareFields { left, op, right } => {
1677 match (
1678 queue_filter_field_value(message, dlq, left),
1679 queue_filter_field_value(message, dlq, right),
1680 ) {
1681 (Some(left), Some(right)) => queue_compare_values(&left, &right, *op),
1682 _ => false,
1683 }
1684 }
1685 Filter::And(left, right) => {
1686 queue_message_matches_filter(message, dlq, left)
1687 && queue_message_matches_filter(message, dlq, right)
1688 }
1689 Filter::Or(left, right) => {
1690 queue_message_matches_filter(message, dlq, left)
1691 || queue_message_matches_filter(message, dlq, right)
1692 }
1693 Filter::Not(inner) => !queue_message_matches_filter(message, dlq, inner),
1694 Filter::IsNull(field) => queue_filter_field_value(message, dlq, field)
1695 .is_none_or(|value| matches!(value, Value::Null)),
1696 Filter::IsNotNull(field) => queue_filter_field_value(message, dlq, field)
1697 .is_some_and(|value| !matches!(value, Value::Null)),
1698 Filter::In { field, values } => {
1699 queue_filter_field_value(message, dlq, field).is_some_and(|candidate| {
1700 values
1701 .iter()
1702 .any(|value| queue_values_equal(&candidate, value))
1703 })
1704 }
1705 Filter::Between { field, low, high } => queue_filter_field_value(message, dlq, field)
1706 .is_some_and(|candidate| {
1707 queue_compare_values(&candidate, low, CompareOp::Ge)
1708 && queue_compare_values(&candidate, high, CompareOp::Le)
1709 }),
1710 Filter::Like { field, pattern } => queue_filter_text(message, dlq, field)
1711 .is_some_and(|value| queue_like_matches(&value, pattern)),
1712 Filter::StartsWith { field, prefix } => {
1713 queue_filter_text(message, dlq, field).is_some_and(|value| value.starts_with(prefix))
1714 }
1715 Filter::EndsWith { field, suffix } => {
1716 queue_filter_text(message, dlq, field).is_some_and(|value| value.ends_with(suffix))
1717 }
1718 Filter::Contains { field, substring } => {
1719 queue_filter_text(message, dlq, field).is_some_and(|value| value.contains(substring))
1720 }
1721 Filter::CompareExpr { .. } => false,
1722 }
1723}
1724
1725fn queue_filter_field_value(
1726 message: &QueueMessageView,
1727 dlq: bool,
1728 field: &FieldRef,
1729) -> Option<Value> {
1730 match field {
1731 FieldRef::TableColumn { table, column } if table.is_empty() => {
1732 queue_projection_value(message, dlq, column)
1733 .or_else(|| queue_payload_field_value(&message.payload, column))
1734 }
1735 FieldRef::TableColumn { column, .. } => queue_projection_value(message, dlq, column)
1736 .or_else(|| queue_payload_field_value(&message.payload, column)),
1737 _ => None,
1738 }
1739}
1740
1741fn queue_payload_field_value(payload: &Value, field: &str) -> Option<Value> {
1742 let Value::Json(bytes) = payload else {
1743 return None;
1744 };
1745 let json: crate::json::Value = crate::json::from_slice(bytes).ok()?;
1746 let value = json.get(field)?;
1747 json_value_to_schema_value(value)
1748}
1749
1750fn json_value_to_schema_value(value: &crate::json::Value) -> Option<Value> {
1751 if matches!(value, crate::json::Value::Null) {
1752 Some(Value::Null)
1753 } else if let Some(value) = value.as_bool() {
1754 Some(Value::Boolean(value))
1755 } else if let Some(value) = value.as_i64() {
1756 Some(Value::Integer(value))
1757 } else if let Some(value) = value.as_u64() {
1758 Some(Value::UnsignedInteger(value))
1759 } else if let Some(value) = value.as_f64() {
1760 Some(Value::Float(value))
1761 } else if let Some(value) = value.as_str() {
1762 Some(Value::text(value.to_string()))
1763 } else {
1764 Some(Value::Json(value.to_string_compact().into_bytes()))
1765 }
1766}
1767
1768fn queue_filter_text(message: &QueueMessageView, dlq: bool, field: &FieldRef) -> Option<String> {
1769 queue_filter_field_value(message, dlq, field).and_then(|value| match value {
1770 Value::Text(value) => Some(value.to_string()),
1771 Value::NodeRef(value) | Value::EdgeRef(value) | Value::TableRef(value) => Some(value),
1772 Value::Integer(value) => Some(value.to_string()),
1773 Value::UnsignedInteger(value) => Some(value.to_string()),
1774 Value::Float(value) => Some(value.to_string()),
1775 Value::Boolean(value) => Some(value.to_string()),
1776 _ => None,
1777 })
1778}
1779
1780fn queue_compare_values(left: &Value, right: &Value, op: CompareOp) -> bool {
1781 match op {
1782 CompareOp::Eq => queue_values_equal(left, right),
1783 CompareOp::Ne => !queue_values_equal(left, right),
1784 CompareOp::Lt => queue_partial_cmp(left, right).is_some_and(|ord| ord.is_lt()),
1785 CompareOp::Le => queue_partial_cmp(left, right).is_some_and(|ord| !ord.is_gt()),
1786 CompareOp::Gt => queue_partial_cmp(left, right).is_some_and(|ord| ord.is_gt()),
1787 CompareOp::Ge => queue_partial_cmp(left, right).is_some_and(|ord| !ord.is_lt()),
1788 }
1789}
1790
1791fn queue_values_equal(left: &Value, right: &Value) -> bool {
1792 if let (Some(left), Some(right)) = (queue_value_number(left), queue_value_number(right)) {
1793 return (left - right).abs() < f64::EPSILON;
1794 }
1795 match (left, right) {
1796 (Value::Text(left), Value::Text(right)) => left == right,
1797 (Value::Boolean(left), Value::Boolean(right)) => left == right,
1798 _ => left == right,
1799 }
1800}
1801
1802fn queue_partial_cmp(left: &Value, right: &Value) -> Option<std::cmp::Ordering> {
1803 if let (Some(left), Some(right)) = (queue_value_number(left), queue_value_number(right)) {
1804 return left.partial_cmp(&right);
1805 }
1806 match (left, right) {
1807 (Value::Text(left), Value::Text(right)) => Some(left.cmp(right)),
1808 _ => None,
1809 }
1810}
1811
1812fn queue_value_number(value: &Value) -> Option<f64> {
1813 match value {
1814 Value::Integer(value) => Some(*value as f64),
1815 Value::UnsignedInteger(value) => Some(*value as f64),
1816 Value::Float(value) => Some(*value),
1817 Value::Text(value) => value.parse().ok(),
1818 _ => None,
1819 }
1820}
1821
/// Matches `value` against a SQL `LIKE`-style `pattern` in which `%` stands
/// for any run of characters (including none).
///
/// `%` is honoured anywhere in the pattern, so `order.%.eu` matches
/// `order.created.eu`. A pattern without `%` must equal `value` exactly.
/// Matching is case-sensitive, and `_` is treated as a literal character.
fn queue_like_matches(value: &str, pattern: &str) -> bool {
    let mut literals = pattern.split('%');

    // Text before the first `%` is anchored to the start of `value`.
    let head = literals.next().unwrap_or("");
    let Some(mut remaining) = value.strip_prefix(head) else {
        return false;
    };

    let mut middle: Vec<&str> = literals.collect();
    // Text after the last `%` is anchored to the end of `value`.
    let Some(tail) = middle.pop() else {
        // No `%` at all: the whole value must equal the pattern.
        return remaining.is_empty();
    };

    // Each literal between two `%` must occur, in order, without overlapping
    // the previous one; taking the leftmost occurrence leaves the most room
    // for what follows.
    for literal in middle {
        match remaining.find(literal) {
            Some(index) => remaining = &remaining[index + literal.len()..],
            None => return false,
        }
    }
    remaining.ends_with(tail)
}
1836
1837pub(super) fn queue_message_view_by_id(
1838 store: &UnifiedStore,
1839 queue: &str,
1840 message_id: EntityId,
1841) -> RedDBResult<Option<QueueMessageView>> {
1842 let manager = queue_manager(store, queue)?;
1843 Ok(manager
1844 .get(message_id)
1845 .and_then(queue_message_view_from_entity))
1846}
1847
1848pub(super) fn sort_queue_messages(
1849 messages: &mut [QueueMessageView],
1850 config: &QueueRuntimeConfig,
1851 side: QueueSide,
1852) {
1853 messages.sort_by(|left, right| {
1854 if config.priority {
1855 right
1856 .priority
1857 .cmp(&left.priority)
1858 .then_with(|| match side {
1859 QueueSide::Left => left.position.cmp(&right.position),
1860 QueueSide::Right => right.position.cmp(&left.position),
1861 })
1862 .then_with(|| left.id.raw().cmp(&right.id.raw()))
1863 } else {
1864 match side {
1865 QueueSide::Left => left.position.cmp(&right.position),
1866 QueueSide::Right => right.position.cmp(&left.position),
1867 }
1868 .then_with(|| left.id.raw().cmp(&right.id.raw()))
1869 }
1870 });
1871}
1872
1873pub(super) fn next_queue_position(
1874 store: &UnifiedStore,
1875 queue: &str,
1876 side: QueueSide,
1877) -> RedDBResult<u64> {
1878 let messages = load_queue_message_views(store, queue)?;
1879 if messages.is_empty() {
1880 return Ok(QUEUE_POSITION_CENTER);
1881 }
1882 match side {
1883 QueueSide::Left => Ok(messages
1884 .iter()
1885 .map(|message| message.position)
1886 .min()
1887 .unwrap_or(QUEUE_POSITION_CENTER)
1888 .saturating_sub(1)),
1889 QueueSide::Right => Ok(messages
1890 .iter()
1891 .map(|message| message.position)
1892 .max()
1893 .unwrap_or(QUEUE_POSITION_CENTER)
1894 .saturating_add(1)),
1895 }
1896}
1897
1898pub(super) fn increment_queue_attempts(
1899 store: &UnifiedStore,
1900 queue: &str,
1901 message_id: EntityId,
1902) -> RedDBResult<u32> {
1903 let manager = queue_manager(store, queue)?;
1904 let mut entity = manager
1905 .get(message_id)
1906 .ok_or_else(|| RedDBError::NotFound(format!("message '{}' not found", message_id.raw())))?;
1907 match &mut entity.data {
1908 EntityData::QueueMessage(message) => {
1909 message.attempts = message.attempts.saturating_add(1);
1910 let attempts = message.attempts;
1911 manager
1912 .update(entity)
1913 .map_err(|err| RedDBError::Internal(err.to_string()))?;
1914 Ok(attempts)
1915 }
1916 _ => Err(RedDBError::Query(format!(
1917 "entity '{}' is not a queue message",
1918 message_id.raw()
1919 ))),
1920 }
1921}
1922
1923pub(super) fn queue_message_attempts(
1924 store: &UnifiedStore,
1925 queue: &str,
1926 message_id: EntityId,
1927) -> RedDBResult<u32> {
1928 Ok(queue_message_data(store, queue, message_id)?.attempts)
1929}
1930
1931pub(super) fn queue_message_max_attempts(
1932 store: &UnifiedStore,
1933 queue: &str,
1934 message_id: EntityId,
1935) -> RedDBResult<u32> {
1936 Ok(queue_message_data(store, queue, message_id)?.max_attempts)
1937}
1938
1939pub(super) fn queue_message_payload(
1940 store: &UnifiedStore,
1941 queue: &str,
1942 message_id: EntityId,
1943) -> RedDBResult<Value> {
1944 Ok(queue_message_data(store, queue, message_id)?.payload)
1945}
1946
1947pub(super) fn queue_message_pending_any(
1948 store: &UnifiedStore,
1949 queue: &str,
1950 message_id: EntityId,
1951) -> RedDBResult<bool> {
1952 Ok(!load_pending_entries(store, queue, None, Some(message_id))?.is_empty())
1953}
1954
1955pub(super) fn queue_message_pending_for_group(
1956 store: &UnifiedStore,
1957 queue: &str,
1958 group: &str,
1959 message_id: EntityId,
1960) -> RedDBResult<bool> {
1961 Ok(!load_pending_entries(store, queue, Some(group), Some(message_id))?.is_empty())
1962}
1963
1964pub(super) fn queue_message_acked_for_group(
1965 store: &UnifiedStore,
1966 queue: &str,
1967 group: &str,
1968 message_id: EntityId,
1969) -> RedDBResult<bool> {
1970 Ok(!load_ack_entries(store, queue, Some(group), Some(message_id))?.is_empty())
1971}
1972
1973fn queue_manager(
1974 store: &UnifiedStore,
1975 queue: &str,
1976) -> RedDBResult<Arc<crate::storage::unified::SegmentManager>> {
1977 store
1978 .get_collection(queue)
1979 .ok_or_else(|| RedDBError::NotFound(format!("queue '{}' not found", queue)))
1980}
1981
1982pub(super) fn queue_message_data(
1983 store: &UnifiedStore,
1984 queue: &str,
1985 message_id: EntityId,
1986) -> RedDBResult<QueueMessageData> {
1987 let manager = queue_manager(store, queue)?;
1988 let entity = manager
1989 .get(message_id)
1990 .ok_or_else(|| RedDBError::NotFound(format!("message '{}' not found", message_id.raw())))?;
1991 match entity.data {
1992 EntityData::QueueMessage(message) => Ok(message),
1993 _ => Err(RedDBError::Query(format!(
1994 "entity '{}' is not a queue message",
1995 message_id.raw()
1996 ))),
1997 }
1998}
1999
2000fn insert_meta_row(store: &UnifiedStore, fields: HashMap<String, Value>) -> RedDBResult<()> {
2001 let _ = store.get_or_create_collection(QUEUE_META_COLLECTION);
2002 store
2003 .insert_auto(
2004 QUEUE_META_COLLECTION,
2005 UnifiedEntity::new(
2006 EntityId::new(0),
2007 EntityKind::TableRow {
2008 table: Arc::from(QUEUE_META_COLLECTION),
2009 row_id: 0,
2010 },
2011 EntityData::Row(RowData {
2012 columns: Vec::new(),
2013 named: Some(fields),
2014 schema: None,
2015 }),
2016 ),
2017 )
2018 .map_err(|err| RedDBError::Internal(err.to_string()))?;
2019 Ok(())
2020}
2021
2022pub(super) fn remove_meta_rows(store: &UnifiedStore, predicate: impl Fn(&RowData) -> bool + Sync) {
2023 let Some(manager) = store.get_collection(QUEUE_META_COLLECTION) else {
2024 return;
2025 };
2026 let rows = manager.query_all(|entity| entity.data.as_row().is_some_and(&predicate));
2027 for row in rows {
2028 let _ = store.delete(QUEUE_META_COLLECTION, row.id);
2029 }
2030}
2031
2032pub(super) fn delete_meta_entity(store: &UnifiedStore, entity_id: EntityId) {
2033 let _ = store.delete(QUEUE_META_COLLECTION, entity_id);
2034}
2035
2036fn queue_message_lock_key(queue: &str, message_id: EntityId) -> String {
2037 format!("{queue}:{}", message_id.raw())
2038}
2039
2040pub(super) fn queue_message_lock_handle(
2041 runtime: &RedDBRuntime,
2042 queue: &str,
2043 message_id: EntityId,
2044) -> Arc<parking_lot::Mutex<()>> {
2045 let key = queue_message_lock_key(queue, message_id);
2046 if let Some(lock) = runtime.inner.queue_message_locks.read().get(&key).cloned() {
2047 return lock;
2048 }
2049
2050 let mut locks = runtime.inner.queue_message_locks.write();
2051 locks
2052 .entry(key)
2053 .or_insert_with(|| Arc::new(parking_lot::Mutex::new(())))
2054 .clone()
2055}
2056
2057pub(super) fn forget_queue_message_lock(runtime: &RedDBRuntime, queue: &str, message_id: EntityId) {
2058 runtime
2059 .inner
2060 .queue_message_locks
2061 .write()
2062 .remove(&queue_message_lock_key(queue, message_id));
2063}
2064
2065fn parse_message_id(value: &str) -> RedDBResult<EntityId> {
2066 let raw = value.strip_prefix('e').unwrap_or(value);
2067 raw.parse::<u64>()
2068 .map(EntityId::new)
2069 .map_err(|_| RedDBError::Query(format!("invalid message id '{}'", value)))
2070}
2071
2072pub(super) fn resolve_ack_nack_handle(
2078 store: &UnifiedStore,
2079 queue: &str,
2080 group_hint: &str,
2081 message_id_hint: &str,
2082 delivery_id: Option<&str>,
2083) -> RedDBResult<(String, EntityId)> {
2084 if let Some(did) = delivery_id {
2085 return resolve_delivery_id(store, queue, did);
2086 }
2087 if group_hint.is_empty() || message_id_hint.is_empty() {
2088 return Err(RedDBError::Query(
2089 "ACK/NACK requires either GROUP <group> '<message_id>' or WITH delivery_id = '<id>'"
2090 .to_string(),
2091 ));
2092 }
2093 log_tuple_deprecation(queue);
2094 let entity = parse_message_id(message_id_hint)?;
2095 Ok((group_hint.to_string(), entity))
2096}
2097
2098fn resolve_delivery_id(
2099 store: &UnifiedStore,
2100 queue: &str,
2101 delivery_id: &str,
2102) -> RedDBResult<(String, EntityId)> {
2103 let Some(manager) = store.get_collection(QUEUE_META_COLLECTION) else {
2104 return Err(RedDBError::Query(format!(
2105 "delivery_id '{}' does not resolve to a live pending delivery",
2106 delivery_id
2107 )));
2108 };
2109 for entity in manager.query_all(|entity| {
2110 entity.data.as_row().is_some_and(|row| {
2111 row_text(row, "kind").as_deref() == Some("queue_pending_lc")
2112 && row_text(row, "delivery_id").as_deref() == Some(delivery_id)
2113 })
2114 }) {
2115 if let Some(row) = entity.data.as_row() {
2116 let row_queue = row_text(row, "queue").unwrap_or_default();
2117 let row_group = row_text(row, "group").unwrap_or_default();
2118 let row_message = row_u64(row, "message_id").unwrap_or(0);
2119 if row_queue != queue {
2120 return Err(RedDBError::Query(format!(
2121 "delivery_id '{}' belongs to queue '{}', not '{}'",
2122 delivery_id, row_queue, queue
2123 )));
2124 }
2125 return Ok((row_group, EntityId::new(row_message)));
2126 }
2127 }
2128 Err(RedDBError::Query(format!(
2129 "delivery_id '{}' does not resolve to a live pending delivery",
2130 delivery_id
2131 )))
2132}
2133
2134fn log_tuple_deprecation(queue: &str) {
2137 use std::sync::{Mutex, OnceLock};
2138 use std::time::Instant;
2139
2140 static LAST_EMIT: OnceLock<Mutex<HashMap<(u64, String), Instant>>> = OnceLock::new();
2141 const COOLDOWN: std::time::Duration = std::time::Duration::from_secs(60);
2142
2143 let map = LAST_EMIT.get_or_init(|| Mutex::new(HashMap::new()));
2144 let key = (super::impl_core::current_connection_id(), queue.to_string());
2145 let now = Instant::now();
2146 let mut guard = match map.lock() {
2147 Ok(g) => g,
2148 Err(_) => return,
2149 };
2150 let should_emit =
2151 !matches!(guard.get(&key), Some(prev) if now.duration_since(*prev) < COOLDOWN);
2152 if should_emit {
2153 guard.insert(key.clone(), now);
2154 drop(guard);
2155 tracing::warn!(
2156 target: "reddb::queue_lifecycle",
2157 queue = queue,
2158 connection_id = key.0,
2159 "ACK/NACK by (queue, group, message_id) tuple is deprecated; \
2160 switch to the server-issued delivery_id (ADR 0026). \
2161 The tuple path will be removed one minor release after introduction.",
2162 );
2163 }
2164}
2165
2166fn message_id_string(message_id: EntityId) -> String {
2167 message_id.raw().to_string()
2168}
2169
2170pub(crate) fn pending_counts_by_group(
2175 store: &UnifiedStore,
2176) -> std::collections::BTreeMap<(String, String), u64> {
2177 let mut counts: std::collections::BTreeMap<(String, String), u64> =
2178 std::collections::BTreeMap::new();
2179 let Some(manager) = store.get_collection(QUEUE_META_COLLECTION) else {
2180 return counts;
2181 };
2182 for entity in manager.query_all(|entity| {
2183 entity
2184 .data
2185 .as_row()
2186 .is_some_and(|row| row_text(row, "kind").as_deref() == Some("queue_pending"))
2187 }) {
2188 if let Some(row) = entity.data.as_row() {
2189 let queue = row_text(row, "queue");
2190 let group = row_text(row, "group");
2191 if let (Some(q), Some(g)) = (queue, group) {
2192 *counts.entry((q, g)).or_insert(0) += 1;
2193 }
2194 }
2195 }
2196 counts
2197}
2198
2199pub(super) fn row_text(row: &RowData, field: &str) -> Option<String> {
2200 match row.get_field(field)?.clone() {
2201 Value::Text(value) => Some(value.to_string()),
2202 Value::NodeRef(value) => Some(value),
2203 Value::EdgeRef(value) => Some(value),
2204 Value::TableRef(value) => Some(value),
2205 _ => None,
2206 }
2207}
2208
2209pub(super) fn row_u64(row: &RowData, field: &str) -> Option<u64> {
2210 match row.get_field(field)?.clone() {
2211 Value::UnsignedInteger(value) => Some(value),
2212 Value::Integer(value) if value >= 0 => Some(value as u64),
2213 Value::Float(value) if value >= 0.0 => Some(value as u64),
2214 Value::Text(value) => value.parse().ok(),
2215 _ => None,
2216 }
2217}
2218
2219fn row_bool(row: &RowData, field: &str) -> Option<bool> {
2220 match row.get_field(field)?.clone() {
2221 Value::Boolean(value) => Some(value),
2222 Value::Text(value) => match value.to_ascii_lowercase().as_str() {
2223 "true" => Some(true),
2224 "false" => Some(false),
2225 _ => None,
2226 },
2227 _ => None,
2228 }
2229}
2230
2231fn queue_collection_contract(
2232 name: &str,
2233 priority: bool,
2234 ttl_ms: Option<u64>,
2235) -> crate::physical::CollectionContract {
2236 let now = current_unix_ms();
2237 let mut context_index_fields = Vec::new();
2238 if priority {
2239 context_index_fields.push("priority".to_string());
2240 }
2241
2242 crate::physical::CollectionContract {
2243 name: name.to_string(),
2244 declared_model: crate::catalog::CollectionModel::Queue,
2245 schema_mode: crate::catalog::SchemaMode::Dynamic,
2246 origin: crate::physical::ContractOrigin::Explicit,
2247 version: 1,
2248 created_at_unix_ms: now,
2249 updated_at_unix_ms: now,
2250 default_ttl_ms: ttl_ms,
2251 vector_dimension: None,
2252 vector_metric: None,
2253 context_index_fields,
2254 declared_columns: Vec::new(),
2255 table_def: None,
2256 timestamps_enabled: false,
2257 context_index_enabled: false,
2258 metrics_raw_retention_ms: None,
2259 metrics_rollup_policies: Vec::new(),
2260 metrics_tenant_identity: None,
2261 metrics_namespace: None,
2262 append_only: true,
2266 subscriptions: Vec::new(),
2267 session_key: None,
2268 session_gap_ms: None,
2269 retention_duration_ms: None,
2270 }
2271}
2272
/// Returns the wall-clock time as milliseconds since the Unix epoch.
///
/// Yields `0` if the system clock reports a time before the epoch.
fn current_unix_ms() -> u128 {
    use std::time::{SystemTime, UNIX_EPOCH};

    match SystemTime::now().duration_since(UNIX_EPOCH) {
        Ok(since_epoch) => since_epoch.as_millis(),
        Err(_) => 0,
    }
}
2279
2280pub(super) fn now_ns() -> u64 {
2281 std::time::SystemTime::now()
2282 .duration_since(std::time::UNIX_EPOCH)
2283 .unwrap_or_default()
2284 .as_nanos() as u64
2285}
2286
2287pub(super) fn queue_message_ttl_metadata(ttl_ms: u64) -> Metadata {
2288 Metadata::with_fields(
2289 [(
2290 "_ttl_ms".to_string(),
2291 if ttl_ms <= i64::MAX as u64 {
2292 MetadataValue::Int(ttl_ms as i64)
2293 } else {
2294 MetadataValue::Timestamp(ttl_ms)
2295 },
2296 )]
2297 .into_iter()
2298 .collect(),
2299 )
2300}
2301
2302fn estimate_payload_bytes(payload: &Value) -> u64 {
2304 match payload {
2305 Value::Json(v) => v.len() as u64,
2306 Value::Text(s) => s.len() as u64,
2307 _ => 64,
2308 }
2309}