1#![deny(missing_docs)]
29#![allow(clippy::unnecessary_wraps)] #[cfg(feature = "metrics")]
32use http::StatusCode;
33use http_body_util::combinators::BoxBody;
34use http_body_util::{BodyExt, Full};
35use hyper::body::Bytes;
36use hyper::{body::Buf, Method};
37use hyper_rustls::{HttpsConnector, HttpsConnectorBuilder};
38use hyper_util::client::legacy::{connect::HttpConnector, Builder, Client};
39use serde::{Deserialize, Serialize};
40use std::convert::Infallible;
41use std::{env, time::Duration};
42use thiserror::Error;
43use tokio_retry::Condition;
44
45mod hyper_wrapper;
46mod serialization_helpers;
47pub mod types;
49
50#[cfg(feature = "metrics")]
51pub use metrics::MetricInfo;
52pub use metrics::{Function, HttpMethod};
53pub use types::*;
54
55mod metrics;
56
57#[cfg(feature = "metrics")]
58use metrics::MetricInfoWrapper;
59#[cfg(feature = "trace")]
60use opentelemetry::global;
61#[cfg(feature = "trace")]
62use opentelemetry::global::BoxedTracer;
63#[cfg(feature = "trace")]
64use opentelemetry::trace::Span;
65#[cfg(feature = "trace")]
66use opentelemetry::trace::Status;
67
68#[derive(Debug, Error)]
70pub enum NomadError {
71 #[error(transparent)]
73 InvalidRequest(serde_json::error::Error),
74 #[error(transparent)]
76 RequestError(http::Error),
77 #[error(transparent)]
79 ResponseError(hyper_util::client::legacy::Error),
80 #[error(transparent)]
82 InvalidResponse(hyper::Error),
83 #[error(transparent)]
85 ResponseDeserializationFailed(serde_json::error::Error),
86 #[error("status code: {0}, body {1}")]
88 UnexpectedResponseCode(hyper::http::StatusCode, String),
89 #[error("Failed to get a response from nomad in {} milliseconds", .0.as_millis())]
91 Timeout(Duration),
92 #[error("Failed to build a connector with native roots. {0}")]
94 NativeRootsError(std::io::Error),
95}
96
97const DEFAULT_NAMESPACE: &str = "default";
98const DEFAULT_MAX_RETRIES: usize = 3;
99const DEFAULT_BACKOFF_INTERVAL: std::time::Duration = std::time::Duration::from_millis(250);
100
101pub(crate) type Result<T> = std::result::Result<T, NomadError>;
102
103#[derive(Clone, Debug, Serialize, Deserialize)]
105pub struct Config {
106 pub address: String,
108 pub token: Option<String>,
110 pub max_retries: Option<usize>,
112 pub backoff_interval: Option<std::time::Duration>,
114 #[serde(skip)]
116 #[serde(default = "default_builder")]
117 pub hyper_builder: hyper_util::client::legacy::Builder,
118}
119
120fn default_builder() -> Builder {
121 Builder::new(hyper_util::rt::TokioExecutor::new())
123 .pool_idle_timeout(std::time::Duration::from_millis(0))
124 .pool_max_idle_per_host(0)
125 .to_owned()
126}
127
128impl Default for Config {
129 fn default() -> Self {
130 Config {
131 address: "".to_string(),
132 token: None,
133 max_retries: None,
134 backoff_interval: None,
135 hyper_builder: default_builder(),
136 }
137 }
138}
139
140impl Config {
141 pub fn from_env() -> Self {
146 let token = env::var("NOMAD_TOKEN").unwrap_or_default();
147 let addr = env::var("NOMAD_ADDR").unwrap_or_else(|_| "http://127.0.0.1:4646".to_string());
148
149 Config {
150 address: addr,
151 token: Some(token),
152 max_retries: None,
153 backoff_interval: None,
154 hyper_builder: default_builder(),
155 }
156 }
157}
158
159fn https_connector() -> HttpsConnector<HttpConnector> {
160 HttpsConnectorBuilder::new()
161 .with_webpki_roots()
162 .https_or_http()
163 .enable_http1()
164 .build()
165}
166
167#[derive(Debug)]
168pub struct Nomad {
170 https_client: Client<hyper_rustls::HttpsConnector<HttpConnector>, BoxBody<Bytes, Infallible>>,
171 config: Config,
172 timeout: Duration,
173 #[cfg(feature = "trace")]
174 tracer: BoxedTracer,
175 #[cfg(feature = "metrics")]
176 metrics_tx: tokio::sync::mpsc::UnboundedSender<MetricInfo>,
177 #[cfg(feature = "metrics")]
178 metrics_rx: Option<tokio::sync::mpsc::UnboundedReceiver<MetricInfo>>,
179}
180
181impl Nomad {
182 pub fn new(config: Config, timeout: Duration) -> Self {
187 let https = https_connector();
188 let https_client = config
189 .hyper_builder
190 .build::<_, BoxBody<Bytes, Infallible>>(https);
191
192 #[cfg(feature = "metrics")]
193 let (tx, rx) = tokio::sync::mpsc::unbounded_channel::<MetricInfo>();
194 Nomad {
195 https_client,
196 config,
197 timeout,
198 #[cfg(feature = "trace")]
199 tracer: global::tracer("nomad"),
200 #[cfg(feature = "metrics")]
201 metrics_tx: tx,
202 #[cfg(feature = "metrics")]
203 metrics_rx: Some(rx),
204 }
205 }
206
207 #[cfg(feature = "metrics")]
208 pub fn metrics_receiver(&mut self) -> Option<tokio::sync::mpsc::UnboundedReceiver<MetricInfo>> {
210 self.metrics_rx.take()
211 }
212
213 pub async fn list_jobs(
220 &self,
221 prefix: &str,
222 namespace: Option<&str>,
223 ) -> Result<Vec<ConciseJob>> {
224 let uri = format!(
225 "{}/v1/jobs?prefix={}&namespace={}",
226 self.config.address,
227 prefix,
228 namespace.unwrap_or(DEFAULT_NAMESPACE),
229 );
230
231 let response_body = self
232 .execute_request::<_, ()>(
233 || hyper::Request::builder().method(Method::GET).uri(&uri),
234 None,
235 Function::ListJobs,
236 )
237 .await?;
238
239 serde_json::from_reader(response_body.reader())
240 .map_err(NomadError::ResponseDeserializationFailed)
241 }
242
243 pub async fn create_job_plan(&self, job: &Job) -> Result<CreateJobPlanResponse> {
249 let request_body = CreateJobPlanRequest {
250 job,
251 diff: true,
252 policy_override: false,
253 };
254
255 let response_body = self
256 .execute_request(
257 || {
258 hyper::Request::builder().method(Method::POST).uri(format!(
259 "{}/v1/job/{}/plan?namespace={}",
260 self.config.address,
261 job.id,
262 if job.namespace.is_empty() {
263 "default"
264 } else {
265 &job.namespace
266 }
267 ))
268 },
269 Some(request_body),
270 Function::CreateJobPlan,
271 )
272 .await?;
273 serde_json::from_reader(response_body.reader())
274 .map_err(NomadError::ResponseDeserializationFailed)
275 }
276
277 pub async fn retrieve_most_recent_job_deployment(
284 &self,
285 job_id: &str,
286 namespace: Option<&str>,
287 ) -> Result<Option<JobDeployment>> {
288 let uri = format!(
289 "{}/v1/job/{}/deployment?namespace={}",
290 self.config.address,
291 job_id,
292 namespace.unwrap_or(DEFAULT_NAMESPACE)
293 );
294
295 let response_body = self
296 .execute_request::<_, ()>(
297 || hyper::Request::builder().method(Method::GET).uri(&uri),
298 None,
299 Function::RetrieveMostRecentJobDeployment,
300 )
301 .await?;
302 serde_json::from_reader(response_body.reader())
303 .map_err(NomadError::ResponseDeserializationFailed)
304 }
305
306 pub async fn create_or_update_job(
312 &self,
313 request: JobRegisterRequest,
314 ) -> Result<JobRegisterResponse> {
315 let uri = format!("{}/v1/job/{}", self.config.address, request.job.id);
316
317 let response_body = self
318 .execute_request(
319 || hyper::Request::builder().method(Method::POST).uri(&uri),
320 Some(request),
321 Function::CreateOrUpdateJob,
322 )
323 .await?;
324 serde_json::from_reader(response_body.reader())
325 .map_err(NomadError::ResponseDeserializationFailed)
326 }
327
328 pub async fn read_job(&self, job_id: &str, namespace: Option<&str>) -> Result<Job> {
335 let uri = format!(
336 "{}/v1/job/{}?namespace={}",
337 self.config.address,
338 job_id,
339 namespace.unwrap_or(DEFAULT_NAMESPACE)
340 );
341
342 let response_body = self
343 .execute_request::<_, ()>(
344 || hyper::Request::builder().method(Method::GET).uri(&uri),
345 None,
346 Function::ReadJob,
347 )
348 .await?;
349 serde_json::from_reader(response_body.reader())
350 .map_err(NomadError::ResponseDeserializationFailed)
351 }
352
353 pub async fn stop_job(
361 &self,
362 job_id: String,
363 purge: bool,
364 namespace: Option<&str>,
365 ) -> Result<JobRegisterResponse> {
366 let uri = format!(
367 "{}/v1/job/{}?purge={}&namespace={}",
368 self.config.address,
369 job_id,
370 purge,
371 namespace.unwrap_or(DEFAULT_NAMESPACE)
372 );
373
374 let response_body = self
375 .execute_request::<_, ()>(
376 || hyper::Request::builder().method(Method::DELETE).uri(&uri),
377 None,
378 Function::StopJob,
379 )
380 .await?;
381
382 serde_json::from_reader(response_body.reader())
383 .map_err(NomadError::ResponseDeserializationFailed)
384 }
385
386 pub(crate) async fn execute_request<R, B>(
387 &self,
388 request: R,
389 body: Option<B>,
390 _request_name: Function,
391 ) -> Result<Box<dyn Buf>>
392 where
393 R: Fn() -> http::request::Builder,
394 B: Serialize,
395 {
396 let body = body
397 .map(|body| serde_json::to_string(&body).map_err(NomadError::InvalidRequest))
398 .transpose()?;
399
400 async fn do_request<R>(
401 client: &Nomad,
402 request: R,
403 body: Option<String>,
404 _request_name: Function,
405 ) -> Result<Box<dyn Buf>>
406 where
407 R: Fn() -> http::request::Builder,
408 {
409 let body = match body {
410 Some(body) => BoxBody::new(Full::<Bytes>::new(Bytes::from(body))),
411 None => BoxBody::new(http_body_util::Empty::<Bytes>::new()),
412 };
413
414 let request = request()
415 .header(
416 "X-Nomad-Token",
417 client.config.token.clone().unwrap_or_default(),
418 )
419 .body(body)
420 .map_err(NomadError::RequestError)?;
421
422 #[cfg(feature = "trace")]
423 let mut span = crate::hyper_wrapper::span_for_request(&client.tracer, &request);
424
425 #[cfg(feature = "metrics")]
426 let mut metrics_info = MetricInfoWrapper::new(
427 request.method().clone().into(),
428 _request_name,
429 None,
430 client.metrics_tx.clone(),
431 );
432
433 let response =
434 tokio::time::timeout(client.timeout, client.https_client.request(request))
435 .await
436 .map_err(|_| {
437 #[cfg(feature = "metrics")]
438 {
439 metrics_info.set_status(StatusCode::REQUEST_TIMEOUT);
440 metrics_info.emit_metrics();
441 }
442 NomadError::Timeout(client.timeout)
443 })?
444 .map_err(NomadError::ResponseError)?;
445
446 let status = response.status();
447
448 #[cfg(feature = "trace")]
449 crate::hyper_wrapper::annotate_span_for_response(&mut span, &response);
450
451 if status != hyper::StatusCode::OK {
452 #[cfg(feature = "metrics")]
453 {
454 metrics_info.set_status(StatusCode::REQUEST_TIMEOUT);
455 metrics_info.emit_metrics();
456 }
457
458 let mut response_body = response
459 .into_body()
460 .collect()
461 .await
462 .map_err(|e| NomadError::UnexpectedResponseCode(status, e.to_string()))?
463 .aggregate();
464 let bytes = response_body.copy_to_bytes(response_body.remaining());
465 let resp = std::str::from_utf8(&bytes)
466 .map_err(|e| NomadError::UnexpectedResponseCode(status, e.to_string()))?;
467 return Err(NomadError::UnexpectedResponseCode(status, resp.to_string()));
468 }
469
470 match response
471 .into_body()
472 .collect()
473 .await
474 .map_err(NomadError::InvalidResponse)
475 .map(|b| b.aggregate())
476 {
477 Ok(body) => {
478 #[cfg(feature = "metrics")]
479 {
480 metrics_info.set_status(StatusCode::OK);
481 metrics_info.emit_metrics();
482 }
483 Ok(Box::new(body))
484 }
485 Err(e) => {
486 #[cfg(feature = "trace")]
487 span.set_status(Status::error(e.to_string()));
488 Err(e)
489 }
490 }
491 }
492
493 let retry_strategy = tokio_retry::strategy::FixedInterval::new(
494 self.config
495 .backoff_interval
496 .unwrap_or(DEFAULT_BACKOFF_INTERVAL),
497 )
498 .map(tokio_retry::strategy::jitter)
499 .take(self.config.max_retries.unwrap_or(DEFAULT_MAX_RETRIES));
500
501 tokio_retry::RetryIf::spawn(
502 retry_strategy,
503 || do_request(self, &request, body.clone(), _request_name),
504 RetryPolicy {},
505 )
506 .await
507 }
508}
509
510struct RetryPolicy {}
511
512impl Condition<NomadError> for RetryPolicy {
513 fn should_retry(&mut self, error: &NomadError) -> bool {
514 match error {
515 NomadError::InvalidRequest(_) => false,
516 NomadError::RequestError(_) => false,
517 NomadError::ResponseError(_) => true,
518 NomadError::InvalidResponse(_) => true,
519 NomadError::ResponseDeserializationFailed(_) => false,
520 NomadError::UnexpectedResponseCode(code, _) if code.is_server_error() => true,
521 NomadError::UnexpectedResponseCode(_, _) => false,
522 NomadError::Timeout(_) => true,
523 NomadError::NativeRootsError(_) => false,
524 }
525 }
526}
527
528#[cfg(test)]
529mod test {
530 use super::*;
531
532 use serial_test::serial;
533 use std::{collections::HashMap, time::Duration};
534
535 const DOCKER_GRPC_PORT: u16 = 1000;
537 const PORT_GRPC_NAME: &str = "redis";
539
540 const DOCKER_HTTP_PORT: u16 = 1001;
542 const PORT_DEBUG_NAME: &str = "debug";
544
545 #[tokio::test(flavor = "current_thread")]
546 #[serial]
547 async fn test_add_job_plan() -> Result<()> {
548 let nomad_client = build_client();
549 let added_request = build_job_request(
550 "add_plan_job".to_string(),
551 "add_plan_id".to_string(),
552 0,
553 "us-central".to_string(),
554 "dc1".to_string(),
555 );
556 let added_job = added_request.job;
557 let added_job_plan = nomad_client.create_job_plan(&added_job).await?;
558 assert_eq!(DiffType::Added, added_job_plan.diff.r#type);
559 Ok(())
560 }
561
562 #[tokio::test(flavor = "current_thread")]
563 #[serial]
564 async fn test_edit_job_plan() -> Result<()> {
565 let nomad_client = build_client();
566 let added_request = build_job_request(
567 "edit_plan_job".to_string(),
568 "edit_plan_id".to_string(),
569 0,
570 "us-central".to_string(),
571 "dc1".to_string(),
572 );
573 nomad_client.create_or_update_job(added_request).await?;
574 tokio::time::sleep(Duration::from_secs(5)).await;
575 let changed_request = build_job_request(
576 "edit_plan_job".to_string(),
577 "edit_plan_id".to_string(),
578 0,
579 "us-central".to_string(),
580 "dc2".to_string(),
581 );
582 let changed_job = changed_request.job;
583 let job_plan = nomad_client.create_job_plan(&changed_job).await?;
584 assert_eq!(job_plan.diff.objects.len(), 1);
585 let diff_object = &job_plan.diff.objects[0];
586 assert_eq!(DiffType::Edited, job_plan.diff.r#type);
587 assert_eq!("Datacenters", diff_object.name);
588 let diff_fields = &diff_object.fields;
589 let mut old_diff_count = 0;
590 let mut new_diff_count = 0;
591 for diff_field in diff_fields {
593 if diff_field.r#type == DiffType::Added
594 && diff_field.name.eq("Datacenters")
595 && diff_field.old.is_empty()
596 && diff_field.new.eq("dc2")
597 {
598 new_diff_count += 1;
599 }
600 if diff_field.r#type == DiffType::Deleted
601 && diff_field.name.eq("Datacenters")
602 && diff_field.old.eq("dc1")
603 && diff_field.new.is_empty()
604 {
605 old_diff_count += 1;
606 }
607 }
608 assert_eq!(1, old_diff_count);
610 assert_eq!(1, new_diff_count);
611
612 Ok(())
613 }
614
615 #[tokio::test(flavor = "current_thread")]
616 #[serial]
617 async fn test_purge_true_job_plan() -> Result<()> {
618 let nomad_client = build_client();
619 let added_request = build_job_request(
620 "purge_true_plan_job".to_string(),
621 "purge_true_plan_id".to_string(),
622 0,
623 "us-central".to_string(),
624 "dc1".to_string(),
625 );
626 nomad_client.create_or_update_job(added_request).await?;
627 tokio::time::sleep(Duration::from_secs(5)).await;
628 let request_copy = build_job_request(
629 "purge_true_plan_job".to_string(),
630 "purge_true_plan_id".to_string(),
631 0,
632 "us-central".to_string(),
633 "dc1".to_string(),
634 );
635 nomad_client
636 .stop_job("purge_true_plan_id".to_string(), true, Some("default"))
637 .await?;
638 let job_plan = nomad_client.create_job_plan(&request_copy.job).await?;
639 assert_eq!(DiffType::Added, job_plan.diff.r#type);
640 Ok(())
641 }
642
643 #[tokio::test(flavor = "current_thread")]
644 #[serial]
645 async fn test_purge_false_job_plan() -> Result<()> {
646 let nomad_client = build_client();
647 let added_request = build_job_request(
648 "purge_false_plan_job".to_string(),
649 "purge_false_plan_id".to_string(),
650 0,
651 "us-central".to_string(),
652 "dc1".to_string(),
653 );
654 nomad_client.create_or_update_job(added_request).await?;
655 tokio::time::sleep(Duration::from_secs(5)).await;
656 let request_copy = build_job_request(
657 "purge_false_plan_job".to_string(),
658 "purge_false_plan_id".to_string(),
659 0,
660 "us-central".to_string(),
661 "dc1".to_string(),
662 );
663 nomad_client
664 .stop_job("purge_false_plan_id".to_string(), false, Some("default"))
665 .await?;
666 let job_plan = nomad_client.create_job_plan(&request_copy.job).await?;
667 assert_eq!(DiffType::Edited, job_plan.diff.r#type);
668 Ok(())
669 }
670
671 #[tokio::test(flavor = "current_thread")]
672 #[serial]
673 async fn test_retrieve_most_recent_job_deployment() -> Result<()> {
674 let nomad_client = build_client();
675 let request = build_job_request(
676 "deploy_job".to_string(),
677 "deploy_job_id".to_string(),
678 1,
679 "us-central".to_string(),
680 "dc1".to_string(),
681 );
682
683 nomad_client.create_or_update_job(request.clone()).await?;
684 tokio::time::sleep(Duration::from_secs(5)).await;
686 let job = request.job;
687 let deployment_option = get_deployment("deploy_job_id", nomad_client).await?;
688 assert!(deployment_option.is_some());
689
690 let deployment = deployment_option.unwrap();
691
692 assert_ne!(deployment.id, "None".to_string());
693 assert_eq!(deployment.job_id, job.id);
694 assert_eq!(deployment.job_version, 0);
696 let change_request = build_job_request(
697 "deploy_job2".to_string(),
698 "deploy_job_id".to_string(),
699 1,
700 "us-central".to_string(),
701 "dc1".to_string(),
702 );
703
704 assert!(deployment.status.is_some());
705 nomad_client.create_or_update_job(change_request).await?;
706 tokio::time::sleep(Duration::from_secs(5)).await;
707 let deployment_option = get_deployment("deploy_job_id", nomad_client).await?;
708 assert!(deployment_option.is_some());
709 let deployment: JobDeployment = deployment_option.unwrap();
710 assert_eq!(deployment.job_version, 1);
712
713 Ok(())
714 }
715
716 #[tokio::test(flavor = "current_thread")]
717 #[serial]
718 async fn test_stop_job() -> Result<()> {
720 let nomad_client = build_client();
721 let first_job_count = count_jobs().await?;
722
723 let request = build_job_request(
724 "stop_job".to_string(),
725 "stop_job_id".to_string(),
726 0,
727 "us-central".to_string(),
728 "dc1".to_string(),
729 );
730 nomad_client.create_or_update_job(request).await?;
731 nomad_client
732 .stop_job("stop_job_id".to_string(), true, Some("default"))
733 .await?;
734 let second_job_count = count_jobs().await?;
735 assert_eq!(first_job_count, second_job_count);
736 let request2 = build_job_request(
737 "stop_job2".to_string(),
738 "stop_job_id2".to_string(),
739 0,
740 "us-central".to_string(),
741 "dc1".to_string(),
742 );
743 nomad_client.create_or_update_job(request2).await?;
744 nomad_client
745 .stop_job("stop_job_id2".to_string(), false, Some("default"))
746 .await?;
747 let second_job_count = count_jobs().await?;
748 assert_eq!(first_job_count + 1, second_job_count);
749 let job: Job = nomad_client
750 .read_job("stop_job_id2", Some("default"))
751 .await?;
752 assert_eq!(job.name, "stop_job2".to_string());
753
754 Ok(())
755 }
756
757 #[tokio::test(flavor = "current_thread")]
758 #[serial]
759 async fn test_update_jobs() -> Result<()> {
761 let nomad_client = build_client();
762
763 let request = build_job_request(
764 "update_job".to_string(),
765 "update_job_id".to_string(),
766 1,
767 "us-central".to_string(),
768 "dc1".to_string(),
769 );
770 nomad_client.create_or_update_job(request).await?;
771 let first_count = count_jobs().await?;
772 let job: Job = nomad_client
773 .read_job("update_job_id", Some("default"))
774 .await?;
775 assert_eq!(job.datacenters[0], "dc1".to_string());
776 let request = build_job_request(
777 "update_job".to_string(),
778 "update_job_id".to_string(),
779 1,
780 "us-central".to_string(),
781 "dc2".to_string(),
782 );
783 nomad_client.create_or_update_job(request).await?;
784
785 let second_count = count_jobs().await?;
786 let job: Job = nomad_client
787 .read_job("update_job_id", Some("default"))
788 .await?;
789 assert_eq!(job.datacenters[0], "dc2".to_string());
790 assert_eq!(first_count, second_count);
791 Ok(())
792 }
793
794 #[tokio::test(flavor = "current_thread")]
795 #[serial]
796 async fn test_list_jobs() -> Result<()> {
798 let nomad_client = build_client();
799 let first_job_count = count_jobs().await?;
800
801 let request = build_job_request(
802 "list_job".to_string(),
803 "list_job_id".to_string(),
804 0,
805 "us-central".to_string(),
806 "dc1".to_string(),
807 );
808 nomad_client.create_or_update_job(request).await?;
809 let second_job_count = count_jobs().await?;
810 let jobs = nomad_client.list_jobs("", None).await?.into_iter();
811 let mut found = false;
812 for job in jobs {
813 found = job.name == "list_job" || found;
814 }
815 assert!(found);
816 assert_eq!(first_job_count + 1, second_job_count);
817 Ok(())
818 }
819
820 #[tokio::test(flavor = "current_thread")]
821 #[serial]
822 async fn test_read_jobs() -> Result<()> {
824 let nomad_client = build_client();
825 let request = build_job_request(
826 "read_job".to_string(),
827 "read_job_id".to_string(),
828 0,
829 "us-central".to_string(),
830 "dc1".to_string(),
831 );
832 nomad_client.create_or_update_job(request).await?;
833 let job: Job = nomad_client
834 .read_job("read_job_id", Some("default"))
835 .await?;
836 assert_eq!(job.name, "read_job".to_string());
837 Ok(())
838 }
839
840 async fn count_jobs() -> Result<i32> {
841 let nomad_client = build_client();
842 let jobs = nomad_client.list_jobs("", None).await?.into_iter();
843 let mut job_count = 0;
844 for _ in jobs {
845 job_count += 1;
846 }
847 Ok(job_count)
848 }
849
850 fn build_client() -> &'static Nomad {
851 Box::leak(Box::new(Nomad::new(
852 Config::from_env(),
853 Duration::from_secs(10),
854 )))
855 }
856
857 async fn get_deployment(job_id: &str, nomad_client: &Nomad) -> Result<Option<JobDeployment>> {
858 let optional_deployment = nomad_client
859 .retrieve_most_recent_job_deployment(job_id, Some("default"))
860 .await?;
861 Ok(optional_deployment)
862 }
863
864 fn build_resources() -> Resources {
865 Resources {
866 cpu: CpuResource::Cores(1),
867 memory_mb: 256,
868 memory_max_mb: Some(1024),
869 networks: vec![NetworkConfig {
870 m_bits: 10,
871 dynamic_ports: vec![
872 Port {
873 label: "redis".to_string(),
874
875 ..Default::default()
876 },
877 Port {
878 label: "debug".to_string(),
879
880 ..Default::default()
881 },
882 ],
883
884 ..Default::default()
885 }],
886 devices: vec![],
887 }
888 }
889
890 fn build_task() -> Task {
891 let driver_config = build_driver_config();
892 let env_vars = HashMap::new();
893 let templates = vec![];
894 let task_resources = build_resources();
895 Task {
896 name: "redis".to_string(),
897 driver: "docker".to_string(),
898 config: Some(driver_config),
899
900 resources: task_resources,
901
902 env: Some(env_vars),
903
904 templates,
905
906 services: vec![Service {
907 name: "test-job".to_string(),
908 port_label: Some(PORT_GRPC_NAME.to_string()),
909 checks: vec![ConsulCheck {
910 address_mode: AddressMode::Host,
911 check_type: CheckType::Tcp,
912 port_label: PORT_GRPC_NAME.to_string(),
913 header: Some(HashMap::from([(
914 "Host".to_string(),
915 vec!["localhost".to_string()],
916 )])),
917 interval: Duration::from_secs(10),
918 timeout: Duration::from_secs(2),
919 ..Default::default()
920 }],
921
922 ..Default::default()
923 }],
924
925 restart_policy: RestartPolicy {
926 interval: Duration::from_secs(90),
927 delay: Duration::from_secs(5),
928 ..Default::default()
929 },
930
931 log_config: LogConfig {
932 max_files: 10,
933 max_file_size_in_mb: 10,
934 },
935
936 vault: Some(Vault {
937 policies: vec!["secret".to_string()],
938 ..Default::default()
939 }),
940 ..Default::default()
941 }
942 }
943
944 fn build_sidecar_task() -> Task {
945 let driver_config = HashMap::from([(
946 "image".to_string(),
947 DriverConfig::String(format!("{}:{}", "busybox", "latest")),
948 )]);
949
950 let env_vars = HashMap::new();
951 let templates = vec![];
952 let task_resources = build_resources();
953 Task {
954 name: "sidecar".to_string(),
955 driver: "docker".to_string(),
956 config: Some(driver_config),
957
958 resources: task_resources,
959
960 env: Some(env_vars),
961
962 templates,
963
964 log_config: LogConfig {
965 max_files: 10,
966 max_file_size_in_mb: 10,
967 },
968
969 lifecycle: Some(Lifecycle {
970 hook: "prestart".to_string(),
971 sidecar: true,
972 }),
973
974 ..Default::default()
975 }
976 }
977
978 fn build_driver_config() -> HashMap<String, DriverConfig> {
979 let mut driver_config = HashMap::new();
980 driver_config.insert(
981 "image".to_string(),
982 DriverConfig::String(format!("{}:{}", "bitnami/redis", "latest")),
983 );
984
985 let port_map = build_port_map();
986
987 driver_config.insert(
988 "port_map".to_string(),
989 DriverConfig::Vector(vec![DriverConfig::Map(port_map)]),
990 );
991
992 driver_config.insert(
995 "network_mode".to_string(),
996 DriverConfig::String("persistence".to_owned()),
997 );
998 driver_config
999 }
1000
1001 fn build_service_group(datacenter: String, job_name: String) -> TaskGroup {
1002 let restart_policy = build_restart_policy();
1003 let update_strategy = build_update_strategy();
1004 TaskGroup {
1005 name: format!(
1006 "redis-{}-{}-{}-instance",
1007 "Test",
1008 datacenter.as_str(),
1009 job_name
1010 ),
1011
1012 restart_policy: Some(restart_policy),
1013
1014 update: update_strategy,
1015
1016 count: 2,
1017
1018 ephemeral_disk: EphemeralDisk {
1019 size_in_mb: 200,
1020 ..Default::default()
1021 },
1022
1023 scaling: Some(Scaling {
1024 enabled: false,
1025 min: Some(1),
1026 max: 5,
1027 ..Default::default()
1028 }),
1029
1030 tasks: vec![build_task(), build_sidecar_task()],
1031
1032 ..Default::default()
1033 }
1034 }
1035
1036 fn build_restart_policy() -> RestartPolicy {
1037 RestartPolicy {
1038 mode: RestartMode::Fail,
1040 attempts: 1,
1041 interval: Duration::from_secs(90),
1042 delay: Duration::from_secs(5),
1043 }
1044 }
1045
1046 fn build_update_strategy() -> UpdateStrategy {
1047 UpdateStrategy {
1048 min_healthy_time: Duration::from_secs(10),
1049 healthy_deadline: Duration::from_secs(120),
1050 progress_deadline: Duration::from_secs(1200),
1051 auto_revert: false,
1052 stagger: Duration::from_secs(5),
1053 max_parallel: 1,
1054 ..Default::default()
1055 }
1056 }
1057
1058 fn build_port_map() -> HashMap<String, DriverConfig> {
1059 let mut port_map = HashMap::new();
1060 port_map.insert(
1061 PORT_GRPC_NAME.to_string(),
1062 DriverConfig::Int(DOCKER_GRPC_PORT as i64),
1063 );
1064 port_map.insert(
1065 PORT_DEBUG_NAME.to_string(),
1066 DriverConfig::Int(DOCKER_HTTP_PORT as i64),
1067 );
1068 port_map
1069 }
1070
1071 fn build_job(
1072 job_name: String,
1073 job_id: String,
1074 job_region: String,
1075 job_datacenter: String,
1076 ) -> Job {
1077 let service_group = build_service_group(job_datacenter.clone(), job_name.clone());
1078
1079 Job {
1080 namespace: "default".to_string(),
1081 region: job_region,
1082 datacenters: vec![job_datacenter],
1083 id: job_id,
1084 name: job_name,
1085 job_type: JobType::Service,
1086 meta: Some(build_meta()),
1087 task_groups: vec![service_group],
1088
1089 constraints: vec![Constraint {
1090 l_target: Some("${meta.storage}".to_string()),
1091 operand: ConstraintOperator::NotEquals,
1092 r_target: Some("portworx".to_string()),
1093 }],
1094
1095 vault_token: Some("myroot".to_string()),
1096 ..Default::default()
1097 }
1098 }
1099
1100 fn build_job_request(
1101 job_name: String,
1102 job_id: String,
1103 modify_index: u64,
1104 region: String,
1105 datacenter: String,
1106 ) -> JobRegisterRequest {
1107 let new_job = build_job(job_name, job_id, region, datacenter);
1108 JobRegisterRequest {
1109 job: new_job,
1110 enforce_index: false,
1111 job_modify_index: modify_index,
1112 ..Default::default()
1113 }
1114 }
1115
1116 fn build_meta() -> HashMap<String, String> {
1117 let mut result = HashMap::new();
1118 result.insert("environment".to_string(), "Test".to_string());
1119 result
1120 }
1121}