use std::collections::HashMap;
use std::error::Error;
use std::fs::File;
use std::io::BufReader;
use polars::prelude::*;
use chrono::{TimeZone, NaiveDateTime, Utc};
use crate::utils::date_utils::to_date;
/// Summary statistics for the spacing, in days, between consecutive
/// timestamps of a [`KLINE`] series (see [`KLINE::interval_days`]).
#[derive(Debug, Copy, Clone)]
pub struct IntervalDays {
/// Mean gap between consecutive timestamps, in days.
pub average: f64,
/// Most frequent gap, in days, quantized to 1e-4 (truncated, not rounded).
pub mode: f64,
}
/// A candlestick (OHLCV) time series for a single ticker, stored as
/// parallel column vectors indexed by `timestamp`.
///
/// `timestamp` and `close` are mandatory; the remaining series are
/// optional and, when absent, are substituted at conversion time
/// (`to_dataframe` fills prices from `close` and `volume` with zeros).
#[derive(Debug, Clone)]
pub struct KLINE {
/// Symbol this series belongs to.
pub ticker: String,
/// Unix timestamps in seconds, one entry per bar.
pub timestamp: Vec<i64>,
/// Opening prices, parallel to `timestamp`, if available.
pub open: Option<Vec<f64>>,
/// High prices, parallel to `timestamp`, if available.
pub high: Option<Vec<f64>>,
/// Low prices, parallel to `timestamp`, if available.
pub low: Option<Vec<f64>>,
/// Closing prices, parallel to `timestamp`; always present.
pub close: Vec<f64>,
/// Traded volume, parallel to `timestamp`, if available.
pub volume: Option<Vec<f64>>,
/// Adjusted closing prices, parallel to `timestamp`, if available.
pub adjclose: Option<Vec<f64>>,
}
impl KLINE {
/// Computes the average and modal spacing, in days, between consecutive
/// entries of `self.timestamp` (Unix seconds).
///
/// Returns `IntervalDays { average: 0.0, mode: 0.0 }` when the series has
/// fewer than two timestamps — the naive division would otherwise yield
/// NaN (0.0 / 0.0) for an empty or single-element series.
pub fn interval_days(&self) -> IntervalDays {
    // Gaps between consecutive timestamps, converted from seconds to days.
    let intervals: Vec<f64> = self
        .timestamp
        .windows(2)
        .map(|w| (w[1] - w[0]) as f64 / 86400.0)
        .collect();
    if intervals.is_empty() {
        // Fewer than two samples: no interval is defined.
        return IntervalDays { average: 0.0, mode: 0.0 };
    }
    let average = intervals.iter().sum::<f64>() / intervals.len() as f64;
    // Mode: quantize each gap to 1e-4 days so nearly-equal floats share a
    // bucket; on equal counts the smaller interval wins the tie.
    let mut interval_counts: HashMap<i64, usize> = HashMap::new();
    let mut max_count = 0;
    let mut mode = 0.0;
    for &interval in &intervals {
        let key = (interval * 10000.0) as i64;
        let count = interval_counts.entry(key).or_insert(0);
        *count += 1;
        if *count > max_count || (*count == max_count && key < (mode * 10000.0) as i64) {
            max_count = *count;
            mode = key as f64 / 10000.0;
        }
    }
    IntervalDays { average, mode }
}
/// Formats the first timestamp of the series as a date string via `to_date`.
///
/// Panics if `self.timestamp` is empty.
pub fn start_date(&self) -> String {
    to_date(self.timestamp[0])
}
/// Formats the last timestamp of the series as a date string via `to_date`.
///
/// Panics if `self.timestamp` is empty.
pub fn end_date(&self) -> String {
    let last = self.timestamp.len() - 1;
    to_date(self.timestamp[last])
}
/// Materializes this kline as a polars `DataFrame` with columns
/// `timestamp` (naive UTC datetime), `open`, `high`, `low`, `close`,
/// `volume`, and `adjclose`.
///
/// Absent optional series are substituted: `open`/`high`/`low`/`adjclose`
/// fall back to a copy of `close`, while `volume` falls back to zeros.
///
/// # Errors
/// Returns an error when any present series differs in length from
/// `timestamp`, or when a timestamp cannot be converted to a datetime.
pub fn to_dataframe(&self) -> Result<DataFrame, Box<dyn Error>> {
    let n = self.timestamp.len();
    if self.close.len() != n {
        return Err("close length does not match timestamp".into());
    }
    // Every provided optional series must line up with `timestamp`.
    let ensure_len = |name: &str, series: &Option<Vec<f64>>| -> Result<(), Box<dyn Error>> {
        match series {
            Some(values) if values.len() != n => {
                Err(format!("{name} length does not match timestamp").into())
            }
            _ => Ok(()),
        }
    };
    ensure_len("open", &self.open)?;
    ensure_len("high", &self.high)?;
    ensure_len("low", &self.low)?;
    ensure_len("volume", &self.volume)?;
    ensure_len("adjclose", &self.adjclose)?;
    // Absent price series default to the close series.
    let or_close =
        |series: &Option<Vec<f64>>| series.clone().unwrap_or_else(|| self.close.clone());
    let open_data = or_close(&self.open);
    let high_data = or_close(&self.high);
    let low_data = or_close(&self.low);
    let adj_close_data = or_close(&self.adjclose);
    // Absent volume defaults to all zeros.
    let volume_data = self.volume.clone().unwrap_or_else(|| vec![0.0; n]);
    // Convert Unix seconds to naive UTC datetimes, failing on any value
    // chrono cannot map to a single instant.
    let datetimes = self
        .timestamp
        .iter()
        .map(|&ts| {
            Utc.timestamp_opt(ts, 0)
                .single()
                .map(|dt| dt.naive_utc())
                .ok_or_else(|| {
                    PolarsError::ComputeError(format!("Invalid timestamp: {ts}").into())
                })
        })
        .collect::<Result<Vec<NaiveDateTime>, _>>()?;
    Ok(DataFrame::new(vec![
        Column::new("timestamp".into(), datetimes),
        Column::new("open".into(), open_data),
        Column::new("high".into(), high_data),
        Column::new("low".into(), low_data),
        Column::new("close".into(), self.close.clone()),
        Column::new("volume".into(), volume_data),
        Column::new("adjclose".into(), adj_close_data),
    ])?)
}
pub fn from_dataframe(ticker: &str, df: &DataFrame) -> Result<KLINE, Box<dyn Error>> {
let timestamp = df.column("timestamp")?.i64()?.to_vec().iter()
.map(|&x| x.unwrap_or_default()).collect::<Vec<i64>>();
let open = match df.column("open") {
Ok(col) => Some(col.f64()?.to_vec().iter().map(|&x| x.unwrap_or_default()).collect::<Vec<f64>>()),
Err(_) => None,
};
let high = match df.column("high") {
Ok(col) => Some(col.f64()?.to_vec().iter().map(|&x| x.unwrap_or_default()).collect::<Vec<f64>>()),
Err(_) => None,
};
let low = match df.column("low") {
Ok(col) => Some(col.f64()?.to_vec().iter().map(|&x| x.unwrap_or_default()).collect::<Vec<f64>>()),
Err(_) => None,
};
let close = df.column("close")?.f64()?.to_vec().iter()
.map(|&x| x.unwrap_or_default()).collect::<Vec<f64>>();
let volume = match df.column("volume") {
Ok(col) => Some(col.f64()?.to_vec().iter().map(|&x| x.unwrap_or_default()).collect::<Vec<f64>>()),
Err(_) => None,
};
let adjclose = match df.column("adjclose") {
Ok(col) => Some(col.f64()?.to_vec().iter().map(|&x| x.unwrap_or_default()).collect::<Vec<f64>>()),
Err(_) => None,
};
Ok(KLINE {
ticker: ticker.to_string(),
timestamp,
open,
high,
low,
close,
volume,
adjclose,
})
}
pub fn from_csv(ticker: &str, path: &str) -> Result<Self, Box<dyn Error>> {
let df = CsvReadOptions::default()
.with_has_header(true)
.try_into_reader_with_file_path(Some(path.into()))?
.finish()?;
KLINE::from_dataframe(ticker, &df)
}
/// Loads a kline from a JSON file at `path`, labeled with `ticker`.
///
/// Expected shape: an object of parallel arrays — required `timestamp`
/// (integers, Unix seconds) and `close` (numbers); optional `open`,
/// `high`, `low`, `volume`, `adjclose` (numbers). Array lengths are not
/// validated here; `to_dataframe` checks them at conversion time.
///
/// # Errors
/// Fails on I/O or JSON parse errors, on a missing required field, or on
/// any array element of the wrong type.
pub fn from_json(ticker: &str, path: &str) -> Result<Self, Box<dyn Error>> {
    let file = File::open(path)?;
    let json_data: serde_json::Value = serde_json::from_reader(BufReader::new(file))?;
    // Extracts an optional field as Vec<f64>: `None` when the field is not
    // an array, an error when any element is not a number.
    let optional_f64 = |field: &str| -> Result<Option<Vec<f64>>, Box<dyn Error>> {
        json_data[field]
            .as_array()
            .map(|arr| {
                arr.iter()
                    .map(|v| {
                        v.as_f64()
                            .ok_or_else(|| format!("Invalid '{field}' value").into())
                    })
                    .collect::<Result<Vec<f64>, Box<dyn Error>>>()
            })
            .transpose()
    };
    let timestamp = json_data["timestamp"]
        .as_array()
        .ok_or("Missing 'timestamp' field")?
        .iter()
        .map(|v| v.as_i64().ok_or("Invalid 'timestamp' value"))
        .collect::<Result<Vec<_>, _>>()?;
    let close = optional_f64("close")?.ok_or("Missing 'close' field")?;
    Ok(KLINE {
        ticker: ticker.to_string(),
        timestamp,
        open: optional_f64("open")?,
        high: optional_f64("high")?,
        low: optional_f64("low")?,
        close,
        volume: optional_f64("volume")?,
        adjclose: optional_f64("adjclose")?,
    })
}
}