rdf_borsh/
borsh_reader.rs

1// This is free and unencumbered software released into the public domain.
2
3extern crate alloc;
4
5use alloc::{boxed::Box, collections::BTreeMap, vec::Vec};
6use borsh::{
7    io::{Read, Result},
8    BorshDeserialize,
9};
10use core::error::Error;
11use lz4_flex::frame::FrameDecoder;
12use rdf_model::{
13    Countable, Enumerable, HeapQuad, MaybeDurable, MaybeIndexed, MaybeMutable, Source, Statement,
14};
15use rdf_reader::{Format, Reader};
16
17use crate::{parse_header, BorshQuad, BorshTerm, BorshTermId};
18
19pub struct BorshReader<R: Read> {
20    decompressor: FrameDecoder<R>,
21
22    term_dict: BTreeMap<BorshTermId<u16>, BorshTerm>,
23
24    quad_count: usize,
25    read_count: usize,
26}
27
28impl<R: Read> BorshReader<R> {
29    pub fn new(mut source: R) -> Result<Self> {
30        let quad_count = {
31            let mut buf = [0u8; 10];
32            source.read_exact(&mut buf)?;
33            parse_header(&mut &buf[..]).map_err(|_| borsh::io::ErrorKind::InvalidData)?
34        };
35
36        let mut decompressor = FrameDecoder::new(source);
37
38        let term_dict = {
39            Vec::<BorshTerm>::deserialize_reader(&mut decompressor)?
40                .into_iter()
41                .fold(BTreeMap::new(), |mut acc, term| {
42                    let id = BorshTermId::from(acc.len() as u16 + 1);
43                    acc.insert(id, term);
44                    acc
45                })
46        };
47
48        let _quad_count = {
49            let mut buf = [0u8; 4];
50            decompressor.read_exact(&mut buf)?;
51            u32::from_le_bytes(buf)
52        };
53
54        // rest of `source` is the quads section block
55
56        Ok(Self {
57            decompressor,
58            quad_count: quad_count as _,
59            term_dict,
60            read_count: 0usize,
61        })
62    }
63}
64
65impl<R: Read> Reader for BorshReader<R> {
66    fn format(&self) -> Format {
67        todo!() // TODO
68    }
69}
70
71impl<R: Read> Source for BorshReader<R> {}
72impl<R: Read> Enumerable for BorshReader<R> {}
73impl<R: Read> MaybeDurable for BorshReader<R> {}
74impl<R: Read> MaybeIndexed for BorshReader<R> {}
75impl<R: Read> MaybeMutable for BorshReader<R> {}
76
77impl<R: Read> Countable for BorshReader<R> {
78    fn count(&self) -> usize {
79        self.quad_count
80    }
81}
82
83impl<R: Read> Iterator for BorshReader<R> {
84    type Item = core::result::Result<Box<dyn Statement>, Box<dyn Error>>;
85
86    fn next(&mut self) -> Option<Self::Item> {
87        if self.len() == 0 {
88            return None;
89        }
90
91        let quad = match BorshQuad::<u16>::deserialize_reader(&mut self.decompressor) {
92            Ok(q) => q,
93            Err(e) => return Some(Err(Box::new(e))),
94        };
95
96        let Some(s) = self.term_dict.get(&quad.subject) else {
97            return Some(Err(Box::new(borsh::io::Error::new(
98                borsh::io::ErrorKind::InvalidData,
99                "subject has unknown term ID",
100            ))));
101        };
102        let Some(p) = self.term_dict.get(&quad.predicate) else {
103            return Some(Err(Box::new(borsh::io::Error::new(
104                borsh::io::ErrorKind::InvalidData,
105                "predicate has unknown term ID",
106            ))));
107        };
108        let Some(o) = self.term_dict.get(&quad.object) else {
109            return Some(Err(Box::new(borsh::io::Error::new(
110                borsh::io::ErrorKind::InvalidData,
111                "object has unknown term ID",
112            ))));
113        };
114
115        let stmt = if quad.context.is_zero() {
116            HeapQuad::from((s.0.clone(), p.0.clone(), o.0.clone()))
117        } else {
118            let Some(g) = self.term_dict.get(&quad.context) else {
119                return Some(Err(Box::new(borsh::io::Error::new(
120                    borsh::io::ErrorKind::InvalidData,
121                    "context has unknown term ID",
122                ))));
123            };
124            HeapQuad::from((s.0.clone(), p.0.clone(), o.0.clone(), g.0.clone()))
125        };
126
127        self.read_count += 1;
128
129        Some(Ok(Box::new(stmt)))
130    }
131
132    fn size_hint(&self) -> (usize, Option<usize>) {
133        let rem = self.quad_count - self.read_count;
134        (rem, Some(rem))
135    }
136}
137
138impl<R: Read> ExactSizeIterator for BorshReader<R> {}