1use std::io;
6use std::sync::atomic::{AtomicBool, Ordering};
7use std::sync::{Arc, Mutex, RwLock, RwLockReadGuard, RwLockWriteGuard, Weak};
8use std::thread::JoinHandle;
9use std::time::Instant;
10
11use crate::metric::KevyMetric;
12
13use kevy_persist::{Aof, Argv, RewriteStats};
14use kevy_store::{ExpireStats, StoreError};
15
16use crate::config::Config;
17use crate::pubsub::PubsubBus;
18use crate::shard::{build_shards, shard_idx};
19
/// Shared, reference-counted list of shards. Each shard is an [`Inner`]
/// behind its own `RwLock`, and each lock is itself held in an `Arc`.
pub(crate) type Shards = Arc<Vec<Arc<RwLock<Inner>>>>;
24
/// Cloneable handle to the sharded store.
///
/// Cloning copies the `Arc` handles and the [`Config`]; the shard data itself
/// is shared between clones, not duplicated.
#[derive(Clone)]
pub struct Store {
    /// The shards, shared by every clone of this handle.
    shards: Shards,
    /// Shared [`DropGuard`]; its `Drop` impl runs when the last strong
    /// reference to it is released.
    guard: Arc<DropGuard>,
    /// Configuration this handle was created with.
    config: Config,
}
57
/// Non-owning counterpart of [`Store`].
///
/// Holds only `Weak` references to the shard list and the drop guard, so a
/// `WeakStore` on its own does not keep either of them alive.
pub struct WeakStore {
    /// Weak reference to the shard list.
    shards: Weak<Vec<Arc<RwLock<Inner>>>>,
    /// Weak reference to the shared drop guard.
    guard: Weak<DropGuard>,
    /// Owned copy of the configuration, handed to upgraded handles.
    config: Config,
}
68
69impl WeakStore {
70 pub fn upgrade(&self) -> Option<Store> {
73 Some(Store {
74 shards: self.shards.upgrade()?,
75 guard: self.guard.upgrade()?,
76 config: self.config.clone(),
77 })
78 }
79}
80
/// Per-shard state: the key-value store, an optional append-only file, and a
/// pub/sub bus.
pub(crate) struct Inner {
    /// The shard's key-value store.
    pub(crate) store: kevy_store::Store,
    /// Append-only file for this shard; `None` when the shard has no AOF.
    pub(crate) aof: Option<Aof>,
    /// Pub/sub bus belonging to this shard.
    pub(crate) bus: PubsubBus,
}
88
89impl Inner {
90 pub(crate) fn new(store: kevy_store::Store, aof: Option<Aof>) -> Self {
91 Inner { store, aof, bus: PubsubBus::new() }
92 }
93}
94
/// Shutdown state shared by all [`Store`] clones.
///
/// Its `Drop` impl sets the reaper stop flag, joins the reaper thread, and
/// then calls `maybe_sync` on every shard's AOF.
pub(crate) struct DropGuard {
    /// Stop flag handed back by `spawn_reaper`; `None` when it returned none.
    reaper_stop: Option<Arc<AtomicBool>>,
    /// Join handle of the reaper thread, taken and joined on drop.
    reaper_join: Mutex<Option<JoinHandle<()>>>,
    /// Extra reference to the shards so drop can reach each shard's AOF.
    shards_for_flush: Shards,
}
103
104impl Store {
105 pub fn open(config: Config) -> io::Result<Self> {
113 let shards: Shards = Arc::new(build_shards(&config)?);
114 let (reaper_stop, reaper_join) = crate::reaper::spawn_reaper(&config, &shards)?;
115 let guard = Arc::new(DropGuard {
116 reaper_stop,
117 reaper_join: Mutex::new(reaper_join),
118 shards_for_flush: shards.clone(),
119 });
120 Ok(Store { shards, guard, config })
121 }
122
123 pub fn downgrade(&self) -> WeakStore {
125 WeakStore {
126 shards: Arc::downgrade(&self.shards),
127 guard: Arc::downgrade(&self.guard),
128 config: self.config.clone(),
129 }
130 }
131
    /// Returns the configuration held by this handle.
    pub fn config(&self) -> &Config {
        &self.config
    }
137
138 pub fn with<F, R>(&self, f: F) -> R
148 where
149 F: FnOnce(&mut kevy_store::Store) -> R,
150 {
151 let mut g = self.lock();
152 f(&mut g.store)
153 }
154
155 pub fn with_key<F, R>(&self, key: &[u8], f: F) -> R
157 where
158 F: FnOnce(&mut kevy_store::Store) -> R,
159 {
160 let mut g = self.wshard(key);
161 f(&mut g.store)
162 }
163
164 pub fn collect_keys(&self, pattern: Option<&[u8]>, limit: Option<usize>) -> Vec<Vec<u8>> {
170 let mut out = Vec::new();
171 for shard in self.shards.iter() {
172 if limit.is_some_and(|l| out.len() >= l) {
173 break;
174 }
175 let remaining = limit.map(|l| l - out.len());
176 out.extend(lock_read(shard).store.collect_keys(pattern, remaining));
177 }
178 out
179 }
180
181 pub fn for_each_shard<F: FnMut(&mut kevy_store::Store)>(&self, mut f: F) {
186 for shard in self.shards.iter() {
187 f(&mut lock_write(shard).store);
188 }
189 }
190
    /// Returns the number of shards in this store.
    #[inline]
    pub fn shard_count(&self) -> usize {
        self.shards.len()
    }
196
197 pub fn log(&self, parts: &[&[u8]]) -> io::Result<()> {
200 let mut g = match parts.get(1) {
201 Some(key) => self.wshard(key),
202 None => self.lock(),
203 };
204 if let Some(aof) = &mut g.aof {
205 let argv = Argv::from(parts.iter().map(|p| p.to_vec()).collect::<Vec<_>>());
206 aof.append(&argv)?;
207 }
208 Ok(())
209 }
210
211 pub fn tick(&self) -> ExpireStats {
216 let mut total = ExpireStats::default();
217 for shard in self.shards.iter() {
218 let stats = {
219 let mut g = lock_write(shard);
220 g.store.tick_expire(self.config.reaper_samples, self.config.reaper_max_rounds)
221 };
222 total.sampled += stats.sampled;
223 total.expired += stats.expired;
224 crate::reaper::concurrent_auto_rewrite(
227 shard,
228 self.config.auto_aof_rewrite_pct,
229 self.config.auto_aof_rewrite_min_size,
230 self.config.metric_sink.as_ref(),
231 );
232 }
233 total
234 }
235
    /// Rewrites the AOF of every shard that has one, one shard at a time.
    ///
    /// The work for a shard is split into three steps so that the shard's
    /// write lock is not held while the snapshot is dumped:
    ///
    /// 1. under the write lock: collect a snapshot view of the store and call
    ///    `begin_view_rewrite`, which yields the temp path;
    /// 2. with no lock held: `kevy_persist::dump_aof` writes the view to that
    ///    path;
    /// 3. under the write lock again: `finish_concurrent_rewrite`.
    ///
    /// Shards without an AOF, or whose AOF reports `is_rewriting()`, are
    /// skipped. When a metric sink is configured, a `KevyMetric::Rewrite` is
    /// emitted for each rewritten shard.
    ///
    /// Returns `Ok(None)` when no shard was rewritten, otherwise the summed
    /// `keys` and `bytes` of all rewritten shards.
    ///
    /// # Errors
    ///
    /// Returns the first I/O error encountered. If the dump or the finish
    /// step fails, the rewrite is aborted and the temp file is removed
    /// (best effort). Remaining shards are not visited and stats gathered so
    /// far are discarded.
    pub fn rewrite_aof(&self) -> io::Result<Option<RewriteStats>> {
        let mut agg: Option<RewriteStats> = None;
        for shard in self.shards.iter() {
            let start = Instant::now();
            // Step 1 (write lock held): capture the view and start the rewrite.
            let (view, tmp, before_bytes) = {
                let mut g = lock_write(shard);
                let Inner { store, aof, bus: _ } = &mut *g;
                let Some(aof) = aof else { continue };
                if aof.is_rewriting() {
                    continue;
                }
                let before = aof.size_bytes();
                let view = store.collect_snapshot();
                (view, aof.begin_view_rewrite()?, before)
            };
            // Step 2 (no lock held): dump the captured view to the temp path.
            let keys = match kevy_persist::dump_aof(&tmp, &view) {
                Ok((keys, _)) => keys,
                Err(e) => {
                    // Undo the rewrite started in step 1 before bailing out.
                    let mut g = lock_write(shard);
                    if let Some(aof) = &mut g.aof {
                        aof.abort_concurrent_rewrite();
                    }
                    let _ = std::fs::remove_file(&tmp);
                    return Err(e);
                }
            };
            // Step 3 (write lock held again): finish the rewrite.
            let mut g = lock_write(shard);
            let Some(aof) = &mut g.aof else { continue };
            let stats = match aof.finish_concurrent_rewrite(&tmp, keys) {
                Ok(s) => s,
                Err(e) => {
                    aof.abort_concurrent_rewrite();
                    let _ = std::fs::remove_file(&tmp);
                    return Err(e);
                }
            };
            if let Some(sink) = &self.config.metric_sink {
                sink.emit(KevyMetric::Rewrite {
                    keys: stats.keys,
                    before_bytes,
                    after_bytes: stats.bytes,
                    // `as_millis` yields u128; the cast truncates to u64.
                    elapsed_ms: start.elapsed().as_millis() as u64,
                });
            }
            let acc = agg.get_or_insert(RewriteStats { keys: 0, bytes: 0 });
            acc.keys += stats.keys;
            acc.bytes += stats.bytes;
        }
        Ok(agg)
    }
293
294 pub fn save_snapshot(&self) -> io::Result<bool> {
297 let Some(dir) = self.config.data_dir.as_ref() else {
298 return Ok(false);
299 };
300 let n = self.shards.len();
301 for (i, shard) in self.shards.iter().enumerate() {
302 let name = if n == 1 {
303 self.config.snapshot_filename.clone()
304 } else {
305 kevy_persist::layout::snapshot_file(i)
306 };
307 save_shard_snapshot(shard, &dir.join(name))?;
308 }
309 Ok(true)
310 }
311
312 pub(crate) fn inner_handle(&self) -> Arc<RwLock<Inner>> {
316 self.shards[0].clone()
317 }
318
319 pub(crate) fn guard_handle(&self) -> Arc<DropGuard> {
321 self.guard.clone()
322 }
323
324 fn shard_for(&self, key: &[u8]) -> &Arc<RwLock<Inner>> {
325 &self.shards[shard_idx(key, self.shards.len())]
326 }
327
328 pub(crate) fn wshard(&self, key: &[u8]) -> RwLockWriteGuard<'_, Inner> {
330 lock_write(self.shard_for(key))
331 }
332
333 pub(crate) fn rshard(&self, key: &[u8]) -> RwLockReadGuard<'_, Inner> {
336 lock_read(self.shard_for(key))
337 }
338
339 pub(crate) fn lock(&self) -> RwLockWriteGuard<'_, Inner> {
341 lock_write(&self.shards[0])
342 }
343
344 pub(crate) fn sum_shards<F: Fn(&mut Inner) -> usize>(&self, f: F) -> usize {
346 self.shards.iter().map(|s| f(&mut lock_write(s))).sum()
347 }
348
349 pub(crate) fn sum_shards_u64<F: Fn(&mut Inner) -> u64>(&self, f: F) -> u64 {
351 self.shards.iter().map(|s| f(&mut lock_write(s))).sum()
352 }
353
354 pub(crate) fn try_for_each_shard<F: FnMut(&mut Inner) -> io::Result<()>>(
356 &self,
357 mut f: F,
358 ) -> io::Result<()> {
359 for s in self.shards.iter() {
360 f(&mut lock_write(s))?;
361 }
362 Ok(())
363 }
364}
365
366fn save_shard_snapshot(shard: &RwLock<Inner>, path: &std::path::Path) -> io::Result<()> {
379 let (view, reset_tmp) = freeze_for_save(shard)?;
380 let tmp = match kevy_persist::write_snapshot_tmp(&view, path) {
381 Ok(t) => t,
382 Err(e) => {
383 if reset_tmp.is_some()
384 && let Some(aof) = &mut lock_write(shard).aof
385 {
386 aof.abort_concurrent_rewrite();
387 }
388 return Err(e);
389 }
390 };
391 let mut g = lock_write(shard);
392 std::fs::rename(&tmp, path)?;
393 if let (Some(reset), Some(aof)) = (reset_tmp, &mut g.aof) {
394 let swap = kevy_persist::write_aof_base(&reset)
395 .and_then(|()| aof.finish_concurrent_rewrite(&reset, 0));
396 if let Err(e) = swap {
397 aof.abort_concurrent_rewrite();
398 let _ = std::fs::remove_file(&reset);
399 return Err(e);
400 }
401 }
402 Ok(())
403}
404
405fn freeze_for_save(
410 shard: &RwLock<Inner>,
411) -> io::Result<(kevy_store::SnapshotView, Option<std::path::PathBuf>)> {
412 for _ in 0..2000 {
413 {
414 let mut g = lock_write(shard);
415 let Inner { store, aof, .. } = &mut *g;
416 match aof {
417 Some(a) if a.is_rewriting() => {} Some(a) => {
419 let view = store.collect_snapshot();
420 return Ok((view, Some(a.begin_view_rewrite()?)));
421 }
422 None => return Ok((store.collect_snapshot(), None)),
423 }
424 }
425 std::thread::sleep(std::time::Duration::from_millis(5));
426 }
427 Err(io::Error::new(
428 io::ErrorKind::TimedOut,
429 "kevy-embedded: AOF rewrite still in flight after 10s; snapshot aborted",
430 ))
431}
432
433pub(crate) fn lock_write(shard: &RwLock<Inner>) -> RwLockWriteGuard<'_, Inner> {
434 shard.write().unwrap_or_else(|p| p.into_inner())
435}
436
437pub(crate) fn lock_read(shard: &RwLock<Inner>) -> RwLockReadGuard<'_, Inner> {
439 shard.read().unwrap_or_else(|p| p.into_inner())
440}
441
442fn log_argv(aof: &mut Option<Aof>, parts: &[&[u8]]) -> io::Result<()> {
443 if let Some(aof) = aof {
444 let argv = Argv::from(parts.iter().map(|p| p.to_vec()).collect::<Vec<_>>());
445 aof.append(&argv)?;
446 }
447 Ok(())
448}
449
450pub(crate) fn commit_write(inner: &mut Inner, parts: &[&[u8]]) -> io::Result<()> {
453 log_argv(&mut inner.aof, parts)?;
454 inner.store.try_evict_after_write();
455 Ok(())
456}
457
458pub(crate) fn store_err(e: StoreError) -> io::Error {
459 io::Error::new(io::ErrorKind::InvalidInput, format!("kevy-store: {e:?}"))
460}
461
462impl Drop for DropGuard {
463 fn drop(&mut self) {
464 if let Some(stop) = &self.reaper_stop {
467 stop.store(true, Ordering::Relaxed);
468 }
469 if let Some(j) = self
470 .reaper_join
471 .lock()
472 .unwrap_or_else(|p| p.into_inner())
473 .take()
474 {
475 let _ = j.join();
476 }
477 for shard in self.shards_for_flush.iter() {
478 let mut g = lock_write(shard);
479 if let Some(aof) = &mut g.aof {
480 let _ = aof.maybe_sync();
481 }
482 }
483 }
484}
485
// Test modules, compiled only under `cfg(test)` and loaded from the files
// named by their `#[path]` attributes.
#[cfg(test)]
#[path = "store_tests.rs"]
mod tests;
#[cfg(test)]
#[path = "store_tests_shard.rs"]
mod tests_shard;