1use super::*;
2use crate::api::DurabilityMode;
3use crate::storage::wal::{WalReader, WalRecord, WalWriter};
4use std::cell::RefCell;
5use std::io;
6use std::path::{Path, PathBuf};
7use std::sync::atomic::{AtomicU64, Ordering};
8use std::sync::{Arc, Condvar, Mutex};
9
/// Fallback group-commit window in microseconds. `resolve_window` uses it
/// when the `REDDB_GROUP_COMMIT_WINDOW_US` environment variable is absent or
/// unparseable and `GroupCommitOptions::window_ms` is zero.
const DEFAULT_ADAPTIVE_WINDOW_US: u64 = 200;

/// Mutex type guarding the shared [`WalWriter`].
type WalMutex = parking_lot::Mutex<WalWriter>;

/// Mutex type guarding the group-commit bookkeeping ([`CommitState`]).
type CommitStateMutex = parking_lot::Mutex<CommitState>;
/// Condition variable paired with [`CommitStateMutex`]; writers and the
/// background commit loop use it to signal each other.
type CommitStateCondvar = parking_lot::Condvar;
use std::time::{Duration, Instant};

/// Process-wide transaction id source. Starts at 1 and is bumped once per
/// non-empty `append_actions` call.
static NEXT_STORE_TX_ID: AtomicU64 = AtomicU64::new(1);

/// Version byte written first in every encoded [`StoreWalAction`];
/// `decode` rejects any other value.
const STORE_WAL_VERSION: u8 = 1;
48
/// A logical store mutation that can be written to, and replayed from, the
/// store WAL. The numeric tags mentioned below are the ones `encode` writes
/// and `decode` accepts.
#[derive(Debug, Clone)]
pub(crate) enum StoreWalAction {
    /// Tag 1: create the named collection.
    CreateCollection {
        name: String,
    },
    /// Tag 2: drop the named collection.
    DropCollection {
        name: String,
    },
    /// Tag 3: insert or update one serialized entity record.
    UpsertEntityRecord {
        collection: String,
        record: Vec<u8>,
    },
    /// Tag 4: delete the entity with the given raw id.
    DeleteEntityRecord {
        collection: String,
        entity_id: u64,
    },
    /// Tag 5: upsert several serialized entity records; replay applies them
    /// one by one in order.
    BulkUpsertEntityRecords {
        collection: String,
        records: Vec<Vec<u8>>,
    },
    /// Tag 6: replace the collection's contents with exactly `records`.
    RefreshCollection {
        collection: String,
        records: Vec<Vec<u8>>,
    },
}
85
/// WAL actions collected by a deferred capture instead of being appended
/// immediately; handed back by `take_deferred_store_wal_capture`.
#[derive(Debug, Default)]
pub(crate) struct DeferredStoreWalActions {
    /// Captured actions, in the order they were captured.
    actions: Vec<StoreWalAction>,
}
90
91impl DeferredStoreWalActions {
92 pub(crate) fn is_empty(&self) -> bool {
93 self.actions.is_empty()
94 }
95
96 pub(crate) fn extend(&mut self, other: Self) {
97 self.actions.extend(other.actions);
98 }
99}
100
thread_local! {
    // Per-thread capture buffer for deferred WAL actions: `None` while no
    // capture is active, `Some(pending)` between the begin and take calls.
    static DEFERRED_STORE_WAL_ACTIONS: RefCell<Option<Vec<StoreWalAction>>> =
        const { RefCell::new(None) };
}
105
106fn begin_deferred_store_wal_capture() {
107 DEFERRED_STORE_WAL_ACTIONS.with(|cell| {
108 let mut guard = cell.borrow_mut();
109 debug_assert!(guard.is_none());
110 *guard = Some(Vec::new());
111 });
112}
113
114fn capture_deferred_store_wal_actions(actions: Vec<StoreWalAction>) -> bool {
115 DEFERRED_STORE_WAL_ACTIONS.with(|cell| {
116 let mut guard = cell.borrow_mut();
117 if let Some(pending) = guard.as_mut() {
118 pending.extend(actions);
119 true
120 } else {
121 false
122 }
123 })
124}
125
126fn deferred_store_wal_capture_active() -> bool {
127 DEFERRED_STORE_WAL_ACTIONS.with(|cell| cell.borrow().is_some())
128}
129
130fn take_deferred_store_wal_capture() -> DeferredStoreWalActions {
131 DEFERRED_STORE_WAL_ACTIONS.with(|cell| DeferredStoreWalActions {
132 actions: cell.borrow_mut().take().unwrap_or_default(),
133 })
134}
135
136impl StoreWalAction {
137 pub(crate) fn upsert_entity(
138 collection: &str,
139 entity: &UnifiedEntity,
140 metadata: Option<&Metadata>,
141 format_version: u32,
142 ) -> Self {
143 Self::UpsertEntityRecord {
144 collection: collection.to_string(),
145 record: UnifiedStore::serialize_entity_record(entity, metadata, format_version),
146 }
147 }
148
149 fn encode(&self) -> Vec<u8> {
150 let mut out = Vec::new();
151 out.push(STORE_WAL_VERSION);
152 match self {
153 Self::CreateCollection { name } => {
154 out.push(1);
155 write_string(&mut out, name);
156 }
157 Self::DropCollection { name } => {
158 out.push(2);
159 write_string(&mut out, name);
160 }
161 Self::UpsertEntityRecord { collection, record } => {
162 out.push(3);
163 write_string(&mut out, collection);
164 write_bytes(&mut out, record);
165 }
166 Self::DeleteEntityRecord {
167 collection,
168 entity_id,
169 } => {
170 out.push(4);
171 write_string(&mut out, collection);
172 out.extend_from_slice(&entity_id.to_le_bytes());
173 }
174 Self::BulkUpsertEntityRecords {
175 collection,
176 records,
177 } => {
178 out.push(5);
179 write_string(&mut out, collection);
180 out.extend_from_slice(&(records.len() as u32).to_le_bytes());
181 for record in records {
182 write_bytes(&mut out, record);
183 }
184 }
185 Self::RefreshCollection {
186 collection,
187 records,
188 } => {
189 out.push(6);
190 write_string(&mut out, collection);
191 out.extend_from_slice(&(records.len() as u32).to_le_bytes());
192 for record in records {
193 write_bytes(&mut out, record);
194 }
195 }
196 }
197 out
198 }
199
200 fn decode(bytes: &[u8]) -> io::Result<Self> {
201 if bytes.len() < 2 {
202 return Err(io::Error::new(
203 io::ErrorKind::InvalidData,
204 "store wal action too short",
205 ));
206 }
207 if bytes[0] != STORE_WAL_VERSION {
208 return Err(io::Error::new(
209 io::ErrorKind::InvalidData,
210 format!("unsupported store wal version: {}", bytes[0]),
211 ));
212 }
213
214 let mut pos = 2usize;
215 match bytes[1] {
216 1 => Ok(Self::CreateCollection {
217 name: read_string(bytes, &mut pos)?,
218 }),
219 2 => Ok(Self::DropCollection {
220 name: read_string(bytes, &mut pos)?,
221 }),
222 3 => Ok(Self::UpsertEntityRecord {
223 collection: read_string(bytes, &mut pos)?,
224 record: read_bytes(bytes, &mut pos)?,
225 }),
226 4 => {
227 let collection = read_string(bytes, &mut pos)?;
228 let entity_id = read_u64(bytes, &mut pos)?;
229 Ok(Self::DeleteEntityRecord {
230 collection,
231 entity_id,
232 })
233 }
234 5 => {
235 let collection = read_string(bytes, &mut pos)?;
236 if pos + 4 > bytes.len() {
237 return Err(io::Error::new(
238 io::ErrorKind::InvalidData,
239 "bulk upsert wal action: missing record count",
240 ));
241 }
242 let count = u32::from_le_bytes([
243 bytes[pos],
244 bytes[pos + 1],
245 bytes[pos + 2],
246 bytes[pos + 3],
247 ]) as usize;
248 pos += 4;
249 let mut records = Vec::with_capacity(count);
250 for _ in 0..count {
251 records.push(read_bytes(bytes, &mut pos)?);
252 }
253 Ok(Self::BulkUpsertEntityRecords {
254 collection,
255 records,
256 })
257 }
258 6 => {
259 let collection = read_string(bytes, &mut pos)?;
260 if pos + 4 > bytes.len() {
261 return Err(io::Error::new(
262 io::ErrorKind::InvalidData,
263 "refresh collection wal action: missing record count",
264 ));
265 }
266 let count = u32::from_le_bytes([
267 bytes[pos],
268 bytes[pos + 1],
269 bytes[pos + 2],
270 bytes[pos + 3],
271 ]) as usize;
272 pos += 4;
273 let mut records = Vec::with_capacity(count);
274 for _ in 0..count {
275 records.push(read_bytes(bytes, &mut pos)?);
276 }
277 Ok(Self::RefreshCollection {
278 collection,
279 records,
280 })
281 }
282 other => Err(io::Error::new(
283 io::ErrorKind::InvalidData,
284 format!("unsupported store wal action tag: {other}"),
285 )),
286 }
287 }
288}
289
/// Bookkeeping shared between writers and the background group-commit loop.
#[derive(Debug)]
struct CommitState {
    /// Highest LSN known to have been synced.
    durable_lsn: u64,
    /// Highest LSN any writer has asked to be made durable.
    pending_target_lsn: u64,
    /// Number of appends registered since the counters were last reset.
    pending_statements: usize,
    /// Encoded action bytes registered since the counters were last reset.
    pending_wal_bytes: u64,
    /// When the oldest still-pending append was registered, if any.
    first_pending_at: Option<Instant>,
    /// Set on drop of the coordinator to stop the background loop.
    shutdown: bool,
    /// Message of the most recent write/sync failure, cleared on success.
    last_error: Option<String>,
}

