polars_redis/
schema.rs

1//! Schema definitions for Redis data mapping to Arrow/Polars types.
2//!
3//! Redis stores everything as strings, so we need a schema to know how to
4//! interpret the data when converting to Arrow arrays.
5
6use std::collections::HashMap;
7
8use arrow::datatypes::{DataType, Field, Schema, TimeUnit};
9
10use crate::error::{Error, Result};
11
/// Logical column types that a Redis string value can be decoded into.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum RedisType {
    /// Text kept verbatim as a UTF-8 string.
    Utf8,
    /// Signed 64-bit integer.
    Int64,
    /// 64-bit floating-point number.
    Float64,
    /// Boolean flag, decoded from spellings such as "true"/"false" or "1"/"0".
    Boolean,
    /// Calendar date held as days since the Unix epoch; written as "YYYY-MM-DD" or as epoch days.
    Date,
    /// Instant with microsecond precision; written as ISO 8601 text or as a Unix timestamp.
    Datetime,
}
28
29impl RedisType {
30    /// Convert to Arrow DataType.
31    pub fn to_arrow_type(&self) -> DataType {
32        match self {
33            RedisType::Utf8 => DataType::Utf8,
34            RedisType::Int64 => DataType::Int64,
35            RedisType::Float64 => DataType::Float64,
36            RedisType::Boolean => DataType::Boolean,
37            RedisType::Date => DataType::Date32,
38            RedisType::Datetime => DataType::Timestamp(TimeUnit::Microsecond, None),
39        }
40    }
41
42    /// Parse a string value according to this type.
43    pub fn parse(&self, value: &str) -> Result<TypedValue> {
44        match self {
45            RedisType::Utf8 => Ok(TypedValue::Utf8(value.to_string())),
46            RedisType::Int64 => value.parse::<i64>().map(TypedValue::Int64).map_err(|e| {
47                Error::TypeConversion(format!("Failed to parse '{}' as i64: {}", value, e))
48            }),
49            RedisType::Float64 => value.parse::<f64>().map(TypedValue::Float64).map_err(|e| {
50                Error::TypeConversion(format!("Failed to parse '{}' as f64: {}", value, e))
51            }),
52            RedisType::Boolean => parse_boolean(value)
53                .map(TypedValue::Boolean)
54                .ok_or_else(|| {
55                    Error::TypeConversion(format!("Failed to parse '{}' as boolean", value))
56                }),
57            RedisType::Date => parse_date(value).map(TypedValue::Date).ok_or_else(|| {
58                Error::TypeConversion(format!("Failed to parse '{}' as date", value))
59            }),
60            RedisType::Datetime => {
61                parse_datetime(value)
62                    .map(TypedValue::Datetime)
63                    .ok_or_else(|| {
64                        Error::TypeConversion(format!("Failed to parse '{}' as datetime", value))
65                    })
66            }
67        }
68    }
69}
70
/// A value decoded from a Redis string, tagged with the type it was decoded as.
#[derive(Clone, Debug, PartialEq)]
pub enum TypedValue {
    /// Owned UTF-8 text.
    Utf8(String),
    /// Signed 64-bit integer.
    Int64(i64),
    /// 64-bit floating-point number.
    Float64(f64),
    /// Boolean flag.
    Boolean(bool),
    /// Number of days since the Unix epoch (1970-01-01).
    Date(i32),
    /// Number of microseconds since the Unix epoch.
    Datetime(i64),
}
83
/// Interpret a string as a boolean.
///
/// Matching ignores ASCII case and does not trim whitespace.
///
/// - Truthy spellings: "true", "t", "yes", "y", "1"
/// - Falsy spellings: "false", "f", "no", "n", "0"
///
/// Any other input yields `None`.
fn parse_boolean(s: &str) -> Option<bool> {
    const TRUTHY: &[&str] = &["true", "1", "yes", "t", "y"];
    const FALSY: &[&str] = &["false", "0", "no", "f", "n"];

    let matches_any =
        |spellings: &[&str]| spellings.iter().any(|word| s.eq_ignore_ascii_case(word));

    if matches_any(TRUTHY) {
        Some(true)
    } else if matches_any(FALSY) {
        Some(false)
    } else {
        None
    }
}
94
95/// Parse a string as a date (days since Unix epoch).
96///
97/// Accepts:
98/// - ISO 8601 date: "2024-01-15"
99/// - Epoch days as integer: "19738"
100pub fn parse_date(s: &str) -> Option<i32> {
101    // Try parsing as epoch days first (integer)
102    if let Ok(days) = s.parse::<i32>() {
103        return Some(days);
104    }
105
106    // Try parsing as YYYY-MM-DD
107    if s.len() >= 10 {
108        let parts: Vec<&str> = s.split('-').collect();
109        if parts.len() >= 3
110            && let (Ok(year), Ok(month), Ok(day)) = (
111                parts[0].parse::<i32>(),
112                parts[1].parse::<u32>(),
113                parts[2].chars().take(2).collect::<String>().parse::<u32>(),
114            )
115        {
116            // Calculate days since epoch (1970-01-01)
117            // This is a simplified calculation - not accounting for all edge cases
118            return days_since_epoch(year, month, day);
119        }
120    }
121
122    None
123}
124
/// Calculate the number of days between 1970-01-01 and the given proleptic Gregorian date.
///
/// Dates before the epoch produce negative values.
///
/// Returns `None` when:
/// - `month` is outside `1..=12`,
/// - `day` does not exist in that month (leap years are honoured, so February 29th is only
///   accepted in a leap year),
/// - the result does not fit in an `i32`.
///
/// The computation is closed-form (constant time), so extreme years taken from untrusted
/// input can neither stall the caller nor overflow.
fn days_since_epoch(year: i32, month: u32, day: u32) -> Option<i32> {
    let is_leap_year = year % 4 == 0 && (year % 100 != 0 || year % 400 == 0);
    let days_in_month = match month {
        2 if is_leap_year => 29,
        2 => 28,
        4 | 6 | 9 | 11 => 30,
        1 | 3 | 5 | 7 | 8 | 10 | 12 => 31,
        _ => return None,
    };
    if !(1..=days_in_month).contains(&day) {
        return None;
    }

    // Shift the calendar so the year starts on March 1st: the leap day then falls at the very
    // end of the shifted year and month lengths follow a fixed pattern.
    let shifted_year = i64::from(year) - i64::from(month <= 2);
    let era = shifted_year.div_euclid(400); // 400-year Gregorian cycle index
    let year_of_era = shifted_year.rem_euclid(400); // 0..=399
    let shifted_month = (i64::from(month) + 9) % 12; // March = 0, ..., February = 11
    let day_of_year = (153 * shifted_month + 2) / 5 + i64::from(day) - 1;
    let day_of_era = year_of_era * 365 + year_of_era / 4 - year_of_era / 100 + day_of_year;

    // 146_097 days per 400-year era; 719_468 days from 0000-03-01 to 1970-01-01.
    i32::try_from(era * 146_097 + day_of_era - 719_468).ok()
}
163
164/// Parse a string as a datetime (microseconds since Unix epoch).
165///
166/// Accepts:
167/// - ISO 8601 datetime: "2024-01-15T10:30:00", "2024-01-15T10:30:00Z", "2024-01-15T10:30:00.123456Z"
168/// - Unix timestamp (seconds): "1705315800"
169/// - Unix timestamp (milliseconds): "1705315800000"
170/// - Unix timestamp (microseconds): "1705315800000000"
171pub fn parse_datetime(s: &str) -> Option<i64> {
172    let s = s.trim();
173
174    // Try parsing as numeric timestamp
175    if let Ok(ts) = s.parse::<i64>() {
176        // Heuristic to detect timestamp unit:
177        // - < 1e10: seconds (up to year 2286)
178        // - < 1e13: milliseconds (up to year 2286)
179        // - >= 1e13: microseconds
180        if ts < 10_000_000_000 {
181            // Seconds -> microseconds
182            return Some(ts * 1_000_000);
183        } else if ts < 10_000_000_000_000 {
184            // Milliseconds -> microseconds
185            return Some(ts * 1_000);
186        } else {
187            // Already microseconds
188            return Some(ts);
189        }
190    }
191
192    // Try parsing ISO 8601 datetime
193    parse_iso8601_datetime(s)
194}
195
196/// Parse ISO 8601 datetime string to microseconds since epoch.
197fn parse_iso8601_datetime(s: &str) -> Option<i64> {
198    // Remove trailing Z if present
199    let s = s.trim_end_matches('Z');
200
201    // Split into date and time parts
202    let parts: Vec<&str> = s.split('T').collect();
203    if parts.len() != 2 {
204        // Try space separator
205        let parts: Vec<&str> = s.split(' ').collect();
206        if parts.len() != 2 {
207            return None;
208        }
209        return parse_datetime_parts(parts[0], parts[1]);
210    }
211
212    parse_datetime_parts(parts[0], parts[1])
213}
214
215/// Parse date and time parts into microseconds since epoch.
216fn parse_datetime_parts(date_str: &str, time_str: &str) -> Option<i64> {
217    // Parse date
218    let date_parts: Vec<&str> = date_str.split('-').collect();
219    if date_parts.len() != 3 {
220        return None;
221    }
222
223    let year = date_parts[0].parse::<i32>().ok()?;
224    let month = date_parts[1].parse::<u32>().ok()?;
225    let day = date_parts[2].parse::<u32>().ok()?;
226
227    // Parse time (handle fractional seconds)
228    let time_str = time_str.split('+').next()?.split('-').next()?; // Remove timezone offset
229    let time_parts: Vec<&str> = time_str.split(':').collect();
230    if time_parts.len() < 2 {
231        return None;
232    }
233
234    let hour = time_parts[0].parse::<u32>().ok()?;
235    let minute = time_parts[1].parse::<u32>().ok()?;
236
237    let (second, microsecond) = if time_parts.len() >= 3 {
238        let sec_parts: Vec<&str> = time_parts[2].split('.').collect();
239        let sec = sec_parts[0].parse::<u32>().ok()?;
240        let usec = if sec_parts.len() > 1 {
241            // Pad or truncate to 6 digits for microseconds
242            let frac = sec_parts[1];
243            let padded = format!("{:0<6}", frac);
244            padded[..6].parse::<u32>().unwrap_or(0)
245        } else {
246            0
247        };
248        (sec, usec)
249    } else {
250        (0, 0)
251    };
252
253    // Get days since epoch
254    let days = days_since_epoch(year, month, day)?;
255
256    // Convert to microseconds
257    let day_us = days as i64 * 24 * 60 * 60 * 1_000_000;
258    let time_us =
259        (hour as i64 * 3600 + minute as i64 * 60 + second as i64) * 1_000_000 + microsecond as i64;
260
261    Some(day_us + time_us)
262}
263
264/// Schema for a Redis hash, mapping field names to types.
265///
266/// Defines how Redis hash fields should be converted to Arrow/Polars columns.
267/// Each field is mapped to a [`RedisType`] which determines parsing behavior.
268///
269/// # Example
270///
271/// ```ignore
272/// use polars_redis::{HashSchema, RedisType};
273///
274/// let schema = HashSchema::new(vec![
275///     ("name".to_string(), RedisType::Utf8),
276///     ("age".to_string(), RedisType::Int64),
277///     ("score".to_string(), RedisType::Float64),
278///     ("active".to_string(), RedisType::Boolean),
279/// ])
280/// .with_key(true)
281/// .with_key_column_name("_key")
282/// .with_ttl(true);
283/// ```
284///
285/// # Optional Metadata Columns
286///
287/// - Key column: Include the Redis key as a column (default: true, name: "_key")
288/// - TTL column: Include the key's TTL in seconds (default: false, name: "_ttl")
289/// - Row index: Include a row number column (default: false, name: "_index")
290#[derive(Debug, Clone)]
291pub struct HashSchema {
292    /// Ordered list of field names.
293    fields: Vec<String>,
294    /// Map from field name to type.
295    types: HashMap<String, RedisType>,
296    /// Whether to include the Redis key as a column.
297    include_key: bool,
298    /// Name of the key column (if included).
299    key_column_name: String,
300    /// Whether to include the TTL as a column.
301    include_ttl: bool,
302    /// Name of the TTL column (if included).
303    ttl_column_name: String,
304    /// Whether to include the row index as a column.
305    include_row_index: bool,
306    /// Name of the row index column (if included).
307    row_index_column_name: String,
308}
309
310impl HashSchema {
311    /// Create a new HashSchema from a list of (field_name, type) pairs.
312    pub fn new(field_types: Vec<(String, RedisType)>) -> Self {
313        let fields: Vec<String> = field_types.iter().map(|(name, _)| name.clone()).collect();
314        let types: HashMap<String, RedisType> = field_types.into_iter().collect();
315
316        Self {
317            fields,
318            types,
319            include_key: true,
320            key_column_name: "_key".to_string(),
321            include_ttl: false,
322            ttl_column_name: "_ttl".to_string(),
323            include_row_index: false,
324            row_index_column_name: "_index".to_string(),
325        }
326    }
327
328    /// Set whether to include the Redis key as a column.
329    pub fn with_key(mut self, include: bool) -> Self {
330        self.include_key = include;
331        self
332    }
333
334    /// Set the name of the key column.
335    pub fn with_key_column_name(mut self, name: impl Into<String>) -> Self {
336        self.key_column_name = name.into();
337        self
338    }
339
340    /// Set whether to include the TTL as a column.
341    pub fn with_ttl(mut self, include: bool) -> Self {
342        self.include_ttl = include;
343        self
344    }
345
346    /// Set the name of the TTL column.
347    pub fn with_ttl_column_name(mut self, name: impl Into<String>) -> Self {
348        self.ttl_column_name = name.into();
349        self
350    }
351
352    /// Set whether to include the row index as a column.
353    pub fn with_row_index(mut self, include: bool) -> Self {
354        self.include_row_index = include;
355        self
356    }
357
358    /// Set the name of the row index column.
359    pub fn with_row_index_column_name(mut self, name: impl Into<String>) -> Self {
360        self.row_index_column_name = name.into();
361        self
362    }
363
364    /// Get the ordered field names.
365    pub fn fields(&self) -> &[String] {
366        &self.fields
367    }
368
369    /// Get the type for a field.
370    pub fn field_type(&self, name: &str) -> Option<RedisType> {
371        self.types.get(name).copied()
372    }
373
374    /// Whether the key column is included.
375    pub fn include_key(&self) -> bool {
376        self.include_key
377    }
378
379    /// Get the key column name.
380    pub fn key_column_name(&self) -> &str {
381        &self.key_column_name
382    }
383
384    /// Whether the TTL column is included.
385    pub fn include_ttl(&self) -> bool {
386        self.include_ttl
387    }
388
389    /// Get the TTL column name.
390    pub fn ttl_column_name(&self) -> &str {
391        &self.ttl_column_name
392    }
393
394    /// Whether the row index column is included.
395    pub fn include_row_index(&self) -> bool {
396        self.include_row_index
397    }
398
399    /// Get the row index column name.
400    pub fn row_index_column_name(&self) -> &str {
401        &self.row_index_column_name
402    }
403
404    /// Convert to Arrow Schema.
405    pub fn to_arrow_schema(&self) -> Schema {
406        let mut arrow_fields: Vec<Field> = Vec::with_capacity(self.fields.len() + 3);
407
408        // Add row index column first if included
409        if self.include_row_index {
410            arrow_fields.push(Field::new(
411                &self.row_index_column_name,
412                DataType::UInt64,
413                false,
414            ));
415        }
416
417        // Add key column if included
418        if self.include_key {
419            arrow_fields.push(Field::new(&self.key_column_name, DataType::Utf8, false));
420        }
421
422        // Add TTL column if included (Int64, nullable - returns -1 for no TTL, -2 for missing key)
423        if self.include_ttl {
424            arrow_fields.push(Field::new(&self.ttl_column_name, DataType::Int64, true));
425        }
426
427        // Add data fields
428        for field_name in &self.fields {
429            if let Some(redis_type) = self.types.get(field_name) {
430                // Fields are nullable since Redis might not have all fields
431                arrow_fields.push(Field::new(field_name, redis_type.to_arrow_type(), true));
432            }
433        }
434
435        Schema::new(arrow_fields)
436    }
437
438    /// Get a subset schema with only the specified columns (for projection pushdown).
439    pub fn project(&self, columns: &[String]) -> Self {
440        let projected_fields: Vec<String> = columns
441            .iter()
442            .filter(|c| {
443                // Include if it's a data field (not the key column, TTL column, or row index column)
444                self.types.contains_key(*c)
445            })
446            .cloned()
447            .collect();
448
449        let projected_types: HashMap<String, RedisType> = projected_fields
450            .iter()
451            .filter_map(|f| self.types.get(f).map(|t| (f.clone(), *t)))
452            .collect();
453
454        // Check if key column is requested
455        let include_key = self.include_key && columns.contains(&self.key_column_name);
456
457        // Check if TTL column is requested
458        let include_ttl = self.include_ttl && columns.contains(&self.ttl_column_name);
459
460        // Check if row index column is requested
461        let include_row_index =
462            self.include_row_index && columns.contains(&self.row_index_column_name);
463
464        Self {
465            fields: projected_fields,
466            types: projected_types,
467            include_key,
468            key_column_name: self.key_column_name.clone(),
469            include_ttl,
470            ttl_column_name: self.ttl_column_name.clone(),
471            include_row_index,
472            row_index_column_name: self.row_index_column_name.clone(),
473        }
474    }
475}
476
477impl Default for HashSchema {
478    fn default() -> Self {
479        Self {
480            fields: Vec::new(),
481            types: HashMap::new(),
482            include_key: true,
483            key_column_name: "_key".to_string(),
484            include_ttl: false,
485            ttl_column_name: "_ttl".to_string(),
486            include_row_index: false,
487            row_index_column_name: "_index".to_string(),
488        }
489    }
490}
491
#[cfg(test)]
mod tests {
    //! Unit tests for type parsing and schema construction.
    //!
    //! Assertions pin exact values wherever the expected result is known, so a regression in
    //! the date or time arithmetic cannot hide behind a range check or a variant-only check.

