// eventdbx 1.7.13
//
// An event-sourced, key-value, write-side database system.
// Documentation
use std::{net::TcpListener, time::Duration};

use base64::{Engine, engine::general_purpose::STANDARD};
use eventdbx::{
    config::Config,
    plugin::PluginManager,
    server,
    token::{IssueTokenInput, TokenManager},
};
use reqwest::Client;
use serde_json::{Value, json};
use tempfile::TempDir;
use tokio::{task::JoinHandle, time::sleep};

/// Result alias for this test file: the boxed `dyn Error + Send + Sync` error
/// lets `?` propagate any error type that converts into it.
type TestResult<T> = Result<T, Box<dyn std::error::Error + Send + Sync>>;

/// End-to-end regression test for the GraphQL surface.
///
/// Starts a server on a freshly allocated loopback port with a temporary data
/// directory, then:
///
/// 1. appends one event through the `appendEvent` mutation using a bearer token,
/// 2. reads it back through `aggregates`, `aggregate`, `aggregateEvents` and
///    `verifyAggregate` without a token,
/// 3. checks that the same mutation without a token yields an `unauthorized`
///    GraphQL error.
///
/// The test returns `Ok(())` early (i.e. is skipped) when binding a local port
/// fails with `PermissionDenied`.
#[tokio::test(flavor = "multi_thread")]
async fn graphql_append_and_query_flow() -> TestResult<()> {
    // --- Server configuration -------------------------------------------
    let temp = TempDir::new()?;
    let mut config = Config::default();
    config.data_dir = temp.path().join("data");
    let port = match allocate_port() {
        Ok(port) => port,
        // Sandboxed environments may forbid binding sockets; skip instead of failing.
        Err(err) if err.kind() == std::io::ErrorKind::PermissionDenied => {
            eprintln!("skipping graphql regression test: port binding not permitted ({err})");
            return Ok(());
        }
        Err(err) => return Err(err.into()),
    };
    config.port = port;
    // NOTE(review): presumably `restrict = false` is what allows the token-less
    // read queries below to succeed — confirm against the server configuration.
    config.restrict = false;
    // Fixed 32-byte key, base64-encoded, so the run is deterministic.
    config.data_encryption_key = Some(STANDARD.encode([42u8; 32]));
    config.ensure_data_dir()?;

    // --- Issue a token for the authorized mutation ------------------------
    let encryptor = config
        .encryption_key()?
        .expect("encryption key should be configured");
    let token_manager = TokenManager::load(config.tokens_path(), Some(encryptor.clone()))?;
    let token = token_manager
        .issue(IssueTokenInput {
            group: "testers".into(),
            user: "graphql-regression".into(),
            expiration_secs: Some(3600),
            limit: None,
            keep_alive: true,
        })?
        .token;
    // NOTE(review): dropped explicitly before the server starts, presumably so
    // the server is the only holder of the token store — confirm.
    drop(token_manager);

    // --- Start the server and wait until it answers /health ---------------
    let plugins = PluginManager::from_config(&config)?;
    let server_handle = spawn_server(config.clone(), plugins)?;

    let base_url = format!("http://127.0.0.1:{}", config.port);
    wait_for_health(&base_url).await?;
    let client = Client::new();

    // --- Append one event through the mutation ----------------------------
    let aggregate_type = "graphql-person";
    let aggregate_id = "gp-001";
    let mutation = r#"
        mutation Append($input: AppendEventInput!) {
            appendEvent(input: $input) {
                aggregateType
                aggregateId
                eventType
                version
                payload
                merkleRoot
            }
        }
    "#;

    let event_payload = json!({
        "status": "active",
        "notes": "Created via GraphQL regression test"
    });

    let mutation_response = graphql_call(
        &client,
        &base_url,
        Some(&token),
        mutation,
        json!({ "input": {
            "aggregateType": aggregate_type,
            "aggregateId": aggregate_id,
            "eventType": "person-upserted",
            "payload": event_payload.clone()
        }}),
    )
    .await?;

    assert!(
        mutation_response.get("errors").is_none(),
        "unexpected GraphQL errors: {:?}",
        mutation_response
    );
    let append_event = &mutation_response["data"]["appendEvent"];
    assert_eq!(
        append_event["aggregateType"].as_str(),
        Some(aggregate_type),
        "aggregate type should match"
    );
    assert_eq!(
        append_event["aggregateId"].as_str(),
        Some(aggregate_id),
        "aggregate id should match"
    );
    assert_eq!(
        append_event["eventType"].as_str(),
        Some("person-upserted"),
        "event type should match"
    );
    assert_eq!(
        append_event["version"].as_u64(),
        Some(1),
        "version should start at 1"
    );
    // Kept to compare against the roots reported by the read-side queries.
    let merkle_root = append_event["merkleRoot"]
        .as_str()
        .expect("merkle root should be present")
        .to_string();

    // --- List aggregates and locate the one just written ------------------
    let aggregates_query = r#"
        query Aggregates($take: Int!) {
            aggregates(take: $take) {
                aggregateType
                aggregateId
                version
                state
            }
        }
    "#;
    let aggregates_response = graphql_call(
        &client,
        &base_url,
        None,
        aggregates_query,
        json!({ "take": 10 }),
    )
    .await?;
    assert!(
        aggregates_response.get("errors").is_none(),
        "aggregates query should succeed: {:?}",
        aggregates_response
    );
    let aggregates = aggregates_response["data"]["aggregates"]
        .as_array()
        .expect("aggregates response should be array");
    let found = aggregates.iter().find(|entry| {
        entry["aggregateType"].as_str() == Some(aggregate_type)
            && entry["aggregateId"].as_str() == Some(aggregate_id)
    });
    let aggregate_entry = found.expect("aggregate should be returned by aggregates query");
    assert_eq!(
        aggregate_entry["version"].as_u64(),
        Some(1),
        "aggregate version should be 1"
    );
    assert_eq!(
        aggregate_entry["state"]["status"].as_str(),
        Some("active"),
        "aggregate state should include persisted status"
    );

    // --- Fetch the single aggregate by type and id ------------------------
    let aggregate_query = r#"
        query Aggregate($aggregateType: String!, $aggregateId: String!) {
            aggregate(aggregateType: $aggregateType, aggregateId: $aggregateId) {
                aggregateType
                aggregateId
                version
                state
            }
        }
    "#;
    let aggregate_response = graphql_call(
        &client,
        &base_url,
        None,
        aggregate_query,
        json!({
            "aggregateType": aggregate_type,
            "aggregateId": aggregate_id
        }),
    )
    .await?;
    assert!(
        aggregate_response.get("errors").is_none(),
        "aggregate query should succeed: {:?}",
        aggregate_response
    );
    let aggregate = aggregate_response["data"]["aggregate"]
        .as_object()
        .expect("aggregate query should return an object");
    assert_eq!(
        aggregate.get("aggregateType").and_then(Value::as_str),
        Some(aggregate_type),
        "aggregate query type should match"
    );
    assert_eq!(
        aggregate.get("aggregateId").and_then(Value::as_str),
        Some(aggregate_id),
        "aggregate query id should match"
    );
    assert_eq!(
        aggregate.get("version").and_then(Value::as_u64),
        Some(1),
        "aggregate version should be 1"
    );
    let aggregate_state = aggregate
        .get("state")
        .and_then(Value::as_object)
        .expect("aggregate state should be an object");
    assert_eq!(
        aggregate_state.get("notes").and_then(Value::as_str),
        Some("Created via GraphQL regression test"),
        "aggregate state should retain notes"
    );

    // --- Read the event stream for the aggregate --------------------------
    let events_query = r#"
        query Events($aggregateType: String!, $aggregateId: String!) {
            aggregateEvents(aggregateType: $aggregateType, aggregateId: $aggregateId) {
                eventType
                version
                payload
                merkleRoot
            }
        }
    "#;
    let events_response = graphql_call(
        &client,
        &base_url,
        None,
        events_query,
        json!({
            "aggregateType": aggregate_type,
            "aggregateId": aggregate_id
        }),
    )
    .await?;
    assert!(
        events_response.get("errors").is_none(),
        "events query should succeed: {:?}",
        events_response
    );
    let events = events_response["data"]["aggregateEvents"]
        .as_array()
        .expect("events response should be array");
    assert_eq!(events.len(), 1, "should be exactly one event");
    let first_event = &events[0];
    assert_eq!(
        first_event["eventType"].as_str(),
        Some("person-upserted"),
        "event type should match seeded value"
    );
    assert_eq!(
        first_event["version"].as_u64(),
        Some(1),
        "event version should be 1"
    );
    assert_eq!(
        first_event["payload"]["status"].as_str(),
        Some("active"),
        "payload status should match"
    );
    assert_eq!(
        first_event["merkleRoot"].as_str(),
        Some(merkle_root.as_str()),
        "event merkle root should align with mutation output"
    );

    // --- Verify the aggregate's merkle root -------------------------------
    let verify_query = r#"
        query Verify($aggregateType: String!, $aggregateId: String!) {
            verifyAggregate(aggregateType: $aggregateType, aggregateId: $aggregateId) {
                merkleRoot
            }
        }
    "#;
    let verify_response = graphql_call(
        &client,
        &base_url,
        None,
        verify_query,
        json!({
            "aggregateType": aggregate_type,
            "aggregateId": aggregate_id
        }),
    )
    .await?;
    // Checked like every other successful call so a server-side failure reports
    // the GraphQL error payload rather than only a missing merkle root.
    assert!(
        verify_response.get("errors").is_none(),
        "verifyAggregate query should succeed: {:?}",
        verify_response
    );
    let verify_merkle = verify_response["data"]["verifyAggregate"]["merkleRoot"]
        .as_str()
        .expect("verifyAggregate should return merkle root");
    assert_eq!(
        verify_merkle, merkle_root,
        "verifyAggregate should return the event merkle root"
    );

    // --- The mutation must be rejected without a token --------------------
    let unauthorized_response = graphql_call(
        &client,
        &base_url,
        None,
        mutation,
        json!({ "input": {
            "aggregateType": aggregate_type,
            "aggregateId": "gp-unauthorized",
            "eventType": "person-upserted",
            "payload": event_payload
        }}),
    )
    .await?;
    let errors = unauthorized_response["errors"]
        .as_array()
        .expect("unauthorized mutation should return errors");
    assert!(
        errors
            .iter()
            .any(|err| err["message"].as_str() == Some("unauthorized")),
        "unauthorized mutation should include unauthorized error"
    );

    // Stop the server task; the join result (a cancellation error after
    // `abort`) is intentionally ignored.
    server_handle.abort();
    let _ = server_handle.await;
    Ok(())
}

