Skip to main content

citadel_sql/
json.rs

1use std::sync::Arc;
2
3use crate::error::{Result, SqlError};
4use crate::types::Value;
5
/// Type tag carried in the high nibble of a JSONB header byte.
///
/// The explicit discriminants are the encoded tag values, so they must stay stable.
#[repr(u8)]
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum JsonbType {
    /// JSON `null`.
    Null = 0,
    /// JSON `true`.
    True = 1,
    /// JSON `false`.
    False = 2,
    /// Number encoded as a little-endian `i64`.
    Integer = 3,
    /// Number encoded as a little-endian `f64`.
    Real = 4,
    /// UTF-8 string.
    String = 5,
    /// Sequence of encoded values.
    Array = 6,
    /// Sequence of encoded (string key, value) pairs.
    Object = 7,
}

impl JsonbType {
    /// Maps a tag nibble back to its variant; any value above 7 yields `None`.
    fn from_nibble(n: u8) -> Option<Self> {
        // Ordered by discriminant, so slot `i` holds the variant whose tag is `i`.
        const BY_TAG: [JsonbType; 8] = [
            JsonbType::Null,
            JsonbType::True,
            JsonbType::False,
            JsonbType::Integer,
            JsonbType::Real,
            JsonbType::String,
            JsonbType::Array,
            JsonbType::Object,
        ];
        BY_TAG.get(usize::from(n)).copied()
    }
}
34
// Low-nibble "size class" markers. Values 0..=11 encode the payload length
// inline; the values below announce an explicit little-endian length field
// of the named width directly after the header byte.

