forge_orchestration/
nomad.rs

1//! Nomad client integration for Forge
2//!
3//! ## Table of Contents
4//! - **NomadClient**: HTTP client for Nomad API
5//! - **NomadJob**: Nomad-specific job representation
6//! - **Allocation**: Nomad allocation info
7//! - **Node**: Nomad node info
8
9use crate::error::{ForgeError, Result};
10use crate::job::{Driver, Job, JobType, Task, TaskGroup};
11use reqwest::Client;
12use serde::{Deserialize, Serialize};
13use std::collections::HashMap;
14use std::time::Duration;
15use tracing::info;
16
17/// Nomad API client
18#[derive(Clone)]
19pub struct NomadClient {
20    client: Client,
21    base_url: String,
22    token: Option<String>,
23    namespace: String,
24    region: String,
25}
26
27impl NomadClient {
28    /// Create a new Nomad client
29    pub fn new(base_url: impl Into<String>) -> Result<Self> {
30        let client = Client::builder()
31            .timeout(Duration::from_secs(30))
32            .build()
33            .map_err(|e| ForgeError::nomad(format!("Failed to create HTTP client: {}", e)))?;
34
35        Ok(Self {
36            client,
37            base_url: base_url.into().trim_end_matches('/').to_string(),
38            token: None,
39            namespace: "default".to_string(),
40            region: "global".to_string(),
41        })
42    }
43
44    /// Set ACL token
45    pub fn with_token(mut self, token: impl Into<String>) -> Self {
46        self.token = Some(token.into());
47        self
48    }
49
50    /// Set namespace
51    pub fn with_namespace(mut self, namespace: impl Into<String>) -> Self {
52        self.namespace = namespace.into();
53        self
54    }
55
56    /// Set region
57    pub fn with_region(mut self, region: impl Into<String>) -> Self {
58        self.region = region.into();
59        self
60    }
61
62    fn url(&self, path: &str) -> String {
63        format!("{}/v1{}", self.base_url, path)
64    }
65
66    fn add_auth(&self, req: reqwest::RequestBuilder) -> reqwest::RequestBuilder {
67        match &self.token {
68            Some(token) => req.header("X-Nomad-Token", token),
69            None => req,
70        }
71    }
72
73    /// Check Nomad connectivity
74    pub async fn health(&self) -> Result<bool> {
75        let resp = self
76            .add_auth(self.client.get(self.url("/status/leader")))
77            .send()
78            .await?;
79
80        Ok(resp.status().is_success())
81    }
82
83    /// Get cluster leader
84    pub async fn leader(&self) -> Result<String> {
85        let resp = self
86            .add_auth(self.client.get(self.url("/status/leader")))
87            .send()
88            .await?
89            .error_for_status()
90            .map_err(|e| ForgeError::nomad(e.to_string()))?;
91
92        resp.text()
93            .await
94            .map_err(|e| ForgeError::nomad(e.to_string()))
95    }
96
97    /// List all jobs
98    pub async fn list_jobs(&self) -> Result<Vec<JobListStub>> {
99        let url = format!("{}?namespace={}", self.url("/jobs"), self.namespace);
100        let resp = self
101            .add_auth(self.client.get(&url))
102            .send()
103            .await?
104            .error_for_status()
105            .map_err(|e| ForgeError::nomad(e.to_string()))?;
106
107        resp.json()
108            .await
109            .map_err(|e| ForgeError::nomad(e.to_string()))
110    }
111
112    /// Get job details
113    pub async fn get_job(&self, job_id: &str) -> Result<NomadJob> {
114        let url = format!(
115            "{}?namespace={}",
116            self.url(&format!("/job/{}", job_id)),
117            self.namespace
118        );
119        let resp = self
120            .add_auth(self.client.get(&url))
121            .send()
122            .await?
123            .error_for_status()
124            .map_err(|e| ForgeError::nomad(e.to_string()))?;
125
126        resp.json()
127            .await
128            .map_err(|e| ForgeError::nomad(e.to_string()))
129    }
130
131    /// Submit a job
132    pub async fn submit_job(&self, job: &Job) -> Result<JobSubmitResponse> {
133        let nomad_job = NomadJob::from_forge_job(job);
134        let payload = JobSubmitRequest {
135            job: nomad_job,
136            enforce_index: false,
137            job_modify_index: None,
138            policy_override: false,
139        };
140
141        let url = format!("{}?namespace={}", self.url("/jobs"), self.namespace);
142        let resp = self
143            .add_auth(self.client.post(&url))
144            .json(&payload)
145            .send()
146            .await?
147            .error_for_status()
148            .map_err(|e| ForgeError::nomad(e.to_string()))?;
149
150        let result: JobSubmitResponse = resp
151            .json()
152            .await
153            .map_err(|e| ForgeError::nomad(e.to_string()))?;
154
155        info!(job_id = %job.id, eval_id = %result.eval_id, "Job submitted to Nomad");
156        Ok(result)
157    }
158
159    /// Stop a job
160    pub async fn stop_job(&self, job_id: &str, purge: bool) -> Result<JobSubmitResponse> {
161        let url = format!(
162            "{}?namespace={}&purge={}",
163            self.url(&format!("/job/{}", job_id)),
164            self.namespace,
165            purge
166        );
167        let resp = self
168            .add_auth(self.client.delete(&url))
169            .send()
170            .await?
171            .error_for_status()
172            .map_err(|e| ForgeError::nomad(e.to_string()))?;
173
174        let result: JobSubmitResponse = resp
175            .json()
176            .await
177            .map_err(|e| ForgeError::nomad(e.to_string()))?;
178
179        info!(job_id = %job_id, "Job stopped");
180        Ok(result)
181    }
182
183    /// Scale a job's task group
184    pub async fn scale_job(
185        &self,
186        job_id: &str,
187        group: &str,
188        count: u32,
189        reason: Option<&str>,
190    ) -> Result<ScaleResponse> {
191        let payload = ScaleRequest {
192            count: Some(count as i64),
193            target: HashMap::from([("Group".to_string(), group.to_string())]),
194            message: reason.map(|s| s.to_string()),
195            policy_override: false,
196            error: false,
197            meta: None,
198        };
199
200        let url = format!(
201            "{}?namespace={}",
202            self.url(&format!("/job/{}/scale", job_id)),
203            self.namespace
204        );
205        let resp = self
206            .add_auth(self.client.post(&url))
207            .json(&payload)
208            .send()
209            .await?
210            .error_for_status()
211            .map_err(|e| ForgeError::nomad(e.to_string()))?;
212
213        let result: ScaleResponse = resp
214            .json()
215            .await
216            .map_err(|e| ForgeError::nomad(e.to_string()))?;
217
218        info!(job_id = %job_id, group = %group, count = count, "Job scaled");
219        Ok(result)
220    }
221
222    /// Get job allocations
223    pub async fn get_allocations(&self, job_id: &str) -> Result<Vec<AllocationListStub>> {
224        let url = format!(
225            "{}?namespace={}",
226            self.url(&format!("/job/{}/allocations", job_id)),
227            self.namespace
228        );
229        let resp = self
230            .add_auth(self.client.get(&url))
231            .send()
232            .await?
233            .error_for_status()
234            .map_err(|e| ForgeError::nomad(e.to_string()))?;
235
236        resp.json()
237            .await
238            .map_err(|e| ForgeError::nomad(e.to_string()))
239    }
240
241    /// List nodes
242    pub async fn list_nodes(&self) -> Result<Vec<NodeListStub>> {
243        let resp = self
244            .add_auth(self.client.get(self.url("/nodes")))
245            .send()
246            .await?
247            .error_for_status()
248            .map_err(|e| ForgeError::nomad(e.to_string()))?;
249
250        resp.json()
251            .await
252            .map_err(|e| ForgeError::nomad(e.to_string()))
253    }
254
255    /// Get node details
256    pub async fn get_node(&self, node_id: &str) -> Result<Node> {
257        let resp = self
258            .add_auth(self.client.get(self.url(&format!("/node/{}", node_id))))
259            .send()
260            .await?
261            .error_for_status()
262            .map_err(|e| ForgeError::nomad(e.to_string()))?;
263
264        resp.json()
265            .await
266            .map_err(|e| ForgeError::nomad(e.to_string()))
267    }
268}
269
270// Nomad API types
271
272/// Job list stub from Nomad API
273#[derive(Debug, Clone, Serialize, Deserialize)]
274#[serde(rename_all = "PascalCase")]
275pub struct JobListStub {
276    #[serde(rename = "ID")]
277    pub id: String,
278    pub name: String,
279    #[serde(rename = "Type")]
280    pub job_type: String,
281    pub status: String,
282    pub status_description: Option<String>,
283    pub priority: i32,
284}
285
286/// Nomad job representation
287#[derive(Debug, Clone, Serialize, Deserialize)]
288#[serde(rename_all = "PascalCase")]
289pub struct NomadJob {
290    #[serde(rename = "ID")]
291    pub id: String,
292    pub name: String,
293    #[serde(rename = "Type")]
294    pub job_type: String,
295    pub priority: i32,
296    pub datacenters: Vec<String>,
297    pub task_groups: Vec<NomadTaskGroup>,
298    pub namespace: Option<String>,
299    pub region: Option<String>,
300    pub meta: Option<HashMap<String, String>>,
301}
302
303impl NomadJob {
304    /// Convert from Forge Job
305    pub fn from_forge_job(job: &Job) -> Self {
306        Self {
307            id: job.id.clone(),
308            name: job.name.clone(),
309            job_type: match job.job_type {
310                JobType::Service => "service".to_string(),
311                JobType::Batch => "batch".to_string(),
312                JobType::System => "system".to_string(),
313                JobType::Parameterized => "parameterized".to_string(),
314            },
315            priority: job.priority as i32,
316            datacenters: job.datacenters.clone(),
317            task_groups: job.groups.iter().map(NomadTaskGroup::from_forge).collect(),
318            namespace: None,
319            region: None,
320            meta: if job.metadata.is_empty() {
321                None
322            } else {
323                Some(job.metadata.clone())
324            },
325        }
326    }
327}
328
329/// Nomad task group
330#[derive(Debug, Clone, Serialize, Deserialize)]
331#[serde(rename_all = "PascalCase")]
332pub struct NomadTaskGroup {
333    pub name: String,
334    pub count: i32,
335    pub tasks: Vec<NomadTask>,
336    pub scaling: Option<NomadScaling>,
337    pub restart_policy: Option<NomadRestartPolicy>,
338    pub meta: Option<HashMap<String, String>>,
339}
340
341impl NomadTaskGroup {
342    fn from_forge(group: &TaskGroup) -> Self {
343        Self {
344            name: group.name.clone(),
345            count: group.scaling.desired as i32,
346            tasks: group.tasks.iter().map(NomadTask::from_forge).collect(),
347            scaling: Some(NomadScaling {
348                min: group.scaling.min as i64,
349                max: group.scaling.max as i64,
350                enabled: true,
351                policy: None,
352            }),
353            restart_policy: Some(NomadRestartPolicy {
354                attempts: group.restart_policy.attempts as i32,
355                delay: group.restart_policy.delay_secs as i64 * 1_000_000_000,
356                mode: match group.restart_policy.mode {
357                    crate::job::RestartMode::Fail => "fail".to_string(),
358                    crate::job::RestartMode::Delay => "delay".to_string(),
359                },
360                interval: 1800_000_000_000, // 30 minutes in nanoseconds
361            }),
362            meta: if group.metadata.is_empty() {
363                None
364            } else {
365                Some(group.metadata.clone())
366            },
367        }
368    }
369}
370
371/// Nomad task
372#[derive(Debug, Clone, Serialize, Deserialize)]
373#[serde(rename_all = "PascalCase")]
374pub struct NomadTask {
375    pub name: String,
376    pub driver: String,
377    pub config: HashMap<String, serde_json::Value>,
378    pub resources: NomadResources,
379    pub env: Option<HashMap<String, String>>,
380    pub meta: Option<HashMap<String, String>>,
381}
382
383impl NomadTask {
384    fn from_forge(task: &Task) -> Self {
385        let mut config = HashMap::new();
386
387        if let Some(cmd) = &task.command {
388            config.insert("command".to_string(), serde_json::json!(cmd));
389        }
390        if !task.args.is_empty() {
391            config.insert("args".to_string(), serde_json::json!(task.args));
392        }
393
394        Self {
395            name: task.name.clone(),
396            driver: match task.driver {
397                Driver::Exec => "exec".to_string(),
398                Driver::Docker => "docker".to_string(),
399                Driver::Podman => "podman".to_string(),
400                Driver::RawExec => "raw_exec".to_string(),
401                Driver::Java => "java".to_string(),
402                Driver::Qemu => "qemu".to_string(),
403            },
404            config,
405            resources: NomadResources {
406                cpu: task.resources.cpu as i32,
407                memory_mb: task.resources.memory as i32,
408                disk_mb: task.resources.disk.map(|d| d as i32),
409            },
410            env: if task.env.is_empty() {
411                None
412            } else {
413                Some(task.env.clone())
414            },
415            meta: if task.metadata.is_empty() {
416                None
417            } else {
418                Some(task.metadata.clone())
419            },
420        }
421    }
422}
423
424/// Nomad resources
425#[derive(Debug, Clone, Serialize, Deserialize)]
426#[serde(rename_all = "PascalCase")]
427pub struct NomadResources {
428    #[serde(rename = "CPU")]
429    pub cpu: i32,
430    #[serde(rename = "MemoryMB")]
431    pub memory_mb: i32,
432    #[serde(rename = "DiskMB")]
433    pub disk_mb: Option<i32>,
434}
435
436/// Nomad scaling config
437#[derive(Debug, Clone, Serialize, Deserialize)]
438#[serde(rename_all = "PascalCase")]
439pub struct NomadScaling {
440    pub min: i64,
441    pub max: i64,
442    pub enabled: bool,
443    pub policy: Option<HashMap<String, serde_json::Value>>,
444}
445
446/// Nomad restart policy
447#[derive(Debug, Clone, Serialize, Deserialize)]
448#[serde(rename_all = "PascalCase")]
449pub struct NomadRestartPolicy {
450    pub attempts: i32,
451    pub delay: i64,
452    pub mode: String,
453    pub interval: i64,
454}
455
456/// Job submit request
457#[derive(Debug, Serialize)]
458#[serde(rename_all = "PascalCase")]
459struct JobSubmitRequest {
460    job: NomadJob,
461    enforce_index: bool,
462    job_modify_index: Option<u64>,
463    policy_override: bool,
464}
465
466/// Job submit response
467#[derive(Debug, Clone, Deserialize)]
468#[serde(rename_all = "PascalCase")]
469pub struct JobSubmitResponse {
470    #[serde(rename = "EvalID")]
471    pub eval_id: String,
472    pub eval_create_index: Option<u64>,
473    pub job_modify_index: Option<u64>,
474    pub warnings: Option<String>,
475}
476
477/// Scale request
478#[derive(Debug, Serialize)]
479#[serde(rename_all = "PascalCase")]
480struct ScaleRequest {
481    count: Option<i64>,
482    target: HashMap<String, String>,
483    message: Option<String>,
484    policy_override: bool,
485    error: bool,
486    meta: Option<HashMap<String, String>>,
487}
488
489/// Scale response
490#[derive(Debug, Clone, Deserialize)]
491#[serde(rename_all = "PascalCase")]
492pub struct ScaleResponse {
493    #[serde(rename = "EvalID")]
494    pub eval_id: Option<String>,
495    pub eval_create_index: Option<u64>,
496}
497
498/// Allocation list stub
499#[derive(Debug, Clone, Serialize, Deserialize)]
500#[serde(rename_all = "PascalCase")]
501pub struct AllocationListStub {
502    #[serde(rename = "ID")]
503    pub id: String,
504    #[serde(rename = "JobID")]
505    pub job_id: String,
506    #[serde(rename = "NodeID")]
507    pub node_id: String,
508    pub task_group: String,
509    pub client_status: String,
510    pub desired_status: String,
511}
512
513/// Node list stub
514#[derive(Debug, Clone, Serialize, Deserialize)]
515#[serde(rename_all = "PascalCase")]
516pub struct NodeListStub {
517    #[serde(rename = "ID")]
518    pub id: String,
519    pub name: String,
520    pub status: String,
521    pub status_description: Option<String>,
522    pub datacenter: String,
523    pub node_class: Option<String>,
524    pub drain: bool,
525    pub schedulability_eligibility: Option<String>,
526}
527
528/// Node details
529#[derive(Debug, Clone, Serialize, Deserialize)]
530#[serde(rename_all = "PascalCase")]
531pub struct Node {
532    #[serde(rename = "ID")]
533    pub id: String,
534    pub name: String,
535    pub datacenter: String,
536    pub status: String,
537    pub drain: bool,
538    pub attributes: Option<HashMap<String, String>>,
539    pub resources: Option<NodeResources>,
540    pub reserved: Option<NodeResources>,
541}
542
543/// Node resources
544#[derive(Debug, Clone, Serialize, Deserialize)]
545#[serde(rename_all = "PascalCase")]
546pub struct NodeResources {
547    #[serde(rename = "CPU")]
548    pub cpu: Option<i32>,
549    #[serde(rename = "MemoryMB")]
550    pub memory_mb: Option<i32>,
551    #[serde(rename = "DiskMB")]
552    pub disk_mb: Option<i32>,
553}
554
#[cfg(test)]
mod tests {
    use super::*;

