use std::collections::HashMap;
use std::sync::Arc;
use std::time::Instant;
use arrow::array::{ArrayRef, Float32Array, Float64Array};
use arrow::datatypes::{DataType, Field, Schema};
use arrow::record_batch::RecordBatch;
use arrow_ipc::writer::StreamWriter;
use axum::extract::{Query, State};
use axum::http::{header, HeaderValue, StatusCode};
use axum::response::{IntoResponse, Response};
use axum::Json;
use bytes::Bytes;
use futures::stream::{self, Stream, StreamExt};
use ndarray::{Array, IxDyn};
use serde::Deserialize;
use tracing::{debug, info};
use crate::error::{Result, RossbyError};
use crate::state::AppState;
fn generate_request_id() -> String {
use uuid::Uuid;
Uuid::new_v4().to_string()
}
fn log_request_error(error: &RossbyError, endpoint: &str, request_id: &str, params: Option<&str>) {
tracing::error!(
endpoint = endpoint,
request_id = %request_id,
params = params,
error = %error,
"Request failed"
);
}
/// Raw query-string parameters accepted by `GET /data`.
///
/// `vars`, `layout` and `format` are named parameters. Every other key/value
/// pair is collected into `dynamic_params` (via `#[serde(flatten)]`) and is
/// later interpreted as a dimension constraint by
/// `process_dimension_constraints`.
#[derive(Debug, Deserialize, Clone)]
pub struct DataQuery {
/// Comma-separated list of variable names to return (required).
pub vars: String,
/// Optional comma-separated dimension order, e.g. `time,lat,lon`.
#[serde(default)]
pub layout: Option<String>,
/// Output format: `arrow` (used by `data_handler` when absent) or `json`.
#[serde(default)]
pub format: Option<String>,
/// All remaining query parameters, e.g. `lat=35.0`, `lat_range=35,37`,
/// `time_index=0`, `__lon_index=2`, `__lon_index_range=0,3`.
#[serde(flatten)]
pub dynamic_params: HashMap<String, String>,
}
/// A constraint on a single dimension, as produced by
/// `process_dimension_constraints` from the `/data` query string.
///
/// In that function `dimension` is always filled with a file-specific
/// dimension name (the result of `AppState::resolve_dimension` or a key of the
/// dataset's dimension map).
#[derive(Debug, Clone)]
pub enum DimensionSelector {
/// Select the index that `AppState::find_coordinate_index` returns for `value`.
SingleValue { dimension: String, value: f64 },
/// Select the inclusive index range spanned by the coordinate values
/// `start` and `end` (each mapped through `AppState::find_coordinate_index`).
ValueRange {
dimension: String,
start: f64,
end: f64,
},
/// Select the single position `index` along `dimension`.
SingleIndex { dimension: String, index: usize },
/// Select the inclusive index range `start..=end` along `dimension`.
IndexRange {
dimension: String,
start: usize,
end: usize,
},
}
/// Validated form of a `DataQuery`, handed to the extraction functions.
struct ParsedDataQuery {
/// Requested variable names (trimmed, non-empty), in request order.
variables: Vec<String>,
/// Dimension constraints parsed from the dynamic query parameters.
dimension_selectors: Vec<DimensionSelector>,
/// Optional requested dimension order, with names as given by the client.
/// The request handlers check that each entry resolves via
/// `AppState::resolve_dimension` before building this struct.
layout: Option<Vec<String>>,
}
pub async fn data_handler(
State(state): State<Arc<AppState>>,
Query(params): Query<DataQuery>,
) -> Response {
let request_id = generate_request_id();
let start_time = Instant::now();
debug!(
endpoint = "/data",
request_id = %request_id,
vars = %params.vars,
layout = ?params.layout,
format = ?params.format,
params = ?params.dynamic_params,
"Processing data query"
);
debug!(
"Available dimensions: {:?}",
state.metadata.dimensions.keys().collect::<Vec<_>>()
);
debug!(
"Available variables: {:?}",
state.metadata.variables.keys().collect::<Vec<_>>()
);
let params_clone = params.clone();
let output_format = params.format.as_deref().unwrap_or("arrow");
match output_format {
"arrow" => {
match process_data_query(state, params_clone.clone()) {
Ok(arrow_data) => {
let duration = start_time.elapsed();
info!(
endpoint = "/data",
request_id = %request_id,
format = "arrow",
duration_us = duration.as_micros() as u64,
"Data query successful"
);
(
StatusCode::OK,
[(
header::CONTENT_TYPE,
HeaderValue::from_static("application/vnd.apache.arrow.stream"),
)],
arrow_data,
)
.into_response()
}
Err(error) => handle_data_error(error, &request_id, ¶ms),
}
}
"json" => {
match process_data_query_json(state, params_clone.clone()) {
Ok(response) => {
let duration = start_time.elapsed();
info!(
endpoint = "/data",
request_id = %request_id,
format = "json",
duration_us = duration.as_micros() as u64,
"Data query successful"
);
response
}
Err(error) => handle_data_error(error, &request_id, ¶ms),
}
}
_ => {
(
StatusCode::BAD_REQUEST,
Json(serde_json::json!({
"error": format!("Unsupported format: {}", output_format),
"request_id": request_id
})),
)
.into_response()
}
}
}
fn handle_data_error(error: RossbyError, request_id: &str, params: &DataQuery) -> Response {
log_request_error(
&error,
"/data",
request_id,
Some(&format!(
"vars={}, params={:?}",
params.vars, params.dynamic_params
)),
);
tracing::debug!(
"/data endpoint failed: {:?} for params: {:?}",
error,
params
);
let status = match &error {
RossbyError::PayloadTooLarge { .. } => StatusCode::PAYLOAD_TOO_LARGE,
_ => StatusCode::BAD_REQUEST,
};
(
status,
Json(serde_json::json!({
"error": error.to_string(),
"request_id": request_id
})),
)
.into_response()
}
/// Handles a `/data?format=json` request.
///
/// Validates the requested variables, the dimension constraints and the
/// optional `layout`. It then returns a `200 OK` response whose body streams
/// the JSON document built by `create_json_stream`.
///
/// # Errors
///
/// * `RossbyError::InvalidParameter` (`vars`) when no variable name is given.
/// * `RossbyError::InvalidVariables` listing every requested variable the
///   dataset does not contain.
/// * `RossbyError::InvalidParameter` (`layout`) for an unknown layout dimension.
/// * Any error from constraint parsing or data extraction.
fn process_data_query_json(state: Arc<AppState>, params: DataQuery) -> Result<Response> {
    use axum::body::Body;

    let variables: Vec<String> = params
        .vars
        .split(',')
        .map(|s| s.trim().to_string())
        .filter(|s| !s.is_empty())
        .collect();
    if variables.is_empty() {
        return Err(RossbyError::InvalidParameter {
            param: "vars".to_string(),
            message: "At least one variable must be specified".to_string(),
        });
    }

    // Report every unknown variable at once rather than failing on the first.
    let mut invalid_vars = Vec::new();
    for var in &variables {
        if !state.has_variable(var) {
            invalid_vars.push(var.clone());
        }
    }
    if !invalid_vars.is_empty() {
        return Err(RossbyError::InvalidVariables {
            names: invalid_vars,
        });
    }

    let dimension_selectors = process_dimension_constraints(&state, &params.dynamic_params)?;

    let layout = params.layout.as_ref().map(|layout_str| {
        layout_str
            .split(',')
            .map(|s| s.trim().to_string())
            .filter(|s| !s.is_empty())
            .collect::<Vec<_>>()
    });
    if let Some(layout_dims) = &layout {
        for dim in layout_dims {
            let dim_result = state.resolve_dimension(dim);
            if dim_result.is_err() {
                debug!("Failed to resolve dimension: {} - {:?}", dim, dim_result);
                return Err(RossbyError::InvalidParameter {
                    param: "layout".to_string(),
                    message: format!("Unknown dimension in layout: {}", dim),
                });
            }
        }
    }

    let parsed_query = ParsedDataQuery {
        variables,
        dimension_selectors,
        layout,
    };
    // `params` is no longer borrowed here, so hand it over instead of cloning.
    let stream = create_json_stream(state, parsed_query, params)?;
    Ok((
        StatusCode::OK,
        [(
            header::CONTENT_TYPE,
            HeaderValue::from_static("application/json"),
        )],
        Body::from_stream(stream),
    )
        .into_response())
}
fn create_json_stream(
state: Arc<AppState>,
query: ParsedDataQuery,
_params: DataQuery,
) -> Result<impl Stream<Item = std::result::Result<Bytes, std::io::Error>> + Send> {
let ParsedDataQuery {
variables,
dimension_selectors,
layout,
} = query;
let mut selected_ranges: HashMap<String, (usize, usize)> = HashMap::new();
let mut coordinate_arrays: HashMap<String, Vec<f64>> = HashMap::new();
for selector in dimension_selectors {
match selector {
DimensionSelector::SingleValue { dimension, value } => {
let index = state.find_coordinate_index(&dimension, value)?;
selected_ranges.insert(dimension.clone(), (index, index));
let coords = state.get_coordinate_checked(&dimension)?;
coordinate_arrays.insert(dimension, vec![coords[index]]);
}
DimensionSelector::ValueRange {
dimension,
start,
end,
} => {
let start_idx = state.find_coordinate_index(&dimension, start)?;
let end_idx = state.find_coordinate_index(&dimension, end)?;
selected_ranges.insert(dimension.clone(), (start_idx, end_idx));
let coords = state.get_coordinate_checked(&dimension)?;
let selected_coords = coords[start_idx..=end_idx].to_vec();
coordinate_arrays.insert(dimension, selected_coords);
}
DimensionSelector::SingleIndex { dimension, index } => {
let coords = state.get_coordinate_checked(&dimension)?;
if index >= coords.len() {
return Err(RossbyError::IndexOutOfBounds {
param: dimension.clone(),
value: index.to_string(),
max: coords.len() - 1,
});
}
selected_ranges.insert(dimension.clone(), (index, index));
coordinate_arrays.insert(dimension, vec![coords[index]]);
}
DimensionSelector::IndexRange {
dimension,
start,
end,
} => {
let coords = state.get_coordinate_checked(&dimension)?;
if start >= coords.len() || end >= coords.len() {
return Err(RossbyError::IndexOutOfBounds {
param: dimension.clone(),
value: format!("{}..{}", start, end),
max: coords.len() - 1,
});
}
selected_ranges.insert(dimension.clone(), (start, end));
let selected_coords = coords[start..=end].to_vec();
coordinate_arrays.insert(dimension, selected_coords);
}
}
}
let dimension_order = determine_dimension_order(&state, &variables, layout.as_ref())?;
populate_query_coordinates(
&state,
&dimension_order,
&mut selected_ranges,
&mut coordinate_arrays,
)?;
let total_points: usize = coordinate_arrays
.values()
.map(|coords| coords.len())
.product();
if total_points > state.config.server.max_data_points {
return Err(RossbyError::PayloadTooLarge {
message: "The requested data would exceed the maximum allowed size".to_string(),
requested: total_points,
max_allowed: state.config.server.max_data_points,
});
}
let mut var_data_arrays = Vec::new();
let mut var_metadata = Vec::new();
for var_name in &variables {
let array = extract_variable_data(&state, var_name, &selected_ranges)?;
var_data_arrays.push(array);
let var_meta = state.get_variable_metadata_checked(var_name)?;
var_metadata.push((var_name.clone(), var_meta));
}
normalize_variable_arrays(&state, &variables, &mut var_data_arrays)?;
let shapes: Vec<Vec<usize>> = var_data_arrays
.iter()
.map(|arr| arr.shape().to_vec())
.collect();
let mut var_meta_json = serde_json::Map::new();
for (var_name, var_meta) in var_metadata.iter() {
let mut attrs = serde_json::Map::new();
for (key, value) in &var_meta.attributes {
match value {
crate::state::AttributeValue::Text(text) => {
attrs.insert(key.clone(), serde_json::Value::String(text.clone()));
}
crate::state::AttributeValue::Number(num) => {
if let Some(json_num) = serde_json::Number::from_f64(*num) {
attrs.insert(key.clone(), serde_json::Value::Number(json_num));
} else {
attrs.insert(key.clone(), serde_json::Value::Null);
}
}
crate::state::AttributeValue::NumberArray(nums) => {
let arr: Vec<serde_json::Value> = nums
.iter()
.map(|&n| {
if let Some(json_num) = serde_json::Number::from_f64(n) {
serde_json::Value::Number(json_num)
} else {
serde_json::Value::Null
}
})
.collect();
attrs.insert(key.clone(), serde_json::Value::Array(arr));
}
}
}
var_meta_json.insert(var_name.clone(), serde_json::Value::Object(attrs));
}
let metadata = serde_json::json!({
"query": {
"vars": variables.join(","),
"layout": layout,
"format": "json"
},
"shapes": shapes,
"dimensions": dimension_order,
"variables": var_meta_json
});
let mut json_prefix = String::from("{\n \"metadata\": ");
json_prefix.push_str(&serde_json::to_string_pretty(&metadata).unwrap_or_default());
json_prefix.push_str(",\n \"data\": {\n");
let mut streams = Vec::new();
for (idx, (var_name, data_array)) in variables.iter().zip(var_data_arrays.iter()).enumerate() {
let var_prefix = if idx == 0 {
format!(" \"{}\": [", var_name)
} else {
format!(",\n \"{}\": [", var_name)
};
let var_meta = state.get_variable_metadata_checked(var_name)?;
let fill_value = var_meta
.attributes
.get("_FillValue")
.and_then(|attr| match attr {
crate::state::AttributeValue::Number(n) => Some(*n as f32),
_ => None,
});
let scale_factor = var_meta
.attributes
.get("scale_factor")
.and_then(|attr| match attr {
crate::state::AttributeValue::Number(n) => Some(*n as f32),
_ => None,
})
.unwrap_or(1.0);
let add_offset = var_meta
.attributes
.get("add_offset")
.and_then(|attr| match attr {
crate::state::AttributeValue::Number(n) => Some(*n as f32),
_ => None,
})
.unwrap_or(0.0);
let flat_data: Vec<f32> = data_array.iter().copied().collect();
const CHUNK_SIZE: usize = 1000;
let total_elements = flat_data.len();
let chunk_ranges: Vec<(usize, usize)> = (0..total_elements)
.step_by(CHUNK_SIZE)
.map(|start| (start, std::cmp::min(start + CHUNK_SIZE, total_elements)))
.collect();
let chunk_streams =
chunk_ranges
.into_iter()
.enumerate()
.map(move |(chunk_idx, (start, end))| {
let data_slice = &flat_data[start..end];
let is_first = chunk_idx == 0;
let is_last = end == total_elements;
let mut chunk_str = String::with_capacity(data_slice.len() * 10);
for (i, &value) in data_slice.iter().enumerate() {
if i > 0 || !is_first {
chunk_str.push_str(", ");
}
if let Some(fill) = fill_value {
if value == fill {
chunk_str.push_str("null");
continue;
}
}
let processed_value = value * scale_factor + add_offset;
chunk_str.push_str(&processed_value.to_string());
}
if is_last {
chunk_str.push(']');
}
Ok(Bytes::from(chunk_str))
});
let var_stream = stream::once(async move { Ok(Bytes::from(var_prefix)) })
.chain(stream::iter(chunk_streams));
streams.push(var_stream);
}
let json_prefix_stream = stream::once(async { Ok(Bytes::from(json_prefix)) });
let json_suffix_stream = stream::once(async { Ok(Bytes::from("\n }\n}")) });
let combined_stream = json_prefix_stream
.chain(stream::iter(streams).flatten())
.chain(json_suffix_stream);
Ok(combined_stream)
}
/// Handles a `/data` request in the default Arrow format.
///
/// Validates the requested variables, the dimension constraints and the
/// optional `layout`. It then returns the Arrow IPC stream bytes produced by
/// `extract_and_format_data`.
///
/// # Errors
///
/// * `RossbyError::InvalidParameter` (`vars`) when no variable name is given.
/// * `RossbyError::InvalidVariables` listing every requested variable the
///   dataset does not contain.
/// * `RossbyError::InvalidParameter` (`layout`) for an unknown layout dimension.
/// * Any error from constraint parsing, extraction or Arrow encoding.
fn process_data_query(state: Arc<AppState>, params: DataQuery) -> Result<Vec<u8>> {
    let variables = params
        .vars
        .split(',')
        .map(|s| s.trim().to_string())
        .filter(|s| !s.is_empty())
        .collect::<Vec<_>>();
    if variables.is_empty() {
        return Err(RossbyError::InvalidParameter {
            param: "vars".to_string(),
            message: "At least one variable must be specified".to_string(),
        });
    }

    // Report every unknown variable at once rather than failing on the first.
    let mut invalid_vars = Vec::new();
    for var in &variables {
        if !state.has_variable(var) {
            invalid_vars.push(var.clone());
        }
    }
    if !invalid_vars.is_empty() {
        return Err(RossbyError::InvalidVariables {
            names: invalid_vars,
        });
    }

    let dimension_selectors = process_dimension_constraints(&state, &params.dynamic_params)?;

    let layout = params.layout.as_ref().map(|layout_str| {
        layout_str
            .split(',')
            .map(|s| s.trim().to_string())
            .filter(|s| !s.is_empty())
            .collect::<Vec<_>>()
    });
    if let Some(layout_dims) = &layout {
        debug!("Validating layout dimensions: {:?}", layout_dims);
        debug!(
            "Available dimensions: {:?}",
            state.metadata.dimensions.keys().collect::<Vec<_>>()
        );
        debug!(
            "Dimension aliases: {:?}",
            state.config.data.dimension_aliases
        );
        for dim in layout_dims {
            let dim_result = state.resolve_dimension(dim);
            if dim_result.is_err() {
                debug!("Failed to resolve dimension: {} - {:?}", dim, dim_result);
                return Err(RossbyError::InvalidParameter {
                    param: "layout".to_string(),
                    message: format!("Unknown dimension in layout: {}", dim),
                });
            }
        }
    }

    let parsed_query = ParsedDataQuery {
        variables,
        dimension_selectors,
        layout,
    };
    extract_and_format_data(state, parsed_query)
}
/// Translates the free-form query parameters of a `/data` request into
/// dimension selectors.
///
/// Recognised parameter forms, checked in this order:
///
/// * `<dim>=<value>`: a `SingleValue` selector, if `<dim>` resolves through
///   `AppState::resolve_dimension`.
/// * `time_index=<i>`: a `SingleIndex` on the first of `time` / `t` that
///   resolves.
/// * `<dim>_range=<start>,<end>`: a `ValueRange` selector.
/// * `__<dim>_index=<i>`: a `SingleIndex`; `<dim>` goes through
///   `AppState::get_canonical_dimension_name`.
/// * `__<dim>_index_range=<start>,<end>`: an `IndexRange` selector.
///
/// Parameters matching none of these forms are ignored. Parameters are
/// processed in sorted key order, so the resulting selector list (and any
/// error) does not depend on `HashMap` iteration order.
///
/// # Errors
///
/// Returns `RossbyError::InvalidParameter` in these cases:
///
/// * a value cannot be parsed;
/// * a range does not contain exactly two comma-separated values;
/// * `time_index` is given but no time dimension resolves;
/// * two parameters constrain the same dimension. Previously the winner
///   depended on hash order.
fn process_dimension_constraints(
    state: &AppState,
    dynamic_params: &HashMap<String, String>,
) -> Result<Vec<DimensionSelector>> {
    let mut entries: Vec<(&String, &String)> = dynamic_params.iter().collect();
    entries.sort();

    let mut selectors = Vec::new();
    // Constrained (file-specific) dimension -> parameter that constrained it.
    let mut constrained_by: HashMap<String, &str> = HashMap::new();
    for (key, value) in entries {
        let Some(selector) = parse_dimension_constraint(state, key, value)? else {
            continue;
        };
        let dimension = selector_dimension(&selector).to_string();
        if let Some(previous) = constrained_by.insert(dimension.clone(), key.as_str()) {
            return Err(RossbyError::InvalidParameter {
                param: key.clone(),
                message: format!(
                    "Dimension '{}' is constrained by both '{}' and '{}'",
                    dimension, previous, key
                ),
            });
        }
        selectors.push(selector);
    }
    Ok(selectors)
}

