xvc_pipeline/pipeline/
command.rs1use crate::Result;
2
3use crossbeam_channel::{Receiver, Sender};
4
5use std::collections::HashMap;
6use std::fmt::Display;
7use std::io::Read;
8
9use std::time::Instant;
10use subprocess as sp;
11
12use xvc_file::CHANNEL_CAPACITY;
13
14use serde::{Deserialize, Serialize};
15use xvc_core::persist;
16
17use crate::XvcStep;
18
19#[derive(Debug, Clone, PartialOrd, Ord, Eq, PartialEq, Serialize, Deserialize)]
21pub struct XvcStepCommand {
22 pub command: String,
24}
25
26persist!(XvcStepCommand, "xvc-step-command");
27
28impl Display for XvcStepCommand {
29 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
30 write!(f, "{}", self.command)
31 }
32}
33
34impl AsRef<str> for XvcStepCommand {
35 fn as_ref(&self) -> &str {
36 self.command.as_ref()
37 }
38}
39
40#[derive(Debug)]
43pub struct CommandProcess {
44 pub environment: HashMap<String, String>,
47 pub step: XvcStep,
49 pub step_command: XvcStepCommand,
51 pub birth: Option<Instant>,
53 pub process: Option<sp::Popen>,
55 pub stdout_sender: Sender<String>,
57 pub stderr_sender: Sender<String>,
59 pub stdout_receiver: Receiver<String>,
61 pub stderr_receiver: Receiver<String>,
63}
64
65impl CommandProcess {
66 pub fn new(step: &XvcStep, step_command: &XvcStepCommand) -> Self {
69 let (stdout_sender, stdout_receiver) = crossbeam_channel::bounded(CHANNEL_CAPACITY);
70 let (stderr_sender, stderr_receiver) = crossbeam_channel::bounded(CHANNEL_CAPACITY);
71 Self {
72 environment: HashMap::new(),
73 step: step.clone(),
74 step_command: step_command.clone(),
75 birth: None,
76 process: None,
77 stdout_sender,
78 stderr_sender,
79 stdout_receiver,
80 stderr_receiver,
81 }
82 }
83
84 pub fn add_environment_variable(&mut self, key: &str, value: &str) -> Result<&mut Self> {
86 self.environment.insert(key.to_owned(), value.to_owned());
87 Ok(self)
88 }
89
90 pub fn run(&mut self) -> Result<()> {
93 let process = sp::Exec::shell(self.step_command.command.clone())
94 .stdout(sp::Redirection::Pipe)
95 .stderr(sp::Redirection::Pipe)
96 .stdin(sp::Redirection::None)
97 .env_extend(
98 self.environment
99 .iter()
100 .collect::<Vec<(&String, &String)>>()
101 .as_slice(),
102 )
103 .detached()
104 .popen()?;
105 self.process = Some(process);
106 self.birth = Some(Instant::now());
107 Ok(())
108 }
109
110 pub fn update_output_channels(&mut self) -> Result<()> {
112 if let Some(p) = &self.process {
113 if let Some(mut stdout) = p.stdout.as_ref() {
114 let mut out = String::new();
115 stdout.read_to_string(&mut out)?;
116 if !out.is_empty() {
117 self.stdout_sender
118 .send(format!("[OUT] [{}] {}", self.step.name, out))
119 .ok();
120 }
121 }
122
123 if let Some(mut stderr) = p.stderr.as_ref() {
124 let mut err = String::new();
125 stderr.read_to_string(&mut err)?;
126 if !err.is_empty() {
127 self.stderr_sender
128 .send(format!("[ERR] [{}] {}", self.step.name, err))
129 .ok();
130 }
131 }
132 }
133 Ok(())
134 }
135}