1use std::collections::{HashMap, HashSet};
4use std::io;
5use std::path::PathBuf;
6use std::sync::atomic::{AtomicU64, Ordering};
7use std::sync::Arc;
8use std::thread::{self, JoinHandle};
9use std::time::Duration;
10
11use crossbeam_channel::Receiver;
12use parking_lot::RwLock;
13
14use crate::infinitedb_core::{
15 address::{RevisionId, SpaceId},
16 block::{Block, BlockId},
17 snapshot::{BlockIndexEntry, SnapshotId},
18 space::SpaceRegistry,
19};
20use crate::infinitedb_storage::{
21 hot_segment::{wal_entry_to_record, HotSegment},
22 nvme::{compute_checksum, BlockStore},
23 wal::{WalDurability, WalEntry, WalWriter},
24};
25
26use super::live_tail::LiveTailView;
27use super::query::space_key;
28use super::snapshot_store::SnapshotStore;
29use super::write_queue::{IoCommand, WriteJob, WriteQueueSender};
30
/// Tuning knobs for the background I/O thread.
#[derive(Debug, Clone)]
pub struct IoThreadConfig {
    /// Deadline handed to `HotSegment::try_append_with_deadline`; a write that does not make
    /// it into the hot segment within this window is routed to the staging WAL instead.
    pub direct_write_timeout: Duration,
    /// Number of writes counted for a space after which its hot segment is sealed into a block.
    pub hot_segment_seal_threshold: usize,
    /// Capacity handed to `WriteQueueSender::new` when the pipeline is opened.
    pub write_queue_capacity: usize,
    /// How long the I/O loop waits for a command before promoting staged WAL entries.
    pub wal_group_commit_interval: Duration,
}

const DEFAULT_DIRECT_WRITE_TIMEOUT: Duration = Duration::from_millis(2);
const DEFAULT_HOT_SEGMENT_SEAL_THRESHOLD: usize = 256;
const DEFAULT_WRITE_QUEUE_CAPACITY: usize = 4096;
const DEFAULT_WAL_GROUP_COMMIT_INTERVAL: Duration = Duration::from_millis(1);

