1use std::collections::HashMap;
7
8use arrow::datatypes::{DataType, Field, Schema, TimeUnit};
9
10use crate::error::{Error, Result};
11
/// Logical value types that a raw string value can be interpreted as.
///
/// Each variant selects both the Arrow column type (see
/// [`RedisType::to_arrow_type`]) and the parsing rules applied to the raw
/// string (see [`RedisType::parse`]).
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum RedisType {
    /// Text; the raw string is kept as-is.
    Utf8,
    /// 64-bit signed integer.
    Int64,
    /// 64-bit floating point number.
    Float64,
    /// Boolean, parsed from words such as `true`/`false`, `yes`/`no`, `1`/`0`.
    Boolean,
    /// Calendar date, stored as an Arrow `Date32`.
    Date,
    /// Date and time, stored as an Arrow microsecond timestamp without a time zone.
    Datetime,
}
28
29impl RedisType {
30 pub fn to_arrow_type(&self) -> DataType {
32 match self {
33 RedisType::Utf8 => DataType::Utf8,
34 RedisType::Int64 => DataType::Int64,
35 RedisType::Float64 => DataType::Float64,
36 RedisType::Boolean => DataType::Boolean,
37 RedisType::Date => DataType::Date32,
38 RedisType::Datetime => DataType::Timestamp(TimeUnit::Microsecond, None),
39 }
40 }
41
42 pub fn parse(&self, value: &str) -> Result<TypedValue> {
44 match self {
45 RedisType::Utf8 => Ok(TypedValue::Utf8(value.to_string())),
46 RedisType::Int64 => value.parse::<i64>().map(TypedValue::Int64).map_err(|e| {
47 Error::TypeConversion(format!("Failed to parse '{}' as i64: {}", value, e))
48 }),
49 RedisType::Float64 => value.parse::<f64>().map(TypedValue::Float64).map_err(|e| {
50 Error::TypeConversion(format!("Failed to parse '{}' as f64: {}", value, e))
51 }),
52 RedisType::Boolean => parse_boolean(value)
53 .map(TypedValue::Boolean)
54 .ok_or_else(|| {
55 Error::TypeConversion(format!("Failed to parse '{}' as boolean", value))
56 }),
57 RedisType::Date => parse_date(value).map(TypedValue::Date).ok_or_else(|| {
58 Error::TypeConversion(format!("Failed to parse '{}' as date", value))
59 }),
60 RedisType::Datetime => {
61 parse_datetime(value)
62 .map(TypedValue::Datetime)
63 .ok_or_else(|| {
64 Error::TypeConversion(format!("Failed to parse '{}' as datetime", value))
65 })
66 }
67 }
68 }
69}
70
/// A value produced by [`RedisType::parse`], tagged with its logical type.
#[derive(Debug, Clone, PartialEq)]
pub enum TypedValue {
    /// Text value.
    Utf8(String),
    /// 64-bit signed integer value.
    Int64(i64),
    /// 64-bit floating point value.
    Float64(f64),
    /// Boolean value.
    Boolean(bool),
    /// Date as a number of days relative to 1970-01-01 (see [`parse_date`]).
    Date(i32),
    /// Timestamp as a number of microseconds relative to the Unix epoch
    /// (see [`parse_datetime`]).
    Datetime(i64),
}
83
/// Interprets `s` as a boolean, ignoring letter case.
///
/// Accepts `true`, `1`, `yes`, `t`, `y` as `true` and `false`, `0`, `no`,
/// `f`, `n` as `false`. Anything else (including surrounding whitespace)
/// yields `None`.
fn parse_boolean(s: &str) -> Option<bool> {
    const TRUE_WORDS: [&str; 5] = ["true", "1", "yes", "t", "y"];
    const FALSE_WORDS: [&str; 5] = ["false", "0", "no", "f", "n"];

    // Every accepted word is plain ASCII, so an ASCII case-insensitive
    // comparison accepts exactly the same inputs as lowercasing first.
    let is_one_of = |words: &[&str]| words.iter().any(|word| s.eq_ignore_ascii_case(word));

    if is_one_of(&TRUE_WORDS) {
        Some(true)
    } else if is_one_of(&FALSE_WORDS) {
        Some(false)
    } else {
        None
    }
}
94
95pub fn parse_date(s: &str) -> Option<i32> {
101 if let Ok(days) = s.parse::<i32>() {
103 return Some(days);
104 }
105
106 if s.len() >= 10 {
108 let parts: Vec<&str> = s.split('-').collect();
109 if parts.len() >= 3
110 && let (Ok(year), Ok(month), Ok(day)) = (
111 parts[0].parse::<i32>(),
112 parts[1].parse::<u32>(),
113 parts[2].chars().take(2).collect::<String>().parse::<u32>(),
114 )
115 {
116 return days_since_epoch(year, month, day);
119 }
120 }
121
122 None
123}
124
/// Converts a proleptic Gregorian calendar date into the number of days
/// relative to 1970-01-01 (negative for earlier dates).
///
/// Returns `None` when:
///
/// * `month` is outside `1..=12`;
/// * `day` is outside the valid range for that month and year (February has
///   29 days only in leap years);
/// * the resulting day count does not fit in an `i32`.
///
/// The computation is closed-form, so it runs in constant time regardless of
/// how far `year` is from 1970 and cannot overflow.
fn days_since_epoch(year: i32, month: u32, day: u32) -> Option<i32> {
    const DAYS_IN_MONTH: [u32; 12] = [31, 28, 31, 30, 31, 30, 31, 31, 30, 31, 30, 31];

    if !(1..=12).contains(&month) {
        return None;
    }

    let is_leap_year = (year % 4 == 0 && year % 100 != 0) || year % 400 == 0;
    let month_length = if month == 2 && is_leap_year {
        29
    } else {
        DAYS_IN_MONTH[(month - 1) as usize]
    };
    if !(1..=month_length).contains(&day) {
        return None;
    }

    // Civil-date-to-day-count conversion: shift the year so it starts on
    // March 1st, which places the leap day at the very end of the shifted
    // year, then count whole 400-year eras (146 097 days each).
    let shifted_year = if month <= 2 {
        i64::from(year) - 1
    } else {
        i64::from(year)
    };
    let era = shifted_year.div_euclid(400);
    let year_of_era = shifted_year.rem_euclid(400);
    // March -> 0, April -> 1, ..., February -> 11.
    let shifted_month = (i64::from(month) + 9) % 12;
    let day_of_year = (153 * shifted_month + 2) / 5 + i64::from(day) - 1;
    let day_of_era = year_of_era * 365 + year_of_era / 4 - year_of_era / 100 + day_of_year;

    // 719 468 is the day count from 0000-03-01 to 1970-01-01.
    let days = era * 146_097 + day_of_era - 719_468;

    i32::try_from(days).ok()
}
163
164pub fn parse_datetime(s: &str) -> Option<i64> {
172 let s = s.trim();
173
174 if let Ok(ts) = s.parse::<i64>() {
176 if ts < 10_000_000_000 {
181 return Some(ts * 1_000_000);
183 } else if ts < 10_000_000_000_000 {
184 return Some(ts * 1_000);
186 } else {
187 return Some(ts);
189 }
190 }
191
192 parse_iso8601_datetime(s)
194}
195
196fn parse_iso8601_datetime(s: &str) -> Option<i64> {
198 let s = s.trim_end_matches('Z');
200
201 let parts: Vec<&str> = s.split('T').collect();
203 if parts.len() != 2 {
204 let parts: Vec<&str> = s.split(' ').collect();
206 if parts.len() != 2 {
207 return None;
208 }
209 return parse_datetime_parts(parts[0], parts[1]);
210 }
211
212 parse_datetime_parts(parts[0], parts[1])
213}
214
215fn parse_datetime_parts(date_str: &str, time_str: &str) -> Option<i64> {
217 let date_parts: Vec<&str> = date_str.split('-').collect();
219 if date_parts.len() != 3 {
220 return None;
221 }
222
223 let year = date_parts[0].parse::<i32>().ok()?;
224 let month = date_parts[1].parse::<u32>().ok()?;
225 let day = date_parts[2].parse::<u32>().ok()?;
226
227 let time_str = time_str.split('+').next()?.split('-').next()?; let time_parts: Vec<&str> = time_str.split(':').collect();
230 if time_parts.len() < 2 {
231 return None;
232 }
233
234 let hour = time_parts[0].parse::<u32>().ok()?;
235 let minute = time_parts[1].parse::<u32>().ok()?;
236
237 let (second, microsecond) = if time_parts.len() >= 3 {
238 let sec_parts: Vec<&str> = time_parts[2].split('.').collect();
239 let sec = sec_parts[0].parse::<u32>().ok()?;
240 let usec = if sec_parts.len() > 1 {
241 let frac = sec_parts[1];
243 let padded = format!("{:0<6}", frac);
244 padded[..6].parse::<u32>().unwrap_or(0)
245 } else {
246 0
247 };
248 (sec, usec)
249 } else {
250 (0, 0)
251 };
252
253 let days = days_since_epoch(year, month, day)?;
255
256 let day_us = days as i64 * 24 * 60 * 60 * 1_000_000;
258 let time_us =
259 (hour as i64 * 3600 + minute as i64 * 60 + second as i64) * 1_000_000 + microsecond as i64;
260
261 Some(day_us + time_us)
262}
263
/// Column layout for a set of typed fields plus optional key, TTL and
/// row-index columns.
///
/// Built with [`HashSchema::new`] and the `with_*` builder methods, and
/// converted to an Arrow schema with [`HashSchema::to_arrow_schema`].
#[derive(Debug, Clone)]
pub struct HashSchema {
    /// Field names in the order they were supplied; this is the column order.
    fields: Vec<String>,
    /// Type of each field, looked up by field name.
    types: HashMap<String, RedisType>,
    /// Whether the Arrow schema includes the key column.
    include_key: bool,
    /// Name of the key column.
    key_column_name: String,
    /// Whether the Arrow schema includes the TTL column.
    include_ttl: bool,
    /// Name of the TTL column.
    ttl_column_name: String,
    /// Whether the Arrow schema includes the row-index column.
    include_row_index: bool,
    /// Name of the row-index column.
    row_index_column_name: String,
}
309
310impl HashSchema {
311 pub fn new(field_types: Vec<(String, RedisType)>) -> Self {
313 let fields: Vec<String> = field_types.iter().map(|(name, _)| name.clone()).collect();
314 let types: HashMap<String, RedisType> = field_types.into_iter().collect();
315
316 Self {
317 fields,
318 types,
319 include_key: true,
320 key_column_name: "_key".to_string(),
321 include_ttl: false,
322 ttl_column_name: "_ttl".to_string(),
323 include_row_index: false,
324 row_index_column_name: "_index".to_string(),
325 }
326 }
327
328 pub fn with_key(mut self, include: bool) -> Self {
330 self.include_key = include;
331 self
332 }
333
334 pub fn with_key_column_name(mut self, name: impl Into<String>) -> Self {
336 self.key_column_name = name.into();
337 self
338 }
339
340 pub fn with_ttl(mut self, include: bool) -> Self {
342 self.include_ttl = include;
343 self
344 }
345
346 pub fn with_ttl_column_name(mut self, name: impl Into<String>) -> Self {
348 self.ttl_column_name = name.into();
349 self
350 }
351
352 pub fn with_row_index(mut self, include: bool) -> Self {
354 self.include_row_index = include;
355 self
356 }
357
358 pub fn with_row_index_column_name(mut self, name: impl Into<String>) -> Self {
360 self.row_index_column_name = name.into();
361 self
362 }
363
364 pub fn fields(&self) -> &[String] {
366 &self.fields
367 }
368
369 pub fn field_type(&self, name: &str) -> Option<RedisType> {
371 self.types.get(name).copied()
372 }
373
374 pub fn include_key(&self) -> bool {
376 self.include_key
377 }
378
379 pub fn key_column_name(&self) -> &str {
381 &self.key_column_name
382 }
383
384 pub fn include_ttl(&self) -> bool {
386 self.include_ttl
387 }
388
389 pub fn ttl_column_name(&self) -> &str {
391 &self.ttl_column_name
392 }
393
394 pub fn include_row_index(&self) -> bool {
396 self.include_row_index
397 }
398
399 pub fn row_index_column_name(&self) -> &str {
401 &self.row_index_column_name
402 }
403
404 pub fn to_arrow_schema(&self) -> Schema {
406 let mut arrow_fields: Vec<Field> = Vec::with_capacity(self.fields.len() + 3);
407
408 if self.include_row_index {
410 arrow_fields.push(Field::new(
411 &self.row_index_column_name,
412 DataType::UInt64,
413 false,
414 ));
415 }
416
417 if self.include_key {
419 arrow_fields.push(Field::new(&self.key_column_name, DataType::Utf8, false));
420 }
421
422 if self.include_ttl {
424 arrow_fields.push(Field::new(&self.ttl_column_name, DataType::Int64, true));
425 }
426
427 for field_name in &self.fields {
429 if let Some(redis_type) = self.types.get(field_name) {
430 arrow_fields.push(Field::new(field_name, redis_type.to_arrow_type(), true));
432 }
433 }
434
435 Schema::new(arrow_fields)
436 }
437
438 pub fn project(&self, columns: &[String]) -> Self {
440 let projected_fields: Vec<String> = columns
441 .iter()
442 .filter(|c| {
443 self.types.contains_key(*c)
445 })
446 .cloned()
447 .collect();
448
449 let projected_types: HashMap<String, RedisType> = projected_fields
450 .iter()
451 .filter_map(|f| self.types.get(f).map(|t| (f.clone(), *t)))
452 .collect();
453
454 let include_key = self.include_key && columns.contains(&self.key_column_name);
456
457 let include_ttl = self.include_ttl && columns.contains(&self.ttl_column_name);
459
460 let include_row_index =
462 self.include_row_index && columns.contains(&self.row_index_column_name);
463
464 Self {
465 fields: projected_fields,
466 types: projected_types,
467 include_key,
468 key_column_name: self.key_column_name.clone(),
469 include_ttl,
470 ttl_column_name: self.ttl_column_name.clone(),
471 include_row_index,
472 row_index_column_name: self.row_index_column_name.clone(),
473 }
474 }
475}
476
477impl Default for HashSchema {
478 fn default() -> Self {
479 Self {
480 fields: Vec::new(),
481 types: HashMap::new(),
482 include_key: true,
483 key_column_name: "_key".to_string(),
484 include_ttl: false,
485 ttl_column_name: "_ttl".to_string(),
486 include_row_index: false,
487 row_index_column_name: "_index".to_string(),
488 }
489 }
490}
491
#[cfg(test)]
mod tests {
    use super::*;

