1use std::{
2 env,
3 ffi::{OsStr, OsString},
4 fs::canonicalize,
5 future::Future,
6 io,
7 path::Path,
8 process::Output,
9 sync::atomic::{AtomicU8, Ordering},
10 task::Poll,
11};
12
13use arc_swap::ArcSwap;
14use dircpy::copy_dir;
15use futures::{future::ready, stream::FuturesUnordered, StreamExt, TryStreamExt};
16use thiserror::Error;
17use tokio::task::block_in_place;
18
19use crate::{parser::Spanned, Command, EnvCommand, FsCommand, HashMap, IoCommand, Module, Unit};
20
21#[derive(Debug, Error)]
22pub enum RuntimeError {
23 #[error("A dependency of this unit failed")]
24 DependencyError(Spanned<String>),
25 #[error("Dependency {1} failed preventing completion of {0}")]
26 FailedDependency(String, Spanned<String>),
27 #[error("Failed to execute {0:?}: {1}")]
28 ExecutionFailure(Vec<Spanned<String>>, io::Error),
29 #[error("{0}")]
30 FsError(FsError),
31 #[error("{0}")]
32 JoinPathsError(env::JoinPathsError),
33 #[error("Command {0:?} isn't supported on this runtime")]
34 CommandUnsupported(Command),
35}
36
37pub struct Runtime {
38 module: Module,
39 notifier: Box<dyn Notifier + Sync>,
40 once: HashMap<Spanned<String>, AtomicU8>,
41 env_vars: ArcSwap<HashMap<OsString, OsString>>,
42}
43
44const UOS_INCOMPLETE: u8 = 0;
45const UOS_IN_PROGRESS: u8 = 1;
46const UOS_COMPLETE: u8 = 2;
47const UOS_FAILED: u8 = 3;
48
49impl Runtime {
50 pub fn new(module: Module, notifier: impl Notifier + Sync + 'static) -> Self {
51 let once = module
52 .units
53 .keys()
54 .map(|name| (name.clone(), AtomicU8::new(UOS_INCOMPLETE)))
55 .collect();
56
57 let env_vars = ArcSwap::from_pointee(env::vars_os().collect());
58
59 Self {
60 module,
61 notifier: Box::new(notifier),
62 once,
63 env_vars,
64 }
65 }
66
67 pub fn notifier(&self) -> &dyn Notifier {
68 &*self.notifier
69 }
70
71 fn get_unit(&self, name: impl Into<String>) -> (&Spanned<String>, &Unit) {
72 self.module
73 .units
74 .get_key_value(&Spanned::new(name.into()))
75 .unwrap()
76 }
77
78 fn get_uos(&self, name: impl Into<String>) -> &AtomicU8 {
79 self.once.get(&Spanned::new(name.into())).unwrap()
80 }
81
82 pub async fn run(&self, unit_name: &str) -> Result<(), RuntimeError> {
83 Box::pin(async {
85 let (unit_span, unit) = self.get_unit(unit_name);
86
87 self.notifier.start(unit_name);
88
89 if !unit.depends_on.is_empty() {
90 let futs = unit
91 .depends_on
92 .iter()
93 .map(|dep| {
94 Box::pin(async move {
95 let uos_state = self.get_uos(dep.inner());
96 let uos = uos_state.compare_exchange(
97 UOS_INCOMPLETE,
98 UOS_IN_PROGRESS,
99 Ordering::Acquire,
100 Ordering::Relaxed,
101 );
102 match uos {
103 Ok(_) => {
104 self.notifier.dependency(unit_name, dep.inner());
105 match self.run(dep.inner()).await {
106 Err(e) => {
107 uos_state.store(UOS_FAILED, Ordering::Release);
108 Err(e)
109 }
110 Ok(o) => {
111 uos_state.store(UOS_COMPLETE, Ordering::Release);
112 Ok(o)
113 }
114 }
115 }
116 Err(UOS_FAILED) => Err(RuntimeError::FailedDependency(
117 unit_name.to_owned(),
118 dep.clone(),
119 )),
120 Err(UOS_IN_PROGRESS) => BlockOnDepFuture { uos: uos_state }
121 .await
122 .map_err(|f| f(unit_name.to_owned(), dep.clone())),
123 _ => Ok(()),
124 }
125 })
126 })
127 .collect::<FuturesUnordered<_>>();
128
129 let errors = futs
130 .into_stream()
131 .filter_map(|res| ready(res.err()))
132 .collect::<Vec<_>>()
133 .await;
134 if !errors.is_empty() {
135 self.notifier.error(&errors);
136 return Err(RuntimeError::DependencyError(unit_span.clone()));
137 }
138 }
139 for cmd in &unit.commands {
140 cmd.call(self).await?;
141 }
142
143 self.notifier.complete(unit_name);
144
145 Ok(())
146 })
147 .await
148 }
149}
150
151impl Command {
152 pub async fn call(&self, rt: &Runtime) -> Result<(), RuntimeError> {
153 use Command::{Concurrent, DependsOn, Do, Env, Exec, Fs, Io, Meta};
154
155 rt.notifier.call(self);
156
157 match self {
158 DependsOn(_) | Meta(_) => Ok(()),
160
161 Do(units) => {
162 for unit in units {
163 rt.run(unit.inner()).await?;
164 }
165 Ok(())
166 }
167 Exec(cmd) => {
168 let args = cmd[1..].iter().map(Spanned::inner);
169 let handle = duct::cmd(cmd[0].inner(), args)
170 .full_env(rt.env_vars.load().iter())
171 .unchecked()
172 .start()
173 .map_err(|io| RuntimeError::ExecutionFailure(cmd.to_vec(), io))?;
174 let fut = HandleFuture { handle };
175 let exec = fut
176 .await
177 .map_err(|io| RuntimeError::ExecutionFailure(cmd.to_vec(), io))?;
178 if exec.status.success() {
179 Ok(())
180 } else {
181 Err(RuntimeError::ExecutionFailure(
182 cmd.to_vec(),
183 io::Error::new(
184 io::ErrorKind::Other,
185 format!("Process returned non-successfully with {}.", exec.status),
186 ),
187 ))
188 }
189 }
190 Concurrent(cmds) => {
191 let mut errors = cmds
192 .iter()
193 .map(|cmd| cmd.call(rt))
194 .collect::<FuturesUnordered<_>>()
195 .into_stream()
196 .filter_map(|res| ready(res.err()))
197 .collect::<Vec<_>>()
198 .await;
199
200 if errors.is_empty() {
202 Ok(())
203 } else {
204 Err(errors.pop().unwrap())
205 }
206 }
207
208 Fs(cmd) => cmd.call().await,
209 Io(cmd) => cmd.call(),
210 Env(cmd) => cmd.call(rt),
211 }
212 }
213}
214
215struct HandleFuture {
216 handle: duct::Handle,
217}
218
219impl Future for HandleFuture {
220 type Output = io::Result<Output>;
221
222 fn poll(self: std::pin::Pin<&mut Self>, cx: &mut std::task::Context<'_>) -> Poll<Self::Output> {
223 if let Some(output) = self.handle.try_wait()? {
224 Poll::Ready(Ok(output.clone()))
225 } else {
226 cx.waker().wake_by_ref();
227 Poll::Pending
228 }
229 }
230}
231
232impl FsCommand {
233 pub async fn call(&self) -> Result<(), RuntimeError> {
234 use tokio::fs;
235 use FsCommand::{
236 Copy, CopyTo, Create, CreateDir, EPrintFile, Move, MoveTo, PrintFile, Remove,
237 };
238
239 match self {
240 Create(p) => fs::File::create(p.inner())
241 .await
242 .map(|_| ())
243 .map_err(|io| FsError::CreateFileError(p.clone(), io)),
244 CreateDir(p) => fs::create_dir_all(p.inner())
245 .await
246 .map_err(|io| FsError::CreateDirError(p.clone(), io)),
247 Remove(p) => {
248 let path: &Path = p.inner().as_ref();
249 if path.is_dir() {
250 fs::remove_dir_all(path).await
251 } else {
252 fs::remove_file(path).await
253 }
254 .map_err(|io| FsError::RemoveError(p.clone(), io))
255 }
256 Copy(src, dst) => Ok(fs_copy(src, dst).await?),
257 CopyTo(head, map) => {
258 for (src, dst) in expand_binary_map(head, map) {
259 fs_copy(src, &dst).await?;
260 }
261 Ok(())
262 }
263 Move(src, dst) => Ok(fs_move(src, dst).await?),
264 MoveTo(head, map) => {
265 for (src, dst) in expand_binary_map(head, map) {
266 fs_move(src, &dst).await?;
267 }
268 Ok(())
269 }
270 PrintFile(p) => {
271 let contents = fs::read_to_string(p.inner())
272 .await
273 .map_err(|io| RuntimeError::FsError(FsError::FileAccessError(p.clone(), io)))?;
274 println!("{contents}");
275 Ok(())
276 }
277 EPrintFile(p) => {
278 let contents = fs::read_to_string(p.inner())
279 .await
280 .map_err(|io| RuntimeError::FsError(FsError::FileAccessError(p.clone(), io)))?;
281 eprintln!("{contents}");
282 Ok(())
283 }
284 }
285 .map_err(RuntimeError::FsError)
286 }
287}
288
289fn expand_binary_map<'a>(
290 head: &'a Spanned<String>,
291 map: &'a [(Spanned<String>, Option<Spanned<String>>)],
292) -> impl Iterator<Item = (&'a Spanned<String>, Spanned<String>)> + 'a {
293 map.iter().map(|(src, dst)| {
294 let new_dst = dst.as_ref().map_or_else(
295 || head.clone().map(extend_path(src.inner())),
296 |dst| head.clone().map(extend_path(dst.inner())),
297 );
298 (src, new_dst)
299 })
300}
301
302fn extend_path(end: &str) -> impl Fn(String) -> String + '_ {
303 |mut head| {
304 if !head.ends_with('/') && !head.ends_with('\\') {
305 head.push('/');
306 }
307 head.push_str(end);
308 head
309 }
310}
311
312async fn fs_move(src: &Spanned<String>, dst: &Spanned<String>) -> Result<(), RuntimeError> {
313 use tokio::fs;
314 fs_copy(src, dst).await?;
315
316 let src_path: &Path = src.inner().as_ref();
317 if src_path.is_dir() {
318 fs::remove_dir_all(src_path).await
319 } else {
320 fs::remove_file(src_path).await
321 }
322 .map_err(|io| RuntimeError::FsError(FsError::RemoveError(src.clone(), io)))
323}
324
325async fn fs_copy(src: &Spanned<String>, dst: &Spanned<String>) -> Result<(), RuntimeError> {
326 use tokio::fs;
327 let src_path: &Path = src.inner().as_ref();
328 if src_path.is_dir() {
329 block_in_place(|| copy_dir(src_path, dst.inner()))
331 } else {
332 fs::copy(src_path, dst.inner()).await.map(|_| ())
333 }
334 .map_err(|io| RuntimeError::FsError(FsError::CopyError(src.clone(), dst.clone(), io)))
335}
336
337#[derive(Debug, Error)]
338pub enum FsError {
339 #[error("Failed to create file {0}")]
340 CreateFileError(Spanned<String>, io::Error),
341 #[error("Failed to create directory {0}")]
342 CreateDirError(Spanned<String>, io::Error),
343 #[error("Failed to remove {0}")]
344 RemoveError(Spanned<String>, io::Error),
345 #[error("Failed to get contents of file {0}")]
346 FileAccessError(Spanned<String>, io::Error),
347 #[error("Failed to copy {0} to {1}")]
348 CopyError(Spanned<String>, Spanned<String>, io::Error),
349}
350
351impl IoCommand {
352 pub fn call(&self) -> Result<(), RuntimeError> {
353 use IoCommand::{EPrint, EPrintLn, Print, PrintLn};
354
355 match self {
356 PrintLn(t) => {
357 println!("{t}");
358 }
359 Print(t) => {
360 print!("{t}");
361 }
362 EPrintLn(t) => {
363 eprintln!("{t}");
364 }
365 EPrint(t) => {
366 eprint!("{t}");
367 }
368 };
369
370 Ok(())
371 }
372}
373
374impl EnvCommand {
375 pub fn call(&self, rt: &Runtime) -> Result<(), RuntimeError> {
376 use EnvCommand::{PathPush, PathRemove, RemoveVar, SetVar};
377
378 match self {
380 SetVar(var, val) => {
381 rt.env_vars.rcu(|envs| {
382 let mut envs = HashMap::clone(envs);
383 envs.insert(var.inner().into(), val.inner().into());
384 envs
385 });
386 Ok(())
387 }
388 RemoveVar(var) => {
389 rt.env_vars.rcu(|envs| {
390 let mut envs = HashMap::clone(envs);
391 envs.swap_remove(OsStr::new(var.inner()));
392 envs
393 });
394 Ok(())
395 }
396 PathPush(p) => {
397 rt.env_vars.rcu(|envs| {
398 let mut envs = HashMap::clone(envs);
399
400 let mut path_var: Vec<_> = envs
401 .get(OsStr::new("PATH"))
402 .map(|i| env::split_paths(&i).collect())
403 .unwrap_or_default();
404 path_var.push(canonicalize(p.inner()).unwrap());
405
406 env::join_paths(path_var)
407 .ok()
408 .map(|new_path| envs.insert("PATH".into(), new_path));
409 envs
410 });
411
412 Ok(())
413 }
414 PathRemove(p) => {
415 rt.env_vars.rcu(|envs| {
416 let mut envs = HashMap::clone(envs);
417 if let Some(i) = envs.get(OsStr::new("PATH")) {
418 let mut path_var = env::split_paths(&i).collect::<Vec<_>>();
419 path_var.retain(|i| i != &canonicalize(p.inner()).unwrap());
420
421 env::join_paths(path_var)
422 .ok()
423 .map(|new_path| envs.insert("PATH".into(), new_path));
424 }
425 envs
426 });
427 Ok(())
428 }
429 }
430 }
431}
432
433struct BlockOnDepFuture<'a> {
434 uos: &'a AtomicU8,
435}
436
437impl Future for BlockOnDepFuture<'_> {
438 type Output = Result<(), fn(String, Spanned<String>) -> RuntimeError>;
439
440 fn poll(self: std::pin::Pin<&mut Self>, cx: &mut std::task::Context<'_>) -> Poll<Self::Output> {
441 if self.uos.load(Ordering::Acquire) < UOS_COMPLETE {
442 cx.waker().wake_by_ref();
443 return Poll::Pending;
444 }
445
446 if self.uos.load(Ordering::Relaxed) == UOS_FAILED {
447 Poll::Ready(Err(|unit_name, dep_name| {
448 RuntimeError::FailedDependency(unit_name, dep_name)
449 }))
450 } else {
451 Poll::Ready(Ok(()))
452 }
453 }
454}
455
456pub enum NotifierEvent<'a> {
457 Call(&'a Command),
458 Start(&'a str),
459 Complete(&'a str),
460 Error(&'a [RuntimeError]),
461 Dependency { parent: &'a str, name: &'a str },
462 BlockOn { parent: &'a str, name: &'a str },
463}
464
465pub trait Notifier {
467 fn on_event(&self, event: NotifierEvent<'_>);
468}
469
470pub trait NotifierExt: Notifier {
472 fn call(&self, command: &Command) {
473 self.on_event(NotifierEvent::Call(command));
474 }
475
476 fn start(&self, name: &str) {
477 self.on_event(NotifierEvent::Start(name));
478 }
479
480 fn complete(&self, name: &str) {
481 self.on_event(NotifierEvent::Complete(name));
482 }
483
484 fn error(&self, errors: &[RuntimeError]) {
485 self.on_event(NotifierEvent::Error(errors));
486 }
487
488 fn dependency(&self, parent: &str, name: &str) {
489 self.on_event(NotifierEvent::Dependency { parent, name });
490 }
491
492 fn block_on(&self, parent: &str, name: &str) {
493 self.on_event(NotifierEvent::BlockOn { parent, name });
494 }
495}
496
497impl<T: Notifier + ?Sized> NotifierExt for T {}