use crate::{
signals::Shutdown,
throttle::{self, Throttle},
};
use is_executable::IsExecutable;
use nix::{
sys::wait::{waitpid, WaitPidFlag, WaitStatus},
unistd::{fork, ForkResult, Pid},
};
use rand::{
distributions::{Alphanumeric, DistString},
rngs::StdRng,
seq::SliceRandom,
};
use serde::{Deserialize, Serialize};
use std::{
collections::{vec_deque, HashMap, HashSet, VecDeque},
env, error, fmt,
iter::Peekable,
num::{NonZeroU32, NonZeroUsize},
path::PathBuf,
process::{exit, Stdio},
str, thread,
time::Duration,
};
use tokio::process::Command;
use tracing::{error, info};
/// Error describing a path that failed the executable check.
#[derive(Debug)]
pub struct NotExecutable {
    /// The path that is not executable.
    executable: PathBuf,
}

impl error::Error for NotExecutable {}

impl fmt::Display for NotExecutable {
    /// Writes the offending path followed by ` not executable`.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let shown = self.executable.display();
        f.write_fmt(format_args!("{} not executable", shown))
    }
}
/// Error carrying the standard error output of a failed execution.
#[derive(Debug)]
pub struct ExecutionError {
    /// Captured stderr text of the failed command.
    stderr: String,
}

impl error::Error for ExecutionError {}

