use std::fs::File;
use std::io::{BufWriter, Write};
use std::path::Path;
use chrono::Utc;
use crate::config::WriteOptions;
use crate::dataset::{ColumnData, Dataset};
use crate::error::{Error, Result};
use crate::schema::DatasetSchema;
use crate::xpt::v5::constants::{
LIBRARY_HEADER, MEMBER_HEADER, MEMBER_HEADER_DATA, NAMESTR_HEADER, OBS_HEADER, PAD_CHAR,
RECORD_LEN,
};
use crate::xpt::v5::encoding::encode_ibm_float;
use crate::xpt::v5::namestr::pack_namestr;
use crate::xpt::v5::record::RecordWriter;
use crate::xpt::v5::timestamp::{
format_sas_timestamp, sas_days_since_1960, sas_seconds_since_1960, sas_seconds_since_midnight,
};
/// Writer that serialises a dataset through the `crate::xpt::v5` record layer
/// into any [`Write`] sink.
pub struct XptWriter<W>
where
    W: Write,
{
    /// Record-level writer wrapped around the output sink.
    writer: RecordWriter<W>,
    /// Caller-supplied options; the header routines read its `created` and
    /// `modified` fields.
    options: WriteOptions,
}
/// Serialisation routines shared by every [`Write`]-backed `XptWriter`.
impl<W: Write> XptWriter<W> {
    /// Builds a writer that emits records to `writer`, keeping `options` for the
    /// header timestamps.
    pub(crate) fn new(writer: W, options: WriteOptions) -> Self {
        Self {
            writer: RecordWriter::new(writer),
            options,
        }
    }

    /// Writes the library header and then the single member described by `plan`,
    /// and finally finishes the record writer, returning whatever
    /// `RecordWriter::finish` yields.
    ///
    /// # Errors
    ///
    /// Schema problems surface as the crate's schema error; I/O failures are
    /// wrapped in [`Error::Io`].
    pub(crate) fn write(mut self, dataset: &Dataset, plan: &DatasetSchema) -> Result<W> {
        self.write_library_header()?;
        self.write_member(dataset, plan)?;
        self.writer.finish().map_err(Error::Io)
    }

    /// Copies `text` left-aligned into `field`, cutting it off at the field width
    /// and filling the remainder with `PAD_CHAR`.
    ///
    /// Unlike a bare `copy_from_slice`, this cannot panic when the source and the
    /// destination differ in length.
    fn fill_fixed_field(field: &mut [u8], text: &[u8]) {
        let used = text.len().min(field.len());
        let (head, tail) = field.split_at_mut(used);
        head.copy_from_slice(&text[..used]);
        for byte in tail {
            *byte = PAD_CHAR;
        }
    }

    /// Emits the three library-level records: the fixed `LIBRARY_HEADER`, a
    /// descriptor record carrying the creation timestamp, and a record carrying
    /// the modification timestamp.
    fn write_library_header(&mut self) -> Result<()> {
        self.writer
            .write_record(LIBRARY_HEADER)
            .map_err(Error::Io)?;

        // Timestamps not pinned through the options fall back to the current UTC time.
        let now = Utc::now();
        let created = self.options.created.unwrap_or(now);
        let modified = self.options.modified.unwrap_or(now);
        let created_str = format_sas_timestamp(created);
        let modified_str = format_sas_timestamp(modified);

        // Descriptor layout: three 8-byte identifiers, an 8-byte version, and the
        // creation timestamp in the last 16 bytes. Bytes 32..64 keep the initial
        // `PAD_CHAR` fill.
        let mut rec2 = [PAD_CHAR; RECORD_LEN];
        Self::fill_fixed_field(&mut rec2[..8], b"SAS");
        Self::fill_fixed_field(&mut rec2[8..16], b"SAS");
        Self::fill_fixed_field(&mut rec2[16..24], b"SASLIB");
        Self::fill_fixed_field(&mut rec2[24..32], b"9.4");
        Self::fill_fixed_field(&mut rec2[64..80], created_str.as_bytes());
        self.writer.write_record(&rec2).map_err(Error::Io)?;

        let mut rec3 = [PAD_CHAR; RECORD_LEN];
        Self::fill_fixed_field(&mut rec3[..16], modified_str.as_bytes());
        self.writer.write_record(&rec3).map_err(Error::Io)?;
        Ok(())
    }

    /// Writes one member: its header records, the variable descriptors, and the
    /// observation rows, in that order.
    fn write_member(&mut self, dataset: &Dataset, plan: &DatasetSchema) -> Result<()> {
        self.write_member_header(plan)?;
        self.write_namestr_section(plan)?;
        self.write_observations(dataset, plan)?;
        Ok(())
    }

