1use crate::copyterm::{self, TermBuf};
24use crate::machine::Machine;
25use crate::render::RenderedSolution;
26use std::io::{self, Write};
27
/// One query result as handed to an encoder's `write_envelope` hook.
///
/// Borrowed view: the solution list, captured output and atom table all live elsewhere
/// (typically in the [`Machine`], see [`Envelope::from_machine`]).
pub struct Envelope<'a> {
    /// Number of solutions reported; `from_machine` sets it to `m.solutions.len()`.
    /// The BSON encoder emits it as int32 `count`, saturating at `i32::MAX`.
    pub count: usize,
    /// Caller-supplied flag, presumably "no further solutions exist" — confirm with callers.
    /// Emitted as BSON bool `exhausted`; the text encoder ignores it.
    pub exhausted: bool,
    /// Rendered solutions, in order.
    pub solutions: &'a [RenderedSolution],
    /// Captured program output, if any. Emitted as BSON string `output` when `Some`;
    /// the text encoder ignores it.
    pub program_output: Option<&'a str>,
    /// Optional atom-name table. Emitted as the BSON `atoms` array when `Some`;
    /// `from_machine` leaves it `None`.
    pub atoms: Option<&'a [String]>,
}
44
45impl<'a> Envelope<'a> {
46 pub fn from_machine(m: &'a Machine, exhausted: bool) -> Self {
50 Self {
51 count: m.solutions.len(),
52 exhausted,
53 solutions: &m.solutions,
54 program_output: m.captured_output(),
55 atoms: None,
56 }
57 }
58}
59
/// Error payload handed to an encoder's `write_error` hook.
///
/// Both encoders in this file emit only the message string and render the two variants
/// identically.
pub enum WireError {
    /// Presumably a failure to parse the request/query text — confirm with producers.
    Parse(String),
    /// Presumably a failure raised while running the query — confirm with producers.
    Runtime(String),
}
70
/// Descriptor for one output encoder, exported as a `#[no_mangle]` static
/// (`PLG_ENC_TEXT`, `PLG_ENC_BSON`) and looked up by name with [`EncoderDesc::find`].
///
/// NOTE(review): `#[repr(C)]` fixes the field order, but `&'static str` is a Rust fat pointer
/// and the `fn` pointers use the Rust ABI (not `extern "C"`) and take `&mut dyn Write`, so the
/// struct is only usable from Rust code compiled against these same definitions — confirm
/// that is the intended consumer.
#[repr(C)]
pub struct EncoderDesc {
    /// Lookup key compared by `EncoderDesc::find` (`"text"`, `"bson"` in this file).
    pub name: &'static str,
    /// Writes a complete result envelope.
    pub write_envelope: fn(&mut dyn Write, &Machine, &Envelope) -> io::Result<()>,
    /// Writes an error reply.
    pub write_error: fn(&mut dyn Write, &WireError) -> io::Result<()>,
    /// Streaming capability flag: `true` for the text encoder, `false` for BSON. Presumably
    /// tells the caller whether results may be written incrementally — confirm with callers.
    pub can_stream: fn() -> bool,
}
87
88impl EncoderDesc {
89 pub unsafe fn find(
95 caps: *const *const EncoderDesc,
96 len: usize,
97 name: &str,
98 ) -> Option<&'static EncoderDesc> {
99 let slice = unsafe { std::slice::from_raw_parts(caps, len) };
100 for &p in slice {
101 let d = unsafe { &*p };
102 if d.name == name {
103 return Some(d);
104 }
105 }
106 None
107 }
108}
109
110fn text_write_envelope(w: &mut dyn Write, _m: &Machine, e: &Envelope) -> io::Result<()> {
120 if e.solutions.is_empty() {
121 return w.write_all(b"false.\n");
122 }
123 for sol in e.solutions {
124 if sol.bindings.is_empty() {
125 w.write_all(b"true.\n")?;
126 continue;
127 }
128 for b in &sol.bindings {
129 writeln!(w, "{} = {}", b.name, b.text)?;
130 }
131 }
132 Ok(())
133}
134
135fn text_write_error(w: &mut dyn Write, err: &WireError) -> io::Result<()> {
136 let msg = match err {
137 WireError::Parse(m) | WireError::Runtime(m) => m,
138 };
139 writeln!(w, "error: {msg}")
140}
141
/// Streaming capability of the text encoder: always `true`.
/// Installed as `PLG_ENC_TEXT.can_stream`.
const fn text_can_stream() -> bool {
    true
}
145
/// Exported descriptor for the line-oriented text encoder (`name == "text"`).
///
/// `#[unsafe(no_mangle)]` exports it under the unmangled symbol `PLG_ENC_TEXT`; presumably a
/// host resolves it by that symbol name — confirm with the loader.
#[unsafe(no_mangle)]
pub static PLG_ENC_TEXT: EncoderDesc = EncoderDesc {
    name: "text",
    write_envelope: text_write_envelope,
    write_error: text_write_error,
    can_stream: text_can_stream,
};
154
155fn serialize_termbuf(tb: &TermBuf) -> Vec<u8> {
184 let mut out = Vec::with_capacity(13 + tb.cells.len() * 8);
185 out.push(0x01); out.extend_from_slice(&(tb.cells.len() as u32).to_le_bytes());
187 out.extend_from_slice(&tb.root.to_le_bytes());
188 for c in &tb.cells {
189 out.extend_from_slice(&c.to_le_bytes());
190 }
191 out
192}
193
// BSON element type tags: the byte that precedes each element's key (BSON 1.1 spec).
// `skip_value` also recognises a few more tags by their literal values.
/// UTF-8 string: int32 length (counting the trailing NUL), the bytes, then NUL.
const T_STRING: u8 = 0x02;
/// Embedded document.
const T_DOCUMENT: u8 = 0x03;
/// Array, encoded as a document keyed by decimal indices "0", "1", ….
const T_ARRAY: u8 = 0x04;
/// Binary: int32 payload length, one subtype byte, then the payload.
const T_BINARY: u8 = 0x05;
/// Boolean: a single byte, 0x00 or 0x01.
const T_BOOL: u8 = 0x08;
/// 32-bit signed little-endian integer.
const T_INT32: u8 = 0x10;
/// 64-bit signed little-endian integer.
const T_INT64: u8 = 0x12;
202
/// Appends `s` as a BSON `cstring`: its UTF-8 bytes followed by a single NUL.
///
/// NOTE(review): interior NUL bytes in `s` are not rejected; a BSON reader would see the key
/// end at the first one.
fn bson_cstring(buf: &mut Vec<u8>, s: &str) {
    buf.reserve(s.len() + 1);
    buf.extend(s.bytes());
    buf.push(b'\0');
}
207
/// Opens a BSON document by reserving a zeroed 4-byte length prefix.
///
/// Returns the prefix offset, which must later be passed to `bson_doc_end`.
fn bson_doc_begin(buf: &mut Vec<u8>) -> usize {
    let prefix_at = buf.len();
    buf.resize(prefix_at + 4, 0);
    prefix_at
}
213
/// Closes a document opened by `bson_doc_begin` at `start`.
///
/// Appends the NUL terminator and back-patches the int32 little-endian length prefix. The
/// length counts the prefix itself and the terminator.
///
/// # Panics
///
/// If the finished document is 2 GiB or larger, or `start` does not leave room for the prefix.
fn bson_doc_end(buf: &mut Vec<u8>, start: usize) {
    buf.push(0x00);
    let doc_len = buf.len() - start;
    let prefix = i32::try_from(doc_len).expect("bson doc < 2GB").to_le_bytes();
    buf[start..start + 4].copy_from_slice(&prefix);
}
219
220fn bson_atoms_array(buf: &mut Vec<u8>, names: &[String]) {
223 buf.push(T_ARRAY);
224 bson_cstring(buf, "atoms");
225 let arr = bson_doc_begin(buf);
226 for (i, name) in names.iter().enumerate() {
227 buf.push(T_STRING);
228 bson_cstring(buf, &i.to_string());
229 let len = i32::try_from(name.len() + 1).expect("atom name < 2GB");
230 buf.extend_from_slice(&len.to_le_bytes());
231 buf.extend_from_slice(name.as_bytes());
232 buf.push(0x00);
233 }
234 bson_doc_end(buf, arr);
235}
236
237pub fn write_atom_map_bson<W: Write>(w: &mut W, m: &Machine) -> io::Result<()> {
242 let names: Vec<String> = (0..m.atoms.len())
243 .map(|i| {
244 m.atoms
245 .try_resolve(i as u32)
246 .unwrap_or_default()
247 .to_string()
248 })
249 .collect();
250 let mut buf = Vec::new();
251 let doc = bson_doc_begin(&mut buf);
252 buf.push(T_INT32);
253 bson_cstring(&mut buf, "count");
254 buf.extend_from_slice(&(names.len().min(i32::MAX as usize) as i32).to_le_bytes());
255 bson_atoms_array(&mut buf, &names);
256 bson_doc_end(&mut buf, doc);
257 w.write_all(&buf)
258}
259
260pub fn write_atom_map_text<W: Write>(w: &mut W, m: &Machine) -> io::Result<()> {
262 for i in 0..m.atoms.len() {
263 let name = m.atoms.try_resolve(i as u32).unwrap_or_default();
264 writeln!(w, "{i}\t{name}")?;
265 }
266 Ok(())
267}
268
269fn bson_write_envelope(w: &mut dyn Write, m: &Machine, e: &Envelope) -> io::Result<()> {
270 let mut buf = Vec::new();
271 let doc = bson_doc_begin(&mut buf);
272
273 buf.push(T_INT32);
274 bson_cstring(&mut buf, "count");
275 buf.extend_from_slice(&(e.count.min(i32::MAX as usize) as i32).to_le_bytes());
276
277 buf.push(T_BOOL);
278 bson_cstring(&mut buf, "exhausted");
279 buf.push(if e.exhausted { 0x01 } else { 0x00 });
280
281 if let Some(out) = e.program_output {
282 buf.push(T_STRING);
283 bson_cstring(&mut buf, "output");
284 let len = i32::try_from(out.len() + 1).expect("output string < 2GB");
285 buf.extend_from_slice(&len.to_le_bytes());
286 buf.extend_from_slice(out.as_bytes());
287 buf.push(0x00);
288 }
289
290 if let Some(names) = e.atoms {
294 bson_atoms_array(&mut buf, names);
295 }
296
297 buf.push(T_ARRAY);
298 bson_cstring(&mut buf, "solutions");
299 let arr = bson_doc_begin(&mut buf);
300 for (i, sol) in e.solutions.iter().enumerate() {
301 buf.push(T_DOCUMENT);
302 bson_cstring(&mut buf, &i.to_string());
303 let sdoc = bson_doc_begin(&mut buf);
304 for b in &sol.bindings {
305 let tb = copyterm::copy_to_buf(m, b.word);
306 let payload = serialize_termbuf(&tb);
307 buf.push(T_BINARY);
308 bson_cstring(&mut buf, &b.name);
309 let len = i32::try_from(payload.len()).expect("termbuf < 2GB");
310 buf.extend_from_slice(&len.to_le_bytes());
311 buf.push(0x00); buf.extend_from_slice(&payload);
313 }
314 bson_doc_end(&mut buf, sdoc);
315 }
316 bson_doc_end(&mut buf, arr);
317
318 bson_doc_end(&mut buf, doc);
319 w.write_all(&buf)
320}
321
322fn bson_write_error(w: &mut dyn Write, err: &WireError) -> io::Result<()> {
323 let msg = match err {
324 WireError::Parse(m) | WireError::Runtime(m) => m,
325 };
326 let mut buf = Vec::new();
327 let doc = bson_doc_begin(&mut buf);
328 buf.push(T_STRING);
329 bson_cstring(&mut buf, "error");
330 let len = i32::try_from(msg.len() + 1).expect("error message < 2GB");
331 buf.extend_from_slice(&len.to_le_bytes());
332 buf.extend_from_slice(msg.as_bytes());
333 buf.push(0x00);
334 bson_doc_end(&mut buf, doc);
335 w.write_all(&buf)
336}
337
/// Streaming capability of the BSON encoder: always `false`.
/// Each envelope is built in memory as one document before it is written.
const fn bson_can_stream() -> bool {
    false
}
341
/// Exported descriptor for the BSON encoder (`name == "bson"`).
///
/// `#[unsafe(no_mangle)]` exports it under the unmangled symbol `PLG_ENC_BSON`; presumably a
/// host resolves it by that symbol name — confirm with the loader.
#[unsafe(no_mangle)]
pub static PLG_ENC_BSON: EncoderDesc = EncoderDesc {
    name: "bson",
    write_envelope: bson_write_envelope,
    write_error: bson_write_error,
    can_stream: bson_can_stream,
};
349
/// A client request decoded by `parse_bson_request`.
#[derive(Debug)]
pub struct ParsedRequest {
    /// Query text from the required `query` string field.
    pub query: String,
    /// Value of the optional int32/int64 `limit` field (negative values clamp to 0);
    /// `None` when the field is absent.
    pub limit: Option<usize>,
}
366
367pub fn parse_bson_request(buf: &[u8]) -> Result<ParsedRequest, String> {
372 if buf.len() < 5 {
373 return Err("bson request too short".to_string());
374 }
375 let total = i32::from_le_bytes(buf[0..4].try_into().unwrap()) as usize;
376 if total < 5 || total > buf.len() {
377 return Err(format!(
378 "bson request length mismatch: declared {total}, have {}",
379 buf.len()
380 ));
381 }
382 let body = &buf[..total];
383 let mut off = 4; let end = total - 1; let mut query = None;
386 let mut limit = None;
387 while off < end {
388 let ty = body[off];
389 off += 1;
390 let (key, after_key) = read_cstring(body, off)?;
391 off = after_key;
392 match (ty, key.as_str()) {
393 (T_STRING, "query") => {
394 let (s, next) = read_string(body, off)?;
395 query = Some(s);
396 off = next;
397 }
398 (T_INT32, "limit") => {
399 let n = read_i32(body, off)?;
400 limit = Some(n.max(0) as usize);
401 off += 4;
402 }
403 (T_INT64, "limit") => {
404 let n = read_i64(body, off)?;
405 limit = Some(n.max(0) as usize);
406 off += 8;
407 }
408 _ => {
409 off = skip_value(body, off, ty)?;
410 }
411 }
412 }
413 let query = query.ok_or_else(|| "bson request missing required 'query' string".to_string())?;
414 Ok(ParsedRequest { query, limit })
415}
416
/// Reads a NUL-terminated UTF-8 BSON key (`cstring`) starting at `off`.
///
/// Returns the key and the offset just past its terminator. An `off` at or beyond the end of
/// `buf` is reported as a missing terminator instead of panicking.
///
/// # Errors
///
/// If no NUL byte follows `off`, or the key bytes are not valid UTF-8.
fn read_cstring(buf: &[u8], off: usize) -> Result<(String, usize), String> {
    let rest = buf.get(off..).unwrap_or_default();
    let len = rest
        .iter()
        .position(|&b| b == 0x00)
        .ok_or_else(|| "bson key not null-terminated".to_string())?;
    let key =
        std::str::from_utf8(&rest[..len]).map_err(|_| "bson key not utf-8".to_string())?;
    Ok((key.to_owned(), off + len + 1))
}
428
429fn read_string(buf: &[u8], off: usize) -> Result<(String, usize), String> {
430 let n = read_i32(buf, off)? as usize;
431 if n == 0 || off + 4 + n > buf.len() {
432 return Err("bson string length out of range".to_string());
433 }
434 let s = std::str::from_utf8(&buf[off + 4..off + 4 + n - 1])
435 .map_err(|_| "bson string not utf-8".to_string())?
436 .to_string();
437 Ok((s, off + 4 + n))
438}
439
/// Reads a little-endian `i32` at `off`.
///
/// # Errors
///
/// If fewer than four bytes are available at `off`, including when `off` is past the end of
/// `buf`.
fn read_i32(buf: &[u8], off: usize) -> Result<i32, String> {
    off.checked_add(4)
        .and_then(|end| buf.get(off..end))
        .and_then(|bytes| <[u8; 4]>::try_from(bytes).ok())
        .map(i32::from_le_bytes)
        .ok_or_else(|| "bson int32 truncated".to_string())
}
446
/// Reads a little-endian `i64` at `off`.
///
/// # Errors
///
/// If fewer than eight bytes are available at `off`, including when `off` is past the end of
/// `buf`.
fn read_i64(buf: &[u8], off: usize) -> Result<i64, String> {
    off.checked_add(8)
        .and_then(|end| buf.get(off..end))
        .and_then(|bytes| <[u8; 8]>::try_from(bytes).ok())
        .map(i64::from_le_bytes)
        .ok_or_else(|| "bson int64 truncated".to_string())
}
453
454fn skip_value(buf: &[u8], off: usize, ty: u8) -> Result<usize, String> {
456 match ty {
457 0x01 => Ok(off + 8), T_STRING => Ok(read_string(buf, off)?.1), T_DOCUMENT | T_ARRAY => {
460 let n = read_i32(buf, off)? as usize;
461 Ok(off + n)
462 }
463 T_BINARY => {
464 let n = read_i32(buf, off)? as usize;
465 Ok(off + 4 + 1 + n)
466 }
467 T_BOOL => Ok(off + 1),
468 0x0A => Ok(off), T_INT32 => Ok(off + 4),
470 T_INT64 => Ok(off + 8),
471 _ => Err(format!("bson: cannot skip unknown element type {ty:#x}")),
472 }
473}
474
#[cfg(test)]
mod tests {
    //! Unit tests for the text/BSON encoders, the termbuf framing and the BSON request parser.

