use arrow::array::{Int64Array, StringArray};
use arrow::datatypes::{DataType, Field, Schema};
use arrow::record_batch::RecordBatch;
use arrow_zerobus_sdk_wrapper::{
TransmissionResult, WrapperConfiguration, ZerobusError, ZerobusWrapper,
};
use std::sync::Arc;
/// Contract check for the per-row fields on the result of `send_batch`.
///
/// Builds a two-row (`id`, `name`) batch and sends it. Only when the send
/// returns `Ok` does the test touch each per-row field and check that
/// `total_rows` equals `successful_count + failed_count`. If the wrapper cannot
/// be constructed the test returns early, and a send error is accepted without
/// any assertion.
///
/// Marked `#[ignore]`, so it only runs when ignored tests are requested.
#[tokio::test]
#[ignore]
async fn test_send_batch_contract_per_row_fields() {
    let wrapper = {
        let config = WrapperConfiguration::new(
            "https://test-workspace.cloud.databricks.com".to_string(),
            "test_table".to_string(),
        );
        match ZerobusWrapper::new(config).await {
            Ok(ready) => ready,
            // Without a wrapper there is nothing to exercise.
            Err(_) => return,
        }
    };

    // Two non-nullable columns with two rows each.
    let fields = vec![
        Field::new("id", DataType::Int64, false),
        Field::new("name", DataType::Utf8, false),
    ];
    let ids = Int64Array::from(vec![1, 2]);
    let names = StringArray::from(vec!["Alice", "Bob"]);
    let batch = RecordBatch::try_new(
        Arc::new(Schema::new(fields)),
        vec![Arc::new(ids), Arc::new(names)],
    )
    .unwrap();

    // An `Err` from the send is tolerated; only the `Ok` shape is checked.
    if let Ok(outcome) = wrapper.send_batch(batch).await {
        // Field accesses act as a compile-time check that the fields exist.
        let _ = outcome.failed_rows;
        let _ = outcome.successful_rows;
        let _ = outcome.total_rows;
        let _ = outcome.successful_count;
        let _ = outcome.failed_count;

        // Every row must be counted as either successful or failed.
        assert_eq!(
            outcome.total_rows,
            outcome.successful_count + outcome.failed_count
        );
    }
}
/// Contract check for `send_batch_with_descriptor` when no descriptor is given.
///
/// Sends a two-row (`id`, `name`) batch with `None` as the descriptor. On `Ok`,
/// it checks that `total_rows` is non-negative and equals
/// `successful_count + failed_count`. If the wrapper cannot be constructed the
/// test returns early, and a send error is accepted without any assertion.
///
/// Marked `#[ignore]`, so it only runs when ignored tests are requested.
#[tokio::test]
#[ignore]
async fn test_send_batch_with_descriptor_contract() {
    let endpoint = "https://test-workspace.cloud.databricks.com".to_string();
    let table = "test_table".to_string();
    let created = ZerobusWrapper::new(WrapperConfiguration::new(endpoint, table)).await;
    let wrapper = match created {
        Ok(instance) => instance,
        // Without a wrapper there is nothing to exercise.
        Err(_) => return,
    };

    // Two non-nullable columns with two rows each.
    let schema = Arc::new(Schema::new(vec![
        Field::new("id", DataType::Int64, false),
        Field::new("name", DataType::Utf8, false),
    ]));
    let batch = RecordBatch::try_new(
        schema,
        vec![
            Arc::new(Int64Array::from(vec![1, 2])),
            Arc::new(StringArray::from(vec!["Alice", "Bob"])),
        ],
    )
    .unwrap();

    let sent = wrapper.send_batch_with_descriptor(batch, None).await;
    // An `Err` from the send is tolerated; only the `Ok` shape is checked.
    if let Ok(summary) = sent {
        // NOTE(review): if `total_rows` is an unsigned integer this check can
        // never fail — confirm the field type and tighten or drop it.
        assert!(summary.total_rows >= 0);

        // Every row must be counted as either successful or failed.
        let accounted_for = summary.successful_count + summary.failed_count;
        assert_eq!(summary.total_rows, accounted_for);
    }
}
/// Contract check for the `success` / `error` fields on the result of
/// `send_batch`.
///
/// Sends a single-row (`id`) batch. On `Ok`, it checks that a result flagged
/// `success` carries no `error`, and that a result carrying an `error` is either
/// not flagged `success` or reports at least one failed row. If the wrapper
/// cannot be constructed the test returns early, and a send error is accepted
/// without any assertion.
///
/// Marked `#[ignore]`, so it only runs when ignored tests are requested.
#[tokio::test]
#[ignore]
async fn test_backward_compatibility_contract() {
    let config = WrapperConfiguration::new(
        "https://test-workspace.cloud.databricks.com".to_string(),
        "test_table".to_string(),
    );
    let wrapper = match ZerobusWrapper::new(config).await {
        Ok(w) => w,
        // Without a wrapper there is nothing to exercise.
        Err(_) => return,
    };

    let schema = Schema::new(vec![Field::new("id", DataType::Int64, false)]);
    let id_array = Int64Array::from(vec![1]);
    let batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(id_array)]).unwrap();

    // An `Err` from the send is tolerated; only the `Ok` shape is checked.
    if let Ok(transmission_result) = wrapper.send_batch(batch).await {
        // A successful transmission must not carry an error.
        if transmission_result.success {
            assert!(transmission_result.error.is_none());
        }

        // Only the presence of an error matters here, so no binding is needed.
        // NOTE(review): given the assertion above, this check cannot fail — when
        // `success` is true `error` is already known to be `None`, and when it is
        // false `!success` holds. Confirm the intended contract.
        if transmission_result.error.is_some() {
            assert!(!transmission_result.success || transmission_result.failed_count > 0);
        }
    }
}