use std::time::Duration;
use serde::Serialize;
use serde_json::Value;
use crate::client::{extract_job, FlashQ};
use crate::constants::CLIENT_TIMEOUT_BUFFER_MS;
use crate::errors::{check_response, Result};
use crate::types::*;
impl FlashQ {
pub async fn get_job(&self, job_id: u64) -> Result<Option<JobWithState>> {
let cmd = serde_json::json!({"cmd": "GETJOB", "id": job_id});
let resp = self.send(cmd).await?;
check_response(&resp)?;
let job = extract_job(&resp, "job")?;
let state = resp
.get("state")
.and_then(|v| serde_json::from_value::<JobState>(v.clone()).ok());
match (job, state) {
(Some(job), Some(state)) => Ok(Some(JobWithState { job, state })),
_ => Ok(None),
}
}
pub async fn get_state(&self, job_id: u64) -> Result<Option<JobState>> {
let cmd = serde_json::json!({"cmd": "GETSTATE", "id": job_id});
let resp = self.send(cmd).await?;
check_response(&resp)?;
Ok(resp
.get("state")
.and_then(|v| serde_json::from_value::<JobState>(v.clone()).ok()))
}
pub async fn get_result(&self, job_id: u64) -> Result<Option<Value>> {
let cmd = serde_json::json!({"cmd": "GETRESULT", "id": job_id});
let resp = self.send(cmd).await?;
check_response(&resp)?;
Ok(resp.get("result").cloned().filter(|v| !v.is_null()))
}
pub async fn get_job_by_custom_id(&self, custom_id: &str) -> Result<Option<JobWithState>> {
let cmd = serde_json::json!({"cmd": "GETJOBBYCUSTOMID", "job_id": custom_id});
let resp = self.send(cmd).await?;
check_response(&resp)?;
let job = extract_job(&resp, "job")?;
let state = resp
.get("state")
.and_then(|v| serde_json::from_value::<JobState>(v.clone()).ok());
match (job, state) {
(Some(job), Some(state)) => Ok(Some(JobWithState { job, state })),
_ => Ok(None),
}
}
pub async fn get_jobs_batch(&self, job_ids: Vec<u64>) -> Result<Vec<JobWithState>> {
let cmd = serde_json::json!({"cmd": "GETJOBSBATCH", "ids": job_ids});
let resp = self.send(cmd).await?;
check_response(&resp)?;
let jobs = resp
.get("jobs")
.and_then(|v| v.as_array())
.map(|arr| {
arr.iter()
.filter_map(|v| serde_json::from_value::<JobWithState>(v.clone()).ok())
.collect()
})
.unwrap_or_default();
Ok(jobs)
}
pub async fn get_jobs(
&self,
queue: Option<&str>,
state: Option<JobState>,
limit: Option<u32>,
offset: Option<u32>,
) -> Result<JobsResult> {
let mut cmd = serde_json::json!({"cmd": "GETJOBS"});
let obj = cmd.as_object_mut().unwrap();
if let Some(q) = queue {
obj.insert("queue".into(), Value::String(q.to_string()));
}
if let Some(s) = state {
obj.insert("state".into(), serde_json::to_value(s)?);
}
if let Some(l) = limit {
obj.insert("limit".into(), Value::Number(l.into()));
}
if let Some(o) = offset {
obj.insert("offset".into(), Value::Number(o.into()));
}
let resp = self.send(cmd).await?;
check_response(&resp)?;
let jobs = resp
.get("jobs")
.and_then(|v| v.as_array())
.map(|arr| {
arr.iter()
.filter_map(|v| serde_json::from_value::<JobWithState>(v.clone()).ok())
.collect()
})
.unwrap_or_default();
let total = resp.get("total").and_then(|v| v.as_u64()).unwrap_or(0);
Ok(JobsResult { jobs, total })
}
pub async fn get_job_counts(&self, queue: &str) -> Result<JobCounts> {
let cmd = serde_json::json!({"cmd": "GETJOBCOUNTS", "queue": queue});
let resp = self.send(cmd).await?;
check_response(&resp)?;
Ok(JobCounts {
waiting: resp.get("waiting").and_then(|v| v.as_u64()).unwrap_or(0),
active: resp.get("active").and_then(|v| v.as_u64()).unwrap_or(0),
delayed: resp.get("delayed").and_then(|v| v.as_u64()).unwrap_or(0),
completed: resp.get("completed").and_then(|v| v.as_u64()).unwrap_or(0),
failed: resp.get("failed").and_then(|v| v.as_u64()).unwrap_or(0),
})
}
pub async fn count(&self, queue: &str) -> Result<u64> {
let cmd = serde_json::json!({"cmd": "COUNT", "queue": queue});
let resp = self.send(cmd).await?;
check_response(&resp)?;
Ok(resp.get("count").and_then(|v| v.as_u64()).unwrap_or(0))
}
pub async fn finished(&self, job_id: u64, timeout: Option<Duration>) -> Result<Option<Value>> {
let timeout_ms = timeout.map(|d| d.as_millis() as u64).unwrap_or(30_000);
let cmd = serde_json::json!({"cmd": "WAITJOB", "id": job_id, "timeout": timeout_ms});
let client_timeout = Duration::from_millis(timeout_ms + CLIENT_TIMEOUT_BUFFER_MS);
let resp = self.send_with_timeout(cmd, client_timeout).await?;
check_response(&resp)?;
Ok(resp.get("result").cloned().filter(|v| !v.is_null()))
}
pub async fn cancel(&self, job_id: u64) -> Result<bool> {
let cmd = serde_json::json!({"cmd": "CANCEL", "id": job_id});
let resp = self.send(cmd).await?;
check_response(&resp)?;
Ok(true)
}
pub async fn progress(&self, job_id: u64, progress: u8, message: Option<&str>) -> Result<bool> {
let mut cmd = serde_json::json!({
"cmd": "PROGRESS",
"id": job_id,
"progress": progress,
});
if let Some(msg) = message {
cmd.as_object_mut()
.unwrap()
.insert("message".into(), Value::String(msg.to_string()));
}
let resp = self.send(cmd).await?;
check_response(&resp)?;
Ok(true)
}
pub async fn get_progress(&self, job_id: u64) -> Result<ProgressInfo> {
let cmd = serde_json::json!({"cmd": "GETPROGRESS", "id": job_id});
let resp = self.send(cmd).await?;
check_response(&resp)?;
Ok(ProgressInfo {
progress: resp.get("progress").and_then(|v| v.as_u64()).unwrap_or(0) as u8,
message: resp
.get("message")
.and_then(|v| v.as_str())
.map(|s| s.to_string()),
})
}
pub async fn update<T: Serialize>(&self, job_id: u64, data: T) -> Result<bool> {
let cmd = serde_json::json!({
"cmd": "UPDATEJOB",
"id": job_id,
"data": serde_json::to_value(data)?,
});
let resp = self.send(cmd).await?;
check_response(&resp)?;
Ok(true)
}
pub async fn change_priority(&self, job_id: u64, priority: i32) -> Result<bool> {
let cmd = serde_json::json!({
"cmd": "CHANGEPRIORITY",
"id": job_id,
"priority": priority,
});
let resp = self.send(cmd).await?;
check_response(&resp)?;
Ok(true)
}
pub async fn move_to_delayed(&self, job_id: u64, delay: u64) -> Result<bool> {
let cmd = serde_json::json!({
"cmd": "MOVETODELAYED",
"id": job_id,
"delay": delay,
});
let resp = self.send(cmd).await?;
check_response(&resp)?;
Ok(true)
}
pub async fn promote(&self, job_id: u64) -> Result<bool> {
let cmd = serde_json::json!({"cmd": "PROMOTE", "id": job_id});
let resp = self.send(cmd).await?;
check_response(&resp)?;
Ok(true)
}
pub async fn discard(&self, job_id: u64) -> Result<bool> {
let cmd = serde_json::json!({"cmd": "DISCARD", "id": job_id});
let resp = self.send(cmd).await?;
check_response(&resp)?;
Ok(true)
}
pub async fn heartbeat(&self, job_id: u64) -> Result<bool> {
let cmd = serde_json::json!({"cmd": "HEARTBEAT", "id": job_id});
let resp = self.send(cmd).await?;
check_response(&resp)?;
Ok(true)
}
pub async fn log(&self, job_id: u64, message: &str, level: Option<&str>) -> Result<bool> {
let cmd = serde_json::json!({
"cmd": "LOG",
"id": job_id,
"message": message,
"level": level.unwrap_or("info"),
});
let resp = self.send(cmd).await?;
check_response(&resp)?;
Ok(true)
}
pub async fn get_logs(&self, job_id: u64) -> Result<Vec<LogEntry>> {
let cmd = serde_json::json!({"cmd": "GETLOGS", "id": job_id});
let resp = self.send(cmd).await?;
check_response(&resp)?;
let logs = resp
.get("logs")
.and_then(|v| v.as_array())
.map(|arr| {
arr.iter()
.filter_map(|v| serde_json::from_value::<LogEntry>(v.clone()).ok())
.collect()
})
.unwrap_or_default();
Ok(logs)
}
pub async fn get_children(&self, job_id: u64) -> Result<Vec<u64>> {
let cmd = serde_json::json!({"cmd": "GETCHILDREN", "parent_id": job_id});
let resp = self.send(cmd).await?;
check_response(&resp)?;
Ok(resp
.get("ids")
.and_then(|v| v.as_array())
.map(|arr| arr.iter().filter_map(|v| v.as_u64()).collect())
.unwrap_or_default())
}
pub async fn partial(&self, job_id: u64, data: Value, index: Option<u32>) -> Result<bool> {
let mut cmd = serde_json::json!({
"cmd": "PARTIAL",
"id": job_id,
"data": data,
});
if let Some(idx) = index {
cmd.as_object_mut()
.unwrap()
.insert("index".into(), Value::Number(idx.into()));
}
let resp = self.send(cmd).await?;
check_response(&resp)?;
Ok(true)
}
}