use rocks_sys as ll;
use std::fmt;
use std::marker::PhantomData;
use std::mem;
use std::ptr;
use std::slice;
use std::str;
use crate::compaction_job_stats::CompactionJobStats;
use crate::db::{ColumnFamilyHandle, DBRef};
use crate::options::CompressionType;
use crate::table_properties::{TableProperties, TablePropertiesCollection};
use crate::to_raw::FromRaw;
use crate::types::SequenceNumber;
use crate::{Error, Result};
/// Reason attached to a table-file creation event.
///
/// The `reason()` accessors in this file produce this enum by transmuting the
/// value returned from the native getter, so the discriminants are spelled out
/// explicitly here.
#[repr(C)]
#[derive(Debug, Copy, Clone, PartialEq, Eq, Hash)]
pub enum TableFileCreationReason {
    Flush = 0,
    Compaction = 1,
    Recovery = 2,
}
/// Thin wrapper around a raw `rocks_table_file_creation_brief_info_t` pointer.
///
/// Every accessor on this type reads through the pointer by calling the
/// matching native getter; nothing is cached on the Rust side.
pub struct TableFileCreationBriefInfo {
// Raw native handle that all accessors dereference through the C API.
raw: *const ll::rocks_table_file_creation_brief_info_t,
}
// Debug output is assembled from the accessor methods: job id, creation
// reason, column family name and file path (the database name is not printed).
impl fmt::Debug for TableFileCreationBriefInfo {
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        let mut out = f.debug_struct("TableFileCreationBriefInfo");
        out.field("job_id", &self.job_id());
        out.field("reason", &self.reason());
        out.field("cf", &self.cf_name());
        out.field("file_path", &self.file_path());
        out.finish()
    }
}
impl TableFileCreationBriefInfo {
    /// Returns the string reported by the native `get_db_name` getter.
    ///
    /// The bytes are wrapped as `&str` without UTF-8 validation.
    pub fn db_name(&self) -> &str {
        let mut len: usize = 0;
        // SAFETY (assumed, not checked here): `self.raw` is a live handle and
        // the getter yields a pointer to `len` readable bytes of valid UTF-8.
        unsafe {
            let data = ll::rocks_table_file_creation_brief_info_get_db_name(self.raw, &mut len);
            let bytes = slice::from_raw_parts(data as *const u8, len);
            str::from_utf8_unchecked(bytes)
        }
    }

    /// Returns the string reported by the native `get_cf_name` getter.
    ///
    /// The bytes are wrapped as `&str` without UTF-8 validation.
    pub fn cf_name(&self) -> &str {
        let mut len: usize = 0;
        // SAFETY (assumed, not checked here): same contract as `db_name`.
        unsafe {
            let data = ll::rocks_table_file_creation_brief_info_get_cf_name(self.raw, &mut len);
            let bytes = slice::from_raw_parts(data as *const u8, len);
            str::from_utf8_unchecked(bytes)
        }
    }

    /// Returns the string reported by the native `get_file_path` getter.
    ///
    /// The bytes are wrapped as `&str` without UTF-8 validation.
    pub fn file_path(&self) -> &str {
        let mut len: usize = 0;
        // SAFETY (assumed, not checked here): same contract as `db_name`.
        unsafe {
            let data = ll::rocks_table_file_creation_brief_info_get_file_path(self.raw, &mut len);
            let bytes = slice::from_raw_parts(data as *const u8, len);
            str::from_utf8_unchecked(bytes)
        }
    }

    /// Returns the job id from the native getter, cast to `i32`.
    pub fn job_id(&self) -> i32 {
        let id = unsafe { ll::rocks_table_file_creation_brief_info_get_job_id(self.raw) };
        id as i32
    }

    /// Returns the creation reason.
    ///
    /// The native value is transmuted straight into the enum.
    /// NOTE(review): a value outside the declared variants would be undefined
    /// behaviour — confirm the native side only emits known values.
    pub fn reason(&self) -> TableFileCreationReason {
        unsafe {
            let raw_reason = ll::rocks_table_file_creation_brief_info_get_reason(self.raw);
            mem::transmute(raw_reason)
        }
    }
}
/// Thin wrapper around a raw `rocks_table_file_creation_info_t` pointer.
///
/// Besides its own getters (file size, table properties, status) the accessors
/// reach the embedded brief info through
/// `rocks_table_file_creation_info_get_brief_info`.
pub struct TableFileCreationInfo {
// Raw native handle that all accessors dereference through the C API.
raw: *const ll::rocks_table_file_creation_info_t,
}
// Debug output is assembled from the accessor methods: job id, creation
// reason, column family name, file path and status.
impl fmt::Debug for TableFileCreationInfo {
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        let mut out = f.debug_struct("TableFileCreationInfo");
        out.field("job_id", &self.job_id());
        out.field("reason", &self.reason());
        out.field("cf", &self.cf_name());
        out.field("file_path", &self.file_path());
        out.field("status", &self.status());
        out.finish()
    }
}
impl TableFileCreationInfo {
    /// Returns the file size reported by the native getter.
    pub fn file_size(&self) -> u64 {
        let size = unsafe { ll::rocks_table_file_creation_info_get_file_size(self.raw) };
        size
    }

