use std::io::{BufWriter, Seek, SeekFrom, Write};
use std::path::PathBuf;
use std::fs::File;
use std::time::SystemTime;
use serde::{Deserialize, Serialize};
use std::sync::mpsc::{Sender, channel};
use std::thread::{self, Builder, JoinHandle};
use tracing::{debug, error, info};
#[cfg(feature = "python")]
use pyo3::prelude::*;
use crate::controller::context::ControllerCtx;
use crate::py_json_methods;
use super::{Dispatcher, Overflow, csv_header, csv_row_fixed_width, resource_name_with_suffix};
/// Dispatcher that writes every consumed row to a CSV file in the operation directory.
///
/// Each file is preallocated to `chunk_size_megabytes * 1024 * 1024` bytes when the dispatcher
/// is initialized, and rows are written by a background worker thread.
#[derive(Serialize, Deserialize, Default)]
#[cfg_attr(feature = "python", pyclass)]
pub struct CsvDispatcher {
/// Size of each CSV file, in units of 1024 * 1024 bytes.
chunk_size_megabytes: usize,
/// What to do when the next row would not fit in the current file.
overflow_behavior: Overflow,
/// Optional suffix combined with the operation name to build the file name.
/// `None` (and omitted from serialized output) when no suffix is set.
#[serde(default, skip_serializing_if = "Option::is_none")]
op_name_suffix: Option<String>,
/// Handle to the running writer thread. It is never serialized and is `None` until
/// `init` succeeds.
#[serde(skip)]
worker: Option<WorkerHandle>,
}
impl CsvDispatcher {
    /// Returns a boxed dispatcher whose files hold `chunk_size_megabytes` MiB each. A full
    /// file is handled according to `overflow_behavior`.
    ///
    /// No file-name suffix is set, and no writer runs until the dispatcher is initialized.
    pub fn new(chunk_size_megabytes: usize, overflow_behavior: Overflow) -> Box<Self> {
        let dispatcher = Self {
            worker: None,
            op_name_suffix: None,
            overflow_behavior,
            chunk_size_megabytes,
        };
        Box::new(dispatcher)
    }

