use crate::{ArcGISClient, BuilderError, Result};
use serde::Deserialize;
use serde_json::Value;
use std::collections::HashMap;
use tracing::instrument;
use super::types::{GPExecuteResult, GPJobInfo, GPMessage};
#[derive(Clone)]
pub struct GeoprocessingServiceClient<'a> {
url: String,
client: &'a ArcGISClient,
}
impl<'a> GeoprocessingServiceClient<'a> {
pub fn new(url: impl Into<String>, client: &'a ArcGISClient) -> Self {
GeoprocessingServiceClient {
url: url.into(),
client,
}
}
#[instrument(skip(self, parameters), fields(param_count = parameters.len()))]
pub async fn execute(&self, parameters: HashMap<String, Value>) -> Result<GPExecuteResult> {
tracing::debug!("Executing synchronous geoprocessing task");
let execute_url = format!("{}/execute", self.url);
let mut form: Vec<(&str, String)> = Vec::new();
form.push(("f", "json".to_string()));
for (key, value) in parameters.iter() {
let value_str = match value {
Value::String(s) => s.clone(),
_ => serde_json::to_string(value)?,
};
form.push((key.as_str(), value_str));
}
let response = self
.client
.http()
.post(&execute_url)
.form(&form)
.send()
.await?;
let result: GPExecuteResult = response.json().await?;
tracing::debug!(
result_count = result.results().len(),
message_count = result.messages().len(),
"Task execution completed"
);
Ok(result)
}
#[instrument(skip(self, parameters), fields(param_count = parameters.len()))]
pub async fn submit_job(&self, parameters: HashMap<String, Value>) -> Result<GPJobInfo> {
tracing::debug!("Submitting asynchronous geoprocessing job");
let submit_url = format!("{}/submitJob", self.url);
tracing::debug!(url = %submit_url, "Submitting job to URL");
let mut form: Vec<(&str, String)> = Vec::new();
form.push(("f", "json".to_string()));
if let Some(token) = self.client.get_token_if_required().await? {
tracing::debug!("Adding authentication token to request");
form.push(("token", token));
}
for (key, value) in parameters.iter() {
let value_str = match value {
Value::String(s) => s.clone(),
_ => serde_json::to_string(value)?,
};
let display_value = if value_str.len() > 100 {
format!("{}... ({} chars)", &value_str[..100], value_str.len())
} else {
value_str.clone()
};
tracing::debug!(param = %key, value = %display_value, "Adding parameter");
form.push((key.as_str(), value_str));
}
let response = self
.client
.http()
.post(&submit_url)
.form(&form)
.send()
.await?;
let status = response.status();
tracing::debug!(status_code = %status, "Received response");
let response_text = response.text().await?;
tracing::debug!(response_body = %response_text, "Raw response body");
let job_info: GPJobInfo = serde_json::from_str(&response_text).map_err(|e| {
tracing::error!(
parse_error = %e,
response = %response_text,
"Failed to parse job submission response"
);
e
})?;
tracing::info!(
job_id = %job_info.job_id(),
status = ?job_info.job_status(),
"Job submitted"
);
Ok(job_info)
}
#[instrument(skip(self), fields(job_id))]
pub async fn get_job_status(&self, job_id: &str) -> Result<GPJobInfo> {
tracing::debug!("Getting job status");
let status_url = format!("{}/jobs/{}", self.url, job_id);
tracing::debug!(url = %status_url, "Getting job status from URL");
let mut query_params = vec![("f", "json")];
let token_string;
if let Some(token) = self.client.get_token_if_required().await? {
tracing::debug!("Adding authentication token to status request");
token_string = token;
query_params.push(("token", token_string.as_str()));
}
let response = self
.client
.http()
.get(&status_url)
.query(&query_params)
.send()
.await?;
let status = response.status();
tracing::debug!(status_code = %status, "Received status response");
let response_text = response.text().await?;
tracing::debug!(response_body = %response_text, "Raw status response body");
let job_info: GPJobInfo = serde_json::from_str(&response_text).map_err(|e| {
tracing::error!(
parse_error = %e,
response = %response_text,
"Failed to parse job status response"
);
e
})?;
tracing::debug!(status = ?job_info.job_status(), "Job status retrieved");
Ok(job_info)
}
#[instrument(skip(self), fields(job_id))]
pub async fn get_job_result(&self, job_id: &str) -> Result<GPJobInfo> {
tracing::debug!("Getting job result");
let job_info = self.get_job_status(job_id).await?;
if !job_info.job_status().is_terminal() {
tracing::warn!(
status = ?job_info.job_status(),
"Job is not in terminal state"
);
}
Ok(job_info)
}
#[instrument(skip(self), fields(job_id))]
pub async fn cancel_job(&self, job_id: &str) -> Result<GPJobInfo> {
tracing::debug!("Cancelling job");
let cancel_url = format!("{}/jobs/{}/cancel", self.url, job_id);
let response = self
.client
.http()
.post(&cancel_url)
.form(&[("f", "json")])
.send()
.await?;
let job_info: GPJobInfo = response.json().await?;
tracing::info!(status = ?job_info.job_status(), "Job cancel requested");
Ok(job_info)
}
#[instrument(skip(self), fields(job_id))]
pub async fn get_job_messages(&self, job_id: &str) -> Result<Vec<GPMessage>> {
tracing::debug!("Getting job messages");
let messages_url = format!("{}/jobs/{}/messages", self.url, job_id);
tracing::debug!(url = %messages_url, "Getting job messages from URL");
#[derive(Deserialize)]
struct MessagesResponse {
messages: Vec<GPMessage>,
}
let mut query_params = vec![("f", "json")];
let token_string;
if let Some(token) = self.client.get_token_if_required().await? {
tracing::debug!("Adding authentication token to messages request");
token_string = token;
query_params.push(("token", token_string.as_str()));
}
let response = self
.client
.http()
.get(&messages_url)
.query(&query_params)
.send()
.await?;
let messages_response: MessagesResponse = response.json().await?;
tracing::debug!(
message_count = messages_response.messages.len(),
"Messages retrieved"
);
Ok(messages_response.messages)
}
#[instrument(skip(self), fields(job_id, param_name))]
pub async fn get_result_data(&self, job_id: &str, param_name: &str) -> Result<Value> {
tracing::debug!("Getting result data for parameter");
let result_url = format!("{}/jobs/{}/results/{}", self.url, job_id, param_name);
tracing::debug!(url = %result_url, "Fetching result data from URL");
let mut query_params = vec![("f", "json")];
let token_string;
if let Some(token) = self.client.get_token_if_required().await? {
tracing::debug!("Adding authentication token to result data request");
token_string = token;
query_params.push(("token", token_string.as_str()));
}
let response = self
.client
.http()
.get(&result_url)
.query(&query_params)
.send()
.await?;
let status = response.status();
tracing::debug!(status_code = %status, "Received result data response");
let response_text = response.text().await?;
tracing::debug!(response_body = %response_text, "Raw result data response");
let result_json: Value = serde_json::from_str(&response_text).map_err(|e| {
tracing::error!(
parse_error = %e,
response = %response_text,
"Failed to parse result data response"
);
e
})?;
tracing::debug!("Result data retrieved and parsed");
Ok(result_json)
}
#[instrument(skip(self), fields(job_id, initial_delay_ms, max_delay_ms, timeout_ms))]
pub async fn poll_until_complete(
&self,
job_id: &str,
initial_delay_ms: u64,
max_delay_ms: u64,
timeout_ms: Option<u64>,
) -> Result<GPJobInfo> {
use tokio::time::{Duration, Instant, sleep};
tracing::info!("Polling job until complete");
let start_time = Instant::now();
let mut delay_ms = initial_delay_ms;
loop {
if let Some(timeout) = timeout_ms {
if start_time.elapsed().as_millis() > timeout as u128 {
tracing::error!("Job polling timed out");
return Err(BuilderError::new(format!(
"Job polling timed out after {}ms",
timeout
))
.into());
}
}
let job_info = self.get_job_status(job_id).await?;
tracing::debug!(
status = ?job_info.job_status(),
elapsed_ms = start_time.elapsed().as_millis(),
"Polling job"
);
if job_info.job_status().is_terminal() {
tracing::info!(
status = ?job_info.job_status(),
elapsed_ms = start_time.elapsed().as_millis(),
"Job reached terminal state"
);
return Ok(job_info);
}
sleep(Duration::from_millis(delay_ms)).await;
delay_ms = (delay_ms * 2).min(max_delay_ms);
}
}
}