#[macro_use]
extern crate lazy_static;
use std::borrow::Cow;
use std::ops::Deref;
use std::thread;
use futures::prelude::*;
use rand::{rngs::ThreadRng, seq::SliceRandom};
use foundationdb as fdb;
use foundationdb::tuple::{pack, unpack, Subspace};
use foundationdb::{Database, FdbError, RangeOption, TransactError, TransactOption, Transaction};
/// Result alias used throughout this file, with the local [`Error`] as the error type.
type Result<T> = std::result::Result<T, Error>;
/// Failures that a scheduling operation in this file can report.
///
/// `Debug` is derived so that results carrying this error can be logged or
/// passed to `unwrap`/`expect`.
#[derive(Debug)]
enum Error {
    /// An error reported by FoundationDB itself.
    Internal(FdbError),
    /// The requested class has no seats left.
    NoRemainingSeats,
    /// The student has already reached the per-student class limit.
    TooManyClasses,
}
impl From<FdbError> for Error {
fn from(err: FdbError) -> Self {
Error::Internal(err)
}
}
impl TransactError for Error {
fn try_into_fdb_error(self) -> std::result::Result<FdbError, Self> {
match self {
Error::Internal(err) => Ok(err),
_ => Err(self),
}
}
}
/// Course-level labels; `all_classes` uses each one as the last component of
/// a generated class name.
const LEVELS: &[&str] = &[
"intro",
"for dummies",
"remedial",
"101",
"201",
"301",
"mastery",
"lab",
"seminar",
];
/// Subject labels; `all_classes` uses each one as the middle component of a
/// generated class name.
const TYPES: &[&str] = &[
"chem", "bio", "cs", "geometry", "calc", "alg", "film", "music", "art", "dance",
];
/// Time-slot labels; `all_classes` uses each one as the first component of a
/// generated class name.
const TIMES: &[&str] = &[
"2:00", "3:00", "4:00", "5:00", "6:00", "7:00", "8:00", "9:00", "10:00", "11:00", "12:00",
"13:00", "14:00", "15:00", "16:00", "17:00", "18:00", "19:00",
];
// Lazily-built list of every class name produced by `all_classes()`; it is
// used to seed the database in `main` and as each simulated student's
// initial list of candidate classes.
lazy_static! {
static ref ALL_CLASSES: Vec<String> = all_classes();
}
/// Builds one class name for every combination of level, type and time.
///
/// Each name has the form `"<time> <type> <level>"`. Names are produced with
/// levels varying slowest and times varying fastest.
fn all_classes() -> Vec<String> {
    LEVELS
        .iter()
        .flat_map(|level| {
            TYPES.iter().flat_map(move |kind| {
                TIMES
                    .iter()
                    .map(move |time| format!("{} {} {}", time, kind, level))
            })
        })
        .collect()
}
/// Writes a seat counter of 100 for every class in `all_classes`.
///
/// Each counter is stored under the `"class"` subspace, keyed by the class
/// name, with the count encoded as a packed `i64`.
fn init_classes(trx: &Transaction, all_classes: &[String]) {
    let class_subspace = Subspace::from("class");
    // The encoded seat count is the same for every class, so pack it once.
    let initial_seats = pack(&100_i64);
    all_classes
        .iter()
        .for_each(|name| trx.set(&class_subspace.pack(name), &initial_seats));
}
async fn init(db: &Database, all_classes: &[String]) {
let trx = db.create_trx().expect("could not create transaction");
trx.clear_subspace_range(&"attends".into());
trx.clear_subspace_range(&"class".into());
init_classes(&trx, all_classes);
trx.commit().await.expect("failed to initialize data");
}
/// Returns the names of the classes that still have at least one free seat.
///
/// Reads the `"class"` subspace, where each key is the tuple
/// `("class", <class name>)` and each value is the remaining seat count
/// packed as an `i64`.
///
/// # Panics
///
/// Panics if the transaction cannot be created, the range read fails, or a
/// key/value cannot be decoded.
async fn get_available_classes(db: &Database) -> Vec<String> {
    let trx = db.create_trx().expect("could not create transaction");
    let range = RangeOption::from(&Subspace::from("class"));
    // NOTE(review): a single `get_range` call may return only part of the
    // range; confirm whether paging is needed to see every class.
    let got_range = trx
        .get_range(&range, 1_024, false)
        .await
        .expect("failed to get classes");

    got_range
        .iter()
        .filter(|key_value| {
            let count: i64 = unpack(key_value.value()).expect("failed to decode count");
            count > 0
        })
        .map(|key_value| {
            // The key holds two tuple elements (the "class" prefix and the
            // class name), so it must be decoded as a pair; decoding it as a
            // lone `String` can never yield the class name.
            let (_, class): (String, String) =
                unpack(key_value.key()).expect("failed to decode class");
            class
        })
        .collect()
}
/// Removes `student` from `class` within the caller's transaction.
///
/// Does nothing when no `("attends", student, class)` record exists.
/// Otherwise the class's seat counter is incremented by one and the
/// attendance record is cleared.
///
/// # Panics
///
/// Panics if a read fails, if the class has no seat counter, or if the
/// counter cannot be decoded as an `i64`.
async fn ditch_trx(trx: &Transaction, student: &str, class: &str) {
    let attends_key = pack(&("attends", student, class));
    // Regular (non-snapshot) reads are used on purpose: the values read here
    // decide what gets written below, so the reads must take part in conflict
    // detection. With snapshot reads two concurrent transactions could both
    // read the same seat count and one update would be silently lost.
    let enrolled = trx
        .get(&attends_key, false)
        .await
        .expect("get failed")
        .is_some();
    if !enrolled {
        return;
    }

    let class_key = pack(&("class", class));
    let raw_seats = trx
        .get(&class_key, false)
        .await
        .expect("get failed")
        .expect("class seats were not initialized");
    let available_seats: i64 =
        unpack::<i64>(raw_seats.deref()).expect("failed to decode i64") + 1;

    trx.set(&class_key, &pack(&available_seats));
    trx.clear(&attends_key);
}
async fn ditch(db: &Database, student: String, class: String) -> Result<()> {
db.transact_boxed_local(
(student, class),
move |trx, (student, class)| ditch_trx(trx, student, class).map(|_| Ok(())).boxed_local(),
fdb::TransactOption::default(),
)
.await
}
/// Enrolls `student` in `class` within the caller's transaction.
///
/// Returns `Ok(())` without changes when the student already attends the
/// class. Otherwise one seat is taken from the class and an
/// `("attends", student, class)` record is written.
///
/// # Errors
///
/// * [`Error::NoRemainingSeats`] when the class's seat counter is zero or less.
/// * [`Error::TooManyClasses`] when the student already has five or more
///   attendance records.
/// * [`Error::Internal`] when a database read fails. These are propagated
///   rather than turned into a panic so that a caller running this inside a
///   transaction retry loop gets the chance to handle them.
///
/// # Panics
///
/// Panics if the class has no seat counter or the counter cannot be decoded
/// as an `i64`.
async fn signup_trx(trx: &Transaction, student: &str, class: &str) -> Result<()> {
    let attends_key = pack(&("attends", student, class));
    // Regular (non-snapshot) reads are used on purpose: the values read here
    // decide what gets written below, so the reads must take part in conflict
    // detection. With snapshot reads two concurrent signups could both read
    // the same seat count and oversell the class.
    if trx.get(&attends_key, false).await?.is_some() {
        return Ok(());
    }

    let class_key = pack(&("class", class));
    let raw_seats = trx
        .get(&class_key, false)
        .await?
        .expect("class seats were not initialized");
    let available_seats: i64 = unpack(&raw_seats).expect("failed to decode i64");
    if available_seats <= 0 {
        return Err(Error::NoRemainingSeats);
    }

    let attends_range = RangeOption::from(&("attends", &student).into());
    let enrolled_count = trx.get_range(&attends_range, 1_024, false).await?.len();
    if enrolled_count >= 5 {
        return Err(Error::TooManyClasses);
    }

    trx.set(&class_key, &pack(&(available_seats - 1)));
    trx.set(&attends_key, &pack(&""));
    Ok(())
}
/// Runs [`signup_trx`] for `student` and `class` through the database's
/// `transact_boxed_local` helper with default options, returning whatever
/// that helper yields.
async fn signup(db: &Database, student: String, class: String) -> Result<()> {
    let options = TransactOption::default();
    db.transact_boxed_local(
        (student, class),
        |trx, (who, which)| signup_trx(trx, who, which).boxed_local(),
        options,
    )
    .await
}
/// Moves `student_id` from `old_class` to `new_class`.
///
/// The drop and the signup are issued against the same transaction via
/// `transact_boxed_local` (default options), so they are submitted together.
/// NOTE(review): presumably a signup failure means the preceding drop is
/// never committed — confirm against `transact_boxed_local`'s behavior.
async fn switch_classes(
    db: &Database,
    student_id: String,
    old_class: String,
    new_class: String,
) -> Result<()> {
    /// Drops `from`, then signs up for `to`, on one transaction.
    async fn drop_then_add(
        trx: &Transaction,
        student: &str,
        from: &str,
        to: &str,
    ) -> Result<()> {
        ditch_trx(trx, student, from).await;
        signup_trx(trx, student, to).await
    }

    let request = (student_id, old_class, new_class);
    db.transact_boxed_local(
        request,
        move |trx, (student, from, to)| drop_then_add(trx, student, from, to).boxed_local(),
        TransactOption::default(),
    )
    .await
}
/// The kind of action a simulated student attempts in one step; `perform_op`
/// dispatches on it.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
enum Mood {
/// Sign up for a class (handled via `signup`).
Add,
/// Drop a class (handled via `ditch`).
Ditch,
/// Replace one class with another (handled via `switch_classes`).
Switch,
}
async fn perform_op(
db: &Database,
rng: &mut ThreadRng,
mood: Mood,
student_id: &str,
all_classes: &[String],
my_classes: &mut Vec<String>,
) -> Result<()> {
match mood {
Mood::Add => {
let class = all_classes.choose(rng).unwrap();
signup(db, student_id.to_string(), class.to_string()).await?;
my_classes.push(class.to_string());
}
Mood::Ditch => {
let class = all_classes.choose(rng).unwrap();
ditch(db, student_id.to_string(), class.to_string()).await?;
my_classes.retain(|s| s != class);
}
Mood::Switch => {
let old_class = my_classes.choose(rng).unwrap().to_string();
let new_class = all_classes.choose(rng).unwrap();
switch_classes(
db,
student_id.to_string(),
old_class.to_string(),
new_class.to_string(),
)
.await?;
my_classes.retain(|s| s != &old_class);
my_classes.push(new_class.to_string());
}
}
Ok(())
}
/// Simulates one student performing `num_ops` random scheduling actions.
///
/// The student is named `"s<student_id>"`, opens its own database handle, and
/// starts with the full class list as its candidates. On each step the
/// possible moods are: `Ditch` and `Switch` when the student holds at least
/// one class, and `Add` when the student holds fewer than five. When an
/// action fails, the candidate list is replaced with the classes that the
/// database reports as still available.
///
/// # Panics
///
/// Panics if the database handle cannot be obtained.
async fn simulate_students(student_id: usize, num_ops: usize) {
    let db = Database::new_compat(None)
        .await
        .expect("failed to get database");
    let student_id = format!("s{}", student_id);
    let mut rng = rand::thread_rng();
    let mut available_classes = Cow::Borrowed(&*ALL_CLASSES);
    let mut my_classes: Vec<String> = Vec::new();

    for _ in 0..num_ops {
        let has_classes = !my_classes.is_empty();
        let has_room = my_classes.len() < 5;
        // Candidate moods in a fixed order, each gated by its precondition.
        let moods: Vec<Mood> = [
            (has_classes, Mood::Ditch),
            (has_classes, Mood::Switch),
            (has_room, Mood::Add),
        ]
        .iter()
        .filter(|(allowed, _)| *allowed)
        .map(|(_, mood)| *mood)
        .collect();
        let mood = *moods.choose(&mut rng).unwrap();

        let outcome = perform_op(
            &db,
            &mut rng,
            mood,
            &student_id,
            &available_classes,
            &mut my_classes,
        )
        .await;
        if outcome.is_err() {
            println!("getting available classes");
            available_classes = Cow::Owned(get_available_classes(&db).await);
        }
    }
}
async fn run_sim(db: &Database, students: usize, ops_per_student: usize) {
let mut threads: Vec<(usize, thread::JoinHandle<()>)> = Vec::with_capacity(students);
for i in 0..students {
threads.push((
i,
thread::spawn(move || {
futures::executor::block_on(simulate_students(i, ops_per_student));
}),
));
}
for (id, thread) in threads {
thread.join().expect("failed to join thread");
let student_id = format!("s{}", id);
let attends_range = RangeOption::from(&("attends", &student_id).into());
for key_value in db
.create_trx()
.unwrap()
.get_range(&attends_range, 1_024, false)
.await
.expect("get_range failed")
.iter()
{
let (_, s, class) = unpack::<(String, String, String)>(key_value.key()).unwrap();
assert_eq!(student_id, s);
println!("{} is taking: {}", student_id, class);
}
}
println!("Ran {} transactions", students * ops_per_student);
}
/// Entry point: boots the FoundationDB client, resets the database with the
/// full class list, then runs the simulation with 10 students performing 10
/// operations each.
#[tokio::main]
async fn main() {
    // NOTE(review): `fdb::boot` is `unsafe`; presumably the returned guard
    // must stay alive while the database is in use and be dropped before the
    // process exits — confirm against the foundationdb crate docs. Binding it
    // to `_guard` keeps it alive until `main` returns.
    let _guard = unsafe { fdb::boot() };

    let db = fdb::Database::new_compat(None)
        .await
        .expect("failed to get database");

    init(&db, ALL_CLASSES.as_slice()).await;
    println!("Initialized");

    let (students, ops_per_student) = (10, 10);
    run_sim(&db, students, ops_per_student).await;
}