1use std::collections::{HashMap, HashSet};
4use std::sync::atomic::{AtomicU64, Ordering};
5use std::sync::Arc;
6
7use crate::runtime::audit_log::{AuditAuthSource, AuditEvent, AuditFieldEscaper, Outcome};
8use crate::storage::queue::QueueMode;
9use crate::storage::unified::entity::{QueueMessageData, RowData};
10use crate::storage::unified::{Metadata, MetadataValue, UnifiedStore};
11
12use super::*;
13
/// Process-wide counter bumped each time an event enqueue is diverted to the
/// outbox dead-letter queue because a capacity limit was hit.
pub static EVENTS_DRAIN_RETRIES_TOTAL: AtomicU64 = AtomicU64::new(0);

/// Process-wide counter of events routed to an outbox dead-letter queue.
pub static EVENTS_DLQ_TOTAL: AtomicU64 = AtomicU64::new(0);

/// Process-wide counter of events successfully inserted into a queue
/// (outbox dead-letter queues included).
pub static EVENTS_ENQUEUED_TOTAL: AtomicU64 = AtomicU64::new(0);

/// Outbox size (1 GiB) above which a capacity warning is emitted.
const OUTBOX_WARN_BYTES: u64 = 1024 * 1024 * 1024;

/// Outbox size (10 GiB) above which new events are diverted to the DLQ.
const OUTBOX_MAX_BYTES: u64 = 10 * 1024 * 1024 * 1024;

/// Approximate number of payload bytes currently accounted to the outbox.
static OUTBOX_APPROX_BYTES: AtomicU64 = AtomicU64::new(0);

