1use crate::{
4 CachedStringBlock, DbcHeader, Error, FieldType, Result, Schema, StringBlock, StringRef,
5 types::*,
6 versions::{DbcVersion, Wdb2Header, Wdb5Header},
7};
8use std::collections::HashMap;
9use std::fmt;
10use std::io::{Cursor, Read, Seek, SeekFrom};
11use std::sync::Arc;
12
13#[derive(Debug, Clone)]
15pub enum Value {
16 Int32(i32),
18 UInt32(u32),
20 Float32(f32),
22 StringRef(StringRef),
24 Bool(bool),
26 UInt8(u8),
28 Int8(i8),
30 UInt16(u16),
32 Int16(i16),
34 Array(Vec<Value>),
36}
37
38impl fmt::Display for Value {
39 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
40 match self {
41 Value::Int32(v) => write!(f, "{v}"),
42 Value::UInt32(v) => write!(f, "{v}"),
43 Value::Float32(v) => write!(f, "{v}"),
44 Value::StringRef(r) => write!(f, "StringRef({})", r.offset()),
45 Value::Bool(v) => write!(f, "{v}"),
46 Value::UInt8(v) => write!(f, "{v}"),
47 Value::Int8(v) => write!(f, "{v}"),
48 Value::UInt16(v) => write!(f, "{v}"),
49 Value::Int16(v) => write!(f, "{v}"),
50 Value::Array(values) => {
51 write!(f, "[")?;
52 for (i, v) in values.iter().enumerate() {
53 if i > 0 {
54 write!(f, ", ")?;
55 }
56 write!(f, "{v}")?;
57 }
58 write!(f, "]")
59 }
60 }
61 }
62}
63
64#[derive(Debug, Clone)]
66pub struct Record {
67 values: Vec<Value>,
69 schema: Option<Arc<Schema>>,
71}
72
73impl Record {
74 pub(crate) fn new(values: Vec<Value>, schema: Option<Arc<Schema>>) -> Self {
76 Self { values, schema }
77 }
78
79 pub fn get_value(&self, index: usize) -> Option<&Value> {
81 self.values.get(index)
82 }
83
84 pub fn get_value_by_name(&self, name: &str) -> Option<&Value> {
86 if let Some(schema) = &self.schema {
87 let index = schema.fields.iter().position(|f| f.name == name)?;
88 self.values.get(index)
89 } else {
90 None
91 }
92 }
93
94 pub fn values(&self) -> &[Value] {
96 &self.values
97 }
98
99 pub fn schema(&self) -> Option<&Schema> {
101 self.schema.as_ref().map(|s| s.as_ref())
102 }
103
104 pub fn len(&self) -> usize {
106 self.values.len()
107 }
108
109 pub fn is_empty(&self) -> bool {
111 self.values.is_empty()
112 }
113}
114
115#[derive(Debug, Clone)]
117pub struct RecordSet {
118 records: Vec<Record>,
120 schema: Option<Arc<Schema>>,
122 string_block: StringBlock,
124 cached_string_block: Option<CachedStringBlock>,
126 key_map: Option<HashMap<Key, usize>>,
128 sorted_key_indices: Option<Vec<(Key, usize)>>,
130}
131
132impl RecordSet {
133 pub(crate) fn new(
135 records: Vec<Record>,
136 schema: Option<Arc<Schema>>,
137 string_block: StringBlock,
138 ) -> Self {
139 let key_map = if let Some(schema) = &schema {
140 if let Some(key_field_index) = schema.key_field_index {
141 let mut map = HashMap::with_capacity(records.len());
142 for (i, record) in records.iter().enumerate() {
143 if let Some(Value::UInt32(key)) = record.get_value(key_field_index) {
144 map.insert(*key, i);
145 }
146 }
147 Some(map)
148 } else {
149 None
150 }
151 } else {
152 None
153 };
154
155 Self {
156 records,
157 schema,
158 string_block,
159 cached_string_block: None,
160 key_map,
161 sorted_key_indices: None,
162 }
163 }
164
165 pub fn get_record(&self, index: usize) -> Option<&Record> {
167 self.records.get(index)
168 }
169
170 pub fn get_record_by_key(&self, key: Key) -> Option<&Record> {
172 if let Some(key_map) = &self.key_map {
173 let index = key_map.get(&key)?;
174 self.records.get(*index)
175 } else {
176 None
177 }
178 }
179
180 pub fn get_string(&self, string_ref: StringRef) -> Result<&str> {
182 if let Some(cached) = &self.cached_string_block {
183 cached.get_string(string_ref)
184 } else {
185 self.string_block.get_string(string_ref)
186 }
187 }
188
189 pub fn records(&self) -> &[Record] {
191 &self.records
192 }
193
194 pub fn schema(&self) -> Option<&Schema> {
196 self.schema.as_ref().map(|s| s.as_ref())
197 }
198
199 pub fn string_block(&self) -> &StringBlock {
201 &self.string_block
202 }
203
204 pub fn len(&self) -> usize {
206 self.records.len()
207 }
208
209 pub fn is_empty(&self) -> bool {
211 self.records.is_empty()
212 }
213
214 pub fn enable_string_caching(&mut self) {
216 self.cached_string_block = Some(CachedStringBlock::from_string_block(&self.string_block));
217 }
218
219 pub fn create_sorted_key_map(&mut self) -> Result<()> {
221 if self.schema.is_none() || self.schema.as_ref().unwrap().key_field_index.is_none() {
222 return Err(Error::InvalidRecord(
223 "No key field defined in schema".to_string(),
224 ));
225 }
226
227 let key_field_index = self.schema.as_ref().unwrap().key_field_index.unwrap();
228
229 let mut key_indices: Vec<(Key, usize)> = self
231 .records
232 .iter()
233 .enumerate()
234 .filter_map(|(i, record)| {
235 if let Some(Value::UInt32(key)) = record.get_value(key_field_index) {
236 Some((*key, i))
237 } else {
238 None
239 }
240 })
241 .collect();
242
243 key_indices.sort_by_key(|&(key, _)| key);
245
246 let mut map = HashMap::with_capacity(key_indices.len());
248 for (key, index) in &key_indices {
249 map.insert(*key, *index);
250 }
251
252 self.key_map = Some(map);
253
254 self.sorted_key_indices = Some(key_indices);
256
257 Ok(())
258 }
259
260 pub fn get_record_by_key_binary_search(&self, key: Key) -> Option<&Record> {
262 if let Some(sorted_key_indices) = &self.sorted_key_indices {
263 let result = sorted_key_indices.binary_search_by_key(&key, |&(k, _)| k);
265
266 if let Ok(pos) = result {
267 let (_, index) = sorted_key_indices[pos];
268 self.records.get(index)
269 } else {
270 None
271 }
272 } else {
273 self.get_record_by_key(key)
275 }
276 }
277}
278
279#[derive(Debug)]
281pub struct DbcParser {
282 header: DbcHeader,
284 schema: Option<Arc<Schema>>,
286 pub(crate) data: Vec<u8>,
288 version: DbcVersion,
290 record_data_offset: u64,
292 string_block_offset: u64,
294}
295
296impl DbcParser {
297 pub fn parse<R: Read + Seek>(reader: &mut R) -> Result<Self> {
299 let version = DbcVersion::detect(reader)?;
301
302 let (header, record_data_offset, string_block_offset) = match version {
304 DbcVersion::WDBC => {
305 let h = DbcHeader::parse(reader)?;
306 let record_offset = DbcHeader::SIZE as u64;
307 let string_offset = h.string_block_offset();
308 (h, record_offset, string_offset)
309 }
310 DbcVersion::WDB2 => {
311 let wdb2_header = Wdb2Header::parse(reader)?;
312 let record_offset = wdb2_header.record_data_offset();
313 let string_offset = wdb2_header.string_block_offset();
314 (wdb2_header.to_dbc_header(), record_offset, string_offset)
315 }
316 DbcVersion::WDB5 => {
317 let wdb5_header = Wdb5Header::parse(reader)?;
318 let record_offset = Wdb5Header::SIZE as u64;
319 let string_offset = wdb5_header.string_block_offset();
320 (wdb5_header.to_dbc_header(), record_offset, string_offset)
321 }
322 _ => {
323 return Err(Error::InvalidHeader(format!(
324 "Unsupported DBC version: {version:?}"
325 )));
326 }
327 };
328
329 reader.seek(SeekFrom::Start(0))?;
331
332 let mut data = Vec::new();
334 reader.read_to_end(&mut data)?;
335
336 Ok(Self {
337 header,
338 schema: None,
339 data,
340 version,
341 record_data_offset,
342 string_block_offset,
343 })
344 }
345
346 pub fn parse_bytes(bytes: &[u8]) -> Result<Self> {
348 let mut cursor = Cursor::new(bytes);
349 Self::parse(&mut cursor)
350 }
351
352 pub fn with_schema(mut self, mut schema: Schema) -> Result<Self> {
354 schema
355 .validate(self.header.field_count, self.header.record_size)
356 .map_err(Error::SchemaValidation)?;
357
358 self.schema = Some(Arc::new(schema));
359 Ok(self)
360 }
361
362 pub fn parse_records(&self) -> Result<RecordSet> {
364 let mut cursor = Cursor::new(self.data.as_slice());
365
366 cursor.seek(SeekFrom::Start(self.record_data_offset))?;
368
369 let mut records = Vec::with_capacity(self.header.record_count as usize);
370
371 for _ in 0..self.header.record_count {
372 let record = if let Some(schema) = &self.schema {
373 self.parse_record_with_schema(&mut cursor, schema)?
374 } else {
375 self.parse_record_raw(&mut cursor)?
376 };
377 records.push(record);
378 }
379
380 let string_block = StringBlock::parse(
382 &mut cursor,
383 self.string_block_offset,
384 self.header.string_block_size,
385 )?;
386
387 Ok(RecordSet::new(records, self.schema.clone(), string_block))
388 }
389
390 fn parse_record_with_schema(
392 &self,
393 cursor: &mut Cursor<&[u8]>,
394 schema: &Arc<Schema>,
395 ) -> Result<Record> {
396 let mut values = Vec::with_capacity(schema.fields.len());
397
398 for field in &schema.fields {
399 let value = if field.is_array {
400 let array_size = field.array_size.unwrap_or(0);
401 let mut array_values = Vec::with_capacity(array_size);
402
403 for _ in 0..array_size {
404 array_values.push(self.parse_field_value(cursor, field.field_type)?);
405 }
406
407 Value::Array(array_values)
408 } else {
409 self.parse_field_value(cursor, field.field_type)?
410 };
411
412 values.push(value);
413 }
414
415 Ok(Record::new(values, Some(Arc::clone(schema))))
416 }
417
418 fn parse_record_raw(&self, cursor: &mut Cursor<&[u8]>) -> Result<Record> {
420 let mut values = Vec::with_capacity(self.header.field_count as usize);
421
422 for _ in 0..self.header.field_count {
423 let mut buf = [0u8; 4];
425 cursor.read_exact(&mut buf)?;
426 let value = u32::from_le_bytes(buf);
427 values.push(Value::UInt32(value));
428 }
429
430 Ok(Record::new(values, None))
431 }
432
433 fn parse_field_value(
435 &self,
436 cursor: &mut Cursor<&[u8]>,
437 field_type: FieldType,
438 ) -> Result<Value> {
439 crate::field_parser::parse_field_value(cursor, field_type)
440 }
441
442 pub fn header(&self) -> &DbcHeader {
444 &self.header
445 }
446
447 pub fn schema(&self) -> Option<&Schema> {
449 self.schema.as_ref().map(|s| s.as_ref())
450 }
451
452 pub fn version(&self) -> DbcVersion {
454 self.version
455 }
456
457 pub fn data(&self) -> &[u8] {
459 &self.data
460 }
461}