    /// Parses `text` as `Float64` and returns the inner float, failing the
    /// test if parsing fails or yields a different variant.
    fn parse_f64(text: &str) -> f64 {
        match RedisType::Float64.parse(text) {
            Ok(TypedValue::Float64(value)) => value,
            other => panic!("expected a Float64 for {:?}, got {:?}", text, other),
        }
    }

    #[test]
    fn test_redis_type_to_arrow() {
        assert_eq!(RedisType::Utf8.to_arrow_type(), DataType::Utf8);
        assert_eq!(RedisType::Int64.to_arrow_type(), DataType::Int64);
        assert_eq!(RedisType::Float64.to_arrow_type(), DataType::Float64);
        assert_eq!(RedisType::Boolean.to_arrow_type(), DataType::Boolean);
        assert_eq!(RedisType::Date.to_arrow_type(), DataType::Date32);
        assert_eq!(
            RedisType::Datetime.to_arrow_type(),
            DataType::Timestamp(TimeUnit::Microsecond, None)
        );
    }

    #[test]
    fn test_parse_int64() {
        assert_eq!(RedisType::Int64.parse("42").unwrap(), TypedValue::Int64(42));
        assert_eq!(
            RedisType::Int64.parse("-100").unwrap(),
            TypedValue::Int64(-100)
        );
        assert!(RedisType::Int64.parse("not_a_number").is_err());
    }

