use serde::Serialize;
use serde_json::Value;
use crate::client::{extract_jobs, FlashQ};
use crate::errors::{check_response, Result};
use crate::types::*;
use crate::validation::validate_queue_name;
impl FlashQ {
/// Sends a `PAUSE` command for `queue`.
///
/// Returns `Ok(true)` whenever the server reply passes `check_response`.
///
/// # Errors
/// Fails if `validate_queue_name` rejects `queue`, if sending the command
/// fails, or if `check_response` rejects the reply.
pub async fn pause(&self, queue: &str) -> Result<bool> {
    validate_queue_name(queue)?;
    let reply = self
        .send(serde_json::json!({"cmd": "PAUSE", "queue": queue}))
        .await?;
    check_response(&reply)?;
    Ok(true)
}
/// Sends a `RESUME` command for `queue`.
///
/// Returns `Ok(true)` whenever the server reply passes `check_response`.
///
/// # Errors
/// Fails if `validate_queue_name` rejects `queue`, if sending the command
/// fails, or if `check_response` rejects the reply.
pub async fn resume(&self, queue: &str) -> Result<bool> {
    validate_queue_name(queue)?;
    let reply = self
        .send(serde_json::json!({"cmd": "RESUME", "queue": queue}))
        .await?;
    check_response(&reply)?;
    Ok(true)
}
pub async fn is_paused(&self, queue: &str) -> Result<bool> {
validate_queue_name(queue)?;
let cmd = serde_json::json!({"cmd": "ISPAUSED", "queue": queue});
let resp = self.send(cmd).await?;
check_response(&resp)?;
Ok(resp
.get("paused")
.and_then(|v| v.as_bool())
.unwrap_or(false))
}
pub async fn drain(&self, queue: &str) -> Result<u64> {
validate_queue_name(queue)?;
let cmd = serde_json::json!({"cmd": "DRAIN", "queue": queue});
let resp = self.send(cmd).await?;
check_response(&resp)?;
Ok(resp.get("count").and_then(|v| v.as_u64()).unwrap_or(0))
}
/// Sends an `OBLITERATE` command for `queue`.
///
/// Returns `Ok(true)` whenever the server reply passes `check_response`.
///
/// # Errors
/// Fails if `validate_queue_name` rejects `queue`, if sending the command
/// fails, or if `check_response` rejects the reply.
pub async fn obliterate(&self, queue: &str) -> Result<bool> {
    validate_queue_name(queue)?;
    let reply = self
        .send(serde_json::json!({"cmd": "OBLITERATE", "queue": queue}))
        .await?;
    check_response(&reply)?;
    Ok(true)
}
/// Sends a `CLEAN` command for `queue` and returns the reply's `count` field.
///
/// `grace` and `state` are forwarded verbatim as the `grace` and `state`
/// fields of the command. `limit` is only included in the command when it
/// is `Some`. A missing or non-`u64` `count` in the reply is reported as `0`.
///
/// # Errors
/// Fails if `validate_queue_name` rejects `queue`, if sending the command
/// fails, or if `check_response` rejects the reply.
pub async fn clean(
    &self,
    queue: &str,
    grace: u64,
    state: &str,
    limit: Option<u32>,
) -> Result<u64> {
    validate_queue_name(queue)?;
    let mut request = serde_json::json!({
        "cmd": "CLEAN",
        "queue": queue,
        "grace": grace,
        "state": state,
    });
    if let Some(max) = limit {
        // `request` is a JSON object, so assigning through a string index
        // inserts the key.
        request["limit"] = Value::from(max);
    }
    let reply = self.send(request).await?;
    check_response(&reply)?;
    let count = reply.get("count").and_then(Value::as_u64).unwrap_or(0);
    Ok(count)
}
pub async fn list_queues(&self) -> Result<Vec<QueueInfo>> {
let cmd = serde_json::json!({"cmd": "LISTQUEUES"});
let resp = self.send(cmd).await?;
check_response(&resp)?;
let queues = resp
.get("queues")
.and_then(|v| v.as_array())
.map(|arr| {
arr.iter()
.filter_map(|v| serde_json::from_value::<QueueInfo>(v.clone()).ok())
.collect()
})
.unwrap_or_default();
Ok(queues)
}
/// Sends a `DLQ` command for `queue` and returns the jobs that
/// `extract_jobs` reads from the reply's `jobs` field.
///
/// `count` is only included in the command when it is `Some`.
///
/// # Errors
/// Fails if `validate_queue_name` rejects `queue`, if sending the command
/// fails, or if `check_response` rejects the reply.
pub async fn get_dlq(&self, queue: &str, count: Option<u32>) -> Result<Vec<Job>> {
    validate_queue_name(queue)?;
    let mut request = serde_json::json!({"cmd": "DLQ", "queue": queue});
    if let Some(max) = count {
        // `request` is a JSON object, so assigning through a string index
        // inserts the key.
        request["count"] = Value::from(max);
    }
    let reply = self.send(request).await?;
    check_response(&reply)?;
    let jobs = extract_jobs(&reply, "jobs");
    Ok(jobs)
}
/// Sends a `RETRYDLQ` command for `queue` and returns the reply's `count` field.
///
/// When `job_id` is `Some`, it is sent as the command's `id` field;
/// otherwise no `id` is sent. A missing or non-`u64` `count` in the reply
/// is reported as `0`.
///
/// # Errors
/// Fails if `validate_queue_name` rejects `queue`, if sending the command
/// fails, or if `check_response` rejects the reply.
pub async fn retry_dlq(&self, queue: &str, job_id: Option<u64>) -> Result<u64> {
    validate_queue_name(queue)?;
    let mut request = serde_json::json!({"cmd": "RETRYDLQ", "queue": queue});
    if let Some(target) = job_id {
        // `request` is a JSON object, so assigning through a string index
        // inserts the key.
        request["id"] = Value::from(target);
    }
    let reply = self.send(request).await?;
    check_response(&reply)?;
    let count = reply.get("count").and_then(Value::as_u64).unwrap_or(0);
    Ok(count)
}
pub async fn purge_dlq(&self, queue: &str) -> Result<u64> {
validate_queue_name(queue)?;
let cmd = serde_json::json!({"cmd": "PURGEDLQ", "queue": queue});
let resp = self.send(cmd).await?;
check_response(&resp)?;
Ok(resp.get("count").and_then(|v| v.as_u64()).unwrap_or(0))
}
/// Sends a `RATELIMIT` command for `queue`, passing `limit` as the
/// command's `limit` field.
///
/// Returns `Ok(true)` whenever the server reply passes `check_response`.
///
/// # Errors
/// Fails if `validate_queue_name` rejects `queue`, if sending the command
/// fails, or if `check_response` rejects the reply.
pub async fn set_rate_limit(&self, queue: &str, limit: u32) -> Result<bool> {
    validate_queue_name(queue)?;
    let request = serde_json::json!({
        "cmd": "RATELIMIT",
        "queue": queue,
        "limit": limit,
    });
    let reply = self.send(request).await?;
    check_response(&reply)?;
    Ok(true)
}
/// Sends a `RATELIMITCLEAR` command for `queue`.
///
/// Returns `Ok(true)` whenever the server reply passes `check_response`.
///
/// # Errors
/// Fails if `validate_queue_name` rejects `queue`, if sending the command
/// fails, or if `check_response` rejects the reply.
pub async fn clear_rate_limit(&self, queue: &str) -> Result<bool> {
    validate_queue_name(queue)?;
    let reply = self
        .send(serde_json::json!({"cmd": "RATELIMITCLEAR", "queue": queue}))
        .await?;
    check_response(&reply)?;
    Ok(true)
}
/// Sends a `SETCONCURRENCY` command for `queue`, passing `limit` as the
/// command's `limit` field.
///
/// Returns `Ok(true)` whenever the server reply passes `check_response`.
///
/// # Errors
/// Fails if `validate_queue_name` rejects `queue`, if sending the command
/// fails, or if `check_response` rejects the reply.
pub async fn set_concurrency(&self, queue: &str, limit: u32) -> Result<bool> {
    validate_queue_name(queue)?;
    let request = serde_json::json!({
        "cmd": "SETCONCURRENCY",
        "queue": queue,
        "limit": limit,
    });
    let reply = self.send(request).await?;
    check_response(&reply)?;
    Ok(true)
}
/// Sends a `CLEARCONCURRENCY` command for `queue`.
///
/// Returns `Ok(true)` whenever the server reply passes `check_response`.
///
/// # Errors
/// Fails if `validate_queue_name` rejects `queue`, if sending the command
/// fails, or if `check_response` rejects the reply.
pub async fn clear_concurrency(&self, queue: &str) -> Result<bool> {
    validate_queue_name(queue)?;
    let reply = self
        .send(serde_json::json!({"cmd": "CLEARCONCURRENCY", "queue": queue}))
        .await?;
    check_response(&reply)?;
    Ok(true)
}
/// Sends a `CRON` command that registers the entry `name` using `opts`.
///
/// `opts.queue` and `opts.data` are always sent. `schedule`, `repeat_every`,
/// `priority` and `limit` are each added to the command only when the
/// corresponding option is `Some`. Only `opts.queue` is validated; `name`
/// is forwarded as given.
///
/// Returns `Ok(true)` whenever the server reply passes `check_response`.
///
/// # Errors
/// Fails if `validate_queue_name` rejects `opts.queue`, if sending the
/// command fails, or if `check_response` rejects the reply.
pub async fn add_cron(&self, name: &str, opts: CronOptions) -> Result<bool> {
    validate_queue_name(&opts.queue)?;
    let mut request = serde_json::json!({
        "cmd": "CRON",
        "name": name,
        "queue": opts.queue,
        "data": opts.data,
    });
    // `request` is a JSON object, so assigning through a string index inserts
    // the key. Optional fields are appended in the same order as before.
    if let Some(schedule) = opts.schedule {
        request["schedule"] = Value::String(schedule);
    }
    if let Some(interval) = opts.repeat_every {
        request["repeat_every"] = Value::Number(interval.into());
    }
    if let Some(priority) = opts.priority {
        request["priority"] = Value::Number(priority.into());
    }
    if let Some(max_runs) = opts.limit {
        request["limit"] = Value::Number(max_runs.into());
    }
    let reply = self.send(request).await?;
    check_response(&reply)?;
    Ok(true)
}
/// Sends a `CRONDELETE` command for the cron entry `name`.
///
/// `name` is forwarded without client-side validation. Returns `Ok(true)`
/// whenever the server reply passes `check_response`.
///
/// # Errors
/// Fails if sending the command fails or if `check_response` rejects the
/// reply.
pub async fn delete_cron(&self, name: &str) -> Result<bool> {
    let reply = self
        .send(serde_json::json!({"cmd": "CRONDELETE", "name": name}))
        .await?;
    check_response(&reply)?;
    Ok(true)
}
pub async fn list_crons(&self) -> Result<Vec<CronJob>> {
let cmd = serde_json::json!({"cmd": "CRONLIST"});
let resp = self.send(cmd).await?;
check_response(&resp)?;
let crons = resp
.get("crons")
.and_then(|v| v.as_array())
.map(|arr| {
arr.iter()
.filter_map(|v| serde_json::from_value::<CronJob>(v.clone()).ok())
.collect()
})
.unwrap_or_default();
Ok(crons)
}
/// Sends a `FLOW` command carrying `data` and `children` for `queue`.
///
/// `data` and `children` are serialized to JSON before sending. From
/// `opts`, only `priority` is forwarded, and only when it is `Some`.
///
/// The returned `FlowResult` is built from the reply's `parent_id`
/// (`0` when missing or not a `u64`) and `children_ids` (empty when missing
/// or not an array; non-`u64` elements are skipped).
///
/// # Errors
/// Fails if `validate_queue_name` rejects `queue`, if `data` or `children`
/// cannot be serialized, if sending the command fails, or if
/// `check_response` rejects the reply.
pub async fn push_flow<T: Serialize>(
    &self,
    queue: &str,
    data: T,
    children: Vec<FlowChild>,
    opts: Option<PushOptions>,
) -> Result<FlowResult> {
    validate_queue_name(queue)?;
    // Serialize in the same order as before: payload first, then children.
    let payload = serde_json::to_value(data)?;
    let child_specs = serde_json::to_value(&children)?;
    let mut request = serde_json::json!({
        "cmd": "FLOW",
        "queue": queue,
        "data": payload,
        "children": child_specs,
    });
    if let Some(priority) = opts.and_then(|o| o.priority) {
        request["priority"] = Value::Number(priority.into());
    }
    let reply = self.send(request).await?;
    check_response(&reply)?;
    let parent_id = reply
        .get("parent_id")
        .and_then(Value::as_u64)
        .unwrap_or(0);
    let children_ids = match reply.get("children_ids").and_then(Value::as_array) {
        Some(ids) => ids.iter().filter_map(Value::as_u64).collect(),
        None => Default::default(),
    };
    Ok(FlowResult {
        parent_id,
        children_ids,
    })
}
pub async fn stats(&self) -> Result<Stats> {
let cmd = serde_json::json!({"cmd": "STATS"});
let resp = self.send(cmd).await?;
check_response(&resp)?;
Ok(Stats {
queued: resp.get("queued").and_then(|v| v.as_u64()).unwrap_or(0),
processing: resp.get("processing").and_then(|v| v.as_u64()).unwrap_or(0),
delayed: resp.get("delayed").and_then(|v| v.as_u64()).unwrap_or(0),
dlq: resp.get("dlq").and_then(|v| v.as_u64()).unwrap_or(0),
})
}
pub async fn metrics(&self) -> Result<Metrics> {
let cmd = serde_json::json!({"cmd": "METRICS"});
let resp = self.send(cmd).await?;
check_response(&resp)?;
Ok(Metrics {
total_pushed: resp
.get("total_pushed")
.and_then(|v| v.as_u64())
.unwrap_or(0),
total_completed: resp
.get("total_completed")
.and_then(|v| v.as_u64())
.unwrap_or(0),
total_failed: resp
.get("total_failed")
.and_then(|v| v.as_u64())
.unwrap_or(0),
jobs_per_second: resp
.get("jobs_per_second")
.and_then(|v| v.as_f64())
.unwrap_or(0.0),
avg_latency_ms: resp
.get("avg_latency_ms")
.and_then(|v| v.as_f64())
.unwrap_or(0.0),
queues: resp
.get("queues")
.and_then(|v| v.as_array())
.map(|arr| {
arr.iter()
.filter_map(|v| serde_json::from_value::<QueueMetrics>(v.clone()).ok())
.collect()
})
.unwrap_or_default(),
})
}
}