1use std::{
2 collections::{HashMap, HashSet},
3 env,
4 fs::{canonicalize, create_dir_all, File},
5 hash::{DefaultHasher, Hash, Hasher},
6 path::{Path, PathBuf},
7 sync::{Arc, Mutex},
8};
9
10use chrono::{DateTime, Utc};
11use tokio::fs::remove_dir_all;
12
13use crate::{
14 command::{
15 cargo::{BinaryPackage, Cargo},
16 pueue::Pueue,
17 },
18 common::{Binary, Process, ProcessState, Stack, JOCKER, JOCKER_ENV_STACK, MAX_RECURSION_LEVEL},
19 config::{ConfigFile, ConfigStack},
20 database::Database,
21 error::{lock_error, Error, InnerError, Result},
22};
23
/// Pair of options with the same names and types as the first two parameters
/// of `State::new`.
///
/// NOTE(review): this struct is not consumed in the visible part of the file;
/// the field descriptions below are inferred from `State::new` and should be
/// confirmed against the call sites.
#[derive(Debug, PartialEq)]
pub struct StateArgs {
    /// Presumably forwarded as the `refresh` flag of `State::new`, which
    /// requests a hard refresh.
    pub refresh: bool,
    /// Presumably the name of the stack to select; `None` leaves the choice
    /// to the environment or the stored default.
    pub stack: Option<String>,
}
29
30pub struct State {
31 project_dir: String,
32 target_dir: PathBuf,
33 db: Database,
34 current_stack: Arc<Mutex<Option<String>>>,
35 scheduler: Pueue,
36}
37
38impl State {
39 pub async fn new(
40 refresh: bool,
41 stack: Option<String>,
42 target_dir: Option<impl Into<PathBuf>>,
43 ) -> Result<Self> {
44 let target_dir = target_dir.map(Into::into).unwrap_or(canonicalize(".")?);
45 let (project_id, project_dir) = Self::get_or_create_state_dir(&target_dir)?;
46 let db = Database::new(&project_dir).await?;
47 let scheduler = Pueue::new(&project_id).await?;
48 let state = Self {
49 project_dir,
50 target_dir,
51 db,
52 current_stack: Arc::new(Mutex::new(None)),
53 scheduler,
54 };
55 state.refresh(refresh).await?;
56 state.set_current_stack(&stack).await?;
57 Ok(state)
58 }
59
60 pub(crate) fn scheduler(&self) -> &Pueue {
61 &self.scheduler
62 }
63
64 pub fn scheduler_group(&self) -> &str {
65 self.scheduler.group()
66 }
67
68 pub async fn clean(self) -> Result<()> {
69 remove_dir_all(self.project_dir).await?;
70 self.scheduler.clean().await?;
71 Ok(())
72 }
73
74 pub async fn get_elapsed_since_last_binaries_update(&self) -> Result<u64> {
75 let date = if let Some(date) = self.db.get_binaries_updated_at().await? {
76 date
77 } else {
78 DateTime::UNIX_EPOCH
79 };
80 Ok(Utc::now()
81 .signed_duration_since(date)
82 .num_seconds()
83 .clamp(0, i64::MAX)
84 .try_into()?)
85 }
86
87 pub async fn get_elapsed_since_last_config_update(&self) -> Result<u64> {
88 let date = if let Some(date) = self.db.get_config_updated_at().await? {
89 date
90 } else {
91 DateTime::UNIX_EPOCH
92 };
93 Ok(Utc::now()
94 .signed_duration_since(date)
95 .num_seconds()
96 .clamp(0, i64::MAX)
97 .try_into()?)
98 }
99
100 pub async fn set_binaries_updated_at(&self, date: DateTime<Utc>) -> Result<()> {
101 self.db.set_binaries_updated_at(date).await
102 }
103
104 pub async fn set_config_updated_at(&self, date: DateTime<Utc>) -> Result<()> {
105 self.db.set_config_updated_at(date).await
106 }
107
108 pub fn get_target_dir(&self) -> &Path {
109 &self.target_dir
110 }
111
112 pub async fn get_binaries(&self) -> Result<Vec<Binary>> {
113 self.db.get_binaries().await
114 }
115
116 pub async fn set_binaries(&self, binaries: &[Binary]) -> Result<()> {
117 self.db.set_binaries(binaries).await
118 }
119
120 pub async fn filter_processes(&self, process_names: &[String]) -> Result<Vec<Process>> {
124 let current_stack = self.get_current_stack()?;
125 let expected_processes: Vec<String> = if !process_names.is_empty() {
126 process_names.to_owned()
127 } else if let Some(stack) = current_stack {
128 self.get_stack(&stack)
129 .await?
130 .get_all_processes()
131 .into_iter()
132 .cloned()
133 .collect()
134 } else {
135 Vec::with_capacity(0)
136 };
137 if expected_processes.is_empty() {
138 return self.get_processes().await;
139 }
140 let processes: Vec<Process> = self
141 .get_processes()
142 .await?
143 .into_iter()
144 .filter(|process| expected_processes.contains(&process.name))
145 .collect();
146 if expected_processes.len() != processes.len() {
147 let mut process_names: HashSet<String> = process_names.iter().cloned().collect();
148 for process in processes {
149 process_names.remove(&process.name);
150 }
151 return Err(Error::new(InnerError::ProcessNotFound(
152 process_names.into_iter().collect(),
153 )));
154 }
155 Ok(processes)
156 }
157
158 pub async fn get_processes(&self) -> Result<Vec<Process>> {
159 self.db.get_processes().await
160 }
161
162 pub async fn set_processes(&self, processes: Vec<Process>) -> Result<()> {
163 self.db.set_processes(&processes).await
164 }
165
166 pub async fn set_state(&self, process_name: &str, state: ProcessState) -> Result<()> {
167 self.db.set_process_state(process_name, state).await
168 }
169
170 pub async fn set_pid(&self, process_name: &str, pid: Option<usize>) -> Result<()> {
171 self.db.set_process_pid(process_name, pid).await
172 }
173
174 pub fn get_current_stack(&self) -> Result<Option<String>> {
175 Ok(self.current_stack.lock().map_err(lock_error)?.clone())
176 }
177
178 pub async fn set_current_stack(&self, stack: &Option<String>) -> Result<()> {
179 if let Some(stack) = stack {
180 *self.current_stack.lock().map_err(lock_error)? =
181 Some(self.get_stack(stack).await?.name);
182 } else if let Ok(stack) = env::var(JOCKER_ENV_STACK) {
183 *self.current_stack.lock().map_err(lock_error)? =
184 Some(self.get_stack(&stack).await?.name);
185 } else {
186 *self.current_stack.lock().map_err(lock_error)? = self.get_default_stack().await?;
187 };
188
189 Ok(())
190 }
191
192 pub async fn get_default_stack(&self) -> Result<Option<String>> {
193 self.db.get_default_stack().await
194 }
195
196 pub async fn set_default_stack(&self, stack: &Option<String>) -> Result<()> {
197 self.db.set_default_stack(stack).await
198 }
199
200 pub async fn get_stack(&self, stack: &str) -> Result<Stack> {
201 self.db.get_stack(stack).await
202 }
203
204 pub async fn set_stacks(&self, stacks: &[Stack]) -> Result<()> {
205 self.db.set_stacks(stacks).await
206 }
207
208 pub async fn refresh(&self, hard: bool) -> Result<()> {
211 let mut scheduled_process = self.scheduler().processes().await?;
212 for process in self.get_processes().await? {
213 if let Some(sp) = scheduled_process.remove(process.name()) {
214 self.set_pid(process.name(), Some(sp.0)).await?;
215 self.set_state(process.name(), sp.1.into()).await?;
216 } else {
217 self.set_pid(process.name(), None).await?;
218 self.set_state(process.name(), ProcessState::Stopped)
219 .await?;
220 }
221 }
222
223 if hard || self.needs_to_refresh_binaries().await? {
224 self.refresh_binaries(hard).await?;
225 self.set_binaries_updated_at(Utc::now()).await?;
226 }
227 if hard || self.needs_to_refresh_config().await? {
228 self.refresh_processes().await?;
229 self.refresh_stacks().await?;
230 self.set_config_updated_at(Utc::now()).await?;
231 }
232
233 Ok(())
234 }
235
236 async fn needs_to_refresh_binaries(&self) -> Result<bool> {
237 let elapsed_since_last_update = self.get_elapsed_since_last_binaries_update().await?;
238 let files = ["./Cargo.toml", "./Cargo.lock"];
239 for file in files {
240 if Path::new(file).exists()
241 && File::open(file)?
242 .metadata()?
243 .modified()?
244 .elapsed()?
245 .as_secs()
246 < elapsed_since_last_update
247 {
248 return Ok(true);
249 }
250 }
251 Ok(false)
252 }
253
254 async fn needs_to_refresh_config(&self) -> Result<bool> {
255 let elapsed_since_last_update = self.get_elapsed_since_last_config_update().await?;
256 let files = ["./jocker.yml", "./jocker.override.yml"];
257 for file in files {
258 if Path::new(file).exists()
259 && File::open(file)?
260 .metadata()?
261 .modified()?
262 .elapsed()?
263 .as_secs()
264 < elapsed_since_last_update
265 {
266 return Ok(true);
267 }
268 }
269 Ok(false)
270 }
271
272 async fn fetch_bins(target_dir: &Path) -> Result<Vec<BinaryPackage>> {
273 Ok(Cargo::metadata(target_dir)
274 .await?
275 .into_iter()
276 .map(Into::into)
277 .collect())
278 }
279
280 async fn refresh_binaries(&self, hard: bool) -> Result<()> {
281 if !hard {
282 return Ok(());
283 }
284 let binaries: Vec<Binary> = Self::fetch_bins(self.get_target_dir())
285 .await?
286 .into_iter()
287 .map(Into::into)
288 .collect();
289 self.set_binaries(&binaries).await?;
290 Ok(())
291 }
292
293 async fn refresh_processes(&self) -> Result<()> {
294 let previous_processes: HashMap<String, Process> = self
295 .get_processes()
296 .await?
297 .into_iter()
298 .map(|p| (p.name().to_string(), p))
299 .collect();
300 let processes: Vec<Process> =
301 if let Some(jocker_config) = ConfigFile::load(self.get_target_dir())? {
302 let mut processes = vec![];
303 let process_defaults = jocker_config.default.and_then(|d| d.process);
304 for config_process in jocker_config.processes {
305 let mut process: Process = config_process.into();
306
307 if let Some(ref process_defaults) = process_defaults {
308 process
309 .cargo_args
310 .append(&mut process_defaults.cargo_args.clone());
311 }
312 processes.push(process);
313 }
314 processes
315 } else {
316 self.get_binaries()
317 .await?
318 .into_iter()
319 .map(|b| Process::new(b.name(), b.name()))
320 .collect()
321 };
322 let processes: Vec<Process> = processes
323 .into_iter()
324 .map(|mut p| {
325 if let Some(previous_process) = previous_processes.get(p.name()) {
326 p.pid = previous_process.pid;
327 p.state = previous_process.state.clone();
328 };
329 p
330 })
331 .collect();
332 self.set_processes(processes).await?;
333
334 Ok(())
335 }
336
337 async fn refresh_stacks(&self) -> Result<()> {
338 let mut default_stack = None;
339 let stacks = if let Some(jocker_config) = ConfigFile::load(self.get_target_dir())? {
340 if let Some(config_default_stack) = jocker_config.default.and_then(|d| d.stack) {
341 default_stack = Some(config_default_stack);
342 }
343 let mut stacks: HashMap<String, Stack> = HashMap::new();
344 let config_stacks = jocker_config.stacks.clone();
345
346 for (stack_name, config_stack) in jocker_config.stacks {
347 stacks.insert(
348 stack_name.clone(),
349 Stack {
350 name: stack_name.clone(),
351 processes: config_stack.processes,
352 inherited_processes: Default::default(),
353 },
354 );
355 let inherited_processes = Self::recurse_inherited_processes(
356 0,
357 &config_stack.inherits,
358 &config_stacks,
359 &mut HashSet::new(),
360 HashSet::new(),
361 )?;
362 stacks
363 .get_mut(&stack_name)
364 .ok_or_else(|| Error::new(InnerError::StackNotFound(stack_name.to_owned())))
365 .map(|stack| stack.inherited_processes = inherited_processes)?;
366 }
367 stacks
368 } else {
369 HashMap::new()
370 };
371 if let Some(default_stack) = default_stack.as_ref() {
372 if !stacks.contains_key(default_stack) {
373 return Err(Error::new(InnerError::StackNotFound(
374 default_stack.to_owned(),
375 )));
376 }
377 }
378 self.set_stacks(stacks.values().cloned().collect::<Vec<Stack>>().as_slice())
379 .await?;
380 self.set_default_stack(&default_stack).await?;
381
382 Ok(())
383 }
384
385 fn recurse_inherited_processes(
386 recursion_level: u8,
387 stack_names: &HashSet<String>,
388 stacks: &HashMap<String, ConfigStack>,
389 browsed_stacks: &mut HashSet<String>,
390 mut inherited_processes: HashSet<String>,
391 ) -> Result<HashSet<String>> {
392 if recursion_level > MAX_RECURSION_LEVEL {
393 return Err(Error::new(InnerError::RecursionDeepnessTooHigh));
394 }
395 for stack_name in stack_names {
396 if !browsed_stacks.insert(stack_name.to_owned()) {
397 return Err(Error::new(InnerError::RecursionLoop));
398 }
399 let stack = stacks
400 .get(stack_name)
401 .ok_or_else(|| Error::new(InnerError::StackNotFound(stack_name.to_owned())))?;
402 inherited_processes.extend(stack.processes.clone().into_iter());
403 inherited_processes = Self::recurse_inherited_processes(
404 recursion_level + 1,
405 &stack.inherits,
406 stacks,
407 browsed_stacks,
408 inherited_processes,
409 )?;
410 }
411 Ok(inherited_processes)
412 }
413
/// Derives the project identifier: the lowercase hexadecimal rendering of
/// the `DefaultHasher` hash of `target_dir`.
///
/// Accepts any `&Path` (a `&PathBuf` coerces to it); `PathBuf` hashes
/// exactly like its `Path`, so identifiers are unchanged.
///
/// NOTE(review): the standard library does not guarantee `DefaultHasher`'s
/// algorithm across releases, so identifiers may change with the toolchain.
fn get_project_id(target_dir: &Path) -> String {
    let mut hasher = DefaultHasher::new();
    target_dir.hash(&mut hasher);
    format!("{:x}", hasher.finish())
}
419
420 fn get_or_create_state_dir(target_dir: &PathBuf) -> Result<(String, String)> {
421 let (project_id, project_dir) = Self::get_or_create_project_dir(target_dir)?;
422
423 Ok((project_id, project_dir.clone()))
424 }
425
426 fn get_or_create_project_dir(target_dir: &PathBuf) -> Result<(String, String)> {
427 let project_id = Self::get_project_id(target_dir);
428
429 let home =
430 env::var("HOME").map_err(|e| Error::with_context(InnerError::Env(e.to_string()))(e))?;
431 let state_dir =
432 env::var("XDG_STATE_HOME").unwrap_or_else(|_| format!("{home}/.local/state"));
433
434 let project_dir = format!("{state_dir}/{JOCKER}/{project_id}");
435 let project_dir_path = Path::new(&project_dir);
436 if !project_dir_path.exists() {
437 create_dir_all(project_dir_path)
438 .map_err(Error::with_context(InnerError::Filesystem))?;
439 }
440 Ok((project_id, project_dir))
441 }
442}