    use super::*;
    use crate::cell::{TAG_STR, make, make_atom, pack_functor, payload, tag_of};
    use plg_shared::StringInterner;

    /// Builds a fresh `Machine` from an empty interner and an empty `Vec`.
    fn machine() -> Box<Machine> {
        Machine::new(StringInterner::new(), Vec::new())
    }

    /// Runs a writer callback against an in-memory buffer and returns the bytes written,
    /// panicking on an I/O error.
    fn bytes(f: impl FnOnce(&mut Vec<u8>) -> io::Result<()>) -> Vec<u8> {
        let mut buf = Vec::new();
        f(&mut buf).unwrap();
        buf
    }

    /// Builds one single-binding solution per `(name, text)` pair; each binding's `word`
    /// is atom 0.
    fn solutions_with(bindings: Vec<(&str, &str)>) -> Vec<RenderedSolution> {
        bindings
            .into_iter()
            .map(|(n, t)| RenderedSolution {
                bindings: vec![crate::render::Binding {
                    name: n.to_string(),
                    text: t.to_string(),
                    word: make_atom(0),
                }],
            })
            .collect()
    }

    #[test]
    fn text_empty_is_false() {
        let e = Envelope {
            count: 0,
            exhausted: true,
            solutions: &[],
            program_output: None,
            atoms: None,
        };
        assert_eq!(
            String::from_utf8(bytes(|w| (PLG_ENC_TEXT.write_envelope)(w, &machine(), &e))).unwrap(),
            "false.\n"
        );
    }

