jocker_lib/state.rs

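//! Per-project state handling: locates (or creates) the state directory under
//! `XDG_STATE_HOME`, wraps the project [`Database`], tracks the currently
//! selected stack and talks to the `pueue` scheduler.
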
use std::{
    collections::{HashMap, HashSet},
    env,
    fs::{canonicalize, create_dir_all, File},
    hash::{DefaultHasher, Hash, Hasher},
    path::{Path, PathBuf},
    sync::{Arc, Mutex},
};

use chrono::{DateTime, Utc};
use tokio::fs::remove_dir_all;

use crate::{
    command::{
        cargo::{BinaryPackage, Cargo},
        pueue::Pueue,
    },
    common::{Binary, Process, ProcessState, Stack, JOCKER, JOCKER_ENV_STACK, MAX_RECURSION_LEVEL},
    config::{ConfigFile, ConfigStack},
    database::Database,
    error::{lock_error, Error, InnerError, Result},
};

#[derive(Debug, PartialEq)]
pub struct StateArgs {
    pub refresh: bool,
    pub stack: Option<String>,
}

/// Project state: the on-disk state directory, the project database, the
/// currently selected stack and the handle to the `pueue` scheduler.
pub struct State {
    project_dir: String,
    target_dir: PathBuf,
    db: Database,
    current_stack: Arc<Mutex<Option<String>>>,
    scheduler: Pueue,
}

impl State {
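    /// Build the project state: resolve the target directory, open (or
    /// create) the per-project state directory, connect the database and the
    /// `pueue` scheduler, then refresh and select the current stack.
    ///
    /// A minimal usage sketch (illustrative only, not compiled as a doc-test;
    /// it assumes a Tokio runtime and a `?`-compatible error context):
    ///
    /// ```ignore
    /// // Use the current directory, no forced refresh, no explicit stack.
    /// let state = State::new(false, None, None::<PathBuf>).await?;
    /// println!("scheduler group: {}", state.scheduler_group());
    /// ```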
    pub async fn new(
        refresh: bool,
        stack: Option<String>,
        target_dir: Option<impl Into<PathBuf>>,
    ) -> Result<Self> {
        // Only fall back to the current directory when no target directory is
        // given, so `canonicalize` is not run (and cannot fail) needlessly.
        let target_dir = match target_dir {
            Some(dir) => dir.into(),
            None => canonicalize(".")?,
        };
        let (project_id, project_dir) = Self::get_or_create_state_dir(&target_dir)?;
        let db = Database::new(&project_dir).await?;
        let scheduler = Pueue::new(&project_id).await?;
        let state = Self {
            project_dir,
            target_dir,
            db,
            current_stack: Arc::new(Mutex::new(None)),
            scheduler,
        };
        state.refresh(refresh).await?;
        state.set_current_stack(&stack).await?;
        Ok(state)
    }

    pub(crate) fn scheduler(&self) -> &Pueue {
        &self.scheduler
    }

    pub fn scheduler_group(&self) -> &str {
        self.scheduler.group()
    }

    pub async fn clean(self) -> Result<()> {
        remove_dir_all(self.project_dir).await?;
        self.scheduler.clean().await?;
        Ok(())
    }

    pub async fn get_elapsed_since_last_binaries_update(&self) -> Result<u64> {
        let date = self
            .db
            .get_binaries_updated_at()
            .await?
            .unwrap_or(DateTime::UNIX_EPOCH);
        Ok(Utc::now()
            .signed_duration_since(date)
            .num_seconds()
            .max(0)
            .try_into()?)
    }

    pub async fn get_elapsed_since_last_config_update(&self) -> Result<u64> {
        let date = self
            .db
            .get_config_updated_at()
            .await?
            .unwrap_or(DateTime::UNIX_EPOCH);
        Ok(Utc::now()
            .signed_duration_since(date)
            .num_seconds()
            .max(0)
            .try_into()?)
    }

    pub async fn set_binaries_updated_at(&self, date: DateTime<Utc>) -> Result<()> {
        self.db.set_binaries_updated_at(date).await
    }

    pub async fn set_config_updated_at(&self, date: DateTime<Utc>) -> Result<()> {
        self.db.set_config_updated_at(date).await
    }

    pub fn get_target_dir(&self) -> &Path {
        &self.target_dir
    }

    pub async fn get_binaries(&self) -> Result<Vec<Binary>> {
        self.db.get_binaries().await
    }

    pub async fn set_binaries(&self, binaries: &[Binary]) -> Result<()> {
        self.db.set_binaries(binaries).await
    }

    /// Filter the process list based on the given process names.
    ///
    /// If `process_names` is empty, the processes of the current stack are
    /// used instead; without a current stack, all processes are returned.
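    ///
    /// A usage sketch (illustrative only, not compiled as a doc-test; the
    /// process names are hypothetical):
    ///
    /// ```ignore
    /// // Explicit selection: fails with `ProcessNotFound` if a name is unknown.
    /// let selected = state
    ///     .filter_processes(&["api".to_string(), "worker".to_string()])
    ///     .await?;
    /// // Empty selection: falls back to the current stack, or to every process.
    /// let all = state.filter_processes(&[]).await?;
    /// ```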
    pub async fn filter_processes(&self, process_names: &[String]) -> Result<Vec<Process>> {
        let current_stack = self.get_current_stack()?;
        let expected_processes: Vec<String> = if !process_names.is_empty() {
            process_names.to_owned()
        } else if let Some(stack) = current_stack {
            self.get_stack(&stack)
                .await?
                .get_all_processes()
                .into_iter()
                .cloned()
                .collect()
        } else {
            Vec::new()
        };
        if expected_processes.is_empty() {
            return self.get_processes().await;
        }
        let processes: Vec<Process> = self
            .get_processes()
            .await?
            .into_iter()
            .filter(|process| expected_processes.contains(&process.name))
            .collect();
        if expected_processes.len() != processes.len() {
            // Report the expected names that did not match any known process.
            let mut missing: HashSet<String> = expected_processes.into_iter().collect();
            for process in processes {
                missing.remove(&process.name);
            }
            return Err(Error::new(InnerError::ProcessNotFound(
                missing.into_iter().collect(),
            )));
        }
        Ok(processes)
    }

    pub async fn get_processes(&self) -> Result<Vec<Process>> {
        self.db.get_processes().await
    }

    pub async fn set_processes(&self, processes: Vec<Process>) -> Result<()> {
        self.db.set_processes(&processes).await
    }

    pub async fn set_state(&self, process_name: &str, state: ProcessState) -> Result<()> {
        self.db.set_process_state(process_name, state).await
    }

    pub async fn set_pid(&self, process_name: &str, pid: Option<usize>) -> Result<()> {
        self.db.set_process_pid(process_name, pid).await
    }

    pub fn get_current_stack(&self) -> Result<Option<String>> {
        Ok(self.current_stack.lock().map_err(lock_error)?.clone())
    }

    /// Select the current stack: an explicitly given `stack` takes
    /// precedence, then the environment variable named by
    /// [`JOCKER_ENV_STACK`], then the configured default stack.
    pub async fn set_current_stack(&self, stack: &Option<String>) -> Result<()> {
        if let Some(stack) = stack {
            *self.current_stack.lock().map_err(lock_error)? =
                Some(self.get_stack(stack).await?.name);
        } else if let Ok(stack) = env::var(JOCKER_ENV_STACK) {
            *self.current_stack.lock().map_err(lock_error)? =
                Some(self.get_stack(&stack).await?.name);
        } else {
            *self.current_stack.lock().map_err(lock_error)? = self.get_default_stack().await?;
        };

        Ok(())
    }

    pub async fn get_default_stack(&self) -> Result<Option<String>> {
        self.db.get_default_stack().await
    }

    pub async fn set_default_stack(&self, stack: &Option<String>) -> Result<()> {
        self.db.set_default_stack(stack).await
    }

    pub async fn get_stack(&self, stack: &str) -> Result<Stack> {
        self.db.get_stack(stack).await
    }

    pub async fn set_stacks(&self, stacks: &[Stack]) -> Result<()> {
        self.db.set_stacks(stacks).await
    }

    // Refresh

    pub async fn refresh(&self, hard: bool) -> Result<()> {
        // Sync each known process with the scheduler: adopt the scheduler's
        // pid and state while the process is still scheduled, otherwise mark
        // it as stopped.
        let mut scheduled_processes = self.scheduler().processes().await?;
        for process in self.get_processes().await? {
            if let Some(sp) = scheduled_processes.remove(process.name()) {
                self.set_pid(process.name(), Some(sp.0)).await?;
                self.set_state(process.name(), sp.1.into()).await?;
            } else {
                self.set_pid(process.name(), None).await?;
                self.set_state(process.name(), ProcessState::Stopped)
                    .await?;
            }
        }

        if hard || self.needs_to_refresh_binaries().await? {
            self.refresh_binaries(hard).await?;
            self.set_binaries_updated_at(Utc::now()).await?;
        }
        if hard || self.needs_to_refresh_config().await? {
            self.refresh_processes().await?;
            self.refresh_stacks().await?;
            self.set_config_updated_at(Utc::now()).await?;
        }

        Ok(())
    }

    async fn needs_to_refresh_binaries(&self) -> Result<bool> {
        let elapsed_since_last_update = self.get_elapsed_since_last_binaries_update().await?;
        let files = ["./Cargo.toml", "./Cargo.lock"];
        for file in files {
            // Refresh when a manifest was modified more recently than the
            // last recorded binaries update.
            if Path::new(file).exists()
                && File::open(file)?
                    .metadata()?
                    .modified()?
                    .elapsed()?
                    .as_secs()
                    < elapsed_since_last_update
            {
                return Ok(true);
            }
        }
        Ok(false)
    }

    async fn needs_to_refresh_config(&self) -> Result<bool> {
        let elapsed_since_last_update = self.get_elapsed_since_last_config_update().await?;
        let files = ["./jocker.yml", "./jocker.override.yml"];
        for file in files {
            // Refresh when a config file was modified more recently than the
            // last recorded config update.
            if Path::new(file).exists()
                && File::open(file)?
                    .metadata()?
                    .modified()?
                    .elapsed()?
                    .as_secs()
                    < elapsed_since_last_update
            {
                return Ok(true);
            }
        }
        Ok(false)
    }

    async fn fetch_bins(target_dir: &Path) -> Result<Vec<BinaryPackage>> {
        Ok(Cargo::metadata(target_dir)
            .await?
            .into_iter()
            .map(Into::into)
            .collect())
    }

    async fn refresh_binaries(&self, hard: bool) -> Result<()> {
        // Binary metadata is only re-fetched on a hard refresh.
        if !hard {
            return Ok(());
        }
        let binaries: Vec<Binary> = Self::fetch_bins(self.get_target_dir())
            .await?
            .into_iter()
            .map(Into::into)
            .collect();
        self.set_binaries(&binaries).await?;
        Ok(())
    }

    async fn refresh_processes(&self) -> Result<()> {
        let previous_processes: HashMap<String, Process> = self
            .get_processes()
            .await?
            .into_iter()
            .map(|p| (p.name().to_string(), p))
            .collect();
        let processes: Vec<Process> =
            if let Some(jocker_config) = ConfigFile::load(self.get_target_dir())? {
                let mut processes = vec![];
                let process_defaults = jocker_config.default.and_then(|d| d.process);
                for config_process in jocker_config.processes {
                    let mut process: Process = config_process.into();

                    // Append the default cargo arguments to the per-process ones.
                    if let Some(ref process_defaults) = process_defaults {
                        process
                            .cargo_args
                            .append(&mut process_defaults.cargo_args.clone());
                    }
                    processes.push(process);
                }
                processes
            } else {
                // Without a config file, every binary target becomes a process.
                self.get_binaries()
                    .await?
                    .into_iter()
                    .map(|b| Process::new(b.name(), b.name()))
                    .collect()
            };
        // Carry over the pid and state of processes that already existed.
        let processes: Vec<Process> = processes
            .into_iter()
            .map(|mut p| {
                if let Some(previous_process) = previous_processes.get(p.name()) {
                    p.pid = previous_process.pid;
                    p.state = previous_process.state.clone();
                };
                p
            })
            .collect();
        self.set_processes(processes).await?;

        Ok(())
    }

    async fn refresh_stacks(&self) -> Result<()> {
        let mut default_stack = None;
        let stacks = if let Some(jocker_config) = ConfigFile::load(self.get_target_dir())? {
            if let Some(config_default_stack) = jocker_config.default.and_then(|d| d.stack) {
                default_stack = Some(config_default_stack);
            }
            let mut stacks: HashMap<String, Stack> = HashMap::new();
            let config_stacks = jocker_config.stacks.clone();

            for (stack_name, config_stack) in jocker_config.stacks {
                // Resolve the processes inherited from parent stacks before
                // storing the stack.
                let inherited_processes = Self::recurse_inherited_processes(
                    0,
                    &config_stack.inherits,
                    &config_stacks,
                    &mut HashSet::new(),
                    HashSet::new(),
                )?;
                stacks.insert(
                    stack_name.clone(),
                    Stack {
                        name: stack_name,
                        processes: config_stack.processes,
                        inherited_processes,
                    },
                );
            }
            stacks
        } else {
            HashMap::new()
        };
        // The configured default stack, if any, must actually exist.
        if let Some(default_stack) = default_stack.as_ref() {
            if !stacks.contains_key(default_stack) {
                return Err(Error::new(InnerError::StackNotFound(
                    default_stack.to_owned(),
                )));
            }
        }
        self.set_stacks(stacks.values().cloned().collect::<Vec<Stack>>().as_slice())
            .await?;
        self.set_default_stack(&default_stack).await?;

        Ok(())
    }

    /// Collect the processes inherited from `stack_names` and their parents.
    ///
    /// Each stack may only be visited once per resolution (`browsed_stacks`);
    /// a re-visit is rejected as [`InnerError::RecursionLoop`], and the
    /// recursion depth is capped at [`MAX_RECURSION_LEVEL`].
    fn recurse_inherited_processes(
        recursion_level: u8,
        stack_names: &HashSet<String>,
        stacks: &HashMap<String, ConfigStack>,
        browsed_stacks: &mut HashSet<String>,
        mut inherited_processes: HashSet<String>,
    ) -> Result<HashSet<String>> {
        if recursion_level > MAX_RECURSION_LEVEL {
            return Err(Error::new(InnerError::RecursionDeepnessTooHigh));
        }
        for stack_name in stack_names {
            // `insert` returns false when the stack was already visited
            // during this resolution.
            if !browsed_stacks.insert(stack_name.to_owned()) {
                return Err(Error::new(InnerError::RecursionLoop));
            }
            let stack = stacks
                .get(stack_name)
                .ok_or_else(|| Error::new(InnerError::StackNotFound(stack_name.to_owned())))?;
            inherited_processes.extend(stack.processes.clone());
            inherited_processes = Self::recurse_inherited_processes(
                recursion_level + 1,
                &stack.inherits,
                stacks,
                browsed_stacks,
                inherited_processes,
            )?;
        }
        Ok(inherited_processes)
    }

    fn get_project_id(target_dir: &Path) -> String {
        // The project id is a hash of the target directory path.
        let mut hasher = DefaultHasher::new();
        target_dir.hash(&mut hasher);
        format!("{:x}", hasher.finish())
    }

    fn get_or_create_state_dir(target_dir: &Path) -> Result<(String, String)> {
        Self::get_or_create_project_dir(target_dir)
    }

    fn get_or_create_project_dir(target_dir: &Path) -> Result<(String, String)> {
        let project_id = Self::get_project_id(target_dir);

        // State lives under `XDG_STATE_HOME`, defaulting to `$HOME/.local/state`.
        let home =
            env::var("HOME").map_err(|e| Error::with_context(InnerError::Env(e.to_string()))(e))?;
        let state_dir =
            env::var("XDG_STATE_HOME").unwrap_or_else(|_| format!("{home}/.local/state"));

        let project_dir = format!("{state_dir}/{JOCKER}/{project_id}");
        let project_dir_path = Path::new(&project_dir);
        if !project_dir_path.exists() {
            create_dir_all(project_dir_path)
                .map_err(Error::with_context(InnerError::Filesystem))?;
        }
        Ok((project_id, project_dir))
    }
}