impl Default for IoThreadConfig {
    /// 2 ms direct-write deadline, seal after 256 writes, 4096-slot queue, 1 ms commit interval.
    fn default() -> Self {
        IoThreadConfig {
            direct_write_timeout: DEFAULT_DIRECT_WRITE_TIMEOUT,
            hot_segment_seal_threshold: DEFAULT_HOT_SEGMENT_SEAL_THRESHOLD,
            write_queue_capacity: DEFAULT_WRITE_QUEUE_CAPACITY,
            wal_group_commit_interval: DEFAULT_WAL_GROUP_COMMIT_INTERVAL,
        }
    }
}
50
/// Plain counters describing the I/O pipeline.
///
/// NOTE(review): this type is not populated anywhere in the visible code; the field
/// meanings below are inferred from their names — confirm against the producer.
#[derive(Debug, Clone, Default)]
pub struct IoStats {
    /// Presumably the number of commands waiting in the write queue.
    pub queue_depth: usize,
    /// Presumably writes appended straight to a hot segment.
    pub direct_writes: u64,
    /// Presumably writes that fell back to the staging WAL.
    pub staged_writes: u64,
    /// Presumably frames currently held in the staging WAL.
    pub staging_wal_frames: usize,
}
59
/// The path a single write took on its way to storage.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum WriteRoute {
    /// Appended to the space's hot segment within the direct-write deadline.
    Direct,
    /// The hot segment missed the deadline, so the entry went to the staging WAL.
    Staged,
}
65
/// Owner-side handle to the background I/O thread.
pub struct IoThreadHandle {
    /// Join handle of the spawned thread; `None` once it has been joined.
    join: Option<JoinHandle<io::Result<()>>>,
    /// Counter the I/O thread bumps for every write appended directly to a hot segment.
    direct_writes: Arc<AtomicU64>,
    /// Counter the I/O thread bumps for every write routed through the staging WAL.
    staged_writes: Arc<AtomicU64>,
}
71
72impl IoThreadHandle {
73 pub fn spawn(
74 root: PathBuf,
75 store: Arc<BlockStore>,
76 snapshots: Arc<SnapshotStore>,
77 live_tail: Arc<LiveTailView>,
78 spaces: Arc<RwLock<SpaceRegistry>>,
79 revision: Arc<AtomicU64>,
80 next_block_id: Arc<AtomicU64>,
81 next_snapshot_id: Arc<AtomicU64>,
82 rx: Receiver<IoCommand>,
83 config: IoThreadConfig,
84 ) -> Self {
85 let direct_writes = Arc::new(AtomicU64::new(0));
86 let staged_writes = Arc::new(AtomicU64::new(0));
87 let direct_clone = Arc::clone(&direct_writes);
88 let staged_clone = Arc::clone(&staged_writes);
89
90 let join = thread::Builder::new()
91 .name("infinitedb-io".into())
92 .spawn(move || {
93 run_io_loop(
94 root,
95 store,
96 snapshots,
97 live_tail,
98 spaces,
99 revision,
100 next_block_id,
101 next_snapshot_id,
102 rx,
103 config,
104 direct_clone,
105 staged_clone,
106 )
107 })
108 .expect("spawn io thread");
109
110 Self {
111 join: Some(join),
112 direct_writes,
113 staged_writes,
114 }
115 }
116
117 pub fn direct_writes(&self) -> u64 {
118 self.direct_writes.load(Ordering::Relaxed)
119 }
120
121 pub fn staged_writes(&self) -> u64 {
122 self.staged_writes.load(Ordering::Relaxed)
123 }
124
125 pub fn join(&mut self) -> io::Result<()> {
126 if let Some(handle) = self.join.take() {
127 handle
128 .join()
129 .map_err(|_| io::Error::new(io::ErrorKind::Other, "io thread panicked"))??;
130 }
131 Ok(())
132 }
133}
134
135pub fn open_io_pipeline(
136 root: PathBuf,
137 store: Arc<BlockStore>,
138 snapshots: Arc<SnapshotStore>,
139 live_tail: Arc<LiveTailView>,
140 spaces: Arc<RwLock<SpaceRegistry>>,
141 revision: Arc<AtomicU64>,
142 next_block_id: Arc<AtomicU64>,
143 next_snapshot_id: Arc<AtomicU64>,
144 config: IoThreadConfig,
145) -> (WriteQueueSender, IoThreadHandle) {
146 let (tx, rx) = WriteQueueSender::new(config.write_queue_capacity);
147 let handle = IoThreadHandle::spawn(
148 root,
149 store,
150 snapshots,
151 live_tail,
152 spaces,
153 revision,
154 next_block_id,
155 next_snapshot_id,
156 rx,
157 config,
158 );
159 (tx, handle)
160}
161
162struct IoState {
163 root: PathBuf,
164 store: Arc<BlockStore>,
165 snapshots: Arc<SnapshotStore>,
166 live_tail: Arc<LiveTailView>,
167 spaces: Arc<RwLock<SpaceRegistry>>,
168 revision: Arc<AtomicU64>,
169 next_block_id: Arc<AtomicU64>,
170 next_snapshot_id: Arc<AtomicU64>,
171 config: IoThreadConfig,
172 hot: HashMap<u64, HotSegment>,
173 staging: StagingWal,
174 hot_record_counts: HashMap<u64, usize>,
175}
176
177struct StagingWal {
178 writer: WalWriter,
179 entries: Vec<WalEntry>,
180}
181
182impl StagingWal {
183 fn open(path: PathBuf) -> io::Result<Self> {
184 std::fs::create_dir_all(path.parent().unwrap())?;
185 let writer =
186 WalWriter::open_with_durability(path.clone(), WalDurability::Buffered { sync_every: usize::MAX })?;
187 let mut reader = crate::infinitedb_storage::wal::WalReader::open(path)?;
188 let entries = reader.entries()?;
189 Ok(Self { writer, entries })
190 }
191
192 fn append(&mut self, entry: &WalEntry) -> io::Result<()> {
193 self.writer.append_frame(entry)?;
194 self.entries.push(entry.clone());
195 Ok(())
196 }
197
198 fn sync(&mut self) -> io::Result<()> {
199 self.writer.sync()
200 }
201
202 fn rewrite_remaining(&mut self, entries: Vec<WalEntry>) -> io::Result<()> {
203 self.entries = entries;
204 self.writer.rewrite(&self.entries)
205 }
206}
207
208fn run_io_loop(
209 root: PathBuf,
210 store: Arc<BlockStore>,
211 snapshots: Arc<SnapshotStore>,
212 live_tail: Arc<LiveTailView>,
213 spaces: Arc<RwLock<SpaceRegistry>>,
214 revision: Arc<AtomicU64>,
215 next_block_id: Arc<AtomicU64>,
216 next_snapshot_id: Arc<AtomicU64>,
217 rx: Receiver<IoCommand>,
218 config: IoThreadConfig,
219 direct_writes: Arc<AtomicU64>,
220 staged_writes: Arc<AtomicU64>,
221) -> io::Result<()> {
222 let staging_path = store.staging_wal_path();
223 let mut state = IoState {
224 root,
225 store,
226 snapshots,
227 live_tail,
228 spaces,
229 revision,
230 next_block_id,
231 next_snapshot_id,
232 config: config.clone(),
233 hot: HashMap::new(),
234 staging: StagingWal::open(staging_path)?,
235 hot_record_counts: HashMap::new(),
236 };
237
238 for entry in state.staging.entries.clone() {
240 if let Some(record) = wal_entry_to_record(entry) {
241 state.live_tail.append(record);
242 }
243 }
244
245 let hot_dir = state.root.join("hot");
247 if hot_dir.exists() {
248 for entry in std::fs::read_dir(hot_dir)? {
249 let entry = entry?;
250 let name = entry.file_name().to_string_lossy().to_string();
251 if let Some(stem) = name.strip_suffix(".seg") {
252 if let Ok(space_id) = stem.parse::<u64>() {
253 let mut seg = HotSegment::open(state.root.clone(), space_id)?;
254 let records = seg.read_all_records()?;
255 state.hot_record_counts.insert(space_id, records.len());
256 for record in records {
257 state.live_tail.append(record);
258 }
259 state.hot.insert(space_id, seg);
260 }
261 }
262 }
263 }
264
265 let mut shutting_down = false;
266 while !shutting_down {
267 match rx.recv_timeout(config.wal_group_commit_interval) {
268 Ok(cmd) => {
269 if matches!(cmd, IoCommand::Shutdown) {
270 handle_command(&mut state, cmd, &direct_writes, &staged_writes)?;
271 shutting_down = true;
272 } else {
273 handle_command(&mut state, cmd, &direct_writes, &staged_writes)?;
274 }
275 }
276 Err(crossbeam_channel::RecvTimeoutError::Timeout) => {
277 promote_staging(&mut state)?;
278 }
279 Err(crossbeam_channel::RecvTimeoutError::Disconnected) => shutting_down = true,
280 }
281 }
282
283 Ok(())
284}
285
286fn handle_command(
287 state: &mut IoState,
288 cmd: IoCommand,
289 direct_writes: &AtomicU64,
290 staged_writes: &AtomicU64,
291) -> io::Result<()> {
292 match cmd {
293 IoCommand::Write(job) => {
294 let space_id = job_space(&job);
295 process_write(state, job, direct_writes, staged_writes)?;
296 maybe_auto_seal(state, space_id)?;
297 }
298 IoCommand::Sync { done } => {
299 promote_staging(state)?;
300 state.staging.sync()?;
301 let _ = done.send(Ok(()));
302 }
303 IoCommand::Flush { space_id, done } => {
304 let result = seal_space(state, space_id);
305 let _ = done.send(result);
306 }
307 IoCommand::Shutdown => {
308 promote_staging(state)?;
309 state.staging.sync()?;
310 return Ok(());
311 }
312 }
313 Ok(())
314}
315
316fn job_space(job: &WriteJob) -> u64 {
317 job.record.address.space.0
318}
319
320fn process_write(
321 state: &mut IoState,
322 job: WriteJob,
323 direct_writes: &AtomicU64,
324 staged_writes: &AtomicU64,
325) -> io::Result<()> {
326 let space_id = job_space(&job);
327 let hot = state
328 .hot
329 .entry(space_id)
330 .or_insert_with(|| {
331 HotSegment::open(state.root.clone(), space_id).expect("open hot segment")
332 });
333
334 let routed = if hot.try_append_with_deadline(&job.entry, state.config.direct_write_timeout)? {
335 direct_writes.fetch_add(1, Ordering::Relaxed);
336 WriteRoute::Direct
337 } else {
338 state.staging.append(&job.entry)?;
339 state.staging.sync()?;
340 staged_writes.fetch_add(1, Ordering::Relaxed);
341 WriteRoute::Staged
342 };
343
344 let _ = routed;
345 state.live_tail.append(job.record);
346 *state.hot_record_counts.entry(space_id).or_insert(0) += 1;
347 Ok(())
348}
349
350fn maybe_auto_seal(state: &mut IoState, space_id: u64) -> io::Result<()> {
351 let count = state.hot_record_counts.get(&space_id).copied().unwrap_or(0);
352 if count >= state.config.hot_segment_seal_threshold {
353 seal_space(state, space_id)?;
354 }
355 Ok(())
356}
357
358fn promote_staging(state: &mut IoState) -> io::Result<()> {
359 if state.staging.entries.is_empty() {
360 return Ok(());
361 }
362
363 let mut remaining = Vec::new();
364 for entry in state.staging.entries.drain(..) {
365 let space_id = match &entry {
366 WalEntry::Write { address, .. } | WalEntry::Tombstone { address, .. } => address.space.0,
367 _ => {
368 remaining.push(entry);
369 continue;
370 }
371 };
372
373 let hot = state
374 .hot
375 .entry(space_id)
376 .or_insert_with(|| HotSegment::open(state.root.clone(), space_id).expect("open hot segment"));
377
378 if hot.append_and_sync(&entry).is_ok() {
379 } else {
381 remaining.push(entry);
382 }
383 }
384
385 state.staging.rewrite_remaining(remaining)?;
386 Ok(())
387}
388
389fn seal_space(state: &mut IoState, space_id: u64) -> io::Result<()> {
390 let mut hot = match state.hot.remove(&space_id) {
391 Some(seg) => seg,
392 None => HotSegment::open(state.root.clone(), space_id)?,
393 };
394
395 let mut records = hot.read_all_records()?;
396 if records.is_empty() {
397 state.hot.insert(space_id, hot);
398 return Ok(());
399 }
400
401 let space = SpaceId(space_id);
402 let spaces = state.spaces.read();
403 records.sort_by_key(|r| {
404 let key = space_key(&spaces, space, &r.address.point);
405 (key, r.revision.0)
406 });
407 drop(spaces);
408
409 let min_rev = records.iter().map(|r| r.revision).min().unwrap_or(RevisionId::ZERO);
410 let max_rev = records.iter().map(|r| r.revision).max().unwrap_or(RevisionId::ZERO);
411 let block_id = BlockId(state.next_block_id.fetch_add(1, Ordering::Relaxed));
412
413 let mut block = Block {
414 id: block_id,
415 space,
416 records: records.clone(),
417 min_revision: min_rev,
418 max_revision: max_rev,
419 checksum: [0u8; 32],
420 };
421 block.checksum = compute_checksum(&block)?;
422 state.store.write_block(&block)?;
423
424 let hilbert_min = {
425 let spaces = state.spaces.read();
426 block
427 .records
428 .first()
429 .map(|r| space_key(&spaces, space, &r.address.point))
430 .unwrap_or(0)
431 };
432 let hilbert_max = {
433 let spaces = state.spaces.read();
434 block
435 .records
436 .last()
437 .map(|r| space_key(&spaces, space, &r.address.point))
438 .unwrap_or(hilbert_min)
439 };
440
441 let snap_id = SnapshotId(state.next_snapshot_id.fetch_add(1, Ordering::Relaxed));
442 state.snapshots.update(space, |snap| {
443 snap.blocks.insert(
444 hilbert_min,
445 BlockIndexEntry {
446 block_id,
447 max_key: hilbert_max,
448 },
449 );
450 if snap.revision < max_rev {
451 snap.revision = max_rev;
452 }
453 if snap.id.0 == 0 {
454 snap.id = snap_id;
455 }
456 });
457
458 let sealed: HashSet<(Vec<u32>, u64)> = records
459 .iter()
460 .map(|r| (r.address.point.coords.clone(), r.revision.0))
461 .collect();
462
463 let mut tail = state.live_tail.snapshot();
464 tail.retain(|r| !sealed.contains(&(r.address.point.coords.clone(), r.revision.0)));
465 state.live_tail.publish(tail);
466
467 hot.reset()?;
468 state.hot.insert(space_id, hot);
469 state.hot_record_counts.insert(space_id, 0);
470 Ok(())
471}