1use std::collections::{BTreeMap, HashSet};
2use std::io::BufRead;
3use std::path::Path;
4
5use polars::prelude::{DataFrame, NamedFrom, Series};
6use serde_json::Value;
7
8use crate::io::format::{self, FileReadError, InputAdapter, InputFile, ReadInput};
9use crate::{config, FloeResult};
10
/// Stateless [`InputAdapter`] implementation for JSON sources.
struct JsonInputAdapter;

/// The single adapter instance handed out by [`json_input_adapter`].
static JSON_INPUT_ADAPTER: JsonInputAdapter = JsonInputAdapter;
14
/// Returns the shared JSON adapter as an [`InputAdapter`] trait object.
pub(crate) fn json_input_adapter() -> &'static dyn InputAdapter {
    &JSON_INPUT_ADAPTER
}
18
/// Error produced while loading a JSON input file.
///
/// The readers in this file fill `rule` with either `json_parse_error` or
/// `json_unsupported_value`, and `message` with the human-readable detail.
#[derive(Debug, Clone)]
pub struct JsonReadError {
    /// Short identifier of the rule that failed.
    pub rule: String,
    /// Human-readable description of the failure.
    pub message: String,
}

impl std::fmt::Display for JsonReadError {
    /// Renders the error as `<rule>: <message>`.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let Self { rule, message } = self;
        f.write_str(rule)?;
        f.write_str(": ")?;
        f.write_str(message)
    }
}

