Skip to main content

dbn_cli/
filter.rs

1use std::num::NonZeroU64;
2
3use dbn::{
4    decode::{DbnMetadata, DecodeRecordRef},
5    RType, Record, RecordRef, Schema,
6};
7
8#[derive(Debug)]
9pub struct SchemaFilter<D> {
10    decoder: D,
11    rtype: Option<RType>,
12}
13
14impl<D> SchemaFilter<D>
15where
16    D: DbnMetadata,
17{
18    pub fn new(mut decoder: D, schema: Option<Schema>) -> Self {
19        if let Some(schema) = schema {
20            decoder.metadata_mut().schema = Some(schema);
21        }
22        Self::new_no_metadata(decoder, schema)
23    }
24}
25
26impl<D> SchemaFilter<D> {
27    pub fn new_no_metadata(decoder: D, schema: Option<Schema>) -> Self {
28        Self {
29            decoder,
30            rtype: schema.map(RType::from),
31        }
32    }
33}
34
35impl<D: DbnMetadata> DbnMetadata for SchemaFilter<D> {
36    fn metadata(&self) -> &dbn::Metadata {
37        self.decoder.metadata()
38    }
39
40    fn metadata_mut(&mut self) -> &mut dbn::Metadata {
41        self.decoder.metadata_mut()
42    }
43}
44
45impl<D: DecodeRecordRef> DecodeRecordRef for SchemaFilter<D> {
46    fn decode_record_ref(&mut self) -> dbn::Result<Option<dbn::RecordRef<'_>>> {
47        while let Some(record) = self.decoder.decode_record_ref()? {
48            if self
49                .rtype
50                .is_none_or(|rtype| rtype as u8 == record.header().rtype)
51            {
52                // Safe: casting reference to pointer so the pointer will always be valid.
53                // Getting around borrow checker limitation.
54                return Ok(Some(unsafe {
55                    RecordRef::unchecked_from_header(record.header())
56                }));
57            }
58        }
59        Ok(None)
60    }
61}
62
63#[derive(Debug)]
64pub struct LimitFilter<D> {
65    decoder: D,
66    limit: Option<NonZeroU64>,
67    record_count: u64,
68}
69
70impl<D> LimitFilter<D>
71where
72    D: DbnMetadata,
73{
74    pub fn new(mut decoder: D, limit: Option<NonZeroU64>) -> Self {
75        if let Some(limit) = limit {
76            let metadata_limit = &mut decoder.metadata_mut().limit;
77            if let Some(metadata_limit) = metadata_limit {
78                *metadata_limit = (*metadata_limit).min(limit);
79            } else {
80                *metadata_limit = Some(limit);
81            }
82        }
83        Self::new_no_metadata(decoder, limit)
84    }
85}
86
87impl<D> LimitFilter<D> {
88    pub fn new_no_metadata(decoder: D, limit: Option<NonZeroU64>) -> Self {
89        Self {
90            decoder,
91            limit,
92            record_count: 0,
93        }
94    }
95}
96
97impl<D: DbnMetadata> DbnMetadata for LimitFilter<D> {
98    fn metadata(&self) -> &dbn::Metadata {
99        self.decoder.metadata()
100    }
101
102    fn metadata_mut(&mut self) -> &mut dbn::Metadata {
103        self.decoder.metadata_mut()
104    }
105}
106
107impl<D: DecodeRecordRef> DecodeRecordRef for LimitFilter<D> {
108    fn decode_record_ref(&mut self) -> dbn::Result<Option<RecordRef<'_>>> {
109        if self
110            .limit
111            .is_some_and(|limit| self.record_count >= limit.get())
112        {
113            return Ok(None);
114        }
115        Ok(self.decoder.decode_record_ref()?.inspect(|_| {
116            self.record_count += 1;
117        }))
118    }
119}