    #[test]
    fn test_parse_float64() {
        assert_eq!(
            RedisType::Float64.parse("3.5").unwrap(),
            TypedValue::Float64(3.5)
        );
        assert_eq!(
            RedisType::Float64.parse("-0.5").unwrap(),
            TypedValue::Float64(-0.5)
        );
        assert!(RedisType::Float64.parse("not_a_float").is_err());
    }

    #[test]
    fn test_parse_boolean() {
        assert_eq!(
            RedisType::Boolean.parse("true").unwrap(),
            TypedValue::Boolean(true)
        );
        assert_eq!(
            RedisType::Boolean.parse("FALSE").unwrap(),
            TypedValue::Boolean(false)
        );
        assert_eq!(
            RedisType::Boolean.parse("1").unwrap(),
            TypedValue::Boolean(true)
        );
        assert_eq!(
            RedisType::Boolean.parse("0").unwrap(),
            TypedValue::Boolean(false)
        );
        assert_eq!(
            RedisType::Boolean.parse("yes").unwrap(),
            TypedValue::Boolean(true)
        );
        assert_eq!(
            RedisType::Boolean.parse("no").unwrap(),
            TypedValue::Boolean(false)
        );
        assert!(RedisType::Boolean.parse("maybe").is_err());
    }

    #[test]
    fn test_hash_schema_creation() {
        let schema = HashSchema::new(vec![
            ("name".to_string(), RedisType::Utf8),
            ("age".to_string(), RedisType::Int64),
        ]);

        assert_eq!(schema.fields(), &["name", "age"]);
        assert_eq!(schema.field_type("name"), Some(RedisType::Utf8));
        assert_eq!(schema.field_type("age"), Some(RedisType::Int64));
        assert_eq!(schema.field_type("missing"), None);
    }

