Skip to main content

infinite_db/engine/
io_thread.rs

1//! Dedicated disk I/O thread (fire-and-forget write drain).
2
3use std::collections::{HashMap, HashSet};
4use std::io;
5use std::path::PathBuf;
6use std::sync::atomic::{AtomicU64, Ordering};
7use std::sync::Arc;
8use std::thread::{self, JoinHandle};
9use std::time::Duration;
10
11use crossbeam_channel::Receiver;
12use parking_lot::RwLock;
13
14use crate::infinitedb_core::{
15    address::{RevisionId, SpaceId},
16    block::{Block, BlockId},
17    snapshot::{BlockIndexEntry, SnapshotId},
18    space::SpaceRegistry,
19};
20use crate::infinitedb_storage::{
21    hot_segment::{wal_entry_to_record, HotSegment},
22    nvme::{compute_checksum, BlockStore},
23    wal::{WalDurability, WalEntry, WalWriter},
24};
25
26use super::live_tail::LiveTailView;
27use super::query::space_key;
28use super::snapshot_store::SnapshotStore;
29use super::write_queue::{IoCommand, WriteJob, WriteQueueSender};
30
/// Tuning knobs for the dedicated I/O thread.
#[derive(Clone, Debug)]
pub struct IoThreadConfig {
    /// Deadline handed to the hot segment's direct append; when the append
    /// does not complete in time the write is routed to the staging WAL.
    pub direct_write_timeout: Duration,
    /// Per-space record count at which the hot segment is sealed automatically.
    pub hot_segment_seal_threshold: usize,
    /// Capacity used when the write queue is created.
    pub write_queue_capacity: usize,
    /// How long the I/O loop waits for a command before it uses the idle
    /// moment to promote staged entries.
    pub wal_group_commit_interval: Duration,
}

