Skip to main content

rrq_producer/
lib.rs

//! RRQ Producer - A production-ready Redis job queue producer.
//!
//! Features:
//! - Auto-reconnecting connection via ConnectionManager
//! - Atomic job enqueue operations
//! - Job result polling with timeout
//! - Trace context propagation support
//! - Trait-based design for easy mocking in tests
//!
//! # TLS Support
//!
//! This library uses rustls with embedded Mozilla CA roots for TLS connections.
//! Before creating a Producer with a TLS Redis URL, you must initialize the
//! crypto provider by calling [`init_crypto_provider`] once at application startup.
//!
//! ```no_run
//! rrq_producer::init_crypto_provider();
//! ```

20use anyhow::{Context, Result};
21use opentelemetry::global;
22use opentelemetry::propagation::Injector;
23use std::sync::Once;
24use tracing_opentelemetry::OpenTelemetrySpanExt;
25
26static CRYPTO_PROVIDER_INIT: Once = Once::new();
27
28/// Initialize the rustls crypto provider (ring) for TLS connections.
29///
30/// This must be called once before creating a Producer with a TLS Redis URL.
31/// It is safe to call multiple times; subsequent calls are no-ops.
32///
33/// # Example
34///
35/// ```no_run
36/// rrq_producer::init_crypto_provider();
37/// // Now you can create producers with TLS Redis URLs
38/// ```
39pub fn init_crypto_provider() {
40    CRYPTO_PROVIDER_INIT.call_once(|| {
41        let _ = rustls::crypto::ring::default_provider().install_default();
42    });
43}
44use async_trait::async_trait;
45use chrono::{DateTime, Utc};
46use redis::AsyncCommands;
47use redis::Script;
48use redis::aio::ConnectionManager;
49use rrq_config::defaults::{
50    DEFAULT_JOB_TIMEOUT_SECONDS, DEFAULT_MAX_RETRIES, DEFAULT_QUEUE_NAME,
51    DEFAULT_RESULT_TTL_SECONDS, DEFAULT_UNIQUE_JOB_LOCK_TTL_SECONDS,
52};
53use serde_json::{Map, Value};
54use std::collections::HashMap;
55use std::time::Duration;
56use uuid::Uuid;
57
// NOTE(review): ffi module contents are outside this file; allow(dead_code)
// suppresses warnings for bindings not exercised by Rust callers.
#[allow(dead_code)]
mod ffi;

// Key prefixes namespacing all RRQ data in Redis.
const JOB_KEY_PREFIX: &str = "rrq:job:";
// Per-job event stream polled by wait_for_completion.
const JOB_EVENTS_KEY_PREFIX: &str = "rrq:events:job:";
// Sorted-set queue; members are job ids scored by scheduled run time (ms).
const QUEUE_KEY_PREFIX: &str = "rrq:queue:";
// Idempotency lock mapping a user key to the winning job id.
const IDEMPOTENCY_KEY_PREFIX: &str = "rrq:idempotency:";
// Leading-edge rate-limit lock (SET NX EX in the rate-limit script).
const RATE_LIMIT_KEY_PREFIX: &str = "rrq:rate_limit:";
// Trailing-edge debounce key used by enqueue_with_debounce.
const DEBOUNCE_KEY_PREFIX: &str = "rrq:debounce:";
67
/// Job status as stored in Redis.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum JobStatus {
    Pending,
    Active,
    Completed,
    Failed,
    Retrying,
    Unknown,
}

impl JobStatus {
    /// Map the raw status string held in the Redis job hash onto a variant.
    /// Any unrecognized string is reported as `Unknown` rather than an error.
    fn from_str(s: &str) -> Self {
        match s {
            "COMPLETED" => JobStatus::Completed,
            "FAILED" => JobStatus::Failed,
            "ACTIVE" => JobStatus::Active,
            "RETRYING" => JobStatus::Retrying,
            "PENDING" => JobStatus::Pending,
            _ => JobStatus::Unknown,
        }
    }
}
91
/// Result of a completed or failed job.
#[derive(Debug, Clone)]
pub struct JobResult {
    /// Status read from the job hash at the time of the query.
    pub status: JobStatus,
    /// Parsed `result` field from the job hash, when present and parseable.
    pub result: Option<Value>,
    /// The `last_error` field from the job hash, if recorded.
    pub last_error: Option<String>,
}
99
/// Options for enqueuing a job.
///
/// All fields are optional; `None` falls back to the Producer's configured
/// defaults, or to generated/derived values for `job_id` and the timestamps.
#[derive(Debug, Clone, Default)]
pub struct EnqueueOptions {
    /// Target queue; defaults to the producer's configured queue.
    pub queue_name: Option<String>,
    /// Explicit job id; a new UUID v4 string is generated when absent.
    pub job_id: Option<String>,
    /// De-duplication key: while its lock is alive, a second enqueue with the
    /// same key returns the existing job's id instead of creating a new job.
    pub idempotency_key: Option<String>,
    /// TTL (seconds) for the idempotency lock; must be positive when a key is set.
    pub idempotency_ttl_seconds: Option<i64>,
    /// Retry budget override.
    pub max_retries: Option<i64>,
    /// Per-job execution timeout override (seconds); must be positive.
    pub job_timeout_seconds: Option<i64>,
    /// Result retention override (seconds).
    pub result_ttl_seconds: Option<i64>,
    /// Enqueue timestamp override; defaults to `Utc::now()`.
    pub enqueue_time: Option<DateTime<Utc>>,
    /// Deferred run time; defaults to `enqueue_time` (run as soon as possible).
    pub scheduled_time: Option<DateTime<Utc>>,
    /// Extra trace-propagation entries merged with the current span's context
    /// (caller-supplied entries take precedence; see `HashMapInjector`).
    pub trace_context: Option<HashMap<String, String>>,
}
114
/// Trait for RRQ producer operations - enables mocking in tests.
#[async_trait]
pub trait ProducerHandle: Send + Sync {
    /// Enqueue a job for processing.
    ///
    /// Returns the id of the job that will run. Note this may differ from a
    /// requested id when an idempotency key already maps to a live job.
    async fn enqueue(
        &self,
        function_name: &str,
        params: Map<String, Value>,
        options: EnqueueOptions,
    ) -> Result<String>;

