brainwires_datasets/jsonl/
reader.rs1use std::io::{BufRead, BufReader, Read};
2use std::path::Path;
3
4use crate::error::{DatasetError, DatasetResult};
5use crate::types::{PreferencePair, TrainingExample};
6
/// Streaming reader for JSON Lines input (one JSON document per line).
///
/// The underlying source is wrapped in a [`BufReader`] so records can be
/// pulled one line at a time without loading the whole input into memory.
pub struct JsonlReader<R>
where
    R: Read,
{
    /// Buffered view over the caller-supplied source.
    reader: BufReader<R>,
    /// Count of lines consumed so far; used to label parse errors.
    line_number: usize,
}
12
13impl JsonlReader<std::fs::File> {
14 pub fn open(path: impl AsRef<Path>) -> DatasetResult<Self> {
16 let file = std::fs::File::open(path.as_ref())?;
17 Ok(Self::new(file))
18 }
19}
20
21impl<R: Read> JsonlReader<R> {
22 pub fn new(reader: R) -> Self {
24 Self {
25 reader: BufReader::new(reader),
26 line_number: 0,
27 }
28 }
29
30 pub fn next_example(&mut self) -> DatasetResult<Option<TrainingExample>> {
32 let mut line = String::new();
33 loop {
34 line.clear();
35 let bytes_read = self.reader.read_line(&mut line)?;
36 self.line_number += 1;
37
38 if bytes_read == 0 {
39 return Ok(None);
40 }
41
42 let trimmed = line.trim();
43 if trimmed.is_empty() {
44 continue;
45 }
46
47 let example: TrainingExample =
48 serde_json::from_str(trimmed).map_err(|e| DatasetError::Validation {
49 message: format!("line {}: {}", self.line_number, e),
50 })?;
51 return Ok(Some(example));
52 }
53 }
54
55 pub fn read_all(&mut self) -> DatasetResult<Vec<TrainingExample>> {
57 let mut examples = Vec::new();
58 while let Some(example) = self.next_example()? {
59 examples.push(example);
60 }
61 tracing::debug!("Read {} examples from JSONL", examples.len());
62 Ok(examples)
63 }
64
65 pub fn next_preference(&mut self) -> DatasetResult<Option<PreferencePair>> {
67 let mut line = String::new();
68 loop {
69 line.clear();
70 let bytes_read = self.reader.read_line(&mut line)?;
71 self.line_number += 1;
72
73 if bytes_read == 0 {
74 return Ok(None);
75 }
76
77 let trimmed = line.trim();
78 if trimmed.is_empty() {
79 continue;
80 }
81
82 let pair: PreferencePair =
83 serde_json::from_str(trimmed).map_err(|e| DatasetError::Validation {
84 message: format!("line {}: {}", self.line_number, e),
85 })?;
86 return Ok(Some(pair));
87 }
88 }
89
90 pub fn read_all_preferences(&mut self) -> DatasetResult<Vec<PreferencePair>> {
92 let mut pairs = Vec::new();
93 while let Some(pair) = self.next_preference()? {
94 pairs.push(pair);
95 }
96 tracing::debug!("Read {} preference pairs from JSONL", pairs.len());
97 Ok(pairs)
98 }
99
100 pub fn line_number(&self) -> usize {
102 self.line_number
103 }
104}
105
106pub fn read_jsonl(path: impl AsRef<Path>) -> DatasetResult<Vec<TrainingExample>> {
108 let mut reader = JsonlReader::open(path)?;
109 reader.read_all()
110}
111
112pub fn read_jsonl_preferences(path: impl AsRef<Path>) -> DatasetResult<Vec<PreferencePair>> {
114 let mut reader = JsonlReader::open(path)?;
115 reader.read_all_preferences()
116}
117
118impl<R: Read> Iterator for JsonlReader<R> {
120 type Item = DatasetResult<TrainingExample>;
121
122 fn next(&mut self) -> Option<Self::Item> {
123 match self.next_example() {
124 Ok(Some(example)) => Some(Ok(example)),
125 Ok(None) => None,
126 Err(e) => Some(Err(e)),
127 }
128 }
129}
130
#[cfg(test)]
mod tests {
    use super::*;
    use std::io::Cursor;

    /// Two well-formed chat records: one with two messages, one with three.
    fn sample_jsonl() -> &'static str {
        r#"{"messages":[{"role":"user","content":"Hello"},{"role":"assistant","content":"Hi!"}]}
{"messages":[{"role":"system","content":"Be helpful"},{"role":"user","content":"Q"},{"role":"assistant","content":"A"}]}
"#
    }

    /// `read_all` returns every record with its messages intact.
    #[test]
    fn test_read_jsonl_from_cursor() {
        let mut source = JsonlReader::new(Cursor::new(sample_jsonl()));
        let parsed = source.read_all().unwrap();
        assert_eq!(parsed.len(), 2);
        assert_eq!(parsed[0].messages.len(), 2);
        assert_eq!(parsed[1].messages.len(), 3);
    }

    /// The `Iterator` impl yields the same records as `read_all`.
    #[test]
    fn test_reader_iterator() {
        let source = JsonlReader::new(Cursor::new(sample_jsonl()));
        let collected: Result<Vec<_>, _> = source.collect();
        let parsed = collected.unwrap();
        assert_eq!(parsed.len(), 2);
    }

    /// An empty line between records is ignored rather than parsed.
    #[test]
    fn test_reader_skips_blank_lines() {
        let input = r#"{"messages":[{"role":"user","content":"A"},{"role":"assistant","content":"B"}]}

{"messages":[{"role":"user","content":"C"},{"role":"assistant","content":"D"}]}
"#;
        let mut source = JsonlReader::new(Cursor::new(input));
        let parsed = source.read_all().unwrap();
        assert_eq!(parsed.len(), 2);
    }

    /// A line that is not JSON surfaces as an error, not as end of input.
    #[test]
    fn test_reader_error_on_invalid_json() {
        let input = "not valid json\n";
        let mut source = JsonlReader::new(Cursor::new(input));
        let outcome = source.next_example();
        assert!(outcome.is_err());
    }

    /// Preference records parse into prompt/chosen/rejected triples.
    #[test]
    fn test_read_preference_pairs() {
        let input = r#"{"prompt":[{"role":"user","content":"Q1"}],"chosen":[{"role":"assistant","content":"Good"}],"rejected":[{"role":"assistant","content":"Bad"}]}
{"prompt":[{"role":"user","content":"Q2"}],"chosen":[{"role":"assistant","content":"Yes"}],"rejected":[{"role":"assistant","content":"No"}]}
"#;
        let mut source = JsonlReader::new(Cursor::new(input));
        let parsed = source.read_all_preferences().unwrap();
        assert_eq!(parsed.len(), 2);
        assert_eq!(parsed[0].prompt[0].content, "Q1");
    }
}