use std::sync::Arc;
use crate::cluster::version_parser::Version;
use crate::cluster::Cluster;
use crate::errors::{Error, Result};
use crate::task::{Status, Task};
use crate::AdminPolicy;
#[derive(Debug, Clone)]
pub struct ExecuteTask {
cluster: Arc<Cluster>,
task_id: u64,
scan: bool,
}
impl ExecuteTask {
pub const fn new(cluster: Arc<Cluster>, task_id: u64, scan: bool) -> Self {
ExecuteTask {
cluster,
task_id,
scan,
}
}
pub const fn task_id(&self) -> u64 {
self.task_id
}
fn build_command(version: &Version, scan: bool, task_id: u64) -> String {
let id_key = if version >= &Version::new(8, 1, 0, 0) {
"id"
} else {
"trid"
};
if version.supports_partition_query() {
format!("query-show:{id_key}={task_id}")
} else if version.supports_query_show() {
let module = if scan { "scan" } else { "query" };
format!("{module}-show:{id_key}={task_id}")
} else {
let module = if scan { "scan" } else { "query" };
format!("jobs:module={module};cmd=get-job;{id_key}={task_id}")
}
}
fn parse_status(response: &str) -> Result<Status> {
if response.contains("ERROR:2") {
return Ok(Status::Complete);
}
if response.starts_with("ERROR") {
return Err(Error::BadResponse(format!(
"Query execute failed: {response}"
)));
}
if let Some(status_idx) = response.find("status=") {
let status_start = status_idx + "status=".len();
let status_str = &response[status_start..];
let status_end = status_str.find(':').unwrap_or(status_str.len());
let status_val = &status_str[..status_end];
if status_val.to_lowercase().starts_with("done") {
return Ok(Status::Complete);
}
return Ok(Status::InProgress);
}
Ok(Status::InProgress)
}
}
#[async_trait::async_trait]
impl Task for ExecuteTask {
async fn query_status(&self) -> Result<Status> {
let nodes = self.cluster.nodes();
if nodes.is_empty() {
return Err(Error::Connection("No connected node".to_string()));
}
let admin_policy = AdminPolicy { timeout: 3_000 };
for node in &nodes {
let command = Self::build_command(node.version(), self.scan, self.task_id);
let response = node.info(&admin_policy, &[&command[..]]).await?;
if let Some(resp) = response.get(&command) {
match Self::parse_status(resp)? {
Status::Complete => {}
other => return Ok(other),
}
} else {
return Ok(Status::InProgress);
}
}
Ok(Status::Complete)
}
}