#[macro_use]
extern crate lazy_static;
use std::borrow::Cow;
use std::ops::Deref;
use std::thread;
use rand::rngs::ThreadRng;
use rand::prelude::IndexedRandom;
use foundationdb as fdb;
use foundationdb::tuple::{Subspace, pack, unpack};
use foundationdb::{Database, FdbBindingError, FdbResult, RangeOption, Transaction};
use futures::TryStreamExt;
/// Crate-local result alias whose error type is always `FdbBindingError`,
/// the error type produced by the `db.run` closures in this file.
type Result<T> = std::result::Result<T, FdbBindingError>;
/// Scheduling-rule violations that make a signup fail.
#[derive(Debug, Clone, Copy)]
enum Error {
    /// The requested class has no seats left.
    NoRemainingSeats,
    /// The student already attends the maximum number of classes.
    TooManyClasses,
}

impl std::fmt::Display for Error {
    /// Writes the fixed, human-readable message for this error.
    fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
        let message = match *self {
            Error::NoRemainingSeats => "No remaining seats",
            Error::TooManyClasses => "Too many classes",
        };
        f.write_str(message)
    }
}

// No source error to expose; the default trait methods are sufficient.
impl std::error::Error for Error {}
impl From<Error> for FdbBindingError {
fn from(err: Error) -> Self {
FdbBindingError::CustomError(Box::new(err))
}
}
/// Level component of a generated class name (last word group of the name).
const LEVELS: &[&str] = &[
    "intro", "for dummies", "remedial",
    "101", "201", "301",
    "mastery", "lab", "seminar",
];
/// Subject component of a generated class name (middle word of the name).
const TYPES: &[&str] = &[
    "chem",
    "bio",
    "cs",
    "geometry",
    "calc",
    "alg",
    "film",
    "music",
    "art",
    "dance",
];
/// Start-time component of a generated class name (first word of the name).
const TIMES: &[&str] = &[
    "2:00", "3:00", "4:00", "5:00", "6:00", "7:00",
    "8:00", "9:00", "10:00", "11:00", "12:00", "13:00",
    "14:00", "15:00", "16:00", "17:00", "18:00", "19:00",
];
// Process-wide class catalog. `lazy_static!` defers the `all_classes()` call
// until the static is first dereferenced and then reuses the same `Vec`.
lazy_static! {
static ref ALL_CLASSES: Vec<String> = all_classes();
}
/// Builds every class name as `"<time> <type> <level>"`.
///
/// Names are produced level-major, then by type, then by time, i.e. the
/// time component varies fastest.
fn all_classes() -> Vec<String> {
    LEVELS
        .iter()
        .flat_map(|level| {
            TYPES.iter().flat_map(move |kind| {
                TIMES
                    .iter()
                    .map(move |time| format!("{time} {kind} {level}"))
            })
        })
        .collect()
}
/// Writes a seat counter of 100 for every class in `all_classes`.
///
/// Each counter is stored under the `"class"` subspace, keyed by class name,
/// as a tuple-packed `i64`. The writes are only buffered on `trx`; committing
/// is left to the caller.
fn init_classes(trx: &Transaction, all_classes: &[String]) {
    let classes = Subspace::from("class");
    // The packed value is identical for every class, so encode it once.
    let initial_seats = pack(&100_i64);
    all_classes
        .iter()
        .for_each(|name| trx.set(&classes.pack(name), &initial_seats));
}
async fn init(db: &Database, all_classes: &[String]) {
db.run(|trx, _maybe_committed| async move {
trx.clear_subspace_range(&"attends".into());
trx.clear_subspace_range(&"class".into());
init_classes(&trx, all_classes);
Ok(())
})
.await
.expect("failed to initialize data");
}
/// Returns the names of all classes whose seat counter is greater than zero.
///
/// Scans the whole `"class"` subspace in one `db.run` transaction.
///
/// Seat-counter keys are written as the two-element tuple `("class", name)`
/// (see `init_classes` / `signup_trx`), so the key must be decoded as a
/// two-element tuple. Decoding it as a lone `String` leaves trailing bytes
/// and fails, which previously made this function panic on the first class
/// with free seats.
///
/// # Panics
/// Panics if the transaction fails, or if a key or value in the subspace is
/// not in the expected tuple encoding.
async fn get_available_classes(db: &Database) -> Vec<String> {
    db.run(|trx, _maybe_committed| async move {
        let range = RangeOption::from(&Subspace::from("class"));
        let mut available_classes = Vec::<String>::new();
        let mut stream = trx.get_ranges_keyvalues(range, false);
        while let Some(key_value) = stream.try_next().await? {
            let count: i64 = unpack(key_value.value()).expect("failed to decode count");
            if count > 0 {
                // First element is the "class" prefix; the second is the name.
                let (_, class): (String, String) =
                    unpack(key_value.key()).expect("failed to decode class");
                available_classes.push(class);
            }
        }
        Ok(available_classes)
    })
    .await
    .expect("failed to get classes")
}
/// Drops `student` from `class` inside the caller's transaction.
///
/// Does nothing when the `("attends", student, class)` key is absent.
/// Otherwise the class's seat counter is incremented by one and the
/// attendance key is cleared. Nothing is committed here.
///
/// Both reads are regular (non-snapshot) reads. The seat counter is a
/// read-modify-write; a snapshot read would not add a read conflict, so two
/// concurrent transactions touching the same class could both commit and one
/// counter update would be lost.
///
/// # Errors
/// Returns the `FdbError` of a failed read.
///
/// # Panics
/// Panics if the class has no seat counter or the stored value is not a
/// tuple-packed `i64`.
async fn ditch_trx(trx: &Transaction, student: &str, class: &str) -> FdbResult<()> {
    let attends_key = pack(&("attends", student, class));
    if trx.get(&attends_key, false).await?.is_none() {
        // Not enrolled: dropping is a no-op.
        return Ok(());
    }
    let class_key = pack(&("class", class));
    let raw_seats = trx
        .get(&class_key, false)
        .await?
        .expect("class seats were not initialized");
    // Give the seat back.
    let available_seats: i64 =
        unpack::<i64>(raw_seats.deref()).expect("failed to decode i64") + 1;
    trx.set(&class_key, &pack(&available_seats));
    trx.clear(&attends_key);
    Ok(())
}
async fn ditch(db: &Database, student: String, class: String) -> Result<()> {
db.run(move |trx, _maybe_committed| {
let student = student.clone();
let class = class.clone();
async move {
ditch_trx(&trx, &student, &class).await?;
Ok(())
}
})
.await
}
/// Enrolls `student` in `class` inside the caller's transaction.
///
/// Steps, in order:
/// 1. Return `Ok(())` immediately if the student already attends the class.
/// 2. Fail with `Error::NoRemainingSeats` if the seat counter is `<= 0`.
/// 3. Fail with `Error::TooManyClasses` if the student already has 5 or more
///    attendance keys.
/// 4. Decrement the seat counter and write the attendance key.
///
/// All reads are regular (non-snapshot) reads. The seat counter is a
/// read-modify-write; a snapshot read would not add a read conflict, so two
/// concurrent signups for the last seat could both commit and oversell the
/// class. This also matches the range read below, which was already
/// non-snapshot.
///
/// # Errors
/// Returns a boxed `Error` for the rule violations above, or the converted
/// `FdbError` of a failed read.
///
/// # Panics
/// Panics if the class has no seat counter or the stored value is not a
/// tuple-packed `i64`.
async fn signup_trx(trx: &Transaction, student: &str, class: &str) -> Result<()> {
    let attends_key = pack(&("attends", student, class));
    if trx.get(&attends_key, false).await?.is_some() {
        // Already enrolled: signing up again is a no-op.
        return Ok(());
    }
    let class_key = pack(&("class", class));
    let raw_seats = trx
        .get(&class_key, false)
        .await?
        .expect("class seats were not initialized");
    let available_seats: i64 = unpack(&raw_seats).expect("failed to decode i64");
    if available_seats <= 0 {
        return Err(Error::NoRemainingSeats.into());
    }
    // Count the student's current enrollments via the ("attends", student) prefix.
    let attends_range = RangeOption::from(&("attends", &student).into());
    let attends: Vec<_> = trx
        .get_ranges_keyvalues(attends_range, false)
        .try_collect()
        .await?;
    if attends.len() >= 5 {
        return Err(Error::TooManyClasses.into());
    }
    trx.set(&class_key, &pack(&(available_seats - 1)));
    // The attendance key's presence is what matters; the value is an empty string.
    trx.set(&attends_key, &pack(&""));
    Ok(())
}
/// Enrolls `student` in `class` in its own `db.run` transaction.
///
/// The owned strings are cloned on every invocation of the closure because
/// `db.run` may call it more than once.
///
/// # Errors
/// Propagates whatever `signup_trx` or `db.run` reports.
async fn signup(db: &Database, student: String, class: String) -> Result<()> {
    db.run(move |trx, _maybe_committed| {
        let (who, which) = (student.clone(), class.clone());
        async move {
            let outcome = signup_trx(&trx, &who, &which).await;
            outcome
        }
    })
    .await
}
/// Moves `student_id` from `old_class` to `new_class` in one transaction.
///
/// `ditch_trx` and `signup_trx` run against the same transaction object, so
/// an error from the signup is returned from the closure together with the
/// already-buffered drop.
///
/// # Errors
/// Propagates the error of either step or of `db.run`.
async fn switch_classes(
    db: &Database,
    student_id: String,
    old_class: String,
    new_class: String,
) -> Result<()> {
    db.run(move |trx, _maybe_committed| {
        let who = student_id.clone();
        let (leaving, joining) = (old_class.clone(), new_class.clone());
        async move {
            ditch_trx(&trx, &who, &leaving).await?;
            signup_trx(&trx, &who, &joining).await
        }
    })
    .await
}
/// Kind of operation a simulated student attempts on one simulation step.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
enum Mood {
/// Sign up for a class.
Add,
/// Drop a class.
Ditch,
/// Drop one class and sign up for another within a single transaction.
Switch,
}
async fn perform_op(
db: &Database,
rng: &mut ThreadRng,
mood: Mood,
student_id: &str,
all_classes: &[String],
my_classes: &mut Vec<String>,
) -> Result<()> {
match mood {
Mood::Add => {
let class = all_classes.choose(rng).unwrap();
signup(db, student_id.to_string(), class.to_string()).await?;
my_classes.push(class.to_string());
}
Mood::Ditch => {
let class = all_classes.choose(rng).unwrap();
ditch(db, student_id.to_string(), class.to_string()).await?;
my_classes.retain(|s| s != class);
}
Mood::Switch => {
let old_class = my_classes.choose(rng).unwrap().to_string();
let new_class = all_classes.choose(rng).unwrap();
switch_classes(
db,
student_id.to_string(),
old_class.to_string(),
new_class.to_string(),
)
.await?;
my_classes.retain(|s| s != &old_class);
my_classes.push(new_class.to_string());
}
}
Ok(())
}
/// Runs `num_ops` random operations for the student named `s<student_id>`.
///
/// Opens its own database handle, sets the transaction timeout option to 5000
/// and the retry limit to 3, then repeatedly picks a mood that is possible
/// for the current local class list and performs it. Whenever an operation
/// fails, the list of classes to choose from is replaced by the result of
/// `get_available_classes`.
///
/// # Panics
/// Panics if the database cannot be opened or an option cannot be set.
async fn simulate_students(student_id: usize, num_ops: usize) {
    let db = Database::new_compat(None)
        .await
        .expect("failed to get database");
    db.set_option(fdb::options::DatabaseOption::TransactionTimeout(5000))
        .expect("failed to set transaction timeout");
    db.set_option(fdb::options::DatabaseOption::TransactionRetryLimit(3))
        .expect("failed to set transaction retry limit");

    let student_id = format!("s{student_id}");
    let mut rng = rand::rng();
    // Borrow the shared catalog until a failure forces a refreshed, owned list.
    let mut available_classes = Cow::Borrowed(&*ALL_CLASSES);
    let mut my_classes = Vec::<String>::new();

    for _ in 0..num_ops {
        let has_classes = !my_classes.is_empty();
        let below_limit = my_classes.len() < 5;

        let mut moods = Vec::<Mood>::new();
        if has_classes {
            moods.extend([Mood::Ditch, Mood::Switch]);
        }
        if below_limit {
            moods.push(Mood::Add);
        }
        let mood = *moods.choose(&mut rng).unwrap();

        let outcome = perform_op(
            &db,
            &mut rng,
            mood,
            &student_id,
            &available_classes,
            &mut my_classes,
        )
        .await;

        if outcome.is_err() {
            println!("getting available classes");
            available_classes = Cow::Owned(get_available_classes(&db).await);
        }
    }
}
/// Runs the whole simulation and prints each student's final schedule.
///
/// Spawns one OS thread per student; each thread drives `simulate_students`
/// to completion with `futures::executor::block_on`. The threads are then
/// joined in spawn order, and after each join the student's `"attends"` keys
/// are read back from `db` and printed.
///
/// # Panics
/// Panics if a thread cannot be joined, if reading the attendance range
/// fails, or if an attendance key does not decode to the expected 3-tuple
/// for that student.
async fn run_sim(db: &Database, students: usize, ops_per_student: usize) {
    let workers: Vec<(usize, thread::JoinHandle<()>)> = (0..students)
        .map(|idx| {
            let handle = thread::spawn(move || {
                futures::executor::block_on(simulate_students(idx, ops_per_student));
            });
            (idx, handle)
        })
        .collect();

    for (idx, handle) in workers {
        handle.join().expect("failed to join thread");
        let student_id = format!("s{idx}");
        db.run(move |trx, _maybe_committed| {
            // Fresh copy per invocation: `db.run` may call the closure again.
            let student = student_id.clone();
            async move {
                let attends_range = RangeOption::from(&("attends", &student).into());
                let mut enrolled = trx.get_ranges_keyvalues(attends_range, false);
                while let Some(key_value) = enrolled.try_next().await? {
                    let (_, s, class) =
                        unpack::<(String, String, String)>(key_value.key()).unwrap();
                    assert_eq!(student, s);
                    println!("{student} is taking: {class}");
                }
                Ok(())
            }
        })
        .await
        .expect("get_ranges_keyvalues failed");
    }

    println!("Ran {} transactions", students * ops_per_student);
}
/// Entry point: boots the FoundationDB client, seeds the class catalog and
/// runs a simulation with 10 students performing 10 operations each.
#[tokio::main]
async fn main() {
    // NOTE(review): `fdb::boot` is `unsafe`; the guard is kept alive until the
    // end of `main` so it is dropped on normal exit - confirm against the
    // crate's `# Safety` contract.
    let _guard = unsafe { fdb::boot() };

    let db = fdb::Database::new_compat(None)
        .await
        .expect("failed to get database");

    // Apply the same transaction limits the per-student handles use.
    let options = [
        (
            fdb::options::DatabaseOption::TransactionTimeout(5000),
            "failed to set transaction timeout",
        ),
        (
            fdb::options::DatabaseOption::TransactionRetryLimit(3),
            "failed to set transaction retry limit",
        ),
    ];
    for (option, failure) in options {
        db.set_option(option).expect(failure);
    }

    init(&db, &ALL_CLASSES).await;
    println!("Initialized");
    run_sim(&db, 10, 10).await;
}