Skip to main content

stryke/
native_data.rs

1//! Native CSV (`csv` crate), SQLite (`rusqlite`), and HTTP JSON (`ureq` + `serde_json`) helpers.
2
3use std::io::Read;
4use std::sync::Arc;
5use std::time::Duration;
6
7use indexmap::IndexMap;
8use jaq_core::data::JustLut;
9use num_traits::cast::ToPrimitive;
10use parking_lot::{Mutex, RwLock};
11use rayon::prelude::*;
12use rusqlite::{types::Value, Connection};
13use serde_json::Value as JsonValue;
14
15use crate::ast::StructDef;
16use crate::error::{PerlError, PerlResult};
17use crate::value::{HeapObject, PerlDataFrame, PerlValue, StructInstance};
18
19/// Parallel row→hashref conversion after a sequential CSV parse (good CPU parallelism on wide files).
20pub(crate) fn par_csv_read(path: &str) -> PerlResult<PerlValue> {
21    let mut rdr = csv::Reader::from_path(path)
22        .map_err(|e| PerlError::runtime(format!("par_csv_read: {}: {}", path, e), 0))?;
23    let headers: Vec<String> = rdr
24        .headers()
25        .map_err(|e| PerlError::runtime(format!("par_csv_read: {}: {}", path, e), 0))?
26        .iter()
27        .map(|s| s.to_string())
28        .collect();
29    let mut raw_rows: Vec<csv::StringRecord> = Vec::new();
30    for rec in rdr.records() {
31        raw_rows.push(rec.map_err(|e| PerlError::runtime(format!("par_csv_read: {}", e), 0))?);
32    }
33    let rows: Vec<PerlValue> = raw_rows
34        .into_par_iter()
35        .map(|record| {
36            let mut map = IndexMap::new();
37            for (i, h) in headers.iter().enumerate() {
38                let cell = record.get(i).unwrap_or("");
39                map.insert(h.clone(), PerlValue::string(cell.to_string()));
40            }
41            PerlValue::hash_ref(Arc::new(RwLock::new(map)))
42        })
43        .collect();
44    Ok(PerlValue::array(rows))
45}
46
47/// Columnar dataframe from a CSV path (header row + string cells; use `sum` etc. with numeric strings).
48pub(crate) fn dataframe_from_elements(val: &PerlValue) -> PerlResult<PerlValue> {
49    let rows = val.map_flatten_outputs(true);
50    if rows.is_empty() {
51        return Ok(PerlValue::dataframe(Arc::new(Mutex::new(PerlDataFrame {
52            columns: vec![],
53            cols: vec![],
54            group_by: None,
55        }))));
56    }
57
58    // Detect format: list of hashrefs or list of arrayrefs
59    let first_row = &rows[0];
60    if let Some(first_row_map) = first_row.as_hash_ref() {
61        // List of hashrefs: use keys of the first row as columns
62        let columns: Vec<String> = first_row_map.read().keys().cloned().collect();
63        let mut cols: Vec<Vec<PerlValue>> = (0..columns.len()).map(|_| Vec::new()).collect();
64        for row_val in rows {
65            if let Some(row_lock) = row_val.as_hash_ref() {
66                let row_map = row_lock.read();
67                for (i, col_name) in columns.iter().enumerate() {
68                    cols[i].push(row_map.get(col_name).cloned().unwrap_or(PerlValue::UNDEF));
69                }
70            }
71        }
72        return Ok(PerlValue::dataframe(Arc::new(Mutex::new(PerlDataFrame {
73            columns,
74            cols,
75            group_by: None,
76        }))));
77    } else if let Some(first_row_lock) = first_row.as_array_ref() {
78        // List of arrayrefs: first row is headers
79        let first_row_arr = first_row_lock.read();
80        let columns: Vec<String> = first_row_arr.iter().map(|v| v.to_string()).collect();
81        let mut cols: Vec<Vec<PerlValue>> = (0..columns.len()).map(|_| Vec::new()).collect();
82        for row_val in rows.iter().skip(1) {
83            if let Some(row_lock) = row_val.as_array_ref() {
84                let row_arr = row_lock.read();
85                for (i, col) in cols.iter_mut().enumerate().take(columns.len()) {
86                    col.push(row_arr.get(i).cloned().unwrap_or(PerlValue::UNDEF));
87                }
88            }
89        }
90        return Ok(PerlValue::dataframe(Arc::new(Mutex::new(PerlDataFrame {
91            columns,
92            cols,
93            group_by: None,
94        }))));
95    }
96
97    Err(PerlError::runtime(
98        "dataframe expects a file path or a list of hashrefs/arrayrefs",
99        0,
100    ))
101}
102
103pub(crate) fn dataframe_from_path(path: &str) -> PerlResult<PerlValue> {
104    let mut rdr = csv::Reader::from_path(path)
105        .map_err(|e| PerlError::runtime(format!("dataframe: {}: {}", path, e), 0))?;
106    let headers: Vec<String> = rdr
107        .headers()
108        .map_err(|e| PerlError::runtime(format!("dataframe: {}: {}", path, e), 0))?
109        .iter()
110        .map(|s| s.to_string())
111        .collect();
112    let ncols = headers.len();
113    let mut cols: Vec<Vec<PerlValue>> = (0..ncols).map(|_| Vec::new()).collect();
114    for rec in rdr.records() {
115        let record = rec.map_err(|e| PerlError::runtime(format!("dataframe: {}", e), 0))?;
116        for (i, col) in cols.iter_mut().enumerate().take(ncols) {
117            let cell = record.get(i).unwrap_or("");
118            col.push(PerlValue::string(cell.to_string()));
119        }
120    }
121    let df = PerlDataFrame {
122        columns: headers,
123        cols,
124        group_by: None,
125    };
126    Ok(PerlValue::dataframe(Arc::new(Mutex::new(df))))
127}
128
129pub(crate) fn csv_read(path: &str) -> PerlResult<PerlValue> {
130    let mut rdr = csv::Reader::from_path(path)
131        .map_err(|e| PerlError::runtime(format!("csv_read: {}: {}", path, e), 0))?;
132    let headers: Vec<String> = rdr
133        .headers()
134        .map_err(|e| PerlError::runtime(format!("csv_read: {}: {}", path, e), 0))?
135        .iter()
136        .map(|s| s.to_string())
137        .collect();
138    let mut rows = Vec::new();
139    for rec in rdr.records() {
140        let record = rec.map_err(|e| PerlError::runtime(format!("csv_read: {}", e), 0))?;
141        let mut map = IndexMap::new();
142        for (i, h) in headers.iter().enumerate() {
143            let cell = record.get(i).unwrap_or("");
144            map.insert(h.clone(), PerlValue::string(cell.to_string()));
145        }
146        rows.push(PerlValue::hash_ref(Arc::new(RwLock::new(map))));
147    }
148    Ok(PerlValue::array(rows))
149}
150
151/// Writes rows as CSV. Each row is a hash or hashref; header row is the union of keys
152/// (first-seen order, then keys from later rows in order).
153pub(crate) fn csv_write(path: &str, rows: &[PerlValue]) -> PerlResult<PerlValue> {
154    let mut header: Vec<String> = Vec::new();
155    let mut seen = std::collections::HashSet::<String>::new();
156    let mut normalized: Vec<IndexMap<String, PerlValue>> = Vec::new();
157
158    for row in rows {
159        let map = hash_like(row)?;
160        for k in map.keys() {
161            if seen.insert(k.clone()) {
162                header.push(k.clone());
163            }
164        }
165        normalized.push(map);
166    }
167
168    let mut wtr = csv::Writer::from_path(path)
169        .map_err(|e| PerlError::runtime(format!("csv_write: {}: {}", path, e), 0))?;
170    wtr.write_record(&header)
171        .map_err(|e| PerlError::runtime(format!("csv_write: {}", e), 0))?;
172    for map in &normalized {
173        let record: Vec<String> = header
174            .iter()
175            .map(|k| map.get(k).map(|v| v.to_string()).unwrap_or_default())
176            .collect();
177        wtr.write_record(&record)
178            .map_err(|e| PerlError::runtime(format!("csv_write: {}", e), 0))?;
179    }
180    wtr.flush()
181        .map_err(|e| PerlError::runtime(format!("csv_write: {}", e), 0))?;
182    Ok(PerlValue::integer(normalized.len() as i64))
183}
184
185fn hash_like(v: &PerlValue) -> PerlResult<IndexMap<String, PerlValue>> {
186    if let Some(h) = v.as_hash_map() {
187        return Ok(h);
188    }
189    if let Some(r) = v.as_hash_ref() {
190        return Ok(r.read().clone());
191    }
192    if let Some(b) = v.as_blessed_ref() {
193        let d = b.data.read();
194        if let Some(h) = d.as_hash_map() {
195            return Ok(h);
196        }
197    }
198    Err(PerlError::runtime(
199        "csv_write: row must be hash or hashref",
200        0,
201    ))
202}
203
204pub(crate) fn sqlite_open(path: &str) -> PerlResult<PerlValue> {
205    let conn = Connection::open(path)
206        .map_err(|e| PerlError::runtime(format!("sqlite: {}: {}", path, e), 0))?;
207    Ok(PerlValue::sqlite_conn(Arc::new(Mutex::new(conn))))
208}
209
210pub(crate) fn sqlite_dispatch(
211    conn: &Arc<Mutex<Connection>>,
212    method: &str,
213    args: &[PerlValue],
214    line: usize,
215) -> PerlResult<PerlValue> {
216    let c = conn.lock();
217    match method {
218        "exec" => {
219            if args.is_empty() {
220                return Err(PerlError::runtime("sqlite->exec needs SQL string", line));
221            }
222            let sql = args[0].to_string();
223            let params: Vec<Value> = args[1..].iter().map(perl_to_sql_value).collect();
224            let n = exec_sql(&c, &sql, &params)?;
225            Ok(PerlValue::integer(n as i64))
226        }
227        "query" => {
228            if args.is_empty() {
229                return Err(PerlError::runtime("sqlite->query needs SQL string", line));
230            }
231            let sql = args[0].to_string();
232            let params: Vec<Value> = args[1..].iter().map(perl_to_sql_value).collect();
233            query_sql(&c, &sql, &params, line)
234        }
235        "last_insert_rowid" => {
236            if !args.is_empty() {
237                return Err(PerlError::runtime(
238                    "sqlite->last_insert_rowid takes no arguments",
239                    line,
240                ));
241            }
242            Ok(PerlValue::integer(c.last_insert_rowid()))
243        }
244        _ => Err(PerlError::runtime(
245            format!("unknown sqlite method: {}", method),
246            line,
247        )),
248    }
249}
250
251pub(crate) fn exec_sql(conn: &Connection, sql: &str, params: &[Value]) -> PerlResult<usize> {
252    conn.execute(sql, rusqlite::params_from_iter(params.iter()))
253        .map_err(|e| PerlError::runtime(format!("sqlite exec: {}", e), 0))
254}
255
256pub(crate) fn query_sql(
257    conn: &Connection,
258    sql: &str,
259    params: &[Value],
260    line: usize,
261) -> PerlResult<PerlValue> {
262    let mut stmt = conn
263        .prepare(sql)
264        .map_err(|e| PerlError::runtime(format!("sqlite query: {}", e), line))?;
265    let col_count = stmt.column_count();
266    let mut col_names = Vec::with_capacity(col_count);
267    for i in 0..col_count {
268        col_names.push(
269            stmt.column_name(i)
270                .map(|s| s.to_string())
271                .unwrap_or_else(|_| format!("col{}", i)),
272        );
273    }
274    let mut rows = stmt
275        .query(rusqlite::params_from_iter(params.iter()))
276        .map_err(|e| PerlError::runtime(format!("sqlite query: {}", e), line))?;
277    let mut rows_out = Vec::new();
278    while let Some(row) = rows
279        .next()
280        .map_err(|e| PerlError::runtime(format!("sqlite query: {}", e), line))?
281    {
282        let mut map = IndexMap::new();
283        for (i, col_name) in col_names.iter().enumerate().take(col_count) {
284            let v = row
285                .get::<_, Value>(i)
286                .map_err(|e| PerlError::runtime(format!("sqlite query: {}", e), line))?;
287            map.insert(col_name.clone(), sqlite_value_to_perl(v));
288        }
289        rows_out.push(PerlValue::hash_ref(Arc::new(RwLock::new(map))));
290    }
291    Ok(PerlValue::array(rows_out))
292}
293
294pub(crate) fn perl_to_sql_value(v: &PerlValue) -> Value {
295    if v.is_undef() {
296        return Value::Null;
297    }
298    if let Some(i) = v.as_integer() {
299        return Value::Integer(i);
300    }
301    if let Some(f) = v.as_float() {
302        return Value::Real(f);
303    }
304    if let Some(s) = v.as_str() {
305        return Value::Text(s);
306    }
307    if let Some(b) = v.as_bytes_arc() {
308        return Value::Blob((*b).clone());
309    }
310    Value::Text(v.to_string())
311}
312
313pub(crate) fn sqlite_value_to_perl(v: Value) -> PerlValue {
314    match v {
315        Value::Null => PerlValue::UNDEF,
316        Value::Integer(i) => PerlValue::integer(i),
317        Value::Real(r) => PerlValue::float(r),
318        Value::Text(s) => PerlValue::string(s),
319        Value::Blob(b) => PerlValue::bytes(Arc::new(b)),
320    }
321}
322
323/// Build a struct instance with defaults evaluated by the interpreter.
324/// Called from interpreter when constructing structs so default expressions can be evaluated.
325pub(crate) fn struct_new_with_defaults(
326    def: &Arc<StructDef>,
327    provided: &[(String, PerlValue)],
328    defaults: &[Option<PerlValue>],
329    line: usize,
330) -> PerlResult<PerlValue> {
331    let mut values = vec![PerlValue::UNDEF; def.fields.len()];
332    for (k, v) in provided {
333        let idx = def.field_index(k).ok_or_else(|| {
334            PerlError::runtime(format!("struct {}: unknown field `{}`", def.name, k), line)
335        })?;
336        let field = &def.fields[idx];
337        field.ty.check_value(v).map_err(|msg| {
338            PerlError::type_error(format!("struct {} field `{}`: {}", def.name, k, msg), line)
339        })?;
340        values[idx] = v.clone();
341    }
342    for (idx, field) in def.fields.iter().enumerate() {
343        if values[idx].is_undef() {
344            if let Some(dv) = defaults.get(idx).and_then(|o| o.as_ref()) {
345                // Skip type check if default is undef (nullable field pattern)
346                if !dv.is_undef() {
347                    field.ty.check_value(dv).map_err(|msg| {
348                        PerlError::type_error(
349                            format!(
350                                "struct {} field `{}` default: {}",
351                                def.name, field.name, msg
352                            ),
353                            line,
354                        )
355                    })?;
356                }
357                values[idx] = dv.clone();
358            } else if field.default.is_none() && !matches!(field.ty, crate::ast::PerlTypeName::Any)
359            {
360                return Err(PerlError::runtime(
361                    format!(
362                        "struct {}: missing field `{}` ({})",
363                        def.name,
364                        field.name,
365                        field.ty.display_name()
366                    ),
367                    line,
368                ));
369            }
370        }
371    }
372    Ok(PerlValue::struct_inst(Arc::new(StructInstance::new(
373        Arc::clone(def),
374        values,
375    ))))
376}
377
378/// GET `url` and return the response body as a UTF-8 string (invalid UTF-8 is lossy).
379pub(crate) fn fetch(url: &str) -> PerlResult<PerlValue> {
380    let s = http_get_body(url)?;
381    Ok(PerlValue::string(s))
382}
383
384/// GET `url`, parse JSON, map to [`PerlValue`] (objects → `HashRef`, arrays → `Array`, etc.).
385pub(crate) fn fetch_json(url: &str) -> PerlResult<PerlValue> {
386    let s = http_get_body(url)?;
387    let v: JsonValue = serde_json::from_str(&s)
388        .map_err(|e| PerlError::runtime(format!("fetch_json: {}", e), 0))?;
389    Ok(json_to_perl(v))
390}
391
392fn http_get_body(url: &str) -> PerlResult<String> {
393    ureq::get(url)
394        .call()
395        .map_err(|e| PerlError::runtime(format!("fetch: {}", e), 0))?
396        .into_string()
397        .map_err(|e| PerlError::runtime(format!("fetch: {}", e), 0))
398}
399
400fn perl_hash_lookup(v: &PerlValue, key: &str) -> Option<PerlValue> {
401    v.hash_get(key)
402        .or_else(|| v.as_hash_ref().and_then(|r| r.read().get(key).cloned()))
403}
404
405fn perl_opt_lookup(opts: Option<&PerlValue>, key: &str) -> Option<PerlValue> {
406    let o = opts?;
407    perl_hash_lookup(o, key)
408}
409
410fn perl_opt_bool(opts: Option<&PerlValue>, key: &str) -> bool {
411    perl_opt_lookup(opts, key).is_some_and(|v| v.is_true())
412}
413
414fn perl_opt_u64(opts: Option<&PerlValue>, key: &str) -> Option<u64> {
415    perl_opt_lookup(opts, key).map(|v| v.to_int().max(0) as u64)
416}
417
418fn body_bytes_from_perl(v: &PerlValue) -> Vec<u8> {
419    if let Some(b) = v.as_bytes_arc() {
420        return b.as_ref().clone();
421    }
422    v.to_string().into_bytes()
423}
424
425fn headers_map_has_content_type(headers_val: &PerlValue) -> bool {
426    if let Some(m) = headers_val.as_hash_map() {
427        return m.keys().any(|k| k.eq_ignore_ascii_case("content-type"));
428    }
429    if let Some(r) = headers_val.as_hash_ref() {
430        return r
431            .read()
432            .keys()
433            .any(|k| k.eq_ignore_ascii_case("content-type"));
434    }
435    false
436}
437
438fn apply_request_headers(
439    mut req: ureq::Request,
440    headers_val: &PerlValue,
441) -> PerlResult<ureq::Request> {
442    let pairs: Vec<(String, String)> = if let Some(m) = headers_val.as_hash_map() {
443        m.iter().map(|(k, v)| (k.clone(), v.to_string())).collect()
444    } else if let Some(r) = headers_val.as_hash_ref() {
445        r.read()
446            .iter()
447            .map(|(k, v)| (k.clone(), v.to_string()))
448            .collect()
449    } else {
450        return Err(PerlError::runtime(
451            "http_request: headers must be a hash or hashref",
452            0,
453        ));
454    };
455    for (k, v) in pairs {
456        req = req.set(&k, &v);
457    }
458    Ok(req)
459}
460
461/// Full HTTP request: `opts` hash(ref) keys: `method` (default GET), `headers`, `body`, `json`
462/// (encodes body, sets `Content-Type` unless already in `headers`), `timeout` / `timeout_secs`
463/// (omit for 30s; `0` disables client timeout), `binary_response` (body as `BYTES` instead of decoded string).
464///
465/// Returns a hashref: `status`, `status_text`, `headers` (hashref, lowercased names), `body`.
466pub(crate) fn http_request(url: &str, opts: Option<&PerlValue>) -> PerlResult<PerlValue> {
467    let method = perl_opt_lookup(opts, "method")
468        .map(|v| v.to_string())
469        .filter(|s| !s.is_empty())
470        .unwrap_or_else(|| "GET".to_string());
471    let method_uc = method.to_ascii_uppercase();
472    let timeout_secs = perl_opt_u64(opts, "timeout_secs").or_else(|| perl_opt_u64(opts, "timeout"));
473    let binary_response = perl_opt_bool(opts, "binary_response");
474
475    let mut req = ureq::request(method_uc.as_str(), url);
476    match timeout_secs {
477        None => {
478            req = req.timeout(Duration::from_secs(30));
479        }
480        Some(0) => {}
481        Some(n) => {
482            req = req.timeout(Duration::from_secs(n));
483        }
484    }
485
486    if let Some(hv) = opts.and_then(|o| perl_hash_lookup(o, "headers")) {
487        req = apply_request_headers(req, &hv)?;
488    }
489
490    let mut body: Vec<u8> = Vec::new();
491    if let Some(o) = opts {
492        if let Some(jv) = perl_hash_lookup(o, "json") {
493            let jstr = json_encode(&jv)?;
494            if let Some(hv) = perl_hash_lookup(o, "headers") {
495                if !headers_map_has_content_type(&hv) {
496                    req = req.set("Content-Type", "application/json; charset=utf-8");
497                }
498            } else {
499                req = req.set("Content-Type", "application/json; charset=utf-8");
500            }
501            body = jstr.into_bytes();
502        } else if let Some(bv) = perl_hash_lookup(o, "body") {
503            body = body_bytes_from_perl(&bv);
504        }
505    }
506
507    let resp = if body.is_empty() {
508        req.call()
509    } else {
510        req.send_bytes(&body)
511    }
512    .map_err(|e| PerlError::runtime(format!("http_request: {}", e), 0))?;
513
514    let status = resp.status();
515    let status_text = resp.status_text().to_string();
516    let mut hdr_map = IndexMap::new();
517    let mut names = resp.headers_names();
518    names.sort();
519    names.dedup();
520    for n in names {
521        let vals: Vec<&str> = resp.all(&n);
522        if !vals.is_empty() {
523            hdr_map.insert(n, PerlValue::string(vals.join(", ")));
524        }
525    }
526    let headers_ref = PerlValue::hash_ref(Arc::new(RwLock::new(hdr_map)));
527
528    let body_val = if binary_response {
529        let mut buf = Vec::new();
530        resp.into_reader()
531            .read_to_end(&mut buf)
532            .map_err(|e| PerlError::runtime(format!("http_request: body read: {}", e), 0))?;
533        PerlValue::bytes(Arc::new(buf))
534    } else {
535        let s = resp
536            .into_string()
537            .map_err(|e| PerlError::runtime(format!("http_request: body: {}", e), 0))?;
538        PerlValue::string(s)
539    };
540
541    let mut out = IndexMap::new();
542    out.insert("status".into(), PerlValue::integer(status as i64));
543    out.insert("status_text".into(), PerlValue::string(status_text));
544    out.insert("headers".into(), headers_ref);
545    out.insert("body".into(), body_val);
546    Ok(PerlValue::hash_ref(Arc::new(RwLock::new(out))))
547}
548
549/// Parse JSON from the `body` field of an [`http_request`] result hashref.
550pub(crate) fn http_response_json_body(res: &PerlValue) -> PerlResult<PerlValue> {
551    let body = perl_hash_lookup(res, "body")
552        .ok_or_else(|| PerlError::runtime("fetch_json: http response missing body", 0))?;
553    let s = if let Some(b) = body.as_bytes_arc() {
554        String::from_utf8_lossy(b.as_ref()).into_owned()
555    } else {
556        body.to_string()
557    };
558    json_decode(&s)
559}
560
561/// Serialize a [`PerlValue`] to a JSON string (arrays, hashes, refs, structs, scalars; not code/refs/IO).
562pub(crate) fn json_encode(v: &PerlValue) -> PerlResult<String> {
563    let j = perl_to_json_value(v)?;
564    serde_json::to_string(&j).map_err(|e| PerlError::runtime(format!("json_encode: {}", e), 0))
565}
566
567/// Parse a JSON string into [`PerlValue`] (same mapping as [`fetch_json`]).
568pub(crate) fn json_decode(s: &str) -> PerlResult<PerlValue> {
569    let v: JsonValue = serde_json::from_str(s.trim())
570        .map_err(|e| PerlError::runtime(format!("json_decode: {}", e), 0))?;
571    Ok(json_to_perl(v))
572}
573
574/// Run a [jq](https://jqlang.org/)-syntax filter (via [jaq](https://github.com/01mf02/jaq)) on JSON
575/// derived from `data` (same encodable shapes as [`json_encode`]).
576///
577/// Returns `undef` if the filter yields no values, a single Perl value if it yields one output,
578/// or an array of values if it yields more than one (e.g. `.items[]`).
579pub(crate) fn json_jq(data: &PerlValue, filter_src: &str) -> PerlResult<PerlValue> {
580    let j = perl_to_json_value(data)?;
581    let input: jaq_json::Val = serde_json::from_value(j)
582        .map_err(|e| PerlError::runtime(format!("json_jq: could not convert input: {}", e), 0))?;
583
584    let arena = jaq_core::load::Arena::default();
585    let defs = jaq_core::defs()
586        .chain(jaq_std::defs())
587        .chain(jaq_json::defs());
588    let loader = jaq_core::load::Loader::new(defs);
589    let file = jaq_core::load::File {
590        code: filter_src,
591        path: (),
592    };
593    let modules = loader
594        .load(&arena, file)
595        .map_err(|e| PerlError::runtime(format!("json_jq: parse/load: {:?}", e), 0))?;
596
597    type JData = JustLut<jaq_json::Val>;
598    let filter = jaq_core::Compiler::default()
599        .with_funs(
600            jaq_core::funs::<JData>()
601                .chain(jaq_std::funs::<JData>())
602                .chain(jaq_json::funs::<JData>()),
603        )
604        .compile(modules)
605        .map_err(|e| PerlError::runtime(format!("json_jq: compile: {:?}", e), 0))?;
606
607    let ctx = jaq_core::Ctx::<JData>::new(&filter.lut, jaq_core::Vars::new([]));
608    let mut results = Vec::new();
609    for x in filter.id.run((ctx, input)) {
610        match jaq_core::unwrap_valr(x) {
611            Ok(v) => results.push(jaq_json_val_to_perl(v)?),
612            Err(e) => {
613                return Err(PerlError::runtime(format!("json_jq: {}", e), 0));
614            }
615        }
616    }
617
618    match results.len() {
619        0 => Ok(PerlValue::UNDEF),
620        1 => Ok(results.pop().expect("one")),
621        _ => Ok(PerlValue::array(results)),
622    }
623}
624
625fn jaq_json_val_to_perl(v: jaq_json::Val) -> PerlResult<PerlValue> {
626    use jaq_json::Val as Jv;
627    match v {
628        Jv::Null => Ok(PerlValue::UNDEF),
629        Jv::Bool(b) => Ok(PerlValue::integer(i64::from(b))),
630        Jv::Num(n) => jaq_num_to_perl(n),
631        Jv::BStr(b) => Ok(PerlValue::string(String::from_utf8_lossy(&b).into_owned())),
632        Jv::TStr(b) => Ok(PerlValue::string(String::from_utf8_lossy(&b).into_owned())),
633        Jv::Arr(a) => {
634            let v = a.as_ref();
635            let mut out = Vec::with_capacity(v.len());
636            for x in v.iter() {
637                out.push(jaq_json_val_to_perl(x.clone())?);
638            }
639            Ok(PerlValue::array(out))
640        }
641        Jv::Obj(o) => {
642            let mut map = IndexMap::new();
643            for (k, val) in o.iter() {
644                map.insert(k.to_string(), jaq_json_val_to_perl(val.clone())?);
645            }
646            Ok(PerlValue::hash_ref(Arc::new(RwLock::new(map))))
647        }
648    }
649}
650
651fn jaq_num_to_perl(n: jaq_json::Num) -> PerlResult<PerlValue> {
652    use jaq_json::Num as Jn;
653    match n {
654        Jn::Int(i) => Ok(PerlValue::integer(i as i64)),
655        Jn::Float(f) => Ok(PerlValue::float(f)),
656        Jn::BigInt(r) => {
657            let bi = (*r).clone();
658            if let Some(i) = bi.to_i64() {
659                Ok(PerlValue::integer(i))
660            } else if let Some(f) = bi.to_f64() {
661                Ok(PerlValue::float(f))
662            } else {
663                Ok(PerlValue::string(bi.to_string()))
664            }
665        }
666        Jn::Dec(s) => {
667            let f: f64 = s.parse().unwrap_or(f64::NAN);
668            Ok(PerlValue::float(f))
669        }
670    }
671}
672
673pub(crate) fn perl_to_json_value(v: &PerlValue) -> PerlResult<JsonValue> {
674    if v.is_undef() {
675        return Ok(JsonValue::Null);
676    }
677    if let Some(n) = v.as_integer() {
678        return Ok(JsonValue::Number(n.into()));
679    }
680    if let Some(f) = v.as_float() {
681        return serde_json::Number::from_f64(f)
682            .map(JsonValue::Number)
683            .ok_or_else(|| PerlError::runtime("json_encode: non-finite float", 0));
684    }
685    if crate::nanbox::is_raw_float_bits(v.0) {
686        let f = f64::from_bits(v.0);
687        return serde_json::Number::from_f64(f)
688            .map(JsonValue::Number)
689            .ok_or_else(|| PerlError::runtime("json_encode: non-finite float", 0));
690    }
691    if let Some(a) = v.as_array_vec() {
692        let mut out = Vec::with_capacity(a.len());
693        for x in &a {
694            out.push(perl_to_json_value(x)?);
695        }
696        return Ok(JsonValue::Array(out));
697    }
698    if let Some(h) = v.as_hash_map() {
699        let mut m = serde_json::Map::new();
700        for (k, val) in h.iter() {
701            m.insert(k.clone(), perl_to_json_value(val)?);
702        }
703        return Ok(JsonValue::Object(m));
704    }
705    if let Some(r) = v.as_array_ref() {
706        let g = r.read();
707        let mut out = Vec::with_capacity(g.len());
708        for x in g.iter() {
709            out.push(perl_to_json_value(x)?);
710        }
711        return Ok(JsonValue::Array(out));
712    }
713    if let Some(r) = v.as_hash_ref() {
714        let g = r.read();
715        let mut m = serde_json::Map::new();
716        for (k, val) in g.iter() {
717            m.insert(k.clone(), perl_to_json_value(val)?);
718        }
719        return Ok(JsonValue::Object(m));
720    }
721    if let Some(r) = v.as_scalar_ref() {
722        return perl_to_json_value(&r.read());
723    }
724    if let Some(a) = v.as_atomic_arc() {
725        return perl_to_json_value(&a.lock().clone());
726    }
727    if let Some(s) = v.as_str() {
728        return Ok(JsonValue::String(s));
729    }
730    if let Some(b) = v.as_bytes_arc() {
731        return Ok(JsonValue::String(String::from_utf8_lossy(&b).into_owned()));
732    }
733    if let Some(si) = v.as_struct_inst() {
734        let mut m = serde_json::Map::new();
735        let values = si.get_values();
736        for (i, field) in si.def.fields.iter().enumerate() {
737            if let Some(fv) = values.get(i) {
738                m.insert(field.name.clone(), perl_to_json_value(fv)?);
739            }
740        }
741        return Ok(JsonValue::Object(m));
742    }
743    if let Some(b) = v.as_blessed_ref() {
744        let inner = b.data.read().clone();
745        return perl_to_json_value(&inner);
746    }
747    if let Some(vals) = v
748        .with_heap(|h| match h {
749            HeapObject::Set(s) => Some(s.values().cloned().collect::<Vec<_>>()),
750            _ => None,
751        })
752        .flatten()
753    {
754        let mut out = Vec::with_capacity(vals.len());
755        for x in vals {
756            out.push(perl_to_json_value(&x)?);
757        }
758        return Ok(JsonValue::Array(out));
759    }
760    if let Some(vals) = v
761        .with_heap(|h| match h {
762            HeapObject::Deque(d) => Some(d.lock().iter().cloned().collect::<Vec<_>>()),
763            _ => None,
764        })
765        .flatten()
766    {
767        let mut out = Vec::with_capacity(vals.len());
768        for x in vals {
769            out.push(perl_to_json_value(&x)?);
770        }
771        return Ok(JsonValue::Array(out));
772    }
773
774    if let Some(df) = v.as_dataframe() {
775        let g = df.lock();
776        let n = g.nrows();
777        let mut rows = Vec::with_capacity(n);
778        for r in 0..n {
779            let mut m = serde_json::Map::new();
780            for (i, col) in g.columns.iter().enumerate() {
781                m.insert(col.clone(), perl_to_json_value(&g.cols[i][r])?);
782            }
783            rows.push(JsonValue::Object(m));
784        }
785        return Ok(JsonValue::Array(rows));
786    }
787
788    Err(PerlError::runtime(
789        format!(
790            "json_encode: value cannot be encoded as JSON ({})",
791            v.type_name()
792        ),
793        0,
794    ))
795}
796
797fn json_to_perl(v: JsonValue) -> PerlValue {
798    match v {
799        JsonValue::Null => PerlValue::UNDEF,
800        JsonValue::Bool(b) => PerlValue::integer(i64::from(b)),
801        JsonValue::Number(n) => {
802            if let Some(i) = n.as_i64() {
803                PerlValue::integer(i)
804            } else if let Some(u) = n.as_u64() {
805                PerlValue::integer(u as i64)
806            } else {
807                PerlValue::float(n.as_f64().unwrap_or(0.0))
808            }
809        }
810        JsonValue::String(s) => PerlValue::string(s),
811        JsonValue::Array(a) => PerlValue::array(a.into_iter().map(json_to_perl).collect()),
812        JsonValue::Object(o) => {
813            let mut map = IndexMap::new();
814            for (k, v) in o {
815                map.insert(k, json_to_perl(v));
816            }
817            PerlValue::hash_ref(Arc::new(RwLock::new(map)))
818        }
819    }
820}
821
#[cfg(test)]
mod http_json_tests {
    use super::*;

