marple-db 0.2.1

Rust SDK for the MarpleDB API
Documentation
use marple_db::{Dataset, ImportStatus, MarpleDB, Metadata, PushFileOptions, UploadModeOverride};
use serde_json::{Value, json};
use std::env;
use std::fs;
use std::io::Write;
use std::path::{Path, PathBuf};
use std::sync::OnceLock;
use std::time::{Duration, SystemTime, UNIX_EPOCH};

/// API base URL used when the `MDB_URL` environment variable is unset or blank.
const DEFAULT_MDB_URL: &str = "https://db.marpledata.com/api/v1";
/// Name prefix of every stream created by these tests; `cleanup_streams`
/// deletes any stream whose name starts with it.
const TEST_STREAM_PREFIX: &str = "Salty Compulsory RustSdkTest";
/// Number of bytes in one mebibyte.
const MIB: u64 = 1024 * 1024;
/// Size (128 MiB) that the generated multipart test file must exceed.
/// NOTE(review): presumably mirrors the SDK's multipart-upload cutoff — confirm.
const MULTIPART_THRESHOLD: u64 = 128 * MIB;

/// Lazily initialised async mutex; tests hold its guard (see
/// `integration_test_guard`) so that they do not run concurrently.
static INTEGRATION_TEST_LOCK: OnceLock<tokio::sync::Mutex<()>> = OnceLock::new();

/// Loads environment variables via `dotenvy::dotenv()` and from
/// `../../python/.env`.
///
/// Both loads are best-effort: any error (for example a missing file) is
/// discarded.
fn load_env_files() {
    let _ = dotenvy::dotenv();
    let _ = dotenvy::from_path("../../python/.env");
}

/// Reads the environment variable `name`.
///
/// Returns `None` when the variable is unset, is not valid Unicode, or holds
/// only whitespace; otherwise returns the value unchanged (not trimmed).
fn env_opt(name: &str) -> Option<String> {
    match env::var(name) {
        Ok(value) if !value.trim().is_empty() => Some(value),
        _ => None,
    }
}

/// Resolves the credentials needed by the integration tests.
///
/// Loads the `.env` files first, then returns `Some((token, url))` when
/// `MDB_TOKEN` is set to a non-blank value. `url` comes from `MDB_URL`, falling
/// back to `DEFAULT_MDB_URL` when that variable is unset or blank. Returns
/// `None` when no token is available, which callers treat as "skip the test".
fn maybe_skip_integration() -> Option<(String, String)> {
    load_env_files();
    env_opt("MDB_TOKEN").map(|token| {
        let url = match env_opt("MDB_URL") {
            Some(configured) => configured,
            None => DEFAULT_MDB_URL.to_string(),
        };
        (token, url)
    })
}

/// Builds a `MarpleDB` client for `url`, authenticated with `token`.
///
/// # Errors
///
/// Propagates any error from `MarpleDB::new`, converted to `anyhow::Error`.
fn db(token: &str, url: &str) -> anyhow::Result<MarpleDB> {
    let client = MarpleDB::new(url, token)?;
    Ok(client)
}

/// Acquires the process-wide integration-test lock, creating the mutex on
/// first use.
///
/// The returned guard keeps the lock held until it is dropped.
async fn integration_test_guard() -> tokio::sync::MutexGuard<'static, ()> {
    let lock = INTEGRATION_TEST_LOCK.get_or_init(|| tokio::sync::Mutex::new(()));
    lock.lock().await
}

/// Builds a stream name of the form `"<TEST_STREAM_PREFIX> <suffix> <seconds>"`,
/// where `<seconds>` is the current Unix time in whole seconds.
///
/// # Panics
///
/// Panics if the system clock reports a time before the Unix epoch.
fn unique_stream_name(suffix: &str) -> String {
    let since_epoch = SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .expect("clock");
    let seconds = since_epoch.as_secs().to_string();
    [TEST_STREAM_PREFIX, suffix, seconds.as_str()].join(" ")
}

fn example_csv_path() -> PathBuf {
    Path::new(env!("CARGO_MANIFEST_DIR"))
        .join("../../test_data/examples_race.csv")
        .canonicalize()
        .expect("example CSV path")
}

async fn cleanup_streams(db: &MarpleDB) -> anyhow::Result<()> {
    for stream in db.get_streams().await? {
        if !stream.name.starts_with(TEST_STREAM_PREFIX) {
            continue;
        }
        let _ = db
            .post::<_, Value>(&format!("/stream/{}/delete", stream.id), &json!({}))
            .await;
    }
    Ok(())
}

