gistools/readers/json/
large_json.rs

1use super::ToGisJSON;
2use crate::{
3    geometry::ConvertFeature,
4    parsers::{FeatureReader, Reader},
5};
6use alloc::{
7    string::{String, ToString},
8    vec,
9    vec::Vec,
10};
11use core::{cell::RefCell, marker::PhantomData};
12use s2json::{Features, MValue, VectorFeature};
13use serde::de::DeserializeOwned;
14
15// TODO: Currently a mediocre solution for multi-threading, but it works for now.
16// Ideally, we find a position in the file at `(length / pool_size) * thread_id` and read from there.
17
/// `{` — opens a JSON object
const LEFT_BRACE: u8 = b'{';
/// `}` — closes a JSON object
const RIGHT_BRACE: u8 = b'}';
/// `\` — escape introducer inside JSON strings
const BACKSLASH: u8 = b'\\';
/// `"` — JSON string delimiter
const STRING: u8 = b'"';
22
/// Incremental state for scanning a (Geo|S2)JSON byte stream one chunk at a time.
#[derive(Debug, Clone)]
struct JSONParser {
    /// bytes of the chunk currently being scanned
    buffer: Vec<u8>,
    /// size in bytes of the current chunk (at most 65 536)
    chunk_size: u64,
    /// byte offset of the current chunk within the file
    offset: u64,
    /// cursor position within `buffer`
    pos: usize,
    /// current `{`/`}` nesting depth; 0 means not currently inside a feature
    brace_depth: isize,
    /// partial feature bytes accumulated when a feature straddles chunk boundaries
    feature: Vec<Vec<u8>>,
    /// index in `buffer` where the in-progress feature began, if recording
    start: Option<usize>,
    /// index in `buffer` of the feature's closing brace, once found
    end: Option<usize>,
    /// true while the cursor is OUTSIDE a JSON string literal (braces only count then)
    is_object: bool,
}
35impl Default for JSONParser {
36    fn default() -> Self {
37        JSONParser {
38            buffer: vec![],
39            chunk_size: 65_536,
40            offset: 0,
41            pos: 0,
42            brace_depth: 0,
43            feature: vec![],
44            start: None,
45            end: None,
46            is_object: true,
47        }
48    }
49}
50impl JSONParser {
51    /// Reset to the beginning
52    pub fn setup<
53        T: Reader,
54        M: Clone + DeserializeOwned,
55        P: Clone + Default + DeserializeOwned,
56        D: Clone + Default + DeserializeOwned,
57    >(
58        &mut self,
59        reader: &JSONReader<T, M, P, D>,
60    ) {
61        *self = JSONParser::default();
62        self.chunk_size = u64::min(65_536, reader.length - self.offset);
63        self.buffer = reader.reader.slice(Some(0), Some(self.chunk_size)).to_vec();
64    }
65}
66
67/// # JSON Reader
68///
69/// ## Description
70/// Parse (Geo|S2)JSON. Can handle millions of features.
71///
72/// Implements the [`FeatureReader`] trait
73///
74/// ## Usage
75/// ```rust
76/// use gistools::{parsers::{FileReader, FeatureReader}, readers::JSONReader};
77/// use std::path::PathBuf;
78///
79/// let mut path = PathBuf::from(env!("CARGO_MANIFEST_DIR"));
80/// path = path.join("tests/writers/fixtures/points.geojson");
81///
82/// let reader: JSONReader<_> = JSONReader::new(FileReader::from(path));
83/// let features: Vec<_> = reader.iter().collect();
84/// assert_eq!(features.len(), 3);
85/// ```
#[derive(Debug, Clone)]
pub struct JSONReader<
    T: Reader,
    M: Clone + DeserializeOwned = (),
    P: Clone + Default + DeserializeOwned = MValue,
    D: Clone + Default + DeserializeOwned = MValue,