    /// Single-group job shared by the conversion tests.
    fn sample_job() -> Job {
        Job::new("test-job")
            .job_type(JobType::Service)
            .with_group(
                "api",
                Task::new("server")
                    .driver(Driver::Exec)
                    .command("/bin/server")
                    .resources(500, 256),
            )
    }

    #[test]
    fn test_nomad_job_conversion() {
        let job = sample_job();
        let nomad_job = NomadJob::from_forge_job(&job);

        assert_eq!(nomad_job.id, job.id);
        assert_eq!(nomad_job.name, "test-job");
        assert_eq!(nomad_job.job_type, "service");
        assert_eq!(nomad_job.task_groups.len(), 1);
        assert_eq!(nomad_job.task_groups[0].name, "api");
        assert_eq!(nomad_job.task_groups[0].tasks.len(), 1);
        assert_eq!(nomad_job.task_groups[0].tasks[0].name, "server");
        assert_eq!(nomad_job.task_groups[0].tasks[0].driver, "exec");
    }

    #[test]
    fn test_nomad_job_wire_format() {
        let nomad_job = NomadJob::from_forge_job(&sample_job());
        let wire = serde_json::to_value(&nomad_job).expect("NomadJob serializes to JSON");

        // Nomad expects PascalCase keys with explicit acronym overrides.
        assert_eq!(wire["ID"], nomad_job.id.as_str());
        assert_eq!(wire["Name"], "test-job");
        assert_eq!(wire["Type"], "service");

        let task = &wire["TaskGroups"][0]["Tasks"][0];
        assert_eq!(task["Name"], "server");
        assert_eq!(task["Driver"], "exec");
        assert!(task["Resources"].get("CPU").is_some());
        assert!(task["Resources"].get("MemoryMB").is_some());
    }
}