impl CommitState {
    /// Creates an idle state: nothing pending, and both LSN markers set to
    /// `initial_durable_lsn`.
    fn new(initial_durable_lsn: u64) -> Self {
        let lsn = initial_durable_lsn;
        Self {
            durable_lsn: lsn,
            pending_target_lsn: lsn,
            pending_statements: 0,
            pending_wal_bytes: 0,
            first_pending_at: None,
            shutdown: false,
            last_error: None,
        }
    }
}
314
/// Queue of already-encoded WAL records waiting for the background commit
/// loop to write them. Enqueueing assigns each record its LSN range without
/// touching the WAL writer.
pub(crate) struct WalAppendQueue {
    /// LSN cursor plus the queued records, guarded by one mutex.
    pending: parking_lot::Mutex<WalQueueState>,
}
345
/// Mutex-protected contents of [`WalAppendQueue`].
struct WalQueueState {
    /// LSN that the next enqueued record will start at.
    next_lsn: u64,
    /// Queued `(start_lsn, encoded_record)` pairs.
    entries: Vec<(u64, Vec<u8>)>,
}
350
351impl WalAppendQueue {
352 fn new(initial_lsn: u64) -> Self {
353 Self {
354 pending: parking_lot::Mutex::new(WalQueueState {
355 next_lsn: initial_lsn,
356 entries: Vec::with_capacity(64),
357 }),
358 }
359 }
360
361 fn enqueue(&self, bytes: Vec<u8>) -> u64 {
367 let len = bytes.len() as u64;
368 let mut state = self.pending.lock();
369 let start_lsn = state.next_lsn;
370 state.next_lsn = start_lsn + len;
371 state.entries.push((start_lsn, bytes));
372 start_lsn + len
373 }
374
375 fn drain_sorted(&self) -> Vec<(u64, Vec<u8>)> {
379 let mut state = self.pending.lock();
380 let mut v = std::mem::take(&mut state.entries);
381 drop(state);
382 v.sort_by_key(|(lsn, _)| *lsn);
383 v
384 }
385
386 fn has_pending(&self) -> bool {
389 !self.pending.lock().entries.is_empty()
390 }
391
392 fn reset(&self, next_lsn: u64) {
398 let mut state = self.pending.lock();
399 state.next_lsn = next_lsn;
400 state.entries.clear();
401 }
402}
403
/// Owns the store WAL and coordinates how appended actions become durable,
/// according to the configured [`DurabilityMode`].
pub(crate) struct StoreCommitCoordinator {
    /// Durability policy chosen at open time.
    mode: DurabilityMode,
    /// Group-commit tuning passed to `open`.
    config: crate::api::GroupCommitOptions,
    /// Location of the WAL file; used by `replay_into`.
    wal_path: PathBuf,
    /// Shared WAL writer (also held by the background commit thread).
    wal: Arc<WalMutex>,
    /// Records waiting for the background commit thread to write them.
    queue: Arc<WalAppendQueue>,
    /// Commit bookkeeping and the condvar used to signal changes to it.
    state: Arc<(CommitStateMutex, CommitStateCondvar)>,
    /// Number of successful WAL syncs performed so far.
    fsync_count: Arc<AtomicU64>,
}
423
424impl StoreCommitCoordinator {
425 pub(crate) fn should_open(path: &Path, mode: DurabilityMode) -> bool {
426 matches!(
427 mode,
428 DurabilityMode::WalDurableGrouped | DurabilityMode::Async
429 ) || path.exists()
430 }
431
    /// Opens (or creates, as `WalWriter::open` decides) the WAL at `wal_path`
    /// and builds a coordinator around it.
    ///
    /// For the grouped and async modes a background thread running
    /// `run_group_commit_loop` is spawned; strict mode gets no thread.
    ///
    /// # Errors
    /// Propagates any error from `WalWriter::open`.
    pub(crate) fn open(
        wal_path: impl Into<PathBuf>,
        mode: DurabilityMode,
        config: crate::api::GroupCommitOptions,
    ) -> io::Result<Self> {
        let wal_path = wal_path.into();
        let wal = WalWriter::open(&wal_path)?;
        // Seed the bookkeeping from the writer's view of the existing log.
        let initial_durable_lsn = wal.durable_lsn();
        let initial_current_lsn = wal.current_lsn();
        let wal = Arc::new(WalMutex::new(wal));
        let queue = Arc::new(WalAppendQueue::new(initial_current_lsn));
        let state = Arc::new((
            CommitStateMutex::new(CommitState::new(initial_durable_lsn)),
            CommitStateCondvar::new(),
        ));
        let fsync_count = Arc::new(AtomicU64::new(0));

        if matches!(
            mode,
            DurabilityMode::WalDurableGrouped | DurabilityMode::Async
        ) {
            let wal_bg = Arc::clone(&wal);
            let queue_bg = Arc::clone(&queue);
            let state_bg = Arc::clone(&state);
            let fsync_bg = Arc::clone(&fsync_count);
            let window = Self::resolve_window(&config);
            // Clamp the thresholds so a zero in the config cannot make the
            // "batch is full" checks trivially true or meaningless.
            let max_statements = config.max_statements.max(1);
            let max_wal_bytes = config.max_wal_bytes.max(1);
            // The join handle is dropped: the thread is detached and stops
            // when it observes the `shutdown` flag set by `Drop`.
            std::thread::spawn(move || {
                Self::run_group_commit_loop(
                    wal_bg,
                    queue_bg,
                    state_bg,
                    fsync_bg,
                    window,
                    max_statements,
                    max_wal_bytes,
                );
            });
        }

        Ok(Self {
            mode,
            config,
            wal_path,
            wal,
            queue,
            state,
            fsync_count,
        })
    }
497
498 fn resolve_window(config: &crate::api::GroupCommitOptions) -> Duration {
507 if let Ok(raw) = std::env::var("REDDB_GROUP_COMMIT_WINDOW_US") {
508 if let Ok(parsed) = raw.parse::<u64>() {
509 return Duration::from_micros(parsed);
510 }
511 }
512 if config.window_ms != 0 {
513 return Duration::from_millis(config.window_ms);
514 }
515 Duration::from_micros(DEFAULT_ADAPTIVE_WINDOW_US)
516 }
517
    /// Test-only accessor: number of successful WAL syncs counted so far.
    #[cfg(test)]
    pub(crate) fn fsync_count(&self) -> u64 {
        self.fsync_count.load(Ordering::Relaxed)
    }
525
526 pub(crate) fn append_actions(&self, actions: &[StoreWalAction]) -> io::Result<()> {
527 if actions.is_empty() {
528 return Ok(());
529 }
530
531 let tx_id = NEXT_STORE_TX_ID.fetch_add(1, Ordering::SeqCst);
532
533 if matches!(self.mode, DurabilityMode::Strict) {
538 let commit_lsn = {
539 let mut wal = self.wal.lock();
540 wal.append(&WalRecord::TxCommitBatch {
541 tx_id,
542 actions: actions.iter().map(StoreWalAction::encode).collect(),
543 })?;
544 wal.current_lsn()
545 };
546 self.force_sync()?;
547 let _ = commit_lsn;
548 return Ok(());
549 }
550
551 let encoded_actions: Vec<Vec<u8>> = actions.iter().map(StoreWalAction::encode).collect();
555 let wal_bytes = encoded_actions.iter().fold(0u64, |total, payload| {
556 total.saturating_add(payload.len() as u64)
557 });
558 let blob = WalRecord::TxCommitBatch {
559 tx_id,
560 actions: encoded_actions,
561 }
562 .encode();
563
564 let commit_lsn = self.queue.enqueue(blob);
565 self.wait_until_durable(commit_lsn, wal_bytes)?;
566 Ok(())
567 }
568
    /// Syncs the WAL immediately and publishes the new durable LSN.
    ///
    /// Resets the pending counters, clears any recorded error and wakes every
    /// thread waiting on the commit condvar. Records still sitting in the
    /// append queue are not written by this call.
    ///
    /// # Errors
    /// Propagates the error from `wal.sync()`; the state is left untouched in
    /// that case.
    pub(crate) fn force_sync(&self) -> io::Result<()> {
        {
            let mut wal = self.wal.lock();
            wal.sync()?;
            self.fsync_count.fetch_add(1, Ordering::Relaxed);
            let durable = wal.durable_lsn();
            // Release the WAL lock before taking the state lock so the two
            // are never held together here.
            drop(wal);
            let (state_lock, cond) = &*self.state;
            let mut state = state_lock.lock();
            state.durable_lsn = durable;
            // Never move the target backwards: queued work may be ahead.
            state.pending_target_lsn = durable.max(state.pending_target_lsn);
            state.pending_statements = 0;
            state.pending_wal_bytes = 0;
            state.first_pending_at = None;
            state.last_error = None;
            cond.notify_all();
        }
        Ok(())
    }
588
    /// Truncates the WAL and resets all commit bookkeeping to match.
    ///
    /// Any records still queued for the background loop are discarded, the
    /// queue's LSN cursor restarts at the writer's post-truncate LSN, and all
    /// condvar waiters are woken.
    ///
    /// # Errors
    /// Propagates the error from `wal.truncate()`.
    pub(crate) fn truncate(&self) -> io::Result<()> {
        let mut wal = self.wal.lock();
        wal.truncate()?;
        let durable = wal.durable_lsn();
        let current = wal.current_lsn();
        // Release the WAL lock before touching the queue and state locks.
        drop(wal);

        self.queue.reset(current);

        let (state_lock, cond) = &*self.state;
        let mut state = state_lock.lock();
        state.durable_lsn = durable;
        state.pending_target_lsn = durable;
        state.pending_statements = 0;
        state.pending_wal_bytes = 0;
        state.first_pending_at = None;
        state.last_error = None;
        cond.notify_all();
        Ok(())
    }
613
614 pub(crate) fn replay_into(&self, store: &UnifiedStore) -> io::Result<()> {
615 if !self.wal_path.exists() {
616 return Ok(());
617 }
618
619 let reader = match WalReader::open(&self.wal_path) {
620 Ok(reader) => reader,
621 Err(err) if err.kind() == io::ErrorKind::NotFound => return Ok(()),
622 Err(err) => return Err(err),
623 };
624
625 let mut tx_states = std::collections::HashMap::<u64, bool>::new();
626 let mut pending = Vec::<(u64, Vec<u8>)>::new();
627
628 for record in reader.iter() {
629 let (_lsn, record) = match record {
630 Ok(record) => record,
631 Err(err) if err.kind() == io::ErrorKind::UnexpectedEof => break,
632 Err(err) => return Err(err),
633 };
634 match record {
635 WalRecord::TxCommitBatch { actions, .. } => {
636 for payload in actions {
637 let action = StoreWalAction::decode(&payload)?;
638 store.apply_replayed_action(&action).map_err(|err| {
639 io::Error::other(format!("failed to replay store wal action: {err}"))
640 })?;
641 }
642 }
643 WalRecord::Begin { tx_id } => {
644 tx_states.insert(tx_id, false);
645 }
646 WalRecord::Commit { tx_id } => {
647 tx_states.insert(tx_id, true);
648 }
649 WalRecord::Rollback { tx_id } => {
650 tx_states.remove(&tx_id);
651 }
652 WalRecord::PageWrite {
653 tx_id,
654 page_id: _,
655 data,
656 } => pending.push((tx_id, data)),
657 WalRecord::Checkpoint { .. } => {}
658 WalRecord::FullPageImage { .. } => {
659 }
661 }
662 }
663
664 for (tx_id, payload) in pending {
665 if !tx_states.get(&tx_id).copied().unwrap_or(false) {
666 continue;
667 }
668 let action = StoreWalAction::decode(&payload)?;
669 store.apply_replayed_action(&action).map_err(|err| {
670 io::Error::other(format!("failed to replay store wal action: {err}"))
671 })?;
672 }
673
674 Ok(())
675 }
676
    /// Registers a queued append with the commit state and, depending on the
    /// mode, waits for it to become durable.
    ///
    /// * Strict: syncs immediately via `force_sync`.
    /// * Async: registers the work, wakes the background loop and returns
    ///   without waiting.
    /// * Grouped: registers the work, then blocks until `durable_lsn` reaches
    ///   `target_lsn` or an error has been recorded by the background loop.
    ///
    /// `wal_bytes` feeds the byte-based batch trigger.
    ///
    /// # Errors
    /// In grouped mode, returns the recorded `last_error` text as an
    /// `io::Error`; in strict mode, whatever `force_sync` returns.
    fn wait_until_durable(&self, target_lsn: u64, wal_bytes: u64) -> io::Result<()> {
        match self.mode {
            DurabilityMode::Strict => self.force_sync(),
            DurabilityMode::Async => {
                let (state_lock, cond) = &*self.state;
                let mut state = state_lock.lock();
                state.pending_target_lsn = state.pending_target_lsn.max(target_lsn);
                state.pending_statements = state.pending_statements.saturating_add(1);
                state.pending_wal_bytes = state.pending_wal_bytes.saturating_add(wal_bytes);
                // Only the first pending append of a batch starts the window.
                state.first_pending_at.get_or_insert_with(Instant::now);
                cond.notify_all();
                Ok(())
            }
            DurabilityMode::WalDurableGrouped => {
                let (state_lock, cond) = &*self.state;
                let mut state = state_lock.lock();
                state.pending_target_lsn = state.pending_target_lsn.max(target_lsn);
                state.pending_statements = state.pending_statements.saturating_add(1);
                state.pending_wal_bytes = state.pending_wal_bytes.saturating_add(wal_bytes);
                // Only the first pending append of a batch starts the window.
                state.first_pending_at.get_or_insert_with(Instant::now);
                cond.notify_all();

                loop {
                    // The error check comes first, so an error recorded by
                    // the background loop wins over a satisfied LSN.
                    if let Some(err) = state.last_error.clone() {
                        return Err(io::Error::other(err));
                    }
                    if state.durable_lsn >= target_lsn {
                        return Ok(());
                    }
                    cond.wait(&mut state);
                }
            }
        }
    }
717
    /// Body of the background group-commit thread.
    ///
    /// Each iteration waits until some append is pending, optionally lingers
    /// up to `window` so more appends can join the batch (unless
    /// `max_statements` or `max_wal_bytes` is already reached, or `window` is
    /// zero), then drains the queue, writes the records, syncs once, and
    /// publishes the outcome to all waiters. Returns when `shutdown` is set.
    fn run_group_commit_loop(
        wal: Arc<WalMutex>,
        queue: Arc<WalAppendQueue>,
        state: Arc<(CommitStateMutex, CommitStateCondvar)>,
        fsync_count: Arc<AtomicU64>,
        window: Duration,
        max_statements: usize,
        max_wal_bytes: u64,
    ) {
        let (state_lock, cond) = &*state;
        loop {
            let target_lsn = {
                let mut guard = state_lock.lock();

                // Sleep until there is work (target ahead of durable) or we
                // are asked to stop.
                while !guard.shutdown && guard.pending_target_lsn <= guard.durable_lsn {
                    cond.wait(&mut guard);
                }

                if guard.shutdown {
                    return;
                }

                // Flush right away when batching is disabled or a size
                // threshold has already been hit.
                let immediate = window.is_zero()
                    || guard.pending_statements >= max_statements
                    || guard.pending_wal_bytes >= max_wal_bytes;

                if !immediate {
                    // The window is measured from the first pending append.
                    let deadline = guard.first_pending_at.unwrap_or_else(Instant::now) + window;
                    let now = Instant::now();
                    if now < deadline {
                        let timeout = deadline.saturating_duration_since(now);
                        let _ = cond.wait_for(&mut guard, timeout);
                        if guard.shutdown {
                            return;
                        }
                        // Someone else (e.g. `force_sync` or `truncate`)
                        // caught up while we waited: start over.
                        if guard.pending_target_lsn <= guard.durable_lsn {
                            continue;
                        }
                        // Woken early by a new append: keep lingering as long
                        // as no threshold is hit and the window is still open.
                        let should_wait_again = guard.pending_statements < max_statements
                            && guard.pending_wal_bytes < max_wal_bytes
                            && guard
                                .first_pending_at
                                .map(|first| first.elapsed() < window)
                                .unwrap_or(false);
                        if should_wait_again {
                            continue;
                        }
                    }
                }

                guard.pending_target_lsn
            };

            // State lock is released here; drain whatever is queued now.
            let batches = queue.drain_sorted();

            let sync_result = {
                let mut wal = wal.lock();
                let mut write_err: Option<io::Error> = None;
                for (_lsn, bytes) in batches {
                    if let Err(e) = wal.append_bytes(&bytes) {
                        write_err = Some(e);
                        break;
                    }
                }
                match write_err {
                    Some(e) => Err(e),
                    // One sync covers every record written above.
                    None => wal.sync().map(|_| {
                        fsync_count.fetch_add(1, Ordering::Relaxed);
                        wal.durable_lsn()
                    }),
                }
            };

            // Appends that arrived during the write/sync stay counted so the
            // next iteration still sees them as pending.
            let more_pending = queue.has_pending();

            let mut guard = state_lock.lock();
            match sync_result {
                Ok(durable_lsn) => {
                    guard.durable_lsn = durable_lsn;
                    if !more_pending {
                        guard.pending_statements = 0;
                        guard.pending_wal_bytes = 0;
                        guard.first_pending_at = None;
                    }
                    guard.last_error = None;
                    // `target_lsn` is intentionally unused beyond this point.
                    let _ = target_lsn;
                }
                Err(err) => {
                    // NOTE(review): records drained for this batch are not
                    // re-queued on failure, and `pending_target_lsn` is left
                    // unchanged — confirm the intended recovery behaviour.
                    guard.last_error = Some(err.to_string());
                }
            }
            cond.notify_all();
        }
    }
825}
826
/// Signals the background commit loop to stop.
///
/// Only the `shutdown` flag is set and waiters are woken: the thread is not
/// joined and queued records are not flushed by this impl.
impl Drop for StoreCommitCoordinator {
    fn drop(&mut self) {
        let (state_lock, cond) = &*self.state;
        let mut state = state_lock.lock();
        state.shutdown = true;
        cond.notify_all();
    }
}
836
837impl UnifiedStore {
    /// Starts capturing WAL actions on the current thread instead of
    /// appending them; forwards to the module-level function of the same
    /// name.
    pub(crate) fn begin_deferred_store_wal_capture() {
        begin_deferred_store_wal_capture();
    }
841
    /// Ends the current thread's capture and returns the collected actions;
    /// forwards to the module-level function of the same name.
    pub(crate) fn take_deferred_store_wal_capture() -> DeferredStoreWalActions {
        take_deferred_store_wal_capture()
    }
845
846 pub(crate) fn append_deferred_store_wal_actions(
847 &self,
848 actions: DeferredStoreWalActions,
849 ) -> Result<(), StoreError> {
850 if actions.actions.is_empty() {
851 return Ok(());
852 }
853 match self.config.durability_mode {
854 DurabilityMode::Strict => self.flush_paged_state(),
855 DurabilityMode::WalDurableGrouped | DurabilityMode::Async => {
856 if let Some(commit) = &self.commit {
857 commit
858 .append_actions(&actions.actions)
859 .map_err(StoreError::Io)
860 } else {
861 self.flush_paged_state()
862 }
863 }
864 }
865 }
866
867 pub(crate) fn wal_path_for_db(path: &Path) -> PathBuf {
868 path.with_extension("rdb-uwal")
869 }
870
871 pub(crate) fn finish_paged_write(
872 &self,
873 actions: impl IntoIterator<Item = StoreWalAction>,
874 ) -> Result<(), StoreError> {
875 let actions: Vec<StoreWalAction> = actions.into_iter().collect();
876 if deferred_store_wal_capture_active() {
877 let captured = capture_deferred_store_wal_actions(actions);
878 debug_assert!(captured);
879 return Ok(());
880 }
881 match self.config.durability_mode {
882 DurabilityMode::Strict => self.flush_paged_state(),
883 DurabilityMode::WalDurableGrouped | DurabilityMode::Async => {
884 if let Some(commit) = &self.commit {
885 commit.append_actions(&actions).map_err(StoreError::Io)?;
886 Ok(())
887 } else {
888 self.flush_paged_state()
889 }
890 }
891 }
892 }
893
894 pub(crate) fn apply_replayed_action(&self, action: &StoreWalAction) -> Result<(), StoreError> {
895 match action {
896 StoreWalAction::CreateCollection { name } => {
897 if self.get_collection(name).is_none() {
898 let _ = self.create_collection_in_memory(name);
899 }
900 Ok(())
901 }
902 StoreWalAction::DropCollection { name } => self.drop_collection_in_memory(name),
903 StoreWalAction::UpsertEntityRecord { collection, record } => {
904 self.apply_replayed_upsert(collection, record)
905 }
906 StoreWalAction::DeleteEntityRecord {
907 collection,
908 entity_id,
909 } => self.apply_replayed_delete(collection, EntityId::new(*entity_id)),
910 StoreWalAction::BulkUpsertEntityRecords {
911 collection,
912 records,
913 } => {
914 for record in records {
915 self.apply_replayed_upsert(collection, record)?;
916 }
917 Ok(())
918 }
919 StoreWalAction::RefreshCollection {
920 collection,
921 records,
922 } => self.apply_replayed_refresh_collection(collection, records),
923 }
924 }
925
    /// Replaces the entire contents of collection `name` with `entities`.
    ///
    /// Entities with a zero id get a fresh id and table rows with a zero
    /// `row_id` get a fresh row id; non-zero ids are registered as in use.
    /// The new contents are built in a fresh `SegmentManager`, swapped in,
    /// and then logged as a single `RefreshCollection` WAL action.
    ///
    /// Returns the serialized records in the same order as `entities`.
    ///
    /// # Errors
    /// Returns `StoreError::Io` when the bulk insert fails (nothing has been
    /// swapped yet), or the error from `finish_paged_write` (the in-memory
    /// swap has already happened at that point).
    pub fn refresh_collection(
        &self,
        name: &str,
        entities: Vec<UnifiedEntity>,
    ) -> Result<Vec<Vec<u8>>, StoreError> {
        let fv = self.format_version();

        let new_manager = Arc::new(SegmentManager::with_config(
            name,
            self.config.manager_config.clone(),
        ));

        let mut prepared = entities;
        for entity in &mut prepared {
            // A zero id means "assign one"; otherwise reserve the given id.
            if entity.id.raw() == 0 {
                entity.id = self.next_entity_id();
            } else {
                self.register_entity_id(entity.id);
            }
            if let EntityKind::TableRow { ref mut row_id, .. } = entity.kind {
                if *row_id == 0 {
                    *row_id = new_manager.next_row_id();
                } else {
                    new_manager.register_row_id(*row_id);
                }
            }
            entity.ensure_table_logical_id();
        }

        // Serialize only after ids are final so the records carry them.
        let serialized: Vec<Vec<u8>> = prepared
            .iter()
            .map(|e| Self::serialize_entity_record(e, None, fv))
            .collect();

        new_manager.bulk_insert(prepared.clone()).map_err(|e| {
            StoreError::Io(std::io::Error::other(format!(
                "refresh_collection: bulk_insert into new manager failed: {e}"
            )))
        })?;

        self.swap_collection_state(name, new_manager, &prepared, &serialized);

        self.finish_paged_write([StoreWalAction::RefreshCollection {
            collection: name.to_string(),
            records: serialized.clone(),
        }])?;

        Ok(serialized)
    }
993
    /// Installs `new_manager` as the state of collection `name` and brings
    /// the dependent structures in line with it.
    ///
    /// `prepared` and `serialized` must be parallel slices (entity `i`
    /// corresponds to record `i`); they are zipped to rebuild the B-tree.
    fn swap_collection_state(
        &self,
        name: &str,
        new_manager: Arc<SegmentManager>,
        prepared: &[UnifiedEntity],
        serialized: &[Vec<u8>],
    ) {
        {
            // Scoped so the collections write lock is released immediately.
            let mut collections = self.collections.write();
            collections.insert(name.to_string(), new_manager);
        }

        // Drop every cached entity that belonged to this collection.
        self.entity_cache
            .retain(|_, (collection, _)| collection != name);
        // NOTE(review): this removes the ids of the *new* entities from the
        // graph label index; ids that only existed in the replaced contents
        // are not passed here — confirm that is intended.
        self.remove_from_graph_label_index_batch(
            name,
            &prepared.iter().map(|e| e.id).collect::<Vec<_>>(),
        );

        if let Some(pager) = &self.pager {
            // Rebuild the collection's B-tree from scratch, keyed by the
            // big-endian entity id so byte order matches numeric order.
            let new_btree = Arc::new(BTree::new(Arc::clone(pager)));
            let mut sorted: Vec<(Vec<u8>, Vec<u8>)> = prepared
                .iter()
                .zip(serialized.iter())
                .map(|(e, r)| (e.id.raw().to_be_bytes().to_vec(), r.clone()))
                .collect();
            sorted.sort_by(|a, b| a.0.cmp(&b.0));
            if !sorted.is_empty() {
                // NOTE(review): a bulk-insert failure is ignored here, which
                // would leave the new B-tree incomplete — confirm.
                let _ = new_btree.bulk_insert_sorted(&sorted);
            }
            self.btree_indices
                .write()
                .insert(name.to_string(), new_btree);
            self.mark_paged_registry_dirty();
        }
    }
1035
1036 fn apply_replayed_refresh_collection(
1037 &self,
1038 collection: &str,
1039 records: &[Vec<u8>],
1040 ) -> Result<(), StoreError> {
1041 let new_manager = Arc::new(SegmentManager::with_config(
1042 collection,
1043 self.config.manager_config.clone(),
1044 ));
1045
1046 let mut prepared: Vec<UnifiedEntity> = Vec::with_capacity(records.len());
1047 for record in records {
1048 let (entity, _metadata) =
1049 Self::deserialize_entity_record(record, self.format_version())?;
1050 self.register_entity_id(entity.id);
1051 if let EntityKind::TableRow { row_id, .. } = &entity.kind {
1052 new_manager.register_row_id(*row_id);
1053 }
1054 prepared.push(entity);
1055 }
1056
1057 if !prepared.is_empty() {
1058 new_manager.bulk_insert(prepared.clone()).map_err(|e| {
1059 StoreError::Io(std::io::Error::other(format!(
1060 "replay refresh_collection: bulk_insert failed: {e}"
1061 )))
1062 })?;
1063 }
1064
1065 self.swap_collection_state(collection, new_manager, &prepared, records);
1066 Ok(())
1067 }
1068
1069 pub fn refresh_collection_from_records(
1083 &self,
1084 name: &str,
1085 records: Vec<Vec<u8>>,
1086 ) -> Result<(), StoreError> {
1087 self.apply_replayed_refresh_collection(name, &records)?;
1088 self.finish_paged_write([StoreWalAction::RefreshCollection {
1089 collection: name.to_string(),
1090 records,
1091 }])?;
1092 Ok(())
1093 }
1094
1095 pub(crate) fn create_collection_in_memory(&self, name: &str) -> Result<(), StoreError> {
1096 let mut collections = self.collections.write();
1097 if collections.contains_key(name) {
1098 return Ok(());
1099 }
1100 let manager = SegmentManager::with_config(name, self.config.manager_config.clone());
1101 collections.insert(name.to_string(), Arc::new(manager));
1102 self.mark_paged_registry_dirty();
1103 Ok(())
1104 }
1105
1106 fn drop_collection_in_memory(&self, name: &str) -> Result<(), StoreError> {
1107 let manager = {
1108 let mut collections = self.collections.write();
1109 match collections.remove(name) {
1110 Some(manager) => manager,
1111 None => return Ok(()),
1112 }
1113 };
1114
1115 let entities = manager.query_all(|_| true);
1116 let entity_ids: Vec<EntityId> = entities.iter().map(|entity| entity.id).collect();
1117 for entity_id in &entity_ids {
1118 self.context_index.remove_entity(*entity_id);
1119 let _ = self.unindex_cross_refs(*entity_id);
1120 }
1121 self.btree_indices.write().remove(name);
1122 self.entity_cache.retain(|entity_id, (collection, _)| {
1123 collection != name && !entity_ids.iter().any(|id| id.raw() == entity_id)
1124 });
1125 self.remove_from_graph_label_index_batch(name, &entity_ids);
1126 self.mark_paged_registry_dirty();
1127 Ok(())
1128 }
1129
1130 fn apply_replayed_upsert(&self, collection: &str, record: &[u8]) -> Result<(), StoreError> {
1131 self.create_collection_in_memory(collection)?;
1132 let (entity, metadata) = Self::deserialize_entity_record(record, self.format_version())?;
1133 let manager = self
1134 .get_collection(collection)
1135 .ok_or_else(|| StoreError::CollectionNotFound(collection.to_string()))?;
1136
1137 self.register_entity_id(entity.id);
1138 if let EntityKind::TableRow { row_id, .. } = &entity.kind {
1139 manager.register_row_id(*row_id);
1140 }
1141
1142 self.context_index.remove_entity(entity.id);
1143 let _ = self.unindex_cross_refs(entity.id);
1144 self.remove_from_graph_label_index(collection, entity.id);
1145
1146 if manager.get(entity.id).is_some() {
1147 manager
1148 .update_with_metadata(entity.clone(), metadata.as_ref())
1149 .map_err(StoreError::from)?;
1150 } else {
1151 manager.insert(entity.clone())?;
1152 if let Some(metadata) = metadata.as_ref() {
1153 manager.set_metadata(entity.id, metadata.clone())?;
1154 }
1155 }
1156
1157 self.context_index.index_entity(collection, &entity);
1158 if let EntityKind::GraphNode(node) = &entity.kind {
1159 self.update_graph_label_index(collection, &node.label, entity.id);
1160 }
1161 self.index_cross_refs(&entity, collection)?;
1162
1163 if let Some(pager) = &self.pager {
1164 let mut btree_indices = self.btree_indices.write();
1165 let btree = btree_indices
1166 .entry(collection.to_string())
1167 .or_insert_with(|| Arc::new(BTree::new(Arc::clone(pager))));
1168 let root_before = btree.root_page_id();
1169 let key = entity.id.raw().to_be_bytes();
1170 match btree.insert(&key, record) {
1171 Ok(_) => {}
1172 Err(BTreeError::DuplicateKey) => {
1173 let _ = btree.delete(&key);
1174 let _ = btree.insert(&key, record);
1175 }
1176 Err(err) => {
1177 return Err(StoreError::Io(io::Error::other(format!(
1178 "replay upsert btree error: {err}"
1179 ))));
1180 }
1181 }
1182 if root_before != btree.root_page_id() {
1183 self.mark_paged_registry_dirty();
1184 }
1185 }
1186
1187 Ok(())
1188 }
1189
1190 fn apply_replayed_delete(&self, collection: &str, id: EntityId) -> Result<(), StoreError> {
1191 self.entity_cache.remove(id.raw());
1192 if let Some(manager) = self.get_collection(collection) {
1193 let deleted = manager.delete(id)?;
1194 if !deleted {
1195 return Ok(());
1196 }
1197 } else {
1198 return Ok(());
1199 }
1200
1201 if let Some(_pager) = &self.pager {
1202 let btree_indices = self.btree_indices.read();
1203 if let Some(btree) = btree_indices.get(collection) {
1204 let root_before = btree.root_page_id();
1205 let key = id.raw().to_be_bytes();
1206 let _ = btree.delete(&key);
1207 if root_before != btree.root_page_id() {
1208 self.mark_paged_registry_dirty();
1209 }
1210 }
1211 }
1212
1213 let _ = self.unindex_cross_refs(id);
1214 self.remove_from_graph_label_index(collection, id);
1215 self.context_index.remove_entity(id);
1216 Ok(())
1217 }
1218}
1219
/// Appends `value` to `out` as a little-endian `u32` byte length followed by
/// its UTF-8 bytes. The length is narrowed with `as u32`.
fn write_string(out: &mut Vec<u8>, value: &str) {
    let payload = value.as_bytes();
    let prefix = (payload.len() as u32).to_le_bytes();
    out.extend_from_slice(&prefix);
    out.extend_from_slice(payload);
}
1224
/// Appends `value` to `out` as a little-endian `u32` length followed by the
/// raw bytes. The length is narrowed with `as u32`.
fn write_bytes(out: &mut Vec<u8>, value: &[u8]) {
    let prefix = (value.len() as u32).to_le_bytes();
    out.extend_from_slice(&prefix);
    out.extend_from_slice(value);
}
1229
/// Reads a little-endian `u32` from `data` at `*pos` and advances `*pos` by
/// four bytes.
///
/// # Errors
/// Returns `UnexpectedEof` (leaving `*pos` untouched) when fewer than four
/// bytes remain.
fn read_u32(data: &[u8], pos: &mut usize) -> io::Result<u32> {
    let start = *pos;
    let chunk = start
        .checked_add(4)
        .and_then(|end| data.get(start..end))
        .ok_or_else(|| {
            io::Error::new(
                io::ErrorKind::UnexpectedEof,
                "unexpected eof while reading u32",
            )
        })?;

    let mut raw = [0u8; 4];
    raw.copy_from_slice(chunk);
    *pos = start + 4;
    Ok(u32::from_le_bytes(raw))
}
1241
/// Reads a little-endian `u64` from `data` at `*pos` and advances `*pos` by
/// eight bytes.
///
/// # Errors
/// Returns `UnexpectedEof` (leaving `*pos` untouched) when fewer than eight
/// bytes remain.
fn read_u64(data: &[u8], pos: &mut usize) -> io::Result<u64> {
    let start = *pos;
    let chunk = start
        .checked_add(8)
        .and_then(|end| data.get(start..end))
        .ok_or_else(|| {
            io::Error::new(
                io::ErrorKind::UnexpectedEof,
                "unexpected eof while reading u64",
            )
        })?;

    let mut raw = [0u8; 8];
    raw.copy_from_slice(chunk);
    *pos = start + 8;
    Ok(u64::from_le_bytes(raw))
}
1262
1263fn read_string(data: &[u8], pos: &mut usize) -> io::Result<String> {
1264 let len = read_u32(data, pos)? as usize;
1265 if data.len().saturating_sub(*pos) < len {
1266 return Err(io::Error::new(
1267 io::ErrorKind::UnexpectedEof,
1268 "unexpected eof while reading string",
1269 ));
1270 }
1271 let value = std::str::from_utf8(&data[*pos..*pos + len])
1272 .map_err(|err| io::Error::new(io::ErrorKind::InvalidData, err.to_string()))?
1273 .to_string();
1274 *pos += len;
1275 Ok(value)
1276}
1277
1278fn read_bytes(data: &[u8], pos: &mut usize) -> io::Result<Vec<u8>> {
1279 let len = read_u32(data, pos)? as usize;
1280 if data.len().saturating_sub(*pos) < len {
1281 return Err(io::Error::new(
1282 io::ErrorKind::UnexpectedEof,
1283 "unexpected eof while reading bytes",
1284 ));
1285 }
1286 let value = data[*pos..*pos + len].to_vec();
1287 *pos += len;
1288 Ok(value)
1289}
1290
1291#[cfg(test)]
1292mod tests {
1293 use super::*;
1294 use crate::api::{DurabilityMode, GroupCommitOptions};
1295 use std::sync::{Barrier, Mutex as StdMutex, OnceLock};
1296 use std::time::SystemTime;
1297
1298 fn env_lock() -> &'static StdMutex<()> {
1303 static LOCK: OnceLock<StdMutex<()>> = OnceLock::new();
1304 LOCK.get_or_init(|| StdMutex::new(()))
1305 }
1306
1307 fn temp_wal(name: &str) -> PathBuf {
1308 let nanos = SystemTime::now()
1309 .duration_since(std::time::UNIX_EPOCH)
1310 .unwrap()
1311 .as_nanos();
1312 let path = std::env::temp_dir().join(format!(
1313 "rb_commit_coord_{}_{}_{}.wal",
1314 name,
1315 std::process::id(),
1316 nanos
1317 ));
1318 let _ = std::fs::remove_file(&path);
1319 path
1320 }
1321
    /// With the default window, many writers released at the same instant
    /// must share fsyncs: the fsync count has to be positive but strictly
    /// below the number of writers.
    #[test]
    fn group_commit_coalesces_concurrent_autocommits() {
        // Serialize against other tests that touch the env var; a poisoned
        // lock is still usable here.
        let _env = env_lock().lock().unwrap_or_else(|p| p.into_inner());
        // Make sure no override from another test or the environment applies.
        std::env::remove_var("REDDB_GROUP_COMMIT_WINDOW_US");

        let path = temp_wal("coalesce");
        let coord = Arc::new(
            StoreCommitCoordinator::open(
                path.clone(),
                DurabilityMode::WalDurableGrouped,
                GroupCommitOptions::default(),
            )
            .expect("open commit coordinator"),
        );

        const WRITERS: usize = 32;
        let barrier = Arc::new(Barrier::new(WRITERS));
        let mut handles = Vec::with_capacity(WRITERS);
        for tx in 0..WRITERS {
            let coord_c = Arc::clone(&coord);
            let barrier_c = Arc::clone(&barrier);
            handles.push(std::thread::spawn(move || {
                // Release all writers together so their appends overlap.
                barrier_c.wait();
                let action = StoreWalAction::CreateCollection {
                    name: format!("c{tx}"),
                };
                coord_c
                    .append_actions(std::slice::from_ref(&action))
                    .expect("append_actions");
            }));
        }
        for h in handles {
            h.join().expect("writer thread");
        }

        let fsyncs = coord.fsync_count();
        assert!(fsyncs > 0, "expected at least one fsync, got {fsyncs}");
        assert!(
            fsyncs < WRITERS as u64,
            "expected fsyncs ({fsyncs}) to be strictly less than \
             concurrent writers ({WRITERS}); coalescing failed"
        );

        drop(coord);
        let _ = std::fs::remove_file(&path);
    }
