1use anyhow::{Context, Result};
21use opentelemetry::global;
22use opentelemetry::propagation::Injector;
23use std::sync::Once;
24use tracing_opentelemetry::OpenTelemetrySpanExt;
25
26static CRYPTO_PROVIDER_INIT: Once = Once::new();
27
28pub fn init_crypto_provider() {
40 CRYPTO_PROVIDER_INIT.call_once(|| {
41 let _ = rustls::crypto::ring::default_provider().install_default();
42 });
43}
44use async_trait::async_trait;
45use chrono::{DateTime, Utc};
46use redis::AsyncCommands;
47use redis::Script;
48use redis::aio::ConnectionManager;
49use rrq_config::defaults::{
50 DEFAULT_JOB_TIMEOUT_SECONDS, DEFAULT_MAX_RETRIES, DEFAULT_QUEUE_NAME,
51 DEFAULT_RESULT_TTL_SECONDS, DEFAULT_UNIQUE_JOB_LOCK_TTL_SECONDS,
52};
53use serde_json::{Map, Value};
54use std::collections::HashMap;
55use std::time::Duration;
56use uuid::Uuid;
57
58#[allow(dead_code)]
59mod ffi;
60
61const JOB_KEY_PREFIX: &str = "rrq:job:";
62const JOB_EVENTS_KEY_PREFIX: &str = "rrq:events:job:";
63const QUEUE_KEY_PREFIX: &str = "rrq:queue:";
64const IDEMPOTENCY_KEY_PREFIX: &str = "rrq:idempotency:";
65const RATE_LIMIT_KEY_PREFIX: &str = "rrq:rate_limit:";
66const DEBOUNCE_KEY_PREFIX: &str = "rrq:debounce:";
67
68#[derive(Debug, Clone, Copy, PartialEq, Eq)]
70pub enum JobStatus {
71 Pending,
72 Active,
73 Completed,
74 Failed,
75 Retrying,
76 Unknown,
77}
78
79impl JobStatus {
80 fn from_str(s: &str) -> Self {
81 match s {
82 "PENDING" => Self::Pending,
83 "ACTIVE" => Self::Active,
84 "COMPLETED" => Self::Completed,
85 "FAILED" => Self::Failed,
86 "RETRYING" => Self::Retrying,
87 _ => Self::Unknown,
88 }
89 }
90}
91
92#[derive(Debug, Clone)]
94pub struct JobResult {
95 pub status: JobStatus,
96 pub result: Option<Value>,
97 pub last_error: Option<String>,
98}
99
100#[derive(Debug, Clone, Default)]
102pub struct EnqueueOptions {
103 pub queue_name: Option<String>,
104 pub job_id: Option<String>,
105 pub idempotency_key: Option<String>,
106 pub idempotency_ttl_seconds: Option<i64>,
107 pub max_retries: Option<i64>,
108 pub job_timeout_seconds: Option<i64>,
109 pub result_ttl_seconds: Option<i64>,
110 pub enqueue_time: Option<DateTime<Utc>>,
111 pub scheduled_time: Option<DateTime<Utc>>,
112 pub trace_context: Option<HashMap<String, String>>,
113}
114
115#[async_trait]
117pub trait ProducerHandle: Send + Sync {
118 async fn enqueue(
120 &self,
121 function_name: &str,
122 params: Map<String, Value>,
123 options: EnqueueOptions,
124 ) -> Result<String>;
125
126 async fn wait_for_completion(
128 &self,
129 job_id: &str,
130 timeout: Duration,
131 block_interval: Duration,
132 ) -> Result<Option<JobResult>>;
133}
134
135#[derive(Clone)]
137pub struct Producer {
138 manager: ConnectionManager,
139 default_queue_name: String,
140 default_max_retries: i64,
141 default_job_timeout_seconds: i64,
142 default_result_ttl_seconds: i64,
143 default_idempotency_ttl_seconds: i64,
144 correlation_mappings: HashMap<String, String>,
145 enqueue_script: Script,
146 rate_limit_script: Script,
147 debounce_script: Script,
148}
149
150#[derive(Debug, Clone)]
152pub struct ProducerConfig {
153 pub queue_name: String,
154 pub max_retries: i64,
155 pub job_timeout_seconds: i64,
156 pub result_ttl_seconds: i64,
157 pub idempotency_ttl_seconds: i64,
158 pub correlation_mappings: HashMap<String, String>,
159}
160
161impl Default for ProducerConfig {
162 fn default() -> Self {
163 Self {
164 queue_name: DEFAULT_QUEUE_NAME.to_string(),
165 max_retries: DEFAULT_MAX_RETRIES,
166 job_timeout_seconds: DEFAULT_JOB_TIMEOUT_SECONDS,
167 result_ttl_seconds: DEFAULT_RESULT_TTL_SECONDS,
168 idempotency_ttl_seconds: DEFAULT_UNIQUE_JOB_LOCK_TTL_SECONDS,
169 correlation_mappings: HashMap::new(),
170 }
171 }
172}
173
174impl Producer {
175 pub async fn new(redis_dsn: impl AsRef<str>) -> Result<Self> {
177 Self::with_config(redis_dsn, ProducerConfig::default()).await
178 }
179
180 pub async fn with_config(redis_dsn: impl AsRef<str>, config: ProducerConfig) -> Result<Self> {
182 let client = redis::Client::open(redis_dsn.as_ref())
183 .with_context(|| "failed to create Redis client")?;
184 let manager = ConnectionManager::new(client)
185 .await
186 .with_context(|| "failed to connect to Redis")?;
187 Ok(Self {
188 manager,
189 default_queue_name: config.queue_name,
190 default_max_retries: config.max_retries,
191 default_job_timeout_seconds: config.job_timeout_seconds,
192 default_result_ttl_seconds: config.result_ttl_seconds,
193 default_idempotency_ttl_seconds: config.idempotency_ttl_seconds,
194 correlation_mappings: config.correlation_mappings,
195 enqueue_script: build_enqueue_script(),
196 rate_limit_script: build_rate_limit_script(),
197 debounce_script: build_debounce_script(),
198 })
199 }
200
201 pub fn with_connection(manager: ConnectionManager, config: ProducerConfig) -> Self {
203 Self {
204 manager,
205 default_queue_name: config.queue_name,
206 default_max_retries: config.max_retries,
207 default_job_timeout_seconds: config.job_timeout_seconds,
208 default_result_ttl_seconds: config.result_ttl_seconds,
209 default_idempotency_ttl_seconds: config.idempotency_ttl_seconds,
210 correlation_mappings: config.correlation_mappings,
211 enqueue_script: build_enqueue_script(),
212 rate_limit_script: build_rate_limit_script(),
213 debounce_script: build_debounce_script(),
214 }
215 }
216
217 pub async fn enqueue(
219 &self,
220 function_name: &str,
221 params: Map<String, Value>,
222 options: EnqueueOptions,
223 ) -> Result<String> {
224 validate_name("function_name", function_name)?;
225 let queue_name = options
226 .queue_name
227 .unwrap_or_else(|| self.default_queue_name.clone());
228 validate_name("queue_name", &queue_name)?;
229 let queue_name = format_queue_key(&queue_name);
230 let job_id = options.job_id.unwrap_or_else(|| Uuid::new_v4().to_string());
231 let enqueue_time = options.enqueue_time.unwrap_or_else(Utc::now);
232 let scheduled_time = options.scheduled_time.unwrap_or(enqueue_time);
233 let max_retries = options.max_retries.unwrap_or(self.default_max_retries);
234 let job_timeout_seconds = options
235 .job_timeout_seconds
236 .unwrap_or(self.default_job_timeout_seconds);
237 if job_timeout_seconds <= 0 {
238 anyhow::bail!("job_timeout_seconds must be positive");
239 }
240 let result_ttl_seconds = options
241 .result_ttl_seconds
242 .unwrap_or(self.default_result_ttl_seconds);
243
244 let job_key = format!("{JOB_KEY_PREFIX}{job_id}");
245 let queue_key = queue_name.clone();
246 let score_ms = scheduled_time.timestamp_millis() as f64;
247 let idempotency_key = if let Some(key) = options.idempotency_key.as_deref() {
248 validate_name("idempotency_key", key)?;
249 format_idempotency_key(key)
250 } else {
251 String::new()
252 };
253 let mut idempotency_ttl_seconds = options
254 .idempotency_ttl_seconds
255 .unwrap_or(self.default_idempotency_ttl_seconds);
256 if !idempotency_key.is_empty() {
257 if idempotency_ttl_seconds <= 0 {
258 anyhow::bail!("idempotency_ttl_seconds must be positive");
259 }
260 let deferral_ms = scheduled_time
261 .signed_duration_since(enqueue_time)
262 .num_milliseconds();
263 if deferral_ms > 0 {
264 let deferral_seconds = deferral_ms.saturating_add(999) / 1000;
265 if deferral_seconds > idempotency_ttl_seconds {
266 idempotency_ttl_seconds = deferral_seconds;
267 }
268 }
269 }
270
271 let trace_context = merge_trace_context(options.trace_context);
272 let correlation_context = extract_correlation_context(
273 ¶ms,
274 &self.correlation_mappings,
275 trace_context.as_ref(),
276 );
277
278 let job_params_json = serde_json::to_string(¶ms)?;
279 let trace_context_json = if let Some(trace_context) = trace_context {
280 serde_json::to_string(&trace_context)?
281 } else {
282 String::new()
283 };
284 let correlation_context_json = if let Some(correlation_context) = correlation_context {
285 serde_json::to_string(&correlation_context)?
286 } else {
287 String::new()
288 };
289
290 let mut conn = self.manager.clone();
292 let (status, returned_id): (i64, String) = self
293 .enqueue_script
294 .key(job_key)
295 .key(queue_key)
296 .key(idempotency_key)
297 .arg(&job_id)
298 .arg(function_name)
299 .arg(&job_params_json)
300 .arg(enqueue_time.to_rfc3339())
301 .arg("PENDING")
302 .arg(0i64)
303 .arg(scheduled_time.to_rfc3339())
304 .arg(max_retries)
305 .arg(job_timeout_seconds)
306 .arg(result_ttl_seconds)
307 .arg(&queue_name)
308 .arg("null")
309 .arg(trace_context_json)
310 .arg(correlation_context_json)
311 .arg(score_ms)
312 .arg(idempotency_ttl_seconds)
313 .invoke_async(&mut conn)
314 .await?;
315
316 match status {
317 1 => Ok(returned_id),
318 0 => Ok(returned_id),
319 -1 => anyhow::bail!("job_id already exists"),
320 _ => anyhow::bail!("unexpected enqueue status"),
321 }
322 }
323
324 pub async fn enqueue_with_rate_limit(
328 &self,
329 function_name: &str,
330 params: Map<String, Value>,
331 rate_limit_key: &str,
332 rate_limit_window: Duration,
333 options: EnqueueOptions,
334 ) -> Result<Option<String>> {
335 validate_name("function_name", function_name)?;
336 validate_name("rate_limit_key", rate_limit_key)?;
337 let queue_name = options
338 .queue_name
339 .unwrap_or_else(|| self.default_queue_name.clone());
340 validate_name("queue_name", &queue_name)?;
341 let queue_name = format_queue_key(&queue_name);
342 let job_id = options.job_id.unwrap_or_else(|| Uuid::new_v4().to_string());
343 let enqueue_time = options.enqueue_time.unwrap_or_else(Utc::now);
344 let scheduled_time = options.scheduled_time.unwrap_or(enqueue_time);
345 let max_retries = options.max_retries.unwrap_or(self.default_max_retries);
346 let job_timeout_seconds = options
347 .job_timeout_seconds
348 .unwrap_or(self.default_job_timeout_seconds);
349 if job_timeout_seconds <= 0 {
350 anyhow::bail!("job_timeout_seconds must be positive");
351 }
352 let result_ttl_seconds = options
353 .result_ttl_seconds
354 .unwrap_or(self.default_result_ttl_seconds);
355
356 let ttl_seconds = rate_limit_window.as_secs_f64().ceil() as i64;
357 if ttl_seconds <= 0 {
358 anyhow::bail!("rate_limit_window must be positive");
359 }
360
361 let job_key = format!("{JOB_KEY_PREFIX}{job_id}");
362 let queue_key = queue_name.clone();
363 let rate_limit_key = format_rate_limit_key(rate_limit_key);
364 let score_ms = scheduled_time.timestamp_millis() as f64;
365
366 let trace_context = merge_trace_context(options.trace_context);
367 let correlation_context = extract_correlation_context(
368 ¶ms,
369 &self.correlation_mappings,
370 trace_context.as_ref(),
371 );
372
373 let job_params_json = serde_json::to_string(¶ms)?;
374 let trace_context_json = if let Some(trace_context) = trace_context {
375 serde_json::to_string(&trace_context)?
376 } else {
377 String::new()
378 };
379 let correlation_context_json = if let Some(correlation_context) = correlation_context {
380 serde_json::to_string(&correlation_context)?
381 } else {
382 String::new()
383 };
384
385 let mut conn = self.manager.clone();
386 let (status, returned_id): (i64, String) = self
387 .rate_limit_script
388 .key(job_key)
389 .key(queue_key)
390 .key(rate_limit_key)
391 .arg(&job_id)
392 .arg(function_name)
393 .arg(&job_params_json)
394 .arg(enqueue_time.to_rfc3339())
395 .arg("PENDING")
396 .arg(0i64)
397 .arg(scheduled_time.to_rfc3339())
398 .arg(max_retries)
399 .arg(job_timeout_seconds)
400 .arg(result_ttl_seconds)
401 .arg(&queue_name)
402 .arg("null")
403 .arg(trace_context_json)
404 .arg(correlation_context_json)
405 .arg(score_ms)
406 .arg(ttl_seconds)
407 .invoke_async(&mut conn)
408 .await?;
409
410 match status {
411 1 => Ok(Some(returned_id)),
412 2 => Ok(None),
413 -1 => anyhow::bail!("job_id already exists"),
414 _ => anyhow::bail!("unexpected enqueue status"),
415 }
416 }
417
418 pub async fn enqueue_with_debounce(
423 &self,
424 function_name: &str,
425 params: Map<String, Value>,
426 debounce_key: &str,
427 debounce_window: Duration,
428 options: EnqueueOptions,
429 ) -> Result<String> {
430 validate_name("function_name", function_name)?;
431 validate_name("debounce_key", debounce_key)?;
432 let queue_name = options
433 .queue_name
434 .unwrap_or_else(|| self.default_queue_name.clone());
435 validate_name("queue_name", &queue_name)?;
436 let queue_name = format_queue_key(&queue_name);
437 let job_id = options.job_id.unwrap_or_else(|| Uuid::new_v4().to_string());
438 let enqueue_time = options.enqueue_time.unwrap_or_else(Utc::now);
439 let scheduled_time = options.scheduled_time.unwrap_or_else(|| {
440 enqueue_time + chrono::Duration::from_std(debounce_window).unwrap_or_default()
441 });
442 let max_retries = options.max_retries.unwrap_or(self.default_max_retries);
443 let job_timeout_seconds = options
444 .job_timeout_seconds
445 .unwrap_or(self.default_job_timeout_seconds);
446 if job_timeout_seconds <= 0 {
447 anyhow::bail!("job_timeout_seconds must be positive");
448 }
449 let result_ttl_seconds = options
450 .result_ttl_seconds
451 .unwrap_or(self.default_result_ttl_seconds);
452
453 let ttl_seconds = debounce_window.as_secs_f64().ceil() as i64;
454 if ttl_seconds <= 0 {
455 anyhow::bail!("debounce_window must be positive");
456 }
457
458 let queue_key = queue_name.clone();
459 let debounce_key = format_debounce_key(debounce_key);
460 let score_ms = scheduled_time.timestamp_millis() as f64;
461
462 let trace_context = merge_trace_context(options.trace_context);
463 let correlation_context = extract_correlation_context(
464 ¶ms,
465 &self.correlation_mappings,
466 trace_context.as_ref(),
467 );
468
469 let job_params_json = serde_json::to_string(¶ms)?;
470 let trace_context_json = if let Some(trace_context) = trace_context {
471 serde_json::to_string(&trace_context)?
472 } else {
473 String::new()
474 };
475 let correlation_context_json = if let Some(correlation_context) = correlation_context {
476 serde_json::to_string(&correlation_context)?
477 } else {
478 String::new()
479 };
480
481 let mut conn = self.manager.clone();
482 let (status, returned_id): (i64, String) = self
483 .debounce_script
484 .key(queue_key)
485 .key(debounce_key)
486 .arg(JOB_KEY_PREFIX)
487 .arg(&job_id)
488 .arg(function_name)
489 .arg(&job_params_json)
490 .arg(enqueue_time.to_rfc3339())
491 .arg("PENDING")
492 .arg(0i64)
493 .arg(scheduled_time.to_rfc3339())
494 .arg(max_retries)
495 .arg(job_timeout_seconds)
496 .arg(result_ttl_seconds)
497 .arg(&queue_name)
498 .arg("null")
499 .arg(trace_context_json)
500 .arg(correlation_context_json)
501 .arg(score_ms)
502 .arg(ttl_seconds)
503 .invoke_async(&mut conn)
504 .await?;
505
506 match status {
507 1 | 0 => Ok(returned_id),
508 -1 => anyhow::bail!("job_id already exists"),
509 _ => anyhow::bail!("unexpected enqueue status"),
510 }
511 }
512
513 pub async fn get_job_status(&self, job_id: &str) -> Result<Option<JobResult>> {
515 let job_key = format!("{JOB_KEY_PREFIX}{job_id}");
516 let mut conn = self.manager.clone();
517 let data: HashMap<String, String> = conn.hgetall(&job_key).await?;
518
519 if data.is_empty() {
520 return Ok(None);
521 }
522
523 let status = data
524 .get("status")
525 .map(|s| JobStatus::from_str(s))
526 .unwrap_or(JobStatus::Unknown);
527
528 let result = data.get("result").and_then(|r| parse_result(r));
529 let last_error = data.get("last_error").cloned();
530
531 Ok(Some(JobResult {
532 status,
533 result,
534 last_error,
535 }))
536 }
537
538 pub async fn wait_for_completion(
540 &self,
541 job_id: &str,
542 timeout: Duration,
543 block_interval: Duration,
544 ) -> Result<Option<JobResult>> {
545 let deadline = tokio::time::Instant::now() + timeout;
546 let mut conn = self.manager.clone();
547 let event_key = format_job_events_key(job_id);
548 let mut stream_offset = "$".to_string();
549
550 loop {
551 if let Some(status) = self.get_job_status(job_id).await?
552 && matches!(status.status, JobStatus::Completed | JobStatus::Failed)
553 {
554 return Ok(Some(status));
555 }
556
557 let now = tokio::time::Instant::now();
558 if now >= deadline {
559 return Ok(None);
560 }
561
562 let remaining = deadline.saturating_duration_since(now);
563 let wait_for = if block_interval.is_zero() {
564 remaining
565 } else {
566 block_interval.min(remaining)
567 };
568 let wait_ms = wait_for.as_millis().clamp(1, i64::MAX as u128) as i64;
569
570 let response: redis::Value = redis::cmd("XREAD")
573 .arg("BLOCK")
574 .arg(wait_ms)
575 .arg("COUNT")
576 .arg(1)
577 .arg("STREAMS")
578 .arg(&event_key)
579 .arg(&stream_offset)
580 .query_async(&mut conn)
581 .await?;
582
583 if let Some(last_seen_id) = extract_latest_stream_entry_id(&response) {
584 stream_offset = last_seen_id;
585 }
586 }
587 }
588}
589
590struct HashMapInjector<'a>(&'a mut HashMap<String, String>);
591
592impl<'a> Injector for HashMapInjector<'a> {
593 fn set(&mut self, key: &str, value: String) {
594 self.0.entry(key.to_string()).or_insert(value);
595 }
596}
597
598fn merge_trace_context(
599 trace_context: Option<HashMap<String, String>>,
600) -> Option<HashMap<String, String>> {
601 let mut merged = trace_context.unwrap_or_default();
602 let current = tracing::Span::current().context();
603 global::get_text_map_propagator(|propagator| {
604 propagator.inject_context(¤t, &mut HashMapInjector(&mut merged));
605 });
606 if merged.is_empty() {
607 return None;
608 }
609 Some(merged)
610}
611
612fn extract_correlation_context(
613 params: &Map<String, Value>,
614 mappings: &HashMap<String, String>,
615 trace_context: Option<&HashMap<String, String>>,
616) -> Option<HashMap<String, String>> {
617 if mappings.is_empty() {
618 return None;
619 }
620
621 const MAX_CORRELATION_KEYS: usize = 16;
622 const MAX_CORRELATION_KEY_LEN: usize = 64;
623 const MAX_CORRELATION_VALUE_LEN: usize = 256;
624
625 let mut correlation = HashMap::new();
626
627 for (attr_name, path) in mappings {
628 if correlation.len() >= MAX_CORRELATION_KEYS {
629 break;
630 }
631 let key = attr_name.trim();
632 if key.is_empty() || key.len() > MAX_CORRELATION_KEY_LEN {
633 continue;
634 }
635 if let Some(existing) = trace_context.and_then(|ctx| ctx.get(key))
636 && !existing.is_empty()
637 {
638 correlation.insert(
639 key.to_string(),
640 truncate_utf8(existing, MAX_CORRELATION_VALUE_LEN),
641 );
642 continue;
643 }
644
645 let Some(raw) = lookup_value_in_params(params, path) else {
646 continue;
647 };
648 let Some(value) = scalar_value_to_string(raw) else {
649 continue;
650 };
651 correlation.insert(
652 key.to_string(),
653 truncate_utf8(&value, MAX_CORRELATION_VALUE_LEN),
654 );
655 }
656
657 if correlation.is_empty() {
658 return None;
659 }
660 Some(correlation)
661}
662
663fn lookup_value_in_params<'a>(params: &'a Map<String, Value>, path: &str) -> Option<&'a Value> {
664 let trimmed = path.trim();
665 let cleaned = trimmed.strip_prefix("params.").unwrap_or(trimmed);
666 if cleaned.is_empty() {
667 return None;
668 }
669 let mut parts = cleaned.split('.');
670 let first = parts.next()?;
671 let mut current = params.get(first)?;
672 for part in parts {
673 if part.is_empty() {
674 return None;
675 }
676 current = current.as_object()?.get(part)?;
677 }
678 Some(current)
679}
680
681fn scalar_value_to_string(value: &Value) -> Option<String> {
682 match value {
683 Value::String(v) if !v.is_empty() => Some(v.clone()),
684 Value::Bool(v) => Some(v.to_string()),
685 Value::Number(v) => Some(v.to_string()),
686 _ => None,
687 }
688}
689
690fn truncate_utf8(value: &str, max_len: usize) -> String {
691 if value.len() <= max_len {
692 return value.to_string();
693 }
694 let mut out = String::with_capacity(max_len);
695 for ch in value.chars() {
696 if out.len() + ch.len_utf8() > max_len {
697 break;
698 }
699 out.push(ch);
700 }
701 out
702}
703
704#[async_trait]
705impl ProducerHandle for Producer {
706 async fn enqueue(
707 &self,
708 function_name: &str,
709 params: Map<String, Value>,
710 options: EnqueueOptions,
711 ) -> Result<String> {
712 self.enqueue(function_name, params, options).await
713 }
714
715 async fn wait_for_completion(
716 &self,
717 job_id: &str,
718 timeout: Duration,
719 block_interval: Duration,
720 ) -> Result<Option<JobResult>> {
721 self.wait_for_completion(job_id, timeout, block_interval)
722 .await
723 }
724}
725
726fn format_queue_key(queue_name: &str) -> String {
727 if queue_name.starts_with(QUEUE_KEY_PREFIX) {
728 queue_name.to_string()
729 } else {
730 format!("{QUEUE_KEY_PREFIX}{queue_name}")
731 }
732}
733
734fn format_job_events_key(job_id: &str) -> String {
735 format!("{JOB_EVENTS_KEY_PREFIX}{job_id}")
736}
737
738fn extract_latest_stream_entry_id(value: &redis::Value) -> Option<String> {
739 let redis::Value::Array(streams) = value else {
740 return None;
741 };
742 let redis::Value::Array(stream) = streams.last()? else {
743 return None;
744 };
745 let redis::Value::Array(entries) = stream.get(1)? else {
746 return None;
747 };
748 let redis::Value::Array(last_entry) = entries.last()? else {
749 return None;
750 };
751 redis_value_to_string(last_entry.first()?)
752}
753
754fn redis_value_to_string(value: &redis::Value) -> Option<String> {
755 match value {
756 redis::Value::SimpleString(s) => Some(s.clone()),
757 redis::Value::BulkString(bytes) => Some(String::from_utf8_lossy(bytes).to_string()),
758 _ => None,
759 }
760}
761
762fn format_idempotency_key(key: &str) -> String {
763 format!("{IDEMPOTENCY_KEY_PREFIX}{key}")
764}
765
766fn format_rate_limit_key(key: &str) -> String {
767 if key.starts_with(RATE_LIMIT_KEY_PREFIX) {
768 key.to_string()
769 } else {
770 format!("{RATE_LIMIT_KEY_PREFIX}{key}")
771 }
772}
773
774fn format_debounce_key(key: &str) -> String {
775 if key.starts_with(DEBOUNCE_KEY_PREFIX) {
776 key.to_string()
777 } else {
778 format!("{DEBOUNCE_KEY_PREFIX}{key}")
779 }
780}
781
782fn validate_name(label: &str, value: &str) -> Result<()> {
783 if value.trim().is_empty() {
784 anyhow::bail!("{label} cannot be empty");
785 }
786 Ok(())
787}
788
789fn build_enqueue_script() -> Script {
790 let script = format!(
791 "-- KEYS: [1] = job_key, [2] = queue_key, [3] = idempotency_key (optional)\n\
792 -- ARGV: [1] = job_id, [2] = function_name, [3] = job_params\n\
793 -- [4] = enqueue_time, [5] = status, [6] = current_retries\n\
794 -- [7] = next_scheduled_run_time, [8] = max_retries\n\
795 -- [9] = job_timeout_seconds, [10] = result_ttl_seconds\n\
796 -- [11] = queue_name, [12] = result, [13] = trace_context_json\n\
797 -- [14] = correlation_context_json, [15] = score_ms, [16] = idempotency_ttl_seconds\n\
798 local idem_key = KEYS[3]\n\
799 if idem_key ~= '' then\n\
800 local existing = redis.call('GET', idem_key)\n\
801 if existing then\n\
802 local existing_job_key = '{job_prefix}' .. existing\n\
803 if redis.call('EXISTS', existing_job_key) == 1 then\n\
804 return {{0, existing}}\n\
805 end\n\
806 redis.call('DEL', idem_key)\n\
807 end\n\
808 end\n\
809 if redis.call('EXISTS', KEYS[1]) == 1 then\n\
810 return {{-1, ARGV[1]}}\n\
811 end\n\
812 if idem_key ~= '' then\n\
813 local ttl = tonumber(ARGV[16])\n\
814 local set_ok = nil\n\
815 if ttl and ttl > 0 then\n\
816 set_ok = redis.call('SET', idem_key, ARGV[1], 'NX', 'EX', ttl)\n\
817 else\n\
818 set_ok = redis.call('SET', idem_key, ARGV[1], 'NX')\n\
819 end\n\
820 if not set_ok then\n\
821 local winner = redis.call('GET', idem_key)\n\
822 if winner then\n\
823 return {{0, winner}}\n\
824 end\n\
825 end\n\
826 end\n\
827 redis.call('HSET', KEYS[1],\n\
828 'id', ARGV[1],\n\
829 'function_name', ARGV[2],\n\
830 'job_params', ARGV[3],\n\
831 'enqueue_time', ARGV[4],\n\
832 'status', ARGV[5],\n\
833 'current_retries', ARGV[6],\n\
834 'next_scheduled_run_time', ARGV[7],\n\
835 'max_retries', ARGV[8],\n\
836 'job_timeout_seconds', ARGV[9],\n\
837 'result_ttl_seconds', ARGV[10],\n\
838 'queue_name', ARGV[11],\n\
839 'result', ARGV[12])\n\
840 if ARGV[13] ~= '' then\n\
841 redis.call('HSET', KEYS[1], 'trace_context', ARGV[13])\n\
842 end\n\
843 if ARGV[14] ~= '' then\n\
844 redis.call('HSET', KEYS[1], 'correlation_context', ARGV[14])\n\
845 end\n\
846 redis.call('ZADD', KEYS[2], ARGV[15], ARGV[1])\n\
847 return {{1, ARGV[1]}}",
848 job_prefix = JOB_KEY_PREFIX
849 );
850 Script::new(&script)
851}
852
853fn build_rate_limit_script() -> Script {
854 let script = "\
855 -- KEYS: [1] = job_key, [2] = queue_key, [3] = rate_limit_key\n\
856 -- ARGV: [1] = job_id, [2] = function_name, [3] = job_params\n\
857 -- [4] = enqueue_time, [5] = status, [6] = current_retries\n\
858 -- [7] = next_scheduled_run_time, [8] = max_retries\n\
859 -- [9] = job_timeout_seconds, [10] = result_ttl_seconds\n\
860 -- [11] = queue_name, [12] = result, [13] = trace_context_json\n\
861 -- [14] = correlation_context_json, [15] = score_ms, [16] = rate_limit_ttl_seconds\n\
862 local rate_key = KEYS[3]\n\
863 local rate_set = false\n\
864 if rate_key ~= '' then\n\
865 local ttl = tonumber(ARGV[16])\n\
866 if not ttl or ttl <= 0 then\n\
867 return {-2, ARGV[1]}\n\
868 end\n\
869 local ok = redis.call('SET', rate_key, ARGV[1], 'NX', 'EX', ttl)\n\
870 if not ok then\n\
871 return {2, ''}\n\
872 end\n\
873 rate_set = true\n\
874 end\n\
875 if redis.call('EXISTS', KEYS[1]) == 1 then\n\
876 if rate_set then\n\
877 redis.call('DEL', rate_key)\n\
878 end\n\
879 return {-1, ARGV[1]}\n\
880 end\n\
881 redis.call('HSET', KEYS[1],\n\
882 'id', ARGV[1],\n\
883 'function_name', ARGV[2],\n\
884 'job_params', ARGV[3],\n\
885 'enqueue_time', ARGV[4],\n\
886 'status', ARGV[5],\n\
887 'current_retries', ARGV[6],\n\
888 'next_scheduled_run_time', ARGV[7],\n\
889 'max_retries', ARGV[8],\n\
890 'job_timeout_seconds', ARGV[9],\n\
891 'result_ttl_seconds', ARGV[10],\n\
892 'queue_name', ARGV[11],\n\
893 'result', ARGV[12])\n\
894 if ARGV[13] ~= '' then\n\
895 redis.call('HSET', KEYS[1], 'trace_context', ARGV[13])\n\
896 end\n\
897 if ARGV[14] ~= '' then\n\
898 redis.call('HSET', KEYS[1], 'correlation_context', ARGV[14])\n\
899 end\n\
900 redis.call('ZADD', KEYS[2], ARGV[15], ARGV[1])\n\
901 return {1, ARGV[1]}";
902 Script::new(script)
903}
904
905fn build_debounce_script() -> Script {
906 let script = "\
907 -- KEYS: [1] = queue_key, [2] = debounce_key\n\
908 -- ARGV: [1] = job_prefix, [2] = job_id, [3] = function_name, [4] = job_params\n\
909 -- [5] = enqueue_time, [6] = status, [7] = current_retries\n\
910 -- [8] = next_scheduled_run_time, [9] = max_retries\n\
911 -- [10] = job_timeout_seconds, [11] = result_ttl_seconds\n\
912 -- [12] = queue_name, [13] = result, [14] = trace_context_json\n\
913 -- [15] = correlation_context_json, [16] = score_ms, [17] = debounce_ttl_seconds\n\
914 local existing_id = redis.call('GET', KEYS[2])\n\
915 if existing_id then\n\
916 local existing_job_key = ARGV[1] .. existing_id\n\
917 if redis.call('EXISTS', existing_job_key) == 1 then\n\
918 local status = redis.call('HGET', existing_job_key, 'status')\n\
919 if status == 'PENDING' then\n\
920 redis.call('HSET', existing_job_key,\n\
921 'function_name', ARGV[3],\n\
922 'job_params', ARGV[4],\n\
923 'next_scheduled_run_time', ARGV[8],\n\
924 'max_retries', ARGV[9],\n\
925 'job_timeout_seconds', ARGV[10],\n\
926 'result_ttl_seconds', ARGV[11],\n\
927 'queue_name', ARGV[12])\n\
928 if ARGV[14] ~= '' then\n\
929 redis.call('HSET', existing_job_key, 'trace_context', ARGV[14])\n\
930 end\n\
931 if ARGV[15] ~= '' then\n\
932 redis.call('HSET', existing_job_key, 'correlation_context', ARGV[15])\n\
933 else\n\
934 redis.call('HDEL', existing_job_key, 'correlation_context')\n\
935 end\n\
936 redis.call('ZADD', KEYS[1], ARGV[16], existing_id)\n\
937 local ttl = tonumber(ARGV[17])\n\
938 if ttl and ttl > 0 then\n\
939 redis.call('EXPIRE', KEYS[2], ttl)\n\
940 end\n\
941 return {0, existing_id}\n\
942 end\n\
943 end\n\
944 redis.call('DEL', KEYS[2])\n\
945 end\n\
946 local job_key = ARGV[1] .. ARGV[2]\n\
947 if redis.call('EXISTS', job_key) == 1 then\n\
948 return {-1, ARGV[2]}\n\
949 end\n\
950 local ttl = tonumber(ARGV[17])\n\
951 if ttl and ttl > 0 then\n\
952 local ok = redis.call('SET', KEYS[2], ARGV[2], 'NX', 'EX', ttl)\n\
953 if not ok then\n\
954 local other = redis.call('GET', KEYS[2])\n\
955 if other then\n\
956 return {0, other}\n\
957 end\n\
958 return {2, ''}\n\
959 end\n\
960 else\n\
961 redis.call('SET', KEYS[2], ARGV[2], 'NX')\n\
962 end\n\
963 redis.call('HSET', job_key,\n\
964 'id', ARGV[2],\n\
965 'function_name', ARGV[3],\n\
966 'job_params', ARGV[4],\n\
967 'enqueue_time', ARGV[5],\n\
968 'status', ARGV[6],\n\
969 'current_retries', ARGV[7],\n\
970 'next_scheduled_run_time', ARGV[8],\n\
971 'max_retries', ARGV[9],\n\
972 'job_timeout_seconds', ARGV[10],\n\
973 'result_ttl_seconds', ARGV[11],\n\
974 'queue_name', ARGV[12],\n\
975 'result', ARGV[13])\n\
976 if ARGV[14] ~= '' then\n\
977 redis.call('HSET', job_key, 'trace_context', ARGV[14])\n\
978 end\n\
979 if ARGV[15] ~= '' then\n\
980 redis.call('HSET', job_key, 'correlation_context', ARGV[15])\n\
981 end\n\
982 redis.call('ZADD', KEYS[1], ARGV[16], ARGV[2])\n\
983 return {1, ARGV[2]}";
984 Script::new(script)
985}
986
987fn parse_result(result: &str) -> Option<Value> {
988 if result.is_empty() || result == "null" {
989 return None;
990 }
991 serde_json::from_str(result).ok()
992}
993
994#[cfg(test)]
995#[path = "lib/tests.rs"]
996mod tests;