    /// Wait for a queued job to reach a terminal state (COMPLETED or FAILED).
    ///
    /// Returns `Ok(None)` when `timeout` elapses before a terminal state is
    /// observed; `block_interval` bounds each internal wait between checks.
    async fn wait_for_completion(
        &self,
        job_id: &str,
        timeout: Duration,
        block_interval: Duration,
    ) -> Result<Option<JobResult>>;
}
134
/// RRQ Producer with auto-reconnecting Redis connection.
///
/// Clonable; each operation clones the inner `ConnectionManager` handle.
#[derive(Clone)]
pub struct Producer {
    // Auto-reconnecting Redis connection handle.
    manager: ConnectionManager,
    // Fallbacks used when the corresponding EnqueueOptions field is None.
    default_queue_name: String,
    default_max_retries: i64,
    default_job_timeout_seconds: i64,
    default_result_ttl_seconds: i64,
    default_idempotency_ttl_seconds: i64,
    // Correlation attribute name -> dotted path into job params
    // (consumed by extract_correlation_context).
    correlation_mappings: HashMap<String, String>,
    // Lua scripts built once at construction and reused for every call.
    enqueue_script: Script,
    rate_limit_script: Script,
    debounce_script: Script,
}
149
/// Configuration for creating a Producer.
#[derive(Debug, Clone)]
pub struct ProducerConfig {
    /// Queue used when `EnqueueOptions::queue_name` is `None`.
    pub queue_name: String,
    /// Default retry budget for enqueued jobs.
    pub max_retries: i64,
    /// Default per-job execution timeout (seconds); must be positive.
    pub job_timeout_seconds: i64,
    /// Default TTL (seconds) for retained job results.
    pub result_ttl_seconds: i64,
    /// Default TTL (seconds) for idempotency locks.
    pub idempotency_ttl_seconds: i64,
    /// Correlation attribute name -> dotted path into job params.
    pub correlation_mappings: HashMap<String, String>,
}
160
impl Default for ProducerConfig {
    /// Defaults are sourced from `rrq_config::defaults`, with no correlation
    /// mappings configured.
    fn default() -> Self {
        Self {
            queue_name: DEFAULT_QUEUE_NAME.to_string(),
            max_retries: DEFAULT_MAX_RETRIES,
            job_timeout_seconds: DEFAULT_JOB_TIMEOUT_SECONDS,
            result_ttl_seconds: DEFAULT_RESULT_TTL_SECONDS,
            idempotency_ttl_seconds: DEFAULT_UNIQUE_JOB_LOCK_TTL_SECONDS,
            correlation_mappings: HashMap::new(),
        }
    }
}
173
174impl Producer {
175    /// Create a new Producer with default configuration.
176    pub async fn new(redis_dsn: impl AsRef<str>) -> Result<Self> {
177        Self::with_config(redis_dsn, ProducerConfig::default()).await
178    }
179
180    /// Create a new Producer with custom configuration.
181    pub async fn with_config(redis_dsn: impl AsRef<str>, config: ProducerConfig) -> Result<Self> {
182        let client = redis::Client::open(redis_dsn.as_ref())
183            .with_context(|| "failed to create Redis client")?;
184        let manager = ConnectionManager::new(client)
185            .await
186            .with_context(|| "failed to connect to Redis")?;
187        Ok(Self {
188            manager,
189            default_queue_name: config.queue_name,
190            default_max_retries: config.max_retries,
191            default_job_timeout_seconds: config.job_timeout_seconds,
192            default_result_ttl_seconds: config.result_ttl_seconds,
193            default_idempotency_ttl_seconds: config.idempotency_ttl_seconds,
194            correlation_mappings: config.correlation_mappings,
195            enqueue_script: build_enqueue_script(),
196            rate_limit_script: build_rate_limit_script(),
197            debounce_script: build_debounce_script(),
198        })
199    }
200
    /// Create a Producer from an existing ConnectionManager.
    ///
    /// Useful when the caller already owns a connection (e.g. sharing one
    /// `ConnectionManager` across components). Infallible: no I/O happens
    /// here — the Lua scripts are built locally.
    pub fn with_connection(manager: ConnectionManager, config: ProducerConfig) -> Self {
        Self {
            manager,
            default_queue_name: config.queue_name,
            default_max_retries: config.max_retries,
            default_job_timeout_seconds: config.job_timeout_seconds,
            default_result_ttl_seconds: config.result_ttl_seconds,
            default_idempotency_ttl_seconds: config.idempotency_ttl_seconds,
            correlation_mappings: config.correlation_mappings,
            enqueue_script: build_enqueue_script(),
            rate_limit_script: build_rate_limit_script(),
            debounce_script: build_debounce_script(),
        }
    }
216
    /// Enqueue a job for processing.
    ///
    /// Returns the id of the job that will run: either the freshly enqueued
    /// job, or — when an idempotency key is supplied and already bound to a
    /// live job — the id of that existing job.
    ///
    /// # Errors
    ///
    /// Fails on blank names/keys, non-positive timeouts or TTLs, JSON
    /// serialization problems, Redis errors, or a `job_id` collision.
    pub async fn enqueue(
        &self,
        function_name: &str,
        params: Map<String, Value>,
        options: EnqueueOptions,
    ) -> Result<String> {
        validate_name("function_name", function_name)?;
        let queue_name = options
            .queue_name
            .unwrap_or_else(|| self.default_queue_name.clone());
        validate_name("queue_name", &queue_name)?;
        let queue_name = format_queue_key(&queue_name);
        let job_id = options.job_id.unwrap_or_else(|| Uuid::new_v4().to_string());
        let enqueue_time = options.enqueue_time.unwrap_or_else(Utc::now);
        // No schedule provided => run as soon as possible.
        let scheduled_time = options.scheduled_time.unwrap_or(enqueue_time);
        let max_retries = options.max_retries.unwrap_or(self.default_max_retries);
        let job_timeout_seconds = options
            .job_timeout_seconds
            .unwrap_or(self.default_job_timeout_seconds);
        if job_timeout_seconds <= 0 {
            anyhow::bail!("job_timeout_seconds must be positive");
        }
        let result_ttl_seconds = options
            .result_ttl_seconds
            .unwrap_or(self.default_result_ttl_seconds);

        let job_key = format!("{JOB_KEY_PREFIX}{job_id}");
        let queue_key = queue_name.clone();
        // The queue is a sorted set scored by scheduled run time (epoch ms).
        let score_ms = scheduled_time.timestamp_millis() as f64;
        // Empty string signals "no idempotency" to the Lua script.
        let idempotency_key = if let Some(key) = options.idempotency_key.as_deref() {
            validate_name("idempotency_key", key)?;
            format_idempotency_key(key)
        } else {
            String::new()
        };
        let mut idempotency_ttl_seconds = options
            .idempotency_ttl_seconds
            .unwrap_or(self.default_idempotency_ttl_seconds);
        if !idempotency_key.is_empty() {
            if idempotency_ttl_seconds <= 0 {
                anyhow::bail!("idempotency_ttl_seconds must be positive");
            }
            // Stretch the lock TTL to cover a deferred schedule, so the
            // lock cannot expire before the job even becomes runnable.
            let deferral_ms = scheduled_time
                .signed_duration_since(enqueue_time)
                .num_milliseconds();
            if deferral_ms > 0 {
                // Ceiling division: partial seconds round up.
                let deferral_seconds = deferral_ms.saturating_add(999) / 1000;
                if deferral_seconds > idempotency_ttl_seconds {
                    idempotency_ttl_seconds = deferral_seconds;
                }
            }
        }

        let trace_context = merge_trace_context(options.trace_context);
        let correlation_context = extract_correlation_context(
            &params,
            &self.correlation_mappings,
            trace_context.as_ref(),
        );

        let job_params_json = serde_json::to_string(&params)?;
        // Empty string means "absent": the script skips the HSET for it.
        let trace_context_json = if let Some(trace_context) = trace_context {
            serde_json::to_string(&trace_context)?
        } else {
            String::new()
        };
        let correlation_context_json = if let Some(correlation_context) = correlation_context {
            serde_json::to_string(&correlation_context)?
        } else {
            String::new()
        };

        // Enqueue atomically, preventing job_id collisions and supporting idempotency keys.
        // NOTE: argument order must match the ARGV layout in build_enqueue_script.
        let mut conn = self.manager.clone();
        let (status, returned_id): (i64, String) = self
            .enqueue_script
            .key(job_key)
            .key(queue_key)
            .key(idempotency_key)
            .arg(&job_id)
            .arg(function_name)
            .arg(&job_params_json)
            .arg(enqueue_time.to_rfc3339())
            .arg("PENDING")
            .arg(0i64)
            .arg(scheduled_time.to_rfc3339())
            .arg(max_retries)
            .arg(job_timeout_seconds)
            .arg(result_ttl_seconds)
            .arg(&queue_name)
            .arg("null")
            .arg(trace_context_json)
            .arg(correlation_context_json)
            .arg(score_ms)
            .arg(idempotency_ttl_seconds)
            .invoke_async(&mut conn)
            .await?;

        // 1 = newly enqueued, 0 = idempotency hit (existing job id returned),
        // -1 = the explicit job_id is already in use.
        match status {
            1 => Ok(returned_id),
            0 => Ok(returned_id),
            -1 => anyhow::bail!("job_id already exists"),
            _ => anyhow::bail!("unexpected enqueue status"),
        }
    }
323
    /// Enqueue a job with a leading-edge rate limit.
    ///
    /// Returns Ok(None) when the rate limit key is already held.
    ///
    /// The rate-limit lock is acquired (SET NX EX) before the job is created,
    /// and released by the script if the job_id turns out to collide.
    ///
    /// # Errors
    ///
    /// Fails on blank names/keys, non-positive timeout or window, JSON
    /// serialization problems, Redis errors, or a `job_id` collision.
    pub async fn enqueue_with_rate_limit(
        &self,
        function_name: &str,
        params: Map<String, Value>,
        rate_limit_key: &str,
        rate_limit_window: Duration,
        options: EnqueueOptions,
    ) -> Result<Option<String>> {
        validate_name("function_name", function_name)?;
        validate_name("rate_limit_key", rate_limit_key)?;
        let queue_name = options
            .queue_name
            .unwrap_or_else(|| self.default_queue_name.clone());
        validate_name("queue_name", &queue_name)?;
        let queue_name = format_queue_key(&queue_name);
        let job_id = options.job_id.unwrap_or_else(|| Uuid::new_v4().to_string());
        let enqueue_time = options.enqueue_time.unwrap_or_else(Utc::now);
        let scheduled_time = options.scheduled_time.unwrap_or(enqueue_time);
        let max_retries = options.max_retries.unwrap_or(self.default_max_retries);
        let job_timeout_seconds = options
            .job_timeout_seconds
            .unwrap_or(self.default_job_timeout_seconds);
        if job_timeout_seconds <= 0 {
            anyhow::bail!("job_timeout_seconds must be positive");
        }
        let result_ttl_seconds = options
            .result_ttl_seconds
            .unwrap_or(self.default_result_ttl_seconds);

        // Window is rounded UP to whole seconds (EX granularity).
        let ttl_seconds = rate_limit_window.as_secs_f64().ceil() as i64;
        if ttl_seconds <= 0 {
            anyhow::bail!("rate_limit_window must be positive");
        }

        let job_key = format!("{JOB_KEY_PREFIX}{job_id}");
        let queue_key = queue_name.clone();
        let rate_limit_key = format_rate_limit_key(rate_limit_key);
        let score_ms = scheduled_time.timestamp_millis() as f64;

        let trace_context = merge_trace_context(options.trace_context);
        let correlation_context = extract_correlation_context(
            &params,
            &self.correlation_mappings,
            trace_context.as_ref(),
        );

        let job_params_json = serde_json::to_string(&params)?;
        // Empty string means "absent": the script skips the HSET for it.
        let trace_context_json = if let Some(trace_context) = trace_context {
            serde_json::to_string(&trace_context)?
        } else {
            String::new()
        };
        let correlation_context_json = if let Some(correlation_context) = correlation_context {
            serde_json::to_string(&correlation_context)?
        } else {
            String::new()
        };

        // NOTE: argument order must match the ARGV layout in build_rate_limit_script.
        let mut conn = self.manager.clone();
        let (status, returned_id): (i64, String) = self
            .rate_limit_script
            .key(job_key)
            .key(queue_key)
            .key(rate_limit_key)
            .arg(&job_id)
            .arg(function_name)
            .arg(&job_params_json)
            .arg(enqueue_time.to_rfc3339())
            .arg("PENDING")
            .arg(0i64)
            .arg(scheduled_time.to_rfc3339())
            .arg(max_retries)
            .arg(job_timeout_seconds)
            .arg(result_ttl_seconds)
            .arg(&queue_name)
            .arg("null")
            .arg(trace_context_json)
            .arg(correlation_context_json)
            .arg(score_ms)
            .arg(ttl_seconds)
            .invoke_async(&mut conn)
            .await?;

        // 1 = enqueued, 2 = rate-limited (lock already held), -1 = id collision.
        match status {
            1 => Ok(Some(returned_id)),
            2 => Ok(None),
            -1 => anyhow::bail!("job_id already exists"),
            _ => anyhow::bail!("unexpected enqueue status"),
        }
    }
417
    /// Enqueue a job using trailing-edge debounce semantics.
    ///
    /// Reuses a pending job for the debounce key when possible and reschedules it
    /// for `enqueue_time + debounce_window`.
    ///
    /// # Errors
    ///
    /// Fails on blank names/keys, non-positive timeout or window, JSON
    /// serialization problems, Redis errors, or a `job_id` collision.
    pub async fn enqueue_with_debounce(
        &self,
        function_name: &str,
        params: Map<String, Value>,
        debounce_key: &str,
        debounce_window: Duration,
        options: EnqueueOptions,
    ) -> Result<String> {
        validate_name("function_name", function_name)?;
        validate_name("debounce_key", debounce_key)?;
        let queue_name = options
            .queue_name
            .unwrap_or_else(|| self.default_queue_name.clone());
        validate_name("queue_name", &queue_name)?;
        let queue_name = format_queue_key(&queue_name);
        let job_id = options.job_id.unwrap_or_else(|| Uuid::new_v4().to_string());
        let enqueue_time = options.enqueue_time.unwrap_or_else(Utc::now);
        // Default schedule is enqueue_time + window. NOTE(review): a window
        // too large for chrono::Duration silently collapses to zero deferral
        // via unwrap_or_default — confirm this is the intended fallback.
        let scheduled_time = options.scheduled_time.unwrap_or_else(|| {
            enqueue_time + chrono::Duration::from_std(debounce_window).unwrap_or_default()
        });
        let max_retries = options.max_retries.unwrap_or(self.default_max_retries);
        let job_timeout_seconds = options
            .job_timeout_seconds
            .unwrap_or(self.default_job_timeout_seconds);
        if job_timeout_seconds <= 0 {
            anyhow::bail!("job_timeout_seconds must be positive");
        }
        let result_ttl_seconds = options
            .result_ttl_seconds
            .unwrap_or(self.default_result_ttl_seconds);

        // Window is rounded UP to whole seconds for the debounce key TTL.
        let ttl_seconds = debounce_window.as_secs_f64().ceil() as i64;
        if ttl_seconds <= 0 {
            anyhow::bail!("debounce_window must be positive");
        }

        let queue_key = queue_name.clone();
        let debounce_key = format_debounce_key(debounce_key);
        let score_ms = scheduled_time.timestamp_millis() as f64;

        let trace_context = merge_trace_context(options.trace_context);
        let correlation_context = extract_correlation_context(
            &params,
            &self.correlation_mappings,
            trace_context.as_ref(),
        );

        let job_params_json = serde_json::to_string(&params)?;
        // Empty string means "absent": the script skips the HSET for it.
        let trace_context_json = if let Some(trace_context) = trace_context {
            serde_json::to_string(&trace_context)?
        } else {
            String::new()
        };
        let correlation_context_json = if let Some(correlation_context) = correlation_context {
            serde_json::to_string(&correlation_context)?
        } else {
            String::new()
        };

        // Unlike the other scripts, the job key is derived inside the script
        // (from the prefix in ARGV[1]) because a pending job may be reused.
        let mut conn = self.manager.clone();
        let (status, returned_id): (i64, String) = self
            .debounce_script
            .key(queue_key)
            .key(debounce_key)
            .arg(JOB_KEY_PREFIX)
            .arg(&job_id)
            .arg(function_name)
            .arg(&job_params_json)
            .arg(enqueue_time.to_rfc3339())
            .arg("PENDING")
            .arg(0i64)
            .arg(scheduled_time.to_rfc3339())
            .arg(max_retries)
            .arg(job_timeout_seconds)
            .arg(result_ttl_seconds)
            .arg(&queue_name)
            .arg("null")
            .arg(trace_context_json)
            .arg(correlation_context_json)
            .arg(score_ms)
            .arg(ttl_seconds)
            .invoke_async(&mut conn)
            .await?;

        // 1 = new job created; 0 = presumably the pending job was reused
        // (debounce script body not visible here — verify); -1 = id collision.
        match status {
            1 | 0 => Ok(returned_id),
            -1 => anyhow::bail!("job_id already exists"),
            _ => anyhow::bail!("unexpected enqueue status"),
        }
    }
512
513    /// Get the current status of a job without waiting.
514    pub async fn get_job_status(&self, job_id: &str) -> Result<Option<JobResult>> {
515        let job_key = format!("{JOB_KEY_PREFIX}{job_id}");
516        let mut conn = self.manager.clone();
517        let data: HashMap<String, String> = conn.hgetall(&job_key).await?;
518
519        if data.is_empty() {
520            return Ok(None);
521        }
522
523        let status = data
524            .get("status")
525            .map(|s| JobStatus::from_str(s))
526            .unwrap_or(JobStatus::Unknown);
527
528        let result = data.get("result").and_then(|r| parse_result(r));
529        let last_error = data.get("last_error").cloned();
530
531        Ok(Some(JobResult {
532            status,
533            result,
534            last_error,
535        }))
536    }
537
    /// Wait for a job to reach a terminal state using Redis stream notifications.
    ///
    /// Alternates between reading the job hash and blocking on `XREAD` over
    /// the job's event stream. Each blocking read lasts at most
    /// `block_interval` (or the remaining time when `block_interval` is
    /// zero). Returns `Ok(None)` when `timeout` elapses first.
    pub async fn wait_for_completion(
        &self,
        job_id: &str,
        timeout: Duration,
        block_interval: Duration,
    ) -> Result<Option<JobResult>> {
        let deadline = tokio::time::Instant::now() + timeout;
        let mut conn = self.manager.clone();
        let event_key = format_job_events_key(job_id);
        // '$' = only entries appended after the first XREAD; advanced to the
        // last observed entry id so events are never re-consumed.
        let mut stream_offset = "$".to_string();

        loop {
            // Check the hash first: the job may already be terminal.
            if let Some(status) = self.get_job_status(job_id).await?
                && matches!(status.status, JobStatus::Completed | JobStatus::Failed)
            {
                return Ok(Some(status));
            }

            let now = tokio::time::Instant::now();
            if now >= deadline {
                return Ok(None);
            }

            let remaining = deadline.saturating_duration_since(now);
            let wait_for = if block_interval.is_zero() {
                remaining
            } else {
                block_interval.min(remaining)
            };
            // Clamp to >= 1: XREAD BLOCK 0 would block indefinitely.
            let wait_ms = wait_for.as_millis().clamp(1, i64::MAX as u128) as i64;

            // Wait for a terminal event for this job key. We re-check job status on wake
            // because stream events are advisory and status is the source of truth.
            let response: redis::Value = redis::cmd("XREAD")
                .arg("BLOCK")
                .arg(wait_ms)
                .arg("COUNT")
                .arg(1)
                .arg("STREAMS")
                .arg(&event_key)
                .arg(&stream_offset)
                .query_async(&mut conn)
                .await?;

            if let Some(last_seen_id) = extract_latest_stream_entry_id(&response) {
                stream_offset = last_seen_id;
            }
        }
    }
588}
589
// Adapter letting the OpenTelemetry propagator write entries into a plain map.
struct HashMapInjector<'a>(&'a mut HashMap<String, String>);