1383
    /// With the window forced to zero through the env var, concurrent
    /// appends must still all complete; only a lower bound of one fsync is
    /// asserted.
    #[test]
    fn zero_window_disables_coalescing_floor() {
        let _env = env_lock().lock().unwrap_or_else(|p| p.into_inner());
        // The override is read when the coordinator is opened below.
        std::env::set_var("REDDB_GROUP_COMMIT_WINDOW_US", "0");

        let path = temp_wal("zero_window");
        let coord = Arc::new(
            StoreCommitCoordinator::open(
                path.clone(),
                DurabilityMode::WalDurableGrouped,
                GroupCommitOptions::default(),
            )
            .expect("open commit coordinator"),
        );

        const WRITERS: usize = 8;
        let barrier = Arc::new(Barrier::new(WRITERS));
        let mut handles = Vec::with_capacity(WRITERS);
        for tx in 0..WRITERS {
            let coord_c = Arc::clone(&coord);
            let barrier_c = Arc::clone(&barrier);
            handles.push(std::thread::spawn(move || {
                barrier_c.wait();
                let action = StoreWalAction::CreateCollection {
                    name: format!("z{tx}"),
                };
                coord_c
                    .append_actions(std::slice::from_ref(&action))
                    .expect("append_actions");
            }));
        }
        for h in handles {
            h.join().expect("writer thread");
        }

        let fsyncs = coord.fsync_count();
        assert!(fsyncs >= 1, "expected at least one fsync, got {fsyncs}");

        // Restore the environment for whichever test runs next.
        std::env::remove_var("REDDB_GROUP_COMMIT_WINDOW_US");
        drop(coord);
        let _ = std::fs::remove_file(&path);
    }
