use crate::error::{NeuralError, Result};
#[cfg(feature = "candle")]
use candle_core::{Device, Tensor};
use polars::prelude::*;
use rand::seq::SliceRandom;
use rayon::prelude::*;
use std::path::Path;
/// Sliding-window view over a time-series [`DataFrame`].
///
/// Every sample is a pair of windows: an input window sliced from [`data`](Self::data)
/// and a target window sliced from [`targets`](Self::targets). The frames are shared
/// between derived datasets (see `train_val_split`); only the window order differs.
#[derive(Debug, Clone)]
pub struct TimeSeriesDataset {
    /// Frame the input windows are sliced from; every column is flattened into the input.
    pub data: DataFrame,
    /// Frame holding only the target column; target windows are sliced from it.
    pub targets: DataFrame,
    /// Number of consecutive rows in each input window.
    pub sequence_length: usize,
    /// Number of rows immediately after the input window that form the target window.
    pub horizon: usize,
    /// Row offsets at which sample windows start; `shuffle` and `train_val_split`
    /// reorder / partition this list while reusing the same frames.
    indices: Vec<usize>,
}
impl TimeSeriesDataset {
pub fn new(
df: DataFrame,
target_col: &str,
sequence_length: usize,
horizon: usize,
) -> Result<Self> {
if df.height() < sequence_length + horizon {
return Err(NeuralError::data(format!(
"DataFrame too small: {} rows, need at least {}",
df.height(),
sequence_length + horizon
)));
}
let _targets = df
.select([target_col])
.map_err(|e| NeuralError::data(format!("Failed to select target column: {}", e)))?;
let max_idx = df.height() - sequence_length - horizon;
let indices: Vec<usize> = (0..max_idx).collect();
Ok(Self {
data: df,
targets,
sequence_length,
horizon,
indices,
})
}
pub fn from_csv(path: impl AsRef<Path>, target_col: &str, sequence_length: usize, horizon: usize) -> Result<Self> {
let df = CsvReader::from_path(path)
.map_err(|e| NeuralError::data(format!("Failed to read CSV: {}", e)))?
.finish()
.map_err(|e| NeuralError::data(format!("Failed to parse CSV: {}", e)))?;
Self::new(df, target_col, sequence_length, horizon)
}
pub fn from_parquet(path: impl AsRef<Path>, target_col: &str, sequence_length: usize, horizon: usize) -> Result<Self> {
let df = ParquetReader::new(std::fs::File::open(path)?)
.finish()
.map_err(|e| NeuralError::data(format!("Failed to read Parquet: {}", e)))?;
Self::new(df, target_col, sequence_length, horizon)
}
pub fn len(&self) -> usize {
self.indices.len()
}
pub fn is_empty(&self) -> bool {
self.indices.is_empty()
}
pub fn get(&self, idx: usize) -> Result<(Vec<f64>, Vec<f64>)> {
if idx >= self.indices.len() {
return Err(NeuralError::data(format!("Index {} out of bounds", idx)));
}
let start_idx = self.indices[idx];
let end_idx = start_idx + self.sequence_length;
let target_end = end_idx + self.horizon;
let input_slice = self.data.slice((start_idx as i64), self.sequence_length);
let input = self.dataframe_to_vec(&input_slice)?;
let target_slice = self.targets.slice((end_idx as i64), self.horizon);
let target = self.dataframe_to_vec(&target_slice)?;
Ok((input, target))
}
fn dataframe_to_vec(&self, df: &DataFrame) -> Result<Vec<f64>> {
let mut result = Vec::new();
for col in df.get_columns() {
match col.dtype() {
DataType::Float64 => {
let series = col.f64().map_err(|e| NeuralError::data(format!("Failed to cast column: {}", e)))?;
for val in series.into_iter() {
result.push(val.unwrap_or(0.0));
}
}
DataType::Float32 => {
let series = col.f32().map_err(|e| NeuralError::data(format!("Failed to cast column: {}", e)))?;
for val in series.into_iter() {
result.push(val.unwrap_or(0.0) as f64);
}
}
DataType::Int64 => {
let series = col.i64().map_err(|e| NeuralError::data(format!("Failed to cast column: {}", e)))?;
for val in series.into_iter() {
result.push(val.unwrap_or(0) as f64);
}
}
_ => {
return Err(NeuralError::data(format!("Unsupported column type: {:?}", col.dtype())));
}
}
}
Ok(result)
}
pub fn shuffle(&mut self) {
let mut rng = rand::thread_rng();
self.indices.shuffle(&mut rng);
}
pub fn train_val_split(&self, val_split: f64) -> Result<(Self, Self)> {
if !(0.0..1.0).contains(&val_split) {
return Err(NeuralError::data("val_split must be between 0 and 1"));
}
let val_size = (self.len() as f64 * val_split) as usize;
let train_size = self.len() - val_size;
let train_indices = self.indices[..train_size].to_vec();
let val_indices = self.indices[train_size..].to_vec();
let train_dataset = Self {
data: self.data.clone(),
targets: self.targets.clone(),
sequence_length: self.sequence_length,
horizon: self.horizon,
indices: train_indices,
};
let val_dataset = Self {
data: self.data.clone(),
targets: self.targets.clone(),
sequence_length: self.sequence_length,
horizon: self.horizon,
indices: val_indices,
};
Ok((train_dataset, val_dataset))
}
}
/// Mini-batch loader over a [`TimeSeriesDataset`].
///
/// Build one with `DataLoader::new` and tune it with the `with_*` builder methods.
/// It walks the dataset's sample order one batch at a time.
pub struct DataLoader {
    /// Source of the samples.
    dataset: TimeSeriesDataset,
    /// Position, in the dataset's sample order, of the next sample to emit.
    current_idx: usize,
    /// Upper bound on the number of samples per batch.
    batch_size: usize,
    /// Whether the sample order is shuffled between epochs (see `reset`).
    shuffle: bool,
    /// Whether a trailing batch with fewer than `batch_size` samples is skipped.
    drop_last: bool,
    /// Requested worker count; `new` initialises it with `num_cpus::get()`.
    num_workers: usize,
}
impl DataLoader {
pub fn new(dataset: TimeSeriesDataset, batch_size: usize) -> Self {
Self {
dataset,
batch_size,
shuffle: false,
drop_last: false,
num_workers: num_cpus::get(),
current_idx: 0,
}
}
pub fn with_shuffle(mut self, shuffle: bool) -> Self {
self.shuffle = shuffle;
self
}
pub fn with_drop_last(mut self, drop_last: bool) -> Self {
self.drop_last = drop_last;
self
}
pub fn with_num_workers(mut self, num_workers: usize) -> Self {
self.num_workers = num_workers;
self
}
pub fn num_batches(&self) -> usize {
let total = self.dataset.len();
if self.drop_last {
total / self.batch_size
} else {
(total + self.batch_size - 1) / self.batch_size
}
}
pub fn reset(&mut self) {
self.current_idx = 0;
if self.shuffle {
self.dataset.shuffle();
}
}
pub fn next_batch(&mut self, device: &Device) -> Result<Option<(Tensor, Tensor)>> {
if self.current_idx >= self.dataset.len() {
return Ok(None);
}
let end_idx = (self.current_idx + self.batch_size).min(self.dataset.len());
let batch_size = end_idx - self.current_idx;
if self.drop_last && batch_size < self.batch_size {
return Ok(None);
}
let samples: Vec<_> = (self.current_idx..end_idx)
.into_par_iter()
.map(|idx| self.dataset.get(idx))
.collect::<Result<Vec<_>>>()?;
let (inputs, targets): (Vec<_>, Vec<_>) = samples.into_iter().unzip();
let input_tensor = self.vec_to_tensor(&inputs, device)?;
let target_tensor = self.vec_to_tensor(&targets, device)?;
self.current_idx = end_idx;
Ok(Some((input_tensor, target_tensor)))
}
fn vec_to_tensor(&self, data: &[Vec<f64>], device: &Device) -> Result<Tensor> {
let batch_size = data.len();
let seq_len = data[0].len();
let flat: Vec<f64> = data.iter().flatten().copied().collect();
Tensor::from_vec(flat, (batch_size, seq_len), device)
.map_err(|e| NeuralError::data(format!("Failed to create tensor: {}", e)))
}
pub fn iter_batches<'a>(
&'a mut self,
device: &'a Device,
) -> impl Iterator<Item = Result<(Tensor, Tensor)>> + 'a {
std::iter::from_fn(move || {
match self.next_batch(device) {
Ok(Some(batch)) => Some(Ok(batch)),
Ok(None) => None,
Err(e) => Some(Err(e)),
}
})
}
}
#[cfg(test)]
mod tests {
    use super::*;