/// Creates a stream named by `unique_stream_name(suffix)`, passing a single
/// option: `plugin_args` = `"--use-index"`.
///
/// # Errors
///
/// Propagates any error from `MarpleDB::create_stream`.
async fn create_test_stream(db: &MarpleDB, suffix: &str) -> anyhow::Result<marple_db::Stream> {
    let plugin_args = ("plugin_args".to_string(), json!("--use-index"));
    let options: Metadata = std::iter::once(plugin_args).collect();
    let name = unique_stream_name(suffix);
    let stream = db.create_stream(&name, &options).await?;
    Ok(stream)
}

/// Runs `flow` with test streams swept before and after it, while holding the
/// integration-test lock for the whole duration.
///
/// # Errors
///
/// - Fails before running `flow` if the initial sweep fails.
/// - Returns `flow`'s error if it fails; a failure of the final sweep is then
///   only logged to stderr.
/// - Returns the final sweep's error if `flow` succeeded but the sweep did not.
async fn run_with_cleanup<F, Fut>(db: &MarpleDB, flow: F) -> anyhow::Result<()>
where
    F: FnOnce() -> Fut,
    Fut: std::future::Future<Output = anyhow::Result<()>>,
{
    let _guard = integration_test_guard().await;
    cleanup_streams(db).await?;

    let outcome = flow().await;
    // The final sweep always runs, whatever the flow returned.
    let sweep = cleanup_streams(db).await;

    match (outcome, sweep) {
        (Ok(()), Err(cleanup_error)) => Err(cleanup_error),
        (Err(flow_error), Err(cleanup_error)) => {
            // The flow's error is the more useful one to report; only log this.
            eprintln!("Rust SDK integration cleanup failed: {cleanup_error:#}");
            Err(flow_error)
        }
        (outcome, Ok(())) => outcome,
    }
}

/// Uploads `file_path` to stream `stream_id`, waits for the import to finish,
/// and verifies the resulting dataset end to end.
///
/// Checks performed, in order:
/// 1. the source file exists and is non-empty;
/// 2. the import reaches `ImportStatus::Finished` within 180 seconds;
/// 3. every `(key, value)` in `expected_metadata` is present in the dataset
///    metadata as a string with exactly that value;
/// 4. `backup_size` is set and equals the source file size;
/// 5. the dataset id appears in the stream's dataset list;
/// 6. when `exercise_generic_endpoints` is true, a `/query` POST succeeds and
///    the dataset's signals endpoint returns a non-empty JSON array;
/// 7. the download link returns a success status and a body whose length
///    equals both the source file size and `backup_size`.
///
/// Returns the dataset as reported by `wait_for_import`.
///
/// # Errors
///
/// Returns an error if any SDK or HTTP call fails or any check above does not
/// hold.
async fn upload_and_assert_dataset(
    db: &MarpleDB,
    stream_id: i32,
    file_path: &Path,
    options: PushFileOptions,
    expected_metadata: &[(&str, &str)],
    exercise_generic_endpoints: bool,
) -> anyhow::Result<Dataset> {
    // Validate the fixture up front so a bad path fails here, not mid-upload.
    anyhow::ensure!(file_path.exists(), "test CSV missing at {:?}", file_path);
    let expected_size = fs::metadata(file_path)?.len();
    anyhow::ensure!(expected_size > 0, "test CSV is empty");

    // Upload, then replace the handle with the post-import view of the dataset.
    let dataset = db.push_file(stream_id, file_path, options).await?;
    let dataset = db
        .wait_for_import(stream_id, dataset.id, Duration::from_secs(180))
        .await?;
    anyhow::ensure!(
        dataset.import_status == ImportStatus::Finished,
        "ingest failed"
    );

    // Metadata values are compared as JSON strings; a non-string value fails.
    for (key, value) in expected_metadata {
        anyhow::ensure!(
            dataset.metadata.get(*key).and_then(Value::as_str) == Some(*value),
            "dataset metadata missing expected {key} value"
        );
    }

    // The reported backup size must match the uploaded file byte for byte.
    let backup_size = dataset
        .backup_size
        .ok_or_else(|| anyhow::anyhow!("finished dataset has no backup_size"))?;
    anyhow::ensure!(
        backup_size == expected_size,
        "backup_size mismatch: source csv is {expected_size} bytes, backup_size is {backup_size} bytes"
    );

    // The new dataset must be visible when listing the stream's datasets.
    anyhow::ensure!(
        db.get_datasets(stream_id)
            .await?
            .iter()
            .any(|candidate| candidate.id == dataset.id),
        "dataset id not found in dataset list"
    );

    if exercise_generic_endpoints {
        // Only success of the query call is checked; its result is discarded.
        let query = "select path, stream_id, metadata from mdb_default_dataset limit 1;";
        db.post::<_, Value>("/query", &json!({ "query": query }))
            .await?;

        let signals: Value = db
            .get(
                &format!("/stream/{}/dataset/{}/signals", stream_id, dataset.id),
                &(),
            )
            .await?;
        anyhow::ensure!(
            signals
                .as_array()
                .is_some_and(|signals| !signals.is_empty()),
            "signals response should be a non-empty array"
        );
    }

    // Round-trip check: fetch the stored file and compare sizes.
    // NOTE(review): the whole body is read into memory via `bytes()`, which
    // for the multipart fixture is more than `MULTIPART_THRESHOLD` bytes.
    let download_url = db.get_download_link(&dataset).await?;
    let response = db.storage_client().get(download_url).send().await?;
    anyhow::ensure!(
        response.status().is_success(),
        "download URL returned status {}",
        response.status()
    );
    let downloaded = response.bytes().await?;
    let downloaded_size = downloaded.len() as u64;
    anyhow::ensure!(
        downloaded_size == expected_size,
        "downloaded file size mismatch: source csv is {expected_size} bytes, downloaded file is {downloaded_size} bytes"
    );
    anyhow::ensure!(
        downloaded_size == backup_size,
        "downloaded file size mismatch: backup_size is {backup_size} bytes, downloaded file is {downloaded_size} bytes"
    );

    Ok(dataset)
}

