pub struct InfraQueueQueue { /* private fields */ }
A minimal queue facade backed by Redis, using one Redis list per topic.
Implementations
impl InfraQueueQueue
pub async fn get_topic_length(&self, topic: &str) -> Result<u64, Error>
Get the length of a specific topic list in Redis.
pub async fn get_inflight_length(&self, topic: &str) -> Result<u64, Error>
Get the number of in-flight messages for a specific topic.
pub async fn heartbeat(&self, topic: &str, consumer_name: &str, ttl_secs: u64) -> Result<(), Error>
Register a heartbeat for consumer_name on topic, valid for ttl_secs seconds.
pub async fn list_active_consumers(&self) -> Result<HashMap<String, Vec<String>>, Error>
List all active consumers for all topics.
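To illustrate the heartbeat and active-consumer semantics, here is a minimal in-memory sketch. It assumes a consumer counts as active while its latest heartbeat is younger than ttl_secs, and that list_active_consumers groups consumer names by topic. HeartbeatRegistry and the explicit now_secs clock are hypothetical stand-ins, not part of this crate, which presumably keeps this state in Redis with key expiry.

```rust
use std::collections::HashMap;

/// Hypothetical in-memory model of TTL heartbeats (not this crate's type).
#[derive(Default)]
pub struct HeartbeatRegistry {
    /// (topic, consumer_name) -> expiry time, in seconds on the caller's clock.
    expiries: HashMap<(String, String), u64>,
}

impl HeartbeatRegistry {
    /// Mark `consumer_name` as alive on `topic` until `now_secs + ttl_secs`.
    pub fn heartbeat(&mut self, topic: &str, consumer_name: &str, ttl_secs: u64, now_secs: u64) {
        let key = (topic.to_string(), consumer_name.to_string());
        self.expiries.insert(key, now_secs + ttl_secs);
    }

    /// Consumers whose heartbeat has not expired yet, grouped by topic (assumed shape).
    pub fn list_active_consumers(&self, now_secs: u64) -> HashMap<String, Vec<String>> {
        let mut active: HashMap<String, Vec<String>> = HashMap::new();
        for ((topic, consumer), &expires_at) in &self.expiries {
            if expires_at > now_secs {
                active.entry(topic.clone()).or_default().push(consumer.clone());
            }
        }
        // HashMap iteration order is unspecified, so sort for stable output.
        for consumers in active.values_mut() {
            consumers.sort();
        }
        active
    }
}
```

A consumer that stops sending heartbeats simply ages out; no explicit deregistration step is needed in this model.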
pub async fn list_topics(&self) -> Result<Vec<String>, Error>
List all topics currently active in Redis (keys starting with the queue prefix). WARNING: this uses KEYS, which is O(N) in the total number of keys in the database and blocks the Redis server while it runs; avoid calling it on hot paths in production.
pub fn from_env() -> Result<Self, Error>
Create a new queue using environment variables.
Precedence:
- REDIS_URL if set.
- Otherwise REDIS_HOST/REDIS_PORT/REDIS_USER/REDIS_PASSWORD parts.
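The precedence rule can be sketched as a small resolver. This is an illustration only: resolve_redis_url is a hypothetical helper, the environment lookup is passed in as a closure so the logic can be exercised without touching the process environment, and the fallbacks (127.0.0.1 and the standard Redis port 6379) as well as the verbatim, non-percent-encoded credentials are assumptions rather than documented behavior of from_env.

```rust
/// Hypothetical helper mirroring the documented precedence of `from_env`.
/// `lookup` returns the value of an environment variable, if set.
pub fn resolve_redis_url<F>(lookup: F) -> String
where
    F: Fn(&str) -> Option<String>,
{
    // 1. REDIS_URL wins outright when present.
    if let Some(url) = lookup("REDIS_URL") {
        return url;
    }
    // 2. Otherwise assemble a URL from the individual parts.
    //    Assumed fallbacks: localhost and Redis's standard port 6379.
    let host = lookup("REDIS_HOST").unwrap_or_else(|| "127.0.0.1".to_string());
    let port = lookup("REDIS_PORT").unwrap_or_else(|| "6379".to_string());
    // Assumption: credentials are used verbatim (no percent-encoding).
    let auth = match (lookup("REDIS_USER"), lookup("REDIS_PASSWORD")) {
        (Some(user), Some(pass)) => format!("{}:{}@", user, pass),
        (None, Some(pass)) => format!(":{}@", pass),
        (Some(user), None) => format!("{}@", user),
        (None, None) => String::new(),
    };
    format!("redis://{}{}:{}", auth, host, port)
}
```

In application code the lookup would typically be |k| std::env::var(k).ok().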
pub async fn enqueue(&self, msg: InfraQueueMessage) -> Result<(), Error>
Enqueue a message into Redis under its topic (taken from the message).
pub async fn dequeue(&self, topic: &str) -> Result<Option<InfraQueueMessage>, Error>
Basic dequeue without a visibility timeout or receipt. Returns Ok(None) if the topic is empty.
pub async fn reclaim_inflight(&self, topic: &str) -> Result<u64, Error>
Reclaim expired in-flight messages by moving them back to the ready queue.
LLM guidance:
- If a worker times out or crashes during generation, its visibility window may lapse. Those messages become reclaimable and are re-queued for redelivery.
- The retry_count is incremented to preserve a coarse history of delivery attempts and to inform backoff policy decisions.
- Your LLM worker should be idempotent. Use message.id or business keys to avoid double-processing outputs when duplicates occur (at-least-once semantics).
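A minimal in-memory model of the reclaim step, assuming each in-flight entry carries a visibility deadline in milliseconds and that a deadline at or before "now" counts as expired. Msg, TopicState, and the now_ms parameter are hypothetical; the real method operates on Redis structures whose layout is not documented here.

```rust
use std::collections::{HashMap, VecDeque};

/// Hypothetical message shape; only `id` and `retry_count` matter here.
#[derive(Debug, PartialEq)]
pub struct Msg {
    pub id: String,
    pub retry_count: u32,
}

#[derive(Default)]
pub struct TopicState {
    pub ready: VecDeque<Msg>,
    /// receipt (message id) -> (message, visibility deadline in ms)
    pub inflight: HashMap<String, (Msg, u64)>,
}

impl TopicState {
    /// Move every in-flight message whose deadline has passed back to the
    /// ready queue, bumping `retry_count`. Returns how many were reclaimed.
    pub fn reclaim_inflight(&mut self, now_ms: u64) -> u64 {
        // Collect expired receipts first so we don't mutate while iterating.
        let expired: Vec<String> = self
            .inflight
            .iter()
            .filter(|(_, (_, deadline))| *deadline <= now_ms)
            .map(|(receipt, _)| receipt.clone())
            .collect();
        for receipt in &expired {
            if let Some((mut msg, _)) = self.inflight.remove(receipt) {
                msg.retry_count += 1; // coarse delivery-attempt history
                self.ready.push_back(msg);
            }
        }
        expired.len() as u64
    }
}
```

Running reclaim periodically (for example on a timer in each worker) is one way to bound how long a crashed worker's message stays invisible.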
pub async fn dequeue_with_visibility(
    &self,
    topic: &str,
    visibility_timeout_ms: u64,
) -> Result<Option<DequeueWithReceipt>, Error>
Dequeue with a visibility timeout and return a receipt token (message id).
LLM guidance:
- Set visibility_timeout_ms to cover the expected model latency (e.g., 30–120 s, i.e. 30000–120000 ms, for long generations). While the message is invisible (visible=false), other workers won't receive it.
- If the operation completes within the window, ACK to remove the in-flight entry and message body; if it fails transiently (429/timeout), NACK to reschedule with backoff.
- If the window expires first, the message is reclaimable and may be delivered again (at-least-once delivery). Ensure your worker is idempotent.
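The visibility-timeout contract can be sketched in memory as follows. VisibilityQueue, Delivery, and the explicit now_ms clock are hypothetical, and reading the bool from ack as "the receipt was still in flight" is an assumption. Following the docs, the receipt is the message id.

```rust
use std::collections::{HashMap, VecDeque};

/// Hypothetical delivery; per the docs the receipt token is the message id.
#[derive(Debug, PartialEq)]
pub struct Delivery {
    pub receipt: String,
    pub body: String,
}

#[derive(Default)]
pub struct VisibilityQueue {
    ready: VecDeque<(String, String)>,        // (message id, body)
    inflight: HashMap<String, (String, u64)>, // id -> (body, deadline in ms)
}

impl VisibilityQueue {
    pub fn enqueue(&mut self, id: &str, body: &str) {
        self.ready.push_back((id.to_string(), body.to_string()));
    }

    /// Pop the next ready message and hide it until `now_ms + visibility_timeout_ms`.
    pub fn dequeue_with_visibility(&mut self, now_ms: u64, visibility_timeout_ms: u64) -> Option<Delivery> {
        let (id, body) = self.ready.pop_front()?;
        self.inflight.insert(id.clone(), (body.clone(), now_ms + visibility_timeout_ms));
        Some(Delivery { receipt: id, body })
    }

    /// Drop the in-flight entry. Returns false for an unknown receipt,
    /// so a duplicate ACK is harmless.
    pub fn ack(&mut self, receipt: &str) -> bool {
        self.inflight.remove(receipt).is_some()
    }
}
```

In this model an invisible message is not handed to a second worker; it only becomes deliverable again once a reclaim pass moves it back to the ready queue.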
pub async fn ack(&self, topic: &str, receipt: &str) -> Result<bool, Error>
Acknowledge successful processing.
LLM guidance:
- Only ACK after you have durably persisted the model output (DB/S3) and emitted any downstream events to avoid losing work if the worker crashes right after ACK.
- Keep idempotency in mind: a duplicate ACK on the same receipt is harmless, but your side-effects (writes) should also be idempotent when possible.
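One way to honor both points, sketched under assumptions: OutputStore is a hypothetical stand-in for a durable store (DB/S3) keyed by message id, and the generate and ack closures are placeholders for the model call and the queue's ACK. The output is written at most once per message id, and the ACK happens only after the write.

```rust
use std::collections::HashMap;

/// Hypothetical stand-in for a durable output store keyed by message id.
#[derive(Default)]
pub struct OutputStore {
    outputs: HashMap<String, String>,
}

impl OutputStore {
    pub fn get(&self, message_id: &str) -> Option<&str> {
        self.outputs.get(message_id).map(|s| s.as_str())
    }

    /// Idempotent write: the first output stored for a message id wins.
    pub fn put_if_absent(&mut self, message_id: &str, output: String) {
        self.outputs.entry(message_id.to_string()).or_insert(output);
    }
}

/// Persist-then-ACK handling for one delivery. Returns true if the model
/// was actually called (false for a duplicate delivery).
pub fn process_delivery<G, A>(store: &mut OutputStore, message_id: &str, generate: G, ack: A) -> bool
where
    G: FnOnce() -> String,
    A: FnOnce(&str),
{
    let needs_generation = store.get(message_id).is_none();
    if needs_generation {
        // Only pay for a model call if no earlier delivery stored a result.
        store.put_if_absent(message_id, generate());
    }
    // ACK last: a crash before this line leads to redelivery, and the
    // check above prevents a second output.
    ack(message_id);
    needs_generation
}
```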
pub async fn nack(
    &self,
    topic: &str,
    receipt: &str,
    policy: &RetryPolicy,
) -> Result<NackOutcome, Error>
Negative acknowledge: reschedule with backoff or dead-letter if retries exceeded.
LLM guidance:
- Call NACK for transient provider errors (HTTP 429, timeouts) to delay and retry without blocking a worker. The delay is computed via RetryPolicy (exponential backoff capped by max_delay_ms).
- After max_retries, the message is moved to infraqueue:{topic}:dlq for manual triage.
- Choose conservative backoff to avoid cascading failures and control spend when providers are degraded. Pair with observability on DLQ size and retry counters.
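The retry decision can be sketched as a pure function. The RetryPolicy and NackOutcome below are hypothetical mirrors, not the crate's types: max_retries and max_delay_ms come from the docs, base_delay_ms is an assumed field, and dead-lettering once retry_count reaches max_retries is an assumed reading of "after max_retries". The delay doubles per retry and is capped at max_delay_ms; the DLQ key follows the infraqueue:{topic}:dlq pattern described above.

```rust
/// Hypothetical mirror of RetryPolicy; `base_delay_ms` is an assumed field.
pub struct RetryPolicy {
    pub base_delay_ms: u64,
    pub max_delay_ms: u64,
    pub max_retries: u32,
}

/// Hypothetical outcome shape; the real NackOutcome may differ.
#[derive(Debug, PartialEq)]
pub enum NackOutcome {
    Retry { delay_ms: u64 },
    DeadLettered { dlq_key: String },
}

/// Decide what a NACK does for a message already retried `retry_count` times.
pub fn nack_decision(topic: &str, retry_count: u32, policy: &RetryPolicy) -> NackOutcome {
    if retry_count >= policy.max_retries {
        return NackOutcome::DeadLettered {
            dlq_key: format!("infraqueue:{}:dlq", topic),
        };
    }
    // base * 2^retry_count, saturating so huge counts cannot overflow, then capped.
    let factor = 1u64.checked_shl(retry_count).unwrap_or(u64::MAX);
    let delay_ms = policy.base_delay_ms.saturating_mul(factor).min(policy.max_delay_ms);
    NackOutcome::Retry { delay_ms }
}
```

With a 1 s base and a 30 s cap, successive NACKs wait 1 s, 2 s, 4 s, 8 s, 16 s, then 30 s thereafter until the retry budget is exhausted.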