    use super::*;

    /// Days from 1970-01-01 to 2024-01-15.
    const JAN_15_2024_DAYS: i32 = 19_737;
    /// Microseconds from the epoch to 2024-01-15T10:30:00Z (19737 days + 10h 30m).
    const JAN_15_2024_1030_US: i64 = 1_705_314_600_000_000;
    /// Microseconds for Unix timestamp 1705315800 (2024-01-15T10:50:00Z).
    const UNIX_TS_US: i64 = 1_705_315_800_000_000;

    /// Parse `raw` as `ty`, panicking with the conversion error if it is rejected.
    fn parsed(ty: RedisType, raw: &str) -> TypedValue {
        ty.parse(raw).unwrap()
    }

    #[test]
    fn test_redis_type_to_arrow() {
        let expected = [
            (RedisType::Utf8, DataType::Utf8),
            (RedisType::Int64, DataType::Int64),
            (RedisType::Float64, DataType::Float64),
            (RedisType::Boolean, DataType::Boolean),
            (RedisType::Date, DataType::Date32),
            (
                RedisType::Datetime,
                DataType::Timestamp(TimeUnit::Microsecond, None),
            ),
        ];
        for (redis_type, arrow_type) in expected {
            assert_eq!(redis_type.to_arrow_type(), arrow_type);
        }
    }

