august_build/
runtime.rs

1use std::{
2    env,
3    ffi::{OsStr, OsString},
4    fs::canonicalize,
5    future::Future,
6    io,
7    path::Path,
8    process::Output,
9    sync::atomic::{AtomicU8, Ordering},
10    task::Poll,
11};
12
13use arc_swap::ArcSwap;
14use dircpy::copy_dir;
15use futures::{future::ready, stream::FuturesUnordered, StreamExt, TryStreamExt};
16use thiserror::Error;
17use tokio::task::block_in_place;
18
19use crate::{parser::Spanned, Command, EnvCommand, FsCommand, HashMap, IoCommand, Module, Unit};
20
21#[derive(Debug, Error)]
22pub enum RuntimeError {
23    #[error("A dependency of this unit failed")]
24    DependencyError(Spanned<String>),
25    #[error("Dependency {1} failed preventing completion of {0}")]
26    FailedDependency(String, Spanned<String>),
27    #[error("Failed to execute {0:?}: {1}")]
28    ExecutionFailure(Vec<Spanned<String>>, io::Error),
29    #[error("{0}")]
30    FsError(FsError),
31    #[error("{0}")]
32    JoinPathsError(env::JoinPathsError),
33    #[error("Command {0:?} isn't supported on this runtime")]
34    CommandUnsupported(Command),
35}
36
37pub struct Runtime {
38    module: Module,
39    notifier: Box<dyn Notifier + Sync>,
40    once: HashMap<Spanned<String>, AtomicU8>,
41    env_vars: ArcSwap<HashMap<OsString, OsString>>,
42}
43
44const UOS_INCOMPLETE: u8 = 0;
45const UOS_IN_PROGRESS: u8 = 1;
46const UOS_COMPLETE: u8 = 2;
47const UOS_FAILED: u8 = 3;
48
49impl Runtime {
50    pub fn new(module: Module, notifier: impl Notifier + Sync + 'static) -> Self {
51        let once = module
52            .units
53            .keys()
54            .map(|name| (name.clone(), AtomicU8::new(UOS_INCOMPLETE)))
55            .collect();
56
57        let env_vars = ArcSwap::from_pointee(env::vars_os().collect());
58
59        Self {
60            module,
61            notifier: Box::new(notifier),
62            once,
63            env_vars,
64        }
65    }
66
67    pub fn notifier(&self) -> &dyn Notifier {
68        &*self.notifier
69    }
70
71    fn get_unit(&self, name: impl Into<String>) -> (&Spanned<String>, &Unit) {
72        self.module
73            .units
74            .get_key_value(&Spanned::new(name.into()))
75            .unwrap()
76    }
77
78    fn get_uos(&self, name: impl Into<String>) -> &AtomicU8 {
79        self.once.get(&Spanned::new(name.into())).unwrap()
80    }
81
82    pub async fn run(&self, unit_name: &str) -> Result<(), RuntimeError> {
83        // Box::pin because recursive generators are hard
84        Box::pin(async {
85            let (unit_span, unit) = self.get_unit(unit_name);
86
87            self.notifier.start(unit_name);
88
89            if !unit.depends_on.is_empty() {
90                let futs = unit
91                    .depends_on
92                    .iter()
93                    .map(|dep| {
94                        Box::pin(async move {
95                            let uos_state = self.get_uos(dep.inner());
96                            let uos = uos_state.compare_exchange(
97                                UOS_INCOMPLETE,
98                                UOS_IN_PROGRESS,
99                                Ordering::Acquire,
100                                Ordering::Relaxed,
101                            );
102                            match uos {
103                                Ok(_) => {
104                                    self.notifier.dependency(unit_name, dep.inner());
105                                    match self.run(dep.inner()).await {
106                                        Err(e) => {
107                                            uos_state.store(UOS_FAILED, Ordering::Release);
108                                            Err(e)
109                                        }
110                                        Ok(o) => {
111                                            uos_state.store(UOS_COMPLETE, Ordering::Release);
112                                            Ok(o)
113                                        }
114                                    }
115                                }
116                                Err(UOS_FAILED) => Err(RuntimeError::FailedDependency(
117                                    unit_name.to_owned(),
118                                    dep.clone(),
119                                )),
120                                Err(UOS_IN_PROGRESS) => BlockOnDepFuture { uos: uos_state }
121                                    .await
122                                    .map_err(|f| f(unit_name.to_owned(), dep.clone())),
123                                _ => Ok(()),
124                            }
125                        })
126                    })
127                    .collect::<FuturesUnordered<_>>();
128
129                let errors = futs
130                    .into_stream()
131                    .filter_map(|res| ready(res.err()))
132                    .collect::<Vec<_>>()
133                    .await;
134                if !errors.is_empty() {
135                    self.notifier.error(&errors);
136                    return Err(RuntimeError::DependencyError(unit_span.clone()));
137                }
138            }
139            for cmd in &unit.commands {
140                cmd.call(self).await?;
141            }
142
143            self.notifier.complete(unit_name);
144
145            Ok(())
146        })
147        .await
148    }
149}
150
151impl Command {
152    pub async fn call(&self, rt: &Runtime) -> Result<(), RuntimeError> {
153        use Command::{Concurrent, DependsOn, Do, Env, Exec, Fs, Io, Meta};
154
155        rt.notifier.call(self);
156
157        match self {
158            // no op, shouldn't be in Vec<Command>
159            DependsOn(_) | Meta(_) => Ok(()),
160
161            Do(units) => {
162                for unit in units {
163                    rt.run(unit.inner()).await?;
164                }
165                Ok(())
166            }
167            Exec(cmd) => {
168                let args = cmd[1..].iter().map(Spanned::inner);
169                let handle = duct::cmd(cmd[0].inner(), args)
170                    .full_env(rt.env_vars.load().iter())
171                    .unchecked()
172                    .start()
173                    .map_err(|io| RuntimeError::ExecutionFailure(cmd.to_vec(), io))?;
174                let fut = HandleFuture { handle };
175                let exec = fut
176                    .await
177                    .map_err(|io| RuntimeError::ExecutionFailure(cmd.to_vec(), io))?;
178                if exec.status.success() {
179                    Ok(())
180                } else {
181                    Err(RuntimeError::ExecutionFailure(
182                        cmd.to_vec(),
183                        io::Error::new(
184                            io::ErrorKind::Other,
185                            format!("Process returned non-successfully with {}.", exec.status),
186                        ),
187                    ))
188                }
189            }
190            Concurrent(cmds) => {
191                let mut errors = cmds
192                    .iter()
193                    .map(|cmd| cmd.call(rt))
194                    .collect::<FuturesUnordered<_>>()
195                    .into_stream()
196                    .filter_map(|res| ready(res.err()))
197                    .collect::<Vec<_>>()
198                    .await;
199
200                // TODO: Make less jank
201                if errors.is_empty() {
202                    Ok(())
203                } else {
204                    Err(errors.pop().unwrap())
205                }
206            }
207
208            Fs(cmd) => cmd.call().await,
209            Io(cmd) => cmd.call(),
210            Env(cmd) => cmd.call(rt),
211        }
212    }
213}
214
215struct HandleFuture {
216    handle: duct::Handle,
217}
218
219impl Future for HandleFuture {
220    type Output = io::Result<Output>;
221
222    fn poll(self: std::pin::Pin<&mut Self>, cx: &mut std::task::Context<'_>) -> Poll<Self::Output> {
223        if let Some(output) = self.handle.try_wait()? {
224            Poll::Ready(Ok(output.clone()))
225        } else {
226            cx.waker().wake_by_ref();
227            Poll::Pending
228        }
229    }
230}
231
232impl FsCommand {
233    pub async fn call(&self) -> Result<(), RuntimeError> {
234        use tokio::fs;
235        use FsCommand::{
236            Copy, CopyTo, Create, CreateDir, EPrintFile, Move, MoveTo, PrintFile, Remove,
237        };
238
239        match self {
240            Create(p) => fs::File::create(p.inner())
241                .await
242                .map(|_| ())
243                .map_err(|io| FsError::CreateFileError(p.clone(), io)),
244            CreateDir(p) => fs::create_dir_all(p.inner())
245                .await
246                .map_err(|io| FsError::CreateDirError(p.clone(), io)),
247            Remove(p) => {
248                let path: &Path = p.inner().as_ref();
249                if path.is_dir() {
250                    fs::remove_dir_all(path).await
251                } else {
252                    fs::remove_file(path).await
253                }
254                .map_err(|io| FsError::RemoveError(p.clone(), io))
255            }
256            Copy(src, dst) => Ok(fs_copy(src, dst).await?),
257            CopyTo(head, map) => {
258                for (src, dst) in expand_binary_map(head, map) {
259                    fs_copy(src, &dst).await?;
260                }
261                Ok(())
262            }
263            Move(src, dst) => Ok(fs_move(src, dst).await?),
264            MoveTo(head, map) => {
265                for (src, dst) in expand_binary_map(head, map) {
266                    fs_move(src, &dst).await?;
267                }
268                Ok(())
269            }
270            PrintFile(p) => {
271                let contents = fs::read_to_string(p.inner())
272                    .await
273                    .map_err(|io| RuntimeError::FsError(FsError::FileAccessError(p.clone(), io)))?;
274                println!("{contents}");
275                Ok(())
276            }
277            EPrintFile(p) => {
278                let contents = fs::read_to_string(p.inner())
279                    .await
280                    .map_err(|io| RuntimeError::FsError(FsError::FileAccessError(p.clone(), io)))?;
281                eprintln!("{contents}");
282                Ok(())
283            }
284        }
285        .map_err(RuntimeError::FsError)
286    }
287}
288
289fn expand_binary_map<'a>(
290    head: &'a Spanned<String>,
291    map: &'a [(Spanned<String>, Option<Spanned<String>>)],
292) -> impl Iterator<Item = (&'a Spanned<String>, Spanned<String>)> + 'a {
293    map.iter().map(|(src, dst)| {
294        let new_dst = dst.as_ref().map_or_else(
295            || head.clone().map(extend_path(src.inner())),
296            |dst| head.clone().map(extend_path(dst.inner())),
297        );
298        (src, new_dst)
299    })
300}
301
302fn extend_path(end: &str) -> impl Fn(String) -> String + '_ {
303    |mut head| {
304        if !head.ends_with('/') && !head.ends_with('\\') {
305            head.push('/');
306        }
307        head.push_str(end);
308        head
309    }
310}
311
312async fn fs_move(src: &Spanned<String>, dst: &Spanned<String>) -> Result<(), RuntimeError> {
313    use tokio::fs;
314    fs_copy(src, dst).await?;
315
316    let src_path: &Path = src.inner().as_ref();
317    if src_path.is_dir() {
318        fs::remove_dir_all(src_path).await
319    } else {
320        fs::remove_file(src_path).await
321    }
322    .map_err(|io| RuntimeError::FsError(FsError::RemoveError(src.clone(), io)))
323}
324
325async fn fs_copy(src: &Spanned<String>, dst: &Spanned<String>) -> Result<(), RuntimeError> {
326    use tokio::fs;
327    let src_path: &Path = src.inner().as_ref();
328    if src_path.is_dir() {
329        // No async variant for dircpy
330        block_in_place(|| copy_dir(src_path, dst.inner()))
331    } else {
332        fs::copy(src_path, dst.inner()).await.map(|_| ())
333    }
334    .map_err(|io| RuntimeError::FsError(FsError::CopyError(src.clone(), dst.clone(), io)))
335}
336
337#[derive(Debug, Error)]
338pub enum FsError {
339    #[error("Failed to create file {0}")]
340    CreateFileError(Spanned<String>, io::Error),
341    #[error("Failed to create directory {0}")]
342    CreateDirError(Spanned<String>, io::Error),
343    #[error("Failed to remove {0}")]
344    RemoveError(Spanned<String>, io::Error),
345    #[error("Failed to get contents of file {0}")]
346    FileAccessError(Spanned<String>, io::Error),
347    #[error("Failed to copy {0} to {1}")]
348    CopyError(Spanned<String>, Spanned<String>, io::Error),
349}
350
351impl IoCommand {
352    pub fn call(&self) -> Result<(), RuntimeError> {
353        use IoCommand::{EPrint, EPrintLn, Print, PrintLn};
354
355        match self {
356            PrintLn(t) => {
357                println!("{t}");
358            }
359            Print(t) => {
360                print!("{t}");
361            }
362            EPrintLn(t) => {
363                eprintln!("{t}");
364            }
365            EPrint(t) => {
366                eprint!("{t}");
367            }
368        };
369
370        Ok(())
371    }
372}
373
374impl EnvCommand {
375    pub fn call(&self, rt: &Runtime) -> Result<(), RuntimeError> {
376        use EnvCommand::{PathPush, PathRemove, RemoveVar, SetVar};
377
378        // FIX: Find a way to make manipulating ENV vars safe
379        match self {
380            SetVar(var, val) => {
381                rt.env_vars.rcu(|envs| {
382                    let mut envs = HashMap::clone(envs);
383                    envs.insert(var.inner().into(), val.inner().into());
384                    envs
385                });
386                Ok(())
387            }
388            RemoveVar(var) => {
389                rt.env_vars.rcu(|envs| {
390                    let mut envs = HashMap::clone(envs);
391                    envs.swap_remove(OsStr::new(var.inner()));
392                    envs
393                });
394                Ok(())
395            }
396            PathPush(p) => {
397                rt.env_vars.rcu(|envs| {
398                    let mut envs = HashMap::clone(envs);
399
400                    let mut path_var: Vec<_> = envs
401                        .get(OsStr::new("PATH"))
402                        .map(|i| env::split_paths(&i).collect())
403                        .unwrap_or_default();
404                    path_var.push(canonicalize(p.inner()).unwrap());
405
406                    env::join_paths(path_var)
407                        .ok()
408                        .map(|new_path| envs.insert("PATH".into(), new_path));
409                    envs
410                });
411
412                Ok(())
413            }
414            PathRemove(p) => {
415                rt.env_vars.rcu(|envs| {
416                    let mut envs = HashMap::clone(envs);
417                    if let Some(i) = envs.get(OsStr::new("PATH")) {
418                        let mut path_var = env::split_paths(&i).collect::<Vec<_>>();
419                        path_var.retain(|i| i != &canonicalize(p.inner()).unwrap());
420
421                        env::join_paths(path_var)
422                            .ok()
423                            .map(|new_path| envs.insert("PATH".into(), new_path));
424                    }
425                    envs
426                });
427                Ok(())
428            }
429        }
430    }
431}
432
433struct BlockOnDepFuture<'a> {
434    uos: &'a AtomicU8,
435}
436
437impl Future for BlockOnDepFuture<'_> {
438    type Output = Result<(), fn(String, Spanned<String>) -> RuntimeError>;
439
440    fn poll(self: std::pin::Pin<&mut Self>, cx: &mut std::task::Context<'_>) -> Poll<Self::Output> {
441        if self.uos.load(Ordering::Acquire) < UOS_COMPLETE {
442            cx.waker().wake_by_ref();
443            return Poll::Pending;
444        }
445
446        if self.uos.load(Ordering::Relaxed) == UOS_FAILED {
447            Poll::Ready(Err(|unit_name, dep_name| {
448                RuntimeError::FailedDependency(unit_name, dep_name)
449            }))
450        } else {
451            Poll::Ready(Ok(()))
452        }
453    }
454}
455
456pub enum NotifierEvent<'a> {
457    Call(&'a Command),
458    Start(&'a str),
459    Complete(&'a str),
460    Error(&'a [RuntimeError]),
461    Dependency { parent: &'a str, name: &'a str },
462    BlockOn { parent: &'a str, name: &'a str },
463}
464
465/// Trait to hook into runtime events, usually for logging.
466pub trait Notifier {
467    fn on_event(&self, event: NotifierEvent<'_>);
468}
469
470/// Convenience trait for using [`Notifier`]
471pub trait NotifierExt: Notifier {
472    fn call(&self, command: &Command) {
473        self.on_event(NotifierEvent::Call(command));
474    }
475
476    fn start(&self, name: &str) {
477        self.on_event(NotifierEvent::Start(name));
478    }
479
480    fn complete(&self, name: &str) {
481        self.on_event(NotifierEvent::Complete(name));
482    }
483
484    fn error(&self, errors: &[RuntimeError]) {
485        self.on_event(NotifierEvent::Error(errors));
486    }
487
488    fn dependency(&self, parent: &str, name: &str) {
489        self.on_event(NotifierEvent::Dependency { parent, name });
490    }
491
492    fn block_on(&self, parent: &str, name: &str) {
493        self.on_event(NotifierEvent::BlockOn { parent, name });
494    }
495}
496
497impl<T: Notifier + ?Sized> NotifierExt for T {}