use std::io;
use crossbeam_channel::{bounded, Receiver, Sender};
use crate::infinitedb_core::{
address::{RevisionId, SpaceId},
block::Record,
branch::BranchId,
hilbert_key::{CachedHilbertKey, HilbertKey},
};
use crate::infinitedb_storage::wal::WalEntry;
#[derive(Debug)]
pub struct WriteJob {
pub branch_id: BranchId,
pub revision: RevisionId,
pub entry: WalEntry,
pub hilbert_key: HilbertKey,
}
impl WriteJob {
pub fn main(revision: RevisionId, entry: WalEntry, hilbert_key: HilbertKey) -> Self {
Self {
branch_id: BranchId::MAIN,
revision,
entry,
hilbert_key,
}
}
pub fn into_record(self) -> Record {
match self.entry {
WalEntry::Write {
address,
revision,
data,
} => Record {
address,
revision,
data,
tombstone: false,
hilbert_key: CachedHilbertKey::set(self.hilbert_key),
},
WalEntry::Tombstone { address, revision } => Record {
address,
revision,
data: vec![],
tombstone: true,
hilbert_key: CachedHilbertKey::set(self.hilbert_key),
},
other => panic!("unsupported WAL entry in write job: {other:?}"),
}
}
pub fn entry(&self) -> &WalEntry {
&self.entry
}
pub fn space_id(&self) -> SpaceId {
match &self.entry {
WalEntry::Write { address, .. } | WalEntry::Tombstone { address, .. } => {
address.space
}
_ => SpaceId(0),
}
}
}
#[derive(Debug)]
pub enum IoCommand {
Write(WriteJob),
WriteBatch(Vec<WriteJob>),
Sync {
done: crossbeam_channel::Sender<io::Result<()>>,
},
Flush {
space: SpaceId,
done: crossbeam_channel::Sender<io::Result<()>>,
},
Shutdown,
}
#[derive(Clone)]
pub struct WriteQueueSender {
tx: Sender<IoCommand>,
}
impl WriteQueueSender {
pub fn new(capacity: usize) -> (Self, Receiver<IoCommand>) {
let (tx, rx) = bounded(capacity);
(
Self { tx },
rx,
)
}
pub fn enqueue_write(&self, job: WriteJob) -> io::Result<()> {
self.tx
.send(IoCommand::Write(job))
.map_err(|_| io::Error::new(io::ErrorKind::BrokenPipe, "I/O thread stopped"))
}
pub fn enqueue_write_batch(&self, jobs: Vec<WriteJob>) -> io::Result<()> {
if jobs.is_empty() {
return Ok(());
}
self.tx
.send(IoCommand::WriteBatch(jobs))
.map_err(|_| io::Error::new(io::ErrorKind::BrokenPipe, "I/O thread stopped"))
}
pub fn post_sync(&self, done: crossbeam_channel::Sender<io::Result<()>>) -> io::Result<()> {
self.tx
.send(IoCommand::Sync { done })
.map_err(|_| io::Error::new(io::ErrorKind::BrokenPipe, "I/O thread stopped"))
}
pub fn request_flush(&self, space: SpaceId) -> io::Result<()> {
let (done_tx, done_rx) = bounded(1);
self.post_flush(space, done_tx)?;
done_rx
.recv()
.map_err(|_| io::Error::new(io::ErrorKind::BrokenPipe, "I/O thread stopped"))?
}
pub fn post_flush(
&self,
space: SpaceId,
done: crossbeam_channel::Sender<io::Result<()>>,
) -> io::Result<()> {
self.tx
.send(IoCommand::Flush { space, done })
.map_err(|_| io::Error::new(io::ErrorKind::BrokenPipe, "I/O thread stopped"))
}
pub fn shutdown(&self) -> io::Result<()> {
self.tx
.send(IoCommand::Shutdown)
.map_err(|_| io::Error::new(io::ErrorKind::BrokenPipe, "I/O thread stopped"))
}
pub fn queued_count(&self) -> usize {
self.tx.len()
}
}