use crate::{CheckerBuilder, CheckerVisitor, Fingerprint, fingerprint, Model, Property};
use crate::checker::{Checker, EventuallyBits, Expectation, Path};
use dashmap::DashMap;
use dashmap::mapref::entry::Entry;
use nohash_hasher::NoHashHasher;
use parking_lot::{Condvar, Mutex};
use std::collections::{HashMap, VecDeque};
use std::hash::{BuildHasherDefault, Hash};
use std::sync::Arc;
use std::sync::atomic::{AtomicUsize, Ordering};
/// Handle to a breadth-first model check that runs on background worker threads.
///
/// The shared fields are reference-counted because clones of them are moved into every worker
/// thread started by `spawn`.
pub(crate) struct BfsChecker<M: Model> {
/// The model being checked.
model: Arc<M>,
/// Number of worker threads started by `spawn`; compared against `JobMarket::wait_count` to
/// detect that no worker is busy.
thread_count: usize,
/// Join handles of the worker threads; drained by `join`.
handles: Vec<std::thread::JoinHandle<()>>,
/// Shared pool of pending jobs plus the count of workers that are not holding a job.
job_market: Arc<Mutex<JobMarket<M::State>>>,
/// Starts at the number of in-boundary initial states and is incremented for every in-boundary
/// successor that is generated, including successors that were already seen.
state_count: Arc<AtomicUsize>,
/// Maps the fingerprint of every generated state to the fingerprint of the state it was first
/// reached from (`None` for initial states). Used to reconstruct paths.
generated: Arc<DashMap<Fingerprint, Option<Fingerprint>, BuildHasherDefault<NoHashHasher<u64>>>>,
/// Maps a property name to the fingerprint of the state at which its discovery was recorded.
discoveries: Arc<DashMap<&'static str, Fingerprint>>,
}
/// Work-sharing state guarded by the checker's mutex.
///
/// `wait_count` starts at the thread count, is decremented when a worker takes a job, and is
/// incremented again when that worker runs out of pending states or exits after all properties
/// have discoveries. `jobs` is used as a stack: workers `push` shared work and `pop` the most
/// recently shared job.
struct JobMarket<State> { wait_count: usize, jobs: Vec<Job<State>> }
/// A queue of states awaiting a check. Each entry carries the state, its fingerprint, and the set
/// of `Eventually` property indices whose condition has not yet held on the path to that state.
type Job<State> = VecDeque<(State, Fingerprint, EventuallyBits)>;
impl<M> BfsChecker<M>
where M: Model + Send + Sync + 'static,
M::State: Hash + Send + 'static,
{
/// Starts a breadth-first check of `options.model` on `options.thread_count` worker threads and
/// returns a handle to the running checker without waiting for the workers.
///
/// The in-boundary initial states are fingerprinted, recorded in `generated` with no
/// predecessor, and published as a single job. Each worker then repeatedly takes a job from the
/// shared job market, checks it in blocks via `check_block`, and offers part of its remaining
/// queue to workers that are waiting. A worker stops when:
///
/// * every property has a discovery,
/// * `options.target_state_count` is set and the generated state count has reached it, or
/// * the job market is empty and no worker is holding a job.
///
/// A worker that stops for one of the first two reasons registers itself in `wait_count` and
/// wakes all waiting peers so that they can re-evaluate the exit conditions instead of blocking
/// forever on the condition variable.
pub(crate) fn spawn(options: CheckerBuilder<M>) -> Self {
    // Maximum number of states a worker checks before re-evaluating the exit conditions and
    // offering work to its peers.
    const BLOCK_SIZE: usize = 1500;

    let model = Arc::new(options.model);
    let target_state_count = options.target_state_count;
    let thread_count = options.thread_count;
    let visitor = Arc::new(options.visitor);
    let property_count = model.properties().len();

    let init_states: Vec<_> = model.init_states().into_iter()
        .filter(|s| model.within_boundary(s))
        .collect();
    let state_count = Arc::new(AtomicUsize::new(init_states.len()));
    let generated = Arc::new({
        let generated = DashMap::default();
        // Initial states have no predecessor.
        for s in &init_states { generated.insert(fingerprint(s), None); }
        generated
    });

    // Every `Eventually` property starts out unsatisfied.
    let ebits = {
        let mut ebits = EventuallyBits::new();
        for (i, p) in model.properties().iter().enumerate() {
            if let Property { expectation: Expectation::Eventually, .. } = p {
                ebits.insert(i);
            }
        }
        ebits
    };
    let pending: VecDeque<_> = init_states.into_iter()
        .map(|s| {
            let fp = fingerprint(&s);
            (s, fp, ebits.clone())
        })
        .collect();
    let discoveries = Arc::new(DashMap::default());
    let mut handles = Vec::new();
    let has_new_job = Arc::new(Condvar::new());
    // No worker holds a job yet, so all of them count as waiting.
    let job_market = Arc::new(Mutex::new(JobMarket {
        wait_count: thread_count,
        jobs: vec![pending],
    }));

    for t in 0..thread_count {
        let model = Arc::clone(&model);
        let visitor = Arc::clone(&visitor);
        let has_new_job = Arc::clone(&has_new_job);
        let job_market = Arc::clone(&job_market);
        let state_count = Arc::clone(&state_count);
        let generated = Arc::clone(&generated);
        let discoveries = Arc::clone(&discoveries);
        handles.push(std::thread::spawn(move || {
            log::debug!("{}: Thread started.", t);
            let mut pending = VecDeque::new();
            loop {
                // Step 1: Find a job if this worker has nothing left to do.
                if pending.is_empty() {
                    pending = {
                        let mut job_market = job_market.lock();
                        match job_market.jobs.pop() {
                            None => {
                                // No jobs and nobody working means nothing can ever produce
                                // more work, so wake everyone up to shut down.
                                if job_market.wait_count == thread_count {
                                    log::debug!("{}: No more work. Shutting down... gen={}", t, generated.len());
                                    has_new_job.notify_all();
                                    return
                                }
                                log::trace!("{}: No jobs. Awaiting. blocked={}", t, job_market.wait_count);
                                has_new_job.wait(&mut job_market);
                                continue
                            }
                            Some(job) => {
                                job_market.wait_count -= 1;
                                log::trace!("{}: Job found. size={}, blocked={}", t, job.len(), job_market.wait_count);
                                job
                            }
                        }
                    };
                }

                // Step 2: Do a bounded amount of work.
                Self::check_block(
                    &*model,
                    &*state_count,
                    &*generated,
                    &mut pending,
                    &*discoveries,
                    &*visitor,
                    BLOCK_SIZE);

                // Step 3: Exit early if there is nothing left to learn.
                if discoveries.len() == property_count {
                    log::debug!("{}: Discovery complete. Shutting down... gen={}", t, generated.len());
                    let mut job_market = job_market.lock();
                    job_market.wait_count += 1;
                    drop(job_market);
                    has_new_job.notify_all();
                    return
                }
                if let Some(target_state_count) = target_state_count {
                    if target_state_count.get() <= state_count.load(Ordering::Relaxed) {
                        log::debug!("{}: Reached target state count. Shutting down... gen={}",
                                    t, generated.len());
                        // This worker still holds a job, so it must hand back its slot and wake
                        // the waiting workers; otherwise they would block forever because
                        // `wait_count` could never reach `thread_count` again.
                        let mut job_market = job_market.lock();
                        job_market.wait_count += 1;
                        drop(job_market);
                        has_new_job.notify_all();
                        return;
                    }
                }

                // Step 4: Share surplus work, or report that this worker ran dry.
                if pending.len() > 1 && thread_count > 1 {
                    let mut job_market = job_market.lock();
                    let pieces = 1 + std::cmp::min(job_market.wait_count, pending.len());
                    let size = pending.len() / pieces;
                    for _ in 1..pieces {
                        log::trace!("{}: Sharing work. blocked={}, size={}", t, job_market.wait_count, size);
                        job_market.jobs.push(pending.split_off(pending.len() - size));
                        has_new_job.notify_one();
                    }
                } else if pending.is_empty() {
                    let mut job_market = job_market.lock();
                    job_market.wait_count += 1;
                }
            }
        }));
    }
    BfsChecker {
        model,
        thread_count,
        handles,
        job_market,
        state_count,
        generated,
        discoveries,
    }
}
/// Checks up to `max_count` states taken from the back of `pending`, pushing newly discovered
/// successors onto the front so that states are handled in first-in, first-out order.
///
/// For each state this:
///
/// 1. passes the path to the state to `visitor`, if one is configured;
/// 2. evaluates every property that has no discovery yet, recording a discovery when an
///    `Always` condition fails or a `Sometimes` condition holds, and clearing the state's
///    `Eventually` bit when an `Eventually` condition holds;
/// 3. generates the in-boundary successors, counts each in `state_count`, and enqueues those
///    whose fingerprint was not yet present in `generated`;
/// 4. if the state has no in-boundary successors, records a discovery for every `Eventually`
///    property whose bit is still set.
///
/// Returns early when `pending` runs empty or when no property is awaiting a discovery.
fn check_block(
    model: &M,
    state_count: &AtomicUsize,
    generated: &DashMap<Fingerprint, Option<Fingerprint>, BuildHasherDefault<NoHashHasher<u64>>>,
    pending: &mut Job<M::State>,
    discoveries: &DashMap<&'static str, Fingerprint>,
    visitor: &Option<Box<dyn CheckerVisitor<M> + Send + Sync>>,
    mut max_count: usize)
{
    let properties = model.properties();
    // Reused across iterations to avoid reallocating for every state.
    let mut actions = Vec::new();
    loop {
        if max_count == 0 { return }
        max_count -= 1;
        let (state, state_fp, mut ebits) = match pending.pop_back() {
            None => return,
            Some(pair) => pair,
        };
        if let Some(visitor) = visitor {
            visitor.visit(model, reconstruct_path(model, generated, state_fp));
        }

        // Evaluate the properties that still lack a discovery.
        let mut is_awaiting_discoveries = false;
        for (i, property) in properties.iter().enumerate() {
            if discoveries.contains_key(property.name) { continue }
            match property {
                Property { expectation: Expectation::Always, condition: always, .. } => {
                    if !always(model, &state) {
                        discoveries.insert(property.name, state_fp);
                    } else {
                        is_awaiting_discoveries = true;
                    }
                },
                Property { expectation: Expectation::Sometimes, condition: sometimes, .. } => {
                    if sometimes(model, &state) {
                        discoveries.insert(property.name, state_fp);
                    } else {
                        is_awaiting_discoveries = true;
                    }
                },
                Property { expectation: Expectation::Eventually, condition: eventually, .. } => {
                    // Discoveries for these are only recorded at terminal states, so the
                    // property is still pending even if the condition holds here.
                    is_awaiting_discoveries = true;
                    if eventually(model, &state) {
                        ebits.remove(i);
                    }
                }
            }
        }
        if !is_awaiting_discoveries { return }

        // Expand the state. Any in-boundary successor, new or already seen, means the state is
        // not terminal.
        let mut is_terminal = true;
        model.actions(&state, &mut actions);
        let next_states = actions.drain(..).flat_map(|a| model.next_state(&state, a));
        for next_state in next_states {
            if !model.within_boundary(&next_state) { continue }
            state_count.fetch_add(1, Ordering::Relaxed);
            let next_fingerprint = fingerprint(&next_state);
            if let Entry::Vacant(next_entry) = generated.entry(next_fingerprint) {
                next_entry.insert(Some(state_fp));
            } else {
                is_terminal = false;
                continue
            }
            is_terminal = false;
            pending.push_front((next_state, next_fingerprint, ebits.clone()));
        }
        if is_terminal {
            for (i, property) in properties.iter().enumerate() {
                if ebits.contains(i) {
                    // Keep the first discovery. Once a property has a discovery the loop above
                    // stops evaluating it, so its bit is no longer cleared on later paths;
                    // overwriting here could replace a valid counterexample with a path on
                    // which the condition was never checked.
                    discoveries.entry(property.name).or_insert(state_fp);
                }
            }
        }
    }
}
}
impl<M> Checker<M> for BfsChecker<M>
where M: Model,
      M::State: Hash,
{
    /// Returns the model under check.
    fn model(&self) -> &M {
        &self.model
    }

    /// Returns the current value of the generated-state counter.
    fn state_count(&self) -> usize {
        self.state_count.load(Ordering::Relaxed)
    }

    /// Returns the number of distinct fingerprints recorded so far.
    fn unique_state_count(&self) -> usize {
        self.generated.len()
    }

    /// Returns, for every property with a discovery, the path leading to the recorded state.
    fn discoveries(&self) -> HashMap<&'static str, Path<M::State, M::Action>> {
        let mut paths = HashMap::new();
        for found in self.discoveries.iter() {
            let name: &'static str = *found.key();
            let path = reconstruct_path(self.model(), &*self.generated, *found.value());
            paths.insert(name, path);
        }
        paths
    }

    /// Blocks until every worker thread has finished, panicking if a worker panicked.
    fn join(mut self) -> Self {
        self.handles
            .drain(..)
            .for_each(|worker| { worker.join().unwrap(); });
        self
    }

    /// Reports whether the check is finished: either no jobs remain and no worker holds one,
    /// or every property has a discovery.
    fn is_done(&self) -> bool {
        let market = self.job_market.lock();
        let out_of_work = market.jobs.is_empty() && market.wait_count == self.thread_count;
        out_of_work || self.discoveries.len() == self.model.properties().len()
    }
}
/// Rebuilds the path that ends at the state with fingerprint `fp`.
///
/// Follows predecessor links in `generated` from `fp` backwards, collecting fingerprints in
/// forward order, until an entry without a predecessor is included or a fingerprint is not
/// present in the map. The collected fingerprints are handed to `Path::from_fingerprints`.
fn reconstruct_path<M>(
    model: &M,
    generated: &DashMap<Fingerprint, Option<Fingerprint>, BuildHasherDefault<NoHashHasher<u64>>>,
    fp: Fingerprint)
    -> Path<M::State, M::Action>
