rsonpath/result/
nodes.rs

1//! Main [`Recorder`] implementation collecting the bytes of all matches.
2//!
3//! This is the heaviest recorder. It will copy all bytes of all matches into [`Vecs`](`Vec`).
4//!
// There are a number of invariants that are hard to enforce on the type level,
// and some handling of Depth that should be properly error-handled by the engine, not here.
7// Using `expect` here is idiomatic.
8#![allow(clippy::expect_used)]
9
10use super::{output_queue::OutputQueue, *};
11use crate::{debug, is_json_whitespace};
12use std::{
13    cell::RefCell,
14    fmt::{self, Debug},
15    str,
16};
17
/// Recorder that saves full info about a [`Match`].
pub struct NodesRecorder<'s, B, S> {
    /// Mutable recorder state. The engine drives the recorder through `&self`
    /// trait methods, so interior mutability via [`RefCell`] is required.
    internal: RefCell<InternalRecorder<'s, B, S>>,
}
22
23impl<'s, B, S> NodesRecorder<'s, B, S>
24where
25    B: Deref<Target = [u8]>,
26    S: Sink<Match>,
27{
28    pub(crate) fn build_recorder(sink: &'s mut S, leading_padding_len: usize) -> Self {
29        Self {
30            internal: RefCell::new(InternalRecorder::new(sink, leading_padding_len)),
31        }
32    }
33}
34
35impl<B, S> InputRecorder<B> for NodesRecorder<'_, B, S>
36where
37    B: Deref<Target = [u8]>,
38    S: Sink<Match>,
39{
40    #[inline(always)]
41    fn record_block_start(&self, new_block: B) {
42        self.internal.borrow_mut().record_block(new_block)
43    }
44}
45
46impl<B, S> Recorder<B> for NodesRecorder<'_, B, S>
47where
48    B: Deref<Target = [u8]>,
49    S: Sink<Match>,
50{
51    #[inline]
52    fn record_match(&self, idx: usize, depth: Depth, ty: MatchedNodeType) -> Result<(), EngineError> {
53        debug!("Recording match at {idx}");
54        self.internal.borrow_mut().record_match(idx, depth, ty);
55        Ok(())
56    }
57
58    #[inline]
59    fn record_value_terminator(&self, idx: usize, depth: Depth) -> Result<(), EngineError> {
60        self.internal
61            .borrow_mut()
62            .record_value_terminator(idx, depth)
63            .map_err(|err| EngineError::SinkError(Box::new(err)))
64    }
65}
66
67/*
68{
69    [
70        1,
71        2,
72        [
73            3,
74            4
75        ]
76    ],
77    [
78        5
79    ]
80}
81
82// Required order:
83// [1,2,[3,4]], 1, 2, [3,4], 3, 4, [5], 5
84
85// Finalization order:
86// 1, 2, 3, 4, [3,4], [1,2,[3,4]], 5, [5]
87
881. By default, we assume the common case of no overlapping matches.
89In that case we don't have to maintain any stack, the state is simply
90a buffer for the current match and information on when to end it.
912. If a new match is registered when there is a match active, it means
92they are overlapping and we switch to the second algorithm.
93
94Matches are pushed onto a stack. Every time we finish a match we need to find
95the node that is finalized. If we keep all matches on the stack it would take
96potentially linear time. In the above example, when [3,4] is finalized,
97there is 3 and 4 already finalized *above* on the stack. This leads to a quadratic
98blowup if implemented naively (just consider a long list of atoms).
99
100Instead we keep only the active matches on the stack, annotated with the output number
101of the match. In a secondary array we keep the finished nodes in the output order.
102When popping we can write the node into the array with random-access. Because
103the order is maintained, outputting the nodes is easy since we can just look at the
104node with the number that should be output next and iterate from there.
105
106This would be potentially wasteful on its own, since we'd always have the secondary array
107grow to the total number of matches. We can instead compress the array when it becomes
108empty and keep a map between output number and array indices. For example, here's
109the state of this algorithm on the above example after the match of "2" is completed.
110
111STACK             | DONE (off. 0) |
112                  | Some(2)       |
113                  | Some(1)       |
114(0, [1,2...)      | None          |
115
116After "4":
117
118STACK             | DONE (off. 0) |
119                  | Some(4)       |
120                  | Some(3)       |
121                  | None          |
122                  | Some(2)       |
123(3, [3,4])        | Some(1)       |
124(0, [1,2,[3,4...) | None          |
125
After the first list gets finalized we can output everything in the array starting from
index 0. Once the stack is empty, we can compress.
128
129STACK             | DONE (off. 5)
130
131Now we push the second list and the 5, finalize the 5.
132We write it to array at index 1, since its output order is 6 and the offset from compression
133is 5.
134
135STACK             | DONE (off. 5)
136                  | Some(5)
137(6, [5...)        | None
138*/
139
/// State machine choosing between the two recording strategies described in
/// the comment above: a cheap single-node recorder for the common case, and a
/// stack-based recorder once matches overlap.
enum InternalRecorder<'s, B, S> {
    /// Fast path — at most one match is active at any time.
    Simple(SimpleRecorder<'s, B, S>),
    /// General path — overlapping matches tracked on a stack.
    Stack(StackRecorder<'s, B, S>),
    /// Placeholder used only momentarily while converting Simple -> Stack;
    /// never observable between calls.
    Transition,
}
145
impl<'s, B, S> InternalRecorder<'s, B, S>
where
    B: Deref<Target = [u8]>,
    S: Sink<Match>,
{
    /// Start in the cheap [`Simple`](Self::Simple) mode; we upgrade lazily
    /// only if overlapping matches are ever observed.
    fn new(sink: &'s mut S, leading_padding_len: usize) -> Self {
        Self::Simple(SimpleRecorder::new(sink, leading_padding_len))
    }

    /// Forward a new input block to whichever recorder is active.
    #[inline(always)]
    fn record_block(&mut self, block: B) {
        match self {
            Self::Simple(r) => r.record_block(block),
            Self::Stack(r) => r.record_block(block),
            // `Transition` only exists inside `record_match` below.
            Self::Transition => unreachable!(),
        }
    }

    /// Record a match; if the simple recorder reports an overlap, upgrade to
    /// the stack-based recorder and retry the match there.
    #[inline(always)]
    fn record_match(&mut self, idx: usize, depth: Depth, ty: MatchedNodeType) {
        match self {
            Self::Simple(simple) => {
                if !simple.try_record_match(idx, depth, ty) {
                    // We need to consume the SimpleRecorder by value to convert
                    // it, but we only have `&mut self` — swap in the Transition
                    // placeholder to move the recorder out.
                    let simple = match std::mem::replace(self, Self::Transition) {
                        Self::Simple(s) => s,
                        Self::Stack(_) | Self::Transition => unreachable!(),
                    };
                    let mut stack = simple.transform_to_stack();
                    // The match that triggered the upgrade still has to be recorded.
                    stack.record_match(idx, depth, ty);
                    *self = Self::Stack(stack);
                }
            }
            Self::Stack(stack) => stack.record_match(idx, depth, ty),
            Self::Transition => unreachable!(),
        }
    }

    /// Forward a value terminator to whichever recorder is active.
    #[allow(clippy::panic_in_result_fn)] // Reaching unreachable is an unrecoverable bug.
    #[inline(always)]
    fn record_value_terminator(&mut self, idx: usize, depth: Depth) -> Result<(), EngineError> {
        match self {
            Self::Simple(r) => r.record_value_terminator(idx, depth),
            Self::Stack(r) => r.record_value_terminator(idx, depth),
            Self::Transition => unreachable!(),
        }
    }
}
193
/// Recorder state for the common case of non-overlapping matches:
/// at most one node is being recorded at any time.
struct SimpleRecorder<'s, B, S> {
    /// Input offset of the first byte of `current_block`.
    idx: usize,
    /// Most recently received input block, if any.
    current_block: Option<B>,
    /// The single in-progress match, if one is active.
    node: Option<SimplePartialNode>,
    /// Destination for completed matches.
    sink: &'s mut S,
    /// Number of padding bytes prepended to the input; subtracted from
    /// reported match offsets.
    leading_padding_len: usize,
}
201
/// A match that has started but whose terminator has not been seen yet.
struct SimplePartialNode {
    /// Input offset of the first byte of the match.
    start_idx: usize,
    /// Depth at which the match began; a terminator at this depth or shallower ends it.
    start_depth: Depth,
    /// Bytes of the match copied so far.
    buf: Vec<u8>,
    /// Node type (e.g. [`MatchedNodeType::Atomic`]); controls final trimming.
    ty: MatchedNodeType,
}
208
impl<'s, B, S> SimpleRecorder<'s, B, S>
where
    B: Deref<Target = [u8]>,
    S: Sink<Match>,
{
    /// Create an empty recorder writing to `sink`.
    fn new(sink: &'s mut S, leading_padding_len: usize) -> Self {
        Self {
            idx: 0,
            current_block: None,
            node: None,
            sink,
            leading_padding_len,
        }
    }

    /// Register a new input block. The finished block's bytes (from the active
    /// node's start onward, if a node is active) are appended to the node's
    /// buffer, and the stream offset is advanced past the finished block.
    fn record_block(&mut self, block: B) {
        if let Some(finished) = self.current_block.as_ref() {
            if let Some(node) = self.node.as_mut() {
                debug!("Continuing node, idx is {}", self.idx);
                append_block(&mut node.buf, finished, self.idx, node.start_idx)
            }

            self.idx += finished.len();
        }

        self.current_block = Some(block);
        debug!("New block, idx = {}", self.idx);
    }

    /// Handle a value terminator at `idx`. If the active node started at this
    /// depth or deeper it is now complete: copy its final bytes from the
    /// current block, trim it, and emit it to the sink.
    ///
    /// # Errors
    /// Returns [`EngineError::MissingOpeningCharacter`] if a node ends before any
    /// block was recorded, or [`EngineError::SinkError`] if the sink rejects the match.
    fn record_value_terminator(&mut self, idx: usize, depth: Depth) -> Result<(), EngineError> {
        debug!("Value terminator at {idx}, depth {depth}");
        if let Some(node) = self.node.as_ref() {
            if node.start_depth >= depth {
                let mut node = self.node.take().expect("node is Some");
                debug!("Mark node as ended at {}", idx + 1);
                // The end index is exclusive, hence `idx + 1`.
                append_final_block(
                    &mut node.buf,
                    self.current_block
                        .as_ref()
                        .ok_or(EngineError::MissingOpeningCharacter())?,
                    self.idx,
                    node.start_idx,
                    idx + 1,
                );
                finalize_node(&mut node.buf, node.ty);

                debug!("Committing and outputting node");
                self.sink
                    .add_match(Match {
                        // Report offsets relative to the unpadded input.
                        span_start: node.start_idx - self.leading_padding_len,
                        bytes: node.buf,
                    })
                    .map_err(|err| EngineError::SinkError(Box::new(err)))?;
            }
        }

        Ok(())
    }

    /// Try to start recording a match. Returns `false` if a match is already
    /// active — the matches overlap and the caller must upgrade to the
    /// stack-based recorder.
    fn try_record_match(&mut self, idx: usize, depth: Depth, ty: MatchedNodeType) -> bool {
        if self.node.is_some() {
            debug!("nested match detected, switching to stack");
            return false;
        }

        let node = SimplePartialNode {
            start_idx: idx,
            start_depth: depth,
            buf: vec![],
            ty,
        };
        self.node = Some(node);

        true
    }

    /// Convert into a [`StackRecorder`], carrying over the stream position,
    /// current block, and the active node (if any). An active node becomes the
    /// first stack entry with output-order id 0.
    fn transform_to_stack(self) -> StackRecorder<'s, B, S> {
        match self.node {
            Some(node) => StackRecorder {
                idx: self.idx,
                match_count: 1,
                current_block: self.current_block,
                stack: vec![PartialNode {
                    id: 0,
                    start_idx: node.start_idx,
                    start_depth: node.start_depth,
                    buf: node.buf,
                    ty: node.ty,
                }],
                output_queue: OutputQueue::new(),
                sink: self.sink,
                leading_padding_len: self.leading_padding_len,
            },
            None => StackRecorder {
                idx: self.idx,
                match_count: 0,
                current_block: self.current_block,
                stack: vec![],
                output_queue: OutputQueue::new(),
                sink: self.sink,
                leading_padding_len: self.leading_padding_len,
            },
        }
    }
}
314
/// Recorder state for overlapping matches, implementing the stack-plus-queue
/// algorithm described in the comment above.
struct StackRecorder<'s, B, S> {
    /// Input offset of the first byte of `current_block`.
    idx: usize,
    /// Total number of matches recorded so far; used as the output-order id
    /// assigned to each new node.
    match_count: usize,
    /// Most recently received input block, if any.
    current_block: Option<B>,
    /// Stack of currently active (unfinished) nodes.
    stack: Vec<PartialNode>,
    /// Finished matches waiting to be flushed to the sink in output order.
    output_queue: OutputQueue<Match>,
    /// Destination for completed matches.
    sink: &'s mut S,
    /// Number of padding bytes prepended to the input; subtracted from
    /// reported match offsets.
    leading_padding_len: usize,
}
324
/// An unfinished match tracked on the [`StackRecorder`] stack.
struct PartialNode {
    /// Output-order number of this match, used as the index into the output queue.
    id: usize,
    /// Input offset of the first byte of the match.
    start_idx: usize,
    /// Depth at which the match began; a terminator at this depth or shallower ends it.
    start_depth: Depth,
    /// Bytes of the match copied so far.
    buf: Vec<u8>,
    /// Node type (e.g. [`MatchedNodeType::Atomic`]); controls final trimming.
    ty: MatchedNodeType,
}
332
impl<B, S> StackRecorder<'_, B, S>
where
    B: Deref<Target = [u8]>,
    S: Sink<Match>,
{
    /// Register a new input block. Each active node receives the portion of
    /// the finished block from its own start offset onward, then the stream
    /// offset advances past the finished block.
    fn record_block(&mut self, block: B) {
        if let Some(finished) = self.current_block.as_ref() {
            for node in &mut self.stack {
                debug!("Continuing node: {node:?}, idx is {}", self.idx);
                append_block(&mut node.buf, finished, self.idx, node.start_idx)
            }

            self.idx += finished.len();
        }

        self.current_block = Some(block);
        debug!("New block, idx = {}", self.idx);
    }

    /// Start a new match: assign it the next output-order id and push it onto
    /// the stack of active nodes.
    fn record_match(&mut self, idx: usize, depth: Depth, ty: MatchedNodeType) {
        let node = PartialNode {
            id: self.match_count,
            start_idx: idx,
            start_depth: depth,
            buf: vec![],
            ty,
        };

        debug!("New node {node:?}");
        self.match_count += 1;
        self.stack.push(node);
    }

    /// Handle a value terminator at `idx`: pop and finalize every node that
    /// started at this depth or deeper, inserting each into the output queue
    /// at its id. Once no nodes remain active, flush the queue to the sink in
    /// output order (this is the "compression" point of the algorithm above).
    ///
    /// # Errors
    /// Returns [`EngineError::MissingOpeningCharacter`] if a node ends before any
    /// block was recorded, or [`EngineError::SinkError`] if the sink rejects a match.
    #[inline]
    fn record_value_terminator(&mut self, idx: usize, depth: Depth) -> Result<(), EngineError> {
        debug!("Value terminator at {idx}, depth {depth}");
        while let Some(node) = self.stack.last() {
            if node.start_depth >= depth {
                debug!("Mark node {node:?} as ended at {}", idx + 1);
                let mut node = self.stack.pop().expect("last was Some, pop must succeed");
                // The end index is exclusive, hence `idx + 1`.
                append_final_block(
                    &mut node.buf,
                    self.current_block
                        .as_ref()
                        .ok_or(EngineError::MissingOpeningCharacter())?,
                    self.idx,
                    node.start_idx,
                    idx + 1,
                );
                finalize_node(&mut node.buf, node.ty);

                debug!("Committing node: {node:?}");
                self.output_queue.insert(
                    node.id,
                    Match {
                        // Report offsets relative to the unpadded input.
                        span_start: node.start_idx - self.leading_padding_len,
                        bytes: node.buf,
                    },
                );
            } else {
                break;
            }
        }

        if self.stack.is_empty() {
            debug!("Outputting batch of nodes.");
            self.output_queue
                .output_to(self.sink)
                .map_err(|err| EngineError::SinkError(Box::new(err)))?;
        }

        Ok(())
    }
}
407
/// Append to `dest` the suffix of `src` that lies at or after `read_start`.
///
/// `src_start` is the input-stream offset of `src[0]` and `read_start` is the
/// offset from which copying should resume. If the entire block lies before
/// `read_start`, nothing is copied.
#[inline(always)]
fn append_block(dest: &mut Vec<u8>, src: &[u8], src_start: usize, read_start: usize) {
    let src_end = src_start + src.len();
    if read_start >= src_end {
        // The block ends before the range of interest begins.
        return;
    }

    // Skip any prefix of the block that precedes `read_start`.
    let skip = read_start.saturating_sub(src_start);
    dest.extend_from_slice(&src[skip..]);
}
423
424#[inline(always)]
425fn append_final_block(dest: &mut Vec<u8>, src: &[u8], src_start: usize, read_start: usize, read_end: usize) {
426    debug!("src_start: {src_start}, read_start: {read_start}, read_end: {read_end}");
427    debug_assert!(read_end >= src_start);
428    let in_block_start = read_start.saturating_sub(src_start);
429    let in_block_end = read_end - src_start;
430
431    dest.extend(&src[in_block_start..in_block_end]);
432}
433
434#[inline(always)]
435fn finalize_node(buf: &mut Vec<u8>, ty: MatchedNodeType) {
436    debug!("Finalizing node");
437
438    if ty == MatchedNodeType::Atomic {
439        // Atomic nodes are finished when the next structural character is matched.
440        // The buffer includes that character and all preceding whitespace.
441        // We need to remove it before saving the result.
442        if buf.len() <= 1 {
443            // This should never happen in a valid JSON, but we also don't want to panic if the file is invalid.
444            buf.truncate(0)
445        } else {
446            let mut i = buf.len() - 2;
447            while is_json_whitespace(buf[i]) {
448                i -= 1;
449            }
450
451            buf.truncate(i + 1);
452        }
453    }
454}
455
456impl Debug for PartialNode {
457    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
458        f.debug_struct("PartialNode")
459            .field("start_idx", &self.start_idx)
460            .field("start_depth", &self.start_depth)
461            .field("ty", &self.ty)
462            .field("buf", &str::from_utf8(&self.buf).unwrap_or("[invalid utf8]"))
463            .finish()
464    }
465}