    /// Emits the member header: the two fixed header records followed by two
    /// descriptor records holding the dataset name, timestamps, and optional label.
    fn write_member_header(&mut self, plan: &DatasetSchema) -> Result<()> {
        // Timestamps not pinned through the options fall back to the current UTC time.
        let now = Utc::now();
        let created = self.options.created.unwrap_or(now);
        let modified = self.options.modified.unwrap_or(now);
        let created_str = format_sas_timestamp(created);
        let modified_str = format_sas_timestamp(modified);

        self.writer.write_record(MEMBER_HEADER).map_err(Error::Io)?;
        self.writer
            .write_record(MEMBER_HEADER_DATA)
            .map_err(Error::Io)?;

        // First descriptor: identifier, dataset name, member type, version, then
        // the creation timestamp in bytes 64..80.
        let mut rec1 = [PAD_CHAR; RECORD_LEN];
        Self::fill_fixed_field(&mut rec1[..8], b"SAS");
        rec1[8..16].copy_from_slice(pad_string(&plan.domain_code, 8).as_slice());
        Self::fill_fixed_field(&mut rec1[16..24], b"SASDATA");
        Self::fill_fixed_field(&mut rec1[24..32], b"9.4");
        Self::fill_fixed_field(&mut rec1[64..80], created_str.as_bytes());
        self.writer.write_record(&rec1).map_err(Error::Io)?;

        // Second descriptor: modification timestamp, then the label (when present)
        // in the 40 bytes starting at offset 32.
        let mut rec2 = [PAD_CHAR; RECORD_LEN];
        Self::fill_fixed_field(&mut rec2[..16], modified_str.as_bytes());
        if let Some(ref label) = plan.dataset_label {
            let label_bytes = pad_string(label, 40);
            rec2[32..72].copy_from_slice(&label_bytes);
        }
        self.writer.write_record(&rec2).map_err(Error::Io)?;
        Ok(())
    }

    /// Emits the NAMESTR header (including the variable count), one packed
    /// descriptor per variable, and finally the observation header record.
    ///
    /// # Errors
    ///
    /// Returns a schema error when the plan has more variables than the 4-digit
    /// count field can represent, or when `pack_namestr` rejects a variable.
    fn write_namestr_section(&mut self, plan: &DatasetSchema) -> Result<()> {
        // The count field is exactly four ASCII digits wide.
        const MAX_NAMESTR_COUNT: usize = 9_999;

        let nvars = plan.variables.len();
        if nvars > MAX_NAMESTR_COUNT {
            return Err(Error::invalid_schema(format!(
                "dataset has {} variables but the NAMESTR header can record at most {}",
                nvars, MAX_NAMESTR_COUNT
            )));
        }

        let mut header = [PAD_CHAR; RECORD_LEN];
        header[..54].copy_from_slice(NAMESTR_HEADER);
        let nvars_str = format!("{:04}", nvars);
        header[54..58].copy_from_slice(nvars_str.as_bytes());
        header[58..78].copy_from_slice(&[b'0'; 20]);
        header[78..80].copy_from_slice(&[b' '; 2]);
        self.writer.write_record(&header).map_err(Error::Io)?;

        for (i, var) in plan.variables.iter().enumerate() {
            let namestr = pack_namestr(var, i)?;
            self.writer.write_bytes(&namestr).map_err(Error::Io)?;
        }
        // NOTE(review): `pad_and_flush` presumably completes the last partial
        // record before the next header starts — confirm against `RecordWriter`.
        self.writer.pad_and_flush().map_err(Error::Io)?;
        self.writer.write_record(OBS_HEADER).map_err(Error::Io)?;
        Ok(())
    }

