use crate::error::{ForgeError, Result};
use crate::job::{Driver, Job, JobType, Task, TaskGroup};
use reqwest::Client;
use serde::{Deserialize, Serialize};
use std::collections::HashMap;
use std::time::Duration;
use tracing::info;

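/// HTTP client for the Nomad REST API (`/v1` endpoints).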
#[derive(Clone)]
pub struct NomadClient {
    client: Client,
    base_url: String,
    token: Option<String>,
    namespace: String,
    region: String,
}

impl NomadClient {
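    /// Creates a client for the Nomad agent at `base_url` (for example
    /// `http://127.0.0.1:4646`), using a 30-second request timeout, the
    /// `default` namespace, and the `global` region.
    ///
    /// ```ignore
    /// let nomad = NomadClient::new("http://127.0.0.1:4646")?
    ///     .with_token("<acl-token>")
    ///     .with_namespace("prod");
    /// ```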
    pub fn new(base_url: impl Into<String>) -> Result<Self> {
        let client = Client::builder()
            .timeout(Duration::from_secs(30))
            .build()
            .map_err(|e| ForgeError::nomad(format!("Failed to create HTTP client: {}", e)))?;

        Ok(Self {
            client,
            base_url: base_url.into().trim_end_matches('/').to_string(),
            token: None,
            namespace: "default".to_string(),
            region: "global".to_string(),
        })
    }

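    /// Sets the ACL token sent as the `X-Nomad-Token` header on every request.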
    pub fn with_token(mut self, token: impl Into<String>) -> Self {
        self.token = Some(token.into());
        self
    }

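    /// Overrides the namespace used for job-related requests (defaults to `default`).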
    pub fn with_namespace(mut self, namespace: impl Into<String>) -> Self {
        self.namespace = namespace.into();
        self
    }

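    /// Overrides the region this client is configured for (defaults to `global`).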
    pub fn with_region(mut self, region: impl Into<String>) -> Self {
        self.region = region.into();
        self
    }

    fn url(&self, path: &str) -> String {
        format!("{}/v1{}", self.base_url, path)
    }

    fn add_auth(&self, req: reqwest::RequestBuilder) -> reqwest::RequestBuilder {
        match &self.token {
            Some(token) => req.header("X-Nomad-Token", token),
            None => req,
        }
    }

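    /// Returns `true` if the agent answers the `/status/leader` endpoint successfully.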
    pub async fn health(&self) -> Result<bool> {
        let resp = self
            .add_auth(self.client.get(self.url("/status/leader")))
            .send()
            .await?;

        Ok(resp.status().is_success())
    }

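    /// Returns the address of the current cluster leader.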
    pub async fn leader(&self) -> Result<String> {
        let resp = self
            .add_auth(self.client.get(self.url("/status/leader")))
            .send()
            .await?
            .error_for_status()
            .map_err(|e| ForgeError::nomad(e.to_string()))?;

        resp.text()
            .await
            .map_err(|e| ForgeError::nomad(e.to_string()))
    }

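    /// Lists the jobs registered in the client's namespace.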
    pub async fn list_jobs(&self) -> Result<Vec<JobListStub>> {
        let url = format!("{}?namespace={}", self.url("/jobs"), self.namespace);
        let resp = self
            .add_auth(self.client.get(&url))
            .send()
            .await?
            .error_for_status()
            .map_err(|e| ForgeError::nomad(e.to_string()))?;

        resp.json()
            .await
            .map_err(|e| ForgeError::nomad(e.to_string()))
    }

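    /// Fetches the full definition of a single job by ID.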
    pub async fn get_job(&self, job_id: &str) -> Result<NomadJob> {
        let url = format!(
            "{}?namespace={}",
            self.url(&format!("/job/{}", job_id)),
            self.namespace
        );
        let resp = self
            .add_auth(self.client.get(&url))
            .send()
            .await?
            .error_for_status()
            .map_err(|e| ForgeError::nomad(e.to_string()))?;

        resp.json()
            .await
            .map_err(|e| ForgeError::nomad(e.to_string()))
    }

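    /// Converts a forge [`Job`] into its Nomad representation and registers it,
    /// returning the evaluation created by the scheduler.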
    pub async fn submit_job(&self, job: &Job) -> Result<JobSubmitResponse> {
        let nomad_job = NomadJob::from_forge_job(job);
        let payload = JobSubmitRequest {
            job: nomad_job,
            enforce_index: false,
            job_modify_index: None,
            policy_override: false,
        };

        let url = format!("{}?namespace={}", self.url("/jobs"), self.namespace);
        let resp = self
            .add_auth(self.client.post(&url))
            .json(&payload)
            .send()
            .await?
            .error_for_status()
            .map_err(|e| ForgeError::nomad(e.to_string()))?;

        let result: JobSubmitResponse = resp
            .json()
            .await
            .map_err(|e| ForgeError::nomad(e.to_string()))?;

        info!(job_id = %job.id, eval_id = %result.eval_id, "Job submitted to Nomad");
        Ok(result)
    }

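    /// Stops (deregisters) a job; when `purge` is `true` the job is also removed
    /// from Nomad's state entirely.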
    pub async fn stop_job(&self, job_id: &str, purge: bool) -> Result<JobSubmitResponse> {
        let url = format!(
            "{}?namespace={}&purge={}",
            self.url(&format!("/job/{}", job_id)),
            self.namespace,
            purge
        );
        let resp = self
            .add_auth(self.client.delete(&url))
            .send()
            .await?
            .error_for_status()
            .map_err(|e| ForgeError::nomad(e.to_string()))?;

        let result: JobSubmitResponse = resp
            .json()
            .await
            .map_err(|e| ForgeError::nomad(e.to_string()))?;

        info!(job_id = %job_id, "Job stopped");
        Ok(result)
    }

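    /// Scales a task group of a job to `count` instances, optionally recording a reason.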
    pub async fn scale_job(
        &self,
        job_id: &str,
        group: &str,
        count: u32,
        reason: Option<&str>,
    ) -> Result<ScaleResponse> {
        let payload = ScaleRequest {
            count: Some(count as i64),
            target: HashMap::from([("Group".to_string(), group.to_string())]),
            message: reason.map(|s| s.to_string()),
            policy_override: false,
            error: false,
            meta: None,
        };

        let url = format!(
            "{}?namespace={}",
            self.url(&format!("/job/{}/scale", job_id)),
            self.namespace
        );
        let resp = self
            .add_auth(self.client.post(&url))
            .json(&payload)
            .send()
            .await?
            .error_for_status()
            .map_err(|e| ForgeError::nomad(e.to_string()))?;

        let result: ScaleResponse = resp
            .json()
            .await
            .map_err(|e| ForgeError::nomad(e.to_string()))?;

        info!(job_id = %job_id, group = %group, count = count, "Job scaled");
        Ok(result)
    }

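    /// Lists the allocations placed for a job.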
    pub async fn get_allocations(&self, job_id: &str) -> Result<Vec<AllocationListStub>> {
        let url = format!(
            "{}?namespace={}",
            self.url(&format!("/job/{}/allocations", job_id)),
            self.namespace
        );
        let resp = self
            .add_auth(self.client.get(&url))
            .send()
            .await?
            .error_for_status()
            .map_err(|e| ForgeError::nomad(e.to_string()))?;

        resp.json()
            .await
            .map_err(|e| ForgeError::nomad(e.to_string()))
    }

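    /// Lists the client nodes known to the cluster.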
    pub async fn list_nodes(&self) -> Result<Vec<NodeListStub>> {
        let resp = self
            .add_auth(self.client.get(self.url("/nodes")))
            .send()
            .await?
            .error_for_status()
            .map_err(|e| ForgeError::nomad(e.to_string()))?;

        resp.json()
            .await
            .map_err(|e| ForgeError::nomad(e.to_string()))
    }

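    /// Fetches detailed information about a single node by ID.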
    pub async fn get_node(&self, node_id: &str) -> Result<Node> {
        let resp = self
            .add_auth(self.client.get(self.url(&format!("/node/{}", node_id))))
            .send()
            .await?
            .error_for_status()
            .map_err(|e| ForgeError::nomad(e.to_string()))?;

        resp.json()
            .await
            .map_err(|e| ForgeError::nomad(e.to_string()))
    }
}

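/// Summary entry returned by the job list endpoint.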
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "PascalCase")]
pub struct JobListStub {
    #[serde(rename = "ID")]
    pub id: String,
    pub name: String,
    #[serde(rename = "Type")]
    pub job_type: String,
    pub status: String,
    pub status_description: Option<String>,
    pub priority: i32,
}

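/// Job definition in the shape expected by the Nomad jobs API.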
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "PascalCase")]
pub struct NomadJob {
    #[serde(rename = "ID")]
    pub id: String,
    pub name: String,
    #[serde(rename = "Type")]
    pub job_type: String,
    pub priority: i32,
    pub datacenters: Vec<String>,
    pub task_groups: Vec<NomadTaskGroup>,
    pub namespace: Option<String>,
    pub region: Option<String>,
    pub meta: Option<HashMap<String, String>>,
}

impl NomadJob {
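    /// Builds a Nomad job payload from a forge [`Job`].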
    pub fn from_forge_job(job: &Job) -> Self {
        Self {
            id: job.id.clone(),
            name: job.name.clone(),
            job_type: match job.job_type {
                JobType::Service => "service".to_string(),
                JobType::Batch => "batch".to_string(),
                JobType::System => "system".to_string(),
                // Nomad has no "parameterized" job type; parameterized jobs are
                // submitted as batch jobs (the ParameterizedJob block is not modeled here).
                JobType::Parameterized => "batch".to_string(),
            },
            priority: job.priority as i32,
            datacenters: job.datacenters.clone(),
            task_groups: job.groups.iter().map(NomadTaskGroup::from_forge).collect(),
            namespace: None,
            region: None,
            meta: if job.metadata.is_empty() {
                None
            } else {
                Some(job.metadata.clone())
            },
        }
    }
}

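/// Task group within a [`NomadJob`], including scaling and restart settings.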
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "PascalCase")]
pub struct NomadTaskGroup {
    pub name: String,
    pub count: i32,
    pub tasks: Vec<NomadTask>,
    pub scaling: Option<NomadScaling>,
    pub restart_policy: Option<NomadRestartPolicy>,
    pub meta: Option<HashMap<String, String>>,
}

impl NomadTaskGroup {
    fn from_forge(group: &TaskGroup) -> Self {
        Self {
            name: group.name.clone(),
            count: group.scaling.desired as i32,
            tasks: group.tasks.iter().map(NomadTask::from_forge).collect(),
            scaling: Some(NomadScaling {
                min: group.scaling.min as i64,
                max: group.scaling.max as i64,
                enabled: true,
                policy: None,
            }),
            restart_policy: Some(NomadRestartPolicy {
                attempts: group.restart_policy.attempts as i32,
                // Nomad expects durations in nanoseconds.
                delay: group.restart_policy.delay_secs as i64 * 1_000_000_000,
                mode: match group.restart_policy.mode {
                    crate::job::RestartMode::Fail => "fail".to_string(),
                    crate::job::RestartMode::Delay => "delay".to_string(),
                },
                // 30-minute restart interval, in nanoseconds.
                interval: 1_800_000_000_000,
            }),
            meta: if group.metadata.is_empty() {
                None
            } else {
                Some(group.metadata.clone())
            },
        }
    }
}

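/// Single task within a task group: driver, driver config, resources, and environment.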
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "PascalCase")]
pub struct NomadTask {
    pub name: String,
    pub driver: String,
    pub config: HashMap<String, serde_json::Value>,
    pub resources: NomadResources,
    pub env: Option<HashMap<String, String>>,
    pub meta: Option<HashMap<String, String>>,
}

impl NomadTask {
    fn from_forge(task: &Task) -> Self {
        let mut config = HashMap::new();

        if let Some(cmd) = &task.command {
            config.insert("command".to_string(), serde_json::json!(cmd));
        }
        if !task.args.is_empty() {
            config.insert("args".to_string(), serde_json::json!(task.args));
        }

        Self {
            name: task.name.clone(),
            driver: match task.driver {
                Driver::Exec => "exec".to_string(),
                Driver::Docker => "docker".to_string(),
                Driver::Podman => "podman".to_string(),
                Driver::RawExec => "raw_exec".to_string(),
                Driver::Java => "java".to_string(),
                Driver::Qemu => "qemu".to_string(),
            },
            config,
            resources: NomadResources {
                cpu: task.resources.cpu as i32,
                memory_mb: task.resources.memory as i32,
                disk_mb: task.resources.disk.map(|d| d as i32),
            },
            env: if task.env.is_empty() {
                None
            } else {
                Some(task.env.clone())
            },
            meta: if task.metadata.is_empty() {
                None
            } else {
                Some(task.metadata.clone())
            },
        }
    }
}

#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "PascalCase")]
pub struct NomadResources {
    #[serde(rename = "CPU")]
    pub cpu: i32,
    #[serde(rename = "MemoryMB")]
    pub memory_mb: i32,
    #[serde(rename = "DiskMB")]
    pub disk_mb: Option<i32>,
}

#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "PascalCase")]
pub struct NomadScaling {
    pub min: i64,
    pub max: i64,
    pub enabled: bool,
    pub policy: Option<HashMap<String, serde_json::Value>>,
}

#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "PascalCase")]
pub struct NomadRestartPolicy {
    pub attempts: i32,
    pub delay: i64,
    pub mode: String,
    pub interval: i64,
}

#[derive(Debug, Serialize)]
#[serde(rename_all = "PascalCase")]
struct JobSubmitRequest {
    job: NomadJob,
    enforce_index: bool,
    job_modify_index: Option<u64>,
    policy_override: bool,
}

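/// Response returned when a job is registered or deregistered.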
#[derive(Debug, Clone, Deserialize)]
#[serde(rename_all = "PascalCase")]
pub struct JobSubmitResponse {
    #[serde(rename = "EvalID")]
    pub eval_id: String,
    pub eval_create_index: Option<u64>,
    pub job_modify_index: Option<u64>,
    pub warnings: Option<String>,
}

#[derive(Debug, Serialize)]
#[serde(rename_all = "PascalCase")]
struct ScaleRequest {
    count: Option<i64>,
    target: HashMap<String, String>,
    message: Option<String>,
    policy_override: bool,
    error: bool,
    meta: Option<HashMap<String, String>>,
}

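/// Response returned by the job scale endpoint.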
#[derive(Debug, Clone, Deserialize)]
#[serde(rename_all = "PascalCase")]
pub struct ScaleResponse {
    #[serde(rename = "EvalID")]
    pub eval_id: Option<String>,
    pub eval_create_index: Option<u64>,
}

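/// Summary entry returned by the job allocations endpoint.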
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "PascalCase")]
pub struct AllocationListStub {
    #[serde(rename = "ID")]
    pub id: String,
    #[serde(rename = "JobID")]
    pub job_id: String,
    #[serde(rename = "NodeID")]
    pub node_id: String,
    pub task_group: String,
    pub client_status: String,
    pub desired_status: String,
}

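/// Summary entry returned by the node list endpoint.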
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "PascalCase")]
pub struct NodeListStub {
    #[serde(rename = "ID")]
    pub id: String,
    pub name: String,
    pub status: String,
    pub status_description: Option<String>,
    pub datacenter: String,
    pub node_class: Option<String>,
    pub drain: bool,
    pub scheduling_eligibility: Option<String>,
}

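/// Detailed node information, including attributes and advertised resources.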
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "PascalCase")]
pub struct Node {
    #[serde(rename = "ID")]
    pub id: String,
    pub name: String,
    pub datacenter: String,
    pub status: String,
    pub drain: bool,
    pub attributes: Option<HashMap<String, String>>,
    pub resources: Option<NodeResources>,
    pub reserved: Option<NodeResources>,
}

#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "PascalCase")]
pub struct NodeResources {
    #[serde(rename = "CPU")]
    pub cpu: Option<i32>,
    #[serde(rename = "MemoryMB")]
    pub memory_mb: Option<i32>,
    #[serde(rename = "DiskMB")]
    pub disk_mb: Option<i32>,
}

#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_nomad_job_conversion() {
        let job = Job::new("test-job")
            .job_type(JobType::Service)
            .with_group(
                "api",
                Task::new("server")
                    .driver(Driver::Exec)
                    .command("/bin/server")
                    .resources(500, 256),
            );

        let nomad_job = NomadJob::from_forge_job(&job);

        assert_eq!(nomad_job.name, "test-job");
        assert_eq!(nomad_job.job_type, "service");
        assert_eq!(nomad_job.task_groups.len(), 1);
        assert_eq!(nomad_job.task_groups[0].name, "api");
        assert_eq!(nomad_job.task_groups[0].tasks[0].driver, "exec");
    }
}