1use super::*;
2use crate::api::DurabilityMode;
3use crate::storage::wal::{WalReader, WalRecord, WalWriter};
4use std::cell::RefCell;
5use std::io;
6use std::path::{Path, PathBuf};
7use std::sync::atomic::{AtomicU64, Ordering};
8use std::sync::{Arc, Condvar, Mutex};
9
/// Default group-commit coalescing window, in microseconds. `resolve_window`
/// falls back to it when neither `REDDB_GROUP_COMMIT_WINDOW_US` nor
/// `GroupCommitOptions::window_ms` provides a value.
const DEFAULT_ADAPTIVE_WINDOW_US: u64 = 200;

/// Lock around the shared WAL writer (appends, syncs, truncation).
type WalMutex = parking_lot::Mutex<WalWriter>;

/// Lock around the group-commit bookkeeping shared by committing threads and
/// the background commit loop.
type CommitStateMutex = parking_lot::Mutex<CommitState>;
/// Condition variable paired with `CommitStateMutex`; `notify_all` is called
/// after the shared commit state is updated.
type CommitStateCondvar = parking_lot::Condvar;
use std::time::{Duration, Instant};

/// Process-wide counter handing out the `tx_id` of each `TxCommitBatch` record.
static NEXT_STORE_TX_ID: AtomicU64 = AtomicU64::new(1);
46
/// A logical store mutation recorded in the WAL.
///
/// Each variant maps one-to-one onto a `reddb_file::StoreWalActionFrame`
/// (see `to_frame` / `decode`). During replay it is applied by
/// `UnifiedStore::apply_replayed_action`.
#[derive(Debug, Clone)]
pub(crate) enum StoreWalAction {
    /// Create a collection; replay skips it when the collection already exists.
    CreateCollection {
        name: String,
    },
    /// Remove a collection and the index entries of its entities.
    DropCollection {
        name: String,
    },
    /// Insert or replace one entity, given as a serialized entity record.
    UpsertEntityRecord {
        collection: String,
        record: Vec<u8>,
    },
    /// Delete one entity by its raw id.
    DeleteEntityRecord {
        collection: String,
        entity_id: u64,
    },
    /// Insert or replace several serialized entity records in one action.
    BulkUpsertEntityRecords {
        collection: String,
        records: Vec<Vec<u8>>,
    },
    /// Replace the whole contents of a collection with the given serialized records.
    RefreshCollection {
        collection: String,
        records: Vec<Vec<u8>>,
    },
}
83
84#[derive(Debug, Default)]
85pub(crate) struct DeferredStoreWalActions {
86 actions: Vec<StoreWalAction>,
87}
88
89impl DeferredStoreWalActions {
90 pub(crate) fn is_empty(&self) -> bool {
91 self.actions.is_empty()
92 }
93
94 pub(crate) fn extend(&mut self, other: Self) {
95 self.actions.extend(other.actions);
96 }
97}
98
99thread_local! {
100 static DEFERRED_STORE_WAL_ACTIONS: RefCell<Option<Vec<StoreWalAction>>> =
101 const { RefCell::new(None) };
102}
103
104fn begin_deferred_store_wal_capture() {
105 DEFERRED_STORE_WAL_ACTIONS.with(|cell| {
106 let mut guard = cell.borrow_mut();
107 debug_assert!(guard.is_none());
108 *guard = Some(Vec::new());
109 });
110}
111
112fn capture_deferred_store_wal_actions(actions: Vec<StoreWalAction>) -> bool {
113 DEFERRED_STORE_WAL_ACTIONS.with(|cell| {
114 let mut guard = cell.borrow_mut();
115 if let Some(pending) = guard.as_mut() {
116 pending.extend(actions);
117 true
118 } else {
119 false
120 }
121 })
122}
123
124fn deferred_store_wal_capture_active() -> bool {
125 DEFERRED_STORE_WAL_ACTIONS.with(|cell| cell.borrow().is_some())
126}
127
128fn take_deferred_store_wal_capture() -> DeferredStoreWalActions {
129 DEFERRED_STORE_WAL_ACTIONS.with(|cell| DeferredStoreWalActions {
130 actions: cell.borrow_mut().take().unwrap_or_default(),
131 })
132}
133
134impl StoreWalAction {
135 pub(crate) fn upsert_entity(
136 collection: &str,
137 entity: &UnifiedEntity,
138 metadata: Option<&Metadata>,
139 format_version: u32,
140 ) -> Self {
141 Self::UpsertEntityRecord {
142 collection: collection.to_string(),
143 record: UnifiedStore::serialize_entity_record(entity, metadata, format_version),
144 }
145 }
146
147 fn encode(&self) -> Vec<u8> {
148 reddb_file::encode_store_wal_action_frame(&self.to_frame())
149 .expect("encode store wal action frame")
150 }
151
152 fn to_frame(&self) -> reddb_file::StoreWalActionFrame {
153 match self {
154 Self::CreateCollection { name } => {
155 reddb_file::StoreWalActionFrame::CreateCollection { name: name.clone() }
156 }
157 Self::DropCollection { name } => {
158 reddb_file::StoreWalActionFrame::DropCollection { name: name.clone() }
159 }
160 Self::UpsertEntityRecord { collection, record } => {
161 reddb_file::StoreWalActionFrame::UpsertEntityRecord {
162 collection: collection.clone(),
163 record: record.clone(),
164 }
165 }
166 Self::DeleteEntityRecord {
167 collection,
168 entity_id,
169 } => reddb_file::StoreWalActionFrame::DeleteEntityRecord {
170 collection: collection.clone(),
171 entity_id: *entity_id,
172 },
173 Self::BulkUpsertEntityRecords {
174 collection,
175 records,
176 } => reddb_file::StoreWalActionFrame::BulkUpsertEntityRecords {
177 collection: collection.clone(),
178 records: records.clone(),
179 },
180 Self::RefreshCollection {
181 collection,
182 records,
183 } => reddb_file::StoreWalActionFrame::RefreshCollection {
184 collection: collection.clone(),
185 records: records.clone(),
186 },
187 }
188 }
189
190 fn decode(bytes: &[u8]) -> io::Result<Self> {
191 match reddb_file::decode_store_wal_action_frame(bytes)? {
192 reddb_file::StoreWalActionFrame::CreateCollection { name } => {
193 Ok(Self::CreateCollection { name })
194 }
195 reddb_file::StoreWalActionFrame::DropCollection { name } => {
196 Ok(Self::DropCollection { name })
197 }
198 reddb_file::StoreWalActionFrame::UpsertEntityRecord { collection, record } => {
199 Ok(Self::UpsertEntityRecord { collection, record })
200 }
201 reddb_file::StoreWalActionFrame::DeleteEntityRecord {
202 collection,
203 entity_id,
204 } => Ok(Self::DeleteEntityRecord {
205 collection,
206 entity_id,
207 }),
208 reddb_file::StoreWalActionFrame::BulkUpsertEntityRecords {
209 collection,
210 records,
211 } => Ok(Self::BulkUpsertEntityRecords {
212 collection,
213 records,
214 }),
215 reddb_file::StoreWalActionFrame::RefreshCollection {
216 collection,
217 records,
218 } => Ok(Self::RefreshCollection {
219 collection,
220 records,
221 }),
222 }
223 }
224}
225
/// Group-commit bookkeeping shared, behind `CommitStateMutex`, between
/// committing threads and the background commit loop.
#[derive(Debug)]
struct CommitState {
    /// Last LSN reported durable by the WAL writer after a sync.
    durable_lsn: u64,
    /// Highest LSN any committer has asked to be made durable.
    pending_target_lsn: u64,
    /// Statements registered since the pending counters were last reset.
    pending_statements: usize,
    /// Payload bytes registered since the pending counters were last reset.
    pending_wal_bytes: u64,
    /// When the first statement since the last reset was registered.
    first_pending_at: Option<Instant>,
    /// Set on drop to stop the background loop.
    shutdown: bool,
    /// Text of the most recent flush failure, if not yet cleared.
    last_error: Option<String>,
}