    /// 1000 rows: a string `date` column plus two numeric columns.
    fn create_test_dataframe() -> DataFrame {
        let values: Vec<f64> = (0..1000).map(|x| x as f64).collect();
        let dates: Vec<String> = (0..1000)
            .map(|i| format!("2024-01-{}", i % 30 + 1))
            .collect();
        df!(
            "date" => dates,
            "value" => values.clone(),
            "value2" => values.iter().map(|x| x * 2.0).collect::<Vec<_>>()
        )
        .unwrap()
    }

    #[test]
    fn test_dataset_creation() {
        let df = create_test_dataframe();
        let dataset = TimeSeriesDataset::new(df, "value", 100, 24).unwrap();
        assert_eq!(dataset.sequence_length, 100);
        assert_eq!(dataset.horizon, 24);
        assert!(dataset.len() > 0);
    }

    #[test]
    fn test_dataset_too_small() {
        let df = df!("value" => vec![1.0f64, 2.0, 3.0]).unwrap();
        assert!(TimeSeriesDataset::new(df, "value", 100, 24).is_err());
    }

    #[test]
    fn test_dataset_get() {
        let df = create_test_dataframe();
        let dataset = TimeSeriesDataset::new(df, "value", 100, 24).unwrap();
        let (input, target) = dataset.get(0).unwrap();
        // Two numeric feature columns (`value`, `value2`) flattened over 100 steps.
        assert_eq!(input.len(), 100 * 2);
        assert_eq!(target.len(), 24);
    }