    #[test]
    fn test_parse_int64() {
        assert_eq!(parsed(RedisType::Int64, "42"), TypedValue::Int64(42));
        assert_eq!(parsed(RedisType::Int64, "-100"), TypedValue::Int64(-100));
        assert!(RedisType::Int64.parse("not_a_number").is_err());
    }

    #[test]
    fn test_parse_float64() {
        assert_eq!(parsed(RedisType::Float64, "3.5"), TypedValue::Float64(3.5));
        assert_eq!(parsed(RedisType::Float64, "-0.5"), TypedValue::Float64(-0.5));
        assert!(RedisType::Float64.parse("not_a_float").is_err());
    }

    #[test]
    fn test_parse_boolean() {
        let cases = [
            ("true", true),
            ("FALSE", false),
            ("1", true),
            ("0", false),
            ("yes", true),
            ("no", false),
        ];
        for (raw, expected) in cases {
            assert_eq!(
                parsed(RedisType::Boolean, raw),
                TypedValue::Boolean(expected),
                "input {raw:?}"
            );
        }
        assert!(RedisType::Boolean.parse("maybe").is_err());
    }

    #[test]
    fn test_hash_schema_creation() {
        let schema = HashSchema::new(vec![
            ("name".to_string(), RedisType::Utf8),
            ("age".to_string(), RedisType::Int64),
        ]);

        assert_eq!(schema.fields(), &["name", "age"]);
        assert_eq!(schema.field_type("name"), Some(RedisType::Utf8));
        assert_eq!(schema.field_type("age"), Some(RedisType::Int64));
        assert_eq!(schema.field_type("missing"), None);
    }

