use std::io;
use std::io::Write;
use std::sync::Arc;
use std::thread::sleep;
use std::time::Duration;
use goldenfile::Mint;
use raft_log::Config;
use raft_log::RaftLog;
use raft_log::Types;
use raft_log::api::raft_log_writer::RaftLogWriter;
use tempfile::TempDir;
#[derive(Debug, Clone, PartialEq, Eq, Default)]
struct TestTypes;
impl Types for TestTypes {
type LogId = (u64, u64); type LogPayload = String;
type Vote = (u64, u64); type Callback = std::sync::mpsc::SyncSender<Result<(), io::Error>>;
type UserData = Vec<u8>;
fn log_index(log_id: &Self::LogId) -> u64 {
log_id.1 }
fn payload_size(payload: &Self::LogPayload) -> u64 {
payload.len() as u64
}
}
#[test]
fn test_massive_load() -> std::io::Result<()> {
let mut mint = Mint::new("tests/massive");
let file = &mut mint.new_goldenfile("periodical-read.txt")?;
let temp_dir = TempDir::new()?;
let path = temp_dir.path().to_path_buf().to_str().unwrap().to_string();
let config = Arc::new(Config {
dir: path,
log_cache_max_items: Some(200),
chunk_max_records: Some(100),
..Default::default()
});
let mut log_index = 0u64;
let mut purge_index = 0u64;
for reopen_index in 0..3 {
let mut log = RaftLog::<TestTypes>::open(config.clone())?;
for i in 1u64..500 {
log_index += 1;
let log_id = (log_index, log_index);
let entries =
vec![(log_id, format!("data-{}-{}", reopen_index, log_index))];
log.append(entries)?;
if i % 11 == 0 {
log.commit(log_id)?;
}
if i % 13 == 0 {
log.purge((purge_index, purge_index))?;
purge_index += 7;
}
if i % 17 == 0 {
log.save_vote((log_index, 1u64))?;
}
if i % 29 == 0 {
log.truncate(log_index - 5)?;
log_index -= 6;
}
if i % 23 == 0 {
let random_index =
purge_index + (i * 40503 % (log_index - purge_index + 1));
let entries = log
.read(random_index, random_index + 6)
.collect::<Result<Vec<_>, _>>()?;
let state = log.log_state();
writeln!(file, "{:?}", state)?;
log.wait_worker_idle();
log.drain_cache_evictable();
let stat = log.stat();
writeln!(file, "{:#}", stat)?;
for entry in entries {
writeln!(file, "{:?}", entry)?;
}
}
}
log.wait_worker_idle();
log.drain_cache_evictable();
let stat = log.stat();
writeln!(file, "sync start: {:#}", stat)?;
let (tx, rx) = std::sync::mpsc::sync_channel(1);
log.flush(Some(tx))?;
rx.recv().unwrap()?;
let stat = log.stat();
writeln!(file, "sync done: {:#}", stat)?;
sleep(Duration::from_millis(200));
}
Ok(())
}