/// Asks the OS for a free loopback port by binding `127.0.0.1:0`, reads back
/// the assigned port number, and releases the socket before returning.
///
/// # Errors
///
/// Returns the I/O error from binding the listener or from querying its local
/// address.
fn allocate_port() -> std::io::Result<u16> {
    TcpListener::bind(("127.0.0.1", 0))
        // The listener is moved into the closure and dropped when it returns,
        // so the port is free again by the time the caller receives it.
        .and_then(|probe| probe.local_addr())
        .map(|addr| addr.port())
}

/// Spawns `server::run(config, plugins)` as a Tokio task and returns its join
/// handle so the caller can abort and await it.
fn spawn_server(
    config: Config,
    plugins: PluginManager,
) -> TestResult<JoinHandle<eventdbx::error::Result<()>>> {
    // Both arguments are moved into the task.
    let serve = async move { server::run(config, plugins).await };
    let handle = tokio::spawn(serve);
    Ok(handle)
}

/// Polls `{base_url}/health` until it answers with a success status.
///
/// Makes up to 40 attempts, sleeping 100 ms after each unsuccessful one.
/// Request errors and non-success statuses both count as "not ready yet".
///
/// # Errors
///
/// Returns an error if no attempt gets a success status.
async fn wait_for_health(base_url: &str) -> TestResult<()> {
    let probe = Client::new();
    let health_url = format!("{base_url}/health");

    let mut attempts_left = 40;
    while attempts_left > 0 {
        // Reduce the reply to a flag first so the response is dropped before sleeping.
        let healthy = match probe.get(health_url.as_str()).send().await {
            Ok(reply) => reply.status().is_success(),
            Err(_) => false,
        };
        if healthy {
            return Ok(());
        }
        sleep(Duration::from_millis(100)).await;
        attempts_left -= 1;
    }

    Err("server did not become healthy in time".into())
}

