use crate::array_util::*;
use crate::plot::scale::ScaleTypeKind;
/// Name of the synthetic column that `unify_datasets` adds to every unified
/// row; it stores a running row counter that spans all source datasets.
pub(super) const ROW_INDEX_COLUMN: &str = "__ggsql_row_index__";
#[allow(unused_imports)]
use crate::plot::ArrayElement;
use crate::plot::ParameterValue;
use crate::{naming, AestheticValue, DataFrame, GgsqlError, Plot, Result};
use arrow::array::{Array, ArrayRef};
use arrow::datatypes::{DataType, TimeUnit};
use serde_json::{json, Map, Value};
use std::collections::HashMap;
/// Kind of temporal value recognised in a string cell.
///
/// `parse_temporal_string` returns one of these next to the value's numeric
/// form, and `format_temporal` uses it to turn a number back into text.
#[derive(Debug, Clone, Copy, PartialEq)]
pub(super) enum TemporalType {
/// Matches `ArrayElement::Date`; `format_temporal` narrows the number to `i32`.
Date,
/// Matches `ArrayElement::DateTime`; `format_temporal` narrows the number to `i64`.
DateTime,
/// Matches `ArrayElement::Time`; `format_temporal` narrows the number to `i64`.
Time,
}
/// Serialises every row of `df` into a JSON object keyed by column name.
///
/// # Errors
///
/// Returns `GgsqlError::WriterError` when a column name has no matching entry
/// in `df.get_columns()`, and propagates any error from `series_value_at`.
pub(super) fn dataframe_to_values(df: &DataFrame) -> Result<Vec<Value>> {
    let row_count = df.height();
    let names = df.get_column_names();
    let arrays = df.get_columns();

    (0..row_count)
        .map(|row| -> Result<Value> {
            let mut record = Map::new();
            for (position, name) in names.iter().enumerate() {
                let array = match arrays.get(position) {
                    Some(array) => array,
                    None => {
                        return Err(GgsqlError::WriterError(format!(
                            "Failed to get column {}",
                            name
                        )))
                    }
                };
                record.insert(name.to_string(), series_value_at(array, row)?);
            }
            Ok(Value::Object(record))
        })
        // Collecting into `Result` stops at the first failing row.
        .collect()
}
pub(super) fn series_value_at(array: &ArrayRef, idx: usize) -> Result<Value> {
if array.is_null(idx) {
return Ok(Value::Null);
}
match array.data_type() {
DataType::Int8 => Ok(json!(as_i8(array)?.value(idx))),
DataType::Int16 => Ok(json!(as_i16(array)?.value(idx))),
DataType::Int32 => Ok(json!(as_i32(array)?.value(idx))),
DataType::Int64 => Ok(json!(as_i64(array)?.value(idx))),
DataType::UInt8 => Ok(json!(as_u8(array)?.value(idx))),
DataType::UInt16 => Ok(json!(as_u16(array)?.value(idx))),
DataType::UInt32 => Ok(json!(as_u32(array)?.value(idx))),
DataType::UInt64 => Ok(json!(as_u64(array)?.value(idx))),
DataType::Float32 => Ok(json!(as_f32(array)?.value(idx))),
DataType::Float64 => Ok(json!(as_f64(array)?.value(idx))),
DataType::Boolean => Ok(json!(as_bool(array)?.value(idx))),
DataType::Utf8 => {
Ok(json!(as_str(array)?.value(idx)))
}
DataType::Date32 => {
let days = as_date32(array)?.value(idx);
let unix_epoch = chrono::NaiveDate::from_ymd_opt(1970, 1, 1).unwrap();
let date = unix_epoch + chrono::Duration::days(days as i64);
Ok(json!(date.format("%Y-%m-%d").to_string()))
}
DataType::Timestamp(time_unit, _) => {
let timestamp = as_timestamp_us(array).map(|a| a.value(idx)).or_else(|_| {
let cast = cast_array(array, &DataType::Timestamp(TimeUnit::Microsecond, None))?;
Ok(as_timestamp_us(&cast)?.value(idx))
})?;
let micros = match time_unit {
TimeUnit::Microsecond => timestamp,
TimeUnit::Millisecond => timestamp * 1_000,
TimeUnit::Nanosecond => timestamp / 1_000,
TimeUnit::Second => timestamp * 1_000_000,
};
let secs = micros / 1_000_000;
let nsecs = ((micros % 1_000_000) * 1000) as u32;
let dt = chrono::DateTime::<chrono::Utc>::from_timestamp(secs, nsecs)
.unwrap_or_else(|| chrono::DateTime::<chrono::Utc>::from_timestamp(0, 0).unwrap());
Ok(json!(dt.format("%Y-%m-%dT%H:%M:%S%.3fZ").to_string()))
}
DataType::Time64(_) => {
let nanos = as_time64_ns(array)?.value(idx);
let hours = nanos / 3_600_000_000_000;
let minutes = (nanos % 3_600_000_000_000) / 60_000_000_000;
let seconds = (nanos % 60_000_000_000) / 1_000_000_000;
let millis = (nanos % 1_000_000_000) / 1_000_000;
Ok(json!(format!(
"{:02}:{:02}:{:02}.{:03}",
hours, minutes, seconds, millis
)))
}
_ => {
Ok(json!(value_to_string(array, idx)))
}
}
}
/// Finds the bin delimited by consecutive `breaks` that contains `value`.
///
/// Bins are half-open (`[lower, upper)`), except the final bin, which also
/// includes its upper edge. Returns `None` when fewer than two breaks are
/// given or when no bin contains `value`.
pub(super) fn find_bin_for_value(value: f64, breaks: &[f64]) -> Option<(f64, f64)> {
    // Index of the final bin; `None` (fewer than two breaks) ends the search early.
    let final_bin = breaks.len().checked_sub(2)?;

    breaks.windows(2).enumerate().find_map(|(index, edges)| {
        let (lower, upper) = (edges[0], edges[1]);
        let under_upper = if index == final_bin {
            value <= upper
        } else {
            value < upper
        };
        (value >= lower && under_upper).then(|| (lower, upper))
    })
}
/// Serialises `df` row by row like `dataframe_to_values`, but replaces the
/// values of binned columns with the edges of the bin they fall into.
///
/// For a column listed in `binned_columns` whose cell is a number, or a
/// string accepted by `parse_temporal_string`, and that falls inside one of
/// the bins, the row gets the bin start under the column name and the bin end
/// under `naming::bin_end_column(name)`. Edges of temporal cells are written
/// back through `format_temporal`. Any other cell is emitted unchanged.
///
/// # Errors
///
/// Returns `GgsqlError::WriterError` when a column name has no matching entry
/// in `df.get_columns()`, and propagates any error from `series_value_at`.
pub(super) fn dataframe_to_values_with_bins(
    df: &DataFrame,
    binned_columns: &HashMap<String, Vec<f64>>,
) -> Result<Vec<Value>> {
    let row_count = df.height();
    let names = df.get_column_names();
    let arrays = df.get_columns();
    let mut records = Vec::new();

    for row in 0..row_count {
        let mut record = Map::new();
        for (position, name) in names.iter().enumerate() {
            let array = arrays.get(position).ok_or_else(|| {
                GgsqlError::WriterError(format!("Failed to get column {}", name))
            })?;
            let cell = series_value_at(array, row)?;
            let key = name.to_string();

            // Resolve (bin start, bin end, temporal kind if the cell was a
            // temporal string); `None` means "emit the cell unchanged".
            let bin = binned_columns.get(&key).and_then(|breaks| {
                let temporal = cell.as_str().and_then(parse_temporal_string);
                let number = cell
                    .as_f64()
                    .or_else(|| temporal.map(|(number, _)| number))?;
                let (lower, upper) = find_bin_for_value(number, breaks)?;
                Some((lower, upper, temporal.map(|(_, kind)| kind)))
            });

            match bin {
                Some((lower, upper, Some(kind))) => {
                    record.insert(key.clone(), json!(format_temporal(lower, kind)));
                    record.insert(
                        naming::bin_end_column(&key),
                        json!(format_temporal(upper, kind)),
                    );
                }
                Some((lower, upper, None)) => {
                    record.insert(key.clone(), json!(lower));
                    record.insert(naming::bin_end_column(&key), json!(upper));
                }
                None => {
                    record.insert(key, cell);
                }
            }
        }
        records.push(Value::Object(record));
    }

    Ok(records)
}
/// Tries to read `s` as a date, then as a datetime, then as a time.
///
/// Returns the numeric payload of the first matching `ArrayElement` variant
/// (converted to `f64`) together with the matching `TemporalType`, or `None`
/// when none of the three parsers accepts the string.
pub(super) fn parse_temporal_string(s: &str) -> Option<(f64, TemporalType)> {
    let as_date = || match ArrayElement::from_date_string(s) {
        Some(ArrayElement::Date(days)) => Some((days as f64, TemporalType::Date)),
        _ => None,
    };
    let as_datetime = || match ArrayElement::from_datetime_string(s) {
        Some(ArrayElement::DateTime(micros)) => Some((micros as f64, TemporalType::DateTime)),
        _ => None,
    };
    let as_time = || match ArrayElement::from_time_string(s) {
        Some(ArrayElement::Time(nanos)) => Some((nanos as f64, TemporalType::Time)),
        _ => None,
    };

    // `or_else` is lazy, so a later parser only runs if the earlier ones failed.
    as_date().or_else(as_datetime).or_else(as_time)
}
/// Renders a numeric temporal `value` as text through the `ArrayElement`
/// ISO formatter that corresponds to `temporal_type`.
///
/// The float is narrowed with `as` (to `i32` for dates, `i64` otherwise)
/// before being handed to the formatter.
pub(super) fn format_temporal(value: f64, temporal_type: TemporalType) -> String {
    match temporal_type {
        TemporalType::Date => {
            let days = value as i32;
            ArrayElement::date_to_iso(days)
        }
        TemporalType::DateTime => {
            let ticks = value as i64;
            ArrayElement::datetime_to_iso(ticks)
        }
        TemporalType::Time => {
            let ticks = value as i64;
            ArrayElement::time_to_iso(ticks)
        }
    }
}
/// Builds a map from column name to numeric bin breaks for every binned
/// scale in `spec`.
///
/// A scale contributes only if its aesthetic passes `is_primary_internal`,
/// its scale type kind is `Binned`, and its `"breaks"` property is an array
/// with at least two entries convertible by `to_f64`. The breaks are stored
/// under the aesthetic's own column name (`naming::aesthetic_column`) and
/// under every layer column mapped to that aesthetic.
pub(super) fn collect_binned_columns(spec: &Plot) -> HashMap<String, Vec<f64>> {
    let ctx = spec.get_aesthetic_context();
    let mut result: HashMap<String, Vec<f64>> = HashMap::new();

    for scale in &spec.scales {
        // Short-circuit keeps the original check order: primary first, then kind.
        let eligible = ctx.is_primary_internal(&scale.aesthetic)
            && scale
                .scale_type
                .as_ref()
                .map_or(false, |kind| kind.scale_type_kind() == ScaleTypeKind::Binned);
        if !eligible {
            continue;
        }

        let raw_breaks = match scale.properties.get("breaks") {
            Some(ParameterValue::Array(items)) => items,
            _ => continue,
        };

        // Entries that cannot be converted to f64 are dropped.
        let numeric: Vec<f64> = raw_breaks
            .iter()
            .filter_map(|item| item.to_f64())
            .collect();
        if numeric.len() < 2 {
            continue;
        }

        result.insert(naming::aesthetic_column(&scale.aesthetic), numeric.clone());
        for layer in &spec.layers {
            if let Some(AestheticValue::Column { name, .. }) =
                layer.mappings.aesthetics.get(&scale.aesthetic)
            {
                result.insert(name.clone(), numeric.clone());
            }
        }
    }

    result
}
/// Reports whether the scale that governs `aesthetic` in `spec` is binned.
///
/// The aesthetic is first mapped through `primary_internal_position`; when
/// that yields nothing, the name is used as given. Returns `false` if no
/// scale is found or the scale has no scale type.
pub(super) fn is_binned_aesthetic(aesthetic: &str, spec: &Plot) -> bool {
    let ctx = spec.get_aesthetic_context();
    let lookup_name = match ctx.primary_internal_position(aesthetic) {
        Some(position) => position,
        None => aesthetic,
    };

    match spec
        .find_scale(lookup_name)
        .and_then(|scale| scale.scale_type.as_ref())
    {
        Some(kind) => kind.scale_type_kind() == ScaleTypeKind::Binned,
        None => false,
    }
}
/// Merges all datasets into one list of rows that share the same columns.
///
/// Every output row carries the union of the column names seen in any
/// dataset (absent ones filled with `Value::Null`), plus
/// `naming::SOURCE_COLUMN` holding the dataset key and `ROW_INDEX_COLUMN`
/// holding a counter that runs across all datasets. Datasets that are not
/// arrays and rows that are not objects are skipped.
pub(super) fn unify_datasets(datasets: &Map<String, Value>) -> Result<Vec<Value>> {
    // Union of the column names appearing in any object row of any dataset.
    let all_columns: std::collections::HashSet<String> = datasets
        .values()
        .filter_map(|dataset| dataset.as_array())
        .flatten()
        .filter_map(|row| row.as_object())
        .flat_map(|fields| fields.keys().cloned())
        .collect();

    let mut unified = Vec::new();
    for (key, values) in datasets {
        let rows = match values.as_array() {
            Some(rows) => rows,
            None => continue,
        };
        for fields in rows.iter().filter_map(|row| row.as_object()) {
            // One output row per object row, so the output length so far is
            // exactly the running row index.
            let row_index: usize = unified.len();
            let mut new_row: Map<String, Value> = all_columns
                .iter()
                .map(|column| {
                    let cell = fields.get(column).cloned().unwrap_or(Value::Null);
                    (column.clone(), cell)
                })
                .collect();
            new_row.insert(naming::SOURCE_COLUMN.to_string(), json!(key));
            new_row.insert(ROW_INDEX_COLUMN.to_string(), json!(row_index));
            unified.push(Value::Object(new_row));
        }
    }

    Ok(unified)
}