impl CommitState {
    /// Creates an idle state with durable and target positions at `initial_durable_lsn`.
    fn new(initial_durable_lsn: u64) -> Self {
        let lsn = initial_durable_lsn;
        Self {
            shutdown: false,
            last_error: None,
            first_pending_at: None,
            pending_statements: 0,
            pending_wal_bytes: 0,
            pending_target_lsn: lsn,
            durable_lsn: lsn,
        }
    }
}
250
251pub(crate) struct WalAppendQueue {
265 pending: parking_lot::Mutex<WalQueueState>,
280}
281
282struct WalQueueState {
283 next_lsn: u64,
284 entries: Vec<(u64, Vec<u8>)>,
285}
286
287impl WalAppendQueue {
288 fn new(initial_lsn: u64) -> Self {
289 Self {
290 pending: parking_lot::Mutex::new(WalQueueState {
291 next_lsn: initial_lsn,
292 entries: Vec::with_capacity(64),
293 }),
294 }
295 }
296
297 fn enqueue(&self, bytes: Vec<u8>) -> u64 {
303 let len = bytes.len() as u64;
304 let mut state = self.pending.lock();
305 let start_lsn = state.next_lsn;
306 state.next_lsn = start_lsn + len;
307 state.entries.push((start_lsn, bytes));
308 start_lsn + len
309 }
310
311 fn drain_sorted(&self) -> Vec<(u64, Vec<u8>)> {
315 let mut state = self.pending.lock();
316 let mut v = std::mem::take(&mut state.entries);
317 drop(state);
318 v.sort_by_key(|(lsn, _)| *lsn);
319 v
320 }
321
322 fn has_pending(&self) -> bool {
325 !self.pending.lock().entries.is_empty()
326 }
327
328 fn reset(&self, next_lsn: u64) {
334 let mut state = self.pending.lock();
335 state.next_lsn = next_lsn;
336 state.entries.clear();
337 }
338}
339
340pub(crate) struct StoreCommitCoordinator {
341 mode: DurabilityMode,
342 config: crate::api::GroupCommitOptions,
343 wal_path: PathBuf,
344 wal: Arc<WalMutex>,
345 queue: Arc<WalAppendQueue>,
351 state: Arc<(CommitStateMutex, CommitStateCondvar)>,
352 fsync_count: Arc<AtomicU64>,
358}
359
360impl StoreCommitCoordinator {
361 pub(crate) fn should_open(path: &Path, mode: DurabilityMode) -> bool {
362 matches!(
363 mode,
364 DurabilityMode::WalDurableGrouped | DurabilityMode::Async
365 ) || path.exists()
366 }
367
368 pub(crate) fn open(
369 wal_path: impl Into<PathBuf>,
370 mode: DurabilityMode,
371 config: crate::api::GroupCommitOptions,
372 ) -> io::Result<Self> {
373 let wal_path = wal_path.into();
374 let wal = WalWriter::open(&wal_path)?;
375 let initial_durable_lsn = wal.durable_lsn();
376 let initial_current_lsn = wal.current_lsn();
377 let wal = Arc::new(WalMutex::new(wal));
378 let queue = Arc::new(WalAppendQueue::new(initial_current_lsn));
379 let state = Arc::new((
380 CommitStateMutex::new(CommitState::new(initial_durable_lsn)),
381 CommitStateCondvar::new(),
382 ));
383 let fsync_count = Arc::new(AtomicU64::new(0));
384
385 if matches!(
386 mode,
387 DurabilityMode::WalDurableGrouped | DurabilityMode::Async
388 ) {
389 let wal_bg = Arc::clone(&wal);
390 let queue_bg = Arc::clone(&queue);
391 let state_bg = Arc::clone(&state);
392 let fsync_bg = Arc::clone(&fsync_count);
393 let window = Self::resolve_window(&config);
408 let max_statements = config.max_statements.max(1);
409 let max_wal_bytes = config.max_wal_bytes.max(1);
410 std::thread::spawn(move || {
411 Self::run_group_commit_loop(
412 wal_bg,
413 queue_bg,
414 state_bg,
415 fsync_bg,
416 window,
417 max_statements,
418 max_wal_bytes,
419 );
420 });
421 }
422
423 Ok(Self {
424 mode,
425 config,
426 wal_path,
427 wal,
428 queue,
429 state,
430 fsync_count,
431 })
432 }
433
434 fn resolve_window(config: &crate::api::GroupCommitOptions) -> Duration {
443 if let Ok(raw) = std::env::var("REDDB_GROUP_COMMIT_WINDOW_US") {
444 if let Ok(parsed) = raw.parse::<u64>() {
445 return Duration::from_micros(parsed);
446 }
447 }
448 if config.window_ms != 0 {
449 return Duration::from_millis(config.window_ms);
450 }
451 Duration::from_micros(DEFAULT_ADAPTIVE_WINDOW_US)
452 }
453
454 #[cfg(test)]
458 pub(crate) fn fsync_count(&self) -> u64 {
459 self.fsync_count.load(Ordering::Relaxed)
460 }
461
462 pub(crate) fn append_actions(&self, actions: &[StoreWalAction]) -> io::Result<()> {
463 if actions.is_empty() {
464 return Ok(());
465 }
466
467 let tx_id = NEXT_STORE_TX_ID.fetch_add(1, Ordering::SeqCst);
468
469 if matches!(self.mode, DurabilityMode::Strict) {
474 let commit_lsn = {
475 let mut wal = self.wal.lock();
476 wal.append(&WalRecord::TxCommitBatch {
477 tx_id,
478 actions: actions.iter().map(StoreWalAction::encode).collect(),
479 })?;
480 wal.current_lsn()
481 };
482 self.force_sync()?;
483 let _ = commit_lsn;
484 return Ok(());
485 }
486
487 let encoded_actions: Vec<Vec<u8>> = actions.iter().map(StoreWalAction::encode).collect();
491 let wal_bytes = encoded_actions.iter().fold(0u64, |total, payload| {
492 total.saturating_add(payload.len() as u64)
493 });
494 let blob = WalRecord::TxCommitBatch {
495 tx_id,
496 actions: encoded_actions,
497 }
498 .encode();
499
500 let commit_lsn = self.queue.enqueue(blob);
501 self.wait_until_durable(commit_lsn, wal_bytes)?;
502 Ok(())
503 }
504
505 pub(crate) fn append_single_record(&self, record: WalRecord) -> io::Result<()> {
510 let blob = record.encode();
511 let wal_bytes = blob.len() as u64;
512 if matches!(self.mode, DurabilityMode::Strict) {
513 {
514 let mut wal = self.wal.lock();
515 wal.append(&record)?;
516 }
517 self.force_sync()?;
518 return Ok(());
519 }
520 let commit_lsn = self.queue.enqueue(blob);
521 self.wait_until_durable(commit_lsn, wal_bytes)?;
522 Ok(())
523 }
524
525 pub(crate) fn force_sync(&self) -> io::Result<()> {
526 {
527 let mut wal = self.wal.lock();
528 wal.sync()?;
529 self.fsync_count.fetch_add(1, Ordering::Relaxed);
530 let durable = wal.durable_lsn();
531 drop(wal);
532 let (state_lock, cond) = &*self.state;
533 let mut state = state_lock.lock();
534 state.durable_lsn = durable;
535 state.pending_target_lsn = durable.max(state.pending_target_lsn);
536 state.pending_statements = 0;
537 state.pending_wal_bytes = 0;
538 state.first_pending_at = None;
539 state.last_error = None;
540 cond.notify_all();
541 }
542 Ok(())
543 }
544
545 pub(crate) fn truncate(&self) -> io::Result<()> {
546 let mut wal = self.wal.lock();
547 wal.truncate()?;
548 let durable = wal.durable_lsn();
549 let current = wal.current_lsn();
550 drop(wal);
551
552 self.queue.reset(current);
557
558 let (state_lock, cond) = &*self.state;
559 let mut state = state_lock.lock();
560 state.durable_lsn = durable;
561 state.pending_target_lsn = durable;
562 state.pending_statements = 0;
563 state.pending_wal_bytes = 0;
564 state.first_pending_at = None;
565 state.last_error = None;
566 cond.notify_all();
567 Ok(())
568 }
569
570 pub(crate) fn replay_into(&self, store: &UnifiedStore) -> io::Result<()> {
571 if !self.wal_path.exists() {
572 return Ok(());
573 }
574
575 let reader = match WalReader::open(&self.wal_path) {
576 Ok(reader) => reader,
577 Err(err) if err.kind() == io::ErrorKind::NotFound => return Ok(()),
578 Err(err) => return Err(err),
579 };
580
581 let mut tx_states = std::collections::HashMap::<u64, bool>::new();
582 let mut pending = Vec::<(u64, Vec<u8>)>::new();
583
584 for record in reader.iter() {
585 let (_lsn, record) = match record {
586 Ok(record) => record,
587 Err(err) if err.kind() == io::ErrorKind::UnexpectedEof => break,
588 Err(err) => return Err(err),
589 };
590 match record {
591 WalRecord::TxCommitBatch { actions, .. } => {
592 for payload in actions {
593 let action = StoreWalAction::decode(&payload)?;
594 store.apply_replayed_action(&action).map_err(|err| {
595 io::Error::other(format!("failed to replay store wal action: {err}"))
596 })?;
597 }
598 }
599 WalRecord::Begin { tx_id } => {
600 tx_states.insert(tx_id, false);
601 }
602 WalRecord::Commit { tx_id } => {
603 tx_states.insert(tx_id, true);
604 }
605 WalRecord::Rollback { tx_id } => {
606 tx_states.remove(&tx_id);
607 }
608 WalRecord::PageWrite {
609 tx_id,
610 page_id: _,
611 data,
612 } => pending.push((tx_id, data)),
613 WalRecord::Checkpoint { .. } => {}
614 WalRecord::VectorInsert {
615 collection,
616 entity_id,
617 vector,
618 } => {
619 let mut map = store.replayed_turbo_inserts.lock();
625 map.entry(collection).or_default().push((entity_id, vector));
626 }
627 WalRecord::FullPageImage { .. } => {
628 }
630 }
631 }
632
633 for (tx_id, payload) in pending {
634 if !tx_states.get(&tx_id).copied().unwrap_or(false) {
635 continue;
636 }
637 let action = StoreWalAction::decode(&payload)?;
638 store.apply_replayed_action(&action).map_err(|err| {
639 io::Error::other(format!("failed to replay store wal action: {err}"))
640 })?;
641 }
642
643 Ok(())
644 }
645
646 fn wait_until_durable(&self, target_lsn: u64, wal_bytes: u64) -> io::Result<()> {
647 match self.mode {
648 DurabilityMode::Strict => self.force_sync(),
649 DurabilityMode::Async => {
654 let (state_lock, cond) = &*self.state;
655 let mut state = state_lock.lock();
656 state.pending_target_lsn = state.pending_target_lsn.max(target_lsn);
657 state.pending_statements = state.pending_statements.saturating_add(1);
658 state.pending_wal_bytes = state.pending_wal_bytes.saturating_add(wal_bytes);
659 state.first_pending_at.get_or_insert_with(Instant::now);
660 cond.notify_all();
661 Ok(())
662 }
663 DurabilityMode::WalDurableGrouped => {
664 let (state_lock, cond) = &*self.state;
665 let mut state = state_lock.lock();
666 state.pending_target_lsn = state.pending_target_lsn.max(target_lsn);
667 state.pending_statements = state.pending_statements.saturating_add(1);
668 state.pending_wal_bytes = state.pending_wal_bytes.saturating_add(wal_bytes);
669 state.first_pending_at.get_or_insert_with(Instant::now);
670 cond.notify_all();
671
672 loop {
673 if let Some(err) = state.last_error.clone() {
674 return Err(io::Error::other(err));
675 }
676 if state.durable_lsn >= target_lsn {
677 return Ok(());
678 }
679 cond.wait(&mut state);
682 }
683 }
684 }
685 }
686
687 fn run_group_commit_loop(
688 wal: Arc<WalMutex>,
689 queue: Arc<WalAppendQueue>,
690 state: Arc<(CommitStateMutex, CommitStateCondvar)>,
691 fsync_count: Arc<AtomicU64>,
692 window: Duration,
693 max_statements: usize,
694 max_wal_bytes: u64,
695 ) {
696 let (state_lock, cond) = &*state;
697 loop {
698 let target_lsn = {
699 let mut guard = state_lock.lock();
700
701 while !guard.shutdown && guard.pending_target_lsn <= guard.durable_lsn {
702 cond.wait(&mut guard);
703 }
704
705 if guard.shutdown {
706 return;
707 }
708
709 let immediate = window.is_zero()
710 || guard.pending_statements >= max_statements
711 || guard.pending_wal_bytes >= max_wal_bytes;
712
713 if !immediate {
714 let deadline = guard.first_pending_at.unwrap_or_else(Instant::now) + window;
715 let now = Instant::now();
716 if now < deadline {
717 let timeout = deadline.saturating_duration_since(now);
718 let _ = cond.wait_for(&mut guard, timeout);
719 if guard.shutdown {
720 return;
721 }
722 if guard.pending_target_lsn <= guard.durable_lsn {
723 continue;
724 }
725 let should_wait_again = guard.pending_statements < max_statements
726 && guard.pending_wal_bytes < max_wal_bytes
727 && guard
728 .first_pending_at
729 .map(|first| first.elapsed() < window)
730 .unwrap_or(false);
731 if should_wait_again {
732 continue;
733 }
734 }
735 }
736
737 guard.pending_target_lsn
738 };
739
740 let batches = queue.drain_sorted();
746
747 let sync_result = {
748 let mut wal = wal.lock();
749 let mut write_err: Option<io::Error> = None;
750 for (_lsn, bytes) in batches {
751 if let Err(e) = wal.append_bytes(&bytes) {
752 write_err = Some(e);
753 break;
754 }
755 }
756 match write_err {
757 Some(e) => Err(e),
758 None => wal.sync().map(|_| {
759 fsync_count.fetch_add(1, Ordering::Relaxed);
764 wal.durable_lsn()
765 }),
766 }
767 };
768
769 let more_pending = queue.has_pending();
774
775 let mut guard = state_lock.lock();
776 match sync_result {
777 Ok(durable_lsn) => {
778 guard.durable_lsn = durable_lsn;
779 if !more_pending {
780 guard.pending_statements = 0;
781 guard.pending_wal_bytes = 0;
782 guard.first_pending_at = None;
783 }
784 guard.last_error = None;
785 let _ = target_lsn;
786 }
787 Err(err) => {
788 guard.last_error = Some(err.to_string());
789 }
790 }
791 cond.notify_all();
792 }
793 }
794}
795
796impl Drop for StoreCommitCoordinator {
797 fn drop(&mut self) {
798 let (state_lock, cond) = &*self.state;
799 let mut state = state_lock.lock();
801 state.shutdown = true;
802 cond.notify_all();
803 }
804}
805
806impl UnifiedStore {
807 pub(crate) fn begin_deferred_store_wal_capture() {
808 begin_deferred_store_wal_capture();
809 }
810
811 pub(crate) fn take_deferred_store_wal_capture() -> DeferredStoreWalActions {
812 take_deferred_store_wal_capture()
813 }
814
815 pub(crate) fn append_deferred_store_wal_actions(
816 &self,
817 actions: DeferredStoreWalActions,
818 ) -> Result<(), StoreError> {
819 if actions.actions.is_empty() {
820 return Ok(());
821 }
822 if self.config.embedded_wal_path.is_some() {
823 return self.append_embedded_store_wal_actions(&actions.actions);
824 }
825 match self.config.durability_mode {
826 DurabilityMode::Strict => self.flush_paged_state(),
827 DurabilityMode::WalDurableGrouped | DurabilityMode::Async => {
828 if let Some(commit) = &self.commit {
829 commit
830 .append_actions(&actions.actions)
831 .map_err(StoreError::Io)
832 } else {
833 self.flush_paged_state()
834 }
835 }
836 }
837 }
838
839 pub(crate) fn append_vector_insert_record(
846 &self,
847 collection: &str,
848 entity_id: u64,
849 vector: &[f32],
850 ) -> std::io::Result<()> {
851 let Some(commit) = &self.commit else {
852 return Ok(());
853 };
854 let record = WalRecord::VectorInsert {
855 collection: collection.to_string(),
856 entity_id,
857 vector: vector.to_vec(),
858 };
859 commit.append_single_record(record)
860 }
861
862 pub(crate) fn finish_paged_write(
863 &self,
864 actions: impl IntoIterator<Item = StoreWalAction>,
865 ) -> Result<(), StoreError> {
866 let actions: Vec<StoreWalAction> = actions.into_iter().collect();
867 if deferred_store_wal_capture_active() {
868 let captured = capture_deferred_store_wal_actions(actions);
869 debug_assert!(captured);
870 return Ok(());
871 }
872 if self.config.embedded_wal_path.is_some() {
873 return self.append_embedded_store_wal_actions(&actions);
874 }
875 match self.config.durability_mode {
876 DurabilityMode::Strict => self.flush_paged_state(),
877 DurabilityMode::WalDurableGrouped | DurabilityMode::Async => {
878 if let Some(commit) = &self.commit {
879 commit.append_actions(&actions).map_err(StoreError::Io)?;
880 Ok(())
881 } else {
882 self.flush_paged_state()
883 }
884 }
885 }
886 }
887
888 pub(crate) fn apply_encoded_store_wal_action(&self, payload: &[u8]) -> Result<(), StoreError> {
889 let action = StoreWalAction::decode(payload).map_err(StoreError::Io)?;
890 self.apply_replayed_action(&action)
891 }
892
893 fn append_embedded_store_wal_actions(
894 &self,
895 actions: &[StoreWalAction],
896 ) -> Result<(), StoreError> {
897 let Some(path) = self.config.embedded_wal_path.as_deref() else {
898 return Ok(());
899 };
900 if actions.is_empty() {
901 return Ok(());
902 }
903 let payloads: Vec<Vec<u8>> = actions.iter().map(StoreWalAction::encode).collect();
904 let encoded_len = crate::storage::EmbeddedRdbArtifact::wal_payloads_encoded_len(&payloads)
905 .map_err(|err| StoreError::Io(std::io::Error::other(err.to_string())))?;
906 match crate::storage::EmbeddedRdbArtifact::append_wal_payloads(path, &payloads) {
907 Ok(_) => Ok(()),
908 Err(crate::api::RedDBError::InvalidOperation(msg))
909 if msg.contains("embedded wal region full") =>
910 {
911 let snapshot = self.to_binary_dump_bytes();
912 crate::storage::EmbeddedRdbArtifact::write_snapshot_with_wal_capacity(
913 path,
914 &snapshot,
915 encoded_len,
916 )
917 .map_err(|err| StoreError::Io(std::io::Error::other(err.to_string())))?;
918 crate::storage::EmbeddedRdbArtifact::append_wal_payloads(path, &payloads)
919 .map(|_| ())
920 .map_err(|err| StoreError::Io(std::io::Error::other(err.to_string())))
921 }
922 Err(err) => Err(StoreError::Io(std::io::Error::other(err.to_string()))),
923 }
924 }
925
926 pub(crate) fn apply_replayed_action(&self, action: &StoreWalAction) -> Result<(), StoreError> {
927 match action {
928 StoreWalAction::CreateCollection { name } => {
929 if self.get_collection(name).is_none() {
930 let _ = self.create_collection_in_memory(name);
931 }
932 Ok(())
933 }
934 StoreWalAction::DropCollection { name } => self.drop_collection_in_memory(name),
935 StoreWalAction::UpsertEntityRecord { collection, record } => {
936 self.apply_replayed_upsert(collection, record)
937 }
938 StoreWalAction::DeleteEntityRecord {
939 collection,
940 entity_id,
941 } => self.apply_replayed_delete(collection, EntityId::new(*entity_id)),
942 StoreWalAction::BulkUpsertEntityRecords {
943 collection,
944 records,
945 } => {
946 for record in records {
947 self.apply_replayed_upsert(collection, record)?;
948 }
949 Ok(())
950 }
951 StoreWalAction::RefreshCollection {
952 collection,
953 records,
954 } => self.apply_replayed_refresh_collection(collection, records),
955 }
956 }
957
958 pub fn refresh_collection(
975 &self,
976 name: &str,
977 entities: Vec<UnifiedEntity>,
978 ) -> Result<Vec<Vec<u8>>, StoreError> {
979 let fv = self.format_version();
980
981 let new_manager = Arc::new(SegmentManager::with_config(
984 name,
985 self.config.manager_config.clone(),
986 ));
987
988 let mut prepared = entities;
989 for entity in &mut prepared {
990 if entity.id.raw() == 0 {
991 entity.id = self.next_entity_id();
992 } else {
993 self.register_entity_id(entity.id);
994 }
995 if let EntityKind::TableRow { ref mut row_id, .. } = entity.kind {
996 if *row_id == 0 {
997 *row_id = new_manager.next_row_id();
998 } else {
999 new_manager.register_row_id(*row_id);
1000 }
1001 }
1002 entity.ensure_table_logical_id();
1003 }
1004
1005 let serialized: Vec<Vec<u8>> = prepared
1006 .iter()
1007 .map(|e| Self::serialize_entity_record(e, None, fv))
1008 .collect();
1009
1010 new_manager.bulk_insert(prepared.clone()).map_err(|e| {
1011 StoreError::Io(std::io::Error::other(format!(
1012 "refresh_collection: bulk_insert into new manager failed: {e}"
1013 )))
1014 })?;
1015
1016 self.swap_collection_state(name, new_manager, &prepared, &serialized);
1017
1018 self.finish_paged_write([StoreWalAction::RefreshCollection {
1019 collection: name.to_string(),
1020 records: serialized.clone(),
1021 }])?;
1022
1023 Ok(serialized)
1024 }
1025
1026 fn swap_collection_state(
1032 &self,
1033 name: &str,
1034 new_manager: Arc<SegmentManager>,
1035 prepared: &[UnifiedEntity],
1036 serialized: &[Vec<u8>],
1037 ) {
1038 {
1039 let mut collections = self.collections.write();
1040 collections.insert(name.to_string(), new_manager);
1041 }
1042
1043 self.entity_cache
1044 .retain(|_, (collection, _)| collection != name);
1045 self.remove_from_graph_label_index_batch(
1046 name,
1047 &prepared.iter().map(|e| e.id).collect::<Vec<_>>(),
1048 );
1049
1050 if let Some(pager) = &self.pager {
1051 let new_btree = Arc::new(BTree::new(Arc::clone(pager)));
1052 let mut sorted: Vec<(Vec<u8>, Vec<u8>)> = prepared
1053 .iter()
1054 .zip(serialized.iter())
1055 .map(|(e, r)| (e.id.raw().to_be_bytes().to_vec(), r.clone()))
1056 .collect();
1057 sorted.sort_by(|a, b| a.0.cmp(&b.0));
1058 if !sorted.is_empty() {
1059 let _ = new_btree.bulk_insert_sorted(&sorted);
1060 }
1061 self.btree_indices
1062 .write()
1063 .insert(name.to_string(), new_btree);
1064 self.mark_paged_registry_dirty();
1065 }
1066 }
1067
1068 fn apply_replayed_refresh_collection(
1069 &self,
1070 collection: &str,
1071 records: &[Vec<u8>],
1072 ) -> Result<(), StoreError> {
1073 let new_manager = Arc::new(SegmentManager::with_config(
1074 collection,
1075 self.config.manager_config.clone(),
1076 ));
1077
1078 let mut prepared: Vec<UnifiedEntity> = Vec::with_capacity(records.len());
1079 for record in records {
1080 let (entity, _metadata) =
1081 Self::deserialize_entity_record(record, self.format_version())?;
1082 self.register_entity_id(entity.id);
1083 if let EntityKind::TableRow { row_id, .. } = &entity.kind {
1084 new_manager.register_row_id(*row_id);
1085 }
1086 prepared.push(entity);
1087 }
1088
1089 if !prepared.is_empty() {
1090 new_manager.bulk_insert(prepared.clone()).map_err(|e| {
1091 StoreError::Io(std::io::Error::other(format!(
1092 "replay refresh_collection: bulk_insert failed: {e}"
1093 )))
1094 })?;
1095 }
1096
1097 self.swap_collection_state(collection, new_manager, &prepared, records);
1098 Ok(())
1099 }
1100
1101 pub fn refresh_collection_from_records(
1115 &self,
1116 name: &str,
1117 records: Vec<Vec<u8>>,
1118 ) -> Result<(), StoreError> {
1119 self.apply_replayed_refresh_collection(name, &records)?;
1120 self.finish_paged_write([StoreWalAction::RefreshCollection {
1121 collection: name.to_string(),
1122 records,
1123 }])?;
1124 Ok(())
1125 }
1126
1127 pub(crate) fn create_collection_in_memory(&self, name: &str) -> Result<(), StoreError> {
1128 let mut collections = self.collections.write();
1129 if collections.contains_key(name) {
1130 return Ok(());
1131 }
1132 let manager = SegmentManager::with_config(name, self.config.manager_config.clone());
1133 collections.insert(name.to_string(), Arc::new(manager));
1134 self.mark_paged_registry_dirty();
1135 Ok(())
1136 }
1137
1138 fn drop_collection_in_memory(&self, name: &str) -> Result<(), StoreError> {
1139 let manager = {
1140 let mut collections = self.collections.write();
1141 match collections.remove(name) {
1142 Some(manager) => manager,
1143 None => return Ok(()),
1144 }
1145 };
1146
1147 let entities = manager.query_all(|_| true);
1148 let entity_ids: Vec<EntityId> = entities.iter().map(|entity| entity.id).collect();
1149 for entity_id in &entity_ids {
1150 self.context_index.remove_entity(*entity_id);
1151 let _ = self.unindex_cross_refs(*entity_id);
1152 }
1153 self.btree_indices.write().remove(name);
1154 self.entity_cache.retain(|entity_id, (collection, _)| {
1155 collection != name && !entity_ids.iter().any(|id| id.raw() == entity_id)
1156 });
1157 self.remove_from_graph_label_index_batch(name, &entity_ids);
1158 self.mark_paged_registry_dirty();
1159 Ok(())
1160 }
1161
1162 fn apply_replayed_upsert(&self, collection: &str, record: &[u8]) -> Result<(), StoreError> {
1163 self.create_collection_in_memory(collection)?;
1164 let (entity, metadata) = Self::deserialize_entity_record(record, self.format_version())?;
1165 let manager = self
1166 .get_collection(collection)
1167 .ok_or_else(|| StoreError::CollectionNotFound(collection.to_string()))?;
1168
1169 self.register_entity_id(entity.id);
1170 if let EntityKind::TableRow { row_id, .. } = &entity.kind {
1171 manager.register_row_id(*row_id);
1172 }
1173
1174 self.context_index.remove_entity(entity.id);
1175 let _ = self.unindex_cross_refs(entity.id);
1176 self.remove_from_graph_label_index(collection, entity.id);
1177
1178 if manager.get(entity.id).is_some() {
1179 manager
1180 .update_with_metadata(entity.clone(), metadata.as_ref())
1181 .map_err(StoreError::from)?;
1182 } else {
1183 manager.insert(entity.clone())?;
1184 if let Some(metadata) = metadata.as_ref() {
1185 manager.set_metadata(entity.id, metadata.clone())?;
1186 }
1187 }
1188
1189 self.context_index.index_entity(collection, &entity);
1190 if let EntityKind::GraphNode(node) = &entity.kind {
1191 self.update_graph_label_index(collection, &node.label, entity.id);
1192 }
1193 self.index_cross_refs(&entity, collection)?;
1194
1195 if let Some(pager) = &self.pager {
1196 let mut btree_indices = self.btree_indices.write();
1197 let btree = btree_indices
1198 .entry(collection.to_string())
1199 .or_insert_with(|| Arc::new(BTree::new(Arc::clone(pager))));
1200 let root_before = btree.root_page_id();
1201 let key = entity.id.raw().to_be_bytes();
1202 match btree.insert(&key, record) {
1203 Ok(_) => {}
1204 Err(BTreeError::DuplicateKey) => {
1205 let _ = btree.delete(&key);
1206 let _ = btree.insert(&key, record);
1207 }
1208 Err(err) => {
1209 return Err(StoreError::Io(io::Error::other(format!(
1210 "replay upsert btree error: {err}"
1211 ))));
1212 }
1213 }
1214 if root_before != btree.root_page_id() {
1215 self.mark_paged_registry_dirty();
1216 }
1217 }
1218
1219 Ok(())
1220 }
1221
1222 fn apply_replayed_delete(&self, collection: &str, id: EntityId) -> Result<(), StoreError> {
1223 self.entity_cache.remove(id.raw());
1224 if let Some(manager) = self.get_collection(collection) {
1225 let deleted = manager.delete(id)?;
1226 if !deleted {
1227 return Ok(());
1228 }
1229 } else {
1230 return Ok(());
1231 }
1232
1233 if let Some(_pager) = &self.pager {
1234 let btree_indices = self.btree_indices.read();
1235 if let Some(btree) = btree_indices.get(collection) {
1236 let root_before = btree.root_page_id();
1237 let key = id.raw().to_be_bytes();
1238 let _ = btree.delete(&key);
1239 if root_before != btree.root_page_id() {
1240 self.mark_paged_registry_dirty();
1241 }
1242 }
1243 }
1244
1245 let _ = self.unindex_cross_refs(id);
1246 self.remove_from_graph_label_index(collection, id);
1247 self.context_index.remove_entity(id);
1248 Ok(())
1249 }
1250}
1251
#[cfg(test)]
mod tests {
    use super::*;
    use crate::api::{DurabilityMode, GroupCommitOptions};
    use std::sync::{Barrier, Mutex as StdMutex, OnceLock};
    use std::time::SystemTime;