1436
    /// Checks the precedence implemented by `resolve_window`: env var first,
    /// then a non-zero `window_ms`, then the built-in default.
    #[test]
    fn resolve_window_precedence() {
        let _env = env_lock().lock().unwrap_or_else(|p| p.into_inner());
        std::env::remove_var("REDDB_GROUP_COMMIT_WINDOW_US");
        // No env var, default options: the built-in default applies.
        let cfg = GroupCommitOptions::default();
        assert_eq!(
            StoreCommitCoordinator::resolve_window(&cfg),
            Duration::from_micros(DEFAULT_ADAPTIVE_WINDOW_US)
        );

        // No env var, explicit non-zero `window_ms`: the config applies.
        let cfg_ms = GroupCommitOptions {
            window_ms: 5,
            ..GroupCommitOptions::default()
        };
        assert_eq!(
            StoreCommitCoordinator::resolve_window(&cfg_ms),
            Duration::from_millis(5)
        );

        // Env var set: it wins over both the default and `window_ms`.
        std::env::set_var("REDDB_GROUP_COMMIT_WINDOW_US", "750");
        assert_eq!(
            StoreCommitCoordinator::resolve_window(&cfg),
            Duration::from_micros(750)
        );
        assert_eq!(
            StoreCommitCoordinator::resolve_window(&cfg_ms),
            Duration::from_micros(750)
        );

        // An explicit zero is honoured rather than treated as "unset".
        std::env::set_var("REDDB_GROUP_COMMIT_WINDOW_US", "0");
        assert_eq!(
            StoreCommitCoordinator::resolve_window(&cfg),
            Duration::from_micros(0)
        );

        std::env::remove_var("REDDB_GROUP_COMMIT_WINDOW_US");
    }
1479}