    /// Parses `src` with `serde_json` and converts the result with `json_to_perl`.
    fn perl_from_json_text(src: &str) -> PerlValue {
        let parsed: JsonValue = serde_json::from_str(src).unwrap();
        json_to_perl(parsed)
    }

    /// Asserts that `items` is exactly `[1, "x", undef]`.
    fn assert_one_x_undef(items: &[PerlValue]) {
        assert_eq!(items.len(), 3);
        assert_eq!(items[0].to_int(), 1);
        assert_eq!(items[1].to_string(), "x");
        assert!(items[2].is_undef());
    }

    #[test]
    fn json_to_perl_object_hashref() {
        let value = perl_from_json_text(r#"{"name":"a","n":1}"#);
        let href = value.as_hash_ref().expect("expected HashRef");
        let fields = href.read();
        assert_eq!(fields.get("name").unwrap().to_string(), "a");
        assert_eq!(fields.get("n").unwrap().to_int(), 1);
    }

    #[test]
    fn json_to_perl_array() {
        let value = perl_from_json_text(r#"[1,"x",null]"#);
        let items = value.as_array_vec().expect("expected Array");
        assert_one_x_undef(&items);
    }

    #[test]
    fn json_encode_decode_roundtrip() {
        let original = PerlValue::array(vec![
            PerlValue::integer(1),
            PerlValue::string("x".into()),
            PerlValue::UNDEF,
        ]);
        let text = json_encode(&original).expect("encode");
        let decoded = json_decode(&text).expect("decode");
        let items = decoded.as_array_vec().expect("array");
        assert_one_x_undef(&items);
    }

    #[test]
    fn json_encode_hash_roundtrip() {
        let mut entries = IndexMap::new();
        entries.insert("a".into(), PerlValue::integer(2));
        let text = json_encode(&PerlValue::hash(entries)).expect("encode");
        assert!(text.contains("\"a\""));
        let decoded = json_decode(&text).expect("decode");
        let href = decoded.as_hash_ref().expect("hashref");
        assert_eq!(href.read().get("a").unwrap().to_int(), 2);
    }

    #[test]
    fn json_jq_field_select() {
        let doc = json_decode(r#"{"a":1,"b":{"c":3}}"#).unwrap();
        let picked = json_jq(&doc, ".b.c").unwrap();
        assert_eq!(picked.to_int(), 3);
    }

    #[test]
    fn json_jq_map_select_multiple_yields_array() {
        let doc = json_decode(r#"[1,2,3,4]"#).unwrap();
        let selected = json_jq(&doc, "map(select(. > 2))").unwrap();
        let items = selected.as_array_vec().expect("array");
        assert_eq!(items.len(), 2);
        assert_eq!(items[0].to_int(), 3);
        assert_eq!(items[1].to_int(), 4);
    }

    #[test]
    fn test_dataframe_from_path() {
        let path = std::env::temp_dir().join(format!("test_df_{}.csv", std::process::id()));
        std::fs::write(&path, "id,name,val\n1,alice,10.5\n2,bob,20.0\n").expect("write csv");

        let value = dataframe_from_path(path.to_str().unwrap()).expect("dataframe_from_path");
        let frame = value.as_dataframe().expect("as_dataframe");
        let df = frame.lock();

        assert_eq!(df.columns, vec!["id", "name", "val"]);
        assert_eq!(df.cols.len(), 3);
        assert_eq!(df.cols[0][0].to_string(), "1");
        assert_eq!(df.cols[1][1].to_string(), "bob");
        assert_eq!(df.cols[2][0].to_string(), "10.5");

        let _ = std::fs::remove_file(&path);
    }
}