use std::fmt::Display;
use rand::{thread_rng, Rng};
use thingvellir::{
service_builder, Commit, CommitToUpstream, DataCommitRequest, DataLoadRequest,
DefaultCommitPolicy, LoadFromUpstream, ServiceData, ShardStats,
};
use tokio::time::{sleep, Duration};
#[derive(Clone, Default)]
struct InMemoryUpstream {}
impl<Key: Display, Data: Default> LoadFromUpstream<Key, Data> for InMemoryUpstream {
fn load(&mut self, request: DataLoadRequest<Key, Data>) {
request.resolve(Default::default());
}
}
impl<Key: Display, Data: Default> CommitToUpstream<Key, Data> for InMemoryUpstream {
fn commit<'a>(&mut self, request: DataCommitRequest<'a, Key, Data>) {
request.resolve();
}
}
#[derive(Default, Clone)]
struct Counter {
s: u64,
}
impl Counter {
#[allow(dead_code)]
fn incr(&mut self) -> u64 {
self.s += 1;
self.s
}
fn get(&self) -> u64 {
self.s
}
}
impl ServiceData for Counter {}
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
let mut service = service_builder::<u64, Counter>(25000).build_mutable(
InMemoryUpstream::default(),
DefaultCommitPolicy::Within(Duration::from_secs(30)),
);
let _result = service
.execute_mut(1, |n| Commit::immediately(n.get()))
.await
.unwrap();
for _ in 0..32 {
let mut handle = service.handle();
tokio::spawn(async move {
loop {
let n = thread_rng().gen_range(0, 5000);
let _result = handle
.execute_mut(n, |n| Commit::immediately(n.get()))
.await
.unwrap();
}
});
}
let _result = service.execute(100, |n| n.get()).await.unwrap();
let _result = service.execute_if_cached(100, |n| n.get()).await.unwrap();
let mut prev_combined_hits = 0;
loop {
sleep(Duration::from_secs(1)).await;
let now = std::time::Instant::now();
let stats = service.get_shard_stats().await.unwrap();
let elapsed = now.elapsed();
println!("per shard stats (latency: {:?}): {:?}", elapsed, stats);
let merged = ShardStats::merge_stats(stats);
let delta = merged.executions_complete - prev_combined_hits;
prev_combined_hits = merged.executions_complete;
println!(
"combined = {:?}, {}, {} per sec",
merged, merged.executions_complete, delta
);
}
}