Skip to main content

wickra_data/
csv.rs

//! Stream OHLCV candles out of a CSV file.
//!
//! The reader is generic over the column layout, but ships with a sensible
//! default ("timestamp,open,high,low,close,volume") that matches the standard
//! Binance / Yahoo Finance / Kaggle dataset format.
//!
//! The reader is defensive about real-world files: a leading UTF-8 byte-order
//! mark is stripped, surrounding whitespace is trimmed from every field, and a
//! file whose header does not name the required columns is rejected with a
//! clear [`Error::Malformed`] instead of silently consuming its first data row.
11
12use std::path::Path;
13
14use serde::Deserialize;
15
16use crate::error::{Error, Result};
17use wickra_core::Candle;
18
/// The six header names the default OHLCV layout depends on.
///
/// A CSV header has to list all of them; columns beyond these six are not
/// looked at. Names are compared case-sensitively, since `serde` resolves each
/// [`DefaultRow`] field by looking up a header with exactly that name.
const REQUIRED_COLUMNS: [&str; 6] = [
    "timestamp",
    "open",
    "high",
    "low",
    "close",
    "volume",
];
24
25/// Default OHLCV CSV row layout.
26///
27/// The timestamp is parsed as an `i64`; if your file ships an RFC3339 / ISO8601
28/// string instead, use [`CandleReader::with_timestamp_parser`].
29#[derive(Debug, Clone, Deserialize)]
30pub struct DefaultRow {
31    pub timestamp: i64,
32    pub open: f64,
33    pub high: f64,
34    pub low: f64,
35    pub close: f64,
36    pub volume: f64,
37}
38
39impl DefaultRow {
40    fn into_candle(self) -> Result<Candle> {
41        Candle::new(
42            self.open,
43            self.high,
44            self.low,
45            self.close,
46            self.volume,
47            self.timestamp,
48        )
49        .map_err(Error::from)
50    }
51}
52
/// A [`std::io::Read`] adapter that transparently skips a leading UTF-8
/// byte-order mark.
///
/// Spreadsheet exporters — Excel in particular — prefix CSV files with the
/// three-byte UTF-8 BOM `EF BB BF`. Left in place it becomes part of the first
/// header name (`\u{feff}timestamp`), which then fails to match the
/// `timestamp` column. This adapter drops the BOM before the CSV parser ever
/// sees it; files without a BOM pass through untouched.
///
/// BOM detection is resumable: if the wrapped reader fails part-way through
/// the initial probe, the bytes read so far are kept and the probe continues
/// on the next `read` call, so a transient error never loses data or lets a
/// BOM slip through.
#[derive(Debug)]
pub struct BomStripReader<R> {
    inner: R,
    /// Whether the BOM decision has been made. It only becomes `true` once
    /// the probe has run to completion.
    checked: bool,
    /// While probing: the leading bytes read so far. Afterwards: probed bytes
    /// that turned out *not* to be a BOM and must still be handed to the
    /// consumer (empty when a BOM was found and discarded).
    leftover: Vec<u8>,
    /// Index of the next `leftover` byte to replay.
    leftover_pos: usize,
}

impl<R: std::io::Read> BomStripReader<R> {
    /// Wrap `inner`, stripping a leading UTF-8 BOM on the first read.
    pub fn new(inner: R) -> Self {
        Self {
            inner,
            checked: false,
            leftover: Vec::new(),
            leftover_pos: 0,
        }
    }

    /// Probe the start of the stream for a BOM, unless that was already done.
    ///
    /// Reads at most three bytes, stopping early at end of input or as soon as
    /// the bytes seen can no longer be a BOM prefix. A complete BOM is
    /// discarded; anything else stays in `leftover` for replay.
    ///
    /// # Errors
    /// Returns any non-`Interrupted` error from the wrapped reader. The probe
    /// state is preserved, so calling again resumes where it stopped.
    fn check_bom(&mut self) -> std::io::Result<()> {
        const UTF8_BOM: [u8; 3] = [0xEF, 0xBB, 0xBF];

        if self.checked {
            return Ok(());
        }

        // Probe bytes go straight into `leftover` (not a stack buffer) so
        // that an early return on error cannot drop them.
        while self.leftover.len() < UTF8_BOM.len() && UTF8_BOM.starts_with(&self.leftover) {
            let missing = UTF8_BOM.len() - self.leftover.len();
            let mut probe = [0u8; 3];
            match self.inner.read(&mut probe[..missing]) {
                Ok(0) => break, // short source — fewer than 3 bytes total
                Ok(n) => self.leftover.extend_from_slice(&probe[..n]),
                // `Interrupted` means "try again"; nothing was consumed.
                Err(e) if e.kind() == std::io::ErrorKind::Interrupted => {}
                Err(e) => return Err(e),
            }
        }

        if self.leftover == UTF8_BOM {
            self.leftover.clear();
        }
        self.checked = true;
        Ok(())
    }
}

