1#[cfg(all(feature = "python", feature = "cuda"))]
2use crate::cuda::cuda_available;
3#[cfg(all(feature = "python", feature = "cuda"))]
4use crate::cuda::moving_averages::vwap_wrapper::VwapDeviceArrayF32;
5#[cfg(all(feature = "python", feature = "cuda"))]
6use crate::cuda::moving_averages::CudaVwap;
7#[cfg(all(feature = "python", feature = "cuda"))]
8use crate::utilities::dlpack_cuda::export_f32_cuda_dlpack_2d;
9#[cfg(feature = "python")]
10use numpy::{IntoPyArray, PyArray1};
11#[cfg(feature = "python")]
12use pyo3::exceptions::PyValueError;
13#[cfg(feature = "python")]
14use pyo3::prelude::*;
15#[cfg(feature = "python")]
16use pyo3::types::{PyDict, PyList};
17#[cfg(all(target_arch = "wasm32", feature = "wasm"))]
18use serde::{Deserialize, Serialize};
19#[cfg(all(target_arch = "wasm32", feature = "wasm"))]
20use wasm_bindgen::prelude::*;
21
22use crate::utilities::data_loader::{source_type, Candles};
23use crate::utilities::enums::Kernel;
24use crate::utilities::helpers::{
25 alloc_with_nan_prefix, detect_best_batch_kernel, detect_best_kernel, init_matrix_prefixes,
26 make_uninit_matrix,
27};
28use chrono::{Datelike, NaiveDate, NaiveDateTime};
29#[cfg(all(feature = "nightly-avx", target_arch = "x86_64"))]
30use core::arch::x86_64::*;
31#[cfg(not(target_arch = "wasm32"))]
32use rayon::prelude::*;
33use std::convert::AsRef;
34use std::error::Error;
35use std::mem::MaybeUninit;
36use thiserror::Error;
37
38#[derive(Debug, Clone)]
39pub enum VwapData<'a> {
40 Candles {
41 candles: &'a Candles,
42 source: &'a str,
43 },
44 CandlesPlusPrices {
45 candles: &'a Candles,
46 prices: &'a [f64],
47 },
48 Slice {
49 timestamps: &'a [i64],
50 volumes: &'a [f64],
51 prices: &'a [f64],
52 },
53}
54
55#[derive(Debug, Clone)]
56#[cfg_attr(
57 all(target_arch = "wasm32", feature = "wasm"),
58 derive(Serialize, Deserialize)
59)]
60pub struct VwapParams {
61 pub anchor: Option<String>,
62}
63
64impl Default for VwapParams {
65 fn default() -> Self {
66 Self {
67 anchor: Some("1d".to_string()),
68 }
69 }
70}
71
72#[derive(Debug, Clone)]
73pub struct VwapInput<'a> {
74 pub data: VwapData<'a>,
75 pub params: VwapParams,
76}
77
78impl<'a> VwapInput<'a> {
79 #[inline]
80 pub fn from_candles(candles: &'a Candles, source: &'a str, params: VwapParams) -> Self {
81 Self {
82 data: VwapData::Candles { candles, source },
83 params,
84 }
85 }
86
87 #[inline]
88 pub fn from_candles_plus_prices(
89 candles: &'a Candles,
90 prices: &'a [f64],
91 params: VwapParams,
92 ) -> Self {
93 Self {
94 data: VwapData::CandlesPlusPrices { candles, prices },
95 params,
96 }
97 }
98
99 #[inline]
100 pub fn from_slice(
101 timestamps: &'a [i64],
102 volumes: &'a [f64],
103 prices: &'a [f64],
104 params: VwapParams,
105 ) -> Self {
106 Self {
107 data: VwapData::Slice {
108 timestamps,
109 volumes,
110 prices,
111 },
112 params,
113 }
114 }
115
116 #[inline]
117 pub fn with_default_candles(candles: &'a Candles) -> Self {
118 Self {
119 data: VwapData::Candles {
120 candles,
121 source: "hlc3",
122 },
123 params: VwapParams::default(),
124 }
125 }
126
127 #[inline]
128 pub fn get_anchor(&self) -> &str {
129 self.params.anchor.as_deref().unwrap_or("1d")
130 }
131}
132
133#[derive(Debug, Clone)]
134pub struct VwapOutput {
135 pub values: Vec<f64>,
136}
137
138#[derive(Clone, Debug)]
139pub struct VwapBuilder {
140 anchor: Option<String>,
141 kernel: Kernel,
142}
143
144impl Default for VwapBuilder {
145 fn default() -> Self {
146 Self {
147 anchor: None,
148 kernel: Kernel::Auto,
149 }
150 }
151}
152
153impl VwapBuilder {
154 #[inline(always)]
155 pub fn new() -> Self {
156 Self::default()
157 }
158 #[inline(always)]
159 pub fn anchor(mut self, s: impl Into<String>) -> Self {
160 self.anchor = Some(s.into());
161 self
162 }
163 #[inline(always)]
164 pub fn kernel(mut self, k: Kernel) -> Self {
165 self.kernel = k;
166 self
167 }
168
169 #[inline(always)]
170 pub fn apply(self, candles: &Candles) -> Result<VwapOutput, VwapError> {
171 let params = VwapParams {
172 anchor: self.anchor,
173 };
174 let input = VwapInput::from_candles(candles, "hlc3", params);
175 vwap_with_kernel(&input, self.kernel)
176 }
177
178 #[inline(always)]
179 pub fn apply_slice(
180 self,
181 timestamps: &[i64],
182 volumes: &[f64],
183 prices: &[f64],
184 ) -> Result<VwapOutput, VwapError> {
185 let params = VwapParams {
186 anchor: self.anchor,
187 };
188 let input = VwapInput::from_slice(timestamps, volumes, prices, params);
189 vwap_with_kernel(&input, self.kernel)
190 }
191
192 #[inline(always)]
193 pub fn into_stream(self) -> Result<VwapStream, VwapError> {
194 let params = VwapParams {
195 anchor: self.anchor,
196 };
197 VwapStream::try_new(params)
198 }
199
200 #[inline(always)]
201 pub fn apply_candles_with_source(
202 self,
203 candles: &Candles,
204 source: &str,
205 ) -> Result<VwapOutput, VwapError> {
206 let params = VwapParams {
207 anchor: self.anchor,
208 };
209 let input = VwapInput::from_candles(candles, source, params);
210 vwap_with_kernel(&input, self.kernel)
211 }
212
213 #[inline(always)]
214 pub fn with_default_candles(candles: &Candles) -> Result<VwapOutput, VwapError> {
215 VwapBuilder::new().apply_candles_with_source(candles, "hlc3")
216 }
217}
218
219#[derive(Debug, Error)]
220pub enum VwapError {
221 #[error("vwap: Input data slice is empty.")]
222 EmptyInputData,
223 #[error("vwap: Mismatch in length of timestamps ({timestamps}), prices ({prices}), or volumes ({volumes}).")]
224 MismatchTimestampsPricesVolumes {
225 timestamps: usize,
226 prices: usize,
227 volumes: usize,
228 },
229 #[error("vwap: No data for VWAP calculation.")]
230 NoData,
231 #[error("vwap: Mismatch in length of prices ({prices}) and volumes ({volumes}).")]
232 MismatchPricesVolumes { prices: usize, volumes: usize },
233 #[error("vwap: Error parsing anchor: {msg}")]
234 ParseAnchorError { msg: String },
235 #[error("vwap: Unsupported anchor unit '{unit_char}'.")]
236 UnsupportedAnchorUnit { unit_char: char },
237 #[error("vwap: Error converting timestamp {ts_ms} to month-based anchor.")]
238 MonthConversionError { ts_ms: i64 },
239
240 #[error("vwap: Output length mismatch (expected {expected}, got {got}).")]
241 OutputLengthMismatch { expected: usize, got: usize },
242 #[error("vwap: Invalid kernel for batch path: {0:?}.")]
243 InvalidKernelForBatch(Kernel),
244 #[error("vwap: Invalid range expansion (start='{start}', end='{end}', step={step}).")]
245 InvalidRange {
246 start: String,
247 end: String,
248 step: u32,
249 },
250 #[error("vwap: Not enough valid data (needed {needed}, have {valid}).")]
251 NotEnoughValidData { needed: usize, valid: usize },
252 #[error("vwap: All input values are NaN.")]
253 AllValuesNaN,
254}
255
256#[inline]
257pub fn vwap(input: &VwapInput) -> Result<VwapOutput, VwapError> {
258 vwap_with_kernel(input, Kernel::Auto)
259}
260
261pub fn vwap_with_kernel(input: &VwapInput, kernel: Kernel) -> Result<VwapOutput, VwapError> {
262 let (timestamps, volumes, prices) = match &input.data {
263 VwapData::Candles { candles, source } => {
264 let timestamps: &[i64] = candles
265 .get_timestamp()
266 .map_err(|e| VwapError::ParseAnchorError { msg: e.to_string() })?;
267 let prices: &[f64] = source_type(candles, source);
268 let vols: &[f64] = candles
269 .select_candle_field("volume")
270 .map_err(|e| VwapError::ParseAnchorError { msg: e.to_string() })?;
271 (timestamps, vols, prices)
272 }
273 VwapData::CandlesPlusPrices { candles, prices } => {
274 let timestamps: &[i64] = candles
275 .get_timestamp()
276 .map_err(|e| VwapError::ParseAnchorError { msg: e.to_string() })?;
277 let vols: &[f64] = candles
278 .select_candle_field("volume")
279 .map_err(|e| VwapError::ParseAnchorError { msg: e.to_string() })?;
280 (timestamps, vols, *prices)
281 }
282 VwapData::Slice {
283 timestamps,
284 volumes,
285 prices,
286 } => (*timestamps, *volumes, *prices),
287 };
288
289 let n = prices.len();
290 if timestamps.len() != n || volumes.len() != n {
291 return Err(VwapError::MismatchTimestampsPricesVolumes {
292 timestamps: timestamps.len(),
293 prices: n,
294 volumes: volumes.len(),
295 });
296 }
297 if n == 0 {
298 return Err(VwapError::NoData);
299 }
300
301 let (count, unit_char) = parse_anchor(input.get_anchor())
302 .map_err(|e| VwapError::ParseAnchorError { msg: e.to_string() })?;
303
304 let chosen = match kernel {
305 Kernel::Auto => detect_best_kernel(),
306 other => other,
307 };
308
309 let mut values = alloc_with_nan_prefix(n, 0);
310
311 unsafe {
312 match chosen {
313 Kernel::Scalar | Kernel::ScalarBatch => {
314 vwap_scalar(timestamps, volumes, prices, count, unit_char, &mut values)?
315 }
316 #[cfg(all(feature = "nightly-avx", target_arch = "x86_64"))]
317 Kernel::Avx2 | Kernel::Avx2Batch => {
318 vwap_avx2(timestamps, volumes, prices, count, unit_char, &mut values)?
319 }
320 #[cfg(all(feature = "nightly-avx", target_arch = "x86_64"))]
321 Kernel::Avx512 | Kernel::Avx512Batch => {
322 vwap_avx512(timestamps, volumes, prices, count, unit_char, &mut values)?
323 }
324 _ => unreachable!(),
325 }
326 }
327
328 Ok(VwapOutput { values })
329}
330
331#[inline]
332pub fn vwap_into_slice(dst: &mut [f64], input: &VwapInput, kern: Kernel) -> Result<(), VwapError> {
333 let (timestamps, volumes, prices) = match &input.data {
334 VwapData::Candles { candles, source } => {
335 let timestamps: &[i64] = candles
336 .get_timestamp()
337 .map_err(|e| VwapError::ParseAnchorError { msg: e.to_string() })?;
338 let prices: &[f64] = source_type(candles, source);
339 let vols: &[f64] = candles
340 .select_candle_field("volume")
341 .map_err(|e| VwapError::ParseAnchorError { msg: e.to_string() })?;
342 (timestamps, vols, prices)
343 }
344 VwapData::CandlesPlusPrices { candles, prices } => {
345 let timestamps: &[i64] = candles
346 .get_timestamp()
347 .map_err(|e| VwapError::ParseAnchorError { msg: e.to_string() })?;
348 let vols: &[f64] = candles
349 .select_candle_field("volume")
350 .map_err(|e| VwapError::ParseAnchorError { msg: e.to_string() })?;
351 (timestamps, vols, *prices)
352 }
353 VwapData::Slice {
354 timestamps,
355 volumes,
356 prices,
357 } => (*timestamps, *volumes, *prices),
358 };
359
360 let n = prices.len();
361 if dst.len() != n {
362 return Err(VwapError::OutputLengthMismatch {
363 expected: n,
364 got: dst.len(),
365 });
366 }
367 if timestamps.len() != n || volumes.len() != n {
368 return Err(VwapError::MismatchTimestampsPricesVolumes {
369 timestamps: timestamps.len(),
370 prices: n,
371 volumes: volumes.len(),
372 });
373 }
374 if n == 0 {
375 return Err(VwapError::NoData);
376 }
377
378 let (count, unit_char) = parse_anchor(input.get_anchor())
379 .map_err(|e| VwapError::ParseAnchorError { msg: e.to_string() })?;
380
381 let chosen = match kern {
382 Kernel::Auto => detect_best_kernel(),
383 other => other,
384 };
385
386 unsafe {
387 match chosen {
388 Kernel::Scalar | Kernel::ScalarBatch => {
389 vwap_scalar(timestamps, volumes, prices, count, unit_char, dst)?
390 }
391 #[cfg(all(feature = "nightly-avx", target_arch = "x86_64"))]
392 Kernel::Avx2 | Kernel::Avx2Batch => {
393 vwap_avx2(timestamps, volumes, prices, count, unit_char, dst)?
394 }
395 #[cfg(all(feature = "nightly-avx", target_arch = "x86_64"))]
396 Kernel::Avx512 | Kernel::Avx512Batch => {
397 vwap_avx512(timestamps, volumes, prices, count, unit_char, dst)?
398 }
399 _ => unreachable!(),
400 }
401 }
402
403 Ok(())
404}
405
406#[cfg(not(all(target_arch = "wasm32", feature = "wasm")))]
407#[inline]
408pub fn vwap_into(input: &VwapInput, out: &mut [f64]) -> Result<(), VwapError> {
409 vwap_into_slice(out, input, Kernel::Auto)
410}
411
412#[inline(always)]
413pub fn vwap_scalar(
414 timestamps: &[i64],
415 volumes: &[f64],
416 prices: &[f64],
417 count: u32,
418 unit_char: char,
419 out: &mut [f64],
420) -> Result<(), VwapError> {
421 debug_assert_eq!(out.len(), prices.len(), "output slice length mismatch");
422
423 let mut current_group_id: i64 = i64::MIN;
424 let mut volume_sum: f64 = 0.0;
425 let mut vol_price_sum: f64 = 0.0;
426
427 let n = prices.len();
428 if n == 0 {
429 return Ok(());
430 }
431
432 unsafe {
433 let ts_ptr = timestamps.as_ptr();
434 let vol_ptr = volumes.as_ptr();
435 let pr_ptr = prices.as_ptr();
436 let out_ptr = out.as_mut_ptr();
437
438 if unit_char == 'm' || unit_char == 'h' || unit_char == 'd' {
439 const MINUTE_MS: i64 = 60_000;
440 const HOUR_MS: i64 = 3_600_000;
441 const DAY_MS: i64 = 86_400_000;
442
443 let unit_ms: i64 = match unit_char {
444 'm' => MINUTE_MS,
445 'h' => HOUR_MS,
446 _ => DAY_MS,
447 };
448 let bucket_ms: i64 = (count as i64) * unit_ms;
449
450 let mut i: usize = 0;
451 let unroll_end = n & !1usize;
452 let mut window_start: i64 = 0;
453 let mut next_cutoff: i64 = i64::MIN;
454
455 while i < unroll_end {
456 let ts0 = *ts_ptr.add(i);
457 if ts0 >= 0 {
458 if ts0 >= next_cutoff || ts0 < window_start {
459 let gid0 = ts0 / bucket_ms;
460 current_group_id = gid0;
461 window_start = gid0.saturating_mul(bucket_ms);
462 next_cutoff = window_start.saturating_add(bucket_ms);
463 volume_sum = 0.0;
464 vol_price_sum = 0.0;
465 }
466 } else {
467 let gid0 = ts0 / bucket_ms;
468 if gid0 != current_group_id {
469 current_group_id = gid0;
470 window_start = gid0.saturating_mul(bucket_ms);
471 next_cutoff = window_start.saturating_add(bucket_ms);
472 volume_sum = 0.0;
473 vol_price_sum = 0.0;
474 }
475 }
476 let v0 = *vol_ptr.add(i);
477 let p0 = *pr_ptr.add(i);
478 volume_sum += v0;
479 vol_price_sum = p0.mul_add(v0, vol_price_sum);
480 *out_ptr.add(i) = if volume_sum > 0.0 {
481 vol_price_sum / volume_sum
482 } else {
483 f64::NAN
484 };
485
486 let idx1 = i + 1;
487 let ts1 = *ts_ptr.add(idx1);
488 if ts1 >= 0 {
489 if ts1 >= next_cutoff || ts1 < window_start {
490 let gid1 = ts1 / bucket_ms;
491 current_group_id = gid1;
492 window_start = gid1.saturating_mul(bucket_ms);
493 next_cutoff = window_start.saturating_add(bucket_ms);
494 volume_sum = 0.0;
495 vol_price_sum = 0.0;
496 }
497 } else {
498 let gid1 = ts1 / bucket_ms;
499 if gid1 != current_group_id {
500 current_group_id = gid1;
501 window_start = gid1.saturating_mul(bucket_ms);
502 next_cutoff = window_start.saturating_add(bucket_ms);
503 volume_sum = 0.0;
504 vol_price_sum = 0.0;
505 }
506 }
507 let v1 = *vol_ptr.add(idx1);
508 let p1 = *pr_ptr.add(idx1);
509 volume_sum += v1;
510 vol_price_sum = p1.mul_add(v1, vol_price_sum);
511 *out_ptr.add(idx1) = if volume_sum > 0.0 {
512 vol_price_sum / volume_sum
513 } else {
514 f64::NAN
515 };
516
517 i += 2;
518 }
519
520 if unroll_end != n {
521 let ts = *ts_ptr.add(unroll_end);
522 if ts >= 0 {
523 if ts >= next_cutoff || ts < window_start {
524 let gid = ts / bucket_ms;
525 current_group_id = gid;
526 window_start = gid.saturating_mul(bucket_ms);
527 next_cutoff = window_start.saturating_add(bucket_ms);
528 volume_sum = 0.0;
529 vol_price_sum = 0.0;
530 }
531 } else {
532 let gid = ts / bucket_ms;
533 if gid != current_group_id {
534 current_group_id = gid;
535 window_start = gid.saturating_mul(bucket_ms);
536 next_cutoff = window_start.saturating_add(bucket_ms);
537 volume_sum = 0.0;
538 vol_price_sum = 0.0;
539 }
540 }
541 let v = *vol_ptr.add(unroll_end);
542 let p = *pr_ptr.add(unroll_end);
543 volume_sum += v;
544 vol_price_sum = p.mul_add(v, vol_price_sum);
545 *out_ptr.add(unroll_end) = if volume_sum > 0.0 {
546 vol_price_sum / volume_sum
547 } else {
548 f64::NAN
549 };
550 }
551
552 return Ok(());
553 }
554
555 if unit_char == 'M' {
556 let mut i: usize = 0;
557 while i < n {
558 let ts = *ts_ptr.add(i);
559 let gid = match floor_to_month(ts, count) {
560 Ok(g) => g,
561 Err(_) => return Err(VwapError::MonthConversionError { ts_ms: ts }),
562 };
563 if gid != current_group_id {
564 current_group_id = gid;
565 volume_sum = 0.0;
566 vol_price_sum = 0.0;
567 }
568 let v = *vol_ptr.add(i);
569 let p = *pr_ptr.add(i);
570 volume_sum += v;
571 vol_price_sum = p.mul_add(v, vol_price_sum);
572 *out_ptr.add(i) = if volume_sum > 0.0 {
573 vol_price_sum / volume_sum
574 } else {
575 f64::NAN
576 };
577 i += 1;
578 }
579 return Ok(());
580 }
581 }
582
583 Err(VwapError::UnsupportedAnchorUnit { unit_char })
584}
585
586#[cfg(all(feature = "nightly-avx", target_arch = "x86_64"))]
587#[inline(always)]
588pub unsafe fn vwap_avx2(
589 timestamps: &[i64],
590 volumes: &[f64],
591 prices: &[f64],
592 count: u32,
593 unit_char: char,
594 out: &mut [f64],
595) -> Result<(), VwapError> {
596 vwap_scalar(timestamps, volumes, prices, count, unit_char, out)
597}
598
599#[cfg(all(feature = "nightly-avx", target_arch = "x86_64"))]
600#[inline(always)]
601pub unsafe fn vwap_avx512(
602 timestamps: &[i64],
603 volumes: &[f64],
604 prices: &[f64],
605 count: u32,
606 unit_char: char,
607 out: &mut [f64],
608) -> Result<(), VwapError> {
609 vwap_scalar(timestamps, volumes, prices, count, unit_char, out)
610}
611
612#[derive(Debug, Clone)]
613pub struct VwapStream {
614 anchor: String,
615 count: u32,
616 unit_char: char,
617
618 bucket_ms: i64,
619 next_cutoff: i64,
620 current_group_id: i64,
621
622 volume_sum: f64,
623 vol_price_sum: f64,
624}
625
626impl VwapStream {
627 #[inline]
628 pub fn try_new(params: VwapParams) -> Result<Self, VwapError> {
629 let anchor = params.anchor.unwrap_or_else(|| "1d".to_string());
630 let (count, unit_char) = parse_anchor(&anchor)
631 .map_err(|e| VwapError::ParseAnchorError { msg: e.to_string() })?;
632
633 const MINUTE_MS: i64 = 60_000;
634 const HOUR_MS: i64 = 3_600_000;
635 const DAY_MS: i64 = 86_400_000;
636
637 let bucket_ms = match unit_char {
638 'm' => (count as i64).saturating_mul(MINUTE_MS),
639 'h' => (count as i64).saturating_mul(HOUR_MS),
640 'd' => (count as i64).saturating_mul(DAY_MS),
641 'M' => 0,
642 _ => return Err(VwapError::UnsupportedAnchorUnit { unit_char }),
643 };
644
645 Ok(Self {
646 anchor,
647 count,
648 unit_char,
649 bucket_ms,
650 next_cutoff: i64::MIN,
651 current_group_id: i64::MIN,
652 volume_sum: 0.0,
653 vol_price_sum: 0.0,
654 })
655 }
656
657 #[inline(always)]
658 pub fn update(&mut self, timestamp: i64, price: f64, volume: f64) -> Option<f64> {
659 match self.unit_char {
660 'm' | 'h' | 'd' => self.update_linear(timestamp, price, volume),
661 'M' => self.update_month(timestamp, price, volume),
662 _ => None,
663 }
664 }
665
666 #[inline(always)]
667 fn init_linear(&mut self, ts: i64) {
668 let gid = ts / self.bucket_ms;
669 self.current_group_id = gid;
670
671 self.next_cutoff = gid.saturating_add(1).saturating_mul(self.bucket_ms);
672 self.volume_sum = 0.0;
673 self.vol_price_sum = 0.0;
674 }
675
676 #[inline(always)]
677 fn roll_linear_to(&mut self, ts: i64) {
678 let delta = ts.saturating_sub(self.next_cutoff);
679 let k = (delta / self.bucket_ms).saturating_add(1);
680 self.current_group_id = self.current_group_id.saturating_add(k);
681 self.next_cutoff = self
682 .next_cutoff
683 .saturating_add(self.bucket_ms.saturating_mul(k));
684 self.volume_sum = 0.0;
685 self.vol_price_sum = 0.0;
686 }
687
688 #[inline(always)]
689 fn update_linear(&mut self, ts: i64, price: f64, volume: f64) -> Option<f64> {
690 debug_assert!(self.bucket_ms > 0);
691 if self.current_group_id == i64::MIN {
692 self.init_linear(ts);
693 } else if ts >= self.next_cutoff {
694 self.roll_linear_to(ts);
695 }
696
697 self.volume_sum += volume;
698 self.vol_price_sum = price.mul_add(volume, self.vol_price_sum);
699
700 if self.volume_sum > 0.0 {
701 Some(self.vol_price_sum / self.volume_sum)
702 } else {
703 None
704 }
705 }
706
707 #[inline]
708 fn month_gid_and_next_cutoff(&self, ts_ms: i64) -> Result<(i64, i64), VwapError> {
709 let seconds = ts_ms / 1000;
710 let nanos = ((ts_ms % 1000) * 1_000_000) as u32;
711 let dt = NaiveDateTime::from_timestamp_opt(seconds, nanos)
712 .ok_or_else(|| VwapError::MonthConversionError { ts_ms })?;
713
714 let year = dt.year();
715 let month = dt.month() as i32;
716 let total_months = (year - 1970) as i64 * 12 + (month - 1) as i64;
717 let gid = total_months / (self.count as i64);
718
719 let next_bucket_months = gid.saturating_add(1).saturating_mul(self.count as i64);
720
721 let next_year = 1970 + next_bucket_months.div_euclid(12);
722 let next_month0 = next_bucket_months.rem_euclid(12);
723 let next_date = NaiveDate::from_ymd_opt(next_year as i32, (next_month0 + 1) as u32, 1)
724 .ok_or_else(|| VwapError::MonthConversionError { ts_ms })?;
725 let next_dt = next_date
726 .and_hms_opt(0, 0, 0)
727 .ok_or_else(|| VwapError::MonthConversionError { ts_ms })?;
728
729 let next_ms = next_dt.timestamp().saturating_mul(1000);
730
731 Ok((gid, next_ms))
732 }
733
734 #[inline(always)]
735 fn init_month(&mut self, ts: i64) -> Option<()> {
736 match self.month_gid_and_next_cutoff(ts) {
737 Ok((gid, next_ms)) => {
738 self.current_group_id = gid;
739 self.next_cutoff = next_ms;
740 self.volume_sum = 0.0;
741 self.vol_price_sum = 0.0;
742 Some(())
743 }
744 Err(_) => None,
745 }
746 }
747
748 #[inline(always)]
749 fn update_month(&mut self, ts: i64, price: f64, volume: f64) -> Option<f64> {
750 if self.current_group_id == i64::MIN {
751 if self.init_month(ts).is_none() {
752 return None;
753 }
754 } else if ts >= self.next_cutoff {
755 match self.month_gid_and_next_cutoff(ts) {
756 Ok((gid, next_ms)) => {
757 self.current_group_id = gid;
758 self.next_cutoff = next_ms;
759 self.volume_sum = 0.0;
760 self.vol_price_sum = 0.0;
761 }
762 Err(_) => return None,
763 }
764 }
765
766 self.volume_sum += volume;
767 self.vol_price_sum = price.mul_add(volume, self.vol_price_sum);
768
769 if self.volume_sum > 0.0 {
770 Some(self.vol_price_sum / self.volume_sum)
771 } else {
772 None
773 }
774 }
775}
776
777#[derive(Clone, Debug)]
778pub struct VwapBatchRange {
779 pub anchor: (String, String, u32),
780}
781
782impl Default for VwapBatchRange {
783 fn default() -> Self {
784 Self {
785 anchor: ("1d".to_string(), "250d".to_string(), 1),
786 }
787 }
788}
789
790#[derive(Clone, Debug, Default)]
791pub struct VwapBatchBuilder {
792 range: VwapBatchRange,
793 kernel: Kernel,
794}
795
796impl VwapBatchBuilder {
797 pub fn new() -> Self {
798 Self::default()
799 }
800 pub fn kernel(mut self, k: Kernel) -> Self {
801 self.kernel = k;
802 self
803 }
804 #[inline]
805 pub fn anchor_range(
806 mut self,
807 start: impl Into<String>,
808 end: impl Into<String>,
809 step: u32,
810 ) -> Self {
811 self.range.anchor = (start.into(), end.into(), step);
812 self
813 }
814 #[inline]
815 pub fn anchor_static(mut self, val: impl Into<String>) -> Self {
816 let s = val.into();
817 self.range.anchor = (s.clone(), s, 0);
818 self
819 }
820 pub fn apply_slice(
821 self,
822 timestamps: &[i64],
823 volumes: &[f64],
824 prices: &[f64],
825 ) -> Result<VwapBatchOutput, VwapError> {
826 vwap_batch_with_kernel(timestamps, volumes, prices, &self.range, self.kernel)
827 }
828}
829
830pub fn vwap_batch_with_kernel(
831 timestamps: &[i64],
832 volumes: &[f64],
833 prices: &[f64],
834 sweep: &VwapBatchRange,
835 k: Kernel,
836) -> Result<VwapBatchOutput, VwapError> {
837 let kernel = match k {
838 Kernel::Auto => detect_best_batch_kernel(),
839 other if other.is_batch() => other,
840 other => return Err(VwapError::InvalidKernelForBatch(other)),
841 };
842
843 let simd = match kernel {
844 Kernel::Avx512Batch => Kernel::Avx512,
845 Kernel::Avx2Batch => Kernel::Avx2,
846 Kernel::ScalarBatch => Kernel::Scalar,
847 _ => unreachable!(),
848 };
849
850 vwap_batch_inner(timestamps, volumes, prices, sweep, simd, true)
851}
852
853#[derive(Clone, Debug)]
854pub struct VwapBatchOutput {
855 pub values: Vec<f64>,
856 pub combos: Vec<VwapParams>,
857 pub rows: usize,
858 pub cols: usize,
859}
860
861impl VwapBatchOutput {
862 pub fn row_for_anchor(&self, a: &str) -> Option<usize> {
863 self.combos
864 .iter()
865 .position(|p| p.anchor.as_deref() == Some(a))
866 }
867
868 pub fn values_for(&self, a: &str) -> Option<&[f64]> {
869 self.row_for_anchor(a).map(|row| {
870 let start = row * self.cols;
871 &self.values[start..start + self.cols]
872 })
873 }
874}
875
876#[inline(always)]
877pub(crate) fn expand_grid_vwap(r: &VwapBatchRange) -> Vec<VwapParams> {
878 if r.anchor.2 == 0 || r.anchor.0 == r.anchor.1 {
879 return vec![VwapParams {
880 anchor: Some(r.anchor.0.clone()),
881 }];
882 }
883
884 let step = r.anchor.2.max(1);
885 let start = anchor_to_num_and_unit(&r.anchor.0);
886 let end = anchor_to_num_and_unit(&r.anchor.1);
887 let mut result = Vec::new();
888 if let (Some((mut a, unit_a)), Some((b, unit_b))) = (start, end) {
889 if unit_a != unit_b {
890 return result;
891 }
892 if a <= b {
893 while a <= b {
894 result.push(VwapParams {
895 anchor: Some(format!("{}{}", a, unit_a)),
896 });
897 a = a.saturating_add(step);
898 }
899 } else {
900 while a >= b {
901 result.push(VwapParams {
902 anchor: Some(format!("{}{}", a, unit_a)),
903 });
904 if a < step {
905 break;
906 }
907 a -= step;
908 }
909 }
910 }
911 result
912}
913
914fn anchor_to_num_and_unit(anchor: &str) -> Option<(u32, char)> {
915 let mut idx = 0;
916 for (pos, ch) in anchor.char_indices() {
917 if !ch.is_ascii_digit() {
918 idx = pos;
919 break;
920 }
921 }
922 if idx == 0 {
923 return None;
924 }
925 let num = anchor[..idx].parse::<u32>().ok()?;
926 let unit = anchor[idx..].chars().next()?;
927 Some((num, unit))
928}
929
930#[inline]
931pub(crate) fn first_valid_vwap_index(
932 timestamps: &[i64],
933 volumes: &[f64],
934 count: u32,
935 unit: char,
936) -> usize {
937 if timestamps.is_empty() {
938 return 0;
939 }
940 let mut cur_gid = i64::MIN;
941 let mut vsum = 0.0;
942 for i in 0..timestamps.len() {
943 let ts = timestamps[i];
944 let gid = match unit {
945 'm' => ts / ((count as i64) * 60_000),
946 'h' => ts / ((count as i64) * 3_600_000),
947 'd' => ts / ((count as i64) * 86_400_000),
948 'M' => floor_to_month(ts, count).unwrap_or(i64::MIN),
949 _ => i64::MIN,
950 };
951 if gid != cur_gid {
952 cur_gid = gid;
953 vsum = 0.0;
954 }
955 vsum += volumes[i];
956 if vsum > 0.0 {
957 return i;
958 }
959 }
960 0
961}
962
963#[inline(always)]
964pub fn vwap_batch_slice(
965 timestamps: &[i64],
966 volumes: &[f64],
967 prices: &[f64],
968 sweep: &VwapBatchRange,
969 kern: Kernel,
970) -> Result<VwapBatchOutput, VwapError> {
971 vwap_batch_inner(timestamps, volumes, prices, sweep, kern, false)
972}
973
974#[inline(always)]
975pub fn vwap_batch_par_slice(
976 timestamps: &[i64],
977 volumes: &[f64],
978 prices: &[f64],
979 sweep: &VwapBatchRange,
980 kern: Kernel,
981) -> Result<VwapBatchOutput, VwapError> {
982 vwap_batch_inner(timestamps, volumes, prices, sweep, kern, true)
983}
984
985fn vwap_batch_inner(
986 timestamps: &[i64],
987 volumes: &[f64],
988 prices: &[f64],
989 sweep: &VwapBatchRange,
990 kern: Kernel,
991 parallel: bool,
992) -> Result<VwapBatchOutput, VwapError> {
993 let combos = expand_grid_vwap(sweep);
994 if combos.is_empty() {
995 return Err(VwapError::InvalidRange {
996 start: sweep.anchor.0.clone(),
997 end: sweep.anchor.1.clone(),
998 step: sweep.anchor.2,
999 });
1000 }
1001
1002 let rows = combos.len();
1003 let cols = prices.len();
1004
1005 let mut raw = make_uninit_matrix(rows, cols);
1006
1007 let mut warm: Vec<usize> = Vec::with_capacity(rows);
1008 for prm in &combos {
1009 let (cnt, unit) = parse_anchor(prm.anchor.as_deref().unwrap_or("1d"))
1010 .map_err(|e| VwapError::ParseAnchorError { msg: e.to_string() })?;
1011 let w = first_valid_vwap_index(timestamps, volumes, cnt, unit);
1012 warm.push(w);
1013 }
1014 init_matrix_prefixes(&mut raw, cols, &warm);
1015
1016 let pv: Vec<f64> = prices
1017 .iter()
1018 .zip(volumes.iter())
1019 .map(|(&p, &v)| p * v)
1020 .collect();
1021
1022 let do_row = |row: usize, dst_mu: &mut [MaybeUninit<f64>]| unsafe {
1023 let params = combos.get(row).unwrap();
1024 let (count, unit_char) = parse_anchor(params.anchor.as_deref().unwrap_or("1d"))
1025 .map_err(|e| VwapError::ParseAnchorError { msg: e.to_string() })
1026 .unwrap();
1027
1028 let out_row =
1029 core::slice::from_raw_parts_mut(dst_mu.as_mut_ptr() as *mut f64, dst_mu.len());
1030
1031 match kern {
1032 Kernel::Scalar => {
1033 vwap_row_scalar_pv(timestamps, volumes, &pv, count, unit_char, out_row)
1034 }
1035 #[cfg(all(feature = "nightly-avx", target_arch = "x86_64"))]
1036 Kernel::Avx2 => vwap_row_avx2(timestamps, volumes, prices, count, unit_char, out_row),
1037 #[cfg(all(feature = "nightly-avx", target_arch = "x86_64"))]
1038 Kernel::Avx512 => {
1039 vwap_row_avx512(timestamps, volumes, prices, count, unit_char, out_row)
1040 }
1041 _ => unreachable!(),
1042 }
1043 };
1044
1045 if parallel {
1046 #[cfg(not(target_arch = "wasm32"))]
1047 {
1048 raw.par_chunks_mut(cols)
1049 .enumerate()
1050 .for_each(|(row, slice)| do_row(row, slice));
1051 }
1052
1053 #[cfg(target_arch = "wasm32")]
1054 {
1055 for (row, slice) in raw.chunks_mut(cols).enumerate() {
1056 do_row(row, slice);
1057 }
1058 }
1059 } else {
1060 for (row, slice) in raw.chunks_mut(cols).enumerate() {
1061 do_row(row, slice);
1062 }
1063 }
1064
1065 let mut raw_guard = core::mem::ManuallyDrop::new(raw);
1066 let values: Vec<f64> = unsafe {
1067 Vec::from_raw_parts(
1068 raw_guard.as_mut_ptr() as *mut f64,
1069 raw_guard.len(),
1070 raw_guard.capacity(),
1071 )
1072 };
1073
1074 Ok(VwapBatchOutput {
1075 values,
1076 combos,
1077 rows,
1078 cols,
1079 })
1080}
1081
1082#[inline(always)]
1083fn vwap_batch_inner_into(
1084 timestamps: &[i64],
1085 volumes: &[f64],
1086 prices: &[f64],
1087 sweep: &VwapBatchRange,
1088 kern: Kernel,
1089 parallel: bool,
1090 out: &mut [f64],
1091) -> Result<Vec<VwapParams>, VwapError> {
1092 if timestamps.len() != volumes.len() || volumes.len() != prices.len() {
1093 return Err(VwapError::MismatchTimestampsPricesVolumes {
1094 timestamps: timestamps.len(),
1095 prices: prices.len(),
1096 volumes: volumes.len(),
1097 });
1098 }
1099 let combos = expand_grid_vwap(sweep);
1100 if combos.is_empty() {
1101 return Err(VwapError::InvalidRange {
1102 start: sweep.anchor.0.clone(),
1103 end: sweep.anchor.1.clone(),
1104 step: sweep.anchor.2,
1105 });
1106 }
1107
1108 let rows = combos.len();
1109 let cols = prices.len();
1110
1111 let expected = rows
1112 .checked_mul(cols)
1113 .ok_or_else(|| VwapError::InvalidRange {
1114 start: sweep.anchor.0.clone(),
1115 end: sweep.anchor.1.clone(),
1116 step: sweep.anchor.2,
1117 })?;
1118 if out.len() != expected {
1119 return Err(VwapError::OutputLengthMismatch {
1120 expected,
1121 got: out.len(),
1122 });
1123 }
1124
1125 let pv: Vec<f64> = prices
1126 .iter()
1127 .zip(volumes.iter())
1128 .map(|(&p, &v)| p * v)
1129 .collect();
1130
1131 let do_row = |row: usize, dst: &mut [f64]| unsafe {
1132 let params = combos.get(row).unwrap();
1133 let (count, unit_char) = parse_anchor(params.anchor.as_deref().unwrap_or("1d"))
1134 .map_err(|e| VwapError::ParseAnchorError { msg: e.to_string() })
1135 .unwrap();
1136
1137 let out_row = dst;
1138
1139 match kern {
1140 Kernel::Scalar => {
1141 vwap_row_scalar_pv(timestamps, volumes, &pv, count, unit_char, out_row)
1142 }
1143 #[cfg(all(feature = "nightly-avx", target_arch = "x86_64"))]
1144 Kernel::Avx2 => vwap_row_avx2(timestamps, volumes, prices, count, unit_char, out_row),
1145 #[cfg(all(feature = "nightly-avx", target_arch = "x86_64"))]
1146 Kernel::Avx512 => {
1147 vwap_row_avx512(timestamps, volumes, prices, count, unit_char, out_row)
1148 }
1149 _ => unreachable!(),
1150 }
1151 };
1152
1153 if parallel {
1154 #[cfg(not(target_arch = "wasm32"))]
1155 {
1156 out.par_chunks_mut(cols)
1157 .enumerate()
1158 .for_each(|(row, slice)| do_row(row, slice));
1159 }
1160
1161 #[cfg(target_arch = "wasm32")]
1162 {
1163 for (row, slice) in out.chunks_mut(cols).enumerate() {
1164 do_row(row, slice);
1165 }
1166 }
1167 } else {
1168 for (row, slice) in out.chunks_mut(cols).enumerate() {
1169 do_row(row, slice);
1170 }
1171 }
1172
1173 Ok(combos)
1174}
1175
1176#[inline]
1177pub(crate) fn parse_anchor(anchor: &str) -> Result<(u32, char), Box<dyn std::error::Error>> {
1178 let mut idx = 0;
1179 for (pos, ch) in anchor.char_indices() {
1180 if !ch.is_ascii_digit() {
1181 idx = pos;
1182 break;
1183 }
1184 }
1185 if idx == 0 {
1186 return Err(format!("No numeric portion found in anchor '{}'", anchor).into());
1187 }
1188 let num_part = &anchor[..idx];
1189 let unit_part = &anchor[idx..];
1190 let count = num_part
1191 .parse::<u32>()
1192 .map_err(|_| format!("Failed parsing numeric portion '{}'", num_part))?;
1193 if unit_part.len() != 1 {
1194 return Err(format!("Anchor unit must be 1 char (found '{}')", unit_part).into());
1195 }
1196 let mut unit_char = unit_part.chars().next().unwrap();
1197 unit_char = match unit_char {
1198 'H' => 'h',
1199 'D' => 'd',
1200 c => c,
1201 };
1202 match unit_char {
1203 'm' | 'h' | 'd' | 'M' => Ok((count, unit_char)),
1204 _ => Err(format!("Unsupported unit '{}'", unit_char).into()),
1205 }
1206}
1207
1208#[inline]
1209pub(crate) fn floor_to_month(ts_ms: i64, count: u32) -> Result<i64, Box<dyn Error>> {
1210 let seconds = ts_ms / 1000;
1211 let nanos = ((ts_ms % 1000) * 1_000_000) as u32;
1212
1213 let dt = NaiveDateTime::from_timestamp_opt(seconds, nanos)
1214 .ok_or_else(|| format!("Invalid timestamp: {}", ts_ms))?;
1215
1216 let year = dt.year();
1217 let month = dt.month() as i32;
1218 let total_months = (year - 1970) as i64 * 12 + (month - 1) as i64;
1219
1220 if count == 1 {
1221 Ok(total_months)
1222 } else {
1223 Ok(total_months / (count as i64))
1224 }
1225}
1226
1227#[inline(always)]
1228pub unsafe fn vwap_row_scalar(
1229 timestamps: &[i64],
1230 volumes: &[f64],
1231 prices: &[f64],
1232 count: u32,
1233 unit_char: char,
1234 out: &mut [f64],
1235) {
1236 vwap_scalar(timestamps, volumes, prices, count, unit_char, out);
1237}
1238
1239#[inline(always)]
1240unsafe fn vwap_row_scalar_pv(
1241 timestamps: &[i64],
1242 volumes: &[f64],
1243 pv: &[f64],
1244 count: u32,
1245 unit_char: char,
1246 out: &mut [f64],
1247) {
1248 debug_assert_eq!(pv.len(), out.len(), "pv length must match out len");
1249
1250 let n = out.len();
1251 if n == 0 {
1252 return;
1253 }
1254 let mut current_group_id: i64 = i64::MIN;
1255 let mut volume_sum: f64 = 0.0;
1256 let mut vol_price_sum: f64 = 0.0;
1257
1258 let ts_ptr = timestamps.as_ptr();
1259 let vol_ptr = volumes.as_ptr();
1260 let pv_ptr = pv.as_ptr();
1261 let out_ptr = out.as_mut_ptr();
1262
1263 if unit_char == 'm' || unit_char == 'h' || unit_char == 'd' {
1264 const MINUTE_MS: i64 = 60_000;
1265 const HOUR_MS: i64 = 3_600_000;
1266 const DAY_MS: i64 = 86_400_000;
1267 let unit_ms: i64 = match unit_char {
1268 'm' => MINUTE_MS,
1269 'h' => HOUR_MS,
1270 _ => DAY_MS,
1271 };
1272 let bucket_ms: i64 = (count as i64) * unit_ms;
1273
1274 let mut i: usize = 0;
1275 let unroll_end = n & !1usize;
1276 while i < unroll_end {
1277 let ts0 = *ts_ptr.add(i);
1278 let gid0 = ts0 / bucket_ms;
1279 if gid0 != current_group_id {
1280 current_group_id = gid0;
1281 volume_sum = 0.0;
1282 vol_price_sum = 0.0;
1283 }
1284 let v0 = *vol_ptr.add(i);
1285 let pv0 = *pv_ptr.add(i);
1286 volume_sum += v0;
1287 vol_price_sum += pv0;
1288 *out_ptr.add(i) = if volume_sum > 0.0 {
1289 vol_price_sum / volume_sum
1290 } else {
1291 f64::NAN
1292 };
1293
1294 let idx1 = i + 1;
1295 let ts1 = *ts_ptr.add(idx1);
1296 let gid1 = ts1 / bucket_ms;
1297 if gid1 != current_group_id {
1298 current_group_id = gid1;
1299 volume_sum = 0.0;
1300 vol_price_sum = 0.0;
1301 }
1302 let v1 = *vol_ptr.add(idx1);
1303 let pv1 = *pv_ptr.add(idx1);
1304 volume_sum += v1;
1305 vol_price_sum += pv1;
1306 *out_ptr.add(idx1) = if volume_sum > 0.0 {
1307 vol_price_sum / volume_sum
1308 } else {
1309 f64::NAN
1310 };
1311
1312 i += 2;
1313 }
1314 if unroll_end != n {
1315 let ts = *ts_ptr.add(unroll_end);
1316 let gid = ts / bucket_ms;
1317 if gid != current_group_id {
1318 current_group_id = gid;
1319 volume_sum = 0.0;
1320 vol_price_sum = 0.0;
1321 }
1322 let v = *vol_ptr.add(unroll_end);
1323 let pvx = *pv_ptr.add(unroll_end);
1324 volume_sum += v;
1325 vol_price_sum += pvx;
1326 *out_ptr.add(unroll_end) = if volume_sum > 0.0 {
1327 vol_price_sum / volume_sum
1328 } else {
1329 f64::NAN
1330 };
1331 }
1332 return;
1333 }
1334
1335 if unit_char == 'M' {
1336 let mut i: usize = 0;
1337 while i < n {
1338 let ts = *ts_ptr.add(i);
1339 let gid = match floor_to_month(ts, count) {
1340 Ok(g) => g,
1341 Err(_) => return,
1342 };
1343 if gid != current_group_id {
1344 current_group_id = gid;
1345 volume_sum = 0.0;
1346 vol_price_sum = 0.0;
1347 }
1348 let v = *vol_ptr.add(i);
1349 let pvx = *pv_ptr.add(i);
1350 volume_sum += v;
1351 vol_price_sum += pvx;
1352 *out_ptr.add(i) = if volume_sum > 0.0 {
1353 vol_price_sum / volume_sum
1354 } else {
1355 f64::NAN
1356 };
1357 i += 1;
1358 }
1359 return;
1360 }
1361}
1362
1363#[cfg(all(feature = "nightly-avx", target_arch = "x86_64"))]
1364#[inline(always)]
1365pub unsafe fn vwap_row_avx2(
1366 timestamps: &[i64],
1367 volumes: &[f64],
1368 prices: &[f64],
1369 count: u32,
1370 unit_char: char,
1371 out: &mut [f64],
1372) {
1373 vwap_row_scalar(timestamps, volumes, prices, count, unit_char, out);
1374}
1375
1376#[cfg(all(feature = "nightly-avx", target_arch = "x86_64"))]
1377#[inline(always)]
1378pub unsafe fn vwap_row_avx512(
1379 timestamps: &[i64],
1380 volumes: &[f64],
1381 prices: &[f64],
1382 count: u32,
1383 unit_char: char,
1384 out: &mut [f64],
1385) {
1386 vwap_row_scalar(timestamps, volumes, prices, count, unit_char, out);
1387}
1388
1389#[cfg(all(feature = "nightly-avx", target_arch = "x86_64"))]
1390#[inline(always)]
1391pub unsafe fn vwap_row_avx512_short(
1392 timestamps: &[i64],
1393 volumes: &[f64],
1394 prices: &[f64],
1395 count: u32,
1396 unit_char: char,
1397 out: &mut [f64],
1398) {
1399 vwap_row_scalar(timestamps, volumes, prices, count, unit_char, out);
1400}
1401
1402#[cfg(all(feature = "nightly-avx", target_arch = "x86_64"))]
1403#[inline(always)]
1404pub unsafe fn vwap_row_avx512_long(
1405 timestamps: &[i64],
1406 volumes: &[f64],
1407 prices: &[f64],
1408 count: u32,
1409 unit_char: char,
1410 out: &mut [f64],
1411) {
1412 vwap_row_scalar(timestamps, volumes, prices, count, unit_char, out);
1413}
1414
1415#[inline(always)]
1416fn expand_grid(r: &VwapBatchRange) -> Vec<VwapParams> {
1417 expand_grid_vwap(r)
1418}
1419
1420#[cfg(test)]
1421mod tests {
1422 use super::*;
1423 use crate::skip_if_unsupported;
1424 use crate::utilities::data_loader::read_candles_from_csv;
1425
1426 fn check_vwap_partial_params(test_name: &str, kernel: Kernel) -> Result<(), Box<dyn Error>> {
1427 skip_if_unsupported!(kernel, test_name);
1428 let file_path = "src/data/2018-09-01-2024-Bitfinex_Spot-4h.csv";
1429 let candles = read_candles_from_csv(file_path)?;
1430 let params_default = VwapParams { anchor: None };
1431 let input_default = VwapInput::from_candles(&candles, "hlc3", params_default);
1432 let output_default = vwap_with_kernel(&input_default, kernel)?;
1433 assert_eq!(output_default.values.len(), candles.close.len());
1434 Ok(())
1435 }
1436
1437 #[test]
1438 fn test_vwap_into_matches_api() -> Result<(), Box<dyn Error>> {
1439 let file_path = "src/data/2018-09-01-2024-Bitfinex_Spot-4h.csv";
1440 let candles = read_candles_from_csv(file_path)?;
1441
1442 let timestamps = candles.get_timestamp().map_err(|e| e.to_string())?;
1443 let volumes = candles
1444 .select_candle_field("volume")
1445 .map_err(|e| e.to_string())?;
1446 let prices = source_type(&candles, "hlc3");
1447
1448 let take = 512usize.min(prices.len());
1449 let start = prices.len().saturating_sub(take);
1450 let ts_slice = ×tamps[start..start + take];
1451 let vol_slice = &volumes[start..start + take];
1452 let price_slice = &prices[start..start + take];
1453
1454 let params = VwapParams { anchor: None };
1455 let input = VwapInput::from_slice(ts_slice, vol_slice, price_slice, params);
1456
1457 let baseline = vwap(&input)?.values;
1458
1459 let mut out = vec![0.0; price_slice.len()];
1460 vwap_into(&input, &mut out)?;
1461
1462 assert_eq!(baseline.len(), out.len());
1463
1464 fn eq_or_both_nan(a: f64, b: f64) -> bool {
1465 (a.is_nan() && b.is_nan()) || (a == b) || ((a - b).abs() <= 1e-12)
1466 }
1467
1468 for i in 0..out.len() {
1469 assert!(
1470 eq_or_both_nan(baseline[i], out[i]),
1471 "Mismatch at {}: baseline={}, into={}",
1472 i,
1473 baseline[i],
1474 out[i]
1475 );
1476 }
1477
1478 Ok(())
1479 }
1480
1481 fn check_vwap_accuracy(test_name: &str, kernel: Kernel) -> Result<(), Box<dyn Error>> {
1482 skip_if_unsupported!(kernel, test_name);
1483 let expected_last_five_vwap = [
1484 59353.05963230107,
1485 59330.15815713043,
1486 59289.94649532547,
1487 59274.6155462414,
1488 58730.0,
1489 ];
1490 let file_path: &str = "src/data/2018-09-01-2024-Bitfinex_Spot-4h.csv";
1491 let candles = read_candles_from_csv(file_path)?;
1492 let params = VwapParams {
1493 anchor: Some("1D".to_string()),
1494 };
1495 let input = VwapInput::from_candles(&candles, "hlc3", params);
1496 let result = vwap_with_kernel(&input, kernel)?;
1497 assert!(result.values.len() >= 5, "Not enough data points for test");
1498 let start_idx = result.values.len() - 5;
1499 for (i, &vwap_val) in result.values[start_idx..].iter().enumerate() {
1500 let exp_val = expected_last_five_vwap[i];
1501 assert!(
1502 (vwap_val - exp_val).abs() < 1e-5,
1503 "[{}] VWAP mismatch at index {}: expected {}, got {}",
1504 test_name,
1505 i,
1506 exp_val,
1507 vwap_val
1508 );
1509 }
1510 Ok(())
1511 }
1512
1513 fn check_vwap_candles_plus_prices(
1514 test_name: &str,
1515 kernel: Kernel,
1516 ) -> Result<(), Box<dyn Error>> {
1517 skip_if_unsupported!(kernel, test_name);
1518 let file_path = "src/data/2018-09-01-2024-Bitfinex_Spot-4h.csv";
1519 let candles = read_candles_from_csv(file_path)?;
1520 let source_prices = candles.get_calculated_field("hl2").unwrap();
1521 let params = VwapParams {
1522 anchor: Some("1d".to_string()),
1523 };
1524 let input = VwapInput::from_candles_plus_prices(&candles, source_prices, params);
1525 let result = vwap_with_kernel(&input, kernel)?;
1526 assert_eq!(result.values.len(), candles.close.len());
1527 Ok(())
1528 }
1529
1530 fn check_vwap_anchor_parsing_error(
1531 test_name: &str,
1532 kernel: Kernel,
1533 ) -> Result<(), Box<dyn Error>> {
1534 skip_if_unsupported!(kernel, test_name);
1535 let file_path = "src/data/2018-09-01-2024-Bitfinex_Spot-4h.csv";
1536 let candles = read_candles_from_csv(file_path)?;
1537 let params = VwapParams {
1538 anchor: Some("xyz".to_string()),
1539 };
1540 let input = VwapInput::from_candles(&candles, "hlc3", params);
1541 let result = vwap_with_kernel(&input, kernel);
1542 assert!(result.is_err());
1543 Ok(())
1544 }
1545
1546 fn check_vwap_slice_data_reinput(
1547 test_name: &str,
1548 kernel: Kernel,
1549 ) -> Result<(), Box<dyn Error>> {
1550 skip_if_unsupported!(kernel, test_name);
1551 let file_path = "src/data/2018-09-01-2024-Bitfinex_Spot-4h.csv";
1552 let candles = read_candles_from_csv(file_path)?;
1553 let first_params = VwapParams {
1554 anchor: Some("1d".to_string()),
1555 };
1556 let first_input = VwapInput::from_candles(&candles, "close", first_params);
1557 let first_result = vwap_with_kernel(&first_input, kernel)?;
1558 let second_params = VwapParams {
1559 anchor: Some("1h".to_string()),
1560 };
1561 let source_prices = &first_result.values;
1562 let second_input =
1563 VwapInput::from_candles_plus_prices(&candles, source_prices, second_params);
1564 let second_result = vwap_with_kernel(&second_input, kernel)?;
1565 assert_eq!(second_result.values.len(), first_result.values.len());
1566 Ok(())
1567 }
1568
1569 fn check_vwap_nan_handling(test_name: &str, kernel: Kernel) -> Result<(), Box<dyn Error>> {
1570 skip_if_unsupported!(kernel, test_name);
1571 let file_path = "src/data/2018-09-01-2024-Bitfinex_Spot-4h.csv";
1572 let candles = read_candles_from_csv(file_path)?;
1573 let input = VwapInput::with_default_candles(&candles);
1574 let result = vwap_with_kernel(&input, kernel)?;
1575 assert_eq!(result.values.len(), candles.close.len());
1576 for &val in &result.values {
1577 if !val.is_nan() {
1578 assert!(val.is_finite());
1579 }
1580 }
1581 Ok(())
1582 }
1583
1584 fn check_vwap_with_default_candles(
1585 test_name: &str,
1586 _kernel: Kernel,
1587 ) -> Result<(), Box<dyn Error>> {
1588 let file_path = "src/data/2018-09-01-2024-Bitfinex_Spot-4h.csv";
1589 let candles = read_candles_from_csv(file_path)?;
1590 let input = VwapInput::with_default_candles(&candles);
1591 match input.data {
1592 VwapData::Candles { source, .. } => {
1593 assert_eq!(source, "hlc3");
1594 }
1595 _ => panic!("Expected VwapData::Candles"),
1596 }
1597 let anchor = input.get_anchor();
1598 assert_eq!(anchor, "1d");
1599 Ok(())
1600 }
1601
1602 fn check_vwap_with_default_params(
1603 test_name: &str,
1604 _kernel: Kernel,
1605 ) -> Result<(), Box<dyn Error>> {
1606 let default_params = VwapParams::default();
1607 assert_eq!(default_params.anchor, Some("1d".to_string()));
1608 Ok(())
1609 }
1610
1611 macro_rules! generate_all_vwap_tests {
1612 ($($test_fn:ident),*) => {
1613 paste::paste! {
1614 $(
1615 #[test]
1616 fn [<$test_fn _scalar_f64>]() {
1617 let _ = $test_fn(stringify!([<$test_fn _scalar_f64>]), Kernel::Scalar);
1618 }
1619 )*
1620 #[cfg(all(feature = "nightly-avx", target_arch = "x86_64"))]
1621 $(
1622 #[test]
1623 fn [<$test_fn _avx2_f64>]() {
1624 let _ = $test_fn(stringify!([<$test_fn _avx2_f64>]), Kernel::Avx2);
1625 }
1626 #[test]
1627 fn [<$test_fn _avx512_f64>]() {
1628 let _ = $test_fn(stringify!([<$test_fn _avx512_f64>]), Kernel::Avx512);
1629 }
1630 )*
1631 }
1632 }
1633 }
1634
1635 #[cfg(debug_assertions)]
1636 fn check_vwap_no_poison(test_name: &str, kernel: Kernel) -> Result<(), Box<dyn Error>> {
1637 skip_if_unsupported!(kernel, test_name);
1638
1639 let file_path = "src/data/2018-09-01-2024-Bitfinex_Spot-4h.csv";
1640 let candles = read_candles_from_csv(file_path)?;
1641
1642 let test_anchors = vec!["1m", "5m", "15m", "30m", "1h", "4h", "1d", "3d"];
1643 let test_sources = vec!["close", "open", "high", "low", "hl2", "hlc3", "ohlc4"];
1644
1645 for anchor in test_anchors {
1646 for source in &test_sources {
1647 let params = VwapParams {
1648 anchor: Some(anchor.to_string()),
1649 };
1650 let input = VwapInput::from_candles(&candles, source, params);
1651 let output = vwap_with_kernel(&input, kernel)?;
1652
1653 for (i, &val) in output.values.iter().enumerate() {
1654 if val.is_nan() {
1655 continue;
1656 }
1657
1658 let bits = val.to_bits();
1659
1660 if bits == 0x11111111_11111111 {
1661 panic!(
1662 "[{}] Found alloc_with_nan_prefix poison value {} (0x{:016X}) at index {} (anchor={}, source={})",
1663 test_name, val, bits, i, anchor, source
1664 );
1665 }
1666
1667 if bits == 0x22222222_22222222 {
1668 panic!(
1669 "[{}] Found init_matrix_prefixes poison value {} (0x{:016X}) at index {} (anchor={}, source={})",
1670 test_name, val, bits, i, anchor, source
1671 );
1672 }
1673
1674 if bits == 0x33333333_33333333 {
1675 panic!(
1676 "[{}] Found make_uninit_matrix poison value {} (0x{:016X}) at index {} (anchor={}, source={})",
1677 test_name, val, bits, i, anchor, source
1678 );
1679 }
1680 }
1681 }
1682 }
1683
1684 Ok(())
1685 }
1686
1687 #[cfg(not(debug_assertions))]
1688 fn check_vwap_no_poison(_test_name: &str, _kernel: Kernel) -> Result<(), Box<dyn Error>> {
1689 Ok(())
1690 }
1691
1692 #[cfg(feature = "proptest")]
1693 fn check_vwap_property(
1694 test_name: &str,
1695 kernel: Kernel,
1696 ) -> Result<(), Box<dyn std::error::Error>> {
1697 use proptest::prelude::*;
1698 skip_if_unsupported!(kernel, test_name);
1699
1700 let file_path = "src/data/2018-09-01-2024-Bitfinex_Spot-4h.csv";
1701 let candles = read_candles_from_csv(file_path)?;
1702
1703 let timestamps = candles.get_timestamp().unwrap();
1704 let volumes = candles.select_candle_field("volume").unwrap();
1705 let close_prices = &candles.close;
1706 let high_prices = &candles.high;
1707 let low_prices = &candles.low;
1708
1709 let anchor_periods = vec!["30m", "1h", "4h", "12h", "1d", "2d", "3d"];
1710
1711 let strat = (
1712 0usize..anchor_periods.len(),
1713 0usize..timestamps.len().saturating_sub(200),
1714 100usize..=200,
1715 );
1716
1717 proptest::test_runner::TestRunner::default()
1718 .run(&strat, |(anchor_idx, start_idx, slice_len)| {
1719 let end_idx = (start_idx + slice_len).min(timestamps.len());
1720 if end_idx <= start_idx || end_idx - start_idx < 10 {
1721 return Ok(());
1722 }
1723
1724 let anchor = anchor_periods[anchor_idx];
1725 let ts_slice = ×tamps[start_idx..end_idx];
1726 let vol_slice = &volumes[start_idx..end_idx];
1727 let price_slice = &close_prices[start_idx..end_idx];
1728 let high_slice = &high_prices[start_idx..end_idx];
1729 let low_slice = &low_prices[start_idx..end_idx];
1730
1731 let params = VwapParams {
1732 anchor: Some(anchor.to_string()),
1733 };
1734 let input = VwapInput::from_slice(ts_slice, vol_slice, price_slice, params.clone());
1735
1736 let result = vwap_with_kernel(&input, kernel);
1737
1738 let scalar_input =
1739 VwapInput::from_slice(ts_slice, vol_slice, price_slice, params.clone());
1740 let scalar_result = vwap_with_kernel(&scalar_input, Kernel::Scalar);
1741
1742 match (result, scalar_result) {
1743 (Ok(VwapOutput { values: out }), Ok(VwapOutput { values: ref_out })) => {
1744 prop_assert_eq!(out.len(), price_slice.len());
1745 prop_assert_eq!(ref_out.len(), price_slice.len());
1746
1747 let (count, unit_char) = parse_anchor(anchor).unwrap();
1748
1749 for i in 0..out.len() {
1750 let y = out[i];
1751 let r = ref_out[i];
1752
1753 if y.is_finite() && r.is_finite() {
1754 let y_bits = y.to_bits();
1755 let r_bits = r.to_bits();
1756 let ulp_diff: u64 = y_bits.abs_diff(r_bits);
1757 prop_assert!(
1758 (y - r).abs() <= 1e-9 || ulp_diff <= 5,
1759 "Kernel mismatch at {}: {} vs {} (ULP={})",
1760 i,
1761 y,
1762 r,
1763 ulp_diff
1764 );
1765 } else if y.is_nan() && r.is_nan() {
1766 continue;
1767 } else {
1768 prop_assert_eq!(
1769 y.is_nan(),
1770 r.is_nan(),
1771 "NaN mismatch at {}: {} vs {}",
1772 i,
1773 y,
1774 r
1775 );
1776 }
1777
1778 if y.is_finite() && vol_slice[i] > 0.0 && price_slice[i] > 0.0 {
1779 prop_assert!(
1780 y > 0.0,
1781 "VWAP should be positive at {} for positive price {} and volume {}",
1782 i, price_slice[i], vol_slice[i]
1783 );
1784 }
1785 }
1786
1787 let mut current_group_id = -1_i64;
1788 let mut group_start = 0;
1789
1790 for i in 0..ts_slice.len() {
1791 let ts_ms = ts_slice[i];
1792 let group_id = match unit_char {
1793 'm' => ts_ms / ((count as i64) * 60_000),
1794 'h' => ts_ms / ((count as i64) * 3_600_000),
1795 'd' => ts_ms / ((count as i64) * 86_400_000),
1796 'M' => floor_to_month(ts_ms, count).unwrap_or(-1),
1797 _ => -1,
1798 };
1799
1800 if group_id != current_group_id {
1801 if current_group_id != -1 && i > group_start {
1802 let mut vol_sum = 0.0;
1803 let mut vol_price_sum = 0.0;
1804 let mut min_price = f64::INFINITY;
1805 let mut max_price = f64::NEG_INFINITY;
1806
1807 for j in group_start..i {
1808 vol_sum += vol_slice[j];
1809 vol_price_sum += vol_slice[j] * price_slice[j];
1810 min_price = min_price.min(low_slice[j]);
1811 max_price = max_price.max(high_slice[j]);
1812 }
1813
1814 if vol_sum > 0.0 {
1815 let expected_vwap = vol_price_sum / vol_sum;
1816 let actual_vwap = out[i - 1];
1817
1818 prop_assert!(
1819 (actual_vwap - expected_vwap).abs() < 1e-6,
1820 "VWAP formula mismatch at group ending at {}: {} vs expected {}",
1821 i - 1, actual_vwap, expected_vwap
1822 );
1823
1824 prop_assert!(
1825 actual_vwap >= min_price - 1e-9
1826 && actual_vwap <= max_price + 1e-9,
1827 "VWAP {} outside price bounds [{}, {}] at {}",
1828 actual_vwap,
1829 min_price,
1830 max_price,
1831 i - 1
1832 );
1833 }
1834 }
1835
1836 current_group_id = group_id;
1837 group_start = i;
1838 }
1839 }
1840
1841 if price_slice.len() <= 50 {
1842 let mut stream = VwapStream::try_new(params).unwrap();
1843 let mut stream_values = Vec::with_capacity(price_slice.len());
1844
1845 for i in 0..price_slice.len() {
1846 match stream.update(ts_slice[i], price_slice[i], vol_slice[i]) {
1847 Some(val) => stream_values.push(val),
1848 None => stream_values.push(f64::NAN),
1849 }
1850 }
1851
1852 for (i, (&batch_val, &stream_val)) in
1853 out.iter().zip(stream_values.iter()).enumerate()
1854 {
1855 if batch_val.is_nan() && stream_val.is_nan() {
1856 continue;
1857 }
1858 if batch_val.is_finite() && stream_val.is_finite() {
1859 prop_assert!(
1860 (batch_val - stream_val).abs() < 1e-9,
1861 "Streaming mismatch at {}: batch={} vs stream={}",
1862 i,
1863 batch_val,
1864 stream_val
1865 );
1866 }
1867 }
1868 }
1869
1870 {
1871 let base_ts = 1609459200000_i64;
1872 let test_ts = vec![base_ts, base_ts + 3600000, base_ts + 7200000];
1873 let test_prices = vec![100.0, 200.0, 300.0];
1874 let test_volumes = vec![1.0, 2.0, 3.0];
1875
1876 let test_params = VwapParams {
1877 anchor: Some("1d".to_string()),
1878 };
1879 let test_input = VwapInput::from_slice(
1880 &test_ts,
1881 &test_volumes,
1882 &test_prices,
1883 test_params,
1884 );
1885
1886 if let Ok(VwapOutput { values: test_out }) =
1887 vwap_with_kernel(&test_input, kernel)
1888 {
1889 if test_out.len() >= 3 {
1890 if test_out[0].is_finite() {
1891 prop_assert!(
1892 (test_out[0] - 100.0).abs() < 1e-9,
1893 "VWAP at index 0 should be 100, got {}",
1894 test_out[0]
1895 );
1896 }
1897 if test_out[1].is_finite() {
1898 let expected_1 = 500.0 / 3.0;
1899 prop_assert!(
1900 (test_out[1] - expected_1).abs() < 1e-9,
1901 "VWAP at index 1 should be {}, got {}",
1902 expected_1,
1903 test_out[1]
1904 );
1905 }
1906 if test_out[2].is_finite() {
1907 let expected_2 = 1400.0 / 6.0;
1908 prop_assert!(
1909 (test_out[2] - expected_2).abs() < 1e-9,
1910 "VWAP at index 2 should be {}, got {}",
1911 expected_2,
1912 test_out[2]
1913 );
1914 }
1915 }
1916 }
1917 }
1918 }
1919 (Err(e1), Err(e2)) => {
1920 prop_assert_eq!(
1921 std::mem::discriminant(&e1),
1922 std::mem::discriminant(&e2),
1923 "Different error types: {:?} vs {:?}",
1924 e1,
1925 e2
1926 );
1927 }
1928 _ => {
1929 prop_assert!(
1930 false,
1931 "Kernel consistency failure: one succeeded, one failed"
1932 );
1933 }
1934 }
1935
1936 Ok(())
1937 })
1938 .unwrap();
1939
1940 Ok(())
1941 }
1942
1943 generate_all_vwap_tests!(
1944 check_vwap_partial_params,
1945 check_vwap_accuracy,
1946 check_vwap_candles_plus_prices,
1947 check_vwap_anchor_parsing_error,
1948 check_vwap_slice_data_reinput,
1949 check_vwap_nan_handling,
1950 check_vwap_with_default_candles,
1951 check_vwap_with_default_params,
1952 check_vwap_no_poison
1953 );
1954
1955 #[cfg(feature = "proptest")]
1956 generate_all_vwap_tests!(check_vwap_property);
1957 fn check_batch_default_row(test: &str, kernel: Kernel) -> Result<(), Box<dyn Error>> {
1958 skip_if_unsupported!(kernel, test);
1959 let file = "src/data/2018-09-01-2024-Bitfinex_Spot-4h.csv";
1960 let c = read_candles_from_csv(file)?;
1961 let timestamps = c.get_timestamp().unwrap();
1962 let prices = c.get_calculated_field("hlc3").unwrap();
1963 let volumes = c.select_candle_field("volume").unwrap();
1964
1965 let output = VwapBatchBuilder::new()
1966 .kernel(kernel)
1967 .apply_slice(timestamps, volumes, prices)?;
1968
1969 let def = VwapParams::default();
1970 let row = output
1971 .combos
1972 .iter()
1973 .position(|p| p.anchor == def.anchor)
1974 .expect("default row missing");
1975 let row_values = &output.values[row * output.cols..(row + 1) * output.cols];
1976
1977 assert_eq!(row_values.len(), c.close.len());
1978
1979 let expected = [
1980 59353.05963230107,
1981 59330.15815713043,
1982 59289.94649532547,
1983 59274.6155462414,
1984 58730.0,
1985 ];
1986 let start = row_values.len() - 5;
1987 for (i, &v) in row_values[start..].iter().enumerate() {
1988 assert!(
1989 (v - expected[i]).abs() < 1e-5,
1990 "[{test}] default-row mismatch at idx {i}: {v} vs {expected:?}"
1991 );
1992 }
1993 Ok(())
1994 }
1995
1996 fn check_batch_anchor_grid(test: &str, kernel: Kernel) -> Result<(), Box<dyn Error>> {
1997 skip_if_unsupported!(kernel, test);
1998 let file = "src/data/2018-09-01-2024-Bitfinex_Spot-4h.csv";
1999 let c = read_candles_from_csv(file)?;
2000 let timestamps = c.get_timestamp().unwrap();
2001 let prices = c.get_calculated_field("hlc3").unwrap();
2002 let volumes = c.select_candle_field("volume").unwrap();
2003
2004 let batch = VwapBatchBuilder::new()
2005 .kernel(kernel)
2006 .anchor_range("1d", "3d", 1)
2007 .apply_slice(timestamps, volumes, prices)?;
2008
2009 assert_eq!(batch.cols, c.close.len());
2010 assert!(batch.rows >= 1 && batch.rows <= 3);
2011
2012 let anchors: Vec<_> = batch
2013 .combos
2014 .iter()
2015 .map(|p| p.anchor.clone().unwrap())
2016 .collect();
2017 assert_eq!(
2018 anchors,
2019 vec!["1d".to_string(), "2d".to_string(), "3d".to_string()]
2020 );
2021 Ok(())
2022 }
2023
2024 macro_rules! gen_batch_tests {
2025 ($fn_name:ident) => {
2026 paste::paste! {
2027 #[test] fn [<$fn_name _scalar>]() {
2028 let _ = $fn_name(stringify!([<$fn_name _scalar>]), Kernel::ScalarBatch);
2029 }
2030 #[cfg(all(feature = "nightly-avx", target_arch = "x86_64"))]
2031 #[test] fn [<$fn_name _avx2>]() {
2032 let _ = $fn_name(stringify!([<$fn_name _avx2>]), Kernel::Avx2Batch);
2033 }
2034 #[cfg(all(feature = "nightly-avx", target_arch = "x86_64"))]
2035 #[test] fn [<$fn_name _avx512>]() {
2036 let _ = $fn_name(stringify!([<$fn_name _avx512>]), Kernel::Avx512Batch);
2037 }
2038 #[test] fn [<$fn_name _auto_detect>]() {
2039 let _ = $fn_name(stringify!([<$fn_name _auto_detect>]), Kernel::Auto);
2040 }
2041 }
2042 };
2043 }
2044
2045 #[cfg(debug_assertions)]
2046 fn check_batch_no_poison(test: &str, kernel: Kernel) -> Result<(), Box<dyn Error>> {
2047 skip_if_unsupported!(kernel, test);
2048
2049 let file = "src/data/2018-09-01-2024-Bitfinex_Spot-4h.csv";
2050 let c = read_candles_from_csv(file)?;
2051 let timestamps = c.get_timestamp().unwrap();
2052 let volumes = c.select_candle_field("volume").unwrap();
2053
2054 let anchor_ranges = vec![
2055 ("1m", "5m", 1),
2056 ("1h", "6h", 1),
2057 ("1d", "5d", 1),
2058 ("7d", "14d", 7),
2059 ];
2060
2061 let test_sources = vec!["close", "open", "high", "low", "hl2", "hlc3", "ohlc4"];
2062
2063 for (start, end, step) in anchor_ranges {
2064 for source in &test_sources {
2065 let prices = match *source {
2066 "close" => c.select_candle_field("close").unwrap(),
2067 "open" => c.select_candle_field("open").unwrap(),
2068 "high" => c.select_candle_field("high").unwrap(),
2069 "low" => c.select_candle_field("low").unwrap(),
2070 _ => c.get_calculated_field(source).unwrap(),
2071 };
2072
2073 let output = VwapBatchBuilder::new()
2074 .kernel(kernel)
2075 .anchor_range(start, end, step)
2076 .apply_slice(timestamps, volumes, prices)?;
2077
2078 for (idx, &val) in output.values.iter().enumerate() {
2079 if val.is_nan() {
2080 continue;
2081 }
2082
2083 let bits = val.to_bits();
2084 let row = idx / output.cols;
2085 let col = idx % output.cols;
2086
2087 if bits == 0x11111111_11111111 {
2088 panic!(
2089 "[{}] Found alloc_with_nan_prefix poison value {} (0x{:016X}) at row {} col {} (flat index {}) with anchor_range({},{},{}) source={}",
2090 test, val, bits, row, col, idx, start, end, step, source
2091 );
2092 }
2093
2094 if bits == 0x22222222_22222222 {
2095 panic!(
2096 "[{}] Found init_matrix_prefixes poison value {} (0x{:016X}) at row {} col {} (flat index {}) with anchor_range({},{},{}) source={}",
2097 test, val, bits, row, col, idx, start, end, step, source
2098 );
2099 }
2100
2101 if bits == 0x33333333_33333333 {
2102 panic!(
2103 "[{}] Found make_uninit_matrix poison value {} (0x{:016X}) at row {} col {} (flat index {}) with anchor_range({},{},{}) source={}",
2104 test, val, bits, row, col, idx, start, end, step, source
2105 );
2106 }
2107 }
2108 }
2109 }
2110
2111 Ok(())
2112 }
2113
2114 #[cfg(not(debug_assertions))]
2115 fn check_batch_no_poison(_test: &str, _kernel: Kernel) -> Result<(), Box<dyn Error>> {
2116 Ok(())
2117 }
2118
2119 gen_batch_tests!(check_batch_default_row);
2120 gen_batch_tests!(check_batch_anchor_grid);
2121 gen_batch_tests!(check_batch_no_poison);
2122}
2123
2124#[cfg(feature = "python")]
2125#[pyfunction(name = "vwap")]
2126#[pyo3(signature = (timestamps, volumes, prices, anchor=None, kernel=None))]
2127
2128pub fn vwap_py<'py>(
2129 py: Python<'py>,
2130 timestamps: numpy::PyReadonlyArray1<'py, i64>,
2131 volumes: numpy::PyReadonlyArray1<'py, f64>,
2132 prices: numpy::PyReadonlyArray1<'py, f64>,
2133 anchor: Option<&str>,
2134 kernel: Option<&str>,
2135) -> PyResult<Bound<'py, numpy::PyArray1<f64>>> {
2136 use numpy::{IntoPyArray, PyArrayMethods};
2137
2138 let ts_slice = timestamps.as_slice()?;
2139 let vol_slice = volumes.as_slice()?;
2140 let price_slice = prices.as_slice()?;
2141
2142 let kern = crate::utilities::kernel_validation::validate_kernel(kernel, false)?;
2143
2144 let params = VwapParams {
2145 anchor: anchor.map(|s| s.to_string()),
2146 };
2147 let vwap_in = VwapInput::from_slice(ts_slice, vol_slice, price_slice, params);
2148
2149 let result_vec: Vec<f64> = py
2150 .allow_threads(|| vwap_with_kernel(&vwap_in, kern).map(|o| o.values))
2151 .map_err(|e| PyValueError::new_err(e.to_string()))?;
2152
2153 Ok(result_vec.into_pyarray(py))
2154}
2155
2156#[cfg(feature = "python")]
2157#[pyclass(name = "VwapStream")]
2158pub struct VwapStreamPy {
2159 stream: VwapStream,
2160}
2161
2162#[cfg(feature = "python")]
2163#[pymethods]
2164impl VwapStreamPy {
2165 #[new]
2166 fn new(anchor: Option<&str>) -> PyResult<Self> {
2167 let params = VwapParams {
2168 anchor: anchor.map(|s| s.to_string()),
2169 };
2170 let stream =
2171 VwapStream::try_new(params).map_err(|e| PyValueError::new_err(e.to_string()))?;
2172 Ok(VwapStreamPy { stream })
2173 }
2174
2175 fn update(&mut self, timestamp: i64, price: f64, volume: f64) -> Option<f64> {
2176 self.stream.update(timestamp, price, volume)
2177 }
2178}
2179
2180#[cfg(feature = "python")]
2181#[pyfunction(name = "vwap_batch")]
2182#[pyo3(signature = (timestamps, volumes, prices, anchor_range, kernel=None))]
2183
2184pub fn vwap_batch_py<'py>(
2185 py: Python<'py>,
2186 timestamps: numpy::PyReadonlyArray1<'py, i64>,
2187 volumes: numpy::PyReadonlyArray1<'py, f64>,
2188 prices: numpy::PyReadonlyArray1<'py, f64>,
2189 anchor_range: (String, String, u32),
2190 kernel: Option<&str>,
2191) -> PyResult<Bound<'py, pyo3::types::PyDict>> {
2192 use numpy::{IntoPyArray, PyArray1, PyArrayMethods};
2193 use pyo3::types::PyDict;
2194
2195 let ts_slice = timestamps.as_slice()?;
2196 let vol_slice = volumes.as_slice()?;
2197 let price_slice = prices.as_slice()?;
2198
2199 let kern = crate::utilities::kernel_validation::validate_kernel(kernel, true)?;
2200
2201 let sweep = VwapBatchRange {
2202 anchor: (anchor_range.0, anchor_range.1, anchor_range.2),
2203 };
2204
2205 let combos = expand_grid_vwap(&sweep);
2206 let rows = combos.len();
2207 let cols = price_slice.len();
2208
2209 let out_arr = unsafe { PyArray1::<f64>::new(py, [rows * cols], false) };
2210 let slice_out = unsafe { out_arr.as_slice_mut()? };
2211
2212 let combos = py
2213 .allow_threads(|| {
2214 let kernel = match kern {
2215 Kernel::Auto => detect_best_batch_kernel(),
2216 k => k,
2217 };
2218 let simd = match kernel {
2219 Kernel::Avx512Batch => Kernel::Avx512,
2220 Kernel::Avx2Batch => Kernel::Avx2,
2221 Kernel::ScalarBatch => Kernel::Scalar,
2222 _ => unreachable!(),
2223 };
2224 vwap_batch_inner_into(
2225 ts_slice,
2226 vol_slice,
2227 price_slice,
2228 &sweep,
2229 simd,
2230 true,
2231 slice_out,
2232 )
2233 })
2234 .map_err(|e| PyValueError::new_err(e.to_string()))?;
2235
2236 let dict = PyDict::new(py);
2237 dict.set_item("values", out_arr.reshape((rows, cols))?)?;
2238
2239 let anchors_list = PyList::new(
2240 py,
2241 combos
2242 .iter()
2243 .map(|p| p.anchor.clone().unwrap_or_else(|| "1d".to_string())),
2244 )?;
2245 dict.set_item("anchors", anchors_list)?;
2246
2247 Ok(dict)
2248}
2249
2250#[cfg(all(feature = "python", feature = "cuda"))]
2251#[pyfunction(name = "vwap_cuda_batch_dev")]
2252#[pyo3(signature = (timestamps, volumes, prices, anchor_range, device_id=0))]
2253pub fn vwap_cuda_batch_dev_py(
2254 py: Python<'_>,
2255 timestamps: numpy::PyReadonlyArray1<'_, i64>,
2256 volumes: numpy::PyReadonlyArray1<'_, f64>,
2257 prices: numpy::PyReadonlyArray1<'_, f64>,
2258 anchor_range: (String, String, u32),
2259 device_id: usize,
2260) -> PyResult<DeviceArrayF32VwapPy> {
2261 use numpy::PyArrayMethods;
2262
2263 if !cuda_available() {
2264 return Err(PyValueError::new_err("CUDA not available"));
2265 }
2266
2267 let ts_slice = timestamps.as_slice()?;
2268 let vol_slice = volumes.as_slice()?;
2269 let price_slice = prices.as_slice()?;
2270
2271 if ts_slice.len() != vol_slice.len() || vol_slice.len() != price_slice.len() {
2272 return Err(PyValueError::new_err(
2273 "timestamps, volumes, and prices must share the same length",
2274 ));
2275 }
2276
2277 let (start, end, step) = anchor_range;
2278 let sweep = VwapBatchRange {
2279 anchor: (start, end, step),
2280 };
2281
2282 let (inner, dev) = py
2283 .allow_threads(
2284 || -> Result<_, crate::cuda::moving_averages::vwap_wrapper::CudaVwapError> {
2285 let cuda = CudaVwap::new(device_id)?;
2286 let arr =
2287 cuda.vwap_batch_dev_retaining_ctx(ts_slice, vol_slice, price_slice, &sweep)?;
2288 Ok((arr, cuda.device_id()))
2289 },
2290 )
2291 .map_err(|e| PyValueError::new_err(e.to_string()))?;
2292
2293 Ok(DeviceArrayF32VwapPy {
2294 inner: Some(inner),
2295 device_id: dev,
2296 })
2297}
2298
2299#[cfg(all(feature = "python", feature = "cuda"))]
2300#[pyfunction(name = "vwap_cuda_many_series_one_param_dev")]
2301#[pyo3(signature = (timestamps, prices_tm, volumes_tm, anchor, device_id=0))]
2302pub fn vwap_cuda_many_series_one_param_dev_py(
2303 py: Python<'_>,
2304 timestamps: numpy::PyReadonlyArray1<'_, i64>,
2305 prices_tm: numpy::PyReadonlyArray2<'_, f64>,
2306 volumes_tm: numpy::PyReadonlyArray2<'_, f64>,
2307 anchor: String,
2308 device_id: usize,
2309) -> PyResult<DeviceArrayF32VwapPy> {
2310 use numpy::PyArrayMethods;
2311 use numpy::PyUntypedArrayMethods;
2312
2313 if !cuda_available() {
2314 return Err(PyValueError::new_err("CUDA not available"));
2315 }
2316
2317 let ts_slice = timestamps.as_slice()?;
2318 let p_shape = prices_tm.shape();
2319 let v_shape = volumes_tm.shape();
2320 if p_shape != v_shape {
2321 return Err(PyValueError::new_err(
2322 "prices_tm and volumes_tm shapes must match",
2323 ));
2324 }
2325 let rows = p_shape[0];
2326 let cols = p_shape[1];
2327 if ts_slice.len() != rows {
2328 return Err(PyValueError::new_err(
2329 "timestamps length must equal rows of matrices",
2330 ));
2331 }
2332 let prices_flat = prices_tm.as_slice()?;
2333 let volumes_flat = volumes_tm.as_slice()?;
2334
2335 let (inner, dev) = py
2336 .allow_threads(
2337 || -> Result<_, crate::cuda::moving_averages::vwap_wrapper::CudaVwapError> {
2338 let cuda = CudaVwap::new(device_id)?;
2339 let arr = cuda.vwap_many_series_one_param_time_major_dev_retaining_ctx(
2340 ts_slice,
2341 volumes_flat,
2342 prices_flat,
2343 cols,
2344 rows,
2345 &anchor,
2346 )?;
2347 Ok((arr, cuda.device_id()))
2348 },
2349 )
2350 .map_err(|e| PyValueError::new_err(e.to_string()))?;
2351
2352 Ok(DeviceArrayF32VwapPy {
2353 inner: Some(inner),
2354 device_id: dev,
2355 })
2356}
2357
2358#[cfg(all(feature = "python", feature = "cuda"))]
2359#[pyclass(module = "ta_indicators.cuda", name = "DeviceArrayF32Vwap", unsendable)]
2360pub struct DeviceArrayF32VwapPy {
2361 pub(crate) inner: Option<VwapDeviceArrayF32>,
2362 pub(crate) device_id: u32,
2363}
2364
2365#[cfg(all(feature = "python", feature = "cuda"))]
2366#[pymethods]
2367impl DeviceArrayF32VwapPy {
2368 #[getter]
2369 fn __cuda_array_interface__<'py>(&self, py: Python<'py>) -> PyResult<Bound<'py, PyDict>> {
2370 let inner = self
2371 .inner
2372 .as_ref()
2373 .ok_or_else(|| PyValueError::new_err("buffer already exported via __dlpack__"))?;
2374 let d = PyDict::new(py);
2375 d.set_item("shape", (inner.rows, inner.cols))?;
2376 d.set_item("typestr", "<f4")?;
2377 d.set_item(
2378 "strides",
2379 (
2380 inner.cols * std::mem::size_of::<f32>(),
2381 std::mem::size_of::<f32>(),
2382 ),
2383 )?;
2384 let ptr_val: usize = if inner.rows == 0 || inner.cols == 0 {
2385 0
2386 } else {
2387 inner.buf.as_device_ptr().as_raw() as usize
2388 };
2389 d.set_item("data", (ptr_val, false))?;
2390
2391 d.set_item("version", 3)?;
2392 Ok(d)
2393 }
2394
2395 fn __dlpack_device__(&self) -> PyResult<(i32, i32)> {
2396 Ok((2, self.device_id as i32))
2397 }
2398
2399 #[pyo3(signature = (stream=None, max_version=None, dl_device=None, copy=None))]
2400 fn __dlpack__<'py>(
2401 &mut self,
2402 py: Python<'py>,
2403 stream: Option<PyObject>,
2404 max_version: Option<PyObject>,
2405 dl_device: Option<PyObject>,
2406 copy: Option<PyObject>,
2407 ) -> PyResult<PyObject> {
2408 let (kdl, alloc_dev) = self.__dlpack_device__()?;
2409 if let Some(dev_obj) = dl_device.as_ref() {
2410 if let Ok((dev_ty, dev_id)) = dev_obj.extract::<(i32, i32)>(py) {
2411 if dev_ty != kdl || dev_id != alloc_dev {
2412 let wants_copy = copy
2413 .as_ref()
2414 .and_then(|c| c.extract::<bool>(py).ok())
2415 .unwrap_or(false);
2416 if wants_copy {
2417 return Err(PyValueError::new_err(
2418 "device copy not implemented for __dlpack__",
2419 ));
2420 } else {
2421 return Err(PyValueError::new_err("dl_device mismatch for __dlpack__"));
2422 }
2423 }
2424 }
2425 }
2426 let _ = stream;
2427
2428 let inner = self
2429 .inner
2430 .take()
2431 .ok_or_else(|| PyValueError::new_err("buffer already exported via __dlpack__"))?;
2432
2433 let rows = inner.rows;
2434 let cols = inner.cols;
2435 let buf = inner.buf;
2436
2437 let max_version_bound = max_version.map(|obj| obj.into_bound(py));
2438 export_f32_cuda_dlpack_2d(py, buf, rows, cols, alloc_dev, max_version_bound)
2439 }
2440}
2441
2442#[cfg(all(target_arch = "wasm32", feature = "wasm"))]
2443#[wasm_bindgen]
2444pub fn vwap_js(
2445 timestamps: &[f64],
2446 volumes: &[f64],
2447 prices: &[f64],
2448 anchor: Option<String>,
2449 kernel: Option<String>,
2450) -> Result<Vec<f64>, JsValue> {
2451 let ts_i64: Vec<i64> = timestamps
2452 .iter()
2453 .map(|&t| {
2454 if t.is_nan() || t.is_infinite() || t < 0.0 {
2455 return Err(JsValue::from_str(&format!("Invalid timestamp: {}", t)));
2456 }
2457 Ok(t as i64)
2458 })
2459 .collect::<Result<Vec<_>, _>>()?;
2460
2461 let kern = match kernel.as_deref() {
2462 None | Some("auto") => Kernel::Auto,
2463 Some("scalar") => Kernel::Scalar,
2464 Some("scalar_batch") => Kernel::ScalarBatch,
2465 Some(k) => return Err(JsValue::from_str(&format!("Unknown kernel: {}", k))),
2466 };
2467
2468 let params = VwapParams { anchor };
2469 let input = VwapInput::from_slice(&ts_i64, volumes, prices, params);
2470
2471 let mut output = vec![0.0; prices.len()];
2472
2473 vwap_into_slice(&mut output, &input, kern).map_err(|e| JsValue::from_str(&e.to_string()))?;
2474
2475 Ok(output)
2476}
2477
2478#[cfg(all(target_arch = "wasm32", feature = "wasm"))]
2479#[wasm_bindgen]
2480pub fn vwap_into(
2481 timestamps_ptr: *const f64,
2482 volumes_ptr: *const f64,
2483 prices_ptr: *const f64,
2484 out_ptr: *mut f64,
2485 len: usize,
2486 anchor: Option<String>,
2487) -> Result<(), JsValue> {
2488 if timestamps_ptr.is_null()
2489 || volumes_ptr.is_null()
2490 || prices_ptr.is_null()
2491 || out_ptr.is_null()
2492 {
2493 return Err(JsValue::from_str("null pointer passed to vwap_into"));
2494 }
2495
2496 unsafe {
2497 let ts_f64 = std::slice::from_raw_parts(timestamps_ptr, len);
2498 let vols = std::slice::from_raw_parts(volumes_ptr, len);
2499 let pric = std::slice::from_raw_parts(prices_ptr, len);
2500
2501 let mut ts_i64 = Vec::with_capacity(len);
2502 ts_i64.set_len(len);
2503 for i in 0..len {
2504 let t = ts_f64[i];
2505 if !t.is_finite() || t < 0.0 {
2506 return Err(JsValue::from_str("invalid timestamp"));
2507 }
2508 *ts_i64.get_unchecked_mut(i) = t as i64;
2509 }
2510
2511 let params = VwapParams { anchor };
2512 let input = VwapInput::from_slice(&ts_i64, vols, pric, params);
2513
2514 if core::ptr::eq(prices_ptr, out_ptr as *const f64) {
2515 let mut tmp = vec![0.0; len];
2516 vwap_into_slice(&mut tmp, &input, detect_best_kernel())
2517 .map_err(|e| JsValue::from_str(&e.to_string()))?;
2518 let out = std::slice::from_raw_parts_mut(out_ptr, len);
2519 out.copy_from_slice(&tmp);
2520 } else {
2521 let out = std::slice::from_raw_parts_mut(out_ptr, len);
2522 vwap_into_slice(out, &input, detect_best_kernel())
2523 .map_err(|e| JsValue::from_str(&e.to_string()))?;
2524 }
2525 Ok(())
2526 }
2527}
2528
2529#[cfg(all(target_arch = "wasm32", feature = "wasm"))]
2530#[wasm_bindgen]
2531pub fn vwap_alloc(len: usize) -> *mut f64 {
2532 let mut vec = Vec::<f64>::with_capacity(len);
2533 let ptr = vec.as_mut_ptr();
2534 std::mem::forget(vec);
2535 ptr
2536}
2537
2538#[cfg(all(target_arch = "wasm32", feature = "wasm"))]
2539#[wasm_bindgen]
2540pub fn vwap_free(ptr: *mut f64, len: usize) {
2541 if !ptr.is_null() {
2542 unsafe {
2543 let _ = Vec::from_raw_parts(ptr, len, len);
2544 }
2545 }
2546}
2547
2548#[cfg(all(target_arch = "wasm32", feature = "wasm"))]
2549#[derive(Serialize, Deserialize)]
2550pub struct VwapBatchConfig {
2551 pub anchor_range: (String, String, u32),
2552}
2553
2554#[cfg(all(target_arch = "wasm32", feature = "wasm"))]
2555#[derive(Serialize, Deserialize)]
2556pub struct VwapBatchJsOutput {
2557 pub values: Vec<f64>,
2558 pub combos: Vec<VwapParams>,
2559 pub rows: usize,
2560 pub cols: usize,
2561}
2562
2563#[cfg(all(target_arch = "wasm32", feature = "wasm"))]
2564#[wasm_bindgen(js_name = "vwap_batch")]
2565pub fn vwap_batch_unified_js(
2566 timestamps: &[f64],
2567 volumes: &[f64],
2568 prices: &[f64],
2569 start: String,
2570 end: String,
2571 step: u32,
2572) -> Result<JsValue, JsValue> {
2573 let ts_i64: Vec<i64> = timestamps
2574 .iter()
2575 .map(|&t| {
2576 if !t.is_finite() || t < 0.0 {
2577 return Err(JsValue::from_str("invalid timestamp"));
2578 }
2579 Ok(t as i64)
2580 })
2581 .collect::<Result<_, _>>()?;
2582
2583 let sweep = VwapBatchRange {
2584 anchor: (start, end, step),
2585 };
2586
2587 let kernel = match detect_best_kernel() {
2588 Kernel::Auto => Kernel::Scalar,
2589 k => k,
2590 };
2591
2592 let out = vwap_batch_inner(&ts_i64, volumes, prices, &sweep, kernel, false)
2593 .map_err(|e| JsValue::from_str(&e.to_string()))?;
2594
2595 let js = VwapBatchJsOutput {
2596 values: out.values,
2597 combos: out.combos,
2598 rows: out.rows,
2599 cols: out.cols,
2600 };
2601 serde_wasm_bindgen::to_value(&js).map_err(|e| JsValue::from_str(&e.to_string()))
2602}
2603
2604#[cfg(all(target_arch = "wasm32", feature = "wasm"))]
2605#[wasm_bindgen]
2606pub fn vwap_batch_into(
2607 timestamps_ptr: *const f64,
2608 volumes_ptr: *const f64,
2609 prices_ptr: *const f64,
2610 out_ptr: *mut f64,
2611 len: usize,
2612 anchor_start: String,
2613 anchor_end: String,
2614 anchor_step: u32,
2615) -> Result<usize, JsValue> {
2616 if timestamps_ptr.is_null()
2617 || volumes_ptr.is_null()
2618 || prices_ptr.is_null()
2619 || out_ptr.is_null()
2620 {
2621 return Err(JsValue::from_str("null pointer passed to vwap_batch_into"));
2622 }
2623
2624 unsafe {
2625 let timestamps_f64 = std::slice::from_raw_parts(timestamps_ptr, len);
2626 let volumes = std::slice::from_raw_parts(volumes_ptr, len);
2627 let prices = std::slice::from_raw_parts(prices_ptr, len);
2628
2629 let ts_i64: Vec<i64> = timestamps_f64.iter().map(|&t| t as i64).collect();
2630
2631 let sweep = VwapBatchRange {
2632 anchor: (anchor_start, anchor_end, anchor_step),
2633 };
2634
2635 let combos = expand_grid_vwap(&sweep);
2636 let rows = combos.len();
2637 let cols = len;
2638
2639 let out = std::slice::from_raw_parts_mut(out_ptr, rows * cols);
2640
2641 vwap_batch_inner_into(
2642 &ts_i64,
2643 volumes,
2644 prices,
2645 &sweep,
2646 detect_best_kernel(),
2647 false,
2648 out,
2649 )
2650 .map_err(|e| JsValue::from_str(&e.to_string()))?;
2651
2652 Ok(rows)
2653 }
2654}
2655
2656#[cfg(all(target_arch = "wasm32", feature = "wasm"))]
2657#[wasm_bindgen]
2658pub fn vwap_batch_metadata_js(
2659 anchor_start: String,
2660 anchor_end: String,
2661 anchor_step: u32,
2662) -> Result<Vec<String>, JsValue> {
2663 let sweep = VwapBatchRange {
2664 anchor: (anchor_start, anchor_end, anchor_step),
2665 };
2666
2667 let combos = expand_grid_vwap(&sweep);
2668 let metadata: Vec<String> = combos
2669 .iter()
2670 .map(|c| c.anchor.clone().unwrap_or_else(|| "1d".to_string()))
2671 .collect();
2672
2673 Ok(metadata)
2674}
2675
2676#[cfg(all(target_arch = "wasm32", feature = "wasm"))]
2677#[wasm_bindgen]
2678#[deprecated(
2679 since = "1.0.0",
2680 note = "For anchor state reuse patterns, use the fast/unsafe API with persistent buffers"
2681)]
2682pub struct VwapContext {
2683 anchor: String,
2684}
2685
2686#[cfg(all(target_arch = "wasm32", feature = "wasm"))]
2687#[wasm_bindgen]
2688#[allow(deprecated)]
2689impl VwapContext {
2690 #[wasm_bindgen(constructor)]
2691 #[deprecated(
2692 since = "1.0.0",
2693 note = "For anchor state reuse patterns, use the fast/unsafe API with persistent buffers"
2694 )]
2695 pub fn new(anchor: String) -> Result<VwapContext, JsValue> {
2696 if anchor.is_empty() {
2697 return Err(JsValue::from_str("Invalid anchor: empty string"));
2698 }
2699
2700 let _ = parse_anchor(&anchor).map_err(|e| JsValue::from_str(&e.to_string()))?;
2701
2702 Ok(VwapContext { anchor })
2703 }
2704
2705 pub fn update_into(
2706 &self,
2707 timestamps_ptr: *const f64,
2708 volumes_ptr: *const f64,
2709 prices_ptr: *const f64,
2710 out_ptr: *mut f64,
2711 len: usize,
2712 ) -> Result<(), JsValue> {
2713 if timestamps_ptr.is_null()
2714 || volumes_ptr.is_null()
2715 || prices_ptr.is_null()
2716 || out_ptr.is_null()
2717 {
2718 return Err(JsValue::from_str("null pointer passed to update_into"));
2719 }
2720
2721 unsafe {
2722 let timestamps_f64 = std::slice::from_raw_parts(timestamps_ptr, len);
2723 let volumes = std::slice::from_raw_parts(volumes_ptr, len);
2724 let prices = std::slice::from_raw_parts(prices_ptr, len);
2725
2726 let ts_i64: Vec<i64> = timestamps_f64
2727 .iter()
2728 .map(|&t| {
2729 if t.is_nan() || t.is_infinite() || t < 0.0 {
2730 return Err(JsValue::from_str(&format!("Invalid timestamp: {}", t)));
2731 }
2732 Ok(t as i64)
2733 })
2734 .collect::<Result<Vec<_>, _>>()?;
2735
2736 let params = VwapParams {
2737 anchor: Some(self.anchor.clone()),
2738 };
2739 let input = VwapInput::from_slice(&ts_i64, volumes, prices, params);
2740
2741 let out = std::slice::from_raw_parts_mut(out_ptr, len);
2742 vwap_into_slice(out, &input, Kernel::Auto)
2743 .map_err(|e| JsValue::from_str(&e.to_string()))?;
2744 }
2745
2746 Ok(())
2747 }
2748}