rdf_borsh/
borsh_reader.rs1extern crate alloc;
4
5use alloc::{boxed::Box, collections::BTreeMap, vec::Vec};
6use borsh::{
7 io::{Read, Result},
8 BorshDeserialize,
9};
10use core::error::Error;
11use lz4_flex::frame::FrameDecoder;
12use rdf_model::{
13 Countable, Enumerable, HeapQuad, MaybeDurable, MaybeIndexed, MaybeMutable, Source, Statement,
14};
15use rdf_reader::{Format, Reader};
16
17use crate::{parse_header, BorshQuad, BorshTerm, BorshTermId};
18
19pub struct BorshReader<R: Read> {
20 decompressor: FrameDecoder<R>,
21
22 term_dict: BTreeMap<BorshTermId<u16>, BorshTerm>,
23
24 quad_count: usize,
25 read_count: usize,
26}
27
28impl<R: Read> BorshReader<R> {
29 pub fn new(mut source: R) -> Result<Self> {
30 let quad_count = {
31 let mut buf = [0u8; 10];
32 source.read_exact(&mut buf)?;
33 parse_header(&mut &buf[..]).map_err(|_| borsh::io::ErrorKind::InvalidData)?
34 };
35
36 let mut decompressor = FrameDecoder::new(source);
37
38 let term_dict = {
39 Vec::<BorshTerm>::deserialize_reader(&mut decompressor)?
40 .into_iter()
41 .fold(BTreeMap::new(), |mut acc, term| {
42 let id = BorshTermId::from(acc.len() as u16 + 1);
43 acc.insert(id, term);
44 acc
45 })
46 };
47
48 let _quad_count = {
49 let mut buf = [0u8; 4];
50 decompressor.read_exact(&mut buf)?;
51 u32::from_le_bytes(buf)
52 };
53
54 Ok(Self {
57 decompressor,
58 quad_count: quad_count as _,
59 term_dict,
60 read_count: 0usize,
61 })
62 }
63}
64
65impl<R: Read> Reader for BorshReader<R> {
66 fn format(&self) -> Format {
67 todo!() }
69}
70
71impl<R: Read> Source for BorshReader<R> {}
72impl<R: Read> Enumerable for BorshReader<R> {}
73impl<R: Read> MaybeDurable for BorshReader<R> {}
74impl<R: Read> MaybeIndexed for BorshReader<R> {}
75impl<R: Read> MaybeMutable for BorshReader<R> {}
76
77impl<R: Read> Countable for BorshReader<R> {
78 fn count(&self) -> usize {
79 self.quad_count
80 }
81}
82
83impl<R: Read> Iterator for BorshReader<R> {
84 type Item = core::result::Result<Box<dyn Statement>, Box<dyn Error>>;
85
86 fn next(&mut self) -> Option<Self::Item> {
87 if self.len() == 0 {
88 return None;
89 }
90
91 let quad = match BorshQuad::<u16>::deserialize_reader(&mut self.decompressor) {
92 Ok(q) => q,
93 Err(e) => return Some(Err(Box::new(e))),
94 };
95
96 let Some(s) = self.term_dict.get(&quad.subject) else {
97 return Some(Err(Box::new(borsh::io::Error::new(
98 borsh::io::ErrorKind::InvalidData,
99 "subject has unknown term ID",
100 ))));
101 };
102 let Some(p) = self.term_dict.get(&quad.predicate) else {
103 return Some(Err(Box::new(borsh::io::Error::new(
104 borsh::io::ErrorKind::InvalidData,
105 "predicate has unknown term ID",
106 ))));
107 };
108 let Some(o) = self.term_dict.get(&quad.object) else {
109 return Some(Err(Box::new(borsh::io::Error::new(
110 borsh::io::ErrorKind::InvalidData,
111 "object has unknown term ID",
112 ))));
113 };
114
115 let stmt = if quad.context.is_zero() {
116 HeapQuad::from((s.0.clone(), p.0.clone(), o.0.clone()))
117 } else {
118 let Some(g) = self.term_dict.get(&quad.context) else {
119 return Some(Err(Box::new(borsh::io::Error::new(
120 borsh::io::ErrorKind::InvalidData,
121 "context has unknown term ID",
122 ))));
123 };
124 HeapQuad::from((s.0.clone(), p.0.clone(), o.0.clone(), g.0.clone()))
125 };
126
127 self.read_count += 1;
128
129 Some(Ok(Box::new(stmt)))
130 }
131
132 fn size_hint(&self) -> (usize, Option<usize>) {
133 let rem = self.quad_count - self.read_count;
134 (rem, Some(rem))
135 }
136}
137
138impl<R: Read> ExactSizeIterator for BorshReader<R> {}