#![expect(clippy::unwrap_used)]
use arrow::array::{ListArray, RecordBatch, StringArray, TimestampNanosecondArray};
use arrow::datatypes::Schema;
use futures::TryStreamExt as _;
use itertools::Itertools as _;
use re_arrow_util::ArrowArrayDowncastRef as _;
use re_protos::cloud::v1alpha1::ext::DatasetDetails;
use re_protos::cloud::v1alpha1::rerun_cloud_service_server::RerunCloudService;
use re_protos::cloud::v1alpha1::{
CreateDatasetEntryRequest, DataSource, DataSourceKind, GetDatasetManifestSchemaRequest,
GetSegmentTableSchemaRequest, ReadDatasetEntryRequest, ScanDatasetManifestRequest,
ScanDatasetManifestResponse, ScanSegmentTableRequest, ScanSegmentTableResponse,
};
use re_protos::headers::RerunHeadersInjectorExt as _;
use url::Url;
use super::common::{DataSourcesDefinition, LayerDefinition, RerunCloudServiceExt as _, prop};
use crate::{
FieldsTestExt as _, RecordBatchTestExt as _, SchemaTestExt as _, create_simple_recording_in,
};
/// Registers three plain segments (with overlapping but distinct entity paths)
/// into a fresh dataset, then snapshot-tests both the segment table and the
/// dataset manifest.
pub async fn register_and_scan_simple_dataset(service: impl RerunCloudService) {
    // Overlapping-but-distinct entity sets exercise schema union across
    // heterogeneous segments.
    let layers = [
        LayerDefinition::simple("my_segment_id1", &["my/entity", "my/other/entity"]),
        LayerDefinition::simple("my_segment_id2", &["my/entity"]),
        LayerDefinition::simple(
            "my_segment_id3",
            &["my/entity", "another/one", "yet/another/one"],
        ),
    ];
    let data_sources_def = DataSourcesDefinition::new_with_tuid_prefix(1, layers);

    let dataset_name = "my_dataset1";
    service.create_dataset_entry_with_name(dataset_name).await;
    service
        .register_with_dataset_name(dataset_name, data_sources_def.to_data_sources())
        .await;

    scan_segment_table_and_snapshot(&service, dataset_name, "simple").await;
    scan_dataset_manifest_and_snapshot(&service, dataset_name, "simple").await;
}
/// Registers a blueprint layer into the blueprint dataset that is implicitly
/// associated with a regular dataset entry, then snapshot-tests the blueprint
/// dataset's segment table and manifest.
pub async fn register_and_scan_blueprint_dataset(service: impl RerunCloudService) {
    let blueprint_data_sources_def = DataSourcesDefinition::new_with_tuid_prefix(
        2,
        [LayerDefinition::simple_blueprint("blueprint_segment_id")],
    );

    let dataset_name = "my_dataset1";
    service.create_dataset_entry_with_name(dataset_name).await;

    // Read the dataset entry back to discover its associated blueprint dataset.
    let entry = service
        .read_dataset_entry(
            tonic::Request::new(ReadDatasetEntryRequest {})
                .with_entry_name(dataset_name)
                .unwrap(),
        )
        .await
        .unwrap()
        .into_inner()
        .dataset
        .unwrap();
    let dataset_details: DatasetDetails = entry.dataset_details.unwrap().try_into().unwrap();
    assert!(dataset_details.blueprint_dataset.is_some());

    // Resolve the blueprint dataset's entry name from its id.
    let blueprint_entry = service
        .read_dataset_entry(
            tonic::Request::new(ReadDatasetEntryRequest {})
                .with_entry_id(dataset_details.blueprint_dataset.unwrap())
                .unwrap(),
        )
        .await
        .unwrap()
        .into_inner()
        .dataset
        .unwrap();
    let blueprint_dataset_name = blueprint_entry.details.unwrap().name.unwrap();

    service
        .register_with_dataset_name(
            &blueprint_dataset_name,
            blueprint_data_sources_def.to_data_sources(),
        )
        .await;

    scan_segment_table_and_snapshot(&service, &blueprint_dataset_name, "simple_blueprint").await;
    scan_dataset_manifest_and_snapshot(&service, &blueprint_dataset_name, "simple_blueprint")
        .await;
}
/// Like `register_and_scan_simple_dataset`, but additionally attaches property
/// layers (per-segment metadata) to two of the three segments.
pub async fn register_and_scan_simple_dataset_with_properties(service: impl RerunCloudService) {
    // Property layer for segment 1: a single text-log property.
    let segment1_props = LayerDefinition::properties(
        "my_segment_id1",
        [prop(
            "text_log",
            re_sdk_types::archetypes::TextLog::new("i'm segment 1"),
        )],
    )
    .layer_name("props");

    // Property layer for segment 2: two properties of different archetypes.
    let segment2_props = LayerDefinition::properties(
        "my_segment_id2",
        [
            prop(
                "text_log",
                re_sdk_types::archetypes::TextLog::new("i'm segment 2"),
            ),
            prop(
                "points",
                re_sdk_types::archetypes::Points2D::new([(0.0, 1.0)]),
            ),
        ],
    )
    .layer_name("props");

    let data_sources_def = DataSourcesDefinition::new_with_tuid_prefix(
        1,
        [
            LayerDefinition::simple("my_segment_id1", &["my/entity", "my/other/entity"]),
            LayerDefinition::simple("my_segment_id2", &["my/entity"]),
            LayerDefinition::simple(
                "my_segment_id3",
                &["my/entity", "another/one", "yet/another/one"],
            ),
            segment1_props,
            segment2_props,
        ],
    );

    let dataset_name = "my_dataset1";
    service.create_dataset_entry_with_name(dataset_name).await;
    service
        .register_with_dataset_name(dataset_name, data_sources_def.to_data_sources())
        .await;

    scan_segment_table_and_snapshot(&service, dataset_name, "simple_with_properties").await;
    scan_dataset_manifest_and_snapshot(&service, dataset_name, "simple_with_properties").await;
}
/// Checks how per-segment properties behave when layers are registered in the
/// opposite order of their log times (TUID prefixes).
///
/// Two property layers for the same segment are created: one logged first
/// (TUID prefix 10) but registered last, one logged last (TUID prefix 20) but
/// registered first. The snapshots capture which value "wins" in the merged
/// views; the assertions below check the manifest's per-layer rows.
pub async fn register_and_scan_simple_dataset_with_properties_out_of_order(
    service: impl RerunCloudService,
) {
    // TUID prefix 10: logged first (oldest data), registered *second*.
    let first_logged_data_sources_def = DataSourcesDefinition::new_with_tuid_prefix(
        10,
        [LayerDefinition::properties(
            "my_segment_id1",
            [prop(
                "text_log",
                re_sdk_types::archetypes::TextLog::new(
                    "I was logged first, registered last, so I should win",
                ),
            )],
        )
        .layer_name("prop1")],
    );
    let first_logged_data_sources = first_logged_data_sources_def.to_data_sources();

    // TUID prefix 20: logged last (newest data), registered *first*.
    let last_logged_data_sources_def = DataSourcesDefinition::new_with_tuid_prefix(
        20,
        [LayerDefinition::properties(
            "my_segment_id1",
            [prop(
                "text_log",
                re_sdk_types::archetypes::TextLog::new("I was logged last, registered first"),
            )],
        )
        .layer_name("prop2")],
    );
    let last_logged_data_sources = last_logged_data_sources_def.to_data_sources();

    let dataset_name = "my_dataset";
    service.create_dataset_entry_with_name(dataset_name).await;

    // Register in the reverse order of logging.
    service
        .register_with_dataset_name(dataset_name, last_logged_data_sources)
        .await;
    service
        .register_with_dataset_name(dataset_name, first_logged_data_sources)
        .await;

    let dataset_manifest =
        scan_dataset_manifest_and_snapshot(&service, dataset_name, "out_of_order_properties").await;
    scan_segment_table_and_snapshot(&service, dataset_name, "out_of_order_properties").await;

    // NOTE(review): the assertions below assume the (unsorted) manifest batch
    // comes back with row 0 being the first-registered layer ("prop2") —
    // confirm the service guarantees this ordering.
    let registration_time_col = dataset_manifest
        .column_by_name(ScanDatasetManifestResponse::FIELD_REGISTRATION_TIME)
        .unwrap()
        .downcast_array_ref::<TimestampNanosecondArray>()
        .unwrap();
    let prop_col = dataset_manifest
        .column_by_name("property:text_log:TextLog:text")
        .unwrap()
        .downcast_array_ref::<ListArray>()
        .unwrap();

    // Rows are ordered by registration time…
    assert!(registration_time_col.value(0) < registration_time_col.value(1));
    // …and the first-registered row carries its own per-layer property value
    // (the last-logged one), not a merged result.
    assert_eq!(
        prop_col
            .value(0)
            .downcast_array_ref::<StringArray>()
            .unwrap()
            .value(0),
        "I was logged last, registered first"
    );
}
/// Registers segments carrying a mix of default-named and explicitly-named
/// layers, then snapshot-tests the segment table and the dataset manifest.
pub async fn register_and_scan_simple_dataset_with_layers(service: impl RerunCloudService) {
    // segment1 and segment2 each get an "extra" layer on top of a base layer
    // (implicitly named for segment1, explicitly "base" for segment2);
    // segment3 has a single default layer.
    let layers = [
        LayerDefinition::simple("segment1", &["my/entity", "another/one", "yet/another/one"]),
        LayerDefinition::simple("segment1", &["extra/entity"]).layer_name("extra"),
        LayerDefinition::simple("segment2", &["another/one", "yet/another/one"])
            .layer_name("base"),
        LayerDefinition::simple("segment2", &["extra/entity"]).layer_name("extra"),
        LayerDefinition::simple("segment3", &["i/am/alone"]),
    ];
    let data_sources_def = DataSourcesDefinition::new_with_tuid_prefix(1, layers);

    let dataset_name = "dataset_with_layers";
    service.create_dataset_entry_with_name(dataset_name).await;
    service
        .register_with_dataset_name(dataset_name, data_sources_def.to_data_sources())
        .await;

    scan_segment_table_and_snapshot(&service, dataset_name, "simple_with_layers").await;
    scan_dataset_manifest_and_snapshot(&service, dataset_name, "simple_with_layers").await;
}
/// Writes three recordings under a common temp directory, then registers the
/// whole directory at once with a single prefix data source (`prefix: true`)
/// instead of one URL per file, and snapshot-tests the result.
pub async fn register_with_prefix(service: impl RerunCloudService) {
    let root_dir = tempfile::tempdir().expect("creating temp dir");

    // (tuid prefix, segment id, entity paths) for each recording to create.
    // Data-driven loop instead of three copy-pasted call sites.
    let recordings: [(_, _, &[&str]); 3] = [
        (1, "my_segment_id1", &["my/entity", "my/other/entity"]),
        (2, "my_segment_id2", &["my/entity"]),
        (
            3,
            "my_segment_id3",
            &["my/entity", "another/one", "yet/another/one"],
        ),
    ];
    for (tuid_prefix, segment_id, entities) in recordings {
        create_simple_recording_in(tuid_prefix, segment_id, entities, root_dir.path())
            .expect("creating recording");
    }

    let dataset_name = "my_dataset1";
    service
        .create_dataset_entry(tonic::Request::new(CreateDatasetEntryRequest {
            name: Some(dataset_name.to_owned()),
            id: None,
        }))
        .await
        .unwrap();

    // Trailing slash makes the URL an unambiguous directory prefix.
    let root_url =
        Url::parse(&format!("file://{}/", root_dir.path().display())).expect("creating root url");

    service
        .register_with_dataset_name(
            dataset_name,
            vec![DataSource {
                storage_url: Some(root_url.to_string()),
                prefix: true,
                layer: None,
                typ: DataSourceKind::Rrd as i32,
            }],
        )
        .await;

    scan_segment_table_and_snapshot(&service, dataset_name, "register_prefix_segments").await;
    scan_dataset_manifest_and_snapshot(&service, dataset_name, "register_prefix_manifest").await;
}
/// Scanning a freshly created dataset with no registered data must still
/// succeed and produce valid (empty) snapshots for both the segment table and
/// the dataset manifest.
pub async fn register_and_scan_empty_dataset(service: impl RerunCloudService) {
    let dataset_name = "empty_dataset";
    service.create_dataset_entry_with_name(dataset_name).await;
    scan_segment_table_and_snapshot(&service, dataset_name, "empty").await;
    scan_dataset_manifest_and_snapshot(&service, dataset_name, "empty").await;
}
/// Registering data must bump the dataset entry's `updated_at` timestamp —
/// both for an initial segment and for an additional layer on that segment.
pub async fn register_segment_bumps_timestamp(service: impl RerunCloudService) {
    /// Reads the dataset entry and returns its `updated_at` as nanoseconds
    /// since the Unix epoch. Panics if the entry or timestamp is missing.
    async fn get_dataset_updated_at_nanos(
        service: &impl RerunCloudService,
        dataset_name: &str,
    ) -> i64 {
        let entry_details = service
            .read_dataset_entry(
                tonic::Request::new(ReadDatasetEntryRequest {})
                    .with_entry_name(dataset_name)
                    .unwrap(),
            )
            .await
            .unwrap()
            .into_inner()
            .dataset
            .unwrap()
            .details
            .unwrap();
        let ts = entry_details.updated_at.as_ref().unwrap();
        ts.seconds * 1_000_000_000 + ts.nanos as i64
    }

    let dataset_name = "timestamp_test_dataset";
    service.create_dataset_entry_with_name(dataset_name).await;
    let initial_updated_at_nanos = get_dataset_updated_at_nanos(&service, dataset_name).await;

    // Let wall-clock time advance measurably before the next write.
    tokio::time::sleep(tokio::time::Duration::from_millis(10)).await;

    // First registration: a plain segment.
    let data_sources_def = DataSourcesDefinition::new_with_tuid_prefix(
        1,
        [LayerDefinition::simple("segment1", &["my/entity"])],
    );
    service
        .register_with_dataset_name(dataset_name, data_sources_def.to_data_sources())
        .await;

    let after_register_updated_at_nanos =
        get_dataset_updated_at_nanos(&service, dataset_name).await;
    assert!(
        after_register_updated_at_nanos > initial_updated_at_nanos,
        "Timestamp should be updated after registering segment. Initial: {initial_updated_at_nanos}, After register: {after_register_updated_at_nanos}"
    );

    tokio::time::sleep(tokio::time::Duration::from_millis(10)).await;

    // Second registration: an extra layer on the same segment.
    let layer_data_sources_def = DataSourcesDefinition::new_with_tuid_prefix(
        2,
        [LayerDefinition::simple("segment1", &["another/entity"]).layer_name("layer2")],
    );
    service
        .register_with_dataset_name(dataset_name, layer_data_sources_def.to_data_sources())
        .await;

    let after_layer_updated_at_nanos = get_dataset_updated_at_nanos(&service, dataset_name).await;
    assert!(
        after_layer_updated_at_nanos > after_register_updated_at_nanos,
        "Timestamp should be updated after adding a layer. After register: {after_register_updated_at_nanos}, After layer: {after_layer_updated_at_nanos}"
    );
}
/// Scans the full segment table of `dataset_name` and snapshot-tests it.
///
/// Concretely this:
/// 1. collects the whole `scan_segment_table` response stream and concatenates
///    it into a single `RecordBatch`;
/// 2. asserts the batch schema matches the one advertised by
///    `get_segment_table_schema`;
/// 3. asserts all required segment-table fields are present;
/// 4. snapshots the *unfiltered* schema and the data with run-dependent
///    columns removed;
/// 5. returns the unfiltered batch for further inspection by the caller.
async fn scan_segment_table_and_snapshot(
    service: &impl RerunCloudService,
    dataset_name: &str,
    snapshot_name: &str,
) -> RecordBatch {
    // An empty `columns` list requests all columns.
    let responses: Vec<_> = service
        .scan_segment_table(
            tonic::Request::new(ScanSegmentTableRequest { columns: vec![] })
                .with_entry_name(dataset_name)
                .unwrap(),
        )
        .await
        .unwrap()
        .into_inner()
        .try_collect()
        .await
        .unwrap();

    // Decode each streamed chunk into an Arrow `RecordBatch`.
    let batches: Vec<RecordBatch> = responses
        .into_iter()
        .map(|resp| resp.data.unwrap().try_into().unwrap())
        .collect_vec();

    // The service is expected to return at least one batch even for an empty
    // dataset (schema-only batch); otherwise this panics.
    let batch = arrow::compute::concat_batches(
        batches
            .first()
            .expect("there should be at least one batch")
            .schema_ref(),
        &batches,
    )
    .unwrap();

    // Fetch the schema the service *claims* the segment table has…
    let alleged_schema: Schema = service
        .get_segment_table_schema(
            tonic::Request::new(GetSegmentTableSchemaRequest {})
                .with_entry_name(dataset_name)
                .unwrap(),
        )
        .await
        .unwrap()
        .into_inner()
        .schema
        .unwrap()
        .try_into()
        .unwrap();

    // …and check it is consistent with the data that actually came back.
    assert_eq!(
        alleged_schema.fields(),
        batch.schema_ref().fields(),
        "The actual schema is not consistent with the schema advertised by \
        `get_segment_table_schema`.\n\nActual:\n{}\n\nAlleged:\n{}\n",
        batch.schema().format_snapshot(),
        alleged_schema.format_snapshot(),
    );

    // The schema may contain extra columns, but never fewer than the required
    // segment-table fields (order-insensitive check).
    let required_fields = ScanSegmentTableResponse::fields();
    assert!(
        batch.schema().fields().contains_unordered(&required_fields),
        "the schema should contain all the required fields, but it doesn't",
    );

    // These columns vary from run to run (URLs, sizes, timestamps), so they
    // are stripped from the *data* snapshot; the schema snapshot keeps them.
    let unstable_column_names = vec![
        ScanSegmentTableResponse::FIELD_STORAGE_URLS,
        ScanSegmentTableResponse::FIELD_SIZE_BYTES,
        ScanSegmentTableResponse::FIELD_LAST_UPDATED_AT,
    ];
    let filtered_batch = batch
        .remove_columns(&unstable_column_names)
        .auto_sort_rows()
        .unwrap()
        .sort_property_columns();

    insta::assert_snapshot!(
        format!("{snapshot_name}_segments_schema"),
        batch.format_schema_snapshot()
    );
    insta::assert_snapshot!(
        format!("{snapshot_name}_segments_data"),
        filtered_batch.format_snapshot(false)
    );

    // Return the full (unfiltered, unsorted) batch to the caller.
    batch
}
/// Scans the full dataset manifest of `dataset_name` and snapshot-tests it.
///
/// Mirrors `scan_segment_table_and_snapshot`, but against the manifest
/// endpoints: collects the `scan_dataset_manifest` stream into a single
/// `RecordBatch`, asserts consistency with `get_dataset_manifest_schema`,
/// checks required fields, snapshots schema and filtered data, and returns
/// the unfiltered batch.
async fn scan_dataset_manifest_and_snapshot(
    service: &impl RerunCloudService,
    dataset_name: &str,
    snapshot_name: &str,
) -> RecordBatch {
    // An empty `columns` list requests all columns.
    let responses: Vec<_> = service
        .scan_dataset_manifest(
            tonic::Request::new(ScanDatasetManifestRequest { columns: vec![] })
                .with_entry_name(dataset_name)
                .unwrap(),
        )
        .await
        .unwrap()
        .into_inner()
        .try_collect()
        .await
        .unwrap();

    // Decode each streamed chunk into an Arrow `RecordBatch`.
    let batches: Vec<RecordBatch> = responses
        .into_iter()
        .map(|resp| resp.data.unwrap().try_into().unwrap())
        .collect_vec();

    // The service is expected to return at least one batch even for an empty
    // dataset (schema-only batch); otherwise this panics.
    let batch = arrow::compute::concat_batches(
        batches
            .first()
            .expect("there should be at least one batch")
            .schema_ref(),
        &batches,
    )
    .unwrap();

    // Fetch the schema the service *claims* the manifest has…
    let alleged_schema: Schema = service
        .get_dataset_manifest_schema(
            tonic::Request::new(GetDatasetManifestSchemaRequest {})
                .with_entry_name(dataset_name)
                .unwrap(),
        )
        .await
        .unwrap()
        .into_inner()
        .schema
        .unwrap()
        .try_into()
        .unwrap();

    // …and check it is consistent with the data that actually came back.
    assert_eq!(
        alleged_schema.fields(),
        batch.schema_ref().fields(),
        "The actual schema is not consistent with the schema advertised by \
        `get_dataset_manifest_schema`.\n\nActual:\n{}\n\nAlleged:\n{}\n",
        batch.schema().format_snapshot(),
        alleged_schema.format_snapshot(),
    );

    // The schema may contain extra columns, but never fewer than the required
    // manifest fields (order-insensitive check).
    let required_fields = ScanDatasetManifestResponse::fields();
    assert!(
        batch.schema().fields().contains_unordered(&required_fields),
        "the schema should contain all the required fields, but it doesn't",
    );

    // These columns vary from run to run (URL, size, timestamps), so they are
    // stripped from the *data* snapshot; the schema snapshot keeps them.
    let unstable_column_names = vec![
        ScanDatasetManifestResponse::FIELD_STORAGE_URL,
        ScanDatasetManifestResponse::FIELD_SIZE_BYTES,
        ScanDatasetManifestResponse::FIELD_LAST_UPDATED_AT,
        ScanDatasetManifestResponse::FIELD_REGISTRATION_TIME,
    ];
    let filtered_batch = batch
        .remove_columns(&unstable_column_names)
        .auto_sort_rows()
        .unwrap()
        .sort_property_columns();

    insta::assert_snapshot!(
        format!("{snapshot_name}_manifest_schema"),
        batch.format_schema_snapshot()
    );
    insta::assert_snapshot!(
        format!("{snapshot_name}_manifest_data"),
        filtered_batch.format_snapshot(false)
    );

    // Return the full (unfiltered, unsorted) batch to the caller.
    batch
}