arpx/runtime/profile/
runtime_builder.rs

1use crate::runtime::{
2    job::{
3        task::{action::BUILTIN_ACTIONS, log_monitor::LogMonitor, process::Process, Task},
4        Job,
5    },
6    profile::{deserialize, Profile},
7    Runtime,
8};
9use anyhow::{ensure, Context, Error, Result};
10use log::debug;
11use std::{collections::HashMap, env::var, path::Path};
12
13pub struct RuntimeBuilder;
14
15impl RuntimeBuilder {
16    pub fn from_profile_and_job_names(profile: Profile, job_names: &[String]) -> Result<Runtime> {
17        debug!("Building runtime object from profile data");
18
19        debug!("Building log_monitor_map");
20
21        let log_monitor_map = Self::build_log_monitor_map(profile.log_monitors);
22
23        ensure!(
24            log_monitor_map.len()
25                <= var("ARPX_LOG_MONITORS_MAX")
26                    .unwrap_or_else(|_| "200".to_owned())
27                    .parse::<usize>()
28                    .unwrap_or(200),
29            "Too many log_monitors defined in profile"
30        );
31
32        debug!("Building process_map");
33
34        let process_map = Self::build_process_map(profile.processes)?;
35
36        ensure!(
37            process_map.len()
38                <= var("ARPX_PROCESSES_MAX")
39                    .unwrap_or_else(|_| "200".to_owned())
40                    .parse::<usize>()
41                    .unwrap_or(200),
42            "Too many processes defined in profile"
43        );
44
45        ensure!(
46            !process_map.is_empty(),
47            "No valid processes exist in profile"
48        );
49
50        debug!("Building jobs object");
51
52        ensure!(!job_names.is_empty(), "No jobs requested for runtime");
53
54        let jobs = job_names
55            .iter()
56            .map(|job_name| {
57                let job = profile.jobs
58                    .get(&job_name[..])
59                    .context(format!("Requested job \"{}\" not defined in jobs", job_name))?;
60
61                return Ok(Job::new(
62                    job_name.into(),
63                    job.tasks
64                        .iter()
65                        .enumerate()
66                        .map(|(i, task)| {
67                            let task_index = i + 1;
68
69                            ensure!(
70                                task.processes.len()
71                                    <= var("ARPX_CONCURRENT_PROCESSES_MAX")
72                                        .unwrap_or_else(|_| "500".to_owned())
73                                        .parse::<usize>()
74                                        .unwrap_or(500),
75                                "Job \"{}\", task {}: too many processes",
76                                job_name,
77                                task_index
78                            );
79
80                            return Ok(Task::new(
81                                task.processes
82                                    .iter()
83                                    .map(|process| {
84                                        let default_process =
85                                            process_map.get(&process.name[..]).context(format!(
86                                                "Job \"{}\", task {}: process \"{}\" not defined in processes",
87                                                job_name,
88                                                task_index,
89                                                process.name
90                                            ))?;
91
92                                        ensure!(
93                                            task.processes.len() + process.log_monitors.len()
94                                                <= var("ARPX_THREAD_MAX")
95                                                    .unwrap_or_else(|_| "500".to_owned())
96                                                    .parse::<usize>()
97                                                    .unwrap_or(500),
98                                            "Job \"{}\", task {}: too many threads (reduce processes or log_monitors on task)",
99                                            job_name,
100                                            task_index
101                                        );
102
103                                        for log_monitor in &process.log_monitors {
104                                            ensure!(
105                                                log_monitor_map.contains_key(log_monitor),
106                                                "Job \"{}\", task {}: log monitor \"{}\" not defined in log_monitors",
107                                                job_name,
108                                                task_index,
109                                                log_monitor
110                                            );
111                                        }
112
113                                        return Ok(Process::new(default_process.name.clone())
114                                            .command(default_process.command.clone())
115                                            .cwd(default_process.cwd.clone())
116                                            .log_monitors(process.log_monitors.clone())
117                                            .onfail(match &process.onfail {
118                                                Some(onfail) => {
119                                                    ensure!(
120                                                        process_map.contains_key(onfail)
121                                                            || BUILTIN_ACTIONS.contains(&&onfail[..]),
122                                                        "Job \"{}\", task {}: invalid onfail \"{}\" provided",
123                                                        job_name,
124                                                        task_index,
125                                                        onfail
126                                                    );
127
128                                                    Some(onfail.into())
129                                                }
130                                                None => default_process.onfail.clone(),
131                                            })
132                                            .onsucceed(match &process.onsucceed {
133                                                Some(onsucceed) => {
134                                                    ensure!(
135                                                        process_map.contains_key(onsucceed)
136                                                            || BUILTIN_ACTIONS.contains(&&onsucceed[..]),
137                                                        "Job \"{}\", task {}: invalid onsucceed \"{}\" provided",
138                                                        job_name,
139                                                        task_index,
140                                                        onsucceed
141                                                    );
142
143                                                    Some(onsucceed.into())
144                                                }
145                                                None => default_process.onsucceed.clone(),
146                                            }))
147                                    })
148                                    .collect::<Result<Vec<Process>, Error>>()?,
149                            ))
150                        })
151                        .collect::<Result<Vec<Task>, Error>>()?,
152                ))
153            })
154            .collect::<Result<Vec<Job>, Error>>()?;
155
156        debug!("Building runtime object");
157
158        let runtime = Runtime::new()
159            .jobs(jobs)
160            .log_monitor_map(log_monitor_map)
161            .process_map(process_map);
162
163        debug!("Runtime object built");
164
165        Ok(runtime)
166    }
167
168    pub fn build_log_monitor_map(
169        log_monitors: HashMap<String, deserialize::log_monitors::LogMonitor>,
170    ) -> HashMap<String, LogMonitor> {
171        log_monitors
172            .into_iter()
173            .map(|(name, v)| {
174                let log_monitor = LogMonitor::new(name.clone())
175                    .buffer_size(v.buffer_size)
176                    .ontrigger(v.ontrigger)
177                    .test(v.test);
178
179                (name, log_monitor)
180            })
181            .collect()
182    }
183
184    pub fn build_process_map(
185        processes: HashMap<String, deserialize::processes::Process>,
186    ) -> Result<HashMap<String, Process>> {
187        processes
188            .into_iter()
189            .map(|(name, v)| {
190                ensure!(
191                    Path::new(&v.cwd).is_dir(),
192                    "Configured current working directory \"{}\" on process \"{}\" is not a valid directory",
193                    v.cwd,
194                    name
195                );
196
197                let process = Process::new(name.clone())
198                    .command(v.command)
199                    .cwd(v.cwd)
200                    .log_monitors(v.log_monitors)
201                    .onfail(match &v.onfail[..] {
202                        "" => None,
203                        _ => Some(v.onfail),
204                    })
205                    .onsucceed(match &v.onsucceed[..] {
206                        "" => None,
207                        _ => Some(v.onsucceed),
208                    });
209
210                Ok((name, process))
211            })
212            .collect::<Result<HashMap<String, Process>, Error>>()
213    }
214}