where M: Model,
      M::State: Hash,
{
    let mut chain = VecDeque::new();
    let mut cursor = fp;
    loop {
        // Copy the predecessor out so the map guard is released before continuing.
        let predecessor = match generated.get(&cursor) {
            Some(entry) => *entry,
            None => break,
        };
        chain.push_front(cursor);
        match predecessor {
            Some(previous) => cursor = previous,
            None => break,
        }
    }
    Path::from_fingerprints(model, chain)
}
#[cfg(test)]
mod test {
    use super::*;
    use crate::*;
    use crate::test_util::linear_equation_solver::*;

    /// The recorded visit order must match the expected breadth-first order.
    #[test]
    fn visits_states_in_bfs_order() {
        let (recorder, accessor) = StateRecorder::new_with_accessor();
        let model = LinearEquation { a: 2, b: 10, c: 14 };
        model.checker()
            .visitor(recorder)
            .spawn_bfs()
            .join();
        let visited = accessor();
        assert_eq!(
            visited,
            vec![
                // at most 0 guesses
                (0, 0),
                // at most 1 guess
                (1, 0), (0, 1),
                // at most 2 guesses
                (2, 0), (1, 1), (0, 2),
                // at most 3 guesses
                (3, 0), (2, 1),
            ]);
    }

    /// With no discovery available, the checker finishes only after generating every state.
    #[test]
    fn can_complete_by_enumerating_all_states() {
        let outcome = LinearEquation { a: 2, b: 4, c: 7 }.checker().spawn_bfs().join();
        assert!(outcome.is_done());
        outcome.assert_no_discovery("solvable");
        assert_eq!(outcome.unique_state_count(), 256 * 256);
    }

    /// Once every property has a discovery, the checker stops without enumerating everything.
    #[test]
    fn can_complete_by_eliminating_properties() {
        let outcome = LinearEquation { a: 2, b: 10, c: 14 }.checker().spawn_bfs().join();
        outcome.assert_properties();
        assert_eq!(outcome.unique_state_count(), 12);

        // The checker reports this example...
        let reported = outcome.discovery("solvable").unwrap().into_actions();
        assert_eq!(
            reported,
            vec![
                Guess::IncreaseX,
                Guess::IncreaseX,
                Guess::IncreaseY,
            ]);
        // ...and a different action sequence is also accepted for the same property.
        outcome.assert_discovery(
            "solvable",
            vec![Guess::IncreaseY; 27]);
    }
}