rs_nomad/lib.rs

/*
MIT License

Copyright (c) 2023 Roblox

Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:

The above copyright notice and this permission notice shall be included in all
copies or substantial portions of the Software.

THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
SOFTWARE.
*/

//! nomad
//!
//! This crate provides access to a set of strongly typed APIs for interacting with [Nomad](https://www.nomadproject.io/).
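//!
//! # Example:
//! A minimal sketch of creating a client and listing jobs. It assumes the crate is exposed as
//! `nomad` and that a local agent is reachable at the default address; all values are placeholders.
//! ```no_run
//! # async fn run() -> Result<(), nomad::NomadError> {
//! let config = nomad::Config {
//!     address: "http://127.0.0.1:4646".to_string(),
//!     token: None,
//!     ..Default::default()
//! };
//! let client = nomad::Nomad::new(config, std::time::Duration::from_secs(10));
//! let jobs = client.list_jobs("", None).await?;
//! println!("found {} jobs", jobs.len());
//! # Ok(())
//! # }
//! ```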
#![deny(missing_docs)]
#![allow(clippy::unnecessary_wraps)] // TODO: Used in serialization code & required by serde?

#[cfg(feature = "metrics")]
use http::StatusCode;
use http_body_util::combinators::BoxBody;
use http_body_util::{BodyExt, Full};
use hyper::body::Bytes;
use hyper::{body::Buf, Method};
use hyper_rustls::{HttpsConnector, HttpsConnectorBuilder};
use hyper_util::client::legacy::{connect::HttpConnector, Builder, Client};
use serde::{Deserialize, Serialize};
use std::convert::Infallible;
use std::{env, time::Duration};
use thiserror::Error;
use tokio_retry::Condition;

mod hyper_wrapper;
mod serialization_helpers;
/// The strongly typed data structures representing canonical nomad objects.
pub mod types;

#[cfg(feature = "metrics")]
pub use metrics::MetricInfo;
pub use metrics::{Function, HttpMethod};
pub use types::*;

mod metrics;

#[cfg(feature = "metrics")]
use metrics::MetricInfoWrapper;
#[cfg(feature = "trace")]
use opentelemetry::global;
#[cfg(feature = "trace")]
use opentelemetry::global::BoxedTracer;
#[cfg(feature = "trace")]
use opentelemetry::trace::Span;
#[cfg(feature = "trace")]
use opentelemetry::trace::Status;

/// The error type returned from all calls into this crate.
#[derive(Debug, Error)]
pub enum NomadError {
    /// The request was invalid and could not be serialized to valid json.
    #[error(transparent)]
    InvalidRequest(serde_json::error::Error),
    /// The request was invalid and could not be converted into a proper http request.
    #[error(transparent)]
    RequestError(http::Error),
    /// The nomad server response could not be converted into a proper http response.
    #[error(transparent)]
    ResponseError(hyper_util::client::legacy::Error),
    /// The nomad server response was invalid.
    #[error(transparent)]
    InvalidResponse(hyper::Error),
    /// The nomad server response could not be deserialized from json.
    #[error(transparent)]
    ResponseDeserializationFailed(serde_json::error::Error),
    /// The nomad server response was something other than 200. The status code and the body of the response are included.
    #[error("status code: {0}, body {1}")]
    UnexpectedResponseCode(hyper::http::StatusCode, String),
    /// The nomad server failed to respond in the given timeout.
    #[error("Failed to get a response from nomad in {} milliseconds", .0.as_millis())]
    Timeout(Duration),
    /// Failed to build a connector with native roots.
    #[error("Failed to build a connector with native roots. {0}")]
    NativeRootsError(std::io::Error),
}

const DEFAULT_NAMESPACE: &str = "default";
const DEFAULT_MAX_RETRIES: usize = 3;
const DEFAULT_BACKOFF_INTERVAL: std::time::Duration = std::time::Duration::from_millis(250);

pub(crate) type Result<T> = std::result::Result<T, NomadError>;

/// The config necessary to create a new nomad client.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct Config {
    /// The address of the nomad server. This must include the protocol to connect over, e.g. http or https.
    pub address: String,
    /// The nomad secret token to make authenticated requests to the nomad server.
    pub token: Option<String>,
    /// Override the number of retries upon non-200 status code. Defaults to 3.
    pub max_retries: Option<usize>,
    /// Override the duration base for the linear, fixed interval backoff upon non-200 status code. Defaults to 250ms.
    pub backoff_interval: Option<std::time::Duration>,
    /// The hyper_builder used to construct the inner http client.
    #[serde(skip)]
    #[serde(default = "default_builder")]
    pub hyper_builder: hyper_util::client::legacy::Builder,
}

fn default_builder() -> Builder {
    // https://github.com/hyperium/hyper/issues/2312
    Builder::new(hyper_util::rt::TokioExecutor::new())
        .pool_idle_timeout(std::time::Duration::from_millis(0))
        .pool_max_idle_per_host(0)
        .to_owned()
}

impl Default for Config {
    fn default() -> Self {
        Config {
            address: "".to_string(),
            token: None,
            max_retries: None,
            backoff_interval: None,
            hyper_builder: default_builder(),
        }
    }
}

impl Config {
    /// Obtains a [`Config`](nomad::Config) from environment variables.
    /// Specifically, looks for `NOMAD_TOKEN` and `NOMAD_ADDR` as environment variables.
    /// If `NOMAD_TOKEN` is unset, an empty token is used; if `NOMAD_ADDR` is unset, the address
    /// defaults to `http://127.0.0.1:4646`.
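    /// # Example:
    /// A minimal sketch, assuming the crate is exposed as `nomad`:
    /// ```no_run
    /// let config = nomad::Config::from_env();
    /// let client = nomad::Nomad::new(config, std::time::Duration::from_secs(10));
    /// ```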
    pub fn from_env() -> Self {
        let token = env::var("NOMAD_TOKEN").unwrap_or_default();
        let addr = env::var("NOMAD_ADDR").unwrap_or_else(|_| "http://127.0.0.1:4646".to_string());

        Config {
            address: addr,
            token: Some(token),
            max_retries: None,
            backoff_interval: None,
            hyper_builder: default_builder(),
        }
    }
}

fn https_connector() -> HttpsConnector<HttpConnector> {
    HttpsConnectorBuilder::new()
        .with_webpki_roots()
        .https_or_http()
        .enable_http1()
        .build()
}

#[derive(Debug)]
/// This struct defines the nomad client and allows access to the nomad api via method syntax.
pub struct Nomad {
    https_client: Client<hyper_rustls::HttpsConnector<HttpConnector>, BoxBody<Bytes, Infallible>>,
    config: Config,
    timeout: Duration,
    #[cfg(feature = "trace")]
    tracer: BoxedTracer,
    #[cfg(feature = "metrics")]
    metrics_tx: tokio::sync::mpsc::UnboundedSender<MetricInfo>,
    #[cfg(feature = "metrics")]
    metrics_rx: Option<tokio::sync::mpsc::UnboundedReceiver<MetricInfo>>,
}

impl Nomad {
    /// Creates a new instance of [`Nomad`](nomad::Nomad).
    /// This is the entry point for this crate.
    /// # Arguments:
    /// - config: The [Config](nomad::Config) describing how to reach the nomad server.
    /// - timeout: The per-request timeout applied to every call made by this client.
    pub fn new(config: Config, timeout: Duration) -> Self {
        let https = https_connector();
        let https_client = config
            .hyper_builder
            .build::<_, BoxBody<Bytes, Infallible>>(https);

        #[cfg(feature = "metrics")]
        let (tx, rx) = tokio::sync::mpsc::unbounded_channel::<MetricInfo>();
        Nomad {
            https_client,
            config,
            timeout,
            #[cfg(feature = "trace")]
            tracer: global::tracer("nomad"),
            #[cfg(feature = "metrics")]
            metrics_tx: tx,
            #[cfg(feature = "metrics")]
            metrics_rx: Some(rx),
        }
    }

    #[cfg(feature = "metrics")]
    /// Returns the metrics receiver for the nomad client.
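    ///
    /// # Example:
    /// A minimal consumption sketch, assuming the crate is exposed as `nomad` and the `metrics`
    /// feature is enabled:
    /// ```no_run
    /// # async fn consume(client: &mut nomad::Nomad) {
    /// if let Some(mut rx) = client.metrics_receiver() {
    ///     while let Some(_metric) = rx.recv().await {
    ///         // Forward each MetricInfo to the metrics sink of your choice.
    ///     }
    /// }
    /// # }
    /// ```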
    pub fn metrics_receiver(&mut self) -> Option<tokio::sync::mpsc::UnboundedReceiver<MetricInfo>> {
        self.metrics_rx.take()
    }

    /// Lists a set of nomad jobs. See [nomad docs](https://www.nomadproject.io/api-docs/jobs#list-jobs)
    /// # Arguments:
    /// - prefix: The prefix to filter all jobs by.
    /// - namespace: The namespace from which jobs are queried.
    /// # Errors:
    /// [NomadError](nomad::NomadError) describes all possible errors returned by this api.
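    /// # Example:
    /// A sketch of filtering by prefix within a namespace, assuming the crate is exposed as
    /// `nomad`; the prefix and namespace values are placeholders.
    /// ```no_run
    /// # async fn run(client: &nomad::Nomad) -> Result<(), nomad::NomadError> {
    /// let jobs = client.list_jobs("web-", Some("platform")).await?;
    /// println!("{} jobs match the prefix", jobs.len());
    /// # Ok(())
    /// # }
    /// ```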
    pub async fn list_jobs(
        &self,
        prefix: &str,
        namespace: Option<&str>,
    ) -> Result<Vec<ConciseJob>> {
        let uri = format!(
            "{}/v1/jobs?prefix={}&namespace={}",
            self.config.address,
            prefix,
            namespace.unwrap_or(DEFAULT_NAMESPACE),
        );

        let response_body = self
            .execute_request::<_, ()>(
                || hyper::Request::builder().method(Method::GET).uri(&uri),
                None,
                Function::ListJobs,
            )
            .await?;

        serde_json::from_reader(response_body.reader())
            .map_err(NomadError::ResponseDeserializationFailed)
    }

    /// Plans a deployment for a given job. See [nomad docs](https://www.nomadproject.io/api-docs/jobs#create-job-plan)
    /// # Arguments:
    /// - job: The new version of the job.
    /// # Errors:
    /// [NomadError](nomad::NomadError) describes all possible errors returned by this api.
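    /// # Example:
    /// A sketch of planning a job and checking whether it would be newly added, assuming the
    /// crate is exposed as `nomad` and a [Job](nomad::types::Job) built elsewhere.
    /// ```no_run
    /// # async fn run(client: &nomad::Nomad, job: &nomad::Job) -> Result<(), nomad::NomadError> {
    /// let plan = client.create_job_plan(job).await?;
    /// if plan.diff.r#type == nomad::DiffType::Added {
    ///     println!("this job does not exist yet");
    /// }
    /// # Ok(())
    /// # }
    /// ```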
    pub async fn create_job_plan(&self, job: &Job) -> Result<CreateJobPlanResponse> {
        let request_body = CreateJobPlanRequest {
            job,
            diff: true,
            policy_override: false,
        };

        let response_body = self
            .execute_request(
                || {
                    hyper::Request::builder().method(Method::POST).uri(format!(
                        "{}/v1/job/{}/plan?namespace={}",
                        self.config.address,
                        job.id,
                        if job.namespace.is_empty() {
                            "default"
                        } else {
                            &job.namespace
                        }
                    ))
                },
                Some(request_body),
                Function::CreateJobPlan,
            )
            .await?;
        serde_json::from_reader(response_body.reader())
            .map_err(NomadError::ResponseDeserializationFailed)
    }

    /// Retrieves a single job's most recent deployment. See [nomad docs](https://www.nomadproject.io/api-docs/jobs#read-job-s-most-recent-deployment)
    /// # Arguments:
    /// - job_id: The nomad job ID, represented as a String.
    /// - namespace: The namespace from which jobs are queried.
    /// # Errors:
    /// [NomadError](nomad::NomadError) describes all possible errors returned by this api.
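    /// # Example:
    /// A sketch, assuming the crate is exposed as `nomad`; the job ID is a placeholder.
    /// ```no_run
    /// # async fn run(client: &nomad::Nomad) -> Result<(), nomad::NomadError> {
    /// if let Some(deployment) = client
    ///     .retrieve_most_recent_job_deployment("example-job", Some("default"))
    ///     .await?
    /// {
    ///     println!("latest deployment is for job version {}", deployment.job_version);
    /// }
    /// # Ok(())
    /// # }
    /// ```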
    pub async fn retrieve_most_recent_job_deployment(
        &self,
        job_id: &str,
        namespace: Option<&str>,
    ) -> Result<Option<JobDeployment>> {
        let uri = format!(
            "{}/v1/job/{}/deployment?namespace={}",
            self.config.address,
            job_id,
            namespace.unwrap_or(DEFAULT_NAMESPACE)
        );

        let response_body = self
            .execute_request::<_, ()>(
                || hyper::Request::builder().method(Method::GET).uri(&uri),
                None,
                Function::RetrieveMostRecentJobDeployment,
            )
            .await?;
        serde_json::from_reader(response_body.reader())
            .map_err(NomadError::ResponseDeserializationFailed)
    }

    /// Creates or updates a nomad job. See [nomad docs](https://www.nomadproject.io/api-docs/jobs#update-existing-job)
    /// # Arguments:
    /// - request: The nomad job request represented by [JobRegisterRequest](nomad::types::JobRegisterRequest)
    /// # Errors:
    /// [NomadError](nomad::NomadError) describes all possible errors returned by this api.
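    /// # Example:
    /// A sketch of registering a job built elsewhere, assuming the crate is exposed as `nomad`;
    /// the request fields shown are placeholders.
    /// ```no_run
    /// # async fn run(client: &nomad::Nomad, job: nomad::Job) -> Result<(), nomad::NomadError> {
    /// let request = nomad::JobRegisterRequest {
    ///     job,
    ///     enforce_index: false,
    ///     job_modify_index: 0,
    ///     ..Default::default()
    /// };
    /// let _response = client.create_or_update_job(request).await?;
    /// # Ok(())
    /// # }
    /// ```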
    pub async fn create_or_update_job(
        &self,
        request: JobRegisterRequest,
    ) -> Result<JobRegisterResponse> {
        let uri = format!("{}/v1/job/{}", self.config.address, request.job.id);

        let response_body = self
            .execute_request(
                || hyper::Request::builder().method(Method::POST).uri(&uri),
                Some(request),
                Function::CreateOrUpdateJob,
            )
            .await?;
        serde_json::from_reader(response_body.reader())
            .map_err(NomadError::ResponseDeserializationFailed)
    }

    /// Reads a nomad job by its ID. See [nomad docs](https://www.nomadproject.io/api-docs/jobs#read-job)
    /// # Arguments:
    /// - job_id: The nomad job ID, represented as a String.
    /// - namespace: The namespace from which jobs are queried.
    /// # Errors:
    /// [NomadError](nomad::NomadError) describes all possible errors returned by this api.
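    /// # Example:
    /// A sketch, assuming the crate is exposed as `nomad`; the job ID is a placeholder.
    /// ```no_run
    /// # async fn run(client: &nomad::Nomad) -> Result<(), nomad::NomadError> {
    /// let job = client.read_job("example-job", Some("default")).await?;
    /// println!("read job {}", job.name);
    /// # Ok(())
    /// # }
    /// ```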
    pub async fn read_job(&self, job_id: &str, namespace: Option<&str>) -> Result<Job> {
        let uri = format!(
            "{}/v1/job/{}?namespace={}",
            self.config.address,
            job_id,
            namespace.unwrap_or(DEFAULT_NAMESPACE)
        );

        let response_body = self
            .execute_request::<_, ()>(
                || hyper::Request::builder().method(Method::GET).uri(&uri),
                None,
                Function::ReadJob,
            )
            .await?;
        serde_json::from_reader(response_body.reader())
            .map_err(NomadError::ResponseDeserializationFailed)
    }

    /// Stops a nomad job by its ID. See [nomad docs](https://www.nomadproject.io/api-docs/jobs#stop-a-job)
    /// # Arguments:
    /// - job_id: The nomad job ID, represented as a String.
    /// - purge: Whether to purge the job as part of this request.
    /// - namespace: The namespace from which jobs are queried.
    /// # Errors:
    /// [NomadError](nomad::NomadError) describes all possible errors returned by this api.
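    /// # Example:
    /// A sketch of stopping and purging a job, assuming the crate is exposed as `nomad`; the job
    /// ID is a placeholder.
    /// ```no_run
    /// # async fn run(client: &nomad::Nomad) -> Result<(), nomad::NomadError> {
    /// client
    ///     .stop_job("example-job".to_string(), true, Some("default"))
    ///     .await?;
    /// # Ok(())
    /// # }
    /// ```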
    pub async fn stop_job(
        &self,
        job_id: String,
        purge: bool,
        namespace: Option<&str>,
    ) -> Result<JobRegisterResponse> {
        let uri = format!(
            "{}/v1/job/{}?purge={}&namespace={}",
            self.config.address,
            job_id,
            purge,
            namespace.unwrap_or(DEFAULT_NAMESPACE)
        );

        let response_body = self
            .execute_request::<_, ()>(
                || hyper::Request::builder().method(Method::DELETE).uri(&uri),
                None,
                Function::StopJob,
            )
            .await?;

        serde_json::from_reader(response_body.reader())
            .map_err(NomadError::ResponseDeserializationFailed)
    }

    pub(crate) async fn execute_request<R, B>(
        &self,
        request: R,
        body: Option<B>,
        _request_name: Function,
    ) -> Result<Box<dyn Buf>>
    where
        R: Fn() -> http::request::Builder,
        B: Serialize,
    {
        let body = body
            .map(|body| serde_json::to_string(&body).map_err(NomadError::InvalidRequest))
            .transpose()?;

        async fn do_request<R>(
            client: &Nomad,
            request: R,
            body: Option<String>,
            _request_name: Function,
        ) -> Result<Box<dyn Buf>>
        where
            R: Fn() -> http::request::Builder,
        {
            let body = match body {
                Some(body) => BoxBody::new(Full::<Bytes>::new(Bytes::from(body))),
                None => BoxBody::new(http_body_util::Empty::<Bytes>::new()),
            };

            let request = request()
                .header(
                    "X-Nomad-Token",
                    client.config.token.clone().unwrap_or_default(),
                )
                .body(body)
                .map_err(NomadError::RequestError)?;

            #[cfg(feature = "trace")]
            let mut span = crate::hyper_wrapper::span_for_request(&client.tracer, &request);

            #[cfg(feature = "metrics")]
            let mut metrics_info = MetricInfoWrapper::new(
                request.method().clone().into(),
                _request_name,
                None,
                client.metrics_tx.clone(),
            );

            let response =
                tokio::time::timeout(client.timeout, client.https_client.request(request))
                    .await
                    .map_err(|_| {
                        #[cfg(feature = "metrics")]
                        {
                            metrics_info.set_status(StatusCode::REQUEST_TIMEOUT);
                            metrics_info.emit_metrics();
                        }
                        NomadError::Timeout(client.timeout)
                    })?
                    .map_err(NomadError::ResponseError)?;

            let status = response.status();

            #[cfg(feature = "trace")]
            crate::hyper_wrapper::annotate_span_for_response(&mut span, &response);

            if status != hyper::StatusCode::OK {
                #[cfg(feature = "metrics")]
                {
                    metrics_info.set_status(status);
                    metrics_info.emit_metrics();
                }

                let mut response_body = response
                    .into_body()
                    .collect()
                    .await
                    .map_err(|e| NomadError::UnexpectedResponseCode(status, e.to_string()))?
                    .aggregate();
                let bytes = response_body.copy_to_bytes(response_body.remaining());
                let resp = std::str::from_utf8(&bytes)
                    .map_err(|e| NomadError::UnexpectedResponseCode(status, e.to_string()))?;
                return Err(NomadError::UnexpectedResponseCode(status, resp.to_string()));
            }

            match response
                .into_body()
                .collect()
                .await
                .map_err(NomadError::InvalidResponse)
                .map(|b| b.aggregate())
            {
                Ok(body) => {
                    #[cfg(feature = "metrics")]
                    {
                        metrics_info.set_status(StatusCode::OK);
                        metrics_info.emit_metrics();
                    }
                    Ok(Box::new(body))
                }
                Err(e) => {
                    #[cfg(feature = "trace")]
                    span.set_status(Status::error(e.to_string()));
                    Err(e)
                }
            }
        }

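        // Retries use a fixed backoff interval with jitter applied to each delay, capped at the
        // configured (or default) number of attempts. Whether a particular failure is retried at
        // all is decided by `RetryPolicy` below.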
        let retry_strategy = tokio_retry::strategy::FixedInterval::new(
            self.config
                .backoff_interval
                .unwrap_or(DEFAULT_BACKOFF_INTERVAL),
        )
        .map(tokio_retry::strategy::jitter)
        .take(self.config.max_retries.unwrap_or(DEFAULT_MAX_RETRIES));

        tokio_retry::RetryIf::spawn(
            retry_strategy,
            || do_request(self, &request, body.clone(), _request_name),
            RetryPolicy {},
        )
        .await
    }
}

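/// Decides which [`NomadError`]s are worth retrying: transport failures, timeouts, and 5xx
/// responses are treated as transient, while request construction, serialization, and other
/// HTTP status errors are treated as permanent.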
struct RetryPolicy {}

impl Condition<NomadError> for RetryPolicy {
    fn should_retry(&mut self, error: &NomadError) -> bool {
        match error {
            NomadError::InvalidRequest(_) => false,
            NomadError::RequestError(_) => false,
            NomadError::ResponseError(_) => true,
            NomadError::InvalidResponse(_) => true,
            NomadError::ResponseDeserializationFailed(_) => false,
            NomadError::UnexpectedResponseCode(code, _) if code.is_server_error() => true,
            NomadError::UnexpectedResponseCode(_, _) => false,
            NomadError::Timeout(_) => true,
            NomadError::NativeRootsError(_) => false,
        }
    }
}

#[cfg(test)]
mod test {
    use super::*;

    use serial_test::serial;
    use std::{collections::HashMap, time::Duration};

    // The port within the docker container that the GRPC server is hosted at.
    const DOCKER_GRPC_PORT: u16 = 1000;
    // The name used for the grpc port in the nomad job spec.
    const PORT_GRPC_NAME: &str = "redis";

    // The port within the docker container that the HTTP server is hosted at.
    const DOCKER_HTTP_PORT: u16 = 1001;
    // The name used for the debug port in the nomad job spec.
    const PORT_DEBUG_NAME: &str = "debug";

    #[tokio::test(flavor = "current_thread")]
    #[serial]
    async fn test_add_job_plan() -> Result<()> {
        let nomad_client = build_client();
        let added_request = build_job_request(
            "add_plan_job".to_string(),
            "add_plan_id".to_string(),
            0,
            "us-central".to_string(),
            "dc1".to_string(),
        );
        let added_job = added_request.job;
        let added_job_plan = nomad_client.create_job_plan(&added_job).await?;
        assert_eq!(DiffType::Added, added_job_plan.diff.r#type);
        Ok(())
    }

    #[tokio::test(flavor = "current_thread")]
    #[serial]
    async fn test_edit_job_plan() -> Result<()> {
        let nomad_client = build_client();
        let added_request = build_job_request(
            "edit_plan_job".to_string(),
            "edit_plan_id".to_string(),
            0,
            "us-central".to_string(),
            "dc1".to_string(),
        );
        nomad_client.create_or_update_job(added_request).await?;
        tokio::time::sleep(Duration::from_secs(5)).await;
        let changed_request = build_job_request(
            "edit_plan_job".to_string(),
            "edit_plan_id".to_string(),
            0,
            "us-central".to_string(),
            "dc2".to_string(),
        );
        let changed_job = changed_request.job;
        let job_plan = nomad_client.create_job_plan(&changed_job).await?;
        assert_eq!(job_plan.diff.objects.len(), 1);
        let diff_object = &job_plan.diff.objects[0];
        assert_eq!(DiffType::Edited, job_plan.diff.r#type);
        assert_eq!("Datacenters", diff_object.name);
        let diff_fields = &diff_object.fields;
        let mut old_diff_count = 0;
        let mut new_diff_count = 0;
        // Order is indeterminate, so make sure one field diff deletes the old datacenter and one adds the new one.
        for diff_field in diff_fields {
            if diff_field.r#type == DiffType::Added
                && diff_field.name.eq("Datacenters")
                && diff_field.old.is_empty()
                && diff_field.new.eq("dc2")
            {
                new_diff_count += 1;
            }
            if diff_field.r#type == DiffType::Deleted
                && diff_field.name.eq("Datacenters")
                && diff_field.old.eq("dc1")
                && diff_field.new.is_empty()
            {
                old_diff_count += 1;
            }
        }
        // There should only be exactly one of each operation
        assert_eq!(1, old_diff_count);
        assert_eq!(1, new_diff_count);

        Ok(())
    }

    #[tokio::test(flavor = "current_thread")]
    #[serial]
    async fn test_purge_true_job_plan() -> Result<()> {
        let nomad_client = build_client();
        let added_request = build_job_request(
            "purge_true_plan_job".to_string(),
            "purge_true_plan_id".to_string(),
            0,
            "us-central".to_string(),
            "dc1".to_string(),
        );
        nomad_client.create_or_update_job(added_request).await?;
        tokio::time::sleep(Duration::from_secs(5)).await;
        let request_copy = build_job_request(
            "purge_true_plan_job".to_string(),
            "purge_true_plan_id".to_string(),
            0,
            "us-central".to_string(),
            "dc1".to_string(),
        );
        nomad_client
            .stop_job("purge_true_plan_id".to_string(), true, Some("default"))
            .await?;
        let job_plan = nomad_client.create_job_plan(&request_copy.job).await?;
        assert_eq!(DiffType::Added, job_plan.diff.r#type);
        Ok(())
    }

    #[tokio::test(flavor = "current_thread")]
    #[serial]
    async fn test_purge_false_job_plan() -> Result<()> {
        let nomad_client = build_client();
        let added_request = build_job_request(
            "purge_false_plan_job".to_string(),
            "purge_false_plan_id".to_string(),
            0,
            "us-central".to_string(),
            "dc1".to_string(),
        );
        nomad_client.create_or_update_job(added_request).await?;
        tokio::time::sleep(Duration::from_secs(5)).await;
        let request_copy = build_job_request(
            "purge_false_plan_job".to_string(),
            "purge_false_plan_id".to_string(),
            0,
            "us-central".to_string(),
            "dc1".to_string(),
        );
        nomad_client
            .stop_job("purge_false_plan_id".to_string(), false, Some("default"))
            .await?;
        let job_plan = nomad_client.create_job_plan(&request_copy.job).await?;
        assert_eq!(DiffType::Edited, job_plan.diff.r#type);
        Ok(())
    }

    #[tokio::test(flavor = "current_thread")]
    #[serial]
    async fn test_retrieve_most_recent_job_deployment() -> Result<()> {
        let nomad_client = build_client();
        let request = build_job_request(
            "deploy_job".to_string(),
            "deploy_job_id".to_string(),
            1,
            "us-central".to_string(),
            "dc1".to_string(),
        );

        nomad_client.create_or_update_job(request.clone()).await?;
        // Wait 5 seconds for job to finish deploying
        tokio::time::sleep(Duration::from_secs(5)).await;
        let job = request.job;
        let deployment_option = get_deployment("deploy_job_id", nomad_client).await?;
        assert!(deployment_option.is_some());

        let deployment = deployment_option.unwrap();

        assert_ne!(deployment.id, "None".to_string());
        assert_eq!(deployment.job_id, job.id);
        // first iteration of job, should be 0 because job has not been updated
        assert_eq!(deployment.job_version, 0);
        let change_request = build_job_request(
            "deploy_job2".to_string(),
            "deploy_job_id".to_string(),
            1,
            "us-central".to_string(),
            "dc1".to_string(),
        );

        assert!(deployment.status.is_some());
        nomad_client.create_or_update_job(change_request).await?;
        tokio::time::sleep(Duration::from_secs(5)).await;
        let deployment_option = get_deployment("deploy_job_id", nomad_client).await?;
        assert!(deployment_option.is_some());
        let deployment: JobDeployment = deployment_option.unwrap();
        // second iteration of job, job_version should be 1 because job has been updated 1 time
        assert_eq!(deployment.job_version, 1);

        Ok(())
    }

    #[tokio::test(flavor = "current_thread")]
    #[serial]
    // Tests that the job actually stops when calling stop_job(), and that the job is removed when purge is set to true.
    async fn test_stop_job() -> Result<()> {
        let nomad_client = build_client();
        let first_job_count = count_jobs().await?;

        let request = build_job_request(
            "stop_job".to_string(),
            "stop_job_id".to_string(),
            0,
            "us-central".to_string(),
            "dc1".to_string(),
        );
        nomad_client.create_or_update_job(request).await?;
        nomad_client
            .stop_job("stop_job_id".to_string(), true, Some("default"))
            .await?;
        let second_job_count = count_jobs().await?;
        assert_eq!(first_job_count, second_job_count);
        let request2 = build_job_request(
            "stop_job2".to_string(),
            "stop_job_id2".to_string(),
            0,
            "us-central".to_string(),
            "dc1".to_string(),
        );
        nomad_client.create_or_update_job(request2).await?;
        nomad_client
            .stop_job("stop_job_id2".to_string(), false, Some("default"))
            .await?;
        let second_job_count = count_jobs().await?;
        assert_eq!(first_job_count + 1, second_job_count);
        let job: Job = nomad_client
            .read_job("stop_job_id2", Some("default"))
            .await?;
        assert_eq!(job.name, "stop_job2".to_string());

        Ok(())
    }

    #[tokio::test(flavor = "current_thread")]
    #[serial]
    // Tests that calling create_or_update_job on an existing job modifies it rather than creating a new job.
    async fn test_update_jobs() -> Result<()> {
        let nomad_client = build_client();

        let request = build_job_request(
            "update_job".to_string(),
            "update_job_id".to_string(),
            1,
            "us-central".to_string(),
            "dc1".to_string(),
        );
        nomad_client.create_or_update_job(request).await?;
        let first_count = count_jobs().await?;
        let job: Job = nomad_client
            .read_job("update_job_id", Some("default"))
            .await?;
        assert_eq!(job.datacenters[0], "dc1".to_string());
        let request = build_job_request(
            "update_job".to_string(),
            "update_job_id".to_string(),
            1,
            "us-central".to_string(),
            "dc2".to_string(),
        );
        nomad_client.create_or_update_job(request).await?;

        let second_count = count_jobs().await?;
        let job: Job = nomad_client
            .read_job("update_job_id", Some("default"))
            .await?;
        assert_eq!(job.datacenters[0], "dc2".to_string());
        assert_eq!(first_count, second_count);
        Ok(())
    }

    #[tokio::test(flavor = "current_thread")]
    #[serial]
    // Tests that list_jobs() lists all the jobs created.
    async fn test_list_jobs() -> Result<()> {
        let nomad_client = build_client();
        let first_job_count = count_jobs().await?;

        let request = build_job_request(
            "list_job".to_string(),
            "list_job_id".to_string(),
            0,
            "us-central".to_string(),
            "dc1".to_string(),
        );
        nomad_client.create_or_update_job(request).await?;
        let second_job_count = count_jobs().await?;
        let jobs = nomad_client.list_jobs("", None).await?.into_iter();
        let mut found = false;
        for job in jobs {
            found = job.name == "list_job" || found;
        }
        assert!(found);
        assert_eq!(first_job_count + 1, second_job_count);
        Ok(())
    }

    #[tokio::test(flavor = "current_thread")]
    #[serial]
    // Tests that read_job() returns the metadata for a given job.
    async fn test_read_jobs() -> Result<()> {
        let nomad_client = build_client();
        let request = build_job_request(
            "read_job".to_string(),
            "read_job_id".to_string(),
            0,
            "us-central".to_string(),
            "dc1".to_string(),
        );
        nomad_client.create_or_update_job(request).await?;
        let job: Job = nomad_client
            .read_job("read_job_id", Some("default"))
            .await?;
        assert_eq!(job.name, "read_job".to_string());
        Ok(())
    }

    async fn count_jobs() -> Result<i32> {
        let nomad_client = build_client();
        let jobs = nomad_client.list_jobs("", None).await?.into_iter();
        let mut job_count = 0;
        for _ in jobs {
            job_count += 1;
        }
        Ok(job_count)
    }

    fn build_client() -> &'static Nomad {
        Box::leak(Box::new(Nomad::new(
            Config::from_env(),
            Duration::from_secs(10),
        )))
    }

    async fn get_deployment(job_id: &str, nomad_client: &Nomad) -> Result<Option<JobDeployment>> {
        let optional_deployment = nomad_client
            .retrieve_most_recent_job_deployment(job_id, Some("default"))
            .await?;
        Ok(optional_deployment)
    }

    fn build_resources() -> Resources {
        Resources {
            cpu: CpuResource::Cores(1),
            memory_mb: 256,
            memory_max_mb: Some(1024),
            networks: vec![NetworkConfig {
                m_bits: 10,
                dynamic_ports: vec![
                    Port {
                        label: "redis".to_string(),

                        ..Default::default()
                    },
                    Port {
                        label: "debug".to_string(),

                        ..Default::default()
                    },
                ],

                ..Default::default()
            }],
            devices: vec![],
        }
    }

    fn build_task() -> Task {
        let driver_config = build_driver_config();
        let env_vars = HashMap::new();
        let templates = vec![];
        let task_resources = build_resources();
        Task {
            name: "redis".to_string(),
            driver: "docker".to_string(),
            config: Some(driver_config),

            resources: task_resources,

            env: Some(env_vars),

            templates,

            services: vec![Service {
                name: "test-job".to_string(),
                port_label: Some(PORT_GRPC_NAME.to_string()),
                checks: vec![ConsulCheck {
                    address_mode: AddressMode::Host,
                    check_type: CheckType::Tcp,
                    port_label: PORT_GRPC_NAME.to_string(),
                    header: Some(HashMap::from([(
                        "Host".to_string(),
                        vec!["localhost".to_string()],
                    )])),
                    interval: Duration::from_secs(10),
                    timeout: Duration::from_secs(2),
                    ..Default::default()
                }],

                ..Default::default()
            }],

            restart_policy: RestartPolicy {
                interval: Duration::from_secs(90),
                delay: Duration::from_secs(5),
                ..Default::default()
            },

            log_config: LogConfig {
                max_files: 10,
                max_file_size_in_mb: 10,
            },

            vault: Some(Vault {
                policies: vec!["secret".to_string()],
                ..Default::default()
            }),
            ..Default::default()
        }
    }

    fn build_sidecar_task() -> Task {
        let driver_config = HashMap::from([(
            "image".to_string(),
            DriverConfig::String(format!("{}:{}", "busybox", "latest")),
        )]);

        let env_vars = HashMap::new();
        let templates = vec![];
        let task_resources = build_resources();
        Task {
            name: "sidecar".to_string(),
            driver: "docker".to_string(),
            config: Some(driver_config),

            resources: task_resources,

            env: Some(env_vars),

            templates,

            log_config: LogConfig {
                max_files: 10,
                max_file_size_in_mb: 10,
            },

            lifecycle: Some(Lifecycle {
                hook: "prestart".to_string(),
                sidecar: true,
            }),

            ..Default::default()
        }
    }

    fn build_driver_config() -> HashMap<String, DriverConfig> {
        let mut driver_config = HashMap::new();
        driver_config.insert(
            "image".to_string(),
            DriverConfig::String(format!("{}:{}", "bitnami/redis", "latest")),
        );

        let port_map = build_port_map();

        driver_config.insert(
            "port_map".to_string(),
            DriverConfig::Vector(vec![DriverConfig::Map(port_map)]),
        );

        // If we are running in a docker-compose setup, then we want to have our docker containers
        // join the same docker network that is used by the infrastructure generated by the
        // core-compose.yml setup.
        driver_config.insert(
            "network_mode".to_string(),
            DriverConfig::String("persistence".to_owned()),
        );
        driver_config
    }

    fn build_service_group(datacenter: String, job_name: String) -> TaskGroup {
        let restart_policy = build_restart_policy();
        let update_strategy = build_update_strategy();
        TaskGroup {
            name: format!(
                "redis-{}-{}-{}-instance",
                "Test",
                datacenter.as_str(),
                job_name
            ),

            restart_policy: Some(restart_policy),

            update: update_strategy,

            count: 2,

            ephemeral_disk: EphemeralDisk {
                size_in_mb: 200,
                ..Default::default()
            },

            scaling: Some(Scaling {
                enabled: false,
                min: Some(1),
                max: 5,
                ..Default::default()
            }),

            tasks: vec![build_task(), build_sidecar_task()],

            ..Default::default()
        }
    }

    fn build_restart_policy() -> RestartPolicy {
        RestartPolicy {
            // This needs to be set to Fail, as only a failed allocation will stop and be rescheduled to another nomad client.
            mode: RestartMode::Fail,
            attempts: 1,
            interval: Duration::from_secs(90),
            delay: Duration::from_secs(5),
        }
    }

    fn build_update_strategy() -> UpdateStrategy {
        UpdateStrategy {
            min_healthy_time: Duration::from_secs(10),
            healthy_deadline: Duration::from_secs(120),
            progress_deadline: Duration::from_secs(1200),
            auto_revert: false,
            stagger: Duration::from_secs(5),
            max_parallel: 1,
            ..Default::default()
        }
    }

    fn build_port_map() -> HashMap<String, DriverConfig> {
        let mut port_map = HashMap::new();
        port_map.insert(
            PORT_GRPC_NAME.to_string(),
            DriverConfig::Int(DOCKER_GRPC_PORT as i64),
        );
        port_map.insert(
            PORT_DEBUG_NAME.to_string(),
            DriverConfig::Int(DOCKER_HTTP_PORT as i64),
        );
        port_map
    }

    fn build_job(
        job_name: String,
        job_id: String,
        job_region: String,
        job_datacenter: String,
    ) -> Job {
        let service_group = build_service_group(job_datacenter.clone(), job_name.clone());

        Job {
            namespace: "default".to_string(),
            region: job_region,
            datacenters: vec![job_datacenter],
            id: job_id,
            name: job_name,
            job_type: JobType::Service,
            meta: Some(build_meta()),
            task_groups: vec![service_group],

            constraints: vec![Constraint {
                l_target: Some("${meta.storage}".to_string()),
                operand: ConstraintOperator::NotEquals,
                r_target: Some("portworx".to_string()),
            }],

            vault_token: Some("myroot".to_string()),
            ..Default::default()
        }
    }

    fn build_job_request(
        job_name: String,
        job_id: String,
        modify_index: u64,
        region: String,
        datacenter: String,
    ) -> JobRegisterRequest {
        let new_job = build_job(job_name, job_id, region, datacenter);
        JobRegisterRequest {
            job: new_job,
            enforce_index: false,
            job_modify_index: modify_index,
            ..Default::default()
        }
    }

    fn build_meta() -> HashMap<String, String> {
        let mut result = HashMap::new();
        result.insert("environment".to_string(), "Test".to_string());
        result
    }
}