1use std::sync::Arc;
2
3use crate::error::{Result, SqlError};
4use crate::types::Value;
5
/// Type tag of an encoded JSONB value.
///
/// The discriminant is what gets shifted into the upper four bits of a header
/// byte, so the numeric values are part of the binary format.
#[repr(u8)]
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum JsonbType {
    Null = 0,
    True = 1,
    False = 2,
    Integer = 3,
    Real = 4,
    String = 5,
    Array = 6,
    Object = 7,
}

impl JsonbType {
    /// Maps a 4-bit tag back to its variant; any tag above 7 yields `None`.
    fn from_nibble(n: u8) -> Option<Self> {
        // Indexed by discriminant: slot `i` holds the variant whose tag is `i`.
        const BY_TAG: [JsonbType; 8] = [
            JsonbType::Null,
            JsonbType::True,
            JsonbType::False,
            JsonbType::Integer,
            JsonbType::Real,
            JsonbType::String,
            JsonbType::Array,
            JsonbType::Object,
        ];
        BY_TAG.get(usize::from(n)).copied()
    }
}
34
// Size-class nibbles 12..=15 mean "the payload length follows the header byte
// as a little-endian integer of this width"; nibbles 0..=11 are the length itself.
const SIZE_CLASS_U8: u8 = 0x0C;
const SIZE_CLASS_U16: u8 = 0x0D;
const SIZE_CLASS_U32: u8 = 0x0E;
const SIZE_CLASS_U64: u8 = 0x0F;
39
40pub fn validate_text(s: &str) -> Result<()> {
41 serde_json::from_str::<serde_json::Value>(s)
42 .map(|_| ())
43 .map_err(|e| SqlError::InvalidValue(format!("invalid JSON: {e}")))
44}
45
46pub fn text_to_jsonb(s: &str) -> Result<Value> {
47 let v: serde_json::Value = serde_json::from_str(s)
48 .map_err(|e| SqlError::InvalidValue(format!("invalid JSON: {e}")))?;
49 reject_null_bytes(&v)?;
50 let mut buf = Vec::with_capacity(s.len());
51 encode_canonical(&v, &mut buf)?;
52 Ok(Value::Jsonb(Arc::from(buf)))
53}
54
55fn reject_null_bytes(v: &serde_json::Value) -> Result<()> {
56 match v {
57 serde_json::Value::String(s) if s.contains('\0') => Err(SqlError::InvalidValue(
58 "unsupported Unicode escape sequence \\u0000".into(),
59 )),
60 serde_json::Value::Array(items) => items.iter().try_for_each(reject_null_bytes),
61 serde_json::Value::Object(map) => map.iter().try_for_each(|(k, v)| {
62 if k.contains('\0') {
63 return Err(SqlError::InvalidValue(
64 "unsupported Unicode escape sequence \\u0000".into(),
65 ));
66 }
67 reject_null_bytes(v)
68 }),
69 _ => Ok(()),
70 }
71}
72
73pub fn decode_to_text(bytes: &[u8]) -> Result<String> {
74 let v = decode_to_serde(bytes)?;
75 serde_json::to_string(&v).map_err(|e| SqlError::InvalidValue(format!("JSONB render: {e}")))
76}
77
78pub fn decode_to_serde(bytes: &[u8]) -> Result<serde_json::Value> {
79 let mut pos = 0;
80 let v = decode_value(bytes, &mut pos)?;
81 if pos != bytes.len() {
82 return Err(SqlError::InvalidValue("trailing bytes in JSONB".into()));
83 }
84 Ok(v)
85}
86
87pub fn encode_canonical(v: &serde_json::Value, out: &mut Vec<u8>) -> Result<()> {
88 match v {
89 serde_json::Value::Null => out.push(header_byte(JsonbType::Null, 0)),
90 serde_json::Value::Bool(true) => out.push(header_byte(JsonbType::True, 0)),
91 serde_json::Value::Bool(false) => out.push(header_byte(JsonbType::False, 0)),
92 serde_json::Value::Number(n) => {
93 if let Some(i) = n.as_i64() {
94 out.push(header_byte(JsonbType::Integer, 0));
95 out.extend_from_slice(&i.to_le_bytes());
96 } else if let Some(f) = n.as_f64() {
97 if f.is_finite() {
98 out.push(header_byte(JsonbType::Real, 0));
99 out.extend_from_slice(&f.to_le_bytes());
100 } else {
101 return Err(SqlError::InvalidValue("non-finite number in JSON".into()));
102 }
103 } else {
104 return Err(SqlError::InvalidValue(format!("unsupported number: {n}")));
105 }
106 }
107 serde_json::Value::String(s) => encode_string(s, out),
108 serde_json::Value::Array(items) => {
109 let mut payload = Vec::new();
110 for item in items {
111 encode_canonical(item, &mut payload)?;
112 }
113 write_header_with_len(JsonbType::Array, payload.len(), out);
114 out.extend_from_slice(&payload);
115 }
116 serde_json::Value::Object(map) => {
117 let mut keys: Vec<&String> = map.keys().collect();
118 keys.sort();
119 let mut payload = Vec::new();
120 for k in keys {
121 encode_string(k, &mut payload);
122 encode_canonical(&map[k], &mut payload)?;
123 }
124 write_header_with_len(JsonbType::Object, payload.len(), out);
125 out.extend_from_slice(&payload);
126 }
127 }
128 Ok(())
129}
130
131fn encode_string(s: &str, out: &mut Vec<u8>) {
132 let bytes = s.as_bytes();
133 write_header_with_len(JsonbType::String, bytes.len(), out);
134 out.extend_from_slice(bytes);
135}
136
137fn header_byte(ty: JsonbType, size_class: u8) -> u8 {
138 debug_assert!(size_class <= 15);
139 (ty as u8) << 4 | (size_class & 0x0F)
140}
141
142fn write_header_with_len(ty: JsonbType, len: usize, out: &mut Vec<u8>) {
143 if len <= 11 {
144 out.push(header_byte(ty, len as u8));
145 } else if len <= u8::MAX as usize {
146 out.push(header_byte(ty, SIZE_CLASS_U8));
147 out.push(len as u8);
148 } else if len <= u16::MAX as usize {
149 out.push(header_byte(ty, SIZE_CLASS_U16));
150 out.extend_from_slice(&(len as u16).to_le_bytes());
151 } else if len <= u32::MAX as usize {
152 out.push(header_byte(ty, SIZE_CLASS_U32));
153 out.extend_from_slice(&(len as u32).to_le_bytes());
154 } else {
155 out.push(header_byte(ty, SIZE_CLASS_U64));
156 out.extend_from_slice(&(len as u64).to_le_bytes());
157 }
158}
159
160pub fn read_header(bytes: &[u8]) -> Result<(JsonbType, usize, usize)> {
161 if bytes.is_empty() {
162 return Err(SqlError::InvalidValue("empty JSONB".into()));
163 }
164 let h = bytes[0];
165 let ty = JsonbType::from_nibble(h >> 4)
166 .ok_or_else(|| SqlError::InvalidValue("invalid JSONB type tag".into()))?;
167 let size_class = h & 0x0F;
168 let (payload_start, payload_len) = match size_class {
169 0..=11 => (1, size_class as usize),
170 SIZE_CLASS_U8 => {
171 if bytes.len() < 2 {
172 return Err(SqlError::InvalidValue("truncated JSONB header".into()));
173 }
174 (2, bytes[1] as usize)
175 }
176 SIZE_CLASS_U16 => {
177 if bytes.len() < 3 {
178 return Err(SqlError::InvalidValue("truncated JSONB header".into()));
179 }
180 (3, u16::from_le_bytes([bytes[1], bytes[2]]) as usize)
181 }
182 SIZE_CLASS_U32 => {
183 if bytes.len() < 5 {
184 return Err(SqlError::InvalidValue("truncated JSONB header".into()));
185 }
186 (
187 5,
188 u32::from_le_bytes([bytes[1], bytes[2], bytes[3], bytes[4]]) as usize,
189 )
190 }
191 SIZE_CLASS_U64 => {
192 if bytes.len() < 9 {
193 return Err(SqlError::InvalidValue("truncated JSONB header".into()));
194 }
195 let arr: [u8; 8] = bytes[1..9].try_into().unwrap();
196 (9, u64::from_le_bytes(arr) as usize)
197 }
198 _ => unreachable!(),
199 };
200 let fixed_payload = match ty {
201 JsonbType::Null | JsonbType::True | JsonbType::False => Some(0),
202 JsonbType::Integer | JsonbType::Real => Some(8),
203 _ => None,
204 };
205 let payload_len = fixed_payload.unwrap_or(payload_len);
206 if payload_start + payload_len > bytes.len() {
207 return Err(SqlError::InvalidValue("JSONB payload truncated".into()));
208 }
209 Ok((ty, payload_start, payload_len))
210}
211
212pub fn skip_value(bytes: &[u8]) -> Result<usize> {
213 let (_ty, payload_start, payload_len) = read_header(bytes)?;
214 Ok(payload_start + payload_len)
215}
216
217pub fn find_object_key<'a>(bytes: &'a [u8], key: &str) -> Result<Option<&'a [u8]>> {
218 let (ty, payload_start, payload_len) = read_header(bytes)?;
219 if ty != JsonbType::Object {
220 return Ok(None);
221 }
222 let payload = &bytes[payload_start..payload_start + payload_len];
223 let key_bytes = key.as_bytes();
224 let mut pos = 0usize;
225 while pos < payload.len() {
226 let (kty, kp_start, kp_len) = read_header(&payload[pos..])?;
227 if kty != JsonbType::String {
228 return Err(SqlError::InvalidValue("JSONB object key not string".into()));
229 }
230 let k_total = kp_start + kp_len;
231 let k_slice = &payload[pos + kp_start..pos + k_total];
232 let value_start = pos + k_total;
233 let value_total = skip_value(&payload[value_start..])?;
234 if k_slice == key_bytes {
235 return Ok(Some(&payload[value_start..value_start + value_total]));
236 }
237 pos = value_start + value_total;
238 }
239 Ok(None)
240}
241
242pub fn array_get(bytes: &[u8], idx: i64) -> Result<Option<&[u8]>> {
243 let (ty, payload_start, payload_len) = read_header(bytes)?;
244 if ty != JsonbType::Array {
245 return Ok(None);
246 }
247 let payload = &bytes[payload_start..payload_start + payload_len];
248 if idx < 0 {
249 let mut elems = Vec::new();
250 let mut pos = 0usize;
251 while pos < payload.len() {
252 let total = skip_value(&payload[pos..])?;
253 elems.push((pos, total));
254 pos += total;
255 }
256 let len = elems.len() as i64;
257 let real = len + idx;
258 if real < 0 {
259 return Ok(None);
260 }
261 let (start, total) = elems[real as usize];
262 return Ok(Some(&payload[start..start + total]));
263 }
264 let mut pos = 0usize;
265 let mut remaining = idx;
266 while pos < payload.len() {
267 let total = skip_value(&payload[pos..])?;
268 if remaining == 0 {
269 return Ok(Some(&payload[pos..pos + total]));
270 }
271 remaining -= 1;
272 pos += total;
273 }
274 Ok(None)
275}
276
277pub fn array_len_bytes(bytes: &[u8]) -> Result<Option<usize>> {
278 let (ty, payload_start, payload_len) = read_header(bytes)?;
279 if ty != JsonbType::Array {
280 return Ok(None);
281 }
282 let payload = &bytes[payload_start..payload_start + payload_len];
283 let mut pos = 0usize;
284 let mut count = 0usize;
285 while pos < payload.len() {
286 pos += skip_value(&payload[pos..])?;
287 count += 1;
288 }
289 Ok(Some(count))
290}
291
292pub fn object_len_bytes(bytes: &[u8]) -> Result<Option<usize>> {
293 let (ty, payload_start, payload_len) = read_header(bytes)?;
294 if ty != JsonbType::Object {
295 return Ok(None);
296 }
297 let payload = &bytes[payload_start..payload_start + payload_len];
298 let mut pos = 0usize;
299 let mut count = 0usize;
300 while pos < payload.len() {
301 pos += skip_value(&payload[pos..])?;
302 pos += skip_value(&payload[pos..])?;
303 count += 1;
304 }
305 Ok(Some(count))
306}
307
308pub fn read_scalar_text(bytes: &[u8]) -> Result<Option<String>> {
309 let (ty, payload_start, payload_len) = read_header(bytes)?;
310 let payload = &bytes[payload_start..payload_start + payload_len];
311 match ty {
312 JsonbType::Null => Ok(None),
313 JsonbType::True => Ok(Some("true".into())),
314 JsonbType::False => Ok(Some("false".into())),
315 JsonbType::Integer => {
316 let arr: [u8; 8] = payload
317 .try_into()
318 .map_err(|_| SqlError::InvalidValue("JSONB integer payload size".into()))?;
319 Ok(Some(i64::from_le_bytes(arr).to_string()))
320 }
321 JsonbType::Real => {
322 let arr: [u8; 8] = payload
323 .try_into()
324 .map_err(|_| SqlError::InvalidValue("JSONB real payload size".into()))?;
325 let f = f64::from_le_bytes(arr);
326 let n = serde_json::Number::from_f64(f)
327 .ok_or_else(|| SqlError::InvalidValue("non-finite JSONB number".into()))?;
328 Ok(Some(n.to_string()))
329 }
330 JsonbType::String => {
331 let s = std::str::from_utf8(payload)
332 .map_err(|_| SqlError::InvalidValue("JSONB string not UTF-8".into()))?;
333 Ok(Some(s.to_string()))
334 }
335 JsonbType::Array | JsonbType::Object => {
336 let v = decode_to_serde(bytes)?;
337 Ok(Some(serde_json::to_string(&v).map_err(|e| {
338 SqlError::InvalidValue(format!("JSON render: {e}"))
339 })?))
340 }
341 }
342}
343
344pub fn jsonb_contains_bytes(lhs: &[u8], rhs: &[u8]) -> Result<bool> {
345 let (lty, lps, lpl) = read_header(lhs)?;
346 let (rty, rps, rpl) = read_header(rhs)?;
347 let lpay = &lhs[lps..lps + lpl];
348 let rpay = &rhs[rps..rps + rpl];
349 match (lty, rty) {
350 (JsonbType::Object, JsonbType::Object) => {
351 let mut rp = 0usize;
352 while rp < rpay.len() {
353 let (_rkty, rkps, rkpl) = read_header(&rpay[rp..])?;
354 let rk_total = rkps + rkpl;
355 let rk = &rpay[rp + rkps..rp + rk_total];
356 let rv_start = rp + rk_total;
357 let rv_total = skip_value(&rpay[rv_start..])?;
358 let rv = &rpay[rv_start..rv_start + rv_total];
359 let mut lp = 0usize;
360 let mut found = false;
361 while lp < lpay.len() {
362 let (_lkty, lkps, lkpl) = read_header(&lpay[lp..])?;
363 let lk_total = lkps + lkpl;
364 let lk = &lpay[lp + lkps..lp + lk_total];
365 let lv_start = lp + lk_total;
366 let lv_total = skip_value(&lpay[lv_start..])?;
367 if lk == rk {
368 let lv = &lpay[lv_start..lv_start + lv_total];
369 if !jsonb_contains_bytes(lv, rv)? {
370 return Ok(false);
371 }
372 found = true;
373 break;
374 }
375 lp = lv_start + lv_total;
376 }
377 if !found {
378 return Ok(false);
379 }
380 rp = rv_start + rv_total;
381 }
382 Ok(true)
383 }
384 (JsonbType::Array, JsonbType::Array) => {
385 let mut rp = 0usize;
386 while rp < rpay.len() {
387 let rv_total = skip_value(&rpay[rp..])?;
388 let rv = &rpay[rp..rp + rv_total];
389 let mut lp = 0usize;
390 let mut found = false;
391 while lp < lpay.len() {
392 let lv_total = skip_value(&lpay[lp..])?;
393 let lv = &lpay[lp..lp + lv_total];
394 if jsonb_contains_bytes(lv, rv)? {
395 found = true;
396 break;
397 }
398 lp += lv_total;
399 }
400 if !found {
401 return Ok(false);
402 }
403 rp += rv_total;
404 }
405 Ok(true)
406 }
407 (JsonbType::Array, _) => {
408 let r_total = rps + rpl;
409 let r_full = &rhs[..r_total];
410 let mut lp = 0usize;
411 while lp < lpay.len() {
412 let lv_total = skip_value(&lpay[lp..])?;
413 if &lpay[lp..lp + lv_total] == r_full {
414 return Ok(true);
415 }
416 lp += lv_total;
417 }
418 Ok(false)
419 }
420 _ => {
421 let l_total = lps + lpl;
422 let r_total = rps + rpl;
423 Ok(lhs[..l_total] == rhs[..r_total])
424 }
425 }
426}
427
428pub fn has_top_key_bytes(bytes: &[u8], key: &str) -> Result<bool> {
429 let (ty, payload_start, payload_len) = read_header(bytes)?;
430 let payload = &bytes[payload_start..payload_start + payload_len];
431 let key_bytes = key.as_bytes();
432 match ty {
433 JsonbType::Object => {
434 let mut pos = 0usize;
435 while pos < payload.len() {
436 let (_kty, kps, kpl) = read_header(&payload[pos..])?;
437 let k_total = kps + kpl;
438 if &payload[pos + kps..pos + k_total] == key_bytes {
439 return Ok(true);
440 }
441 pos += k_total;
442 pos += skip_value(&payload[pos..])?;
443 }
444 Ok(false)
445 }
446 JsonbType::Array => {
447 let mut pos = 0usize;
448 while pos < payload.len() {
449 let (ety, eps, epl) = read_header(&payload[pos..])?;
450 if ety == JsonbType::String && &payload[pos + eps..pos + eps + epl] == key_bytes {
451 return Ok(true);
452 }
453 pos += eps + epl;
454 }
455 Ok(false)
456 }
457 _ => Ok(false),
458 }
459}
460
461fn decode_value(bytes: &[u8], pos: &mut usize) -> Result<serde_json::Value> {
462 let (ty, payload_start, payload_len) = read_header(&bytes[*pos..])?;
463 let payload = &bytes[*pos + payload_start..*pos + payload_start + payload_len];
464 let total = payload_start + payload_len;
465 let v = match ty {
466 JsonbType::Null => serde_json::Value::Null,
467 JsonbType::True => serde_json::Value::Bool(true),
468 JsonbType::False => serde_json::Value::Bool(false),
469 JsonbType::Integer => {
470 let arr: [u8; 8] = payload
471 .try_into()
472 .map_err(|_| SqlError::InvalidValue("JSONB integer payload size".into()))?;
473 serde_json::Value::Number(i64::from_le_bytes(arr).into())
474 }
475 JsonbType::Real => {
476 let arr: [u8; 8] = payload
477 .try_into()
478 .map_err(|_| SqlError::InvalidValue("JSONB real payload size".into()))?;
479 let f = f64::from_le_bytes(arr);
480 serde_json::Number::from_f64(f)
481 .map(serde_json::Value::Number)
482 .ok_or_else(|| SqlError::InvalidValue("non-finite JSONB number".into()))?
483 }
484 JsonbType::String => {
485 let s = std::str::from_utf8(payload)
486 .map_err(|_| SqlError::InvalidValue("JSONB string not UTF-8".into()))?;
487 serde_json::Value::String(s.to_string())
488 }
489 JsonbType::Array => {
490 let mut items = Vec::new();
491 let mut child_pos = 0usize;
492 while child_pos < payload.len() {
493 let mut local = child_pos;
494 let item = decode_value(payload, &mut local)?;
495 items.push(item);
496 child_pos = local;
497 }
498 serde_json::Value::Array(items)
499 }
500 JsonbType::Object => {
501 let mut map = serde_json::Map::new();
502 let mut child_pos = 0usize;
503 while child_pos < payload.len() {
504 let mut local = child_pos;
505 let key = match decode_value(payload, &mut local)? {
506 serde_json::Value::String(s) => s,
507 _ => return Err(SqlError::InvalidValue("JSONB object key not string".into())),
508 };
509 let value = decode_value(payload, &mut local)?;
510 map.insert(key, value);
511 child_pos = local;
512 }
513 serde_json::Value::Object(map)
514 }
515 };
516 *pos += total;
517 Ok(v)
518}
519
520fn value_to_serde(v: &Value) -> Result<serde_json::Value> {
521 match v {
522 Value::Json(s) => serde_json::from_str(s)
523 .map_err(|e| SqlError::InvalidValue(format!("invalid JSON: {e}"))),
524 Value::Jsonb(b) => decode_to_serde(b),
525 _ => Err(SqlError::TypeMismatch {
526 expected: "JSON or JSONB".into(),
527 got: v.data_type().to_string(),
528 }),
529 }
530}
531
532fn serde_to_value(j: serde_json::Value, target: crate::types::DataType) -> Result<Value> {
533 use crate::types::DataType;
534 match target {
535 DataType::Json => Ok(Value::Json(
536 serde_json::to_string(&j)
537 .map_err(|e| SqlError::InvalidValue(format!("JSON render: {e}")))?
538 .into(),
539 )),
540 DataType::Jsonb => {
541 let mut buf = Vec::new();
542 encode_canonical(&j, &mut buf)?;
543 Ok(Value::Jsonb(Arc::from(buf)))
544 }
545 _ => Err(SqlError::InvalidValue(format!(
546 "cannot serialize JSON to {target}"
547 ))),
548 }
549}
550
551fn serde_to_scalar_value(j: serde_json::Value) -> Value {
552 match j {
553 serde_json::Value::Null => Value::Null,
554 serde_json::Value::Bool(b) => Value::Boolean(b),
555 serde_json::Value::Number(n) => {
556 if let Some(i) = n.as_i64() {
557 Value::Integer(i)
558 } else if let Some(f) = n.as_f64() {
559 Value::Real(f)
560 } else {
561 Value::Null
562 }
563 }
564 serde_json::Value::String(s) => Value::Text(s.into()),
565 other => {
566 let text = serde_json::to_string(&other).unwrap_or_default();
567 Value::Text(text.into())
568 }
569 }
570}
571
572pub fn op_get(lhs: &Value, key: &Value) -> Result<Value> {
573 if let Value::Jsonb(b) = lhs {
574 let slice = match key {
575 Value::Text(k) => find_object_key(b, k.as_str())?,
576 Value::Integer(i) => array_get(b, *i)?,
577 _ => None,
578 };
579 return match slice {
580 Some(bytes) => Ok(Value::Jsonb(Arc::from(bytes))),
581 None => Ok(Value::Null),
582 };
583 }
584 let target = match lhs {
585 Value::Json(_) => crate::types::DataType::Json,
586 _ => {
587 return Err(SqlError::TypeMismatch {
588 expected: "JSON or JSONB".into(),
589 got: lhs.data_type().to_string(),
590 })
591 }
592 };
593 let j = value_to_serde(lhs)?;
594 let extracted = navigate_one(&j, key);
595 match extracted {
596 Some(v) => serde_to_value(v, target),
597 None => Ok(Value::Null),
598 }
599}
600
601pub fn op_get_text(lhs: &Value, key: &Value) -> Result<Value> {
602 if let Value::Jsonb(b) = lhs {
603 let slice = match key {
604 Value::Text(k) => find_object_key(b, k.as_str())?,
605 Value::Integer(i) => array_get(b, *i)?,
606 _ => None,
607 };
608 return match slice {
609 Some(bytes) => match read_scalar_text(bytes)? {
610 Some(s) => Ok(Value::Text(s.into())),
611 None => Ok(Value::Null),
612 },
613 None => Ok(Value::Null),
614 };
615 }
616 let j = value_to_serde(lhs)?;
617 match navigate_one(&j, key) {
618 Some(serde_json::Value::Null) => Ok(Value::Null),
619 Some(serde_json::Value::String(s)) => Ok(Value::Text(s.into())),
620 Some(v) => Ok(Value::Text(
621 serde_json::to_string(&v)
622 .map_err(|e| SqlError::InvalidValue(format!("JSON render: {e}")))?
623 .into(),
624 )),
625 None => Ok(Value::Null),
626 }
627}
628
629pub fn op_path(lhs: &Value, path: &Value) -> Result<Value> {
630 let target = match lhs {
631 Value::Json(_) => crate::types::DataType::Json,
632 Value::Jsonb(_) => crate::types::DataType::Jsonb,
633 _ => {
634 return Err(SqlError::TypeMismatch {
635 expected: "JSON or JSONB".into(),
636 got: lhs.data_type().to_string(),
637 })
638 }
639 };
640 let j = value_to_serde(lhs)?;
641 let segments = path_to_segments(path)?;
642 match navigate_path(&j, &segments) {
643 Some(v) => serde_to_value(v, target),
644 None => Ok(Value::Null),
645 }
646}
647
648pub fn op_path_text(lhs: &Value, path: &Value) -> Result<Value> {
649 let j = value_to_serde(lhs)?;
650 let segments = path_to_segments(path)?;
651 match navigate_path(&j, &segments) {
652 Some(serde_json::Value::Null) => Ok(Value::Null),
653 Some(serde_json::Value::String(s)) => Ok(Value::Text(s.into())),
654 Some(v) => Ok(Value::Text(
655 serde_json::to_string(&v)
656 .map_err(|e| SqlError::InvalidValue(format!("JSON render: {e}")))?
657 .into(),
658 )),
659 None => Ok(Value::Null),
660 }
661}
662
663pub fn op_contains(lhs: &Value, rhs: &Value) -> Result<Value> {
664 if let (Value::Jsonb(l), Value::Jsonb(r)) = (lhs, rhs) {
665 return Ok(Value::Boolean(jsonb_contains_bytes(l, r)?));
666 }
667 let left = value_to_serde(lhs)?;
668 let right = value_to_serde(rhs)?;
669 Ok(Value::Boolean(json_contains(&left, &right)))
670}
671
672pub fn op_contained_by(lhs: &Value, rhs: &Value) -> Result<Value> {
673 if let (Value::Jsonb(l), Value::Jsonb(r)) = (lhs, rhs) {
674 return Ok(Value::Boolean(jsonb_contains_bytes(r, l)?));
675 }
676 let left = value_to_serde(lhs)?;
677 let right = value_to_serde(rhs)?;
678 Ok(Value::Boolean(json_contains(&right, &left)))
679}
680
681pub fn op_has_key(lhs: &Value, rhs: &Value) -> Result<Value> {
682 let key = match rhs {
683 Value::Text(s) => s.as_str(),
684 _ => {
685 return Err(SqlError::TypeMismatch {
686 expected: "TEXT key".into(),
687 got: rhs.data_type().to_string(),
688 })
689 }
690 };
691 if let Value::Jsonb(b) = lhs {
692 return Ok(Value::Boolean(has_top_key_bytes(b, key)?));
693 }
694 let left = value_to_serde(lhs)?;
695 let exists = match &left {
696 serde_json::Value::Object(m) => m.contains_key(key),
697 serde_json::Value::Array(arr) => arr
698 .iter()
699 .any(|e| matches!(e, serde_json::Value::String(s) if s == key)),
700 _ => false,
701 };
702 Ok(Value::Boolean(exists))
703}
704
705pub fn op_has_any_key(lhs: &Value, rhs: &Value) -> Result<Value> {
706 let left = value_to_serde(lhs)?;
707 let keys = text_array(rhs)?;
708 let m = match &left {
709 serde_json::Value::Object(m) => m,
710 _ => return Ok(Value::Boolean(false)),
711 };
712 Ok(Value::Boolean(
713 keys.iter().any(|k| m.contains_key(k.as_str())),
714 ))
715}
716
717pub fn op_has_all_keys(lhs: &Value, rhs: &Value) -> Result<Value> {
718 let left = value_to_serde(lhs)?;
719 let keys = text_array(rhs)?;
720 let m = match &left {
721 serde_json::Value::Object(m) => m,
722 _ => return Ok(Value::Boolean(keys.is_empty())),
723 };
724 Ok(Value::Boolean(
725 keys.iter().all(|k| m.contains_key(k.as_str())),
726 ))
727}
728
729pub fn op_delete_path(lhs: &Value, path: &Value) -> Result<Value> {
730 let target = match lhs {
731 Value::Json(_) => crate::types::DataType::Json,
732 Value::Jsonb(_) => crate::types::DataType::Jsonb,
733 _ => {
734 return Err(SqlError::TypeMismatch {
735 expected: "JSON or JSONB".into(),
736 got: lhs.data_type().to_string(),
737 })
738 }
739 };
740 let mut j = value_to_serde(lhs)?;
741 let segments = path_to_segments(path)?;
742 delete_at_path(&mut j, &segments);
743 serde_to_value(j, target)
744}
745
746pub fn op_delete_one(lhs: &Value, rhs: &Value) -> Result<Value> {
747 let target = match lhs {
748 Value::Json(_) => crate::types::DataType::Json,
749 Value::Jsonb(_) => crate::types::DataType::Jsonb,
750 _ => {
751 return Err(SqlError::TypeMismatch {
752 expected: "JSON or JSONB".into(),
753 got: lhs.data_type().to_string(),
754 })
755 }
756 };
757 let mut j = value_to_serde(lhs)?;
758 match (&mut j, rhs) {
759 (serde_json::Value::Object(m), Value::Text(k)) => {
760 m.remove(k.as_str());
761 }
762 (serde_json::Value::Array(arr), Value::Integer(i)) => {
763 let len = arr.len() as i64;
764 let idx = if *i < 0 { len + i } else { *i };
765 if (0..len).contains(&idx) {
766 arr.remove(idx as usize);
767 }
768 }
769 (serde_json::Value::Array(arr), Value::Text(k)) => {
770 arr.retain(|e| !matches!(e, serde_json::Value::String(s) if s == k.as_str()));
771 }
772 _ => {}
773 }
774 serde_to_value(j, target)
775}
776
777pub fn op_concat(lhs: &Value, rhs: &Value) -> Result<Value> {
778 let target = match (lhs, rhs) {
779 (Value::Jsonb(_), _) | (_, Value::Jsonb(_)) => crate::types::DataType::Jsonb,
780 _ => crate::types::DataType::Json,
781 };
782 let mut left = value_to_serde(lhs)?;
783 let right = value_to_serde(rhs)?;
784 match (&mut left, right) {
785 (serde_json::Value::Object(a), serde_json::Value::Object(b)) => {
786 for (k, v) in b {
787 a.insert(k, v);
788 }
789 }
790 (serde_json::Value::Array(a), serde_json::Value::Array(b)) => {
791 a.extend(b);
792 }
793 (serde_json::Value::Array(a), other) => {
794 a.push(other);
795 }
796 (a, serde_json::Value::Array(mut b)) => {
797 let owned = std::mem::take(a);
798 let mut combined = vec![owned];
799 combined.append(&mut b);
800 *a = serde_json::Value::Array(combined);
801 }
802 (a, b) => {
803 let av = std::mem::take(a);
804 *a = serde_json::Value::Array(vec![av, b]);
805 }
806 }
807 serde_to_value(left, target)
808}
809
810pub fn op_path_exists(lhs: &Value, path: &Value) -> Result<Value> {
811 let j = value_to_serde(lhs)?;
812 let path_str = match path {
813 Value::Text(s) => s.to_string(),
814 Value::Json(s) => s.to_string(),
815 _ => {
816 return Err(SqlError::TypeMismatch {
817 expected: "TEXT path".into(),
818 got: path.data_type().to_string(),
819 })
820 }
821 };
822 let jp = sql_json_path::JsonPath::new(&path_str)
823 .map_err(|e| SqlError::InvalidValue(format!("invalid JSON path: {e}")))?;
824 let exists = jp
825 .exists(&j)
826 .map_err(|e| SqlError::InvalidValue(format!("JSON path eval: {e}")))?;
827 Ok(Value::Boolean(exists))
828}
829
830pub fn op_path_match(lhs: &Value, path: &Value) -> Result<Value> {
831 let j = value_to_serde(lhs)?;
832 let path_str = match path {
833 Value::Text(s) => s.to_string(),
834 Value::Json(s) => s.to_string(),
835 _ => {
836 return Err(SqlError::TypeMismatch {
837 expected: "TEXT path".into(),
838 got: path.data_type().to_string(),
839 })
840 }
841 };
842 let jp = sql_json_path::JsonPath::new(&path_str)
843 .map_err(|e| SqlError::InvalidValue(format!("invalid JSON path: {e}")))?;
844 let result = jp
845 .query(&j)
846 .map_err(|e| SqlError::InvalidValue(format!("JSON path eval: {e}")))?;
847 let truthy = result
848 .iter()
849 .any(|node| matches!(node.as_ref(), serde_json::Value::Bool(true)));
850 Ok(Value::Boolean(truthy))
851}
852
853pub fn fn_json_exists(j_val: &Value, path: &Value) -> Result<Value> {
854 op_path_exists(j_val, path)
855}
856
857pub fn fn_json_value(j_val: &Value, path: &Value) -> Result<Value> {
858 let j = value_to_serde(j_val)?;
859 let path_str = match path {
860 Value::Text(s) => s.to_string(),
861 _ => {
862 return Err(SqlError::TypeMismatch {
863 expected: "TEXT path".into(),
864 got: path.data_type().to_string(),
865 })
866 }
867 };
868 let jp = sql_json_path::JsonPath::new(&path_str)
869 .map_err(|e| SqlError::InvalidValue(format!("invalid JSON path: {e}")))?;
870 match jp
871 .query_first(&j)
872 .map_err(|e| SqlError::InvalidValue(format!("JSON path eval: {e}")))?
873 {
874 Some(node) => match node.into_owned() {
875 serde_json::Value::Null => Ok(Value::Null),
876 serde_json::Value::String(s) => Ok(Value::Text(s.into())),
877 other => Ok(Value::Text(
878 serde_json::to_string(&other)
879 .map_err(|e| SqlError::InvalidValue(format!("JSON render: {e}")))?
880 .into(),
881 )),
882 },
883 None => Ok(Value::Null),
884 }
885}
886
887pub fn fn_json_query(j_val: &Value, path: &Value, target: crate::types::DataType) -> Result<Value> {
888 let j = value_to_serde(j_val)?;
889 let path_str = match path {
890 Value::Text(s) => s.to_string(),
891 _ => {
892 return Err(SqlError::TypeMismatch {
893 expected: "TEXT path".into(),
894 got: path.data_type().to_string(),
895 })
896 }
897 };
898 let jp = sql_json_path::JsonPath::new(&path_str)
899 .map_err(|e| SqlError::InvalidValue(format!("invalid JSON path: {e}")))?;
900 let nodes = jp
901 .query(&j)
902 .map_err(|e| SqlError::InvalidValue(format!("JSON path eval: {e}")))?;
903 if nodes.is_empty() {
904 return Ok(Value::Null);
905 }
906 let result_json = if nodes.len() == 1 {
907 nodes[0].as_ref().clone()
908 } else {
909 serde_json::Value::Array(nodes.iter().map(|n| n.as_ref().clone()).collect())
910 };
911 serde_to_value(result_json, target)
912}
913
/// One step of a navigation path through a JSON document.
#[derive(Clone, Debug, Eq, PartialEq)]
pub enum PathSeg {
    /// Object member name.
    Key(String),
    /// Array position; negative values count from the end.
    Index(i64),
    /// `[*]` in a `$`-style path.
    Wildcard,
}
920
921pub fn parse_dollar_path(s: &str) -> Result<Vec<PathSeg>> {
922 let s = s.trim();
923 let s = s.strip_prefix('$').unwrap_or(s);
924 let bytes = s.as_bytes();
925 let mut out = Vec::new();
926 let mut i = 0;
927 while i < bytes.len() {
928 match bytes[i] {
929 b'.' => {
930 i += 1;
931 let start = i;
932 while i < bytes.len() && bytes[i] != b'.' && bytes[i] != b'[' {
933 i += 1;
934 }
935 if i > start {
936 let key = std::str::from_utf8(&bytes[start..i])
937 .map_err(|_| SqlError::InvalidValue("invalid path segment".into()))?;
938 out.push(PathSeg::Key(key.to_string()));
939 }
940 }
941 b'[' => {
942 i += 1;
943 let start = i;
944 while i < bytes.len() && bytes[i] != b']' {
945 i += 1;
946 }
947 if i > bytes.len() {
948 return Err(SqlError::InvalidValue("unterminated index".into()));
949 }
950 let inner = std::str::from_utf8(&bytes[start..i])
951 .map_err(|_| SqlError::InvalidValue("invalid path index".into()))?;
952 if inner.trim() == "*" {
953 out.push(PathSeg::Wildcard);
954 } else if let Ok(idx) = inner.parse::<i64>() {
955 out.push(PathSeg::Index(idx));
956 } else {
957 let key = inner.trim_matches('"').trim_matches('\'');
958 out.push(PathSeg::Key(key.to_string()));
959 }
960 i += 1;
961 }
962 _ => i += 1,
963 }
964 }
965 Ok(out)
966}
967
968fn path_to_segments(v: &Value) -> Result<Vec<PathSeg>> {
969 match v {
970 Value::Text(s) => {
971 if s.starts_with('$') {
972 parse_dollar_path(s)
973 } else if s.starts_with('{') && s.ends_with('}') {
974 parse_pg_array_path(s)
975 } else {
976 Ok(vec![PathSeg::Key(s.to_string())])
977 }
978 }
979 Value::Integer(i) => Ok(vec![PathSeg::Index(*i)]),
980 Value::Json(_) | Value::Jsonb(_) => {
981 let parsed = value_to_serde(v)?;
982 json_to_path(&parsed)
983 }
984 _ => Err(SqlError::TypeMismatch {
985 expected: "TEXT or path array".into(),
986 got: v.data_type().to_string(),
987 }),
988 }
989}
990
991fn parse_pg_array_path(s: &str) -> Result<Vec<PathSeg>> {
992 let inner = &s[1..s.len() - 1];
993 if inner.is_empty() {
994 return Ok(vec![]);
995 }
996 inner
997 .split(',')
998 .map(|raw| {
999 let trimmed = raw.trim().trim_matches('"');
1000 if let Ok(idx) = trimmed.parse::<i64>() {
1001 PathSeg::Index(idx)
1002 } else {
1003 PathSeg::Key(trimmed.to_string())
1004 }
1005 })
1006 .map(Ok)
1007 .collect()
1008}
1009
1010fn json_to_path(j: &serde_json::Value) -> Result<Vec<PathSeg>> {
1011 let arr = j
1012 .as_array()
1013 .ok_or_else(|| SqlError::InvalidValue("path must be a JSON array".into()))?;
1014 arr.iter()
1015 .map(|item| match item {
1016 serde_json::Value::String(s) => Ok(PathSeg::Key(s.clone())),
1017 serde_json::Value::Number(n) => n
1018 .as_i64()
1019 .map(PathSeg::Index)
1020 .ok_or_else(|| SqlError::InvalidValue("path index out of range".into())),
1021 _ => Err(SqlError::InvalidValue(
1022 "path segments must be strings or integers".into(),
1023 )),
1024 })
1025 .collect()
1026}
1027
1028fn navigate_one(j: &serde_json::Value, key: &Value) -> Option<serde_json::Value> {
1029 match (j, key) {
1030 (serde_json::Value::Object(m), Value::Text(k)) => m.get(k.as_str()).cloned(),
1031 (serde_json::Value::Array(arr), Value::Integer(i)) => {
1032 let len = arr.len() as i64;
1033 let idx = if *i < 0 { len + i } else { *i };
1034 if (0..len).contains(&idx) {
1035 Some(arr[idx as usize].clone())
1036 } else {
1037 None
1038 }
1039 }
1040 _ => None,
1041 }
1042}
1043
1044fn navigate_path(j: &serde_json::Value, segments: &[PathSeg]) -> Option<serde_json::Value> {
1045 let mut cur = j.clone();
1046 for seg in segments {
1047 cur = match (&cur, seg) {
1048 (serde_json::Value::Object(m), PathSeg::Key(k)) => m.get(k.as_str())?.clone(),
1049 (serde_json::Value::Array(arr), PathSeg::Index(i)) => {
1050 let len = arr.len() as i64;
1051 let idx = if *i < 0 { len + i } else { *i };
1052 if (0..len).contains(&idx) {
1053 arr[idx as usize].clone()
1054 } else {
1055 return None;
1056 }
1057 }
1058 (serde_json::Value::Array(arr), PathSeg::Key(k)) => {
1059 let idx: i64 = k.parse().ok()?;
1060 let len = arr.len() as i64;
1061 let idx = if idx < 0 { len + idx } else { idx };
1062 if (0..len).contains(&idx) {
1063 arr[idx as usize].clone()
1064 } else {
1065 return None;
1066 }
1067 }
1068 _ => return None,
1069 };
1070 }
1071 Some(cur)
1072}
1073
1074fn delete_at_path(j: &mut serde_json::Value, segments: &[PathSeg]) {
1075 if segments.is_empty() {
1076 return;
1077 }
1078 let (last, prefix) = segments.split_last().unwrap();
1079 let target = navigate_mut(j, prefix);
1080 if let Some(t) = target {
1081 match (t, last) {
1082 (serde_json::Value::Object(m), PathSeg::Key(k)) => {
1083 m.remove(k.as_str());
1084 }
1085 (serde_json::Value::Array(arr), PathSeg::Index(i)) => {
1086 let len = arr.len() as i64;
1087 let idx = if *i < 0 { len + i } else { *i };
1088 if (0..len).contains(&idx) {
1089 arr.remove(idx as usize);
1090 }
1091 }
1092 _ => {}
1093 }
1094 }
1095}
1096
1097fn navigate_mut<'a>(
1098 j: &'a mut serde_json::Value,
1099 segments: &[PathSeg],
1100) -> Option<&'a mut serde_json::Value> {
1101 let mut cur = j;
1102 for seg in segments {
1103 cur = match (cur, seg) {
1104 (serde_json::Value::Object(m), PathSeg::Key(k)) => m.get_mut(k.as_str())?,
1105 (serde_json::Value::Array(arr), PathSeg::Index(i)) => {
1106 let len = arr.len() as i64;
1107 let idx = if *i < 0 { len + i } else { *i };
1108 if (0..len).contains(&idx) {
1109 arr.get_mut(idx as usize)?
1110 } else {
1111 return None;
1112 }
1113 }
1114 _ => return None,
1115 };
1116 }
1117 Some(cur)
1118}
1119
1120fn json_contains(left: &serde_json::Value, right: &serde_json::Value) -> bool {
1121 match (left, right) {
1122 (serde_json::Value::Object(a), serde_json::Value::Object(b)) => b
1123 .iter()
1124 .all(|(k, v)| a.get(k).is_some_and(|av| json_contains(av, v))),
1125 (serde_json::Value::Array(a), serde_json::Value::Array(b)) => {
1126 b.iter().all(|bv| a.iter().any(|av| json_contains(av, bv)))
1127 }
1128 (serde_json::Value::Array(a), other) => a.iter().any(|av| json_contains(av, other)),
1129 (a, b) => a == b,
1130 }
1131}
1132
1133fn text_array(v: &Value) -> Result<Vec<String>> {
1134 match v {
1135 Value::Text(s) => Ok(vec![s.to_string()]),
1136 Value::Json(_) | Value::Jsonb(_) => {
1137 let j = value_to_serde(v)?;
1138 j.as_array()
1139 .ok_or_else(|| SqlError::InvalidValue("expected JSON text array".into()))?
1140 .iter()
1141 .map(|e| match e {
1142 serde_json::Value::String(s) => Ok(s.clone()),
1143 _ => Err(SqlError::InvalidValue(
1144 "array elements must be strings".into(),
1145 )),
1146 })
1147 .collect()
1148 }
1149 _ => Err(SqlError::TypeMismatch {
1150 expected: "TEXT array or JSON array".into(),
1151 got: v.data_type().to_string(),
1152 }),
1153 }
1154}
1155
1156pub fn agg_array(values: &[Value], target: crate::types::DataType) -> Result<Value> {
1157 let items: Result<Vec<serde_json::Value>> = values.iter().map(value_to_serde_lossy).collect();
1158 serde_to_value(serde_json::Value::Array(items?), target)
1159}
1160
1161pub fn materialize_json_table(
1162 source: &Value,
1163 spec: &crate::parser::JsonTableSpec,
1164) -> Result<(Vec<String>, Vec<Vec<Value>>)> {
1165 if source.is_null() {
1166 let names = json_table_column_names(&spec.columns);
1167 return Ok((names, vec![]));
1168 }
1169 let root = value_to_serde(source)?;
1170 let root_segs = parse_dollar_path(&spec.root_path)?;
1171 let matches = json_table_walk(&root, &root_segs);
1172 let mut rows: Vec<Vec<Value>> = Vec::new();
1173 let mut ordinality_counter = 0i64;
1174 for m in matches {
1175 ordinality_counter += 1;
1176 emit_json_table_rows(
1177 &m,
1178 &spec.columns,
1179 ordinality_counter,
1180 &mut Vec::new(),
1181 &mut rows,
1182 )?;
1183 }
1184 let names = json_table_column_names(&spec.columns);
1185 Ok((names, rows))
1186}
1187
1188fn json_table_column_names(columns: &[crate::parser::JsonTableCol]) -> Vec<String> {
1189 use crate::parser::JsonTableCol as C;
1190 let mut out = Vec::new();
1191 for c in columns {
1192 match c {
1193 C::Named { name, .. } | C::Ordinality { name } => out.push(name.clone()),
1194 C::Nested { columns, .. } => out.extend(json_table_column_names(columns)),
1195 }
1196 }
1197 out
1198}
1199
1200fn json_table_walk(j: &serde_json::Value, segs: &[PathSeg]) -> Vec<serde_json::Value> {
1201 let mut frontier = vec![j.clone()];
1202 for seg in segs {
1203 let mut next = Vec::new();
1204 for cur in frontier {
1205 match (cur, seg) {
1206 (serde_json::Value::Object(m), PathSeg::Key(k)) => {
1207 if let Some(v) = m.get(k.as_str()) {
1208 next.push(v.clone());
1209 }
1210 }
1211 (serde_json::Value::Array(arr), PathSeg::Index(i)) => {
1212 let len = arr.len() as i64;
1213 let idx = if *i < 0 { len + i } else { *i };
1214 if (0..len).contains(&idx) {
1215 next.push(arr[idx as usize].clone());
1216 }
1217 }
1218 (serde_json::Value::Array(arr), PathSeg::Wildcard) => {
1219 next.extend(arr);
1220 }
1221 _ => {}
1222 }
1223 }
1224 frontier = next;
1225 }
1226 frontier
1227 .into_iter()
1228 .flat_map(|v| match v {
1229 serde_json::Value::Array(arr) if segs.last() == Some(&PathSeg::Wildcard) => arr,
1230 other => vec![other],
1231 })
1232 .collect()
1233}
1234
1235fn emit_json_table_rows(
1236 row_doc: &serde_json::Value,
1237 columns: &[crate::parser::JsonTableCol],
1238 parent_ordinality: i64,
1239 _prefix: &mut Vec<Value>,
1240 out: &mut Vec<Vec<Value>>,
1241) -> Result<()> {
1242 use crate::parser::JsonTableCol as C;
1243
1244 let mut scalars: Vec<(usize, Value)> = Vec::new();
1245 let mut nesteds: Vec<(usize, Vec<Vec<Value>>)> = Vec::new();
1246 let mut widths: Vec<usize> = Vec::with_capacity(columns.len());
1247
1248 for (idx, c) in columns.iter().enumerate() {
1249 match c {
1250 C::Named {
1251 ty, path, exists, ..
1252 } => {
1253 let segs = parse_dollar_path(path)?;
1254 let matches = json_table_walk(row_doc, &segs);
1255 let v = if *exists {
1256 Value::Boolean(!matches.is_empty())
1257 } else if matches.is_empty() {
1258 Value::Null
1259 } else {
1260 json_table_coerce(&matches[0], *ty)?
1261 };
1262 scalars.push((idx, v));
1263 widths.push(1);
1264 }
1265 C::Ordinality { .. } => {
1266 scalars.push((idx, Value::Integer(parent_ordinality)));
1267 widths.push(1);
1268 }
1269 C::Nested { path, columns } => {
1270 let segs = parse_dollar_path(path)?;
1271 let matches = json_table_walk(row_doc, &segs);
1272 let inner_width = json_table_column_names(columns).len();
1273 let mut inner: Vec<Vec<Value>> = Vec::new();
1274 let mut ord = 0i64;
1275 for m in matches {
1276 ord += 1;
1277 let mut empty_prefix: Vec<Value> = Vec::new();
1278 emit_json_table_rows(&m, columns, ord, &mut empty_prefix, &mut inner)?;
1279 }
1280 if inner.is_empty() {
1281 inner.push(vec![Value::Null; inner_width]);
1282 }
1283 nesteds.push((idx, inner));
1284 widths.push(inner_width);
1285 }
1286 }
1287 }
1288
1289 let offsets: Vec<usize> = widths
1290 .iter()
1291 .scan(0usize, |acc, w| {
1292 let cur = *acc;
1293 *acc += w;
1294 Some(cur)
1295 })
1296 .collect();
1297 let total: usize = widths.iter().sum();
1298
1299 if nesteds.is_empty() {
1300 let mut row = vec![Value::Null; total];
1301 for (idx, v) in scalars {
1302 row[offsets[idx]] = v;
1303 }
1304 out.push(row);
1305 return Ok(());
1306 }
1307
1308 let mut indices = vec![0usize; nesteds.len()];
1309 loop {
1310 let mut row = vec![Value::Null; total];
1311 for (idx, v) in &scalars {
1312 row[offsets[*idx]] = v.clone();
1313 }
1314 for (ni, (col_idx, group)) in nesteds.iter().enumerate() {
1315 let off = offsets[*col_idx];
1316 let inner_row = &group[indices[ni]];
1317 for (k, v) in inner_row.iter().enumerate() {
1318 row[off + k] = v.clone();
1319 }
1320 }
1321 out.push(row);
1322
1323 let mut k = indices.len();
1324 let done = loop {
1325 if k == 0 {
1326 break true;
1327 }
1328 k -= 1;
1329 indices[k] += 1;
1330 if indices[k] < nesteds[k].1.len() {
1331 break false;
1332 }
1333 indices[k] = 0;
1334 };
1335 if done {
1336 return Ok(());
1337 }
1338 }
1339}
1340
1341fn json_table_coerce(v: &serde_json::Value, target: crate::types::DataType) -> Result<Value> {
1342 use crate::types::DataType;
1343 if matches!(v, serde_json::Value::Null) {
1344 return Ok(Value::Null);
1345 }
1346 match (v, target) {
1347 (_, DataType::Json) => serde_to_value(v.clone(), DataType::Json),
1348 (_, DataType::Jsonb) => serde_to_value(v.clone(), DataType::Jsonb),
1349 (serde_json::Value::Number(n), DataType::Integer) => n
1350 .as_i64()
1351 .map(Value::Integer)
1352 .ok_or_else(|| SqlError::InvalidValue("JSON_TABLE: number not i64".into())),
1353 (serde_json::Value::Number(n), DataType::Real) => n
1354 .as_f64()
1355 .map(Value::Real)
1356 .ok_or_else(|| SqlError::InvalidValue("JSON_TABLE: number not f64".into())),
1357 (serde_json::Value::Bool(b), DataType::Boolean) => Ok(Value::Boolean(*b)),
1358 (serde_json::Value::String(s), DataType::Text) => Ok(Value::Text(s.clone().into())),
1359 _ => {
1360 let text_form = match v {
1361 serde_json::Value::String(s) => s.clone(),
1362 _ => serde_json::to_string(v)
1363 .map_err(|e| SqlError::InvalidValue(format!("JSON_TABLE render: {e}")))?,
1364 };
1365 let text_val = Value::Text(text_form.into());
1366 text_val.coerce_into(target).ok_or_else(|| {
1367 SqlError::InvalidValue(format!("JSON_TABLE: cannot coerce value to {target}"))
1368 })
1369 }
1370 }
1371}
1372
1373pub fn extract_gin_entries(value: &Value, ops: crate::types::GinOpsClass) -> Result<Vec<Vec<u8>>> {
1376 if value.is_null() {
1377 return Ok(vec![]);
1378 }
1379 let j = value_to_serde(value)?;
1380 let mut out: Vec<Vec<u8>> = Vec::new();
1381 extract_gin_walk(&j, ops, &mut out);
1382 out.sort();
1383 out.dedup();
1384 Ok(out)
1385}
1386
1387fn extract_gin_walk(j: &serde_json::Value, ops: crate::types::GinOpsClass, out: &mut Vec<Vec<u8>>) {
1388 use crate::types::GinOpsClass;
1389 match j {
1390 serde_json::Value::Object(m) => {
1391 for (k, v) in m {
1392 if matches!(ops, GinOpsClass::JsonbOps) {
1393 let mut key_entry = Vec::with_capacity(k.len() + 1);
1394 key_entry.push(0x01);
1395 key_entry.extend_from_slice(k.as_bytes());
1396 out.push(key_entry);
1397 }
1398 if let Some(s) = scalar_repr(v) {
1399 let mut pair = Vec::with_capacity(k.len() + s.len() + 2);
1400 pair.push(0x02);
1401 pair.extend_from_slice(k.as_bytes());
1402 pair.push(0x00);
1403 pair.extend_from_slice(s.as_bytes());
1404 out.push(pair);
1405 }
1406 extract_gin_walk(v, ops, out);
1407 }
1408 }
1409 serde_json::Value::Array(arr) => {
1410 for v in arr {
1411 if let Some(s) = scalar_repr(v) {
1412 let mut entry = Vec::with_capacity(s.len() + 1);
1413 entry.push(0x03);
1414 entry.extend_from_slice(s.as_bytes());
1415 out.push(entry);
1416 }
1417 extract_gin_walk(v, ops, out);
1418 }
1419 }
1420 _ => {}
1421 }
1422}
1423
1424fn scalar_repr(v: &serde_json::Value) -> Option<String> {
1425 match v {
1426 serde_json::Value::Null => Some("null".into()),
1427 serde_json::Value::Bool(true) => Some("true".into()),
1428 serde_json::Value::Bool(false) => Some("false".into()),
1429 serde_json::Value::String(s) => Some(s.clone()),
1430 serde_json::Value::Number(n) => Some(n.to_string()),
1431 _ => None,
1432 }
1433}
1434
1435pub fn agg_object(pairs: &[(Value, Value)], target: crate::types::DataType) -> Result<Value> {
1436 let mut map = serde_json::Map::new();
1437 for (k, v) in pairs {
1438 if k.is_null() {
1439 continue;
1440 }
1441 let key_str = match k {
1442 Value::Text(s) => s.to_string(),
1443 Value::Json(s) => s.to_string(),
1444 _ => format!("{k}"),
1445 };
1446 let val = value_to_serde_lossy(v)?;
1447 map.insert(key_str, val);
1448 }
1449 serde_to_value(serde_json::Value::Object(map), target)
1450}
1451
1452pub fn dispatch_srf(name: &str, args: &[Value]) -> Result<(Vec<String>, Vec<Vec<Value>>)> {
1453 let upper = name.to_ascii_uppercase();
1454 match upper.as_str() {
1455 "JSONB_ARRAY_ELEMENTS" | "JSON_ARRAY_ELEMENTS" => {
1456 if args.len() != 1 {
1457 return Err(SqlError::InvalidValue(format!(
1458 "{name} requires 1 argument"
1459 )));
1460 }
1461 if args[0].is_null() {
1462 return Ok((vec!["value".into()], vec![]));
1463 }
1464 let j = value_to_serde(&args[0])?;
1465 let arr = j
1466 .as_array()
1467 .ok_or_else(|| SqlError::InvalidValue(format!("{name} requires JSON array")))?;
1468 let target = if upper.starts_with("JSONB") {
1469 crate::types::DataType::Jsonb
1470 } else {
1471 crate::types::DataType::Json
1472 };
1473 let rows: Result<Vec<Vec<Value>>> = arr
1474 .iter()
1475 .map(|v| serde_to_value(v.clone(), target).map(|val| vec![val]))
1476 .collect();
1477 Ok((vec!["value".into()], rows?))
1478 }
1479 "JSONB_ARRAY_ELEMENTS_TEXT" | "JSON_ARRAY_ELEMENTS_TEXT" => {
1480 if args.len() != 1 {
1481 return Err(SqlError::InvalidValue(format!(
1482 "{name} requires 1 argument"
1483 )));
1484 }
1485 if args[0].is_null() {
1486 return Ok((vec!["value".into()], vec![]));
1487 }
1488 let j = value_to_serde(&args[0])?;
1489 let arr = j
1490 .as_array()
1491 .ok_or_else(|| SqlError::InvalidValue(format!("{name} requires JSON array")))?;
1492 let rows: Vec<Vec<Value>> = arr
1493 .iter()
1494 .map(|v| {
1495 let text = match v {
1496 serde_json::Value::String(s) => s.clone(),
1497 _ => serde_json::to_string(v).unwrap_or_default(),
1498 };
1499 vec![Value::Text(text.into())]
1500 })
1501 .collect();
1502 Ok((vec!["value".into()], rows))
1503 }
1504 "JSONB_EACH" | "JSON_EACH" => {
1505 if args.len() != 1 {
1506 return Err(SqlError::InvalidValue(format!(
1507 "{name} requires 1 argument"
1508 )));
1509 }
1510 if args[0].is_null() {
1511 return Ok((vec!["key".into(), "value".into()], vec![]));
1512 }
1513 let j = value_to_serde(&args[0])?;
1514 let obj = j
1515 .as_object()
1516 .ok_or_else(|| SqlError::InvalidValue(format!("{name} requires JSON object")))?;
1517 let target = if upper.starts_with("JSONB") {
1518 crate::types::DataType::Jsonb
1519 } else {
1520 crate::types::DataType::Json
1521 };
1522 let rows: Result<Vec<Vec<Value>>> = obj
1523 .iter()
1524 .map(|(k, v)| {
1525 Ok(vec![
1526 Value::Text(k.clone().into()),
1527 serde_to_value(v.clone(), target)?,
1528 ])
1529 })
1530 .collect();
1531 Ok((vec!["key".into(), "value".into()], rows?))
1532 }
1533 "JSONB_EACH_TEXT" | "JSON_EACH_TEXT" => {
1534 if args.len() != 1 {
1535 return Err(SqlError::InvalidValue(format!(
1536 "{name} requires 1 argument"
1537 )));
1538 }
1539 if args[0].is_null() {
1540 return Ok((vec!["key".into(), "value".into()], vec![]));
1541 }
1542 let j = value_to_serde(&args[0])?;
1543 let obj = j
1544 .as_object()
1545 .ok_or_else(|| SqlError::InvalidValue(format!("{name} requires JSON object")))?;
1546 let rows: Vec<Vec<Value>> = obj
1547 .iter()
1548 .map(|(k, v)| {
1549 let text = match v {
1550 serde_json::Value::String(s) => s.clone(),
1551 _ => serde_json::to_string(v).unwrap_or_default(),
1552 };
1553 vec![Value::Text(k.clone().into()), Value::Text(text.into())]
1554 })
1555 .collect();
1556 Ok((vec!["key".into(), "value".into()], rows))
1557 }
1558 "JSONB_OBJECT_KEYS" | "JSON_OBJECT_KEYS" => {
1559 if args.len() != 1 {
1560 return Err(SqlError::InvalidValue(format!(
1561 "{name} requires 1 argument"
1562 )));
1563 }
1564 if args[0].is_null() {
1565 return Ok((vec!["key".into()], vec![]));
1566 }
1567 let j = value_to_serde(&args[0])?;
1568 let obj = j
1569 .as_object()
1570 .ok_or_else(|| SqlError::InvalidValue(format!("{name} requires JSON object")))?;
1571 let rows: Vec<Vec<Value>> = obj
1572 .keys()
1573 .map(|k| vec![Value::Text(k.clone().into())])
1574 .collect();
1575 Ok((vec!["key".into()], rows))
1576 }
1577 _ => Err(SqlError::Unsupported(format!(
1578 "set-returning function: {name}"
1579 ))),
1580 }
1581}
1582
/// Reports whether `name` is one of the JSON set-returning functions handled
/// by `dispatch_srf`. The comparison ignores ASCII case.
pub fn is_srf_name(name: &str) -> bool {
    const SRF_NAMES: [&str; 10] = [
        "JSONB_ARRAY_ELEMENTS",
        "JSON_ARRAY_ELEMENTS",
        "JSONB_ARRAY_ELEMENTS_TEXT",
        "JSON_ARRAY_ELEMENTS_TEXT",
        "JSONB_EACH",
        "JSON_EACH",
        "JSONB_EACH_TEXT",
        "JSON_EACH_TEXT",
        "JSONB_OBJECT_KEYS",
        "JSON_OBJECT_KEYS",
    ];
    SRF_NAMES
        .iter()
        .any(|known| name.eq_ignore_ascii_case(known))
}
1598
1599pub fn extract_to_value(target: crate::types::DataType, j: serde_json::Value) -> Result<Value> {
1600 serde_to_value(j, target)
1601}
1602
1603pub fn to_scalar(j: serde_json::Value) -> Value {
1604 serde_to_scalar_value(j)
1605}
1606
1607pub fn fn_typeof(v: &Value) -> Result<Value> {
1608 if let Value::Jsonb(b) = v {
1609 let (ty, _, _) = read_header(b)?;
1610 let s = match ty {
1611 JsonbType::Null => "null",
1612 JsonbType::True | JsonbType::False => "boolean",
1613 JsonbType::Integer | JsonbType::Real => "number",
1614 JsonbType::String => "string",
1615 JsonbType::Array => "array",
1616 JsonbType::Object => "object",
1617 };
1618 return Ok(Value::Text(s.into()));
1619 }
1620 let j = value_to_serde(v)?;
1621 let s = match j {
1622 serde_json::Value::Null => "null",
1623 serde_json::Value::Bool(_) => "boolean",
1624 serde_json::Value::Number(_) => "number",
1625 serde_json::Value::String(_) => "string",
1626 serde_json::Value::Array(_) => "array",
1627 serde_json::Value::Object(_) => "object",
1628 };
1629 Ok(Value::Text(s.into()))
1630}
1631
1632pub fn fn_array_length(v: &Value) -> Result<Value> {
1633 if let Value::Jsonb(b) = v {
1634 return match array_len_bytes(b)? {
1635 Some(n) => Ok(Value::Integer(n as i64)),
1636 None => Err(SqlError::InvalidValue(
1637 "jsonb_array_length called on non-array".into(),
1638 )),
1639 };
1640 }
1641 let j = value_to_serde(v)?;
1642 match j {
1643 serde_json::Value::Array(arr) => Ok(Value::Integer(arr.len() as i64)),
1644 _ => Err(SqlError::InvalidValue(
1645 "jsonb_array_length called on non-array".into(),
1646 )),
1647 }
1648}
1649
1650pub fn fn_object_length(v: &Value) -> Result<Value> {
1651 if let Value::Jsonb(b) = v {
1652 return match object_len_bytes(b)? {
1653 Some(n) => Ok(Value::Integer(n as i64)),
1654 None => Err(SqlError::InvalidValue(
1655 "jsonb_object_length called on non-object".into(),
1656 )),
1657 };
1658 }
1659 let j = value_to_serde(v)?;
1660 match j {
1661 serde_json::Value::Object(m) => Ok(Value::Integer(m.len() as i64)),
1662 _ => Err(SqlError::InvalidValue(
1663 "jsonb_object_length called on non-object".into(),
1664 )),
1665 }
1666}
1667
1668pub fn fn_extract_path(
1669 args: &[Value],
1670 target: crate::types::DataType,
1671 as_text: bool,
1672) -> Result<Value> {
1673 let mut j = value_to_serde(&args[0])?;
1674 for key_val in &args[1..] {
1675 if key_val.is_null() {
1676 return Ok(Value::Null);
1677 }
1678 let key = match key_val {
1679 Value::Text(s) => s.to_string(),
1680 other => other.to_string(),
1681 };
1682 match &mut j {
1683 serde_json::Value::Object(m) => {
1684 if let Some(next) = m.remove(&key) {
1685 j = next;
1686 } else {
1687 return Ok(Value::Null);
1688 }
1689 }
1690 serde_json::Value::Array(arr) => {
1691 let idx: i64 = key.parse().map_err(|_| {
1692 SqlError::InvalidValue(format!("array path key not integer: {key}"))
1693 })?;
1694 let len = arr.len() as i64;
1695 let idx = if idx < 0 { len + idx } else { idx };
1696 if (0..len).contains(&idx) {
1697 j = arr.remove(idx as usize);
1698 } else {
1699 return Ok(Value::Null);
1700 }
1701 }
1702 _ => return Ok(Value::Null),
1703 }
1704 }
1705 if as_text {
1706 match j {
1707 serde_json::Value::Null => Ok(Value::Null),
1708 serde_json::Value::String(s) => Ok(Value::Text(s.into())),
1709 other => Ok(Value::Text(
1710 serde_json::to_string(&other)
1711 .map_err(|e| SqlError::InvalidValue(format!("JSON render: {e}")))?
1712 .into(),
1713 )),
1714 }
1715 } else {
1716 serde_to_value(j, target)
1717 }
1718}
1719
1720pub fn fn_sqlite_extract(j_val: &Value, path: &Value) -> Result<Value> {
1721 let path_str = match path {
1722 Value::Text(s) => s.to_string(),
1723 _ => {
1724 return Err(SqlError::TypeMismatch {
1725 expected: "TEXT path".into(),
1726 got: path.data_type().to_string(),
1727 })
1728 }
1729 };
1730 let j = value_to_serde(j_val)?;
1731 let segments = parse_dollar_path(&path_str)?;
1732 match navigate_path(&j, &segments) {
1733 Some(serde_json::Value::Null) => Ok(Value::Null),
1734 Some(serde_json::Value::String(s)) => Ok(Value::Text(s.into())),
1735 Some(other) => Ok(Value::Text(
1736 serde_json::to_string(&other)
1737 .map_err(|e| SqlError::InvalidValue(format!("JSON render: {e}")))?
1738 .into(),
1739 )),
1740 None => Ok(Value::Null),
1741 }
1742}
1743
1744pub fn fn_valid(v: &Value) -> Result<Value> {
1745 let s = match v {
1746 Value::Text(s) => s.as_str(),
1747 Value::Json(s) => s.as_str(),
1748 _ => return Ok(Value::Boolean(false)),
1749 };
1750 Ok(Value::Boolean(
1751 serde_json::from_str::<serde_json::Value>(s).is_ok(),
1752 ))
1753}
1754
1755pub fn fn_strip_nulls(v: &Value, target: crate::types::DataType) -> Result<Value> {
1756 let mut j = value_to_serde(v)?;
1757 strip_nulls_inplace(&mut j);
1758 serde_to_value(j, target)
1759}
1760
1761fn strip_nulls_inplace(j: &mut serde_json::Value) {
1762 match j {
1763 serde_json::Value::Object(m) => {
1764 m.retain(|_, v| !matches!(v, serde_json::Value::Null));
1765 for v in m.values_mut() {
1766 strip_nulls_inplace(v);
1767 }
1768 }
1769 serde_json::Value::Array(arr) => {
1770 for v in arr.iter_mut() {
1771 strip_nulls_inplace(v);
1772 }
1773 }
1774 _ => {}
1775 }
1776}
1777
1778pub fn fn_pretty(v: &Value) -> Result<Value> {
1779 let j = value_to_serde(v)?;
1780 let s = serde_json::to_string_pretty(&j)
1781 .map_err(|e| SqlError::InvalidValue(format!("JSON pretty render: {e}")))?;
1782 Ok(Value::Text(s.into()))
1783}
1784
1785pub fn fn_build_object(args: &[Value], target: crate::types::DataType) -> Result<Value> {
1786 if args.len() % 2 != 0 {
1787 return Err(SqlError::InvalidValue(
1788 "jsonb_build_object requires an even number of arguments".into(),
1789 ));
1790 }
1791 let mut map = serde_json::Map::new();
1792 for pair in args.chunks(2) {
1793 let key = match &pair[0] {
1794 Value::Null => continue,
1795 Value::Text(s) => s.to_string(),
1796 other => other.to_string(),
1797 };
1798 let val = value_to_serde_lossy(&pair[1])?;
1799 map.insert(key, val);
1800 }
1801 serde_to_value(serde_json::Value::Object(map), target)
1802}
1803
1804pub fn fn_build_array(args: &[Value], target: crate::types::DataType) -> Result<Value> {
1805 let items: Result<Vec<serde_json::Value>> = args.iter().map(value_to_serde_lossy).collect();
1806 serde_to_value(serde_json::Value::Array(items?), target)
1807}
1808
1809pub fn fn_set(
1810 j: &Value,
1811 path: &Value,
1812 new_value: &Value,
1813 create_missing: bool,
1814 target: crate::types::DataType,
1815) -> Result<Value> {
1816 let mut root = value_to_serde(j)?;
1817 let segments = path_to_segments(path)?;
1818 let new_serde = value_to_serde_lossy(new_value)?;
1819 if !set_at_path(&mut root, &segments, new_serde, create_missing, false) {
1820 return serde_to_value(root, target);
1821 }
1822 serde_to_value(root, target)
1823}
1824
1825pub fn fn_insert(
1826 j: &Value,
1827 path: &Value,
1828 new_value: &Value,
1829 insert_after: bool,
1830 target: crate::types::DataType,
1831) -> Result<Value> {
1832 let mut root = value_to_serde(j)?;
1833 let segments = path_to_segments(path)?;
1834 let new_serde = value_to_serde_lossy(new_value)?;
1835 set_at_path(&mut root, &segments, new_serde, true, insert_after);
1836 serde_to_value(root, target)
1837}
1838
1839fn set_at_path(
1840 root: &mut serde_json::Value,
1841 segments: &[PathSeg],
1842 new_value: serde_json::Value,
1843 create_missing: bool,
1844 insert_array: bool,
1845) -> bool {
1846 if segments.is_empty() {
1847 return false;
1848 }
1849 let (last, prefix) = segments.split_last().unwrap();
1850 let Some(target) = navigate_mut(root, prefix) else {
1851 return false;
1852 };
1853 match (target, last) {
1854 (serde_json::Value::Object(m), PathSeg::Key(k)) => {
1855 let exists = m.contains_key(k.as_str());
1856 if exists || create_missing {
1857 m.insert(k.clone(), new_value);
1858 true
1859 } else {
1860 false
1861 }
1862 }
1863 (serde_json::Value::Array(arr), PathSeg::Index(i)) => {
1864 let len = arr.len() as i64;
1865 let idx = if *i < 0 { len + i } else { *i };
1866 if insert_array {
1867 let target_pos = if idx <= 0 {
1868 0
1869 } else if idx >= len {
1870 arr.len()
1871 } else {
1872 idx as usize
1873 };
1874 arr.insert(target_pos, new_value);
1875 true
1876 } else if (0..len).contains(&idx) {
1877 arr[idx as usize] = new_value;
1878 true
1879 } else if create_missing {
1880 if idx < 0 {
1881 arr.insert(0, new_value);
1882 } else {
1883 arr.push(new_value);
1884 }
1885 true
1886 } else {
1887 false
1888 }
1889 }
1890 _ => false,
1891 }
1892}
1893
1894pub fn fn_to_json(v: &Value, target: crate::types::DataType) -> Result<Value> {
1895 let j = value_to_serde_lossy(v)?;
1896 serde_to_value(j, target)
1897}
1898
1899pub fn fn_json_object(args: &[Value]) -> Result<Value> {
1900 match args.len() {
1901 1 => {
1902 let j = value_to_serde(&args[0])?;
1903 let arr = j
1904 .as_array()
1905 .ok_or_else(|| SqlError::InvalidValue("json_object expects text array".into()))?;
1906 let mut map = serde_json::Map::new();
1907 let mut i = 0;
1908 while i + 1 < arr.len() {
1909 let key = arr[i]
1910 .as_str()
1911 .ok_or_else(|| SqlError::InvalidValue("json_object key must be string".into()))?
1912 .to_string();
1913 let val = arr[i + 1].clone();
1914 map.insert(key, val);
1915 i += 2;
1916 }
1917 serde_to_value(serde_json::Value::Object(map), crate::types::DataType::Json)
1918 }
1919 2 => {
1920 let keys = text_array(&args[0])?;
1921 let vals = text_array(&args[1])?;
1922 if keys.len() != vals.len() {
1923 return Err(SqlError::InvalidValue(
1924 "json_object: keys and values must be same length".into(),
1925 ));
1926 }
1927 let mut map = serde_json::Map::new();
1928 for (k, v) in keys.into_iter().zip(vals) {
1929 map.insert(k, serde_json::Value::String(v));
1930 }
1931 serde_to_value(serde_json::Value::Object(map), crate::types::DataType::Json)
1932 }
1933 _ => Err(SqlError::InvalidValue(
1934 "json_object requires 1 or 2 arguments".into(),
1935 )),
1936 }
1937}
1938
1939fn value_to_serde_lossy(v: &Value) -> Result<serde_json::Value> {
1940 match v {
1941 Value::Null => Ok(serde_json::Value::Null),
1942 Value::Boolean(b) => Ok(serde_json::Value::Bool(*b)),
1943 Value::Integer(i) => Ok(serde_json::Value::Number((*i).into())),
1944 Value::Real(r) => serde_json::Number::from_f64(*r)
1945 .map(serde_json::Value::Number)
1946 .ok_or_else(|| SqlError::InvalidValue("non-finite number".into())),
1947 Value::Text(s) => Ok(serde_json::Value::String(s.to_string())),
1948 Value::Json(s) => serde_json::from_str(s)
1949 .map_err(|e| SqlError::InvalidValue(format!("invalid JSON: {e}"))),
1950 Value::Jsonb(b) => decode_to_serde(b),
1951 Value::Blob(b) => {
1952 let hex: String = b.iter().map(|byte| format!("{byte:02x}")).collect();
1953 Ok(serde_json::Value::String(hex))
1954 }
1955 Value::Date(_) | Value::Time(_) | Value::Timestamp(_) | Value::Interval { .. } => {
1956 Ok(serde_json::Value::String(format!("{v}")))
1957 }
1958 }
1959}
1960
1961#[cfg(test)]
1962#[path = "json_tests.rs"]
1963mod tests;