    #[test]
    fn text_renders_bindings_and_true() {
        let sols = solutions_with(vec![("X", "auth")]);
        let empty_sols = vec![RenderedSolution { bindings: vec![] }];
        let e1 = Envelope {
            count: 1,
            exhausted: false,
            solutions: &sols,
            program_output: None,
            atoms: None,
        };
        assert_eq!(
            String::from_utf8(bytes(|w| (PLG_ENC_TEXT.write_envelope)(w, &machine(), &e1)))
                .unwrap(),
            "X = auth\n"
        );
        let e2 = Envelope {
            count: 1,
            exhausted: true,
            solutions: &empty_sols,
            program_output: None,
            atoms: None,
        };
        assert_eq!(
            String::from_utf8(bytes(|w| (PLG_ENC_TEXT.write_envelope)(w, &machine(), &e2)))
                .unwrap(),
            "true.\n"
        );
    }

    #[test]
    fn descriptors_named_and_streaming() {
        assert_eq!(PLG_ENC_TEXT.name, "text");
        assert_eq!(PLG_ENC_BSON.name, "bson");
        assert!((PLG_ENC_TEXT.can_stream)());
        assert!(!(PLG_ENC_BSON.can_stream)());
    }

    #[test]
    fn find_locates_advertised_encoders() {
        let caps: [*const EncoderDesc; 2] = [&PLG_ENC_TEXT, &PLG_ENC_BSON];
        assert_eq!(
            unsafe { EncoderDesc::find(caps.as_ptr(), 2, "text") }
                .unwrap()
                .name,
            "text"
        );
        assert_eq!(
            unsafe { EncoderDesc::find(caps.as_ptr(), 2, "bson") }
                .unwrap()
                .name,
            "bson"
        );
        assert!(unsafe { EncoderDesc::find(caps.as_ptr(), 2, "json") }.is_none());
    }

