tonbo 0.4.0-a1

Embedded database for serverless and edge runtimes, storing data as Parquet on S3
Documentation
#![cfg(test)]

use std::{
    env,
    path::PathBuf,
    sync::Arc,
    time::{Duration, SystemTime, UNIX_EPOCH},
};

use fusio::{DynFs, disk::LocalFs, path::Path as FusioPath};

use crate::{
    db::{AwsCreds, ObjectSpec, S3Spec, WalConfig},
    wal::{WalSyncPolicy, state::FsWalStateStore},
};

/// Local filesystem backend harness metadata.
///
/// Produced by [`local_harness`]; bundles the directories the harness created
/// with the WAL configuration that points at them.
pub struct LocalHarness {
    /// Scratch directory created for this harness; everything else lives under it.
    pub root: PathBuf,
    /// WAL directory. `local_harness` sets this to `root/wal` and creates it.
    pub wal_dir: PathBuf,
    /// WAL configuration after `local_harness` has attached the WAL directory,
    /// a local-filesystem segment backend, and a filesystem state store.
    pub wal_config: WalConfig,
    /// Optional one-shot cleanup callback. `local_harness` installs a closure
    /// that recursively removes `root` and prints (rather than propagates) any
    /// removal error to stderr.
    pub cleanup: Option<Box<dyn FnOnce() + Send>>,
}

/// S3/object-store backend harness metadata.
///
/// Produced by [`maybe_s3_harness`] when the S3 environment is configured.
pub struct S3Harness {
    /// Object-store specification built from the environment-provided S3 settings.
    pub object: ObjectSpec,
    /// WAL configuration; `maybe_s3_harness` stores the caller's value unchanged.
    pub wal_config: WalConfig,
}

/// S3 settings as returned by `s3_env`, in positional order:
///
/// 0. endpoint (`TONBO_S3_ENDPOINT`, optional)
/// 1. bucket (`TONBO_S3_BUCKET`)
/// 2. region (`TONBO_S3_REGION`)
/// 3. access key (`TONBO_S3_ACCESS_KEY`)
/// 4. secret key (`TONBO_S3_SECRET_KEY`)
/// 5. S3 Express flag (`TONBO_S3_EXPRESS`)
/// 6. session token (`TONBO_S3_SESSION_TOKEN`, optional)
type S3Env = (
    Option<String>,
    String,
    String,
    String,
    String,
    bool,
    Option<String>,
);

/// Shared WAL settings for e2e tests.
///
/// Starts from `WalConfig::default()` and caps segments at 256 bytes with a
/// 1 ms flush interval, so tests roll segments and flush quickly. The sync
/// policy is taken from `policy`.
pub fn wal_tuning(policy: WalSyncPolicy) -> WalConfig {
    let flush_every = Duration::from_millis(1);

    let config = WalConfig::default();
    let config = config.segment_max_bytes(256);
    let config = config.flush_interval(flush_every);
    config.sync_policy(policy)
}

/// Creates and returns a fresh scratch directory under `<cwd>/target/tmp`.
///
/// The directory name is `{prefix}-{nanos}-{pid}-{seq}`. The timestamp alone is
/// not a reliable discriminator: tests run in parallel, some platforms have a
/// coarse clock, and `create_dir_all` happily succeeds on an existing
/// directory. The process id and a process-wide sequence number guarantee that
/// two calls never hand out the same directory, even with an identical prefix.
///
/// # Panics
///
/// Panics if the current directory cannot be determined, if the system clock is
/// before the Unix epoch, or if the directory cannot be created.
fn workspace_temp_dir(prefix: &str) -> PathBuf {
    // Monotonic per-process counter; makes names unique within one test binary.
    static NEXT_ID: std::sync::atomic::AtomicU64 = std::sync::atomic::AtomicU64::new(0);

    let nanos = SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .expect("time")
        .as_nanos();
    // The pid separates test binaries that run concurrently in the same workspace.
    let pid = std::process::id();
    let seq = NEXT_ID.fetch_add(1, std::sync::atomic::Ordering::Relaxed);

    let dir = std::env::current_dir()
        .expect("cwd")
        .join("target")
        .join("tmp")
        .join(format!("{prefix}-{nanos}-{pid}-{seq}"));
    std::fs::create_dir_all(&dir).expect("create workspace temp dir");
    dir
}

