1use super::*;
2use crate::api::DurabilityMode;
3use crate::storage::wal::{WalReader, WalRecord, WalWriter};
4use std::cell::RefCell;
5use std::io;
6use std::path::{Path, PathBuf};
7use std::sync::atomic::{AtomicU64, Ordering};
8use std::sync::{Arc, Condvar, Mutex};
9
/// Fallback group-commit window, in microseconds.
///
/// `StoreCommitCoordinator::resolve_window` uses this value when the
/// `REDDB_GROUP_COMMIT_WINDOW_US` environment variable is absent or does not
/// parse as a `u64`, and `GroupCommitOptions::window_ms` is zero.
const DEFAULT_ADAPTIVE_WINDOW_US: u64 = 200;

/// Lock type wrapping the WAL writer that the coordinator shares with the
/// background group-commit thread.
type WalMutex = parking_lot::Mutex<WalWriter>;

/// Lock type guarding the shared [`CommitState`].
type CommitStateMutex = parking_lot::Mutex<CommitState>;
/// Condition variable paired with [`CommitStateMutex`]; writers and the
/// background thread use it to signal state changes to each other.
type CommitStateCondvar = parking_lot::Condvar;
43use std::time::{Duration, Instant};
44
/// Source of transaction ids stamped on `WalRecord::TxCommitBatch` records by
/// `StoreCommitCoordinator::append_actions`; starts at 1 and is bumped once
/// per non-empty batch.
static NEXT_STORE_TX_ID: AtomicU64 = AtomicU64::new(1);

