1use std::sync::Arc;
7
8use arrow::array::{
9 ArrayRef, BooleanBuilder, Date32Builder, Float64Builder, Int64Builder, RecordBatch,
10 StringBuilder, TimestampMicrosecondBuilder,
11};
12use arrow::datatypes::{DataType, Field, Schema, TimeUnit};
13
14use super::reader::StringData;
15use crate::error::{Error, Result};
16
17#[derive(Debug, Clone)]
44pub struct StringSchema {
45 value_type: DataType,
47 include_key: bool,
49 key_column_name: String,
51 value_column_name: String,
53 include_ttl: bool,
55 ttl_column_name: String,
57}
58
59impl StringSchema {
60 pub fn new(value_type: DataType) -> Self {
62 Self {
63 value_type,
64 include_key: true,
65 key_column_name: "_key".to_string(),
66 value_column_name: "value".to_string(),
67 include_ttl: false,
68 ttl_column_name: "_ttl".to_string(),
69 }
70 }
71
72 pub fn with_key(mut self, include: bool) -> Self {
74 self.include_key = include;
75 self
76 }
77
78 pub fn with_key_column_name(mut self, name: impl Into<String>) -> Self {
80 self.key_column_name = name.into();
81 self
82 }
83
84 pub fn with_value_column_name(mut self, name: impl Into<String>) -> Self {
86 self.value_column_name = name.into();
87 self
88 }
89
90 pub fn with_ttl(mut self, include: bool) -> Self {
92 self.include_ttl = include;
93 self
94 }
95
96 pub fn with_ttl_column_name(mut self, name: impl Into<String>) -> Self {
98 self.ttl_column_name = name.into();
99 self
100 }
101
102 pub fn value_type(&self) -> &DataType {
104 &self.value_type
105 }
106
107 pub fn include_key(&self) -> bool {
109 self.include_key
110 }
111
112 pub fn key_column_name(&self) -> &str {
114 &self.key_column_name
115 }
116
117 pub fn value_column_name(&self) -> &str {
119 &self.value_column_name
120 }
121
122 pub fn include_ttl(&self) -> bool {
124 self.include_ttl
125 }
126
127 pub fn ttl_column_name(&self) -> &str {
129 &self.ttl_column_name
130 }
131
132 pub fn to_arrow_schema(&self) -> Schema {
134 let mut fields = Vec::with_capacity(3);
135
136 if self.include_key {
137 fields.push(Field::new(&self.key_column_name, DataType::Utf8, false));
138 }
139
140 if self.include_ttl {
141 fields.push(Field::new(&self.ttl_column_name, DataType::Int64, false));
142 }
143
144 fields.push(Field::new(
146 &self.value_column_name,
147 self.value_type.clone(),
148 true,
149 ));
150
151 Schema::new(fields)
152 }
153}
154
155impl Default for StringSchema {
156 fn default() -> Self {
157 Self::new(DataType::Utf8)
158 }
159}
160
161pub fn strings_to_record_batch(data: &[StringData], schema: &StringSchema) -> Result<RecordBatch> {
163 let arrow_schema = Arc::new(schema.to_arrow_schema());
164 let num_rows = data.len();
165
166 let mut arrays: Vec<ArrayRef> = Vec::with_capacity(3);
167
168 if schema.include_key() {
170 let mut builder = StringBuilder::with_capacity(num_rows, num_rows * 32);
171 for row in data {
172 builder.append_value(&row.key);
173 }
174 arrays.push(Arc::new(builder.finish()));
175 }
176
177 if schema.include_ttl() {
179 let mut builder = Int64Builder::with_capacity(num_rows);
180 for row in data {
181 builder.append_value(row.ttl.unwrap_or(-1));
183 }
184 arrays.push(Arc::new(builder.finish()));
185 }
186
187 let value_array = build_value_column(data, schema.value_type())?;
189 arrays.push(value_array);
190
191 RecordBatch::try_new(arrow_schema, arrays)
192 .map_err(|e| Error::TypeConversion(format!("Failed to create RecordBatch: {}", e)))
193}
194
195fn build_value_column(data: &[StringData], value_type: &DataType) -> Result<ArrayRef> {
197 match value_type {
198 DataType::Utf8 => build_utf8_column(data),
199 DataType::Int64 => build_int64_column(data),
200 DataType::Float64 => build_float64_column(data),
201 DataType::Boolean => build_boolean_column(data),
202 DataType::Date32 => build_date_column(data),
203 DataType::Timestamp(TimeUnit::Microsecond, None) => build_datetime_column(data),
204 _ => Err(Error::TypeConversion(format!(
205 "Unsupported value type: {:?}",
206 value_type
207 ))),
208 }
209}
210
211fn build_utf8_column(data: &[StringData]) -> Result<ArrayRef> {
213 let mut builder = StringBuilder::with_capacity(data.len(), data.len() * 32);
214
215 for row in data {
216 match &row.value {
217 Some(value) => builder.append_value(value),
218 None => builder.append_null(),
219 }
220 }
221
222 Ok(Arc::new(builder.finish()))
223}
224
225fn build_int64_column(data: &[StringData]) -> Result<ArrayRef> {
227 let mut builder = Int64Builder::with_capacity(data.len());
228
229 for row in data {
230 match &row.value {
231 Some(value) => {
232 let parsed = value.parse::<i64>().map_err(|e| {
233 Error::TypeConversion(format!(
234 "Failed to parse '{}' as i64 for key '{}': {}",
235 value, row.key, e
236 ))
237 })?;
238 builder.append_value(parsed);
239 }
240 None => builder.append_null(),
241 }
242 }
243
244 Ok(Arc::new(builder.finish()))
245}
246
247fn build_float64_column(data: &[StringData]) -> Result<ArrayRef> {
249 let mut builder = Float64Builder::with_capacity(data.len());
250
251 for row in data {
252 match &row.value {
253 Some(value) => {
254 let parsed = value.parse::<f64>().map_err(|e| {
255 Error::TypeConversion(format!(
256 "Failed to parse '{}' as f64 for key '{}': {}",
257 value, row.key, e
258 ))
259 })?;
260 builder.append_value(parsed);
261 }
262 None => builder.append_null(),
263 }
264 }
265
266 Ok(Arc::new(builder.finish()))
267}
268
269fn build_boolean_column(data: &[StringData]) -> Result<ArrayRef> {
271 let mut builder = BooleanBuilder::with_capacity(data.len());
272
273 for row in data {
274 match &row.value {
275 Some(value) => {
276 let parsed = parse_bool(value).ok_or_else(|| {
277 Error::TypeConversion(format!(
278 "Failed to parse '{}' as boolean for key '{}'",
279 value, row.key
280 ))
281 })?;
282 builder.append_value(parsed);
283 }
284 None => builder.append_null(),
285 }
286 }
287
288 Ok(Arc::new(builder.finish()))
289}
290
291fn build_date_column(data: &[StringData]) -> Result<ArrayRef> {
293 let mut builder = Date32Builder::with_capacity(data.len());
294
295 for row in data {
296 match &row.value {
297 Some(value) => {
298 let parsed = parse_date(value).ok_or_else(|| {
299 Error::TypeConversion(format!(
300 "Failed to parse '{}' as date for key '{}'",
301 value, row.key
302 ))
303 })?;
304 builder.append_value(parsed);
305 }
306 None => builder.append_null(),
307 }
308 }
309
310 Ok(Arc::new(builder.finish()))
311}
312
313fn build_datetime_column(data: &[StringData]) -> Result<ArrayRef> {
315 let mut builder = TimestampMicrosecondBuilder::with_capacity(data.len());
316
317 for row in data {
318 match &row.value {
319 Some(value) => {
320 let parsed = parse_datetime(value).ok_or_else(|| {
321 Error::TypeConversion(format!(
322 "Failed to parse '{}' as datetime for key '{}'",
323 value, row.key
324 ))
325 })?;
326 builder.append_value(parsed);
327 }
328 None => builder.append_null(),
329 }
330 }
331
332 Ok(Arc::new(builder.finish()))
333}
334
/// Interprets `s` as a boolean, ignoring case and surrounding quote characters.
///
/// Leading/trailing `"` characters are stripped first, then leading/trailing `'`
/// characters. Accepted spellings are `true`/`1`/`yes`/`t`/`y` and
/// `false`/`0`/`no`/`f`/`n`; anything else (including padded input such as
/// `" true"`) yields `None`.
fn parse_bool(s: &str) -> Option<bool> {
    const TRUTHY: [&str; 5] = ["true", "1", "yes", "t", "y"];
    const FALSY: [&str; 5] = ["false", "0", "no", "f", "n"];

    let normalized = s.trim_matches('"').trim_matches('\'').to_lowercase();
    let token = normalized.as_str();

    if TRUTHY.contains(&token) {
        Some(true)
    } else if FALSY.contains(&token) {
        Some(false)
    } else {
        None
    }
}
344
345fn parse_date(s: &str) -> Option<i32> {
351 if let Ok(days) = s.parse::<i32>() {
353 return Some(days);
354 }
355
356 if s.len() >= 10 {
358 let parts: Vec<&str> = s.split('-').collect();
359 if parts.len() >= 3
360 && let (Ok(year), Ok(month), Ok(day)) = (
361 parts[0].parse::<i32>(),
362 parts[1].parse::<u32>(),
363 parts[2].chars().take(2).collect::<String>().parse::<u32>(),
364 )
365 {
366 return days_since_epoch(year, month, day);
367 }
368 }
369
370 None
371}
372
/// Converts a proleptic Gregorian calendar date into days since 1970-01-01.
///
/// Dates before the epoch produce negative values.
///
/// Returns `None` when `month` is outside `1..=12`, when `day` does not exist in
/// that month of that year (for example 29 February in a non-leap year or
/// 31 April), or when the result does not fit in an `i32`.
fn days_since_epoch(year: i32, month: u32, day: u32) -> Option<i32> {
    let is_leap = (year % 4 == 0 && year % 100 != 0) || year % 400 == 0;
    let days_in_month = match month {
        1 | 3 | 5 | 7 | 8 | 10 | 12 => 31,
        4 | 6 | 9 | 11 => 30,
        2 if is_leap => 29,
        2 => 28,
        _ => return None,
    };
    if !(1..=days_in_month).contains(&day) {
        return None;
    }

    // Closed-form day count: treat the year as starting on 1 March so the leap
    // day is the last day of the (shifted) year, then count whole 400-year eras
    // (146_097 days each). This runs in constant time for any year, and doing the
    // arithmetic in i64 means it cannot overflow for any i32 year.
    let shifted_year = i64::from(year) - i64::from(month <= 2);
    let era = shifted_year.div_euclid(400);
    let year_of_era = shifted_year.rem_euclid(400);
    let month_from_march = i64::from(if month > 2 { month - 3 } else { month + 9 });
    let day_of_year = (153 * month_from_march + 2) / 5 + i64::from(day) - 1;
    let day_of_era = year_of_era * 365 + year_of_era / 4 - year_of_era / 100 + day_of_year;

    // 719_468 is the number of days from 0000-03-01 to 1970-01-01.
    let days = era * 146_097 + day_of_era - 719_468;
    i32::try_from(days).ok()
}
405
406fn parse_datetime(s: &str) -> Option<i64> {
414 let s = s.trim();
415
416 if let Ok(ts) = s.parse::<i64>() {
418 if ts < 10_000_000_000 {
419 return Some(ts * 1_000_000);
420 } else if ts < 10_000_000_000_000 {
421 return Some(ts * 1_000);
422 } else {
423 return Some(ts);
424 }
425 }
426
427 parse_iso8601_datetime(s)
429}
430
431fn parse_iso8601_datetime(s: &str) -> Option<i64> {
433 let s = s.trim_end_matches('Z');
434
435 let parts: Vec<&str> = s.split('T').collect();
436 if parts.len() != 2 {
437 let parts: Vec<&str> = s.split(' ').collect();
438 if parts.len() != 2 {
439 return None;
440 }
441 return parse_datetime_parts(parts[0], parts[1]);
442 }
443
444 parse_datetime_parts(parts[0], parts[1])
445}
446
447fn parse_datetime_parts(date_str: &str, time_str: &str) -> Option<i64> {
449 let date_parts: Vec<&str> = date_str.split('-').collect();
450 if date_parts.len() != 3 {
451 return None;
452 }
453
454 let year = date_parts[0].parse::<i32>().ok()?;
455 let month = date_parts[1].parse::<u32>().ok()?;
456 let day = date_parts[2].parse::<u32>().ok()?;
457
458 let time_str = time_str.split('+').next()?.split('-').next()?;
459 let time_parts: Vec<&str> = time_str.split(':').collect();
460 if time_parts.len() < 2 {
461 return None;
462 }
463
464 let hour = time_parts[0].parse::<u32>().ok()?;
465 let minute = time_parts[1].parse::<u32>().ok()?;
466
467 let (second, microsecond) = if time_parts.len() >= 3 {
468 let sec_parts: Vec<&str> = time_parts[2].split('.').collect();
469 let sec = sec_parts[0].parse::<u32>().ok()?;
470 let usec = if sec_parts.len() > 1 {
471 let frac = sec_parts[1];
472 let padded = format!("{:0<6}", frac);
473 padded[..6].parse::<u32>().unwrap_or(0)
474 } else {
475 0
476 };
477 (sec, usec)
478 } else {
479 (0, 0)
480 };
481
482 let days = days_since_epoch(year, month, day)?;
483 let day_us = days as i64 * 24 * 60 * 60 * 1_000_000;
484 let time_us =
485 (hour as i64 * 3600 + minute as i64 * 60 + second as i64) * 1_000_000 + microsecond as i64;
486
487 Some(day_us + time_us)
488}
489
#[cfg(test)]
mod tests {
    use super::*;
    use arrow::array::{
        Array, BooleanArray, Date32Array, Float64Array, Int64Array, StringArray,
        TimestampMicrosecondArray,
    };