/// Local filesystem backend harness.
///
/// Creates a fresh scratch directory named after `label`, adds a `wal`
/// subdirectory, and returns `wal_cfg` extended with that WAL directory, a
/// local-filesystem segment backend, and a filesystem-backed state store.
///
/// The returned [`LocalHarness::cleanup`] closure recursively removes the
/// scratch directory; a failure there is printed to stderr, not propagated.
///
/// # Errors
///
/// Returns an error if the `wal` subdirectory cannot be created or its path
/// cannot be converted into a fusio path.
///
/// # Panics
///
/// Panics if the scratch directory itself cannot be created (see
/// `workspace_temp_dir`).
pub fn local_harness(
    label: &str,
    wal_cfg: WalConfig,
) -> Result<LocalHarness, Box<dyn std::error::Error>> {
    let root = workspace_temp_dir(label);
    let wal_dir = root.join("wal");
    std::fs::create_dir_all(&wal_dir)?;

    let wal_path = FusioPath::from_filesystem_path(&wal_dir)?;
    let wal_fs = Arc::new(LocalFs {});
    // Method-call `clone()` (not `Arc::clone`) so the `Arc<LocalFs>` can coerce
    // to the trait-object `Arc` requested by the annotation.
    let wal_backend: Arc<dyn DynFs> = wal_fs.clone();
    let wal_state = Arc::new(FsWalStateStore::new(wal_fs));
    // `wal_cfg` is owned and not used again, so it feeds the builder chain
    // directly; cloning it first only produced a copy that was dropped.
    let wal_config = wal_cfg
        .wal_dir(wal_path)
        .segment_backend(wal_backend)
        .state_store(Some(wal_state));

    Ok(LocalHarness {
        root: root.clone(),
        wal_dir,
        wal_config,
        cleanup: Some(Box::new(move || {
            // Best-effort: a leftover directory should not fail the test.
            if let Err(err) = std::fs::remove_dir_all(&root) {
                eprintln!("cleanup failed for {:?}: {err}", &root);
            }
        })),
    })
}

/// Reads the S3 test settings from `TONBO_S3_*` environment variables.
///
/// Bucket, region, access key and secret key are required; if any of them is
/// unset (or not valid Unicode) the function returns `None`. Endpoint and
/// session token are optional. The S3 Express flag is `true` only when
/// `TONBO_S3_EXPRESS` is exactly `1`, `true` or `TRUE`.
fn s3_env() -> Option<S3Env> {
    let read = |key: &str| env::var(key).ok();

    let endpoint = read("TONBO_S3_ENDPOINT");
    let bucket = read("TONBO_S3_BUCKET")?;
    let region = read("TONBO_S3_REGION")?;
    let access = read("TONBO_S3_ACCESS_KEY")?;
    let secret = read("TONBO_S3_SECRET_KEY")?;

    let express_flag = read("TONBO_S3_EXPRESS");
    let s3_express = matches!(express_flag.as_deref(), Some("1" | "true" | "TRUE"));

    let session = read("TONBO_S3_SESSION_TOKEN");

    Some((
        endpoint, bucket, region, access, secret, s3_express, session,
    ))
}

/// Builds a label of the form `{base}-{nanos}-{pid}-{seq}` that is unique per call.
///
/// The timestamp keeps labels roughly ordered by creation time, but it is not
/// enough on its own: the clock may be coarse, and a clock error falls back to
/// zero. The process id and a process-wide sequence number ensure that repeated
/// calls, including concurrent ones with the same `base`, never return the
/// same label.
fn unique_label(base: &str) -> String {
    // Monotonic per-process counter; the actual uniqueness guarantee.
    static NEXT_ID: std::sync::atomic::AtomicU64 = std::sync::atomic::AtomicU64::new(0);

    let nanos = SystemTime::now()
        .duration_since(UNIX_EPOCH)
        // A clock set before the epoch is tolerated; `seq` still disambiguates.
        .unwrap_or(Duration::ZERO)
        .as_nanos();
    let pid = std::process::id();
    let seq = NEXT_ID.fetch_add(1, std::sync::atomic::Ordering::Relaxed);

    format!("{base}-{nanos}-{pid}-{seq}")
}

/// S3/object-store backend harness.
///
/// Returns `Ok(None)` when the required `TONBO_S3_*` environment variables are
/// not all present, so callers can skip S3 tests on machines without
/// credentials. Otherwise builds an [`ObjectSpec`] for the configured bucket,
/// using a unique label derived from `label` so separate runs do not share
/// state, and pairs it with `wal_cfg` unchanged.
///
/// Session-token credentials are used when `TONBO_S3_SESSION_TOKEN` is set;
/// payload signing is always enabled.
///
/// # Errors
///
/// The current implementation has no failing path; the `Result` return type
/// mirrors `local_harness`.
pub fn maybe_s3_harness(
    label: &str,
    wal_cfg: WalConfig,
) -> Result<Option<S3Harness>, Box<dyn std::error::Error>> {
    let Some((endpoint, bucket, region, access, secret, s3_express, session)) = s3_env() else {
        return Ok(None);
    };

    let credentials = match session {
        Some(token) => AwsCreds::with_session_token(access, secret, token),
        None => AwsCreds::new(access, secret),
    };

    // `bucket` is not needed afterwards, so it is moved in rather than cloned.
    let mut s3 = S3Spec::new(bucket, unique_label(label), credentials);
    s3.endpoint = endpoint;
    s3.region = Some(region);
    s3.s3_express = Some(s3_express);
    s3.sign_payload = Some(true);

    let object = ObjectSpec::s3(s3);

    Ok(Some(S3Harness {
        object,
        wal_config: wal_cfg,
    }))
}