/// Format version written as the first byte of every encoded
/// [`StoreWalAction`]; `StoreWalAction::decode` rejects any other value.
const STORE_WAL_VERSION: u8 = 1;
48
/// A single logical store mutation as it is written to, and replayed from,
/// the store WAL.
///
/// The wire format produced by `encode` is `[version, tag, payload...]`; the
/// tag values are 1 through 6 in declaration order.
#[derive(Debug, Clone)]
pub(crate) enum StoreWalAction {
    /// Create the named collection (tag 1). Replay creates it only when it
    /// does not already exist.
    CreateCollection {
        name: String,
    },
    /// Drop the named collection (tag 2).
    DropCollection {
        name: String,
    },
    /// Insert or update one entity (tag 3). `record` is the serialized entity
    /// record as produced by `UnifiedStore::serialize_entity_record`.
    UpsertEntityRecord {
        collection: String,
        record: Vec<u8>,
    },
    /// Delete one entity by its raw id (tag 4).
    DeleteEntityRecord {
        collection: String,
        entity_id: u64,
    },
    /// Upsert many serialized entity records into one collection (tag 5).
    /// Replay applies the records one by one, in order.
    BulkUpsertEntityRecords {
        collection: String,
        records: Vec<Vec<u8>>,
    },
    /// Replace the whole content of a collection with `records` (tag 6).
    /// Replay builds a fresh segment manager from the records and swaps it in.
    RefreshCollection {
        collection: String,
        records: Vec<Vec<u8>>,
    },
}
85
/// WAL actions captured on the current thread while deferred capture was
/// active, handed back by `take_deferred_store_wal_capture` so the caller can
/// append them later via `UnifiedStore::append_deferred_store_wal_actions`.
#[derive(Debug, Default)]
pub(crate) struct DeferredStoreWalActions {
    /// Captured actions, in the order they were recorded.
    actions: Vec<StoreWalAction>,
}
90
91impl DeferredStoreWalActions {
92 pub(crate) fn is_empty(&self) -> bool {
93 self.actions.is_empty()
94 }
95
96 pub(crate) fn extend(&mut self, other: Self) {
97 self.actions.extend(other.actions);
98 }
99}
100
thread_local! {
    // Per-thread capture buffer for deferred WAL actions.
    // `None` means capture is inactive on this thread; `Some(vec)` means
    // `finish_paged_write` appends its actions to `vec` instead of writing
    // them to the WAL.
    static DEFERRED_STORE_WAL_ACTIONS: RefCell<Option<Vec<StoreWalAction>>> =
        const { RefCell::new(None) };
}
105
106fn begin_deferred_store_wal_capture() {
107 DEFERRED_STORE_WAL_ACTIONS.with(|cell| {
108 let mut guard = cell.borrow_mut();
109 debug_assert!(guard.is_none());
110 *guard = Some(Vec::new());
111 });
112}
113
114fn capture_deferred_store_wal_actions(actions: Vec<StoreWalAction>) -> bool {
115 DEFERRED_STORE_WAL_ACTIONS.with(|cell| {
116 let mut guard = cell.borrow_mut();
117 if let Some(pending) = guard.as_mut() {
118 pending.extend(actions);
119 true
120 } else {
121 false
122 }
123 })
124}
125
126fn deferred_store_wal_capture_active() -> bool {
127 DEFERRED_STORE_WAL_ACTIONS.with(|cell| cell.borrow().is_some())
128}
129
130fn take_deferred_store_wal_capture() -> DeferredStoreWalActions {
131 DEFERRED_STORE_WAL_ACTIONS.with(|cell| DeferredStoreWalActions {
132 actions: cell.borrow_mut().take().unwrap_or_default(),
133 })
134}
135
136impl StoreWalAction {
137 pub(crate) fn upsert_entity(
138 collection: &str,
139 entity: &UnifiedEntity,
140 metadata: Option<&Metadata>,
141 format_version: u32,
142 ) -> Self {
143 Self::UpsertEntityRecord {
144 collection: collection.to_string(),
145 record: UnifiedStore::serialize_entity_record(entity, metadata, format_version),
146 }
147 }
148
149 fn encode(&self) -> Vec<u8> {
150 let mut out = Vec::new();
151 out.push(STORE_WAL_VERSION);
152 match self {
153 Self::CreateCollection { name } => {
154 out.push(1);
155 write_string(&mut out, name);
156 }
157 Self::DropCollection { name } => {
158 out.push(2);
159 write_string(&mut out, name);
160 }
161 Self::UpsertEntityRecord { collection, record } => {
162 out.push(3);
163 write_string(&mut out, collection);
164 write_bytes(&mut out, record);
165 }
166 Self::DeleteEntityRecord {
167 collection,
168 entity_id,
169 } => {
170 out.push(4);
171 write_string(&mut out, collection);
172 out.extend_from_slice(&entity_id.to_le_bytes());
173 }
174 Self::BulkUpsertEntityRecords {
175 collection,
176 records,
177 } => {
178 out.push(5);
179 write_string(&mut out, collection);
180 out.extend_from_slice(&(records.len() as u32).to_le_bytes());
181 for record in records {
182 write_bytes(&mut out, record);
183 }
184 }
185 Self::RefreshCollection {
186 collection,
187 records,
188 } => {
189 out.push(6);
190 write_string(&mut out, collection);
191 out.extend_from_slice(&(records.len() as u32).to_le_bytes());
192 for record in records {
193 write_bytes(&mut out, record);
194 }
195 }
196 }
197 out
198 }
199
200 fn decode(bytes: &[u8]) -> io::Result<Self> {
201 if bytes.len() < 2 {
202 return Err(io::Error::new(
203 io::ErrorKind::InvalidData,
204 "store wal action too short",
205 ));
206 }
207 if bytes[0] != STORE_WAL_VERSION {
208 return Err(io::Error::new(
209 io::ErrorKind::InvalidData,
210 format!("unsupported store wal version: {}", bytes[0]),
211 ));
212 }
213
214 let mut pos = 2usize;
215 match bytes[1] {
216 1 => Ok(Self::CreateCollection {
217 name: read_string(bytes, &mut pos)?,
218 }),
219 2 => Ok(Self::DropCollection {
220 name: read_string(bytes, &mut pos)?,
221 }),
222 3 => Ok(Self::UpsertEntityRecord {
223 collection: read_string(bytes, &mut pos)?,
224 record: read_bytes(bytes, &mut pos)?,
225 }),
226 4 => {
227 let collection = read_string(bytes, &mut pos)?;
228 let entity_id = read_u64(bytes, &mut pos)?;
229 Ok(Self::DeleteEntityRecord {
230 collection,
231 entity_id,
232 })
233 }
234 5 => {
235 let collection = read_string(bytes, &mut pos)?;
236 if pos + 4 > bytes.len() {
237 return Err(io::Error::new(
238 io::ErrorKind::InvalidData,
239 "bulk upsert wal action: missing record count",
240 ));
241 }
242 let count = u32::from_le_bytes([
243 bytes[pos],
244 bytes[pos + 1],
245 bytes[pos + 2],
246 bytes[pos + 3],
247 ]) as usize;
248 pos += 4;
249 let mut records = Vec::with_capacity(count);
250 for _ in 0..count {
251 records.push(read_bytes(bytes, &mut pos)?);
252 }
253 Ok(Self::BulkUpsertEntityRecords {
254 collection,
255 records,
256 })
257 }
258 6 => {
259 let collection = read_string(bytes, &mut pos)?;
260 if pos + 4 > bytes.len() {
261 return Err(io::Error::new(
262 io::ErrorKind::InvalidData,
263 "refresh collection wal action: missing record count",
264 ));
265 }
266 let count = u32::from_le_bytes([
267 bytes[pos],
268 bytes[pos + 1],
269 bytes[pos + 2],
270 bytes[pos + 3],
271 ]) as usize;
272 pos += 4;
273 let mut records = Vec::with_capacity(count);
274 for _ in 0..count {
275 records.push(read_bytes(bytes, &mut pos)?);
276 }
277 Ok(Self::RefreshCollection {
278 collection,
279 records,
280 })
281 }
282 other => Err(io::Error::new(
283 io::ErrorKind::InvalidData,
284 format!("unsupported store wal action tag: {other}"),
285 )),
286 }
287 }
288}
289
/// Bookkeeping shared (behind `CommitStateMutex`) between writers and the
/// background group-commit thread.
#[derive(Debug)]
struct CommitState {
    /// WAL durable LSN as last reported by `WalWriter::durable_lsn`.
    durable_lsn: u64,
    /// Highest LSN any writer has asked to be made durable.
    pending_target_lsn: u64,
    /// Number of commit registrations accumulated since the counters were
    /// last reset.
    pending_statements: usize,
    /// Sum of the WAL byte sizes reported by those registrations.
    pending_wal_bytes: u64,
    /// When the first registration of the current batch arrived; `None`
    /// while nothing is pending.
    first_pending_at: Option<Instant>,
    /// Set by the coordinator's `Drop` impl to make the background loop exit.
    shutdown: bool,
    /// Text of the most recent append/sync failure; cleared by the next
    /// successful sync.
    last_error: Option<String>,
}
300
301impl CommitState {
302 fn new(initial_durable_lsn: u64) -> Self {
303 Self {
304 durable_lsn: initial_durable_lsn,
305 pending_target_lsn: initial_durable_lsn,
306 pending_statements: 0,
307 pending_wal_bytes: 0,
308 first_pending_at: None,
309 shutdown: false,
310 last_error: None,
311 }
312 }
313}
314
/// In-memory queue of already-encoded WAL records waiting to be written by
/// the background group-commit thread.
pub(crate) struct WalAppendQueue {
    /// Queue contents plus the LSN cursor, guarded by a single lock.
    pending: parking_lot::Mutex<WalQueueState>,
}
345
/// Lock-protected interior of [`WalAppendQueue`].
struct WalQueueState {
    /// LSN that the next enqueued record will start at.
    next_lsn: u64,
    /// Queued `(start_lsn, encoded_record)` pairs.
    entries: Vec<(u64, Vec<u8>)>,
}
350
351impl WalAppendQueue {
352 fn new(initial_lsn: u64) -> Self {
353 Self {
354 pending: parking_lot::Mutex::new(WalQueueState {
355 next_lsn: initial_lsn,
356 entries: Vec::with_capacity(64),
357 }),
358 }
359 }
360
361 fn enqueue(&self, bytes: Vec<u8>) -> u64 {
367 let len = bytes.len() as u64;
368 let mut state = self.pending.lock();
369 let start_lsn = state.next_lsn;
370 state.next_lsn = start_lsn + len;
371 state.entries.push((start_lsn, bytes));
372 start_lsn + len
373 }
374
375 fn drain_sorted(&self) -> Vec<(u64, Vec<u8>)> {
379 let mut state = self.pending.lock();
380 let mut v = std::mem::take(&mut state.entries);
381 drop(state);
382 v.sort_by_key(|(lsn, _)| *lsn);
383 v
384 }
385
386 fn has_pending(&self) -> bool {
389 !self.pending.lock().entries.is_empty()
390 }
391
392 fn reset(&self, next_lsn: u64) {
398 let mut state = self.pending.lock();
399 state.next_lsn = next_lsn;
400 state.entries.clear();
401 }
402}
403
/// Owns the store WAL and decides, per durability mode, how and when
/// appended records are fsynced.
pub(crate) struct StoreCommitCoordinator {
    /// Durability mode selected when the coordinator was opened.
    mode: DurabilityMode,
    /// Group-commit tuning options supplied to `open`.
    config: crate::api::GroupCommitOptions,
    /// Location of the WAL file; used again when replaying.
    wal_path: PathBuf,
    /// WAL writer, shared with the background group-commit thread.
    wal: Arc<WalMutex>,
    /// Encoded records waiting for the background thread to write them.
    queue: Arc<WalAppendQueue>,
    /// Commit bookkeeping and the condition variable used to signal it.
    state: Arc<(CommitStateMutex, CommitStateCondvar)>,
    /// Number of successful `WalWriter::sync` calls made so far.
    fsync_count: Arc<AtomicU64>,
}
423
424impl StoreCommitCoordinator {
425 pub(crate) fn should_open(path: &Path, mode: DurabilityMode) -> bool {
426 matches!(
427 mode,
428 DurabilityMode::WalDurableGrouped | DurabilityMode::Async
429 ) || path.exists()
430 }
431
432 pub(crate) fn open(
433 wal_path: impl Into<PathBuf>,
434 mode: DurabilityMode,
435 config: crate::api::GroupCommitOptions,
436 ) -> io::Result<Self> {
437 let wal_path = wal_path.into();
438 let wal = WalWriter::open(&wal_path)?;
439 let initial_durable_lsn = wal.durable_lsn();
440 let initial_current_lsn = wal.current_lsn();
441 let wal = Arc::new(WalMutex::new(wal));
442 let queue = Arc::new(WalAppendQueue::new(initial_current_lsn));
443 let state = Arc::new((
444 CommitStateMutex::new(CommitState::new(initial_durable_lsn)),
445 CommitStateCondvar::new(),
446 ));
447 let fsync_count = Arc::new(AtomicU64::new(0));
448
449 if matches!(
450 mode,
451 DurabilityMode::WalDurableGrouped | DurabilityMode::Async
452 ) {
453 let wal_bg = Arc::clone(&wal);
454 let queue_bg = Arc::clone(&queue);
455 let state_bg = Arc::clone(&state);
456 let fsync_bg = Arc::clone(&fsync_count);
457 let window = Self::resolve_window(&config);
472 let max_statements = config.max_statements.max(1);
473 let max_wal_bytes = config.max_wal_bytes.max(1);
474 std::thread::spawn(move || {
475 Self::run_group_commit_loop(
476 wal_bg,
477 queue_bg,
478 state_bg,
479 fsync_bg,
480 window,
481 max_statements,
482 max_wal_bytes,
483 );
484 });
485 }
486
487 Ok(Self {
488 mode,
489 config,
490 wal_path,
491 wal,
492 queue,
493 state,
494 fsync_count,
495 })
496 }
497
498 fn resolve_window(config: &crate::api::GroupCommitOptions) -> Duration {
507 if let Ok(raw) = std::env::var("REDDB_GROUP_COMMIT_WINDOW_US") {
508 if let Ok(parsed) = raw.parse::<u64>() {
509 return Duration::from_micros(parsed);
510 }
511 }
512 if config.window_ms != 0 {
513 return Duration::from_millis(config.window_ms);
514 }
515 Duration::from_micros(DEFAULT_ADAPTIVE_WINDOW_US)
516 }
517
518 #[cfg(test)]
522 pub(crate) fn fsync_count(&self) -> u64 {
523 self.fsync_count.load(Ordering::Relaxed)
524 }
525
526 pub(crate) fn append_actions(&self, actions: &[StoreWalAction]) -> io::Result<()> {
527 if actions.is_empty() {
528 return Ok(());
529 }
530
531 let tx_id = NEXT_STORE_TX_ID.fetch_add(1, Ordering::SeqCst);
532
533 if matches!(self.mode, DurabilityMode::Strict) {
538 let commit_lsn = {
539 let mut wal = self.wal.lock();
540 wal.append(&WalRecord::TxCommitBatch {
541 tx_id,
542 actions: actions.iter().map(StoreWalAction::encode).collect(),
543 })?;
544 wal.current_lsn()
545 };
546 self.force_sync()?;
547 let _ = commit_lsn;
548 return Ok(());
549 }
550
551 let encoded_actions: Vec<Vec<u8>> = actions.iter().map(StoreWalAction::encode).collect();
555 let wal_bytes = encoded_actions.iter().fold(0u64, |total, payload| {
556 total.saturating_add(payload.len() as u64)
557 });
558 let blob = WalRecord::TxCommitBatch {
559 tx_id,
560 actions: encoded_actions,
561 }
562 .encode();
563
564 let commit_lsn = self.queue.enqueue(blob);
565 self.wait_until_durable(commit_lsn, wal_bytes)?;
566 Ok(())
567 }
568
569 pub(crate) fn append_single_record(&self, record: WalRecord) -> io::Result<()> {
574 let blob = record.encode();
575 let wal_bytes = blob.len() as u64;
576 if matches!(self.mode, DurabilityMode::Strict) {
577 {
578 let mut wal = self.wal.lock();
579 wal.append(&record)?;
580 }
581 self.force_sync()?;
582 return Ok(());
583 }
584 let commit_lsn = self.queue.enqueue(blob);
585 self.wait_until_durable(commit_lsn, wal_bytes)?;
586 Ok(())
587 }
588
589 pub(crate) fn force_sync(&self) -> io::Result<()> {
590 {
591 let mut wal = self.wal.lock();
592 wal.sync()?;
593 self.fsync_count.fetch_add(1, Ordering::Relaxed);
594 let durable = wal.durable_lsn();
595 drop(wal);
596 let (state_lock, cond) = &*self.state;
597 let mut state = state_lock.lock();
598 state.durable_lsn = durable;
599 state.pending_target_lsn = durable.max(state.pending_target_lsn);
600 state.pending_statements = 0;
601 state.pending_wal_bytes = 0;
602 state.first_pending_at = None;
603 state.last_error = None;
604 cond.notify_all();
605 }
606 Ok(())
607 }
608
609 pub(crate) fn truncate(&self) -> io::Result<()> {
610 let mut wal = self.wal.lock();
611 wal.truncate()?;
612 let durable = wal.durable_lsn();
613 let current = wal.current_lsn();
614 drop(wal);
615
616 self.queue.reset(current);
621
622 let (state_lock, cond) = &*self.state;
623 let mut state = state_lock.lock();
624 state.durable_lsn = durable;
625 state.pending_target_lsn = durable;
626 state.pending_statements = 0;
627 state.pending_wal_bytes = 0;
628 state.first_pending_at = None;
629 state.last_error = None;
630 cond.notify_all();
631 Ok(())
632 }
633
634 pub(crate) fn replay_into(&self, store: &UnifiedStore) -> io::Result<()> {
635 if !self.wal_path.exists() {
636 return Ok(());
637 }
638
639 let reader = match WalReader::open(&self.wal_path) {
640 Ok(reader) => reader,
641 Err(err) if err.kind() == io::ErrorKind::NotFound => return Ok(()),
642 Err(err) => return Err(err),
643 };
644
645 let mut tx_states = std::collections::HashMap::<u64, bool>::new();
646 let mut pending = Vec::<(u64, Vec<u8>)>::new();
647
648 for record in reader.iter() {
649 let (_lsn, record) = match record {
650 Ok(record) => record,
651 Err(err) if err.kind() == io::ErrorKind::UnexpectedEof => break,
652 Err(err) => return Err(err),
653 };
654 match record {
655 WalRecord::TxCommitBatch { actions, .. } => {
656 for payload in actions {
657 let action = StoreWalAction::decode(&payload)?;
658 store.apply_replayed_action(&action).map_err(|err| {
659 io::Error::other(format!("failed to replay store wal action: {err}"))
660 })?;
661 }
662 }
663 WalRecord::Begin { tx_id } => {
664 tx_states.insert(tx_id, false);
665 }
666 WalRecord::Commit { tx_id } => {
667 tx_states.insert(tx_id, true);
668 }
669 WalRecord::Rollback { tx_id } => {
670 tx_states.remove(&tx_id);
671 }
672 WalRecord::PageWrite {
673 tx_id,
674 page_id: _,
675 data,
676 } => pending.push((tx_id, data)),
677 WalRecord::Checkpoint { .. } => {}
678 WalRecord::VectorInsert {
679 collection,
680 entity_id,
681 vector,
682 } => {
683 let mut map = store.replayed_turbo_inserts.lock();
689 map.entry(collection).or_default().push((entity_id, vector));
690 }
691 WalRecord::FullPageImage { .. } => {
692 }
694 }
695 }
696
697 for (tx_id, payload) in pending {
698 if !tx_states.get(&tx_id).copied().unwrap_or(false) {
699 continue;
700 }
701 let action = StoreWalAction::decode(&payload)?;
702 store.apply_replayed_action(&action).map_err(|err| {
703 io::Error::other(format!("failed to replay store wal action: {err}"))
704 })?;
705 }
706
707 Ok(())
708 }
709
710 fn wait_until_durable(&self, target_lsn: u64, wal_bytes: u64) -> io::Result<()> {
711 match self.mode {
712 DurabilityMode::Strict => self.force_sync(),
713 DurabilityMode::Async => {
718 let (state_lock, cond) = &*self.state;
719 let mut state = state_lock.lock();
720 state.pending_target_lsn = state.pending_target_lsn.max(target_lsn);
721 state.pending_statements = state.pending_statements.saturating_add(1);
722 state.pending_wal_bytes = state.pending_wal_bytes.saturating_add(wal_bytes);
723 state.first_pending_at.get_or_insert_with(Instant::now);
724 cond.notify_all();
725 Ok(())
726 }
727 DurabilityMode::WalDurableGrouped => {
728 let (state_lock, cond) = &*self.state;
729 let mut state = state_lock.lock();
730 state.pending_target_lsn = state.pending_target_lsn.max(target_lsn);
731 state.pending_statements = state.pending_statements.saturating_add(1);
732 state.pending_wal_bytes = state.pending_wal_bytes.saturating_add(wal_bytes);
733 state.first_pending_at.get_or_insert_with(Instant::now);
734 cond.notify_all();
735
736 loop {
737 if let Some(err) = state.last_error.clone() {
738 return Err(io::Error::other(err));
739 }
740 if state.durable_lsn >= target_lsn {
741 return Ok(());
742 }
743 cond.wait(&mut state);
746 }
747 }
748 }
749 }
750
    /// Body of the background group-commit thread.
    ///
    /// Each iteration waits until some writer has registered an LSN beyond
    /// the durable one, optionally lingers up to `window` so more writers can
    /// join the batch, then drains the queue, appends every record to the
    /// WAL, fsyncs once and publishes the result to all waiters. The loop
    /// returns only when the shared `shutdown` flag is observed.
    ///
    /// `max_statements` and `max_wal_bytes` are thresholds that cut the
    /// linger short once enough work has accumulated.
    fn run_group_commit_loop(
        wal: Arc<WalMutex>,
        queue: Arc<WalAppendQueue>,
        state: Arc<(CommitStateMutex, CommitStateCondvar)>,
        fsync_count: Arc<AtomicU64>,
        window: Duration,
        max_statements: usize,
        max_wal_bytes: u64,
    ) {
        let (state_lock, cond) = &*state;
        loop {
            // Phase 1: decide, under the state lock, when to flush. The lock
            // is released when this block ends (or on `continue`/`return`).
            let target_lsn = {
                let mut guard = state_lock.lock();

                // Sleep until there is something to make durable.
                while !guard.shutdown && guard.pending_target_lsn <= guard.durable_lsn {
                    cond.wait(&mut guard);
                }

                if guard.shutdown {
                    return;
                }

                // Flush without lingering when the window is disabled or a
                // threshold has already been reached.
                let immediate = window.is_zero()
                    || guard.pending_statements >= max_statements
                    || guard.pending_wal_bytes >= max_wal_bytes;

                if !immediate {
                    // The window is measured from the first pending commit of
                    // the batch, not from now.
                    let deadline = guard.first_pending_at.unwrap_or_else(Instant::now) + window;
                    let now = Instant::now();
                    if now < deadline {
                        let timeout = deadline.saturating_duration_since(now);
                        // Woken either by the timeout or by a notification;
                        // the state is re-examined below in both cases.
                        let _ = cond.wait_for(&mut guard, timeout);
                        if guard.shutdown {
                            return;
                        }
                        // Someone else (e.g. `force_sync`) already made
                        // everything durable: start over.
                        if guard.pending_target_lsn <= guard.durable_lsn {
                            continue;
                        }
                        // Woken early with no threshold hit and the window
                        // still open: restart the outer loop to keep waiting.
                        let should_wait_again = guard.pending_statements < max_statements
                            && guard.pending_wal_bytes < max_wal_bytes
                            && guard
                                .first_pending_at
                                .map(|first| first.elapsed() < window)
                                .unwrap_or(false);
                        if should_wait_again {
                            continue;
                        }
                    }
                }

                guard.pending_target_lsn
            };

            // Phase 2: take everything queued so far, without the state lock.
            let batches = queue.drain_sorted();

            // Phase 3: append the drained records and fsync once, holding
            // only the WAL lock.
            let sync_result = {
                let mut wal = wal.lock();
                let mut write_err: Option<io::Error> = None;
                for (_lsn, bytes) in batches {
                    if let Err(e) = wal.append_bytes(&bytes) {
                        // Stop at the first failed append; the remaining
                        // drained records are dropped with `batches`.
                        write_err = Some(e);
                        break;
                    }
                }
                match write_err {
                    Some(e) => Err(e),
                    None => wal.sync().map(|_| {
                        fsync_count.fetch_add(1, Ordering::Relaxed);
                        wal.durable_lsn()
                    }),
                }
            };

            // Records enqueued while the batch was being written stay queued
            // for the next iteration.
            let more_pending = queue.has_pending();

            // Phase 4: publish the outcome and wake every waiter.
            let mut guard = state_lock.lock();
            match sync_result {
                Ok(durable_lsn) => {
                    guard.durable_lsn = durable_lsn;
                    // Keep the counters (and the batch start time) when more
                    // work is already waiting.
                    if !more_pending {
                        guard.pending_statements = 0;
                        guard.pending_wal_bytes = 0;
                        guard.first_pending_at = None;
                    }
                    guard.last_error = None;
                    // `target_lsn` is intentionally unused beyond this point.
                    let _ = target_lsn;
                }
                Err(err) => {
                    // Stored as text; waiters turn it back into an io::Error.
                    guard.last_error = Some(err.to_string());
                }
            }
            cond.notify_all();
        }
    }
858}
859
860impl Drop for StoreCommitCoordinator {
861 fn drop(&mut self) {
862 let (state_lock, cond) = &*self.state;
863 let mut state = state_lock.lock();
865 state.shutdown = true;
866 cond.notify_all();
867 }
868}
869
870impl UnifiedStore {
    /// Starts deferred WAL capture on the calling thread.
    ///
    /// Thin wrapper over the module-level function of the same name, exposed
    /// as an associated function of the store.
    pub(crate) fn begin_deferred_store_wal_capture() {
        begin_deferred_store_wal_capture();
    }
874
    /// Stops deferred WAL capture on the calling thread and returns the
    /// captured actions.
    ///
    /// Thin wrapper over the module-level function of the same name.
    pub(crate) fn take_deferred_store_wal_capture() -> DeferredStoreWalActions {
        take_deferred_store_wal_capture()
    }
878
879 pub(crate) fn append_deferred_store_wal_actions(
880 &self,
881 actions: DeferredStoreWalActions,
882 ) -> Result<(), StoreError> {
883 if actions.actions.is_empty() {
884 return Ok(());
885 }
886 match self.config.durability_mode {
887 DurabilityMode::Strict => self.flush_paged_state(),
888 DurabilityMode::WalDurableGrouped | DurabilityMode::Async => {
889 if let Some(commit) = &self.commit {
890 commit
891 .append_actions(&actions.actions)
892 .map_err(StoreError::Io)
893 } else {
894 self.flush_paged_state()
895 }
896 }
897 }
898 }
899
900 pub(crate) fn wal_path_for_db(path: &Path) -> PathBuf {
901 path.with_extension("rdb-uwal")
902 }
903
904 pub(crate) fn append_vector_insert_record(
911 &self,
912 collection: &str,
913 entity_id: u64,
914 vector: &[f32],
915 ) -> std::io::Result<()> {
916 let Some(commit) = &self.commit else {
917 return Ok(());
918 };
919 let record = WalRecord::VectorInsert {
920 collection: collection.to_string(),
921 entity_id,
922 vector: vector.to_vec(),
923 };
924 commit.append_single_record(record)
925 }
926
927 pub(crate) fn finish_paged_write(
928 &self,
929 actions: impl IntoIterator<Item = StoreWalAction>,
930 ) -> Result<(), StoreError> {
931 let actions: Vec<StoreWalAction> = actions.into_iter().collect();
932 if deferred_store_wal_capture_active() {
933 let captured = capture_deferred_store_wal_actions(actions);
934 debug_assert!(captured);
935 return Ok(());
936 }
937 match self.config.durability_mode {
938 DurabilityMode::Strict => self.flush_paged_state(),
939 DurabilityMode::WalDurableGrouped | DurabilityMode::Async => {
940 if let Some(commit) = &self.commit {
941 commit.append_actions(&actions).map_err(StoreError::Io)?;
942 Ok(())
943 } else {
944 self.flush_paged_state()
945 }
946 }
947 }
948 }
949
950 pub(crate) fn apply_replayed_action(&self, action: &StoreWalAction) -> Result<(), StoreError> {
951 match action {
952 StoreWalAction::CreateCollection { name } => {
953 if self.get_collection(name).is_none() {
954 let _ = self.create_collection_in_memory(name);
955 }
956 Ok(())
957 }
958 StoreWalAction::DropCollection { name } => self.drop_collection_in_memory(name),
959 StoreWalAction::UpsertEntityRecord { collection, record } => {
960 self.apply_replayed_upsert(collection, record)
961 }
962 StoreWalAction::DeleteEntityRecord {
963 collection,
964 entity_id,
965 } => self.apply_replayed_delete(collection, EntityId::new(*entity_id)),
966 StoreWalAction::BulkUpsertEntityRecords {
967 collection,
968 records,
969 } => {
970 for record in records {
971 self.apply_replayed_upsert(collection, record)?;
972 }
973 Ok(())
974 }
975 StoreWalAction::RefreshCollection {
976 collection,
977 records,
978 } => self.apply_replayed_refresh_collection(collection, records),
979 }
980 }
981
982 pub fn refresh_collection(
999 &self,
1000 name: &str,
1001 entities: Vec<UnifiedEntity>,
1002 ) -> Result<Vec<Vec<u8>>, StoreError> {
1003 let fv = self.format_version();
1004
1005 let new_manager = Arc::new(SegmentManager::with_config(
1008 name,
1009 self.config.manager_config.clone(),
1010 ));
1011
1012 let mut prepared = entities;
1013 for entity in &mut prepared {
1014 if entity.id.raw() == 0 {
1015 entity.id = self.next_entity_id();
1016 } else {
1017 self.register_entity_id(entity.id);
1018 }
1019 if let EntityKind::TableRow { ref mut row_id, .. } = entity.kind {
1020 if *row_id == 0 {
1021 *row_id = new_manager.next_row_id();
1022 } else {
1023 new_manager.register_row_id(*row_id);
1024 }
1025 }
1026 entity.ensure_table_logical_id();
1027 }
1028
1029 let serialized: Vec<Vec<u8>> = prepared
1030 .iter()
1031 .map(|e| Self::serialize_entity_record(e, None, fv))
1032 .collect();
1033
1034 new_manager.bulk_insert(prepared.clone()).map_err(|e| {
1035 StoreError::Io(std::io::Error::other(format!(
1036 "refresh_collection: bulk_insert into new manager failed: {e}"
1037 )))
1038 })?;
1039
1040 self.swap_collection_state(name, new_manager, &prepared, &serialized);
1041
1042 self.finish_paged_write([StoreWalAction::RefreshCollection {
1043 collection: name.to_string(),
1044 records: serialized.clone(),
1045 }])?;
1046
1047 Ok(serialized)
1048 }
1049
1050 fn swap_collection_state(
1056 &self,
1057 name: &str,
1058 new_manager: Arc<SegmentManager>,
1059 prepared: &[UnifiedEntity],
1060 serialized: &[Vec<u8>],
1061 ) {
1062 {
1063 let mut collections = self.collections.write();
1064 collections.insert(name.to_string(), new_manager);
1065 }
1066
1067 self.entity_cache
1068 .retain(|_, (collection, _)| collection != name);
1069 self.remove_from_graph_label_index_batch(
1070 name,
1071 &prepared.iter().map(|e| e.id).collect::<Vec<_>>(),
1072 );
1073
1074 if let Some(pager) = &self.pager {
1075 let new_btree = Arc::new(BTree::new(Arc::clone(pager)));
1076 let mut sorted: Vec<(Vec<u8>, Vec<u8>)> = prepared
1077 .iter()
1078 .zip(serialized.iter())
1079 .map(|(e, r)| (e.id.raw().to_be_bytes().to_vec(), r.clone()))
1080 .collect();
1081 sorted.sort_by(|a, b| a.0.cmp(&b.0));
1082 if !sorted.is_empty() {
1083 let _ = new_btree.bulk_insert_sorted(&sorted);
1084 }
1085 self.btree_indices
1086 .write()
1087 .insert(name.to_string(), new_btree);
1088 self.mark_paged_registry_dirty();
1089 }
1090 }
1091
1092 fn apply_replayed_refresh_collection(
1093 &self,
1094 collection: &str,
1095 records: &[Vec<u8>],
1096 ) -> Result<(), StoreError> {
1097 let new_manager = Arc::new(SegmentManager::with_config(
1098 collection,
1099 self.config.manager_config.clone(),
1100 ));
1101
1102 let mut prepared: Vec<UnifiedEntity> = Vec::with_capacity(records.len());
1103 for record in records {
1104 let (entity, _metadata) =
1105 Self::deserialize_entity_record(record, self.format_version())?;
1106 self.register_entity_id(entity.id);
1107 if let EntityKind::TableRow { row_id, .. } = &entity.kind {
1108 new_manager.register_row_id(*row_id);
1109 }
1110 prepared.push(entity);
1111 }
1112
1113 if !prepared.is_empty() {
1114 new_manager.bulk_insert(prepared.clone()).map_err(|e| {
1115 StoreError::Io(std::io::Error::other(format!(
1116 "replay refresh_collection: bulk_insert failed: {e}"
1117 )))
1118 })?;
1119 }
1120
1121 self.swap_collection_state(collection, new_manager, &prepared, records);
1122 Ok(())
1123 }
1124
1125 pub fn refresh_collection_from_records(
1139 &self,
1140 name: &str,
1141 records: Vec<Vec<u8>>,
1142 ) -> Result<(), StoreError> {
1143 self.apply_replayed_refresh_collection(name, &records)?;
1144 self.finish_paged_write([StoreWalAction::RefreshCollection {
1145 collection: name.to_string(),
1146 records,
1147 }])?;
1148 Ok(())
1149 }
1150
1151 pub(crate) fn create_collection_in_memory(&self, name: &str) -> Result<(), StoreError> {
1152 let mut collections = self.collections.write();
1153 if collections.contains_key(name) {
1154 return Ok(());
1155 }
1156 let manager = SegmentManager::with_config(name, self.config.manager_config.clone());
1157 collections.insert(name.to_string(), Arc::new(manager));
1158 self.mark_paged_registry_dirty();
1159 Ok(())
1160 }
1161
1162 fn drop_collection_in_memory(&self, name: &str) -> Result<(), StoreError> {
1163 let manager = {
1164 let mut collections = self.collections.write();
1165 match collections.remove(name) {
1166 Some(manager) => manager,
1167 None => return Ok(()),
1168 }
1169 };
1170
1171 let entities = manager.query_all(|_| true);
1172 let entity_ids: Vec<EntityId> = entities.iter().map(|entity| entity.id).collect();
1173 for entity_id in &entity_ids {
1174 self.context_index.remove_entity(*entity_id);
1175 let _ = self.unindex_cross_refs(*entity_id);
1176 }
1177 self.btree_indices.write().remove(name);
1178 self.entity_cache.retain(|entity_id, (collection, _)| {
1179 collection != name && !entity_ids.iter().any(|id| id.raw() == entity_id)
1180 });
1181 self.remove_from_graph_label_index_batch(name, &entity_ids);
1182 self.mark_paged_registry_dirty();
1183 Ok(())
1184 }
1185
1186 fn apply_replayed_upsert(&self, collection: &str, record: &[u8]) -> Result<(), StoreError> {
1187 self.create_collection_in_memory(collection)?;
1188 let (entity, metadata) = Self::deserialize_entity_record(record, self.format_version())?;
1189 let manager = self
1190 .get_collection(collection)
1191 .ok_or_else(|| StoreError::CollectionNotFound(collection.to_string()))?;
1192
1193 self.register_entity_id(entity.id);
1194 if let EntityKind::TableRow { row_id, .. } = &entity.kind {
1195 manager.register_row_id(*row_id);
1196 }
1197
1198 self.context_index.remove_entity(entity.id);
1199 let _ = self.unindex_cross_refs(entity.id);
1200 self.remove_from_graph_label_index(collection, entity.id);
1201
1202 if manager.get(entity.id).is_some() {
1203 manager
1204 .update_with_metadata(entity.clone(), metadata.as_ref())
1205 .map_err(StoreError::from)?;
1206 } else {
1207 manager.insert(entity.clone())?;
1208 if let Some(metadata) = metadata.as_ref() {
1209 manager.set_metadata(entity.id, metadata.clone())?;
1210 }
1211 }
1212
1213 self.context_index.index_entity(collection, &entity);
1214 if let EntityKind::GraphNode(node) = &entity.kind {
1215 self.update_graph_label_index(collection, &node.label, entity.id);
1216 }
1217 self.index_cross_refs(&entity, collection)?;
1218
1219 if let Some(pager) = &self.pager {
1220 let mut btree_indices = self.btree_indices.write();
1221 let btree = btree_indices
1222 .entry(collection.to_string())
1223 .or_insert_with(|| Arc::new(BTree::new(Arc::clone(pager))));
1224 let root_before = btree.root_page_id();
1225 let key = entity.id.raw().to_be_bytes();
1226 match btree.insert(&key, record) {
1227 Ok(_) => {}
1228 Err(BTreeError::DuplicateKey) => {
1229 let _ = btree.delete(&key);
1230 let _ = btree.insert(&key, record);
1231 }
1232 Err(err) => {
1233 return Err(StoreError::Io(io::Error::other(format!(
1234 "replay upsert btree error: {err}"
1235 ))));
1236 }
1237 }
1238 if root_before != btree.root_page_id() {
1239 self.mark_paged_registry_dirty();
1240 }
1241 }
1242
1243 Ok(())
1244 }
1245
1246 fn apply_replayed_delete(&self, collection: &str, id: EntityId) -> Result<(), StoreError> {
1247 self.entity_cache.remove(id.raw());
1248 if let Some(manager) = self.get_collection(collection) {
1249 let deleted = manager.delete(id)?;
1250 if !deleted {
1251 return Ok(());
1252 }
1253 } else {
1254 return Ok(());
1255 }
1256
1257 if let Some(_pager) = &self.pager {
1258 let btree_indices = self.btree_indices.read();
1259 if let Some(btree) = btree_indices.get(collection) {
1260 let root_before = btree.root_page_id();
1261 let key = id.raw().to_be_bytes();
1262 let _ = btree.delete(&key);
1263 if root_before != btree.root_page_id() {
1264 self.mark_paged_registry_dirty();
1265 }
1266 }
1267 }
1268
1269 let _ = self.unindex_cross_refs(id);
1270 self.remove_from_graph_label_index(collection, id);
1271 self.context_index.remove_entity(id);
1272 Ok(())
1273 }
1274}
1275
/// Appends `value` to `out` as a little-endian `u32` byte length followed by
/// its UTF-8 bytes.
fn write_string(out: &mut Vec<u8>, value: &str) {
    let payload = value.as_bytes();
    let prefix = (payload.len() as u32).to_le_bytes();
    out.extend_from_slice(&prefix);
    out.extend_from_slice(payload);
}
1280
/// Appends `value` to `out` as a little-endian `u32` length followed by the
/// bytes themselves.
fn write_bytes(out: &mut Vec<u8>, value: &[u8]) {
    let prefix = (value.len() as u32).to_le_bytes();
    out.extend_from_slice(&prefix);
    out.extend_from_slice(value);
}
1285
/// Reads a little-endian `u32` from `data` at `*pos` and advances `*pos` by
/// four.
///
/// # Errors
///
/// Returns `UnexpectedEof`, leaving `*pos` unchanged, when fewer than four
/// bytes remain.
fn read_u32(data: &[u8], pos: &mut usize) -> io::Result<u32> {
    let start = *pos;
    let window = match data.get(start..).and_then(|tail| tail.get(..4)) {
        Some(window) => window,
        None => {
            return Err(io::Error::new(
                io::ErrorKind::UnexpectedEof,
                "unexpected eof while reading u32",
            ));
        }
    };
    let mut raw = [0u8; 4];
    raw.copy_from_slice(window);
    *pos = start + 4;
    Ok(u32::from_le_bytes(raw))
}
1297
/// Reads a little-endian `u64` from `data` at `*pos` and advances `*pos` by
/// eight.
///
/// # Errors
///
/// Returns `UnexpectedEof`, leaving `*pos` unchanged, when fewer than eight
/// bytes remain.
fn read_u64(data: &[u8], pos: &mut usize) -> io::Result<u64> {
    let start = *pos;
    let window = match data.get(start..).and_then(|tail| tail.get(..8)) {
        Some(window) => window,
        None => {
            return Err(io::Error::new(
                io::ErrorKind::UnexpectedEof,
                "unexpected eof while reading u64",
            ));
        }
    };
    let mut raw = [0u8; 8];
    raw.copy_from_slice(window);
    *pos = start + 8;
    Ok(u64::from_le_bytes(raw))
}
1318
1319fn read_string(data: &[u8], pos: &mut usize) -> io::Result<String> {
1320 let len = read_u32(data, pos)? as usize;
1321 if data.len().saturating_sub(*pos) < len {
1322 return Err(io::Error::new(
1323 io::ErrorKind::UnexpectedEof,
1324 "unexpected eof while reading string",
1325 ));
1326 }
1327 let value = std::str::from_utf8(&data[*pos..*pos + len])
1328 .map_err(|err| io::Error::new(io::ErrorKind::InvalidData, err.to_string()))?
1329 .to_string();
1330 *pos += len;
1331 Ok(value)
1332}
1333
1334fn read_bytes(data: &[u8], pos: &mut usize) -> io::Result<Vec<u8>> {
1335 let len = read_u32(data, pos)? as usize;
1336 if data.len().saturating_sub(*pos) < len {
1337 return Err(io::Error::new(
1338 io::ErrorKind::UnexpectedEof,
1339 "unexpected eof while reading bytes",
1340 ));
1341 }
1342 let value = data[*pos..*pos + len].to_vec();
1343 *pos += len;
1344 Ok(value)
1345}
1346
#[cfg(test)]
mod tests {
    use super::*;
    use crate::api::{DurabilityMode, GroupCommitOptions};
    use std::sync::{Barrier, Mutex as StdMutex, OnceLock};
    use std::time::SystemTime;