impl std::error::Error for JsonReadError {}
32
33fn read_ndjson_file(input_path: &Path) -> Result<DataFrame, JsonReadError> {
34 let content = std::fs::read_to_string(input_path).map_err(|err| JsonReadError {
35 rule: "json_parse_error".to_string(),
36 message: format!("failed to read json at {}: {err}", input_path.display()),
37 })?;
38
39 let mut rows: Vec<BTreeMap<String, Option<String>>> = Vec::new();
40 let mut keys = HashSet::new();
41 for (idx, line) in content.lines().enumerate() {
42 let line = line.trim();
43 if line.is_empty() {
44 continue;
45 }
46 let value: Value = serde_json::from_str(line).map_err(|err| JsonReadError {
47 rule: "json_parse_error".to_string(),
48 message: format!("json parse error at line {}: {err}", idx + 1),
49 })?;
50 let object = value.as_object().ok_or_else(|| JsonReadError {
51 rule: "json_parse_error".to_string(),
52 message: format!("expected json object at line {}", idx + 1),
53 })?;
54
55 let mut row = BTreeMap::new();
56 for (key, value) in object {
57 if value.is_object() || value.is_array() {
58 return Err(JsonReadError {
59 rule: "json_unsupported_value".to_string(),
60 message: format!(
61 "nested json values are not supported (line {}, key {})",
62 idx + 1,
63 key
64 ),
65 });
66 }
67 let cell = match value {
68 Value::Null => None,
69 Value::String(value) => Some(value.clone()),
70 Value::Bool(value) => Some(value.to_string()),
71 Value::Number(value) => Some(value.to_string()),
72 _ => None,
73 };
74 row.insert(key.clone(), cell);
75 keys.insert(key.clone());
76 }
77 rows.push(row);
78 }
79
80 let mut columns = keys.into_iter().collect::<Vec<String>>();
81 columns.sort();
82
83 build_dataframe(&columns, &rows)
84}
85
86fn read_ndjson_columns(input_path: &Path) -> Result<Vec<String>, JsonReadError> {
87 let file = std::fs::File::open(input_path).map_err(|err| JsonReadError {
88 rule: "json_parse_error".to_string(),
89 message: format!("failed to read json at {}: {err}", input_path.display()),
90 })?;
91 let reader = std::io::BufReader::new(file);
92 let mut first_error: Option<JsonReadError> = None;
93
94 for (idx, line) in reader.lines().enumerate() {
95 let line = line.map_err(|err| JsonReadError {
96 rule: "json_parse_error".to_string(),
97 message: format!("failed to read json at {}: {err}", input_path.display()),
98 })?;
99 let line = line.trim();
100 if line.is_empty() {
101 continue;
102 }
103 match serde_json::from_str::<Value>(line) {
104 Ok(value) => {
105 let object = value.as_object().ok_or_else(|| JsonReadError {
106 rule: "json_parse_error".to_string(),
107 message: format!("expected json object at line {}", idx + 1),
108 })?;
109 let mut keys = Vec::with_capacity(object.len());
110 for (key, value) in object {
111 if value.is_object() || value.is_array() {
112 return Err(JsonReadError {
113 rule: "json_unsupported_value".to_string(),
114 message: format!(
115 "nested json values are not supported (line {}, key {})",
116 idx + 1,
117 key
118 ),
119 });
120 }
121 keys.push(key.clone());
122 }
123 keys.sort();
124 return Ok(keys);
125 }
126 Err(err) => {
127 if first_error.is_none() {
128 first_error = Some(JsonReadError {
129 rule: "json_parse_error".to_string(),
130 message: format!("json parse error at line {}: {err}", idx + 1),
131 });
132 }
133 continue;
134 }
135 }
136 }
137
138 Err(first_error.unwrap_or_else(|| JsonReadError {
139 rule: "json_parse_error".to_string(),
140 message: format!("no json objects found in {}", input_path.display()),
141 }))
142}
143
144fn read_json_array_file(input_path: &Path) -> Result<DataFrame, JsonReadError> {
145 let content = std::fs::read_to_string(input_path).map_err(|err| JsonReadError {
146 rule: "json_parse_error".to_string(),
147 message: format!("failed to read json at {}: {err}", input_path.display()),
148 })?;
149
150 let value: Value = serde_json::from_str(&content).map_err(|err| JsonReadError {
151 rule: "json_parse_error".to_string(),
152 message: format!("json parse error: {err}"),
153 })?;
154 let array = value.as_array().ok_or_else(|| JsonReadError {
155 rule: "json_parse_error".to_string(),
156 message: "expected json array at root".to_string(),
157 })?;
158
159 let mut rows: Vec<BTreeMap<String, Option<String>>> = Vec::with_capacity(array.len());
160 let mut columns = Vec::new();
161
162 for (idx, value) in array.iter().enumerate() {
163 let object = value.as_object().ok_or_else(|| JsonReadError {
164 rule: "json_parse_error".to_string(),
165 message: format!("expected json object at index {}", idx),
166 })?;
167
168 let mut row = BTreeMap::new();
169 if columns.is_empty() {
170 for key in object.keys() {
171 columns.push(key.clone());
172 }
173 }
174
175 for (key, value) in object {
176 if value.is_object() || value.is_array() {
177 return Err(JsonReadError {
178 rule: "json_unsupported_value".to_string(),
179 message: format!(
180 "nested json values are not supported (index {}, key {})",
181 idx, key
182 ),
183 });
184 }
185 let cell = match value {
186 Value::Null => None,
187 Value::String(value) => Some(value.clone()),
188 Value::Bool(value) => Some(value.to_string()),
189 Value::Number(value) => Some(value.to_string()),
190 _ => None,
191 };
192 row.insert(key.clone(), cell);
193 }
194 rows.push(row);
195 }
196
197 build_dataframe(&columns, &rows)
198}
199
200fn build_dataframe(
201 columns: &[String],
202 rows: &[BTreeMap<String, Option<String>>],
203) -> Result<DataFrame, JsonReadError> {
204 let mut series = Vec::with_capacity(columns.len());
205 for name in columns {
206 let mut values = Vec::with_capacity(rows.len());
207 for row in rows {
208 values.push(row.get(name).cloned().unwrap_or(None));
209 }
210 series.push(Series::new(name.as_str().into(), values).into());
211 }
212
213 DataFrame::new(series).map_err(|err| JsonReadError {
214 rule: "json_parse_error".to_string(),
215 message: format!("failed to build dataframe: {err}"),
216 })
217}
218
219fn read_json_array_columns(input_path: &Path) -> Result<Vec<String>, JsonReadError> {
220 let content = std::fs::read_to_string(input_path).map_err(|err| JsonReadError {
221 rule: "json_parse_error".to_string(),
222 message: format!("failed to read json at {}: {err}", input_path.display()),
223 })?;
224 let value: Value = serde_json::from_str(&content).map_err(|err| JsonReadError {
225 rule: "json_parse_error".to_string(),
226 message: format!("json parse error: {err}"),
227 })?;
228 let array = value.as_array().ok_or_else(|| JsonReadError {
229 rule: "json_parse_error".to_string(),
230 message: "expected json array at root".to_string(),
231 })?;
232 if array.is_empty() {
233 return Err(JsonReadError {
234 rule: "json_parse_error".to_string(),
235 message: "json array is empty".to_string(),
236 });
237 }
238 let object = array[0].as_object().ok_or_else(|| JsonReadError {
239 rule: "json_parse_error".to_string(),
240 message: "expected json object at index 0".to_string(),
241 })?;
242 let mut keys = Vec::with_capacity(object.len());
243 for (key, value) in object {
244 if value.is_object() || value.is_array() {
245 return Err(JsonReadError {
246 rule: "json_unsupported_value".to_string(),
247 message: format!(
248 "nested json values are not supported (index 0, key {})",
249 key
250 ),
251 });
252 }
253 keys.push(key.clone());
254 }
255 keys.sort();
256 Ok(keys)
257}
258
259impl InputAdapter for JsonInputAdapter {
260 fn format(&self) -> &'static str {
261 "json"
262 }
263
264 fn read_input_columns(
265 &self,
266 entity: &config::EntityConfig,
267 input_file: &InputFile,
268 _columns: &[config::ColumnConfig],
269 ) -> Result<Vec<String>, FileReadError> {
270 let json_mode = entity
271 .source
272 .options
273 .as_ref()
274 .and_then(|options| options.json_mode.as_deref())
275 .unwrap_or("array");
276 let path = &input_file.source_local_path;
277 let read_result = match json_mode {
278 "ndjson" => read_ndjson_columns(path),
279 "array" => read_json_array_columns(path),
280 other => Err(JsonReadError {
281 rule: "json_parse_error".to_string(),
282 message: format!(
283 "unsupported source.options.json_mode={} (allowed: array, ndjson)",
284 other
285 ),
286 }),
287 };
288 read_result.map_err(|err| FileReadError {
289 rule: err.rule,
290 message: err.message,
291 })
292 }
293
294 fn read_inputs(
295 &self,
296 entity: &config::EntityConfig,
297 files: &[InputFile],
298 columns: &[config::ColumnConfig],
299 normalize_strategy: Option<&str>,
300 collect_raw: bool,
301 ) -> FloeResult<Vec<ReadInput>> {
302 let mut inputs = Vec::with_capacity(files.len());
303 let json_mode = entity
304 .source
305 .options
306 .as_ref()
307 .and_then(|options| options.json_mode.as_deref())
308 .unwrap_or("array");
309 for input_file in files {
310 let path = &input_file.source_local_path;
311 let read_result = match json_mode {
312 "ndjson" => read_ndjson_file(path),
313 "array" => read_json_array_file(path),
314 other => Err(JsonReadError {
315 rule: "json_parse_error".to_string(),
316 message: format!(
317 "unsupported source.options.json_mode={} (allowed: array, ndjson)",
318 other
319 ),
320 }),
321 };
322 match read_result {
323 Ok(df) => {
324 let input = format::read_input_from_df(
325 input_file,
326 &df,
327 columns,
328 normalize_strategy,
329 collect_raw,
330 )?;
331 inputs.push(input);
332 }
333 Err(err) => {
334 inputs.push(ReadInput::FileError {
335 input_file: input_file.clone(),
336 error: FileReadError {
337 rule: err.rule,
338 message: err.message,
339 },
340 });
341 }
342 }
343 }
344 Ok(inputs)
345 }
346}