    /// Wraps the native table-properties handle of this info object.
    pub fn table_properties<'a>(&'a self) -> TableProperties<'a> {
        unsafe {
            let props = ll::rocks_table_file_creation_info_get_table_properties(self.raw);
            TableProperties::from_ll(props)
        }
    }

    /// Fetches the native status and converts it into a `Result`.
    pub fn status(&self) -> Result<()> {
        // Out-parameter for the getter; starts out null.
        let mut raw_status: *mut ll::rocks_status_t = ptr::null_mut();
        unsafe {
            ll::rocks_table_file_creation_info_get_status(self.raw, &mut raw_status);
            Result::from_ll(raw_status)
        }
    }

    /// Returns the brief-info handle obtained from this info object; the
    /// string, job id and reason accessors below all go through it.
    unsafe fn brief_info(&self) -> *const ll::rocks_table_file_creation_brief_info_t {
        ll::rocks_table_file_creation_info_get_brief_info(self.raw)
    }

    /// Returns the string reported by the brief info's `get_db_name` getter.
    ///
    /// The bytes are wrapped as `&str` without UTF-8 validation.
    pub fn db_name(&self) -> &str {
        let mut len: usize = 0;
        // SAFETY (assumed, not checked here): the handle is live and the
        // getter yields a pointer to `len` readable bytes of valid UTF-8.
        unsafe {
            let brief = self.brief_info();
            let data = ll::rocks_table_file_creation_brief_info_get_db_name(brief, &mut len);
            let bytes = slice::from_raw_parts(data as *const u8, len);
            str::from_utf8_unchecked(bytes)
        }
    }

    /// Returns the string reported by the brief info's `get_cf_name` getter.
    ///
    /// The bytes are wrapped as `&str` without UTF-8 validation.
    pub fn cf_name(&self) -> &str {
        let mut len: usize = 0;
        // SAFETY (assumed, not checked here): same contract as `db_name`.
        unsafe {
            let brief = self.brief_info();
            let data = ll::rocks_table_file_creation_brief_info_get_cf_name(brief, &mut len);
            let bytes = slice::from_raw_parts(data as *const u8, len);
            str::from_utf8_unchecked(bytes)
        }
    }

    /// Returns the string reported by the brief info's `get_file_path` getter.
    ///
    /// The bytes are wrapped as `&str` without UTF-8 validation.
    pub fn file_path(&self) -> &str {
        let mut len: usize = 0;
        // SAFETY (assumed, not checked here): same contract as `db_name`.
        unsafe {
            let brief = self.brief_info();
            let data = ll::rocks_table_file_creation_brief_info_get_file_path(brief, &mut len);
            let bytes = slice::from_raw_parts(data as *const u8, len);
            str::from_utf8_unchecked(bytes)
        }
    }

    /// Returns the job id from the brief info, cast to `i32`.
    pub fn job_id(&self) -> i32 {
        let id = unsafe { ll::rocks_table_file_creation_brief_info_get_job_id(self.brief_info()) };
        id as i32
    }

    /// Returns the creation reason from the brief info.
    ///
    /// The native value is transmuted straight into the enum.
    /// NOTE(review): a value outside the declared variants would be undefined
    /// behaviour — confirm the native side only emits known values.
    pub fn reason(&self) -> TableFileCreationReason {
        unsafe {
            let raw_reason = ll::rocks_table_file_creation_brief_info_get_reason(self.brief_info());
            mem::transmute(raw_reason)
        }
    }
}
/// Reason attached to a compaction job.
///
/// `CompactionJobInfo::compaction_reason` produces this enum by transmuting
/// the value returned from the native getter, so the discriminants are spelled
/// out explicitly here.
#[repr(C)]
#[derive(Debug, Copy, Clone, PartialEq, Eq, Hash)]
pub enum CompactionReason {
    Unknown = 0,
    LevelL0FilesNum = 1,
    LevelMaxLevelSize = 2,
    UniversalSizeAmplification = 3,
    UniversalSizeRatio = 4,
    UniversalSortedRunNum = 5,
    FIFOMaxSize = 6,
    ManualCompaction = 7,
    FilesMarkedForCompaction = 8,
}
/// Reason passed along with a background error.
///
/// The `rust_event_listener_on_background_error` entry point receives this
/// enum by value from the native caller, so the discriminants are spelled out
/// explicitly here.
#[repr(C)]
#[derive(Debug, Copy, Clone, PartialEq, Eq, Hash)]
pub enum BackgroundErrorReason {
    Flush = 0,
    Compaction = 1,
    WriteCallback = 2,
    MemTable = 3,
}
/// Plain-data snapshot handed to `EventListener::on_table_file_deleted`.
///
/// It is filled in by `c::rust_event_listener_on_table_file_deleted` from the
/// `rocks_table_file_deletion_info_get_*` getters.
#[derive(Debug)]
pub struct TableFileDeletionInfo<'a> {
/// Value of the native `get_db_name` getter.
pub db_name: &'a str,
/// Value of the native `get_file_path` getter.
pub file_path: &'a str,
/// Value of the native `get_job_id` getter, cast to `i32`.
pub job_id: i32,
/// Native status converted into a `Result`.
pub status: Result<()>,
}
/// Plain-data snapshot handed to the flush callbacks of `EventListener`.
///
/// It is filled in by `c::flush_job_info_convert`, one field per
/// `rocks_flush_job_info_get_*` getter.
#[derive(Debug)]
pub struct FlushJobInfo<'a> {
/// Value of the native `get_cf_name` getter.
pub cf_name: &'a str,
/// Value of the native `get_file_path` getter.
pub file_path: &'a str,
/// Value of the native `get_thread_id` getter.
pub thread_id: u64,
/// Value of the native `get_job_id` getter, cast to `i32`.
pub job_id: i32,
/// `true` when the native `get_triggered_writes_slowdown` getter is non-zero.
pub triggered_writes_slowdown: bool,
/// `true` when the native `get_triggered_writes_stop` getter is non-zero.
pub triggered_writes_stop: bool,
/// Value of the native `get_smallest_seqno` getter.
pub smallest_seqno: SequenceNumber,
/// Value of the native `get_largest_seqno` getter.
pub largest_seqno: SequenceNumber,
/// Wrapper around the handle from the native `get_table_properties` getter.
pub table_properties: TableProperties<'a>,
}
/// Thin wrapper around a raw `rocks_compaction_job_info_t` pointer.
///
/// The `'a` parameter is carried only by the marker field; string accessors
/// hand out `&'a str` values built from native memory.
pub struct CompactionJobInfo<'a> {
// Raw native handle that all accessors dereference through the C API.
raw: *mut ll::rocks_compaction_job_info_t,
// Zero-sized marker that ties the wrapper to the lifetime `'a`.
_marker: PhantomData<&'a ()>,
}
// Debug output shows the column family name, the status and the number of
// input and output files (counts only, not the paths themselves).
impl<'a> fmt::Debug for CompactionJobInfo<'a> {
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        let mut out = f.debug_struct("CompactionJobInfo");
        out.field("cf_name", &self.cf_name());
        out.field("status", &self.status());
        out.field("inputs", &self.input_files().len());
        out.field("outputs", &self.output_files().len());
        out.finish()
    }
}
impl<'a> CompactionJobInfo<'a> {
    /// Returns the string reported by the native `get_cf_name` getter.
    ///
    /// The bytes are wrapped as `&str` without UTF-8 validation.
    pub fn cf_name(&self) -> &'a str {
        let mut len: usize = 0;
        // SAFETY (assumed, not checked here): `self.raw` is a live handle and
        // the getter yields a pointer to `len` readable bytes of valid UTF-8.
        unsafe {
            let data = ll::rocks_compaction_job_info_get_cf_name(self.raw, &mut len);
            let bytes = slice::from_raw_parts(data as *const u8, len);
            str::from_utf8_unchecked(bytes)
        }
    }

    /// Fetches the native status and converts it into a `Result`.
    pub fn status(&self) -> Result<()> {
        // Out-parameter for the getter; starts out null.
        let mut raw_status: *mut ll::rocks_status_t = ptr::null_mut();
        unsafe {
            ll::rocks_compaction_job_info_get_status(self.raw, &mut raw_status);
            Result::from_ll(raw_status)
        }
    }

    /// Returns the thread id reported by the native getter.
    pub fn thread_id(&self) -> u64 {
        let id = unsafe { ll::rocks_compaction_job_info_get_thread_id(self.raw) };
        id
    }

    /// Returns the job id from the native getter, cast to `i32`.
    pub fn job_id(&self) -> i32 {
        let id = unsafe { ll::rocks_compaction_job_info_get_job_id(self.raw) };
        id as i32
    }

    /// Returns the base input level from the native getter, cast to `i32`.
    pub fn base_input_level(&self) -> i32 {
        let level = unsafe { ll::rocks_compaction_job_info_get_base_input_level(self.raw) };
        level as i32
    }

    /// Returns the output level from the native getter, cast to `i32`.
    pub fn output_level(&self) -> i32 {
        let level = unsafe { ll::rocks_compaction_job_info_get_output_level(self.raw) };
        level as i32
    }

    /// Collects the input file names reported by the native library.
    ///
    /// The count is queried first, then the native call fills one
    /// pointer/length pair per file; each pair is wrapped as `&str` without
    /// UTF-8 validation.
    pub fn input_files(&self) -> Vec<&'a str> {
        unsafe {
            let count = ll::rocks_compaction_job_info_get_input_files_num(self.raw);
            let mut starts = vec![ptr::null(); count];
            let mut lens = vec![0_usize; count];
            ll::rocks_compaction_job_info_get_input_files(self.raw, starts.as_mut_ptr(), lens.as_mut_ptr());
            let mut files = Vec::with_capacity(count);
            for (&start, &len) in starts.iter().zip(lens.iter()) {
                let bytes = slice::from_raw_parts(start as *const u8, len);
                files.push(str::from_utf8_unchecked(bytes));
            }
            files
        }
    }

    /// Collects the output file names reported by the native library.
    ///
    /// Works exactly like `input_files`, using the `output_files` getters.
    pub fn output_files(&self) -> Vec<&'a str> {
        unsafe {
            let count = ll::rocks_compaction_job_info_get_output_files_num(self.raw);
            let mut starts = vec![ptr::null(); count];
            let mut lens = vec![0_usize; count];
            ll::rocks_compaction_job_info_get_output_files(self.raw, starts.as_mut_ptr(), lens.as_mut_ptr());
            let mut files = Vec::with_capacity(count);
            for (&start, &len) in starts.iter().zip(lens.iter()) {
                let bytes = slice::from_raw_parts(start as *const u8, len);
                files.push(str::from_utf8_unchecked(bytes));
            }
            files
        }
    }

    /// Wraps the native table-properties collection of this job.
    pub fn table_properties(&self) -> TablePropertiesCollection {
        unsafe {
            let props = ll::rocks_compaction_job_info_get_table_properties(self.raw);
            TablePropertiesCollection::from_ll(props)
        }
    }

    /// Returns the compaction reason.
    ///
    /// The native value is transmuted straight into the enum.
    /// NOTE(review): a value outside the declared variants would be undefined
    /// behaviour — confirm the native side only emits known values.
    pub fn compaction_reason(&self) -> CompactionReason {
        unsafe {
            let raw_reason = ll::rocks_compaction_job_info_get_compaction_reason(self.raw);
            mem::transmute(raw_reason)
        }
    }

    /// Returns the compression type.
    ///
    /// The native value is transmuted straight into `CompressionType`; the
    /// same caveat as for `compaction_reason` applies.
    pub fn compression(&self) -> CompressionType {
        unsafe {
            let raw_compression = ll::rocks_compaction_job_info_get_compression(self.raw);
            mem::transmute(raw_compression)
        }
    }

    /// Wraps the native statistics handle of this job.
    pub fn stats(&self) -> CompactionJobStats {
        unsafe {
            let raw_stats = ll::rocks_compaction_job_info_get_stats(self.raw);
            CompactionJobStats::from_ll(raw_stats)
        }
    }
}
/// Thin wrapper around a raw `rocks_mem_table_info_t` pointer.
///
/// Every accessor reads through the pointer by calling the matching
/// `rocks_mem_table_info_get_*` getter.
pub struct MemTableInfo {
// Raw native handle that all accessors dereference through the C API.
raw: *const ll::rocks_mem_table_info_t,
}
// Debug output is assembled from the accessor methods: column family name,
// both sequence numbers and the entry/delete counters.
impl fmt::Debug for MemTableInfo {
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        let mut out = f.debug_struct("MemTableInfo");
        out.field("cf", &self.cf_name());
        out.field("first_seqno", &self.first_seqno());
        out.field("earliest_seqno", &self.earliest_seqno());
        out.field("num_entries", &self.num_entries());
        out.field("num_deletes", &self.num_deletes());
        out.finish()
    }
}
impl MemTableInfo {
    /// Returns the string reported by the native `get_cf_name` getter.
    ///
    /// The bytes are wrapped as `&str` without UTF-8 validation.
    pub fn cf_name(&self) -> &str {
        let mut len: usize = 0;
        // SAFETY (assumed, not checked here): `self.raw` is a live handle and
        // the getter yields a pointer to `len` readable bytes of valid UTF-8.
        unsafe {
            let data = ll::rocks_mem_table_info_get_cf_name(self.raw, &mut len);
            let bytes = slice::from_raw_parts(data as *const u8, len);
            str::from_utf8_unchecked(bytes)
        }
    }