impl fmt::Display for ExecutionError {
    /// Writes `execution failed: ` followed by the captured stderr.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let captured = &self.stderr;
        f.write_fmt(format_args!("execution failed: {}", captured))
    }
}
/// Errors produced by the process tree generator.
///
/// Every variant wraps its cause with `#[from]`, so `?` converts the
/// underlying error into this type.
#[derive(thiserror::Error, Debug)]
pub enum Error {
/// A configured executable path is not executable (see `Config::validate`).
#[error("Not Executable!")]
NotExecutable(#[from] NotExecutable),
/// Converting the configuration to or from YAML failed.
#[error("Serialization failed with error: {0}")]
Serialization(#[from] serde_yaml::Error),
/// An I/O operation failed.
#[error("IO error: {0}")]
Io(#[from] ::std::io::Error),
/// The spawned generator process reported failure.
#[error("Execution error: {0}")]
ExecutionError(#[from] ExecutionError),
}
/// Serde default for `Config::max_depth`: 10.
fn default_max_depth() -> NonZeroU32 {
    const MAX_DEPTH: u32 = 10;
    // The constant is non-zero, so this never fails.
    NonZeroU32::new(MAX_DEPTH).unwrap()
}
/// Serde default for `Config::max_tree_per_second`: 5.
fn default_max_tree_per_second() -> NonZeroU32 {
    const TREES_PER_SECOND: u32 = 5;
    // The constant is non-zero, so this never fails.
    NonZeroU32::new(TREES_PER_SECOND).unwrap()
}
/// Serde default for `Config::process_sleep_ns`: 100_000_000 ns (0.1 s).
fn default_process_sleep_ns() -> NonZeroU32 {
    const SLEEP_NS: u32 = 100_000_000;
    // The constant is non-zero, so this never fails.
    NonZeroU32::new(SLEEP_NS).unwrap()
}
/// Serde default for `Config::max_children`: 10.
fn default_max_children() -> NonZeroU32 {
    const MAX_CHILDREN: u32 = 10;
    // The constant is non-zero, so this never fails.
    NonZeroU32::new(MAX_CHILDREN).unwrap()
}
/// Serde default for `GenerateArgs::length`: 10.
fn default_args_len() -> NonZeroUsize {
    const ARGS_LEN: usize = 10;
    // The constant is non-zero, so this never fails.
    NonZeroUsize::new(ARGS_LEN).unwrap()
}
/// Serde default for `GenerateArgs::count`: 16.
fn default_args_count() -> NonZeroU32 {
    const ARGS_COUNT: u32 = 16;
    // The constant is non-zero, so this never fails.
    NonZeroU32::new(ARGS_COUNT).unwrap()
}
/// Serde default for `GenerateEnvs::length`: 16.
fn default_envs_len() -> NonZeroUsize {
    const ENVS_LEN: usize = 16;
    // The constant is non-zero, so this never fails.
    NonZeroUsize::new(ENVS_LEN).unwrap()
}
/// Serde default for `GenerateEnvs::count`: 10.
fn default_envs_count() -> NonZeroU32 {
    const ENVS_COUNT: u32 = 10;
    // The constant is non-zero, so this never fails.
    NonZeroU32::new(ENVS_COUNT).unwrap()
}
/// How the command-line arguments of an executable are obtained.
///
/// Serialized with an internal `mode` tag whose value is the snake_case
/// variant name (`static` or `generate`).
#[derive(Debug, Serialize, Deserialize, PartialEq, Clone)]
#[serde(tag = "mode", rename_all = "snake_case")]
pub enum Args {
/// Use the listed arguments verbatim.
Static(StaticArgs),
/// Generate random alphanumeric arguments.
Generate(GenerateArgs),
}
/// Fixed argument list for an executable.
#[derive(Debug, Serialize, Deserialize, PartialEq, Clone)]
pub struct StaticArgs {
/// Arguments passed as-is to the executable.
pub values: Vec<String>,
}
/// Parameters for randomly generated arguments.
#[derive(Debug, Serialize, Deserialize, PartialEq, Clone, Copy)]
pub struct GenerateArgs {
/// Length of each generated argument string; defaults to 10.
#[serde(default = "default_args_len")]
pub length: NonZeroUsize,
/// Number of arguments to generate; defaults to 16.
#[serde(default = "default_args_count")]
pub count: NonZeroU32,
}
/// How the environment variables of an executable are obtained.
///
/// Serialized with an internal `mode` tag whose value is the snake_case
/// variant name (`static` or `generate`).
#[derive(Debug, Serialize, Deserialize, PartialEq, Clone)]
#[serde(tag = "mode", rename_all = "snake_case")]
pub enum Envs {
/// Use the listed environment entries.
Static(StaticEnvs),
/// Generate random alphanumeric keys and values.
Generate(GenerateEnvs),
}
/// Fixed environment for an executable.
#[derive(Debug, Serialize, Deserialize, PartialEq, Clone)]
pub struct StaticEnvs {
/// Entries of the form `KEY=VALUE`; an entry without `=` is treated by
/// `to_hash` as a key with an empty value.
pub values: Vec<String>,
}
/// Parameters for randomly generated environment variables.
#[derive(Debug, Serialize, Deserialize, PartialEq, Clone, Copy)]
pub struct GenerateEnvs {
/// Combined length of a generated key and its value (`gen_rnd_envs` gives
/// the key `length / 2` characters and the value the rest); defaults to 16.
#[serde(default = "default_envs_len")]
pub length: NonZeroUsize,
/// Number of key/value pairs drawn; defaults to 10.
#[serde(default = "default_envs_count")]
pub count: NonZeroU32,
}
impl StaticEnvs {
    /// Convert the `KEY=VALUE` entries into a map.
    ///
    /// Each entry is split at its first `=`. An entry without `=` becomes a
    /// key mapped to the empty string. When a key occurs more than once the
    /// last entry wins.
    #[must_use]
    pub fn to_hash(&self) -> HashMap<String, String> {
        self.values
            .iter()
            .map(|entry| match entry.split_once('=') {
                Some((key, value)) => (key.to_string(), value.to_string()),
                None => (entry.clone(), String::new()),
            })
            .collect()
    }
}
/// One executable that leaf processes of the tree may run.
#[derive(Debug, Serialize, Deserialize, PartialEq, Clone)]
pub struct Executable {
/// Path of the program; `Config::validate` requires it to be executable.
pub executable: PathBuf,
/// Arguments to pass to the program.
pub args: Args,
/// Environment variables to set for the program.
pub envs: Envs,
}
/// Configuration of the process tree generator.
#[derive(Debug, Serialize, Deserialize, PartialEq, Clone)]
pub struct Config {
/// Seed bytes. NOTE(review): presumably seeds the `StdRng` handed to
/// `generate_tree`; the consumer is not in this file — confirm.
pub seed: [u8; 32],
/// Rate passed to the throttle in `ProcessTree::new`; defaults to 5.
#[serde(default = "default_max_tree_per_second")]
pub max_tree_per_second: NonZeroU32,
/// Depth bound used by `generate_tree`; defaults to 10.
#[serde(default = "default_max_depth")]
pub max_depth: NonZeroU32,
/// Number of children `generate_tree` gives every inner node; defaults to 10.
#[serde(default = "default_max_children")]
pub max_children: NonZeroU32,
/// Sleep in nanoseconds; defaults to 100_000_000. NOTE(review): presumably
/// the `sleep_ns` given to `spawn_tree` — confirm at the call site.
#[serde(default = "default_process_sleep_ns")]
pub process_sleep_ns: NonZeroU32,
/// Executables from which leaf commands are chosen.
pub executables: Vec<Executable>,
/// Throttle configuration passed to `Throttle::new_with_config`.
#[serde(default)]
pub throttle: throttle::Config,
}
impl Config {
pub fn validate(&self) -> Result<(), Error> {
let iter = self.executables.iter();
for exec in iter {
if !exec.executable.is_executable() {
return Err(Error::from(NotExecutable {
executable: exec.executable.clone(),
}));
}
}
Ok(())
}
}
/// Generator that repeatedly launches a process-tree-building subprocess.
#[derive(Debug)]
pub struct ProcessTree {
/// Path of the currently running executable, re-invoked by `spin`.
lading_path: PathBuf,
/// YAML serialization of the configuration, passed to the subprocess.
config_content: String,
/// Throttle awaited before each subprocess launch.
throttle: Throttle,
/// Shutdown signal that ends `spin`.
shutdown: Shutdown,
}
impl ProcessTree {
    /// Create a generator from `config`.
    ///
    /// Records the path of the current executable, builds the throttle from
    /// `config.throttle` and `config.max_tree_per_second`, and serializes
    /// `config` to YAML for later hand-off to the subprocess.
    ///
    /// # Errors
    ///
    /// Returns `Error::Io` when the current executable path cannot be
    /// determined and `Error::Serialization` when `config` cannot be
    /// serialized.
    pub fn new(config: &Config, shutdown: Shutdown) -> Result<Self, Error> {
        let lading_path = env::current_exe()?;

        let labels = vec![
            ("component".to_string(), "generator".to_string()),
            ("component_name".to_string(), "process_tree".to_string()),
        ];
        let throttle =
            Throttle::new_with_config(config.throttle, config.max_tree_per_second, labels);

        let config_content = serde_yaml::to_string(config)?;
        Ok(Self {
            lading_path,
            config_content,
            throttle,
            shutdown,
        })
    }

    /// Run the generator until shutdown.
    ///
    /// Each time the throttle permits, the current executable is re-invoked
    /// with the `process-tree-gen` subcommand and the serialized
    /// configuration, and its completion is awaited.
    ///
    /// # Errors
    ///
    /// Returns `Error::Io` when the subprocess cannot be run and
    /// `Error::ExecutionError` (carrying its stderr) when it exits
    /// unsuccessfully.
    pub async fn spin(mut self) -> Result<(), Error> {
        loop {
            tokio::select! {
                _ = self.throttle.wait() => {
                    // The path is handed over as an OS string, so a
                    // non-UTF-8 executable path is not a problem.
                    let output = Command::new(&self.lading_path)
                        .args(["--target-pid", "1"])
                        .arg("process-tree-gen")
                        .arg("--config-content")
                        .arg(&self.config_content)
                        .stdin(Stdio::null())
                        .output()
                        .await?;
                    if !output.status.success() {
                        error!("process tree generator execution error");
                        // stderr is arbitrary bytes; decode lossily rather
                        // than panic on invalid UTF-8.
                        return Err(Error::from(ExecutionError {
                            stderr: String::from_utf8_lossy(&output.stderr).into_owned(),
                        }));
                    }
                },
                _ = self.shutdown.recv() => {
                    info!("shutdown signal received");
                    break;
                },
            }
        }
        Ok(())
    }
}
#[inline]
fn rnd_str(rng: &mut StdRng, len: usize) -> String {
Alphanumeric.sample_string(rng, len)
}
/// Produce `max` random alphanumeric arguments, each `len` characters long.
#[inline]
fn gen_rnd_args(rng: &mut StdRng, len: usize, max: u32) -> Vec<String> {
    (0..max).map(|_| rnd_str(rng, len)).collect()
}
/// Produce random environment variables.
///
/// `max` key/value pairs are drawn; each key has `len / 2` characters and
/// each value the remaining `len - len / 2`. Pairs whose keys collide
/// overwrite each other, so the map may hold fewer than `max` entries.
#[inline]
fn gen_rnd_envs(rng: &mut StdRng, len: usize, max: u32) -> HashMap<String, String> {
    let key_len = len / 2;
    let value_len = len - key_len;
    (0..max)
        .map(|_| {
            // Key first, then value: keeps the order of RNG draws stable.
            let name = rnd_str(rng, key_len);
            let content = rnd_str(rng, value_len);
            (name, content)
        })
        .collect()
}
/// A fully resolved command: program, arguments and environment.
#[derive(Debug)]
pub struct Exec {
    /// Program path as a string.
    executable: String,
    /// Arguments passed to the program.
    args: Vec<String>,
    /// Environment variables set for the program.
    envs: HashMap<String, String>,
}

impl Exec {
    /// Pick one of `config.executables` at random and resolve its arguments
    /// and environment, generating random ones where configured.
    ///
    /// # Panics
    ///
    /// Panics when `config.executables` is empty or the chosen path is not
    /// valid UTF-8.
    fn new(rng: &mut StdRng, config: &Config) -> Self {
        let chosen = config.executables.choose(rng).unwrap();

        // Arguments are resolved before the environment so the RNG is
        // consumed in a fixed order.
        let args = match &chosen.args {
            Args::Generate(spec) => gen_rnd_args(rng, spec.length.get(), spec.count.get()),
            Args::Static(fixed) => fixed.values.clone(),
        };
        let envs = match &chosen.envs {
            Envs::Generate(spec) => gen_rnd_envs(rng, spec.length.get(), spec.count.get()),
            Envs::Static(fixed) => fixed.to_hash(),
        };

        let executable = chosen.executable.to_str().unwrap().to_string();
        Exec {
            executable,
            args,
            envs,
        }
    }
}
/// One node of the process tree.
#[derive(Debug)]
pub struct Process {
    /// Depth of the node; `generate_tree` gives the root depth 1.
    depth: u32,
    /// Command this node runs; `None` for nodes that only fork children.
    exec: Option<Exec>,
}

impl Process {
    /// Build a node at `depth`, optionally carrying a command to run.
    fn new(depth: u32, exec: Option<Exec>) -> Self {
        Process { depth, exec }
    }
}
/// Materialise the tree described by `nodes` as real OS processes.
///
/// `nodes` is read as a pre-order listing in which a child's `depth` is
/// greater than its parent's, which is the layout `generate_tree` produces.
/// The calling process forks one child per top-level node; each forked child
/// then does the same for its own direct children only. A node carrying an
/// `exec` is not forked: the process that reaches it runs the command with
/// stdout and stderr discarded and exits with the command's exit code, or
/// with 1 when the command could not be started or was killed by a signal.
///
/// Every process sleeps `sleep_ns` nanoseconds before handling each node.
/// The original caller returns once all of its children have been reaped;
/// forked children never return and call `exit(0)` instead.
pub fn spawn_tree(nodes: &VecDeque<Process>, sleep_ns: u32) {
    let mut iter = nodes.iter().peekable();
    let mut pids_to_wait: HashSet<Pid> = HashSet::new();
    // Depth of the node this OS process stands for; 0 is the original caller.
    let mut depth = 0;
    let pause = Duration::from_nanos(sleep_ns.into());

    loop {
        try_wait_pid(&mut pids_to_wait);

        // Only nodes strictly deeper than this process belong to it. The
        // first node at or above our own depth is a sibling or ancestor
        // subtree that another process is responsible for.
        let process = match iter.next_if(|node| node.depth > depth) {
            Some(process) => process,
            None => {
                // Nothing left to spawn: block until the children are done
                // instead of spinning on non-blocking waits.
                for pid in pids_to_wait.drain() {
                    // An error means there is nothing (left) to reap.
                    let _ = waitpid(pid, None);
                }
                if depth > 0 {
                    exit(0)
                }
                return;
            }
        };

        thread::sleep(pause);

        if let Some(exec) = &process.exec {
            let outcome = std::process::Command::new(&exec.executable)
                .args(&exec.args)
                .envs(&exec.envs)
                .stdout(Stdio::null())
                .stderr(Stdio::null())
                .status();
            // A command that fails to start, or dies from a signal, has no
            // exit code; report a generic failure rather than panicking.
            let code = match outcome {
                Ok(status) => status.code().unwrap_or(1),
                Err(_) => 1,
            };
            exit(code)
        }

        // NOTE(review): after `fork` in a multi-threaded process the child
        // may only rely on async-signal-safe operations — confirm the caller
        // runs this on a single-threaded process.
        match unsafe { fork() } {
            Ok(ForkResult::Parent { child, .. }) => {
                pids_to_wait.insert(child);
                // The child owns the subtree below `process`; skip it.
                goto_next_sibling(process.depth, &mut iter);
            }
            Ok(ForkResult::Child) => {
                depth = process.depth;
                // The inherited pids are the parent's children, not ours.
                pids_to_wait.clear();
            }
            Err(_) => {
                // No child was created for this node, so its descendants
                // must not be run by the current process: drop the subtree.
                goto_next_sibling(process.depth, &mut iter);
            }
        }
    }
}

/// Reap, without blocking, every child in `pids` that has terminated.
///
/// A pid is kept only when `waitpid` reports it as still alive; any other
/// status, or an error, removes it from the set.
#[inline]
fn try_wait_pid(pids: &mut HashSet<Pid>) {
    pids.retain(|pid| {
        matches!(
            waitpid(*pid, Some(WaitPidFlag::WNOHANG)),
            Ok(WaitStatus::StillAlive)
        )
    });
}

/// Skip the subtree below a node of the given `depth`.
///
/// Advances `iter` past every node deeper than `depth` and stops at the
/// first node whose depth is `depth` or less (a sibling, or a node belonging
/// to an ancestor), or at the end of the list.
#[inline]
fn goto_next_sibling(depth: u32, iter: &mut Peekable<vec_deque::Iter<'_, Process>>) {
    while iter.next_if(|node| node.depth > depth).is_some() {}
}
/// Build the process tree as a flat pre-order list of nodes.
///
/// The root has depth 1. A node whose child depth would exceed
/// `config.max_depth` gets a single child carrying a randomly chosen
/// command; every other node gets `config.max_children` command-less
/// children.
pub fn generate_tree(rng: &mut StdRng, config: &Config) -> VecDeque<Process> {
    let mut nodes = VecDeque::new();
    let mut pending = vec![Process::new(1, None)];

    while let Some(current) = pending.pop() {
        let level = current.depth;
        nodes.push_back(current);

        if level + 1 > config.max_depth.get() {
            // Depth limit reached: attach the leaf that runs a command.
            let leaf = Process::new(level + 1, Some(Exec::new(rng, config)));
            nodes.push_back(leaf);
            continue;
        }

        pending.extend((0..config.max_children.get()).map(|_| Process::new(level + 1, None)));
    }

    nodes
}
/// Parse `content` as a YAML `Config` and validate it.
///
/// # Errors
///
/// Returns `Error::Serialization` when `content` is not a valid
/// configuration and whatever `Config::validate` reports otherwise.
pub fn get_config(content: &str) -> Result<Config, Error> {
    let parsed: Config = serde_yaml::from_str(content)?;
    parsed.validate()?;
    Ok(parsed)
}