/// Payload length follows as one byte.
const SIZE_CLASS_U8: u8 = 0x0C;
/// Payload length follows as a little-endian `u16`.
const SIZE_CLASS_U16: u8 = 0x0D;
/// Payload length follows as a little-endian `u32`.
const SIZE_CLASS_U32: u8 = 0x0E;
/// Payload length follows as a little-endian `u64`.
const SIZE_CLASS_U64: u8 = 0x0F;
39
40pub fn validate_text(s: &str) -> Result<()> {
41    serde_json::from_str::<serde_json::Value>(s)
42        .map(|_| ())
43        .map_err(|e| SqlError::InvalidValue(format!("invalid JSON: {e}")))
44}
45
46pub fn text_to_jsonb(s: &str) -> Result<Value> {
47    let v: serde_json::Value = serde_json::from_str(s)
48        .map_err(|e| SqlError::InvalidValue(format!("invalid JSON: {e}")))?;
49    reject_null_bytes(&v)?;
50    let mut buf = Vec::with_capacity(s.len());
51    encode_canonical(&v, &mut buf)?;
52    Ok(Value::Jsonb(Arc::from(buf)))
53}
54
55fn reject_null_bytes(v: &serde_json::Value) -> Result<()> {
56    match v {
57        serde_json::Value::String(s) if s.contains('\0') => Err(SqlError::InvalidValue(
58            "unsupported Unicode escape sequence \\u0000".into(),
59        )),
60        serde_json::Value::Array(items) => items.iter().try_for_each(reject_null_bytes),
61        serde_json::Value::Object(map) => map.iter().try_for_each(|(k, v)| {
62            if k.contains('\0') {
63                return Err(SqlError::InvalidValue(
64                    "unsupported Unicode escape sequence \\u0000".into(),
65                ));
66            }
67            reject_null_bytes(v)
68        }),
69        _ => Ok(()),
70    }
71}
72
73pub fn decode_to_text(bytes: &[u8]) -> Result<String> {
74    let v = decode_to_serde(bytes)?;
75    serde_json::to_string(&v).map_err(|e| SqlError::InvalidValue(format!("JSONB render: {e}")))
76}
77
78pub fn decode_to_serde(bytes: &[u8]) -> Result<serde_json::Value> {
79    let mut pos = 0;
80    let v = decode_value(bytes, &mut pos)?;
81    if pos != bytes.len() {
82        return Err(SqlError::InvalidValue("trailing bytes in JSONB".into()));
83    }
84    Ok(v)
85}
86
87pub fn encode_canonical(v: &serde_json::Value, out: &mut Vec<u8>) -> Result<()> {
88    match v {
89        serde_json::Value::Null => out.push(header_byte(JsonbType::Null, 0)),
90        serde_json::Value::Bool(true) => out.push(header_byte(JsonbType::True, 0)),
91        serde_json::Value::Bool(false) => out.push(header_byte(JsonbType::False, 0)),
92        serde_json::Value::Number(n) => {
93            if let Some(i) = n.as_i64() {
94                out.push(header_byte(JsonbType::Integer, 0));
95                out.extend_from_slice(&i.to_le_bytes());
96            } else if let Some(f) = n.as_f64() {
97                if f.is_finite() {
98                    out.push(header_byte(JsonbType::Real, 0));
99                    out.extend_from_slice(&f.to_le_bytes());
100                } else {
101                    return Err(SqlError::InvalidValue("non-finite number in JSON".into()));
102                }
103            } else {
104                return Err(SqlError::InvalidValue(format!("unsupported number: {n}")));
105            }
106        }
107        serde_json::Value::String(s) => encode_string(s, out),
108        serde_json::Value::Array(items) => {
109            let mut payload = Vec::new();
110            for item in items {
111                encode_canonical(item, &mut payload)?;
112            }
113            write_header_with_len(JsonbType::Array, payload.len(), out);
114            out.extend_from_slice(&payload);
115        }
116        serde_json::Value::Object(map) => {
117            let mut keys: Vec<&String> = map.keys().collect();
118            keys.sort();
119            let mut payload = Vec::new();
120            for k in keys {
121                encode_string(k, &mut payload);
122                encode_canonical(&map[k], &mut payload)?;
123            }
124            write_header_with_len(JsonbType::Object, payload.len(), out);
125            out.extend_from_slice(&payload);
126        }
127    }
128    Ok(())
129}
130
131fn encode_string(s: &str, out: &mut Vec<u8>) {
132    let bytes = s.as_bytes();
133    write_header_with_len(JsonbType::String, bytes.len(), out);
134    out.extend_from_slice(bytes);
135}
136
137fn header_byte(ty: JsonbType, size_class: u8) -> u8 {
138    debug_assert!(size_class <= 15);
139    (ty as u8) << 4 | (size_class & 0x0F)
140}
141
142fn write_header_with_len(ty: JsonbType, len: usize, out: &mut Vec<u8>) {
143    if len <= 11 {
144        out.push(header_byte(ty, len as u8));
145    } else if len <= u8::MAX as usize {
146        out.push(header_byte(ty, SIZE_CLASS_U8));
147        out.push(len as u8);
148    } else if len <= u16::MAX as usize {
149        out.push(header_byte(ty, SIZE_CLASS_U16));
150        out.extend_from_slice(&(len as u16).to_le_bytes());
151    } else if len <= u32::MAX as usize {
152        out.push(header_byte(ty, SIZE_CLASS_U32));
153        out.extend_from_slice(&(len as u32).to_le_bytes());
154    } else {
155        out.push(header_byte(ty, SIZE_CLASS_U64));
156        out.extend_from_slice(&(len as u64).to_le_bytes());
157    }
158}
159
160pub fn read_header(bytes: &[u8]) -> Result<(JsonbType, usize, usize)> {
161    if bytes.is_empty() {
162        return Err(SqlError::InvalidValue("empty JSONB".into()));
163    }
164    let h = bytes[0];
165    let ty = JsonbType::from_nibble(h >> 4)
166        .ok_or_else(|| SqlError::InvalidValue("invalid JSONB type tag".into()))?;
167    let size_class = h & 0x0F;
168    let (payload_start, payload_len) = match size_class {
169        0..=11 => (1, size_class as usize),
170        SIZE_CLASS_U8 => {
171            if bytes.len() < 2 {
172                return Err(SqlError::InvalidValue("truncated JSONB header".into()));
173            }
174            (2, bytes[1] as usize)
175        }
176        SIZE_CLASS_U16 => {
177            if bytes.len() < 3 {
178                return Err(SqlError::InvalidValue("truncated JSONB header".into()));
179            }
180            (3, u16::from_le_bytes([bytes[1], bytes[2]]) as usize)
181        }
182        SIZE_CLASS_U32 => {
183            if bytes.len() < 5 {
184                return Err(SqlError::InvalidValue("truncated JSONB header".into()));
185            }
186            (
187                5,
188                u32::from_le_bytes([bytes[1], bytes[2], bytes[3], bytes[4]]) as usize,
189            )
190        }
191        SIZE_CLASS_U64 => {
192            if bytes.len() < 9 {
193                return Err(SqlError::InvalidValue("truncated JSONB header".into()));
194            }
195            let arr: [u8; 8] = bytes[1..9].try_into().unwrap();
196            (9, u64::from_le_bytes(arr) as usize)
197        }
198        _ => unreachable!(),
199    };
200    let fixed_payload = match ty {
201        JsonbType::Null | JsonbType::True | JsonbType::False => Some(0),
202        JsonbType::Integer | JsonbType::Real => Some(8),
203        _ => None,
204    };
205    let payload_len = fixed_payload.unwrap_or(payload_len);
206    if payload_start + payload_len > bytes.len() {
207        return Err(SqlError::InvalidValue("JSONB payload truncated".into()));
208    }
209    Ok((ty, payload_start, payload_len))
210}
211
212pub fn skip_value(bytes: &[u8]) -> Result<usize> {
213    let (_ty, payload_start, payload_len) = read_header(bytes)?;
214    Ok(payload_start + payload_len)
215}
216
217pub fn find_object_key<'a>(bytes: &'a [u8], key: &str) -> Result<Option<&'a [u8]>> {
218    let (ty, payload_start, payload_len) = read_header(bytes)?;
219    if ty != JsonbType::Object {
220        return Ok(None);
221    }
222    let payload = &bytes[payload_start..payload_start + payload_len];
223    let key_bytes = key.as_bytes();
224    let mut pos = 0usize;
225    while pos < payload.len() {
226        let (kty, kp_start, kp_len) = read_header(&payload[pos..])?;
227        if kty != JsonbType::String {
228            return Err(SqlError::InvalidValue("JSONB object key not string".into()));
229        }
230        let k_total = kp_start + kp_len;
231        let k_slice = &payload[pos + kp_start..pos + k_total];
232        let value_start = pos + k_total;
233        let value_total = skip_value(&payload[value_start..])?;
234        if k_slice == key_bytes {
235            return Ok(Some(&payload[value_start..value_start + value_total]));
236        }
237        pos = value_start + value_total;
238    }
239    Ok(None)
240}
241
242pub fn array_get(bytes: &[u8], idx: i64) -> Result<Option<&[u8]>> {
243    let (ty, payload_start, payload_len) = read_header(bytes)?;
244    if ty != JsonbType::Array {
245        return Ok(None);
246    }
247    let payload = &bytes[payload_start..payload_start + payload_len];
248    if idx < 0 {
249        let mut elems = Vec::new();
250        let mut pos = 0usize;
251        while pos < payload.len() {
252            let total = skip_value(&payload[pos..])?;
253            elems.push((pos, total));
254            pos += total;
255        }
256        let len = elems.len() as i64;
257        let real = len + idx;
258        if real < 0 {
259            return Ok(None);
260        }
261        let (start, total) = elems[real as usize];
262        return Ok(Some(&payload[start..start + total]));
263    }
264    let mut pos = 0usize;
265    let mut remaining = idx;
266    while pos < payload.len() {
267        let total = skip_value(&payload[pos..])?;
268        if remaining == 0 {
269            return Ok(Some(&payload[pos..pos + total]));
270        }
271        remaining -= 1;
272        pos += total;
273    }
274    Ok(None)
275}
276
277pub fn array_len_bytes(bytes: &[u8]) -> Result<Option<usize>> {
278    let (ty, payload_start, payload_len) = read_header(bytes)?;
279    if ty != JsonbType::Array {
280        return Ok(None);
281    }
282    let payload = &bytes[payload_start..payload_start + payload_len];
283    let mut pos = 0usize;
284    let mut count = 0usize;
285    while pos < payload.len() {
286        pos += skip_value(&payload[pos..])?;
287        count += 1;
288    }
289    Ok(Some(count))
290}
291
292pub fn object_len_bytes(bytes: &[u8]) -> Result<Option<usize>> {
293    let (ty, payload_start, payload_len) = read_header(bytes)?;
294    if ty != JsonbType::Object {
295        return Ok(None);
296    }
297    let payload = &bytes[payload_start..payload_start + payload_len];
298    let mut pos = 0usize;
299    let mut count = 0usize;
300    while pos < payload.len() {
301        pos += skip_value(&payload[pos..])?;
302        pos += skip_value(&payload[pos..])?;
303        count += 1;
304    }
305    Ok(Some(count))
306}
307
308pub fn read_scalar_text(bytes: &[u8]) -> Result<Option<String>> {
309    let (ty, payload_start, payload_len) = read_header(bytes)?;
310    let payload = &bytes[payload_start..payload_start + payload_len];
311    match ty {
312        JsonbType::Null => Ok(None),
313        JsonbType::True => Ok(Some("true".into())),
314        JsonbType::False => Ok(Some("false".into())),
315        JsonbType::Integer => {
316            let arr: [u8; 8] = payload
317                .try_into()
318                .map_err(|_| SqlError::InvalidValue("JSONB integer payload size".into()))?;
319            Ok(Some(i64::from_le_bytes(arr).to_string()))
320        }
321        JsonbType::Real => {
322            let arr: [u8; 8] = payload
323                .try_into()
324                .map_err(|_| SqlError::InvalidValue("JSONB real payload size".into()))?;
325            let f = f64::from_le_bytes(arr);
326            let n = serde_json::Number::from_f64(f)
327                .ok_or_else(|| SqlError::InvalidValue("non-finite JSONB number".into()))?;
328            Ok(Some(n.to_string()))
329        }
330        JsonbType::String => {
331            let s = std::str::from_utf8(payload)
332                .map_err(|_| SqlError::InvalidValue("JSONB string not UTF-8".into()))?;
333            Ok(Some(s.to_string()))
334        }
335        JsonbType::Array | JsonbType::Object => {
336            let v = decode_to_serde(bytes)?;
337            Ok(Some(serde_json::to_string(&v).map_err(|e| {
338                SqlError::InvalidValue(format!("JSON render: {e}"))
339            })?))
340        }
341    }
342}
343
344pub fn jsonb_contains_bytes(lhs: &[u8], rhs: &[u8]) -> Result<bool> {
345    let (lty, lps, lpl) = read_header(lhs)?;
346    let (rty, rps, rpl) = read_header(rhs)?;
347    let lpay = &lhs[lps..lps + lpl];
348    let rpay = &rhs[rps..rps + rpl];
349    match (lty, rty) {
350        (JsonbType::Object, JsonbType::Object) => {
351            let mut rp = 0usize;
352            while rp < rpay.len() {
353                let (_rkty, rkps, rkpl) = read_header(&rpay[rp..])?;
354                let rk_total = rkps + rkpl;
355                let rk = &rpay[rp + rkps..rp + rk_total];
356                let rv_start = rp + rk_total;
357                let rv_total = skip_value(&rpay[rv_start..])?;
358                let rv = &rpay[rv_start..rv_start + rv_total];
359                let mut lp = 0usize;
360                let mut found = false;
361                while lp < lpay.len() {
362                    let (_lkty, lkps, lkpl) = read_header(&lpay[lp..])?;
363                    let lk_total = lkps + lkpl;
364                    let lk = &lpay[lp + lkps..lp + lk_total];
365                    let lv_start = lp + lk_total;
366                    let lv_total = skip_value(&lpay[lv_start..])?;
367                    if lk == rk {
368                        let lv = &lpay[lv_start..lv_start + lv_total];
369                        if !jsonb_contains_bytes(lv, rv)? {
370                            return Ok(false);
371                        }
372                        found = true;
373                        break;
374                    }
375                    lp = lv_start + lv_total;
376                }
377                if !found {
378                    return Ok(false);
379                }
380                rp = rv_start + rv_total;
381            }
382            Ok(true)
383        }
384        (JsonbType::Array, JsonbType::Array) => {
385            let mut rp = 0usize;
386            while rp < rpay.len() {
387                let rv_total = skip_value(&rpay[rp..])?;
388                let rv = &rpay[rp..rp + rv_total];
389                let mut lp = 0usize;
390                let mut found = false;
391                while lp < lpay.len() {
392                    let lv_total = skip_value(&lpay[lp..])?;
393                    let lv = &lpay[lp..lp + lv_total];
394                    if jsonb_contains_bytes(lv, rv)? {
395                        found = true;
396                        break;
397                    }
398                    lp += lv_total;
399                }
400                if !found {
401                    return Ok(false);
402                }
403                rp += rv_total;
404            }
405            Ok(true)
406        }
407        (JsonbType::Array, _) => {
408            let r_total = rps + rpl;
409            let r_full = &rhs[..r_total];
410            let mut lp = 0usize;
411            while lp < lpay.len() {
412                let lv_total = skip_value(&lpay[lp..])?;
413                if &lpay[lp..lp + lv_total] == r_full {
414                    return Ok(true);
415                }
416                lp += lv_total;
417            }
418            Ok(false)
419        }
420        _ => {
421            let l_total = lps + lpl;
422            let r_total = rps + rpl;
423            Ok(lhs[..l_total] == rhs[..r_total])
424        }
425    }
426}
427
428pub fn has_top_key_bytes(bytes: &[u8], key: &str) -> Result<bool> {
429    let (ty, payload_start, payload_len) = read_header(bytes)?;
430    let payload = &bytes[payload_start..payload_start + payload_len];
431    let key_bytes = key.as_bytes();
432    match ty {
433        JsonbType::Object => {
434            let mut pos = 0usize;
435            while pos < payload.len() {
436                let (_kty, kps, kpl) = read_header(&payload[pos..])?;
437                let k_total = kps + kpl;
438                if &payload[pos + kps..pos + k_total] == key_bytes {
439                    return Ok(true);
440                }
441                pos += k_total;
442                pos += skip_value(&payload[pos..])?;
443            }
444            Ok(false)
445        }
446        JsonbType::Array => {
447            let mut pos = 0usize;
448            while pos < payload.len() {
449                let (ety, eps, epl) = read_header(&payload[pos..])?;
450                if ety == JsonbType::String && &payload[pos + eps..pos + eps + epl] == key_bytes {
451                    return Ok(true);
452                }
453                pos += eps + epl;
454            }
455            Ok(false)
456        }
457        _ => Ok(false),
458    }
459}
460
461fn decode_value(bytes: &[u8], pos: &mut usize) -> Result<serde_json::Value> {
462    let (ty, payload_start, payload_len) = read_header(&bytes[*pos..])?;
463    let payload = &bytes[*pos + payload_start..*pos + payload_start + payload_len];
464    let total = payload_start + payload_len;
465    let v = match ty {
466        JsonbType::Null => serde_json::Value::Null,
467        JsonbType::True => serde_json::Value::Bool(true),
468        JsonbType::False => serde_json::Value::Bool(false),
469        JsonbType::Integer => {
470            let arr: [u8; 8] = payload
471                .try_into()
472                .map_err(|_| SqlError::InvalidValue("JSONB integer payload size".into()))?;
473            serde_json::Value::Number(i64::from_le_bytes(arr).into())
474        }
475        JsonbType::Real => {
476            let arr: [u8; 8] = payload
477                .try_into()
478                .map_err(|_| SqlError::InvalidValue("JSONB real payload size".into()))?;
479            let f = f64::from_le_bytes(arr);
480            serde_json::Number::from_f64(f)
481                .map(serde_json::Value::Number)
482                .ok_or_else(|| SqlError::InvalidValue("non-finite JSONB number".into()))?
483        }
484        JsonbType::String => {
485            let s = std::str::from_utf8(payload)
486                .map_err(|_| SqlError::InvalidValue("JSONB string not UTF-8".into()))?;
487            serde_json::Value::String(s.to_string())
488        }
489        JsonbType::Array => {
490            let mut items = Vec::new();
491            let mut child_pos = 0usize;
492            while child_pos < payload.len() {
493                let mut local = child_pos;
494                let item = decode_value(payload, &mut local)?;
495                items.push(item);
496                child_pos = local;
497            }
498            serde_json::Value::Array(items)
499        }
500        JsonbType::Object => {
501            let mut map = serde_json::Map::new();
502            let mut child_pos = 0usize;
503            while child_pos < payload.len() {
504                let mut local = child_pos;
505                let key = match decode_value(payload, &mut local)? {
506                    serde_json::Value::String(s) => s,
507                    _ => return Err(SqlError::InvalidValue("JSONB object key not string".into())),
508                };
509                let value = decode_value(payload, &mut local)?;
510                map.insert(key, value);
511                child_pos = local;
512            }
513            serde_json::Value::Object(map)
514        }
515    };
516    *pos += total;
517    Ok(v)
518}
519
520pub(crate) fn value_to_serde(v: &Value) -> Result<serde_json::Value> {
521    match v {
522        Value::Json(s) => serde_json::from_str(s)
523            .map_err(|e| SqlError::InvalidValue(format!("invalid JSON: {e}"))),
524        Value::Jsonb(b) => decode_to_serde(b),
525        _ => Err(SqlError::TypeMismatch {
526            expected: "JSON or JSONB".into(),
527            got: v.data_type().to_string(),
528        }),
529    }
530}
531
532fn serde_to_value(j: serde_json::Value, target: crate::types::DataType) -> Result<Value> {
533    use crate::types::DataType;
534    match target {
535        DataType::Json => Ok(Value::Json(
536            serde_json::to_string(&j)
537                .map_err(|e| SqlError::InvalidValue(format!("JSON render: {e}")))?
538                .into(),
539        )),
540        DataType::Jsonb => {
541            let mut buf = Vec::new();
542            encode_canonical(&j, &mut buf)?;
543            Ok(Value::Jsonb(Arc::from(buf)))
544        }
545        _ => Err(SqlError::InvalidValue(format!(
546            "cannot serialize JSON to {target}"
547        ))),
548    }
549}
550
551fn serde_to_scalar_value(j: serde_json::Value) -> Value {
552    match j {
553        serde_json::Value::Null => Value::Null,
554        serde_json::Value::Bool(b) => Value::Boolean(b),
555        serde_json::Value::Number(n) => {
556            if let Some(i) = n.as_i64() {
557                Value::Integer(i)
558            } else if let Some(f) = n.as_f64() {
559                Value::Real(f)
560            } else {
561                Value::Null
562            }
563        }
564        serde_json::Value::String(s) => Value::Text(s.into()),
565        other => {
566            let text = serde_json::to_string(&other).unwrap_or_default();
567            Value::Text(text.into())
568        }
569    }
570}
571
572pub fn op_get(lhs: &Value, key: &Value) -> Result<Value> {
573    if let Value::Jsonb(b) = lhs {
574        let slice = match key {
575            Value::Text(k) => find_object_key(b, k.as_str())?,
576            Value::Integer(i) => array_get(b, *i)?,
577            _ => None,
578        };
579        return match slice {
580            Some(bytes) => Ok(Value::Jsonb(Arc::from(bytes))),
581            None => Ok(Value::Null),
582        };
583    }
584    let target = match lhs {
585        Value::Json(_) => crate::types::DataType::Json,
586        _ => {
587            return Err(SqlError::TypeMismatch {
588                expected: "JSON or JSONB".into(),
589                got: lhs.data_type().to_string(),
590            })
591        }
592    };
593    let j = value_to_serde(lhs)?;
594    let extracted = navigate_one(&j, key);
595    match extracted {
596        Some(v) => serde_to_value(v, target),
597        None => Ok(Value::Null),
598    }
599}
600
601pub fn op_get_text(lhs: &Value, key: &Value) -> Result<Value> {
602    if let Value::Jsonb(b) = lhs {
603        let slice = match key {
604            Value::Text(k) => find_object_key(b, k.as_str())?,
605            Value::Integer(i) => array_get(b, *i)?,
606            _ => None,
607        };
608        return match slice {
609            Some(bytes) => match read_scalar_text(bytes)? {
610                Some(s) => Ok(Value::Text(s.into())),
611                None => Ok(Value::Null),
612            },
613            None => Ok(Value::Null),
614        };
615    }
616    let j = value_to_serde(lhs)?;
617    match navigate_one(&j, key) {
618        Some(serde_json::Value::Null) => Ok(Value::Null),
619        Some(serde_json::Value::String(s)) => Ok(Value::Text(s.into())),
620        Some(v) => Ok(Value::Text(
621            serde_json::to_string(&v)
622                .map_err(|e| SqlError::InvalidValue(format!("JSON render: {e}")))?
623                .into(),
624        )),
625        None => Ok(Value::Null),
626    }
627}
628
629pub fn op_path(lhs: &Value, path: &Value) -> Result<Value> {
630    let target = match lhs {
631        Value::Json(_) => crate::types::DataType::Json,
632        Value::Jsonb(_) => crate::types::DataType::Jsonb,
633        _ => {
634            return Err(SqlError::TypeMismatch {
635                expected: "JSON or JSONB".into(),
636                got: lhs.data_type().to_string(),
637            })
638        }
639    };
640    let j = value_to_serde(lhs)?;
641    let segments = path_to_segments(path)?;
642    match navigate_path(&j, &segments) {
643        Some(v) => serde_to_value(v, target),
644        None => Ok(Value::Null),
645    }
646}
647
648pub fn op_path_text(lhs: &Value, path: &Value) -> Result<Value> {
649    let j = value_to_serde(lhs)?;
650    let segments = path_to_segments(path)?;
651    match navigate_path(&j, &segments) {
652        Some(serde_json::Value::Null) => Ok(Value::Null),
653        Some(serde_json::Value::String(s)) => Ok(Value::Text(s.into())),
654        Some(v) => Ok(Value::Text(
655            serde_json::to_string(&v)
656                .map_err(|e| SqlError::InvalidValue(format!("JSON render: {e}")))?
657                .into(),
658        )),
659        None => Ok(Value::Null),
660    }
661}
662
663pub fn op_contains(lhs: &Value, rhs: &Value) -> Result<Value> {
664    if let (Value::Jsonb(l), Value::Jsonb(r)) = (lhs, rhs) {
665        return Ok(Value::Boolean(jsonb_contains_bytes(l, r)?));
666    }
667    let left = value_to_serde(lhs)?;
668    let right = value_to_serde(rhs)?;
669    Ok(Value::Boolean(json_contains(&left, &right)))
670}
671
672pub fn op_contained_by(lhs: &Value, rhs: &Value) -> Result<Value> {
673    if let (Value::Jsonb(l), Value::Jsonb(r)) = (lhs, rhs) {
674        return Ok(Value::Boolean(jsonb_contains_bytes(r, l)?));
675    }
676    let left = value_to_serde(lhs)?;
677    let right = value_to_serde(rhs)?;
678    Ok(Value::Boolean(json_contains(&right, &left)))
679}
680
681pub fn op_has_key(lhs: &Value, rhs: &Value) -> Result<Value> {
682    let key = match rhs {
683        Value::Text(s) => s.as_str(),
684        _ => {
685            return Err(SqlError::TypeMismatch {
686                expected: "TEXT key".into(),
687                got: rhs.data_type().to_string(),
688            })
689        }
690    };
691    if let Value::Jsonb(b) = lhs {
692        return Ok(Value::Boolean(has_top_key_bytes(b, key)?));
693    }
694    let left = value_to_serde(lhs)?;
695    let exists = match &left {
696        serde_json::Value::Object(m) => m.contains_key(key),
697        serde_json::Value::Array(arr) => arr
698            .iter()
699            .any(|e| matches!(e, serde_json::Value::String(s) if s == key)),
700        _ => false,
701    };
702    Ok(Value::Boolean(exists))
703}
704
705pub fn op_has_any_key(lhs: &Value, rhs: &Value) -> Result<Value> {
706    let left = value_to_serde(lhs)?;
707    let keys = text_array(rhs)?;
708    let m = match &left {
709        serde_json::Value::Object(m) => m,
710        _ => return Ok(Value::Boolean(false)),
711    };
712    Ok(Value::Boolean(
713        keys.iter().any(|k| m.contains_key(k.as_str())),
714    ))
715}
716
717pub fn op_has_all_keys(lhs: &Value, rhs: &Value) -> Result<Value> {
718    let left = value_to_serde(lhs)?;
719    let keys = text_array(rhs)?;
720    let m = match &left {
721        serde_json::Value::Object(m) => m,
722        _ => return Ok(Value::Boolean(keys.is_empty())),
723    };
724    Ok(Value::Boolean(
725        keys.iter().all(|k| m.contains_key(k.as_str())),
726    ))
727}
728
729pub fn op_delete_path(lhs: &Value, path: &Value) -> Result<Value> {
730    let target = match lhs {
731        Value::Json(_) => crate::types::DataType::Json,
732        Value::Jsonb(_) => crate::types::DataType::Jsonb,
733        _ => {
734            return Err(SqlError::TypeMismatch {
735                expected: "JSON or JSONB".into(),
736                got: lhs.data_type().to_string(),
737            })
738        }
739    };
740    let mut j = value_to_serde(lhs)?;
741    let segments = path_to_segments(path)?;
742    delete_at_path(&mut j, &segments);
743    serde_to_value(j, target)
744}
745
746pub fn op_delete_one(lhs: &Value, rhs: &Value) -> Result<Value> {
747    let target = match lhs {
748        Value::Json(_) => crate::types::DataType::Json,
749        Value::Jsonb(_) => crate::types::DataType::Jsonb,
750        _ => {
751            return Err(SqlError::TypeMismatch {
752                expected: "JSON or JSONB".into(),
753                got: lhs.data_type().to_string(),
754            })
755        }
756    };
757    let mut j = value_to_serde(lhs)?;
758    match (&mut j, rhs) {
759        (serde_json::Value::Object(m), Value::Text(k)) => {
760            m.remove(k.as_str());
761        }
762        (serde_json::Value::Array(arr), Value::Integer(i)) => {
763            let len = arr.len() as i64;
764            let idx = if *i < 0 { len + i } else { *i };
765            if (0..len).contains(&idx) {
766                arr.remove(idx as usize);
767            }
768        }
769        (serde_json::Value::Array(arr), Value::Text(k)) => {
770            arr.retain(|e| !matches!(e, serde_json::Value::String(s) if s == k.as_str()));
771        }
772        _ => {}
773    }
774    serde_to_value(j, target)
775}
776
777pub fn op_concat(lhs: &Value, rhs: &Value) -> Result<Value> {
778    let target = match (lhs, rhs) {
779        (Value::Jsonb(_), _) | (_, Value::Jsonb(_)) => crate::types::DataType::Jsonb,
780        _ => crate::types::DataType::Json,
781    };
782    let mut left = value_to_serde(lhs)?;
783    let right = value_to_serde(rhs)?;
784    match (&mut left, right) {
785        (serde_json::Value::Object(a), serde_json::Value::Object(b)) => {
786            for (k, v) in b {
787                a.insert(k, v);
788            }
789        }
790        (serde_json::Value::Array(a), serde_json::Value::Array(b)) => {
791            a.extend(b);
792        }
793        (serde_json::Value::Array(a), other) => {
794            a.push(other);
795        }
796        (a, serde_json::Value::Array(mut b)) => {
797            let owned = std::mem::take(a);
798            let mut combined = vec![owned];
799            combined.append(&mut b);
800            *a = serde_json::Value::Array(combined);
801        }
802        (a, b) => {
803            let av = std::mem::take(a);
804            *a = serde_json::Value::Array(vec![av, b]);
805        }
806    }
807    serde_to_value(left, target)
808}
809
810fn coerce_path_arg(v: &Value) -> Result<String> {
811    match v {
812        Value::Text(s) => Ok(s.to_string()),
813        Value::Json(s) => Ok(s.to_string()),
814        _ => Err(SqlError::TypeMismatch {
815            expected: "TEXT path".into(),
816            got: v.data_type().to_string(),
817        }),
818    }
819}
820
821fn coerce_vars_arg(v: &Value) -> Result<Option<serde_json::Value>> {
822    if v.is_null() {
823        return Ok(None);
824    }
825    let j = value_to_serde(v)?;
826    if !j.is_object() {
827        return Err(SqlError::InvalidValue(
828            "jsonpath vars argument must be a JSONB object".into(),
829        ));
830    }
831    Ok(Some(j))
832}
833
834fn coerce_silent_arg(v: &Value) -> Result<bool> {
835    if v.is_null() {
836        return Ok(false);
837    }
838    match v {
839        Value::Boolean(b) => Ok(*b),
840        _ => Err(SqlError::TypeMismatch {
841            expected: "BOOLEAN".into(),
842            got: v.data_type().to_string(),
843        }),
844    }
845}
846
847fn jp_query(
848    j: &serde_json::Value,
849    path_str: &str,
850    vars: Option<&serde_json::Value>,
851    silent: bool,
852) -> Result<Vec<serde_json::Value>> {
853    let jp = sql_json_path::JsonPath::new(path_str)
854        .map_err(|e| SqlError::InvalidValue(format!("invalid JSON path: {e}")))?;
855    let result = match vars {
856        Some(v) => jp.query_with_vars(j, v),
857        None => jp.query(j),
858    };
859    match result {
860        Ok(nodes) => Ok(nodes.into_iter().map(|c| c.into_owned()).collect()),
861        Err(e) if silent && e.can_silent() => Ok(vec![]),
862        Err(e) => Err(SqlError::InvalidValue(format!("JSON path eval: {e}"))),
863    }
864}
865
866fn jp_query_first(
867    j: &serde_json::Value,
868    path_str: &str,
869    vars: Option<&serde_json::Value>,
870    silent: bool,
871) -> Result<Option<serde_json::Value>> {
872    let jp = sql_json_path::JsonPath::new(path_str)
873        .map_err(|e| SqlError::InvalidValue(format!("invalid JSON path: {e}")))?;
874    let result = match vars {
875        Some(v) => jp.query_first_with_vars(j, v),
876        None => jp.query_first(j),
877    };
878    match result {
879        Ok(opt) => Ok(opt.map(|c| c.into_owned())),
880        Err(e) if silent && e.can_silent() => Ok(None),
881        Err(e) => Err(SqlError::InvalidValue(format!("JSON path eval: {e}"))),
882    }
883}
884
885fn jp_exists(
886    j: &serde_json::Value,
887    path_str: &str,
888    vars: Option<&serde_json::Value>,
889    silent: bool,
890) -> Result<Option<bool>> {
891    let jp = sql_json_path::JsonPath::new(path_str)
892        .map_err(|e| SqlError::InvalidValue(format!("invalid JSON path: {e}")))?;
893    let result = match vars {
894        Some(v) => jp.exists_with_vars(j, v),
895        None => jp.exists(j),
896    };
897    match result {
898        Ok(b) => Ok(Some(b)),
899        Err(e) if silent && e.can_silent() => Ok(None),
900        Err(e) => Err(SqlError::InvalidValue(format!("JSON path eval: {e}"))),
901    }
902}
903
904pub fn op_path_exists(lhs: &Value, path: &Value) -> Result<Value> {
905    let j = value_to_serde(lhs)?;
906    let path_str = coerce_path_arg(path)?;
907    let exists = jp_exists(&j, &path_str, None, false)?.unwrap_or(false);
908    Ok(Value::Boolean(exists))
909}
910
911pub fn op_path_match(lhs: &Value, path: &Value) -> Result<Value> {
912    let j = value_to_serde(lhs)?;
913    let path_str = coerce_path_arg(path)?;
914    let nodes = jp_query(&j, &path_str, None, false)?;
915    let truthy = nodes
916        .iter()
917        .any(|node| matches!(node, serde_json::Value::Bool(true)));
918    Ok(Value::Boolean(truthy))
919}
920
921pub fn fn_json_exists(j_val: &Value, path: &Value) -> Result<Value> {
922    op_path_exists(j_val, path)
923}
924
925pub fn fn_json_value(j_val: &Value, path: &Value) -> Result<Value> {
926    let j = value_to_serde(j_val)?;
927    let path_str = coerce_path_arg(path)?;
928    match jp_query_first(&j, &path_str, None, false)? {
929        Some(serde_json::Value::Null) => Ok(Value::Null),
930        Some(serde_json::Value::String(s)) => Ok(Value::Text(s.into())),
931        Some(other) => Ok(Value::Text(
932            serde_json::to_string(&other)
933                .map_err(|e| SqlError::InvalidValue(format!("JSON render: {e}")))?
934                .into(),
935        )),
936        None => Ok(Value::Null),
937    }
938}
939
940pub fn fn_json_query(j_val: &Value, path: &Value, target: crate::types::DataType) -> Result<Value> {
941    let j = value_to_serde(j_val)?;
942    let path_str = coerce_path_arg(path)?;
943    let nodes = jp_query(&j, &path_str, None, false)?;
944    if nodes.is_empty() {
945        return Ok(Value::Null);
946    }
947    let result_json = if nodes.len() == 1 {
948        nodes[0].clone()
949    } else {
950        serde_json::Value::Array(nodes)
951    };
952    serde_to_value(result_json, target)
953}
954
955pub fn fn_jsonb_path_exists(args: &[Value]) -> Result<Value> {
956    if !(2..=4).contains(&args.len()) {
957        return Err(SqlError::InvalidValue(
958            "jsonb_path_exists: expected 2..=4 arguments".into(),
959        ));
960    }
961    let j = value_to_serde(&args[0])?;
962    let path_str = coerce_path_arg(&args[1])?;
963    let vars = args.get(2).map(coerce_vars_arg).transpose()?.flatten();
964    let silent = args
965        .get(3)
966        .map(coerce_silent_arg)
967        .transpose()?
968        .unwrap_or(false);
969    match jp_exists(&j, &path_str, vars.as_ref(), silent)? {
970        Some(b) => Ok(Value::Boolean(b)),
971        None => Ok(Value::Null),
972    }
973}
974
975pub fn fn_jsonb_path_match(args: &[Value]) -> Result<Value> {
976    if !(2..=4).contains(&args.len()) {
977        return Err(SqlError::InvalidValue(
978            "jsonb_path_match: expected 2..=4 arguments".into(),
979        ));
980    }
981    let j = value_to_serde(&args[0])?;
982    let path_str = coerce_path_arg(&args[1])?;
983    let vars = args.get(2).map(coerce_vars_arg).transpose()?.flatten();
984    let silent = args
985        .get(3)
986        .map(coerce_silent_arg)
987        .transpose()?
988        .unwrap_or(false);
989    let nodes = jp_query(&j, &path_str, vars.as_ref(), silent)?;
990    if nodes.len() != 1 {
991        if silent {
992            return Ok(Value::Null);
993        }
994        return Err(SqlError::InvalidValue(
995            "jsonb_path_match: expected exactly one boolean result".into(),
996        ));
997    }
998    match &nodes[0] {
999        serde_json::Value::Bool(b) => Ok(Value::Boolean(*b)),
1000        _ if silent => Ok(Value::Null),
1001        _ => Err(SqlError::InvalidValue(
1002            "jsonb_path_match: result is not a boolean".into(),
1003        )),
1004    }
1005}
1006
1007pub fn fn_jsonb_path_query_first(args: &[Value]) -> Result<Value> {
1008    if !(2..=4).contains(&args.len()) {
1009        return Err(SqlError::InvalidValue(
1010            "jsonb_path_query_first: expected 2..=4 arguments".into(),
1011        ));
1012    }
1013    let j = value_to_serde(&args[0])?;
1014    let path_str = coerce_path_arg(&args[1])?;
1015    let vars = args.get(2).map(coerce_vars_arg).transpose()?.flatten();
1016    let silent = args
1017        .get(3)
1018        .map(coerce_silent_arg)
1019        .transpose()?
1020        .unwrap_or(false);
1021    match jp_query_first(&j, &path_str, vars.as_ref(), silent)? {
1022        Some(v) => serde_to_value(v, crate::types::DataType::Jsonb),
1023        None => Ok(Value::Null),
1024    }
1025}
1026
1027pub fn fn_jsonb_path_query_array(args: &[Value]) -> Result<Value> {
1028    if !(2..=4).contains(&args.len()) {
1029        return Err(SqlError::InvalidValue(
1030            "jsonb_path_query_array: expected 2..=4 arguments".into(),
1031        ));
1032    }
1033    let j = value_to_serde(&args[0])?;
1034    let path_str = coerce_path_arg(&args[1])?;
1035    let vars = args.get(2).map(coerce_vars_arg).transpose()?.flatten();
1036    let silent = args
1037        .get(3)
1038        .map(coerce_silent_arg)
1039        .transpose()?
1040        .unwrap_or(false);
1041    let nodes = jp_query(&j, &path_str, vars.as_ref(), silent)?;
1042    serde_to_value(
1043        serde_json::Value::Array(nodes),
1044        crate::types::DataType::Jsonb,
1045    )
1046}
1047
1048fn jp_query_tz(
1049    j: &serde_json::Value,
1050    path_str: &str,
1051    vars: Option<&serde_json::Value>,
1052    silent: bool,
1053) -> Result<Vec<serde_json::Value>> {
1054    let jp = sql_json_path::JsonPath::new(path_str)
1055        .map_err(|e| SqlError::InvalidValue(format!("invalid JSON path: {e}")))?;
1056    let result = match vars {
1057        Some(v) => jp.query_with_vars_tz(j, v),
1058        None => jp.query_tz(j),
1059    };
1060    match result {
1061        Ok(nodes) => Ok(nodes.into_iter().map(|c| c.into_owned()).collect()),
1062        Err(e) if silent && e.can_silent() => Ok(vec![]),
1063        Err(e) => Err(SqlError::InvalidValue(format!("JSON path eval: {e}"))),
1064    }
1065}
1066
1067fn jp_query_first_tz(
1068    j: &serde_json::Value,
1069    path_str: &str,
1070    vars: Option<&serde_json::Value>,
1071    silent: bool,
1072) -> Result<Option<serde_json::Value>> {
1073    let jp = sql_json_path::JsonPath::new(path_str)
1074        .map_err(|e| SqlError::InvalidValue(format!("invalid JSON path: {e}")))?;
1075    let result = match vars {
1076        Some(v) => jp.query_first_with_vars_tz(j, v),
1077        None => jp.query_first_tz(j),
1078    };
1079    match result {
1080        Ok(opt) => Ok(opt.map(|c| c.into_owned())),
1081        Err(e) if silent && e.can_silent() => Ok(None),
1082        Err(e) => Err(SqlError::InvalidValue(format!("JSON path eval: {e}"))),
1083    }
1084}
1085
1086fn jp_exists_tz(
1087    j: &serde_json::Value,
1088    path_str: &str,
1089    vars: Option<&serde_json::Value>,
1090    silent: bool,
1091) -> Result<Option<bool>> {
1092    let jp = sql_json_path::JsonPath::new(path_str)
1093        .map_err(|e| SqlError::InvalidValue(format!("invalid JSON path: {e}")))?;
1094    let result = match vars {
1095        Some(v) => jp.exists_with_vars_tz(j, v),
1096        None => jp.exists_tz(j),
1097    };
1098    match result {
1099        Ok(b) => Ok(Some(b)),
1100        Err(e) if silent && e.can_silent() => Ok(None),
1101        Err(e) => Err(SqlError::InvalidValue(format!("JSON path eval: {e}"))),
1102    }
1103}
1104
1105pub fn fn_jsonb_path_exists_tz(args: &[Value]) -> Result<Value> {
1106    if !(2..=4).contains(&args.len()) {
1107        return Err(SqlError::InvalidValue(
1108            "jsonb_path_exists_tz: expected 2..=4 arguments".into(),
1109        ));
1110    }
1111    let j = value_to_serde(&args[0])?;
1112    let path_str = coerce_path_arg(&args[1])?;
1113    let vars = args.get(2).map(coerce_vars_arg).transpose()?.flatten();
1114    let silent = args
1115        .get(3)
1116        .map(coerce_silent_arg)
1117        .transpose()?
1118        .unwrap_or(false);
1119    match jp_exists_tz(&j, &path_str, vars.as_ref(), silent)? {
1120        Some(b) => Ok(Value::Boolean(b)),
1121        None => Ok(Value::Null),
1122    }
1123}
1124
1125pub fn fn_jsonb_path_match_tz(args: &[Value]) -> Result<Value> {
1126    if !(2..=4).contains(&args.len()) {
1127        return Err(SqlError::InvalidValue(
1128            "jsonb_path_match_tz: expected 2..=4 arguments".into(),
1129        ));
1130    }
1131    let j = value_to_serde(&args[0])?;
1132    let path_str = coerce_path_arg(&args[1])?;
1133    let vars = args.get(2).map(coerce_vars_arg).transpose()?.flatten();
1134    let silent = args
1135        .get(3)
1136        .map(coerce_silent_arg)
1137        .transpose()?
1138        .unwrap_or(false);
1139    let nodes = jp_query_tz(&j, &path_str, vars.as_ref(), silent)?;
1140    if nodes.len() != 1 {
1141        if silent {
1142            return Ok(Value::Null);
1143        }
1144        return Err(SqlError::InvalidValue(
1145            "jsonb_path_match_tz: expected exactly one boolean result".into(),
1146        ));
1147    }
1148    match &nodes[0] {
1149        serde_json::Value::Bool(b) => Ok(Value::Boolean(*b)),
1150        _ if silent => Ok(Value::Null),
1151        _ => Err(SqlError::InvalidValue(
1152            "jsonb_path_match_tz: result is not a boolean".into(),
1153        )),
1154    }
1155}
1156
1157pub fn fn_jsonb_path_query_first_tz(args: &[Value]) -> Result<Value> {
1158    if !(2..=4).contains(&args.len()) {
1159        return Err(SqlError::InvalidValue(
1160            "jsonb_path_query_first_tz: expected 2..=4 arguments".into(),
1161        ));
1162    }
1163    let j = value_to_serde(&args[0])?;
1164    let path_str = coerce_path_arg(&args[1])?;
1165    let vars = args.get(2).map(coerce_vars_arg).transpose()?.flatten();
1166    let silent = args
1167        .get(3)
1168        .map(coerce_silent_arg)
1169        .transpose()?
1170        .unwrap_or(false);
1171    match jp_query_first_tz(&j, &path_str, vars.as_ref(), silent)? {
1172        Some(v) => serde_to_value(v, crate::types::DataType::Jsonb),
1173        None => Ok(Value::Null),
1174    }
1175}
1176
1177pub fn fn_jsonb_path_query_array_tz(args: &[Value]) -> Result<Value> {
1178    if !(2..=4).contains(&args.len()) {
1179        return Err(SqlError::InvalidValue(
1180            "jsonb_path_query_array_tz: expected 2..=4 arguments".into(),
1181        ));
1182    }
1183    let j = value_to_serde(&args[0])?;
1184    let path_str = coerce_path_arg(&args[1])?;
1185    let vars = args.get(2).map(coerce_vars_arg).transpose()?.flatten();
1186    let silent = args
1187        .get(3)
1188        .map(coerce_silent_arg)
1189        .transpose()?
1190        .unwrap_or(false);
1191    let nodes = jp_query_tz(&j, &path_str, vars.as_ref(), silent)?;
1192    serde_to_value(
1193        serde_json::Value::Array(nodes),
1194        crate::types::DataType::Jsonb,
1195    )
1196}
1197
1198pub fn fn_jsonb_path_query_tz(args: &[Value]) -> Result<Value> {
1199    fn_jsonb_path_query_first_tz(args)
1200}
1201
/// One step of a parsed JSON navigation path.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum PathSeg {
    /// Object member lookup by name.
    Key(String),
    /// Array element lookup; negative values count from the end of the array.
    Index(i64),
    /// `[*]`: every element of an array.
    Wildcard,
}
1208
1209pub fn parse_dollar_path(s: &str) -> Result<Vec<PathSeg>> {
1210    let s = s.trim();
1211    let s = s.strip_prefix('$').unwrap_or(s);
1212    let bytes = s.as_bytes();
1213    let mut out = Vec::new();
1214    let mut i = 0;
1215    while i < bytes.len() {
1216        match bytes[i] {
1217            b'.' => {
1218                i += 1;
1219                let start = i;
1220                while i < bytes.len() && bytes[i] != b'.' && bytes[i] != b'[' {
1221                    i += 1;
1222                }
1223                if i > start {
1224                    let key = std::str::from_utf8(&bytes[start..i])
1225                        .map_err(|_| SqlError::InvalidValue("invalid path segment".into()))?;
1226                    out.push(PathSeg::Key(key.to_string()));
1227                }
1228            }
1229            b'[' => {
1230                i += 1;
1231                let start = i;
1232                while i < bytes.len() && bytes[i] != b']' {
1233                    i += 1;
1234                }
1235                if i > bytes.len() {
1236                    return Err(SqlError::InvalidValue("unterminated index".into()));
1237                }
1238                let inner = std::str::from_utf8(&bytes[start..i])
1239                    .map_err(|_| SqlError::InvalidValue("invalid path index".into()))?;
1240                if inner.trim() == "*" {
1241                    out.push(PathSeg::Wildcard);
1242                } else if let Ok(idx) = inner.parse::<i64>() {
1243                    out.push(PathSeg::Index(idx));
1244                } else {
1245                    let key = inner.trim_matches('"').trim_matches('\'');
1246                    out.push(PathSeg::Key(key.to_string()));
1247                }
1248                i += 1;
1249            }
1250            _ => i += 1,
1251        }
1252    }
1253    Ok(out)
1254}
1255
1256fn path_to_segments(v: &Value) -> Result<Vec<PathSeg>> {
1257    match v {
1258        Value::Text(s) => {
1259            if s.starts_with('$') {
1260                parse_dollar_path(s)
1261            } else if s.starts_with('{') && s.ends_with('}') {
1262                parse_pg_array_path(s)
1263            } else {
1264                Ok(vec![PathSeg::Key(s.to_string())])
1265            }
1266        }
1267        Value::Integer(i) => Ok(vec![PathSeg::Index(*i)]),
1268        Value::Json(_) | Value::Jsonb(_) => {
1269            let parsed = value_to_serde(v)?;
1270            json_to_path(&parsed)
1271        }
1272        _ => Err(SqlError::TypeMismatch {
1273            expected: "TEXT or path array".into(),
1274            got: v.data_type().to_string(),
1275        }),
1276    }
1277}
1278
1279fn parse_pg_array_path(s: &str) -> Result<Vec<PathSeg>> {
1280    let inner = &s[1..s.len() - 1];
1281    if inner.is_empty() {
1282        return Ok(vec![]);
1283    }
1284    inner
1285        .split(',')
1286        .map(|raw| {
1287            let trimmed = raw.trim().trim_matches('"');
1288            if let Ok(idx) = trimmed.parse::<i64>() {
1289                PathSeg::Index(idx)
1290            } else {
1291                PathSeg::Key(trimmed.to_string())
1292            }
1293        })
1294        .map(Ok)
1295        .collect()
1296}
1297
1298fn json_to_path(j: &serde_json::Value) -> Result<Vec<PathSeg>> {
1299    let arr = j
1300        .as_array()
1301        .ok_or_else(|| SqlError::InvalidValue("path must be a JSON array".into()))?;
1302    arr.iter()
1303        .map(|item| match item {
1304            serde_json::Value::String(s) => Ok(PathSeg::Key(s.clone())),
1305            serde_json::Value::Number(n) => n
1306                .as_i64()
1307                .map(PathSeg::Index)
1308                .ok_or_else(|| SqlError::InvalidValue("path index out of range".into())),
1309            _ => Err(SqlError::InvalidValue(
1310                "path segments must be strings or integers".into(),
1311            )),
1312        })
1313        .collect()
1314}
1315
1316fn navigate_one(j: &serde_json::Value, key: &Value) -> Option<serde_json::Value> {
1317    match (j, key) {
1318        (serde_json::Value::Object(m), Value::Text(k)) => m.get(k.as_str()).cloned(),
1319        (serde_json::Value::Array(arr), Value::Integer(i)) => {
1320            let len = arr.len() as i64;
1321            let idx = if *i < 0 { len + i } else { *i };
1322            if (0..len).contains(&idx) {
1323                Some(arr[idx as usize].clone())
1324            } else {
1325                None
1326            }
1327        }
1328        _ => None,
1329    }
1330}
1331
1332fn navigate_path(j: &serde_json::Value, segments: &[PathSeg]) -> Option<serde_json::Value> {
1333    let mut cur = j.clone();
1334    for seg in segments {
1335        cur = match (&cur, seg) {
1336            (serde_json::Value::Object(m), PathSeg::Key(k)) => m.get(k.as_str())?.clone(),
1337            (serde_json::Value::Array(arr), PathSeg::Index(i)) => {
1338                let len = arr.len() as i64;
1339                let idx = if *i < 0 { len + i } else { *i };
1340                if (0..len).contains(&idx) {
1341                    arr[idx as usize].clone()
1342                } else {
1343                    return None;
1344                }
1345            }
1346            (serde_json::Value::Array(arr), PathSeg::Key(k)) => {
1347                let idx: i64 = k.parse().ok()?;
1348                let len = arr.len() as i64;
1349                let idx = if idx < 0 { len + idx } else { idx };
1350                if (0..len).contains(&idx) {
1351                    arr[idx as usize].clone()
1352                } else {
1353                    return None;
1354                }
1355            }
1356            _ => return None,
1357        };
1358    }
1359    Some(cur)
1360}
1361
1362fn delete_at_path(j: &mut serde_json::Value, segments: &[PathSeg]) {
1363    if segments.is_empty() {
1364        return;
1365    }
1366    let (last, prefix) = segments.split_last().unwrap();
1367    let target = navigate_mut(j, prefix);
1368    if let Some(t) = target {
1369        match (t, last) {
1370            (serde_json::Value::Object(m), PathSeg::Key(k)) => {
1371                m.remove(k.as_str());
1372            }
1373            (serde_json::Value::Array(arr), PathSeg::Index(i)) => {
1374                let len = arr.len() as i64;
1375                let idx = if *i < 0 { len + i } else { *i };
1376                if (0..len).contains(&idx) {
1377                    arr.remove(idx as usize);
1378                }
1379            }
1380            _ => {}
1381        }
1382    }
1383}
1384
1385fn navigate_mut<'a>(
1386    j: &'a mut serde_json::Value,
1387    segments: &[PathSeg],
1388) -> Option<&'a mut serde_json::Value> {
1389    let mut cur = j;
1390    for seg in segments {
1391        cur = match (cur, seg) {
1392            (serde_json::Value::Object(m), PathSeg::Key(k)) => m.get_mut(k.as_str())?,
1393            (serde_json::Value::Array(arr), PathSeg::Index(i)) => {
1394                let len = arr.len() as i64;
1395                let idx = if *i < 0 { len + i } else { *i };
1396                if (0..len).contains(&idx) {
1397                    arr.get_mut(idx as usize)?
1398                } else {
1399                    return None;
1400                }
1401            }
1402            _ => return None,
1403        };
1404    }
1405    Some(cur)
1406}
1407
1408fn json_contains(left: &serde_json::Value, right: &serde_json::Value) -> bool {
1409    match (left, right) {
1410        (serde_json::Value::Object(a), serde_json::Value::Object(b)) => b
1411            .iter()
1412            .all(|(k, v)| a.get(k).is_some_and(|av| json_contains(av, v))),
1413        (serde_json::Value::Array(a), serde_json::Value::Array(b)) => {
1414            b.iter().all(|bv| a.iter().any(|av| json_contains(av, bv)))
1415        }
1416        (serde_json::Value::Array(a), other) => a.iter().any(|av| json_contains(av, other)),
1417        (a, b) => a == b,
1418    }
1419}
1420
1421fn text_array(v: &Value) -> Result<Vec<String>> {
1422    match v {
1423        Value::Text(s) => Ok(vec![s.to_string()]),
1424        Value::Json(_) | Value::Jsonb(_) => {
1425            let j = value_to_serde(v)?;
1426            j.as_array()
1427                .ok_or_else(|| SqlError::InvalidValue("expected JSON text array".into()))?
1428                .iter()
1429                .map(|e| match e {
1430                    serde_json::Value::String(s) => Ok(s.clone()),
1431                    _ => Err(SqlError::InvalidValue(
1432                        "array elements must be strings".into(),
1433                    )),
1434                })
1435                .collect()
1436        }
1437        _ => Err(SqlError::TypeMismatch {
1438            expected: "TEXT array or JSON array".into(),
1439            got: v.data_type().to_string(),
1440        }),
1441    }
1442}
1443
1444pub fn agg_array(values: &[Value], target: crate::types::DataType) -> Result<Value> {
1445    let items: Result<Vec<serde_json::Value>> = values.iter().map(value_to_serde_lossy).collect();
1446    serde_to_value(serde_json::Value::Array(items?), target)
1447}
1448
1449pub fn materialize_json_table(
1450    source: &Value,
1451    spec: &crate::parser::JsonTableSpec,
1452) -> Result<(Vec<String>, Vec<Vec<Value>>)> {
1453    if source.is_null() {
1454        let names = json_table_column_names(&spec.columns);
1455        return Ok((names, vec![]));
1456    }
1457    let root = value_to_serde(source)?;
1458    let root_segs = parse_dollar_path(&spec.root_path)?;
1459    let matches = json_table_walk(&root, &root_segs);
1460    let mut rows: Vec<Vec<Value>> = Vec::new();
1461    let mut ordinality_counter = 0i64;
1462    for m in matches {
1463        ordinality_counter += 1;
1464        emit_json_table_rows(
1465            &m,
1466            &spec.columns,
1467            ordinality_counter,
1468            &mut Vec::new(),
1469            &mut rows,
1470        )?;
1471    }
1472    let names = json_table_column_names(&spec.columns);
1473    Ok((names, rows))
1474}
1475
1476fn json_table_column_names(columns: &[crate::parser::JsonTableCol]) -> Vec<String> {
1477    use crate::parser::JsonTableCol as C;
1478    let mut out = Vec::new();
1479    for c in columns {
1480        match c {
1481            C::Named { name, .. } | C::Ordinality { name } => out.push(name.clone()),
1482            C::Nested { columns, .. } => out.extend(json_table_column_names(columns)),
1483        }
1484    }
1485    out
1486}
1487
1488fn json_table_walk(j: &serde_json::Value, segs: &[PathSeg]) -> Vec<serde_json::Value> {
1489    let mut frontier = vec![j.clone()];
1490    for seg in segs {
1491        let mut next = Vec::new();
1492        for cur in frontier {
1493            match (cur, seg) {
1494                (serde_json::Value::Object(m), PathSeg::Key(k)) => {
1495                    if let Some(v) = m.get(k.as_str()) {
1496                        next.push(v.clone());
1497                    }
1498                }
1499                (serde_json::Value::Array(arr), PathSeg::Index(i)) => {
1500                    let len = arr.len() as i64;
1501                    let idx = if *i < 0 { len + i } else { *i };
1502                    if (0..len).contains(&idx) {
1503                        next.push(arr[idx as usize].clone());
1504                    }
1505                }
1506                (serde_json::Value::Array(arr), PathSeg::Wildcard) => {
1507                    next.extend(arr);
1508                }
1509                _ => {}
1510            }
1511        }
1512        frontier = next;
1513    }
1514    frontier
1515        .into_iter()
1516        .flat_map(|v| match v {
1517            serde_json::Value::Array(arr) if segs.last() == Some(&PathSeg::Wildcard) => arr,
1518            other => vec![other],
1519        })
1520        .collect()
1521}
1522
1523fn emit_json_table_rows(
1524    row_doc: &serde_json::Value,
1525    columns: &[crate::parser::JsonTableCol],
1526    parent_ordinality: i64,
1527    _prefix: &mut Vec<Value>,
1528    out: &mut Vec<Vec<Value>>,
1529) -> Result<()> {
1530    use crate::parser::JsonTableCol as C;
1531
1532    let mut scalars: Vec<(usize, Value)> = Vec::new();
1533    let mut nesteds: Vec<(usize, Vec<Vec<Value>>)> = Vec::new();
1534    let mut widths: Vec<usize> = Vec::with_capacity(columns.len());
1535
1536    for (idx, c) in columns.iter().enumerate() {
1537        match c {
1538            C::Named {
1539                ty, path, exists, ..
1540            } => {
1541                let segs = parse_dollar_path(path)?;
1542                let matches = json_table_walk(row_doc, &segs);
1543                let v = if *exists {
1544                    Value::Boolean(!matches.is_empty())
1545                } else if matches.is_empty() {
1546                    Value::Null
1547                } else {
1548                    json_table_coerce(&matches[0], *ty)?
1549                };
1550                scalars.push((idx, v));
1551                widths.push(1);
1552            }
1553            C::Ordinality { .. } => {
1554                scalars.push((idx, Value::Integer(parent_ordinality)));
1555                widths.push(1);
1556            }
1557            C::Nested { path, columns } => {
1558                let segs = parse_dollar_path(path)?;
1559                let matches = json_table_walk(row_doc, &segs);
1560                let inner_width = json_table_column_names(columns).len();
1561                let mut inner: Vec<Vec<Value>> = Vec::new();
1562                let mut ord = 0i64;
1563                for m in matches {
1564                    ord += 1;
1565                    let mut empty_prefix: Vec<Value> = Vec::new();
1566                    emit_json_table_rows(&m, columns, ord, &mut empty_prefix, &mut inner)?;
1567                }
1568                if inner.is_empty() {
1569                    inner.push(vec![Value::Null; inner_width]);
1570                }
1571                nesteds.push((idx, inner));
1572                widths.push(inner_width);
1573            }
1574        }
1575    }
1576
1577    let offsets: Vec<usize> = widths
1578        .iter()
1579        .scan(0usize, |acc, w| {
1580            let cur = *acc;
1581            *acc += w;
1582            Some(cur)
1583        })
1584        .collect();
1585    let total: usize = widths.iter().sum();
1586
1587    if nesteds.is_empty() {
1588        let mut row = vec![Value::Null; total];
1589        for (idx, v) in scalars {
1590            row[offsets[idx]] = v;
1591        }
1592        out.push(row);
1593        return Ok(());
1594    }
1595
1596    let mut indices = vec![0usize; nesteds.len()];
1597    loop {
1598        let mut row = vec![Value::Null; total];
1599        for (idx, v) in &scalars {
1600            row[offsets[*idx]] = v.clone();
1601        }
1602        for (ni, (col_idx, group)) in nesteds.iter().enumerate() {
1603            let off = offsets[*col_idx];
1604            let inner_row = &group[indices[ni]];
1605            for (k, v) in inner_row.iter().enumerate() {
1606                row[off + k] = v.clone();
1607            }
1608        }
1609        out.push(row);
1610
1611        let mut k = indices.len();
1612        let done = loop {
1613            if k == 0 {
1614                break true;
1615            }
1616            k -= 1;
1617            indices[k] += 1;
1618            if indices[k] < nesteds[k].1.len() {
1619                break false;
1620            }
1621            indices[k] = 0;
1622        };
1623        if done {
1624            return Ok(());
1625        }
1626    }
1627}
1628
1629fn json_table_coerce(v: &serde_json::Value, target: crate::types::DataType) -> Result<Value> {
1630    use crate::types::DataType;
1631    if matches!(v, serde_json::Value::Null) {
1632        return Ok(Value::Null);
1633    }
1634    match (v, target) {
1635        (_, DataType::Json) => serde_to_value(v.clone(), DataType::Json),
1636        (_, DataType::Jsonb) => serde_to_value(v.clone(), DataType::Jsonb),
1637        (serde_json::Value::Number(n), DataType::Integer) => n
1638            .as_i64()
1639            .map(Value::Integer)
1640            .ok_or_else(|| SqlError::InvalidValue("JSON_TABLE: number not i64".into())),
1641        (serde_json::Value::Number(n), DataType::Real) => n
1642            .as_f64()
1643            .map(Value::Real)
1644            .ok_or_else(|| SqlError::InvalidValue("JSON_TABLE: number not f64".into())),
1645        (serde_json::Value::Bool(b), DataType::Boolean) => Ok(Value::Boolean(*b)),
1646        (serde_json::Value::String(s), DataType::Text) => Ok(Value::Text(s.clone().into())),
1647        _ => {
1648            let text_form = match v {
1649                serde_json::Value::String(s) => s.clone(),
1650                _ => serde_json::to_string(v)
1651                    .map_err(|e| SqlError::InvalidValue(format!("JSON_TABLE render: {e}")))?,
1652            };
1653            let text_val = Value::Text(text_form.into());
1654            text_val.coerce_into(target).ok_or_else(|| {
1655                SqlError::InvalidValue(format!("JSON_TABLE: cannot coerce value to {target}"))
1656            })
1657        }
1658    }
1659}
1660
1661/// GIN entry layout (jsonb_ops): `0x01‖key` (key-exists, `?`),
1662/// `0x02‖key‖0x00‖value` (pair, `@>`), `0x03‖value` (array element).
1663pub fn extract_gin_entries(value: &Value, ops: crate::types::GinOpsClass) -> Result<Vec<Vec<u8>>> {
1664    use crate::types::GinOpsClass;
1665    if value.is_null() {
1666        return Ok(vec![]);
1667    }
1668    let j = value_to_serde(value)?;
1669    let mut out: Vec<Vec<u8>> = Vec::new();
1670    match ops {
1671        GinOpsClass::JsonbOps => extract_jsonb_ops_walk(&j, &mut out),
1672        GinOpsClass::JsonbPathOps => extract_path_ops_walk(&j, 0, &mut out),
1673    }
1674    out.sort();
1675    out.dedup();
1676    Ok(out)
1677}
1678
1679fn extract_jsonb_ops_walk(j: &serde_json::Value, out: &mut Vec<Vec<u8>>) {
1680    match j {
1681        serde_json::Value::Object(m) => {
1682            for (k, v) in m {
1683                let mut key_entry = Vec::with_capacity(k.len() + 1);
1684                key_entry.push(0x01);
1685                key_entry.extend_from_slice(k.as_bytes());
1686                out.push(key_entry);
1687                if let Some(s) = scalar_repr(v) {
1688                    let mut pair = Vec::with_capacity(k.len() + s.len() + 2);
1689                    pair.push(0x02);
1690                    pair.extend_from_slice(k.as_bytes());
1691                    pair.push(0x00);
1692                    pair.extend_from_slice(s.as_bytes());
1693                    out.push(pair);
1694                }
1695                extract_jsonb_ops_walk(v, out);
1696            }
1697        }
1698        serde_json::Value::Array(arr) => {
1699            for v in arr {
1700                if let Some(s) = scalar_repr(v) {
1701                    let mut entry = Vec::with_capacity(s.len() + 1);
1702                    entry.push(0x03);
1703                    entry.extend_from_slice(s.as_bytes());
1704                    out.push(entry);
1705                }
1706                extract_jsonb_ops_walk(v, out);
1707            }
1708        }
1709        _ => {}
1710    }
1711}
1712
1713fn extract_path_ops_walk(j: &serde_json::Value, path: u32, out: &mut Vec<Vec<u8>>) {
1714    match j {
1715        serde_json::Value::Object(m) => {
1716            for (k, v) in m {
1717                let next = path.rotate_left(1) ^ fx_hash_u32(k.as_bytes());
1718                extract_path_ops_walk(v, next, out);
1719            }
1720        }
1721        serde_json::Value::Array(arr) => {
1722            for v in arr {
1723                extract_path_ops_walk(v, path, out);
1724            }
1725        }
1726        _ => {
1727            let leaf = path.rotate_left(1) ^ hash_scalar_for_path(j);
1728            out.push(leaf.to_le_bytes().to_vec());
1729        }
1730    }
1731}
1732
1733fn fx_hash_u32(bytes: &[u8]) -> u32 {
1734    use std::hash::{Hash, Hasher};
1735    let mut h = rustc_hash::FxHasher::default();
1736    bytes.hash(&mut h);
1737    h.finish() as u32
1738}
1739
1740fn hash_scalar_for_path(v: &serde_json::Value) -> u32 {
1741    match v {
1742        serde_json::Value::Null => 0x0000_0001,
1743        serde_json::Value::Bool(true) => 0x0000_0002,
1744        serde_json::Value::Bool(false) => 0x0000_0004,
1745        serde_json::Value::String(s) => fx_hash_u32(s.as_bytes()),
1746        serde_json::Value::Number(n) => {
1747            if let Some(i) = n.as_i64() {
1748                return fx_hash_u32(&i.to_le_bytes());
1749            }
1750            if let Some(u) = n.as_u64() {
1751                return fx_hash_u32(&u.to_le_bytes());
1752            }
1753            let f = n.as_f64().unwrap_or(0.0);
1754            if f.is_finite() && f.fract() == 0.0 && f.abs() < i64::MAX as f64 {
1755                fx_hash_u32(&(f as i64).to_le_bytes())
1756            } else {
1757                fx_hash_u32(&f.to_bits().to_le_bytes())
1758            }
1759        }
1760        _ => 0,
1761    }
1762}
1763
1764fn scalar_repr(v: &serde_json::Value) -> Option<String> {
1765    match v {
1766        serde_json::Value::Null => Some("null".into()),
1767        serde_json::Value::Bool(true) => Some("true".into()),
1768        serde_json::Value::Bool(false) => Some("false".into()),
1769        serde_json::Value::String(s) => Some(s.clone()),
1770        serde_json::Value::Number(n) => Some(n.to_string()),
1771        _ => None,
1772    }
1773}
1774
1775pub fn agg_object(pairs: &[(Value, Value)], target: crate::types::DataType) -> Result<Value> {
1776    let mut map = serde_json::Map::new();
1777    for (k, v) in pairs {
1778        if k.is_null() {
1779            continue;
1780        }
1781        let key_str = match k {
1782            Value::Text(s) => s.to_string(),
1783            Value::Json(s) => s.to_string(),
1784            _ => format!("{k}"),
1785        };
1786        let val = value_to_serde_lossy(v)?;
1787        map.insert(key_str, val);
1788    }
1789    serde_to_value(serde_json::Value::Object(map), target)
1790}
1791
1792pub fn populate_record_row(
1793    obj: &serde_json::Map<String, serde_json::Value>,
1794    columns: &[crate::types::ColumnDef],
1795) -> Result<Vec<Value>> {
1796    columns
1797        .iter()
1798        .map(|col| match obj.get(&col.name) {
1799            None | Some(serde_json::Value::Null) => Ok(Value::Null),
1800            Some(v) => coerce_json_field(v, col.data_type),
1801        })
1802        .collect()
1803}
1804
1805fn coerce_json_field(j: &serde_json::Value, target: crate::types::DataType) -> Result<Value> {
1806    use crate::types::DataType;
1807    match target {
1808        DataType::Json | DataType::Jsonb => serde_to_value(j.clone(), target),
1809        _ => {
1810            let v = serde_to_scalar_value(j.clone());
1811            crate::eval::eval_cast(&v, target)
1812        }
1813    }
1814}
1815
1816pub fn dispatch_srf(name: &str, args: &[Value]) -> Result<(Vec<String>, Vec<Vec<Value>>)> {
1817    let upper = name.to_ascii_uppercase();
1818    match upper.as_str() {
1819        "JSONB_ARRAY_ELEMENTS" | "JSON_ARRAY_ELEMENTS" => {
1820            if args.len() != 1 {
1821                return Err(SqlError::InvalidValue(format!(
1822                    "{name} requires 1 argument"
1823                )));
1824            }
1825            if args[0].is_null() {
1826                return Ok((vec!["value".into()], vec![]));
1827            }
1828            let j = value_to_serde(&args[0])?;
1829            let arr = j
1830                .as_array()
1831                .ok_or_else(|| SqlError::InvalidValue(format!("{name} requires JSON array")))?;
1832            let target = if upper.starts_with("JSONB") {
1833                crate::types::DataType::Jsonb
1834            } else {
1835                crate::types::DataType::Json
1836            };
1837            let rows: Result<Vec<Vec<Value>>> = arr
1838                .iter()
1839                .map(|v| serde_to_value(v.clone(), target).map(|val| vec![val]))
1840                .collect();
1841            Ok((vec!["value".into()], rows?))
1842        }
1843        "JSONB_ARRAY_ELEMENTS_TEXT" | "JSON_ARRAY_ELEMENTS_TEXT" => {
1844            if args.len() != 1 {
1845                return Err(SqlError::InvalidValue(format!(
1846                    "{name} requires 1 argument"
1847                )));
1848            }
1849            if args[0].is_null() {
1850                return Ok((vec!["value".into()], vec![]));
1851            }
1852            let j = value_to_serde(&args[0])?;
1853            let arr = j
1854                .as_array()
1855                .ok_or_else(|| SqlError::InvalidValue(format!("{name} requires JSON array")))?;
1856            let rows: Vec<Vec<Value>> = arr
1857                .iter()
1858                .map(|v| {
1859                    let text = match v {
1860                        serde_json::Value::String(s) => s.clone(),
1861                        _ => serde_json::to_string(v).unwrap_or_default(),
1862                    };
1863                    vec![Value::Text(text.into())]
1864                })
1865                .collect();
1866            Ok((vec!["value".into()], rows))
1867        }
1868        "JSONB_EACH" | "JSON_EACH" => {
1869            if args.len() != 1 {
1870                return Err(SqlError::InvalidValue(format!(
1871                    "{name} requires 1 argument"
1872                )));
1873            }
1874            if args[0].is_null() {
1875                return Ok((vec!["key".into(), "value".into()], vec![]));
1876            }
1877            let j = value_to_serde(&args[0])?;
1878            let obj = j
1879                .as_object()
1880                .ok_or_else(|| SqlError::InvalidValue(format!("{name} requires JSON object")))?;
1881            let target = if upper.starts_with("JSONB") {
1882                crate::types::DataType::Jsonb
1883            } else {
1884                crate::types::DataType::Json
1885            };
1886            let rows: Result<Vec<Vec<Value>>> = obj
1887                .iter()
1888                .map(|(k, v)| {
1889                    Ok(vec![
1890                        Value::Text(k.clone().into()),
1891                        serde_to_value(v.clone(), target)?,
1892                    ])
1893                })
1894                .collect();
1895            Ok((vec!["key".into(), "value".into()], rows?))
1896        }
1897        "JSONB_EACH_TEXT" | "JSON_EACH_TEXT" => {
1898            if args.len() != 1 {
1899                return Err(SqlError::InvalidValue(format!(
1900                    "{name} requires 1 argument"
1901                )));
1902            }
1903            if args[0].is_null() {
1904                return Ok((vec!["key".into(), "value".into()], vec![]));
1905            }
1906            let j = value_to_serde(&args[0])?;
1907            let obj = j
1908                .as_object()
1909                .ok_or_else(|| SqlError::InvalidValue(format!("{name} requires JSON object")))?;
1910            let rows: Vec<Vec<Value>> = obj
1911                .iter()
1912                .map(|(k, v)| {
1913                    let text = match v {
1914                        serde_json::Value::String(s) => s.clone(),
1915                        _ => serde_json::to_string(v).unwrap_or_default(),
1916                    };
1917                    vec![Value::Text(k.clone().into()), Value::Text(text.into())]
1918                })
1919                .collect();
1920            Ok((vec!["key".into(), "value".into()], rows))
1921        }
1922        "JSONB_OBJECT_KEYS" | "JSON_OBJECT_KEYS" => {
1923            if args.len() != 1 {
1924                return Err(SqlError::InvalidValue(format!(
1925                    "{name} requires 1 argument"
1926                )));
1927            }
1928            if args[0].is_null() {
1929                return Ok((vec!["key".into()], vec![]));
1930            }
1931            let j = value_to_serde(&args[0])?;
1932            let obj = j
1933                .as_object()
1934                .ok_or_else(|| SqlError::InvalidValue(format!("{name} requires JSON object")))?;
1935            let rows: Vec<Vec<Value>> = obj
1936                .keys()
1937                .map(|k| vec![Value::Text(k.clone().into())])
1938                .collect();
1939            Ok((vec!["key".into()], rows))
1940        }
1941        _ => Err(SqlError::Unsupported(format!(
1942            "set-returning function: {name}"
1943        ))),
1944    }
1945}
1946
/// Returns `true` when `name` (compared ASCII case-insensitively) is one of the JSON
/// set-returning function names known to this module.
pub fn is_srf_name(name: &str) -> bool {
    const SRF_NAMES: [&str; 12] = [
        "JSONB_ARRAY_ELEMENTS",
        "JSON_ARRAY_ELEMENTS",
        "JSONB_ARRAY_ELEMENTS_TEXT",
        "JSON_ARRAY_ELEMENTS_TEXT",
        "JSONB_EACH",
        "JSON_EACH",
        "JSONB_EACH_TEXT",
        "JSON_EACH_TEXT",
        "JSONB_OBJECT_KEYS",
        "JSON_OBJECT_KEYS",
        "JSONB_POPULATE_RECORD",
        "JSONB_POPULATE_RECORDSET",
    ];
    SRF_NAMES
        .iter()
        .any(|known| name.eq_ignore_ascii_case(known))
}
1964
1965pub fn extract_to_value(target: crate::types::DataType, j: serde_json::Value) -> Result<Value> {
1966    serde_to_value(j, target)
1967}
1968
1969pub fn to_scalar(j: serde_json::Value) -> Value {
1970    serde_to_scalar_value(j)
1971}
1972
1973pub fn fn_typeof(v: &Value) -> Result<Value> {
1974    if let Value::Jsonb(b) = v {
1975        let (ty, _, _) = read_header(b)?;
1976        let s = match ty {
1977            JsonbType::Null => "null",
1978            JsonbType::True | JsonbType::False => "boolean",
1979            JsonbType::Integer | JsonbType::Real => "number",
1980            JsonbType::String => "string",
1981            JsonbType::Array => "array",
1982            JsonbType::Object => "object",
1983        };
1984        return Ok(Value::Text(s.into()));
1985    }
1986    let j = value_to_serde(v)?;
1987    let s = match j {
1988        serde_json::Value::Null => "null",
1989        serde_json::Value::Bool(_) => "boolean",
1990        serde_json::Value::Number(_) => "number",
1991        serde_json::Value::String(_) => "string",
1992        serde_json::Value::Array(_) => "array",
1993        serde_json::Value::Object(_) => "object",
1994    };
1995    Ok(Value::Text(s.into()))
1996}
1997
1998pub fn fn_array_length(v: &Value) -> Result<Value> {
1999    if let Value::Jsonb(b) = v {
2000        return match array_len_bytes(b)? {
2001            Some(n) => Ok(Value::Integer(n as i64)),
2002            None => Err(SqlError::InvalidValue(
2003                "jsonb_array_length called on non-array".into(),
2004            )),
2005        };
2006    }
2007    let j = value_to_serde(v)?;
2008    match j {
2009        serde_json::Value::Array(arr) => Ok(Value::Integer(arr.len() as i64)),
2010        _ => Err(SqlError::InvalidValue(
2011            "jsonb_array_length called on non-array".into(),
2012        )),
2013    }
2014}
2015
2016pub fn fn_object_length(v: &Value) -> Result<Value> {
2017    if let Value::Jsonb(b) = v {
2018        return match object_len_bytes(b)? {
2019            Some(n) => Ok(Value::Integer(n as i64)),
2020            None => Err(SqlError::InvalidValue(
2021                "jsonb_object_length called on non-object".into(),
2022            )),
2023        };
2024    }
2025    let j = value_to_serde(v)?;
2026    match j {
2027        serde_json::Value::Object(m) => Ok(Value::Integer(m.len() as i64)),
2028        _ => Err(SqlError::InvalidValue(
2029            "jsonb_object_length called on non-object".into(),
2030        )),
2031    }
2032}
2033
2034pub fn fn_extract_path(
2035    args: &[Value],
2036    target: crate::types::DataType,
2037    as_text: bool,
2038) -> Result<Value> {
2039    let mut j = value_to_serde(&args[0])?;
2040    for key_val in &args[1..] {
2041        if key_val.is_null() {
2042            return Ok(Value::Null);
2043        }
2044        let key = match key_val {
2045            Value::Text(s) => s.to_string(),
2046            other => other.to_string(),
2047        };
2048        match &mut j {
2049            serde_json::Value::Object(m) => {
2050                if let Some(next) = m.remove(&key) {
2051                    j = next;
2052                } else {
2053                    return Ok(Value::Null);
2054                }
2055            }
2056            serde_json::Value::Array(arr) => {
2057                let idx: i64 = key.parse().map_err(|_| {
2058                    SqlError::InvalidValue(format!("array path key not integer: {key}"))
2059                })?;
2060                let len = arr.len() as i64;
2061                let idx = if idx < 0 { len + idx } else { idx };
2062                if (0..len).contains(&idx) {
2063                    j = arr.remove(idx as usize);
2064                } else {
2065                    return Ok(Value::Null);
2066                }
2067            }
2068            _ => return Ok(Value::Null),
2069        }
2070    }
2071    if as_text {
2072        match j {
2073            serde_json::Value::Null => Ok(Value::Null),
2074            serde_json::Value::String(s) => Ok(Value::Text(s.into())),
2075            other => Ok(Value::Text(
2076                serde_json::to_string(&other)
2077                    .map_err(|e| SqlError::InvalidValue(format!("JSON render: {e}")))?
2078                    .into(),
2079            )),
2080        }
2081    } else {
2082        serde_to_value(j, target)
2083    }
2084}
2085
2086pub fn fn_sqlite_extract(j_val: &Value, path: &Value) -> Result<Value> {
2087    let path_str = match path {
2088        Value::Text(s) => s.to_string(),
2089        _ => {
2090            return Err(SqlError::TypeMismatch {
2091                expected: "TEXT path".into(),
2092                got: path.data_type().to_string(),
2093            })
2094        }
2095    };
2096    let j = value_to_serde(j_val)?;
2097    let segments = parse_dollar_path(&path_str)?;
2098    match navigate_path(&j, &segments) {
2099        Some(serde_json::Value::Null) => Ok(Value::Null),
2100        Some(serde_json::Value::String(s)) => Ok(Value::Text(s.into())),
2101        Some(other) => Ok(Value::Text(
2102            serde_json::to_string(&other)
2103                .map_err(|e| SqlError::InvalidValue(format!("JSON render: {e}")))?
2104                .into(),
2105        )),
2106        None => Ok(Value::Null),
2107    }
2108}
2109
2110pub fn fn_valid(v: &Value) -> Result<Value> {
2111    let s = match v {
2112        Value::Text(s) => s.as_str(),
2113        Value::Json(s) => s.as_str(),
2114        _ => return Ok(Value::Boolean(false)),
2115    };
2116    Ok(Value::Boolean(
2117        serde_json::from_str::<serde_json::Value>(s).is_ok(),
2118    ))
2119}
2120
2121pub fn fn_strip_nulls(v: &Value, target: crate::types::DataType) -> Result<Value> {
2122    let mut j = value_to_serde(v)?;
2123    strip_nulls_inplace(&mut j);
2124    serde_to_value(j, target)
2125}
2126
2127fn strip_nulls_inplace(j: &mut serde_json::Value) {
2128    match j {
2129        serde_json::Value::Object(m) => {
2130            m.retain(|_, v| !matches!(v, serde_json::Value::Null));
2131            for v in m.values_mut() {
2132                strip_nulls_inplace(v);
2133            }
2134        }
2135        serde_json::Value::Array(arr) => {
2136            for v in arr.iter_mut() {
2137                strip_nulls_inplace(v);
2138            }
2139        }
2140        _ => {}
2141    }
2142}
2143
2144pub fn fn_pretty(v: &Value) -> Result<Value> {
2145    let j = value_to_serde(v)?;
2146    let s = serde_json::to_string_pretty(&j)
2147        .map_err(|e| SqlError::InvalidValue(format!("JSON pretty render: {e}")))?;
2148    Ok(Value::Text(s.into()))
2149}
2150
2151pub fn fn_build_object(args: &[Value], target: crate::types::DataType) -> Result<Value> {
2152    if args.len() % 2 != 0 {
2153        return Err(SqlError::InvalidValue(
2154            "jsonb_build_object requires an even number of arguments".into(),
2155        ));
2156    }
2157    let mut map = serde_json::Map::new();
2158    for pair in args.chunks(2) {
2159        let key = match &pair[0] {
2160            Value::Null => continue,
2161            Value::Text(s) => s.to_string(),
2162            other => other.to_string(),
2163        };
2164        let val = value_to_serde_lossy(&pair[1])?;
2165        map.insert(key, val);
2166    }
2167    serde_to_value(serde_json::Value::Object(map), target)
2168}
2169
2170pub fn fn_build_array(args: &[Value], target: crate::types::DataType) -> Result<Value> {
2171    let items: Result<Vec<serde_json::Value>> = args.iter().map(value_to_serde_lossy).collect();
2172    serde_to_value(serde_json::Value::Array(items?), target)
2173}
2174
2175pub fn fn_set(
2176    j: &Value,
2177    path: &Value,
2178    new_value: &Value,
2179    create_missing: bool,
2180    target: crate::types::DataType,
2181) -> Result<Value> {
2182    let mut root = value_to_serde(j)?;
2183    let segments = path_to_segments(path)?;
2184    let new_serde = value_to_serde_lossy(new_value)?;
2185    if !set_at_path(&mut root, &segments, new_serde, create_missing, false) {
2186        return serde_to_value(root, target);
2187    }
2188    serde_to_value(root, target)
2189}
2190
2191pub fn fn_insert(
2192    j: &Value,
2193    path: &Value,
2194    new_value: &Value,
2195    insert_after: bool,
2196    target: crate::types::DataType,
2197) -> Result<Value> {
2198    let mut root = value_to_serde(j)?;
2199    let segments = path_to_segments(path)?;
2200    let new_serde = value_to_serde_lossy(new_value)?;
2201    set_at_path(&mut root, &segments, new_serde, true, insert_after);
2202    serde_to_value(root, target)
2203}
2204
2205fn set_at_path(
2206    root: &mut serde_json::Value,
2207    segments: &[PathSeg],
2208    new_value: serde_json::Value,
2209    create_missing: bool,
2210    insert_array: bool,
2211) -> bool {
2212    if segments.is_empty() {
2213        return false;
2214    }
2215    let (last, prefix) = segments.split_last().unwrap();
2216    let Some(target) = navigate_mut(root, prefix) else {
2217        return false;
2218    };
2219    match (target, last) {
2220        (serde_json::Value::Object(m), PathSeg::Key(k)) => {
2221            let exists = m.contains_key(k.as_str());
2222            if exists || create_missing {
2223                m.insert(k.clone(), new_value);
2224                true
2225            } else {
2226                false
2227            }
2228        }
2229        (serde_json::Value::Array(arr), PathSeg::Index(i)) => {
2230            let len = arr.len() as i64;
2231            let idx = if *i < 0 { len + i } else { *i };
2232            if insert_array {
2233                let target_pos = if idx <= 0 {
2234                    0
2235                } else if idx >= len {
2236                    arr.len()
2237                } else {
2238                    idx as usize
2239                };
2240                arr.insert(target_pos, new_value);
2241                true
2242            } else if (0..len).contains(&idx) {
2243                arr[idx as usize] = new_value;
2244                true
2245            } else if create_missing {
2246                if idx < 0 {
2247                    arr.insert(0, new_value);
2248                } else {
2249                    arr.push(new_value);
2250                }
2251                true
2252            } else {
2253                false
2254            }
2255        }
2256        _ => false,
2257    }
2258}
2259
2260pub fn fn_to_json(v: &Value, target: crate::types::DataType) -> Result<Value> {
2261    let j = value_to_serde_lossy(v)?;
2262    serde_to_value(j, target)
2263}
2264
2265pub fn fn_json_object(args: &[Value]) -> Result<Value> {
2266    match args.len() {
2267        1 => {
2268            let j = value_to_serde(&args[0])?;
2269            let arr = j
2270                .as_array()
2271                .ok_or_else(|| SqlError::InvalidValue("json_object expects text array".into()))?;
2272            let mut map = serde_json::Map::new();
2273            let mut i = 0;
2274            while i + 1 < arr.len() {
2275                let key = arr[i]
2276                    .as_str()
2277                    .ok_or_else(|| SqlError::InvalidValue("json_object key must be string".into()))?
2278                    .to_string();
2279                let val = arr[i + 1].clone();
2280                map.insert(key, val);
2281                i += 2;
2282            }
2283            serde_to_value(serde_json::Value::Object(map), crate::types::DataType::Json)
2284        }
2285        2 => {
2286            let keys = text_array(&args[0])?;
2287            let vals = text_array(&args[1])?;
2288            if keys.len() != vals.len() {
2289                return Err(SqlError::InvalidValue(
2290                    "json_object: keys and values must be same length".into(),
2291                ));
2292            }
2293            let mut map = serde_json::Map::new();
2294            for (k, v) in keys.into_iter().zip(vals) {
2295                map.insert(k, serde_json::Value::String(v));
2296            }
2297            serde_to_value(serde_json::Value::Object(map), crate::types::DataType::Json)
2298        }
2299        _ => Err(SqlError::InvalidValue(
2300            "json_object requires 1 or 2 arguments".into(),
2301        )),
2302    }
2303}
2304
2305fn value_to_serde_lossy(v: &Value) -> Result<serde_json::Value> {
2306    match v {
2307        Value::Null => Ok(serde_json::Value::Null),
2308        Value::Boolean(b) => Ok(serde_json::Value::Bool(*b)),
2309        Value::Integer(i) => Ok(serde_json::Value::Number((*i).into())),
2310        Value::Real(r) => serde_json::Number::from_f64(*r)
2311            .map(serde_json::Value::Number)
2312            .ok_or_else(|| SqlError::InvalidValue("non-finite number".into())),
2313        Value::Text(s) => Ok(serde_json::Value::String(s.to_string())),
2314        Value::Json(s) => serde_json::from_str(s)
2315            .map_err(|e| SqlError::InvalidValue(format!("invalid JSON: {e}"))),
2316        Value::Jsonb(b) => decode_to_serde(b),
2317        Value::Blob(b) => {
2318            let hex: String = b.iter().map(|byte| format!("{byte:02x}")).collect();
2319            Ok(serde_json::Value::String(hex))
2320        }
2321        Value::Date(_) | Value::Time(_) | Value::Timestamp(_) | Value::Interval { .. } => {
2322            Ok(serde_json::Value::String(format!("{v}")))
2323        }
2324        Value::TsVector(_) | Value::TsQuery(_) => Ok(serde_json::Value::String(format!("{v}"))),
2325        Value::Array(a) => {
2326            let mut out = Vec::with_capacity(a.len());
2327            for elem in a.iter() {
2328                out.push(value_to_serde_lossy(elem)?);
2329            }
2330            Ok(serde_json::Value::Array(out))
2331        }
2332    }
2333}
2334
2335#[cfg(test)]
2336#[path = "json_tests.rs"]
2337mod tests;