Skip to main content

brainwires_datasets/jsonl/
reader.rs

1use std::io::{BufRead, BufReader, Read};
2use std::path::Path;
3
4use crate::error::{DatasetError, DatasetResult};
5use crate::types::{PreferencePair, TrainingExample};
6
/// Streaming JSONL reader — memory-efficient, reads one line at a time.
///
/// The underlying source is wrapped in a [`BufReader`]. No `Read` bound is
/// placed on the type itself; the `impl` blocks that actually perform I/O
/// carry that requirement, so the struct can be named in contexts that do not
/// read from it.
#[derive(Debug)]
pub struct JsonlReader<R> {
    /// Buffered wrapper around the underlying byte source.
    reader: BufReader<R>,
    /// Running line counter; starts at 0 and is bumped as lines are read.
    line_number: usize,
}
12
13impl JsonlReader<std::fs::File> {
14    /// Open a JSONL file for reading.
15    pub fn open(path: impl AsRef<Path>) -> DatasetResult<Self> {
16        let file = std::fs::File::open(path.as_ref())?;
17        Ok(Self::new(file))
18    }
19}
20
21impl<R: Read> JsonlReader<R> {
22    /// Create a new JSONL reader wrapping the given reader.
23    pub fn new(reader: R) -> Self {
24        Self {
25            reader: BufReader::new(reader),
26            line_number: 0,
27        }
28    }
29
30    /// Read the next example from the JSONL stream.
31    pub fn next_example(&mut self) -> DatasetResult<Option<TrainingExample>> {
32        let mut line = String::new();
33        loop {
34            line.clear();
35            let bytes_read = self.reader.read_line(&mut line)?;
36            self.line_number += 1;
37
38            if bytes_read == 0 {
39                return Ok(None);
40            }
41
42            let trimmed = line.trim();
43            if trimmed.is_empty() {
44                continue;
45            }
46
47            let example: TrainingExample =
48                serde_json::from_str(trimmed).map_err(|e| DatasetError::Validation {
49                    message: format!("line {}: {}", self.line_number, e),
50                })?;
51            return Ok(Some(example));
52        }
53    }
54
55    /// Read all examples into a Vec.
56    pub fn read_all(&mut self) -> DatasetResult<Vec<TrainingExample>> {
57        let mut examples = Vec::new();
58        while let Some(example) = self.next_example()? {
59            examples.push(example);
60        }
61        tracing::debug!("Read {} examples from JSONL", examples.len());
62        Ok(examples)
63    }
64
65    /// Read the next preference pair from the JSONL stream.
66    pub fn next_preference(&mut self) -> DatasetResult<Option<PreferencePair>> {
67        let mut line = String::new();
68        loop {
69            line.clear();
70            let bytes_read = self.reader.read_line(&mut line)?;
71            self.line_number += 1;
72
73            if bytes_read == 0 {
74                return Ok(None);
75            }
76
77            let trimmed = line.trim();
78            if trimmed.is_empty() {
79                continue;
80            }
81
82            let pair: PreferencePair =
83                serde_json::from_str(trimmed).map_err(|e| DatasetError::Validation {
84                    message: format!("line {}: {}", self.line_number, e),
85                })?;
86            return Ok(Some(pair));
87        }
88    }
89
90    /// Read all preference pairs into a Vec.
91    pub fn read_all_preferences(&mut self) -> DatasetResult<Vec<PreferencePair>> {
92        let mut pairs = Vec::new();
93        while let Some(pair) = self.next_preference()? {
94            pairs.push(pair);
95        }
96        tracing::debug!("Read {} preference pairs from JSONL", pairs.len());
97        Ok(pairs)
98    }
99
100    /// Current line number (1-based).
101    pub fn line_number(&self) -> usize {
102        self.line_number
103    }
104}
105
106/// Convenience: read all examples from a JSONL file path.
107pub fn read_jsonl(path: impl AsRef<Path>) -> DatasetResult<Vec<TrainingExample>> {
108    let mut reader = JsonlReader::open(path)?;
109    reader.read_all()
110}
111
112/// Convenience: read all preference pairs from a JSONL file path.
113pub fn read_jsonl_preferences(path: impl AsRef<Path>) -> DatasetResult<Vec<PreferencePair>> {
114    let mut reader = JsonlReader::open(path)?;
115    reader.read_all_preferences()
116}
117
118/// Iterator adapter over JsonlReader.
119impl<R: Read> Iterator for JsonlReader<R> {
120    type Item = DatasetResult<TrainingExample>;
121
122    fn next(&mut self) -> Option<Self::Item> {
123        match self.next_example() {
124            Ok(Some(example)) => Some(Ok(example)),
125            Ok(None) => None,
126            Err(e) => Some(Err(e)),
127        }
128    }
129}
130
#[cfg(test)]
mod tests {
    use super::*;
    use std::io::Cursor;

    /// Two well-formed chat examples, one per line, with a trailing newline.
    const SAMPLE_JSONL: &str = r#"{"messages":[{"role":"user","content":"Hello"},{"role":"assistant","content":"Hi!"}]}
{"messages":[{"role":"system","content":"Be helpful"},{"role":"user","content":"Q"},{"role":"assistant","content":"A"}]}
"#;

    /// Build a reader over an in-memory string.
    fn reader_over(text: &'static str) -> JsonlReader<Cursor<&'static str>> {
        JsonlReader::new(Cursor::new(text))
    }

    #[test]
    fn test_read_jsonl_from_cursor() {
        let parsed = reader_over(SAMPLE_JSONL).read_all().unwrap();
        let message_counts: Vec<_> = parsed.iter().map(|ex| ex.messages.len()).collect();
        assert_eq!(message_counts, [2, 3]);
    }

    #[test]
    fn test_reader_iterator() {
        let mut seen = 0;
        for item in reader_over(SAMPLE_JSONL) {
            item.unwrap();
            seen += 1;
        }
        assert_eq!(seen, 2);
    }

    #[test]
    fn test_reader_skips_blank_lines() {
        let with_gap = r#"{"messages":[{"role":"user","content":"A"},{"role":"assistant","content":"B"}]}

{"messages":[{"role":"user","content":"C"},{"role":"assistant","content":"D"}]}
"#;
        let parsed = reader_over(with_gap).read_all().unwrap();
        assert_eq!(parsed.len(), 2);
    }

    #[test]
    fn test_reader_error_on_invalid_json() {
        let outcome = reader_over("not valid json\n").next_example();
        assert!(outcome.is_err());
    }

    #[test]
    fn test_read_preference_pairs() {
        let raw = r#"{"prompt":[{"role":"user","content":"Q1"}],"chosen":[{"role":"assistant","content":"Good"}],"rejected":[{"role":"assistant","content":"Bad"}]}
{"prompt":[{"role":"user","content":"Q2"}],"chosen":[{"role":"assistant","content":"Yes"}],"rejected":[{"role":"assistant","content":"No"}]}
"#;
        let loaded = reader_over(raw).read_all_preferences().unwrap();
        assert_eq!(loaded.len(), 2);
        assert_eq!(loaded[0].prompt[0].content, "Q1");
    }
}