    #[test]
    fn test_hash_schema_to_arrow() {
        let schema = HashSchema::new(vec![
            ("name".to_string(), RedisType::Utf8),
            ("age".to_string(), RedisType::Int64),
            ("active".to_string(), RedisType::Boolean),
        ]);

        let arrow_schema = schema.to_arrow_schema();
        // Three data fields plus the key column, which is on by default.
        assert_eq!(arrow_schema.fields().len(), 4);

        assert_eq!(arrow_schema.field(0).name(), "_key");
        assert_eq!(arrow_schema.field(0).data_type(), &DataType::Utf8);

        assert_eq!(arrow_schema.field(1).name(), "name");
        assert_eq!(arrow_schema.field(2).name(), "age");
        assert_eq!(arrow_schema.field(3).name(), "active");
    }

    #[test]
    fn test_hash_schema_without_key() {
        let schema = HashSchema::new(vec![("name".to_string(), RedisType::Utf8)]).with_key(false);

        let arrow_schema = schema.to_arrow_schema();
        assert_eq!(arrow_schema.fields().len(), 1);
        assert_eq!(arrow_schema.field(0).name(), "name");
    }

    #[test]
    fn test_hash_schema_projection() {
        let schema = HashSchema::new(vec![
            ("name".to_string(), RedisType::Utf8),
            ("age".to_string(), RedisType::Int64),
            ("email".to_string(), RedisType::Utf8),
        ]);

        let projected = schema.project(&["name".to_string(), "email".to_string()]);
        assert_eq!(projected.fields(), &["name", "email"]);
        // The key column was not requested, so it is dropped.
        assert!(!projected.include_key());

        let projected_with_key = schema.project(&["_key".to_string(), "name".to_string()]);
        assert_eq!(projected_with_key.fields(), &["name"]);
        assert!(projected_with_key.include_key());
    }

