Skip to main content

infinite_db/engine/
write_queue.rs

1//! Bounded fire-and-forget write queue.
2
3use std::io;
4
5use crossbeam_channel::{bounded, Receiver, Sender, TrySendError};
6
7use crate::infinitedb_core::{
8    address::RevisionId,
9    block::Record,
10    branch::BranchId,
11};
12use crate::infinitedb_storage::wal::WalEntry;
13
14/// A single write job enqueued by application threads.
15#[derive(Debug, Clone)]
16pub struct WriteJob {
17    pub branch_id: BranchId,
18    pub revision: RevisionId,
19    pub entry: WalEntry,
20    pub record: Record,
21}
22
23impl WriteJob {
24    /// Main-branch write job.
25    pub fn main(revision: RevisionId, entry: WalEntry, record: Record) -> Self {
26        Self {
27            branch_id: BranchId::MAIN,
28            revision,
29            entry,
30            record,
31        }
32    }
33}
34
35/// I/O commands processed by the dedicated disk thread.
36#[derive(Debug)]
37pub enum IoCommand {
38    Write(WriteJob),
39    /// Drain all pending work and fsync staging state.
40    Sync {
41        done: crossbeam_channel::Sender<io::Result<()>>,
42    },
43    /// Seal hot segments for a space into a block.
44    Flush {
45        space_id: u64,
46        done: crossbeam_channel::Sender<io::Result<()>>,
47    },
48    Shutdown,
49}
50
51/// Sender half of the bounded write queue.
52#[derive(Clone)]
53pub struct WriteQueueSender {
54    tx: Sender<IoCommand>,
55    capacity: usize,
56}
57
58impl WriteQueueSender {
59    pub fn new(capacity: usize) -> (Self, Receiver<IoCommand>) {
60        let (tx, rx) = bounded(capacity);
61        (
62            Self {
63                tx,
64                capacity,
65            },
66            rx,
67        )
68    }
69
70    /// Enqueue a write. Blocks only when the queue is full (backpressure).
71    pub fn enqueue_write(&self, job: WriteJob) -> io::Result<()> {
72        self.tx
73            .send(IoCommand::Write(job))
74            .map_err(|_| io::Error::new(io::ErrorKind::BrokenPipe, "I/O thread stopped"))
75    }
76
77    /// Try enqueue without blocking. Returns `WouldBlock` when full.
78    pub fn try_enqueue_write(&self, job: WriteJob) -> io::Result<()> {
79        match self.tx.try_send(IoCommand::Write(job)) {
80            Ok(()) => Ok(()),
81            Err(TrySendError::Full(_)) => Err(io::Error::new(
82                io::ErrorKind::WouldBlock,
83                "write queue full",
84            )),
85            Err(TrySendError::Disconnected(_)) => Err(io::Error::new(
86                io::ErrorKind::BrokenPipe,
87                "I/O thread stopped",
88            )),
89        }
90    }
91
92    pub fn request_sync(&self) -> io::Result<()> {
93        let (done_tx, done_rx) = bounded(1);
94        self.tx
95            .send(IoCommand::Sync { done: done_tx })
96            .map_err(|_| io::Error::new(io::ErrorKind::BrokenPipe, "I/O thread stopped"))?;
97        done_rx
98            .recv()
99            .map_err(|_| io::Error::new(io::ErrorKind::BrokenPipe, "I/O thread stopped"))?
100    }
101
102    pub fn request_flush(&self, space_id: u64) -> io::Result<()> {
103        let (done_tx, done_rx) = bounded(1);
104        self.tx
105            .send(IoCommand::Flush {
106                space_id,
107                done: done_tx,
108            })
109            .map_err(|_| io::Error::new(io::ErrorKind::BrokenPipe, "I/O thread stopped"))?;
110        done_rx
111            .recv()
112            .map_err(|_| io::Error::new(io::ErrorKind::BrokenPipe, "I/O thread stopped"))?
113    }
114
115    pub fn shutdown(&self) -> io::Result<()> {
116        self.tx
117            .send(IoCommand::Shutdown)
118            .map_err(|_| io::Error::new(io::ErrorKind::BrokenPipe, "I/O thread stopped"))
119    }
120
121    pub fn capacity(&self) -> usize {
122        self.capacity
123    }
124
125    pub fn queued_count(&self) -> usize {
126        self.tx.len()
127    }
128}