1use super::ToGisJSON;
2use crate::{
3 geometry::ConvertFeature,
4 parsers::{FeatureReader, Reader},
5};
6use alloc::{
7 collections::VecDeque,
8 string::{String, ToString},
9};
10use core::{cell::RefCell, marker::PhantomData};
11use s2json::{Features, MValue, VectorFeature};
12use serde::de::DeserializeOwned;
13
/// Mutable read state of a [`NewLineDelimitedJSONReader`].
///
/// Kept separate from the reader so it can sit behind a `RefCell` and be advanced through `&self`.
#[derive(Clone, Debug)]
struct NewLineDelimitedJSONParser {
    /// Byte offset of the next window to request from the underlying reader.
    offset: u64,
    /// Exclusive byte offset at which reading stops.
    end: u64,
    /// Complete records already split out of the input and waiting to be parsed.
    tmp_chunks: VecDeque<String>,
    /// Trailing text of the last window that has not yet been terminated by a separator.
    partial_line: String,
}
21
22#[derive(Debug, Clone)]
44pub struct NewLineDelimitedJSONReader<
45 T: Reader,
46 M: Clone + DeserializeOwned = (),
47 P: Clone + Default + DeserializeOwned = MValue,
48 D: Clone + Default + DeserializeOwned = MValue,
49> {
50 reader: T,
51 seperator: char, parser: RefCell<NewLineDelimitedJSONParser>,
53 _phantom: PhantomData<VectorFeature<M, P, D>>,
54}
55impl<
56 T: Reader,
57 M: Clone + DeserializeOwned,
58 P: Clone + Default + DeserializeOwned,
59 D: Clone + Default + DeserializeOwned,
60> NewLineDelimitedJSONReader<T, M, P, D>
61{
62 pub fn new(reader: T, seperator: Option<char>) -> NewLineDelimitedJSONReader<T, M, P, D> {
64 let end = reader.len();
65 NewLineDelimitedJSONReader {
66 reader,
67 _phantom: PhantomData,
68 seperator: seperator.unwrap_or('\n'),
69 parser: RefCell::new(NewLineDelimitedJSONParser {
70 offset: 0,
71 end,
72 tmp_chunks: VecDeque::new(),
73 partial_line: String::new(),
74 }),
75 }
76 }
77
78 pub fn reset(&self) {
80 let mut parser = self.parser.borrow_mut();
81 parser.offset = 0;
82 parser.tmp_chunks.clear();
83 parser.partial_line.clear();
84 }
85
86 pub fn par_seek(&self, pool_size: u64, thread_id: u64) {
88 self.reset();
89 let len = self.reader.len();
91 let chunk_size = len.div_ceil(pool_size);
92 let mut start = thread_id.saturating_mul(chunk_size);
93 let mut end = u64::min(start + chunk_size, len);
94 if thread_id > 0 {
96 start = align_to_separator(&self.reader, start, end, self.seperator);
97 }
98 if thread_id < pool_size - 1 {
99 end = align_to_separator(&self.reader, end, len, self.seperator);
100 }
101 let mut parser = self.parser.borrow_mut();
103 parser.offset = start;
104 parser.end = end;
105 }
106
107 pub fn next_feature(&self) -> Option<VectorFeature<M, P, D>> {
109 let mut parser = self.parser.borrow_mut();
110
111 if let Some(line) = parser.tmp_chunks.pop_front() {
113 return self.parse_line(&line);
114 }
115
116 if parser.offset < parser.end {
118 let length = u64::min(65_536, parser.end - parser.offset);
119 let chunk = self.reader.parse_string(Some(parser.offset), Some(length));
120 let combined = core::mem::take(&mut parser.partial_line) + &chunk;
122 let mut parts: VecDeque<String> = combined
124 .split(self.seperator)
125 .map(str::to_string)
126 .filter(|s| !s.is_empty())
127 .collect();
128 parser.partial_line = if combined.ends_with(self.seperator) {
130 String::new()
131 } else {
132 parts.pop_back().unwrap_or_default()
133 };
134
135 parser.tmp_chunks = parts;
136 parser.offset += length;
137
138 return parser.tmp_chunks.pop_front().and_then(|line| self.parse_line(&line));
139 }
140
141 if !parser.partial_line.is_empty() {
143 let line = core::mem::take(&mut parser.partial_line);
144 let feature = self.parse_line(&line);
145 parser.partial_line.clear();
146 return feature;
147 }
148
149 None
150 }
151
152 fn parse_line(&self, line: &str) -> Option<VectorFeature<M, P, D>> {
153 if line.len() > 1
154 && let Ok(feature) = line.to_features()
155 {
156 match feature {
157 Features::Feature(feature) => {
158 return Some(feature.to_vector(Some(true)));
159 }
160 Features::VectorFeature(vf) => {
161 return Some(vf);
162 }
163 }
164 }
165 None
166 }
167}
168impl<
169 T: Reader,
170 M: Clone + DeserializeOwned,
171 P: Clone + Default + DeserializeOwned,
172 D: Clone + Default + DeserializeOwned,
173> Iterator for NewLineDelimitedJSONReader<T, M, P, D>
174{
175 type Item = VectorFeature<M, P, D>;
176 fn next(&mut self) -> Option<Self::Item> {
177 self.next_feature()
178 }
179}
180#[derive(Debug)]
182pub struct NewLineDelimitedJSONIterator<
183 'a,
184 T: Reader,
185 M: Clone + DeserializeOwned,
186 P: Clone + Default + DeserializeOwned,
187 D: Clone + Default + DeserializeOwned,
188> {
189 reader: &'a NewLineDelimitedJSONReader<T, M, P, D>,
190}
191impl<
192 T: Reader,
193 M: Clone + DeserializeOwned,
194 P: Clone + Default + DeserializeOwned,
195 D: Clone + Default + DeserializeOwned,
196> Iterator for NewLineDelimitedJSONIterator<'_, T, M, P, D>
197{
198 type Item = VectorFeature<M, P, D>;
199
200 fn next(&mut self) -> Option<Self::Item> {
201 self.reader.next_feature()
202 }
203}
204impl<
206 T: Reader,
207 M: Clone + DeserializeOwned,
208 P: Clone + Default + DeserializeOwned,
209 D: Clone + Default + DeserializeOwned,
210> FeatureReader<M, P, D> for NewLineDelimitedJSONReader<T, M, P, D>
211{
212 type FeatureIterator<'a>
213 = NewLineDelimitedJSONIterator<'a, T, M, P, D>
214 where
215 T: 'a,
216 M: 'a,
217 P: 'a,
218 D: 'a;
219
220 fn iter(&self) -> Self::FeatureIterator<'_> {
221 self.reset();
222 NewLineDelimitedJSONIterator { reader: self }
223 }
224
225 fn par_iter(&self, pool_size: usize, thread_id: usize) -> Self::FeatureIterator<'_> {
226 self.par_seek(pool_size as u64, thread_id as u64);
227 NewLineDelimitedJSONIterator { reader: self }
228 }
229}
230
231#[derive(Debug, Clone)]
258pub struct SequenceJSONReader<
259 T: Reader,
260 M: Clone + DeserializeOwned = (),
261 P: Clone + Default + DeserializeOwned = MValue,
262 D: Clone + Default + DeserializeOwned = MValue,
263> {
264 newline: NewLineDelimitedJSONReader<T, M, P, D>,
265}
266impl<
267 T: Reader,
268 M: Clone + DeserializeOwned,
269 P: Clone + Default + DeserializeOwned,
270 D: Clone + Default + DeserializeOwned,
271> SequenceJSONReader<T, M, P, D>
272{
273 pub fn new(reader: T) -> SequenceJSONReader<T, M, P, D> {
275 SequenceJSONReader { newline: NewLineDelimitedJSONReader::new(reader, Some('␞')) }
276 }
277
278 pub fn par_seek(&self, pool_size: u64, thread_id: u64) {
280 self.newline.par_seek(pool_size, thread_id);
281 }
282
283 pub fn reset(&self) {
285 self.newline.reset();
286 }
287}
288impl<
289 T: Reader,
290 M: Clone + DeserializeOwned,
291 P: Clone + Default + DeserializeOwned,
292 D: Clone + Default + DeserializeOwned,
293> Iterator for SequenceJSONReader<T, M, P, D>
294{
295 type Item = VectorFeature<M, P, D>;
296
297 fn next(&mut self) -> Option<Self::Item> {
298 self.newline.next()
299 }
300}
301#[derive(Debug)]
303pub struct SequenceJSONIterator<
304 'a,
305 T: Reader,
306 M: Clone + DeserializeOwned,
307 P: Clone + Default + DeserializeOwned,
308 D: Clone + Default + DeserializeOwned,
309> {
310 reader: &'a SequenceJSONReader<T, M, P, D>,
311}
312impl<
313 T: Reader,
314 M: Clone + DeserializeOwned,
315 P: Clone + Default + DeserializeOwned,
316 D: Clone + Default + DeserializeOwned,
317> Iterator for SequenceJSONIterator<'_, T, M, P, D>
318{
319 type Item = VectorFeature<M, P, D>;
320
321 fn next(&mut self) -> Option<Self::Item> {
322 self.reader.newline.next_feature()
323 }
324}
325impl<
327 T: Reader,
328 M: Clone + DeserializeOwned,
329 P: Clone + Default + DeserializeOwned,
330 D: Clone + Default + DeserializeOwned,
331> FeatureReader<M, P, D> for SequenceJSONReader<T, M, P, D>
332{
333 type FeatureIterator<'a>
334 = SequenceJSONIterator<'a, T, M, P, D>
335 where
336 T: 'a,
337 M: 'a,
338 P: 'a,
339 D: 'a;
340
341 fn iter(&self) -> Self::FeatureIterator<'_> {
342 self.reset();
343 SequenceJSONIterator { reader: self }
344 }
345
346 fn par_iter(&self, pool_size: usize, thread_id: usize) -> Self::FeatureIterator<'_> {
347 self.par_seek(pool_size as u64, thread_id as u64);
348 self.iter()
349 }
350}
351
352fn align_to_separator<R: Reader>(reader: &R, mut pos: u64, end: u64, sep: char) -> u64 {
354 let sep_u8 = sep as u8;
355
356 while pos < end {
357 let len = u64::min(65_536, end - pos);
358 let chunk = reader.parse_string(Some(pos), Some(len));
359 if let Some(rel) = chunk.as_bytes().iter().position(|&b| b == sep_u8) {
360 return pos + rel as u64 + 1; }
362 pos += len;
363 }
364 end
365}