use std::{
num::{NonZeroU32, NonZeroUsize},
path::PathBuf,
str,
sync::{
atomic::{AtomicU32, Ordering},
Arc,
},
};
use byte_unit::{Byte, ByteUnit};
use futures::future::join_all;
use metrics::{gauge, register_counter};
use rand::{prelude::StdRng, SeedableRng};
use serde::Deserialize;
use tokio::{
fs,
io::{AsyncWriteExt, BufWriter},
task::{JoinError, JoinHandle},
};
use tracing::info;
use crate::{
block::{self, chunk_bytes, construct_block_cache, Block},
payload,
signals::Shutdown,
throttle::{self, Throttle},
};
use super::General;
/// Errors that the file generator and its child tasks can return.
#[derive(thiserror::Error, Debug)]
pub enum Error {
/// An I/O operation failed; converted automatically from `std::io::Error`.
#[error("Io error: {0}")]
Io(#[from] ::std::io::Error),
/// Block creation failed; converted automatically from `block::Error`.
#[error("Block creation error: {0}")]
Block(#[from] block::Error),
/// A spawned child task could not be joined; converted automatically from
/// tokio's `JoinError`.
#[error("Child join error: {0}")]
Child(#[from] JoinError),
}
/// Serde default for the `rotate` setting of `Config`: rotation is switched on
/// whenever the configuration does not mention it.
fn default_rotation() -> bool {
    const ROTATE_BY_DEFAULT: bool = true;
    ROTATE_BY_DEFAULT
}
/// Deserializable configuration for [`FileGen`].
#[derive(Debug, Deserialize, PartialEq)]
pub struct Config {
/// Seed for the `StdRng` that drives block chunking and block cache
/// construction.
pub seed: [u8; 32],
/// Template for the paths written to. Every occurrence of `%NNN%` is replaced
/// with the file index, zero-padded to at least four digits.
pub path_template: String,
/// Number of child writer tasks to spawn.
pub duplicates: u8,
/// Payload configuration handed to the block cache constructor.
pub variant: payload::Config,
/// Per-file byte threshold: once a child has written more than this to its
/// current file it moves on to the next file index.
maximum_bytes_per_file: Byte,
/// Byte rate given to each child's throttle and reported through the
/// `bytes_per_second` gauge; it is also used as each child's write buffer
/// capacity.
bytes_per_second: Byte,
/// Block sizes to build the cache from. When absent, 1, 2, 4, 8, 16 and 32 MB
/// are used.
pub block_sizes: Option<Vec<byte_unit::Byte>>,
/// Total byte budget passed to `chunk_bytes` when pre-building the block
/// cache.
maximum_prebuild_cache_size_bytes: Byte,
/// Whether a file is removed once its byte threshold has been exceeded.
/// Defaults to `true` when not present in the configuration.
#[serde(default = "default_rotation")]
rotate: bool,
/// Throttle configuration applied to every child; falls back to the type's
/// `Default` when not present.
#[serde(default)]
pub throttle: throttle::Config,
}
/// The file generator: owns the spawned child writer tasks and the shutdown
/// signal used to stop waiting on them.
#[derive(Debug)]
pub struct FileGen {
/// Join handles of the child tasks spawned in `FileGen::new`.
handles: Vec<JoinHandle<Result<(), Error>>>,
/// Shutdown signal awaited by `FileGen::spin` before the children are joined.
shutdown: Shutdown,
}
impl FileGen {
    /// Creates a new [`FileGen`] and immediately spawns its child writer tasks.
    ///
    /// A single block cache is built from a `StdRng` seeded with `config.seed`
    /// and shared between all `config.duplicates` children. Every child gets
    /// its own throttle and a clone of `shutdown`; the children share one file
    /// index counter so that no two of them pick the same index.
    ///
    /// Children are started with `tokio::spawn`, so this must be called from
    /// within a Tokio runtime.
    ///
    /// # Errors
    ///
    /// Returns an error if `chunk_bytes` fails to split the prebuild cache
    /// budget into blocks.
    ///
    /// # Panics
    ///
    /// Panics if any configured block size, `bytes_per_second`,
    /// `maximum_bytes_per_file` or `maximum_prebuild_cache_size_bytes` is zero
    /// once narrowed to its integer type.
    // NOTE(review): the `as` casts below narrow byte counts to `u32`/`usize`
    // and wrap silently for values that do not fit (e.g. >= 4 GiB into `u32`).
    #[allow(clippy::cast_possible_truncation)]
    pub fn new(general: General, config: Config, shutdown: Shutdown) -> Result<Self, Error> {
        let mut rng = StdRng::from_seed(config.seed);
        let block_sizes: Vec<NonZeroUsize> = config
            .block_sizes
            .unwrap_or_else(|| {
                vec![
                    Byte::from_unit(1_f64, ByteUnit::MB).unwrap(),
                    Byte::from_unit(2_f64, ByteUnit::MB).unwrap(),
                    Byte::from_unit(4_f64, ByteUnit::MB).unwrap(),
                    Byte::from_unit(8_f64, ByteUnit::MB).unwrap(),
                    Byte::from_unit(16_f64, ByteUnit::MB).unwrap(),
                    Byte::from_unit(32_f64, ByteUnit::MB).unwrap(),
                ]
            })
            .iter()
            .map(|sz| NonZeroUsize::new(sz.get_bytes() as usize).expect("bytes must be non-zero"))
            .collect();

        // One label set for everything this generator emits. It used to be
        // shadowed by an id-less copy further down, which silently dropped the
        // `id` label from the block cache and throttle metrics.
        let mut labels = vec![
            ("component".to_string(), "generator".to_string()),
            ("component_name".to_string(), "file_gen".to_string()),
        ];
        if let Some(id) = general.id {
            labels.push(("id".to_string(), id));
        }

        let bytes_per_second = NonZeroU32::new(config.bytes_per_second.get_bytes() as u32)
            .expect("bytes_per_second must be non-zero");
        gauge!(
            "bytes_per_second",
            f64::from(bytes_per_second.get()),
            &labels
        );

        let maximum_bytes_per_file =
            NonZeroU32::new(config.maximum_bytes_per_file.get_bytes() as u32)
                .expect("maximum_bytes_per_file must be non-zero");
        let maximum_prebuild_cache_size_bytes =
            NonZeroU32::new(config.maximum_prebuild_cache_size_bytes.get_bytes() as u32)
                .expect("maximum_prebuild_cache_size_bytes must be non-zero");
        let block_chunks = chunk_bytes(
            &mut rng,
            NonZeroUsize::new(maximum_prebuild_cache_size_bytes.get() as usize)
                .expect("bytes must be non-zero"),
            &block_sizes,
        )?;

        let mut handles = Vec::with_capacity(usize::from(config.duplicates));
        let file_index = Arc::new(AtomicU32::new(0));

        let block_cache = construct_block_cache(&mut rng, &config.variant, &block_chunks, &labels);
        let block_cache = Arc::new(block_cache);

        for _ in 0..config.duplicates {
            let throttle =
                Throttle::new_with_config(config.throttle, bytes_per_second, labels.clone());

            let child = Child {
                path_template: config.path_template.clone(),
                maximum_bytes_per_file,
                bytes_per_second,
                throttle,
                block_cache: Arc::clone(&block_cache),
                file_index: Arc::clone(&file_index),
                rotate: config.rotate,
                shutdown: shutdown.clone(),
            };

            handles.push(tokio::spawn(child.spin()));
        }

        Ok(Self { handles, shutdown })
    }

    /// Waits for the shutdown signal, then joins every child task.
    ///
    /// All children are awaited before any result is inspected.
    ///
    /// # Errors
    ///
    /// Returns the first failure found among the children, in spawn order:
    /// either the error a child returned, or [`Error::Child`] if the task
    /// itself could not be joined.
    pub async fn spin(mut self) -> Result<(), Error> {
        self.shutdown.recv().await;
        info!("shutdown signal received");

        for res in join_all(self.handles.drain(..)).await {
            // First `?` turns a `JoinError` into `Error::Child`; the second
            // propagates the child's own error.
            res??;
        }
        Ok(())
    }
}
/// State of a single writer task spawned by `FileGen::new`.
struct Child {
/// Path template; `%NNN%` is replaced with the zero-padded file index.
path_template: String,
/// Byte threshold after which the child moves on to the next file.
maximum_bytes_per_file: NonZeroU32,
/// Used as the capacity, in bytes, of the buffered writer.
bytes_per_second: NonZeroU32,
/// Throttle awaited before every block write.
throttle: Throttle,
/// Pre-built blocks shared between all children, written in a repeating
/// cycle.
block_cache: Arc<Vec<Block>>,
/// Whether a file is removed once its byte threshold has been exceeded.
rotate: bool,
/// Counter shared between all children that hands out the next file index.
file_index: Arc<AtomicU32>,
/// Shutdown signal that ends the write loop.
shutdown: Shutdown,
}
impl Child {
pub(crate) async fn spin(mut self) -> Result<(), Error> {
let bytes_per_second = self.bytes_per_second.get() as usize;
let mut total_bytes_written: u64 = 0;
let maximum_bytes_per_file: u64 = u64::from(self.maximum_bytes_per_file.get());
let mut file_index = self.file_index.fetch_add(1, Ordering::Relaxed);
let mut path = path_from_template(&self.path_template, file_index);
let mut fp = BufWriter::with_capacity(
bytes_per_second,
fs::OpenOptions::new()
.create(true)
.truncate(true)
.write(true)
.open(&path)
.await?,
);
let mut idx = 0;
let blocks = self.block_cache;
let bytes_written = register_counter!("bytes_written");
loop {
let block = &blocks[idx];
let total_bytes = block.total_bytes;
tokio::select! {
_ = self.throttle.wait_for(total_bytes) => {
idx = (idx + 1) % blocks.len();
let total_bytes = u64::from(total_bytes.get());
{
fp.write_all(&block.bytes).await?;
bytes_written.increment(total_bytes);
total_bytes_written += total_bytes;
}
if total_bytes_written > maximum_bytes_per_file {
fp.flush().await?;
if self.rotate {
fs::remove_file(&path).await?;
}
file_index = self.file_index.fetch_add(1, Ordering::Relaxed);
path = path_from_template(&self.path_template, file_index);
fp = BufWriter::with_capacity(
bytes_per_second,
fs::OpenOptions::new()
.create(true)
.truncate(false)
.write(true)
.open(&path)
.await?,
);
total_bytes_written = 0;
}
}
_ = self.shutdown.recv() => {
fp.flush().await?;
info!("shutdown signal received");
return Ok(());
},
}
}
}
}
/// Builds a concrete path from `path_template` by replacing every occurrence
/// of the `%NNN%` placeholder with `index`, zero-padded to at least four
/// digits. A template without the placeholder is returned unchanged.
#[inline]
fn path_from_template(path_template: &str, index: u32) -> PathBuf {
    let padded_index = format!("{:04}", index);
    path_template.replace("%NNN%", &padded_index).into()
}