impl<'a> Injector for HashMapInjector<'a> {
    fn set(&mut self, key: &str, value: String) {
        // Deliberately or_insert (not insert): entries already present in the
        // map — i.e. caller-supplied trace context — win over injected values.
        self.0.entry(key.to_string()).or_insert(value);
    }
}
597
/// Merge a caller-supplied trace context with the current tracing span's
/// propagation context.
///
/// Caller-supplied entries take precedence (see `HashMapInjector::set`).
/// Returns `None` when neither source contributed any entries.
fn merge_trace_context(
    trace_context: Option<HashMap<String, String>>,
) -> Option<HashMap<String, String>> {
    let mut merged = trace_context.unwrap_or_default();
    let current = tracing::Span::current().context();
    global::get_text_map_propagator(|propagator| {
        propagator.inject_context(&current, &mut HashMapInjector(&mut merged));
    });
    if merged.is_empty() {
        return None;
    }
    Some(merged)
}
611
612fn extract_correlation_context(
613    params: &Map<String, Value>,
614    mappings: &HashMap<String, String>,
615    trace_context: Option<&HashMap<String, String>>,
616) -> Option<HashMap<String, String>> {
617    if mappings.is_empty() {
618        return None;
619    }
620
621    const MAX_CORRELATION_KEYS: usize = 16;
622    const MAX_CORRELATION_KEY_LEN: usize = 64;
623    const MAX_CORRELATION_VALUE_LEN: usize = 256;
624
625    let mut correlation = HashMap::new();
626
627    for (attr_name, path) in mappings {
628        if correlation.len() >= MAX_CORRELATION_KEYS {
629            break;
630        }
631        let key = attr_name.trim();
632        if key.is_empty() || key.len() > MAX_CORRELATION_KEY_LEN {
633            continue;
634        }
635        if let Some(existing) = trace_context.and_then(|ctx| ctx.get(key))
636            && !existing.is_empty()
637        {
638            correlation.insert(
639                key.to_string(),
640                truncate_utf8(existing, MAX_CORRELATION_VALUE_LEN),
641            );
642            continue;
643        }
644
645        let Some(raw) = lookup_value_in_params(params, path) else {
646            continue;
647        };
648        let Some(value) = scalar_value_to_string(raw) else {
649            continue;
650        };
651        correlation.insert(
652            key.to_string(),
653            truncate_utf8(&value, MAX_CORRELATION_VALUE_LEN),
654        );
655    }
656
657    if correlation.is_empty() {
658        return None;
659    }
660    Some(correlation)
661}
662
663fn lookup_value_in_params<'a>(params: &'a Map<String, Value>, path: &str) -> Option<&'a Value> {
664    let trimmed = path.trim();
665    let cleaned = trimmed.strip_prefix("params.").unwrap_or(trimmed);
666    if cleaned.is_empty() {
667        return None;
668    }
669    let mut parts = cleaned.split('.');
670    let first = parts.next()?;
671    let mut current = params.get(first)?;
672    for part in parts {
673        if part.is_empty() {
674            return None;
675        }
676        current = current.as_object()?.get(part)?;
677    }
678    Some(current)
679}
680
681fn scalar_value_to_string(value: &Value) -> Option<String> {
682    match value {
683        Value::String(v) if !v.is_empty() => Some(v.clone()),
684        Value::Bool(v) => Some(v.to_string()),
685        Value::Number(v) => Some(v.to_string()),
686        _ => None,
687    }
688}
689
/// Truncate `value` to at most `max_len` bytes without splitting a UTF-8
/// character, returning an owned copy.
///
/// The cut point is the largest char boundary not exceeding `max_len`,
/// so the result is always valid UTF-8 and never longer than `max_len` bytes.
fn truncate_utf8(value: &str, max_len: usize) -> String {
    if value.len() <= max_len {
        return value.to_string();
    }
    // Walk back from max_len to the nearest char boundary instead of
    // re-copying the string character by character; a boundary is at most
    // three bytes away, and index 0 is always a boundary.
    let mut end = max_len;
    while !value.is_char_boundary(end) {
        end -= 1;
    }
    value[..end].to_string()
}
703
#[async_trait]
impl ProducerHandle for Producer {
    // Both trait methods delegate to the inherent methods of the same name,
    // so callers can use either the concrete Producer or a dyn ProducerHandle.
    async fn enqueue(
        &self,
        function_name: &str,
        params: Map<String, Value>,
        options: EnqueueOptions,
    ) -> Result<String> {
        self.enqueue(function_name, params, options).await
    }