/// Parses one query parameter into a selector. Returns `Ok(None)` when the key
/// is not a dimension constraint.
fn parse_dimension_constraint(
    state: &AppState,
    key: &str,
    value: &str,
) -> Result<Option<DimensionSelector>> {
    // `<dim>=<value>`
    if let Ok(file_specific) = state.resolve_dimension(key) {
        let parsed_value = value
            .parse::<f64>()
            .map_err(|_| RossbyError::InvalidParameter {
                param: key.to_string(),
                message: format!("Could not parse '{}' as a number", value),
            })?;
        return Ok(Some(DimensionSelector::SingleValue {
            dimension: file_specific.to_string(),
            value: parsed_value,
        }));
    }

    // `time_index=<i>`
    if key == "time_index" {
        let index = parse_index_value(key, value)?;
        // Fail explicitly instead of falling back to an arbitrary dimension:
        // the old fallback took the first entry of a HashMap, so the chosen
        // dimension could differ between runs.
        let time_dim = ["time", "t"]
            .iter()
            .copied()
            .find_map(|name| state.resolve_dimension(name).ok())
            .ok_or_else(|| RossbyError::InvalidParameter {
                param: key.to_string(),
                message: "No time dimension is available for time_index".to_string(),
            })?;
        return Ok(Some(DimensionSelector::SingleIndex {
            dimension: time_dim.to_string(),
            index,
        }));
    }

    // `<dim>_range=<start>,<end>`
    if let Some(dim_name) = key.strip_suffix("_range") {
        if let Ok(file_specific) = state.resolve_dimension(dim_name) {
            let (start_text, end_text) = split_range_value(key, value)?;
            let start = start_text
                .trim()
                .parse::<f64>()
                .map_err(|_| RossbyError::InvalidParameter {
                    param: key.to_string(),
                    message: format!("Could not parse start value '{}' as a number", start_text),
                })?;
            let end = end_text
                .trim()
                .parse::<f64>()
                .map_err(|_| RossbyError::InvalidParameter {
                    param: key.to_string(),
                    message: format!("Could not parse end value '{}' as a number", end_text),
                })?;
            return Ok(Some(DimensionSelector::ValueRange {
                dimension: file_specific.to_string(),
                start,
                end,
            }));
        }
    }

    // `__<dim>_index=<i>`
    if let Some(dim_name) = key
        .strip_prefix("__")
        .and_then(|s| s.strip_suffix("_index"))
    {
        if let Some(canonical) = state.get_canonical_dimension_name(dim_name) {
            if let Ok(file_specific) = state.resolve_dimension(canonical) {
                let index = parse_index_value(key, value)?;
                return Ok(Some(DimensionSelector::SingleIndex {
                    dimension: file_specific.to_string(),
                    index,
                }));
            }
        }
    }

    // `__<dim>_index_range=<start>,<end>`
    if let Some(dim_name) = key
        .strip_prefix("__")
        .and_then(|s| s.strip_suffix("_index_range"))
    {
        if let Some(canonical) = state.get_canonical_dimension_name(dim_name) {
            if let Ok(file_specific) = state.resolve_dimension(canonical) {
                let (start_text, end_text) = split_range_value(key, value)?;
                let start = start_text.trim().parse::<usize>().map_err(|_| {
                    RossbyError::InvalidParameter {
                        param: key.to_string(),
                        message: format!(
                            "Could not parse start index '{}' as an integer",
                            start_text
                        ),
                    }
                })?;
                let end = end_text.trim().parse::<usize>().map_err(|_| {
                    RossbyError::InvalidParameter {
                        param: key.to_string(),
                        message: format!("Could not parse end index '{}' as an integer", end_text),
                    }
                })?;
                return Ok(Some(DimensionSelector::IndexRange {
                    dimension: file_specific.to_string(),
                    start,
                    end,
                }));
            }
        }
    }

    Ok(None)
}