> {
    /// underlying byte source
    reader: T,
    /// total length of the source in bytes
    length: u64,
    /// interior-mutable scan state so iteration can advance through `&self`
    parser: RefCell<JSONParser>,
    /// ties the feature type parameters to this reader without storing a value
    _phantom: PhantomData<VectorFeature<M, P, D>>,
}
98impl<
99    T: Reader,
100    M: Clone + DeserializeOwned,
101    P: Clone + Default + DeserializeOwned,
102    D: Clone + Default + DeserializeOwned,
103> JSONReader<T, M, P, D>
104{
105    /// Create a new JSONReader
106    pub fn new(reader: T) -> JSONReader<T, M, P, D> {
107        let length = reader.len();
108        let json_reader = JSONReader {
109            reader,
110            length,
111            parser: RefCell::new(JSONParser::default()),
112            _phantom: PhantomData,
113        };
114        json_reader.reset();
115
116        json_reader
117    }
118
119    /// Reset to the beginning
120    pub fn reset(&self) {
121        // reset the parser reading in the first chunk
122        self.parser.borrow_mut().setup(self);
123        // find out starting position
124        let set = self.set_start_position();
125        if !set {
126            panic!("File is not geojson or s2json");
127        }
128    }
129
130    /// since we know that a '{' is the start of a feature after we read a '"features"',
131    /// than we start there to avoid reading in values that are not features.
132    /// This is a modified Knuth–Morris–Pratt algorithm
133    fn set_start_position(&self) -> bool {
134        let features = "\"features\":".as_bytes();
135        let features_size = features.len();
136        let mut parser = self.parser.borrow_mut();
137
138        let mut k = 0;
139        while parser.pos < parser.chunk_size as usize {
140            if features[k] == parser.buffer[parser.pos] {
141                k += 1;
142                parser.pos += 1;
143                if k == features_size {
144                    return true;
145                }
146            } else {
147                k = 0;
148                parser.pos += 1;
149            }
150        }
151        // if we made it here, we need to read in the next buffer chunk.
152        // If we hit the end of the file, return false
153        parser.offset += parser.chunk_size;
154        if parser.offset < self.length {
155            parser.pos = 0;
156            let chunk_size = u64::min(65_536, self.length - parser.offset);
157            parser.buffer =
158                self.reader.slice(Some(parser.offset), Some(parser.offset + chunk_size));
159            drop(parser);
160            self.set_start_position()
161        } else {
162            false
163        }
164    }
165
166    fn parse_line(&self, line: &str) -> Option<VectorFeature<M, P, D>> {
167        if line.len() > 1
168            && let Ok(feature) = line.to_features()
169        {
170            match feature {
171                Features::Feature(feature) => {
172                    return Some(feature.to_vector(Some(true)));
173                }
174                Features::VectorFeature(vf) => {
175                    return Some(vf);
176                }
177            }
178        }
179        None
180    }
181
182    /// everytime we see a "{" we start 'recording' the feature. If we see more "{" on our journey, we increment.
183    /// Once we find the end of the feature, store the "start" and "end" indexes, slice the buffer and send out
184    /// as a return. If we run out of buffer to read AKA we finish the file, we return a null. If we run
185    /// out of the buffer, but we still have file left to read, just read into the buffer and continue on
186    pub fn next_feature(&self) -> Option<VectorFeature<M, P, D>> {
187        let mut parser = self.parser.borrow_mut();
188        // get started
189        while parser.pos < parser.chunk_size as usize {
190            if parser.buffer[parser.pos] == BACKSLASH {
191                parser.pos += 1;
192            } else if parser.buffer[parser.pos] == STRING {
193                parser.is_object = !parser.is_object;
194            } else if parser.buffer[parser.pos] == LEFT_BRACE && parser.is_object {
195                if parser.brace_depth == 0 {
196                    parser.start = Some(parser.pos);
197                }
198                parser.brace_depth += 1; // first brace is the start of the feature
199            } else if parser.buffer[parser.pos] == RIGHT_BRACE && parser.is_object {
200                parser.brace_depth -= 1; // if this hits zero, we are at the end of the feature
201                if parser.brace_depth == 0 {
202                    parser.end = Some(parser.pos);
203                    break;
204                }
205            }
206            parser.pos += 1;
207        }
208
209        // what if the last char in current buffer was a BACKSLASH?
210        // we need to make sure in the next buffer we account for increment
211        let chunk_size = parser.chunk_size as usize;
212        let increment_space = parser.pos.saturating_sub(chunk_size);
213
214        if let (Some(start), Some(end)) = (parser.start, parser.end) {
215            parser.pos += 1;
216            let buf: Vec<u8> = parser.buffer[start..end + 1].to_vec();
217            parser.feature.push(buf);
218            let feature = parser.feature.concat();
219            // reset variables
220            parser.feature = vec![];
221            parser.start = None;
222            parser.end = None;
223            parser.brace_depth = 0;
224            parser.is_object = true;
225            // convert feature to a &str and parse it
226            let feature_str: String = String::from_utf8_lossy(&feature).to_string();
227            self.parse_line(&feature_str)
228        } else {
229            // if offset isn't at filesize, increment buffer and start again
230            if let Some(start) = parser.start {
231                let buf = parser.buffer[start..].to_vec();
232                parser.feature.push(buf);
233                parser.start = Some(0);
234            }
235            parser.offset += parser.chunk_size;
236            if parser.offset < self.length {
237                parser.pos = if increment_space > 0 { increment_space } else { 0 };
238                parser.chunk_size = u64::min(65_536, self.length - parser.offset);
239                parser.buffer =
240                    self.reader.slice(Some(parser.offset), Some(parser.offset + parser.chunk_size));
241                drop(parser);
242                self.next_feature()
243            } else {
244                None
245            } // end of file
246        }
247    }
248}
/// Sequential [`Iterator`] over the features in the file
impl<
    T: Reader,
    M: Clone + DeserializeOwned,
    P: Clone + Default + DeserializeOwned,
    D: Clone + Default + DeserializeOwned,