impl<R: std::io::Read> std::io::Read for BomStripReader<R> {
    /// Read from the wrapped source, minus any leading BOM.
    ///
    /// Non-BOM bytes consumed by the probe are replayed first (possibly as a
    /// short read); once they are exhausted, reads go straight to the wrapped
    /// reader.
    fn read(&mut self, buf: &mut [u8]) -> std::io::Result<usize> {
        self.check_bom()?;

        let pending = &self.leftover[self.leftover_pos..];
        if pending.is_empty() {
            return self.inner.read(buf);
        }

        let n = pending.len().min(buf.len());
        buf[..n].copy_from_slice(&pending[..n]);
        self.leftover_pos += n;
        Ok(n)
    }
}
121
122/// Validate that a CSV reader's header row names every required OHLCV column.
123fn validate_headers<R: std::io::Read>(reader: &mut csv::Reader<R>) -> Result<()> {
124    let headers = reader.headers()?;
125    let present: Vec<String> = headers.iter().map(|h| h.trim().to_string()).collect();
126    let missing: Vec<&str> = REQUIRED_COLUMNS
127        .iter()
128        .copied()
129        .filter(|col| !present.iter().any(|h| h == col))
130        .collect();
131    if !missing.is_empty() {
132        return Err(Error::Malformed(format!(
133            "CSV header is missing required column(s) [{}]; found [{}] — \
134             the first line must be a header naming {}",
135            missing.join(", "),
136            present.join(", "),
137            REQUIRED_COLUMNS.join(",")
138        )));
139    }
140    Ok(())
141}
142
143/// Streaming OHLCV CSV reader.
144#[derive(Debug)]
145pub struct CandleReader<R: std::io::Read> {
146    reader: csv::Reader<R>,
147}
148
149impl<R: std::io::Read> CandleReader<R> {
150    /// Build a trimming CSV reader around `inner` and validate its header.
151    fn build(inner: R) -> Result<Self> {
152        let mut reader = csv::ReaderBuilder::new()
153            .has_headers(true)
154            .trim(csv::Trim::All)
155            .from_reader(inner);
156        validate_headers(&mut reader)?;
157        Ok(Self { reader })
158    }
159}
160
161impl CandleReader<BomStripReader<std::fs::File>> {
162    /// Open a CSV file at `path`.
163    ///
164    /// The first line must be a header row naming the OHLCV columns; a leading
165    /// UTF-8 BOM and whitespace around values are tolerated.
166    ///
167    /// # Errors
168    /// Returns [`Error::Io`] if the file cannot be opened and
169    /// [`Error::Malformed`] if the header does not contain every required
170    /// column (`timestamp,open,high,low,close,volume`).
171    pub fn open<P: AsRef<Path>>(path: P) -> Result<Self> {
172        let file = std::fs::File::open(path)?;
173        Self::from_reader(file)
174    }
175}
176
177impl<R: std::io::Read> CandleReader<BomStripReader<R>> {
178    /// Build a reader from any [`std::io::Read`] source.
179    ///
180    /// A leading UTF-8 BOM is stripped and the header is validated.
181    ///
182    /// # Errors
183    /// Returns [`Error::Malformed`] if the header does not contain every
184    /// required column.
185    pub fn from_reader(inner: R) -> Result<Self> {
186        Self::build(BomStripReader::new(inner))
187    }
188}
189
190impl<R: std::io::Read> CandleReader<R> {
191    /// Wrap a pre-built [`csv::Reader`]; useful for testing or for non-default
192    /// reader configuration.
193    ///
194    /// Unlike [`from_reader`](Self::from_reader) this does *not* strip a BOM —
195    /// the caller owns the reader's configuration — but the header is still
196    /// validated.
197    ///
198    /// # Errors
199    /// Returns [`Error::Malformed`] if the header does not contain every
200    /// required column.
201    pub fn from_csv_reader(mut reader: csv::Reader<R>) -> Result<Self> {
202        validate_headers(&mut reader)?;
203        Ok(Self { reader })
204    }
205
206    /// Iterator over decoded candles.
207    pub fn candles(&mut self) -> impl Iterator<Item = Result<Candle>> + '_ {
208        self.reader.deserialize::<DefaultRow>().map(|row_res| {
209            let row = row_res?;
210            row.into_candle()
211        })
212    }
213
214    /// Read the entire stream into a `Vec<Candle>`. Convenient for backtests.
215    pub fn read_all(&mut self) -> Result<Vec<Candle>> {
216        self.candles().collect()
217    }
218}
219
#[cfg(test)]
mod tests {
    use super::*;
    use std::io::{Read, Write};

    /// Header line of the default OHLCV layout.
    const HEADER: &str = "timestamp,open,high,low,close,volume";

    /// Write each of `lines` (newline-terminated) to a fresh temporary file
    /// and flush it, so it can be re-opened by path.
    fn temp_csv(lines: &[&str]) -> tempfile::NamedTempFile {
        let mut tmp = tempfile::NamedTempFile::new().unwrap();
        for line in lines {
            writeln!(tmp, "{line}").unwrap();
        }
        tmp.flush().unwrap();
        tmp
    }

