use std::sync::{Arc, Mutex};
use sync42::wait_list::WaitList;
/// Clicked each time `Clicker::click` sets `ClickerState::head_passback` to `true`.
static HEAD_PASSBACK_INITIATED: biometrics::Counter =
biometrics::Counter::new("playground.clicker.head_passback_initiated");
/// Clicked when the head waiter consumes a pending passback and resets
/// `ClickerState::head_passback` to `false`.
static HEAD_PASSBACK_FINISHED: biometrics::Counter =
biometrics::Counter::new("playground.clicker.head_passback_finished");
/// Clicked each time `ClickerState::outstanding_calls` is incremented.
static INCREMENT_OUTSTANDING_CALLS: biometrics::Counter =
biometrics::Counter::new("playground.clicker.increment_outstanding_calls");
/// Clicked each time `ClickerState::outstanding_calls` is decremented.
static DECREMENT_OUTSTANDING_CALLS: biometrics::Counter =
biometrics::Counter::new("playground.clicker.decrement_outstanding_calls");
/// Clicked immediately before a caller of `Clicker::click` blocks in `naked_wait`.
static BLOCKING_ON_TICKET_ACQUISITION: biometrics::Counter =
biometrics::Counter::new("playground.clicker.blocking_on_ticket_acquisition");
/// Clicked when a caller of `Clicker::click` leaves the wait loop without a ticket already
/// assigned to it and proceeds to count waiters for its own `Counter::count` call.
static ABLE_TO_ACQUIRE_TICKETS: biometrics::Counter =
biometrics::Counter::new("playground.clicker.able_to_acquire_tickets");
/// Registers the counters used by this playground with `collector`.
///
/// `sync42`'s own biometrics are registered first, followed by the six clicker counters
/// declared in this file, in declaration order.
pub fn register_biometrics(collector: &mut biometrics::Collector) {
    sync42::register_biometrics(collector);

    // Kept in the same order as the individual registration calls they replace.
    let clicker_counters: [&'static biometrics::Counter; 6] = [
        &HEAD_PASSBACK_INITIATED,
        &HEAD_PASSBACK_FINISHED,
        &INCREMENT_OUTSTANDING_CALLS,
        &DECREMENT_OUTSTANDING_CALLS,
        &BLOCKING_ON_TICKET_ACQUISITION,
        &ABLE_TO_ACQUIRE_TICKETS,
    ];
    for counter in clicker_counters {
        collector.register_counter(counter);
    }
}
/// A mutex-protected ticket dispenser that hands out contiguous runs of `u64` values.
///
/// The first value handed out is `1`.
struct Counter {
    /// The next value that has not yet been handed out.
    count: Mutex<u64>,
}

impl Counter {
    /// Creates a dispenser whose first ticket is `1`. Usable in `static` initializers.
    const fn new() -> Self {
        Counter {
            count: Mutex::new(1),
        }
    }

    /// Reserves `amount` consecutive values and returns the first one.
    ///
    /// The internal value advances with wrapping arithmetic. The call then sleeps for one
    /// second while the mutex guard is still alive, so concurrent callers are serialized for
    /// that duration (presumably to simulate an expensive allocation -- confirm intent).
    ///
    /// # Panics
    ///
    /// Panics if the mutex is poisoned.
    fn count(&self, amount: u64) -> u64 {
        let mut next_free = self.count.lock().unwrap();
        let advanced = next_free.wrapping_add(amount);
        let run_start = std::mem::replace(&mut *next_free, advanced);
        // The guard stays in scope across the sleep on purpose; it is released on return.
        std::thread::sleep(std::time::Duration::from_secs(1));
        run_start
    }
}
/// Value stored in each `WaitList` slot by `Clicker::click`.
#[derive(Clone, Debug, Default)]
enum WaitState {
/// The state a caller links itself with: queued and not yet assigned a ticket.
#[default]
Present,
/// Stored by a caller on its own slot just before it calls `Counter::count`.
CallingCount,
/// A ticket has been stored for this waiter; the payload is the ticket value.
Counted(u64),
}
/// Mutable bookkeeping for a `Clicker`, kept behind `Clicker::state`.
struct ClickerState {
    /// Number of admission slots currently taken; compared against
    /// `Clicker::concurrent_count_calls` before a caller may proceed.
    outstanding_calls: u64,
    /// `true` while a slot has been handed to the waiter at the head of the wait list and
    /// that waiter has not yet claimed it.
    head_passback: bool,
}

