umadb-client 0.5.0

gRPC client library for UmaDB event store
Documentation
use umadb_client::UmaDbClient;
use umadb_dcb::{
    DcbAppendCondition, DcbError, DcbEvent, DcbEventStoreSync, DcbQuery, DcbQueryItem, TrackingInfo,
};
use uuid::Uuid;

fn main() -> Result<(), Box<dyn std::error::Error>> {
    // Connect to the gRPC server
    let url = "https://localhost:50051".to_string();
    let client = UmaDbClient::new(url)
        .ca_path("server.pem".to_string()) // For self-signed server certificates.
        .api_key("umadb:example-api-key-4f7c2b1d9e5f4a038c1a".to_string())
        .connect()?;

    // Define a consistency boundary
    let cb = DcbQuery {
        items: vec![DcbQueryItem {
            types: vec!["example".to_string()],
            tags: vec!["tag1".to_string(), "tag2".to_string()],
        }],
    };

    // Read events for a decision model
    let mut read_response = client.read(Some(cb.clone()), None, false, None, false)?;

    // Build decision model
    while let Some(result) = read_response.next() {
        match result {
            Ok(event) => {
                println!(
                    "Got event at position {}: {:?}",
                    event.position, event.event
                );
            }
            Err(status) => panic!("gRPC stream error: {}", status),
        }
    }

    // Remember the last-known position
    let last_known_position = read_response.head().unwrap();
    println!("Last known position is: {:?}", last_known_position);

    // Produce new event
    let event = DcbEvent {
        event_type: "example".to_string(),
        tags: vec!["tag1".to_string(), "tag2".to_string()],
        data: b"Hello, world!".to_vec(),
        uuid: Some(Uuid::new_v4()),
    };

    // Append event in consistency boundary
    let commit_position1 = client.append(
        vec![event.clone()],
        Some(DcbAppendCondition {
            fail_if_events_match: cb.clone(),
            after: last_known_position,
        }),
        None,
    )?;
    println!("Appended event at position: {}", commit_position1);

    // Append conflicting event - expect an error
    let conflicting_event = DcbEvent {
        event_type: "example".to_string(),
        tags: vec!["tag1".to_string(), "tag2".to_string()],
        data: b"Hello, world!".to_vec(),
        uuid: Some(Uuid::new_v4()), // different UUID
    };
    let conflicting_result = client.append(
        vec![conflicting_event],
        Some(DcbAppendCondition {
            fail_if_events_match: cb.clone(),
            after: last_known_position,
        }),
        None,
    );

    // Expect an integrity error
    match conflicting_result {
        Err(DcbError::IntegrityError(integrity_error)) => {
            println!("Conflicting event was rejected: {:?}", integrity_error);
        }
        other => panic!("Expected IntegrityError, got {:?}", other),
    }

    // Conditional appends with event UUIDs are idempotent.
    println!(
        "Retrying to append event at position: {:?}",
        last_known_position
    );
    let commit_position2 = client.append(
        vec![event.clone()],
        Some(DcbAppendCondition {
            fail_if_events_match: cb.clone(),
            after: last_known_position,
        }),
        None,
    )?;

    if commit_position1 == commit_position2 {
        println!(
            "Append method returned same commit position: {}",
            commit_position2
        );
    } else {
        panic!("Expected idempotent retry!")
    }

    // Subscribe to all events for a projection
    let mut subscription = client.read(None, None, false, None, true)?;

    // Build an up-to-date view
    while let Some(result) = subscription.next() {
        match result {
            Ok(ev) => {
                println!("Processing event at {}: {:?}", ev.position, ev.event);
                if ev.position == commit_position2 {
                    println!("Projection has processed new event!");
                    break;
                }
            }
            Err(status) => panic!("gRPC stream error: {}", status),
        }
    }

    // Track an upstream position
    let upstream_position = client.get_tracking_info("upstream")?;
    let next_upstream_position = upstream_position.unwrap_or(0) + 1;
    println!("Next upstream position: {next_upstream_position}");
    client.append(
        vec![],
        None,
        Some(TrackingInfo {
            source: "upstream".to_string(),
            position: next_upstream_position,
        }),
    )?;
    assert_eq!(
        next_upstream_position,
        client.get_tracking_info("upstream")?.unwrap()
    );
    println!("Upstream position tracked okay!");

    // Try recording the same upstream position
    let conflicting_result = client.append(
        vec![],
        None,
        Some(TrackingInfo {
            source: "upstream".to_string(),
            position: next_upstream_position,
        }),
    );

    // Expect an integrity error
    match conflicting_result {
        Err(DcbError::IntegrityError(integrity_error)) => {
            println!(
                "Conflicting upstream position was rejected: {:?}",
                integrity_error
            );
        }
        other => panic!("Expected IntegrityError, got {:?}", other),
    }

    Ok(())
}