use std::fmt;
use std::iter;
use std::ptr;
use rocks_sys as ll;
use crate::to_raw::{FromRaw, ToRaw};
use crate::types::SequenceNumber;
use crate::write_batch::WriteBatch;
use crate::{Error, Result};
/// Kind of write-ahead-log file, as carried in `LogFile::file_type`.
///
/// The enum is `#[repr(C)]` with explicit discriminants, so the numeric
/// values are fixed. NOTE(review): presumably they mirror RocksDB's native
/// `WalFileType` constants and cross the FFI boundary — confirm before
/// renumbering or adding variants.
#[derive(Debug, Copy, Clone, PartialEq, Eq, Hash)]
#[repr(C)]
pub enum WalFileType {
/// Discriminant `0`. Presumably a log file that has been moved to the
/// archive — confirm against the RocksDB documentation.
Archived = 0,
/// Discriminant `1`. Presumably a log file that is still live — confirm
/// against the RocksDB documentation.
Alive = 1,
}
/// Description of a single write-ahead-log file.
///
/// `Debug` is implemented by hand for this type and prints it as
/// `WalFile(...)`, omitting `start_sequence`.
pub struct LogFile {
/// Path name of the log file.
/// NOTE(review): whether this is absolute or relative to the DB/archive
/// directory is not visible here — confirm against the producer.
pub path_name: String,
/// Numeric identifier of the log file.
pub log_number: u64,
/// Whether the file is archived or alive (see `WalFileType`).
pub file_type: WalFileType,
/// Presumably the sequence number of the first write batch stored in this
/// file — TODO confirm.
pub start_sequence: SequenceNumber,
/// Size of the file in bytes.
pub size_in_bytes: u64,
}
impl fmt::Debug for LogFile {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
write!(
f,
"WalFile({:?}, {:?}, #{}, {} bytes)",
self.path_name, self.file_type, self.log_number, self.size_in_bytes
)
}
}
/// A write batch paired with a sequence number.
///
/// This is what `TransactionLogIterator::get_batch` returns and what the
/// iterator yields as its `Item`.
#[derive(Debug)]
pub struct BatchResult {
/// Sequence number reported by the native iterator together with the batch.
/// NOTE(review): presumably the sequence number of the first entry in
/// `write_batch` — confirm against the RocksDB documentation.
pub sequence: SequenceNumber,
/// The batch of writes read from the log at that position.
pub write_batch: WriteBatch,
}
/// Wrapper around a raw `rocks_transaction_log_iterator_t` pointer.
///
/// The pointer is passed to the native destroy function when this value is
/// dropped (see the `Drop` impl), so the wrapper behaves as its owner.
/// It also implements `Iterator`, yielding one `BatchResult` per step.
#[derive(Debug)]
pub struct TransactionLogIterator {
// Raw handle to the native iterator; every method passes it straight to the FFI layer.
raw: *mut ll::rocks_transaction_log_iterator_t,
}
impl ToRaw<ll::rocks_transaction_log_iterator_t> for TransactionLogIterator {
/// Returns the wrapped native pointer.
///
/// Ownership is not transferred: the wrapper still destroys the pointer when
/// it is dropped, so the returned pointer must not be used after that.
fn raw(&self) -> *mut ll::rocks_transaction_log_iterator_t {
self.raw
}
}
impl FromRaw<ll::rocks_transaction_log_iterator_t> for TransactionLogIterator {
    /// Wraps a raw native iterator pointer.
    ///
    /// # Safety
    ///
    /// The returned value passes `raw` to the native destroy function when it
    /// is dropped, so the caller must hand over a pointer that nothing else
    /// will free. NOTE(review): presumably `raw` must also be non-null and
    /// point to a live native iterator — confirm against the call sites.
    unsafe fn from_ll(raw: *mut ll::rocks_transaction_log_iterator_t) -> TransactionLogIterator {
        Self { raw }
    }
}
impl Drop for TransactionLogIterator {
/// Hands the wrapped pointer to the native destroy function when the Rust
/// wrapper goes out of scope.
fn drop(&mut self) {
// SAFETY (assumed): `self.raw` came in through `from_ll` and nothing else in
// this file frees it. TODO confirm that callers never construct the wrapper
// from a null or shared pointer.
unsafe {
// `destory` is the spelling of the FFI symbol this code links against.
ll::rocks_transaction_log_iterator_destory(self.raw);
}
}
}
impl TransactionLogIterator {
pub fn is_valid(&self) -> bool {
unsafe { ll::rocks_transaction_log_iterator_valid(self.raw) != 0 }
}
pub fn move_next(&mut self) {
unsafe {
ll::rocks_transaction_log_iterator_next(self.raw);
}
}
pub fn status(&self) -> Result<()> {
let mut status = ptr::null_mut();
unsafe {
ll::rocks_transaction_log_iterator_status(self.raw, &mut status);
Error::from_ll(status)
}
}
pub fn get_batch(&self) -> BatchResult {
let mut seq_no = 0;
unsafe {
let batch_raw_ptr = ll::rocks_transaction_log_iterator_get_batch(self.raw, &mut seq_no);
BatchResult {
sequence: SequenceNumber(seq_no),
write_batch: WriteBatch::from_ll(batch_raw_ptr),
}
}
}
}
impl iter::Iterator for TransactionLogIterator {
    type Item = BatchResult;