    #[test]
    fn reads_well_formed_csv() {
        let tmp = temp_csv(&[
            HEADER,
            "1,10.0,11.0,9.0,10.5,100",
            "2,10.5,11.5,10.0,11.0,150",
            "3,11.0,12.0,10.5,11.5,200",
        ]);

        let mut reader = CandleReader::open(tmp.path()).unwrap();
        let candles = reader.read_all().unwrap();

        assert_eq!(candles.len(), 3);
        assert_eq!(candles[0].open, 10.0);
        assert_eq!(candles[2].close, 11.5);
        assert_eq!(candles[1].timestamp, 2);
    }

    #[test]
    fn rejects_invalid_ohlc() {
        // The single data row has high < low, which core validation rejects.
        let tmp = temp_csv(&[HEADER, "1,10.0,8.0,9.0,9.5,100"]);

        let mut reader = CandleReader::open(tmp.path()).unwrap();
        let outcome = reader.candles().collect::<Result<Vec<Candle>>>();
        assert!(outcome.is_err());
    }

    #[test]
    fn from_reader_works_on_in_memory_data() {
        let csv_text = "timestamp,open,high,low,close,volume\n1,1,2,0,1,10\n2,1,2,0,1,10\n";
        let mut reader = CandleReader::from_reader(csv_text.as_bytes()).unwrap();
        assert_eq!(reader.read_all().unwrap().len(), 2);
    }

    #[test]
    fn rejects_file_without_header() {
        // The first line is already data; if the header were not validated it
        // would be silently consumed as the header row.
        let csv_text = "1,10.0,11.0,9.0,10.5,100\n2,10.5,11.5,10.0,11.0,150\n";
        let failure = CandleReader::from_reader(csv_text.as_bytes()).unwrap_err();
        assert!(matches!(failure, Error::Malformed(_)));
    }

    #[test]
    fn rejects_header_missing_a_column() {
        // The header stops at "close" — there is no "volume" column.
        let csv_text = "timestamp,open,high,low,close\n1,10.0,11.0,9.0,10.5\n";
        let err = CandleReader::from_reader(csv_text.as_bytes()).unwrap_err();
        // A single direct assertion pins both the variant and the diagnostic
        // without adding a separate panic branch for other variants.
        assert!(
            matches!(&err, Error::Malformed(msg) if msg.contains("volume")),
            "expected Malformed mentioning 'volume', got {err:?}"
        );
    }

    /// Exercises `from_csv_reader`: the other tests go through `from_reader`
    /// or `open`, which build the inner `csv::Reader` themselves. A caller
    /// needing non-default csv settings (here: `;` as delimiter) constructs
    /// the reader and hands it over.
    #[test]
    fn from_csv_reader_accepts_a_prebuilt_reader() {
        let csv_text = "timestamp;open;high;low;close;volume\n1;10.0;11.0;9.0;10.5;100\n";
        let mut builder = csv::ReaderBuilder::new();
        builder.delimiter(b';');
        let prebuilt = builder.from_reader(csv_text.as_bytes());

        let mut reader = CandleReader::from_csv_reader(prebuilt).unwrap();
        let candles = reader.read_all().unwrap();
        assert_eq!(candles.len(), 1);
        assert_eq!(candles[0].close, 10.5);
    }

    #[test]
    fn strips_leading_utf8_bom() {
        // Excel-style export: the header is preceded by a BOM (\u{feff}).
        let csv_text =
            "\u{feff}timestamp,open,high,low,close,volume\n1,10.0,11.0,9.0,10.5,100\n";
        let mut reader = CandleReader::from_reader(csv_text.as_bytes()).unwrap();
        let candles = reader.read_all().unwrap();
        assert_eq!(candles.len(), 1);
        assert_eq!(candles[0].timestamp, 1);
        assert_eq!(candles[0].open, 10.0);
    }

    #[test]
    fn tolerates_whitespace_around_fields() {
        let csv_text = concat!(
            " timestamp , open , high , low , close , volume \n",
            "1 , 10.0 , 11.0 , 9.0 , 10.5 , 100 \n",
        );
        let mut reader = CandleReader::from_reader(csv_text.as_bytes()).unwrap();
        let candles = reader.read_all().unwrap();
        assert_eq!(candles.len(), 1);
        assert_eq!(candles[0].close, 10.5);
        assert_eq!(candles[0].volume, 100.0);
    }

    #[test]
    fn bom_stripper_passes_through_non_bom_input() {
        let mut text = String::new();
        let mut stripper = BomStripReader::new("hello".as_bytes());
        stripper.read_to_string(&mut text).unwrap();
        assert_eq!(text, "hello");
    }

    #[test]
    fn bom_stripper_handles_short_input() {
        // Only two bytes available — fewer than the three a BOM needs.
        let source = [0x41u8, 0x42u8];
        let mut bytes = Vec::new();
        BomStripReader::new(source.as_slice())
            .read_to_end(&mut bytes)
            .unwrap();
        assert_eq!(bytes, vec![0x41, 0x42]);
    }
}