    #[test]
    fn find_omitted_encoder_is_none() {
        let caps: [*const EncoderDesc; 1] = [&PLG_ENC_TEXT];
        assert!(unsafe { EncoderDesc::find(caps.as_ptr(), 1, "bson") }.is_none());
    }

    /// Reads the int32 little-endian length prefix of a BSON document.
    fn bson_doc_len(buf: &[u8]) -> i32 {
        i32::from_le_bytes(buf[0..4].try_into().unwrap())
    }

    /// Asserts that `buf` is exactly one document: its prefix matches its length and it ends
    /// in NUL.
    fn assert_valid_bson_doc(buf: &[u8]) {
        assert_eq!(
            bson_doc_len(buf) as usize,
            buf.len(),
            "bson doc self-delimits"
        );
        assert_eq!(
            *buf.last().unwrap(),
            0x00,
            "bson doc ends in null terminator"
        );
    }

    #[test]
    fn bson_empty_envelope_self_delimits() {
        let m = machine();
        let e = Envelope {
            count: 0,
            exhausted: true,
            solutions: &[],
            program_output: None,
            atoms: None,
        };
        let buf = bytes(|w| (PLG_ENC_BSON.write_envelope)(w, &m, &e));
        assert_valid_bson_doc(&buf);
        assert!(contains_key(&buf, b"count"));
        assert!(contains_key(&buf, b"exhausted"));
        assert!(contains_key(&buf, b"solutions"));
    }