    #[test]
    fn test_parse_date_iso() {
        // 2024-01-15 is 19 737 days after 1970-01-01.
        assert_eq!(
            RedisType::Date.parse("2024-01-15").unwrap(),
            TypedValue::Date(19737)
        );
    }

    #[test]
    fn test_parse_date_epoch_days() {
        assert_eq!(
            RedisType::Date.parse("19737").unwrap(),
            TypedValue::Date(19737)
        );
        assert_eq!(RedisType::Date.parse("0").unwrap(), TypedValue::Date(0));
    }

    #[test]
    fn test_parse_date_invalid() {
        assert!(RedisType::Date.parse("not-a-date").is_err());
        // Month out of range.
        assert!(RedisType::Date.parse("2024-13-01").is_err());
        // Day out of range.
        assert!(RedisType::Date.parse("2024-01-32").is_err());
    }

    #[test]
    fn test_parse_datetime_iso() {
        // 2024-01-15T10:30:00 is 1 705 314 600 seconds after the epoch.
        assert_eq!(
            RedisType::Datetime.parse("2024-01-15T10:30:00").unwrap(),
            TypedValue::Datetime(1_705_314_600_000_000)
        );

        assert_eq!(
            RedisType::Datetime.parse("2024-01-15T10:30:00Z").unwrap(),
            TypedValue::Datetime(1_705_314_600_000_000)
        );

        assert_eq!(
            RedisType::Datetime
                .parse("2024-01-15T10:30:00.123456Z")
                .unwrap(),
            TypedValue::Datetime(1_705_314_600_123_456)
        );
    }