/// Writes `multipart-examples-race.csv` into `output_dir` by repeating the
/// example CSV's bytes until the file is larger than `MULTIPART_THRESHOLD`.
///
/// Returns the path of the generated file.
///
/// # Errors
///
/// Returns an error if the example CSV cannot be read or is empty, if writing
/// the output fails, or if the finished file does not exceed the threshold.
fn generate_multipart_csv(output_dir: &Path) -> anyhow::Result<PathBuf> {
    let template = fs::read(example_csv_path())?;
    anyhow::ensure!(!template.is_empty(), "source CSV is empty");

    let output_path = output_dir.join("multipart-examples-race.csv");
    // floor(threshold / len) + 1 copies is the smallest count that strictly
    // exceeds the threshold.
    let copies = MULTIPART_THRESHOLD / template.len() as u64 + 1;

    let mut file = fs::File::create(&output_path)?;
    (0..copies).try_for_each(|_| file.write_all(&template))?;
    file.flush()?;

    let written = fs::metadata(&output_path)?.len();
    anyhow::ensure!(
        written > MULTIPART_THRESHOLD,
        "generated multipart test file does not exceed multipart threshold"
    );
    Ok(output_path)
}

/// Smoke test: the health endpoint reports `"healthy"`, listing streams works
/// with the configured token, and listing streams fails with an invalid token.
///
/// Skipped (passes trivially) when `MDB_TOKEN` is not configured.
#[tokio::test]
async fn test_sdk_health_and_streams() -> anyhow::Result<()> {
    let _guard = integration_test_guard().await;
    let (token, url) = match maybe_skip_integration() {
        Some(credentials) => credentials,
        None => {
            eprintln!("Skipping Rust SDK integration test: missing env var MDB_TOKEN");
            return Ok(());
        }
    };

    let client = db(&token, &url)?;
    let health = client.health().await?;
    assert_eq!(health.status, "healthy");
    client.get_streams().await?;

    // A bogus token must be rejected by an authenticated endpoint.
    let invalid_db = MarpleDB::new(&url, "invalid_token")?;
    assert!(invalid_db.get_streams().await.is_err());

    Ok(())
}

/// Runs `run_auto_upload_flow` inside `run_with_cleanup`.
///
/// Skipped (passes trivially) when `MDB_TOKEN` is not configured.
#[tokio::test]
async fn test_sdk_auto_upload_flow() -> anyhow::Result<()> {
    let (token, url) = match maybe_skip_integration() {
        Some(credentials) => credentials,
        None => {
            eprintln!("Skipping Rust SDK integration test: missing env var MDB_TOKEN");
            return Ok(());
        }
    };

    let client = db(&token, &url)?;
    run_with_cleanup(&client, || run_auto_upload_flow(&client)).await
}