    #[test]
    fn bson_error_document_valid() {
        let buf = bytes(|w| (PLG_ENC_BSON.write_error)(w, &WireError::Runtime("boom".into())));
        assert_valid_bson_doc(&buf);
        assert!(contains_key(&buf, b"error"));
    }

    #[test]
    fn atom_map_bson_self_delimits() {
        let m = machine();
        let buf = bytes(|w| write_atom_map_bson(w, &m));
        assert_valid_bson_doc(&buf);
        assert!(contains_key(&buf, b"count"));
        assert!(contains_key(&buf, b"atoms"));
    }

    /// True if `key` followed by a NUL (i.e. encoded as a BSON key) occurs anywhere in `buf`.
    fn contains_key(buf: &[u8], key: &[u8]) -> bool {
        let mut needle = key.to_vec();
        needle.push(0x00);
        buf.windows(needle.len()).any(|w| w == needle.as_slice())
    }

    /// Test-only inverse of `serialize_termbuf`.
    fn deserialize_termbuf(data: &[u8]) -> TermBuf {
        assert_eq!(data[0], 0x01, "format version");
        let n = u32::from_le_bytes(data[1..5].try_into().unwrap()) as usize;
        let root = u64::from_le_bytes(data[5..13].try_into().unwrap());
        let mut cells = Vec::with_capacity(n);
        for i in 0..n {
            let off = 13 + i * 8;
            cells.push(u64::from_le_bytes(data[off..off + 8].try_into().unwrap()));
        }
        TermBuf { cells, root }
    }

