#![deny(missing_docs)]
#![allow(clippy::unnecessary_wraps)] #[cfg(feature = "metrics")]
use http::StatusCode;
use http_body_util::combinators::BoxBody;
use http_body_util::{BodyExt, Full};
use hyper::body::Bytes;
use hyper::{body::Buf, Method};
use hyper_rustls::{HttpsConnector, HttpsConnectorBuilder};
use hyper_util::client::legacy::{connect::HttpConnector, Builder, Client};
use serde::{Deserialize, Serialize};
use std::convert::Infallible;
use std::{env, time::Duration};
use thiserror::Error;
use tokio_retry::Condition;
mod hyper_wrapper;
mod serialization_helpers;
pub mod types;
#[cfg(feature = "metrics")]
pub use metrics::MetricInfo;
pub use metrics::{Function, HttpMethod};
pub use types::*;
mod metrics;
#[cfg(feature = "metrics")]
use metrics::MetricInfoWrapper;
#[cfg(feature = "trace")]
use opentelemetry::global;
#[cfg(feature = "trace")]
use opentelemetry::global::BoxedTracer;
#[cfg(feature = "trace")]
use opentelemetry::trace::Span;
#[cfg(feature = "trace")]
use opentelemetry::trace::Status;
/// Errors returned by the [`Nomad`] client.
#[derive(Debug, Error)]
pub enum NomadError {
    /// The request body could not be serialized to JSON.
    #[error(transparent)]
    InvalidRequest(serde_json::error::Error),
    /// The HTTP request could not be built (invalid URI, header, etc.).
    #[error(transparent)]
    RequestError(http::Error),
    /// The HTTP client failed while sending the request / awaiting a response.
    #[error(transparent)]
    ResponseError(hyper_util::client::legacy::Error),
    /// The response body could not be read.
    #[error(transparent)]
    InvalidResponse(hyper::Error),
    /// The response body was read but could not be deserialized from JSON.
    #[error(transparent)]
    ResponseDeserializationFailed(serde_json::error::Error),
    /// Nomad answered with a non-OK status code; carries the status and body text.
    #[error("status code: {0}, body {1}")]
    UnexpectedResponseCode(hyper::http::StatusCode, String),
    /// No response arrived within the configured client timeout.
    #[error("Failed to get a response from nomad in {} milliseconds", .0.as_millis())]
    Timeout(Duration),
    /// Building a TLS connector with native roots failed.
    #[error("Failed to build a connector with native roots. {0}")]
    NativeRootsError(std::io::Error),
}
/// Namespace used when a caller passes `None` for the namespace argument.
const DEFAULT_NAMESPACE: &str = "default";
/// Retry attempts used when [`Config::max_retries`] is `None`.
const DEFAULT_MAX_RETRIES: usize = 3;
/// Backoff between retries used when [`Config::backoff_interval`] is `None`.
const DEFAULT_BACKOFF_INTERVAL: std::time::Duration = std::time::Duration::from_millis(250);
/// Crate-wide result alias over [`NomadError`].
pub(crate) type Result<T> = std::result::Result<T, NomadError>;
/// Connection settings for a [`Nomad`] client.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct Config {
    /// Base address of the Nomad agent, e.g. `http://127.0.0.1:4646`.
    pub address: String,
    /// ACL token sent as the `X-Nomad-Token` header; `None` sends an empty token.
    pub token: Option<String>,
    /// Maximum request retries; falls back to `DEFAULT_MAX_RETRIES` when `None`.
    pub max_retries: Option<usize>,
    /// Fixed interval between retries; falls back to `DEFAULT_BACKOFF_INTERVAL` when `None`.
    pub backoff_interval: Option<std::time::Duration>,
    // Not (de)serializable, so it is skipped and rebuilt via `default_builder`.
    #[serde(skip)]
    #[serde(default = "default_builder")]
    pub hyper_builder: hyper_util::client::legacy::Builder,
}
fn default_builder() -> Builder {
Builder::new(hyper_util::rt::TokioExecutor::new())
.pool_idle_timeout(std::time::Duration::from_millis(0))
.pool_max_idle_per_host(0)
.to_owned()
}
impl Default for Config {
    /// An empty address, no token, and default retry/backoff behavior.
    fn default() -> Self {
        Self {
            address: String::new(),
            token: None,
            max_retries: None,
            backoff_interval: None,
            hyper_builder: default_builder(),
        }
    }
}
impl Config {
    /// Builds a config from the `NOMAD_TOKEN` and `NOMAD_ADDR` environment
    /// variables, falling back to an empty token and the local agent address.
    pub fn from_env() -> Self {
        Config {
            address: env::var("NOMAD_ADDR")
                .unwrap_or_else(|_| "http://127.0.0.1:4646".to_string()),
            token: Some(env::var("NOMAD_TOKEN").unwrap_or_default()),
            max_retries: None,
            backoff_interval: None,
            hyper_builder: default_builder(),
        }
    }
}
/// Builds the TLS-capable connector used by the client. Plain HTTP is also
/// accepted so local agents without TLS still work.
// NOTE(review): this uses webpki roots, yet `NomadError::NativeRootsError`
// talks about native roots — confirm which root store is intended.
fn https_connector() -> HttpsConnector<HttpConnector> {
    let tls = HttpsConnectorBuilder::new().with_webpki_roots();
    tls.https_or_http().enable_http1().build()
}
/// A client for the Nomad HTTP API.
#[derive(Debug)]
pub struct Nomad {
    // Shared hyper client; request bodies are boxed so empty and JSON bodies
    // can share one type.
    https_client: Client<hyper_rustls::HttpsConnector<HttpConnector>, BoxBody<Bytes, Infallible>>,
    // Address, token and retry settings.
    config: Config,
    // Per-request timeout applied around every HTTP call.
    timeout: Duration,
    #[cfg(feature = "trace")]
    tracer: BoxedTracer,
    // Sender side of the metrics channel; cloned into each request.
    #[cfg(feature = "metrics")]
    metrics_tx: tokio::sync::mpsc::UnboundedSender<MetricInfo>,
    // Receiver side; handed out once via `metrics_receiver`.
    #[cfg(feature = "metrics")]
    metrics_rx: Option<tokio::sync::mpsc::UnboundedReceiver<MetricInfo>>,
}
impl Nomad {
    /// Creates a new client from `config`, applying `timeout` to every HTTP
    /// request the client makes.
    pub fn new(config: Config, timeout: Duration) -> Self {
        let https = https_connector();
        let https_client = config
            .hyper_builder
            .build::<_, BoxBody<Bytes, Infallible>>(https);
        #[cfg(feature = "metrics")]
        let (tx, rx) = tokio::sync::mpsc::unbounded_channel::<MetricInfo>();
        Nomad {
            https_client,
            config,
            timeout,
            #[cfg(feature = "trace")]
            tracer: global::tracer("nomad"),
            #[cfg(feature = "metrics")]
            metrics_tx: tx,
            #[cfg(feature = "metrics")]
            metrics_rx: Some(rx),
        }
    }
    /// Takes the receiving end of the metrics channel.
    ///
    /// Returns `Some` on the first call and `None` afterwards — the receiver
    /// can only be handed out once.
    #[cfg(feature = "metrics")]
    pub fn metrics_receiver(&mut self) -> Option<tokio::sync::mpsc::UnboundedReceiver<MetricInfo>> {
        self.metrics_rx.take()
    }
    /// Lists jobs whose ID starts with `prefix` in the given namespace
    /// (`"default"` when `namespace` is `None`).
    ///
    /// # Errors
    /// Returns a [`NomadError`] when the request fails, times out, or the
    /// response cannot be deserialized.
    pub async fn list_jobs(
        &self,
        prefix: &str,
        namespace: Option<&str>,
    ) -> Result<Vec<ConciseJob>> {
        let uri = format!(
            "{}/v1/jobs?prefix={}&namespace={}",
            self.config.address,
            prefix,
            namespace.unwrap_or(DEFAULT_NAMESPACE),
        );
        let response_body = self
            .execute_request::<_, ()>(
                || hyper::Request::builder().method(Method::GET).uri(&uri),
                None,
                Function::ListJobs,
            )
            .await?;
        serde_json::from_reader(response_body.reader())
            .map_err(NomadError::ResponseDeserializationFailed)
    }
    /// Asks Nomad to plan (dry-run) `job` and returns the resulting diff,
    /// without changing cluster state.
    ///
    /// # Errors
    /// Returns a [`NomadError`] when the request fails, times out, or the
    /// response cannot be deserialized.
    pub async fn create_job_plan(&self, job: &Job) -> Result<CreateJobPlanResponse> {
        let request_body = CreateJobPlanRequest {
            job,
            diff: true,
            policy_override: false,
        };
        let response_body = self
            .execute_request(
                || {
                    hyper::Request::builder().method(Method::POST).uri(format!(
                        "{}/v1/job/{}/plan?namespace={}",
                        self.config.address,
                        job.id,
                        // An empty namespace on the job falls back to the default.
                        if job.namespace.is_empty() {
                            DEFAULT_NAMESPACE
                        } else {
                            &job.namespace
                        }
                    ))
                },
                Some(request_body),
                Function::CreateJobPlan,
            )
            .await?;
        serde_json::from_reader(response_body.reader())
            .map_err(NomadError::ResponseDeserializationFailed)
    }
    /// Fetches the most recent deployment for `job_id`, or `None` when the job
    /// has no deployment.
    ///
    /// # Errors
    /// Returns a [`NomadError`] when the request fails, times out, or the
    /// response cannot be deserialized.
    pub async fn retrieve_most_recent_job_deployment(
        &self,
        job_id: &str,
        namespace: Option<&str>,
    ) -> Result<Option<JobDeployment>> {
        let uri = format!(
            "{}/v1/job/{}/deployment?namespace={}",
            self.config.address,
            job_id,
            namespace.unwrap_or(DEFAULT_NAMESPACE)
        );
        let response_body = self
            .execute_request::<_, ()>(
                || hyper::Request::builder().method(Method::GET).uri(&uri),
                None,
                Function::RetrieveMostRecentJobDeployment,
            )
            .await?;
        serde_json::from_reader(response_body.reader())
            .map_err(NomadError::ResponseDeserializationFailed)
    }
    /// Registers a new job or updates an existing one.
    ///
    /// # Errors
    /// Returns a [`NomadError`] when the request fails, times out, or the
    /// response cannot be deserialized.
    pub async fn create_or_update_job(
        &self,
        request: JobRegisterRequest,
    ) -> Result<JobRegisterResponse> {
        let uri = format!("{}/v1/job/{}", self.config.address, request.job.id);
        let response_body = self
            .execute_request(
                || hyper::Request::builder().method(Method::POST).uri(&uri),
                Some(request),
                Function::CreateOrUpdateJob,
            )
            .await?;
        serde_json::from_reader(response_body.reader())
            .map_err(NomadError::ResponseDeserializationFailed)
    }
    /// Reads a single job by ID.
    ///
    /// # Errors
    /// Returns a [`NomadError`] when the request fails, times out, or the
    /// response cannot be deserialized.
    pub async fn read_job(&self, job_id: &str, namespace: Option<&str>) -> Result<Job> {
        let uri = format!(
            "{}/v1/job/{}?namespace={}",
            self.config.address,
            job_id,
            namespace.unwrap_or(DEFAULT_NAMESPACE)
        );
        let response_body = self
            .execute_request::<_, ()>(
                || hyper::Request::builder().method(Method::GET).uri(&uri),
                None,
                Function::ReadJob,
            )
            .await?;
        serde_json::from_reader(response_body.reader())
            .map_err(NomadError::ResponseDeserializationFailed)
    }
    /// Stops (deregisters) a job; with `purge` the job is removed from Nomad's
    /// state entirely instead of being marked "dead".
    ///
    /// # Errors
    /// Returns a [`NomadError`] when the request fails, times out, or the
    /// response cannot be deserialized.
    pub async fn stop_job(
        &self,
        job_id: String,
        purge: bool,
        namespace: Option<&str>,
    ) -> Result<JobRegisterResponse> {
        let uri = format!(
            "{}/v1/job/{}?purge={}&namespace={}",
            self.config.address,
            job_id,
            purge,
            namespace.unwrap_or(DEFAULT_NAMESPACE)
        );
        let response_body = self
            .execute_request::<_, ()>(
                || hyper::Request::builder().method(Method::DELETE).uri(&uri),
                None,
                Function::StopJob,
            )
            .await?;
        serde_json::from_reader(response_body.reader())
            .map_err(NomadError::ResponseDeserializationFailed)
    }
    // Serializes `body` once, then runs the request through the retry loop.
    // `request` is a closure because a fresh `http::request::Builder` is needed
    // for every retry attempt.
    pub(crate) async fn execute_request<'a, R, B>(
        &self,
        request: R,
        body: Option<B>,
        _request_name: Function,
    ) -> Result<Box<dyn Buf>>
    where
        R: Fn() -> http::request::Builder,
        B: Serialize,
    {
        let body = body
            .map(|body| serde_json::to_string(&body).map_err(NomadError::InvalidRequest))
            .transpose()?;
        // Single attempt: build the request, send it with a timeout, and
        // classify the outcome (emitting trace/metrics data when enabled).
        async fn do_request<R>(
            client: &Nomad,
            request: R,
            body: Option<String>,
            _request_name: Function,
        ) -> Result<Box<dyn Buf>>
        where
            R: Fn() -> http::request::Builder,
        {
            let body = match body {
                Some(body) => BoxBody::new(Full::<Bytes>::new(Bytes::from(body))),
                None => BoxBody::new(http_body_util::Empty::<Bytes>::new()),
            };
            let request = request()
                .header(
                    "X-Nomad-Token",
                    client.config.token.clone().unwrap_or_default(),
                )
                .body(body)
                .map_err(NomadError::RequestError)?;
            #[cfg(feature = "trace")]
            let mut span = crate::hyper_wrapper::span_for_request(&client.tracer, &request);
            #[cfg(feature = "metrics")]
            let mut metrics_info = MetricInfoWrapper::new(
                request.method().clone().into(),
                _request_name,
                None,
                client.metrics_tx.clone(),
            );
            let response =
                tokio::time::timeout(client.timeout, client.https_client.request(request))
                    .await
                    .map_err(|_| {
                        #[cfg(feature = "metrics")]
                        {
                            metrics_info.set_status(StatusCode::REQUEST_TIMEOUT);
                            metrics_info.emit_metrics();
                        }
                        NomadError::Timeout(client.timeout)
                    })?
                    .map_err(NomadError::ResponseError)?;
            let status = response.status();
            #[cfg(feature = "trace")]
            crate::hyper_wrapper::annotate_span_for_response(&mut span, &response);
            if status != hyper::StatusCode::OK {
                #[cfg(feature = "metrics")]
                {
                    // Record the actual response status. (Previously this
                    // hard-coded REQUEST_TIMEOUT, copy-pasted from the
                    // timeout branch above, mislabeling every error metric.)
                    metrics_info.set_status(status);
                    metrics_info.emit_metrics();
                }
                let mut response_body = response
                    .into_body()
                    .collect()
                    .await
                    .map_err(|e| NomadError::UnexpectedResponseCode(status, e.to_string()))?
                    .aggregate();
                let bytes = response_body.copy_to_bytes(response_body.remaining());
                let resp = std::str::from_utf8(&bytes)
                    .map_err(|e| NomadError::UnexpectedResponseCode(status, e.to_string()))?;
                return Err(NomadError::UnexpectedResponseCode(status, resp.to_string()));
            }
            match response
                .into_body()
                .collect()
                .await
                .map_err(NomadError::InvalidResponse)
                .map(|b| b.aggregate())
            {
                Ok(body) => {
                    #[cfg(feature = "metrics")]
                    {
                        metrics_info.set_status(StatusCode::OK);
                        metrics_info.emit_metrics();
                    }
                    Ok(Box::new(body))
                }
                Err(e) => {
                    #[cfg(feature = "trace")]
                    span.set_status(Status::error(e.to_string()));
                    Err(e)
                }
            }
        }
        // Fixed interval with jitter, bounded by the configured retry count.
        let retry_strategy = tokio_retry::strategy::FixedInterval::new(
            self.config
                .backoff_interval
                .unwrap_or(DEFAULT_BACKOFF_INTERVAL),
        )
        .map(tokio_retry::strategy::jitter)
        .take(self.config.max_retries.unwrap_or(DEFAULT_MAX_RETRIES));
        tokio_retry::RetryIf::spawn(
            retry_strategy,
            || do_request(self, &request, body.clone(), _request_name),
            RetryPolicy {},
        )
        .await
    }
}
// Retry predicate used by `execute_request`: transient transport failures,
// timeouts, and 5xx responses are retried; everything else fails fast.
struct RetryPolicy {}
impl Condition<NomadError> for RetryPolicy {
    fn should_retry(&mut self, error: &NomadError) -> bool {
        // Exhaustive match so adding a new error variant forces a retry
        // decision here at compile time.
        match error {
            NomadError::ResponseError(_)
            | NomadError::InvalidResponse(_)
            | NomadError::Timeout(_) => true,
            NomadError::UnexpectedResponseCode(code, _) => code.is_server_error(),
            NomadError::InvalidRequest(_)
            | NomadError::RequestError(_)
            | NomadError::ResponseDeserializationFailed(_)
            | NomadError::NativeRootsError(_) => false,
        }
    }
}
#[cfg(test)]
mod test {
    //! Integration tests: these run serially against a live local Nomad agent
    //! configured via `NOMAD_ADDR` / `NOMAD_TOKEN`.
    use super::*;
    use serial_test::serial;
    use std::{collections::HashMap, time::Duration};
    const DOCKER_GRPC_PORT: u16 = 1000;
    const PORT_GRPC_NAME: &str = "redis";
    const DOCKER_HTTP_PORT: u16 = 1001;
    const PORT_DEBUG_NAME: &str = "debug";
    #[tokio::test(flavor = "current_thread")]
    #[serial]
    async fn test_add_job_plan() -> Result<()> {
        let nomad_client = build_client();
        let added_request = build_job_request(
            "add_plan_job".to_string(),
            "add_plan_id".to_string(),
            0,
            "us-central".to_string(),
            "dc1".to_string(),
        );
        let added_job = added_request.job;
        let added_job_plan = nomad_client.create_job_plan(&added_job).await?;
        assert_eq!(DiffType::Added, added_job_plan.diff.r#type);
        Ok(())
    }
    #[tokio::test(flavor = "current_thread")]
    #[serial]
    async fn test_edit_job_plan() -> Result<()> {
        let nomad_client = build_client();
        let added_request = build_job_request(
            "edit_plan_job".to_string(),
            "edit_plan_id".to_string(),
            0,
            "us-central".to_string(),
            "dc1".to_string(),
        );
        nomad_client.create_or_update_job(added_request).await?;
        tokio::time::sleep(Duration::from_secs(5)).await;
        let changed_request = build_job_request(
            "edit_plan_job".to_string(),
            "edit_plan_id".to_string(),
            0,
            "us-central".to_string(),
            "dc2".to_string(),
        );
        let changed_job = changed_request.job;
        let job_plan = nomad_client.create_job_plan(&changed_job).await?;
        assert_eq!(job_plan.diff.objects.len(), 1);
        let diff_object = &job_plan.diff.objects[0];
        assert_eq!(DiffType::Edited, job_plan.diff.r#type);
        assert_eq!("Datacenters", diff_object.name);
        // The plan diff should show exactly one removal (dc1) and one
        // addition (dc2) on the Datacenters field.
        let diff_fields = &diff_object.fields;
        let mut old_diff_count = 0;
        let mut new_diff_count = 0;
        for diff_field in diff_fields {
            if diff_field.r#type == DiffType::Added
                && diff_field.name.eq("Datacenters")
                && diff_field.old.is_empty()
                && diff_field.new.eq("dc2")
            {
                new_diff_count += 1;
            }
            if diff_field.r#type == DiffType::Deleted
                && diff_field.name.eq("Datacenters")
                && diff_field.old.eq("dc1")
                && diff_field.new.is_empty()
            {
                old_diff_count += 1;
            }
        }
        assert_eq!(1, old_diff_count);
        assert_eq!(1, new_diff_count);
        Ok(())
    }
    #[tokio::test(flavor = "current_thread")]
    #[serial]
    async fn test_purge_true_job_plan() -> Result<()> {
        let nomad_client = build_client();
        let added_request = build_job_request(
            "purge_true_plan_job".to_string(),
            "purge_true_plan_id".to_string(),
            0,
            "us-central".to_string(),
            "dc1".to_string(),
        );
        nomad_client.create_or_update_job(added_request).await?;
        tokio::time::sleep(Duration::from_secs(5)).await;
        let request_copy = build_job_request(
            "purge_true_plan_job".to_string(),
            "purge_true_plan_id".to_string(),
            0,
            "us-central".to_string(),
            "dc1".to_string(),
        );
        // After a purge the job is gone, so re-planning it reads as Added.
        nomad_client
            .stop_job("purge_true_plan_id".to_string(), true, Some("default"))
            .await?;
        let job_plan = nomad_client.create_job_plan(&request_copy.job).await?;
        assert_eq!(DiffType::Added, job_plan.diff.r#type);
        Ok(())
    }
    #[tokio::test(flavor = "current_thread")]
    #[serial]
    async fn test_purge_false_job_plan() -> Result<()> {
        let nomad_client = build_client();
        let added_request = build_job_request(
            "purge_false_plan_job".to_string(),
            "purge_false_plan_id".to_string(),
            0,
            "us-central".to_string(),
            "dc1".to_string(),
        );
        nomad_client.create_or_update_job(added_request).await?;
        tokio::time::sleep(Duration::from_secs(5)).await;
        let request_copy = build_job_request(
            "purge_false_plan_job".to_string(),
            "purge_false_plan_id".to_string(),
            0,
            "us-central".to_string(),
            "dc1".to_string(),
        );
        // Without purge the dead job is still known, so re-planning it reads
        // as Edited rather than Added.
        nomad_client
            .stop_job("purge_false_plan_id".to_string(), false, Some("default"))
            .await?;
        let job_plan = nomad_client.create_job_plan(&request_copy.job).await?;
        assert_eq!(DiffType::Edited, job_plan.diff.r#type);
        Ok(())
    }
    #[tokio::test(flavor = "current_thread")]
    #[serial]
    async fn test_retrieve_most_recent_job_deployment() -> Result<()> {
        let nomad_client = build_client();
        let request = build_job_request(
            "deploy_job".to_string(),
            "deploy_job_id".to_string(),
            1,
            "us-central".to_string(),
            "dc1".to_string(),
        );
        nomad_client.create_or_update_job(request.clone()).await?;
        tokio::time::sleep(Duration::from_secs(5)).await;
        let job = request.job;
        let deployment_option = get_deployment("deploy_job_id", nomad_client).await?;
        assert!(deployment_option.is_some());
        let deployment = deployment_option.unwrap();
        assert_ne!(deployment.id, "None".to_string());
        assert_eq!(deployment.job_id, job.id);
        assert_eq!(deployment.job_version, 0);
        // Updating the job should produce a newer deployment (version 1).
        let change_request = build_job_request(
            "deploy_job2".to_string(),
            "deploy_job_id".to_string(),
            1,
            "us-central".to_string(),
            "dc1".to_string(),
        );
        assert!(deployment.status.is_some());
        nomad_client.create_or_update_job(change_request).await?;
        tokio::time::sleep(Duration::from_secs(5)).await;
        let deployment_option = get_deployment("deploy_job_id", nomad_client).await?;
        assert!(deployment_option.is_some());
        let deployment: JobDeployment = deployment_option.unwrap();
        assert_eq!(deployment.job_version, 1);
        Ok(())
    }
    #[tokio::test(flavor = "current_thread")]
    #[serial]
    async fn test_stop_job() -> Result<()> {
        let nomad_client = build_client();
        let first_job_count = count_jobs().await?;
        let request = build_job_request(
            "stop_job".to_string(),
            "stop_job_id".to_string(),
            0,
            "us-central".to_string(),
            "dc1".to_string(),
        );
        // Stopping with purge removes the job, so the count returns to its
        // starting value.
        nomad_client.create_or_update_job(request).await?;
        nomad_client
            .stop_job("stop_job_id".to_string(), true, Some("default"))
            .await?;
        let second_job_count = count_jobs().await?;
        assert_eq!(first_job_count, second_job_count);
        // Stopping without purge keeps the (dead) job listed and readable.
        let request2 = build_job_request(
            "stop_job2".to_string(),
            "stop_job_id2".to_string(),
            0,
            "us-central".to_string(),
            "dc1".to_string(),
        );
        nomad_client.create_or_update_job(request2).await?;
        nomad_client
            .stop_job("stop_job_id2".to_string(), false, Some("default"))
            .await?;
        let second_job_count = count_jobs().await?;
        assert_eq!(first_job_count + 1, second_job_count);
        let job: Job = nomad_client
            .read_job("stop_job_id2", Some("default"))
            .await?;
        assert_eq!(job.name, "stop_job2".to_string());
        Ok(())
    }
    #[tokio::test(flavor = "current_thread")]
    #[serial]
    async fn test_update_jobs() -> Result<()> {
        let nomad_client = build_client();
        let request = build_job_request(
            "update_job".to_string(),
            "update_job_id".to_string(),
            1,
            "us-central".to_string(),
            "dc1".to_string(),
        );
        nomad_client.create_or_update_job(request).await?;
        let first_count = count_jobs().await?;
        let job: Job = nomad_client
            .read_job("update_job_id", Some("default"))
            .await?;
        assert_eq!(job.datacenters[0], "dc1".to_string());
        // Re-registering the same job ID updates it in place instead of
        // creating a second job.
        let request = build_job_request(
            "update_job".to_string(),
            "update_job_id".to_string(),
            1,
            "us-central".to_string(),
            "dc2".to_string(),
        );
        nomad_client.create_or_update_job(request).await?;
        let second_count = count_jobs().await?;
        let job: Job = nomad_client
            .read_job("update_job_id", Some("default"))
            .await?;
        assert_eq!(job.datacenters[0], "dc2".to_string());
        assert_eq!(first_count, second_count);
        Ok(())
    }
    #[tokio::test(flavor = "current_thread")]
    #[serial]
    async fn test_list_jobs() -> Result<()> {
        let nomad_client = build_client();
        let first_job_count = count_jobs().await?;
        let request = build_job_request(
            "list_job".to_string(),
            "list_job_id".to_string(),
            0,
            "us-central".to_string(),
            "dc1".to_string(),
        );
        nomad_client.create_or_update_job(request).await?;
        let second_job_count = count_jobs().await?;
        // Idiomatic membership test instead of the previous manual
        // `found = cond || found` accumulation loop.
        let found = nomad_client
            .list_jobs("", None)
            .await?
            .into_iter()
            .any(|job| job.name == "list_job");
        assert!(found);
        assert_eq!(first_job_count + 1, second_job_count);
        Ok(())
    }
    #[tokio::test(flavor = "current_thread")]
    #[serial]
    async fn test_read_jobs() -> Result<()> {
        let nomad_client = build_client();
        let request = build_job_request(
            "read_job".to_string(),
            "read_job_id".to_string(),
            0,
            "us-central".to_string(),
            "dc1".to_string(),
        );
        nomad_client.create_or_update_job(request).await?;
        let job: Job = nomad_client
            .read_job("read_job_id", Some("default"))
            .await?;
        assert_eq!(job.name, "read_job".to_string());
        Ok(())
    }
    // Counts all jobs currently registered, across all prefixes.
    async fn count_jobs() -> Result<i32> {
        let nomad_client = build_client();
        let jobs = nomad_client.list_jobs("", None).await?;
        // Job counts in these tests are tiny, so the narrowing cast is safe.
        Ok(jobs.len() as i32)
    }
    // Deliberately leaks the client to obtain a `&'static Nomad`; fine in
    // tests since each leaked client lives for the process lifetime anyway.
    fn build_client() -> &'static Nomad {
        Box::leak(Box::new(Nomad::new(
            Config::from_env(),
            Duration::from_secs(10),
        )))
    }
    async fn get_deployment(job_id: &str, nomad_client: &Nomad) -> Result<Option<JobDeployment>> {
        let optional_deployment = nomad_client
            .retrieve_most_recent_job_deployment(job_id, Some("default"))
            .await?;
        Ok(optional_deployment)
    }
    // One core, 256 MB, two dynamic ports (redis + debug).
    fn build_resources() -> Resources {
        Resources {
            cpu: CpuResource::Cores(1),
            memory_mb: 256,
            memory_max_mb: Some(1024),
            networks: vec![NetworkConfig {
                m_bits: 10,
                dynamic_ports: vec![
                    Port {
                        label: "redis".to_string(),
                        ..Default::default()
                    },
                    Port {
                        label: "debug".to_string(),
                        ..Default::default()
                    },
                ],
                ..Default::default()
            }],
            devices: vec![],
        }
    }
    // Main redis task with a Consul TCP check and a Vault policy attached.
    fn build_task() -> Task {
        let driver_config = build_driver_config();
        let env_vars = HashMap::new();
        let templates = vec![];
        let task_resources = build_resources();
        Task {
            name: "redis".to_string(),
            driver: "docker".to_string(),
            config: Some(driver_config),
            resources: task_resources,
            env: Some(env_vars),
            templates,
            services: vec![Service {
                name: "test-job".to_string(),
                port_label: Some(PORT_GRPC_NAME.to_string()),
                checks: vec![ConsulCheck {
                    address_mode: AddressMode::Host,
                    check_type: CheckType::Tcp,
                    port_label: PORT_GRPC_NAME.to_string(),
                    header: Some(HashMap::from([(
                        "Host".to_string(),
                        vec!["localhost".to_string()],
                    )])),
                    interval: Duration::from_secs(10),
                    timeout: Duration::from_secs(2),
                    ..Default::default()
                }],
                ..Default::default()
            }],
            restart_policy: RestartPolicy {
                interval: Duration::from_secs(90),
                delay: Duration::from_secs(5),
                ..Default::default()
            },
            log_config: LogConfig {
                max_files: 10,
                max_file_size_in_mb: 10,
            },
            vault: Some(Vault {
                policies: vec!["secret".to_string()],
                ..Default::default()
            }),
            ..Default::default()
        }
    }
    // Prestart sidecar task (busybox) used to exercise task lifecycles.
    fn build_sidecar_task() -> Task {
        let driver_config = HashMap::from([(
            "image".to_string(),
            DriverConfig::String(format!("{}:{}", "busybox", "latest")),
        )]);
        let env_vars = HashMap::new();
        let templates = vec![];
        let task_resources = build_resources();
        Task {
            name: "sidecar".to_string(),
            driver: "docker".to_string(),
            config: Some(driver_config),
            resources: task_resources,
            env: Some(env_vars),
            templates,
            log_config: LogConfig {
                max_files: 10,
                max_file_size_in_mb: 10,
            },
            lifecycle: Some(Lifecycle {
                hook: "prestart".to_string(),
                sidecar: true,
            }),
            ..Default::default()
        }
    }
    fn build_driver_config() -> HashMap<String, DriverConfig> {
        let mut driver_config = HashMap::new();
        driver_config.insert(
            "image".to_string(),
            DriverConfig::String(format!("{}:{}", "bitnami/redis", "latest")),
        );
        let port_map = build_port_map();
        driver_config.insert(
            "port_map".to_string(),
            DriverConfig::Vector(vec![DriverConfig::Map(port_map)]),
        );
        driver_config.insert(
            "network_mode".to_string(),
            DriverConfig::String("persistence".to_owned()),
        );
        driver_config
    }
    fn build_service_group(datacenter: String, job_name: String) -> TaskGroup {
        let restart_policy = build_restart_policy();
        let update_strategy = build_update_strategy();
        TaskGroup {
            name: format!(
                "redis-{}-{}-{}-instance",
                "Test",
                datacenter.as_str(),
                job_name
            ),
            restart_policy: Some(restart_policy),
            update: update_strategy,
            count: 2,
            ephemeral_disk: EphemeralDisk {
                size_in_mb: 200,
                ..Default::default()
            },
            scaling: Some(Scaling {
                enabled: false,
                min: Some(1),
                max: 5,
                ..Default::default()
            }),
            tasks: vec![build_task(), build_sidecar_task()],
            ..Default::default()
        }
    }
    fn build_restart_policy() -> RestartPolicy {
        RestartPolicy {
            mode: RestartMode::Fail,
            attempts: 1,
            interval: Duration::from_secs(90),
            delay: Duration::from_secs(5),
        }
    }
    fn build_update_strategy() -> UpdateStrategy {
        UpdateStrategy {
            min_healthy_time: Duration::from_secs(10),
            healthy_deadline: Duration::from_secs(120),
            progress_deadline: Duration::from_secs(1200),
            auto_revert: false,
            stagger: Duration::from_secs(5),
            max_parallel: 1,
            ..Default::default()
        }
    }
    fn build_port_map() -> HashMap<String, DriverConfig> {
        let mut port_map = HashMap::new();
        port_map.insert(
            PORT_GRPC_NAME.to_string(),
            DriverConfig::Int(DOCKER_GRPC_PORT as i64),
        );
        port_map.insert(
            PORT_DEBUG_NAME.to_string(),
            DriverConfig::Int(DOCKER_HTTP_PORT as i64),
        );
        port_map
    }
    fn build_job(
        job_name: String,
        job_id: String,
        job_region: String,
        job_datacenter: String,
    ) -> Job {
        let service_group = build_service_group(job_datacenter.clone(), job_name.clone());
        Job {
            namespace: "default".to_string(),
            region: job_region,
            datacenters: vec![job_datacenter],
            id: job_id,
            name: job_name,
            job_type: JobType::Service,
            meta: Some(build_meta()),
            task_groups: vec![service_group],
            constraints: vec![Constraint {
                l_target: Some("${meta.storage}".to_string()),
                operand: ConstraintOperator::NotEquals,
                r_target: Some("portworx".to_string()),
            }],
            vault_token: Some("myroot".to_string()),
            ..Default::default()
        }
    }
    fn build_job_request(
        job_name: String,
        job_id: String,
        modify_index: u64,
        region: String,
        datacenter: String,
    ) -> JobRegisterRequest {
        let new_job = build_job(job_name, job_id, region, datacenter);
        JobRegisterRequest {
            job: new_job,
            enforce_index: false,
            job_modify_index: modify_index,
            ..Default::default()
        }
    }
    fn build_meta() -> HashMap<String, String> {
        let mut result = HashMap::new();
        result.insert("environment".to_string(), "Test".to_string());
        result
    }
}