    #[test]
    fn test_hash_schema_to_arrow() {
        let schema = HashSchema::new(vec![
            ("name".to_string(), RedisType::Utf8),
            ("age".to_string(), RedisType::Int64),
            ("active".to_string(), RedisType::Boolean),
        ]);

        let arrow_schema = schema.to_arrow_schema();
        assert_eq!(arrow_schema.fields().len(), 4); // _key + 3 fields

        assert_eq!(arrow_schema.field(0).name(), "_key");
        assert_eq!(arrow_schema.field(0).data_type(), &DataType::Utf8);

        assert_eq!(arrow_schema.field(1).name(), "name");
        assert_eq!(arrow_schema.field(1).data_type(), &DataType::Utf8);
        assert_eq!(arrow_schema.field(2).name(), "age");
        assert_eq!(arrow_schema.field(2).data_type(), &DataType::Int64);
        assert_eq!(arrow_schema.field(3).name(), "active");
        assert_eq!(arrow_schema.field(3).data_type(), &DataType::Boolean);
    }

    #[test]
    fn test_hash_schema_without_key() {
        let schema = HashSchema::new(vec![("name".to_string(), RedisType::Utf8)]).with_key(false);

        let arrow_schema = schema.to_arrow_schema();
        assert_eq!(arrow_schema.fields().len(), 1);
        assert_eq!(arrow_schema.field(0).name(), "name");
    }