    /// Yields the batch at the current position and then advances the
    /// underlying iterator.
    ///
    /// Iteration stops (`None`) as soon as the iterator is no longer valid or
    /// its status reports an error. The error itself is not surfaced here, so
    /// callers that need to tell "end of log" from "failure" should call
    /// `status()` after the loop.
    fn next(&mut self) -> Option<Self::Item> {
        // Validity is checked first; the status is only queried for a valid iterator.
        if !self.is_valid() || self.status().is_err() {
            return None;
        }
        let current = self.get_batch();
        self.move_next();
        Some(current)
    }
}
#[cfg(test)]
mod tests {
    use super::super::rocksdb::*;
    use crate::write_batch::WriteBatchIteratorHandler;

    /// Writes 100 batches of three puts each (flushing every ninth batch),
    /// then reads the log back through `get_updates_since` and checks both the
    /// manual API (`is_valid` / `status` / `get_batch`) and the `Iterator`
    /// implementation.
    #[test]
    fn transaction_log_iter() {
        let tmp_dir = ::tempdir::TempDir::new_in("", "rocks").unwrap();
        let db = DB::open(
            Options::default()
                .map_db_options(|db| {
                    // WAL TTL and size limit are set explicitly; presumably so the
                    // log files are retained long enough to be read back below.
                    db.create_if_missing(true)
                        .wal_ttl_seconds(1000000)
                        .wal_size_limit_mb(1024)
                })
                .map_cf_options(|cf| cf.disable_auto_compactions(false)),
            &tmp_dir,
        )
        .unwrap();

        for i in 0..100 {
            let mut batch = WriteBatch::default();
            batch
                .put(format!("K{}", i).as_bytes(), format!("V{}", i * i).as_bytes())
                .put(format!("M{}", i).as_bytes(), format!("V{}", i).as_bytes())
                .put(format!("N{}", i).as_bytes(), format!("V{}", i * i * i).as_bytes());
            assert!(db.write(WriteOptions::default_instance(), &batch).is_ok());

            if i % 9 == 0 {
                assert!(db.flush(&FlushOptions::default().wait(true)).is_ok());
            }
        }

        // Only 100 batches of three puts were written, so sequence 2000 is
        // expected to be rejected.
        let it = db.get_updates_since(2000.into());
        assert!(it.is_err());

        let it = db.get_updates_since(20.into());
        assert!(it.is_ok());

        let mut it = it.unwrap();
        assert!(it.is_valid());
        assert!(it.status().is_ok());

        // Consume the first batch through the `Iterator` impl; `get_batch` then
        // reads the batch the iterator has advanced to.
        assert!(it.next().is_some());

        let batch = it.get_batch();
        println!("batch => {:?}", batch);
        assert!(batch.sequence.0 >= 20);
        assert_eq!(batch.write_batch.count(), 3);

        let mut handler = WriteBatchIteratorHandler::default();
        let ret = batch.write_batch.iterate(&mut handler);
        assert!(ret.is_ok(), "error: {:?}", ret);
        assert_eq!(handler.entries.len(), 3);

        // The first batch returned may start before the requested sequence
        // number; presumably by at most two, since every batch here holds
        // three entries. Hence the `20 - 3` lower bound.
        for batch in db.get_updates_since(20.into()).unwrap() {
            assert!(batch.sequence.0 > 20 - 3);
        }
    }
}