use futures::StreamExt as _;
use re_protos::cloud::v1alpha1::QueryDatasetResponse;
use re_protos::cloud::v1alpha1::ext::QueryDatasetRequest;
use re_protos::cloud::v1alpha1::rerun_cloud_service_server::RerunCloudService;
use re_protos::headers::RerunHeadersInjectorExt as _;
use crate::tests::common::{
DataSourcesDefinition, LayerDefinition, RerunCloudServiceExt as _, concat_record_batches,
};
use crate::{FieldsTestExt as _, RecordBatchTestExt as _};
/// Creates an empty dataset entry and snapshots the result of a default query against it.
pub async fn query_empty_dataset(service: impl RerunCloudService) {
    let dataset_name = "dataset";
    service.create_dataset_entry_with_name(dataset_name).await;

    // No data was registered, so the default query exercises the empty-dataset path.
    let request = QueryDatasetRequest::default();
    query_dataset_snapshot(&service, request, dataset_name, "empty_dataset").await;
}
/// Registers a small three-segment dataset and snapshots the result of several
/// query variations (default, per-segment, per-entity, static-only, temporal-only).
pub async fn query_simple_dataset(service: impl RerunCloudService) {
    let definition = DataSourcesDefinition::new_with_tuid_prefix(
        1,
        [
            LayerDefinition::simple("my_segment_id1", &["my/entity", "my/other/entity"]),
            LayerDefinition::simple("my_segment_id2", &["my/entity"]),
            LayerDefinition::simple(
                "my_segment_id3",
                &["my/entity", "another/one", "yet/another/one"],
            ),
        ],
    );

    let dataset_name = "dataset";
    service.create_dataset_entry_with_name(dataset_name).await;
    service
        .register_with_dataset_name(dataset_name, definition.to_data_sources())
        .await;

    // Restrict the query to a single segment.
    let single_segment = QueryDatasetRequest {
        segment_ids: vec!["my_segment_id3".into()],
        ..Default::default()
    };

    // Restrict the query to a single entity path.
    let single_entity = QueryDatasetRequest {
        entity_paths: vec!["/my/entity".into()],
        select_all_entity_paths: false,
        ..Default::default()
    };

    // Drop static data from the results.
    let exclude_static = QueryDatasetRequest {
        exclude_static_data: true,
        ..Default::default()
    };

    // Drop temporal data from the results.
    let exclude_temporal = QueryDatasetRequest {
        exclude_temporal_data: true,
        ..Default::default()
    };

    let requests = [
        (QueryDatasetRequest::default(), "default"),
        (single_segment, "single_segment"),
        (single_entity, "single_entity"),
        (exclude_static, "exclude_static"),
        (exclude_temporal, "exclude_temporal"),
    ];

    // Each variant gets its own snapshot suffix.
    for (request, snapshot_name) in requests {
        let full_name = format!("simple_dataset_{snapshot_name}");
        query_dataset_snapshot(&service, request, dataset_name, &full_name).await;
    }
}
/// Registers a dataset whose partitions carry multiple (named) layers and
/// snapshots the result of a default query against it.
pub async fn query_simple_dataset_with_layers(service: impl RerunCloudService) {
    // Three partitions: two of them have an "extra" layer, one has an explicit
    // "base" layer, and one is a single-layer partition.
    let definition = DataSourcesDefinition::new_with_tuid_prefix(
        1,
        [
            LayerDefinition::simple("partition1", &["my/entity"]),
            LayerDefinition::simple("partition1", &["extra/entity"]).layer_name("extra"),
            LayerDefinition::simple("partition2", &["another/one"]).layer_name("base"),
            LayerDefinition::simple("partition2", &["extra/entity"]).layer_name("extra"),
            LayerDefinition::simple("partition3", &["i/am/alone"]),
        ],
    );

    let dataset_name = "dataset_with_layers";
    service.create_dataset_entry_with_name(dataset_name).await;
    service
        .register_with_dataset_name(dataset_name, definition.to_data_sources())
        .await;

    let request = QueryDatasetRequest::default();
    query_dataset_snapshot(&service, request, dataset_name, "simple_with_layer").await;
}
/// Exercises invalid `QueryDatasetRequest`s and checks that the service rejects
/// each one with the expected gRPC status code.
pub async fn query_dataset_should_fail(service: impl RerunCloudService) {
    let dataset_name = "dataset";
    service.create_dataset_entry_with_name(dataset_name).await;

    let test_cases = [(
        "cannot specify entity paths if `select_all_entity_paths` is true",
        QueryDatasetRequest {
            entity_paths: vec!["/entity/path".into()],
            select_all_entity_paths: true,
            ..Default::default()
        },
        tonic::Code::InvalidArgument,
    )];

    for (descr, request, expected_code) in test_cases {
        let response = service
            .query_dataset(tonic::Request::new(request.into()))
            .await;

        // A successful response is itself a test failure here.
        let Err(err) = response else {
            panic!("expected failure with code {expected_code}, but got success ({descr})");
        };

        assert_eq!(
            err.code(),
            expected_code,
            "expected failure with code {expected_code}, but got {err} ({descr})"
        );
    }
}
/// Runs `query_dataset_request` against the dataset named `dataset_name`,
/// verifies that the response contains all guaranteed fields, then records two
/// insta snapshots: `{snapshot_name}_schema` (schema of the guaranteed columns)
/// and `{snapshot_name}_data` (the data, minus non-deterministic columns).
///
/// Panics (failing the test) if the query itself fails, a response batch cannot
/// be converted, or a guaranteed field is missing.
async fn query_dataset_snapshot(
    service: &impl RerunCloudService,
    query_dataset_request: QueryDatasetRequest,
    dataset_name: &str,
    snapshot_name: &str,
) {
    // Collect every record batch from the streaming response.
    let chunk_info = service
        .query_dataset(
            tonic::Request::new(query_dataset_request.into())
                .with_entry_name(dataset_name)
                .unwrap(),
        )
        .await
        .unwrap()
        .into_inner()
        .flat_map(|resp| futures::stream::iter(resp.unwrap().data))
        .map(|dfp| dfp.try_into().unwrap())
        .collect::<Vec<_>>()
        .await;

    let merged_chunk_info = concat_record_batches(&chunk_info);

    // The response schema must contain all guaranteed fields (order-independent).
    let required_field = QueryDatasetResponse::fields();
    assert!(
        merged_chunk_info
            .schema()
            .fields()
            .contains_unordered(&required_field),
        "query dataset must return all guaranteed fields\nExpected: {:#?}\nGot: {:#?}",
        // BUGFIX: the labels were inverted — "Expected" now shows the required
        // fields and "Got" the fields actually returned.
        required_field,
        merged_chunk_info.schema().fields(),
    );

    // Snapshot only the guaranteed columns so optional extras don't churn snapshots.
    let required_column_names = required_field
        .iter()
        .map(|f| f.name().as_str())
        .collect::<Vec<_>>();
    let required_chunk_info = merged_chunk_info.project_columns(&required_column_names);

    insta::assert_snapshot!(
        format!("{snapshot_name}_schema"),
        required_chunk_info.format_schema_snapshot()
    );

    // Chunk keys and byte lengths are not stable across runs — drop them, and
    // sort rows so the snapshot is deterministic.
    let filtered_chunk_info = required_chunk_info
        .remove_columns(&[
            QueryDatasetResponse::FIELD_CHUNK_KEY,
            QueryDatasetResponse::FIELD_CHUNK_BYTE_LENGTH,
        ])
        .auto_sort_rows()
        .unwrap();

    insta::assert_snapshot!(
        format!("{snapshot_name}_data"),
        filtered_chunk_info.format_snapshot(false)
    );
}