    #[test]
    fn termbuf_framing_roundtrips_scalar_and_cycle() {
        let m = machine();
        let a = make_atom(7);
        let tb = copyterm::copy_to_buf(&m, a);
        assert!(tb.cells.is_empty());
        let rt = deserialize_termbuf(&serialize_termbuf(&tb));
        assert_eq!(rt.root, a);

        // Build the cyclic term X = f(X) and check it survives the framing round trip.
        let mut m = machine();
        let x = m.new_var();
        let s = {
            let i = m.heap.len();
            m.heap.push(pack_functor(3, 1));
            m.heap.push(x);
            make(TAG_STR, i as u64)
        };
        m.bind(payload(x) as usize, s);
        let tb = copyterm::copy_to_buf(&m, s);
        let rt = deserialize_termbuf(&serialize_termbuf(&tb));
        let restored = copyterm::restore_from_buf(&mut m, &rt);
        assert_eq!(tag_of(restored), TAG_STR);
        let ri = payload(restored) as usize;
        assert_eq!(
            m.deref(m.heap[ri + 1]),
            restored,
            "f(X) arg is the term itself"
        );
    }

    /// Builds a BSON request document from `(type tag, key, raw value bytes)` triples.
    fn req_doc(fields: &[(u8, &str, &[u8])]) -> Vec<u8> {
        let mut buf = Vec::new();
        let start = bson_doc_begin(&mut buf);
        for (ty, key, val) in fields {
            buf.push(*ty);
            bson_cstring(&mut buf, key);
            buf.extend_from_slice(val);
        }
        bson_doc_end(&mut buf, start);
        buf
    }
    fn bson_int32(n: i32) -> Vec<u8> {
        n.to_le_bytes().to_vec()
    }
    fn bson_int64(n: i64) -> Vec<u8> {
        n.to_le_bytes().to_vec()
    }
    fn bson_str(s: &str) -> Vec<u8> {
        let mut v = (s.len() as i32 + 1).to_le_bytes().to_vec();
        v.extend_from_slice(s.as_bytes());
        v.push(0x00);
        v
    }