    #[test]
    fn test_hash_schema_projection() {
        let schema = HashSchema::new(vec![
            ("name".to_string(), RedisType::Utf8),
            ("age".to_string(), RedisType::Int64),
            ("email".to_string(), RedisType::Utf8),
        ]);

        // Project to only name and email
        let projected = schema.project(&["name".to_string(), "email".to_string()]);
        assert_eq!(projected.fields(), &["name", "email"]);
        assert_eq!(projected.field_type("age"), None);
        assert!(!projected.include_key()); // Key not in projection

        // Project with key
        let projected_with_key = schema.project(&["_key".to_string(), "name".to_string()]);
        assert_eq!(projected_with_key.fields(), &["name"]);
        assert!(projected_with_key.include_key());
    }

    #[test]
    fn test_parse_date_iso() {
        assert_eq!(
            parsed(RedisType::Date, "2024-01-15"),
            TypedValue::Date(JAN_15_2024_DAYS)
        );
    }

    #[test]
    fn test_parse_date_epoch_days() {
        assert_eq!(
            parsed(RedisType::Date, "19737"),
            TypedValue::Date(JAN_15_2024_DAYS)
        );
        assert_eq!(parsed(RedisType::Date, "0"), TypedValue::Date(0));
    }

    #[test]
    fn test_parse_date_invalid() {
        assert!(RedisType::Date.parse("not-a-date").is_err());
        assert!(RedisType::Date.parse("2024-13-01").is_err()); // Invalid month
        assert!(RedisType::Date.parse("2024-01-32").is_err()); // Invalid day
    }