    #[test]
    fn test_parse_datetime_unix_seconds() {
        assert_eq!(
            RedisType::Datetime.parse("1705315800").unwrap(),
            TypedValue::Datetime(1_705_315_800_000_000)
        );
    }

    #[test]
    fn test_parse_datetime_unix_milliseconds() {
        assert_eq!(
            RedisType::Datetime.parse("1705315800000").unwrap(),
            TypedValue::Datetime(1_705_315_800_000_000)
        );
    }

    #[test]
    fn test_parse_datetime_unix_microseconds() {
        assert_eq!(
            RedisType::Datetime.parse("1705315800000000").unwrap(),
            TypedValue::Datetime(1_705_315_800_000_000)
        );
    }

    #[test]
    fn test_parse_datetime_invalid() {
        assert!(RedisType::Datetime.parse("not-a-datetime").is_err());
        // A date without a time component is not a datetime.
        assert!(RedisType::Datetime.parse("2024-01-15").is_err());
    }

    #[test]
    fn test_days_since_epoch() {
        assert_eq!(days_since_epoch(1970, 1, 1), Some(0));

        assert_eq!(days_since_epoch(1970, 1, 2), Some(1));

        assert_eq!(days_since_epoch(1971, 1, 1), Some(365));

        assert_eq!(days_since_epoch(1972, 1, 1), Some(730));

        // 1972 was a leap year, so 1973 starts 366 days later.
        assert_eq!(days_since_epoch(1973, 1, 1), Some(1096));
    }