/// Posts a GraphQL request to `{base_url}/graphql` and returns the decoded
/// JSON response body.
///
/// * `token` — when `Some`, sent as a bearer token; when `None`, no
///   authorization is attached.
/// * `query` / `variables` — sent as the `query` and `variables` fields of the
///   JSON request body.
///
/// # Errors
///
/// Returns an error if the request cannot be sent, if the HTTP status is not a
/// success (the message includes the status and the response body text), or if
/// the response body is not valid JSON.
async fn graphql_call(
    client: &Client,
    base_url: &str,
    token: Option<&str>,
    query: &str,
    variables: Value,
) -> TestResult<Value> {
    let endpoint = format!("{base_url}/graphql");
    let builder = client
        .post(endpoint)
        .header("Content-Type", "application/json");
    let builder = match token {
        Some(bearer) => builder.bearer_auth(bearer),
        None => builder,
    };

    let envelope = json!({ "query": query, "variables": variables });
    let response = builder.json(&envelope).send().await?;

    let status = response.status();
    if status.is_success() {
        let decoded: Value = response.json().await?;
        return Ok(decoded);
    }

    // Best effort: an unreadable error body is reported as an empty string.
    let body = response.text().await.unwrap_or_default();
    Err(format!("graphql request failed ({status}): {body}").into())
}