    #[test]
    fn test_parse_datetime_iso() {
        // Basic ISO 8601, with a Z suffix, and with a space separator all denote the same instant.
        for raw in [
            "2024-01-15T10:30:00",
            "2024-01-15T10:30:00Z",
            "2024-01-15 10:30:00",
        ] {
            assert_eq!(
                parsed(RedisType::Datetime, raw),
                TypedValue::Datetime(JAN_15_2024_1030_US),
                "input {raw:?}"
            );
        }

        // Fractional seconds are carried through as microseconds.
        assert_eq!(
            parsed(RedisType::Datetime, "2024-01-15T10:30:00.123456Z"),
            TypedValue::Datetime(JAN_15_2024_1030_US + 123_456)
        );
    }

    #[test]
    fn test_parse_datetime_unix_seconds() {
        // Seconds are scaled up to microseconds.
        assert_eq!(
            parsed(RedisType::Datetime, "1705315800"),
            TypedValue::Datetime(UNIX_TS_US)
        );
    }

    #[test]
    fn test_parse_datetime_unix_milliseconds() {
        // Milliseconds are scaled up to microseconds.
        assert_eq!(
            parsed(RedisType::Datetime, "1705315800000"),
            TypedValue::Datetime(UNIX_TS_US)
        );
    }

