xvc_pipeline/pipeline/
command.rs

1use crate::Result;
2
3use crossbeam_channel::{Receiver, Sender};
4
5use std::collections::HashMap;
6use std::fmt::Display;
7use std::io::Read;
8
9use std::time::Instant;
10use subprocess as sp;
11
12use xvc_file::CHANNEL_CAPACITY;
13
14use serde::{Deserialize, Serialize};
15use xvc_core::persist;
16
17use crate::XvcStep;
18
19/// Command to run for an [XvcStep].
20#[derive(Debug, Clone, PartialOrd, Ord, Eq, PartialEq, Serialize, Deserialize)]
21pub struct XvcStepCommand {
22    /// A shell command that will be run via [subprocess::Exec::shell] in [crate::pipeline::s_waiting_to_run].
23    pub command: String,
24}
25
26persist!(XvcStepCommand, "xvc-step-command");
27
28impl Display for XvcStepCommand {
29    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
30        write!(f, "{}", self.command)
31    }
32}
33
34impl AsRef<str> for XvcStepCommand {
35    fn as_ref(&self) -> &str {
36        self.command.as_ref()
37    }
38}
39
40/// Used for encapsulating a process and its outputs. This is used to associate steps with running
41/// processes.
42#[derive(Debug)]
43pub struct CommandProcess {
44    /// Environment variables injected to the shell that runs the command. This is used to pass
45    /// added, removed items in certain dependency types.  
46    pub environment: HashMap<String, String>,
47    /// The step that this command belongs to
48    pub step: XvcStep,
49    /// The command to run
50    pub step_command: XvcStepCommand,
51    /// When we started running the command
52    pub birth: Option<Instant>,
53    /// The process that runs the command
54    pub process: Option<sp::Popen>,
55    /// Channel to send stdout to
56    pub stdout_sender: Sender<String>,
57    /// Channel to send stderr to
58    pub stderr_sender: Sender<String>,
59    /// Channel to receive stdout from
60    pub stdout_receiver: Receiver<String>,
61    /// Channel to receive stderr from
62    pub stderr_receiver: Receiver<String>,
63}
64
65impl CommandProcess {
66    /// Create a new CommandProcess by creating channels and setting other variables to their
67    /// default values.
68    pub fn new(step: &XvcStep, step_command: &XvcStepCommand) -> Self {
69        let (stdout_sender, stdout_receiver) = crossbeam_channel::bounded(CHANNEL_CAPACITY);
70        let (stderr_sender, stderr_receiver) = crossbeam_channel::bounded(CHANNEL_CAPACITY);
71        Self {
72            environment: HashMap::new(),
73            step: step.clone(),
74            step_command: step_command.clone(),
75            birth: None,
76            process: None,
77            stdout_sender,
78            stderr_sender,
79            stdout_receiver,
80            stderr_receiver,
81        }
82    }
83
84    /// Add an environment variable to inject to the shell that runs the command.
85    pub fn add_environment_variable(&mut self, key: &str, value: &str) -> Result<&mut Self> {
86        self.environment.insert(key.to_owned(), value.to_owned());
87        Ok(self)
88    }
89
90    /// Start executing the command in a shell. Updates birth and process variables after
91    /// detaching.
92    pub fn run(&mut self) -> Result<()> {
93        let process = sp::Exec::shell(self.step_command.command.clone())
94            .stdout(sp::Redirection::Pipe)
95            .stderr(sp::Redirection::Pipe)
96            .stdin(sp::Redirection::None)
97            .env_extend(
98                self.environment
99                    .iter()
100                    .collect::<Vec<(&String, &String)>>()
101                    .as_slice(),
102            )
103            .detached()
104            .popen()?;
105        self.process = Some(process);
106        self.birth = Some(Instant::now());
107        Ok(())
108    }
109
110    /// Collects the output from process and sends to output channels.
111    pub fn update_output_channels(&mut self) -> Result<()> {
112        if let Some(p) = &self.process {
113            if let Some(mut stdout) = p.stdout.as_ref() {
114                let mut out = String::new();
115                stdout.read_to_string(&mut out)?;
116                if !out.is_empty() {
117                    self.stdout_sender
118                        .send(format!("[OUT] [{}] {}", self.step.name, out))
119                        .ok();
120                }
121            }
122
123            if let Some(mut stderr) = p.stderr.as_ref() {
124                let mut err = String::new();
125                stderr.read_to_string(&mut err)?;
126                if !err.is_empty() {
127                    self.stderr_sender
128                        .send(format!("[ERR] [{}] {}", self.step.name, err))
129                        .ok();
130                }
131            }
132        }
133        Ok(())
134    }
135}