Skip to main content

cqlite_core/schema/
parser.rs

1//! Schema-driven parser for CQLite
2//!
3//! This module provides strictly schema-driven parsing without any type guessing or fallback
4//! behavior. All parsing operations require explicit schema context.
5
6use crate::{
7    schema::{registry::ParsingContext, CqlType},
8    types::{ComparatorType, Value},
9    Error, Result,
10};
11use std::collections::HashMap;
12
/// Schema-driven parser that requires explicit schema context for all operations.
///
/// The parser owns a single [`ParsingContext`]; every parsing method reads the
/// schema and comparators from it rather than inferring types from the bytes.
/// Instances are created through [`SchemaParser::new`], which rejects a context
/// that reports itself as incomplete.
#[derive(Debug)]
pub struct SchemaParser {
    /// Parsing context containing schema and comparators
    context: ParsingContext,
}
19
20impl SchemaParser {
21    /// Create a new schema-driven parser
22    pub fn new(context: ParsingContext) -> Result<Self> {
23        if !context.is_complete() {
24            return Err(Error::Schema(
25                "Incomplete parsing context: schema must be fully defined".to_string(),
26            ));
27        }
28        Ok(Self { context })
29    }
30
31    /// Parse a partition key using the schema's partition key comparators
32    pub fn parse_partition_key(&self, data: &[u8]) -> Result<Vec<Value>> {
33        if self.context.partition_comparators.is_empty() {
34            return Err(Error::Schema(
35                "No partition key comparators defined in schema".to_string(),
36            ));
37        }
38
39        let mut values = Vec::new();
40        let mut offset = 0;
41
42        for (idx, comparator) in self.context.partition_comparators.iter().enumerate() {
43            let key_column = &self.context.schema.partition_keys[idx];
44            let (value, consumed) = self.parse_value_with_comparator(
45                &data[offset..],
46                comparator,
47                &key_column.data_type,
48            )?;
49            values.push(value);
50            offset += consumed;
51        }
52
53        Ok(values)
54    }
55
56    /// Parse clustering keys using the schema's clustering key comparators
57    pub fn parse_clustering_keys(&self, data: &[u8]) -> Result<Vec<Value>> {
58        if self.context.clustering_comparators.is_empty() {
59            return Ok(Vec::new()); // No clustering keys is valid
60        }
61
62        let mut values = Vec::new();
63        let mut offset = 0;
64
65        for (idx, comparator) in self.context.clustering_comparators.iter().enumerate() {
66            if offset >= data.len() {
67                break; // Partial clustering keys are valid
68            }
69
70            let key_column = &self.context.schema.clustering_keys[idx];
71            let (value, consumed) = self.parse_value_with_comparator(
72                &data[offset..],
73                comparator,
74                &key_column.data_type,
75            )?;
76            values.push(value);
77            offset += consumed;
78        }
79
80        Ok(values)
81    }
82
83    /// Parse a column value using the schema's column type
84    /// Returns the parsed value and the number of bytes consumed
85    pub fn parse_column_value(&self, column_name: &str, data: &[u8]) -> Result<(Value, usize)> {
86        let comparator = self
87            .context
88            .get_column_comparator(column_name)
89            .ok_or_else(|| {
90                Error::Schema(format!(
91                    "Column '{}' not found in schema for {}.{}",
92                    column_name, self.context.schema.keyspace, self.context.schema.table
93                ))
94            })?;
95
96        let _column = self
97            .context
98            .schema
99            .columns
100            .iter()
101            .find(|c| c.name == column_name)
102            .ok_or_else(|| Error::Schema(format!("Column '{}' not found", column_name)))?;
103
104        // Use the provided comparator directly instead of re-parsing the type string
105        // This avoids issues with UDT types that may not be resolvable from string alone
106        self.parse_value_with_provided_comparator(data, comparator)
107    }
108
109    /// Parse a value using a specific comparator and type string
110    fn parse_value_with_comparator(
111        &self,
112        data: &[u8],
113        comparator: &ComparatorType,
114        type_str: &str,
115    ) -> Result<(Value, usize)> {
116        let cql_type = CqlType::parse(type_str)?;
117        self.parse_typed_value(data, &cql_type, comparator)
118    }
119
120    /// Parse a value using only the provided comparator (no type string parsing)
121    fn parse_value_with_provided_comparator(
122        &self,
123        data: &[u8],
124        comparator: &ComparatorType,
125    ) -> Result<(Value, usize)> {
126        // Convert the comparator back to a CqlType for parsing
127        let cql_type = self.comparator_to_cql_type(comparator)?;
128        self.parse_typed_value(data, &cql_type, comparator)
129    }
130
131    /// Parse a value with explicit CQL type and comparator
132    fn parse_typed_value(
133        &self,
134        data: &[u8],
135        cql_type: &CqlType,
136        comparator: &ComparatorType,
137    ) -> Result<(Value, usize)> {
138        // This is where the actual parsing happens based on the CQL type
139        // Each type has its specific binary format that we parse deterministically
140        match cql_type {
141            CqlType::Boolean => self.parse_boolean(data),
142            CqlType::TinyInt => self.parse_tinyint(data),
143            CqlType::SmallInt => self.parse_smallint(data),
144            CqlType::Int => self.parse_int(data),
145            CqlType::BigInt => self.parse_bigint(data),
146            CqlType::Counter => self.parse_counter(data),
147            CqlType::Float => self.parse_float(data),
148            CqlType::Double => self.parse_double(data),
149            CqlType::Text | CqlType::Varchar | CqlType::Ascii => self.parse_text(data),
150            CqlType::Blob => self.parse_blob(data),
151            CqlType::Timestamp => self.parse_timestamp(data),
152            CqlType::Uuid | CqlType::TimeUuid => self.parse_uuid(data),
153            CqlType::List(elem_type) => self.parse_list(data, elem_type, comparator),
154            CqlType::Set(elem_type) => self.parse_set(data, elem_type, comparator),
155            CqlType::Map(key_type, val_type) => {
156                self.parse_map(data, key_type, val_type, comparator)
157            }
158            CqlType::Tuple(field_types) => self.parse_tuple(data, field_types, comparator),
159            CqlType::Udt(type_name, fields) => self.parse_udt(data, type_name, fields, comparator),
160            CqlType::Frozen(inner_type) => self.parse_frozen(data, inner_type, comparator),
161            _ => Err(Error::Schema(format!(
162                "Unsupported type for schema-driven parsing: {:?}",
163                cql_type
164            ))),
165        }
166    }
167
168    // Type-specific parsing methods (no guessing, strict format adherence)
169
170    fn parse_boolean(&self, data: &[u8]) -> Result<(Value, usize)> {
171        if data.is_empty() {
172            return Err(Error::schema("Insufficient data for boolean".to_string()));
173        }
174        Ok((Value::Boolean(data[0] != 0), 1))
175    }
176
177    fn parse_tinyint(&self, data: &[u8]) -> Result<(Value, usize)> {
178        if data.is_empty() {
179            return Err(Error::schema("Insufficient data for tinyint".to_string()));
180        }
181        Ok((Value::TinyInt(data[0] as i8), 1))
182    }
183
184    fn parse_smallint(&self, data: &[u8]) -> Result<(Value, usize)> {
185        if data.len() < 2 {
186            return Err(Error::schema("Insufficient data for smallint".to_string()));
187        }
188        let value = i16::from_be_bytes([data[0], data[1]]);
189        Ok((Value::SmallInt(value), 2))
190    }
191
192    fn parse_int(&self, data: &[u8]) -> Result<(Value, usize)> {
193        if data.len() < 4 {
194            return Err(Error::schema("Insufficient data for int".to_string()));
195        }
196        let value = i32::from_be_bytes([data[0], data[1], data[2], data[3]]);
197        Ok((Value::Integer(value), 4))
198    }
199
200    fn parse_bigint(&self, data: &[u8]) -> Result<(Value, usize)> {
201        if data.len() < 8 {
202            return Err(Error::schema("Insufficient data for bigint".to_string()));
203        }
204        let mut bytes = [0u8; 8];
205        bytes.copy_from_slice(&data[0..8]);
206        let value = i64::from_be_bytes(bytes);
207        Ok((Value::BigInt(value), 8))
208    }
209
210    fn parse_counter(&self, data: &[u8]) -> Result<(Value, usize)> {
211        if data.len() < 8 {
212            return Err(Error::schema("Insufficient data for counter".to_string()));
213        }
214        let mut bytes = [0u8; 8];
215        bytes.copy_from_slice(&data[0..8]);
216        let value = i64::from_be_bytes(bytes);
217        Ok((Value::Counter(value), 8))
218    }
219
220    fn parse_float(&self, data: &[u8]) -> Result<(Value, usize)> {
221        if data.len() < 4 {
222            return Err(Error::schema("Insufficient data for float".to_string()));
223        }
224        let mut bytes = [0u8; 4];
225        bytes.copy_from_slice(&data[0..4]);
226        let value = f32::from_be_bytes(bytes);
227        Ok((Value::Float32(value), 4))
228    }
229
230    fn parse_double(&self, data: &[u8]) -> Result<(Value, usize)> {
231        if data.len() < 8 {
232            return Err(Error::schema("Insufficient data for double".to_string()));
233        }
234        let mut bytes = [0u8; 8];
235        bytes.copy_from_slice(&data[0..8]);
236        let value = f64::from_be_bytes(bytes);
237        Ok((Value::Float(value), 8))
238    }
239
240    fn parse_text(&self, data: &[u8]) -> Result<(Value, usize)> {
241        // Text is typically length-prefixed
242        if data.len() < 4 {
243            return Err(Error::schema(
244                "Insufficient data for text length".to_string(),
245            ));
246        }
247        let len = i32::from_be_bytes([data[0], data[1], data[2], data[3]]) as usize;
248        if data.len() < 4 + len {
249            return Err(Error::schema(
250                "Insufficient data for text content".to_string(),
251            ));
252        }
253        let text = String::from_utf8(data[4..4 + len].to_vec())
254            .map_err(|e| Error::schema(format!("Invalid UTF-8: {}", e)))?;
255        Ok((Value::Text(text), 4 + len))
256    }
257
258    fn parse_blob(&self, data: &[u8]) -> Result<(Value, usize)> {
259        // Blob is typically length-prefixed
260        if data.len() < 4 {
261            return Err(Error::schema(
262                "Insufficient data for blob length".to_string(),
263            ));
264        }
265        let len = i32::from_be_bytes([data[0], data[1], data[2], data[3]]) as usize;
266        if data.len() < 4 + len {
267            return Err(Error::schema(
268                "Insufficient data for blob content".to_string(),
269            ));
270        }
271        Ok((Value::Blob(data[4..4 + len].to_vec()), 4 + len))
272    }
273
274    fn parse_timestamp(&self, data: &[u8]) -> Result<(Value, usize)> {
275        if data.len() < 8 {
276            return Err(Error::schema("Insufficient data for timestamp".to_string()));
277        }
278        let mut bytes = [0u8; 8];
279        bytes.copy_from_slice(&data[0..8]);
280        let millis = i64::from_be_bytes(bytes);
281        Ok((Value::Timestamp(millis), 8))
282    }
283
284    fn parse_uuid(&self, data: &[u8]) -> Result<(Value, usize)> {
285        if data.len() < 16 {
286            return Err(Error::schema("Insufficient data for UUID".to_string()));
287        }
288        let uuid_bytes: [u8; 16] = data[0..16]
289            .try_into()
290            .map_err(|_| Error::schema("Invalid UUID bytes".to_string()))?;
291        Ok((Value::Uuid(uuid_bytes), 16))
292    }
293
294    fn parse_list(
295        &self,
296        data: &[u8],
297        elem_type: &CqlType,
298        _comparator: &ComparatorType,
299    ) -> Result<(Value, usize)> {
300        let mut offset = 0;
301
302        // Parse collection size
303        if data.len() < 4 {
304            return Err(Error::schema("Insufficient data for list size".to_string()));
305        }
306        let count = i32::from_be_bytes([data[0], data[1], data[2], data[3]]) as usize;
307        offset += 4;
308
309        let mut elements = Vec::with_capacity(count);
310        let elem_comparator = ComparatorType::from_cql_type(elem_type)?;
311
312        for _ in 0..count {
313            let (value, consumed) =
314                self.parse_typed_value(&data[offset..], elem_type, &elem_comparator)?;
315            elements.push(value);
316            offset += consumed;
317        }
318
319        Ok((Value::List(elements), offset))
320    }
321
322    fn parse_set(
323        &self,
324        data: &[u8],
325        elem_type: &CqlType,
326        _comparator: &ComparatorType,
327    ) -> Result<(Value, usize)> {
328        let mut offset = 0;
329
330        // Parse collection size
331        if data.len() < 4 {
332            return Err(Error::schema("Insufficient data for set size".to_string()));
333        }
334        let count = i32::from_be_bytes([data[0], data[1], data[2], data[3]]) as usize;
335        offset += 4;
336
337        let mut elements = Vec::with_capacity(count);
338        let elem_comparator = ComparatorType::from_cql_type(elem_type)?;
339
340        for _ in 0..count {
341            let (value, consumed) =
342                self.parse_typed_value(&data[offset..], elem_type, &elem_comparator)?;
343            elements.push(value);
344            offset += consumed;
345        }
346
347        Ok((Value::Set(elements), offset))
348    }
349
350    fn parse_map(
351        &self,
352        data: &[u8],
353        key_type: &CqlType,
354        val_type: &CqlType,
355        _comparator: &ComparatorType,
356    ) -> Result<(Value, usize)> {
357        let mut offset = 0;
358
359        // Parse collection size
360        if data.len() < 4 {
361            return Err(Error::schema("Insufficient data for map size".to_string()));
362        }
363        let count = i32::from_be_bytes([data[0], data[1], data[2], data[3]]) as usize;
364        offset += 4;
365
366        let mut map = Vec::with_capacity(count);
367        let key_comparator = ComparatorType::from_cql_type(key_type)?;
368        let val_comparator = ComparatorType::from_cql_type(val_type)?;
369
370        for _ in 0..count {
371            let (key, key_consumed) =
372                self.parse_typed_value(&data[offset..], key_type, &key_comparator)?;
373            offset += key_consumed;
374
375            let (value, val_consumed) =
376                self.parse_typed_value(&data[offset..], val_type, &val_comparator)?;
377            offset += val_consumed;
378
379            map.push((key, value));
380        }
381
382        Ok((Value::Map(map), offset))
383    }
384
385    fn parse_tuple(
386        &self,
387        data: &[u8],
388        field_types: &[CqlType],
389        _comparator: &ComparatorType,
390    ) -> Result<(Value, usize)> {
391        let mut offset = 0;
392        let mut values = Vec::with_capacity(field_types.len());
393
394        for field_type in field_types {
395            let field_comparator = ComparatorType::from_cql_type(field_type)?;
396            let (value, consumed) =
397                self.parse_typed_value(&data[offset..], field_type, &field_comparator)?;
398            values.push(value);
399            offset += consumed;
400        }
401
402        Ok((Value::Tuple(values), offset))
403    }
404
405    fn parse_udt(
406        &self,
407        data: &[u8],
408        type_name: &str,
409        fields: &[(String, CqlType)],
410        _comparator: &ComparatorType,
411    ) -> Result<(Value, usize)> {
412        let mut offset = 0;
413        let mut field_values = Vec::with_capacity(fields.len());
414
415        for (field_name, field_type) in fields {
416            let field_comparator = ComparatorType::from_cql_type(field_type)?;
417
418            // Check for null field (length = -1)
419            if data.len() >= offset + 4 {
420                let field_len = i32::from_be_bytes([
421                    data[offset],
422                    data[offset + 1],
423                    data[offset + 2],
424                    data[offset + 3],
425                ]);
426
427                if field_len < 0 {
428                    // Null field
429                    field_values.push(crate::types::UdtField {
430                        name: field_name.clone(),
431                        value: None,
432                    });
433                    offset += 4;
434                    continue;
435                }
436            }
437
438            // Non-null field: parse the field data directly (no per-field length prefix)
439            // Note: The overall UDT may have a length prefix, but individual fields within
440            // the UDT are serialized without their own length prefixes
441            let (value, consumed) =
442                self.parse_typed_value(&data[offset..], field_type, &field_comparator)?;
443            field_values.push(crate::types::UdtField {
444                name: field_name.clone(),
445                value: Some(value),
446            });
447            offset += consumed;
448        }
449
450        Ok((
451            Value::Udt(crate::types::UdtValue {
452                type_name: type_name.to_string(),
453                keyspace: self.context.schema.keyspace.clone(),
454                fields: field_values,
455            }),
456            offset,
457        ))
458    }
459
460    fn parse_frozen(
461        &self,
462        data: &[u8],
463        inner_type: &CqlType,
464        _comparator: &ComparatorType,
465    ) -> Result<(Value, usize)> {
466        // Frozen types are serialized the same as their inner type
467        let inner_comparator = ComparatorType::from_cql_type(inner_type)?;
468        let (inner_value, consumed) =
469            self.parse_typed_value(data, inner_type, &inner_comparator)?;
470        Ok((Value::Frozen(Box::new(inner_value)), consumed))
471    }
472
473    /// Parse a row with all column values using schema
474    /// Returns a HashMap of column names to values with deterministic consumed-byte tracking
475    pub fn parse_row(&self, data: &[u8]) -> Result<HashMap<String, Value>> {
476        let mut row = HashMap::new();
477        let mut offset = 0;
478
479        for column in &self.context.schema.columns {
480            if offset >= data.len() {
481                // Remaining columns are null
482                row.insert(column.name.clone(), Value::Null);
483                continue;
484            }
485
486            // Check for null value (length = -1)
487            if data.len() >= offset + 4 {
488                let value_len = i32::from_be_bytes([
489                    data[offset],
490                    data[offset + 1],
491                    data[offset + 2],
492                    data[offset + 3],
493                ]);
494
495                if value_len < 0 {
496                    row.insert(column.name.clone(), Value::Null);
497                    offset += 4;
498                    continue;
499                }
500            }
501
502            // Parse column value and track exact consumed bytes
503            let (value, consumed) = self.parse_column_value(&column.name, &data[offset..])?;
504            row.insert(column.name.clone(), value);
505            offset += consumed;
506        }
507
508        Ok(row)
509    }
510
511    /// Convert a ComparatorType back to CqlType for parsing
512    #[allow(clippy::only_used_in_recursion)]
513    fn comparator_to_cql_type(&self, comparator: &ComparatorType) -> Result<CqlType> {
514        match comparator {
515            ComparatorType::Boolean => Ok(CqlType::Boolean),
516            ComparatorType::TinyInt => Ok(CqlType::TinyInt),
517            ComparatorType::SmallInt => Ok(CqlType::SmallInt),
518            ComparatorType::Int => Ok(CqlType::Int),
519            ComparatorType::BigInt => Ok(CqlType::BigInt),
520            ComparatorType::Counter => Ok(CqlType::Counter),
521            ComparatorType::Float32 => Ok(CqlType::Float),
522            ComparatorType::Float => Ok(CqlType::Double),
523            ComparatorType::Text => Ok(CqlType::Text),
524            ComparatorType::Blob => Ok(CqlType::Blob),
525            ComparatorType::Timestamp => Ok(CqlType::Timestamp),
526            ComparatorType::Uuid => Ok(CqlType::Uuid),
527            ComparatorType::Varint => Ok(CqlType::Custom("varint".to_string())),
528            ComparatorType::Decimal => Ok(CqlType::Decimal),
529            ComparatorType::Duration => Ok(CqlType::Duration),
530            ComparatorType::Date => Ok(CqlType::Date),
531            ComparatorType::Json => Ok(CqlType::Custom("json".to_string())),
532            ComparatorType::List(elem_comparator) => {
533                let elem_type = self.comparator_to_cql_type(elem_comparator)?;
534                Ok(CqlType::List(Box::new(elem_type)))
535            }
536            ComparatorType::Set(elem_comparator) => {
537                let elem_type = self.comparator_to_cql_type(elem_comparator)?;
538                Ok(CqlType::Set(Box::new(elem_type)))
539            }
540            ComparatorType::Map(key_comparator, val_comparator) => {
541                let key_type = self.comparator_to_cql_type(key_comparator)?;
542                let val_type = self.comparator_to_cql_type(val_comparator)?;
543                Ok(CqlType::Map(Box::new(key_type), Box::new(val_type)))
544            }
545            ComparatorType::Tuple(field_comparators) => {
546                let mut field_types = Vec::new();
547                for field_comparator in field_comparators {
548                    field_types.push(self.comparator_to_cql_type(field_comparator)?);
549                }
550                Ok(CqlType::Tuple(field_types))
551            }
552            ComparatorType::Udt {
553                type_name,
554                field_comparators,
555                ..
556            } => {
557                let mut fields = Vec::new();
558                for (field_name, field_comparator) in field_comparators {
559                    let field_type = self.comparator_to_cql_type(field_comparator)?;
560                    fields.push((field_name.clone(), field_type));
561                }
562                Ok(CqlType::Udt(type_name.clone(), fields))
563            }
564            ComparatorType::Frozen(inner_comparator) => {
565                let inner_type = self.comparator_to_cql_type(inner_comparator)?;
566                Ok(CqlType::Frozen(Box::new(inner_type)))
567            }
568            ComparatorType::Custom(type_name) => Ok(CqlType::Custom(type_name.clone())),
569        }
570    }
571}
572
573#[cfg(test)]
574#[path = "parser_tests.rs"]
575mod tests;