    #[test]
    fn test_parse_int64_edge_cases() {
        assert_eq!(
            RedisType::Int64.parse("9223372036854775807").unwrap(),
            TypedValue::Int64(i64::MAX)
        );
        assert_eq!(
            RedisType::Int64.parse("-9223372036854775808").unwrap(),
            TypedValue::Int64(i64::MIN)
        );

        // One past either end of the i64 range.
        assert!(RedisType::Int64.parse("9223372036854775808").is_err());
        assert!(RedisType::Int64.parse("-9223372036854775809").is_err());

        // Surrounding whitespace is not tolerated.
        assert!(RedisType::Int64.parse(" 42").is_err());
        assert!(RedisType::Int64.parse("42 ").is_err());

        assert!(RedisType::Int64.parse("").is_err());

        assert_eq!(RedisType::Int64.parse("0").unwrap(), TypedValue::Int64(0));
        assert_eq!(RedisType::Int64.parse("-0").unwrap(), TypedValue::Int64(0));
    }

    #[test]
    fn test_parse_float64_edge_cases() {
        assert!(parse_f64("1e308").is_finite());
        assert!(parse_f64("1e-308").is_finite());

        let inf = parse_f64("inf");
        assert!(inf.is_infinite() && inf.is_sign_positive());

        let neg_inf = parse_f64("-inf");
        assert!(neg_inf.is_infinite() && neg_inf.is_sign_negative());

        assert!(parse_f64("NaN").is_nan());

        assert!(RedisType::Float64.parse("").is_err());

        assert_eq!(
            RedisType::Float64.parse("1.5e2").unwrap(),
            TypedValue::Float64(150.0)
        );
    }

    #[test]
    fn test_parse_utf8_edge_cases() {
        assert_eq!(
            RedisType::Utf8.parse("").unwrap(),
            TypedValue::Utf8("".to_string())
        );

        assert_eq!(
            RedisType::Utf8.parse("Hello").unwrap(),
            TypedValue::Utf8("Hello".to_string())
        );

        let long_string = "x".repeat(100_000);
        assert_eq!(
            RedisType::Utf8.parse(&long_string).unwrap(),
            TypedValue::Utf8(long_string)
        );

        assert_eq!(
            RedisType::Utf8.parse("\n\t\r").unwrap(),
            TypedValue::Utf8("\n\t\r".to_string())
        );
    }

    #[test]
    fn test_hash_schema_empty_fields() {
        let schema = HashSchema::new(vec![]);
        assert!(schema.fields().is_empty());

        let arrow_schema = schema.to_arrow_schema();
        // Only the key column, which is on by default.
        assert_eq!(arrow_schema.fields().len(), 1);
    }

    #[test]
    fn test_hash_schema_duplicate_field_names() {
        let schema = HashSchema::new(vec![
            ("name".to_string(), RedisType::Utf8),
            ("name".to_string(), RedisType::Int64),
        ]);

        // Both occurrences stay in the field list.
        assert_eq!(schema.fields().len(), 2);
    }

    #[test]
    fn test_hash_schema_special_field_names() {
        let schema = HashSchema::new(vec![
            ("field-with-dashes".to_string(), RedisType::Utf8),
            ("field.with.dots".to_string(), RedisType::Int64),
            ("field:with:colons".to_string(), RedisType::Float64),
            // An empty field name is accepted as well.
            ("".to_string(), RedisType::Boolean),
        ]);

        assert_eq!(schema.fields().len(), 4);
        assert_eq!(
            schema.field_type("field-with-dashes"),
            Some(RedisType::Utf8)
        );
    }

    #[test]
    fn test_projection_empty() {
        let schema = HashSchema::new(vec![
            ("name".to_string(), RedisType::Utf8),
            ("age".to_string(), RedisType::Int64),
        ]);

        let projected = schema.project(&[]);
        assert!(projected.fields().is_empty());
        assert!(!projected.include_key());
    }

    #[test]
    fn test_projection_nonexistent_fields() {
        let schema = HashSchema::new(vec![("name".to_string(), RedisType::Utf8)]);

        let projected = schema.project(&["nonexistent".to_string()]);
        assert!(projected.fields().is_empty());
    }
}