1use crate::{
7 schema::{registry::ParsingContext, CqlType},
8 types::{ComparatorType, Value},
9 Error, Result,
10};
11use std::collections::HashMap;
12
13#[derive(Debug)]
15pub struct SchemaParser {
16 context: ParsingContext,
18}
19
20impl SchemaParser {
21 pub fn new(context: ParsingContext) -> Result<Self> {
23 if !context.is_complete() {
24 return Err(Error::Schema(
25 "Incomplete parsing context: schema must be fully defined".to_string(),
26 ));
27 }
28 Ok(Self { context })
29 }
30
31 pub fn parse_partition_key(&self, data: &[u8]) -> Result<Vec<Value>> {
33 if self.context.partition_comparators.is_empty() {
34 return Err(Error::Schema(
35 "No partition key comparators defined in schema".to_string(),
36 ));
37 }
38
39 let mut values = Vec::new();
40 let mut offset = 0;
41
42 for (idx, comparator) in self.context.partition_comparators.iter().enumerate() {
43 let key_column = &self.context.schema.partition_keys[idx];
44 let (value, consumed) = self.parse_value_with_comparator(
45 &data[offset..],
46 comparator,
47 &key_column.data_type,
48 )?;
49 values.push(value);
50 offset += consumed;
51 }
52
53 Ok(values)
54 }
55
56 pub fn parse_clustering_keys(&self, data: &[u8]) -> Result<Vec<Value>> {
58 if self.context.clustering_comparators.is_empty() {
59 return Ok(Vec::new()); }
61
62 let mut values = Vec::new();
63 let mut offset = 0;
64
65 for (idx, comparator) in self.context.clustering_comparators.iter().enumerate() {
66 if offset >= data.len() {
67 break; }
69
70 let key_column = &self.context.schema.clustering_keys[idx];
71 let (value, consumed) = self.parse_value_with_comparator(
72 &data[offset..],
73 comparator,
74 &key_column.data_type,
75 )?;
76 values.push(value);
77 offset += consumed;
78 }
79
80 Ok(values)
81 }
82
83 pub fn parse_column_value(&self, column_name: &str, data: &[u8]) -> Result<(Value, usize)> {
86 let comparator = self
87 .context
88 .get_column_comparator(column_name)
89 .ok_or_else(|| {
90 Error::Schema(format!(
91 "Column '{}' not found in schema for {}.{}",
92 column_name, self.context.schema.keyspace, self.context.schema.table
93 ))
94 })?;
95
96 let _column = self
97 .context
98 .schema
99 .columns
100 .iter()
101 .find(|c| c.name == column_name)
102 .ok_or_else(|| Error::Schema(format!("Column '{}' not found", column_name)))?;
103
104 self.parse_value_with_provided_comparator(data, comparator)
107 }
108
109 fn parse_value_with_comparator(
111 &self,
112 data: &[u8],
113 comparator: &ComparatorType,
114 type_str: &str,
115 ) -> Result<(Value, usize)> {
116 let cql_type = CqlType::parse(type_str)?;
117 self.parse_typed_value(data, &cql_type, comparator)
118 }
119
120 fn parse_value_with_provided_comparator(
122 &self,
123 data: &[u8],
124 comparator: &ComparatorType,
125 ) -> Result<(Value, usize)> {
126 let cql_type = self.comparator_to_cql_type(comparator)?;
128 self.parse_typed_value(data, &cql_type, comparator)
129 }
130
131 fn parse_typed_value(
133 &self,
134 data: &[u8],
135 cql_type: &CqlType,
136 comparator: &ComparatorType,
137 ) -> Result<(Value, usize)> {
138 match cql_type {
141 CqlType::Boolean => self.parse_boolean(data),
142 CqlType::TinyInt => self.parse_tinyint(data),
143 CqlType::SmallInt => self.parse_smallint(data),
144 CqlType::Int => self.parse_int(data),
145 CqlType::BigInt => self.parse_bigint(data),
146 CqlType::Counter => self.parse_counter(data),
147 CqlType::Float => self.parse_float(data),
148 CqlType::Double => self.parse_double(data),
149 CqlType::Text | CqlType::Varchar | CqlType::Ascii => self.parse_text(data),
150 CqlType::Blob => self.parse_blob(data),
151 CqlType::Timestamp => self.parse_timestamp(data),
152 CqlType::Uuid | CqlType::TimeUuid => self.parse_uuid(data),
153 CqlType::List(elem_type) => self.parse_list(data, elem_type, comparator),
154 CqlType::Set(elem_type) => self.parse_set(data, elem_type, comparator),
155 CqlType::Map(key_type, val_type) => {
156 self.parse_map(data, key_type, val_type, comparator)
157 }
158 CqlType::Tuple(field_types) => self.parse_tuple(data, field_types, comparator),
159 CqlType::Udt(type_name, fields) => self.parse_udt(data, type_name, fields, comparator),
160 CqlType::Frozen(inner_type) => self.parse_frozen(data, inner_type, comparator),
161 _ => Err(Error::Schema(format!(
162 "Unsupported type for schema-driven parsing: {:?}",
163 cql_type
164 ))),
165 }
166 }
167
168 fn parse_boolean(&self, data: &[u8]) -> Result<(Value, usize)> {
171 if data.is_empty() {
172 return Err(Error::schema("Insufficient data for boolean".to_string()));
173 }
174 Ok((Value::Boolean(data[0] != 0), 1))
175 }
176
177 fn parse_tinyint(&self, data: &[u8]) -> Result<(Value, usize)> {
178 if data.is_empty() {
179 return Err(Error::schema("Insufficient data for tinyint".to_string()));
180 }
181 Ok((Value::TinyInt(data[0] as i8), 1))
182 }
183
184 fn parse_smallint(&self, data: &[u8]) -> Result<(Value, usize)> {
185 if data.len() < 2 {
186 return Err(Error::schema("Insufficient data for smallint".to_string()));
187 }
188 let value = i16::from_be_bytes([data[0], data[1]]);
189 Ok((Value::SmallInt(value), 2))
190 }
191
192 fn parse_int(&self, data: &[u8]) -> Result<(Value, usize)> {
193 if data.len() < 4 {
194 return Err(Error::schema("Insufficient data for int".to_string()));
195 }
196 let value = i32::from_be_bytes([data[0], data[1], data[2], data[3]]);
197 Ok((Value::Integer(value), 4))
198 }
199
200 fn parse_bigint(&self, data: &[u8]) -> Result<(Value, usize)> {
201 if data.len() < 8 {
202 return Err(Error::schema("Insufficient data for bigint".to_string()));
203 }
204 let mut bytes = [0u8; 8];
205 bytes.copy_from_slice(&data[0..8]);
206 let value = i64::from_be_bytes(bytes);
207 Ok((Value::BigInt(value), 8))
208 }
209
210 fn parse_counter(&self, data: &[u8]) -> Result<(Value, usize)> {
211 if data.len() < 8 {
212 return Err(Error::schema("Insufficient data for counter".to_string()));
213 }
214 let mut bytes = [0u8; 8];
215 bytes.copy_from_slice(&data[0..8]);
216 let value = i64::from_be_bytes(bytes);
217 Ok((Value::Counter(value), 8))
218 }
219
220 fn parse_float(&self, data: &[u8]) -> Result<(Value, usize)> {
221 if data.len() < 4 {
222 return Err(Error::schema("Insufficient data for float".to_string()));
223 }
224 let mut bytes = [0u8; 4];
225 bytes.copy_from_slice(&data[0..4]);
226 let value = f32::from_be_bytes(bytes);
227 Ok((Value::Float32(value), 4))
228 }
229
230 fn parse_double(&self, data: &[u8]) -> Result<(Value, usize)> {
231 if data.len() < 8 {
232 return Err(Error::schema("Insufficient data for double".to_string()));
233 }
234 let mut bytes = [0u8; 8];
235 bytes.copy_from_slice(&data[0..8]);
236 let value = f64::from_be_bytes(bytes);
237 Ok((Value::Float(value), 8))
238 }
239
240 fn parse_text(&self, data: &[u8]) -> Result<(Value, usize)> {
241 if data.len() < 4 {
243 return Err(Error::schema(
244 "Insufficient data for text length".to_string(),
245 ));
246 }
247 let len = i32::from_be_bytes([data[0], data[1], data[2], data[3]]) as usize;
248 if data.len() < 4 + len {
249 return Err(Error::schema(
250 "Insufficient data for text content".to_string(),
251 ));
252 }
253 let text = String::from_utf8(data[4..4 + len].to_vec())
254 .map_err(|e| Error::schema(format!("Invalid UTF-8: {}", e)))?;
255 Ok((Value::Text(text), 4 + len))
256 }
257
258 fn parse_blob(&self, data: &[u8]) -> Result<(Value, usize)> {
259 if data.len() < 4 {
261 return Err(Error::schema(
262 "Insufficient data for blob length".to_string(),
263 ));
264 }
265 let len = i32::from_be_bytes([data[0], data[1], data[2], data[3]]) as usize;
266 if data.len() < 4 + len {
267 return Err(Error::schema(
268 "Insufficient data for blob content".to_string(),
269 ));
270 }
271 Ok((Value::Blob(data[4..4 + len].to_vec()), 4 + len))
272 }
273
274 fn parse_timestamp(&self, data: &[u8]) -> Result<(Value, usize)> {
275 if data.len() < 8 {
276 return Err(Error::schema("Insufficient data for timestamp".to_string()));
277 }
278 let mut bytes = [0u8; 8];
279 bytes.copy_from_slice(&data[0..8]);
280 let millis = i64::from_be_bytes(bytes);
281 Ok((Value::Timestamp(millis), 8))
282 }
283
284 fn parse_uuid(&self, data: &[u8]) -> Result<(Value, usize)> {
285 if data.len() < 16 {
286 return Err(Error::schema("Insufficient data for UUID".to_string()));
287 }
288 let uuid_bytes: [u8; 16] = data[0..16]
289 .try_into()
290 .map_err(|_| Error::schema("Invalid UUID bytes".to_string()))?;
291 Ok((Value::Uuid(uuid_bytes), 16))
292 }
293
294 fn parse_list(
295 &self,
296 data: &[u8],
297 elem_type: &CqlType,
298 _comparator: &ComparatorType,
299 ) -> Result<(Value, usize)> {
300 let mut offset = 0;
301
302 if data.len() < 4 {
304 return Err(Error::schema("Insufficient data for list size".to_string()));
305 }
306 let count = i32::from_be_bytes([data[0], data[1], data[2], data[3]]) as usize;
307 offset += 4;
308
309 let mut elements = Vec::with_capacity(count);
310 let elem_comparator = ComparatorType::from_cql_type(elem_type)?;
311
312 for _ in 0..count {
313 let (value, consumed) =
314 self.parse_typed_value(&data[offset..], elem_type, &elem_comparator)?;
315 elements.push(value);
316 offset += consumed;
317 }
318
319 Ok((Value::List(elements), offset))
320 }
321
322 fn parse_set(
323 &self,
324 data: &[u8],
325 elem_type: &CqlType,
326 _comparator: &ComparatorType,
327 ) -> Result<(Value, usize)> {
328 let mut offset = 0;
329
330 if data.len() < 4 {
332 return Err(Error::schema("Insufficient data for set size".to_string()));
333 }
334 let count = i32::from_be_bytes([data[0], data[1], data[2], data[3]]) as usize;
335 offset += 4;
336
337 let mut elements = Vec::with_capacity(count);
338 let elem_comparator = ComparatorType::from_cql_type(elem_type)?;
339
340 for _ in 0..count {
341 let (value, consumed) =
342 self.parse_typed_value(&data[offset..], elem_type, &elem_comparator)?;
343 elements.push(value);
344 offset += consumed;
345 }
346
347 Ok((Value::Set(elements), offset))
348 }
349
350 fn parse_map(
351 &self,
352 data: &[u8],
353 key_type: &CqlType,
354 val_type: &CqlType,
355 _comparator: &ComparatorType,
356 ) -> Result<(Value, usize)> {
357 let mut offset = 0;
358
359 if data.len() < 4 {
361 return Err(Error::schema("Insufficient data for map size".to_string()));
362 }
363 let count = i32::from_be_bytes([data[0], data[1], data[2], data[3]]) as usize;
364 offset += 4;
365
366 let mut map = Vec::with_capacity(count);
367 let key_comparator = ComparatorType::from_cql_type(key_type)?;
368 let val_comparator = ComparatorType::from_cql_type(val_type)?;
369
370 for _ in 0..count {
371 let (key, key_consumed) =
372 self.parse_typed_value(&data[offset..], key_type, &key_comparator)?;
373 offset += key_consumed;
374
375 let (value, val_consumed) =
376 self.parse_typed_value(&data[offset..], val_type, &val_comparator)?;
377 offset += val_consumed;
378
379 map.push((key, value));
380 }
381
382 Ok((Value::Map(map), offset))
383 }
384
385 fn parse_tuple(
386 &self,
387 data: &[u8],
388 field_types: &[CqlType],
389 _comparator: &ComparatorType,
390 ) -> Result<(Value, usize)> {
391 let mut offset = 0;
392 let mut values = Vec::with_capacity(field_types.len());
393
394 for field_type in field_types {
395 let field_comparator = ComparatorType::from_cql_type(field_type)?;
396 let (value, consumed) =
397 self.parse_typed_value(&data[offset..], field_type, &field_comparator)?;
398 values.push(value);
399 offset += consumed;
400 }
401
402 Ok((Value::Tuple(values), offset))
403 }
404
405 fn parse_udt(
406 &self,
407 data: &[u8],
408 type_name: &str,
409 fields: &[(String, CqlType)],
410 _comparator: &ComparatorType,
411 ) -> Result<(Value, usize)> {
412 let mut offset = 0;
413 let mut field_values = Vec::with_capacity(fields.len());
414
415 for (field_name, field_type) in fields {
416 let field_comparator = ComparatorType::from_cql_type(field_type)?;
417
418 if data.len() >= offset + 4 {
420 let field_len = i32::from_be_bytes([
421 data[offset],
422 data[offset + 1],
423 data[offset + 2],
424 data[offset + 3],
425 ]);
426
427 if field_len < 0 {
428 field_values.push(crate::types::UdtField {
430 name: field_name.clone(),
431 value: None,
432 });
433 offset += 4;
434 continue;
435 }
436 }
437
438 let (value, consumed) =
442 self.parse_typed_value(&data[offset..], field_type, &field_comparator)?;
443 field_values.push(crate::types::UdtField {
444 name: field_name.clone(),
445 value: Some(value),
446 });
447 offset += consumed;
448 }
449
450 Ok((
451 Value::Udt(crate::types::UdtValue {
452 type_name: type_name.to_string(),
453 keyspace: self.context.schema.keyspace.clone(),
454 fields: field_values,
455 }),
456 offset,
457 ))
458 }
459
460 fn parse_frozen(
461 &self,
462 data: &[u8],
463 inner_type: &CqlType,
464 _comparator: &ComparatorType,
465 ) -> Result<(Value, usize)> {
466 let inner_comparator = ComparatorType::from_cql_type(inner_type)?;
468 let (inner_value, consumed) =
469 self.parse_typed_value(data, inner_type, &inner_comparator)?;
470 Ok((Value::Frozen(Box::new(inner_value)), consumed))
471 }
472
473 pub fn parse_row(&self, data: &[u8]) -> Result<HashMap<String, Value>> {
476 let mut row = HashMap::new();
477 let mut offset = 0;
478
479 for column in &self.context.schema.columns {
480 if offset >= data.len() {
481 row.insert(column.name.clone(), Value::Null);
483 continue;
484 }
485
486 if data.len() >= offset + 4 {
488 let value_len = i32::from_be_bytes([
489 data[offset],
490 data[offset + 1],
491 data[offset + 2],
492 data[offset + 3],
493 ]);
494
495 if value_len < 0 {
496 row.insert(column.name.clone(), Value::Null);
497 offset += 4;
498 continue;
499 }
500 }
501
502 let (value, consumed) = self.parse_column_value(&column.name, &data[offset..])?;
504 row.insert(column.name.clone(), value);
505 offset += consumed;
506 }
507
508 Ok(row)
509 }
510
511 #[allow(clippy::only_used_in_recursion)]
513 fn comparator_to_cql_type(&self, comparator: &ComparatorType) -> Result<CqlType> {
514 match comparator {
515 ComparatorType::Boolean => Ok(CqlType::Boolean),
516 ComparatorType::TinyInt => Ok(CqlType::TinyInt),
517 ComparatorType::SmallInt => Ok(CqlType::SmallInt),
518 ComparatorType::Int => Ok(CqlType::Int),
519 ComparatorType::BigInt => Ok(CqlType::BigInt),
520 ComparatorType::Counter => Ok(CqlType::Counter),
521 ComparatorType::Float32 => Ok(CqlType::Float),
522 ComparatorType::Float => Ok(CqlType::Double),
523 ComparatorType::Text => Ok(CqlType::Text),
524 ComparatorType::Blob => Ok(CqlType::Blob),
525 ComparatorType::Timestamp => Ok(CqlType::Timestamp),
526 ComparatorType::Uuid => Ok(CqlType::Uuid),
527 ComparatorType::Varint => Ok(CqlType::Custom("varint".to_string())),
528 ComparatorType::Decimal => Ok(CqlType::Decimal),
529 ComparatorType::Duration => Ok(CqlType::Duration),
530 ComparatorType::Date => Ok(CqlType::Date),
531 ComparatorType::Json => Ok(CqlType::Custom("json".to_string())),
532 ComparatorType::List(elem_comparator) => {
533 let elem_type = self.comparator_to_cql_type(elem_comparator)?;
534 Ok(CqlType::List(Box::new(elem_type)))
535 }
536 ComparatorType::Set(elem_comparator) => {
537 let elem_type = self.comparator_to_cql_type(elem_comparator)?;
538 Ok(CqlType::Set(Box::new(elem_type)))
539 }
540 ComparatorType::Map(key_comparator, val_comparator) => {
541 let key_type = self.comparator_to_cql_type(key_comparator)?;
542 let val_type = self.comparator_to_cql_type(val_comparator)?;
543 Ok(CqlType::Map(Box::new(key_type), Box::new(val_type)))
544 }
545 ComparatorType::Tuple(field_comparators) => {
546 let mut field_types = Vec::new();
547 for field_comparator in field_comparators {
548 field_types.push(self.comparator_to_cql_type(field_comparator)?);
549 }
550 Ok(CqlType::Tuple(field_types))
551 }
552 ComparatorType::Udt {
553 type_name,
554 field_comparators,
555 ..
556 } => {
557 let mut fields = Vec::new();
558 for (field_name, field_comparator) in field_comparators {
559 let field_type = self.comparator_to_cql_type(field_comparator)?;
560 fields.push((field_name.clone(), field_type));
561 }
562 Ok(CqlType::Udt(type_name.clone(), fields))
563 }
564 ComparatorType::Frozen(inner_comparator) => {
565 let inner_type = self.comparator_to_cql_type(inner_comparator)?;
566 Ok(CqlType::Frozen(Box::new(inner_type)))
567 }
568 ComparatorType::Custom(type_name) => Ok(CqlType::Custom(type_name.clone())),
569 }
570 }
571}
572
573#[cfg(test)]
574#[path = "parser_tests.rs"]
575mod tests;