/// Runs `run_server_upload_flow` inside `run_with_cleanup`.
///
/// Skipped (passes trivially) when `MDB_TOKEN` is not configured.
#[tokio::test]
async fn test_sdk_server_upload_flow() -> anyhow::Result<()> {
    let (token, url) = match maybe_skip_integration() {
        Some(credentials) => credentials,
        None => {
            eprintln!("Skipping Rust SDK integration test: missing env var MDB_TOKEN");
            return Ok(());
        }
    };

    let client = db(&token, &url)?;
    run_with_cleanup(&client, || run_server_upload_flow(&client)).await
}

/// Runs `run_multipart_upload_flow` inside `run_with_cleanup`.
///
/// Skipped (passes trivially) when `MDB_TOKEN` is not configured.
#[tokio::test]
async fn test_sdk_multipart_upload_flow() -> anyhow::Result<()> {
    let (token, url) = match maybe_skip_integration() {
        Some(credentials) => credentials,
        None => {
            eprintln!("Skipping Rust SDK integration test: missing env var MDB_TOKEN");
            return Ok(());
        }
    };

    let client = db(&token, &url)?;
    run_with_cleanup(&client, || run_multipart_upload_flow(&client)).await
}

/// Exercises stream management plus a default-mode upload.
///
/// Creates a test stream, checks it can be fetched by name and appears in the
/// stream list, updates it with its own name, then uploads the example CSV with
/// `Deployment` and `Foo` metadata and verifies the dataset, including the
/// generic query and signals endpoints.
async fn run_auto_upload_flow(db: &MarpleDB) -> anyhow::Result<()> {
    let csv_path = example_csv_path();
    let stream = create_test_stream(db, "auto").await?;

    // Lookup by name must resolve to the stream that was just created.
    let by_name = db.get_stream(&stream.name).await?;
    anyhow::ensure!(by_name.id == stream.id, "fetched stream id mismatch");

    let listed = db.get_streams().await?;
    let is_listed = listed.iter().any(|candidate| candidate.id == stream.id);
    anyhow::ensure!(is_listed, "created stream not found in stream list");

    // Update the stream with its current name; the id must be unchanged.
    let same_name = ("name".to_string(), json!(stream.name.clone()));
    let update_options: Metadata = std::iter::once(same_name).collect();
    let updated = db.update_stream(stream.id, &update_options).await?;
    anyhow::ensure!(updated.id == stream.id, "updated stream id mismatch");

    let deployment = "integration-test";
    let foo = "Bar";
    let options = PushFileOptions::builder()
        .metadata([
            ("Deployment".to_string(), json!(deployment)),
            ("Foo".to_string(), json!(foo)),
        ])
        .build();
    let expected_metadata = [("Deployment", deployment), ("Foo", foo)];

    upload_and_assert_dataset(db, stream.id, &csv_path, options, &expected_metadata, true)
        .await
        .map(|_dataset| ())
}

/// Uploads the example CSV with the upload mode forced to
/// `UploadModeOverride::Server` and verifies the resulting dataset.
///
/// The dataset is tagged with `upload_mode` = `"server"` metadata, which is
/// checked after import. The generic query/signals endpoints are not exercised.
async fn run_server_upload_flow(db: &MarpleDB) -> anyhow::Result<()> {
    let csv_path = example_csv_path();
    let stream = create_test_stream(db, "server").await?;

    let mode_label = "server";
    let options = PushFileOptions::builder()
        .metadata([("upload_mode".to_string(), json!(mode_label))])
        .upload_mode(UploadModeOverride::Server)
        .build();
    let expected_metadata = [("upload_mode", mode_label)];

    upload_and_assert_dataset(db, stream.id, &csv_path, options, &expected_metadata, false)
        .await
        .map(|_dataset| ())
}

/// Uploads a generated CSV larger than `MULTIPART_THRESHOLD` with no upload
/// mode override and verifies the resulting dataset.
///
/// The file is generated in a temporary directory that lives until this
/// function returns. The dataset is tagged with `upload_mode` = `"multipart"`
/// metadata, which is checked after import.
async fn run_multipart_upload_flow(db: &MarpleDB) -> anyhow::Result<()> {
    // Keep the directory handle bound so the generated file outlives the upload.
    let tmp = tempfile::tempdir()?;
    let csv_path = generate_multipart_csv(tmp.path())?;
    let stream = create_test_stream(db, "multipart").await?;

    let mode_label = "multipart";
    let options = PushFileOptions::builder()
        .metadata([("upload_mode".to_string(), json!(mode_label))])
        .build();
    let expected_metadata = [("upload_mode", mode_label)];

    upload_and_assert_dataset(db, stream.id, &csv_path, options, &expected_metadata, false)
        .await
        .map(|_dataset| ())
}