EvidentSource Client
A Rust client library for connecting to EvidentSource servers. This library provides a high-level API for event sourcing with bi-temporal query support.
Features
- Async/await API built on Tokio
- Automatic TLS support for HTTPS connections
- Live subscriptions with automatic reconnection
- Bi-temporal queries (revision time + effective time)
- Speculative transactions for previewing uncommitted changes
- Type-safe domain types from
evidentsource-core
API Layers
This crate provides two API layers:
High-Level API (Recommended)
The client module provides type-safe access using domain types:
use ;
let es = connect_to_server.await?;
let conn = es.connect.await?;
The client module re-exports all commonly used domain types, so you typically
only need one import path.
Low-Level gRPC API (Advanced)
The grpc module provides direct protocol buffer access:
use ;
let mut client = new.await?;
let db = client.fetch_latest_database.await?;
Use this for custom tooling, debugging, or direct gRPC integration.
Installation
Add to your Cargo.toml:
[]
= { = "../client" }
= { = "../core" }
= { = "1", = ["macros", "rt-multi-thread"] }
= "0.3"
Quick Start
use ;
use DatabaseProvider;
use StreamExt;
async
Core Concepts
EvidentSource
The main entry point for connecting to a server. Use it to:
- List available databases
- Create and delete databases
- Connect to a specific database
let es = connect_to_server.await?;
Connection
A long-lived connection to a specific database that maintains a background subscription for live updates. It provides methods to access database state at different revisions.
let conn = es.connect.await?;
DatabaseAtRevision
A point-in-time view of the database at a specific revision. Use it to query events and fetch state views.
let db = conn.latest_database.await?;
let events = db.query_events;
let state = db.view_state.await?;
Bi-temporal Model
EvidentSource supports two time dimensions:
- Revision time: When events were recorded in the database (immutable)
- Effective time: When events are considered to have occurred in the business domain
This allows queries like "what was the account balance on March 1st, as known on March 15th?"
API Guide
Connecting to a Server
use EvidentSource;
// HTTP connection
let es = connect_to_server.await?;
// HTTPS connection (TLS automatically configured)
let es = connect_to_server.await?;
Database Catalog Operations
use DatabaseCatalog;
use StreamExt;
// List all databases
let mut databases = es.list_databases;
while let Some = databases.next.await
// Create a database
let db_identity = es.create_database.await?;
println!;
// Delete a database
es.delete_database.await?;
Connecting to a Database
let conn = es.connect.await?;
// The connection maintains a live subscription for updates
println!;
Working with Revisions
use DatabaseProvider;
// Get the latest revision (network call)
let db = conn.latest_database.await?;
// Get the local cached revision (no network call, uses subscription)
let db = conn.local_database;
// Get a specific revision (waits if not yet available)
let db = conn.database_at_revision.await?;
// Get the revision that was current at a specific timestamp
use ;
let timestamp: = "2024-03-15T10:00:00Z".parse?;
let db = conn.database_at_timestamp.await?;
Querying Events
use EventSelector;
use DatabaseAtRevision;
use StreamExt;
let db = conn.latest_database.await?;
// Query by event type
let selector = event_type_equals?;
let events: = db.query_events.collect.await;
// Query by subject
let selector = subject_equals?;
let events: = db.query_events.collect.await;
// Combine selectors with method chaining
let selector = event_type_equals?
.and;
let events: = db.query_events.collect.await;
// OR combinations
let selector = event_type_equals?
.or;
Fetching State Views
use ;
use DatabaseAtRevision;
let db = conn.latest_database.await?;
// Fetch a state view
let view_name = new?;
let state_view: StateView = db.view_state.await?;
// Access the state view content
if let Some = state_view.content
Bi-temporal Queries (Effective Timestamp)
use ;
use DatabaseAtRevision;
let db = conn.latest_database.await?;
// Scope to an effective timestamp
let effective_time: = "2024-03-01T00:00:00Z".parse?;
let view = db.at_effective_timestamp;
// State views will be computed as-of the effective timestamp
let state = view.view_state.await?;
Speculative Operations
Preview what the database would look like with uncommitted events:
use EventBuilderV10;
use DatabaseAtRevision;
use NonEmpty;
use Uuid;
let db = conn.latest_database.await?;
// Create speculative events using official CloudEvents SDK
// Stream is set via .source() - it's an alias for source in EvidentSource
let stream = "orders/2024-03-15";
let event = new
.id
.source
.ty
.subject
.data
.build?;
// Create a speculative view
let speculative = db.speculate_with_transaction;
// Query events (includes speculated events)
let selector = subject_equals?;
let all_events: = speculative.query_events.collect.await;
// Chain multiple speculative transactions
let another_event = new
.id
.source
.ty
.subject
.data
.build?;
let speculative2 = speculative.speculate_with_transaction;
Creating CloudEvents
Use the official cloudevents-sdk crate for type-safe event construction:
use EventBuilderV10;
use Uuid;
// Build a CloudEvent with the fluent builder API
// Stream is set via .source() - it's an alias for source in EvidentSource
let stream = "orders/2024-03-15";
let event = new
.id
.source
.ty
.subject
.data
.build?;
Transacting Events
use ;
use DatabaseConnection;
use NonEmpty;
// Create events to transact
let events = new;
// Optional: add constraints for optimistic concurrency
let constraints = vec!;
// Commit the transaction
let new_db = conn.transact.await?;
println!;
Executing State Changes
use ;
use DatabaseConnection;
// Using the builder pattern
let state_change = new?;
let request = json?
.header;
let new_db = conn.execute_state_change.await?;
// Or using the state change builder for a fluent API
let new_db = conn
.state_change
.json?
.execute
.await?;
// With content schema for validation
let new_db = conn
.state_change
.json?
.content_schema
.execute
.await?;
Scanning the Database Log
use DatabaseConnection;
use StreamExt;
// Get batch summaries
let mut summaries = conn.log;
while let Some = summaries.next.await
// Get full batch details including events
let mut batches = conn.log_detail;
while let Some = batches.next.await
Testing with MockDatabase
The SDK provides MockDatabase for unit testing state changes without a server:
use ;
use EventSelector;
use NonEmpty;
Error Handling
Client Errors
use Error;
match connect_to_server.await
Database Errors
use DatabaseError;
match es.connect.await
State Change Errors
use StateChangeError;
match conn.execute_state_change.await
Connection Lifecycle
Live Subscriptions
When you call es.connect(&db_name), the connection:
- Fetches the initial database state
- Starts a background task that subscribes to database updates
- Automatically reconnects with exponential backoff if the connection drops
- Updates internal state so
conn.local_database()always returns recent data
Graceful Shutdown
Always close connections when done to clean up background tasks:
// Explicit close with timeout
conn.close.await?;
If not explicitly closed, the background task will be signaled to stop when the Connection is dropped.
Cloning Connections
Connection is cheaply cloneable. All clones share the same underlying subscription:
let conn2 = conn.clone;
// Both conn and conn2 see the same updates
Low-Level gRPC Access
For advanced use cases requiring direct protocol buffer access, use the grpc module:
use ;
// Proto types for manual construction
use *;
use *;
let mut client = new.await?;
// Direct proto methods
let db = client.fetch_latest_database.await?;
Use this for:
- Custom tooling and debugging
- Direct gRPC integration with other systems
- Low-level protocol buffer manipulation
Most applications should use the high-level client module instead.