> Iterator for JSONReader<T, M, P, D>
{
    type Item = VectorFeature<M, P, D>;
    /// Pull the next feature; `None` once the file is exhausted
    fn next(&mut self) -> Option<Self::Item> {
        self.next_feature()
    }
}
/// The JSON Iterator tool
#[derive(Debug)]
pub struct JSONIterator<
    'a,
    T: Reader,
    M: Clone + DeserializeOwned,
    P: Clone + Default + DeserializeOwned,
    D: Clone + Default + DeserializeOwned,
> {
    /// shared reader that produces the features
    reader: &'a JSONReader<T, M, P, D>,
    /// index of the next feature to be considered
    index: u64,
    /// number of cooperating threads (1 = plain sequential iteration)
    pool_size: u64,
    /// this iterator consumes features where `index % pool_size == thread_id`
    thread_id: u64,
}
275impl<
276    T: Reader,
277    M: Clone + DeserializeOwned,
278    P: Clone + Default + DeserializeOwned,
279    D: Clone + Default + DeserializeOwned,
280> Iterator for JSONIterator<'_, T, M, P, D>
281{
282    type Item = VectorFeature<M, P, D>;
283
284    fn next(&mut self) -> Option<Self::Item> {
285        while self.pool_size > 1 && self.index % self.pool_size != self.thread_id {
286            // skip, belongs to another thread
287            self.index += 1;
288            self.reader.next_feature();
289            continue;
290        }
291        self.index += 1;
292        self.reader.next_feature()
293    }
294}
/// [`FeatureReader`] implementation providing sequential and parallel iterators
impl<
    T: Reader,
    M: Clone + DeserializeOwned,
    P: Clone + Default + DeserializeOwned,
    D: Clone + Default + DeserializeOwned,
> FeatureReader<M, P, D> for JSONReader<T, M, P, D>
{
    type FeatureIterator<'a>
        = JSONIterator<'a, T, M, P, D>
    where
        T: 'a,
        M: 'a,
        P: 'a,
        D: 'a;

    /// Iterate over every feature, resetting to the start of the file first
    fn iter(&self) -> Self::FeatureIterator<'_> {
        self.reset();
        JSONIterator { reader: self, index: 0, pool_size: 1, thread_id: 0 }
    }

    /// Iterate over the subset of features assigned to `thread_id` out of a
    /// pool of `pool_size` threads (features are distributed round-robin by
    /// index); resets to the start of the file first
    fn par_iter(&self, pool_size: usize, thread_id: usize) -> Self::FeatureIterator<'_> {
        self.reset();
        JSONIterator {
            reader: self,
            index: 0,
            pool_size: pool_size as u64,
            thread_id: thread_id as u64,
        }
    }
}
</par_iter>
325}