impl Default for IoThreadConfig {
    /// Returns the stock tuning: 2 ms direct-write deadline, seal after 256
    /// records, a 4096-slot queue and a 1 ms group-commit interval.
    fn default() -> Self {
        let direct_write_timeout = Duration::from_millis(2);
        let wal_group_commit_interval = Duration::from_millis(1);
        IoThreadConfig {
            direct_write_timeout,
            hot_segment_seal_threshold: 256,
            write_queue_capacity: 4096,
            wal_group_commit_interval,
        }
    }
}
50
/// Point-in-time statistics describing the I/O thread.
///
/// NOTE(review): nothing in this part of the file fills this struct in; the
/// field descriptions below are inferred from the names and should be
/// confirmed against whoever constructs it.
#[derive(Clone, Debug, Default)]
pub struct IoStats {
    /// Presumably the number of commands waiting in the write queue.
    pub queue_depth: usize,
    /// Presumably the count of writes appended straight to a hot segment.
    pub direct_writes: u64,
    /// Presumably the count of writes diverted to the staging WAL.
    pub staged_writes: u64,
    /// Presumably the number of frames currently held in the staging WAL.
    pub staging_wal_frames: usize,
}
59
/// Which path a write took on its way to disk.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum WriteRoute {
    /// The entry was appended to the space's hot segment within the deadline.
    Direct,
    /// The entry missed the deadline and was appended to the staging WAL.
    Staged,
}
65
66pub struct IoThreadHandle {
67    join: Option<JoinHandle<io::Result<()>>>,
68    direct_writes: Arc<AtomicU64>,
69    staged_writes: Arc<AtomicU64>,
70}
71
72impl IoThreadHandle {
73    pub fn spawn(
74        root: PathBuf,
75        store: Arc<BlockStore>,
76        snapshots: Arc<SnapshotStore>,
77        live_tail: Arc<LiveTailView>,
78        spaces: Arc<RwLock<SpaceRegistry>>,
79        revision: Arc<AtomicU64>,
80        next_block_id: Arc<AtomicU64>,
81        next_snapshot_id: Arc<AtomicU64>,
82        rx: Receiver<IoCommand>,
83        config: IoThreadConfig,
84    ) -> Self {
85        let direct_writes = Arc::new(AtomicU64::new(0));
86        let staged_writes = Arc::new(AtomicU64::new(0));
87        let direct_clone = Arc::clone(&direct_writes);
88        let staged_clone = Arc::clone(&staged_writes);
89
90        let join = thread::Builder::new()
91            .name("infinitedb-io".into())
92            .spawn(move || {
93                run_io_loop(
94                    root,
95                    store,
96                    snapshots,
97                    live_tail,
98                    spaces,
99                    revision,
100                    next_block_id,
101                    next_snapshot_id,
102                    rx,
103                    config,
104                    direct_clone,
105                    staged_clone,
106                )
107            })
108            .expect("spawn io thread");
109
110        Self {
111            join: Some(join),
112            direct_writes,
113            staged_writes,
114        }
115    }
116
117    pub fn direct_writes(&self) -> u64 {
118        self.direct_writes.load(Ordering::Relaxed)
119    }
120
121    pub fn staged_writes(&self) -> u64 {
122        self.staged_writes.load(Ordering::Relaxed)
123    }
124
125    pub fn join(&mut self) -> io::Result<()> {
126        if let Some(handle) = self.join.take() {
127            handle
128                .join()
129                .map_err(|_| io::Error::new(io::ErrorKind::Other, "io thread panicked"))??;
130        }
131        Ok(())
132    }
133}
134
135pub fn open_io_pipeline(
136    root: PathBuf,
137    store: Arc<BlockStore>,
138    snapshots: Arc<SnapshotStore>,
139    live_tail: Arc<LiveTailView>,
140    spaces: Arc<RwLock<SpaceRegistry>>,
141    revision: Arc<AtomicU64>,
142    next_block_id: Arc<AtomicU64>,
143    next_snapshot_id: Arc<AtomicU64>,
144    config: IoThreadConfig,
145) -> (WriteQueueSender, IoThreadHandle) {
146    let (tx, rx) = WriteQueueSender::new(config.write_queue_capacity);
147    let handle = IoThreadHandle::spawn(
148        root,
149        store,
150        snapshots,
151        live_tail,
152        spaces,
153        revision,
154        next_block_id,
155        next_snapshot_id,
156        rx,
157        config,
158    );
159    (tx, handle)
160}
161
162struct IoState {
163    root: PathBuf,
164    store: Arc<BlockStore>,
165    snapshots: Arc<SnapshotStore>,
166    live_tail: Arc<LiveTailView>,
167    spaces: Arc<RwLock<SpaceRegistry>>,
168    revision: Arc<AtomicU64>,
169    next_block_id: Arc<AtomicU64>,
170    next_snapshot_id: Arc<AtomicU64>,
171    config: IoThreadConfig,
172    hot: HashMap<u64, HotSegment>,
173    staging: StagingWal,
174    hot_record_counts: HashMap<u64, usize>,
175}
176
177struct StagingWal {
178    writer: WalWriter,
179    entries: Vec<WalEntry>,
180}
181
182impl StagingWal {
183    fn open(path: PathBuf) -> io::Result<Self> {
184        std::fs::create_dir_all(path.parent().unwrap())?;
185        let writer =
186            WalWriter::open_with_durability(path.clone(), WalDurability::Buffered { sync_every: usize::MAX })?;
187        let mut reader = crate::infinitedb_storage::wal::WalReader::open(path)?;
188        let entries = reader.entries()?;
189        Ok(Self { writer, entries })
190    }
191
192    fn append(&mut self, entry: &WalEntry) -> io::Result<()> {
193        self.writer.append_frame(entry)?;
194        self.entries.push(entry.clone());
195        Ok(())
196    }
197
198    fn sync(&mut self) -> io::Result<()> {
199        self.writer.sync()
200    }
201
202    fn rewrite_remaining(&mut self, entries: Vec<WalEntry>) -> io::Result<()> {
203        self.entries = entries;
204        self.writer.rewrite(&self.entries)
205    }
206}
207
208fn run_io_loop(
209    root: PathBuf,
210    store: Arc<BlockStore>,
211    snapshots: Arc<SnapshotStore>,
212    live_tail: Arc<LiveTailView>,
213    spaces: Arc<RwLock<SpaceRegistry>>,
214    revision: Arc<AtomicU64>,
215    next_block_id: Arc<AtomicU64>,
216    next_snapshot_id: Arc<AtomicU64>,
217    rx: Receiver<IoCommand>,
218    config: IoThreadConfig,
219    direct_writes: Arc<AtomicU64>,
220    staged_writes: Arc<AtomicU64>,
221) -> io::Result<()> {
222    let staging_path = store.staging_wal_path();
223    let mut state = IoState {
224        root,
225        store,
226        snapshots,
227        live_tail,
228        spaces,
229        revision,
230        next_block_id,
231        next_snapshot_id,
232        config: config.clone(),
233        hot: HashMap::new(),
234        staging: StagingWal::open(staging_path)?,
235        hot_record_counts: HashMap::new(),
236    };
237
238    // Recover staging WAL into live tail.
239    for entry in state.staging.entries.clone() {
240        if let Some(record) = wal_entry_to_record(entry) {
241            state.live_tail.append(record);
242        }
243    }
244
245    // Recover hot segments into live tail (idempotent merge handled by revision uniqueness in tests).
246    let hot_dir = state.root.join("hot");
247    if hot_dir.exists() {
248        for entry in std::fs::read_dir(hot_dir)? {
249            let entry = entry?;
250            let name = entry.file_name().to_string_lossy().to_string();
251            if let Some(stem) = name.strip_suffix(".seg") {
252                if let Ok(space_id) = stem.parse::<u64>() {
253                    let mut seg = HotSegment::open(state.root.clone(), space_id)?;
254                    let records = seg.read_all_records()?;
255                    state.hot_record_counts.insert(space_id, records.len());
256                    for record in records {
257                        state.live_tail.append(record);
258                    }
259                    state.hot.insert(space_id, seg);
260                }
261            }
262        }
263    }
264
265    let mut shutting_down = false;
266    while !shutting_down {
267        match rx.recv_timeout(config.wal_group_commit_interval) {
268            Ok(cmd) => {
269                if matches!(cmd, IoCommand::Shutdown) {
270                    handle_command(&mut state, cmd, &direct_writes, &staged_writes)?;
271                    shutting_down = true;
272                } else {
273                    handle_command(&mut state, cmd, &direct_writes, &staged_writes)?;
274                }
275            }
276            Err(crossbeam_channel::RecvTimeoutError::Timeout) => {
277                promote_staging(&mut state)?;
278            }
279            Err(crossbeam_channel::RecvTimeoutError::Disconnected) => shutting_down = true,
280        }
281    }
282
283    Ok(())
284}
285
286fn handle_command(
287    state: &mut IoState,
288    cmd: IoCommand,
289    direct_writes: &AtomicU64,
290    staged_writes: &AtomicU64,
291) -> io::Result<()> {
292    match cmd {
293        IoCommand::Write(job) => {
294            let space_id = job_space(&job);
295            process_write(state, job, direct_writes, staged_writes)?;
296            maybe_auto_seal(state, space_id)?;
297        }
298        IoCommand::Sync { done } => {
299            promote_staging(state)?;
300            state.staging.sync()?;
301            let _ = done.send(Ok(()));
302        }
303        IoCommand::Flush { space_id, done } => {
304            let result = seal_space(state, space_id);
305            let _ = done.send(result);
306        }
307        IoCommand::Shutdown => {
308            promote_staging(state)?;
309            state.staging.sync()?;
310            return Ok(());
311        }
312    }
313    Ok(())
314}
315
316fn job_space(job: &WriteJob) -> u64 {
317    job.record.address.space.0
318}
319
320fn process_write(
321    state: &mut IoState,
322    job: WriteJob,
323    direct_writes: &AtomicU64,
324    staged_writes: &AtomicU64,
325) -> io::Result<()> {
326    let space_id = job_space(&job);
327    let hot = state
328        .hot
329        .entry(space_id)
330        .or_insert_with(|| {
331            HotSegment::open(state.root.clone(), space_id).expect("open hot segment")
332        });
333
334    let routed = if hot.try_append_with_deadline(&job.entry, state.config.direct_write_timeout)? {
335        direct_writes.fetch_add(1, Ordering::Relaxed);
336        WriteRoute::Direct
337    } else {
338        state.staging.append(&job.entry)?;
339        state.staging.sync()?;
340        staged_writes.fetch_add(1, Ordering::Relaxed);
341        WriteRoute::Staged
342    };
343
344    let _ = routed;
345    state.live_tail.append(job.record);
346    *state.hot_record_counts.entry(space_id).or_insert(0) += 1;
347    Ok(())
348}
349
350fn maybe_auto_seal(state: &mut IoState, space_id: u64) -> io::Result<()> {
351    let count = state.hot_record_counts.get(&space_id).copied().unwrap_or(0);
352    if count >= state.config.hot_segment_seal_threshold {
353        seal_space(state, space_id)?;
354    }
355    Ok(())
356}
357
358fn promote_staging(state: &mut IoState) -> io::Result<()> {
359    if state.staging.entries.is_empty() {
360        return Ok(());
361    }
362
363    let mut remaining = Vec::new();
364    for entry in state.staging.entries.drain(..) {
365        let space_id = match &entry {
366            WalEntry::Write { address, .. } | WalEntry::Tombstone { address, .. } => address.space.0,
367            _ => {
368                remaining.push(entry);
369                continue;
370            }
371        };
372
373        let hot = state
374            .hot
375            .entry(space_id)
376            .or_insert_with(|| HotSegment::open(state.root.clone(), space_id).expect("open hot segment"));
377
378        if hot.append_and_sync(&entry).is_ok() {
379            // promoted
380        } else {
381            remaining.push(entry);
382        }
383    }
384
385    state.staging.rewrite_remaining(remaining)?;
386    Ok(())
387}
388
389fn seal_space(state: &mut IoState, space_id: u64) -> io::Result<()> {
390    let mut hot = match state.hot.remove(&space_id) {
391        Some(seg) => seg,
392        None => HotSegment::open(state.root.clone(), space_id)?,
393    };
394
395    let mut records = hot.read_all_records()?;
396    if records.is_empty() {
397        state.hot.insert(space_id, hot);
398        return Ok(());
399    }
400
401    let space = SpaceId(space_id);
402    let spaces = state.spaces.read();
403    records.sort_by_key(|r| {
404        let key = space_key(&spaces, space, &r.address.point);
405        (key, r.revision.0)
406    });
407    drop(spaces);
408
409    let min_rev = records.iter().map(|r| r.revision).min().unwrap_or(RevisionId::ZERO);
410    let max_rev = records.iter().map(|r| r.revision).max().unwrap_or(RevisionId::ZERO);
411    let block_id = BlockId(state.next_block_id.fetch_add(1, Ordering::Relaxed));
412
413    let mut block = Block {
414        id: block_id,
415        space,
416        records: records.clone(),
417        min_revision: min_rev,
418        max_revision: max_rev,
419        checksum: [0u8; 32],
420    };
421    block.checksum = compute_checksum(&block)?;
422    state.store.write_block(&block)?;
423
424    let hilbert_min = {
425        let spaces = state.spaces.read();
426        block
427            .records
428            .first()
429            .map(|r| space_key(&spaces, space, &r.address.point))
430            .unwrap_or(0)
431    };
432    let hilbert_max = {
433        let spaces = state.spaces.read();
434        block
435            .records
436            .last()
437            .map(|r| space_key(&spaces, space, &r.address.point))
438            .unwrap_or(hilbert_min)
439    };
440
441    let snap_id = SnapshotId(state.next_snapshot_id.fetch_add(1, Ordering::Relaxed));
442    state.snapshots.update(space, |snap| {
443        snap.blocks.insert(
444            hilbert_min,
445            BlockIndexEntry {
446                block_id,
447                max_key: hilbert_max,
448            },
449        );
450        if snap.revision < max_rev {
451            snap.revision = max_rev;
452        }
453        if snap.id.0 == 0 {
454            snap.id = snap_id;
455        }
456    });
457
458    let sealed: HashSet<(Vec<u32>, u64)> = records
459        .iter()
460        .map(|r| (r.address.point.coords.clone(), r.revision.0))
461        .collect();
462
463    let mut tail = state.live_tail.snapshot();
464    tail.retain(|r| !sealed.contains(&(r.address.point.coords.clone(), r.revision.0)));
465    state.live_tail.publish(tail);
466
467    hot.reset()?;
468    state.hot.insert(space_id, hot);
469    state.hot_record_counts.insert(space_id, 0);
470    Ok(())
471}