    /// Writes every row of `dataset`, emitting the variables in plan order.
    ///
    /// Numeric variables go through `encode_ibm_float`; character variables are
    /// padded or truncated to the variable's declared length.
    ///
    /// # Errors
    ///
    /// Returns a schema error when a planned variable has no matching column or
    /// the column's data type does not fit the variable's type. Column lookups
    /// happen once, before the first row is written, and are skipped entirely
    /// for a dataset without rows.
    fn write_observations(&mut self, dataset: &Dataset, plan: &DatasetSchema) -> Result<()> {
        let nrows = dataset.nrows();
        if nrows > 0 {
            // Resolve each variable's column a single time instead of once per cell.
            let columns = plan
                .variables
                .iter()
                .map(|var| {
                    dataset.column(&var.name).map(|col| (var, col)).ok_or_else(|| {
                        Error::invalid_schema(format!(
                            "column '{}' not found in dataset",
                            var.name
                        ))
                    })
                })
                .collect::<Result<Vec<_>>>()?;

            for row_idx in 0..nrows {
                for (var, col) in &columns {
                    if var.xpt_type.is_numeric() {
                        let value = get_numeric_value(col.data(), row_idx)?;
                        let bytes = encode_ibm_float(value);
                        self.writer.write_bytes(&bytes).map_err(Error::Io)?;
                    } else {
                        let value = get_character_value(col.data(), row_idx)?;
                        let bytes = pad_string(&value.unwrap_or_default(), var.length);
                        self.writer.write_bytes(&bytes).map_err(Error::Io)?;
                    }
                }
            }
        }
        self.writer.pad_and_flush().map_err(Error::Io)?;
        Ok(())
    }
}
impl XptWriter<BufWriter<File>> {
pub(crate) fn create(path: impl AsRef<Path>, options: WriteOptions) -> Result<Self> {
let file = File::create(path.as_ref()).map_err(Error::Io)?;
Ok(Self::new(BufWriter::new(file), options))
}
}
fn get_numeric_value(data: &ColumnData, row: usize) -> Result<Option<f64>> {
match data {
ColumnData::F64(v) => Ok(v.get(row).copied().flatten()),
ColumnData::I64(v) => Ok(v.get(row).copied().flatten().map(|i| i as f64)),
ColumnData::Bool(v) => Ok(v
.get(row)
.copied()
.flatten()
.map(|b| if b { 1.0 } else { 0.0 })),
ColumnData::Date(v) => Ok(v
.get(row)
.copied()
.flatten()
.map(|d| sas_days_since_1960(d) as f64)),
ColumnData::DateTime(v) => Ok(v
.get(row)
.copied()
.flatten()
.map(|dt| sas_seconds_since_1960(dt) as f64)),
ColumnData::Time(v) => Ok(v
.get(row)
.copied()
.flatten()
.map(|t| sas_seconds_since_midnight(t) as f64)),
_ => Err(Error::invalid_schema("expected numeric column data type")),
}
}
fn get_character_value(data: &ColumnData, row: usize) -> Result<Option<String>> {
match data {
ColumnData::String(v) => Ok(v.get(row).cloned().flatten()),
ColumnData::Bytes(v) => Ok(v
.get(row)
.cloned()
.flatten()
.map(|b| String::from_utf8_lossy(&b).into_owned())),
ColumnData::Date(v) => Ok(v
.get(row)
.copied()
.flatten()
.map(|d| d.format("%Y-%m-%d").to_string())),
ColumnData::DateTime(v) => Ok(v
.get(row)
.copied()
.flatten()
.map(|dt| dt.format("%Y-%m-%dT%H:%M:%S").to_string())),
ColumnData::Time(v) => Ok(v
.get(row)
.copied()
.flatten()
.map(|t| t.format("%H:%M:%S").to_string())),
_ => Err(Error::invalid_schema("expected character column data type")),
}
}
/// Encodes `s` as a field of exactly `len` bytes.
///
/// Input shorter than `len` is right-padded with `PAD_CHAR`. Longer input is
/// truncated, and the cut is moved back to the nearest UTF-8 character boundary
/// so the field never ends in a partial multi-byte sequence; the bytes freed by
/// backing up are padded as well.
fn pad_string(s: &str, len: usize) -> Vec<u8> {
    // Largest prefix that fits in `len` bytes and ends on a character boundary.
    // Offset 0 is always a boundary, so the loop terminates.
    let mut end = s.len().min(len);
    while !s.is_char_boundary(end) {
        end -= 1;
    }

    let mut bytes = Vec::with_capacity(len);
    bytes.extend_from_slice(&s.as_bytes()[..end]);
    bytes.resize(len, PAD_CHAR);
    bytes
}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::dataset::Column;
    use crate::schema::plan::VariableSpec;
    use std::io::Cursor;

    /// Runs a complete write of `dataset` under `plan` into an in-memory cursor
    /// using default options.
    fn write_to_memory(dataset: &Dataset, plan: &DatasetSchema) -> Result<Cursor<Vec<u8>>> {
        let sink = Cursor::new(Vec::new());
        XptWriter::new(sink, WriteOptions::default()).write(dataset, plan)
    }

    /// A dataset with no columns and a plan with no variables must still write.
    #[test]
    fn test_write_empty_dataset() {
        let dataset = Dataset::new("AE", vec![]).unwrap();
        let mut plan = DatasetSchema::new("AE");
        plan.recalculate_positions();

        assert!(write_to_memory(&dataset, &plan).is_ok());
    }

    /// A single numeric column with two rows must write successfully.
    #[test]
    fn test_write_simple_dataset() {
        let aeseq = Column::new("AESEQ", ColumnData::F64(vec![Some(1.0), Some(2.0)]));
        let dataset = Dataset::new("AE", vec![aeseq]).unwrap();

        let mut plan = DatasetSchema::new("AE");
        plan.variables = vec![VariableSpec::numeric("AESEQ")];
        plan.recalculate_positions();

        assert!(write_to_memory(&dataset, &plan).is_ok());
    }
}