    #[test]
    fn test_parse_datetime_unix_microseconds() {
        // Microseconds pass through unchanged.
        assert_eq!(
            parsed(RedisType::Datetime, "1705315800000000"),
            TypedValue::Datetime(UNIX_TS_US)
        );
    }

    #[test]
    fn test_parse_datetime_invalid() {
        assert!(RedisType::Datetime.parse("not-a-datetime").is_err());
        assert!(RedisType::Datetime.parse("2024-01-15").is_err()); // Date only, no time
    }

    #[test]
    fn test_days_since_epoch() {
        let cases = [
            ((1970, 1, 1), 0),
            ((1970, 1, 2), 1),
            ((1971, 1, 1), 365),
            ((1972, 1, 1), 730),  // 365 + 365
            ((1973, 1, 1), 1096), // 730 + 366, because 1972 is a leap year
            ((2024, 1, 15), JAN_15_2024_DAYS),
            ((1969, 12, 31), -1), // the day before the epoch
        ];
        for ((year, month, day), expected) in cases {
            assert_eq!(
                days_since_epoch(year, month, day),
                Some(expected),
                "{year}-{month}-{day}"
            );
        }
    }

    // ========================================================================
    // Edge Case Tests
    // ========================================================================

    #[test]
    fn test_parse_int64_edge_cases() {
        // Max/min i64
        assert_eq!(
            parsed(RedisType::Int64, "9223372036854775807"),
            TypedValue::Int64(i64::MAX)
        );
        assert_eq!(
            parsed(RedisType::Int64, "-9223372036854775808"),
            TypedValue::Int64(i64::MIN)
        );

        // Overflow, surrounding whitespace and the empty string are all rejected.
        for raw in [
            "9223372036854775808",
            "-9223372036854775809",
            " 42",
            "42 ",
            "",
        ] {
            assert!(RedisType::Int64.parse(raw).is_err(), "input {raw:?}");
        }

        // Zero
        assert_eq!(parsed(RedisType::Int64, "0"), TypedValue::Int64(0));
        assert_eq!(parsed(RedisType::Int64, "-0"), TypedValue::Int64(0));
    }

