1use crate::{
4 CachedStringBlock, DbcHeader, Error, FieldType, Result, Schema, StringBlock, StringRef,
5 types::*,
6 versions::{DbcVersion, Wdb2Header, Wdb5Header},
7};
8use std::collections::HashMap;
9use std::fmt;
10use std::io::{Cursor, Read, Seek, SeekFrom};
11use std::sync::Arc;
12
13#[derive(Debug, Clone)]
15pub enum Value {
16 Int32(i32),
18 UInt32(u32),
20 Float32(f32),
22 StringRef(StringRef),
24 Bool(bool),
26 UInt8(u8),
28 Int8(i8),
30 UInt16(u16),
32 Int16(i16),
34 Array(Vec<Value>),
36}
37
38impl fmt::Display for Value {
39 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
40 match self {
41 Value::Int32(v) => write!(f, "{v}"),
42 Value::UInt32(v) => write!(f, "{v}"),
43 Value::Float32(v) => write!(f, "{v}"),
44 Value::StringRef(r) => write!(f, "StringRef({})", r.offset()),
45 Value::Bool(v) => write!(f, "{v}"),
46 Value::UInt8(v) => write!(f, "{v}"),
47 Value::Int8(v) => write!(f, "{v}"),
48 Value::UInt16(v) => write!(f, "{v}"),
49 Value::Int16(v) => write!(f, "{v}"),
50 Value::Array(values) => {
51 write!(f, "[")?;
52 for (i, v) in values.iter().enumerate() {
53 if i > 0 {
54 write!(f, ", ")?;
55 }
56 write!(f, "{v}")?;
57 }
58 write!(f, "]")
59 }
60 }
61 }
62}
63
64#[derive(Debug, Clone)]
66pub struct Record {
67 values: Vec<Value>,
69 schema: Option<Arc<Schema>>,
71}
72
73impl Record {
74 pub(crate) fn new(values: Vec<Value>, schema: Option<Arc<Schema>>) -> Self {
76 Self { values, schema }
77 }
78
79 pub fn get_value(&self, index: usize) -> Option<&Value> {
81 self.values.get(index)
82 }
83
84 pub fn get_value_by_name(&self, name: &str) -> Option<&Value> {
86 if let Some(schema) = &self.schema {
87 let index = schema.fields.iter().position(|f| f.name == name)?;
88 self.values.get(index)
89 } else {
90 None
91 }
92 }
93
94 pub fn values(&self) -> &[Value] {
96 &self.values
97 }
98
99 pub fn schema(&self) -> Option<&Schema> {
101 self.schema.as_ref().map(|s| s.as_ref())
102 }
103
104 pub fn len(&self) -> usize {
106 self.values.len()
107 }
108
109 pub fn is_empty(&self) -> bool {
111 self.values.is_empty()
112 }
113}
114
115#[derive(Debug, Clone)]
117pub struct RecordSet {
118 records: Vec<Record>,
120 schema: Option<Arc<Schema>>,
122 string_block: StringBlock,
124 cached_string_block: Option<CachedStringBlock>,
126 key_map: Option<HashMap<Key, usize>>,
128 sorted_key_indices: Option<Vec<(Key, usize)>>,
130}
131
132impl RecordSet {
133 pub(crate) fn new(
135 records: Vec<Record>,
136 schema: Option<Arc<Schema>>,
137 string_block: StringBlock,
138 ) -> Self {
139 let key_map = if let Some(schema) = &schema {
140 if let Some(key_field_index) = schema.key_field_index {
141 let mut map = HashMap::with_capacity(records.len());
142 for (i, record) in records.iter().enumerate() {
143 if let Some(Value::UInt32(key)) = record.get_value(key_field_index) {
144 map.insert(*key, i);
145 }
146 }
147 Some(map)
148 } else {
149 None
150 }
151 } else {
152 None
153 };
154
155 Self {
156 records,
157 schema,
158 string_block,
159 cached_string_block: None,
160 key_map,
161 sorted_key_indices: None,
162 }
163 }
164
165 pub fn get_record(&self, index: usize) -> Option<&Record> {
167 self.records.get(index)
168 }
169
170 pub fn get_record_by_key(&self, key: Key) -> Option<&Record> {
172 if let Some(key_map) = &self.key_map {
173 let index = key_map.get(&key)?;
174 self.records.get(*index)
175 } else {
176 None
177 }
178 }
179
180 pub fn get_string(&self, string_ref: StringRef) -> Result<&str> {
182 if let Some(cached) = &self.cached_string_block {
183 cached.get_string(string_ref)
184 } else {
185 self.string_block.get_string(string_ref)
186 }
187 }
188
189 pub fn records(&self) -> &[Record] {
191 &self.records
192 }
193
194 pub fn schema(&self) -> Option<&Schema> {
196 self.schema.as_ref().map(|s| s.as_ref())
197 }
198
199 pub fn string_block(&self) -> &StringBlock {
201 &self.string_block
202 }
203
204 pub fn len(&self) -> usize {
206 self.records.len()
207 }
208
209 pub fn is_empty(&self) -> bool {
211 self.records.is_empty()
212 }
213
214 pub fn enable_string_caching(&mut self) {
216 self.cached_string_block = Some(CachedStringBlock::from_string_block(&self.string_block));
217 }
218
219 pub fn create_sorted_key_map(&mut self) -> Result<()> {
221 if self.schema.is_none() || self.schema.as_ref().unwrap().key_field_index.is_none() {
222 return Err(Error::InvalidRecord(
223 "No key field defined in schema".to_string(),
224 ));
225 }
226
227 let key_field_index = self.schema.as_ref().unwrap().key_field_index.unwrap();
228
229 let mut key_indices: Vec<(Key, usize)> = self
231 .records
232 .iter()
233 .enumerate()
234 .filter_map(|(i, record)| {
235 if let Some(Value::UInt32(key)) = record.get_value(key_field_index) {
236 Some((*key, i))
237 } else {
238 None
239 }
240 })
241 .collect();
242
243 key_indices.sort_by_key(|&(key, _)| key);
245
246 let mut map = HashMap::with_capacity(key_indices.len());
248 for (key, index) in &key_indices {
249 map.insert(*key, *index);
250 }
251
252 self.key_map = Some(map);
253
254 self.sorted_key_indices = Some(key_indices);
256
257 Ok(())
258 }
259
260 pub fn get_record_by_key_binary_search(&self, key: Key) -> Option<&Record> {
262 if let Some(sorted_key_indices) = &self.sorted_key_indices {
263 let result = sorted_key_indices.binary_search_by_key(&key, |&(k, _)| k);
265
266 if let Ok(pos) = result {
267 let (_, index) = sorted_key_indices[pos];
268 self.records.get(index)
269 } else {
270 None
271 }
272 } else {
273 self.get_record_by_key(key)
275 }
276 }
277}
278
279#[derive(Debug)]
281pub struct DbcParser {
282 header: DbcHeader,
284 schema: Option<Arc<Schema>>,
286 pub(crate) data: Vec<u8>,
288 version: DbcVersion,
290}
291
292impl DbcParser {
293 pub fn parse<R: Read + Seek>(reader: &mut R) -> Result<Self> {
295 let version = DbcVersion::detect(reader)?;
297
298 let header = match version {
300 DbcVersion::WDBC => DbcHeader::parse(reader)?,
301 DbcVersion::WDB2 => {
302 let wdb2_header = Wdb2Header::parse(reader)?;
303 wdb2_header.to_dbc_header()
304 }
305 DbcVersion::WDB5 => {
306 let wdb5_header = Wdb5Header::parse(reader)?;
307 wdb5_header.to_dbc_header()
308 }
309 _ => {
310 return Err(Error::InvalidHeader(format!(
311 "Unsupported DBC version: {version:?}"
312 )));
313 }
314 };
315
316 reader.seek(SeekFrom::Start(0))?;
318
319 let mut data = Vec::with_capacity(header.total_size() as usize);
321 reader.read_to_end(&mut data)?;
322
323 Ok(Self {
324 header,
325 schema: None,
326 data,
327 version,
328 })
329 }
330
331 pub fn parse_bytes(bytes: &[u8]) -> Result<Self> {
333 let mut cursor = Cursor::new(bytes);
334 Self::parse(&mut cursor)
335 }
336
337 pub fn with_schema(mut self, mut schema: Schema) -> Result<Self> {
339 schema
340 .validate(self.header.field_count, self.header.record_size)
341 .map_err(Error::SchemaValidation)?;
342
343 self.schema = Some(Arc::new(schema));
344 Ok(self)
345 }
346
347 pub fn parse_records(&self) -> Result<RecordSet> {
349 let mut cursor = Cursor::new(self.data.as_slice());
350
351 cursor.seek(SeekFrom::Start(DbcHeader::SIZE as u64))?;
353
354 let mut records = Vec::with_capacity(self.header.record_count as usize);
355
356 for _ in 0..self.header.record_count {
357 let record = if let Some(schema) = &self.schema {
358 self.parse_record_with_schema(&mut cursor, schema)?
359 } else {
360 self.parse_record_raw(&mut cursor)?
361 };
362 records.push(record);
363 }
364
365 let string_block = StringBlock::parse(
367 &mut cursor,
368 self.header.string_block_offset(),
369 self.header.string_block_size,
370 )?;
371
372 Ok(RecordSet::new(records, self.schema.clone(), string_block))
373 }
374
375 fn parse_record_with_schema(
377 &self,
378 cursor: &mut Cursor<&[u8]>,
379 schema: &Arc<Schema>,
380 ) -> Result<Record> {
381 let mut values = Vec::with_capacity(schema.fields.len());
382
383 for field in &schema.fields {
384 let value = if field.is_array {
385 let array_size = field.array_size.unwrap_or(0);
386 let mut array_values = Vec::with_capacity(array_size);
387
388 for _ in 0..array_size {
389 array_values.push(self.parse_field_value(cursor, field.field_type)?);
390 }
391
392 Value::Array(array_values)
393 } else {
394 self.parse_field_value(cursor, field.field_type)?
395 };
396
397 values.push(value);
398 }
399
400 Ok(Record::new(values, Some(Arc::clone(schema))))
401 }
402
403 fn parse_record_raw(&self, cursor: &mut Cursor<&[u8]>) -> Result<Record> {
405 let mut values = Vec::with_capacity(self.header.field_count as usize);
406
407 for _ in 0..self.header.field_count {
408 let mut buf = [0u8; 4];
410 cursor.read_exact(&mut buf)?;
411 let value = u32::from_le_bytes(buf);
412 values.push(Value::UInt32(value));
413 }
414
415 Ok(Record::new(values, None))
416 }
417
418 fn parse_field_value(
420 &self,
421 cursor: &mut Cursor<&[u8]>,
422 field_type: FieldType,
423 ) -> Result<Value> {
424 crate::field_parser::parse_field_value(cursor, field_type)
425 }
426
427 pub fn header(&self) -> &DbcHeader {
429 &self.header
430 }
431
432 pub fn schema(&self) -> Option<&Schema> {
434 self.schema.as_ref().map(|s| s.as_ref())
435 }
436
437 pub fn version(&self) -> DbcVersion {
439 self.version
440 }
441
442 pub fn data(&self) -> &[u8] {
444 &self.data
445 }
446}