use std::borrow::Cow;
use std::collections::HashMap;
use std::ops::Range;
use proptest::prelude::prop;
use proptest::prop_oneof;
use proptest::strategy::{Just, Strategy};
use tempfile::TempDir;
use tokio::runtime::Runtime;
use crate::record::{MultiPlexedRecord, MultiRecord};
use crate::{MultiRecordLog, Serializable};
struct PropTestEnv {
tempdir: TempDir,
record_log: MultiRecordLog,
state: HashMap<&'static str, Range<u64>>,
block_to_write: Vec<u8>,
}
impl PropTestEnv {
pub async fn new(block_size: usize) -> Self {
let tempdir = tempfile::tempdir().unwrap();
let mut record_log = MultiRecordLog::open(tempdir.path()).await.unwrap();
record_log.create_queue("q1").await.unwrap();
record_log.create_queue("q2").await.unwrap();
let mut state = HashMap::default();
state.insert("q1", 0..0);
state.insert("q2", 0..0);
PropTestEnv {
tempdir,
record_log,
state,
block_to_write: vec![b'A'; block_size],
}
}
pub async fn apply(&mut self, op: Operation) {
match op {
Operation::Reopen => {
self.reload().await;
}
Operation::MultiAppend { queue, count } => {
self.multi_append(queue, count).await;
}
Operation::RedundantAppend { queue } => {
self.double_append(queue).await;
}
Operation::Truncate { queue, pos } => {
self.truncate(queue, pos).await;
}
}
}
pub async fn reload(&mut self) {
self.record_log = MultiRecordLog::open(self.tempdir.path()).await.unwrap();
for (queue, range) in &self.state {
assert_eq!(
self.record_log.range(queue, ..).unwrap().count() as u64,
range.end - range.start,
);
}
}
pub async fn double_append(&mut self, queue: &str) {
let range = self.state.get_mut(queue).unwrap();
let res = self
.record_log
.append_records(queue, Some(range.end), std::iter::once(&b"BB"[..]))
.await
.unwrap()
.unwrap();
assert!(self
.record_log
.append_records(queue, Some(range.end), std::iter::once(&b"BB"[..]))
.await
.unwrap()
.is_none());
assert_eq!(range.end, res);
range.end += 1;
}
pub async fn multi_append(&mut self, queue: &str, count: u64) {
let range = self.state.get_mut(queue).unwrap();
let res = self
.record_log
.append_records(
queue,
Some(range.end),
std::iter::repeat(&self.block_to_write[..]).take(count as usize),
)
.await
.unwrap();
if count != 0 {
let res = res.unwrap();
assert_eq!(range.end + count - 1, res);
range.end += count;
}
}
pub async fn truncate(&mut self, queue: &str, pos: u64) {
let range = self.state.get_mut(queue).unwrap();
if range.contains(&pos) {
range.start = pos + 1;
self.record_log.truncate(queue, pos).await.unwrap();
} else if pos >= range.end {
self.record_log.truncate(queue, pos).await.unwrap_err();
} else {
self.record_log.truncate(queue, pos).await.unwrap();
}
}
}
fn queue_strategy() -> impl Strategy<Value = &'static str> {
prop_oneof![Just("q1"), Just("q2"),]
}
fn operation_strategy() -> impl Strategy<Value = Operation> {
prop_oneof![
Just(Operation::Reopen),
queue_strategy().prop_map(|queue| Operation::RedundantAppend { queue }),
(queue_strategy(), 0u64..10u64).prop_map(|(queue, pos)| Operation::Truncate { queue, pos }),
(queue_strategy(), 0u64..10u64)
.prop_map(|(queue, count)| Operation::MultiAppend { queue, count }),
]
}
fn operations_strategy() -> impl Strategy<Value = Vec<Operation>> {
prop::collection::vec(operation_strategy(), 1..100)
}
fn random_bytevec_strategy(max_len: usize) -> impl Strategy<Value = Vec<u8>> {
prop::collection::vec(proptest::num::u8::ANY, 0..max_len)
}
fn random_multi_record_strategy(
max_record_count: usize,
max_len: usize,
) -> impl Strategy<Value = Vec<Vec<u8>>> {
prop::collection::vec(random_bytevec_strategy(max_len), 1..max_record_count)
}
#[test]
fn test_scenario_end_on_full_file() {
use Operation::*;
let ops = [
MultiAppend {
queue: "q1",
count: 1,
},
MultiAppend {
queue: "q1",
count: 1,
},
MultiAppend {
queue: "q2",
count: 1,
},
MultiAppend {
queue: "q2",
count: 1,
},
MultiAppend {
queue: "q2",
count: 1,
},
Reopen,
RedundantAppend { queue: "q2" },
Reopen,
];
Runtime::new().unwrap().block_on(async {
let mut env = PropTestEnv::new(52381).await;
for op in ops {
env.apply(op).await;
}
});
}
#[test]
fn test_scenario_big_records() {
use Operation::*;
let ops = [
MultiAppend {
queue: "q1",
count: 2,
},
MultiAppend {
queue: "q2",
count: 4,
},
Reopen,
MultiAppend {
queue: "q2",
count: 1,
},
Reopen,
RedundantAppend { queue: "q2" },
Reopen,
];
Runtime::new().unwrap().block_on(async {
let mut env = PropTestEnv::new(1 << 26).await;
for op in ops {
env.apply(op).await;
}
});
}
fn queue_name_len() -> impl Strategy<Value = usize> {
prop_oneof![
100 => 1..10usize,
1 => 65_534..=65_536usize
]
}
fn queue_name_strategy() -> impl Strategy<Value = String> {
queue_name_len().prop_flat_map(|num_bytes| {
proptest::collection::vec(proptest::prelude::any::<char>(), num_bytes).prop_map(
move |chars| {
let mut s: String = String::new();
s.extend(chars.into_iter());
let boundaries = (0..num_bytes).rev();
for boundary in boundaries {
if s.is_char_boundary(boundary) {
s.truncate(boundary);
break;
}
}
s
},
)
})
}
proptest::proptest! {
#[test]
fn test_proptest_multirecord((ops, block_size) in (operations_strategy(), 0usize..65535)) {
Runtime::new().unwrap().block_on(async {
let mut env = PropTestEnv::new(block_size).await;
for op in ops {
env.apply(op).await;
}
});
}
#[test]
fn test_proptest_multiplexed_record_roundtrip((kind, queue, position, payload) in
(0u8..4u8, queue_name_strategy(), proptest::num::u64::ANY, random_multi_record_strategy(64, 65536))) {
let mut buffer = Vec::new();
MultiRecord::serialize(payload.iter().map(|p| p.as_ref()), position, &mut buffer);
let record = match kind {
0 => MultiPlexedRecord::AppendRecords {
queue: &queue,
position,
records: MultiRecord::new(&buffer).unwrap(),
},
1 => MultiPlexedRecord::Truncate {
queue: &queue,
position},
2 => MultiPlexedRecord::RecordPosition {queue: &queue, position},
3 => MultiPlexedRecord::DeleteQueue {queue: &queue, position},
4.. => unreachable!(),
};
let mut buffer = Vec::new();
record.serialize(&mut buffer);
let deser = MultiPlexedRecord::deserialize(&buffer).unwrap();
assert_eq!(record, deser);
if let MultiPlexedRecord::AppendRecords { records, .. } = deser {
assert!(records
.map(|record| record.unwrap().1)
.zip(payload)
.all(|(record, payload)| record == payload));
}
}
}
#[derive(Debug, Clone)]
enum Operation {
Reopen,
MultiAppend { queue: &'static str, count: u64 },
RedundantAppend { queue: &'static str },
Truncate { queue: &'static str, pos: u64 },
}
#[tokio::test]
async fn test_multi_record() {
let tempdir = tempfile::tempdir().unwrap();
eprintln!("dir={tempdir:?}");
{
let mut multi_record_log = MultiRecordLog::open(tempdir.path()).await.unwrap();
multi_record_log.create_queue("queue").await.unwrap();
assert_eq!(
multi_record_log
.append_record("queue", None, &b"1"[..])
.await
.unwrap(),
Some(0)
);
}
{
let mut multi_record_log = MultiRecordLog::open(tempdir.path()).await.unwrap();
assert_eq!(
multi_record_log
.append_record("queue", None, &b"22"[..])
.await
.unwrap(),
Some(1)
);
}
{
let mut multi_record_log = MultiRecordLog::open(tempdir.path()).await.unwrap();
multi_record_log.truncate("queue", 0).await.unwrap();
assert_eq!(
multi_record_log
.range("queue", ..)
.unwrap()
.collect::<Vec<_>>(),
[(1, Cow::Borrowed(&b"22"[..]))],
);
}
{
let mut multi_record_log = MultiRecordLog::open(tempdir.path()).await.unwrap();
assert_eq!(
multi_record_log
.append_record("queue", None, &b"hello"[..])
.await
.unwrap(),
Some(2)
);
}
{
let multi_record_log = MultiRecordLog::open(tempdir.path()).await.unwrap();
assert_eq!(
multi_record_log
.range("queue", ..)
.unwrap()
.collect::<Vec<_>>(),
[
(1, Cow::Borrowed(&b"22"[..])),
(2, Cow::Borrowed(&b"hello"[..])),
]
);
}
}