// gistools/readers/json/line_delimited.rs

1use super::ToGisJSON;
2use crate::{
3    geometry::ConvertFeature,
4    parsers::{FeatureReader, Reader},
5};
6use alloc::{
7    collections::VecDeque,
8    string::{String, ToString},
9};
10use core::{cell::RefCell, marker::PhantomData};
11use s2json::{Features, MValue, VectorFeature};
12use serde::de::DeserializeOwned;
13
/// Mutable read state used by the `&self` methods of [`NewLineDelimitedJSONReader`].
#[derive(Clone, Debug)]
struct NewLineDelimitedJSONParser {
    /// Byte offset of the next chunk to request from the underlying reader.
    offset: u64,
    /// Byte offset (exclusive) at which reading stops.
    end: u64,
    /// Records already split out of the input but not yet handed out.
    tmp_chunks: VecDeque<String>,
    /// Text after the last separator seen so far: a record still waiting for its end.
    partial_line: String,
}

22/// # NewLine Delimited JSON Reader
23///
24/// ## Description
25///
26/// Parse (Geo|S2)JSON from a file that is in a newline-delimited format
27///
28/// Implements the [`FeatureReader`] trait
29///
30/// ## Usage
31/// ```rust
32/// use gistools::{parsers::{FileReader, FeatureReader}, readers::NewLineDelimitedJSONReader};
33/// use s2json::{Properties, MValue};
34/// use std::path::PathBuf;
35///
36/// let mut path = PathBuf::from(env!("CARGO_MANIFEST_DIR"));
37/// path = path.join("tests/readers/json/fixtures/points.geojsonld");
38///
39/// let reader: NewLineDelimitedJSONReader<_> = NewLineDelimitedJSONReader::new(FileReader::from(path), None);
40/// let features: Vec<_> = reader.iter().collect();
41/// assert_eq!(features.len(), 3);
42/// ```
43#[derive(Debug, Clone)]
44pub struct NewLineDelimitedJSONReader<
45    T: Reader,
46    M: Clone + DeserializeOwned = (),
47    P: Clone + Default + DeserializeOwned = MValue,
48    D: Clone + Default + DeserializeOwned = MValue,
49> {
50    reader: T,
51    seperator: char, // default is '\n'
52    parser: RefCell<NewLineDelimitedJSONParser>,
53    _phantom: PhantomData<VectorFeature<M, P, D>>,
54}
55impl<
56    T: Reader,
57    M: Clone + DeserializeOwned,
58    P: Clone + Default + DeserializeOwned,
59    D: Clone + Default + DeserializeOwned,
60> NewLineDelimitedJSONReader<T, M, P, D>
61{
62    /// Create a Newline-Delimited JSON Reader
63    pub fn new(reader: T, seperator: Option<char>) -> NewLineDelimitedJSONReader<T, M, P, D> {
64        let end = reader.len();
65        NewLineDelimitedJSONReader {
66            reader,
67            _phantom: PhantomData,
68            seperator: seperator.unwrap_or('\n'),
69            parser: RefCell::new(NewLineDelimitedJSONParser {
70                offset: 0,
71                end,
72                tmp_chunks: VecDeque::new(),
73                partial_line: String::new(),
74            }),
75        }
76    }
77
78    /// Reset to the beginning
79    pub fn reset(&self) {
80        let mut parser = self.parser.borrow_mut();
81        parser.offset = 0;
82        parser.tmp_chunks.clear();
83        parser.partial_line.clear();
84    }
85
86    /// Set a new position in the file for parallel processing
87    pub fn par_seek(&self, pool_size: u64, thread_id: u64) {
88        self.reset();
89        // setup chunk size, start and end
90        let len = self.reader.len();
91        let chunk_size = len.div_ceil(pool_size);
92        let mut start = thread_id.saturating_mul(chunk_size);
93        let mut end = u64::min(start + chunk_size, len);
94        // align to separators
95        if thread_id > 0 {
96            start = align_to_separator(&self.reader, start, end, self.seperator);
97        }
98        if thread_id < pool_size - 1 {
99            end = align_to_separator(&self.reader, end, len, self.seperator);
100        }
101        // update parser
102        let mut parser = self.parser.borrow_mut();
103        parser.offset = start;
104        parser.end = end;
105    }
106
107    /// Get the next feature
108    pub fn next_feature(&self) -> Option<VectorFeature<M, P, D>> {
109        let mut parser = self.parser.borrow_mut();
110
111        // 1) Serve from buffer if available
112        if let Some(line) = parser.tmp_chunks.pop_front() {
113            return self.parse_line(&line);
114        }
115
116        // 2) Refill buffer from reader
117        if parser.offset < parser.end {
118            let length = u64::min(65_536, parser.end - parser.offset);
119            let chunk = self.reader.parse_string(Some(parser.offset), Some(length));
120            // Prepend any leftover partial line
121            let combined = core::mem::take(&mut parser.partial_line) + &chunk;
122            // Split on separator (e.g. '\n') into complete lines
123            let mut parts: VecDeque<String> = combined
124                .split(self.seperator)
125                .map(str::to_string)
126                .filter(|s| !s.is_empty())
127                .collect();
128            // Handle trailing separator
129            parser.partial_line = if combined.ends_with(self.seperator) {
130                String::new()
131            } else {
132                parts.pop_back().unwrap_or_default()
133            };
134
135            parser.tmp_chunks = parts;
136            parser.offset += length;
137
138            return parser.tmp_chunks.pop_front().and_then(|line| self.parse_line(&line));
139        }
140
141        // 3) Final cleanup: parse trailing partial line if any
142        if !parser.partial_line.is_empty() {
143            let line = core::mem::take(&mut parser.partial_line);
144            let feature = self.parse_line(&line);
145            parser.partial_line.clear();
146            return feature;
147        }
148
149        None
150    }
151
152    fn parse_line(&self, line: &str) -> Option<VectorFeature<M, P, D>> {
153        if line.len() > 1
154            && let Ok(feature) = line.to_features()
155        {
156            match feature {
157                Features::Feature(feature) => {
158                    return Some(feature.to_vector(Some(true)));
159                }
160                Features::VectorFeature(vf) => {
161                    return Some(vf);
162                }
163            }
164        }
165        None
166    }
167}
168impl<
169    T: Reader,
170    M: Clone + DeserializeOwned,
171    P: Clone + Default + DeserializeOwned,
172    D: Clone + Default + DeserializeOwned,
173> Iterator for NewLineDelimitedJSONReader<T, M, P, D>
174{
175    type Item = VectorFeature<M, P, D>;
176    fn next(&mut self) -> Option<Self::Item> {
177        self.next_feature()
178    }
179}
180/// The Newline Delimited JSON Iterator tool
181#[derive(Debug)]
182pub struct NewLineDelimitedJSONIterator<
183    'a,
184    T: Reader,
185    M: Clone + DeserializeOwned,
186    P: Clone + Default + DeserializeOwned,
187    D: Clone + Default + DeserializeOwned,
188> {
189    reader: &'a NewLineDelimitedJSONReader<T, M, P, D>,
190}
191impl<
192    T: Reader,
193    M: Clone + DeserializeOwned,
194    P: Clone + Default + DeserializeOwned,
195    D: Clone + Default + DeserializeOwned,
196> Iterator for NewLineDelimitedJSONIterator<'_, T, M, P, D>
197{
198    type Item = VectorFeature<M, P, D>;
199
200    fn next(&mut self) -> Option<Self::Item> {
201        self.reader.next_feature()
202    }
203}
204/// A feature reader trait with a callback-based approach
205impl<
206    T: Reader,
207    M: Clone + DeserializeOwned,
208    P: Clone + Default + DeserializeOwned,
209    D: Clone + Default + DeserializeOwned,
210> FeatureReader<M, P, D> for NewLineDelimitedJSONReader<T, M, P, D>
211{
212    type FeatureIterator<'a>
213        = NewLineDelimitedJSONIterator<'a, T, M, P, D>
214    where
215        T: 'a,
216        M: 'a,
217        P: 'a,
218        D: 'a;
219
220    fn iter(&self) -> Self::FeatureIterator<'_> {
221        self.reset();
222        NewLineDelimitedJSONIterator { reader: self }
223    }
224
225    fn par_iter(&self, pool_size: usize, thread_id: usize) -> Self::FeatureIterator<'_> {
226        self.par_seek(pool_size as u64, thread_id as u64);
227        NewLineDelimitedJSONIterator { reader: self }
228    }
229}
230
231/// # Text Sequence JSON Reader
232///
233/// ## Description
234///
235/// Parse GeoJSON from a file that is in the `geojson-text-sequences` format.
236///
237/// Implements the [`FeatureReader`] trait
238///
239/// ## Usage
240/// ```rust
241/// use gistools::{parsers::{FileReader, FeatureReader}, readers::SequenceJSONReader};
242/// use s2json::{Properties, MValue};
243/// use std::path::PathBuf;
244///
245/// let mut path = PathBuf::from(env!("CARGO_MANIFEST_DIR"));
246/// path = path.join("tests/readers/json/fixtures/features.geojsonseq");
247///
248/// let reader: SequenceJSONReader<_> = SequenceJSONReader::new(FileReader::from(path));
249/// let features: Vec<_> = reader.iter().collect();
250/// assert_eq!(features.len(), 3);
251/// ```
252///
253/// ## Links
254/// - <https://datatracker.ietf.org/doc/html/rfc7464>
255/// - <https://datatracker.ietf.org/doc/html/rfc8142>
256/// - <https://github.com/geojson/geojson-text-sequences?tab=readme-ov-file>
257#[derive(Debug, Clone)]
258pub struct SequenceJSONReader<
259    T: Reader,
260    M: Clone + DeserializeOwned = (),
261    P: Clone + Default + DeserializeOwned = MValue,
262    D: Clone + Default + DeserializeOwned = MValue,
263> {
264    newline: NewLineDelimitedJSONReader<T, M, P, D>,
265}
266impl<
267    T: Reader,
268    M: Clone + DeserializeOwned,
269    P: Clone + Default + DeserializeOwned,
270    D: Clone + Default + DeserializeOwned,
271> SequenceJSONReader<T, M, P, D>
272{
273    /// Create a new SequenceJSONReader
274    pub fn new(reader: T) -> SequenceJSONReader<T, M, P, D> {
275        SequenceJSONReader { newline: NewLineDelimitedJSONReader::new(reader, Some('␞')) }
276    }
277
278    /// Set a new position in the file for parallel processing
279    pub fn par_seek(&self, pool_size: u64, thread_id: u64) {
280        self.newline.par_seek(pool_size, thread_id);
281    }
282
283    /// Reset to the beginning
284    pub fn reset(&self) {
285        self.newline.reset();
286    }
287}
288impl<
289    T: Reader,
290    M: Clone + DeserializeOwned,
291    P: Clone + Default + DeserializeOwned,
292    D: Clone + Default + DeserializeOwned,
293> Iterator for SequenceJSONReader<T, M, P, D>
294{
295    type Item = VectorFeature<M, P, D>;
296
297    fn next(&mut self) -> Option<Self::Item> {
298        self.newline.next()
299    }
300}
301/// The  Delimited JSON Iterator tool
302#[derive(Debug)]
303pub struct SequenceJSONIterator<
304    'a,
305    T: Reader,
306    M: Clone + DeserializeOwned,
307    P: Clone + Default + DeserializeOwned,
308    D: Clone + Default + DeserializeOwned,
309> {
310    reader: &'a SequenceJSONReader<T, M, P, D>,
311}
312impl<
313    T: Reader,
314    M: Clone + DeserializeOwned,
315    P: Clone + Default + DeserializeOwned,
316    D: Clone + Default + DeserializeOwned,
317> Iterator for SequenceJSONIterator<'_, T, M, P, D>
318{
319    type Item = VectorFeature<M, P, D>;
320
321    fn next(&mut self) -> Option<Self::Item> {
322        self.reader.newline.next_feature()
323    }
324}
325/// A feature reader trait with a callback-based approach
326impl<
327    T: Reader,
328    M: Clone + DeserializeOwned,
329    P: Clone + Default + DeserializeOwned,
330    D: Clone + Default + DeserializeOwned,
331> FeatureReader<M, P, D> for SequenceJSONReader<T, M, P, D>
332{
333    type FeatureIterator<'a>
334        = SequenceJSONIterator<'a, T, M, P, D>
335    where
336        T: 'a,
337        M: 'a,
338        P: 'a,
339        D: 'a;
340
341    fn iter(&self) -> Self::FeatureIterator<'_> {
342        self.reset();
343        SequenceJSONIterator { reader: self }
344    }
345
346    fn par_iter(&self, pool_size: usize, thread_id: usize) -> Self::FeatureIterator<'_> {
347        self.par_seek(pool_size as u64, thread_id as u64);
348        self.iter()
349    }
350}
351
352// Helper function to align to separators if using parallel processing
353fn align_to_separator<R: Reader>(reader: &R, mut pos: u64, end: u64, sep: char) -> u64 {
354    let sep_u8 = sep as u8;
355
356    while pos < end {
357        let len = u64::min(65_536, end - pos);
358        let chunk = reader.parse_string(Some(pos), Some(len));
359        if let Some(rel) = chunk.as_bytes().iter().position(|&b| b == sep_u8) {
360            return pos + rel as u64 + 1; // move past separator
361        }
362        pos += len;
363    }
364    end
365}