use umadb_client::UmaDbClient;
use umadb_dcb::{
DcbAppendCondition, DcbError, DcbEvent, DcbEventStoreSync, DcbQuery, DcbQueryItem, TrackingInfo,
};
use uuid::Uuid;
/// Walks through a UmaDB client session against a server at `https://localhost:50051`.
///
/// The walkthrough performs, in order:
/// 1. a read of the events matching a query, remembering the head position reported afterwards;
/// 2. a conditional append guarded by that query and position;
/// 3. a second conditional append of a different event, which is expected to be rejected;
/// 4. a retry of the first append, which is expected to return the same commit position;
/// 5. a second read that is consumed until the appended event is seen;
/// 6. an append that only records tracking info for the `"upstream"` source, followed by a
///    repeat of that append, which is expected to be rejected.
///
/// # Errors
///
/// Returns any error produced by the client calls, including errors yielded while iterating
/// over a read response.
///
/// # Panics
///
/// Panics when the server does not behave as the walkthrough expects: a conflicting append is
/// not rejected with `DcbError::IntegrityError`, the retried append returns a different commit
/// position, or the tracked upstream position does not match what was just recorded.
fn main() -> Result<(), Box<dyn std::error::Error>> {
    // NOTE(review): URL, CA file and API key are hard-coded example values; a real deployment
    // would presumably load them from configuration or the environment.
    let url = "https://localhost:50051".to_string();
    let client = UmaDbClient::new(url)
        .ca_path("server.pem".to_string())
        .api_key("umadb:example-api-key-4f7c2b1d9e5f4a038c1a".to_string())
        .connect()?;

    // The query used both for reading and as the `fail_if_events_match` part of every
    // append condition below.
    let cb = DcbQuery {
        items: vec![DcbQueryItem {
            types: vec!["example".to_string()],
            tags: vec!["tag1".to_string(), "tag2".to_string()],
        }],
    };

    // Read the events currently matching the query.
    let mut read_response = client.read(Some(cb.clone()), None, false, None, false)?;
    while let Some(result) = read_response.next() {
        // Stream failures are I/O errors, so they are returned to the caller, not panicked on.
        let event = result.map_err(|status| format!("gRPC stream error: {status}"))?;
        println!(
            "Got event at position {}: {:?}",
            event.position, event.event
        );
    }

    // Remember the head position reported by the read; it anchors the append conditions.
    let last_known_position = read_response.head()?;
    println!("Last known position is: {:?}", last_known_position);

    // Every conditional append in this walkthrough uses the same query and position.
    let append_condition = || DcbAppendCondition {
        fail_if_events_match: cb.clone(),
        after: last_known_position,
    };

    let event = DcbEvent {
        event_type: "example".to_string(),
        tags: vec!["tag1".to_string(), "tag2".to_string()],
        data: b"Hello, world!".to_vec(),
        uuid: Some(Uuid::new_v4()),
    };
    let commit_position1 = client.append(vec![event.clone()], Some(append_condition()), None)?;
    println!("Appended event at position: {}", commit_position1);

    // Same type, tags and data as `event`, but a fresh UUID: with the now-stale condition this
    // append is expected to be rejected.
    let conflicting_event = DcbEvent {
        uuid: Some(Uuid::new_v4()),
        ..event.clone()
    };
    let conflicting_result = client.append(vec![conflicting_event], Some(append_condition()), None);
    match conflicting_result {
        Err(DcbError::IntegrityError(integrity_error)) => {
            println!("Conflicting event was rejected: {:?}", integrity_error);
        }
        other => panic!("Expected IntegrityError, got {:?}", other),
    }

    // Retry the original event (same UUID) with the same condition; the walkthrough expects
    // the commit position of the first append to be returned again.
    println!(
        "Retrying to append event at position: {:?}",
        last_known_position
    );
    let commit_position2 = client.append(vec![event], Some(append_condition()), None)?;
    if commit_position1 != commit_position2 {
        panic!("Expected idempotent retry!")
    }
    println!(
        "Append method returned same commit position: {}",
        commit_position2
    );

    // Read again without a query and stop once the appended event has been seen.
    // NOTE(review): the final `true` argument presumably requests a live subscription —
    // confirm against the client API.
    let mut subscription = client.read(None, None, false, None, true)?;
    while let Some(result) = subscription.next() {
        let ev = result.map_err(|status| format!("gRPC stream error: {status}"))?;
        println!("Processing event at {}: {:?}", ev.position, ev.event);
        if ev.position == commit_position2 {
            println!("Projection has processed new event!");
            break;
        }
    }

    // Record the next position for the "upstream" source; no events are appended.
    let upstream_position = client.get_tracking_info("upstream")?;
    let next_upstream_position = upstream_position.unwrap_or(0) + 1;
    println!("Next upstream position: {next_upstream_position}");
    client.append(
        vec![],
        None,
        Some(TrackingInfo {
            source: "upstream".to_string(),
            position: next_upstream_position,
        }),
    )?;
    // Comparing against `Some(..)` reports a missing value in the assertion output instead of
    // failing inside an `unwrap`.
    assert_eq!(
        Some(next_upstream_position),
        client.get_tracking_info("upstream")?
    );
    println!("Upstream position tracked okay!");

    // Recording the same upstream position a second time is expected to be rejected.
    let conflicting_result = client.append(
        vec![],
        None,
        Some(TrackingInfo {
            source: "upstream".to_string(),
            position: next_upstream_position,
        }),
    );
    match conflicting_result {
        Err(DcbError::IntegrityError(integrity_error)) => {
            println!(
                "Conflicting upstream position was rejected: {:?}",
                integrity_error
            );
        }
        other => panic!("Expected IntegrityError, got {:?}", other),
    }

    Ok(())
}