use chrono::Utc;
use object_store::path::Path as ObjPath;
use scouter_dataframe::{parquet::tracing::service::TraceSpanService, storage::ObjectStore};
use scouter_mocks::generate_trace_with_spans;
use scouter_settings::ObjectStorageSettings;
use scouter_types::sql::TraceSpan;
/// URI scheme prefixes (`gs://`, `s3://`, `az://`) that `storage_prefix` tries, in order,
/// when stripping the scheme from the configured storage URI.
const CLOUD_SCHEMES: &[&str] = &["gs://", "s3://", "az://"];
/// Extracts the path portion of `settings.storage_uri` for a cloud-style URI.
///
/// The first entry of `CLOUD_SCHEMES` that prefixes the URI is stripped, and
/// everything after the first `/` in the remainder is returned (the segment
/// before that `/` is discarded).
///
/// Returns `None` when the URI starts with none of the known schemes, or when
/// there is no `/` after the scheme. Returns `Some("")` when the URI ends
/// directly after that `/`.
fn storage_prefix(settings: &ObjectStorageSettings) -> Option<String> {
    for scheme in CLOUD_SCHEMES {
        if let Some(remainder) = settings.storage_uri.strip_prefix(*scheme) {
            // Only the first matching scheme is considered; a remainder without
            // a '/' means there is no path component at all.
            let (_head, key_prefix) = remainder.split_once('/')?;
            return Some(key_prefix.to_string());
        }
    }
    None
}
/// Best-effort deletion of every object listed under the configured storage prefix.
///
/// The prefix is derived with `storage_prefix`; when that yields `None`, `list`
/// is called with no path (NOTE(review): this presumably lists the whole
/// bucket/container — confirm against `ObjectStore::list`).
///
/// This function never panics and never returns an error: a failure to build
/// the store, list the prefix, or delete an individual object is reported on
/// stderr and otherwise ignored, so it can be called both before and after a
/// test without masking the test's own result.
async fn cleanup_remote(settings: &ObjectStorageSettings) {
    let store = match ObjectStore::new(settings) {
        Ok(store) => store,
        Err(e) => {
            eprintln!("cleanup_remote: could not create object store, skipping cleanup: {e:?}");
            return;
        }
    };

    let prefix = storage_prefix(settings);
    let list_path = prefix.as_deref().map(ObjPath::from);

    let files = match store.list(list_path.as_ref()).await {
        Ok(files) => files,
        Err(e) => {
            eprintln!("cleanup_remote: could not list {list_path:?}, skipping cleanup: {e:?}");
            return;
        }
    };

    for file in files {
        // Keep going after a failed delete so one bad object does not block the rest.
        if let Err(e) = store.delete(&ObjPath::from(file.as_str())).await {
            eprintln!("cleanup_remote: failed to delete {file:?}: {e:?}");
        }
    }
}
/// Runs a write → query → shutdown → cleanup round trip against the object
/// store described by `settings`.
///
/// Steps:
/// 1. Clear the storage prefix so leftovers from an earlier (possibly failed)
///    run cannot affect the result.
/// 2. Start a `TraceSpanService`, write one generated trace directly, and query
///    it back by trace id within a ±1 hour window around "now".
/// 3. Assert that at least one span is returned and that every returned span's
///    `start_time` lies strictly inside the query window.
/// 4. Shut the service down, clean the prefix again, and assert that listing
///    the prefix returns nothing.
///
/// `label` is only used to tag panic/assertion messages (e.g. `"GCS"`).
///
/// # Panics
/// Panics (failing the calling test) if any service call fails, if an
/// assertion above does not hold, or if the post-cleanup listing itself
/// fails. A listing error is treated as a failure rather than as "no files",
/// so the final emptiness check cannot pass vacuously.
async fn run_cloud_integration_test(settings: &ObjectStorageSettings, label: &str) {
    cleanup_remote(settings).await;

    // NOTE(review): the positional arguments (24, Some(2), None, 10) are defined
    // by `TraceSpanService::new`; confirm their meaning there.
    let service = TraceSpanService::new(settings, 24, Some(2), None, 10)
        .await
        .unwrap_or_else(|e| panic!("Failed to initialize TraceSpanService on {label}: {e}"));

    let (_record, spans, _tags) = generate_trace_with_spans(5, 0);
    let first_trace_id = spans
        .first()
        .expect("generate_trace_with_spans returned no spans")
        .trace_id;

    service
        .write_spans_direct(spans)
        .await
        .unwrap_or_else(|e| panic!("Failed to write spans to {label}: {e}"));

    let start = Utc::now() - chrono::Duration::hours(1);
    let end = Utc::now() + chrono::Duration::hours(1);
    let result_spans: Vec<TraceSpan> = service
        .query_service
        .get_trace_spans(
            Some(first_trace_id.as_bytes()),
            None,
            Some(&start),
            Some(&end),
            None,
        )
        .await
        .unwrap_or_else(|e| panic!("Failed to query spans from {label}: {e}"));

    assert!(
        !result_spans.is_empty(),
        "[{label}] Expected ≥1 span but got 0. trace_id={:?}",
        first_trace_id
    );
    for span in &result_spans {
        assert!(
            span.start_time > start && span.start_time < end,
            "[{label}] Span timestamp outside query window: {:?}",
            span.start_time
        );
    }

    service
        .shutdown()
        .await
        .unwrap_or_else(|e| panic!("[{label}] Shutdown failed: {e}"));

    cleanup_remote(settings).await;

    // Verify the cleanup independently of `cleanup_remote`, which deliberately
    // ignores errors. Any failure here must fail the test instead of being
    // mistaken for an empty prefix.
    let store = ObjectStore::new(settings).unwrap_or_else(|e| {
        panic!("[{label}] Failed to create object store for cleanup check: {e:?}")
    });
    let prefix = storage_prefix(settings);
    let list_path = prefix.as_deref().map(ObjPath::from);
    let remaining = store
        .list(list_path.as_ref())
        .await
        .unwrap_or_else(|e| panic!("[{label}] Failed to list prefix after cleanup: {e:?}"));
    assert!(
        remaining.is_empty(),
        "[{label}] Expected empty prefix after cleanup but found {} file(s): {:?}",
        remaining.len(),
        remaining
    );
}
/// Runs the shared cloud round-trip test labelled "GCS".
///
/// Opt-in: the test only runs when the `SCOUTER_STORAGE_URI` environment
/// variable starts with `gs://`; otherwise it prints a skip notice and passes.
/// NOTE(review): `ObjectStorageSettings::default()` presumably picks up the
/// same variable — confirm.
#[tokio::test]
async fn test_trace_service_gcs_integration() {
    let storage_uri = std::env::var("SCOUTER_STORAGE_URI").unwrap_or_default();
    if storage_uri.starts_with("gs://") {
        let settings = ObjectStorageSettings::default();
        run_cloud_integration_test(&settings, "GCS").await;
    } else {
        eprintln!("Skipping GCS test: SCOUTER_STORAGE_URI not set to gs://");
    }
}
/// Runs the shared cloud round-trip test labelled "S3".
///
/// Opt-in: the test only runs when the `SCOUTER_STORAGE_URI` environment
/// variable starts with `s3://`; otherwise it prints a skip notice and passes.
/// NOTE(review): `ObjectStorageSettings::default()` presumably picks up the
/// same variable — confirm.
#[tokio::test]
async fn test_trace_service_s3_integration() {
    let storage_uri = std::env::var("SCOUTER_STORAGE_URI").unwrap_or_default();
    if storage_uri.starts_with("s3://") {
        let settings = ObjectStorageSettings::default();
        run_cloud_integration_test(&settings, "S3").await;
    } else {
        eprintln!("Skipping S3 test: SCOUTER_STORAGE_URI not set to s3://");
    }
}
/// Runs the shared cloud round-trip test labelled "Azure".
///
/// Opt-in: the test only runs when the `SCOUTER_STORAGE_URI` environment
/// variable starts with `az://`; otherwise it prints a skip notice and passes.
/// NOTE(review): `ObjectStorageSettings::default()` presumably picks up the
/// same variable — confirm.
#[tokio::test]
async fn test_trace_service_azure_integration() {
    let storage_uri = std::env::var("SCOUTER_STORAGE_URI").unwrap_or_default();
    if storage_uri.starts_with("az://") {
        let settings = ObjectStorageSettings::default();
        run_cloud_integration_test(&settings, "Azure").await;
    } else {
        eprintln!("Skipping Azure test: SCOUTER_STORAGE_URI not set to az://");
    }
}