/// Collection that stores queue metadata rows (e.g. `queue_config` rows).
const QUEUE_META_COLLECTION: &str = "red_queue_meta";
/// Midpoint of the `u64` range.
// NOTE(review): presumably the starting position so pushes can grow on
// either side — confirm against `next_queue_position`.
const QUEUE_POSITION_CENTER: u64 = u64::MAX >> 1;
/// Consumer-group name constant for work-mode queues.
// NOTE(review): usage is outside this section — confirm.
const WORK_DEFAULT_GROUP: &str = "_work_default";
/// Name prefix constant for fanout consumer groups.
// NOTE(review): usage is outside this section — confirm.
const FANOUT_GROUP_PREFIX: &str = "_fanout_";
41
/// Per-queue runtime settings, persisted as a `queue_config` row in the queue
/// metadata collection and reloaded through `load_queue_config`.
#[derive(Debug, Clone)]
pub(super) struct QueueRuntimeConfig {
    /// Delivery mode of the queue.
    pub(super) mode: QueueMode,
    /// Whether pushes may carry an explicit priority; pushes with a priority
    /// are rejected when this is `false`.
    pub(super) priority: bool,
    /// Optional cap on the number of messages; enqueues are refused or
    /// diverted once the current length reaches it.
    pub(super) max_size: Option<usize>,
    /// Optional TTL in milliseconds attached as metadata to every new message.
    pub(super) ttl_ms: Option<u64>,
    /// Optional name of the dead-letter queue.
    pub(super) dlq: Option<String>,
    /// Value copied into each new message's `max_attempts`.
    pub(super) max_attempts: u32,
    /// Lock deadline in milliseconds.
    // NOTE(review): consumed outside this section — confirm exact semantics.
    pub(super) lock_deadline_ms: u64,
    /// In-flight cap per consumer group.
    // NOTE(review): consumed outside this section — confirm exact semantics.
    pub(super) in_flight_cap_per_group: u32,
}
53
/// A consumer-group record: a group name paired with an entity id.
// NOTE(review): presumably `entity_id` is the id of the stored metadata row —
// the loader is outside this section, confirm.
#[derive(Debug, Clone)]
struct QueueGroupEntry {
    entity_id: EntityId,
    group: String,
}
59
/// One pending (delivered, not yet resolved) message for a consumer group, as
/// returned by `load_pending_entries` and listed by the `Pending` command.
#[derive(Debug, Clone)]
pub(super) struct QueuePendingEntry {
    // NOTE(review): presumably the id of the stored pending row — confirm.
    pub(super) entity_id: EntityId,
    /// Consumer group the delivery belongs to.
    group: String,
    /// Id of the queue message that was delivered.
    pub(super) message_id: EntityId,
    /// Consumer the message was delivered to.
    consumer: String,
    /// Delivery timestamp in nanoseconds; used to sort pending entries and to
    /// compute `idle_ms`.
    pub(super) delivered_at_ns: u64,
    /// Delivery counter reported as `delivery_count`.
    pub(super) delivery_count: u32,
}
69
/// An acknowledgement record tying a consumer group to a message id.
// NOTE(review): producers and consumers of this type are outside this
// section; `entity_id` is presumably the id of the stored ack row — confirm.
#[derive(Debug, Clone)]
pub(super) struct QueueAckEntry {
    entity_id: EntityId,
    group: String,
    pub(super) message_id: EntityId,
}
76
/// Snapshot of a queue message as used for sorting, filtering, projection and
/// moving between queues.
#[derive(Debug, Clone)]
pub(super) struct QueueMessageView {
    /// Entity id of the message in its queue collection.
    pub(super) id: EntityId,
    /// Position of the message within the queue.
    position: u64,
    /// Message priority.
    // NOTE(review): how a missing priority is represented is decided by the
    // loader outside this section — confirm.
    priority: i32,
    /// Message payload.
    pub(super) payload: Value,
    /// Attempt counter of the message.
    attempts: u32,
    /// Attempt limit of the message.
    pub(super) max_attempts: u32,
    /// Enqueue timestamp in nanoseconds.
    enqueued_at_ns: u64,
}
87
88impl RedDBRuntime {
89 pub(crate) fn enqueue_event_payload(
90 &self,
91 queue: &str,
92 payload: Value,
93 ) -> RedDBResult<EntityId> {
94 let store = self.inner.db.store();
95 if store.get_collection(queue).is_none() {
97 crate::runtime::impl_ddl::ensure_event_target_queue_pub(self, queue)?;
98 }
99
100 let payload_bytes = estimate_payload_bytes(&payload);
102 let outbox_bytes = OUTBOX_APPROX_BYTES.fetch_add(payload_bytes, Ordering::Relaxed);
103
104 if outbox_bytes > OUTBOX_MAX_BYTES {
106 OUTBOX_APPROX_BYTES.fetch_sub(payload_bytes, Ordering::Relaxed);
107 EVENTS_DRAIN_RETRIES_TOTAL.fetch_add(1, Ordering::Relaxed);
108 return self.route_event_to_outbox_dlq(queue, payload, "outbox_max_bytes_exceeded");
109 }
110
111 if outbox_bytes > OUTBOX_WARN_BYTES && outbox_bytes - payload_bytes <= OUTBOX_WARN_BYTES {
113 tracing::warn!(
114 outbox_bytes,
115 warn_threshold = OUTBOX_WARN_BYTES,
116 "event outbox approaching capacity warning threshold"
117 );
118 crate::telemetry::operator_event::OperatorEvent::OutboxDlqActivated {
119 queue: queue.to_string(),
120 dlq: format!("{queue}_outbox_dlq"),
121 reason: "outbox_warn_bytes_exceeded".to_string(),
122 }
123 .emit_global();
124 }
125
126 let config = load_queue_config(store.as_ref(), queue);
127
128 if let Some(max_size) = config.max_size {
130 let current_len = load_queue_message_views(store.as_ref(), queue)
131 .unwrap_or_default()
132 .len();
133 if current_len >= max_size {
134 OUTBOX_APPROX_BYTES.fetch_sub(payload_bytes, Ordering::Relaxed);
135 EVENTS_DRAIN_RETRIES_TOTAL.fetch_add(1, Ordering::Relaxed);
136 return self.route_event_to_outbox_dlq(queue, payload, "queue_full");
137 }
138 if current_len * 10 >= max_size * 8 {
140 tracing::warn!(
141 queue = %queue,
142 size = current_len,
143 max = max_size,
144 "event target queue near capacity"
145 );
146 }
147 }
148
149 let id = self.enqueue_event_payload_raw(store.as_ref(), queue, &config, payload)?;
150 EVENTS_ENQUEUED_TOTAL.fetch_add(1, Ordering::Relaxed);
151 Ok(id)
152 }
153
154 fn route_event_to_outbox_dlq(
156 &self,
157 queue: &str,
158 payload: Value,
159 reason: &str,
160 ) -> RedDBResult<EntityId> {
161 let dlq_name = format!("{queue}_outbox_dlq");
162 EVENTS_DLQ_TOTAL.fetch_add(1, Ordering::Relaxed);
163
164 crate::telemetry::operator_event::OperatorEvent::OutboxDlqActivated {
165 queue: queue.to_string(),
166 dlq: dlq_name.clone(),
167 reason: reason.to_string(),
168 }
169 .emit_global();
170
171 let store = self.inner.db.store();
172 if store.get_collection(&dlq_name).is_none() {
173 crate::runtime::impl_ddl::ensure_event_target_queue_pub(self, &dlq_name)?;
174 }
175 let dlq_config = load_queue_config(store.as_ref(), &dlq_name);
176 let id = self.enqueue_event_payload_raw(store.as_ref(), &dlq_name, &dlq_config, payload)?;
177 EVENTS_ENQUEUED_TOTAL.fetch_add(1, Ordering::Relaxed);
178 Ok(id)
179 }
180
181 fn enqueue_event_payload_raw(
183 &self,
184 store: &UnifiedStore,
185 queue: &str,
186 config: &QueueRuntimeConfig,
187 payload: Value,
188 ) -> RedDBResult<EntityId> {
189 let position = next_queue_position(store, queue, QueueSide::Right)?;
190 let mut entity = UnifiedEntity::new(
191 EntityId::new(0),
192 EntityKind::QueueMessage {
193 queue: queue.to_string(),
194 position,
195 },
196 EntityData::QueueMessage(QueueMessageData {
197 payload,
198 priority: None,
199 enqueued_at_ns: now_ns(),
200 attempts: 0,
201 max_attempts: config.max_attempts,
202 acked: false,
203 }),
204 );
205 if let Some(xid) = self.current_xid() {
206 entity.set_xmin(xid);
207 }
208 let id = store
209 .insert_auto(queue, entity)
210 .map_err(|err| RedDBError::Internal(err.to_string()))?;
211 if let Some(ttl_ms) = config.ttl_ms {
212 store
213 .set_metadata(queue, id, queue_message_ttl_metadata(ttl_ms))
214 .map_err(|err| RedDBError::Internal(err.to_string()))?;
215 }
216 self.invalidate_result_cache_for_table(queue);
217 Ok(id)
218 }
219
220 pub fn execute_create_queue(
221 &self,
222 raw_query: &str,
223 query: &CreateQueueQuery,
224 ) -> RedDBResult<RuntimeQueryResult> {
225 self.check_write(crate::runtime::write_gate::WriteKind::Ddl)?;
226 if query.dlq.as_deref() == Some(query.name.as_str()) {
227 return Err(RedDBError::Query(
228 "dead-letter queue must be different from the source queue".to_string(),
229 ));
230 }
231
232 let store = self.inner.db.store();
233 let exists = store.get_collection(&query.name).is_some();
234 if exists {
235 if query.if_not_exists {
236 return Ok(RuntimeQueryResult::ok_message(
237 raw_query.to_string(),
238 &format!("queue '{}' already exists", query.name),
239 "create",
240 ));
241 }
242 return Err(RedDBError::Query(format!(
243 "queue '{}' already exists",
244 query.name
245 )));
246 }
247
248 store
249 .create_collection(&query.name)
250 .map_err(|err| RedDBError::Internal(err.to_string()))?;
251 if let Some(ttl_ms) = query.ttl_ms {
252 self.inner
253 .db
254 .set_collection_default_ttl_ms(&query.name, ttl_ms);
255 }
256 self.inner
257 .db
258 .save_collection_contract(queue_collection_contract(
259 &query.name,
260 query.priority,
261 query.ttl_ms,
262 ))
263 .map_err(|err| RedDBError::Internal(err.to_string()))?;
264 save_queue_config(
265 store.as_ref(),
266 &query.name,
267 &QueueRuntimeConfig {
268 mode: query.mode,
269 priority: query.priority,
270 max_size: query.max_size,
271 ttl_ms: query.ttl_ms,
272 dlq: query.dlq.clone(),
273 max_attempts: query.max_attempts,
274 lock_deadline_ms: query.lock_deadline_ms,
275 in_flight_cap_per_group: query.in_flight_cap_per_group,
276 },
277 )?;
278
279 if let Some(dlq) = &query.dlq {
280 if store.get_collection(dlq).is_none() {
281 store
282 .create_collection(dlq)
283 .map_err(|err| RedDBError::Internal(err.to_string()))?;
284 self.inner
285 .db
286 .save_collection_contract(queue_collection_contract(dlq, false, None))
287 .map_err(|err| RedDBError::Internal(err.to_string()))?;
288 }
289 }
290
291 self.invalidate_result_cache();
292 self.inner
293 .db
294 .persist_metadata()
295 .map_err(|err| RedDBError::Internal(err.to_string()))?;
296 let mut type_tags = Vec::new();
301 if let Some(dlq) = &query.dlq {
302 type_tags.push(format!("dlq:{}", dlq));
303 }
304 self.schema_vocabulary_apply(
305 crate::runtime::schema_vocabulary::DdlEvent::CreateCollection {
306 collection: query.name.clone(),
307 columns: vec!["payload".to_string()],
308 type_tags,
309 description: None,
310 },
311 );
312
313 let mut msg = format!("queue '{}' created", query.name);
314 msg.push_str(&format!(" (mode={})", query.mode.as_str()));
315 if query.priority {
316 msg.push_str(" (priority)");
317 }
318 if let Some(max_size) = query.max_size {
319 msg.push_str(&format!(" (max_size={max_size})"));
320 }
321 if let Some(ttl_ms) = query.ttl_ms {
322 msg.push_str(&format!(" (ttl={ttl_ms}ms)"));
323 }
324 if let Some(dlq) = &query.dlq {
325 msg.push_str(&format!(
326 " (dlq={dlq}, max_attempts={})",
327 query.max_attempts
328 ));
329 }
330
331 Ok(RuntimeQueryResult::ok_message(
332 raw_query.to_string(),
333 &msg,
334 "create",
335 ))
336 }
337
338 pub fn execute_alter_queue(
339 &self,
340 raw_query: &str,
341 query: &AlterQueueQuery,
342 ) -> RedDBResult<RuntimeQueryResult> {
343 self.check_write(crate::runtime::write_gate::WriteKind::Ddl)?;
344 let store = self.inner.db.store();
345 ensure_queue_exists(store.as_ref(), &query.name)?;
346
347 let mut config = load_queue_config(store.as_ref(), &query.name);
348 let mut summary: Vec<String> = Vec::new();
349
350 if let Some(new_mode) = query.mode {
351 let pending =
352 load_pending_entries(store.as_ref(), &query.name, None, None).unwrap_or_default();
353 if !pending.is_empty() {
354 tracing::warn!(
355 queue = %query.name,
356 pending_count = pending.len(),
357 new_mode = %new_mode.as_str(),
358 "ALTER QUEUE SET MODE: {} in-flight messages will drain with old mode; \
359 new reads use {}",
360 pending.len(),
361 new_mode.as_str(),
362 );
363 }
364 config.mode = new_mode;
365 summary.push(format!("mode={}", new_mode.as_str()));
366 }
367 if let Some(max_attempts) = query.max_attempts {
368 config.max_attempts = max_attempts;
369 summary.push(format!("max_attempts={max_attempts}"));
370 }
371 if let Some(lock_deadline_ms) = query.lock_deadline_ms {
372 config.lock_deadline_ms = lock_deadline_ms;
373 summary.push(format!("lock_deadline_ms={lock_deadline_ms}"));
374 }
375 if let Some(in_flight_cap) = query.in_flight_cap_per_group {
376 config.in_flight_cap_per_group = in_flight_cap;
377 summary.push(format!("in_flight_cap_per_group={in_flight_cap}"));
378 }
379 if let Some(dlq) = &query.dlq {
380 if dlq == &query.name {
381 return Err(RedDBError::Query(
382 "dead-letter queue must be different from the source queue".to_string(),
383 ));
384 }
385 config.dlq = Some(dlq.clone());
386 summary.push(format!("dlq={dlq}"));
387 }
388
389 save_queue_config(store.as_ref(), &query.name, &config)?;
390
391 self.invalidate_result_cache();
392 self.inner
393 .db
394 .persist_metadata()
395 .map_err(|err| RedDBError::Internal(err.to_string()))?;
396
397 Ok(RuntimeQueryResult::ok_message(
398 raw_query.to_string(),
399 &format!("queue '{}' altered: {}", query.name, summary.join(", ")),
400 "alter",
401 ))
402 }
403
404 pub fn execute_drop_queue(
405 &self,
406 raw_query: &str,
407 query: &DropQueueQuery,
408 ) -> RedDBResult<RuntimeQueryResult> {
409 self.check_write(crate::runtime::write_gate::WriteKind::Ddl)?;
410 let store = self.inner.db.store();
411 if super::impl_ddl::is_system_schema_name(&query.name) {
412 return Err(RedDBError::Query("system schema is read-only".to_string()));
413 }
414 if store.get_collection(&query.name).is_none() {
415 if query.if_exists {
416 return Ok(RuntimeQueryResult::ok_message(
417 raw_query.to_string(),
418 &format!("queue '{}' does not exist", query.name),
419 "drop",
420 ));
421 }
422 return Err(RedDBError::NotFound(format!(
423 "queue '{}' not found",
424 query.name
425 )));
426 }
427 let actual = crate::runtime::ddl::polymorphic_resolver::resolve(
428 &query.name,
429 &self.inner.db.catalog_model_snapshot(),
430 )?;
431 crate::runtime::ddl::polymorphic_resolver::ensure_model_match(
432 crate::catalog::CollectionModel::Queue,
433 actual,
434 )?;
435
436 store
437 .drop_collection(&query.name)
438 .map_err(|err| RedDBError::Internal(err.to_string()))?;
439 self.inner.db.clear_collection_default_ttl_ms(&query.name);
440 self.inner
441 .db
442 .remove_collection_contract(&query.name)
443 .map_err(|err| RedDBError::Internal(err.to_string()))?;
444 remove_queue_metadata(store.as_ref(), &query.name);
445 self.invalidate_result_cache();
446 self.inner
447 .db
448 .persist_metadata()
449 .map_err(|err| RedDBError::Internal(err.to_string()))?;
450 self.schema_vocabulary_apply(
452 crate::runtime::schema_vocabulary::DdlEvent::DropCollection {
453 collection: query.name.clone(),
454 },
455 );
456
457 Ok(RuntimeQueryResult::ok_message(
458 raw_query.to_string(),
459 &format!("queue '{}' dropped", query.name),
460 "drop",
461 ))
462 }
463
464 pub fn execute_queue_command(
465 &self,
466 raw_query: &str,
467 cmd: &QueueCommand,
468 ) -> RedDBResult<RuntimeQueryResult> {
469 self.check_write(crate::runtime::write_gate::WriteKind::Dml)?;
470 match cmd {
471 QueueCommand::Push {
472 queue,
473 value,
474 side,
475 priority,
476 } => {
477 let store = self.inner.db.store();
478 ensure_queue_exists(store.as_ref(), queue)?;
479 let config = load_queue_config(store.as_ref(), queue);
480 if priority.is_some() && !config.priority {
481 return Err(RedDBError::Query(format!(
482 "queue '{}' is not a priority queue",
483 queue
484 )));
485 }
486 if let Some(max_size) = config.max_size {
487 let current_len =
488 load_queue_message_views_with_runtime(Some(self), store.as_ref(), queue)?
489 .len();
490 if current_len >= max_size {
491 return Err(RedDBError::Query(format!(
492 "queue '{}' is full (max_size={max_size})",
493 queue
494 )));
495 }
496 }
497
498 let position = next_queue_position(store.as_ref(), queue, *side)?;
499 let mut entity = UnifiedEntity::new(
500 EntityId::new(0),
501 EntityKind::QueueMessage {
502 queue: queue.clone(),
503 position,
504 },
505 EntityData::QueueMessage(QueueMessageData {
506 payload: value.clone(),
507 priority: if config.priority { *priority } else { None },
508 enqueued_at_ns: now_ns(),
509 attempts: 0,
510 max_attempts: config.max_attempts,
511 acked: false,
512 }),
513 );
514 if let Some(xid) = self.current_xid() {
517 entity.set_xmin(xid);
518 }
519 let id = store
520 .insert_auto(queue, entity)
521 .map_err(|err| RedDBError::Internal(err.to_string()))?;
522 if let Some(ttl_ms) = config.ttl_ms {
523 store
524 .set_metadata(queue, id, queue_message_ttl_metadata(ttl_ms))
525 .map_err(|err| RedDBError::Internal(err.to_string()))?;
526 }
527 self.invalidate_result_cache();
528
529 let mut result = UnifiedResult::with_columns(vec![
530 "message_id".into(),
531 "side".into(),
532 "queue".into(),
533 ]);
534 let mut record = UnifiedRecord::new();
535 record.set("message_id", Value::text(message_id_string(id)));
536 record.set(
537 "side",
538 Value::text(match side {
539 QueueSide::Left => "left".to_string(),
540 QueueSide::Right => "right".to_string(),
541 }),
542 );
543 record.set("queue", Value::text(queue.clone()));
544 result.push(record);
545
546 Ok(RuntimeQueryResult {
547 query: raw_query.to_string(),
548 mode: QueryMode::Sql,
549 statement: "queue_push",
550 engine: "runtime-queue",
551 result,
552 affected_rows: 1,
553 statement_type: "insert",
554 })
555 }
556 QueueCommand::Pop { queue, side, count } => {
557 let store = self.inner.db.store();
558 ensure_queue_exists(store.as_ref(), queue)?;
559 let popped = super::queue_delivery::pop_messages(
560 self,
561 store.as_ref(),
562 queue,
563 *side,
564 *count,
565 )?;
566
567 let mut result =
568 UnifiedResult::with_columns(vec!["message_id".into(), "payload".into()]);
569 for message in &popped {
570 let mut record = UnifiedRecord::new();
571 record.set(
572 "message_id",
573 Value::text(message_id_string(message.message_id)),
574 );
575 record.set("payload", message.payload.clone());
576 result.push(record);
577 }
578 let popped_count = popped.len() as u64;
579 if popped_count > 0 {
580 self.invalidate_result_cache();
581 }
582
583 Ok(RuntimeQueryResult {
584 query: raw_query.to_string(),
585 mode: QueryMode::Sql,
586 statement: "queue_pop",
587 engine: "runtime-queue",
588 result,
589 affected_rows: popped_count,
590 statement_type: "delete",
591 })
592 }
593 QueueCommand::Peek { queue, count } => {
594 let store = self.inner.db.store();
595 ensure_queue_exists(store.as_ref(), queue)?;
596 let messages =
597 super::queue_delivery::peek_messages(self, store.as_ref(), queue, *count)?;
598
599 let mut result =
600 UnifiedResult::with_columns(vec!["message_id".into(), "payload".into()]);
601 for message in messages {
602 let mut record = UnifiedRecord::new();
603 record.set(
604 "message_id",
605 Value::text(message_id_string(message.message_id)),
606 );
607 record.set("payload", message.payload);
608 result.push(record);
609 }
610
611 Ok(RuntimeQueryResult {
612 query: raw_query.to_string(),
613 mode: QueryMode::Sql,
614 statement: "queue_peek",
615 engine: "runtime-queue",
616 result,
617 affected_rows: 0,
618 statement_type: "select",
619 })
620 }
621 QueueCommand::Len { queue } => {
622 let store = self.inner.db.store();
623 ensure_queue_exists(store.as_ref(), queue)?;
624 let count =
625 load_queue_message_views_with_runtime(Some(self), store.as_ref(), queue)?.len()
626 as u64;
627 let mut result = UnifiedResult::with_columns(vec!["len".into()]);
628 let mut record = UnifiedRecord::new();
629 record.set("len", Value::UnsignedInteger(count));
630 result.push(record);
631
632 Ok(RuntimeQueryResult {
633 query: raw_query.to_string(),
634 mode: QueryMode::Sql,
635 statement: "queue_len",
636 engine: "runtime-queue",
637 result,
638 affected_rows: 0,
639 statement_type: "select",
640 })
641 }
642 QueueCommand::Purge { queue } => {
643 let store = self.inner.db.store();
644 ensure_queue_exists(store.as_ref(), queue)?;
645 let count = super::queue_delivery::purge_messages(self, store.as_ref(), queue)?;
646 if count > 0 {
647 self.invalidate_result_cache();
648 }
649
650 Ok(RuntimeQueryResult::ok_message(
651 raw_query.to_string(),
652 &format!("{count} messages purged from queue '{queue}'"),
653 "delete",
654 ))
655 }
656 QueueCommand::GroupCreate { queue, group } => {
657 let store = self.inner.db.store();
658 ensure_queue_exists(store.as_ref(), queue)?;
659 if queue_group_exists(store.as_ref(), queue, group)? {
660 return Ok(RuntimeQueryResult::ok_message(
661 raw_query.to_string(),
662 &format!(
663 "consumer group '{}' already exists on queue '{}'",
664 group, queue
665 ),
666 "create",
667 ));
668 }
669 save_queue_group(store.as_ref(), queue, group)?;
670 self.invalidate_result_cache();
671
672 Ok(RuntimeQueryResult::ok_message(
673 raw_query.to_string(),
674 &format!("consumer group '{}' created on queue '{}'", group, queue),
675 "create",
676 ))
677 }
678 QueueCommand::GroupRead {
679 queue,
680 group,
681 consumer,
682 count,
683 } => {
684 let store = self.inner.db.store();
685 ensure_queue_exists(store.as_ref(), queue)?;
686 let delivered = super::queue_delivery::read_messages(
687 self,
688 store.as_ref(),
689 queue,
690 group.as_deref(),
691 consumer,
692 *count,
693 )?;
694
695 let mut result = UnifiedResult::with_columns(vec![
696 "message_id".into(),
697 "payload".into(),
698 "consumer".into(),
699 "delivery_count".into(),
700 "attempts".into(),
701 ]);
702
703 for message in delivered {
704 let mut record = UnifiedRecord::new();
705 record.set(
706 "message_id",
707 Value::text(message_id_string(message.message_id)),
708 );
709 record.set("payload", message.payload);
710 record.set("consumer", Value::text(message.consumer));
711 record.set(
712 "delivery_count",
713 Value::UnsignedInteger(u64::from(message.delivery_count)),
714 );
715 record.set(
716 "attempts",
717 Value::UnsignedInteger(u64::from(message.delivery_count)),
718 );
719 result.push(record);
720 }
721 if !result.records.is_empty() {
722 self.invalidate_result_cache();
723 }
724
725 Ok(RuntimeQueryResult {
726 query: raw_query.to_string(),
727 mode: QueryMode::Sql,
728 statement: "queue_group_read",
729 engine: "runtime-queue",
730 result,
731 affected_rows: 0,
732 statement_type: "select",
733 })
734 }
735 QueueCommand::Pending { queue, group } => {
736 let store = self.inner.db.store();
737 ensure_queue_exists(store.as_ref(), queue)?;
738 require_queue_group(store.as_ref(), queue, group)?;
739 let mut pending = load_pending_entries(store.as_ref(), queue, Some(group), None)?;
740 pending.sort_by_key(|entry| entry.delivered_at_ns);
741 let current_time_ns = now_ns();
742
743 let mut result = UnifiedResult::with_columns(vec![
744 "message_id".into(),
745 "consumer".into(),
746 "delivered_at_ns".into(),
747 "delivery_count".into(),
748 "idle_ms".into(),
749 ]);
750 for entry in pending {
751 let mut record = UnifiedRecord::new();
752 record.set(
753 "message_id",
754 Value::text(message_id_string(entry.message_id)),
755 );
756 record.set("consumer", Value::text(entry.consumer));
757 record.set(
758 "delivered_at_ns",
759 Value::UnsignedInteger(entry.delivered_at_ns),
760 );
761 record.set(
762 "delivery_count",
763 Value::UnsignedInteger(u64::from(entry.delivery_count)),
764 );
765 record.set(
766 "idle_ms",
767 Value::UnsignedInteger(
768 current_time_ns.saturating_sub(entry.delivered_at_ns) / 1_000_000,
769 ),
770 );
771 result.push(record);
772 }
773
774 Ok(RuntimeQueryResult {
775 query: raw_query.to_string(),
776 mode: QueryMode::Sql,
777 statement: "queue_pending",
778 engine: "runtime-queue",
779 result,
780 affected_rows: 0,
781 statement_type: "select",
782 })
783 }
784 QueueCommand::Claim {
785 queue,
786 group,
787 consumer,
788 min_idle_ms,
789 } => {
790 let store = self.inner.db.store();
791 ensure_queue_exists(store.as_ref(), queue)?;
792 let delivered = super::queue_delivery::claim_messages(
793 self,
794 store.as_ref(),
795 queue,
796 group,
797 consumer,
798 *min_idle_ms,
799 )?;
800
801 let mut result = UnifiedResult::with_columns(vec![
802 "message_id".into(),
803 "payload".into(),
804 "consumer".into(),
805 "delivery_count".into(),
806 ]);
807
808 for message in delivered {
809 let mut record = UnifiedRecord::new();
810 record.set(
811 "message_id",
812 Value::text(message_id_string(message.message_id)),
813 );
814 record.set("payload", message.payload);
815 record.set("consumer", Value::text(message.consumer));
816 record.set(
817 "delivery_count",
818 Value::UnsignedInteger(u64::from(message.delivery_count)),
819 );
820 result.push(record);
821 }
822 if !result.records.is_empty() {
823 self.invalidate_result_cache();
824 }
825 let affected_rows = result.records.len() as u64;
826
827 Ok(RuntimeQueryResult {
828 query: raw_query.to_string(),
829 mode: QueryMode::Sql,
830 statement: "queue_claim",
831 engine: "runtime-queue",
832 result,
833 affected_rows,
834 statement_type: "update",
835 })
836 }
837 QueueCommand::Ack {
838 queue,
839 group,
840 message_id,
841 } => {
842 let store = self.inner.db.store();
843 ensure_queue_exists(store.as_ref(), queue)?;
844 require_queue_group(store.as_ref(), queue, group)?;
845 let message_id = parse_message_id(message_id)?;
846 let config = load_queue_config(store.as_ref(), queue);
847 super::queue_delivery::ack_message(
848 self,
849 store.as_ref(),
850 queue,
851 group,
852 message_id,
853 &config,
854 )?;
855 self.invalidate_result_cache();
856
857 Ok(RuntimeQueryResult::ok_message(
858 raw_query.to_string(),
859 "message acknowledged",
860 "update",
861 ))
862 }
863 QueueCommand::Nack {
864 queue,
865 group,
866 message_id,
867 } => {
868 let store = self.inner.db.store();
869 ensure_queue_exists(store.as_ref(), queue)?;
870 require_queue_group(store.as_ref(), queue, group)?;
871 let message_id = parse_message_id(message_id)?;
872 let config = load_queue_config(store.as_ref(), queue);
873 let message = match super::queue_delivery::nack_message(
874 self,
875 store.as_ref(),
876 queue,
877 group,
878 message_id,
879 &config,
880 )? {
881 super::queue_delivery::NackOutcome::Requeued => "message requeued".to_string(),
882 super::queue_delivery::NackOutcome::MovedToDlq(dlq) => {
883 format!("message moved to dead-letter queue '{}'", dlq)
884 }
885 super::queue_delivery::NackOutcome::Dropped => {
886 "message dropped after max attempts".to_string()
887 }
888 };
889 self.invalidate_result_cache();
890
891 Ok(RuntimeQueryResult::ok_message(
892 raw_query.to_string(),
893 &message,
894 "update",
895 ))
896 }
897 QueueCommand::Move {
898 source,
899 destination,
900 filter,
901 limit,
902 } => self.execute_queue_move(raw_query, source, destination, filter.as_ref(), *limit),
903 }
904 }
905
906 pub fn execute_queue_select(
907 &self,
908 raw_query: &str,
909 query: &QueueSelectQuery,
910 ) -> RedDBResult<RuntimeQueryResult> {
911 let store = self.inner.db.store();
912 ensure_queue_exists(store.as_ref(), &query.queue)?;
913 let config = load_queue_config(store.as_ref(), &query.queue);
914 let dlq = queue_is_dead_letter_target(store.as_ref(), &query.queue);
915 let columns = if query.columns.is_empty() {
916 queue_projection_default_columns()
917 } else {
918 query.columns.clone()
919 };
920
921 let mut messages =
922 load_queue_message_views_with_runtime(Some(self), store.as_ref(), &query.queue)?;
923 sort_queue_messages(&mut messages, &config, QueueSide::Left);
924
925 let mut result = UnifiedResult::with_columns(columns.clone());
926 for message in messages {
927 if query
928 .filter
929 .as_ref()
930 .is_some_and(|filter| !queue_message_matches_filter(&message, dlq, filter))
931 {
932 continue;
933 }
934 let record = queue_projection_record(&columns, &message, dlq)?;
935 result.push(record);
936 if query
937 .limit
938 .is_some_and(|limit| result.records.len() >= limit as usize)
939 {
940 break;
941 }
942 }
943
944 Ok(RuntimeQueryResult {
945 query: raw_query.to_string(),
946 mode: QueryMode::Sql,
947 statement: "queue_select",
948 engine: "runtime-queue",
949 result,
950 affected_rows: 0,
951 statement_type: "select",
952 })
953 }
954
955 fn execute_queue_move(
956 &self,
957 raw_query: &str,
958 source: &str,
959 destination: &str,
960 filter: Option<&Filter>,
961 limit: usize,
962 ) -> RedDBResult<RuntimeQueryResult> {
963 if source == destination {
964 return Err(RedDBError::Query(
965 "QUEUE MOVE source and destination must be different".to_string(),
966 ));
967 }
968 let store = self.inner.db.store();
969 ensure_queue_exists(store.as_ref(), source)?;
970 ensure_queue_exists(store.as_ref(), destination)?;
971 let source_config = load_queue_config(store.as_ref(), source);
972 let destination_config = load_queue_config(store.as_ref(), destination);
973 let source_dlq = queue_is_dead_letter_target(store.as_ref(), source);
974
975 let mut messages =
976 load_queue_message_views_with_runtime(Some(self), store.as_ref(), source)?;
977 sort_queue_messages(&mut messages, &source_config, QueueSide::Left);
978 let selected = messages
979 .into_iter()
980 .filter(|message| {
981 filter
982 .map(|f| queue_message_matches_filter(message, source_dlq, f))
983 .unwrap_or(true)
984 })
985 .take(limit)
986 .collect::<Vec<_>>();
987
988 if let Some(max_size) = destination_config.max_size {
989 let current_len =
990 load_queue_message_views_with_runtime(Some(self), store.as_ref(), destination)?
991 .len();
992 if current_len + selected.len() > max_size {
993 return Err(RedDBError::Query(format!(
994 "queue '{}' is full (max_size={max_size})",
995 destination
996 )));
997 }
998 }
999
1000 for message in &selected {
1001 let lock = queue_message_lock_handle(self, source, message.id);
1002 let Some(_guard) = lock.try_lock() else {
1003 return Err(RedDBError::Query(format!(
1004 "message '{}' is locked on queue '{}'",
1005 message.id.raw(),
1006 source
1007 )));
1008 };
1009 if queue_message_view_by_id(store.as_ref(), source, message.id)?.is_none() {
1010 return Err(RedDBError::Query(format!(
1011 "message '{}' is no longer available on queue '{}'",
1012 message.id.raw(),
1013 source
1014 )));
1015 }
1016 }
1017
1018 let mut inserted = Vec::new();
1019 for message in &selected {
1020 match insert_moved_queue_message(
1021 store.as_ref(),
1022 destination,
1023 &destination_config,
1024 message,
1025 ) {
1026 Ok(id) => inserted.push(id),
1027 Err(err) => {
1028 for id in inserted {
1029 let _ = store.delete(destination, id);
1030 }
1031 return Err(err);
1032 }
1033 }
1034 }
1035
1036 for message in &selected {
1037 super::queue_delivery::delete_message_with_state(
1038 Some(self),
1039 store.as_ref(),
1040 source,
1041 message.id,
1042 )?;
1043 }
1044 if !selected.is_empty() {
1045 self.invalidate_result_cache();
1046 }
1047
1048 let selected_count = selected.len() as u64;
1049 self.audit_log().record_event(
1050 AuditEvent::builder("queue/move")
1051 .source(AuditAuthSource::System)
1052 .outcome(Outcome::Success)
1053 .resource(format!("queue:{source}->{destination}"))
1054 .fields([
1055 AuditFieldEscaper::field("source", source),
1056 AuditFieldEscaper::field("destination", destination),
1057 AuditFieldEscaper::field("selected", selected_count),
1058 AuditFieldEscaper::field("committed", selected_count),
1059 ])
1060 .build(),
1061 );
1062
1063 let mut result = UnifiedResult::with_columns(vec![
1064 "source".into(),
1065 "destination".into(),
1066 "selected".into(),
1067 "committed".into(),
1068 ]);
1069 let mut record = UnifiedRecord::new();
1070 record.set("source", Value::text(source.to_string()));
1071 record.set("destination", Value::text(destination.to_string()));
1072 record.set("selected", Value::UnsignedInteger(selected_count));
1073 record.set("committed", Value::UnsignedInteger(selected_count));
1074 result.push(record);
1075
1076 Ok(RuntimeQueryResult {
1077 query: raw_query.to_string(),
1078 mode: QueryMode::Sql,
1079 statement: "queue_move",
1080 engine: "runtime-queue",
1081 result,
1082 affected_rows: selected_count,
1083 statement_type: "update",
1084 })
1085 }
1086}
1087
1088fn ensure_queue_exists(store: &UnifiedStore, queue: &str) -> RedDBResult<()> {
1089 if store.get_collection(queue).is_some() {
1090 Ok(())
1091 } else {
1092 Err(RedDBError::NotFound(format!("queue '{}' not found", queue)))
1093 }
1094}
1095
1096pub(super) fn load_queue_config(store: &UnifiedStore, queue: &str) -> QueueRuntimeConfig {
1097 let default = QueueRuntimeConfig {
1098 mode: QueueMode::Work,
1099 priority: false,
1100 max_size: None,
1101 ttl_ms: None,
1102 dlq: None,
1103 max_attempts: crate::storage::query::DEFAULT_QUEUE_MAX_ATTEMPTS,
1104 lock_deadline_ms: crate::storage::query::DEFAULT_QUEUE_LOCK_DEADLINE_MS,
1105 in_flight_cap_per_group: crate::storage::query::DEFAULT_QUEUE_IN_FLIGHT_CAP_PER_GROUP,
1106 };
1107
1108 let Some(manager) = store.get_collection(QUEUE_META_COLLECTION) else {
1109 return default;
1110 };
1111 manager
1112 .query_all(|entity| {
1113 entity.data.as_row().is_some_and(|row| {
1114 row_text(row, "kind").as_deref() == Some("queue_config")
1115 && row_text(row, "queue").as_deref() == Some(queue)
1116 })
1117 })
1118 .into_iter()
1119 .find_map(|entity| {
1120 let row = entity.data.as_row()?;
1121 Some(QueueRuntimeConfig {
1122 mode: row_text(row, "mode")
1123 .as_deref()
1124 .and_then(QueueMode::parse)
1125 .unwrap_or_default(),
1126 priority: row_bool(row, "priority").unwrap_or(false),
1127 max_size: row_u64(row, "max_size").map(|value| value as usize),
1128 ttl_ms: row_u64(row, "ttl_ms"),
1129 dlq: row_text(row, "dlq"),
1130 max_attempts: row_u64(row, "max_attempts")
1131 .map(|value| value as u32)
1132 .unwrap_or(crate::storage::query::DEFAULT_QUEUE_MAX_ATTEMPTS),
1133 lock_deadline_ms: row_u64(row, "lock_deadline_ms")
1134 .unwrap_or(crate::storage::query::DEFAULT_QUEUE_LOCK_DEADLINE_MS),
1135 in_flight_cap_per_group: row_u64(row, "in_flight_cap_per_group")
1136 .map(|value| value as u32)
1137 .unwrap_or(crate::storage::query::DEFAULT_QUEUE_IN_FLIGHT_CAP_PER_GROUP),
1138 })
1139 })
1140 .unwrap_or(default)
1141}
1142
1143pub(super) fn queue_mode_str(store: &UnifiedStore, queue: &str) -> &'static str {
1144 load_queue_config(store, queue).mode.as_str()
1145}
1146
1147fn save_queue_config(
1148 store: &UnifiedStore,
1149 queue: &str,
1150 config: &QueueRuntimeConfig,
1151) -> RedDBResult<()> {
1152 remove_meta_rows(store, |row| {
1153 row_text(row, "kind").as_deref() == Some("queue_config")
1154 && row_text(row, "queue").as_deref() == Some(queue)
1155 });
1156
1157 let mut fields = HashMap::new();
1158 fields.insert("kind".to_string(), Value::text("queue_config".to_string()));
1159 fields.insert("queue".to_string(), Value::text(queue.to_string()));
1160 fields.insert(
1161 "mode".to_string(),
1162 Value::text(config.mode.as_str().to_string()),
1163 );
1164 fields.insert("priority".to_string(), Value::Boolean(config.priority));
1165 fields.insert(
1166 "max_size".to_string(),
1167 config
1168 .max_size
1169 .map(|value| Value::UnsignedInteger(value as u64))
1170 .unwrap_or(Value::Null),
1171 );
1172 fields.insert(
1173 "ttl_ms".to_string(),
1174 config
1175 .ttl_ms
1176 .map(Value::UnsignedInteger)
1177 .unwrap_or(Value::Null),
1178 );
1179 fields.insert(
1180 "dlq".to_string(),
1181 config.dlq.clone().map(Value::text).unwrap_or(Value::Null),
1182 );
1183 fields.insert(
1184 "max_attempts".to_string(),
1185 Value::UnsignedInteger(u64::from(config.max_attempts)),
1186 );
1187 fields.insert(
1188 "lock_deadline_ms".to_string(),
1189 Value::UnsignedInteger(config.lock_deadline_ms),
1190 );
1191 fields.insert(
1192 "in_flight_cap_per_group".to_string(),
1193 Value::UnsignedInteger(u64::from(config.in_flight_cap_per_group)),
1194 );
1195 insert_meta_row(store, fields)
1196}
1197
1198fn remove_queue_metadata(store: &UnifiedStore, queue: &str) {
1199 remove_meta_rows(store, |row| {
1200 row_text(row, "queue").as_deref() == Some(queue)
1201 });
1202}
1203
1204fn queue_group_exists(store: &UnifiedStore, queue: &str, group: &str) -> RedDBResult<bool> {
1205 Ok(load_queue_groups(store, queue)?
1206 .into_iter()
1207 .any(|entry| entry.group == group))
1208}
1209
1210pub(super) fn require_queue_group(
1211 store: &UnifiedStore,
1212 queue: &str,
1213 group: &str,
1214) -> RedDBResult<()> {
1215 if queue_group_exists(store, queue, group)? {
1216 Ok(())
1217 } else {
1218 Err(RedDBError::NotFound(format!(
1219 "consumer group '{}' not found on queue '{}'",
1220 group, queue
1221 )))
1222 }
1223}
1224
1225pub(super) fn resolve_read_group(
1226 store: &UnifiedStore,
1227 queue: &str,
1228 group: Option<&str>,
1229 consumer: &str,
1230 config: &QueueRuntimeConfig,
1231) -> RedDBResult<String> {
1232 if let Some(group) = group {
1233 require_queue_group(store, queue, group)?;
1234 return Ok(group.to_string());
1235 }
1236
1237 match config.mode {
1238 QueueMode::Work => {
1239 if !queue_group_exists(store, queue, WORK_DEFAULT_GROUP)? {
1240 save_queue_group(store, queue, WORK_DEFAULT_GROUP)?;
1241 }
1242 Ok(WORK_DEFAULT_GROUP.to_string())
1243 }
1244 QueueMode::Fanout => {
1245 let fanout_group = format!("{FANOUT_GROUP_PREFIX}{consumer}");
1246 if !queue_group_exists(store, queue, &fanout_group)? {
1247 save_queue_group(store, queue, &fanout_group)?;
1248 }
1249 Ok(fanout_group)
1250 }
1251 }
1252}
1253
1254fn load_queue_groups(store: &UnifiedStore, queue: &str) -> RedDBResult<Vec<QueueGroupEntry>> {
1255 let Some(manager) = store.get_collection(QUEUE_META_COLLECTION) else {
1256 return Ok(Vec::new());
1257 };
1258 Ok(manager
1259 .query_all(|entity| {
1260 entity.data.as_row().is_some_and(|row| {
1261 row_text(row, "kind").as_deref() == Some("queue_group")
1262 && row_text(row, "queue").as_deref() == Some(queue)
1263 })
1264 })
1265 .into_iter()
1266 .filter_map(|entity| {
1267 let row = entity.data.as_row()?;
1268 Some(QueueGroupEntry {
1269 entity_id: entity.id,
1270 group: row_text(row, "group")?,
1271 })
1272 })
1273 .collect())
1274}
1275
1276fn save_queue_group(store: &UnifiedStore, queue: &str, group: &str) -> RedDBResult<()> {
1277 let mut fields = HashMap::new();
1278 fields.insert("kind".to_string(), Value::text("queue_group".to_string()));
1279 fields.insert("queue".to_string(), Value::text(queue.to_string()));
1280 fields.insert("group".to_string(), Value::text(group.to_string()));
1281 fields.insert(
1282 "created_at_ns".to_string(),
1283 Value::UnsignedInteger(now_ns()),
1284 );
1285 insert_meta_row(store, fields)
1286}
1287
1288pub(super) fn load_pending_entries(
1289 store: &UnifiedStore,
1290 queue: &str,
1291 group: Option<&str>,
1292 message_id: Option<EntityId>,
1293) -> RedDBResult<Vec<QueuePendingEntry>> {
1294 let Some(manager) = store.get_collection(QUEUE_META_COLLECTION) else {
1295 return Ok(Vec::new());
1296 };
1297 Ok(manager
1298 .query_all(|entity| {
1299 entity.data.as_row().is_some_and(|row| {
1300 row_text(row, "kind").as_deref() == Some("queue_pending")
1301 && row_text(row, "queue").as_deref() == Some(queue)
1302 && group
1303 .map(|group_name| row_text(row, "group").as_deref() == Some(group_name))
1304 .unwrap_or(true)
1305 && message_id
1306 .map(|candidate| row_u64(row, "message_id") == Some(candidate.raw()))
1307 .unwrap_or(true)
1308 })
1309 })
1310 .into_iter()
1311 .filter_map(|entity| {
1312 let row = entity.data.as_row()?;
1313 Some(QueuePendingEntry {
1314 entity_id: entity.id,
1315 group: row_text(row, "group")?,
1316 message_id: EntityId::new(row_u64(row, "message_id")?),
1317 consumer: row_text(row, "consumer")?,
1318 delivered_at_ns: row_u64(row, "delivered_at_ns")?,
1319 delivery_count: row_u64(row, "delivery_count")
1320 .map(|value| value as u32)
1321 .unwrap_or(1),
1322 })
1323 })
1324 .collect())
1325}
1326
1327pub(super) fn save_queue_pending(
1328 store: &UnifiedStore,
1329 queue: &str,
1330 group: &str,
1331 message_id: EntityId,
1332 consumer: &str,
1333 delivered_at_ns: u64,
1334 delivery_count: u32,
1335) -> RedDBResult<()> {
1336 remove_meta_rows(store, |row| {
1337 row_text(row, "kind").as_deref() == Some("queue_pending")
1338 && row_text(row, "queue").as_deref() == Some(queue)
1339 && row_text(row, "group").as_deref() == Some(group)
1340 && row_u64(row, "message_id") == Some(message_id.raw())
1341 });
1342
1343 let mut fields = HashMap::new();
1344 fields.insert("kind".to_string(), Value::text("queue_pending".to_string()));
1345 fields.insert("queue".to_string(), Value::text(queue.to_string()));
1346 fields.insert("group".to_string(), Value::text(group.to_string()));
1347 fields.insert(
1348 "message_id".to_string(),
1349 Value::UnsignedInteger(message_id.raw()),
1350 );
1351 fields.insert("consumer".to_string(), Value::text(consumer.to_string()));
1352 fields.insert(
1353 "delivered_at_ns".to_string(),
1354 Value::UnsignedInteger(delivered_at_ns),
1355 );
1356 fields.insert(
1357 "delivery_count".to_string(),
1358 Value::UnsignedInteger(u64::from(delivery_count)),
1359 );
1360 insert_meta_row(store, fields)
1361}
1362
1363pub(super) fn require_pending_entry(
1364 store: &UnifiedStore,
1365 queue: &str,
1366 group: &str,
1367 message_id: EntityId,
1368) -> RedDBResult<QueuePendingEntry> {
1369 load_pending_entries(store, queue, Some(group), Some(message_id))?
1370 .into_iter()
1371 .next()
1372 .ok_or_else(|| {
1373 RedDBError::NotFound(format!(
1374 "message '{}' is not pending in group '{}' on queue '{}'",
1375 message_id.raw(),
1376 group,
1377 queue
1378 ))
1379 })
1380}
1381
1382pub(super) fn load_ack_entries(
1383 store: &UnifiedStore,
1384 queue: &str,
1385 group: Option<&str>,
1386 message_id: Option<EntityId>,
1387) -> RedDBResult<Vec<QueueAckEntry>> {
1388 let Some(manager) = store.get_collection(QUEUE_META_COLLECTION) else {
1389 return Ok(Vec::new());
1390 };
1391 Ok(manager
1392 .query_all(|entity| {
1393 entity.data.as_row().is_some_and(|row| {
1394 row_text(row, "kind").as_deref() == Some("queue_ack")
1395 && row_text(row, "queue").as_deref() == Some(queue)
1396 && group
1397 .map(|group_name| row_text(row, "group").as_deref() == Some(group_name))
1398 .unwrap_or(true)
1399 && message_id
1400 .map(|candidate| row_u64(row, "message_id") == Some(candidate.raw()))
1401 .unwrap_or(true)
1402 })
1403 })
1404 .into_iter()
1405 .filter_map(|entity| {
1406 let row = entity.data.as_row()?;
1407 Some(QueueAckEntry {
1408 entity_id: entity.id,
1409 group: row_text(row, "group")?,
1410 message_id: EntityId::new(row_u64(row, "message_id")?),
1411 })
1412 })
1413 .collect())
1414}
1415
1416pub(super) fn save_queue_ack(
1417 store: &UnifiedStore,
1418 queue: &str,
1419 group: &str,
1420 message_id: EntityId,
1421) -> RedDBResult<()> {
1422 let existing = load_ack_entries(store, queue, Some(group), Some(message_id))?;
1423 if !existing.is_empty() {
1424 return Ok(());
1425 }
1426
1427 let mut fields = HashMap::new();
1428 fields.insert("kind".to_string(), Value::text("queue_ack".to_string()));
1429 fields.insert("queue".to_string(), Value::text(queue.to_string()));
1430 fields.insert("group".to_string(), Value::text(group.to_string()));
1431 fields.insert(
1432 "message_id".to_string(),
1433 Value::UnsignedInteger(message_id.raw()),
1434 );
1435 fields.insert("acked_at_ns".to_string(), Value::UnsignedInteger(now_ns()));
1436 insert_meta_row(store, fields)
1437}
1438
1439pub(super) fn queue_message_completed_for_all_groups(
1440 store: &UnifiedStore,
1441 queue: &str,
1442 message_id: EntityId,
1443) -> RedDBResult<bool> {
1444 let groups = load_queue_groups(store, queue)?;
1445 let pending = load_pending_entries(store, queue, None, Some(message_id))?;
1446 if !pending.is_empty() {
1447 return Ok(false);
1448 }
1449 if groups.is_empty() {
1450 return Ok(true);
1451 }
1452
1453 let acked_groups = load_ack_entries(store, queue, None, Some(message_id))?
1454 .into_iter()
1455 .map(|entry| entry.group)
1456 .collect::<HashSet<_>>();
1457 Ok(groups
1458 .into_iter()
1459 .all(|group| acked_groups.contains(&group.group)))
1460}
1461
1462fn load_queue_message_views(
1463 store: &UnifiedStore,
1464 queue: &str,
1465) -> RedDBResult<Vec<QueueMessageView>> {
1466 load_queue_message_views_with_runtime(None, store, queue)
1467}
1468
1469pub(super) fn load_queue_message_views_with_runtime(
1476 runtime: Option<&RedDBRuntime>,
1477 store: &UnifiedStore,
1478 queue: &str,
1479) -> RedDBResult<Vec<QueueMessageView>> {
1480 let manager = store
1481 .get_collection(queue)
1482 .ok_or_else(|| RedDBError::NotFound(format!("queue '{}' not found", queue)))?;
1483 let snap_ctx = crate::runtime::impl_core::capture_current_snapshot();
1487 let rls_filter = runtime.and_then(|rt| {
1488 crate::runtime::impl_core::rls_policy_filter_for_kind(
1489 rt,
1490 queue,
1491 crate::storage::query::ast::PolicyAction::Select,
1492 crate::storage::query::ast::PolicyTargetKind::Messages,
1493 )
1494 });
1495 let rls_enabled_but_denied = runtime.map(|rt| rt.is_rls_enabled(queue)).unwrap_or(false)
1496 && rls_filter.is_none()
1497 && runtime.is_some();
1498 if rls_enabled_but_denied {
1499 return Ok(Vec::new());
1501 }
1502 let filter_arc = rls_filter.map(std::sync::Arc::new);
1503 let rt_arc = runtime;
1504 Ok(manager
1505 .query_all(move |entity| {
1506 if !matches!(entity.kind, EntityKind::QueueMessage { .. }) {
1507 return false;
1508 }
1509 if !crate::runtime::impl_core::entity_visible_with_context(snap_ctx.as_ref(), entity) {
1510 return false;
1511 }
1512 if let (Some(filter), Some(rt)) = (filter_arc.as_ref(), rt_arc) {
1513 return crate::runtime::query_exec::evaluate_entity_filter_with_db(
1514 Some(&rt.inner.db),
1515 entity,
1516 filter,
1517 queue,
1518 queue,
1519 );
1520 }
1521 true
1522 })
1523 .into_iter()
1524 .filter_map(queue_message_view_from_entity)
1525 .collect())
1526}
1527
1528fn queue_message_view_from_entity(entity: UnifiedEntity) -> Option<QueueMessageView> {
1529 let (position, _) = match &entity.kind {
1530 EntityKind::QueueMessage { position, queue } => (*position, queue),
1531 _ => return None,
1532 };
1533 let data = match entity.data {
1534 EntityData::QueueMessage(data) => data,
1535 _ => return None,
1536 };
1537 Some(QueueMessageView {
1538 id: entity.id,
1539 position,
1540 priority: data.priority.unwrap_or(0),
1541 payload: data.payload,
1542 attempts: data.attempts,
1543 max_attempts: data.max_attempts,
1544 enqueued_at_ns: data.enqueued_at_ns,
1545 })
1546}
1547
1548fn insert_moved_queue_message(
1549 store: &UnifiedStore,
1550 queue: &str,
1551 config: &QueueRuntimeConfig,
1552 message: &QueueMessageView,
1553) -> RedDBResult<EntityId> {
1554 let position = next_queue_position(store, queue, QueueSide::Right)?;
1555 let entity = UnifiedEntity::new(
1556 EntityId::new(0),
1557 EntityKind::QueueMessage {
1558 queue: queue.to_string(),
1559 position,
1560 },
1561 EntityData::QueueMessage(QueueMessageData {
1562 payload: message.payload.clone(),
1563 priority: if config.priority {
1564 Some(message.priority)
1565 } else {
1566 None
1567 },
1568 enqueued_at_ns: message.enqueued_at_ns,
1569 attempts: message.attempts,
1570 max_attempts: message.max_attempts,
1571 acked: false,
1572 }),
1573 );
1574 let id = store
1575 .insert_auto(queue, entity)
1576 .map_err(|err| RedDBError::Internal(err.to_string()))?;
1577 if let Some(ttl_ms) = config.ttl_ms {
1578 store
1579 .set_metadata(queue, id, queue_message_ttl_metadata(ttl_ms))
1580 .map_err(|err| RedDBError::Internal(err.to_string()))?;
1581 }
1582 Ok(id)
1583}
1584
/// Returns the default column list for a queue projection, in output order.
fn queue_projection_default_columns() -> Vec<String> {
    const DEFAULT_COLUMNS: [&str; 9] = [
        "id",
        "payload",
        "priority",
        "attempts",
        "last_error",
        "enqueued_at",
        "available_at",
        "dlq",
        "tenant",
    ];

    let mut columns = Vec::with_capacity(DEFAULT_COLUMNS.len());
    for name in DEFAULT_COLUMNS {
        columns.push(name.to_string());
    }
    columns
}
1601
1602fn queue_projection_record(
1603 columns: &[String],
1604 message: &QueueMessageView,
1605 dlq: bool,
1606) -> RedDBResult<UnifiedRecord> {
1607 let mut record = UnifiedRecord::new();
1608 for column in columns {
1609 let value = queue_projection_value(message, dlq, column).ok_or_else(|| {
1610 RedDBError::Query(format!("unknown queue projection column '{}'", column))
1611 })?;
1612 record.set(column, value);
1613 }
1614 Ok(record)
1615}
1616
1617fn queue_projection_value(message: &QueueMessageView, dlq: bool, column: &str) -> Option<Value> {
1618 match column {
1619 "id" => Some(Value::text(message_id_string(message.id))),
1620 "payload" => Some(message.payload.clone()),
1621 "priority" => Some(Value::Integer(i64::from(message.priority))),
1622 "attempts" => Some(Value::UnsignedInteger(u64::from(message.attempts))),
1623 "last_error" => Some(Value::Null),
1624 "enqueued_at" => Some(Value::UnsignedInteger(message.enqueued_at_ns)),
1625 "available_at" => Some(Value::UnsignedInteger(message.enqueued_at_ns)),
1626 "dlq" => Some(Value::Boolean(dlq)),
1627 "tenant" => queue_message_tenant(&message.payload).or(Some(Value::Null)),
1628 _ => None,
1629 }
1630}
1631
1632fn queue_message_tenant(payload: &Value) -> Option<Value> {
1633 let Value::Json(bytes) = payload else {
1634 return None;
1635 };
1636 let json: crate::json::Value = crate::json::from_slice(bytes).ok()?;
1637 json.get("tenant")
1638 .and_then(crate::json::Value::as_str)
1639 .map(|tenant| Value::text(tenant.to_string()))
1640}
1641
1642fn queue_is_dead_letter_target(store: &UnifiedStore, queue: &str) -> bool {
1643 let Some(manager) = store.get_collection(QUEUE_META_COLLECTION) else {
1644 return false;
1645 };
1646 !manager
1647 .query_all(|entity| {
1648 entity.data.as_row().is_some_and(|row| {
1649 row_text(row, "kind").as_deref() == Some("queue_config")
1650 && row_text(row, "dlq").as_deref() == Some(queue)
1651 })
1652 })
1653 .is_empty()
1654}
1655
1656fn queue_message_matches_filter(message: &QueueMessageView, dlq: bool, filter: &Filter) -> bool {
1657 match filter {
1658 Filter::Compare { field, op, value } => queue_filter_field_value(message, dlq, field)
1659 .is_some_and(|candidate| queue_compare_values(&candidate, value, *op)),
1660 Filter::CompareFields { left, op, right } => {
1661 match (
1662 queue_filter_field_value(message, dlq, left),
1663 queue_filter_field_value(message, dlq, right),
1664 ) {
1665 (Some(left), Some(right)) => queue_compare_values(&left, &right, *op),
1666 _ => false,
1667 }
1668 }
1669 Filter::And(left, right) => {
1670 queue_message_matches_filter(message, dlq, left)
1671 && queue_message_matches_filter(message, dlq, right)
1672 }
1673 Filter::Or(left, right) => {
1674 queue_message_matches_filter(message, dlq, left)
1675 || queue_message_matches_filter(message, dlq, right)
1676 }
1677 Filter::Not(inner) => !queue_message_matches_filter(message, dlq, inner),
1678 Filter::IsNull(field) => queue_filter_field_value(message, dlq, field)
1679 .is_none_or(|value| matches!(value, Value::Null)),
1680 Filter::IsNotNull(field) => queue_filter_field_value(message, dlq, field)
1681 .is_some_and(|value| !matches!(value, Value::Null)),
1682 Filter::In { field, values } => {
1683 queue_filter_field_value(message, dlq, field).is_some_and(|candidate| {
1684 values
1685 .iter()
1686 .any(|value| queue_values_equal(&candidate, value))
1687 })
1688 }
1689 Filter::Between { field, low, high } => queue_filter_field_value(message, dlq, field)
1690 .is_some_and(|candidate| {
1691 queue_compare_values(&candidate, low, CompareOp::Ge)
1692 && queue_compare_values(&candidate, high, CompareOp::Le)
1693 }),
1694 Filter::Like { field, pattern } => queue_filter_text(message, dlq, field)
1695 .is_some_and(|value| queue_like_matches(&value, pattern)),
1696 Filter::StartsWith { field, prefix } => {
1697 queue_filter_text(message, dlq, field).is_some_and(|value| value.starts_with(prefix))
1698 }
1699 Filter::EndsWith { field, suffix } => {
1700 queue_filter_text(message, dlq, field).is_some_and(|value| value.ends_with(suffix))
1701 }
1702 Filter::Contains { field, substring } => {
1703 queue_filter_text(message, dlq, field).is_some_and(|value| value.contains(substring))
1704 }
1705 Filter::CompareExpr { .. } => false,
1706 }
1707}
1708
1709fn queue_filter_field_value(
1710 message: &QueueMessageView,
1711 dlq: bool,
1712 field: &FieldRef,
1713) -> Option<Value> {
1714 match field {
1715 FieldRef::TableColumn { table, column } if table.is_empty() => {
1716 queue_projection_value(message, dlq, column)
1717 .or_else(|| queue_payload_field_value(&message.payload, column))
1718 }
1719 FieldRef::TableColumn { column, .. } => queue_projection_value(message, dlq, column)
1720 .or_else(|| queue_payload_field_value(&message.payload, column)),
1721 _ => None,
1722 }
1723}
1724
1725fn queue_payload_field_value(payload: &Value, field: &str) -> Option<Value> {
1726 let Value::Json(bytes) = payload else {
1727 return None;
1728 };
1729 let json: crate::json::Value = crate::json::from_slice(bytes).ok()?;
1730 let value = json.get(field)?;
1731 json_value_to_schema_value(value)
1732}
1733
1734fn json_value_to_schema_value(value: &crate::json::Value) -> Option<Value> {
1735 if matches!(value, crate::json::Value::Null) {
1736 Some(Value::Null)
1737 } else if let Some(value) = value.as_bool() {
1738 Some(Value::Boolean(value))
1739 } else if let Some(value) = value.as_i64() {
1740 Some(Value::Integer(value))
1741 } else if let Some(value) = value.as_u64() {
1742 Some(Value::UnsignedInteger(value))
1743 } else if let Some(value) = value.as_f64() {
1744 Some(Value::Float(value))
1745 } else if let Some(value) = value.as_str() {
1746 Some(Value::text(value.to_string()))
1747 } else {
1748 Some(Value::Json(value.to_string_compact().into_bytes()))
1749 }
1750}
1751
1752fn queue_filter_text(message: &QueueMessageView, dlq: bool, field: &FieldRef) -> Option<String> {
1753 queue_filter_field_value(message, dlq, field).and_then(|value| match value {
1754 Value::Text(value) => Some(value.to_string()),
1755 Value::NodeRef(value) | Value::EdgeRef(value) | Value::TableRef(value) => Some(value),
1756 Value::Integer(value) => Some(value.to_string()),
1757 Value::UnsignedInteger(value) => Some(value.to_string()),
1758 Value::Float(value) => Some(value.to_string()),
1759 Value::Boolean(value) => Some(value.to_string()),
1760 _ => None,
1761 })
1762}
1763
1764fn queue_compare_values(left: &Value, right: &Value, op: CompareOp) -> bool {
1765 match op {
1766 CompareOp::Eq => queue_values_equal(left, right),
1767 CompareOp::Ne => !queue_values_equal(left, right),
1768 CompareOp::Lt => queue_partial_cmp(left, right).is_some_and(|ord| ord.is_lt()),
1769 CompareOp::Le => queue_partial_cmp(left, right).is_some_and(|ord| !ord.is_gt()),
1770 CompareOp::Gt => queue_partial_cmp(left, right).is_some_and(|ord| ord.is_gt()),
1771 CompareOp::Ge => queue_partial_cmp(left, right).is_some_and(|ord| !ord.is_lt()),
1772 }
1773}
1774
1775fn queue_values_equal(left: &Value, right: &Value) -> bool {
1776 if let (Some(left), Some(right)) = (queue_value_number(left), queue_value_number(right)) {
1777 return (left - right).abs() < f64::EPSILON;
1778 }
1779 match (left, right) {
1780 (Value::Text(left), Value::Text(right)) => left == right,
1781 (Value::Boolean(left), Value::Boolean(right)) => left == right,
1782 _ => left == right,
1783 }
1784}
1785
1786fn queue_partial_cmp(left: &Value, right: &Value) -> Option<std::cmp::Ordering> {
1787 if let (Some(left), Some(right)) = (queue_value_number(left), queue_value_number(right)) {
1788 return left.partial_cmp(&right);
1789 }
1790 match (left, right) {
1791 (Value::Text(left), Value::Text(right)) => Some(left.cmp(right)),
1792 _ => None,
1793 }
1794}
1795
1796fn queue_value_number(value: &Value) -> Option<f64> {
1797 match value {
1798 Value::Integer(value) => Some(*value as f64),
1799 Value::UnsignedInteger(value) => Some(*value as f64),
1800 Value::Float(value) => Some(*value),
1801 Value::Text(value) => value.parse().ok(),
1802 _ => None,
1803 }
1804}
1805
/// Matches `value` against a LIKE-style `pattern` in which `%` stands for any
/// (possibly empty) run of characters.
///
/// `%` is honoured anywhere in the pattern, not only at its ends: previously a
/// pattern such as `order%done` was compared literally and could only match the
/// string `"order%done"` itself. Patterns whose wildcards sit only at the start
/// and/or end behave exactly as before (contains / starts-with / ends-with /
/// exact match). Every other character, including `_`, is matched literally and
/// case-sensitively.
fn queue_like_matches(value: &str, pattern: &str) -> bool {
    // Splitting on '%' leaves the literal segments that must appear in order.
    let mut segments = pattern.split('%');

    // The first segment is anchored at the start of the value (it is empty
    // when the pattern begins with '%'). `split` always yields one item.
    let leading = segments.next().unwrap_or("");
    let Some(mut remainder) = value.strip_prefix(leading) else {
        return false;
    };

    // No further segment means the pattern held no '%': exact match only.
    let Some(trailing) = segments.next_back() else {
        return remainder.is_empty();
    };

    // Consume each interior segment at its leftmost occurrence; this leaves
    // the longest possible remainder for the segments that follow.
    for segment in segments {
        match remainder.find(segment) {
            Some(index) => remainder = &remainder[index + segment.len()..],
            None => return false,
        }
    }

    // The last segment is anchored at the end (empty when the pattern ends
    // with '%', in which case anything left over is accepted).
    remainder.ends_with(trailing)
}
1820
1821pub(super) fn queue_message_view_by_id(
1822 store: &UnifiedStore,
1823 queue: &str,
1824 message_id: EntityId,
1825) -> RedDBResult<Option<QueueMessageView>> {
1826 let manager = queue_manager(store, queue)?;
1827 Ok(manager
1828 .get(message_id)
1829 .and_then(queue_message_view_from_entity))
1830}
1831
1832pub(super) fn sort_queue_messages(
1833 messages: &mut [QueueMessageView],
1834 config: &QueueRuntimeConfig,
1835 side: QueueSide,
1836) {
1837 messages.sort_by(|left, right| {
1838 if config.priority {
1839 right
1840 .priority
1841 .cmp(&left.priority)
1842 .then_with(|| match side {
1843 QueueSide::Left => left.position.cmp(&right.position),
1844 QueueSide::Right => right.position.cmp(&left.position),
1845 })
1846 .then_with(|| left.id.raw().cmp(&right.id.raw()))
1847 } else {
1848 match side {
1849 QueueSide::Left => left.position.cmp(&right.position),
1850 QueueSide::Right => right.position.cmp(&left.position),
1851 }
1852 .then_with(|| left.id.raw().cmp(&right.id.raw()))
1853 }
1854 });
1855}
1856
1857pub(super) fn next_queue_position(
1858 store: &UnifiedStore,
1859 queue: &str,
1860 side: QueueSide,
1861) -> RedDBResult<u64> {
1862 let messages = load_queue_message_views(store, queue)?;
1863 if messages.is_empty() {
1864 return Ok(QUEUE_POSITION_CENTER);
1865 }
1866 match side {
1867 QueueSide::Left => Ok(messages
1868 .iter()
1869 .map(|message| message.position)
1870 .min()
1871 .unwrap_or(QUEUE_POSITION_CENTER)
1872 .saturating_sub(1)),
1873 QueueSide::Right => Ok(messages
1874 .iter()
1875 .map(|message| message.position)
1876 .max()
1877 .unwrap_or(QUEUE_POSITION_CENTER)
1878 .saturating_add(1)),
1879 }
1880}
1881
1882pub(super) fn increment_queue_attempts(
1883 store: &UnifiedStore,
1884 queue: &str,
1885 message_id: EntityId,
1886) -> RedDBResult<u32> {
1887 let manager = queue_manager(store, queue)?;
1888 let mut entity = manager
1889 .get(message_id)
1890 .ok_or_else(|| RedDBError::NotFound(format!("message '{}' not found", message_id.raw())))?;
1891 match &mut entity.data {
1892 EntityData::QueueMessage(message) => {
1893 message.attempts = message.attempts.saturating_add(1);
1894 let attempts = message.attempts;
1895 manager
1896 .update(entity)
1897 .map_err(|err| RedDBError::Internal(err.to_string()))?;
1898 Ok(attempts)
1899 }
1900 _ => Err(RedDBError::Query(format!(
1901 "entity '{}' is not a queue message",
1902 message_id.raw()
1903 ))),
1904 }
1905}
1906
1907pub(super) fn queue_message_attempts(
1908 store: &UnifiedStore,
1909 queue: &str,
1910 message_id: EntityId,
1911) -> RedDBResult<u32> {
1912 Ok(queue_message_data(store, queue, message_id)?.attempts)
1913}
1914
1915pub(super) fn queue_message_max_attempts(
1916 store: &UnifiedStore,
1917 queue: &str,
1918 message_id: EntityId,
1919) -> RedDBResult<u32> {
1920 Ok(queue_message_data(store, queue, message_id)?.max_attempts)
1921}
1922
1923pub(super) fn queue_message_payload(
1924 store: &UnifiedStore,
1925 queue: &str,
1926 message_id: EntityId,
1927) -> RedDBResult<Value> {
1928 Ok(queue_message_data(store, queue, message_id)?.payload)
1929}
1930
1931pub(super) fn queue_message_pending_any(
1932 store: &UnifiedStore,
1933 queue: &str,
1934 message_id: EntityId,
1935) -> RedDBResult<bool> {
1936 Ok(!load_pending_entries(store, queue, None, Some(message_id))?.is_empty())
1937}
1938
1939pub(super) fn queue_message_pending_for_group(
1940 store: &UnifiedStore,
1941 queue: &str,
1942 group: &str,
1943 message_id: EntityId,
1944) -> RedDBResult<bool> {
1945 Ok(!load_pending_entries(store, queue, Some(group), Some(message_id))?.is_empty())
1946}
1947
1948pub(super) fn queue_message_acked_for_group(
1949 store: &UnifiedStore,
1950 queue: &str,
1951 group: &str,
1952 message_id: EntityId,
1953) -> RedDBResult<bool> {
1954 Ok(!load_ack_entries(store, queue, Some(group), Some(message_id))?.is_empty())
1955}
1956
1957fn queue_manager(
1958 store: &UnifiedStore,
1959 queue: &str,
1960) -> RedDBResult<Arc<crate::storage::unified::SegmentManager>> {
1961 store
1962 .get_collection(queue)
1963 .ok_or_else(|| RedDBError::NotFound(format!("queue '{}' not found", queue)))
1964}
1965
1966pub(super) fn queue_message_data(
1967 store: &UnifiedStore,
1968 queue: &str,
1969 message_id: EntityId,
1970) -> RedDBResult<QueueMessageData> {
1971 let manager = queue_manager(store, queue)?;
1972 let entity = manager
1973 .get(message_id)
1974 .ok_or_else(|| RedDBError::NotFound(format!("message '{}' not found", message_id.raw())))?;
1975 match entity.data {
1976 EntityData::QueueMessage(message) => Ok(message),
1977 _ => Err(RedDBError::Query(format!(
1978 "entity '{}' is not a queue message",
1979 message_id.raw()
1980 ))),
1981 }
1982}
1983
1984fn insert_meta_row(store: &UnifiedStore, fields: HashMap<String, Value>) -> RedDBResult<()> {
1985 let _ = store.get_or_create_collection(QUEUE_META_COLLECTION);
1986 store
1987 .insert_auto(
1988 QUEUE_META_COLLECTION,
1989 UnifiedEntity::new(
1990 EntityId::new(0),
1991 EntityKind::TableRow {
1992 table: Arc::from(QUEUE_META_COLLECTION),
1993 row_id: 0,
1994 },
1995 EntityData::Row(RowData {
1996 columns: Vec::new(),
1997 named: Some(fields),
1998 schema: None,
1999 }),
2000 ),
2001 )
2002 .map_err(|err| RedDBError::Internal(err.to_string()))?;
2003 Ok(())
2004}
2005
2006pub(super) fn remove_meta_rows(store: &UnifiedStore, predicate: impl Fn(&RowData) -> bool + Sync) {
2007 let Some(manager) = store.get_collection(QUEUE_META_COLLECTION) else {
2008 return;
2009 };
2010 let rows = manager.query_all(|entity| entity.data.as_row().is_some_and(&predicate));
2011 for row in rows {
2012 let _ = store.delete(QUEUE_META_COLLECTION, row.id);
2013 }
2014}
2015
2016pub(super) fn delete_meta_entity(store: &UnifiedStore, entity_id: EntityId) {
2017 let _ = store.delete(QUEUE_META_COLLECTION, entity_id);
2018}
2019
2020fn queue_message_lock_key(queue: &str, message_id: EntityId) -> String {
2021 format!("{queue}:{}", message_id.raw())
2022}
2023
2024pub(super) fn queue_message_lock_handle(
2025 runtime: &RedDBRuntime,
2026 queue: &str,
2027 message_id: EntityId,
2028) -> Arc<parking_lot::Mutex<()>> {
2029 let key = queue_message_lock_key(queue, message_id);
2030 if let Some(lock) = runtime.inner.queue_message_locks.read().get(&key).cloned() {
2031 return lock;
2032 }
2033
2034 let mut locks = runtime.inner.queue_message_locks.write();
2035 locks
2036 .entry(key)
2037 .or_insert_with(|| Arc::new(parking_lot::Mutex::new(())))
2038 .clone()
2039}
2040
2041pub(super) fn forget_queue_message_lock(runtime: &RedDBRuntime, queue: &str, message_id: EntityId) {
2042 runtime
2043 .inner
2044 .queue_message_locks
2045 .write()
2046 .remove(&queue_message_lock_key(queue, message_id));
2047}
2048
2049fn parse_message_id(value: &str) -> RedDBResult<EntityId> {
2050 let raw = value.strip_prefix('e').unwrap_or(value);
2051 raw.parse::<u64>()
2052 .map(EntityId::new)
2053 .map_err(|_| RedDBError::Query(format!("invalid message id '{}'", value)))
2054}
2055
2056fn message_id_string(message_id: EntityId) -> String {
2057 message_id.raw().to_string()
2058}
2059
2060pub(crate) fn pending_counts_by_group(
2065 store: &UnifiedStore,
2066) -> std::collections::BTreeMap<(String, String), u64> {
2067 let mut counts: std::collections::BTreeMap<(String, String), u64> =
2068 std::collections::BTreeMap::new();
2069 let Some(manager) = store.get_collection(QUEUE_META_COLLECTION) else {
2070 return counts;
2071 };
2072 for entity in manager.query_all(|entity| {
2073 entity
2074 .data
2075 .as_row()
2076 .is_some_and(|row| row_text(row, "kind").as_deref() == Some("queue_pending"))
2077 }) {
2078 if let Some(row) = entity.data.as_row() {
2079 let queue = row_text(row, "queue");
2080 let group = row_text(row, "group");
2081 if let (Some(q), Some(g)) = (queue, group) {
2082 *counts.entry((q, g)).or_insert(0) += 1;
2083 }
2084 }
2085 }
2086 counts
2087}
2088
2089pub(super) fn row_text(row: &RowData, field: &str) -> Option<String> {
2090 match row.get_field(field)?.clone() {
2091 Value::Text(value) => Some(value.to_string()),
2092 Value::NodeRef(value) => Some(value),
2093 Value::EdgeRef(value) => Some(value),
2094 Value::TableRef(value) => Some(value),
2095 _ => None,
2096 }
2097}
2098
2099pub(super) fn row_u64(row: &RowData, field: &str) -> Option<u64> {
2100 match row.get_field(field)?.clone() {
2101 Value::UnsignedInteger(value) => Some(value),
2102 Value::Integer(value) if value >= 0 => Some(value as u64),
2103 Value::Float(value) if value >= 0.0 => Some(value as u64),
2104 Value::Text(value) => value.parse().ok(),
2105 _ => None,
2106 }
2107}
2108
2109fn row_bool(row: &RowData, field: &str) -> Option<bool> {
2110 match row.get_field(field)?.clone() {
2111 Value::Boolean(value) => Some(value),
2112 Value::Text(value) => match value.to_ascii_lowercase().as_str() {
2113 "true" => Some(true),
2114 "false" => Some(false),
2115 _ => None,
2116 },
2117 _ => None,
2118 }
2119}
2120
2121fn queue_collection_contract(
2122 name: &str,
2123 priority: bool,
2124 ttl_ms: Option<u64>,
2125) -> crate::physical::CollectionContract {
2126 let now = current_unix_ms();
2127 let mut context_index_fields = Vec::new();
2128 if priority {
2129 context_index_fields.push("priority".to_string());
2130 }
2131
2132 crate::physical::CollectionContract {
2133 name: name.to_string(),
2134 declared_model: crate::catalog::CollectionModel::Queue,
2135 schema_mode: crate::catalog::SchemaMode::Dynamic,
2136 origin: crate::physical::ContractOrigin::Explicit,
2137 version: 1,
2138 created_at_unix_ms: now,
2139 updated_at_unix_ms: now,
2140 default_ttl_ms: ttl_ms,
2141 vector_dimension: None,
2142 vector_metric: None,
2143 context_index_fields,
2144 declared_columns: Vec::new(),
2145 table_def: None,
2146 timestamps_enabled: false,
2147 context_index_enabled: false,
2148 metrics_raw_retention_ms: None,
2149 metrics_rollup_policies: Vec::new(),
2150 metrics_tenant_identity: None,
2151 metrics_namespace: None,
2152 append_only: true,
2156 subscriptions: Vec::new(),
2157 session_key: None,
2158 session_gap_ms: None,
2159 retention_duration_ms: None,
2160 }
2161}
2162
/// Returns the wall-clock time as milliseconds since the Unix epoch.
///
/// A system clock set before the epoch yields 0 rather than an error.
fn current_unix_ms() -> u128 {
    let since_epoch = std::time::SystemTime::now()
        .duration_since(std::time::UNIX_EPOCH)
        .unwrap_or_default();
    since_epoch.as_millis()
}
2169
2170pub(super) fn now_ns() -> u64 {
2171 std::time::SystemTime::now()
2172 .duration_since(std::time::UNIX_EPOCH)
2173 .unwrap_or_default()
2174 .as_nanos() as u64
2175}
2176
2177pub(super) fn queue_message_ttl_metadata(ttl_ms: u64) -> Metadata {
2178 Metadata::with_fields(
2179 [(
2180 "_ttl_ms".to_string(),
2181 if ttl_ms <= i64::MAX as u64 {
2182 MetadataValue::Int(ttl_ms as i64)
2183 } else {
2184 MetadataValue::Timestamp(ttl_ms)
2185 },
2186 )]
2187 .into_iter()
2188 .collect(),
2189 )
2190}
2191
2192fn estimate_payload_bytes(payload: &Value) -> u64 {
2194 match payload {
2195 Value::Json(v) => v.len() as u64,
2196 Value::Text(s) => s.len() as u64,
2197 _ => 64,
2198 }
2199}