    /// Environment override read by `StoreCommitCoordinator::resolve_window`.
    const WINDOW_ENV: &str = "REDDB_GROUP_COMMIT_WINDOW_US";

    /// Serializes tests that read or mutate process-wide environment variables.
    fn env_lock() -> &'static StdMutex<()> {
        static LOCK: OnceLock<StdMutex<()>> = OnceLock::new();
        LOCK.get_or_init(|| StdMutex::new(()))
    }

    /// Clears `WINDOW_ENV` when dropped, so a failing assertion cannot leak an
    /// override into tests that run afterwards.
    struct WindowEnvReset;

    impl Drop for WindowEnvReset {
        fn drop(&mut self) {
            std::env::remove_var(WINDOW_ENV);
        }
    }

    /// Returns a fresh temp-dir WAL path unique to this process and instant.
    fn temp_wal(name: &str) -> PathBuf {
        let nanos = SystemTime::now()
            .duration_since(std::time::UNIX_EPOCH)
            .unwrap()
            .as_nanos();
        let path = reddb_file::layout::store_commit_coord_temp_wal_path(
            &std::env::temp_dir(),
            name,
            std::process::id(),
            nanos,
        );
        let _ = std::fs::remove_file(&path);
        path
    }

    /// Concurrent grouped commits must share fsyncs (fewer fsyncs than writers).
    #[test]
    fn group_commit_coalesces_concurrent_autocommits() {
        let _env = env_lock().lock().unwrap_or_else(|p| p.into_inner());
        let _reset = WindowEnvReset;
        std::env::remove_var(WINDOW_ENV);

        let path = temp_wal("coalesce");
        let coord = Arc::new(
            StoreCommitCoordinator::open(
                path.clone(),
                DurabilityMode::WalDurableGrouped,
                GroupCommitOptions::default(),
            )
            .expect("open commit coordinator"),
        );

        const WRITERS: usize = 32;
        let barrier = Arc::new(Barrier::new(WRITERS));
        let mut handles = Vec::with_capacity(WRITERS);
        for tx in 0..WRITERS {
            let coord_c = Arc::clone(&coord);
            let barrier_c = Arc::clone(&barrier);
            handles.push(std::thread::spawn(move || {
                barrier_c.wait();
                let action = StoreWalAction::CreateCollection {
                    name: format!("c{tx}"),
                };
                coord_c
                    .append_actions(std::slice::from_ref(&action))
                    .expect("append_actions");
            }));
        }
        for h in handles {
            h.join().expect("writer thread");
        }

        let fsyncs = coord.fsync_count();
        assert!(fsyncs > 0, "expected at least one fsync, got {fsyncs}");
        assert!(
            fsyncs < WRITERS as u64,
            "expected fsyncs ({fsyncs}) to be strictly less than \
             concurrent writers ({WRITERS}); coalescing failed"
        );

        drop(coord);
        let _ = std::fs::remove_file(&path);
    }

    /// A zero window flushes immediately; every commit must still complete.
    #[test]
    fn zero_window_disables_coalescing_floor() {
        let _env = env_lock().lock().unwrap_or_else(|p| p.into_inner());
        let _reset = WindowEnvReset;
        std::env::set_var(WINDOW_ENV, "0");

        let path = temp_wal("zero_window");
        let coord = Arc::new(
            StoreCommitCoordinator::open(
                path.clone(),
                DurabilityMode::WalDurableGrouped,
                GroupCommitOptions::default(),
            )
            .expect("open commit coordinator"),
        );

        const WRITERS: usize = 8;
        let barrier = Arc::new(Barrier::new(WRITERS));
        let mut handles = Vec::with_capacity(WRITERS);
        for tx in 0..WRITERS {
            let coord_c = Arc::clone(&coord);
            let barrier_c = Arc::clone(&barrier);
            handles.push(std::thread::spawn(move || {
                barrier_c.wait();
                let action = StoreWalAction::CreateCollection {
                    name: format!("z{tx}"),
                };
                coord_c
                    .append_actions(std::slice::from_ref(&action))
                    .expect("append_actions");
            }));
        }
        for h in handles {
            h.join().expect("writer thread");
        }

        let fsyncs = coord.fsync_count();
        assert!(fsyncs >= 1, "expected at least one fsync, got {fsyncs}");

        drop(coord);
        let _ = std::fs::remove_file(&path);
    }

    /// Precedence: env override > non-zero `window_ms` > default.
    #[test]
    fn resolve_window_precedence() {
        let _env = env_lock().lock().unwrap_or_else(|p| p.into_inner());
        let _reset = WindowEnvReset;
        std::env::remove_var(WINDOW_ENV);
        let cfg = GroupCommitOptions::default();
        assert_eq!(
            StoreCommitCoordinator::resolve_window(&cfg),
            Duration::from_micros(DEFAULT_ADAPTIVE_WINDOW_US)
        );

        let cfg_ms = GroupCommitOptions {
            window_ms: 5,
            ..GroupCommitOptions::default()
        };
        assert_eq!(
            StoreCommitCoordinator::resolve_window(&cfg_ms),
            Duration::from_millis(5)
        );

        std::env::set_var(WINDOW_ENV, "750");
        assert_eq!(
            StoreCommitCoordinator::resolve_window(&cfg),
            Duration::from_micros(750)
        );
        assert_eq!(
            StoreCommitCoordinator::resolve_window(&cfg_ms),
            Duration::from_micros(750)
        );

        std::env::set_var(WINDOW_ENV, "0");
        assert_eq!(
            StoreCommitCoordinator::resolve_window(&cfg),
            Duration::from_micros(0)
        );
    }

    /// Writes acknowledged in `Async` mode must be in the WAL once the
    /// coordinator has been dropped, even if the coalescing window never expired.
    #[test]
    fn async_mode_flushes_queued_writes_on_drop() {
        let _env = env_lock().lock().unwrap_or_else(|p| p.into_inner());
        let _reset = WindowEnvReset;
        // A 60 s window means the window timer cannot be what flushes the write.
        std::env::set_var(WINDOW_ENV, "60000000");

        let path = temp_wal("async_drop");
        let coord = StoreCommitCoordinator::open(
            path.clone(),
            DurabilityMode::Async,
            GroupCommitOptions::default(),
        )
        .expect("open commit coordinator");
        let action = StoreWalAction::CreateCollection {
            name: "queued".to_string(),
        };
        coord
            .append_actions(std::slice::from_ref(&action))
            .expect("append_actions");
        drop(coord);

        let reader = WalReader::open(&path).expect("open wal reader");
        let batches = reader
            .iter()
            .map_while(Result::ok)
            .filter(|(_, record)| matches!(record, WalRecord::TxCommitBatch { .. }))
            .count();
        assert_eq!(batches, 1, "queued Async write was not flushed on drop");

        let _ = std::fs::remove_file(&path);
    }
}