polars_redis/types/string/
convert.rs

1//! Conversion from Redis string data to Arrow arrays.
2//!
3//! This module handles the conversion of Redis string values into Arrow RecordBatches
4//! that can be consumed by Polars.
5
6use std::sync::Arc;
7
8use arrow::array::{
9    ArrayRef, BooleanBuilder, Date32Builder, Float64Builder, Int64Builder, RecordBatch,
10    StringBuilder, TimestampMicrosecondBuilder,
11};
12use arrow::datatypes::{DataType, Field, Schema, TimeUnit};
13
14use super::reader::StringData;
15use crate::error::{Error, Result};
16
17/// Schema for Redis string values, defining how to parse and convert them.
18///
19/// Redis strings are simple key-value pairs. This schema defines what type
20/// the values should be parsed as (e.g., Int64 for counters, Utf8 for text).
21///
22/// # Example
23///
24/// ```ignore
25/// use polars_redis::StringSchema;
26/// use arrow::datatypes::DataType;
27///
28/// // For string counters
29/// let schema = StringSchema::new(DataType::Int64)
30///     .with_key(true)
31///     .with_value_column_name("count");
32///
33/// // For text values
34/// let schema = StringSchema::new(DataType::Utf8)
35///     .with_key(true);
36/// ```
37///
38/// # Output Schema
39///
40/// - `_key` (optional): The Redis key
41/// - `value`: The parsed value (type depends on schema)
42/// - `_ttl` (optional): TTL in seconds
43#[derive(Debug, Clone)]
44pub struct StringSchema {
45    /// The Arrow DataType for values.
46    value_type: DataType,
47    /// Whether to include the Redis key as a column.
48    include_key: bool,
49    /// Name of the key column.
50    key_column_name: String,
51    /// Name of the value column.
52    value_column_name: String,
53    /// Whether to include the TTL as a column.
54    include_ttl: bool,
55    /// Name of the TTL column.
56    ttl_column_name: String,
57}
58
59impl StringSchema {
60    /// Create a new StringSchema with the given value type.
61    pub fn new(value_type: DataType) -> Self {
62        Self {
63            value_type,
64            include_key: true,
65            key_column_name: "_key".to_string(),
66            value_column_name: "value".to_string(),
67            include_ttl: false,
68            ttl_column_name: "_ttl".to_string(),
69        }
70    }
71
72    /// Set whether to include the Redis key as a column.
73    pub fn with_key(mut self, include: bool) -> Self {
74        self.include_key = include;
75        self
76    }
77
78    /// Set the name of the key column.
79    pub fn with_key_column_name(mut self, name: impl Into<String>) -> Self {
80        self.key_column_name = name.into();
81        self
82    }
83
84    /// Set the name of the value column.
85    pub fn with_value_column_name(mut self, name: impl Into<String>) -> Self {
86        self.value_column_name = name.into();
87        self
88    }
89
90    /// Set whether to include the TTL as a column.
91    pub fn with_ttl(mut self, include: bool) -> Self {
92        self.include_ttl = include;
93        self
94    }
95
96    /// Set the name of the TTL column.
97    pub fn with_ttl_column_name(mut self, name: impl Into<String>) -> Self {
98        self.ttl_column_name = name.into();
99        self
100    }
101
102    /// Get the value type.
103    pub fn value_type(&self) -> &DataType {
104        &self.value_type
105    }
106
107    /// Whether the key column is included.
108    pub fn include_key(&self) -> bool {
109        self.include_key
110    }
111
112    /// Get the key column name.
113    pub fn key_column_name(&self) -> &str {
114        &self.key_column_name
115    }
116
117    /// Get the value column name.
118    pub fn value_column_name(&self) -> &str {
119        &self.value_column_name
120    }
121
122    /// Whether the TTL column is included.
123    pub fn include_ttl(&self) -> bool {
124        self.include_ttl
125    }
126
127    /// Get the TTL column name.
128    pub fn ttl_column_name(&self) -> &str {
129        &self.ttl_column_name
130    }
131
132    /// Convert to Arrow Schema.
133    pub fn to_arrow_schema(&self) -> Schema {
134        let mut fields = Vec::with_capacity(3);
135
136        if self.include_key {
137            fields.push(Field::new(&self.key_column_name, DataType::Utf8, false));
138        }
139
140        if self.include_ttl {
141            fields.push(Field::new(&self.ttl_column_name, DataType::Int64, false));
142        }
143
144        // Value is nullable (key might not exist)
145        fields.push(Field::new(
146            &self.value_column_name,
147            self.value_type.clone(),
148            true,
149        ));
150
151        Schema::new(fields)
152    }
153}
154
155impl Default for StringSchema {
156    fn default() -> Self {
157        Self::new(DataType::Utf8)
158    }
159}
160
161/// Convert a batch of Redis string data to an Arrow RecordBatch.
162pub fn strings_to_record_batch(data: &[StringData], schema: &StringSchema) -> Result<RecordBatch> {
163    let arrow_schema = Arc::new(schema.to_arrow_schema());
164    let num_rows = data.len();
165
166    let mut arrays: Vec<ArrayRef> = Vec::with_capacity(3);
167
168    // Build key column if included
169    if schema.include_key() {
170        let mut builder = StringBuilder::with_capacity(num_rows, num_rows * 32);
171        for row in data {
172            builder.append_value(&row.key);
173        }
174        arrays.push(Arc::new(builder.finish()));
175    }
176
177    // Build TTL column if included
178    if schema.include_ttl() {
179        let mut builder = Int64Builder::with_capacity(num_rows);
180        for row in data {
181            // TTL: -1 = no expiry, -2 = key doesn't exist, positive = seconds remaining
182            builder.append_value(row.ttl.unwrap_or(-1));
183        }
184        arrays.push(Arc::new(builder.finish()));
185    }
186
187    // Build value column based on type
188    let value_array = build_value_column(data, schema.value_type())?;
189    arrays.push(value_array);
190
191    RecordBatch::try_new(arrow_schema, arrays)
192        .map_err(|e| Error::TypeConversion(format!("Failed to create RecordBatch: {}", e)))
193}
194
195/// Build the value column based on the schema's value type.
196fn build_value_column(data: &[StringData], value_type: &DataType) -> Result<ArrayRef> {
197    match value_type {
198        DataType::Utf8 => build_utf8_column(data),
199        DataType::Int64 => build_int64_column(data),
200        DataType::Float64 => build_float64_column(data),
201        DataType::Boolean => build_boolean_column(data),
202        DataType::Date32 => build_date_column(data),
203        DataType::Timestamp(TimeUnit::Microsecond, None) => build_datetime_column(data),
204        _ => Err(Error::TypeConversion(format!(
205            "Unsupported value type: {:?}",
206            value_type
207        ))),
208    }
209}
210
211/// Build a UTF-8 string column.
212fn build_utf8_column(data: &[StringData]) -> Result<ArrayRef> {
213    let mut builder = StringBuilder::with_capacity(data.len(), data.len() * 32);
214
215    for row in data {
216        match &row.value {
217            Some(value) => builder.append_value(value),
218            None => builder.append_null(),
219        }
220    }
221
222    Ok(Arc::new(builder.finish()))
223}
224
225/// Build an Int64 column, parsing string values.
226fn build_int64_column(data: &[StringData]) -> Result<ArrayRef> {
227    let mut builder = Int64Builder::with_capacity(data.len());
228
229    for row in data {
230        match &row.value {
231            Some(value) => {
232                let parsed = value.parse::<i64>().map_err(|e| {
233                    Error::TypeConversion(format!(
234                        "Failed to parse '{}' as i64 for key '{}': {}",
235                        value, row.key, e
236                    ))
237                })?;
238                builder.append_value(parsed);
239            }
240            None => builder.append_null(),
241        }
242    }
243
244    Ok(Arc::new(builder.finish()))
245}
246
247/// Build a Float64 column, parsing string values.
248fn build_float64_column(data: &[StringData]) -> Result<ArrayRef> {
249    let mut builder = Float64Builder::with_capacity(data.len());
250
251    for row in data {
252        match &row.value {
253            Some(value) => {
254                let parsed = value.parse::<f64>().map_err(|e| {
255                    Error::TypeConversion(format!(
256                        "Failed to parse '{}' as f64 for key '{}': {}",
257                        value, row.key, e
258                    ))
259                })?;
260                builder.append_value(parsed);
261            }
262            None => builder.append_null(),
263        }
264    }
265
266    Ok(Arc::new(builder.finish()))
267}
268
269/// Build a Boolean column, parsing string values.
270fn build_boolean_column(data: &[StringData]) -> Result<ArrayRef> {
271    let mut builder = BooleanBuilder::with_capacity(data.len());
272
273    for row in data {
274        match &row.value {
275            Some(value) => {
276                let parsed = parse_bool(value).ok_or_else(|| {
277                    Error::TypeConversion(format!(
278                        "Failed to parse '{}' as boolean for key '{}'",
279                        value, row.key
280                    ))
281                })?;
282                builder.append_value(parsed);
283            }
284            None => builder.append_null(),
285        }
286    }
287
288    Ok(Arc::new(builder.finish()))
289}
290
291/// Build a Date32 column, parsing string values.
292fn build_date_column(data: &[StringData]) -> Result<ArrayRef> {
293    let mut builder = Date32Builder::with_capacity(data.len());
294
295    for row in data {
296        match &row.value {
297            Some(value) => {
298                let parsed = parse_date(value).ok_or_else(|| {
299                    Error::TypeConversion(format!(
300                        "Failed to parse '{}' as date for key '{}'",
301                        value, row.key
302                    ))
303                })?;
304                builder.append_value(parsed);
305            }
306            None => builder.append_null(),
307        }
308    }
309
310    Ok(Arc::new(builder.finish()))
311}
312
313/// Build a Timestamp column, parsing string values.
314fn build_datetime_column(data: &[StringData]) -> Result<ArrayRef> {
315    let mut builder = TimestampMicrosecondBuilder::with_capacity(data.len());
316
317    for row in data {
318        match &row.value {
319            Some(value) => {
320                let parsed = parse_datetime(value).ok_or_else(|| {
321                    Error::TypeConversion(format!(
322                        "Failed to parse '{}' as datetime for key '{}'",
323                        value, row.key
324                    ))
325                })?;
326                builder.append_value(parsed);
327            }
328            None => builder.append_null(),
329        }
330    }
331
332    Ok(Arc::new(builder.finish()))
333}
334
/// Interpret a string as a boolean.
///
/// Surrounding double quotes are stripped first, then single quotes. The remainder is
/// compared case-insensitively against `true/1/yes/t/y` and `false/0/no/f/n`.
/// Whitespace is not trimmed. Anything else yields `None`.
fn parse_bool(s: &str) -> Option<bool> {
    let unquoted = s.trim_matches('"').trim_matches('\'');
    let lowered = unquoted.to_lowercase();
    let token = lowered.as_str();

    if matches!(token, "true" | "1" | "yes" | "t" | "y") {
        Some(true)
    } else if matches!(token, "false" | "0" | "no" | "f" | "n") {
        Some(false)
    } else {
        None
    }
}
344
345/// Parse a string as a date (days since Unix epoch).
346///
347/// Accepts:
348/// - ISO 8601 date: "2024-01-15"
349/// - Epoch days as integer: "19738"
350fn parse_date(s: &str) -> Option<i32> {
351    // Try parsing as epoch days first (integer)
352    if let Ok(days) = s.parse::<i32>() {
353        return Some(days);
354    }
355
356    // Try parsing as YYYY-MM-DD
357    if s.len() >= 10 {
358        let parts: Vec<&str> = s.split('-').collect();
359        if parts.len() >= 3
360            && let (Ok(year), Ok(month), Ok(day)) = (
361                parts[0].parse::<i32>(),
362                parts[1].parse::<u32>(),
363                parts[2].chars().take(2).collect::<String>().parse::<u32>(),
364            )
365        {
366            return days_since_epoch(year, month, day);
367        }
368    }
369
370    None
371}
372
/// Calculate days since the Unix epoch (1970-01-01) for a proleptic Gregorian date.
///
/// Returns `None` in three cases:
/// - `month` is outside `1..=12`;
/// - `day` is outside the valid range for that month (leap years considered);
/// - the result does not fit in an `i32`.
///
/// Runs in constant time for any `year`. It uses Howard Hinnant's `days_from_civil`
/// algorithm, evaluated in `i64` so intermediate values cannot overflow.
fn days_since_epoch(year: i32, month: u32, day: u32) -> Option<i32> {
    if !(1..=12).contains(&month) {
        return None;
    }

    let is_leap = (year % 4 == 0 && year % 100 != 0) || year % 400 == 0;
    let month_len: u32 = match month {
        2 if is_leap => 29,
        2 => 28,
        4 | 6 | 9 | 11 => 30,
        _ => 31,
    };
    if !(1..=month_len).contains(&day) {
        return None;
    }

    // Treat March as the first month so the leap day falls at the end of the shifted year.
    let y = i64::from(year) - i64::from(month <= 2);
    let era = y.div_euclid(400);
    let year_of_era = y.rem_euclid(400); // [0, 399]
    let shifted_month = i64::from((month + 9) % 12); // March = 0 ... February = 11
    let day_of_year = (153 * shifted_month + 2) / 5 + i64::from(day) - 1; // [0, 365]
    let day_of_era = year_of_era * 365 + year_of_era / 4 - year_of_era / 100 + day_of_year;
    // 719_468 is the day count from 0000-03-01 to 1970-01-01.
    let days = era * 146_097 + day_of_era - 719_468;

    i32::try_from(days).ok()
}
405
406/// Parse a string as a datetime (microseconds since Unix epoch).
407///
408/// Accepts:
409/// - ISO 8601 datetime: "2024-01-15T10:30:00"
410/// - Unix timestamp (seconds): "1705315800"
411/// - Unix timestamp (milliseconds): "1705315800000"
412/// - Unix timestamp (microseconds): "1705315800000000"
413fn parse_datetime(s: &str) -> Option<i64> {
414    let s = s.trim();
415
416    // Try parsing as numeric timestamp
417    if let Ok(ts) = s.parse::<i64>() {
418        if ts < 10_000_000_000 {
419            return Some(ts * 1_000_000);
420        } else if ts < 10_000_000_000_000 {
421            return Some(ts * 1_000);
422        } else {
423            return Some(ts);
424        }
425    }
426
427    // Try parsing ISO 8601 datetime
428    parse_iso8601_datetime(s)
429}
430
431/// Parse ISO 8601 datetime string to microseconds since epoch.
432fn parse_iso8601_datetime(s: &str) -> Option<i64> {
433    let s = s.trim_end_matches('Z');
434
435    let parts: Vec<&str> = s.split('T').collect();
436    if parts.len() != 2 {
437        let parts: Vec<&str> = s.split(' ').collect();
438        if parts.len() != 2 {
439            return None;
440        }
441        return parse_datetime_parts(parts[0], parts[1]);
442    }
443
444    parse_datetime_parts(parts[0], parts[1])
445}
446
447/// Parse date and time parts into microseconds since epoch.
448fn parse_datetime_parts(date_str: &str, time_str: &str) -> Option<i64> {
449    let date_parts: Vec<&str> = date_str.split('-').collect();
450    if date_parts.len() != 3 {
451        return None;
452    }
453
454    let year = date_parts[0].parse::<i32>().ok()?;
455    let month = date_parts[1].parse::<u32>().ok()?;
456    let day = date_parts[2].parse::<u32>().ok()?;
457
458    let time_str = time_str.split('+').next()?.split('-').next()?;
459    let time_parts: Vec<&str> = time_str.split(':').collect();
460    if time_parts.len() < 2 {
461        return None;
462    }
463
464    let hour = time_parts[0].parse::<u32>().ok()?;
465    let minute = time_parts[1].parse::<u32>().ok()?;
466
467    let (second, microsecond) = if time_parts.len() >= 3 {
468        let sec_parts: Vec<&str> = time_parts[2].split('.').collect();
469        let sec = sec_parts[0].parse::<u32>().ok()?;
470        let usec = if sec_parts.len() > 1 {
471            let frac = sec_parts[1];
472            let padded = format!("{:0<6}", frac);
473            padded[..6].parse::<u32>().unwrap_or(0)
474        } else {
475            0
476        };
477        (sec, usec)
478    } else {
479        (0, 0)
480    };
481
482    let days = days_since_epoch(year, month, day)?;
483    let day_us = days as i64 * 24 * 60 * 60 * 1_000_000;
484    let time_us =
485        (hour as i64 * 3600 + minute as i64 * 60 + second as i64) * 1_000_000 + microsecond as i64;
486
487    Some(day_us + time_us)
488}
489
#[cfg(test)]
mod tests {
    use super::*;
    use arrow::array::{
        Array, BooleanArray, Date32Array, Float64Array, Int64Array, StringArray,
        TimestampMicrosecondArray,
    };

