use crate::calendar::civil_from_timestamp;
use crate::error::{Error, Result};
use crate::ohlcv::Candle;
use crate::traits::Indicator;
#[derive(Debug, Clone, PartialEq)]
pub struct VolumeByTimeProfileOutput {
pub bins: Vec<f64>,
}
#[derive(Debug, Clone)]
pub struct VolumeByTimeProfile {
buckets: usize,
utc_offset_minutes: i32,
sum: Vec<f64>,
count: Vec<u64>,
last: Option<VolumeByTimeProfileOutput>,
}
impl VolumeByTimeProfile {
pub fn new(buckets: usize, utc_offset_minutes: i32) -> Result<Self> {
if buckets == 0 {
return Err(Error::PeriodZero);
}
Ok(Self {
buckets,
utc_offset_minutes,
sum: vec![0.0; buckets],
count: vec![0; buckets],
last: None,
})
}
pub const fn params(&self) -> (usize, i32) {
(self.buckets, self.utc_offset_minutes)
}
pub fn value(&self) -> Option<&VolumeByTimeProfileOutput> {
self.last.as_ref()
}
fn bucket_of(&self, minute_of_day: u32) -> usize {
let raw = (minute_of_day as usize * self.buckets) / 1440;
raw.min(self.buckets - 1)
}
fn snapshot(&self) -> VolumeByTimeProfileOutput {
let bins = self
.sum
.iter()
.zip(&self.count)
.map(|(total, n)| if *n > 0 { total / *n as f64 } else { 0.0 })
.collect();
VolumeByTimeProfileOutput { bins }
}
}
impl Indicator for VolumeByTimeProfile {
type Input = Candle;
type Output = VolumeByTimeProfileOutput;
fn update(&mut self, candle: Candle) -> Option<VolumeByTimeProfileOutput> {
let civil = civil_from_timestamp(candle.timestamp, self.utc_offset_minutes);
let bucket = self.bucket_of(civil.minute_of_day());
self.sum[bucket] += candle.volume;
self.count[bucket] += 1;
let out = self.snapshot();
self.last = Some(out.clone());
Some(out)
}
fn reset(&mut self) {
self.sum.iter_mut().for_each(|x| *x = 0.0);
self.count.iter_mut().for_each(|x| *x = 0);
self.last = None;
}
fn warmup_period(&self) -> usize {
1
}
fn is_ready(&self) -> bool {
self.last.is_some()
}
fn name(&self) -> &'static str {
"VolumeByTimeProfile"
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::traits::BatchExt;
use approx::assert_relative_eq;
const HOUR: i64 = 3_600_000;
fn c(volume: f64, ts: i64) -> Candle {
Candle::new(100.0, 100.0, 100.0, 100.0, volume, ts).unwrap()
}
#[test]
fn rejects_zero_buckets() {
assert!(matches!(
VolumeByTimeProfile::new(0, 0),
Err(Error::PeriodZero)
));
}
#[test]
fn metadata_and_accessors() {
let prof = VolumeByTimeProfile::new(24, -60).unwrap();
assert_eq!(prof.params(), (24, -60));
assert_eq!(prof.name(), "VolumeByTimeProfile");
assert_eq!(prof.warmup_period(), 1);
assert!(!prof.is_ready());
assert!(prof.value().is_none());
}
#[test]
fn emits_from_first_bar_and_means_volume() {
let mut prof = VolumeByTimeProfile::new(24, 0).unwrap();
let out = prof.update(c(500.0, HOUR)).unwrap(); assert_eq!(out.bins.len(), 24);
assert_relative_eq!(out.bins[1], 500.0);
assert_relative_eq!(out.bins[0], 0.0);
assert!(prof.is_ready());
let out = prof.update(c(700.0, 25 * HOUR)).unwrap();
assert_relative_eq!(out.bins[1], 600.0);
}
#[test]
fn last_bucket_clamped() {
let mut prof = VolumeByTimeProfile::new(24, 0).unwrap();
let out = prof.update(c(300.0, 23 * HOUR + 59 * 60_000)).unwrap();
assert_relative_eq!(out.bins[23], 300.0);
}
#[test]
fn reset_clears_state() {
let mut prof = VolumeByTimeProfile::new(24, 0).unwrap();
prof.update(c(500.0, HOUR));
prof.reset();
assert!(!prof.is_ready());
assert!(prof.value().is_none());
let out = prof.update(c(100.0, 2 * HOUR)).unwrap();
assert_relative_eq!(out.bins[2], 100.0);
}
#[test]
fn batch_equals_streaming() {
let candles: Vec<Candle> = (0..50)
.map(|i| c(100.0 + f64::from(i % 8), i64::from(i) * HOUR))
.collect();
let mut a = VolumeByTimeProfile::new(12, 0).unwrap();
let mut b = VolumeByTimeProfile::new(12, 0).unwrap();
assert_eq!(
a.batch(&candles),
candles.iter().map(|x| b.update(*x)).collect::<Vec<_>>()
);
}
}