eventdbx 2.0.3

An event-sourced, key-value, write-side database system.
Documentation
use std::{io, net::TcpListener, path::PathBuf, time::Duration};

use base64::{Engine, engine::general_purpose::STANDARD};
use eventdbx::{
    api_grpc::proto::{
        AppendEventRequest, GetAggregateRequest, ListAggregatesRequest, ListEventsRequest,
        VerifyAggregateRequest, event_service_client::EventServiceClient,
    },
    config::Config,
    restrict::RestrictMode,
    server,
    token::{IssueTokenInput, JwtLimits, TokenManager},
};
use serde_json::json;
use tempfile::TempDir;
use tokio::{task::JoinHandle, time::sleep};
use tonic::{Code, Request, Status};

type TestResult<T> = Result<T, Box<dyn std::error::Error + Send + Sync>>;

#[tokio::test(flavor = "multi_thread")]
async fn grpc_append_and_query_flow() -> TestResult<()> {
    let temp = TempDir::new()?;
    let mut config = Config::default();
    config.data_dir = temp.path().join("data");
    let http_port = match allocate_port() {
        Ok(port) => port,
        Err(err) if err.kind() == io::ErrorKind::PermissionDenied => {
            eprintln!("skipping grpc regression test: port binding not permitted ({err})");
            return Ok(());
        }
        Err(err) => return Err(err.into()),
    };
    let socket_port = match allocate_port() {
        Ok(port) => port,
        Err(err) if err.kind() == io::ErrorKind::PermissionDenied => {
            eprintln!("skipping grpc regression test: port binding not permitted ({err})");
            return Ok(());
        }
        Err(err) => return Err(err.into()),
    };
    config.port = http_port;
    config.restrict = RestrictMode::Off;
    config.data_encryption_key = Some(STANDARD.encode([7u8; 32]));
    config.grpc.bind_addr = format!("127.0.0.1:{http_port}");
    config.socket.bind_addr = format!("127.0.0.1:{socket_port}");
    config.ensure_data_dir()?;
    let config_path = temp.path().join("config.toml");
    config.save(&config_path)?;

    let encryptor = config
        .encryption_key()?
        .expect("encryption key should be configured");
    let jwt_config = config.jwt_manager_config()?;
    let token_manager = TokenManager::load(
        jwt_config,
        config.tokens_path(),
        config.jwt_revocations_path(),
        Some(encryptor.clone()),
    )?;
    let token = token_manager
        .issue(IssueTokenInput {
            subject: "testers:grpc-regression".into(),
            group: "testers".into(),
            user: "grpc-regression".into(),
            root: true,
            actions: Vec::new(),
            resources: Vec::new(),
            ttl_secs: Some(3600),
            not_before: None,
            issued_by: "tests".into(),
            limits: JwtLimits {
                write_events: None,
                keep_alive: true,
            },
        })?
        .token;
    drop(token_manager);

    let server_handle = spawn_server(config.clone(), config_path.clone())?;

    let base_url = format!("http://127.0.0.1:{}", http_port);
    wait_for_http_health(&base_url).await?;

    let endpoint = format!("http://127.0.0.1:{http_port}");
    let mut client = wait_for_grpc(&endpoint).await?;

    let aggregate_type = "grpc-person";
    let aggregate_id = "gp-001";
    let event_note = "gRPC regression bootstrap";
    let payload_json = json!({
        "status": "active",
        "notes": "Created via gRPC regression test"
    })
    .to_string();

    let mut append_request = Request::new(AppendEventRequest {
        aggregate_type: aggregate_type.to_string(),
        aggregate_id: aggregate_id.to_string(),
        event_type: "person-upserted".to_string(),
        payload_json: payload_json.clone(),
        note: Some(event_note.to_string()),
        patch_json: None,
    });
    set_bearer(&mut append_request, &token)?;
    let append_response = client.append_event(append_request).await?;
    let event = append_response
        .into_inner()
        .event
        .expect("appendEvent should return an event");

    assert_eq!(event.aggregate_type, aggregate_type);
    assert_eq!(event.aggregate_id, aggregate_id);
    assert_eq!(event.event_type, "person-upserted");
    assert_eq!(event.version, 1);
    let metadata = event
        .metadata
        .as_ref()
        .expect("event metadata should be present");
    assert_eq!(metadata.note.as_deref(), Some(event_note));
    let event_payload: serde_json::Value = serde_json::from_str(&event.payload_json)?;
    assert_eq!(event_payload["notes"], "Created via gRPC regression test");
    let first_merkle_root = event.merkle_root.clone();

    let aggregates_response = client
        .list_aggregates(ListAggregatesRequest { skip: 0, take: 10 })
        .await?
        .into_inner();
    let aggregate_entry = aggregates_response
        .aggregates
        .iter()
        .find(|agg| agg.aggregate_type == aggregate_type && agg.aggregate_id == aggregate_id)
        .expect("aggregate should appear in aggregates list");
    assert_eq!(aggregate_entry.version, 1);
    assert_eq!(
        aggregate_entry.state.get("status").map(String::as_str),
        Some("active")
    );

    let aggregate_response = client
        .get_aggregate(GetAggregateRequest {
            aggregate_type: aggregate_type.to_string(),
            aggregate_id: aggregate_id.to_string(),
        })
        .await?
        .into_inner()
        .aggregate
        .expect("aggregate query should return a record");
    assert_eq!(aggregate_response.version, 1);
    assert_eq!(
        aggregate_response.state.get("notes").map(String::as_str),
        Some("Created via gRPC regression test")
    );

    let patch_document = json!([
        { "op": "replace", "path": "/status", "value": "inactive" },
        { "op": "add", "path": "/contact", "value": { "address": { "city": "Spokane" } } }
    ])
    .to_string();
    let patch_note = "gRPC regression patch";

    let mut patch_request = Request::new(AppendEventRequest {
        aggregate_type: aggregate_type.to_string(),
        aggregate_id: aggregate_id.to_string(),
        event_type: "person-patched".to_string(),
        payload_json: String::new(),
        note: Some(patch_note.to_string()),
        patch_json: Some(patch_document.clone()),
    });
    set_bearer(&mut patch_request, &token)?;
    let patch_response = client.append_event(patch_request).await?;
    let patch_event = patch_response
        .into_inner()
        .event
        .expect("patch append should return an event");
    let merkle_root = patch_event.merkle_root.clone();
    let patch_metadata = patch_event
        .metadata
        .as_ref()
        .expect("patch metadata should be present");
    assert_eq!(patch_metadata.note.as_deref(), Some(patch_note));

    let events_response = client
        .list_events(ListEventsRequest {
            aggregate_type: aggregate_type.to_string(),
            aggregate_id: aggregate_id.to_string(),
            skip: 0,
            take: 10,
        })
        .await?
        .into_inner();
    assert_eq!(events_response.events.len(), 2);
    let first_event = &events_response.events[0];
    assert_eq!(first_event.event_type, "person-upserted");
    assert_eq!(first_event.version, 1);
    assert_eq!(first_event.merkle_root, first_merkle_root);
    assert_eq!(
        first_event
            .metadata
            .as_ref()
            .and_then(|meta| meta.note.as_deref()),
        Some(event_note)
    );
    let first_payload: serde_json::Value = serde_json::from_str(&first_event.payload_json)?;
    assert_eq!(
        first_payload["status"], "active",
        "payload status should match"
    );
    let second_event = &events_response.events[1];
    assert_eq!(second_event.event_type, "person-patched");
    assert_eq!(second_event.version, 2);
    assert_eq!(
        second_event
            .metadata
            .as_ref()
            .and_then(|meta| meta.note.as_deref()),
        Some(patch_note)
    );
    let second_payload: serde_json::Value = serde_json::from_str(&second_event.payload_json)?;
    assert_eq!(second_payload["status"], "inactive");
    assert_eq!(
        second_payload["contact"]["address"]["city"], "Spokane",
        "patch should update nested contact address"
    );

    let aggregate_after_patch = client
        .get_aggregate(GetAggregateRequest {
            aggregate_type: aggregate_type.to_string(),
            aggregate_id: aggregate_id.to_string(),
        })
        .await?
        .into_inner()
        .aggregate
        .expect("aggregate query after patch should return a record");
    assert_eq!(aggregate_after_patch.version, 2);
    assert_eq!(
        aggregate_after_patch
            .state
            .get("status")
            .map(String::as_str),
        Some("inactive")
    );

    let verify_response = client
        .verify_aggregate(VerifyAggregateRequest {
            aggregate_type: aggregate_type.to_string(),
            aggregate_id: aggregate_id.to_string(),
        })
        .await?
        .into_inner();
    assert_eq!(verify_response.merkle_root, merkle_root);

    let unauthorized_result = client
        .append_event(Request::new(AppendEventRequest {
            aggregate_type: aggregate_type.to_string(),
            aggregate_id: "gp-unauthorized".to_string(),
            event_type: "person-upserted".to_string(),
            payload_json,
            note: Some(event_note.to_string()),
            patch_json: None,
        }))
        .await;
    match unauthorized_result {
        Err(status) => assert_eq!(status.code(), Code::Unauthenticated),
        Ok(_) => panic!("unauthorized append should fail"),
    }

    server_handle.abort();
    let _ = server_handle.await;

    Ok(())
}

