Skip to main content

citadel_sql/
json.rs

1use std::sync::Arc;
2
3use crate::error::{Result, SqlError};
4use crate::types::Value;
5
/// Type tag of an encoded JSONB value.
///
/// The discriminant is the value written into the upper four bits of a
/// header byte, so the numeric assignments must stay stable.
#[repr(u8)]
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum JsonbType {
    Null = 0,
    True = 1,
    False = 2,
    Integer = 3,
    Real = 4,
    String = 5,
    Array = 6,
    Object = 7,
}

impl JsonbType {
    /// Maps a 4-bit tag back to its variant.
    ///
    /// Returns `None` for any value that is not the discriminant of a variant
    /// (8 and above).
    fn from_nibble(n: u8) -> Option<Self> {
        // Indexed by discriminant: entry `i` is the variant whose tag is `i`.
        const BY_TAG: [JsonbType; 8] = [
            JsonbType::Null,
            JsonbType::True,
            JsonbType::False,
            JsonbType::Integer,
            JsonbType::Real,
            JsonbType::String,
            JsonbType::Array,
            JsonbType::Object,
        ];
        BY_TAG.get(usize::from(n)).copied()
    }
}
34
// Size-class values stored in the low nibble of a header byte. Values 0..=11
// carry the payload length inline; the classes below mean the length follows
// the header byte as a little-endian unsigned integer of the named width.
const SIZE_CLASS_U8: u8 = 12;
const SIZE_CLASS_U16: u8 = 13;
const SIZE_CLASS_U32: u8 = 14;
const SIZE_CLASS_U64: u8 = 15;
39
40pub fn validate_text(s: &str) -> Result<()> {
41    serde_json::from_str::<serde_json::Value>(s)
42        .map(|_| ())
43        .map_err(|e| SqlError::InvalidValue(format!("invalid JSON: {e}")))
44}
45
46pub fn text_to_jsonb(s: &str) -> Result<Value> {
47    let v: serde_json::Value = serde_json::from_str(s)
48        .map_err(|e| SqlError::InvalidValue(format!("invalid JSON: {e}")))?;
49    reject_null_bytes(&v)?;
50    let mut buf = Vec::with_capacity(s.len());
51    encode_canonical(&v, &mut buf)?;
52    Ok(Value::Jsonb(Arc::from(buf)))
53}
54
55fn reject_null_bytes(v: &serde_json::Value) -> Result<()> {
56    match v {
57        serde_json::Value::String(s) if s.contains('\0') => Err(SqlError::InvalidValue(
58            "unsupported Unicode escape sequence \\u0000".into(),
59        )),
60        serde_json::Value::Array(items) => items.iter().try_for_each(reject_null_bytes),
61        serde_json::Value::Object(map) => map.iter().try_for_each(|(k, v)| {
62            if k.contains('\0') {
63                return Err(SqlError::InvalidValue(
64                    "unsupported Unicode escape sequence \\u0000".into(),
65                ));
66            }
67            reject_null_bytes(v)
68        }),
69        _ => Ok(()),
70    }
71}
72
73pub fn decode_to_text(bytes: &[u8]) -> Result<String> {
74    let v = decode_to_serde(bytes)?;
75    serde_json::to_string(&v).map_err(|e| SqlError::InvalidValue(format!("JSONB render: {e}")))
76}
77
78pub fn decode_to_serde(bytes: &[u8]) -> Result<serde_json::Value> {
79    let mut pos = 0;
80    let v = decode_value(bytes, &mut pos)?;
81    if pos != bytes.len() {
82        return Err(SqlError::InvalidValue("trailing bytes in JSONB".into()));
83    }
84    Ok(v)
85}
86
87pub fn encode_canonical(v: &serde_json::Value, out: &mut Vec<u8>) -> Result<()> {
88    match v {
89        serde_json::Value::Null => out.push(header_byte(JsonbType::Null, 0)),
90        serde_json::Value::Bool(true) => out.push(header_byte(JsonbType::True, 0)),
91        serde_json::Value::Bool(false) => out.push(header_byte(JsonbType::False, 0)),
92        serde_json::Value::Number(n) => {
93            if let Some(i) = n.as_i64() {
94                out.push(header_byte(JsonbType::Integer, 0));
95                out.extend_from_slice(&i.to_le_bytes());
96            } else if let Some(f) = n.as_f64() {
97                if f.is_finite() {
98                    out.push(header_byte(JsonbType::Real, 0));
99                    out.extend_from_slice(&f.to_le_bytes());
100                } else {
101                    return Err(SqlError::InvalidValue("non-finite number in JSON".into()));
102                }
103            } else {
104                return Err(SqlError::InvalidValue(format!("unsupported number: {n}")));
105            }
106        }
107        serde_json::Value::String(s) => encode_string(s, out),
108        serde_json::Value::Array(items) => {
109            let mut payload = Vec::new();
110            for item in items {
111                encode_canonical(item, &mut payload)?;
112            }
113            write_header_with_len(JsonbType::Array, payload.len(), out);
114            out.extend_from_slice(&payload);
115        }
116        serde_json::Value::Object(map) => {
117            let mut keys: Vec<&String> = map.keys().collect();
118            keys.sort();
119            let mut payload = Vec::new();
120            for k in keys {
121                encode_string(k, &mut payload);
122                encode_canonical(&map[k], &mut payload)?;
123            }
124            write_header_with_len(JsonbType::Object, payload.len(), out);
125            out.extend_from_slice(&payload);
126        }
127    }
128    Ok(())
129}
130
131fn encode_string(s: &str, out: &mut Vec<u8>) {
132    let bytes = s.as_bytes();
133    write_header_with_len(JsonbType::String, bytes.len(), out);
134    out.extend_from_slice(bytes);
135}
136
137fn header_byte(ty: JsonbType, size_class: u8) -> u8 {
138    debug_assert!(size_class <= 15);
139    (ty as u8) << 4 | (size_class & 0x0F)
140}
141
142fn write_header_with_len(ty: JsonbType, len: usize, out: &mut Vec<u8>) {
143    if len <= 11 {
144        out.push(header_byte(ty, len as u8));
145    } else if len <= u8::MAX as usize {
146        out.push(header_byte(ty, SIZE_CLASS_U8));
147        out.push(len as u8);
148    } else if len <= u16::MAX as usize {
149        out.push(header_byte(ty, SIZE_CLASS_U16));
150        out.extend_from_slice(&(len as u16).to_le_bytes());
151    } else if len <= u32::MAX as usize {
152        out.push(header_byte(ty, SIZE_CLASS_U32));
153        out.extend_from_slice(&(len as u32).to_le_bytes());
154    } else {
155        out.push(header_byte(ty, SIZE_CLASS_U64));
156        out.extend_from_slice(&(len as u64).to_le_bytes());
157    }
158}
159
160pub fn read_header(bytes: &[u8]) -> Result<(JsonbType, usize, usize)> {
161    if bytes.is_empty() {
162        return Err(SqlError::InvalidValue("empty JSONB".into()));
163    }
164    let h = bytes[0];
165    let ty = JsonbType::from_nibble(h >> 4)
166        .ok_or_else(|| SqlError::InvalidValue("invalid JSONB type tag".into()))?;
167    let size_class = h & 0x0F;
168    let (payload_start, payload_len) = match size_class {
169        0..=11 => (1, size_class as usize),
170        SIZE_CLASS_U8 => {
171            if bytes.len() < 2 {
172                return Err(SqlError::InvalidValue("truncated JSONB header".into()));
173            }
174            (2, bytes[1] as usize)
175        }
176        SIZE_CLASS_U16 => {
177            if bytes.len() < 3 {
178                return Err(SqlError::InvalidValue("truncated JSONB header".into()));
179            }
180            (3, u16::from_le_bytes([bytes[1], bytes[2]]) as usize)
181        }
182        SIZE_CLASS_U32 => {
183            if bytes.len() < 5 {
184                return Err(SqlError::InvalidValue("truncated JSONB header".into()));
185            }
186            (
187                5,
188                u32::from_le_bytes([bytes[1], bytes[2], bytes[3], bytes[4]]) as usize,
189            )
190        }
191        SIZE_CLASS_U64 => {
192            if bytes.len() < 9 {
193                return Err(SqlError::InvalidValue("truncated JSONB header".into()));
194            }
195            let arr: [u8; 8] = bytes[1..9].try_into().unwrap();
196            (9, u64::from_le_bytes(arr) as usize)
197        }
198        _ => unreachable!(),
199    };
200    let fixed_payload = match ty {
201        JsonbType::Null | JsonbType::True | JsonbType::False => Some(0),
202        JsonbType::Integer | JsonbType::Real => Some(8),
203        _ => None,
204    };
205    let payload_len = fixed_payload.unwrap_or(payload_len);
206    if payload_start + payload_len > bytes.len() {
207        return Err(SqlError::InvalidValue("JSONB payload truncated".into()));
208    }
209    Ok((ty, payload_start, payload_len))
210}
211
212pub fn skip_value(bytes: &[u8]) -> Result<usize> {
213    let (_ty, payload_start, payload_len) = read_header(bytes)?;
214    Ok(payload_start + payload_len)
215}
216
217pub fn find_object_key<'a>(bytes: &'a [u8], key: &str) -> Result<Option<&'a [u8]>> {
218    let (ty, payload_start, payload_len) = read_header(bytes)?;
219    if ty != JsonbType::Object {
220        return Ok(None);
221    }
222    let payload = &bytes[payload_start..payload_start + payload_len];
223    let key_bytes = key.as_bytes();
224    let mut pos = 0usize;
225    while pos < payload.len() {
226        let (kty, kp_start, kp_len) = read_header(&payload[pos..])?;
227        if kty != JsonbType::String {
228            return Err(SqlError::InvalidValue("JSONB object key not string".into()));
229        }
230        let k_total = kp_start + kp_len;
231        let k_slice = &payload[pos + kp_start..pos + k_total];
232        let value_start = pos + k_total;
233        let value_total = skip_value(&payload[value_start..])?;
234        if k_slice == key_bytes {
235            return Ok(Some(&payload[value_start..value_start + value_total]));
236        }
237        pos = value_start + value_total;
238    }
239    Ok(None)
240}
241
242pub fn array_get(bytes: &[u8], idx: i64) -> Result<Option<&[u8]>> {
243    let (ty, payload_start, payload_len) = read_header(bytes)?;
244    if ty != JsonbType::Array {
245        return Ok(None);
246    }
247    let payload = &bytes[payload_start..payload_start + payload_len];
248    if idx < 0 {
249        let mut elems = Vec::new();
250        let mut pos = 0usize;
251        while pos < payload.len() {
252            let total = skip_value(&payload[pos..])?;
253            elems.push((pos, total));
254            pos += total;
255        }
256        let len = elems.len() as i64;
257        let real = len + idx;
258        if real < 0 {
259            return Ok(None);
260        }
261        let (start, total) = elems[real as usize];
262        return Ok(Some(&payload[start..start + total]));
263    }
264    let mut pos = 0usize;
265    let mut remaining = idx;
266    while pos < payload.len() {
267        let total = skip_value(&payload[pos..])?;
268        if remaining == 0 {
269            return Ok(Some(&payload[pos..pos + total]));
270        }
271        remaining -= 1;
272        pos += total;
273    }
274    Ok(None)
275}
276
277pub fn array_len_bytes(bytes: &[u8]) -> Result<Option<usize>> {
278    let (ty, payload_start, payload_len) = read_header(bytes)?;
279    if ty != JsonbType::Array {
280        return Ok(None);
281    }
282    let payload = &bytes[payload_start..payload_start + payload_len];
283    let mut pos = 0usize;
284    let mut count = 0usize;
285    while pos < payload.len() {
286        pos += skip_value(&payload[pos..])?;
287        count += 1;
288    }
289    Ok(Some(count))
290}
291
292pub fn object_len_bytes(bytes: &[u8]) -> Result<Option<usize>> {
293    let (ty, payload_start, payload_len) = read_header(bytes)?;
294    if ty != JsonbType::Object {
295        return Ok(None);
296    }
297    let payload = &bytes[payload_start..payload_start + payload_len];
298    let mut pos = 0usize;
299    let mut count = 0usize;
300    while pos < payload.len() {
301        pos += skip_value(&payload[pos..])?;
302        pos += skip_value(&payload[pos..])?;
303        count += 1;
304    }
305    Ok(Some(count))
306}
307
308pub fn read_scalar_text(bytes: &[u8]) -> Result<Option<String>> {
309    let (ty, payload_start, payload_len) = read_header(bytes)?;
310    let payload = &bytes[payload_start..payload_start + payload_len];
311    match ty {
312        JsonbType::Null => Ok(None),
313        JsonbType::True => Ok(Some("true".into())),
314        JsonbType::False => Ok(Some("false".into())),
315        JsonbType::Integer => {
316            let arr: [u8; 8] = payload
317                .try_into()
318                .map_err(|_| SqlError::InvalidValue("JSONB integer payload size".into()))?;
319            Ok(Some(i64::from_le_bytes(arr).to_string()))
320        }
321        JsonbType::Real => {
322            let arr: [u8; 8] = payload
323                .try_into()
324                .map_err(|_| SqlError::InvalidValue("JSONB real payload size".into()))?;
325            let f = f64::from_le_bytes(arr);
326            let n = serde_json::Number::from_f64(f)
327                .ok_or_else(|| SqlError::InvalidValue("non-finite JSONB number".into()))?;
328            Ok(Some(n.to_string()))
329        }
330        JsonbType::String => {
331            let s = std::str::from_utf8(payload)
332                .map_err(|_| SqlError::InvalidValue("JSONB string not UTF-8".into()))?;
333            Ok(Some(s.to_string()))
334        }
335        JsonbType::Array | JsonbType::Object => {
336            let v = decode_to_serde(bytes)?;
337            Ok(Some(serde_json::to_string(&v).map_err(|e| {
338                SqlError::InvalidValue(format!("JSON render: {e}"))
339            })?))
340        }
341    }
342}
343
344pub fn jsonb_contains_bytes(lhs: &[u8], rhs: &[u8]) -> Result<bool> {
345    let (lty, lps, lpl) = read_header(lhs)?;
346    let (rty, rps, rpl) = read_header(rhs)?;
347    let lpay = &lhs[lps..lps + lpl];
348    let rpay = &rhs[rps..rps + rpl];
349    match (lty, rty) {
350        (JsonbType::Object, JsonbType::Object) => {
351            let mut rp = 0usize;
352            while rp < rpay.len() {
353                let (_rkty, rkps, rkpl) = read_header(&rpay[rp..])?;
354                let rk_total = rkps + rkpl;
355                let rk = &rpay[rp + rkps..rp + rk_total];
356                let rv_start = rp + rk_total;
357                let rv_total = skip_value(&rpay[rv_start..])?;
358                let rv = &rpay[rv_start..rv_start + rv_total];
359                let mut lp = 0usize;
360                let mut found = false;
361                while lp < lpay.len() {
362                    let (_lkty, lkps, lkpl) = read_header(&lpay[lp..])?;
363                    let lk_total = lkps + lkpl;
364                    let lk = &lpay[lp + lkps..lp + lk_total];
365                    let lv_start = lp + lk_total;
366                    let lv_total = skip_value(&lpay[lv_start..])?;
367                    if lk == rk {
368                        let lv = &lpay[lv_start..lv_start + lv_total];
369                        if !jsonb_contains_bytes(lv, rv)? {
370                            return Ok(false);
371                        }
372                        found = true;
373                        break;
374                    }
375                    lp = lv_start + lv_total;
376                }
377                if !found {
378                    return Ok(false);
379                }
380                rp = rv_start + rv_total;
381            }
382            Ok(true)
383        }
384        (JsonbType::Array, JsonbType::Array) => {
385            let mut rp = 0usize;
386            while rp < rpay.len() {
387                let rv_total = skip_value(&rpay[rp..])?;
388                let rv = &rpay[rp..rp + rv_total];
389                let mut lp = 0usize;
390                let mut found = false;
391                while lp < lpay.len() {
392                    let lv_total = skip_value(&lpay[lp..])?;
393                    let lv = &lpay[lp..lp + lv_total];
394                    if jsonb_contains_bytes(lv, rv)? {
395                        found = true;
396                        break;
397                    }
398                    lp += lv_total;
399                }
400                if !found {
401                    return Ok(false);
402                }
403                rp += rv_total;
404            }
405            Ok(true)
406        }
407        (JsonbType::Array, _) => {
408            let r_total = rps + rpl;
409            let r_full = &rhs[..r_total];
410            let mut lp = 0usize;
411            while lp < lpay.len() {
412                let lv_total = skip_value(&lpay[lp..])?;
413                if &lpay[lp..lp + lv_total] == r_full {
414                    return Ok(true);
415                }
416                lp += lv_total;
417            }
418            Ok(false)
419        }
420        _ => {
421            let l_total = lps + lpl;
422            let r_total = rps + rpl;
423            Ok(lhs[..l_total] == rhs[..r_total])
424        }
425    }
426}
427
428pub fn has_top_key_bytes(bytes: &[u8], key: &str) -> Result<bool> {
429    let (ty, payload_start, payload_len) = read_header(bytes)?;
430    let payload = &bytes[payload_start..payload_start + payload_len];
431    let key_bytes = key.as_bytes();
432    match ty {
433        JsonbType::Object => {
434            let mut pos = 0usize;
435            while pos < payload.len() {
436                let (_kty, kps, kpl) = read_header(&payload[pos..])?;
437                let k_total = kps + kpl;
438                if &payload[pos + kps..pos + k_total] == key_bytes {
439                    return Ok(true);
440                }
441                pos += k_total;
442                pos += skip_value(&payload[pos..])?;
443            }
444            Ok(false)
445        }
446        JsonbType::Array => {
447            let mut pos = 0usize;
448            while pos < payload.len() {
449                let (ety, eps, epl) = read_header(&payload[pos..])?;
450                if ety == JsonbType::String && &payload[pos + eps..pos + eps + epl] == key_bytes {
451                    return Ok(true);
452                }
453                pos += eps + epl;
454            }
455            Ok(false)
456        }
457        _ => Ok(false),
458    }
459}
460
461fn decode_value(bytes: &[u8], pos: &mut usize) -> Result<serde_json::Value> {
462    let (ty, payload_start, payload_len) = read_header(&bytes[*pos..])?;
463    let payload = &bytes[*pos + payload_start..*pos + payload_start + payload_len];
464    let total = payload_start + payload_len;
465    let v = match ty {
466        JsonbType::Null => serde_json::Value::Null,
467        JsonbType::True => serde_json::Value::Bool(true),
468        JsonbType::False => serde_json::Value::Bool(false),
469        JsonbType::Integer => {
470            let arr: [u8; 8] = payload
471                .try_into()
472                .map_err(|_| SqlError::InvalidValue("JSONB integer payload size".into()))?;
473            serde_json::Value::Number(i64::from_le_bytes(arr).into())
474        }
475        JsonbType::Real => {
476            let arr: [u8; 8] = payload
477                .try_into()
478                .map_err(|_| SqlError::InvalidValue("JSONB real payload size".into()))?;
479            let f = f64::from_le_bytes(arr);
480            serde_json::Number::from_f64(f)
481                .map(serde_json::Value::Number)
482                .ok_or_else(|| SqlError::InvalidValue("non-finite JSONB number".into()))?
483        }
484        JsonbType::String => {
485            let s = std::str::from_utf8(payload)
486                .map_err(|_| SqlError::InvalidValue("JSONB string not UTF-8".into()))?;
487            serde_json::Value::String(s.to_string())
488        }
489        JsonbType::Array => {
490            let mut items = Vec::new();
491            let mut child_pos = 0usize;
492            while child_pos < payload.len() {
493                let mut local = child_pos;
494                let item = decode_value(payload, &mut local)?;
495                items.push(item);
496                child_pos = local;
497            }
498            serde_json::Value::Array(items)
499        }
500        JsonbType::Object => {
501            let mut map = serde_json::Map::new();
502            let mut child_pos = 0usize;
503            while child_pos < payload.len() {
504                let mut local = child_pos;
505                let key = match decode_value(payload, &mut local)? {
506                    serde_json::Value::String(s) => s,
507                    _ => return Err(SqlError::InvalidValue("JSONB object key not string".into())),
508                };
509                let value = decode_value(payload, &mut local)?;
510                map.insert(key, value);
511                child_pos = local;
512            }
513            serde_json::Value::Object(map)
514        }
515    };
516    *pos += total;
517    Ok(v)
518}
519
520fn value_to_serde(v: &Value) -> Result<serde_json::Value> {
521    match v {
522        Value::Json(s) => serde_json::from_str(s)
523            .map_err(|e| SqlError::InvalidValue(format!("invalid JSON: {e}"))),
524        Value::Jsonb(b) => decode_to_serde(b),
525        _ => Err(SqlError::TypeMismatch {
526            expected: "JSON or JSONB".into(),
527            got: v.data_type().to_string(),
528        }),
529    }
530}
531
532fn serde_to_value(j: serde_json::Value, target: crate::types::DataType) -> Result<Value> {
533    use crate::types::DataType;
534    match target {
535        DataType::Json => Ok(Value::Json(
536            serde_json::to_string(&j)
537                .map_err(|e| SqlError::InvalidValue(format!("JSON render: {e}")))?
538                .into(),
539        )),
540        DataType::Jsonb => {
541            let mut buf = Vec::new();
542            encode_canonical(&j, &mut buf)?;
543            Ok(Value::Jsonb(Arc::from(buf)))
544        }
545        _ => Err(SqlError::InvalidValue(format!(
546            "cannot serialize JSON to {target}"
547        ))),
548    }
549}
550
551fn serde_to_scalar_value(j: serde_json::Value) -> Value {
552    match j {
553        serde_json::Value::Null => Value::Null,
554        serde_json::Value::Bool(b) => Value::Boolean(b),
555        serde_json::Value::Number(n) => {
556            if let Some(i) = n.as_i64() {
557                Value::Integer(i)
558            } else if let Some(f) = n.as_f64() {
559                Value::Real(f)
560            } else {
561                Value::Null
562            }
563        }
564        serde_json::Value::String(s) => Value::Text(s.into()),
565        other => {
566            let text = serde_json::to_string(&other).unwrap_or_default();
567            Value::Text(text.into())
568        }
569    }
570}
571
572pub fn op_get(lhs: &Value, key: &Value) -> Result<Value> {
573    if let Value::Jsonb(b) = lhs {
574        let slice = match key {
575            Value::Text(k) => find_object_key(b, k.as_str())?,
576            Value::Integer(i) => array_get(b, *i)?,
577            _ => None,
578        };
579        return match slice {
580            Some(bytes) => Ok(Value::Jsonb(Arc::from(bytes))),
581            None => Ok(Value::Null),
582        };
583    }
584    let target = match lhs {
585        Value::Json(_) => crate::types::DataType::Json,
586        _ => {
587            return Err(SqlError::TypeMismatch {
588                expected: "JSON or JSONB".into(),
589                got: lhs.data_type().to_string(),
590            })
591        }
592    };
593    let j = value_to_serde(lhs)?;
594    let extracted = navigate_one(&j, key);
595    match extracted {
596        Some(v) => serde_to_value(v, target),
597        None => Ok(Value::Null),
598    }
599}
600
601pub fn op_get_text(lhs: &Value, key: &Value) -> Result<Value> {
602    if let Value::Jsonb(b) = lhs {
603        let slice = match key {
604            Value::Text(k) => find_object_key(b, k.as_str())?,
605            Value::Integer(i) => array_get(b, *i)?,
606            _ => None,
607        };
608        return match slice {
609            Some(bytes) => match read_scalar_text(bytes)? {
610                Some(s) => Ok(Value::Text(s.into())),
611                None => Ok(Value::Null),
612            },
613            None => Ok(Value::Null),
614        };
615    }
616    let j = value_to_serde(lhs)?;
617    match navigate_one(&j, key) {
618        Some(serde_json::Value::Null) => Ok(Value::Null),
619        Some(serde_json::Value::String(s)) => Ok(Value::Text(s.into())),
620        Some(v) => Ok(Value::Text(
621            serde_json::to_string(&v)
622                .map_err(|e| SqlError::InvalidValue(format!("JSON render: {e}")))?
623                .into(),
624        )),
625        None => Ok(Value::Null),
626    }
627}
628
629pub fn op_path(lhs: &Value, path: &Value) -> Result<Value> {
630    let target = match lhs {
631        Value::Json(_) => crate::types::DataType::Json,
632        Value::Jsonb(_) => crate::types::DataType::Jsonb,
633        _ => {
634            return Err(SqlError::TypeMismatch {
635                expected: "JSON or JSONB".into(),
636                got: lhs.data_type().to_string(),
637            })
638        }
639    };
640    let j = value_to_serde(lhs)?;
641    let segments = path_to_segments(path)?;
642    match navigate_path(&j, &segments) {
643        Some(v) => serde_to_value(v, target),
644        None => Ok(Value::Null),
645    }
646}
647
648pub fn op_path_text(lhs: &Value, path: &Value) -> Result<Value> {
649    let j = value_to_serde(lhs)?;
650    let segments = path_to_segments(path)?;
651    match navigate_path(&j, &segments) {
652        Some(serde_json::Value::Null) => Ok(Value::Null),
653        Some(serde_json::Value::String(s)) => Ok(Value::Text(s.into())),
654        Some(v) => Ok(Value::Text(
655            serde_json::to_string(&v)
656                .map_err(|e| SqlError::InvalidValue(format!("JSON render: {e}")))?
657                .into(),
658        )),
659        None => Ok(Value::Null),
660    }
661}
662
663pub fn op_contains(lhs: &Value, rhs: &Value) -> Result<Value> {
664    if let (Value::Jsonb(l), Value::Jsonb(r)) = (lhs, rhs) {
665        return Ok(Value::Boolean(jsonb_contains_bytes(l, r)?));
666    }
667    let left = value_to_serde(lhs)?;
668    let right = value_to_serde(rhs)?;
669    Ok(Value::Boolean(json_contains(&left, &right)))
670}
671
672pub fn op_contained_by(lhs: &Value, rhs: &Value) -> Result<Value> {
673    if let (Value::Jsonb(l), Value::Jsonb(r)) = (lhs, rhs) {
674        return Ok(Value::Boolean(jsonb_contains_bytes(r, l)?));
675    }
676    let left = value_to_serde(lhs)?;
677    let right = value_to_serde(rhs)?;
678    Ok(Value::Boolean(json_contains(&right, &left)))
679}
680
681pub fn op_has_key(lhs: &Value, rhs: &Value) -> Result<Value> {
682    let key = match rhs {
683        Value::Text(s) => s.as_str(),
684        _ => {
685            return Err(SqlError::TypeMismatch {
686                expected: "TEXT key".into(),
687                got: rhs.data_type().to_string(),
688            })
689        }
690    };
691    if let Value::Jsonb(b) = lhs {
692        return Ok(Value::Boolean(has_top_key_bytes(b, key)?));
693    }
694    let left = value_to_serde(lhs)?;
695    let exists = match &left {
696        serde_json::Value::Object(m) => m.contains_key(key),
697        serde_json::Value::Array(arr) => arr
698            .iter()
699            .any(|e| matches!(e, serde_json::Value::String(s) if s == key)),
700        _ => false,
701    };
702    Ok(Value::Boolean(exists))
703}
704
705pub fn op_has_any_key(lhs: &Value, rhs: &Value) -> Result<Value> {
706    let left = value_to_serde(lhs)?;
707    let keys = text_array(rhs)?;
708    let m = match &left {
709        serde_json::Value::Object(m) => m,
710        _ => return Ok(Value::Boolean(false)),
711    };
712    Ok(Value::Boolean(
713        keys.iter().any(|k| m.contains_key(k.as_str())),
714    ))
715}
716
717pub fn op_has_all_keys(lhs: &Value, rhs: &Value) -> Result<Value> {
718    let left = value_to_serde(lhs)?;
719    let keys = text_array(rhs)?;
720    let m = match &left {
721        serde_json::Value::Object(m) => m,
722        _ => return Ok(Value::Boolean(keys.is_empty())),
723    };
724    Ok(Value::Boolean(
725        keys.iter().all(|k| m.contains_key(k.as_str())),
726    ))
727}
728
729pub fn op_delete_path(lhs: &Value, path: &Value) -> Result<Value> {
730    let target = match lhs {
731        Value::Json(_) => crate::types::DataType::Json,
732        Value::Jsonb(_) => crate::types::DataType::Jsonb,
733        _ => {
734            return Err(SqlError::TypeMismatch {
735                expected: "JSON or JSONB".into(),
736                got: lhs.data_type().to_string(),
737            })
738        }
739    };
740    let mut j = value_to_serde(lhs)?;
741    let segments = path_to_segments(path)?;
742    delete_at_path(&mut j, &segments);
743    serde_to_value(j, target)
744}
745
746pub fn op_delete_one(lhs: &Value, rhs: &Value) -> Result<Value> {
747    let target = match lhs {
748        Value::Json(_) => crate::types::DataType::Json,
749        Value::Jsonb(_) => crate::types::DataType::Jsonb,
750        _ => {
751            return Err(SqlError::TypeMismatch {
752                expected: "JSON or JSONB".into(),
753                got: lhs.data_type().to_string(),
754            })
755        }
756    };
757    let mut j = value_to_serde(lhs)?;
758    match (&mut j, rhs) {
759        (serde_json::Value::Object(m), Value::Text(k)) => {
760            m.remove(k.as_str());
761        }
762        (serde_json::Value::Array(arr), Value::Integer(i)) => {
763            let len = arr.len() as i64;
764            let idx = if *i < 0 { len + i } else { *i };
765            if (0..len).contains(&idx) {
766                arr.remove(idx as usize);
767            }
768        }
769        (serde_json::Value::Array(arr), Value::Text(k)) => {
770            arr.retain(|e| !matches!(e, serde_json::Value::String(s) if s == k.as_str()));
771        }
772        _ => {}
773    }
774    serde_to_value(j, target)
775}
776
777pub fn op_concat(lhs: &Value, rhs: &Value) -> Result<Value> {
778    let target = match (lhs, rhs) {
779        (Value::Jsonb(_), _) | (_, Value::Jsonb(_)) => crate::types::DataType::Jsonb,
780        _ => crate::types::DataType::Json,
781    };
782    let mut left = value_to_serde(lhs)?;
783    let right = value_to_serde(rhs)?;
784    match (&mut left, right) {
785        (serde_json::Value::Object(a), serde_json::Value::Object(b)) => {
786            for (k, v) in b {
787                a.insert(k, v);
788            }
789        }
790        (serde_json::Value::Array(a), serde_json::Value::Array(b)) => {
791            a.extend(b);
792        }
793        (serde_json::Value::Array(a), other) => {
794            a.push(other);
795        }
796        (a, serde_json::Value::Array(mut b)) => {
797            let owned = std::mem::take(a);
798            let mut combined = vec![owned];
799            combined.append(&mut b);
800            *a = serde_json::Value::Array(combined);
801        }
802        (a, b) => {
803            let av = std::mem::take(a);
804            *a = serde_json::Value::Array(vec![av, b]);
805        }
806    }
807    serde_to_value(left, target)
808}
809
810pub fn op_path_exists(lhs: &Value, path: &Value) -> Result<Value> {
811    let j = value_to_serde(lhs)?;
812    let path_str = match path {
813        Value::Text(s) => s.to_string(),
814        Value::Json(s) => s.to_string(),
815        _ => {
816            return Err(SqlError::TypeMismatch {
817                expected: "TEXT path".into(),
818                got: path.data_type().to_string(),
819            })
820        }
821    };
822    let jp = sql_json_path::JsonPath::new(&path_str)
823        .map_err(|e| SqlError::InvalidValue(format!("invalid JSON path: {e}")))?;
824    let exists = jp
825        .exists(&j)
826        .map_err(|e| SqlError::InvalidValue(format!("JSON path eval: {e}")))?;
827    Ok(Value::Boolean(exists))
828}
829
830pub fn op_path_match(lhs: &Value, path: &Value) -> Result<Value> {
831    let j = value_to_serde(lhs)?;
832    let path_str = match path {
833        Value::Text(s) => s.to_string(),
834        Value::Json(s) => s.to_string(),
835        _ => {
836            return Err(SqlError::TypeMismatch {
837                expected: "TEXT path".into(),
838                got: path.data_type().to_string(),
839            })
840        }
841    };
842    let jp = sql_json_path::JsonPath::new(&path_str)
843        .map_err(|e| SqlError::InvalidValue(format!("invalid JSON path: {e}")))?;
844    let result = jp
845        .query(&j)
846        .map_err(|e| SqlError::InvalidValue(format!("JSON path eval: {e}")))?;
847    let truthy = result
848        .iter()
849        .any(|node| matches!(node.as_ref(), serde_json::Value::Bool(true)));
850    Ok(Value::Boolean(truthy))
851}
852
853pub fn fn_json_exists(j_val: &Value, path: &Value) -> Result<Value> {
854    op_path_exists(j_val, path)
855}
856
857pub fn fn_json_value(j_val: &Value, path: &Value) -> Result<Value> {
858    let j = value_to_serde(j_val)?;
859    let path_str = match path {
860        Value::Text(s) => s.to_string(),
861        _ => {
862            return Err(SqlError::TypeMismatch {
863                expected: "TEXT path".into(),
864                got: path.data_type().to_string(),
865            })
866        }
867    };
868    let jp = sql_json_path::JsonPath::new(&path_str)
869        .map_err(|e| SqlError::InvalidValue(format!("invalid JSON path: {e}")))?;
870    match jp
871        .query_first(&j)
872        .map_err(|e| SqlError::InvalidValue(format!("JSON path eval: {e}")))?
873    {
874        Some(node) => match node.into_owned() {
875            serde_json::Value::Null => Ok(Value::Null),
876            serde_json::Value::String(s) => Ok(Value::Text(s.into())),
877            other => Ok(Value::Text(
878                serde_json::to_string(&other)
879                    .map_err(|e| SqlError::InvalidValue(format!("JSON render: {e}")))?
880                    .into(),
881            )),
882        },
883        None => Ok(Value::Null),
884    }
885}
886
887pub fn fn_json_query(j_val: &Value, path: &Value, target: crate::types::DataType) -> Result<Value> {
888    let j = value_to_serde(j_val)?;
889    let path_str = match path {
890        Value::Text(s) => s.to_string(),
891        _ => {
892            return Err(SqlError::TypeMismatch {
893                expected: "TEXT path".into(),
894                got: path.data_type().to_string(),
895            })
896        }
897    };
898    let jp = sql_json_path::JsonPath::new(&path_str)
899        .map_err(|e| SqlError::InvalidValue(format!("invalid JSON path: {e}")))?;
900    let nodes = jp
901        .query(&j)
902        .map_err(|e| SqlError::InvalidValue(format!("JSON path eval: {e}")))?;
903    if nodes.is_empty() {
904        return Ok(Value::Null);
905    }
906    let result_json = if nodes.len() == 1 {
907        nodes[0].as_ref().clone()
908    } else {
909        serde_json::Value::Array(nodes.iter().map(|n| n.as_ref().clone()).collect())
910    };
911    serde_to_value(result_json, target)
912}
913
/// One step of a parsed JSON navigation path.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum PathSeg {
    /// Object member lookup by name.
    Key(String),
    /// Array element lookup by position; the navigation helpers in this file
    /// resolve negative positions relative to the end of the array.
    Index(i64),
    /// Produced by the `[*]` form in `$`-style paths.
    Wildcard,
}
920
921pub fn parse_dollar_path(s: &str) -> Result<Vec<PathSeg>> {
922    let s = s.trim();
923    let s = s.strip_prefix('$').unwrap_or(s);
924    let bytes = s.as_bytes();
925    let mut out = Vec::new();
926    let mut i = 0;
927    while i < bytes.len() {
928        match bytes[i] {
929            b'.' => {
930                i += 1;
931                let start = i;
932                while i < bytes.len() && bytes[i] != b'.' && bytes[i] != b'[' {
933                    i += 1;
934                }
935                if i > start {
936                    let key = std::str::from_utf8(&bytes[start..i])
937                        .map_err(|_| SqlError::InvalidValue("invalid path segment".into()))?;
938                    out.push(PathSeg::Key(key.to_string()));
939                }
940            }
941            b'[' => {
942                i += 1;
943                let start = i;
944                while i < bytes.len() && bytes[i] != b']' {
945                    i += 1;
946                }
947                if i > bytes.len() {
948                    return Err(SqlError::InvalidValue("unterminated index".into()));
949                }
950                let inner = std::str::from_utf8(&bytes[start..i])
951                    .map_err(|_| SqlError::InvalidValue("invalid path index".into()))?;
952                if inner.trim() == "*" {
953                    out.push(PathSeg::Wildcard);
954                } else if let Ok(idx) = inner.parse::<i64>() {
955                    out.push(PathSeg::Index(idx));
956                } else {
957                    let key = inner.trim_matches('"').trim_matches('\'');
958                    out.push(PathSeg::Key(key.to_string()));
959                }
960                i += 1;
961            }
962            _ => i += 1,
963        }
964    }
965    Ok(out)
966}
967
968fn path_to_segments(v: &Value) -> Result<Vec<PathSeg>> {
969    match v {
970        Value::Text(s) => {
971            if s.starts_with('$') {
972                parse_dollar_path(s)
973            } else if s.starts_with('{') && s.ends_with('}') {
974                parse_pg_array_path(s)
975            } else {
976                Ok(vec![PathSeg::Key(s.to_string())])
977            }
978        }
979        Value::Integer(i) => Ok(vec![PathSeg::Index(*i)]),
980        Value::Json(_) | Value::Jsonb(_) => {
981            let parsed = value_to_serde(v)?;
982            json_to_path(&parsed)
983        }
984        _ => Err(SqlError::TypeMismatch {
985            expected: "TEXT or path array".into(),
986            got: v.data_type().to_string(),
987        }),
988    }
989}
990
991fn parse_pg_array_path(s: &str) -> Result<Vec<PathSeg>> {
992    let inner = &s[1..s.len() - 1];
993    if inner.is_empty() {
994        return Ok(vec![]);
995    }
996    inner
997        .split(',')
998        .map(|raw| {
999            let trimmed = raw.trim().trim_matches('"');
1000            if let Ok(idx) = trimmed.parse::<i64>() {
1001                PathSeg::Index(idx)
1002            } else {
1003                PathSeg::Key(trimmed.to_string())
1004            }
1005        })
1006        .map(Ok)
1007        .collect()
1008}
1009
1010fn json_to_path(j: &serde_json::Value) -> Result<Vec<PathSeg>> {
1011    let arr = j
1012        .as_array()
1013        .ok_or_else(|| SqlError::InvalidValue("path must be a JSON array".into()))?;
1014    arr.iter()
1015        .map(|item| match item {
1016            serde_json::Value::String(s) => Ok(PathSeg::Key(s.clone())),
1017            serde_json::Value::Number(n) => n
1018                .as_i64()
1019                .map(PathSeg::Index)
1020                .ok_or_else(|| SqlError::InvalidValue("path index out of range".into())),
1021            _ => Err(SqlError::InvalidValue(
1022                "path segments must be strings or integers".into(),
1023            )),
1024        })
1025        .collect()
1026}
1027
1028fn navigate_one(j: &serde_json::Value, key: &Value) -> Option<serde_json::Value> {
1029    match (j, key) {
1030        (serde_json::Value::Object(m), Value::Text(k)) => m.get(k.as_str()).cloned(),
1031        (serde_json::Value::Array(arr), Value::Integer(i)) => {
1032            let len = arr.len() as i64;
1033            let idx = if *i < 0 { len + i } else { *i };
1034            if (0..len).contains(&idx) {
1035                Some(arr[idx as usize].clone())
1036            } else {
1037                None
1038            }
1039        }
1040        _ => None,
1041    }
1042}
1043
1044fn navigate_path(j: &serde_json::Value, segments: &[PathSeg]) -> Option<serde_json::Value> {
1045    let mut cur = j.clone();
1046    for seg in segments {
1047        cur = match (&cur, seg) {
1048            (serde_json::Value::Object(m), PathSeg::Key(k)) => m.get(k.as_str())?.clone(),
1049            (serde_json::Value::Array(arr), PathSeg::Index(i)) => {
1050                let len = arr.len() as i64;
1051                let idx = if *i < 0 { len + i } else { *i };
1052                if (0..len).contains(&idx) {
1053                    arr[idx as usize].clone()
1054                } else {
1055                    return None;
1056                }
1057            }
1058            (serde_json::Value::Array(arr), PathSeg::Key(k)) => {
1059                let idx: i64 = k.parse().ok()?;
1060                let len = arr.len() as i64;
1061                let idx = if idx < 0 { len + idx } else { idx };
1062                if (0..len).contains(&idx) {
1063                    arr[idx as usize].clone()
1064                } else {
1065                    return None;
1066                }
1067            }
1068            _ => return None,
1069        };
1070    }
1071    Some(cur)
1072}
1073
1074fn delete_at_path(j: &mut serde_json::Value, segments: &[PathSeg]) {
1075    if segments.is_empty() {
1076        return;
1077    }
1078    let (last, prefix) = segments.split_last().unwrap();
1079    let target = navigate_mut(j, prefix);
1080    if let Some(t) = target {
1081        match (t, last) {
1082            (serde_json::Value::Object(m), PathSeg::Key(k)) => {
1083                m.remove(k.as_str());
1084            }
1085            (serde_json::Value::Array(arr), PathSeg::Index(i)) => {
1086                let len = arr.len() as i64;
1087                let idx = if *i < 0 { len + i } else { *i };
1088                if (0..len).contains(&idx) {
1089                    arr.remove(idx as usize);
1090                }
1091            }
1092            _ => {}
1093        }
1094    }
1095}
1096
1097fn navigate_mut<'a>(
1098    j: &'a mut serde_json::Value,
1099    segments: &[PathSeg],
1100) -> Option<&'a mut serde_json::Value> {
1101    let mut cur = j;
1102    for seg in segments {
1103        cur = match (cur, seg) {
1104            (serde_json::Value::Object(m), PathSeg::Key(k)) => m.get_mut(k.as_str())?,
1105            (serde_json::Value::Array(arr), PathSeg::Index(i)) => {
1106                let len = arr.len() as i64;
1107                let idx = if *i < 0 { len + i } else { *i };
1108                if (0..len).contains(&idx) {
1109                    arr.get_mut(idx as usize)?
1110                } else {
1111                    return None;
1112                }
1113            }
1114            _ => return None,
1115        };
1116    }
1117    Some(cur)
1118}
1119
1120fn json_contains(left: &serde_json::Value, right: &serde_json::Value) -> bool {
1121    match (left, right) {
1122        (serde_json::Value::Object(a), serde_json::Value::Object(b)) => b
1123            .iter()
1124            .all(|(k, v)| a.get(k).is_some_and(|av| json_contains(av, v))),
1125        (serde_json::Value::Array(a), serde_json::Value::Array(b)) => {
1126            b.iter().all(|bv| a.iter().any(|av| json_contains(av, bv)))
1127        }
1128        (serde_json::Value::Array(a), other) => a.iter().any(|av| json_contains(av, other)),
1129        (a, b) => a == b,
1130    }
1131}
1132
1133fn text_array(v: &Value) -> Result<Vec<String>> {
1134    match v {
1135        Value::Text(s) => Ok(vec![s.to_string()]),
1136        Value::Json(_) | Value::Jsonb(_) => {
1137            let j = value_to_serde(v)?;
1138            j.as_array()
1139                .ok_or_else(|| SqlError::InvalidValue("expected JSON text array".into()))?
1140                .iter()
1141                .map(|e| match e {
1142                    serde_json::Value::String(s) => Ok(s.clone()),
1143                    _ => Err(SqlError::InvalidValue(
1144                        "array elements must be strings".into(),
1145                    )),
1146                })
1147                .collect()
1148        }
1149        _ => Err(SqlError::TypeMismatch {
1150            expected: "TEXT array or JSON array".into(),
1151            got: v.data_type().to_string(),
1152        }),
1153    }
1154}
1155
1156pub fn agg_array(values: &[Value], target: crate::types::DataType) -> Result<Value> {
1157    let items: Result<Vec<serde_json::Value>> = values.iter().map(value_to_serde_lossy).collect();
1158    serde_to_value(serde_json::Value::Array(items?), target)
1159}
1160
1161pub fn materialize_json_table(
1162    source: &Value,
1163    spec: &crate::parser::JsonTableSpec,
1164) -> Result<(Vec<String>, Vec<Vec<Value>>)> {
1165    if source.is_null() {
1166        let names = json_table_column_names(&spec.columns);
1167        return Ok((names, vec![]));
1168    }
1169    let root = value_to_serde(source)?;
1170    let root_segs = parse_dollar_path(&spec.root_path)?;
1171    let matches = json_table_walk(&root, &root_segs);
1172    let mut rows: Vec<Vec<Value>> = Vec::new();
1173    let mut ordinality_counter = 0i64;
1174    for m in matches {
1175        ordinality_counter += 1;
1176        emit_json_table_rows(
1177            &m,
1178            &spec.columns,
1179            ordinality_counter,
1180            &mut Vec::new(),
1181            &mut rows,
1182        )?;
1183    }
1184    let names = json_table_column_names(&spec.columns);
1185    Ok((names, rows))
1186}
1187
1188fn json_table_column_names(columns: &[crate::parser::JsonTableCol]) -> Vec<String> {
1189    use crate::parser::JsonTableCol as C;
1190    let mut out = Vec::new();
1191    for c in columns {
1192        match c {
1193            C::Named { name, .. } | C::Ordinality { name } => out.push(name.clone()),
1194            C::Nested { columns, .. } => out.extend(json_table_column_names(columns)),
1195        }
1196    }
1197    out
1198}
1199
1200fn json_table_walk(j: &serde_json::Value, segs: &[PathSeg]) -> Vec<serde_json::Value> {
1201    let mut frontier = vec![j.clone()];
1202    for seg in segs {
1203        let mut next = Vec::new();
1204        for cur in frontier {
1205            match (cur, seg) {
1206                (serde_json::Value::Object(m), PathSeg::Key(k)) => {
1207                    if let Some(v) = m.get(k.as_str()) {
1208                        next.push(v.clone());
1209                    }
1210                }
1211                (serde_json::Value::Array(arr), PathSeg::Index(i)) => {
1212                    let len = arr.len() as i64;
1213                    let idx = if *i < 0 { len + i } else { *i };
1214                    if (0..len).contains(&idx) {
1215                        next.push(arr[idx as usize].clone());
1216                    }
1217                }
1218                (serde_json::Value::Array(arr), PathSeg::Wildcard) => {
1219                    next.extend(arr);
1220                }
1221                _ => {}
1222            }
1223        }
1224        frontier = next;
1225    }
1226    frontier
1227        .into_iter()
1228        .flat_map(|v| match v {
1229            serde_json::Value::Array(arr) if segs.last() == Some(&PathSeg::Wildcard) => arr,
1230            other => vec![other],
1231        })
1232        .collect()
1233}
1234
1235fn emit_json_table_rows(
1236    row_doc: &serde_json::Value,
1237    columns: &[crate::parser::JsonTableCol],
1238    parent_ordinality: i64,
1239    _prefix: &mut Vec<Value>,
1240    out: &mut Vec<Vec<Value>>,
1241) -> Result<()> {
1242    use crate::parser::JsonTableCol as C;
1243
1244    let mut scalars: Vec<(usize, Value)> = Vec::new();
1245    let mut nesteds: Vec<(usize, Vec<Vec<Value>>)> = Vec::new();
1246    let mut widths: Vec<usize> = Vec::with_capacity(columns.len());
1247
1248    for (idx, c) in columns.iter().enumerate() {
1249        match c {
1250            C::Named {
1251                ty, path, exists, ..
1252            } => {
1253                let segs = parse_dollar_path(path)?;
1254                let matches = json_table_walk(row_doc, &segs);
1255                let v = if *exists {
1256                    Value::Boolean(!matches.is_empty())
1257                } else if matches.is_empty() {
1258                    Value::Null
1259                } else {
1260                    json_table_coerce(&matches[0], *ty)?
1261                };
1262                scalars.push((idx, v));
1263                widths.push(1);
1264            }
1265            C::Ordinality { .. } => {
1266                scalars.push((idx, Value::Integer(parent_ordinality)));
1267                widths.push(1);
1268            }
1269            C::Nested { path, columns } => {
1270                let segs = parse_dollar_path(path)?;
1271                let matches = json_table_walk(row_doc, &segs);
1272                let inner_width = json_table_column_names(columns).len();
1273                let mut inner: Vec<Vec<Value>> = Vec::new();
1274                let mut ord = 0i64;
1275                for m in matches {
1276                    ord += 1;
1277                    let mut empty_prefix: Vec<Value> = Vec::new();
1278                    emit_json_table_rows(&m, columns, ord, &mut empty_prefix, &mut inner)?;
1279                }
1280                if inner.is_empty() {
1281                    inner.push(vec![Value::Null; inner_width]);
1282                }
1283                nesteds.push((idx, inner));
1284                widths.push(inner_width);
1285            }
1286        }
1287    }
1288
1289    let offsets: Vec<usize> = widths
1290        .iter()
1291        .scan(0usize, |acc, w| {
1292            let cur = *acc;
1293            *acc += w;
1294            Some(cur)
1295        })
1296        .collect();
1297    let total: usize = widths.iter().sum();
1298
1299    if nesteds.is_empty() {
1300        let mut row = vec![Value::Null; total];
1301        for (idx, v) in scalars {
1302            row[offsets[idx]] = v;
1303        }
1304        out.push(row);
1305        return Ok(());
1306    }
1307
1308    let mut indices = vec![0usize; nesteds.len()];
1309    loop {
1310        let mut row = vec![Value::Null; total];
1311        for (idx, v) in &scalars {
1312            row[offsets[*idx]] = v.clone();
1313        }
1314        for (ni, (col_idx, group)) in nesteds.iter().enumerate() {
1315            let off = offsets[*col_idx];
1316            let inner_row = &group[indices[ni]];
1317            for (k, v) in inner_row.iter().enumerate() {
1318                row[off + k] = v.clone();
1319            }
1320        }
1321        out.push(row);
1322
1323        let mut k = indices.len();
1324        let done = loop {
1325            if k == 0 {
1326                break true;
1327            }
1328            k -= 1;
1329            indices[k] += 1;
1330            if indices[k] < nesteds[k].1.len() {
1331                break false;
1332            }
1333            indices[k] = 0;
1334        };
1335        if done {
1336            return Ok(());
1337        }
1338    }
1339}
1340
1341fn json_table_coerce(v: &serde_json::Value, target: crate::types::DataType) -> Result<Value> {
1342    use crate::types::DataType;
1343    if matches!(v, serde_json::Value::Null) {
1344        return Ok(Value::Null);
1345    }
1346    match (v, target) {
1347        (_, DataType::Json) => serde_to_value(v.clone(), DataType::Json),
1348        (_, DataType::Jsonb) => serde_to_value(v.clone(), DataType::Jsonb),
1349        (serde_json::Value::Number(n), DataType::Integer) => n
1350            .as_i64()
1351            .map(Value::Integer)
1352            .ok_or_else(|| SqlError::InvalidValue("JSON_TABLE: number not i64".into())),
1353        (serde_json::Value::Number(n), DataType::Real) => n
1354            .as_f64()
1355            .map(Value::Real)
1356            .ok_or_else(|| SqlError::InvalidValue("JSON_TABLE: number not f64".into())),
1357        (serde_json::Value::Bool(b), DataType::Boolean) => Ok(Value::Boolean(*b)),
1358        (serde_json::Value::String(s), DataType::Text) => Ok(Value::Text(s.clone().into())),
1359        _ => {
1360            let text_form = match v {
1361                serde_json::Value::String(s) => s.clone(),
1362                _ => serde_json::to_string(v)
1363                    .map_err(|e| SqlError::InvalidValue(format!("JSON_TABLE render: {e}")))?,
1364            };
1365            let text_val = Value::Text(text_form.into());
1366            text_val.coerce_into(target).ok_or_else(|| {
1367                SqlError::InvalidValue(format!("JSON_TABLE: cannot coerce value to {target}"))
1368            })
1369        }
1370    }
1371}
1372
1373/// GIN entry layout (jsonb_ops): `0x01‖key` (key-exists, `?`),
1374/// `0x02‖key‖0x00‖value` (pair, `@>`), `0x03‖value` (array element).
1375pub fn extract_gin_entries(value: &Value, ops: crate::types::GinOpsClass) -> Result<Vec<Vec<u8>>> {
1376    if value.is_null() {
1377        return Ok(vec![]);
1378    }
1379    let j = value_to_serde(value)?;
1380    let mut out: Vec<Vec<u8>> = Vec::new();
1381    extract_gin_walk(&j, ops, &mut out);
1382    out.sort();
1383    out.dedup();
1384    Ok(out)
1385}
1386
1387fn extract_gin_walk(j: &serde_json::Value, ops: crate::types::GinOpsClass, out: &mut Vec<Vec<u8>>) {
1388    use crate::types::GinOpsClass;
1389    match j {
1390        serde_json::Value::Object(m) => {
1391            for (k, v) in m {
1392                if matches!(ops, GinOpsClass::JsonbOps) {
1393                    let mut key_entry = Vec::with_capacity(k.len() + 1);
1394                    key_entry.push(0x01);
1395                    key_entry.extend_from_slice(k.as_bytes());
1396                    out.push(key_entry);
1397                }
1398                if let Some(s) = scalar_repr(v) {
1399                    let mut pair = Vec::with_capacity(k.len() + s.len() + 2);
1400                    pair.push(0x02);
1401                    pair.extend_from_slice(k.as_bytes());
1402                    pair.push(0x00);
1403                    pair.extend_from_slice(s.as_bytes());
1404                    out.push(pair);
1405                }
1406                extract_gin_walk(v, ops, out);
1407            }
1408        }
1409        serde_json::Value::Array(arr) => {
1410            for v in arr {
1411                if let Some(s) = scalar_repr(v) {
1412                    let mut entry = Vec::with_capacity(s.len() + 1);
1413                    entry.push(0x03);
1414                    entry.extend_from_slice(s.as_bytes());
1415                    out.push(entry);
1416                }
1417                extract_gin_walk(v, ops, out);
1418            }
1419        }
1420        _ => {}
1421    }
1422}
1423
1424fn scalar_repr(v: &serde_json::Value) -> Option<String> {
1425    match v {
1426        serde_json::Value::Null => Some("null".into()),
1427        serde_json::Value::Bool(true) => Some("true".into()),
1428        serde_json::Value::Bool(false) => Some("false".into()),
1429        serde_json::Value::String(s) => Some(s.clone()),
1430        serde_json::Value::Number(n) => Some(n.to_string()),
1431        _ => None,
1432    }
1433}
1434
1435pub fn agg_object(pairs: &[(Value, Value)], target: crate::types::DataType) -> Result<Value> {
1436    let mut map = serde_json::Map::new();
1437    for (k, v) in pairs {
1438        if k.is_null() {
1439            continue;
1440        }
1441        let key_str = match k {
1442            Value::Text(s) => s.to_string(),
1443            Value::Json(s) => s.to_string(),
1444            _ => format!("{k}"),
1445        };
1446        let val = value_to_serde_lossy(v)?;
1447        map.insert(key_str, val);
1448    }
1449    serde_to_value(serde_json::Value::Object(map), target)
1450}
1451
1452pub fn dispatch_srf(name: &str, args: &[Value]) -> Result<(Vec<String>, Vec<Vec<Value>>)> {
1453    let upper = name.to_ascii_uppercase();
1454    match upper.as_str() {
1455        "JSONB_ARRAY_ELEMENTS" | "JSON_ARRAY_ELEMENTS" => {
1456            if args.len() != 1 {
1457                return Err(SqlError::InvalidValue(format!(
1458                    "{name} requires 1 argument"
1459                )));
1460            }
1461            if args[0].is_null() {
1462                return Ok((vec!["value".into()], vec![]));
1463            }
1464            let j = value_to_serde(&args[0])?;
1465            let arr = j
1466                .as_array()
1467                .ok_or_else(|| SqlError::InvalidValue(format!("{name} requires JSON array")))?;
1468            let target = if upper.starts_with("JSONB") {
1469                crate::types::DataType::Jsonb
1470            } else {
1471                crate::types::DataType::Json
1472            };
1473            let rows: Result<Vec<Vec<Value>>> = arr
1474                .iter()
1475                .map(|v| serde_to_value(v.clone(), target).map(|val| vec![val]))
1476                .collect();
1477            Ok((vec!["value".into()], rows?))
1478        }
1479        "JSONB_ARRAY_ELEMENTS_TEXT" | "JSON_ARRAY_ELEMENTS_TEXT" => {
1480            if args.len() != 1 {
1481                return Err(SqlError::InvalidValue(format!(
1482                    "{name} requires 1 argument"
1483                )));
1484            }
1485            if args[0].is_null() {
1486                return Ok((vec!["value".into()], vec![]));
1487            }
1488            let j = value_to_serde(&args[0])?;
1489            let arr = j
1490                .as_array()
1491                .ok_or_else(|| SqlError::InvalidValue(format!("{name} requires JSON array")))?;
1492            let rows: Vec<Vec<Value>> = arr
1493                .iter()
1494                .map(|v| {
1495                    let text = match v {
1496                        serde_json::Value::String(s) => s.clone(),
1497                        _ => serde_json::to_string(v).unwrap_or_default(),
1498                    };
1499                    vec![Value::Text(text.into())]
1500                })
1501                .collect();
1502            Ok((vec!["value".into()], rows))
1503        }
1504        "JSONB_EACH" | "JSON_EACH" => {
1505            if args.len() != 1 {
1506                return Err(SqlError::InvalidValue(format!(
1507                    "{name} requires 1 argument"
1508                )));
1509            }
1510            if args[0].is_null() {
1511                return Ok((vec!["key".into(), "value".into()], vec![]));
1512            }
1513            let j = value_to_serde(&args[0])?;
1514            let obj = j
1515                .as_object()
1516                .ok_or_else(|| SqlError::InvalidValue(format!("{name} requires JSON object")))?;
1517            let target = if upper.starts_with("JSONB") {
1518                crate::types::DataType::Jsonb
1519            } else {
1520                crate::types::DataType::Json
1521            };
1522            let rows: Result<Vec<Vec<Value>>> = obj
1523                .iter()
1524                .map(|(k, v)| {
1525                    Ok(vec![
1526                        Value::Text(k.clone().into()),
1527                        serde_to_value(v.clone(), target)?,
1528                    ])
1529                })
1530                .collect();
1531            Ok((vec!["key".into(), "value".into()], rows?))
1532        }
1533        "JSONB_EACH_TEXT" | "JSON_EACH_TEXT" => {
1534            if args.len() != 1 {
1535                return Err(SqlError::InvalidValue(format!(
1536                    "{name} requires 1 argument"
1537                )));
1538            }
1539            if args[0].is_null() {
1540                return Ok((vec!["key".into(), "value".into()], vec![]));
1541            }
1542            let j = value_to_serde(&args[0])?;
1543            let obj = j
1544                .as_object()
1545                .ok_or_else(|| SqlError::InvalidValue(format!("{name} requires JSON object")))?;
1546            let rows: Vec<Vec<Value>> = obj
1547                .iter()
1548                .map(|(k, v)| {
1549                    let text = match v {
1550                        serde_json::Value::String(s) => s.clone(),
1551                        _ => serde_json::to_string(v).unwrap_or_default(),
1552                    };
1553                    vec![Value::Text(k.clone().into()), Value::Text(text.into())]
1554                })
1555                .collect();
1556            Ok((vec!["key".into(), "value".into()], rows))
1557        }
1558        "JSONB_OBJECT_KEYS" | "JSON_OBJECT_KEYS" => {
1559            if args.len() != 1 {
1560                return Err(SqlError::InvalidValue(format!(
1561                    "{name} requires 1 argument"
1562                )));
1563            }
1564            if args[0].is_null() {
1565                return Ok((vec!["key".into()], vec![]));
1566            }
1567            let j = value_to_serde(&args[0])?;
1568            let obj = j
1569                .as_object()
1570                .ok_or_else(|| SqlError::InvalidValue(format!("{name} requires JSON object")))?;
1571            let rows: Vec<Vec<Value>> = obj
1572                .keys()
1573                .map(|k| vec![Value::Text(k.clone().into())])
1574                .collect();
1575            Ok((vec!["key".into()], rows))
1576        }
1577        _ => Err(SqlError::Unsupported(format!(
1578            "set-returning function: {name}"
1579        ))),
1580    }
1581}
1582
/// Reports whether `name` is one of the JSON set-returning functions,
/// ignoring ASCII case.
pub fn is_srf_name(name: &str) -> bool {
    const SRF_NAMES: [&str; 10] = [
        "JSONB_ARRAY_ELEMENTS",
        "JSON_ARRAY_ELEMENTS",
        "JSONB_ARRAY_ELEMENTS_TEXT",
        "JSON_ARRAY_ELEMENTS_TEXT",
        "JSONB_EACH",
        "JSON_EACH",
        "JSONB_EACH_TEXT",
        "JSON_EACH_TEXT",
        "JSONB_OBJECT_KEYS",
        "JSON_OBJECT_KEYS",
    ];
    SRF_NAMES
        .iter()
        .any(|known| name.eq_ignore_ascii_case(known))
}
1598
1599pub fn extract_to_value(target: crate::types::DataType, j: serde_json::Value) -> Result<Value> {
1600    serde_to_value(j, target)
1601}
1602
1603pub fn to_scalar(j: serde_json::Value) -> Value {
1604    serde_to_scalar_value(j)
1605}
1606
1607pub fn fn_typeof(v: &Value) -> Result<Value> {
1608    if let Value::Jsonb(b) = v {
1609        let (ty, _, _) = read_header(b)?;
1610        let s = match ty {
1611            JsonbType::Null => "null",
1612            JsonbType::True | JsonbType::False => "boolean",
1613            JsonbType::Integer | JsonbType::Real => "number",
1614            JsonbType::String => "string",
1615            JsonbType::Array => "array",
1616            JsonbType::Object => "object",
1617        };
1618        return Ok(Value::Text(s.into()));
1619    }
1620    let j = value_to_serde(v)?;
1621    let s = match j {
1622        serde_json::Value::Null => "null",
1623        serde_json::Value::Bool(_) => "boolean",
1624        serde_json::Value::Number(_) => "number",
1625        serde_json::Value::String(_) => "string",
1626        serde_json::Value::Array(_) => "array",
1627        serde_json::Value::Object(_) => "object",
1628    };
1629    Ok(Value::Text(s.into()))
1630}
1631
1632pub fn fn_array_length(v: &Value) -> Result<Value> {
1633    if let Value::Jsonb(b) = v {
1634        return match array_len_bytes(b)? {
1635            Some(n) => Ok(Value::Integer(n as i64)),
1636            None => Err(SqlError::InvalidValue(
1637                "jsonb_array_length called on non-array".into(),
1638            )),
1639        };
1640    }
1641    let j = value_to_serde(v)?;
1642    match j {
1643        serde_json::Value::Array(arr) => Ok(Value::Integer(arr.len() as i64)),
1644        _ => Err(SqlError::InvalidValue(
1645            "jsonb_array_length called on non-array".into(),
1646        )),
1647    }
1648}
1649
1650pub fn fn_object_length(v: &Value) -> Result<Value> {
1651    if let Value::Jsonb(b) = v {
1652        return match object_len_bytes(b)? {
1653            Some(n) => Ok(Value::Integer(n as i64)),
1654            None => Err(SqlError::InvalidValue(
1655                "jsonb_object_length called on non-object".into(),
1656            )),
1657        };
1658    }
1659    let j = value_to_serde(v)?;
1660    match j {
1661        serde_json::Value::Object(m) => Ok(Value::Integer(m.len() as i64)),
1662        _ => Err(SqlError::InvalidValue(
1663            "jsonb_object_length called on non-object".into(),
1664        )),
1665    }
1666}
1667
1668pub fn fn_extract_path(
1669    args: &[Value],
1670    target: crate::types::DataType,
1671    as_text: bool,
1672) -> Result<Value> {
1673    let mut j = value_to_serde(&args[0])?;
1674    for key_val in &args[1..] {
1675        if key_val.is_null() {
1676            return Ok(Value::Null);
1677        }
1678        let key = match key_val {
1679            Value::Text(s) => s.to_string(),
1680            other => other.to_string(),
1681        };
1682        match &mut j {
1683            serde_json::Value::Object(m) => {
1684                if let Some(next) = m.remove(&key) {
1685                    j = next;
1686                } else {
1687                    return Ok(Value::Null);
1688                }
1689            }
1690            serde_json::Value::Array(arr) => {
1691                let idx: i64 = key.parse().map_err(|_| {
1692                    SqlError::InvalidValue(format!("array path key not integer: {key}"))
1693                })?;
1694                let len = arr.len() as i64;
1695                let idx = if idx < 0 { len + idx } else { idx };
1696                if (0..len).contains(&idx) {
1697                    j = arr.remove(idx as usize);
1698                } else {
1699                    return Ok(Value::Null);
1700                }
1701            }
1702            _ => return Ok(Value::Null),
1703        }
1704    }
1705    if as_text {
1706        match j {
1707            serde_json::Value::Null => Ok(Value::Null),
1708            serde_json::Value::String(s) => Ok(Value::Text(s.into())),
1709            other => Ok(Value::Text(
1710                serde_json::to_string(&other)
1711                    .map_err(|e| SqlError::InvalidValue(format!("JSON render: {e}")))?
1712                    .into(),
1713            )),
1714        }
1715    } else {
1716        serde_to_value(j, target)
1717    }
1718}
1719
1720pub fn fn_sqlite_extract(j_val: &Value, path: &Value) -> Result<Value> {
1721    let path_str = match path {
1722        Value::Text(s) => s.to_string(),
1723        _ => {
1724            return Err(SqlError::TypeMismatch {
1725                expected: "TEXT path".into(),
1726                got: path.data_type().to_string(),
1727            })
1728        }
1729    };
1730    let j = value_to_serde(j_val)?;
1731    let segments = parse_dollar_path(&path_str)?;
1732    match navigate_path(&j, &segments) {
1733        Some(serde_json::Value::Null) => Ok(Value::Null),
1734        Some(serde_json::Value::String(s)) => Ok(Value::Text(s.into())),
1735        Some(other) => Ok(Value::Text(
1736            serde_json::to_string(&other)
1737                .map_err(|e| SqlError::InvalidValue(format!("JSON render: {e}")))?
1738                .into(),
1739        )),
1740        None => Ok(Value::Null),
1741    }
1742}
1743
1744pub fn fn_valid(v: &Value) -> Result<Value> {
1745    let s = match v {
1746        Value::Text(s) => s.as_str(),
1747        Value::Json(s) => s.as_str(),
1748        _ => return Ok(Value::Boolean(false)),
1749    };
1750    Ok(Value::Boolean(
1751        serde_json::from_str::<serde_json::Value>(s).is_ok(),
1752    ))
1753}
1754
1755pub fn fn_strip_nulls(v: &Value, target: crate::types::DataType) -> Result<Value> {
1756    let mut j = value_to_serde(v)?;
1757    strip_nulls_inplace(&mut j);
1758    serde_to_value(j, target)
1759}
1760
1761fn strip_nulls_inplace(j: &mut serde_json::Value) {
1762    match j {
1763        serde_json::Value::Object(m) => {
1764            m.retain(|_, v| !matches!(v, serde_json::Value::Null));
1765            for v in m.values_mut() {
1766                strip_nulls_inplace(v);
1767            }
1768        }
1769        serde_json::Value::Array(arr) => {
1770            for v in arr.iter_mut() {
1771                strip_nulls_inplace(v);
1772            }
1773        }
1774        _ => {}
1775    }
1776}
1777
1778pub fn fn_pretty(v: &Value) -> Result<Value> {
1779    let j = value_to_serde(v)?;
1780    let s = serde_json::to_string_pretty(&j)
1781        .map_err(|e| SqlError::InvalidValue(format!("JSON pretty render: {e}")))?;
1782    Ok(Value::Text(s.into()))
1783}
1784
1785pub fn fn_build_object(args: &[Value], target: crate::types::DataType) -> Result<Value> {
1786    if args.len() % 2 != 0 {
1787        return Err(SqlError::InvalidValue(
1788            "jsonb_build_object requires an even number of arguments".into(),
1789        ));
1790    }
1791    let mut map = serde_json::Map::new();
1792    for pair in args.chunks(2) {
1793        let key = match &pair[0] {
1794            Value::Null => continue,
1795            Value::Text(s) => s.to_string(),
1796            other => other.to_string(),
1797        };
1798        let val = value_to_serde_lossy(&pair[1])?;
1799        map.insert(key, val);
1800    }
1801    serde_to_value(serde_json::Value::Object(map), target)
1802}
1803
1804pub fn fn_build_array(args: &[Value], target: crate::types::DataType) -> Result<Value> {
1805    let items: Result<Vec<serde_json::Value>> = args.iter().map(value_to_serde_lossy).collect();
1806    serde_to_value(serde_json::Value::Array(items?), target)
1807}
1808
1809pub fn fn_set(
1810    j: &Value,
1811    path: &Value,
1812    new_value: &Value,
1813    create_missing: bool,
1814    target: crate::types::DataType,
1815) -> Result<Value> {
1816    let mut root = value_to_serde(j)?;
1817    let segments = path_to_segments(path)?;
1818    let new_serde = value_to_serde_lossy(new_value)?;
1819    if !set_at_path(&mut root, &segments, new_serde, create_missing, false) {
1820        return serde_to_value(root, target);
1821    }
1822    serde_to_value(root, target)
1823}
1824
1825pub fn fn_insert(
1826    j: &Value,
1827    path: &Value,
1828    new_value: &Value,
1829    insert_after: bool,
1830    target: crate::types::DataType,
1831) -> Result<Value> {
1832    let mut root = value_to_serde(j)?;
1833    let segments = path_to_segments(path)?;
1834    let new_serde = value_to_serde_lossy(new_value)?;
1835    set_at_path(&mut root, &segments, new_serde, true, insert_after);
1836    serde_to_value(root, target)
1837}
1838
1839fn set_at_path(
1840    root: &mut serde_json::Value,
1841    segments: &[PathSeg],
1842    new_value: serde_json::Value,
1843    create_missing: bool,
1844    insert_array: bool,
1845) -> bool {
1846    if segments.is_empty() {
1847        return false;
1848    }
1849    let (last, prefix) = segments.split_last().unwrap();
1850    let Some(target) = navigate_mut(root, prefix) else {
1851        return false;
1852    };
1853    match (target, last) {
1854        (serde_json::Value::Object(m), PathSeg::Key(k)) => {
1855            let exists = m.contains_key(k.as_str());
1856            if exists || create_missing {
1857                m.insert(k.clone(), new_value);
1858                true
1859            } else {
1860                false
1861            }
1862        }
1863        (serde_json::Value::Array(arr), PathSeg::Index(i)) => {
1864            let len = arr.len() as i64;
1865            let idx = if *i < 0 { len + i } else { *i };
1866            if insert_array {
1867                let target_pos = if idx <= 0 {
1868                    0
1869                } else if idx >= len {
1870                    arr.len()
1871                } else {
1872                    idx as usize
1873                };
1874                arr.insert(target_pos, new_value);
1875                true
1876            } else if (0..len).contains(&idx) {
1877                arr[idx as usize] = new_value;
1878                true
1879            } else if create_missing {
1880                if idx < 0 {
1881                    arr.insert(0, new_value);
1882                } else {
1883                    arr.push(new_value);
1884                }
1885                true
1886            } else {
1887                false
1888            }
1889        }
1890        _ => false,
1891    }
1892}
1893
1894pub fn fn_to_json(v: &Value, target: crate::types::DataType) -> Result<Value> {
1895    let j = value_to_serde_lossy(v)?;
1896    serde_to_value(j, target)
1897}
1898
1899pub fn fn_json_object(args: &[Value]) -> Result<Value> {
1900    match args.len() {
1901        1 => {
1902            let j = value_to_serde(&args[0])?;
1903            let arr = j
1904                .as_array()
1905                .ok_or_else(|| SqlError::InvalidValue("json_object expects text array".into()))?;
1906            let mut map = serde_json::Map::new();
1907            let mut i = 0;
1908            while i + 1 < arr.len() {
1909                let key = arr[i]
1910                    .as_str()
1911                    .ok_or_else(|| SqlError::InvalidValue("json_object key must be string".into()))?
1912                    .to_string();
1913                let val = arr[i + 1].clone();
1914                map.insert(key, val);
1915                i += 2;
1916            }
1917            serde_to_value(serde_json::Value::Object(map), crate::types::DataType::Json)
1918        }
1919        2 => {
1920            let keys = text_array(&args[0])?;
1921            let vals = text_array(&args[1])?;
1922            if keys.len() != vals.len() {
1923                return Err(SqlError::InvalidValue(
1924                    "json_object: keys and values must be same length".into(),
1925                ));
1926            }
1927            let mut map = serde_json::Map::new();
1928            for (k, v) in keys.into_iter().zip(vals) {
1929                map.insert(k, serde_json::Value::String(v));
1930            }
1931            serde_to_value(serde_json::Value::Object(map), crate::types::DataType::Json)
1932        }
1933        _ => Err(SqlError::InvalidValue(
1934            "json_object requires 1 or 2 arguments".into(),
1935        )),
1936    }
1937}
1938
1939fn value_to_serde_lossy(v: &Value) -> Result<serde_json::Value> {
1940    match v {
1941        Value::Null => Ok(serde_json::Value::Null),
1942        Value::Boolean(b) => Ok(serde_json::Value::Bool(*b)),
1943        Value::Integer(i) => Ok(serde_json::Value::Number((*i).into())),
1944        Value::Real(r) => serde_json::Number::from_f64(*r)
1945            .map(serde_json::Value::Number)
1946            .ok_or_else(|| SqlError::InvalidValue("non-finite number".into())),
1947        Value::Text(s) => Ok(serde_json::Value::String(s.to_string())),
1948        Value::Json(s) => serde_json::from_str(s)
1949            .map_err(|e| SqlError::InvalidValue(format!("invalid JSON: {e}"))),
1950        Value::Jsonb(b) => decode_to_serde(b),
1951        Value::Blob(b) => {
1952            let hex: String = b.iter().map(|byte| format!("{byte:02x}")).collect();
1953            Ok(serde_json::Value::String(hex))
1954        }
1955        Value::Date(_) | Value::Time(_) | Value::Timestamp(_) | Value::Interval { .. } => {
1956            Ok(serde_json::Value::String(format!("{v}")))
1957        }
1958    }
1959}
1960
1961#[cfg(test)]
1962#[path = "json_tests.rs"]
1963mod tests;