    /// Sets the suffix used when naming the output file. An empty `suffix` clears it.
    pub fn with_op_name_suffix(mut self: Box<Self>, suffix: &str) -> Box<Self> {
        let has_suffix = !suffix.is_empty();
        self.op_name_suffix = has_suffix.then(|| suffix.to_owned());
        self
    }
}
py_json_methods!(
    CsvDispatcher,
    Dispatcher,
    #[new]
    #[pyo3(signature=(chunk_size_megabytes, overflow_behavior, op_name_suffix=None))]
    fn py_new(
        chunk_size_megabytes: usize,
        overflow_behavior: Overflow,
        op_name_suffix: Option<String>,
    ) -> PyResult<Self> {
        // Python constructor: builds the dispatcher exactly like `CsvDispatcher::new`, then
        // applies `with_op_name_suffix` only when a suffix was passed.
        let mut boxed = Self::new(chunk_size_megabytes, overflow_behavior);
        if let Some(suffix) = op_name_suffix.as_deref() {
            boxed = boxed.with_op_name_suffix(suffix);
        }
        Ok(*boxed)
    }
);
#[typetag::serde]
impl Dispatcher for CsvDispatcher {
fn init(
&mut self,
ctx: &ControllerCtx,
channel_names: &[String],
core_assignment: usize,
) -> Result<(), String> {
self.worker = None;
let header = csv_header(channel_names);
let total_len = 1024 * 1_024 * self.chunk_size_megabytes;
let resource_name = resource_name_with_suffix(&ctx.op_name, self.op_name_suffix.as_deref());
let filepath = ctx.op_dir.join(format!("{resource_name}.csv"));
info!(
"Initializing CSV dispatcher with file path: {:?}",
&filepath
);
self.worker = Some(WorkerHandle::new(
filepath,
header,
total_len,
self.overflow_behavior,
core_assignment,
)?);
Ok(())
}
fn consume(
&mut self,
time: SystemTime,
timestamp: i64,
channel_values: Vec<f64>,
) -> Result<(), String> {
match &mut self.worker {
Some(worker) => worker
.tx
.send((time, timestamp, channel_values))
.map_err(|e| format!("Failed to queue data to write to CSV: {e}")),
None => Err("Dispatcher must be initialized before consuming data".to_string()),
}
}
fn terminate(&mut self) -> Result<(), String> {
self.worker = None;
Ok(())
}
}
/// Handle to the background thread that writes CSV rows.
///
/// When every sender for `tx` is dropped, the channel closes and the thread flushes and trims
/// its file, then exits. Dropping this handle without joining `_thread` detaches the thread.
struct WorkerHandle {
/// Queue of `(time, timestamp, channel values)` rows for the writer thread.
pub tx: Sender<(SystemTime, i64, Vec<f64>)>,
/// Join handle of the writer thread.
_thread: JoinHandle<()>,
}
impl WorkerHandle {
fn new(
path: PathBuf,
header: String,
total_size: usize,
overflow_behavior: Overflow,
core_assignment: usize,
) -> Result<Self, String> {
let (tx, rx) = channel::<(SystemTime, i64, Vec<f64>)>();
let original_filename = path.file_stem().unwrap().to_str().unwrap().to_owned();
let header_len = header.len();
let mut writer = new_file(&path, &header, total_size)?;
let mut file_size = header_len;
let mut shard_number: u64 = 0;
let _thread = Builder::new()
.name("csv-dispatcher".to_string())
.spawn(move || {
{
let success = core_affinity::set_for_current(core_affinity::CoreId {
id: core_assignment,
});
if !success {
debug!(
"CSV dispatcher: core_affinity::set_for_current returned false \
(expected on macOS and other platforms without hard affinity)"
);
}
}
let mut stringbuf = String::new();
loop {
match rx.recv() {
Err(_) => {
let _ = writer.flush();
let mut file = writer.into_inner().unwrap();
let len = get_file_loc(&mut file);
let _ = file.set_len(len);
return;
}
Ok((time, timestamp, channel_values)) => {
csv_row_fixed_width(
&mut stringbuf,
(time, timestamp, channel_values.as_slice()),
);
let n_to_write = stringbuf.len();
if get_file_loc(&mut writer) as usize + n_to_write > total_size {
match overflow_behavior {
Overflow::Wrap => {
writer.seek(SeekFrom::Start(header_len as u64)).unwrap();
}
Overflow::Error => {
error!(
"CSV file is full with overflow policy set to Error"
);
return;
}
Overflow::NewFile => {
shard_number += 1;
let filename_new =
format!("{original_filename}_{shard_number}.csv");
info!("Reserving new CSV file at {}", &filename_new);
let path_new: PathBuf =
path.parent().unwrap().join(filename_new);
writer = new_file(&path_new, &header, total_size).unwrap();
info!("CSV dispatcher moved to new file at {path_new:?}")
}
}
}
writer.write_all(stringbuf.as_bytes()).unwrap();
file_size = file_size.max(get_file_loc(&mut writer) as usize);
}
}
thread::yield_now();
}
})
.expect("Failed to spawn CSV dispatcher thread");
Ok(Self { tx, _thread })
}
}
/// Reports the current byte offset (stream position) of `f`.
///
/// For a `BufWriter`, querying the position flushes any buffered data first.
///
/// # Panics
/// Panics if the position cannot be queried.
fn get_file_loc<T>(f: &mut T) -> u64
where
    T: Seek,
{
    let position = Seek::stream_position(f);
    position.unwrap()
}
/// Creates (or truncates) the file at `path`, preallocates it to `total_size` bytes, and
/// writes `header` at the start through a new `BufWriter`.
///
/// On return the header is only buffered. It reaches the file when the writer is flushed,
/// seeks, or is dropped. The rest of the preallocated region is zero-filled by
/// `File::set_len` until rows overwrite it.
///
/// # Errors
/// Returns a message naming `path` if creating the file, resizing it, or buffering the
/// header fails.
fn new_file(
    path: &std::path::Path,
    header: &str,
    total_size: usize,
) -> Result<BufWriter<File>, String> {
    let file =
        File::create(path).map_err(|e| format!("Failed to create CSV file {path:?}: {e}"))?;
    file.set_len(total_size as u64).map_err(|e| {
        format!("Failed to preallocate {total_size} bytes for CSV file {path:?}: {e}")
    })?;
    let mut writer = BufWriter::new(file);
    writer
        .write_all(header.as_bytes())
        .map_err(|e| format!("Failed to write header to CSV file {path:?}: {e}"))?;
    Ok(writer)
}