// rsonpath/result/nodes.rs
1//! Main [`Recorder`] implementation collecting the bytes of all matches.
2//!
3//! This is the heaviest recorder. It will copy all bytes of all matches into [`Vecs`](`Vec`).
4#![allow(
5    clippy::expect_used,
6    reason = "There is number of invariants that are hard to enforce on the type level, \
7    and handling of Depth that should be properly error-handled by the engine, not here. \
8    Using `expect` here is idiomatic"
9)]
10
11use super::{output_queue::OutputQueue, *};
12use crate::{debug, is_json_whitespace};
13use std::{
14    cell::RefCell,
15    fmt::{self, Debug},
16    str,
17};
18
/// Recorder that saves full info about a [`Match`].
pub struct NodesRecorder<'s, B, S> {
    // Interior mutability is required because the recorder traits take `&self`,
    // while recording mutates the internal state.
    internal: RefCell<InternalRecorder<'s, B, S>>,
}
23
24impl<'s, B, S> NodesRecorder<'s, B, S>
25where
26    B: Deref<Target = [u8]>,
27    S: Sink<Match>,
28{
29    pub(crate) fn build_recorder(sink: &'s mut S, leading_padding_len: usize) -> Self {
30        Self {
31            internal: RefCell::new(InternalRecorder::new(sink, leading_padding_len)),
32        }
33    }
34}
35
36impl<B, S> InputRecorder<B> for NodesRecorder<'_, B, S>
37where
38    B: Deref<Target = [u8]>,
39    S: Sink<Match>,
40{
41    #[inline(always)]
42    fn record_block_start(&self, new_block: B) {
43        self.internal.borrow_mut().record_block(new_block)
44    }
45}
46
47impl<B, S> Recorder<B> for NodesRecorder<'_, B, S>
48where
49    B: Deref<Target = [u8]>,
50    S: Sink<Match>,
51{
52    #[inline]
53    fn record_match(&self, idx: usize, depth: Depth, ty: MatchedNodeType) -> Result<(), EngineError> {
54        debug!("Recording match at {idx}");
55        self.internal.borrow_mut().record_match(idx, depth, ty);
56        Ok(())
57    }
58
59    #[inline]
60    fn record_value_terminator(&self, idx: usize, depth: Depth) -> Result<(), EngineError> {
61        self.internal
62            .borrow_mut()
63            .record_value_terminator(idx, depth)
64            .map_err(|err| EngineError::SinkError(Box::new(err)))
65    }
66}
67
68/*
69{
70    [
71        1,
72        2,
73        [
74            3,
75            4
76        ]
77    ],
78    [
79        5
80    ]
81}
82
83// Required order:
84// [1,2,[3,4]], 1, 2, [3,4], 3, 4, [5], 5
85
86// Finalization order:
87// 1, 2, 3, 4, [3,4], [1,2,[3,4]], 5, [5]
88
891. By default, we assume the common case of no overlapping matches.
90In that case we don't have to maintain any stack, the state is simply
91a buffer for the current match and information on when to end it.
922. If a new match is registered when there is a match active, it means
93they are overlapping and we switch to the second algorithm.
94
95Matches are pushed onto a stack. Every time we finish a match we need to find
96the node that is finalized. If we keep all matches on the stack it would take
97potentially linear time. In the above example, when [3,4] is finalized,
98there is 3 and 4 already finalized *above* on the stack. This leads to a quadratic
99blowup if implemented naively (just consider a long list of atoms).
100
101Instead we keep only the active matches on the stack, annotated with the output number
102of the match. In a secondary array we keep the finished nodes in the output order.
103When popping we can write the node into the array with random-access. Because
104the order is maintained, outputting the nodes is easy since we can just look at the
105node with the number that should be output next and iterate from there.
106
107This would be potentially wasteful on its own, since we'd always have the secondary array
108grow to the total number of matches. We can instead compress the array when it becomes
109empty and keep a map between output number and array indices. For example, here's
110the state of this algorithm on the above example after the match of "2" is completed.
111
112STACK             | DONE (off. 0) |
113                  | Some(2)       |
114                  | Some(1)       |
115(0, [1,2...)      | None          |
116
117After "4":
118
119STACK             | DONE (off. 0) |
120                  | Some(4)       |
121                  | Some(3)       |
122                  | None          |
123                  | Some(2)       |
124(3, [3,4])        | Some(1)       |
125(0, [1,2,[3,4...) | None          |
126
127Now after the first list gets finalized we can output everything in the array starting from
128index 0. Now that the stack is empty we can compress.
129
130STACK             | DONE (off. 5)
131
132Now we push the second list and the 5, finalize the 5.
133We write it to array at index 1, since its output order is 6 and the offset from compression
134is 5.
135
136STACK             | DONE (off. 5)
137                  | Some(5)
138(6, [5...)        | None
139*/
140
/// State machine dispatching between the two recording algorithms
/// described in the comment above.
enum InternalRecorder<'s, B, S> {
    /// Fast path used while no matches overlap — at most one buffered node.
    Simple(SimpleRecorder<'s, B, S>),
    /// General path handling overlapping (nested) matches with a stack.
    Stack(StackRecorder<'s, B, S>),
    /// Transient placeholder used only during the Simple -> Stack switch
    /// inside `record_match`; never observed across method calls.
    Transition,
}
146
impl<'s, B, S> InternalRecorder<'s, B, S>
where
    B: Deref<Target = [u8]>,
    S: Sink<Match>,
{
    /// Start in the simple state, assuming no overlapping matches.
    fn new(sink: &'s mut S, leading_padding_len: usize) -> Self {
        Self::Simple(SimpleRecorder::new(sink, leading_padding_len))
    }

    /// Forward the next input block to the active recorder.
    #[inline(always)]
    fn record_block(&mut self, block: B) {
        match self {
            Self::Simple(r) => r.record_block(block),
            Self::Stack(r) => r.record_block(block),
            // Transition only exists within `record_match`'s swap below.
            Self::Transition => unreachable!(),
        }
    }

    /// Record a match starting at `idx`, upgrading to the stack-based
    /// recorder if this match overlaps one already in progress.
    #[inline(always)]
    fn record_match(&mut self, idx: usize, depth: Depth, ty: MatchedNodeType) {
        match self {
            Self::Simple(simple) => {
                if !simple.try_record_match(idx, depth, ty) {
                    // Overlap detected. `Transition` is swapped in so the
                    // SimpleRecorder can be moved out of `self` by value;
                    // `self` is restored to `Stack` before returning.
                    let simple = match std::mem::replace(self, Self::Transition) {
                        Self::Simple(s) => s,
                        Self::Stack(_) | Self::Transition => unreachable!(),
                    };
                    let mut stack = simple.transform_to_stack();
                    stack.record_match(idx, depth, ty);
                    *self = Self::Stack(stack);
                }
            }
            Self::Stack(stack) => stack.record_match(idx, depth, ty),
            Self::Transition => unreachable!(),
        }
    }

    /// Forward a value terminator to the active recorder.
    #[allow(clippy::panic_in_result_fn, reason = "Reaching unreachable is an unrecoverable bug.")]
    #[inline(always)]
    fn record_value_terminator(&mut self, idx: usize, depth: Depth) -> Result<(), EngineError> {
        match self {
            Self::Simple(r) => r.record_value_terminator(idx, depth),
            Self::Stack(r) => r.record_value_terminator(idx, depth),
            Self::Transition => unreachable!(),
        }
    }
}
194
/// Recorder for the common case where matches never overlap —
/// at most one node is being buffered at any time.
struct SimpleRecorder<'s, B, S> {
    // Absolute input offset of the start of `current_block`.
    idx: usize,
    // The most recently seen input block, if any.
    current_block: Option<B>,
    // The match currently being recorded, if any.
    node: Option<SimplePartialNode>,
    sink: &'s mut S,
    // Number of artificial padding bytes prepended to the input;
    // subtracted when reporting match spans.
    leading_padding_len: usize,
}
202
/// A match whose bytes are still being collected by [`SimpleRecorder`].
struct SimplePartialNode {
    // Absolute input offset at which the match begins.
    start_idx: usize,
    // Document depth at which the match begins; used to detect its end.
    start_depth: Depth,
    // Bytes of the match collected so far.
    buf: Vec<u8>,
    ty: MatchedNodeType,
}
209
impl<'s, B, S> SimpleRecorder<'s, B, S>
where
    B: Deref<Target = [u8]>,
    S: Sink<Match>,
{
    fn new(sink: &'s mut S, leading_padding_len: usize) -> Self {
        Self {
            idx: 0,
            current_block: None,
            node: None,
            sink,
            leading_padding_len,
        }
    }

    /// Register the next input block, first flushing the finished block's
    /// relevant bytes into the in-progress node's buffer (if any).
    fn record_block(&mut self, block: B) {
        if let Some(finished) = self.current_block.as_ref() {
            if let Some(node) = self.node.as_mut() {
                debug!("Continuing node, idx is {}", self.idx);
                // Copy the portion of the finished block that belongs to the node.
                append_block(&mut node.buf, finished, self.idx, node.start_idx)
            }

            self.idx += finished.len();
        }

        self.current_block = Some(block);
        debug!("New block, idx = {}", self.idx);
    }

    /// Handle the end of a value at `idx` with the given `depth`.
    /// If this closes the in-progress node, finalize its bytes and
    /// send the match to the sink immediately.
    fn record_value_terminator(&mut self, idx: usize, depth: Depth) -> Result<(), EngineError> {
        debug!("Value terminator at {idx}, depth {depth}");
        if let Some(node) = self.node.as_ref() {
            // The node ends once we come back up to (or above) its starting depth.
            if node.start_depth >= depth {
                let mut node = self.node.take().expect("node is Some");
                debug!("Mark node as ended at {}", idx + 1);
                append_final_block(
                    &mut node.buf,
                    self.current_block
                        .as_ref()
                        .ok_or(EngineError::MissingOpeningCharacter())?,
                    self.idx,
                    node.start_idx,
                    idx + 1,
                );
                finalize_node(&mut node.buf, node.ty);

                debug!("Committing and outputting node");
                self.sink
                    .add_match(Match {
                        // Report the span relative to the original document,
                        // excluding the artificial leading padding.
                        span_start: node.start_idx - self.leading_padding_len,
                        bytes: node.buf,
                    })
                    .map_err(|err| EngineError::SinkError(Box::new(err)))?;
            }
        }

        Ok(())
    }

    /// Try to start recording a match at `idx`. Returns `false` if a match
    /// is already active — matches overlap and the caller must switch to
    /// the stack-based recorder.
    fn try_record_match(&mut self, idx: usize, depth: Depth, ty: MatchedNodeType) -> bool {
        if self.node.is_some() {
            debug!("nested match detected, switching to stack");
            return false;
        }

        let node = SimplePartialNode {
            start_idx: idx,
            start_depth: depth,
            buf: vec![],
            ty,
        };
        self.node = Some(node);

        true
    }

    /// Convert this recorder into an equivalent [`StackRecorder`],
    /// carrying over the in-progress node (if any) as stack entry 0.
    fn transform_to_stack(self) -> StackRecorder<'s, B, S> {
        match self.node {
            Some(node) => StackRecorder {
                idx: self.idx,
                match_count: 1,
                current_block: self.current_block,
                stack: vec![PartialNode {
                    id: 0,
                    start_idx: node.start_idx,
                    start_depth: node.start_depth,
                    buf: node.buf,
                    ty: node.ty,
                }],
                output_queue: OutputQueue::new(),
                sink: self.sink,
                leading_padding_len: self.leading_padding_len,
            },
            None => StackRecorder {
                idx: self.idx,
                match_count: 0,
                current_block: self.current_block,
                stack: vec![],
                output_queue: OutputQueue::new(),
                sink: self.sink,
                leading_padding_len: self.leading_padding_len,
            },
        }
    }
}
315
/// Recorder handling overlapping (nested) matches.
///
/// Active matches live on `stack`; finished matches are buffered in
/// `output_queue` keyed by their output order number and flushed to the
/// sink whenever the stack empties (see the algorithm description above).
struct StackRecorder<'s, B, S> {
    // Absolute input offset of the start of `current_block`.
    idx: usize,
    // Total matches seen so far; also the next node's output order number.
    match_count: usize,
    // The most recently seen input block, if any.
    current_block: Option<B>,
    // Matches still being recorded, innermost last.
    stack: Vec<PartialNode>,
    // Finished matches awaiting output in the correct order.
    output_queue: OutputQueue<Match>,
    sink: &'s mut S,
    // Number of artificial padding bytes prepended to the input;
    // subtracted when reporting match spans.
    leading_padding_len: usize,
}
325
/// A match whose bytes are still being collected by [`StackRecorder`].
struct PartialNode {
    // Output order number of this match.
    id: usize,
    // Absolute input offset at which the match begins.
    start_idx: usize,
    // Document depth at which the match begins; used to detect its end.
    start_depth: Depth,
    // Bytes of the match collected so far.
    buf: Vec<u8>,
    ty: MatchedNodeType,
}
333
impl<B, S> StackRecorder<'_, B, S>
where
    B: Deref<Target = [u8]>,
    S: Sink<Match>,
{
    /// Register the next input block, first appending the finished block's
    /// relevant bytes to every active node on the stack.
    fn record_block(&mut self, block: B) {
        if let Some(finished) = self.current_block.as_ref() {
            for node in &mut self.stack {
                debug!("Continuing node: {node:?}, idx is {}", self.idx);
                append_block(&mut node.buf, finished, self.idx, node.start_idx)
            }

            self.idx += finished.len();
        }

        self.current_block = Some(block);
        debug!("New block, idx = {}", self.idx);
    }

    /// Start recording a new match; it gets the next output order number
    /// and is pushed onto the stack of active nodes.
    fn record_match(&mut self, idx: usize, depth: Depth, ty: MatchedNodeType) {
        let node = PartialNode {
            id: self.match_count,
            start_idx: idx,
            start_depth: depth,
            buf: vec![],
            ty,
        };

        debug!("New node {node:?}");
        self.match_count += 1;
        self.stack.push(node);
    }

    /// Handle the end of a value at `idx` with the given `depth`.
    /// Pops and finalizes every active node closed by this terminator,
    /// inserting each into the output queue at its output order position.
    /// Once no nodes remain active, the queued batch is flushed to the sink.
    #[inline]
    fn record_value_terminator(&mut self, idx: usize, depth: Depth) -> Result<(), EngineError> {
        debug!("Value terminator at {idx}, depth {depth}");
        while let Some(node) = self.stack.last() {
            // A node ends once we come back up to (or above) its starting depth.
            if node.start_depth >= depth {
                debug!("Mark node {node:?} as ended at {}", idx + 1);
                let mut node = self.stack.pop().expect("last was Some, pop must succeed");
                append_final_block(
                    &mut node.buf,
                    self.current_block
                        .as_ref()
                        .ok_or(EngineError::MissingOpeningCharacter())?,
                    self.idx,
                    node.start_idx,
                    idx + 1,
                );
                finalize_node(&mut node.buf, node.ty);

                debug!("Committing node: {node:?}");
                self.output_queue.insert(
                    node.id,
                    Match {
                        // Span relative to the original document, without padding.
                        span_start: node.start_idx - self.leading_padding_len,
                        bytes: node.buf,
                    },
                );
            } else {
                break;
            }
        }

        if self.stack.is_empty() {
            debug!("Outputting batch of nodes.");
            self.output_queue
                .output_to(self.sink)
                .map_err(|err| EngineError::SinkError(Box::new(err)))?;
        }

        Ok(())
    }
}
408
/// Append to `dest` the part of `src` that belongs to a match starting at
/// absolute offset `read_start`, where `src` begins at absolute offset `src_start`.
///
/// If the match starts past the end of this block nothing is copied; if it
/// started in an earlier block, the whole of `src` is copied.
#[inline(always)]
fn append_block(dest: &mut Vec<u8>, src: &[u8], src_start: usize, read_start: usize) {
    let src_end = src_start + src.len();
    if read_start >= src_end {
        // The match begins after this block — nothing to copy yet.
        return;
    }

    // Skip the prefix of the block that precedes the match, if any.
    let skip = read_start.saturating_sub(src_start);
    dest.extend_from_slice(&src[skip..]);
}
424
425#[inline(always)]
426fn append_final_block(dest: &mut Vec<u8>, src: &[u8], src_start: usize, read_start: usize, read_end: usize) {
427    debug!("src_start: {src_start}, read_start: {read_start}, read_end: {read_end}");
428    debug_assert!(read_end >= src_start, "block cannot end before it starts");
429    let in_block_start = read_start.saturating_sub(src_start);
430    let in_block_end = read_end - src_start;
431
432    dest.extend(&src[in_block_start..in_block_end]);
433}
434
435#[inline(always)]
436fn finalize_node(buf: &mut Vec<u8>, ty: MatchedNodeType) {
437    debug!("Finalizing node");
438
439    if ty == MatchedNodeType::Atomic {
440        // Atomic nodes are finished when the next structural character is matched.
441        // The buffer includes that character and all preceding whitespace.
442        // We need to remove it before saving the result.
443        if buf.len() <= 1 {
444            // This should never happen in a valid JSON, but we also don't want to panic if the file is invalid.
445            buf.truncate(0)
446        } else {
447            let mut i = buf.len() - 2;
448            while is_json_whitespace(buf[i]) {
449                i -= 1;
450            }
451
452            buf.truncate(i + 1);
453        }
454    }
455}
456
impl Debug for PartialNode {
    // Manual impl so `buf` is rendered as text rather than a list of byte values.
    // NOTE(review): `id` is omitted from the output — presumably intentional; confirm.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.debug_struct("PartialNode")
            .field("start_idx", &self.start_idx)
            .field("start_depth", &self.start_depth)
            .field("ty", &self.ty)
            .field("buf", &str::from_utf8(&self.buf).unwrap_or("[invalid utf8]"))
            .finish()
    }
}