use heed::{
Database, EnvOpenOptions,
byteorder::BigEndian,
types::{Bytes, U64},
};
use crate::disruptor::{
Envelope, RingSlot,
traits::{RkyvError, RkyvToBytes},
};
use crate::error::{RecoveryError, WalError};
use core_affinity::CoreId;
use disrupt_rs::{EventPoller, Polling, Sequence, wait_strategies::WaitStrategy};
use std::{
cell::RefCell,
path::Path,
sync::{
Arc,
atomic::{AtomicBool, Ordering},
},
thread,
time::Instant,
};
use tracing::{error, info, trace, warn};
/// Big-endian `u64` codec used for integer keys and values in the LMDB tables.
type BEU64 = U64<BigEndian>;
/// Log table: big-endian `u64` key (the envelope `seq`, see `put_envelope`) mapped to raw bytes.
type LogDb = Database<BEU64, Bytes>;
/// Metadata table: raw byte key mapped to a big-endian `u64` value.
type MetaDb = Database<Bytes, BEU64>;
/// Key in the metadata table under which the schema version is stored.
const META_SCHEMA_VERSION_KEY: &[u8] = b"schema_version";
/// Schema version written into fresh environments and required of existing ones by `JournalHandler::open`.
const WAL_SCHEMA_VERSION: u64 = 0;
/// Returns the LMDB memory-map size, in bytes, used when opening the WAL environment.
///
/// The `BTX_WAL_MAP_SIZE_BYTES` environment variable overrides the default when it
/// holds an unsigned decimal integer. Surrounding whitespace (for example a trailing
/// newline picked up from a shell or an env file) is ignored. Values larger than
/// `usize::MAX` are clamped. An unset or unparsable variable falls back to 32 MiB in
/// test builds and 10 GiB otherwise.
fn wal_map_size_bytes() -> usize {
    const DEFAULT_TEST: usize = 32 * 1024 * 1024;
    const DEFAULT: usize = 10 * 1024 * 1024 * 1024;

    let from_env = std::env::var("BTX_WAL_MAP_SIZE_BYTES")
        .ok()
        // Trim first: `str::parse` rejects any leading or trailing whitespace.
        .and_then(|raw| raw.trim().parse::<u64>().ok())
        // Clamp so the narrowing cast cannot wrap on targets where `usize` is < 64 bits.
        .map(|n| n.min(usize::MAX as u64) as usize);

    match from_env {
        Some(size) => size,
        None if cfg!(test) => DEFAULT_TEST,
        None => DEFAULT,
    }
}
// Per-thread serialization buffer, created with 1024 bytes of capacity.
// `JournalHandler::put_envelope` takes it, serializes into it and stores it back so
// the allocation is reused across calls on the same thread.
thread_local! {
static SCRATCH: RefCell<rkyv::util::AlignedVec> =
RefCell::new(rkyv::util::AlignedVec::with_capacity(1024));
}
/// Pair of counters describing a recovery position.
///
/// NOTE(review): nothing in the visible part of this file constructs or reads this
/// type; the field meanings below are inferred from their names and should be confirmed.
#[derive(Debug, Clone, Copy)]
pub struct RecoveryState {
// Presumably the last application sequence number that was recovered; confirm against callers.
pub last_app_seq: u64,
// Presumably the id of the last recovered transaction; confirm against callers.
pub last_tx_id: u64,
}
/// Handle to the write-ahead log: an LMDB environment plus the log table inside it.
pub struct JournalHandler {
// LMDB environment; every read and write transaction is started from it.
env: heed::Env,
// Log table mapping an envelope's `seq` to its serialized bytes.
log: LogDb,
}
/// The rkyv archived representation of `Envelope<T>`, as handed to `recover_from` callbacks.
pub type ArchivedEnvelopeOf<T> = <Envelope<T> as rkyv::Archive>::Archived;
impl JournalHandler {
/// Opens the WAL environment at `path`, creating the directory, the log table named
/// `db` and the `"meta"` table when they do not exist yet.
///
/// The schema version stored in the meta table is checked inside the same write
/// transaction that creates the tables:
/// * a stored version different from `WAL_SCHEMA_VERSION` yields `WalError::SchemaMismatch`;
/// * no stored version but a non-empty log table yields `WalError::SchemaMissing`;
/// * no stored version and an empty log table records the current version.
///
/// # Errors
/// Returns any filesystem or LMDB error, or one of the schema errors above.
pub fn open(path: impl AsRef<Path>, db: &str) -> anyhow::Result<Self> {
    let dir = path.as_ref();
    std::fs::create_dir_all(dir)?;

    // SAFETY: NOTE(review): heed marks `open` as unsafe. Nothing in this function
    // establishes the conditions heed requires; callers are presumably expected to
    // uphold them. Confirm against the heed documentation.
    let env = unsafe {
        EnvOpenOptions::new()
            .map_size(wal_map_size_bytes())
            .max_dbs(8)
            .open(dir)?
    };

    let log = {
        let mut wtxn = env.write_txn()?;
        let log_db: LogDb = env.create_database(&mut wtxn, Some(db))?;
        let meta_db: MetaDb = env.create_database(&mut wtxn, Some("meta"))?;

        if let Some(found) = meta_db.get(&wtxn, META_SCHEMA_VERSION_KEY)? {
            if found != WAL_SCHEMA_VERSION {
                return Err(WalError::SchemaMismatch {
                    expected: WAL_SCHEMA_VERSION,
                    found,
                }
                .into());
            }
        } else {
            // A log that already holds records but carries no version stamp cannot be
            // trusted; only a brand-new (empty) log gets stamped.
            let log_has_records = log_db.iter(&wtxn)?.next().transpose()?.is_some();
            if log_has_records {
                return Err(WalError::SchemaMissing.into());
            }
            meta_db.put(&mut wtxn, META_SCHEMA_VERSION_KEY, &WAL_SCHEMA_VERSION)?;
        }

        wtxn.commit()?;
        log_db
    };

    Ok(Self { env, log })
}
/// Starts a new write transaction on the underlying LMDB environment.
///
/// # Errors
/// Returns the environment's error, converted to `anyhow::Error`, when the
/// transaction cannot be started.
pub fn write_txn(&self) -> anyhow::Result<heed::RwTxn<'_>> {
    let txn = self.env.write_txn()?;
    Ok(txn)
}
/// Serializes `envelope` with rkyv and stores it in the log table under the key
/// `envelope.seq`, inside the caller's write transaction `wtxn`.
///
/// Serialization reuses the thread-local `SCRATCH` buffer. The buffer is handed
/// back to the thread-local before a failed `put` is reported, so a write error does
/// not throw away the allocation.
///
/// # Errors
/// Returns the rkyv serialization error or the LMDB write error.
pub fn put_envelope<T>(
    &self,
    wtxn: &mut heed::RwTxn<'_>,
    envelope: &Envelope<T>,
) -> anyhow::Result<()>
where
    T: RkyvToBytes,
{
    SCRATCH.with(|scratch| -> anyhow::Result<()> {
        let mut slot = scratch.borrow_mut();
        // `to_bytes_in` consumes the buffer by value, so move it out of the cell.
        let mut buf = std::mem::take(&mut *slot);
        buf.clear();
        let buf = rkyv::api::high::to_bytes_in::<_, RkyvError>(envelope, buf)?;
        let put_result = self.log.put(wtxn, &envelope.seq, buf.as_slice());
        // Store the buffer back before looking at the result so it survives a failed put.
        *slot = buf;
        put_result?;
        Ok(())
    })
}
/// Replays journaled envelopes and calls `on_event` for every envelope that belongs
/// to a complete transaction.
///
/// Reading starts at key `after_seq + 1` (saturating), or at key 0 when `after_seq`
/// is `None`, and uses a single read transaction. Envelopes are grouped into
/// transactions by matching `tx_id` / `tx_len` and consecutive `tx_ix` values starting
/// at 0; a group is delivered once it holds `tx_len` envelopes. Groups that never
/// complete are skipped, and an incomplete group at the end of the log is reported
/// with a warning. Nothing is deleted from the database.
///
/// The `bool` passed to `on_event` is `true` only for the very last envelope that
/// is delivered, and `false` for all others.
///
/// # Errors
/// * `WalError::CorruptRecord` when a record fails rkyv validation.
/// * `WalError::InvalidTxFraming` when `tx_len == 0` or `tx_ix >= tx_len`.
/// * Any LMDB error, and any error returned by `on_event`.
pub fn recover_from<T, F>(&self, after_seq: Option<u64>, mut on_event: F) -> anyhow::Result<()>
where
T: rkyv::Archive,
ArchivedEnvelopeOf<T>:
for<'a> rkyv::bytecheck::CheckBytes<rkyv::api::high::HighValidator<'a, RkyvError>>,
F: for<'a> FnMut(&'a ArchivedEnvelopeOf<T>, bool) -> anyhow::Result<()>,
{
// Resume one past the checkpoint; without a checkpoint start from key 0.
let start_seq = after_seq.map(|s| s.saturating_add(1)).unwrap_or(0);
info!(after_seq = ?after_seq, start_seq, "starting wal recovery");
let rtxn = self.env.read_txn()?;
let mut iter = self.log.range(&rtxn, &(start_seq..))?;
// Envelopes collected so far for the transaction currently being assembled.
struct TxInProgress<'a, AE> {
tx_id: u64,
tx_len: u16,
events: Vec<&'a AE>,
}
let mut current_tx: Option<TxInProgress<'_, ArchivedEnvelopeOf<T>>> = None;
// Key of the most recent record read, valid or not; only used for the final log line.
let mut last_seen_seq: Option<u64> = None;
// Delivery is delayed by one envelope so the final one can be flagged with `true`.
let mut pending_last: Option<&ArchivedEnvelopeOf<T>> = None;
while let Some(entry) = iter.next().transpose()? {
let (app_seq, val_bytes) = entry;
last_seen_seq = Some(app_seq);
// Validate the stored bytes before handing out a typed reference to them.
let archived =
match rkyv::api::high::access::<ArchivedEnvelopeOf<T>, RkyvError>(val_bytes) {
Ok(a) => a,
Err(e) => {
error!(app_seq, error = %e, "corrupt WAL record");
return Err(WalError::CorruptRecord {
app_seq,
details: e.to_string(),
}
.into());
}
};
let tx_id: u64 = archived.tx_id.into();
let tx_len: u16 = archived.tx_len.into();
let tx_ix: u16 = archived.tx_ix.into();
// A transaction must contain at least one event and the index must lie inside it.
if tx_len == 0 || tx_ix >= tx_len {
error!(app_seq, tx_id, tx_len, tx_ix, "invalid wal tx framing");
return Err(WalError::InvalidTxFraming {
app_seq,
tx_id,
tx_len,
tx_ix,
}
.into());
}
match &mut current_tx {
None => {
// No transaction open: skip envelopes that are not the first of a transaction.
if tx_ix != 0 {
continue;
}
current_tx = Some(TxInProgress {
tx_id,
tx_len,
events: vec![archived],
});
}
Some(tx) => {
// The next envelope of the open transaction must carry the next index.
let expected_ix: u16 = tx.events.len().try_into().unwrap_or(u16::MAX);
let framing_mismatch =
tx.tx_id != tx_id || tx.tx_len != tx_len || tx_ix != expected_ix;
if framing_mismatch {
// The open transaction is abandoned. A non-first envelope is skipped as
// well; a first envelope starts a fresh transaction in its place.
if tx_ix != 0 {
current_tx = None;
continue;
}
*tx = TxInProgress {
tx_id,
tx_len,
events: vec![archived],
};
} else {
tx.events.push(archived);
}
}
}
// Once all `tx_len` envelopes are present, hand them over in order.
if let Some(tx) = &mut current_tx
&& tx.events.len() == tx.tx_len as usize
{
for e in tx.events.drain(..) {
if let Some(prev) = pending_last.take() {
on_event(prev, false)?;
}
pending_last = Some(e);
}
current_tx = None;
}
}
// Whatever is still open here is an incomplete transaction at the end of the log.
if let Some(tx) = current_tx.take() {
let trimmed_count = tx.events.len();
if trimmed_count != 0 {
let Some(first) = tx.events.first() else {
return Err(RecoveryError::WalTailMissingFirst.into());
};
let Some(last) = tx.events.last() else {
return Err(RecoveryError::WalTailMissingLast.into());
};
let trimmed_start: u64 = first.seq.into();
let trimmed_end: u64 = last.seq.into();
// Only used in the log message below; no records are removed here.
let trim_to = trimmed_start.saturating_sub(1);
warn!(
trimmed_start,
trimmed_end, trimmed_count, trim_to, "dropping incomplete wal tail"
);
}
}
// Deliver the held-back envelope, flagged as the final one.
if let Some(last) = pending_last.take() {
on_event(last, true)?;
}
let last_seq = last_seen_seq.or(after_seq).unwrap_or(0);
info!(last_seq, "wal recovery complete");
Ok(())
}
/// Consumes the handler and wraps it, together with the given ring poller, wait
/// strategy, gate sequence and shutdown flag, in a `JournalPoller`.
pub fn into_poller<T, W, B>(
    self,
    poller: EventPoller<RingSlot<T>, B>,
    wait_strategy: W,
    gate: Arc<disrupt_rs::DependentSequence>,
    shutdown: Arc<AtomicBool>,
) -> JournalPoller<T, W, B> {
    let journal_poller: JournalPoller<T, W, B> =
        JournalPoller::new(self, poller, wait_strategy, gate, shutdown);
    journal_poller
}
}
/// Everything the WAL poller thread needs: the journal, the ring poller it reads
/// from, and the handles it uses to signal progress and shutdown.
pub struct JournalPoller<T, W, B> {
// Journal that polled envelopes are written to.
handler: JournalHandler,
// Source of ring slots to journal.
poller: EventPoller<RingSlot<T>, B>,
// Used to create the waiter passed to `poll_wait`.
wait_strategy: W,
// Sequence updated after each committed batch (see `spawn`).
gate: Arc<disrupt_rs::DependentSequence>,
// Checked when the ring reports no events; set when the poller fails.
shutdown: Arc<AtomicBool>,
_phantom: std::marker::PhantomData<T>,
}
impl<T, W, B> JournalPoller<T, W, B> {
pub fn new(
handler: JournalHandler,
poller: EventPoller<RingSlot<T>, B>,
wait_strategy: W,
gate: Arc<disrupt_rs::DependentSequence>,
shutdown: Arc<AtomicBool>,
) -> Self {
Self {
handler,
poller,
wait_strategy,
gate,
shutdown,
_phantom: std::marker::PhantomData,
}
}
}
impl<T, W, B> JournalPoller<T, W, B>
where
    W: WaitStrategy + Send + 'static,
    T: RkyvToBytes + Send + Sync + 'static,
    B: disrupt_rs::Barrier + 'static,
{
    /// Starts the WAL poller on a new thread without CPU pinning.
    ///
    /// The returned handle yields the poller's result when the thread exits.
    pub fn poll(self) -> thread::JoinHandle<anyhow::Result<()>> {
        self.spawn(None)
    }

    /// Starts the WAL poller on a new thread, asking for it to be pinned to
    /// `core_id` when one is given.
    pub fn poll_pinned(self, core_id: Option<usize>) -> thread::JoinHandle<anyhow::Result<()>> {
        self.spawn(core_id)
    }

    /// Spawns the poller thread.
    ///
    /// The thread repeatedly polls the ring, writes every polled envelope into one
    /// LMDB write transaction, commits it, and then advances `gate` to the sequence
    /// of the most recent envelope for which `is_tx_end()` was true. The gate is left
    /// untouched until at least one transaction end has been committed, so a first
    /// batch that holds only the beginning of a transaction does not publish a
    /// placeholder sequence to downstream consumers.
    ///
    /// The loop ends when the ring reports shutdown, or when it reports no events
    /// while the shutdown flag is set. On any error the shutdown flag is raised and
    /// the error is returned through the join handle.
    fn spawn(self, core_id: Option<usize>) -> thread::JoinHandle<anyhow::Result<()>> {
        let mut this = self;
        thread::spawn(move || {
            trace!(core_id = ?core_id, "starting wal poller");
            if let Some(id) = core_id {
                // The result of the pinning request is deliberately ignored (best effort).
                let _ = core_affinity::set_for_current(CoreId { id });
            }
            crate::metrics_stage!("wal_poller");
            let mut waiter = this.wait_strategy.new_waiter();
            let res = (|| {
                let mut wtxn = this.handler.env.write_txn()?;
                // `None` until a transaction end has been journaled. Starting from a
                // numeric placeholder such as 0 would let the gate expose sequence 0
                // before its transaction is complete.
                // NOTE(review): assumes ring sequences can start at 0; confirm against disrupt_rs.
                let mut last_completed_tx_end: Option<Sequence> = None;
                'poll: loop {
                    match this.poller.poll_wait(&mut waiter) {
                        Ok(mut events) => {
                            let t0 = Instant::now();
                            let events_len = events.len();
                            crate::metric!({ inflight: 1, in_total: events_len as u64 });
                            for (s, e) in &mut events {
                                let envelope: &Envelope<T> = e;
                                this.handler.put_envelope(&mut wtxn, envelope)?;
                                if envelope.is_tx_end() {
                                    last_completed_tx_end = Some(s);
                                }
                            }
                            // Commit first, then advance the gate, so the gate never
                            // runs ahead of what has been committed.
                            wtxn.commit()?;
                            if let Some(tx_end) = last_completed_tx_end {
                                this.gate.set(tx_end);
                            }
                            wtxn = this.handler.env.write_txn()?;
                            crate::metric!({
                                out_total: 1, inflight: 0,
                                duration_ns: t0.elapsed().as_nanos() as u64
                            });
                        }
                        Err(Polling::NoEvents) => {
                            if this.shutdown.load(Ordering::Acquire) {
                                break 'poll;
                            }
                            continue;
                        }
                        Err(Polling::Shutdown) => break 'poll,
                    }
                }
                Ok::<_, anyhow::Error>(())
            })();
            if let Err(e) = res {
                error!(error = ?e, "journal poller error");
                crate::metric!({ err_total: 1 });
                // Release pairs with the Acquire load used when the flag is checked.
                this.shutdown.store(true, Ordering::Release);
                return Err(e);
            }
            Ok(())
        })
    }
}
#[cfg(test)]
mod tests {
use super::*;
use crate::book::{BookEvent, BookEventEnvelope};
use crate::disruptor::{Envelope, RingSlot};
use crate::metrics::globals;
use crate::types::MarketId;
use crossbeam_utils::CachePadded;
use disrupt_rs::wait_strategies::WaitStrategy;
use disrupt_rs::{BusySpin, DependentSequence, Polling, Producer, build_single_producer};
use std::sync::{
Arc,
atomic::{AtomicBool, Ordering},
};
fn open_journal(path: &std::path::Path, db: &str) -> anyhow::Result<Option<JournalHandler>> {
match JournalHandler::open(path, db) {
Ok(j) => Ok(Some(j)),
Err(e) => {
let is_permission = e.chain().any(|cause| {
cause.downcast_ref::<std::io::Error>().is_some_and(|io| {
io.kind() == std::io::ErrorKind::PermissionDenied
|| io.raw_os_error() == Some(1)
})
}) || e.to_string().contains("Operation not permitted");
if is_permission {
warn!(error = ?e, "skipping LMDB WAL test");
Ok(None)
} else {
Err(e)
}
}
}
}
/// Builds a per-test directory path of the form
/// `<CARGO_MANIFEST_DIR>/target/wal_tests/btx-<test_name>-<nanos since the Unix epoch>`.
///
/// The directory itself is not created here.
fn temp_lmdb_path(test_name: &str) -> std::path::PathBuf {
    use std::time::{SystemTime, UNIX_EPOCH};

    // A clock set before the epoch yields a zero duration instead of an error.
    let uniq = SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .unwrap_or_default()
        .as_nanos();

    let mut dir = std::path::PathBuf::from(env!("CARGO_MANIFEST_DIR"));
    dir.push("target");
    dir.push("wal_tests");
    dir.push(format!("btx-{test_name}-{uniq}"));
    dir
}
// Writes seven single-event transactions (seq 91..=97) followed by only three
// events of a five-event transaction (seq 98..=100), then checks that recovery
// after seq 90 delivers exactly 91..=97.
#[test]
fn recover_from_trims_incomplete_tail_transaction() -> anyhow::Result<()> {
#[derive(
Debug, Default, rkyv::Archive, rkyv::Serialize, rkyv::Deserialize, bytecheck::CheckBytes,
)]
struct Payload {
value: u64,
}
let path = temp_lmdb_path("recover-from-trims-tail");
// `None` means the environment could not be opened for permission reasons: skip.
let Some(journaler) = open_journal(&path, "walw")? else {
return Ok(());
};
let mut wtxn = journaler.write_txn()?;
// Complete transactions, one event each.
for seq in 91u64..=97u64 {
let e = Envelope {
seq,
payload: Payload { value: seq },
response_cb: None,
tx_id: seq,
tx_len: 1,
tx_ix: 0,
};
journaler.put_envelope(&mut wtxn, &e)?;
}
// Incomplete tail: indices 0..=2 of a transaction that declares five events.
let tx_id = 999;
for (ix, seq) in (98u64..=100u64).enumerate() {
let e = Envelope {
seq,
payload: Payload { value: seq },
response_cb: None,
tx_id,
tx_len: 5,
tx_ix: ix as u16,
};
journaler.put_envelope(&mut wtxn, &e)?;
}
wtxn.commit()?;
let mut seqs: Vec<u64> = Vec::new();
journaler.recover_from::<Payload, _>(Some(90), |e, _eob| {
seqs.push(e.seq.into());
Ok(())
})?;
// Only the complete transactions are delivered.
assert_eq!(seqs, (91u64..=97u64).collect::<Vec<_>>());
// Best-effort cleanup; only reached when the assertion above passed.
let _ = std::fs::remove_dir_all(&path);
Ok(())
}
// Writes three single-event transactions (seq 10..=12) followed by two events of
// a three-event transaction (seq 13..=14), then checks that recovery delivers
// 10, 11, 12 and passes `true` as the flag only for seq 12.
#[test]
fn recover_from_emits_eob_on_last_valid_event_before_incomplete_tail() -> anyhow::Result<()> {
#[derive(
Debug, Default, rkyv::Archive, rkyv::Serialize, rkyv::Deserialize, bytecheck::CheckBytes,
)]
struct Payload {
value: u64,
}
let path = temp_lmdb_path("recover-from-eob-before-tail");
// `None` means the environment could not be opened for permission reasons: skip.
let Some(journaler) = open_journal(&path, "walw")? else {
return Ok(());
};
let mut wtxn = journaler.write_txn()?;
// Complete transactions, one event each.
for seq in 10u64..=12u64 {
let e = Envelope {
seq,
payload: Payload { value: seq },
response_cb: None,
tx_id: seq,
tx_len: 1,
tx_ix: 0,
};
journaler.put_envelope(&mut wtxn, &e)?;
}
// Incomplete tail: indices 0..=1 of a transaction that declares three events.
let tx_id = 999;
for (ix, seq) in (13u64..=14u64).enumerate() {
let e = Envelope {
seq,
payload: Payload { value: seq },
response_cb: None,
tx_id,
tx_len: 3,
tx_ix: ix as u16,
};
journaler.put_envelope(&mut wtxn, &e)?;
}
wtxn.commit()?;
// Record each delivered sequence together with the flag it was delivered with.
let mut seen: Vec<(u64, bool)> = Vec::new();
journaler.recover_from::<Payload, _>(Some(9), |e, eob| {
seen.push((e.seq.into(), eob));
Ok(())
})?;
assert_eq!(seen.len(), 3);
assert_eq!(seen[0], (10, false));
assert_eq!(seen[1], (11, false));
assert_eq!(seen[2], (12, true));
// Best-effort cleanup; only reached when the assertions above passed.
let _ = std::fs::remove_dir_all(&path);
Ok(())
}
/// Spawns a thread that polls `poller` and appends the `tx_id` of every event it
/// sees to `seen`.
///
/// The thread stops when `stop` is set, when the ring reports shutdown, or once
/// `seen` holds at least `done_after_events` entries; in the last case it first
/// sends a unit value on the returned channel.
fn spawn_downstream_poller(
    mut poller: disrupt_rs::EventPoller<
        RingSlot<BookEventEnvelope>,
        disrupt_rs::MultiConsumerDependentsBarrier,
    >,
    stop: Arc<AtomicBool>,
    seen: Arc<parking_lot::Mutex<Vec<u64>>>,
    done_after_events: usize,
) -> (std::thread::JoinHandle<()>, flume::Receiver<()>) {
    let (done_tx, done_rx) = flume::bounded::<()>(1);
    let worker = std::thread::spawn(move || {
        let mut waiter = BusySpin.new_waiter();
        loop {
            if stop.load(Ordering::Relaxed) {
                break;
            }
            let mut batch = match poller.poll_wait(&mut waiter) {
                Ok(batch) => batch,
                Err(Polling::NoEvents) => continue,
                Err(Polling::Shutdown) => break,
            };
            // The lock is held for the whole batch, including the completion check.
            let mut recorded = seen.lock();
            for (_seq, slot) in &mut batch {
                recorded.push(slot.tx_id);
            }
            if recorded.len() >= done_after_events {
                let _ = done_tx.send(());
                break;
            }
        }
    });
    (worker, done_rx)
}
// End-to-end pipeline test: a WAL poller and a downstream consumer gated on the
// WAL's sequence share one ring. The producer publishes a complete single-event
// transaction (tx_id 1) and then only three events of a five-event transaction
// (tx_id 2). Downstream must see tx 1 and never tx 2, and recovery from the
// journal must yield only tx 1.
#[test]
fn downstream_only_sees_full_transactions_when_gated_on_wal_commit() -> anyhow::Result<()> {
let path = temp_lmdb_path("pipeline-gating");
let wal_shutdown = Arc::new(AtomicBool::new(false));
// `None` means the environment could not be opened for permission reasons: skip.
let Some(journal) = open_journal(&path, "walw")? else {
return Ok(());
};
let wait_strategy = BusySpin;
// Ring of 64 slots; the closure builds the initial content of each slot.
let (wal_poller, builder) = build_single_producer(
64,
|| {
CachePadded::new(Envelope {
seq: 0,
payload: BookEventEnvelope {
market_id: MarketId(0),
timestamp: crate::types::unix_epoch(),
event: BookEvent::MarketStateChanged {
to: crate::book::common::types::BookMarketState::Open,
reason: String::new(),
},
},
response_cb: None,
tx_id: 0,
tx_len: 1,
tx_ix: 0,
})
},
wait_strategy,
)
.event_poller();
// The downstream poller additionally depends on `wal_gate`, which the WAL poller updates.
let wal_gate = Arc::new(DependentSequence::new());
let (downstream_poller, builder) = builder
.and_then_with_dependents(vec![wal_gate.clone()])
.event_poller();
let producer = builder.build();
let seen: Arc<parking_lot::Mutex<Vec<u64>>> = Arc::new(parking_lot::Mutex::new(Vec::new()));
let stop_downstream = Arc::new(AtomicBool::new(false));
// Downstream signals completion after its first recorded event.
let (downstream_thread, done_rx) = spawn_downstream_poller(
downstream_poller,
Arc::clone(&stop_downstream),
Arc::clone(&seen),
1,
);
globals::init(&["wal_poller"]);
let wal_thread = journal
.into_poller(
wal_poller,
wait_strategy,
Arc::clone(&wal_gate),
Arc::clone(&wal_shutdown),
)
.poll();
let mut p = producer;
// Complete transaction: tx_id 1, a single event with seq 0.
p.try_publish(|e| {
e.seq = 0;
e.payload = BookEventEnvelope {
market_id: MarketId(0),
timestamp: crate::types::unix_epoch(),
event: BookEvent::MarketStateChanged {
to: crate::book::common::types::BookMarketState::Open,
reason: String::new(),
},
};
e.response_cb = None;
e.tx_id = 1;
e.tx_len = 1;
e.tx_ix = 0;
})?;
// Partial transaction: tx_id 2 declares five events but only three (seq 1..=3) are published.
p.try_batch_publish(3, |iter| {
let mut seq = 1u64;
for (ix, e) in iter.enumerate() {
e.seq = seq;
e.payload = BookEventEnvelope {
market_id: MarketId(0),
timestamp: crate::types::unix_epoch(),
event: BookEvent::MarketStateChanged {
to: crate::book::common::types::BookMarketState::Open,
reason: String::new(),
},
};
e.response_cb = None;
e.tx_id = 2;
e.tx_len = 5;
e.tx_ix = ix as u16;
seq += 1;
}
})?;
// The test relies on dropping the producer to let the WAL thread finish so `join` returns.
drop(p);
wal_thread.join().expect("wal thread should not panic")?;
// The receive result is ignored here; the assertions below check what was seen.
let _ = done_rx.recv_timeout(std::time::Duration::from_secs(2));
stop_downstream.store(true, Ordering::Relaxed);
downstream_thread
.join()
.expect("downstream thread should not panic");
let tx_ids = seen.lock().clone();
assert!(tx_ids.contains(&1), "expected to see committed tx_id=1");
assert!(
!tx_ids.contains(&2),
"did not expect downstream to see partial tx_id=2"
);
// Reopen the journal and verify recovery also drops the partial transaction.
let Some(recovered) = open_journal(&path, "walw")? else {
return Ok(());
};
let mut recovered_tx_ids: Vec<u64> = Vec::new();
recovered.recover_from::<BookEventEnvelope, _>(None, |e, _eob| {
recovered_tx_ids.push(e.tx_id.into());
Ok(())
})?;
assert_eq!(recovered_tx_ids, vec![1u64]);
// Best-effort cleanup; only reached when the assertions above passed.
let _ = std::fs::remove_dir_all(&path);
Ok(())
}
// End-to-end pipeline test with three complete transactions: tx 1 (two events,
// seq 0..=1), tx 2 (three events, seq 2..=4) and tx 3 (one event, seq 5). The
// downstream consumer must see all six events and recovery must yield six events.
// NOTE(review): the test name says "one commit", but how the events are split into
// WAL commits depends on how the poller batches them; the assertions do not check it.
#[test]
fn downstream_sees_multiple_full_transactions_in_one_commit() -> anyhow::Result<()> {
let path = temp_lmdb_path("pipeline-multi-tx-one-commit");
let wal_shutdown = Arc::new(AtomicBool::new(false));
// `None` means the environment could not be opened for permission reasons: skip.
let Some(journal) = open_journal(&path, "walw")? else {
return Ok(());
};
let wait_strategy = BusySpin;
// Ring of 64 slots; the closure builds the initial content of each slot.
let (wal_poller, builder) = build_single_producer(
64,
|| {
CachePadded::new(Envelope {
seq: 0,
payload: BookEventEnvelope {
market_id: MarketId(0),
timestamp: crate::types::unix_epoch(),
event: BookEvent::MarketStateChanged {
to: crate::book::common::types::BookMarketState::Open,
reason: String::new(),
},
},
response_cb: None,
tx_id: 0,
tx_len: 1,
tx_ix: 0,
})
},
wait_strategy,
)
.event_poller();
// The downstream poller additionally depends on `wal_gate`, which the WAL poller updates.
let wal_gate = Arc::new(DependentSequence::new());
let (downstream_poller, builder) = builder
.and_then_with_dependents(vec![wal_gate.clone()])
.event_poller();
let producer = builder.build();
// 2 + 3 + 1 events across the three transactions published below.
let expected_events = 6usize; let seen: Arc<parking_lot::Mutex<Vec<u64>>> = Arc::new(parking_lot::Mutex::new(Vec::new()));
let stop_downstream = Arc::new(AtomicBool::new(false));
let (downstream_thread, done_rx) = spawn_downstream_poller(
downstream_poller,
Arc::clone(&stop_downstream),
Arc::clone(&seen),
expected_events,
);
globals::init(&["wal_poller"]);
let wal_thread = journal
.into_poller(
wal_poller,
wait_strategy,
Arc::clone(&wal_gate),
Arc::clone(&wal_shutdown),
)
.poll();
let mut p = producer;
// tx 1: two events, seq 0..=1.
p.try_batch_publish(2, |iter| {
for (ix, e) in iter.enumerate() {
e.seq = ix as u64;
e.payload = BookEventEnvelope {
market_id: MarketId(0),
timestamp: crate::types::unix_epoch(),
event: BookEvent::MarketStateChanged {
to: crate::book::common::types::BookMarketState::Open,
reason: String::new(),
},
};
e.response_cb = None;
e.tx_id = 1;
e.tx_len = 2;
e.tx_ix = ix as u16;
}
})?;
// tx 2: three events, seq 2..=4.
p.try_batch_publish(3, |iter| {
for (ix, e) in iter.enumerate() {
e.seq = 2 + ix as u64;
e.payload = BookEventEnvelope {
market_id: MarketId(0),
timestamp: crate::types::unix_epoch(),
event: BookEvent::MarketStateChanged {
to: crate::book::common::types::BookMarketState::Open,
reason: String::new(),
},
};
e.response_cb = None;
e.tx_id = 2;
e.tx_len = 3;
e.tx_ix = ix as u16;
}
})?;
// tx 3: a single event, seq 5.
p.try_publish(|e| {
e.seq = 5;
e.payload = BookEventEnvelope {
market_id: MarketId(0),
timestamp: crate::types::unix_epoch(),
event: BookEvent::MarketStateChanged {
to: crate::book::common::types::BookMarketState::Open,
reason: String::new(),
},
};
e.response_cb = None;
e.tx_id = 3;
e.tx_len = 1;
e.tx_ix = 0;
})?;
// The test relies on dropping the producer to let the WAL thread finish so `join` returns.
drop(p);
wal_thread.join().expect("wal thread should not panic")?;
// Unlike the gating test, a timeout here fails the test.
done_rx.recv_timeout(std::time::Duration::from_secs(2))?;
stop_downstream.store(true, Ordering::Relaxed);
downstream_thread
.join()
.expect("downstream thread should not panic");
let tx_ids = seen.lock().clone();
assert_eq!(tx_ids.len(), expected_events);
assert!(tx_ids.contains(&1));
assert!(tx_ids.contains(&2));
assert!(tx_ids.contains(&3));
// Reopen the journal and verify every event can be recovered.
let Some(recovered) = open_journal(&path, "walw")? else {
return Ok(());
};
let mut recovered_tx_ids: Vec<u64> = Vec::new();
recovered.recover_from::<BookEventEnvelope, _>(None, |e, _eob| {
recovered_tx_ids.push(e.tx_id.into());
Ok(())
})?;
assert_eq!(recovered_tx_ids.len(), expected_events);
// Best-effort cleanup; only reached when the assertions above passed.
let _ = std::fs::remove_dir_all(&path);
Ok(())
}
// Simulates a crash between writing and committing: the first session commits one
// single-event transaction (tx_id 1); the second session writes a full three-event
// transaction (tx_id 2) but drops the write transaction without committing.
// Recovery from a third session must yield only tx_id 1.
#[test]
fn crash_before_commit_drops_uncommitted_writes() -> anyhow::Result<()> {
let path = temp_lmdb_path("pipeline-crash-before-commit");
// Session 1: write and commit tx_id 1.
{
let Some(journal) = open_journal(&path, "walw")? else {
return Ok(());
};
let mut wtxn = journal.env.write_txn()?;
let e = Envelope {
seq: 0,
payload: BookEventEnvelope {
market_id: MarketId(0),
timestamp: crate::types::unix_epoch(),
event: BookEvent::MarketStateChanged {
to: crate::book::common::types::BookMarketState::Open,
reason: String::new(),
},
},
response_cb: None,
tx_id: 1,
tx_len: 1,
tx_ix: 0,
};
journal.put_envelope(&mut wtxn, &e)?;
wtxn.commit()?;
}
// Session 2: write tx_id 2 (seq 1..=3) but never commit.
{
let Some(journal) = open_journal(&path, "walw")? else {
return Ok(());
};
let mut wtxn = journal.env.write_txn()?;
for (ix, seq) in (1u64..=3u64).enumerate() {
let e = Envelope {
seq,
payload: BookEventEnvelope {
market_id: MarketId(0),
timestamp: crate::types::unix_epoch(),
event: BookEvent::MarketStateChanged {
to: crate::book::common::types::BookMarketState::Open,
reason: String::new(),
},
},
response_cb: None,
tx_id: 2,
tx_len: 3,
tx_ix: ix as u16,
};
journal.put_envelope(&mut wtxn, &e)?;
}
// Dropping without `commit` stands in for the crash.
drop(wtxn);
}
// Session 3: recover everything and expect only the committed transaction.
let Some(recovered) = open_journal(&path, "walw")? else {
return Ok(());
};
let mut recovered_tx_ids: Vec<u64> = Vec::new();
recovered.recover_from::<BookEventEnvelope, _>(None, |e, _eob| {
recovered_tx_ids.push(e.tx_id.into());
Ok(())
})?;
assert_eq!(recovered_tx_ids, vec![1u64]);
// Best-effort cleanup; only reached when the assertion above passed.
let _ = std::fs::remove_dir_all(&path);
Ok(())
}
}