/// Returns the dimension name a selector constrains.
fn selector_dimension(selector: &DimensionSelector) -> &str {
    match selector {
        DimensionSelector::SingleValue { dimension, .. }
        | DimensionSelector::ValueRange { dimension, .. }
        | DimensionSelector::SingleIndex { dimension, .. }
        | DimensionSelector::IndexRange { dimension, .. } => dimension,
    }
}

/// Splits a `start,end` range value into its two untrimmed halves. Fails unless
/// the value contains exactly one comma.
fn split_range_value<'v>(key: &str, value: &'v str) -> Result<(&'v str, &'v str)> {
    match value.split_once(',') {
        Some((start, end)) if !end.contains(',') => Ok((start, end)),
        _ => Err(RossbyError::InvalidParameter {
            param: key.to_string(),
            message: format!(
                "Range parameter must contain exactly two comma-separated values, got: '{}'",
                value
            ),
        }),
    }
}

/// Parses a non-negative integer index supplied for parameter `key`.
fn parse_index_value(key: &str, value: &str) -> Result<usize> {
    value
        .parse::<usize>()
        .map_err(|_| RossbyError::InvalidParameter {
            param: key.to_string(),
            message: format!("Could not parse '{}' as an integer index", value),
        })
}
/// Chooses the dimension order used to describe the query result.
///
/// With an explicit `layout`, each entry is mapped through
/// `AppState::resolve_dimension`. Entries that fail to resolve are kept
/// verbatim. Without a layout, the dimensions of the first requested variable
/// that has any are used. If every variable is a scalar, the order is empty.
///
/// # Errors
///
/// Propagates `get_variable_metadata_checked` failures for variables visited
/// before a dimensioned one is found.
fn determine_dimension_order(
    state: &AppState,
    variables: &[String],
    layout: Option<&Vec<String>>,
) -> Result<Vec<String>> {
    match layout {
        Some(layout_dims) => Ok(layout_dims
            .iter()
            .map(|requested| state.resolve_dimension(requested).unwrap_or(requested).to_string())
            .collect()),
        None => {
            for name in variables {
                let meta = state.get_variable_metadata_checked(name)?;
                if meta.dimensions.is_empty() {
                    continue;
                }
                return Ok(meta.dimensions.clone());
            }
            Ok(Vec::new())
        }
    }
}
/// Fills in ranges and coordinate values for every dimension in
/// `dimension_order` that the explicit selectors did not already cover.
///
/// A dimension with no selected range gets its full extent `0..=size-1` from
/// the dataset metadata. A dimension without coordinate values gets either the
/// slice of its coordinate variable or, when the dataset has none, the plain
/// indices as `f64`.
///
/// # Errors
///
/// * `RossbyError::DataNotFound` if a dimension is missing from the metadata
///   or has size zero.
/// * `RossbyError::IndexOutOfBounds` if the range exceeds the coordinate array.
fn populate_query_coordinates(
    state: &AppState,
    dimension_order: &[String],
    selected_ranges: &mut HashMap<String, (usize, usize)>,
    coordinate_arrays: &mut HashMap<String, Vec<f64>>,
) -> Result<()> {
    for dim_name in dimension_order {
        let existing = selected_ranges.get(dim_name).copied();
        let (start, end) = match existing {
            Some(range) => range,
            None => {
                let dimension = state.metadata.dimensions.get(dim_name).ok_or_else(|| {
                    RossbyError::DataNotFound {
                        message: format!("Dimension {} not found in metadata", dim_name),
                    }
                })?;
                let last_index =
                    dimension
                        .size
                        .checked_sub(1)
                        .ok_or_else(|| RossbyError::DataNotFound {
                            message: format!("Dimension {} is empty", dim_name),
                        })?;
                selected_ranges.insert(dim_name.clone(), (0, last_index));
                (0, last_index)
            }
        };

        if coordinate_arrays.contains_key(dim_name) {
            continue;
        }

        let values: Vec<f64> = match state.get_coordinate(dim_name) {
            Some(coords) if end >= coords.len() => {
                return Err(RossbyError::IndexOutOfBounds {
                    param: dim_name.clone(),
                    value: format!("{}..{}", start, end),
                    max: coords.len().saturating_sub(1),
                });
            }
            Some(coords) => coords[start..=end].to_vec(),
            None => (start..=end).map(|i| i as f64).collect(),
        };
        coordinate_arrays.insert(dim_name.clone(), values);
    }
    Ok(())
}
/// Makes all extracted variable arrays share one shape.
///
/// The reference shape is taken from the first variable whose metadata lists
/// at least one dimension. Scalar variables (no dimensions) are broadcast to
/// that shape by repeating their single value. If there is no dimensioned
/// variable, the arrays are left untouched.
///
/// # Errors
///
/// * `RossbyError::DataNotFound` if a scalar variable holds no value.
/// * `RossbyError::InvalidParameter` (`vars`) if a non-scalar variable's shape
///   differs from the reference shape.
/// * Metadata lookup failures from `get_variable_metadata_checked`.
fn normalize_variable_arrays(
    state: &AppState,
    variables: &[String],
    data_arrays: &mut [Array<f32, IxDyn>],
) -> Result<()> {
    let mut reference: Option<Vec<usize>> = None;
    for (name, data) in variables.iter().zip(data_arrays.iter()) {
        let is_dimensioned = matches!(
            state.get_variable_metadata_checked(name),
            Ok(meta) if !meta.dimensions.is_empty()
        );
        if is_dimensioned {
            reference = Some(data.shape().to_vec());
            break;
        }
    }
    let Some(reference_shape) = reference else {
        return Ok(());
    };

    for (name, data) in variables.iter().zip(data_arrays.iter_mut()) {
        if state.get_variable_metadata_checked(name)?.dimensions.is_empty() {
            let Some(&scalar_value) = data.iter().next() else {
                return Err(RossbyError::DataNotFound {
                    message: format!("Scalar variable {} has no data", name),
                });
            };
            *data = Array::from_elem(IxDyn(&reference_shape), scalar_value);
        }
        if data.shape() != reference_shape.as_slice() {
            return Err(RossbyError::InvalidParameter {
                param: "vars".to_string(),
                message: format!(
                    "Requested variables resolve to incompatible shapes: {} has {:?}, expected {:?}",
                    name,
                    data.shape(),
                    reference_shape
                ),
            });
        }
    }
    Ok(())
}
fn extract_and_format_data(state: Arc<AppState>, query: ParsedDataQuery) -> Result<Vec<u8>> {
let ParsedDataQuery {
variables,
dimension_selectors,
layout,
} = query;
let mut selected_ranges: HashMap<String, (usize, usize)> = HashMap::new();
let mut coordinate_arrays: HashMap<String, Vec<f64>> = HashMap::new();
for selector in dimension_selectors {
match selector {
DimensionSelector::SingleValue { dimension, value } => {
let index = state.find_coordinate_index(&dimension, value)?;
selected_ranges.insert(dimension.clone(), (index, index));
let coords = state.get_coordinate_checked(&dimension)?;
coordinate_arrays.insert(dimension, vec![coords[index]]);
}
DimensionSelector::ValueRange {
dimension,
start,
end,
} => {
let start_idx = state.find_coordinate_index(&dimension, start)?;
let end_idx = state.find_coordinate_index(&dimension, end)?;
selected_ranges.insert(dimension.clone(), (start_idx, end_idx));
let coords = state.get_coordinate_checked(&dimension)?;
let selected_coords = coords[start_idx..=end_idx].to_vec();
coordinate_arrays.insert(dimension, selected_coords);
}
DimensionSelector::SingleIndex { dimension, index } => {
let coords = state.get_coordinate_checked(&dimension)?;
if index >= coords.len() {
return Err(RossbyError::IndexOutOfBounds {
param: dimension.clone(),
value: index.to_string(),
max: coords.len() - 1,
});
}
selected_ranges.insert(dimension.clone(), (index, index));
coordinate_arrays.insert(dimension, vec![coords[index]]);
}
DimensionSelector::IndexRange {
dimension,
start,
end,
} => {
let coords = state.get_coordinate_checked(&dimension)?;
if start >= coords.len() || end >= coords.len() {
return Err(RossbyError::IndexOutOfBounds {
param: dimension.clone(),
value: format!("{}..{}", start, end),
max: coords.len() - 1,
});
}
selected_ranges.insert(dimension.clone(), (start, end));
let selected_coords = coords[start..=end].to_vec();
coordinate_arrays.insert(dimension, selected_coords);
}
}
}
let dimension_order = determine_dimension_order(&state, &variables, layout.as_ref())?;
populate_query_coordinates(
&state,
&dimension_order,
&mut selected_ranges,
&mut coordinate_arrays,
)?;
let total_points: usize = coordinate_arrays
.values()
.map(|coords| coords.len())
.product();
if total_points > state.config.server.max_data_points {
return Err(RossbyError::PayloadTooLarge {
message: "The requested data would exceed the maximum allowed size".to_string(),
requested: total_points,
max_allowed: state.config.server.max_data_points,
});
}
let mut var_data_arrays = Vec::new();
for var_name in &variables {
let array = extract_variable_data(&state, var_name, &selected_ranges)?;
var_data_arrays.push(array);
}
normalize_variable_arrays(&state, &variables, &mut var_data_arrays)?;
let mut ordered_dimension_names = Vec::new();
let mut ordered_coordinate_arrays = Vec::new();
for dim_name in &dimension_order {
if let Some(coords) = coordinate_arrays.get(dim_name) {
ordered_dimension_names.push(dim_name.clone());
ordered_coordinate_arrays.push(coords);
}
}
let var_data_array_refs: Vec<&Array<f32, IxDyn>> = var_data_arrays.iter().collect();
create_arrow_table(
&variables,
&var_data_array_refs,
&ordered_dimension_names,
&ordered_coordinate_arrays,
layout.as_ref(),
)
}
/// Returns the part of variable `var_name` selected by `selected_ranges`.
///
/// For every dimension of the variable with an entry in `selected_ranges`:
///
/// * a range `(start, end)` with `start == end` picks that single position and
///   removes the axis from the result;
/// * any other range keeps the inclusive slice `start..=end`.
///
/// Dimensions without an entry are kept in full.
///
/// The selection is made on a view, so only the selected elements are copied.
/// The previous version cloned the whole variable first.
///
/// # Errors
///
/// Propagates lookup failures from `get_variable_checked` and
/// `get_variable_metadata_checked`.
///
/// # Panics
///
/// Like the ndarray indexing it relies on, this panics if a range lies outside
/// the variable's axis. Callers validate ranges against coordinate lengths
/// beforehand.
fn extract_variable_data(
    state: &AppState,
    var_name: &str,
    selected_ranges: &HashMap<String, (usize, usize)>,
) -> Result<Array<f32, IxDyn>> {
    let var_data = state.get_variable_checked(var_name)?;
    let var_meta = state.get_variable_metadata_checked(var_name)?;

    let mut view = var_data.view();
    // Visit axes from last to first so that removing an axis never shifts the
    // position of the axes still to be visited.
    for (axis_idx, dim_name) in var_meta.dimensions.iter().enumerate().rev() {
        let Some(&(start, end)) = selected_ranges.get(dim_name) else {
            continue;
        };
        let axis = ndarray::Axis(axis_idx);
        if start == end {
            view = view.index_axis_move(axis, start);
        } else {
            view.slice_axis_inplace(axis, ndarray::Slice::from(start..=end));
        }
    }
    Ok(view.to_owned())
}
fn create_arrow_table(
variables: &[String],
data_arrays: &[&Array<f32, IxDyn>],
dimension_names: &[String],
coordinate_arrays: &[&Vec<f64>],
layout: Option<&Vec<String>>,
) -> Result<Vec<u8>> {
use std::sync::Arc;
debug!(
"Creating Arrow table with {} variables, {} dimensions",
variables.len(),
dimension_names.len()
);
let total_elements: usize = if let Some(first_data) = data_arrays.first() {
first_data.len()
} else {
return Err(RossbyError::Conversion {
message: "No data arrays provided for Arrow table creation".to_string(),
});
};
debug!("Total elements needed for each column: {}", total_elements);
for (i, arr) in data_arrays.iter().enumerate() {
debug!(
"Data array {} has {} elements and shape {:?}",
i,
arr.len(),
arr.shape()
);
}
for (i, coords) in coordinate_arrays.iter().enumerate() {
debug!(
"Coordinate array {} ({}) has {} elements",
i,
dimension_names.get(i).unwrap_or(&"unknown".to_string()),
coords.len()
);
}
let mut fields = Vec::new();
for dim_name in dimension_names.iter() {
fields.push(Field::new(dim_name, DataType::Float64, false));
}
for (var_name, data_array) in variables.iter().zip(data_arrays.iter()) {
let mut metadata = HashMap::new();
let shape = data_array.shape();
metadata.insert(
"shape".to_string(),
serde_json::to_string(&shape).map_err(|e| RossbyError::Conversion {
message: format!("Failed to serialize shape metadata: {}", e),
})?,
);
let dimension_names_vec = dimension_names.to_vec();
let dimension_order = layout.unwrap_or(&dimension_names_vec);
metadata.insert(
"dimensions".to_string(),
serde_json::to_string(dimension_order).map_err(|e| RossbyError::Conversion {
message: format!("Failed to serialize dimensions metadata: {}", e),
})?,
);
let field = Field::new(var_name, DataType::Float32, false).with_metadata(metadata);
fields.push(field);
}
let schema = Arc::new(Schema::new(fields));
let mut columns = Vec::new();
for (dim_idx, &coords) in coordinate_arrays.iter().enumerate() {
let unknown_str = "unknown".to_string();
let dim_name = dimension_names.get(dim_idx).unwrap_or(&unknown_str);
let array = if coords.len() == total_elements {
debug!(
"Using coordinate array for {} as-is ({} elements)",
dim_name,
coords.len()
);
Float64Array::from((*coords).clone())
} else if coords.len() == 1 {
debug!(
"Repeating single coordinate value for {} to {} elements",
dim_name, total_elements
);
let repeated_val = coords[0];
Float64Array::from(vec![repeated_val; total_elements])
} else {
debug!(
"Creating compatible coordinate array for {} ({} elements needed, had {})",
dim_name,
total_elements,
coords.len()
);
let mut compatible_coords = Vec::with_capacity(total_elements);
for i in 0..total_elements {
compatible_coords.push(coords[i % coords.len()]);
}
Float64Array::from(compatible_coords)
};
columns.push(Arc::new(array) as ArrayRef);
}
for (var_idx, &data_array) in data_arrays.iter().enumerate() {
let unknown_str = "unknown".to_string();
let var_name = variables.get(var_idx).unwrap_or(&unknown_str);
let flat_data: Vec<f32> = data_array.iter().copied().collect();
debug!(
"Adding variable {} with {} elements",
var_name,
flat_data.len()
);
let array = Float32Array::from(flat_data);
columns.push(Arc::new(array) as ArrayRef);
}
let batch =
RecordBatch::try_new(schema.clone(), columns).map_err(|e| RossbyError::Conversion {
message: format!("Failed to create Arrow record batch: {}", e),
})?;
let mut output = Vec::new();
let mut writer =
StreamWriter::try_new(&mut output, &schema).map_err(|e| RossbyError::Conversion {
message: format!("Failed to create Arrow IPC writer: {}", e),
})?;
writer.write(&batch).map_err(|e| RossbyError::Conversion {
message: format!("Failed to write Arrow record batch: {}", e),
})?;
writer.finish().map_err(|e| RossbyError::Conversion {
message: format!("Failed to finalize Arrow IPC stream: {}", e),
})?;
Ok(output)
}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::config::Config;
    use crate::state::{AppState, Dimension, Metadata, Variable};
    use std::collections::HashMap;

    /// Builds a small dataset: `t2m(time=5, lat=3, lon=4)` with values
    /// `t*100 + lat*10 + lon`, the aliases `latitude`/`longitude`, and a
    /// 1000-point response limit.
    fn create_test_state() -> Arc<AppState> {
        let mut dimensions = HashMap::new();
        dimensions.insert(
            "time".to_string(),
            Dimension {
                name: "time".to_string(),
                size: 5,
                is_unlimited: false,
            },
        );
        dimensions.insert(
            "lat".to_string(),
            Dimension {
                name: "lat".to_string(),
                size: 3,
                is_unlimited: false,
            },
        );
        dimensions.insert(
            "lon".to_string(),
            Dimension {
                name: "lon".to_string(),
                size: 4,
                is_unlimited: false,
            },
        );

        let mut variables = HashMap::new();
        variables.insert(
            "t2m".to_string(),
            Variable {
                name: "t2m".to_string(),
                dimensions: vec!["time".to_string(), "lat".to_string(), "lon".to_string()],
                shape: vec![5, 3, 4],
                attributes: HashMap::new(),
                dtype: "f32".to_string(),
            },
        );

        let mut coordinates = HashMap::new();
        coordinates.insert(
            "time".to_string(),
            vec![
                1672531200.0,
                1672534800.0,
                1672538400.0,
                1672542000.0,
                1672545600.0,
            ],
        );
        coordinates.insert("lat".to_string(), vec![35.0, 36.0, 37.0]);
        coordinates.insert("lon".to_string(), vec![139.0, 140.0, 141.0, 142.0]);

        let metadata = Metadata {
            global_attributes: HashMap::new(),
            dimensions,
            variables,
            coordinates,
        };

        let mut data = HashMap::new();
        let t2m_data =
            Array::from_shape_fn((5, 3, 4), |(t, la, lo)| (t * 100 + la * 10 + lo) as f32)
                .into_dyn();
        data.insert("t2m".to_string(), t2m_data);

        let mut dimension_aliases = HashMap::new();
        dimension_aliases.insert("latitude".to_string(), "lat".to_string());
        dimension_aliases.insert("longitude".to_string(), "lon".to_string());

        let mut config = Config::default();
        config.data.dimension_aliases = dimension_aliases;
        config.server.max_data_points = 1000;
        Arc::new(AppState::new(config, metadata, data))
    }

    /// Same as `create_test_state`, plus a dimensionless `offset` variable
    /// (273.15) and a configurable response limit.
    fn create_test_state_with_scalar(max_data_points: usize) -> Arc<AppState> {
        let mut state = create_test_state().as_ref().clone();
        state.metadata.variables.insert(
            "offset".to_string(),
            Variable {
                name: "offset".to_string(),
                dimensions: Vec::new(),
                shape: Vec::new(),
                attributes: HashMap::new(),
                dtype: "f32".to_string(),
            },
        );
        state
            .data
            .insert("offset".to_string(), Array::from_elem(IxDyn(&[]), 273.15));
        state.config.server.max_data_points = max_data_points;
        Arc::new(state)
    }

    #[test]
    fn test_dimension_selector_parsing() {
        let state = create_test_state();
        let mut params = HashMap::new();
        params.insert("time".to_string(), "1672531200".to_string());
        params.insert("lat_range".to_string(), "35.0,37.0".to_string());
        params.insert("__lon_index".to_string(), "2".to_string());

        let selectors = process_dimension_constraints(&state, &params).unwrap();
        assert_eq!(selectors.len(), 3);
        for selector in selectors {
            match selector {
                DimensionSelector::SingleValue { dimension, value } => {
                    assert_eq!(dimension, "time");
                    assert_eq!(value, 1672531200.0);
                }
                DimensionSelector::ValueRange {
                    dimension,
                    start,
                    end,
                } => {
                    assert_eq!(dimension, "lat");
                    assert_eq!(start, 35.0);
                    assert_eq!(end, 37.0);
                }
                DimensionSelector::SingleIndex { dimension, index } => {
                    assert_eq!(dimension, "lon");
                    assert_eq!(index, 2);
                }
                _ => panic!("Unexpected selector type"),
            }
        }
    }

    #[test]
    fn test_extract_variable_data() {
        let state = create_test_state();
        let mut selected_ranges = HashMap::new();
        selected_ranges.insert("time".to_string(), (0, 0));

        let result = extract_variable_data(&state, "t2m", &selected_ranges).unwrap();
        assert_eq!(result.shape(), &[3, 4]);
        assert_eq!(result[[1, 2]], 12.0);
    }

    #[test]
    fn test_create_arrow_table() {
        let data = Array::from_vec(vec![1.0, 2.0, 3.0, 4.0, 5.0]);
        let data_dyn = data.into_dyn();
        let dim_names = vec!["x".to_string()];
        let x_coords = vec![10.0, 20.0, 30.0, 40.0, 50.0];
        let coord_arrays = vec![&x_coords];
        let variables = vec!["temp".to_string()];
        let data_arrays = vec![&data_dyn];

        let arrow_data =
            create_arrow_table(&variables, &data_arrays, &dim_names, &coord_arrays, None).unwrap();
        assert!(!arrow_data.is_empty());
        assert!(arrow_data.len() > 100);
    }

    #[test]
    fn test_extract_and_format_data_with_scalar_first() {
        let state = create_test_state_with_scalar(1000);
        let query = ParsedDataQuery {
            variables: vec!["offset".to_string(), "t2m".to_string()],
            dimension_selectors: vec![DimensionSelector::SingleIndex {
                dimension: "time".to_string(),
                index: 0,
            }],
            layout: None,
        };

        let arrow_data = extract_and_format_data(state, query).unwrap();
        assert!(!arrow_data.is_empty());
    }

    #[test]
    fn test_extract_and_format_scalar_only_ignores_unrelated_dimensions() {
        let state = create_test_state_with_scalar(1);
        let query = ParsedDataQuery {
            variables: vec!["offset".to_string()],
            dimension_selectors: Vec::new(),
            layout: None,
        };

        let arrow_data = extract_and_format_data(state, query).unwrap();
        assert!(!arrow_data.is_empty());
    }
}