1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
use crate::infrastructure::cluster::crdt::MergeStrategy;
use std::path::{Path, PathBuf};
/// Configuration for [`EmbeddedCore`](super::EmbeddedCore).
///
/// Created via `Config::builder()`. Defaults to in-memory, single-tenant mode.
#[derive(Debug, Clone)]
pub struct EmbeddedConfig {
data_dir: Option<PathBuf>,
wal_sync_on_write: bool,
wal_fsync_interval_ms: Option<u64>,
parquet_flush_interval_secs: u64,
single_tenant: bool,
node_id: Option<u32>,
merge_strategies: Vec<(String, MergeStrategy)>,
}
impl EmbeddedConfig {
/// Begin building a new configuration.
pub fn builder() -> ConfigBuilder {
ConfigBuilder::default()
}
/// Directory for durable storage (WAL + Parquet).
/// `None` means in-memory only.
pub fn data_dir(&self) -> Option<&Path> {
self.data_dir.as_deref()
}
/// Whether the WAL syncs to disk on every write.
pub fn wal_sync_on_write(&self) -> bool {
self.wal_sync_on_write
}
/// Interval-based fsync period in milliseconds.
/// `None` means no background fsync task.
pub fn wal_fsync_interval_ms(&self) -> Option<u64> {
self.wal_fsync_interval_ms
}
/// Whether single-tenant mode is enabled.
pub fn single_tenant(&self) -> bool {
self.single_tenant
}
/// Node ID for HLC-based bidirectional sync.
/// `None` disables sync capabilities (single-node mode).
pub fn node_id(&self) -> Option<u32> {
self.node_id
}
/// Per-event-type merge strategies for conflict resolution.
pub fn merge_strategies(&self) -> &[(String, MergeStrategy)] {
&self.merge_strategies
}
pub(crate) fn parquet_flush_interval_secs(&self) -> u64 {
self.parquet_flush_interval_secs
}
}
/// Builder for [`EmbeddedConfig`](super::Config).
pub struct ConfigBuilder {
data_dir: Option<PathBuf>,
wal_sync_on_write: bool,
wal_fsync_interval_ms: Option<u64>,
parquet_flush_interval_secs: u64,
single_tenant: bool,
node_id: Option<u32>,
merge_strategies: Vec<(String, MergeStrategy)>,
}
impl Default for ConfigBuilder {
fn default() -> Self {
Self {
data_dir: None,
wal_sync_on_write: true,
wal_fsync_interval_ms: None,
parquet_flush_interval_secs: 300,
single_tenant: true,
node_id: None,
merge_strategies: Vec::new(),
}
}
}
impl ConfigBuilder {
/// Set the directory where WAL and Parquet files are stored.
/// When not called, EmbeddedCore runs in-memory (no durability).
pub fn data_dir(mut self, path: impl Into<PathBuf>) -> Self {
self.data_dir = Some(path.into());
self
}
/// Control whether WAL syncs to disk on every write.
/// Default: `true`. Set `false` for higher throughput at the cost of
/// potential data loss on crash within the last sync window.
pub fn wal_sync_on_write(mut self, sync: bool) -> Self {
self.wal_sync_on_write = sync;
self
}
/// Set the interval for background coalesced fsync in milliseconds.
///
/// When set, a background task flushes and fsyncs the WAL every `ms`
/// milliseconds instead of on every write. This gives near-zero write
/// latency with a bounded data-loss window of at most `ms` milliseconds.
///
/// Automatically disables per-write `sync_on_write` to prevent double-fsync.
/// Default: `None` (no background fsync task).
pub fn wal_fsync_interval_ms(mut self, ms: u64) -> Self {
self.wal_fsync_interval_ms = Some(ms);
self
}
/// How often Parquet files are flushed (in seconds). Default: 300.
pub fn parquet_flush_interval_secs(mut self, secs: u64) -> Self {
self.parquet_flush_interval_secs = secs;
self
}
/// Enable or disable single-tenant mode. Default: `true`.
/// In single-tenant mode, all events automatically use "default" as tenant_id.
pub fn single_tenant(mut self, enabled: bool) -> Self {
self.single_tenant = enabled;
self
}
/// Set a node ID for HLC-based bidirectional sync.
/// Each instance in a sync group must have a unique node ID.
/// When not set, sync capabilities are disabled (single-node mode).
pub fn node_id(mut self, id: u32) -> Self {
self.node_id = Some(id);
self
}
/// Register a per-event-type merge strategy for conflict resolution.
///
/// The `prefix` is matched against event types: `"config."` matches
/// `"config.updated"`, `"config.deleted"`, etc. The longest matching
/// prefix wins. Unmatched types default to `AppendOnly`.
pub fn merge_strategy(mut self, prefix: impl Into<String>, strategy: MergeStrategy) -> Self {
self.merge_strategies.push((prefix.into(), strategy));
self
}
/// Build the configuration. Returns `Result` for forward compatibility.
pub fn build(self) -> crate::error::Result<EmbeddedConfig> {
Ok(EmbeddedConfig {
data_dir: self.data_dir,
wal_sync_on_write: self.wal_sync_on_write,
wal_fsync_interval_ms: self.wal_fsync_interval_ms,
parquet_flush_interval_secs: self.parquet_flush_interval_secs,
single_tenant: self.single_tenant,
node_id: self.node_id,
merge_strategies: self.merge_strategies,
})
}
}