arpx/runtime/profile/
runtime_builder.rs1use crate::runtime::{
2 job::{
3 task::{action::BUILTIN_ACTIONS, log_monitor::LogMonitor, process::Process, Task},
4 Job,
5 },
6 profile::{deserialize, Profile},
7 Runtime,
8};
9use anyhow::{ensure, Context, Error, Result};
10use log::debug;
11use std::{collections::HashMap, env::var, path::Path};
12
/// Stateless namespace for constructing a `Runtime` from deserialized
/// profile data; all functionality lives in associated functions.
pub struct RuntimeBuilder;
14
15impl RuntimeBuilder {
16 pub fn from_profile_and_job_names(profile: Profile, job_names: &[String]) -> Result<Runtime> {
17 debug!("Building runtime object from profile data");
18
19 debug!("Building log_monitor_map");
20
21 let log_monitor_map = Self::build_log_monitor_map(profile.log_monitors);
22
23 ensure!(
24 log_monitor_map.len()
25 <= var("ARPX_LOG_MONITORS_MAX")
26 .unwrap_or_else(|_| "200".to_owned())
27 .parse::<usize>()
28 .unwrap_or(200),
29 "Too many log_monitors defined in profile"
30 );
31
32 debug!("Building process_map");
33
34 let process_map = Self::build_process_map(profile.processes)?;
35
36 ensure!(
37 process_map.len()
38 <= var("ARPX_PROCESSES_MAX")
39 .unwrap_or_else(|_| "200".to_owned())
40 .parse::<usize>()
41 .unwrap_or(200),
42 "Too many processes defined in profile"
43 );
44
45 ensure!(
46 !process_map.is_empty(),
47 "No valid processes exist in profile"
48 );
49
50 debug!("Building jobs object");
51
52 ensure!(!job_names.is_empty(), "No jobs requested for runtime");
53
54 let jobs = job_names
55 .iter()
56 .map(|job_name| {
57 let job = profile.jobs
58 .get(&job_name[..])
59 .context(format!("Requested job \"{}\" not defined in jobs", job_name))?;
60
61 return Ok(Job::new(
62 job_name.into(),
63 job.tasks
64 .iter()
65 .enumerate()
66 .map(|(i, task)| {
67 let task_index = i + 1;
68
69 ensure!(
70 task.processes.len()
71 <= var("ARPX_CONCURRENT_PROCESSES_MAX")
72 .unwrap_or_else(|_| "500".to_owned())
73 .parse::<usize>()
74 .unwrap_or(500),
75 "Job \"{}\", task {}: too many processes",
76 job_name,
77 task_index
78 );
79
80 return Ok(Task::new(
81 task.processes
82 .iter()
83 .map(|process| {
84 let default_process =
85 process_map.get(&process.name[..]).context(format!(
86 "Job \"{}\", task {}: process \"{}\" not defined in processes",
87 job_name,
88 task_index,
89 process.name
90 ))?;
91
92 ensure!(
93 task.processes.len() + process.log_monitors.len()
94 <= var("ARPX_THREAD_MAX")
95 .unwrap_or_else(|_| "500".to_owned())
96 .parse::<usize>()
97 .unwrap_or(500),
98 "Job \"{}\", task {}: too many threads (reduce processes or log_monitors on task)",
99 job_name,
100 task_index
101 );
102
103 for log_monitor in &process.log_monitors {
104 ensure!(
105 log_monitor_map.contains_key(log_monitor),
106 "Job \"{}\", task {}: log monitor \"{}\" not defined in log_monitors",
107 job_name,
108 task_index,
109 log_monitor
110 );
111 }
112
113 return Ok(Process::new(default_process.name.clone())
114 .command(default_process.command.clone())
115 .cwd(default_process.cwd.clone())
116 .log_monitors(process.log_monitors.clone())
117 .onfail(match &process.onfail {
118 Some(onfail) => {
119 ensure!(
120 process_map.contains_key(onfail)
121 || BUILTIN_ACTIONS.contains(&&onfail[..]),
122 "Job \"{}\", task {}: invalid onfail \"{}\" provided",
123 job_name,
124 task_index,
125 onfail
126 );
127
128 Some(onfail.into())
129 }
130 None => default_process.onfail.clone(),
131 })
132 .onsucceed(match &process.onsucceed {
133 Some(onsucceed) => {
134 ensure!(
135 process_map.contains_key(onsucceed)
136 || BUILTIN_ACTIONS.contains(&&onsucceed[..]),
137 "Job \"{}\", task {}: invalid onsucceed \"{}\" provided",
138 job_name,
139 task_index,
140 onsucceed
141 );
142
143 Some(onsucceed.into())
144 }
145 None => default_process.onsucceed.clone(),
146 }))
147 })
148 .collect::<Result<Vec<Process>, Error>>()?,
149 ))
150 })
151 .collect::<Result<Vec<Task>, Error>>()?,
152 ))
153 })
154 .collect::<Result<Vec<Job>, Error>>()?;
155
156 debug!("Building runtime object");
157
158 let runtime = Runtime::new()
159 .jobs(jobs)
160 .log_monitor_map(log_monitor_map)
161 .process_map(process_map);
162
163 debug!("Runtime object built");
164
165 Ok(runtime)
166 }
167
168 pub fn build_log_monitor_map(
169 log_monitors: HashMap<String, deserialize::log_monitors::LogMonitor>,
170 ) -> HashMap<String, LogMonitor> {
171 log_monitors
172 .into_iter()
173 .map(|(name, v)| {
174 let log_monitor = LogMonitor::new(name.clone())
175 .buffer_size(v.buffer_size)
176 .ontrigger(v.ontrigger)
177 .test(v.test);
178
179 (name, log_monitor)
180 })
181 .collect()
182 }
183
184 pub fn build_process_map(
185 processes: HashMap<String, deserialize::processes::Process>,
186 ) -> Result<HashMap<String, Process>> {
187 processes
188 .into_iter()
189 .map(|(name, v)| {
190 ensure!(
191 Path::new(&v.cwd).is_dir(),
192 "Configured current working directory \"{}\" on process \"{}\" is not a valid directory",
193 v.cwd,
194 name
195 );
196
197 let process = Process::new(name.clone())
198 .command(v.command)
199 .cwd(v.cwd)
200 .log_monitors(v.log_monitors)
201 .onfail(match &v.onfail[..] {
202 "" => None,
203 _ => Some(v.onfail),
204 })
205 .onsucceed(match &v.onsucceed[..] {
206 "" => None,
207 _ => Some(v.onsucceed),
208 });
209
210 Ok((name, process))
211 })
212 .collect::<Result<HashMap<String, Process>, Error>>()
213 }
214}