    fn make_string_data(key: &str, value: Option<&str>) -> StringData {
        StringData {
            key: key.to_string(),
            value: value.map(|s| s.to_string()),
            ttl: None,
        }
    }

    fn make_string_data_with_ttl(key: &str, value: Option<&str>, ttl: i64) -> StringData {
        StringData {
            key: key.to_string(),
            value: value.map(|s| s.to_string()),
            ttl: Some(ttl),
        }
    }

    /// Downcast column `index` of `batch` to the concrete array type `T`.
    fn downcast_column<T: 'static>(batch: &RecordBatch, index: usize) -> &T {
        batch
            .column(index)
            .as_any()
            .downcast_ref::<T>()
            .expect("column has an unexpected array type")
    }

    #[test]
    fn test_string_schema_default() {
        let schema = StringSchema::default();
        assert!(schema.include_key());
        assert!(!schema.include_ttl());
        assert_eq!(schema.key_column_name(), "_key");
        assert_eq!(schema.value_column_name(), "value");
        assert_eq!(schema.ttl_column_name(), "_ttl");
        assert_eq!(schema.value_type(), &DataType::Utf8);
    }

    #[test]
    fn test_string_schema_builder() {
        let schema = StringSchema::new(DataType::Int64)
            .with_key(false)
            .with_value_column_name("count");

        assert!(!schema.include_key());
        assert_eq!(schema.value_column_name(), "count");
        assert_eq!(schema.value_type(), &DataType::Int64);
    }

    #[test]
    fn test_strings_to_record_batch_utf8() {
        let schema = StringSchema::new(DataType::Utf8);
        let data = vec![
            make_string_data("key:1", Some("hello")),
            make_string_data("key:2", Some("world")),
            make_string_data("key:3", None),
        ];

        let batch = strings_to_record_batch(&data, &schema).unwrap();
        assert_eq!(batch.num_rows(), 3);
        assert_eq!(batch.num_columns(), 2); // _key, value

        let keys: &StringArray = downcast_column(&batch, 0);
        assert_eq!(keys.value(0), "key:1");
        assert_eq!(keys.value(2), "key:3");

        let values: &StringArray = downcast_column(&batch, 1);
        assert_eq!(values.value(0), "hello");
        assert_eq!(values.value(1), "world");
        assert!(values.is_null(2));
    }

    #[test]
    fn test_strings_to_record_batch_int64() {
        let schema = StringSchema::new(DataType::Int64);
        let data = vec![
            make_string_data("counter:1", Some("100")),
            make_string_data("counter:2", Some("-50")),
            make_string_data("counter:3", None),
        ];

        let batch = strings_to_record_batch(&data, &schema).unwrap();
        assert_eq!(batch.num_rows(), 3);

        let values: &Int64Array = downcast_column(&batch, 1);
        assert_eq!(values.value(0), 100);
        assert_eq!(values.value(1), -50);
        assert!(values.is_null(2));
    }

    #[test]
    fn test_strings_to_record_batch_float64() {
        let schema = StringSchema::new(DataType::Float64);
        let data = vec![
            make_string_data("price:1", Some("19.99")),
            make_string_data("price:2", Some("0.5")),
        ];

        let batch = strings_to_record_batch(&data, &schema).unwrap();
        assert_eq!(batch.num_rows(), 2);

        let values: &Float64Array = downcast_column(&batch, 1);
        assert_eq!(values.value(0), 19.99);
        assert_eq!(values.value(1), 0.5);
    }

    #[test]
    fn test_strings_to_record_batch_boolean() {
        let schema = StringSchema::new(DataType::Boolean);
        let data = vec![
            make_string_data("flag:1", Some("true")),
            make_string_data("flag:2", Some("false")),
            make_string_data("flag:3", Some("1")),
            make_string_data("flag:4", Some("0")),
        ];

        let batch = strings_to_record_batch(&data, &schema).unwrap();
        assert_eq!(batch.num_rows(), 4);

        let values: &BooleanArray = downcast_column(&batch, 1);
        assert!(values.value(0));
        assert!(!values.value(1));
        assert!(values.value(2));
        assert!(!values.value(3));
    }

    #[test]
    fn test_strings_to_record_batch_no_key() {
        let schema = StringSchema::new(DataType::Utf8).with_key(false);
        let data = vec![make_string_data("key:1", Some("hello"))];

        let batch = strings_to_record_batch(&data, &schema).unwrap();
        assert_eq!(batch.num_columns(), 1); // Just value
        assert_eq!(batch.schema().field(0).name(), "value");
    }

    #[test]
    fn test_strings_to_record_batch_parse_error() {
        let schema = StringSchema::new(DataType::Int64);
        let data = vec![make_string_data("key:1", Some("not_a_number"))];

        let result = strings_to_record_batch(&data, &schema);
        assert!(result.is_err());
    }

    #[test]
    fn test_empty_data() {
        let schema = StringSchema::new(DataType::Utf8);
        let data: Vec<StringData> = vec![];

        let batch = strings_to_record_batch(&data, &schema).unwrap();
        assert_eq!(batch.num_rows(), 0);
    }

    #[test]
    fn test_strings_to_record_batch_date() {
        let schema = StringSchema::new(DataType::Date32);
        let data = vec![
            make_string_data("date:1", Some("2024-01-15")),
            make_string_data("date:2", Some("19737")),
        ];

        let batch = strings_to_record_batch(&data, &schema).unwrap();
        assert_eq!(batch.num_rows(), 2);

        // Both encodings denote 2024-01-15.
        let values: &Date32Array = downcast_column(&batch, 1);
        assert_eq!(values.value(0), 19737);
        assert_eq!(values.value(1), 19737);
    }

    #[test]
    fn test_strings_to_record_batch_datetime() {
        let schema = StringSchema::new(DataType::Timestamp(TimeUnit::Microsecond, None));
        let data = vec![
            make_string_data("ts:1", Some("2024-01-15T10:30:00")),
            make_string_data("ts:2", Some("1705315800")),
        ];

        let batch = strings_to_record_batch(&data, &schema).unwrap();
        assert_eq!(batch.num_rows(), 2);

        let values: &TimestampMicrosecondArray = downcast_column(&batch, 1);
        assert_eq!(values.value(0), 1_705_314_600_000_000);
        assert_eq!(values.value(1), 1_705_315_800_000_000);
    }

    #[test]
    fn test_string_schema_with_ttl() {
        let schema = StringSchema::new(DataType::Utf8)
            .with_ttl(true)
            .with_ttl_column_name("expires_in");

        assert!(schema.include_ttl());
        assert_eq!(schema.ttl_column_name(), "expires_in");
    }

    #[test]
    fn test_strings_to_record_batch_with_ttl() {
        let schema = StringSchema::new(DataType::Utf8).with_ttl(true);
        let data = vec![
            make_string_data_with_ttl("key:1", Some("hello"), 3600),
            make_string_data_with_ttl("key:2", Some("world"), -1), // no expiry
            make_string_data_with_ttl("key:3", Some("test"), 60),
        ];

        let batch = strings_to_record_batch(&data, &schema).unwrap();
        assert_eq!(batch.num_rows(), 3);
        assert_eq!(batch.num_columns(), 3); // _key, _ttl, value
        assert_eq!(batch.schema().field(1).name(), "_ttl");

        let ttls: &Int64Array = downcast_column(&batch, 1);
        assert_eq!(ttls.value(0), 3600);
        assert_eq!(ttls.value(1), -1);
        assert_eq!(ttls.value(2), 60);
    }

    #[test]
    fn test_strings_to_record_batch_missing_ttl_defaults_to_minus_one() {
        let schema = StringSchema::new(DataType::Utf8).with_ttl(true);
        let data = vec![make_string_data("key:1", Some("hello"))];

        let batch = strings_to_record_batch(&data, &schema).unwrap();
        let ttls: &Int64Array = downcast_column(&batch, 1);
        assert_eq!(ttls.value(0), -1);
    }

    #[test]
    fn test_strings_to_record_batch_ttl_column_order() {
        // Verify column order: key, ttl, value
        let schema = StringSchema::new(DataType::Utf8).with_ttl(true);
        let data = vec![make_string_data_with_ttl("key:1", Some("hello"), 100)];

        let batch = strings_to_record_batch(&data, &schema).unwrap();
        assert_eq!(batch.schema().field(0).name(), "_key");
        assert_eq!(batch.schema().field(1).name(), "_ttl");
        assert_eq!(batch.schema().field(2).name(), "value");
    }

    #[test]
    fn test_parse_bool_variants() {
        assert_eq!(parse_bool("TRUE"), Some(true));
        assert_eq!(parse_bool("'y'"), Some(true));
        assert_eq!(parse_bool("\"No\""), Some(false));
        assert_eq!(parse_bool("maybe"), None);
    }

    #[test]
    fn test_parse_date_formats() {
        assert_eq!(parse_date("1970-01-01"), Some(0));
        assert_eq!(parse_date("2024-01-15"), Some(19737));
        assert_eq!(parse_date("2024-01-15T10:30:00"), Some(19737));
        assert_eq!(parse_date("-5"), Some(-5));
        assert_eq!(parse_date("2024-13-01"), None);
        assert_eq!(parse_date("not-a-date"), None);
    }

    #[test]
    fn test_parse_datetime_formats() {
        let expected: i64 = 1_705_314_600_000_000; // 2024-01-15T10:30:00
        assert_eq!(parse_datetime("2024-01-15T10:30:00"), Some(expected));
        assert_eq!(parse_datetime("2024-01-15 10:30:00Z"), Some(expected));
        assert_eq!(parse_datetime("1705314600"), Some(expected));
        assert_eq!(parse_datetime("1705314600000"), Some(expected));
        assert_eq!(parse_datetime("1705314600000000"), Some(expected));
        assert_eq!(
            parse_datetime("2024-01-15T10:30:00.5"),
            Some(expected + 500_000)
        );
        assert_eq!(parse_datetime("garbage"), None);
    }
}