use bytes::Bytes;
use fdb::database::FdbDatabase;
use fdb::error::FdbResult;
use fdb::range::{Range, RangeOptions};
use fdb::transaction::{FdbTransaction, Transaction};
use fdb::tuple::Tuple;
use fdb::{Key, Mapper, Value};
use tokio::runtime::Runtime;
use tokio_stream::StreamExt;
use std::env;
use std::error::Error;
const PREFIX: &str = "prefix";
const RECORD: &str = "RECORD";
const INDEX: &str = "INDEX";
fn empty() -> Bytes {
Tuple::new().pack()
}
fn primary_key(i: u32) -> String {
format!("primary-key-of-record-{:08}", i)
}
fn index_key(i: u32) -> String {
format!("index-key-of-record-{:08}", i)
}
fn data_of_record(i: u32) -> String {
format!("data-of-record-{:08}", i)
}
fn mapper() -> Mapper {
let mapper_tup: (&'static str, &'static str, &'static str, &'static str) =
(PREFIX, RECORD, "{K[3]}", "{...}");
let mapper = {
let mut tup = Tuple::new();
tup.add_string((mapper_tup.0).to_string());
tup.add_string((mapper_tup.1).to_string());
tup.add_string((mapper_tup.2).to_string());
tup.add_string((mapper_tup.3).to_string());
tup
};
mapper.into()
}
const SPLIT_SIZE: u32 = 3;
fn index_entry_key(i: u32) -> Key {
let index_tup: (&'static str, &'static str, String, String) =
(PREFIX, INDEX, index_key(i), primary_key(i));
let index_key = {
let mut tup = Tuple::new();
tup.add_string((index_tup.0).to_string());
tup.add_string((index_tup.1).to_string());
tup.add_string(index_tup.2);
tup.add_string(index_tup.3);
tup
};
index_key.pack().into()
}
#[allow(dead_code)]
fn record_key_prefix(i: u32) -> Tuple {
let rec_key_prefix_tup: (&'static str, &'static str, String) = (PREFIX, RECORD, primary_key(i));
{
let mut tup = Tuple::new();
tup.add_string((rec_key_prefix_tup.0).to_string());
tup.add_string((rec_key_prefix_tup.1).to_string());
tup.add_string(rec_key_prefix_tup.2);
tup
}
}
fn record_key(i: u32, split: u32) -> Key {
let rec_key_tup: (&'static str, &'static str, String, u32) =
(PREFIX, RECORD, primary_key(i), split);
let rec_key = {
let mut tup = Tuple::new();
tup.add_string((rec_key_tup.0).to_string());
tup.add_string((rec_key_tup.1).to_string());
tup.add_string(rec_key_tup.2);
tup.add_i64(rec_key_tup.3.into());
tup
};
rec_key.pack().into()
}
fn record_value(i: u32, split: u32) -> Value {
let rec_value_tup: (String, u32) = (data_of_record(i), split);
let rec_value = {
let mut tup = Tuple::new();
tup.add_string(rec_value_tup.0);
tup.add_i64(rec_value_tup.1.into());
tup
};
rec_value.pack().into()
}
fn insert_record_with_index(tr: &FdbTransaction, i: u32) {
tr.set(index_entry_key(i), empty());
(0..SPLIT_SIZE).for_each(|j| {
tr.set(record_key(i, j), record_value(i, j));
});
}
async fn insert_record_with_indexes(n: u32, db: &FdbDatabase) -> FdbResult<()> {
db.run(|tr| async move {
(0..n).for_each(|i| insert_record_with_index(&tr, i));
Ok(())
})
.await
}
fn main() -> Result<(), Box<dyn Error>> {
let fdb_cluster_file = env::var("FDB_CLUSTER_FILE").expect("FDB_CLUSTER_FILE not defined!");
unsafe {
fdb::select_api_version(fdb::FDB_API_VERSION as i32);
fdb::start_network();
}
let fdb_database = fdb::open_database(fdb_cluster_file)?;
let rt = Runtime::new()?;
let cloned_fdb_database = fdb_database.clone();
rt.block_on(async {
let fdb_database = cloned_fdb_database;
fdb_database
.run(|tr| async move {
tr.clear_range(Range::new(Bytes::new(), Bytes::from_static(b"\xFF")));
Ok(())
})
.await?;
insert_record_with_indexes(5, &fdb_database).await?;
let _ = fdb_database
.run(|tr| async move {
let mut mapped_range_stream = Range::new(index_entry_key(1), index_entry_key(4))
.into_mapped_stream(&tr, mapper(), RangeOptions::default());
while let Some(x) = mapped_range_stream.next().await {
let (kv, mapped_range, mapped_kvs) = x?.into_parts();
println!();
println!("-----");
let (kv_key, kv_value) = kv.into_parts();
println!("kv_key: {:?}", Tuple::from_bytes(kv_key)?);
println!("kv_value: {:?}", Tuple::from_bytes(kv_value)?);
println!();
let (mapped_range_begin_key, mapped_range_end_key) = mapped_range.into_parts();
println!(
"mapped_range_begin_key: {:?}",
Tuple::from_bytes(mapped_range_begin_key)?
);
println!("mapped_range_end_key: {:?}", mapped_range_end_key);
println!();
for mapped_kv in mapped_kvs {
let (mapped_kv_key, mapped_kv_value) = mapped_kv.into_parts();
println!("mapped_kv_key: {:?}", Tuple::from_bytes(mapped_kv_key)?);
println!("mapped_kv_value: {:?}", Tuple::from_bytes(mapped_kv_value)?);
}
println!("-----");
}
Ok(())
})
.await?;
Result::<(), Box<dyn Error>>::Ok(())
})?;
drop(fdb_database);
unsafe {
fdb::stop_network();
}
Ok(())
}