use crate::common::{fresh_stream_id, generate_events};
use chrono::{Datelike, Utc};
use eventstore::{
Acl, Client, ReadEvent, StreamAclBuilder, StreamMetadataBuilder, StreamMetadataResult,
StreamName, StreamPosition, SubscriptionEvent,
};
use futures::channel::oneshot;
use std::collections::HashMap;
use std::time::Duration;
use tracing::{debug, warn};
async fn test_write_events(client: &Client) -> eventstore::Result<()> {
let stream_id = fresh_stream_id("write_events");
let events = generate_events("write-events-test".to_string(), 3);
let result = client
.append_to_stream(stream_id, &Default::default(), events)
.await?;
debug!("Write response: {:?}", result);
assert_eq!(result.next_expected_version, 2);
Ok(())
}
async fn test_tick_date_conversion(client: &Client) -> eventstore::Result<()> {
let stream_id = fresh_stream_id("ticks_date");
let events = generate_events("about_date_stuff", 1);
client
.append_to_stream(stream_id.as_str(), &Default::default(), events)
.await?;
let mut stream = client
.read_stream(stream_id.as_str(), &Default::default())
.await?;
let event = stream.next().await?.unwrap();
let now = Utc::now();
let created = event.get_original_event().created;
assert_eq!(now.day(), created.day());
assert_eq!(now.year(), created.year());
assert_eq!(now.month(), created.month());
Ok(())
}
async fn test_read_all_stream_events(client: &Client) -> eventstore::Result<()> {
let result = client.read_all(&Default::default()).await?.next().await?;
assert!(result.is_some());
Ok(())
}
async fn test_read_stream_events(client: &Client) -> eventstore::Result<()> {
let stream_id = fresh_stream_id("read_stream_events");
let events = generate_events("es6-read-stream-events-test".to_string(), 10);
let _ = client
.append_to_stream(stream_id.clone(), &Default::default(), events)
.await?;
let mut pos = 0usize;
let mut idx = 0i64;
let mut stream = client.read_stream(stream_id, &Default::default()).await?;
while let Some(event) = stream.next().await? {
let event = event.get_original_event();
let obj: HashMap<String, i64> = event.as_json().unwrap();
let value = obj.get("event_index").unwrap();
idx = *value;
pos += 1;
}
assert_eq!(pos, 10);
assert_eq!(idx, 10);
Ok(())
}
async fn test_read_stream_events_with_position(client: &Client) -> eventstore::Result<()> {
let stream_id = fresh_stream_id("read_position");
let events = generate_events("read_position", 10);
let _ = client
.append_to_stream(stream_id.as_str(), &Default::default(), events)
.await?;
let options = eventstore::ReadStreamOptions::default()
.forwards()
.position(StreamPosition::Start);
let mut stream = client.read_stream(stream_id, &options).await?;
let mut last_stream_position = 0u64;
while let Some(event) = stream.next_read_event().await? {
if let ReadEvent::LastStreamPosition(pos) = event {
last_stream_position = pos;
}
}
assert_eq!(9, last_stream_position);
Ok(())
}
async fn test_read_stream_populates_log_position(client: &Client) -> eventstore::Result<()> {
let stream_id = fresh_stream_id("read_stream_populates_log_position");
let events = generate_events("read_stream_populates_log_position".to_string(), 1);
let write_result = client
.append_to_stream(stream_id.clone(), &Default::default(), events)
.await?;
assert_eq!(write_result.position.prepare, write_result.position.commit);
let mut pos = 0usize;
let mut stream = client.read_stream(stream_id, &Default::default()).await?;
while let Some(event) = stream.next().await? {
let event = event.get_original_event();
assert_eq!(write_result.position, event.position);
pos += 1;
}
assert_eq!(pos, 1);
Ok(())
}
async fn test_metadata(client: &Client) -> eventstore::Result<()> {
let stream_id = fresh_stream_id("metadata");
let events = generate_events("metadata-test".to_string(), 5);
let _ = client
.append_to_stream(stream_id.as_str(), &Default::default(), events)
.await?;
let expected = StreamMetadataBuilder::new()
.max_age(std::time::Duration::from_secs(2))
.acl(Acl::Stream(
StreamAclBuilder::new().add_read_roles("admin").build(),
))
.build();
let _ = client
.set_stream_metadata(stream_id.as_str(), &Default::default(), &expected)
.await?;
let actual = client
.get_stream_metadata(stream_id.as_str(), &Default::default())
.await?;
assert!(actual.is_success());
if let StreamMetadataResult::Success(actual) = actual {
assert_eq!(&expected, actual.metadata());
}
Ok(())
}
async fn test_metadata_not_exist(client: &Client) -> eventstore::Result<()> {
let stream_id = fresh_stream_id("metadata_not_exist");
let events = generate_events("metadata-test-not-exist".to_string(), 5);
let _ = client
.append_to_stream(stream_id.as_str(), &Default::default(), events)
.await?;
let actual = client
.get_stream_metadata(stream_id.as_str(), &Default::default())
.await?;
assert!(actual.is_not_found());
Ok(())
}
async fn test_read_stream_events_non_existent(client: &Client) -> eventstore::Result<()> {
let stream_id = fresh_stream_id("read_stream_events");
let mut stream = client
.read_stream(stream_id.as_str(), &Default::default())
.await?;
if let Err(eventstore::Error::ResourceNotFound) = stream.next().await {
return Ok(());
}
panic!("We expected to have a stream not found result");
}
async fn test_delete_stream(client: &Client) -> eventstore::Result<()> {
let stream_id = fresh_stream_id("delete");
let events = generate_events("delete-test".to_string(), 1);
let _ = client
.append_to_stream(stream_id.clone(), &Default::default(), events)
.await?;
let result = client
.delete_stream(stream_id.as_str(), &Default::default())
.await?;
debug!("Delete stream [{}] result: {:?}", stream_id, result);
Ok(())
}
async fn test_tombstone_stream(client: &Client) -> eventstore::Result<()> {
let stream_id = fresh_stream_id("tombstone");
let events = generate_events("tombstone-test".to_string(), 1);
let _ = client
.append_to_stream(stream_id.clone(), &Default::default(), events)
.await?;
let result = client
.tombstone_stream(stream_id.as_str(), &Default::default())
.await?;
debug!("Tombstone stream [{}] result: {:?}", stream_id, result);
let result = client
.read_stream(stream_id.as_str(), &Default::default())
.await;
if let Err(eventstore::Error::ResourceDeleted) = result {
Ok(())
} else {
panic!("Expected stream deleted error");
}
}
async fn test_subscription(client: &Client) -> eyre::Result<()> {
let stream_id = fresh_stream_id("catchup");
let events_before = generate_events("catchup-test-before".to_string(), 3);
let events_after = generate_events("catchup-test-after".to_string(), 3);
let _ = client
.append_to_stream(stream_id.as_str(), &Default::default(), events_before)
.await?;
let options = eventstore::SubscribeToStreamOptions::default()
.start_from(eventstore::StreamPosition::Start);
let mut sub = client
.subscribe_to_stream(stream_id.as_str(), &options)
.await;
let (tx, recv) = oneshot::channel();
tokio::spawn(async move {
let mut count = 0usize;
let max = 6usize;
loop {
sub.next().await?;
count += 1;
if count == max {
break;
}
}
tx.send(count).unwrap();
Ok(()) as eventstore::Result<()>
});
let _ = client
.append_to_stream(stream_id, &Default::default(), events_after)
.await?;
match tokio::time::timeout(Duration::from_secs(60), recv).await {
Ok(test_count) => {
assert_eq!(
test_count?, 6,
"We are testing proper state after catchup subscription: got {} expected {}.",
test_count?, 6
);
}
Err(_) => panic!("test_subscription timed out!"),
}
Ok(())
}
async fn test_subscription_caughtup(client: &Client) -> eventstore::Result<()> {
let info = client.server_info().await?;
if info.version() < (23, 10) {
warn!(
"test_susbcription_caughtup is skipped because server {} doesn't support it",
info.version()
);
return Ok(());
}
let stream_id = fresh_stream_id("catchup_live_detection").into_stream_name();
let events = generate_events("catchup_live_detected", 10);
let _ = client
.append_to_stream(stream_id.clone(), &Default::default(), events)
.await?;
let options = eventstore::SubscribeToStreamOptions::default()
.start_from(eventstore::StreamPosition::Start);
let mut sub = client
.subscribe_to_stream(stream_id.clone(), &options)
.await;
let (tx, recv) = oneshot::channel();
tokio::spawn(async move {
loop {
if let SubscriptionEvent::CaughtUp = sub.next_subscription_event().await? {
break;
}
}
let _ = tx.send(());
Ok(()) as eventstore::Result<()>
});
if tokio::time::timeout(Duration::from_secs(60), recv)
.await
.is_err()
{
panic!("test_subscription_caughtup timed out!");
}
Ok(())
}
async fn test_subscription_all_filter(client: &Client) -> eventstore::Result<()> {
let filter = eventstore::SubscriptionFilter::on_event_type().exclude_system_events();
let options = eventstore::SubscribeToAllOptions::default()
.position(eventstore::StreamPosition::Start)
.filter(filter);
let mut sub = client.subscribe_to_all(&options).await;
match tokio::time::timeout(Duration::from_secs(60), async move {
let event = sub.next().await?;
assert!(!event.get_original_event().event_type.starts_with('$'));
Ok(()) as eventstore::Result<()>
})
.await
{
Ok(result) => assert!(result.is_ok()),
Err(_) => panic!("we are supposed to receive event from that subscription"),
};
Ok(())
}
async fn test_batch_append(client: &Client) -> eventstore::Result<()> {
let batch_client = client.batch_append(&Default::default()).await?;
for _ in 0..3 {
let stream_id = fresh_stream_id("batch-append");
let events = generate_events("batch-append-type", 3);
let _ = batch_client
.append_to_stream(
stream_id.as_str(),
eventstore::ExpectedRevision::Any,
events,
)
.await?;
let options = eventstore::ReadStreamOptions::default()
.forwards()
.position(eventstore::StreamPosition::Start);
let mut stream = client.read_stream(stream_id.as_str(), &options).await?;
let mut cpt = 0usize;
while let Some(_) = stream.next().await? {
cpt += 1;
}
assert_eq!(cpt, 3, "We expecting 3 events out of those streams");
}
Ok(())
}
pub async fn tests(client: Client) -> eyre::Result<()> {
let info = client.server_info().await?;
debug!("Before test_write_events…");
test_write_events(&client).await?;
debug!("Complete");
debug!("Before test_tick_date_conversion…");
test_tick_date_conversion(&client).await?;
debug!("Complete");
debug!("Before test_all_read_stream_events…");
test_read_all_stream_events(&client).await?;
debug!("Complete");
debug!("Before test_read_stream_events…");
test_read_stream_events(&client).await?;
debug!("Complete");
if info.version() >= (21, 10) {
debug!("Before test_read_stream_events_with_position");
test_read_stream_events_with_position(&client).await?;
debug!("Complete");
}
if info.version() >= 22 {
debug!("Before test_read_stream_populates_log_position");
test_read_stream_populates_log_position(&client).await?;
}
debug!("Complete");
debug!("Before test_read_stream_events_non_existent");
test_read_stream_events_non_existent(&client).await?;
debug!("Complete");
debug!("Before test test_metadata");
test_metadata(&client).await?;
debug!("Complete");
debug!("Before test test_metadata_not_exist");
test_metadata_not_exist(&client).await?;
debug!("Complete");
debug!("Before test_delete_stream…");
test_delete_stream(&client).await?;
debug!("Complete");
debug!("Before test_tombstone_stream…");
test_tombstone_stream(&client).await?;
debug!("Complete");
debug!("Before test_subscription…");
test_subscription(&client).await?;
debug!("Complete");
debug!("Before test_subscription_caughtup…");
test_subscription_caughtup(&client).await?;
debug!("Complete");
debug!("Before test_subscription_all_filter…");
test_subscription_all_filter(&client).await?;
debug!("Complete");
debug!("Before test_batch_append");
if let Err(e) = test_batch_append(&client).await {
if let eventstore::Error::UnsupportedFeature = e {
warn!("batch_append is not supported on the server we are targeting");
Ok(())
} else {
Err(e)
}?;
}
debug!("Complete");
Ok(())
}