    /// Returns the native `first_seqno` value wrapped in `SequenceNumber`.
    pub fn first_seqno(&self) -> SequenceNumber {
        let seqno = unsafe { ll::rocks_mem_table_info_get_first_seqno(self.raw) };
        SequenceNumber(seqno)
    }

    /// Returns the native `earliest_seqno` value wrapped in `SequenceNumber`.
    pub fn earliest_seqno(&self) -> SequenceNumber {
        let seqno = unsafe { ll::rocks_mem_table_info_get_earliest_seqno(self.raw) };
        SequenceNumber(seqno)
    }

    /// Returns the entry count reported by the native getter.
    pub fn num_entries(&self) -> u64 {
        let count = unsafe { ll::rocks_mem_table_info_get_num_entries(self.raw) };
        count
    }

    /// Returns the delete count reported by the native getter.
    pub fn num_deletes(&self) -> u64 {
        let count = unsafe { ll::rocks_mem_table_info_get_num_deletes(self.raw) };
        count
    }
}
/// Thin wrapper around a raw `rocks_external_file_ingestion_info_t` pointer.
///
/// Every accessor reads through the pointer by calling the matching
/// `rocks_external_file_ingestion_info_get_*` getter.
pub struct ExternalFileIngestionInfo {
// Raw native handle that all accessors dereference through the C API.
raw: *const ll::rocks_external_file_ingestion_info_t,
}
// Debug output is assembled from the accessor methods: column family name,
// both file paths and the global sequence number.
impl fmt::Debug for ExternalFileIngestionInfo {
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        let mut out = f.debug_struct("ExternalFileIngestionInfo");
        out.field("cf", &self.cf_name());
        out.field("external_file_path", &self.external_file_path());
        out.field("internal_file_path", &self.internal_file_path());
        out.field("global_seqno", &self.global_seqno());
        out.finish()
    }
}
impl ExternalFileIngestionInfo {
    /// Returns the string reported by the native `get_cf_name` getter.
    ///
    /// The bytes are wrapped as `&str` without UTF-8 validation.
    pub fn cf_name(&self) -> &str {
        let mut len: usize = 0;
        // SAFETY (assumed, not checked here): `self.raw` is a live handle and
        // the getter yields a pointer to `len` readable bytes of valid UTF-8.
        unsafe {
            let data = ll::rocks_external_file_ingestion_info_get_cf_name(self.raw, &mut len);
            let bytes = slice::from_raw_parts(data as *const u8, len);
            str::from_utf8_unchecked(bytes)
        }
    }