    /// Builds a row without a TTL.
    fn make_string_data(key: &str, value: Option<&str>) -> StringData {
        StringData {
            key: key.to_string(),
            value: value.map(|s| s.to_string()),
            ttl: None,
        }
    }

    /// Builds a row with an explicit TTL.
    fn make_string_data_with_ttl(key: &str, value: Option<&str>, ttl: i64) -> StringData {
        StringData {
            key: key.to_string(),
            value: value.map(|s| s.to_string()),
            ttl: Some(ttl),
        }
    }

    /// Downcasts column `index` of `batch` to the concrete array type `T`,
    /// so tests can compare actual values rather than only shapes.
    fn column<T: Array + 'static>(batch: &RecordBatch, index: usize) -> &T {
        batch
            .column(index)
            .as_any()
            .downcast_ref::<T>()
            .expect("column has an unexpected Arrow array type")
    }

    #[test]
    fn test_string_schema_default() {
        let schema = StringSchema::default();
        assert!(schema.include_key());
        assert!(!schema.include_ttl());
        assert_eq!(schema.key_column_name(), "_key");
        assert_eq!(schema.value_column_name(), "value");
        assert_eq!(schema.ttl_column_name(), "_ttl");
        assert_eq!(schema.value_type(), &DataType::Utf8);
    }

    #[test]
    fn test_string_schema_builder() {
        let schema = StringSchema::new(DataType::Int64)
            .with_key(false)
            .with_value_column_name("count");

        assert!(!schema.include_key());
        assert_eq!(schema.value_column_name(), "count");
        assert_eq!(schema.value_type(), &DataType::Int64);
    }

    #[test]
    fn test_to_arrow_schema_types_and_nullability() {
        let arrow_schema = StringSchema::new(DataType::Int64)
            .with_ttl(true)
            .to_arrow_schema();

        assert_eq!(arrow_schema.fields().len(), 3);

        assert_eq!(arrow_schema.field(0).name(), "_key");
        assert_eq!(arrow_schema.field(0).data_type(), &DataType::Utf8);
        assert!(!arrow_schema.field(0).is_nullable());

        assert_eq!(arrow_schema.field(1).name(), "_ttl");
        assert_eq!(arrow_schema.field(1).data_type(), &DataType::Int64);
        assert!(!arrow_schema.field(1).is_nullable());

        assert_eq!(arrow_schema.field(2).name(), "value");
        assert_eq!(arrow_schema.field(2).data_type(), &DataType::Int64);
        assert!(arrow_schema.field(2).is_nullable());
    }

    #[test]
    fn test_strings_to_record_batch_utf8() {
        let schema = StringSchema::new(DataType::Utf8);
        let data = vec![
            make_string_data("key:1", Some("hello")),
            make_string_data("key:2", Some("world")),
            make_string_data("key:3", None),
        ];

        let batch = strings_to_record_batch(&data, &schema).unwrap();
        assert_eq!(batch.num_rows(), 3);
        assert_eq!(batch.num_columns(), 2);
        assert_eq!(
            column::<StringArray>(&batch, 0),
            &StringArray::from(vec!["key:1", "key:2", "key:3"])
        );
        assert_eq!(
            column::<StringArray>(&batch, 1),
            &StringArray::from(vec![Some("hello"), Some("world"), None])
        );
    }

    #[test]
    fn test_strings_to_record_batch_int64() {
        let schema = StringSchema::new(DataType::Int64);
        let data = vec![
            make_string_data("counter:1", Some("100")),
            make_string_data("counter:2", Some("-50")),
            make_string_data("counter:3", None),
        ];

        let batch = strings_to_record_batch(&data, &schema).unwrap();
        assert_eq!(batch.num_rows(), 3);
        assert_eq!(
            column::<Int64Array>(&batch, 1),
            &Int64Array::from(vec![Some(100), Some(-50), None])
        );
    }

    #[test]
    fn test_strings_to_record_batch_float64() {
        let schema = StringSchema::new(DataType::Float64);
        let data = vec![
            make_string_data("price:1", Some("19.99")),
            make_string_data("price:2", Some("0.5")),
        ];

        let batch = strings_to_record_batch(&data, &schema).unwrap();
        assert_eq!(batch.num_rows(), 2);
        assert_eq!(
            column::<Float64Array>(&batch, 1),
            &Float64Array::from(vec![19.99, 0.5])
        );
    }

    #[test]
    fn test_strings_to_record_batch_boolean() {
        let schema = StringSchema::new(DataType::Boolean);
        let data = vec![
            make_string_data("flag:1", Some("true")),
            make_string_data("flag:2", Some("false")),
            make_string_data("flag:3", Some("1")),
            make_string_data("flag:4", Some("0")),
        ];

        let batch = strings_to_record_batch(&data, &schema).unwrap();
        assert_eq!(batch.num_rows(), 4);
        assert_eq!(
            column::<BooleanArray>(&batch, 1),
            &BooleanArray::from(vec![true, false, true, false])
        );
    }

    #[test]
    fn test_strings_to_record_batch_boolean_parse_error() {
        let schema = StringSchema::new(DataType::Boolean);
        let data = vec![make_string_data("flag:1", Some("maybe"))];

        assert!(strings_to_record_batch(&data, &schema).is_err());
    }

    #[test]
    fn test_strings_to_record_batch_no_key() {
        let schema = StringSchema::new(DataType::Utf8).with_key(false);
        let data = vec![make_string_data("key:1", Some("hello"))];

        let batch = strings_to_record_batch(&data, &schema).unwrap();
        assert_eq!(batch.num_columns(), 1);
        assert_eq!(batch.schema().field(0).name(), "value");
        assert_eq!(
            column::<StringArray>(&batch, 0),
            &StringArray::from(vec!["hello"])
        );
    }

    #[test]
    fn test_strings_to_record_batch_parse_error() {
        let schema = StringSchema::new(DataType::Int64);
        let data = vec![make_string_data("key:1", Some("not_a_number"))];

        let result = strings_to_record_batch(&data, &schema);
        assert!(result.is_err());
    }

    #[test]
    fn test_strings_to_record_batch_unsupported_type() {
        let schema = StringSchema::new(DataType::Int32);
        let data = vec![make_string_data("key:1", Some("1"))];

        assert!(strings_to_record_batch(&data, &schema).is_err());
    }

    #[test]
    fn test_empty_data() {
        let schema = StringSchema::new(DataType::Utf8);
        let data: Vec<StringData> = vec![];

        let batch = strings_to_record_batch(&data, &schema).unwrap();
        assert_eq!(batch.num_rows(), 0);
        assert_eq!(batch.num_columns(), 2);
    }

    #[test]
    fn test_strings_to_record_batch_date() {
        let schema = StringSchema::new(DataType::Date32);
        let data = vec![
            make_string_data("date:1", Some("2024-01-15")),
            make_string_data("date:2", Some("19737")),
        ];

        let batch = strings_to_record_batch(&data, &schema).unwrap();
        assert_eq!(batch.num_rows(), 2);
        // Both spellings denote 2024-01-15, i.e. day 19737 after the epoch.
        assert_eq!(
            column::<Date32Array>(&batch, 1),
            &Date32Array::from(vec![19737, 19737])
        );
    }

    #[test]
    fn test_strings_to_record_batch_datetime() {
        let schema = StringSchema::new(DataType::Timestamp(TimeUnit::Microsecond, None));
        let data = vec![
            make_string_data("ts:1", Some("2024-01-15T10:30:00")),
            make_string_data("ts:2", Some("1705315800")),
        ];

        let batch = strings_to_record_batch(&data, &schema).unwrap();
        assert_eq!(batch.num_rows(), 2);
        assert_eq!(
            column::<TimestampMicrosecondArray>(&batch, 1),
            &TimestampMicrosecondArray::from(vec![
                1_705_314_600_000_000_i64,
                1_705_315_800_000_000_i64,
            ])
        );
    }

    #[test]
    fn test_string_schema_with_ttl() {
        let schema = StringSchema::new(DataType::Utf8)
            .with_ttl(true)
            .with_ttl_column_name("expires_in");

        assert!(schema.include_ttl());
        assert_eq!(schema.ttl_column_name(), "expires_in");
    }

    #[test]
    fn test_strings_to_record_batch_with_ttl() {
        let schema = StringSchema::new(DataType::Utf8).with_ttl(true);
        let data = vec![
            make_string_data_with_ttl("key:1", Some("hello"), 3600),
            make_string_data_with_ttl("key:2", Some("world"), -1),
            make_string_data_with_ttl("key:3", Some("test"), 60),
        ];

        let batch = strings_to_record_batch(&data, &schema).unwrap();
        assert_eq!(batch.num_rows(), 3);
        assert_eq!(batch.num_columns(), 3);
        assert_eq!(batch.schema().field(1).name(), "_ttl");
        assert_eq!(
            column::<Int64Array>(&batch, 1),
            &Int64Array::from(vec![3600, -1, 60])
        );
    }

    #[test]
    fn test_strings_to_record_batch_missing_ttl_defaults_to_minus_one() {
        let schema = StringSchema::new(DataType::Utf8).with_ttl(true);
        let data = vec![make_string_data("key:1", Some("hello"))];

        let batch = strings_to_record_batch(&data, &schema).unwrap();
        assert_eq!(column::<Int64Array>(&batch, 1), &Int64Array::from(vec![-1]));
    }

    #[test]
    fn test_strings_to_record_batch_ttl_column_order() {
        let schema = StringSchema::new(DataType::Utf8).with_ttl(true);
        let data = vec![make_string_data_with_ttl("key:1", Some("hello"), 100)];

        let batch = strings_to_record_batch(&data, &schema).unwrap();
        assert_eq!(batch.schema().field(0).name(), "_key");
        assert_eq!(batch.schema().field(1).name(), "_ttl");
        assert_eq!(batch.schema().field(2).name(), "value");
    }
}