use std::fs::File;
use std::fs::OpenOptions;
use std::io;
use std::io::Seek;
use std::os::unix::fs::FileExt;
use byteorder::WriteBytesExt;
use indoc::indoc;
use pretty_assertions::assert_eq;
use crate::ChunkId;
use crate::Dump;
use crate::DumpApi;
use crate::api::raft_log_writer::RaftLogWriter;
use crate::api::raft_log_writer::blocking_flush;
use crate::testing::TestTypes;
use crate::testing::ss;
use crate::tests::context::TestContext;
use crate::tests::sample_data;
fn open_chunk_file(
config: &chunked_wal::Config,
chunk_id: ChunkId,
) -> Result<File, io::Error> {
OpenOptions::new().read(true).write(true).open(config.chunk_path(chunk_id))
}
#[test]
fn test_reopen() -> Result<(), io::Error> {
let mut ctx = TestContext::new()?;
{
let config = &mut ctx.config;
config.wal.chunk_max_records = Some(5);
}
let (state, logs) = {
let mut rl = ctx.new_raft_log()?;
sample_data::build_sample_data_purge_upto_3(&mut rl)?;
(
rl.log_state().clone(),
rl.read(0, 1000).collect::<Result<Vec<_>, _>>()?,
)
};
{
let config = &mut ctx.config;
config.wal.chunk_max_records = Some(7);
}
{
let mut rl = ctx.new_raft_log()?;
assert_eq!(state, rl.log_state().clone());
assert_eq!(
logs,
rl.read(0, 1000).collect::<Result<Vec<_>, io::Error>>()?
);
let dump = rl.dump().write_to_string()?;
println!("After reopen:\n{}", dump);
assert_eq!(
indoc! {r#"
RaftLog:
ChunkId(00_000_000_000_000_000_324)
R-00000: [000_000_000, 000_000_050) Size(50): State(RaftLogState { vote: None, last: Some((2, 3)), committed: Some((1, 2)), purged: None, user_data: None })
R-00001: [000_000_050, 000_000_078) Size(28): PurgeUpto((1, 1))
R-00002: [000_000_078, 000_000_115) Size(37): Append((2, 4), "world")
R-00003: [000_000_115, 000_000_150) Size(35): Append((2, 5), "foo")
R-00004: [000_000_150, 000_000_185) Size(35): Append((2, 6), "bar")
ChunkId(00_000_000_000_000_000_509)
R-00000: [000_000_000, 000_000_066) Size(66): State(RaftLogState { vote: None, last: Some((2, 6)), committed: Some((1, 2)), purged: Some((1, 1)), user_data: None })
R-00001: [000_000_066, 000_000_101) Size(35): Append((2, 7), "wow")
R-00002: [000_000_101, 000_000_129) Size(28): PurgeUpto((2, 3))
"#},
dump
);
let logs = [
((3, 8), ss("hi")),
((3, 9), ss("hello")),
((3, 10), ss("world")),
((3, 11), ss("foo")),
((3, 12), ss("foo")),
];
rl.append(logs)?;
blocking_flush(&mut rl)?;
let dump = rl.dump().write_to_string()?;
println!("After reopen:\n{}", dump);
assert_eq!(
indoc! {r#"
RaftLog:
ChunkId(00_000_000_000_000_000_324)
R-00000: [000_000_000, 000_000_050) Size(50): State(RaftLogState { vote: None, last: Some((2, 3)), committed: Some((1, 2)), purged: None, user_data: None })
R-00001: [000_000_050, 000_000_078) Size(28): PurgeUpto((1, 1))
R-00002: [000_000_078, 000_000_115) Size(37): Append((2, 4), "world")
R-00003: [000_000_115, 000_000_150) Size(35): Append((2, 5), "foo")
R-00004: [000_000_150, 000_000_185) Size(35): Append((2, 6), "bar")
ChunkId(00_000_000_000_000_000_509)
R-00000: [000_000_000, 000_000_066) Size(66): State(RaftLogState { vote: None, last: Some((2, 6)), committed: Some((1, 2)), purged: Some((1, 1)), user_data: None })
R-00001: [000_000_066, 000_000_101) Size(35): Append((2, 7), "wow")
R-00002: [000_000_101, 000_000_129) Size(28): PurgeUpto((2, 3))
R-00003: [000_000_129, 000_000_163) Size(34): Append((3, 8), "hi")
R-00004: [000_000_163, 000_000_200) Size(37): Append((3, 9), "hello")
R-00005: [000_000_200, 000_000_237) Size(37): Append((3, 10), "world")
R-00006: [000_000_237, 000_000_272) Size(35): Append((3, 11), "foo")
ChunkId(00_000_000_000_000_000_781)
R-00000: [000_000_000, 000_000_066) Size(66): State(RaftLogState { vote: None, last: Some((3, 11)), committed: Some((1, 2)), purged: Some((2, 3)), user_data: None })
R-00001: [000_000_066, 000_000_101) Size(35): Append((3, 12), "foo")
"#},
dump
);
}
Ok(())
}
#[test]
fn test_reopen_restores_payload_cache_evictable_boundary()
-> Result<(), io::Error> {
let mut ctx = TestContext::new()?;
ctx.config.wal.chunk_max_records = Some(5);
{
let mut rl = ctx.new_raft_log()?;
sample_data::build_sample_data(&mut rl)?;
}
{
let rl = ctx.new_raft_log()?;
let stat = rl.stat();
assert_eq!(Some((2, 6)), stat.payload_cache_last_evictable);
assert_eq!(6, stat.payload_cache_item_count);
}
Ok(())
}
#[test]
fn test_reopen_unfinished_chunk() -> Result<(), io::Error> {
let mut ctx = TestContext::new()?;
let config = &mut ctx.config;
config.wal.chunk_max_records = Some(5);
let (mut state, logs) = {
let mut rl = ctx.new_raft_log()?;
sample_data::build_sample_data_purge_upto_3(&mut rl)?;
(
rl.log_state().clone(),
rl.read(0, 1000).collect::<Result<Vec<_>, _>>()?,
)
};
{
let chunk_id = ChunkId(509);
let f = open_chunk_file(&ctx.config.wal, chunk_id)?;
f.set_len(126)?;
state.purged = Some((1, 1));
}
{
let rl = ctx.new_raft_log()?;
assert_eq!(state, rl.log_state().clone());
assert_eq!(logs, rl.read(0, 1000).collect::<Result<Vec<_>, _>>()?);
let dump = rl.dump().write_to_string()?;
println!("After reopen:\n{}", dump);
assert_eq!(
indoc! {r#"
RaftLog:
ChunkId(00_000_000_000_000_000_324)
R-00000: [000_000_000, 000_000_050) Size(50): State(RaftLogState { vote: None, last: Some((2, 3)), committed: Some((1, 2)), purged: None, user_data: None })
R-00001: [000_000_050, 000_000_078) Size(28): PurgeUpto((1, 1))
R-00002: [000_000_078, 000_000_115) Size(37): Append((2, 4), "world")
R-00003: [000_000_115, 000_000_150) Size(35): Append((2, 5), "foo")
R-00004: [000_000_150, 000_000_185) Size(35): Append((2, 6), "bar")
ChunkId(00_000_000_000_000_000_509)
R-00000: [000_000_000, 000_000_066) Size(66): State(RaftLogState { vote: None, last: Some((2, 6)), committed: Some((1, 2)), purged: Some((1, 1)), user_data: None })
R-00001: [000_000_066, 000_000_101) Size(35): Append((2, 7), "wow")
ChunkId(00_000_000_000_000_000_610)
R-00000: [000_000_000, 000_000_066) Size(66): State(RaftLogState { vote: None, last: Some((2, 7)), committed: Some((1, 2)), purged: Some((1, 1)), user_data: None })
"#},
dump
);
}
Ok(())
}
#[test]
fn test_reopen_unfinished_tailing_zero_chunk() -> Result<(), io::Error> {
for append_zeros in [3, 1024 * 33] {
let mut ctx = TestContext::new()?;
let config = &mut ctx.config;
config.wal.chunk_max_records = Some(5);
let (state, logs) = {
let mut rl = ctx.new_raft_log()?;
sample_data::build_sample_data_purge_upto_3(&mut rl)?;
(
rl.log_state().clone(),
rl.read(0, 1000).collect::<Result<Vec<_>, _>>()?,
)
};
{
let chunk_id = ChunkId(509);
let f = open_chunk_file(&ctx.config.wal, chunk_id)?;
f.set_len(129 + append_zeros)?;
}
{
let rl = ctx.new_raft_log()?;
assert_eq!(
rl.wal.last_closed_chunk_truncated_file_size(),
Some(129 + append_zeros)
);
assert_eq!(state, rl.log_state().clone());
assert_eq!(logs, rl.read(0, 1000).collect::<Result<Vec<_>, _>>()?);
let dump = rl.dump().write_to_string()?;
println!("After reopen:\n{}", dump);
assert_eq!(
indoc! {r#"
RaftLog:
ChunkId(00_000_000_000_000_000_324)
R-00000: [000_000_000, 000_000_050) Size(50): State(RaftLogState { vote: None, last: Some((2, 3)), committed: Some((1, 2)), purged: None, user_data: None })
R-00001: [000_000_050, 000_000_078) Size(28): PurgeUpto((1, 1))
R-00002: [000_000_078, 000_000_115) Size(37): Append((2, 4), "world")
R-00003: [000_000_115, 000_000_150) Size(35): Append((2, 5), "foo")
R-00004: [000_000_150, 000_000_185) Size(35): Append((2, 6), "bar")
ChunkId(00_000_000_000_000_000_509)
R-00000: [000_000_000, 000_000_066) Size(66): State(RaftLogState { vote: None, last: Some((2, 6)), committed: Some((1, 2)), purged: Some((1, 1)), user_data: None })
R-00001: [000_000_066, 000_000_101) Size(35): Append((2, 7), "wow")
R-00002: [000_000_101, 000_000_129) Size(28): PurgeUpto((2, 3))
ChunkId(00_000_000_000_000_000_638)
R-00000: [000_000_000, 000_000_066) Size(66): State(RaftLogState { vote: None, last: Some((2, 7)), committed: Some((1, 2)), purged: Some((2, 3)), user_data: None })
"#},
dump
);
}
}
Ok(())
}
#[test]
fn test_reopen_unfinished_tailing_not_all_zero_chunk() -> Result<(), io::Error>
{
let append_zeros = 1024 * 32;
let mut ctx = TestContext::new()?;
let config = &mut ctx.config;
config.wal.chunk_max_records = Some(5);
{
let mut rl = ctx.new_raft_log()?;
sample_data::build_sample_data_purge_upto_3(&mut rl)?;
}
{
let chunk_id = ChunkId(509);
let mut f = open_chunk_file(&ctx.config.wal, chunk_id)?;
f.set_len(129 + append_zeros)?;
f.seek(io::SeekFrom::Start(129 + append_zeros))?;
f.write_u8(1)?;
}
{
let res = ctx.new_raft_log();
assert!(res.is_err());
assert_eq!(
"crc32 checksum mismatch: expected fd59b8d, got 0, \
while Record::decode(); \
when:(decode Record at offset 129); \
when:(iterate ChunkId(00_000_000_000_000_000_509))",
res.unwrap_err().to_string()
);
let dump =
Dump::<TestTypes>::new(ctx.arc_config())?.write_to_string()?;
println!("After reopen:\n{}", dump);
assert_eq!(
indoc! {r#"
RaftLog:
ChunkId(00_000_000_000_000_000_324)
R-00000: [000_000_000, 000_000_050) Size(50): State(RaftLogState { vote: None, last: Some((2, 3)), committed: Some((1, 2)), purged: None, user_data: None })
R-00001: [000_000_050, 000_000_078) Size(28): PurgeUpto((1, 1))
R-00002: [000_000_078, 000_000_115) Size(37): Append((2, 4), "world")
R-00003: [000_000_115, 000_000_150) Size(35): Append((2, 5), "foo")
R-00004: [000_000_150, 000_000_185) Size(35): Append((2, 6), "bar")
ChunkId(00_000_000_000_000_000_509)
R-00000: [000_000_000, 000_000_066) Size(66): State(RaftLogState { vote: None, last: Some((2, 6)), committed: Some((1, 2)), purged: Some((1, 1)), user_data: None })
R-00001: [000_000_066, 000_000_101) Size(35): Append((2, 7), "wow")
R-00002: [000_000_101, 000_000_129) Size(28): PurgeUpto((2, 3))
Error: crc32 checksum mismatch: expected fd59b8d, got 0, while Record::decode(); when:(decode Record at offset 129); when:(iterate ChunkId(00_000_000_000_000_000_509))
"#},
dump
);
}
Ok(())
}
#[test]
fn test_reopen_unfinished_non_last_chunk() -> Result<(), io::Error> {
let mut ctx = TestContext::new()?;
let config = &mut ctx.config;
config.wal.chunk_max_records = Some(5);
{
let mut rl = ctx.new_raft_log()?;
sample_data::build_sample_data_purge_upto_3(&mut rl)?;
}
{
let second_last_chunk_id = ChunkId(324);
let f = open_chunk_file(&ctx.config.wal, second_last_chunk_id)?;
f.set_len(182)?;
}
{
let res = ctx.new_raft_log();
assert!(res.is_err());
assert_eq!(
"failed to fill whole buffer; when:(decode Record at offset 150); when:(iterate ChunkId(00_000_000_000_000_000_324))",
res.unwrap_err().to_string()
);
let dump =
Dump::<TestTypes>::new(ctx.arc_config())?.write_to_string()?;
println!("After reopen:\n{}", dump);
assert_eq!(
indoc! {r#"
RaftLog:
ChunkId(00_000_000_000_000_000_324)
R-00000: [000_000_000, 000_000_050) Size(50): State(RaftLogState { vote: None, last: Some((2, 3)), committed: Some((1, 2)), purged: None, user_data: None })
R-00001: [000_000_050, 000_000_078) Size(28): PurgeUpto((1, 1))
R-00002: [000_000_078, 000_000_115) Size(37): Append((2, 4), "world")
R-00003: [000_000_115, 000_000_150) Size(35): Append((2, 5), "foo")
Error: failed to fill whole buffer; when:(decode Record at offset 150); when:(iterate ChunkId(00_000_000_000_000_000_324))
ChunkId(00_000_000_000_000_000_509)
R-00000: [000_000_000, 000_000_066) Size(66): State(RaftLogState { vote: None, last: Some((2, 6)), committed: Some((1, 2)), purged: Some((1, 1)), user_data: None })
R-00001: [000_000_066, 000_000_101) Size(35): Append((2, 7), "wow")
R-00002: [000_000_101, 000_000_129) Size(28): PurgeUpto((2, 3))
"#},
dump
);
}
Ok(())
}
#[test]
fn test_reopen_damaged_last_record() -> Result<(), io::Error> {
let mut ctx = TestContext::new()?;
let config = &mut ctx.config;
config.wal.chunk_max_records = Some(5);
{
let mut rl = ctx.new_raft_log()?;
sample_data::build_sample_data_purge_upto_3(&mut rl)?;
}
{
let last_chunk_id = ChunkId(509);
let mut f = open_chunk_file(&ctx.config.wal, last_chunk_id)?;
let mut byte_buf = [0u8; 1];
f.read_exact_at(&mut byte_buf, 126)?;
let byt = byte_buf[0].wrapping_add(1);
f.seek(io::SeekFrom::Start(126))?;
f.write_u8(byt)?;
}
{
let res = ctx.new_raft_log();
assert!(res.is_err());
assert_eq!(
"crc32 checksum mismatch: expected cb22c57e, got cb23c57e, \
while Record::decode(); \
when:(decode Record at offset 101); \
when:(iterate ChunkId(00_000_000_000_000_000_509))",
res.unwrap_err().to_string()
);
let dump =
Dump::<TestTypes>::new(ctx.arc_config())?.write_to_string()?;
println!("After reopen:\n{}", dump);
assert_eq!(
indoc! {r#"
RaftLog:
ChunkId(00_000_000_000_000_000_324)
R-00000: [000_000_000, 000_000_050) Size(50): State(RaftLogState { vote: None, last: Some((2, 3)), committed: Some((1, 2)), purged: None, user_data: None })
R-00001: [000_000_050, 000_000_078) Size(28): PurgeUpto((1, 1))
R-00002: [000_000_078, 000_000_115) Size(37): Append((2, 4), "world")
R-00003: [000_000_115, 000_000_150) Size(35): Append((2, 5), "foo")
R-00004: [000_000_150, 000_000_185) Size(35): Append((2, 6), "bar")
ChunkId(00_000_000_000_000_000_509)
R-00000: [000_000_000, 000_000_066) Size(66): State(RaftLogState { vote: None, last: Some((2, 6)), committed: Some((1, 2)), purged: Some((1, 1)), user_data: None })
R-00001: [000_000_066, 000_000_101) Size(35): Append((2, 7), "wow")
Error: crc32 checksum mismatch: expected cb22c57e, got cb23c57e, while Record::decode(); when:(decode Record at offset 101); when:(iterate ChunkId(00_000_000_000_000_000_509))
"#},
dump
);
}
Ok(())
}