    #[test]
    fn parses_query_and_int32_limit() {
        let doc = req_doc(&[
            (T_STRING, "query", &bson_str("p(X)")),
            (T_INT32, "limit", &bson_int32(5)),
        ]);
        let r = parse_bson_request(&doc).unwrap();
        assert_eq!(r.query, "p(X)");
        assert_eq!(r.limit, Some(5));
    }

    #[test]
    fn parses_int64_limit_and_clamps_negative() {
        let doc = req_doc(&[
            (T_STRING, "query", &bson_str("q")),
            (T_INT64, "limit", &bson_int64(7)),
        ]);
        assert_eq!(parse_bson_request(&doc).unwrap().limit, Some(7));
        let doc = req_doc(&[
            (T_STRING, "query", &bson_str("q")),
            (T_INT32, "limit", &bson_int32(-3)),
        ]);
        assert_eq!(parse_bson_request(&doc).unwrap().limit, Some(0));
    }

    #[test]
    fn ignores_unknown_fields() {
        let doc = req_doc(&[
            (T_STRING, "caller", &bson_str("x")),
            (T_STRING, "query", &bson_str("ok")),
        ]);
        assert_eq!(parse_bson_request(&doc).unwrap().query, "ok");
    }

    #[test]
    fn skips_unrelated_scalar_fields() {
        let doc = req_doc(&[
            (T_INT32, "depth", &bson_int32(2)),
            (T_STRING, "query", &bson_str("ok")),
        ]);
        let r = parse_bson_request(&doc).unwrap();
        assert_eq!(r.query, "ok");
        assert_eq!(r.limit, None);
    }

    #[test]
    fn missing_query_is_an_error() {
        let doc = req_doc(&[(T_INT32, "limit", &bson_int32(3))]);
        assert!(
            parse_bson_request(&doc)
                .unwrap_err()
                .contains("missing required 'query'")
        );
    }

    #[test]
    fn rejects_truncated_and_mismatched_lengths() {
        assert!(parse_bson_request(&[5, 0, 0]).unwrap_err().contains("too short"));
        let mut doc = req_doc(&[(T_STRING, "query", &bson_str("q"))]);
        doc.truncate(doc.len() - 1);
        assert!(parse_bson_request(&doc).unwrap_err().contains("length mismatch"));
    }
}