impl ClickerState {
    /// Creates the idle state: no outstanding calls and no passback pending.
    const fn new() -> Self {
        ClickerState {
            head_passback: false,
            outstanding_calls: 0,
        }
    }
}
/// Hands out tickets from a shared `Counter`, letting one caller fetch a run of tickets on
/// behalf of itself and the callers queued with it in the wait list.
///
/// Admission to the `Counter::count` call is gated by comparing
/// `ClickerState::outstanding_calls` against `concurrent_count_calls`.
struct Clicker<'a> {
// Limit that `outstanding_calls` must be below for a caller to proceed without a passback.
concurrent_count_calls: u64,
// The ticket source; borrowed for the lifetime of the clicker.
counter: &'a Counter,
// Admission bookkeeping (`outstanding_calls`, `head_passback`).
state: Mutex<ClickerState>,
// Queue of callers currently inside `click`, each carrying a `WaitState`.
wait_list: WaitList<WaitState>,
}
impl<'a> Clicker<'a> {
/// Creates a clicker over `counter` with the given admission limit, an idle
/// `ClickerState`, and an empty wait list.
fn new(concurrent_count_calls: u64, counter: &'a Counter) -> Self {
Self {
concurrent_count_calls,
counter,
state: Mutex::new(ClickerState::new()),
wait_list: WaitList::new(),
}
}
/// Returns one ticket for the caller.
///
/// The caller links itself into the wait list and then either receives a ticket that
/// another caller stored for it, or is admitted, calls `Counter::count` once for a run of
/// waiters, stores a ticket for each of them, and returns its own.
///
/// # Panics
///
/// Panics if the state mutex is poisoned, if the waiter's slot is observed as
/// `CallingCount` while still waiting for admission, if the slot's value is missing, or if
/// the caller's own slot did not receive a ticket after the run was distributed.
fn click(&self) -> u64 {
// Join the queue in the `Present` state.
let mut waiter = self.wait_list.link(WaitState::Present);
// Everything inside this block runs with the state mutex held (except while blocked in
// `naked_wait`, which takes the guard and hands one back -- presumably releasing the
// mutex while blocked; confirm against the sync42 documentation). The block evaluates
// to the number of tickets this caller will request.
let tickets = {
let mut state = self.state.lock().unwrap();
// Keep waiting while the admission limit is reached AND we are not the head waiter
// with a passback pending for us.
'conditions: while !(state.outstanding_calls < self.concurrent_count_calls)
&& !(waiter.is_head() && state.head_passback)
{
match waiter.load() {
Some(WaitState::Present) => {
// No ticket yet: block until notified.
BLOCKING_ON_TICKET_ACQUISITION.click();
state = waiter.naked_wait(state);
}
Some(WaitState::CallingCount) => {
// Only this caller stores `CallingCount` on its own slot, and only after
// leaving this loop.
panic!("CallingCount state achieved before allowed to call count");
}
Some(WaitState::Counted(_)) => {
// Another caller already stored a ticket for us; stop waiting.
break 'conditions;
}
None => {
panic!("Our waiter was initialized to WaitState::Present, but the value got lost");
}
}
// After waking: if a slot is free and no passback is pending, reserve the slot
// (incrementing `outstanding_calls`), mark it as passed back to the head, and
// notify the head waiter.
if state.outstanding_calls < self.concurrent_count_calls && !state.head_passback{
HEAD_PASSBACK_INITIATED.click();
state.head_passback = true;
INCREMENT_OUTSTANDING_CALLS.click();
state.outstanding_calls += 1;
self.wait_list.notify_head();
}
}
// A ticket was already stored for us: leave the queue, notify the head waiter, and
// return without touching the counter.
if let Some(WaitState::Counted(x)) = waiter.load() {
self.wait_list.unlink(waiter);
self.wait_list.notify_head();
return x;
}
ABLE_TO_ACQUIRE_TICKETS.click();
// Count the consecutive `Present` waiters yielded by `waiter.iter()`, stopping at the
// first one that is already calling count or already has a ticket.
// NOTE(review): assumes `iter()` starts at this waiter's own slot and walks toward
// later arrivals -- the final panic below relies on our own slot being included.
let mut count = 0;
for mut w in waiter.iter() {
match w.load() {
Some(WaitState::Present) => {
count += 1;
}
Some(WaitState::CallingCount) | Some(WaitState::Counted(_)) => {
break;
}
None => {
panic!("Our waiter was initialized to WaitState::Present, but the value got lost");
}
};
}
// Mark our slot so later scans stop at it.
waiter.store(WaitState::CallingCount);
if waiter.is_head() && state.head_passback {
// We are claiming a passed-back slot: clear the flag and leave
// `outstanding_calls` unchanged.
HEAD_PASSBACK_FINISHED.click();
state.head_passback = false;
} else {
// Admitted through free capacity: take a new slot.
INCREMENT_OUTSTANDING_CALLS.click();
state.outstanding_calls += 1;
}
count
};
// The state guard was dropped at the end of the block above, so the counter call runs
// without the state mutex held.
let ticket_run_start = self.counter.count(tickets);
// Store one ticket per waiter, pairing the first `tickets` entries of `waiter.iter()`
// with consecutive values starting at `ticket_run_start`.
for (mut w, ticket) in
std::iter::zip(waiter.iter(), ticket_run_start..ticket_run_start + tickets)
{
w.store(WaitState::Counted(ticket))
}
if let Some(WaitState::Counted(x)) = waiter.load() {
self.wait_list.unlink(waiter);
{
let mut state = self.state.lock().unwrap();
if !state.head_passback {
// No passback pending: hand our slot to the head waiter instead of releasing
// it (`outstanding_calls` is intentionally not decremented here).
HEAD_PASSBACK_INITIATED.click();
state.head_passback = true;
} else {
// A passback is already pending: release our slot.
state.outstanding_calls -= 1;
DECREMENT_OUTSTANDING_CALLS.click();
}
}
self.wait_list.notify_head();
return x;
}
// Reached only if our own slot did not end up `Counted` after distributing the run.
panic!("We gave everyone except ourselves a ticket.");
}
}
/// Drives the playground: starts a background biometrics reporter, then runs worker threads
/// that call `Clicker::click` in a loop until a worker sees a ticket above 1,000,000.
fn main() {
    // Reporter thread: emits all registered biometrics to /dev/stdout every 250ms. It is
    // never joined; it ends when the process exits.
    std::thread::spawn(|| {
        let mut collector = biometrics::Collector::new();
        register_biometrics(&mut collector);
        let stdout_file = std::fs::File::create("/dev/stdout").unwrap();
        let mut emitter = biometrics::PlainTextEmitter::new(stdout_file);
        let pause = std::time::Duration::from_millis(250);
        loop {
            match collector.emit(&mut emitter) {
                Ok(_) => {}
                Err(e) => eprintln!("collector error: {}", e),
            }
            std::thread::sleep(pause);
        }
    });

    const NUMBER_OF_THREADS: u64 = sync42::MAX_CONCURRENCY as u64 / 2;
    const CONCURRENT_COUNT: u64 = 1;
    static COUNTER: Counter = Counter::new();

    let clicker: Arc<Clicker> = Arc::new(Clicker::new(CONCURRENT_COUNT, &COUNTER));

    // Spawn every worker before joining any of them (`collect` is eager).
    let workers: Vec<_> = (0..NUMBER_OF_THREADS)
        .map(|_| {
            let handle = Arc::clone(&clicker);
            std::thread::spawn(move || {
                loop {
                    let ticket = handle.click();
                    println!("{}", ticket);
                    if ticket > 1_000_000 {
                        break;
                    }
                }
            })
        })
        .collect();

    for worker in workers {
        worker.join().unwrap();
    }
}