    async fn wait_for_completion(
        &self,
        job_id: &str,
        timeout: Duration,
        block_interval: Duration,
    ) -> Result<Option<JobResult>> {
        self.wait_for_completion(job_id, timeout, block_interval)
            .await
    }
}
725
726fn format_queue_key(queue_name: &str) -> String {
727    if queue_name.starts_with(QUEUE_KEY_PREFIX) {
728        queue_name.to_string()
729    } else {
730        format!("{QUEUE_KEY_PREFIX}{queue_name}")
731    }
732}
733
734fn format_job_events_key(job_id: &str) -> String {
735    format!("{JOB_EVENTS_KEY_PREFIX}{job_id}")
736}
737
738fn extract_latest_stream_entry_id(value: &redis::Value) -> Option<String> {
739    let redis::Value::Array(streams) = value else {
740        return None;
741    };
742    let redis::Value::Array(stream) = streams.last()? else {
743        return None;
744    };
745    let redis::Value::Array(entries) = stream.get(1)? else {
746        return None;
747    };
748    let redis::Value::Array(last_entry) = entries.last()? else {
749        return None;
750    };
751    redis_value_to_string(last_entry.first()?)
752}
753
754fn redis_value_to_string(value: &redis::Value) -> Option<String> {
755    match value {
756        redis::Value::SimpleString(s) => Some(s.clone()),
757        redis::Value::BulkString(bytes) => Some(String::from_utf8_lossy(bytes).to_string()),
758        _ => None,
759    }
760}
761
762fn format_idempotency_key(key: &str) -> String {
763    format!("{IDEMPOTENCY_KEY_PREFIX}{key}")
764}
765
766fn format_rate_limit_key(key: &str) -> String {
767    if key.starts_with(RATE_LIMIT_KEY_PREFIX) {
768        key.to_string()
769    } else {
770        format!("{RATE_LIMIT_KEY_PREFIX}{key}")
771    }
772}
773
774fn format_debounce_key(key: &str) -> String {
775    if key.starts_with(DEBOUNCE_KEY_PREFIX) {
776        key.to_string()
777    } else {
778        format!("{DEBOUNCE_KEY_PREFIX}{key}")
779    }
780}
781
782fn validate_name(label: &str, value: &str) -> Result<()> {
783    if value.trim().is_empty() {
784        anyhow::bail!("{label} cannot be empty");
785    }
786    Ok(())
787}
788
/// Build the atomic enqueue Lua script.
///
/// Return protocol: `{1, job_id}` = newly enqueued; `{0, existing_id}` =
/// idempotency hit (a live job already owns the key); `{-1, job_id}` =
/// the requested job_id already exists. The script string must stay
/// byte-stable so Redis script caching keeps working across calls.
fn build_enqueue_script() -> Script {
    let script = format!(
        "-- KEYS: [1] = job_key, [2] = queue_key, [3] = idempotency_key (optional)\n\
         -- ARGV: [1] = job_id, [2] = function_name, [3] = job_params\n\
         --       [4] = enqueue_time, [5] = status, [6] = current_retries\n\
         --       [7] = next_scheduled_run_time, [8] = max_retries\n\
         --       [9] = job_timeout_seconds, [10] = result_ttl_seconds\n\
         --       [11] = queue_name, [12] = result, [13] = trace_context_json\n\
         --       [14] = correlation_context_json, [15] = score_ms, [16] = idempotency_ttl_seconds\n\
         local idem_key = KEYS[3]\n\
         if idem_key ~= '' then\n\
             local existing = redis.call('GET', idem_key)\n\
             if existing then\n\
                 local existing_job_key = '{job_prefix}' .. existing\n\
                 if redis.call('EXISTS', existing_job_key) == 1 then\n\
                     return {{0, existing}}\n\
                 end\n\
                 redis.call('DEL', idem_key)\n\
             end\n\
         end\n\
         if redis.call('EXISTS', KEYS[1]) == 1 then\n\
             return {{-1, ARGV[1]}}\n\
         end\n\
         if idem_key ~= '' then\n\
             local ttl = tonumber(ARGV[16])\n\
             local set_ok = nil\n\
             if ttl and ttl > 0 then\n\
                 set_ok = redis.call('SET', idem_key, ARGV[1], 'NX', 'EX', ttl)\n\
             else\n\
                 set_ok = redis.call('SET', idem_key, ARGV[1], 'NX')\n\
             end\n\
             if not set_ok then\n\
                 local winner = redis.call('GET', idem_key)\n\
                 if winner then\n\
                     return {{0, winner}}\n\
                 end\n\
             end\n\
         end\n\
         redis.call('HSET', KEYS[1],\n\
             'id', ARGV[1],\n\
             'function_name', ARGV[2],\n\
             'job_params', ARGV[3],\n\
             'enqueue_time', ARGV[4],\n\
             'status', ARGV[5],\n\
             'current_retries', ARGV[6],\n\
             'next_scheduled_run_time', ARGV[7],\n\
             'max_retries', ARGV[8],\n\
             'job_timeout_seconds', ARGV[9],\n\
             'result_ttl_seconds', ARGV[10],\n\
             'queue_name', ARGV[11],\n\
         'result', ARGV[12])\n\
         if ARGV[13] ~= '' then\n\
             redis.call('HSET', KEYS[1], 'trace_context', ARGV[13])\n\
         end\n\
         if ARGV[14] ~= '' then\n\
             redis.call('HSET', KEYS[1], 'correlation_context', ARGV[14])\n\
         end\n\
         redis.call('ZADD', KEYS[2], ARGV[15], ARGV[1])\n\
         return {{1, ARGV[1]}}",
        job_prefix = JOB_KEY_PREFIX
    );
    Script::new(&script)
}
852
/// Builds the Lua script for an atomic, rate-limited job enqueue.
///
/// KEYS: `[1]` job hash key, `[2]` queue sorted-set key, `[3]` rate-limit
/// key (pass `''` to disable rate limiting). ARGV layout is documented in
/// the Lua comment header below.
///
/// Reply protocol (two-element array; first element is the status code):
/// - `{1, job_id}`  — job hash written and job id ZADDed to the queue.
/// - `{2, ''}`      — rejected: the rate-limit key was already held
///   (`SET ... NX` failed), so nothing was written.
/// - `{-1, job_id}` — rejected: a job hash with this id already exists;
///   a rate-limit key acquired earlier in this call is rolled back (DEL).
/// - `{-2, job_id}` — error: a rate-limit key was supplied but
///   `rate_limit_ttl_seconds` (ARGV[16]) is missing or <= 0.
///
/// The `trace_context` / `correlation_context` hash fields are written only
/// when ARGV[13] / ARGV[14] are non-empty. The whole script executes
/// atomically inside Redis, so the check-then-set sequences cannot race.
///
/// NOTE: the `\n\` line continuations strip leading whitespace, so the
/// script Redis receives is fully left-aligned regardless of the Rust
/// indentation here.
fn build_rate_limit_script() -> Script {
        let script = "\
        -- KEYS: [1] = job_key, [2] = queue_key, [3] = rate_limit_key\n\
        -- ARGV: [1] = job_id, [2] = function_name, [3] = job_params\n\
        --       [4] = enqueue_time, [5] = status, [6] = current_retries\n\
        --       [7] = next_scheduled_run_time, [8] = max_retries\n\
        --       [9] = job_timeout_seconds, [10] = result_ttl_seconds\n\
        --       [11] = queue_name, [12] = result, [13] = trace_context_json\n\
        --       [14] = correlation_context_json, [15] = score_ms, [16] = rate_limit_ttl_seconds\n\
        local rate_key = KEYS[3]\n\
        local rate_set = false\n\
        if rate_key ~= '' then\n\
            local ttl = tonumber(ARGV[16])\n\
            if not ttl or ttl <= 0 then\n\
                return {-2, ARGV[1]}\n\
            end\n\
            local ok = redis.call('SET', rate_key, ARGV[1], 'NX', 'EX', ttl)\n\
            if not ok then\n\
                return {2, ''}\n\
            end\n\
            rate_set = true\n\
        end\n\
        if redis.call('EXISTS', KEYS[1]) == 1 then\n\
            if rate_set then\n\
                redis.call('DEL', rate_key)\n\
            end\n\
            return {-1, ARGV[1]}\n\
        end\n\
        redis.call('HSET', KEYS[1],\n\
            'id', ARGV[1],\n\
            'function_name', ARGV[2],\n\
            'job_params', ARGV[3],\n\
            'enqueue_time', ARGV[4],\n\
            'status', ARGV[5],\n\
            'current_retries', ARGV[6],\n\
            'next_scheduled_run_time', ARGV[7],\n\
            'max_retries', ARGV[8],\n\
            'job_timeout_seconds', ARGV[9],\n\
            'result_ttl_seconds', ARGV[10],\n\
            'queue_name', ARGV[11],\n\
        'result', ARGV[12])\n\
        if ARGV[13] ~= '' then\n\
            redis.call('HSET', KEYS[1], 'trace_context', ARGV[13])\n\
        end\n\
        if ARGV[14] ~= '' then\n\
            redis.call('HSET', KEYS[1], 'correlation_context', ARGV[14])\n\
        end\n\
        redis.call('ZADD', KEYS[2], ARGV[15], ARGV[1])\n\
        return {1, ARGV[1]}";
    Script::new(script)
}
904
/// Builds the Lua script for an atomic debounced enqueue.
///
/// KEYS: `[1]` queue sorted-set key, `[2]` debounce key. ARGV[1] is the job
/// key prefix (the job hash key is `ARGV[1] .. job_id`); the remaining ARGV
/// layout is documented in the Lua comment header below.
///
/// Reply protocol (two-element array; first element is the status code):
/// - `{0, existing_id}` — the debounce key pointed at an existing job whose
///   `status` is `'PENDING'`: that job's payload/schedule fields were
///   updated in place, it was re-scored on the queue (ZADD with ARGV[16]),
///   and the debounce key's TTL was refreshed when ARGV[17] > 0.
///   Note the asymmetry: an empty ARGV[15] HDELs `correlation_context` on
///   the existing job, while an empty ARGV[14] leaves `trace_context` alone.
/// - `{-1, job_id}`     — rejected: a job hash with the new id already
///   exists; nothing was written.
/// - `{1, job_id}`      — a fresh job hash was written and queued; the
///   debounce key now points at it (with `EX` TTL when ARGV[17] > 0, no
///   expiry otherwise). A stale debounce key (pointing at a missing or
///   non-PENDING job) is DELed before this path.
/// - `{0, other}` / `{2, ''}` — defensive branches for a `SET ... NX`
///   failure; unreachable in practice because the script runs atomically
///   and the key was GET/DELed just above, but kept as a safety net.
///
/// NOTE: the `\n\` line continuations strip leading whitespace, so the
/// script Redis receives is fully left-aligned regardless of the Rust
/// indentation here.
fn build_debounce_script() -> Script {
        let script = "\
        -- KEYS: [1] = queue_key, [2] = debounce_key\n\
        -- ARGV: [1] = job_prefix, [2] = job_id, [3] = function_name, [4] = job_params\n\
        --       [5] = enqueue_time, [6] = status, [7] = current_retries\n\
        --       [8] = next_scheduled_run_time, [9] = max_retries\n\
        --       [10] = job_timeout_seconds, [11] = result_ttl_seconds\n\
        --       [12] = queue_name, [13] = result, [14] = trace_context_json\n\
        --       [15] = correlation_context_json, [16] = score_ms, [17] = debounce_ttl_seconds\n\
        local existing_id = redis.call('GET', KEYS[2])\n\
        if existing_id then\n\
            local existing_job_key = ARGV[1] .. existing_id\n\
            if redis.call('EXISTS', existing_job_key) == 1 then\n\
                local status = redis.call('HGET', existing_job_key, 'status')\n\
                if status == 'PENDING' then\n\
                    redis.call('HSET', existing_job_key,\n\
                        'function_name', ARGV[3],\n\
                        'job_params', ARGV[4],\n\
                        'next_scheduled_run_time', ARGV[8],\n\
                        'max_retries', ARGV[9],\n\
                        'job_timeout_seconds', ARGV[10],\n\
                        'result_ttl_seconds', ARGV[11],\n\
                        'queue_name', ARGV[12])\n\
                    if ARGV[14] ~= '' then\n\
                        redis.call('HSET', existing_job_key, 'trace_context', ARGV[14])\n\
                    end\n\
                    if ARGV[15] ~= '' then\n\
                        redis.call('HSET', existing_job_key, 'correlation_context', ARGV[15])\n\
                    else\n\
                        redis.call('HDEL', existing_job_key, 'correlation_context')\n\
                    end\n\
                    redis.call('ZADD', KEYS[1], ARGV[16], existing_id)\n\
                    local ttl = tonumber(ARGV[17])\n\
                    if ttl and ttl > 0 then\n\
                        redis.call('EXPIRE', KEYS[2], ttl)\n\
                    end\n\
                    return {0, existing_id}\n\
                end\n\
            end\n\
            redis.call('DEL', KEYS[2])\n\
        end\n\
        local job_key = ARGV[1] .. ARGV[2]\n\
        if redis.call('EXISTS', job_key) == 1 then\n\
            return {-1, ARGV[2]}\n\
        end\n\
        local ttl = tonumber(ARGV[17])\n\
        if ttl and ttl > 0 then\n\
            local ok = redis.call('SET', KEYS[2], ARGV[2], 'NX', 'EX', ttl)\n\
            if not ok then\n\
                local other = redis.call('GET', KEYS[2])\n\
                if other then\n\
                    return {0, other}\n\
                end\n\
                return {2, ''}\n\
            end\n\
        else\n\
            redis.call('SET', KEYS[2], ARGV[2], 'NX')\n\
        end\n\
        redis.call('HSET', job_key,\n\
            'id', ARGV[2],\n\
            'function_name', ARGV[3],\n\
            'job_params', ARGV[4],\n\
            'enqueue_time', ARGV[5],\n\
            'status', ARGV[6],\n\
            'current_retries', ARGV[7],\n\
            'next_scheduled_run_time', ARGV[8],\n\
            'max_retries', ARGV[9],\n\
            'job_timeout_seconds', ARGV[10],\n\
            'result_ttl_seconds', ARGV[11],\n\
            'queue_name', ARGV[12],\n\
        'result', ARGV[13])\n\
        if ARGV[14] ~= '' then\n\
            redis.call('HSET', job_key, 'trace_context', ARGV[14])\n\
        end\n\
        if ARGV[15] ~= '' then\n\
            redis.call('HSET', job_key, 'correlation_context', ARGV[15])\n\
        end\n\
        redis.call('ZADD', KEYS[1], ARGV[16], ARGV[2])\n\
        return {1, ARGV[2]}";
    Script::new(script)
}
986
987fn parse_result(result: &str) -> Option<Value> {
988    if result.is_empty() || result == "null" {
989        return None;
990    }
991    serde_json::from_str(result).ok()
992}
993
// Unit tests live out-of-line in a sibling `lib/tests.rs` file (via the
// `path` attribute) and are compiled only under `cfg(test)`.
#[cfg(test)]
#[path = "lib/tests.rs"]
mod tests;