    /// Environment variable that every test in this module sets or clears.
    const WINDOW_ENV: &str = "REDDB_GROUP_COMMIT_WINDOW_US";

    /// Returns the module-wide lock each test takes before touching
    /// [`WINDOW_ENV`], so the tests do not observe each other's values.
    fn env_lock() -> &'static StdMutex<()> {
        static LOCK: OnceLock<StdMutex<()>> = OnceLock::new();
        LOCK.get_or_init(StdMutex::default)
    }

    /// Builds a WAL path in the system temp directory from `name`, the
    /// process id and the current time in nanoseconds, and removes any file
    /// already at that path (removal errors are ignored).
    fn temp_wal(name: &str) -> PathBuf {
        let since_epoch = SystemTime::now()
            .duration_since(std::time::UNIX_EPOCH)
            .unwrap();
        let file_name = format!(
            "rb_commit_coord_{}_{}_{}.wal",
            name,
            std::process::id(),
            since_epoch.as_nanos()
        );
        let mut path = std::env::temp_dir();
        path.push(file_name);
        let _ = std::fs::remove_file(&path);
        path
    }

    /// Spawns `writers` threads, holds them all at a barrier, then runs
    /// `op(index)` on each and joins every thread before returning.
    fn run_writers_in_lockstep<F>(writers: usize, op: F)
    where
        F: Fn(usize) + Send + Sync + 'static,
    {
        let op = Arc::new(op);
        let start_gate = Arc::new(Barrier::new(writers));
        let workers: Vec<_> = (0..writers)
            .map(|index| {
                let op = Arc::clone(&op);
                let gate = Arc::clone(&start_gate);
                std::thread::spawn(move || {
                    gate.wait();
                    (*op)(index);
                })
            })
            .collect();
        for worker in workers {
            worker.join().expect("writer thread");
        }
    }

    /// With the env override cleared and default options, 32 writers released
    /// together must produce at least one fsync but fewer fsyncs than writers.
    #[test]
    fn group_commit_coalesces_concurrent_autocommits() {
        const WRITERS: usize = 32;

        // A poisoned lock only means another test panicked; keep going.
        let _env = env_lock()
            .lock()
            .unwrap_or_else(|poisoned| poisoned.into_inner());
        std::env::remove_var(WINDOW_ENV);

        let path = temp_wal("coalesce");
        let opened = StoreCommitCoordinator::open(
            path.clone(),
            DurabilityMode::WalDurableGrouped,
            GroupCommitOptions::default(),
        );
        let coord = Arc::new(opened.expect("open commit coordinator"));

        let writer_coord = Arc::clone(&coord);
        run_writers_in_lockstep(WRITERS, move |tx| {
            let action = StoreWalAction::CreateCollection {
                name: format!("c{tx}"),
            };
            writer_coord
                .append_actions(std::slice::from_ref(&action))
                .expect("append_actions");
        });

        let fsyncs = coord.fsync_count();
        assert!(fsyncs > 0, "expected at least one fsync, got {fsyncs}");
        assert!(
            fsyncs < WRITERS as u64,
            "expected fsyncs ({fsyncs}) to be strictly less than \
             concurrent writers ({WRITERS}); coalescing failed"
        );

        drop(coord);
        let _ = std::fs::remove_file(&path);
    }

    /// With the env override set to `"0"`, eight concurrent writers must all
    /// complete and the coordinator must report at least one fsync.
    #[test]
    fn zero_window_disables_coalescing_floor() {
        const WRITERS: usize = 8;

        let _env = env_lock()
            .lock()
            .unwrap_or_else(|poisoned| poisoned.into_inner());
        std::env::set_var(WINDOW_ENV, "0");

        let path = temp_wal("zero_window");
        let opened = StoreCommitCoordinator::open(
            path.clone(),
            DurabilityMode::WalDurableGrouped,
            GroupCommitOptions::default(),
        );
        let coord = Arc::new(opened.expect("open commit coordinator"));

        let writer_coord = Arc::clone(&coord);
        run_writers_in_lockstep(WRITERS, move |tx| {
            let action = StoreWalAction::CreateCollection {
                name: format!("z{tx}"),
            };
            writer_coord
                .append_actions(std::slice::from_ref(&action))
                .expect("append_actions");
        });

        let fsyncs = coord.fsync_count();
        assert!(fsyncs >= 1, "expected at least one fsync, got {fsyncs}");

        std::env::remove_var(WINDOW_ENV);
        drop(coord);
        let _ = std::fs::remove_file(&path);
    }

    /// Pins the outcomes of `resolve_window`: the adaptive default when
    /// nothing is configured, `window_ms` when set, and the env override
    /// (including `0`) beating both.
    #[test]
    fn resolve_window_precedence() {
        let _env = env_lock()
            .lock()
            .unwrap_or_else(|poisoned| poisoned.into_inner());
        let resolved = |options: &GroupCommitOptions| StoreCommitCoordinator::resolve_window(options);

        // No env override, default options: adaptive default window.
        std::env::remove_var(WINDOW_ENV);
        let cfg = GroupCommitOptions::default();
        assert_eq!(
            resolved(&cfg),
            Duration::from_micros(DEFAULT_ADAPTIVE_WINDOW_US)
        );

        // No env override, explicit `window_ms`: that value is used.
        let cfg_ms = GroupCommitOptions {
            window_ms: 5,
            ..Default::default()
        };
        assert_eq!(resolved(&cfg_ms), Duration::from_millis(5));

        // Env override present: it wins for both configurations.
        std::env::set_var(WINDOW_ENV, "750");
        assert_eq!(resolved(&cfg), Duration::from_micros(750));
        assert_eq!(resolved(&cfg_ms), Duration::from_micros(750));

        // A zero override is honoured as a zero-length window.
        std::env::set_var(WINDOW_ENV, "0");
        assert_eq!(resolved(&cfg), Duration::from_micros(0));

        std::env::remove_var(WINDOW_ENV);
    }
}