    /// Returns the string reported by the native `get_external_file_path`
    /// getter, wrapped as `&str` without UTF-8 validation.
    pub fn external_file_path(&self) -> &str {
        let mut len: usize = 0;
        // SAFETY (assumed, not checked here): same contract as `cf_name`.
        unsafe {
            let data = ll::rocks_external_file_ingestion_info_get_external_file_path(self.raw, &mut len);
            let bytes = slice::from_raw_parts(data as *const u8, len);
            str::from_utf8_unchecked(bytes)
        }
    }

    /// Returns the string reported by the native `get_internal_file_path`
    /// getter, wrapped as `&str` without UTF-8 validation.
    pub fn internal_file_path(&self) -> &str {
        let mut len: usize = 0;
        // SAFETY (assumed, not checked here): same contract as `cf_name`.
        unsafe {
            let data = ll::rocks_external_file_ingestion_info_get_internal_file_path(self.raw, &mut len);
            let bytes = slice::from_raw_parts(data as *const u8, len);
            str::from_utf8_unchecked(bytes)
        }
    }

    /// Returns the native `global_seqno` value wrapped in `SequenceNumber`.
    pub fn global_seqno(&self) -> SequenceNumber {
        let seqno = unsafe { ll::rocks_external_file_ingestion_info_get_global_seqno(self.raw) };
        SequenceNumber(seqno)
    }

