// infinite_db/engine/write_queue.rs
use std::io;
4
5use crossbeam_channel::{bounded, Receiver, Sender, TrySendError};
6
7use crate::infinitedb_core::{
8 address::RevisionId,
9 block::Record,
10 branch::BranchId,
11};
12use crate::infinitedb_storage::wal::WalEntry;
13
14#[derive(Debug, Clone)]
16pub struct WriteJob {
17 pub branch_id: BranchId,
18 pub revision: RevisionId,
19 pub entry: WalEntry,
20 pub record: Record,
21}
22
23impl WriteJob {
24 pub fn main(revision: RevisionId, entry: WalEntry, record: Record) -> Self {
26 Self {
27 branch_id: BranchId::MAIN,
28 revision,
29 entry,
30 record,
31 }
32 }
33}
34
35#[derive(Debug)]
37pub enum IoCommand {
38 Write(WriteJob),
39 Sync {
41 done: crossbeam_channel::Sender<io::Result<()>>,
42 },
43 Flush {
45 space_id: u64,
46 done: crossbeam_channel::Sender<io::Result<()>>,
47 },
48 Shutdown,
49}
50
51#[derive(Clone)]
53pub struct WriteQueueSender {
54 tx: Sender<IoCommand>,
55 capacity: usize,
56}
57
58impl WriteQueueSender {
59 pub fn new(capacity: usize) -> (Self, Receiver<IoCommand>) {
60 let (tx, rx) = bounded(capacity);
61 (
62 Self {
63 tx,
64 capacity,
65 },
66 rx,
67 )
68 }
69
70 pub fn enqueue_write(&self, job: WriteJob) -> io::Result<()> {
72 self.tx
73 .send(IoCommand::Write(job))
74 .map_err(|_| io::Error::new(io::ErrorKind::BrokenPipe, "I/O thread stopped"))
75 }
76
77 pub fn try_enqueue_write(&self, job: WriteJob) -> io::Result<()> {
79 match self.tx.try_send(IoCommand::Write(job)) {
80 Ok(()) => Ok(()),
81 Err(TrySendError::Full(_)) => Err(io::Error::new(
82 io::ErrorKind::WouldBlock,
83 "write queue full",
84 )),
85 Err(TrySendError::Disconnected(_)) => Err(io::Error::new(
86 io::ErrorKind::BrokenPipe,
87 "I/O thread stopped",
88 )),
89 }
90 }
91
92 pub fn request_sync(&self) -> io::Result<()> {
93 let (done_tx, done_rx) = bounded(1);
94 self.tx
95 .send(IoCommand::Sync { done: done_tx })
96 .map_err(|_| io::Error::new(io::ErrorKind::BrokenPipe, "I/O thread stopped"))?;
97 done_rx
98 .recv()
99 .map_err(|_| io::Error::new(io::ErrorKind::BrokenPipe, "I/O thread stopped"))?
100 }
101
102 pub fn request_flush(&self, space_id: u64) -> io::Result<()> {
103 let (done_tx, done_rx) = bounded(1);
104 self.tx
105 .send(IoCommand::Flush {
106 space_id,
107 done: done_tx,
108 })
109 .map_err(|_| io::Error::new(io::ErrorKind::BrokenPipe, "I/O thread stopped"))?;
110 done_rx
111 .recv()
112 .map_err(|_| io::Error::new(io::ErrorKind::BrokenPipe, "I/O thread stopped"))?
113 }
114
115 pub fn shutdown(&self) -> io::Result<()> {
116 self.tx
117 .send(IoCommand::Shutdown)
118 .map_err(|_| io::Error::new(io::ErrorKind::BrokenPipe, "I/O thread stopped"))
119 }
120
121 pub fn capacity(&self) -> usize {
122 self.capacity
123 }
124
125 pub fn queued_count(&self) -> usize {
126 self.tx.len()
127 }
128}