    #[test]
    fn test_parse_float64_edge_cases() {
        // Very small/large values stay finite and positive.
        for raw in ["1e308", "1e-308"] {
            assert!(
                matches!(
                    parsed(RedisType::Float64, raw),
                    TypedValue::Float64(v) if v.is_finite() && v > 0.0
                ),
                "input {raw:?}"
            );
        }

        // Infinity parses to the matching signed infinity.
        assert!(matches!(
            parsed(RedisType::Float64, "inf"),
            TypedValue::Float64(v) if v.is_infinite() && v.is_sign_positive()
        ));
        assert!(matches!(
            parsed(RedisType::Float64, "-inf"),
            TypedValue::Float64(v) if v.is_infinite() && v.is_sign_negative()
        ));

        // NaN parses (it cannot be compared with assert_eq!, hence the guard).
        assert!(matches!(
            parsed(RedisType::Float64, "NaN"),
            TypedValue::Float64(v) if v.is_nan()
        ));

        // Empty string
        assert!(RedisType::Float64.parse("").is_err());

        // Scientific notation
        assert_eq!(
            parsed(RedisType::Float64, "1.5e2"),
            TypedValue::Float64(150.0)
        );
    }

    #[test]
    fn test_parse_utf8_edge_cases() {
        // Empty string is valid UTF-8
        assert_eq!(parsed(RedisType::Utf8, ""), TypedValue::Utf8(String::new()));

        // Non-ASCII text passes through unchanged.
        let unicode = "H\u{e9}llo \u{4e16}\u{754c}";
        assert_eq!(
            parsed(RedisType::Utf8, unicode),
            TypedValue::Utf8(unicode.to_string())
        );

        // Very long string
        let long_string = "x".repeat(100_000);
        assert_eq!(
            parsed(RedisType::Utf8, &long_string),
            TypedValue::Utf8(long_string)
        );

        // Special characters
        assert_eq!(
            parsed(RedisType::Utf8, "\n\t\r"),
            TypedValue::Utf8("\n\t\r".to_string())
        );
    }

    #[test]
    fn test_hash_schema_empty_fields() {
        let schema = HashSchema::new(vec![]);
        assert!(schema.fields().is_empty());

        let arrow_schema = schema.to_arrow_schema();
        // Should still have key column
        assert_eq!(arrow_schema.fields().len(), 1);
        assert_eq!(arrow_schema.field(0).name(), "_key");
    }

    #[test]
    fn test_hash_schema_duplicate_field_names() {
        // This is allowed at schema level (last one wins in HashMap lookup)
        let schema = HashSchema::new(vec![
            ("name".to_string(), RedisType::Utf8),
            ("name".to_string(), RedisType::Int64),
        ]);

        // Fields list preserves duplicates
        assert_eq!(schema.fields().len(), 2);
        assert_eq!(schema.field_type("name"), Some(RedisType::Int64));
    }

    #[test]
    fn test_hash_schema_special_field_names() {
        // Field names with special characters
        let schema = HashSchema::new(vec![
            ("field-with-dashes".to_string(), RedisType::Utf8),
            ("field.with.dots".to_string(), RedisType::Int64),
            ("field:with:colons".to_string(), RedisType::Float64),
            ("".to_string(), RedisType::Boolean), // Empty field name
        ]);

        assert_eq!(schema.fields().len(), 4);
        assert_eq!(
            schema.field_type("field-with-dashes"),
            Some(RedisType::Utf8)
        );
        assert_eq!(schema.field_type("field.with.dots"), Some(RedisType::Int64));
        assert_eq!(
            schema.field_type("field:with:colons"),
            Some(RedisType::Float64)
        );
        assert_eq!(schema.field_type(""), Some(RedisType::Boolean));
    }

    #[test]
    fn test_projection_empty() {
        let schema = HashSchema::new(vec![
            ("name".to_string(), RedisType::Utf8),
            ("age".to_string(), RedisType::Int64),
        ]);

        let projected = schema.project(&[]);
        assert!(projected.fields().is_empty());
        assert!(!projected.include_key());
    }

    #[test]
    fn test_projection_nonexistent_fields() {
        let schema = HashSchema::new(vec![("name".to_string(), RedisType::Utf8)]);

        // Projecting non-existent field should not include it
        let projected = schema.project(&["nonexistent".to_string()]);
        assert!(projected.fields().is_empty());
    }
}