    /// Wraps the native table-properties handle of the ingested file.
    pub fn table_properties(&self) -> TableProperties {
        unsafe {
            let props = ll::rocks_external_file_ingestion_info_get_table_properties(self.raw);
            TableProperties::from_ll(props)
        }
    }
}
/// Kind of entry reported to a `CompactionEventListener`.
///
/// The `rust_compaction_event_listener_on_compaction` entry point receives
/// this enum by value from the native caller, so the discriminants are spelled
/// out explicitly here.
#[repr(C)]
#[derive(Debug, Copy, Clone, PartialEq, Eq, Hash)]
pub enum CompactionListenerValueType {
    Value = 0,
    MergeOperand = 1,
    Delete = 2,
    SingleDelete = 3,
    RangeDelete = 4,
    Invalid = 5,
}
/// Receiver for per-entry compaction notifications.
///
/// `c::rust_compaction_event_listener_on_compaction` forwards each native
/// call to `on_compaction`. Any `FnMut(CompactionEvent)` closure implements
/// this trait through the blanket impl in this file.
pub trait CompactionEventListener {
/// Called once per notification with the values passed in by the native
/// caller; `sn` and `is_new` arrive already converted to `SequenceNumber`
/// and `bool`.
fn on_compaction(
&mut self,
level: i32,
key: &[u8],
value_type: CompactionListenerValueType,
existing_value: &[u8],
sn: SequenceNumber,
is_new: bool,
);
}
/// Bundle of the arguments of `CompactionEventListener::on_compaction`.
///
/// The blanket impl for `FnMut(CompactionEvent)` closures builds one of these
/// per call, copying each argument into the field of the same name.
#[derive(Debug, Clone)]
pub struct CompactionEvent<'a> {
/// The `level` argument.
pub level: i32,
/// The `key` argument.
pub key: &'a [u8],
/// The `value_type` argument.
pub value_type: CompactionListenerValueType,
/// The `existing_value` argument.
pub existing_value: &'a [u8],
/// The `sn` argument.
pub sn: SequenceNumber,
/// The `is_new` argument.
pub is_new: bool,
}
/// Lets any `FnMut(CompactionEvent)` closure act as a
/// `CompactionEventListener`: the arguments are packed into a
/// `CompactionEvent` and the closure is invoked with it.
impl<F> CompactionEventListener for F
where
    F: FnMut(CompactionEvent),
{
    fn on_compaction(
        &mut self,
        level: i32,
        key: &[u8],
        value_type: CompactionListenerValueType,
        existing_value: &[u8],
        sn: SequenceNumber,
        is_new: bool,
    ) {
        // Field-init shorthand: every field takes the argument of the same name.
        let event = CompactionEvent {
            level,
            key,
            value_type,
            existing_value,
            sn,
            is_new,
        };
        (*self)(event);
    }
}
/// Callback interface driven by the `extern "C"` entry points in module `c`.
///
/// Every method has a default body, so implementors override only what they
/// need. Each callback is invoked from the `rust_event_listener_*` function of
/// the same name.
pub trait EventListener {
/// Invoked from `rust_event_listener_on_flush_completed`; does nothing by default.
fn on_flush_completed(&mut self, db: &DBRef, flush_job_info: &FlushJobInfo) {}
/// Invoked from `rust_event_listener_on_flush_begin`; does nothing by default.
fn on_flush_begin(&mut self, db: &DBRef, flush_job_info: &FlushJobInfo) {}
/// Invoked from `rust_event_listener_on_table_file_deleted`; does nothing by default.
fn on_table_file_deleted(&mut self, info: &TableFileDeletionInfo) {}
/// Invoked from `rust_event_listener_on_compaction_completed`; does nothing by default.
fn on_compaction_completed(&mut self, db: &DBRef, ci: &CompactionJobInfo) {}
/// Invoked from `rust_event_listener_on_table_file_created`; does nothing by default.
fn on_table_file_created(&mut self, info: &TableFileCreationInfo) {}
/// Invoked from `rust_event_listener_on_table_file_creation_started`; does nothing by default.
fn on_table_file_creation_started(&mut self, info: &TableFileCreationBriefInfo) {}
/// Invoked from `rust_event_listener_on_memtable_sealed`; does nothing by default.
fn on_memtable_sealed(&mut self, info: &MemTableInfo) {}
/// Invoked from `rust_event_listener_on_column_family_handle_deletion_started`;
/// does nothing by default.
fn on_column_family_handle_deletion_started(&mut self, handle: &ColumnFamilyHandle) {}
/// Invoked from `rust_event_listener_on_external_file_ingested`; does nothing by default.
fn on_external_file_ingested(&mut self, db: &DBRef, info: &ExternalFileIngestionInfo) {}
/// Invoked from `rust_event_listener_on_background_error`, which reports
/// `0` to the native caller for `Ok` and `1` for `Err`.
///
/// The default implementation hands the error straight back as `Err`.
fn on_background_error(&mut self, reason: BackgroundErrorReason, bg_error: Error) -> Result<()> {
Err(bg_error)
}
/// Invoked from `rust_event_listener_get_compaction_event_listener`.
///
/// The default implementation provides no compaction listener (`None`).
fn get_compaction_event_listener(&mut self) -> Option<&mut dyn CompactionEventListener> {
None
}
}
#[doc(hidden)]
pub mod c {
use super::*;
use crate::db::DBRef;
use crate::to_raw::FromRaw;
use std::mem;
use std::ptr;
use std::slice;
use std::str;
/// Destroys a listener whose ownership is handed back by the native side.
///
/// # Safety
///
/// `l` must be a pointer that `Box::from_raw` may take ownership of as a
/// `Box<Box<dyn EventListener>>`, and it must not be used afterwards.
#[no_mangle]
pub unsafe extern "C" fn rust_event_listener_drop(l: *mut ()) {
    let listener = l as *mut Box<dyn EventListener>;
    // `Box::from_raw` is `#[must_use]`; dropping the rebuilt box explicitly
    // states the intent and avoids discarding its result silently.
    drop(Box::from_raw(listener));
}
/// Copies every field of a native flush-job info into a `FlushJobInfo`.
///
/// The getters are called in field order. The two strings are wrapped as
/// `&str` without UTF-8 validation and borrow native memory.
///
/// # Safety
///
/// `info` must be a handle the `rocks_flush_job_info_get_*` getters accept,
/// and the returned value must not outlive the native object (assumed, not
/// enforced here).
unsafe fn flush_job_info_convert<'a>(info: *mut ll::rocks_flush_job_info_t) -> FlushJobInfo<'a> {
    let mut cf_len = 0;
    let cf_ptr = ll::rocks_flush_job_info_get_cf_name(info, &mut cf_len);
    let cf_name = str::from_utf8_unchecked(slice::from_raw_parts(cf_ptr as *const u8, cf_len));

    let mut path_len = 0;
    let path_ptr = ll::rocks_flush_job_info_get_file_path(info, &mut path_len);
    let file_path = str::from_utf8_unchecked(slice::from_raw_parts(path_ptr as *const u8, path_len));

    let thread_id = ll::rocks_flush_job_info_get_thread_id(info);
    let job_id = ll::rocks_flush_job_info_get_job_id(info) as i32;
    // The native flags are integers; any non-zero value means `true`.
    let triggered_writes_slowdown = ll::rocks_flush_job_info_get_triggered_writes_slowdown(info) != 0;
    let triggered_writes_stop = ll::rocks_flush_job_info_get_triggered_writes_stop(info) != 0;
    let smallest_seqno = SequenceNumber(ll::rocks_flush_job_info_get_smallest_seqno(info));
    let largest_seqno = SequenceNumber(ll::rocks_flush_job_info_get_largest_seqno(info));
    let table_properties = TableProperties::from_ll(ll::rocks_flush_job_info_get_table_properties(info));

    FlushJobInfo {
        cf_name,
        file_path,
        thread_id,
        job_id,
        triggered_writes_slowdown,
        triggered_writes_stop,
        smallest_seqno,
        largest_seqno,
        table_properties,
    }
}
/// C entry point that forwards a flush-completed event to the listener.
///
/// # Safety
///
/// `l` must point to a live `Box<dyn EventListener>`, `db` must be a value
/// that is valid when reinterpreted as a `DBRef`, and `info` must be accepted
/// by the `rocks_flush_job_info_get_*` getters.
#[no_mangle]
pub unsafe extern "C" fn rust_event_listener_on_flush_completed(
    l: *mut (),
    db: *mut (),
    info: *mut ll::rocks_flush_job_info_t,
) {
    let listener = l as *mut Box<dyn EventListener>;
    // The database handle is only lent for the duration of the call. Wrapping
    // it in `ManuallyDrop` up front guarantees its destructor never runs here,
    // even if the callback unwinds before a trailing `mem::forget` is reached.
    let db_ref = mem::ManuallyDrop::new(mem::transmute::<_, DBRef>(db));
    let flush_job_info = flush_job_info_convert(info);
    (*listener).on_flush_completed(&*db_ref, &flush_job_info);
}
/// C entry point that forwards a flush-begin event to the listener.
///
/// # Safety
///
/// `l` must point to a live `Box<dyn EventListener>`, `db` must be a value
/// that is valid when reinterpreted as a `DBRef`, and `info` must be accepted
/// by the `rocks_flush_job_info_get_*` getters.
#[no_mangle]
pub unsafe extern "C" fn rust_event_listener_on_flush_begin(
    l: *mut (),
    db: *mut (),
    info: *mut ll::rocks_flush_job_info_t,
) {
    let listener = l as *mut Box<dyn EventListener>;
    // The database handle is only lent for the duration of the call. Wrapping
    // it in `ManuallyDrop` up front guarantees its destructor never runs here,
    // even if the callback unwinds before a trailing `mem::forget` is reached.
    let db_ref = mem::ManuallyDrop::new(mem::transmute::<_, DBRef>(db));
    let flush_job_info = flush_job_info_convert(info);
    (*listener).on_flush_begin(&*db_ref, &flush_job_info);
}
/// C entry point that forwards a table-file-deleted event to the listener.
///
/// The native info is copied into a `TableFileDeletionInfo` first; the
/// getters are called in field order (db name, file path, job id, status).
///
/// # Safety
///
/// `l` must point to a live `Box<dyn EventListener>` and `info` must be
/// accepted by the `rocks_table_file_deletion_info_get_*` getters.
#[no_mangle]
pub unsafe extern "C" fn rust_event_listener_on_table_file_deleted(
    l: *mut (),
    info: *mut ll::rocks_table_file_deletion_info_t,
) {
    let listener = l as *mut Box<dyn EventListener>;

    let mut name_len = 0;
    let name_ptr = ll::rocks_table_file_deletion_info_get_db_name(info, &mut name_len);
    let db_name = str::from_utf8_unchecked(slice::from_raw_parts(name_ptr as *const u8, name_len));

    let mut path_len = 0;
    let path_ptr = ll::rocks_table_file_deletion_info_get_file_path(info, &mut path_len);
    let file_path = str::from_utf8_unchecked(slice::from_raw_parts(path_ptr as *const u8, path_len));

    let job_id = ll::rocks_table_file_deletion_info_get_job_id(info) as i32;

    // Out-parameter for the status getter; starts out null.
    let mut raw_status = ptr::null_mut::<ll::rocks_status_t>();
    ll::rocks_table_file_deletion_info_get_status(info, &mut raw_status);
    let status = Result::from_ll(raw_status);

    let deletion = TableFileDeletionInfo {
        db_name,
        file_path,
        job_id,
        status,
    };
    (*listener).on_table_file_deleted(&deletion);
}
/// C entry point that forwards a compaction-completed event to the listener.
///
/// # Safety
///
/// `l` must point to a live `Box<dyn EventListener>`, `db` must be a value
/// that is valid when reinterpreted as a `DBRef`, and `ci` must be accepted
/// by the `rocks_compaction_job_info_get_*` getters.
#[no_mangle]
pub unsafe extern "C" fn rust_event_listener_on_compaction_completed(
    l: *mut (),
    db: *mut (),
    ci: *mut ll::rocks_compaction_job_info_t,
) {
    let listener = l as *mut Box<dyn EventListener>;
    // The database handle is only lent for the duration of the call. Wrapping
    // it in `ManuallyDrop` up front guarantees its destructor never runs here,
    // even if the callback unwinds before a trailing `mem::forget` is reached.
    let db_ref = mem::ManuallyDrop::new(mem::transmute::<_, DBRef>(db));
    let info = CompactionJobInfo {
        raw: ci,
        _marker: PhantomData,
    };
    (*listener).on_compaction_completed(&*db_ref, &info);
}
/// C entry point that forwards a table-file-created event to the listener.
///
/// # Safety
///
/// `l` must point to a live `Box<dyn EventListener>` and `info` must stay
/// valid for the `rocks_table_file_creation_info_get_*` getters during the
/// callback.
#[no_mangle]
pub unsafe extern "C" fn rust_event_listener_on_table_file_created(
    l: *mut (),
    info: *mut ll::rocks_table_file_creation_info_t,
) {
    let target = &mut *(l as *mut Box<dyn EventListener>);
    // Borrowing wrapper only; it holds nothing but the raw pointer.
    let created = TableFileCreationInfo { raw: info };
    target.on_table_file_created(&created);
}
/// C entry point that forwards a table-file-creation-started event to the
/// listener.
///
/// # Safety
///
/// `l` must point to a live `Box<dyn EventListener>` and `info` must stay
/// valid for the `rocks_table_file_creation_brief_info_get_*` getters during
/// the callback.
#[no_mangle]
pub unsafe extern "C" fn rust_event_listener_on_table_file_creation_started(
    l: *mut (),
    info: *mut ll::rocks_table_file_creation_brief_info_t,
) {
    let target = &mut *(l as *mut Box<dyn EventListener>);
    // Borrowing wrapper only; it holds nothing but the raw pointer.
    let started = TableFileCreationBriefInfo { raw: info };
    target.on_table_file_creation_started(&started);
}
/// C entry point that forwards a memtable-sealed event to the listener.
///
/// # Safety
///
/// `l` must point to a live `Box<dyn EventListener>` and `info` must stay
/// valid for the `rocks_mem_table_info_get_*` getters during the callback.
#[no_mangle]
pub unsafe extern "C" fn rust_event_listener_on_memtable_sealed(l: *mut (), info: *mut ll::rocks_mem_table_info_t) {
    let target = &mut *(l as *mut Box<dyn EventListener>);
    // Borrowing wrapper only; it holds nothing but the raw pointer.
    let sealed = MemTableInfo { raw: info };
    target.on_memtable_sealed(&sealed);
}
#[no_mangle]
pub unsafe extern "C" fn rust_event_listener_on_column_family_handle_deletion_started(
l: *mut (),
handle: *mut ll::rocks_column_family_handle_t,
) {
let listener = l as *mut Box<dyn EventListener>;
let cf = ColumnFamilyHandle::from_ll(handle);
(*listener).on_column_family_handle_deletion_started(&cf);
}
/// C entry point that forwards an external-file-ingested event to the
/// listener.
///
/// # Safety
///
/// `l` must point to a live `Box<dyn EventListener>`, `db` must be a value
/// that is valid when reinterpreted as a `DBRef`, and `info` must stay valid
/// for the `rocks_external_file_ingestion_info_get_*` getters during the
/// callback.
#[no_mangle]
pub unsafe extern "C" fn rust_event_listener_on_external_file_ingested(
    l: *mut (),
    db: *mut (),
    info: *const ll::rocks_external_file_ingestion_info_t,
) {
    let listener = l as *mut Box<dyn EventListener>;
    // The database handle is only lent for the duration of the call. Wrapping
    // it in `ManuallyDrop` up front guarantees its destructor never runs here,
    // even if the callback unwinds before a trailing `mem::forget` is reached.
    let db_ref = mem::ManuallyDrop::new(mem::transmute::<_, DBRef>(db));
    let info = ExternalFileIngestionInfo { raw: info };
    (*listener).on_external_file_ingested(&*db_ref, &info);
}
/// C entry point that forwards a background error to the listener.
///
/// Returns `0` when the listener answers `Ok` and `1` when it answers `Err`.
///
/// # Safety
///
/// `l` must point to a live `Box<dyn EventListener>` and `bg_error` must be a
/// status pointer `Result::from_ll` accepts.
#[no_mangle]
pub unsafe extern "C" fn rust_event_listener_on_background_error(
    l: *mut (),
    reason: BackgroundErrorReason,
    bg_error: *mut ll::rocks_status_t,
) -> u8 {
    let listener = l as *mut Box<dyn EventListener>;
    let status = Result::from_ll(bg_error);
    // NOTE(review): `unwrap_err` panics if the converted status is `Ok`;
    // presumably the native side only calls this with a failing status.
    let error = status.unwrap_err();
    match (*listener).on_background_error(reason, error) {
        Ok(_) => 0,
        Err(_) => 1,
    }
}
/// C entry point that asks the listener for its compaction event listener.
///
/// When the listener provides one, the `&mut dyn CompactionEventListener`
/// reference is boxed and the box is leaked as an opaque pointer; otherwise a
/// null pointer is returned.
///
/// # Safety
///
/// `l` must point to a live `Box<dyn EventListener>`.
#[no_mangle]
pub unsafe extern "C" fn rust_event_listener_get_compaction_event_listener(l: *mut ()) -> *mut () {
    let listener = l as *mut Box<dyn EventListener>;
    if let Some(inner) = (*listener).get_compaction_event_listener() {
        // The heap cell holds the fat reference so a thin pointer can be
        // handed across the C boundary.
        let boxed = Box::new(inner);
        Box::into_raw(boxed) as *mut ()
    } else {
        ptr::null_mut()
    }
}
/// Frees the heap cell that holds a `&mut dyn CompactionEventListener`.
///
/// Only the box around the reference is released; the listener the reference
/// points to is not dropped.
///
/// # Safety
///
/// `l` must be a pointer that `Box::from_raw` may take ownership of as a
/// `Box<&mut dyn CompactionEventListener>`, and it must not be used
/// afterwards.
#[no_mangle]
pub unsafe extern "C" fn rust_compaction_event_listener_drop(l: *mut ()) {
    let compaction_listener = l as *mut &mut dyn CompactionEventListener;
    // `Box::from_raw` is `#[must_use]`; dropping the rebuilt box explicitly
    // states the intent and avoids discarding its result silently.
    drop(Box::from_raw(compaction_listener));
}
/// C entry point that forwards one compaction notification to the boxed
/// `&mut dyn CompactionEventListener`.
///
/// `sn` is wrapped in `SequenceNumber` and `is_new` is treated as `true` for
/// any non-zero value.
///
/// # Safety
///
/// `l` must point to a live `&mut dyn CompactionEventListener`.
/// NOTE(review): `key` and `existing_value` are received as `&&[u8]`, so the
/// native caller must pass pointers to data laid out like a Rust slice
/// reference — confirm against the C++ side.
#[no_mangle]
pub unsafe extern "C" fn rust_compaction_event_listener_on_compaction(
    l: *mut (),
    level: i32,
    key: &&[u8],
    value_type: CompactionListenerValueType,
    existing_value: &&[u8],
    sn: u64,
    is_new: u8,
) {
    let compaction_listener = l as *mut &mut dyn CompactionEventListener;
    let seqno = SequenceNumber(sn);
    let fresh = is_new != 0;
    (*compaction_listener).on_compaction(level, *key, value_type, *existing_value, seqno, fresh);
}
}
#[cfg(test)]
mod tests {
use super::super::rocksdb::*;
use super::*;
/// Test listener that counts how often each callback fires.
///
/// All counters start at zero (`Default`) and are checked in the `Drop` impl.
#[derive(Default)]
struct MyEventListener {
// One counter per `EventListener` callback overridden below.
flush_completed_called: usize,
flush_begin_called: usize,
table_file_deleted_called: usize,
compaction_completed_called: usize,
table_file_created_called: usize,
table_file_creation_started_called: usize,
on_memtable_sealed_called: usize,
on_external_file_ingested_called: usize,
on_column_family_handle_deletion_started_called: usize,
}
// The assertions run when the listener is dropped, so they verify the counts
// accumulated over the listener's whole lifetime.
impl Drop for MyEventListener {
fn drop(&mut self) {
// The product is non-zero only if every one of these counters is non-zero,
// i.e. each of the eight callbacks fired at least once.
assert!(
self.flush_begin_called
* self.flush_completed_called
* self.table_file_deleted_called
* self.compaction_completed_called
* self.table_file_created_called
* self.table_file_creation_started_called
* self.on_memtable_sealed_called
* self.on_external_file_ingested_called
> 0
);
// Checked separately from the product above.
assert!(self.on_column_family_handle_deletion_started_called > 0);
}
}
// Each callback checks a few properties of its argument and then bumps the
// matching counter on the listener.
impl EventListener for MyEventListener {
fn on_flush_completed(&mut self, db: &DBRef, flush_job_info: &FlushJobInfo) {
// Exercises the borrowed database handle passed to the callback.
assert!(db.name().len() > 0, "DB name is accessible");
self.flush_completed_called += 1;
}
fn on_flush_begin(&mut self, db: &DBRef, flush_job_info: &FlushJobInfo) {
self.flush_begin_called += 1;
}
fn on_table_file_deleted(&mut self, info: &TableFileDeletionInfo) {
assert!(info.status.is_ok());
self.table_file_deleted_called += 1;
}
fn on_compaction_completed(&mut self, db: &DBRef, ci: &CompactionJobInfo) {
assert!(ci.status().is_ok());
assert!(ci.stats().num_input_files() > 0);
self.compaction_completed_called += 1;
}
fn on_table_file_created(&mut self, info: &TableFileCreationInfo) {
assert!(info.status().is_ok());
assert!(info.file_size() > 0);
assert!(info.table_properties().num_entries() > 0);
assert!(info.reason() != TableFileCreationReason::Recovery);
self.table_file_created_called += 1;
}
fn on_table_file_creation_started(&mut self, info: &TableFileCreationBriefInfo) {
assert!(info.reason() != TableFileCreationReason::Recovery);
self.table_file_creation_started_called += 1;
}
fn on_memtable_sealed(&mut self, info: &MemTableInfo) {
assert!(info.num_entries() > 0);
self.on_memtable_sealed_called += 1;
}
fn on_column_family_handle_deletion_started(&mut self, handle: &ColumnFamilyHandle) {
assert_eq!(handle.id(), 0);
self.on_column_family_handle_deletion_started_called += 1;
}
fn on_external_file_ingested(&mut self, db: &DBRef, info: &ExternalFileIngestionInfo) {
// The test ingests an SST file containing exactly nine entries.
assert_eq!(info.table_properties().num_entries(), 9);
self.on_external_file_ingested_called += 1;
}
// Same behaviour as the trait default: the error is passed back unchanged.
fn on_background_error(&mut self, reason: BackgroundErrorReason, bg_error: Error) -> Result<()> {
Err(bg_error)
}
fn get_compaction_event_listener(&mut self) -> Option<&mut dyn CompactionEventListener> {
// A `static mut` gives the closure reference a `'static` home so a mutable
// reference to it can be returned from this method.
static mut FUNC: &'static dyn Fn(CompactionEvent) = &|event| {
assert!(event.is_new);
};
unsafe { Some(&mut FUNC) }
}
}
/// End-to-end test: opens a database with `MyEventListener` attached, then
/// performs writes, flushes, compactions and an external-file ingestion. The
/// real verification happens in the listener's callbacks and its `Drop` impl.
#[test]
fn event_listener_works() {
let tmp_dir = ::tempdir::TempDir::new_in(".", "rocks").unwrap();
let db = DB::open(
Options::default().map_db_options(|db| db.create_if_missing(true).add_listener(MyEventListener::default())),
&tmp_dir,
)
.unwrap();
// Write 100 keys, flushing on every 6th iteration and compacting the whole
// range on every 36th.
for i in 0..100 {
let key = format!("test2-key-{}", i);
let val = format!("rocksdb-value-{}", i * 10);
db.put(&WriteOptions::default(), key.as_bytes(), val.as_bytes())
.unwrap();
if i % 6 == 0 {
assert!(db.flush(&FlushOptions::default().wait(true)).is_ok());
}
if i % 36 == 0 {
assert!(db.compact_range(&CompactRangeOptions::default(), ..).is_ok());
}
}
assert!(db.flush(&Default::default()).is_ok());
use crate::sst_file_writer::SstFileWriter;
// Build a nine-entry SST file and ingest it; the listener's
// `on_external_file_ingested` asserts on that entry count.
let sst_dir = ::tempdir::TempDir::new_in(".", "rocks.sst").unwrap();
let writer = SstFileWriter::builder().build();
writer.open(sst_dir.path().join("2333.sst")).unwrap();
for i in 0..9 {
let key = format!("B{:05}", i);
let value = format!("ABCDEFGH{:03}IJKLMN", i);
writer.put(key.as_bytes(), value.as_bytes()).unwrap();
}
let info = writer.finish().unwrap();
assert_eq!(info.num_entries(), 9);
let ret = db.ingest_external_file(
&[sst_dir.path().join("2333.sst")],
&IngestExternalFileOptions::default(),
);
assert!(ret.is_ok(), "ingest external file fails: {:?}", ret);
assert!(Env::default_instance().get_thread_list().len() > 0);
assert!(db.pause_background_work().is_ok());
}
}