    #[test]
    fn test_dataset_get_out_of_bounds() {
        let df = create_test_dataframe();
        let dataset = TimeSeriesDataset::new(df, "value", 100, 24).unwrap();
        assert!(dataset.get(dataset.len()).is_err());
    }

    #[test]
    fn test_train_val_split() {
        let df = create_test_dataframe();
        let dataset = TimeSeriesDataset::new(df, "value", 100, 24).unwrap();
        let (train, val) = dataset.train_val_split(0.2).unwrap();
        let total_len = dataset.len();
        assert!(train.len() > val.len());
        assert_eq!(train.len() + val.len(), total_len);
    }

    #[test]
    fn test_train_val_split_rejects_invalid_ratio() {
        let df = create_test_dataframe();
        let dataset = TimeSeriesDataset::new(df, "value", 100, 24).unwrap();
        assert!(dataset.train_val_split(1.0).is_err());
        assert!(dataset.train_val_split(-0.1).is_err());
    }

    #[test]
    fn test_dataloader() {
        let df = create_test_dataframe();
        let dataset = TimeSeriesDataset::new(df, "value", 100, 24).unwrap();
        let loader = DataLoader::new(dataset, 32)
            .with_shuffle(true)
            .with_drop_last(false);
        assert!(loader.num_batches() > 0);
    }

    // `Device`/`Tensor` are only imported with the `candle` feature enabled.
    #[cfg(feature = "candle")]
    #[test]
    fn test_dataloader_iteration() {
        let df = create_test_dataframe();
        let dataset = TimeSeriesDataset::new(df, "value", 100, 24).unwrap();
        let device = Device::Cpu;
        let mut loader = DataLoader::new(dataset, 32);
        let mut batch_count = 0;
        while let Some((inputs, targets)) = loader.next_batch(&device).unwrap() {
            batch_count += 1;
            assert!(inputs.dims()[0] <= 32);
            assert_eq!(inputs.dims()[0], targets.dims()[0]);
        }
        assert!(batch_count > 0);
        assert_eq!(batch_count, loader.num_batches());
    }
}