fn allocate_port() -> std::io::Result<u16> {
    let listener = TcpListener::bind(("127.0.0.1", 0))?;
    let port = listener.local_addr()?.port();
    drop(listener);
    Ok(port)
}

fn spawn_server(
    config: Config,
    config_path: PathBuf,
) -> TestResult<JoinHandle<eventdbx::error::Result<()>>> {
    Ok(tokio::spawn(async move {
        server::run(config, config_path).await
    }))
}

async fn wait_for_http_health(base_url: &str) -> TestResult<()> {
    for _ in 0..40 {
        if let Ok(resp) = reqwest::get(format!("{base_url}/health")).await {
            if resp.status().is_success() {
                return Ok(());
            }
        }
        sleep(Duration::from_millis(100)).await;
    }
    Err("server did not become healthy in time".into())
}

async fn wait_for_grpc(
    endpoint: &str,
) -> TestResult<EventServiceClient<tonic::transport::Channel>> {
    for _ in 0..40 {
        match EventServiceClient::connect(endpoint.to_string()).await {
            Ok(client) => return Ok(client),
            Err(_) => sleep(Duration::from_millis(100)).await,
        }
    }
    Err("grpc server did not become ready in time".into())
}

fn set_bearer<T>(request: &mut Request<T>, token: &str) -> Result<(), Status> {
    let value = format!("Bearer {token}");
    let header_value = value
        .parse()
        .map_err(|_| Status::unauthenticated("invalid bearer token"))?;
    request.metadata_mut().insert("authorization", header_value);
    Ok(())
}