1use std::num::NonZeroU64;
2
3use dbn::{
4 decode::{DbnMetadata, DecodeRecordRef},
5 RType, Record, RecordRef, Schema,
6};
7
8#[derive(Debug)]
9pub struct SchemaFilter<D> {
10 decoder: D,
11 rtype: Option<RType>,
12}
13
14impl<D> SchemaFilter<D>
15where
16 D: DbnMetadata,
17{
18 pub fn new(mut decoder: D, schema: Option<Schema>) -> Self {
19 if let Some(schema) = schema {
20 decoder.metadata_mut().schema = Some(schema);
21 }
22 Self::new_no_metadata(decoder, schema)
23 }
24}
25
26impl<D> SchemaFilter<D> {
27 pub fn new_no_metadata(decoder: D, schema: Option<Schema>) -> Self {
28 Self {
29 decoder,
30 rtype: schema.map(RType::from),
31 }
32 }
33}
34
35impl<D: DbnMetadata> DbnMetadata for SchemaFilter<D> {
36 fn metadata(&self) -> &dbn::Metadata {
37 self.decoder.metadata()
38 }
39
40 fn metadata_mut(&mut self) -> &mut dbn::Metadata {
41 self.decoder.metadata_mut()
42 }
43}
44
45impl<D: DecodeRecordRef> DecodeRecordRef for SchemaFilter<D> {
46 fn decode_record_ref(&mut self) -> dbn::Result<Option<dbn::RecordRef<'_>>> {
47 while let Some(record) = self.decoder.decode_record_ref()? {
48 if self
49 .rtype
50 .is_none_or(|rtype| rtype as u8 == record.header().rtype)
51 {
52 return Ok(Some(unsafe {
55 RecordRef::unchecked_from_header(record.header())
56 }));
57 }
58 }
59 Ok(None)
60 }
61}
62
/// Decoder adapter that stops producing records after an optional maximum count.
#[derive(Debug)]
pub struct LimitFilter<D> {
    /// The wrapped decoder records are pulled from.
    decoder: D,
    /// Maximum number of records to hand out; `None` means unlimited.
    limit: Option<NonZeroU64>,
    /// Number of records handed out so far.
    record_count: u64,
}
69
70impl<D> LimitFilter<D>
71where
72 D: DbnMetadata,
73{
74 pub fn new(mut decoder: D, limit: Option<NonZeroU64>) -> Self {
75 if let Some(limit) = limit {
76 let metadata_limit = &mut decoder.metadata_mut().limit;
77 if let Some(metadata_limit) = metadata_limit {
78 *metadata_limit = (*metadata_limit).min(limit);
79 } else {
80 *metadata_limit = Some(limit);
81 }
82 }
83 Self::new_no_metadata(decoder, limit)
84 }
85}
86
87impl<D> LimitFilter<D> {
88 pub fn new_no_metadata(decoder: D, limit: Option<NonZeroU64>) -> Self {
89 Self {
90 decoder,
91 limit,
92 record_count: 0,
93 }
94 }
95}
96
97impl<D: DbnMetadata> DbnMetadata for LimitFilter<D> {
98 fn metadata(&self) -> &dbn::Metadata {
99 self.decoder.metadata()
100 }
101
102 fn metadata_mut(&mut self) -> &mut dbn::Metadata {
103 self.decoder.metadata_mut()
104 }
105}
106
107impl<D: DecodeRecordRef> DecodeRecordRef for LimitFilter<D> {
108 fn decode_record_ref(&mut self) -> dbn::Result<Option<RecordRef<'_>>> {
109 if self
110 .limit
111 .is_some_and(|limit| self.record_count >= limit.get())
112 {
113 return Ok(None);
114 }
115 Ok(self.decoder.decode_record_ref()?.inspect(|_| {
116 self.record_count += 1;
117 }))
118 }
119}